legendary/legendary/lfs/utils.py
derrod e26b9e60ff [core/lfs] Use filelock for user data
Closes #566

Co-authored-by: Mathis Dröge <mathis.droege@ewe.net>
2023-06-17 22:58:16 +02:00


# coding: utf-8

import os
import shutil
import hashlib
import json
import logging

from pathlib import Path
from sys import stdout
from time import perf_counter
from typing import List, Iterator

from filelock import FileLock

from legendary.models.game import VerifyResult

logger = logging.getLogger('LFS Utils')

def delete_folder(path: str, recursive=True) -> bool:
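    """
    Deletes the folder at `path`. With recursive=True (the default) the whole tree
    is removed via shutil.rmtree(); otherwise os.removedirs() is used, which only
    succeeds for empty directories (and also prunes empty parent directories).

    :return: True on success, False if deletion failed
    """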
    try:
        logger.debug(f'Deleting "{path}", recursive={recursive}...')
        if not recursive:
            os.removedirs(path)
        else:
            shutil.rmtree(path)
    except Exception as e:
        logger.error(f'Failed deleting files with {e!r}')
        return False
    else:
        return True


def delete_filelist(path: str, filenames: List[str],
                    delete_root_directory: bool = False,
                    silent: bool = False) -> bool:
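    """
    Deletes the given files below `path`, then attempts to remove the directories
    they were in (deepest first) and, optionally, `path` itself.

    :param path: base directory the relative file paths are resolved against
    :param filenames: relative paths of the files to delete
    :param delete_root_directory: (optional) also try to remove `path` itself
    :param silent: (optional) suppress error logging for individual failures
    :return: True if no errors occurred, False otherwise
    """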
    dirs = set()
    no_error = True

    # delete all files that were installed
    for filename in filenames:
        _dir, _fn = os.path.split(filename)
        if _dir:
            dirs.add(_dir)

        try:
            os.remove(os.path.join(path, _dir, _fn))
        except Exception as e:
            if not silent:
                logger.error(f'Failed deleting file {filename} with {e!r}')
            no_error = False

    # add intermediate directories that would have been missed otherwise
    for _dir in sorted(dirs):
        head, _ = os.path.split(_dir)
        while head:
            dirs.add(head)
            head, _ = os.path.split(head)

    # remove all directories
    for _dir in sorted(dirs, key=len, reverse=True):
        try:
            os.rmdir(os.path.join(path, _dir))
        except FileNotFoundError:
            # directory has already been deleted, ignore that
            continue
        except Exception as e:
            if not silent:
                logger.error(f'Failed removing directory "{_dir}" with {e!r}')
            no_error = False

    if delete_root_directory:
        try:
            os.rmdir(path)
        except Exception as e:
            if not silent:
                logger.error(f'Removing game directory failed with {e!r}')

    return no_error


def validate_files(base_path: str, filelist: List[tuple], hash_type='sha1',
                   large_file_threshold=1024 * 1024 * 512) -> Iterator[tuple]:
    """
    Validates the files in filelist in path against the provided hashes

    :param base_path: path in which the files are located
    :param filelist: list of tuples in format (path, hash [hex])
    :param hash_type: (optional) type of hash, default is sha1
    :param large_file_threshold: (optional) threshold for large files, default is 512 MiB
    :return: yields tuples in format (VerifyResult, path, hash [hex], bytes read)
    """

    if not filelist:
        raise ValueError('No files to validate!')

    if not os.path.exists(base_path):
        raise OSError('Path does not exist')

    for file_path, file_hash in filelist:
        full_path = os.path.join(base_path, file_path)
        # logger.debug(f'Checking "{file_path}"...')

        if not os.path.exists(full_path):
            yield VerifyResult.FILE_MISSING, file_path, '', 0
            continue

        show_progress = False
        interval = 0
        speed = 0.0
        start_time = 0.0

        try:
            _size = os.path.getsize(full_path)
            if _size > large_file_threshold:
                # enable progress indicator and go to new line
                stdout.write('\n')
                show_progress = True
                interval = (_size / (1024 * 1024)) // 100
                start_time = perf_counter()

            with open(full_path, 'rb') as f:
                real_file_hash = hashlib.new(hash_type)
                i = 0
                while chunk := f.read(1024*1024):
                    real_file_hash.update(chunk)
                    if show_progress and i % interval == 0:
                        pos = f.tell()
                        perc = (pos / _size) * 100
                        speed = pos / 1024 / 1024 / (perf_counter() - start_time)
                        stdout.write(f'\r=> Verifying large file "{file_path}": {perc:.0f}% '
                                     f'({pos / 1024 / 1024:.1f}/{_size / 1024 / 1024:.1f} MiB) '
                                     f'[{speed:.1f} MiB/s]\t')
                        stdout.flush()
                    i += 1

                if show_progress:
                    stdout.write(f'\r=> Verifying large file "{file_path}": 100% '
                                 f'({_size / 1024 / 1024:.1f}/{_size / 1024 / 1024:.1f} MiB) '
                                 f'[{speed:.1f} MiB/s]\t\n')

                result_hash = real_file_hash.hexdigest()
                if file_hash != result_hash:
                    yield VerifyResult.HASH_MISMATCH, file_path, result_hash, f.tell()
                else:
                    yield VerifyResult.HASH_MATCH, file_path, result_hash, f.tell()
        except Exception as e:
            logger.fatal(f'Could not verify "{file_path}"; opening failed with: {e!r}')
            yield VerifyResult.OTHER_ERROR, file_path, '', 0
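
# Illustrative sketch of how validate_files() might be driven (not part of the
# original module); the base path and manifest entry below are hypothetical.
#
#   files = [('Game/Binaries/Win64/Game.exe', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')]
#   for result, path, result_hash, bytes_read in validate_files('/games/Example', files):
#       if result != VerifyResult.HASH_MATCH:
#           print(f'{path}: {result.name}')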


def clean_filename(filename):
    return ''.join(i for i in filename if i not in '<>:"/\\|?*')


def get_dir_size(path):
    return sum(f.stat().st_size for f in Path(path).glob('**/*') if f.is_file())

class LockedJSONData(FileLock):
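    """
    Context manager that guards a JSON file with a file lock (a '<name>.lock'
    sibling file) and loads its contents on enter. On exit the file is written
    back if `data` was replaced with a different object, or deleted if clear()
    was called.
    """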
    def __init__(self, file_path: str):
        super().__init__(file_path + '.lock')
        self._file_path = file_path
        self._data = None
        self._initial_data = None

    def __enter__(self):
        super().__enter__()

        if os.path.exists(self._file_path):
            with open(self._file_path, 'r', encoding='utf-8') as f:
                self._data = json.load(f)
                self._initial_data = self._data

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        super().__exit__(exc_type, exc_val, exc_tb)

        if self._data != self._initial_data:
            if self._data is not None:
                with open(self._file_path, 'w', encoding='utf-8') as f:
                    json.dump(self._data, f, indent=2, sort_keys=True)
            else:
                if os.path.exists(self._file_path):
                    os.remove(self._file_path)

    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, new_data):
        if new_data is None:
            raise ValueError('Invalid new data, use clear() explicitly to reset file data')
        self._data = new_data

    def clear(self):
        self._data = None
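
# Illustrative usage sketch for LockedJSONData (not part of the original module);
# the file name is hypothetical. Note that changes only persist when the `data`
# property is assigned a new object: __exit__ compares against the object loaded
# in __enter__, so mutating the loaded dict in place would not be detected.
#
#   with LockedJSONData('user.json') as lock:
#       updated = dict(lock.data or {})
#       updated['displayName'] = 'example'
#       lock.data = updated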