1
0
Fork 0
mirror of synced 2024-06-28 11:00:35 +12:00
ArchiveBox/archivebox/util.py

266 lines
9 KiB
Python
Raw Normal View History

2017-10-23 22:58:41 +13:00
import re
2019-04-28 09:26:24 +12:00
import ssl
2019-05-01 15:13:04 +12:00
from typing import List, Optional, Any
2019-03-31 14:29:16 +13:00
from inspect import signature
from functools import wraps
from hashlib import sha256
from urllib.request import Request, urlopen
from urllib.parse import urlparse, quote, unquote
from html import escape, unescape
from datetime import datetime
2019-03-31 14:29:16 +13:00
from base32_crockford import encode as base32_encode # type: ignore
2019-05-01 15:13:04 +12:00
import json as pyjson
from .config import (
TIMEOUT,
2019-05-01 15:13:04 +12:00
STATICFILE_EXTENSIONS,
2019-03-21 18:28:12 +13:00
CHECK_SSL_VALIDITY,
WGET_USER_AGENT,
2019-03-23 16:00:53 +13:00
CHROME_OPTIONS,
)
### Parsing Helpers

# All of these are (str) -> str
# shortcuts to: https://docs.python.org/3/library/urllib.parse.html#url-parsing
scheme = lambda url: urlparse(url).scheme.lower()
without_scheme = lambda url: urlparse(url)._replace(scheme='').geturl().strip('//')
without_query = lambda url: urlparse(url)._replace(query='').geturl().strip('//')
without_fragment = lambda url: urlparse(url)._replace(fragment='').geturl().strip('//')
without_path = lambda url: urlparse(url)._replace(path='', fragment='', query='').geturl().strip('//')
path = lambda url: urlparse(url).path
basename = lambda url: urlparse(url).path.rsplit('/', 1)[-1]
domain = lambda url: urlparse(url).netloc
query = lambda url: urlparse(url).query
fragment = lambda url: urlparse(url).fragment
extension = lambda url: basename(url).rsplit('.', 1)[-1].lower() if '.' in basename(url) else ''
base_url = lambda url: without_scheme(url)  # uniq base url used to dedupe links

without_www = lambda url: url.replace('://www.', '://', 1)
# use endswith instead of url[-1] so an empty string returns '' instead of raising IndexError
without_trailing_slash = lambda url: url[:-1] if url.endswith('/') else url.replace('/?', '?')

# short deterministic id for a url: crockford-base32 of the sha256 of the base url
hashurl = lambda url: base32_encode(int(sha256(base_url(url).encode('utf-8')).hexdigest(), 16))[:20]

is_static_file = lambda url: extension(url).lower() in STATICFILE_EXTENSIONS  # TODO: the proper way is with MIME type detection, not using extension

# all of these pass falsy values (None, '') through unchanged via `s and ...`
urlencode = lambda s: s and quote(s, encoding='utf-8', errors='replace')
urldecode = lambda s: s and unquote(s)
htmlencode = lambda s: s and escape(s, quote=True)
htmldecode = lambda s: s and unescape(s)

# timestamp helpers (parse_date is defined later in this module)
short_ts = lambda ts: str(parse_date(ts).timestamp()).split('.')[0]
ts_to_date = lambda ts: ts and parse_date(ts).strftime('%Y-%m-%d %H:%M')
ts_to_iso = lambda ts: ts and parse_date(ts).isoformat()
2017-10-23 22:58:41 +13:00
# Finds http/https URLs embedded in free text (best-effort link extraction,
# not a strict RFC 3986 validator).
# NOTE(review): '[$-_@.&+]' is a character *range* from '$' (0x24) to '_' (0x5F),
# which covers far more ASCII than the listed symbols suggest — presumably
# intentional leniency, but verify before "fixing" it.
# NOTE(review): the '\""' in the final class is an escaped quote plus a duplicate
# quote — redundant inside a character class, but harmless.
URL_REGEX = re.compile(
    r'http[s]?://'          # start matching from allowed schemes
    r'(?:[a-zA-Z]|[0-9]'    # followed by allowed alphanum characters
    r'|[$-_@.&+]|[!*\(\),]' # or allowed symbols
    r'|(?:%[0-9a-fA-F][0-9a-fA-F]))' # or allowed unicode bytes
    r'[^\]\[\(\)<>\""\'\s]+',        # stop parsing at these symbols
    re.IGNORECASE,
)
2019-03-26 20:20:41 +13:00
def enforce_types(func):
    """
    Enforce function arg and kwarg types at runtime using its python3 type hints

    Only annotations that are plain classes (str, int, ...) are checked;
    typing constructs like Optional[str] are skipped. Raises TypeError on
    the first mismatched argument.
    """
    # TODO: check return type as well

    # hoisted out of the wrapper: inspecting the signature is relatively
    # expensive and only needs to happen once per decorated function,
    # not once per call
    sig = signature(func)

    @wraps(func)
    def typechecked_function(*args, **kwargs):
        def check_argument_type(arg_key, arg_val):
            try:
                annotation = sig.parameters[arg_key].annotation
            except KeyError:
                annotation = None

            # sig.empty marks parameters with no annotation at all. It is a
            # plain sentinel *class* (so its __class__ is type); without this
            # guard, unannotated args would be isinstance-checked against the
            # sentinel and always raise a spurious TypeError.
            if annotation is not None and annotation is not sig.empty and annotation.__class__ is type:
                if not isinstance(arg_val, annotation):
                    raise TypeError(
                        '{}(..., {}: {}) got unexpected {} argument {}={}'.format(
                            func.__name__,
                            arg_key,
                            annotation.__name__,
                            type(arg_val).__name__,
                            arg_key,
                            str(arg_val)[:64],
                        )
                    )

        # check positional args against parameters in declaration order
        for arg_val, arg_key in zip(args, sig.parameters):
            check_argument_type(arg_key, arg_val)

        # check keyword args by name
        for arg_key, arg_val in kwargs.items():
            check_argument_type(arg_key, arg_val)

        return func(*args, **kwargs)

    return typechecked_function
2017-10-23 22:58:41 +13:00
2019-05-01 15:13:04 +12:00
def docstring(text: Optional[str]):
    """attach the given docstring to the decorated function"""
    def apply(fn):
        # a falsy/None text leaves the function's existing docstring untouched
        if text:
            fn.__doc__ = text
        return fn
    return apply
2017-10-23 22:58:41 +13:00
2019-03-27 16:25:07 +13:00
@enforce_types
2019-03-26 20:20:41 +13:00
def str_between(string: str, start: str, end: Optional[str]=None) -> str:
    """(<abc>12345</def>, <abc>, </def>) -> 12345

    Return the part of `string` after the first `start` marker, trimmed at
    the last `end` marker if one is given. Missing markers are tolerated:
    the relevant side is simply left untrimmed.
    """
    # annotation fixed from `str` to Optional[str]: the None default meant an
    # explicit end=None tripped a spurious TypeError under @enforce_types
    content = string.split(start, 1)[-1]
    if end is not None:
        content = content.rsplit(end, 1)[0]

    return content
2019-03-27 16:25:07 +13:00
@enforce_types
def parse_date(date: Any) -> Optional[datetime]:
    """Parse unix timestamps, iso format, and human-readable strings"""

    if date is None:
        return None
    if isinstance(date, datetime):
        return date
    if isinstance(date, (float, int)):
        date = str(date)

    if isinstance(date, str):
        if date.replace('.', '').isdigit():
            # this is a brittle attempt at unix timestamp parsing (which is
            # notoriously hard to do). It may lead to dates being off by
            # anything from hours to decades, depending on which app, OS,
            # and sytem time configuration was used for the original timestamp
            # more info: https://github.com/pirate/ArchiveBox/issues/119

            # Note: always always always store the original timestamp string
            # somewhere indepentendly of the parsed datetime, so that later
            # bugs dont repeatedly misparse and rewrite increasingly worse dates.
            # the correct date can always be re-derived from the timestamp str
            ts = float(date)

            EARLIEST_POSSIBLE = 473403600.0   # 1985
            LATEST_POSSIBLE = 1735707600.0    # 2025

            # interpret the number as seconds, then milliseconds, then
            # microseconds — whichever first puts it in the sane 1985-2025 range
            for unit_scale in (1, 1000, 1000 * 1000):
                if EARLIEST_POSSIBLE * unit_scale < ts < LATEST_POSSIBLE * unit_scale:
                    return datetime.fromtimestamp(ts / unit_scale)

            # numeric strings outside all three ranges fall through to the
            # ValueError below rather than being misparsed

        if '-' in date:
            # e.g. 2019-04-07T05:44:39.227520
            for attempt_parse in (
                datetime.fromisoformat,
                lambda d: datetime.strptime(d, '%Y-%m-%d %H:%M'),
            ):
                try:
                    return attempt_parse(date)
                except Exception:
                    pass

    raise ValueError('Tried to parse invalid date! {}'.format(date))
2019-03-27 16:25:07 +13:00
@enforce_types
2019-03-26 20:20:41 +13:00
def download_url(url: str, timeout: int=TIMEOUT) -> str:
    """Download the contents of a remote url and return the text"""
    req = Request(url, headers={'User-Agent': WGET_USER_AGENT})

    if CHECK_SSL_VALIDITY:
        context = None  # default: verify certificates normally
    else:
        # user disabled SSL checks in config; skip certificate verification
        context = ssl._create_unverified_context()

    # use the response as a context manager so the connection is always
    # closed (the previous version leaked the socket on read errors)
    with urlopen(req, timeout=timeout, context=context) as resp:
        encoding = resp.headers.get_content_charset() or 'utf-8'  # type: ignore
        return resp.read().decode(encoding)
2019-03-27 16:25:07 +13:00
@enforce_types
2019-03-26 20:20:41 +13:00
def chrome_args(**options) -> List[str]:
    """helper to build up a chrome shell command with arguments"""

    # caller-supplied keyword options override the defaults from config
    opts = {**CHROME_OPTIONS, **options}

    args = [opts['CHROME_BINARY']]

    if opts['CHROME_HEADLESS']:
        args.append('--headless')

    if not opts['CHROME_SANDBOX']:
        # dont use GPU or sandbox when running inside docker container
        args.extend(('--no-sandbox', '--disable-gpu'))

    if not opts['CHECK_SSL_VALIDITY']:
        args.extend(('--disable-web-security', '--ignore-certificate-errors'))

    if opts['CHROME_USER_AGENT']:
        args.append('--user-agent={}'.format(opts['CHROME_USER_AGENT']))

    if opts['RESOLUTION']:
        args.append('--window-size={}'.format(opts['RESOLUTION']))

    if opts['TIMEOUT']:
        # chrome expects milliseconds, config stores seconds
        args.append('--timeout={}'.format((opts['TIMEOUT']) * 1000))

    if opts['CHROME_USER_DATA_DIR']:
        args.append('--user-data-dir={}'.format(opts['CHROME_USER_DATA_DIR']))

    return args
2019-05-01 15:13:04 +12:00
class ExtendedEncoder(pyjson.JSONEncoder):
    """
    Extended json serializer that supports serializing several model
    fields and objects
    """

    def default(self, obj):
        # namedtuples and anything else exposing _asdict serialize as dicts
        if hasattr(obj, '_asdict'):
            return obj._asdict()

        if isinstance(obj, bytes):
            return obj.decode()

        if isinstance(obj, datetime):
            return obj.isoformat()

        if isinstance(obj, Exception):
            return '{}: {}'.format(obj.__class__.__name__, obj)

        # dict views aren't json-serializable directly; emit them as tuples
        if obj.__class__.__name__ in ('dict_items', 'dict_keys', 'dict_values'):
            return tuple(obj)

        # anything else: defer to the base class (raises TypeError)
        return pyjson.JSONEncoder.default(self, obj)
2019-04-28 09:26:24 +12:00