Created
July 8, 2025 16:48
-
-
Save ryanteck/255f4890cc141ff372821662895d6c25 to your computer and use it in GitHub Desktop.
Revisions
-
ryanteck created this gist
Jul 8, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,226 @@ # Solar Charger Code # (C) Ryan Walmsley 2023 # This code will monitor and calculate the average solar export from the house, then charge from it. # It then instructs my charger (an EO Mini Pro) to then charge at the set rate. from requests import get, post, exceptions import requests from pprint import pprint import json from time import sleep from math import floor import threading import datetime from serial import rs485 import datetime import sentry_sdk sentry_sdk.init( dsn="<SENTRY KEY>", # Set traces_sample_rate to 1.0 to capture 100% # of transactions for performance monitoring. # We recommend adjusting this value in production. traces_sample_rate=1.0 ) history = [] # Setup the API hassio_base_url = "<HASSIO>" power_consumption_url = "sensor.home_instant_electricity" eo_target_current = "sensor.eomini_target_current" car_charger_url = "sensor.car_monitoring_car_charger_w" set_charge_rate_url = "sensor.set_car_charge_amps" charger_relay_url = "switch.chargerrelay_charger_relay" average_url = "sensor.charger_average" hassio_key = "<<HASSIOKEY>>" headers = { "Authorization": "Bearer "+ hassio_key, "content-type": "application/json", } # Current Switch Settings - The letters / numbers on the switch match up to these. charge_switch_currents = { '0': 0, '1': 6, '2': 8, '3': 10, '4': 13, '5': 15, '6': 16, '7': 18, '8': 20, '9': 22, 'A': 24, 'B': 25, 'C': 26, 'D': 28, 'E': 30, 'F': 32 } # Convert charge dutys from the charger to amps, times by 0.6 for fast chargers. # Doesn't have the extra maths for fast charging but not required. def convert_charge_hex_to_amps(charge_hex): charge_hex_converted = int(charge_hex, 16) charge_amps = int((charge_hex_converted * 0.6)/10) return charge_amps # Convert charge dutys from the amps to duty, divide by 0.6 for fast chargers. # Doesn't have the extra maths for fast charging but not required. def convert_charge_amps_to_hex(charge_amps): if(charge_amps < 5.5): charge_amps = 0 charge_duty = (int(charge_amps / 0.6)*10) charge_hex = format(charge_duty, 'X') if(len(charge_hex)<3): charge_hex = '0' + charge_hex return charge_hex # Convert charge dutys from the amps to duty, divide by 0.6 for fast chargers. # Doesn't have the extra maths for rapid charging but not required. def convert_charge_watts_to_hex(charge_amps): pprint(charge_amps) pprint(floor(charge_amps*4)/4) if(charge_amps < 6.0): charge_amps = 0 charge_duty = (int(charge_amps / 0.6)*10) charge_hex = format(charge_duty, 'X') if(len(charge_hex)<3): charge_hex = '0' + charge_hex return charge_hex # Main function to get the current export from the house, and consumption from the charger, and calculate how many amps to export at. def calculate_charge(): global charge_ampage while True: try: response_pc = get(hassio_base_url+power_consumption_url, headers=headers) except requests.ConnectionError: response_pc = "" try: response_cc = get(hassio_base_url+car_charger_url, headers=headers) except requests.ConnectionError: response_pc = "" try: power_consumption = float(response_pc.json()['state']) except ValueError: power_consumption = float(0) except AttributeError: power_consumption = float(0) try: charger_consumption = float(response_cc.json()['state']) except ValueError: charger_consumption = float(0) except AttributeError: power_consumption = float(0) history.append((power_consumption - charger_consumption)) average = (round((sum(history)/len(history)))) pprint("Averages") pprint(average) average = average + 100 pprint(average) average_json = {"state": float(average), "attributes": {"unit_of_measurement": "W"}} average_response = post(hassio_base_url+average_url, headers=headers, json=average_json) charge_rate = floor((-average / 240)*4)/4 charge_rate = charge_rate - 0.5 pprint(charge_rate) if(charge_rate < 5.5): charge_rate = 0 charge_ampage = charge_rate #pprint(str(-average) + " " + str(charge_rate)) set_charge_rate = {"state": float(charge_rate), "attributes": {"unit_of_measurement": "A"}} set_charge_rate_response = post(hassio_base_url+set_charge_rate_url, headers=headers, json=set_charge_rate) if(len(history)>=15): history.pop(0) sleep(5) def manage_charger(): global charge_ampage global actual_charge_ampage while True: try: hassio_response_manual_override = get(hassio_base_url+"input_boolean.charge_car_override", headers=headers).json() if hassio_response_manual_override['state'] == 'off': pprint("Solar Charging Enabled") pprint("Solar Export Amps: " + str(charge_ampage)) actual_charge_ampage = charge_ampage else: pprint("Manual Charge Override") try: hassio_response_manual_current = get(hassio_base_url+"input_number.car_charge_amps", headers=headers).json() actual_charge_ampage = int(float(hassio_response_manual_current['state'])) pprint("Manual Current Amps: " + str(actual_charge_ampage)) except requests.ConnectionError: pprint("HASSIO Override Connection Issue, Disable charging for now.") actual_charge_ampage = 0 except requests.ConnectionError: pprint("HASSIO Override Connection Issue, Disable charging for now.") actual_charge_ampage = 0 except KeyError: pprint("HASSIO Override Connection Issue, Disable charging for now.") actual_charge_ampage = 0 data = [] built_command = '>0000' + convert_charge_amps_to_hex(actual_charge_ampage) + "\r" pprint("Charge Current Set To: " + str(actual_charge_ampage)) pprint(built_command) for c in built_command.encode("ascii"): data.append(int(c)) ser.write(data) packet = ser.readline() pprint(packet) variables = {} variables['target_current'] = actual_charge_ampage variables['version'] = str(packet[1:3]) variables['current_switch_setting'] = charge_switch_currents[packet[3:4].decode('utf-8')] # Responds with Hex 7 which is 18A if correct. variables['control_pilot_voltage'] = int(packet[4:7], 16) variables['charge_duty'] = convert_charge_hex_to_amps(packet[7:10]) variables['plug_present_voltage'] = int(packet[10:13], 16) variables['live_voltage'] = int(packet[13:16], 16) variables['neutral_voltage'] = int(packet[16:19], 16) variables['daylight_detection'] = str(packet[19:22]) variables['mains_frequency'] = int(packet[22:25], 16) variables['charger_state'] = str(packet[25:27]) variables['relay_state'] = int(packet[27:28], 16) variables['plug_state'] = int(packet[28:29],16) variables['HUB_duty_limit'] = convert_charge_hex_to_amps(packet[29:32]) variables['charge_duty_timer'] = str(packet[32:36]) variables['station_uptime'] = str(datetime.timedelta(minutes=int(packet[36:40], 16))) variables['charge_time'] = str(datetime.timedelta(minutes=int(packet[40:44], 16))) variables['state_of_mains'] = str(packet[44:46]) variables['cp_line_state'] = str(packet[46]) variables['station_ID'] = int(packet[47:48],16) variables['random_value'] = str(packet[48:50]) variables['max_current'] = convert_charge_hex_to_amps(packet[50:53]) variables['persistant_ID'] = int(packet[53:61],16) variables['checksum'] = str(packet[53:55]) # Modified to 53-55 as these are the last chars. Looks correct. for variable in variables.items(): pprint(variable) target_current_json = {"state": float(variables['target_current']), "attributes": {"unit_of_measurement": "A"}} try: target_current_response = post(hassio_base_url+eo_target_current, headers=headers, json=target_current_json) except requests.ConnectionError: pprint("HASSIO Override Connection Issue, can't feed back.") #if( variables['target_current'] > 0 ): # try: # hassio_relay_post = post(hassio_base_url+charger_relay_url, headers=headers, json={"state": "on"}) # except: # pprint("Failed to change relay") #else: # try: # hassio_relay_post = post(hassio_base_url+charger_relay_url, headers=headers, json={"state": "off"}) # except: # pprint("Failed to change relay") sleep(10) if __name__ == "__main__": ser = rs485.RS485() ser.port = "/dev/ttyUSB0" ser.baudrate = 115200 ser.timeout = 0 ser.baudrate = 115200 ser.rs485_mode = rs485.RS485Settings(rts_level_for_tx=False, rts_level_for_rx=True, delay_before_rx=0) ser.timeout = 0.01 ser.open() charge_ampage = 0 actual_charge_ampage = 0 calculate_thread = threading.Thread(target=calculate_charge) charger_thread = threading.Thread(target=manage_charger) calculate_thread.start() charger_thread.start() while True: sleep(1)