1
0
Fork 0
mirror of synced 2024-05-26 15:30:15 +12:00
Rare/rare/shared/image_manager.py
2024-01-18 16:43:03 +02:00

386 lines
16 KiB
Python

import hashlib
import json
import pickle
import zlib
# from concurrent import futures
from logging import getLogger
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Set
from typing import Tuple, Dict, Union, Type, List, Callable
import requests
from PyQt5.QtCore import (
Qt,
pyqtSignal,
QObject,
QSize,
QThreadPool,
QRunnable,
QRect,
QRectF,
)
from PyQt5.QtGui import (
QPixmap,
QImage,
QPainter,
QPainterPath,
QBrush,
QTransform,
QPen,
)
from PyQt5.QtWidgets import QApplication
from legendary.models.game import Game
from rare.lgndr.core import LegendaryCore
from rare.models.image import ImageSize
from rare.models.signals import GlobalSignals
from rare.utils.paths import image_dir, resources_path, desktop_icon_suffix
# from requests_futures.sessions import FuturesSession
if TYPE_CHECKING:
pass
logger = getLogger("ImageManager")
class ImageManager(QObject):
class Worker(QRunnable):
class Signals(QObject):
# object: Game
completed = pyqtSignal(object)
def __init__(self, func: Callable, updates: List, json_data: Dict, game: Game):
super(ImageManager.Worker, self).__init__()
self.signals = ImageManager.Worker.Signals()
self.setAutoDelete(True)
self.func = func
self.updates = updates
self.json_data = json_data
self.game = game
def run(self):
self.func(self.updates, self.json_data, self.game)
logger.debug(f"Emitting singal for {self.game.app_name} ({self.game.app_title})")
self.signals.completed.emit(self.game)
def __init__(self, signals: GlobalSignals, core: LegendaryCore):
# lk: the ordering in __img_types matters for the order of fallbacks
# self.__img_types: Tuple = ("DieselGameBoxTall", "Thumbnail", "DieselGameBoxLogo", "DieselGameBox", "OfferImageTall")
self.__img_types: Tuple = ("DieselGameBoxTall", "Thumbnail", "DieselGameBoxLogo", "OfferImageTall")
self.__dl_retries = 1
self.__worker_app_names: Set[str] = set()
super(QObject, self).__init__()
self.signals = signals
self.core = core
self.image_dir: Path = image_dir()
if not self.image_dir.is_dir():
self.image_dir.mkdir()
logger.info(f"Created image directory at {self.image_dir}")
self.device = ImageSize.Preset(1, QApplication.instance().devicePixelRatio())
self.threadpool = QThreadPool()
self.threadpool.setMaxThreadCount(4)
def __img_dir(self, app_name: str) -> Path:
return self.image_dir.joinpath(app_name)
def __img_json(self, app_name: str) -> Path:
return self.__img_dir(app_name).joinpath("image.json")
def __img_cache(self, app_name: str) -> Path:
return self.__img_dir(app_name).joinpath("image.cache")
def __img_color(self, app_name: str) -> Path:
return self.__img_dir(app_name).joinpath("installed.png")
def __img_gray(self, app_name: str) -> Path:
return self.__img_dir(app_name).joinpath("uninstalled.png")
def __img_desktop_icon(self, app_name: str) -> Path:
return self.__img_dir(app_name).joinpath(f"icon.{desktop_icon_suffix()}")
def __prepare_download(self, game: Game, force: bool = False) -> Tuple[List, Dict]:
if force and self.__img_dir(game.app_name).exists():
self.__img_color(game.app_name).unlink(missing_ok=True)
self.__img_gray(game.app_name).unlink(missing_ok=True)
self.__img_desktop_icon(game.app_name).unlink(missing_ok=True)
if not self.__img_dir(game.app_name).is_dir():
self.__img_dir(game.app_name).mkdir()
# Load image checksums
if not self.__img_json(game.app_name).is_file():
json_data: Dict = dict(zip(self.__img_types, [None] * len(self.__img_types)))
else:
json_data = json.load(open(self.__img_json(game.app_name), "r"))
# lk: Find updates or initialize if images are missing.
# lk: `updates` will be empty for games without images
# lk: so everything below it is skipped
updates = []
if not (
self.__img_color(game.app_name).is_file()
and self.__img_gray(game.app_name).is_file()
and self.__img_desktop_icon(game.app_name).is_file()
):
# lk: fast path for games without images, convert Rare's logo
if not game.metadata.get("keyImages", []):
cache_data: Dict = dict(zip(self.__img_types, [None] * len(self.__img_types)))
cache_data["DieselGameBoxTall"] = open(
resources_path.joinpath("images", "cover.png"), "rb"
).read()
# cache_data["DieselGameBoxLogo"] = open(
# resources_path.joinpath("images", "Rare_nonsquared.png"), "rb").read()
self.__convert(game, cache_data)
json_data["cache"] = None
json_data["scale"] = ImageSize.Image.pixel_ratio
json_data["size"] = ImageSize.Image.size.__str__()
json.dump(json_data, open(self.__img_json(game.app_name), "w"))
else:
updates = [image for image in game.metadata["keyImages"] if image["type"] in self.__img_types]
else:
for image in game.metadata.get("keyImages", []):
if image["type"] in self.__img_types:
if image["type"] not in json_data.keys() or json_data[image["type"]] != image["md5"]:
updates.append(image)
return updates, json_data
def __download(self, updates, json_data, game, use_async: bool = False) -> bool:
# Decompress existing image.cache
if not self.__img_cache(game.app_name).is_file():
cache_data = dict(zip(self.__img_types, [None] * len(self.__img_types)))
else:
cache_data = self.__decompress(game)
# lk: filter updates again against the cache now that it is available
updates = [
image
for image in updates
if cache_data.get(image["type"], None) is None or json_data[image["type"]] != image["md5"]
]
# Download
# # lk: Keep this here, so I don't have to go looking for it again,
# # lk: it might be useful in the future.
# if use_async and len(updates) > 1:
# session = FuturesSession(max_workers=len(self.__img_types))
# image_requests = []
# for image in updates:
# logger.info(f"Downloading {image['type']} for {game.app_title}")
# json_data[image["type"]] = image["md5"]
# payload = {"resize": 1, "w": ImageSize.Image.size.width(), "h": ImageSize.Image.size.height()}
# req = session.get(image["url"], params=payload)
# req.image_type = image["type"]
# image_requests.append(req)
# for req in futures.as_completed(image_requests):
# cache_data[req.image_type] = req.result().content
# else:
for image in updates:
logger.info(f"Downloading {image['type']} for {game.app_name} ({game.app_title})")
json_data[image["type"]] = image["md5"]
payload = {"resize": 1, "w": ImageSize.Image.size.width(), "h": ImageSize.Image.size.height()}
try:
# cache_data[image["type"]] = requests.get(image["url"], params=payload).content
cache_data[image["type"]] = requests.get(image["url"], params=payload, timeout=10).content
except Exception as e:
logger.error(e)
return False
self.__convert(game, cache_data)
# lk: don't keep the cache if there is no logo (kept for me)
# if cache_data["DieselGameBoxLogo"] is not None:
# self.__compress(game, cache_data)
self.__compress(game, cache_data)
# hash image cache
try:
with open(self.__img_cache(game.app_name), "rb") as archive:
archive_hash = hashlib.md5(archive.read()).hexdigest()
except FileNotFoundError:
archive_hash = None
json_data["cache"] = archive_hash
json_data["scale"] = ImageSize.Image.pixel_ratio
json_data["size"] = {"w": ImageSize.Image.size.width(), "h": ImageSize.Image.size.height()}
# write image.json
with open(self.__img_json(game.app_name), "w") as file:
json.dump(json_data, file)
return bool(updates)
__icon_overlay: Optional[QPainterPath] = None
@staticmethod
def __generate_icon_overlay(rect: QRect) -> QPainterPath:
if ImageManager.__icon_overlay is not None:
return ImageManager.__icon_overlay
rounded_path = QPainterPath()
margin = 0.1
rounded_path.addRoundedRect(
QRectF(
rect.width() * margin,
rect.height() * margin,
rect.width() - (rect.width() * margin * 2),
rect.height() - (rect.width() * margin * 2)
),
rect.height() * 0.2,
rect.height() * 0.2,
)
ImageManager.__icon_overlay = rounded_path
return ImageManager.__icon_overlay
@staticmethod
def __convert_icon(cover: QImage) -> QImage:
icon_size = QSize(128, 128)
icon = QImage(icon_size, QImage.Format_ARGB32_Premultiplied)
painter = QPainter(icon)
painter.setRenderHint(QPainter.SmoothPixmapTransform, True)
painter.setRenderHint(QPainter.Antialiasing, True)
painter.setCompositionMode(QPainter.CompositionMode_Source)
painter.fillRect(icon.rect(), Qt.transparent)
overlay = ImageManager.__generate_icon_overlay(icon.rect())
brush = QBrush(cover)
scale = max(icon.width()/cover.width(), icon.height()/cover.height())
transform = QTransform().scale(scale, scale)
brush.setTransform(transform)
painter.fillPath(overlay, brush)
pen = QPen(Qt.black, 2)
painter.setPen(pen)
painter.drawPath(overlay)
painter.end()
return icon
def __convert(self, game, images, force=False) -> None:
for image in [self.__img_color(game.app_name), self.__img_gray(game.app_name)]:
if force and image.exists():
image.unlink(missing_ok=True)
cover_data = None
for image_type in self.__img_types:
if images[image_type] is not None:
cover_data = images[image_type]
break
cover = QImage()
cover.loadFromData(cover_data)
cover.convertToFormat(QImage.Format_ARGB32_Premultiplied)
# lk: Images are not always 4/3, crop them to size
factor = min(cover.width() // 3, cover.height() // 4)
rem_w = (cover.width() - factor * 3) // 2
rem_h = (cover.height() - factor * 4) // 2
cover = cover.copy(rem_w, rem_h, factor * 3, factor * 4)
if images["DieselGameBoxLogo"] is not None:
logo = QImage()
logo.loadFromData(images["DieselGameBoxLogo"])
logo.convertToFormat(QImage.Format_ARGB32_Premultiplied)
if logo.width() > cover.width():
logo = logo.scaled(cover.width(), cover.height(), Qt.KeepAspectRatio, Qt.SmoothTransformation)
painter = QPainter(cover)
painter.drawImage((cover.width() - logo.width()) // 2, cover.height() - logo.height(), logo)
painter.end()
cover = cover.scaled(ImageSize.Image.size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
icon = self.__convert_icon(cover)
icon.save(str(self.__img_desktop_icon(game.app_name)), format=desktop_icon_suffix().upper())
# this is not required if we ever want to re-apply the alpha channel
# cover = cover.convertToFormat(QImage.Format_Indexed8)
# add the alpha channel back to the cover
cover = cover.convertToFormat(QImage.Format_ARGB32_Premultiplied)
cover.save(str(self.__img_color(game.app_name)), format="PNG")
# quick way to convert to grayscale
cover = cover.convertToFormat(QImage.Format_Grayscale8)
# add the alpha channel back to the grayscale cover
cover = cover.convertToFormat(QImage.Format_ARGB32_Premultiplied)
cover.save(str(self.__img_gray(game.app_name)), format="PNG")
def __compress(self, game: Game, data: Dict) -> None:
archive = open(self.__img_cache(game.app_name), "wb")
cdata = zlib.compress(pickle.dumps(data), level=-1)
archive.write(cdata)
archive.close()
def __decompress(self, game: Game) -> Dict:
archive = open(self.__img_cache(game.app_name), "rb")
try:
data = zlib.decompress(archive.read())
data = pickle.loads(data)
except zlib.error:
data = dict(zip(self.__img_types, [None] * len(self.__img_types)))
finally:
archive.close()
return data
def download_image(
self, game: Game, load_callback: Callable[[], None], priority: int, force: bool = False
) -> None:
if game.app_name in self.__worker_app_names:
return
self.__worker_app_names.add(game.app_name)
updates, json_data = self.__prepare_download(game, force)
if not updates:
self.__worker_app_names.remove(game.app_name)
load_callback()
else:
image_worker = ImageManager.Worker(self.__download, updates, json_data, game)
image_worker.signals.completed.connect(lambda g: self.__worker_app_names.remove(g.app_name))
image_worker.signals.completed.connect(load_callback)
self.threadpool.start(image_worker, priority)
def download_image_blocking(self, game: Game, force: bool = False) -> None:
updates, json_data = self.__prepare_download(game, force)
if not updates:
return
if updates:
self.__download(updates, json_data, game, use_async=True)
def __get_cover(
self, container: Union[Type[QPixmap], Type[QImage]], app_name: str, color: bool = True
) -> Union[QPixmap, QImage]:
ret = container()
if not app_name:
raise RuntimeError("app_name is an empty string")
if color:
if self.__img_color(app_name).is_file():
ret.load(str(self.__img_color(app_name)))
else:
if self.__img_gray(app_name).is_file():
ret.load(str(self.__img_gray(app_name)))
if not ret.isNull():
ret.setDevicePixelRatio(ImageSize.Image.pixel_ratio)
# lk: Scaling happens at painting. It might be inefficient so leave this here as an alternative
# lk: If this is uncommented, the transformation in ImageWidget should be adjusted also
ret = ret.scaled(self.device.size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
ret.setDevicePixelRatio(self.device.pixel_ratio)
return ret
def get_pixmap(self, app_name: str, color: bool = True) -> QPixmap:
"""
Use when the image is to be presented directly on the screen.
@param app_name: The RareGame object for this game
@param color: True to load the colored pixmap, False to load the grayscale
@return: QPixmap
"""
pixmap: QPixmap = self.__get_cover(QPixmap, app_name, color)
return pixmap
def get_image(self, app_name: str, color: bool = True) -> QImage:
"""
Use when the image has to be manipulated before being rendered.
@param app_name: The RareGame object for this game
@param color: True to load the colored image, False to load the grayscale
@return: QImage
"""
image: QImage = self.__get_cover(QImage, app_name, color)
return image