mirror of
https://github.com/derrod/legendary.git
synced 2024-06-16 09:35:02 +12:00
Compare commits
48 commits
Author | SHA1 | Date | |
---|---|---|---|
7fefdc4973 | |||
96e07ff453 | |||
ac6290627c | |||
691048d481 | |||
837c166187 | |||
1841da51f0 | |||
56d439ed2d | |||
2fdacb75d3 | |||
d2963db5b2 | |||
f1d815797f | |||
591039eaf3 | |||
9131f32c22 | |||
450784283d | |||
c56a81ab64 | |||
488d14c6e0 | |||
4c765325af | |||
c6e622f3ae | |||
013f7d4bde | |||
03b21f49de | |||
bd2e7ca0cd | |||
20b121bdb9 | |||
b759d9dbb1 | |||
51377e8548 | |||
07a16f7b84 | |||
c69301212c | |||
865dd51e2b | |||
6536473063 | |||
6d7909c311 | |||
0e35b70941 | |||
e0428b497e | |||
6500ea73af | |||
96b155800a | |||
4145381b93 | |||
e26b9e60ff | |||
bdd53fb8f8 | |||
bbb19d6cb6 | |||
175168adcb | |||
8b2809779f | |||
4bed49e7e1 | |||
f97d799e87 | |||
09d39b3fe3 | |||
a70ac2d1f9 | |||
362287543b | |||
ae05b4c1e5 | |||
6b8273f983 | |||
00f025dcc9 | |||
87b01b77d8 | |||
f19a1ba69d |
40
.github/workflows/python.yml
vendored
40
.github/workflows/python.yml
vendored
|
@ -16,9 +16,9 @@ jobs:
|
|||
max-parallel: 3
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- uses: actions/setup-python@v2
|
||||
- uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: '3.9'
|
||||
|
||||
|
@ -27,6 +27,7 @@ jobs:
|
|||
setuptools
|
||||
pyinstaller
|
||||
requests
|
||||
filelock
|
||||
|
||||
- name: Optional dependencies (WebView)
|
||||
run: pip3 install --upgrade pywebview
|
||||
|
@ -48,7 +49,7 @@ jobs:
|
|||
env:
|
||||
PYTHONOPTIMIZE: 1
|
||||
|
||||
- uses: actions/upload-artifact@v2
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: ${{ runner.os }}-package
|
||||
path: legendary/dist/*
|
||||
|
@ -57,26 +58,23 @@ jobs:
|
|||
runs-on: ubuntu-22.04
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Dependencies
|
||||
run: sudo apt install
|
||||
python3-all
|
||||
python3-stdeb
|
||||
dh-python
|
||||
python3-requests
|
||||
python3-setuptools
|
||||
python3-wheel
|
||||
|
||||
- name: Webview Dependencies
|
||||
run: sudo apt install
|
||||
python3-webview
|
||||
python3-gi
|
||||
python3-gi-cairo
|
||||
gir1.2-gtk-3.0
|
||||
run: |
|
||||
sudo apt install ruby
|
||||
sudo gem install fpm
|
||||
|
||||
- name: Build
|
||||
run: python3 setup.py --command-packages=stdeb.command bdist_deb
|
||||
run: fpm
|
||||
--input-type python
|
||||
--output-type deb
|
||||
--python-package-name-prefix python3
|
||||
--deb-suggests python3-webview
|
||||
--maintainer "Rodney <rodney@rodney.io>"
|
||||
--category python
|
||||
--depends "python3 >= 3.9"
|
||||
setup.py
|
||||
|
||||
- name: Os version
|
||||
id: os_version
|
||||
|
@ -84,7 +82,7 @@ jobs:
|
|||
source /etc/os-release
|
||||
echo ::set-output name=version::$NAME-$VERSION_ID
|
||||
|
||||
- uses: actions/upload-artifact@v2
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: ${{ steps.os_version.outputs.version }}-deb-package
|
||||
path: deb_dist/*.deb
|
||||
path: ./*.deb
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
"""Legendary!"""
|
||||
|
||||
__version__ = '0.20.31'
|
||||
__codename__ = 'Dark Energy (hotfix #5)'
|
||||
__version__ = '0.20.34'
|
||||
__codename__ = 'Direct Intervention'
|
||||
|
|
|
@ -121,9 +121,16 @@ class EPCAPI:
|
|||
r.raise_for_status()
|
||||
|
||||
j = r.json()
|
||||
if 'error' in j:
|
||||
self.log.warning(f'Login to EGS API failed with errorCode: {j["errorCode"]}')
|
||||
if 'errorCode' in j:
|
||||
if j['errorCode'] == 'errors.com.epicgames.oauth.corrective_action_required':
|
||||
self.log.error(f'{j["errorMessage"]} ({j["correctiveAction"]}), '
|
||||
f'open the following URL to take action: {j["continuationUrl"]}')
|
||||
else:
|
||||
self.log.error(f'Login to EGS API failed with errorCode: {j["errorCode"]}')
|
||||
raise InvalidCredentialsError(j['errorCode'])
|
||||
elif r.status_code >= 400:
|
||||
self.log.error(f'EGS API responded with status {r.status_code} but no error in response: {j}')
|
||||
raise InvalidCredentialsError('Unknown error')
|
||||
|
||||
self.session.headers['Authorization'] = f'bearer {j["access_token"]}'
|
||||
# only set user info when using non-anonymous login
|
||||
|
@ -179,13 +186,24 @@ class EPCAPI:
|
|||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def get_user_entitlements(self):
|
||||
def get_user_entitlements(self, start=0):
|
||||
user_id = self.user.get('account_id')
|
||||
r = self.session.get(f'https://{self._entitlements_host}/entitlement/api/account/{user_id}/entitlements',
|
||||
params=dict(start=0, count=5000), timeout=self.request_timeout)
|
||||
params=dict(start=start, count=1000), timeout=self.request_timeout)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def get_user_entitlements_full(self):
|
||||
ret = []
|
||||
|
||||
while True:
|
||||
resp = self.get_user_entitlements(start=len(ret))
|
||||
ret.extend(resp)
|
||||
if len(resp) < 1000:
|
||||
break
|
||||
|
||||
return ret
|
||||
|
||||
def get_game_info(self, namespace, catalog_item_id, timeout=None):
|
||||
r = self.session.get(f'https://{self._catalog_host}/catalog/api/shared/namespace/{namespace}/bulk/items',
|
||||
params=dict(id=catalog_item_id, includeDLCDetails=True, includeMainGameDetails=True,
|
||||
|
|
154
legendary/cli.py
154
legendary/cli.py
|
@ -1,4 +1,4 @@
|
|||
#!/usr/bin/env python
|
||||
#!/usr/bin/env python3
|
||||
# coding: utf-8
|
||||
|
||||
import argparse
|
||||
|
@ -28,7 +28,7 @@ from legendary.utils.env import is_windows_mac_or_pyi
|
|||
from legendary.lfs.eos import add_registry_entries, query_registry_entries, remove_registry_entries
|
||||
from legendary.lfs.utils import validate_files, clean_filename
|
||||
from legendary.utils.selective_dl import get_sdl_appname
|
||||
from legendary.lfs.wine_helpers import read_registry, get_shell_folders
|
||||
from legendary.lfs.wine_helpers import read_registry, get_shell_folders, case_insensitive_file_search
|
||||
|
||||
# todo custom formatter for cli logger (clean info, highlighted error/warning)
|
||||
logging.basicConfig(
|
||||
|
@ -104,11 +104,11 @@ class LegendaryCLI:
|
|||
|
||||
if not egl_wine_pfx:
|
||||
logger.info('Please enter the path to the Wine prefix that has EGL installed')
|
||||
wine_pfx = input('Path [empty input to quit]: ').strip()
|
||||
if not wine_pfx:
|
||||
egl_wine_pfx = input('Path [empty input to quit]: ').strip()
|
||||
if not egl_wine_pfx:
|
||||
print('Empty input, quitting...')
|
||||
exit(0)
|
||||
if not os.path.exists(wine_pfx) and os.path.isdir(wine_pfx):
|
||||
if not os.path.exists(egl_wine_pfx) and os.path.isdir(egl_wine_pfx):
|
||||
print('Path is invalid (does not exist)!')
|
||||
exit(1)
|
||||
|
||||
|
@ -144,7 +144,7 @@ class LegendaryCLI:
|
|||
|
||||
exchange_token = ''
|
||||
auth_code = ''
|
||||
if not args.auth_code and not args.session_id:
|
||||
if not args.auth_code and not args.session_id and not args.ex_token:
|
||||
# only import here since pywebview import is slow
|
||||
from legendary.utils.webview_login import webview_available, do_webview_login
|
||||
|
||||
|
@ -162,7 +162,8 @@ class LegendaryCLI:
|
|||
else:
|
||||
auth_code = auth_code.strip('"')
|
||||
else:
|
||||
if do_webview_login(callback_code=self.core.auth_ex_token):
|
||||
if do_webview_login(callback_code=self.core.auth_ex_token,
|
||||
user_agent=f'EpicGamesLauncher/{self.core.get_egl_version()}'):
|
||||
logger.info(f'Successfully logged in as "{self.core.lgd.userdata["displayName"]}" via WebView')
|
||||
else:
|
||||
logger.error('WebView login attempt failed, please see log for details.')
|
||||
|
@ -315,7 +316,7 @@ class LegendaryCLI:
|
|||
|
||||
print('\nInstalled games:')
|
||||
for game in games:
|
||||
if game.install_size == 0:
|
||||
if game.install_size == 0 and self.core.lgd.lock_installed():
|
||||
logger.debug(f'Updating missing size for {game.app_name}')
|
||||
m = self.core.load_manifest(self.core.get_installed_manifest(game.app_name)[0])
|
||||
game.install_size = sum(fm.file_size for fm in m.file_manifest_list.elements)
|
||||
|
@ -371,6 +372,8 @@ class LegendaryCLI:
|
|||
|
||||
if args.install_tag:
|
||||
files = [fm for fm in files if args.install_tag in fm.install_tags]
|
||||
elif args.install_tag is not None:
|
||||
files = [fm for fm in files if not fm.install_tags]
|
||||
|
||||
if args.hashlist:
|
||||
for fm in files:
|
||||
|
@ -415,7 +418,11 @@ class LegendaryCLI:
|
|||
print('Save games:')
|
||||
for save in sorted(saves, key=lambda a: a.app_name + a.manifest_name):
|
||||
if save.app_name != last_app:
|
||||
game_title = self.core.get_game(save.app_name).app_title
|
||||
if game := self.core.get_game(save.app_name):
|
||||
game_title = game.app_title
|
||||
else:
|
||||
game_title = 'Unknown'
|
||||
|
||||
last_app = save.app_name
|
||||
print(f'- {game_title} ("{save.app_name}")')
|
||||
print(' +', save.manifest_name)
|
||||
|
@ -449,7 +456,7 @@ class LegendaryCLI:
|
|||
igames = [igame]
|
||||
|
||||
# check available saves
|
||||
saves = self.core.get_save_games()
|
||||
saves = self.core.get_save_games(args.app_name if args.app_name else '')
|
||||
latest_save = {
|
||||
save.app_name: save for save in sorted(saves, key=lambda a: a.datetime)
|
||||
}
|
||||
|
@ -468,12 +475,15 @@ class LegendaryCLI:
|
|||
logger.info(f'Checking "{igame.title}" ({igame.app_name})')
|
||||
# override save path only if app name is specified
|
||||
if args.app_name and args.save_path:
|
||||
if not self.core.lgd.lock_installed():
|
||||
logger.error('Unable to lock install data, cannot modify save path.')
|
||||
break
|
||||
logger.info(f'Overriding save path with "{args.save_path}"...')
|
||||
igame.save_path = args.save_path
|
||||
self.core.lgd.set_installed_game(igame.app_name, igame)
|
||||
|
||||
# if there is no saved save path, try to get one
|
||||
if not igame.save_path:
|
||||
# if there is no saved save path, try to get one, skip if we cannot get a install data lock
|
||||
if not igame.save_path and self.core.lgd.lock_installed():
|
||||
if args.yes and not args.accept_path:
|
||||
logger.info('Save path for this title has not been set, skipping due to --yes')
|
||||
continue
|
||||
|
@ -558,6 +568,7 @@ class LegendaryCLI:
|
|||
|
||||
def launch_game(self, args, extra):
|
||||
app_name = self._resolve_aliases(args.app_name)
|
||||
addon_app_name = None
|
||||
|
||||
# Interactive CrossOver setup
|
||||
if args.crossover and sys_platform == 'darwin':
|
||||
|
@ -568,12 +579,19 @@ class LegendaryCLI:
|
|||
return self._launch_origin(args)
|
||||
|
||||
igame = self.core.get_installed_game(app_name)
|
||||
if (not igame or not igame.executable) and (game := self.core.get_game(app_name)) is not None:
|
||||
# override installed game with base title
|
||||
if game.is_launchable_addon:
|
||||
addon_app_name = app_name
|
||||
app_name = game.metadata['mainGameItem']['releaseInfo'][0]['appId']
|
||||
igame = self.core.get_installed_game(app_name)
|
||||
|
||||
if not igame:
|
||||
logger.error(f'Game {app_name} is not currently installed!')
|
||||
exit(1)
|
||||
|
||||
if igame.is_dlc:
|
||||
logger.error(f'{app_name} is DLC; please launch the base game instead!')
|
||||
if igame.is_dlc and not igame.executable:
|
||||
logger.error(f'{app_name} is DLC without an executable; please launch the base game instead!')
|
||||
exit(1)
|
||||
|
||||
if not os.path.exists(igame.install_path):
|
||||
|
@ -612,7 +630,8 @@ class LegendaryCLI:
|
|||
disable_wine=args.no_wine,
|
||||
executable_override=args.executable_override,
|
||||
crossover_app=args.crossover_app,
|
||||
crossover_bottle=args.crossover_bottle)
|
||||
crossover_bottle=args.crossover_bottle,
|
||||
addon_app_name=addon_app_name)
|
||||
|
||||
if args.set_defaults:
|
||||
self.core.lgd.config[app_name] = dict()
|
||||
|
@ -772,6 +791,8 @@ class LegendaryCLI:
|
|||
f'wrapper in the configuration file or command line. See the README for details.')
|
||||
return
|
||||
|
||||
# You cannot launch a URI without start.exe
|
||||
command.append('start')
|
||||
command.append(origin_uri)
|
||||
if args.dry_run:
|
||||
if cmd:
|
||||
|
@ -792,6 +813,11 @@ class LegendaryCLI:
|
|||
subprocess.Popen(command, env=full_env)
|
||||
|
||||
def install_game(self, args):
|
||||
if not self.core.lgd.lock_installed():
|
||||
logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may '
|
||||
'install/import/move applications at a time.')
|
||||
return
|
||||
|
||||
args.app_name = self._resolve_aliases(args.app_name)
|
||||
if self.core.is_installed(args.app_name):
|
||||
igame = self.core.get_installed_game(args.app_name)
|
||||
|
@ -897,7 +923,7 @@ class LegendaryCLI:
|
|||
if config_tags:
|
||||
self.core.lgd.config.remove_option(game.app_name, 'install_tags')
|
||||
config_tags = None
|
||||
self.core.lgd.config.set(game.app_name, 'disable_sdl', True)
|
||||
self.core.lgd.config.set(game.app_name, 'disable_sdl', 'true')
|
||||
sdl_enabled = False
|
||||
# just disable SDL, but keep config tags that have been manually specified
|
||||
elif config_disable_sdl or args.disable_sdl:
|
||||
|
@ -951,7 +977,8 @@ class LegendaryCLI:
|
|||
disable_delta=args.disable_delta,
|
||||
override_delta_manifest=args.override_delta_manifest,
|
||||
preferred_cdn=args.preferred_cdn,
|
||||
disable_https=args.disable_https)
|
||||
disable_https=args.disable_https,
|
||||
bind_ip=args.bind_ip)
|
||||
|
||||
# game is either up-to-date or hasn't changed, so we have nothing to do
|
||||
if not analysis.dl_size:
|
||||
|
@ -972,6 +999,15 @@ class LegendaryCLI:
|
|||
self.core.uninstall_tag(old_igame)
|
||||
self.core.install_game(old_igame)
|
||||
|
||||
if old_igame.install_tags:
|
||||
self.core.lgd.config.set(game.app_name, 'install_tags', ','.join(old_igame.install_tags))
|
||||
self.core.lgd.save_config()
|
||||
|
||||
# check if the version changed, this can happen for DLC that gets a version bump with no actual file changes
|
||||
if old_igame and old_igame.version != igame.version:
|
||||
old_igame.version = igame.version
|
||||
self.core.install_game(old_igame)
|
||||
|
||||
exit(0)
|
||||
|
||||
logger.info(f'Install size: {analysis.install_size / 1024 / 1024:.02f} MiB')
|
||||
|
@ -1120,6 +1156,11 @@ class LegendaryCLI:
|
|||
logger.info('Automatic installation not available on Linux.')
|
||||
|
||||
def uninstall_game(self, args):
|
||||
if not self.core.lgd.lock_installed():
|
||||
logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may '
|
||||
'install/import/move applications at a time.')
|
||||
return
|
||||
|
||||
args.app_name = self._resolve_aliases(args.app_name)
|
||||
igame = self.core.get_installed_game(args.app_name)
|
||||
if not igame:
|
||||
|
@ -1131,6 +1172,9 @@ class LegendaryCLI:
|
|||
print('Aborting...')
|
||||
exit(0)
|
||||
|
||||
if os.name == 'nt' and igame.uninstaller and not args.skip_uninstaller:
|
||||
self._handle_uninstaller(igame, args.yes)
|
||||
|
||||
try:
|
||||
if not igame.is_dlc:
|
||||
# Remove DLC first so directory is empty when game uninstall runs
|
||||
|
@ -1147,6 +1191,23 @@ class LegendaryCLI:
|
|||
except Exception as e:
|
||||
logger.warning(f'Removing game failed: {e!r}, please remove {igame.install_path} manually.')
|
||||
|
||||
def _handle_uninstaller(self, igame, yes=False):
|
||||
uninstaller = igame.uninstaller
|
||||
|
||||
print('\nThis game provides the following uninstaller:')
|
||||
print(f'- {uninstaller["path"]} {uninstaller["args"]}\n')
|
||||
|
||||
if yes or get_boolean_choice('Do you wish to run the uninstaller?', default=True):
|
||||
logger.info('Running uninstaller...')
|
||||
req_path, req_exec = os.path.split(uninstaller['path'])
|
||||
work_dir = os.path.join(igame.install_path, req_path)
|
||||
fullpath = os.path.join(work_dir, req_exec)
|
||||
try:
|
||||
p = subprocess.Popen([fullpath, uninstaller['args']], cwd=work_dir, shell=True)
|
||||
p.wait()
|
||||
except Exception as e:
|
||||
logger.error(f'Failed to run uninstaller: {e!r}')
|
||||
|
||||
def verify_game(self, args, print_command=True, repair_mode=False, repair_online=False):
|
||||
args.app_name = self._resolve_aliases(args.app_name)
|
||||
if not self.core.is_installed(args.app_name):
|
||||
|
@ -1183,7 +1244,7 @@ class LegendaryCLI:
|
|||
key=lambda a: a.filename.lower())
|
||||
|
||||
# build list of hashes
|
||||
if config_tags := self.core.lgd.config.get(args.app_name, 'install_tags', fallback=None):
|
||||
if (config_tags := self.core.lgd.config.get(args.app_name, 'install_tags', fallback=None)) is not None:
|
||||
install_tags = set(i.strip() for i in config_tags.split(','))
|
||||
file_list = [
|
||||
(f.filename, f.sha_hash.hex())
|
||||
|
@ -1250,6 +1311,11 @@ class LegendaryCLI:
|
|||
logger.info(f'Run "legendary repair {args.app_name}" to repair your game installation.')
|
||||
|
||||
def import_game(self, args):
|
||||
if not self.core.lgd.lock_installed():
|
||||
logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may '
|
||||
'install/import/move applications at a time.')
|
||||
return
|
||||
|
||||
# make sure path is absolute
|
||||
args.app_path = os.path.abspath(args.app_path)
|
||||
args.app_name = self._resolve_aliases(args.app_name)
|
||||
|
@ -1288,6 +1354,8 @@ class LegendaryCLI:
|
|||
# get everything needed for import from core, then run additional checks.
|
||||
manifest, igame = self.core.import_game(game, args.app_path, platform=args.platform)
|
||||
exe_path = os.path.join(args.app_path, manifest.meta.launch_exe.lstrip('/'))
|
||||
if os.name != 'nt':
|
||||
exe_path = case_insensitive_file_search(exe_path)
|
||||
# check if most files at least exist or if user might have specified the wrong directory
|
||||
total = len(manifest.file_manifest_list.elements)
|
||||
found = sum(os.path.exists(os.path.join(args.app_path, f.filename))
|
||||
|
@ -1343,6 +1411,11 @@ class LegendaryCLI:
|
|||
logger.info(f'{"DLC" if game.is_dlc else "Game"} "{game.app_title}" has been imported.')
|
||||
|
||||
def egs_sync(self, args):
|
||||
if not self.core.lgd.lock_installed():
|
||||
logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may '
|
||||
'install/import/move applications at a time.')
|
||||
return
|
||||
|
||||
if args.unlink:
|
||||
logger.info('Unlinking and resetting EGS and LGD sync...')
|
||||
self.core.lgd.config.remove_option('Legendary', 'egl_programdata')
|
||||
|
@ -1584,7 +1657,7 @@ class LegendaryCLI:
|
|||
else:
|
||||
logger.info('Game not installed and offline mode enabled, cannot load manifest.')
|
||||
elif game:
|
||||
entitlements = self.core.egs.get_user_entitlements()
|
||||
entitlements = self.core.egs.get_user_entitlements_full()
|
||||
egl_meta = self.core.egs.get_game_info(game.namespace, game.catalog_item_id)
|
||||
game.metadata = egl_meta
|
||||
# Get manifest if asset exists for current platform
|
||||
|
@ -1622,7 +1695,7 @@ class LegendaryCLI:
|
|||
# Find custom launch options, if available
|
||||
launch_options = []
|
||||
i = 1
|
||||
while f'extraLaunchOption_{i:03d}_Name' in game.metadata['customAttributes']:
|
||||
while f'extraLaunchOption_{i:03d}_Name' in game.metadata.get('customAttributes', {}):
|
||||
launch_options.append((
|
||||
game.metadata['customAttributes'][f'extraLaunchOption_{i:03d}_Name']['value'],
|
||||
game.metadata['customAttributes'][f'extraLaunchOption_{i:03d}_Args']['value']
|
||||
|
@ -1640,6 +1713,9 @@ class LegendaryCLI:
|
|||
else:
|
||||
game_infos.append(InfoItem('Extra launch options', 'launch_options', None, []))
|
||||
|
||||
game_infos.append(InfoItem('Command Line', 'command_line', game.additional_command_line,
|
||||
game.additional_command_line))
|
||||
|
||||
# list all owned DLC based on entitlements
|
||||
if entitlements and not game.is_dlc:
|
||||
owned_entitlements = {i['entitlementName'] for i in entitlements}
|
||||
|
@ -1650,18 +1726,18 @@ class LegendaryCLI:
|
|||
if dlc['entitlementName'] in owned_entitlements:
|
||||
owned_dlc.append((installable, None, dlc['title'], dlc['id']))
|
||||
elif installable:
|
||||
app_name = dlc['releaseInfo'][0]['appId']
|
||||
if app_name in owned_app_names:
|
||||
owned_dlc.append((installable, app_name, dlc['title'], dlc['id']))
|
||||
dlc_app_name = dlc['releaseInfo'][0]['appId']
|
||||
if dlc_app_name in owned_app_names:
|
||||
owned_dlc.append((installable, dlc_app_name, dlc['title'], dlc['id']))
|
||||
|
||||
if owned_dlc:
|
||||
human_list = []
|
||||
json_list = []
|
||||
for installable, app_name, title, dlc_id in owned_dlc:
|
||||
json_list.append(dict(app_name=app_name, title=title,
|
||||
for installable, dlc_app_name, title, dlc_id in owned_dlc:
|
||||
json_list.append(dict(app_name=dlc_app_name, title=title,
|
||||
installable=installable, id=dlc_id))
|
||||
if installable:
|
||||
human_list.append(f'App name: {app_name}, Title: "{title}"')
|
||||
human_list.append(f'App name: {dlc_app_name}, Title: "{title}"')
|
||||
else:
|
||||
human_list.append(f'Title: "{title}" (no installation required)')
|
||||
game_infos.append(InfoItem('Owned DLC', 'owned_dlc', human_list, json_list))
|
||||
|
@ -1746,6 +1822,17 @@ class LegendaryCLI:
|
|||
else:
|
||||
manifest_info.append(InfoItem('Prerequisites', 'prerequisites', None, None))
|
||||
|
||||
if manifest.meta.uninstall_action_path:
|
||||
human_list = [
|
||||
f'Uninstaller path: {manifest.meta.uninstall_action_path}',
|
||||
f'Uninstaller args: {manifest.meta.uninstall_action_args or "(None)"}',
|
||||
]
|
||||
manifest_info.append(InfoItem('Uninstaller', 'uninstaller', human_list,
|
||||
dict(path=manifest.meta.uninstall_action_path,
|
||||
args=manifest.meta.uninstall_action_args)))
|
||||
else:
|
||||
manifest_info.append(InfoItem('Uninstaller', 'uninstaller', None, None))
|
||||
|
||||
install_tags = {''}
|
||||
for fm in manifest.file_manifest_list.elements:
|
||||
for tag in fm.install_tags:
|
||||
|
@ -1959,7 +2046,7 @@ class LegendaryCLI:
|
|||
redeemed = {k['gameId'] for k in key_list if k['redeemedOnUplay']}
|
||||
|
||||
games = self.core.get_game_list()
|
||||
entitlements = self.core.egs.get_user_entitlements()
|
||||
entitlements = self.core.egs.get_user_entitlements_full()
|
||||
owned_entitlements = {i['entitlementName'] for i in entitlements}
|
||||
|
||||
uplay_games = []
|
||||
|
@ -2498,6 +2585,11 @@ class LegendaryCLI:
|
|||
logger.info('Saved choices to configuration.')
|
||||
|
||||
def move(self, args):
|
||||
if not self.core.lgd.lock_installed():
|
||||
logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may '
|
||||
'install/import/move applications at a time.')
|
||||
return
|
||||
|
||||
app_name = self._resolve_aliases(args.app_name)
|
||||
igame = self.core.get_installed_game(app_name, skip_sync=True)
|
||||
if not igame:
|
||||
|
@ -2535,6 +2627,10 @@ class LegendaryCLI:
|
|||
|
||||
|
||||
def main():
|
||||
# Set output encoding to UTF-8 if not outputting to a terminal
|
||||
if not stdout.isatty():
|
||||
stdout.reconfigure(encoding='utf-8')
|
||||
|
||||
parser = argparse.ArgumentParser(description=f'Legendary v{__version__} - "{__codename__}"')
|
||||
parser.register('action', 'parsers', HiddenAliasSubparsersAction)
|
||||
|
||||
|
@ -2700,9 +2796,13 @@ def main():
|
|||
help='Automatically install all DLCs with the base game')
|
||||
install_parser.add_argument('--skip-dlcs', dest='skip_dlcs', action='store_true',
|
||||
help='Do not ask about installing DLCs.')
|
||||
install_parser.add_argument('--bind', dest='bind_ip', action='store', metavar='<IPs>', type=str,
|
||||
help='Comma-separated list of IPs to bind to for downloading')
|
||||
|
||||
uninstall_parser.add_argument('--keep-files', dest='keep_files', action='store_true',
|
||||
help='Keep files but remove game from Legendary database')
|
||||
uninstall_parser.add_argument('--skip-uninstaller', dest='skip_uninstaller', action='store_true',
|
||||
help='Skip running the uninstaller')
|
||||
|
||||
launch_parser.add_argument('--offline', dest='offline', action='store_true',
|
||||
default=False, help='Skip login and launch game without online authentication')
|
||||
|
|
|
@ -131,7 +131,8 @@ class LegendaryCore:
|
|||
Handles authentication via authorization code (either retrieved manually or automatically)
|
||||
"""
|
||||
try:
|
||||
self.lgd.userdata = self.egs.start_session(authorization_code=code)
|
||||
with self.lgd.userdata_lock as lock:
|
||||
lock.data = self.egs.start_session(authorization_code=code)
|
||||
return True
|
||||
except Exception as e:
|
||||
self.log.error(f'Logging in failed with {e!r}, please try again.')
|
||||
|
@ -142,7 +143,8 @@ class LegendaryCore:
|
|||
Handles authentication via exchange token (either retrieved manually or automatically)
|
||||
"""
|
||||
try:
|
||||
self.lgd.userdata = self.egs.start_session(exchange_token=code)
|
||||
with self.lgd.userdata_lock as lock:
|
||||
lock.data = self.egs.start_session(exchange_token=code)
|
||||
return True
|
||||
except Exception as e:
|
||||
self.log.error(f'Logging in failed with {e!r}, please try again.')
|
||||
|
@ -171,22 +173,23 @@ class LegendaryCore:
|
|||
raise ValueError('No login session in config')
|
||||
refresh_token = re_data['Token']
|
||||
try:
|
||||
self.lgd.userdata = self.egs.start_session(refresh_token=refresh_token)
|
||||
with self.lgd.userdata_lock as lock:
|
||||
lock.data = self.egs.start_session(refresh_token=refresh_token)
|
||||
return True
|
||||
except Exception as e:
|
||||
self.log.error(f'Logging in failed with {e!r}, please try again.')
|
||||
return False
|
||||
|
||||
def login(self, force_refresh=False) -> bool:
|
||||
def _login(self, lock, force_refresh=False) -> bool:
|
||||
"""
|
||||
Attempts logging in with existing credentials.
|
||||
|
||||
raises ValueError if no existing credentials or InvalidCredentialsError if the API return an error
|
||||
"""
|
||||
if not self.lgd.userdata:
|
||||
if not lock.data:
|
||||
raise ValueError('No saved credentials')
|
||||
elif self.logged_in and self.lgd.userdata['expires_at']:
|
||||
dt_exp = datetime.fromisoformat(self.lgd.userdata['expires_at'][:-1])
|
||||
elif self.logged_in and lock.data['expires_at']:
|
||||
dt_exp = datetime.fromisoformat(lock.data['expires_at'][:-1])
|
||||
dt_now = datetime.utcnow()
|
||||
td = dt_now - dt_exp
|
||||
|
||||
|
@ -212,8 +215,8 @@ class LegendaryCore:
|
|||
except Exception as e:
|
||||
self.log.warning(f'Checking for EOS Overlay updates failed: {e!r}')
|
||||
|
||||
if self.lgd.userdata['expires_at'] and not force_refresh:
|
||||
dt_exp = datetime.fromisoformat(self.lgd.userdata['expires_at'][:-1])
|
||||
if lock.data['expires_at'] and not force_refresh:
|
||||
dt_exp = datetime.fromisoformat(lock.data['expires_at'][:-1])
|
||||
dt_now = datetime.utcnow()
|
||||
td = dt_now - dt_exp
|
||||
|
||||
|
@ -221,7 +224,7 @@ class LegendaryCore:
|
|||
if dt_exp > dt_now and abs(td.total_seconds()) > 600:
|
||||
self.log.info('Trying to re-use existing login session...')
|
||||
try:
|
||||
self.egs.resume_session(self.lgd.userdata)
|
||||
self.egs.resume_session(lock.data)
|
||||
self.logged_in = True
|
||||
return True
|
||||
except InvalidCredentialsError as e:
|
||||
|
@ -233,19 +236,23 @@ class LegendaryCore:
|
|||
|
||||
try:
|
||||
self.log.info('Logging in...')
|
||||
userdata = self.egs.start_session(self.lgd.userdata['refresh_token'])
|
||||
userdata = self.egs.start_session(lock.data['refresh_token'])
|
||||
except InvalidCredentialsError:
|
||||
self.log.error('Stored credentials are no longer valid! Please login again.')
|
||||
self.lgd.invalidate_userdata()
|
||||
lock.clear()
|
||||
return False
|
||||
except (HTTPError, ConnectionError) as e:
|
||||
self.log.error(f'HTTP request for login failed: {e!r}, please try again later.')
|
||||
return False
|
||||
|
||||
self.lgd.userdata = userdata
|
||||
lock.data = userdata
|
||||
self.logged_in = True
|
||||
return True
|
||||
|
||||
def login(self, force_refresh=False) -> bool:
|
||||
with self.lgd.userdata_lock as lock:
|
||||
return self._login(lock, force_refresh=force_refresh)
|
||||
|
||||
def update_check_enabled(self):
|
||||
return not self.lgd.config.getboolean('Legendary', 'disable_update_check', fallback=False)
|
||||
|
||||
|
@ -294,6 +301,9 @@ class LegendaryCore:
|
|||
if lgd_config := version_info.get('legendary_config'):
|
||||
self.webview_killswitch = lgd_config.get('webview_killswitch', False)
|
||||
|
||||
def get_egl_version(self):
|
||||
return self._egl_version
|
||||
|
||||
def get_update_info(self):
|
||||
return self.lgd.get_cached_version()['data'].get('release_info')
|
||||
|
||||
|
@ -439,7 +449,10 @@ class LegendaryCore:
|
|||
game = Game(app_name=app_name, app_title=eg_meta['title'], metadata=eg_meta, asset_infos=assets[app_name])
|
||||
self.lgd.set_game_meta(game.app_name, game)
|
||||
games[app_name] = game
|
||||
still_needs_update.remove(app_name)
|
||||
try:
|
||||
still_needs_update.remove(app_name)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# setup and teardown of thread pool takes some time, so only do it when it makes sense.
|
||||
still_needs_update = {e[0] for e in fetch_list}
|
||||
|
@ -507,12 +520,16 @@ class LegendaryCore:
|
|||
_dlc = defaultdict(list)
|
||||
# get all the appnames we have to ignore
|
||||
ignore = set(i.app_name for i in self.get_assets())
|
||||
# broken old app name that we should always ignore
|
||||
ignore |= {'1'}
|
||||
|
||||
for libitem in self.egs.get_library_items():
|
||||
if libitem['namespace'] == 'ue' and skip_ue:
|
||||
continue
|
||||
if libitem['appName'] in ignore:
|
||||
continue
|
||||
if libitem['sandboxType'] == 'PRIVATE':
|
||||
continue
|
||||
|
||||
game = self.lgd.get_game_meta(libitem['appName'])
|
||||
if not game or force_refresh:
|
||||
|
@ -672,9 +689,10 @@ class LegendaryCore:
|
|||
disable_wine: bool = False,
|
||||
executable_override: str = None,
|
||||
crossover_app: str = None,
|
||||
crossover_bottle: str = None) -> LaunchParameters:
|
||||
crossover_bottle: str = None,
|
||||
addon_app_name: str = None) -> LaunchParameters:
|
||||
install = self.lgd.get_installed_game(app_name)
|
||||
game = self.lgd.get_game_meta(app_name)
|
||||
game = self.lgd.get_game_meta(addon_app_name if addon_app_name else app_name)
|
||||
|
||||
# Disable wine for non-Windows executables (e.g. native macOS)
|
||||
if not install.platform.startswith('Win'):
|
||||
|
@ -716,6 +734,13 @@ class LegendaryCore:
|
|||
self.log.warning(f'Parsing predefined launch parameters failed with: {e!r}, '
|
||||
f'input: {install.launch_parameters}')
|
||||
|
||||
if meta_args := game.additional_command_line:
|
||||
try:
|
||||
params.game_parameters.extend(shlex.split(meta_args.strip(), posix=False))
|
||||
except ValueError as e:
|
||||
self.log.warning(f'Parsing metadata launch parameters failed with: {e!r}, '
|
||||
f'input: {install.launch_parameters}')
|
||||
|
||||
game_token = ''
|
||||
if not offline:
|
||||
self.log.info('Getting authentication token...')
|
||||
|
@ -1229,7 +1254,13 @@ class LegendaryCore:
|
|||
|
||||
for url in manifest_urls:
|
||||
self.log.debug(f'Trying to download manifest from "{url}"...')
|
||||
r = self.egs.unauth_session.get(url)
|
||||
try:
|
||||
r = self.egs.unauth_session.get(url, timeout=10.0)
|
||||
except Exception as e:
|
||||
self.log.warning(f'Unable to download manifest from "{urlparse(url).netloc}" '
|
||||
f'(Exception: {e!r}), trying next URL...')
|
||||
continue
|
||||
|
||||
if r.status_code == 200:
|
||||
manifest_bytes = r.content
|
||||
break
|
||||
|
@ -1276,7 +1307,7 @@ class LegendaryCore:
|
|||
repair: bool = False, repair_use_latest: bool = False,
|
||||
disable_delta: bool = False, override_delta_manifest: str = '',
|
||||
egl_guid: str = '', preferred_cdn: str = None,
|
||||
disable_https: bool = False) -> (DLManager, AnalysisResult, ManifestMeta):
|
||||
disable_https: bool = False, bind_ip: str = None) -> (DLManager, AnalysisResult, ManifestMeta):
|
||||
# load old manifest
|
||||
old_manifest = None
|
||||
|
||||
|
@ -1377,7 +1408,7 @@ class LegendaryCore:
|
|||
self.log.info(f'"{base_path}" does not exist, creating...')
|
||||
os.makedirs(base_path)
|
||||
|
||||
install_path = os.path.normpath(os.path.join(base_path, game_folder))
|
||||
install_path = os.path.normpath(os.path.join(base_path, game_folder.strip()))
|
||||
|
||||
# check for write access on the install path or its parent directory if it doesn't exist yet
|
||||
base_path = os.path.dirname(install_path)
|
||||
|
@ -1443,7 +1474,7 @@ class LegendaryCore:
|
|||
|
||||
dlm = DLManager(install_path, base_url, resume_file=resume_file, status_q=status_q,
|
||||
max_shared_memory=max_shm * 1024 * 1024, max_workers=max_workers,
|
||||
dl_timeout=dl_timeout)
|
||||
dl_timeout=dl_timeout, bind_ip=bind_ip)
|
||||
anlres = dlm.run_analysis(manifest=new_manifest, old_manifest=old_manifest,
|
||||
patch=not disable_patching, resume=not force,
|
||||
file_prefix_filter=file_prefix_filter,
|
||||
|
@ -1456,6 +1487,11 @@ class LegendaryCore:
|
|||
prereq = dict(ids=new_manifest.meta.prereq_ids, name=new_manifest.meta.prereq_name,
|
||||
path=new_manifest.meta.prereq_path, args=new_manifest.meta.prereq_args)
|
||||
|
||||
uninstaller = None
|
||||
if new_manifest.meta.uninstall_action_path:
|
||||
uninstaller = dict(path=new_manifest.meta.uninstall_action_path,
|
||||
args=new_manifest.meta.uninstall_action_args)
|
||||
|
||||
offline = game.metadata.get('customAttributes', {}).get('CanRunOffline', {}).get('value', 'true')
|
||||
ot = game.metadata.get('customAttributes', {}).get('OwnershipToken', {}).get('value', 'false')
|
||||
|
||||
|
@ -1479,7 +1515,7 @@ class LegendaryCore:
|
|||
can_run_offline=offline == 'true', requires_ot=ot == 'true',
|
||||
is_dlc=base_game is not None, install_size=anlres.install_size,
|
||||
egl_guid=egl_guid, install_tags=file_install_tag,
|
||||
platform=platform)
|
||||
platform=platform, uninstaller=uninstaller)
|
||||
|
||||
return dlm, anlres, igame
|
||||
|
||||
|
@ -1580,6 +1616,21 @@ class LegendaryCore:
|
|||
results.warnings.add('You may want to consider trying one of the following executables '
|
||||
f'(see README for launch parameter/config option usage):\n{alt_str}')
|
||||
|
||||
# Detect EOS service
|
||||
eos_installer = next((f for f in analysis.manifest_comparison.added
|
||||
if 'epiconlineservicesinstaller' in f.lower()), None)
|
||||
has_bootstrapper = any('eosbootstrapper' in f.lower() for f in analysis.manifest_comparison.added)
|
||||
|
||||
if eos_installer:
|
||||
results.warnings.add('This game ships the Epic Online Services Windows service, '
|
||||
'it may have to be installed for the game to work properly. '
|
||||
f'To do so, run "{eos_installer}" inside the game directory '
|
||||
f'after the install has finished.')
|
||||
elif has_bootstrapper:
|
||||
results.warnings.add('This game ships the Epic Online Services bootstrapper. '
|
||||
'The Epic Online Services Windows service may have to be '
|
||||
'installed manually for the game to function properly.')
|
||||
|
||||
return results
|
||||
|
||||
def get_default_install_dir(self, platform='Windows'):
|
||||
|
@ -1733,6 +1784,9 @@ class LegendaryCore:
|
|||
def egl_import(self, app_name):
|
||||
if not self.asset_valid(app_name):
|
||||
raise ValueError(f'To-be-imported game {app_name} not in game asset database!')
|
||||
if not self.lgd.lock_installed():
|
||||
self.log.warning('Could not acquire lock for EGL import')
|
||||
return
|
||||
|
||||
self.log.debug(f'Importing "{app_name}" from EGL')
|
||||
# load egl json file
|
||||
|
@ -1780,9 +1834,12 @@ class LegendaryCore:
|
|||
|
||||
# mark game as installed
|
||||
_ = self._install_game(lgd_igame)
|
||||
return
|
||||
|
||||
def egl_export(self, app_name):
|
||||
if not self.lgd.lock_installed():
|
||||
self.log.warning('Could not acquire lock for EGL import')
|
||||
return
|
||||
|
||||
self.log.debug(f'Exporting "{app_name}" to EGL')
|
||||
# load igame/game
|
||||
lgd_game = self.get_game(app_name)
|
||||
|
@ -1844,6 +1901,10 @@ class LegendaryCore:
|
|||
"""
|
||||
Sync game installs between Legendary and the Epic Games Launcher
|
||||
"""
|
||||
if not self.lgd.lock_installed():
|
||||
self.log.warning('Could not acquire lock for EGL sync')
|
||||
return
|
||||
|
||||
# read egl json files
|
||||
if app_name:
|
||||
lgd_igame = self._get_installed_game(app_name)
|
||||
|
|
|
@ -22,7 +22,7 @@ from legendary.models.manifest import ManifestComparison, Manifest
|
|||
class DLManager(Process):
|
||||
def __init__(self, download_dir, base_url, cache_dir=None, status_q=None,
|
||||
max_workers=0, update_interval=1.0, dl_timeout=10, resume_file=None,
|
||||
max_shared_memory=1024 * 1024 * 1024):
|
||||
max_shared_memory=1024 * 1024 * 1024, bind_ip=None):
|
||||
super().__init__(name='DLManager')
|
||||
self.log = logging.getLogger('DLM')
|
||||
self.proc_debug = False
|
||||
|
@ -37,8 +37,11 @@ class DLManager(Process):
|
|||
self.writer_queue = None
|
||||
self.dl_result_q = None
|
||||
self.writer_result_q = None
|
||||
|
||||
# Worker stuff
|
||||
self.max_workers = max_workers or min(cpu_count() * 2, 16)
|
||||
self.dl_timeout = dl_timeout
|
||||
self.bind_ips = [] if not bind_ip else bind_ip.split(',')
|
||||
|
||||
# Analysis stuff
|
||||
self.analysis = None
|
||||
|
@ -137,6 +140,24 @@ class DLManager(Process):
|
|||
except Exception as e:
|
||||
self.log.warning(f'Reading resume file failed: {e!r}, continuing as normal...')
|
||||
|
||||
elif resume:
|
||||
# Basic check if files exist locally, put all missing files into "added"
|
||||
# This allows new SDL tags to be installed without having to do a repair as well.
|
||||
missing_files = set()
|
||||
|
||||
for fm in manifest.file_manifest_list.elements:
|
||||
if fm.filename in mc.added:
|
||||
continue
|
||||
|
||||
local_path = os.path.join(self.dl_dir, fm.filename)
|
||||
if not os.path.exists(local_path):
|
||||
missing_files.add(fm.filename)
|
||||
|
||||
self.log.info(f'Found {len(missing_files)} missing files.')
|
||||
mc.added |= missing_files
|
||||
mc.changed -= missing_files
|
||||
mc.unchanged -= missing_files
|
||||
|
||||
# Install tags are used for selective downloading, e.g. for language packs
|
||||
additional_deletion_tasks = []
|
||||
if file_install_tag is not None:
|
||||
|
@ -637,10 +658,15 @@ class DLManager(Process):
|
|||
self.writer_result_q = MPQueue(-1)
|
||||
|
||||
self.log.info(f'Starting download workers...')
|
||||
|
||||
bind_ip = None
|
||||
for i in range(self.max_workers):
|
||||
if self.bind_ips:
|
||||
bind_ip = self.bind_ips[i % len(self.bind_ips)]
|
||||
|
||||
w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q,
|
||||
self.shared_memory.name, logging_queue=self.logging_queue,
|
||||
dl_timeout=self.dl_timeout)
|
||||
dl_timeout=self.dl_timeout, bind_addr=bind_ip)
|
||||
self.children.append(w)
|
||||
w.start()
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
# coding: utf-8
|
||||
|
||||
import os
|
||||
import requests
|
||||
import time
|
||||
import logging
|
||||
|
||||
|
@ -10,6 +9,9 @@ from multiprocessing import Process
|
|||
from multiprocessing.shared_memory import SharedMemory
|
||||
from queue import Empty
|
||||
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK
|
||||
|
||||
from legendary.models.chunk import Chunk
|
||||
from legendary.models.downloading import (
|
||||
DownloaderTask, DownloaderTaskResult,
|
||||
|
@ -18,9 +20,22 @@ from legendary.models.downloading import (
|
|||
)
|
||||
|
||||
|
||||
class BindingHTTPAdapter(HTTPAdapter):
|
||||
def __init__(self, addr):
|
||||
self.__attrs__.append('addr')
|
||||
self.addr = addr
|
||||
super().__init__()
|
||||
|
||||
def init_poolmanager(
|
||||
self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs
|
||||
):
|
||||
pool_kwargs['source_address'] = (self.addr, 0)
|
||||
super().init_poolmanager(connections, maxsize, block, **pool_kwargs)
|
||||
|
||||
|
||||
class DLWorker(Process):
|
||||
def __init__(self, name, queue, out_queue, shm, max_retries=7,
|
||||
logging_queue=None, dl_timeout=10):
|
||||
logging_queue=None, dl_timeout=10, bind_addr=None):
|
||||
super().__init__(name=name)
|
||||
self.q = queue
|
||||
self.o_q = out_queue
|
||||
|
@ -34,6 +49,12 @@ class DLWorker(Process):
|
|||
self.logging_queue = logging_queue
|
||||
self.dl_timeout = float(dl_timeout) if dl_timeout else 10.0
|
||||
|
||||
# optionally bind an address
|
||||
if bind_addr:
|
||||
adapter = BindingHTTPAdapter(bind_addr)
|
||||
self.session.mount('https://', adapter)
|
||||
self.session.mount('http://', adapter)
|
||||
|
||||
def run(self):
|
||||
# we have to fix up the logger before we can start
|
||||
_root = logging.getLogger()
|
||||
|
@ -105,11 +126,12 @@ class DLWorker(Process):
|
|||
|
||||
# decompress stuff
|
||||
try:
|
||||
size = len(chunk.data)
|
||||
data = chunk.data
|
||||
size = len(data)
|
||||
if size > job.shm.size:
|
||||
logger.fatal('Downloaded chunk is longer than SharedMemorySegment!')
|
||||
|
||||
self.shm.buf[job.shm.offset:job.shm.offset + size] = bytes(chunk.data)
|
||||
self.shm.buf[job.shm.offset:job.shm.offset + size] = data
|
||||
del chunk
|
||||
self.o_q.put(DownloaderTaskResult(success=True, size_decompressed=size,
|
||||
size_downloaded=compressed, **job.__dict__))
|
||||
|
@ -250,7 +272,7 @@ class FileWorker(Process):
|
|||
if j.shared_memory:
|
||||
shm_offset = j.shared_memory.offset + j.chunk_offset
|
||||
shm_end = shm_offset + j.chunk_size
|
||||
current_file.write(self.shm.buf[shm_offset:shm_end].tobytes())
|
||||
current_file.write(self.shm.buf[shm_offset:shm_end])
|
||||
elif j.cache_file:
|
||||
with open(os.path.join(self.cache_path, j.cache_file), 'rb') as f:
|
||||
if j.chunk_offset:
|
||||
|
|
|
@ -4,11 +4,14 @@ import json
|
|||
import os
|
||||
import logging
|
||||
|
||||
from contextlib import contextmanager
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from time import time
|
||||
|
||||
from .utils import clean_filename
|
||||
from filelock import FileLock
|
||||
|
||||
from .utils import clean_filename, LockedJSONData
|
||||
|
||||
from legendary.models.game import *
|
||||
from legendary.utils.aliasing import generate_aliases
|
||||
|
@ -16,11 +19,16 @@ from legendary.models.config import LGDConf
|
|||
from legendary.utils.env import is_windows_mac_or_pyi
|
||||
|
||||
|
||||
FILELOCK_DEBUG = False
|
||||
|
||||
|
||||
class LGDLFS:
|
||||
def __init__(self, config_file=None):
|
||||
self.log = logging.getLogger('LGDLFS')
|
||||
|
||||
if config_path := os.environ.get('XDG_CONFIG_HOME'):
|
||||
if config_path := os.environ.get('LEGENDARY_CONFIG_PATH'):
|
||||
self.path = config_path
|
||||
elif config_path := os.environ.get('XDG_CONFIG_HOME'):
|
||||
self.path = os.path.join(config_path, 'legendary')
|
||||
else:
|
||||
self.path = os.path.expanduser('~/.config/legendary')
|
||||
|
@ -84,6 +92,11 @@ class LGDLFS:
|
|||
self.log.warning(f'Removing "{os.path.join(self.path, "manifests", "old")}" folder failed: '
|
||||
f'{e!r}, please remove manually')
|
||||
|
||||
if not FILELOCK_DEBUG:
|
||||
# Prevent filelock logger from spamming Legendary debug output
|
||||
filelock_logger = logging.getLogger('filelock')
|
||||
filelock_logger.setLevel(logging.INFO)
|
||||
|
||||
# try loading config
|
||||
try:
|
||||
self.config.read(self.config_path)
|
||||
|
@ -105,6 +118,8 @@ class LGDLFS:
|
|||
self.config.set('Legendary', '; Disables the notice about an available update on exit')
|
||||
self.config.set('Legendary', 'disable_update_notice', 'false' if is_windows_mac_or_pyi() else 'true')
|
||||
|
||||
self._installed_lock = FileLock(os.path.join(self.path, 'installed.json') + '.lock')
|
||||
|
||||
try:
|
||||
self._installed = json.load(open(os.path.join(self.path, 'installed.json')))
|
||||
except Exception as e:
|
||||
|
@ -130,31 +145,35 @@ class LGDLFS:
|
|||
except Exception as e:
|
||||
self.log.debug(f'Loading aliases failed with {e!r}')
|
||||
|
||||
@property
|
||||
@contextmanager
|
||||
def userdata_lock(self) -> LockedJSONData:
|
||||
"""Wrapper around the lock to automatically update user data when it is released"""
|
||||
with LockedJSONData(os.path.join(self.path, 'user.json')) as lock:
|
||||
try:
|
||||
yield lock
|
||||
finally:
|
||||
self._user_data = lock.data
|
||||
|
||||
@property
|
||||
def userdata(self):
|
||||
if self._user_data is not None:
|
||||
return self._user_data
|
||||
|
||||
try:
|
||||
self._user_data = json.load(open(os.path.join(self.path, 'user.json')))
|
||||
return self._user_data
|
||||
with self.userdata_lock as locked:
|
||||
return locked.data
|
||||
except Exception as e:
|
||||
self.log.debug(f'Failed to load user data: {e!r}')
|
||||
return None
|
||||
|
||||
@userdata.setter
|
||||
def userdata(self, userdata):
|
||||
if userdata is None:
|
||||
raise ValueError('Userdata is none!')
|
||||
|
||||
self._user_data = userdata
|
||||
json.dump(userdata, open(os.path.join(self.path, 'user.json'), 'w'),
|
||||
indent=2, sort_keys=True)
|
||||
raise NotImplementedError('The setter has been removed, use the locked userdata instead.')
|
||||
|
||||
def invalidate_userdata(self):
|
||||
self._user_data = None
|
||||
if os.path.exists(os.path.join(self.path, 'user.json')):
|
||||
os.remove(os.path.join(self.path, 'user.json'))
|
||||
with self.userdata_lock as lock:
|
||||
lock.clear()
|
||||
|
||||
@property
|
||||
def entitlements(self):
|
||||
|
@ -280,6 +299,27 @@ class LGDLFS:
|
|||
except Exception as e:
|
||||
self.log.warning(f'Failed to delete file "{f}": {e!r}')
|
||||
|
||||
def lock_installed(self) -> bool:
|
||||
"""
|
||||
Locks the install data. We do not care about releasing this lock.
|
||||
If it is acquired by a Legendary instance it should own the lock until it exits.
|
||||
Some operations such as egl sync may be simply skipped if a lock cannot be acquired
|
||||
"""
|
||||
if self._installed_lock.is_locked:
|
||||
return True
|
||||
|
||||
try:
|
||||
self._installed_lock.acquire(blocking=False)
|
||||
# reload data in case it has been updated elsewhere
|
||||
try:
|
||||
self._installed = json.load(open(os.path.join(self.path, 'installed.json')))
|
||||
except Exception as e:
|
||||
self.log.debug(f'Failed to load installed game data: {e!r}')
|
||||
|
||||
return True
|
||||
except TimeoutError:
|
||||
return False
|
||||
|
||||
def get_installed_game(self, app_name):
|
||||
if self._installed is None:
|
||||
try:
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
import os
|
||||
import shutil
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
|
||||
from pathlib import Path
|
||||
|
@ -10,6 +11,8 @@ from sys import stdout
|
|||
from time import perf_counter
|
||||
from typing import List, Iterator
|
||||
|
||||
from filelock import FileLock
|
||||
|
||||
from legendary.models.game import VerifyResult
|
||||
|
||||
logger = logging.getLogger('LFS Utils')
|
||||
|
@ -153,3 +156,45 @@ def clean_filename(filename):
|
|||
|
||||
def get_dir_size(path):
|
||||
return sum(f.stat().st_size for f in Path(path).glob('**/*') if f.is_file())
|
||||
|
||||
|
||||
class LockedJSONData(FileLock):
|
||||
def __init__(self, file_path: str):
|
||||
super().__init__(file_path + '.lock')
|
||||
|
||||
self._file_path = file_path
|
||||
self._data = None
|
||||
self._initial_data = None
|
||||
|
||||
def __enter__(self):
|
||||
super().__enter__()
|
||||
|
||||
if os.path.exists(self._file_path):
|
||||
with open(self._file_path, 'r', encoding='utf-8') as f:
|
||||
self._data = json.load(f)
|
||||
self._initial_data = self._data
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
super().__exit__(exc_type, exc_val, exc_tb)
|
||||
|
||||
if self._data != self._initial_data:
|
||||
if self._data is not None:
|
||||
with open(self._file_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(self._data, f, indent=2, sort_keys=True)
|
||||
else:
|
||||
if os.path.exists(self._file_path):
|
||||
os.remove(self._file_path)
|
||||
|
||||
@property
|
||||
def data(self):
|
||||
return self._data
|
||||
|
||||
@data.setter
|
||||
def data(self, new_data):
|
||||
if new_data is None:
|
||||
raise ValueError('Invalid new data, use clear() explicitly to reset file data')
|
||||
self._data = new_data
|
||||
|
||||
def clear(self):
|
||||
self._data = None
|
||||
|
|
|
@ -20,6 +20,37 @@ def get_shell_folders(registry, wine_pfx):
|
|||
return folders
|
||||
|
||||
|
||||
def case_insensitive_file_search(path: str) -> str:
|
||||
"""
|
||||
Similar to case_insensitive_path_search: Finds a file case-insensitively
|
||||
Note that this *does* work on Windows, although it's rather pointless
|
||||
"""
|
||||
path_parts = os.path.normpath(path).split(os.sep)
|
||||
# If path_parts[0] is empty, we're on Unix and thus start searching at /
|
||||
if not path_parts[0]:
|
||||
path_parts[0] = '/'
|
||||
|
||||
computed_path = path_parts[0]
|
||||
for part in path_parts[1:]:
|
||||
# If the computed directory does not exist, add all remaining parts as-is to at least return a valid path
|
||||
# at the end
|
||||
if not os.path.exists(computed_path):
|
||||
computed_path = os.path.join(computed_path, part)
|
||||
continue
|
||||
|
||||
# First try to find an exact match
|
||||
actual_file_or_dirname = part if os.path.exists(os.path.join(computed_path, part)) else None
|
||||
|
||||
# If there is no case-sensitive match, find a case-insensitive one
|
||||
if not actual_file_or_dirname:
|
||||
actual_file_or_dirname = next((
|
||||
x for x in os.listdir(computed_path)
|
||||
if x.lower() == part.lower()
|
||||
), part)
|
||||
computed_path = os.path.join(computed_path, actual_file_or_dirname)
|
||||
return computed_path
|
||||
|
||||
|
||||
def case_insensitive_path_search(path):
|
||||
"""
|
||||
Attempts to find a path case-insensitively
|
||||
|
|
|
@ -91,6 +91,18 @@ class Game:
|
|||
def supports_mac_cloud_saves(self):
|
||||
return self.metadata and (self.metadata.get('customAttributes', {}).get('CloudSaveFolder_MAC') is not None)
|
||||
|
||||
@property
|
||||
def additional_command_line(self):
|
||||
if not self.metadata:
|
||||
return None
|
||||
return self.metadata.get('customAttributes', {}).get('AdditionalCommandLine', {}).get('value', None)
|
||||
|
||||
@property
|
||||
def is_launchable_addon(self):
|
||||
if not self.metadata:
|
||||
return False
|
||||
return any(m['path'] == 'addons/launchable' for m in self.metadata.get('categories', []))
|
||||
|
||||
@property
|
||||
def catalog_item_id(self):
|
||||
if not self.metadata:
|
||||
|
@ -149,6 +161,7 @@ class InstalledGame:
|
|||
needs_verification: bool = False
|
||||
platform: str = 'Windows'
|
||||
prereq_info: Optional[Dict] = None
|
||||
uninstaller: Optional[Dict] = None
|
||||
requires_ot: bool = False
|
||||
save_path: Optional[str] = None
|
||||
|
||||
|
@ -165,6 +178,7 @@ class InstalledGame:
|
|||
tmp.executable = json.get('executable', '')
|
||||
tmp.launch_parameters = json.get('launch_parameters', '')
|
||||
tmp.prereq_info = json.get('prereq_info', None)
|
||||
tmp.uninstaller = json.get('uninstaller', None)
|
||||
|
||||
tmp.can_run_offline = json.get('can_run_offline', False)
|
||||
tmp.requires_ot = json.get('requires_ot', False)
|
||||
|
|
|
@ -22,11 +22,14 @@ def _filename_matches(filename, patterns):
|
|||
"""
|
||||
|
||||
for pattern in patterns:
|
||||
if pattern.endswith('/'):
|
||||
# pat is a directory, check if path starts with it
|
||||
if filename.startswith(pattern):
|
||||
return True
|
||||
elif fnmatch(filename, pattern):
|
||||
# Pattern is a directory, just check if path starts with it
|
||||
if pattern.endswith('/') and filename.startswith(pattern):
|
||||
return True
|
||||
# Check if pattern is a suffix of filename
|
||||
if filename.endswith(pattern):
|
||||
return True
|
||||
# Check if pattern with wildcards ('*') matches
|
||||
if fnmatch(filename, pattern):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
|
|
@ -124,7 +124,7 @@ class MockLauncher:
|
|||
self.window.load_url(logout_url)
|
||||
|
||||
|
||||
def do_webview_login(callback_sid=None, callback_code=None):
|
||||
def do_webview_login(callback_sid=None, callback_code=None, user_agent=None):
|
||||
api = MockLauncher(callback_sid=callback_sid, callback_code=callback_code)
|
||||
url = login_url
|
||||
|
||||
|
@ -143,7 +143,7 @@ def do_webview_login(callback_sid=None, callback_code=None):
|
|||
window.events.loaded += api.on_loaded
|
||||
|
||||
try:
|
||||
webview.start()
|
||||
webview.start(user_agent=user_agent)
|
||||
except Exception as we:
|
||||
logger.error(f'Running webview failed with {we!r}. If this error persists try the manual '
|
||||
f'login process by adding --disable-webview to your command line.')
|
||||
|
|
|
@ -1 +1,2 @@
|
|||
requests<3.0
|
||||
filelock
|
||||
|
|
Loading…
Reference in a new issue