1
0
Fork 0
mirror of synced 2024-06-26 18:20:50 +12:00
Rare/rare/shared/image_manager.py
loathingKernel c063f5f5b9
Library: Move image loading from RareCore to the GameWidget
Instead of loading images in the showEvent of the MainWindow,
load them in the showEvent of each widget. It seems to reduce
the startup stuttering this way. With some more work
we can only load the images for the widgets that are currently
visible and reduce the stutter even more.

At the same time, reduce the number of concurrent downloads
in the image manager and add a timeout so we won't halt.
The exception from the timeout is just logged at this point,
and the download is not requeued.
2023-11-29 12:50:02 +02:00

386 lines
16 KiB
Python

import hashlib
import json
import pickle
import zlib
# from concurrent import futures
from logging import getLogger
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Set
from typing import Tuple, Dict, Union, Type, List, Callable
import requests
from PyQt5.QtCore import (
Qt,
pyqtSignal,
QObject,
QSize,
QThreadPool,
QRunnable,
QRect,
QRectF,
)
from PyQt5.QtGui import (
QPixmap,
QImage,
QPainter,
QPainterPath,
QBrush,
QTransform,
QPen,
)
from PyQt5.QtWidgets import QApplication
from legendary.models.game import Game
from rare.lgndr.core import LegendaryCore
from rare.models.image import ImageSize
from rare.models.signals import GlobalSignals
from rare.utils.paths import image_dir, resources_path, desktop_icon_suffix
# from requests_futures.sessions import FuturesSession
if TYPE_CHECKING:
pass
logger = getLogger("ImageManager")
class ImageManager(QObject):
class Worker(QRunnable):
class Signals(QObject):
# object: Game
completed = pyqtSignal(object)
def __init__(self, func: Callable, updates: List, json_data: Dict, game: Game):
super(ImageManager.Worker, self).__init__()
self.signals = ImageManager.Worker.Signals()
self.setAutoDelete(True)
self.func = func
self.updates = updates
self.json_data = json_data
self.game = game
def run(self):
self.func(self.updates, self.json_data, self.game)
logger.debug(f"Emitting singal for {self.game.app_name} ({self.game.app_title})")
self.signals.completed.emit(self.game)
def __init__(self, signals: GlobalSignals, core: LegendaryCore):
# lk: the ordering in __img_types matters for the order of fallbacks
# self.__img_types: Tuple = ("DieselGameBoxTall", "Thumbnail", "DieselGameBoxLogo", "DieselGameBox", "OfferImageTall")
self.__img_types: Tuple = ("DieselGameBoxTall", "Thumbnail", "DieselGameBoxLogo", "OfferImageTall")
self.__dl_retries = 1
self.__worker_app_names: Set[str] = set()
super(QObject, self).__init__()
self.signals = signals
self.core = core
self.image_dir: Path = image_dir()
if not self.image_dir.is_dir():
self.image_dir.mkdir()
logger.info(f"Created image directory at {self.image_dir}")
self.device = ImageSize.Preset(1, QApplication.instance().devicePixelRatio())
self.threadpool = QThreadPool()
self.threadpool.setMaxThreadCount(6)
def __img_dir(self, app_name: str) -> Path:
return self.image_dir.joinpath(app_name)
def __img_json(self, app_name: str) -> Path:
return self.__img_dir(app_name).joinpath("image.json")
def __img_cache(self, app_name: str) -> Path:
return self.__img_dir(app_name).joinpath("image.cache")
def __img_color(self, app_name: str) -> Path:
return self.__img_dir(app_name).joinpath("installed.png")
def __img_gray(self, app_name: str) -> Path:
return self.__img_dir(app_name).joinpath("uninstalled.png")
def __img_desktop_icon(self, app_name: str) -> Path:
return self.__img_dir(app_name).joinpath(f"icon.{desktop_icon_suffix()}")
def __prepare_download(self, game: Game, force: bool = False) -> Tuple[List, Dict]:
if force and self.__img_dir(game.app_name).exists():
self.__img_color(game.app_name).unlink(missing_ok=True)
self.__img_gray(game.app_name).unlink(missing_ok=True)
self.__img_desktop_icon(game.app_name).unlink(missing_ok=True)
if not self.__img_dir(game.app_name).is_dir():
self.__img_dir(game.app_name).mkdir()
# Load image checksums
if not self.__img_json(game.app_name).is_file():
json_data: Dict = dict(zip(self.__img_types, [None] * len(self.__img_types)))
else:
json_data = json.load(open(self.__img_json(game.app_name), "r"))
# lk: Find updates or initialize if images are missing.
# lk: `updates` will be empty for games without images
# lk: so everything below it is skipped
updates = []
if not (
self.__img_color(game.app_name).is_file()
and self.__img_gray(game.app_name).is_file()
and self.__img_desktop_icon(game.app_name).is_file()
):
# lk: fast path for games without images, convert Rare's logo
if not game.metadata.get("keyImages", []):
cache_data: Dict = dict(zip(self.__img_types, [None] * len(self.__img_types)))
cache_data["DieselGameBoxTall"] = open(
resources_path.joinpath("images", "cover.png"), "rb"
).read()
# cache_data["DieselGameBoxLogo"] = open(
# resources_path.joinpath("images", "Rare_nonsquared.png"), "rb").read()
self.__convert(game, cache_data)
json_data["cache"] = None
json_data["scale"] = ImageSize.Image.pixel_ratio
json_data["size"] = ImageSize.Image.size.__str__()
json.dump(json_data, open(self.__img_json(game.app_name), "w"))
else:
updates = [image for image in game.metadata["keyImages"] if image["type"] in self.__img_types]
else:
for image in game.metadata.get("keyImages", []):
if image["type"] in self.__img_types:
if image["type"] not in json_data.keys() or json_data[image["type"]] != image["md5"]:
updates.append(image)
return updates, json_data
def __download(self, updates, json_data, game, use_async: bool = False) -> bool:
# Decompress existing image.cache
if not self.__img_cache(game.app_name).is_file():
cache_data = dict(zip(self.__img_types, [None] * len(self.__img_types)))
else:
cache_data = self.__decompress(game)
# lk: filter updates again against the cache now that it is available
updates = [
image
for image in updates
if cache_data.get(image["type"], None) is None or json_data[image["type"]] != image["md5"]
]
# Download
# # lk: Keep this here, so I don't have to go looking for it again,
# # lk: it might be useful in the future.
# if use_async and len(updates) > 1:
# session = FuturesSession(max_workers=len(self.__img_types))
# image_requests = []
# for image in updates:
# logger.info(f"Downloading {image['type']} for {game.app_title}")
# json_data[image["type"]] = image["md5"]
# payload = {"resize": 1, "w": ImageSize.Image.size.width(), "h": ImageSize.Image.size.height()}
# req = session.get(image["url"], params=payload)
# req.image_type = image["type"]
# image_requests.append(req)
# for req in futures.as_completed(image_requests):
# cache_data[req.image_type] = req.result().content
# else:
for image in updates:
logger.info(f"Downloading {image['type']} for {game.app_name} ({game.app_title})")
json_data[image["type"]] = image["md5"]
payload = {"resize": 1, "w": ImageSize.Image.size.width(), "h": ImageSize.Image.size.height()}
try:
# cache_data[image["type"]] = requests.get(image["url"], params=payload).content
cache_data[image["type"]] = requests.get(image["url"], params=payload, timeout=10).content
except Exception as e:
logger.error(e)
return False
self.__convert(game, cache_data)
# lk: don't keep the cache if there is no logo (kept for me)
# if cache_data["DieselGameBoxLogo"] is not None:
# self.__compress(game, cache_data)
self.__compress(game, cache_data)
# hash image cache
try:
with open(self.__img_cache(game.app_name), "rb") as archive:
archive_hash = hashlib.md5(archive.read()).hexdigest()
except FileNotFoundError:
archive_hash = None
json_data["cache"] = archive_hash
json_data["scale"] = ImageSize.Image.pixel_ratio
json_data["size"] = {"w": ImageSize.Image.size.width(), "h": ImageSize.Image.size.height()}
# write image.json
with open(self.__img_json(game.app_name), "w") as file:
json.dump(json_data, file)
return bool(updates)
__icon_overlay: Optional[QPainterPath] = None
@staticmethod
def __generate_icon_overlay(rect: QRect) -> QPainterPath:
if ImageManager.__icon_overlay is not None:
return ImageManager.__icon_overlay
rounded_path = QPainterPath()
margin = 0.1
rounded_path.addRoundedRect(
QRectF(
rect.width() * margin,
rect.height() * margin,
rect.width() - (rect.width() * margin * 2),
rect.height() - (rect.width() * margin * 2)
),
rect.height() * 0.2,
rect.height() * 0.2,
)
ImageManager.__icon_overlay = rounded_path
return ImageManager.__icon_overlay
@staticmethod
def __convert_icon(cover: QImage) -> QImage:
icon_size = QSize(128, 128)
icon = QImage(icon_size, QImage.Format_ARGB32_Premultiplied)
painter = QPainter(icon)
painter.setRenderHint(QPainter.SmoothPixmapTransform, True)
painter.setRenderHint(QPainter.Antialiasing, True)
painter.setCompositionMode(QPainter.CompositionMode_Source)
painter.fillRect(icon.rect(), Qt.transparent)
overlay = ImageManager.__generate_icon_overlay(icon.rect())
brush = QBrush(cover)
scale = max(icon.width()/cover.width(), icon.height()/cover.height())
transform = QTransform().scale(scale, scale)
brush.setTransform(transform)
painter.fillPath(overlay, brush)
pen = QPen(Qt.black, 2)
painter.setPen(pen)
painter.drawPath(overlay)
painter.end()
return icon
def __convert(self, game, images, force=False) -> None:
for image in [self.__img_color(game.app_name), self.__img_gray(game.app_name)]:
if force and image.exists():
image.unlink(missing_ok=True)
cover_data = None
for image_type in self.__img_types:
if images[image_type] is not None:
cover_data = images[image_type]
break
cover = QImage()
cover.loadFromData(cover_data)
cover.convertToFormat(QImage.Format_ARGB32_Premultiplied)
# lk: Images are not always 4/3, crop them to size
factor = min(cover.width() // 3, cover.height() // 4)
rem_w = (cover.width() - factor * 3) // 2
rem_h = (cover.height() - factor * 4) // 2
cover = cover.copy(rem_w, rem_h, factor * 3, factor * 4)
if images["DieselGameBoxLogo"] is not None:
logo = QImage()
logo.loadFromData(images["DieselGameBoxLogo"])
logo.convertToFormat(QImage.Format_ARGB32_Premultiplied)
if logo.width() > cover.width():
logo = logo.scaled(cover.width(), cover.height(), Qt.KeepAspectRatio, Qt.SmoothTransformation)
painter = QPainter(cover)
painter.drawImage((cover.width() - logo.width()) // 2, cover.height() - logo.height(), logo)
painter.end()
cover = cover.scaled(ImageSize.Image.size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
icon = self.__convert_icon(cover)
icon.save(str(self.__img_desktop_icon(game.app_name)), format=desktop_icon_suffix().upper())
# this is not required if we ever want to re-apply the alpha channel
# cover = cover.convertToFormat(QImage.Format_Indexed8)
# add the alpha channel back to the cover
cover = cover.convertToFormat(QImage.Format_ARGB32_Premultiplied)
cover.save(str(self.__img_color(game.app_name)), format="PNG")
# quick way to convert to grayscale
cover = cover.convertToFormat(QImage.Format_Grayscale8)
# add the alpha channel back to the grayscale cover
cover = cover.convertToFormat(QImage.Format_ARGB32_Premultiplied)
cover.save(str(self.__img_gray(game.app_name)), format="PNG")
def __compress(self, game: Game, data: Dict) -> None:
archive = open(self.__img_cache(game.app_name), "wb")
cdata = zlib.compress(pickle.dumps(data), level=-1)
archive.write(cdata)
archive.close()
def __decompress(self, game: Game) -> Dict:
archive = open(self.__img_cache(game.app_name), "rb")
try:
data = zlib.decompress(archive.read())
data = pickle.loads(data)
except zlib.error:
data = dict(zip(self.__img_types, [None] * len(self.__img_types)))
finally:
archive.close()
return data
def download_image(
self, game: Game, load_callback: Callable[[], None], priority: int, force: bool = False
) -> None:
if game.app_name in self.__worker_app_names:
return
self.__worker_app_names.add(game.app_name)
updates, json_data = self.__prepare_download(game, force)
if not updates:
self.__worker_app_names.remove(game.app_name)
load_callback()
else:
image_worker = ImageManager.Worker(self.__download, updates, json_data, game)
image_worker.signals.completed.connect(lambda g: self.__worker_app_names.remove(g.app_name))
image_worker.signals.completed.connect(load_callback)
self.threadpool.start(image_worker, priority)
def download_image_blocking(self, game: Game, force: bool = False) -> None:
updates, json_data = self.__prepare_download(game, force)
if not updates:
return
if updates:
self.__download(updates, json_data, game, use_async=True)
def __get_cover(
self, container: Union[Type[QPixmap], Type[QImage]], app_name: str, color: bool = True
) -> Union[QPixmap, QImage]:
ret = container()
if not app_name:
raise RuntimeError("app_name is an empty string")
if color:
if self.__img_color(app_name).is_file():
ret.load(str(self.__img_color(app_name)))
else:
if self.__img_gray(app_name).is_file():
ret.load(str(self.__img_gray(app_name)))
if not ret.isNull():
ret.setDevicePixelRatio(ImageSize.Image.pixel_ratio)
# lk: Scaling happens at painting. It might be inefficient so leave this here as an alternative
# lk: If this is uncommented, the transformation in ImageWidget should be adjusted also
ret = ret.scaled(self.device.size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
ret.setDevicePixelRatio(self.device.pixel_ratio)
return ret
def get_pixmap(self, app_name: str, color: bool = True) -> QPixmap:
"""
Use when the image is to be presented directly on the screen.
@param app_name: The RareGame object for this game
@param color: True to load the colored pixmap, False to load the grayscale
@return: QPixmap
"""
pixmap: QPixmap = self.__get_cover(QPixmap, app_name, color)
return pixmap
def get_image(self, app_name: str, color: bool = True) -> QImage:
"""
Use when the image has to be manipulated before being rendered.
@param app_name: The RareGame object for this game
@param color: True to load the colored image, False to load the grayscale
@return: QImage
"""
image: QImage = self.__get_cover(QImage, app_name, color)
return image