1
0
Fork 0
mirror of synced 2024-06-24 17:10:21 +12:00

refactor error hint printing to be DRYer

This commit is contained in:
Nick Sweeting 2019-03-08 16:25:15 -05:00
parent bd2cef7fdd
commit b2ccb7dbcb
3 changed files with 121 additions and 122 deletions

View file

@ -5,13 +5,11 @@ from collections import defaultdict
from datetime import datetime
from index import (
wget_output_path,
parse_json_link_index,
write_link_index,
patch_index_title_hack,
)
from config import (
OUTPUT_DIR,
CURL_BINARY,
GIT_BINARY,
WGET_BINARY,
@ -49,7 +47,9 @@ from util import (
progress,
chmod_file,
pretty_path,
print_error_hints,
check_link_structure,
wget_output_path,
run, PIPE, DEVNULL,
)
@ -60,50 +60,46 @@ _RESULTS_TOTALS = { # globals are bad, mmkay
'failed': 0,
}
def load_link_index(link_dir, link):
"""check for an existing link archive in the given directory,
and load+merge it into the given link dict
"""
is_new = not os.path.exists(link_dir)
if is_new:
os.makedirs(link_dir)
else:
link = {
**parse_json_link_index(link_dir),
**link,
}
check_link_structure(link)
print_link_status_line(link_dir, link, is_new)
return link
def archive_link(link_dir, link, overwrite=True):
"""download the DOM, PDF, and a screenshot into a folder named after the link's timestamp"""
check_link_structure(link)
ARCHIVE_METHODS = (
(FETCH_TITLE, fetch_title),
(FETCH_FAVICON, fetch_favicon),
(FETCH_WGET, fetch_wget),
(FETCH_PDF, fetch_pdf),
(FETCH_SCREENSHOT, fetch_screenshot),
(FETCH_DOM, fetch_dom),
(FETCH_GIT, fetch_git),
(FETCH_MEDIA, fetch_media),
(SUBMIT_ARCHIVE_DOT_ORG, archive_dot_org),
)
active_methods = [method for toggle, method in ARCHIVE_METHODS]
try:
update_existing = os.path.exists(link_dir)
if update_existing:
link = {
**parse_json_link_index(link_dir),
**link,
}
else:
os.makedirs(link_dir)
print_link_status_line(link_dir, link, update_existing)
link = load_link_index(link_dir, link)
if FETCH_FAVICON:
link = fetch_favicon(link_dir, link, overwrite=overwrite)
if FETCH_TITLE:
link = fetch_title(link_dir, link, overwrite=overwrite)
if FETCH_WGET:
link = fetch_wget(link_dir, link, overwrite=overwrite)
if FETCH_PDF:
link = fetch_pdf(link_dir, link, overwrite=overwrite)
if FETCH_SCREENSHOT:
link = fetch_screenshot(link_dir, link, overwrite=overwrite)
if FETCH_DOM:
link = fetch_dom(link_dir, link, overwrite=overwrite)
if SUBMIT_ARCHIVE_DOT_ORG:
link = archive_dot_org(link_dir, link, overwrite=overwrite)
if FETCH_GIT:
link = fetch_git(link_dir, link, overwrite=overwrite)
if FETCH_MEDIA:
link = fetch_media(link_dir, link, overwrite=overwrite)
for archive_method in active_methods:
archive_method(link_dir, link, overwrite=overwrite)
write_link_index(link_dir, link)
@ -112,16 +108,16 @@ def archive_link(link_dir, link, overwrite=True):
return link
def print_link_status_line(link_dir, link, update_existing):
def print_link_status_line(link_dir, link, is_new):
print('[{symbol_color}{symbol}{reset}] [{now}] "{title}"\n {blue}{url}{reset}'.format(
symbol='*' if update_existing else '+',
symbol_color=ANSI['black' if update_existing else 'green'],
symbol='+' if is_new else '*',
symbol_color=ANSI['green' if is_new else 'black'],
now=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
**{**link, 'title': link['title'] or link['url']},
**ANSI,
))
print(' > {}{}'.format(pretty_path(link_dir), '' if update_existing else ' (new)'))
print(' > {}{}'.format(pretty_path(link_dir), ' (new)' if is_new else ''))
# if link['type']:
# print(' i {}'.format(link['type']))
@ -218,7 +214,7 @@ def fetch_wget(link_dir, link, requisites=FETCH_WGET_REQUISITES, warc=FETCH_WARC
]
end = progress(timeout, prefix=' ')
try:
result = run(CMD, stdout=PIPE, stderr=PIPE, cwd=link_dir, timeout=timeout) # index.html
result = run(CMD, stdout=PIPE, stderr=PIPE, cwd=link_dir, timeout=timeout)
end()
output = wget_output_path(link, look_in=domain_dir)
@ -244,19 +240,9 @@ def fetch_wget(link_dir, link, requisites=FETCH_WGET_REQUISITES, warc=FETCH_WARC
raise Exception('Got an error from the server')
except Exception as e:
end()
# to let the user copy-paste the command and run it safely we have
# to quote some of the arguments that could have spaces in them
quoted_cmd = ' '.join(CMD)
quoted_cmd = quoted_cmd.replace(WGET_USER_AGENT, '"{}"'.format(WGET_USER_AGENT))
if COOKIES_FILE:
quoted_cmd = quoted_cmd.replace(COOKIES_FILE, '"{}"'.format(COOKIES_FILE))
print(' {}Some resources were skipped: {}{}'.format(ANSI['lightyellow'], e, ANSI['reset']))
print(' Run to see full output:')
print(' cd {};'.format(link_dir))
print(' {}'.format(quoted_cmd))
output = e
print_error_hints(cmd=CMD, pwd=link_dir, err=e)
return {
'cmd': CMD,
'output': output,
@ -283,7 +269,7 @@ def fetch_pdf(link_dir, link, timeout=TIMEOUT, user_data_dir=CHROME_USER_DATA_DI
]
end = progress(timeout, prefix=' ')
try:
result = run(CMD, stdout=PIPE, stderr=PIPE, cwd=link_dir, timeout=timeout) # output.pdf
result = run(CMD, stdout=PIPE, stderr=PIPE, cwd=link_dir, timeout=timeout)
end()
if result.returncode:
print(' ', (result.stderr or result.stdout).decode())
@ -292,11 +278,8 @@ def fetch_pdf(link_dir, link, timeout=TIMEOUT, user_data_dir=CHROME_USER_DATA_DI
output = 'output.pdf'
except Exception as e:
end()
print(' {}Failed: {} {}{}'.format(ANSI['red'], e.__class__.__name__, e, ANSI['reset']))
print(' Run to see full output:')
print(' cd {};'.format(link_dir))
print(' {}'.format(' '.join(CMD)))
output = e
print_error_hints(cmd=CMD, pwd=link_dir, err=e)
return {
'cmd': CMD,
@ -325,7 +308,7 @@ def fetch_screenshot(link_dir, link, timeout=TIMEOUT, user_data_dir=CHROME_USER_
]
end = progress(timeout, prefix=' ')
try:
result = run(CMD, stdout=PIPE, stderr=PIPE, cwd=link_dir, timeout=timeout) # sreenshot.png
result = run(CMD, stdout=PIPE, stderr=PIPE, cwd=link_dir, timeout=timeout)
end()
if result.returncode:
print(' ', (result.stderr or result.stdout).decode())
@ -334,11 +317,8 @@ def fetch_screenshot(link_dir, link, timeout=TIMEOUT, user_data_dir=CHROME_USER_
output = 'screenshot.png'
except Exception as e:
end()
print(' {}Failed: {} {}{}'.format(ANSI['red'], e.__class__.__name__, e, ANSI['reset']))
print(' Run to see full output:')
print(' cd {};'.format(link_dir))
print(' {}'.format(' '.join(CMD)))
output = e
print_error_hints(cmd=CMD, pwd=link_dir, err=e)
return {
'cmd': CMD,
@ -366,7 +346,7 @@ def fetch_dom(link_dir, link, timeout=TIMEOUT, user_data_dir=CHROME_USER_DATA_DI
end = progress(timeout, prefix=' ')
try:
with open(output_path, 'w+') as f:
result = run(CMD, stdout=f, stderr=PIPE, cwd=link_dir, timeout=timeout) # output.html
result = run(CMD, stdout=f, stderr=PIPE, cwd=link_dir, timeout=timeout)
end()
if result.returncode:
print(' ', (result.stderr).decode())
@ -375,63 +355,63 @@ def fetch_dom(link_dir, link, timeout=TIMEOUT, user_data_dir=CHROME_USER_DATA_DI
output = 'output.html'
except Exception as e:
end()
print(' {}Failed: {} {}{}'.format(ANSI['red'], e.__class__.__name__, e, ANSI['reset']))
print(' Run to see full output:')
print(' cd {};'.format(link_dir))
print(' {}'.format(' '.join(CMD)))
output = e
print_error_hints(cmd=CMD, pwd=link_dir, err=e)
return {
'cmd': CMD,
'output': output,
}
def parse_archive_dot_org_response(response):
# Parse archive.org response headers
headers = defaultdict(list)
# lowercase all the header names and store in dict
for header in response.splitlines():
if b':' not in header or not header.strip():
continue
name, val = header.decode().split(':', 1)
headers[name.lower().strip()].append(val.strip())
# Get successful archive url in "content-location" header or any errors
content_location = headers['content-location']
errors = headers['x-archive-wayback-runtime-error']
return content_location, errors
@attach_result_to_link('archive_org')
def archive_dot_org(link_dir, link, timeout=TIMEOUT):
"""submit site to archive.org for archiving via their service, save returned archive url"""
path = os.path.join(link_dir, 'archive.org.txt')
output = 'archive.org.txt'
archive_org_url = None
path = os.path.join(link_dir, output)
if os.path.exists(path):
archive_org_url = open(path, 'r').read().strip()
return {'output': archive_org_url, 'status': 'skipped'}
submit_url = 'https://web.archive.org/save/{}'.format(link['url'])
success = False
CMD = [
CURL_BINARY,
'--location',
'--head',
'--user-agent', 'ArchiveBox/{} (+https://github.com/pirate/ArchiveBox/)'.format(GIT_SHA),
'--user-agent', 'ArchiveBox/{} (+https://github.com/pirate/ArchiveBox/)'.format(GIT_SHA), # be nice to the Archive.org people and show them where all this ArchiveBox traffic is coming from
'--max-time', str(timeout),
'--get',
*(() if CHECK_SSL_VALIDITY else ('--insecure',)),
submit_url,
]
end = progress(timeout, prefix=' ')
try:
result = run(CMD, stdout=PIPE, stderr=DEVNULL, cwd=link_dir, timeout=timeout) # archive.org.txt
result = run(CMD, stdout=PIPE, stderr=DEVNULL, cwd=link_dir, timeout=timeout)
end()
# Parse archive.org response headers
headers = defaultdict(list)
# lowercase all the header names and store in dict
for header in result.stdout.splitlines():
if b':' not in header or not header.strip():
continue
name, val = header.decode().split(':', 1)
headers[name.lower().strip()].append(val.strip())
# Get successful archive url in "content-location" header or any errors
content_location = headers['content-location']
errors = headers['x-archive-wayback-runtime-error']
content_location, errors = parse_archive_dot_org_response(result.stdout)
if content_location:
saved_url = 'https://web.archive.org{}'.format(content_location[0])
success = True
archive_org_url = 'https://web.archive.org{}'.format(content_location[0])
elif len(errors) == 1 and 'RobotAccessControlException' in errors[0]:
output = submit_url
archive_org_url = None
# raise Exception('Archive.org denied by {}/robots.txt'.format(domain(link['url'])))
elif errors:
raise Exception(', '.join(errors))
@ -439,16 +419,19 @@ def archive_dot_org(link_dir, link, timeout=TIMEOUT):
raise Exception('Failed to find "content-location" URL header in Archive.org response.')
except Exception as e:
end()
print(' {}Failed: {} {}{}'.format(ANSI['red'], e.__class__.__name__, e, ANSI['reset']))
print(' Run to see full output:')
print(' {}'.format(' '.join(CMD)))
output = e
print_error_hints(cmd=CMD, pwd=link_dir, err=e)
if success:
with open(os.path.join(link_dir, 'archive.org.txt'), 'w', encoding='utf-8') as f:
f.write(saved_url)
if not isinstance(output, Exception):
# instead of writing None when archive.org rejects the url write the
# url to resubmit it to archive.org. This is so when the user visits
# the URL in person, it will attempt to re-archive it, and it'll show the
# nicer error message explaining why the url was rejected if it fails.
archive_org_url = archive_org_url or submit_url
with open(os.path.join(link_dir, output), 'w', encoding='utf-8') as f:
f.write(archive_org_url)
chmod_file('archive.org.txt', cwd=link_dir)
output = saved_url
output = archive_org_url
return {
'cmd': CMD,
@ -459,30 +442,29 @@ def archive_dot_org(link_dir, link, timeout=TIMEOUT):
def fetch_favicon(link_dir, link, timeout=TIMEOUT):
"""download site favicon from google's favicon api"""
if os.path.exists(os.path.join(link_dir, 'favicon.ico')):
return {'output': 'favicon.ico', 'status': 'skipped'}
output = 'favicon.ico'
if os.path.exists(os.path.join(link_dir, output)):
return {'output': output, 'status': 'skipped'}
CMD = [
CURL_BINARY,
'--max-time', str(timeout),
'--location',
'--output', output,
*(() if CHECK_SSL_VALIDITY else ('--insecure',)),
'https://www.google.com/s2/favicons?domain={}'.format(domain(link['url'])),
]
fout = open('{}/favicon.ico'.format(link_dir), 'w')
end = progress(timeout, prefix=' ')
try:
run(CMD, stdout=fout, stderr=DEVNULL, cwd=link_dir, timeout=timeout) # favicon.ico
fout.close()
run(CMD, stdout=PIPE, stderr=PIPE, cwd=link_dir, timeout=timeout)
end()
chmod_file('favicon.ico', cwd=link_dir)
output = 'favicon.ico'
except Exception as e:
fout.close()
end()
print(' {}Failed: {} {}{}'.format(ANSI['red'], e.__class__.__name__, e, ANSI['reset']))
print(' Run to see full output:')
print(' {}'.format(' '.join(CMD)))
output = e
print_error_hints(cmd=CMD, pwd=link_dir, err=e)
return {
'cmd': CMD,
@ -556,7 +538,7 @@ def fetch_media(link_dir, link, timeout=MEDIA_TIMEOUT, overwrite=False):
end = progress(timeout, prefix=' ')
try:
result = run(CMD, stdout=PIPE, stderr=PIPE, cwd=output, timeout=timeout + 1) # audio/audio.mp3
result = run(CMD, stdout=PIPE, stderr=PIPE, cwd=output, timeout=timeout + 1)
chmod_file('media', cwd=link_dir)
output = 'media'
end()
@ -574,11 +556,8 @@ def fetch_media(link_dir, link, timeout=MEDIA_TIMEOUT, overwrite=False):
raise Exception('Failed to download media')
except Exception as e:
end()
print(' {}Failed: {} {}{}'.format(ANSI['red'], e.__class__.__name__, e, ANSI['reset']))
print(' Run to see full output:')
print(' cd {};'.format(link_dir))
print(' {}'.format(' '.join(CMD)))
output = e
print_error_hints(cmd=CMD, pwd=link_dir, err=e)
return {
'cmd': CMD,
@ -615,7 +594,7 @@ def fetch_git(link_dir, link, timeout=TIMEOUT):
]
end = progress(timeout, prefix=' ')
try:
result = run(CMD, stdout=PIPE, stderr=PIPE, cwd=git_dir, timeout=timeout + 1) # git/<reponame>
result = run(CMD, stdout=PIPE, stderr=PIPE, cwd=git_dir, timeout=timeout + 1)
end()
if result.returncode == 128:
@ -626,11 +605,8 @@ def fetch_git(link_dir, link, timeout=TIMEOUT):
raise Exception('Failed git download')
except Exception as e:
end()
print(' {}Failed: {} {}{}'.format(ANSI['red'], e.__class__.__name__, e, ANSI['reset']))
print(' Run to see full output:')
print(' cd {};'.format(link_dir))
print(' {}'.format(' '.join(CMD)))
output = e
print_error_hints(cmd=CMD, pwd=link_dir, err=e)
return {
'cmd': CMD,

View file

@ -8,14 +8,12 @@ from distutils.dir_util import copy_tree
from config import (
OUTPUT_DIR,
TEMPLATES_DIR,
OUTPUT_PERMISSIONS,
ANSI,
GIT_SHA,
FOOTER_INFO,
)
from util import (
chmod_file,
wget_output_path,
derived_link_info,
pretty_path,
check_link_structure,

View file

@ -159,6 +159,31 @@ def check_url_parsing():
assert len(re.findall(URL_REGEX, test_urls)) == 12
def print_error_hints(cmd, pwd, err=None, hints=None, prefix=' '):
"""quote the argument with whitespace in a command so the user can
copy-paste the outputted string directly to run the cmd
"""
quoted_cmd = ' '.join(
'"{}"'.format(arg) if ' ' in arg else arg
for arg in cmd
)
output_lines = [
'{}Failed: {} {}{}'.format(ANSI['red'], err.__class__.__name__, err, ANSI['reset']),
' {}{}{}'.format(ANSI['lightyellow'], hints, ANSI['reset']) if hints else None,
'Run to see full output:'
' cd {};'.format(pwd),
' {}'.format(quoted_cmd),
]
return '\n'.join(
'{}{}'.format(prefix, line)
for line in output_lines
if line
)
def chmod_file(path, cwd='.', permissions=OUTPUT_PERMISSIONS, timeout=30):
"""chmod -R <permissions> <cwd>/<path>"""