#!/usr/bin/env python3
"""
Avalon Mini 3 CGMiner API Client

Demonstrates reading and writing mining parameters via the unauthenticated
CGMiner API on port 4028. No authentication required.

Usage:
    python3 avalon_cgminer_api.py --host 192.168.130.132 status
    python3 avalon_cgminer_api.py --host 192.168.130.132 pools
    python3 avalon_cgminer_api.py --host 192.168.130.132 addpool stratum+tcp://pool.example.com:3333 worker.name
    python3 avalon_cgminer_api.py --host 192.168.130.132 switchpool 0
"""
|
|
|
import socket |
|
import json |
|
import argparse |
|
import re |
|
import sys |
|
from typing import Optional, Dict, Any, List |
|
from datetime import timedelta |
|
|
|
|
|
class AvalonMini3:
    """CGMiner API client for Avalon Mini 3.

    Each call opens its own TCP connection to ``host:port``, sends one JSON
    request and parses the JSON reply. Failures are printed to stderr and
    reported to the caller as ``None`` instead of being raised.
    """

    def __init__(self, host: str, port: int = 4028, timeout: int = 10):
        """Store the connection settings; no connection is opened here.

        Args:
            host: Address of the miner.
            port: TCP port of the API.
            timeout: Socket timeout in seconds, applied to the connect and to
                every send/receive call.
        """
        self.host = host
        self.port = port
        self.timeout = timeout

    def command(self, cmd: str, param: Optional[str] = None) -> Optional[Dict]:
        """Send one command to the CGMiner API and return the decoded reply.

        Args:
            cmd: API command name, sent as the ``command`` field.
            param: Optional value for the ``parameter`` field; left out of the
                request when empty or ``None``.

        Returns:
            The parsed JSON reply, or ``None`` when the connection was refused,
            timed out before any data arrived, returned nothing, or the reply
            could not be decoded. The reason is printed to stderr.
        """
        payload = {"command": cmd}
        if param:
            payload["parameter"] = param

        try:
            # The context manager closes the socket on every exit path,
            # including a failing connect() or sendall().
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(self.timeout)
                sock.connect((self.host, self.port))

                # sendall() keeps writing until the whole request is out;
                # a bare send() may transmit only part of it.
                sock.sendall(json.dumps(payload).encode())

                chunks = []
                while True:
                    try:
                        chunk = sock.recv(4096)
                    except socket.timeout:
                        if not chunks:
                            # Nothing received at all: report a real timeout
                            # instead of a confusing JSON decode error.
                            raise
                        # Partial data: best effort, parse what we have.
                        break
                    if not chunk:
                        break
                    chunks.append(chunk)
                    # A NUL byte marks the end of the reply.
                    if b"\x00" in chunk:
                        break

            text = b"".join(chunks).rstrip(b"\x00").decode()
            if not text:
                print(
                    f"Error: Empty response from {self.host}:{self.port}",
                    file=sys.stderr,
                )
                return None
            return json.loads(text)

        except ConnectionRefusedError:
            print(f"Error: Connection refused ({self.host}:{self.port})", file=sys.stderr)
            return None
        except socket.timeout:
            print("Error: Connection timeout", file=sys.stderr)
            return None
        except Exception as e:
            # Boundary handler: callers rely on None rather than exceptions.
            print(f"Error: {e}", file=sys.stderr)
            return None

    # Read commands
    def version(self) -> Optional[Dict]:
        """Send the ``version`` command."""
        return self.command("version")

    def summary(self) -> Optional[Dict]:
        """Send the ``summary`` command."""
        return self.command("summary")

    def stats(self) -> Optional[Dict]:
        """Send the ``stats`` command."""
        return self.command("stats")

    def pools(self) -> Optional[Dict]:
        """Send the ``pools`` command."""
        return self.command("pools")

    def devs(self) -> Optional[Dict]:
        """Send the ``devs`` command."""
        return self.command("devs")

    def config(self) -> Optional[Dict]:
        """Send the ``config`` command."""
        return self.command("config")

    # Write commands
    def switchpool(self, pool_id: int) -> Optional[Dict]:
        """Send ``switchpool`` with the pool id as parameter."""
        return self.command("switchpool", str(pool_id))

    def addpool(self, url: str, worker: str, password: str = "x") -> Optional[Dict]:
        """Send ``addpool`` with ``url,worker,password`` as parameter."""
        return self.command("addpool", f"{url},{worker},{password}")

    def removepool(self, pool_id: int) -> Optional[Dict]:
        """Send ``removepool`` with the pool id as parameter."""
        return self.command("removepool", str(pool_id))

    def enablepool(self, pool_id: int) -> Optional[Dict]:
        """Send ``enablepool`` with the pool id as parameter."""
        return self.command("enablepool", str(pool_id))

    def disablepool(self, pool_id: int) -> Optional[Dict]:
        """Send ``disablepool`` with the pool id as parameter."""
        return self.command("disablepool", str(pool_id))

    def poolpriority(self, pool_ids: List[int]) -> Optional[Dict]:
        """Send ``poolpriority`` with the ids joined by commas, in the given order."""
        return self.command("poolpriority", ",".join(map(str, pool_ids)))

    def parse_stats(self, stats: Dict) -> Dict[str, Any]:
        """Extract key metrics from a ``stats`` response.

        The first entry of ``stats["STATS"]`` that has an ``"MM ID0"`` key is
        used; its value is a string of ``Name[value]`` fields that are picked
        out with regular expressions. Missing fields fall back to ``0``
        (``1`` for ``work_mode``, ``''`` for ``dna``), and so do fields whose
        value is not a valid number.

        Args:
            stats: Decoded reply of the ``stats`` command; may be ``None``.

        Returns:
            A dict of metrics, or ``{}`` when ``stats`` is empty, has no
            ``"STATS"`` key, or contains no ``"MM ID0"`` entry.
        """
        if not stats or "STATS" not in stats:
            return {}

        for stat in stats["STATS"]:
            if "MM ID0" not in stat:
                continue

            mm = stat["MM ID0"]

            def get(pattern, default="0"):
                match = re.search(pattern, mm)
                return match.group(1) if match else default

            def get_float(pattern):
                # "[0-9.]+" also accepts text such as "1.2.3"; treat that
                # like a missing field instead of crashing.
                try:
                    return float(get(pattern))
                except ValueError:
                    return 0.0

            # Parse power values: fields 1 and 4 of the whitespace-separated
            # PS[...] list are reported as input and output power.
            power_in, power_out = 0, 0
            ps_match = re.search(r'PS\[([^\]]+)\]', mm)
            if ps_match:
                parts = ps_match.group(1).split()
                if len(parts) > 4:
                    try:
                        power_in, power_out = int(parts[1]), int(parts[4])
                    except ValueError:
                        # Non-numeric PS fields: keep the zero defaults.
                        power_in, power_out = 0, 0

            return {
                # GHS* fields are divided by 1000 to report TH/s.
                "hashrate_ths": get_float(r'GHSspd\[([0-9.]+)\]') / 1000,
                "hashrate_avg_ths": get_float(r'GHSavg\[([0-9.]+)\]') / 1000,
                "hashrate_nominal_ths": get_float(r'GHSmm\[([0-9.]+)\]') / 1000,
                "elapsed_s": int(get(r'Elapsed\[(\d+)\]')),
                "temp_hashboard_c": int(get(r'HBTemp\[(\d+)\]')),
                "temp_max_c": int(get(r'TMax\[(\d+)\]')),
                "temp_avg_c": int(get(r'TAvg\[(\d+)\]')),
                "fan_rpm": int(get(r'Fan1\[(\d+)\]')),
                "fan_pct": int(get(r'FanR\[(\d+)%?\]')),
                "freq_mhz": get_float(r'Freq\[([0-9.]+)\]'),
                "ping_ms": int(get(r'PING\[(\d+)\]')),
                "power_in_w": power_in,
                "power_out_w": power_out,
                "work_mode": int(get(r'WORKMODE\[(\d+)\]', "1")),
                "hw_errors": int(get(r'HW\[(\d+)\]')),
                "dna": get(r'DNA\[([0-9a-f]+)\]', ''),
            }

        return {}
|
|
|
|
|
def format_uptime(seconds: int) -> str:
    """Render a duration given in seconds as text such as ``"1d 2h 3m"``.

    Units whose value is zero are left out and seconds are never shown;
    when no day, hour or minute component is non-zero the result is ``"0m"``.
    """
    span = timedelta(seconds=seconds)
    hour_count, leftover = divmod(span.seconds, 3600)
    minute_count = leftover // 60
    units = ((span.days, "d"), (hour_count, "h"), (minute_count, "m"))
    rendered = " ".join(f"{amount}{suffix}" for amount, suffix in units if amount)
    return rendered if rendered else "0m"
|
|
|
|
|
def cmd_status(miner: AvalonMini3):
    """Print device identity from ``version`` and live metrics from ``stats``.

    Prints a failure notice and returns when the ``version`` reply is missing
    or has no ``"VERSION"`` key. The metrics section is skipped when
    ``parse_stats`` yields nothing.
    """
    info = miner.version()
    if not (info and "VERSION" in info):
        print("Failed to get device info")
        return

    device = info["VERSION"][0]
    metrics = miner.parse_stats(miner.stats())

    print(f"\n{'='*50}")
    print(f" {device.get('PROD', 'Avalon')} @ {miner.host}")
    print(f"{'='*50}")
    print(f" DNA: {device.get('DNA', 'N/A')}")
    print(f" MAC: {device.get('MAC', 'N/A')}")
    print(f" Firmware: {device.get('LVERSION', 'N/A')}")

    if metrics:
        # Labels for the numeric work mode reported by the miner.
        mode_names = {0: "Heater", 1: "Mining", 2: "Night"}
        print(f"\n Hashrate: {metrics['hashrate_ths']:.2f} TH/s (avg: {metrics['hashrate_avg_ths']:.2f})")
        print(f" Temp: {metrics['temp_hashboard_c']}°C (max: {metrics['temp_max_c']}°C)")
        print(f" Fan: {metrics['fan_rpm']} RPM ({metrics['fan_pct']}%)")
        print(f" Power: {metrics['power_in_w']}W in / {metrics['power_out_w']}W out")
        print(f" Uptime: {format_uptime(metrics['elapsed_s'])}")
        print(f" Mode: {mode_names.get(metrics['work_mode'], 'Unknown')}")
        print(f" Ping: {metrics['ping_ms']} ms")

    print()
|
|
|
|
|
def cmd_pools(miner: AvalonMini3):
    """Print a table of the pools configured on the miner.

    Prints a failure notice and returns when the ``pools`` reply is missing
    or has no ``"POOLS"`` key.
    """
    reply = miner.pools()
    if not (reply and "POOLS" in reply):
        print("Failed to get pools")
        return

    print(f"\n{'ID':<4} {'Status':<10} {'Pri':<4} {'Active':<7} {'Accepted':<10} URL")
    print("-" * 80)

    for pool in reply["POOLS"]:
        # Mark pools whose "Stratum Active" field is truthy.
        if pool.get("Stratum Active"):
            marker = "*"
        else:
            marker = " "
        print(f"{pool['POOL']:<4} {pool['Status']:<10} {pool['Priority']:<4} {marker:<7} {pool['Accepted']:<10} {pool['URL']}")
        print(f" Worker: {pool['User']}")

    print()
|
|
|
|
|
def cmd_addpool(miner: AvalonMini3, url: str, worker: str, password: str = "x"):
    """Add a pool and print whether the miner reported success.

    Success means the first ``STATUS`` entry of the reply has ``STATUS == "S"``.
    """
    reply = miner.addpool(url, worker, password)
    succeeded = bool(reply) and reply.get("STATUS", [{}])[0].get("STATUS") == "S"
    if not succeeded:
        print(f"Failed to add pool: {reply}")
        return
    print(f"Pool added: {url} ({worker})")
|
|
|
|
|
def cmd_removepool(miner: AvalonMini3, pool_id: int):
    """Remove the pool with the given id and print the outcome.

    Success means the first ``STATUS`` entry of the reply has ``STATUS == "S"``.
    """
    reply = miner.removepool(pool_id)
    # A missing/empty reply short-circuits to None and counts as a failure.
    status_code = reply.get("STATUS", [{}])[0].get("STATUS") if reply else None
    if status_code == "S":
        print(f"Pool {pool_id} removed")
        return
    print(f"Failed to remove pool: {reply}")
|
|
|
|
|
def cmd_switchpool(miner: AvalonMini3, pool_id: int):
    """Switch the miner to the given pool and print the outcome.

    Success means the first ``STATUS`` entry of the reply has ``STATUS == "S"``.
    """
    reply = miner.switchpool(pool_id)
    switched = bool(reply) and reply.get("STATUS", [{}])[0].get("STATUS") == "S"
    if switched:
        message = f"Switched to pool {pool_id}"
    else:
        message = f"Failed to switch pool: {reply}"
    print(message)
|
|
|
|
|
def cmd_enablepool(miner: AvalonMini3, pool_id: int):
    """Enable the given pool and print the outcome.

    Success means the first ``STATUS`` entry of the reply has ``STATUS == "S"``.
    """
    reply = miner.enablepool(pool_id)
    if not reply:
        # No usable reply at all.
        print(f"Failed to enable pool: {reply}")
        return
    first_status = reply.get("STATUS", [{}])[0]
    if first_status.get("STATUS") == "S":
        print(f"Pool {pool_id} enabled")
    else:
        print(f"Failed to enable pool: {reply}")
|
|
|
|
|
def cmd_disablepool(miner: AvalonMini3, pool_id: int):
    """Disable the given pool and print the outcome.

    Success means the first ``STATUS`` entry of the reply has ``STATUS == "S"``.
    """
    reply = miner.disablepool(pool_id)
    disabled = bool(reply) and reply.get("STATUS", [{}])[0].get("STATUS") == "S"
    print(f"Pool {pool_id} disabled" if disabled else f"Failed to disable pool: {reply}")
|
|
|
|
|
def cmd_raw(miner: AvalonMini3, command: str, param: str = None):
    """Send an arbitrary API command and pretty-print the decoded reply.

    A failed request prints ``null`` because ``miner.command`` returns ``None``.
    """
    reply = miner.command(command, param)
    rendered = json.dumps(reply, indent=2)
    print(rendered)
|
|
|
|
|
def main():
    """Parse the command line and run the selected sub-command.

    Prints the help text and returns when no sub-command is given.
    """
    parser = argparse.ArgumentParser(description="Avalon Mini 3 CGMiner API Client")
    parser.add_argument("--host", "-H", required=True, help="Miner IP address")
    parser.add_argument("--port", "-p", type=int, default=4028, help="API port")

    commands = parser.add_subparsers(dest="cmd", help="Command")

    commands.add_parser("status", help="Show device status")
    commands.add_parser("pools", help="List pools")

    addpool_parser = commands.add_parser("addpool", help="Add pool")
    addpool_parser.add_argument("url", help="Pool URL (stratum+tcp://...)")
    addpool_parser.add_argument("worker", help="Worker name")
    addpool_parser.add_argument("--password", default="x", help="Pool password")

    # These four commands all take a single positional pool id.
    pool_id_commands = (
        ("removepool", "Remove pool"),
        ("switchpool", "Switch to pool"),
        ("enablepool", "Enable pool"),
        ("disablepool", "Disable pool"),
    )
    for name, summary in pool_id_commands:
        sub = commands.add_parser(name, help=summary)
        sub.add_argument("pool_id", type=int, help="Pool ID")

    raw_parser = commands.add_parser("raw", help="Send raw command")
    raw_parser.add_argument("command", help="API command")
    raw_parser.add_argument("--param", help="Command parameter")

    args = parser.parse_args()

    if not args.cmd:
        parser.print_help()
        return

    miner = AvalonMini3(args.host, args.port)

    # Sub-command name -> action; lambdas defer attribute access on ``args``
    # so only the selected command reads its own arguments.
    handlers = {
        "status": lambda: cmd_status(miner),
        "pools": lambda: cmd_pools(miner),
        "addpool": lambda: cmd_addpool(miner, args.url, args.worker, args.password),
        "removepool": lambda: cmd_removepool(miner, args.pool_id),
        "switchpool": lambda: cmd_switchpool(miner, args.pool_id),
        "enablepool": lambda: cmd_enablepool(miner, args.pool_id),
        "disablepool": lambda: cmd_disablepool(miner, args.pool_id),
        "raw": lambda: cmd_raw(miner, args.command, args.param),
    }
    handler = handlers.get(args.cmd)
    if handler is not None:
        handler()
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()