Last active
March 29, 2026 06:49
-
-
Save Arithmomaniac/616709f9d66d955f3eaa82433dbb8dd7 to your computer and use it in GitHub Desktop.
Pikud HaOref Alert Display for Jetson Nano - fullscreen green/orange/red based on oref.org.il API (Beit Shemesh)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Pikud HaOref Alert Display Server for Jetson Nano. | |
| Single-file server: polls oref.org.il API, serves fullscreen color page. | |
| GREEN = all clear (default) | |
| ORANGE = early warning / news flash | |
| RED = active siren (missiles, terror, etc.) | |
| Alert logic ported from amitfin/oref_alert Home Assistant integration. | |
| """ | |
| import http.server | |
| import json | |
| import threading | |
| import time | |
| import urllib.request | |
| import ssl | |
| import sys | |
| from datetime import datetime, timedelta | |
| # --- Configuration --- | |
| CITY = "בית שמש" | |
| PORT = 8080 | |
| # Polling (matches HA OrefAlertCoordinatorUpdater) | |
| POLL_INTERVAL_ACTIVE = 2 # during/after alerts | |
| POLL_INTERVAL_IDLE = 20 # when idle | |
| ACTIVE_WINDOW = 600 # 10 min after last alert, keep fast polling | |
| # Record expiration (matches HA RECORD_EXPIRATION_MINUTES) | |
| ALERT_EXPIRY_MIN = 180 # 3 hours | |
| PRE_ALERT_EXPIRY_MIN = 20 # 20 min | |
| # Dedup window (matches HA DEDUP_WINDOW_SECONDS) | |
| DEDUP_WINDOW = 180 | |
| # "Missed us" — if cohort cities' siren count stabilizes for this long, | |
| # assume the threat passed and show yellow-orange | |
| COHORT_STABLE_SECONDS = 90 | |
| # --- Category mappings (from HA categories.py) --- | |
| # Real-time API (Alerts.json) uses field "cat" | |
| # History API (AlertsHistory.json) uses field "category" | |
| # They use DIFFERENT numbering schemes! | |
| # Real-time -> history mapping (from HA REAL_TIME_TO_HISTORY_CATEGORY) | |
| RT_TO_HIST = { | |
| 1: 1, # missile | |
| 3: 7, # earthquake | |
| 4: 9, # radiological | |
| 5: 11, # tsunami | |
| 6: 2, # aircraftIntrusion | |
| 7: 12, # hazardousMaterials | |
| 13: 10, # terroristInfiltration | |
| } | |
| # HA category metadata: (icon, emoji, is_alert) | |
| # History categories where is_alert=True -> RED | |
| HIST_ALERT_CATEGORIES = {1, 2, 3, 4, 7, 8, 9, 10, 11, 12} | |
| # History cat 13 = update/end-alert -> GREEN | |
| HIST_END_CATEGORY = 13 | |
| # History cat 14 = flash/pre-alert -> ORANGE | |
| HIST_PRE_ALERT_CATEGORY = 14 | |
| # Real-time cat 10 is a MESSAGE category (HA: REAL_TIME_MESSAGE_CATEGORY) | |
| # If title contains this word, it's a pre-alert (HA: REAL_TIME_PRE_ALERT_WORD) | |
| RT_MESSAGE_CATEGORY = 10 | |
| PRE_ALERT_WORD = "בדקות" | |
| # --- Record types (from HA RecordType) --- | |
| ALERT = "alert" | |
| PRE_ALERT = "pre_alert" | |
| END = "end" | |
| # --- URLs --- | |
| API_URL = "https://www.oref.org.il/warningMessages/alert/Alerts.json" | |
| HISTORY_URL = "https://www.oref.org.il/warningMessages/alert/History/AlertsHistory.json" | |
| API_HEADERS = { | |
| "Referer": "https://www.oref.org.il/", | |
| "X-Requested-With": "XMLHttpRequest", | |
| "User-Agent": "Mozilla/5.0", | |
| } | |
| # --- State --- | |
| # Single record for our city (simplified from HA's per-area dict) | |
| # {type, category, time, expire, title} or None | |
| current_record = None | |
| lock = threading.Lock() | |
| ssl_ctx = ssl.create_default_context() | |
| ssl_ctx.check_hostname = False | |
| ssl_ctx.verify_mode = ssl.CERT_NONE | |
| # Track seen history records to avoid re-logging (key = "cat|date|title") | |
| seen_history = set() | |
| # In-memory event log for UI overlay (capped at 50 entries) | |
| MAX_EVENTS = 50 | |
| event_log = [] | |
| last_rt_key = None # dedup repeated RT polls ("type|cat|title") | |
| # "Missed us" cohort tracking | |
| # When Beit Shemesh gets a PRE_ALERT, the other cities in the same alert | |
| # entry are the "cohort". If cohort cities get sirens and no new ones appear | |
| # for COHORT_STABLE_SECONDS, we show yellow-orange ("probably passed"). | |
| cohort_cities = set() # cities from the same alert as our PRE_ALERT | |
| siren_cohort_cities = set() # subset of cohort that got sirens | |
| siren_cohort_stable_since = None # when siren_cohort_cities last grew | |
| def add_event(color, title, source, changed=False, event_time=None): | |
| """Append an event to the in-memory log (thread-safe, call inside lock). | |
| event_time: optional unix timestamp for displayed time (e.g. from HIST alertDate). | |
| Defaults to now. Sort order always uses receipt time (time.time()).""" | |
| now = time.time() | |
| display_dt = datetime.fromtimestamp(event_time if event_time else now) | |
| event_log.append({ | |
| "time": now, | |
| "time_str": display_dt.strftime("%H:%M"), | |
| "date_str": display_dt.strftime("%d/%m"), | |
| "color": color, | |
| "title": title, | |
| "source": source, | |
| "changed": changed, | |
| }) | |
| if len(event_log) > MAX_EVENTS: | |
| del event_log[0] | |
| def get_color(): | |
| """Derive display color from current record.""" | |
| if current_record is None: | |
| return "green" | |
| t = current_record["type"] | |
| if t == ALERT: | |
| return "red" | |
| elif t == PRE_ALERT: | |
| # "Missed us" logic: if cohort cities have sirens and count is stable | |
| if (siren_cohort_cities and siren_cohort_stable_since and | |
| time.time() - siren_cohort_stable_since >= COHORT_STABLE_SECONDS): | |
| return "yellow_orange" | |
| return "orange" | |
| return "green" | |
| def make_record(record_type, category, title, record_time=None): | |
| """Create a record with expiry based on type (matches HA add_metadata).""" | |
| if record_time is None: | |
| record_time = time.time() | |
| if record_type == ALERT: | |
| expire = record_time + ALERT_EXPIRY_MIN * 60 | |
| elif record_type == PRE_ALERT: | |
| expire = record_time + PRE_ALERT_EXPIRY_MIN * 60 | |
| else: | |
| expire = 0 # END records don't expire | |
| return { | |
| "type": record_type, | |
| "category": category, | |
| "time": record_time, | |
| "expire": expire, | |
| "title": title, | |
| } | |
| def should_update(new_record): | |
| """Decide if new_record should replace current_record (matches HA logic).""" | |
| global current_record | |
| if current_record is None: | |
| return True | |
| cur = current_record | |
| # PRE_ALERT won't overwrite ALERT (from HA: pre after alert -> continue) | |
| if new_record["type"] == PRE_ALERT and cur["type"] == ALERT: | |
| return False | |
| # END replaces only if it's NEWER than current record | |
| # (prevents old END from clearing a newer ALERT) | |
| if new_record["type"] == END: | |
| return new_record["time"] > cur["time"] | |
| # Newer record replaces older | |
| if new_record["time"] <= cur["time"]: | |
| return False | |
| # Dedup: same category within DEDUP_WINDOW ignored (from HA) | |
| if (new_record["category"] == cur["category"] and | |
| new_record["time"] - cur["time"] <= DEDUP_WINDOW): | |
| return False | |
| return True | |
| def remove_expired(): | |
| """Replace expired records with synthetic END (matches HA _remove_expired).""" | |
| global current_record, cohort_cities, siren_cohort_cities, siren_cohort_stable_since | |
| if current_record is None: | |
| return | |
| if current_record["expire"] > 0 and time.time() > current_record["expire"]: | |
| old_type = current_record["type"] | |
| current_record = make_record(END, HIST_END_CATEGORY, "expired") | |
| cohort_cities = set() | |
| siren_cohort_cities = set() | |
| siren_cohort_stable_since = None | |
| add_event("green", "expired ({})".format(old_type), "SYS", changed=True) | |
| print("EXPIRED: {} record -> END".format(old_type), file=sys.stderr) | |
| def classify_realtime(cat, title): | |
| """Classify a real-time API alert (matches HA _current_to_history_format).""" | |
| # Cat 10 = message category; check title for pre-alert word or end phrase | |
| if cat == RT_MESSAGE_CATEGORY: | |
| if PRE_ALERT_WORD in title: | |
| return PRE_ALERT, HIST_PRE_ALERT_CATEGORY | |
| elif "הסתיים" in title: | |
| # All-clear message ("האירוע הסתיים") → END | |
| return END, HIST_END_CATEGORY | |
| else: | |
| # Terror attack in history = cat 10 | |
| return ALERT, RT_TO_HIST.get(cat, cat) | |
| # Map real-time cat to history cat | |
| hist_cat = RT_TO_HIST.get(cat, None) | |
| if hist_cat is None: | |
| return None, None # unknown category, skip | |
| if hist_cat in HIST_ALERT_CATEGORIES: | |
| return ALERT, hist_cat | |
| return None, None | |
| def classify_history(cat): | |
| """Classify a history API record.""" | |
| if cat == HIST_END_CATEGORY: | |
| return END | |
| elif cat == HIST_PRE_ALERT_CATEGORY: | |
| return PRE_ALERT | |
| elif cat in HIST_ALERT_CATEGORIES: | |
| return ALERT | |
| return None | |
| def parse_time(date_str): | |
| """Parse alert date string to unix timestamp.""" | |
| try: | |
| dt = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S") | |
| return dt.timestamp() | |
| except Exception: | |
| return None | |
| def decode_response(raw): | |
| """Decode API response, handling BOM and encoding quirks.""" | |
| return raw.decode('utf-8-sig').replace('\x00', '').strip() | |
| def fetch_url(url, timeout=10): | |
| """Fetch and decode a URL, return parsed JSON or None.""" | |
| try: | |
| req = urllib.request.Request(url, headers=API_HEADERS) | |
| with urllib.request.urlopen(req, timeout=timeout, context=ssl_ctx) as resp: | |
| text = decode_response(resp.read()) | |
| if not text: | |
| return None | |
| return json.loads(text) | |
| except Exception as e: | |
| print("Fetch error ({}): {}".format(url.split('/')[-1], e), file=sys.stderr) | |
| return None | |
| def process_realtime(): | |
| """Poll Alerts.json and process real-time alerts for our city. | |
| Returns True if ANY alerts are active (even for other cities).""" | |
| global current_record, cohort_cities, siren_cohort_cities, siren_cohort_stable_since | |
| data = fetch_url(API_URL, timeout=5) | |
| if data is None: | |
| return False | |
| if isinstance(data, dict): | |
| data = [data] | |
| if not isinstance(data, list): | |
| return False | |
| any_active = len(data) > 0 | |
| # If no active alerts, clear cohort tracking | |
| if not any_active: | |
| if cohort_cities: | |
| cohort_cities = set() | |
| siren_cohort_cities = set() | |
| siren_cohort_stable_since = None | |
| return False | |
| # --- Pass 1: process alerts for our city --- | |
| for alert in data: | |
| cat = alert.get("cat", 0) | |
| if isinstance(cat, str): | |
| cat = int(cat) | |
| areas = alert.get("data", []) | |
| if isinstance(areas, str): | |
| areas = [areas] | |
| title = alert.get("title", "") | |
| if CITY not in areas: | |
| continue | |
| record_type, hist_cat = classify_realtime(cat, title) | |
| if record_type is None: | |
| print("RT: unknown cat={} title={} (skipped)".format(cat, title), | |
| file=sys.stderr) | |
| continue | |
| new_rec = make_record(record_type, hist_cat, title) | |
| print("RT: cat={} type={} title={}".format(cat, record_type, title), | |
| file=sys.stderr) | |
| with lock: | |
| # Dedup repeated RT polls (same alert every 2s) | |
| global last_rt_key | |
| rt_key = "{}|{}|{}".format(record_type, hist_cat, title) | |
| is_new_rt = (rt_key != last_rt_key) | |
| last_rt_key = rt_key | |
| if not is_new_rt: | |
| # Same RT message as last poll — skip entirely | |
| continue | |
| # When we get a PRE_ALERT, record the cohort cities | |
| if record_type == PRE_ALERT: | |
| new_cohort = set(areas) - {CITY} | |
| if new_cohort != cohort_cities: | |
| cohort_cities = new_cohort | |
| siren_cohort_cities = set() | |
| siren_cohort_stable_since = None | |
| print("COHORT: tracking {} cities: {}".format( | |
| len(cohort_cities), | |
| ", ".join(sorted(cohort_cities))), file=sys.stderr) | |
| # If state is leaving PRE_ALERT, clear cohort | |
| if record_type != PRE_ALERT and should_update(new_rec): | |
| cohort_cities = set() | |
| siren_cohort_cities = set() | |
| siren_cohort_stable_since = None | |
| if should_update(new_rec): | |
| prev = get_color() | |
| current_record = new_rec | |
| now_color = get_color() | |
| changed = (now_color != prev) | |
| if changed: | |
| print("{} -> {} (RT cat={} type={})".format( | |
| prev.upper(), now_color.upper(), cat, record_type), | |
| file=sys.stderr) | |
| add_event(now_color, title, "RT", changed=changed) | |
| else: | |
| add_event(get_color(), title, "RT", changed=False) | |
| # --- Pass 2: check if cohort cities have sirens --- | |
| if cohort_cities and current_record and current_record["type"] == PRE_ALERT: | |
| for alert in data: | |
| cat = alert.get("cat", 0) | |
| if isinstance(cat, str): | |
| cat = int(cat) | |
| areas = alert.get("data", []) | |
| if isinstance(areas, str): | |
| areas = [areas] | |
| # Only consider ALERT-type entries | |
| record_type, _ = classify_realtime(cat, alert.get("title", "")) | |
| if record_type != ALERT: | |
| continue | |
| # Check which cohort cities are in this siren | |
| newly_hit = (set(areas) & cohort_cities) - siren_cohort_cities | |
| if newly_hit: | |
| with lock: | |
| siren_cohort_cities |= newly_hit | |
| siren_cohort_stable_since = time.time() | |
| print("COHORT SIREN: {} new: {} (total {}/{})".format( | |
| ", ".join(sorted(newly_hit)), | |
| len(newly_hit), len(siren_cohort_cities), | |
| len(cohort_cities)), file=sys.stderr) | |
| # Check if we just transitioned to yellow_orange | |
| with lock: | |
| color_now = get_color() | |
| if color_now == "yellow_orange": | |
| # Check if we already logged this transition | |
| last_evt = event_log[-1] if event_log else None | |
| if not last_evt or last_evt.get("color") != "yellow_orange": | |
| add_event("yellow_orange", "כנראה עבר", "SYS", changed=True) | |
| print("ORANGE -> YELLOW_ORANGE (cohort stable {}s)".format( | |
| COHORT_STABLE_SECONDS), file=sys.stderr) | |
| return any_active | |
| def process_history(): | |
| """Poll AlertsHistory.json for our city (catchup + end detection).""" | |
| global current_record | |
| data = fetch_url(HISTORY_URL) | |
| if data is None or not isinstance(data, list): | |
| return | |
| for record in data: | |
| area = record.get("data", "") | |
| if area != CITY: | |
| continue | |
| cat = record.get("category", 0) | |
| if isinstance(cat, str): | |
| cat = int(cat) | |
| date_str = record.get("alertDate", "") | |
| title = record.get("title", "") | |
| record_time = parse_time(date_str) | |
| if record_time is None: | |
| continue | |
| # Dedup: only log each history record once | |
| hist_key = "{}|{}|{}".format(cat, date_str, title) | |
| is_new_hist = hist_key not in seen_history | |
| if is_new_hist: | |
| seen_history.add(hist_key) | |
| record_type_log = classify_history(cat) | |
| type_str = record_type_log if record_type_log else "unknown" | |
| print("HIST: cat={} type={} date={} title={}".format( | |
| cat, type_str, date_str, title), file=sys.stderr) | |
| record_type = classify_history(cat) | |
| if record_type is None: | |
| continue | |
| new_rec = make_record(record_type, cat, title, record_time) | |
| with lock: | |
| if should_update(new_rec): | |
| prev = get_color() | |
| current_record = new_rec | |
| now_color = get_color() | |
| changed = (now_color != prev) | |
| if changed: | |
| print("{} -> {} (HIST cat={} type={} date={})".format( | |
| prev.upper(), now_color.upper(), cat, record_type, | |
| date_str), file=sys.stderr) | |
| if is_new_hist or changed: | |
| add_event(now_color, title, "HIST", changed=changed, | |
| event_time=record_time) | |
| elif is_new_hist: | |
| add_event(get_color(), title, "HIST", changed=False, | |
| event_time=record_time) | |
| # Only process the first matching record (newest) | |
| break | |
| def poll_loop(): | |
| """Main polling loop matching HA OrefAlertCoordinatorUpdater.""" | |
| poll_count = 0 | |
| while True: | |
| # Remove expired records (matches HA _remove_expired) | |
| with lock: | |
| remove_expired() | |
| # Process real-time alerts | |
| any_active = process_realtime() | |
| # Periodically check history (every ~30s when in alert, for end detection) | |
| poll_count += 1 | |
| if any_active and (current_record is None or current_record["type"] == END): | |
| # Alerts active elsewhere — check history immediately for our city | |
| process_history() | |
| elif current_record is not None and current_record["type"] != END: | |
| if poll_count % 15 == 0: # every 15 polls = ~30s at 2s interval | |
| process_history() | |
| elif poll_count % 3 == 0: # every ~1min when idle | |
| process_history() | |
| # Adaptive polling (matches HA OrefAlertCoordinatorUpdater._async_update) | |
| now = time.time() | |
| rec = current_record | |
| if rec and rec["type"] in (ALERT, PRE_ALERT): | |
| time.sleep(POLL_INTERVAL_ACTIVE) | |
| elif rec and (now - rec["time"]) < ACTIVE_WINDOW: | |
| time.sleep(POLL_INTERVAL_ACTIVE) | |
| else: | |
| time.sleep(POLL_INTERVAL_IDLE) | |
| HTML_PAGE = u"""<!DOCTYPE html> | |
| <html lang="he" dir="rtl"> | |
| <head> | |
| <meta charset="utf-8"> | |
| <meta name="viewport" content="width=device-width,initial-scale=1"> | |
| <title>Alert Display</title> | |
| <style> | |
| *{margin:0;padding:0;box-sizing:border-box} | |
| html,body{width:100vw;height:100vh;overflow:hidden;font-family:Arial,sans-serif; | |
| transition:background-color 0.5s ease} | |
| body.green{background:#00c853} | |
| body.orange{background:#ff9100} | |
| body.yellow_orange{background:#ffab00} | |
| body.red{background:#d50000} | |
| .c{display:flex;flex-direction:column;justify-content:center;align-items:center; | |
| height:100vh;color:#fff;text-shadow:2px 2px 8px rgba(0,0,0,.5)} | |
| .s{font-size:10vw;font-weight:bold} | |
| .clockrow{display:flex;align-items:baseline;margin-top:2vh;direction:ltr} | |
| .clock{font-size:7vw;font-weight:300;font-variant-numeric:tabular-nums} | |
| .sec{font-size:3vw;font-weight:300;opacity:.8;font-variant-numeric:tabular-nums} | |
| .err{font-size:1.5vw;margin-top:2vh;color:rgba(255,255,255,.7);display:none} | |
| .err.show{display:block} | |
| .evlog{position:fixed;bottom:1.5vh;right:1.5vw;background:rgba(0,0,0,.25); | |
| border-radius:8px;padding:8px 12px;color:#fff;font-size:1.3vw;direction:rtl; | |
| max-width:35vw;pointer-events:none} | |
| .evlog h3{font-size:1.1vw;opacity:.7;margin:0 0 4px 0;font-weight:400} | |
| .evrow{display:flex;align-items:center;gap:6px;padding:2px 0} | |
| .evrow.noticed{color:rgba(255,255,255,.5)} | |
| .evdot{width:0.9vw;height:0.9vw;border-radius:50%;flex-shrink:0} | |
| .evdot.green{background:#00c853}.evdot.orange{background:#ff9100}.evdot.yellow_orange{background:#ffab00}.evdot.red{background:#d50000} | |
| .evtime{font-size:1.1vw;opacity:.7;font-variant-numeric:tabular-nums;direction:ltr} | |
| .evtitle{font-size:1.1vw;white-space:nowrap;overflow:hidden;text-overflow:ellipsis} | |
| .evsrc{font-size:0.9vw;opacity:.5} | |
| </style> | |
| </head> | |
| <body style="background:#222"> | |
| <div class="c" id="main" style="display:none"> | |
| <div class="s" id="s">\u05d4\u05db\u05dc \u05ea\u05e7\u05d9\u05df</div> | |
| <div class="clockrow"><span class="clock" id="clock"></span><span class="sec" id="sec"></span></div> | |
| <div class="err" id="err"></div> | |
| </div> | |
| <div class="evlog" id="evlog" style="display:none"> | |
| <h3>\u05d0\u05d9\u05e8\u05d5\u05e2\u05d9\u05dd \u05d0\u05d7\u05e8\u05d5\u05e0\u05d9\u05dd</h3> | |
| <div id="evrows"></div> | |
| </div> | |
| <script> | |
| var S={"green":"\u05d4\u05db\u05dc \u05ea\u05e7\u05d9\u05df", | |
| "orange":"\u05d4\u05ea\u05e8\u05d0\u05d4 \u05de\u05d5\u05e7\u05d3\u05de\u05ea", | |
| "yellow_orange":"\u05db\u05e0\u05e8\u05d0\u05d4 \u05e2\u05d1\u05e8", | |
| "red":"\u05d0\u05d6\u05e2\u05e7\u05d4!"}; | |
| var fails=0; | |
| function u(){fetch("/status").then(function(r){return r.json()}).then(function(d){ | |
| fails=0; | |
| document.getElementById("err").className="err"; | |
| document.body.className=d.color; | |
| document.body.style.background=""; | |
| document.getElementById("main").style.display=""; | |
| document.getElementById("s").textContent=S[d.color]||d.color; | |
| }).catch(function(){ | |
| fails++; | |
| if(fails>=3){ | |
| document.getElementById("err").className="err show"; | |
| document.getElementById("err").textContent="\u26a0 \u05d0\u05d9\u05df \u05d7\u05d9\u05d1\u05d5\u05e8 \u05dc\u05e9\u05e8\u05ea"; | |
| } | |
| });} | |
| setInterval(u,2000);u(); | |
| function loadEvents(){fetch("/events").then(function(r){return r.json()}).then(function(evts){ | |
| var el=document.getElementById("evlog"); | |
| var rows=document.getElementById("evrows"); | |
| if(!evts||evts.length===0){el.style.display="none";return;} | |
| el.style.display=""; | |
| var h=""; | |
| for(var i=evts.length-1;i>=0;i--){var e=evts[i]; | |
| var cls=e.changed?"evrow":"evrow noticed"; | |
| var arrow=e.changed?"\u25c0 ":""; | |
| h+='<div class="'+cls+'"><span class="evdot '+e.color+'"></span>' | |
| +'<span class="evtime">'+e.date_str+" "+e.time_str+'</span>' | |
| +'<span class="evtitle">'+arrow+e.title+'</span>' | |
| +'<span class="evsrc">'+e.source+'</span></div>'; | |
| } | |
| rows.innerHTML=h; | |
| }).catch(function(){});} | |
| setInterval(loadEvents,5000);loadEvents(); | |
| function tick(){var n=new Date();var h=String(n.getHours()).padStart(2,"0");var m=String(n.getMinutes()).padStart(2,"0");var s=String(n.getSeconds()).padStart(2,"0");document.getElementById("clock").textContent=h+":"+m;document.getElementById("sec").textContent=":"+s;} | |
| setInterval(tick,1000);tick(); | |
| </script> | |
| </body> | |
| </html>""" | |
| class AlertServer(http.server.HTTPServer): | |
| allow_reuse_address = True | |
| class Handler(http.server.BaseHTTPRequestHandler): | |
| def do_GET(self): | |
| if self.path == "/status": | |
| with lock: | |
| data = json.dumps({"color": get_color()}) | |
| self.send_response(200) | |
| self.send_header("Content-Type", "application/json") | |
| self.end_headers() | |
| self.wfile.write(data.encode()) | |
| elif self.path == "/debug": | |
| with lock: | |
| rec = dict(current_record) if current_record else None | |
| if rec and "time" in rec: | |
| rec["time_str"] = datetime.fromtimestamp(rec["time"]).strftime("%H:%M:%S") | |
| if rec and "expire" in rec and rec["expire"] > 0: | |
| rec["expire_str"] = datetime.fromtimestamp(rec["expire"]).strftime("%H:%M:%S") | |
| data = json.dumps({ | |
| "color": get_color(), "record": rec, | |
| "cohort": { | |
| "cities": sorted(cohort_cities), | |
| "siren_cities": sorted(siren_cohort_cities), | |
| "stable_since": siren_cohort_stable_since, | |
| } if cohort_cities else None | |
| }, ensure_ascii=False) | |
| self.send_response(200) | |
| self.send_header("Content-Type", "application/json") | |
| self.end_headers() | |
| self.wfile.write(data.encode()) | |
| elif self.path == "/events": | |
| with lock: | |
| data = json.dumps(event_log[-10:], ensure_ascii=False) | |
| self.send_response(200) | |
| self.send_header("Content-Type", "application/json") | |
| self.end_headers() | |
| self.wfile.write(data.encode()) | |
| elif self.path in ("/", "/index.html"): | |
| self.send_response(200) | |
| self.send_header("Content-Type", "text/html; charset=utf-8") | |
| self.end_headers() | |
| self.wfile.write(HTML_PAGE.encode("utf-8")) | |
| else: | |
| self.send_error(404) | |
| def log_message(self, fmt, *args): | |
| pass | |
| if __name__ == "__main__": | |
| print("Starting Pikud HaOref alert display for Beit Shemesh...") | |
| print("Checking alert history for catchup...") | |
| process_history() | |
| color = get_color() | |
| print("Current state after catchup: {}".format(color)) | |
| # Seed initial event so overlay is never empty | |
| add_event(color, "server started", "SYS", changed=False) | |
| t = threading.Thread(target=poll_loop) | |
| t.daemon = True | |
| t.start() | |
| print("Serving on http://0.0.0.0:{}".format(PORT)) | |
| srv = AlertServer(("0.0.0.0", PORT), Handler) | |
| try: | |
| srv.serve_forever() | |
| except KeyboardInterrupt: | |
| print("\nShutdown.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment