1
0
Fork 0
mirror of synced 2024-06-28 19:10:33 +12:00

refactor: readability uses snapshot instead of link

This commit is contained in:
Cristian 2020-12-29 13:55:08 -05:00
parent 6230984cb3
commit 5cf9ca0e2c
3 changed files with 64 additions and 48 deletions

View file

@@ -2,6 +2,7 @@ __package__ = 'archivebox.core'
import uuid import uuid
from pathlib import Path from pathlib import Path
from typing import Dict, Optional
from django.db import models, transaction from django.db import models, transaction
from django.utils.functional import cached_property from django.utils.functional import cached_property
@@ -26,7 +27,6 @@ except AttributeError:
import jsonfield import jsonfield
JSONField = jsonfield.JSONField JSONField = jsonfield.JSONField
class Tag(models.Model): class Tag(models.Model):
""" """
Based on django-taggit model Based on django-taggit model
@@ -162,6 +162,56 @@ class Snapshot(models.Model):
return self.history['title'][-1].output.strip() return self.history['title'][-1].output.strip()
return None return None
@cached_property
def domain(self) -> str:
    """The domain portion of this snapshot's URL, extracted lazily and cached."""
    from ..util import domain as extract_domain
    return extract_domain(self.url)
@cached_property
def is_static(self) -> bool:
    """Whether this snapshot's URL points at a static file (per ..util.is_static_file)."""
    from ..util import is_static_file as url_is_static
    return url_is_static(self.url)
def canonical_outputs(self) -> Dict[str, Optional[str]]:
    """predict the expected output paths that should be present after archiving"""
    from ..extractors.wget import wget_output_path

    # default expected location of every extractor's output, relative to the snapshot dir
    canonical = {
        'index_path': 'index.html',
        'favicon_path': 'favicon.ico',
        'google_favicon_path': 'https://www.google.com/s2/favicons?domain={}'.format(self.domain),
        'wget_path': wget_output_path(self),
        'warc_path': 'warc',
        'singlefile_path': 'singlefile.html',
        'readability_path': 'readability/content.html',
        'mercury_path': 'mercury/content.html',
        'pdf_path': 'output.pdf',
        'screenshot_path': 'screenshot.png',
        'dom_path': 'output.html',
        'archive_org_path': 'https://web.archive.org/web/{}'.format(self.base_url),
        'git_path': 'git',
        'media_path': 'media',
    }
    if self.is_static:
        # static binary files like PDF and images are handled slightly differently.
        # they're just downloaded once and aren't archived separately multiple times,
        # so the wget, screenshot, & pdf urls should all point to the same file
        static_path = wget_output_path(self)
        canonical['title'] = self.basename
        for key in ('wget_path', 'pdf_path', 'screenshot_path', 'dom_path',
                    'singlefile_path', 'readability_path', 'mercury_path'):
            canonical[key] = static_path
    return canonical
def _asdict(self): def _asdict(self):
return { return {
"id": str(self.id), "id": str(self.id),

View file

@@ -6,6 +6,8 @@ from tempfile import NamedTemporaryFile
from typing import Optional from typing import Optional
import json import json
from django.db.models import Model
from ..index.schema import Link, ArchiveResult, ArchiveError from ..index.schema import Link, ArchiveResult, ArchiveError
from ..system import run, atomic_write from ..system import run, atomic_write
from ..util import ( from ..util import (
@@ -24,12 +26,12 @@ from ..config import (
from ..logging_util import TimedProgress from ..logging_util import TimedProgress
@enforce_types @enforce_types
def get_html(link: Link, path: Path) -> str: def get_html(snapshot: Model, path: Path) -> str:
""" """
Try to find wget, singlefile and then dom files. Try to find wget, singlefile and then dom files.
If none is found, download the url again. If none is found, download the url again.
""" """
canonical = link.canonical_outputs() canonical = snapshot.canonical_outputs()
abs_path = path.absolute() abs_path = path.absolute()
sources = [canonical["singlefile_path"], canonical["wget_path"], canonical["dom_path"]] sources = [canonical["singlefile_path"], canonical["wget_path"], canonical["dom_path"]]
document = None document = None
@@ -41,25 +43,25 @@ def get_html(link: Link, path: Path) -> str:
except (FileNotFoundError, TypeError): except (FileNotFoundError, TypeError):
continue continue
if document is None: if document is None:
return download_url(link.url) return download_url(snapshot.url)
else: else:
return document return document
@enforce_types @enforce_types
def should_save_readability(link: Link, out_dir: Optional[str]=None) -> bool: def should_save_readability(snapshot: Model, out_dir: Optional[str]=None) -> bool:
out_dir = out_dir or link.link_dir out_dir = out_dir or snapshot.link_dir
if is_static_file(link.url): if is_static_file(snapshot.url):
return False return False
output = Path(out_dir or link.link_dir) / 'readability' output = Path(out_dir or snapshot.snapshot_dir) / 'readability'
return SAVE_READABILITY and READABILITY_VERSION and (not output.exists()) return SAVE_READABILITY and READABILITY_VERSION and (not output.exists())
@enforce_types @enforce_types
def save_readability(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult: def save_readability(snapshot: Model, out_dir: Optional[str]=None, timeout: int=TIMEOUT) -> ArchiveResult:
"""download reader friendly version using @mozilla/readability""" """download reader friendly version using @mozilla/readability"""
out_dir = Path(out_dir or link.link_dir) out_dir = Path(out_dir or snapshot.snapshot_dir)
output_folder = out_dir.absolute() / "readability" output_folder = out_dir.absolute() / "readability"
output = str(output_folder) output = str(output_folder)
@@ -69,12 +71,12 @@ def save_readability(link: Link, out_dir: Optional[str]=None, timeout: int=TIMEO
# fake command to show the user so they have something to try debugging if get_html fails # fake command to show the user so they have something to try debugging if get_html fails
cmd = [ cmd = [
CURL_BINARY, CURL_BINARY,
link.url snapshot.url
] ]
readability_content = None readability_content = None
timer = TimedProgress(timeout, prefix=' ') timer = TimedProgress(timeout, prefix=' ')
try: try:
document = get_html(link, out_dir) document = get_html(snapshot, out_dir)
temp_doc = NamedTemporaryFile(delete=False) temp_doc = NamedTemporaryFile(delete=False)
temp_doc.write(document.encode("utf-8")) temp_doc.write(document.encode("utf-8"))
temp_doc.close() temp_doc.close()

View file

@@ -408,41 +408,5 @@ class Link:
return latest return latest
def canonical_outputs(self) -> Dict[str, Optional[str]]:
    """predict the expected output paths that should be present after archiving"""
    from ..extractors.wget import wget_output_path

    # every extractor's default output location, keyed by '<extractor>_path'
    canonical = dict(
        index_path='index.html',
        favicon_path='favicon.ico',
        google_favicon_path='https://www.google.com/s2/favicons?domain={}'.format(self.domain),
        wget_path=wget_output_path(self),
        warc_path='warc',
        singlefile_path='singlefile.html',
        readability_path='readability/content.html',
        mercury_path='mercury/content.html',
        pdf_path='output.pdf',
        screenshot_path='screenshot.png',
        dom_path='output.html',
        archive_org_path='https://web.archive.org/web/{}'.format(self.base_url),
        git_path='git',
        media_path='media',
    )
    if self.is_static:
        # static binary files like PDF and images are handled slightly differently.
        # they're just downloaded once and aren't archived separately multiple times,
        # so the wget, screenshot, & pdf urls should all point to the same file
        static_path = wget_output_path(self)
        overrides = {key: static_path for key in (
            'wget_path', 'pdf_path', 'screenshot_path', 'dom_path',
            'singlefile_path', 'readability_path', 'mercury_path',
        )}
        overrides['title'] = self.basename
        canonical.update(overrides)
    return canonical