2022-07-16 06:06:41 +12:00
|
|
|
import logging
|
2022-04-19 02:51:51 +12:00
|
|
|
import os
|
2022-08-03 11:33:50 +12:00
|
|
|
import queue
|
2022-04-19 02:51:51 +12:00
|
|
|
import time
|
|
|
|
from multiprocessing import Queue as MPQueue
|
|
|
|
from multiprocessing.shared_memory import SharedMemory
|
|
|
|
from sys import exit
|
|
|
|
from threading import Condition, Thread
|
|
|
|
|
2022-07-21 08:33:44 +12:00
|
|
|
from legendary.downloader.mp.manager import DLManager as DLManagerReal
|
2022-04-19 02:51:51 +12:00
|
|
|
from legendary.downloader.mp.workers import DLWorker, FileWorker
|
|
|
|
from legendary.models.downloading import ChunkTask, SharedMemorySegment, TerminateWorkerTask
|
|
|
|
|
2022-10-27 14:00:48 +13:00
|
|
|
from rare.lgndr.glue.monkeys import DLManagerSignals
|
|
|
|
from rare.lgndr.models.downloading import UIUpdate
|
2022-04-19 02:51:51 +12:00
|
|
|
|
|
|
|
|
2022-08-03 11:33:50 +12:00
|
|
|
# fmt: off
|
2022-07-21 08:33:44 +12:00
|
|
|
class DLManager(DLManagerReal):
    """Rare's drop-in replacement for legendary's ``DLManager``.

    Behaves like the upstream manager, but ``run_real`` additionally polls
    ``signals_queue`` for control signals (immediate kill) and publishes
    progress as Rare's ``UIUpdate`` model instead of legendary's.
    """

    def __init__(self, *args, **kwargs):
        """Forward construction to legendary's DLManager and log that the monkey-patched variant is active."""
        super(DLManager, self).__init__(*args, **kwargs)
        self.log.info("Using Rare's DLManager monkey")

    # Rare: prototype to avoid undefined variable in type checkers
    # (the queue itself is presumably attached by the instantiator — TODO confirm)
    signals_queue: MPQueue

    # @staticmethod
    def run_real(self):
        """Main download loop: spawn workers, track progress, then tear everything down.

        Side effects: creates (and finally unlinks) a SharedMemory block,
        starts DLWorker/FileWorker child processes and handler threads,
        pushes ``UIUpdate`` objects to ``self.status_queue``, removes the
        resume file on clean completion, and terminates the process via
        ``exit(0)`` when done.
        """
        self.shared_memory = SharedMemory(create=True, size=self.max_shared_memory)
        self.log.debug(f'Created shared memory of size: {self.shared_memory.size / 1024 / 1024:.02f} MiB')

        # create the shared memory segments and add them to their respective pools
        for i in range(int(self.shared_memory.size / self.analysis.biggest_chunk)):
            _sms = SharedMemorySegment(offset=i * self.analysis.biggest_chunk,
                                       end=i * self.analysis.biggest_chunk + self.analysis.biggest_chunk)
            self.sms.append(_sms)

        self.log.debug(f'Created {len(self.sms)} shared memory segments.')

        # Create queues
        self.dl_worker_queue = MPQueue(-1)
        self.writer_queue = MPQueue(-1)
        self.dl_result_q = MPQueue(-1)
        self.writer_result_q = MPQueue(-1)

        self.log.info('Starting download workers...')

        bind_ip = None
        for i in range(self.max_workers):
            # round-robin the configured bind addresses across workers
            if self.bind_ips:
                bind_ip = self.bind_ips[i % len(self.bind_ips)]

            w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q,
                         self.shared_memory.name, logging_queue=self.logging_queue,
                         dl_timeout=self.dl_timeout, bind_addr=bind_ip)
            self.children.append(w)
            w.start()

        self.log.info('Starting file writing worker...')
        writer_p = FileWorker(self.writer_queue, self.writer_result_q, self.dl_dir,
                              self.shared_memory.name, self.cache_dir, self.logging_queue)
        self.children.append(writer_p)
        writer_p.start()

        num_chunk_tasks = sum(isinstance(t, ChunkTask) for t in self.tasks)
        num_dl_tasks = len(self.chunks_to_dl)
        num_tasks = len(self.tasks)
        num_shared_memory_segments = len(self.sms)
        self.log.debug(f'Chunks to download: {num_dl_tasks}, File tasks: {num_tasks}, Chunk tasks: {num_chunk_tasks}')

        # active downloader tasks
        self.active_tasks = 0
        processed_chunks = 0
        processed_tasks = 0
        total_dl = 0
        total_write = 0

        # synchronization conditions
        shm_cond = Condition()
        task_cond = Condition()
        self.conditions = [shm_cond, task_cond]

        # start threads
        s_time = time.perf_counter()
        self.threads.append(Thread(target=self.download_job_manager, args=(task_cond, shm_cond)))
        self.threads.append(Thread(target=self.dl_results_handler, args=(task_cond,)))
        self.threads.append(Thread(target=self.fw_results_handler, args=(shm_cond,)))

        for t in self.threads:
            t.start()

        last_update = time.perf_counter()

        # Rare: kill requested
        kill_request = False

        while processed_tasks < num_tasks:
            delta = time.perf_counter() - last_update
            # zero delta would divide by zero below; wait for the clock to advance
            if not delta:
                time.sleep(self.update_interval)
                continue

            # update all the things
            processed_chunks += self.num_processed_since_last
            processed_tasks += self.num_tasks_processed_since_last

            total_dl += self.bytes_downloaded_since_last
            total_write += self.bytes_written_since_last

            dl_speed = self.bytes_downloaded_since_last / delta
            dl_unc_speed = self.bytes_decompressed_since_last / delta
            w_speed = self.bytes_written_since_last / delta
            r_speed = self.bytes_read_since_last / delta
            # c_speed = self.num_processed_since_last / delta

            # set temporary counters to 0
            self.bytes_read_since_last = self.bytes_written_since_last = 0
            self.bytes_downloaded_since_last = self.num_processed_since_last = 0
            self.bytes_decompressed_since_last = self.num_tasks_processed_since_last = 0
            last_update = time.perf_counter()

            perc = (processed_chunks / num_chunk_tasks) * 100
            runtime = time.perf_counter() - s_time
            total_avail = len(self.sms)
            total_used = (num_shared_memory_segments - total_avail) * (self.analysis.biggest_chunk / 1024 / 1024)

            if runtime and processed_chunks:
                average_speed = processed_chunks / runtime
                estimate = (num_chunk_tasks - processed_chunks) / average_speed
                hours, estimate = int(estimate // 3600), estimate % 3600
                minutes, seconds = int(estimate // 60), int(estimate % 60)

                rt_hours, runtime = int(runtime // 3600), runtime % 3600
                rt_minutes, rt_seconds = int(runtime // 60), int(runtime % 60)
            else:
                estimate = 0
                hours = minutes = seconds = 0
                rt_hours = rt_minutes = rt_seconds = 0

            # Rare: Disable up to INFO logging level for the segment below
            log_level = self.log.level
            self.log.setLevel(logging.ERROR)
            self.log.info(f'= Progress: {perc:.02f}% ({processed_chunks}/{num_chunk_tasks}), '
                          f'Running for {rt_hours:02d}:{rt_minutes:02d}:{rt_seconds:02d}, '
                          f'ETA: {hours:02d}:{minutes:02d}:{seconds:02d}')
            self.log.info(f' - Downloaded: {total_dl / 1024 / 1024:.02f} MiB, '
                          f'Written: {total_write / 1024 / 1024:.02f} MiB')
            self.log.info(f' - Cache usage: {total_used:.02f} MiB, active tasks: {self.active_tasks}')
            self.log.info(f' + Download\t- {dl_speed / 1024 / 1024:.02f} MiB/s (raw) '
                          f'/ {dl_unc_speed / 1024 / 1024:.02f} MiB/s (decompressed)')
            self.log.info(f' + Disk\t- {w_speed / 1024 / 1024:.02f} MiB/s (write) / '
                          f'{r_speed / 1024 / 1024:.02f} MiB/s (read)')
            # Rare: Restore previous logging level
            self.log.setLevel(log_level)

            # send status update to back to instantiator (if queue exists)
            if self.status_queue:
                try:
                    self.status_queue.put(UIUpdate(
                        progress=perc, download_speed=dl_unc_speed, write_speed=w_speed, read_speed=r_speed,
                        runtime=round(runtime),
                        estimated_time_left=round(estimate),
                        processed_chunks=processed_chunks,
                        chunk_tasks=num_chunk_tasks,
                        total_downloaded=total_dl,
                        total_written=total_write,
                        cache_usage=total_used,
                        active_tasks=self.active_tasks,
                        download_compressed_speed=dl_speed,
                        memory_usage=total_used * 1024 * 1024
                    ), timeout=1.0)
                except Exception as e:
                    self.log.warning(f'Failed to send status update to queue: {e!r}')

            # Rare: queue of control signals
            try:
                signals: DLManagerSignals = self.signals_queue.get(timeout=0.5)
                if signals.kill is True:
                    # only warn on an actual kill request, not on every signal received
                    self.log.warning('Immediate stop requested!')
                    # lk: graceful but not what legendary does
                    self.running = False
                    # send conditions to unlock threads if they aren't already
                    for cond in self.conditions:
                        with cond:
                            cond.notify()
                    kill_request = True
                    break
                    # # lk: alternative way, but doesn't clean shm
                    # for i in range(self.max_workers):
                    #     self.dl_worker_queue.put_nowait(TerminateWorkerTask())
                    #
                    # self.log.info('Waiting for installation to finish...')
                    # self.writer_queue.put_nowait(TerminateWorkerTask())
                    # raise KeyboardInterrupt
            except queue.Empty:
                pass

            time.sleep(self.update_interval)

        # tell every download worker to shut down
        for i in range(self.max_workers):
            self.dl_worker_queue.put_nowait(TerminateWorkerTask())

        self.log.info('Waiting for installation to finish...')
        self.writer_queue.put_nowait(TerminateWorkerTask())

        writer_p.join(timeout=10.0)
        if writer_p.exitcode is None:
            self.log.warning('Terminating writer process, no exit code!')
            writer_p.terminate()

        # forcibly kill DL workers that are not actually dead yet
        for child in self.children:
            if child.exitcode is None:
                child.terminate()

        # make sure all the threads are dead.
        for t in self.threads:
            t.join(timeout=5.0)
            if t.is_alive():
                self.log.warning(f'Thread did not terminate! {repr(t)}')

        # clean up resume file
        # (kept on kill so an aborted download can be resumed later)
        if self.resume_file and not kill_request:
            try:
                os.remove(self.resume_file)
            except OSError as e:
                self.log.warning(f'Failed to remove resume file: {e!r}')

        # close up shared memory
        self.shared_memory.close()
        self.shared_memory.unlink()
        self.shared_memory = None

        self.log.info('All done! Download manager quitting...')
        # finally, exit the process.
        exit(0)
|
2022-08-03 11:33:50 +12:00
|
|
|
|
|
|
|
# fmt: on
|