Skip to content

Instantly share code, notes, and snippets.

@PaulGG-Code
Created June 19, 2025 08:10
Show Gist options
  • Select an option

  • Save PaulGG-Code/29df6c147834e4dc325eaf4183a6b58f to your computer and use it in GitHub Desktop.

Select an option

Save PaulGG-Code/29df6c147834e4dc325eaf4183a6b58f to your computer and use it in GitHub Desktop.
Scan, connect, pair, enumerate, fuzz ble tool
import asyncio
import subprocess
import platform
import pexpect
import csv
import os
import datetime
import shutil
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from rich.console import Console
from rich.table import Table
address = "<input_mac_here"
notifications_log = {}
connected_client = None
console = Console()
UUID_ALIASES = {
"00002a29-0000-1000-8000-00805f9b34fb": "Manufacturer Name",
"00002a25-0000-1000-8000-00805f9b34fb": "Serial Number",
"00002a27-0000-1000-8000-00805f9b34fb": "Hardware Revision",
"00002a24-0000-1000-8000-00805f9b34fb": "Model Number",
"00002a28-0000-1000-8000-00805f9b34fb": "Software Revision",
"00002a00-0000-1000-8000-00805f9b34fb": "Device Name",
"00002a01-0000-1000-8000-00805f9b34fb": "Appearance",
"00002a04-0000-1000-8000-00805f9b34fb": "Peripheral Preferred Connection Parameters",
"00002aa6-0000-1000-8000-00805f9b34fb": "Central Address Resolution",
}
def handle_notification(sender: BleakGATTCharacteristic, data: bytearray):
console.print(f"\nπŸ”” [bold yellow]Notification from[/] {sender.uuid} (Handle: {sender.handle}): {data.hex()}")
try:
ascii_val = data.decode(errors="ignore")
console.print(f" ASCII: [green]{ascii_val}[/]")
except:
pass
try:
if len(data) == 2:
import struct
console.print(f" Parsed ushort: [cyan]{struct.unpack('<H', data)[0]}")
except:
pass
async def pair_with_device(mac: str):
if platform.system() != "Linux":
console.print("⚠️ [red]Auto pairing is only supported on Linux via bluetoothctl.")
return
console.print(f"πŸ” [blue]Automatically pairing with device[/] {mac} using bluetoothctl...")
try:
child = pexpect.spawn("bluetoothctl", timeout=30)
child.expect_exact("[bluetoothctl]> ", timeout=10)
child.sendline("power on")
child.expect_exact("[bluetoothctl]> ", timeout=5)
child.sendline("agent on")
child.expect_exact("Agent registered", timeout=5)
child.expect_exact("[bluetoothctl]> ", timeout=5)
child.sendline("default-agent")
child.expect_exact("Default agent request successful", timeout=5)
child.expect_exact("[bluetoothctl]> ", timeout=5)
child.sendline("scan on")
child.expect_exact("Discovery started", timeout=5)
child.expect_exact("[bluetoothctl]> ", timeout=5)
child.sendline(f"pair {mac}")
i = child.expect([
"Pairing successful", "Failed to pair", "AuthenticationCanceled",
"Failed to connect", pexpect.TIMEOUT
], timeout=20)
if i == 0:
console.print("βœ… [green]Paired successfully.")
else:
console.print("❌ [red]Pairing failed or timed out.")
return
child.sendline(f"trust {mac}")
child.expect_exact("trust succeeded", timeout=5)
child.expect_exact("[bluetoothctl]> ", timeout=5)
console.print("βœ… [green]Device trusted.")
child.sendline("scan off")
child.expect_exact("Discovery stopped", timeout=5)
child.expect_exact("[bluetoothctl]> ", timeout=5)
child.sendline("quit")
child.close()
console.print("πŸ”’ [green]Pairing process complete.")
except Exception as e:
console.print(f"❌ [red]Automated pairing failed: {e}")
async def connect_device():
global connected_client
try:
console.print("πŸ” [blue]Scanning for device...")
devices = await BleakScanner.discover(timeout=5.0)
found = next((d for d in devices if d.address.lower() == address.lower()), None)
if found:
console.print(f"βœ… [green]Found device:[/] {found.name}")
else:
console.print("⚠️ [yellow]Device not found, continuing anyway...")
connected_client = BleakClient(address, timeout=15)
await connected_client.connect()
console.print(f"πŸ”— [green]Connected to[/] {address}")
except Exception as e:
console.print(f"❌ [red]Failed to connect: {e}")
async def list_services():
global connected_client
if not connected_client or not connected_client.is_connected:
console.print("⚠️ [red]Not connected.")
return
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
csv_file = f"ble_services_{timestamp}.csv"
with open(csv_file, mode="w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["Service UUID", "Char UUID", "Alias", "Handle (dec)", "Handle (hex)", "Properties", "Value"])
services = connected_client.services
for service in services:
console.rule(f"πŸ”§ Service UUID: {service.uuid}")
table = Table(show_header=True, header_style="bold magenta")
table.add_column("Char UUID", style="cyan")
table.add_column("Alias", style="white")
table.add_column("Handle (dec)")
table.add_column("Handle (hex)", style="yellow")
table.add_column("Properties")
table.add_column("Value")
for char in service.characteristics:
props = ", ".join(char.properties)
val = ""
if "read" in char.properties:
try:
raw_val = await connected_client.read_gatt_char(char.uuid)
val = f"{raw_val.hex()} ({raw_val.decode(errors='ignore')})"
except:
val = "[red]Read error"
alias = UUID_ALIASES.get(char.uuid.lower(), "")
table.add_row(char.uuid, alias, str(char.handle), hex(char.handle), props, val)
writer.writerow([service.uuid, char.uuid, alias, char.handle, hex(char.handle), props, val])
console.print(table)
console.print(f"πŸ’Ύ [green]Exported to {csv_file}")
async def subscribe_notifications():
global connected_client
if not connected_client or not connected_client.is_connected:
console.print("⚠️ [red]Not connected.")
return
subscribed_uuids = []
services = connected_client.services
for service in services:
for char in service.characteristics:
if "notify" in char.properties or "indicate" in char.properties:
try:
await connected_client.start_notify(char.uuid, handle_notification)
subscribed_uuids.append(char.uuid)
console.print(f"βœ… [green]Subscribed to[/] {char.uuid}")
except Exception as e:
console.print(f"❌ [red]Could not subscribe to {char.uuid}: {e}")
console.print("⏳ [cyan]Listening for notifications. Press ENTER to stop...")
await asyncio.get_event_loop().run_in_executor(None, input)
for uuid in subscribed_uuids:
try:
await connected_client.stop_notify(uuid)
console.print(f"πŸ”• [green]Unsubscribed from {uuid}")
except:
pass
async def fuzz_characteristics():
global connected_client
if not connected_client or not connected_client.is_connected:
console.print("⚠️ [red]Not connected.")
return
console.print("⚠️ [yellow]Fuzzing characteristics β€” sending dummy data to writable characteristics.")
test_payloads = [b"\x00", b"\x01\x02", b"\xff" * 10, b"A" * 20]
services = connected_client.services
for service in services:
for char in service.characteristics:
if "write" in char.properties or "write-without-response" in char.properties:
for payload in test_payloads:
try:
await connected_client.write_gatt_char(char.uuid, payload)
console.print(f"πŸ§ͺ Sent {payload.hex()} to {char.uuid}")
if "read" in char.properties:
new_val = await connected_client.read_gatt_char(char.uuid)
console.print(f" πŸ“– New value: {new_val.hex()} ({new_val.decode(errors='ignore')})")
except Exception as e:
console.print(f"❌ [red]Failed to fuzz {char.uuid}: {e}")
async def menu():
global connected_client
while True:
console.print("\n[bold blue]--- BLE Menu ---")
console.print("1. πŸ”“ Pair & Trust Device")
console.print("2. πŸ”— Connect to Device")
console.print("3. πŸ“– List Services and Characteristics")
console.print("4. πŸ“¬ Listen for Notifications")
console.print("5. πŸ§ͺ Fuzz Characteristics (write)")
console.print("6. πŸ•΅οΈ Reverse Notification Payloads (built-in decoding)")
console.print("7. ❌ Exit")
choice = input("Select an option: ").strip()
if choice == "1":
await pair_with_device(address)
elif choice == "2":
await connect_device()
elif choice == "3":
await list_services()
elif choice == "4":
await subscribe_notifications()
elif choice == "5":
await fuzz_characteristics()
elif choice == "6":
console.print("[cyan]ℹ️ Notification payloads will be interpreted in ASCII + ushort when received.")
elif choice == "7":
if connected_client and connected_client.is_connected:
try:
for service in connected_client.services:
for char in service.characteristics:
if "notify" in char.properties or "indicate" in char.properties:
try:
await connected_client.stop_notify(char.uuid)
except:
pass
await connected_client.disconnect()
console.print("πŸ”Œ [green]Disconnected successfully.")
except:
pass
console.print("πŸ‘‹ [bold]Goodbye.")
break
else:
console.print("❌ [red]Invalid choice. Please enter a number between 1 and 7.")
asyncio.run(menu())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment