Created
October 21, 2024 14:34
-
-
Save luca-m/ac1d604f2da6aace22afc36bc81a6295 to your computer and use it in GitHub Desktop.
Revisions
-
luca-m created this gist
Oct 21, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,262 @@ import functools import json import idaapi import ida_hexrays import ida_kernwin import idc import os import re import textwrap import threading import requests # ============================================================================= # Setup the context menu and hotkey in IDA # ============================================================================= OPENAI_API_KEY="" OPENAI_MODEL="gpt-4o-mini" class GepettoPlugin(idaapi.plugin_t): flags = 0 explain_action_name = "gepetto:explain_function" explain_menu_path = "Edit/Gepetto/Explain function" rename_action_name = "gepetto:rename_function" rename_menu_path = "Edit/Gepetto/Rename variables" wanted_name = 'Gepetto' wanted_hotkey = '' comment = "Uses gpt-4 to enrich the decompiler's output" help = "See usage instructions on GitHub" menu = None def init(self): # Check whether the decompiler is available if not ida_hexrays.init_hexrays_plugin(): return idaapi.PLUGIN_SKIP # Function explaining action explain_action = idaapi.action_desc_t(self.explain_action_name, 'Explain function', ExplainHandler(), "Ctrl+Alt+G", 'Use gpt-4 to explain the currently selected function', 199) idaapi.register_action(explain_action) idaapi.attach_action_to_menu(self.explain_menu_path, self.explain_action_name, idaapi.SETMENU_APP) # Variable renaming action rename_action = idaapi.action_desc_t(self.rename_action_name, 'Rename variables', RenameHandler(), "Ctrl+Alt+R", "Use gpt-4 to rename this function's variables", 199) idaapi.register_action(rename_action) idaapi.attach_action_to_menu(self.rename_menu_path, self.rename_action_name, idaapi.SETMENU_APP) # Register context menu actions self.menu = ContextMenuHooks() self.menu.hook() return idaapi.PLUGIN_KEEP def run(self, arg): pass def term(self): idaapi.detach_action_from_menu(self.explain_menu_path, self.explain_action_name) idaapi.detach_action_from_menu(self.rename_menu_path, self.rename_action_name) if self.menu: self.menu.unhook() return # ----------------------------------------------------------------------------- class ContextMenuHooks(idaapi.UI_Hooks): def finish_populating_widget_popup(self, form, popup): # Add actions to the context menu of the Pseudocode view if idaapi.get_widget_type(form) == idaapi.BWN_PSEUDOCODE: idaapi.attach_action_to_popup(form, popup, GepettoPlugin.explain_action_name, "Gepetto/") idaapi.attach_action_to_popup(form, popup, GepettoPlugin.rename_action_name, "Gepetto/") # ----------------------------------------------------------------------------- def comment_callback(address, view, response): """ Callback that sets a comment at the given address. :param address: The address of the function to comment :param view: A handle to the decompiler window :param response: The comment to add """ # Add newlines at the end of each sentence. response = "\n".join(textwrap.wrap(response, 80, replace_whitespace=False)) # Add the response as a comment in IDA. idc.set_func_cmt(address, response, 0) # Refresh the window so the comment is displayed properly if view: view.refresh_view(False) print("Query to model finished!") # ----------------------------------------------------------------------------- class ExplainHandler(idaapi.action_handler_t): """ This handler is tasked with querying the model for an explanation of the given function. Once the reply is received, it is added as a function comment. """ def __init__(self): idaapi.action_handler_t.__init__(self) def activate(self, ctx): decompiler_output = ida_hexrays.decompile(idaapi.get_screen_ea()) v = ida_hexrays.get_widget_vdui(ctx.widget) query_model_async("Can you explain what the following C function does and suggest a better name for it?\n" + str(decompiler_output), functools.partial(comment_callback, address=idaapi.get_screen_ea(), view=v)) return 1 # This action is always available. def update(self, ctx): return idaapi.AST_ENABLE_ALWAYS # ----------------------------------------------------------------------------- def rename_callback(address, view, response): """ Callback that extracts a JSON array of old names and new names from the response and sets them in the pseudocode. :param address: The address of the function to work on :param view: A handle to the decompiler window :param response: The response from the model """ j = re.search(r"\{[^}]*?\}", response) if not j: print(f"Cannot extract valid JSON from the response. Asking the model to fix it...") query_model_async("The JSON document provided in this response is invalid. Can you fix it?\n" + response, functools.partial(rename_callback, address=idaapi.get_screen_ea(), view=view)) return try: names = json.loads(j.group(0)) except json.decoder.JSONDecodeError: print(f"The JSON document returned is invalid. Asking the model to fix it...") query_model_async("Please fix the following JSON document:\n" + j.group(0), functools.partial(rename_callback, address=idaapi.get_screen_ea(), view=view)) return # The rename function needs the start address of the function function_addr = idaapi.get_func(address).start_ea replaced = [] for n in names: if ida_hexrays.rename_lvar(function_addr, n, names[n]): replaced.append(n) # Update possible names left in the function comment comment = idc.get_func_cmt(address, 0) if comment and len(replaced) > 0: for n in replaced: comment = re.sub(r'\b%s\b' % n, names[n], comment) idc.set_func_cmt(address, comment, 0) # Refresh the window to show the new names if view: view.refresh_view(True) print(f"Query finished! {len(replaced)} variable(s) renamed.") # ----------------------------------------------------------------------------- class RenameHandler(idaapi.action_handler_t): """ This handler requests new variable names from the model and updates the decompiler's output. """ def __init__(self): idaapi.action_handler_t.__init__(self) def activate(self, ctx): decompiler_output = ida_hexrays.decompile(idaapi.get_screen_ea()) v = ida_hexrays.get_widget_vdui(ctx.widget) query_model_async("Analyze the following C function:\n" + str(decompiler_output) + "\nSuggest better variable names, reply with a JSON array where keys are the original names" "and values are the proposed names. Do not explain anything, only print the JSON " "dictionary.", functools.partial(rename_callback, address=idaapi.get_screen_ea(), view=v)) return 1 # This action is always available. def update(self, ctx): return idaapi.AST_ENABLE_ALWAYS # ============================================================================= # Model interaction # ============================================================================= def query_model(query, cb, max_tokens=6500): """ Function which sends a query to the model API and calls a callback when the response is available. Blocks until the response is received. :param query: The request to send to the model API. :param cb: The function to which the response will be passed. """ api_key = OPENAI_API_KEY # os.getenv("OPENAI_API_KEY") # Assume API key is stored as environment variable if not api_key: print("Error: API key not found.") return headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } data = { "model": OPENAI_MODEL, # Replace with the model you want to use "messages": [ #{"role": "system", "content": "You are a reverse engineer."}, # System role message {"role": "user", "content": query} # User's query ], #"prompt": query, "max_tokens": max_tokens, "temperature": 0.6, "top_p": 1, "frequency_penalty": 1, "presence_penalty": 1, } try: response = requests.post( "https://api.openai.com/v1/chat/completions", # Adjust this to your API endpoint headers=headers, json=data, timeout=60 ) if response.status_code == 200: response_json = response.json() result = response_json['choices'][0]['message']['content'] # Correct extraction # Extract the completion text ida_kernwin.execute_sync(functools.partial(cb, response=result), ida_kernwin.MFF_WRITE) else: print(f"API request failed with status code {response.status_code}: {response.text}") except requests.exceptions.RequestException as e: print(f"Error during the API request: {e}") # ----------------------------------------------------------------------------- def query_model_async(query, cb): """ Function which sends a query to the model API asynchronously. :param query: The request to send to the model API. :param cb: The function to which the response will be passed. """ print("Request sent to model API asynchronously...") t = threading.Thread(target=query_model, args=[query, cb]) t.start() # ============================================================================= # Main # ============================================================================= def PLUGIN_ENTRY(): return GepettoPlugin()