1
0
Fork 0
mirror of synced 2024-07-01 12:30:24 +12:00
ArchiveBox/archivebox/index/__init__.py

583 lines
20 KiB
Python
Raw Normal View History

2019-04-28 09:26:24 +12:00
__package__ = 'archivebox.index'
import os
2019-04-28 09:26:24 +12:00
import shutil
import json as pyjson
from pathlib import Path
2019-04-28 09:26:24 +12:00
from itertools import chain
from typing import List, Tuple, Dict, Optional, Iterable
from collections import OrderedDict
from contextlib import contextmanager
2020-08-17 19:12:17 +12:00
from urllib.parse import urlparse
from django.db.models import QuerySet, Q, Model
2019-03-26 20:20:41 +13:00
2019-04-28 09:26:24 +12:00
from ..util import (
scheme,
enforce_types,
ExtendedEncoder,
)
from ..config import (
ARCHIVE_DIR_NAME,
SQL_INDEX_FILENAME,
JSON_INDEX_FILENAME,
OUTPUT_DIR,
TIMEOUT,
URL_BLACKLIST_PTN,
stderr,
OUTPUT_PERMISSIONS
2017-07-06 09:33:51 +12:00
)
from ..logging_util import (
2019-05-01 15:13:04 +12:00
TimedProgress,
2019-04-28 09:26:24 +12:00
log_indexing_process_started,
log_indexing_process_finished,
log_indexing_started,
log_indexing_finished,
log_parsing_finished,
2020-07-14 03:26:30 +12:00
log_deduping_finished,
2019-04-28 09:26:24 +12:00
)
from .schema import Link, ArchiveResult
from .html import (
write_html_snapshot_details,
2019-04-28 09:26:24 +12:00
)
from .json import (
2020-12-31 06:53:20 +13:00
load_json_snapshot,
write_json_snapshot_details,
)
2019-04-28 09:26:24 +12:00
from .sql import (
write_sql_main_index,
write_sql_snapshot_details,
)
from ..search import search_backend_enabled, query_search_index
### Link filtering and checking
@enforce_types
def merge_snapshots(a: Model, b: Model) -> Model:
    """deterministially merge two snapshots, favoring longer field values over shorter,
    and "cleaner" values over worse ones.

    NOTE: merging is currently disabled — ``a`` is returned unchanged until the
    legacy logic below is reviewed for the new Snapshot model.
    TODO: Check if this makes sense with the new setup
    """
    return a

    # --- unreachable legacy merge logic, kept for the pending refactor ---
    assert a.base_url == b.base_url, f'Cannot merge two links with different URLs ({a.base_url} != {b.base_url})'

    # longest url wins (because a fuzzy url will always be shorter)
    url = a.url if len(a.url) > len(b.url) else b.url

    # best title based on length and quality
    possible_titles = [
        title
        for title in (a.title, b.title)
        if title and title.strip() and '://' not in title
    ]
    title = None
    if len(possible_titles) == 2:
        title = max(possible_titles, key=lambda t: len(t))
    elif len(possible_titles) == 1:
        title = possible_titles[0]

    # earliest valid timestamp
    timestamp = (
        a.timestamp
        if float(a.timestamp or 0) < float(b.timestamp or 0) else
        b.timestamp
    )

    # all unique, truthy tags
    tags_set = (
        set(tag.strip() for tag in (a.tags or '').split(','))
        | set(tag.strip() for tag in (b.tags or '').split(','))
    )
    tags = ','.join(tags_set) or None

    # all unique source entries
    sources = list(set(a.sources + b.sources))

    # all unique history entries for the combined archive methods
    # (fixed: previously read a.history twice, silently dropping b's methods)
    all_methods = set(list(a.history.keys()) + list(b.history.keys()))
    history = {
        method: (a.history.get(method) or []) + (b.history.get(method) or [])
        for method in all_methods
    }
    for method in all_methods:
        # dedupe results by canonical JSON form, then order newest-first
        deduped_jsons = {
            pyjson.dumps(result, sort_keys=True, cls=ExtendedEncoder)
            for result in history[method]
        }
        history[method] = list(reversed(sorted(
            (ArchiveResult.from_json(pyjson.loads(result)) for result in deduped_jsons),
            key=lambda result: result.start_ts,
        )))

    from core.models import Snapshot  # local import to avoid a module-level cycle
    return Snapshot(
        url=url,
        timestamp=timestamp,
        title=title,
        tags=tags,
        #sources=sources,
        #history=history,
    )
@enforce_types
def validate_snapshots(snapshots: List[Model]) -> List[Model]:
    """Run the full cleanup pipeline over a batch of parsed snapshots,
    showing a progress timer for the whole pass."""
    cleanup_steps = (
        archivable_snapshots,      # remove chrome://, about:, mailto: etc.
        sorted_snapshots,          # deterministically sort based on timestamp, url
        fix_duplicate_snapshots,   # merge/dedupe duplicate timestamps & urls
    )
    timer = TimedProgress(TIMEOUT * 4)
    try:
        for step in cleanup_steps:
            snapshots = step(snapshots)
    finally:
        timer.end()

    return list(snapshots)
@enforce_types
def archivable_snapshots(snapshots: Iterable[Model]) -> Iterable[Model]:
    """remove chrome://, about:// or other schemed links that cant be archived"""
    for candidate in snapshots:
        url = candidate.url

        # skip anything urlparse can't even tokenize
        try:
            urlparse(url)
        except ValueError:
            continue

        is_fetchable = scheme(url) in ('http', 'https', 'ftp')
        is_blacklisted = bool(URL_BLACKLIST_PTN and URL_BLACKLIST_PTN.search(url))
        if is_fetchable and not is_blacklisted:
            yield candidate
@enforce_types
def fix_duplicate_snapshots(sorted_snapshots: Iterable[Model]) -> Iterable[Model]:
    """
    ensures that all non-duplicate links have monotonically increasing timestamps

    NOTE: deduping is currently disabled — the input is passed through unchanged.
    TODO: Review how to do this with the new snapshots refactor
    """
    return sorted_snapshots

    # --- unreachable legacy dedupe logic, kept for the pending refactor ---
    # (fixed: previously referenced undefined names `merge_links` and `link`)
    unique_urls: OrderedDict[str, Link] = OrderedDict()

    for snapshot in sorted_snapshots:
        if snapshot.url in unique_urls:
            # merge with any other snapshots that share the same url
            snapshot = merge_snapshots(unique_urls[snapshot.url], snapshot)
        unique_urls[snapshot.url] = snapshot

    return unique_urls.values()
@enforce_types
def sorted_snapshots(snapshots: Iterable[Model]) -> Iterable[Model]:
    """Order snapshots deterministically: newest-first by the whole-second
    part of the timestamp, ties broken by url."""
    def sort_key(snapshot):
        return (snapshot.timestamp.split('.', 1)[0], snapshot.url)

    return sorted(snapshots, key=sort_key, reverse=True)
@enforce_types
def links_after_timestamp(links: Iterable[Link], resume: Optional[float]=None) -> Iterable[Link]:
    """Yield only the links whose timestamp is <= the resume point;
    with no resume point, yield everything unchanged."""
    if not resume:
        yield from links
        return

    for link in links:
        try:
            reached_resume_point = float(link.timestamp) <= resume
        except (ValueError, TypeError):
            print('Resume value and all timestamp values must be valid numbers.')
            continue
        if reached_resume_point:
            yield link
@enforce_types
def lowest_uniq_timestamp(used_timestamps: OrderedDict, timestamp: str) -> str:
    """resolve duplicate timestamps by appending a decimal 1234, 1234 -> 1234.1, 1234.2"""
    base = timestamp.split('.')[0]

    # prefer the bare integer form: 152323423 before 152323423.0
    if base not in used_timestamps:
        return base

    # otherwise append the smallest unused decimal suffix
    nonce = 0
    candidate = '{}.{}'.format(base, nonce)
    while candidate in used_timestamps:
        nonce += 1
        candidate = '{}.{}'.format(base, nonce)
    return candidate
### Main Links Index
@contextmanager
@enforce_types
def timed_index_update(out_path: Path):
    """Log start/finish and show a progress timer while the index file at
    out_path is being (re)written inside the with-body."""
    log_indexing_started(out_path)
    progress = TimedProgress(TIMEOUT * 2, prefix=' ')
    try:
        yield
    finally:
        progress.end()
        # even on an exception, the index file must exist afterwards
        assert out_path.exists(), f'Failed to write index file: {out_path}'
        log_indexing_finished(out_path)
@enforce_types
def write_main_index(snapshots: List[Model], out_dir: Path=OUTPUT_DIR) -> None:
    """Writes snapshots to the sqlite3 main index file in out_dir.

    On Ctrl-C/SystemExit the write is re-attempted once so the index is not
    left half-written, then the process exits.
    """
    log_indexing_process_started(len(snapshots))

    def write_and_chmod() -> None:
        with timed_index_update(out_dir / SQL_INDEX_FILENAME):
            write_sql_main_index(snapshots, out_dir=out_dir)
            os.chmod(out_dir / SQL_INDEX_FILENAME, int(OUTPUT_PERMISSIONS, base=8)) # set here because we don't write it with atomic writes

    try:
        write_and_chmod()
    except (KeyboardInterrupt, SystemExit):
        stderr('[!] Warning: Still writing index to disk...', color='lightyellow')
        stderr(' Run archivebox init to fix any inconsistencies from an ungraceful exit.')
        # fixed: the retry previously referenced an undefined name `links`
        write_and_chmod()
        raise SystemExit(0)

    log_indexing_process_finished()
@enforce_types
def load_main_index(out_dir: Path=OUTPUT_DIR, warn: bool=True) -> List[Link]:
    """
    Returns all of the snapshots currently in index

    NOTE: actually returns a django QuerySet (not a materialized list);
    `out_dir` and `warn` are currently unused here.
    """
    # local import to avoid a circular import at module load time
    from core.models import Snapshot
    try:
        return Snapshot.objects.all()
    except (KeyboardInterrupt, SystemExit):
        # convert an interrupt during DB access into a clean exit
        raise SystemExit(0)
@enforce_types
def load_main_index_meta(out_dir: Path=OUTPUT_DIR) -> Optional[dict]:
    """Load the metadata dict from the legacy JSON main index in out_dir,
    stripping the bulky 'links' list; returns None if no JSON index exists."""
    index_path = out_dir / JSON_INDEX_FILENAME
    if index_path.exists():
        with open(index_path, 'r', encoding='utf-8') as f:
            meta_dict = pyjson.load(f)
            # fixed: don't raise KeyError if the index has no 'links' key
            meta_dict.pop('links', None)
            return meta_dict

    return None
2020-07-14 03:26:30 +12:00
@enforce_types
def parse_snapshots_from_source(source_path: str, root_url: Optional[str]=None) -> List[Model]:
    """Parse an import file into snapshots, validate them, and log which
    parser succeeded."""
    from ..parsers import parse_snapshots

    # parse and validate the import file
    raw_snapshots, parser_name = parse_snapshots(source_path, root_url=root_url)
    new_snapshots = validate_snapshots(raw_snapshots)

    if parser_name:
        log_parsing_finished(len(raw_snapshots), parser_name)

    return new_snapshots
2020-07-14 03:26:30 +12:00
@enforce_types
def filter_new_urls(snapshots: QuerySet,
                    new_snapshots: List) -> List:
    """
    Returns a list of Snapshots corresponding to the urls that were not present in the index
    """
    candidates = {snapshot.url: snapshot for snapshot in new_snapshots}

    # drop every candidate whose url is already present in the index
    for existing in snapshots.filter(url__in=candidates.keys()):
        candidates.pop(existing.url)

    log_deduping_finished(len(candidates.keys()))

    return list(candidates.values())
### Link Details Index
@enforce_types
def write_snapshot_details(snapshot: Model, out_dir: Optional[str]=None, skip_sql_index: bool=False) -> None:
    """Write one snapshot's detail indexes (JSON + HTML, plus SQL unless
    skip_sql_index) into out_dir, defaulting to the snapshot's own directory.

    fixed: annotation was List[Model] but the body uses a single snapshot.
    """
    out_dir = out_dir or snapshot.snapshot_dir

    write_json_snapshot_details(snapshot, out_dir=out_dir)
    write_html_snapshot_details(snapshot, out_dir=out_dir)
    if not skip_sql_index:
        write_sql_snapshot_details(snapshot)
@enforce_types
def load_snapshot_details(snapshot: Model, out_dir: Optional[str]=None) -> Model:
    """check for an existing snapshot archive in the given directory,
    and load+merge it into the given snapshot

    fixed: called `load_json_snapshot_details`, which is neither imported nor
    defined in this module (guaranteed NameError) — use load_json_snapshot,
    the loader this module imports and uses everywhere else.
    """
    out_dir = out_dir or Path(snapshot.snapshot_dir)

    existing_snapshot = load_json_snapshot(Path(out_dir))
    if existing_snapshot:
        return merge_snapshots(existing_snapshot, snapshot)

    return snapshot
2019-04-28 09:26:24 +12:00
# Maps each --filter-type value to a builder producing the django Q object
# used by q_filter() below.
LINK_FILTERS = {
    'exact': lambda pattern: Q(url=pattern),
    'substring': lambda pattern: Q(url__icontains=pattern),
    'regex': lambda pattern: Q(url__iregex=pattern),
    # match a domain under any of the archivable schemes
    'domain': lambda pattern: Q(url__istartswith=f"http://{pattern}") | Q(url__istartswith=f"https://{pattern}") | Q(url__istartswith=f"ftp://{pattern}"),
    'tag': lambda pattern: Q(tags__name=pattern),
}
@enforce_types
def q_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type: str='exact') -> QuerySet:
    """Filter a snapshot QuerySet by OR-ing together one Q object per pattern,
    built by the LINK_FILTERS entry selected via filter_type."""
    combined_q = Q()  # renamed: the original local shadowed the function name
    for pattern in filter_patterns:
        try:
            combined_q |= LINK_FILTERS[filter_type](pattern)
        except KeyError:
            stderr()
            stderr(
                f'[X] Got invalid pattern for --filter-type={filter_type}:',
                color='red',
            )
            stderr(f' {pattern}')
            raise SystemExit(2)
    return snapshots.filter(combined_q)
2019-04-28 09:26:24 +12:00
def search_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type: str='search') -> QuerySet:
    """Filter snapshots to those matching every pattern via the configured
    full-text search backend; exits with status 2 if the backend is disabled
    or a query fails."""
    if not search_backend_enabled():
        stderr()
        stderr(
            '[X] The search backend is not enabled, set config.USE_SEARCHING_BACKEND = True',
            color='red',
        )
        raise SystemExit(2)
    from core.models import Snapshot

    qsearch = Snapshot.objects.none()
    for pattern in filter_patterns:
        try:
            qsearch |= query_search_index(pattern)
        except Exception:
            # fixed: was a bare `except:` that also swallowed
            # KeyboardInterrupt/SystemExit
            raise SystemExit(2)

    return snapshots & qsearch
@enforce_types
def snapshot_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type: str='exact') -> QuerySet:
    """Dispatch to the full-text search backend for 'search' filters,
    otherwise build a plain Q-object filter."""
    if filter_type == 'search':
        return search_filter(snapshots, filter_patterns, filter_type)
    return q_filter(snapshots, filter_patterns, filter_type)
2019-04-28 09:26:24 +12:00
2020-12-31 06:25:32 +13:00
def get_indexed_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Model]]:
    """indexed links without checking archive status or data directory validity"""
    # every snapshot in the index, keyed by its data dir path
    return {snapshot.snapshot_dir: snapshot for snapshot in snapshots}
2019-04-28 09:26:24 +12:00
2020-12-31 06:25:32 +13:00
def get_archived_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Model]]:
    """indexed links that are archived with a valid data directory"""
    # keep only snapshots passing is_archived(), keyed by data dir path
    return {snapshot.snapshot_dir: snapshot for snapshot in filter(is_archived, snapshots)}
2019-04-28 09:26:24 +12:00
2020-12-31 06:25:32 +13:00
def get_unarchived_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Model]]:
    """indexed links that are unarchived with no data directory or an empty data directory"""
    # keep only snapshots passing is_unarchived(), keyed by data dir path
    return {snapshot.snapshot_dir: snapshot for snapshot in filter(is_unarchived, snapshots)}
2019-04-28 09:26:24 +12:00
2020-12-31 06:25:32 +13:00
def get_present_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Model]]:
    """dirs that actually exist in the archive/ folder

    Returns {folder_name: Snapshot-or-None} for every directory under
    archive/; None when the folder's detail index can't be loaded.
    """
    all_folders = {}

    for entry in (out_dir / ARCHIVE_DIR_NAME).iterdir():
        if entry.is_dir():
            snapshot = None
            try:
                # fixed: iterdir() yields pathlib.Path objects, which have no
                # .path attribute — the old Path(entry.path) always raised
                # AttributeError (silently caught), leaving every value None
                snapshot = load_json_snapshot(entry)
            except Exception:
                pass
            all_folders[entry.name] = snapshot

    return all_folders
2020-12-31 06:25:32 +13:00
def get_valid_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Model]]:
    """dirs with a valid index matched to the main index and archived content"""
    # keep only snapshots passing is_valid(), keyed by data dir path
    return {snapshot.snapshot_dir: snapshot for snapshot in filter(is_valid, snapshots)}
2019-04-28 09:26:24 +12:00
2020-12-31 06:25:32 +13:00
def get_invalid_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Model]]:
    """dirs that are invalid for any reason: corrupted/duplicate/orphaned/unrecognized"""
    # fixed: previously hard-coded out_dir=OUTPUT_DIR in every call below,
    # silently ignoring the out_dir argument
    duplicate = get_duplicate_folders(snapshots, out_dir=out_dir)
    orphaned = get_orphaned_folders(snapshots, out_dir=out_dir)
    corrupted = get_corrupted_folders(snapshots, out_dir=out_dir)
    unrecognized = get_unrecognized_folders(snapshots, out_dir=out_dir)
    return {**duplicate, **orphaned, **corrupted, **unrecognized}
2020-12-31 06:25:32 +13:00
def get_duplicate_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Model]]:
    """dirs that conflict with other directories that have the same link URL or timestamp"""
    # occurrence counters: a folder is flagged once its timestamp/url
    # has already been seen on an earlier folder in this scan
    by_url = {}
    by_timestamp = {}
    duplicate_folders = {}

    # on-disk data dirs whose timestamp-named folder is NOT in the main index
    data_folders = (
        str(entry)
        for entry in (Path(out_dir) / ARCHIVE_DIR_NAME).iterdir()
        if entry.is_dir() and not snapshots.filter(timestamp=entry.name).exists()
    )

    # scan indexed snapshots first, then the orphan data dirs
    for path in chain(snapshots.iterator(), data_folders):
        snapshot = None
        if type(path) is not str:
            # indexed entries are model instances; normalize to their dir path
            path = path.snapshot_dir
        try:
            snapshot = load_json_snapshot(Path(path))
        except Exception:
            # unreadable detail index: folder can't be compared, skip it
            pass

        if snapshot:
            # snapshot folder has same timestamp as different link folder
            by_timestamp[snapshot.timestamp] = by_timestamp.get(snapshot.timestamp, 0) + 1
            if by_timestamp[snapshot.timestamp] > 1:
                duplicate_folders[path] = snapshot

            # link folder has same url as different link folder
            by_url[snapshot.url] = by_url.get(snapshot.url, 0) + 1
            if by_url[snapshot.url] > 1:
                duplicate_folders[path] = snapshot

    return duplicate_folders
2020-12-31 06:25:32 +13:00
def get_orphaned_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Model]]:
    """dirs that contain a valid index but aren't listed in the main index"""
    orphaned_folders = {}

    for entry in (Path(out_dir) / ARCHIVE_DIR_NAME).iterdir():
        if not entry.is_dir():
            continue

        try:
            snapshot = load_json_snapshot(entry)
        except Exception:
            snapshot = None

        # a valid data dir with index details that the main index doesn't know about
        if snapshot and not snapshots.filter(timestamp=entry.name).exists():
            orphaned_folders[str(entry)] = snapshot

    return orphaned_folders
2020-12-31 06:25:32 +13:00
def get_corrupted_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Model]]:
    """dirs that don't contain a valid index and aren't listed in the main index"""
    return {
        snapshot.snapshot_dir: snapshot
        for snapshot in snapshots.iterator()
        if is_corrupt(snapshot)
    }
2020-12-31 06:25:32 +13:00
def get_unrecognized_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Model]]:
    """dirs that don't contain recognizable archive data and aren't listed in the main index"""
    unrecognized_folders: Dict[str, Optional[Model]] = {}

    for entry in (Path(out_dir) / ARCHIVE_DIR_NAME).iterdir():
        if entry.is_dir():
            index_exists = (entry / "index.json").exists()
            snapshot = None
            try:
                snapshot = load_json_snapshot(entry)
            except KeyError:
                # Try to fix index
                if index_exists:
                    # repair is not implemented yet, so snapshot stays None
                    pass
                    # TODO: Implement the `guess` bit for snapshots
                    # try:
                    #     # Last attempt to repair the detail index
                    #     link_guessed = parse_json_snapshot_details(str(entry), guess=True)
                    #     write_json_snapshot_details(link_guessed, out_dir=str(entry))
                    #     link = parse_json_link_details(str(entry))
                    # except Exception:
                    #     pass

            if index_exists and snapshot is None:
                # index exists but it's corrupted or unparseable
                unrecognized_folders[str(entry)] = snapshot

            elif not index_exists:
                # link details index doesn't exist and the folder isn't in the main index
                timestamp = entry.name
                if not snapshots.filter(timestamp=timestamp).exists():
                    unrecognized_folders[str(entry)] = snapshot

    return unrecognized_folders
2020-12-31 06:25:32 +13:00
def is_valid(snapshot: Model) -> bool:
    """True only when the snapshot's data dir exists, contains an index.json,
    and that index parses to the same url as the snapshot."""
    snapshot_dir = Path(snapshot.snapshot_dir)
    if not snapshot_dir.exists():
        # unarchived snapshots are not included in the valid list
        return False
    if not (snapshot_dir / "index.json").exists():
        return False
    try:
        # TODO: review if the `guess` was necessary here
        parsed_snapshot = load_json_snapshot(snapshot.snapshot_dir)
        return snapshot.url == parsed_snapshot.url
    except Exception:
        pass
    return False
2020-12-31 06:25:32 +13:00
def is_corrupt(snapshot: Model) -> bool:
    """True when a data dir exists on disk but fails validation."""
    if not Path(snapshot.snapshot_dir).exists():
        # unarchived snapshots are not considered corrupt
        return False
    return not is_valid(snapshot)
2020-12-31 06:25:32 +13:00
def is_archived(snapshot: Model) -> bool:
    # archived = valid on-disk data dir AND the snapshot flags itself archived
    return is_valid(snapshot) and snapshot.is_archived
2019-04-28 09:26:24 +12:00
2020-12-31 06:25:32 +13:00
def is_unarchived(snapshot: Model) -> bool:
    """True when the snapshot has no data dir at all, or has one but
    isn't flagged archived."""
    if Path(snapshot.snapshot_dir).exists():
        return not snapshot.is_archived
    return True
2019-04-28 09:26:24 +12:00
2020-09-04 10:26:49 +12:00
def fix_invalid_folder_locations(out_dir: Path=OUTPUT_DIR) -> Tuple[List[str], List[str]]:
    """Move any archive data dir whose folder name doesn't match its
    snapshot's timestamp to archive/<timestamp>.

    Returns (fixed, cant_fix): paths successfully moved, and paths whose
    correct destination already exists so they were left alone.
    """
    fixed = []
    cant_fix = []
    for entry in os.scandir(out_dir / ARCHIVE_DIR_NAME):
        if entry.is_dir(follow_symlinks=True):
            if (Path(entry.path) / 'index.json').exists():
                try:
                    snapshot = load_json_snapshot(Path(entry.path))
                except KeyError:
                    snapshot = None
                if not snapshot:
                    # no parseable detail index: nothing to compare against
                    continue

                if not entry.path.endswith(f'/{snapshot.timestamp}'):
                    dest = out_dir / ARCHIVE_DIR_NAME / snapshot.timestamp
                    if dest.exists():
                        cant_fix.append(entry.path)
                    else:
                        shutil.move(entry.path, dest)
                        # fixed: return type promises List[str], not Path
                        fixed.append(str(dest))
                        timestamp = entry.path.rsplit('/', 1)[-1]
                        # NOTE(review): these asserts compare against the
                        # pre-move entry.path — confirm intent upstream
                        assert snapshot.snapshot_dir == entry.path
                        assert snapshot.timestamp == timestamp
                        write_json_snapshot_details(snapshot, out_dir=entry.path)
    return fixed, cant_fix