1
0
Fork 0
mirror of synced 2024-06-23 08:40:45 +12:00

MoveGameAcrossDrive: Implementation

This commit is contained in:
aznd 2022-04-22 00:28:33 +02:00
parent 405379ef3b
commit ca25e9ad7c

View file

@ -5,9 +5,15 @@ from pathlib import Path
from logging import getLogger
from typing import Tuple
from PyQt5.QtCore import Qt, pyqtSignal, QThreadPool, pyqtSlot
from PyQt5.QtCore import (
QObject,
QRunnable,
Qt,
pyqtSignal,
QThreadPool,
pyqtSlot,
)
from PyQt5.QtWidgets import (
QCheckBox,
QFileDialog,
QHBoxLayout,
QLabel,
@ -20,7 +26,8 @@ from PyQt5.QtWidgets import (
QWidgetAction,
)
from legendary.models.game import Game, InstalledGame
from legendary.models.game import Game, InstalledGame, VerifyResult
from rare.legendary.legendary.utils.lfs import validate_files
from rare.shared import (
LegendaryCoreSingleton,
GlobalSignalsSingleton,
@ -73,15 +80,20 @@ class GameInfo(QWidget, Ui_GameInfo):
else:
self.repair_button.clicked.connect(self.repair)
self.install_button.clicked.connect(
lambda: self.game_utils.launch_game(self.game.app_name)
)
self.install_button.clicked.connect(lambda: self.game_utils.launch_game(self.game.app_name))
self.move_game_pop_up = MoveGamePopUp()
self.move_action = QWidgetAction(self)
self.move_action.setDefaultWidget(self.move_game_pop_up)
self.move_button.setMenu(QMenu())
self.move_button.menu().addAction(self.move_action)
self.progress_of_moving = QProgressBar()
self.existing_game_dir = False
self.is_moving = False
self.game_moving = None
self.dest_path_with_suffix = None
self.widget_container = QWidget()
box_layout = QHBoxLayout()
box_layout.setContentsMargins(0, 0, 0, 0)
@ -89,6 +101,7 @@ class GameInfo(QWidget, Ui_GameInfo):
self.widget_container.setLayout(box_layout)
index = self.move_stack.addWidget(self.widget_container)
self.move_stack.setCurrentIndex(index)
self.move_game_pop_up.move_clicked.connect(self.move_button.menu().close)
self.move_game_pop_up.move_clicked.connect(self.move_game)
self.move_game_pop_up.browse_done.connect(self.show_menu_after_browse)
@ -98,9 +111,7 @@ class GameInfo(QWidget, Ui_GameInfo):
self.uninstalled.emit(self.game.app_name)
def repair(self):
repair_file = os.path.join(
self.core.lgd.get_tmp_path(), f"{self.game.app_name}.repair"
)
repair_file = os.path.join(self.core.lgd.get_tmp_path(), f"{self.game.app_name}.repair")
if not os.path.exists(repair_file):
QMessageBox.warning(
self,
@ -120,9 +131,7 @@ class GameInfo(QWidget, Ui_GameInfo):
QMessageBox.warning(
self,
"Warning",
self.tr("Installation path of {} does not exist. Cannot verify").format(
self.igame.title
),
self.tr("Installation path of {} does not exist. Cannot verify").format(self.igame.title),
)
return
self.verify_widget.setCurrentIndex(1)
@ -132,6 +141,7 @@ class GameInfo(QWidget, Ui_GameInfo):
self.verify_progress.setValue(0)
self.verify_threads[self.game.app_name] = verify_worker
self.verify_pool.start(verify_worker)
self.move_button.setEnabled(False)
def verify_statistics(self, num, total, app_name):
# checked, max, app_name
@ -165,57 +175,104 @@ class GameInfo(QWidget, Ui_GameInfo):
)
if ans == QMessageBox.Yes:
self.signals.install_game.emit(
InstallOptionsModel(
app_name=self.game.app_name, repair=True, update=True
)
InstallOptionsModel(app_name=self.game.app_name, repair=True, update=True)
)
self.verify_widget.setCurrentIndex(0)
self.verify_threads.pop(app_name)
self.move_button.setEnabled(True)
self.verify_button.setEnabled(True)
@pyqtSlot(str)
def move_game(self, destination_path):
destination_path = Path(destination_path)
def move_game(self, dest_path):
dest_path = Path(dest_path)
install_path = Path(self.igame.install_path)
destination_path_with_suffix = destination_path.joinpath(
install_path.stem
self.dest_path_with_suffix = dest_path.joinpath(install_path.stem)
if self.dest_path_with_suffix.is_dir():
self.existing_game_dir = is_game_dir(install_path, self.dest_path_with_suffix)
if not self.existing_game_dir:
for i in dest_path.iterdir():
if install_path.stem in i.stem:
warn_msg = QMessageBox()
warn_msg.setText(self.tr("Destination file/directory exists."))
warn_msg.setInformativeText(
self.tr("Do you really want to overwrite it? This will delete {}").format(
self.dest_path_with_suffix
)
)
warn_msg.addButton(QPushButton(self.tr("Yes")), QMessageBox.YesRole)
warn_msg.addButton(QPushButton(self.tr("No")), QMessageBox.NoRole)
response = warn_msg.exec()
if response == 0:
# Not using pathlib, since we can't delete not-empty folders. With shutil we can.
if self.dest_path_with_suffix.is_dir():
shutil.rmtree(self.dest_path_with_suffix)
else:
self.dest_path_with_suffix.unlink()
else:
return
self.move_stack.addWidget(self.progress_of_moving)
self.move_stack.setCurrentWidget(self.progress_of_moving)
self.game_moving = self.igame.app_name
self.is_moving = True
self.verify_button.setEnabled(False)
if self.move_game_pop_up.find_mount(dest_path) != self.move_game_pop_up.find_mount(install_path):
# Destination dir on different drive
self.start_copy_diff_drive()
else:
# Destination dir on same drive
shutil.move(self.igame.install_path, dest_path)
self.set_new_game(self.dest_path_with_suffix)
def update_progressbar(self, progress_int):
self.progress_of_moving.setValue(progress_int)
def start_copy_diff_drive(self):
copy_worker = CopyGameInstallation(
install_path=self.igame.install_path,
dest_path=self.dest_path_with_suffix,
is_existing_dir=self.existing_game_dir,
igame=self.igame,
)
progress_of_moving = QProgressBar(self)
progress_of_moving.setValue(0)
copy_worker.signals.progress.connect(self.update_progressbar)
copy_worker.signals.finished.connect(self.set_new_game)
copy_worker.signals.no_space_left.connect(self.warn_no_space_left)
QThreadPool.globalInstance().start(copy_worker)
for i in destination_path.iterdir():
if install_path.stem in i.stem:
warn_msg = QMessageBox()
warn_msg.setText(self.tr("Destination file/directory exists."))
warn_msg.setInformativeText(
self.tr(
"Do you really want to overwrite it? This will delete {}"
).format(destination_path_with_suffix)
)
warn_msg.addButton(QPushButton(self.tr("Yes")), QMessageBox.YesRole)
warn_msg.addButton(QPushButton(self.tr("No")), QMessageBox.NoRole)
def move_helper_clean_up(self):
self.move_stack.setCurrentWidget(self.move_button)
self.move_game_pop_up.refresh_indicator()
self.is_moving = False
self.game_moving = None
self.verify_button.setEnabled(True)
self.move_button.setEnabled(True)
response = warn_msg.exec()
# This func does the needed UI changes, e.g. changing back to the initial move tool button and other stuff
def warn_no_space_left(self):
err_msg = QMessageBox()
err_msg.setText(self.tr("Out of space or unknown OS error occured."))
err_msg.exec()
self.move_helper_clean_up()
if response == 0:
# Not using pathlib, since we can't delete not-empty folders. With shutil we can.
if destination_path_with_suffix.is_dir():
shutil.rmtree(destination_path_with_suffix)
else:
destination_path_with_suffix.unlink()
else:
return
shutil.move(self.igame.install_path, destination_path)
self.install_path.setText(str(destination_path_with_suffix))
self.igame.install_path = str(destination_path_with_suffix)
# Sets all needed variables to the new path.
def set_new_game(self, dest_path_with_suffix):
self.install_path.setText(str(dest_path_with_suffix))
self.igame.install_path = str(dest_path_with_suffix)
self.core.lgd.set_installed_game(self.igame.app_name, self.igame)
self.move_game_pop_up.install_path = self.igame.install_path
self.move_game_pop_up.refresh_indicator()
progress_of_moving.setValue(100)
self.move_helper_clean_up()
# We need to re-show the menu, as after clicking on browse, the whole menu gets closed.
# Otherwise, the user would need to click on the move button again to open it again.
def show_menu_after_browse(self):
self.move_button.showMenu()
@ -235,9 +292,7 @@ class GameInfo(QWidget, Ui_GameInfo):
if self.igame:
self.version.setText(self.igame.version)
else:
self.version.setText(
self.game.app_version(self.igame.platform if self.igame else "Windows")
)
self.version.setText(self.game.app_version(self.igame.platform if self.igame else "Windows"))
self.dev.setText(self.game.metadata["developer"])
if self.igame:
@ -276,9 +331,7 @@ class GameInfo(QWidget, Ui_GameInfo):
self.steam_worker.set_app_name(self.game.app_name)
QThreadPool.globalInstance().start(self.steam_worker)
if len(self.verify_threads.keys()) == 0 or not self.verify_threads.get(
self.game.app_name
):
if len(self.verify_threads.keys()) == 0 or not self.verify_threads.get(self.game.app_name):
self.verify_widget.setCurrentIndex(0)
elif self.verify_threads.get(self.game.app_name):
self.verify_widget.setCurrentIndex(1)
@ -289,6 +342,25 @@ class GameInfo(QWidget, Ui_GameInfo):
* 100
)
)
# If the game that is currently moving matches with the current app_name, we show the progressbar.
# Otherwhise, we show the move tool button.
if self.igame is not None:
if self.game_moving == self.igame.app_name:
index = self.move_stack.addWidget(self.progress_of_moving)
self.move_stack.setCurrentIndex(index)
else:
index = self.move_stack.addWidget(self.move_button)
self.move_stack.setCurrentIndex(index)
# If a game is verifying or moving, disable both verify and moving buttons.
if len(self.verify_threads):
self.verify_button.setEnabled(False)
self.move_button.setEnabled(False)
if self.is_moving:
self.move_button.setEnabled(False)
self.verify_button.setEnabled(False)
self.move_game_pop_up.update_game(app_name)
@ -301,9 +373,7 @@ class MoveGamePopUp(QWidget):
layout: QVBoxLayout = QVBoxLayout()
self.install_path = str()
self.core = LegendaryCoreSingleton()
self.move_path_edit = PathEdit(
str(), QFileDialog.Directory, edit_func=self.edit_func_move_game
)
self.move_path_edit = PathEdit(str(), QFileDialog.Directory, edit_func=self.edit_func_move_game)
self.move_path_edit.path_select.clicked.connect(self.emit_browse_done_signal)
self.move_game = QPushButton(self.tr("Move"))
@ -312,12 +382,19 @@ class MoveGamePopUp(QWidget):
self.warn_overwriting = QLabel()
bottom_layout = QHBoxLayout()
bottom_layout.setAlignment(Qt.AlignRight)
bottom_layout.addWidget(self.warn_overwriting, stretch=1)
bottom_layout.addWidget(self.move_game)
middle_layout = QHBoxLayout()
middle_layout.setAlignment(Qt.AlignRight)
middle_layout.addWidget(self.warn_overwriting, stretch=1)
middle_layout.addWidget(self.move_game)
bottom_layout = QVBoxLayout()
self.aval_space_label = QLabel()
self.req_space_label = QLabel()
bottom_layout.addWidget(self.aval_space_label)
bottom_layout.addWidget(self.req_space_label)
layout.addWidget(self.move_path_edit)
layout.addLayout(middle_layout)
layout.addLayout(bottom_layout)
self.setLayout(layout)
@ -335,7 +412,8 @@ class MoveGamePopUp(QWidget):
self.move_path_edit.setText(text)
# Thanks to lk.
def find_mount(self, path):
@staticmethod
def find_mount(path):
mount_point = path
while path != path.anchor:
if path.is_mount():
@ -345,7 +423,9 @@ class MoveGamePopUp(QWidget):
return mount_point
def edit_func_move_game(self, dir_selected):
self.move_game.setEnabled(True)
self.warn_overwriting.setHidden(True)
def helper_func(reason: str) -> Tuple[bool, str, str]:
self.move_game.setEnabled(False)
return False, dir_selected, self.tr(reason)
@ -353,54 +433,52 @@ class MoveGamePopUp(QWidget):
if not self.install_path or not dir_selected:
return helper_func("You need to provide a directory.")
current_path = Path(self.install_path).resolve()
destination_path = Path(dir_selected).resolve()
destination_path_with_suffix = destination_path.joinpath(
current_path.stem
).resolve()
install_path = Path(self.install_path).resolve()
dest_path = Path(dir_selected).resolve()
dest_path_with_suffix = dest_path.joinpath(install_path.stem).resolve()
if not destination_path.is_dir():
if not dest_path.is_dir():
return helper_func("Directory doesn't exist or file selected.")
# Get free space on drive and size of game folder
stat = os.statvfs(dest_path)
free_space_dest_drive = stat.f_bavail * stat.f_frsize
source_size = sum(f.stat().st_size for f in install_path.glob("**/*") if f.is_file())
# Calculate from bytes to gigabytes
free_space_dest_drive = round(free_space_dest_drive / 1000**3, 2)
source_size = round(source_size / 1000**3, 2)
self.aval_space_label.setText(self.tr("Available space on disk: {}GB".format(free_space_dest_drive)))
self.req_space_label.setText(self.tr("Required space: {}GB").format(source_size))
if not os.access(dir_selected, os.W_OK) or not os.access(self.install_path, os.W_OK):
return helper_func("No write permission on destination path/current install path.")
if (
current_path == destination_path
or current_path == destination_path_with_suffix
):
if install_path == dest_path or install_path == dest_path_with_suffix:
return helper_func("Same directory or parent directory selected.")
if str(current_path) in str(destination_path):
return helper_func(
"You can't select a directory that is inside the current install path."
)
if str(install_path) in str(dest_path):
return helper_func("You can't select a directory that is inside the current install path.")
if str(destination_path_with_suffix) in str(current_path):
return helper_func(
"You can't select a directory which contains the game installation."
)
if not platform.system() == "Windows":
if self.find_mount(destination_path) != self.find_mount(current_path):
return helper_func(
"Moving to a different drive is currently not supported."
)
else:
if current_path.drive != destination_path.drive:
return helper_func(
"Moving to a different drive is currently not supported."
)
if str(dest_path_with_suffix) in str(install_path):
return helper_func("You can't select a directory which contains the game installation.")
for game in self.core.get_installed_list():
if game.install_path in dir_selected:
return helper_func(
"Game installations cannot be nested due to unintended sideeffects."
)
return helper_func("Game installations cannot be nested due to unintended sideeffects.")
for i in destination_path.iterdir():
if current_path.stem in i.stem:
self.warn_overwriting.setHidden(False)
is_existing_dir = is_game_dir(install_path, dest_path_with_suffix)
for i in dest_path.iterdir():
if install_path.stem in i.stem:
if dest_path_with_suffix.is_dir():
if not is_existing_dir:
self.warn_overwriting.setHidden(False)
elif dest_path_with_suffix.is_file():
self.warn_overwriting.setHidden(False)
if free_space_dest_drive <= source_size and not is_existing_dir:
return helper_func("Not enough space available on drive.")
# Fallback
self.move_game.setEnabled(True)
@ -412,4 +490,119 @@ class MoveGamePopUp(QWidget):
return
self.install_path = igame.install_path
self.move_path_edit.setText(igame.install_path)
self.warn_overwriting.setText(self.tr("Moving here will overwrite the dir/file {}/").format(Path(self.install_path).stem))
self.warn_overwriting.setText(
self.tr("Moving here will overwrite the dir/file {}/").format(Path(self.install_path).stem)
)
class CopyGameInstallation(QRunnable):
class Signals(QObject):
progress = pyqtSignal(int)
finished = pyqtSignal(str)
no_space_left = pyqtSignal()
def __init__(
self,
install_path: Path,
dest_path: Path,
is_existing_dir: bool,
igame: InstalledGame,
):
super(CopyGameInstallation, self).__init__()
self.signals = CopyGameInstallation.Signals()
self.install_path = str(install_path)
self.dest_path = dest_path
self.source_size = 0
self.dest_size = 0
self.is_existing_dir = is_existing_dir
self.core = LegendaryCoreSingleton()
self.igame = igame
self.file_list = None
self.total: int = 0
def run(self):
root_directory = Path(self.install_path)
self.source_size = sum(f.stat().st_size for f in root_directory.glob("**/*") if f.is_file())
# if game dir is not existing, just copying:
if not self.is_existing_dir:
shutil.copytree(
self.install_path,
self.dest_path,
copy_function=self.copy_each_file_with_progress,
dirs_exist_ok=True,
)
else:
manifest_data, _ = self.core.get_installed_manifest(self.igame.app_name)
manifest = self.core.load_manifest(manifest_data)
files = sorted(
manifest.file_manifest_list.elements,
key=lambda a: a.filename.lower(),
)
self.file_list = [(f.filename, f.sha_hash.hex()) for f in files]
self.total = len(self.file_list)
# recreate dir structure
shutil.copytree(
self.install_path,
self.dest_path,
copy_function=self.copy_dir_structure,
dirs_exist_ok=True,
)
for i, (result, relative_path, _, _) in enumerate(
validate_files(str(self.dest_path), self.file_list)
):
dst_path = f"{self.dest_path}/{relative_path}"
src_path = f"{self.install_path}/{relative_path}"
if Path(src_path).is_file():
if result == VerifyResult.HASH_MISMATCH:
try:
shutil.copy(src_path, dst_path)
except IOError:
self.signals.no_space_left.emit()
return
elif result == VerifyResult.FILE_MISSING:
try:
shutil.copy(src_path, dst_path)
except (IOError, OSError):
self.signals.no_space_left.emit()
return
elif result == VerifyResult.OTHER_ERROR:
logger.warning(f"Copying file {src_path} to {dst_path} failed")
self.signals.progress.emit(int(i * 10 / self.total * 10))
else:
logger.warning(
f"Source dir does not have file {src_path}. File will be missing in the destination "
f"dir. "
)
shutil.rmtree(self.install_path)
self.signals.finished.emit(str(self.dest_path))
def copy_each_file_with_progress(self, src, dst):
shutil.copy(src, dst)
self.dest_size += Path(src).stat().st_size
self.signals.progress.emit(int(self.dest_size * 10 / self.source_size * 10))
# This method is a copy_func, and only copies the src if it's a dir.
# Thus, it can be used to re-create the dir strucute.
@staticmethod
def copy_dir_structure(src, dst):
if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))
if os.path.isdir(src):
shutil.copyfile(src, dst)
shutil.copystat(src, dst)
return dst
def is_game_dir(install_path: Path, dest_path: Path):
# This iterates over the destination dir, then iterates over the current install dir and if the file names
# matches, we have an exisiting dir
if dest_path.is_dir():
for file in dest_path.iterdir():
for install_file in install_path.iterdir():
if file.name == install_file.name:
return True
return False