diff --git a/legendary/downloader/manager.py b/legendary/downloader/manager.py
index cbf0ef3..16f2f3b 100644
--- a/legendary/downloader/manager.py
+++ b/legendary/downloader/manager.py
@@ -78,6 +78,277 @@ class DLManager(Process):
         self.num_processed_since_last = 0
         self.num_tasks_processed_since_last = 0
 
+    def run_analysis(self, manifest: Manifest, old_manifest: Manifest = None,
+                     patch=True, resume=True, file_prefix_filter=None,
+                     file_exclude_filter=None, file_install_tag=None,
+                     processing_optimization=False) -> AnalysisResult:
+        """
+        Run analysis on manifest and old manifest (if not None) and return a result
+        with a summary of the resources required to install the provided manifest.
+
+        :param manifest: Manifest to install
+        :param old_manifest: Old manifest to patch from (if applicable)
+        :param patch: Patch instead of redownloading the entire file
+        :param resume: Continue based on resume file if it exists
+        :param file_prefix_filter: Only download files that start with this prefix
+        :param file_exclude_filter: Exclude files with this prefix from download
+        :param file_install_tag: Only install files with the specified tag
+        :param processing_optimization: Attempt to optimize processing order and RAM usage
+        :return: AnalysisResult
+        """
+
+        analysis_res = AnalysisResult()
+        analysis_res.install_size = sum(fm.file_size for fm in manifest.file_manifest_list.elements)
+        analysis_res.biggest_chunk = max(c.window_size for c in manifest.chunk_data_list.elements)
+        analysis_res.biggest_file_size = max(f.file_size for f in manifest.file_manifest_list.elements)
+        is_1mib = analysis_res.biggest_chunk == 1024 * 1024
+        self.log.debug(f'Biggest chunk size: {analysis_res.biggest_chunk} bytes (== 1 MiB? {is_1mib})')
+
+        self.log.debug(f'Creating manifest comparison...')
+        mc = ManifestComparison.create(manifest, old_manifest)
+        analysis_res.manifest_comparison = mc
+
+        if resume and self.resume_file and os.path.exists(self.resume_file):
+            try:
+                completed_files = set(i.strip() for i in open(self.resume_file).readlines())
+                # remove completed files from changed/added and move them to unchanged for the analysis.
+                mc.added -= completed_files
+                mc.changed -= completed_files
+                mc.unchanged |= completed_files
+                self.log.debug(f'Skipped {len(completed_files)} files based on resume data!')
+            except Exception as e:
+                self.log.warning(f'Reading resume file failed: {e!r}, continuing as normal...')
+
+        # Not entirely sure what install tags are used for, only some titles have them.
+        # Let's add it for testing anyway.
+        if file_install_tag:
+            files_to_skip = set(i.filename for i in manifest.file_manifest_list.elements
+                                if file_install_tag not in i.install_tags)
+            self.log.info(f'Found {len(files_to_skip)} files to skip based on install tag.')
+            mc.added -= files_to_skip
+            mc.changed -= files_to_skip
+            mc.unchanged |= files_to_skip
+
+        # if include/exclude prefix has been set: mark all files that are not to be downloaded as unchanged
+        if file_exclude_filter:
+            file_exclude_filter = file_exclude_filter.lower()
+            files_to_skip = set(i for i in mc.added | mc.changed if i.lower().startswith(file_exclude_filter))
+            self.log.info(f'Found {len(files_to_skip)} files to skip based on exclude prefix.')
+            mc.added -= files_to_skip
+            mc.changed -= files_to_skip
+            mc.unchanged |= files_to_skip
+
+        if file_prefix_filter:
+            file_prefix_filter = file_prefix_filter.lower()
+            files_to_skip = set(i for i in mc.added | mc.changed if not i.lower().startswith(file_prefix_filter))
+            self.log.info(f'Found {len(files_to_skip)} files to skip based on include prefix.')
+            mc.added -= files_to_skip
+            mc.changed -= files_to_skip
+            mc.unchanged |= files_to_skip
+
+        if file_prefix_filter or file_exclude_filter or file_install_tag:
+            self.log.info(f'Remaining files after filtering: {len(mc.added) + len(mc.changed)}')
+            # correct install size after filtering
+            analysis_res.install_size = sum(fm.file_size for fm in manifest.file_manifest_list.elements
+                                            if fm.filename in mc.added)
+
+        if mc.removed:
+            analysis_res.removed = len(mc.removed)
+            self.log.debug(f'{analysis_res.removed} removed files')
+        if mc.added:
+            analysis_res.added = len(mc.added)
+            self.log.debug(f'{analysis_res.added} added files')
+        if mc.changed:
+            analysis_res.changed = len(mc.changed)
+            self.log.debug(f'{analysis_res.changed} changed files')
+        if mc.unchanged:
+            analysis_res.unchanged = len(mc.unchanged)
+            self.log.debug(f'{analysis_res.unchanged} unchanged files')
+
+        if processing_optimization and len(manifest.file_manifest_list.elements) > 8_000:
+            self.log.warning('Manifest contains too many files, processing optimizations will be disabled.')
+            processing_optimization = False
+        elif processing_optimization:
+            self.log.info('Processing order optimization is enabled, analysis may take a few seconds longer...')
+
+        # count references to chunks for determining runtime cache size later
+        references = Counter()
+        file_to_chunks = defaultdict(set)
+        fmlist = sorted(manifest.file_manifest_list.elements,
+                        key=lambda a: a.filename.lower())
+
+        for fm in fmlist:
+            # chunks of unchanged files are not downloaded so we can skip them
+            if fm.filename in mc.unchanged:
+                analysis_res.unchanged += fm.file_size
+                continue
+
+            for cp in fm.chunk_parts:
+                references[cp.guid_num] += 1
+                if processing_optimization:
+                    file_to_chunks[fm.filename].add(cp.guid_num)
+
+        if processing_optimization:
+            # reorder the file manifest list to group files that share many chunks
+            # 5 is mostly arbitrary but has shown in testing to be a good choice
+            min_overlap = 5
+            # enumerate the file list to try and find a "partner" for
+            # each file that shares the most chunks with it.
+            partners = dict()
+            filenames = [fm.filename for fm in fmlist]
+
+            for num, filename in enumerate(filenames[:int((len(filenames) + 1) / 2)]):
+                chunks = file_to_chunks[filename]
+                max_overlap = min_overlap
+
+                for other_file in filenames[num + 1:]:
+                    overlap = len(chunks & file_to_chunks[other_file])
+                    if overlap > max_overlap:
+                        partners[filename] = other_file
+                        max_overlap = overlap
+
+            # iterate over all the files again, this time inserting each file's "partner" right after it
+            _fmlist = []
+            processed = set()
+            for fm in fmlist:
+                if fm.filename in processed:
+                    continue
+                _fmlist.append(fm)
+                processed.add(fm.filename)
+                # try to find the file's "partner"
+                partner = partners.get(fm.filename, None)
+                if not partner or partner in processed:
+                    continue
+
+                partner_fm = manifest.file_manifest_list.get_file_by_path(partner)
+                _fmlist.append(partner_fm)
+                processed.add(partner)
+
+            fmlist = _fmlist
+
+        # determine reusable chunks and prepare lookup table for reusable ones
+        re_usable = defaultdict(dict)
+        if old_manifest and mc.changed and patch:
+            self.log.debug('Analyzing manifests for re-usable chunks...')
+            for changed in mc.changed:
+                old_file = old_manifest.file_manifest_list.get_file_by_path(changed)
+                new_file = manifest.file_manifest_list.get_file_by_path(changed)
+
+                existing_chunks = dict()
+                off = 0
+                for cp in old_file.chunk_parts:
+                    existing_chunks[(cp.guid_num, cp.offset, cp.size)] = off
+                    off += cp.size
+
+                for cp in new_file.chunk_parts:
+                    key = (cp.guid_num, cp.offset, cp.size)
+                    if key in existing_chunks:
+                        references[cp.guid_num] -= 1
+                        re_usable[changed][key] = existing_chunks[key]
+                        analysis_res.reuse_size += cp.size
+
+        last_cache_size = current_cache_size = 0
+        # set to determine whether a file is currently cached or not
+        cached = set()
+        # Using this secondary set is orders of magnitude faster than checking the deque.
+        chunks_in_dl_list = set()
+        # This is just used to count all unique guids that have been cached
+        dl_cache_guids = set()
+
+        # run through the list of files and create the download jobs and also determine minimum
+        # runtime cache requirement by simulating adding/removing from cache during download.
+        self.log.debug('Creating filetasks and chunktasks...')
+        for current_file in fmlist:
+            # skip unchanged and empty files
+            if current_file.filename in mc.unchanged:
+                continue
+            elif not current_file.chunk_parts:
+                self.tasks.append(FileTask(current_file.filename, empty=True))
+                continue
+
+            existing_chunks = re_usable.get(current_file.filename, None)
+            chunk_tasks = []
+            reused = 0
+
+            for cp in current_file.chunk_parts:
+                ct = ChunkTask(cp.guid_num, cp.offset, cp.size)
+
+                # re-use the chunk from the existing file if we can
+                if existing_chunks and (cp.guid_num, cp.offset, cp.size) in existing_chunks:
+                    reused += 1
+                    ct.chunk_file = current_file.filename
+                    ct.chunk_offset = existing_chunks[(cp.guid_num, cp.offset, cp.size)]
+                else:
+                    # add to DL list if not already in it
+                    if cp.guid_num not in chunks_in_dl_list:
+                        self.chunks_to_dl.append(cp.guid_num)
+                        chunks_in_dl_list.add(cp.guid_num)
+
+                    # if chunk has more than one use or is already in cache,
+                    # check if we need to add or remove it again.
+                    if references[cp.guid_num] > 1 or cp.guid_num in cached:
+                        references[cp.guid_num] -= 1
+
+                        # delete from cache if no references left
+                        if references[cp.guid_num] < 1:
+                            current_cache_size -= analysis_res.biggest_chunk
+                            cached.remove(cp.guid_num)
+                            ct.cleanup = True
+                        # add to cache if not already cached
+                        elif cp.guid_num not in cached:
+                            dl_cache_guids.add(cp.guid_num)
+                            cached.add(cp.guid_num)
+                            current_cache_size += analysis_res.biggest_chunk
+                    else:
+                        ct.cleanup = True
+
+                chunk_tasks.append(ct)
+
+            if reused:
+                self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}')
+                # open temporary file that will contain download + old file contents
+                self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True))
+                self.tasks.extend(chunk_tasks)
+                self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True))
+                # delete old file and rename temporary
+                self.tasks.append(FileTask(current_file.filename, delete=True, rename=True,
+                                           temporary_filename=current_file.filename + u'.tmp'))
+            else:
+                self.tasks.append(FileTask(current_file.filename, fopen=True))
+                self.tasks.extend(chunk_tasks)
+                self.tasks.append(FileTask(current_file.filename, close=True))
+
+            # check if runtime cache size has changed
+            if current_cache_size > last_cache_size:
+                self.log.debug(f' * New maximum cache size: {current_cache_size / 1024 / 1024:.02f} MiB')
+                last_cache_size = current_cache_size
+
+        self.log.debug(f'Final cache size requirement: {last_cache_size / 1024 / 1024} MiB.')
+        analysis_res.min_memory = last_cache_size + (1024 * 1024 * 32)  # add some padding just to be safe
+
+        # Todo implement on-disk caching to avoid this issue.
+        if analysis_res.min_memory > self.max_shared_memory:
+            shared_mib = f'{self.max_shared_memory / 1024 / 1024:.01f} MiB'
+            required_mib = f'{analysis_res.min_memory / 1024 / 1024:.01f} MiB'
+            raise MemoryError(f'Current shared memory cache is smaller than required! {shared_mib} < {required_mib}. '
+                              f'Try running legendary with the --enable-reordering flag to reduce memory usage.')
+
+        # calculate actual dl and patch write size.
+        analysis_res.dl_size = \
+            sum(c.file_size for c in manifest.chunk_data_list.elements if c.guid_num in chunks_in_dl_list)
+        analysis_res.uncompressed_dl_size = \
+            sum(c.window_size for c in manifest.chunk_data_list.elements if c.guid_num in chunks_in_dl_list)
+
+        # add jobs to remove files
+        for fname in mc.removed:
+            self.tasks.append(FileTask(fname, delete=True))
+
+        analysis_res.num_chunks_cache = len(dl_cache_guids)
+        self.chunk_data_list = manifest.chunk_data_list
+        self.analysis = analysis_res
+
+        return analysis_res
+
     def download_job_manager(self, task_cond: Condition, shm_cond: Condition):
         while self.chunks_to_dl and self.running:
             while self.active_tasks < self.max_workers * 2 and self.chunks_to_dl:
@@ -243,277 +514,6 @@ class DLManager(Process):
                 self.log.warning(f'Exception when trying to read writer result queue: {e!r}')
         self.log.info('Writer result handler quitting...')
 
-    def run_analysis(self, manifest: Manifest, old_manifest: Manifest = None,
-                     patch=True, resume=True, file_prefix_filter=None,
-                     file_exclude_filter=None, file_install_tag=None,
-                     processing_optimization=False) -> AnalysisResult:
-        """
-        Run analysis on manifest and old manifest (if not None) and return a result
-        with a summary resources required in order to install the provided manifest.
-
-        :param manifest: Manifest to install
-        :param old_manifest: Old manifest to patch from (if applicable)
-        :param patch: Patch instead of redownloading the entire file
-        :param resume: Continue based on resume file if it exists
-        :param file_prefix_filter: Only download files that start with this prefix
-        :param file_exclude_filter: Exclude files with this prefix from download
-        :param file_install_tag: Only install files with the specified tag
-        :param processing_optimization: Attempt to optimize processing order and RAM usage
-        :return: AnalysisResult
-        """
-
-        analysis_res = AnalysisResult()
-        analysis_res.install_size = sum(fm.file_size for fm in manifest.file_manifest_list.elements)
-        analysis_res.biggest_chunk = max(c.window_size for c in manifest.chunk_data_list.elements)
-        analysis_res.biggest_file_size = max(f.file_size for f in manifest.file_manifest_list.elements)
-        is_1mib = analysis_res.biggest_chunk == 1024 * 1024
-        self.log.debug(f'Biggest chunk size: {analysis_res.biggest_chunk} bytes (== 1 MiB? {is_1mib})')
-
-        self.log.debug(f'Creating manifest comparison...')
-        mc = ManifestComparison.create(manifest, old_manifest)
-        analysis_res.manifest_comparison = mc
-
-        if resume and self.resume_file and os.path.exists(self.resume_file):
-            try:
-                completed_files = set(i.strip() for i in open(self.resume_file).readlines())
-                # remove completed files from changed/added and move them to unchanged for the analysis.
-                mc.added -= completed_files
-                mc.changed -= completed_files
-                mc.unchanged |= completed_files
-                self.log.debug(f'Skipped {len(completed_files)} files based on resume data!')
-            except Exception as e:
-                self.log.warning(f'Reading resume file failed: {e!r}, continuing as normal...')
-
-        # Not entirely sure what install tags are used for, only some titles have them.
-        # Let's add it for testing anyway.
-        if file_install_tag:
-            files_to_skip = set(i.filename for i in manifest.file_manifest_list.elements
-                                if file_install_tag not in i.install_tags)
-            self.log.info(f'Found {len(files_to_skip)} files to skip based on install tag.')
-            mc.added -= files_to_skip
-            mc.changed -= files_to_skip
-            mc.unchanged |= files_to_skip
-
-        # if include/exclude prefix has been set: mark all files that are not to be downloaded as unchanged
-        if file_exclude_filter:
-            file_exclude_filter = file_exclude_filter.lower()
-            files_to_skip = set(i for i in mc.added | mc.changed if i.lower().startswith(file_exclude_filter))
-            self.log.info(f'Found {len(files_to_skip)} files to skip based on exclude prefix.')
-            mc.added -= files_to_skip
-            mc.changed -= files_to_skip
-            mc.unchanged |= files_to_skip
-
-        if file_prefix_filter:
-            file_prefix_filter = file_prefix_filter.lower()
-            files_to_skip = set(i for i in mc.added | mc.changed if not i.lower().startswith(file_prefix_filter))
-            self.log.info(f'Found {len(files_to_skip)} files to skip based on include prefix.')
-            mc.added -= files_to_skip
-            mc.changed -= files_to_skip
-            mc.unchanged |= files_to_skip
-
-        if file_prefix_filter or file_exclude_filter or file_install_tag:
-            self.log.info(f'Remaining files after filtering: {len(mc.added) + len(mc.changed)}')
-            # correct install size after filtering
-            analysis_res.install_size = sum(fm.file_size for fm in manifest.file_manifest_list.elements
-                                            if fm.filename in mc.added)
-
-        if mc.removed:
-            analysis_res.removed = len(mc.removed)
-            self.log.debug(f'{analysis_res.removed} removed files')
-        if mc.added:
-            analysis_res.added = len(mc.added)
-            self.log.debug(f'{analysis_res.added} added files')
-        if mc.changed:
-            analysis_res.changed = len(mc.changed)
-            self.log.debug(f'{analysis_res.changed} changed files')
-        if mc.unchanged:
-            analysis_res.unchanged = len(mc.unchanged)
-            self.log.debug(f'{analysis_res.unchanged} unchanged files')
-
-        if processing_optimization and len(manifest.file_manifest_list.elements) > 8_000:
-            self.log.warning('Manifest contains too many files, processing optimizations will be disabled.')
-            processing_optimization = False
-        elif processing_optimization:
-            self.log.info('Processing order optimization is enabled, analysis may take a few seconds longer...')
-
-        # count references to chunks for determining runtime cache size later
-        references = Counter()
-        file_to_chunks = defaultdict(set)
-        fmlist = sorted(manifest.file_manifest_list.elements,
-                        key=lambda a: a.filename.lower())
-
-        for fm in fmlist:
-            # chunks of unchanged files are not downloaded so we can skip them
-            if fm.filename in mc.unchanged:
-                analysis_res.unchanged += fm.file_size
-                continue
-
-            for cp in fm.chunk_parts:
-                references[cp.guid_num] += 1
-                if processing_optimization:
-                    file_to_chunks[fm.filename].add(cp.guid_num)
-
-        if processing_optimization:
-            # reorder the file manifest list to group files that share many chunks
-            # 5 is mostly arbitrary but has shown in testing to be a good choice
-            min_overlap = 5
-            # enumerate the file list to try and find a "partner" for
-            # each file that shares the most chunks with it.
-            partners = dict()
-            filenames = [fm.filename for fm in fmlist]
-
-            for num, filename in enumerate(filenames[:int((len(filenames)+1)/2)]):
-                chunks = file_to_chunks[filename]
-                max_overlap = min_overlap
-
-                for other_file in filenames[num+1:]:
-                    overlap = len(chunks & file_to_chunks[other_file])
-                    if overlap > max_overlap:
-                        partners[filename] = other_file
-                        max_overlap = overlap
-
-            # iterate over all the files again and this time around
-            _fmlist = []
-            processed = set()
-            for fm in fmlist:
-                if fm.filename in processed:
-                    continue
-                _fmlist.append(fm)
-                processed.add(fm.filename)
-                # try to find the file's "partner"
-                partner = partners.get(fm.filename, None)
-                if not partner or partner in processed:
-                    continue
-
-                partner_fm = manifest.file_manifest_list.get_file_by_path(partner)
-                _fmlist.append(partner_fm)
-                processed.add(partner)
-
-            fmlist = _fmlist
-
-        # determine reusable chunks and prepare lookup table for reusable ones
-        re_usable = defaultdict(dict)
-        if old_manifest and mc.changed and patch:
-            self.log.debug('Analyzing manifests for re-usable chunks...')
-            for changed in mc.changed:
-                old_file = old_manifest.file_manifest_list.get_file_by_path(changed)
-                new_file = manifest.file_manifest_list.get_file_by_path(changed)
-
-                existing_chunks = dict()
-                off = 0
-                for cp in old_file.chunk_parts:
-                    existing_chunks[(cp.guid_num, cp.offset, cp.size)] = off
-                    off += cp.size
-
-                for cp in new_file.chunk_parts:
-                    key = (cp.guid_num, cp.offset, cp.size)
-                    if key in existing_chunks:
-                        references[cp.guid_num] -= 1
-                        re_usable[changed][key] = existing_chunks[key]
-                        analysis_res.reuse_size += cp.size
-
-        last_cache_size = current_cache_size = 0
-        # set to determine whether a file is currently cached or not
-        cached = set()
-        # Using this secondary set is orders of magnitude faster than checking the deque.
-        chunks_in_dl_list = set()
-        # This is just used to count all unique guids that have been cached
-        dl_cache_guids = set()
-
-        # run through the list of files and create the download jobs and also determine minimum
-        # runtime cache requirement by simulating adding/removing from cache during download.
-        self.log.debug('Creating filetasks and chunktasks...')
-        for current_file in fmlist:
-            # skip unchanged and empty files
-            if current_file.filename in mc.unchanged:
-                continue
-            elif not current_file.chunk_parts:
-                self.tasks.append(FileTask(current_file.filename, empty=True))
-                continue
-
-            existing_chunks = re_usable.get(current_file.filename, None)
-            chunk_tasks = []
-            reused = 0
-
-            for cp in current_file.chunk_parts:
-                ct = ChunkTask(cp.guid_num, cp.offset, cp.size)
-
-                # re-use the chunk from the existing file if we can
-                if existing_chunks and (cp.guid_num, cp.offset, cp.size) in existing_chunks:
-                    reused += 1
-                    ct.chunk_file = current_file.filename
-                    ct.chunk_offset = existing_chunks[(cp.guid_num, cp.offset, cp.size)]
-                else:
-                    # add to DL list if not already in it
-                    if cp.guid_num not in chunks_in_dl_list:
-                        self.chunks_to_dl.append(cp.guid_num)
-                        chunks_in_dl_list.add(cp.guid_num)
-
-                    # if chunk has more than one use or is already in cache,
-                    # check if we need to add or remove it again.
-                    if references[cp.guid_num] > 1 or cp.guid_num in cached:
-                        references[cp.guid_num] -= 1
-
-                        # delete from cache if no references left
-                        if references[cp.guid_num] < 1:
-                            current_cache_size -= analysis_res.biggest_chunk
-                            cached.remove(cp.guid_num)
-                            ct.cleanup = True
-                        # add to cache if not already cached
-                        elif cp.guid_num not in cached:
-                            dl_cache_guids.add(cp.guid_num)
-                            cached.add(cp.guid_num)
-                            current_cache_size += analysis_res.biggest_chunk
-                    else:
-                        ct.cleanup = True
-
-                chunk_tasks.append(ct)
-
-            if reused:
-                self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}')
-                # open temporary file that will contain download + old file contents
-                self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True))
-                self.tasks.extend(chunk_tasks)
-                self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True))
-                # delete old file and rename temproary
-                self.tasks.append(FileTask(current_file.filename, delete=True, rename=True,
-                                           temporary_filename=current_file.filename + u'.tmp'))
-            else:
-                self.tasks.append(FileTask(current_file.filename, fopen=True))
-                self.tasks.extend(chunk_tasks)
-                self.tasks.append(FileTask(current_file.filename, close=True))
-
-            # check if runtime cache size has changed
-            if current_cache_size > last_cache_size:
-                self.log.debug(f' * New maximum cache size: {current_cache_size / 1024 / 1024:.02f} MiB')
-                last_cache_size = current_cache_size
-
-        self.log.debug(f'Final cache size requirement: {last_cache_size / 1024 / 1024} MiB.')
-        analysis_res.min_memory = last_cache_size + (1024 * 1024 * 32)  # add some padding just to be safe
-
-        # Todo implement on-disk caching to avoid this issue.
-        if analysis_res.min_memory > self.max_shared_memory:
-            shared_mib = f'{self.max_shared_memory / 1024 / 1024:.01f} MiB'
-            required_mib = f'{analysis_res.min_memory / 1024 / 1024:.01f} MiB'
-            raise MemoryError(f'Current shared memory cache is smaller than required! {shared_mib} < {required_mib}. '
-                              f'Try running legendary with the --enable-reordering flag to reduce memory usage.')
-
-        # calculate actual dl and patch write size.
-        analysis_res.dl_size = \
-            sum(c.file_size for c in manifest.chunk_data_list.elements if c.guid_num in chunks_in_dl_list)
-        analysis_res.uncompressed_dl_size = \
-            sum(c.window_size for c in manifest.chunk_data_list.elements if c.guid_num in chunks_in_dl_list)
-
-        # add jobs to remove files
-        for fname in mc.removed:
-            self.tasks.append(FileTask(fname, delete=True))
-
-        analysis_res.num_chunks_cache = len(dl_cache_guids)
-        self.chunk_data_list = manifest.chunk_data_list
-        self.analysis = analysis_res
-
-        return analysis_res
-
     def run(self):
         if not self.analysis:
             raise ValueError('Did not run analysis before trying to run download!')
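
Note (not part of the patch): the relocated run_analysis() estimates its shared-memory requirement by reference-counting chunk uses and then simulating cache adds/evictions while it builds the task list. The standalone sketch below illustrates that model only; the function name, input format (one guid list per file, fixed chunk size) and example data are made up for illustration and are not legendary API code.

# Illustrative sketch of the reference-counting cache simulation used above.
from collections import Counter

def estimate_peak_cache(files, chunk_size=1024 * 1024):
    # files: list of (filename, [chunk_guid, ...]) in download order
    references = Counter(guid for _, chunks in files for guid in chunks)
    cached = set()
    current = peak = 0
    for _, chunks in files:
        for guid in chunks:
            if references[guid] > 1 or guid in cached:
                references[guid] -= 1
                if references[guid] < 1:
                    # last use of a cached chunk: evict it
                    current -= chunk_size
                    cached.discard(guid)
                elif guid not in cached:
                    # chunk will be needed again later: keep it in the cache
                    cached.add(guid)
                    current += chunk_size
            peak = max(peak, current)
    return peak

# Example: chunks 2 and 3 are cached at the same time, so the peak is 2 MiB.
print(estimate_peak_cache([('a.pak', [1, 2, 3]), ('b.pak', [2, 3, 4]), ('c.pak', [4])]))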