1
0
Fork 0
mirror of synced 2024-06-30 20:10:35 +12:00
ArchiveBox/archivebox/index/sql.py

153 lines
4.9 KiB
Python
Raw Permalink Normal View History

2019-04-28 09:26:24 +12:00
__package__ = 'archivebox.index'
import re
from io import StringIO
2020-09-04 10:26:49 +12:00
from pathlib import Path
from typing import List, Tuple, Iterator
from django.db.models import QuerySet
2020-12-06 13:11:36 +13:00
from django.db import transaction
2019-04-28 09:26:24 +12:00
from .schema import Link
from ..util import enforce_types, parse_date
from ..config import (
OUTPUT_DIR,
TAG_SEPARATOR_PATTERN,
)
### Main Links Index
@enforce_types
def parse_sql_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[Link]:
    """Return a generator of Link objects, one per Snapshot row in the database.

    Each Snapshot is serialised with `as_json(*Snapshot.keys)` and rebuilt
    through `Link.from_json`. `out_dir` is accepted but not consulted here.
    """
    # Imported at call time rather than at module import time.
    from core.models import Snapshot

    all_snapshots = Snapshot.objects.all()
    return (
        Link.from_json(snapshot.as_json(*Snapshot.keys))
        for snapshot in all_snapshots
    )
@enforce_types
def remove_from_sql_main_index(snapshots: QuerySet, atomic: bool=False, out_dir: Path=OUTPUT_DIR) -> None:
    """Delete every row selected by `snapshots`.

    When `atomic` is true the delete runs inside `transaction.atomic()`.
    Note that the value of `snapshots.delete()` is handed back to the caller
    even though the signature is annotated `-> None`. `out_dir` is unused.
    """
    if not atomic:
        return snapshots.delete()

    with transaction.atomic():
        return snapshots.delete()
@enforce_types
def write_link_to_sql_index(link: Link):
    """Create or update the Snapshot row for `link`, its tags, and its ArchiveResults.

    Returns the Snapshot instance produced by `update_or_create`.
    """
    from core.models import Snapshot, ArchiveResult

    # Keep only the Link fields that are listed in Snapshot.keys.
    info = {
        field: value
        for field, value in link._asdict().items()
        if field in Snapshot.keys
    }

    # Split the tag string, strip each piece, and de-duplicate while keeping
    # first-seen order (dict keys preserve insertion order).
    stripped_tags = (
        piece.strip()
        for piece in re.split(TAG_SEPARATOR_PATTERN, link.tags or '')
    )
    tag_list = list(dict.fromkeys(stripped_tags))
    # Tags are written through save_tags() below, not through `defaults`.
    info.pop('tags')

    try:
        # A snapshot already exists for this URL: reuse its timestamp.
        info["timestamp"] = Snapshot.objects.get(url=link.url).timestamp
    except Snapshot.DoesNotExist:
        # New URL: keep adding 1.0 to the timestamp until no existing
        # snapshot is using the same value.
        candidate = info["timestamp"]
        while Snapshot.objects.filter(timestamp=candidate).exists():
            candidate = str(float(candidate) + 1.0)
        info["timestamp"] = candidate

    snapshot = Snapshot.objects.update_or_create(url=link.url, defaults=info)[0]
    snapshot.save_tags(tag_list)

    for extractor, entries in link.history.items():
        for entry in entries:
            if isinstance(entry, dict):
                # History entry given as a plain dict.
                # NOTE(review): dict entries go through get_or_create (existing
                # rows are left untouched) while object entries go through
                # update_or_create -- confirm the asymmetry is intentional.
                started = parse_date(entry['start_ts'])
                fields = {
                    'end_ts': parse_date(entry['end_ts']),
                    'cmd': entry['cmd'],
                    'output': entry['output'],
                    'cmd_version': entry.get('cmd_version') or 'unknown',
                    'pwd': entry['pwd'],
                    'status': entry['status'],
                }
                persist = ArchiveResult.objects.get_or_create
            else:
                # History entry given as an object exposing the same fields
                # as attributes.
                started = parse_date(entry.start_ts)
                fields = {
                    'end_ts': parse_date(entry.end_ts),
                    'cmd': entry.cmd,
                    'output': entry.output,
                    'cmd_version': entry.cmd_version or 'unknown',
                    'pwd': entry.pwd,
                    'status': entry.status,
                }
                persist = ArchiveResult.objects.update_or_create

            # Rows are matched on (snapshot, extractor, start_ts).
            persist(
                snapshot_id=snapshot.id,
                extractor=extractor,
                start_ts=started,
                defaults=fields,
            )

    return snapshot
@enforce_types
def write_sql_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR) -> None:
    """Write every link in `links` to the SQL index, one at a time.

    Each link is handed to `write_link_to_sql_index`; no transaction is
    opened around the individual writes here (a per-link
    `transaction.atomic()` wrapper existed previously but is disabled).
    `out_dir` is unused.
    """
    for current_link in links:
        write_link_to_sql_index(current_link)
2019-04-28 09:26:24 +12:00
@enforce_types
def write_sql_link_details(link: Link, out_dir: Path=OUTPUT_DIR) -> None:
    """Persist the title and tags of `link` onto its Snapshot row.

    The Snapshot is looked up by URL; if none exists it is first created via
    `write_link_to_sql_index`. `out_dir` is unused.
    """
    from core.models import Snapshot

    # NOTE: an earlier variant ran this lookup inside transaction.atomic();
    # that wrapper is currently disabled.
    try:
        snapshot = Snapshot.objects.get(url=link.url)
    except Snapshot.DoesNotExist:
        snapshot = write_link_to_sql_index(link)

    snapshot.title = link.title

    # Split the tag string, strip each piece, and de-duplicate while keeping
    # first-seen order.
    stripped_tags = (
        piece.strip()
        for piece in re.split(TAG_SEPARATOR_PATTERN, link.tags or '')
    )
    tag_list = list(dict.fromkeys(stripped_tags))

    snapshot.save()
    snapshot.save_tags(tag_list)
@enforce_types
def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
    """Return `(is_applied, migration_name)` pairs parsed from `showmigrations`.

    The command is run with `list=True` and its output captured in memory.
    Only lines containing `]` are considered; a line counts as applied when
    an `X` appears before the first `]`. `out_dir` is unused.
    """
    from django.core.management import call_command

    captured = StringIO()
    call_command("showmigrations", list=True, stdout=captured)
    captured.seek(0)

    parsed = []
    for raw_line in captured:
        text = raw_line.strip()
        if not text or ']' not in text:
            continue
        # Everything before the first ']' is the status box, the rest is the name.
        status_part, _, name_part = text.partition(']')
        parsed.append(('X' in status_part, name_part.strip()))

    return parsed
@enforce_types
def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
    """Run `makemigrations` then `migrate` and return the migrate output lines.

    Both commands run non-interactively. The `makemigrations` output is
    captured and discarded; the `migrate` output is returned as a list of
    stripped, non-empty lines. `out_dir` is unused.
    """
    from django.core.management import call_command

    discarded = StringIO()
    migrate_log = StringIO()
    call_command("makemigrations", interactive=False, stdout=discarded)
    call_command("migrate", interactive=False, stdout=migrate_log)

    migrate_log.seek(0)
    stripped_lines = (raw_line.strip() for raw_line in migrate_log)
    return [text for text in stripped_lines if text]
2019-04-25 03:37:30 +12:00
@enforce_types
def get_admins(out_dir: Path=OUTPUT_DIR) -> List[str]:
    """Return the users whose `is_superuser` flag is set.

    The value returned is the result of `User.objects.filter(...)` itself
    (User rows, not plain strings), despite the `List[str]` annotation.
    `out_dir` is unused.
    """
    from django.contrib.auth.models import User

    superusers = User.objects.filter(is_superuser=True)
    return superusers