From 8f7db143a6ee5d68f1a3dcf3bb4f27e86147e935 Mon Sep 17 00:00:00 2001 From: derrod Date: Thu, 30 Apr 2020 09:43:17 +0200 Subject: [PATCH] [downloader] Greatly simplify download task creation This is a change to something that was so massively stupid and overcomplicated that I feel like I need to explain and justify myself: After figuring out the format for manifests and spending countless hours staring at IDA/Ghidra I kinda was sick of that, so I decided to figure out what to do with the manifest myself by playing around with it, which was also a lot more fun than looking through disassembly. When looking at the chunks and files it quickly became obvious that the way they're created is by concatenating all files into 1 MiB chunks that can be downloaded and reassmebled (mostly) sequentially. What I did not know was how the order of files in this "stream" was determined. In playing around with it I came up with the old method: essentially forming a chain of files, because each file's end generally pointed to the start of the next file. And it worked great! At least until now... Yesterday somebody alerted me to a game where this failed and it took me a bit to figure out. Essentially the chaining had failed because multiple files started at the same offset, but some of them would follow another chain that never went back to the chunk it started at, effectively skipping those files. This was rather annoying to deal with, I came up with a workaround but it wasn't pretty. So I decided to jump back into IDA/Ghidra and find out how Epic does it for real. Well it took me a while, but thanks to symbols (yay macOS!) and a decent decompiler in Ghidra even a noob like me was able to find it eventually. The answer is as simple as it can be: the files are sorted alphabetically (case-insensitive). So really all I ever had to do was to sort files alphabetically and then run through them to create the list of tasks. I feel so stupid. P.S.: I tested a few games and for the most part the resulting file processing order is identical between the old and the new method. The cases where it differs is when there's heavy de-duplication happening (e.g. Diabotical's small model files) but the runtime cache size remains the same so both methods are equally efficient, the old one just can't handle certain cases. --- legendary/downloader/manager.py | 163 +++++++++++++------------------- 1 file changed, 67 insertions(+), 96 deletions(-) diff --git a/legendary/downloader/manager.py b/legendary/downloader/manager.py index 1d32784..09c54cc 100644 --- a/legendary/downloader/manager.py +++ b/legendary/downloader/manager.py @@ -44,7 +44,6 @@ class DLManager(Process): # Analysis stuff self.analysis = None self.tasks = deque() - self.dl_cache_guids = set() # guids that should be cached self.chunks_to_dl = deque() self.chunk_data_list = None @@ -301,35 +300,16 @@ class DLManager(Process): analysis_res.unchanged = len(mc.unchanged) self.log.debug(f'{analysis_res.unchanged} unchanged files') + # count references to chunks for determining runtime cache size later references = Counter() - chunkstream_starts = list() - # Chunks can have multiple entire files in them, the deque for a guid contains all files that start - # in that chunk (sorted by offset) so we can quickly and easily find the next link in the chunkstream. - # A nice side effect is that we can use this to check whether or not we missed something in the process. - chunk_to_file_map = defaultdict(deque) - - # Find chunkstream starts and also count references to each chunk - # Note that this has to be sorted to ensure the file map will be in the correct order - self.log.debug('Looking for chunkstreams and counting references...') - for fm in sorted(manifest.file_manifest_list.elements, - key=lambda x: x.chunk_parts[0].offset if x.chunk_parts else 0): - if not fm.chunk_parts: - self.tasks.append(FileTask(fm.filename, empty=True)) - continue - + for fm in manifest.file_manifest_list.elements: + # chunks of unchanged files are not downloaded so we can skip them if fm.filename in mc.unchanged: analysis_res.unchanged += fm.file_size + continue - for index, cp in enumerate(fm.chunk_parts): - if index == 0: - chunk_to_file_map[cp.guid_num].append(fm) - if cp.offset == 0: - self.log.debug(f'Found chunk stream start: {fm.filename}, {fm.chunk_parts[0]}') - chunkstream_starts.append(fm.chunk_parts[0]) - - # do not add references in case the file is unchanged and we do not need to download it anyway - if fm.filename not in mc.unchanged: - references[cp.guid_num] += 1 + for cp in fm.chunk_parts: + references[cp.guid_num] += 1 # determine reusable chunks and prepare lookup table for reusable ones re_usable = defaultdict(dict) @@ -353,90 +333,81 @@ class DLManager(Process): analysis_res.reuse_size += cp.size last_cache_size = current_cache_size = 0 + # set to determine whether a file is currently cached or not cached = set() # Using this secondary set is orders of magnitude faster than checking the deque. chunks_in_dl_list = set() + # This is just used to count all unique guids that have been cached + dl_cache_guids = set() - # run through the chunkstreams and create the download jobs, - # also determine minimum runtime cache requirement. - # Yeah this is a bit of a mess but still runs extremely - # quickly even with tens of thousands of files/chunks + # run through the list of files and create the download jobs and also determine minimum + # runtime cache requirement by simulating adding/removing from cache during download. self.log.debug('Creating filetasks and chunktasks...') - for next_chunk in chunkstream_starts: - self.log.debug(f'- Chunkstream start: {next_chunk!r}') + for current_file in sorted(manifest.file_manifest_list.elements, + key=lambda a: a.filename.lower()): + # skip unchanged and empty files + if current_file.filename in mc.unchanged: + continue + elif not current_file.chunk_parts: + self.tasks.append(FileTask(current_file.filename, empty=True)) + continue - while file_deque := chunk_to_file_map.get(next_chunk.guid_num): - current_file = file_deque.popleft() + existing_chunks = re_usable.get(current_file.filename, None) + chunk_tasks = [] + reused = 0 - if len(file_deque) == 0: - del chunk_to_file_map[next_chunk.guid_num] + for cp in current_file.chunk_parts: + ct = ChunkTask(cp.guid_num, cp.offset, cp.size) - # skip unchanged files - if current_file.filename in mc.unchanged: - # self.log.debug(f' + Skipping unchanged file: {current_file.filename}') - next_chunk = current_file.chunk_parts[-1] - continue + # re-use the chunk from the existing file if we can + if existing_chunks and (cp.guid_num, cp.offset, cp.size) in existing_chunks: + reused += 1 + ct.chunk_file = current_file.filename + ct.chunk_offset = existing_chunks[(cp.guid_num, cp.offset, cp.size)] + else: + # add to DL list if not already in it + if cp.guid_num not in chunks_in_dl_list: + self.chunks_to_dl.append(cp.guid_num) + chunks_in_dl_list.add(cp.guid_num) - existing_chunks = re_usable.get(current_file.filename, None) - chunk_tasks = [] - reused = 0 + # if chunk has more than one use or is already in cache, + # check if we need to add or remove it again. + if references[cp.guid_num] > 1 or cp.guid_num in cached: + references[cp.guid_num] -= 1 - for cp in current_file.chunk_parts: - ct = ChunkTask(cp.guid_num, cp.offset, cp.size) - - # re-use the chunk from the existing file if we can - if existing_chunks and (cp.guid_num, cp.offset, cp.size) in existing_chunks: - reused += 1 - ct.chunk_file = current_file.filename - ct.chunk_offset = existing_chunks[(cp.guid_num, cp.offset, cp.size)] - else: - # add to DL list if not already in it - if cp.guid_num not in chunks_in_dl_list: - self.chunks_to_dl.append(cp.guid_num) - chunks_in_dl_list.add(cp.guid_num) - - # if chunk has more than one use or is already in cache, - # check if we need to add or remove it again. - if references[cp.guid_num] > 1 or cp.guid_num in cached: - references[cp.guid_num] -= 1 - - if references[cp.guid_num] < 1: # delete from cache again - current_cache_size -= analysis_res.biggest_chunk - cached.remove(cp.guid_num) - ct.cleanup = True - elif cp.guid_num not in cached: # add to cache - self.dl_cache_guids.add(cp.guid_num) - cached.add(cp.guid_num) - current_cache_size += analysis_res.biggest_chunk - else: + # delete from cache if no references left + if references[cp.guid_num] < 1: + current_cache_size -= analysis_res.biggest_chunk + cached.remove(cp.guid_num) ct.cleanup = True + # add to cache if not already cached + elif cp.guid_num not in cached: + dl_cache_guids.add(cp.guid_num) + cached.add(cp.guid_num) + current_cache_size += analysis_res.biggest_chunk + else: + ct.cleanup = True - chunk_tasks.append(ct) - - if reused: - self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}') - self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True)) - else: - self.tasks.append(FileTask(current_file.filename, fopen=True)) + chunk_tasks.append(ct) + if reused: + self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}') + # open temporary file that will contain download + old file contents + self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True)) self.tasks.extend(chunk_tasks) + self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True)) + # delete old file and rename temproary + self.tasks.append(FileTask(current_file.filename, delete=True, rename=True, + temporary_filename=current_file.filename + u'.tmp')) + else: + self.tasks.append(FileTask(current_file.filename, fopen=True)) + self.tasks.extend(chunk_tasks) + self.tasks.append(FileTask(current_file.filename, close=True)) - if reused: - self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True)) - self.tasks.append(FileTask(current_file.filename, delete=True, rename=True, - temporary_filename=current_file.filename + u'.tmp')) - else: - self.tasks.append(FileTask(current_file.filename, close=True)) - - if current_cache_size > last_cache_size: - self.log.debug(f' * New maximum cache size: {current_cache_size / 1024 / 1024:.02f} MiB') - last_cache_size = current_cache_size - - next_chunk = current_file.chunk_parts[-1] - - # If this is not empty something went horribly wrong. - if chunk_to_file_map: - raise ValueError('Some files were not processed:', chunk_to_file_map) + # check if runtime cache size has changed + if current_cache_size > last_cache_size: + self.log.debug(f' * New maximum cache size: {current_cache_size / 1024 / 1024:.02f} MiB') + last_cache_size = current_cache_size self.log.debug(f'Final cache size requirement: {last_cache_size / 1024 / 1024} MiB.') analysis_res.min_memory = last_cache_size + (1024 * 1024 * 32) # add some padding just to be safe @@ -457,7 +428,7 @@ class DLManager(Process): for fname in mc.removed: self.tasks.append(FileTask(fname, delete=True)) - analysis_res.num_chunks_cache = len(self.dl_cache_guids) + analysis_res.num_chunks_cache = len(dl_cache_guids) self.chunk_data_list = manifest.chunk_data_list self.analysis = analysis_res