@CherryDT
Created April 24, 2026 13:56
Script to patch the strings in converted Android HPROF dumps so that they display correctly in Eclipse MAT
#!/usr/bin/env python3
"""
hprof_android_string_fix.py

By David Trapp (dt@david-trapp.com)

Patch Android/ART HPROF dumps converted with hprof-conv so Eclipse MAT can
display java.lang.String values that were stored in Android compact/packed
form.

This turns things like `\u616d\u6e69` back into e.g. `main`.

This is intentionally conservative by default:
- only arrays referenced from java.lang.String.value are considered
- ambiguous shared backing arrays with non-zero offset are skipped
- very large backing arrays are skipped unless --max-array-bytes is raised
- char[] packed-byte repair is limited to ASCII-ish strings unless
  --aggressive-unicode is used

Usage:
    python hprof_android_string_fix.py input.hprof output.hprof --dry-run
    python hprof_android_string_fix.py input.hprof output.hprof

Keep the original dump. This script rewrites a copy.
"""
from __future__ import annotations
import argparse
import collections
import dataclasses
import os
import re
import shutil
import struct
import sys
from typing import BinaryIO, Dict, Iterable, List, Optional, Tuple
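
# Record tags and basic-type codes from the binary HPROF format: 0x01/0x02
# are the top-level UTF8-string and load-class records, 0x0C/0x1C are heap
# dump (segment) records, the 0x2x values below are heap sub-record tags,
# and the TYPE_* codes describe field and array element values.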
TAG_STRING_IN_UTF8 = 0x01
TAG_LOAD_CLASS = 0x02
TAG_HEAP_DUMP = 0x0C
TAG_HEAP_DUMP_SEGMENT = 0x1C
HPROF_CLASS_DUMP = 0x20
HPROF_INSTANCE_DUMP = 0x21
HPROF_OBJECT_ARRAY_DUMP = 0x22
HPROF_PRIMITIVE_ARRAY_DUMP = 0x23
TYPE_OBJECT = 2
TYPE_BOOLEAN = 4
TYPE_CHAR = 5
TYPE_FLOAT = 6
TYPE_DOUBLE = 7
TYPE_BYTE = 8
TYPE_SHORT = 9
TYPE_INT = 10
TYPE_LONG = 11

PRIMITIVE_ARRAY_NAMES = {
    TYPE_BOOLEAN: "boolean[]",
    TYPE_CHAR: "char[]",
    TYPE_FLOAT: "float[]",
    TYPE_DOUBLE: "double[]",
    TYPE_BYTE: "byte[]",
    TYPE_SHORT: "short[]",
    TYPE_INT: "int[]",
    TYPE_LONG: "long[]",
}
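
# Fixed payload sizes (excluding the 1-byte sub-record tag) for the GC-root
# sub-records and HEAP_DUMP_INFO. The script never interprets these records;
# knowing their size is enough to skip them.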
ROOT_SIZES_FIXED = {
    0xFF: lambda id_size: id_size,      # ROOT_UNKNOWN
    0x01: lambda id_size: id_size * 2,  # ROOT_JNI_GLOBAL
    0x02: lambda id_size: id_size + 8,  # ROOT_JNI_LOCAL
    0x03: lambda id_size: id_size + 8,  # ROOT_JAVA_FRAME
    0x04: lambda id_size: id_size + 4,  # ROOT_NATIVE_STACK
    0x05: lambda id_size: id_size,      # ROOT_STICKY_CLASS
    0x06: lambda id_size: id_size + 4,  # ROOT_THREAD_BLOCK
    0x07: lambda id_size: id_size,      # ROOT_MONITOR_USED
    0x08: lambda id_size: id_size + 8,  # ROOT_THREAD_OBJECT
    0x89: lambda id_size: id_size,      # ROOT_INTERNED_STRING
    0x8A: lambda id_size: id_size,      # ROOT_FINALIZING
    0x8B: lambda id_size: id_size,      # ROOT_DEBUGGER
    0x8C: lambda id_size: id_size,      # ROOT_REFERENCE_CLEANUP
    0x8D: lambda id_size: id_size,      # ROOT_VM_INTERNAL
    0x8E: lambda id_size: id_size + 8,  # ROOT_JNI_MONITOR
    0x90: lambda id_size: id_size,      # ROOT_UNREACHABLE
    0xFE: lambda id_size: 4 + id_size,  # HEAP_DUMP_INFO
}


@dataclasses.dataclass
class Header:
    raw: bytes
    version: str
    id_size: int
    timestamp_hi: int
    timestamp_lo: int


@dataclasses.dataclass
class FieldInfo:
    name_id: int
    name: str
    type_code: int
    offset: int


@dataclasses.dataclass
class ClassInfo:
    class_id: int
    name: str
    super_id: int
    instance_size: int
    fields: List[FieldInfo]


@dataclasses.dataclass
class StringInstance:
    object_id: int
    value_id: int
    count_offset: Optional[int]
    offset_offset: Optional[int]
    coder_offset: Optional[int]
    count_value: Optional[int]
    offset_value: Optional[int]
    coder_value: Optional[int]
    data_len: int


@dataclasses.dataclass
class ArrayPatch:
    array_id: int
    old_type: int
    old_count: int
    new_type: int
    new_count: int
    new_payload: bytes
    decoded_preview: str
    reason: str


@dataclasses.dataclass
class InstancePatch:
    object_id: int
    replacements: Dict[int, bytes]


@dataclasses.dataclass
class Model:
    strings: Dict[int, str]
    class_name_string_ids: Dict[int, int]
    classes: Dict[int, ClassInfo]
    string_class_id: Optional[int]


@dataclasses.dataclass
class Plans:
    array_patches: Dict[int, ArrayPatch]
    instance_patches: Dict[int, InstancePatch]
    heap_deltas: Dict[int, int]
    skipped: collections.Counter
    stats: collections.Counter


class HprofError(Exception):
    pass


def read_exact(f: BinaryIO, n: int) -> bytes:
    b = f.read(n)
    if len(b) != n:
        raise HprofError(f"Unexpected EOF while reading {n} bytes")
    return b


def copy_exact(src: BinaryIO, dst: BinaryIO, n: int, chunk_size: int = 1024 * 1024) -> None:
    remaining = n
    while remaining:
        chunk = src.read(min(chunk_size, remaining))
        if not chunk:
            raise HprofError("Unexpected EOF while copying")
        dst.write(chunk)
        remaining -= len(chunk)


def unpack_u1(b: bytes) -> int:
    return b[0]


def unpack_u2(b: bytes) -> int:
    return struct.unpack(">H", b)[0]


def unpack_u4(b: bytes) -> int:
    return struct.unpack(">I", b)[0]


def unpack_i4(b: bytes) -> int:
    return struct.unpack(">i", b)[0]


def pack_u1(v: int) -> bytes:
    return bytes([v & 0xFF])


def pack_u2(v: int) -> bytes:
    return struct.pack(">H", v & 0xFFFF)


def pack_u4(v: int) -> bytes:
    return struct.pack(">I", v & 0xFFFFFFFF)


def pack_i4(v: int) -> bytes:
    return struct.pack(">i", int(v))


def read_u1(f: BinaryIO) -> int:
    return unpack_u1(read_exact(f, 1))


def read_u2(f: BinaryIO) -> int:
    return unpack_u2(read_exact(f, 2))


def read_u4(f: BinaryIO) -> int:
    return unpack_u4(read_exact(f, 4))


def read_id(f: BinaryIO, id_size: int) -> int:
    return int.from_bytes(read_exact(f, id_size), "big", signed=False)


def pack_id(v: int, id_size: int) -> bytes:
    return int(v).to_bytes(id_size, "big", signed=False)


def value_size(type_code: int, id_size: int) -> int:
    if type_code == TYPE_OBJECT:
        return id_size
    if type_code in (TYPE_BOOLEAN, TYPE_BYTE):
        return 1
    if type_code in (TYPE_CHAR, TYPE_SHORT):
        return 2
    if type_code in (TYPE_FLOAT, TYPE_INT):
        return 4
    if type_code in (TYPE_DOUBLE, TYPE_LONG):
        return 8
    raise HprofError(f"Unknown HPROF value type {type_code}")


def primitive_size(type_code: int) -> int:
    if type_code in (TYPE_BOOLEAN, TYPE_BYTE):
        return 1
    if type_code in (TYPE_CHAR, TYPE_SHORT):
        return 2
    if type_code in (TYPE_FLOAT, TYPE_INT):
        return 4
    if type_code in (TYPE_DOUBLE, TYPE_LONG):
        return 8
    raise HprofError(f"Unknown primitive array type {type_code}")
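

# The HPROF header is a NUL-terminated version string such as
# "JAVA PROFILE 1.0.2", followed by a u4 identifier size and a u8 timestamp
# stored as two u4 halves.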
def read_header(f: BinaryIO) -> Header:
    raw = bytearray()
    while True:
        c = f.read(1)
        if not c:
            raise HprofError("Missing HPROF header terminator")
        raw += c
        if c == b"\0":
            break
    version = raw[:-1].decode("ascii", errors="replace")
    rest = read_exact(f, 12)
    raw += rest
    id_size, ts_hi, ts_lo = struct.unpack(">III", rest)
    if id_size not in (4, 8):
        raise HprofError(f"Unsupported object id size: {id_size}")
    return Header(bytes(raw), version, id_size, ts_hi, ts_lo)


def read_value_from_data(data: bytes, off: int, type_code: int, id_size: int) -> int:
    size = value_size(type_code, id_size)
    part = data[off:off + size]
    if len(part) != size:
        raise HprofError("Instance data is shorter than expected")
    if type_code == TYPE_OBJECT:
        return int.from_bytes(part, "big", signed=False)
    if type_code == TYPE_BOOLEAN:
        return 1 if part[0] else 0
    if type_code == TYPE_BYTE:
        return part[0]
    if type_code == TYPE_CHAR:
        return unpack_u2(part)
    if type_code == TYPE_SHORT:
        return struct.unpack(">h", part)[0]
    if type_code == TYPE_INT:
        return unpack_i4(part)
    if type_code == TYPE_LONG:
        return struct.unpack(">q", part)[0]
    if type_code == TYPE_FLOAT:
        return unpack_u4(part)
    if type_code == TYPE_DOUBLE:
        return int.from_bytes(part, "big", signed=False)
    raise HprofError(f"Unknown value type {type_code}")


def normalize_class_name(name: str) -> str:
    return name.replace("/", ".")


def safe_decode_utf8(b: bytes) -> str:
    return b.decode("utf-8", errors="replace")
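

# A CLASS DUMP sub-record holds: class id, stack trace serial, super class
# id, class loader / signers / protection domain ids, two reserved ids, u4
# instance size, then the constant pool, static fields, and instance field
# descriptors. The two functions below walk that layout; the first merely
# skips it, the second extracts the instance field list.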
def skip_class_dump_after_tag(f: BinaryIO, id_size: int) -> None:
    read_id(f, id_size)  # class id
    f.seek(4, os.SEEK_CUR)  # stack trace serial
    for _ in range(6):
        read_id(f, id_size)  # super, loader, signers, pd, reserved1, reserved2
    f.seek(4, os.SEEK_CUR)  # instance size
    cp_count = read_u2(f)
    for _ in range(cp_count):
        f.seek(2, os.SEEK_CUR)
        t = read_u1(f)
        f.seek(value_size(t, id_size), os.SEEK_CUR)
    static_count = read_u2(f)
    for _ in range(static_count):
        read_id(f, id_size)
        t = read_u1(f)
        f.seek(value_size(t, id_size), os.SEEK_CUR)
    inst_count = read_u2(f)
    f.seek(inst_count * (id_size + 1), os.SEEK_CUR)


def parse_class_dump_after_tag(f: BinaryIO, id_size: int, strings: Dict[int, str]) -> ClassInfo:
    class_id = read_id(f, id_size)
    f.seek(4, os.SEEK_CUR)  # stack trace serial
    super_id = read_id(f, id_size)
    read_id(f, id_size)  # class loader
    read_id(f, id_size)  # signers
    read_id(f, id_size)  # protection domain
    read_id(f, id_size)  # reserved
    read_id(f, id_size)  # reserved
    instance_size = read_u4(f)
    cp_count = read_u2(f)
    for _ in range(cp_count):
        f.seek(2, os.SEEK_CUR)
        t = read_u1(f)
        f.seek(value_size(t, id_size), os.SEEK_CUR)
    static_count = read_u2(f)
    for _ in range(static_count):
        read_id(f, id_size)
        t = read_u1(f)
        f.seek(value_size(t, id_size), os.SEEK_CUR)
    raw_fields: List[Tuple[int, int]] = []
    inst_count = read_u2(f)
    for _ in range(inst_count):
        name_id = read_id(f, id_size)
        type_code = read_u1(f)
        raw_fields.append((name_id, type_code))
    off = 0
    fields: List[FieldInfo] = []
    for name_id, type_code in raw_fields:
        name = strings.get(name_id, f"<string:{name_id:x}>")
        fields.append(FieldInfo(name_id=name_id, name=name, type_code=type_code, offset=off))
        off += value_size(type_code, id_size)
    return ClassInfo(
        class_id=class_id,
        name="",
        super_id=super_id,
        instance_size=instance_size,
        fields=fields,
    )


def skip_heap_subrecord_after_tag(f: BinaryIO, tag: int, id_size: int) -> None:
    if tag in ROOT_SIZES_FIXED:
        f.seek(ROOT_SIZES_FIXED[tag](id_size), os.SEEK_CUR)
        return
    if tag == HPROF_CLASS_DUMP:
        skip_class_dump_after_tag(f, id_size)
        return
    if tag == HPROF_INSTANCE_DUMP:
        read_id(f, id_size)
        f.seek(4, os.SEEK_CUR)
        read_id(f, id_size)
        data_len = read_u4(f)
        f.seek(data_len, os.SEEK_CUR)
        return
    if tag == HPROF_OBJECT_ARRAY_DUMP:
        read_id(f, id_size)
        f.seek(4, os.SEEK_CUR)
        count = read_u4(f)
        read_id(f, id_size)
        f.seek(count * id_size, os.SEEK_CUR)
        return
    if tag == HPROF_PRIMITIVE_ARRAY_DUMP:
        read_id(f, id_size)
        f.seek(4, os.SEEK_CUR)
        count = read_u4(f)
        type_code = read_u1(f)
        f.seek(count * primitive_size(type_code), os.SEEK_CUR)
        return
    raise HprofError(f"Unknown heap sub-record tag 0x{tag:02x} at file offset 0x{f.tell() - 1:x}")
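

# Every top-level HPROF record is a u1 tag, a u4 time delta, and a u4 body
# length followed by the body, so each pass can stream the file and seek
# past records it does not care about.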
def iter_top_records(f: BinaryIO) -> Iterable[Tuple[int, int, int, int, int]]:
    while True:
        record_start = f.tell()
        hdr = f.read(9)
        if not hdr:
            return
        if len(hdr) != 9:
            raise HprofError("Truncated top-level HPROF record header")
        tag, time_delta, length = struct.unpack(">BII", hdr)
        data_start = f.tell()
        yield record_start, tag, time_delta, length, data_start
        f.seek(data_start + length)


def walk_heap_record(f: BinaryIO, data_start: int, length: int, id_size: int, handler) -> None:
    end = data_start + length
    while f.tell() < end:
        sub_start = f.tell()
        tag = read_u1(f)
        handler(sub_start, tag, end)
    if f.tell() != end:
        raise HprofError(f"Heap record overrun: at 0x{f.tell():x}, expected 0x{end:x}")
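

# Pass 1: collect the UTF8 string table and class metadata so later passes
# can locate java.lang.String and the offsets of its value/count/offset/coder
# fields.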
def pass1_model(path: str, verbose: bool = False) -> Tuple[Header, Model]:
    strings: Dict[int, str] = {}
    class_name_string_ids: Dict[int, int] = {}
    classes: Dict[int, ClassInfo] = {}
    with open(path, "rb") as f:
        header = read_header(f)
        for record_start, tag, _time_delta, length, data_start in iter_top_records(f):
            data_end = data_start + length
            if tag == TAG_STRING_IN_UTF8:
                sid = read_id(f, header.id_size)
                raw = read_exact(f, length - header.id_size)
                strings[sid] = safe_decode_utf8(raw)
            elif tag == TAG_LOAD_CLASS:
                f.seek(4, os.SEEK_CUR)  # class serial number
                class_id = read_id(f, header.id_size)
                f.seek(4, os.SEEK_CUR)  # stack trace serial number
                name_id = read_id(f, header.id_size)
                class_name_string_ids[class_id] = name_id
            elif tag in (TAG_HEAP_DUMP, TAG_HEAP_DUMP_SEGMENT):
                def handler(sub_start: int, sub_tag: int, end: int) -> None:
                    if sub_tag == HPROF_CLASS_DUMP:
                        ci = parse_class_dump_after_tag(f, header.id_size, strings)
                        classes[ci.class_id] = ci
                    else:
                        skip_heap_subrecord_after_tag(f, sub_tag, header.id_size)
                walk_heap_record(f, data_start, length, header.id_size, handler)
            f.seek(data_end)
    for class_id, ci in classes.items():
        name_id = class_name_string_ids.get(class_id)
        if name_id is not None:
            ci.name = strings.get(name_id, "")
        else:
            ci.name = ""
    string_class_id = None
    for class_id, ci in classes.items():
        if normalize_class_name(ci.name) == "java.lang.String":
            string_class_id = class_id
            break
    if verbose:
        print(f"HPROF version: {header.version}, id size: {header.id_size}")
        print(f"Loaded {len(strings):,} UTF8 strings, {len(classes):,} classes")
        if string_class_id is None:
            print("java.lang.String class was not found")
        else:
            ci = classes[string_class_id]
            print("java.lang.String fields:")
            for field in ci.fields:
                print(f" offset={field.offset:3d} type={field.type_code:2d} name={field.name}")
    return header, Model(
        strings=strings,
        class_name_string_ids=class_name_string_ids,
        classes=classes,
        string_class_id=string_class_id,
    )
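

# Pass 2: record every java.lang.String instance and index instances by the
# object id of their backing String.value array, so shared backing arrays
# can be detected and handled conservatively.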
def pass2_string_instances(
    path: str, header: Header, model: Model, verbose: bool = False
) -> Tuple[Dict[int, StringInstance], Dict[int, List[int]], collections.Counter]:
    if model.string_class_id is None:
        raise HprofError("Cannot find java.lang.String in this HPROF")
    string_class = model.classes[model.string_class_id]
    value_field = next((f for f in string_class.fields if f.name == "value" and f.type_code == TYPE_OBJECT), None)
    if value_field is None:
        raise HprofError("java.lang.String.value field was not found")
    count_field = next((f for f in string_class.fields if f.name in ("count", "length") and f.type_code == TYPE_INT), None)
    offset_field = next((f for f in string_class.fields if f.name == "offset" and f.type_code == TYPE_INT), None)
    coder_field = next((f for f in string_class.fields if f.name == "coder" and f.type_code == TYPE_BYTE), None)
    by_object: Dict[int, StringInstance] = {}
    by_value: Dict[int, List[int]] = collections.defaultdict(list)
    stats = collections.Counter()
    with open(path, "rb") as f:
        read_header(f)
        for record_start, tag, _time_delta, length, data_start in iter_top_records(f):
            data_end = data_start + length
            if tag in (TAG_HEAP_DUMP, TAG_HEAP_DUMP_SEGMENT):
                def handler(sub_start: int, sub_tag: int, end: int) -> None:
                    if sub_tag != HPROF_INSTANCE_DUMP:
                        skip_heap_subrecord_after_tag(f, sub_tag, header.id_size)
                        return
                    object_id = read_id(f, header.id_size)
                    f.seek(4, os.SEEK_CUR)
                    class_id = read_id(f, header.id_size)
                    data_len = read_u4(f)
                    data = read_exact(f, data_len)
                    if class_id != model.string_class_id:
                        return
                    value_id = read_value_from_data(data, value_field.offset, TYPE_OBJECT, header.id_size)
                    if value_id == 0:
                        stats["string_null_value"] += 1
                        return
                    count_value = None
                    offset_value = None
                    coder_value = None
                    if count_field is not None:
                        count_value = read_value_from_data(data, count_field.offset, TYPE_INT, header.id_size)
                    if offset_field is not None:
                        offset_value = read_value_from_data(data, offset_field.offset, TYPE_INT, header.id_size)
                    if coder_field is not None:
                        coder_value = read_value_from_data(data, coder_field.offset, TYPE_BYTE, header.id_size)
                    si = StringInstance(
                        object_id=object_id,
                        value_id=value_id,
                        count_offset=count_field.offset if count_field is not None else None,
                        offset_offset=offset_field.offset if offset_field is not None else None,
                        coder_offset=coder_field.offset if coder_field is not None else None,
                        count_value=count_value,
                        offset_value=offset_value,
                        coder_value=coder_value,
                        data_len=data_len,
                    )
                    by_object[object_id] = si
                    by_value[value_id].append(object_id)
                    stats["string_instances"] += 1
                walk_heap_record(f, data_start, length, header.id_size, handler)
            f.seek(data_end)
    if verbose:
        print(f"Found {len(by_object):,} java.lang.String instances")
        print(f"Found {len(by_value):,} distinct String.value backing arrays")
    return by_object, by_value, stats


ASCIIISH_RE = re.compile(r"^[A-Za-z0-9_.$/@:;,+\- #()[\]{}<>\\|?!%&=*'\"~\r\n\t]*$")


def utf16_code_units_count(text: str) -> int:
    return len(text.encode("utf-16-be", errors="surrogatepass")) // 2


def encode_as_hprof_char_array(text: str) -> bytes:
    return text.encode("utf-16-be", errors="surrogatepass")


def mostly_printable(text: str) -> bool:
    if text == "":
        return True
    bad = 0
    for ch in text:
        o = ord(ch)
        if ch in "\r\n\t":
            continue
        if o >= 0x20 and not (0xD800 <= o <= 0xDFFF):
            continue
        bad += 1
    return bad / max(len(text), 1) <= 0.02


def is_asciiish(text: str) -> bool:
    if text == "":
        return True
    if not ASCIIISH_RE.match(text):
        return False
    useful = sum(1 for ch in text if ch.isalnum() or ch in " _.$/@:-")
    return useful / max(len(text), 1) >= 0.60


def decode_bytes_best(data: bytes, prefer_utf8: bool = False) -> Tuple[str, str]:
    if prefer_utf8:
        try:
            text = data.decode("utf-8")
            return text, "utf-8"
        except UnicodeDecodeError:
            pass
    return data.decode("latin-1"), "latin-1"


def score_utf16_candidate(text: str) -> Tuple[int, int, int]:
    printable = sum(1 for ch in text if ch in "\r\n\t" or (ord(ch) >= 0x20 and not (0xD800 <= ord(ch) <= 0xDFFF)))
    asciiish = sum(1 for ch in text if ch.isascii() and (ch.isalnum() or ch in " _.$/@:-"))
    nuls = text.count("\x00")
    return printable, asciiish, -nuls


def decode_utf16_bytes_best(data: bytes) -> Tuple[Optional[str], str]:
    if len(data) % 2:
        return None, "odd-utf16-length"
    candidates = []
    for enc in ("utf-16-le", "utf-16-be"):
        try:
            text = data.decode(enc, errors="surrogatepass")
            candidates.append((score_utf16_candidate(text), text, enc))
        except UnicodeDecodeError:
            pass
    if not candidates:
        return None, "invalid-utf16"
    candidates.sort(reverse=True, key=lambda x: x[0])
    return candidates[0][1], candidates[0][2]
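

# Worked example of the packed form repaired below (matching the docstring):
# ART stores "main" as the raw bytes 6d 61 69 6e packed two per code unit,
# so the converted char[] holds the UTF-16 units \u616d and \u6e69. Each
# unit carries the low byte first ('m' in 0x616d), then the high byte ('a');
# a high byte of zero covers the trailing byte of an odd-length string.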
def decode_packed_char_payload(data: bytes, aggressive_unicode: bool) -> Tuple[Optional[str], str]:
    if len(data) % 2:
        return None, "odd-char-array-length"
    raw = bytearray()
    high_units = 0
    ascii_pair_units = 0
    for i in range(0, len(data), 2):
        v = (data[i] << 8) | data[i + 1]
        lo = v & 0xFF
        hi = (v >> 8) & 0xFF
        if v > 0x00FF:
            high_units += 1
        if 0x20 <= lo <= 0x7E and (hi == 0 or 0x20 <= hi <= 0x7E):
            ascii_pair_units += 1
        raw.append(lo)
        if hi != 0:
            raw.append(hi)
    if not raw:
        return "", "packed-empty"
    text, enc = decode_bytes_best(bytes(raw), prefer_utf8=True)
    # Conservative default: only patch obvious ASCII-ish packed strings.
    # This avoids corrupting legitimate non-ASCII UTF-16 char[] values such as Chinese text.
    if not aggressive_unicode:
        units = max(len(data) // 2, 1)
        if high_units / units < 0.60:
            return None, "char-array-not-obviously-packed"
        if ascii_pair_units / units < 0.80:
            return None, "packed-bytes-not-asciiish"
        if not is_asciiish(text):
            return None, "decoded-text-not-asciiish"
        if not mostly_printable(text):
            return None, "decoded-text-not-printable"
    return text, f"packed-char-array/{enc}"
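

# With JDK-9-style compact strings (which this script assumes for newer
# Android dumps as well), String.value is a byte[] and String.coder selects
# the encoding: 0 = LATIN1 (one byte per char), 1 = UTF16 (two bytes per
# char).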
def decode_string_value_array(
    array_type: int,
    count: int,
    payload: bytes,
    string_instances: List[StringInstance],
    aggressive_unicode: bool,
    force_byte_strings: bool,
) -> Tuple[Optional[str], str]:
    coder_values = {si.coder_value for si in string_instances if si.coder_value is not None}
    coder = next(iter(coder_values)) if len(coder_values) == 1 else None
    if array_type == TYPE_BYTE:
        if coder == 1:
            text, enc = decode_utf16_bytes_best(payload)
            if text is None:
                return None, enc
            if not aggressive_unicode and not mostly_printable(text):
                return None, "utf16-byte-string-not-printable"
            return text, f"byte-array/coder=1/{enc}"
        # Android/JDK compact strings with coder 0 are Latin-1, not UTF-8.
        text = payload.decode("latin-1")
        if not force_byte_strings and not aggressive_unicode:
            # Still conservative: skip very binary-looking strings.
            if not mostly_printable(text):
                return None, "latin1-byte-string-not-printable"
        return text, "byte-array/coder=0-or-unknown/latin-1"
    if array_type == TYPE_CHAR:
        return decode_packed_char_payload(payload, aggressive_unicode=aggressive_unicode)
    return None, f"unsupported-array-type-{array_type}"
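

# Planning pass: each qualifying String.value array is re-encoded as a plain
# UTF-16BE char[], and the owning String instances get count reset, offset
# zeroed, and (optionally) coder set to UTF16. Since the payload size can
# change, the byte delta is accumulated per enclosing heap record so
# write_output() can rewrite the u4 record lengths.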
def build_plans(
    path: str,
    header: Header,
    model: Model,
    by_object: Dict[int, StringInstance],
    by_value: Dict[int, List[int]],
    max_array_bytes: int,
    aggressive: bool,
    aggressive_unicode: bool,
    force_byte_strings: bool,
    set_coder_utf16: bool,
    verbose: bool = False,
) -> Plans:
    array_patches: Dict[int, ArrayPatch] = {}
    instance_patches: Dict[int, InstancePatch] = {}
    heap_deltas: Dict[int, int] = collections.defaultdict(int)
    skipped = collections.Counter()
    stats = collections.Counter()
    with open(path, "rb") as f:
        read_header(f)
        for record_start, tag, _time_delta, length, data_start in iter_top_records(f):
            data_end = data_start + length
            if tag in (TAG_HEAP_DUMP, TAG_HEAP_DUMP_SEGMENT):
                def handler(sub_start: int, sub_tag: int, end: int) -> None:
                    if sub_tag != HPROF_PRIMITIVE_ARRAY_DUMP:
                        skip_heap_subrecord_after_tag(f, sub_tag, header.id_size)
                        return
                    array_id = read_id(f, header.id_size)
                    f.seek(4, os.SEEK_CUR)
                    count = read_u4(f)
                    array_type = read_u1(f)
                    elem_size = primitive_size(array_type)
                    payload_len = count * elem_size
                    if array_id not in by_value:
                        f.seek(payload_len, os.SEEK_CUR)
                        return
                    if array_type not in (TYPE_BYTE, TYPE_CHAR):
                        skipped["string_value_not_byte_or_char_array"] += 1
                        f.seek(payload_len, os.SEEK_CUR)
                        return
                    if payload_len > max_array_bytes:
                        skipped["array_too_large"] += 1
                        f.seek(payload_len, os.SEEK_CUR)
                        return
                    payload = read_exact(f, payload_len)
                    string_objs = by_value[array_id]
                    sis = [by_object[obj_id] for obj_id in string_objs]
                    offsets = {si.offset_value for si in sis if si.offset_value is not None}
                    counts = {si.count_value for si in sis if si.count_value is not None}
                    if not aggressive:
                        if any(si.offset_value not in (None, 0) for si in sis):
                            skipped["nonzero_offset"] += 1
                            return
                        if len(string_objs) > 1 and (len(offsets) > 1 or len(counts) > 1):
                            skipped["shared_backing_array_ambiguous"] += 1
                            return
                    decoded, reason = decode_string_value_array(
                        array_type=array_type,
                        count=count,
                        payload=payload,
                        string_instances=sis,
                        aggressive_unicode=aggressive_unicode,
                        force_byte_strings=force_byte_strings,
                    )
                    if decoded is None:
                        skipped[reason] += 1
                        return
                    new_payload = encode_as_hprof_char_array(decoded)
                    new_count = len(new_payload) // 2
                    if array_type == TYPE_CHAR and new_payload == payload and new_count == count:
                        skipped["no_change_needed"] += 1
                        return
                    old_sub_len = 1 + header.id_size + 4 + 4 + 1 + payload_len
                    new_sub_len = 1 + header.id_size + 4 + 4 + 1 + len(new_payload)
                    delta = new_sub_len - old_sub_len
                    array_patches[array_id] = ArrayPatch(
                        array_id=array_id,
                        old_type=array_type,
                        old_count=count,
                        new_type=TYPE_CHAR,
                        new_count=new_count,
                        new_payload=new_payload,
                        decoded_preview=decoded[:120],
                        reason=reason,
                    )
                    heap_deltas[record_start] += delta
                    for si in sis:
                        replacements: Dict[int, bytes] = {}
                        if si.count_offset is not None:
                            replacements[si.count_offset] = pack_i4(new_count)
                        if si.offset_offset is not None:
                            replacements[si.offset_offset] = pack_i4(0)
                        if set_coder_utf16 and si.coder_offset is not None:
                            replacements[si.coder_offset] = pack_u1(1)
                        if replacements:
                            instance_patches[si.object_id] = InstancePatch(
                                object_id=si.object_id,
                                replacements=replacements,
                            )
                    stats["arrays_patched"] += 1
                    stats["strings_affected"] += len(sis)
                    if array_type == TYPE_BYTE:
                        stats["byte_arrays_patched"] += 1
                    elif array_type == TYPE_CHAR:
                        stats["char_arrays_patched"] += 1
                walk_heap_record(f, data_start, length, header.id_size, handler)
            f.seek(data_end)
    if verbose:
        print(f"Planned array patches: {len(array_patches):,}")
        print(f"Planned string instance patches: {len(instance_patches):,}")
        if skipped:
            print("Skipped:")
            for k, v in skipped.most_common():
                print(f" {k}: {v:,}")
        if array_patches:
            print("Examples:")
            for p in list(array_patches.values())[:10]:
                old_name = PRIMITIVE_ARRAY_NAMES.get(p.old_type, str(p.old_type))
                print(f" 0x{p.array_id:x}: {old_name}[{p.old_count}] -> char[{p.new_count}] "
                      f"({p.reason}) {p.decoded_preview!r}")
    return Plans(
        array_patches=array_patches,
        instance_patches=instance_patches,
        heap_deltas=dict(heap_deltas),
        skipped=skipped,
        stats=stats,
    )
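

# Output pass: records are streamed from input to output unchanged, except
# that patched primitive arrays and String instance dumps are re-emitted
# from the plan with their new payloads and field values.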
def patch_instance_data(data: bytes, patch: InstancePatch) -> bytes:
    out = bytearray(data)
    for off, replacement in patch.replacements.items():
        if off < 0 or off + len(replacement) > len(out):
            raise HprofError(f"Instance patch for object 0x{patch.object_id:x} is out of bounds")
        out[off:off + len(replacement)] = replacement
    return bytes(out)


def write_patched_heap_subrecord(
    inp: BinaryIO,
    out: BinaryIO,
    sub_tag: int,
    header: Header,
    plans: Plans,
) -> None:
    sub_start = inp.tell() - 1
    if sub_tag == HPROF_PRIMITIVE_ARRAY_DUMP:
        array_id = read_id(inp, header.id_size)
        stack_serial = read_u4(inp)
        count = read_u4(inp)
        array_type = read_u1(inp)
        payload_len = count * primitive_size(array_type)
        patch = plans.array_patches.get(array_id)
        if patch is None:
            inp.seek(sub_start)
            copy_exact(inp, out, 1 + header.id_size + 4 + 4 + 1 + payload_len)
            return
        inp.seek(payload_len, os.SEEK_CUR)
        out.write(pack_u1(HPROF_PRIMITIVE_ARRAY_DUMP))
        out.write(pack_id(array_id, header.id_size))
        out.write(pack_u4(stack_serial))
        out.write(pack_u4(patch.new_count))
        out.write(pack_u1(patch.new_type))
        out.write(patch.new_payload)
        return
    if sub_tag == HPROF_INSTANCE_DUMP:
        object_id = read_id(inp, header.id_size)
        stack_serial = read_u4(inp)
        class_id = read_id(inp, header.id_size)
        data_len = read_u4(inp)
        data = read_exact(inp, data_len)
        patch = plans.instance_patches.get(object_id)
        if patch is None:
            inp.seek(sub_start)
            copy_exact(inp, out, 1 + header.id_size + 4 + header.id_size + 4 + data_len)
            return
        patched_data = patch_instance_data(data, patch)
        out.write(pack_u1(HPROF_INSTANCE_DUMP))
        out.write(pack_id(object_id, header.id_size))
        out.write(pack_u4(stack_serial))
        out.write(pack_id(class_id, header.id_size))
        out.write(pack_u4(len(patched_data)))
        out.write(patched_data)
        return
    # Non-patched subrecord: parse once to know its original byte length, then copy.
    skip_heap_subrecord_after_tag(inp, sub_tag, header.id_size)
    sub_end = inp.tell()
    inp.seek(sub_start)
    copy_exact(inp, out, sub_end - sub_start)


def write_output(path_in: str, path_out: str, header: Header, plans: Plans, verbose: bool = False) -> None:
    tmp_out = path_out + ".tmp"
    with open(path_in, "rb") as inp, open(tmp_out, "wb") as out:
        actual_header = read_header(inp)
        if actual_header.raw != header.raw:
            raise HprofError("Header changed between passes")
        out.write(header.raw)
        for record_start, tag, time_delta, length, data_start in iter_top_records(inp):
            data_end = data_start + length
            if tag not in (TAG_HEAP_DUMP, TAG_HEAP_DUMP_SEGMENT):
                out.write(struct.pack(">BII", tag, time_delta, length))
                copy_exact(inp, out, length)
                inp.seek(data_end)
                continue
            new_length = length + plans.heap_deltas.get(record_start, 0)
            if new_length < 0 or new_length > 0xFFFFFFFF:
                raise HprofError(f"Patched heap record at 0x{record_start:x} has invalid length {new_length}")
            out.write(struct.pack(">BII", tag, time_delta, new_length))
            while inp.tell() < data_end:
                sub_tag = read_u1(inp)
                write_patched_heap_subrecord(inp, out, sub_tag, header, plans)
            if inp.tell() != data_end:
                raise HprofError(f"Heap output pass overrun: at 0x{inp.tell():x}, expected 0x{data_end:x}")
        out.flush()
    os.replace(tmp_out, path_out)
    if verbose:
        print(f"Wrote {path_out}")


def main(argv: Optional[List[str]] = None) -> int:
    ap = argparse.ArgumentParser(
        description="Patch Android/ART compact/packed java.lang.String values in a converted HPROF so MAT can display them.",
    )
    ap.add_argument("input", help="Input HPROF, normally the hprof-conv output")
    ap.add_argument("output", help="Output patched HPROF")
    ap.add_argument("--dry-run", action="store_true", help="Analyze and show planned changes without writing output")
    ap.add_argument("--verbose", "-v", action="store_true", help="Print parser details and patch examples")
    ap.add_argument("--max-array-bytes", type=int, default=1024 * 1024, help="Skip String.value arrays whose payload exceeds this many bytes; default: 1048576")
    ap.add_argument("--aggressive", action="store_true", help="Patch ambiguous shared String backing arrays and non-zero offsets")
    ap.add_argument("--aggressive-unicode", action="store_true", help="Try to repair non-ASCII packed char[] strings too; higher false-positive risk")
    ap.add_argument("--force-byte-strings", action="store_true", help="Patch byte[] String.value arrays even if they are not very printable")
    ap.add_argument("--no-set-coder-utf16", action="store_true", help="Do not set java.lang.String.coder to UTF16 when a coder field exists")
    args = ap.parse_args(argv)
    if not os.path.exists(args.input):
        print(f"Input does not exist: {args.input}", file=sys.stderr)
        return 2
    if os.path.abspath(args.input) == os.path.abspath(args.output):
        print("Refusing to overwrite the input in place. Use a separate output file.", file=sys.stderr)
        return 2
    try:
        header, model = pass1_model(args.input, verbose=args.verbose)
        by_object, by_value, stats2 = pass2_string_instances(args.input, header, model, verbose=args.verbose)
        plans = build_plans(
            path=args.input,
            header=header,
            model=model,
            by_object=by_object,
            by_value=by_value,
            max_array_bytes=args.max_array_bytes,
            aggressive=args.aggressive,
            aggressive_unicode=args.aggressive_unicode,
            force_byte_strings=args.force_byte_strings,
            set_coder_utf16=not args.no_set_coder_utf16,
            verbose=args.verbose or args.dry_run,
        )
        print("Summary:")
        print(f" java.lang.String instances: {stats2.get('string_instances', 0):,}")
        print(f" distinct backing arrays: {len(by_value):,}")
        print(f" arrays to patch: {len(plans.array_patches):,}")
        print(f" strings affected: {plans.stats.get('strings_affected', 0):,}")
        print(f" heap records resized: {len(plans.heap_deltas):,}")
        if args.dry_run:
            print("Dry run only; no output written.")
            return 0
        if not plans.array_patches and not plans.instance_patches:
            print("No patches planned. Writing a byte-for-byte copy of the input.")
            shutil.copyfile(args.input, args.output)
            return 0
        write_output(args.input, args.output, header, plans, verbose=True)
        return 0
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())