From 450784283dd49152dda6322db2fb2ef33e7c382e Mon Sep 17 00:00:00 2001 From: derrod Date: Sat, 14 Oct 2023 14:20:17 +0200 Subject: [PATCH] [cli/core/downloader] Add option to bind to IP(s) --- legendary/cli.py | 5 ++++- legendary/core.py | 4 ++-- legendary/downloader/mp/manager.py | 12 ++++++++++-- legendary/downloader/mp/workers.py | 25 +++++++++++++++++++++++-- 4 files changed, 39 insertions(+), 7 deletions(-) diff --git a/legendary/cli.py b/legendary/cli.py index 71764ca..58d3ad0 100644 --- a/legendary/cli.py +++ b/legendary/cli.py @@ -967,7 +967,8 @@ class LegendaryCLI: disable_delta=args.disable_delta, override_delta_manifest=args.override_delta_manifest, preferred_cdn=args.preferred_cdn, - disable_https=args.disable_https) + disable_https=args.disable_https, + bind_ip=args.bind_ip) # game is either up-to-date or hasn't changed, so we have nothing to do if not analysis.dl_size: @@ -2782,6 +2783,8 @@ def main(): help='Automatically install all DLCs with the base game') install_parser.add_argument('--skip-dlcs', dest='skip_dlcs', action='store_true', help='Do not ask about installing DLCs.') + install_parser.add_argument('--bind', dest='bind_ip', action='store', metavar='', type=str, + help='Comma-separated list of IPs to bind to for downloading') uninstall_parser.add_argument('--keep-files', dest='keep_files', action='store_true', help='Keep files but remove game from Legendary database') diff --git a/legendary/core.py b/legendary/core.py index 5090c36..c6442d2 100644 --- a/legendary/core.py +++ b/legendary/core.py @@ -1294,7 +1294,7 @@ class LegendaryCore: repair: bool = False, repair_use_latest: bool = False, disable_delta: bool = False, override_delta_manifest: str = '', egl_guid: str = '', preferred_cdn: str = None, - disable_https: bool = False) -> (DLManager, AnalysisResult, ManifestMeta): + disable_https: bool = False, bind_ip: str = None) -> (DLManager, AnalysisResult, ManifestMeta): # load old manifest old_manifest = None @@ -1461,7 +1461,7 @@ class LegendaryCore: dlm = DLManager(install_path, base_url, resume_file=resume_file, status_q=status_q, max_shared_memory=max_shm * 1024 * 1024, max_workers=max_workers, - dl_timeout=dl_timeout) + dl_timeout=dl_timeout, bind_ip=bind_ip) anlres = dlm.run_analysis(manifest=new_manifest, old_manifest=old_manifest, patch=not disable_patching, resume=not force, file_prefix_filter=file_prefix_filter, diff --git a/legendary/downloader/mp/manager.py b/legendary/downloader/mp/manager.py index a60cb08..90ab37a 100644 --- a/legendary/downloader/mp/manager.py +++ b/legendary/downloader/mp/manager.py @@ -22,7 +22,7 @@ from legendary.models.manifest import ManifestComparison, Manifest class DLManager(Process): def __init__(self, download_dir, base_url, cache_dir=None, status_q=None, max_workers=0, update_interval=1.0, dl_timeout=10, resume_file=None, - max_shared_memory=1024 * 1024 * 1024): + max_shared_memory=1024 * 1024 * 1024, bind_ip=None): super().__init__(name='DLManager') self.log = logging.getLogger('DLM') self.proc_debug = False @@ -37,8 +37,11 @@ class DLManager(Process): self.writer_queue = None self.dl_result_q = None self.writer_result_q = None + + # Worker stuff self.max_workers = max_workers or min(cpu_count() * 2, 16) self.dl_timeout = dl_timeout + self.bind_ips = [] if not bind_ip else bind_ip.split(',') # Analysis stuff self.analysis = None @@ -655,10 +658,15 @@ class DLManager(Process): self.writer_result_q = MPQueue(-1) self.log.info(f'Starting download workers...') + + bind_ip = None for i in range(self.max_workers): + if self.bind_ips: + bind_ip = self.bind_ips[i % len(self.bind_ips)] + w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q, self.shared_memory.name, logging_queue=self.logging_queue, - dl_timeout=self.dl_timeout) + dl_timeout=self.dl_timeout, bind_addr=bind_ip) self.children.append(w) w.start() diff --git a/legendary/downloader/mp/workers.py b/legendary/downloader/mp/workers.py index e16cd96..27cb0bf 100644 --- a/legendary/downloader/mp/workers.py +++ b/legendary/downloader/mp/workers.py @@ -1,7 +1,6 @@ # coding: utf-8 import os -import requests import time import logging @@ -10,6 +9,9 @@ from multiprocessing import Process from multiprocessing.shared_memory import SharedMemory from queue import Empty +import requests +from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK + from legendary.models.chunk import Chunk from legendary.models.downloading import ( DownloaderTask, DownloaderTaskResult, @@ -18,9 +20,22 @@ from legendary.models.downloading import ( ) +class BindingHTTPAdapter(HTTPAdapter): + def __init__(self, addr): + self.__attrs__.append('addr') + self.addr = addr + super().__init__() + + def init_poolmanager( + self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs + ): + pool_kwargs['source_address'] = (self.addr, 0) + super().init_poolmanager(connections, maxsize, block, **pool_kwargs) + + class DLWorker(Process): def __init__(self, name, queue, out_queue, shm, max_retries=7, - logging_queue=None, dl_timeout=10): + logging_queue=None, dl_timeout=10, bind_addr=None): super().__init__(name=name) self.q = queue self.o_q = out_queue @@ -34,6 +49,12 @@ class DLWorker(Process): self.logging_queue = logging_queue self.dl_timeout = float(dl_timeout) if dl_timeout else 10.0 + # optionally bind an address + if bind_addr: + adapter = BindingHTTPAdapter(bind_addr) + self.session.mount('https://', adapter) + self.session.mount('http://', adapter) + def run(self): # we have to fix up the logger before we can start _root = logging.getLogger()