1
0
Fork 0
mirror of synced 2024-07-06 06:50:42 +12:00
ArchiveBox/archivebox/index.py

291 lines
8.9 KiB
Python

import os
import json
from datetime import datetime
from string import Template
from typing import List, Tuple, Iterator, Optional
try:
from distutils.dir_util import copy_tree
except ImportError:
print('[X] Missing "distutils" python package. To install it, run:')
print(' pip install distutils')
from schema import Link, ArchiveIndex, ArchiveResult
from config import (
OUTPUT_DIR,
TEMPLATES_DIR,
GIT_SHA,
FOOTER_INFO,
)
from util import (
merge_links,
chmod_file,
urlencode,
derived_link_info,
wget_output_path,
ExtendedEncoder,
enforce_types,
)
from parse import parse_links
from links import validate_links
from logs import (
log_indexing_process_started,
log_indexing_started,
log_indexing_finished,
log_parsing_started,
log_parsing_finished,
)
TITLE_LOADING_MSG = 'Not yet archived...'
### Homepage index for all the links
@enforce_types
def write_links_index(out_dir: str, links: List[Link], finished: bool=False) -> None:
"""create index.html file for a given list of links"""
log_indexing_process_started()
log_indexing_started(out_dir, 'index.json')
write_json_links_index(out_dir, links)
log_indexing_finished(out_dir, 'index.json')
log_indexing_started(out_dir, 'index.html')
write_html_links_index(out_dir, links, finished=finished)
log_indexing_finished(out_dir, 'index.html')
@enforce_types
def load_links_index(out_dir: str=OUTPUT_DIR, import_path: str=None) -> Tuple[List[Link], List[Link]]:
"""parse and load existing index with any new links from import_path merged in"""
existing_links: List[Link] = []
if out_dir:
existing_links = list(parse_json_links_index(out_dir))
new_links: List[Link] = []
if import_path:
# parse and validate the import file
log_parsing_started(import_path)
raw_links, parser_name = parse_links(import_path)
new_links = list(validate_links(raw_links))
# merge existing links in out_dir and new links
all_links = list(validate_links(existing_links + new_links))
num_new_links = len(all_links) - len(existing_links)
if import_path and parser_name:
log_parsing_finished(num_new_links, parser_name)
return all_links, new_links
@enforce_types
def write_json_links_index(out_dir: str, links: List[Link]) -> None:
"""write the json link index to a given path"""
assert isinstance(links, List), 'Links must be a list, not a generator.'
assert isinstance(links[0].history, dict)
assert isinstance(links[0].sources, list)
if links[0].history.get('title'):
assert isinstance(links[0].history['title'][0], ArchiveResult)
if links[0].sources:
assert isinstance(links[0].sources[0], str)
path = os.path.join(out_dir, 'index.json')
index_json = ArchiveIndex(
info='ArchiveBox Index',
source='https://github.com/pirate/ArchiveBox',
docs='https://github.com/pirate/ArchiveBox/wiki',
version=GIT_SHA,
num_links=len(links),
updated=datetime.now(),
links=links,
)
assert isinstance(index_json._asdict(), dict)
with open(path, 'w', encoding='utf-8') as f:
json.dump(index_json._asdict(), f, indent=4, cls=ExtendedEncoder)
chmod_file(path)
@enforce_types
def parse_json_links_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
"""parse a archive index json file and return the list of links"""
index_path = os.path.join(out_dir, 'index.json')
if os.path.exists(index_path):
with open(index_path, 'r', encoding='utf-8') as f:
links = json.load(f)['links']
for link in links:
yield Link(**link)
return ()
@enforce_types
def write_html_links_index(out_dir: str, links: List[Link], finished: bool=False) -> None:
"""write the html link index to a given path"""
path = os.path.join(out_dir, 'index.html')
copy_tree(os.path.join(TEMPLATES_DIR, 'static'), os.path.join(out_dir, 'static'))
with open(os.path.join(out_dir, 'robots.txt'), 'w+') as f:
f.write('User-agent: *\nDisallow: /')
with open(os.path.join(TEMPLATES_DIR, 'index.html'), 'r', encoding='utf-8') as f:
index_html = f.read()
with open(os.path.join(TEMPLATES_DIR, 'index_row.html'), 'r', encoding='utf-8') as f:
link_row_html = f.read()
link_rows = '\n'.join(
Template(link_row_html).substitute(**{
**derived_link_info(link),
'title': (
link.title
or (link.base_url if link.is_archived else TITLE_LOADING_MSG)
),
'tags': link.tags or '',
'favicon_url': (
os.path.join('archive', link.timestamp, 'favicon.ico')
# if link['is_archived'] else ''
),
'archive_url': urlencode(
wget_output_path(link) or 'index.html'
),
})
for link in links
)
template_vars = {
'num_links': len(links),
'date_updated': datetime.now().strftime('%Y-%m-%d'),
'time_updated': datetime.now().strftime('%Y-%m-%d %H:%M'),
'footer_info': FOOTER_INFO,
'git_sha': GIT_SHA,
'short_git_sha': GIT_SHA[:8],
'rows': link_rows,
'status': 'finished' if finished else 'running',
}
with open(path, 'w', encoding='utf-8') as f:
f.write(Template(index_html).substitute(**template_vars))
chmod_file(path)
@enforce_types
def patch_links_index(link: Link, out_dir: str=OUTPUT_DIR) -> None:
"""hack to in-place update one row's info in the generated index html"""
title = link.title or link.latest_outputs()['title']
successful = link.num_outputs
# Patch JSON index
changed = False
json_file_links = parse_json_links_index(out_dir)
patched_links = []
for saved_link in json_file_links:
if saved_link.url == link.url:
patched_links.append(Link(**{
**saved_link._asdict(),
'title': title,
'history': link.history,
'updated': link.updated,
}))
else:
patched_links.append(saved_link)
write_json_links_index(out_dir, patched_links)
# Patch HTML index
html_path = os.path.join(out_dir, 'index.html')
html = open(html_path, 'r').read().split('\n')
for idx, line in enumerate(html):
if title and ('<span data-title-for="{}"'.format(link.url) in line):
html[idx] = '<span>{}</span>'.format(title)
elif successful and ('<span data-number-for="{}"'.format(link.url) in line):
html[idx] = '<span>{}</span>'.format(successful)
break
with open(html_path, 'w') as f:
f.write('\n'.join(html))
### Individual link index
@enforce_types
def write_link_index(out_dir: str, link: Link) -> None:
write_json_link_index(out_dir, link)
write_html_link_index(out_dir, link)
@enforce_types
def write_json_link_index(out_dir: str, link: Link) -> None:
"""write a json file with some info about the link"""
path = os.path.join(out_dir, 'index.json')
with open(path, 'w', encoding='utf-8') as f:
json.dump(link._asdict(), f, indent=4, cls=ExtendedEncoder)
chmod_file(path)
@enforce_types
def parse_json_link_index(out_dir: str) -> Optional[Link]:
"""load the json link index from a given directory"""
existing_index = os.path.join(out_dir, 'index.json')
if os.path.exists(existing_index):
with open(existing_index, 'r', encoding='utf-8') as f:
link_json = json.load(f)
return Link(**link_json)
return None
@enforce_types
def load_json_link_index(out_dir: str, link: Link) -> Link:
"""check for an existing link archive in the given directory,
and load+merge it into the given link dict
"""
existing_link = parse_json_link_index(out_dir)
if existing_link:
return merge_links(existing_link, link)
return link
@enforce_types
def write_html_link_index(out_dir: str, link: Link) -> None:
with open(os.path.join(TEMPLATES_DIR, 'link_index.html'), 'r', encoding='utf-8') as f:
link_html = f.read()
path = os.path.join(out_dir, 'index.html')
with open(path, 'w', encoding='utf-8') as f:
f.write(Template(link_html).substitute({
**derived_link_info(link),
'title': (
link.title
or (link.base_url if link.is_archived else TITLE_LOADING_MSG)
),
'archive_url': urlencode(
wget_output_path(link)
or (link.domain if link.is_archived else 'about:blank')
),
'extension': link.extension or 'html',
'tags': link.tags or 'untagged',
'status': 'Archived' if link.is_archived else 'Not yet archived',
'status_color': 'success' if link.is_archived else 'danger',
}))
chmod_file(path)