__package__ = 'archivebox.index' import os from pathlib import Path from datetime import datetime, timedelta from typing import List, Dict, Any, Optional, Union from dataclasses import dataclass, asdict, field, fields from ..system import get_dir_size from ..config import OUTPUT_DIR, ARCHIVE_DIR_NAME class ArchiveError(Exception): def __init__(self, message, hints=None): super().__init__(message) self.hints = hints LinkDict = Dict[str, Any] ArchiveOutput = Union[str, Exception, None] @dataclass(frozen=True) class ArchiveResult: cmd: List[str] pwd: Optional[str] cmd_version: Optional[str] output: ArchiveOutput status: str start_ts: datetime end_ts: datetime schema: str = 'ArchiveResult' def __post_init__(self): self.typecheck() def _asdict(self): return asdict(self) def typecheck(self) -> None: assert self.schema == self.__class__.__name__ assert isinstance(self.status, str) and self.status assert isinstance(self.start_ts, datetime) assert isinstance(self.end_ts, datetime) assert isinstance(self.cmd, list) assert all(isinstance(arg, str) and arg for arg in self.cmd) assert self.pwd is None or isinstance(self.pwd, str) and self.pwd assert self.cmd_version is None or isinstance(self.cmd_version, str) and self.cmd_version assert self.output is None or isinstance(self.output, (str, Exception)) if isinstance(self.output, str): assert self.output @classmethod def guess_ts(_cls, dict_info): from ..util import parse_date parsed_timestamp = parse_date(dict_info["timestamp"]) start_ts = parsed_timestamp end_ts = parsed_timestamp + timedelta(seconds=int(dict_info["duration"])) return start_ts, end_ts @classmethod def from_json(cls, json_info, guess=False): from ..util import parse_date info = { key: val for key, val in json_info.items() if key in cls.field_names() } if guess: keys = info.keys() if "start_ts" not in keys: info["start_ts"], info["end_ts"] = cls.guess_ts(json_info) else: info['start_ts'] = parse_date(info['start_ts']) info['end_ts'] = parse_date(info['end_ts']) if "pwd" not in keys: info["pwd"] = str(Path(OUTPUT_DIR) / ARCHIVE_DIR_NAME / json_info["timestamp"]) if "cmd_version" not in keys: info["cmd_version"] = "Undefined" if "cmd" not in keys: info["cmd"] = [] else: info['start_ts'] = parse_date(info['start_ts']) info['end_ts'] = parse_date(info['end_ts']) info['cmd_version'] = info.get('cmd_version') if type(info["cmd"]) is str: info["cmd"] = [info["cmd"]] return cls(**info) def to_dict(self, *keys) -> dict: if keys: return {k: v for k, v in asdict(self).items() if k in keys} return asdict(self) def to_json(self, indent=4, sort_keys=True) -> str: from .json import to_json return to_json(self, indent=indent, sort_keys=sort_keys) def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str: from .csv import to_csv return to_csv(self, csv_col=cols or self.field_names(), separator=separator, ljust=ljust) @classmethod def field_names(cls): return [f.name for f in fields(cls)] @property def duration(self) -> int: return (self.end_ts - self.start_ts).seconds @dataclass(frozen=True) class Link: timestamp: str url: str title: Optional[str] tags: Optional[str] sources: List[str] history: Dict[str, List[ArchiveResult]] = field(default_factory=lambda: {}) updated: Optional[datetime] = None schema: str = 'Link' def __str__(self) -> str: return f'[{self.timestamp}] {self.url} "{self.title}"' def __post_init__(self): self.typecheck() def overwrite(self, **kwargs): """pure functional version of dict.update that returns a new instance""" return Link(**{**self._asdict(), **kwargs}) def __eq__(self, other): if not isinstance(other, Link): return NotImplemented return self.url == other.url def __gt__(self, other): if not isinstance(other, Link): return NotImplemented if not self.timestamp or not other.timestamp: return return float(self.timestamp) > float(other.timestamp) def typecheck(self) -> None: from ..config import stderr, ANSI try: assert self.schema == self.__class__.__name__ assert isinstance(self.timestamp, str) and self.timestamp assert self.timestamp.replace('.', '').isdigit() assert isinstance(self.url, str) and '://' in self.url assert self.updated is None or isinstance(self.updated, datetime) assert self.title is None or (isinstance(self.title, str) and self.title) #for tag in self.tags.all(): # assert tag is None or isinstance(tag, TaggedItem) assert isinstance(self.sources, list) assert all(isinstance(source, str) and source for source in self.sources) assert isinstance(self.history, dict) for method, results in self.history.items(): assert isinstance(method, str) and method assert isinstance(results, list) assert all(isinstance(result, ArchiveResult) for result in results) except Exception: stderr('{red}[X] Error while loading link! [{}] {} "{}"{reset}'.format(self.timestamp, self.url, self.title, **ANSI)) raise def _asdict(self, extended=False): info = { 'schema': 'Link', 'url': self.url, 'title': self.title or None, 'timestamp': self.timestamp, 'updated': self.updated or None, 'tags': self.tags or None, 'sources': self.sources or [], 'history': self.history or {}, } if extended: info.update({ 'link_dir': self.link_dir, 'archive_path': self.archive_path, 'hash': self.url_hash, 'base_url': self.base_url, 'scheme': self.scheme, 'domain': self.domain, 'path': self.path, 'basename': self.basename, 'extension': self.extension, 'is_static': self.is_static, 'bookmarked_date': self.bookmarked_date, 'updated_date': self.updated_date, 'oldest_archive_date': self.oldest_archive_date, 'newest_archive_date': self.newest_archive_date, 'is_archived': self.is_archived, 'num_outputs': self.num_outputs, 'num_failures': self.num_failures, 'latest': self.latest_outputs(), 'canonical': self.canonical_outputs(), }) return info @classmethod def from_json(cls, json_info, guess=False): from ..util import parse_date info = { key: val for key, val in json_info.items() if key in cls.field_names() } info['updated'] = parse_date(info.get('updated')) info['sources'] = info.get('sources') or [] json_history = info.get('history') or {} cast_history = {} for method, method_history in json_history.items(): cast_history[method] = [] for json_result in method_history: assert isinstance(json_result, dict), 'Items in Link["history"][method] must be dicts' cast_result = ArchiveResult.from_json(json_result, guess) cast_history[method].append(cast_result) info['history'] = cast_history return cls(**info) def to_json(self, indent=4, sort_keys=True) -> str: from .json import to_json return to_json(self, indent=indent, sort_keys=sort_keys) def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str: from .csv import to_csv return to_csv(self, cols=cols or self.field_names(), separator=separator, ljust=ljust) @classmethod def field_names(cls): return [f.name for f in fields(cls)] @property def link_dir(self) -> str: from ..config import CONFIG return os.path.join(CONFIG['ARCHIVE_DIR'], self.timestamp) @property def archive_path(self) -> str: from ..config import ARCHIVE_DIR_NAME return '{}/{}'.format(ARCHIVE_DIR_NAME, self.timestamp) @property def archive_size(self) -> float: try: return get_dir_size(self.archive_path)[0] except Exception: return 0 ### URL Helpers @property def url_hash(self): from ..util import hashurl return hashurl(self.url) @property def scheme(self) -> str: from ..util import scheme return scheme(self.url) @property def extension(self) -> str: from ..util import extension return extension(self.url) @property def domain(self) -> str: from ..util import domain return domain(self.url) @property def path(self) -> str: from ..util import path return path(self.url) @property def basename(self) -> str: from ..util import basename return basename(self.url) @property def base_url(self) -> str: from ..util import base_url return base_url(self.url) ### Pretty Printing Helpers @property def bookmarked_date(self) -> Optional[str]: from ..util import ts_to_date max_ts = (datetime.now() + timedelta(days=30)).timestamp() if self.timestamp and self.timestamp.replace('.', '').isdigit(): if 0 < float(self.timestamp) < max_ts: return ts_to_date(datetime.fromtimestamp(float(self.timestamp))) else: return str(self.timestamp) return None @property def updated_date(self) -> Optional[str]: from ..util import ts_to_date return ts_to_date(self.updated) if self.updated else None @property def archive_dates(self) -> List[datetime]: return [ result.start_ts for method in self.history.keys() for result in self.history[method] ] @property def oldest_archive_date(self) -> Optional[datetime]: return min(self.archive_dates, default=None) @property def newest_archive_date(self) -> Optional[datetime]: return max(self.archive_dates, default=None) ### Archive Status Helpers @property def num_outputs(self) -> int: return len(tuple(filter(None, self.latest_outputs().values()))) @property def num_failures(self) -> int: return sum(1 for method in self.history.keys() for result in self.history[method] if result.status == 'failed') @property def is_static(self) -> bool: from ..util import is_static_file return is_static_file(self.url) @property def is_archived(self) -> bool: from ..config import ARCHIVE_DIR from ..util import domain output_paths = ( domain(self.url), 'output.pdf', 'screenshot.png', 'output.html', 'media', 'singlefile.html' ) return any( os.path.exists(os.path.join(ARCHIVE_DIR, self.timestamp, path)) for path in output_paths ) def latest_outputs(self, status: str=None) -> Dict[str, ArchiveOutput]: """get the latest output that each archive method produced for link""" ARCHIVE_METHODS = ( 'title', 'favicon', 'wget', 'warc', 'singlefile', 'pdf', 'screenshot', 'dom', 'git', 'media', 'archive_org', ) latest: Dict[str, ArchiveOutput] = {} for archive_method in ARCHIVE_METHODS: # get most recent succesful result in history for each archive method history = self.history.get(archive_method) or [] history = list(filter(lambda result: result.output, reversed(history))) if status is not None: history = list(filter(lambda result: result.status == status, history)) history = list(history) if history: latest[archive_method] = history[0].output else: latest[archive_method] = None return latest def canonical_outputs(self) -> Dict[str, Optional[str]]: """predict the expected output paths that should be present after archiving""" from ..extractors.wget import wget_output_path canonical = { 'index_path': 'index.html', 'favicon_path': 'favicon.ico', 'google_favicon_path': 'https://www.google.com/s2/favicons?domain={}'.format(self.domain), 'wget_path': wget_output_path(self), 'warc_path': 'warc', 'singlefile_path': 'singlefile.html', 'readability_path': 'readability/content.html', 'mercury_path': 'mercury/content.html', 'pdf_path': 'output.pdf', 'screenshot_path': 'screenshot.png', 'dom_path': 'output.html', 'archive_org_path': 'https://web.archive.org/web/{}'.format(self.base_url), 'git_path': 'git', 'media_path': 'media', } if self.is_static: # static binary files like PDF and images are handled slightly differently. # they're just downloaded once and aren't archived separately multiple times, # so the wget, screenshot, & pdf urls should all point to the same file static_path = wget_output_path(self) canonical.update({ 'title': self.basename, 'wget_path': static_path, 'pdf_path': static_path, 'screenshot_path': static_path, 'dom_path': static_path, 'singlefile_path': static_path, 'readability_path': static_path, 'mercury_path': static_path, }) return canonical