Created
June 19, 2025 08:10
-
-
Save PaulGG-Code/29df6c147834e4dc325eaf4183a6b58f to your computer and use it in GitHub Desktop.
BLE tool to scan, connect, pair, enumerate services, and fuzz characteristics
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import subprocess | |
| import platform | |
| import pexpect | |
| import csv | |
| import os | |
| import datetime | |
| import shutil | |
| from bleak import BleakClient, BleakScanner | |
| from bleak.backends.characteristic import BleakGATTCharacteristic | |
| from rich.console import Console | |
| from rich.table import Table | |
# Shared Rich console used for all user-facing output in this script.
console = Console()

# Address of the BLE device every menu action targets (placeholder; edit before use).
address = "<input_mac_here"

# Holds the BleakClient created by connect_device(); None until then.
connected_client = None

# NOTE(review): not referenced anywhere in the visible code.
notifications_log = dict()

# Human-readable names for well-known characteristics, keyed by the full
# lowercase 128-bit UUID. The table is spelled with the 16-bit short IDs and
# expanded with the common base-UUID suffix.
UUID_ALIASES = {
    f"0000{short_id}-0000-1000-8000-00805f9b34fb": label
    for short_id, label in (
        ("2a29", "Manufacturer Name"),
        ("2a25", "Serial Number"),
        ("2a27", "Hardware Revision"),
        ("2a24", "Model Number"),
        ("2a28", "Software Revision"),
        ("2a00", "Device Name"),
        ("2a01", "Appearance"),
        ("2a04", "Peripheral Preferred Connection Parameters"),
        ("2aa6", "Central Address Resolution"),
    )
}
def handle_notification(sender: BleakGATTCharacteristic, data: bytearray):
    """Print a received notification as hex, as text and, for 2-byte payloads, as a ushort.

    Args:
        sender: Characteristic that produced the notification.
        data: Raw payload delivered with the notification.
    """
    # Imported locally so this function stands on its own with the file's
    # existing top-level imports.
    from rich.markup import escape

    console.print(f"\nπ [bold yellow]Notification from[/] {sender.uuid} (Handle: {sender.handle}): {data.hex()}")

    # errors="ignore" drops undecodable bytes, so decoding itself cannot raise.
    ascii_val = data.decode(errors="ignore")
    # The payload comes from the remote device: escape it so any "[...]" in the
    # text is shown literally instead of being interpreted as Rich markup.
    console.print(f" ASCII: [green]{escape(ascii_val)}[/]")

    if len(data) == 2:
        # Two bytes are additionally shown as an unsigned little-endian integer.
        console.print(f" Parsed ushort: [cyan]{int.from_bytes(data, 'little')}")
async def pair_with_device(mac: str):
    """Pair with and trust a device by driving an interactive ``bluetoothctl`` session.

    Only runs on Linux; on any other platform a message is printed and the
    function returns. Failures are reported on the console, not raised.

    Args:
        mac: Device address passed to the ``pair`` and ``trust`` commands.
    """
    if platform.system() != "Linux":
        console.print("β οΈ [red]Auto pairing is only supported on Linux via bluetoothctl.")
        return

    console.print(f"π [blue]Automatically pairing with device[/] {mac} using bluetoothctl...")

    prompt = "[bluetoothctl]> "
    # (command, text that must appear before the prompt comes back)
    setup_steps = (
        ("agent on", "Agent registered"),
        ("default-agent", "Default agent request successful"),
        ("scan on", "Discovery started"),
    )

    child = None
    try:
        child = pexpect.spawn("bluetoothctl", timeout=30)
        child.expect_exact(prompt, timeout=10)

        child.sendline("power on")
        child.expect_exact(prompt, timeout=5)

        for command, confirmation in setup_steps:
            child.sendline(command)
            child.expect_exact(confirmation, timeout=5)
            child.expect_exact(prompt, timeout=5)

        child.sendline(f"pair {mac}")
        # Index 0 is the only success outcome; TIMEOUT is listed as a pattern
        # so a timeout is reported as a failed pairing instead of raising.
        outcome = child.expect([
            "Pairing successful", "Failed to pair", "AuthenticationCanceled",
            "Failed to connect", pexpect.TIMEOUT
        ], timeout=20)
        if outcome != 0:
            console.print("β [red]Pairing failed or timed out.")
            return
        console.print("β [green]Paired successfully.")

        child.sendline(f"trust {mac}")
        child.expect_exact("trust succeeded", timeout=5)
        child.expect_exact(prompt, timeout=5)
        console.print("β [green]Device trusted.")

        child.sendline("scan off")
        child.expect_exact("Discovery stopped", timeout=5)
        child.expect_exact(prompt, timeout=5)

        child.sendline("quit")
        child.close()
        console.print("π [green]Pairing process complete.")
    except Exception as e:
        console.print(f"β [red]Automated pairing failed: {e}")
    finally:
        # Make sure the bluetoothctl process is closed on every path, including
        # the failed-pairing early return and any exception above.
        if child is not None and not child.closed:
            try:
                child.close()
            except Exception as e:
                console.print(f"[yellow]Could not close bluetoothctl cleanly: {e}")
async def connect_device():
    """Scan briefly for the configured address, then connect to it.

    On success the connected client is stored in the module-level
    ``connected_client``. A connection attempt is made even when the scan did
    not see the device. Errors are printed, not raised.
    """
    global connected_client
    try:
        # Re-running this while connected used to overwrite the global and
        # orphan the live connection; drop the old link first.
        if connected_client is not None and connected_client.is_connected:
            await connected_client.disconnect()

        console.print("π [blue]Scanning for device...")
        devices = await BleakScanner.discover(timeout=5.0)
        wanted = address.lower()
        found = next((d for d in devices if d.address.lower() == wanted), None)
        if found:
            console.print(f"β [green]Found device:[/] {found.name}")
        else:
            console.print("β οΈ [yellow]Device not found, continuing anyway...")

        client = BleakClient(address, timeout=15)
        await client.connect()
        # Publish the client only once the connection is actually established.
        connected_client = client
        console.print(f"π [green]Connected to[/] {address}")
    except Exception as e:
        console.print(f"β [red]Failed to connect: {e}")
async def list_services():
    """Print every service/characteristic of the connected device and export them to CSV.

    For each characteristic that advertises ``read``, its value is read and
    shown as hex plus decoded text. One table is printed per service, and all
    rows are written to ``ble_services_<timestamp>.csv`` in the current
    directory. Does nothing except print a message when no device is connected.
    """
    # Imported locally so this function stands on its own with the file's
    # existing top-level imports.
    from rich.markup import escape

    if not connected_client or not connected_client.is_connected:
        console.print("β οΈ [red]Not connected.")
        return

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_file = f"ble_services_{timestamp}.csv"

    # Explicit UTF-8: decoded characteristic values may contain characters the
    # locale's default encoding cannot represent.
    with open(csv_file, mode="w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["Service UUID", "Char UUID", "Alias", "Handle (dec)", "Handle (hex)", "Properties", "Value"])

        for service in connected_client.services:
            console.rule(f"π§ Service UUID: {service.uuid}")
            table = Table(show_header=True, header_style="bold magenta")
            table.add_column("Char UUID", style="cyan")
            table.add_column("Alias", style="white")
            table.add_column("Handle (dec)")
            table.add_column("Handle (hex)", style="yellow")
            table.add_column("Properties")
            table.add_column("Value")

            for char in service.characteristics:
                props = ", ".join(char.properties)
                # Kept separate: the CSV gets plain data, the table gets
                # markup-safe text.
                csv_val = ""
                shown_val = ""
                if "read" in char.properties:
                    try:
                        # Pass the characteristic object, not its UUID, so the
                        # read is unambiguous when several characteristics
                        # share one UUID.
                        raw_val = await connected_client.read_gatt_char(char)
                    except Exception:
                        # Best-effort listing: a failed read must not abort the
                        # enumeration. (Cancellation is not an Exception
                        # subclass on Python 3.8+ and still propagates.)
                        csv_val = "Read error"
                        shown_val = "[red]Read error"
                    else:
                        csv_val = f"{raw_val.hex()} ({raw_val.decode(errors='ignore')})"
                        # Device-controlled text: show it literally rather than
                        # letting Rich parse it as markup.
                        shown_val = escape(csv_val)

                alias = UUID_ALIASES.get(char.uuid.lower(), "")
                table.add_row(char.uuid, alias, str(char.handle), hex(char.handle), props, shown_val)
                writer.writerow([service.uuid, char.uuid, alias, char.handle, hex(char.handle), props, csv_val])

            console.print(table)

    console.print(f"πΎ [green]Exported to {csv_file}")
async def subscribe_notifications():
    """Subscribe to every notify/indicate characteristic until the user presses ENTER.

    Incoming notifications are handled by ``handle_notification``. When ENTER
    is pressed, every characteristic subscribed here is unsubscribed again.
    Does nothing except print a message when no device is connected.
    """
    if not connected_client or not connected_client.is_connected:
        console.print("β οΈ [red]Not connected.")
        return

    # Characteristic objects (not UUID strings) are kept so start/stop address
    # the exact characteristic even if several share one UUID.
    subscribed = []
    for service in connected_client.services:
        for char in service.characteristics:
            if "notify" not in char.properties and "indicate" not in char.properties:
                continue
            try:
                await connected_client.start_notify(char, handle_notification)
            except Exception as e:
                console.print(f"β [red]Could not subscribe to {char.uuid}: {e}")
            else:
                subscribed.append(char)
                console.print(f"β [green]Subscribed to[/] {char.uuid}")

    console.print("β³ [cyan]Listening for notifications. Press ENTER to stop...")
    # Wait for ENTER in a worker thread so the event loop keeps running while
    # waiting.
    await asyncio.get_running_loop().run_in_executor(None, input)

    for char in subscribed:
        try:
            await connected_client.stop_notify(char)
        except Exception as e:
            # Keep unsubscribing the rest, but do not hide the failure.
            console.print(f"[yellow]Could not unsubscribe from {char.uuid}: {e}")
        else:
            console.print(f"π [green]Unsubscribed from {char.uuid}")
async def fuzz_characteristics():
    """Write a fixed set of test payloads to every writable characteristic.

    A characteristic is targeted when it advertises ``write`` or
    ``write-without-response``. After each write, readable characteristics are
    read back and the new value is printed. A failure on one payload is
    reported and the run continues, unless the device is no longer connected.
    """
    # Imported locally so this function stands on its own with the file's
    # existing top-level imports.
    from rich.markup import escape

    if not connected_client or not connected_client.is_connected:
        console.print("β οΈ [red]Not connected.")
        return

    console.print("β οΈ [yellow]Fuzzing characteristics β sending dummy data to writable characteristics.")
    test_payloads = [b"\x00", b"\x01\x02", b"\xff" * 10, b"A" * 20]

    for service in connected_client.services:
        for char in service.characteristics:
            writable = "write" in char.properties or "write-without-response" in char.properties
            if not writable:
                continue
            for payload in test_payloads:
                try:
                    # The characteristic object is passed instead of its UUID
                    # so the call is unambiguous when UUIDs are duplicated.
                    await connected_client.write_gatt_char(char, payload)
                    console.print(f"π§ͺ Sent {payload.hex()} to {char.uuid}")
                    if "read" in char.properties:
                        new_val = await connected_client.read_gatt_char(char)
                        # Device-controlled text is escaped so Rich prints it
                        # literally.
                        decoded = escape(new_val.decode(errors="ignore"))
                        console.print(f" π New value: {new_val.hex()} ({decoded})")
                except Exception as e:
                    console.print(f"β [red]Failed to fuzz {char.uuid}: {e}")
                    if not connected_client.is_connected:
                        # Every further write would fail the same way.
                        console.print("[red]Device disconnected; stopping fuzzing.")
                        return
async def menu():
    """Run the interactive menu loop until the user chooses Exit.

    Options 1-5 dispatch to the pairing, connect, listing, notification and
    fuzzing coroutines; option 6 only prints an informational message. On exit
    (option 7, or end-of-input on stdin) an active connection is shut down
    before returning.
    """
    menu_lines = (
        "\n[bold blue]--- BLE Menu ---",
        "1. π Pair & Trust Device",
        "2. π Connect to Device",
        "3. π List Services and Characteristics",
        "4. π¬ Listen for Notifications",
        "5. π§ͺ Fuzz Characteristics (write)",
        "6. π΅οΈ Reverse Notification Payloads (built-in decoding)",
        "7. β Exit",
    )
    # Choice -> zero-argument callable returning the coroutine to await.
    actions = {
        "1": lambda: pair_with_device(address),
        "2": connect_device,
        "3": list_services,
        "4": subscribe_notifications,
        "5": fuzz_characteristics,
    }
    loop = asyncio.get_running_loop()

    while True:
        for line in menu_lines:
            console.print(line)

        try:
            # Read the choice in a worker thread: a direct input() call would
            # block the event loop for as long as the prompt is open.
            raw_choice = await loop.run_in_executor(None, input, "Select an option: ")
        except EOFError:
            # stdin closed: treat as Exit so the device is still disconnected.
            raw_choice = "7"
        choice = raw_choice.strip()

        action = actions.get(choice)
        if action is not None:
            await action()
        elif choice == "6":
            console.print("[cyan]βΉοΈ Notification payloads will be interpreted in ASCII + ushort when received.")
        elif choice == "7":
            if connected_client and connected_client.is_connected:
                try:
                    for service in connected_client.services:
                        for char in service.characteristics:
                            if "notify" in char.properties or "indicate" in char.properties:
                                try:
                                    await connected_client.stop_notify(char)
                                except Exception:
                                    # Best-effort: a characteristic may simply
                                    # not be subscribed at this point.
                                    continue
                    await connected_client.disconnect()
                    console.print("π [green]Disconnected successfully.")
                except Exception as e:
                    # Still exit, but do not hide the failure.
                    console.print(f"[red]Disconnect failed: {e}")
            console.print("π [bold]Goodbye.")
            break
        else:
            console.print("β [red]Invalid choice. Please enter a number between 1 and 7.")
# Start the interactive menu only when executed as a script, so importing this
# module (e.g. to reuse its helpers) does not launch the prompt.
if __name__ == "__main__":
    asyncio.run(menu())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment