1
0
Fork 0
mirror of synced 2024-06-07 21:24:41 +12:00

Move import games to separate thread

This commit is contained in:
Dummerle 2022-07-01 17:43:25 +02:00
parent 2f43431589
commit 667ca0ecb6
No known key found for this signature in database
GPG key ID: AB68CC59CA39F2F1

View file

@ -1,12 +1,13 @@
import json
import os
from dataclasses import dataclass
from logging import getLogger
from pathlib import Path
from typing import List, Tuple, Optional
from PyQt5.QtCore import Qt, QModelIndex, pyqtSignal
from PyQt5.QtCore import Qt, QModelIndex, pyqtSignal, QRunnable, QObject, QThreadPool
from PyQt5.QtGui import QStandardItemModel
from PyQt5.QtWidgets import QFileDialog, QGroupBox, QCompleter, QTreeView, QHeaderView
from PyQt5.QtWidgets import QFileDialog, QGroupBox, QCompleter, QTreeView, QHeaderView, QApplication, QMessageBox
from rare.shared import LegendaryCoreSingleton, GlobalSignalsSingleton, ApiResultsSingleton
from rare.ui.components.tabs.games.import_sync.import_group import Ui_ImportGroup
@ -16,6 +17,91 @@ from rare.utils.extra_widgets import IndicatorLineEdit, PathEdit
logger = getLogger("Import")
def find_app_name(path: str, core) -> Optional[str]:
if os.path.exists(os.path.join(path, ".egstore")):
for i in os.listdir(os.path.join(path, ".egstore")):
if i.endswith(".mancpn"):
file = os.path.join(path, ".egstore", i)
return json.load(open(file, "r")).get("AppName")
elif app_name := legendary_utils.resolve_aliases(
core, os.path.basename(os.path.normpath(path))):
# return None if game does not exist (Workaround for overlay)
if not core.get_game(app_name):
return None
return app_name
else:
logger.warning(f"Could not find AppName for {path}")
return None
@dataclass
class ResultGame:
app_name: str
error: Optional[str] = None
@dataclass
class Result:
successful_games: List[ResultGame] = None
failed_games: List[ResultGame] = None
error_messages: List[str] = None
class ImportWorker(QRunnable):
class Signals(QObject):
finished = pyqtSignal(Result)
progress = pyqtSignal(int)
def __init__(self, path: str, import_folder: bool = False, app_name: str = None):
super(ImportWorker, self).__init__()
self.signals = self.Signals()
self.core = LegendaryCoreSingleton()
self.path = Path(path)
self.import_folder = import_folder
self.app_name = app_name
self.tr = lambda message: QApplication.translate("ImportThread", message)
def run(self) -> None:
result = Result([], [], [])
if self.import_folder:
number_of_folders = len(list(self.path.iterdir()))
for i, child in enumerate(self.path.iterdir()):
if not child.is_dir():
continue
if (app_name := find_app_name(str(child), self.core)) is not None:
logger.debug(f"Found app_name {app_name} for {child}")
err = self.__import_game(app_name, child)
if err:
result.failed_games.append(ResultGame(app_name, err))
else:
result.successful_games.append(ResultGame(app_name))
else:
result.error_messages.append(self.tr("Could not find AppName for {}").format(child))
self.signals.progress.emit(int(100 * i // number_of_folders))
else:
if not self.app_name:
# try to find app name
if a_n := find_app_name(str(self.path), self.core):
self.app_name = a_n
else:
result.error_messages.append(self.tr("Could not find AppName for {}").format(str(self.path)))
return
err = self.__import_game(self.app_name, self.path)
if err:
result.failed_games.append(ResultGame(self.app_name, err))
else:
result.successful_games.append(ResultGame(self.app_name))
self.signals.finished.emit(result)
def __import_game(self, app_name: str, path: Path) -> str:
if not (err := legendary_utils.import_game(self.core, app_name=app_name, path=str(path))):
igame = self.core.get_installed_game(app_name)
logger.info(f"Successfully imported {igame.title}")
return ""
else:
return err
class AppNameCompleter(QCompleter):
activated = pyqtSignal(str)
@ -97,9 +183,7 @@ class ImportGroup(QGroupBox):
self.ui.import_button.setEnabled(False)
self.ui.import_button.clicked.connect(
lambda: self.import_games(self.path_edit.text())
if self.ui.import_folder_check.isChecked()
else self.import_game(self.path_edit.text())
lambda: self.import_pressed(self.path_edit.text())
)
self.ui.import_folder_check.stateChanged.connect(
@ -108,6 +192,7 @@ class ImportGroup(QGroupBox):
self.ui.import_folder_check.stateChanged.connect(
lambda s: self.app_name_edit.setEnabled(not s)
)
self.threadpool = QThreadPool.globalInstance()
def path_edit_cb(self, path) -> Tuple[bool, str, str]:
if os.path.exists(path):
@ -123,7 +208,7 @@ class ImportGroup(QGroupBox):
self.ui.info_label.setText(str())
self.ui.import_folder_check.setChecked(False)
if self.path_edit.is_valid:
self.app_name_edit.setText(self.find_app_name(path))
self.app_name_edit.setText(find_app_name(path, self.core))
else:
self.app_name_edit.setText(str())
@ -142,53 +227,41 @@ class ImportGroup(QGroupBox):
else:
self.ui.import_button.setEnabled(False)
def find_app_name(self, path: str) -> Optional[str]:
if os.path.exists(os.path.join(path, ".egstore")):
for i in os.listdir(os.path.join(path, ".egstore")):
if i.endswith(".mancpn"):
file = os.path.join(path, ".egstore", i)
return json.load(open(file, "r")).get("AppName")
elif app_name := legendary_utils.resolve_aliases(
self.core, os.path.basename(os.path.normpath(path))):
return app_name
else:
logger.warning(f"Could not find AppName for {path}")
return None
def import_game(self, path=None):
def import_pressed(self, path=None):
if not path:
path = self.path_edit.text()
if not (app_name := self.app_name_edit.text()):
# try to find app name
if a_n := self.find_app_name(path):
app_name = a_n
worker = ImportWorker(path, self.ui.import_folder_check.isChecked(), self.app_name_edit.text())
worker.signals.finished.connect(self.import_finished)
worker.signals.progress.connect(self.import_progress)
self.threadpool.start(worker)
def import_finished(self, result: Result):
logger.info(f"Import finished: {result.__dict__}")
if result.successful_games:
self.signals.update_gamelist.emit([i.app_name for i in result.successful_games])
if len(result.successful_games) == 1:
self.ui.info_label.setText(
self.tr(f"{self.core.get_game(result.successful_games[0].app_name).app_title} imported successfully: ")
)
else:
self.ui.info_label.setText(self.tr("Could not find AppName"))
return
self.__import_game(app_name, path)
self.ui.info_label.setText(
self.tr("Imported {} games successfully".format(len(result.successful_games)))
)
for res_game in result.failed_games:
igame = self.core.get_installed_game(res_game.app_name)
if igame.version != self.core.get_asset(igame.app_name, igame.platform, False).build_version:
# update available
self.signals.add_download.emit(igame.app_name)
self.signals.update_download_tab_text.emit()
def import_games(self, path=None):
if not path:
path = self.path_edit.text()
path = Path(path)
for child in path.iterdir():
if child.is_dir():
if (app_name := self.find_app_name(str(child))) is not None:
self.__import_game(app_name, str(child))
if result.failed_games:
self.ui.info_label.setText(
self.tr(f"Failed to import: "
f"{', '.join([self.core.get_game(i.app_name).app_title for i in result.failed_games])}")
)
if result.error_messages:
QMessageBox.warning(self, self.tr("Error"), self.tr("\n".join(result.error_messages)))
def __import_game(self, app_name, path):
if not (err := legendary_utils.import_game(self.core, app_name=app_name, path=path)):
igame = self.core.get_installed_game(app_name)
logger.info(f"Successfully imported {igame.title}")
self.ui.info_label.setText(self.tr("Successfully imported {}").format(igame.title))
self.signals.update_gamelist.emit([app_name])
if igame.version != self.core.get_asset(app_name, igame.platform, False).build_version:
# update available
self.signals.add_download.emit(igame.app_name)
self.signals.update_download_tab_text.emit()
else:
logger.warning(f'Failed to import "{app_name}"')
self.ui.info_label.setText(self.tr("Could not import {}: {}").format(app_name, err))
return
def import_progress(self, progress: int):
pass