1
0
Fork 0
mirror of synced 2024-06-02 10:44:40 +12:00
Rare/custom_legendary/downloader/workers.py
2021-03-18 12:45:59 +01:00

277 lines
12 KiB
Python

# coding: utf-8
import logging
import os
import time
from logging.handlers import QueueHandler
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
from queue import Empty
import requests
from custom_legendary.models.chunk import Chunk
from custom_legendary.models.downloading import DownloaderTaskResult, WriterTaskResult
class DLWorker(Process):
def __init__(self, name, queue, out_queue, shm, max_retries=5,
logging_queue=None, dl_timeout=10):
super().__init__(name=name)
self.q = queue
self.o_q = out_queue
self.session = requests.session()
self.session.headers.update({
'User-Agent': 'EpicGamesLauncher/11.0.1-14907503+++Portal+Release-Live Windows/10.0.19041.1.256.64bit'
})
self.max_retries = max_retries
self.shm = SharedMemory(name=shm)
self.log_level = logging.getLogger().level
self.logging_queue = logging_queue
self.dl_timeout = float(dl_timeout) if dl_timeout else 10.0
def run(self):
# we have to fix up the logger before we can start
_root = logging.getLogger()
_root.handlers = []
_root.addHandler(QueueHandler(self.logging_queue))
logger = logging.getLogger(self.name)
logger.setLevel(self.log_level)
logger.debug(f'Download worker reporting for duty!')
empty = False
while True:
try:
job = self.q.get(timeout=10.0)
empty = False
except Empty:
if not empty:
logger.debug(f'Queue Empty, waiting for more...')
empty = True
continue
if job.kill: # let worker die
logger.debug(f'Worker received kill signal, shutting down...')
break
tries = 0
dl_start = dl_end = 0
compressed = 0
chunk = None
try:
while tries < self.max_retries:
# print('Downloading', job.url)
logger.debug(f'Downloading {job.url}')
dl_start = time.time()
try:
r = self.session.get(job.url, timeout=self.dl_timeout)
r.raise_for_status()
except Exception as e:
logger.warning(f'Chunk download for {job.guid} failed: ({e!r}), retrying...')
continue
dl_end = time.time()
if r.status_code != 200:
logger.warning(f'Chunk download for {job.guid} failed: status {r.status_code}, retrying...')
continue
else:
compressed = len(r.content)
chunk = Chunk.read_buffer(r.content)
break
else:
raise TimeoutError('Max retries reached')
except Exception as e:
logger.error(f'Job for {job.guid} failed with: {e!r}, fetching next one...')
# add failed job to result queue to be requeued
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
except KeyboardInterrupt:
logger.warning('Immediate exit requested, quitting...')
break
if not chunk:
logger.warning(f'Chunk somehow None?')
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
continue
# decompress stuff
try:
size = len(chunk.data)
if size > job.shm.size:
logger.fatal(f'Downloaded chunk is longer than SharedMemorySegment!')
self.shm.buf[job.shm.offset:job.shm.offset + size] = bytes(chunk.data)
del chunk
self.o_q.put(DownloaderTaskResult(success=True, chunk_guid=job.guid, shm=job.shm,
url=job.url, size=size, compressed_size=compressed,
time_delta=dl_end - dl_start))
except Exception as e:
logger.warning(f'Job for {job.guid} failed with: {e!r}, fetching next one...')
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
continue
except KeyboardInterrupt:
logger.warning('Immediate exit requested, quitting...')
break
self.shm.close()
class FileWorker(Process):
def __init__(self, queue, out_queue, base_path, shm, cache_path=None, logging_queue=None):
super().__init__(name='FileWorker')
self.q = queue
self.o_q = out_queue
self.base_path = base_path
self.cache_path = cache_path if cache_path else os.path.join(base_path, '.cache')
self.shm = SharedMemory(name=shm)
self.log_level = logging.getLogger().level
self.logging_queue = logging_queue
def run(self):
# we have to fix up the logger before we can start
_root = logging.getLogger()
_root.handlers = []
_root.addHandler(QueueHandler(self.logging_queue))
logger = logging.getLogger(self.name)
logger.setLevel(self.log_level)
logger.debug(f'Download worker reporting for duty!')
last_filename = ''
current_file = None
while True:
try:
try:
j = self.q.get(timeout=10.0)
except Empty:
logger.warning('Writer queue empty!')
continue
if j.kill:
if current_file:
current_file.close()
self.o_q.put(WriterTaskResult(success=True, kill=True))
break
# make directories if required
path = os.path.split(j.filename)[0]
if not os.path.exists(os.path.join(self.base_path, path)):
os.makedirs(os.path.join(self.base_path, path))
full_path = os.path.join(self.base_path, j.filename)
if j.empty: # just create an empty file
open(full_path, 'a').close()
self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
continue
elif j.open:
if current_file:
logger.warning(f'Opening new file {j.filename} without closing previous! {last_filename}')
current_file.close()
current_file = open(full_path, 'wb')
last_filename = j.filename
self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
continue
elif j.close:
if current_file:
current_file.close()
current_file = None
else:
logger.warning(f'Asking to close file that is not open: {j.filename}')
self.o_q.put(WriterTaskResult(success=True, filename=j.filename, closed=True))
continue
elif j.rename:
if current_file:
logger.warning('Trying to rename file without closing first!')
current_file.close()
current_file = None
if j.delete:
try:
os.remove(full_path)
except OSError as e:
logger.error(f'Removing file failed: {e!r}')
self.o_q.put(WriterTaskResult(success=False, filename=j.filename))
continue
try:
os.rename(os.path.join(self.base_path, j.old_filename), full_path)
except OSError as e:
logger.error(f'Renaming file failed: {e!r}')
self.o_q.put(WriterTaskResult(success=False, filename=j.filename))
continue
self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
continue
elif j.delete:
if current_file:
logger.warning('Trying to delete file without closing first!')
current_file.close()
current_file = None
try:
os.remove(full_path)
except OSError as e:
if not j.silent:
logger.error(f'Removing file failed: {e!r}')
self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
continue
pre_write = post_write = 0
try:
if j.shm:
pre_write = time.time()
shm_offset = j.shm.offset + j.chunk_offset
shm_end = shm_offset + j.chunk_size
current_file.write(self.shm.buf[shm_offset:shm_end].tobytes())
post_write = time.time()
elif j.cache_file:
pre_write = time.time()
with open(os.path.join(self.cache_path, j.cache_file), 'rb') as f:
if j.chunk_offset:
f.seek(j.chunk_offset)
current_file.write(f.read(j.chunk_size))
post_write = time.time()
elif j.old_file:
pre_write = time.time()
with open(os.path.join(self.base_path, j.old_file), 'rb') as f:
if j.chunk_offset:
f.seek(j.chunk_offset)
current_file.write(f.read(j.chunk_size))
post_write = time.time()
except Exception as e:
logger.warning(f'Something in writing a file failed: {e!r}')
self.o_q.put(WriterTaskResult(success=False, filename=j.filename,
chunk_guid=j.chunk_guid,
release_memory=j.release_memory,
shm=j.shm, size=j.chunk_size,
time_delta=post_write - pre_write))
else:
self.o_q.put(WriterTaskResult(success=True, filename=j.filename,
chunk_guid=j.chunk_guid,
release_memory=j.release_memory,
shm=j.shm, size=j.chunk_size,
time_delta=post_write - pre_write))
except Exception as e:
logger.warning(f'Job {j.filename} failed with: {e!r}, fetching next one...')
self.o_q.put(WriterTaskResult(success=False, filename=j.filename, chunk_guid=j.chunk_guid))
try:
if current_file:
current_file.close()
current_file = None
except Exception as e:
logger.error(f'Closing file after error failed: {e!r}')
except KeyboardInterrupt:
logger.warning('Immediate exit requested, quitting...')
if current_file:
current_file.close()
return