1
0
Fork 0
mirror of synced 2024-06-26 18:20:50 +12:00

GameInfo: Make widgets react to changes from RareGame's widget.update signal

Note: the `__update_widget()` method, while it doesn't have any visible delay
has the potential for improvement. I didn't do it because it felt like
premature optimization.

MoveGamePopUp: update it to use RareGame and it's signals

RareGame: Add `install_path` attribute and change `needs_verification` setter
Now both setters will update the local `igame` attribute and save it too.

MoveWorker: Update it to use RareGame.
Other changes include moving "same-drive" moving into the worker and using
`os.path` methods instead of `PathLib`

SteamGrades: Remove worker, it is implemented in RareGame now.
This commit is contained in:
loathingKernel 2023-02-15 16:59:33 +02:00
parent 034c6f2ade
commit 6c0663771c
5 changed files with 350 additions and 398 deletions

View file

@ -1,36 +1,27 @@
import os
import platform
import shutil
from ctypes import c_uint64
from logging import getLogger
from pathlib import Path
from typing import Optional, Union
from typing import Optional
from PyQt5.QtCore import (
Qt,
QThreadPool,
pyqtSlot,
)
from PyQt5.QtWidgets import (
QMenu,
QPushButton,
QWidget,
QMessageBox,
QWidgetAction,
)
from rare.models.game import RareGame
from rare.shared import (
RareCore,
LegendaryCoreSingleton,
GlobalSignalsSingleton,
ArgumentsSingleton,
ImageManagerSingleton,
)
from rare.shared import RareCore
from rare.shared.image_manager import ImageSize
from rare.shared.workers import VerifyWorker, MoveWorker
from rare.ui.components.tabs.games.game_info.game_info import Ui_GameInfo
from rare.utils.misc import get_size
from rare.utils.steam_grades import SteamWorker
from rare.widgets.image_widget import ImageWidget
from .move_game import MoveGamePopUp, is_game_dir
@ -47,10 +38,9 @@ class GameInfo(QWidget):
self.ui.uninstall_button.setObjectName("UninstallButton")
self.rcore = RareCore.instance()
self.core = LegendaryCoreSingleton()
self.signals = GlobalSignalsSingleton()
self.args = ArgumentsSingleton()
self.image_manager = ImageManagerSingleton()
self.core = RareCore.instance().core()
self.args = RareCore.instance().args()
# self.image_manager = RareCore.instance().image_manager()
self.rgame: Optional[RareGame] = None
@ -58,42 +48,35 @@ class GameInfo(QWidget):
self.image.setFixedSize(ImageSize.Display)
self.ui.layout_game_info.insertWidget(0, self.image, alignment=Qt.AlignTop)
if platform.system() == "Windows":
self.ui.lbl_grade.setVisible(False)
self.ui.grade.setVisible(False)
self.ui.game_actions_stack.setCurrentWidget(self.ui.installed_page)
self.ui.uninstall_button.clicked.connect(self.__on_uninstall)
self.ui.verify_button.clicked.connect(self.__on_verify)
self.verify_pool = QThreadPool()
self.verify_pool.setMaxThreadCount(2)
if self.args.offline:
self.ui.repair_button.setDisabled(True)
else:
self.ui.repair_button.clicked.connect(self.__on_repair)
self.ui.install_button.clicked.connect(self.__on_install)
self.ui.verify_button.clicked.connect(self.__on_verify)
self.ui.repair_button.clicked.connect(self.__on_repair)
self.ui.uninstall_button.clicked.connect(self.__on_uninstall)
self.move_game_pop_up = MoveGamePopUp()
self.move_action = QWidgetAction(self)
self.move_action.setDefaultWidget(self.move_game_pop_up)
self.ui.move_button.setMenu(QMenu())
self.ui.move_button.menu().addAction(self.move_action)
self.move_game_pop_up = MoveGamePopUp(self)
move_action = QWidgetAction(self)
move_action.setDefaultWidget(self.move_game_pop_up)
self.ui.move_button.setMenu(QMenu(self.ui.move_button))
self.ui.move_button.menu().addAction(move_action)
self.existing_game_dir = False
self.is_moving = False
self.game_moving = None
self.dest_path_with_suffix = None
self.move_game_pop_up.browse_done.connect(self.show_menu_after_browse)
self.move_game_pop_up.browse_done.connect(self.ui.move_button.showMenu)
self.move_game_pop_up.move_clicked.connect(self.ui.move_button.menu().close)
self.move_game_pop_up.move_clicked.connect(self.move_game)
self.move_game_pop_up.move_clicked.connect(self.__on_move)
self.steam_grade_ratings = {
"platinum": self.tr("Platinum"),
"gold": self.tr("Gold"),
"silver": self.tr("Silver"),
"bronze": self.tr("Bronze"),
"borked": self.tr("Borked"),
"fail": self.tr("Failed to get rating"),
"pending": self.tr("Loading..."),
"na": self.tr("Not applicable"),
}
@pyqtSlot()
def __on_install(self):
if self.rgame.is_origin:
if self.rgame.is_non_asset:
self.rgame.launch()
else:
self.rgame.install()
@ -101,12 +84,12 @@ class GameInfo(QWidget):
# FIXME: Move to RareGame
@pyqtSlot()
def __on_uninstall(self):
""" This function is to be called from the button only """
""" This method is to be called from the button only """
self.rgame.uninstall()
@pyqtSlot()
def __on_repair(self):
""" This function is to be called from the button only """
""" This method is to be called from the button only """
repair_file = os.path.join(self.core.lgd.get_tmp_path(), f"{self.rgame.app_name}.repair")
if not os.path.exists(repair_file):
QMessageBox.warning(
@ -125,7 +108,7 @@ class GameInfo(QWidget):
if rgame.has_update:
ans = QMessageBox.question(
self,
self.tr("Repair and update?"),
self.tr("Repair and update? - {}").format(self.rgame.title),
self.tr(
"There is an update for <b>{}</b> from <b>{}</b> to <b>{}</b>. "
"Do you want to update the game while repairing it?"
@ -133,9 +116,17 @@ class GameInfo(QWidget):
) == QMessageBox.Yes
rgame.repair(repair_and_update=ans)
@pyqtSlot(RareGame, str)
def __on_worker_error(self, rgame: RareGame, message: str):
QMessageBox.warning(
self,
self.tr("Error - {}").format(rgame.title),
message
)
@pyqtSlot()
def __on_verify(self):
""" This function is to be called from the button only """
""" This method is to be called from the button only """
if not os.path.exists(self.rgame.igame.install_path):
logger.error(f"Installation path {self.rgame.igame.install_path} for {self.rgame.title} does not exist")
QMessageBox.warning(
@ -150,35 +141,18 @@ class GameInfo(QWidget):
worker = VerifyWorker(self.core, self.args, rgame)
worker.signals.progress.connect(self.__on_verify_progress)
worker.signals.result.connect(self.__on_verify_result)
worker.signals.error.connect(self.__on_verify_error)
self.ui.verify_stack.setCurrentWidget(self.ui.verify_progress_page)
self.ui.verify_progress.setValue(0)
self.ui.move_button.setEnabled(False)
worker.signals.error.connect(self.__on_worker_error)
self.rcore.enqueue_worker(rgame, worker)
def verify_cleanup(self, rgame: RareGame):
if rgame is not self.rgame:
return
self.ui.verify_stack.setCurrentWidget(self.ui.verify_button_page)
self.ui.move_button.setEnabled(True)
self.ui.verify_button.setEnabled(True)
@pyqtSlot(RareGame, str)
def __on_verify_error(self, rgame: RareGame, message):
self.verify_cleanup(rgame)
QMessageBox.warning(
self,
self.tr("Error - {}").format(rgame.title),
message
)
@pyqtSlot(RareGame, int, int, float, float)
def __on_verify_progress(self, rgame: RareGame, num, total, percentage, speed):
# lk: the check is NOT REQUIRED because signals are disconnected but protect against it anyway
if rgame is not self.rgame:
return
self.ui.verify_progress.setValue(num * 100 // total)
@pyqtSlot(RareGame, bool, int, int)
def __on_verify_result(self, rgame: RareGame, success, failed, missing):
self.verify_cleanup(rgame)
self.ui.repair_button.setDisabled(success)
if success:
QMessageBox.information(
@ -202,201 +176,173 @@ class GameInfo(QWidget):
self.repair_game(rgame)
@pyqtSlot(str)
def move_game(self, dest_path):
dest_path = Path(dest_path)
install_path = Path(self.rgame.igame.install_path)
self.dest_path_with_suffix = dest_path.joinpath(install_path.stem)
def __on_move(self, dst_path: str):
""" This method is to be called from the button only """
new_install_path = os.path.join(dst_path, os.path.basename(self.rgame.install_path))
if self.dest_path_with_suffix.is_dir():
self.existing_game_dir = is_game_dir(install_path, self.dest_path_with_suffix)
dir_exists = False
if os.path.isdir(new_install_path):
dir_exists = is_game_dir(self.rgame.install_path, new_install_path)
if not self.existing_game_dir:
for i in dest_path.iterdir():
if install_path.stem in i.stem:
warn_msg = QMessageBox()
warn_msg.setText(self.tr("Destination file/directory exists."))
warn_msg.setInformativeText(
self.tr("Do you really want to overwrite it? This will delete {}").format(
self.dest_path_with_suffix
)
if not dir_exists:
for item in os.listdir(dst_path):
if os.path.basename(self.rgame.install_path) in os.path.basename(item):
ans = QMessageBox.question(
self,
self.tr("Move game? - {}").format(self.rgame.title),
self.tr(
"Destination <b>{}</b> already exists. "
"Are you sure you want to overwrite it?"
).format(new_install_path),
QMessageBox.Yes | QMessageBox.No,
QMessageBox.Yes,
)
warn_msg.addButton(QPushButton(self.tr("Yes")), QMessageBox.YesRole)
warn_msg.addButton(QPushButton(self.tr("No")), QMessageBox.NoRole)
response = warn_msg.exec()
if response == 0:
# Not using pathlib, since we can't delete not-empty folders. With shutil we can.
if self.dest_path_with_suffix.is_dir():
shutil.rmtree(self.dest_path_with_suffix)
if ans == QMessageBox.Yes:
if os.path.isdir(new_install_path):
shutil.rmtree(new_install_path)
else:
self.dest_path_with_suffix.unlink()
os.remove(new_install_path)
else:
return
self.ui.move_stack.setCurrentWidget(self.ui.move_progress)
self.move_game(self.rgame, new_install_path, dir_exists)
self.game_moving = self.rgame.app_name
self.is_moving = True
self.ui.verify_button.setEnabled(False)
if self.move_game_pop_up.is_different_drive(str(dest_path), str(install_path)):
# Destination dir on different drive
self.start_copy_diff_drive()
else:
# Destination dir on same drive
shutil.move(self.rgame.igame.install_path, dest_path)
self.set_new_game(self.dest_path_with_suffix)
def __on_move_progress(self, progress_int):
self.ui.move_progress.setValue(progress_int)
def start_copy_diff_drive(self):
def move_game(self, rgame: RareGame, dst_path, dst_exists):
worker = MoveWorker(
self.core,
install_path=self.rgame.igame.install_path,
dest_path=self.dest_path_with_suffix,
is_existing_dir=self.existing_game_dir,
igame=self.rgame.igame,
self.core, rgame=rgame, dst_path=dst_path, dst_exists=dst_exists
)
worker.signals.progress.connect(self.__on_move_progress)
worker.signals.result.connect(self.set_new_game)
worker.signals.error.connect(self.warn_no_space_left)
worker.signals.result.connect(self.__on_move_result)
worker.signals.error.connect(self.__on_worker_error)
self.rcore.enqueue_worker(self.rgame, worker)
def move_helper_clean_up(self):
self.ui.move_stack.setCurrentWidget(self.ui.move_button_page)
self.move_game_pop_up.refresh_indicator()
self.is_moving = False
self.game_moving = None
self.ui.verify_button.setEnabled(True)
self.ui.move_button.setEnabled(True)
@pyqtSlot(RareGame, int, c_uint64, c_uint64)
def __on_move_progress(self, rgame: RareGame, progress: int, total_size: c_uint64, copied_size: c_uint64):
# lk: the check is NOT REQUIRED because signals are disconnected but protect against it anyway
if rgame is not self.rgame:
return
self.ui.move_progress.setValue(progress)
# This func does the needed UI changes, e.g. changing back to the initial move tool button and other stuff
def warn_no_space_left(self):
err_msg = QMessageBox()
err_msg.setText(self.tr("Out of space or unknown OS error occured."))
err_msg.exec()
self.move_helper_clean_up()
# Sets all needed variables to the new path.
def set_new_game(self, dest_path_with_suffix):
self.ui.install_path.setText(str(dest_path_with_suffix))
self.rgame.igame.install_path = str(dest_path_with_suffix)
self.core.lgd.set_installed_game(self.rgame.app_name, self.rgame.igame)
self.move_game_pop_up.install_path = self.rgame.igame.install_path
self.move_helper_clean_up()
# We need to re-show the menu, as after clicking on browse, the whole menu gets closed.
# Otherwise, the user would need to click on the move button again to open it again.
def show_menu_after_browse(self):
self.ui.move_button.showMenu()
@pyqtSlot(RareGame, str)
def __on_move_result(self, rgame: RareGame, dst_path: str):
QMessageBox.information(
self,
self.tr("Summary - {}").format(rgame.title),
self.tr("<b>{}</b> successfully moved to <b>{}<b>.").format(rgame.title, dst_path),
)
@pyqtSlot()
def __update_ui(self):
pass
def __update_widget(self):
""" React to state updates from RareGame """
# self.image.setPixmap(self.image_manager.get_pixmap(self.rgame.app_name, True))
self.image.setPixmap(self.rgame.pixmap)
self.ui.lbl_version.setDisabled(self.rgame.is_non_asset)
self.ui.version.setDisabled(self.rgame.is_non_asset)
self.ui.version.setText(
self.rgame.version if not self.rgame.is_non_asset else "N/A"
)
self.ui.lbl_install_size.setEnabled(self.rgame.is_installed and not self.rgame.is_non_asset)
self.ui.install_size.setEnabled(self.rgame.is_installed and not self.rgame.is_non_asset)
self.ui.install_size.setText(
get_size(self.rgame.igame.install_size) if self.rgame.is_installed and not self.rgame.is_non_asset else "N/A"
)
self.ui.lbl_install_path.setEnabled(self.rgame.is_installed and not self.rgame.is_non_asset)
self.ui.install_path.setEnabled(self.rgame.is_installed and not self.rgame.is_non_asset)
self.ui.install_path.setText(
self.rgame.igame.install_path if self.rgame.is_installed and not self.rgame.is_non_asset else "N/A"
)
self.ui.platform.setText(
self.rgame.igame.platform if self.rgame.is_installed and not self.rgame.is_non_asset else "Windows"
)
self.ui.lbl_grade.setDisabled(
self.rgame.is_unreal and platform.system() == "Windows"
)
self.ui.grade.setDisabled(
self.rgame.is_unreal and platform.system() == "Windows"
)
self.ui.grade.setText(self.steam_grade_ratings[self.rgame.steam_grade()])
self.ui.install_button.setEnabled(
(not self.rgame.is_installed or self.rgame.is_non_asset) and self.rgame.is_idle
)
self.ui.import_button.setEnabled(False)
self.ui.verify_button.setEnabled(
self.rgame.is_installed and (not self.rgame.is_non_asset) and self.rgame.is_idle
)
self.ui.verify_progress.setValue(self.rgame.progress if self.rgame.State == RareGame.State.VERIFYING else 0)
if self.rgame.state == RareGame.State.VERIFYING:
self.ui.verify_stack.setCurrentWidget(self.ui.verify_progress_page)
else:
self.ui.verify_stack.setCurrentWidget(self.ui.verify_button_page)
self.ui.repair_button.setEnabled(
self.rgame.is_installed and (not self.rgame.is_non_asset) and self.rgame.is_idle
and os.path.exists(os.path.join(self.core.lgd.get_tmp_path(), f"{self.rgame.app_name}.repair"))
and not self.args.offline
)
self.ui.move_button.setEnabled(
self.rgame.is_installed and (not self.rgame.is_non_asset) and self.rgame.is_idle
)
self.ui.move_progress.setValue(self.rgame.progress if self.rgame.state == RareGame.State.MOVING else 0)
if self.rgame.state == RareGame.State.MOVING:
self.ui.move_stack.setCurrentWidget(self.ui.move_progress_page)
else:
self.ui.move_stack.setCurrentWidget(self.ui.move_button_page)
self.ui.uninstall_button.setEnabled(
self.rgame.is_installed and (not self.rgame.is_non_asset) and self.rgame.is_idle
)
if self.rgame.is_installed and not self.rgame.is_non_asset:
self.ui.game_actions_stack.setCurrentWidget(self.ui.installed_page)
else:
self.ui.game_actions_stack.setCurrentWidget(self.ui.uninstalled_page)
@pyqtSlot(str)
@pyqtSlot(RareGame)
def update_game(self, rgame: Union[RareGame, str]):
if isinstance(rgame, str):
rgame = self.rcore.get_game(rgame)
def update_game(self, rgame: RareGame):
if self.rgame is not None:
if (worker := self.rgame.worker()) is not None:
if isinstance(worker, VerifyWorker):
try:
worker.signals.progress.disconnect(self.__on_verify_progress)
self.ui.verify_stack.setCurrentWidget(self.ui.verify_button_page)
except TypeError as e:
logger.warning(f"{self.rgame.app_name} verify worker: {e}")
if isinstance(worker, MoveWorker):
try:
worker.signals.progress.disconnect(self.__on_move_progress)
self.ui.move_stack.setCurrentWidget(self.ui.move_button_page)
except TypeError as e:
logger.warning(f"{self.rgame.app_name} move worker: {e}")
self.rgame.signals.widget.update.disconnect(self.__update_ui)
self.rgame.signals.game.installed.disconnect(self.update_game)
self.rgame.signals.game.uninstalled.disconnect(self.update_game)
self.rgame.signals.widget.update.disconnect(self.__update_widget)
self.rgame = None
rgame.signals.widget.update.connect(self.__update_ui)
rgame.signals.game.installed.connect(self.update_game)
rgame.signals.game.uninstalled.connect(self.update_game)
rgame.signals.widget.update.connect(self.__update_widget)
if (worker := rgame.worker()) is not None:
if isinstance(worker, VerifyWorker):
self.ui.verify_stack.setCurrentWidget(self.ui.verify_progress_page)
self.ui.verify_progress.setValue(rgame.progress)
worker.signals.progress.connect(self.__on_verify_progress)
else:
self.ui.verify_stack.setCurrentWidget(self.ui.verify_button_page)
if isinstance(worker, MoveWorker):
self.ui.move_stack.setCurrentWidget(self.ui.move_progress_page)
self.ui.move_progress.setValue(rgame.progress)
worker.signals.progress.connect(self.__on_move_progress)
else:
self.ui.move_stack.setCurrentWidget(self.ui.move_button_page)
self.title.setTitle(rgame.app_title)
self.image.setPixmap(rgame.pixmap)
self.ui.app_name.setText(rgame.app_name)
self.ui.version.setText(rgame.version)
self.ui.dev.setText(rgame.developer)
if rgame.igame:
self.ui.install_size.setText(get_size(rgame.igame.install_size))
self.ui.install_path.setText(rgame.igame.install_path)
self.ui.platform.setText(rgame.igame.platform)
else:
self.ui.install_size.setText("N/A")
self.ui.install_path.setText("N/A")
self.ui.platform.setText("Windows")
self.ui.install_size.setEnabled(bool(rgame.igame))
self.ui.lbl_install_size.setEnabled(bool(rgame.igame))
self.ui.install_path.setEnabled(bool(rgame.igame))
self.ui.lbl_install_path.setEnabled(bool(rgame.igame))
self.ui.uninstall_button.setEnabled(bool(rgame.igame))
self.ui.verify_button.setEnabled(bool(rgame.igame))
self.ui.repair_button.setEnabled(bool(rgame.igame))
if not rgame.is_installed or rgame.is_origin:
if rgame.is_non_asset:
self.ui.install_button.setText(self.tr("Link to Origin/Launch"))
self.ui.game_actions_stack.setCurrentWidget(self.ui.uninstalled_page)
if rgame.is_origin:
self.ui.version.setText("N/A")
self.ui.version.setEnabled(False)
self.ui.install_button.setText(self.tr("Link to Origin/Launch"))
else:
self.ui.install_button.setText(self.tr("Install Game"))
else:
if not self.args.offline:
self.ui.repair_button.setDisabled(
not os.path.exists(os.path.join(self.core.lgd.get_tmp_path(), f"{rgame.app_name}.repair"))
)
self.ui.game_actions_stack.setCurrentWidget(self.ui.installed_page)
self.ui.install_button.setText(self.tr("Install Game"))
grade_visible = not rgame.is_unreal and platform.system() != "Windows"
self.ui.grade.setVisible(grade_visible)
self.ui.lbl_grade.setVisible(grade_visible)
if platform.system() != "Windows" and not rgame.is_unreal:
self.ui.grade.setText(self.tr("Loading"))
# TODO: Handle result emitted after quickly changing between game information
steam_worker: SteamWorker = SteamWorker(self.core, rgame.app_name)
steam_worker.signals.rating.connect(self.ui.grade.setText)
QThreadPool.globalInstance().start(steam_worker)
self.ui.verify_button.setEnabled(rgame.is_idle)
self.ui.move_button.setEnabled(rgame.is_idle)
self.move_game_pop_up.update_game(rgame.app_name)
self.move_game_pop_up.update_game(rgame)
self.rgame = rgame
self.__update_widget()

View file

@ -1,12 +1,12 @@
import os
import shutil
from logging import getLogger
from pathlib import Path
from typing import Tuple
from typing import Tuple, Optional
from PyQt5.QtCore import pyqtSignal, Qt
from PyQt5.QtCore import pyqtSignal, Qt, pyqtSlot
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLabel, QFileDialog
from rare.models.game import RareGame
from rare.shared import LegendaryCoreSingleton
from rare.utils.extra_widgets import PathEdit
@ -20,132 +20,135 @@ class MoveGamePopUp(QWidget):
def __init__(self, parent=None):
super(MoveGamePopUp, self).__init__(parent=parent)
self.core = LegendaryCoreSingleton()
self.install_path = ""
self.move_path_edit = PathEdit("", QFileDialog.Directory, edit_func=self.edit_func_move_game)
self.move_path_edit.path_select.clicked.connect(self.emit_browse_done_signal)
self.rgame: Optional[RareGame] = None
self.move_button = QPushButton(self.tr("Move"))
self.move_button.setFixedSize(self.move_path_edit.path_select.sizeHint())
self.move_button.clicked.connect(self.emit_move_game_signal)
self.path_edit = PathEdit("", QFileDialog.Directory, edit_func=self.path_edit_cb)
self.path_edit.path_select.clicked.connect(self.browse_done)
self.warn_overwriting = QLabel()
self.button = QPushButton(self.tr("Move"))
self.button.setMinimumWidth(self.path_edit.path_select.sizeHint().width())
self.button.clicked.connect(lambda p: self.move_clicked.emit(self.path_edit.text()))
self.warn_label = QLabel(self)
middle_layout = QHBoxLayout()
middle_layout.setAlignment(Qt.AlignRight)
middle_layout.addWidget(self.warn_overwriting, stretch=1)
middle_layout.addWidget(self.move_button)
middle_layout.addWidget(self.warn_label, stretch=1)
middle_layout.addWidget(self.button)
bottom_layout = QVBoxLayout()
self.aval_space_label = QLabel()
self.req_space_label = QLabel()
self.aval_space_label = QLabel(self)
self.req_space_label = QLabel(self)
bottom_layout.addWidget(self.aval_space_label)
bottom_layout.addWidget(self.req_space_label)
layout: QVBoxLayout = QVBoxLayout()
layout.addWidget(self.move_path_edit)
layout: QVBoxLayout = QVBoxLayout(self)
layout.addWidget(self.path_edit)
layout.addLayout(middle_layout)
layout.addLayout(bottom_layout)
self.setLayout(layout)
def emit_move_game_signal(self):
self.move_clicked.emit(self.move_path_edit.text())
def emit_browse_done_signal(self):
self.browse_done.emit()
def refresh_indicator(self):
# needed so the edit_func gets run again
text = self.move_path_edit.text()
self.move_path_edit.setText(str())
self.move_path_edit.setText(text)
text = self.path_edit.text()
self.path_edit.setText(str())
self.path_edit.setText(text)
@staticmethod
def is_different_drive(dir1: str, dir2: str):
return os.stat(dir1).st_dev != os.stat(dir2).st_dev
def edit_func_move_game(self, dir_selected):
self.move_button.setEnabled(True)
self.warn_overwriting.setHidden(True)
def path_edit_cb(self, path: str):
self.button.setEnabled(True)
self.warn_label.setHidden(True)
def helper_func(reason: str) -> Tuple[bool, str, str]:
self.move_button.setEnabled(False)
return False, dir_selected, self.tr(reason)
self.button.setEnabled(False)
return False, path, self.tr(reason)
if not self.install_path or not dir_selected:
if not self.rgame.igame.install_path or not path:
return helper_func("You need to provide a directory.")
install_path = Path(self.install_path).resolve()
dest_path = Path(dir_selected).resolve()
dest_path_with_suffix = dest_path.joinpath(install_path.stem).resolve()
src_path = os.path.realpath(self.rgame.igame.install_path)
dst_path = os.path.realpath(path)
dst_install_path = os.path.realpath(os.path.join(dst_path, os.path.basename(src_path)))
if not dest_path.is_dir():
if not os.path.isdir(dst_path):
return helper_func("Directory doesn't exist or file selected.")
# Get free space on drive and size of game folder
_, _, free_space = shutil.disk_usage(dest_path)
source_size = sum(f.stat().st_size for f in install_path.glob("**/*") if f.is_file())
_, _, free_space = shutil.disk_usage(dst_path)
source_size = sum(
os.stat(os.path.join(dp, f)).st_size
for dp, dn, filenames in os.walk(src_path)
for f in filenames
)
# Calculate from bytes to gigabytes
free_space_dest_drive = round(free_space / 1000 ** 3, 2)
free_space = round(free_space / 1000 ** 3, 2)
source_size = round(source_size / 1000 ** 3, 2)
self.aval_space_label.setText(self.tr("Available space: {}GB".format(free_space_dest_drive)))
self.aval_space_label.setText(self.tr("Available space: {}GB".format(free_space)))
self.req_space_label.setText(self.tr("Required space: {}GB").format(source_size))
if not os.access(dir_selected, os.W_OK) or not os.access(self.install_path, os.W_OK):
if not os.access(path, os.W_OK) or not os.access(self.rgame.igame.install_path, os.W_OK):
return helper_func("No write permission on destination path/current install path.")
if install_path == dest_path or install_path == dest_path_with_suffix:
if src_path == dst_path or src_path == dst_install_path:
return helper_func("Same directory or parent directory selected.")
if str(install_path) in str(dest_path):
if str(src_path) in str(dst_path):
return helper_func("You can't select a directory that is inside the current install path.")
if str(dest_path_with_suffix) in str(install_path):
if str(dst_install_path) in str(src_path):
return helper_func("You can't select a directory which contains the game installation.")
for game in self.core.get_installed_list():
if game.install_path in dir_selected:
if game.install_path in path:
return helper_func("Game installations cannot be nested due to unintended sideeffects.")
is_existing_dir = is_game_dir(install_path, dest_path_with_suffix)
is_existing_dir = is_game_dir(src_path, dst_install_path)
for i in dest_path.iterdir():
if install_path.stem in i.stem:
if dest_path_with_suffix.is_dir():
for item in os.listdir(dst_path):
if os.path.basename(src_path) in os.path.basename(item):
if os.path.isdir(dst_install_path):
if not is_existing_dir:
self.warn_overwriting.setHidden(False)
elif dest_path_with_suffix.is_file():
self.warn_overwriting.setHidden(False)
self.warn_label.setHidden(False)
elif os.path.isfile(dst_install_path):
self.warn_label.setHidden(False)
if free_space_dest_drive <= source_size and not is_existing_dir:
if free_space <= source_size and not is_existing_dir:
return helper_func("Not enough space available on drive.")
# Fallback
self.move_button.setEnabled(True)
return True, dir_selected, str()
self.button.setEnabled(True)
return True, path, str()
def update_game(self, app_name):
igame = self.core.get_installed_game(app_name, skip_sync=True)
if igame is None:
@pyqtSlot()
def __update_widget(self):
""" React to state updates from RareGame """
if not self.rgame.is_installed and not self.rgame.is_non_asset:
self.setDisabled(True)
return
self.install_path = igame.install_path
# FIXME: Make edit_func lighter instead of blocking signals
self.move_path_edit.line_edit.blockSignals(True)
self.move_path_edit.setText(igame.install_path)
self.path_edit.line_edit.blockSignals(True)
self.path_edit.setText(self.rgame.igame.install_path)
# FIXME: Make edit_func lighter instead of blocking signals
self.move_path_edit.line_edit.blockSignals(False)
self.warn_overwriting.setText(
self.tr("Moving here will overwrite the dir/file {}/").format(Path(self.install_path).stem)
self.path_edit.line_edit.blockSignals(False)
self.warn_label.setText(
self.tr("<b>Moving<b> will overwrite <b>{}<b>").format(os.path.basename(self.rgame.install_path))
)
self.refresh_indicator()
def update_game(self, rgame: RareGame):
if self.rgame is not None:
self.rgame.signals.widget.update.disconnect(self.__update_widget)
self.rgame = None
rgame.signals.widget.update.connect(self.__update_widget)
self.rgame = rgame
self.__update_widget()
def is_game_dir(install_path: Path, dest_path: Path):
def is_game_dir(src_path: str, dst_path: str):
# This iterates over the destination dir, then iterates over the current install dir and if the file names
# matches, we have an exisiting dir
if dest_path.is_dir():
for file in dest_path.iterdir():
for install_file in install_path.iterdir():
if file.name == install_file.name:
if os.path.isdir(dst_path):
for dst_file in os.listdir(dst_path):
for src_file in os.listdir(src_path):
if dst_file == src_file:
return True
return False

View file

@ -280,6 +280,9 @@ class RareGame(RareGameSlim):
with open(os.path.join(data_dir(), "game_meta.json"), "w") as metadata_json:
json.dump(metadata, metadata_json, indent=2)
def store_igame(self):
self.core.lgd.set_installed_game(self.app_name, self.igame)
def update_game(self):
self.game = self.core.get_game(
self.app_name, update_meta=False, platform=self.igame.platform if self.igame else "Windows"
@ -428,6 +431,7 @@ class RareGame(RareGameSlim):
ret = False
return ret
@property
def needs_verification(self) -> bool:
"""!
@ -444,7 +448,7 @@ class RareGame(RareGameSlim):
return False
@needs_verification.setter
def needs_verification(self, not_update: bool) -> None:
def needs_verification(self, needs: bool) -> None:
"""!
@brief Sets the verification status of a game.
@ -452,11 +456,12 @@ class RareGame(RareGameSlim):
named like this. After the verification, set this to 'False'
to update the InstalledGame in the widget.
@param not_update If the game requires verification
@param needs If the game requires verification
@return None
"""
if not not_update:
self.igame = self.core.get_installed_game(self.game.app_name)
self.igame.needs_verification = needs
self.store_igame()
self.update_igame()
@property
def is_dlc(self) -> bool:
@ -515,6 +520,16 @@ class RareGame(RareGameSlim):
return self.game.metadata.get("customAttributes", {}).get("CloudSaveFolder", {}).get("value")
return ""
@property
def install_path(self) -> str:
return self.igame.install_path
@install_path.setter
def install_path(self, path: str) -> None:
self.igame.install_path = path
self.store_igame()
self.update_igame()
def steam_grade(self) -> str:
if platform.system() == "Windows" or self.is_unreal:
return "na"

View file

@ -1,121 +1,137 @@
import os
import shutil
from ctypes import c_uint64
from logging import getLogger
from pathlib import Path
from PyQt5.QtCore import pyqtSignal, QObject
from legendary.lfs.utils import validate_files
from legendary.models.game import VerifyResult, InstalledGame
from legendary.models.game import VerifyResult
from rare.models.game import RareGame
from rare.lgndr.core import LegendaryCore
from .worker import QueueWorker, QueueWorkerInfo
logger = getLogger("MoveWorker")
# noinspection PyUnresolvedReferences
class MoveWorker(QueueWorker):
class Signals(QObject):
progress = pyqtSignal(int)
result = pyqtSignal(str)
error = pyqtSignal()
# int: percentage, c_uint64: source size, c_uint64: dest size
progress = pyqtSignal(RareGame, int, c_uint64, c_uint64)
# str: destination path
result = pyqtSignal(RareGame, str)
# str: error message
error = pyqtSignal(RareGame, str)
def __init__(
self,
core: LegendaryCore,
install_path: str,
dest_path: Path,
is_existing_dir: bool,
igame: InstalledGame,
):
def __init__(self, core: LegendaryCore, rgame: RareGame, dst_path: str, dst_exists: bool):
super(MoveWorker, self).__init__()
self.signals = MoveWorker.Signals()
self.core = core
self.install_path = install_path
self.dest_path = dest_path
self.source_size = 0
self.dest_size = 0
self.is_existing_dir = is_existing_dir
self.igame = igame
self.file_list = None
self.total: int = 0
self.rgame = rgame
self.dst_path = dst_path
self.dst_exists = dst_exists
def worker_info(self) -> QueueWorkerInfo:
return QueueWorkerInfo(
app_name=self.rgame.app_name, app_title=self.rgame.app_title, worker_type="Move", state=self.state
)
def run_real(self):
root_directory = Path(self.install_path)
self.source_size = sum(f.stat().st_size for f in root_directory.glob("**/*") if f.is_file())
def progress(self, src_size, dst_size):
progress = dst_size * 100 // src_size
self.rgame.signals.progress.update.emit(progress)
self.signals.progress.emit(self.rgame, progress, c_uint64(src_size), c_uint64(dst_size))
def run_real(self):
self.rgame.signals.progress.start.emit()
if os.stat(self.rgame.install_path).st_dev == os.stat(os.path.dirname(self.dst_path)).st_dev:
shutil.move(self.rgame.install_path, self.dst_path)
elif not self.dst_exists:
src_size = sum(
os.stat(os.path.join(dp, f)).st_size
for dp, dn, filenames in os.walk(self.rgame.install_path)
for f in filenames
)
dst_size = 0
def copy_with_progress(src, dst):
nonlocal dst_size
shutil.copy2(src, dst)
dst_size += os.stat(src).st_size
self.progress(src_size, dst_size)
# if game dir is not existing, just copying:
if not self.is_existing_dir:
shutil.copytree(
self.install_path,
self.dest_path,
copy_function=self.copy_each_file_with_progress,
self.rgame.install_path,
self.dst_path,
copy_function=copy_with_progress,
dirs_exist_ok=True,
)
shutil.rmtree(self.rgame.install_path)
else:
manifest_data, _ = self.core.get_installed_manifest(self.igame.app_name)
manifest_data, _ = self.core.get_installed_manifest(self.rgame.app_name)
manifest = self.core.load_manifest(manifest_data)
files = sorted(
manifest.file_manifest_list.elements,
key=lambda a: a.filename.lower(),
)
self.file_list = [(f.filename, f.sha_hash.hex()) for f in files]
self.total = len(self.file_list)
if config_tags := self.core.lgd.config.get(self.rgame.app_name, 'install_tags', fallback=None):
install_tags = set(i.strip() for i in config_tags.split(','))
file_list = [
(f.filename, f.sha_hash.hex())
for f in files
if any(it in install_tags for it in f.install_tags) or not f.install_tags
]
else:
file_list = [(f.filename, f.sha_hash.hex()) for f in files]
total_size = sum(manifest.file_manifest_list.get_file_by_path(fm[0]).file_size
for fm in file_list)
dst_size = 0
# This method is a copy_func, and only copies the src if it's a dir.
# Thus, it can be used to re-create the dir structure.
def copy_dir_structure(src, dst):
if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))
if os.path.isdir(src):
shutil.copyfile(src, dst)
shutil.copystat(src, dst)
return dst
# recreate dir structure
shutil.copytree(
self.install_path,
self.dest_path,
copy_function=self.copy_dir_structure,
self.rgame.install_path,
self.dst_path,
copy_function=copy_dir_structure,
dirs_exist_ok=True,
)
for i, (result, relative_path, _, _) in enumerate(
validate_files(str(self.dest_path), self.file_list)
):
dst_path = f"{self.dest_path}/{relative_path}"
src_path = f"{self.install_path}/{relative_path}"
if Path(src_path).is_file():
if result == VerifyResult.HASH_MISMATCH:
for result, path, _, _ in validate_files(self.dst_path, file_list):
dst_path = os.path.join(self.dst_path, path)
src_path = os.path.join(self.rgame.install_path, path)
if os.path.isfile(src_path):
if result == VerifyResult.HASH_MATCH:
dst_size += os.stat(dst_path).st_size
if result == VerifyResult.HASH_MISMATCH or result == VerifyResult.FILE_MISSING:
try:
shutil.copy(src_path, dst_path)
except IOError:
self.signals.no_space_left.emit()
shutil.copy2(src_path, dst_path)
dst_size += os.stat(dst_path).st_size
except (IOError, OSError) as e:
self.rgame.signals.progress.finish.emit(True)
self.signals.error.emit(self.rgame, str(e))
return
elif result == VerifyResult.FILE_MISSING:
try:
shutil.copy(src_path, dst_path)
except (IOError, OSError):
self.signals.no_space_left.emit()
return
elif result == VerifyResult.OTHER_ERROR:
else:
logger.warning(f"Copying file {src_path} to {dst_path} failed")
self.signals.progress.emit(int(i * 10 / self.total * 10))
self.progress(total_size, dst_size)
else:
logger.warning(
f"Source dir does not have file {src_path}. File will be missing in the destination "
f"dir. "
f"Source dir does not have file {src_path}. File will be missing in the destination dir."
)
self.rgame.needs_verification = True
shutil.rmtree(self.rgame.install_path)
shutil.rmtree(self.install_path)
self.signals.finished.emit(str(self.dest_path))
self.rgame.install_path = self.dst_path
def copy_each_file_with_progress(self, src, dst):
shutil.copy(src, dst)
self.dest_size += Path(src).stat().st_size
self.signals.progress.emit(int(self.dest_size * 10 / self.source_size * 10))
# This method is a copy_func, and only copies the src if it's a dir.
# Thus, it can be used to re-create the dir strucute.
@staticmethod
def copy_dir_structure(src, dst):
if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))
if os.path.isdir(src):
shutil.copyfile(src, dst)
shutil.copystat(src, dst)
return dst
self.rgame.signals.progress.finish.emit(False)
self.signals.result.emit(self.rgame, self.dst_path)

View file

@ -4,7 +4,6 @@ import os
from datetime import date
import requests
from PyQt5.QtCore import pyqtSignal, QRunnable, QObject, QCoreApplication
from rare.lgndr.core import LegendaryCore
from rare.utils.paths import data_dir, cache_dir
@ -13,33 +12,6 @@ replace_chars = ",;.:-_ "
url = "https://api.steampowered.com/ISteamApps/GetAppList/v2/"
class SteamWorker(QRunnable):
class Signals(QObject):
rating = pyqtSignal(str)
def __init__(self, core: LegendaryCore, app_name: str):
super(SteamWorker, self).__init__()
self.signals = SteamWorker.Signals()
self.core = core
self.app_name = app_name
_tr = QCoreApplication.translate
self.ratings = {
"platinum": _tr("SteamWorker", "Platinum"),
"gold": _tr("SteamWorker", "Gold"),
"silver": _tr("SteamWorker", "Silver"),
"bronze": _tr("SteamWorker", "Bronze"),
"fail": _tr("SteamWorker", "Could not get grade"),
"borked": _tr("SteamWorker", "Borked"),
"pending": _tr("SteamWorker", "Loading..."),
}
def run(self) -> None:
self.signals.rating.emit(
self.ratings.get(get_rating(self.core, self.app_name), self.ratings["fail"])
)
self.signals.deleteLater()
__steam_ids_json = None
__grades_json = None