This commit is contained in:
toxicrecker 2024-01-01 10:13:11 -07:00 committed by GitHub
commit ba387a58cf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 121 additions and 12 deletions

View file

@ -6,6 +6,7 @@ import csv
import json
import logging
import os
import shutil
import shlex
import subprocess
import time
@ -21,7 +22,7 @@ from legendary import __version__, __codename__
from legendary.core import LegendaryCore
from legendary.models.exceptions import InvalidCredentialsError
from legendary.models.game import SaveGameStatus, VerifyResult, Game
from legendary.utils.cli import get_boolean_choice, get_int_choice, sdl_prompt, strtobool
from legendary.utils.cli import get_boolean_choice, get_int_choice, sdl_prompt, strtobool, scan_dir
from legendary.lfs.crossover import *
from legendary.utils.custom_parser import HiddenAliasSubparsersAction
from legendary.utils.env import is_windows_mac_or_pyi
@ -2599,25 +2600,119 @@ class LegendaryCLI:
old_base, game_folder = os.path.split(igame.install_path.replace('\\', '/'))
new_path = os.path.join(args.new_path, game_folder)
logger.info(f'Moving "{game_folder}" from "{old_base}" to "{args.new_path}"')
if not args.skip_move:
try:
if not os.path.exists(args.new_path):
os.makedirs(args.new_path)
if not os.path.exists(args.new_path):
os.makedirs(args.new_path)
if shutil._destinsrc(igame.install_path, new_path):
logger.error(f'Cannot move the folder into itself.')
return
if (shutil._is_immutable(igame.install_path) or (not os.access(igame.install_path, os.W_OK) \
and os.listdir(igame.install_path) and sys_platform == 'darwin')):
logger.error(f'Cannot move the directory "{igame.install_path}", lacking write permission to it.')
return
existing_files = dict(__total__ = 0)
existing_chunks = 0
if os.path.exists(new_path):
if not get_boolean_choice(f'"{game_folder}" is found in the target path, would you like to resume moving this game?'):
return
logger.info('Attempting to resume the process... DO NOT PANIC IF IT LOOKS STUCK')
manifest_data, _ = self.core.get_installed_manifest(igame.app_name)
if manifest_data is None:
logger.critical(f'Manifest appears to be missing! To repair, run "legendary repair '
f'{args.app_name} --repair-and-update", this will however redownload all files '
f'that do not match the latest manifest in their entirety.')
return
manifest = self.core.load_manifest(manifest_data)
files = manifest.file_manifest_list.elements
if config_tags := self.core.lgd.config.get(args.app_name, 'install_tags', fallback=None):
install_tags = set(i.strip() for i in config_tags.split(','))
file_list = [
(f.filename, f.sha_hash.hex())
for f in files
if any(it in install_tags for it in f.install_tags) or not f.install_tags
]
else:
file_list = [(f.filename, f.sha_hash.hex()) for f in files]
for result, path, _, bytes_read in validate_files(new_path, file_list):
if result == VerifyResult.HASH_MATCH:
path = path.replace('\\', '/').split('/')
dir, filename = ('/'.join(path[:-1]), path[-1]) # 'foo/bar/baz.txt' -> ('foo/bar', 'baz.txt')
if dir not in existing_files:
existing_files[dir] = []
existing_files[dir].append(filename)
existing_files['__total__'] += 1
existing_chunks += bytes_read
os.rename(igame.install_path, new_path)
def _ignore(dir, existing_files, game_folder):
dir = dir.replace('\\', '/').split(f'{game_folder}')[1:][0]
if dir.startswith('/'): dir = dir[1:]
return existing_files.get(dir, [])
ignore = lambda dir, _: _ignore(dir, existing_files, game_folder)
logger.info('This process could be stopped and resumed by interrupting it with CTRL-C')
if not get_boolean_choice(f'Are you sure you wish to move "{igame.title}" from "{old_base}" to "{args.new_path}"?'):
print('Aborting...')
exit(0)
try:
data = dict(
**dict(zip(('total_files', 'total_chunks'), scan_dir(igame.install_path))),
copied_files = 0 + existing_files['__total__'],
last_copied_chunks = 0,
copied_chunks = 0 + existing_chunks,
**dict.fromkeys(('start', 'last'), time.perf_counter())
)
def _copy_function(src, dst, data):
shutil.copy2(src, dst, follow_symlinks=True)
size = os.path.getsize(dst)
data['last_copied_chunks'] += size
data['copied_chunks'] += size
data['copied_files'] += 1
now = time.perf_counter()
runtime = now - data['start']
delta = now - data['last']
if delta < 1 and data['copied_files'] < data['total_files']: # to prevent spamming the console
return
data['last'] = now
speed = data['last_copied_chunks'] / delta
data['last_copied_chunks'] = 0
perc = data['copied_files'] / data['total_files'] * 100
average_speed = data['copied_chunks'] / runtime
estimate = (data['total_chunks'] - data['copied_chunks']) / average_speed
minutes, seconds = int(estimate//60), int(estimate%60)
hours, minutes = int(minutes//60), int(minutes%60)
rt_minutes, rt_seconds = int(runtime//60), int(runtime%60)
rt_hours, rt_minutes = int(rt_minutes//60), int(rt_minutes%60)
logger.info(f'= Progress: {perc:.02f}% ({data["copied_files"]}/{data["total_files"]}), ')
logger.info(f' + Running for {rt_hours:02d}:{rt_minutes:02d}:{rt_seconds:02d}, ')
logger.info(f' + ETA: {hours:02d}:{minutes:02d}:{seconds:02d}')
logger.info(f' + Speed: {speed / 1024 / 1024:.02f} MiB/s')
def copy_function(src, dst, *_, **__):
_copy_function(src, dst, data)
shutil.copytree(igame.install_path, new_path, copy_function=copy_function, dirs_exist_ok=True, ignore=ignore)
except Exception as e:
if isinstance(e, OSError) and e.errno == 18:
logger.error(f'Moving to a different drive is not supported. Move the folder manually to '
f'"{new_path}" and run "legendary move {app_name} "{args.new_path}" --skip-move"')
elif isinstance(e, FileExistsError):
logger.error(f'The target path already contains a folder called "{game_folder}", '
f'please remove or rename it first.')
if isinstance(e, PermissionError):
logger.error(f'Cannot move the directory "{igame.install_path}", lacking write permission to it.')
else:
logger.error(f'Moving failed with unknown error {e!r}.')
logger.info(f'Try moving the folder manually to "{new_path}" and running '
f'"legendary move {app_name} "{args.new_path}" --skip-move"')
return
except KeyboardInterrupt:
logger.info('The process has been cancelled.')
return
try:
shutil.rmtree(igame.install_path)
except KeyboardInterrupt:
logger.info('The process cannot be cancelled now. Please wait patiently for a few seconds.')
shutil.rmtree(igame.install_path)
else:
logger.info(f'Not moving, just rewriting legendary metadata...')

View file

@ -1,3 +1,5 @@
import os
def get_boolean_choice(prompt, default=True):
yn = 'Y/n' if default else 'y/N'
@ -89,3 +91,15 @@ def strtobool(val):
else:
raise ValueError("invalid truth value %r" % (val,))
def scan_dir(src):
files = 0
chunks = 0
for entry in os.scandir(src):
if entry.is_dir():
cnt, sz = scan_dir(entry.path)
files += cnt
chunks += sz
else:
files += 1
chunks += entry.stat().st_size
return files, chunks