mirror of
https://github.com/derrod/legendary.git
synced 2024-06-28 19:20:28 +12:00
9131f32c22
This increases peek download speed from about 850MB/s to 960MB/s on my computer. https://github.com/derrod/legendary/issues/620
306 lines
12 KiB
Python
306 lines
12 KiB
Python
# coding: utf-8
|
|
|
|
import os
|
|
import time
|
|
import logging
|
|
|
|
from logging.handlers import QueueHandler
|
|
from multiprocessing import Process
|
|
from multiprocessing.shared_memory import SharedMemory
|
|
from queue import Empty
|
|
|
|
import requests
|
|
from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK
|
|
|
|
from legendary.models.chunk import Chunk
|
|
from legendary.models.downloading import (
|
|
DownloaderTask, DownloaderTaskResult,
|
|
WriterTask, WriterTaskResult,
|
|
TerminateWorkerTask, TaskFlags
|
|
)
|
|
|
|
|
|
class BindingHTTPAdapter(HTTPAdapter):
|
|
def __init__(self, addr):
|
|
self.__attrs__.append('addr')
|
|
self.addr = addr
|
|
super().__init__()
|
|
|
|
def init_poolmanager(
|
|
self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs
|
|
):
|
|
pool_kwargs['source_address'] = (self.addr, 0)
|
|
super().init_poolmanager(connections, maxsize, block, **pool_kwargs)
|
|
|
|
|
|
class DLWorker(Process):
|
|
def __init__(self, name, queue, out_queue, shm, max_retries=7,
|
|
logging_queue=None, dl_timeout=10, bind_addr=None):
|
|
super().__init__(name=name)
|
|
self.q = queue
|
|
self.o_q = out_queue
|
|
self.session = requests.session()
|
|
self.session.headers.update({
|
|
'User-Agent': 'EpicGamesLauncher/11.0.1-14907503+++Portal+Release-Live Windows/10.0.19041.1.256.64bit'
|
|
})
|
|
self.max_retries = max_retries
|
|
self.shm = SharedMemory(name=shm)
|
|
self.log_level = logging.getLogger().level
|
|
self.logging_queue = logging_queue
|
|
self.dl_timeout = float(dl_timeout) if dl_timeout else 10.0
|
|
|
|
# optionally bind an address
|
|
if bind_addr:
|
|
adapter = BindingHTTPAdapter(bind_addr)
|
|
self.session.mount('https://', adapter)
|
|
self.session.mount('http://', adapter)
|
|
|
|
def run(self):
|
|
# we have to fix up the logger before we can start
|
|
_root = logging.getLogger()
|
|
_root.handlers = []
|
|
_root.addHandler(QueueHandler(self.logging_queue))
|
|
|
|
logger = logging.getLogger(self.name)
|
|
logger.setLevel(self.log_level)
|
|
logger.debug(f'Download worker reporting for duty!')
|
|
|
|
empty = False
|
|
while True:
|
|
try:
|
|
job: DownloaderTask = self.q.get(timeout=10.0)
|
|
empty = False
|
|
except Empty:
|
|
if not empty:
|
|
logger.debug('Queue Empty, waiting for more...')
|
|
empty = True
|
|
continue
|
|
|
|
if isinstance(job, TerminateWorkerTask): # let worker die
|
|
logger.debug('Worker received termination signal, shutting down...')
|
|
break
|
|
|
|
tries = 0
|
|
compressed = 0
|
|
chunk = None
|
|
|
|
try:
|
|
while tries < self.max_retries:
|
|
# retry once immediately, otherwise do exponential backoff
|
|
if tries > 1:
|
|
sleep_time = 2**(tries-1)
|
|
logger.info(f'Sleeping {sleep_time} seconds before retrying.')
|
|
time.sleep(sleep_time)
|
|
|
|
# print('Downloading', job.url)
|
|
logger.debug(f'Downloading {job.url}')
|
|
|
|
try:
|
|
r = self.session.get(job.url, timeout=self.dl_timeout)
|
|
r.raise_for_status()
|
|
except Exception as e:
|
|
logger.warning(f'Chunk download for {job.chunk_guid} failed: ({e!r}), retrying...')
|
|
continue
|
|
|
|
if r.status_code != 200:
|
|
logger.warning(f'Chunk download for {job.chunk_guid} failed: status {r.status_code}, retrying...')
|
|
continue
|
|
else:
|
|
compressed = len(r.content)
|
|
chunk = Chunk.read_buffer(r.content)
|
|
break
|
|
else:
|
|
raise TimeoutError('Max retries reached')
|
|
except Exception as e:
|
|
logger.error(f'Job for {job.chunk_guid} failed with: {e!r}, fetching next one...')
|
|
# add failed job to result queue to be requeued
|
|
self.o_q.put(DownloaderTaskResult(success=False, **job.__dict__))
|
|
except KeyboardInterrupt:
|
|
logger.warning('Immediate exit requested, quitting...')
|
|
break
|
|
|
|
if not chunk:
|
|
logger.warning('Chunk somehow None?')
|
|
self.o_q.put(DownloaderTaskResult(success=False, **job.__dict__))
|
|
continue
|
|
|
|
# decompress stuff
|
|
try:
|
|
data = chunk.data
|
|
size = len(data)
|
|
if size > job.shm.size:
|
|
logger.fatal('Downloaded chunk is longer than SharedMemorySegment!')
|
|
|
|
self.shm.buf[job.shm.offset:job.shm.offset + size] = data
|
|
del chunk
|
|
self.o_q.put(DownloaderTaskResult(success=True, size_decompressed=size,
|
|
size_downloaded=compressed, **job.__dict__))
|
|
except Exception as e:
|
|
logger.warning(f'Job for {job.chunk_guid} failed with: {e!r}, fetching next one...')
|
|
self.o_q.put(DownloaderTaskResult(success=False, **job.__dict__))
|
|
continue
|
|
except KeyboardInterrupt:
|
|
logger.warning('Immediate exit requested, quitting...')
|
|
break
|
|
|
|
self.shm.close()
|
|
|
|
|
|
class FileWorker(Process):
|
|
def __init__(self, queue, out_queue, base_path, shm, cache_path=None, logging_queue=None):
|
|
super().__init__(name='FileWorker')
|
|
self.q = queue
|
|
self.o_q = out_queue
|
|
self.base_path = base_path
|
|
self.cache_path = cache_path or os.path.join(base_path, '.cache')
|
|
self.shm = SharedMemory(name=shm)
|
|
self.log_level = logging.getLogger().level
|
|
self.logging_queue = logging_queue
|
|
|
|
def run(self):
|
|
# we have to fix up the logger before we can start
|
|
_root = logging.getLogger()
|
|
_root.handlers = []
|
|
_root.addHandler(QueueHandler(self.logging_queue))
|
|
|
|
logger = logging.getLogger(self.name)
|
|
logger.setLevel(self.log_level)
|
|
logger.debug('Download worker reporting for duty!')
|
|
|
|
last_filename = ''
|
|
current_file = None
|
|
|
|
while True:
|
|
try:
|
|
try:
|
|
j: WriterTask = self.q.get(timeout=10.0)
|
|
except Empty:
|
|
logger.warning('Writer queue empty!')
|
|
continue
|
|
|
|
if isinstance(j, TerminateWorkerTask):
|
|
if current_file:
|
|
current_file.close()
|
|
logger.debug('Worker received termination signal, shutting down...')
|
|
# send termination task to results halnder as well
|
|
self.o_q.put(TerminateWorkerTask())
|
|
break
|
|
|
|
# make directories if required
|
|
path = os.path.split(j.filename)[0]
|
|
if not os.path.exists(os.path.join(self.base_path, path)):
|
|
os.makedirs(os.path.join(self.base_path, path))
|
|
|
|
full_path = os.path.join(self.base_path, j.filename)
|
|
|
|
if j.flags & TaskFlags.CREATE_EMPTY_FILE: # just create an empty file
|
|
open(full_path, 'a').close()
|
|
self.o_q.put(WriterTaskResult(success=True, **j.__dict__))
|
|
continue
|
|
elif j.flags & TaskFlags.OPEN_FILE:
|
|
if current_file:
|
|
logger.warning(f'Opening new file {j.filename} without closing previous! {last_filename}')
|
|
current_file.close()
|
|
|
|
current_file = open(full_path, 'wb')
|
|
last_filename = j.filename
|
|
|
|
self.o_q.put(WriterTaskResult(success=True, **j.__dict__))
|
|
continue
|
|
elif j.flags & TaskFlags.CLOSE_FILE:
|
|
if current_file:
|
|
current_file.close()
|
|
current_file = None
|
|
else:
|
|
logger.warning(f'Asking to close file that is not open: {j.filename}')
|
|
|
|
self.o_q.put(WriterTaskResult(success=True, **j.__dict__))
|
|
continue
|
|
elif j.flags & TaskFlags.RENAME_FILE:
|
|
if current_file:
|
|
logger.warning('Trying to rename file without closing first!')
|
|
current_file.close()
|
|
current_file = None
|
|
if j.flags & TaskFlags.DELETE_FILE:
|
|
try:
|
|
os.remove(full_path)
|
|
except OSError as e:
|
|
logger.error(f'Removing file failed: {e!r}')
|
|
self.o_q.put(WriterTaskResult(success=False, **j.__dict__))
|
|
continue
|
|
|
|
try:
|
|
os.rename(os.path.join(self.base_path, j.old_file), full_path)
|
|
except OSError as e:
|
|
logger.error(f'Renaming file failed: {e!r}')
|
|
self.o_q.put(WriterTaskResult(success=False, **j.__dict__))
|
|
continue
|
|
|
|
self.o_q.put(WriterTaskResult(success=True, **j.__dict__))
|
|
continue
|
|
elif j.flags & TaskFlags.DELETE_FILE:
|
|
if current_file:
|
|
logger.warning('Trying to delete file without closing first!')
|
|
current_file.close()
|
|
current_file = None
|
|
|
|
try:
|
|
os.remove(full_path)
|
|
except OSError as e:
|
|
if not j.flags & TaskFlags.SILENT:
|
|
logger.error(f'Removing file failed: {e!r}')
|
|
|
|
self.o_q.put(WriterTaskResult(success=True, **j.__dict__))
|
|
continue
|
|
elif j.flags & TaskFlags.MAKE_EXECUTABLE:
|
|
if current_file:
|
|
logger.warning('Trying to chmod file without closing first!')
|
|
current_file.close()
|
|
current_file = None
|
|
|
|
try:
|
|
st = os.stat(full_path)
|
|
os.chmod(full_path, st.st_mode | 0o111)
|
|
except OSError as e:
|
|
if not j.flags & TaskFlags.SILENT:
|
|
logger.error(f'chmod\'ing file failed: {e!r}')
|
|
|
|
self.o_q.put(WriterTaskResult(success=True, **j.__dict__))
|
|
continue
|
|
|
|
try:
|
|
if j.shared_memory:
|
|
shm_offset = j.shared_memory.offset + j.chunk_offset
|
|
shm_end = shm_offset + j.chunk_size
|
|
current_file.write(self.shm.buf[shm_offset:shm_end])
|
|
elif j.cache_file:
|
|
with open(os.path.join(self.cache_path, j.cache_file), 'rb') as f:
|
|
if j.chunk_offset:
|
|
f.seek(j.chunk_offset)
|
|
current_file.write(f.read(j.chunk_size))
|
|
elif j.old_file:
|
|
with open(os.path.join(self.base_path, j.old_file), 'rb') as f:
|
|
if j.chunk_offset:
|
|
f.seek(j.chunk_offset)
|
|
current_file.write(f.read(j.chunk_size))
|
|
except Exception as e:
|
|
logger.warning(f'Something in writing a file failed: {e!r}')
|
|
self.o_q.put(WriterTaskResult(success=False, size=j.chunk_size, **j.__dict__))
|
|
else:
|
|
self.o_q.put(WriterTaskResult(success=True, size=j.chunk_size, **j.__dict__))
|
|
except Exception as e:
|
|
logger.warning(f'Job {j.filename} failed with: {e!r}, fetching next one...')
|
|
self.o_q.put(WriterTaskResult(success=False, **j.__dict__))
|
|
|
|
try:
|
|
if current_file:
|
|
current_file.close()
|
|
current_file = None
|
|
except Exception as e:
|
|
logger.error(f'Closing file after error failed: {e!r}')
|
|
except KeyboardInterrupt:
|
|
logger.warning('Immediate exit requested, quitting...')
|
|
if current_file:
|
|
current_file.close()
|
|
return
|