Skip to content

Instantly share code, notes, and snippets.

@nehalvpatel
Last active July 2, 2020 16:02
Show Gist options
  • Select an option

  • Save nehalvpatel/ed8a217d903006c3a6ef914964e4301e to your computer and use it in GitHub Desktop.

Select an option

Save nehalvpatel/ed8a217d903006c3a6ef914964e4301e to your computer and use it in GitHub Desktop.
import asyncio
import requests
import time
import os
import sys
from bleak import discover, BleakError
# Key looked up in the advertisement's "manufacturer_data" mapping.
# NOTE(review): presumably the Bluetooth SIG company identifier assigned to
# BlueRadios (133 == 0x0085) -- confirm against the SensorBug documentation.
BLUE_RADIOS_CID = 133
# Value expected at byte index 2 of the manufacturer payload before the
# battery/config/temperature fields are parsed.
STATIC_DATA_TID = 0x3C
# Marker byte that precedes a two-byte temperature reading in the payload.
TEMPERATURE_DATA_STRUCTURE_ID = 0x43
# Address of the one device whose advertisements are processed; every other
# discovered device is skipped.
# NOTE(review): looks like a macOS CoreBluetooth UUID rather than a MAC
# address, so it is likely specific to the host machine -- confirm.
SENSORBUG_ADDRESS = "6D876CEA-F283-4BD6-A89C-AC2F3BF2C39A"
# A low-battery notification is raised once the reported battery level is at
# or below this value (compared directly against payload byte 3).
BATTERY_LEVEL_FOR_ALERT = 20
# Delay, in seconds, before each discovery scan.
REFRESH_TIMEOUT_SECONDS = 5
def notify(title, text):
    """Show a desktop notification through ``osascript`` (macOS).

    The previous implementation interpolated ``title`` and ``text`` straight
    into a shell command line, so any quote character in either value (for
    example inside an exception message) broke the command or allowed shell /
    AppleScript injection.  The script is now passed to ``osascript`` as a
    single argument without going through a shell, and both values are escaped
    as AppleScript string literals.

    Args:
        title: Notification title.
        text: Notification body.

    Returns:
        None.  Delivery is best-effort: if ``osascript`` cannot be started the
        failure is reported on stderr and otherwise ignored, so a broken
        notification never takes down the caller.
    """
    import subprocess

    def _applescript_string(value):
        # Inside an AppleScript string literal, backslash and double quote
        # are the characters that must be backslash-escaped.
        escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
        return '"{}"'.format(escaped)

    script = "display notification {} with title {}".format(
        _applescript_string(text), _applescript_string(title)
    )
    try:
        # Argument list + no shell: nothing in title/text is shell-interpreted.
        subprocess.run(["osascript", "-e", script], check=False)
    except OSError as error:
        # e.g. osascript is not installed (non-macOS host).
        print("notify: could not run osascript: {}".format(error), file=sys.stderr)
async def run():
    """Repeatedly scan for the SensorBug and report its battery/temperature.

    Each cycle waits ``REFRESH_TIMEOUT_SECONDS``, runs a BLE discovery scan,
    and looks for the device whose address equals ``SENSORBUG_ADDRESS``.  When
    its manufacturer payload carries the static-data marker, the battery
    level, config counter and every temperature record found in the payload
    are printed, and each temperature (in Celsius) is sent to the remote
    endpoint.

    Notifications (via ``notify``) are raised:
      * once, when the battery level first drops to the alert threshold;
      * once, the first time sending to the server fails;
      * on every fatal error before exiting.

    Returns:
        None, and only when the target device advertises without the expected
        manufacturer data (this stops the monitor).

    Raises:
        SystemExit: on a ``BleakError`` or any other unexpected exception.
            ``KeyboardInterrupt`` is no longer intercepted, so Ctrl-C stops
            the script normally.
    """
    battery_warning_issued = False
    network_alert_issued = False
    # Upper bound for the HTTP call so a stalled server cannot hang the
    # monitor forever; a timeout surfaces as a RequestException below.
    request_timeout_seconds = 10

    while True:
        # asyncio.sleep yields to the event loop; time.sleep would block it.
        await asyncio.sleep(REFRESH_TIMEOUT_SECONDS)
        print()

        try:
            all_devices = await discover()
            for device in all_devices:
                if device.address != SENSORBUG_ADDRESS:
                    continue

                if "manufacturer_data" not in device.metadata or BLUE_RADIOS_CID not in device.metadata["manufacturer_data"]:
                    notify("SensorBug", "Could not find manufacturer data.")
                    return

                data_bytes = device.metadata["manufacturer_data"][BLUE_RADIOS_CID]
                if data_bytes[2] != STATIC_DATA_TID:
                    continue

                battery_level = data_bytes[3]
                config_counter = data_bytes[4]

                if battery_level <= BATTERY_LEVEL_FOR_ALERT and not battery_warning_issued:
                    notify("SensorBug", "Battery level is near " + str(battery_level) + "%.")
                    battery_warning_issued = True

                # Walk the rest of the payload looking for temperature records:
                # one marker byte followed by a two-byte reading.
                idx = 5
                while idx < len(data_bytes):
                    if data_bytes[idx] != TEMPERATURE_DATA_STRUCTURE_ID:
                        idx += 1
                        continue

                    raw_reading = data_bytes[idx + 1:idx + 3]
                    if len(raw_reading) < 2:
                        # Marker at the very end of the payload: a short slice
                        # would decode to a bogus temperature, so stop here.
                        break

                    # Decode as a signed 16-bit value so sub-zero readings are
                    # not reported as large positive temperatures.  The byte
                    # order is pinned to little-endian instead of following
                    # the host CPU.
                    # NOTE(review): little-endian matches what sys.byteorder
                    # yields on the Macs this script targets -- confirm
                    # against the SensorBug payload specification.
                    temp_sensor_raw_value = int.from_bytes(raw_reading, byteorder="little", signed=True)
                    temp_celsius = temp_sensor_raw_value * 0.0625
                    temp_fahrenheit = 9.0 / 5.0 * temp_celsius + 32

                    print(time.ctime())
                    print("Battery Level: " + str(battery_level) + "%")
                    print("Config Counter: " + str(config_counter))
                    print("Celsius: " + str(temp_celsius))
                    print("Temperature: " + str(temp_fahrenheit))

                    response = requests.get(
                        "https://www.example.com/bedroom/temperature.php?password=example&set_to=" + str(temp_celsius),
                        timeout=request_timeout_seconds,
                    )
                    print(response.text)

                    idx += 3
        except requests.exceptions.RequestException as e:
            # Network trouble is not fatal: alert once, log, and keep polling.
            if not network_alert_issued:
                notify("SensorBug", "Could not send temperature value to server.")
                network_alert_issued = True
            print(e)
        except BleakError as e:
            notify("SensorBug", "{}".format(e))
            raise SystemExit(e)
        except Exception as e:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit
            # pass through untouched; exit with the actual error, not just
            # its class.
            notify("SensorBug", "Encountered an error.")
            raise SystemExit(e)
# Script entry point: obtain the event loop and drive the monitor coroutine
# until it returns or exits.
loop = asyncio.get_event_loop()
_monitor = run()
loop.run_until_complete(_monitor)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment