Skip to content

Instantly share code, notes, and snippets.

@Arithmomaniac
Last active March 29, 2026 06:49
Show Gist options
  • Select an option

  • Save Arithmomaniac/616709f9d66d955f3eaa82433dbb8dd7 to your computer and use it in GitHub Desktop.

Select an option

Save Arithmomaniac/616709f9d66d955f3eaa82433dbb8dd7 to your computer and use it in GitHub Desktop.
Pikud HaOref Alert Display for Jetson Nano - fullscreen green/orange/red based on oref.org.il API (Beit Shemesh)
#!/usr/bin/env python3
"""
Pikud HaOref Alert Display Server for Jetson Nano.
Single-file server: polls oref.org.il API, serves fullscreen color page.
GREEN = all clear (default)
ORANGE = early warning / news flash
RED = active siren (missiles, terror, etc.)
Alert logic ported from amitfin/oref_alert Home Assistant integration.
"""
import http.server
import json
import threading
import time
import urllib.request
import ssl
import sys
from datetime import datetime, timedelta
# --- Configuration ---
CITY = "בית שמש"
PORT = 8080
# Polling (matches HA OrefAlertCoordinatorUpdater)
POLL_INTERVAL_ACTIVE = 2 # during/after alerts
POLL_INTERVAL_IDLE = 20 # when idle
ACTIVE_WINDOW = 600 # 10 min after last alert, keep fast polling
# Record expiration (matches HA RECORD_EXPIRATION_MINUTES)
ALERT_EXPIRY_MIN = 180 # 3 hours
PRE_ALERT_EXPIRY_MIN = 20 # 20 min
# Dedup window (matches HA DEDUP_WINDOW_SECONDS)
DEDUP_WINDOW = 180
# "Missed us" — if cohort cities' siren count stabilizes for this long,
# assume the threat passed and show yellow-orange
COHORT_STABLE_SECONDS = 90
# --- Category mappings (from HA categories.py) ---
# Real-time API (Alerts.json) uses field "cat"
# History API (AlertsHistory.json) uses field "category"
# They use DIFFERENT numbering schemes!
# Real-time -> history mapping (from HA REAL_TIME_TO_HISTORY_CATEGORY)
RT_TO_HIST = {
1: 1, # missile
3: 7, # earthquake
4: 9, # radiological
5: 11, # tsunami
6: 2, # aircraftIntrusion
7: 12, # hazardousMaterials
13: 10, # terroristInfiltration
}
# HA category metadata: (icon, emoji, is_alert)
# History categories where is_alert=True -> RED
HIST_ALERT_CATEGORIES = {1, 2, 3, 4, 7, 8, 9, 10, 11, 12}
# History cat 13 = update/end-alert -> GREEN
HIST_END_CATEGORY = 13
# History cat 14 = flash/pre-alert -> ORANGE
HIST_PRE_ALERT_CATEGORY = 14
# Real-time cat 10 is a MESSAGE category (HA: REAL_TIME_MESSAGE_CATEGORY)
# If title contains this word, it's a pre-alert (HA: REAL_TIME_PRE_ALERT_WORD)
RT_MESSAGE_CATEGORY = 10
PRE_ALERT_WORD = "בדקות"
# --- Record types (from HA RecordType) ---
ALERT = "alert"
PRE_ALERT = "pre_alert"
END = "end"
# --- URLs ---
API_URL = "https://www.oref.org.il/warningMessages/alert/Alerts.json"
HISTORY_URL = "https://www.oref.org.il/warningMessages/alert/History/AlertsHistory.json"
API_HEADERS = {
"Referer": "https://www.oref.org.il/",
"X-Requested-With": "XMLHttpRequest",
"User-Agent": "Mozilla/5.0",
}
# --- State ---
# Single record for our city (simplified from HA's per-area dict)
# {type, category, time, expire, title} or None
current_record = None
lock = threading.Lock()
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
# Track seen history records to avoid re-logging (key = "cat|date|title")
seen_history = set()
# In-memory event log for UI overlay (capped at 50 entries)
MAX_EVENTS = 50
event_log = []
last_rt_key = None # dedup repeated RT polls ("type|cat|title")
# "Missed us" cohort tracking
# When Beit Shemesh gets a PRE_ALERT, the other cities in the same alert
# entry are the "cohort". If cohort cities get sirens and no new ones appear
# for COHORT_STABLE_SECONDS, we show yellow-orange ("probably passed").
cohort_cities = set() # cities from the same alert as our PRE_ALERT
siren_cohort_cities = set() # subset of cohort that got sirens
siren_cohort_stable_since = None # when siren_cohort_cities last grew
def add_event(color, title, source, changed=False, event_time=None):
"""Append an event to the in-memory log (thread-safe, call inside lock).
event_time: optional unix timestamp for displayed time (e.g. from HIST alertDate).
Defaults to now. Sort order always uses receipt time (time.time())."""
now = time.time()
display_dt = datetime.fromtimestamp(event_time if event_time else now)
event_log.append({
"time": now,
"time_str": display_dt.strftime("%H:%M"),
"date_str": display_dt.strftime("%d/%m"),
"color": color,
"title": title,
"source": source,
"changed": changed,
})
if len(event_log) > MAX_EVENTS:
del event_log[0]
def get_color():
"""Derive display color from current record."""
if current_record is None:
return "green"
t = current_record["type"]
if t == ALERT:
return "red"
elif t == PRE_ALERT:
# "Missed us" logic: if cohort cities have sirens and count is stable
if (siren_cohort_cities and siren_cohort_stable_since and
time.time() - siren_cohort_stable_since >= COHORT_STABLE_SECONDS):
return "yellow_orange"
return "orange"
return "green"
def make_record(record_type, category, title, record_time=None):
"""Create a record with expiry based on type (matches HA add_metadata)."""
if record_time is None:
record_time = time.time()
if record_type == ALERT:
expire = record_time + ALERT_EXPIRY_MIN * 60
elif record_type == PRE_ALERT:
expire = record_time + PRE_ALERT_EXPIRY_MIN * 60
else:
expire = 0 # END records don't expire
return {
"type": record_type,
"category": category,
"time": record_time,
"expire": expire,
"title": title,
}
def should_update(new_record):
"""Decide if new_record should replace current_record (matches HA logic)."""
global current_record
if current_record is None:
return True
cur = current_record
# PRE_ALERT won't overwrite ALERT (from HA: pre after alert -> continue)
if new_record["type"] == PRE_ALERT and cur["type"] == ALERT:
return False
# END replaces only if it's NEWER than current record
# (prevents old END from clearing a newer ALERT)
if new_record["type"] == END:
return new_record["time"] > cur["time"]
# Newer record replaces older
if new_record["time"] <= cur["time"]:
return False
# Dedup: same category within DEDUP_WINDOW ignored (from HA)
if (new_record["category"] == cur["category"] and
new_record["time"] - cur["time"] <= DEDUP_WINDOW):
return False
return True
def remove_expired():
"""Replace expired records with synthetic END (matches HA _remove_expired)."""
global current_record, cohort_cities, siren_cohort_cities, siren_cohort_stable_since
if current_record is None:
return
if current_record["expire"] > 0 and time.time() > current_record["expire"]:
old_type = current_record["type"]
current_record = make_record(END, HIST_END_CATEGORY, "expired")
cohort_cities = set()
siren_cohort_cities = set()
siren_cohort_stable_since = None
add_event("green", "expired ({})".format(old_type), "SYS", changed=True)
print("EXPIRED: {} record -> END".format(old_type), file=sys.stderr)
def classify_realtime(cat, title):
"""Classify a real-time API alert (matches HA _current_to_history_format)."""
# Cat 10 = message category; check title for pre-alert word or end phrase
if cat == RT_MESSAGE_CATEGORY:
if PRE_ALERT_WORD in title:
return PRE_ALERT, HIST_PRE_ALERT_CATEGORY
elif "הסתיים" in title:
# All-clear message ("האירוע הסתיים") → END
return END, HIST_END_CATEGORY
else:
# Terror attack in history = cat 10
return ALERT, RT_TO_HIST.get(cat, cat)
# Map real-time cat to history cat
hist_cat = RT_TO_HIST.get(cat, None)
if hist_cat is None:
return None, None # unknown category, skip
if hist_cat in HIST_ALERT_CATEGORIES:
return ALERT, hist_cat
return None, None
def classify_history(cat):
"""Classify a history API record."""
if cat == HIST_END_CATEGORY:
return END
elif cat == HIST_PRE_ALERT_CATEGORY:
return PRE_ALERT
elif cat in HIST_ALERT_CATEGORIES:
return ALERT
return None
def parse_time(date_str):
"""Parse alert date string to unix timestamp."""
try:
dt = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
return dt.timestamp()
except Exception:
return None
def decode_response(raw):
"""Decode API response, handling BOM and encoding quirks."""
return raw.decode('utf-8-sig').replace('\x00', '').strip()
def fetch_url(url, timeout=10):
"""Fetch and decode a URL, return parsed JSON or None."""
try:
req = urllib.request.Request(url, headers=API_HEADERS)
with urllib.request.urlopen(req, timeout=timeout, context=ssl_ctx) as resp:
text = decode_response(resp.read())
if not text:
return None
return json.loads(text)
except Exception as e:
print("Fetch error ({}): {}".format(url.split('/')[-1], e), file=sys.stderr)
return None
def process_realtime():
"""Poll Alerts.json and process real-time alerts for our city.
Returns True if ANY alerts are active (even for other cities)."""
global current_record, cohort_cities, siren_cohort_cities, siren_cohort_stable_since
data = fetch_url(API_URL, timeout=5)
if data is None:
return False
if isinstance(data, dict):
data = [data]
if not isinstance(data, list):
return False
any_active = len(data) > 0
# If no active alerts, clear cohort tracking
if not any_active:
if cohort_cities:
cohort_cities = set()
siren_cohort_cities = set()
siren_cohort_stable_since = None
return False
# --- Pass 1: process alerts for our city ---
for alert in data:
cat = alert.get("cat", 0)
if isinstance(cat, str):
cat = int(cat)
areas = alert.get("data", [])
if isinstance(areas, str):
areas = [areas]
title = alert.get("title", "")
if CITY not in areas:
continue
record_type, hist_cat = classify_realtime(cat, title)
if record_type is None:
print("RT: unknown cat={} title={} (skipped)".format(cat, title),
file=sys.stderr)
continue
new_rec = make_record(record_type, hist_cat, title)
print("RT: cat={} type={} title={}".format(cat, record_type, title),
file=sys.stderr)
with lock:
# Dedup repeated RT polls (same alert every 2s)
global last_rt_key
rt_key = "{}|{}|{}".format(record_type, hist_cat, title)
is_new_rt = (rt_key != last_rt_key)
last_rt_key = rt_key
if not is_new_rt:
# Same RT message as last poll — skip entirely
continue
# When we get a PRE_ALERT, record the cohort cities
if record_type == PRE_ALERT:
new_cohort = set(areas) - {CITY}
if new_cohort != cohort_cities:
cohort_cities = new_cohort
siren_cohort_cities = set()
siren_cohort_stable_since = None
print("COHORT: tracking {} cities: {}".format(
len(cohort_cities),
", ".join(sorted(cohort_cities))), file=sys.stderr)
# If state is leaving PRE_ALERT, clear cohort
if record_type != PRE_ALERT and should_update(new_rec):
cohort_cities = set()
siren_cohort_cities = set()
siren_cohort_stable_since = None
if should_update(new_rec):
prev = get_color()
current_record = new_rec
now_color = get_color()
changed = (now_color != prev)
if changed:
print("{} -> {} (RT cat={} type={})".format(
prev.upper(), now_color.upper(), cat, record_type),
file=sys.stderr)
add_event(now_color, title, "RT", changed=changed)
else:
add_event(get_color(), title, "RT", changed=False)
# --- Pass 2: check if cohort cities have sirens ---
if cohort_cities and current_record and current_record["type"] == PRE_ALERT:
for alert in data:
cat = alert.get("cat", 0)
if isinstance(cat, str):
cat = int(cat)
areas = alert.get("data", [])
if isinstance(areas, str):
areas = [areas]
# Only consider ALERT-type entries
record_type, _ = classify_realtime(cat, alert.get("title", ""))
if record_type != ALERT:
continue
# Check which cohort cities are in this siren
newly_hit = (set(areas) & cohort_cities) - siren_cohort_cities
if newly_hit:
with lock:
siren_cohort_cities |= newly_hit
siren_cohort_stable_since = time.time()
print("COHORT SIREN: {} new: {} (total {}/{})".format(
", ".join(sorted(newly_hit)),
len(newly_hit), len(siren_cohort_cities),
len(cohort_cities)), file=sys.stderr)
# Check if we just transitioned to yellow_orange
with lock:
color_now = get_color()
if color_now == "yellow_orange":
# Check if we already logged this transition
last_evt = event_log[-1] if event_log else None
if not last_evt or last_evt.get("color") != "yellow_orange":
add_event("yellow_orange", "כנראה עבר", "SYS", changed=True)
print("ORANGE -> YELLOW_ORANGE (cohort stable {}s)".format(
COHORT_STABLE_SECONDS), file=sys.stderr)
return any_active
def process_history():
"""Poll AlertsHistory.json for our city (catchup + end detection)."""
global current_record
data = fetch_url(HISTORY_URL)
if data is None or not isinstance(data, list):
return
for record in data:
area = record.get("data", "")
if area != CITY:
continue
cat = record.get("category", 0)
if isinstance(cat, str):
cat = int(cat)
date_str = record.get("alertDate", "")
title = record.get("title", "")
record_time = parse_time(date_str)
if record_time is None:
continue
# Dedup: only log each history record once
hist_key = "{}|{}|{}".format(cat, date_str, title)
is_new_hist = hist_key not in seen_history
if is_new_hist:
seen_history.add(hist_key)
record_type_log = classify_history(cat)
type_str = record_type_log if record_type_log else "unknown"
print("HIST: cat={} type={} date={} title={}".format(
cat, type_str, date_str, title), file=sys.stderr)
record_type = classify_history(cat)
if record_type is None:
continue
new_rec = make_record(record_type, cat, title, record_time)
with lock:
if should_update(new_rec):
prev = get_color()
current_record = new_rec
now_color = get_color()
changed = (now_color != prev)
if changed:
print("{} -> {} (HIST cat={} type={} date={})".format(
prev.upper(), now_color.upper(), cat, record_type,
date_str), file=sys.stderr)
if is_new_hist or changed:
add_event(now_color, title, "HIST", changed=changed,
event_time=record_time)
elif is_new_hist:
add_event(get_color(), title, "HIST", changed=False,
event_time=record_time)
# Only process the first matching record (newest)
break
def poll_loop():
"""Main polling loop matching HA OrefAlertCoordinatorUpdater."""
poll_count = 0
while True:
# Remove expired records (matches HA _remove_expired)
with lock:
remove_expired()
# Process real-time alerts
any_active = process_realtime()
# Periodically check history (every ~30s when in alert, for end detection)
poll_count += 1
if any_active and (current_record is None or current_record["type"] == END):
# Alerts active elsewhere — check history immediately for our city
process_history()
elif current_record is not None and current_record["type"] != END:
if poll_count % 15 == 0: # every 15 polls = ~30s at 2s interval
process_history()
elif poll_count % 3 == 0: # every ~1min when idle
process_history()
# Adaptive polling (matches HA OrefAlertCoordinatorUpdater._async_update)
now = time.time()
rec = current_record
if rec and rec["type"] in (ALERT, PRE_ALERT):
time.sleep(POLL_INTERVAL_ACTIVE)
elif rec and (now - rec["time"]) < ACTIVE_WINDOW:
time.sleep(POLL_INTERVAL_ACTIVE)
else:
time.sleep(POLL_INTERVAL_IDLE)
HTML_PAGE = u"""<!DOCTYPE html>
<html lang="he" dir="rtl">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width,initial-scale=1">
<title>Alert Display</title>
<style>
*{margin:0;padding:0;box-sizing:border-box}
html,body{width:100vw;height:100vh;overflow:hidden;font-family:Arial,sans-serif;
transition:background-color 0.5s ease}
body.green{background:#00c853}
body.orange{background:#ff9100}
body.yellow_orange{background:#ffab00}
body.red{background:#d50000}
.c{display:flex;flex-direction:column;justify-content:center;align-items:center;
height:100vh;color:#fff;text-shadow:2px 2px 8px rgba(0,0,0,.5)}
.s{font-size:10vw;font-weight:bold}
.clockrow{display:flex;align-items:baseline;margin-top:2vh;direction:ltr}
.clock{font-size:7vw;font-weight:300;font-variant-numeric:tabular-nums}
.sec{font-size:3vw;font-weight:300;opacity:.8;font-variant-numeric:tabular-nums}
.err{font-size:1.5vw;margin-top:2vh;color:rgba(255,255,255,.7);display:none}
.err.show{display:block}
.evlog{position:fixed;bottom:1.5vh;right:1.5vw;background:rgba(0,0,0,.25);
border-radius:8px;padding:8px 12px;color:#fff;font-size:1.3vw;direction:rtl;
max-width:35vw;pointer-events:none}
.evlog h3{font-size:1.1vw;opacity:.7;margin:0 0 4px 0;font-weight:400}
.evrow{display:flex;align-items:center;gap:6px;padding:2px 0}
.evrow.noticed{color:rgba(255,255,255,.5)}
.evdot{width:0.9vw;height:0.9vw;border-radius:50%;flex-shrink:0}
.evdot.green{background:#00c853}.evdot.orange{background:#ff9100}.evdot.yellow_orange{background:#ffab00}.evdot.red{background:#d50000}
.evtime{font-size:1.1vw;opacity:.7;font-variant-numeric:tabular-nums;direction:ltr}
.evtitle{font-size:1.1vw;white-space:nowrap;overflow:hidden;text-overflow:ellipsis}
.evsrc{font-size:0.9vw;opacity:.5}
</style>
</head>
<body style="background:#222">
<div class="c" id="main" style="display:none">
<div class="s" id="s">\u05d4\u05db\u05dc \u05ea\u05e7\u05d9\u05df</div>
<div class="clockrow"><span class="clock" id="clock"></span><span class="sec" id="sec"></span></div>
<div class="err" id="err"></div>
</div>
<div class="evlog" id="evlog" style="display:none">
<h3>\u05d0\u05d9\u05e8\u05d5\u05e2\u05d9\u05dd \u05d0\u05d7\u05e8\u05d5\u05e0\u05d9\u05dd</h3>
<div id="evrows"></div>
</div>
<script>
var S={"green":"\u05d4\u05db\u05dc \u05ea\u05e7\u05d9\u05df",
"orange":"\u05d4\u05ea\u05e8\u05d0\u05d4 \u05de\u05d5\u05e7\u05d3\u05de\u05ea",
"yellow_orange":"\u05db\u05e0\u05e8\u05d0\u05d4 \u05e2\u05d1\u05e8",
"red":"\u05d0\u05d6\u05e2\u05e7\u05d4!"};
var fails=0;
function u(){fetch("/status").then(function(r){return r.json()}).then(function(d){
fails=0;
document.getElementById("err").className="err";
document.body.className=d.color;
document.body.style.background="";
document.getElementById("main").style.display="";
document.getElementById("s").textContent=S[d.color]||d.color;
}).catch(function(){
fails++;
if(fails>=3){
document.getElementById("err").className="err show";
document.getElementById("err").textContent="\u26a0 \u05d0\u05d9\u05df \u05d7\u05d9\u05d1\u05d5\u05e8 \u05dc\u05e9\u05e8\u05ea";
}
});}
setInterval(u,2000);u();
function loadEvents(){fetch("/events").then(function(r){return r.json()}).then(function(evts){
var el=document.getElementById("evlog");
var rows=document.getElementById("evrows");
if(!evts||evts.length===0){el.style.display="none";return;}
el.style.display="";
var h="";
for(var i=evts.length-1;i>=0;i--){var e=evts[i];
var cls=e.changed?"evrow":"evrow noticed";
var arrow=e.changed?"\u25c0 ":"";
h+='<div class="'+cls+'"><span class="evdot '+e.color+'"></span>'
+'<span class="evtime">'+e.date_str+" "+e.time_str+'</span>'
+'<span class="evtitle">'+arrow+e.title+'</span>'
+'<span class="evsrc">'+e.source+'</span></div>';
}
rows.innerHTML=h;
}).catch(function(){});}
setInterval(loadEvents,5000);loadEvents();
function tick(){var n=new Date();var h=String(n.getHours()).padStart(2,"0");var m=String(n.getMinutes()).padStart(2,"0");var s=String(n.getSeconds()).padStart(2,"0");document.getElementById("clock").textContent=h+":"+m;document.getElementById("sec").textContent=":"+s;}
setInterval(tick,1000);tick();
</script>
</body>
</html>"""
class AlertServer(http.server.HTTPServer):
allow_reuse_address = True
class Handler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/status":
with lock:
data = json.dumps({"color": get_color()})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(data.encode())
elif self.path == "/debug":
with lock:
rec = dict(current_record) if current_record else None
if rec and "time" in rec:
rec["time_str"] = datetime.fromtimestamp(rec["time"]).strftime("%H:%M:%S")
if rec and "expire" in rec and rec["expire"] > 0:
rec["expire_str"] = datetime.fromtimestamp(rec["expire"]).strftime("%H:%M:%S")
data = json.dumps({
"color": get_color(), "record": rec,
"cohort": {
"cities": sorted(cohort_cities),
"siren_cities": sorted(siren_cohort_cities),
"stable_since": siren_cohort_stable_since,
} if cohort_cities else None
}, ensure_ascii=False)
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(data.encode())
elif self.path == "/events":
with lock:
data = json.dumps(event_log[-10:], ensure_ascii=False)
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(data.encode())
elif self.path in ("/", "/index.html"):
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.end_headers()
self.wfile.write(HTML_PAGE.encode("utf-8"))
else:
self.send_error(404)
def log_message(self, fmt, *args):
pass
if __name__ == "__main__":
print("Starting Pikud HaOref alert display for Beit Shemesh...")
print("Checking alert history for catchup...")
process_history()
color = get_color()
print("Current state after catchup: {}".format(color))
# Seed initial event so overlay is never empty
add_event(color, "server started", "SYS", changed=False)
t = threading.Thread(target=poll_loop)
t.daemon = True
t.start()
print("Serving on http://0.0.0.0:{}".format(PORT))
srv = AlertServer(("0.0.0.0", PORT), Handler)
try:
srv.serve_forever()
except KeyboardInterrupt:
print("\nShutdown.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment