From 9e01ae1c5b07336f57d3cb9ff7238c22fe0cd980 Mon Sep 17 00:00:00 2001 From: derrod Date: Sat, 14 Aug 2021 05:10:31 +0200 Subject: [PATCH] [downloader] Update to use new dataclasses --- legendary/downloader/manager.py | 65 +++++++++++-------------- legendary/downloader/workers.py | 86 ++++++++++++++------------------- 2 files changed, 63 insertions(+), 88 deletions(-) diff --git a/legendary/downloader/manager.py b/legendary/downloader/manager.py index 03fb96e..11c4c1e 100644 --- a/legendary/downloader/manager.py +++ b/legendary/downloader/manager.py @@ -14,7 +14,7 @@ from queue import Empty from sys import exit from threading import Condition, Thread -from legendary.downloader.workers import DLWorker, FileWorker +from legendary.downloader.mp.workers import DLWorker, FileWorker from legendary.models.downloading import * from legendary.models.manifest import ManifestComparison, Manifest @@ -151,7 +151,7 @@ class DLManager(Process): mc.changed -= files_to_skip mc.unchanged |= files_to_skip for fname in sorted(files_to_skip): - additional_deletion_tasks.append(FileTask(fname, delete=True, silent=True)) + additional_deletion_tasks.append(FileTask(fname, flags=TaskFlags.DELETE_FILE | TaskFlags.SILENT)) # if include/exclude prefix has been set: mark all files that are not to be downloaded as unchanged if file_exclude_filter: @@ -297,7 +297,7 @@ class DLManager(Process): if current_file.filename in mc.unchanged: continue elif not current_file.chunk_parts: - self.tasks.append(FileTask(current_file.filename, empty=True)) + self.tasks.append(FileTask(current_file.filename, flags=TaskFlags.CREATE_EMPTY_FILE)) continue existing_chunks = re_usable.get(current_file.filename, None) @@ -341,16 +341,16 @@ class DLManager(Process): if reused: self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}') # open temporary file that will contain download + old file contents - self.tasks.append(FileTask(current_file.filename + u'.tmp', open=True)) + 
self.tasks.append(FileTask(current_file.filename + u'.tmp', flags=TaskFlags.OPEN_FILE)) self.tasks.extend(chunk_tasks) - self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True)) + self.tasks.append(FileTask(current_file.filename + u'.tmp', flags=TaskFlags.CLOSE_FILE)) # delete old file and rename temporary - self.tasks.append(FileTask(current_file.filename, delete=True, rename=True, - temporary_filename=current_file.filename + u'.tmp')) + self.tasks.append(FileTask(current_file.filename, old_file=current_file.filename + u'.tmp', + flags=TaskFlags.RENAME_FILE | TaskFlags.DELETE_FILE)) else: - self.tasks.append(FileTask(current_file.filename, open=True)) + self.tasks.append(FileTask(current_file.filename, flags=TaskFlags.OPEN_FILE)) self.tasks.extend(chunk_tasks) - self.tasks.append(FileTask(current_file.filename, close=True)) + self.tasks.append(FileTask(current_file.filename, flags=TaskFlags.CLOSE_FILE)) # check if runtime cache size has changed if current_cache_size > last_cache_size: @@ -384,7 +384,7 @@ class DLManager(Process): # add jobs to remove files for fname in mc.removed: - self.tasks.append(FileTask(fname, delete=True)) + self.tasks.append(FileTask(fname, flags=TaskFlags.DELETE_FILE)) self.tasks.extend(additional_deletion_tasks) analysis_res.num_chunks_cache = len(dl_cache_guids) @@ -440,20 +440,9 @@ class DLManager(Process): while task and self.running: if isinstance(task, FileTask): # this wasn't necessarily a good idea... 
try: - if task.empty: - self.writer_queue.put(WriterTask(task.filename, empty=True), timeout=1.0) - elif task.rename: - self.writer_queue.put(WriterTask(task.filename, rename=True, - delete=task.delete, - old_filename=task.temporary_filename), - timeout=1.0) - elif task.delete: - self.writer_queue.put(WriterTask(task.filename, delete=True, silent=task.silent), timeout=1.0) - elif task.open: - self.writer_queue.put(WriterTask(task.filename, open=True), timeout=1.0) + self.writer_queue.put(WriterTask(**task.__dict__), timeout=1.0) + if task.flags & TaskFlags.OPEN_FILE: current_file = task.filename - elif task.close: - self.writer_queue.put(WriterTask(task.filename, close=True), timeout=1.0) except Exception as e: self.tasks.appendleft(task) self.log.warning(f'Adding to queue failed: {e!r}') @@ -475,8 +464,8 @@ class DLManager(Process): self.writer_queue.put(WriterTask( filename=current_file, shared_memory=res_shm, chunk_offset=task.chunk_offset, chunk_size=task.chunk_size, - chunk_guid=task.chunk_guid, release_memory=task.cleanup, - old_file=task.chunk_file # todo on-disk cache + chunk_guid=task.chunk_guid, cache_file=task.chunk_file, + flags=TaskFlags.RELEASE_MEMORY if task.cleanup else TaskFlags.NONE ), timeout=1.0) except Exception as e: self.log.warning(f'Adding to queue failed: {e!r}') @@ -502,14 +491,13 @@ class DLManager(Process): if res.success: self.log.debug(f'Download for {res.chunk_guid} succeeded, adding to in_buffer...') in_buffer[res.chunk_guid] = res - self.bytes_downloaded_since_last += res.compressed_size - self.bytes_decompressed_since_last += res.size + self.bytes_downloaded_since_last += res.size_downloaded + self.bytes_decompressed_since_last += res.size_decompressed else: self.log.error(f'Download for {res.chunk_guid} failed, retrying...') try: - self.dl_worker_queue.put(DownloaderTask( - url=res.url, chunk_guid=res.chunk_guid, shm=res.shared_memory - ), timeout=1.0) + # since the result is a subclass of the task we can simply resubmit the 
result object + self.dl_worker_queue.put(res, timeout=1.0) self.active_tasks += 1 except Exception as e: self.log.warning(f'Failed adding retry task to queue! {e!r}') @@ -526,9 +514,14 @@ class DLManager(Process): while self.running: try: res = self.writer_result_q.get(timeout=1.0) + + if isinstance(res, TerminateWorkerTask): + self.log.debug('Got termination command in FW result handler') + break + self.num_tasks_processed_since_last += 1 - if res.closed and self.resume_file and res.success: + if res.flags & TaskFlags.CLOSE_FILE and self.resume_file and res.success: if res.filename.endswith('.tmp'): res.filename = res.filename[:-4] @@ -537,14 +530,10 @@ class DLManager(Process): with open(self.resume_file, 'ab') as rf: rf.write(f'{file_hash}:{res.filename}\n'.encode('utf-8')) - if res.kill: - self.log.debug('Got termination command in FW result handler') - break - if not res.success: # todo make this kill the installation process or at least skip the file and mark it as failed self.log.fatal(f'Writing for {res.filename} failed!') - if res.release_memory: + if res.flags & TaskFlags.RELEASE_MEMORY: self.sms.appendleft(res.shared_memory) with shm_cond: shm_cond.notify() @@ -731,10 +720,10 @@ class DLManager(Process): time.sleep(self.update_interval) for i in range(self.max_workers): - self.dl_worker_queue.put_nowait(DownloaderTask(kill=True)) + self.dl_worker_queue.put_nowait(TerminateWorkerTask()) self.log.info('Waiting for installation to finish...') - self.writer_queue.put_nowait(WriterTask('', kill=True)) + self.writer_queue.put_nowait(TerminateWorkerTask()) writer_p.join(timeout=10.0) if writer_p.exitcode is None: diff --git a/legendary/downloader/workers.py b/legendary/downloader/workers.py index 12bbf37..927e86b 100644 --- a/legendary/downloader/workers.py +++ b/legendary/downloader/workers.py @@ -11,7 +11,11 @@ from multiprocessing.shared_memory import SharedMemory from queue import Empty from legendary.models.chunk import Chunk -from 
legendary.models.downloading import DownloaderTaskResult, WriterTaskResult +from legendary.models.downloading import ( + DownloaderTask, DownloaderTaskResult, + WriterTask, WriterTaskResult, + TerminateWorkerTask, TaskFlags +) class DLWorker(Process): @@ -43,7 +47,7 @@ class DLWorker(Process): empty = False while True: try: - job = self.q.get(timeout=10.0) + job: DownloaderTask = self.q.get(timeout=10.0) empty = False except Empty: if not empty: @@ -51,12 +55,11 @@ class DLWorker(Process): empty = True continue - if job.kill: # let worker die - logger.debug(f'Worker received kill signal, shutting down...') + if isinstance(job, TerminateWorkerTask): # let worker die + logger.debug(f'Worker received termination signal, shutting down...') break tries = 0 - dl_start = dl_end = 0 compressed = 0 chunk = None @@ -64,7 +67,6 @@ class DLWorker(Process): while tries < self.max_retries: # print('Downloading', job.url) logger.debug(f'Downloading {job.url}') - dl_start = time.time() try: r = self.session.get(job.url, timeout=self.dl_timeout) @@ -73,7 +75,6 @@ class DLWorker(Process): logger.warning(f'Chunk download for {job.chunk_guid} failed: ({e!r}), retrying...') continue - dl_end = time.time() if r.status_code != 200: logger.warning(f'Chunk download for {job.chunk_guid} failed: status {r.status_code}, retrying...') continue @@ -86,14 +87,14 @@ class DLWorker(Process): except Exception as e: logger.error(f'Job for {job.chunk_guid} failed with: {e!r}, fetching next one...') # add failed job to result queue to be requeued - self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.chunk_guid, shm=job.shm, url=job.url)) + self.o_q.put(DownloaderTaskResult(success=False, **job.__dict__)) except KeyboardInterrupt: logger.warning('Immediate exit requested, quitting...') break if not chunk: logger.warning(f'Chunk somehow None?') - self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.chunk_guid, shm=job.shm, url=job.url)) + 
self.o_q.put(DownloaderTaskResult(success=False, **job.__dict__)) continue # decompress stuff @@ -104,12 +105,11 @@ class DLWorker(Process): self.shm.buf[job.shm.offset:job.shm.offset + size] = bytes(chunk.data) del chunk - self.o_q.put(DownloaderTaskResult(success=True, chunk_guid=job.chunk_guid, shm=job.shm, - url=job.url, size=size, compressed_size=compressed, - time_delta=dl_end - dl_start)) + self.o_q.put(DownloaderTaskResult(success=True, size_decompressed=size, + size_downloaded=compressed, **job.__dict__)) except Exception as e: logger.warning(f'Job for {job.chunk_guid} failed with: {e!r}, fetching next one...') - self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.chunk_guid, shm=job.shm, url=job.url)) + self.o_q.put(DownloaderTaskResult(success=False, **job.__dict__)) continue except KeyboardInterrupt: logger.warning('Immediate exit requested, quitting...') break @@ -145,15 +145,17 @@ class FileWorker(Process): while True: try: try: - j = self.q.get(timeout=10.0) + j: WriterTask = self.q.get(timeout=10.0) except Empty: logger.warning('Writer queue empty!') continue - if j.kill: + if isinstance(j, TerminateWorkerTask): if current_file: current_file.close() - self.o_q.put(WriterTaskResult(success=True, kill=True)) + logger.debug(f'Worker received termination signal, shutting down...') + # send termination task to results handler as well + self.o_q.put(TerminateWorkerTask()) break # make directories if required @@ -163,11 +165,11 @@ class FileWorker(Process): full_path = os.path.join(self.base_path, j.filename) - if j.empty: # just create an empty file + if j.flags & TaskFlags.CREATE_EMPTY_FILE: # just create an empty file open(full_path, 'a').close() - self.o_q.put(WriterTaskResult(success=True, filename=j.filename)) + self.o_q.put(WriterTaskResult(success=True, **j.__dict__)) continue - elif j.open: + elif j.flags & TaskFlags.OPEN_FILE: if current_file: logger.warning(f'Opening new file {j.filename} without closing previous! 
{last_filename}') current_file.close() @@ -175,40 +177,40 @@ class FileWorker(Process): current_file = open(full_path, 'wb') last_filename = j.filename - self.o_q.put(WriterTaskResult(success=True, filename=j.filename)) + self.o_q.put(WriterTaskResult(success=True, **j.__dict__)) continue - elif j.close: + elif j.flags & TaskFlags.CLOSE_FILE: if current_file: current_file.close() current_file = None else: logger.warning(f'Asking to close file that is not open: {j.filename}') - self.o_q.put(WriterTaskResult(success=True, filename=j.filename, closed=True)) + self.o_q.put(WriterTaskResult(success=True, **j.__dict__)) continue - elif j.rename: + elif j.flags & TaskFlags.RENAME_FILE: if current_file: logger.warning('Trying to rename file without closing first!') current_file.close() current_file = None - if j.delete: + if j.flags & TaskFlags.DELETE_FILE: try: os.remove(full_path) except OSError as e: logger.error(f'Removing file failed: {e!r}') - self.o_q.put(WriterTaskResult(success=False, filename=j.filename)) + self.o_q.put(WriterTaskResult(success=False, **j.__dict__)) continue try: - os.rename(os.path.join(self.base_path, j.old_filename), full_path) + os.rename(os.path.join(self.base_path, j.old_file), full_path) except OSError as e: logger.error(f'Renaming file failed: {e!r}') - self.o_q.put(WriterTaskResult(success=False, filename=j.filename)) + self.o_q.put(WriterTaskResult(success=False, **j.__dict__)) continue - self.o_q.put(WriterTaskResult(success=True, filename=j.filename)) + self.o_q.put(WriterTaskResult(success=True, **j.__dict__)) continue - elif j.delete: + elif j.flags & TaskFlags.DELETE_FILE: if current_file: logger.warning('Trying to delete file without closing first!') current_file.close() @@ -217,51 +219,35 @@ class FileWorker(Process): try: os.remove(full_path) except OSError as e: - if not j.silent: + if not j.flags & TaskFlags.SILENT: logger.error(f'Removing file failed: {e!r}') - self.o_q.put(WriterTaskResult(success=True, filename=j.filename)) 
+ self.o_q.put(WriterTaskResult(success=True, **j.__dict__)) continue - pre_write = post_write = 0 - try: if j.shared_memory: - pre_write = time.time() shm_offset = j.shared_memory.offset + j.chunk_offset shm_end = shm_offset + j.chunk_size current_file.write(self.shm.buf[shm_offset:shm_end].tobytes()) - post_write = time.time() elif j.cache_file: - pre_write = time.time() with open(os.path.join(self.cache_path, j.cache_file), 'rb') as f: if j.chunk_offset: f.seek(j.chunk_offset) current_file.write(f.read(j.chunk_size)) - post_write = time.time() elif j.old_file: - pre_write = time.time() with open(os.path.join(self.base_path, j.old_file), 'rb') as f: if j.chunk_offset: f.seek(j.chunk_offset) current_file.write(f.read(j.chunk_size)) - post_write = time.time() except Exception as e: logger.warning(f'Something in writing a file failed: {e!r}') - self.o_q.put(WriterTaskResult(success=False, filename=j.filename, - chunk_guid=j.chunk_guid, - release_memory=j.release_memory, - shared_memory=j.shared_memory, size=j.chunk_size, - time_delta=post_write-pre_write)) + self.o_q.put(WriterTaskResult(success=False, size=j.chunk_size, **j.__dict__)) else: - self.o_q.put(WriterTaskResult(success=True, filename=j.filename, - chunk_guid=j.chunk_guid, - release_memory=j.release_memory, - shared_memory=j.shared_memory, size=j.chunk_size, - time_delta=post_write-pre_write)) + self.o_q.put(WriterTaskResult(success=True, size=j.chunk_size, **j.__dict__)) except Exception as e: logger.warning(f'Job {j.filename} failed with: {e!r}, fetching next one...') - self.o_q.put(WriterTaskResult(success=False, filename=j.filename, chunk_guid=j.chunk_guid)) + self.o_q.put(WriterTaskResult(success=False, **j.__dict__)) try: if current_file: