1
0
Fork 0
mirror of synced 2024-06-30 12:00:41 +12:00

Merge pull request #396 from cdvv7788/oneshot-command

This commit is contained in:
Nick Sweeting 2020-08-01 13:44:51 -04:00 committed by GitHub
commit dd916e91d0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 190 additions and 44 deletions

View file

@ -0,0 +1,62 @@
#!/usr/bin/env python3
__package__ = 'archivebox.cli'
__command__ = 'archivebox oneshot'
import sys
import argparse
from pathlib import Path
from typing import List, Optional, IO
from ..main import oneshot
from ..util import docstring
from ..config import OUTPUT_DIR
from ..logging_util import SmartFormatter, accept_stdin, stderr
@docstring(oneshot.__doc__)
def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None:
parser = argparse.ArgumentParser(
prog=__command__,
description=oneshot.__doc__,
add_help=True,
formatter_class=SmartFormatter,
)
parser.add_argument(
'url',
type=str,
default=None,
help=(
'URLs or paths to archive e.g.:\n'
' https://getpocket.com/users/USERNAME/feed/all\n'
' https://example.com/some/rss/feed.xml\n'
' https://example.com\n'
' ~/Downloads/firefox_bookmarks_export.html\n'
' ~/Desktop/sites_list.csv\n'
)
)
parser.add_argument(
'--out-dir',
type=str,
default=OUTPUT_DIR,
help= "Path to save the single archive folder to, e.g. ./example.com_archive"
)
command = parser.parse_args(args or ())
url = command.url
stdin_url = accept_stdin(stdin)
if (stdin_url and url) or (not stdin and not url):
stderr(
'[X] You must pass a URL/path to add via stdin or CLI arguments.\n',
color='red',
)
raise SystemExit(2)
oneshot(
url=stdin_url or url,
out_dir=str(Path(command.out_dir).absolute()),
)
if __name__ == '__main__':
main(args=sys.argv[1:], stdin=sys.stdin)

View file

@ -32,22 +32,32 @@ from .git import should_save_git, save_git
from .media import should_save_media, save_media from .media import should_save_media, save_media
from .archive_org import should_save_archive_dot_org, save_archive_dot_org from .archive_org import should_save_archive_dot_org, save_archive_dot_org
def get_default_archive_methods():
return [
('title', should_save_title, save_title),
('favicon', should_save_favicon, save_favicon),
('wget', should_save_wget, save_wget),
('pdf', should_save_pdf, save_pdf),
('screenshot', should_save_screenshot, save_screenshot),
('dom', should_save_dom, save_dom),
('git', should_save_git, save_git),
('media', should_save_media, save_media),
('archive_org', should_save_archive_dot_org, save_archive_dot_org),
]
@enforce_types @enforce_types
def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[str]]=None, out_dir: Optional[str]=None) -> Link: def ignore_methods(to_ignore: List[str]):
ARCHIVE_METHODS = get_default_archive_methods()
methods = filter(lambda x: x[0] not in to_ignore, ARCHIVE_METHODS)
methods = map(lambda x: x[1], methods)
return list(methods)
@enforce_types
def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[str]]=None, out_dir: Optional[str]=None, skip_index: bool=False) -> Link:
"""download the DOM, PDF, and a screenshot into a folder named after the link's timestamp""" """download the DOM, PDF, and a screenshot into a folder named after the link's timestamp"""
ARCHIVE_METHODS = [ ARCHIVE_METHODS = get_default_archive_methods()
('title', should_save_title, save_title),
('favicon', should_save_favicon, save_favicon),
('wget', should_save_wget, save_wget),
('pdf', should_save_pdf, save_pdf),
('screenshot', should_save_screenshot, save_screenshot),
('dom', should_save_dom, save_dom),
('git', should_save_git, save_git),
('media', should_save_media, save_media),
('archive_org', should_save_archive_dot_org, save_archive_dot_org),
]
if methods is not None: if methods is not None:
ARCHIVE_METHODS = [ ARCHIVE_METHODS = [
method for method in ARCHIVE_METHODS method for method in ARCHIVE_METHODS
@ -61,7 +71,7 @@ def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[s
os.makedirs(out_dir) os.makedirs(out_dir)
link = load_link_details(link, out_dir=out_dir) link = load_link_details(link, out_dir=out_dir)
write_link_details(link, out_dir=link.link_dir) write_link_details(link, out_dir=out_dir, skip_sql_index=skip_index)
log_link_archiving_started(link, out_dir, is_new) log_link_archiving_started(link, out_dir, is_new)
link = link.overwrite(updated=datetime.now()) link = link.overwrite(updated=datetime.now())
stats = {'skipped': 0, 'succeeded': 0, 'failed': 0} stats = {'skipped': 0, 'succeeded': 0, 'failed': 0}
@ -97,8 +107,9 @@ def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[s
except Exception: except Exception:
pass pass
write_link_details(link, out_dir=link.link_dir) write_link_details(link, out_dir=out_dir, skip_sql_index=skip_index)
patch_main_index(link) if not skip_index:
patch_main_index(link)
# # If any changes were made, update the main links index json and html # # If any changes were made, update the main links index json and html
# was_changed = stats['succeeded'] or stats['failed'] # was_changed = stats['succeeded'] or stats['failed']

View file

@ -354,12 +354,13 @@ def patch_main_index(link: Link, out_dir: str=OUTPUT_DIR) -> None:
### Link Details Index ### Link Details Index
@enforce_types @enforce_types
def write_link_details(link: Link, out_dir: Optional[str]=None) -> None: def write_link_details(link: Link, out_dir: Optional[str]=None, skip_sql_index: bool=False) -> None:
out_dir = out_dir or link.link_dir out_dir = out_dir or link.link_dir
write_json_link_details(link, out_dir=out_dir) write_json_link_details(link, out_dir=out_dir)
write_html_link_details(link, out_dir=out_dir) write_html_link_details(link, out_dir=out_dir)
write_sql_link_details(link) if not skip_sql_index:
write_sql_link_details(link)
@enforce_types @enforce_types

View file

@ -18,6 +18,7 @@ from .cli import (
from .parsers import ( from .parsers import (
save_text_as_source, save_text_as_source,
save_file_as_source, save_file_as_source,
parse_links_memory,
) )
from .index.schema import Link from .index.schema import Link
from .util import enforce_types # type: ignore from .util import enforce_types # type: ignore
@ -51,7 +52,7 @@ from .index.sql import (
remove_from_sql_main_index, remove_from_sql_main_index,
) )
from .index.html import parse_html_main_index from .index.html import parse_html_main_index
from .extractors import archive_links from .extractors import archive_links, archive_link, ignore_methods
from .config import ( from .config import (
stderr, stderr,
ConfigDict, ConfigDict,
@ -493,6 +494,23 @@ def status(out_dir: str=OUTPUT_DIR) -> None:
print(ANSI['black'], ' ...', ANSI['reset']) print(ANSI['black'], ' ...', ANSI['reset'])
@enforce_types
def oneshot(url: str, out_dir: str=OUTPUT_DIR):
"""
Create a single URL archive folder with an index.json and index.html, and all the archive method outputs.
You can run this to archive single pages without needing to create a whole collection with archivebox init.
"""
oneshot_link, _ = parse_links_memory([url])
if len(oneshot_link) > 1:
stderr(
'[X] You should pass a single url to the oneshot command',
color='red'
)
raise SystemExit(2)
methods = ignore_methods(['title'])
archive_link(oneshot_link[0], out_dir=out_dir, methods=methods, skip_index=True)
return oneshot_link
@enforce_types @enforce_types
def add(urls: Union[str, List[str]], def add(urls: Union[str, List[str]],
depth: int=0, depth: int=0,
@ -1055,3 +1073,4 @@ def shell(out_dir: str=OUTPUT_DIR) -> None:
setup_django(OUTPUT_DIR) setup_django(OUTPUT_DIR)
from django.core.management import call_command from django.core.management import call_command
call_command("shell_plus") call_command("shell_plus")

View file

@ -9,8 +9,9 @@ __package__ = 'archivebox.parsers'
import re import re
import os import os
from io import StringIO
from typing import Tuple, List from typing import IO, Tuple, List
from datetime import datetime from datetime import datetime
from ..system import atomic_write from ..system import atomic_write
@ -37,15 +38,7 @@ from .generic_rss import parse_generic_rss_export
from .generic_json import parse_generic_json_export from .generic_json import parse_generic_json_export
from .generic_txt import parse_generic_txt_export from .generic_txt import parse_generic_txt_export
PARSERS = (
@enforce_types
def parse_links(source_file: str) -> Tuple[List[Link], str]:
"""parse a list of URLs with their metadata from an
RSS feed, bookmarks export, or text file
"""
check_url_parsing_invariants()
PARSERS = (
# Specialized parsers # Specialized parsers
('Pocket HTML', parse_pocket_html_export), ('Pocket HTML', parse_pocket_html_export),
('Pinboard RSS', parse_pinboard_rss_export), ('Pinboard RSS', parse_pinboard_rss_export),
@ -60,30 +53,66 @@ def parse_links(source_file: str) -> Tuple[List[Link], str]:
# Fallback parser # Fallback parser
('Plain Text', parse_generic_txt_export), ('Plain Text', parse_generic_txt_export),
) )
@enforce_types
def parse_links_memory(urls: List[str]):
"""
parse a list of URLS without touching the filesystem
"""
check_url_parsing_invariants()
timer = TimedProgress(TIMEOUT * 4) timer = TimedProgress(TIMEOUT * 4)
with open(source_file, 'r', encoding='utf-8') as file: #urls = list(map(lambda x: x + "\n", urls))
for parser_name, parser_func in PARSERS: file = StringIO()
try: file.writelines(urls)
links = list(parser_func(file)) file.name = "io_string"
if links: output = _parse(file, timer)
timer.end()
return links, parser_name if output is not None:
except Exception as err: # noqa return output
pass
# Parsers are tried one by one down the list, and the first one
# that succeeds is used. To see why a certain parser was not used
# due to error or format incompatibility, uncomment this line:
# print('[!] Parser {} failed: {} {}'.format(parser_name, err.__class__.__name__, err))
# raise
timer.end() timer.end()
return [], 'Failed to parse' return [], 'Failed to parse'
@enforce_types
def parse_links(source_file: str) -> Tuple[List[Link], str]:
"""parse a list of URLs with their metadata from an
RSS feed, bookmarks export, or text file
"""
check_url_parsing_invariants()
timer = TimedProgress(TIMEOUT * 4)
with open(source_file, 'r', encoding='utf-8') as file:
output = _parse(file, timer)
if output is not None:
return output
timer.end()
return [], 'Failed to parse'
def _parse(to_parse: IO[str], timer) -> Tuple[List[Link], str]:
for parser_name, parser_func in PARSERS:
try:
links = list(parser_func(to_parse))
if links:
timer.end()
return links, parser_name
except Exception as err: # noqa
pass
# Parsers are tried one by one down the list, and the first one
# that succeeds is used. To see why a certain parser was not used
# due to error or format incompatibility, uncomment this line:
# print('[!] Parser {} failed: {} {}'.format(parser_name, err.__class__.__name__, err))
# raise
@enforce_types @enforce_types
def save_text_as_source(raw_text: str, filename: str='{ts}-stdin.txt', out_dir: str=OUTPUT_DIR) -> str: def save_text_as_source(raw_text: str, filename: str='{ts}-stdin.txt', out_dir: str=OUTPUT_DIR) -> str:
ts = str(datetime.now().timestamp()).split('.', 1)[0] ts = str(datetime.now().timestamp()).split('.', 1)[0]
source_path = os.path.join(OUTPUT_DIR, SOURCES_DIR_NAME, filename.format(ts=ts)) source_path = os.path.join(out_dir, SOURCES_DIR_NAME, filename.format(ts=ts))
atomic_write(source_path, raw_text) atomic_write(source_path, raw_text)
log_source_saved(source_file=source_path) log_source_saved(source_file=source_path)
return source_path return source_path

View file

@ -1,5 +1,13 @@
from .fixtures import * from .fixtures import *
from archivebox.extractors import ignore_methods, get_default_archive_methods, should_save_title
def test_wget_broken_pipe(tmp_path, process): def test_wget_broken_pipe(tmp_path, process):
add_process = subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/example.com.html'], capture_output=True) add_process = subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/example.com.html'], capture_output=True)
assert "TypeError chmod_file(..., path: str) got unexpected NoneType argument path=None" not in add_process.stdout.decode("utf-8") assert "TypeError chmod_file(..., path: str) got unexpected NoneType argument path=None" not in add_process.stdout.decode("utf-8")
def test_ignore_methods():
"""
Takes the passed method out of the default methods list and returns that value
"""
ignored = ignore_methods(['title'])
assert should_save_title not in ignored

16
tests/test_oneshot.py Normal file
View file

@ -0,0 +1,16 @@
from pathlib import Path
from .fixtures import *
def test_oneshot_command_exists(tmp_path):
os.chdir(tmp_path)
process = subprocess.run(['archivebox', 'oneshot'], capture_output=True)
assert not "invalid choice: 'oneshot'" in process.stderr.decode("utf-8")
def test_oneshot_commad_saves_page_in_right_folder(tmp_path):
process = subprocess.run(["archivebox", "oneshot", f"--out-dir={tmp_path}", "http://127.0.0.1:8080/static/example.com.html"], capture_output=True)
items = ' '.join([str(x) for x in tmp_path.iterdir()])
current_path = ' '.join([str(x) for x in Path.cwd().iterdir()])
assert "index.json" in items
assert not "index.sqlite3" in current_path