Skip to content

Instantly share code, notes, and snippets.

@marsmensch
Created January 20, 2026 11:35
Show Gist options
  • Select an option

  • Save marsmensch/5b9f42778f796c7d90c7ef2886794555 to your computer and use it in GitHub Desktop.

Select an option

Save marsmensch/5b9f42778f796c7d90c7ef2886794555 to your computer and use it in GitHub Desktop.
Avalon Mini 3 Bitcoin Miner - CGMiner API client and iOS integration guide
#!/usr/bin/env python3
"""
Avalon Mini 3 CGMiner API Client
Demonstrates reading and writing mining parameters via the unauthenticated
CGMiner API on port 4028. No authentication required.
Usage:
python3 avalon_cgminer_api.py --host 192.168.130.132 status
python3 avalon_cgminer_api.py --host 192.168.130.132 pools
python3 avalon_cgminer_api.py --host 192.168.130.132 addpool stratum+tcp://pool.example.com:3333 worker.name
python3 avalon_cgminer_api.py --host 192.168.130.132 switchpool 0
"""
import socket
import json
import argparse
import re
import sys
from typing import Optional, Dict, Any, List
from datetime import timedelta
class AvalonMini3:
    """CGMiner API client for Avalon Mini 3.

    Each call to :meth:`command` opens a fresh TCP connection, sends one JSON
    request and reads the reply until a NUL byte arrives, the peer closes the
    connection, or the socket times out.
    """

    def __init__(self, host: str, port: int = 4028, timeout: int = 10):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.port = port
        self.timeout = timeout

    def command(self, cmd: str, param: Optional[str] = None) -> Optional[Dict]:
        """Send command to CGMiner API.

        Args:
            cmd: API command name, e.g. ``"stats"``.
            param: Optional parameter string; left out of the request when it
                is empty or ``None``.

        Returns:
            The decoded JSON reply, or ``None`` (after printing a message to
            stderr) when connecting, sending, receiving or decoding fails.
        """
        payload = {"command": cmd}
        if param:
            payload["parameter"] = param
        try:
            # The context manager closes the socket on every exit path,
            # including failures in connect/send/recv.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(self.timeout)
                sock.connect((self.host, self.port))
                # sendall() keeps writing until the whole request is sent;
                # send() is allowed to transmit only part of it.
                sock.sendall(json.dumps(payload).encode())
                response = self._read_response(sock)
            # Keep only the bytes before the first NUL terminator.
            body = response.split(b'\x00', 1)[0]
            if not body.strip():
                print(f"Error: Empty response ({self.host}:{self.port})", file=sys.stderr)
                return None
            return json.loads(body.decode())
        except ConnectionRefusedError:
            print(f"Error: Connection refused ({self.host}:{self.port})", file=sys.stderr)
            return None
        except socket.timeout:
            print("Error: Connection timeout", file=sys.stderr)
            return None
        except Exception as e:
            # Top-level boundary of the client: report and signal failure
            # with None, as callers expect.
            print(f"Error: {e}", file=sys.stderr)
            return None

    @staticmethod
    def _read_response(sock) -> bytes:
        """Read from *sock* until NUL, EOF or timeout and return the raw bytes.

        A timeout before any data has arrived is re-raised so the caller can
        report it as a timeout; a timeout after partial data returns what was
        received so far.
        """
        chunks = []
        while True:
            try:
                chunk = sock.recv(4096)
            except socket.timeout:
                if not chunks:
                    raise
                break
            if not chunk:
                break
            chunks.append(chunk)
            if b'\x00' in chunk:
                break
        return b"".join(chunks)

    # Read commands
    def version(self) -> Optional[Dict]:
        """Send the ``version`` command."""
        return self.command("version")

    def summary(self) -> Optional[Dict]:
        """Send the ``summary`` command."""
        return self.command("summary")

    def stats(self) -> Optional[Dict]:
        """Send the ``stats`` command."""
        return self.command("stats")

    def pools(self) -> Optional[Dict]:
        """Send the ``pools`` command."""
        return self.command("pools")

    def devs(self) -> Optional[Dict]:
        """Send the ``devs`` command."""
        return self.command("devs")

    def config(self) -> Optional[Dict]:
        """Send the ``config`` command."""
        return self.command("config")

    # Write commands
    def switchpool(self, pool_id: int) -> Optional[Dict]:
        """Send ``switchpool`` with the pool id as parameter."""
        return self.command("switchpool", str(pool_id))

    def addpool(self, url: str, worker: str, password: str = "x") -> Optional[Dict]:
        """Send ``addpool`` with ``url,worker,password`` as parameter."""
        return self.command("addpool", f"{url},{worker},{password}")

    def removepool(self, pool_id: int) -> Optional[Dict]:
        """Send ``removepool`` with the pool id as parameter."""
        return self.command("removepool", str(pool_id))

    def enablepool(self, pool_id: int) -> Optional[Dict]:
        """Send ``enablepool`` with the pool id as parameter."""
        return self.command("enablepool", str(pool_id))

    def disablepool(self, pool_id: int) -> Optional[Dict]:
        """Send ``disablepool`` with the pool id as parameter."""
        return self.command("disablepool", str(pool_id))

    def poolpriority(self, pool_ids: List[int]) -> Optional[Dict]:
        """Send ``poolpriority`` with the comma-joined pool ids as parameter."""
        return self.command("poolpriority", ",".join(map(str, pool_ids)))

    @staticmethod
    def _field(text: str, pattern: str, default: str = "0") -> str:
        """Return group 1 of the first *pattern* match in *text*, else *default*."""
        match = re.search(pattern, text)
        return match.group(1) if match else default

    @staticmethod
    def _to_int(value: str, default: int = 0) -> int:
        """Convert *value* to int, returning *default* when it is not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _to_float(value: str, default: float = 0.0) -> float:
        """Convert *value* to float, returning *default* when it is malformed."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def parse_stats(self, stats: Dict) -> Dict[str, Any]:
        """Extract key metrics from stats response.

        Uses the first ``STATS`` entry that has an ``"MM ID0"`` string and
        pulls the bracketed ``Key[value]`` fields out of it with regexes.

        Args:
            stats: Decoded reply of the ``stats`` command (may be ``None``).

        Returns:
            A dict of parsed metrics, or ``{}`` when *stats* is empty or has
            no usable ``"MM ID0"`` entry. Missing or malformed fields fall
            back to 0 (``work_mode`` to 1, ``dna`` to ``''``).
        """
        if not stats or "STATS" not in stats:
            return {}
        for stat in stats["STATS"]:
            if not isinstance(stat, dict) or "MM ID0" not in stat:
                continue
            mm = stat["MM ID0"]
            if not isinstance(mm, str):
                continue

            def get(pattern, default="0", _text=mm):
                return self._field(_text, pattern, default)

            # Parse power values: whitespace-separated list inside PS[...];
            # positions 1 and 4 are reported as input/output power.
            power_in, power_out = 0, 0
            ps_match = re.search(r'PS\[([^\]]+)\]', mm)
            if ps_match:
                parts = ps_match.group(1).split()
                if len(parts) > 4:
                    # Non-numeric tokens fall back to 0 instead of raising.
                    power_in = self._to_int(parts[1])
                    power_out = self._to_int(parts[4])
            return {
                "hashrate_ths": self._to_float(get(r'GHSspd\[([0-9.]+)\]')) / 1000,
                "hashrate_avg_ths": self._to_float(get(r'GHSavg\[([0-9.]+)\]')) / 1000,
                "hashrate_nominal_ths": self._to_float(get(r'GHSmm\[([0-9.]+)\]')) / 1000,
                "elapsed_s": self._to_int(get(r'Elapsed\[(\d+)\]')),
                "temp_hashboard_c": self._to_int(get(r'HBTemp\[(\d+)\]')),
                "temp_max_c": self._to_int(get(r'TMax\[(\d+)\]')),
                "temp_avg_c": self._to_int(get(r'TAvg\[(\d+)\]')),
                "fan_rpm": self._to_int(get(r'Fan1\[(\d+)\]')),
                "fan_pct": self._to_int(get(r'FanR\[(\d+)%?\]')),
                "freq_mhz": self._to_float(get(r'Freq\[([0-9.]+)\]')),
                "ping_ms": self._to_int(get(r'PING\[(\d+)\]')),
                "power_in_w": power_in,
                "power_out_w": power_out,
                "work_mode": self._to_int(get(r'WORKMODE\[(\d+)\]', "1"), 1),
                "hw_errors": self._to_int(get(r'HW\[(\d+)\]')),
                "dna": get(r'DNA\[([0-9a-f]+)\]', ''),
            }
        return {}
def format_uptime(seconds: int) -> str:
    """Render a duration in seconds as ``"<d>d <h>h <m>m"``.

    Zero-valued units are left out and leftover seconds are dropped; when
    every unit is zero the result is ``"0m"``.
    """
    delta = timedelta(seconds=seconds)
    hours, leftover = divmod(delta.seconds, 3600)
    minutes = leftover // 60
    labelled = ((delta.days, "d"), (hours, "h"), (minutes, "m"))
    text = " ".join(f"{amount}{unit}" for amount, unit in labelled if amount)
    return text if text else "0m"
def cmd_status(miner: AvalonMini3):
    """Show device status.

    Prints the product name, DNA, MAC and firmware from the ``version``
    reply, followed by the parsed ``stats`` metrics when any are available.
    Prints "Failed to get device info" and returns when the version reply is
    missing or carries no VERSION entry.
    """
    version = miner.version()
    entries = version.get("VERSION") if isinstance(version, dict) else None
    # An empty VERSION list must be treated as a failure too; indexing [0]
    # on it would raise IndexError.
    if not entries:
        print("Failed to get device info")
        return
    v = entries[0]
    stats = miner.parse_stats(miner.stats())
    print(f"\n{'='*50}")
    print(f" {v.get('PROD', 'Avalon')} @ {miner.host}")
    print(f"{'='*50}")
    print(f" DNA: {v.get('DNA', 'N/A')}")
    print(f" MAC: {v.get('MAC', 'N/A')}")
    print(f" Firmware: {v.get('LVERSION', 'N/A')}")
    if stats:
        mode_names = {0: "Heater", 1: "Mining", 2: "Night"}
        print(f"\n Hashrate: {stats['hashrate_ths']:.2f} TH/s (avg: {stats['hashrate_avg_ths']:.2f})")
        print(f" Temp: {stats['temp_hashboard_c']}°C (max: {stats['temp_max_c']}°C)")
        print(f" Fan: {stats['fan_rpm']} RPM ({stats['fan_pct']}%)")
        print(f" Power: {stats['power_in_w']}W in / {stats['power_out_w']}W out")
        print(f" Uptime: {format_uptime(stats['elapsed_s'])}")
        print(f" Mode: {mode_names.get(stats['work_mode'], 'Unknown')}")
        print(f" Ping: {stats['ping_ms']} ms")
    print()
def cmd_pools(miner: AvalonMini3):
    """List configured pools.

    Prints a header, then one row per entry of the ``POOLS`` reply followed
    by its worker name. Prints "Failed to get pools" when the reply is
    missing or has no ``POOLS`` key.
    """
    reply = miner.pools()
    if not (reply and "POOLS" in reply):
        print("Failed to get pools")
        return

    print(f"\n{'ID':<4} {'Status':<10} {'Pri':<4} {'Active':<7} {'Accepted':<10} URL")
    print("-" * 80)
    for p in reply["POOLS"]:
        # A truthy "Stratum Active" flag is shown as a star in the Active column.
        active = " "
        if p.get("Stratum Active"):
            active = "*"
        print(f"{p['POOL']:<4} {p['Status']:<10} {p['Priority']:<4} {active:<7} {p['Accepted']:<10} {p['URL']}")
        print(f" Worker: {p['User']}")
    print()
def _command_succeeded(result: Optional[Dict]) -> bool:
    """Return True when the first STATUS entry of *result* reports "S".

    Anything else (``None``, a missing or empty STATUS list, a non-dict
    entry) counts as failure instead of raising.
    """
    if not isinstance(result, dict):
        return False
    statuses = result.get("STATUS")
    if not isinstance(statuses, list) or not statuses:
        return False
    first = statuses[0]
    return isinstance(first, dict) and first.get("STATUS") == "S"


def cmd_addpool(miner: AvalonMini3, url: str, worker: str, password: str = "x"):
    """Add a new pool and print whether the miner accepted it."""
    result = miner.addpool(url, worker, password)
    if _command_succeeded(result):
        print(f"Pool added: {url} ({worker})")
    else:
        print(f"Failed to add pool: {result}")


def cmd_removepool(miner: AvalonMini3, pool_id: int):
    """Remove a pool and print whether the miner accepted the request."""
    result = miner.removepool(pool_id)
    if _command_succeeded(result):
        print(f"Pool {pool_id} removed")
    else:
        print(f"Failed to remove pool: {result}")


def cmd_switchpool(miner: AvalonMini3, pool_id: int):
    """Switch to a pool and print whether the miner accepted the request."""
    result = miner.switchpool(pool_id)
    if _command_succeeded(result):
        print(f"Switched to pool {pool_id}")
    else:
        print(f"Failed to switch pool: {result}")


def cmd_enablepool(miner: AvalonMini3, pool_id: int):
    """Enable a pool and print whether the miner accepted the request."""
    result = miner.enablepool(pool_id)
    if _command_succeeded(result):
        print(f"Pool {pool_id} enabled")
    else:
        print(f"Failed to enable pool: {result}")


def cmd_disablepool(miner: AvalonMini3, pool_id: int):
    """Disable a pool and print whether the miner accepted the request."""
    result = miner.disablepool(pool_id)
    if _command_succeeded(result):
        print(f"Pool {pool_id} disabled")
    else:
        print(f"Failed to disable pool: {result}")
def cmd_raw(miner: AvalonMini3, command: str, param: str = None):
    """Send raw command and print JSON response.

    The reply from ``miner.command`` is printed as JSON with a two-space
    indent; a failed request prints ``null``.
    """
    reply = miner.command(command, param)
    rendered = json.dumps(reply, indent=2)
    print(rendered)
def main():
    """Parse the command line and run the selected sub-command.

    Prints the help text and returns when no sub-command is given.
    """
    parser = argparse.ArgumentParser(description="Avalon Mini 3 CGMiner API Client")
    parser.add_argument("--host", "-H", required=True, help="Miner IP address")
    parser.add_argument("--port", "-p", type=int, default=4028, help="API port")

    subparsers = parser.add_subparsers(dest="cmd", help="Command")
    subparsers.add_parser("status", help="Show device status")
    subparsers.add_parser("pools", help="List pools")

    add_p = subparsers.add_parser("addpool", help="Add pool")
    add_p.add_argument("url", help="Pool URL (stratum+tcp://...)")
    add_p.add_argument("worker", help="Worker name")
    add_p.add_argument("--password", default="x", help="Pool password")

    # These four sub-commands all take a single integer pool id.
    pool_id_commands = (
        ("removepool", "Remove pool"),
        ("switchpool", "Switch to pool"),
        ("enablepool", "Enable pool"),
        ("disablepool", "Disable pool"),
    )
    for name, description in pool_id_commands:
        sub = subparsers.add_parser(name, help=description)
        sub.add_argument("pool_id", type=int, help="Pool ID")

    raw_p = subparsers.add_parser("raw", help="Send raw command")
    raw_p.add_argument("command", help="API command")
    raw_p.add_argument("--param", help="Command parameter")

    args = parser.parse_args()
    if not args.cmd:
        parser.print_help()
        return

    miner = AvalonMini3(args.host, args.port)
    # Map each sub-command name to the call that handles it.
    handlers = {
        "status": lambda: cmd_status(miner),
        "pools": lambda: cmd_pools(miner),
        "addpool": lambda: cmd_addpool(miner, args.url, args.worker, args.password),
        "removepool": lambda: cmd_removepool(miner, args.pool_id),
        "switchpool": lambda: cmd_switchpool(miner, args.pool_id),
        "enablepool": lambda: cmd_enablepool(miner, args.pool_id),
        "disablepool": lambda: cmd_disablepool(miner, args.pool_id),
        "raw": lambda: cmd_raw(miner, args.command, args.param),
    }
    handler = handlers.get(args.cmd)
    if handler is not None:
        handler()


if __name__ == "__main__":
    main()

Controlling the Avalon Mini 3 from iOS

I recently figured out how to talk to my Avalon Mini 3 Bitcoin miner programmatically. Turns out it's surprisingly simple - the device runs CGMiner and exposes a JSON API on port 4028 with zero authentication. Here's everything you need to build your own monitoring app.

The API

The Mini 3 speaks JSON over raw TCP. Open a socket to port 4028, send a command, read the response (null-terminated), done. No HTTP, no auth tokens, no handshakes.

{"command": "stats"}

That's it. The device returns a JSON blob with everything: hashrate, temperatures, fan speeds, power consumption, pool status.

Commands That Matter

Reading data:

  • version - firmware, DNA (unique chip ID), MAC address
  • stats - the big one: hashrate, temps, fans, power, uptime
  • pools - configured mining pools and their status
  • summary - share counts, uptime, efficiency

Changing settings:

  • switchpool + pool ID - switch active pool
  • addpool + "url,worker,password" - add a new pool
  • removepool + pool ID - delete a pool
  • enablepool / disablepool + pool ID - toggle pools

Parsing the Stats Response

The stats command returns device data in a weird format. Look for MM ID0 in the response - it's a string with bracketed key-value pairs:

GHSspd[40517.38] GHSavg[36635.22] HBTemp[68] TMax[79] Fan1[2364] FanR[76%] ...

Extract what you need with regex:

  • GHSspd - real-time hashrate (GH/s, divide by 1000 for TH/s)
  • GHSavg - average hashrate
  • HBTemp / TMax - hashboard and max temperature (Celsius)
  • Fan1 / FanR - fan RPM and percentage
  • PS[x x x x x] - power stats, space-separated (zero-based index 1 = input watts, index 4 = output watts)
  • WORKMODE - 0=Heater, 1=Mining, 2=Night
  • Elapsed - uptime in seconds

iOS Implementation Notes

Use Network.framework with NWConnection for TCP. The API is simple enough that you don't need a full HTTP client - just raw socket operations.

Key points:

  • Response ends with null byte (\0), strip it before JSON parsing
  • Keep connections short-lived, don't try to maintain persistent sockets
  • 10 second timeout is reasonable
  • Poll every 5 seconds for live dashboards, 30 seconds for background monitoring

Network Discovery

To find miners on your network, iterate through your subnet and try the version command on port 4028. If you get a response containing "Avalon", you found one.

Security Warning

The API has no authentication whatsoever. Anyone on your local network can read your mining stats, change your pools, or redirect your hashrate elsewhere. Keep your miners on a trusted network or behind a firewall.

What About the Web Interface?

The web UI on port 80 uses QR code authentication with a proprietary hash algorithm. I've reverse engineered part of it:

  • code field = SHA256(DNA)[:24]
  • auth field = unknown (custom firmware algorithm)

For now, the CGMiner API gives you everything you need without dealing with auth. The web interface is only necessary for firmware updates.


Tested on Avalon Mini 3 with firmware 25022401_cb28ba7, CGMiner 4.11.1, API 3.7

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment