Last active
February 11, 2026 09:56
-
-
Save erick-otenyo/811118b4e572c2af6a03e0bcfb9e4421 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
WIS2box SYNOP BUFR Downloader - INAM Mozambique
=================================================

Subscribes to the INAM WIS2box MQTT broker and downloads SYNOP BUFR files
as they are published. Files are organized by WIGOS station ID and date.

Directory structure:

    bufr_data/
    └── {wigos_station_id}/
        └── {year}/
            └── {month}/
                └── {day}/
                    ├── messages.json    # log of all downloaded messages
                    ├── {filename}.bufr
                    └── {filename}.bufr

MQTT Broker: wis2.inam.gov.mz:1883
Credentials: everyone / everyone

Usage:

    pip install paho-mqtt requests
    python wis2box_downloader_inam.py
"""
| import json | |
| import logging | |
| import sys | |
| import time | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| import paho.mqtt.client as mqtt | |
| import requests | |
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Connection settings for the MQTT broker.
MQTT_BROKER = "wis2.inam.gov.mz"
MQTT_PORT = 1883
MQTT_USERNAME = "everyone"
MQTT_PASSWORD = "everyone"

# Subscriptions for surface observations / SYNOP. Each entry holds the topic
# string and the QoS level requested when subscribing to it.
MQTT_TOPICS = [
    dict(
        topic="origin/a/wis2/mz-inam/data/core/weather/surface-based-observations/synop",
        qos=1,
    ),
]

# Root directory under which downloaded data is stored.
OUTPUT_DIR = Path("./bufr_data")

# Logging: every record is written both to stdout and to a log file whose
# relative name is resolved against the current working directory.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("wis2box_downloader.log"),
    ],
)
logger = logging.getLogger(__name__)
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
def extract_wigos_id(notification):
    """
    Extract the WIGOS Station Identifier from a WIS2 notification.

    Args:
        notification: Parsed notification message (a dict). The identifier is
            read from ``notification["properties"]["wigos_station_identifier"]``.

    Returns:
        The identifier as a non-empty string with surrounding whitespace
        removed.

    Raises:
        ValueError: If ``properties`` is missing/null/not a dict, or the
            identifier is missing, empty, or not a string. ``ValueError`` is
            an ``Exception`` subclass, so existing ``except Exception``
            handlers keep working.
    """
    # ``or {}`` also covers an explicit ``"properties": null`` in the payload,
    # which ``.get("properties", {})`` would pass through as None.
    props = notification.get("properties") or {}
    wsi = props.get("wigos_station_identifier") if isinstance(props, dict) else None

    # The identifier is later used as a directory name, so anything that is
    # not a real string is treated the same as "not found".
    if isinstance(wsi, str):
        wsi = wsi.strip()
    else:
        wsi = None

    if not wsi:
        raise ValueError("WIGOS Station Identifier not found")
    return wsi
def extract_pubtime(notification):
    """
    Extract the publication time of a notification as an aware UTC datetime.

    Args:
        notification: Parsed notification message (a dict). The timestamp is
            read from ``notification["properties"]["pubtime"]`` and must be an
            ISO 8601 string; a trailing ``Z``/``z`` is accepted as UTC.

    Returns:
        A timezone-aware ``datetime`` in UTC. Timestamps without an offset are
        assumed to already be UTC; timestamps with another offset are
        converted, so the year/month/day used for the output directory is
        always the UTC date.

    Raises:
        ValueError: "Publication time not found" when the field is missing or
            empty, "Invalid pubtime format" when it is not a parseable string.
            ``ValueError`` is an ``Exception`` subclass, so existing
            ``except Exception`` handlers keep working.
    """
    # ``or {}`` also covers an explicit ``"properties": null``.
    props = notification.get("properties") or {}
    pub_time = props.get("pubtime") if isinstance(props, dict) else None

    if not pub_time:
        raise ValueError("Publication time not found")
    if not isinstance(pub_time, str):
        raise ValueError("Invalid pubtime format")

    text = pub_time.strip()
    # datetime.fromisoformat() only understands the "Z" suffix on newer Python
    # versions, so spell it out as an explicit offset.
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"

    try:
        parsed = datetime.fromisoformat(text)
    except ValueError as exc:
        # Keep the original parse error attached for debugging.
        raise ValueError("Invalid pubtime format") from exc

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
def get_day_dir(wigos_id, dt, root=None):
    """
    Build and create the output directory for one station and day.

    The layout is ``{root}/{wigos_id}/{year}/{month}/{day}/`` with month and
    day zero-padded to two digits.

    Args:
        wigos_id: Station identifier used as a single directory name. It comes
            from a network message, so it is validated before being joined
            into a path.
        dt: Object providing ``year``, ``month`` and ``day`` (e.g. a datetime).
        root: Optional base directory. Defaults to the module-level
            ``OUTPUT_DIR`` when omitted.

    Returns:
        ``pathlib.Path`` of the (now existing) day directory.

    Raises:
        ValueError: If ``wigos_id`` is not a string or could escape the base
            directory (empty, ``.``, ``..``, or containing a path separator,
            a drive colon, or a NUL byte).
    """
    unsafe = (
        not isinstance(wigos_id, str)
        or wigos_id in ("", ".", "..")
        or any(ch in wigos_id for ch in ("/", "\\", ":", "\x00"))
    )
    if unsafe:
        # Joining e.g. "../x" or "/etc" onto the base path would write outside
        # of the download root.
        raise ValueError(f"Unsafe WIGOS station identifier: {wigos_id!r}")

    base_dir = OUTPUT_DIR if root is None else Path(root)
    day_dir = base_dir / wigos_id / str(dt.year) / f"{dt.month:02d}" / f"{dt.day:02d}"
    day_dir.mkdir(parents=True, exist_ok=True)
    return day_dir
def load_messages_log(day_dir):
    """
    Load the daily ``messages.json`` log, or return a fresh empty one.

    Args:
        day_dir: ``pathlib.Path`` of the day directory holding the log.

    Returns:
        A dict with the keys ``station``, ``date`` and ``messages`` (a list).
        A new empty log is returned when the file does not exist, cannot be
        read, is not valid JSON, or does not have the expected shape (a JSON
        object whose ``messages`` value is a list). Every case except a
        missing file is reported with a warning instead of being silently
        ignored.
    """
    log_path = day_dir / "messages.json"
    log = logging.getLogger(__name__)

    def empty_log():
        # Built on every call so callers never share one mutable list.
        return {"station": "", "date": "", "messages": []}

    try:
        data = json.loads(log_path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # First message of the day for this station: nothing to load yet.
        return empty_log()
    except (ValueError, OSError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        log.warning("Could not read %s (%s); starting a new log", log_path, exc)
        return empty_log()

    if not isinstance(data, dict) or not isinstance(data.get("messages"), list):
        log.warning("Unexpected content in %s; starting a new log", log_path)
        return empty_log()
    return data
def save_messages_log(day_dir, log_data):
    """
    Write the daily ``messages.json`` log.

    The JSON text is first written to a temporary file in the same directory
    and then renamed over ``messages.json``, so an interrupted write cannot
    leave a truncated log behind.

    Args:
        day_dir: ``pathlib.Path`` of the day directory holding the log.
        log_data: JSON-serializable log content.

    Raises:
        TypeError: If ``log_data`` is not JSON-serializable (raised by
            ``json.dumps`` before any file is touched).
        OSError: If the file cannot be written or renamed.
    """
    log_path = day_dir / "messages.json"
    tmp_path = day_dir / "messages.json.tmp"

    serialized = json.dumps(log_data, indent=2)
    tmp_path.write_text(serialized, encoding="utf-8")
    # Path.replace() overwrites an existing destination.
    tmp_path.replace(log_path)
def is_already_downloaded(log_data, message_id):
    """
    Check whether a message id is already recorded in a daily log.

    Args:
        log_data: Log dict; its ``messages`` value is expected to be a list of
            dicts that each carry an ``id``.
        message_id: Identifier to look for.

    Returns:
        True if any recorded message has this ``id``, else False. Because the
        log is read back from disk, a missing ``messages`` key and entries
        that are not dicts or lack an ``id`` are tolerated and simply never
        match.
    """
    recorded = log_data.get("messages") or ()
    return any(
        isinstance(entry, dict) and entry.get("id") == message_id
        for entry in recorded
    )
| # --------------------------------------------------------------------------- | |
| # MQTT Callbacks | |
| # --------------------------------------------------------------------------- | |
def on_connect(client, userdata, flags, rc, properties=None):
    """
    MQTT connect callback: subscribe to every configured topic on success.

    Subscribing here rather than once at start-up means the subscriptions are
    issued each time this callback fires.

    Args:
        client: The MQTT client instance that connected.
        userdata: User data set on the client (unused).
        flags: Connection flags supplied by the library (unused).
        rc: Connection result; equal to 0 on success.
        properties: Optional properties supplied by newer callback APIs
            (unused).
    """
    if rc == 0:
        logger.info("Connected to %s:%d", MQTT_BROKER, MQTT_PORT)
        # Subscribe to topics
        for mqtt_topic in MQTT_TOPICS:
            topic = mqtt_topic["topic"]
            qos = mqtt_topic.get("qos", 0)
            client.subscribe(topic, qos=qos)
            logger.info("Subscribed: %s", topic)
    else:
        # %s rather than %d: with the paho-mqtt 2.x callback API ``rc`` may be
        # a ReasonCode object instead of an int, and %d formatting of such an
        # object fails inside the logging call, losing the message.
        logger.error("Connection failed: rc=%s", rc)
def on_disconnect(client, userdata, flags=None, rc=None, properties=None):
    """
    MQTT disconnect callback: log unexpected disconnections.

    The trailing parameters are optional so the same function works with both
    calling conventions used by paho-mqtt:

    * ``on_disconnect(client, userdata, flags, rc, properties)`` (callback
      API version 2), and
    * ``on_disconnect(client, userdata, rc)`` (paho-mqtt 1.x style), in which
      case the result code arrives in the ``flags`` position.

    Args:
        client: The MQTT client instance that disconnected (unused).
        userdata: User data set on the client (unused).
        flags: Disconnect flags, or the result code in the 3-argument form.
        rc: Disconnect result; equal to 0 for a clean disconnect.
        properties: Optional properties supplied by newer callback APIs
            (unused).
    """
    if rc is None:
        # 3-argument call: the third positional value is the result code.
        rc = flags
    if rc is not None and rc != 0:
        # %s rather than %d: ``rc`` may be a ReasonCode object, not an int.
        logger.warning("Unexpected disconnection (rc=%s), reconnecting...", rc)
def _select_download_url(links):
    """Return the href to download from a notification's ``links``, or None.

    Preference order: the first link with ``rel == "canonical"``, then the
    first link whose URL path ends in ``.bufr``/``.bufr4``, then the first
    link that has an href at all. Entries that are not dicts or have no
    string href are ignored.
    """
    from urllib.parse import urlsplit

    if not isinstance(links, list):
        return None

    candidates = [
        (link.get("rel"), link["href"])
        for link in links
        if isinstance(link, dict)
        and isinstance(link.get("href"), str)
        and link["href"]
    ]
    for rel, href in candidates:
        if rel == "canonical":
            return href
    for _rel, href in candidates:
        # Compare the path only, so a query string after the file name does
        # not hide the extension.
        if urlsplit(href).path.endswith((".bufr", ".bufr4")):
            return href
    return candidates[0][1] if candidates else None


def _filename_from_url(url):
    """Derive a safe local ``.bufr``/``.bufr4`` file name from a download URL."""
    from urllib.parse import urlsplit

    # Use the URL *path* so "/" characters inside a query string or fragment
    # cannot be mistaken for the file name.
    name = urlsplit(url).path.rsplit("/", 1)[-1].replace("\\", "_")
    if name in ("", ".", ".."):
        # URL ended with "/" (or a dot segment): fall back to a fixed stem
        # instead of producing a hidden ".bufr" file.
        name = "download"
    if not name.endswith((".bufr", ".bufr4")):
        name = f"{name}.bufr"
    return name


def _unique_path(directory, filename, dt):
    """Return a path in ``directory`` for ``filename`` that does not exist yet.

    On a name clash the publication time (``HHMMSS``) is appended to the stem;
    if that name is taken as well, a running counter is appended too so an
    earlier download is never overwritten.
    """
    candidate = directory / filename
    if not candidate.exists():
        return candidate

    stem, suffix = candidate.stem, candidate.suffix
    stamped = f"{stem}_{dt.strftime('%H%M%S')}"
    candidate = directory / f"{stamped}{suffix}"
    counter = 1
    while candidate.exists():
        candidate = directory / f"{stamped}_{counter}{suffix}"
        counter += 1
    return candidate


def on_message(client, userdata, msg):
    """
    MQTT message callback: download the file announced by a notification.

    Steps: parse the JSON payload, derive station id and publication time,
    skip messages already recorded in the day's ``messages.json``, pick the
    download URL from the notification's links, fetch it over HTTP, store it
    in the station/day directory and append an entry to the daily log.

    All errors are logged and swallowed so that one bad message does not
    propagate an exception out of the callback.

    Args:
        client: The MQTT client instance (unused).
        userdata: User data set on the client (unused).
        msg: Received message; ``msg.payload`` (bytes) and ``msg.topic`` are
            used.
    """
    try:
        # Parse the notification JSON
        notification = json.loads(msg.payload.decode("utf-8"))
        if not isinstance(notification, dict):
            logger.error("Notification is not a JSON object")
            return

        properties = notification.get("properties")
        if not isinstance(properties, dict):
            properties = {}

        # Notifications without an "id" used to share the placeholder
        # "unknown", so every one after the first was skipped as a duplicate.
        # Fall back to data_id, and do not de-duplicate when neither exists.
        message_id = notification.get("id") or properties.get("data_id")
        can_dedupe = bool(message_id)
        if not can_dedupe:
            message_id = "unknown"

        # Extract WIGOS ID and publication time for logging and directory structure
        wigos_id = extract_wigos_id(notification)
        dt = extract_pubtime(notification)
        date_str = dt.strftime("%Y-%m-%d")
        logger.info(
            "Notification: %s | Station: %s | %s",
            message_id, wigos_id, date_str,
        )

        # Set up the day directory and load the message log
        day_dir = get_day_dir(wigos_id, dt)
        log_data = load_messages_log(day_dir)
        log_data["station"] = wigos_id
        log_data["date"] = date_str

        # Skip if already downloaded
        if can_dedupe and is_already_downloaded(log_data, message_id):
            logger.info(" Already downloaded, skipping.")
            return

        # Find the download URL
        canonical_url = _select_download_url(notification.get("links", []))
        if not canonical_url:
            logger.warning(" No download link found.")
            return

        # Download the file
        logger.info(" Downloading: %s", canonical_url)
        response = requests.get(canonical_url, timeout=30)
        response.raise_for_status()
        payload = response.content

        # Save the BUFR file
        filepath = _unique_path(day_dir, _filename_from_url(canonical_url), dt)
        filepath.write_bytes(payload)
        logger.info(" Saved: %s (%d bytes)", filepath, len(payload))

        # Record in the daily message log
        log_data["messages"].append({
            "id": message_id,
            "topic": msg.topic,
            "pubtime": properties.get("pubtime", ""),
            "data_id": properties.get("data_id", ""),
            "url": canonical_url,
            "filename": filepath.name,
            "size_bytes": len(payload),
            "downloaded_at": datetime.now(timezone.utc).isoformat(),
        })
        save_messages_log(day_dir, log_data)
    except json.JSONDecodeError:
        logger.error("Failed to parse notification JSON")
    except requests.exceptions.RequestException as e:
        logger.error(" Download failed: %s", e)
    except Exception as e:
        # Last-resort boundary: log with traceback and keep processing
        # subsequent messages.
        logger.error("Error: %s", e, exc_info=True)
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
def main():
    """
    Connect to the MQTT broker and process notifications until interrupted.

    Creates the output directory, builds the MQTT client (preferring the
    paho-mqtt 2.x callback API and falling back to the 1.x constructor),
    registers the callbacks and then blocks in ``loop_forever()``.

    Ctrl+C disconnects and returns normally; any other error is logged and the
    process exits with status 1.
    """
    logger.info("WIS2box BUFR Downloader - INAM Mozambique")
    logger.info("Broker: %s:%d", MQTT_BROKER, MQTT_PORT)
    logger.info("Output: %s", OUTPUT_DIR.resolve())
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    client_id = f"diana-dl-{int(time.time())}"
    try:
        client = mqtt.Client(
            client_id=client_id,
            protocol=mqtt.MQTTv311,
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        )
    except (AttributeError, TypeError):
        # paho-mqtt 1.x: the module has no ``CallbackAPIVersion`` attribute
        # (AttributeError) and the constructor does not accept the
        # ``callback_api_version`` keyword (TypeError).
        client = mqtt.Client(client_id=client_id)

    client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message
    client.reconnect_delay_set(min_delay=1, max_delay=60)

    try:
        client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
        client.loop_forever()
    except KeyboardInterrupt:
        logger.info("Stopped by user.")
        client.disconnect()
    except Exception as e:
        logger.error("Fatal: %s", e, exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment