1
0
Fork 0
mirror of synced 2024-06-26 10:00:19 +12:00

add functions to parse link details jsons and list+apply migrations

This commit is contained in:
Nick Sweeting 2019-04-24 04:07:46 -04:00
parent 0b27f33d2e
commit 0f2497a2a6
2 changed files with 41 additions and 1 deletions

View file

@ -15,6 +15,7 @@ from ..config import (
GIT_SHA,
DEPENDENCIES,
JSON_INDEX_FILENAME,
ARCHIVE_DIR_NAME,
)
from ..util import (
enforce_types,
@ -98,3 +99,12 @@ def parse_json_link_details(out_dir: str) -> Optional[Link]:
link_json = json.load(f)
return Link.from_json(link_json)
return None
@enforce_types
def parse_json_links_details(out_dir: str) -> Iterator[Link]:
"""read through all the archive data folders and return the parsed links"""
for entry in os.scandir(os.path.join(out_dir, ARCHIVE_DIR_NAME)):
if entry.is_dir(follow_symlinks=True):
if os.path.exists(os.path.join(entry.path, 'index.json')):
yield parse_json_link_details(entry.path)

View file

@ -1,6 +1,7 @@
__package__ = 'archivebox.legacy.storage'
from typing import List, Iterator
from io import StringIO
from typing import List, Tuple, Iterator
from ..schema import Link
from ..util import enforce_types
@ -27,3 +28,32 @@ def write_sql_main_index(links: List[Link], out_dir: str=OUTPUT_DIR) -> None:
for link in links:
info = {k: v for k, v in link._asdict().items() if k in Page.keys}
Page.objects.update_or_create(url=link.url, defaults=info)
@enforce_types
def list_migrations(out_dir: str=OUTPUT_DIR) -> List[Tuple[bool, str]]:
setup_django(out_dir, check_db=False)
from django.core.management import call_command
out = StringIO()
call_command("showmigrations", list=True, stdout=out)
out.seek(0)
migrations = []
for line in out.readlines():
if line.strip() and ']' in line:
status_str, name_str = line.strip().split(']', 1)
is_applied = 'X' in status_str
migration_name = name_str.strip()
migrations.append((is_applied, migration_name))
return migrations
@enforce_types
def apply_migrations(out_dir: str=OUTPUT_DIR) -> List[str]:
setup_django(out_dir, check_db=False)
from django.core.management import call_command
null, out = StringIO(), StringIO()
call_command("makemigrations", interactive=False, stdout=null)
call_command("migrate", interactive=False, stdout=out)
out.seek(0)
return [line.strip() for line in out.readlines() if line.strip()]