diff --git a/archivebox/archive.py b/archivebox/archive.py
index 46ada292..c6e10bd2 100755
--- a/archivebox/archive.py
+++ b/archivebox/archive.py
@@ -12,14 +12,13 @@ Usage & Documentation:
import os
import sys
-from typing import List
+from typing import List, Optional
from schema import Link
from links import links_after_timestamp
from index import write_links_index, load_links_index
from archive_methods import archive_link
from config import (
- ARCHIVE_DIR,
ONLY_NEW,
OUTPUT_DIR,
GIT_SHA,
@@ -109,19 +108,19 @@ def update_archive_data(import_path: str=None, resume: float=None) -> List[Link]
all_links, new_links = load_links_index(out_dir=OUTPUT_DIR, import_path=import_path)
# Step 2: Write updated index with deduped old and new links back to disk
- write_links_index(out_dir=OUTPUT_DIR, links=all_links)
+ write_links_index(out_dir=OUTPUT_DIR, links=list(all_links))
# Step 3: Run the archive methods for each link
links = new_links if ONLY_NEW else all_links
log_archiving_started(len(links), resume)
- idx, link = 0, {'timestamp': 0}
+ idx: int = 0
+ link: Optional[Link] = None
try:
for idx, link in enumerate(links_after_timestamp(links, resume)):
- link_dir = os.path.join(ARCHIVE_DIR, link['timestamp'])
- archive_link(link_dir, link)
+ archive_link(link)
except KeyboardInterrupt:
- log_archiving_paused(len(links), idx, link['timestamp'])
+ log_archiving_paused(len(links), idx, link.timestamp if link else '0')
raise SystemExit(0)
except:
@@ -132,7 +131,7 @@ def update_archive_data(import_path: str=None, resume: float=None) -> List[Link]
# Step 4: Re-write links index with updated titles, icons, and resources
all_links, _ = load_links_index(out_dir=OUTPUT_DIR)
- write_links_index(out_dir=OUTPUT_DIR, links=all_links, finished=True)
+ write_links_index(out_dir=OUTPUT_DIR, links=list(all_links), finished=True)
return all_links
if __name__ == '__main__':
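
A quick sketch of the new call flow for reviewers, since the explicit `link_dir` plumbing is gone from the public API (the import filename is illustrative, and this assumes an initialized ArchiveBox output dir):

```python
from index import load_links_index, write_links_index
from archive_methods import archive_link
from config import OUTPUT_DIR

# parse + merge + dedupe, then persist the combined index
all_links, new_links = load_links_index(out_dir=OUTPUT_DIR, import_path='bookmarks.html')
write_links_index(out_dir=OUTPUT_DIR, links=list(all_links))  # must be a list, not a generator

for link in new_links:
    archive_link(link)  # link.link_dir is now derived from link.timestamp internally
```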
diff --git a/archivebox/archive_methods.py b/archivebox/archive_methods.py
index e214a909..76153e70 100644
--- a/archivebox/archive_methods.py
+++ b/archivebox/archive_methods.py
@@ -52,7 +52,6 @@ from util import (
chmod_file,
wget_output_path,
chrome_args,
- check_link_structure,
run, PIPE, DEVNULL,
Link,
)
@@ -64,9 +63,7 @@ from logs import (
)
-
-
-def archive_link(link_dir: str, link: Link, page=None) -> Link:
+def archive_link(link: Link, page=None) -> Link:
"""download the DOM, PDF, and a screenshot into a folder named after the link's timestamp"""
ARCHIVE_METHODS = (
@@ -82,24 +79,24 @@ def archive_link(link_dir: str, link: Link, page=None) -> Link:
)
try:
- is_new = not os.path.exists(link_dir)
+ is_new = not os.path.exists(link.link_dir)
if is_new:
- os.makedirs(link_dir)
+ os.makedirs(link.link_dir)
- link = load_json_link_index(link_dir, link)
- log_link_archiving_started(link_dir, link, is_new)
+ link = load_json_link_index(link.link_dir, link)
+ log_link_archiving_started(link.link_dir, link, is_new)
stats = {'skipped': 0, 'succeeded': 0, 'failed': 0}
for method_name, should_run, method_function in ARCHIVE_METHODS:
- if method_name not in link['history']:
- link['history'][method_name] = []
+ if method_name not in link.history:
+ link.history[method_name] = []
- if should_run(link_dir, link):
+ if should_run(link.link_dir, link):
log_archive_method_started(method_name)
- result = method_function(link_dir, link)
+ result = method_function(link.link_dir, link)
- link['history'][method_name].append(result._asdict())
+ link.history[method_name].append(result)
stats[result.status] += 1
log_archive_method_finished(result)
@@ -108,14 +105,22 @@ def archive_link(link_dir: str, link: Link, page=None) -> Link:
# print(' ', stats)
- write_link_index(link_dir, link)
+ link = Link(**{
+ **link._asdict(),
+ 'updated': datetime.now(),
+ })
+
+ write_link_index(link.link_dir, link)
patch_links_index(link)
- log_link_archiving_finished(link_dir, link, is_new, stats)
+ log_link_archiving_finished(link.link_dir, link, is_new, stats)
+
+ except KeyboardInterrupt:
+ raise
except Exception as err:
print(' ! Failed to archive link: {}: {}'.format(err.__class__.__name__, err))
raise
-
+
return link
@@ -123,10 +128,10 @@ def archive_link(link_dir: str, link: Link, page=None) -> Link:
def should_fetch_title(link_dir: str, link: Link) -> bool:
# if link already has valid title, skip it
- if link['title'] and not link['title'].lower().startswith('http'):
+ if link.title and not link.title.lower().startswith('http'):
return False
- if is_static_file(link['url']):
+ if is_static_file(link.url):
return False
return FETCH_TITLE
@@ -137,7 +142,7 @@ def fetch_title(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResul
output = None
cmd = [
CURL_BINARY,
- link['url'],
+ link.url,
'|',
'grep',
        '<title>',
@@ -145,7 +150,7 @@ def fetch_title(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResul
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
- output = fetch_page_title(link['url'], timeout=timeout, progress=False)
+ output = fetch_page_title(link.url, timeout=timeout, progress=False)
if not output:
raise ArchiveError('Unable to detect page title')
except Exception as err:
@@ -180,7 +185,7 @@ def fetch_favicon(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveRes
'--location',
'--output', output,
*(() if CHECK_SSL_VALIDITY else ('--insecure',)),
- 'https://www.google.com/s2/favicons?domain={}'.format(domain(link['url'])),
+ 'https://www.google.com/s2/favicons?domain={}'.format(domain(link.url)),
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@@ -240,7 +245,7 @@ def fetch_wget(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult
*(('--user-agent={}'.format(WGET_USER_AGENT),) if WGET_USER_AGENT else ()),
*(('--load-cookies', COOKIES_FILE) if COOKIES_FILE else ()),
*((() if CHECK_SSL_VALIDITY else ('--no-check-certificate', '--no-hsts'))),
- link['url'],
+ link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@@ -290,7 +295,7 @@ def fetch_wget(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult
)
def should_fetch_pdf(link_dir: str, link: Link) -> bool:
- if is_static_file(link['url']):
+ if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'output.pdf')):
@@ -306,7 +311,7 @@ def fetch_pdf(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
cmd = [
*chrome_args(TIMEOUT=timeout),
'--print-to-pdf',
- link['url'],
+ link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@@ -334,7 +339,7 @@ def fetch_pdf(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
)
def should_fetch_screenshot(link_dir: str, link: Link) -> bool:
- if is_static_file(link['url']):
+ if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'screenshot.png')):
@@ -349,7 +354,7 @@ def fetch_screenshot(link_dir: str, link: Link, timeout: int=TIMEOUT) -> Archive
cmd = [
*chrome_args(TIMEOUT=timeout),
'--screenshot',
- link['url'],
+ link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@@ -377,7 +382,7 @@ def fetch_screenshot(link_dir: str, link: Link, timeout: int=TIMEOUT) -> Archive
)
def should_fetch_dom(link_dir: str, link: Link) -> bool:
- if is_static_file(link['url']):
+ if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'output.html')):
@@ -393,7 +398,7 @@ def fetch_dom(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
cmd = [
*chrome_args(TIMEOUT=timeout),
'--dump-dom',
- link['url']
+ link.url
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@@ -422,15 +427,15 @@ def fetch_dom(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
)
def should_fetch_git(link_dir: str, link: Link) -> bool:
- if is_static_file(link['url']):
+ if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'git')):
return False
is_clonable_url = (
- (domain(link['url']) in GIT_DOMAINS)
- or (extension(link['url']) == 'git')
+ (domain(link.url) in GIT_DOMAINS)
+ or (extension(link.url) == 'git')
)
if not is_clonable_url:
return False
@@ -450,7 +455,7 @@ def fetch_git(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
'--mirror',
'--recursive',
*(() if CHECK_SSL_VALIDITY else ('-c', 'http.sslVerify=false')),
- without_query(without_fragment(link['url'])),
+ without_query(without_fragment(link.url)),
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@@ -481,7 +486,7 @@ def fetch_git(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
def should_fetch_media(link_dir: str, link: Link) -> bool:
- if is_static_file(link['url']):
+ if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'media')):
@@ -515,7 +520,7 @@ def fetch_media(link_dir: str, link: Link, timeout: int=MEDIA_TIMEOUT) -> Archiv
'--embed-thumbnail',
'--add-metadata',
*(() if CHECK_SSL_VALIDITY else ('--no-check-certificate',)),
- link['url'],
+ link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@@ -553,7 +558,7 @@ def fetch_media(link_dir: str, link: Link, timeout: int=MEDIA_TIMEOUT) -> Archiv
def should_fetch_archive_dot_org(link_dir: str, link: Link) -> bool:
- if is_static_file(link['url']):
+ if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'archive.org.txt')):
@@ -567,7 +572,7 @@ def archive_dot_org(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveR
output = 'archive.org.txt'
archive_org_url = None
- submit_url = 'https://web.archive.org/save/{}'.format(link['url'])
+ submit_url = 'https://web.archive.org/save/{}'.format(link.url)
cmd = [
CURL_BINARY,
'--location',
@@ -586,7 +591,7 @@ def archive_dot_org(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveR
archive_org_url = 'https://web.archive.org{}'.format(content_location[0])
elif len(errors) == 1 and 'RobotAccessControlException' in errors[0]:
archive_org_url = None
- # raise ArchiveError('Archive.org denied by {}/robots.txt'.format(domain(link['url'])))
+ # raise ArchiveError('Archive.org denied by {}/robots.txt'.format(domain(link.url)))
elif errors:
raise ArchiveError(', '.join(errors))
else:
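
The archive methods now append raw `ArchiveResult` instances to `link.history` instead of `._asdict()` dicts. A minimal sketch of the new bookkeeping (field values are made up; `Link`/`ArchiveResult` as defined in schema.py below):

```python
from datetime import datetime
from schema import ArchiveResult, Link

link = Link(timestamp='1554984695.0', url='https://example.com',
            title=None, tags='', sources=['sources/cli_import.txt'])

result = ArchiveResult(
    cmd=['curl', '--silent', '--location', 'https://example.com'],
    pwd='/data/archive/1554984695',   # illustrative link dir
    cmd_version='7.64.0',
    output='Example Domain',
    status='succeeded',
    start_ts=datetime.now(),
    end_ts=datetime.now(),
)

# history is a plain dict on the frozen dataclass, so it can still be mutated in place
link.history.setdefault('title', []).append(result)
print(result.duration)  # duration is computed from start_ts/end_ts, no longer stored
```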
diff --git a/archivebox/config.py b/archivebox/config.py
index 13d64c3a..ec38b367 100644
--- a/archivebox/config.py
+++ b/archivebox/config.py
@@ -1,5 +1,4 @@
import os
-import re
import sys
import shutil
@@ -77,7 +76,7 @@ if COOKIES_FILE:
COOKIES_FILE = os.path.abspath(COOKIES_FILE)
# ******************************************************************************
-# ************************ Environment & Dependencies **************************
+# ***************************** Helper Functions *******************************
# ******************************************************************************
def check_version(binary: str) -> str:
@@ -95,6 +94,7 @@ def check_version(binary: str) -> str:
print('{red}[X] Unable to find a working version of {cmd}, is it installed and in your $PATH?'.format(cmd=binary, **ANSI))
raise SystemExit(1)
+
def find_chrome_binary() -> Optional[str]:
"""find any installed chrome binaries in the default locations"""
# Precedence: Chromium, Chrome, Beta, Canary, Unstable, Dev
@@ -119,6 +119,7 @@ def find_chrome_binary() -> Optional[str]:
print('{red}[X] Unable to find a working version of Chrome/Chromium, is it installed and in your $PATH?'.format(**ANSI))
raise SystemExit(1)
+
def find_chrome_data_dir() -> Optional[str]:
"""find any installed chrome user data directories in the default locations"""
# Precedence: Chromium, Chrome, Beta, Canary, Unstable, Dev
@@ -142,6 +143,7 @@ def find_chrome_data_dir() -> Optional[str]:
return full_path
return None
+
def get_git_version() -> str:
"""get the git commit hash of the python code folder (aka code version)"""
try:
@@ -151,6 +153,10 @@ def get_git_version() -> str:
return 'unknown'
+# ******************************************************************************
+# ************************ Environment & Dependencies **************************
+# ******************************************************************************
+
try:
GIT_SHA = get_git_version()
@@ -188,19 +194,33 @@ try:
print(' Alternatively, run this script with:')
print(' env PYTHONIOENCODING=UTF-8 ./archive.py export.html')
-
### Make sure curl is installed
USE_CURL = FETCH_FAVICON or SUBMIT_ARCHIVE_DOT_ORG
- CURL_VERSION = USE_CURL and check_version(CURL_BINARY)
+ CURL_VERSION = None
+ if USE_CURL:
+ CURL_VERSION = check_version(CURL_BINARY)
### Make sure wget is installed and calculate version
USE_WGET = FETCH_WGET or FETCH_WARC
- WGET_VERSION = USE_WGET and check_version(WGET_BINARY)
+ WGET_VERSION = None
+ if USE_WGET:
+ WGET_VERSION = check_version(WGET_BINARY)
+
WGET_USER_AGENT = WGET_USER_AGENT.format(
GIT_SHA=GIT_SHA[:9],
WGET_VERSION=WGET_VERSION or '',
)
+ ### Make sure git is installed
+ GIT_VERSION = None
+ if FETCH_GIT:
+ GIT_VERSION = check_version(GIT_BINARY)
+
+ ### Make sure youtube-dl is installed
+ YOUTUBEDL_VERSION = None
+ if FETCH_MEDIA:
+        YOUTUBEDL_VERSION = check_version(YOUTUBEDL_BINARY)
+
### Make sure chrome is installed and calculate version
USE_CHROME = FETCH_PDF or FETCH_SCREENSHOT or FETCH_DOM
CHROME_VERSION = None
@@ -214,13 +234,6 @@ try:
CHROME_USER_DATA_DIR = find_chrome_data_dir()
# print('[i] Using Chrome data dir: {}'.format(os.path.abspath(CHROME_USER_DATA_DIR)))
- ### Make sure git is installed
- GIT_VERSION = FETCH_GIT and check_version(GIT_BINARY)
-
- ### Make sure youtube-dl is installed
- YOUTUBEDL_VERSION = FETCH_MEDIA and check_version(YOUTUBEDL_BINARY)
-
- ### Chrome housekeeping options
CHROME_OPTIONS = {
'TIMEOUT': TIMEOUT,
'RESOLUTION': RESOLUTION,
@@ -236,7 +249,6 @@ try:
# 'ignoreHTTPSErrors': not CHECK_SSL_VALIDITY,
# # 'executablePath': CHROME_BINARY,
# }
-
except KeyboardInterrupt:
raise SystemExit(1)
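
The motivation for replacing `X_VERSION = USE_X and check_version(...)` with the explicit `None` + `if` form: the old idiom gave the variable the type `Union[bool, str]` (it was `False` whenever the dependency was disabled), while the new one keeps it an honest `Optional[str]`. A minimal before/after sketch (with a stand-in for `check_version`):

```python
from typing import Optional

USE_CURL = False

# old: CURL_VERSION_OLD is False here, a str only when USE_CURL is truthy
CURL_VERSION_OLD = USE_CURL and 'curl 7.64.0'

# new: CURL_VERSION is always Optional[str]
CURL_VERSION: Optional[str] = None
if USE_CURL:
    CURL_VERSION = 'curl 7.64.0'  # stand-in for check_version(CURL_BINARY)
```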
diff --git a/archivebox/index.py b/archivebox/index.py
index 3c31ac84..0a60dd23 100644
--- a/archivebox/index.py
+++ b/archivebox/index.py
@@ -1,9 +1,10 @@
import os
import json
+from itertools import chain
from datetime import datetime
from string import Template
-from typing import List, Tuple
+from typing import List, Tuple, Iterator, Optional
try:
from distutils.dir_util import copy_tree
@@ -11,7 +12,7 @@ except ImportError:
print('[X] Missing "distutils" python package. To install it, run:')
print(' pip install distutils')
-from schema import Link, ArchiveIndex
+from schema import Link, ArchiveIndex, ArchiveResult
from config import (
OUTPUT_DIR,
TEMPLATES_DIR,
@@ -22,11 +23,10 @@ from util import (
chmod_file,
urlencode,
derived_link_info,
+ wget_output_path,
+ ExtendedEncoder,
check_link_structure,
check_links_structure,
- wget_output_path,
- latest_output,
- ExtendedEncoder,
)
from parse import parse_links
from links import validate_links
@@ -47,7 +47,6 @@ def write_links_index(out_dir: str, links: List[Link], finished: bool=False) ->
"""create index.html file for a given list of links"""
log_indexing_process_started()
- check_links_structure(links)
log_indexing_started(out_dir, 'index.json')
write_json_links_index(out_dir, links)
@@ -63,20 +62,17 @@ def load_links_index(out_dir: str=OUTPUT_DIR, import_path: str=None) -> Tuple[Li
existing_links: List[Link] = []
if out_dir:
- existing_links = parse_json_links_index(out_dir)
- check_links_structure(existing_links)
+ existing_links = list(parse_json_links_index(out_dir))
new_links: List[Link] = []
if import_path:
# parse and validate the import file
log_parsing_started(import_path)
raw_links, parser_name = parse_links(import_path)
- new_links = validate_links(raw_links)
- check_links_structure(new_links)
+ new_links = list(validate_links(raw_links))
# merge existing links in out_dir and new links
- all_links = validate_links(existing_links + new_links)
- check_links_structure(all_links)
+ all_links = list(validate_links(existing_links + new_links))
num_new_links = len(all_links) - len(existing_links)
if import_path and parser_name:
@@ -88,7 +84,15 @@ def load_links_index(out_dir: str=OUTPUT_DIR, import_path: str=None) -> Tuple[Li
def write_json_links_index(out_dir: str, links: List[Link]) -> None:
"""write the json link index to a given path"""
- check_links_structure(links)
+ assert isinstance(links, List), 'Links must be a list, not a generator.'
+ assert isinstance(links[0].history, dict)
+ assert isinstance(links[0].sources, list)
+
+ if links[0].history.get('title'):
+ assert isinstance(links[0].history['title'][0], ArchiveResult)
+
+ if links[0].sources:
+ assert isinstance(links[0].sources[0], str)
path = os.path.join(out_dir, 'index.json')
@@ -98,7 +102,7 @@ def write_json_links_index(out_dir: str, links: List[Link]) -> None:
docs='https://github.com/pirate/ArchiveBox/wiki',
version=GIT_SHA,
num_links=len(links),
- updated=str(datetime.now().timestamp()),
+ updated=datetime.now(),
links=links,
)
@@ -110,23 +114,23 @@ def write_json_links_index(out_dir: str, links: List[Link]) -> None:
chmod_file(path)
-def parse_json_links_index(out_dir: str=OUTPUT_DIR) -> List[Link]:
+def parse_json_links_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
"""parse a archive index json file and return the list of links"""
+
index_path = os.path.join(out_dir, 'index.json')
if os.path.exists(index_path):
with open(index_path, 'r', encoding='utf-8') as f:
links = json.load(f)['links']
check_links_structure(links)
- return links
+ for link in links:
+ yield Link(**link)
- return []
+ return ()
def write_html_links_index(out_dir: str, links: List[Link], finished: bool=False) -> None:
"""write the html link index to a given path"""
- check_links_structure(links)
-
path = os.path.join(out_dir, 'index.html')
copy_tree(os.path.join(TEMPLATES_DIR, 'static'), os.path.join(out_dir, 'static'))
@@ -140,24 +144,22 @@ def write_html_links_index(out_dir: str, links: List[Link], finished: bool=False
with open(os.path.join(TEMPLATES_DIR, 'index_row.html'), 'r', encoding='utf-8') as f:
link_row_html = f.read()
- full_links_info = (derived_link_info(link) for link in links)
-
link_rows = '\n'.join(
Template(link_row_html).substitute(**{
- **link,
+ **derived_link_info(link),
'title': (
- link['title']
- or (link['base_url'] if link['is_archived'] else TITLE_LOADING_MSG)
+ link.title
+ or (link.base_url if link.is_archived else TITLE_LOADING_MSG)
),
'favicon_url': (
- os.path.join('archive', link['timestamp'], 'favicon.ico')
+ os.path.join('archive', link.timestamp, 'favicon.ico')
# if link['is_archived'] else 'data:image/gif;base64,R0lGODlhAQABAAD/ACwAAAAAAQABAAACADs='
),
'archive_url': urlencode(
wget_output_path(link) or 'index.html'
),
})
- for link in full_links_info
+ for link in links
)
template_vars = {
@@ -180,28 +182,33 @@ def write_html_links_index(out_dir: str, links: List[Link], finished: bool=False
def patch_links_index(link: Link, out_dir: str=OUTPUT_DIR) -> None:
"""hack to in-place update one row's info in the generated index html"""
- title = link['title'] or latest_output(link)['title']
- successful = len(tuple(filter(None, latest_output(link).values())))
+ title = link.title or link.latest_outputs()['title']
+ successful = link.num_outputs
# Patch JSON index
changed = False
json_file_links = parse_json_links_index(out_dir)
+ patched_links = []
for saved_link in json_file_links:
- if saved_link['url'] == link['url']:
- saved_link['title'] = title
- saved_link['history'] = link['history']
- changed = True
- break
- if changed:
- write_json_links_index(out_dir, json_file_links)
+ if saved_link.url == link.url:
+ patched_links.append(Link(**{
+ **saved_link._asdict(),
+ 'title': title,
+ 'history': link.history,
+ 'updated': link.updated,
+ }))
+ else:
+ patched_links.append(saved_link)
+
+ write_json_links_index(out_dir, patched_links)
# Patch HTML index
html_path = os.path.join(out_dir, 'index.html')
html = open(html_path, 'r').read().split('\n')
for idx, line in enumerate(html):
-        if title and ('<span data-title-for="{}"'.format(link['url']) in line):
+        if title and ('<span data-title-for="{}"'.format(link.url) in line):
            html[idx] = '<span>{}</span>'.format(title)
-        elif successful and ('<span data-number-for="{}"'.format(link['url']) in line):
+        elif successful and ('<span data-number-for="{}"'.format(link.url) in line):
            html[idx] = '<span>{}</span>'.format(successful)
break
@@ -212,7 +219,6 @@ def patch_links_index(link: Link, out_dir: str=OUTPUT_DIR) -> None:
### Individual link index
def write_link_index(out_dir: str, link: Link) -> None:
- link['updated'] = str(datetime.now().timestamp())
write_json_link_index(out_dir, link)
write_html_link_index(out_dir, link)
@@ -220,66 +226,58 @@ def write_link_index(out_dir: str, link: Link) -> None:
def write_json_link_index(out_dir: str, link: Link) -> None:
"""write a json file with some info about the link"""
- check_link_structure(link)
path = os.path.join(out_dir, 'index.json')
with open(path, 'w', encoding='utf-8') as f:
- json.dump(link, f, indent=4, cls=ExtendedEncoder)
+ json.dump(link._asdict(), f, indent=4, cls=ExtendedEncoder)
chmod_file(path)
-def parse_json_link_index(out_dir: str) -> dict:
+def parse_json_link_index(out_dir: str) -> Optional[Link]:
"""load the json link index from a given directory"""
existing_index = os.path.join(out_dir, 'index.json')
if os.path.exists(existing_index):
with open(existing_index, 'r', encoding='utf-8') as f:
link_json = json.load(f)
check_link_structure(link_json)
- return link_json
- return {}
+ return Link(**link_json)
+ return None
def load_json_link_index(out_dir: str, link: Link) -> Link:
"""check for an existing link archive in the given directory,
and load+merge it into the given link dict
"""
- link = {
- **parse_json_link_index(out_dir),
- **link,
- }
- link.update({
- 'history': link.get('history') or {},
- })
- check_link_structure(link)
- return link
+ existing_link = parse_json_link_index(out_dir)
+ existing_link = existing_link._asdict() if existing_link else {}
+ new_link = link._asdict()
+
+ return Link(**{**existing_link, **new_link})
def write_html_link_index(out_dir: str, link: Link) -> None:
- check_link_structure(link)
with open(os.path.join(TEMPLATES_DIR, 'link_index.html'), 'r', encoding='utf-8') as f:
link_html = f.read()
path = os.path.join(out_dir, 'index.html')
- link = derived_link_info(link)
-
with open(path, 'w', encoding='utf-8') as f:
f.write(Template(link_html).substitute({
- **link,
+ **derived_link_info(link),
'title': (
- link['title']
- or (link['base_url'] if link['is_archived'] else TITLE_LOADING_MSG)
+ link.title
+ or (link.base_url if link.is_archived else TITLE_LOADING_MSG)
),
'archive_url': urlencode(
wget_output_path(link)
- or (link['domain'] if link['is_archived'] else 'about:blank')
+ or (link.domain if link.is_archived else 'about:blank')
),
- 'extension': link['extension'] or 'html',
- 'tags': link['tags'].strip() or 'untagged',
- 'status': 'Archived' if link['is_archived'] else 'Not yet archived',
- 'status_color': 'success' if link['is_archived'] else 'danger',
+ 'extension': link.extension or 'html',
+ 'tags': link.tags or 'untagged',
+ 'status': 'Archived' if link.is_archived else 'Not yet archived',
+ 'status_color': 'success' if link.is_archived else 'danger',
}))
chmod_file(path)
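
Since `parse_json_links_index` is now a generator, any caller that needs `len()`, indexing, or multiple passes must materialize it first; that is what the new `Links must be a list, not a generator` assertion in `write_json_links_index` enforces at write time. A usage sketch (assuming an existing `index.json` under `OUTPUT_DIR`):

```python
from index import parse_json_links_index
from config import OUTPUT_DIR

links = list(parse_json_links_index(OUTPUT_DIR))  # materialize before len()/indexing
print(len(links), 'links in index')
if links:
    print('first:', links[0].url, links[0].title)
```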
diff --git a/archivebox/links.py b/archivebox/links.py
index 41aceebc..4692943c 100644
--- a/archivebox/links.py
+++ b/archivebox/links.py
@@ -11,7 +11,7 @@ Link {
sources: [str],
history: {
pdf: [
- {start_ts, end_ts, duration, cmd, pwd, status, output},
+ {start_ts, end_ts, cmd, pwd, cmd_version, status, output},
...
],
...
@@ -19,41 +19,36 @@ Link {
}
"""
-from typing import List, Iterable
+from typing import Iterable
from collections import OrderedDict
from schema import Link
from util import (
+ scheme,
+ fuzzy_url,
merge_links,
- check_link_structure,
- check_links_structure,
htmldecode,
+ hashurl,
)
-def validate_links(links: Iterable[Link]) -> List[Link]:
- check_links_structure(links)
+def validate_links(links: Iterable[Link]) -> Iterable[Link]:
links = archivable_links(links) # remove chrome://, about:, mailto: etc.
- links = uniquefied_links(links) # merge/dedupe duplicate timestamps & urls
links = sorted_links(links) # deterministically sort the links based on timestamp, url
+ links = uniquefied_links(links) # merge/dedupe duplicate timestamps & urls
if not links:
print('[X] No links found :(')
raise SystemExit(1)
- for link in links:
- link['title'] = htmldecode(link['title'].strip()) if link['title'] else None
- check_link_structure(link)
-
- return list(links)
-
+ return links
def archivable_links(links: Iterable[Link]) -> Iterable[Link]:
"""remove chrome://, about:// or other schemed links that cant be archived"""
return (
link
for link in links
- if any(link['url'].lower().startswith(s) for s in ('http://', 'https://', 'ftp://'))
+ if scheme(link.url) in ('http', 'https', 'ftp')
)
@@ -64,38 +59,37 @@ def uniquefied_links(sorted_links: Iterable[Link]) -> Iterable[Link]:
unique_urls: OrderedDict[str, Link] = OrderedDict()
- lower = lambda url: url.lower().strip()
- without_www = lambda url: url.replace('://www.', '://', 1)
- without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
-
for link in sorted_links:
- fuzzy_url = without_www(without_trailing_slash(lower(link['url'])))
- if fuzzy_url in unique_urls:
+ fuzzy = fuzzy_url(link.url)
+ if fuzzy in unique_urls:
# merge with any other links that share the same url
- link = merge_links(unique_urls[fuzzy_url], link)
- unique_urls[fuzzy_url] = link
+ link = merge_links(unique_urls[fuzzy], link)
+ unique_urls[fuzzy] = link
unique_timestamps: OrderedDict[str, Link] = OrderedDict()
for link in unique_urls.values():
- link['timestamp'] = lowest_uniq_timestamp(unique_timestamps, link['timestamp'])
- unique_timestamps[link['timestamp']] = link
+ new_link = Link(**{
+ **link._asdict(),
+ 'timestamp': lowest_uniq_timestamp(unique_timestamps, link.timestamp),
+ })
+ unique_timestamps[new_link.timestamp] = new_link
return unique_timestamps.values()
def sorted_links(links: Iterable[Link]) -> Iterable[Link]:
- sort_func = lambda link: (link['timestamp'].split('.', 1)[0], link['url'])
+ sort_func = lambda link: (link.timestamp.split('.', 1)[0], link.url)
return sorted(links, key=sort_func, reverse=True)
-def links_after_timestamp(links: Iterable[Link], timestamp: str=None) -> Iterable[Link]:
- if not timestamp:
+def links_after_timestamp(links: Iterable[Link], resume: float=None) -> Iterable[Link]:
+ if not resume:
yield from links
return
for link in links:
try:
- if float(link['timestamp']) <= float(timestamp):
+ if float(link.timestamp) <= resume:
yield link
except (ValueError, TypeError):
print('Resume value and all timestamp values must be valid numbers.')
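
`validate_links` is now a lazy pipeline over `Link` objects keyed on `fuzzy_url` (defined in util.py below). A sketch of the dedupe behavior with two URL variants that collapse into one entry (values illustrative; importing links pulls in util/config, so this assumes a working checkout):

```python
from links import validate_links
from schema import Link

links = [
    Link(timestamp='1554984695.0', url='https://example.com/page/',
         title=None, tags='', sources=['ri.txt']),
    Link(timestamp='1554984696.0', url='http://example.com/page',
         title='Example Page', tags='', sources=['feed.xml']),
]

# archivable -> sorted -> uniquefied: both variants share a fuzzy_url, so they merge
for link in validate_links(links):
    print(link.timestamp, link.url, link.title, link.sources)
```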
diff --git a/archivebox/logs.py b/archivebox/logs.py
index 769257a6..fd1f0bc5 100644
--- a/archivebox/logs.py
+++ b/archivebox/logs.py
@@ -1,6 +1,7 @@
import sys
from datetime import datetime
+from typing import Optional
from schema import Link, ArchiveResult, RuntimeStats
from config import ANSI, REPO_DIR, OUTPUT_DIR
@@ -66,7 +67,7 @@ def log_indexing_finished(out_dir: str, out_file: str):
### Archiving Stage
-def log_archiving_started(num_links: int, resume: float):
+def log_archiving_started(num_links: int, resume: Optional[float]):
start_ts = datetime.now()
_LAST_RUN_STATS.archiving_start_ts = start_ts
if resume:
@@ -132,10 +133,10 @@ def log_link_archiving_started(link_dir: str, link: Link, is_new: bool):
symbol_color=ANSI['green' if is_new else 'black'],
symbol='+' if is_new else '*',
now=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- title=link['title'] or link['url'],
+ title=link.title or link.base_url,
**ANSI,
))
- print(' {blue}{url}{reset}'.format(url=link['url'], **ANSI))
+ print(' {blue}{url}{reset}'.format(url=link.url, **ANSI))
print(' {} {}'.format(
'>' if is_new else '√',
pretty_path(link_dir),
diff --git a/archivebox/parse.py b/archivebox/parse.py
index 3da3cb35..ba200ff3 100644
--- a/archivebox/parse.py
+++ b/archivebox/parse.py
@@ -26,6 +26,7 @@ import xml.etree.ElementTree as etree
from config import TIMEOUT
from util import (
+ htmldecode,
str_between,
URL_REGEX,
check_url_parsing_invariants,
@@ -91,13 +92,13 @@ def parse_pocket_html_export(html_file: IO[str]) -> Iterable[Link]:
tags = match.group(3)
title = match.group(4).replace(' — Readability', '').replace('http://www.readability.com/read?url=', '')
- yield {
- 'url': url,
- 'timestamp': str(time.timestamp()),
- 'title': title or None,
- 'tags': tags or '',
- 'sources': [html_file.name],
- }
+ yield Link(
+ url=url,
+ timestamp=str(time.timestamp()),
+ title=title or None,
+ tags=tags or '',
+ sources=[html_file.name],
+ )
def parse_json_export(json_file: IO[str]) -> Iterable[Link]:
@@ -137,19 +138,19 @@ def parse_json_export(json_file: IO[str]) -> Iterable[Link]:
# Parse the title
title = None
if link.get('title'):
- title = link['title'].strip() or None
+ title = link['title'].strip()
elif link.get('description'):
- title = link['description'].replace(' — Readability', '').strip() or None
+ title = link['description'].replace(' — Readability', '').strip()
elif link.get('name'):
- title = link['name'].strip() or None
+ title = link['name'].strip()
- yield {
- 'url': url,
- 'timestamp': ts_str,
- 'title': title,
- 'tags': link.get('tags') or '',
- 'sources': [json_file.name],
- }
+ yield Link(
+ url=url,
+ timestamp=ts_str,
+ title=htmldecode(title) or None,
+ tags=link.get('tags') or '',
+ sources=[json_file.name],
+ )
def parse_rss_export(rss_file: IO[str]) -> Iterable[Link]:
@@ -178,15 +179,15 @@ def parse_rss_export(rss_file: IO[str]) -> Iterable[Link]:
        url = str_between(get_row('link'), '<link>', '</link>')
        ts_str = str_between(get_row('pubDate'), '<pubDate>', '</pubDate>')
        time = datetime.strptime(ts_str, "%a, %d %b %Y %H:%M:%S %z")
-        title = str_between(get_row('title'), '<![CDATA[', ']]').strip() or None
+        title = str_between(get_row('title'), '<![CDATA[', ']]').strip()
-        yield {
-            'url': url,
-            'timestamp': str(time.timestamp()),
-            'title': title,
-            'tags': '',
-            'sources': [rss_file.name],
-        }
+        yield Link(
+            url=url,
+            timestamp=str(time.timestamp()),
+            title=htmldecode(title) or None,
+            tags='',
+            sources=[rss_file.name],
+        )
@@ -217,13 +218,13 @@ def parse_shaarli_rss_export(rss_file: IO[str]) -> Iterable[Link]:
        ts_str = str_between(get_row('published'), '<published>', '</published>')
time = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S%z")
- yield {
- 'url': url,
- 'timestamp': str(time.timestamp()),
- 'title': title or None,
- 'tags': '',
- 'sources': [rss_file.name],
- }
+ yield Link(
+ url=url,
+ timestamp=str(time.timestamp()),
+ title=htmldecode(title) or None,
+ tags='',
+ sources=[rss_file.name],
+ )
def parse_netscape_html_export(html_file: IO[str]) -> Iterable[Link]:
@@ -239,14 +240,15 @@ def parse_netscape_html_export(html_file: IO[str]) -> Iterable[Link]:
if match:
url = match.group(1)
time = datetime.fromtimestamp(float(match.group(2)))
+ title = match.group(3).strip()
- yield {
- 'url': url,
- 'timestamp': str(time.timestamp()),
- 'title': match.group(3).strip() or None,
- 'tags': '',
- 'sources': [html_file.name],
- }
+ yield Link(
+ url=url,
+ timestamp=str(time.timestamp()),
+ title=htmldecode(title) or None,
+ tags='',
+ sources=[html_file.name],
+ )
def parse_pinboard_rss_export(rss_file: IO[str]) -> Iterable[Link]:
@@ -271,13 +273,13 @@ def parse_pinboard_rss_export(rss_file: IO[str]) -> Iterable[Link]:
else:
time = datetime.now()
- yield {
- 'url': url,
- 'timestamp': str(time.timestamp()),
- 'title': title or None,
- 'tags': tags or '',
- 'sources': [rss_file.name],
- }
+ yield Link(
+ url=url,
+ timestamp=str(time.timestamp()),
+ title=htmldecode(title) or None,
+ tags=tags or '',
+ sources=[rss_file.name],
+ )
def parse_medium_rss_export(rss_file: IO[str]) -> Iterable[Link]:
@@ -292,13 +294,13 @@ def parse_medium_rss_export(rss_file: IO[str]) -> Iterable[Link]:
ts_str = item.find("pubDate").text
time = datetime.strptime(ts_str, "%a, %d %b %Y %H:%M:%S %Z")
- yield {
- 'url': url,
- 'timestamp': str(time.timestamp()),
- 'title': title or None,
- 'tags': '',
- 'sources': [rss_file.name],
- }
+ yield Link(
+ url=url,
+ timestamp=str(time.timestamp()),
+ title=htmldecode(title) or None,
+ tags='',
+ sources=[rss_file.name],
+ )
def parse_plain_text_export(text_file: IO[str]) -> Iterable[Link]:
@@ -308,10 +310,10 @@ def parse_plain_text_export(text_file: IO[str]) -> Iterable[Link]:
for line in text_file.readlines():
urls = re.findall(URL_REGEX, line) if line.strip() else ()
for url in urls:
- yield {
- 'url': url,
- 'timestamp': str(datetime.now().timestamp()),
- 'title': None,
- 'tags': '',
- 'sources': [text_file.name],
- }
+ yield Link(
+ url=url,
+ timestamp=str(datetime.now().timestamp()),
+ title=None,
+ tags='',
+ sources=[text_file.name],
+ )
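
Every parser now follows the same shape: yield `Link` objects with `htmldecode`d titles instead of raw dicts. For illustration, a hypothetical parser for a `url<TAB>title` export written against the new pattern (not part of this patch):

```python
from datetime import datetime
from typing import IO, Iterable

from schema import Link
from util import htmldecode

def parse_tsv_export(tsv_file: IO[str]) -> Iterable[Link]:
    """hypothetical parser: one 'url<TAB>title' bookmark per line"""
    for line in tsv_file.readlines():
        url, _, title = line.strip().partition('\t')
        if not url.startswith(('http://', 'https://')):
            continue
        yield Link(
            url=url,
            timestamp=str(datetime.now().timestamp()),
            title=htmldecode(title) or None,
            tags='',
            sources=[tsv_file.name],
        )
```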
diff --git a/archivebox/schema.py b/archivebox/schema.py
index 719298e8..b92d1779 100644
--- a/archivebox/schema.py
+++ b/archivebox/schema.py
@@ -1,11 +1,223 @@
+import os
+
from datetime import datetime
-from typing import List, Dict, Any, Optional, Union, NamedTuple
-from recordclass import RecordClass
+from typing import List, Dict, Any, Optional, Union
-Link = Dict[str, Any]
+from dataclasses import dataclass, asdict, field
-class ArchiveIndex(NamedTuple):
+
+class ArchiveError(Exception):
+ def __init__(self, message, hints=None):
+ super().__init__(message)
+ self.hints = hints
+
+LinkDict = Dict[str, Any]
+
+@dataclass(frozen=True)
+class ArchiveResult:
+ cmd: List[str]
+ pwd: Optional[str]
+ cmd_version: Optional[str]
+ output: Union[str, Exception, None]
+ status: str
+ start_ts: datetime
+ end_ts: datetime
+
+ def _asdict(self):
+ return asdict(self)
+
+ @property
+ def duration(self) -> int:
+ return (self.end_ts - self.start_ts).seconds
+
+@dataclass(frozen=True)
+class Link:
+ timestamp: str
+ url: str
+ title: Optional[str]
+ tags: Optional[str]
+ sources: List[str]
+ history: Dict[str, List[ArchiveResult]] = field(default_factory=lambda: {})
+ updated: Optional[str] = None
+
+ def __hash__(self):
+ return self.urlhash
+
+ def __eq__(self, other):
+ if not isinstance(other, Link):
+ return NotImplemented
+ return self.urlhash == other.urlhash
+
+ def __gt__(self, other):
+ if not isinstance(other, Link):
+ return NotImplemented
+ if not self.timestamp or not other.timestamp:
+ return
+ return float(self.timestamp) > float(other.timestamp)
+
+ def _asdict(self, extended=False):
+ info = {
+ 'url': self.url,
+ 'title': self.title or None,
+ 'timestamp': self.timestamp,
+ 'updated': self.updated or None,
+ 'tags': self.tags or None,
+ 'sources': self.sources or [],
+ 'history': self.history or {},
+ }
+ if extended:
+ info.update({
+ 'link_dir': self.link_dir,
+ 'archive_path': self.archive_path,
+ 'bookmarked_date': self.bookmarked_date,
+ 'updated_date': self.updated_date,
+ 'domain': self.domain,
+ 'path': self.path,
+ 'basename': self.basename,
+ 'extension': self.extension,
+ 'base_url': self.base_url,
+ 'is_static': self.is_static,
+ 'is_archived': self.is_archived,
+ 'num_outputs': self.num_outputs,
+ })
+ return info
+
+ @property
+ def link_dir(self) -> str:
+ from config import ARCHIVE_DIR
+ return os.path.join(ARCHIVE_DIR, self.timestamp)
+
+ @property
+ def archive_path(self) -> str:
+ from config import ARCHIVE_DIR_NAME
+ return '{}/{}'.format(ARCHIVE_DIR_NAME, self.timestamp)
+
+ ### URL Helpers
+ @property
+ def urlhash(self):
+ from util import hashurl
+
+ return hashurl(self.url)
+
+ @property
+ def extension(self) -> str:
+ from util import extension
+ return extension(self.url)
+
+ @property
+ def domain(self) -> str:
+ from util import domain
+ return domain(self.url)
+
+ @property
+ def path(self) -> str:
+ from util import path
+ return path(self.url)
+
+ @property
+ def basename(self) -> str:
+ from util import basename
+ return basename(self.url)
+
+ @property
+ def base_url(self) -> str:
+ from util import base_url
+ return base_url(self.url)
+
+ ### Pretty Printing Helpers
+ @property
+ def bookmarked_date(self) -> Optional[str]:
+ from util import ts_to_date
+ return ts_to_date(self.timestamp) if self.timestamp else None
+
+ @property
+ def updated_date(self) -> Optional[str]:
+ from util import ts_to_date
+ return ts_to_date(self.updated) if self.updated else None
+
+ ### Archive Status Helpers
+ @property
+ def num_outputs(self) -> int:
+ return len(tuple(filter(None, self.latest_outputs().values())))
+
+ @property
+ def is_static(self) -> bool:
+ from util import is_static_file
+ return is_static_file(self.url)
+
+ @property
+ def is_archived(self) -> bool:
+ from config import ARCHIVE_DIR
+ from util import domain
+
+ return os.path.exists(os.path.join(
+ ARCHIVE_DIR,
+ self.timestamp,
+ domain(self.url),
+ ))
+
+ def latest_outputs(self, status: str=None) -> Dict[str, Optional[str]]:
+ """get the latest output that each archive method produced for link"""
+
+ latest = {
+ 'title': None,
+ 'favicon': None,
+ 'wget': None,
+ 'warc': None,
+ 'pdf': None,
+ 'screenshot': None,
+ 'dom': None,
+ 'git': None,
+ 'media': None,
+ 'archive_org': None,
+ }
+ for archive_method in latest.keys():
+        # get most recent successful result in history for each archive method
+ history = self.history.get(archive_method) or []
+ history = filter(lambda result: result.output, reversed(history))
+ if status is not None:
+ history = filter(lambda result: result.status == status, history)
+
+ history = list(history)
+ if history:
+ latest[archive_method] = history[0].output
+
+ return latest
+
+ def canonical_outputs(self) -> Dict[str, Optional[str]]:
+ from util import wget_output_path
+ canonical = {
+ 'index_url': 'index.html',
+ 'favicon_url': 'favicon.ico',
+ 'google_favicon_url': 'https://www.google.com/s2/favicons?domain={}'.format(self.domain),
+ 'archive_url': wget_output_path(self),
+ 'warc_url': 'warc',
+ 'pdf_url': 'output.pdf',
+ 'screenshot_url': 'screenshot.png',
+ 'dom_url': 'output.html',
+ 'archive_org_url': 'https://web.archive.org/web/{}'.format(self.base_url),
+ 'git_url': 'git',
+ 'media_url': 'media',
+ }
+ if self.is_static:
+ # static binary files like PDF and images are handled slightly differently.
+ # they're just downloaded once and aren't archived separately multiple times,
+ # so the wget, screenshot, & pdf urls should all point to the same file
+
+ static_url = wget_output_path(self)
+ canonical.update({
+ 'title': self.basename,
+ 'archive_url': static_url,
+ 'pdf_url': static_url,
+ 'screenshot_url': static_url,
+ 'dom_url': static_url,
+ })
+ return canonical
+
+
+@dataclass(frozen=True)
+class ArchiveIndex:
info: str
version: str
source: str
@@ -14,33 +226,11 @@ class ArchiveIndex(NamedTuple):
updated: str
links: List[Link]
-class ArchiveResult(NamedTuple):
- cmd: List[str]
- pwd: Optional[str]
- cmd_version: Optional[str]
- output: Union[str, Exception, None]
- status: str
- start_ts: datetime
- end_ts: datetime
- duration: int
+ def _asdict(self):
+ return asdict(self)
-
-class ArchiveError(Exception):
- def __init__(self, message, hints=None):
- super().__init__(message)
- self.hints = hints
-
-
-class LinkDict(NamedTuple):
- timestamp: str
- url: str
- title: Optional[str]
- tags: str
- sources: List[str]
- history: Dict[str, ArchiveResult]
-
-
-class RuntimeStats(RecordClass):
+@dataclass
+class RuntimeStats:
skipped: int
succeeded: int
failed: int
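
A round-trip sketch of the new frozen `Link` dataclass: the stored fields serialize through `_asdict()`, everything derived is a property, and equality/hashing now key on `urlhash`. (The properties import config/util lazily to avoid circular imports, so exercising them requires a working ArchiveBox config.)

```python
from schema import Link

link = Link(
    timestamp='1554984695.0',
    url='https://example.com/some/page',
    title='Example',
    tags='docs,refs',
    sources=['sources/bookmarks.html'],
)

assert Link(**link._asdict()) == link   # plain fields round-trip; eq keys on urlhash
print(link.base_url, link.basename, link.bookmarked_date)
print(link._asdict(extended=True)['archive_path'])  # derived values on demand
```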
diff --git a/archivebox/templates/index_row.html b/archivebox/templates/index_row.html
index d3174ec0..766f8038 100644
--- a/archivebox/templates/index_row.html
+++ b/archivebox/templates/index_row.html
@@ -1,14 +1,14 @@
 <tr>
     <td title="$timestamp">$bookmarked_date</td>
     <td style="text-align:left">
-        <a href="$link_dir/$index_url"><img src="$favicon_url"></a>
-        <a href="$link_dir/$archive_url" title="$title">
+        <a href="$archive_path/$index_url"><img src="$favicon_url"></a>
+        <a href="$archive_path/$archive_url" title="$title">
             <span data-title-for="$url" data-archived="$is_archived">$title</span>
             <small style="float:right">$tags</small>
         </a>
     </td>
     <td>
-        <a href="$link_dir/$index_url">📄
+        <a href="$archive_path/$index_url">📄
         <span data-number-for="$url" title="Number of archived files">$num_outputs</span></a>
     </td>
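
For context, this row is rendered by `write_html_links_index` with `string.Template.substitute` over `derived_link_info(link)` plus a few computed fallbacks; roughly (template path, link values, and the simplified fallbacks here are illustrative):

```python
from string import Template

from schema import Link
from util import derived_link_info

link = Link(timestamp='1554984695.0', url='https://example.com',
            title='Example', tags='', sources=['bookmarks.html'])

row_html = open('archivebox/templates/index_row.html').read()
rendered = Template(row_html).substitute({
    **derived_link_info(link),             # _asdict(extended=True) + canonical outputs
    'title': link.title or link.base_url,  # simplified version of the index.py fallback
    'favicon_url': 'archive/{}/favicon.ico'.format(link.timestamp),
    'archive_url': 'index.html',
})
```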
diff --git a/archivebox/util.py b/archivebox/util.py
index 2c2c6a05..ef0b8fe6 100644
--- a/archivebox/util.py
+++ b/archivebox/util.py
@@ -4,9 +4,8 @@ import sys
import time
from json import JSONEncoder
-
-from typing import List, Dict, Optional, Iterable
-
+from typing import List, Optional, Iterable
+from hashlib import sha256
from urllib.request import Request, urlopen
from urllib.parse import urlparse, quote, unquote
from html import escape, unescape
@@ -21,17 +20,17 @@ from subprocess import (
CalledProcessError,
)
-from schema import Link
+from base32_crockford import encode as base32_encode
+
+from schema import Link, LinkDict, ArchiveResult
from config import (
ANSI,
TERM_WIDTH,
SOURCES_DIR,
- ARCHIVE_DIR,
OUTPUT_PERMISSIONS,
TIMEOUT,
SHOW_PROGRESS,
FETCH_TITLE,
- ARCHIVE_DIR_NAME,
CHECK_SSL_VALIDITY,
WGET_USER_AGENT,
CHROME_OPTIONS,
@@ -43,7 +42,7 @@ from logs import pretty_path
# All of these are (str) -> str
# shortcuts to: https://docs.python.org/3/library/urllib.parse.html#url-parsing
-scheme = lambda url: urlparse(url).scheme
+scheme = lambda url: urlparse(url).scheme.lower()
without_scheme = lambda url: urlparse(url)._replace(scheme='').geturl().strip('//')
without_query = lambda url: urlparse(url)._replace(query='').geturl().strip('//')
without_fragment = lambda url: urlparse(url)._replace(fragment='').geturl().strip('//')
@@ -56,11 +55,33 @@ fragment = lambda url: urlparse(url).fragment
extension = lambda url: basename(url).rsplit('.', 1)[-1].lower() if '.' in basename(url) else ''
base_url = lambda url: without_scheme(url) # uniq base url used to dedupe links
-short_ts = lambda ts: ts.split('.')[0]
-urlencode = lambda s: quote(s, encoding='utf-8', errors='replace')
-urldecode = lambda s: unquote(s)
-htmlencode = lambda s: escape(s, quote=True)
-htmldecode = lambda s: unescape(s)
+
+without_www = lambda url: url.replace('://www.', '://', 1)
+without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
+fuzzy_url = lambda url: without_trailing_slash(without_www(without_scheme(url.lower())))
+
+short_ts = lambda ts: (
+ str(ts.timestamp()).split('.')[0]
+ if isinstance(ts, datetime) else
+ str(ts).split('.')[0]
+)
+ts_to_date = lambda ts: (
+ ts.strftime('%Y-%m-%d %H:%M')
+ if isinstance(ts, datetime) else
+ datetime.fromtimestamp(float(ts)).strftime('%Y-%m-%d %H:%M')
+)
+ts_to_iso = lambda ts: (
+ ts.isoformat()
+ if isinstance(ts, datetime) else
+ datetime.fromtimestamp(float(ts)).isoformat()
+)
+
+urlencode = lambda s: s and quote(s, encoding='utf-8', errors='replace')
+urldecode = lambda s: s and unquote(s)
+htmlencode = lambda s: s and escape(s, quote=True)
+htmldecode = lambda s: s and unescape(s)
+
+hashurl = lambda url: base32_encode(int(sha256(base_url(url).encode('utf-8')).hexdigest(), 16))[:20]
URL_REGEX = re.compile(
r'http[s]?://' # start matching from allowed schemes
@@ -80,7 +101,8 @@ STATICFILE_EXTENSIONS = {
# that can be downloaded as-is, not html pages that need to be rendered
'gif', 'jpeg', 'jpg', 'png', 'tif', 'tiff', 'wbmp', 'ico', 'jng', 'bmp',
'svg', 'svgz', 'webp', 'ps', 'eps', 'ai',
- 'mp3', 'mp4', 'm4a', 'mpeg', 'mpg', 'mkv', 'mov', 'webm', 'm4v', 'flv', 'wmv', 'avi', 'ogg', 'ts', 'm3u8'
+ 'mp3', 'mp4', 'm4a', 'mpeg', 'mpg', 'mkv', 'mov', 'webm', 'm4v',
+    'flv', 'wmv', 'avi', 'ogg', 'ts', 'm3u8',
'pdf', 'txt', 'rtf', 'rtfd', 'doc', 'docx', 'ppt', 'pptx', 'xls', 'xlsx',
'atom', 'rss', 'css', 'js', 'json',
'dmg', 'iso', 'img',
@@ -100,7 +122,7 @@ STATICFILE_EXTENSIONS = {
### Checks & Tests
-def check_link_structure(link: Link) -> None:
+def check_link_structure(link: LinkDict) -> None:
"""basic sanity check invariants to make sure the data is valid"""
assert isinstance(link, dict)
assert isinstance(link.get('url'), str)
@@ -112,7 +134,7 @@ def check_link_structure(link: Link) -> None:
assert isinstance(key, str)
assert isinstance(val, list), 'history must be a Dict[str, List], got: {}'.format(link['history'])
-def check_links_structure(links: Iterable[Link]) -> None:
+def check_links_structure(links: Iterable[LinkDict]) -> None:
"""basic sanity check invariants to make sure the data is valid"""
assert isinstance(links, list)
if links:
@@ -213,7 +235,7 @@ def fetch_page_title(url: str, timeout: int=10, progress: bool=SHOW_PROGRESS) ->
html = download_url(url, timeout=timeout)
match = re.search(HTML_TITLE_REGEX, html)
- return match.group(1).strip() if match else None
+ return htmldecode(match.group(1).strip()) if match else None
except Exception as err: # noqa
# print('[!] Failed to fetch title because of {}: {}'.format(
# err.__class__.__name__,
@@ -228,8 +250,8 @@ def wget_output_path(link: Link) -> Optional[str]:
See docs on wget --adjust-extension (-E)
"""
- if is_static_file(link['url']):
- return without_scheme(without_fragment(link['url']))
+ if is_static_file(link.url):
+ return without_scheme(without_fragment(link.url))
# Wget downloads can save in a number of different ways depending on the url:
# https://example.com
@@ -262,11 +284,10 @@ def wget_output_path(link: Link) -> Optional[str]:
# and there's no way to get the computed output path from wget
# in order to avoid having to reverse-engineer how they calculate it,
# we just look in the output folder read the filename wget used from the filesystem
- link_dir = os.path.join(ARCHIVE_DIR, link['timestamp'])
- full_path = without_fragment(without_query(path(link['url']))).strip('/')
+ full_path = without_fragment(without_query(path(link.url))).strip('/')
search_dir = os.path.join(
- link_dir,
- domain(link['url']),
+ link.link_dir,
+ domain(link.url),
full_path,
)
@@ -278,13 +299,13 @@ def wget_output_path(link: Link) -> Optional[str]:
if re.search(".+\\.[Hh][Tt][Mm][Ll]?$", f, re.I | re.M)
]
if html_files:
- path_from_link_dir = search_dir.split(link_dir)[-1].strip('/')
+ path_from_link_dir = search_dir.split(link.link_dir)[-1].strip('/')
return os.path.join(path_from_link_dir, html_files[0])
# Move up one directory level
search_dir = search_dir.rsplit('/', 1)[0]
- if search_dir == link_dir:
+ if search_dir == link.link_dir:
break
return None
@@ -314,19 +335,20 @@ def merge_links(a: Link, b: Link) -> Link:
"""deterministially merge two links, favoring longer field values over shorter,
and "cleaner" values over worse ones.
"""
+ a, b = a._asdict(), b._asdict()
longer = lambda key: (a[key] if len(a[key]) > len(b[key]) else b[key]) if (a[key] and b[key]) else (a[key] or b[key])
earlier = lambda key: a[key] if a[key] < b[key] else b[key]
url = longer('url')
longest_title = longer('title')
cleanest_title = a['title'] if '://' not in (a['title'] or '') else b['title']
- return {
- 'url': url,
- 'timestamp': earlier('timestamp'),
- 'title': longest_title if '://' not in (longest_title or '') else cleanest_title,
- 'tags': longer('tags'),
- 'sources': list(set(a.get('sources', []) + b.get('sources', []))),
- }
+ return Link(
+ url=url,
+ timestamp=earlier('timestamp'),
+ title=longest_title if '://' not in (longest_title or '') else cleanest_title,
+ tags=longer('tags'),
+ sources=list(set(a.get('sources', []) + b.get('sources', []))),
+ )
def is_static_file(url: str) -> bool:
"""Certain URLs just point to a single static file, and
@@ -339,85 +361,11 @@ def is_static_file(url: str) -> bool:
def derived_link_info(link: Link) -> dict:
"""extend link info with the archive urls and other derived data"""
- url = link['url']
+ info = link._asdict(extended=True)
+ info.update(link.canonical_outputs())
- to_date_str = lambda ts: datetime.fromtimestamp(float(ts)).strftime('%Y-%m-%d %H:%M')
+ return info
- extended_info = {
- **link,
- 'link_dir': '{}/{}'.format(ARCHIVE_DIR_NAME, link['timestamp']),
- 'bookmarked_date': to_date_str(link['timestamp']),
- 'updated_date': to_date_str(link['updated']) if 'updated' in link else None,
- 'domain': domain(url),
- 'path': path(url),
- 'basename': basename(url),
- 'extension': extension(url),
- 'base_url': base_url(url),
- 'is_static': is_static_file(url),
- 'is_archived': os.path.exists(os.path.join(
- ARCHIVE_DIR,
- link['timestamp'],
- domain(url),
- )),
- 'num_outputs': len([entry for entry in latest_output(link).values() if entry]),
- }
-
- # Archive Method Output URLs
- extended_info.update({
- 'index_url': 'index.html',
- 'favicon_url': 'favicon.ico',
- 'google_favicon_url': 'https://www.google.com/s2/favicons?domain={domain}'.format(**extended_info),
- 'archive_url': wget_output_path(link),
- 'warc_url': 'warc',
- 'pdf_url': 'output.pdf',
- 'screenshot_url': 'screenshot.png',
- 'dom_url': 'output.html',
- 'archive_org_url': 'https://web.archive.org/web/{base_url}'.format(**extended_info),
- 'git_url': 'git',
- 'media_url': 'media',
- })
- # static binary files like PDF and images are handled slightly differently.
- # they're just downloaded once and aren't archived separately multiple times,
- # so the wget, screenshot, & pdf urls should all point to the same file
- if is_static_file(url):
- extended_info.update({
- 'title': basename(url),
- 'archive_url': base_url(url),
- 'pdf_url': base_url(url),
- 'screenshot_url': base_url(url),
- 'dom_url': base_url(url),
- })
-
- return extended_info
-
-
-def latest_output(link: Link, status: str=None) -> Dict[str, Optional[str]]:
- """get the latest output that each archive method produced for link"""
-
- latest = {
- 'title': None,
- 'favicon': None,
- 'wget': None,
- 'warc': None,
- 'pdf': None,
- 'screenshot': None,
- 'dom': None,
- 'git': None,
- 'media': None,
- 'archive_org': None,
- }
- for archive_method in latest.keys():
- # get most recent succesful result in history for each archive method
- history = link.get('history', {}).get(archive_method) or []
- history = filter(lambda result: result['output'], reversed(history))
- if status is not None:
- history = filter(lambda result: result['status'] == status, history)
-
- history = list(history)
- if history:
- latest[archive_method] = history[0]['output']
-
- return latest
### Python / System Helpers
@@ -466,21 +414,13 @@ class TimedProgress:
self.p = Process(target=progress_bar, args=(seconds, prefix))
self.p.start()
- self.stats = {
- 'start_ts': datetime.now(),
- 'end_ts': None,
- 'duration': None,
- }
+ self.stats = {'start_ts': datetime.now(), 'end_ts': None}
def end(self):
"""immediately end progress, clear the progressbar line, and save end_ts"""
end_ts = datetime.now()
- self.stats.update({
- 'end_ts': end_ts,
- 'duration': (end_ts - self.stats['start_ts']).seconds,
- })
-
+ self.stats['end_ts'] = end_ts
if SHOW_PROGRESS:
# protect from double termination
#if p is None or not hasattr(p, 'kill'):
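
The reworked helpers in util.py are polymorphic over datetimes and numeric-string timestamps, and the new `fuzzy_url`/`hashurl` pair is what links.py and schema.py key dedupe and equality on. A small smoke-test sketch (run inside an initialized ArchiveBox checkout, since importing util pulls in config):

```python
from datetime import datetime
from util import short_ts, ts_to_date, fuzzy_url, hashurl

print(short_ts('1554984695.900110'))    # '1554984695'
print(short_ts(datetime.now()))         # same shape, from a datetime
print(ts_to_date('1554984695.900110'))  # '%Y-%m-%d %H:%M' in local time

# the normalized key uniquefied_links() dedupes on
print(fuzzy_url('https://example.com/page/'))

# 20-char base32 digest of the base_url, used by Link.urlhash for equality
print(hashurl('https://example.com/page'))
```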