1
0
Fork 0
mirror of synced 2024-10-03 10:47:18 +13:00
Rare/rare/lgndr/manager.py

198 lines
8.4 KiB
Python
Raw Normal View History

2022-07-16 06:06:41 +12:00
import logging
import os
import time
from multiprocessing import Queue as MPQueue
from multiprocessing.shared_memory import SharedMemory
from sys import exit
from threading import Condition, Thread
2022-07-16 06:06:41 +12:00
import legendary.downloader.mp.manager
from legendary.downloader.mp.workers import DLWorker, FileWorker
from legendary.models.downloading import ChunkTask, SharedMemorySegment, TerminateWorkerTask
from .downloading import UIUpdate
class DLManager(legendary.downloader.mp.manager.DLManager):
    """Download manager built on top of legendary's ``DLManager``.

    ``run_real`` drives a complete download and, while it is running,
    publishes extended ``UIUpdate`` objects (runtime, ETA, chunk counters,
    byte totals) on ``status_queue`` when such a queue is set.
    """

    # fmt: off
    def run_real(self):
        """Run the download from start to finish and then exit the process.

        Steps, in order:

        1. Allocate the shared memory block and split it into segments of
           ``analysis.biggest_chunk`` bytes.
        2. Create the task/result queues and start the download workers and
           the file writer worker.
        3. Start the job manager and result handler threads.
        4. Poll the ``*_since_last`` counters until every task in
           ``self.tasks`` has been processed, sending a ``UIUpdate`` to
           ``status_queue`` on each iteration.
        5. Ask the workers to terminate, join threads, remove the resume
           file, release the shared memory and call ``exit(0)``.

        This method does not return; it always ends in ``exit(0)`` unless an
        exception propagates out of it first.
        """
        self.shared_memory = SharedMemory(create=True, size=self.max_shared_memory)
        self.log.debug(f'Created shared memory of size: {self.shared_memory.size / 1024 / 1024:.02f} MiB')

        # create the shared memory segments and add them to their respective pools
        for i in range(int(self.shared_memory.size / self.analysis.biggest_chunk)):
            _sms = SharedMemorySegment(offset=i * self.analysis.biggest_chunk,
                                       end=i * self.analysis.biggest_chunk + self.analysis.biggest_chunk)
            self.sms.append(_sms)

        self.log.debug(f'Created {len(self.sms)} shared memory segments.')

        # Create queues
        self.dl_worker_queue = MPQueue(-1)
        self.writer_queue = MPQueue(-1)
        self.dl_result_q = MPQueue(-1)
        self.writer_result_q = MPQueue(-1)

        self.log.info('Starting download workers...')
        for i in range(self.max_workers):
            w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q,
                         self.shared_memory.name, logging_queue=self.logging_queue,
                         dl_timeout=self.dl_timeout)
            self.children.append(w)
            w.start()

        self.log.info('Starting file writing worker...')
        writer_p = FileWorker(self.writer_queue, self.writer_result_q, self.dl_dir,
                              self.shared_memory.name, self.cache_dir, self.logging_queue)
        self.children.append(writer_p)
        writer_p.start()

        num_chunk_tasks = sum(isinstance(t, ChunkTask) for t in self.tasks)
        num_dl_tasks = len(self.chunks_to_dl)
        num_tasks = len(self.tasks)
        num_shared_memory_segments = len(self.sms)
        self.log.debug(f'Chunks to download: {num_dl_tasks}, File tasks: {num_tasks}, Chunk tasks: {num_chunk_tasks}')

        # active downloader tasks
        self.active_tasks = 0
        processed_chunks = 0
        processed_tasks = 0
        total_dl = 0
        total_write = 0

        # synchronization conditions
        shm_cond = Condition()
        task_cond = Condition()
        self.conditions = [shm_cond, task_cond]

        # start threads
        s_time = time.time()
        self.threads.append(Thread(target=self.download_job_manager, args=(task_cond, shm_cond)))
        self.threads.append(Thread(target=self.dl_results_handler, args=(task_cond,)))
        self.threads.append(Thread(target=self.fw_results_handler, args=(shm_cond,)))

        for t in self.threads:
            t.start()

        last_update = time.time()

        while processed_tasks < num_tasks:
            delta = time.time() - last_update
            # a zero delta would make every speed below divide by zero
            if not delta:
                time.sleep(self.update_interval)
                continue

            # update all the things
            processed_chunks += self.num_processed_since_last
            processed_tasks += self.num_tasks_processed_since_last

            total_dl += self.bytes_downloaded_since_last
            total_write += self.bytes_written_since_last

            dl_speed = self.bytes_downloaded_since_last / delta
            dl_unc_speed = self.bytes_decompressed_since_last / delta
            w_speed = self.bytes_written_since_last / delta
            r_speed = self.bytes_read_since_last / delta

            # set temporary counters to 0
            self.bytes_read_since_last = self.bytes_written_since_last = 0
            self.bytes_downloaded_since_last = self.num_processed_since_last = 0
            self.bytes_decompressed_since_last = self.num_tasks_processed_since_last = 0
            last_update = time.time()

            if num_chunk_tasks:
                perc = (processed_chunks / num_chunk_tasks) * 100
            else:
                # None of the tasks is a ChunkTask, so chunk progress is undefined;
                # report task progress instead of dividing by zero.
                # num_tasks is > 0 here because the loop condition held.
                perc = (processed_tasks / num_tasks) * 100

            runtime = time.time() - s_time
            total_avail = len(self.sms)
            total_used = (num_shared_memory_segments - total_avail) * (self.analysis.biggest_chunk / 1024 / 1024)

            # Split ETA and runtime into h/m/s for the log line. `estimate` and
            # `runtime` themselves are left untouched because the full values
            # (in seconds) are sent in the UIUpdate below.
            if runtime and processed_chunks:
                average_speed = processed_chunks / runtime
                estimate = (num_chunk_tasks - processed_chunks) / average_speed
                hours, eta_rest = int(estimate // 3600), estimate % 3600
                minutes, seconds = int(eta_rest // 60), int(eta_rest % 60)

                rt_hours, rt_rest = int(runtime // 3600), runtime % 3600
                rt_minutes, rt_seconds = int(rt_rest // 60), int(rt_rest % 60)
            else:
                estimate = 0
                hours = minutes = seconds = 0
                rt_hours = rt_minutes = rt_seconds = 0

            log_level = self.log.level
            # lk: Disable up to INFO logging level for the segment below
            self.log.setLevel(logging.ERROR)
            try:
                self.log.info(f'= Progress: {perc:.02f}% ({processed_chunks}/{num_chunk_tasks}), '
                              f'Running for {rt_hours:02d}:{rt_minutes:02d}:{rt_seconds:02d}, '
                              f'ETA: {hours:02d}:{minutes:02d}:{seconds:02d}')
                self.log.info(f' - Downloaded: {total_dl / 1024 / 1024:.02f} MiB, '
                              f'Written: {total_write / 1024 / 1024:.02f} MiB')
                self.log.info(f' - Cache usage: {total_used:.02f} MiB, active tasks: {self.active_tasks}')
                self.log.info(f' + Download\t- {dl_speed / 1024 / 1024:.02f} MiB/s (raw) '
                              f'/ {dl_unc_speed / 1024 / 1024:.02f} MiB/s (decompressed)')
                self.log.info(f' + Disk\t- {w_speed / 1024 / 1024:.02f} MiB/s (write) / '
                              f'{r_speed / 1024 / 1024:.02f} MiB/s (read)')
            finally:
                # lk: Restore previous logging level (even if formatting above failed)
                self.log.setLevel(log_level)

            # send status update to back to instantiator (if queue exists)
            if self.status_queue:
                try:
                    self.status_queue.put(UIUpdate(
                        progress=perc, download_speed=dl_unc_speed, write_speed=w_speed, read_speed=r_speed,
                        runtime=round(runtime),
                        estimated_time_left=round(estimate),
                        processed_chunks=processed_chunks,
                        chunk_tasks=num_chunk_tasks,
                        total_downloaded=total_dl,
                        total_written=total_write,
                        cache_usage=total_used,
                        active_tasks=self.active_tasks,
                        download_compressed_speed=dl_speed,
                        memory_usage=total_used * 1024 * 1024
                    ), timeout=1.0)
                except Exception as e:
                    # best effort: a failed status update must not abort the download
                    self.log.warning(f'Failed to send status update to queue: {e!r}')

            time.sleep(self.update_interval)

        # one terminate task per download worker
        for _ in range(self.max_workers):
            self.dl_worker_queue.put_nowait(TerminateWorkerTask())

        self.log.info('Waiting for installation to finish...')
        self.writer_queue.put_nowait(TerminateWorkerTask())

        writer_p.join(timeout=10.0)
        if writer_p.exitcode is None:
            self.log.warning('Terminating writer process, no exit code!')
            writer_p.terminate()

        # forcibly kill DL workers that are not actually dead yet
        for child in self.children:
            if child.exitcode is None:
                child.terminate()

        # make sure all the threads are dead.
        for t in self.threads:
            t.join(timeout=5.0)
            if t.is_alive():
                self.log.warning(f'Thread did not terminate! {repr(t)}')

        # clean up resume file
        if self.resume_file:
            try:
                os.remove(self.resume_file)
            except OSError as e:
                self.log.warning(f'Failed to remove resume file: {e!r}')

        # close up shared memory
        self.shared_memory.close()
        self.shared_memory.unlink()
        self.shared_memory = None

        self.log.info('All done! Download manager quitting...')
        # finally, exit the process.
        exit(0)
    # fmt: on