
0 mypy errors

commit 6a8f6f52af (parent f4e018ba0c)
Author: Nick Sweeting
Date:   2019-03-30 21:29:16 -04:00

5 changed files with 38 additions and 37 deletions

index.py

@@ -3,7 +3,7 @@ import json
 from datetime import datetime
 from string import Template
-from typing import List, Tuple, Iterator, Optional
+from typing import List, Tuple, Iterator, Optional, Mapping

 from .schema import Link, ArchiveResult
 from .config import (
@@ -132,8 +132,6 @@ def parse_json_links_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
 def write_html_links_index(links: List[Link], out_dir: str=OUTPUT_DIR, finished: bool=False) -> None:
     """write the html link index to a given path"""

-    path = os.path.join(out_dir, 'index.html')
-
     copy_and_overwrite(
         os.path.join(TEMPLATES_DIR, 'static'),
         os.path.join(out_dir, 'static'),
@@ -147,8 +145,9 @@ def write_html_links_index(links: List[Link], out_dir: str=OUTPUT_DIR, finished:
     with open(os.path.join(TEMPLATES_DIR, 'index_row.html'), 'r', encoding='utf-8') as f:
         link_row_html = f.read()

-    link_rows = '\n'.join(
-        Template(link_row_html).substitute(**{
+    link_rows = []
+    for link in links:
+        template_row_vars: Mapping[str, str] = {
             **derived_link_info(link),
             'title': (
                 link.title
@@ -162,22 +161,22 @@ def write_html_links_index(links: List[Link], out_dir: str=OUTPUT_DIR, finished:
             'archive_url': urlencode(
                 wget_output_path(link) or 'index.html'
             ),
-        })
-        for link in links
-    )
+        }
+        link_rows.append(Template(link_row_html).substitute(**template_row_vars))

-    template_vars = {
-        'num_links': len(links),
+    template_vars: Mapping[str, str] = {
+        'num_links': str(len(links)),
         'date_updated': datetime.now().strftime('%Y-%m-%d'),
         'time_updated': datetime.now().strftime('%Y-%m-%d %H:%M'),
         'footer_info': FOOTER_INFO,
         'version': VERSION,
         'git_sha': GIT_SHA,
-        'rows': link_rows,
+        'rows': '\n'.join(link_rows),
         'status': 'finished' if finished else 'running',
     }
-    atomic_write(Template(index_html).substitute(**template_vars), path)
+    template_html = Template(index_html).substitute(**template_vars)
+    atomic_write(template_html, os.path.join(out_dir, 'index.html'))
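The reworked write_html_links_index coerces every template variable to str so the whole dict satisfies Mapping[str, str]. A minimal, self-contained sketch of that pattern (the row template and data below are invented for illustration, not taken from the repo):

from string import Template
from typing import List, Mapping, Tuple

# Stand-in row template; the real index_row.html has many more fields.
row_template = Template('<tr><td>$title</td><td>$archive_url</td></tr>')

def render_rows(rows: List[Tuple[str, str]]) -> str:
    rendered = []
    for title, archive_url in rows:
        # Typing the dict as Mapping[str, str] (and calling str() on any
        # non-string values, as the commit does for num_links) is what
        # keeps mypy satisfied.
        row_vars: Mapping[str, str] = {'title': title, 'archive_url': archive_url}
        rendered.append(row_template.substitute(**row_vars))
    return '\n'.join(rendered)

print(render_rows([('Example', 'archive/123/index.html')]))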

logs.py

@@ -111,6 +111,7 @@ def log_archiving_paused(num_links: int, idx: int, timestamp: str):
 def log_archiving_finished(num_links: int):
     end_ts = datetime.now()
     _LAST_RUN_STATS.archiving_end_ts = end_ts
+    assert _LAST_RUN_STATS.archiving_start_ts is not None
     seconds = end_ts.timestamp() - _LAST_RUN_STATS.archiving_start_ts.timestamp()
     if seconds > 60:
         duration = '{0:.2f} min'.format(seconds / 60, 2)
@@ -194,7 +195,7 @@ def log_archive_method_finished(result: ArchiveResult):
         ),
         *hints,
         '{}Run to see full output:{}'.format(ANSI['lightred'], ANSI['reset']),
-        *((' cd {};'.format(result.pwd),) if result.pwd else ()),
+        *([' cd {};'.format(result.pwd)] if result.pwd else []),
         ' {}'.format(quoted_cmd),
     ]
     print('\n'.join(
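The added assert is the idiomatic way to narrow an Optional attribute for mypy before calling methods on it. A stripped-down sketch of the same idea (the class and names here are simplified stand-ins, not the repo's actual _LAST_RUN_STATS):

from datetime import datetime
from typing import Optional

class RunStats:
    # None until archiving actually starts, hence Optional.
    archiving_start_ts: Optional[datetime] = None

stats = RunStats()
stats.archiving_start_ts = datetime.now()

def archiving_finished_seconds() -> float:
    end_ts = datetime.now()
    # Without this assert, mypy rejects .timestamp() on Optional[datetime];
    # the assert narrows the attribute's type to datetime for the line below.
    assert stats.archiving_start_ts is not None
    return end_ts.timestamp() - stats.archiving_start_ts.timestamp()

print(archiving_finished_seconds())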

parse.py

@@ -266,10 +266,12 @@ def parse_pinboard_rss_export(rss_file: IO[str]) -> Iterable[Link]:
     root = etree.parse(rss_file).getroot()
     items = root.findall("{http://purl.org/rss/1.0/}item")
     for item in items:
-        url = item.find("{http://purl.org/rss/1.0/}link").text
-        tags = item.find("{http://purl.org/dc/elements/1.1/}subject").text if item.find("{http://purl.org/dc/elements/1.1/}subject") else None
-        title = item.find("{http://purl.org/rss/1.0/}title").text.strip() if item.find("{http://purl.org/rss/1.0/}title").text.strip() else None
-        ts_str = item.find("{http://purl.org/dc/elements/1.1/}date").text if item.find("{http://purl.org/dc/elements/1.1/}date").text else None
+        find = lambda p: item.find(p).text.strip() if item.find(p) else None    # type: ignore
+
+        url = find("{http://purl.org/rss/1.0/}link")
+        tags = find("{http://purl.org/dc/elements/1.1/}subject")
+        title = find("{http://purl.org/rss/1.0/}title")
+        ts_str = find("{http://purl.org/dc/elements/1.1/}date")

         # Pinboard includes a colon in its date stamp timezone offsets, which
         # Python can't parse. Remove it:
@@ -296,12 +298,12 @@ def parse_medium_rss_export(rss_file: IO[str]) -> Iterable[Link]:
     rss_file.seek(0)
     root = etree.parse(rss_file).getroot()

-    items = root.find("channel").findall("item")
+    items = root.find("channel").findall("item")    # type: ignore
     for item in items:
-        url = item.find("link").text
-        title = item.find("title").text.strip()
-        ts_str = item.find("pubDate").text
-        time = datetime.strptime(ts_str, "%a, %d %b %Y %H:%M:%S %Z")
+        url = item.find("link").text                                    # type: ignore
+        title = item.find("title").text.strip()                         # type: ignore
+        ts_str = item.find("pubDate").text                              # type: ignore
+        time = datetime.strptime(ts_str, "%a, %d %b %Y %H:%M:%S %Z")    # type: ignore

         yield Link(
             url=htmldecode(url),
@@ -319,7 +321,7 @@ def parse_plain_text_export(text_file: IO[str]) -> Iterable[Link]:
     text_file.seek(0)
     for line in text_file.readlines():
         urls = re.findall(URL_REGEX, line) if line.strip() else ()
-        for url in urls:
+        for url in urls:    # type: ignore
             yield Link(
                 url=htmldecode(url),
                 timestamp=str(datetime.now().timestamp()),
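The one-line find helper above collapses four near-identical Optional checks at the cost of a type: ignore, and it leans on Element truthiness, which is False for elements that merely have no children. A fully type-checked version of the same helper, sketched against a made-up single-item feed:

from io import StringIO
from typing import Optional
from xml.etree import ElementTree as etree

rss = StringIO('<item><link>https://example.com/post</link><title> Hi </title></item>')
item = etree.parse(rss).getroot()

def find(path: str) -> Optional[str]:
    # Element.find() returns Optional[Element], so guard before touching .text.
    # Testing `is not None` (rather than bare truthiness) also handles elements
    # that have text but no child elements.
    element = item.find(path)
    if element is not None and element.text is not None:
        return element.text.strip()
    return None

print(find('link'), find('title'), find('subject'))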

purge.py

@@ -6,9 +6,8 @@ from os.path import exists, join
 from shutil import rmtree
 from typing import List

-from archive import parse_json_link_index
-from config import ARCHIVE_DIR, OUTPUT_DIR
-from index import write_html_links_index, write_json_links_index
+from .config import ARCHIVE_DIR, OUTPUT_DIR
+from .index import parse_json_links_index, write_html_links_index, write_json_links_index


 def cleanup_index(regexes: List[str], proceed: bool, delete: bool) -> None:
@@ -16,18 +15,18 @@ def cleanup_index(regexes: List[str], proceed: bool, delete: bool) -> None:
         exit('index.json is missing; nothing to do')

     compiled = [re.compile(r) for r in regexes]
-    links = parse_json_link_index(OUTPUT_DIR)['links']
+    links = parse_json_links_index(OUTPUT_DIR)
     filtered = []
     remaining = []

-    for l in links:
-        url = l['url']
+    for link in links:
+        url = link.url
         for r in compiled:
             if r.search(url):
-                filtered.append((l, r))
+                filtered.append((link, r))
                 break
         else:
-            remaining.append(l)
+            remaining.append(link)

     if not filtered:
         exit('Search did not match any entries.')
@@ -35,7 +34,7 @@ def cleanup_index(regexes: List[str], proceed: bool, delete: bool) -> None:
     print('Filtered out {}/{} urls:'.format(len(filtered), len(links)))
     for link, regex in filtered:
-        url = link['url']
+        url = link.url
         print(' {url} via {regex}'.format(url=url, regex=regex.pattern))

     if not proceed:
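The filtering loop this file cleans up relies on Python's for/else: the else clause runs only when the inner loop finishes without hitting break, i.e. when no regex matched. A self-contained sketch of that control flow, with invented sample data:

import re

links = ['https://example.com/a', 'https://test.org/b', 'https://example.com/c']
compiled = [re.compile(r) for r in [r'example\.com']]

filtered, remaining = [], []
for link in links:
    for r in compiled:
        if r.search(link):
            filtered.append((link, r))
            break
    else:
        # Reached only when the inner loop exhausted without break:
        # no pattern matched this link.
        remaining.append(link)

print(len(filtered), 'filtered,', len(remaining), 'remaining')   # 2 filtered, 1 remaining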

util.py

@@ -7,7 +7,7 @@ import shutil
 from json import JSONEncoder
 from typing import List, Optional, Any, Union
-from inspect import signature, _empty
+from inspect import signature
 from functools import wraps
 from hashlib import sha256
 from urllib.request import Request, urlopen
@@ -24,7 +24,7 @@ from subprocess import (
     CalledProcessError,
 )

-from base32_crockford import encode as base32_encode
+from base32_crockford import encode as base32_encode    # type: ignore

 from .schema import Link
 from .config import (
@@ -127,9 +127,9 @@ def enforce_types(func):
             try:
                 annotation = sig.parameters[arg_key].annotation
             except KeyError:
-                annotation = _empty
+                annotation = None

-            if annotation is not _empty and annotation.__class__ is type:
+            if annotation is not None and annotation.__class__ is type:
                 if not isinstance(arg_val, annotation):
                     raise TypeError(
                         '{}(..., {}: {}) got unexpected {} argument {}={}'.format(
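For context, enforce_types is a decorator that validates call arguments against plain class annotations at runtime; this hunk just swaps the private inspect._empty sentinel for None. A simplified, self-contained rendition of the pattern using only the public inspect API (a sketch, not the repo's full implementation):

from functools import wraps
from inspect import signature, Parameter

def enforce_types(func):
    """Raise TypeError when an argument doesn't match its class annotation."""
    sig = signature(func)

    @wraps(func)
    def typechecked_function(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        for arg_key, arg_val in bound.arguments.items():
            annotation = sig.parameters[arg_key].annotation
            if annotation is Parameter.empty:
                continue    # unannotated parameter, nothing to check
            # Only enforce plain classes (str, int, ...); typing constructs
            # like Optional[str] are not instances of type and are skipped.
            if annotation.__class__ is type and not isinstance(arg_val, annotation):
                raise TypeError('{}(..., {}: {}) got unexpected {} argument'.format(
                    func.__name__, arg_key, annotation.__name__,
                    type(arg_val).__name__,
                ))
        return func(*args, **kwargs)
    return typechecked_function

@enforce_types
def greet(name: str) -> str:
    return 'hello ' + name

print(greet('world'))    # ok
greet(42)                # TypeError: greet(..., name: str) got unexpected int argument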
@@ -605,7 +605,7 @@ def download_url(url: str, timeout: int=TIMEOUT) -> str:
     insecure = ssl._create_unverified_context()
     resp = urlopen(req, timeout=timeout, context=insecure)
-    encoding = resp.headers.get_content_charset() or 'utf-8'
+    encoding = resp.headers.get_content_charset() or 'utf-8'    # type: ignore
     return resp.read().decode(encoding)
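The last fix works because get_content_charset() returns None when the server's Content-Type header names no charset; the or 'utf-8' fallback keeps .decode() safe at runtime, and the ignore silences mypy's complaint about the dynamically typed headers object. A quick sketch of the same fallback (example.com and the User-Agent string are placeholders):

from urllib.request import Request, urlopen

req = Request('https://example.com', headers={'User-Agent': 'curl/7.64.0'})
resp = urlopen(req, timeout=10)

# get_content_charset() returns None when the Content-Type header names no
# charset, so fall back to utf-8 before decoding the body.
encoding = resp.headers.get_content_charset() or 'utf-8'
print(resp.read()[:80].decode(encoding, errors='replace'))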