
Add custom Legendary version

Dummerle 2021-03-15 11:48:28 +01:00
parent d0c2059158
commit f704b24ae0
29 changed files with 6250 additions and 0 deletions

4
legendary/__init__.py Normal file

@@ -0,0 +1,4 @@
"""Legendary!"""
__version__ = '0.20.6'
__codename__ = 'A Red Letter Day'

167
legendary/api/egs.py Normal file

@@ -0,0 +1,167 @@
#!/usr/bin/env python
# coding: utf-8
import requests
import logging
from requests.auth import HTTPBasicAuth
from legendary.models.exceptions import InvalidCredentialsError
class EPCAPI:
_user_agent = 'UELauncher/11.0.1-14907503+++Portal+Release-Live Windows/10.0.19041.1.256.64bit'
# required for the oauth request
_user_basic = '34a02cf8f4414e29b15921876da36f9a'
_pw_basic = 'daafbccc737745039dffe53d94fc76cf'
_oauth_host = 'account-public-service-prod03.ol.epicgames.com'
_launcher_host = 'launcher-public-service-prod06.ol.epicgames.com'
_entitlements_host = 'entitlement-public-service-prod08.ol.epicgames.com'
_catalog_host = 'catalog-public-service-prod06.ol.epicgames.com'
_ecommerce_host = 'ecommerceintegration-public-service-ecomprod02.ol.epicgames.com'
_datastorage_host = 'datastorage-public-service-liveegs.live.use1a.on.epicgames.com'
_library_host = 'library-service.live.use1a.on.epicgames.com'
def __init__(self, lc='en', cc='US'):
self.session = requests.session()
self.log = logging.getLogger('EPCAPI')
self.unauth_session = requests.session()
self.session.headers['User-Agent'] = self._user_agent
self.unauth_session.headers['User-Agent'] = self._user_agent
self._oauth_basic = HTTPBasicAuth(self._user_basic, self._pw_basic)
self.access_token = None
self.user = None
self.language_code = lc
self.country_code = cc
def resume_session(self, session):
self.session.headers['Authorization'] = f'bearer {session["access_token"]}'
r = self.session.get(f'https://{self._oauth_host}/account/api/oauth/verify')
if r.status_code >= 500:
r.raise_for_status()
j = r.json()
if 'errorMessage' in j:
self.log.warning(f'Login to EGS API failed with errorCode: {j["errorCode"]}')
raise InvalidCredentialsError(j['errorCode'])
# update other data
session.update(j)
self.user = session
return self.user
def start_session(self, refresh_token: str = None, exchange_token: str = None) -> dict:
if refresh_token:
params = dict(grant_type='refresh_token',
refresh_token=refresh_token,
token_type='eg1')
elif exchange_token:
params = dict(grant_type='exchange_code',
exchange_code=exchange_token,
token_type='eg1')
else:
raise ValueError('At least one token type must be specified!')
r = self.session.post(f'https://{self._oauth_host}/account/api/oauth/token',
data=params, auth=self._oauth_basic)
# Only raise HTTP exceptions on server errors
if r.status_code >= 500:
r.raise_for_status()
j = r.json()
if 'error' in j:
self.log.warning(f'Login to EGS API failed with errorCode: {j["errorCode"]}')
raise InvalidCredentialsError(j['errorCode'])
self.user = j
self.session.headers['Authorization'] = f'bearer {self.user["access_token"]}'
return self.user
def invalidate_session(self): # unused
r = self.session.delete(f'https://{self._oauth_host}/account/api/oauth/sessions/kill/{self.access_token}')
def get_game_token(self):
r = self.session.get(f'https://{self._oauth_host}/account/api/oauth/exchange')
r.raise_for_status()
return r.json()
def get_ownership_token(self, namespace, catalog_item_id):
user_id = self.user.get('account_id')
r = self.session.post(f'https://{self._ecommerce_host}/ecommerceintegration/api/public/'
f'platforms/EPIC/identities/{user_id}/ownershipToken',
data=dict(nsCatalogItemId=f'{namespace}:{catalog_item_id}'))
r.raise_for_status()
return r.content
def get_game_assets(self, platform='Windows', label='Live'):
r = self.session.get(f'https://{self._launcher_host}/launcher/api/public/assets/{platform}',
params=dict(label=label))
r.raise_for_status()
return r.json()
def get_game_manifest(self, namespace, catalog_item_id, app_name, platform='Windows', label='Live'):
r = self.session.get(f'https://{self._launcher_host}/launcher/api/public/assets/v2/platform'
f'/{platform}/namespace/{namespace}/catalogItem/{catalog_item_id}/app'
f'/{app_name}/label/{label}')
r.raise_for_status()
return r.json()
def get_user_entitlements(self):
user_id = self.user.get('account_id')
r = self.session.get(f'https://{self._entitlements_host}/entitlement/api/account/{user_id}/entitlements',
params=dict(start=0, count=5000))
r.raise_for_status()
return r.json()
def get_game_info(self, namespace, catalog_item_id):
r = self.session.get(f'https://{self._catalog_host}/catalog/api/shared/namespace/{namespace}/bulk/items',
params=dict(id=catalog_item_id, includeDLCDetails=True, includeMainGameDetails=True,
country=self.country_code, locale=self.language_code))
r.raise_for_status()
return r.json().get(catalog_item_id, None)
def get_library_items(self, include_metadata=True):
records = []
r = self.session.get(f'https://{self._library_host}/library/api/public/items',
params=dict(includeMetadata=include_metadata))
r.raise_for_status()
j = r.json()
records.extend(j['records'])
# Fetch remaining library entries as long as there is a cursor
while cursor := j['responseMetadata'].get('nextCursor', None):
r = self.session.get(f'https://{self._library_host}/library/api/public/items',
params=dict(includeMetadata=include_metadata, cursor=cursor))
r.raise_for_status()
j = r.json()
records.extend(j['records'])
return records
def get_user_cloud_saves(self, app_name='', manifests=False, filenames=None):
if app_name and manifests:
app_name += '/manifests/'
elif app_name:
app_name += '/'
user_id = self.user.get('account_id')
if filenames:
r = self.session.post(f'https://{self._datastorage_host}/api/v1/access/egstore/savesync/'
f'{user_id}/{app_name}', json=dict(files=filenames))
else:
r = self.session.get(f'https://{self._datastorage_host}/api/v1/access/egstore/savesync/'
f'{user_id}/{app_name}')
r.raise_for_status()
return r.json()
def create_game_cloud_saves(self, app_name, filenames):
return self.get_user_cloud_saves(app_name, filenames=filenames)
def delete_game_cloud_save_file(self, path):
url = f'https://{self._datastorage_host}/api/v1/data/egstore/{path}'
r = self.session.delete(url)
r.raise_for_status()

1248
legendary/cli.py Normal file

File diff suppressed because it is too large

1396
legendary/core.py Normal file

File diff suppressed because it is too large

764
legendary/downloader/manager.py Normal file

@@ -0,0 +1,764 @@
# coding: utf-8
# please don't look at this code too hard, it's a mess.
import logging
import os
import time
from collections import Counter, defaultdict, deque
from logging.handlers import QueueHandler
from multiprocessing import cpu_count, Process, Queue as MPQueue
from multiprocessing.shared_memory import SharedMemory
from queue import Empty
from sys import exit
from threading import Condition, Thread
from legendary.downloader.workers import DLWorker, FileWorker
from legendary.models.downloading import *
from legendary.models.manifest import ManifestComparison, Manifest
class DLManager(Process):
def __init__(self, download_dir, base_url, cache_dir=None, status_q=None,
max_workers=0, update_interval=1.0, dl_timeout=10, resume_file=None,
max_shared_memory=1024 * 1024 * 1024):
super().__init__(name='DLManager')
self.log = logging.getLogger('DLM')
self.proc_debug = False
self.base_url = base_url
self.dl_dir = download_dir
self.cache_dir = cache_dir if cache_dir else os.path.join(download_dir, '.cache')
# All the queues!
self.logging_queue = None
self.dl_worker_queue = None
self.writer_queue = None
self.dl_result_q = None
self.writer_result_q = None
self.max_workers = max_workers if max_workers else min(cpu_count() * 2, 16)
self.dl_timeout = dl_timeout
# Analysis stuff
self.analysis = None
self.tasks = deque()
self.chunks_to_dl = deque()
self.chunk_data_list = None
# shared memory stuff
self.max_shared_memory = max_shared_memory # 1 GiB by default
self.sms = deque()
self.shared_memory = None
# Interval for log updates and pushing updates to the queue
self.update_interval = update_interval
self.status_queue = status_q # queue used to relay status info back to GUI/CLI
# Resume file stuff
self.resume_file = resume_file
self.hash_map = dict()
# cross-thread runtime information
self.running = True
self.active_tasks = 0
self.children = []
self.threads = []
self.conditions = []
# bytes downloaded and decompressed since last report
self.bytes_downloaded_since_last = 0
self.bytes_decompressed_since_last = 0
# bytes written since last report
self.bytes_written_since_last = 0
# bytes read since last report
self.bytes_read_since_last = 0
# chunks written since last report
self.num_processed_since_last = 0
self.num_tasks_processed_since_last = 0
def run_analysis(self, manifest: Manifest, old_manifest: Manifest = None,
patch=True, resume=True, file_prefix_filter=None,
file_exclude_filter=None, file_install_tag=None,
processing_optimization=False) -> AnalysisResult:
"""
Run analysis on manifest and old manifest (if not None) and return a result
with a summary of the resources required to install the provided manifest.
:param manifest: Manifest to install
:param old_manifest: Old manifest to patch from (if applicable)
:param patch: Patch instead of redownloading the entire file
:param resume: Continue based on resume file if it exists
:param file_prefix_filter: Only download files that start with this prefix
:param file_exclude_filter: Exclude files with this prefix from download
:param file_install_tag: Only install files with the specified tag
:param processing_optimization: Attempt to optimize processing order and RAM usage
:return: AnalysisResult
"""
analysis_res = AnalysisResult()
analysis_res.install_size = sum(fm.file_size for fm in manifest.file_manifest_list.elements)
analysis_res.biggest_chunk = max(c.window_size for c in manifest.chunk_data_list.elements)
analysis_res.biggest_file_size = max(f.file_size for f in manifest.file_manifest_list.elements)
is_1mib = analysis_res.biggest_chunk == 1024 * 1024
self.log.debug(f'Biggest chunk size: {analysis_res.biggest_chunk} bytes (== 1 MiB? {is_1mib})')
self.log.debug(f'Creating manifest comparison...')
mc = ManifestComparison.create(manifest, old_manifest)
analysis_res.manifest_comparison = mc
if resume and self.resume_file and os.path.exists(self.resume_file):
self.log.info('Found previously interrupted download. Download will be resumed if possible.')
try:
missing = 0
mismatch = 0
completed_files = set()
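# each resume file line has the form "<sha1 hex>:<filename>"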
for line in open(self.resume_file).readlines():
file_hash, _, filename = line.strip().partition(':')
_p = os.path.join(self.dl_dir, filename)
if not os.path.exists(_p):
self.log.debug(f'File does not exist but is in resume file: "{_p}"')
missing += 1
elif file_hash != manifest.file_manifest_list.get_file_by_path(filename).sha_hash.hex():
mismatch += 1
else:
completed_files.add(filename)
if missing:
self.log.warning(f'{missing} previously completed file(s) are missing, they will be redownloaded.')
if mismatch:
self.log.warning(f'{mismatch} existing file(s) have been changed and will be redownloaded.')
# remove completed files from changed/added and move them to unchanged for the analysis.
mc.added -= completed_files
mc.changed -= completed_files
mc.unchanged |= completed_files
self.log.info(f'Skipping {len(completed_files)} files based on resume data.')
except Exception as e:
self.log.warning(f'Reading resume file failed: {e!r}, continuing as normal...')
# Install tags are used for selective downloading, e.g. for language packs
additional_deletion_tasks = []
if file_install_tag is not None:
if isinstance(file_install_tag, str):
file_install_tag = [file_install_tag]
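# skip files whose install tags match none of the requested tags; an empty ('') tag also matches files that have no install tags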
files_to_skip = set(i.filename for i in manifest.file_manifest_list.elements
if not any((fit in i.install_tags) or (not fit and not i.install_tags)
for fit in file_install_tag))
self.log.info(f'Found {len(files_to_skip)} files to skip based on install tag.')
mc.added -= files_to_skip
mc.changed -= files_to_skip
mc.unchanged |= files_to_skip
for fname in sorted(files_to_skip):
additional_deletion_tasks.append(FileTask(fname, delete=True, silent=True))
# if include/exclude prefix has been set: mark all files that are not to be downloaded as unchanged
if file_exclude_filter:
if isinstance(file_exclude_filter, str):
file_exclude_filter = [file_exclude_filter]
file_exclude_filter = [f.lower() for f in file_exclude_filter]
files_to_skip = set(i.filename for i in manifest.file_manifest_list.elements if
any(i.filename.lower().startswith(pfx) for pfx in file_exclude_filter))
self.log.info(f'Found {len(files_to_skip)} files to skip based on exclude prefix.')
mc.added -= files_to_skip
mc.changed -= files_to_skip
mc.unchanged |= files_to_skip
if file_prefix_filter:
if isinstance(file_prefix_filter, str):
file_prefix_filter = [file_prefix_filter]
file_prefix_filter = [f.lower() for f in file_prefix_filter]
files_to_skip = set(i.filename for i in manifest.file_manifest_list.elements if not
any(i.filename.lower().startswith(pfx) for pfx in file_prefix_filter))
self.log.info(f'Found {len(files_to_skip)} files to skip based on include prefix(es)')
mc.added -= files_to_skip
mc.changed -= files_to_skip
mc.unchanged |= files_to_skip
if file_prefix_filter or file_exclude_filter or file_install_tag:
self.log.info(f'Remaining files after filtering: {len(mc.added) + len(mc.changed)}')
# correct install size after filtering
analysis_res.install_size = sum(fm.file_size for fm in manifest.file_manifest_list.elements
if fm.filename in mc.added)
if mc.removed:
analysis_res.removed = len(mc.removed)
self.log.debug(f'{analysis_res.removed} removed files')
if mc.added:
analysis_res.added = len(mc.added)
self.log.debug(f'{analysis_res.added} added files')
if mc.changed:
analysis_res.changed = len(mc.changed)
self.log.debug(f'{analysis_res.changed} changed files')
if mc.unchanged:
analysis_res.unchanged = len(mc.unchanged)
self.log.debug(f'{analysis_res.unchanged} unchanged files')
if processing_optimization and len(manifest.file_manifest_list.elements) > 100_000:
self.log.warning('Manifest contains too many files, processing optimizations will be disabled.')
processing_optimization = False
elif processing_optimization:
self.log.info('Processing order optimization is enabled, analysis may take a few seconds longer...')
# count references to chunks for determining runtime cache size later
references = Counter()
fmlist = sorted(manifest.file_manifest_list.elements,
key=lambda a: a.filename.lower())
for fm in fmlist:
self.hash_map[fm.filename] = fm.sha_hash.hex()
# chunks of unchanged files are not downloaded so we can skip them
if fm.filename in mc.unchanged:
analysis_res.unchanged += fm.file_size
continue
for cp in fm.chunk_parts:
references[cp.guid_num] += 1
if processing_optimization:
s_time = time.time()
# reorder the file manifest list to group files that share many chunks
# 4 is mostly arbitrary but has shown in testing to be a good choice
min_overlap = 4
# ignore files with less than N chunk parts, this speeds things up dramatically
cp_threshold = 5
remaining_files = {fm.filename: {cp.guid_num for cp in fm.chunk_parts}
for fm in fmlist if fm.filename not in mc.unchanged}
_fmlist = []
# iterate over all files that will be downloaded and pair up those that share the most chunks
for fm in fmlist:
if fm.filename not in remaining_files:
continue
_fmlist.append(fm)
f_chunks = remaining_files.pop(fm.filename)
if len(f_chunks) < cp_threshold:
continue
best_overlap, match = 0, None
for fname, chunks in remaining_files.items():
if len(chunks) < cp_threshold:
continue
overlap = len(f_chunks & chunks)
if overlap > min_overlap and overlap > best_overlap:
best_overlap, match = overlap, fname
if match:
_fmlist.append(manifest.file_manifest_list.get_file_by_path(match))
remaining_files.pop(match)
fmlist = _fmlist
opt_delta = time.time() - s_time
self.log.debug(f'Processing optimizations took {opt_delta:.01f} seconds.')
# determine reusable chunks and prepare lookup table for reusable ones
re_usable = defaultdict(dict)
if old_manifest and mc.changed and patch:
self.log.debug('Analyzing manifests for re-usable chunks...')
for changed in mc.changed:
old_file = old_manifest.file_manifest_list.get_file_by_path(changed)
new_file = manifest.file_manifest_list.get_file_by_path(changed)
existing_chunks = defaultdict(list)
off = 0
for cp in old_file.chunk_parts:
existing_chunks[cp.guid_num].append((off, cp.offset, cp.offset + cp.size))
off += cp.size
for cp in new_file.chunk_parts:
key = (cp.guid_num, cp.offset, cp.size)
for file_o, cp_o, cp_end_o in existing_chunks[cp.guid_num]:
# check if new chunk part is wholly contained in the old chunk part
if cp_o <= cp.offset and (cp.offset + cp.size) <= cp_end_o:
references[cp.guid_num] -= 1
re_usable[changed][key] = file_o + (cp.offset - cp_o)
analysis_res.reuse_size += cp.size
break
last_cache_size = current_cache_size = 0
# set to determine whether a file is currently cached or not
cached = set()
# Using this secondary set is orders of magnitude faster than checking the deque.
chunks_in_dl_list = set()
# This is just used to count all unique guids that have been cached
dl_cache_guids = set()
# run through the list of files and create the download jobs and also determine minimum
# runtime cache requirement by simulating adding/removing from cache during download.
self.log.debug('Creating filetasks and chunktasks...')
for current_file in fmlist:
# skip unchanged and empty files
if current_file.filename in mc.unchanged:
continue
elif not current_file.chunk_parts:
self.tasks.append(FileTask(current_file.filename, empty=True))
continue
existing_chunks = re_usable.get(current_file.filename, None)
chunk_tasks = []
reused = 0
for cp in current_file.chunk_parts:
ct = ChunkTask(cp.guid_num, cp.offset, cp.size)
# re-use the chunk from the existing file if we can
if existing_chunks and (cp.guid_num, cp.offset, cp.size) in existing_chunks:
reused += 1
ct.chunk_file = current_file.filename
ct.chunk_offset = existing_chunks[(cp.guid_num, cp.offset, cp.size)]
else:
# add to DL list if not already in it
if cp.guid_num not in chunks_in_dl_list:
self.chunks_to_dl.append(cp.guid_num)
chunks_in_dl_list.add(cp.guid_num)
# if chunk has more than one use or is already in cache,
# check if we need to add or remove it again.
if references[cp.guid_num] > 1 or cp.guid_num in cached:
references[cp.guid_num] -= 1
# delete from cache if no references left
if references[cp.guid_num] < 1:
current_cache_size -= analysis_res.biggest_chunk
cached.remove(cp.guid_num)
ct.cleanup = True
# add to cache if not already cached
elif cp.guid_num not in cached:
dl_cache_guids.add(cp.guid_num)
cached.add(cp.guid_num)
current_cache_size += analysis_res.biggest_chunk
else:
ct.cleanup = True
chunk_tasks.append(ct)
if reused:
self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}')
# open temporary file that will contain download + old file contents
self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True))
self.tasks.extend(chunk_tasks)
self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True))
# delete old file and rename temporary
self.tasks.append(FileTask(current_file.filename, delete=True, rename=True,
temporary_filename=current_file.filename + u'.tmp'))
else:
self.tasks.append(FileTask(current_file.filename, fopen=True))
self.tasks.extend(chunk_tasks)
self.tasks.append(FileTask(current_file.filename, close=True))
# check if runtime cache size has changed
if current_cache_size > last_cache_size:
self.log.debug(f' * New maximum cache size: {current_cache_size / 1024 / 1024:.02f} MiB')
last_cache_size = current_cache_size
self.log.debug(f'Final cache size requirement: {last_cache_size / 1024 / 1024} MiB.')
analysis_res.min_memory = last_cache_size + (1024 * 1024 * 32) # add some padding just to be safe
# Todo implement on-disk caching to avoid this issue.
if analysis_res.min_memory > self.max_shared_memory:
shared_mib = f'{self.max_shared_memory / 1024 / 1024:.01f} MiB'
required_mib = f'{analysis_res.min_memory / 1024 / 1024:.01f} MiB'
suggested_mib = round(self.max_shared_memory / 1024 / 1024 +
(analysis_res.min_memory - self.max_shared_memory) / 1024 / 1024 + 32)
if processing_optimization:
message = f'Try running legendary with "--enable-reordering --max-shared-memory {suggested_mib:.0f}"'
else:
message = 'Try running legendary with "--enable-reordering" to reduce memory usage, ' \
f'or use "--max-shared-memory {suggested_mib:.0f}" to increase the limit.'
raise MemoryError(f'Current shared memory cache is smaller than required: {shared_mib} < {required_mib}. '
+ message)
# calculate actual dl and patch write size.
analysis_res.dl_size = \
sum(c.file_size for c in manifest.chunk_data_list.elements if c.guid_num in chunks_in_dl_list)
analysis_res.uncompressed_dl_size = \
sum(c.window_size for c in manifest.chunk_data_list.elements if c.guid_num in chunks_in_dl_list)
# add jobs to remove files
for fname in mc.removed:
self.tasks.append(FileTask(fname, delete=True))
self.tasks.extend(additional_deletion_tasks)
analysis_res.num_chunks_cache = len(dl_cache_guids)
self.chunk_data_list = manifest.chunk_data_list
self.analysis = analysis_res
return analysis_res
def download_job_manager(self, task_cond: Condition, shm_cond: Condition):
while self.chunks_to_dl and self.running:
while self.active_tasks < self.max_workers * 2 and self.chunks_to_dl:
try:
sms = self.sms.popleft()
no_shm = False
except IndexError: # no free cache
no_shm = True
break
c_guid = self.chunks_to_dl.popleft()
chunk = self.chunk_data_list.get_chunk_by_guid(c_guid)
self.log.debug(f'Adding {chunk.guid_num} (active: {self.active_tasks})')
try:
self.dl_worker_queue.put(DownloaderTask(url=self.base_url + '/' + chunk.path,
chunk_guid=c_guid, shm=sms),
timeout=1.0)
except Exception as e:
self.log.warning(f'Failed to add to download queue: {e!r}')
self.chunks_to_dl.appendleft(c_guid)
break
self.active_tasks += 1
else:
# active tasks limit hit, wait for tasks to finish
with task_cond:
self.log.debug('Waiting for download tasks to complete..')
task_cond.wait(timeout=1.0)
continue
if no_shm:
# if we break we ran out of shared memory, so wait for that.
with shm_cond:
self.log.debug('Waiting for more shared memory...')
shm_cond.wait(timeout=1.0)
self.log.debug('Download Job Manager quitting...')
def dl_results_handler(self, task_cond: Condition):
in_buffer = dict()
task = self.tasks.popleft()
current_file = ''
while task and self.running:
if isinstance(task, FileTask): # this wasn't necessarily a good idea...
try:
if task.empty:
self.writer_queue.put(WriterTask(task.filename, empty=True), timeout=1.0)
elif task.rename:
self.writer_queue.put(WriterTask(task.filename, rename=True,
delete=task.delete,
old_filename=task.temporary_filename),
timeout=1.0)
elif task.delete:
self.writer_queue.put(WriterTask(task.filename, delete=True, silent=task.silent), timeout=1.0)
elif task.open:
self.writer_queue.put(WriterTask(task.filename, fopen=True), timeout=1.0)
current_file = task.filename
elif task.close:
self.writer_queue.put(WriterTask(task.filename, close=True), timeout=1.0)
except Exception as e:
self.tasks.appendleft(task)
self.log.warning(f'Adding to queue failed: {e!r}')
continue
try:
task = self.tasks.popleft()
except IndexError: # finished
break
continue
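# hand chunk tasks to the writer once their data is in the download buffer or can be re-used from an existing file on disk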
while (task.chunk_guid in in_buffer) or task.chunk_file:
res_shm = None
if not task.chunk_file: # not re-using from an old file
res_shm = in_buffer[task.chunk_guid].shm
try:
self.log.debug(f'Adding {task.chunk_guid} to writer queue')
self.writer_queue.put(WriterTask(
filename=current_file, shared_memory=res_shm,
chunk_offset=task.chunk_offset, chunk_size=task.chunk_size,
chunk_guid=task.chunk_guid, release_memory=task.cleanup,
old_file=task.chunk_file # todo on-disk cache
), timeout=1.0)
except Exception as e:
self.log.warning(f'Adding to queue failed: {e!r}')
break
if task.cleanup and not task.chunk_file:
del in_buffer[task.chunk_guid]
try:
task = self.tasks.popleft()
if isinstance(task, FileTask):
break
except IndexError: # finished
task = None
break
else: # only enter blocking code if the loop did not break
try:
res = self.dl_result_q.get(timeout=1)
self.active_tasks -= 1
with task_cond:
task_cond.notify()
if res.success:
self.log.debug(f'Download for {res.guid} succeeded, adding to in_buffer...')
in_buffer[res.guid] = res
self.bytes_downloaded_since_last += res.compressed_size
self.bytes_decompressed_since_last += res.size
else:
self.log.error(f'Download for {res.guid} failed, retrying...')
try:
self.dl_worker_queue.put(DownloaderTask(
url=res.url, chunk_guid=res.guid, shm=res.shm
), timeout=1.0)
self.active_tasks += 1
except Exception as e:
self.log.warning(f'Failed adding retry task to queue! {e!r}')
# If this failed for whatever reason, put the chunk at the front of the DL list
self.chunks_to_dl.appendleft(res.guid)
except Empty:
pass
except Exception as e:
self.log.warning(f'Unhandled exception when trying to read download result queue: {e!r}')
self.log.debug('Download result handler quitting...')
def fw_results_handler(self, shm_cond: Condition):
while self.running:
try:
res = self.writer_result_q.get(timeout=1.0)
self.num_tasks_processed_since_last += 1
if res.closed and self.resume_file and res.success:
if res.filename.endswith('.tmp'):
res.filename = res.filename[:-4]
file_hash = self.hash_map[res.filename]
# write last completed file to super simple resume file
with open(self.resume_file, 'ab') as rf:
rf.write(f'{file_hash}:{res.filename}\n'.encode('utf-8'))
if res.kill:
self.log.debug('Got termination command in FW result handler')
break
if not res.success:
# todo make this kill the installation process or at least skip the file and mark it as failed
self.log.fatal(f'Writing for {res.filename} failed!')
if res.release_memory:
self.sms.appendleft(res.shm)
with shm_cond:
shm_cond.notify()
if res.chunk_guid:
self.bytes_written_since_last += res.size
# if there's no shared memory we must have read from disk.
if not res.shm:
self.bytes_read_since_last += res.size
self.num_processed_since_last += 1
except Empty:
continue
except Exception as e:
self.log.warning(f'Exception when trying to read writer result queue: {e!r}')
self.log.debug('Writer result handler quitting...')
def run(self):
if not self.analysis:
raise ValueError('Did not run analysis before trying to run download!')
# Subprocess will use its own root logger that logs to a Queue instead
_root = logging.getLogger()
_root.setLevel(logging.DEBUG if self.proc_debug else logging.INFO)
if self.logging_queue:
_root.handlers = []
_root.addHandler(QueueHandler(self.logging_queue))
self.log = logging.getLogger('DLManager')
self.log.info(f'Download Manager running with process-id: {os.getpid()}')
try:
self.run_real()
except KeyboardInterrupt:
self.log.warning('Immediate exit requested!')
self.running = False
# send conditions to unlock threads if they aren't already
for cond in self.conditions:
with cond:
cond.notify()
# make sure threads are dead.
for t in self.threads:
t.join(timeout=5.0)
if t.is_alive():
self.log.warning(f'Thread did not terminate! {repr(t)}')
# clean up all the queues, otherwise this process won't terminate properly
for name, q in zip(('Download jobs', 'Writer jobs', 'Download results', 'Writer results'),
(self.dl_worker_queue, self.writer_queue, self.dl_result_q, self.writer_result_q)):
self.log.debug(f'Cleaning up queue "{name}"')
try:
while True:
_ = q.get_nowait()
except Empty:
q.close()
q.join_thread()
def run_real(self):
self.shared_memory = SharedMemory(create=True, size=self.max_shared_memory)
self.log.debug(f'Created shared memory of size: {self.shared_memory.size / 1024 / 1024:.02f} MiB')
# create the shared memory segments and add them to their respective pools
for i in range(int(self.shared_memory.size / self.analysis.biggest_chunk)):
_sms = SharedMemorySegment(offset=i * self.analysis.biggest_chunk,
end=i * self.analysis.biggest_chunk + self.analysis.biggest_chunk)
self.sms.append(_sms)
self.log.debug(f'Created {len(self.sms)} shared memory segments.')
# Create queues
self.dl_worker_queue = MPQueue(-1)
self.writer_queue = MPQueue(-1)
self.dl_result_q = MPQueue(-1)
self.writer_result_q = MPQueue(-1)
self.log.info(f'Starting download workers...')
for i in range(self.max_workers):
w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q,
self.shared_memory.name, logging_queue=self.logging_queue,
dl_timeout=self.dl_timeout)
self.children.append(w)
w.start()
self.log.info('Starting file writing worker...')
writer_p = FileWorker(self.writer_queue, self.writer_result_q, self.dl_dir,
self.shared_memory.name, self.cache_dir, self.logging_queue)
self.children.append(writer_p)
writer_p.start()
num_chunk_tasks = sum(isinstance(t, ChunkTask) for t in self.tasks)
num_dl_tasks = len(self.chunks_to_dl)
num_tasks = len(self.tasks)
num_shared_memory_segments = len(self.sms)
self.log.debug(f'Chunks to download: {num_dl_tasks}, File tasks: {num_tasks}, Chunk tasks: {num_chunk_tasks}')
# active downloader tasks
self.active_tasks = 0
processed_chunks = 0
processed_tasks = 0
total_dl = 0
total_write = 0
# synchronization conditions
shm_cond = Condition()
task_cond = Condition()
self.conditions = [shm_cond, task_cond]
# start threads
s_time = time.time()
self.threads.append(Thread(target=self.download_job_manager, args=(task_cond, shm_cond)))
self.threads.append(Thread(target=self.dl_results_handler, args=(task_cond,)))
self.threads.append(Thread(target=self.fw_results_handler, args=(shm_cond,)))
for t in self.threads:
t.start()
last_update = time.time()
while processed_tasks < num_tasks:
delta = time.time() - last_update
if not delta:
time.sleep(self.update_interval)
continue
# update all the things
processed_chunks += self.num_processed_since_last
processed_tasks += self.num_tasks_processed_since_last
total_dl += self.bytes_downloaded_since_last
total_write += self.bytes_written_since_last
dl_speed = self.bytes_downloaded_since_last / delta
dl_unc_speed = self.bytes_decompressed_since_last / delta
w_speed = self.bytes_written_since_last / delta
r_speed = self.bytes_read_since_last / delta
# c_speed = self.num_processed_since_last / delta
# set temporary counters to 0
self.bytes_read_since_last = self.bytes_written_since_last = 0
self.bytes_downloaded_since_last = self.num_processed_since_last = 0
self.bytes_decompressed_since_last = self.num_tasks_processed_since_last = 0
last_update = time.time()
perc = (processed_chunks / num_chunk_tasks) * 100
runtime = time.time() - s_time
total_avail = len(self.sms)
total_used = (num_shared_memory_segments - total_avail) * (self.analysis.biggest_chunk / 1024 / 1024)
try:
average_speed = processed_chunks / runtime
estimate = (num_chunk_tasks - processed_chunks) / average_speed
except ZeroDivisionError:
average_speed = estimate = 0
# TODO set current_filename argument of UIUpdate
# send status update to back to instantiator (if queue exists)
if self.status_queue:
try:
self.status_queue.put(UIUpdate(
progress=perc,
runtime=round(runtime),
estimated_time_left=round(estimate),
processed_chunks=processed_chunks,
chunk_tasks=num_chunk_tasks,
total_downloaded=total_dl,
total_written=total_write,
cache_usage=total_used,
active_tasks=self.active_tasks,
download_speed=dl_speed,
download_decompressed_speed=dl_unc_speed,
write_speed=w_speed,
read_speed=r_speed,
), timeout=1.0)
except Exception as e:
self.log.warning(f'Failed to send status update to queue: {e!r}')
time.sleep(self.update_interval)
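# all tasks are done; send a kill task to each download worker so the processes shut down cleanly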
for i in range(self.max_workers):
self.dl_worker_queue.put_nowait(DownloaderTask(kill=True))
self.log.info('Waiting for installation to finish...')
self.writer_queue.put_nowait(WriterTask('', kill=True))
writer_p.join(timeout=10.0)
if writer_p.exitcode is None:
self.log.warning(f'Terminating writer process, no exit code!')
writer_p.terminate()
# forcibly kill DL workers that are not actually dead yet
for child in self.children:
if child.exitcode is None:
child.terminate()
# make sure all the threads are dead.
for t in self.threads:
t.join(timeout=5.0)
if t.is_alive():
self.log.warning(f'Thread did not terminate! {repr(t)}')
# clean up resume file
if self.resume_file:
try:
os.remove(self.resume_file)
except OSError as e:
self.log.warning(f'Failed to remove resume file: {e!r}')
# close up shared memory
self.shared_memory.close()
self.shared_memory.unlink()
self.shared_memory = None
self.log.info('All done! Download manager quitting...')
# finally, exit the process.
exit(0)

276
legendary/downloader/workers.py Normal file

@@ -0,0 +1,276 @@
# coding: utf-8
import os
import requests
import time
import logging
from logging.handlers import QueueHandler
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
from queue import Empty
from legendary.models.chunk import Chunk
from legendary.models.downloading import DownloaderTaskResult, WriterTaskResult
class DLWorker(Process):
def __init__(self, name, queue, out_queue, shm, max_retries=5,
logging_queue=None, dl_timeout=10):
super().__init__(name=name)
self.q = queue
self.o_q = out_queue
self.session = requests.session()
self.session.headers.update({
'User-Agent': 'EpicGamesLauncher/11.0.1-14907503+++Portal+Release-Live Windows/10.0.19041.1.256.64bit'
})
self.max_retries = max_retries
self.shm = SharedMemory(name=shm)
self.log_level = logging.getLogger().level
self.logging_queue = logging_queue
self.dl_timeout = float(dl_timeout) if dl_timeout else 10.0
def run(self):
# we have to fix up the logger before we can start
_root = logging.getLogger()
_root.handlers = []
_root.addHandler(QueueHandler(self.logging_queue))
logger = logging.getLogger(self.name)
logger.setLevel(self.log_level)
logger.debug(f'Download worker reporting for duty!')
empty = False
while True:
try:
job = self.q.get(timeout=10.0)
empty = False
except Empty:
if not empty:
logger.debug(f'Queue Empty, waiting for more...')
empty = True
continue
if job.kill: # let worker die
logger.debug(f'Worker received kill signal, shutting down...')
break
tries = 0
dl_start = dl_end = 0
compressed = 0
chunk = None
try:
while tries < self.max_retries:
# print('Downloading', job.url)
logger.debug(f'Downloading {job.url}')
dl_start = time.time()
try:
r = self.session.get(job.url, timeout=self.dl_timeout)
r.raise_for_status()
except Exception as e:
logger.warning(f'Chunk download for {job.guid} failed: ({e!r}), retrying...')
continue
dl_end = time.time()
if r.status_code != 200:
logger.warning(f'Chunk download for {job.guid} failed: status {r.status_code}, retrying...')
continue
else:
compressed = len(r.content)
chunk = Chunk.read_buffer(r.content)
break
else:
raise TimeoutError('Max retries reached')
except Exception as e:
logger.error(f'Job for {job.guid} failed with: {e!r}, fetching next one...')
# add failed job to result queue to be requeued
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
except KeyboardInterrupt:
logger.warning('Immediate exit requested, quitting...')
break
if not chunk:
logger.warning(f'Chunk somehow None?')
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
continue
# decompress stuff
try:
size = len(chunk.data)
if size > job.shm.size:
logger.fatal(f'Downloaded chunk is longer than SharedMemorySegment!')
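# copy the decompressed chunk data into this worker's shared memory segment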
self.shm.buf[job.shm.offset:job.shm.offset + size] = bytes(chunk.data)
del chunk
self.o_q.put(DownloaderTaskResult(success=True, chunk_guid=job.guid, shm=job.shm,
url=job.url, size=size, compressed_size=compressed,
time_delta=dl_end - dl_start))
except Exception as e:
logger.warning(f'Job for {job.guid} failed with: {e!r}, fetching next one...')
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
continue
except KeyboardInterrupt:
logger.warning('Immediate exit requested, quitting...')
break
self.shm.close()
class FileWorker(Process):
def __init__(self, queue, out_queue, base_path, shm, cache_path=None, logging_queue=None):
super().__init__(name='FileWorker')
self.q = queue
self.o_q = out_queue
self.base_path = base_path
self.cache_path = cache_path if cache_path else os.path.join(base_path, '.cache')
self.shm = SharedMemory(name=shm)
self.log_level = logging.getLogger().level
self.logging_queue = logging_queue
def run(self):
# we have to fix up the logger before we can start
_root = logging.getLogger()
_root.handlers = []
_root.addHandler(QueueHandler(self.logging_queue))
logger = logging.getLogger(self.name)
logger.setLevel(self.log_level)
logger.debug(f'File worker reporting for duty!')
last_filename = ''
current_file = None
while True:
try:
try:
j = self.q.get(timeout=10.0)
except Empty:
logger.warning('Writer queue empty!')
continue
if j.kill:
if current_file:
current_file.close()
self.o_q.put(WriterTaskResult(success=True, kill=True))
break
# make directories if required
path = os.path.split(j.filename)[0]
if not os.path.exists(os.path.join(self.base_path, path)):
os.makedirs(os.path.join(self.base_path, path))
full_path = os.path.join(self.base_path, j.filename)
if j.empty: # just create an empty file
open(full_path, 'a').close()
self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
continue
elif j.open:
if current_file:
logger.warning(f'Opening new file {j.filename} without closing previous! {last_filename}')
current_file.close()
current_file = open(full_path, 'wb')
last_filename = j.filename
self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
continue
elif j.close:
if current_file:
current_file.close()
current_file = None
else:
logger.warning(f'Asking to close file that is not open: {j.filename}')
self.o_q.put(WriterTaskResult(success=True, filename=j.filename, closed=True))
continue
elif j.rename:
if current_file:
logger.warning('Trying to rename file without closing first!')
current_file.close()
current_file = None
if j.delete:
try:
os.remove(full_path)
except OSError as e:
logger.error(f'Removing file failed: {e!r}')
self.o_q.put(WriterTaskResult(success=False, filename=j.filename))
continue
try:
os.rename(os.path.join(self.base_path, j.old_filename), full_path)
except OSError as e:
logger.error(f'Renaming file failed: {e!r}')
self.o_q.put(WriterTaskResult(success=False, filename=j.filename))
continue
self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
continue
elif j.delete:
if current_file:
logger.warning('Trying to delete file without closing first!')
current_file.close()
current_file = None
try:
os.remove(full_path)
except OSError as e:
if not j.silent:
logger.error(f'Removing file failed: {e!r}')
self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
continue
pre_write = post_write = 0
try:
if j.shm:
pre_write = time.time()
shm_offset = j.shm.offset + j.chunk_offset
shm_end = shm_offset + j.chunk_size
current_file.write(self.shm.buf[shm_offset:shm_end].tobytes())
post_write = time.time()
elif j.cache_file:
pre_write = time.time()
with open(os.path.join(self.cache_path, j.cache_file), 'rb') as f:
if j.chunk_offset:
f.seek(j.chunk_offset)
current_file.write(f.read(j.chunk_size))
post_write = time.time()
elif j.old_file:
pre_write = time.time()
with open(os.path.join(self.base_path, j.old_file), 'rb') as f:
if j.chunk_offset:
f.seek(j.chunk_offset)
current_file.write(f.read(j.chunk_size))
post_write = time.time()
except Exception as e:
logger.warning(f'Something in writing a file failed: {e!r}')
self.o_q.put(WriterTaskResult(success=False, filename=j.filename,
chunk_guid=j.chunk_guid,
release_memory=j.release_memory,
shm=j.shm, size=j.chunk_size,
time_delta=post_write-pre_write))
else:
self.o_q.put(WriterTaskResult(success=True, filename=j.filename,
chunk_guid=j.chunk_guid,
release_memory=j.release_memory,
shm=j.shm, size=j.chunk_size,
time_delta=post_write-pre_write))
except Exception as e:
logger.warning(f'Job {j.filename} failed with: {e!r}, fetching next one...')
self.o_q.put(WriterTaskResult(success=False, filename=j.filename, chunk_guid=j.chunk_guid))
try:
if current_file:
current_file.close()
current_file = None
except Exception as e:
logger.error(f'Closing file after error failed: {e!r}')
except KeyboardInterrupt:
logger.warning('Immediate exit requested, quitting...')
if current_file:
current_file.close()
return

82
legendary/lfs/egl.py Normal file

@@ -0,0 +1,82 @@
# coding: utf-8
import configparser
import json
import os
from typing import List
from legendary.models.egl import EGLManifest
class EPCLFS:
def __init__(self):
if os.name == 'nt':
self.appdata_path = os.path.expandvars(
r'%LOCALAPPDATA%\EpicGamesLauncher\Saved\Config\Windows'
)
self.programdata_path = os.path.expandvars(
r'%PROGRAMDATA%\Epic\EpicGamesLauncher\Data\Manifests'
)
else:
self.appdata_path = self.programdata_path = None
self.config = configparser.ConfigParser(strict=False)
self.config.optionxform = lambda option: option
self.manifests = dict()
def read_config(self):
if not self.appdata_path:
raise ValueError('EGS AppData path is not set')
self.config.read(os.path.join(self.appdata_path, 'GameUserSettings.ini'))
def save_config(self):
if not self.appdata_path:
raise ValueError('EGS AppData path is not set')
with open(os.path.join(self.appdata_path, 'GameUserSettings.ini'), 'w') as f:
self.config.write(f, space_around_delimiters=False)
def read_manifests(self):
if not self.programdata_path:
raise ValueError('EGS ProgramData path is not set')
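# the Epic Games Launcher stores one JSON manifest (*.item file) per installed game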
for f in os.listdir(self.programdata_path):
if f.endswith('.item'):
data = json.load(open(os.path.join(self.programdata_path, f)))
self.manifests[data['AppName']] = data
def get_manifests(self) -> List[EGLManifest]:
if not self.manifests:
self.read_manifests()
return [EGLManifest.from_json(m) for m in self.manifests.values()]
def get_manifest(self, app_name) -> EGLManifest:
if not self.manifests:
self.read_manifests()
if app_name in self.manifests:
return EGLManifest.from_json(self.manifests[app_name])
else:
raise ValueError('Cannot find manifest')
def set_manifest(self, manifest: EGLManifest):
if not self.programdata_path:
raise ValueError('EGS ProgramData path is not set')
manifest_data = manifest.to_json()
self.manifests[manifest.app_name] = manifest_data
with open(os.path.join(self.programdata_path, f'{manifest.installation_guid}.item'), 'w') as f:
json.dump(manifest_data, f, indent=4, sort_keys=True)
def delete_manifest(self, app_name):
if not self.manifests:
self.read_manifests()
if app_name not in self.manifests:
raise ValueError('AppName is not in manifests!')
manifest = EGLManifest.from_json(self.manifests.pop(app_name))
os.remove(os.path.join(self.programdata_path, f'{manifest.installation_guid}.item'))

268
legendary/lfs/lgndry.py Normal file

@@ -0,0 +1,268 @@
# coding: utf-8
import json
import os
import configparser
import logging
from pathlib import Path
from legendary.models.game import *
from legendary.utils.lfs import clean_filename
class LGDLFS:
def __init__(self):
self.log = logging.getLogger('LGDLFS')
if config_path := os.environ.get('XDG_CONFIG_HOME'):
self.path = os.path.join(config_path, 'legendary')
else:
self.path = os.path.expanduser('~/.config/legendary')
# EGS user info
self._user_data = None
# EGS entitlements
self._entitlements = None
# EGS asset data
self._assets = None
# EGS metadata
self._game_metadata = dict()
# Config with game specific settings (e.g. start parameters, env variables)
self.config = configparser.ConfigParser(comment_prefixes='/', allow_no_value=True)
self.config.optionxform = str
# ensure folders exist.
for f in ['', 'manifests', 'metadata', 'tmp']:
if not os.path.exists(os.path.join(self.path, f)):
os.makedirs(os.path.join(self.path, f))
# if "old" folder exists migrate files and remove it
if os.path.exists(os.path.join(self.path, 'manifests', 'old')):
self.log.info('Migrating manifest files from old folders to new, please wait...')
# remove unversioned manifest files
for _f in os.listdir(os.path.join(self.path, 'manifests')):
if '.manifest' not in _f:
continue
if '_' not in _f or (_f.startswith('UE_') and _f.count('_') < 2):
self.log.debug(f'Deleting "{_f}" ...')
os.remove(os.path.join(self.path, 'manifests', _f))
# move files from "old" to the base folder
for _f in os.listdir(os.path.join(self.path, 'manifests', 'old')):
try:
self.log.debug(f'Renaming "{_f}"')
os.rename(os.path.join(self.path, 'manifests', 'old', _f),
os.path.join(self.path, 'manifests', _f))
except Exception as e:
self.log.warning(f'Renaming manifest file "{_f}" failed: {e!r}')
# remove "old" folder
try:
os.removedirs(os.path.join(self.path, 'manifests', 'old'))
except Exception as e:
self.log.warning(f'Removing "{os.path.join(self.path, "manifests", "old")}" folder failed: '
f'{e!r}, please remove manually')
# try loading config
self.config.read(os.path.join(self.path, 'config.ini'))
# make sure "Legendary" section exists
if 'Legendary' not in self.config:
self.config['Legendary'] = dict()
try:
self._installed = json.load(open(os.path.join(self.path, 'installed.json')))
except Exception as e: # todo do not do this
self._installed = None
# load existing app metadata
for gm_file in os.listdir(os.path.join(self.path, 'metadata')):
try:
_meta = json.load(open(os.path.join(self.path, 'metadata', gm_file)))
self._game_metadata[_meta['app_name']] = _meta
except Exception as e:
self.log.debug(f'Loading game meta file "{gm_file}" failed: {e!r}')
@property
def userdata(self):
if self._user_data is not None:
return self._user_data
try:
self._user_data = json.load(open(os.path.join(self.path, 'user.json')))
return self._user_data
except Exception as e:
self.log.debug(f'Failed to load user data: {e!r}')
return None
@userdata.setter
def userdata(self, userdata):
if userdata is None:
raise ValueError('Userdata is none!')
self._user_data = userdata
json.dump(userdata, open(os.path.join(self.path, 'user.json'), 'w'),
indent=2, sort_keys=True)
def invalidate_userdata(self):
self._user_data = None
if os.path.exists(os.path.join(self.path, 'user.json')):
os.remove(os.path.join(self.path, 'user.json'))
@property
def entitlements(self):
if self._entitlements is not None:
return self._entitlements
try:
self._entitlements = json.load(open(os.path.join(self.path, 'entitlements.json')))
return self._entitlements
except Exception as e:
self.log.debug(f'Failed to load entitlements data: {e!r}')
return None
@entitlements.setter
def entitlements(self, entitlements):
if entitlements is None:
raise ValueError('Entitlements is none!')
self._entitlements = entitlements
json.dump(entitlements, open(os.path.join(self.path, 'entitlements.json'), 'w'),
indent=2, sort_keys=True)
@property
def assets(self):
if self._assets is None:
try:
self._assets = [GameAsset.from_json(a) for a in
json.load(open(os.path.join(self.path, 'assets.json')))]
except Exception as e:
self.log.debug(f'Failed to load assets data: {e!r}')
return None
return self._assets
@assets.setter
def assets(self, assets):
if assets is None:
raise ValueError('Assets is none!')
self._assets = assets
json.dump([a.__dict__ for a in self._assets],
open(os.path.join(self.path, 'assets.json'), 'w'),
indent=2, sort_keys=True)
def _get_manifest_filename(self, app_name, version):
fname = clean_filename(f'{app_name}_{version}')
return os.path.join(self.path, 'manifests', f'{fname}.manifest')
def load_manifest(self, app_name, version):
try:
return open(self._get_manifest_filename(app_name, version), 'rb').read()
except FileNotFoundError: # all other errors should propagate
return None
def save_manifest(self, app_name, manifest_data, version):
with open(self._get_manifest_filename(app_name, version), 'wb') as f:
f.write(manifest_data)
def get_game_meta(self, app_name):
_meta = self._game_metadata.get(app_name, None)
if _meta:
return Game.from_json(_meta)
return None
def set_game_meta(self, app_name, meta):
json_meta = meta.__dict__
self._game_metadata[app_name] = json_meta
meta_file = os.path.join(self.path, 'metadata', f'{app_name}.json')
json.dump(json_meta, open(meta_file, 'w'), indent=2, sort_keys=True)
def delete_game_meta(self, app_name):
if app_name in self._game_metadata:
del self._game_metadata[app_name]
meta_file = os.path.join(self.path, 'metadata', f'{app_name}.json')
if os.path.exists(meta_file):
os.remove(meta_file)
else:
raise ValueError(f'Game {app_name} does not exist in metadata DB!')
def get_tmp_path(self):
return os.path.join(self.path, 'tmp')
def clean_tmp_data(self):
for f in os.listdir(os.path.join(self.path, 'tmp')):
try:
os.remove(os.path.join(self.path, 'tmp', f))
except Exception as e:
self.log.warning(f'Failed to delete file "{f}": {e!r}')
def clean_metadata(self, app_names):
for f in os.listdir(os.path.join(self.path, 'metadata')):
app_name = f.rpartition('.')[0]
if app_name not in app_names:
try:
os.remove(os.path.join(self.path, 'metadata', f))
except Exception as e:
self.log.warning(f'Failed to delete file "{f}": {e!r}')
def clean_manifests(self, in_use):
in_use_files = set(f'{clean_filename(f"{app_name}_{version}")}.manifest' for app_name, version in in_use)
for f in os.listdir(os.path.join(self.path, 'manifests')):
if f not in in_use_files:
try:
os.remove(os.path.join(self.path, 'manifests', f))
except Exception as e:
self.log.warning(f'Failed to delete file "{f}": {e!r}')
def get_installed_game(self, app_name):
if self._installed is None:
try:
self._installed = json.load(open(os.path.join(self.path, 'installed.json')))
except Exception as e:
self.log.debug(f'Failed to load installed game data: {e!r}')
return None
game_json = self._installed.get(app_name, None)
if game_json:
return InstalledGame.from_json(game_json)
return None
def set_installed_game(self, app_name, install_info):
if self._installed is None:
self._installed = dict()
if app_name in self._installed:
self._installed[app_name].update(install_info.__dict__)
else:
self._installed[app_name] = install_info.__dict__
json.dump(self._installed, open(os.path.join(self.path, 'installed.json'), 'w'),
indent=2, sort_keys=True)
def remove_installed_game(self, app_name):
if self._installed is None:
self.log.warning('Trying to remove a game, but no installed games?!')
return
if app_name in self._installed:
del self._installed[app_name]
else:
self.log.warning(f'Trying to remove non-installed game: {app_name}')
return
json.dump(self._installed, open(os.path.join(self.path, 'installed.json'), 'w'),
indent=2, sort_keys=True)
def get_installed_list(self):
if not self._installed:
return []
return [InstalledGame.from_json(i) for i in self._installed.values()]
def save_config(self):
with open(os.path.join(self.path, 'config.ini'), 'w') as cf:
self.config.write(cf)
def get_dir_size(self):
return sum(f.stat().st_size for f in Path(self.path).glob('**/*') if f.is_file())

150
legendary/models/chunk.py Normal file

@@ -0,0 +1,150 @@
# coding: utf-8
import struct
import zlib
from hashlib import sha1
from io import BytesIO
from uuid import uuid4
from legendary.utils.rolling_hash import get_hash
# ToDo do some reworking to make this more memory efficient
class Chunk:
header_magic = 0xB1FE3AA2
def __init__(self):
self.header_version = 3
self.header_size = 0
self.compressed_size = 0
self.hash = 0
self.stored_as = 0
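# new chunks get a random GUID, stored as four 32-bit integers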
self.guid = struct.unpack('>IIII', uuid4().bytes)
# 0x1 = rolling hash, 0x2 = sha hash, 0x3 = both
self.hash_type = 0
self.sha_hash = None
self.uncompressed_size = 1024 * 1024
self._guid_str = ''
self._guid_num = 0
self._bio = None
self._data = None
@property
def data(self):
if self._data:
return self._data
if self.compressed:
self._data = zlib.decompress(self._bio.read())
else:
self._data = self._bio.read()
# close BytesIO with raw data since we no longer need it
self._bio.close()
self._bio = None
return self._data
@data.setter
def data(self, value: bytes):
if len(value) > 1024*1024:
raise ValueError('Provided data is too large (> 1 MiB)!')
# data is now uncompressed
if self.compressed:
self.stored_as ^= 0x1
# pad data to 1 MiB
if len(value) < 1024 * 1024:
value += b'\x00' * (1024 * 1024 - len(value))
# recalculate hashes
self.hash = get_hash(value)
self.sha_hash = sha1(value).digest()
self.hash_type = 0x3
self._data = value
@property
def guid_str(self):
if not self._guid_str:
self._guid_str = '-'.join('{:08x}'.format(g) for g in self.guid)
return self._guid_str
@property
def guid_num(self):
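# lazily pack the four 32-bit GUID parts into a single 128-bit integer used as a compact lookup key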
if not self._guid_num:
self._guid_num = self.guid[3] + (self.guid[2] << 32) + (self.guid[1] << 64) + (self.guid[0] << 96)
return self._guid_num
@property
def compressed(self):
return self.stored_as & 0x1
@classmethod
def read_buffer(cls, data):
_sio = BytesIO(data)
return cls.read(_sio)
@classmethod
def read(cls, bio):
head_start = bio.tell()
if struct.unpack('<I', bio.read(4))[0] != cls.header_magic:
raise ValueError('Chunk magic doesn\'t match!')
_chunk = cls()
_chunk._bio = bio
_chunk.header_version = struct.unpack('<I', bio.read(4))[0]
_chunk.header_size = struct.unpack('<I', bio.read(4))[0]
_chunk.compressed_size = struct.unpack('<I', bio.read(4))[0]
_chunk.guid = struct.unpack('<IIII', bio.read(16))
_chunk.hash = struct.unpack('<Q', bio.read(8))[0]
_chunk.stored_as = struct.unpack('B', bio.read(1))[0]
if _chunk.header_version >= 2:
_chunk.sha_hash = bio.read(20)
_chunk.hash_type = struct.unpack('B', bio.read(1))[0]
if _chunk.header_version >= 3:
_chunk.uncompressed_size = struct.unpack('<I', bio.read(4))[0]
if bio.tell() - head_start != _chunk.header_size:
raise ValueError('Did not read entire chunk header!')
return _chunk
def write(self, fp=None, compress=True):
if not fp:
bio = BytesIO()
else:
bio = fp
self.uncompressed_size = self.compressed_size = len(self.data)
if compress or self.compressed:
self._data = zlib.compress(self.data)
self.stored_as |= 0x1
self.compressed_size = len(self._data)
bio.write(struct.pack('<I', self.header_magic))
# we only serialize the latest version so version/size are hardcoded to 3/66
bio.write(struct.pack('<I', 3))
bio.write(struct.pack('<I', 66))
bio.write(struct.pack('<I', self.compressed_size))
bio.write(struct.pack('<IIII', *self.guid))
bio.write(struct.pack('<Q', self.hash))
bio.write(struct.pack('<B', self.stored_as))
# header version 2 stuff
bio.write(self.sha_hash)
bio.write(struct.pack('B', self.hash_type))
# header version 3 stuff
bio.write(struct.pack('<I', self.uncompressed_size))
# finally, add the data
bio.write(self._data)
if not fp:
return bio.getvalue()
else:
return bio.tell()

174
legendary/models/downloading.py Normal file

@@ -0,0 +1,174 @@
# coding: utf-8
class DownloaderTask:
def __init__(self, url=None, chunk_guid=None, shm=None, kill=False):
self.url = url
self.guid = chunk_guid
self.shm = shm
self.kill = kill
class DownloaderTaskResult:
def __init__(self, success, chunk_guid, shm, url, size=None,
compressed_size=None, time_delta=None):
self.success = success
self.shm = shm
self.size = size
self.compressed_size = compressed_size
self.guid = chunk_guid
self.time_delta = time_delta
self.url = url
class WriterTask:
"""
Writing task for FileWorker, including some metadata that is required.
"""
def __init__(self, filename, chunk_offset=0, chunk_size=0, chunk_guid=None, close=False,
shared_memory=None, cache_file='', old_file='', release_memory=False, rename=False,
empty=False, kill=False, delete=False, old_filename='', fopen=False, silent=False):
self.filename = filename
self.empty = empty
self.shm = shared_memory
self.chunk_offset = chunk_offset
self.chunk_size = chunk_size
self.chunk_guid = chunk_guid
self.release_memory = release_memory
# reading from a cached chunk instead of memory
self.cache_file = cache_file
self.old_file = old_file
self.open = fopen
self.close = close
self.delete = delete
self.rename = rename
self.old_filename = old_filename
self.silent = silent # disable logging
self.kill = kill # final task for worker (quit)
class WriterTaskResult:
def __init__(self, success, filename='', chunk_guid='',
release_memory=False, shm=None, size=0,
kill=False, closed=False, time_delta=None):
self.success = success
self.filename = filename
self.chunk_guid = chunk_guid
self.release_memory = release_memory
self.shm = shm
self.size = size
self.kill = kill
self.closed = closed
self.time_delta = time_delta
class UIUpdate:
"""
Status update object sent from the manager to the CLI/GUI to update status indicators
"""
def __init__(self, progress, runtime, estimated_time_left, processed_chunks, chunk_tasks,
total_downloaded, total_written, cache_usage, active_tasks, download_speed,
download_decompressed_speed, write_speed, read_speed, current_filename=''):
self.progress = progress
self.runtime = runtime
self.estimated_time_left = estimated_time_left
self.processed_chunks = processed_chunks
self.chunk_tasks = chunk_tasks
self.total_downloaded = total_downloaded
self.total_written = total_written
self.cache_usage = cache_usage
self.active_tasks = active_tasks
self.download_speed = download_speed
self.download_decompressed_speed = download_decompressed_speed
self.write_speed = write_speed
self.read_speed = read_speed
self.current_filename = current_filename
class SharedMemorySegment:
"""
Segment of the shared memory used for one Chunk
"""
def __init__(self, offset=0, end=1024 * 1024):
self.offset = offset
self.end = end
@property
def size(self):
return self.end - self.offset
class ChunkTask:
def __init__(self, chunk_guid, chunk_offset=0, chunk_size=0, cleanup=False, chunk_file=None):
"""
Download manager chunk task
:param chunk_guid: GUID of chunk
:param cleanup: whether or not this chunk can be removed from disk/memory after it has been written
:param chunk_offset: Offset into file or shared memory
:param chunk_size: Size to read from file or shared memory
:param chunk_file: Either cache or existing game file this chunk is read from if not using shared memory
"""
self.chunk_guid = chunk_guid
self.cleanup = cleanup
self.chunk_offset = chunk_offset
self.chunk_size = chunk_size
self.chunk_file = chunk_file
class FileTask:
def __init__(self, filename, delete=False, empty=False, fopen=False, close=False,
rename=False, temporary_filename=None, silent=False):
"""
Download manager Task for a file
:param filename: name of the file
:param delete: if this is a file to be deleted, if rename is true, delete filename before renaming
:param empty: if this is an empty file that just needs to be "touch"-ed (may not have chunk tasks)
:param temporary_filename: If rename is true: Filename to rename from.
"""
self.filename = filename
self.delete = delete
self.empty = empty
self.open = fopen
self.close = close
self.rename = rename
self.temporary_filename = temporary_filename
self.silent = silent
@property
def is_reusing(self):
return self.temporary_filename is not None
class AnalysisResult:
def __init__(self):
self.dl_size = 0
self.uncompressed_dl_size = 0
self.install_size = 0
self.reuse_size = 0
self.biggest_file_size = 0
self.unchanged_size = 0
self.biggest_chunk = 0
self.min_memory = 0
self.num_chunks = 0
self.num_chunks_cache = 0
self.num_files = 0
self.removed = 0
self.added = 0
self.changed = 0
self.unchanged = 0
self.manifest_comparison = None
class ConditionCheckResult:
"""Result object used in Core to identify problems that would prevent an installation from succeeding"""
def __init__(self, failures=None, warnings=None):
self.failures = failures
self.warnings = warnings
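For orientation, a plausible sequence of WriterTask objects for a single two-chunk file might look like the sketch below; it is illustrative only, and the GUIDs and shared-memory segments are placeholders rather than values taken from this code:

tasks = [
    WriterTask('Game/Binaries/Example.exe', fopen=True),
    WriterTask('Game/Binaries/Example.exe', chunk_guid=guid_a, shared_memory=segment_a,
               chunk_offset=0, chunk_size=1024 * 1024, release_memory=True),
    WriterTask('Game/Binaries/Example.exe', chunk_guid=guid_b, shared_memory=segment_b,
               chunk_offset=0, chunk_size=512 * 1024, release_memory=True),
    WriterTask('Game/Binaries/Example.exe', close=True),
]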

162
legendary/models/egl.py Normal file

@ -0,0 +1,162 @@
from copy import deepcopy
from distutils.util import strtobool
from legendary.models.game import InstalledGame, Game
_template = {
'AppCategories': ['public', 'games', 'applications'],
'AppName': '',
'AppVersionString': '',
'BaseURLs': [],
'BuildLabel': '',
'CatalogItemId': '',
'CatalogNamespace': '',
'ChunkDbs': [],
'CompatibleApps': [],
'DisplayName': '',
'FormatVersion': 0,
'FullAppName': '',
'HostInstallationGuid': '',
'InstallComponents': [],
'InstallLocation': '',
'InstallSessionId': '',
'InstallSize': 0,
'InstallTags': [],
'InstallationGuid': '',
'LaunchCommand': '',
'LaunchExecutable': '',
'MainGameAppName': '',
'MainWindowProcessName': '',
'MandatoryAppFolderName': '',
'ManifestLocation': '',
'OwnershipToken': '',
'PrereqIds': [],
'ProcessNames': [],
'StagingLocation': '',
'TechnicalType': '',
'VaultThumbnailUrl': '',
'VaultTitleText': '',
'bCanRunOffline': True,
'bIsApplication': True,
'bIsExecutable': True,
'bIsIncompleteInstall': False,
'bIsManaged': False,
'bNeedsValidation': False,
'bRequiresAuth': True
}
class EGLManifest:
def __init__(self):
self.app_name = None
self.app_version_string = None
self.base_urls = None
self.build_label = None
self.catalog_item_id = None
self.namespace = None
self.display_name = None
self.install_location = None
self.install_size = None
self.install_tags = None
self.installation_guid = None
self.launch_command = None
self.executable = None
self.main_game_appname = None
self.app_folder_name = None
self.manifest_location = None
self.ownership_token = None
self.staging_location = None
self.can_run_offline = None
self.is_incomplete_install = None
self.needs_validation = None
self.remainder = dict()
@classmethod
def from_json(cls, json: dict):
json = deepcopy(json)
tmp = cls()
tmp.app_name = json.pop('AppName')
tmp.app_version_string = json.pop('AppVersionString', None)
tmp.base_urls = json.pop('BaseURLs', list())
tmp.build_label = json.pop('BuildLabel', '')
tmp.catalog_item_id = json.pop('CatalogItemId', '')
tmp.namespace = json.pop('CatalogNamespace', '')
tmp.display_name = json.pop('DisplayName', '')
tmp.install_location = json.pop('InstallLocation', '')
tmp.install_size = json.pop('InstallSize', 0)
tmp.install_tags = json.pop('InstallTags', [])
tmp.installation_guid = json.pop('InstallationGuid', '')
tmp.launch_command = json.pop('LaunchCommand', '')
tmp.executable = json.pop('LaunchExecutable', '')
tmp.main_game_appname = json.pop('MainGameAppName', '')
tmp.app_folder_name = json.pop('MandatoryAppFolderName', '')
tmp.manifest_location = json.pop('ManifestLocation', '')
tmp.ownership_token = strtobool(json.pop('OwnershipToken', 'False'))
tmp.staging_location = json.pop('StagingLocation', '')
tmp.can_run_offline = json.pop('bCanRunOffline', True)
tmp.is_incomplete_install = json.pop('bIsIncompleteInstall', False)
tmp.needs_validation = json.pop('bNeedsValidation', False)
tmp.remainder = json.copy()
return tmp
def to_json(self) -> dict:
out = _template.copy()
out.update(self.remainder)
out['AppName'] = self.app_name
out['AppVersionString'] = self.app_version_string
out['BaseURLs'] = self.base_urls
out['BuildLabel'] = self.build_label
out['CatalogItemId'] = self.catalog_item_id
out['CatalogNamespace'] = self.namespace
out['DisplayName'] = self.display_name
out['InstallLocation'] = self.install_location
out['InstallSize'] = self.install_size
out['InstallTags'] = self.install_tags
out['InstallationGuid'] = self.installation_guid
out['LaunchCommand'] = self.launch_command
out['LaunchExecutable'] = self.executable
out['MainGameAppName'] = self.main_game_appname
out['MandatoryAppFolderName'] = self.app_folder_name
out['ManifestLocation'] = self.manifest_location
out['OwnershipToken'] = str(self.ownership_token).lower()
out['StagingLocation'] = self.staging_location
out['bCanRunOffline'] = self.can_run_offline
out['bIsIncompleteInstall'] = self.is_incomplete_install
out['bNeedsValidation'] = self.needs_validation
return out
@classmethod
def from_lgd_game(cls, game: Game, igame: InstalledGame):
tmp = cls()
tmp.app_name = game.app_name
tmp.app_version_string = igame.version
tmp.base_urls = igame.base_urls
tmp.build_label = 'Live'
tmp.catalog_item_id = game.asset_info.catalog_item_id
tmp.namespace = game.asset_info.namespace
tmp.display_name = igame.title
tmp.install_location = igame.install_path
tmp.install_size = igame.install_size
tmp.install_tags = igame.install_tags
tmp.installation_guid = igame.egl_guid
tmp.launch_command = igame.launch_parameters
tmp.executable = igame.executable
tmp.main_game_appname = game.app_name # todo for DLC support this needs to be the base game
tmp.app_folder_name = game.metadata.get('customAttributes', {}).get('FolderName', {}).get('value', '')
tmp.manifest_location = igame.install_path + '/.egstore'
tmp.ownership_token = igame.requires_ot
tmp.staging_location = igame.install_path + '/.egstore/bps'
tmp.can_run_offline = igame.can_run_offline
tmp.is_incomplete_install = False
tmp.needs_validation = igame.needs_verification
return tmp
def to_lgd_igame(self) -> InstalledGame:
return InstalledGame(app_name=self.app_name, title=self.display_name, version=self.app_version_string,
base_urls=self.base_urls, install_path=self.install_location, executable=self.executable,
launch_parameters=self.launch_command, can_run_offline=self.can_run_offline,
requires_ot=self.ownership_token, is_dlc=False,
needs_verification=self.needs_validation, install_size=self.install_size,
egl_guid=self.installation_guid, install_tags=self.install_tags)
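As a usage sketch, the class above converts between the launcher's on-disk format and Legendary's own models; item_json is assumed to be a dict parsed from an EGL manifest (.item) file:

egl_entry = EGLManifest.from_json(item_json)
igame = egl_entry.to_lgd_igame()   # Legendary's InstalledGame representation
egl_json = egl_entry.to_json()     # back to the EGL on-disk format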


@ -0,0 +1,12 @@
# coding: utf-8
# ToDo more custom exceptions where it makes sense
class CaptchaError(Exception):
"""Raised by core if direct login fails"""
pass
class InvalidCredentialsError(Exception):
pass

146
legendary/models/game.py Normal file

@ -0,0 +1,146 @@
# coding: utf-8
from enum import Enum
class GameAsset:
def __init__(self):
self.app_name = ''
self.asset_id = ''
self.build_version = ''
self.catalog_item_id = ''
self.label_name = ''
self.namespace = ''
self.metadata = dict()
@classmethod
def from_egs_json(cls, json):
tmp = cls()
tmp.app_name = json.get('appName', '')
tmp.asset_id = json.get('assetId', '')
tmp.build_version = json.get('buildVersion', '')
tmp.catalog_item_id = json.get('catalogItemId', '')
tmp.label_name = json.get('labelName', '')
tmp.namespace = json.get('namespace', '')
tmp.metadata = json.get('metadata', {})
return tmp
@classmethod
def from_json(cls, json):
tmp = cls()
tmp.app_name = json.get('app_name', '')
tmp.asset_id = json.get('asset_id', '')
tmp.build_version = json.get('build_version', '')
tmp.catalog_item_id = json.get('catalog_item_id', '')
tmp.label_name = json.get('label_name', '')
tmp.namespace = json.get('namespace', '')
tmp.metadata = json.get('metadata', {})
return tmp
class Game:
def __init__(self, app_name='', app_title='', asset_info=None, app_version='', metadata=None):
self.metadata = dict() if metadata is None else metadata # store metadata from EGS
self.asset_info = asset_info if asset_info else GameAsset() # asset info from EGS
self.app_version = app_version
self.app_name = app_name
self.app_title = app_title
self.base_urls = [] # base urls for download, only really used when cached manifest is current
@property
def is_dlc(self):
return self.metadata and 'mainGameItem' in self.metadata
@property
def supports_cloud_saves(self):
return self.metadata and (self.metadata.get('customAttributes', {}).get('CloudSaveFolder') is not None)
@classmethod
def from_json(cls, json):
tmp = cls()
tmp.metadata = json.get('metadata', dict())
tmp.asset_info = GameAsset.from_json(json.get('asset_info', dict()))
tmp.app_name = json.get('app_name', 'undefined')
tmp.app_title = json.get('app_title', 'undefined')
tmp.app_version = json.get('app_version', 'undefined')
tmp.base_urls = json.get('base_urls', list())
return tmp
@property
def __dict__(self):
"""This is just here so asset_info gets turned into a dict as well"""
return dict(metadata=self.metadata, asset_info=self.asset_info.__dict__,
app_name=self.app_name, app_title=self.app_title,
app_version=self.app_version, base_urls=self.base_urls)
class InstalledGame:
def __init__(self, app_name='', title='', version='', manifest_path='', base_urls=None,
install_path='', executable='', launch_parameters='', prereq_info=None,
can_run_offline=False, requires_ot=False, is_dlc=False, save_path=None,
needs_verification=False, install_size=0, egl_guid='', install_tags=None):
self.app_name = app_name
self.title = title
self.version = version
self.manifest_path = manifest_path
self.base_urls = list() if not base_urls else base_urls
self.install_path = install_path
self.executable = executable
self.launch_parameters = launch_parameters
self.prereq_info = prereq_info
self.can_run_offline = can_run_offline
self.requires_ot = requires_ot
self.is_dlc = is_dlc
self.save_path = save_path
self.needs_verification = needs_verification
self.install_size = install_size
self.egl_guid = egl_guid
self.install_tags = install_tags if install_tags else []
@classmethod
def from_json(cls, json):
tmp = cls()
tmp.app_name = json.get('app_name', '')
tmp.version = json.get('version', '')
tmp.title = json.get('title', '')
tmp.manifest_path = json.get('manifest_path', '')
tmp.base_urls = json.get('base_urls', list())
tmp.install_path = json.get('install_path', '')
tmp.executable = json.get('executable', '')
tmp.launch_parameters = json.get('launch_parameters', '')
tmp.prereq_info = json.get('prereq_info', None)
tmp.can_run_offline = json.get('can_run_offline', False)
tmp.requires_ot = json.get('requires_ot', False)
tmp.is_dlc = json.get('is_dlc', False)
tmp.save_path = json.get('save_path', None)
tmp.needs_verification = json.get('needs_verification', False) is True
tmp.install_size = json.get('install_size', 0)
tmp.egl_guid = json.get('egl_guid', '')
tmp.install_tags = json.get('install_tags', [])
return tmp
class SaveGameFile:
def __init__(self, app_name='', filename='', manifest='', datetime=None):
self.app_name = app_name
self.filename = filename
self.manifest_name = manifest
self.datetime = datetime
class SaveGameStatus(Enum):
LOCAL_NEWER = 0
REMOTE_NEWER = 1
SAME_AGE = 2
NO_SAVE = 3
class VerifyResult(Enum):
HASH_MATCH = 0
HASH_MISMATCH = 1
FILE_MISSING = 2
OTHER_ERROR = 3
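Because Game overrides __dict__, instances (including their GameAsset) serialize straight to JSON and can be restored again; a minimal sketch with a placeholder app name:

import json

game = Game(app_name='ExampleApp', app_title='Example Title')
restored = Game.from_json(json.loads(json.dumps(game.__dict__)))
assert restored.app_name == 'ExampleApp'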


@ -0,0 +1,178 @@
# coding: utf-8
import json
import struct
from copy import deepcopy
from legendary.models.manifest import (
Manifest, ManifestMeta, CDL, ChunkPart, ChunkInfo, FML, FileManifest, CustomFields
)
def blob_to_num(in_str):
"""
    The JSON manifests use a rather strange format for storing numbers:
    essentially, each byte is rendered as %03d and the results are concatenated into a string,
    instead of simply storing the number itself.
    The value is still little endian, so each group has to be shifted into place.
"""
num = 0
shift = 0
for i in range(0, len(in_str), 3):
num += (int(in_str[i:i + 3]) << shift)
shift += 8
return num
def guid_from_json(in_str):
return struct.unpack('>IIII', bytes.fromhex(in_str))
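# Worked example of the encoding described above (illustrative, not part of this module):
# '123247059000' encodes the byte values 123, 247, 59, 0; read little endian this is
# 123 + (247 << 8) + (59 << 16) + (0 << 24) = 0x003BF77B = 3929979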
class JSONManifest(Manifest):
"""
Manifest-compatible reader for JSON based manifests
"""
def __init__(self):
super().__init__()
self.json_data = None
@classmethod
def read_all(cls, manifest):
_m = cls.read(manifest)
_tmp = deepcopy(_m.json_data)
_m.meta = JSONManifestMeta.read(_tmp)
_m.chunk_data_list = JSONCDL.read(_tmp, manifest_version=_m.version)
_m.file_manifest_list = JSONFML.read(_tmp)
_m.custom_fields = CustomFields()
_m.custom_fields._dict = _tmp.pop('CustomFields', dict())
if _tmp.keys():
print(f'Did not read JSON keys: {_tmp.keys()}!')
# clear raw data after manifest has been loaded
_m.data = b''
_m.json_data = None
return _m
@classmethod
def read(cls, manifest):
_manifest = cls()
_manifest.data = manifest
_manifest.json_data = json.loads(manifest.decode('utf-8'))
_manifest.stored_as = 0 # never compressed
_manifest.version = blob_to_num(_manifest.json_data.get('ManifestFileVersion', '013000000000'))
return _manifest
def write(self, *args, **kwargs):
# The version here only matters for the manifest header,
# the feature level in meta determines chunk folders etc.
# So all that's required for successful serialization is
# setting it to something high enough to be a binary manifest
self.version = 18
return super().write(*args, **kwargs)
class JSONManifestMeta(ManifestMeta):
def __init__(self):
super().__init__()
@classmethod
def read(cls, json_data):
_meta = cls()
_meta.feature_level = blob_to_num(json_data.pop('ManifestFileVersion', '013000000000'))
_meta.is_file_data = json_data.pop('bIsFileData', False)
_meta.app_id = blob_to_num(json_data.pop('AppID', '000000000000'))
_meta.app_name = json_data.pop('AppNameString', '')
_meta.build_version = json_data.pop('BuildVersionString', '')
_meta.launch_exe = json_data.pop('LaunchExeString', '')
_meta.launch_command = json_data.pop('LaunchCommand', '')
_meta.prereq_ids = json_data.pop('PrereqIds', list())
_meta.prereq_name = json_data.pop('PrereqName', '')
_meta.prereq_path = json_data.pop('PrereqPath', '')
_meta.prereq_args = json_data.pop('PrereqArgs', '')
return _meta
class JSONCDL(CDL):
def __init__(self):
super().__init__()
@classmethod
def read(cls, json_data, manifest_version=13):
_cdl = cls()
_cdl._manifest_version = manifest_version
_cdl.count = len(json_data['ChunkFilesizeList'])
cfl = json_data.pop('ChunkFilesizeList')
chl = json_data.pop('ChunkHashList')
csl = json_data.pop('ChunkShaList')
dgl = json_data.pop('DataGroupList')
_guids = list(cfl.keys())
for guid in _guids:
_ci = ChunkInfo(manifest_version=manifest_version)
_ci.guid = guid_from_json(guid)
_ci.file_size = blob_to_num(cfl.pop(guid))
_ci.hash = blob_to_num(chl.pop(guid))
_ci.sha_hash = bytes.fromhex(csl.pop(guid))
_ci.group_num = blob_to_num(dgl.pop(guid))
_ci.window_size = 1024*1024
_cdl.elements.append(_ci)
for _dc in (cfl, chl, csl, dgl):
if _dc:
print(f'Non-consumed CDL stuff: {_dc}')
return _cdl
class JSONFML(FML):
def __init__(self):
super().__init__()
@classmethod
def read(cls, json_data):
_fml = cls()
_fml.count = len(json_data['FileManifestList'])
for _fmj in json_data.pop('FileManifestList'):
_fm = FileManifest()
_fm.filename = _fmj.pop('Filename', '')
_fm.hash = blob_to_num(_fmj.pop('FileHash')).to_bytes(160//8, 'little')
_fm.flags |= int(_fmj.pop('bIsReadOnly', False))
_fm.flags |= int(_fmj.pop('bIsCompressed', False)) << 1
_fm.flags |= int(_fmj.pop('bIsUnixExecutable', False)) << 2
_fm.file_size = 0
_fm.chunk_parts = []
_fm.install_tags = _fmj.pop('InstallTags', list())
_offset = 0
for _cpj in _fmj.pop('FileChunkParts'):
_cp = ChunkPart()
_cp.guid = guid_from_json(_cpj.pop('Guid'))
_cp.offset = blob_to_num(_cpj.pop('Offset'))
_cp.size = blob_to_num(_cpj.pop('Size'))
_cp.file_offset = _offset
_fm.file_size += _cp.size
if _cpj:
print(f'Non-read ChunkPart keys: {_cpj.keys()}')
_fm.chunk_parts.append(_cp)
_offset += _cp.size
if _fmj:
print(f'Non-read FileManifest keys: {_fmj.keys()}')
_fml.elements.append(_fm)
return _fml


@ -0,0 +1,739 @@
# coding: utf-8
import hashlib
import logging
import struct
import zlib
from base64 import b64encode
from io import BytesIO
logger = logging.getLogger('Manifest')
def read_fstring(bio):
length = struct.unpack('<i', bio.read(4))[0]
# if the length is negative the string is UTF-16 encoded, this was a pain to figure out.
if length < 0:
# utf-16 chars are 2 bytes wide but the length is # of characters, not bytes
# todo actually make sure utf-16 characters can't be longer than 2 bytes
length *= -2
s = bio.read(length - 2).decode('utf-16')
bio.seek(2, 1) # utf-16 strings have two byte null terminators
elif length > 0:
s = bio.read(length - 1).decode('ascii')
bio.seek(1, 1) # skip string null terminator
else: # empty string, no terminators or anything
s = ''
return s
def write_fstring(bio, string):
if not string:
bio.write(struct.pack('<i', 0))
return
try:
s = string.encode('ascii')
bio.write(struct.pack('<i', len(string) + 1))
bio.write(s)
bio.write(b'\x00')
except UnicodeEncodeError:
s = string.encode('utf-16le')
bio.write(struct.pack('<i', -(len(string) + 1)))
bio.write(s)
bio.write(b'\x00\x00')
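# Example of the resulting layout (illustrative): write_fstring(bio, 'Engine') emits
# b'\x07\x00\x00\x00' + b'Engine' + b'\x00' (length 7 includes the NUL terminator),
# while a non-ASCII string takes the UTF-16LE branch with a negative length field.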
def get_chunk_dir(version):
    # The lowest version I've ever seen was 12 (Unreal Tournament), but for completeness' sake all of them are left in
if version >= 15:
return 'ChunksV4'
elif version >= 6:
return 'ChunksV3'
elif version >= 3:
return 'ChunksV2'
else:
return 'Chunks'
class Manifest:
header_magic = 0x44BEC00C
def __init__(self):
self.header_size = 41
self.size_compressed = 0
self.size_uncompressed = 0
self.sha_hash = ''
self.stored_as = 0
self.version = 18
self.data = b''
# remainder
self.meta = None
self.chunk_data_list = None
self.file_manifest_list = None
self.custom_fields = None
@property
def compressed(self):
return self.stored_as & 0x1
@classmethod
def read_all(cls, data):
_m = cls.read(data)
_tmp = BytesIO(_m.data)
_m.meta = ManifestMeta.read(_tmp)
_m.chunk_data_list = CDL.read(_tmp, _m.meta.feature_level)
_m.file_manifest_list = FML.read(_tmp)
_m.custom_fields = CustomFields.read(_tmp)
unhandled_data = _tmp.read()
if unhandled_data:
logger.warning(f'Did not read {len(unhandled_data)} remaining bytes in manifest! '
f'This may not be a problem.')
# Throw this away since the raw data is no longer needed
_tmp.close()
del _tmp
_m.data = b''
return _m
@classmethod
def read(cls, data):
bio = BytesIO(data)
if struct.unpack('<I', bio.read(4))[0] != cls.header_magic:
raise ValueError('No header magic!')
_manifest = cls()
_manifest.header_size = struct.unpack('<I', bio.read(4))[0]
_manifest.size_uncompressed = struct.unpack('<I', bio.read(4))[0]
_manifest.size_compressed = struct.unpack('<I', bio.read(4))[0]
_manifest.sha_hash = bio.read(20)
_manifest.stored_as = struct.unpack('B', bio.read(1))[0]
_manifest.version = struct.unpack('<I', bio.read(4))[0]
if bio.tell() != _manifest.header_size:
logger.fatal(f'Did not read entire header {bio.tell()} != {_manifest.header_size}! '
f'Header version: {_manifest.version}, please report this on '
f'GitHub along with a sample of the problematic manifest!')
raise ValueError('Did not read complete manifest header!')
data = bio.read()
if _manifest.compressed:
_manifest.data = zlib.decompress(data)
dec_hash = hashlib.sha1(_manifest.data).hexdigest()
if dec_hash != _manifest.sha_hash.hex():
raise ValueError('Hash does not match!')
else:
_manifest.data = data
return _manifest
def write(self, fp=None, compress=True):
body_bio = BytesIO()
self.meta.write(body_bio)
self.chunk_data_list.write(body_bio)
self.file_manifest_list.write(body_bio)
self.custom_fields.write(body_bio)
self.data = body_bio.getvalue()
self.size_uncompressed = self.size_compressed = len(self.data)
self.sha_hash = hashlib.sha1(self.data).digest()
if self.compressed or compress:
self.stored_as |= 0x1
self.data = zlib.compress(self.data)
self.size_compressed = len(self.data)
if not fp:
bio = BytesIO()
else:
bio = fp
bio.write(struct.pack('<I', self.header_magic))
bio.write(struct.pack('<I', self.header_size))
bio.write(struct.pack('<I', self.size_uncompressed))
bio.write(struct.pack('<I', self.size_compressed))
bio.write(self.sha_hash)
bio.write(struct.pack('B', self.stored_as))
bio.write(struct.pack('<I', self.version))
bio.write(self.data)
if not fp:
return bio.getvalue()
else:
return bio.tell()
class ManifestMeta:
def __init__(self):
self.meta_size = 0
self.data_version = 0
self.feature_level = 18
self.is_file_data = False
self.app_id = 0
self.app_name = ''
self.build_version = ''
self.launch_exe = ''
self.launch_command = ''
self.prereq_ids = []
self.prereq_name = ''
self.prereq_path = ''
self.prereq_args = ''
# this build id is used for something called "delta file" which I guess I'll have to implement eventually
self._build_id = ''
@property
def build_id(self):
if self._build_id:
return self._build_id
# this took a while to figure out and get right and I'm still not sure if it'll work for all games :x
s = hashlib.sha1()
s.update(struct.pack('<I', self.app_id))
s.update(self.app_name.encode('utf-8'))
s.update(self.build_version.encode('utf-8'))
s.update(self.launch_exe.encode('utf-8'))
s.update(self.launch_command.encode('utf-8'))
self._build_id = b64encode(s.digest()).decode('ascii').replace('+', '-').replace('/', '_').replace('=', '')
return self._build_id
@classmethod
def read(cls, bio):
_meta = cls()
_meta.meta_size = struct.unpack('<I', bio.read(4))[0]
_meta.data_version = struct.unpack('B', bio.read(1))[0]
# Usually same as manifest version, but can be different
# e.g. if JSON manifest has been converted to binary manifest.
_meta.feature_level = struct.unpack('<I', bio.read(4))[0]
# As far as I can tell this was used for very old manifests that didn't use chunks at all
_meta.is_file_data = struct.unpack('B', bio.read(1))[0] == 1
# 0 for most apps, generally not used
_meta.app_id = struct.unpack('<I', bio.read(4))[0]
_meta.app_name = read_fstring(bio)
_meta.build_version = read_fstring(bio)
_meta.launch_exe = read_fstring(bio)
_meta.launch_command = read_fstring(bio)
# This is a list though I've never seen more than one entry
entries = struct.unpack('<I', bio.read(4))[0]
for i in range(entries):
_meta.prereq_ids.append(read_fstring(bio))
_meta.prereq_name = read_fstring(bio)
_meta.prereq_path = read_fstring(bio)
_meta.prereq_args = read_fstring(bio)
# apparently there's a newer version that actually stores *a* build id.
if _meta.data_version > 0:
_meta._build_id = read_fstring(bio)
if bio.tell() != _meta.meta_size:
raise ValueError('Did not read entire meta!')
return _meta
def write(self, bio):
meta_start = bio.tell()
bio.write(struct.pack('<I', 0)) # placeholder size
bio.write(struct.pack('B', self.data_version))
bio.write(struct.pack('<I', self.feature_level))
bio.write(struct.pack('B', self.is_file_data))
bio.write(struct.pack('<I', self.app_id))
write_fstring(bio, self.app_name)
write_fstring(bio, self.build_version)
write_fstring(bio, self.launch_exe)
write_fstring(bio, self.launch_command)
bio.write(struct.pack('<I', len(self.prereq_ids)))
        for prereq_id in self.prereq_ids:
            write_fstring(bio, prereq_id)
write_fstring(bio, self.prereq_name)
write_fstring(bio, self.prereq_path)
write_fstring(bio, self.prereq_args)
if self.data_version > 0:
write_fstring(bio, self.build_id)
meta_end = bio.tell()
bio.seek(meta_start)
bio.write(struct.pack('<I', meta_end - meta_start))
bio.seek(meta_end)
class CDL:
def __init__(self):
self.version = 0
self.size = 0
self.count = 0
self.elements = []
self._manifest_version = 18
self._guid_map = None
self._guid_int_map = None
self._path_map = None
def get_chunk_by_path(self, path):
if not self._path_map:
self._path_map = dict()
for index, chunk in enumerate(self.elements):
self._path_map[chunk.path] = index
index = self._path_map.get(path, None)
if index is None:
raise ValueError(f'Invalid path! "{path}"')
return self.elements[index]
def get_chunk_by_guid(self, guid):
"""
Get chunk by GUID string or number, creates index of chunks on first call
Integer GUIDs are usually faster and require less memory, use those when possible.
:param guid:
:return:
"""
if isinstance(guid, int):
return self.get_chunk_by_guid_num(guid)
else:
return self.get_chunk_by_guid_str(guid)
def get_chunk_by_guid_str(self, guid):
if not self._guid_map:
self._guid_map = dict()
for index, chunk in enumerate(self.elements):
self._guid_map[chunk.guid_str] = index
index = self._guid_map.get(guid.lower(), None)
if index is None:
raise ValueError(f'Invalid GUID! {guid}')
return self.elements[index]
def get_chunk_by_guid_num(self, guid_int):
if not self._guid_int_map:
self._guid_int_map = dict()
for index, chunk in enumerate(self.elements):
self._guid_int_map[chunk.guid_num] = index
index = self._guid_int_map.get(guid_int, None)
if index is None:
raise ValueError(f'Invalid GUID! {hex(guid_int)}')
return self.elements[index]
@classmethod
def read(cls, bio, manifest_version=18):
cdl_start = bio.tell()
_cdl = cls()
_cdl._manifest_version = manifest_version
_cdl.size = struct.unpack('<I', bio.read(4))[0]
_cdl.version = struct.unpack('B', bio.read(1))[0]
_cdl.count = struct.unpack('<I', bio.read(4))[0]
# the way this data is stored is rather odd, maybe there's a nicer way to write this...
for i in range(_cdl.count):
_cdl.elements.append(ChunkInfo(manifest_version=manifest_version))
        # GUID; not a standard UUID, but fairly straightforward: four 32-bit integers, 128 bits in total.
for chunk in _cdl.elements:
chunk.guid = struct.unpack('<IIII', bio.read(16))
# hash is a 64 bit integer, no idea how it's calculated but we don't need to know that.
for chunk in _cdl.elements:
chunk.hash = struct.unpack('<Q', bio.read(8))[0]
# sha1 hash
for chunk in _cdl.elements:
chunk.sha_hash = bio.read(20)
# group number, seems to be part of the download path
for chunk in _cdl.elements:
chunk.group_num = struct.unpack('B', bio.read(1))[0]
# window size is the uncompressed size
for chunk in _cdl.elements:
chunk.window_size = struct.unpack('<I', bio.read(4))[0]
# file size is the compressed size that will need to be downloaded
for chunk in _cdl.elements:
chunk.file_size = struct.unpack('<q', bio.read(8))[0]
if bio.tell() - cdl_start != _cdl.size:
raise ValueError('Did not read entire chunk data list!')
return _cdl
def write(self, bio):
cdl_start = bio.tell()
bio.write(struct.pack('<I', 0)) # placeholder size
bio.write(struct.pack('B', self.version))
bio.write(struct.pack('<I', len(self.elements)))
for chunk in self.elements:
bio.write(struct.pack('<IIII', *chunk.guid))
for chunk in self.elements:
bio.write(struct.pack('<Q', chunk.hash))
for chunk in self.elements:
bio.write(chunk.sha_hash)
for chunk in self.elements:
bio.write(struct.pack('B', chunk.group_num))
for chunk in self.elements:
bio.write(struct.pack('<I', chunk.window_size))
for chunk in self.elements:
bio.write(struct.pack('<q', chunk.file_size))
cdl_end = bio.tell()
bio.seek(cdl_start)
bio.write(struct.pack('<I', cdl_end - cdl_start))
bio.seek(cdl_end)
class ChunkInfo:
def __init__(self, manifest_version=18):
self.guid = None
self.hash = 0
self.sha_hash = b''
self.window_size = 0
self.file_size = 0
self._manifest_version = manifest_version
# caches for things that are "expensive" to compute
self._group_num = None
self._guid_str = None
self._guid_num = None
def __repr__(self):
return '<ChunkInfo (guid={}, hash={}, sha_hash={}, group_num={}, window_size={}, file_size={})>'.format(
self.guid_str, self.hash, self.sha_hash.hex(), self.group_num, self.window_size, self.file_size
)
@property
def guid_str(self):
if not self._guid_str:
self._guid_str = '-'.join('{:08x}'.format(g) for g in self.guid)
return self._guid_str
@property
def guid_num(self):
if not self._guid_num:
self._guid_num = self.guid[3] + (self.guid[2] << 32) + (self.guid[1] << 64) + (self.guid[0] << 96)
return self._guid_num
@property
def group_num(self):
        if self._group_num is not None:
return self._group_num
self._group_num = (zlib.crc32(
struct.pack('<I', self.guid[0]) +
struct.pack('<I', self.guid[1]) +
struct.pack('<I', self.guid[2]) +
struct.pack('<I', self.guid[3])
) & 0xffffffff) % 100
return self._group_num
@group_num.setter
def group_num(self, value):
self._group_num = value
@property
def path(self):
return '{}/{:02d}/{:016X}_{}.chunk'.format(
get_chunk_dir(self._manifest_version), self.group_num,
self.hash, ''.join('{:08X}'.format(g) for g in self.guid))
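# Illustration of the resulting download path (values made up): with manifest version 18,
# guid = (1, 2, 3, 4) and hash = 0xDEADBEEF the chunk path becomes
#   ChunksV4/<group_num>/00000000DEADBEEF_00000001000000020000000300000004.chunk
# where <group_num> is the CRC32 of the packed GUID modulo 100.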
class FML:
def __init__(self):
self.version = 0
self.size = 0
self.count = 0
self.elements = []
self._path_map = dict()
def get_file_by_path(self, path):
if not self._path_map:
self._path_map = dict()
for index, fm in enumerate(self.elements):
self._path_map[fm.filename] = index
index = self._path_map.get(path, None)
if index is None:
raise ValueError(f'Invalid path! {path}')
return self.elements[index]
@classmethod
def read(cls, bio):
fml_start = bio.tell()
_fml = cls()
_fml.size = struct.unpack('<I', bio.read(4))[0]
_fml.version = struct.unpack('B', bio.read(1))[0]
_fml.count = struct.unpack('<I', bio.read(4))[0]
for i in range(_fml.count):
_fml.elements.append(FileManifest())
for fm in _fml.elements:
fm.filename = read_fstring(bio)
# never seen this used in any of the manifests I checked but can't wait for something to break because of it
for fm in _fml.elements:
fm.symlink_target = read_fstring(bio)
# For files this is actually the SHA1 instead of whatever it is for chunks...
for fm in _fml.elements:
fm.hash = bio.read(20)
# Flags, the only one I've seen is for executables
for fm in _fml.elements:
fm.flags = struct.unpack('B', bio.read(1))[0]
# install tags, no idea what they do, I've only seen them in the Fortnite manifest
for fm in _fml.elements:
_elem = struct.unpack('<I', bio.read(4))[0]
for i in range(_elem):
fm.install_tags.append(read_fstring(bio))
# Each file is made up of "Chunk Parts" that can be spread across the "chunk stream"
for fm in _fml.elements:
_elem = struct.unpack('<I', bio.read(4))[0]
_offset = 0
for i in range(_elem):
chunkp = ChunkPart()
_start = bio.tell()
_size = struct.unpack('<I', bio.read(4))[0]
chunkp.guid = struct.unpack('<IIII', bio.read(16))
chunkp.offset = struct.unpack('<I', bio.read(4))[0]
chunkp.size = struct.unpack('<I', bio.read(4))[0]
chunkp.file_offset = _offset
fm.chunk_parts.append(chunkp)
_offset += chunkp.size
if (diff := (bio.tell() - _start - _size)) > 0:
logger.warning(f'Did not read {diff} bytes from chunk part!')
bio.seek(diff)
# we have to calculate the actual file size ourselves
for fm in _fml.elements:
fm.file_size = sum(c.size for c in fm.chunk_parts)
if bio.tell() - fml_start != _fml.size:
            raise ValueError('Did not read entire file manifest list!')
return _fml
def write(self, bio):
fml_start = bio.tell()
bio.write(struct.pack('<I', 0)) # placeholder size
bio.write(struct.pack('B', self.version))
bio.write(struct.pack('<I', len(self.elements)))
for fm in self.elements:
write_fstring(bio, fm.filename)
for fm in self.elements:
write_fstring(bio, fm.symlink_target)
for fm in self.elements:
bio.write(fm.hash)
for fm in self.elements:
bio.write(struct.pack('B', fm.flags))
for fm in self.elements:
bio.write(struct.pack('<I', len(fm.install_tags)))
for tag in fm.install_tags:
write_fstring(bio, tag)
# finally, write the chunk parts
for fm in self.elements:
bio.write(struct.pack('<I', len(fm.chunk_parts)))
for cp in fm.chunk_parts:
# size is always 28 bytes (4 size + 16 guid + 4 offset + 4 size)
bio.write(struct.pack('<I', 28))
bio.write(struct.pack('<IIII', *cp.guid))
bio.write(struct.pack('<I', cp.offset))
bio.write(struct.pack('<I', cp.size))
fml_end = bio.tell()
bio.seek(fml_start)
bio.write(struct.pack('<I', fml_end - fml_start))
bio.seek(fml_end)
class FileManifest:
def __init__(self):
self.filename = ''
self.symlink_target = ''
self.hash = b''
self.flags = 0
self.install_tags = []
self.chunk_parts = []
self.file_size = 0
@property
def read_only(self):
return self.flags & 0x1
@property
def compressed(self):
return self.flags & 0x2
@property
def executable(self):
return self.flags & 0x4
@property
def sha_hash(self):
return self.hash
def __repr__(self):
if len(self.chunk_parts) <= 20:
cp_repr = ', '.join(repr(c) for c in self.chunk_parts)
else:
_cp = [repr(cp) for cp in self.chunk_parts[:20]]
_cp.append('[...]')
cp_repr = ', '.join(_cp)
return '<FileManifest (filename="{}", symlink_target="{}", hash={}, flags={}, ' \
'install_tags=[{}], chunk_parts=[{}], file_size={})>'.format(
self.filename, self.symlink_target, self.hash.hex(), self.flags,
', '.join(self.install_tags), cp_repr, self.file_size
)
class ChunkPart:
def __init__(self, guid=None, offset=0, size=0, file_offset=0):
self.guid = guid
self.offset = offset
self.size = size
self.file_offset = file_offset
# caches for things that are "expensive" to compute
self._guid_str = None
self._guid_num = None
@property
def guid_str(self):
if not self._guid_str:
self._guid_str = '-'.join('{:08x}'.format(g) for g in self.guid)
return self._guid_str
@property
def guid_num(self):
if not self._guid_num:
self._guid_num = self.guid[3] + (self.guid[2] << 32) + (self.guid[1] << 64) + (self.guid[0] << 96)
return self._guid_num
def __repr__(self):
guid_readable = '-'.join('{:08x}'.format(g) for g in self.guid)
return '<ChunkPart (guid={}, offset={}, size={}, file_offset={})>'.format(
guid_readable, self.offset, self.size, self.file_offset)
class CustomFields:
def __init__(self):
self.size = 0
self.version = 0
self.count = 0
self._dict = dict()
def __getitem__(self, item):
return self._dict.get(item, None)
def __setitem__(self, key, value):
self._dict[key] = value
def __str__(self):
return str(self._dict)
def items(self):
return self._dict.items()
def keys(self):
return self._dict.keys()
def values(self):
return self._dict.values()
@classmethod
def read(cls, bio):
_cf = cls()
cf_start = bio.tell()
_cf.size = struct.unpack('<I', bio.read(4))[0]
_cf.version = struct.unpack('B', bio.read(1))[0]
_cf.count = struct.unpack('<I', bio.read(4))[0]
_keys = []
_values = []
for i in range(_cf.count):
_keys.append(read_fstring(bio))
for i in range(_cf.count):
_values.append(read_fstring(bio))
_cf._dict = dict(zip(_keys, _values))
if bio.tell() - cf_start != _cf.size:
raise ValueError('Did not read entire custom fields list!')
return _cf
def write(self, bio):
cf_start = bio.tell()
bio.write(struct.pack('<I', 0)) # placeholder size
bio.write(struct.pack('B', self.version))
bio.write(struct.pack('<I', len(self._dict)))
for key in self.keys():
write_fstring(bio, key)
for value in self.values():
write_fstring(bio, value)
cf_end = bio.tell()
# write proper size
bio.seek(cf_start)
bio.write(struct.pack('<I', cf_end - cf_start))
bio.seek(cf_end)
class ManifestComparison:
def __init__(self):
self.added = set()
self.removed = set()
self.changed = set()
self.unchanged = set()
@classmethod
def create(cls, manifest, old_manifest=None):
comp = cls()
if not old_manifest:
comp.added = set(fm.filename for fm in manifest.file_manifest_list.elements)
return comp
old_files = {fm.filename: fm.hash for fm in old_manifest.file_manifest_list.elements}
for fm in manifest.file_manifest_list.elements:
old_file_hash = old_files.pop(fm.filename, None)
if old_file_hash:
if fm.hash == old_file_hash:
comp.unchanged.add(fm.filename)
else:
comp.changed.add(fm.filename)
else:
comp.added.add(fm.filename)
# any remaining old files were removed
if old_files:
comp.removed = set(old_files.keys())
return comp
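A usage sketch for the comparison above; both manifests are assumed to have been loaded elsewhere (e.g. via Manifest.read_all):

comparison = ManifestComparison.create(new_manifest, old_manifest=installed_manifest)
print(f'{len(comparison.added)} added, {len(comparison.changed)} changed, '
      f'{len(comparison.removed)} removed, {len(comparison.unchanged)} unchanged')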


44
legendary/utils/cli.py Normal file

@ -0,0 +1,44 @@
from legendary.utils.selective_dl import games
def get_boolean_choice(prompt, default=True):
if default:
yn = 'Y/n'
else:
yn = 'y/N'
choice = input(f'{prompt} [{yn}]: ')
if not choice:
return default
elif choice[0].lower() == 'y':
return True
else:
return False
def sdl_prompt(app_name, title):
tags = ['']
if '__required' in games[app_name]:
tags.extend(games[app_name]['__required']['tags'])
print(f'You are about to install {title}, this game supports selective downloads.')
print('The following optional packs are available:')
for tag, info in games[app_name].items():
if tag == '__required':
continue
print(' *', tag, '-', info['name'])
print('Please enter a comma-separated list of optional packs to install (leave blank for defaults)')
examples = ','.join([g for g in games[app_name].keys() if g != '__required'][:2])
choices = input(f'Additional packs [e.g. {examples}]: ')
if not choices:
return tags
for c in choices.split(','):
c = c.strip()
if c in games[app_name]:
tags.extend(games[app_name][c]['tags'])
else:
print('Invalid tag:', c)
return tags


@ -0,0 +1,29 @@
import argparse
# reference: https://gist.github.com/sampsyo/471779#gistcomment-2886157
class AliasedSubParsersAction(argparse._SubParsersAction):
class _AliasedPseudoAction(argparse.Action):
def __init__(self, name, aliases, help):
dest = name
if aliases:
dest += ' (%s)' % ','.join(aliases)
sup = super(AliasedSubParsersAction._AliasedPseudoAction, self)
sup.__init__(option_strings=[], dest=dest, help=help)
def add_parser(self, name, **kwargs):
aliases = kwargs.pop('aliases', [])
parser = super(AliasedSubParsersAction, self).add_parser(name, **kwargs)
# Make the aliases work.
for alias in aliases:
self._name_parser_map[alias] = parser
# Make the help text reflect them, first removing old help entry.
if 'help' in kwargs:
help = kwargs.pop('help')
self._choices_actions.pop()
pseudo_action = self._AliasedPseudoAction(name, aliases, help)
self._choices_actions.append(pseudo_action)
return parser
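A minimal sketch of wiring this action into an argparse CLI; the command names are placeholders:

parser = argparse.ArgumentParser()
parser.register('action', 'parsers', AliasedSubParsersAction)
subparsers = parser.add_subparsers(dest='command')
subparsers.add_parser('install', aliases=('download', 'update'), help='Install or update a game')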


@ -0,0 +1,19 @@
# coding: utf-8
# games where the download order optimizations are enabled by default
# a set() of versions can be specified, empty set means all versions.
_optimize_default = {
'wombat': {}, # world war z
'snapdragon': {}, # metro exodus
'honeycreeper': {}, # diabotical
'bcc75c246fe04e45b0c1f1c3fd52503a': { # pillars of eternity
'1.0.2' # problematic version
}
}
def is_opt_enabled(app_name, version):
if (versions := _optimize_default.get(app_name.lower())) is not None:
if version in versions or not versions:
return True
return False
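For illustration, the lookup behaves like this:

assert is_opt_enabled('Wombat', '1.0')        # empty version set means every version
assert not is_opt_enabled('bcc75c246fe04e45b0c1f1c3fd52503a', '1.0.3')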

118
legendary/utils/lfs.py Normal file

@ -0,0 +1,118 @@
# coding: utf-8
import os
import shutil
import hashlib
import logging
from typing import List, Iterator
from legendary.models.game import VerifyResult
logger = logging.getLogger('LFS Utils')
def delete_folder(path: str, recursive=True) -> bool:
try:
logger.debug(f'Deleting "{path}", recursive={recursive}...')
if not recursive:
os.removedirs(path)
else:
shutil.rmtree(path)
except Exception as e:
logger.error(f'Failed deleting files with {e!r}')
return False
else:
return True
def delete_filelist(path: str, filenames: List[str],
delete_root_directory: bool = False,
silent: bool = False) -> bool:
dirs = set()
no_error = True
# delete all files that were installed
for filename in filenames:
_dir, _fn = os.path.split(filename)
if _dir:
dirs.add(_dir)
try:
os.remove(os.path.join(path, _dir, _fn))
except Exception as e:
if not silent:
logger.error(f'Failed deleting file {filename} with {e!r}')
no_error = False
# add intermediate directories that would have been missed otherwise
for _dir in sorted(dirs):
head, _ = os.path.split(_dir)
while head:
dirs.add(head)
head, _ = os.path.split(head)
# remove all directories
for _dir in sorted(dirs, key=len, reverse=True):
try:
os.rmdir(os.path.join(path, _dir))
except FileNotFoundError:
# directory has already been deleted, ignore that
continue
except Exception as e:
if not silent:
logger.error(f'Failed removing directory "{_dir}" with {e!r}')
no_error = False
if delete_root_directory:
try:
os.rmdir(path)
except Exception as e:
if not silent:
logger.error(f'Removing game directory failed with {e!r}')
return no_error
def validate_files(base_path: str, filelist: List[tuple], hash_type='sha1') -> Iterator[tuple]:
"""
Validates the files in filelist in path against the provided hashes
:param base_path: path in which the files are located
:param filelist: list of tuples in format (path, hash [hex])
:param hash_type: (optional) type of hash, default is sha1
:return: list of files that failed hash check
"""
if not filelist:
raise ValueError('No files to validate!')
if not os.path.exists(base_path):
raise OSError('Path does not exist')
for file_path, file_hash in filelist:
full_path = os.path.join(base_path, file_path)
# logger.debug(f'Checking "{file_path}"...')
if not os.path.exists(full_path):
yield VerifyResult.FILE_MISSING, file_path, ''
continue
try:
with open(full_path, 'rb') as f:
real_file_hash = hashlib.new(hash_type)
while chunk := f.read(1024*1024):
real_file_hash.update(chunk)
result_hash = real_file_hash.hexdigest()
if file_hash != result_hash:
yield VerifyResult.HASH_MISMATCH, file_path, result_hash
else:
yield VerifyResult.HASH_MATCH, file_path, result_hash
except Exception as e:
logger.fatal(f'Could not verify "{file_path}"; opening failed with: {e!r}')
yield VerifyResult.OTHER_ERROR, file_path, ''
def clean_filename(filename):
return ''.join(i for i in filename if i not in '<>:"/\\|?*')
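A usage sketch for the verification helper above; the install path and hash are placeholders:

filelist = [('Binaries/Game.exe', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')]
for result, path, new_hash in validate_files('/games/Example', filelist):
    if result != VerifyResult.HASH_MATCH:
        print(f'{path}: {result.name}')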


@ -0,0 +1,28 @@
from legendary.models.manifest import Manifest
def combine_manifests(base_manifest: Manifest, delta_manifest: Manifest):
added = set()
# overwrite file elements with the ones from the delta manifest
for idx, file_elem in enumerate(base_manifest.file_manifest_list.elements):
try:
delta_file = delta_manifest.file_manifest_list.get_file_by_path(file_elem.filename)
base_manifest.file_manifest_list.elements[idx] = delta_file
added.add(delta_file.filename)
except ValueError:
pass
# add other files that may be missing
for delta_file in delta_manifest.file_manifest_list.elements:
if delta_file.filename not in added:
base_manifest.file_manifest_list.elements.append(delta_file)
# update count and clear map
base_manifest.file_manifest_list.count = len(base_manifest.file_manifest_list.elements)
base_manifest.file_manifest_list._path_map = None
# add chunks from delta manifest to main manifest and again clear path caches
base_manifest.chunk_data_list.elements.extend(delta_manifest.chunk_data_list.elements)
base_manifest.chunk_data_list.count = len(base_manifest.chunk_data_list.elements)
base_manifest.chunk_data_list._guid_map = None
base_manifest.chunk_data_list._guid_int_map = None
base_manifest.chunk_data_list._path_map = None
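Usage sketch, assuming both manifests have already been parsed:

combine_manifests(base_manifest, delta_manifest)
# base_manifest now contains the delta's file entries and extra chunks and can be used as usual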


@ -0,0 +1,25 @@
# this is the rolling hash Epic uses, it appears to be a variation on CRC-64-ECMA
hash_poly = 0xC96C5795D7870F42
hash_table = []
def _init():
for i in range(256):
for _ in range(8):
if i & 1:
i >>= 1
i ^= hash_poly
else:
i >>= 1
hash_table.append(i)
def get_hash(data):
if not hash_table:
_init()
h = 0
for i in range(len(data)):
h = ((h << 1 | h >> 63) ^ hash_table[data[i]]) & 0xffffffffffffffff
return h
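Minimal usage sketch:

h = get_hash(b'example data')
print(f'{h:016X}')   # 64-bit hash rendered as 16 hex digits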


@ -0,0 +1,166 @@
import logging
import os
from datetime import datetime
from fnmatch import fnmatch
from hashlib import sha1
from io import BytesIO
from tempfile import TemporaryFile
from legendary.models.chunk import Chunk
from legendary.models.manifest import \
Manifest, ManifestMeta, CDL, FML, CustomFields, FileManifest, ChunkPart, ChunkInfo
def _filename_matches(filename, patterns):
"""
Helper to determine if a filename matches the filter patterns
:param filename: name of the file
:param patterns: list of patterns to match against
:return:
"""
for pattern in patterns:
if pattern.endswith('/'):
            # pattern is a directory, check if the path starts with it
if filename.startswith(pattern):
return True
elif fnmatch(filename, pattern):
return True
return False
class SaveGameHelper:
def __init__(self):
self.files = dict()
self.log = logging.getLogger('SGH')
def finalize_chunk(self, chunk: Chunk):
ci = ChunkInfo()
ci.guid = chunk.guid
ci.hash = chunk.hash
ci.sha_hash = chunk.sha_hash
# use a temporary file for uploading
_tmp_file = TemporaryFile()
self.files[ci.path] = _tmp_file
# write() returns file size and also sets the uncompressed size
ci.file_size = chunk.write(_tmp_file)
ci.window_size = chunk.uncompressed_size
_tmp_file.seek(0)
return ci
def package_savegame(self, input_folder: str, app_name: str = '',
epic_id: str = '', cloud_folder: str = '',
include_filter: list = None,
exclude_filter: list = None,
manifest_dt: datetime = None):
"""
:param input_folder: Folder to be packaged into chunks/manifest
:param app_name: App name for savegame being stored
:param epic_id: Epic account ID
:param cloud_folder: Folder the savegame resides in (based on game metadata)
:param include_filter: list of patterns for files to include (excludes all others)
:param exclude_filter: list of patterns for files to exclude (includes all others)
:param manifest_dt: datetime for the manifest name (optional)
:return:
"""
m = Manifest()
m.meta = ManifestMeta()
m.chunk_data_list = CDL()
m.file_manifest_list = FML()
m.custom_fields = CustomFields()
# create metadata for savegame
m.meta.app_name = f'{app_name}{epic_id}'
if not manifest_dt:
manifest_dt = datetime.utcnow()
m.meta.build_version = manifest_dt.strftime('%Y.%m.%d-%H.%M.%S')
m.custom_fields['CloudSaveFolder'] = cloud_folder
self.log.info(f'Packing savegame for "{app_name}", input folder: {input_folder}')
files = []
for _dir, _, _files in os.walk(input_folder):
for _file in _files:
_file_path = os.path.join(_dir, _file)
_file_path_rel = os.path.relpath(_file_path, input_folder).replace('\\', '/')
if include_filter and not _filename_matches(_file_path_rel, include_filter):
self.log.debug(f'Excluding "{_file_path_rel}" (does not match include filter)')
continue
elif exclude_filter and _filename_matches(_file_path_rel, exclude_filter):
self.log.debug(f'Excluding "{_file_path_rel}" (does match exclude filter)')
continue
files.append(_file_path)
if not files:
if exclude_filter or include_filter:
self.log.warning('No save files matching the specified filters have been found.')
return self.files
chunk_num = 0
cur_chunk = None
cur_buffer = None
for _file in sorted(files, key=str.casefold):
s = os.stat(_file)
f = FileManifest()
# get relative path for manifest
f.filename = os.path.relpath(_file, input_folder).replace('\\', '/')
self.log.debug(f'Processing file "{f.filename}"')
f.file_size = s.st_size
fhash = sha1()
with open(_file, 'rb') as cf:
while remaining := s.st_size - cf.tell():
if not cur_chunk: # create new chunk
cur_chunk = Chunk()
if cur_buffer:
cur_buffer.close()
cur_buffer = BytesIO()
chunk_num += 1
# create chunk part and write it to chunk buffer
cp = ChunkPart(guid=cur_chunk.guid, offset=cur_buffer.tell(),
size=min(remaining, 1024 * 1024 - cur_buffer.tell()),
file_offset=cf.tell())
_tmp = cf.read(cp.size)
if not _tmp:
self.log.warning(f'Got EOF for "{f.filename}" with {remaining} bytes remaining! '
f'File may have been corrupted/modified.')
break
cur_buffer.write(_tmp)
fhash.update(_tmp) # update sha1 hash with new data
f.chunk_parts.append(cp)
if cur_buffer.tell() >= 1024 * 1024:
cur_chunk.data = cur_buffer.getvalue()
ci = self.finalize_chunk(cur_chunk)
self.log.info(f'Chunk #{chunk_num} "{ci.path}" created')
# add chunk to CDL
m.chunk_data_list.elements.append(ci)
cur_chunk = None
f.hash = fhash.digest()
m.file_manifest_list.elements.append(f)
# write remaining chunk if it exists
if cur_chunk:
cur_chunk.data = cur_buffer.getvalue()
ci = self.finalize_chunk(cur_chunk)
self.log.info(f'Chunk #{chunk_num} "{ci.path}" created')
m.chunk_data_list.elements.append(ci)
cur_buffer.close()
# Finally write/serialize manifest into another temporary file
_m_filename = f'manifests/{m.meta.build_version}.manifest'
_tmp_file = TemporaryFile()
_m_size = m.write(_tmp_file)
_tmp_file.seek(0)
self.log.info(f'Manifest "{_m_filename}" written ({_m_size} bytes)')
self.files[_m_filename] = _tmp_file
# return dict with created files for uploading/whatever
return self.files
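A usage sketch; the paths, app name and account ID below are placeholders:

sgh = SaveGameHelper()
files = sgh.package_savegame('/home/user/saves/ExampleGame', app_name='ExampleApp',
                             epic_id='0123456789abcdef', cloud_folder='{AppData}/Saved')
for name in files:
    print(name)   # ChunksV4/... entries plus manifests/<build_version>.manifest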


@ -0,0 +1,38 @@
# This file contains definitions for selective downloading for supported games
# coding: utf-8
_cyberpunk_sdl = {
'de': {'tags': ['voice_de_de'], 'name': 'Deutsch'},
'es': {'tags': ['voice_es_es'], 'name': 'español (España)'},
'fr': {'tags': ['voice_fr_fr'], 'name': 'français'},
'it': {'tags': ['voice_it_it'], 'name': 'italiano'},
'ja': {'tags': ['voice_ja_jp'], 'name': '日本語'},
'ko': {'tags': ['voice_ko_kr'], 'name': '한국어'},
'pl': {'tags': ['voice_pl_pl'], 'name': 'polski'},
'pt': {'tags': ['voice_pt_br'], 'name': 'português brasileiro'},
'ru': {'tags': ['voice_ru_ru'], 'name': 'русский'},
'cn': {'tags': ['voice_zh_cn'], 'name': '中文(中国)'}
}
_fortnite_sdl = {
'__required': {'tags': ['chunk0', 'chunk10'], 'name': 'Fortnite Core'},
'stw': {'tags': ['chunk11', 'chunk11optional'], 'name': 'Fortnite Save the World'},
'hd_textures': {'tags': ['chunk10optional'], 'name': 'High Resolution Textures'},
'lang_de': {'tags': ['chunk2'], 'name': '(Language Pack) Deutsch'},
'lang_fr': {'tags': ['chunk5'], 'name': '(Language Pack) français'},
'lang_pl': {'tags': ['chunk7'], 'name': '(Language Pack) polski'},
'lang_ru': {'tags': ['chunk8'], 'name': '(Language Pack) русский'},
'lang_cn': {'tags': ['chunk9'], 'name': '(Language Pack) 中文(中国)'}
}
games = {
'Fortnite': _fortnite_sdl,
'Ginger': _cyberpunk_sdl
}
def get_sdl_appname(app_name):
for k in games.keys():
if app_name.startswith(k):
return k
return None
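Illustration of the prefix lookup above:

assert get_sdl_appname('Fortnite') == 'Fortnite'
assert get_sdl_appname('SomeOtherGame') is None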


@ -0,0 +1,17 @@
import os
import configparser
def read_registry(wine_pfx):
reg = configparser.ConfigParser(comment_prefixes=(';', '#', '/', 'WINE'), allow_no_value=True)
reg.optionxform = str
reg.read(os.path.join(wine_pfx, 'user.reg'))
return reg
def get_shell_folders(registry, wine_pfx):
folders = dict()
for k, v in registry['Software\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Explorer\\\\Shell Folders'].items():
path_cleaned = v.strip('"').strip().replace('\\\\', '/').replace('C:/', '')
folders[k.strip('"').strip()] = os.path.join(wine_pfx, 'drive_c', path_cleaned)
return folders
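Usage sketch; the prefix path is a placeholder, and 'Personal' is the registry value name Windows uses for the Documents folder:

prefix = '/home/user/Games/epic-prefix'
folders = get_shell_folders(read_registry(prefix), prefix)
print(folders.get('Personal'))   # resolves to the prefix's Documents directory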