Created
January 24, 2016 02:29
-
-
Save mattmahn/b0f09492fe7e3c2d792a to your computer and use it in GitHub Desktop.
Revisions
-
mattmahn created this gist
Jan 24, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,219 @@ #!/usr/bin/env python3 # coding=utf-8 from __future__ import print_function import os import json import Queue import socket import threading import time import syslog import serial import math TCP_IP = "localhost" TCP_PORT = 5001 SPEED_SERIAL_LOC = "/dev/ttyUSB0" LOG_FILE_NAME = "/usr/local/share/daq/log-%s.csv" % time.strftime("%b%d.%H.%M") ELECTRIC_SENSORS = ["speed", "joules"] GAS_SENSORS = ["speed", "oxy", "temp", "rpm"] serial_sensors = dict() data_points = dict(time=Queue.Queue(maxsize=0)) data_points_live = dict(time=0.0) def obtain_data(): if serial_sensors: # obtain the speed from the arduino (blocking until it writes, up to 2 seconds) for sens_type, sens in serial_sensors.items(): raw_read = sens.readline() try: value = float(raw_read) # set the timestamp for the sake of accuracy cur_time = int(time.time() * 1000) data_points_live["time"] = cur_time except ValueError as e: err_msg = "corrupt_serial_read: %s" % str(e) syslog.syslog(syslog.LOG_ERR, err_msg) if __debug__: print(err_msg) value = 0.0 # write to the queue to be picked up and logged data_points[sens_type].put_nowait(value) # update the data state snapshot for the android publish thread data_points_live[sens_type] = value # push the timestamp last, the logging thread waits for timestamp first. cur_time = int(time.time() * 1000) data_points["time"].put_nowait(cur_time) else: # if we have no sensors to read, wait a second time.sleep(1) def sensor_subscriber_thread(): while not HALT: obtain_data() # runnable to pull off the data_point queues and log as fast as possible def log_data_thread(): while not HALT: try: values = [] for k, q in data_points.items(): # time should be first for sorting reasons if k == "time": values = [q.get(timeout=.5)] + values else: values.append(q.get(timeout=.5)) print(values) log_data(values) except Queue.Empty: pass # adb connection thread runnable # attempts to connect to android phone over adb and send whatever the latest # data point looked like in JSON form. # sends as fast as possible. def adb_publish_thread(): while not HALT: try: syslog.syslog(syslog.LOG_DEBUG, "Attempting to establish adb connection...") s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) num_attempts = 0 while not HALT: try: if __debug__: print("Trying to connect...") s.connect((TCP_IP, TCP_PORT)) syslog.syslog(syslog.LOG_INFO, "adb connected.") if __debug__: print("Connected") break except Exception as e: syslog.syslog(syslog.LOG_ERR, str(e)) if __debug__: print(e) num_attempts += 1 # wait increases logarithmically, up to 5 seconds wait_time = .5 * int(math.log(num_attempts, 10)) time.sleep(wait_time if wait_time < 5.0 else 5.0) continue while not HALT: try: s.send((json.dumps(data_points_live) + "\n").encode("utf-8")) time.sleep(0.01) except Exception as e: syslog.syslog(syslog.LOG_ERR, str(e)) if __debug__: print(e) print("Reconnecting") break except Exception as sock_create_err: syslog.syslog(syslog.LOG_ERR, str(sock_create_err)) finally: s.close() def log_data(values): log_line = ','.join(str(i) for i in values) + '\n' if __debug__: print(log_line, end="", flush=True) log_file.write(log_line) def log_message(message): syslog.syslog(syslog.LOG_INFO, message) if __debug__: print(message) def init_sensor(sensor_type): serial_sensors[sensor_type] = serial.Serial(SPEED_SERIAL_LOC, baudrate=9600, timeout=2) data_points[sensor_type] = Queue.Queue(maxsize=0) data_points_live[sensor_type] = 0.0 def main(): global HALT # start all of the threads all_threads = list(subscriber_pool.items()) + list(publisher_pool.items()) for t_name, t in all_threads: if type(t) == threading.Thread: t.start() try: while 1: # heartbeat every second for sens, dp_queue in data_points.items(): msg = "dp_queue_length: %s\t%d" % (sens, dp_queue.qsize()) log_message(msg) for t_name, t in all_threads: msg = "%s_thread_alive: %r" % (t_name, t.is_alive()) log_message(msg) # spinlock is bad - this is slightly less bad time.sleep(1) except KeyboardInterrupt: syslog.syslog(syslog.LOG_INFO, "keyboard_interrupt: quitting...") # queue up quitting HALT = True finally: for t_name, t in all_threads: if type(t) == threading.Thread and t.is_alive(): t.join() log_file.flush() log_file.close() syslog.syslog(syslog.LOG_INFO, "done.") if __name__ == '__main__': # managing and joining multiple async inputs is difficult and messy subscriber_pool = {"all": threading.Thread(target=sensor_subscriber_thread, args=())} publisher_pool = { "android_connection": threading.Thread(target=adb_publish_thread, args=()), "logging": threading.Thread(target=log_data_thread, args=()) } # aggregate different sensors based on environment variable describing car type car_type = os.environ.get("DAQ_CAR_TYPE") if car_type == "electric": # TODO add electric car sensor stuff sensors = ELECTRIC_SENSORS elif car_type == "gas": # TODO add gas car sensor stuff sensors = GAS_SENSORS elif car_type == "speed_only": sensors = ["speed"] else: msg = "environment: DAQ_CAR_TYPE not set. try 'gas' or 'electric'." syslog.syslog(syslog.LOG_ERR, msg) raise RuntimeError(msg) for sensor in sensors: init_sensor(sensor) log_file = open(LOG_FILE_NAME, mode='a') log_file.write("time," + ','.join(sensors) + "\n") HALT = False main() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,94 @@ root@daq-e:/opt/datacollector# pypy /usr/local/src/datacollector/adb.py dp_queue_length: speed 0 dp_queue_length: time 0 all_thread_alive: True android_connection_thread_alive: True Trying to connect... logging_thread_alive: True [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused dp_queue_length: speed 0 dp_queue_length: time 0 all_thread_alive: True android_connection_thread_alive: True logging_thread_alive: True Trying to connect... [Errno 111] Connection refused [1453599993309L, 22.0] Exception in thread Thread-3: Traceback (most recent call last): File "/usr/lib/pypy/lib-python/2.7/threading.py", line 806, in __bootstrap_inner self.run() File "/usr/lib/pypy/lib-python/2.7/threading.py", line 759, in run self.__target(*self.__args, **self.__kwargs) File "/usr/local/src/datacollector/adb.py", line 78, in log_data_thread log_data(values) File "/usr/local/src/datacollector/adb.py", line 136, in log_data print(log_line, end="", flush=True) TypeError: invalid keyword arguments to print() Trying to connect... [Errno 111] Connection refused dp_queue_length: speed 9 dp_queue_length: time 9 all_thread_alive: True android_connection_thread_alive: True logging_thread_alive: False Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused dp_queue_length: speed 102 dp_queue_length: time 102 all_thread_alive: True android_connection_thread_alive: True logging_thread_alive: False Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused dp_queue_length: speed 135 dp_queue_length: time 135 all_thread_alive: True android_connection_thread_alive: True logging_thread_alive: False Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused dp_queue_length: speed 184 dp_queue_length: time 184 all_thread_alive: True android_connection_thread_alive: True logging_thread_alive: False Trying to connect... [Errno 111] Connection refused Trying to connect... [Errno 111] Connection refused dp_queue_length: speed 212 dp_queue_length: time 212 all_thread_alive: True android_connection_thread_alive: True logging_thread_alive: False Trying to connect... [Errno 111] Connection refused ^C