diff --git a/legendary/downloader/manager.py b/legendary/downloader/manager.py
index 1d32784..09c54cc 100644
--- a/legendary/downloader/manager.py
+++ b/legendary/downloader/manager.py
@@ -44,7 +44,6 @@ class DLManager(Process):
         # Analysis stuff
         self.analysis = None
         self.tasks = deque()
-        self.dl_cache_guids = set()  # guids that should be cached
         self.chunks_to_dl = deque()
         self.chunk_data_list = None
 
@@ -301,35 +300,16 @@ class DLManager(Process):
         analysis_res.unchanged = len(mc.unchanged)
         self.log.debug(f'{analysis_res.unchanged} unchanged files')
 
+        # count references to chunks for determining runtime cache size later
         references = Counter()
-        chunkstream_starts = list()
-        # Chunks can have multiple entire files in them, the deque for a guid contains all files that start
-        # in that chunk (sorted by offset) so we can quickly and easily find the next link in the chunkstream.
-        # A nice side effect is that we can use this to check whether or not we missed something in the process.
-        chunk_to_file_map = defaultdict(deque)
-
-        # Find chunkstream starts and also count references to each chunk
-        # Note that this has to be sorted to ensure the file map will be in the correct order
-        self.log.debug('Looking for chunkstreams and counting references...')
-        for fm in sorted(manifest.file_manifest_list.elements,
-                         key=lambda x: x.chunk_parts[0].offset if x.chunk_parts else 0):
-            if not fm.chunk_parts:
-                self.tasks.append(FileTask(fm.filename, empty=True))
-                continue
-
+        for fm in manifest.file_manifest_list.elements:
+            # chunks of unchanged files are not downloaded, so we can skip them
             if fm.filename in mc.unchanged:
                 analysis_res.unchanged += fm.file_size
+                continue
 
-            for index, cp in enumerate(fm.chunk_parts):
-                if index == 0:
-                    chunk_to_file_map[cp.guid_num].append(fm)
-                    if cp.offset == 0:
-                        self.log.debug(f'Found chunk stream start: {fm.filename}, {fm.chunk_parts[0]}')
-                        chunkstream_starts.append(fm.chunk_parts[0])
-
-                # do not add references in case the file is unchanged and we do not need to download it anyway
-                if fm.filename not in mc.unchanged:
-                    references[cp.guid_num] += 1
+            for cp in fm.chunk_parts:
+                references[cp.guid_num] += 1
 
         # determine reusable chunks and prepare lookup table for reusable ones
         re_usable = defaultdict(dict)
@@ -353,90 +333,81 @@ class DLManager(Process):
                         analysis_res.reuse_size += cp.size
 
         last_cache_size = current_cache_size = 0
+        # set to determine whether a chunk is currently cached or not
         cached = set()
         # Using this secondary set is orders of magnitude faster than checking the deque.
         chunks_in_dl_list = set()
+        # This is just used to count all unique guids that have been cached
+        dl_cache_guids = set()
 
-        # run through the chunkstreams and create the download jobs,
-        # also determine minimum runtime cache requirement.
-        # Yeah this is a bit of a mess but still runs extremely
-        # quickly even with tens of thousands of files/chunks
+        # run through the list of files to create the download jobs and determine the minimum
+        # runtime cache requirement by simulating adding/removing chunks from the cache during download.
         self.log.debug('Creating filetasks and chunktasks...')
-        for next_chunk in chunkstream_starts:
-            self.log.debug(f'- Chunkstream start: {next_chunk!r}')
+        for current_file in sorted(manifest.file_manifest_list.elements,
+                                   key=lambda a: a.filename.lower()):
+            # skip unchanged and empty files
+            if current_file.filename in mc.unchanged:
+                continue
+            elif not current_file.chunk_parts:
+                self.tasks.append(FileTask(current_file.filename, empty=True))
+                continue
 
-            while file_deque := chunk_to_file_map.get(next_chunk.guid_num):
-                current_file = file_deque.popleft()
+            existing_chunks = re_usable.get(current_file.filename, None)
+            chunk_tasks = []
+            reused = 0
 
-                if len(file_deque) == 0:
-                    del chunk_to_file_map[next_chunk.guid_num]
+            for cp in current_file.chunk_parts:
+                ct = ChunkTask(cp.guid_num, cp.offset, cp.size)
 
-                # skip unchanged files
-                if current_file.filename in mc.unchanged:
-                    # self.log.debug(f' + Skipping unchanged file: {current_file.filename}')
-                    next_chunk = current_file.chunk_parts[-1]
-                    continue
+                # re-use the chunk from the existing file if we can
+                if existing_chunks and (cp.guid_num, cp.offset, cp.size) in existing_chunks:
+                    reused += 1
+                    ct.chunk_file = current_file.filename
+                    ct.chunk_offset = existing_chunks[(cp.guid_num, cp.offset, cp.size)]
+                else:
+                    # add to DL list if not already in it
+                    if cp.guid_num not in chunks_in_dl_list:
+                        self.chunks_to_dl.append(cp.guid_num)
+                        chunks_in_dl_list.add(cp.guid_num)
 
-                existing_chunks = re_usable.get(current_file.filename, None)
-                chunk_tasks = []
-                reused = 0
+                    # if chunk has more than one use or is already in cache,
+                    # check if we need to add or remove it again.
+                    if references[cp.guid_num] > 1 or cp.guid_num in cached:
+                        references[cp.guid_num] -= 1
 
-                for cp in current_file.chunk_parts:
-                    ct = ChunkTask(cp.guid_num, cp.offset, cp.size)
-
-                    # re-use the chunk from the existing file if we can
-                    if existing_chunks and (cp.guid_num, cp.offset, cp.size) in existing_chunks:
-                        reused += 1
-                        ct.chunk_file = current_file.filename
-                        ct.chunk_offset = existing_chunks[(cp.guid_num, cp.offset, cp.size)]
-                    else:
-                        # add to DL list if not already in it
-                        if cp.guid_num not in chunks_in_dl_list:
-                            self.chunks_to_dl.append(cp.guid_num)
-                            chunks_in_dl_list.add(cp.guid_num)
-
-                        # if chunk has more than one use or is already in cache,
-                        # check if we need to add or remove it again.
-                        if references[cp.guid_num] > 1 or cp.guid_num in cached:
-                            references[cp.guid_num] -= 1
-
-                            if references[cp.guid_num] < 1:  # delete from cache again
-                                current_cache_size -= analysis_res.biggest_chunk
-                                cached.remove(cp.guid_num)
-                                ct.cleanup = True
-                            elif cp.guid_num not in cached:  # add to cache
-                                self.dl_cache_guids.add(cp.guid_num)
-                                cached.add(cp.guid_num)
-                                current_cache_size += analysis_res.biggest_chunk
-                        else:
+                        # delete from cache if no references left
+                        if references[cp.guid_num] < 1:
+                            current_cache_size -= analysis_res.biggest_chunk
+                            cached.remove(cp.guid_num)
                             ct.cleanup = True
+                        # add to cache if not already cached
+                        elif cp.guid_num not in cached:
+                            dl_cache_guids.add(cp.guid_num)
+                            cached.add(cp.guid_num)
+                            current_cache_size += analysis_res.biggest_chunk
+                    else:
+                        ct.cleanup = True
 
-                    chunk_tasks.append(ct)
-
-                if reused:
-                    self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}')
-                    self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True))
-                else:
-                    self.tasks.append(FileTask(current_file.filename, fopen=True))
+                chunk_tasks.append(ct)
 
+            if reused:
+                self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}')
+                # open temporary file that will contain download + old file contents
+                self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True))
                 self.tasks.extend(chunk_tasks)
+                self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True))
+                # delete old file and rename temporary file
+                self.tasks.append(FileTask(current_file.filename, delete=True, rename=True,
+                                           temporary_filename=current_file.filename + u'.tmp'))
+            else:
+                self.tasks.append(FileTask(current_file.filename, fopen=True))
+                self.tasks.extend(chunk_tasks)
+                self.tasks.append(FileTask(current_file.filename, close=True))
 
-                if reused:
-                    self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True))
-                    self.tasks.append(FileTask(current_file.filename, delete=True, rename=True,
-                                               temporary_filename=current_file.filename + u'.tmp'))
-                else:
-                    self.tasks.append(FileTask(current_file.filename, close=True))
-
-                if current_cache_size > last_cache_size:
-                    self.log.debug(f' * New maximum cache size: {current_cache_size / 1024 / 1024:.02f} MiB')
-                    last_cache_size = current_cache_size
-
-                next_chunk = current_file.chunk_parts[-1]
-
-        # If this is not empty something went horribly wrong.
-        if chunk_to_file_map:
-            raise ValueError('Some files were not processed:', chunk_to_file_map)
+            # check if runtime cache size has changed
+            if current_cache_size > last_cache_size:
+                self.log.debug(f' * New maximum cache size: {current_cache_size / 1024 / 1024:.02f} MiB')
+                last_cache_size = current_cache_size
 
         self.log.debug(f'Final cache size requirement: {last_cache_size / 1024 / 1024} MiB.')
         analysis_res.min_memory = last_cache_size + (1024 * 1024 * 32)  # add some padding just to be safe
@@ -457,7 +428,7 @@ class DLManager(Process):
         for fname in mc.removed:
             self.tasks.append(FileTask(fname, delete=True))
 
-        analysis_res.num_chunks_cache = len(self.dl_cache_guids)
+        analysis_res.num_chunks_cache = len(dl_cache_guids)
         self.chunk_data_list = manifest.chunk_data_list
         self.analysis = analysis_res
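For reference, the minimum-cache-size logic this diff introduces can be sketched in isolation: count how often each chunk guid is referenced, then replay the files in download order, keeping a chunk cached only while later references to it remain. The snippet below is a simplified model rather than legendary's actual API: it uses plain lists of chunk guids in place of the manifest's file manifest/chunk part objects, and a single worst-case chunk size in place of analysis_res.biggest_chunk.

    from collections import Counter

    def simulate_cache_size(files, chunk_size):
        # first pass: count references to each chunk guid across all files
        references = Counter()
        for chunks in files:
            references.update(chunks)

        cached = set()
        current = peak = 0
        # second pass: replay the download order and track cache occupancy
        for chunks in files:
            for guid in chunks:
                if references[guid] > 1 or guid in cached:
                    references[guid] -= 1
                    if references[guid] < 1:
                        # last use of this chunk, evict it from the cache
                        cached.remove(guid)
                        current -= chunk_size
                    elif guid not in cached:
                        # chunk is needed by a later file, keep it cached
                        cached.add(guid)
                        current += chunk_size
                peak = max(peak, current)
        return peak

    # 'b' and 'c' each appear in two files, but never need to be cached at the same time
    print(simulate_cache_size([['a', 'b'], ['b', 'c'], ['c']], 1024 * 1024))  # 1048576

Pricing every cached chunk at the single biggest chunk size slightly overestimates the requirement, which errs on the same safe side as the 32 MiB of padding added to min_memory above.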