Compare commits

..

No commits in common. "master" and "0.20.25" have entirely different histories.

29 changed files with 466 additions and 1041 deletions

View file

@ -11,23 +11,26 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: ['ubuntu-20.04', 'windows-2019', 'macos-11']
os: ['ubuntu-20.04', 'windows-latest', 'macos-11']
fail-fast: false
max-parallel: 3
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v2
- uses: actions/setup-python@v4
- uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Legendary dependencies and build tools
- name: Python components
run: pip3 install --upgrade
setuptools
wheel
- name: Legendary dependencies and build tools
run: pip3 install --upgrade
pyinstaller
requests
filelock
- name: Optional dependencies (WebView)
run: pip3 install --upgrade pywebview
@ -44,37 +47,43 @@ jobs:
--onefile
--name legendary
${{ steps.strip.outputs.option }}
-i ../assets/windows_icon.ico
cli.py
env:
PYTHONOPTIMIZE: 1
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v2
with:
name: ${{ runner.os }}-package
path: legendary/dist/*
deb:
runs-on: ubuntu-22.04
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: ['ubuntu-20.04']
fail-fast: true
max-parallel: 3
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v2
- name: Dependencies
run: |
sudo apt install ruby
sudo gem install fpm
run: sudo apt install
python3-all
python3-stdeb
dh-python
python3-requests
python3-setuptools
python3-wheel
# pywebview is too outdated on 20.04, re-enable this on 22.04
# python3-webview
# python3-gi
# python3-gi-cairo
# gir1.2-gtk-3.0
- name: Build
run: fpm
--input-type python
--output-type deb
--python-package-name-prefix python3
--deb-suggests python3-webview
--maintainer "Rodney <rodney@rodney.io>"
--category python
--depends "python3 >= 3.9"
setup.py
run: python3 setup.py --command-packages=stdeb.command bdist_deb
- name: Os version
id: os_version
@ -82,7 +91,7 @@ jobs:
source /etc/os-release
echo ::set-output name=version::$NAME-$VERSION_ID
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v2
with:
name: ${{ steps.os_version.outputs.version }}-deb-package
path: ./*.deb
path: deb_dist/*.deb

View file

@ -33,13 +33,10 @@ it has to be run from a terminal (e.g. PowerShell)
## Requirements
- Linux, Windows (8.1+), or macOS (12.0+)
+ macOS support is in an early stage, and only tested on 12.0+
+ 32-bit operating systems are not supported
- python 3.9+ (64-bit)
+ (Windows) `pythonnet` is not yet compatible with 3.10+, use 3.9 if you plan to install `pywebview`
- PyPI packages:
+ `requests`
+ (optional) `pywebview` for webview-based login
+ (optional) `setuptools` and `wheel` for setup/building
- PyPI packages: `requests`, optionally `setuptools` and `wheel` for setup/building
**Note:** Running Windows applications on Linux or macOS requires [Wine](https://www.winehq.org/).
@ -140,7 +137,7 @@ legendary auth
When using the prebuilt Windows executables of version 0.20.14 or higher this should open a new window with the Epic Login.
Otherwise, authentication is a little finicky since we have to go through the Epic website and manually copy a code.
The login page should open in your browser and after logging in you should be presented with a JSON response that contains a code ("authorizationCode"), just copy the code into the terminal and hit enter.
The login page should open in your browser and after logging in you should be presented with a JSON response that contains a code ("sid"), just copy the code into the terminal and hit enter.
Alternatively you can use the `--import` flag to import the authentication from the Epic Games Launcher (manually specifying the used WINE prefix may be required on Linux).
Note that this will log you out of the Epic Launcher.
@ -268,10 +265,9 @@ optional arguments:
-h, --help show this help message and exit
--import Import Epic Games Launcher authentication data (logs
out of EGL)
--code <authorization code>
Use specified authorization code instead of interactive authentication
--token <exchange token>
Use specified exchange token instead of interactive authentication
--code <exchange code>
Use specified exchange code instead of interactive
authentication
--sid <session id> Use specified session id instead of interactive
authentication
--delete Remove existing authentication (log out)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 25 KiB

View file

@ -1,4 +1,4 @@
"""Legendary!"""
__version__ = '0.20.34'
__codename__ = 'Direct Intervention'
__version__ = '0.20.25'
__codename__ = 'Our Benefactors'

View file

@ -1,8 +1,6 @@
# !/usr/bin/env python
# coding: utf-8
import urllib.parse
import requests
import requests.adapters
import logging
@ -15,7 +13,7 @@ from legendary.models.gql import *
class EPCAPI:
_user_agent = 'UELauncher/11.0.1-14907503+++Portal+Release-Live Windows/10.0.19041.1.256.64bit'
_store_user_agent = 'EpicGamesLauncher/14.0.8-22004686+++Portal+Release-Live'
_store_user_agent = 'EpicGamesLauncher/11.0.1-14907503+++Portal+Release-Live'
# required for the oauth request
_user_basic = '34a02cf8f4414e29b15921876da36f9a'
_pw_basic = 'daafbccc737745039dffe53d94fc76cf'
@ -28,10 +26,7 @@ class EPCAPI:
_ecommerce_host = 'ecommerceintegration-public-service-ecomprod02.ol.epicgames.com'
_datastorage_host = 'datastorage-public-service-liveegs.live.use1a.on.epicgames.com'
_library_host = 'library-service.live.use1a.on.epicgames.com'
# Using the actual store host with a user-agent newer than 14.0.8 leads to a CF verification page,
# but the dedicated graphql host works fine.
# _store_gql_host = 'launcher.store.epicgames.com'
_store_gql_host = 'graphql.epicgames.com'
_store_gql_host = 'store-launcher.epicgames.com'
_artifact_service_host = 'artifact-public-service-prod.beee.live.use1a.on.epicgames.com'
def __init__(self, lc='en', cc='US', timeout=10.0):
@ -53,12 +48,10 @@ class EPCAPI:
self.language_code = lc
self.country_code = cc
self.request_timeout = timeout if timeout > 0 else None
def get_auth_url(self):
login_url = 'https://www.epicgames.com/id/login?redirectUrl='
redirect_url = f'https://www.epicgames.com/id/api/redirect?clientId={self._user_basic}&responseType=code'
return login_url + urllib.parse.quote(redirect_url)
if timeout > 0:
self.request_timeout = timeout
else:
self.request_timeout = None
def update_egs_params(self, egs_params):
# update user-agent
@ -94,7 +87,7 @@ class EPCAPI:
return self.user
def start_session(self, refresh_token: str = None, exchange_token: str = None,
authorization_code: str = None, client_credentials: bool = False) -> dict:
client_credentials: bool = False) -> dict:
if refresh_token:
params = dict(grant_type='refresh_token',
refresh_token=refresh_token,
@ -103,10 +96,6 @@ class EPCAPI:
params = dict(grant_type='exchange_code',
exchange_code=exchange_token,
token_type='eg1')
elif authorization_code:
params = dict(grant_type='authorization_code',
code=authorization_code,
token_type='eg1')
elif client_credentials:
params = dict(grant_type='client_credentials',
token_type='eg1')
@ -121,16 +110,9 @@ class EPCAPI:
r.raise_for_status()
j = r.json()
if 'errorCode' in j:
if j['errorCode'] == 'errors.com.epicgames.oauth.corrective_action_required':
self.log.error(f'{j["errorMessage"]} ({j["correctiveAction"]}), '
f'open the following URL to take action: {j["continuationUrl"]}')
else:
self.log.error(f'Login to EGS API failed with errorCode: {j["errorCode"]}')
if 'error' in j:
self.log.warning(f'Login to EGS API failed with errorCode: {j["errorCode"]}')
raise InvalidCredentialsError(j['errorCode'])
elif r.status_code >= 400:
self.log.error(f'EGS API responded with status {r.status_code} but no error in response: {j}')
raise InvalidCredentialsError('Unknown error')
self.session.headers['Authorization'] = f'bearer {j["access_token"]}'
# only set user info when using non-anonymous login
@ -186,24 +168,13 @@ class EPCAPI:
r.raise_for_status()
return r.json()
def get_user_entitlements(self, start=0):
def get_user_entitlements(self):
user_id = self.user.get('account_id')
r = self.session.get(f'https://{self._entitlements_host}/entitlement/api/account/{user_id}/entitlements',
params=dict(start=start, count=1000), timeout=self.request_timeout)
params=dict(start=0, count=5000), timeout=self.request_timeout)
r.raise_for_status()
return r.json()
def get_user_entitlements_full(self):
ret = []
while True:
resp = self.get_user_entitlements(start=len(ret))
ret.extend(resp)
if len(resp) < 1000:
break
return ret
def get_game_info(self, namespace, catalog_item_id, timeout=None):
r = self.session.get(f'https://{self._catalog_host}/catalog/api/shared/namespace/{namespace}/bulk/items',
params=dict(id=catalog_item_id, includeDLCDetails=True, includeMainGameDetails=True,
@ -213,7 +184,7 @@ class EPCAPI:
return r.json().get(catalog_item_id, None)
def get_artifact_service_ticket(self, sandbox_id: str, artifact_id: str, label='Live', platform='Windows'):
# Based on EOS Helper Windows service implementation. Only works with anonymous EOSH session.
# based on EOS windows service implementation, untested as it's not live yet (just 403s)
# sandbox_id is the same as the namespace, artifact_id is the same as the app name
r = self.session.post(f'https://{self._artifact_service_host}/artifact-service/api/public/v1/dependency/'
f'sandbox/{sandbox_id}/artifact/{artifact_id}/ticket',
@ -223,11 +194,11 @@ class EPCAPI:
r.raise_for_status()
return r.json()
def get_game_manifest_by_ticket(self, artifact_id: str, signed_ticket: str, label='Live', platform='Windows'):
# Based on EOS Helper Windows service implementation.
def get_game_manifest_by_ticket(self, artifact_id: str, ticket: dict):
# Untested as get_artifact_service_ticket is not working yet either
r = self.session.post(f'https://{self._launcher_host}/launcher/api/public/assets/v2/'
f'by-ticket/app/{artifact_id}',
json=dict(platform=platform, label=label, signedTicket=signed_ticket),
headers=dict(authorization=f'bearer {ticket["signedTicket"]}'),
timeout=self.request_timeout)
r.raise_for_status()
return r.json()
@ -253,8 +224,10 @@ class EPCAPI:
return records
def get_user_cloud_saves(self, app_name='', manifests=False, filenames=None):
if app_name:
app_name += '/manifests/' if manifests else '/'
if app_name and manifests:
app_name += '/manifests/'
elif app_name:
app_name += '/'
user_id = self.user.get('account_id')

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python3
#!/usr/bin/env python
# coding: utf-8
import argparse
@ -22,13 +22,13 @@ from legendary.core import LegendaryCore
from legendary.models.exceptions import InvalidCredentialsError
from legendary.models.game import SaveGameStatus, VerifyResult, Game
from legendary.utils.cli import get_boolean_choice, get_int_choice, sdl_prompt, strtobool
from legendary.lfs.crossover import *
from legendary.utils.crossover import *
from legendary.utils.custom_parser import HiddenAliasSubparsersAction
from legendary.utils.env import is_windows_mac_or_pyi
from legendary.lfs.eos import add_registry_entries, query_registry_entries, remove_registry_entries
from legendary.lfs.utils import validate_files, clean_filename
from legendary.utils.eos import add_registry_entries, query_registry_entries, remove_registry_entries
from legendary.utils.lfs import validate_files, clean_filename
from legendary.utils.selective_dl import get_sdl_appname
from legendary.lfs.wine_helpers import read_registry, get_shell_folders, case_insensitive_file_search
from legendary.utils.wine_helpers import read_registry, get_shell_folders
# todo custom formatter for cli logger (clean info, highlighted error/warning)
logging.basicConfig(
@ -104,11 +104,11 @@ class LegendaryCLI:
if not egl_wine_pfx:
logger.info('Please enter the path to the Wine prefix that has EGL installed')
egl_wine_pfx = input('Path [empty input to quit]: ').strip()
if not egl_wine_pfx:
wine_pfx = input('Path [empty input to quit]: ').strip()
if not wine_pfx:
print('Empty input, quitting...')
exit(0)
if not os.path.exists(egl_wine_pfx) and os.path.isdir(egl_wine_pfx):
if not os.path.exists(wine_pfx) and os.path.isdir(wine_pfx):
print('Path is invalid (does not exist)!')
exit(1)
@ -143,27 +143,29 @@ class LegendaryCLI:
exit(1)
exchange_token = ''
auth_code = ''
if not args.auth_code and not args.session_id and not args.ex_token:
if not args.auth_code and not args.session_id:
# only import here since pywebview import is slow
from legendary.utils.webview_login import webview_available, do_webview_login
if not webview_available or args.no_webview or self.core.webview_killswitch:
# unfortunately the captcha stuff makes a complete CLI login flow kinda impossible right now...
print('Please login via the epic web login!')
url = 'https://legendary.gl/epiclogin'
webbrowser.open(url)
print(f'If the web page did not open automatically, please manually open the following URL: {url}')
auth_code = input('Please enter the "authorizationCode" value from the JSON response: ')
auth_code = auth_code.strip()
if auth_code[0] == '{':
tmp = json.loads(auth_code)
auth_code = tmp['authorizationCode']
webbrowser.open(
'https://www.epicgames.com/id/login?redirectUrl='
'https%3A%2F%2Fwww.epicgames.com%2Fid%2Fapi%2Fredirect'
)
print('If the web page did not open automatically, please manually open the following URL: '
'https://www.epicgames.com/id/login?redirectUrl=https://www.epicgames.com/id/api/redirect')
sid = input('Please enter the "sid" value from the JSON response: ')
sid = sid.strip()
if sid[0] == '{':
tmp = json.loads(sid)
sid = tmp['sid']
else:
auth_code = auth_code.strip('"')
sid = sid.strip('"')
exchange_token = self.core.auth_sid(sid)
else:
if do_webview_login(callback_code=self.core.auth_ex_token,
user_agent=f'EpicGamesLauncher/{self.core.get_egl_version()}'):
if do_webview_login(callback_sid=self.core.auth_sid, callback_code=self.core.auth_code):
logger.info(f'Successfully logged in as "{self.core.lgd.userdata["displayName"]}" via WebView')
else:
logger.error('WebView login attempt failed, please see log for details.')
@ -171,17 +173,13 @@ class LegendaryCLI:
elif args.session_id:
exchange_token = self.core.auth_sid(args.session_id)
elif args.auth_code:
auth_code = args.auth_code
elif args.ex_token:
exchange_token = args.ex_token
exchange_token = args.auth_code
if not exchange_token and not auth_code:
logger.fatal('No exchange token/authorization code, cannot login.')
if not exchange_token:
logger.fatal('No exchange token, cannot login.')
return
if exchange_token and self.core.auth_ex_token(exchange_token):
logger.info(f'Successfully logged in as "{self.core.lgd.userdata["displayName"]}"')
elif auth_code and self.core.auth_code(auth_code):
if self.core.auth_code(exchange_token):
logger.info(f'Successfully logged in as "{self.core.lgd.userdata["displayName"]}"')
else:
logger.error('Login attempt failed, please see log for details.')
@ -248,7 +246,7 @@ class LegendaryCLI:
elif _store:
print(f' ! This game has to be installed through a third-party store ({_store}, not supported)')
else:
print(' ! No version information (unknown cause)')
print(f' ! No version information (unknown cause)')
# Games that have assets, but only require a one-time activation before they can be independently installed
# via a third-party platform (e.g. Uplay)
if game.partner_link_type:
@ -316,7 +314,7 @@ class LegendaryCLI:
print('\nInstalled games:')
for game in games:
if game.install_size == 0 and self.core.lgd.lock_installed():
if game.install_size == 0:
logger.debug(f'Updating missing size for {game.app_name}')
m = self.core.load_manifest(self.core.get_installed_manifest(game.app_name)[0])
game.install_size = sum(fm.file_size for fm in m.file_manifest_list.elements)
@ -372,8 +370,6 @@ class LegendaryCLI:
if args.install_tag:
files = [fm for fm in files if args.install_tag in fm.install_tags]
elif args.install_tag is not None:
files = [fm for fm in files if not fm.install_tags]
if args.hashlist:
for fm in files:
@ -383,16 +379,15 @@ class LegendaryCLI:
writer.writerow(['path', 'hash', 'size', 'install_tags'])
writer.writerows((fm.filename, fm.hash.hex(), fm.file_size, '|'.join(fm.install_tags)) for fm in files)
elif args.json:
_files = [
dict(
_files = []
for fm in files:
_files.append(dict(
filename=fm.filename,
sha_hash=fm.hash.hex(),
install_tags=fm.install_tags,
file_size=fm.file_size,
flags=fm.flags
)
for fm in files
]
flags=fm.flags,
))
return self._print_json(_files, args.pretty_json)
else:
install_tags = set()
@ -418,11 +413,7 @@ class LegendaryCLI:
print('Save games:')
for save in sorted(saves, key=lambda a: a.app_name + a.manifest_name):
if save.app_name != last_app:
if game := self.core.get_game(save.app_name):
game_title = game.app_title
else:
game_title = 'Unknown'
game_title = self.core.get_game(save.app_name).app_title
last_app = save.app_name
print(f'- {game_title} ("{save.app_name}")')
print(' +', save.manifest_name)
@ -438,7 +429,7 @@ class LegendaryCLI:
if not self.core.login():
logger.error('Login failed! Cannot continue with download process.')
exit(1)
logger.info('Cleaning saves...')
logger.info(f'Cleaning saves...')
self.core.clean_saves(self._resolve_aliases(args.app_name), args.delete_incomplete)
def sync_saves(self, args):
@ -456,10 +447,11 @@ class LegendaryCLI:
igames = [igame]
# check available saves
saves = self.core.get_save_games(args.app_name if args.app_name else '')
latest_save = {
save.app_name: save for save in sorted(saves, key=lambda a: a.datetime)
}
saves = self.core.get_save_games()
latest_save = dict()
for save in sorted(saves, key=lambda a: a.datetime):
latest_save[save.app_name] = save
logger.info(f'Got {len(latest_save)} remote save game(s)')
@ -475,19 +467,12 @@ class LegendaryCLI:
logger.info(f'Checking "{igame.title}" ({igame.app_name})')
# override save path only if app name is specified
if args.app_name and args.save_path:
if not self.core.lgd.lock_installed():
logger.error('Unable to lock install data, cannot modify save path.')
break
logger.info(f'Overriding save path with "{args.save_path}"...')
igame.save_path = args.save_path
self.core.lgd.set_installed_game(igame.app_name, igame)
# if there is no saved save path, try to get one, skip if we cannot get a install data lock
if not igame.save_path and self.core.lgd.lock_installed():
if args.yes and not args.accept_path:
logger.info('Save path for this title has not been set, skipping due to --yes')
continue
# if there is no saved save path, try to get one
if not igame.save_path:
save_path = self.core.get_save_path(igame.app_name, platform=igame.platform)
# ask user if path is correct if computing for the first time
@ -496,11 +481,6 @@ class LegendaryCLI:
if '%' in save_path or '{' in save_path:
logger.warning('Path contains unprocessed variables, please enter the correct path manually.')
yn = False
# When accept_path is set we don't want to fall back to interactive mode
if args.accept_path:
continue
elif args.accept_path:
yn = True
else:
yn = get_boolean_choice('Is this correct?')
@ -568,7 +548,6 @@ class LegendaryCLI:
def launch_game(self, args, extra):
app_name = self._resolve_aliases(args.app_name)
addon_app_name = None
# Interactive CrossOver setup
if args.crossover and sys_platform == 'darwin':
@ -579,19 +558,12 @@ class LegendaryCLI:
return self._launch_origin(args)
igame = self.core.get_installed_game(app_name)
if (not igame or not igame.executable) and (game := self.core.get_game(app_name)) is not None:
# override installed game with base title
if game.is_launchable_addon:
addon_app_name = app_name
app_name = game.metadata['mainGameItem']['releaseInfo'][0]['appId']
igame = self.core.get_installed_game(app_name)
if not igame:
logger.error(f'Game {app_name} is not currently installed!')
exit(1)
if igame.is_dlc and not igame.executable:
logger.error(f'{app_name} is DLC without an executable; please launch the base game instead!')
if igame.is_dlc:
logger.error(f'{app_name} is DLC; please launch the base game instead!')
exit(1)
if not os.path.exists(igame.install_path):
@ -630,8 +602,7 @@ class LegendaryCLI:
disable_wine=args.no_wine,
executable_override=args.executable_override,
crossover_app=args.crossover_app,
crossover_bottle=args.crossover_bottle,
addon_app_name=addon_app_name)
crossover_bottle=args.crossover_bottle)
if args.set_defaults:
self.core.lgd.config[app_name] = dict()
@ -672,7 +643,7 @@ class LegendaryCLI:
full_env = os.environ.copy()
full_env.update(params.environment)
if 'CX_BOTTLE' in full_env and any('SharedSupport/CrossOver' in p for p in params.launch_command):
if 'CX_BOTTLE' in full_env:
# if using crossover, unset WINEPREFIX
full_env.pop('WINEPREFIX', None)
# check that bottle is valid, show error otherwise
@ -684,8 +655,6 @@ class LegendaryCLI:
else:
logger.error(f'Specified CrossOver bottle {bottle_name} does not exist, cannot launch.')
exit(1)
else:
logger.info(f'Using CrossOver Bottle "{bottle_name}"')
if args.dry_run:
logger.info(f'Not Launching {app_name} (dry run)')
@ -771,7 +740,7 @@ class LegendaryCLI:
full_env = os.environ.copy()
full_env.update(env)
if 'CX_BOTTLE' in full_env and any('SharedSupport/CrossOver' in p for p in command):
if 'CX_BOTTLE' in full_env:
# if using crossover, unset WINEPREFIX
full_env.pop('WINEPREFIX', None)
# check that bottle is valid, show error otherwise
@ -783,16 +752,12 @@ class LegendaryCLI:
else:
logger.error(f'Specified CrossOver bottle {bottle_name} does not exist, cannot launch.')
exit(1)
else:
logger.info(f'Using CrossOver Bottle "{bottle_name}"')
if not command:
logger.error(f'In order to launch Origin correctly you must specify a prefix and wine binary or '
f'wrapper in the configuration file or command line. See the README for details.')
return
# You cannot launch a URI without start.exe
command.append('start')
command.append(origin_uri)
if args.dry_run:
if cmd:
@ -813,11 +778,6 @@ class LegendaryCLI:
subprocess.Popen(command, env=full_env)
def install_game(self, args):
if not self.core.lgd.lock_installed():
logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may '
'install/import/move applications at a time.')
return
args.app_name = self._resolve_aliases(args.app_name)
if self.core.is_installed(args.app_name):
igame = self.core.get_installed_game(args.app_name)
@ -881,7 +841,7 @@ class LegendaryCLI:
base_game = self.core.get_game(app_name)
# check if base_game is actually installed
if not self.core.is_installed(app_name):
# download mode doesn't care about whether something's installed
# download mode doesn't care about whether or not something's installed
if not args.no_install:
logger.fatal(f'Base game "{app_name}" is not installed!')
exit(1)
@ -900,13 +860,7 @@ class LegendaryCLI:
print('Aborting...')
exit(0)
try:
self.verify_game(args, print_command=False, repair_mode=True, repair_online=args.repair_and_update)
except ValueError:
logger.error('To repair a game with a missing manifest you must run the command with '
'"--repair-and-update". However this will redownload any file that does '
'not match the current hash in its entirety.')
return
self.verify_game(args, print_command=False)
else:
logger.info(f'Using existing repair file: {repair_file}')
@ -923,7 +877,7 @@ class LegendaryCLI:
if config_tags:
self.core.lgd.config.remove_option(game.app_name, 'install_tags')
config_tags = None
self.core.lgd.config.set(game.app_name, 'disable_sdl', 'true')
self.core.lgd.config.set(game.app_name, 'disable_sdl', True)
sdl_enabled = False
# just disable SDL, but keep config tags that have been manually specified
elif config_disable_sdl or args.disable_sdl:
@ -977,10 +931,9 @@ class LegendaryCLI:
disable_delta=args.disable_delta,
override_delta_manifest=args.override_delta_manifest,
preferred_cdn=args.preferred_cdn,
disable_https=args.disable_https,
bind_ip=args.bind_ip)
disable_https=args.disable_https)
# game is either up-to-date or hasn't changed, so we have nothing to do
# game is either up to date or hasn't changed, so we have nothing to do
if not analysis.dl_size:
old_igame = self.core.get_installed_game(game.app_name)
logger.info('Download size is 0, the game is either already up to date or has not changed. Exiting...')
@ -999,15 +952,6 @@ class LegendaryCLI:
self.core.uninstall_tag(old_igame)
self.core.install_game(old_igame)
if old_igame.install_tags:
self.core.lgd.config.set(game.app_name, 'install_tags', ','.join(old_igame.install_tags))
self.core.lgd.save_config()
# check if the version changed, this can happen for DLC that gets a version bump with no actual file changes
if old_igame and old_igame.version != igame.version:
old_igame.version = igame.version
self.core.install_game(old_igame)
exit(0)
logger.info(f'Install size: {analysis.install_size / 1024 / 1024:.02f} MiB')
@ -1039,9 +983,6 @@ class LegendaryCLI:
logger.fatal('Installation cannot proceed, exiting.')
exit(1)
if tip_url := self.core.get_game_tip(igame.app_name):
print(f'\nThis game may have compatibility issues or require additional setup, see: {tip_url}\n')
if not args.yes:
if not get_boolean_choice(f'Do you wish to install "{igame.title}"?'):
print('Aborting...')
@ -1071,7 +1012,7 @@ class LegendaryCLI:
postinstall = self.core.install_game(igame)
if postinstall:
self._handle_postinstall(postinstall, igame, skip_prereqs=args.yes)
self._handle_postinstall(postinstall, igame, yes=args.yes)
dlcs = self.core.get_dlc_for_game(game.app_name)
if dlcs and not args.skip_dlcs:
@ -1103,7 +1044,6 @@ class LegendaryCLI:
logger.info(f'This game supports cloud saves, syncing is handled by the "sync-saves" command. '
f'To download saves for this game run "legendary sync-saves {args.app_name}"')
# show tip again after installation finishes so users hopefully actually see it
if tip_url := self.core.get_game_tip(igame.app_name):
print(f'\nThis game may require additional setup, see: {tip_url}\n')
@ -1125,13 +1065,13 @@ class LegendaryCLI:
logger.info(f'Finished installation process in {end_t - start_t:.02f} seconds.')
def _handle_postinstall(self, postinstall, igame, skip_prereqs=False):
print('\nThis game lists the following prerequisites to be installed:')
def _handle_postinstall(self, postinstall, igame, yes=False):
print('\nThis game lists the following prequisites to be installed:')
print(f'- {postinstall["name"]}: {" ".join((postinstall["path"], postinstall["args"]))}')
print('')
if os.name == 'nt':
if skip_prereqs:
if yes:
c = 'n' # we don't want to launch anything, just silent install.
else:
choice = input('Do you wish to install the prerequisites? ([y]es, [n]o, [i]gnore): ')
@ -1156,11 +1096,6 @@ class LegendaryCLI:
logger.info('Automatic installation not available on Linux.')
def uninstall_game(self, args):
if not self.core.lgd.lock_installed():
logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may '
'install/import/move applications at a time.')
return
args.app_name = self._resolve_aliases(args.app_name)
igame = self.core.get_installed_game(args.app_name)
if not igame:
@ -1172,9 +1107,6 @@ class LegendaryCLI:
print('Aborting...')
exit(0)
if os.name == 'nt' and igame.uninstaller and not args.skip_uninstaller:
self._handle_uninstaller(igame, args.yes)
try:
if not igame.is_dlc:
# Remove DLC first so directory is empty when game uninstall runs
@ -1191,24 +1123,7 @@ class LegendaryCLI:
except Exception as e:
logger.warning(f'Removing game failed: {e!r}, please remove {igame.install_path} manually.')
def _handle_uninstaller(self, igame, yes=False):
uninstaller = igame.uninstaller
print('\nThis game provides the following uninstaller:')
print(f'- {uninstaller["path"]} {uninstaller["args"]}\n')
if yes or get_boolean_choice('Do you wish to run the uninstaller?', default=True):
logger.info('Running uninstaller...')
req_path, req_exec = os.path.split(uninstaller['path'])
work_dir = os.path.join(igame.install_path, req_path)
fullpath = os.path.join(work_dir, req_exec)
try:
p = subprocess.Popen([fullpath, uninstaller['args']], cwd=work_dir, shell=True)
p.wait()
except Exception as e:
logger.error(f'Failed to run uninstaller: {e!r}')
def verify_game(self, args, print_command=True, repair_mode=False, repair_online=False):
def verify_game(self, args, print_command=True):
args.app_name = self._resolve_aliases(args.app_name)
if not self.core.is_installed(args.app_name):
logger.error(f'Game "{args.app_name}" is not installed')
@ -1223,28 +1138,13 @@ class LegendaryCLI:
return
manifest_data, _ = self.core.get_installed_manifest(args.app_name)
if manifest_data is None:
if repair_mode:
if not repair_online:
logger.critical('No manifest could be loaded, the manifest file may be missing!')
raise ValueError('Local manifest is missing')
logger.warning('No manifest could be loaded, the file may be missing. Downloading the latest manifest.')
game = self.core.get_game(args.app_name, platform=igame.platform)
manifest_data, _ = self.core.get_cdn_manifest(game, igame.platform)
else:
logger.critical(f'Manifest appears to be missing! To repair, run "legendary repair '
f'{args.app_name} --repair-and-update", this will however redownload all files '
f'that do not match the latest manifest in their entirety.')
return
manifest = self.core.load_manifest(manifest_data)
files = sorted(manifest.file_manifest_list.elements,
key=lambda a: a.filename.lower())
# build list of hashes
if (config_tags := self.core.lgd.config.get(args.app_name, 'install_tags', fallback=None)) is not None:
if config_tags := self.core.lgd.config.get(args.app_name, 'install_tags', fallback=None):
install_tags = set(i.strip() for i in config_tags.split(','))
file_list = [
(f.filename, f.sha_hash.hex())
@ -1272,7 +1172,7 @@ class LegendaryCLI:
percentage = (processed / total_size) * 100.0
num += 1
if (delta := ((current_time := time.time()) - last_update)) > 1:
if (delta := ((current_time := time.time()) - last_update)) > 1 or not last_processed:
last_update = current_time
speed = (processed - last_processed) / 1024 / 1024 / delta
last_processed = processed
@ -1311,11 +1211,6 @@ class LegendaryCLI:
logger.info(f'Run "legendary repair {args.app_name}" to repair your game installation.')
def import_game(self, args):
if not self.core.lgd.lock_installed():
logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may '
'install/import/move applications at a time.')
return
# make sure path is absolute
args.app_path = os.path.abspath(args.app_path)
args.app_name = self._resolve_aliases(args.app_name)
@ -1354,8 +1249,6 @@ class LegendaryCLI:
# get everything needed for import from core, then run additional checks.
manifest, igame = self.core.import_game(game, args.app_path, platform=args.platform)
exe_path = os.path.join(args.app_path, manifest.meta.launch_exe.lstrip('/'))
if os.name != 'nt':
exe_path = case_insensitive_file_search(exe_path)
# check if most files at least exist or if user might have specified the wrong directory
total = len(manifest.file_manifest_list.elements)
found = sum(os.path.exists(os.path.join(args.app_path, f.filename))
@ -1411,11 +1304,6 @@ class LegendaryCLI:
logger.info(f'{"DLC" if game.is_dlc else "Game"} "{game.app_title}" has been imported.')
def egs_sync(self, args):
if not self.core.lgd.lock_installed():
logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may '
'install/import/move applications at a time.')
return
if args.unlink:
logger.info('Unlinking and resetting EGS and LGD sync...')
self.core.lgd.config.remove_option('Legendary', 'egl_programdata')
@ -1657,7 +1545,7 @@ class LegendaryCLI:
else:
logger.info('Game not installed and offline mode enabled, cannot load manifest.')
elif game:
entitlements = self.core.egs.get_user_entitlements_full()
entitlements = self.core.egs.get_user_entitlements()
egl_meta = self.core.egs.get_game_info(game.namespace, game.catalog_item_id)
game.metadata = egl_meta
# Get manifest if asset exists for current platform
@ -1695,7 +1583,7 @@ class LegendaryCLI:
# Find custom launch options, if available
launch_options = []
i = 1
while f'extraLaunchOption_{i:03d}_Name' in game.metadata.get('customAttributes', {}):
while f'extraLaunchOption_{i:03d}_Name' in game.metadata['customAttributes']:
launch_options.append((
game.metadata['customAttributes'][f'extraLaunchOption_{i:03d}_Name']['value'],
game.metadata['customAttributes'][f'extraLaunchOption_{i:03d}_Args']['value']
@ -1713,9 +1601,6 @@ class LegendaryCLI:
else:
game_infos.append(InfoItem('Extra launch options', 'launch_options', None, []))
game_infos.append(InfoItem('Command Line', 'command_line', game.additional_command_line,
game.additional_command_line))
# list all owned DLC based on entitlements
if entitlements and not game.is_dlc:
owned_entitlements = {i['entitlementName'] for i in entitlements}
@ -1726,18 +1611,18 @@ class LegendaryCLI:
if dlc['entitlementName'] in owned_entitlements:
owned_dlc.append((installable, None, dlc['title'], dlc['id']))
elif installable:
dlc_app_name = dlc['releaseInfo'][0]['appId']
if dlc_app_name in owned_app_names:
owned_dlc.append((installable, dlc_app_name, dlc['title'], dlc['id']))
app_name = dlc['releaseInfo'][0]['appId']
if app_name in owned_app_names:
owned_dlc.append((installable, app_name, dlc['title'], dlc['id']))
if owned_dlc:
human_list = []
json_list = []
for installable, dlc_app_name, title, dlc_id in owned_dlc:
json_list.append(dict(app_name=dlc_app_name, title=title,
for installable, app_name, title, dlc_id in owned_dlc:
json_list.append(dict(app_name=app_name, title=title,
installable=installable, id=dlc_id))
if installable:
human_list.append(f'App name: {dlc_app_name}, Title: "{title}"')
human_list.append(f'App name: {app_name}, Title: "{title}"')
else:
human_list.append(f'Title: "{title}" (no installation required)')
game_infos.append(InfoItem('Owned DLC', 'owned_dlc', human_list, json_list))
@ -1822,17 +1707,6 @@ class LegendaryCLI:
else:
manifest_info.append(InfoItem('Prerequisites', 'prerequisites', None, None))
if manifest.meta.uninstall_action_path:
human_list = [
f'Uninstaller path: {manifest.meta.uninstall_action_path}',
f'Uninstaller args: {manifest.meta.uninstall_action_args or "(None)"}',
]
manifest_info.append(InfoItem('Uninstaller', 'uninstaller', human_list,
dict(path=manifest.meta.uninstall_action_path,
args=manifest.meta.uninstall_action_args)))
else:
manifest_info.append(InfoItem('Uninstaller', 'uninstaller', None, None))
install_tags = {''}
for fm in manifest.file_manifest_list.elements:
for tag in fm.install_tags:
@ -2011,8 +1885,8 @@ class LegendaryCLI:
if not args.keep_manifests:
logger.debug('Removing manifests...')
installed = [(ig.app_name, ig.version, ig.platform) for ig in self.core.get_installed_list()]
installed.extend((ig.app_name, ig.version, ig.platform) for ig in self.core.get_installed_dlc_list())
installed = [(ig.app_name, ig.version) for ig in self.core.get_installed_list()]
installed.extend((ig.app_name, ig.version) for ig in self.core.get_installed_dlc_list())
self.core.lgd.clean_manifests(installed)
logger.debug('Removing tmp data')
@ -2046,7 +1920,7 @@ class LegendaryCLI:
redeemed = {k['gameId'] for k in key_list if k['redeemedOnUplay']}
games = self.core.get_game_list()
entitlements = self.core.egs.get_user_entitlements_full()
entitlements = self.core.egs.get_user_entitlements()
owned_entitlements = {i['entitlementName'] for i in entitlements}
uplay_games = []
@ -2127,10 +2001,6 @@ class LegendaryCLI:
na_games, _ = self.core.get_non_asset_library_items(skip_ue=True)
origin_games = [game for game in na_games if game.third_party_store == 'Origin']
if not origin_games:
logger.info('No redeemable games found.')
return
logger.info(f'Found {len(origin_games)} game(s) to redeem:')
for game in origin_games:
logger.info(f' - {game.app_title}')
@ -2207,6 +2077,14 @@ class LegendaryCLI:
args.prefix = self.core.lgd.config.get(f'{app_name}.env', 'WINEPREFIX', fallback=None)
args.prefix = self.core.lgd.config.get(app_name, 'wine_prefix', fallback=args.prefix)
if not args.prefix and not args.bottle:
# try using defaults if they exist
if sys_platform == 'darwin':
args.bottle = self.core.lgd.config.get('default', 'crossover_bottle', fallback=None)
args.prefix = self.core.lgd.config.get('default.env', 'WINEPREFIX', fallback=None)
args.prefix = self.core.lgd.config.get('default', 'wine_prefix', fallback=args.prefix)
if sys_platform == 'darwin' and args.bottle:
if not mac_is_valid_bottle(args.bottle):
logger.error('Invalid bottle specified.')
@ -2217,18 +2095,19 @@ class LegendaryCLI:
logger.error(f'Prefix "{args.prefix}" does not exist.')
return
prefix = args.prefix
elif args.action not in {'info', 'install', 'remove', 'update'}:
logger.error('Need either --prefix, --bottle, or --app for this command.')
else:
logger.error('Need either config default, --prefix, --bottle, or --app to install the overlay to.')
return
if prefix:
if not os.path.exists(prefix):
logger.error(f'Prefix "{prefix}" does not exist.')
return
else:
logger.info(f'Using prefix "{prefix}"')
if not os.path.exists(prefix):
logger.error(f'Prefix "{prefix}" does not exist.')
return
else:
logger.info(f'Using prefix "{prefix}"')
if args.action == 'info':
reg_paths = query_registry_entries(prefix)
available_installs = self.core.search_overlay_installs(prefix)
igame = self.core.lgd.get_overlay_install_info()
if not igame:
logger.info('No Legendary-managed installation found.')
@ -2236,11 +2115,6 @@ class LegendaryCLI:
logger.info(f'Installed version: {igame.version}')
logger.info(f'Installed path: {igame.install_path}')
if os.name != 'nt' and not prefix:
return
reg_paths = query_registry_entries(prefix)
available_installs = self.core.search_overlay_installs(prefix)
logger.info('Found available Overlay installations in:')
for install in available_installs:
logger.info(f' - {install}')
@ -2312,19 +2186,15 @@ class LegendaryCLI:
print('Aborting...')
return
logger.info('Removing registry entries...')
remove_registry_entries(prefix)
if os.name != 'nt':
logger.info(f'Registry entries in prefixes other than "{prefix}" were not removed. '
f'This shoouldn\'t cause any issues as the overlay will simply fail to load.')
logger.info('Deleting overlay installation...')
self.core.remove_overlay_install()
if os.name != 'nt' and not prefix:
logger.info('Registry entries in prefixes (if any) have not been removed. '
f'This shouldn\'t cause any issues as the overlay will simply fail to load.')
else:
logger.info('Removing registry entries...')
remove_registry_entries(prefix)
if os.name != 'nt':
logger.info(f'Registry entries in prefixes other than "{prefix}" were not removed. '
f'This shouldn\'t cause any issues as the overlay will simply fail to load.')
logger.info('Done.')
elif args.action in {'install', 'update'}:
@ -2357,25 +2227,21 @@ class LegendaryCLI:
logger.error(f'The following exception occurred while waiting for the downloader to finish: {e!r}. '
f'Try restarting the process, if it continues to fail please open an issue on GitHub.')
else:
logger.info('Finished downloading, setting up overlay...')
self.core.finish_overlay_install(igame)
if os.name == 'nt' or prefix:
logger.info('Finished downloading, setting up overlay...')
# Check for existing registry entries, and remove them if necessary
install_path = os.path.normpath(igame.install_path)
reg_paths = query_registry_entries(prefix)
if old_path := reg_paths["overlay_path"]:
if os.path.normpath(old_path) != install_path:
logger.info(f'Updating overlay registry entries from "{old_path}" to "{install_path}"')
remove_registry_entries(prefix)
else:
logger.info(f'Registry entries already exist. Done.')
return
add_registry_entries(install_path, prefix)
logger.info('Done.')
else:
logger.info('Overlay has been downloaded. Run "legendary eos-overlay enable -h" to see '
'available options for enabling the overlay by specifying a prefix, app, or bottle.')
# Check for existing registry entries, and remove them if necessary
install_path = os.path.normpath(igame.install_path)
reg_paths = query_registry_entries(prefix)
if old_path := reg_paths["overlay_path"]:
if os.path.normpath(old_path) != install_path:
logger.info(f'Updating overlay registry entries from "{old_path}" to "{install_path}"')
remove_registry_entries(prefix)
else:
logger.info(f'Registry entries already exist. Done.')
return
add_registry_entries(install_path, prefix)
logger.info('Done.')
def crossover_setup(self, args):
if sys_platform != 'darwin':
@ -2585,11 +2451,6 @@ class LegendaryCLI:
logger.info('Saved choices to configuration.')
def move(self, args):
if not self.core.lgd.lock_installed():
logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may '
'install/import/move applications at a time.')
return
app_name = self._resolve_aliases(args.app_name)
igame = self.core.get_installed_game(app_name, skip_sync=True)
if not igame:
@ -2609,14 +2470,14 @@ class LegendaryCLI:
except Exception as e:
if isinstance(e, OSError) and e.errno == 18:
logger.error(f'Moving to a different drive is not supported. Move the folder manually to '
f'"{new_path}" and run "legendary move {app_name} "{args.new_path}" --skip-move"')
f'"{new_path}" and then run "legendary {app_name} "{args.new_path}" --skip-move"')
elif isinstance(e, FileExistsError):
logger.error(f'The target path already contains a folder called "{game_folder}", '
f'please remove or rename it first.')
else:
logger.error(f'Moving failed with unknown error {e!r}.')
logger.info(f'Try moving the folder manually to "{new_path}" and running '
f'"legendary move {app_name} "{args.new_path}" --skip-move"')
f'"legendary {app_name} "{args.new_path}" --skip-move"')
return
else:
logger.info(f'Not moving, just rewriting legendary metadata...')
@ -2627,10 +2488,6 @@ class LegendaryCLI:
def main():
# Set output encoding to UTF-8 if not outputting to a terminal
if not stdout.isatty():
stdout.reconfigure(encoding='utf-8')
parser = argparse.ArgumentParser(description=f'Legendary v{__version__} - "{__codename__}"')
parser.register('action', 'parsers', HiddenAliasSubparsersAction)
@ -2722,10 +2579,8 @@ def main():
# Flags
auth_parser.add_argument('--import', dest='import_egs_auth', action='store_true',
help='Import Epic Games Launcher authentication data (logs out of EGL)')
auth_parser.add_argument('--code', dest='auth_code', action='store', metavar='<authorization code>',
help='Use specified authorization code instead of interactive authentication')
auth_parser.add_argument('--token', dest='ex_token', action='store', metavar='<exchange token>',
help='Use specified exchange token instead of interactive authentication')
auth_parser.add_argument('--code', dest='auth_code', action='store', metavar='<exchange code>',
help='Use specified exchange code instead of interactive authentication')
auth_parser.add_argument('--sid', dest='session_id', action='store', metavar='<session id>',
help='Use specified session id instead of interactive authentication')
auth_parser.add_argument('--delete', dest='auth_delete', action='store_true',
@ -2796,13 +2651,9 @@ def main():
help='Automatically install all DLCs with the base game')
install_parser.add_argument('--skip-dlcs', dest='skip_dlcs', action='store_true',
help='Do not ask about installing DLCs.')
install_parser.add_argument('--bind', dest='bind_ip', action='store', metavar='<IPs>', type=str,
help='Comma-separated list of IPs to bind to for downloading')
uninstall_parser.add_argument('--keep-files', dest='keep_files', action='store_true',
help='Keep files but remove game from Legendary database')
uninstall_parser.add_argument('--skip-uninstaller', dest='skip_uninstaller', action='store_true',
help='Skip running the uninstaller')
launch_parser.add_argument('--offline', dest='offline', action='store_true',
default=False, help='Skip login and launch game without online authentication')
@ -2910,8 +2761,6 @@ def main():
help='Override savegame path (requires single app name to be specified)')
sync_saves_parser.add_argument('--disable-filters', dest='disable_filters', action='store_true',
help='Disable save game file filtering')
sync_saves_parser.add_argument('--accept-path', dest='accept_path', action='store_true',
help=argparse.SUPPRESS)
clean_saves_parser.add_argument('--delete-incomplete', dest='delete_incomplete', action='store_true',
help='Delete incomplete save files')
@ -3033,12 +2882,6 @@ def main():
continue
print(f'\nCommand: {choice}')
print(subparser.format_help())
elif os.name == 'nt':
from legendary.lfs.windows_helpers import double_clicked
if double_clicked():
print('Please note that this is not the intended way to run Legendary.')
print('Follow https://github.com/derrod/legendary/wiki/Setup-Instructions to set it up properly')
subprocess.Popen(['cmd', '/K', 'echo>nul'])
return
cli = LegendaryCLI(override_config=args.config_file, api_timeout=args.api_timeout)

View file

@ -1,6 +1,8 @@
# coding: utf-8
import json
import logging
import os
import shlex
import shutil
@ -13,10 +15,10 @@ from locale import getdefaultlocale
from multiprocessing import Queue
from platform import system
from requests import session
from requests.exceptions import HTTPError, ConnectionError
from requests.exceptions import HTTPError
from sys import platform as sys_platform
from uuid import uuid4
from urllib.parse import urlencode, parse_qsl, urlparse
from urllib.parse import urlencode, parse_qsl
from legendary import __version__
from legendary.api.egs import EPCAPI
@ -24,7 +26,7 @@ from legendary.api.lgd import LGDAPI
from legendary.downloader.mp.manager import DLManager
from legendary.lfs.egl import EPCLFS
from legendary.lfs.lgndry import LGDLFS
from legendary.lfs.utils import clean_filename, delete_folder, delete_filelist, get_dir_size
from legendary.utils.lfs import clean_filename, delete_folder, delete_filelist, get_dir_size
from legendary.models.downloading import AnalysisResult, ConditionCheckResult
from legendary.models.egl import EGLManifest
from legendary.models.exceptions import *
@ -32,14 +34,15 @@ from legendary.models.game import *
from legendary.models.json_manifest import JSONManifest
from legendary.models.manifest import Manifest, ManifestMeta
from legendary.models.chunk import Chunk
from legendary.lfs.crossover import *
from legendary.utils.crossover import *
from legendary.utils.egl_crypt import decrypt_epic_data
from legendary.utils.env import is_windows_mac_or_pyi
from legendary.lfs.eos import EOSOverlayApp, query_registry_entries
from legendary.utils.game_workarounds import is_opt_enabled, update_workarounds, get_exe_override
from legendary.utils.eos import EOSOverlayApp, query_registry_entries
from legendary.utils.game_workarounds import is_opt_enabled, update_workarounds
from legendary.utils.savegame_helper import SaveGameHelper
from legendary.utils.selective_dl import games as sdl_games
from legendary.lfs.wine_helpers import read_registry, get_shell_folders, case_insensitive_path_search
from legendary.utils.manifests import combine_manifests
from legendary.utils.wine_helpers import read_registry, get_shell_folders, case_insensitive_path_search
# ToDo: instead of true/false return values for success/failure actually raise an exception that the CLI/GUI
@ -74,8 +77,7 @@ class LegendaryCore:
self.local_timezone = datetime.now().astimezone().tzinfo
self.language_code, self.country_code = ('en', 'US')
if locale := self.lgd.config.get('Legendary', 'locale',
fallback=getdefaultlocale(('LANG', 'LANGUAGE', 'LC_ALL', 'LC_CTYPE'))[0]):
if locale := self.lgd.config.get('Legendary', 'locale', fallback=getdefaultlocale()[0]):
try:
self.language_code, self.country_code = locale.split('-' if '-' in locale else '_')
self.log.debug(f'Set locale to {self.language_code}-{self.country_code}')
@ -84,7 +86,7 @@ class LegendaryCore:
except Exception as e:
self.log.warning(f'Getting locale failed: {e!r}, falling back to using en-US.')
elif system() != 'Darwin': # macOS doesn't have a default locale we can query
self.log.warning('Could not determine locale, falling back to en-US')
self.log.warning(f'Could not determine locale, falling back to en-US')
self.update_available = False
self.force_show_update = False
@ -92,6 +94,16 @@ class LegendaryCore:
self.overlay_update_available = False
self.logged_in = False
def auth(self, username, password):
"""
Attempts direct non-web login, raises CaptchaError if manual login is required
:param username:
:param password:
:return:
"""
raise NotImplementedError
def auth_sid(self, sid) -> str:
"""
Handles getting an exchange code from a session id
@ -122,29 +134,16 @@ class LegendaryCore:
if r.status_code == 200:
return r.json()['code']
self.log.error(f'Getting exchange code failed: {r.json()}')
return ''
else:
self.log.error(f'Getting exchange code failed: {r.json()}')
return ''
def auth_code(self, code) -> bool:
"""
Handles authentication via authorization code (either retrieved manually or automatically)
Handles authentication via exchange code (either retrieved manually or automatically)
"""
try:
with self.lgd.userdata_lock as lock:
lock.data = self.egs.start_session(authorization_code=code)
return True
except Exception as e:
self.log.error(f'Logging in failed with {e!r}, please try again.')
return False
def auth_ex_token(self, code) -> bool:
"""
Handles authentication via exchange token (either retrieved manually or automatically)
"""
try:
with self.lgd.userdata_lock as lock:
lock.data = self.egs.start_session(exchange_token=code)
self.lgd.userdata = self.egs.start_session(exchange_token=code)
return True
except Exception as e:
self.log.error(f'Logging in failed with {e!r}, please try again.')
@ -173,23 +172,22 @@ class LegendaryCore:
raise ValueError('No login session in config')
refresh_token = re_data['Token']
try:
with self.lgd.userdata_lock as lock:
lock.data = self.egs.start_session(refresh_token=refresh_token)
self.lgd.userdata = self.egs.start_session(refresh_token=refresh_token)
return True
except Exception as e:
self.log.error(f'Logging in failed with {e!r}, please try again.')
return False
def _login(self, lock, force_refresh=False) -> bool:
def login(self, force_refresh=False) -> bool:
"""
Attempts logging in with existing credentials.
raises ValueError if no existing credentials or InvalidCredentialsError if the API return an error
"""
if not lock.data:
if not self.lgd.userdata:
raise ValueError('No saved credentials')
elif self.logged_in and lock.data['expires_at']:
dt_exp = datetime.fromisoformat(lock.data['expires_at'][:-1])
elif self.logged_in and self.lgd.userdata['expires_at']:
dt_exp = datetime.fromisoformat(self.lgd.userdata['expires_at'][:-1])
dt_now = datetime.utcnow()
td = dt_now - dt_exp
@ -215,8 +213,8 @@ class LegendaryCore:
except Exception as e:
self.log.warning(f'Checking for EOS Overlay updates failed: {e!r}')
if lock.data['expires_at'] and not force_refresh:
dt_exp = datetime.fromisoformat(lock.data['expires_at'][:-1])
if self.lgd.userdata['expires_at'] and not force_refresh:
dt_exp = datetime.fromisoformat(self.lgd.userdata['expires_at'][:-1])
dt_now = datetime.utcnow()
td = dt_now - dt_exp
@ -224,7 +222,7 @@ class LegendaryCore:
if dt_exp > dt_now and abs(td.total_seconds()) > 600:
self.log.info('Trying to re-use existing login session...')
try:
self.egs.resume_session(lock.data)
self.egs.resume_session(self.lgd.userdata)
self.logged_in = True
return True
except InvalidCredentialsError as e:
@ -236,23 +234,19 @@ class LegendaryCore:
try:
self.log.info('Logging in...')
userdata = self.egs.start_session(lock.data['refresh_token'])
userdata = self.egs.start_session(self.lgd.userdata['refresh_token'])
except InvalidCredentialsError:
self.log.error('Stored credentials are no longer valid! Please login again.')
lock.clear()
self.lgd.invalidate_userdata()
return False
except (HTTPError, ConnectionError) as e:
except HTTPError as e:
self.log.error(f'HTTP request for login failed: {e!r}, please try again later.')
return False
lock.data = userdata
self.lgd.userdata = userdata
self.logged_in = True
return True
def login(self, force_refresh=False) -> bool:
with self.lgd.userdata_lock as lock:
return self._login(lock, force_refresh=force_refresh)
def update_check_enabled(self):
return not self.lgd.config.getboolean('Legendary', 'disable_update_check', fallback=False)
@ -280,10 +274,10 @@ class LegendaryCore:
"""Applies configuration options returned by update API"""
if not version_info:
version_info = self.lgd.get_cached_version()['data']
# if cached data is invalid
if not version_info:
self.log.debug('No cached legendary config to apply.')
return
# if cached data is invalid
if not version_info:
self.log.debug('No cached legendary config to apply.')
return
if 'egl_config' in version_info:
self.egs.update_egs_params(version_info['egl_config'])
@ -301,9 +295,6 @@ class LegendaryCore:
if lgd_config := version_info.get('legendary_config'):
self.webview_killswitch = lgd_config.get('webview_killswitch', False)
def get_egl_version(self):
return self._egl_version
def get_update_info(self):
return self.lgd.get_cached_version()['data'].get('release_info')
@ -351,7 +342,10 @@ class LegendaryCore:
if not self.egs.user:
return []
assets = self.lgd.assets.copy() if self.lgd.assets else dict()
if self.lgd.assets:
assets = self.lgd.assets.copy()
else:
assets = dict()
assets.update({
platform: [
@ -449,10 +443,7 @@ class LegendaryCore:
game = Game(app_name=app_name, app_title=eg_meta['title'], metadata=eg_meta, asset_infos=assets[app_name])
self.lgd.set_game_meta(game.app_name, game)
games[app_name] = game
try:
still_needs_update.remove(app_name)
except KeyError:
pass
still_needs_update.remove(app_name)
# setup and teardown of thread pool takes some time, so only do it when it makes sense.
still_needs_update = {e[0] for e in fetch_list}
@ -476,7 +467,7 @@ class LegendaryCore:
fetch_game_meta((app_name, _ga.namespace, _ga.catalog_item_id))
game = games[app_name]
if game.is_dlc and platform in app_assets:
if game.is_dlc:
_dlc[game.metadata['mainGameItem']['id']].append(game)
elif not any(i['path'] == 'mods' for i in game.metadata.get('categories', [])) and platform in app_assets:
_ret.append(game)
@ -520,16 +511,12 @@ class LegendaryCore:
_dlc = defaultdict(list)
# get all the appnames we have to ignore
ignore = set(i.app_name for i in self.get_assets())
# broken old app name that we should always ignore
ignore |= {'1'}
for libitem in self.egs.get_library_items():
if libitem['namespace'] == 'ue' and skip_ue:
continue
if libitem['appName'] in ignore:
continue
if libitem['sandboxType'] == 'PRIVATE':
continue
game = self.lgd.get_game_meta(libitem['appName'])
if not game or force_refresh:
@ -592,17 +579,9 @@ class LegendaryCore:
# get environment overrides from config
env = dict()
if 'default.env' in self.lgd.config:
env |= {
k: v
for k, v in self.lgd.config['default.env'].items()
if v and not k.startswith(';')
}
env.update({k: v for k, v in self.lgd.config[f'default.env'].items() if v and not k.startswith(';')})
if f'{app_name}.env' in self.lgd.config:
env |= {
k: v
for k, v in self.lgd.config[f'{app_name}.env'].items()
if v and not k.startswith(';')
}
env.update({k: v for k, v in self.lgd.config[f'{app_name}.env'].items() if v and not k.startswith(';')})
if disable_wine:
return env
@ -618,9 +597,7 @@ class LegendaryCore:
env['CX_BOTTLE'] = cx_bottle
else:
cx_bottle = os.environ['CX_BOTTLE']
if cx_bottle:
self.log.debug(f'Using CrossOver Bottle "{cx_bottle}"')
self.log.info(f'Using CrossOver Bottle "{cx_bottle}"')
if wine_pfx:
env['WINEPREFIX'] = wine_pfx
@ -689,10 +666,9 @@ class LegendaryCore:
disable_wine: bool = False,
executable_override: str = None,
crossover_app: str = None,
crossover_bottle: str = None,
addon_app_name: str = None) -> LaunchParameters:
crossover_bottle: str = None) -> LaunchParameters:
install = self.lgd.get_installed_game(app_name)
game = self.lgd.get_game_meta(addon_app_name if addon_app_name else app_name)
game = self.lgd.get_game_meta(app_name)
# Disable wine for non-Windows executables (e.g. native macOS)
if not install.platform.startswith('Win'):
@ -734,13 +710,6 @@ class LegendaryCore:
self.log.warning(f'Parsing predefined launch parameters failed with: {e!r}, '
f'input: {install.launch_parameters}')
if meta_args := game.additional_command_line:
try:
params.game_parameters.extend(shlex.split(meta_args.strip(), posix=False))
except ValueError as e:
self.log.warning(f'Parsing metadata launch parameters failed with: {e!r}, '
f'input: {install.launch_parameters}')
game_token = ''
if not offline:
self.log.info('Getting authentication token...')
@ -748,8 +717,10 @@ class LegendaryCore:
elif not install.can_run_offline:
self.log.warning('Game is not approved for offline use and may not work correctly.')
user_name = self.lgd.userdata['displayName']
account_id = self.lgd.userdata['account_id']
user_name = user or self.lgd.userdata['displayName']
if user:
user_name = user
params.egl_parameters.extend([
'-AUTH_LOGIN=unused',
@ -787,7 +758,10 @@ class LegendaryCore:
return params
def get_origin_uri(self, app_name: str, offline: bool = False) -> str:
token = '0' if offline else self.egs.get_game_token()['code']
if offline:
token = '0'
else:
token = self.egs.get_game_token()['code']
user_name = self.lgd.userdata['displayName']
account_id = self.lgd.userdata['account_id']
@ -843,20 +817,18 @@ class LegendaryCore:
}
if sys_platform == 'win32':
path_vars |= {
path_vars.update({
'{appdata}': os.path.expandvars('%LOCALAPPDATA%'),
'{userdir}': os.path.expandvars('%userprofile%/documents'),
'{userprofile}': os.path.expandvars('%userprofile%'),
'{usersavedgames}': os.path.expandvars('%userprofile%/Saved Games'),
}
'{usersavedgames}': os.path.expandvars('%userprofile%/Saved Games')
})
elif sys_platform == 'darwin' and platform == 'Mac':
path_vars |= {
# Note: EGL actually resolves this to "~/Library/Application Support/Epic", but the only game
# I could find using this (Loop Hero) expects it to be "~/Library/Application Support".
path_vars.update({
'{appdata}': os.path.expanduser('~/Library/Application Support'),
'{userdir}': os.path.expanduser('~/Documents'),
'{userlibrary}': os.path.expanduser('~/Library'),
}
'{userlibrary}': os.path.expanduser('~/Library')
})
else:
wine_pfx = None
# on mac CrossOver takes precedence so check for a bottle first
@ -887,18 +859,6 @@ class LegendaryCore:
wine_pfx = self.lgd.config.get('default.env', 'WINEPREFIX', fallback=None)
wine_pfx = self.lgd.config.get('default', 'wine_prefix', fallback=wine_pfx)
# If we still didn't find anything, try to read the prefix from the environment variables of this process
if not wine_pfx and sys_platform == 'darwin':
cx_bottle = os.getenv('CX_BOTTLE')
if cx_bottle and mac_is_valid_bottle(cx_bottle):
wine_pfx = mac_get_bottle_path(cx_bottle)
if not wine_pfx:
if proton_pfx := os.getenv('STEAM_COMPAT_DATA_PATH'):
wine_pfx = f'{proton_pfx}/pfx'
else:
wine_pfx = os.getenv('WINEPREFIX', wine_pfx)
# if all else fails, use the WINE default
if not wine_pfx:
wine_pfx = os.path.expanduser('~/.wine')
@ -1024,22 +984,9 @@ class LegendaryCore:
if not os.path.exists(_save_dir):
os.makedirs(_save_dir)
if app_name and clean_dir:
game = self.lgd.get_game_meta(app_name)
custom_attr = game.metadata['customAttributes']
include_f = exclude_f = None
# Make sure to only delete files that match the include/exclude filters.
# This is particularly import for games that store save games in their install dir...
if (_include := custom_attr.get('CloudIncludeList', {}).get('value', None)) is not None:
include_f = _include.split(',')
if (_exclude := custom_attr.get('CloudExcludeList', {}).get('value', None)) is not None:
exclude_f = _exclude.split(',')
sgh = SaveGameHelper()
save_files = sgh.get_deletion_list(_save_dir, include_f, exclude_f)
if clean_dir:
self.log.info('Deleting old save files...')
delete_filelist(_save_dir, save_files, silent=True)
delete_folder(_save_dir)
self.log.info(f'Downloading "{fname.split("/", 2)[2]}"...')
# download manifest
@ -1150,7 +1097,7 @@ class LegendaryCore:
missing_chunks += 1
if (0 < missing_chunks < total_chunks and delete_incomplete) or missing_chunks == total_chunks:
self.log.error('Chunk(s) missing, marking manifest for deletion.')
self.log.error(f'Chunk(s) missing, marking manifest for deletion.')
deletion_list.append(fname)
continue
elif 0 < missing_chunks < total_chunks:
@ -1192,7 +1139,10 @@ class LegendaryCore:
for ass in self.get_assets(True):
if ass.app_name == app_name:
return ass.build_version == installed.version
if ass.build_version != installed.version:
return False
else:
return True
# if we get here something is very wrong
raise ValueError(f'Could not find {app_name} in asset list!')
@ -1203,10 +1153,10 @@ class LegendaryCore:
return self._get_installed_game(app_name) is not None
def is_dlc(self, app_name: str) -> bool:
if meta := self.lgd.get_game_meta(app_name):
return meta.is_dlc
else:
meta = self.lgd.get_game_meta(app_name)
if not meta:
raise ValueError('Game unknown!')
return meta.is_dlc
@staticmethod
def load_manifest(data: bytes) -> Manifest:
@ -1246,34 +1196,19 @@ class LegendaryCore:
def get_cdn_manifest(self, game, platform='Windows', disable_https=False):
manifest_urls, base_urls, manifest_hash = self.get_cdn_urls(game, platform)
if not manifest_urls:
raise ValueError('No manifest URLs returned by API')
if disable_https:
manifest_urls = [url.replace('https://', 'http://') for url in manifest_urls]
for url in manifest_urls:
self.log.debug(f'Trying to download manifest from "{url}"...')
try:
r = self.egs.unauth_session.get(url, timeout=10.0)
except Exception as e:
self.log.warning(f'Unable to download manifest from "{urlparse(url).netloc}" '
f'(Exception: {e!r}), trying next URL...')
continue
if r.status_code == 200:
manifest_bytes = r.content
break
else:
self.log.warning(f'Unable to download manifest from "{urlparse(url).netloc}" '
f'(status: {r.status_code}), trying next URL...')
else:
raise ValueError(f'Unable to get manifest from any CDN URL, last result: {r.status_code} ({r.reason})')
self.log.debug(f'Downloading manifest from {manifest_urls[0]} ...')
r = self.egs.unauth_session.get(manifest_urls[0])
r.raise_for_status()
manifest_bytes = r.content
if sha1(manifest_bytes).hexdigest() != manifest_hash:
raise ValueError('Manifest sha hash mismatch!')
return manifest_bytes, base_urls
return r.content, base_urls
def get_uri_manifest(self, uri):
if uri.startswith('http'):
@ -1294,7 +1229,10 @@ class LegendaryCore:
return None
r = self.egs.unauth_session.get(f'{base_url}/Deltas/{new_build_id}/{old_build_id}.delta')
return r.content if r.status_code == 200 else None
if r.status_code == 200:
return r.content
else:
return None
def prepare_download(self, game: Game, base_game: Game = None, base_path: str = '',
status_q: Queue = None, max_shm: int = 0, max_workers: int = 0,
@ -1307,7 +1245,7 @@ class LegendaryCore:
repair: bool = False, repair_use_latest: bool = False,
disable_delta: bool = False, override_delta_manifest: str = '',
egl_guid: str = '', preferred_cdn: str = None,
disable_https: bool = False, bind_ip: str = None) -> (DLManager, AnalysisResult, ManifestMeta):
disable_https: bool = False) -> (DLManager, AnalysisResult, ManifestMeta):
# load old manifest
old_manifest = None
@ -1370,7 +1308,7 @@ class LegendaryCore:
self.log.info(f'Using optimized delta manifest to upgrade from build '
f'"{old_manifest.meta.build_id}" to '
f'"{new_manifest.meta.build_id}"...')
new_manifest.apply_delta_manifest(delta_manifest)
combine_manifests(new_manifest, delta_manifest)
else:
self.log.debug(f'No Delta manifest received from CDN.')
@ -1395,20 +1333,18 @@ class LegendaryCore:
if platform == 'Mac':
# if we're on mac and the path to the binary does not start with <something>.app,
# treat it as if it were a Windows game instead and install it to the default folder.
if all('.app' in fm.filename.partition('/')[0].lower()
for fm in new_manifest.file_manifest_list.elements):
if '.app' not in new_manifest.meta.launch_exe.partition('/')[0].lower():
base_path = self.get_default_install_dir(platform='Windows')
else:
# If it is a .app omit the game folder
game_folder = ''
else:
base_path = self.get_default_install_dir(platform='Windows')
# make sure base directory actually exists (but do not create game dir)
if not os.path.exists(base_path):
self.log.info(f'"{base_path}" does not exist, creating...')
os.makedirs(base_path)
install_path = os.path.normpath(os.path.join(base_path, game_folder.strip()))
install_path = os.path.normpath(os.path.join(base_path, game_folder))
# check for write access on the install path or its parent directory if it doesn't exist yet
base_path = os.path.dirname(install_path)
@ -1474,7 +1410,7 @@ class LegendaryCore:
dlm = DLManager(install_path, base_url, resume_file=resume_file, status_q=status_q,
max_shared_memory=max_shm * 1024 * 1024, max_workers=max_workers,
dl_timeout=dl_timeout, bind_ip=bind_ip)
dl_timeout=dl_timeout)
anlres = dlm.run_analysis(manifest=new_manifest, old_manifest=old_manifest,
patch=not disable_patching, resume=not force,
file_prefix_filter=file_prefix_filter,
@ -1487,35 +1423,20 @@ class LegendaryCore:
prereq = dict(ids=new_manifest.meta.prereq_ids, name=new_manifest.meta.prereq_name,
path=new_manifest.meta.prereq_path, args=new_manifest.meta.prereq_args)
uninstaller = None
if new_manifest.meta.uninstall_action_path:
uninstaller = dict(path=new_manifest.meta.uninstall_action_path,
args=new_manifest.meta.uninstall_action_args)
offline = game.metadata.get('customAttributes', {}).get('CanRunOffline', {}).get('value', 'true')
ot = game.metadata.get('customAttributes', {}).get('OwnershipToken', {}).get('value', 'false')
if file_install_tag is None:
file_install_tag = []
# Override exe at an install level to avoid breaking existing config overrides
executable = new_manifest.meta.launch_exe
if platform != 'Mac' and (exe_override := get_exe_override(app_name=game.app_name)):
exe_override_l = exe_override.lower()
# make sure that override exe even exists
if any(fm.filename.lower() == exe_override_l for fm in new_manifest.file_manifest_list.elements):
self.log.info(f'Launch exe will be changed from "{executable}" to "{exe_override}" for compatibility')
executable = exe_override
igame = InstalledGame(app_name=game.app_name, title=game.app_title,
version=new_manifest.meta.build_version, prereq_info=prereq,
manifest_path=override_manifest, base_urls=base_urls,
install_path=install_path, executable=executable,
install_path=install_path, executable=new_manifest.meta.launch_exe,
launch_parameters=new_manifest.meta.launch_command,
can_run_offline=offline == 'true', requires_ot=ot == 'true',
is_dlc=base_game is not None, install_size=anlres.install_size,
egl_guid=egl_guid, install_tags=file_install_tag,
platform=platform, uninstaller=uninstaller)
platform=platform)
return dlm, anlres, igame
@ -1551,14 +1472,14 @@ class LegendaryCore:
min_disk_space = analysis.disk_space_delta
_, _, free = shutil.disk_usage(base_path)
if free < min_disk_space:
free_gib = free / 1024**3
required_gib = min_disk_space / 1024**3
free_mib = free / 1024 / 1024
required_mib = min_disk_space / 1024 / 1024
if ignore_space_req:
results.warnings.add(f'Potentially not enough available disk space! '
f'{free_gib:.02f} GiB < {required_gib:.02f} GiB')
f'{free_mib:.02f} MiB < {required_mib:.02f} MiB')
else:
results.failures.add(f'Not enough available disk space! '
f'{free_gib:.02f} GiB < {required_gib:.02f} GiB')
f'{free_mib:.02f} MiB < {required_mib:.02f} MiB')
else:
results.failures.add(f'Install path "{base_path}" does not exist, make sure all necessary mounts are '
f'available. If you previously deleted the game folder without uninstalling, run '
@ -1597,40 +1518,6 @@ class LegendaryCore:
'installation via Uplay running in WINE (e.g. using Lutris) is recommended. '
'Use "legendary activate --uplay" and follow the instructions.')
# Detect 2K launcher, warn about it
if '2klauncher' in install.executable.lower():
results.warnings.add('This game uses the 2K Launcher which is does not work with Legendary. '
'Consult the Legendary or EpicLinux Wikis for workarounds (e.g. exe override).')
# suggest alternative EXEs
alts = []
all_files = analysis.manifest_comparison.added \
| analysis.manifest_comparison.unchanged \
| analysis.manifest_comparison.changed
for fname in sorted(all_files):
if (fname_l := fname.lower()).endswith('.exe'):
if 'prereq' in fname_l or '2klauncher' in fname_l:
continue
alts.append(fname)
# todo move this to "install" command as an interactive selection
alt_str = '\n'.join(f' + {alt}' for alt in alts)
results.warnings.add('You may want to consider trying one of the following executables '
f'(see README for launch parameter/config option usage):\n{alt_str}')
# Detect EOS service
eos_installer = next((f for f in analysis.manifest_comparison.added
if 'epiconlineservicesinstaller' in f.lower()), None)
has_bootstrapper = any('eosbootstrapper' in f.lower() for f in analysis.manifest_comparison.added)
if eos_installer:
results.warnings.add('This game ships the Epic Online Services Windows service, '
'it may have to be installed for the game to work properly. '
f'To do so, run "{eos_installer}" inside the game directory '
f'after the install has finished.')
elif has_bootstrapper:
results.warnings.add('This game ships the Epic Online Services bootstrapper. '
'The Epic Online Services Windows service may have to be '
'installed manually for the game to function properly.')
return results
def get_default_install_dir(self, platform='Windows'):
@ -1784,9 +1671,6 @@ class LegendaryCore:
def egl_import(self, app_name):
if not self.asset_valid(app_name):
raise ValueError(f'To-be-imported game {app_name} not in game asset database!')
if not self.lgd.lock_installed():
self.log.warning('Could not acquire lock for EGL import')
return
self.log.debug(f'Importing "{app_name}" from EGL')
# load egl json file
@ -1834,12 +1718,9 @@ class LegendaryCore:
# mark game as installed
_ = self._install_game(lgd_igame)
return
def egl_export(self, app_name):
if not self.lgd.lock_installed():
self.log.warning('Could not acquire lock for EGL import')
return
self.log.debug(f'Exporting "{app_name}" to EGL')
# load igame/game
lgd_game = self.get_game(app_name)
@ -1901,10 +1782,6 @@ class LegendaryCore:
"""
Sync game installs between Legendary and the Epic Games Launcher
"""
if not self.lgd.lock_installed():
self.log.warning('Could not acquire lock for EGL sync')
return
# read egl json files
if app_name:
lgd_igame = self._get_installed_game(app_name)

View file

@ -22,14 +22,14 @@ from legendary.models.manifest import ManifestComparison, Manifest
class DLManager(Process):
def __init__(self, download_dir, base_url, cache_dir=None, status_q=None,
max_workers=0, update_interval=1.0, dl_timeout=10, resume_file=None,
max_shared_memory=1024 * 1024 * 1024, bind_ip=None):
max_shared_memory=1024 * 1024 * 1024):
super().__init__(name='DLManager')
self.log = logging.getLogger('DLM')
self.proc_debug = False
self.base_url = base_url
self.dl_dir = download_dir
self.cache_dir = cache_dir or os.path.join(download_dir, '.cache')
self.cache_dir = cache_dir if cache_dir else os.path.join(download_dir, '.cache')
# All the queues!
self.logging_queue = None
@ -37,11 +37,8 @@ class DLManager(Process):
self.writer_queue = None
self.dl_result_q = None
self.writer_result_q = None
# Worker stuff
self.max_workers = max_workers or min(cpu_count() * 2, 16)
self.max_workers = max_workers if max_workers else min(cpu_count() * 2, 16)
self.dl_timeout = dl_timeout
self.bind_ips = [] if not bind_ip else bind_ip.split(',')
# Analysis stuff
self.analysis = None
@ -140,24 +137,6 @@ class DLManager(Process):
except Exception as e:
self.log.warning(f'Reading resume file failed: {e!r}, continuing as normal...')
elif resume:
# Basic check if files exist locally, put all missing files into "added"
# This allows new SDL tags to be installed without having to do a repair as well.
missing_files = set()
for fm in manifest.file_manifest_list.elements:
if fm.filename in mc.added:
continue
local_path = os.path.join(self.dl_dir, fm.filename)
if not os.path.exists(local_path):
missing_files.add(fm.filename)
self.log.info(f'Found {len(missing_files)} missing files.')
mc.added |= missing_files
mc.changed -= missing_files
mc.unchanged -= missing_files
# Install tags are used for selective downloading, e.g. for language packs
additional_deletion_tasks = []
if file_install_tag is not None:
@ -658,15 +637,10 @@ class DLManager(Process):
self.writer_result_q = MPQueue(-1)
self.log.info(f'Starting download workers...')
bind_ip = None
for i in range(self.max_workers):
if self.bind_ips:
bind_ip = self.bind_ips[i % len(self.bind_ips)]
w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q,
self.shared_memory.name, logging_queue=self.logging_queue,
dl_timeout=self.dl_timeout, bind_addr=bind_ip)
dl_timeout=self.dl_timeout)
self.children.append(w)
w.start()

View file

@ -1,6 +1,7 @@
# coding: utf-8
import os
import requests
import time
import logging
@ -9,9 +10,6 @@ from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
from queue import Empty
import requests
from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK
from legendary.models.chunk import Chunk
from legendary.models.downloading import (
DownloaderTask, DownloaderTaskResult,
@ -20,22 +18,9 @@ from legendary.models.downloading import (
)
class BindingHTTPAdapter(HTTPAdapter):
def __init__(self, addr):
self.__attrs__.append('addr')
self.addr = addr
super().__init__()
def init_poolmanager(
self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs
):
pool_kwargs['source_address'] = (self.addr, 0)
super().init_poolmanager(connections, maxsize, block, **pool_kwargs)
class DLWorker(Process):
def __init__(self, name, queue, out_queue, shm, max_retries=7,
logging_queue=None, dl_timeout=10, bind_addr=None):
logging_queue=None, dl_timeout=10):
super().__init__(name=name)
self.q = queue
self.o_q = out_queue
@ -49,12 +34,6 @@ class DLWorker(Process):
self.logging_queue = logging_queue
self.dl_timeout = float(dl_timeout) if dl_timeout else 10.0
# optionally bind an address
if bind_addr:
adapter = BindingHTTPAdapter(bind_addr)
self.session.mount('https://', adapter)
self.session.mount('http://', adapter)
def run(self):
# we have to fix up the logger before we can start
_root = logging.getLogger()
@ -72,12 +51,12 @@ class DLWorker(Process):
empty = False
except Empty:
if not empty:
logger.debug('Queue Empty, waiting for more...')
logger.debug(f'Queue Empty, waiting for more...')
empty = True
continue
if isinstance(job, TerminateWorkerTask): # let worker die
logger.debug('Worker received termination signal, shutting down...')
logger.debug(f'Worker received termination signal, shutting down...')
break
tries = 0
@ -120,18 +99,17 @@ class DLWorker(Process):
break
if not chunk:
logger.warning('Chunk somehow None?')
logger.warning(f'Chunk somehow None?')
self.o_q.put(DownloaderTaskResult(success=False, **job.__dict__))
continue
# decompress stuff
try:
data = chunk.data
size = len(data)
size = len(chunk.data)
if size > job.shm.size:
logger.fatal('Downloaded chunk is longer than SharedMemorySegment!')
logger.fatal(f'Downloaded chunk is longer than SharedMemorySegment!')
self.shm.buf[job.shm.offset:job.shm.offset + size] = data
self.shm.buf[job.shm.offset:job.shm.offset + size] = bytes(chunk.data)
del chunk
self.o_q.put(DownloaderTaskResult(success=True, size_decompressed=size,
size_downloaded=compressed, **job.__dict__))
@ -152,7 +130,7 @@ class FileWorker(Process):
self.q = queue
self.o_q = out_queue
self.base_path = base_path
self.cache_path = cache_path or os.path.join(base_path, '.cache')
self.cache_path = cache_path if cache_path else os.path.join(base_path, '.cache')
self.shm = SharedMemory(name=shm)
self.log_level = logging.getLogger().level
self.logging_queue = logging_queue
@ -165,7 +143,7 @@ class FileWorker(Process):
logger = logging.getLogger(self.name)
logger.setLevel(self.log_level)
logger.debug('Download worker reporting for duty!')
logger.debug(f'Download worker reporting for duty!')
last_filename = ''
current_file = None
@ -181,7 +159,7 @@ class FileWorker(Process):
if isinstance(j, TerminateWorkerTask):
if current_file:
current_file.close()
logger.debug('Worker received termination signal, shutting down...')
logger.debug(f'Worker received termination signal, shutting down...')
# send termination task to results halnder as well
self.o_q.put(TerminateWorkerTask())
break
@ -272,7 +250,7 @@ class FileWorker(Process):
if j.shared_memory:
shm_offset = j.shared_memory.offset + j.chunk_offset
shm_end = shm_offset + j.chunk_size
current_file.write(self.shm.buf[shm_offset:shm_end])
current_file.write(self.shm.buf[shm_offset:shm_end].tobytes())
elif j.cache_file:
with open(os.path.join(self.cache_path, j.cache_file), 'rb') as f:
if j.chunk_offset:

View file

@ -34,18 +34,12 @@ class EPCLFS:
if not self.appdata_path:
raise ValueError('EGS AppData path is not set')
if not os.path.isdir(self.appdata_path):
raise ValueError('EGS AppData path does not exist')
self.config.read(os.path.join(self.appdata_path, 'GameUserSettings.ini'), encoding='utf-8')
def save_config(self):
if not self.appdata_path:
raise ValueError('EGS AppData path is not set')
if not os.path.isdir(self.appdata_path):
raise ValueError('EGS AppData path does not exist')
with open(os.path.join(self.appdata_path, 'GameUserSettings.ini'), 'w', encoding='utf-8') as f:
self.config.write(f, space_around_delimiters=False)
@ -53,10 +47,6 @@ class EPCLFS:
if not self.programdata_path:
raise ValueError('EGS ProgramData path is not set')
if not os.path.isdir(self.programdata_path):
# Not sure if we should `raise` here as well
return
for f in os.listdir(self.programdata_path):
if f.endswith('.item'):
data = json.load(open(os.path.join(self.programdata_path, f), encoding='utf-8'))
@ -81,9 +71,6 @@ class EPCLFS:
if not self.programdata_path:
raise ValueError('EGS ProgramData path is not set')
if not os.path.isdir(self.programdata_path):
raise ValueError('EGS ProgramData path does not exist')
manifest_data = manifest.to_json()
self.manifests[manifest.app_name] = manifest_data
_path = os.path.join(self.programdata_path, f'{manifest.installation_guid}.item')

View file

@ -4,31 +4,22 @@ import json
import os
import logging
from contextlib import contextmanager
from collections import defaultdict
from pathlib import Path
from time import time
from filelock import FileLock
from .utils import clean_filename, LockedJSONData
from legendary.models.game import *
from legendary.utils.aliasing import generate_aliases
from legendary.models.config import LGDConf
from legendary.utils.config import LGDConf
from legendary.utils.env import is_windows_mac_or_pyi
FILELOCK_DEBUG = False
from legendary.utils.lfs import clean_filename
class LGDLFS:
def __init__(self, config_file=None):
self.log = logging.getLogger('LGDLFS')
if config_path := os.environ.get('LEGENDARY_CONFIG_PATH'):
self.path = config_path
elif config_path := os.environ.get('XDG_CONFIG_HOME'):
if config_path := os.environ.get('XDG_CONFIG_HOME'):
self.path = os.path.join(config_path, 'legendary')
else:
self.path = os.path.expanduser('~/.config/legendary')
@ -92,18 +83,13 @@ class LGDLFS:
self.log.warning(f'Removing "{os.path.join(self.path, "manifests", "old")}" folder failed: '
f'{e!r}, please remove manually')
if not FILELOCK_DEBUG:
# Prevent filelock logger from spamming Legendary debug output
filelock_logger = logging.getLogger('filelock')
filelock_logger.setLevel(logging.INFO)
# try loading config
try:
self.config.read(self.config_path)
except Exception as e:
self.log.error(f'Unable to read configuration file, please ensure that file is valid! '
f'(Error: {repr(e)})')
self.log.warning('Continuing with blank config in safe-mode...')
self.log.warning(f'Continuing with blank config in safe-mode...')
self.config.read_only = True
# make sure "Legendary" section exists
@ -118,8 +104,6 @@ class LGDLFS:
self.config.set('Legendary', '; Disables the notice about an available update on exit')
self.config.set('Legendary', 'disable_update_notice', 'false' if is_windows_mac_or_pyi() else 'true')
self._installed_lock = FileLock(os.path.join(self.path, 'installed.json') + '.lock')
try:
self._installed = json.load(open(os.path.join(self.path, 'installed.json')))
except Exception as e:
@ -145,35 +129,31 @@ class LGDLFS:
except Exception as e:
self.log.debug(f'Loading aliases failed with {e!r}')
@property
@contextmanager
def userdata_lock(self) -> LockedJSONData:
"""Wrapper around the lock to automatically update user data when it is released"""
with LockedJSONData(os.path.join(self.path, 'user.json')) as lock:
try:
yield lock
finally:
self._user_data = lock.data
@property
def userdata(self):
if self._user_data is not None:
return self._user_data
try:
with self.userdata_lock as locked:
return locked.data
self._user_data = json.load(open(os.path.join(self.path, 'user.json')))
return self._user_data
except Exception as e:
self.log.debug(f'Failed to load user data: {e!r}')
return None
@userdata.setter
def userdata(self, userdata):
raise NotImplementedError('The setter has been removed, use the locked userdata instead.')
if userdata is None:
raise ValueError('Userdata is none!')
self._user_data = userdata
json.dump(userdata, open(os.path.join(self.path, 'user.json'), 'w'),
indent=2, sort_keys=True)
def invalidate_userdata(self):
with self.userdata_lock as lock:
lock.clear()
self._user_data = None
if os.path.exists(os.path.join(self.path, 'user.json')):
os.remove(os.path.join(self.path, 'user.json'))
@property
def entitlements(self):
@ -240,7 +220,8 @@ class LGDLFS:
f.write(manifest_data)
def get_game_meta(self, app_name):
if _meta := self._game_metadata.get(app_name, None):
_meta = self._game_metadata.get(app_name, None)
if _meta:
return Game.from_json(_meta)
return None
@ -251,14 +232,14 @@ class LGDLFS:
json.dump(json_meta, open(meta_file, 'w'), indent=2, sort_keys=True)
def delete_game_meta(self, app_name):
if app_name not in self._game_metadata:
if app_name in self._game_metadata:
del self._game_metadata[app_name]
meta_file = os.path.join(self.path, 'metadata', f'{app_name}.json')
if os.path.exists(meta_file):
os.remove(meta_file)
else:
raise ValueError(f'Game {app_name} does not exist in metadata DB!')
del self._game_metadata[app_name]
meta_file = os.path.join(self.path, 'metadata', f'{app_name}.json')
if os.path.exists(meta_file):
os.remove(meta_file)
def get_game_app_names(self):
return sorted(self._game_metadata.keys())
@ -282,16 +263,7 @@ class LGDLFS:
self.log.warning(f'Failed to delete file "{f}": {e!r}')
def clean_manifests(self, in_use):
in_use_files = {
f'{clean_filename(f"{app_name}_{version}")}.manifest'
for app_name, version, _ in in_use
}
in_use_files |= {
f'{clean_filename(f"{app_name}_{platform}_{version}")}.manifest'
for app_name, version, platform in in_use
}
in_use_files = set(f'{clean_filename(f"{app_name}_{version}")}.manifest' for app_name, version in in_use)
for f in os.listdir(os.path.join(self.path, 'manifests')):
if f not in in_use_files:
try:
@ -299,27 +271,6 @@ class LGDLFS:
except Exception as e:
self.log.warning(f'Failed to delete file "{f}": {e!r}')
def lock_installed(self) -> bool:
"""
Locks the install data. We do not care about releasing this lock.
If it is acquired by a Legendary instance it should own the lock until it exits.
Some operations such as egl sync may be simply skipped if a lock cannot be acquired
"""
if self._installed_lock.is_locked:
return True
try:
self._installed_lock.acquire(blocking=False)
# reload data in case it has been updated elsewhere
try:
self._installed = json.load(open(os.path.join(self.path, 'installed.json')))
except Exception as e:
self.log.debug(f'Failed to load installed game data: {e!r}')
return True
except TimeoutError:
return False
def get_installed_game(self, app_name):
if self._installed is None:
try:
@ -328,7 +279,8 @@ class LGDLFS:
self.log.debug(f'Failed to load installed game data: {e!r}')
return None
if game_json := self._installed.get(app_name, None):
game_json = self._installed.get(app_name, None)
if game_json:
return InstalledGame.from_json(game_json)
return None
@ -437,7 +389,7 @@ class LGDLFS:
def get_overlay_install_info(self):
if not self._overlay_install_info:
try:
data = json.load(open(os.path.join(self.path, 'overlay_install.json')))
data = json.load(open(os.path.join(self.path, f'overlay_install.json')))
self._overlay_install_info = InstalledGame.from_json(data)
except Exception as e:
self.log.debug(f'Failed to load overlay install data: {e!r}')
@ -485,7 +437,9 @@ class LGDLFS:
def serialise_sets(obj):
"""Turn sets into sorted lists for storage"""
return sorted(obj) if isinstance(obj, set) else obj
if isinstance(obj, set):
return sorted(obj)
return obj
json.dump(alias_map, open(os.path.join(self.path, 'aliases.json'), 'w', newline='\n'),
indent=2, sort_keys=True, default=serialise_sets)

View file

@ -113,7 +113,10 @@ class Chunk:
return _chunk
def write(self, fp=None, compress=True):
bio = fp or BytesIO()
if not fp:
bio = BytesIO()
else:
bio = fp
self.uncompressed_size = self.compressed_size = len(self.data)
if compress or self.compressed:
@ -140,4 +143,7 @@ class Chunk:
# finally, add the data
bio.write(self._data)
return bio.tell() if fp else bio.getvalue()
if not fp:
return bio.getvalue()
else:
return bio.tell()

View file

@ -145,9 +145,9 @@ class EGLManifest:
tmp.executable = igame.executable
tmp.main_game_appname = game.app_name # todo for DLC support this needs to be the base game
tmp.app_folder_name = game.metadata.get('customAttributes', {}).get('FolderName', {}).get('value', '')
tmp.manifest_location = f'{igame.install_path}/.egstore'
tmp.manifest_location = igame.install_path + '/.egstore'
tmp.ownership_token = igame.requires_ot
tmp.staging_location = f'{igame.install_path}/.egstore/bps'
tmp.staging_location = igame.install_path + '/.egstore/bps'
tmp.can_run_offline = igame.can_run_offline
tmp.is_incomplete_install = False
tmp.needs_validation = igame.needs_verification

View file

@ -91,18 +91,6 @@ class Game:
def supports_mac_cloud_saves(self):
return self.metadata and (self.metadata.get('customAttributes', {}).get('CloudSaveFolder_MAC') is not None)
@property
def additional_command_line(self):
if not self.metadata:
return None
return self.metadata.get('customAttributes', {}).get('AdditionalCommandLine', {}).get('value', None)
@property
def is_launchable_addon(self):
if not self.metadata:
return False
return any(m['path'] == 'addons/launchable' for m in self.metadata.get('categories', []))
@property
def catalog_item_id(self):
if not self.metadata:
@ -161,7 +149,6 @@ class InstalledGame:
needs_verification: bool = False
platform: str = 'Windows'
prereq_info: Optional[Dict] = None
uninstaller: Optional[Dict] = None
requires_ot: bool = False
save_path: Optional[str] = None
@ -178,7 +165,6 @@ class InstalledGame:
tmp.executable = json.get('executable', '')
tmp.launch_parameters = json.get('launch_parameters', '')
tmp.prereq_info = json.get('prereq_info', None)
tmp.uninstaller = json.get('uninstaller', None)
tmp.can_run_offline = json.get('can_run_offline', False)
tmp.requires_ot = json.get('requires_ot', False)

View file

@ -1,7 +1,5 @@
# coding: utf-8
from __future__ import annotations
import hashlib
import logging
import struct
@ -9,7 +7,6 @@ import zlib
from base64 import b64encode
from io import BytesIO
from typing import Optional
logger = logging.getLogger('Manifest')
@ -64,7 +61,7 @@ def get_chunk_dir(version):
class Manifest:
header_magic = 0x44BEC00C
default_serialisation_version = 17
serialisation_version = 18
def __init__(self):
self.header_size = 41
@ -76,10 +73,10 @@ class Manifest:
self.data = b''
# remainder
self.meta: Optional[ManifestMeta] = None
self.chunk_data_list: Optional[CDL] = None
self.file_manifest_list: Optional[FML] = None
self.custom_fields: Optional[CustomFields] = None
self.meta = None
self.chunk_data_list = None
self.file_manifest_list = None
self.custom_fields = None
@property
def compressed(self):
@ -95,7 +92,8 @@ class Manifest:
_m.file_manifest_list = FML.read(_tmp)
_m.custom_fields = CustomFields.read(_tmp)
if unhandled_data := _tmp.read():
unhandled_data = _tmp.read()
if unhandled_data:
logger.warning(f'Did not read {len(unhandled_data)} remaining bytes in manifest! '
f'This may not be a problem.')
@ -140,26 +138,6 @@ class Manifest:
def write(self, fp=None, compress=True):
body_bio = BytesIO()
# set serialisation version based on enabled features or original version
target_version = max(self.default_serialisation_version, self.meta.feature_level)
if self.meta.data_version == 2:
target_version = max(21, target_version)
elif self.file_manifest_list.version == 2:
target_version = max(20, target_version)
elif self.file_manifest_list.version == 1:
target_version = max(19, target_version)
elif self.meta.data_version == 1:
target_version = max(18, target_version)
# Downgrade manifest if unknown newer version
if target_version > 21:
logger.warning(f'Trying to serialise an unknown target version: {target_version},'
f'clamping to 21.')
target_version = 21
# Ensure metadata will be correct
self.meta.feature_level = target_version
self.meta.write(body_bio)
self.chunk_data_list.write(body_bio)
self.file_manifest_list.write(body_bio)
@ -174,7 +152,10 @@ class Manifest:
self.data = zlib.compress(self.data)
self.size_compressed = len(self.data)
bio = fp or BytesIO()
if not fp:
bio = BytesIO()
else:
bio = fp
bio.write(struct.pack('<I', self.header_magic))
bio.write(struct.pack('<I', self.header_size))
@ -182,50 +163,18 @@ class Manifest:
bio.write(struct.pack('<I', self.size_compressed))
bio.write(self.sha_hash)
bio.write(struct.pack('B', self.stored_as))
bio.write(struct.pack('<I', target_version))
bio.write(struct.pack('<I', self.serialisation_version))
bio.write(self.data)
return bio.tell() if fp else bio.getvalue()
def apply_delta_manifest(self, delta_manifest: Manifest):
added = set()
# overwrite file elements with the ones from the delta manifest
for idx, file_elem in enumerate(self.file_manifest_list.elements):
try:
delta_file = delta_manifest.file_manifest_list.get_file_by_path(file_elem.filename)
self.file_manifest_list.elements[idx] = delta_file
added.add(delta_file.filename)
except ValueError:
pass
# add other files that may be missing
for delta_file in delta_manifest.file_manifest_list.elements:
if delta_file.filename not in added:
self.file_manifest_list.elements.append(delta_file)
# update count and clear map
self.file_manifest_list.count = len(self.file_manifest_list.elements)
self.file_manifest_list._path_map = None
# ensure guid map exists (0 will most likely yield no result, so ignore ValueError)
try:
self.chunk_data_list.get_chunk_by_guid(0)
except ValueError:
pass
# add new chunks from delta manifest to main manifest and again clear maps and update count
existing_chunk_guids = self.chunk_data_list._guid_int_map.keys()
for chunk in delta_manifest.chunk_data_list.elements:
if chunk.guid_num not in existing_chunk_guids:
self.chunk_data_list.elements.append(chunk)
self.chunk_data_list.count = len(self.chunk_data_list.elements)
self.chunk_data_list._guid_map = None
self.chunk_data_list._guid_int_map = None
self.chunk_data_list._path_map = None
if not fp:
return bio.getvalue()
else:
return bio.tell()
class ManifestMeta:
serialisation_version = 0
def __init__(self):
self.meta_size = 0
self.data_version = 0
@ -240,8 +189,6 @@ class ManifestMeta:
self.prereq_name = ''
self.prereq_path = ''
self.prereq_args = ''
self.uninstall_action_path = ''
self.uninstall_action_args = ''
# this build id is used for something called "delta file" which I guess I'll have to implement eventually
self._build_id = ''
@ -279,20 +226,16 @@ class ManifestMeta:
# This is a list though I've never seen more than one entry
entries = struct.unpack('<I', bio.read(4))[0]
for _ in range(entries):
for i in range(entries):
_meta.prereq_ids.append(read_fstring(bio))
_meta.prereq_name = read_fstring(bio)
_meta.prereq_path = read_fstring(bio)
_meta.prereq_args = read_fstring(bio)
# Manifest version 18 with data version >= 1 stores build ID
if _meta.data_version >= 1:
# apparently there's a newer version that actually stores *a* build id.
if _meta.data_version > 0:
_meta._build_id = read_fstring(bio)
# Manifest version 21 with data version >= 2 stores uninstall commands
if _meta.data_version >= 2:
_meta.uninstall_action_path = read_fstring(bio)
_meta.uninstall_action_args = read_fstring(bio)
if (size_read := bio.tell()) != _meta.meta_size:
logger.warning(f'Did not read entire manifest metadata! Version: {_meta.data_version}, '
@ -307,7 +250,7 @@ class ManifestMeta:
meta_start = bio.tell()
bio.write(struct.pack('<I', 0)) # placeholder size
bio.write(struct.pack('B', self.data_version))
bio.write(struct.pack('B', self.serialisation_version))
bio.write(struct.pack('<I', self.feature_level))
bio.write(struct.pack('B', self.is_file_data))
bio.write(struct.pack('<I', self.app_id))
@ -324,11 +267,8 @@ class ManifestMeta:
write_fstring(bio, self.prereq_path)
write_fstring(bio, self.prereq_args)
if self.data_version >= 1:
if self.data_version > 0:
write_fstring(bio, self.build_id)
if self.data_version >= 2:
write_fstring(bio, self.uninstall_action_path)
write_fstring(bio, self.uninstall_action_args)
meta_end = bio.tell()
bio.seek(meta_start)
@ -337,6 +277,8 @@ class ManifestMeta:
class CDL:
serialisation_version = 0
def __init__(self):
self.version = 0
self.size = 0
@ -406,7 +348,7 @@ class CDL:
# the way this data is stored is rather odd, maybe there's a nicer way to write this...
for _ in range(_cdl.count):
for i in range(_cdl.count):
_cdl.elements.append(ChunkInfo(manifest_version=manifest_version))
# guid, doesn't seem to be a standard like UUID but is fairly straightfoward, 4 bytes, 128 bit.
@ -445,7 +387,7 @@ class CDL:
def write(self, bio):
cdl_start = bio.tell()
bio.write(struct.pack('<I', 0)) # placeholder size
bio.write(struct.pack('B', self.version))
bio.write(struct.pack('B', self.serialisation_version))
bio.write(struct.pack('<I', len(self.elements)))
for chunk in self.elements:
@ -524,6 +466,8 @@ class ChunkInfo:
class FML:
serialisation_version = 0
def __init__(self):
self.version = 0
self.size = 0
@ -551,7 +495,7 @@ class FML:
_fml.version = struct.unpack('B', bio.read(1))[0]
_fml.count = struct.unpack('<I', bio.read(4))[0]
for _ in range(_fml.count):
for i in range(_fml.count):
_fml.elements.append(FileManifest())
for fm in _fml.elements:
@ -572,14 +516,14 @@ class FML:
# install tags, no idea what they do, I've only seen them in the Fortnite manifest
for fm in _fml.elements:
_elem = struct.unpack('<I', bio.read(4))[0]
for _ in range(_elem):
for i in range(_elem):
fm.install_tags.append(read_fstring(bio))
# Each file is made up of "Chunk Parts" that can be spread across the "chunk stream"
for fm in _fml.elements:
_elem = struct.unpack('<I', bio.read(4))[0]
_offset = 0
for _ in range(_elem):
for i in range(_elem):
chunkp = ChunkPart()
_start = bio.tell()
_size = struct.unpack('<I', bio.read(4))[0]
@ -593,7 +537,7 @@ class FML:
logger.warning(f'Did not read {diff} bytes from chunk part!')
bio.seek(diff)
# MD5 hash + MIME type (Manifest feature level 19)
# MD5 hash + MIME type
if _fml.version >= 1:
for fm in _fml.elements:
_has_md5 = struct.unpack('<I', bio.read(4))[0]
@ -603,7 +547,7 @@ class FML:
for fm in _fml.elements:
fm.mime_type = read_fstring(bio)
# SHA256 hash (Manifest feature level 20)
# SHA256 hash
if _fml.version >= 2:
for fm in _fml.elements:
fm.hash_sha256 = bio.read(32)
@ -624,7 +568,7 @@ class FML:
def write(self, bio):
fml_start = bio.tell()
bio.write(struct.pack('<I', 0)) # placeholder size
bio.write(struct.pack('B', self.version))
bio.write(struct.pack('B', self.serialisation_version))
bio.write(struct.pack('<I', len(self.elements)))
for fm in self.elements:
@ -650,20 +594,6 @@ class FML:
bio.write(struct.pack('<I', cp.offset))
bio.write(struct.pack('<I', cp.size))
if self.version >= 1:
for fm in self.elements:
has_md5 = 1 if fm.hash_md5 else 0
bio.write(struct.pack('<I', has_md5))
if has_md5:
bio.write(fm.hash_md5)
for fm in self.elements:
write_fstring(bio, fm.mime_type)
if self.version >= 2:
for fm in self.elements:
bio.write(fm.hash_sha256)
fml_end = bio.tell()
bio.seek(fml_start)
bio.write(struct.pack('<I', fml_end - fml_start))
@ -707,7 +637,6 @@ class FileManifest:
_cp.append('[...]')
cp_repr = ', '.join(_cp)
# ToDo add MD5, MIME, SHA256 if those ever become relevant
return '<FileManifest (filename="{}", symlink_target="{}", hash={}, flags={}, ' \
'install_tags=[{}], chunk_parts=[{}], file_size={})>'.format(
self.filename, self.symlink_target, self.hash.hex(), self.flags,
@ -744,6 +673,8 @@ class ChunkPart:
class CustomFields:
serialisation_version = 0
def __init__(self):
self.size = 0
self.version = 0
@ -778,8 +709,15 @@ class CustomFields:
_cf.version = struct.unpack('B', bio.read(1))[0]
_cf.count = struct.unpack('<I', bio.read(4))[0]
_keys = [read_fstring(bio) for _ in range(_cf.count)]
_values = [read_fstring(bio) for _ in range(_cf.count)]
_keys = []
_values = []
for i in range(_cf.count):
_keys.append(read_fstring(bio))
for i in range(_cf.count):
_values.append(read_fstring(bio))
_cf._dict = dict(zip(_keys, _values))
if (size_read := bio.tell() - cf_start) != _cf.size:
@ -794,7 +732,7 @@ class CustomFields:
def write(self, bio):
cf_start = bio.tell()
bio.write(struct.pack('<I', 0)) # placeholder size
bio.write(struct.pack('B', self.version))
bio.write(struct.pack('B', self.serialisation_version))
bio.write(struct.pack('<I', len(self._dict)))
for key in self.keys():
@ -828,7 +766,8 @@ class ManifestComparison:
old_files = {fm.filename: fm.hash for fm in old_manifest.file_manifest_list.elements}
for fm in manifest.file_manifest_list.elements:
if old_file_hash := old_files.pop(fm.filename, None):
old_file_hash = old_files.pop(fm.filename, None)
if old_file_hash:
if fm.hash == old_file_hash:
comp.unchanged.add(fm.filename)
else:

View file

@ -1,5 +1,8 @@
def get_boolean_choice(prompt, default=True):
yn = 'Y/n' if default else 'y/N'
if default:
yn = 'Y/n'
else:
yn = 'y/N'
choice = input(f'{prompt} [{yn}]: ')
if not choice:
@ -18,10 +21,10 @@ def get_int_choice(prompt, default=None, min_choice=None, max_choice=None, retur
while True:
try:
if inp := input(prompt):
choice = int(inp)
else:
inp = input(prompt)
if not inp:
return default
choice = int(inp)
except ValueError:
if return_on_invalid:
return None
@ -58,7 +61,7 @@ def sdl_prompt(sdl_data, title):
examples = ', '.join([g for g in sdl_data.keys() if g != '__required'][:2])
print(f'Please enter tags of pack(s) to install (space/comma-separated, e.g. "{examples}")')
print('Leave blank to use defaults (only required data will be downloaded).')
choices = input('Additional packs [Enter to confirm]: ')
choices = input(f'Additional packs [Enter to confirm]: ')
if not choices:
return tags

View file

@ -5,7 +5,7 @@ class HiddenAliasSubparsersAction(argparse._SubParsersAction):
def add_parser(self, name, **kwargs):
# set prog from the existing prefix
if kwargs.get('prog') is None:
kwargs['prog'] = f'{self._prog_prefix} {name}'
kwargs['prog'] = '%s %s' % (self._prog_prefix, name)
aliases = kwargs.pop('aliases', ())
hide_aliases = kwargs.pop('hide_aliases', False)

View file

@ -4,7 +4,7 @@ import logging
from legendary.models.game import Game
if os.name == 'nt':
from legendary.lfs.windows_helpers import *
from legendary.utils.windows_helpers import *
logger = logging.getLogger('EOSUtils')
# Dummy Game objects to use with Core methods that expect them

View file

@ -1,7 +1,5 @@
# coding: utf-8
from sys import platform
# games where the download order optimizations are enabled by default
# a set() of versions can be specified, empty set means all versions.
_optimize_default = {
@ -13,15 +11,6 @@ _optimize_default = {
}
}
# Some games use launchers that don't work with Legendary, these are overriden here
_exe_overrides = {
'kinglet': {
'darwin': 'Base/Binaries/Win64EOS/CivilizationVI.exe',
'linux': 'Base/Binaries/Win64EOS/CivilizationVI.exe',
'win32': 'LaunchPad/LaunchPad.exe'
}
}
def is_opt_enabled(app_name, version):
if (versions := _optimize_default.get(app_name.lower())) is not None:
@ -30,15 +19,8 @@ def is_opt_enabled(app_name, version):
return False
def get_exe_override(app_name):
return _exe_overrides.get(app_name.lower(), {}).get(platform, None)
def update_workarounds(api_data):
if 'reorder_optimization' in api_data:
_optimize_default.clear()
_optimize_default.update(api_data['reorder_optimization'])
if 'executable_override' in api_data:
_exe_overrides.clear()
_exe_overrides.update(api_data['executable_override'])

View file

@ -3,16 +3,13 @@
import os
import shutil
import hashlib
import json
import logging
from pathlib import Path
from sys import stdout
from time import perf_counter
from time import time
from typing import List, Iterator
from filelock import FileLock
from legendary.models.game import VerifyResult
logger = logging.getLogger('LFS Utils')
@ -43,7 +40,7 @@ def delete_filelist(path: str, filenames: List[str],
_dir, _fn = os.path.split(filename)
if _dir:
dirs.add(_dir)
try:
os.remove(os.path.join(path, _dir, _fn))
except Exception as e:
@ -69,14 +66,14 @@ def delete_filelist(path: str, filenames: List[str],
if not silent:
logger.error(f'Failed removing directory "{_dir}" with {e!r}')
no_error = False
if delete_root_directory:
try:
os.rmdir(path)
except Exception as e:
if not silent:
logger.error(f'Removing game directory failed with {e!r}')
return no_error
@ -118,7 +115,7 @@ def validate_files(base_path: str, filelist: List[tuple], hash_type='sha1',
stdout.write('\n')
show_progress = True
interval = (_size / (1024 * 1024)) // 100
start_time = perf_counter()
start_time = time()
with open(full_path, 'rb') as f:
real_file_hash = hashlib.new(hash_type)
@ -128,7 +125,7 @@ def validate_files(base_path: str, filelist: List[tuple], hash_type='sha1',
if show_progress and i % interval == 0:
pos = f.tell()
perc = (pos / _size) * 100
speed = pos / 1024 / 1024 / (perf_counter() - start_time)
speed = pos / 1024 / 1024 / (time() - start_time)
stdout.write(f'\r=> Verifying large file "{file_path}": {perc:.0f}% '
f'({pos / 1024 / 1024:.1f}/{_size / 1024 / 1024:.1f} MiB) '
f'[{speed:.1f} MiB/s]\t')
@ -156,45 +153,3 @@ def clean_filename(filename):
def get_dir_size(path):
return sum(f.stat().st_size for f in Path(path).glob('**/*') if f.is_file())
class LockedJSONData(FileLock):
def __init__(self, file_path: str):
super().__init__(file_path + '.lock')
self._file_path = file_path
self._data = None
self._initial_data = None
def __enter__(self):
super().__enter__()
if os.path.exists(self._file_path):
with open(self._file_path, 'r', encoding='utf-8') as f:
self._data = json.load(f)
self._initial_data = self._data
return self
def __exit__(self, exc_type, exc_val, exc_tb):
super().__exit__(exc_type, exc_val, exc_tb)
if self._data != self._initial_data:
if self._data is not None:
with open(self._file_path, 'w', encoding='utf-8') as f:
json.dump(self._data, f, indent=2, sort_keys=True)
else:
if os.path.exists(self._file_path):
os.remove(self._file_path)
@property
def data(self):
return self._data
@data.setter
def data(self, new_data):
if new_data is None:
raise ValueError('Invalid new data, use clear() explicitly to reset file data')
self._data = new_data
def clear(self):
self._data = None

View file

@ -0,0 +1,39 @@
from legendary.models.manifest import Manifest
def combine_manifests(base_manifest: Manifest, delta_manifest: Manifest):
added = set()
# overwrite file elements with the ones from the delta manifest
for idx, file_elem in enumerate(base_manifest.file_manifest_list.elements):
try:
delta_file = delta_manifest.file_manifest_list.get_file_by_path(file_elem.filename)
base_manifest.file_manifest_list.elements[idx] = delta_file
added.add(delta_file.filename)
except ValueError:
pass
# add other files that may be missing
for delta_file in delta_manifest.file_manifest_list.elements:
if delta_file.filename not in added:
base_manifest.file_manifest_list.elements.append(delta_file)
# update count and clear map
base_manifest.file_manifest_list.count = len(base_manifest.file_manifest_list.elements)
base_manifest.file_manifest_list._path_map = None
# ensure guid map exists
try:
base_manifest.chunk_data_list.get_chunk_by_guid(0)
except:
pass
# add new chunks from delta manifest to main manifest and again clear maps and update count
existing_chunk_guids = base_manifest.chunk_data_list._guid_int_map.keys()
for chunk in delta_manifest.chunk_data_list.elements:
if chunk.guid_num not in existing_chunk_guids:
base_manifest.chunk_data_list.elements.append(chunk)
base_manifest.chunk_data_list.count = len(base_manifest.chunk_data_list.elements)
base_manifest.chunk_data_list._guid_map = None
base_manifest.chunk_data_list._guid_int_map = None
base_manifest.chunk_data_list._path_map = None

View file

@ -22,14 +22,11 @@ def _filename_matches(filename, patterns):
"""
for pattern in patterns:
# Pattern is a directory, just check if path starts with it
if pattern.endswith('/') and filename.startswith(pattern):
return True
# Check if pattern is a suffix of filename
if filename.endswith(pattern):
return True
# Check if pattern with wildcards ('*') matches
if fnmatch(filename, pattern):
if pattern.endswith('/'):
# pat is a directory, check if path starts with it
if filename.startswith(pattern):
return True
elif fnmatch(filename, pattern):
return True
return False
@ -136,7 +133,7 @@ class SaveGameHelper:
self.log.warning(f'Got EOF for "{f.filename}" with {remaining} bytes remaining! '
f'File may have been corrupted/modified.')
break
cur_buffer.write(_tmp)
fhash.update(_tmp) # update sha1 hash with new data
f.chunk_parts.append(cp)
@ -170,21 +167,3 @@ class SaveGameHelper:
# return dict with created files for uploading/whatever
return self.files
def get_deletion_list(self, save_folder, include_filter=None, exclude_filter=None):
files = []
for _dir, _, _files in os.walk(save_folder):
for _file in _files:
_file_path = os.path.join(_dir, _file)
_file_path_rel = os.path.relpath(_file_path, save_folder).replace('\\', '/')
if include_filter and not _filename_matches(_file_path_rel, include_filter):
self.log.debug(f'Excluding "{_file_path_rel}" (does not match include filter)')
continue
elif exclude_filter and _filename_matches(_file_path_rel, exclude_filter):
self.log.debug(f'Excluding "{_file_path_rel}" (does match exclude filter)')
continue
files.append(_file_path_rel)
return files

View file

@ -22,7 +22,7 @@ except Exception as e:
login_url = 'https://www.epicgames.com/id/login'
sid_url = 'https://www.epicgames.com/id/api/redirect?'
logout_url = f'https://www.epicgames.com/id/logout?productName=epic-games&redirectUrl={login_url}'
logout_url = 'https://www.epicgames.com/id/logout?productName=epic-games&redirectUrl=' + login_url
goodbye_url = 'https://legendary.gl/goodbye'
window_js = '''
window.ue = {
@ -31,7 +31,11 @@ window.ue = {
registersignincompletecallback: pywebview.api.trigger_sid_exchange
},
common: {
launchexternalurl: pywebview.api.open_url_external
launchexternalurl: pywebview.api.open_url_external,
// not required, just needs to be non-null
auth: {
completeLogin: pywebview.api.nop
}
}
}
'''
@ -70,11 +74,9 @@ class MockLauncher:
if self.inject_js:
self.window.evaluate_js(window_js)
if 'logout' in url and self.callback_sid:
if 'logout' in url:
# prepare to close browser after logout redirect
self.destroy_on_load = True
elif 'logout' in url:
self.inject_js = True
def nop(self, *args, **kwargs):
return
@ -89,22 +91,21 @@ class MockLauncher:
# skip logging out on those platforms and directly use the exchange code we're given.
# On windows we have to do a little dance with the SID to create a session that
# remains valid after logging out in the embedded browser.
# Update: Epic broke SID login, we'll also do this on Windows now
# if self.window.gui.renderer in ('gtkwebkit2', 'qtwebengine', 'qtwebkit'):
self.destroy_on_load = True
try:
self.callback_result = self.callback_code(exchange_code)
except Exception as e:
logger.error(f'Logging in via exchange-code failed with {e!r}')
finally:
# We cannot destroy the browser from here,
# so we'll load a small goodbye site first.
self.window.load_url(goodbye_url)
if self.window.gui.renderer in ('gtkwebkit2', 'qtwebengine', 'qtwebkit'):
self.destroy_on_load = True
try:
self.callback_result = self.callback_code(exchange_code)
except Exception as e:
logger.error(f'Logging in via exchange-code failed with {e!r}')
finally:
# We cannot destroy the browser from here,
# so we'll load a small goodbye site first.
self.window.load_url(goodbye_url)
def trigger_sid_exchange(self, *args, **kwargs):
# check if code-based login hasn't already set the destroy flag
if not self.destroy_on_load:
logger.debug('Injecting SID JS')
logger.debug(f'Injecting SID JS')
# inject JS to get SID API response and call our API
self.window.evaluate_js(get_sid_js)
@ -124,32 +125,22 @@ class MockLauncher:
self.window.load_url(logout_url)
def do_webview_login(callback_sid=None, callback_code=None, user_agent=None):
def do_webview_login(callback_sid=None, callback_code=None):
api = MockLauncher(callback_sid=callback_sid, callback_code=callback_code)
url = login_url
if os.name == 'nt':
# On Windows we open the logout URL first to invalidate the current cookies (if any).
# Additionally, we have to disable JS injection for the first load, as otherwise the user
# will get an error for some reason.
url = logout_url
api.inject_js = False
logger.info('Opening Epic Games login window...')
# Open logout URL first to remove existing cookies, then redirect to login.
window = webview.create_window(f'Legendary {__version__} - Epic Games Account Login',
url=url, width=768, height=1024, js_api=api)
url=login_url, width=768, height=1024, js_api=api)
api.window = window
window.events.loaded += api.on_loaded
window.loaded += api.on_loaded
try:
webview.start(user_agent=user_agent)
webview.start()
except Exception as we:
logger.error(f'Running webview failed with {we!r}. If this error persists try the manual '
f'login process by adding --disable-webview to your command line.')
return None
if api.callback_result is None:
logger.error('Login aborted by user.')
logger.error(f'Login aborted by user.')
return api.callback_result

View file

@ -1,6 +1,5 @@
import logging
import winreg
import ctypes
_logger = logging.getLogger('WindowsHelpers')
@ -81,16 +80,3 @@ def set_registry_value(hive, key, value, data, reg_type=winreg.REG_SZ, use_32bit
except Exception as e:
_logger.debug(f'Setting "{key}":"{value}" to "{data}" failed with {repr(e)}')
winreg.CloseKey(k)
def double_clicked() -> bool:
# Thanks https://stackoverflow.com/a/55476145
# Load kernel32.dll
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
# Create an array to store the processes in. This doesn't actually need to
# be large enough to store the whole process list since GetConsoleProcessList()
# just returns the number of processes if the array is too small.
process_array = (ctypes.c_uint * 1)()
num_processes = kernel32.GetConsoleProcessList(process_array, 1)
return num_processes < 3

View file

@ -6,7 +6,7 @@ logger = logging.getLogger('WineHelpers')
def read_registry(wine_pfx):
reg = configparser.ConfigParser(comment_prefixes=(';', '#', '/', 'WINE'), allow_no_value=True, strict=False)
reg = configparser.ConfigParser(comment_prefixes=(';', '#', '/', 'WINE'), allow_no_value=True)
reg.optionxform = str
reg.read(os.path.join(wine_pfx, 'user.reg'))
return reg
@ -20,37 +20,6 @@ def get_shell_folders(registry, wine_pfx):
return folders
def case_insensitive_file_search(path: str) -> str:
"""
Similar to case_insensitive_path_search: Finds a file case-insensitively
Note that this *does* work on Windows, although it's rather pointless
"""
path_parts = os.path.normpath(path).split(os.sep)
# If path_parts[0] is empty, we're on Unix and thus start searching at /
if not path_parts[0]:
path_parts[0] = '/'
computed_path = path_parts[0]
for part in path_parts[1:]:
# If the computed directory does not exist, add all remaining parts as-is to at least return a valid path
# at the end
if not os.path.exists(computed_path):
computed_path = os.path.join(computed_path, part)
continue
# First try to find an exact match
actual_file_or_dirname = part if os.path.exists(os.path.join(computed_path, part)) else None
# If there is no case-sensitive match, find a case-insensitive one
if not actual_file_or_dirname:
actual_file_or_dirname = next((
x for x in os.listdir(computed_path)
if x.lower() == part.lower()
), part)
computed_path = os.path.join(computed_path, actual_file_or_dirname)
return computed_path
def case_insensitive_path_search(path):
"""
Attempts to find a path case-insensitively

View file

@ -1,2 +1 @@
requests<3.0
filelock

View file

@ -8,8 +8,8 @@ from setuptools import setup
from legendary import __version__ as legendary_version
if sys.version_info < (3, 9):
sys.exit('python 3.9 or higher is required for legendary')
if sys.version_info < (3, 8):
sys.exit('python 3.8 or higher is required for legendary')
with open("README.md", "r") as fh:
long_description_l = fh.readlines()
@ -37,8 +37,7 @@ setup(
install_requires=[
'requests<3.0',
'setuptools',
'wheel',
'filelock'
'wheel'
],
extras_require=dict(
webview=['pywebview>=3.4'],
@ -48,10 +47,11 @@ setup(
description='Free and open-source replacement for the Epic Games Launcher application',
long_description=long_description,
long_description_content_type="text/markdown",
python_requires='>=3.9',
python_requires='>=3.8',
classifiers=[
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
'Programming Language :: Python',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Operating System :: POSIX :: Linux',
'Operating System :: Microsoft',