Created
October 20, 2023 02:22
-
-
Save uyjulian/b596c978da0c1031047e124eaf5d4f84 to your computer and use it in GitHub Desktop.
Revisions
-
uyjulian created this gist
Oct 20, 2023 .There are no files selected for viewing
# SPDX-License-Identifier: MIT

# Falcom YamaNeko engine on PSP ISO format extraction.
# Uses the information contained in PSP_GAME/USRDIR/data.lst
# Also recursively unpacks cclm archive/group files

# See also: https://github.com/Trails-Research-Group

import struct
import io
import os
import sys


def read_unpack(fmt, f):
    """Read ``struct.calcsize(fmt)`` bytes from *f* and unpack them.

    Returns the tuple produced by ``struct.unpack``; raises ``struct.error``
    when the stream yields fewer bytes than the format needs.
    """
    return struct.unpack(fmt, f.read(struct.calcsize(fmt)))


# based on https://github.com/barneygale/iso9660

SECTOR_SIZE = 2048


class ISO9660(object):
    """Minimal read-only ISO 9660 reader.

    On construction the primary volume descriptor and the little-endian
    path table are parsed; ``get_file`` then returns the raw bytes of a
    file addressed by a ``/``-separated path.
    """

    def __init__(self, path):
        self._buff = None  # input buffer (io.BytesIO over the last read)
        self._root = None  # root directory record
        self._pvd = {}  # primary volume descriptor fields
        self._paths = []  # path table entries, in on-disc order
        self._path = path

        ### Volume Descriptors
        # Descriptors start at sector 0x10 and occupy one sector each; stop
        # at the terminator (type 255), skipping any type other than 1.
        sector = 0x10
        while True:
            self._get_sector(sector, SECTOR_SIZE)
            sector += 1
            ty = self._unpack('B')
            if ty == 1:
                self._unpack_pvd()
            elif ty == 255:
                break

        ### Path table
        remaining = self._pvd['path_table_size']
        self._get_sector(self._pvd['path_table_l_loc'], remaining)

        while remaining > 0:
            name_len = self._unpack('B')
            self._unpack('B')  # second header byte is read but not used
            entry = {}
            entry['ex_loc'] = self._unpack('<I')
            entry['parent'] = self._unpack('<H')  # 1-based index into the table
            entry['name'] = self._unpack_string(name_len).rstrip('\x00')

            # Odd-length names are followed by one padding byte.
            if name_len % 2 == 1:
                self._unpack('B')

            self._paths.append(entry)
            remaining -= 8 + name_len + (name_len % 2)

        assert remaining == 0

    ##
    ## Retrieve file contents as bytes
    ##

    def get_file(self, path):
        """Return the contents of the file at *path* as ``bytes``.

        The directory part is resolved through the path table first and,
        failing that, by walking directory records from the root.

        Raises ``Exception`` when the directory or the file does not exist.
        """
        parts = path.strip('/').split('/')
        dir_parts, filename = parts[:-1], parts[-1]

        parent_dir = self._root
        if dir_parts:
            parent_dir = self._dir_record_by_table(dir_parts)
            if parent_dir is None:
                parent_dir = self._dir_record_by_root(dir_parts)
        if parent_dir is None:
            raise Exception("Directory not found")

        record = self._search_dir_children(parent_dir, filename)
        if record is None:
            raise Exception("File not found")

        self._get_sector(record['ex_loc'], record['ex_len'])
        return self._unpack_raw(record['ex_len'])

    ##
    ## Methods for retrieving partial contents
    ##

    def _get_sector(self, sector, length):
        """Load *length* bytes starting at *sector* into the input buffer."""
        with open(self._path, 'rb') as f:
            f.seek(sector * SECTOR_SIZE)
            self._buff = io.BytesIO(f.read(length))

    ##
    ## Return the record for final directory in a path
    ##

    def _dir_record_by_table(self, path):
        """Find the path-table entry for the directory named by *path*.

        *path* is a list of directory names from the top level down. Each
        candidate entry is checked from the leaf upwards: every component
        must match the corresponding ancestor and the chain must end exactly
        at the root entry (the first entry of the table). Returns the
        matching entry, or ``None`` when there is none.
        """
        if not self._paths or not path:
            return None

        root = self._paths[0]
        for candidate in reversed(self._paths):
            node = candidate
            for component in reversed(path):
                # Reaching the root with components left means the
                # candidate is not nested deeply enough.
                if node is root or node['name'] != component:
                    break
                node = self._paths[node['parent'] - 1]
            else:
                # All components matched; accept only a chain that is
                # anchored at the root rather than at a deeper directory.
                if node is root:
                    return candidate

        return None

    def _dir_record_by_root(self, path):
        """Resolve *path* by descending through directory records.

        Returns the record of the final directory, or ``None`` as soon as a
        component cannot be found.
        """
        current = self._root
        for component in path:
            current = self._search_dir_children(current, component)
            if current is None:
                break
        return current

    ##
    ## Unpack the Primary Volume Descriptor
    ##

    def _unpack_pvd(self):
        """Extract path table size/location and the root record.

        Expects the buffer to be positioned just after the descriptor type
        byte; all other fields are skipped.
        """
        self._unpack_raw(131)
        self._pvd['path_table_size'] = self._unpack_both('i')
        self._pvd['path_table_l_loc'] = self._unpack('<I')
        self._unpack_raw(12)
        _, self._root = self._unpack_record()  # root directory record
        self._unpack_raw(692)

    ##
    ## Unpack a directory record (a listing of a file or folder)
    ##

    def _unpack_record(self, read=0):
        """Unpack one directory record from the buffer.

        *read* is the running byte count so far. Returns
        ``(new_read, record)`` where ``record`` is a dict with ``ex_loc``,
        ``ex_len`` and ``name``, or ``(read + 1, None)`` when the record
        length byte is zero.
        """
        record_len = self._unpack('<B')
        if record_len == 0:
            return read + 1, None

        self._unpack('<B')  # second header byte is read but not used

        d = dict()
        d['ex_loc'] = self._unpack_both('I')
        d['ex_len'] = self._unpack_both('I')
        self._unpack_raw(14)
        name_len = self._unpack('<B')
        # Drop the ";version" suffix and any NUL padding from the name.
        d['name'] = self._unpack_string(name_len).split(';')[0].rstrip('\x00')
        if name_len % 2 == 0:
            self._unpack_raw(1)  # padding byte after even-length names

        # Skip whatever trails the fixed part of the record.
        fixed_len = 34 + name_len - (name_len % 2)
        extra = record_len - fixed_len
        if extra > 0:
            self._unpack_raw(extra)

        return read + record_len, d

    # Assuming d is a directory record, this generator yields its children
    def _unpack_dir_children(self, d):
        """Yield the records listed in directory *d*.

        Only ``d['ex_loc']`` is used; the extent length is taken from the
        directory's own first record. The first two records (read as self
        and parent) are consumed and not yielded.
        """
        sector = d['ex_loc']
        read = 0
        self._get_sector(sector, SECTOR_SIZE)

        read, r_self = self._unpack_record(read)
        read, _r_parent = self._unpack_record(read)

        while read < r_self['ex_len']:  # Iterate over files in the directory
            if read % SECTOR_SIZE == 0:
                sector += 1
                self._get_sector(sector, SECTOR_SIZE)
            read, data = self._unpack_record(read)

            if data is None:  # end of directory listing in this sector
                to_read = SECTOR_SIZE - (read % SECTOR_SIZE)
                self._unpack_raw(to_read)
                read += to_read
            else:
                yield data

    # Search for one child amongst the children
    def _search_dir_children(self, d, term):
        """Return the child of directory *d* named *term*, or ``None``."""
        for child in self._unpack_dir_children(d):
            if child['name'] == term:
                return child
        return None

    ##
    ## Datatypes
    ##

    def _unpack_raw(self, l):
        """Read *l* raw bytes from the input buffer."""
        return self._buff.read(l)

    # both-endian
    def _unpack_both(self, st):
        """Read a value stored little-endian then big-endian; return it."""
        little = self._unpack('<' + st)
        big = self._unpack('>' + st)
        assert little == big
        return little

    def _unpack_string(self, l):
        """Read *l* bytes, strip trailing spaces and decode as ASCII."""
        return self._buff.read(l).rstrip(b' ').decode('ASCII')

    def _unpack(self, st):
        """Unpack struct format *st* (little-endian unless prefixed).

        A single-field format returns the bare value; anything longer
        returns the tuple from ``struct.unpack``.
        """
        if st[0] not in ('<', '>'):
            st = '<' + st
        values = struct.unpack(st, self._buff.read(struct.calcsize(st)))
        if len(st) == 2:
            return values[0]
        return values


def iterate_list(cb, df, ext_list, size_own, curstr=b"", max_entry_count=None):
    """Walk the entry table of data.lst and report every file to *cb*.

    Each entry is 16 bytes: an 8-byte NUL-padded name, a 4-byte value, a
    3-byte little-endian LBA and a 1-byte extension index. Index 0 marks a
    directory whose 4-byte value is an entry count; any other index marks a
    file whose 4-byte value is its size, and ``ext_list[index - 1]`` is its
    extension.

    Args:
        cb: called as ``cb(pathname_bytes, size, lba)`` for every file.
        df: seekable binary stream positioned at the first entry to read.
        ext_list: list of extension byte strings.
        size_own: stream offset at which reading stops.
        curstr: path prefix (bytes) for entries read by this call.
        max_entry_count: stop after this many entries; ``None`` = no limit.

    Returns:
        The number of entries accounted for by this call.
    """
    cur_entry_count = 0
    while df.tell() < size_own:
        dname = df.read(8).rstrip(b"\x00")
        dsize_or_count = int.from_bytes(df.read(4), byteorder="little")
        dlba = int.from_bytes(df.read(2), byteorder="little") | (
            int.from_bytes(df.read(1), byteorder="little") << 16
        )
        dext = int.from_bytes(df.read(1), byteorder="little")
        if dext == 0:
            cur_entry_count += iterate_list(
                cb,
                df,
                ext_list,
                size_own,
                curstr=curstr + dname + b"/",
                max_entry_count=dsize_or_count,
            )
        else:
            cb(curstr + dname + b"." + ext_list[dext - 1], dsize_or_count, dlba)
        # NOTE(review): the per-entry increment is assumed to apply to
        # directory entries too (a directory counts as one entry of its
        # parent in addition to its descendants) - confirm against real
        # data.lst files.
        cur_entry_count += 1
        if max_entry_count is not None and cur_entry_count >= max_entry_count:
            break
    return cur_entry_count


# Reference: CEgPacks2::UnpackBZMode2
# Also known as falcom_compress / BZ / BZip / zero method
def decompress(buffer, output, size):
    """Decompress one block from *buffer* into *output*.

    Args:
        buffer: compressed bytes. The first byte is skipped here; the
            second byte supplies the first 8 flag bits.
        output: pre-sized ``bytearray`` that receives the decoded bytes.
        size: accepted for interface compatibility; not used.

    Returns:
        ``(bytes_written, bytes_consumed)``.

    Raises:
        Exception: "Out of data" when more flag bits or a run-length byte
            are needed but the buffer is exhausted.
    """
    offset = 0  # u16
    bits = 8  # 8 to start off with, then 16
    flags = int.from_bytes(buffer[offset:offset + 2], byteorder="little")
    offset += 2
    flags >>= 8
    outputoffset = 0  # u16

    def getflag():
        """Return the next flag bit, refilling 16 bits when empty."""
        nonlocal bits
        nonlocal flags
        nonlocal offset

        if bits == 0:
            slice_ = buffer[offset:offset + 2]
            if len(slice_) < 2:
                raise Exception("Out of data")
            flags = int.from_bytes(slice_, byteorder="little")
            offset += 2
            bits = 16

        flag = flags & 1
        flags >>= 1
        bits -= 1
        return flag

    def setup_run(prev_u_buffer_pos):
        """Decode a run length, then copy that many bytes from
        *prev_u_buffer_pos* bytes back in the output."""
        nonlocal offset
        nonlocal outputoffset

        # Run length is unary-coded for 2..5, then 3 flag bits (+6) or a
        # whole byte (+0xE).
        run = 2  # u16
        if getflag() == 0:
            run += 1
            if getflag() == 0:
                run += 1
                if getflag() == 0:
                    run += 1
                    if getflag() == 0:
                        if getflag() == 0:
                            slice_ = buffer[offset:offset + 1]
                            if len(slice_) < 1:
                                raise Exception("Out of data")
                            run = int.from_bytes(slice_, byteorder="little")
                            offset += 1
                            run += 0xE
                        else:
                            run = 0
                            for _ in range(3):
                                run = (run << 1) | getflag()
                            run += 0x6

        # Does the 'copy from buffer' thing. Byte-by-byte on purpose: the
        # source range may overlap bytes written by this same loop.
        for _ in range(run):
            output[outputoffset] = output[outputoffset - prev_u_buffer_pos]
            outputoffset += 1

    while True:
        if getflag() != 0:  # Call next method to process next flag
            if getflag() != 0:
                # Long look-back distance or exit program or repeating
                # sequence (flags = 11)
                run = 0  # u16
                for _ in range(5):
                    # Load high-order distance from flags (max = 0x31)
                    run = (run << 1) | getflag()
                # Load low-order distance (max = 0xFF)
                # Also acts as flag byte
                # run = 0 and byte = 0 -> exit program
                # run = 0 and byte = 1 -> sequence of repeating bytes
                prev_u_buffer_pos = int.from_bytes(
                    buffer[offset:offset + 1], byteorder="little"
                )
                offset += 1
                if run != 0:
                    # Add high and low order distance (max distance = 0x31FF)
                    prev_u_buffer_pos = prev_u_buffer_pos | (run << 8)
                    # Get run length and finish unpacking (write to output)
                    setup_run(prev_u_buffer_pos)
                elif prev_u_buffer_pos > 2:
                    # Is this used? Seems inefficient.
                    setup_run(prev_u_buffer_pos)
                elif prev_u_buffer_pos == 0:
                    # Decompression complete. End program.
                    break
                else:
                    # Repeating byte (run is 0 when this branch is entered)
                    branch = getflag()  # True = long repeating sequence (> 30)
                    for _ in range(4):
                        run = (run << 1) | getflag()
                    if branch != 0:
                        # Load run length from byte and add high-order run
                        # length (max = 0xFFF + 0xE)
                        run = (run << 0x8) | int.from_bytes(
                            buffer[offset:offset + 1], byteorder="little"
                        )
                        offset += 1
                    run += 0xE
                    output[outputoffset:outputoffset + run] = (
                        bytes(buffer[offset:offset + 1]) * run
                    )
                    offset += 1
                    outputoffset += run
            else:
                # Short look-back distance (flags = 10)
                # Get the look-back distance (max = 0xFF)
                prev_u_buffer_pos = int.from_bytes(
                    buffer[offset:offset + 1], byteorder="little"
                )
                offset += 1
                # Get run length and finish unpacking (write to output)
                setup_run(prev_u_buffer_pos)
        else:
            # Copy byte (flags = 0)
            output[outputoffset:outputoffset + 1] = buffer[offset:offset + 1]
            outputoffset += 1
            offset += 1
    return outputoffset, offset


# Reference: CSafeFile::freadP
# Also known as FALCOM3 compression
def decompress_blocks_stream(f):
    """Decompress a block-compressed stream read from *f*.

    Layout: u32 compressed size, u32 uncompressed size, u32 block count,
    then for each block a u16 block size, the block data (first byte is the
    method and must be 0) and one continuation byte (0 = last block).

    Returns the decompressed data as ``bytes``.

    Raises:
        Exception: when the high bit of the first field is set, or when a
            block uses a non-zero method.
    """
    flags = read_unpack("<I", f)[0]
    if (flags & 0x80000000) != 0:
        raise Exception("High-bit method intentionally not supported")

    compressed_size = flags
    uncompressed_size, num_blocks = read_unpack("<2I", f)
    dst = bytearray(uncompressed_size)  # Should already be initialized with 0
    dst_offset = 0
    cdata = io.BytesIO(f.read(compressed_size - 8))
    for _ in range(num_blocks):
        block_size = read_unpack("<H", cdata)[0]
        # Fresh scratch buffer per block so look-backs never see bytes
        # left over from a previous block.
        output_tmp = bytearray(65536)
        inbuf = cdata.read(block_size - 2)
        if inbuf[0] != 0:
            raise Exception("Non-zero method currently not supported")
        produced, _consumed = decompress(inbuf, output_tmp, block_size)
        dst[dst_offset:dst_offset + produced] = output_tmp[0:produced]
        dst_offset += produced
        if dst_offset >= uncompressed_size:
            break
        more = cdata.read(1)
        if len(more) == 0 or more[0] == 0:
            break
    return bytes(dst)


def unpack_cclm_recursive(df, curstr=b""):
    """Try to unpack *df* as a cclm archive, recursing into members.

    The header is the member count repeated four times as u32; each member
    entry is a 16-byte NUL-padded name followed by u32 offset, size,
    decompressed size and one more u32 that is ignored. Members whose
    decompressed size is non-zero and differs from their stored size are
    decompressed first. A member that is itself an archive is unpacked into
    ``<name>_unpacked/``; any other member is written to ``curstr + name``.

    Args:
        df: seekable binary stream holding the candidate archive.
        curstr: output path prefix. ``str`` or ``bytes`` are both accepted
            (bytes are decoded with ``os.fsdecode``).

    Returns:
        ``True`` when *df* was unpacked as an archive, ``False`` when it
        does not look like one or is truncated.
    """
    if isinstance(curstr, bytes):
        # The prefix is joined with decoded member names below, so it has
        # to be text; the bytes default would otherwise raise TypeError.
        curstr = os.fsdecode(curstr)

    header = df.read(16)
    if len(header) != 16:
        return False
    counts = struct.unpack("<4I", header)
    count = counts[0]
    if any(c != count for c in counts) or count == 0:
        return False

    files = []
    for _ in range(count):
        raw = df.read(32)
        if len(raw) != 32:
            return False
        name_b, offset, size, decompressed_size, _unused = struct.unpack(
            "<16s4I", raw
        )
        files.append((name_b.rstrip(b"\x00"), offset, size, decompressed_size))

    for name_b, offset, size, decompressed_size in files:
        df.seek(offset)
        data = df.read(size)
        if len(data) != size:
            return False
        if decompressed_size != 0 and size != decompressed_size:
            data = decompress_blocks_stream(io.BytesIO(data))
        name = name_b.decode("ASCII", errors="replace").replace("\uFFFD", "_")
        if not unpack_cclm_recursive(io.BytesIO(data), curstr + name + "_unpacked/"):
            fullpath = curstr + name
            fullpath_dirname = os.path.dirname(fullpath)
            if fullpath_dirname:  # os.makedirs("") raises FileNotFoundError
                os.makedirs(fullpath_dirname, exist_ok=True)
            with open(fullpath, "wb") as wf:
                wf.write(data)
    return True


def main(argv=None):
    """Extract every file listed in data.lst of a PSP ISO.

    *argv* defaults to ``sys.argv`` and is expected to be
    ``[prog, iso_path, out_path]``. Returns a process exit status.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 3:
        prog = argv[0] if argv else "extract"
        sys.stderr.write("Usage: %s <input.iso> <output_dir>\n" % prog)
        return 2

    iso_path = argv[1]
    out_path = argv[2]

    cd = ISO9660(iso_path)
    d = cd.get_file("PSP_GAME/USRDIR/data.lst")
    df = io.BytesIO(d)
    size_own = int.from_bytes(df.read(4), byteorder="little")
    if len(d) != size_own:
        raise Exception("Incorrect size of data.lst")

    # The extension table fills the rest of the first 0x400 bytes as
    # 4-byte NUL-padded slots; bytes 0x82 and 0x86 are replaced by "_".
    ext_list_d = df.read(0x400 - 4)
    ext_list = [
        ext_list_d[i:i + 4].rstrip(b"\x00").replace(b"\x82", b"_").replace(b"\x86", b"_")
        for i in range(0, len(ext_list_d), 4)
    ]
    ext_list = [x for x in ext_list if x != b""]

    with open(iso_path, "rb") as f:
        def list_cb(pathname, size, lba):
            # Files are addressed by sector number within the ISO image.
            f.seek(SECTOR_SIZE * lba)
            pathname_ascii = pathname.decode("ASCII")
            fullpath = out_path + "/" + pathname_ascii
            fullpath_dirname = os.path.dirname(fullpath)
            os.makedirs(fullpath_dirname, exist_ok=True)
            ds = f.read(size)
            dsbio = io.BytesIO(ds)
            # Archives are expanded next to where the file would go;
            # everything else is written out verbatim.
            if not unpack_cclm_recursive(dsbio, fullpath + "_unpacked/"):
                with open(fullpath, "wb") as wf:
                    wf.write(ds)

        iterate_list(list_cb, df, ext_list, size_own)
    return 0


if __name__ == '__main__':
    sys.exit(main())