Created
April 24, 2026 13:56
-
-
Save CherryDT/2b4b691e4d86a53bc6edea27cef36697 to your computer and use it in GitHub Desktop.
Script to patch converted Android HPROF dumps' strings to correctly display in Eclipse MAT
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| hprof_android_string_fix.py | |
| By David Trapp (dt@david-trapp.com) | |
| Patch Android/ART HPROF dumps converted with hprof-conv so Eclipse MAT can | |
| display java.lang.String values that were stored in Android compact/packed | |
| form. | |
| This turns things like `\u616d\u6e69` back into e.g. `main`. | |
| This is intentionally conservative by default: | |
| - only arrays referenced from java.lang.String.value are considered | |
| - ambiguous shared backing arrays with non-zero offset are skipped | |
| - very large backing arrays are skipped unless --max-array-bytes is raised | |
| - char[] packed-byte repair is limited to ASCII-ish strings unless | |
| --aggressive-unicode is used | |
| Usage: | |
| python hprof_android_string_fix.py input.hprof output.hprof --dry-run | |
| python hprof_android_string_fix.py input.hprof output.hprof | |
| Keep the original dump. This script rewrites a copy. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import collections | |
| import dataclasses | |
| import os | |
| import re | |
| import shutil | |
| import struct | |
| import sys | |
| from typing import BinaryIO, Dict, Iterable, List, Optional, Tuple | |
# Top-level HPROF record tags (only the ones this script cares about).
TAG_STRING_IN_UTF8 = 0x01
TAG_LOAD_CLASS = 0x02
TAG_HEAP_DUMP = 0x0C
TAG_HEAP_DUMP_SEGMENT = 0x1C
# Heap-dump sub-record tags.
HPROF_CLASS_DUMP = 0x20
HPROF_INSTANCE_DUMP = 0x21
HPROF_OBJECT_ARRAY_DUMP = 0x22
HPROF_PRIMITIVE_ARRAY_DUMP = 0x23
# HPROF basic-type codes used for field values and array element types.
TYPE_OBJECT = 2
TYPE_BOOLEAN = 4
TYPE_CHAR = 5
TYPE_FLOAT = 6
TYPE_DOUBLE = 7
TYPE_BYTE = 8
TYPE_SHORT = 9
TYPE_INT = 10
TYPE_LONG = 11
# Human-readable names for primitive array element types (log output only).
PRIMITIVE_ARRAY_NAMES = {
    TYPE_BOOLEAN: "boolean[]",
    TYPE_CHAR: "char[]",
    TYPE_FLOAT: "float[]",
    TYPE_DOUBLE: "double[]",
    TYPE_BYTE: "byte[]",
    TYPE_SHORT: "short[]",
    TYPE_INT: "int[]",
    TYPE_LONG: "long[]",
}
# Fixed body sizes (excluding the 1-byte tag) of heap-dump root sub-records,
# keyed by sub-record tag. Each entry maps the dump's object-id size to the
# number of payload bytes to skip when walking past the sub-record.
ROOT_SIZES_FIXED = {
    0xFF: lambda id_size: id_size,  # ROOT_UNKNOWN
    0x01: lambda id_size: id_size * 2,  # ROOT_JNI_GLOBAL
    0x02: lambda id_size: id_size + 8,  # ROOT_JNI_LOCAL
    0x03: lambda id_size: id_size + 8,  # ROOT_JAVA_FRAME
    0x04: lambda id_size: id_size + 4,  # ROOT_NATIVE_STACK
    0x05: lambda id_size: id_size,  # ROOT_STICKY_CLASS
    0x06: lambda id_size: id_size + 4,  # ROOT_THREAD_BLOCK
    0x07: lambda id_size: id_size,  # ROOT_MONITOR_USED
    0x08: lambda id_size: id_size + 8,  # ROOT_THREAD_OBJECT
    0x89: lambda id_size: id_size,  # ROOT_INTERNED_STRING
    0x8A: lambda id_size: id_size,  # ROOT_FINALIZING
    0x8B: lambda id_size: id_size,  # ROOT_DEBUGGER
    0x8C: lambda id_size: id_size,  # ROOT_REFERENCE_CLEANUP
    0x8D: lambda id_size: id_size,  # ROOT_VM_INTERNAL
    0x8E: lambda id_size: id_size + 8,  # ROOT_JNI_MONITOR
    0x90: lambda id_size: id_size,  # ROOT_UNREACHABLE
    0xFE: lambda id_size: 4 + id_size,  # HEAP_DUMP_INFO
}
@dataclasses.dataclass
class Header:
    """Parsed HPROF file header."""

    raw: bytes  # exact header bytes; re-emitted verbatim when writing output
    version: str  # NUL-terminated version string, e.g. "JAVA PROFILE 1.0.2"
    id_size: int  # object id size in bytes (4 or 8)
    timestamp_hi: int  # high u4 of the header timestamp
    timestamp_lo: int  # low u4 of the header timestamp
@dataclasses.dataclass
class FieldInfo:
    """One declared (non-static) instance field of a class."""

    name_id: int  # id of the UTF-8 string record holding the field name
    name: str  # resolved field name (placeholder when the string id is unknown)
    type_code: int  # HPROF basic-type code (TYPE_*)
    offset: int  # byte offset of this field within the instance field data
@dataclasses.dataclass
class ClassInfo:
    """Parsed CLASS DUMP metadata for one class."""

    class_id: int
    name: str  # resolved from LOAD_CLASS records after pass 1; "" if unknown
    super_id: int
    instance_size: int
    fields: List[FieldInfo]  # declared instance fields, in declaration order
@dataclasses.dataclass
class StringInstance:
    """A java.lang.String instance with the field values this script needs."""

    object_id: int
    value_id: int  # object id of the backing value array (byte[] or char[])
    count_offset: Optional[int]  # byte offset of the count/length field, if any
    offset_offset: Optional[int]  # byte offset of the offset field, if any
    coder_offset: Optional[int]  # byte offset of the coder field, if any
    count_value: Optional[int]  # current count/length value, if the field exists
    offset_value: Optional[int]  # current offset value, if the field exists
    coder_value: Optional[int]  # current coder value, if the field exists
    data_len: int  # length of the raw instance field data in bytes
@dataclasses.dataclass
class ArrayPatch:
    """Planned rewrite of one String.value backing array into char[] form."""

    array_id: int  # object id of the backing array
    old_type: int  # original element type (TYPE_BYTE or TYPE_CHAR)
    old_count: int  # original element count
    new_type: int  # always TYPE_CHAR (see build_plans)
    new_count: int  # number of UTF-16 code units after repair
    new_payload: bytes  # big-endian UTF-16 payload to write
    decoded_preview: str  # first characters of the repaired text, for logging
    reason: str  # which decode path produced this repair
@dataclasses.dataclass
class InstancePatch:
    """In-place field byte replacements for one String instance dump."""

    object_id: int
    replacements: Dict[int, bytes]  # byte offset in instance data -> new bytes
@dataclasses.dataclass
class Model:
    """Pass-1 result: metadata needed to locate java.lang.String instances."""

    strings: Dict[int, str]  # UTF-8 string records by string id
    class_name_string_ids: Dict[int, int]  # class id -> name string id
    classes: Dict[int, ClassInfo]  # parsed CLASS DUMP records by class id
    string_class_id: Optional[int]  # class id of java.lang.String, if found
@dataclasses.dataclass
class Plans:
    """Planned output-pass edits plus bookkeeping counters."""

    array_patches: Dict[int, ArrayPatch]  # keyed by array object id
    instance_patches: Dict[int, InstancePatch]  # keyed by String object id
    heap_deltas: Dict[int, int]  # heap record start offset -> size delta
    skipped: collections.Counter  # reasons arrays were left untouched
    stats: collections.Counter  # patch counters for the summary output
class HprofError(Exception):
    """Raised for malformed, truncated, or unsupported HPROF input."""
    pass
def read_exact(f: BinaryIO, n: int) -> bytes:
    """Read exactly *n* bytes from *f*, raising HprofError on a short read."""
    data = f.read(n)
    if len(data) == n:
        return data
    raise HprofError(f"Unexpected EOF while reading {n} bytes")
def copy_exact(src: BinaryIO, dst: BinaryIO, n: int, chunk_size: int = 1024 * 1024) -> None:
    """Copy exactly *n* bytes from *src* to *dst* in chunks of at most
    *chunk_size* bytes, raising HprofError if *src* runs out early."""
    left = n
    while left:
        piece = src.read(min(chunk_size, left))
        if not piece:
            raise HprofError("Unexpected EOF while copying")
        dst.write(piece)
        left -= len(piece)
def unpack_u1(b: bytes) -> int:
    """Unsigned 8-bit value from the first byte of *b*."""
    return b[0]

def unpack_u2(b: bytes) -> int:
    """Big-endian unsigned 16-bit value."""
    (v,) = struct.unpack(">H", b)
    return v

def unpack_u4(b: bytes) -> int:
    """Big-endian unsigned 32-bit value."""
    (v,) = struct.unpack(">I", b)
    return v

def unpack_i4(b: bytes) -> int:
    """Big-endian signed 32-bit value."""
    (v,) = struct.unpack(">i", b)
    return v

def pack_u1(v: int) -> bytes:
    """Low 8 bits of *v* as a single byte."""
    return (v & 0xFF).to_bytes(1, "big")

def pack_u2(v: int) -> bytes:
    """Low 16 bits of *v*, big-endian."""
    return (v & 0xFFFF).to_bytes(2, "big")

def pack_u4(v: int) -> bytes:
    """Low 32 bits of *v*, big-endian."""
    return (v & 0xFFFFFFFF).to_bytes(4, "big")

def pack_i4(v: int) -> bytes:
    """Signed 32-bit big-endian encoding of *v* (range-checked by struct)."""
    return struct.pack(">i", int(v))
def read_u1(f: BinaryIO) -> int:
    """Read one unsigned byte from *f*."""
    raw = read_exact(f, 1)
    return unpack_u1(raw)

def read_u2(f: BinaryIO) -> int:
    """Read a big-endian unsigned 16-bit value from *f*."""
    raw = read_exact(f, 2)
    return unpack_u2(raw)

def read_u4(f: BinaryIO) -> int:
    """Read a big-endian unsigned 32-bit value from *f*."""
    raw = read_exact(f, 4)
    return unpack_u4(raw)

def read_id(f: BinaryIO, id_size: int) -> int:
    """Read an object id of *id_size* bytes as an unsigned big-endian int."""
    raw = read_exact(f, id_size)
    return int.from_bytes(raw, "big")

def pack_id(v: int, id_size: int) -> bytes:
    """Encode object id *v* as *id_size* unsigned big-endian bytes."""
    return int(v).to_bytes(id_size, "big")
def value_size(type_code: int, id_size: int) -> int:
    """Size in bytes of one HPROF value of *type_code*; object references
    take *id_size* bytes, primitives take their fixed width."""
    if type_code == TYPE_OBJECT:
        return id_size
    for codes, nbytes in (
        ((TYPE_BOOLEAN, TYPE_BYTE), 1),
        ((TYPE_CHAR, TYPE_SHORT), 2),
        ((TYPE_FLOAT, TYPE_INT), 4),
        ((TYPE_DOUBLE, TYPE_LONG), 8),
    ):
        if type_code in codes:
            return nbytes
    raise HprofError(f"Unknown HPROF value type {type_code}")
def primitive_size(type_code: int) -> int:
    """Element width in bytes for a primitive array of *type_code*."""
    for codes, nbytes in (
        ((TYPE_BOOLEAN, TYPE_BYTE), 1),
        ((TYPE_CHAR, TYPE_SHORT), 2),
        ((TYPE_FLOAT, TYPE_INT), 4),
        ((TYPE_DOUBLE, TYPE_LONG), 8),
    ):
        if type_code in codes:
            return nbytes
    raise HprofError(f"Unknown primitive array type {type_code}")
def read_header(f: BinaryIO) -> Header:
    """Read the HPROF file header.

    Layout: NUL-terminated ASCII version string, then three big-endian u4
    words: object id size and a two-word timestamp.
    """
    raw = bytearray()
    while True:
        c = f.read(1)
        if not c:
            raise HprofError("Missing HPROF header terminator")
        raw += c
        if c == b"\0":
            break
    version = raw[:-1].decode("ascii", errors="replace")
    rest = read_exact(f, 12)
    raw += rest  # keep the exact bytes so output can reproduce them verbatim
    id_size, ts_hi, ts_lo = struct.unpack(">III", rest)
    if id_size not in (4, 8):
        raise HprofError(f"Unsupported object id size: {id_size}")
    return Header(bytes(raw), version, id_size, ts_hi, ts_lo)
def read_value_from_data(data: bytes, off: int, type_code: int, id_size: int) -> int:
    """Read one typed field value from raw instance-dump data.

    Every result is returned as an int. Float and double values are returned
    as their raw big-endian bit patterns, not numeric floats — this script
    never needs to interpret them numerically.

    Raises HprofError when *data* is too short or *type_code* is unknown.
    """
    size = value_size(type_code, id_size)
    part = data[off:off + size]
    if len(part) != size:
        raise HprofError("Instance data is shorter than expected")
    if type_code == TYPE_OBJECT:
        return int.from_bytes(part, "big", signed=False)
    if type_code == TYPE_BOOLEAN:
        return 1 if part[0] else 0
    if type_code == TYPE_BYTE:
        # NOTE(review): returned unsigned (0..255), not Java-signed; the only
        # consumer compares against small non-negative coder values.
        return part[0]
    if type_code == TYPE_CHAR:
        return unpack_u2(part)
    if type_code == TYPE_SHORT:
        return struct.unpack(">h", part)[0]
    if type_code == TYPE_INT:
        return unpack_i4(part)
    if type_code == TYPE_LONG:
        return struct.unpack(">q", part)[0]
    if type_code == TYPE_FLOAT:
        return unpack_u4(part)  # raw bits
    if type_code == TYPE_DOUBLE:
        return int.from_bytes(part, "big", signed=False)  # raw bits
    raise HprofError(f"Unknown value type {type_code}")
def normalize_class_name(name: str) -> str:
    """Convert a slash-separated JVM class name to dotted form."""
    return ".".join(name.split("/"))
def safe_decode_utf8(b: bytes) -> str:
    """Decode *b* as UTF-8, mapping invalid sequences to U+FFFD."""
    return str(b, "utf-8", "replace")
def skip_class_dump_after_tag(f: BinaryIO, id_size: int) -> None:
    """Skip over a CLASS DUMP sub-record body (the tag byte is already consumed)."""
    read_id(f, id_size)  # class id
    f.seek(4, os.SEEK_CUR)  # stack trace serial
    for _ in range(6):
        read_id(f, id_size)  # super, loader, signers, pd, reserved1, reserved2
    f.seek(4, os.SEEK_CUR)  # instance size
    cp_count = read_u2(f)
    # Constant pool entries: u2 index, u1 type code, then the typed value.
    for _ in range(cp_count):
        f.seek(2, os.SEEK_CUR)
        t = read_u1(f)
        f.seek(value_size(t, id_size), os.SEEK_CUR)
    static_count = read_u2(f)
    # Static fields: name string id, u1 type code, then the typed value.
    for _ in range(static_count):
        read_id(f, id_size)
        t = read_u1(f)
        f.seek(value_size(t, id_size), os.SEEK_CUR)
    inst_count = read_u2(f)
    # Instance field descriptors are name id + type byte each, no values.
    f.seek(inst_count * (id_size + 1), os.SEEK_CUR)
def parse_class_dump_after_tag(f: BinaryIO, id_size: int, strings: Dict[int, str]) -> ClassInfo:
    """Parse a CLASS DUMP sub-record body into a ClassInfo.

    The tag byte must already be consumed. Instance-field byte offsets are
    computed cumulatively in declaration order. The class name is left empty
    here and filled in later from LOAD_CLASS records (see pass1_model).
    """
    class_id = read_id(f, id_size)
    f.seek(4, os.SEEK_CUR)  # stack trace serial
    super_id = read_id(f, id_size)
    read_id(f, id_size)  # class loader
    read_id(f, id_size)  # signers
    read_id(f, id_size)  # protection domain
    read_id(f, id_size)  # reserved
    read_id(f, id_size)  # reserved
    instance_size = read_u4(f)
    cp_count = read_u2(f)
    # Skip constant pool entries: u2 index, u1 type, typed value.
    for _ in range(cp_count):
        f.seek(2, os.SEEK_CUR)
        t = read_u1(f)
        f.seek(value_size(t, id_size), os.SEEK_CUR)
    static_count = read_u2(f)
    # Skip static fields: name id, u1 type, typed value.
    for _ in range(static_count):
        read_id(f, id_size)
        t = read_u1(f)
        f.seek(value_size(t, id_size), os.SEEK_CUR)
    raw_fields: List[Tuple[int, int]] = []
    inst_count = read_u2(f)
    for _ in range(inst_count):
        name_id = read_id(f, id_size)
        type_code = read_u1(f)
        raw_fields.append((name_id, type_code))
    # Assign byte offsets by accumulating each field's value size.
    off = 0
    fields: List[FieldInfo] = []
    for name_id, type_code in raw_fields:
        name = strings.get(name_id, f"<string:{name_id:x}>")
        fields.append(FieldInfo(name_id=name_id, name=name, type_code=type_code, offset=off))
        off += value_size(type_code, id_size)
    return ClassInfo(
        class_id=class_id,
        name="",
        super_id=super_id,
        instance_size=instance_size,
        fields=fields,
    )
def skip_heap_subrecord_after_tag(f: BinaryIO, tag: int, id_size: int) -> None:
    """Skip one heap-dump sub-record whose tag byte has already been read,
    leaving *f* positioned at the start of the next sub-record."""
    if tag in ROOT_SIZES_FIXED:
        # Root records have fixed-size bodies that depend only on id size.
        f.seek(ROOT_SIZES_FIXED[tag](id_size), os.SEEK_CUR)
        return
    if tag == HPROF_CLASS_DUMP:
        skip_class_dump_after_tag(f, id_size)
        return
    if tag == HPROF_INSTANCE_DUMP:
        read_id(f, id_size)  # object id
        f.seek(4, os.SEEK_CUR)  # stack trace serial
        read_id(f, id_size)  # class id
        data_len = read_u4(f)
        f.seek(data_len, os.SEEK_CUR)
        return
    if tag == HPROF_OBJECT_ARRAY_DUMP:
        read_id(f, id_size)  # array object id
        f.seek(4, os.SEEK_CUR)  # stack trace serial
        count = read_u4(f)
        read_id(f, id_size)  # element class id
        f.seek(count * id_size, os.SEEK_CUR)
        return
    if tag == HPROF_PRIMITIVE_ARRAY_DUMP:
        read_id(f, id_size)  # array object id
        f.seek(4, os.SEEK_CUR)  # stack trace serial
        count = read_u4(f)
        type_code = read_u1(f)
        f.seek(count * primitive_size(type_code), os.SEEK_CUR)
        return
    raise HprofError(f"Unknown heap sub-record tag 0x{tag:02x} at file offset 0x{f.tell() - 1:x}")
def iter_top_records(f: BinaryIO) -> Iterable[Tuple[int, int, int, int, int]]:
    """Yield (record_start, tag, time_delta, length, data_start) for each
    top-level HPROF record.

    On resumption the generator seeks past the record body, so callers may
    freely reposition *f* while handling the yielded record.
    """
    while True:
        record_start = f.tell()
        hdr = f.read(9)
        if not hdr:
            return  # clean EOF at a record boundary
        if len(hdr) != 9:
            raise HprofError("Truncated top-level HPROF record header")
        tag, time_delta, length = struct.unpack(">BII", hdr)
        data_start = f.tell()
        yield record_start, tag, time_delta, length, data_start
        f.seek(data_start + length)
def walk_heap_record(f: BinaryIO, data_start: int, length: int, id_size: int, handler) -> None:
    """Walk the sub-records of one heap dump record, calling *handler* for each.

    *handler* is invoked as handler(sub_start, tag, end) after the 1-byte
    sub-record tag has been consumed, and must leave *f* positioned at the
    start of the next sub-record. A final position mismatch means a
    sub-record was mis-parsed.
    """
    end = data_start + length
    while f.tell() < end:
        sub_start = f.tell()
        tag = read_u1(f)
        handler(sub_start, tag, end)
    if f.tell() != end:
        raise HprofError(f"Heap record overrun: at 0x{f.tell():x}, expected 0x{end:x}")
def pass1_model(path: str, verbose: bool = False) -> Tuple[Header, Model]:
    """First pass: collect UTF-8 strings, class names, and class layouts.

    Reads the whole dump once and returns the parsed header plus a Model
    with enough metadata to identify java.lang.String and its field offsets.
    """
    strings: Dict[int, str] = {}
    class_name_string_ids: Dict[int, int] = {}
    classes: Dict[int, ClassInfo] = {}
    with open(path, "rb") as f:
        header = read_header(f)
        for record_start, tag, _time_delta, length, data_start in iter_top_records(f):
            data_end = data_start + length
            if tag == TAG_STRING_IN_UTF8:
                sid = read_id(f, header.id_size)
                raw = read_exact(f, length - header.id_size)
                strings[sid] = safe_decode_utf8(raw)
            elif tag == TAG_LOAD_CLASS:
                f.seek(4, os.SEEK_CUR)  # class serial number
                class_id = read_id(f, header.id_size)
                f.seek(4, os.SEEK_CUR)  # stack trace serial number
                name_id = read_id(f, header.id_size)
                class_name_string_ids[class_id] = name_id
            elif tag in (TAG_HEAP_DUMP, TAG_HEAP_DUMP_SEGMENT):
                # Only CLASS DUMP sub-records matter in this pass.
                def handler(sub_start: int, sub_tag: int, end: int) -> None:
                    if sub_tag == HPROF_CLASS_DUMP:
                        ci = parse_class_dump_after_tag(f, header.id_size, strings)
                        classes[ci.class_id] = ci
                    else:
                        skip_heap_subrecord_after_tag(f, sub_tag, header.id_size)
                walk_heap_record(f, data_start, length, header.id_size, handler)
            f.seek(data_end)
    # Resolve class names via LOAD_CLASS -> UTF-8 string indirection.
    for class_id, ci in classes.items():
        name_id = class_name_string_ids.get(class_id)
        if name_id is not None:
            ci.name = strings.get(name_id, "")
        else:
            ci.name = ""
    string_class_id = None
    for class_id, ci in classes.items():
        if normalize_class_name(ci.name) == "java.lang.String":
            string_class_id = class_id
            break
    if verbose:
        print(f"HPROF version: {header.version}, id size: {header.id_size}")
        print(f"Loaded {len(strings):,} UTF8 strings, {len(classes):,} classes")
        if string_class_id is None:
            print("java.lang.String class was not found")
        else:
            ci = classes[string_class_id]
            print("java.lang.String fields:")
            for field in ci.fields:
                print(f" offset={field.offset:3d} type={field.type_code:2d} name={field.name}")
    return header, Model(
        strings=strings,
        class_name_string_ids=class_name_string_ids,
        classes=classes,
        string_class_id=string_class_id,
    )
def pass2_string_instances(path: str, header: Header, model: Model, verbose: bool = False) -> Tuple[Dict[int, StringInstance], Dict[int, List[int]], collections.Counter]:
    """Second pass: collect every java.lang.String instance and index them.

    Returns (by_object, by_value, stats): instances keyed by object id, the
    object ids of Strings sharing each backing value array, and counters.

    Raises HprofError when java.lang.String or its ``value`` field is absent.
    """
    if model.string_class_id is None:
        raise HprofError("Cannot find java.lang.String in this HPROF")
    string_class = model.classes[model.string_class_id]
    # ``value`` is mandatory; count/offset/coder vary by JDK/Android version.
    value_field = next((f for f in string_class.fields if f.name == "value" and f.type_code == TYPE_OBJECT), None)
    if value_field is None:
        raise HprofError("java.lang.String.value field was not found")
    count_field = next((f for f in string_class.fields if f.name in ("count", "length") and f.type_code == TYPE_INT), None)
    offset_field = next((f for f in string_class.fields if f.name == "offset" and f.type_code == TYPE_INT), None)
    coder_field = next((f for f in string_class.fields if f.name == "coder" and f.type_code == TYPE_BYTE), None)
    by_object: Dict[int, StringInstance] = {}
    by_value: Dict[int, List[int]] = collections.defaultdict(list)
    stats = collections.Counter()
    with open(path, "rb") as f:
        read_header(f)
        for record_start, tag, _time_delta, length, data_start in iter_top_records(f):
            data_end = data_start + length
            if tag in (TAG_HEAP_DUMP, TAG_HEAP_DUMP_SEGMENT):
                def handler(sub_start: int, sub_tag: int, end: int) -> None:
                    if sub_tag != HPROF_INSTANCE_DUMP:
                        skip_heap_subrecord_after_tag(f, sub_tag, header.id_size)
                        return
                    object_id = read_id(f, header.id_size)
                    f.seek(4, os.SEEK_CUR)  # stack trace serial
                    class_id = read_id(f, header.id_size)
                    data_len = read_u4(f)
                    data = read_exact(f, data_len)
                    if class_id != model.string_class_id:
                        return
                    value_id = read_value_from_data(data, value_field.offset, TYPE_OBJECT, header.id_size)
                    if value_id == 0:
                        stats["string_null_value"] += 1
                        return
                    count_value = None
                    offset_value = None
                    coder_value = None
                    if count_field is not None:
                        count_value = read_value_from_data(data, count_field.offset, TYPE_INT, header.id_size)
                    if offset_field is not None:
                        offset_value = read_value_from_data(data, offset_field.offset, TYPE_INT, header.id_size)
                    if coder_field is not None:
                        coder_value = read_value_from_data(data, coder_field.offset, TYPE_BYTE, header.id_size)
                    si = StringInstance(
                        object_id=object_id,
                        value_id=value_id,
                        count_offset=count_field.offset if count_field is not None else None,
                        offset_offset=offset_field.offset if offset_field is not None else None,
                        coder_offset=coder_field.offset if coder_field is not None else None,
                        count_value=count_value,
                        offset_value=offset_value,
                        coder_value=coder_value,
                        data_len=data_len,
                    )
                    by_object[object_id] = si
                    by_value[value_id].append(object_id)
                    stats["string_instances"] += 1
                walk_heap_record(f, data_start, length, header.id_size, handler)
            f.seek(data_end)
    if verbose:
        print(f"Found {len(by_object):,} java.lang.String instances")
        print(f"Found {len(by_value):,} distinct String.value backing arrays")
    return by_object, by_value, stats
# Characters commonly found in identifiers, paths, and log text; used by
# is_asciiish() to keep the default repair mode conservative.
ASCIIISH_RE = re.compile(r"^[A-Za-z0-9_.$/@:;,+\- #()[\]{}<>\\|?!%&=*'\"~\r\n\t]*$")
def utf16_code_units_count(text: str) -> int:
    """Number of UTF-16 code units needed to encode *text* (surrogates kept)."""
    encoded = text.encode("utf-16-be", errors="surrogatepass")
    return len(encoded) // 2
def encode_as_hprof_char_array(text: str) -> bytes:
    """Encode *text* as big-endian UTF-16 code units (HPROF char[] payload)."""
    payload = text.encode("utf-16-be", errors="surrogatepass")
    return payload
def mostly_printable(text: str) -> bool:
    """True when at most 2% of characters are control chars or lone
    surrogates; tab/CR/LF always count as printable."""
    if not text:
        return True

    def _is_bad(ch: str) -> bool:
        if ch in "\r\n\t":
            return False
        code = ord(ch)
        return code < 0x20 or 0xD800 <= code <= 0xDFFF

    bad_count = sum(1 for ch in text if _is_bad(ch))
    return bad_count / len(text) <= 0.02
def is_asciiish(text: str) -> bool:
    """True when *text* uses only the conservative ASCII-ish alphabet and at
    least 60% of its characters are alphanumeric or common separators."""
    if not text:
        return True
    if ASCIIISH_RE.match(text) is None:
        return False
    useful = sum(ch.isalnum() or ch in " _.$/@:-" for ch in text)
    return useful / len(text) >= 0.60
def decode_bytes_best(data: bytes, prefer_utf8: bool = False) -> Tuple[str, str]:
    """Decode *data*, optionally attempting strict UTF-8 first.

    The latin-1 fallback never fails, so this always returns
    (text, encoding_used).
    """
    if prefer_utf8:
        try:
            return data.decode("utf-8"), "utf-8"
        except UnicodeDecodeError:
            pass  # fall through to the lossless single-byte decode
    return data.decode("latin-1"), "latin-1"
def score_utf16_candidate(text: str) -> Tuple[int, int, int]:
    """Rank a decoded UTF-16 candidate: more printable characters, more
    ASCII-ish characters, and fewer NULs are all better."""
    printable = 0
    asciiish = 0
    nul_count = 0
    for ch in text:
        code = ord(ch)
        if ch in "\r\n\t" or (code >= 0x20 and not (0xD800 <= code <= 0xDFFF)):
            printable += 1
        if code < 128 and (ch.isalnum() or ch in " _.$/@:-"):
            asciiish += 1
        if ch == "\x00":
            nul_count += 1
    return printable, asciiish, -nul_count
def decode_utf16_bytes_best(data: bytes) -> Tuple[Optional[str], str]:
    """Decode *data* with both UTF-16 byte orders and keep the better scorer.

    Returns (text, encoding) on success, or (None, reason) when the payload
    has odd length or neither byte order decodes. Ties favour little-endian,
    matching the original stable-sort behaviour.
    """
    if len(data) % 2 != 0:
        return None, "odd-utf16-length"
    best: Optional[Tuple[Tuple[int, int, int], str, str]] = None
    for encoding in ("utf-16-le", "utf-16-be"):
        try:
            text = data.decode(encoding, errors="surrogatepass")
        except UnicodeDecodeError:
            continue
        score = score_utf16_candidate(text)
        if best is None or score > best[0]:
            best = (score, text, encoding)
    if best is None:
        return None, "invalid-utf16"
    return best[1], best[2]
def decode_packed_char_payload(data: bytes, aggressive_unicode: bool) -> Tuple[Optional[str], str]:
    """Try to repair a char[] payload whose bytes were packed two-per-unit.

    Android's compact form can leave two string bytes in a single UTF-16
    code unit (low byte first), which renders as CJK-looking garbage like
    ``\\u616d\\u6e69`` for "main". This unpacks low/high bytes back into a
    byte stream and decodes it.

    Returns (text, reason) on success or (None, skip-reason) when the data
    does not look safely repairable.
    """
    if len(data) % 2:
        return None, "odd-char-array-length"
    raw = bytearray()
    high_units = 0  # code units with a non-zero high byte (evidence of packing)
    ascii_pair_units = 0  # code units whose packed byte pair looks like ASCII
    for i in range(0, len(data), 2):
        v = (data[i] << 8) | data[i + 1]  # big-endian UTF-16 code unit
        lo = v & 0xFF
        hi = (v >> 8) & 0xFF
        if v > 0x00FF:
            high_units += 1
        if 0x20 <= lo <= 0x7E and (hi == 0 or 0x20 <= hi <= 0x7E):
            ascii_pair_units += 1
        # Low byte came first in the packed form; a zero high byte means the
        # final (odd) byte of the original string.
        raw.append(lo)
        if hi != 0:
            raw.append(hi)
    if not raw:
        return "", "packed-empty"
    text, enc = decode_bytes_best(bytes(raw), prefer_utf8=True)
    # Conservative default: only patch obvious ASCII-ish packed strings.
    # This avoids corrupting legitimate non-ASCII UTF-16 char[] values such as Chinese text.
    if not aggressive_unicode:
        units = max(len(data) // 2, 1)
        if high_units / units < 0.60:
            return None, "char-array-not-obviously-packed"
        if ascii_pair_units / units < 0.80:
            return None, "packed-bytes-not-asciiish"
        if not is_asciiish(text):
            return None, "decoded-text-not-asciiish"
        if not mostly_printable(text):
            return None, "decoded-text-not-printable"
    return text, f"packed-char-array/{enc}"
def decode_string_value_array(
    array_type: int,
    count: int,
    payload: bytes,
    string_instances: List[StringInstance],
    aggressive_unicode: bool,
    force_byte_strings: bool,
) -> Tuple[Optional[str], str]:
    """Decode one String.value backing array into the text it represents.

    byte[] arrays use the JDK compact-string convention: coder 1 means
    UTF-16 bytes, coder 0 (or unknown) means Latin-1. char[] arrays go
    through the packed-byte repair heuristic.

    Returns (text, reason) on success or (None, skip-reason).
    """
    # Only trust the coder field when all sharing Strings agree on it.
    coder_values = {si.coder_value for si in string_instances if si.coder_value is not None}
    coder = next(iter(coder_values)) if len(coder_values) == 1 else None
    if array_type == TYPE_BYTE:
        if coder == 1:
            text, enc = decode_utf16_bytes_best(payload)
            if text is None:
                return None, enc
            if not aggressive_unicode and not mostly_printable(text):
                return None, "utf16-byte-string-not-printable"
            return text, f"byte-array/coder=1/{enc}"
        # Android/JDK compact strings with coder 0 are Latin-1, not UTF-8.
        text = payload.decode("latin-1")
        if not force_byte_strings and not aggressive_unicode:
            # Still conservative: skip very binary-looking strings.
            if not mostly_printable(text):
                return None, "latin1-byte-string-not-printable"
        return text, "byte-array/coder=0-or-unknown/latin-1"
    if array_type == TYPE_CHAR:
        return decode_packed_char_payload(payload, aggressive_unicode=aggressive_unicode)
    return None, f"unsupported-array-type-{array_type}"
def build_plans(
    path: str,
    header: Header,
    model: Model,
    by_object: Dict[int, StringInstance],
    by_value: Dict[int, List[int]],
    max_array_bytes: int,
    aggressive: bool,
    aggressive_unicode: bool,
    force_byte_strings: bool,
    set_coder_utf16: bool,
    verbose: bool = False,
) -> Plans:
    """Third pass: decide which arrays and String instances to rewrite.

    Scans every primitive array referenced by some String.value, decodes
    its text, and plans: (a) replacing the array with a big-endian UTF-16
    char[] of the decoded text, (b) fixing each referring String's
    count/offset/coder fields, and (c) per-heap-record length deltas so the
    output pass can fix record headers. Nothing is written here.
    """
    array_patches: Dict[int, ArrayPatch] = {}
    instance_patches: Dict[int, InstancePatch] = {}
    heap_deltas: Dict[int, int] = collections.defaultdict(int)
    skipped = collections.Counter()
    stats = collections.Counter()
    with open(path, "rb") as f:
        read_header(f)
        for record_start, tag, _time_delta, length, data_start in iter_top_records(f):
            data_end = data_start + length
            if tag in (TAG_HEAP_DUMP, TAG_HEAP_DUMP_SEGMENT):
                def handler(sub_start: int, sub_tag: int, end: int) -> None:
                    if sub_tag != HPROF_PRIMITIVE_ARRAY_DUMP:
                        skip_heap_subrecord_after_tag(f, sub_tag, header.id_size)
                        return
                    array_id = read_id(f, header.id_size)
                    f.seek(4, os.SEEK_CUR)  # stack trace serial
                    count = read_u4(f)
                    array_type = read_u1(f)
                    elem_size = primitive_size(array_type)
                    payload_len = count * elem_size
                    # Only arrays referenced from some String.value matter.
                    if array_id not in by_value:
                        f.seek(payload_len, os.SEEK_CUR)
                        return
                    if array_type not in (TYPE_BYTE, TYPE_CHAR):
                        skipped["string_value_not_byte_or_char_array"] += 1
                        f.seek(payload_len, os.SEEK_CUR)
                        return
                    if payload_len > max_array_bytes:
                        skipped["array_too_large"] += 1
                        f.seek(payload_len, os.SEEK_CUR)
                        return
                    payload = read_exact(f, payload_len)
                    string_objs = by_value[array_id]
                    sis = [by_object[obj_id] for obj_id in string_objs]
                    offsets = {si.offset_value for si in sis if si.offset_value is not None}
                    counts = {si.count_value for si in sis if si.count_value is not None}
                    if not aggressive:
                        # Substring views and shared arrays with disagreeing
                        # metadata are ambiguous; skip unless --aggressive.
                        if any(si.offset_value not in (None, 0) for si in sis):
                            skipped["nonzero_offset"] += 1
                            return
                        if len(string_objs) > 1 and (len(offsets) > 1 or len(counts) > 1):
                            skipped["shared_backing_array_ambiguous"] += 1
                            return
                    decoded, reason = decode_string_value_array(
                        array_type=array_type,
                        count=count,
                        payload=payload,
                        string_instances=sis,
                        aggressive_unicode=aggressive_unicode,
                        force_byte_strings=force_byte_strings,
                    )
                    if decoded is None:
                        skipped[reason] += 1
                        return
                    new_payload = encode_as_hprof_char_array(decoded)
                    new_count = len(new_payload) // 2
                    if array_type == TYPE_CHAR and new_payload == payload and new_count == count:
                        skipped["no_change_needed"] += 1
                        return
                    # Sub-record layout: tag + id + serial + count + type + payload.
                    old_sub_len = 1 + header.id_size + 4 + 4 + 1 + payload_len
                    new_sub_len = 1 + header.id_size + 4 + 4 + 1 + len(new_payload)
                    delta = new_sub_len - old_sub_len
                    array_patches[array_id] = ArrayPatch(
                        array_id=array_id,
                        old_type=array_type,
                        old_count=count,
                        new_type=TYPE_CHAR,
                        new_count=new_count,
                        new_payload=new_payload,
                        decoded_preview=decoded[:120],
                        reason=reason,
                    )
                    heap_deltas[record_start] += delta
                    # Fix each referring String's metadata fields in place.
                    for si in sis:
                        replacements: Dict[int, bytes] = {}
                        if si.count_offset is not None:
                            replacements[si.count_offset] = pack_i4(new_count)
                        if si.offset_offset is not None:
                            replacements[si.offset_offset] = pack_i4(0)
                        if set_coder_utf16 and si.coder_offset is not None:
                            replacements[si.coder_offset] = pack_u1(1)
                        if replacements:
                            instance_patches[si.object_id] = InstancePatch(
                                object_id=si.object_id,
                                replacements=replacements,
                            )
                    stats["arrays_patched"] += 1
                    stats["strings_affected"] += len(sis)
                    if array_type == TYPE_BYTE:
                        stats["byte_arrays_patched"] += 1
                    elif array_type == TYPE_CHAR:
                        stats["char_arrays_patched"] += 1
                walk_heap_record(f, data_start, length, header.id_size, handler)
            f.seek(data_end)
    if verbose:
        print(f"Planned array patches: {len(array_patches):,}")
        print(f"Planned string instance patches: {len(instance_patches):,}")
        if skipped:
            print("Skipped:")
            for k, v in skipped.most_common():
                print(f" {k}: {v:,}")
        if array_patches:
            print("Examples:")
            for p in list(array_patches.values())[:10]:
                old_name = PRIMITIVE_ARRAY_NAMES.get(p.old_type, str(p.old_type))
                print(f" 0x{p.array_id:x}: {old_name}[{p.old_count}] -> char[{p.new_count}] "
                      f"({p.reason}) {p.decoded_preview!r}")
    return Plans(
        array_patches=array_patches,
        instance_patches=instance_patches,
        heap_deltas=dict(heap_deltas),
        skipped=skipped,
        stats=stats,
    )
def patch_instance_data(data: bytes, patch: InstancePatch) -> bytes:
    """Return *data* with the patch's byte replacements applied in place.

    Raises HprofError if any replacement would fall outside the data.
    """
    buf = bytearray(data)
    for offset, new_bytes in patch.replacements.items():
        end = offset + len(new_bytes)
        if offset < 0 or end > len(buf):
            raise HprofError(f"Instance patch for object 0x{patch.object_id:x} is out of bounds")
        buf[offset:end] = new_bytes
    return bytes(buf)
def write_patched_heap_subrecord(
    inp: BinaryIO,
    out: BinaryIO,
    sub_tag: int,
    header: Header,
    plans: Plans,
) -> None:
    """Copy one heap sub-record from *inp* to *out*, applying planned patches.

    Called with *inp* positioned just past the 1-byte sub-record tag.
    Primitive arrays in plans.array_patches are re-emitted with the repaired
    char[] payload; instance dumps in plans.instance_patches get their field
    bytes rewritten (same length). Everything else is copied verbatim.
    """
    sub_start = inp.tell() - 1  # position of the tag byte we already consumed
    if sub_tag == HPROF_PRIMITIVE_ARRAY_DUMP:
        array_id = read_id(inp, header.id_size)
        stack_serial = read_u4(inp)
        count = read_u4(inp)
        array_type = read_u1(inp)
        payload_len = count * primitive_size(array_type)
        patch = plans.array_patches.get(array_id)
        if patch is None:
            # Unpatched: rewind and copy the whole sub-record verbatim.
            inp.seek(sub_start)
            copy_exact(inp, out, 1 + header.id_size + 4 + 4 + 1 + payload_len)
            return
        inp.seek(payload_len, os.SEEK_CUR)  # skip the old payload
        out.write(pack_u1(HPROF_PRIMITIVE_ARRAY_DUMP))
        out.write(pack_id(array_id, header.id_size))
        out.write(pack_u4(stack_serial))
        out.write(pack_u4(patch.new_count))
        out.write(pack_u1(patch.new_type))
        out.write(patch.new_payload)
        return
    if sub_tag == HPROF_INSTANCE_DUMP:
        object_id = read_id(inp, header.id_size)
        stack_serial = read_u4(inp)
        class_id = read_id(inp, header.id_size)
        data_len = read_u4(inp)
        data = read_exact(inp, data_len)
        patch = plans.instance_patches.get(object_id)
        if patch is None:
            inp.seek(sub_start)
            copy_exact(inp, out, 1 + header.id_size + 4 + header.id_size + 4 + data_len)
            return
        patched_data = patch_instance_data(data, patch)
        out.write(pack_u1(HPROF_INSTANCE_DUMP))
        out.write(pack_id(object_id, header.id_size))
        out.write(pack_u4(stack_serial))
        out.write(pack_id(class_id, header.id_size))
        out.write(pack_u4(len(patched_data)))
        out.write(patched_data)
        return
    # Non-patched subrecord: parse once to know its original byte length, then copy.
    skip_heap_subrecord_after_tag(inp, sub_tag, header.id_size)
    sub_end = inp.tell()
    inp.seek(sub_start)
    copy_exact(inp, out, sub_end - sub_start)
def write_output(path_in: str, path_out: str, header: Header, plans: Plans, verbose: bool = False) -> None:
    """Write a patched copy of *path_in* to *path_out*.

    Every top-level record is copied verbatim except HEAP_DUMP /
    HEAP_DUMP_SEGMENT records, which are re-emitted sub-record by sub-record
    with the planned patches applied and their lengths adjusted by the
    precomputed deltas. Output goes to ``path_out + ".tmp"`` first and is
    atomically renamed into place only on success.

    Raises:
        HprofError: if the input header changed between passes, a patched
            record length is out of range, or record bookkeeping mismatches.
    """
    tmp_out = path_out + ".tmp"
    try:
        with open(path_in, "rb") as inp, open(tmp_out, "wb") as out:
            actual_header = read_header(inp)
            if actual_header.raw != header.raw:
                raise HprofError("Header changed between passes")
            out.write(header.raw)
            for record_start, tag, time_delta, length, data_start in iter_top_records(inp):
                data_end = data_start + length
                if tag not in (TAG_HEAP_DUMP, TAG_HEAP_DUMP_SEGMENT):
                    out.write(struct.pack(">BII", tag, time_delta, length))
                    copy_exact(inp, out, length)
                    inp.seek(data_end)
                    continue
                # Heap records may grow/shrink; fix the length in the header.
                new_length = length + plans.heap_deltas.get(record_start, 0)
                if new_length < 0 or new_length > 0xFFFFFFFF:
                    raise HprofError(f"Patched heap record at 0x{record_start:x} has invalid length {new_length}")
                out.write(struct.pack(">BII", tag, time_delta, new_length))
                while inp.tell() < data_end:
                    sub_tag = read_u1(inp)
                    write_patched_heap_subrecord(inp, out, sub_tag, header, plans)
                if inp.tell() != data_end:
                    raise HprofError(f"Heap output pass overrun: at 0x{inp.tell():x}, expected 0x{data_end:x}")
            out.flush()
    except BaseException:
        # Fix: don't leave a half-written .tmp file behind on failure.
        try:
            os.remove(tmp_out)
        except OSError:
            pass
        raise
    os.replace(tmp_out, path_out)
    if verbose:
        print(f"Wrote {path_out}")
def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point: parse arguments, run the three passes, write output.

    Returns 0 on success (including dry runs), 2 for argument errors, and
    1 for any processing failure.
    """
    ap = argparse.ArgumentParser(
        description="Patch Android/ART compact/packed java.lang.String values in a converted HPROF so MAT can display them.",
    )
    ap.add_argument("input", help="Input HPROF, normally the hprof-conv output")
    ap.add_argument("output", help="Output patched HPROF")
    ap.add_argument("--dry-run", action="store_true", help="Analyze and show planned changes without writing output")
    ap.add_argument("--verbose", "-v", action="store_true", help="Print parser details and patch examples")
    ap.add_argument("--max-array-bytes", type=int, default=1024 * 1024, help="Skip String.value arrays larger than this many bytes/chars payload; default: 1048576")
    ap.add_argument("--aggressive", action="store_true", help="Patch ambiguous shared String backing arrays and non-zero offsets")
    ap.add_argument("--aggressive-unicode", action="store_true", help="Try to repair non-ASCII packed char[] strings too; higher false-positive risk")
    ap.add_argument("--force-byte-strings", action="store_true", help="Patch byte[] String.value arrays even if they are not very printable")
    ap.add_argument("--no-set-coder-utf16", action="store_true", help="Do not set java.lang.String.coder to UTF16 when a coder field exists")
    args = ap.parse_args(argv)
    if not os.path.exists(args.input):
        print(f"Input does not exist: {args.input}", file=sys.stderr)
        return 2
    # Refuse in-place rewriting: the three passes re-read the input file.
    if os.path.abspath(args.input) == os.path.abspath(args.output):
        print("Refusing to overwrite input in-place. Use a separate output file.", file=sys.stderr)
        return 2
    try:
        header, model = pass1_model(args.input, verbose=args.verbose)
        by_object, by_value, stats2 = pass2_string_instances(args.input, header, model, verbose=args.verbose)
        plans = build_plans(
            path=args.input,
            header=header,
            model=model,
            by_object=by_object,
            by_value=by_value,
            max_array_bytes=args.max_array_bytes,
            aggressive=args.aggressive,
            aggressive_unicode=args.aggressive_unicode,
            force_byte_strings=args.force_byte_strings,
            set_coder_utf16=not args.no_set_coder_utf16,
            verbose=args.verbose or args.dry_run,
        )
        print("Summary:")
        print(f" java.lang.String instances: {stats2.get('string_instances', 0):,}")
        print(f" distinct backing arrays: {len(by_value):,}")
        print(f" arrays to patch: {len(plans.array_patches):,}")
        print(f" strings affected: {plans.stats.get('strings_affected', 0):,}")
        print(f" heap records resized: {len(plans.heap_deltas):,}")
        if args.dry_run:
            print("Dry run only; no output written.")
            return 0
        if not plans.array_patches and not plans.instance_patches:
            print("No patches planned. Writing a byte-for-byte copy of the input.")
            shutil.copyfile(args.input, args.output)
            return 0
        write_output(args.input, args.output, header, plans, verbose=True)
        return 0
    except Exception as e:
        # Top-level CLI boundary: report and exit non-zero instead of tracebacking.
        print(f"ERROR: {e}", file=sys.stderr)
        return 1
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment