
Merge pull request #356 from cdvv7788/depth-flag

commit 5b571aa166
Nick Sweeting, 2020-07-13 05:05:36 -04:00 (committed by GitHub)
7 changed files with 90 additions and 41 deletions

@@ -38,18 +38,38 @@ def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional
         type=str,
         default=None,
         help=(
-            'URL or path to local file containing a list of links to import. e.g.:\n'
+            'URL or path to local file to start the archiving process from. e.g.:\n'
             '    https://getpocket.com/users/USERNAME/feed/all\n'
             '    https://example.com/some/rss/feed.xml\n'
             '    https://example.com\n'
             '    ~/Downloads/firefox_bookmarks_export.html\n'
             '    ~/Desktop/sites_list.csv\n'
         )
     )
+    parser.add_argument(
+        "--depth",
+        action="store",
+        default=0,
+        choices=[0,1],
+        type=int,
+        help="Recursively archive all linked pages up to this many hops away"
+    )
     command = parser.parse_args(args or ())
-    import_str = accept_stdin(stdin)
+    import_string = accept_stdin(stdin)
+    if import_string and command.import_path:
+        stderr(
+            '[X] You should pass an import path or a page url as an argument or in stdin but not both\n',
+            color='red',
+        )
+        raise SystemExit(2)
+    elif import_string:
+        import_path = import_string
+    else:
+        import_path = command.import_path
     add(
-        import_str=import_str,
-        import_path=command.import_path,
+        url=import_path,
+        depth=command.depth,
         update_all=command.update_all,
         index_only=command.index_only,
         out_dir=pwd or OUTPUT_DIR,
@@ -63,12 +83,6 @@ if __name__ == '__main__':
 # TODO: Implement these
 #
-# parser.add_argument(
-#     '--depth', #'-d',
-#     type=int,
-#     help='Recursively archive all linked pages up to this many hops away',
-#     default=0,
-# )
 # parser.add_argument(
 #     '--mirror', #'-m',
 #     action='store_true',
 #     help='Archive an entire site (finding all linked pages below it on the same domain)',
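
For reference, a minimal sketch of driving the reworked CLI surface above, in the same subprocess style the new tests use; the example.com URL is only an illustration and an already-initialized collection is assumed:

import subprocess

# Positional URL plus the new --depth flag (0 = just the page, 1 = the page and its outlinks):
subprocess.run(["archivebox", "add", "https://example.com", "--depth=1"], capture_output=True)

# Or feed the URL on stdin instead; passing both a stdin URL and a positional
# argument now exits with status 2.
subprocess.run(["archivebox", "add"], input=b"https://example.com", capture_output=True)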

@@ -66,12 +66,10 @@ class AddLinks(View):
         if form.is_valid():
             url = form.cleaned_data["url"]
             print(f'[+] Adding URL: {url}')
-            if form.cleaned_data["source"] == "url":
-                key = "import_str"
-            else:
-                key = "import_path"
+            depth = 0 if form.cleaned_data["source"] == "url" else 1
             input_kwargs = {
-                key: url,
+                "url": url,
+                "depth": depth,
                 "update_all": False,
                 "out_dir": OUTPUT_DIR,
             }
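
Roughly, the kwargs the view now assembles look like the following; the concrete values are placeholders, and the forwarding call to add() is an assumption based on the main.py changes further down:

# Hypothetical values for the input_kwargs dict built above: a submission whose
# source is "url" maps to depth=0, any other source maps to depth=1.
input_kwargs = {
    "url": "https://example.com",      # placeholder URL taken from the form
    "depth": 0,                        # 0 when source == "url", otherwise 1
    "update_all": False,
    "out_dir": "/path/to/collection",  # stands in for OUTPUT_DIR
}
# The view is then expected to hand these straight to the reworked add(),
# e.g. add(**input_kwargs), matching the new signature shown in the next file.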

@@ -496,8 +496,8 @@ def status(out_dir: str=OUTPUT_DIR) -> None:
 @enforce_types
-def add(import_str: Optional[str]=None,
-        import_path: Optional[str]=None,
+def add(url: str,
+        depth: int=0,
         update_all: bool=not ONLY_NEW,
         index_only: bool=False,
         out_dir: str=OUTPUT_DIR) -> List[Link]:
@@ -505,18 +505,9 @@ def add(import_str: Optional[str]=None,
     check_data_folder(out_dir=out_dir)

-    if (import_str and import_path) or (not import_str and not import_path):
-        stderr(
-            '[X] You should pass either an import path as an argument, '
-            'or pass a list of links via stdin, but not both.\n',
-            color='red',
-        )
-        raise SystemExit(2)
-    elif import_str:
-        import_path = save_stdin_to_sources(import_str, out_dir=out_dir)
-    elif import_path:
-        import_path = save_file_to_sources(import_path, out_dir=out_dir)
+    base_path = save_stdin_to_sources(url, out_dir=out_dir)
+    if depth == 1:
+        depth_path = save_file_to_sources(url, out_dir=out_dir)

     check_dependencies()

     # Step 1: Load list of links from the existing index
@@ -524,8 +515,11 @@
     all_links: List[Link] = []
     new_links: List[Link] = []
     all_links = load_main_index(out_dir=out_dir)
-    if import_path:
-        all_links, new_links = import_new_links(all_links, import_path, out_dir=out_dir)
+    all_links, new_links = import_new_links(all_links, base_path, out_dir=out_dir)
+    if depth == 1:
+        all_links, new_links_depth = import_new_links(all_links, depth_path, out_dir=out_dir)
+        new_links = new_links + new_links_depth

     # Step 2: Write updated index with deduped old and new links back to disk
     write_main_index(links=all_links, out_dir=out_dir)
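
Called from Python rather than the CLI, the new entry point would be used roughly like this; a sketch only, with an assumed import path and placeholder paths:

# Sketch of the reworked add() API; assumes the archivebox package is importable
# and out_dir points at an initialized collection.
from archivebox.main import add

# depth=0: archive only the submitted page itself
add(url="https://example.com", depth=0, out_dir="/path/to/collection")

# depth=1: also parse the page and import every link found on it
add(url="https://example.com", depth=1, out_dir="/path/to/collection")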

tests/__init__.py (new, empty file)

tests/fixtures.py (new file)
@@ -0,0 +1,10 @@
+import os
+import subprocess
+
+import pytest
+
+@pytest.fixture
+def process(tmp_path):
+    os.chdir(tmp_path)
+    process = subprocess.run(['archivebox', 'init'], capture_output=True)
+    return process
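
Test modules pick this fixture up with a star import, letting pytest resolve the process argument by name; a minimal (hypothetical) consumer might look like:

# Hypothetical minimal consumer of the shared fixture above.
from .fixtures import *  # pulls the `process` fixture into this module's namespace

def test_init_exits_cleanly(process):  # illustrative test, not part of the PR
    assert process.returncode == 0     # `archivebox init` should succeed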

tests/test_args.py (new file)
@@ -0,0 +1,28 @@
+import subprocess
+import json
+
+from .fixtures import *
+
+def test_depth_flag_is_accepted(process):
+    arg_process = subprocess.run(["archivebox", "add", "https://example.com", "--depth=0"], capture_output=True)
+    assert 'unrecognized arguments: --depth' not in arg_process.stderr.decode("utf-8")
+
+def test_depth_flag_fails_if_it_is_not_0_or_1(process):
+    arg_process = subprocess.run(["archivebox", "add", "https://example.com", "--depth=5"], capture_output=True)
+    assert 'invalid choice' in arg_process.stderr.decode("utf-8")
+    arg_process = subprocess.run(["archivebox", "add", "https://example.com", "--depth=-1"], capture_output=True)
+    assert 'invalid choice' in arg_process.stderr.decode("utf-8")
+
+def test_depth_flag_0_crawls_only_the_arg_page(tmp_path, process):
+    arg_process = subprocess.run(["archivebox", "add", "https://example.com", "--depth=0"], capture_output=True)
+    archived_item_path = list(tmp_path.glob('archive/**/*'))[0]
+    with open(archived_item_path / "index.json", "r") as f:
+        output_json = json.load(f)
+    assert output_json["base_url"] == "example.com"
+
+def test_depth_flag_1_crawls_the_page_AND_links(tmp_path, process):
+    arg_process = subprocess.run(["archivebox", "add", "https://example.com", "--depth=1"], capture_output=True)
+    with open(tmp_path / "index.json", "r") as f:
+        archive_file = f.read()
+    assert "https://example.com" in archive_file
+    assert "https://www.iana.org/domains/example" in archive_file

@@ -6,14 +6,7 @@ import subprocess
 from pathlib import Path

 import json

-import pytest
-
-@pytest.fixture
-def process(tmp_path):
-    os.chdir(tmp_path)
-    process = subprocess.run(['archivebox', 'init'], capture_output=True)
-    return process
+from .fixtures import *

 def test_init(tmp_path, process):
     assert "Initializing a new ArchiveBox collection in this folder..." in process.stdout.decode("utf-8")
@@ -32,9 +25,21 @@ def test_add_link(tmp_path, process):
     with open(archived_item_path / "index.json", "r") as f:
         output_json = json.load(f)
-    assert "IANA — IANA-managed Reserved Domains" == output_json['history']['title'][0]['output']
+    assert "Example Domain" == output_json['history']['title'][0]['output']

     with open(tmp_path / "index.html", "r") as f:
         output_html = f.read()
-    assert "IANA — IANA-managed Reserved Domains" in output_html
+    assert "Example Domain" in output_html
+
+def test_add_link_support_stdin(tmp_path, process):
+    os.chdir(tmp_path)
+    stdin_process = subprocess.Popen(["archivebox", "add"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+    stdin_process.communicate(input="http://example.com".encode())
+    archived_item_path = list(tmp_path.glob('archive/**/*'))[0]
+
+    assert "index.json" in [x.name for x in archived_item_path.iterdir()]
+
+    with open(archived_item_path / "index.json", "r") as f:
+        output_json = json.load(f)
+    assert "Example Domain" == output_json['history']['title'][0]['output']