From dc381cacb0566b4426ed457b148274d6b3336632 Mon Sep 17 00:00:00 2001
From: derrod
Date: Wed, 11 Aug 2021 09:21:19 +0200
Subject: [PATCH] [downloader] Adjust for changes in new dataclass attribute names

---
 legendary/downloader/manager.py | 18 +++++++++---------
 legendary/downloader/workers.py | 24 ++++++++++++------------
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/legendary/downloader/manager.py b/legendary/downloader/manager.py
index 0b84d2a..03fb96e 100644
--- a/legendary/downloader/manager.py
+++ b/legendary/downloader/manager.py
@@ -341,14 +341,14 @@ class DLManager(Process):
             if reused:
                 self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}')
                 # open temporary file that will contain download + old file contents
-                self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True))
+                self.tasks.append(FileTask(current_file.filename + u'.tmp', open=True))
                 self.tasks.extend(chunk_tasks)
                 self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True))
 
                 # delete old file and rename temporary
                 self.tasks.append(FileTask(current_file.filename, delete=True, rename=True,
                                            temporary_filename=current_file.filename + u'.tmp'))
             else:
-                self.tasks.append(FileTask(current_file.filename, fopen=True))
+                self.tasks.append(FileTask(current_file.filename, open=True))
                 self.tasks.extend(chunk_tasks)
                 self.tasks.append(FileTask(current_file.filename, close=True))
@@ -450,7 +450,7 @@ class DLManager(Process):
                     elif task.delete:
                         self.writer_queue.put(WriterTask(task.filename, delete=True, silent=task.silent), timeout=1.0)
                     elif task.open:
-                        self.writer_queue.put(WriterTask(task.filename, fopen=True), timeout=1.0)
+                        self.writer_queue.put(WriterTask(task.filename, open=True), timeout=1.0)
                         current_file = task.filename
                     elif task.close:
                         self.writer_queue.put(WriterTask(task.filename, close=True), timeout=1.0)
@@ -500,15 +500,15 @@ class DLManager(Process):
                     task_cond.notify()
 
                 if res.success:
-                    self.log.debug(f'Download for {res.guid} succeeded, adding to in_buffer...')
-                    in_buffer[res.guid] = res
+                    self.log.debug(f'Download for {res.chunk_guid} succeeded, adding to in_buffer...')
+                    in_buffer[res.chunk_guid] = res
                     self.bytes_downloaded_since_last += res.compressed_size
                     self.bytes_decompressed_since_last += res.size
                 else:
-                    self.log.error(f'Download for {res.guid} failed, retrying...')
+                    self.log.error(f'Download for {res.chunk_guid} failed, retrying...')
                     try:
                         self.dl_worker_queue.put(DownloaderTask(
-                            url=res.url, chunk_guid=res.guid, shm=res.shm
+                            url=res.url, chunk_guid=res.chunk_guid, shm=res.shared_memory
                         ), timeout=1.0)
                         self.active_tasks += 1
                     except Exception as e:
@@ -545,14 +545,14 @@ class DLManager(Process):
                     # todo make this kill the installation process or at least skip the file and mark it as failed
                     self.log.fatal(f'Writing for {res.filename} failed!')
                 if res.release_memory:
-                    self.sms.appendleft(res.shm)
+                    self.sms.appendleft(res.shared_memory)
                     with shm_cond:
                         shm_cond.notify()
 
                 if res.chunk_guid:
                     self.bytes_written_since_last += res.size
                     # if there's no shared memory we must have read from disk.
-                    if not res.shm:
+                    if not res.shared_memory:
                         self.bytes_read_since_last += res.size
                     self.num_processed_since_last += 1
 
diff --git a/legendary/downloader/workers.py b/legendary/downloader/workers.py
index 8f34e83..12bbf37 100644
--- a/legendary/downloader/workers.py
+++ b/legendary/downloader/workers.py
@@ -70,12 +70,12 @@ class DLWorker(Process):
                         r = self.session.get(job.url, timeout=self.dl_timeout)
                         r.raise_for_status()
                     except Exception as e:
-                        logger.warning(f'Chunk download for {job.guid} failed: ({e!r}), retrying...')
+                        logger.warning(f'Chunk download for {job.chunk_guid} failed: ({e!r}), retrying...')
                         continue
 
                     dl_end = time.time()
                     if r.status_code != 200:
-                        logger.warning(f'Chunk download for {job.guid} failed: status {r.status_code}, retrying...')
+                        logger.warning(f'Chunk download for {job.chunk_guid} failed: status {r.status_code}, retrying...')
                         continue
                     else:
                         compressed = len(r.content)
@@ -84,16 +84,16 @@ class DLWorker(Process):
                 else:
                     raise TimeoutError('Max retries reached')
             except Exception as e:
-                logger.error(f'Job for {job.guid} failed with: {e!r}, fetching next one...')
+                logger.error(f'Job for {job.chunk_guid} failed with: {e!r}, fetching next one...')
                 # add failed job to result queue to be requeued
-                self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
+                self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.chunk_guid, shm=job.shm, url=job.url))
             except KeyboardInterrupt:
                 logger.warning('Immediate exit requested, quitting...')
                 break
 
             if not chunk:
                 logger.warning(f'Chunk somehow None?')
-                self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
+                self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.chunk_guid, shm=job.shm, url=job.url))
                 continue
 
             # decompress stuff
@@ -104,12 +104,12 @@ class DLWorker(Process):
                 self.shm.buf[job.shm.offset:job.shm.offset + size] = bytes(chunk.data)
                 del chunk
 
-                self.o_q.put(DownloaderTaskResult(success=True, chunk_guid=job.guid, shm=job.shm,
+                self.o_q.put(DownloaderTaskResult(success=True, chunk_guid=job.chunk_guid, shm=job.shm,
                                                   url=job.url, size=size, compressed_size=compressed,
                                                   time_delta=dl_end - dl_start))
             except Exception as e:
-                logger.warning(f'Job for {job.guid} failed with: {e!r}, fetching next one...')
-                self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
+                logger.warning(f'Job for {job.chunk_guid} failed with: {e!r}, fetching next one...')
+                self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.chunk_guid, shm=job.shm, url=job.url))
                 continue
             except KeyboardInterrupt:
                 logger.warning('Immediate exit requested, quitting...')
@@ -226,9 +226,9 @@ class FileWorker(Process):
                 pre_write = post_write = 0
 
                 try:
-                    if j.shm:
+                    if j.shared_memory:
                         pre_write = time.time()
-                        shm_offset = j.shm.offset + j.chunk_offset
+                        shm_offset = j.shared_memory.offset + j.chunk_offset
                         shm_end = shm_offset + j.chunk_size
                         current_file.write(self.shm.buf[shm_offset:shm_end].tobytes())
                         post_write = time.time()
@@ -251,13 +251,13 @@ class FileWorker(Process):
                     self.o_q.put(WriterTaskResult(success=False, filename=j.filename,
                                                   chunk_guid=j.chunk_guid,
                                                   release_memory=j.release_memory,
-                                                  shm=j.shm, size=j.chunk_size,
+                                                  shared_memory=j.shared_memory, size=j.chunk_size,
                                                   time_delta=post_write-pre_write))
                 else:
                     self.o_q.put(WriterTaskResult(success=True, filename=j.filename,
                                                   chunk_guid=j.chunk_guid,
                                                   release_memory=j.release_memory,
-                                                  shm=j.shm, size=j.chunk_size,
+                                                  shared_memory=j.shared_memory, size=j.chunk_size,
                                                   time_delta=post_write-pre_write))
             except Exception as e:
                 logger.warning(f'Job {j.filename} failed with: {e!r}, fetching next one...')