[downloader] Clean up progress and other logging

derrod 2020-05-22 14:31:49 +02:00
parent d62b45f899
commit 6b88d93576
2 changed files with 40 additions and 24 deletions


@@ -424,7 +424,7 @@ class DLManager(Process):
self.log.debug('Waiting for more shared memory...')
shm_cond.wait(timeout=1.0)
self.log.info('Download Job Manager quitting...')
self.log.debug('Download Job Manager quitting...')
def dl_results_handler(self, task_cond: Condition):
in_buffer = dict()
@@ -515,7 +515,7 @@ class DLManager(Process):
except Exception as e:
self.log.warning(f'Unhandled exception when trying to read download result queue: {e!r}')
self.log.info('Download result handler quitting...')
self.log.debug('Download result handler quitting...')
def fw_results_handler(self, shm_cond: Condition):
while self.running:
@@ -530,7 +530,7 @@ class DLManager(Process):
rf.write(f'{file_hash}:{res.filename}\n'.encode('utf-8'))
if res.kill:
self.log.info('Got termination command in FW result handler')
self.log.debug('Got termination command in FW result handler')
break
if not res.success:
@@ -552,7 +552,7 @@ class DLManager(Process):
continue
except Exception as e:
self.log.warning(f'Exception when trying to read writer result queue: {e!r}')
self.log.info('Writer result handler quitting...')
self.log.debug('Writer result handler quitting...')
def run(self):
if not self.analysis:
@@ -565,7 +565,7 @@ class DLManager(Process):
_root.handlers = []
_root.addHandler(QueueHandler(self.logging_queue))
self.log = logging.getLogger('DLMProc')
self.log = logging.getLogger('DLManager')
self.log.info(f'Download Manager running with process-id: {os.getpid()}')
try:
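The hunk above replaces the child process's root handlers with a stdlib QueueHandler so that log records travel back to the parent over self.logging_queue, and renames the logger from 'DLMProc' to 'DLManager'. As a rough, self-contained sketch of that pattern (the queue and listener wiring below is illustrative, not legendary's actual setup):

import logging
import multiprocessing
import os
from logging.handlers import QueueHandler, QueueListener

def child(logging_queue):
    # Inside the child process: drop inherited handlers and forward every
    # record to the parent via the queue, as in the hunk above.
    root = logging.getLogger()
    root.handlers = []
    root.setLevel(logging.DEBUG)
    root.addHandler(QueueHandler(logging_queue))
    logging.getLogger('DLManager').info('running with process-id: %d', os.getpid())

if __name__ == '__main__':
    q = multiprocessing.Queue()
    # Parent side: a QueueListener drains the queue into an ordinary StreamHandler.
    listener = QueueListener(q, logging.StreamHandler())
    listener.start()
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    p.join()
    listener.stop()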
@@ -674,7 +674,7 @@ class DLManager(Process):
dl_unc_speed = self.bytes_decompressed_since_last / delta
w_speed = self.bytes_written_since_last / delta
r_speed = self.bytes_read_since_last / delta
c_speed = self.num_processed_since_last / delta
# c_speed = self.num_processed_since_last / delta
# set temporary counters to 0
self.bytes_read_since_last = self.bytes_written_since_last = 0
@@ -683,24 +683,32 @@ class DLManager(Process):
last_update = time.time()
perc = (processed_chunks / num_chunk_tasks) * 100
self.log.info(f'\n============== {time.time() - s_time:.01f} seconds since start')
self.log.info(f'Progress: {processed_chunks}/{num_chunk_tasks} ({perc:.02f}%) chunk tasks processed.')
self.log.info(f'Downloaded: {total_dl / 1024 / 1024:.02f} MiB, '
f'Written: {total_write / 1024 / 1024:.02f} MiB')
# speed meters
self.log.info('Speeds:')
self.log.info(f' + Download - {dl_speed / 1024 / 1024:.02f} MiB/s (raw) '
f'/ {dl_unc_speed / 1024 / 1024:.02f} MiB/s (decompressed)')
self.log.info(f' + Write (disk) - {w_speed / 1024 / 1024:.02f} MiB/s')
self.log.info(f' + Read (disk) - {r_speed / 1024 / 1024:.02f} MiB/s')
self.log.info(f' + Tasks - {c_speed:.02f} Chunks/s')
self.log.info(f'Active download tasks: {self.active_tasks}')
# shared memory debugging
runtime = time.time() - s_time
total_avail = len(self.sms)
total_used = (num_shared_memory_segments - total_avail) * (self.analysis.biggest_chunk / 1024 / 1024)
self.log.info(f'Shared memory usage: {total_used} MiB, available: {total_avail}')
if runtime and processed_chunks:
rt_hours, runtime = int(runtime // 3600), runtime % 3600
rt_minutes, rt_seconds = int(runtime // 60), int(runtime % 60)
average_speed = processed_chunks / runtime
estimate = (num_chunk_tasks - processed_chunks) / average_speed
hours, estimate = int(estimate // 3600), estimate % 3600
minutes, seconds = int(estimate // 60), int(estimate % 60)
else:
hours = minutes = seconds = 0
rt_hours = rt_minutes = rt_seconds = 0
self.log.info(f'= Progress: {perc:.02f}% ({processed_chunks}/{num_chunk_tasks}), '
f'Running for {rt_hours:02d}:{rt_minutes:02d}:{rt_seconds:02d}, '
f'ETA: {hours:02d}:{minutes:02d}:{seconds:02d}')
self.log.info(f' - Downloaded: {total_dl / 1024 / 1024:.02f} MiB, '
f'Written: {total_write / 1024 / 1024:.02f} MiB')
self.log.info(f' - Cache usage: {total_used} MiB, active tasks: {self.active_tasks}')
self.log.info(f' + Download\t- {dl_speed / 1024 / 1024:.02f} MiB/s (raw) '
f'/ {dl_unc_speed / 1024 / 1024:.02f} MiB/s (decompressed)')
self.log.info(f' + Disk\t- {w_speed / 1024 / 1024:.02f} MiB/s (write) / '
f'{r_speed / 1024 / 1024:.02f} MiB/s (read)')
# send status update to back to instantiator (if queue exists)
if self.status_queue:
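The reworked status line above folds elapsed time and an ETA into a single message; the ETA is simply the average chunk throughput so far extrapolated over the remaining tasks. A standalone sketch of the same arithmetic (format_hms and eta are hypothetical helpers, not part of this commit):

def format_hms(seconds):
    # break a duration in seconds into HH:MM:SS, as the progress line does
    hours, seconds = int(seconds // 3600), seconds % 3600
    minutes, seconds = int(seconds // 60), int(seconds % 60)
    return f'{hours:02d}:{minutes:02d}:{seconds:02d}'

def eta(runtime, processed_chunks, num_chunk_tasks):
    if not (runtime and processed_chunks):
        return format_hms(0)
    average_speed = processed_chunks / runtime  # chunks per second so far
    return format_hms((num_chunk_tasks - processed_chunks) / average_speed)

# e.g. 300 of 1200 chunks done after 90 seconds -> '00:04:30'
print(eta(90.0, 300, 1200))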
@@ -717,12 +725,12 @@ class DLManager(Process):
for i in range(self.max_workers):
self.dl_worker_queue.put_nowait(DownloaderTask(kill=True))
self.log.info('Waiting for installation to finish...')
self.writer_queue.put_nowait(WriterTask('', kill=True))
self.log.info('Waiting for writer process to finish...')
writer_p.join(timeout=10.0)
if writer_p.exitcode is None:
self.log.warning(f'Terminating writer process {e!r}')
self.log.warning(f'Terminating writer process, no exit code!')
writer_p.terminate()
# forcibly kill DL workers that are not actually dead yet
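The shutdown sequence above pushes one kill task per downloader worker, sends a final kill WriterTask to the writer, then joins the writer with a 10-second timeout and terminates it if no exit code appears. A generic sketch of that sentinel-plus-timeout pattern (the worker function and queue layout here are illustrative only, not legendary's classes):

import multiprocessing

def worker(task_queue):
    while True:
        task = task_queue.get()
        if task is None:  # sentinel ("kill" task)
            break
        # ... process the task ...

if __name__ == '__main__':
    q = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker, args=(q,)) for _ in range(4)]
    for p in workers:
        p.start()
    # ... enqueue real work here ...
    for _ in workers:
        q.put(None)             # one sentinel per worker
    for p in workers:
        p.join(timeout=10.0)
        if p.exitcode is None:  # still alive after the timeout
            p.terminate()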
@@ -748,5 +756,6 @@ class DLManager(Process):
self.shared_memory.unlink()
self.shared_memory = None
self.log.info('All done! Download manager quitting...')
# finally, exit the process.
exit(0)


@@ -87,6 +87,9 @@ class DLWorker(Process):
logger.error(f'Job for {job.guid} failed with: {e!r}, fetching next one...')
# add failed job to result queue to be requeued
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
except KeyboardInterrupt:
logger.warning('Immediate exit requested, quitting...')
break
if not chunk:
logger.warning(f'Chunk somehow None?')
@@ -108,6 +111,9 @@ class DLWorker(Process):
logger.warning(f'Job for {job.guid} failed with: {e!r}, fetching next one...')
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
continue
except KeyboardInterrupt:
logger.warning('Immediate exit requested, quitting...')
break
self.shm.close()
@@ -263,6 +269,7 @@ class FileWorker(Process):
except Exception as e:
logger.error(f'Closing file after error failed: {e!r}')
except KeyboardInterrupt:
logger.warning('Immediate exit requested, quitting...')
if current_file:
current_file.close()
return
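The hunks in this second file add KeyboardInterrupt handling to the DLWorker loops and a log line to FileWorker's existing handler, so Ctrl+C stops the workers cleanly instead of unwinding them with a traceback. Because KeyboardInterrupt derives from BaseException rather than Exception, the generic except Exception handlers above never catch it; it needs its own clause. A minimal sketch of the resulting loop shape (process and the job objects are placeholders, not legendary's classes):

import logging

logger = logging.getLogger('Worker')

def process(job):
    # placeholder for the real download/write step
    ...

def work_loop(jobs):
    for job in jobs:
        try:
            process(job)
        except Exception as e:
            # ordinary failures: log and move on to the next job
            logger.warning(f'Job failed with: {e!r}, fetching next one...')
            continue
        except KeyboardInterrupt:
            # Ctrl+C: stop fetching work and let the process exit cleanly
            logger.warning('Immediate exit requested, quitting...')
            break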