From 64639a55203d85d712ba711f785b1ad174065b2e Mon Sep 17 00:00:00 2001 From: derrod Date: Thu, 16 Nov 2023 02:21:53 +0100 Subject: [PATCH] [core/cli/utils] Extremely WIP Steam Sync from 2022 Unfinished and might not even work anymore, left here as a reference for future work. --- .github/workflows/python.yml | 5 +- legendary/cli.py | 69 +++++- legendary/core.py | 191 +++++++++++++++- legendary/models/game.py | 2 + legendary/utils/pe.py | 176 +++++++++++++++ legendary/utils/steam.py | 407 +++++++++++++++++++++++++++++++++++ setup.py | 7 +- 7 files changed, 849 insertions(+), 8 deletions(-) create mode 100644 legendary/utils/pe.py create mode 100644 legendary/utils/steam.py diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index aec4ffe..8ca1661 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -28,9 +28,12 @@ jobs: pyinstaller requests filelock + pefile + vdf + Pillow - name: Optional dependencies (WebView) - run: pip3 install --upgrade pywebview + run: pip3 install --upgrade "pywebview<4.0" if: runner.os != 'macOS' - name: Set strip option on non-Windows diff --git a/legendary/cli.py b/legendary/cli.py index 06d6a8f..3333bc0 100644 --- a/legendary/cli.py +++ b/legendary/cli.py @@ -576,6 +576,9 @@ class LegendaryCLI: if args.origin: return self._launch_origin(args) + if args.steam and sys_platform == 'linux': + return self._launch_steam(app_name, args) + igame = self.core.get_installed_game(app_name) if not igame: logger.error(f'Game {app_name} is not currently installed!') @@ -703,7 +706,9 @@ class LegendaryCLI: if params.environment: logger.debug('Environment overrides: {}'.format(', '.join( f'{k}={v}' for k, v in params.environment.items()))) - subprocess.Popen(full_params, cwd=params.working_directory, env=full_env) + p = subprocess.Popen(full_params, cwd=params.working_directory, env=full_env) + if args.wait: + p.wait() def _launch_origin(self, args): game = self.core.get_game(app_name=args.app_name) @@ -802,6 +807,50 @@ class LegendaryCLI: logger.debug(f'Opening Origin URI with command: {shlex.join(command)}') subprocess.Popen(command, env=full_env) + def _launch_steam(self, app_name, args): + def exit_error(msg, errcode=1): + print('https://legendary.gl/steam_error?code=' + msg) + exit(errcode) + + igame = self.core.get_installed_game(app_name) + if not igame: + exit_error(f'not_installed') + if igame.is_dlc: + exit_error(f'is_dlc') + if not os.path.exists(igame.install_path): + exit_error(f'install_dir_missing') + + # override with config value + args.offline = self.core.is_offline_game(app_name) or args.offline + if not args.offline: + logger.info('Logging in...') + try: + if not self.core.login(): + exit_error('login_failed') + except ValueError: + exit_error('login_failed_no_credentials') + + if not args.skip_version_check and not self.core.is_noupdate_game(app_name): + logger.info('Checking for updates...') + try: + latest = self.core.get_asset(app_name, update=True, platform=igame.platform) + except ValueError: + exit_error('metadata_missing') + + if latest.build_version != igame.version: + exit_error('app_outdated') + + params = self.core.get_launch_parameters(app_name=app_name, offline=args.offline, + user=args.user_name_override, + language=args.language, disable_wine=True) + + full_params = [] + full_params.extend(params.game_parameters) + full_params.extend(params.user_parameters) + full_params.extend(params.egl_parameters) + logger.debug(f'Launch parameters: {shlex.join(full_params)}') + print(shlex.join(full_params)) + def install_game(self, args): if not self.core.lgd.lock_installed(): logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may ' @@ -2612,6 +2661,13 @@ class LegendaryCLI: self.core.install_game(igame) logger.info('Finished.') + def steam_sync(self, args): + if not self.core.login(): + logger.error('Login failed!') + return + + self.core.steam_sync() + def main(): # Set output encoding to UTF-8 if not outputting to a terminal @@ -2662,6 +2718,7 @@ def main(): list_saves_parser = subparsers.add_parser('list-saves', help='List available cloud saves') move_parser = subparsers.add_parser('move', help='Move specified app name to a new location') status_parser = subparsers.add_parser('status', help='Show legendary status information') + steam_parser = subparsers.add_parser('steam-sync', help='Setup/Run Steam Sync') sync_saves_parser = subparsers.add_parser('sync-saves', help='Sync cloud saves') uninstall_parser = subparsers.add_parser('uninstall', help='Uninstall (delete) a game') verify_parser = subparsers.add_parser('verify', help='Verify a game\'s local files', @@ -2814,6 +2871,10 @@ def main(): help='Launch Origin to activate or run the game.') launch_parser.add_argument('--json', dest='json', action='store_true', help='Print launch information as JSON and exit') + launch_parser.add_argument('--wait', dest='wait', action='store_true', + help='Wait until child process exits') + # hidden option for Steam sync launch + launch_parser.add_argument('--steam', dest='steam', action='store_true', help=argparse.SUPPRESS) if os.name != 'nt': launch_parser.add_argument('--wine', dest='wine_bin', action='store', metavar='', @@ -3010,8 +3071,8 @@ def main(): if args.full_help: # Commands that should not be shown in full help/list of commands (e.g. aliases) - _hidden_commands = {'download', 'update', 'repair', 'get-token', - 'import-game', 'verify-game', 'list-games'} + _hidden_commands = {'download', 'update', 'repair', 'get-token', 'import-game', + 'verify-game', 'list-games'} # Print the help for all of the subparsers. Thanks stackoverflow! print('Individual command help:') subparsers = next(a for a in parser._actions if isinstance(a, argparse._SubParsersAction)) @@ -3099,6 +3160,8 @@ def main(): cli.crossover_setup(args) elif args.subparser_name == 'move': cli.move(args) + elif args.subparser_name == 'steam-sync': + cli.steam_sync(args) except KeyboardInterrupt: logger.info('Command was aborted via KeyboardInterrupt, cleaning up...') diff --git a/legendary/core.py b/legendary/core.py index c6442d2..97f5bf2 100644 --- a/legendary/core.py +++ b/legendary/core.py @@ -40,6 +40,7 @@ from legendary.utils.game_workarounds import is_opt_enabled, update_workarounds, from legendary.utils.savegame_helper import SaveGameHelper from legendary.utils.selective_dl import games as sdl_games from legendary.lfs.wine_helpers import read_registry, get_shell_folders, case_insensitive_path_search +from legendary.utils.steam import SteamHelper # ToDo: instead of true/false return values for success/failure actually raise an exception that the CLI/GUI @@ -574,8 +575,12 @@ class LegendaryCore: def get_installed_game(self, app_name, skip_sync=False) -> InstalledGame: igame = self._get_installed_game(app_name) - if not skip_sync and igame and self.egl_sync_enabled and igame.egl_guid and not igame.is_dlc: - self.egl_sync(app_name) + if not skip_sync and igame: + if self.egl_sync_enabled and igame.egl_guid and not igame.is_dlc: + self.egl_sync(app_name) + if self.steam_sync_enabled and igame.steam_appid and not igame.is_dlc: + self.steam_sync(app_name) + return self._get_installed_game(app_name) else: return igame @@ -2110,6 +2115,188 @@ class LegendaryCore: if os.path.exists(path): delete_folder(path, recursive=True) + @property + def steam_sync_enabled(self): + return self.lgd.config.getboolean('Legendary', 'steam_sync', fallback=False) + + def _steam_export(self, sh: SteamHelper, shortcuts: dict, igame: InstalledGame): + def shortcut_exists(app_id): + for shortcut in shortcuts['shortcuts'].values(): + if (shortcut['appid'] + 2**32) == app_id: + return True + return False + + if igame.steam_appid and shortcut_exists(igame.steam_appid): + return False + + entry = sh.create_shortcut_entry(igame, igame.steam_appid) + + idx = 0 + while str(idx) in shortcuts['shortcuts']: + idx += 1 + + shortcuts['shortcuts'][str(idx)] = entry + # add appid to installed game + igame.steam_appid = entry['appid'] + 2**32 + self._install_game(igame) + + # todo only do this if no wine is configured for this app + if sys_platform == 'linux': + sh.set_compat_tool(igame.steam_appid, 'proton_experimental') + + return True + + def _steam_remove(self): + # todo remove icons and shit as well + pass + + def steam_sync(self, app_name=None, is_install=False, steam_path=None, + legendary_bin=None, steam_user=None, refresh_artwork=False): + try: + steam_path = steam_path or self.lgd.config.get('Legendary', 'steam_path', fallback=None) + legendary_bin = legendary_bin or self.lgd.config.get('Legendary', 'legendary_binary', fallback=None) + sh = SteamHelper(steam_path, legendary_bin, self.lgd.path) + if sys_platform == 'linux': + sh.ensure_launch_script() + except RuntimeError as e: + self.log.error(f'SteamHelper failed to initialize: {e!r}') + return + except FileNotFoundError: + self.log.error('Steam installation not found, please specify the installation directory ' + 'via the config (steam_path) or command line (--steam-path).') + return + + if sh.is_steam_running(): + if not is_install: + # todo use better exception + raise RuntimeError('Steam is running, please close it before running this command.') + else: + self.log.warning('Steam is still running, please restart it to reload Legendary shortcuts.') + + _ = sh.get_user_dir(steam_user or self.lgd.config.get('Legendary', 'steam_user', fallback=None)) + shortcuts = sh.read_shortcuts() + if sys_platform == 'linux': + sh.read_config() + + any_changes = False + + if app_name: + igame = self._get_installed_game(app_name) + any_changes = self._steam_export(sh, shortcuts, igame) + else: + for igame in self._get_installed_list(): + any_changes = self._steam_export(sh, shortcuts, igame) or any_changes + + # todo remove uninstalled games from shortcuts + + if any_changes: + sh.write_shortcuts(shortcuts) + if sys_platform == 'linux': + sh.write_config() + elif not refresh_artwork: + return + + # Download cover art and stuff + self.log.info('Downloading Steam Library artwork, this may take a while...') + + for igame in self._get_installed_list(): + if not igame.steam_appid: + continue + + sh.create_grid_json(igame.steam_appid) + game = self.get_game(igame.app_name) + # go through all available image files and download if necessary + banner = logo = tall = None + + # todo SteamDB instead + # todo move this into Steam Helper + for img in game.metadata.get('keyImages', []): + img_url = img['url'] + img_type = img['type'] + url_fname = img_url.rpartition('/')[2] + + if '.' not in url_fname: + self.log.debug(f'Image url for {igame.app_name} does not have a file extension.') + # extension doesn't really matter, Steam will determine the type when loading + ext = 'jpg' if img['type'] != 'DieselGameBoxLogo' else 'png' + else: + ext = url_fname.rpartition('.')[2] + + if img_type == 'DieselGameBox' or img_type == 'DieselGameBoxWide': + # Sometimes DieselGameBox doesn't exist but DieselGameBoxWide does. + # In cases where both exist they appear to be the same image. + filename = f'{igame.steam_appid}_hero.{ext}' + elif img_type == 'DieselGameBoxLogo': + filename = f'{igame.steam_appid}_logo.{ext}' + elif img_type == 'DieselGameBoxTall': + filename = f'{igame.steam_appid}p_epic.{ext}' + elif img_type == 'Thumbnail': + # If this is square use it instead of manually extracting the icon + if img['height'] == img['width']: + filename = f'{igame.steam_appid}_icon.{ext}' + else: + self.log.debug(f'Non-square thumbnail: {img_url}') + continue + else: + self.log.debug(f'Unknown EGS image type: {img["type"]}') + continue + + file_path = os.path.join(sh.grid_path, filename) + if not os.path.exists(file_path) or refresh_artwork: + self.log.debug(f'Downloading {img["url"]} to {filename}') + r = self.egs.unauth_session.get(img['url'], timeout=20.0) + if r.status_code == 200: + # save component image for big picture/box generation + if img_type == 'DieselGameBox' or img_type == 'DieselGameBoxWide': + banner = r.content + elif img_type == 'DieselGameBoxLogo': + logo = r.content + elif img_type == 'DieselGameBoxTall': + tall = r.content + + with open(file_path, 'wb') as f: + f.write(r.content) + + # assemble the banner (Steam calls it "header") for big picture + if banner: + # Big Picture banner image + banner_id = sh.get_header_id(igame) + banner_file = os.path.join(sh.grid_path, f'{banner_id}.jpg') + if not os.path.exists(banner_file) or refresh_artwork: + with open(banner_file, 'wb') as f: + f.write(sh.make_header_image(banner, logo)) + + # Deck UI banner image + banner_file = os.path.join(sh.grid_path, f'{igame.steam_appid}.png') + if not os.path.exists(banner_file) or refresh_artwork: + with open(banner_file, 'wb') as f: + f.write(sh.make_banner_image(banner, logo)) + + # If the logo exists as a separate file we need to manually generate the "tall" box art as well + if tall and logo: + box_file = os.path.join(sh.grid_path, f'{igame.steam_appid}p.png') + + if not os.path.exists(box_file) or refresh_artwork: + with open(box_file, 'wb') as f: + f.write(sh.make_tall_box(tall, logo)) + + # steam can read exe icons directly, but that doesn't handle alpha correctly, so do it ourselves. + icon_fie = os.path.join(sh.grid_path, f'{igame.steam_appid}_icon.png') + if not os.path.exists(icon_fie) or refresh_artwork: + try: + icon = sh.make_icon(igame) + if icon: + with open(icon_fie, 'wb') as f: + f.write(icon) + except Exception as e: + self.log.warning(f'Getting Steam icon failed with {e!r}') + # todo figure out how to set Proton by default + + self.log.info('Done, Steam may now be restarted.') + + def steam_unlink(self): + pass + def exit(self): """ Do cleanup, config saving, and exit. diff --git a/legendary/models/game.py b/legendary/models/game.py index 0d615ce..f1555e6 100644 --- a/legendary/models/game.py +++ b/legendary/models/game.py @@ -140,6 +140,7 @@ class InstalledGame: base_urls: List[str] = field(default_factory=list) can_run_offline: bool = False egl_guid: str = '' + steam_appid: int = 0 executable: str = '' install_size: int = 0 install_tags: List[str] = field(default_factory=list) @@ -177,6 +178,7 @@ class InstalledGame: tmp.platform = json.get('platform', 'Windows') tmp.install_size = json.get('install_size', 0) tmp.egl_guid = json.get('egl_guid', '') + tmp.steam_appid = json.get('steam_appid', 0) tmp.install_tags = json.get('install_tags', []) return tmp diff --git a/legendary/utils/pe.py b/legendary/utils/pe.py new file mode 100644 index 0000000..bae78e2 --- /dev/null +++ b/legendary/utils/pe.py @@ -0,0 +1,176 @@ +# -*- coding: utf-8 -*- + +""" +Utilities for extracting information from PE (Portable Executable) files. +Adapted from https://github.com/robomotic/pemeta + +Original credits: +__author__ = "Paolo Di Prodi" +__copyright__ = "Copyright 2017, LogstTotal Project" +__license__ = "Apache" +__version__ = "2.0" +__maintainer__ = "Paolo Di Prodi" +__email__ = "paolo@logstotal.com" +""" + +import io +import logging +import struct + +import pefile + +from PIL import Image + + +class PEUtils(object): + GRPICONDIRENTRY_format = ('GRPICONDIRENTRY', + ('B,Width', 'B,Height', 'B,ColorCount', 'B,Reserved', + 'H,Planes', 'H,BitCount', 'I,BytesInRes', 'H,ID')) + GRPICONDIR_format = ('GRPICONDIR', + ('H,Reserved', 'H,Type', 'H,Count')) + RES_ICON = 1 + RES_CURSOR = 2 + + def __init__(self, pe_file): + self.pe = pefile.PE(pe_file, fast_load=True) + self.pe.parse_data_directories(directories=[ + pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'], + pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'], + pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_TLS'], + pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE'] + ]) + + def _find_resource_base(self, res_type): + try: + + rt_base_idx = [entry.id for + entry in self.pe.DIRECTORY_ENTRY_RESOURCE.entries].index( + pefile.RESOURCE_TYPE[res_type] + ) + except AttributeError: + rt_base_idx = None + except ValueError: + rt_base_idx = None + + if rt_base_idx is not None: + return self.pe.DIRECTORY_ENTRY_RESOURCE.entries[rt_base_idx] + + return None + + def _find_resource(self, res_type, res_index): + rt_base_dir = self._find_resource_base(res_type) + + if res_index < 0: + try: + idx = [entry.id for entry in rt_base_dir.directory.entries].index(-res_index) + except: + return None + else: + idx = res_index if res_index < len(rt_base_dir.directory.entries) else None + + if idx is None: + return None + + test_res_dir = rt_base_dir.directory.entries[idx] + res_dir = test_res_dir + if test_res_dir.struct.DataIsDirectory: + # another Directory + # probably language take the first one + res_dir = test_res_dir.directory.entries[0] + if res_dir.struct.DataIsDirectory: + # a directory there is no icon here + return None + + return res_dir + + def _get_group_icons(self): + rt_base_dir = self._find_resource_base('RT_GROUP_ICON') + groups = list() + + if not hasattr(rt_base_dir, "directory"): + return groups + + for res_index in range(0, len(rt_base_dir.directory.entries)): + grp_icon_dir_entry = self._find_resource('RT_GROUP_ICON', res_index) + + if not grp_icon_dir_entry: + continue + + data_rva = grp_icon_dir_entry.data.struct.OffsetToData + size = grp_icon_dir_entry.data.struct.Size + data = self.pe.get_memory_mapped_image()[data_rva:data_rva + size] + file_offset = self.pe.get_offset_from_rva(data_rva) + + grp_icon_dir = pefile.Structure(self.GRPICONDIR_format, file_offset=file_offset) + grp_icon_dir.__unpack__(data) + + if grp_icon_dir.Reserved != 0 or grp_icon_dir.Type != self.RES_ICON: + continue + offset = grp_icon_dir.sizeof() + + entries = list() + for idx in range(0, grp_icon_dir.Count): + grp_icon = pefile.Structure(self.GRPICONDIRENTRY_format, file_offset=file_offset + offset) + grp_icon.__unpack__(data[offset:]) + offset += grp_icon.sizeof() + entries.append(grp_icon) + + groups.append(entries) + return groups + + def _get_icon(self, index): + icon_entry = self._find_resource('RT_ICON', -index) + if not icon_entry: + return None + + data_rva = icon_entry.data.struct.OffsetToData + size = icon_entry.data.struct.Size + data = self.pe.get_memory_mapped_image()[data_rva:data_rva + size] + + return data + + def _export_raw(self, entries=None, index=None): + if not entries: + # just get the first group + for entries in self._get_group_icons(): + if entries: + break + else: + return None + + if index is not None: + entries = entries[index:index + 1] + + ico = struct.pack('=3.4'], - webview_gtk=['pywebview>=3.4', 'PyGObject'] + webview=['pywebview>=3.4<4.0'], + webview_gtk=['pywebview>=3.4<4.0', 'PyGObject'], + steam=['vdf', 'Pillow', 'pefile'], + full=['vdf', 'Pillow', 'pefile', 'pywebview>=3.4<4.0'], + full_gtk=['vdf', 'Pillow', 'pefile', 'pywebview>=3.4<4.0', 'PyGObject'] ), url='https://github.com/derrod/legendary', description='Free and open-source replacement for the Epic Games Launcher application',