import mitmproxy.http from mitmproxy import ctx import lz4.block import binascii # NOTE: miitomo common key is '9ec1c78fa2cb34e2bed5691c08432f04' COMMON_KEY = "9ec1c78fa2cb34e2bed5691c08432f04" SESSION_ID_COOKIE_NAME = "player_session_id" def transform_common_key(s): """ Transforms the common key by subtracting 0x62 from each character and negating the result, similar to FUN_0004a120 in libsakasho.so. Args: s (str): Input common key string. Returns: bytes: Processed bytes. """ return bytes([(-0x62 - ord(c)) & 0xFF for c in s]) def build_xor_table(common_key, session_id): """ Build the XOR table by processing the common key and session ID and concatenating the results. Args: common_key (str): The common key string. session_id (str): The session ID string. Returns: bytes: The XOR table. """ transformed_common = transform_common_key(common_key) return transformed_common + session_id.encode('ascii') def conditional_xor(data, xor_table, encode=False): """ Apply the conditional XOR operation to the data using the XOR table. For each byte in the data: - If (key_byte & 7) == 0, XOR the data byte with the key byte. - Else, perform a bit rotation based on (key_byte & 7). Args: data (bytes): The obfuscated or plain data. xor_table (bytes): The XOR table. encode (bool): If True, perform encoding; if False, perform decoding. Returns: bytes: The data after applying the XOR operation. """ output = bytearray(len(data)) table_len = len(xor_table) for i in range(len(data)): key_byte = xor_table[(i + 1) % table_len] if (key_byte & 7) == 0: # Perform XOR output[i] = data[i] ^ key_byte else: # Perform bit rotation shift = key_byte & 7 if encode: rotated = ((data[i] << (8 - shift)) | (data[i] >> shift)) & 0xFF else: rotated = ((data[i] >> (8 - shift)) | (data[i] << shift)) & 0xFF output[i] = rotated return bytes(output) def decode_varint(data): """ Decode a varint from the beginning of the data. Args: data (bytes): The data containing the varint at the start. Returns: tuple: (decoded integer, number of bytes consumed) Raises: ValueError: If the varint is too long or incomplete. """ value = 0 shift = 0 for i, byte in enumerate(data): value |= (byte & 0x7F) << shift if (byte & 0x80) == 0: return value, i + 1 shift += 7 if shift >= 35: raise ValueError("Varint too long") raise ValueError("Incomplete varint") def encode_varint(value): """ Encode an integer into varint format. Args: value (int): The integer to encode. Returns: bytes: The varint-encoded bytes. """ parts = [] while True: byte = value & 0x7F value >>= 7 if value: parts.append(byte | 0x80) else: parts.append(byte) break return bytes(parts) class DenaObfuscation: def __init__(self): # Constant common key self.common_key = COMMON_KEY def get_session_id(self, flow: mitmproxy.http.HTTPFlow): """ Retrieve the session ID from cookies. It first checks the response cookies, then the request cookies. If not found, returns None. Args: flow (mitmproxy.http.HTTPFlow): The HTTP flow. Returns: str or None: The session ID if found, else None. """ # Check response cookies if flow.response: cookies = flow.response.cookies.get(SESSION_ID_COOKIE_NAME) if cookies: return cookies[0] if isinstance(cookies, tuple) else cookies # Check request cookies if flow.request: cookies = flow.request.cookies.get(SESSION_ID_COOKIE_NAME) if cookies: return cookies[0] if isinstance(cookies, tuple) else cookies return None def should_process_flow(self, flow: mitmproxy.http.HTTPFlow) -> bool: """ Determine whether to process the flow based on specific criteria. This function checks if the User-Agent contains "SakashoClient". Args: flow (mitmproxy.http.HTTPFlow): The HTTP flow. Returns: bool: True if the flow should be processed, otherwise False. """ # Check if User-Agent contains "SakashoClient" user_agent = flow.request.headers.get("User-Agent", "") if "SakashoClient" not in user_agent: return False # Skip /v1/session specifically if flow.request.path == "/v1/session": return False return True def request(self, flow: mitmproxy.http.HTTPFlow): """ Handle the HTTP request. Encode the request body if it starts with "{". """ session_id = self.get_session_id(flow) if not session_id or not self.should_process_flow(flow): return # Build XOR table xor_table = build_xor_table(self.common_key, session_id) if not xor_table: ctx.log.error("XOR table is empty. Skipping flow.") return # Store XOR table in flow metadata flow.metadata['xor_table'] = xor_table if flow.request.content and flow.request.content.startswith(b'{'): # If the request body starts with '{', encode it try: # Encode the content encoded_data = self.encode_content(flow.request.content, xor_table) # Replace the request content with encoded data flow.request.content = encoded_data ctx.log.info(f"Encoded request for flow {flow.id}") except Exception as e: ctx.log.error(f"Error encoding request for flow {flow.id}: {e}") def response(self, flow: mitmproxy.http.HTTPFlow): """ Handle the HTTP response. Decode the response body if applicable. """ session_id = self.get_session_id(flow) if not session_id or not self.should_process_flow(flow): return # Build XOR table xor_table = build_xor_table(self.common_key, session_id) if not xor_table: ctx.log.error("XOR table is empty. Skipping flow.") return # Store XOR table in flow metadata flow.metadata['xor_table'] = xor_table if flow.response.content: try: # Decode the response body data_after_xor = conditional_xor(flow.response.content, xor_table, encode=False) # Decode varint varint, varint_length = decode_varint(data_after_xor) # Extract compressed data compressed_data = data_after_xor[varint_length:] #print(data_after_xor) # Decompress using LZ4 decompressed_data = lz4.block.decompress(compressed_data, uncompressed_size=varint) # Replace the response content with decompressed data flow.response.content = decompressed_data flow.response.headers['Content-Type'] = 'application/json; charset=UTF-8' # Mark that the response has been decoded flow.metadata['decoded_response'] = { "varint": varint, "varint_length": varint_length, "compressed_data": compressed_data, "decompressed_data": decompressed_data } ctx.log.info(f"Decoded response for flow {flow.request.url} ({flow.id})") except Exception as e: ctx.log.warn(f"Error decoding response for flow {flow.request.url} ({flow.id}): {e}") if flow.request.content: # If the request body does not start with '{', decode it after response is done try: # Decode the request body data_after_xor = conditional_xor(flow.request.content, xor_table, encode=False) # Decode varint varint, varint_length = decode_varint(data_after_xor) # Extract compressed data compressed_data = data_after_xor[varint_length:] # Decompress using LZ4 decompressed_data = lz4.block.decompress(compressed_data, uncompressed_size=varint) # Replace the request content with decompressed data flow.request.content = decompressed_data ctx.log.info(f"Decoded request for flow {flow.id} after response done") except Exception as e: ctx.log.error(f"Error decoding request for flow {flow.id} after response done: {e}") def encode_content(self, content: bytes, xor_table: bytes): """ Encode the content using the provided XOR table. Args: content (bytes): The data to encode or decode. xor_table (bytes): The XOR table. encode (bool): Encoding or decoding. Returns: bytes: The encoded or decoded data. """ # Compress using LZ4 compressed_data = lz4.block.compress(content) # Encode varint (size of decompressed data) varint = encode_varint(len(content)) # Concatenate varint and compressed data data_to_encode = varint + compressed_data[4:] # Apply conditional XOR encoding encoded_data = conditional_xor(data_to_encode, xor_table, encode=True) return encoded_data def encode_request(self, flow: mitmproxy.http.HTTPFlow): """ Encode the modified request content before sending it to the server. """ if 'decoded_request' in flow.metadata: xor_table = flow.metadata['xor_table'] decoded_data = flow.request.content try: # Encode the content encoded_data = self.encode_content(decoded_data, xor_table) # Replace the request content with encoded data flow.request.content = encoded_data ctx.log.info(f"Encoded modified request for flow {flow.request.url} ({flow.id})") except Exception as e: ctx.log.error(f"Error encoding modified request for flow {flow.request.url} ({flow.id}): {e}") def encode_response(self, flow: mitmproxy.http.HTTPFlow): """ Encode the modified response content before sending it to the client. """ if 'decoded_response' in flow.metadata: xor_table = flow.metadata['xor_table'] decoded_data = flow.response.content try: # Encode the content encoded_data = self.encode_content(decoded_data, xor_table) # Replace the response content with encoded data flow.response.content = encoded_data ctx.log.info(f"Encoded modified response for flow {flow.request.url} ({flow.id})") except Exception as e: ctx.log.error(f"Error encoding modified response for flow {flow.request.url} ({flow.id})") addons = [ DenaObfuscation() ]