[all] Add a context manager for user data

This wraps all modifications of the `user.json` file into a FileLock
This commit is contained in:
Mathis Dröge 2023-06-06 19:34:11 +02:00
parent 8b2809779f
commit a27125cbc2
No known key found for this signature in database
GPG key ID: 3071D4EFBB298F5F
3 changed files with 87 additions and 49 deletions

View file

@ -133,7 +133,7 @@ class LegendaryCLI:
try:
if self.core.auth_import():
logger.info('Successfully imported login session from EGS!')
logger.info(f'Now logged in as user "{self.core.lgd.userdata["displayName"]}"')
logger.info(f'Now logged in as user "{self.core.lgd.immutable_user_properties["displayName"]}"')
return
else:
logger.warning('Login session from EGS seems to no longer be valid.')
@ -163,7 +163,7 @@ class LegendaryCLI:
auth_code = auth_code.strip('"')
else:
if do_webview_login(callback_code=self.core.auth_ex_token):
logger.info(f'Successfully logged in as "{self.core.lgd.userdata["displayName"]}" via WebView')
logger.info(f'Successfully logged in as "{self.core.lgd.immutable_user_properties["displayName"]}" via WebView')
else:
logger.error('WebView login attempt failed, please see log for details.')
return
@ -179,9 +179,9 @@ class LegendaryCLI:
return
if exchange_token and self.core.auth_ex_token(exchange_token):
logger.info(f'Successfully logged in as "{self.core.lgd.userdata["displayName"]}"')
logger.info(f'Successfully logged in as "{self.core.lgd.immutable_user_properties["displayName"]}"')
elif auth_code and self.core.auth_code(auth_code):
logger.info(f'Successfully logged in as "{self.core.lgd.userdata["displayName"]}"')
logger.info(f'Successfully logged in as "{self.core.lgd.immutable_user_properties["displayName"]}"')
else:
logger.error('Login attempt failed, please see log for details.')
@ -1505,11 +1505,12 @@ class LegendaryCLI:
# if automatic checks are off force an update here
self.core.check_for_updates(force=True)
if not self.core.lgd.userdata:
user_name = '<not logged in>'
args.offline = True
else:
user_name = self.core.lgd.userdata['displayName']
with self.core.lgd.user_lock() as user_lock:
if not user_lock.data:
user_name = '<not logged in>'
args.offline = True
else:
user_name = user_lock.data['displayName']
games_available = len(self.core.get_game_list(update_assets=not args.offline))
games_installed = len(self.core.get_installed_list())

View file

@ -23,7 +23,7 @@ from legendary.api.egs import EPCAPI
from legendary.api.lgd import LGDAPI
from legendary.downloader.mp.manager import DLManager
from legendary.lfs.egl import EPCLFS
from legendary.lfs.lgndry import LGDLFS
from legendary.lfs.lgndry import LGDLFS, LockedUserData
from legendary.lfs.utils import clean_filename, delete_folder, delete_filelist, get_dir_size
from legendary.models.downloading import AnalysisResult, ConditionCheckResult
from legendary.models.egl import EGLManifest
@ -131,7 +131,8 @@ class LegendaryCore:
Handles authentication via authorization code (either retrieved manually or automatically)
"""
try:
self.lgd.userdata = self.egs.start_session(authorization_code=code)
with self.lgd.user_lock() as user_lock:
user_lock.data = self.egs.start_session(authorization_code=code)
return True
except Exception as e:
self.log.error(f'Logging in failed with {e!r}, please try again.')
@ -142,7 +143,8 @@ class LegendaryCore:
Handles authentication via exchange token (either retrieved manually or automatically)
"""
try:
self.lgd.userdata = self.egs.start_session(exchange_token=code)
with self.lgd.user_lock() as user_lock:
user_lock.data = self.egs.start_session(exchange_token=code)
return True
except Exception as e:
self.log.error(f'Logging in failed with {e!r}, please try again.')
@ -171,22 +173,23 @@ class LegendaryCore:
raise ValueError('No login session in config')
refresh_token = re_data['Token']
try:
self.lgd.userdata = self.egs.start_session(refresh_token=refresh_token)
with self.lgd.user_lock() as user_lock:
user_lock.data = self.egs.start_session(refresh_token=refresh_token)
return True
except Exception as e:
self.log.error(f'Logging in failed with {e!r}, please try again.')
return False
def login(self, force_refresh=False) -> bool:
def _login(self, user_lock: LockedUserData, force_refresh=False) -> bool:
"""
Attempts logging in with existing credentials.
raises ValueError if no existing credentials or InvalidCredentialsError if the API return an error
"""
if not self.lgd.userdata:
if not user_lock.data:
raise ValueError('No saved credentials')
elif self.logged_in and self.lgd.userdata['expires_at']:
dt_exp = datetime.fromisoformat(self.lgd.userdata['expires_at'][:-1])
elif self.logged_in and user_lock.data['expires_at']:
dt_exp = datetime.fromisoformat(user_lock.data['expires_at'][:-1])
dt_now = datetime.utcnow()
td = dt_now - dt_exp
@ -212,8 +215,8 @@ class LegendaryCore:
except Exception as e:
self.log.warning(f'Checking for EOS Overlay updates failed: {e!r}')
if self.lgd.userdata['expires_at'] and not force_refresh:
dt_exp = datetime.fromisoformat(self.lgd.userdata['expires_at'][:-1])
if user_lock.data['expires_at'] and not force_refresh:
dt_exp = datetime.fromisoformat(user_lock.data['expires_at'][:-1])
dt_now = datetime.utcnow()
td = dt_now - dt_exp
@ -221,7 +224,7 @@ class LegendaryCore:
if dt_exp > dt_now and abs(td.total_seconds()) > 600:
self.log.info('Trying to re-use existing login session...')
try:
self.egs.resume_session(self.lgd.userdata)
self.egs.resume_session(user_lock.data)
self.logged_in = True
return True
except InvalidCredentialsError as e:
@ -233,7 +236,7 @@ class LegendaryCore:
try:
self.log.info('Logging in...')
userdata = self.egs.start_session(self.lgd.userdata['refresh_token'])
userdata = self.egs.start_session(user_lock.data['refresh_token'])
except InvalidCredentialsError:
self.log.error('Stored credentials are no longer valid! Please login again.')
self.lgd.invalidate_userdata()
@ -242,10 +245,14 @@ class LegendaryCore:
self.log.error(f'HTTP request for login failed: {e!r}, please try again later.')
return False
self.lgd.userdata = userdata
user_lock.data = userdata
self.logged_in = True
return True
def login(self, force_refresh=False) -> bool:
with self.lgd.user_lock() as user_lock:
return self._login(user_lock, force_refresh)
def update_check_enabled(self):
return not self.lgd.config.getboolean('Legendary', 'disable_update_check', fallback=False)
@ -726,8 +733,8 @@ class LegendaryCore:
elif not install.can_run_offline:
self.log.warning('Game is not approved for offline use and may not work correctly.')
account_id = self.lgd.userdata['account_id']
user_name = user or self.lgd.userdata['displayName']
account_id = self.lgd.immutable_user_properties['account_id']
user_name = user or self.lgd.immutable_user_properties['displayName']
params.egl_parameters.extend([
'-AUTH_LOGIN=unused',
@ -767,8 +774,8 @@ class LegendaryCore:
def get_origin_uri(self, app_name: str, offline: bool = False) -> str:
token = '0' if offline else self.egs.get_game_token()['code']
user_name = self.lgd.userdata['displayName']
account_id = self.lgd.userdata['account_id']
user_name = self.lgd.immutable_user_properties['displayName']
account_id = self.lgd.immutable_user_properties['account_id']
parameters = [
('AUTH_PASSWORD', token),
('AUTH_TYPE', 'exchangecode'),
@ -817,7 +824,7 @@ class LegendaryCore:
# the following variables are known:
path_vars = {
'{installdir}': igame.install_path,
'{epicid}': self.lgd.userdata['account_id']
'{epicid}': self.lgd.immutable_user_properties['account_id']
}
if sys_platform == 'win32':

View file

@ -7,6 +7,7 @@ import logging
from collections import defaultdict
from pathlib import Path
from time import time
from filelock import FileLock
from .utils import clean_filename
@ -16,6 +17,40 @@ from legendary.models.config import LGDConf
from legendary.utils.env import is_windows_mac_or_pyi
class LockedUserData(FileLock):
def __init__(self, user_data_path: str):
super().__init__(user_data_path + '.lock')
self._user_data_path = user_data_path
self._user_data = None
self._initial_user_data = None
def __enter__(self):
super().__enter__()
if os.path.exists(self._user_data_path):
with open(self._user_data_path, 'r') as f:
self._user_data = json.load(f)
self._initial_user_data = self._user_data
return self
def __exit__(self, exc_type, exc_val, exc_tb):
super().__exit__(exc_type, exc_val, exc_tb)
if self._user_data != self._initial_user_data:
if self._user_data is not None:
with open(self._user_data_path, 'w') as f:
json.dump(self._user_data, f, indent=2, sort_keys=True)
else:
if os.path.exists(self._user_data_path):
os.remove(self._user_data_path)
@property
def data(self):
return self._user_data
@data.setter
def data(self, new_data):
self._user_data = new_data
class LGDLFS:
def __init__(self, config_file=None):
self.log = logging.getLogger('LGDLFS')
@ -130,31 +165,26 @@ class LGDLFS:
except Exception as e:
self.log.debug(f'Loading aliases failed with {e!r}')
def user_lock(self) -> LockedUserData:
return LockedUserData(os.path.join(self.path, 'user.json'))
@property
def userdata(self):
if self._user_data is not None:
return self._user_data
try:
self._user_data = json.load(open(os.path.join(self.path, 'user.json')))
return self._user_data
except Exception as e:
self.log.debug(f'Failed to load user data: {e!r}')
return None
@userdata.setter
def userdata(self, userdata):
if userdata is None:
raise ValueError('Userdata is none!')
self._user_data = userdata
json.dump(userdata, open(os.path.join(self.path, 'user.json'), 'w'),
indent=2, sort_keys=True)
def immutable_user_properties(self) -> dict[str, str] | None:
with self.user_lock() as user_lock:
immutable_data = user_lock.data
if immutable_data is None:
return immutable_data
del immutable_data['access_token']
del immutable_data['expires_at']
del immutable_data['expires_in']
del immutable_data['refresh_token']
del immutable_data['refresh_expires']
del immutable_data['refresh_expires_at']
return immutable_data
def invalidate_userdata(self):
self._user_data = None
if os.path.exists(os.path.join(self.path, 'user.json')):
os.remove(os.path.join(self.path, 'user.json'))
with self.user_lock() as user_lock:
user_lock.data = None
@property
def entitlements(self):