Skip to content

Instantly share code, notes, and snippets.

@t-wy
Last active March 5, 2026 08:20
Show Gist options
  • Select an option

  • Save t-wy/778123fedd15513e4626ad27f07cb690 to your computer and use it in GitHub Desktop.

Select an option

Save t-wy/778123fedd15513e4626ad27f07cb690 to your computer and use it in GitHub Desktop.
MessagePack-CSharp unpacker for Python
def messagepack_csharp_unpack(data: bytes) -> list:
    """
    Redistribution Notice:
    Properly attribute all entities listed below and request others to follow the same.
    Otherwise, DO NOT remove or modify this comment.
    Specification (MessagePack for C#):
    https://github.com/MessagePack-CSharp/MessagePack-CSharp
    Dependencies:
    msgpack: https://github.com/msgpack/msgpack-python
    lz4: https://github.com/python-lz4/python-lz4
    Implementation:
    @t-wy: https://github.com/t-wy
    """
    # Unpacks every top-level MessagePack object found in `data` and returns
    # them as a list. Extension types 99 and 98 carry LZ4-compressed payloads
    # and are expanded on the fly; any other extension code reaching the hook
    # raises ValueError.
    from msgpack import Unpacker, unpackb

    def LZ4_decompress(size: int, src: bytes) -> bytes:
        # Imported here so lz4 is only needed once compressed data shows up.
        from lz4.block import decompress

        return decompress(src, uncompressed_size=size)

    def decode_extension(code, data):
        if code == 98:
            # Body is a sequence of integers: the length of each part.
            # Returned as a plain tuple so the list hook can recognise it.
            sizes = Unpacker(None, max_buffer_size=0)
            sizes.feed(data)
            return tuple(sizes)
        if code == 99:
            # Body is one MessagePack integer (the uncompressed size)
            # followed directly by the LZ4 block.
            header = Unpacker(None, max_buffer_size=0, strict_map_key=False)
            header.feed(data)
            # unpack() has to run before tell() so tell() points past the integer.
            uncompressed_size = header.unpack()
            compressed = data[header.tell():]
            # strict_map_key=False: integers may be used as map keys.
            return unpackb(
                LZ4_decompress(uncompressed_size, compressed),
                strict_map_key=False,
            )
        raise ValueError

    def merge_block_array(lst):
        # An array whose first element is the tuple produced for ext 98 is a
        # header followed by compressed parts: decompress each part with its
        # recorded size, concatenate, and unpack the result.
        # The exact-type check is deliberate; tuple subclasses must not match.
        if lst and type(lst[0]) is tuple:
            parts = [
                LZ4_decompress(size, part)
                for size, part in zip(lst[0], lst[1:])
            ]
            return unpackb(b"".join(parts), strict_map_key=False)
        return lst

    stream = Unpacker(
        None,
        ext_hook=decode_extension,
        list_hook=merge_block_array,
        max_buffer_size=0,
        strict_map_key=False,
    )
    stream.feed(data)
    return list(stream)
# an example implementation of treating the msgpack object as a dataclass
from typing import List, Union, get_origin, get_args
from dataclasses import dataclass, is_dataclass
from enum import Enum
def _unwrap_optional(annotation):
    """Return ``X`` for ``Optional[X]`` / ``X | None``, else the annotation unchanged."""
    import types

    origin = get_origin(annotation)
    # ``X | None`` reports types.UnionType as its origin on interpreters that
    # define it; defaulting to Union keeps the comparison harmless elsewhere.
    if origin is Union or origin is getattr(types, "UnionType", Union):
        members = get_args(annotation)
        if len(members) == 2 and type(None) in members:
            return next(m for m in members if m is not type(None))
    return annotation


def _field_annotations(target):
    """Return ``(name, annotation)`` pairs describing the fields of *target*."""
    from dataclasses import fields

    if is_dataclass(target):
        # fields() also reports fields declared on base dataclasses.
        return [(f.name, f.type) for f in fields(target)]
    return list(target.__annotations__.items())


def _dataclass_converter(cls):
    """Build a callable turning a positional value sequence into a *cls* instance."""

    def convert(raw):
        # A value that is already an instance was converted earlier (for
        # example by a nested __post_init__); expanding it with * would fail.
        if isinstance(raw, cls):
            return raw
        return conv_dataclass(cls(*raw))

    return convert


def conv_dataclass(target):
    """Convert raw field values of *target* in place according to its annotations.

    For each annotated field whose value is not None:

    * an ``Enum`` subclass annotation converts the value with ``cls(value)``;
    * a dataclass annotation converts a sequence of positional values with
      ``cls(*value)`` and then recurses into the new instance;
    * ``List[X]`` applies the conversion for ``X`` to every element.

    ``Optional[...]`` is unwrapped both around the whole annotation and around
    the list element type. None elements inside a list are kept as None, and
    values that are already instances of the target dataclass are left alone,
    so calling this function repeatedly on the same object is safe.
    Fields with any other annotation are not touched.

    Args:
        target: The object (normally a dataclass instance) to convert.

    Returns:
        The same *target* object, after conversion.
    """
    for field, cls in _field_annotations(target):
        if cls is None:
            continue
        value = getattr(target, field)
        if value is None:
            continue

        cls = _unwrap_optional(cls)
        is_list = get_origin(cls) is list
        if is_list:
            item_types = get_args(cls)
            if not item_types:
                # Bare ``List``: no element type, nothing to convert.
                continue
            cls = _unwrap_optional(item_types[0])

        # Only plain classes can be Enum subclasses or dataclasses; skip
        # parameterised generics and unresolved string annotations.
        if get_origin(cls) is not None or not isinstance(cls, type):
            continue

        if issubclass(cls, Enum):  # or cls is DateTime: # handle other data types
            convert = cls
        elif is_dataclass(cls):
            convert = _dataclass_converter(cls)
        else:
            continue

        if is_list:
            converted = [None if item is None else convert(item) for item in value]
        else:
            converted = convert(value)
        # object.__setattr__ bypasses any __setattr__ defined by the class
        # (as frozen dataclasses do).
        object.__setattr__(target, field, converted)
    return target
def messagepack_object(cls):
    """Class decorator: make *cls* a dataclass that converts its fields on creation.

    A ``__post_init__`` hook is installed that runs ``conv_dataclass`` on each
    new instance, then *cls* is passed through ``dataclass``. A
    ``__post_init__`` defined directly on *cls* is kept: it runs after the
    conversion, so it sees the converted field values.

    Args:
        cls: The class to decorate.

    Returns:
        The result of ``dataclass(cls)``.
    """
    # Look only at the class's own namespace: a hook inherited from a base
    # class may itself be one installed by this decorator, and chaining it
    # would run the conversion twice.
    user_post_init = cls.__dict__.get("__post_init__")

    def __post_init__(self, *init_args):
        conv_dataclass(self)
        if user_post_init is not None:
            # Forward whatever positional arguments the generated __init__
            # passed to the hook.
            user_post_init(self, *init_args)

    cls.__post_init__ = __post_init__
    return dataclass(cls)
@dataclass
class InnerClass:
    """Example nested record with two integer fields, filled positionally."""

    f1: int  # first positional value
    f2: int  # second positional value
@messagepack_object
class OuterClass:
    """Example top-level record whose fields are given in positional order."""

    a: int
    b: list  # unparameterised list: elements are kept as they are
    c: List[InnerClass]  # each element is a list of InnerClass field values
    d: str
# Positional values in field order: a, b, c, d.
obj = [1, [2, 3], [[4, 5], [6, 7]], "8"]
print(OuterClass(*obj))
# output
# OuterClass(a=1, b=[2, 3], c=[InnerClass(f1=4, f2=5), InnerClass(f1=6, f2=7)], d='8')
# an example implementation of adding keys back for indexed key
def unnamed_dict(x):
    """Map each element of *x* to its position: ``{0: x[0], 1: x[1], ...}``."""
    return dict(enumerate(x))


def list_of(_type):
    """Return a converter that applies *_type* to every element of a sequence."""

    def convert_all(x):
        return [call_or_convert(_type, item) for item in x]

    return convert_all


def nullable(_type):
    """Return a converter that passes None through and applies *_type* otherwise."""

    def convert_unless_none(x):
        if x is None:
            return None
        return call_or_convert(_type, x)

    return convert_unless_none


def identity(x):
    """Return *x* unchanged."""
    return x


def call_or_convert(struct, value):
    """Apply *struct* to *value*.

    A callable *struct* is called with *value*; anything else is treated as a
    key layout and handed to ``add_keys``.
    """
    if callable(struct):
        return struct(value)
    return add_keys(struct, value)


def add_keys(struct_dict, result):
    """Pair the keys of *struct_dict* with the positional values in *result*.

    Keys and values are matched in order, and each value is converted with the
    entry stored under its key. Keys whose entry is ``...`` mark gaps: their
    position is consumed but they are left out of the returned dict.
    Pairing stops at the shorter of the two inputs.
    """
    keyed = {}
    for key, value in zip(struct_dict, result):
        struct = struct_dict[key]
        if struct is ...:  # for gaps, use ...
            continue
        keyed[key] = call_or_convert(struct, value)
    return keyed
# Same positional payload as before; the layout supplies the key names.
obj = [1, [2, 3], [[4, 5], [6, 7]], "8"]
print(
    add_keys(
        {
            "a": int,
            "b": list,
            # every element of "c" is itself a positional record
            "c": list_of({"f1": int, "f2": int}),
            "d": str,
        },
        obj,
    )
)
# output
# {'a': 1, 'b': [2, 3], 'c': [{'f1': 4, 'f2': 5}, {'f1': 6, 'f2': 7}], 'd': '8'}
def LZ4_decompress(size: int, src: bytes) -> bytes:
    """Decompress a raw LZ4 block in pure Python.

    Polyfill if python-lz4 is not available, implemented by @t-wy.

    The block is read as a series of sequences. Each starts with a token byte
    whose high nibble is the literal length and whose low nibble is the match
    length minus 4. A nibble value of 15 is extended by following bytes, each
    added to the length, until a byte other than 255 is read. After the
    literals comes a 2-byte little-endian match offset, counted backwards from
    the end of the output produced so far. The final sequence ends after its
    literals.

    Args:
        size: Expected uncompressed size. Accepted for signature compatibility
            with the lz4-backed variant; this implementation does not use it.
        src: The compressed block.

    Returns:
        The decompressed bytes.

    Raises:
        ValueError: If a match offset is zero or points before the start of
            the output.
        IndexError: If *src* ends in the middle of a length or offset field.
    """

    def read_length(base: int, pos: int):
        # Extend a saturated 4-bit length; returns (length, next position).
        while True:
            extra = src[pos]
            pos += 1
            base += extra
            if extra != 255:
                return base, pos

    src_len = len(src)
    # bytearray grows in place; repeated bytes concatenation would copy the
    # whole output for every sequence.
    out = bytearray()
    pos = 0
    while pos < src_len:
        token = src[pos]
        pos += 1

        # Literal run: bytes copied straight from the input.
        literal_len = token >> 4
        if literal_len == 15:
            literal_len, pos = read_length(literal_len, pos)
        out += src[pos:pos + literal_len]
        pos += literal_len
        if pos >= src_len:
            # The last sequence carries literals only.
            break

        # Match: bytes copied from earlier output.
        distance = src[pos] | (src[pos + 1] << 8)
        pos += 2
        match_len = token & 15
        if match_len == 15:
            match_len, pos = read_length(match_len, pos)
        match_len += 4  # the stored value is the match length minus 4

        if distance == 0 or distance > len(out):
            raise ValueError(
                f"invalid LZ4 match offset {distance} with {len(out)} bytes of output"
            )
        start = len(out) - distance
        if match_len <= distance:
            # The match lies entirely inside existing output.
            out += out[start:start + match_len]
        else:
            # The match overlaps the bytes being written: the last `distance`
            # bytes repeat until match_len bytes have been produced.
            repeats = match_len // distance + 1
            out += (out[start:] * repeats)[:match_len]
    return bytes(out)
@t-wy
Copy link
Author

t-wy commented Mar 4, 2026

If you only need a payload that MessagePack-CSharp can parse, I suppose it would be sufficient to take only the values from each dict (in the correct key order) and pack them normally using msgpack; the LZ4 compression is optional and only serves to further decrease the size, if I understand correctly.
(I'm not sure, though; they only state the interchangeability of Lz4Block and Lz4BlockArray: (Doc)

Regardless of which LZ4 option is set at the deserialization, both methods can be deserialized.
For example, when the Lz4BlockArray option was used, binary data using either Lz4Block and Lz4BlockArray can be deserialized.
Neither can be decompressed and hence deserialized when the compression option is set to None.

)

Of course, the schema options should be checked carefully to see if it accepts int key or string key before use: https://github.com/MessagePack-CSharp/MessagePack-CSharp?tab=readme-ov-file#use-indexed-keys-instead-of-string-keys-contractless

@david8557
Copy link

Unfortunately, that won't work; it results in a deserialization error.

@t-wy
Copy link
Author

t-wy commented Mar 5, 2026

Then the extension format has to be followed instead (check the schema to see which class has compression enabled) (Spec):

For type 99 the simple msgpack supports the ExtType(99, data) syntax so the flow of serializing that object is:

  1. Serialize that object alone normally
  2. Record this uncompressed length and then apply lz4 compression to the payload
  3. data should be the length value serialized + the lz4 compression result

Example code: (Notice store_size has to be False to match the format)

def LZ4_compress(src: bytes) -> bytes:
    """Compress *src* into an LZ4 block with no size header prepended."""
    import lz4.block

    # store_size=False: the uncompressed length is recorded separately by the caller.
    return lz4.block.compress(src, store_size=False)

def compressed_pack(entry) -> bytes:
    """Pack *entry* as a MessagePack extension of type 99 holding LZ4 data.

    The extension body is the packed length of the uncompressed MessagePack
    payload followed by the LZ4-compressed payload.
    """
    from msgpack import ExtType, packb

    plain = packb(entry)
    size_prefix = packb(len(plain))
    body = size_prefix + LZ4_compress(plain)
    return packb(ExtType(99, body))

payload = [
    [1, 2, 3],  # the object's values (without keys, i.e. attribute names)
    [4, 5, 6],
]

# serialization without compression
from msgpack import packb
result1 = b"".join(packb(entry) for entry in payload)
print(result1)  # b'\x93\x01\x02\x03\x93\x04\x05\x06'
# Round-trip through the unpacker defined in this gist.
print(messagepack_csharp_unpack(result1))  # [[1, 2, 3], [4, 5, 6]]

# serialization with compression
result2 = b"".join(compressed_pack(entry) for entry in payload)
print(result2)  # b'\xc7\x06c\x04@\x93\x01\x02\x03\xc7\x06c\x04@\x93\x04\x05\x06'
print(messagepack_csharp_unpack(result2))  # [[1, 2, 3], [4, 5, 6]]

Check if that works. (I did not set up a C# environment to test the serialization since I only needed the deserialized data, so I'm not sure)

Note that I use return list(unpacker) only because the data I handle are usually object arrays; adjust that if the payload is a single object.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment