Last active
November 16, 2016 05:17
-
-
Save jeffutter/8d46213abf3d3448c7cd2b7711d02403 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Thermostat.TempSensor.History do
  @moduledoc """
  GenServer holding the recent temperature history of one 1-Wire sensor.

  One process is started per `{onewire_port, onewire_id}` pair and registered
  through `:gproc` (see `via_tuple/2`). The state is a `%History{}` struct whose
  `:data` field is a list of `Thermostat.TempSensor.Entry` structs, newest first.

  Entries older than `:duration` seconds are dropped whenever a new entry is
  added, and runs of identical temperatures are collapsed so that only the two
  end points of a plateau are kept.

  Every #{60_000} ms the process writes its state to
  `/root/history/<onewire_port>/<onewire_id>` if it changed since the last
  successful write. The file is read back in `init/1`.
  """

  require Logger
  use GenServer
  use Timex

  alias Thermostat.TempSensor.Entry, as: Entry

  defstruct [:onewire_port, :onewire_id, :duration, :data, :dirty]

  # How long entries are retained, in seconds (24 hours).
  @history_duration 60 * 60 * 24
  # Interval between flush attempts, in milliseconds.
  @pause_ms 60000
  # Directory under which one sub-directory per port is created.
  @history_root "/root/history"

  ## Client API

  @doc """
  Starts the history process for the given sensor and registers it under
  `via_tuple(onewire_port, onewire_id)`.
  """
  def start_link(onewire_port, onewire_id) do
    GenServer.start_link(
      __MODULE__,
      [onewire_port, onewire_id],
      name: via_tuple(onewire_port, onewire_id)
    )
  end

  @doc """
  Returns the `:gproc` via-tuple under which the history process of the given
  sensor is registered.
  """
  def via_tuple(onewire_port, onewire_id) do
    {:via, :gproc, {:n, :l, {:temp_sensor_history, onewire_port, onewire_id}}}
  end

  @doc """
  Asynchronously adds `entry` (expected to be a `%Thermostat.TempSensor.Entry{}`)
  to the history of the given sensor.
  """
  def add(onewire_port, onewire_id, entry) do
    GenServer.cast(via_tuple(onewire_port, onewire_id), {:add, entry})
  end

  @doc """
  Returns the current `%History{}` struct of the given sensor.
  """
  def get_state(onewire_port, onewire_id) do
    GenServer.call(via_tuple(onewire_port, onewire_id), :get_state)
  end

  @doc """
  Returns the newest entry of `history`, or `nil` when the history is empty.
  """
  def latest(history) do
    # A freshly created history has `data: []`; return nil instead of raising
    # a MatchError in that case.
    List.first(history.data)
  end

  @doc """
  Returns the entries of `history`, newest first, each converted with
  `Entry.to_map/1`.
  """
  def to_list(history) do
    Enum.map(history.data, &Entry.to_map/1)
  end

  ## GenServer callbacks

  @doc false
  def init([onewire_port, onewire_id]) do
    history =
      case read_history(onewire_port, onewire_id) do
        {:ok, history} -> history
        _ -> default(onewire_port, onewire_id)
      end

    Process.send_after(self(), :update, @pause_ms)
    {:ok, history}
  end

  @doc false
  def handle_info(:update, state) do
    new_state = flush(state)
    Process.send_after(self(), :update, @pause_ms)
    {:noreply, new_state}
  end

  @doc false
  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end

  @doc false
  def handle_cast({:add, entry}, state) do
    data =
      state.data
      |> trim_duration(state.duration)
      |> insert(entry)

    # Compared against the untrimmed list so that trimming alone also marks
    # the state as needing a flush.
    dirty = state.dirty || state.data != data
    {:noreply, %__MODULE__{state | data: data, dirty: dirty}}
  end

  ## Persistence

  @doc """
  Serializes `history` and writes it to the file of the given sensor.

  Failures are logged; `history` is returned unchanged in every case.
  """
  def write_history(history, onewire_port, onewire_id) do
    persist(history, onewire_port, onewire_id)
    history
  end

  @doc """
  Encodes `history` with `:erlang.term_to_binary/1`. Always returns
  `{:ok, binary}`.
  """
  def serialize(history) do
    {:ok, :erlang.term_to_binary(history)}
  end

  @doc """
  Decodes a binary produced by `serialize/1`.

  Returns `{:ok, term}`, or `{:error, :invalid_binary}` when the input is not a
  valid external term (for example a truncated file).
  """
  def unserialize(string) do
    # NOTE(review): binary_to_term/1 is only appropriate because the input is
    # a file this module wrote itself; do not feed it untrusted data.
    {:ok, :erlang.binary_to_term(string)}
  rescue
    # binary_to_term/1 raises instead of returning an error tuple.
    ArgumentError -> {:error, :invalid_binary}
  end

  ## Internals

  # Empty history for a sensor that has no (readable) file on disk.
  defp default(onewire_port, onewire_id) do
    %__MODULE__{
      onewire_port: onewire_port,
      onewire_id: onewire_id,
      duration: @history_duration,
      data: [],
      dirty: false
    }
  end

  # Writes the state to disk when it is dirty. The dirty flag is only cleared
  # when the write succeeded, so a failed write is retried on the next tick.
  defp flush(state) do
    clean = %__MODULE__{state | dirty: false}

    if state.dirty do
      case persist(clean, state.onewire_port, state.onewire_id) do
        :ok -> clean
        {:error, _} -> state
      end
    else
      clean
    end
  end

  # Prepends `entry` to `data` (newest first), collapsing plateaus.
  defp insert([], entry), do: [entry]

  # The two newest stored readings and the incoming one share a temperature:
  # replace the newest stored reading so the plateau keeps only its end points.
  defp insert(
         [%Entry{temp: first_temp}, %Entry{temp: second_temp} = second | rest],
         %Entry{temp: entry_temp} = entry
       )
       when entry_temp == first_temp and first_temp == second_temp do
    [entry, second | rest]
  end

  defp insert([%Entry{} | _] = data, %Entry{} = entry), do: [entry | data]

  # Anything else is not a valid reading. Keep what we have instead of
  # discarding the whole history.
  defp insert(data, entry) do
    Logger.error("Ignoring unexpected history entry #{inspect(entry)}")
    data
  end

  # Drops entries older than `duration` seconds from the old end (the tail)
  # of `data`. Stops at the first entry that is recent enough or is not an
  # `%Entry{}`.
  defp trim_duration(data, duration) do
    cutoff = Timex.shift(Timex.now(), seconds: -duration)

    data
    |> Enum.reverse()
    |> Enum.drop_while(&expired?(&1, cutoff))
    |> Enum.reverse()
  end

  # `time` is parsed as an ISO-8601 extended string. Timex.compare/2 is used
  # because `>` on date structs is a structural, not chronological, comparison.
  defp expired?(%Entry{time: time}, cutoff) do
    Timex.compare(cutoff, Timex.parse!(time, "{ISO:Extended}")) == 1
  end

  defp expired?(_other, _cutoff), do: false

  # Path of the history file; creates the per-port directory as a side effect
  # and raises if that is not possible.
  defp history_location(onewire_port, onewire_id) do
    dir = "#{@history_root}/#{onewire_port}"
    File.mkdir_p!(dir)
    "#{dir}/#{onewire_id}"
  end

  # Loads the persisted history, returning {:ok, history} or {:error, reason}.
  defp read_history(onewire_port, onewire_id) do
    hl = history_location(onewire_port, onewire_id)

    case File.read(hl) do
      {:ok, serialized} ->
        parse_history(serialized, hl)

      {:error, _} ->
        Logger.error("Error Loading #{hl}")
        {:error, "Load Error"}
    end
  end

  # Accepts only a decoded `%History{}`; anything else counts as a parse error.
  defp parse_history(serialized, hl) do
    case unserialize(serialized) do
      {:ok, %__MODULE__{} = history} ->
        {:ok, history}

      _ ->
        Logger.error("Error Parsing #{hl}")
        {:error, "Parse Error"}
    end
  end

  # Serializes and writes `history`; returns :ok or {:error, reason}.
  defp persist(history, onewire_port, onewire_id) do
    hl = history_location(onewire_port, onewire_id)
    {:ok, serialized} = serialize(history)

    case File.write(hl, serialized, [:sync]) do
      :ok ->
        :ok

      {:error, _} = error ->
        Logger.error("Error Writing #{hl}")
        error
    end
  end
end
defimpl Msgpax.Packer, for: Thermostat.TempSensor.History do
  # A history is packed as the list returned by `History.to_list/1`, using the
  # protocol's own implementation for lists.
  def pack(history) do
    entries = Thermostat.TempSensor.History.to_list(history)
    @protocol.List.pack(entries)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment