[downloader] Update to use new dataclasses

derrod 2021-08-14 05:10:31 +02:00
parent 1fe0dab78b
commit 9e01ae1c5b
2 changed files with 63 additions and 88 deletions
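
The dataclasses this commit switches to live in legendary/models/downloading.py and are not part of the diff itself. For orientation, here is a minimal sketch of what they presumably look like, inferred purely from how the code below uses them — the field names, types, and defaults are assumptions, not the actual definitions:

    # Hypothetical sketch of the new models in legendary/models/downloading.py,
    # inferred from usage in this diff; the real definitions may differ.
    from dataclasses import dataclass
    from enum import Flag, auto
    from typing import Any, Optional

    class TaskFlags(Flag):
        NONE = 0
        OPEN_FILE = auto()
        CLOSE_FILE = auto()
        DELETE_FILE = auto()
        RENAME_FILE = auto()
        CREATE_EMPTY_FILE = auto()
        RELEASE_MEMORY = auto()
        SILENT = auto()

    @dataclass
    class TerminateWorkerTask:
        """Sentinel telling a worker (or a result handler) to shut down."""

    @dataclass
    class FileTask:
        filename: str
        old_file: Optional[str] = None
        flags: TaskFlags = TaskFlags.NONE

    @dataclass
    class WriterTask:
        # a superset of FileTask's fields, so WriterTask(**file_task.__dict__) works
        filename: str
        old_file: Optional[str] = None
        shared_memory: Optional[Any] = None
        chunk_offset: int = 0
        chunk_size: int = 0
        chunk_guid: Optional[int] = None
        cache_file: Optional[str] = None
        flags: TaskFlags = TaskFlags.NONE

    @dataclass
    class WriterTaskResult(WriterTask):
        success: bool = False
        size: int = 0

    @dataclass
    class DownloaderTask:
        url: Optional[str] = None
        chunk_guid: Optional[int] = None
        shm: Optional[Any] = None

    @dataclass
    class DownloaderTaskResult(DownloaderTask):
        success: bool = False
        size_downloaded: int = 0
        size_decompressed: int = 0

Because each result type subclasses its task type, a worker can build a result with DownloaderTaskResult(success=..., **job.__dict__), and the manager can resubmit a failed result object directly as a retry task.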

legendary/downloader/mp/manager.py

@@ -14,7 +14,7 @@ from queue import Empty
 from sys import exit
 from threading import Condition, Thread
 
-from legendary.downloader.workers import DLWorker, FileWorker
+from legendary.downloader.mp.workers import DLWorker, FileWorker
 from legendary.models.downloading import *
 from legendary.models.manifest import ManifestComparison, Manifest
@@ -151,7 +151,7 @@ class DLManager(Process):
             mc.changed -= files_to_skip
             mc.unchanged |= files_to_skip
             for fname in sorted(files_to_skip):
-                additional_deletion_tasks.append(FileTask(fname, delete=True, silent=True))
+                additional_deletion_tasks.append(FileTask(fname, flags=TaskFlags.DELETE_FILE | TaskFlags.SILENT))
 
         # if include/exclude prefix has been set: mark all files that are not to be downloaded as unchanged
         if file_exclude_filter:
@@ -297,7 +297,7 @@ class DLManager(Process):
             if current_file.filename in mc.unchanged:
                 continue
             elif not current_file.chunk_parts:
-                self.tasks.append(FileTask(current_file.filename, empty=True))
+                self.tasks.append(FileTask(current_file.filename, flags=TaskFlags.CREATE_EMPTY_FILE))
                 continue
 
             existing_chunks = re_usable.get(current_file.filename, None)
@@ -341,16 +341,16 @@ class DLManager(Process):
             if reused:
                 self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}')
                 # open temporary file that will contain download + old file contents
-                self.tasks.append(FileTask(current_file.filename + u'.tmp', open=True))
+                self.tasks.append(FileTask(current_file.filename + u'.tmp', flags=TaskFlags.OPEN_FILE))
                 self.tasks.extend(chunk_tasks)
-                self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True))
+                self.tasks.append(FileTask(current_file.filename + u'.tmp', flags=TaskFlags.CLOSE_FILE))
                 # delete old file and rename temporary
-                self.tasks.append(FileTask(current_file.filename, delete=True, rename=True,
-                                           temporary_filename=current_file.filename + u'.tmp'))
+                self.tasks.append(FileTask(current_file.filename, old_file=current_file.filename + u'.tmp',
+                                           flags=TaskFlags.RENAME_FILE | TaskFlags.DELETE_FILE))
             else:
-                self.tasks.append(FileTask(current_file.filename, open=True))
+                self.tasks.append(FileTask(current_file.filename, flags=TaskFlags.OPEN_FILE))
                 self.tasks.extend(chunk_tasks)
-                self.tasks.append(FileTask(current_file.filename, close=True))
+                self.tasks.append(FileTask(current_file.filename, flags=TaskFlags.CLOSE_FILE))
 
             # check if runtime cache size has changed
             if current_cache_size > last_cache_size:
@@ -384,7 +384,7 @@ class DLManager(Process):
         # add jobs to remove files
         for fname in mc.removed:
-            self.tasks.append(FileTask(fname, delete=True))
+            self.tasks.append(FileTask(fname, flags=TaskFlags.DELETE_FILE))
         self.tasks.extend(additional_deletion_tasks)
 
         analysis_res.num_chunks_cache = len(dl_cache_guids)
@@ -440,20 +440,9 @@ class DLManager(Process):
         while task and self.running:
             if isinstance(task, FileTask):  # this wasn't necessarily a good idea...
                 try:
-                    if task.empty:
-                        self.writer_queue.put(WriterTask(task.filename, empty=True), timeout=1.0)
-                    elif task.rename:
-                        self.writer_queue.put(WriterTask(task.filename, rename=True,
-                                                         delete=task.delete,
-                                                         old_filename=task.temporary_filename),
-                                              timeout=1.0)
-                    elif task.delete:
-                        self.writer_queue.put(WriterTask(task.filename, delete=True, silent=task.silent), timeout=1.0)
-                    elif task.open:
-                        self.writer_queue.put(WriterTask(task.filename, open=True), timeout=1.0)
+                    self.writer_queue.put(WriterTask(**task.__dict__), timeout=1.0)
+                    if task.flags & TaskFlags.OPEN_FILE:
                         current_file = task.filename
-                    elif task.close:
-                        self.writer_queue.put(WriterTask(task.filename, close=True), timeout=1.0)
                 except Exception as e:
                     self.tasks.appendleft(task)
                     self.log.warning(f'Adding to queue failed: {e!r}')
@@ -475,8 +464,8 @@ class DLManager(Process):
                     self.writer_queue.put(WriterTask(
                         filename=current_file, shared_memory=res_shm,
                         chunk_offset=task.chunk_offset, chunk_size=task.chunk_size,
-                        chunk_guid=task.chunk_guid, release_memory=task.cleanup,
-                        old_file=task.chunk_file  # todo on-disk cache
+                        chunk_guid=task.chunk_guid, cache_file=task.chunk_file,
+                        flags=TaskFlags.RELEASE_MEMORY if task.cleanup else TaskFlags.NONE
                     ), timeout=1.0)
                 except Exception as e:
                     self.log.warning(f'Adding to queue failed: {e!r}')
@@ -502,14 +491,13 @@ class DLManager(Process):
                 if res.success:
                     self.log.debug(f'Download for {res.chunk_guid} succeeded, adding to in_buffer...')
                     in_buffer[res.chunk_guid] = res
-                    self.bytes_downloaded_since_last += res.compressed_size
-                    self.bytes_decompressed_since_last += res.size
+                    self.bytes_downloaded_since_last += res.size_downloaded
+                    self.bytes_decompressed_since_last += res.size_decompressed
                 else:
                     self.log.error(f'Download for {res.chunk_guid} failed, retrying...')
                     try:
-                        self.dl_worker_queue.put(DownloaderTask(
-                            url=res.url, chunk_guid=res.chunk_guid, shm=res.shared_memory
-                        ), timeout=1.0)
+                        # since the result is a subclass of the task we can simply resubmit the result object
+                        self.dl_worker_queue.put(res, timeout=1.0)
                         self.active_tasks += 1
                     except Exception as e:
                         self.log.warning(f'Failed adding retry task to queue! {e!r}')
@@ -526,9 +514,14 @@ class DLManager(Process):
         while self.running:
             try:
                 res = self.writer_result_q.get(timeout=1.0)
+                if isinstance(res, TerminateWorkerTask):
+                    self.log.debug('Got termination command in FW result handler')
+                    break
+
                 self.num_tasks_processed_since_last += 1
-                if res.closed and self.resume_file and res.success:
+                if res.flags & TaskFlags.CLOSE_FILE and self.resume_file and res.success:
                     if res.filename.endswith('.tmp'):
                         res.filename = res.filename[:-4]
@@ -537,14 +530,10 @@ class DLManager(Process):
                     with open(self.resume_file, 'ab') as rf:
                         rf.write(f'{file_hash}:{res.filename}\n'.encode('utf-8'))
 
-                if res.kill:
-                    self.log.debug('Got termination command in FW result handler')
-                    break
-
                 if not res.success:
                     # todo make this kill the installation process or at least skip the file and mark it as failed
                     self.log.fatal(f'Writing for {res.filename} failed!')
-                if res.release_memory:
+                if res.flags & TaskFlags.RELEASE_MEMORY:
                     self.sms.appendleft(res.shared_memory)
                     with shm_cond:
                         shm_cond.notify()
@@ -731,10 +720,10 @@ class DLManager(Process):
             time.sleep(self.update_interval)
 
         for i in range(self.max_workers):
-            self.dl_worker_queue.put_nowait(DownloaderTask(kill=True))
+            self.dl_worker_queue.put_nowait(TerminateWorkerTask())
 
         self.log.info('Waiting for installation to finish...')
-        self.writer_queue.put_nowait(WriterTask('', kill=True))
+        self.writer_queue.put_nowait(TerminateWorkerTask())
         writer_p.join(timeout=10.0)
 
         if writer_p.exitcode is None:
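
The task runner above now forwards a FileTask to the writer process by splatting its fields into a WriterTask, replacing the old per-flag if/elif chain. This only works because every FileTask field is also a WriterTask field; a quick illustration using the hypothetical definitions sketched at the top:

    # Illustration only, using the assumed dataclass sketch above.
    task = FileTask('game/data.pak', flags=TaskFlags.OPEN_FILE)
    writer_task = WriterTask(**task.__dict__)  # filename/old_file/flags carry over
    assert writer_task.flags & TaskFlags.OPEN_FILE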

legendary/downloader/mp/workers.py

@@ -11,7 +11,11 @@ from multiprocessing.shared_memory import SharedMemory
 from queue import Empty
 
 from legendary.models.chunk import Chunk
-from legendary.models.downloading import DownloaderTaskResult, WriterTaskResult
+from legendary.models.downloading import (
+    DownloaderTask, DownloaderTaskResult,
+    WriterTask, WriterTaskResult,
+    TerminateWorkerTask, TaskFlags
+)
 
 
 class DLWorker(Process):
@@ -43,7 +47,7 @@ class DLWorker(Process):
         empty = False
         while True:
             try:
-                job = self.q.get(timeout=10.0)
+                job: DownloaderTask = self.q.get(timeout=10.0)
                 empty = False
             except Empty:
                 if not empty:
@@ -51,12 +55,11 @@ class DLWorker(Process):
                 empty = True
                 continue
 
-            if job.kill:  # let worker die
-                logger.debug(f'Worker received kill signal, shutting down...')
+            if isinstance(job, TerminateWorkerTask):  # let worker die
+                logger.debug(f'Worker received termination signal, shutting down...')
                 break
 
             tries = 0
-            dl_start = dl_end = 0
             compressed = 0
             chunk = None
@@ -64,7 +67,6 @@ class DLWorker(Process):
                 while tries < self.max_retries:
                     # print('Downloading', job.url)
                     logger.debug(f'Downloading {job.url}')
-                    dl_start = time.time()
 
                     try:
                         r = self.session.get(job.url, timeout=self.dl_timeout)
@@ -73,7 +75,6 @@ class DLWorker(Process):
                         logger.warning(f'Chunk download for {job.chunk_guid} failed: ({e!r}), retrying...')
                         continue
 
-                    dl_end = time.time()
                     if r.status_code != 200:
                         logger.warning(f'Chunk download for {job.chunk_guid} failed: status {r.status_code}, retrying...')
                         continue
@@ -86,14 +87,14 @@ class DLWorker(Process):
                 except Exception as e:
                     logger.error(f'Job for {job.chunk_guid} failed with: {e!r}, fetching next one...')
                     # add failed job to result queue to be requeued
-                    self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.chunk_guid, shm=job.shm, url=job.url))
+                    self.o_q.put(DownloaderTaskResult(success=False, **job.__dict__))
                 except KeyboardInterrupt:
                     logger.warning('Immediate exit requested, quitting...')
                     break
 
                 if not chunk:
                     logger.warning(f'Chunk somehow None?')
-                    self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.chunk_guid, shm=job.shm, url=job.url))
+                    self.o_q.put(DownloaderTaskResult(success=False, **job.__dict__))
                     continue
 
                 # decompress stuff
@@ -104,12 +105,11 @@ class DLWorker(Process):
                     self.shm.buf[job.shm.offset:job.shm.offset + size] = bytes(chunk.data)
                     del chunk
 
-                    self.o_q.put(DownloaderTaskResult(success=True, chunk_guid=job.chunk_guid, shm=job.shm,
-                                                      url=job.url, size=size, compressed_size=compressed,
-                                                      time_delta=dl_end - dl_start))
+                    self.o_q.put(DownloaderTaskResult(success=True, size_decompressed=size,
+                                                      size_downloaded=compressed, **job.__dict__))
                 except Exception as e:
                     logger.warning(f'Job for {job.chunk_guid} failed with: {e!r}, fetching next one...')
-                    self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.chunk_guid, shm=job.shm, url=job.url))
+                    self.o_q.put(DownloaderTaskResult(success=False, **job.__dict__))
                     continue
             except KeyboardInterrupt:
                 logger.warning('Immediate exit requested, quitting...')
@@ -145,15 +145,17 @@ class FileWorker(Process):
         while True:
             try:
                 try:
-                    j = self.q.get(timeout=10.0)
+                    j: WriterTask = self.q.get(timeout=10.0)
                 except Empty:
                     logger.warning('Writer queue empty!')
                     continue
 
-                if j.kill:
+                if isinstance(j, TerminateWorkerTask):
                     if current_file:
                         current_file.close()
-                    self.o_q.put(WriterTaskResult(success=True, kill=True))
+                    logger.debug(f'Worker received termination signal, shutting down...')
+                    # send termination task to results handler as well
+                    self.o_q.put(TerminateWorkerTask())
                     break
 
                 # make directories if required
@@ -163,11 +165,11 @@ class FileWorker(Process):
                 full_path = os.path.join(self.base_path, j.filename)
 
-                if j.empty:  # just create an empty file
+                if j.flags & TaskFlags.CREATE_EMPTY_FILE:  # just create an empty file
                     open(full_path, 'a').close()
-                    self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
+                    self.o_q.put(WriterTaskResult(success=True, **j.__dict__))
                     continue
-                elif j.open:
+                elif j.flags & TaskFlags.OPEN_FILE:
                     if current_file:
                         logger.warning(f'Opening new file {j.filename} without closing previous! {last_filename}')
                         current_file.close()
@@ -175,40 +177,40 @@ class FileWorker(Process):
                     current_file = open(full_path, 'wb')
                     last_filename = j.filename
 
-                    self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
+                    self.o_q.put(WriterTaskResult(success=True, **j.__dict__))
                     continue
-                elif j.close:
+                elif j.flags & TaskFlags.CLOSE_FILE:
                     if current_file:
                         current_file.close()
                         current_file = None
                     else:
                         logger.warning(f'Asking to close file that is not open: {j.filename}')
 
-                    self.o_q.put(WriterTaskResult(success=True, filename=j.filename, closed=True))
+                    self.o_q.put(WriterTaskResult(success=True, **j.__dict__))
                     continue
-                elif j.rename:
+                elif j.flags & TaskFlags.RENAME_FILE:
                     if current_file:
                         logger.warning('Trying to rename file without closing first!')
                         current_file.close()
                         current_file = None
-                    if j.delete:
+                    if j.flags & TaskFlags.DELETE_FILE:
                         try:
                             os.remove(full_path)
                         except OSError as e:
                             logger.error(f'Removing file failed: {e!r}')
-                            self.o_q.put(WriterTaskResult(success=False, filename=j.filename))
+                            self.o_q.put(WriterTaskResult(success=False, **j.__dict__))
                             continue
 
                     try:
-                        os.rename(os.path.join(self.base_path, j.old_filename), full_path)
+                        os.rename(os.path.join(self.base_path, j.old_file), full_path)
                     except OSError as e:
                         logger.error(f'Renaming file failed: {e!r}')
-                        self.o_q.put(WriterTaskResult(success=False, filename=j.filename))
+                        self.o_q.put(WriterTaskResult(success=False, **j.__dict__))
                         continue
 
-                    self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
+                    self.o_q.put(WriterTaskResult(success=True, **j.__dict__))
                     continue
-                elif j.delete:
+                elif j.flags & TaskFlags.DELETE_FILE:
                     if current_file:
                         logger.warning('Trying to delete file without closing first!')
                         current_file.close()
@@ -217,51 +219,35 @@ class FileWorker(Process):
                     try:
                         os.remove(full_path)
                     except OSError as e:
-                        if not j.silent:
+                        if not j.flags & TaskFlags.SILENT:
                             logger.error(f'Removing file failed: {e!r}')
 
-                    self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
+                    self.o_q.put(WriterTaskResult(success=True, **j.__dict__))
                     continue
 
-                pre_write = post_write = 0
-
                 try:
                     if j.shared_memory:
-                        pre_write = time.time()
                         shm_offset = j.shared_memory.offset + j.chunk_offset
                         shm_end = shm_offset + j.chunk_size
                         current_file.write(self.shm.buf[shm_offset:shm_end].tobytes())
-                        post_write = time.time()
                     elif j.cache_file:
-                        pre_write = time.time()
                         with open(os.path.join(self.cache_path, j.cache_file), 'rb') as f:
                             if j.chunk_offset:
                                 f.seek(j.chunk_offset)
                             current_file.write(f.read(j.chunk_size))
-                        post_write = time.time()
                     elif j.old_file:
-                        pre_write = time.time()
                         with open(os.path.join(self.base_path, j.old_file), 'rb') as f:
                             if j.chunk_offset:
                                 f.seek(j.chunk_offset)
                             current_file.write(f.read(j.chunk_size))
-                        post_write = time.time()
                 except Exception as e:
                     logger.warning(f'Something in writing a file failed: {e!r}')
-                    self.o_q.put(WriterTaskResult(success=False, filename=j.filename,
-                                                  chunk_guid=j.chunk_guid,
-                                                  release_memory=j.release_memory,
-                                                  shared_memory=j.shared_memory, size=j.chunk_size,
-                                                  time_delta=post_write-pre_write))
+                    self.o_q.put(WriterTaskResult(success=False, size=j.chunk_size, **j.__dict__))
                 else:
-                    self.o_q.put(WriterTaskResult(success=True, filename=j.filename,
-                                                  chunk_guid=j.chunk_guid,
-                                                  release_memory=j.release_memory,
-                                                  shared_memory=j.shared_memory, size=j.chunk_size,
-                                                  time_delta=post_write-pre_write))
+                    self.o_q.put(WriterTaskResult(success=True, size=j.chunk_size, **j.__dict__))
             except Exception as e:
                 logger.warning(f'Job {j.filename} failed with: {e!r}, fetching next one...')
-                self.o_q.put(WriterTaskResult(success=False, filename=j.filename, chunk_guid=j.chunk_guid))
+                self.o_q.put(WriterTaskResult(success=False, **j.__dict__))
 
         try:
             if current_file:
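
Two patterns recur throughout the reworked workers: results double as retry tasks (because each result type subclasses its task type), and boolean keyword arguments are replaced by membership tests on a single flags field. A short usage sketch under the same assumed definitions as above, not the actual API:

    # Usage sketch under the assumed definitions above.
    failed = DownloaderTaskResult(success=False, url='https://example.invalid/chunk',
                                  chunk_guid=1234, shm=None)
    assert isinstance(failed, DownloaderTask)  # queue consumers accept it as a task

    flags = TaskFlags.DELETE_FILE | TaskFlags.SILENT
    assert flags & TaskFlags.SILENT  # truthy -> error logging suppressed in FileWorker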