1
0
Fork 0
mirror of synced 2024-06-20 19:30:15 +12:00

Merge pull request #1195 from overhacked/method_allow_deny

This commit is contained in:
Nick Sweeting 2023-10-27 23:05:30 -07:00 committed by GitHub
commit 720061185c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 96 additions and 24 deletions

View file

@ -90,10 +90,13 @@ CONFIG_SCHEMA: Dict[str, ConfigDefaultDict] = {
'MEDIA_TIMEOUT': {'type': int, 'default': 3600},
'OUTPUT_PERMISSIONS': {'type': str, 'default': '644'},
'RESTRICT_FILE_NAMES': {'type': str, 'default': 'windows'},
'URL_BLACKLIST': {'type': str, 'default': r'\.(css|js|otf|ttf|woff|woff2|gstatic\.com|googleapis\.com/css)(\?.*)?$'}, # to avoid downloading code assets as their own pages
'URL_DENYLIST': {'type': str, 'default': r'\.(css|js|otf|ttf|woff|woff2|gstatic\.com|googleapis\.com/css)(\?.*)?$', 'aliases': ('URL_BLACKLIST',)}, # to avoid downloading code assets as their own pages
'URL_ALLOWLIST': {'type': str, 'default': None, 'aliases': ('URL_WHITELIST',)},
'ADMIN_USERNAME': {'type': str, 'default': None},
'ADMIN_PASSWORD': {'type': str, 'default': None},
'URL_WHITELIST': {'type': str, 'default': None},
'ENFORCE_ATOMIC_WRITES': {'type': bool, 'default': True},
'TAG_SEPARATOR_PATTERN': {'type': str, 'default': r'[,]'},
},
@ -144,6 +147,8 @@ CONFIG_SCHEMA: Dict[str, ConfigDefaultDict] = {
'SAVE_GIT': {'type': bool, 'default': True, 'aliases': ('FETCH_GIT',)},
'SAVE_MEDIA': {'type': bool, 'default': True, 'aliases': ('FETCH_MEDIA',)},
'SAVE_ARCHIVE_DOT_ORG': {'type': bool, 'default': True, 'aliases': ('SUBMIT_ARCHIVE_DOT_ORG',)},
'SAVE_ALLOWLIST': {'type': dict, 'default': {},},
'SAVE_DENYLIST': {'type': dict, 'default': {},},
},
'ARCHIVE_METHOD_OPTIONS': {
@ -373,6 +378,8 @@ def get_commit_hash(config):
############################## Derived Config ##################################
ALLOWDENYLIST_REGEX_FLAGS: int = re.IGNORECASE | re.UNICODE | re.MULTILINE
DYNAMIC_CONFIG_SCHEMA: ConfigDefaultDict = {
'TERM_WIDTH': {'default': lambda c: lambda: shutil.get_terminal_size((100, 10)).columns},
'USER': {'default': lambda c: SYSTEM_USER},
@ -389,8 +396,8 @@ DYNAMIC_CONFIG_SCHEMA: ConfigDefaultDict = {
'CONFIG_FILE': {'default': lambda c: Path(c['CONFIG_FILE']).resolve() if c['CONFIG_FILE'] else c['OUTPUT_DIR'] / CONFIG_FILENAME},
'COOKIES_FILE': {'default': lambda c: c['COOKIES_FILE'] and Path(c['COOKIES_FILE']).resolve()},
'CHROME_USER_DATA_DIR': {'default': lambda c: find_chrome_data_dir() if c['CHROME_USER_DATA_DIR'] is None else (Path(c['CHROME_USER_DATA_DIR']).resolve() if c['CHROME_USER_DATA_DIR'] else None)}, # None means unset, so we autodetect it with find_chrome_Data_dir(), but emptystring '' means user manually set it to '', and we should store it as None
'URL_BLACKLIST_PTN': {'default': lambda c: c['URL_BLACKLIST'] and re.compile(c['URL_BLACKLIST'] or '', re.IGNORECASE | re.UNICODE | re.MULTILINE)},
'URL_WHITELIST_PTN': {'default': lambda c: c['URL_WHITELIST'] and re.compile(c['URL_WHITELIST'] or '', re.IGNORECASE | re.UNICODE | re.MULTILINE)},
'URL_DENYLIST_PTN': {'default': lambda c: c['URL_DENYLIST'] and re.compile(c['URL_DENYLIST'] or '', ALLOWDENYLIST_REGEX_FLAGS)},
'URL_ALLOWLIST_PTN': {'default': lambda c: c['URL_ALLOWLIST'] and re.compile(c['URL_ALLOWLIST'] or '', ALLOWDENYLIST_REGEX_FLAGS)},
'DIR_OUTPUT_PERMISSIONS': {'default': lambda c: c['OUTPUT_PERMISSIONS'].replace('6', '7').replace('4', '5')},
'ARCHIVEBOX_BINARY': {'default': lambda c: sys.argv[0] or bin_path('archivebox')},
@ -464,10 +471,11 @@ DYNAMIC_CONFIG_SCHEMA: ConfigDefaultDict = {
'EXTERNAL_LOCATIONS': {'default': lambda c: get_external_locations(c)},
'DATA_LOCATIONS': {'default': lambda c: get_data_locations(c)},
'CHROME_OPTIONS': {'default': lambda c: get_chrome_info(c)},
'SAVE_ALLOWLIST_PTN': {'default': lambda c: c['SAVE_ALLOWLIST'] and {re.compile(k, ALLOWDENYLIST_REGEX_FLAGS): v for k, v in c['SAVE_ALLOWLIST'].items()}},
'SAVE_DENYLIST_PTN': {'default': lambda c: c['SAVE_DENYLIST'] and {re.compile(k, ALLOWDENYLIST_REGEX_FLAGS): v for k, v in c['SAVE_DENYLIST'].items()}},
}
################################### Helpers ####################################

View file

@ -41,7 +41,7 @@ class ConfigDict(BaseConfig, total=False):
MEDIA_TIMEOUT: int
OUTPUT_PERMISSIONS: str
RESTRICT_FILE_NAMES: str
URL_BLACKLIST: str
URL_DENYLIST: str
SECRET_KEY: Optional[str]
BIND_ADDR: str

View file

@ -41,7 +41,7 @@ class AddLinkForm(forms.Form):
# label="Exclude patterns",
# min_length='1',
# required=False,
# initial=URL_BLACKLIST,
# initial=URL_DENYLIST,
# )
# timeout = forms.IntegerField(
# initial=TIMEOUT,

View file

@ -4,12 +4,16 @@ import os
import sys
from pathlib import Path
from typing import Optional, List, Iterable, Union
from typing import Callable, Optional, List, Iterable, Union
from datetime import datetime, timezone
from django.db.models import QuerySet
from ..config import (
SAVE_ALLOWLIST_PTN,
SAVE_DENYLIST_PTN,
)
from ..core.settings import ERROR_LOG
from ..index.schema import Link
from ..index.schema import ArchiveResult, Link
from ..index.sql import write_link_to_sql_index
from ..index import (
load_link_details,
@ -42,7 +46,11 @@ from .archive_org import should_save_archive_dot_org, save_archive_dot_org
from .headers import should_save_headers, save_headers
def get_default_archive_methods():
ShouldSaveFunction = Callable[[Link, Optional[Path], Optional[bool]], bool]
SaveFunction = Callable[[Link, Optional[Path], int], ArchiveResult]
ArchiveMethodEntry = tuple[str, ShouldSaveFunction, SaveFunction]
def get_default_archive_methods() -> List[ArchiveMethodEntry]:
return [
('favicon', should_save_favicon, save_favicon),
('headers', should_save_headers, save_headers),
@ -59,14 +67,31 @@ def get_default_archive_methods():
('archive_org', should_save_archive_dot_org, save_archive_dot_org),
]
@enforce_types
def get_archive_methods_for_link(link: Link) -> Iterable[ArchiveMethodEntry]:
DEFAULT_METHODS = get_default_archive_methods()
allowed_methods = {
m for pat, methods in
SAVE_ALLOWLIST_PTN.items()
if pat.search(link.url)
for m in methods
} or { m[0] for m in DEFAULT_METHODS }
denied_methods = {
m for pat, methods in
SAVE_DENYLIST_PTN.items()
if pat.search(link.url)
for m in methods
}
allowed_methods -= denied_methods
return (m for m in DEFAULT_METHODS if m[0] in allowed_methods)
ARCHIVE_METHODS_INDEXING_PRECEDENCE = [('readability', 1), ('singlefile', 2), ('dom', 3), ('wget', 4)]
@enforce_types
def ignore_methods(to_ignore: List[str]):
def ignore_methods(to_ignore: List[str]) -> Iterable[str]:
ARCHIVE_METHODS = get_default_archive_methods()
methods = filter(lambda x: x[0] not in to_ignore, ARCHIVE_METHODS)
methods = map(lambda x: x[0], methods)
return list(methods)
return [x[0] for x in ARCHIVE_METHODS if x[0] not in to_ignore]
@enforce_types
def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[str]]=None, out_dir: Optional[Path]=None) -> Link:
@ -79,11 +104,11 @@ def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[s
except Snapshot.DoesNotExist:
snapshot = write_link_to_sql_index(link)
ARCHIVE_METHODS = get_default_archive_methods()
active_methods = get_archive_methods_for_link(link)
if methods:
ARCHIVE_METHODS = [
method for method in ARCHIVE_METHODS
active_methods = [
method for method in active_methods
if method[0] in methods
]
@ -100,7 +125,7 @@ def archive_link(link: Link, overwrite: bool=False, methods: Optional[Iterable[s
stats = {'skipped': 0, 'succeeded': 0, 'failed': 0}
start_ts = datetime.now(timezone.utc)
for method_name, should_run, method_function in ARCHIVE_METHODS:
for method_name, should_run, method_function in active_methods:
try:
if method_name not in link.history:
link.history[method_name] = []

View file

@ -22,8 +22,8 @@ from ..config import (
JSON_INDEX_FILENAME,
OUTPUT_DIR,
TIMEOUT,
URL_BLACKLIST_PTN,
URL_WHITELIST_PTN,
URL_DENYLIST_PTN,
URL_ALLOWLIST_PTN,
stderr,
OUTPUT_PERMISSIONS
)
@ -142,9 +142,9 @@ def archivable_links(links: Iterable[Link]) -> Iterable[Link]:
continue
if scheme(link.url) not in ('http', 'https', 'ftp'):
continue
if URL_BLACKLIST_PTN and URL_BLACKLIST_PTN.search(link.url):
if URL_DENYLIST_PTN and URL_DENYLIST_PTN.search(link.url):
continue
if URL_WHITELIST_PTN and (not URL_WHITELIST_PTN.search(link.url)):
if URL_ALLOWLIST_PTN and (not URL_ALLOWLIST_PTN.search(link.url)):
continue
yield link

View file

@ -13,12 +13,51 @@ def test_ignore_methods():
Takes the passed method out of the default methods list and returns that value
"""
ignored = ignore_methods(['title'])
assert should_save_title not in ignored
assert "title" not in ignored
def test_save_allowdenylist_works(tmp_path, process, disable_extractors_dict):
allow_list = {
r'/static': ["headers", "singlefile"],
r'example\.com\.html$': ["headers"],
}
deny_list = {
"/static": ["singlefile"],
}
disable_extractors_dict.update({
"SAVE_HEADERS": "true",
"USE_SINGLEFILE": "true",
"SAVE_ALLOWLIST": pyjson.dumps(allow_list),
"SAVE_DENYLIST": pyjson.dumps(deny_list),
})
add_process = subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/example.com.html'],
capture_output=True, env=disable_extractors_dict)
archived_item_path = list(tmp_path.glob('archive/**/*'))[0]
singlefile_file = archived_item_path / "singlefile.html"
assert not singlefile_file.exists()
headers_file = archived_item_path / "headers.json"
assert headers_file.exists()
def test_save_denylist_works(tmp_path, process, disable_extractors_dict):
deny_list = {
"/static": ["singlefile"],
}
disable_extractors_dict.update({
"SAVE_HEADERS": "true",
"USE_SINGLEFILE": "true",
"SAVE_DENYLIST": pyjson.dumps(deny_list),
})
add_process = subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/example.com.html'],
capture_output=True, env=disable_extractors_dict)
archived_item_path = list(tmp_path.glob('archive/**/*'))[0]
singlefile_file = archived_item_path / "singlefile.html"
assert not singlefile_file.exists()
headers_file = archived_item_path / "headers.json"
assert headers_file.exists()
def test_singlefile_works(tmp_path, process, disable_extractors_dict):
disable_extractors_dict.update({"USE_SINGLEFILE": "true"})
add_process = subprocess.run(['archivebox', 'add', 'http://127.0.0.1:8080/static/example.com.html'],
capture_output=True, env=disable_extractors_dict)
capture_output=True, env=disable_extractors_dict)
archived_item_path = list(tmp_path.glob('archive/**/*'))[0]
output_file = archived_item_path / "singlefile.html"
assert output_file.exists()