# coding: utf-8 from __future__ import annotations import hashlib import logging import struct import zlib from base64 import b64encode from io import BytesIO from typing import Optional logger = logging.getLogger('Manifest') def read_fstring(bio): length = struct.unpack(' 0: s = bio.read(length - 1).decode('ascii') bio.seek(1, 1) # skip string null terminator else: # empty string, no terminators or anything s = '' return s def write_fstring(bio, string): if not string: bio.write(struct.pack('= 15: return 'ChunksV4' elif version >= 6: return 'ChunksV3' elif version >= 3: return 'ChunksV2' else: return 'Chunks' class Manifest: header_magic = 0x44BEC00C default_serialisation_version = 17 def __init__(self): self.header_size = 41 self.size_compressed = 0 self.size_uncompressed = 0 self.sha_hash = '' self.stored_as = 0 self.version = 18 self.data = b'' # remainder self.meta: Optional[ManifestMeta] = None self.chunk_data_list: Optional[CDL] = None self.file_manifest_list: Optional[FML] = None self.custom_fields: Optional[CustomFields] = None @property def compressed(self): return self.stored_as & 0x1 @classmethod def read_all(cls, data): _m = cls.read(data) _tmp = BytesIO(_m.data) _m.meta = ManifestMeta.read(_tmp) _m.chunk_data_list = CDL.read(_tmp, _m.meta.feature_level) _m.file_manifest_list = FML.read(_tmp) _m.custom_fields = CustomFields.read(_tmp) if unhandled_data := _tmp.read(): logger.warning(f'Did not read {len(unhandled_data)} remaining bytes in manifest! ' f'This may not be a problem.') # Throw this away since the raw data is no longer needed _tmp.close() del _tmp _m.data = b'' return _m @classmethod def read(cls, data): bio = BytesIO(data) if struct.unpack(' 21: logger.warning(f'Trying to serialise an unknown target version: {target_version},' f'clamping to 21.') target_version = 21 # Ensure metadata will be correct self.meta.feature_level = target_version self.meta.write(body_bio) self.chunk_data_list.write(body_bio) self.file_manifest_list.write(body_bio) self.custom_fields.write(body_bio) self.data = body_bio.getvalue() self.size_uncompressed = self.size_compressed = len(self.data) self.sha_hash = hashlib.sha1(self.data).digest() if self.compressed or compress: self.stored_as |= 0x1 self.data = zlib.compress(self.data) self.size_compressed = len(self.data) bio = fp or BytesIO() bio.write(struct.pack('= 1 stores build ID if _meta.data_version >= 1: _meta._build_id = read_fstring(bio) # Manifest version 21 with data version >= 2 stores uninstall commands if _meta.data_version >= 2: _meta.uninstall_action_path = read_fstring(bio) _meta.uninstall_action_args = read_fstring(bio) if (size_read := bio.tell()) != _meta.meta_size: logger.warning(f'Did not read entire manifest metadata! Version: {_meta.data_version}, ' f'{_meta.meta_size - size_read} bytes missing, skipping...') bio.seek(_meta.meta_size - size_read, 1) # downgrade version to prevent issues during serialisation _meta.data_version = 0 return _meta def write(self, bio): meta_start = bio.tell() bio.write(struct.pack('= 1: write_fstring(bio, self.build_id) if self.data_version >= 2: write_fstring(bio, self.uninstall_action_path) write_fstring(bio, self.uninstall_action_args) meta_end = bio.tell() bio.seek(meta_start) bio.write(struct.pack(''.format( self.guid_str, self.hash, self.sha_hash.hex(), self.group_num, self.window_size, self.file_size ) @property def guid_str(self): if not self._guid_str: self._guid_str = '-'.join('{:08x}'.format(g) for g in self.guid) return self._guid_str @property def guid_num(self): if not self._guid_num: self._guid_num = self.guid[3] + (self.guid[2] << 32) + (self.guid[1] << 64) + (self.guid[0] << 96) return self._guid_num @property def group_num(self): if self._guid_num is not None: return self._group_num self._group_num = (zlib.crc32( struct.pack(' 0: logger.warning(f'Did not read {diff} bytes from chunk part!') bio.seek(diff) # MD5 hash + MIME type (Manifest feature level 19) if _fml.version >= 1: for fm in _fml.elements: _has_md5 = struct.unpack('= 2: for fm in _fml.elements: fm.hash_sha256 = bio.read(32) # we have to calculate the actual file size ourselves for fm in _fml.elements: fm.file_size = sum(c.size for c in fm.chunk_parts) if (size_read := bio.tell() - fml_start) != _fml.size: logger.warning(f'Did not read entire file data list! Version: {_fml.version}, ' f'{_fml.size - size_read} bytes missing, skipping...') bio.seek(_fml.size - size_read, 1) # downgrade version to prevent issues during serialisation _fml.version = 0 return _fml def write(self, bio): fml_start = bio.tell() bio.write(struct.pack('= 1: for fm in self.elements: has_md5 = 1 if fm.hash_md5 else 0 bio.write(struct.pack('= 2: for fm in self.elements: bio.write(fm.hash_sha256) fml_end = bio.tell() bio.seek(fml_start) bio.write(struct.pack(''.format( self.filename, self.symlink_target, self.hash.hex(), self.flags, ', '.join(self.install_tags), cp_repr, self.file_size ) class ChunkPart: def __init__(self, guid=None, offset=0, size=0, file_offset=0): self.guid = guid self.offset = offset self.size = size self.file_offset = file_offset # caches for things that are "expensive" to compute self._guid_str = None self._guid_num = None @property def guid_str(self): if not self._guid_str: self._guid_str = '-'.join('{:08x}'.format(g) for g in self.guid) return self._guid_str @property def guid_num(self): if not self._guid_num: self._guid_num = self.guid[3] + (self.guid[2] << 32) + (self.guid[1] << 64) + (self.guid[0] << 96) return self._guid_num def __repr__(self): guid_readable = '-'.join('{:08x}'.format(g) for g in self.guid) return ''.format( guid_readable, self.offset, self.size, self.file_offset) class CustomFields: def __init__(self): self.size = 0 self.version = 0 self.count = 0 self._dict = dict() def __getitem__(self, item): return self._dict.get(item, None) def __setitem__(self, key, value): self._dict[key] = value def __str__(self): return str(self._dict) def items(self): return self._dict.items() def keys(self): return self._dict.keys() def values(self): return self._dict.values() @classmethod def read(cls, bio): _cf = cls() cf_start = bio.tell() _cf.size = struct.unpack('