Cherry-pick some Sourcery refactoring suggestions

This commit is contained in:
derrod 2022-10-25 15:38:55 +02:00
parent 85f6bd3220
commit 0e23b8e4f0
14 changed files with 109 additions and 141 deletions

View file

@ -51,10 +51,7 @@ class EPCAPI:
self.language_code = lc
self.country_code = cc
if timeout > 0:
self.request_timeout = timeout
else:
self.request_timeout = None
self.request_timeout = timeout if timeout > 0 else None
def get_auth_url(self):
login_url = 'https://www.epicgames.com/id/login?redirectUrl='
@ -236,10 +233,8 @@ class EPCAPI:
return records
def get_user_cloud_saves(self, app_name='', manifests=False, filenames=None):
if app_name and manifests:
app_name += '/manifests/'
elif app_name:
app_name += '/'
if app_name:
app_name += '/manifests/' if manifests else '/'
user_id = self.user.get('account_id')

View file

@ -247,7 +247,7 @@ class LegendaryCLI:
elif _store:
print(f' ! This game has to be installed through a third-party store ({_store}, not supported)')
else:
print(f' ! No version information (unknown cause)')
print(' ! No version information (unknown cause)')
# Games that have assets, but only require a one-time activation before they can be independently installed
# via a third-party platform (e.g. Uplay)
if game.partner_link_type:
@ -380,15 +380,16 @@ class LegendaryCLI:
writer.writerow(['path', 'hash', 'size', 'install_tags'])
writer.writerows((fm.filename, fm.hash.hex(), fm.file_size, '|'.join(fm.install_tags)) for fm in files)
elif args.json:
_files = []
for fm in files:
_files.append(dict(
_files = [
dict(
filename=fm.filename,
sha_hash=fm.hash.hex(),
install_tags=fm.install_tags,
file_size=fm.file_size,
flags=fm.flags,
))
flags=fm.flags
)
for fm in files
]
return self._print_json(_files, args.pretty_json)
else:
install_tags = set()
@ -430,7 +431,7 @@ class LegendaryCLI:
if not self.core.login():
logger.error('Login failed! Cannot continue with download process.')
exit(1)
logger.info(f'Cleaning saves...')
logger.info('Cleaning saves...')
self.core.clean_saves(self._resolve_aliases(args.app_name), args.delete_incomplete)
def sync_saves(self, args):
@ -449,10 +450,9 @@ class LegendaryCLI:
# check available saves
saves = self.core.get_save_games()
latest_save = dict()
for save in sorted(saves, key=lambda a: a.datetime):
latest_save[save.app_name] = save
latest_save = {
save.app_name: save for save in sorted(saves, key=lambda a: a.datetime)
}
logger.info(f'Got {len(latest_save)} remote save game(s)')
@ -475,7 +475,7 @@ class LegendaryCLI:
# if there is no saved save path, try to get one
if not igame.save_path:
if args.yes:
logger.info(f'Save path for this title has not been set, skipping due to --yes')
logger.info('Save path for this title has not been set, skipping due to --yes')
continue
save_path = self.core.get_save_path(igame.app_name, platform=igame.platform)

View file

@ -85,7 +85,7 @@ class LegendaryCore:
except Exception as e:
self.log.warning(f'Getting locale failed: {e!r}, falling back to using en-US.')
elif system() != 'Darwin': # macOS doesn't have a default locale we can query
self.log.warning(f'Could not determine locale, falling back to en-US')
self.log.warning('Could not determine locale, falling back to en-US')
self.update_available = False
self.force_show_update = False
@ -123,9 +123,9 @@ class LegendaryCore:
if r.status_code == 200:
return r.json()['code']
else:
self.log.error(f'Getting exchange code failed: {r.json()}')
return ''
self.log.error(f'Getting exchange code failed: {r.json()}')
return ''
def auth_code(self, code) -> bool:
"""
@ -274,10 +274,10 @@ class LegendaryCore:
"""Applies configuration options returned by update API"""
if not version_info:
version_info = self.lgd.get_cached_version()['data']
# if cached data is invalid
if not version_info:
self.log.debug('No cached legendary config to apply.')
return
# if cached data is invalid
if not version_info:
self.log.debug('No cached legendary config to apply.')
return
if 'egl_config' in version_info:
self.egs.update_egs_params(version_info['egl_config'])
@ -342,10 +342,7 @@ class LegendaryCore:
if not self.egs.user:
return []
if self.lgd.assets:
assets = self.lgd.assets.copy()
else:
assets = dict()
assets = self.lgd.assets.copy() if self.lgd.assets else dict()
assets.update({
platform: [
@ -579,9 +576,17 @@ class LegendaryCore:
# get environment overrides from config
env = dict()
if 'default.env' in self.lgd.config:
env.update({k: v for k, v in self.lgd.config[f'default.env'].items() if v and not k.startswith(';')})
env |= {
k: v
for k, v in self.lgd.config['default.env'].items()
if v and not k.startswith(';')
}
if f'{app_name}.env' in self.lgd.config:
env.update({k: v for k, v in self.lgd.config[f'{app_name}.env'].items() if v and not k.startswith(';')})
env |= {
k: v
for k, v in self.lgd.config[f'{app_name}.env'].items()
if v and not k.startswith(';')
}
if disable_wine:
return env
@ -719,10 +724,8 @@ class LegendaryCore:
elif not install.can_run_offline:
self.log.warning('Game is not approved for offline use and may not work correctly.')
user_name = self.lgd.userdata['displayName']
account_id = self.lgd.userdata['account_id']
if user:
user_name = user
user_name = user or self.lgd.userdata['displayName']
params.egl_parameters.extend([
'-AUTH_LOGIN=unused',
@ -760,10 +763,7 @@ class LegendaryCore:
return params
def get_origin_uri(self, app_name: str, offline: bool = False) -> str:
if offline:
token = '0'
else:
token = self.egs.get_game_token()['code']
token = '0' if offline else self.egs.get_game_token()['code']
user_name = self.lgd.userdata['displayName']
account_id = self.lgd.userdata['account_id']
@ -819,18 +819,18 @@ class LegendaryCore:
}
if sys_platform == 'win32':
path_vars.update({
path_vars |= {
'{appdata}': os.path.expandvars('%LOCALAPPDATA%'),
'{userdir}': os.path.expandvars('%userprofile%/documents'),
'{userprofile}': os.path.expandvars('%userprofile%'),
'{usersavedgames}': os.path.expandvars('%userprofile%/Saved Games')
})
'{usersavedgames}': os.path.expandvars('%userprofile%/Saved Games'),
}
elif sys_platform == 'darwin' and platform == 'Mac':
path_vars.update({
path_vars |= {
'{appdata}': os.path.expanduser('~/Library/Application Support'),
'{userdir}': os.path.expanduser('~/Documents'),
'{userlibrary}': os.path.expanduser('~/Library')
})
'{userlibrary}': os.path.expanduser('~/Library'),
}
else:
wine_pfx = None
# on mac CrossOver takes precedence so check for a bottle first
@ -868,10 +868,10 @@ class LegendaryCore:
wine_pfx = mac_get_bottle_path(cx_bottle)
if not wine_pfx:
proton_pfx = os.getenv('STEAM_COMPAT_DATA_PATH')
if proton_pfx:
if proton_pfx := os.getenv('STEAM_COMPAT_DATA_PATH'):
wine_pfx = f'{proton_pfx}/pfx'
wine_pfx = os.getenv('WINEPREFIX', wine_pfx)
else:
wine_pfx = os.getenv('WINEPREFIX', wine_pfx)
# if all else fails, use the WINE default
if not wine_pfx:
@ -1111,7 +1111,7 @@ class LegendaryCore:
missing_chunks += 1
if (0 < missing_chunks < total_chunks and delete_incomplete) or missing_chunks == total_chunks:
self.log.error(f'Chunk(s) missing, marking manifest for deletion.')
self.log.error('Chunk(s) missing, marking manifest for deletion.')
deletion_list.append(fname)
continue
elif 0 < missing_chunks < total_chunks:
@ -1153,10 +1153,7 @@ class LegendaryCore:
for ass in self.get_assets(True):
if ass.app_name == app_name:
if ass.build_version != installed.version:
return False
else:
return True
return ass.build_version == installed.version
# if we get here something is very wrong
raise ValueError(f'Could not find {app_name} in asset list!')
@ -1167,10 +1164,10 @@ class LegendaryCore:
return self._get_installed_game(app_name) is not None
def is_dlc(self, app_name: str) -> bool:
meta = self.lgd.get_game_meta(app_name)
if not meta:
if meta := self.lgd.get_game_meta(app_name):
return meta.is_dlc
else:
raise ValueError('Game unknown!')
return meta.is_dlc
@staticmethod
def load_manifest(data: bytes) -> Manifest:
@ -1252,10 +1249,7 @@ class LegendaryCore:
return None
r = self.egs.unauth_session.get(f'{base_url}/Deltas/{new_build_id}/{old_build_id}.delta')
if r.status_code == 200:
return r.content
else:
return None
return r.content if r.status_code == 200 else None
def prepare_download(self, game: Game, base_game: Game = None, base_path: str = '',
status_q: Queue = None, max_shm: int = 0, max_workers: int = 0,

View file

@ -29,7 +29,7 @@ class DLManager(Process):
self.base_url = base_url
self.dl_dir = download_dir
self.cache_dir = cache_dir if cache_dir else os.path.join(download_dir, '.cache')
self.cache_dir = cache_dir or os.path.join(download_dir, '.cache')
# All the queues!
self.logging_queue = None
@ -37,7 +37,7 @@ class DLManager(Process):
self.writer_queue = None
self.dl_result_q = None
self.writer_result_q = None
self.max_workers = max_workers if max_workers else min(cpu_count() * 2, 16)
self.max_workers = max_workers or min(cpu_count() * 2, 16)
self.dl_timeout = dl_timeout
# Analysis stuff

View file

@ -51,12 +51,12 @@ class DLWorker(Process):
empty = False
except Empty:
if not empty:
logger.debug(f'Queue Empty, waiting for more...')
logger.debug('Queue Empty, waiting for more...')
empty = True
continue
if isinstance(job, TerminateWorkerTask): # let worker die
logger.debug(f'Worker received termination signal, shutting down...')
logger.debug('Worker received termination signal, shutting down...')
break
tries = 0
@ -99,7 +99,7 @@ class DLWorker(Process):
break
if not chunk:
logger.warning(f'Chunk somehow None?')
logger.warning('Chunk somehow None?')
self.o_q.put(DownloaderTaskResult(success=False, **job.__dict__))
continue
@ -107,7 +107,7 @@ class DLWorker(Process):
try:
size = len(chunk.data)
if size > job.shm.size:
logger.fatal(f'Downloaded chunk is longer than SharedMemorySegment!')
logger.fatal('Downloaded chunk is longer than SharedMemorySegment!')
self.shm.buf[job.shm.offset:job.shm.offset + size] = bytes(chunk.data)
del chunk
@ -130,7 +130,7 @@ class FileWorker(Process):
self.q = queue
self.o_q = out_queue
self.base_path = base_path
self.cache_path = cache_path if cache_path else os.path.join(base_path, '.cache')
self.cache_path = cache_path or os.path.join(base_path, '.cache')
self.shm = SharedMemory(name=shm)
self.log_level = logging.getLogger().level
self.logging_queue = logging_queue
@ -143,7 +143,7 @@ class FileWorker(Process):
logger = logging.getLogger(self.name)
logger.setLevel(self.log_level)
logger.debug(f'Download worker reporting for duty!')
logger.debug('Download worker reporting for duty!')
last_filename = ''
current_file = None
@ -159,7 +159,7 @@ class FileWorker(Process):
if isinstance(j, TerminateWorkerTask):
if current_file:
current_file.close()
logger.debug(f'Worker received termination signal, shutting down...')
logger.debug('Worker received termination signal, shutting down...')
# send termination task to results halnder as well
self.o_q.put(TerminateWorkerTask())
break

View file

@ -90,7 +90,7 @@ class LGDLFS:
except Exception as e:
self.log.error(f'Unable to read configuration file, please ensure that file is valid! '
f'(Error: {repr(e)})')
self.log.warning(f'Continuing with blank config in safe-mode...')
self.log.warning('Continuing with blank config in safe-mode...')
self.config.read_only = True
# make sure "Legendary" section exists
@ -221,8 +221,7 @@ class LGDLFS:
f.write(manifest_data)
def get_game_meta(self, app_name):
_meta = self._game_metadata.get(app_name, None)
if _meta:
if _meta := self._game_metadata.get(app_name, None):
return Game.from_json(_meta)
return None
@ -233,14 +232,14 @@ class LGDLFS:
json.dump(json_meta, open(meta_file, 'w'), indent=2, sort_keys=True)
def delete_game_meta(self, app_name):
if app_name in self._game_metadata:
del self._game_metadata[app_name]
meta_file = os.path.join(self.path, 'metadata', f'{app_name}.json')
if os.path.exists(meta_file):
os.remove(meta_file)
else:
if app_name not in self._game_metadata:
raise ValueError(f'Game {app_name} does not exist in metadata DB!')
del self._game_metadata[app_name]
meta_file = os.path.join(self.path, 'metadata', f'{app_name}.json')
if os.path.exists(meta_file):
os.remove(meta_file)
def get_game_app_names(self):
return sorted(self._game_metadata.keys())
@ -264,9 +263,16 @@ class LGDLFS:
self.log.warning(f'Failed to delete file "{f}": {e!r}')
def clean_manifests(self, in_use):
in_use_files = set(f'{clean_filename(f"{app_name}_{version}")}.manifest' for app_name, version, _ in in_use)
in_use_files |= set(f'{clean_filename(f"{app_name}_{platform}_{version}")}.manifest'
for app_name, version, platform in in_use)
in_use_files = {
f'{clean_filename(f"{app_name}_{version}")}.manifest'
for app_name, version, _ in in_use
}
in_use_files |= {
f'{clean_filename(f"{app_name}_{platform}_{version}")}.manifest'
for app_name, version, platform in in_use
}
for f in os.listdir(os.path.join(self.path, 'manifests')):
if f not in in_use_files:
try:
@ -282,8 +288,7 @@ class LGDLFS:
self.log.debug(f'Failed to load installed game data: {e!r}')
return None
game_json = self._installed.get(app_name, None)
if game_json:
if game_json := self._installed.get(app_name, None):
return InstalledGame.from_json(game_json)
return None
@ -392,7 +397,7 @@ class LGDLFS:
def get_overlay_install_info(self):
if not self._overlay_install_info:
try:
data = json.load(open(os.path.join(self.path, f'overlay_install.json')))
data = json.load(open(os.path.join(self.path, 'overlay_install.json')))
self._overlay_install_info = InstalledGame.from_json(data)
except Exception as e:
self.log.debug(f'Failed to load overlay install data: {e!r}')
@ -440,9 +445,7 @@ class LGDLFS:
def serialise_sets(obj):
"""Turn sets into sorted lists for storage"""
if isinstance(obj, set):
return sorted(obj)
return obj
return sorted(obj) if isinstance(obj, set) else obj
json.dump(alias_map, open(os.path.join(self.path, 'aliases.json'), 'w', newline='\n'),
indent=2, sort_keys=True, default=serialise_sets)

View file

@ -113,10 +113,7 @@ class Chunk:
return _chunk
def write(self, fp=None, compress=True):
if not fp:
bio = BytesIO()
else:
bio = fp
bio = fp or BytesIO()
self.uncompressed_size = self.compressed_size = len(self.data)
if compress or self.compressed:
@ -143,7 +140,4 @@ class Chunk:
# finally, add the data
bio.write(self._data)
if not fp:
return bio.getvalue()
else:
return bio.tell()
return bio.tell() if fp else bio.getvalue()

View file

@ -145,9 +145,9 @@ class EGLManifest:
tmp.executable = igame.executable
tmp.main_game_appname = game.app_name # todo for DLC support this needs to be the base game
tmp.app_folder_name = game.metadata.get('customAttributes', {}).get('FolderName', {}).get('value', '')
tmp.manifest_location = igame.install_path + '/.egstore'
tmp.manifest_location = f'{igame.install_path}/.egstore'
tmp.ownership_token = igame.requires_ot
tmp.staging_location = igame.install_path + '/.egstore/bps'
tmp.staging_location = f'{igame.install_path}/.egstore/bps'
tmp.can_run_offline = igame.can_run_offline
tmp.is_incomplete_install = False
tmp.needs_validation = igame.needs_verification

View file

@ -92,8 +92,7 @@ class Manifest:
_m.file_manifest_list = FML.read(_tmp)
_m.custom_fields = CustomFields.read(_tmp)
unhandled_data = _tmp.read()
if unhandled_data:
if unhandled_data := _tmp.read():
logger.warning(f'Did not read {len(unhandled_data)} remaining bytes in manifest! '
f'This may not be a problem.')
@ -152,10 +151,7 @@ class Manifest:
self.data = zlib.compress(self.data)
self.size_compressed = len(self.data)
if not fp:
bio = BytesIO()
else:
bio = fp
bio = fp or BytesIO()
bio.write(struct.pack('<I', self.header_magic))
bio.write(struct.pack('<I', self.header_size))
@ -166,10 +162,7 @@ class Manifest:
bio.write(struct.pack('<I', self.serialisation_version))
bio.write(self.data)
if not fp:
return bio.getvalue()
else:
return bio.tell()
return bio.tell() if fp else bio.getvalue()
class ManifestMeta:
@ -226,7 +219,7 @@ class ManifestMeta:
# This is a list though I've never seen more than one entry
entries = struct.unpack('<I', bio.read(4))[0]
for i in range(entries):
for _ in range(entries):
_meta.prereq_ids.append(read_fstring(bio))
_meta.prereq_name = read_fstring(bio)
@ -348,7 +341,7 @@ class CDL:
# the way this data is stored is rather odd, maybe there's a nicer way to write this...
for i in range(_cdl.count):
for _ in range(_cdl.count):
_cdl.elements.append(ChunkInfo(manifest_version=manifest_version))
# guid, doesn't seem to be a standard like UUID but is fairly straightfoward, 4 bytes, 128 bit.
@ -495,7 +488,7 @@ class FML:
_fml.version = struct.unpack('B', bio.read(1))[0]
_fml.count = struct.unpack('<I', bio.read(4))[0]
for i in range(_fml.count):
for _ in range(_fml.count):
_fml.elements.append(FileManifest())
for fm in _fml.elements:
@ -516,14 +509,14 @@ class FML:
# install tags, no idea what they do, I've only seen them in the Fortnite manifest
for fm in _fml.elements:
_elem = struct.unpack('<I', bio.read(4))[0]
for i in range(_elem):
for _ in range(_elem):
fm.install_tags.append(read_fstring(bio))
# Each file is made up of "Chunk Parts" that can be spread across the "chunk stream"
for fm in _fml.elements:
_elem = struct.unpack('<I', bio.read(4))[0]
_offset = 0
for i in range(_elem):
for _ in range(_elem):
chunkp = ChunkPart()
_start = bio.tell()
_size = struct.unpack('<I', bio.read(4))[0]
@ -709,15 +702,8 @@ class CustomFields:
_cf.version = struct.unpack('B', bio.read(1))[0]
_cf.count = struct.unpack('<I', bio.read(4))[0]
_keys = []
_values = []
for i in range(_cf.count):
_keys.append(read_fstring(bio))
for i in range(_cf.count):
_values.append(read_fstring(bio))
_keys = [read_fstring(bio) for _ in range(_cf.count)]
_values = [read_fstring(bio) for _ in range(_cf.count)]
_cf._dict = dict(zip(_keys, _values))
if (size_read := bio.tell() - cf_start) != _cf.size:
@ -766,8 +752,7 @@ class ManifestComparison:
old_files = {fm.filename: fm.hash for fm in old_manifest.file_manifest_list.elements}
for fm in manifest.file_manifest_list.elements:
old_file_hash = old_files.pop(fm.filename, None)
if old_file_hash:
if old_file_hash := old_files.pop(fm.filename, None):
if fm.hash == old_file_hash:
comp.unchanged.add(fm.filename)
else:

View file

@ -1,8 +1,5 @@
def get_boolean_choice(prompt, default=True):
if default:
yn = 'Y/n'
else:
yn = 'y/N'
yn = 'Y/n' if default else 'y/N'
choice = input(f'{prompt} [{yn}]: ')
if not choice:
@ -21,10 +18,10 @@ def get_int_choice(prompt, default=None, min_choice=None, max_choice=None, retur
while True:
try:
inp = input(prompt)
if not inp:
if inp := input(prompt):
choice = int(inp)
else:
return default
choice = int(inp)
except ValueError:
if return_on_invalid:
return None
@ -61,7 +58,7 @@ def sdl_prompt(sdl_data, title):
examples = ', '.join([g for g in sdl_data.keys() if g != '__required'][:2])
print(f'Please enter tags of pack(s) to install (space/comma-separated, e.g. "{examples}")')
print('Leave blank to use defaults (only required data will be downloaded).')
choices = input(f'Additional packs [Enter to confirm]: ')
choices = input('Additional packs [Enter to confirm]: ')
if not choices:
return tags

View file

@ -5,7 +5,7 @@ class HiddenAliasSubparsersAction(argparse._SubParsersAction):
def add_parser(self, name, **kwargs):
# set prog from the existing prefix
if kwargs.get('prog') is None:
kwargs['prog'] = '%s %s' % (self._prog_prefix, name)
kwargs['prog'] = f'{self._prog_prefix} {name}'
aliases = kwargs.pop('aliases', ())
hide_aliases = kwargs.pop('hide_aliases', False)

View file

@ -22,7 +22,7 @@ except Exception as e:
login_url = 'https://www.epicgames.com/id/login'
sid_url = 'https://www.epicgames.com/id/api/redirect?'
logout_url = 'https://www.epicgames.com/id/logout?productName=epic-games&redirectUrl=' + login_url
logout_url = f'https://www.epicgames.com/id/logout?productName=epic-games&redirectUrl={login_url}'
goodbye_url = 'https://legendary.gl/goodbye'
window_js = '''
window.ue = {
@ -102,7 +102,7 @@ class MockLauncher:
def trigger_sid_exchange(self, *args, **kwargs):
# check if code-based login hasn't already set the destroy flag
if not self.destroy_on_load:
logger.debug(f'Injecting SID JS')
logger.debug('Injecting SID JS')
# inject JS to get SID API response and call our API
self.window.evaluate_js(get_sid_js)
@ -139,6 +139,6 @@ def do_webview_login(callback_sid=None, callback_code=None):
return None
if api.callback_result is None:
logger.error(f'Login aborted by user.')
logger.error('Login aborted by user.')
return api.callback_result