1
0
Fork 0
mirror of synced 2024-06-17 18:14:44 +12:00
Rare/rare/shared/image_manager.py

462 lines
20 KiB
Python
Raw Permalink Normal View History

import hashlib
import json
import pickle
import zlib
# from concurrent import futures
from logging import getLogger
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Set
from typing import Tuple, Dict, Union, Type, List, Callable
import requests
from PyQt5.QtCore import Qt, pyqtSignal, QObject, QSize, QThreadPool, QRunnable, QRect, QRectF, pyqtSlot
from PyQt5.QtGui import QPixmap, QImage, QPainter, QPainterPath, QBrush, QTransform, QPen
from PyQt5.QtWidgets import QApplication
from legendary.models.game import Game
from rare.lgndr.core import LegendaryCore
from rare.models.image import ImageSize, ImageType
from rare.models.signals import GlobalSignals
from rare.utils.paths import (
image_dir,
image_dir_game,
image_tall_path,
image_wide_path,
image_icon_path,
resources_path,
desktop_icon_suffix,
desktop_icon_path,
)
# from requests_futures.sessions import FuturesSession
if TYPE_CHECKING:
pass
logger = getLogger("ImageManager")
class ImageManager(QObject):
class Worker(QRunnable):
class Signals(QObject):
# object: Game
completed = pyqtSignal(object)
def __init__(self, func: Callable, updates: List, json_data: Dict, game: Game):
super(ImageManager.Worker, self).__init__()
self.signals = ImageManager.Worker.Signals()
self.setAutoDelete(True)
self.func = func
self.updates = updates
self.json_data = json_data
self.game = game
def run(self):
self.func(self.updates, self.json_data, self.game)
2023-06-18 12:26:02 +12:00
logger.debug(f"Emitting singal for {self.game.app_name} ({self.game.app_title})")
self.signals.completed.emit(self.game)
def __init__(self, signals: GlobalSignals, core: LegendaryCore):
# lk: the ordering in __img_types matters for the order of fallbacks
# {'AndroidIcon', 'DieselGameBox', 'DieselGameBoxLogo', 'DieselGameBoxTall', 'DieselGameBoxWide',
# 'ESRB', 'Featured', 'OfferImageTall', 'OfferImageWide', 'Screenshot', 'Thumbnail'}
self.__img_tall_types: Tuple = ("DieselGameBoxTall", "OfferImageTall", "Thumbnail")
self.__img_wide_types: Tuple = ("DieselGameBoxWide", "DieselGameBox", "OfferImageWide", "Screenshot")
self.__img_logo_types: Tuple = ("DieselGameBoxLogo",)
self.__img_types: Tuple = self.__img_tall_types + self.__img_wide_types + self.__img_logo_types
self.__dl_retries = 1
self.__worker_app_names: Set[str] = set()
super(QObject, self).__init__()
self.signals = signals
self.core = core
self.image_dir: Path = image_dir()
if not self.image_dir.is_dir():
self.image_dir.mkdir()
logger.info(f"Created image directory at {self.image_dir}")
self.threadpool = QThreadPool()
self.threadpool.setMaxThreadCount(6)
@staticmethod
def __img_json(app_name: str) -> Path:
return image_dir_game(app_name).joinpath("image.json")
@staticmethod
def __img_cache(app_name: str) -> Path:
return image_dir_game(app_name).joinpath("image.cache")
@staticmethod
def __img_all(app_name: str) -> Tuple:
return (
image_tall_path(app_name),
image_tall_path(app_name, color=False),
image_wide_path(app_name),
image_wide_path(app_name, color=False),
image_icon_path(app_name),
image_icon_path(app_name, color=False),
desktop_icon_path(app_name),
)
def __prepare_download(self, game: Game, force: bool = False) -> Tuple[List, Dict]:
if force and image_dir_game(game.app_name).exists():
for file in self.__img_all(game.app_name):
file.unlink(missing_ok=True)
if not image_dir_game(game.app_name).is_dir():
image_dir_game(game.app_name).mkdir()
# Load image checksums
if not self.__img_json(game.app_name).is_file():
json_data: Dict = dict(zip(self.__img_types, [None] * len(self.__img_types)))
else:
json_data = json.load(open(self.__img_json(game.app_name), "r"))
# Only download the best matching candidate for each image category
def best_match(key_images: List, image_types: Tuple) -> Dict:
matches = sorted(
filter(lambda image: image["type"] in image_types, key_images),
key=lambda x: image_types.index(x["type"]) if x["type"] in image_types else len(image_types),
reverse=False,
)
try:
best = matches[0]
except IndexError as e:
best = {}
return best
candidates = tuple(
image
for image in [
best_match(game.metadata.get("keyImages", []), self.__img_tall_types),
best_match(game.metadata.get("keyImages", []), self.__img_wide_types),
best_match(game.metadata.get("keyImages", []), self.__img_logo_types),
]
if bool(image)
)
# lk: Find updates or initialize if images are missing.
# lk: `updates` will be empty for games without images
# lk: so everything below it is skipped
updates = []
if not all(file.is_file() for file in self.__img_all(game.app_name)):
# lk: fast path for games without images, convert Rare's logo
if not candidates:
cache_data: Dict = dict(zip(self.__img_types, [None] * len(self.__img_types)))
with open(resources_path.joinpath("images", "cover.png"), "rb") as fd:
cache_data["DieselGameBoxTall"] = fd.read()
fd.seek(0)
cache_data["DieselGameBoxWide"] = fd.read()
# cache_data["DieselGameBoxLogo"] = open(
# resources_path.joinpath("images", "Rare_nonsquared.png"), "rb").read()
self.__convert(game, cache_data)
json_data["cache"] = None
json_data["scale"] = ImageSize.Tall.pixel_ratio
json_data["size"] = {"w": ImageSize.Tall.size.width(), "h": ImageSize.Tall.size.height()}
json.dump(json_data, open(self.__img_json(game.app_name), "w"))
else:
updates = [image for image in candidates if image["type"] in self.__img_types]
else:
for image in candidates:
if image["type"] in self.__img_types:
if image["type"] not in json_data.keys() or json_data[image["type"]] != image["md5"]:
updates.append(image)
return updates, json_data
def __download(self, updates: List, json_data: Dict, game: Game, use_async: bool = False) -> bool:
# Decompress existing image.cache
if not self.__img_cache(game.app_name).is_file():
cache_data = dict(zip(self.__img_types, [None] * len(self.__img_types)))
else:
cache_data = self.__decompress(game)
# lk: filter updates again against the cache now that it is available
# images in cache don't need to be downloaded again.
downloads = [
image
for image in updates
if (cache_data.get(image["type"], None) is None or json_data[image["type"]] != image["md5"])
]
# Download
# # lk: Keep this here, so I don't have to go looking for it again,
# # lk: it might be useful in the future.
# if use_async:
# session = FuturesSession(max_workers=len(self.__img_types))
# image_requests = []
# for image in downloads:
# logger.info(f"Downloading {image['type']} for {game.app_title}")
# json_data[image["type"]] = image["md5"]
# if image["type"] in self.__img_tall_types:
# payload = {"resize": 1, "w": ImageSize.Image.size.width(), "h": ImageSize.Image.size.height()}
# elif image["type"] in self.__img_wide_types:
# payload = {"resize": 1, "w": ImageSize.ImageWide.size.width(), "h": ImageSize.ImageWide.size.height()}
# else:
# # Set the larger of the sizes for everything else
# payload = {"resize": 1, "w": ImageSize.ImageWide.size.width(), "h": ImageSize.ImageWide.size.height()}
# req = session.get(image["url"], params=payload)
# req.image_type = image["type"]
# image_requests.append(req)
# for req in futures.as_completed(image_requests):
# cache_data[req.image_type] = req.result().content
# else:
for image in downloads:
logger.info(f"Downloading {image['type']} for {game.app_name} ({game.app_title})")
json_data[image["type"]] = image["md5"]
if image["type"] in self.__img_tall_types:
payload = {"resize": 1, "w": ImageSize.Tall.size.width(), "h": ImageSize.Tall.size.height()}
elif image["type"] in self.__img_wide_types:
payload = {"resize": 1, "w": ImageSize.Wide.size.width(), "h": ImageSize.Wide.size.height()}
else:
# Set the larger of the sizes for everything else
payload = {"resize": 1, "w": ImageSize.Wide.size.width(), "h": ImageSize.Wide.size.height()}
try:
# cache_data[image["type"]] = requests.get(image["url"], params=payload).content
cache_data[image["type"]] = requests.get(image["url"], params=payload, timeout=10).content
except Exception as e:
logger.error(e)
return False
self.__convert(game, cache_data)
# lk: don't keep the cache if there is no logo (kept for me)
# if cache_data["DieselGameBoxLogo"] is not None:
# self.__compress(game, cache_data)
self.__compress(game, cache_data)
# hash image cache
try:
with open(self.__img_cache(game.app_name), "rb") as archive:
archive_hash = hashlib.md5(archive.read()).hexdigest()
except FileNotFoundError:
archive_hash = None
json_data["cache"] = archive_hash
json_data["scale"] = ImageSize.Tall.pixel_ratio
json_data["size"] = {"w": ImageSize.Tall.size.width(), "h": ImageSize.Tall.size.height()}
# write image.json
with open(self.__img_json(game.app_name), "w") as file:
json.dump(json_data, file)
return bool(updates)
__icon_overlay: Optional[QPainterPath] = None
@staticmethod
def __generate_icon_overlay(rect: QRect) -> QPainterPath:
if ImageManager.__icon_overlay is not None:
return ImageManager.__icon_overlay
rounded_path = QPainterPath()
margin = 0.05
rounded_path.addRoundedRect(
QRectF(
rect.width() * margin,
rect.height() * margin,
rect.width() - (rect.width() * margin * 2),
rect.height() - (rect.width() * margin * 2),
),
rect.height() * 0.2,
rect.height() * 0.2,
)
ImageManager.__icon_overlay = rounded_path
return ImageManager.__icon_overlay
@staticmethod
def __convert_icon(cover: QImage) -> QImage:
icon_size = QSize(128, 128)
icon = QImage(icon_size, QImage.Format_ARGB32_Premultiplied)
painter = QPainter(icon)
painter.setRenderHint(QPainter.SmoothPixmapTransform, True)
painter.setRenderHint(QPainter.Antialiasing, True)
painter.setCompositionMode(QPainter.CompositionMode_Source)
painter.fillRect(icon.rect(), Qt.transparent)
overlay = ImageManager.__generate_icon_overlay(icon.rect())
brush = QBrush(cover)
scale = max(icon.width() / cover.width(), icon.height() / cover.height())
transform = QTransform().scale(scale, scale)
brush.setTransform(transform)
painter.fillPath(overlay, brush)
pen = QPen(Qt.black, 2)
painter.setPen(pen)
painter.drawPath(overlay)
painter.end()
return icon
def __convert(self, game, images, force=False) -> None:
for file in self.__img_all(game.app_name):
if force and file.exists():
file.unlink(missing_ok=True)
def find_image_data(image_types: Tuple):
data = None
for image_type in image_types:
if images.get(image_type, None) is not None:
data = images[image_type]
break
return data
tall_data = find_image_data(self.__img_tall_types)
wide_data = find_image_data(self.__img_wide_types)
logo_data = find_image_data(self.__img_logo_types)
def convert_image(image_data, logo_data, preset: ImageSize.Preset) -> QImage:
image = QImage()
image.loadFromData(image_data)
image.convertToFormat(QImage.Format_ARGB32_Premultiplied)
# lk: Images are not always at the correct aspect ratio, so crop them to size
wr, hr = preset.aspect_ratio
factor = min(image.width() // wr, image.height() // hr)
rem_w = (image.width() - factor * wr) // 2
rem_h = (image.height() - factor * hr) // 2
image = image.copy(rem_w, rem_h, factor * wr, factor * hr)
if logo_data is not None:
logo = QImage()
logo.loadFromData(logo_data)
logo.convertToFormat(QImage.Format_ARGB32_Premultiplied)
if logo.width() > image.width():
logo = logo.scaled(image.width(), image.height(), Qt.KeepAspectRatio, Qt.SmoothTransformation)
painter = QPainter(image)
painter.drawImage((image.width() - logo.width()) // 2, image.height() - logo.height(), logo)
painter.end()
return image.scaled(preset.size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
tall = convert_image(tall_data, logo_data, ImageSize.Tall)
wide = convert_image(wide_data, logo_data, ImageSize.Wide)
icon = self.__convert_icon(tall)
icon.save(desktop_icon_path(game.app_name).as_posix(), format=desktop_icon_suffix().upper())
def save_image(image: QImage, color_path: Path, gray_path: Path):
# this is not required if we ever want to re-apply the alpha channel
# image = image.convertToFormat(QImage.Format_Indexed8)
# add the alpha channel back to the cover
image = image.convertToFormat(QImage.Format_ARGB32_Premultiplied)
image.save(color_path.as_posix(), format="PNG")
# quick way to convert to grayscale, but keep the alpha channel
alpha = image.convertToFormat(QImage.Format_Alpha8)
image = image.convertToFormat(QImage.Format_Grayscale8)
# add the alpha channel back to the grayscale cover
image = image.convertToFormat(QImage.Format_ARGB32_Premultiplied)
image.setAlphaChannel(alpha)
image.save(gray_path.as_posix(), format="PNG")
save_image(icon, image_icon_path(game.app_name), image_icon_path(game.app_name, color=False))
save_image(tall, image_tall_path(game.app_name), image_tall_path(game.app_name, color=False))
save_image(wide, image_wide_path(game.app_name), image_wide_path(game.app_name, color=False))
def __compress(self, game: Game, data: Dict) -> None:
archive = open(self.__img_cache(game.app_name), "wb")
cdata = zlib.compress(pickle.dumps(data), level=-1)
archive.write(cdata)
archive.close()
def __decompress(self, game: Game) -> Dict:
archive = open(self.__img_cache(game.app_name), "rb")
try:
data = zlib.decompress(archive.read())
data = pickle.loads(data)
except zlib.error:
data = dict(zip(self.__img_types, [None] * len(self.__img_types)))
finally:
archive.close()
return data
def __append_to_queue(self, game: Game):
self.__worker_app_names.add(game.app_name)
@pyqtSlot(object)
def __remove_from_queue(self, game: Game):
self.__worker_app_names.remove(game.app_name)
def download_image(
self, game: Game, load_callback: Callable[[], None], priority: int, force: bool = False
) -> None:
if game.app_name in self.__worker_app_names:
return
self.__append_to_queue(game)
updates, json_data = self.__prepare_download(game, force)
if not updates:
self.__remove_from_queue(game)
load_callback()
else:
# Copy the data because we are going to be a thread and we modify them later on
image_worker = ImageManager.Worker(self.__download, updates.copy(), json_data.copy(), game)
image_worker.signals.completed.connect(self.__remove_from_queue)
image_worker.signals.completed.connect(load_callback)
self.threadpool.start(image_worker, priority)
def download_image_launch(
self, game: Game, callback: Callable[[Game], None], priority: int, force: bool = False
) -> None:
if self.__img_cache(game.app_name).is_file() and not force:
return
def _callback():
callback(game)
self.download_image(game, _callback, priority, force)
def download_image_blocking(self, game: Game, force: bool = False) -> None:
updates, json_data = self.__prepare_download(game, force)
if not updates:
return
if updates:
self.__download(updates, json_data, game, use_async=True)
@staticmethod
def __get_cover(
container: Union[Type[QPixmap], Type[QImage]], app_name: str, preset: ImageSize.Preset, color: bool,
) -> Union[QPixmap, QImage]:
ret = container()
if preset.orientation == ImageType.Icon:
if image_icon_path(app_name, color).is_file():
ret.load(image_icon_path(app_name, color).as_posix())
elif preset.orientation == ImageType.Tall:
if image_tall_path(app_name, color).is_file():
ret.load(image_tall_path(app_name, color).as_posix())
elif preset.orientation == ImageType.Wide:
if image_wide_path(app_name, color).is_file():
ret.load(image_wide_path(app_name, color).as_posix())
else:
raise RuntimeError("Unknown image preset")
if not ret.isNull():
device = ImageSize.Preset(
divisor=preset.base.divisor,
pixel_ratio=QApplication.instance().devicePixelRatio(),
orientation=preset.base.orientation,
base=preset
)
ret.setDevicePixelRatio(preset.pixel_ratio)
# lk: Scaling happens at painting. It might be inefficient so leave this here as an alternative
# lk: If this is uncommented, the transformation in ImageWidget should be adjusted also
ret = ret.scaled(device.size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
ret.setDevicePixelRatio(device.pixel_ratio)
return ret
def get_pixmap(self, app_name: str, preset: ImageSize.Preset, color: bool = True) -> QPixmap:
"""
Use when the image is to be presented directly on the screen.
@param app_name: The RareGame object for this game
@param preset:
@param color: True to load the colored pixmap, False to load the grayscale
@return: QPixmap
"""
pixmap: QPixmap = self.__get_cover(QPixmap, app_name, preset, color)
return pixmap
def get_image(self, app_name: str, preset: ImageSize.Preset, color: bool = True) -> QImage:
"""
Use when the image has to be manipulated before being rendered.
@param app_name: The RareGame object for this game
@param preset:
@param color: True to load the colored image, False to load the grayscale
@return: QImage
"""
image: QImage = self.__get_cover(QImage, app_name, preset, color)
return image