Skip to content

Instantly share code, notes, and snippets.

@ryanteck
Created July 8, 2025 16:48
Show Gist options
  • Select an option

  • Save ryanteck/255f4890cc141ff372821662895d6c25 to your computer and use it in GitHub Desktop.

Select an option

Save ryanteck/255f4890cc141ff372821662895d6c25 to your computer and use it in GitHub Desktop.

Revisions

  1. ryanteck created this gist Jul 8, 2025.
    226 changes: 226 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,226 @@
    # Solar Charger Code
    # (C) Ryan Walmsley 2023

    # This code will monitor and calculate the average solar export from the house, then charge from it.
    # It then instructs my charger (an EO Mini Pro) to then charge at the set rate.

    from requests import get, post, exceptions
    import requests
    from pprint import pprint
    import json
    from time import sleep
    from math import floor
    import threading
    import datetime
    from serial import rs485
    import datetime
    import sentry_sdk
    sentry_sdk.init(
    dsn="<SENTRY KEY>",

    # Set traces_sample_rate to 1.0 to capture 100%
    # of transactions for performance monitoring.
    # We recommend adjusting this value in production.
    traces_sample_rate=1.0
    )

    history = []

    # Setup the API

    hassio_base_url = "<HASSIO>"
    power_consumption_url = "sensor.home_instant_electricity"
    eo_target_current = "sensor.eomini_target_current"
    car_charger_url = "sensor.car_monitoring_car_charger_w"
    set_charge_rate_url = "sensor.set_car_charge_amps"
    charger_relay_url = "switch.chargerrelay_charger_relay"
    average_url = "sensor.charger_average"
    hassio_key = "<<HASSIOKEY>>"
    headers = {
    "Authorization": "Bearer "+ hassio_key,
    "content-type": "application/json",
    }

    # Current Switch Settings - The letters / numbers on the switch match up to these.
    charge_switch_currents = {
    '0': 0, '1': 6, '2': 8, '3': 10, '4': 13, '5': 15, '6': 16, '7': 18, '8': 20,
    '9': 22, 'A': 24, 'B': 25, 'C': 26, 'D': 28, 'E': 30, 'F': 32
    }

    # Convert charge dutys from the charger to amps, times by 0.6 for fast chargers.
    # Doesn't have the extra maths for fast charging but not required.
    def convert_charge_hex_to_amps(charge_hex):
    charge_hex_converted = int(charge_hex, 16)
    charge_amps = int((charge_hex_converted * 0.6)/10)

    return charge_amps

    # Convert charge dutys from the amps to duty, divide by 0.6 for fast chargers.
    # Doesn't have the extra maths for fast charging but not required.
    def convert_charge_amps_to_hex(charge_amps):
    if(charge_amps < 5.5):
    charge_amps = 0
    charge_duty = (int(charge_amps / 0.6)*10)
    charge_hex = format(charge_duty, 'X')
    if(len(charge_hex)<3):
    charge_hex = '0' + charge_hex
    return charge_hex

    # Convert charge dutys from the amps to duty, divide by 0.6 for fast chargers.
    # Doesn't have the extra maths for rapid charging but not required.
    def convert_charge_watts_to_hex(charge_amps):
    pprint(charge_amps)
    pprint(floor(charge_amps*4)/4)
    if(charge_amps < 6.0):
    charge_amps = 0
    charge_duty = (int(charge_amps / 0.6)*10)
    charge_hex = format(charge_duty, 'X')
    if(len(charge_hex)<3):
    charge_hex = '0' + charge_hex
    return charge_hex

    # Main function to get the current export from the house, and consumption from the charger, and calculate how many amps to export at.
    def calculate_charge():
    global charge_ampage
    while True:
    try:
    response_pc = get(hassio_base_url+power_consumption_url, headers=headers)
    except requests.ConnectionError:
    response_pc = ""

    try:
    response_cc = get(hassio_base_url+car_charger_url, headers=headers)
    except requests.ConnectionError:
    response_pc = ""

    try:
    power_consumption = float(response_pc.json()['state'])
    except ValueError:
    power_consumption = float(0)
    except AttributeError:
    power_consumption = float(0)
    try:
    charger_consumption = float(response_cc.json()['state'])
    except ValueError:
    charger_consumption = float(0)
    except AttributeError:
    power_consumption = float(0)
    history.append((power_consumption - charger_consumption))
    average = (round((sum(history)/len(history))))
    pprint("Averages")
    pprint(average)
    average = average + 100
    pprint(average)
    average_json = {"state": float(average), "attributes": {"unit_of_measurement": "W"}}
    average_response = post(hassio_base_url+average_url, headers=headers, json=average_json)
    charge_rate = floor((-average / 240)*4)/4
    charge_rate = charge_rate - 0.5
    pprint(charge_rate)
    if(charge_rate < 5.5):
    charge_rate = 0
    charge_ampage = charge_rate
    #pprint(str(-average) + " " + str(charge_rate))
    set_charge_rate = {"state": float(charge_rate), "attributes": {"unit_of_measurement": "A"}}
    set_charge_rate_response = post(hassio_base_url+set_charge_rate_url, headers=headers, json=set_charge_rate)
    if(len(history)>=15):
    history.pop(0)
    sleep(5)

    def manage_charger():
    global charge_ampage
    global actual_charge_ampage
    while True:
    try:
    hassio_response_manual_override = get(hassio_base_url+"input_boolean.charge_car_override", headers=headers).json()
    if hassio_response_manual_override['state'] == 'off':
    pprint("Solar Charging Enabled")
    pprint("Solar Export Amps: " + str(charge_ampage))
    actual_charge_ampage = charge_ampage
    else:
    pprint("Manual Charge Override")
    try:
    hassio_response_manual_current = get(hassio_base_url+"input_number.car_charge_amps", headers=headers).json()
    actual_charge_ampage = int(float(hassio_response_manual_current['state']))
    pprint("Manual Current Amps: " + str(actual_charge_ampage))
    except requests.ConnectionError:
    pprint("HASSIO Override Connection Issue, Disable charging for now.")
    actual_charge_ampage = 0
    except requests.ConnectionError:
    pprint("HASSIO Override Connection Issue, Disable charging for now.")
    actual_charge_ampage = 0
    except KeyError:
    pprint("HASSIO Override Connection Issue, Disable charging for now.")
    actual_charge_ampage = 0

    data = []
    built_command = '>0000' + convert_charge_amps_to_hex(actual_charge_ampage) + "\r"
    pprint("Charge Current Set To: " + str(actual_charge_ampage))
    pprint(built_command)
    for c in built_command.encode("ascii"):
    data.append(int(c))
    ser.write(data)
    packet = ser.readline()
    pprint(packet)
    variables = {}
    variables['target_current'] = actual_charge_ampage
    variables['version'] = str(packet[1:3])
    variables['current_switch_setting'] = charge_switch_currents[packet[3:4].decode('utf-8')] # Responds with Hex 7 which is 18A if correct.
    variables['control_pilot_voltage'] = int(packet[4:7], 16)
    variables['charge_duty'] = convert_charge_hex_to_amps(packet[7:10])
    variables['plug_present_voltage'] = int(packet[10:13], 16)
    variables['live_voltage'] = int(packet[13:16], 16)
    variables['neutral_voltage'] = int(packet[16:19], 16)
    variables['daylight_detection'] = str(packet[19:22])
    variables['mains_frequency'] = int(packet[22:25], 16)
    variables['charger_state'] = str(packet[25:27])
    variables['relay_state'] = int(packet[27:28], 16)
    variables['plug_state'] = int(packet[28:29],16)
    variables['HUB_duty_limit'] = convert_charge_hex_to_amps(packet[29:32])
    variables['charge_duty_timer'] = str(packet[32:36])
    variables['station_uptime'] = str(datetime.timedelta(minutes=int(packet[36:40], 16)))
    variables['charge_time'] = str(datetime.timedelta(minutes=int(packet[40:44], 16)))
    variables['state_of_mains'] = str(packet[44:46])
    variables['cp_line_state'] = str(packet[46])
    variables['station_ID'] = int(packet[47:48],16)
    variables['random_value'] = str(packet[48:50])
    variables['max_current'] = convert_charge_hex_to_amps(packet[50:53])
    variables['persistant_ID'] = int(packet[53:61],16)
    variables['checksum'] = str(packet[53:55]) # Modified to 53-55 as these are the last chars. Looks correct.
    for variable in variables.items():
    pprint(variable)
    target_current_json = {"state": float(variables['target_current']), "attributes": {"unit_of_measurement": "A"}}
    try:
    target_current_response = post(hassio_base_url+eo_target_current, headers=headers, json=target_current_json)
    except requests.ConnectionError:
    pprint("HASSIO Override Connection Issue, can't feed back.")

    #if( variables['target_current'] > 0 ):
    # try:
    # hassio_relay_post = post(hassio_base_url+charger_relay_url, headers=headers, json={"state": "on"})
    # except:
    # pprint("Failed to change relay")
    #else:
    # try:
    # hassio_relay_post = post(hassio_base_url+charger_relay_url, headers=headers, json={"state": "off"})
    # except:
    # pprint("Failed to change relay")
    sleep(10)

    if __name__ == "__main__":
    ser = rs485.RS485()
    ser.port = "/dev/ttyUSB0"
    ser.baudrate = 115200
    ser.timeout = 0
    ser.baudrate = 115200
    ser.rs485_mode = rs485.RS485Settings(rts_level_for_tx=False, rts_level_for_rx=True, delay_before_rx=0)
    ser.timeout = 0.01
    ser.open()
    charge_ampage = 0
    actual_charge_ampage = 0
    calculate_thread = threading.Thread(target=calculate_charge)
    charger_thread = threading.Thread(target=manage_charger)
    calculate_thread.start()
    charger_thread.start()

    while True:
    sleep(1)