Skip to content

Instantly share code, notes, and snippets.

@mattmahn
Created January 24, 2016 02:29
Show Gist options
  • Select an option

  • Save mattmahn/b0f09492fe7e3c2d792a to your computer and use it in GitHub Desktop.

Select an option

Save mattmahn/b0f09492fe7e3c2d792a to your computer and use it in GitHub Desktop.

Revisions

  1. mattmahn created this gist Jan 24, 2016.
    219 changes: 219 additions & 0 deletions adb.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,219 @@
    #!/usr/bin/env python3
    # coding=utf-8

    from __future__ import print_function
    import os
    import json
    import Queue
    import socket
    import threading
    import time
    import syslog
    import serial
    import math

    TCP_IP = "localhost"
    TCP_PORT = 5001

    SPEED_SERIAL_LOC = "/dev/ttyUSB0"

    LOG_FILE_NAME = "/usr/local/share/daq/log-%s.csv" % time.strftime("%b%d.%H.%M")

    ELECTRIC_SENSORS = ["speed", "joules"]
    GAS_SENSORS = ["speed", "oxy", "temp", "rpm"]

    serial_sensors = dict()

    data_points = dict(time=Queue.Queue(maxsize=0))

    data_points_live = dict(time=0.0)


    def obtain_data():
    if serial_sensors:
    # obtain the speed from the arduino (blocking until it writes, up to 2 seconds)
    for sens_type, sens in serial_sensors.items():
    raw_read = sens.readline()
    try:
    value = float(raw_read)
    # set the timestamp for the sake of accuracy
    cur_time = int(time.time() * 1000)
    data_points_live["time"] = cur_time
    except ValueError as e:
    err_msg = "corrupt_serial_read: %s" % str(e)
    syslog.syslog(syslog.LOG_ERR, err_msg)
    if __debug__:
    print(err_msg)
    value = 0.0
    # write to the queue to be picked up and logged
    data_points[sens_type].put_nowait(value)
    # update the data state snapshot for the android publish thread
    data_points_live[sens_type] = value

    # push the timestamp last, the logging thread waits for timestamp first.
    cur_time = int(time.time() * 1000)
    data_points["time"].put_nowait(cur_time)
    else:
    # if we have no sensors to read, wait a second
    time.sleep(1)


    def sensor_subscriber_thread():
    while not HALT:
    obtain_data()


    # runnable to pull off the data_point queues and log as fast as possible
    def log_data_thread():
    while not HALT:
    try:
    values = []
    for k, q in data_points.items():
    # time should be first for sorting reasons
    if k == "time":
    values = [q.get(timeout=.5)] + values
    else:
    values.append(q.get(timeout=.5))
    print(values)
    log_data(values)
    except Queue.Empty:
    pass


    # adb connection thread runnable
    # attempts to connect to android phone over adb and send whatever the latest
    # data point looked like in JSON form.
    # sends as fast as possible.
    def adb_publish_thread():
    while not HALT:
    try:
    syslog.syslog(syslog.LOG_DEBUG, "Attempting to establish adb connection...")
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    num_attempts = 0
    while not HALT:
    try:
    if __debug__:
    print("Trying to connect...")

    s.connect((TCP_IP, TCP_PORT))
    syslog.syslog(syslog.LOG_INFO, "adb connected.")

    if __debug__:
    print("Connected")
    break
    except Exception as e:
    syslog.syslog(syslog.LOG_ERR, str(e))

    if __debug__:
    print(e)

    num_attempts += 1
    # wait increases logarithmically, up to 5 seconds
    wait_time = .5 * int(math.log(num_attempts, 10))
    time.sleep(wait_time if wait_time < 5.0 else 5.0)
    continue

    while not HALT:
    try:
    s.send((json.dumps(data_points_live) + "\n").encode("utf-8"))
    time.sleep(0.01)
    except Exception as e:
    syslog.syslog(syslog.LOG_ERR, str(e))
    if __debug__:
    print(e)
    print("Reconnecting")
    break
    except Exception as sock_create_err:
    syslog.syslog(syslog.LOG_ERR, str(sock_create_err))
    finally:
    s.close()


    def log_data(values):
    log_line = ','.join(str(i) for i in values) + '\n'
    if __debug__:
    print(log_line, end="", flush=True)
    log_file.write(log_line)


    def log_message(message):
    syslog.syslog(syslog.LOG_INFO, message)
    if __debug__:
    print(message)


    def init_sensor(sensor_type):
    serial_sensors[sensor_type] = serial.Serial(SPEED_SERIAL_LOC, baudrate=9600, timeout=2)
    data_points[sensor_type] = Queue.Queue(maxsize=0)
    data_points_live[sensor_type] = 0.0


    def main():
    global HALT

    # start all of the threads
    all_threads = list(subscriber_pool.items()) + list(publisher_pool.items())
    for t_name, t in all_threads:
    if type(t) == threading.Thread:
    t.start()

    try:
    while 1:
    # heartbeat every second
    for sens, dp_queue in data_points.items():
    msg = "dp_queue_length: %s\t%d" % (sens, dp_queue.qsize())
    log_message(msg)
    for t_name, t in all_threads:
    msg = "%s_thread_alive: %r" % (t_name, t.is_alive())
    log_message(msg)
    # spinlock is bad - this is slightly less bad
    time.sleep(1)
    except KeyboardInterrupt:
    syslog.syslog(syslog.LOG_INFO, "keyboard_interrupt: quitting...")
    # queue up quitting
    HALT = True
    finally:
    for t_name, t in all_threads:
    if type(t) == threading.Thread and t.is_alive():
    t.join()
    log_file.flush()
    log_file.close()
    syslog.syslog(syslog.LOG_INFO, "done.")


    if __name__ == '__main__':

    # managing and joining multiple async inputs is difficult and messy
    subscriber_pool = {"all": threading.Thread(target=sensor_subscriber_thread, args=())}

    publisher_pool = {
    "android_connection": threading.Thread(target=adb_publish_thread, args=()),
    "logging": threading.Thread(target=log_data_thread, args=())
    }

    # aggregate different sensors based on environment variable describing car type
    car_type = os.environ.get("DAQ_CAR_TYPE")

    if car_type == "electric":
    # TODO add electric car sensor stuff
    sensors = ELECTRIC_SENSORS
    elif car_type == "gas":
    # TODO add gas car sensor stuff
    sensors = GAS_SENSORS
    elif car_type == "speed_only":
    sensors = ["speed"]
    else:
    msg = "environment: DAQ_CAR_TYPE not set. try 'gas' or 'electric'."
    syslog.syslog(syslog.LOG_ERR, msg)
    raise RuntimeError(msg)

    for sensor in sensors:
    init_sensor(sensor)

    log_file = open(LOG_FILE_NAME, mode='a')
    log_file.write("time," + ','.join(sensors) + "\n")

    HALT = False

    main()
    94 changes: 94 additions & 0 deletions std(out|err)
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,94 @@
    root@daq-e:/opt/datacollector# pypy /usr/local/src/datacollector/adb.py
    dp_queue_length: speed 0
    dp_queue_length: time 0
    all_thread_alive: True
    android_connection_thread_alive: True
    Trying to connect...
    logging_thread_alive: True
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    dp_queue_length: speed 0
    dp_queue_length: time 0
    all_thread_alive: True
    android_connection_thread_alive: True
    logging_thread_alive: True
    Trying to connect...
    [Errno 111] Connection refused
    [1453599993309L, 22.0]
    Exception in thread Thread-3:
    Traceback (most recent call last):
    File "/usr/lib/pypy/lib-python/2.7/threading.py", line 806, in __bootstrap_inner
    self.run()
    File "/usr/lib/pypy/lib-python/2.7/threading.py", line 759, in run
    self.__target(*self.__args, **self.__kwargs)
    File "/usr/local/src/datacollector/adb.py", line 78, in log_data_thread
    log_data(values)
    File "/usr/local/src/datacollector/adb.py", line 136, in log_data
    print(log_line, end="", flush=True)
    TypeError: invalid keyword arguments to print()

    Trying to connect...
    [Errno 111] Connection refused
    dp_queue_length: speed 9
    dp_queue_length: time 9
    all_thread_alive: True
    android_connection_thread_alive: True
    logging_thread_alive: False
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    dp_queue_length: speed 102
    dp_queue_length: time 102
    all_thread_alive: True
    android_connection_thread_alive: True
    logging_thread_alive: False
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    dp_queue_length: speed 135
    dp_queue_length: time 135
    all_thread_alive: True
    android_connection_thread_alive: True
    logging_thread_alive: False
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    dp_queue_length: speed 184
    dp_queue_length: time 184
    all_thread_alive: True
    android_connection_thread_alive: True
    logging_thread_alive: False
    Trying to connect...
    [Errno 111] Connection refused
    Trying to connect...
    [Errno 111] Connection refused
    dp_queue_length: speed 212
    dp_queue_length: time 212
    all_thread_alive: True
    android_connection_thread_alive: True
    logging_thread_alive: False
    Trying to connect...
    [Errno 111] Connection refused
    ^C