diff --git a/legendary/cli.py b/legendary/cli.py index ed2a0ba..4537e77 100644 --- a/legendary/cli.py +++ b/legendary/cli.py @@ -3,32 +3,41 @@ import argparse import logging -import multiprocessing import os import shlex import subprocess import time import webbrowser +from logging.handlers import QueueHandler, QueueListener +from multiprocessing import freeze_support, Queue as MPQueue from sys import exit from legendary.core import LegendaryCore from legendary.models.exceptions import InvalidCredentialsError +# todo custom formatter for cli logger (clean info, highlighted error/warning) logging.basicConfig( - format='[%(asctime)s] [%(name)s] %(levelname)s: %(message)s', + format='[%(name)s] %(levelname)s: %(message)s', level=logging.INFO ) logger = logging.getLogger('cli') -# todo logger with QueueHandler/QueueListener -# todo custom formatter for cli logger (clean info, highlighted error/warning) - class LegendaryCLI: def __init__(self): self.core = LegendaryCore() self.logger = logging.getLogger('cli') + self.logging_queue = None + + def setup_threaded_logging(self): + self.logging_queue = MPQueue(-1) + shandler = logging.StreamHandler() + sformatter = logging.Formatter('[%(asctime)s] [%(name)s] %(levelname)s: %(message)s') + shandler.setFormatter(sformatter) + ql = QueueListener(self.logging_queue, shandler) + ql.start() + return ql def auth(self, args): try: @@ -223,6 +232,10 @@ class LegendaryCLI: start_t = time.time() try: + # set up logging stuff (should be moved somewhere else later) + dlm.logging_queue = self.logging_queue + dlm.proc_debug = args.dlm_debug + dlm.start() dlm.join() except Exception as e: @@ -243,7 +256,7 @@ class LegendaryCLI: print(f' - {dlc.app_title} (App name: {dlc.app_name}, version: {dlc.app_version})') # todo recursively call install with modified args to install DLC automatically (after confirm) print('Installing DLCs works the same as the main game, just use the DLC app name instead.') - print('Automatic installation of DLC is currently not supported.') + print('(Automatic installation of DLC is currently not supported.)') if postinstall: self._handle_postinstall(postinstall, igame, yes=args.yes) @@ -354,6 +367,8 @@ def main(): help='Do not mark game as intalled and do not run prereq installers after download.') install_parser.add_argument('--update-only', dest='update_pnly', action='store_true', help='Abort if game is not already installed (for automation)') + install_parser.add_argument('--dlm-debug', dest='dlm_debug', action='store_true', + help='Set download manager and worker processes\' loglevel to debug') launch_parser.add_argument('--offline', dest='offline', action='store_true', default=False, help='Skip login and launch game without online authentication') @@ -381,6 +396,7 @@ def main(): return cli = LegendaryCLI() + ql = cli.setup_threaded_logging() config_ll = cli.core.lgd.config.get('Legendary', 'log_level', fallback='info') if config_ll == 'debug' or args.debug: @@ -391,23 +407,28 @@ def main(): # technically args.func() with setdefaults could work (see docs on subparsers) # but that would require all funcs to accept args and extra... - if args.subparser_name == 'auth': - cli.auth(args) - elif args.subparser_name == 'list-games': - cli.list_games() - elif args.subparser_name == 'list-installed': - cli.list_installed(args) - elif args.subparser_name == 'launch': - cli.launch_game(args, extra) - elif args.subparser_name == 'download': - cli.install_game(args) - elif args.subparser_name == 'uninstall': - cli.uninstall_game(args) + try: + if args.subparser_name == 'auth': + cli.auth(args) + elif args.subparser_name == 'list-games': + cli.list_games() + elif args.subparser_name == 'list-installed': + cli.list_installed(args) + elif args.subparser_name == 'launch': + cli.launch_game(args, extra) + elif args.subparser_name == 'download': + cli.install_game(args) + elif args.subparser_name == 'uninstall': + cli.uninstall_game(args) + except KeyboardInterrupt: + logger.info('Command was aborted via KeyboardInterrupt, cleaning up...') cli.core.exit() + ql.stop() exit(0) if __name__ == '__main__': - multiprocessing.freeze_support() # required for pyinstaller + # required for pyinstaller on Windows, does nothing on other platforms. + freeze_support() main() diff --git a/legendary/downloader/manager.py b/legendary/downloader/manager.py index e787409..def5dff 100644 --- a/legendary/downloader/manager.py +++ b/legendary/downloader/manager.py @@ -8,6 +8,7 @@ import os import time from collections import Counter, defaultdict, deque +from logging.handlers import QueueHandler from multiprocessing import cpu_count, Process, Queue as MPQueue from multiprocessing.shared_memory import SharedMemory from queue import Empty @@ -24,14 +25,15 @@ class DLManager(Process): max_jobs=100, max_failures=5, max_workers=0, update_interval=1.0, max_shared_memory=1024 * 1024 * 1024, resume_file=None): super().__init__(name='DLManager') - self.log = logging.getLogger('DLManager') - self.log_level = self.log.level + self.log = logging.getLogger('DLM') + self.proc_debug = False self.base_url = base_url self.dl_dir = download_dir self.cache_dir = cache_dir if cache_dir else os.path.join(download_dir, '.cache') # All the queues! + self.logging_queue = None self.dl_worker_queue = None self.writer_queue = None self.dl_result_q = None @@ -63,6 +65,8 @@ class DLManager(Process): self.running = True self.active_tasks = 0 self.children = [] + self.threads = [] + self.conditions = [] # bytes downloaded and decompressed since last report self.bytes_downloaded_since_last = 0 self.bytes_decompressed_since_last = 0 @@ -451,19 +455,43 @@ class DLManager(Process): if not self.analysis: raise ValueError('Did not run analysis before trying to run download!') - # fix loglevel in subprocess - self.log.setLevel(self.log_level) + # Subprocess will use its own root logger that logs to a Queue instead + _root = logging.getLogger() + _root.setLevel(logging.DEBUG if self.proc_debug else logging.INFO) + if self.logging_queue: + _root.handlers = [] + _root.addHandler(QueueHandler(self.logging_queue)) + + self.log = logging.getLogger('DLMProc') + self.log.info(f'Download Manager running with process-id: {os.getpid()}') try: self.run_real() except KeyboardInterrupt: self.log.warning('Immediate exit requested!') self.running = False - for proc in self.children: + + # send conditions to unlock threads if they aren't already + for cond in self.conditions: + with cond: + cond.notify() + + # make sure threads are dead. + for t in self.threads: + t.join(timeout=5.0) + if t.is_alive(): + self.log.warning(f'Thread did not terminate! {repr(t)}') + + # clean up all the queues, otherwise this process won't terminate properly + for name, q in zip(('Download jobs', 'Writer jobs', 'Download results', 'Writer results'), + (self.dl_worker_queue, self.writer_queue, self.dl_result_q, self.writer_result_q)): + self.log.debug(f'Cleaning up queue "{name}"') try: - proc.terminate() - except Exception as e: - print(f'Terminating process {repr(proc)} failed: {e!r}') + while True: + _ = q.get_nowait() + except Empty: + q.close() + q.join_thread() def run_real(self): self.shared_memory = SharedMemory(create=True, size=self.max_shared_memory) @@ -478,21 +506,22 @@ class DLManager(Process): self.log.debug(f'Created {len(self.sms)} shared memory segments.') # Create queues - self.dl_worker_queue = MPQueue() - self.writer_queue = MPQueue() - self.dl_result_q = MPQueue() - self.writer_result_q = MPQueue() + self.dl_worker_queue = MPQueue(-1) + self.writer_queue = MPQueue(-1) + self.dl_result_q = MPQueue(-1) + self.writer_result_q = MPQueue(-1) self.log.info(f'Starting download workers...') for i in range(self.max_workers): - w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, - self.dl_result_q, self.shared_memory.name) + w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q, + self.shared_memory.name, logging_queue=self.logging_queue) self.children.append(w) w.start() self.log.info('Starting file writing worker...') writer_p = FileWorker(self.writer_queue, self.writer_result_q, self.dl_dir, - self.shared_memory.name, self.cache_dir) + self.shared_memory.name, self.cache_dir, self.logging_queue) + self.children.append(writer_p) writer_p.start() num_chunk_tasks = sum(isinstance(t, ChunkTask) for t in self.tasks) @@ -511,14 +540,15 @@ class DLManager(Process): # synchronization conditions shm_cond = Condition() task_cond = Condition() + self.conditions = [shm_cond, task_cond] # start threads s_time = time.time() - dlj_e = Thread(target=self.download_job_manager, args=(task_cond, shm_cond)) - dlr_e = Thread(target=self.dl_results_handler, args=(task_cond,)) - fwr_e = Thread(target=self.fw_results_handler, args=(shm_cond,)) + self.threads.append(Thread(target=self.download_job_manager, args=(task_cond, shm_cond))) + self.threads.append(Thread(target=self.dl_results_handler, args=(task_cond,))) + self.threads.append(Thread(target=self.fw_results_handler, args=(shm_cond,))) - for t in (dlj_e, dlr_e, fwr_e): + for t in self.threads: t.start() last_update = time.time() @@ -597,7 +627,7 @@ class DLManager(Process): child.terminate() # make sure all the threads are dead. - for t in (dlj_e, dlr_e, fwr_e): + for t in self.threads: t.join(timeout=5.0) if t.is_alive(): self.log.warning(f'Thread did not terminate! {repr(t)}') diff --git a/legendary/downloader/workers.py b/legendary/downloader/workers.py index 4cb5413..3a8f137 100644 --- a/legendary/downloader/workers.py +++ b/legendary/downloader/workers.py @@ -6,6 +6,7 @@ import requests import time import logging +from logging.handlers import QueueHandler from multiprocessing import Process from multiprocessing.shared_memory import SharedMemory from queue import Empty @@ -15,7 +16,7 @@ from legendary.models.downloading import DownloaderTaskResult, WriterTaskResult class DLWorker(Process): - def __init__(self, name, queue, out_queue, shm, max_retries=5): + def __init__(self, name, queue, out_queue, shm, max_retries=5, logging_queue=None): super().__init__(name=name) self.q = queue self.o_q = out_queue @@ -25,9 +26,19 @@ class DLWorker(Process): }) self.max_retries = max_retries self.shm = SharedMemory(name=shm) - self.log = logging.getLogger('DLWorker') + self.log_level = logging.getLogger().level + self.logging_queue = logging_queue def run(self): + # we have to fix up the logger before we can start + _root = logging.getLogger() + _root.handlers = [] + _root.addHandler(QueueHandler(self.logging_queue)) + + logger = logging.getLogger(self.name) + logger.setLevel(self.log_level) + logger.debug(f'Download worker reporting for duty!') + empty = False while True: try: @@ -35,12 +46,12 @@ class DLWorker(Process): empty = False except Empty: if not empty: - self.log.debug(f'[{self.name}] Queue Empty, waiting for more...') + logger.debug(f'[{self.name}] Queue Empty, waiting for more...') empty = True continue if job.kill: # let worker die - self.log.info(f'[{self.name}] Worker received kill signal, shutting down...') + logger.debug(f'[{self.name}] Worker received kill signal, shutting down...') break tries = 0 @@ -51,19 +62,19 @@ class DLWorker(Process): try: while tries < self.max_retries: # print('Downloading', job.url) - self.log.debug(f'[{self.name}] Downloading {job.url}') + logger.debug(f'[{self.name}] Downloading {job.url}') dl_start = time.time() try: r = self.session.get(job.url, timeout=5.0) r.raise_for_status() except Exception as e: - self.log.warning(f'[{self.name}] Chunk download failed ({e!r}), retrying...') + logger.warning(f'[{self.name}] Chunk download failed ({e!r}), retrying...') continue dl_end = time.time() if r.status_code != 200: - self.log.warning(f'[{self.name}] Chunk download failed (Status {r.status_code}), retrying...') + logger.warning(f'[{self.name}] Chunk download failed (Status {r.status_code}), retrying...') continue else: compressed = len(r.content) @@ -72,12 +83,12 @@ class DLWorker(Process): else: raise TimeoutError('Max retries reached') except Exception as e: - self.log.error(f'[{self.name}] Job failed with: {e!r}, fetching next one...') + logger.error(f'[{self.name}] Job failed with: {e!r}, fetching next one...') # add failed job to result queue to be requeued self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url)) if not chunk: - self.log.warning(f'[{self.name}] Chunk smoehow None?') + logger.warning(f'[{self.name}] Chunk smoehow None?') self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url)) continue @@ -85,7 +96,7 @@ class DLWorker(Process): try: size = len(chunk.data) if size > job.shm.size: - self.log.fatal(f'Downloaded chunk is longer than SharedMemorySegment!') + logger.fatal(f'Downloaded chunk is longer than SharedMemorySegment!') self.shm.buf[job.shm.offset:job.shm.offset + size] = bytes(chunk.data) del chunk @@ -93,7 +104,7 @@ class DLWorker(Process): url=job.url, size=size, compressed_size=compressed, time_delta=dl_end - dl_start)) except Exception as e: - self.log.warning(f'[{self.name}] Job failed with: {e!r}, fetching next one...') + logger.warning(f'[{self.name}] Job failed with: {e!r}, fetching next one...') self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url)) continue @@ -101,16 +112,26 @@ class DLWorker(Process): class FileWorker(Process): - def __init__(self, queue, out_queue, base_path, shm, cache_path=None): - super().__init__(name='File worker') + def __init__(self, queue, out_queue, base_path, shm, cache_path=None, logging_queue=None): + super().__init__(name='FileWorker') self.q = queue self.o_q = out_queue self.base_path = base_path self.cache_path = cache_path if cache_path else os.path.join(base_path, '.cache') self.shm = SharedMemory(name=shm) - self.log = logging.getLogger('DLWorker') + self.log_level = logging.getLogger().level + self.logging_queue = logging_queue def run(self): + # we have to fix up the logger before we can start + _root = logging.getLogger() + _root.handlers = [] + _root.addHandler(QueueHandler(self.logging_queue)) + + logger = logging.getLogger(self.name) + logger.setLevel(self.log_level) + logger.debug(f'Download worker reporting for duty!') + last_filename = '' current_file = None @@ -119,7 +140,7 @@ class FileWorker(Process): try: j = self.q.get(timeout=10.0) except Empty: - self.log.warning('Writer queue empty!') + logger.warning('Writer queue empty!') continue if j.kill: @@ -141,7 +162,7 @@ class FileWorker(Process): continue elif j.open: if current_file: - self.log.warning(f'Opening new file {j.filename} without closing previous! {last_filename}') + logger.warning(f'Opening new file {j.filename} without closing previous! {last_filename}') current_file.close() current_file = open(full_path, 'wb') @@ -154,27 +175,27 @@ class FileWorker(Process): current_file.close() current_file = None else: - self.log.warning(f'Asking to close file that is not open: {j.filename}') + logger.warning(f'Asking to close file that is not open: {j.filename}') self.o_q.put(WriterTaskResult(success=True, filename=j.filename, closed=True)) continue elif j.rename: if current_file: - self.log.warning('Trying to rename file without closing first!') + logger.warning('Trying to rename file without closing first!') current_file.close() current_file = None if j.delete: try: os.remove(full_path) except OSError as e: - self.log.error(f'Removing file failed: {e!r}') + logger.error(f'Removing file failed: {e!r}') self.o_q.put(WriterTaskResult(success=False, filename=j.filename)) continue try: os.rename(os.path.join(self.base_path, j.old_filename), full_path) except OSError as e: - self.log.error(f'Renaming file failed: {e!r}') + logger.error(f'Renaming file failed: {e!r}') self.o_q.put(WriterTaskResult(success=False, filename=j.filename)) continue @@ -182,14 +203,14 @@ class FileWorker(Process): continue elif j.delete: if current_file: - self.log.warning('Trying to delete file without closing first!') + logger.warning('Trying to delete file without closing first!') current_file.close() current_file = None try: os.remove(full_path) except OSError as e: - self.log.error(f'Removing file failed: {e!r}') + logger.error(f'Removing file failed: {e!r}') self.o_q.put(WriterTaskResult(success=True, filename=j.filename)) continue @@ -218,7 +239,7 @@ class FileWorker(Process): current_file.write(f.read(j.chunk_size)) post_write = time.time() except Exception as e: - self.log.warning(f'Something in writing a file failed: {e!r}') + logger.warning(f'Something in writing a file failed: {e!r}') self.o_q.put(WriterTaskResult(success=False, filename=j.filename, chunk_guid=j.chunk_guid, release_memory=j.release_memory, @@ -231,7 +252,7 @@ class FileWorker(Process): shm=j.shm, size=j.chunk_size, time_delta=post_write-pre_write)) except Exception as e: - self.log.warning(f'[{self.name}] Job {j.filename} failed with: {e!r}, fetching next one...') + logger.warning(f'[{self.name}] Job {j.filename} failed with: {e!r}, fetching next one...') self.o_q.put(WriterTaskResult(success=False, filename=j.filename, chunk_guid=j.chunk_guid)) try: @@ -239,7 +260,7 @@ class FileWorker(Process): current_file.close() current_file = None except Exception as e: - self.log.error(f'[{self.name}] Closing file after error failed: {e!r}') + logger.error(f'[{self.name}] Closing file after error failed: {e!r}') except KeyboardInterrupt: if current_file: current_file.close()