diff --git a/README.md b/README.md new file mode 100644 index 0000000..9dcd1e0 --- /dev/null +++ b/README.md @@ -0,0 +1,112 @@ +# Legendary (Game Launcher) +### A free and open-source Epic Games Launcher replacement +[![Discord](https://discordapp.com/api/guilds/695233346627698689/widget.png?style=shield)](https://discord.gg/UJKBwPw) [![Twitter Follow](https://img.shields.io/twitter/follow/legendary_gl?label=Follow%20us%20for%20updates%21&style=social)](https://twitter.com/legendary_gl) + +Legendary (named after the next highest tier [in item rarity](https://wow.gamepedia.com/Quality)) is an open-source game launcher that can download and install games from the Epic Games Store on Linux and Windows. + +Right now it is in an early public pre-release stage and still needs a lot of work to work. But it does work! + +**Currently implemented:** + - Authenticate with Epic (can import authentication from EGS installation [Windows only]) + - Download and install games + - Update installed games (not well tested/potentially buggy) + - Launch games with online authentication + +**Planned:** + - Better Linux and WINE support + - Importing installed games from the EGS launcher + - PyPI distribution + - Miscellaneous optimizations + - Simple GUI for managing/launching games + - Lots and lots of bug fixes and refactoring... + +## Requirements + +- python 3.8+ +- requests + +## How to install + +- Windows (standalone): Download the latest EXE from [GitHub](https://github.com/derrod/legendary/releases/latest) +- Linux/Windows (requires setuptools to be installed): `python3.8 setup.py install` + +A PyPI package will follow once it has gotten more testing. + +The Windows .exe was created with PyInstaller and will run standalone without python being installed. + +## Usage + +```` +usage: legendary [-h] (--auth | --download | --install | --update | --uninstall | --launch | --list-games | --list-installed) [-v] [--import] [--base-path ] [--max-shared-memory ] [--max-workers ] [--manifest ] [--base-url ] [--force] + [--disable-patching] [--offline] [--skip-version-check] [--override-username ] [--dry-run] [--check-updates] + +Legendary (Game Launcher) + +optional arguments: + -h, --help show this help message and exit + --auth Authenticate Legendary with your account + --download Download a game's files + --install Download and install a game + --update Update a game (alias for --install) + --uninstall Remove a game + --launch Launch game + --list-games List available games + --list-installed List installed games + -v Set loglevel to debug + +Authentication options: + --import Import EGS authentication data + +Downloading options: + --base-path Path for game installations (defaults to ~/legendary) + --max-shared-memory + Maximum amount of shared memory to use (in MiB), default: 1 GiB + --max-workers Maximum amount of download workers, default: 2 * logical CPU + --manifest Manifest URL or path to use instead of the CDN one (e.g. for downgrading) + --base-url Base URL to download from (e.g. to test or switch to a different CDNs) + --force Ignore existing files (overwrite) + +Installation options: + --disable-patching Do not attempt to patch existing installations (download full game) + +Game launch options: + Note: any additional arguments will be passed to the game. + + --offline Skip login and launch game without online authentication + --skip-version-check Skip version check when launching game in online mode + --override-username + Override username used when launching the game (only works with some titles) + --dry-run Print the command line that would have been used to launch the game and exit + +Listing options: + --check-updates Check for updates when listing installed games +```` + + +## Config file + +Legendary supports some options as well as game specific configuration in `~/.config/legendary/config.ini`: +````ini +[Legendary] +log_level = debug +; maximum shared memory (in MiB) to use for installation +max_memory = 1024 +; default install directory +install_dir = /mnt/tank/games + +[AppName] +; launch game without online authentication by default +offline = true +; Skip checking for updates when launching this game +skip_update_check = true +; start parameters to use (in addition to the required ones) +start_params = -windowed +; (linux) specify wine executable to use +wine_executable = wine + +[AppName.env] +; environment variables to set for this game (mostly useful on linux) +WINEPREFIX = /home/user/legendary/Game/.wine +DXVK_CONFIG_FILE = /home/user/legendary/Game/dxvk.conf +```` + diff --git a/legendary/__init__.py b/legendary/__init__.py new file mode 100644 index 0000000..eab1ff5 --- /dev/null +++ b/legendary/__init__.py @@ -0,0 +1,3 @@ +"""Legendary!""" + +__version__ = '0.0.1' diff --git a/legendary/api/__init__.py b/legendary/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/legendary/api/egs.py b/legendary/api/egs.py new file mode 100644 index 0000000..79e4117 --- /dev/null +++ b/legendary/api/egs.py @@ -0,0 +1,99 @@ +# !/usr/bin/env python +# coding: utf-8 + +import requests +import logging + +from requests.auth import HTTPBasicAuth + +from legendary.models.exceptions import InvalidCredentialsError + + +class EPCAPI: + _user_agent = 'UELauncher/10.13.1-11497744+++Portal+Release-Live Windows/10.0.18363.1.256.64bit' + # required for the oauth request + _user_basic = '34a02cf8f4414e29b15921876da36f9a' + _pw_basic = 'daafbccc737745039dffe53d94fc76cf' + + _oauth_host = 'account-public-service-prod03.ol.epicgames.com' + _launcher_host = 'launcher-public-service-prod06.ol.epicgames.com' + _entitlements_host = 'entitlement-public-service-prod08.ol.epicgames.com' + _catalog_host = 'catalog-public-service-prod06.ol.epicgames.com' + + def __init__(self): + self.session = requests.session() + self.log = logging.getLogger('EPCAPI') + self.unauth_session = requests.session() + self.session.headers['User-Agent'] = self._user_agent + self.unauth_session.headers['User-Agent'] = self._user_agent + self._oauth_basic = HTTPBasicAuth(self._user_basic, self._pw_basic) + + self.access_token = None + self.user = None + + def resume_session(self, session): + self.user = session + self.session.headers['Authorization'] = f'bearer {self.user["access_token"]}' + return self.user + + def start_session(self, refresh_token: str = None, exchange_token: str = None) -> dict: + if refresh_token: + params = dict(grant_type='refresh_token', + refresh_token=refresh_token, + token_type='eg1') + elif exchange_token: + params = dict(grant_type='exchange_code', + exchange_code=exchange_token, + token_type='eg1') + else: + raise ValueError('At least one token type must be specified!') + + r = self.session.post(f'https://{self._oauth_host}/account/api/oauth/token', + data=params, auth=self._oauth_basic) + # Only raise HTTP exceptions on server errors + if r.status_code >= 500: + r.raise_for_status() + + j = r.json() + if 'error' in j: + self.log.warning(f'Login to EGS API failed with errorCode: {j["errorCode"]}') + raise InvalidCredentialsError(j['errorCode']) + + self.user = j + self.session.headers['Authorization'] = f'bearer {self.user["access_token"]}' + return self.user + + def invalidate_session(self): # unused + r = self.session.delete(f'https://{self._oauth_host}/account/api/oauth/sessions/kill/{self.access_token}') + + def get_game_token(self): + r = self.session.get(f'https://{self._oauth_host}/account/api/oauth/exchange') + r.raise_for_status() + return r.json() + + def get_game_assets(self): + r = self.session.get(f'https://{self._launcher_host}/launcher/api/public/assets/Windows', + params=dict(label='Live')) + r.raise_for_status() + return r.json() + + def get_game_manifest(self, namespace, catalog_item_id, app_name): + r = self.session.get(f'https://{self._launcher_host}/launcher/api/public/assets/v2/platform' + f'/Windows/namespace/{namespace}/catalogItem/{catalog_item_id}/app' + f'/{app_name}/label/Live') + r.raise_for_status() + return r.json() + + def get_user_entitlements(self): + user_id = self.user.get('account_id') + r = self.session.get(f'https://{self._entitlements_host}/entitlement/api/account/{user_id}/entitlements', + params=dict(start=0, count=5000)) + r.raise_for_status() + return r.json() + + def get_game_info(self, namespace, catalog_item_id): + r = self.session.get(f'https://{self._catalog_host}/catalog/api/shared/namespace/{namespace}/bulk/items', + params=dict(id=catalog_item_id, includeDLCDetails=True, includeMainGameDetails=True, + country='US', locale='en')) + r.raise_for_status() + return r.json().get(catalog_item_id, None) diff --git a/legendary/cli.py b/legendary/cli.py new file mode 100644 index 0000000..54492da --- /dev/null +++ b/legendary/cli.py @@ -0,0 +1,324 @@ +#!/usr/bin/env python +# coding: utf-8 + +import argparse +import logging +import multiprocessing +import os +import shlex +import subprocess +import time +import webbrowser + +from sys import exit + +from legendary.core import LegendaryCore +from legendary.models.exceptions import InvalidCredentialsError + +logging.basicConfig( + format='[%(asctime)s] [%(name)s] %(levelname)s: %(message)s', + level=logging.INFO +) +logger = logging.getLogger('cli') + + +# todo refactor this + +def main(): + parser = argparse.ArgumentParser(description='Legendary (Game Launcher)') + + group = parser.add_mutually_exclusive_group() + group.required = True + group.title = 'Commands' + group.add_argument('--auth', dest='auth', action='store_true', + help='Authenticate Legendary with your account') + group.add_argument('--download', dest='download', action='store', + help='Download a game\'s files', metavar='') + group.add_argument('--install', dest='install', action='store', + help='Download and install a game', metavar='') + group.add_argument('--update', dest='update', action='store', + help='Update a game (alias for --install)', metavar='') + group.add_argument('--uninstall', dest='uninstall', action='store', + help='Remove a game', metavar='') + group.add_argument('--launch', dest='launch', action='store', + help='Launch game', metavar='') + group.add_argument('--list-games', dest='list_games', action='store_true', + help='List available games') + group.add_argument('--list-installed', dest='list_installed', action='store_true', + help='List installed games') + + # general arguments + parser.add_argument('-v', dest='debug', action='store_true', help='Set loglevel to debug') + + # arguments for the different commands + if os.name == 'nt': + auth_group = parser.add_argument_group('Authentication options') + # auth options + auth_group.add_argument('--import', dest='import_egs_auth', action='store_true', + help='Import EGS authentication data') + + download_group = parser.add_argument_group('Downloading options') + download_group.add_argument('--base-path', dest='base_path', action='store', metavar='', + help='Path for game installations (defaults to ~/legendary)') + download_group.add_argument('--max-shared-memory', dest='shared_memory', action='store', metavar='', + type=int, help='Maximum amount of shared memory to use (in MiB), default: 1 GiB') + download_group.add_argument('--max-workers', dest='max_workers', action='store', metavar='', + type=int, help='Maximum amount of download workers, default: 2 * logical CPU') + download_group.add_argument('--manifest', dest='override_manifest', action='store', metavar='', + help='Manifest URL or path to use instead of the CDN one (e.g. for downgrading)') + download_group.add_argument('--base-url', dest='override_base_url', action='store', metavar='', + help='Base URL to download from (e.g. to test or switch to a different CDNs)') + download_group.add_argument('--force', dest='force', action='store_true', + help='Ignore existing files (overwrite)') + + install_group = parser.add_argument_group('Installation options') + install_group.add_argument('--disable-patching', dest='disable_patching', action='store_true', + help='Do not attempt to patch existing installations (download full game)') + + launch_group = parser.add_argument_group('Game launch options', + description='Note: any additional arguments will be passed to the game.') + launch_group.add_argument('--offline', dest='offline', action='store_true', + default=False, help='Skip login and launch game without online authentication') + launch_group.add_argument('--skip-version-check', dest='skip_version_check', action='store_true', + default=False, help='Skip version check when launching game in online mode') + launch_group.add_argument('--override-username', dest='user_name_override', action='store', metavar='', + help='Override username used when launching the game (only works with some titles)') + launch_group.add_argument('--dry-run', dest='dry_run', action='store_true', + help='Print the command line that would have been used to launch the game and exit') + + list_group = parser.add_argument_group('Listing options') + list_group.add_argument('--check-updates', dest='check_updates', action='store_true', + help='Check for updates when listing installed games') + + args, extra = parser.parse_known_args() + core = LegendaryCore() + + config_ll = core.lgd.config.get('Legendary', 'log_level', fallback='info') + if config_ll == 'debug' or args.debug: + logging.getLogger().setLevel(level=logging.DEBUG) + # keep requests quiet + logging.getLogger('requests').setLevel(logging.WARNING) + logging.getLogger('urllib3').setLevel(logging.WARNING) + + if args.auth: + try: + logger.info('Testing existing login data if present...') + if core.login(): + logger.info('Stored credentials are still valid, if you wish to switch to a different' + 'account, delete ~/.config/legendary/user.json and try again.') + exit(0) + except ValueError: + pass + except InvalidCredentialsError: + logger.error('Stored credentials were found but were no longer valid. Continuing with login...') + core.lgd.invalidate_userdata() + + if os.name == 'nt' and args.import_egs_auth: + logger.info('Importing login session from the Epic Launcher...') + try: + if core.auth_import(): + logger.info('Successfully imported login session from EGS!') + logger.info(f'Now logged in as user "{core.lgd.userdata["displayName"]}"') + exit(0) + else: + logger.warning('Login session from EGS seems to no longer be valid.') + exit(1) + except ValueError: + logger.error('No EGS login session found, please login normally.') + exit(1) + + # unfortunately the captcha stuff makes a complete CLI login flow kinda impossible right now... + print('Please login via the epic web login!') + webbrowser.open('https://www.epicgames.com/id/login') + print('If web page did not open automatically, please navigate ' + 'to https://www.epicgames.com/id/login in your web browser') + _ = input('Once you\'re logged in press [Enter] to continue.') + + # after logging in we need the user to copy a code from a JSON response, less than ideal :/ + webbrowser.open('https://www.epicgames.com/id/api/exchange') + print('If second web page did not open automatically, please navigate ' + 'to https://www.epicgames.com/id/api/exchange in your web browser') + exchange_code = input('Please enter code from response: ') + exchange_token = exchange_code.strip().strip('"') + + if core.auth_code(exchange_token): + logger.info(f'Successfully logged in as "{core.lgd.userdata["displayName"]}"') + else: + logger.error('Login attempt failed, please see log for details.') + + elif args.list_games: + logger.info('Logging in...') + if not core.login(): + logger.error('Login failed, cannot continue!') + exit(1) + logger.info('Getting game list...') + games = core.get_game_list() + + print('\nAvailable games:') + for game in sorted(games, key=lambda x: x.app_title): + print(f' * {game.app_title} (App name: {game.app_name}, version: {game.app_version})') + + print(f'\nTotal: {len(games)}') + + elif args.list_installed: + games = core.get_installed_list() + + if args.check_updates: + logger.info('Logging in to check for updates...') + if not core.login(): + logger.error('Login failed! Not checking for updates.') + else: + core.get_assets(True) + + print('\nInstalled games:') + for game in sorted(games, key=lambda x: x.title): + print(f' * {game.title} (App name: {game.app_name}, version: {game.version})') + game_asset = core.get_asset(game.app_name) + if game_asset.build_version != game.version: + print(f' -> Update available! Installed: {game.version}, Latest: {game_asset.build_version}') + + print(f'\nTotal: {len(games)}') + + elif args.launch: + app_name = args.launch.strip() + if not core.is_installed(app_name): + logger.error(f'Game {app_name} is not currently installed!') + exit(1) + + if not args.offline and not core.is_offline_game(app_name): + logger.info('Logging in...') + if not core.login(): + logger.error('Login failed, cannot continue!') + exit(1) + + if not args.skip_version_check and not core.is_noupdate_game(app_name): + logger.info('Checking for updates...') + installed = core.lgd.get_installed_game(app_name) + latest = core.get_asset(app_name, update=True) + if latest.build_version != installed.version: + logger.error('Game is out of date, please update or launch with update check skipping!') + exit(1) + + params, cwd, env = core.get_launch_parameters(app_name=app_name, offline=args.offline, + extra_args=extra, user=args.user_name_override) + + logger.info(f'Launching {app_name}...') + if args.dry_run: + logger.info(f'Launch parameters: {shlex.join(params)}') + logger.info(f'Working directory: {cwd}') + if env: + logger.info('Environment overrides:', env) + else: + logger.debug(f'Launch parameters: {shlex.join(params)}') + logger.debug(f'Working directory: {cwd}') + if env: + logger.debug('Environment overrides:', env) + + subprocess.Popen(params, cwd=cwd, env=env) + + elif args.download or args.install or args.update: + if not core.login(): + logger.error('Login failed! Cannot continue with download process.') + exit(1) + + target_app = next(i for i in (args.install, args.update, args.download) if i) + if args.update: + if not core.get_installed_game(target_app): + logger.error(f'Update requested for "{target_app}", but app not installed!') + exit(1) + + game = core.get_game(target_app, update_meta=True) + + if not game: + logger.fatal(f'Could not find "{target_app}" in list of available games, did you type the name correctly?') + exit(1) + + # todo use status queue to print progress from CLI + dlm, analysis, igame = core.prepare_download(game=game, base_path=args.base_path, force=args.force, + max_shm=args.shared_memory, max_workers=args.max_workers, + disable_patching=args.disable_patching, + override_manifest=args.override_manifest, + override_base_url=args.override_base_url) + + # game is either up to date or hasn't changed, so we have nothing to do + if not analysis.dl_size: + logger.info('Download size is 0, the game is either already up to date or has not changed. Exiting...') + # if game is downloaded but not "installed", "install" it now (todo handle postinstall as well) + if args.install: + core.install_game(igame) + exit(0) + + logger.info(f'Install size: {analysis.install_size / 1024 / 1024:.02f} MiB') + compression = (1 - (analysis.dl_size / analysis.uncompressed_dl_size)) * 100 + logger.info(f'Download size: {analysis.dl_size / 1024 / 1024:.02f} MiB ' + f'(Compression savings: {compression:.01f}%)') + logger.info(f'Reusable size: {analysis.reuse_size / 1024 / 1024:.02f} MiB (chunks) / ' + f'{analysis.unchanged / 1024 / 1024:.02f} MiB (unchanged)') + + res = core.check_installation_conditions(analysis=analysis, install=igame) + + if res.failures: + logger.fatal('Download cannot proceed, the following errors occured:') + for msg in sorted(res.failures): + logger.fatal(msg) + exit(1) + + if res.warnings: + logger.warning('Installation requirements check returned the following warnings:') + for warn in sorted(res.warnings): + logger.warning(warn) + + _ = input('Do you wish to proceed? [Press Enter]') + start_t = time.time() + + try: + dlm.start() + dlm.join() + except Exception as e: + end_t = time.time() + logger.info(f'Installation failed after {end_t - start_t:.02f} seconds.') + logger.warning(f'The following exception occured while waiting for the donlowader to finish: {e!r}. ' + f'Try restarting the process, the resume file will be used to start where it failed. ' + f'If it continues to fail please open an issue on GitHub.') + else: + end_t = time.time() + if args.install or args.update: + postinstall = core.install_game(igame) + if postinstall: + logger.info('This game lists the following prequisites to be installed:') + logger.info(f'{postinstall["name"]}: {" ".join((postinstall["path"], postinstall["args"]))}') + if os.name == 'nt': + choice = input('Do you wish to install the prerequisites? ([y]es, [n]o, [i]gnore): ') + c = choice.lower()[0] + if c == 'i': + core.prereq_installed(igame.app_name) + elif c == 'y': + req_path, req_exec = os.path.split(postinstall['path']) + work_dir = os.path.join(igame.install_path, req_path) + fullpath = os.path.join(work_dir, req_exec) + subprocess.Popen([fullpath, postinstall['args']], cwd=work_dir) + else: + logger.info('Automatic installation not available on Linux.') + + logger.info(f'Finished installation process in {end_t - start_t:.02f} seconds.') + + elif args.uninstall: + target_app = args.uninstall + igame = core.get_installed_game(target_app) + if not igame: + logger.error(f'Game {target_app} not installed, cannot uninstall!') + + try: + logger.info(f'Removing "{igame.title}" from "{igame.install_path}"...') + core.uninstall_game(igame) + logger.info('Game has been uninstalled.') + except Exception as e: + logger.warning(f'Removing game failed: {e!r}, please remove {igame.install_path} manually.') + + core.exit() + exit(0) + + +if __name__ == '__main__': + multiprocessing.freeze_support() + main() diff --git a/legendary/core.py b/legendary/core.py new file mode 100644 index 0000000..593a85e --- /dev/null +++ b/legendary/core.py @@ -0,0 +1,393 @@ +#!/usr/bin/env python +# coding: utf-8 + +import json +import logging +import os +import shlex +import shutil + +from base64 import b64decode +from datetime import datetime +from random import choice as randchoice +from requests.exceptions import HTTPError +from typing import List + +from legendary.api.egs import EPCAPI +from legendary.downloader.manager import DLManager +from legendary.lfs.egl import EPCLFS +from legendary.lfs.lgndry import LGDLFS +from legendary.lfs.utils import clean_filename, delete_folder +from legendary.models.downloading import AnalysisResult, ConditionCheckResult +from legendary.models.exceptions import * +from legendary.models.game import * +from legendary.models.json_manifest import JSONManifest +from legendary.models.manifest import Manifest, ManifestMeta + + +# ToDo: instead of true/false return values for success/failure actually raise an exception that the CLI/GUI +# can handle to give the user more details. (Not required yet since there's no GUI so log output is fine) + + +class LegendaryCore: + """ + LegendaryCore handles most of the lower level interaction with + the downloader, lfs, and api components to make writing CLI/GUI + code easier and cleaner and avoid duplication. + """ + + def __init__(self): + self.log = logging.getLogger('Core') + self.egs = EPCAPI() + self.lgd = LGDLFS() + + # epic lfs only works on Windows right now + if os.name == 'nt': + self.egl = EPCLFS() + else: + self.egl = None + + def auth(self, username, password): + """ + Attempts direct non-web login, raises CaptchaError if manual login is required + + :param username: + :param password: + :return: + """ + raise NotImplementedError + + def auth_code(self, code) -> bool: + """ + Handles authentication via exchange code (either retrieved manually or automatically) + """ + try: + self.lgd.userdata = self.egs.start_session(exchange_token=code) + return True + except Exception as e: + self.log.error(f'Logging in failed with {e!r}, please try again.') + return False + + def auth_import(self) -> bool: + """Import refresh token from EGL installation and use it for logging in""" + self.egl.read_config() + remember_me_data = self.egl.config.get('RememberMe', 'Data') + re_data = json.loads(b64decode(remember_me_data))[0] + if 'Token' not in re_data: + raise ValueError('No login session in config') + refresh_token = re_data['Token'] + try: + self.lgd.userdata = self.egs.start_session(refresh_token=refresh_token) + return True + except Exception as e: + self.log.error(f'Logging in failed with {e!r}, please try again.') + return False + + def login(self) -> bool: + """ + Attempts logging in with existing credentials. + + raises ValueError if no existing credentials or InvalidCredentialsError if the API return an error + """ + if not self.lgd.userdata: + raise ValueError('No saved credentials') + + if self.lgd.userdata['expires_at']: + dt_old = datetime.fromisoformat(self.lgd.userdata['expires_at'][:-1]) + dt_now = datetime.utcnow() + td = dt_now - dt_old + + # if session still has at least 10 minutes left we can re-use it. + if td.total_seconds() < (self.lgd.userdata['expires_in'] - 600): + self.log.debug('Reusing existing login session...') + self.egs.resume_session(self.lgd.userdata) + return True + + try: + userdata = self.egs.start_session(self.lgd.userdata['refresh_token']) + except InvalidCredentialsError: + self.log.error('Stored credentials are no longer valid! Please login again.') + self.lgd.invalidate_userdata() + return False + except HTTPError as e: + self.log.error(f'HTTP request for login failed: {e!r}, please try again later.') + return False + + self.lgd.userdata = userdata + return True + + def get_assets(self, update_assets=False) -> List[GameAsset]: + if not self.lgd.assets or update_assets: + self.lgd.assets = [GameAsset.from_egs_json(a) for a in self.egs.get_game_assets()] + + return self.lgd.assets + + def get_asset(self, app_name, update=False) -> GameAsset: + if update: + self.get_assets(update_assets=True) + + return next(i for i in self.lgd.assets if i.app_name == app_name) + + def get_game(self, app_name, update_meta=False) -> Game: + if update_meta: + self.get_game_list(True) + return self.lgd.get_game_meta(app_name) + + def get_game_list(self, update_assets=True) -> List[Game]: + _ret = [] + + for ga in self.get_assets(update_assets=update_assets): + if ga.namespace == 'ue': # skip UE demo content + continue + + game = self.lgd.get_game_meta(ga.app_name) + if not game or (game and game.app_version != ga.build_version): + if game and game.app_version != ga.build_version: + self.log.info(f'Updating meta for {game.app_name} due to build version mismatch') + + eg_meta = self.egs.get_game_info(ga.namespace, ga.catalog_item_id) + game = Game(app_name=ga.app_name, app_version=ga.build_version, + app_title=eg_meta['title'], asset_info=ga, metadata=eg_meta) + self.lgd.set_game_meta(game.app_name, game) + _ret.append(game) + + return _ret + + def get_installed_list(self) -> List[InstalledGame]: + return self.lgd.get_installed_list() + + def get_installed_game(self, app_name) -> InstalledGame: + return self.lgd.get_installed_game(app_name) + + def get_launch_parameters(self, app_name: str, offline: bool = False, + user: str = None, extra_args: list = None) -> (list, str, dict): + install = self.lgd.get_installed_game(app_name) + + game_token = '' + if not offline: + self.log.info('Getting authentication token...') + game_token = self.egs.get_game_token()['code'] + + user_name = self.lgd.userdata['displayName'] + account_id = self.lgd.userdata['account_id'] + if user: + user_name = user + + game_exe = os.path.join(install.install_path, install.executable) + working_dir = os.path.split(game_exe)[0] + + params = [] + + if os.name != 'nt': + params.append(self.lgd.config.get(app_name, 'wine_executable', fallback='wine')) + + params.append(game_exe) + + if install.launch_parameters: + params.extend(shlex.split(install.launch_parameters)) + + params.extend([ + '-AUTH_LOGIN=unused', + f'-AUTH_PASSWORD={game_token}', + '-AUTH_TYPE=exchangecode', + f'-epicapp={app_name}', + '-epicenv=Prod', + '-EpicPortal', + f'-epicusername={user_name}', + f'-epicuserid={account_id}', + '-epiclocale=en' + ]) + + if extra_args: + params.extend(extra_args) + + if config_args := self.lgd.config.get(app_name, 'start_params', fallback=None): + params.extend(shlex.split(config_args.strip())) + + # get environment overrides from config + env = None + if f'{app_name}.env' in self.lgd.config: + env = dict(self.lgd.config[f'{app_name}.env']) + + return params, working_dir, env + + def is_offline_game(self, app_name: str) -> bool: + return self.lgd.config.getboolean(app_name, 'offline', fallback=False) + + def is_noupdate_game(self, app_name: str) -> bool: + return self.lgd.config.getboolean(app_name, 'skip_update_check', fallback=False) + + def is_latest(self, app_name: str) -> bool: + installed = self.lgd.get_installed_game(app_name) + + for ass in self.get_assets(True): + if ass.app_name == app_name: + if ass.build_version != installed.version: + return False + else: + return True + # if we get here something is very wrong + raise ValueError(f'Could not find {app_name} in asset list!') + + def is_installed(self, app_name: str) -> bool: + return self.lgd.get_installed_game(app_name) is not None + + @staticmethod + def load_manfiest(data: bytes) -> Manifest: + if data[0:1] == b'{': + return JSONManifest.read_all(data) + else: + return Manifest.read_all(data) + + def prepare_download(self, game: Game, base_path: str = '', + max_shm: int = 0, max_workers: int = 0, force: bool = False, + disable_patching: bool = False, override_manifest: str = '', + override_base_url: str = '') -> (DLManager, AnalysisResult, ManifestMeta): + + # load old manifest + old_manifest = None + new_manifest_data = b'' + + # load old manifest if we have one + if not disable_patching and not force and self.is_installed(game.app_name): + if old_bytes := self.lgd.get_manifest(game.app_name): + old_manifest = self.load_manfiest(old_bytes) + + base_urls = list(game.base_urls) # copy list for manipulation + + if override_manifest: + if override_manifest.startswith('http'): + r = self.egs.unauth_session.get(override_manifest) + r.raise_for_status() + new_manifest_data = r.content + base_urls = [r.url.rpartition('/')[0]] + else: + with open(override_manifest, 'rb') as f: + new_manifest_data = f.read() + else: + # get latest manifest from API + m_api_r = self.egs.get_game_manifest(game.asset_info.namespace, + game.asset_info.catalog_item_id, + game.app_name) + + # never seen this outside the launcher itself, but if it happens: PANIC! + if len(m_api_r['elements']) > 1: + raise ValueError('Manifest response has more than one element!') + + manifest_info = m_api_r['elements'][0] + for manifest in manifest_info['manifests']: + base_url = manifest['uri'].rpartition('/')[0] + if base_url not in base_urls: + base_urls.append(base_url) + + if 'queryParams' in manifest: + continue + + self.log.debug(f'Downloading manifest from {manifest["uri"]} ...') + r = self.egs.unauth_session.get(manifest['uri']) + r.raise_for_status() + new_manifest_data = r.content + + if override_base_url: + base_urls = [override_base_url] + + self.log.debug(f'Base urls: {base_urls}') + new_manifest = self.load_manfiest(new_manifest_data) + self.lgd.save_manifest(game.app_name, new_manifest_data) + # save manifest with version name in "old" folder as well for testing/downgrading/etc. + self.lgd.save_manifest(game.app_name, new_manifest_data, + filename=f'old/{game.app_name}_{new_manifest.meta.build_version}') + + if not base_path: + base_path = self.get_default_install_dir() + + install_path = os.path.join( + base_path, + game.metadata.get('customAttributes', {}).get('FolderName', {}).get('value', game.app_name) + ) + if not os.path.exists(install_path): + os.makedirs(install_path) + + self.log.info(f'Install path: {install_path}') + + if not force: + filename = clean_filename(f'{game.app_name}_{new_manifest.meta.build_version}.resume') + resume_file = os.path.join(self.lgd.get_tmp_path(), filename) + else: + resume_file = None + + # randomly select one CDN + base_url = randchoice(base_urls) + self.log.debug(f'Using base URL: {base_url}') + + if not max_shm: + max_shm = self.lgd.config.getint('Legendary', 'max_memory', fallback=1024) + + dlm = DLManager(install_path, base_url, resume_file=resume_file, + max_shared_memory=max_shm * 1024 * 1024, max_workers=max_workers) + anlres = dlm.run_analysis(manifest=new_manifest, old_manifest=old_manifest, + patch=not disable_patching, resume=not force) + + prereq = None + if new_manifest.meta.prereq_ids: + prereq = dict(ids=new_manifest.meta.prereq_ids, name=new_manifest.meta.prereq_name, + path=new_manifest.meta.prereq_path, args=new_manifest.meta.prereq_args) + + igame = InstalledGame(app_name=game.app_name, title=game.app_title, version=game.app_version, + prereq_info=prereq, manifest_path=override_manifest, base_urls=base_urls, + install_path=install_path, executable=new_manifest.meta.launch_exe, + launch_parameters=new_manifest.meta.launch_command) + + return dlm, anlres, igame + + @staticmethod + def check_installation_conditions(analysis: AnalysisResult, install: InstalledGame) -> ConditionCheckResult: + # ToDo add more checks in the future + results = ConditionCheckResult(failures=list(), warnings=list()) + + # if on linux, check for eac in the files + if os.name != 'nt': + for f in analysis.manifest_comparison.added: + if 'easyanticheat' in f.lower(): + results.warnings.append('(Linux) The game uses EasyAntiCheat and may not run on linux') + break + + # check if enough disk space is free (dl size is the approximate amount the installation will grow) + min_disk_space = analysis.uncompressed_dl_size + analysis.biggest_file_size + _, _, free = shutil.disk_usage(install.install_path) + if free < min_disk_space: + free_mib = free / 1024 / 1024 + required_mib = min_disk_space / 1024 / 1024 + results.failures.append(f'Not enough available disk space! {free_mib:.02f} MiB < {required_mib:.02f} MiB') + + return results + + def get_default_install_dir(self): + return self.lgd.config.get('Legendary', 'install_dir', fallback=os.path.expanduser('~/legendary')) + + def install_game(self, installed_game: InstalledGame) -> dict: # todo class for result? + """Save game metadata and info to mark it "installed" and also show the user the prerequisites""" + self.lgd.set_installed_game(installed_game.app_name, installed_game) + if installed_game.prereq_info: + if not installed_game.prereq_info.get('installed', False): + return installed_game.prereq_info + + return dict() + + def uninstall_game(self, installed_game: InstalledGame, delete_files=True): + self.lgd.remove_installed_game(installed_game.app_name) + if delete_files: + delete_folder(installed_game.install_path, recursive=True) + + def prereq_installed(self, app_name): + igame = self.lgd.get_installed_game(app_name) + igame.prereq_info['installed'] = True + self.lgd.set_installed_game(app_name, igame) + + def exit(self): + """ + Do cleanup, config saving, and exit. + """ + # self.lgd.clean_tmp_data() + self.lgd.save_config() + diff --git a/legendary/downloader/__init__.py b/legendary/downloader/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/legendary/downloader/manager.py b/legendary/downloader/manager.py new file mode 100644 index 0000000..fdac54b --- /dev/null +++ b/legendary/downloader/manager.py @@ -0,0 +1,606 @@ +#!/usr/bin/env python +# coding: utf-8 + +# please don't look at this code too hard, it's a mess. + +import logging +import os +import time + +from collections import Counter, defaultdict, deque +from multiprocessing import cpu_count, Process, Queue as MPQueue +from multiprocessing.shared_memory import SharedMemory +from queue import Empty +from sys import exit +from threading import Condition, Thread + +from legendary.downloader.workers import DLWorker, FileWorker +from legendary.models.downloading import * +from legendary.models.manifest import ManifestComparison, Manifest + + +class DLManager(Process): + def __init__(self, download_dir, base_url, cache_dir=None, status_q=None, + max_jobs=100, max_failures=5, max_workers=0, update_interval=1.0, + max_shared_memory=1024 * 1024 * 1024, resume_file=None): + super().__init__(name='DLManager') + self.log = logging.getLogger('DLManager') + self.log_level = self.log.level + + self.base_url = base_url + self.dl_dir = download_dir + self.cache_dir = cache_dir if cache_dir else os.path.join(download_dir, '.cache') + + # All the queues! + self.dl_worker_queue = None + self.writer_queue = None + self.dl_result_q = None + self.writer_result_q = None + self.max_jobs = max_jobs + self.max_workers = max_workers if max_workers else cpu_count() * 2 + + # Analysis stuff + self.analysis = None + self.tasks = deque() + self.dl_cache_guids = set() # guids that should be cached + self.chunks_to_dl = deque() + self.chunk_data_list = None + + # shared memory stuff + self.max_shared_memory = max_shared_memory # 1 GiB by default + self.sms = deque() + self.shared_memory = None + + # Interval for log updates and pushing updates to the queue + self.update_interval = update_interval + self.status_queue = status_q # queue used to relay status info back to GUI/CLI + + # behaviour settings + self.max_failures = max_failures + self.resume_file = resume_file + + # cross-thread runtime information + self.running = True + self.active_tasks = 0 + self.children = [] + # bytes downloaded and decompressed since last report + self.bytes_downloaded_since_last = 0 + self.bytes_decompressed_since_last = 0 + # bytes written since last report + self.bytes_written_since_last = 0 + # bytes read since last report + self.bytes_read_since_last = 0 + # chunks written since last report + self.num_processed_since_last = 0 + self.num_tasks_processed_since_last = 0 + + def download_job_manager(self, task_cond: Condition, shm_cond: Condition): + while self.chunks_to_dl and self.running: + while self.active_tasks < self.max_workers * 2 and self.chunks_to_dl: + try: + sms = self.sms.popleft() + no_shm = False + except IndexError: # no free cache + no_shm = True + break + + c_guid = self.chunks_to_dl.popleft() + chunk = self.chunk_data_list.get_chunk_by_guid(c_guid) + self.log.debug(f'Adding {chunk.guid_str} (active: {self.active_tasks})') + try: + self.dl_worker_queue.put(DownloaderTask(url=self.base_url + '/' + chunk.path, + chunk_guid=c_guid, shm=sms), + timeout=1.0) + except Exception as e: + self.log.warning(f'Failed to add to download queue: {e!r}') + self.chunks_to_dl.appendleft(c_guid) + break + + self.active_tasks += 1 + else: + # active tasks limit hit, wait for tasks to finish + with task_cond: + self.log.debug('Waiting for download tasks to complete..') + task_cond.wait(timeout=1.0) + continue + + if no_shm: + # if we break we ran out of shared memory, so wait for that. + with shm_cond: + self.log.debug('Waiting for more shared memory...') + shm_cond.wait(timeout=1.0) + + self.log.info('Download Job Manager quitting...') + + def dl_results_handler(self, task_cond: Condition): + in_buffer = dict() + + task = self.tasks.popleft() + current_file = '' + + while task and self.running: + if isinstance(task, FileTask): # this wasn't necessarily a good idea... + try: + if task.empty: + self.writer_queue.put(WriterTask(task.filename, empty=True), timeout=1.0) + elif task.rename: + self.writer_queue.put(WriterTask(task.filename, rename=True, + delete=task.delete, + old_filename=task.temporary_filename), + timeout=1.0) + elif task.delete: + self.writer_queue.put(WriterTask(task.filename, delete=True), timeout=1.0) + elif task.open: + self.writer_queue.put(WriterTask(task.filename, fopen=True), timeout=1.0) + current_file = task.filename + elif task.close: + self.writer_queue.put(WriterTask(task.filename, close=True), timeout=1.0) + except Exception as e: + self.tasks.appendleft(task) + self.log.warning(f'Adding to queue failed: {e!r}') + continue + + try: + task = self.tasks.popleft() + except IndexError: # finished + break + continue + + while task.chunk_guid in in_buffer: + res = in_buffer[task.chunk_guid] + + try: + self.writer_queue.put(WriterTask( + filename=current_file, shared_memory=res.shm, + chunk_offset=task.chunk_offset, chunk_size=task.chunk_size, + chunk_guid=task.chunk_guid, release_memory=task.cleanup, + old_file=task.chunk_file # todo on-disk cache + ), timeout=1.0) + except Exception as e: + self.log.warning(f'Adding to queue failed: {e!r}') + break + + if task.cleanup: + del in_buffer[task.chunk_guid] + + try: + task = self.tasks.popleft() + if isinstance(task, FileTask): + break + except IndexError: # finished + task = None + break + else: # only enter blocking code if the loop did not break + try: + res = self.dl_result_q.get(timeout=1) + self.active_tasks -= 1 + with task_cond: + task_cond.notify() + + if res.success: + in_buffer[res.guid] = res + self.bytes_downloaded_since_last += res.compressed_size + self.bytes_decompressed_since_last += res.size + else: + self.log.error(f'Download for {res.guid} failed, retrying...') + try: + self.dl_worker_queue.put(DownloaderTask( + url=res.url, chunk_guid=res.guid, shm=res.shm + ), timeout=1.0) + self.active_tasks += 1 + except Exception as e: + self.log.warning(f'Failed adding retry task to queue! {e!r}') + # if no reserved memory, add to the beginning of the normal queue + self.chunks_to_dl.appendleft(res.chunk_guid) + except Empty: + pass + except Exception as e: + self.log.warning(f'Unhandled exception when trying to read download result queue: {e!r}') + + self.log.info('Download result handler quitting...') + + def fw_results_handler(self, shm_cond: Condition): + while self.running: + try: + res = self.writer_result_q.get(timeout=1.0) + self.num_tasks_processed_since_last += 1 + + if res.closed and self.resume_file: + # write last completed file to super simple resume file + with open(self.resume_file, 'ab') as rf: + rf.write(f'{res.filename}\n'.encode('utf-8')) + + if res.kill: + self.log.info('Got termination command in FW result handler') + break + + if not res.success: + # todo make this kill the installation process or at least skip the file and mark it as failed + self.log.fatal(f'Writing for {res.filename} failed!') + if res.release_memory: + self.sms.appendleft(res.shm) + with shm_cond: + shm_cond.notify() + + if res.chunk_guid: + self.bytes_written_since_last += res.size + self.num_processed_since_last += 1 + except Empty: + continue + except Exception as e: + self.log.warning(f'Exception when trying to read writer result queue: {e!r}') + self.log.info('Writer result handler quitting...') + + def run_analysis(self, manifest: Manifest, old_manifest: Manifest = None, + patch=True, resume=True) -> AnalysisResult: + """ + Run analysis on manifest and old manifest (if not None) and return a result + with a summary resources required in order to install the provided manifest. + + :param manifest: Manifest to install + :param old_manifest: Old manifest to patch from (if applicable) + :param patch: Patch instead of redownloading the entire file + :param resume: Continue based on resume file if it exists + :return: AnalysisResult + """ + + analysis_res = AnalysisResult() + analysis_res.install_size = sum(fm.file_size for fm in manifest.file_manifest_list.elements) + analysis_res.biggest_chunk = max(c.window_size for c in manifest.chunk_data_list.elements) + analysis_res.biggest_file_size = max(f.file_size for f in manifest.file_manifest_list.elements) + is_1mib = analysis_res.biggest_chunk == 1024 * 1024 + self.log.debug(f'Biggest chunk size: {analysis_res.biggest_chunk} bytes (== 1 MiB? {is_1mib})') + + self.log.debug(f'Creating manifest comparison...') + mc = ManifestComparison.create(manifest, old_manifest) + analysis_res.manifest_comparison = mc + + if resume and self.resume_file and os.path.exists(self.resume_file): + try: + completed_files = set(i.strip() for i in open(self.resume_file).readlines()) + # remove completed files from changed/added and move them to unchanged for the analysis. + mc.added -= completed_files + mc.changed -= completed_files + mc.unchanged |= completed_files + self.log.debug(f'Skipped {len(completed_files)} files based on resume data!') + except Exception as e: + self.log.warning(f'Reading resume file failed: {e!r}, continuing as normal...') + + if mc.removed: + analysis_res.removed = len(mc.removed) + self.log.debug(f'{analysis_res.removed} removed files') + if mc.added: + analysis_res.added = len(mc.added) + self.log.debug(f'{analysis_res.added} added files') + if mc.changed: + analysis_res.changed = len(mc.changed) + self.log.debug(f'{analysis_res.changed} changed files') + if mc.unchanged: + analysis_res.unchanged = len(mc.unchanged) + self.log.debug(f'{analysis_res.unchanged} unchanged files') + + references = Counter() + chunkstream_starts = list() + # Chunks can have multiple entire files in them, the deque for a guid contains all files that start + # in that chunk (sorted by offset) so we can quickly and easily find the next link in the chunkstream. + # A nice side effect is that we can use this to check whether or not we missed something in the process. + chunk_to_file_map = defaultdict(deque) + + # Find chunkstream starts and also count references to each chunk + # Note that this has to be sorted to ensure the file map will be in the correct order + self.log.debug('Looking for chunkstreams and counting references...') + for fm in sorted(manifest.file_manifest_list.elements, + key=lambda x: x.chunk_parts[0].offset if x.chunk_parts else 0): + if not fm.chunk_parts: + self.tasks.append(FileTask(fm.filename, empty=True)) + continue + + if fm.filename in mc.unchanged: + analysis_res.unchanged += fm.file_size + + for index, cp in enumerate(fm.chunk_parts): + if index == 0: + chunk_to_file_map[cp.guid_num].append(fm) + if cp.offset == 0: + self.log.debug(f'Found chunk stream start: {fm.filename}, {fm.chunk_parts[0]}') + chunkstream_starts.append(fm.chunk_parts[0]) + + # do not add references in case the file is unchanged and we do not need to download it anyway + if fm.filename not in mc.unchanged: + references[cp.guid_num] += 1 + + # determine reusable chunks and prepare lookup table for reusable ones + re_usable = defaultdict(dict) + if old_manifest and mc.changed and patch: + self.log.debug('Analyzing manifests for re-usable chunks...') + for changed in mc.changed: + old_file = old_manifest.file_manifest_list.get_file_by_path(changed) + new_file = manifest.file_manifest_list.get_file_by_path(changed) + + existing_chunks = dict() + off = 0 + for cp in old_file.chunk_parts: + existing_chunks[(cp.guid_num, cp.offset, cp.size)] = off + off += cp.size + + for cp in new_file.chunk_parts: + key = (cp.guid_num, cp.offset, cp.size) + if key in existing_chunks: + references[cp.guid_num] -= 1 + re_usable[changed][key] = existing_chunks[key] + analysis_res.reuse_size += cp.size + + last_cache_size = current_cache_size = 0 + cached = set() + # Using this secondary set is orders of magnitude faster than checking the deque. + chunks_in_dl_list = set() + + # run through the chunkstreams and create the download jobs, + # also determine minimum runtime cache requirement. + # Yeah this is a bit of a mess but still runs extremely + # quickly even with tens of thousands of files/chunks + self.log.debug('Creating filetasks and chunktasks...') + for next_chunk in chunkstream_starts: + self.log.debug(f'- Chunkstream start: {next_chunk!r}') + + while file_list := chunk_to_file_map.get(next_chunk.guid_num): + current_file = file_list.popleft() + + if len(file_list) == 0: + del chunk_to_file_map[next_chunk.guid_num] + + # skip unchanged files + if current_file.filename in mc.unchanged: + # self.log.debug(f' + Skipping unchanged file: {current_file.filename}') + next_chunk = current_file.chunk_parts[-1] + continue + + existing_chunks = re_usable.get(current_file.filename, None) + chunk_tasks = [] + reused = 0 + + for cp in current_file.chunk_parts: + ct = ChunkTask(cp.guid_num, cp.offset, cp.size) + + # re-use the chunk from the existing file if we can + if existing_chunks and (cp.guid_num, cp.offset, cp.size) in existing_chunks: + reused += 1 + ct.chunk_file = current_file.filename + ct.offset = existing_chunks[(cp.guid_num, cp.offset, cp.size)] + else: + # add to DL list if not already in it + if cp.guid_num not in chunks_in_dl_list: + self.chunks_to_dl.append(cp.guid_num) + chunks_in_dl_list.add(cp.guid_num) + + # if chunk has more than one use or is already in cache, + # check if we need to add or remove it again. + if references[cp.guid_num] > 1 or cp.guid_num in cached: + references[cp.guid_num] -= 1 + + if references[cp.guid_num] < 1: # delete from cache again + current_cache_size -= analysis_res.biggest_chunk + cached.remove(cp.guid_num) + ct.cleanup = True + elif cp.guid_num not in cached: # add to cache + self.dl_cache_guids.add(cp.guid_num) + cached.add(cp.guid_num) + current_cache_size += analysis_res.biggest_chunk + else: + ct.cleanup = True + + chunk_tasks.append(ct) + + if reused: + self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}') + self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True)) + else: + self.tasks.append(FileTask(current_file.filename, fopen=True)) + + self.tasks.extend(chunk_tasks) + + if reused: + self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True)) + self.tasks.append(FileTask(current_file.filename, delete=True, rename=True, + temporary_filename=current_file.filename + u'.tmp')) + else: + self.tasks.append(FileTask(current_file.filename, close=True)) + + if current_cache_size > last_cache_size: + self.log.debug(f' * New maximum cache size: {current_cache_size / 1024 / 1024:.02f} MiB') + last_cache_size = current_cache_size + + next_chunk = current_file.chunk_parts[-1] + + # If this is not empty something went horribly wrong. + if chunk_to_file_map: + raise ValueError('Some files were not processed:', chunk_to_file_map) + + self.log.debug(f'Final cache size requirement: {last_cache_size / 1024 / 1024} MiB.') + analysis_res.min_memory = last_cache_size + (1024 * 1024 * 32) # add some padding just to be safe + + # Todo implement on-disk caching to avoid this issue. + if analysis_res.min_memory > self.max_shared_memory: + shared_mib = f'{self.max_shared_memory / 1024 / 1024:.01f} MiB' + required_mib = f'{analysis_res.min_memory / 1024 / 1024:.01} MiB' + raise MemoryError(f'Current shared memory cache is smaller than required! {shared_mib} < {required_mib}') + + # calculate actual dl and patch write size. + analysis_res.dl_size = \ + sum(c.file_size for c in manifest.chunk_data_list.elements if c.guid_num in chunks_in_dl_list) + analysis_res.uncompressed_dl_size = \ + sum(c.window_size for c in manifest.chunk_data_list.elements if c.guid_num in chunks_in_dl_list) + + # add jobs to remove files + for fname in mc.removed: + self.tasks.append(FileTask(fname, delete=True)) + + analysis_res.num_chunks_cache = len(self.dl_cache_guids) + self.chunk_data_list = manifest.chunk_data_list + self.analysis = analysis_res + + return analysis_res + + def run(self): + if not self.analysis: + raise ValueError('Did not run analysis before trying to run download!') + + # fix loglevel in subprocess + self.log.setLevel(self.log_level) + + try: + self.run_real() + except KeyboardInterrupt: + self.log.warning('Immediate exit requested!') + self.running = False + for proc in self.children: + try: + proc.terminate() + except Exception as e: + print(f'Terminating process {repr(proc)} failed: {e!r}') + + def run_real(self): + self.shared_memory = SharedMemory(create=True, size=self.max_shared_memory) + self.log.debug(f'Created shared memory of size: {self.shared_memory.size / 1024 / 1024:.02f} MiB') + + # create the shared memory segments and add them to their respective pools + for i in range(int(self.shared_memory.size / self.analysis.biggest_chunk)): + _sms = SharedMemorySegment(offset=i * self.analysis.biggest_chunk, + end=i * self.analysis.biggest_chunk + self.analysis.biggest_chunk, + _id=i) + self.sms.append(_sms) + + self.log.debug(f'Created {len(self.sms)} shared memory segments.') + + # Create queues + self.dl_worker_queue = MPQueue() + self.writer_queue = MPQueue() + self.dl_result_q = MPQueue() + self.writer_result_q = MPQueue() + + self.log.info(f'Starting download workers...') + for i in range(self.max_workers): + w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, + self.dl_result_q, self.shared_memory.name) + self.children.append(w) + w.start() + + self.log.info('Starting file writing worker...') + writer_p = FileWorker(self.writer_queue, self.writer_result_q, self.dl_dir, + self.shared_memory.name, self.cache_dir) + writer_p.start() + + num_chunk_tasks = sum(isinstance(t, ChunkTask) for t in self.tasks) + num_dl_tasks = len(self.chunks_to_dl) + num_tasks = len(self.tasks) + num_shared_memory_segments = len(self.sms) + self.log.debug(f'Chunks to download: {num_dl_tasks}, File tasks: {num_tasks}, Chunk tasks: {num_chunk_tasks}') + + # active downloader tasks + self.active_tasks = 0 + processed_chunks = 0 + processed_tasks = 0 + total_dl = 0 + total_write = 0 + + # synchronization conditions + shm_cond = Condition() + task_cond = Condition() + + # start threads + s_time = time.time() + dlj_e = Thread(target=self.download_job_manager, args=(task_cond, shm_cond)) + dlr_e = Thread(target=self.dl_results_handler, args=(task_cond,)) + fwr_e = Thread(target=self.fw_results_handler, args=(shm_cond,)) + + for t in (dlj_e, dlr_e, fwr_e): + t.start() + + last_update = time.time() + + while processed_tasks < num_tasks: + delta = time.time() - last_update + if not delta: + time.sleep(self.update_interval) + continue + + # update all the things + processed_chunks += self.num_processed_since_last + processed_tasks += self.num_tasks_processed_since_last + + total_dl += self.bytes_downloaded_since_last + total_write += self.bytes_written_since_last + + dl_speed = self.bytes_downloaded_since_last / delta + dl_unc_speed = self.bytes_decompressed_since_last / delta + w_speed = self.bytes_written_since_last / delta + r_speed = self.bytes_read_since_last / delta + c_speed = self.num_processed_since_last / delta + + # set temporary counters to 0 + self.bytes_read_since_last = self.bytes_written_since_last = 0 + self.bytes_downloaded_since_last = self.num_processed_since_last = 0 + self.bytes_decompressed_since_last = self.num_tasks_processed_since_last = 0 + last_update = time.time() + + perc = (processed_chunks / num_chunk_tasks) * 100 + self.log.info(f'\n============== {time.time() - s_time:.01f} seconds since start') + self.log.info(f'Progress: {processed_chunks}/{num_chunk_tasks} ({perc:.02f}%) chunk tasks processed.') + self.log.info(f'Downloaded: {total_dl / 1024 / 1024:.02f} MiB, ' + f'Written: {total_write / 1024 / 1024:.02f} MiB') + + # speed meters + self.log.info('Speeds:') + self.log.info(f' + Download - {dl_speed / 1024 / 1024:.02f} MiB/s (raw) ' + f'/ {dl_unc_speed / 1024 / 1024:.02f} MiB/s (decompressed)') + self.log.info(f' + Write (disk) - {w_speed / 1024 / 1024:.02f} MiB/s') + self.log.info(f' + Read (disk) - {r_speed / 1024 / 1024:.02f} MiB/s') + self.log.info(f' + Tasks - {c_speed:.02f} Chunks/s') + self.log.info(f'Active download tasks: {self.active_tasks}') + + # shared memory debugging + total_avail = len(self.sms) + total_used = (num_shared_memory_segments - total_avail) * (self.analysis.biggest_chunk / 1024 / 1024) + self.log.info(f'Shared memory usage: {total_used} MiB, available: {total_avail}') + + # send status update to back to instantiator (if queue exists) + if self.status_queue: + try: + self.status_queue.put(UIUpdate( + progress=perc, download_speed=dl_unc_speed, write_speed=w_speed, read_speed=r_speed, + memory_usage=total_used * 1024 * 1024 + ), timeout=1.0) + except Exception as e: + self.log.warning(f'Failed to send status update to queue: {e!r}') + + time.sleep(self.update_interval) + + for i in range(self.max_workers): + self.dl_worker_queue.put_nowait(DownloaderTask(kill=True)) + + self.writer_queue.put_nowait(WriterTask('', kill=True)) + self.log.info('Waiting for writer process to finish...') + + writer_p.join(timeout=10.0) + if writer_p.exitcode is None: + self.log.warning(f'Terminating writer process {e!r}') + writer_p.terminate() + + # forcibly kill DL workers that are not actually dead yet + for child in self.children: + if child.exitcode is None: + child.terminate() + + # make sure all the threads are dead. + for t in (dlj_e, dlr_e, fwr_e): + t.join(timeout=5.0) + if t.is_alive(): + self.log.warning(f'Thread did not terminate! {repr(t)}') + + # close up shared memory + self.shared_memory.close() + self.shared_memory.unlink() + self.shared_memory = None + + # finally, exit the process. + exit(0) diff --git a/legendary/downloader/workers.py b/legendary/downloader/workers.py new file mode 100644 index 0000000..676a310 --- /dev/null +++ b/legendary/downloader/workers.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python +# coding: utf-8 + +import os +import requests +import time +import traceback +import logging +import gc + +from multiprocessing import Process +from multiprocessing.shared_memory import SharedMemory +from queue import Empty + +from legendary.models.chunk import Chunk +from legendary.models.downloading import DownloaderTaskResult, WriterTaskResult + + +class DLWorker(Process): + def __init__(self, name, queue, out_queue, shm, max_retries=5): + super().__init__(name=name) + self.q = queue + self.o_q = out_queue + self.session = requests.session() + self.session.headers.update({ + 'User-Agent': 'EpicGamesLauncher/10.14.2-12166693+++Portal+Release-Live Windows/10.0.18363.1.256.64bit' + }) + self.max_retries = max_retries + self.shm = SharedMemory(name=shm) + self.log = logging.getLogger('DLWorker') + + def run(self): + empty = False + while True: + try: + job = self.q.get(timeout=10.0) + empty = False + except Empty: + if not empty: + self.log.debug(f'[{self.name}] Queue Empty, waiting for more...') + empty = True + continue + + if job.kill: # let worker die + self.log.info(f'[{self.name}] Queue Empty, waiting for more...') + break + + tries = 0 + dl_start = dl_end = 0 + compressed = 0 + chunk = None + + try: + while tries < self.max_retries: + # print('Downloading', job.url) + self.log.debug(f'[{self.name}] Downloading {job.url}') + dl_start = time.time() + + try: + r = self.session.get(job.url, timeout=5.0) + r.raise_for_status() + except Exception as e: + self.log.warning(f'[{self.name}] Chunk download failed ({e!r}), retrying...') + continue + + dl_end = time.time() + if r.status_code != 200: + self.log.warning(f'[{self.name}] Chunk download failed (Status {r.status_code}), retrying...') + continue + else: + compressed = len(r.content) + chunk = Chunk.read_buffer(r.content) + break + else: + raise TimeoutError('Max retries reached') + except Exception as e: + self.log.error(f'[{self.name}] Job failed with: {e!r}, fetching next one...') + # add failed job to result queue to be requeued + self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url)) + + if not chunk: + self.log.warning(f'[{self.name}] Chunk smoehow None?') + self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url)) + continue + + # decompress stuff + try: + size = len(chunk.data) + if size > job.shm.size: + self.log.fatal(f'Downloaded chunk is longer than SharedMemorySegment!') + + self.shm.buf[job.shm.offset:job.shm.offset + size] = bytes(chunk.data) + del chunk + self.o_q.put(DownloaderTaskResult(success=True, chunk_guid=job.guid, shm=job.shm, + url=job.url, size=size, compressed_size=compressed, + time_delta=dl_end - dl_start)) + except Exception as e: + self.log.warning(f'[{self.name}] Job failed with: {e!r}, fetching next one...') + self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url)) + continue + + +class FileWorker(Process): + def __init__(self, queue, out_queue, base_path, shm, cache_path=None): + super().__init__(name='File worker') + self.q = queue + self.o_q = out_queue + self.base_path = base_path + self.cache_path = cache_path if cache_path else os.path.join(base_path, '.cache') + self.shm = SharedMemory(name=shm) + self.log = logging.getLogger('DLWorker') + + def run(self): + last_filename = '' + current_file = None + + while True: + try: + try: + j = self.q.get(timeout=10.0) + except Empty: + self.log.warning('Writer queue empty!') + continue + + if j.kill: + if current_file: + current_file.close() + self.o_q.put(WriterTaskResult(success=True, kill=True)) + break + + # make directories if required + path = os.path.split(j.filename)[0] + if not os.path.exists(os.path.join(self.base_path, path)): + os.makedirs(os.path.join(self.base_path, path)) + + full_path = os.path.join(self.base_path, j.filename) + + if j.empty: # just create an empty file + open(full_path, 'a').close() + self.o_q.put(WriterTaskResult(success=True, filename=j.filename)) + continue + elif j.open: + if current_file: + self.log.warning(f'Opening new file {j.filename} without closing previous! {last_filename}') + current_file.close() + + current_file = open(full_path, 'wb') + last_filename = j.filename + + self.o_q.put(WriterTaskResult(success=True, filename=j.filename)) + continue + elif j.close: + if current_file: + current_file.close() + current_file = None + else: + self.log.warning(f'Asking to close file that is not open: {j.filename}') + + self.o_q.put(WriterTaskResult(success=True, filename=j.filename, closed=True)) + continue + elif j.rename: + if current_file: + self.log.warning('Trying to rename file without closing first!') + current_file.close() + current_file = None + if j.delete: + try: + os.remove(full_path) + except OSError as e: + self.log.error(f'Removing file failed: {e!r}') + self.o_q.put(WriterTaskResult(success=False, filename=j.filename)) + continue + + try: + os.rename(os.path.join(self.base_path, j.temporary_filename), full_path) + except OSError as e: + self.log.error(f'Renaming file failed: {e!r}') + self.o_q.put(WriterTaskResult(success=False, filename=j.filename)) + continue + + self.o_q.put(WriterTaskResult(success=True, filename=j.filename)) + continue + elif j.delete: + if current_file: + self.log.warning('Trying to delete file without closing first!') + current_file.close() + current_file = None + + try: + os.remove(full_path) + except OSError as e: + self.log.error(f'Removing file failed: {e!r}') + + self.o_q.put(WriterTaskResult(success=True, filename=j.filename)) + continue + + pre_write = post_write = 0 + + try: + if j.shm: + pre_write = time.time() + shm_offset = j.shm.offset + j.chunk_offset + shm_end = shm_offset + j.chunk_size + current_file.write(self.shm.buf[shm_offset:shm_end].tobytes()) + post_write = time.time() + elif j.cache_file: + pre_write = time.time() + with open(os.path.join(self.cache_path, j.cache_file), 'rb') as f: + if j.chunk_offset: + f.seek(j.chunk_offset) + current_file.write(f.read(j.chunk_size)) + post_write = time.time() + elif j.old_file: + pre_write = time.time() + with open(os.path.join(self.base_path, j.cache_file), 'rb') as f: + if j.chunk_offset: + f.seek(j.chunk_offset) + current_file.write(f.read(j.chunk_size)) + post_write = time.time() + except Exception as e: + self.log.warning(f'Something in writing a file failed: {e!r}') + self.o_q.put(WriterTaskResult(success=False, filename=j.filename, + chunk_guid=j.chunk_guid, + release_memory=j.release_memory, + shm=j.shm, size=j.chunk_size, + time_delta=post_write-pre_write)) + else: + self.o_q.put(WriterTaskResult(success=True, filename=j.filename, + chunk_guid=j.chunk_guid, + release_memory=j.release_memory, + shm=j.shm, size=j.chunk_size, + time_delta=post_write-pre_write)) + except Exception as e: + self.log.warning(f'[{self.name}] Job {j.filename} failed with: {e!r}, fetching next one...') + self.o_q.put(WriterTaskResult(success=False, filename=j.filename, chunk_guid=j.chunk_guid)) + + try: + if current_file: + current_file.close() + current_file = None + except Exception as e: + self.log.error(f'[{self.name}] Closing file after error failed: {e!r}') + except KeyboardInterrupt: + if current_file: + current_file.close() + return diff --git a/legendary/lfs/__init__.py b/legendary/lfs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/legendary/lfs/egl.py b/legendary/lfs/egl.py new file mode 100644 index 0000000..4101c1c --- /dev/null +++ b/legendary/lfs/egl.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +# coding: utf-8 + +import configparser +import json +import os + + +# ToDo make it possible to read manifests from game installs for migration. +# Also make paths configurable for importing games from WINE roots in the future + +# this is taken directly from rktlnch, needs to be updated + +class EPCLFS: + def __init__(self): + self.appdata_path = os.path.expandvars( + r'%LOCALAPPDATA%\EpicGamesLauncher\Saved\Config\Windows' + ) + self.programdata_path = os.path.expandvars( + r'%PROGRAMDATA%\Epic\EpicGamesLauncher\Data\Manifests' + ) + self.config = configparser.ConfigParser(strict=False) + self.config.optionxform = lambda option: option + + self.manifests = dict() + self.codename_map = dict() + self.guid_map = dict() + + def read_config(self): + self.config.read(os.path.join(self.appdata_path, 'GameUserSettings.ini')) + + def save_config(self): + with open(os.path.join(self.appdata_path, 'GameUserSettings.ini'), 'w') as f: + self.config.write(f, space_around_delimiters=False) + + def read_manifests(self): + for f in os.listdir(self.programdata_path): + if f.endswith('.item'): + data = json.load(open(os.path.join(self.programdata_path, f))) + self.manifests[data['CatalogItemId']] = data + self.codename_map[data['AppName']] = data['CatalogItemId'] + self.guid_map[data['InstallationGuid'].lower()] = data['CatalogItemId'] + + def get_manifest(self, *, game_name=None, install_guid=None, catalog_item_id=None): + if not game_name and not install_guid and not catalog_item_id: + raise ValueError('What are you doing?') + + if game_name and game_name in self.codename_map: + return self.manifests[self.codename_map[game_name]] + elif install_guid and install_guid in self.guid_map: + return self.manifests[self.guid_map[install_guid]] + elif catalog_item_id and catalog_item_id in self.manifests: + return self.manifests[catalog_item_id] + else: + raise ValueError('Cannot find manifest') diff --git a/legendary/lfs/lgndry.py b/legendary/lfs/lgndry.py new file mode 100644 index 0000000..3c58fc0 --- /dev/null +++ b/legendary/lfs/lgndry.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python +# coding: utf-8 + +import json +import os +import configparser +import logging + +from legendary.models.game import * +from legendary.lfs.utils import clean_filename + + +class LGDLFS: + def __init__(self): + self.log = logging.getLogger('LGDLFS') + self.path = os.path.expanduser('~/.config/legendary') + # EGS user info + self._user_data = None + # EGS entitlements + self._entitlements = None + # EGS asset data + self._assets = None + # EGS metadata + self._game_metadata = dict() + # Config with game specific settings (e.g. start parameters, env variables) + self.config = configparser.ConfigParser() + self.config.optionxform = str + + # ensure folders exist. + for f in ['', 'manifests', 'metadata', 'tmp', 'manifests/old']: + if not os.path.exists(os.path.join(self.path, f)): + os.makedirs(os.path.join(self.path, f)) + + # try loading config + self.config.read(os.path.join(self.path, 'config.ini')) + + try: + self._installed = json.load(open(os.path.join(self.path, 'installed.json'))) + except Exception as e: # todo do not do this + self._installed = None + + # load existing app metadata + for gm_file in os.listdir(os.path.join(self.path, 'metadata')): + try: + _meta = json.load(open(os.path.join(self.path, 'metadata', gm_file))) + self._game_metadata[_meta['app_name']] = _meta + except Exception as e: + self.log.debug(f'Loading game meta file "{gm_file}" failed: {e!r}') + + @property + def userdata(self): + if self._user_data is not None: + return self._user_data + + try: + self._user_data = json.load(open(os.path.join(self.path, 'user.json'))) + return self._user_data + except Exception as e: + self.log.debug(f'Failed to load user data: {e!r}') + return None + + @userdata.setter + def userdata(self, userdata): + if userdata is None: + raise ValueError('Userdata is none!') + + self._user_data = userdata + json.dump(userdata, open(os.path.join(self.path, 'user.json'), 'w'), + indent=2, sort_keys=True) + + def invalidate_userdata(self): + self._user_data = None + if os.path.exists(os.path.join(self.path, 'user.json')): + os.remove(os.path.join(self.path, 'user.json')) + + @property + def entitlements(self): + if self._entitlements is not None: + return self._entitlements + + try: + self._entitlements = json.load(open(os.path.join(self.path, 'entitlements.json'))) + return self._entitlements + except Exception as e: + self.log.debug(f'Failed to load entitlements data: {e!r}') + return None + + @entitlements.setter + def entitlements(self, entitlements): + if entitlements is None: + raise ValueError('Entitlements is none!') + + self._entitlements = entitlements + json.dump(entitlements, open(os.path.join(self.path, 'entitlements.json'), 'w'), + indent=2, sort_keys=True) + + @property + def assets(self): + if self._assets is None: + try: + self._assets = [GameAsset.from_json(a) for a in + json.load(open(os.path.join(self.path, 'assets.json')))] + except Exception as e: + self.log.debug(f'Failed to load assets data: {e!r}') + return None + + return self._assets + + @assets.setter + def assets(self, assets): + if assets is None: + raise ValueError('Assets is none!') + + self._assets = assets + json.dump([a.__dict__ for a in self._assets], + open(os.path.join(self.path, 'assets.json'), 'w'), + indent=2, sort_keys=True) + + def get_manifest(self, app_name): + manifest_file = os.path.join(self.path, 'manifests', f'{app_name}.manifest') + if os.path.exists(manifest_file): + return open(manifest_file, 'rb').read() + else: + return None + + def save_manifest(self, app_name, manifest_data, filename=None): + if not filename: + manifest_file = os.path.join(self.path, 'manifests', f'{app_name}.manifest') + else: + manifest_file = os.path.join(self.path, 'manifests', f'{clean_filename(filename)}.manifest') + + open(manifest_file, 'wb').write(manifest_data) + + def get_game_meta(self, app_name): + _meta = self._game_metadata.get(app_name, None) + if _meta: + return Game.from_json(_meta) + return None + + def set_game_meta(self, app_name, meta): + json_meta = meta.__dict__ + self._game_metadata[app_name] = json_meta + meta_file = os.path.join(self.path, 'metadata', f'{app_name}.json') + json.dump(json_meta, open(meta_file, 'w'), indent=2, sort_keys=True) + + def delete_game_meta(self, app_name): + if app_name in self._game_metadata: + del self._game_metadata[app_name] + meta_file = os.path.join(self.path, 'metadata', f'{app_name}.json') + if os.path.exists(meta_file): + os.remove(meta_file) + else: + raise ValueError(f'Game {app_name} does not exist in metadata DB!') + + def get_tmp_path(self): + return os.path.join(self.path, 'tmp') + + def clean_tmp_data(self): + for f in os.listdir(os.path.join(self.path, 'tmp')): + try: + os.remove(os.path.join(self.path, 'tmp', f)) + except Exception as e: + self.log.warning(f'Failed to delete file "{f}": {e!r}') + + def get_installed_game(self, app_name): + if self._installed is None: + try: + self._installed = json.load(open(os.path.join(self.path, 'installed.json'))) + except Exception as e: + self.log.debug(f'Failed to load installed game data: {e!r}') + return None + + game_json = self._installed.get(app_name, None) + if game_json: + return InstalledGame.from_json(game_json) + return None + + def set_installed_game(self, app_name, install_info): + if self._installed is None: + self._installed = dict() + + if app_name in self._installed: + self._installed[app_name].update(install_info.__dict__) + else: + self._installed[app_name] = install_info.__dict__ + + json.dump(self._installed, open(os.path.join(self.path, 'installed.json'), 'w'), + indent=2, sort_keys=True) + + def remove_installed_game(self, app_name): + if self._installed is None: + self.log.warning('Trying to remove a game, but no installed games?!') + return + + if app_name in self._installed: + del self._installed[app_name] + else: + self.log.warning('Trying to remove non-installed game:', app_name) + return + + json.dump(self._installed, open(os.path.join(self.path, 'installed.json'), 'w'), + indent=2, sort_keys=True) + + def get_installed_list(self): + return [InstalledGame.from_json(i) for i in self._installed.values()] + + def save_config(self): + with open(os.path.join(self.path, 'config.ini'), 'w') as cf: + self.config.write(cf) + diff --git a/legendary/lfs/utils.py b/legendary/lfs/utils.py new file mode 100644 index 0000000..3af71ab --- /dev/null +++ b/legendary/lfs/utils.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +# coding: utf-8 + +import os +import shutil +import hashlib +import logging + +from typing import List + +logger = logging.getLogger('LFS Utils') + + +def delete_folder(path: str, recursive=True) -> bool: + try: + logger.debug(f'Deleting "{path}", recursive={recursive}...') + if not recursive: + os.removedirs(path) + else: + shutil.rmtree(path) + except Exception as e: + logger.error(f'Failed deleting files with {e!r}') + return False + else: + return True + + +def validate_files(base_path: str, filelist: List[tuple], hash_type='sha1') -> list: + """ + Validates the files in filelist in path against the provided hashes + + :param base_path: path in which the files are located + :param filelist: list of tuples in format (path, hash [hex]) + :param hash_type: (optional) type of hash, default is sha1 + :return: list of files that failed hash check + """ + + failed = list() + + if not os.path.exists(base_path): + logger.error('Path does not exist!') + failed.extend(i[0] for i in filelist) + return failed + + if not filelist: + logger.info('No files to validate') + return failed + + for file_path, file_hash in filelist: + full_path = os.path.join(base_path, file_path) + logger.debug(f'Checking "{file_path}"...') + + if not os.path.exists(full_path): + logger.warning(f'File "{full_path}" does not exist!') + failed.append(file_path) + continue + + with open(full_path, 'rb') as f: + real_file_hash = hashlib.new(hash_type) + while chunk := f.read(8192): + real_file_hash.update(chunk) + + if file_hash != real_file_hash.hexdigest(): + logger.error(f'Hash for "{full_path}" does not match!') + failed.append(file_path) + + return failed + + +def clean_filename(filename): + return ''.join(i for i in filename if i not in '<>:"/\\|?*') diff --git a/legendary/models/__init__.py b/legendary/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/legendary/models/chunk.py b/legendary/models/chunk.py new file mode 100644 index 0000000..5e5a64d --- /dev/null +++ b/legendary/models/chunk.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python +# coding: utf-8 + +import struct +import zlib + +from io import BytesIO + + +# ToDo do some reworking to make this more memory efficient + +class Chunk: + header_magic = 0xB1FE3AA2 + + def __init__(self): + self.header_version = 0 + self.header_size = 0 + self.compressed_size = 0 + self.hash = 0 + self.stored_as = 0 + self.guid = [] + + self.hash_type = 0 + self.sha_hash = None + self.uncompressed_size = 1024 * 1024 + + self._guid_str = '' + self._guid_num = 0 + self._bio = None + self._data = None + + @property + def data(self): + if self._data: + return self._data + + if self.compressed: + self._data = zlib.decompress(self._bio.read()) + else: + self._data = self._bio.read() + + # close BytesIO with raw data since we no longer need it + self._bio.close() + self._bio = None + + return self._data + + @property + def guid_str(self): + if not self._guid_str: + self._guid_str = '-'.join('{:08x}'.format(g) for g in self.guid) + return self._guid_str + + @property + def guid_num(self): + if not self._guid_num: + self._guid_num = self.guid[3] + (self.guid[2] << 32) + (self.guid[1] << 64) + (self.guid[0] << 96) + return self._guid_num + + @property + def compressed(self): + return self.stored_as & 0x1 + + @classmethod + def read_buffer(cls, data): + _sio = BytesIO(data) + return cls.read(_sio) + + @classmethod + def read(cls, bio): + head_start = bio.tell() + + if struct.unpack('= 2: + _chunk.sha_hash = bio.read(20) + _chunk.hash_type = struct.unpack('B', bio.read(1))[0] + + if _chunk.header_version >= 3: + _chunk.uncompressed_size = struct.unpack('IIII', bytes.fromhex(in_str)) + + +class JSONManifest(Manifest): + """ + Manifest-compatible reader for JSON based manifests + + """ + def __init__(self): + super().__init__() + self.json_data = None + + @classmethod + def read_all(cls, manifest): + _m = cls.read(manifest) + _tmp = deepcopy(_m.json_data) + + _m.meta = JSONManifestMeta.read(_tmp) + _m.chunk_data_list = JSONCDL.read(_tmp, manifest_version=_m.version) + _m.file_manifest_list = JSONFML.read(_tmp) + _m.custom_fields = _tmp.pop('CustomFields', dict()) + + if _tmp.keys(): + print(f'Did not read JSON keys: {_tmp.keys()}!') + + return _m + + @classmethod + def read(cls, manifest): + _manifest = cls() + _manifest.data = manifest + _manifest.json_data = json.loads(manifest.decode('utf-8')) + + _manifest.stored_as = 0 # never compressed + _manifest.version = blob_to_num(_manifest.json_data.get('ManifestFileVersion', '013000000000')) + + return _manifest + + +class JSONManifestMeta(ManifestMeta): + def __init__(self): + super().__init__() + + @classmethod + def read(cls, json_data): + _meta = cls() + + _meta.feature_level = blob_to_num(json_data.pop('ManifestFileVersion', '013000000000')) + _meta.is_file_data = json_data.pop('bIsFileData', False) + _meta.app_id = blob_to_num(json_data.pop('AppID', '000000000000')) + _meta.app_name = json_data.pop('AppNameString', '') + _meta.build_version = json_data.pop('BuildVersionString', '') + _meta.launch_exe = json_data.pop('LaunchExeString', '') + _meta.launch_command = json_data.pop('LaunchCommand', '') + _meta.prereq_ids = json_data.pop('PrereqIds', list()) + _meta.prereq_name = json_data.pop('PrereqName', '') + _meta.prereq_path = json_data.pop('PrereqPath', '') + _meta.prereq_args = json_data.pop('PrereqArgs', '') + + return _meta + + +class JSONCDL(CDL): + def __init__(self): + super().__init__() + + @classmethod + def read(cls, json_data, manifest_version=13): + _cdl = cls() + _cdl._manifest_version = manifest_version + _cdl.count = len(json_data['ChunkFilesizeList']) + + cfl = json_data.pop('ChunkFilesizeList') + chl = json_data.pop('ChunkHashList') + csl = json_data.pop('ChunkShaList') + dgl = json_data.pop('DataGroupList') + _guids = list(cfl.keys()) + + for guid in _guids: + _ci = ChunkInfo(manifest_version=manifest_version) + _ci.guid = guid_from_json(guid) + _ci.file_size = blob_to_num(cfl.pop(guid)) + _ci.hash = blob_to_num(chl.pop(guid)) + _ci.sha_hash = csl.pop(guid) # todo; figure out if we have to decode this somehow + _ci.group_num = blob_to_num(dgl.pop(guid)) + _ci.window_size = 1024*1024 + _cdl.elements.append(_ci) + + for _dc in (cfl, chl, csl, dgl): + if _dc: + print(f'Non-consumed CDL stuff: {_dc}') + + return _cdl + + +class JSONFML(FML): + def __init__(self): + super().__init__() + + @classmethod + def read(cls, json_data): + _fml = cls() + _fml.count = len(json_data['FileManifestList']) + + for _fmj in json_data.pop('FileManifestList'): + _fm = FileManifest() + _fm.filename = _fmj.pop('Filename', '') + _fm.hash = blob_to_num(_fmj.pop('FileHash')).to_bytes(160//8, 'little') + _fm.flags = int(_fmj.pop('bIsUnixExecutable', False)) << 2 + _fm.file_size = 0 + _fm.chunk_parts = [] + _fm.install_tags = _fmj.pop('InstallTags', list()) + + for _cpj in _fmj.pop('FileChunkParts'): + _cp = ChunkPart() + _cp.guid = guid_from_json(_cpj.pop('Guid')) + _cp.offset = blob_to_num(_cpj.pop('Offset')) + _cp.size = blob_to_num(_cpj.pop('Size')) + _fm.file_size += _cp.size + if _cpj: + print(f'Non-read ChunkPart keys: {_cpj.keys()}') + _fm.chunk_parts.append(_cp) + + if _fmj: + print(f'Non-read FileManifest keys: {_fmj.keys()}') + + _fml.elements.append(_fm) + + return _fml diff --git a/legendary/models/manifest.py b/legendary/models/manifest.py new file mode 100644 index 0000000..489504c --- /dev/null +++ b/legendary/models/manifest.py @@ -0,0 +1,509 @@ +#!/usr/bin/env python +# coding: utf-8 + +import hashlib +import logging +import struct +import zlib +from io import BytesIO + +logger = logging.getLogger('Manifest') + + +def read_fstring(bio): + length = struct.unpack(' 0: + s = bio.read(length - 1).decode('ascii') + bio.seek(1, 1) # skip string null terminator + else: # empty string, no terminators or anything + s = '' + + return s + + +def get_chunk_dir(version): + # The lowest version I've ever seen was 12 (Unreal Tournament), but for completeness sake leave all of them in + if version >= 15: + return 'ChunksV4' + elif version >= 6: + return 'ChunksV3' + elif version >= 3: + return 'ChunksV2' + else: + return 'Chunks' + + +class Manifest: + header_magic = 0x44BEC00C + + def __init__(self): + self.header_size = 0 + self.size_compressed = 0 + self.size_uncompressed = 0 + self.sha_hash = '' + self.stored_as = 0 + self.version = 0 + self.data = b'' + + # remainder + self.meta = None + self.chunk_data_list = None + self.file_manifest_list = None + self.custom_fields = None + + @property + def compressed(self): + return self.stored_as & 0x1 + + @classmethod + def read_all(cls, data): + _m = cls.read(data) + _tmp = BytesIO(_m.data) + + _m.meta = ManifestMeta.read(_tmp) + _m.chunk_data_list = CDL.read(_tmp, _m.version) + _m.file_manifest_list = FML.read(_tmp) + _m.custom_fields = CustomFields.read(_tmp) + + unhandled_data = _tmp.read() + if unhandled_data: + logger.warning(f'Did not read {len(unhandled_data)} remaining bytes in manifest! ' + f'This may not be a problem.') + + return _m + + @classmethod + def read(cls, data): + bio = BytesIO(data) + if struct.unpack(''.format( + self.guid_str, self.hash, self.sha_hash.hex(), self.group_num, self.window_size, self.file_size + ) + + @property + def guid_str(self): + if not self._guid_str: + self._guid_str = '-'.join('{:08x}'.format(g) for g in self.guid) + + return self._guid_str + + @property + def guid_num(self): + if not self._guid_num: + self._guid_num = self.guid[3] + (self.guid[2] << 32) + (self.guid[1] << 64) + (self.guid[0] << 96) + return self._guid_num + + @property + def path(self): + return '{}/{:02d}/{:016X}_{}.chunk'.format( + get_chunk_dir(self._manifest_version), + # the result of this seems to always match the group number, but this is the "correct way" + (zlib.crc32(struct.pack(''.format( + self.filename, self.symlink_target, self.hash.hex(), self.flags, + ', '.join(self.install_tags), cp_repr, self.file_size + ) + + +class ChunkPart: + def __init__(self): + self.guid = None + self.offset = 0 + self.size = 0 + # caches for things that are "expensive" to compute + self._guid_str = None + self._guid_num = None + + @property + def guid_str(self): + if not self._guid_str: + self._guid_str = '-'.join('{:08x}'.format(g) for g in self.guid) + return self._guid_str + + @property + def guid_num(self): + if not self._guid_num: + self._guid_num = self.guid[3] + (self.guid[2] << 32) + (self.guid[1] << 64) + (self.guid[0] << 96) + return self._guid_num + + def __repr__(self): + guid_readable = '-'.join('{:08x}'.format(g) for g in self.guid) + return ''.format( + guid_readable, self.offset, self.size) + + +class CustomFields: # this could probably be replaced with just a dict + def __init__(self): + self.size = 0 + self.version = 0 + self.count = 0 + + self._dict = dict() + + def __getitem__(self, item): + return self._dict.get(item, None) + + def __str__(self): + return str(self._dict) + + def keys(self): + return self._dict.keys() + + def values(self): + return self._dict.values() + + @classmethod + def read(cls, bio): + _cf = cls() + + cf_start = bio.tell() + _cf.size = struct.unpack('