Skip to content

Instantly share code, notes, and snippets.

@uyjulian
Created October 20, 2023 02:22
Show Gist options
  • Select an option

  • Save uyjulian/b596c978da0c1031047e124eaf5d4f84 to your computer and use it in GitHub Desktop.

Select an option

Save uyjulian/b596c978da0c1031047e124eaf5d4f84 to your computer and use it in GitHub Desktop.

Revisions

  1. uyjulian created this gist Oct 20, 2023.
    447 changes: 447 additions & 0 deletions falcom_psp_iso_unpack.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,447 @@
    # SPDX-License-Identifier: MIT

    # Falcom YamaNeko engine on PSP ISO format extraction.
    # Uses the information contained in PSP_GAME/USRDIR/data.lst
    # Also recursively unpacks cclm archive/group files

    # See also: https://github.com/Trails-Research-Group

    import struct
    import io
    import os

    def read_unpack(fmt, f):
    return struct.unpack(fmt, f.read(struct.calcsize(fmt)))

    # based on https://github.com/barneygale/iso9660
    SECTOR_SIZE = 2048

class ISO9660(object):
    """Minimal read-only ISO 9660 filesystem reader.

    Parses the Primary Volume Descriptor and the little-endian path table
    up front, then resolves paths and extracts file contents on demand via
    get_file().  Based on https://github.com/barneygale/iso9660
    """

    def __init__(self, path):
        # path: filesystem path to the .iso image; the image is re-opened
        # for every sector read (see _get_sector).
        self._buff = None  # current chunk of the image, as a BytesIO
        self._root = None  # root directory record (set by _unpack_pvd)
        self._pvd = {}  # fields kept from the primary volume descriptor
        self._paths = []  # parsed little-endian path table entries

        self._path = path

        ### Volume Descriptors: scan from sector 0x10 until the set
        ### terminator (type 255); type 1 is the Primary Volume Descriptor.
        sector = 0x10
        while True:
            self._get_sector(sector, SECTOR_SIZE)
            sector += 1
            ty = self._unpack('B')

            if ty == 1:
                self._unpack_pvd()
            elif ty == 255:
                break
            else:
                continue

        ### Path table (little-endian variant)
        l0 = self._pvd['path_table_size']
        self._get_sector(self._pvd['path_table_l_loc'], l0)

        while l0 > 0:
            p = {}
            l1 = self._unpack('B')  # length of the directory identifier
            l2 = self._unpack('B')  # extended attribute record length (unused here)
            p['ex_loc'] = self._unpack('<I')  # extent location (sector number)
            p['parent'] = self._unpack('<H')  # 1-based index of the parent entry
            p['name'] = self._unpack_string(l1).rstrip('\x00')

            if l1 % 2 == 1:  # identifiers are padded to an even length
                self._unpack('B')

            self._paths.append(p)

            l0 -= 8 + l1 + (l1 % 2)

        assert(l0 == 0)

    ##
    ## Retrieve file contents as a string
    ##

    def get_file(self, path):
        """Return the raw contents (bytes) of the file at *path*.

        *path* uses '/' separators, e.g. "PSP_GAME/USRDIR/data.lst".
        Raises Exception when the directory or the file cannot be found.
        """
        path = path.strip('/').split('/')
        path, filename = path[:-1], path[-1]
        parent_dir = self._root

        if len(path) != 0:
            # Try the path-table lookup first; fall back to walking
            # directory records down from the root.
            parent_dir = self._dir_record_by_table(path)
            if parent_dir == None:
                parent_dir = self._dir_record_by_root(path)
            if parent_dir == None:
                raise Exception("Directory not found")

        f = self._search_dir_children(parent_dir, filename)

        if f == None:
            raise Exception("File not found")

        self._get_sector(f['ex_loc'], f['ex_len'])
        return self._unpack_raw(f['ex_len'])

    ##
    ## Methods for retrieving partial contents
    ##

    def _get_sector(self, sector, length):
        # Load `length` bytes starting at `sector` into self._buff.
        with open(self._path, 'rb') as f:
            f.seek(sector * SECTOR_SIZE)
            self._buff = io.BytesIO(f.read(length))

    ##
    ## Return the record for final directory in a path
    ##

    def _dir_record_by_table(self, path):
        # Match *path* against the path table: for each candidate entry,
        # walk its parent links backwards while names keep matching.
        # Returns the matching entry, or None when no entry matches.
        for e in self._paths[::-1]:
            search = list(path)
            f = e
            while f['name'] == search[-1]:
                search.pop()
                f = self._paths[f['parent']-1]
                if f['parent'] == 1:  # reached the root entry: full match
                    return e

        return None

    def _dir_record_by_root(self, path):
        # Resolve *path* one component at a time, starting from the root
        # directory record; returns None if any component is missing.
        current = self._root
        remaining = list(path)

        while remaining:
            current = self._search_dir_children(current, remaining[0])

            if current == None:
                break

            remaining.pop(0)

        return current

    ##
    ## Unpack the Primary Volume Descriptor
    ##

    def _unpack_pvd(self):
        # Only the fields needed for lookups are kept; everything else in
        # the descriptor is skipped with _unpack_raw.
        self._unpack_raw(131)
        self._pvd['path_table_size'] = self._unpack_both('i')
        self._pvd['path_table_l_loc'] = self._unpack('<I')
        self._unpack_raw(12)  # skip fields not needed here
        _, self._root = self._unpack_record()  # root directory record
        self._unpack_raw(692)  # skip the remainder of the descriptor

    ##
    ## Unpack a directory record (a listing of a file or folder)
    ##

    def _unpack_record(self, read=0):
        # Returns (updated_byte_count, record_dict), or (read + 1, None)
        # when a zero length byte marks the end of records.
        l0 = self._unpack('<B')  # total record length

        if l0 == 0:
            return read + 1, None

        l1 = self._unpack('<B')  # extended attribute record length (unused here)

        d = dict()
        d['ex_loc'] = self._unpack_both('I')  # extent location (sector number)
        d['ex_len'] = self._unpack_both('I')  # extent length in bytes
        self._unpack_raw(14)  # skip fields not needed here

        l2 = self._unpack('<B')  # file identifier length
        # Strip the ";1" version suffix and NUL padding from the name.
        d['name'] = self._unpack_string(l2).split(';')[0].rstrip('\x00')

        if l2 % 2 == 0:  # identifier padding keeps the record even-sized
            self._unpack_raw(1)

        t = 34 + l2 - (l2 % 2)  # bytes of this record consumed so far

        e = l0 - t
        if e > 0:
            self._unpack_raw(e)  # skip any trailing (system use) bytes

        return read + l0, d

    # Assuming d is a directory record, this generator yields its children
    def _unpack_dir_children(self, d):
        sector = d['ex_loc']
        read = 0
        self._get_sector(sector, 2048)

        # The first two records are the directory itself and its parent.
        read, r_self = self._unpack_record(read)
        read, r_parent = self._unpack_record(read)

        while read < r_self['ex_len']:  # Iterate over files in the directory
            if read % 2048 == 0:  # crossed into the next sector
                sector += 1
                self._get_sector(sector, 2048)
            read, data = self._unpack_record(read)

            if data == None:  # end of records in this sector
                # Skip the tail padding up to the next sector boundary.
                to_read = 2048 - (read % 2048)
                self._unpack_raw(to_read)
                read += to_read
            else:
                yield data

    # Search for one child amongst the children
    def _search_dir_children(self, d, term):
        # Returns the child record named *term*, or None when absent.
        for e in self._unpack_dir_children(d):
            if e['name'] == term:
                return e

        return None

    ##
    ## Datatypes
    ##

    def _unpack_raw(self, l):
        # Read `l` raw bytes from the current buffer.
        return self._buff.read(l)

    # both-endian
    def _unpack_both(self, st):
        # ISO 9660 stores some integers twice (little- then big-endian);
        # read both copies and assert they agree.
        a = self._unpack('<' + st)
        b = self._unpack('>' + st)
        assert(a == b)
        return a

    def _unpack_string(self, l):
        # Fixed-width ASCII field, space-padded on the right.
        return self._buff.read(l).rstrip(b' ').decode('ASCII')

    def _unpack(self, st):
        # struct-unpack from the current buffer; defaults to little-endian
        # when no byte-order prefix is given.  A single-field format
        # returns the scalar instead of a 1-tuple.
        if st[0] not in ['<', '>']:
            st = '<' + st
        d = struct.unpack(st, self._buff.read(struct.calcsize(st)))
        if len(st) == 2:
            return d[0]
        else:
            return d

    def iterate_list(cb, df, ext_list, size_own, curstr=b"", max_entry_count=None):
    cur_entry_count = 0
    while df.tell() < size_own:
    dname = df.read(8).rstrip(b"\x00")
    dsize_or_count = int.from_bytes(df.read(4), byteorder="little")
    dlba = int.from_bytes(df.read(2), byteorder="little") | (int.from_bytes(df.read(1), byteorder="little") << 16)
    dext = int.from_bytes(df.read(1), byteorder="little")
    if dext == 0:
    cur_entry_count += iterate_list(cb, df, ext_list, size_own, curstr=curstr + dname + b"/", max_entry_count=dsize_or_count)
    else:
    cb(curstr + dname + b"." + ext_list[dext - 1], dsize_or_count, dlba)
    cur_entry_count += 1
    if max_entry_count != None:
    if cur_entry_count >= max_entry_count:
    break
    return cur_entry_count

# Reference: CEgPacks2::UnpackBZMode2
# Also known as falcom_compress / BZ / BZip / zero method
def decompress(buffer, output, size):
    """Decompress one falcom_compress ("zero method") block.

    buffer -- bytes-like compressed input; the first byte (the method
              byte) is consumed as the low byte of the initial flag word.
    output -- pre-allocated writable buffer (e.g. bytearray) that
              receives the decompressed bytes.
    size -- nominal block size; not referenced by this implementation.

    Returns (bytes_written_to_output, bytes_consumed_from_buffer).
    Raises Exception("Out of data") when the input ends prematurely.
    """
    offset = 0  # u16: read position in `buffer`
    bits = 8  # 8 to start off with, then 16
    flags = int.from_bytes(buffer[offset:offset + 2], byteorder="little")
    offset += 2
    flags >>= 8  # discard the low (method) byte; 8 flag bits remain
    outputoffset = 0  # u16: write position in `output`

    def getflag():
        # Pull the next flag bit from the flag word, refilling the word
        # with a fresh little-endian u16 every 16 bits.
        nonlocal bits
        nonlocal flags
        nonlocal offset

        if bits == 0:
            slice_ = buffer[offset:offset + 2]
            if len(slice_) < 2:
                raise Exception("Out of data")
            flags = int.from_bytes(slice_, byteorder="little")
            offset += 2
            bits = 16
        flag = flags & 1
        flags >>= 1
        bits -= 1
        return flag

    def setup_run(prev_u_buffer_pos):
        # Decode a run length, then copy `run` bytes from
        # `prev_u_buffer_pos` bytes back in the output (LZ back-copy).
        nonlocal offset
        nonlocal buffer
        nonlocal output
        nonlocal outputoffset

        run = 2  # u16
        if getflag() == 0:
            run += 1
            if getflag() == 0:
                run += 1
                if getflag() == 0:
                    run += 1
                    if getflag() == 0:
                        if getflag() == 0:
                            # Long run: full byte + 0xE
                            slice_ = buffer[offset:offset + 1]
                            if len(slice_) < 1:
                                raise Exception("Out of data")
                            run = int.from_bytes(slice_, byteorder="little")
                            offset += 1
                            run += 0xE
                        else:
                            # Medium run: 3 flag bits + 0x6
                            run = 0
                            for i in range(3):
                                run = (run << 1) | getflag()
                            run += 0x6
        # Does the 'copy from buffer' thing.  Byte-by-byte so overlapping
        # copies (run longer than the distance) repeat earlier output.
        for i in range(run):
            output[outputoffset] = output[outputoffset - prev_u_buffer_pos]
            outputoffset += 1

    while True:
        if getflag() != 0:  # Call next method to process next flag
            if getflag() != 0:  # Long look-back distance or exit program or repeating sequence (flags = 11)
                run = 0  # u16
                for i in range(5):  # Load high-order distance from flags (max = 0x31)
                    run = (run << 1) | getflag()
                prev_u_buffer_pos = int.from_bytes(buffer[offset:offset + 1], byteorder="little")  # Load low-order distance (max = 0xFF)
                # Also acts as flag byte
                # run = 0 and byte = 0 -> exit program
                # run = 0 and byte = 1 -> sequence of repeating bytes
                offset += 1
                if run != 0:
                    prev_u_buffer_pos = prev_u_buffer_pos | (run << 8)  # Add high and low order distance (max distance = 0x31FF)
                    setup_run(prev_u_buffer_pos)  # Get run length and finish unpacking (write to output)
                elif prev_u_buffer_pos > 2:  # Is this used? Seems inefficient.
                    setup_run(prev_u_buffer_pos)
                elif prev_u_buffer_pos == 0:  # Decompression complete. End program.
                    break
                else:  # Repeating byte
                    branch = getflag()  # True = long repeating sequence (> 30)
                    for i in range(4):
                        run = (run << 1) | getflag()
                    if branch != 0:
                        run = (run << 0x8) | int.from_bytes(buffer[offset:offset + 1], byteorder="little")  # Load run length from byte and add high-order run length (max = 0xFFF + 0xE)
                        offset += 1
                    run += 0xE
                    # Write `run` copies of the next input byte.
                    output[outputoffset:outputoffset + run] = bytes(buffer[offset:offset + 1]) * run
                    offset += 1
                    outputoffset += run
            else:  # Short look-back distance (flags = 10)
                prev_u_buffer_pos = int.from_bytes(buffer[offset:offset + 1], byteorder="little")  # Get the look-back distance (max = 0xFF)
                offset += 1
                setup_run(prev_u_buffer_pos)  # Get run length and finish unpacking (write to output)
        else:  # Copy byte (flags = 0)
            output[outputoffset:outputoffset + 1] = buffer[offset:offset + 1]
            outputoffset += 1
            offset += 1
    return outputoffset, offset

    # Reference: CSafeFile::freadP
    # Also known as FALCOM3 compression
    def decompress_blocks_stream(f):
    flags = read_unpack("<I", f)[0]
    dst = None
    dst_offset = 0
    if (flags & 0x80000000) != 0:
    raise Exception("High-bit method intentionally not supported")
    else:
    compressed_size = flags
    uncompressed_size, num_blocks = read_unpack("<2I", f)
    dst = bytearray(uncompressed_size) # Should already be initialized with 0
    cdata = io.BytesIO(f.read(compressed_size - 8))
    for i in range(num_blocks):
    block_size = read_unpack("<H", cdata)[0]
    output_tmp = bytearray(65536)
    inbuf = cdata.read(block_size - 2)
    if inbuf[0] != 0:
    raise Exception("Non-zero method currently not supported")
    num1, num2 = decompress(inbuf, output_tmp, block_size)
    dst[dst_offset:dst_offset + num1] = output_tmp[0:num1]
    dst_offset += num1
    if dst_offset >= uncompressed_size:
    break
    x = cdata.read(1)
    if len(x) == 0:
    break
    if x[0] == 0:
    break
    return bytes(dst)

    def unpack_cclm_recursive(df, curstr=b""):
    files = []
    di1 = df.read(4)
    di2 = df.read(4)
    di3 = df.read(4)
    di4 = df.read(4)
    if len(di1) != 4 or len(di2) != 4 or len(di3) != 4 or len(di4) != 4:
    return False
    i1 = int.from_bytes(di1, byteorder="little")
    i2 = int.from_bytes(di2, byteorder="little")
    i3 = int.from_bytes(di3, byteorder="little")
    i4 = int.from_bytes(di4, byteorder="little")
    if i1 != i2 or i1 != i3 or i1 != i4:
    return False
    if i1 == 0:
    return False
    for i in range(i1):
    name_b = df.read(16)
    if len(name_b) != 16:
    return False
    offset_b = df.read(4)
    if len(offset_b) != 4:
    return False
    size_b = df.read(4)
    if len(size_b) != 4:
    return False;
    decompressed_size_b = df.read(4)
    if len(decompressed_size_b) != 4:
    return False
    always_zero_b = df.read(4)
    if len(always_zero_b) != 4:
    return False
    name = name_b.rstrip(b"\x00")
    offset = int.from_bytes(offset_b, byteorder="little")
    size = int.from_bytes(size_b, byteorder="little")
    decompressed_size = int.from_bytes(decompressed_size_b, byteorder="little")
    always_zero = int.from_bytes(always_zero_b, byteorder="little")
    files.append([name, offset, size, decompressed_size])
    for x in files:
    df.seek(x[1])
    d = df.read(x[2])
    if len(d) != x[2]:
    return False
    dbio = io.BytesIO(d)
    if x[3] != 0 and x[2] != x[3]:
    d = decompress_blocks_stream(dbio)
    dbio = io.BytesIO(d)
    name = x[0].decode("ASCII", errors="replace").replace("\uFFFD", "_")
    if not unpack_cclm_recursive(dbio, curstr + name + "_unpacked/"):
    fullpath = curstr + name
    fullpath_dirname = os.path.dirname(fullpath)
    os.makedirs(fullpath_dirname, exist_ok=True)
    with open(fullpath, "wb") as wf:
    wf.write(d)
    return True

if __name__ == '__main__':
    import sys
    # usage: falcom_psp_iso_unpack.py <iso path> <output directory>
    iso_path = sys.argv[1]
    cd = ISO9660(iso_path)
    out_path = sys.argv[2]
    # data.lst is the engine's file table; its first u32 is its own total
    # size, checked here against the extracted length.
    d = cd.get_file("PSP_GAME/USRDIR/data.lst")
    df = io.BytesIO(d)
    size_own = int.from_bytes(df.read(4), byteorder="little")
    if len(d) != size_own:
        raise Exception("Incorrect size of data.lst")
    # The rest of the 0x400-byte header is a table of 4-byte extension
    # strings; bytes 0x82/0x86 are mapped to '_' — presumably to sanitize
    # non-ASCII extension bytes for use in filenames (TODO confirm).
    ext_list_d = df.read(0x400 - 4)
    ext_list = [ext_list_d[i:i + 4].rstrip(b"\x00").replace(b"\x82", b"_").replace(b"\x86", b"_") for i in range(0, len(ext_list_d), 4)]
    ext_list = [x for x in ext_list if x != b""]
    # Re-open the ISO to read file payloads directly by sector (LBA).
    with open(sys.argv[1], "rb") as f:
        def list_cb(pathname, size, lba):
            # Invoked by iterate_list once per file entry in data.lst.
            f.seek(SECTOR_SIZE * lba)
            pathname_ascii = pathname.decode("ASCII")
            fullpath = out_path + "/" + pathname_ascii
            fullpath_dirname = os.path.dirname(fullpath)
            os.makedirs(fullpath_dirname, exist_ok=True)
            ds = f.read(size)
            dsbio = io.BytesIO(ds)
            # cclm archives are unpacked recursively into <name>_unpacked/;
            # anything else is written out verbatim.
            if not unpack_cclm_recursive(dsbio, fullpath + "_unpacked/"):
                with open(fullpath, "wb") as wf:
                    wf.write(ds)
        iterate_list(list_cb, df, ext_list, size_own)