1
0
Fork 0
mirror of synced 2024-05-06 13:42:52 +12:00
This commit is contained in:
loathingKernel 2024-02-12 18:30:23 +02:00
parent 3fe02e5026
commit 582b83c12b
20 changed files with 338 additions and 236 deletions

View file

@ -28,7 +28,7 @@ from rare.widgets.indicator_edit import PathEdit, IndicatorReasonsCommon
from rare.widgets.loading_widget import LoadingWidget
from rare.widgets.side_tab import SideTabContents
logger = getLogger("CloudWidget")
logger = getLogger("CloudSaves")
class CloudSaves(QWidget, SideTabContents):
@ -122,11 +122,11 @@ class CloudSaves(QWidget, SideTabContents):
except Exception as e:
logger.warning(str(e))
resolver = WineSavePathResolver(self.core, self.rgame)
if not resolver.environ.get("WINEPREFIX"):
del resolver
self.cloud_save_path_edit.setText("")
QMessageBox.warning(self, "Warning", "No wine prefix selected. Please set it in settings")
return
# if not resolver.environ.get("WINEPREFIX"):
# del resolver
# self.cloud_save_path_edit.setText("")
# QMessageBox.warning(self, "Warning", "No wine prefix selected. Please set it in settings")
# return
self.cloud_save_path_edit.setText(self.tr("Loading..."))
self.cloud_save_path_edit.setDisabled(True)
self.compute_save_path_button.setDisabled(True)
@ -139,7 +139,8 @@ class CloudSaves(QWidget, SideTabContents):
@pyqtSlot(str, str)
def __on_wine_resolver_result(self, path, app_name):
logger.info(f"Wine resolver finished for {app_name}. Computed save path: {path}")
logger.info("Wine resolver finished for %s", app_name)
logger.info("Computed save path: %s", path)
if app_name == self.rgame.app_name:
self.cloud_save_path_edit.setDisabled(False)
self.compute_save_path_button.setDisabled(False)

View file

@ -16,7 +16,7 @@ from rare.shared import RareCore
from rare.shared.workers.wine_resolver import WinePathResolver
from rare.ui.components.tabs.games.integrations.egl_sync_group import Ui_EGLSyncGroup
from rare.ui.components.tabs.games.integrations.egl_sync_list_group import Ui_EGLSyncListGroup
from rare.utils import runners
from rare.utils.compat import utils as compat_utils
from rare.widgets.elide_label import ElideLabel
from rare.widgets.indicator_edit import PathEdit, IndicatorReasonsCommon
@ -88,11 +88,7 @@ class EGLSyncGroup(QGroupBox):
def __run_wine_resolver(self):
self.egl_path_info.setText(self.tr("Updating..."))
wine_resolver = WinePathResolver(
self.core.get_app_launch_command("default"),
runners.get_environment(self.core.get_app_environment("default")),
PathSpec.egl_programdata()
)
wine_resolver = WinePathResolver(self.core, "default", str(PathSpec.egl_programdata()))
wine_resolver.signals.result_ready.connect(self.__on_wine_resolver_result)
QThreadPool.globalInstance().start(wine_resolver)
@ -306,7 +302,8 @@ class EGLSyncListGroup(QGroupBox):
def items(self) -> Iterable[EGLSyncListItem]:
# for i in range(self.list.count()):
# yield self.list.item(i)
return [self.ui.list.item(i) for i in range(self.ui.list.count())]
return map(self.ui.list.item, range(self.ui.list.count()))
# return [self.ui.list.item(i) for i in range(self.ui.list.count())]
class EGLSyncExportGroup(EGLSyncListGroup):

View file

@ -224,7 +224,7 @@ class EosGroup(QGroupBox):
if platform.system() != "Windows":
prefixes = config.get_prefixes()
prefixes = {prefix for prefix in prefixes if config.prefix_exists(prefix)}
prefixes = {prefix for prefix, _ in prefixes if config.prefix_exists(prefix)}
if platform.system() == "Darwin":
# TODO: add crossover support
pass

View file

@ -11,10 +11,10 @@ from rare.lgndr.core import LegendaryCore
from rare.utils.misc import icon
if platform.system() != "Windows":
from rare.utils.runners.wine import get_wine_environment
from rare.utils.compat.wine import get_wine_environment
if platform.system() in {"Linux", "FreeBSD"}:
from rare.utils.runners.proton import get_steam_environment
from rare.utils.compat.proton import get_steam_environment
class EnvVarsTableModel(QAbstractTableModel):

View file

@ -10,7 +10,8 @@ from rare.models.wrapper import Wrapper, WrapperType
from rare.shared import RareCore
from rare.shared.wrappers import Wrappers
from rare.utils import config_helper as config
from rare.utils.runners import proton
from rare.utils.compat import proton
from rare.utils.paths import proton_compat_dir
from rare.widgets.indicator_edit import PathEdit, IndicatorReasonsCommon
logger = getLogger("ProtonSettings")
@ -82,7 +83,7 @@ class ProtonSettings(QGroupBox):
def __on_proton_changed(self, index):
steam_tool: Union[proton.ProtonTool, proton.CompatibilityTool] = self.tool_combo.itemData(index)
steam_environ = proton.get_steam_environment(steam_tool)
steam_environ = proton.get_steam_environment(steam_tool, self.tool_prefix.text())
for key, value in steam_environ.items():
config.save_envvar(self.app_name, key, value)
self.environ_changed.emit(key)
@ -101,8 +102,16 @@ class ProtonSettings(QGroupBox):
self.wrappers.set_game_wrapper_list(self.app_name, wrappers)
self.tool_prefix.setEnabled(steam_tool is not None)
if steam_tool and not config.get_proton_compatdata(self.app_name, fallback=""):
self.tool_prefix.setText(os.path.expanduser("~/.proton"))
if steam_tool:
if not (compatdata_path := config.get_proton_compatdata(self.app_name, fallback="")):
compatdata_path = proton_compat_dir(self.app_name)
config.save_proton_compatdata(self.app_name, str(compatdata_path))
target = compatdata_path.joinpath("pfx")
if not target.is_dir():
os.makedirs(target, exist_ok=True)
self.tool_prefix.setText(str(compatdata_path))
else:
self.tool_prefix.setText("")
self.tool_enabled.emit(steam_tool is not None)

View file

@ -25,7 +25,7 @@ from rare.utils.misc import icon
from rare.widgets.dialogs import ButtonDialog, game_title
if pf.system() in {"Linux", "FreeBSD"}:
from rare.utils.runners import proton
from rare.utils.compat import proton
logger = getLogger("WrapperSettings")
@ -54,7 +54,7 @@ class WrapperEditDialog(ButtonDialog):
header = self.tr("Edit wrapper")
self.setWindowTitle(header)
self.setSubtitle(game_title(header, wrapper.name))
self.line_edit.setText(wrapper.command)
self.line_edit.setText(wrapper.as_str)
@pyqtSlot(str)
def __on_text_changed(self, text: str):
@ -83,7 +83,7 @@ class WrapperAddDialog(WrapperEditDialog):
self.setWindowTitle(header)
self.setSubtitle(header)
for wrapper in wrappers:
self.combo_box.addItem(f"{wrapper.name} ({wrapper.command})", wrapper.command)
self.combo_box.addItem(f"{wrapper.name} ({wrapper.as_str})", wrapper.as_str)
@pyqtSlot(int)
def __on_index_changed(self, index: int):
@ -101,7 +101,7 @@ class WrapperWidget(QFrame):
super(WrapperWidget, self).__init__(parent=parent)
self.setFrameShape(QFrame.StyledPanel)
self.setSizePolicy(QSizePolicy.Maximum, QSizePolicy.Fixed)
self.setToolTip(wrapper.command)
self.setToolTip(wrapper.as_str)
text_lbl = QLabel(wrapper.name, parent=self)
text_lbl.setFont(QFont("monospace"))
@ -296,7 +296,7 @@ class WrapperSettings(QWidget):
if pf.system() in {"Linux", "FreeBSD"}:
compat_cmds = [tool.command() for tool in proton.find_tools()]
if wrapper.command in compat_cmds:
if wrapper.as_str in compat_cmds:
QMessageBox.warning(
self,
self.tr("Warning"),
@ -306,7 +306,7 @@ class WrapperSettings(QWidget):
if wrapper.checksum in self.wrappers.get_game_md5sum_list(self.app_name):
QMessageBox.warning(
self, self.tr("Warning"), self.tr("Wrapper <b>{0}</b> is already in the list").format(wrapper.command)
self, self.tr("Warning"), self.tr("Wrapper <b>{0}</b> is already in the list").format(wrapper.as_str)
)
return

View file

@ -39,7 +39,7 @@ class PathSpec:
possible_prefixes = get_prefixes()
prefixes = [
prefix
for prefix in possible_prefixes
for prefix, _ in possible_prefixes
if os.path.exists(os.path.join(prefix, PathSpec.wine_egl_programdata()))
]
if not prefixes:
@ -66,5 +66,5 @@ class PathSpec:
self.__egl_path_vars["{installdir}"] = igame.install_path
def resolve_egl_path_vars(self, path: str) -> Union[LiteralString, str, bytes]:
cooked_path = [self.__egl_path_vars.get(p.lower(), p) for p in path.split("/")]
cooked_path = (self.__egl_path_vars.get(p.lower(), p) for p in path.split("/"))
return os.path.join(*cooked_path)

View file

@ -28,15 +28,20 @@ class Wrapper:
@property
def checksum(self) -> str:
return md5(self.command.encode("utf-8")).hexdigest()
return md5(self.as_str.encode("utf-8")).hexdigest()
@property
def executable(self) -> str:
return shlex.quote(self.__command[0])
@property
def command(self) -> str:
return " ".join(shlex.quote(part) for part in self.__command)
def command(self) -> List[str]:
return self.__command
@property
def as_str(self) -> str:
# return " ".join(shlex.quote(part) for part in self.__command)
return " ".join(map(shlex.quote, self.__command))
@property
def name(self) -> str:
@ -47,13 +52,13 @@ class Wrapper:
return self.__wtype
def __eq__(self, other) -> bool:
return self.command == other.command
return self.as_str == other.as_str
def __hash__(self):
return hash(self.__command)
def __bool__(self) -> bool:
return True if not self.is_editable else bool(self.command.strip())
return True if not self.is_editable else bool(self.as_str.strip())
@classmethod
def from_dict(cls, data: Dict):

View file

@ -102,13 +102,17 @@ class RareCore(QObject):
self.__signals.application.update_statusbar.emit()
def active_workers(self) -> Iterable[QueueWorker]:
return list(filter(lambda w: w.state == QueueWorkerState.ACTIVE, self.queue_workers))
# return list(filter(lambda w: w.state == QueueWorkerState.ACTIVE, self.queue_workers))
yield from filter(lambda w: w.state == QueueWorkerState.ACTIVE, self.queue_workers)
def queued_workers(self) -> Iterable[QueueWorker]:
return list(filter(lambda w: w.state == QueueWorkerState.QUEUED, self.queue_workers))
# return list(filter(lambda w: w.state == QueueWorkerState.QUEUED, self.queue_workers))
yield from filter(lambda w: w.state == QueueWorkerState.QUEUED, self.queue_workers)
def queue_info(self) -> List[QueueWorkerInfo]:
return [w.worker_info() for w in self.queue_workers]
def queue_info(self) -> Iterable[QueueWorkerInfo]:
# return (w.worker_info() for w in self.queue_workers)
for w in self.queue_workers:
yield w.worker_info()
@staticmethod
def instance() -> 'RareCore':

View file

@ -58,7 +58,8 @@ class InstallInfoWorker(Worker):
if not download.res or not download.res.failures:
self.signals.result.emit(download)
else:
self.signals.failed.emit("\n".join(str(i) for i in download.res.failures))
# self.signals.failed.emit("\n".join(str(i) for i in download.res.failures))
self.signals.failed.emit("\n".join(map(str, download.res.failures)))
except LgndrException as ret:
self.signals.failed.emit(ret.message)
except Exception as e:

View file

@ -35,8 +35,8 @@ def uninstall_game(
if platform.system() == "Darwin":
# TODO: add crossover support
pass
if prefixes is not None:
for prefix in prefixes:
if len(prefixes):
for prefix, _ in prefixes:
remove_registry_entries(prefix)
logger.debug("Removed registry entries for prefix %s", prefix)
else:

View file

@ -3,13 +3,14 @@ import platform
import time
from configparser import ConfigParser
from logging import getLogger
from typing import Union, Iterable, Mapping, List
from typing import Union, Iterable, List, Tuple, Dict
from PyQt5.QtCore import pyqtSignal, QObject, QRunnable
from PyQt5.QtCore import pyqtSignal, QObject
from rare.lgndr.core import LegendaryCore
from rare.models.game import RareGame
from rare.models.pathspec import PathSpec
from rare.shared.wrappers import Wrappers
from rare.utils import config_helper as config
from rare.utils.misc import path_size, format_size
from .worker import Worker
@ -19,7 +20,7 @@ if platform.system() == "Windows":
import winreg # pylint: disable=E0401
from legendary.lfs import windows_helpers
else:
from rare.utils import runners
from rare.utils.compat import utils as compat_utils, proton
logger = getLogger("WineResolver")
@ -28,45 +29,71 @@ class WinePathResolver(Worker):
class Signals(QObject):
result_ready = pyqtSignal(str, str)
def __init__(self, command: List[str], environ: Mapping, path: str):
def __init__(self, core: LegendaryCore, app_name: str, path: str):
super(WinePathResolver, self). __init__()
self.signals = WinePathResolver.Signals()
self.command = command
self.environ = environ
self.core = core
self.app_name = app_name
self.path = path
@staticmethod
def _configure_process(core: LegendaryCore, app_name: str) -> Tuple[List, Dict]:
tool: proton.CompatibilityTool = None
if config.get_boolean(app_name, "no_wine"):
wrappers = Wrappers()
for w in wrappers.get_game_wrapper_list(app_name):
if w.is_compat_tool:
for t in proton.find_tools():
if t.checksum == w.checksum:
tool = t
break
cmd = core.get_app_launch_command(
app_name,
wrapper=tool.as_str(proton.SteamVerb.RUN_IN_PREFIX) if tool is not None else None,
disable_wine=config.get_boolean(app_name, "no_wine")
)
env = core.get_app_environment(app_name, disable_wine=config.get_boolean(app_name, "no_wine"))
env = compat_utils.get_host_environment(env, silent=True)
return cmd, env
@staticmethod
def _resolve_unix_path(cmd, env, path: str) -> str:
if "waitforexitandrun" in cmd:
cmd[cmd.index("waitforexitandrun")] = "runinprefix"
logger.info("Resolving path '%s'", path)
wine_path = runners.resolve_path(cmd, env, path)
logger.debug("Resolved Wine path '%s'", path)
unix_path = runners.convert_to_unix_path(cmd, env, wine_path)
logger.debug("Resolved Unix path '%s'", unix_path)
wine_path = compat_utils.resolve_path(cmd, env, path)
logger.info("Resolved Wine path '%s'", wine_path)
unix_path = compat_utils.convert_to_unix_path(cmd, env, wine_path)
logger.info("Resolved Unix path '%s'", unix_path)
return unix_path
def run_real(self):
path = self._resolve_unix_path(self.command, self.environ, self.path)
self.signals.result_ready.emit(path, "default")
command, environ = self._configure_process(self.core, self.app_name)
if not (command and environ):
logger.error("Cannot setup %s, missing infomation", {type(self).__name__})
self.signals.result_ready.emit("", self.app_name)
path = self._resolve_unix_path(command, environ, self.path)
self.signals.result_ready.emit(path, self.app_name)
return
class WineSavePathResolver(WinePathResolver):
def __init__(self, core: LegendaryCore, rgame: RareGame):
cmd = core.get_app_launch_command(rgame.app_name, disable_wine=config.get_boolean(rgame.app_name, "no_wine"))
env = core.get_app_environment(rgame.app_name, disable_wine=config.get_boolean(rgame.app_name, "no_wine"))
env = runners.get_environment(env, silent=True)
path = PathSpec(core, rgame.igame).resolve_egl_path_vars(rgame.raw_save_path)
if not (cmd and env and path):
raise RuntimeError(f"Cannot setup {type(self).__name__}, missing infomation")
super(WineSavePathResolver, self).__init__(cmd, env, path)
super(WineSavePathResolver, self).__init__(rgame.core, rgame.app_name, str(path))
self.rgame = rgame
def run_real(self):
logger.info("Resolving save path for %s (%s)", self.rgame.app_title, self.rgame.app_name)
path = self._resolve_unix_path(self.command, self.environ, self.path)
command, environ = self._configure_process(self.core, self.rgame.app_name)
if not (command and environ):
logger.error("Cannot setup %s, missing infomation", {type(self).__name__})
self.signals.result_ready.emit("", self.rgame.app_name)
path = self._resolve_unix_path(command, environ, self.path)
# Clean wine output
# pylint: disable=E1136
if os.path.exists(path):
@ -75,14 +102,14 @@ class WineSavePathResolver(WinePathResolver):
return
class OriginWineWorker(QRunnable):
class OriginWineWorker(WinePathResolver):
def __init__(self, core: LegendaryCore, games: Union[Iterable[RareGame], RareGame]):
super(OriginWineWorker, self).__init__()
super(OriginWineWorker, self).__init__(core, "", "")
self.__cache: dict[str, ConfigParser] = {}
self.core = core
self.games = [games] if isinstance(games, RareGame) else games
def run(self) -> None:
def run_real(self) -> None:
t = time.time()
for rgame in self.games:
@ -102,18 +129,16 @@ class OriginWineWorker(QRunnable):
if platform.system() == "Windows":
install_dir = windows_helpers.query_registry_value(winreg.HKEY_LOCAL_MACHINE, reg_path, reg_key)
else:
command = self.core.get_app_launch_command(rgame.app_name)
environ = self.core.get_app_environment(rgame.app_name)
environ = runners.get_environment(environ, silent=True)
command, environ = self._configure_process(self.core, rgame.app_name)
prefix = config.get_prefix(rgame.app_name)
if not prefix:
return
use_wine = False
use_wine = True
if not use_wine:
# lk: this is the original way of getting the path by parsing "system.reg"
reg = self.__cache.get(prefix, None) or runners.read_registry("system.reg", prefix)
reg = self.__cache.get(prefix, None) or compat_utils.read_registry("system.reg", prefix)
self.__cache[prefix] = reg
reg_path = reg_path.replace("SOFTWARE", "Software").replace("WOW6432Node", "Wow6432Node")
@ -123,11 +148,11 @@ class OriginWineWorker(QRunnable):
install_dir = reg.get(reg_path, f'"{reg_key}"', fallback=None)
else:
# lk: this is the alternative way of getting the path by using wine itself
install_dir = runners.query_reg_key(command, environ, f"HKLM\\{reg_path}", reg_key)
install_dir = compat_utils.query_reg_key(command, environ, f"HKLM\\{reg_path}", reg_key)
if install_dir:
logger.debug("Found Wine install directory %s", install_dir)
install_dir = runners.convert_to_unix_path(command, environ, install_dir)
install_dir = compat_utils.convert_to_unix_path(command, environ, install_dir)
if install_dir:
logger.debug("Found Unix install directory %s", install_dir)
else:

View file

@ -69,7 +69,7 @@ class Wrappers:
# yield wrap
def get_game_wrapper_string(self, app_name: str) -> str:
commands = [wrapper.command for wrapper in self.get_game_wrapper_list(app_name)]
commands = [wrapper.as_str for wrapper in self.get_game_wrapper_list(app_name)]
return " ".join(commands)
def get_game_wrapper_list(self, app_name: str) -> List[Wrapper]:
@ -122,7 +122,7 @@ if __name__ == "__main__":
from pprint import pprint
from argparse import Namespace
from rare.utils.runners import proton
from rare.utils.compat import proton
global config_dir
config_dir = os.getcwd
@ -159,7 +159,7 @@ if __name__ == "__main__":
for i, tool in enumerate(proton.find_tools()):
wt = Wrapper(command=tool.command(), name=tool.name, wtype=WrapperType.COMPAT_TOOL)
wr.set_game_wrapper_list(f"compat_game_{i}", [wt])
print(wt.command)
print(wt.as_str)
for wrp in wr.user_wrappers:
pprint(wrp)

View file

View file

@ -2,9 +2,10 @@ import platform as pf
import os
import shlex
from dataclasses import dataclass
from enum import StrEnum
from hashlib import md5
from logging import getLogger
from typing import Optional, Union, List, Dict
from typing import Optional, Union, List, Dict, Set
if pf.system() in {"Linux", "FreeBSD"}:
# noinspection PyUnresolvedReferences
@ -22,11 +23,12 @@ def find_steam() -> str:
return path
def find_libraries(steam_path: str) -> List[str]:
def find_libraries(steam_path: str) -> Set[str]:
vdf_path = os.path.join(steam_path, "steamapps", "libraryfolders.vdf")
with open(vdf_path, "r") as f:
libraryfolders = vdf.load(f)["libraryfolders"]
libraries = [os.path.join(folder["path"], "steamapps") for key, folder in libraryfolders.items()]
# libraries = [os.path.join(folder["path"], "steamapps") for key, folder in libraryfolders.items()]
libraries = {os.path.join(folder["path"], "steamapps") for key, folder in libraryfolders.items()}
return libraries
@ -41,6 +43,16 @@ def find_libraries(steam_path: str) -> List[str]:
# is a good trade-off for the amount of complexity supporting everything would ensue.
class SteamVerb(StrEnum):
RUN = "run"
WAIT_FOR_EXIT_AND_RUN = "waitforexitandrun"
RUN_IN_PREFIX = "runinprefix"
DESTROY_PREFIX = "destroyprefix"
GET_COMPAT_PATH = "getcompatpath"
GET_NATIVE_PATH = "getnativepath"
DEFAULT = WAIT_FOR_EXIT_AND_RUN
@dataclass
class SteamBase:
steam_path: str
@ -57,19 +69,20 @@ class SteamBase:
def required_tool(self) -> Optional[str]:
return self.toolmanifest["manifest"].get("require_tool_appid", None)
def command(self, setup: bool = False) -> List[str]:
def command(self, verb: SteamVerb = SteamVerb.DEFAULT) -> List[str]:
tool_path = os.path.normpath(self.tool_path)
cmd = "".join([shlex.quote(tool_path), self.toolmanifest["manifest"]["commandline"]])
# NOTE: "waitforexitandrun" seems to be the verb used in by steam to execute stuff
# `run` is used when setting up the environment, so use that if we are setting up the prefix.
verb = "run" if setup else "waitforexitandrun"
cmd = cmd.replace("%verb%", verb)
cmd = cmd.replace("%verb%", str(verb))
return shlex.split(cmd)
def as_str(self, verb: SteamVerb = SteamVerb.DEFAULT):
return " ".join(map(shlex.quote, self.command(verb)))
@property
def checksum(self) -> str:
command = " ".join(shlex.quote(part) for part in self.command(setup=False))
return md5(command.encode("utf-8")).hexdigest()
return md5(self.as_str().encode("utf-8")).hexdigest()
@dataclass
@ -95,9 +108,9 @@ class ProtonTool(SteamRuntime):
return self.runtime is not None and self.runtime.appid == appid
return True
def command(self, setup: bool = False) -> List[str]:
cmd = self.runtime.command(setup)
cmd.extend(super().command(setup))
def command(self, verb: SteamVerb = SteamVerb.DEFAULT) -> List[str]:
cmd = self.runtime.command(verb)
cmd.extend(super().command(verb))
return cmd
@ -116,9 +129,9 @@ class CompatibilityTool(SteamBase):
name, data = list(self.compatibilitytool["compatibilitytools"]["compat_tools"].items())[0]
return data["display_name"]
def command(self, setup: bool = False) -> List[str]:
cmd = self.runtime.command(setup) if self.runtime is not None else []
cmd.extend(super().command(setup))
def command(self, verb: SteamVerb = SteamVerb.DEFAULT) -> List[str]:
cmd = self.runtime.command(verb) if self.runtime is not None else []
cmd.extend(super().command(verb))
return cmd
@ -229,6 +242,7 @@ def get_steam_environment(
# IMPORTANT: keep this in sync with the code below
environ = {"STEAM_COMPAT_DATA_PATH": compat_path if compat_path else ""}
if tool is None:
environ["STEAM_COMPAT_DATA_PATH"] = ""
environ["STEAM_COMPAT_CLIENT_INSTALL_PATH"] = ""
environ["STEAM_COMPAT_LIBRARY_PATHS"] = ""
environ["STEAM_COMPAT_MOUNTS"] = ""
@ -282,29 +296,5 @@ if __name__ == "__main__":
for tool in _tools:
print(get_steam_environment(tool))
print(tool.name)
print(tool.command())
print(" ".join(tool.command()))
def find_proton_combos():
possible_proton_combos = []
compatibilitytools_dirs = [
os.path.expanduser("~/.steam/steam/steamapps/common"),
"/usr/share/steam/compatibilitytools.d",
os.path.expanduser("~/.steam/compatibilitytools.d"),
os.path.expanduser("~/.steam/root/compatibilitytools.d"),
]
for c in compatibilitytools_dirs:
if os.path.exists(c):
for i in os.listdir(c):
proton = os.path.join(c, i, "proton")
compatibilitytool = os.path.join(c, i, "compatibilitytool.vdf")
toolmanifest = os.path.join(c, i, "toolmanifest.vdf")
if os.path.exists(proton) and (
os.path.exists(compatibilitytool) or os.path.exists(toolmanifest)
):
wrapper = f'"{proton}" run'
possible_proton_combos.append(wrapper)
if not possible_proton_combos:
logger.warning("Unable to find any Proton version")
return possible_proton_combos
print(tool.command(SteamVerb.RUN))
print(" ".join(tool.command(SteamVerb.RUN_IN_PREFIX)))

170
rare/utils/compat/utils.py Normal file
View file

@ -0,0 +1,170 @@
import os
import platform
import subprocess
from configparser import ConfigParser
from logging import getLogger
from typing import Mapping, Dict, List, Tuple
from PyQt5.QtCore import QProcess, QProcessEnvironment
from rare.utils import config_helper as config
if platform.system() != "Windows":
from . import wine
if platform.system() != "Darwin":
from . import proton
logger = getLogger("CompatUtils")
# this is a copied function from legendary.utils.wine_helpers, but registry file can be specified
def read_registry(registry: str, prefix: str) -> ConfigParser:
accepted = ["system.reg", "user.reg"]
if registry not in accepted:
raise RuntimeError(f'Unknown target "{registry}" not in {accepted}')
reg = ConfigParser(comment_prefixes=(';', '#', '/', 'WINE'), allow_no_value=True,
strict=False)
reg.optionxform = str
reg.read(os.path.join(prefix, 'system.reg'))
return reg
def get_configured_qprocess(command: List[str], environment: Mapping) -> QProcess:
logger.debug("Executing command: %s", command)
proc = QProcess()
proc.setProcessChannelMode(QProcess.SeparateChannels)
penv = QProcessEnvironment()
for ek, ev in environment.items():
penv.insert(ek, ev)
proc.setProcessEnvironment(penv)
proc.setProgram(command[0])
proc.setArguments(command[1:])
return proc
def get_configured_subprocess(command: List[str], environment: Mapping) -> subprocess.Popen:
logger.debug("Executing command: %s", command)
return subprocess.Popen(
command,
stdin=None,
stdout=subprocess.PIPE,
stderr=None,
env=environment,
shell=False,
text=False,
)
def execute_subprocess(command: List[str], arguments: List[str], environment: Mapping) -> Tuple[str, str]:
proc = get_configured_subprocess(command + arguments, environment)
print(proc.args)
out, err = proc.communicate()
out, err = out.decode("utf-8", "ignore") if out else "", err.decode("utf-8", "ignore") if err else ""
# lk: the following is a work-around for wineserver sometimes hanging around after
proc = get_configured_subprocess(command + ["wineboot", "-k"], environment)
_, _ = proc.communicate()
return out, err
def execute_qprocess(command: List[str], arguments: List[str], environment: Mapping) -> Tuple[str, str]:
proc = get_configured_qprocess(command + arguments, environment)
proc.start()
proc.waitForFinished(-1)
out, err = (
proc.readAllStandardOutput().data().decode("utf-8", "ignore"),
proc.readAllStandardError().data().decode("utf-8", "ignore")
)
proc.deleteLater()
# lk: the following is a work-around for wineserver sometimes hanging around after
proc = get_configured_qprocess(command + ["wineboot", "-k"], environment)
proc.start()
proc.waitForFinished(-1)
proc.deleteLater()
return out, err
def execute(command: List[str], arguments: List[str], environment: Mapping) -> Tuple[str, str]:
# Use the current environment if we are in flatpak or our own if we are on host
# In flatpak our environment is passed through `flatpak-spawn` arguments
if os.environ.get("container") == "flatpak":
flatpak_command = ["flatpak-spawn", "--host"]
for name, value in environment.items():
flatpak_command.append(f"--env={name}={value}")
_command = flatpak_command + command
_environment = os.environ.copy()
else:
_command = command
_environment = environment
try:
out, err = execute_qprocess(_command, arguments, _environment)
except Exception as e:
out, err = "", str(e)
return out, err
def resolve_path(command: List[str], environment: Mapping, path: str) -> str:
path = path.strip().replace("/", "\\")
# lk: if path does not exist form
arguments = ["cmd.exe", "/c", "echo", path]
# lk: if path exists and needs a case-sensitive interpretation form
# cmd = [wine_cmd, 'cmd', '/c', f'cd {path} & cd']
out, err = execute(command, arguments, environment)
out, err = out.strip(), err.strip()
if not out:
logger.error("Failed to resolve wine path due to \"%s\"", err)
return out
return out.strip('"')
def query_reg_path(wine_exec: str, wine_env: Mapping, reg_path: str):
raise NotImplementedError
def query_reg_key(command: List[str], environment: Mapping, reg_path: str, reg_key) -> str:
arguments = ["reg.exe", "query", reg_path, "/v", reg_key]
out, err = execute(command, arguments, environment)
out, err = out.strip(), err.strip()
if not out:
logger.error("Failed to query registry key due to \"%s\"", err)
return out
lines = out.split("\n")
keys: Dict = {}
for line in lines:
if line.startswith(" "*4):
key = [x for x in line.split(" "*4, 3) if bool(x)]
keys.update({key[0]: key[2]})
return keys.get(reg_key, "")
def convert_to_windows_path(wine_exec: str, wine_env: Mapping, path: str) -> str:
raise NotImplementedError
def convert_to_unix_path(command: List[str], environment: Mapping, path: str) -> str:
path = path.strip().strip('"')
arguments = ["winepath.exe", "-u", path]
out, err = execute(command, arguments, environment)
out, err = out.strip(), err.strip()
if not out:
logger.error("Failed to convert to unix path due to \"%s\"", err)
return os.path.realpath(out) if (out := out.strip()) else out
def get_host_environment(app_environment: Dict, silent: bool = True) -> Dict:
# Get a clean environment if we are in flatpak, this environment will be passed
# to `flatpak-spawn`, otherwise use the system's.
_environ = {} if os.environ.get("container") == "flatpak" else os.environ.copy()
_environ.update(app_environment)
if silent:
_environ["WINEESYNC"] = "0"
_environ["WINEFSYNC"] = "0"
_environ["WINE_DISABLE_FAST_SYNC"] = "1"
_environ["WINEDEBUG"] = "-all"
_environ["WINEDLLOVERRIDES"] = "winemenubuilder=d;mscoree=d;mshtml=d;"
# lk: pressure-vessel complains about this but it doesn't fail due to it
_environ["DISPLAY"] = ""
return _environ

View file

@ -1,5 +1,5 @@
import os
from typing import Callable, Optional, Set, Any, List
from typing import Callable, Optional, Set, Any, List, Tuple
from legendary.core import LegendaryCore
from legendary.models.config import LGDConf
@ -127,27 +127,27 @@ def get_proton_compatdata_fallback(app_name: Optional[str] = None, fallback: Any
return _compat
def get_wine_prefixes() -> Set[str]:
_prefixes: Set[str] = set()
def get_wine_prefixes() -> Set[Tuple[str, str]]:
_prefixes: Set[Tuple[str, str]] = set()
for name, section in _config.items():
pfx = section.get("WINEPREFIX") or section.get("wine_prefix")
if pfx:
_prefixes.update([pfx])
_prefixes = {os.path.expanduser(prefix) for prefix in _prefixes}
return set(filter(os.path.isdir, _prefixes))
_prefixes.update([(pfx, name[:-len(".env")] if name.endswith(".env") else name)])
_prefixes = {(os.path.expanduser(p), n) for p, n in _prefixes}
return {(p, n) for p, n in _prefixes if os.path.isdir(p)}
def get_proton_prefixes() -> Set[str]:
_prefixes: Set[str] = set()
def get_proton_prefixes() -> Set[Tuple[str, str]]:
_prefixes: Set[Tuple[str, str]] = set()
for name, section in _config.items():
pfx = os.path.join(compatdata, "pfx") if (compatdata := section.get("STEAM_COMPAT_DATA_PATH")) else ""
if pfx:
_prefixes.update([pfx])
_prefixes = {os.path.expanduser(prefix) for prefix in _prefixes}
return set(filter(os.path.isdir, _prefixes))
_prefixes.update([(pfx, name[:-len(".env")] if name.endswith(".env") else name)])
_prefixes = {(os.path.expanduser(p), n) for p, n in _prefixes}
return {(p, n) for p, n in _prefixes if os.path.isdir(p)}
def get_prefixes() -> Set[str]:
def get_prefixes() -> Set[Tuple[str, str]]:
return get_wine_prefixes().union(get_proton_prefixes())

View file

@ -81,6 +81,15 @@ def desktop_dir() -> Path:
def applications_dir() -> Path:
return Path(QStandardPaths.writableLocation(QStandardPaths.ApplicationsLocation))
def proton_compat_dir(app_name: str) -> Path:
return data_dir().joinpath(f"compatdata/{app_name}")
def wine_compat_dir(app_name: str) -> Path:
return proton_compat_dir(app_name).joinpath("pfx")
# fmt: off
__link_suffix = {
"Windows": {

View file

@ -1,109 +0,0 @@
import os
import platform
import subprocess
from configparser import ConfigParser
from logging import getLogger
from typing import Mapping, Dict, List, Tuple, Optional
from rare.utils import config_helper as config
if platform.system() != "Windows":
from . import wine
if platform.system() != "Darwin":
from . import proton
logger = getLogger("Runners")
# this is a copied function from legendary.utils.wine_helpers, but registry file can be specified
def read_registry(registry: str, prefix: str) -> ConfigParser:
accepted = ["system.reg", "user.reg"]
if registry not in accepted:
raise RuntimeError(f'Unknown target "{registry}" not in {accepted}')
reg = ConfigParser(comment_prefixes=(';', '#', '/', 'WINE'), allow_no_value=True,
strict=False)
reg.optionxform = str
reg.read(os.path.join(prefix, 'system.reg'))
return reg
def execute(command: List[str], environment: Mapping) -> Tuple[str, str]:
if os.environ.get("container") == "flatpak":
flatpak_command = ["flatpak-spawn", "--host"]
for name, value in environment.items():
flatpak_command.append(f"--env={name}={value}")
command = flatpak_command + command
try:
proc = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
# Use the current environment if we are in flatpak or our own if we are on host
# In flatpak our environment is passed through `flatpak-spawn` arguments
env=os.environ.copy() if os.environ.get("container") == "flatpak" else environment,
shell=False,
text=True,
)
res = proc.communicate()
except (FileNotFoundError, PermissionError) as e:
res = ("", str(e))
return res
def resolve_path(command: List[str], environment: Mapping, path: str) -> str:
path = path.strip().replace("/", "\\")
# lk: if path does not exist form
cmd = command + ["cmd", "/c", "echo", path]
# lk: if path exists and needs a case-sensitive interpretation form
# cmd = [wine_cmd, 'cmd', '/c', f'cd {path} & cd']
out, err = execute(cmd, environment)
out, err = out.strip(), err.strip()
if not out:
logger.error("Failed to resolve wine path due to \"%s\"", err)
return out
return out.strip('"')
def query_reg_path(wine_exec: str, wine_env: Mapping, reg_path: str):
raise NotImplementedError
def query_reg_key(command: List[str], environment: Mapping, reg_path: str, reg_key) -> str:
cmd = command + ["reg", "query", reg_path, "/v", reg_key]
out, err = execute(cmd, environment)
out, err = out.strip(), err.strip()
if not out:
logger.error("Failed to query registry key due to \"%s\"", err)
return out
lines = out.split("\n")
keys: Dict = {}
for line in lines:
if line.startswith(" "*4):
key = [x for x in line.split(" "*4, 3) if bool(x)]
keys.update({key[0]: key[2]})
return keys.get(reg_key, "")
def convert_to_windows_path(wine_exec: str, wine_env: Mapping, path: str) -> str:
raise NotImplementedError
def convert_to_unix_path(command: List[str], environment: Mapping, path: str) -> str:
path = path.strip().strip('"')
cmd = command + ["winepath.exe", "-u", path]
out, err = execute(cmd, environment)
out, err = out.strip(), err.strip()
if not out:
logger.error("Failed to convert to unix path due to \"%s\"", err)
return os.path.realpath(out) if (out := out.strip()) else out
def get_environment(app_environment: Dict, silent: bool = True) -> Dict:
# Get a clean environment if we are in flatpak, this environment will be passed
# to `flatpak-spawn`, otherwise use the system's.
_environ = {} if os.environ.get("container") == "flatpak" else os.environ.copy()
_environ.update(app_environment)
if silent:
_environ["WINEDEBUG"] = "-all"
_environ["WINEDLLOVERRIDES"] = "winemenubuilder=d;mscoree=d;mshtml=d;"
_environ["DISPLAY"] = ""
return _environ