Skip to content

Instantly share code, notes, and snippets.

@erick-otenyo
Last active February 11, 2026 09:56
Show Gist options
  • Select an option

  • Save erick-otenyo/811118b4e572c2af6a03e0bcfb9e4421 to your computer and use it in GitHub Desktop.

Select an option

Save erick-otenyo/811118b4e572c2af6a03e0bcfb9e4421 to your computer and use it in GitHub Desktop.
"""
WIS2box SYNOP BUFR Downloader - INAM Mozambique
=================================================
Subscribes to INAM WIS2box MQTT broker and downloads SYNOP BUFR files
as they are published. Files are organized by WIGOS station ID and date.
Directory structure:
bufr_data/
└── {wigos_station_id}/
└── {year}/
└── {month}/
└── {day}/
├── messages.json # log of all downloaded messages
├── {filename}.bufr
└── {filename}.bufr
MQTT Broker: wis2.inam.gov.mz:1883
Credentials: everyone / everyone
Usage:
pip install paho-mqtt requests
python wis2box_downloader_inam.py
"""
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlsplit

import paho.mqtt.client as mqtt
import requests
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Each setting can be overridden with the environment variable named in the
# call; the defaults are the INAM public broker settings.
MQTT_BROKER = os.environ.get("WIS2_MQTT_BROKER", "wis2.inam.gov.mz")
MQTT_PORT = int(os.environ.get("WIS2_MQTT_PORT", "1883"))
MQTT_USERNAME = os.environ.get("WIS2_MQTT_USERNAME", "everyone")
MQTT_PASSWORD = os.environ.get("WIS2_MQTT_PASSWORD", "everyone")

# Topics for surface observations / SYNOP
MQTT_TOPICS = [
    {
        "topic": "origin/a/wis2/mz-inam/data/core/weather/surface-based-observations/synop",
        "qos": 1,
    },
]

# Root directory for downloaded data
OUTPUT_DIR = Path(os.environ.get("WIS2_OUTPUT_DIR", "./bufr_data"))

# Logging: stdout plus a log file in the current working directory.
# The file encoding is explicit so non-ASCII content cannot fail to encode
# under a non-UTF-8 locale default.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("wis2box_downloader.log", encoding="utf-8"),
    ],
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def extract_wigos_id(notification):
    """
    Extract the WIGOS Station Identifier from a WIS2 notification.

    Reads ``notification["properties"]["wigos_station_identifier"]``.

    Raises:
        ValueError: if the identifier is missing/empty, is not a string, or
            could not be used safely as a single directory name.
    """
    # "properties" may be absent or explicitly null in a malformed message.
    props = notification.get("properties") or {}
    wsi = props.get("wigos_station_identifier")
    if not wsi:
        raise ValueError("WIGOS Station Identifier not found")
    if not isinstance(wsi, str):
        raise ValueError(f"WIGOS Station Identifier is not a string: {wsi!r}")
    # The identifier becomes a directory name under the output root and comes
    # from an untrusted MQTT payload: refuse anything that could escape it.
    if wsi in (".", "..") or any(ch in wsi for ch in ("/", "\\", "\x00")):
        raise ValueError(f"Unsafe WIGOS Station Identifier: {wsi!r}")
    return wsi
def extract_pubtime(notification):
    """Extract the notification's publication time as a UTC-aware datetime.

    Reads ``notification["properties"]["pubtime"]`` (an ISO 8601 / RFC 3339
    string). A trailing ``Z``/``z`` is accepted on every Python version.
    A timestamp with an offset is converted to UTC; one without an offset is
    assumed to already be UTC. Callers therefore always get the UTC calendar
    date.

    Raises:
        ValueError: if pubtime is missing/empty, not a string, or unparsable.
    """
    props = notification.get("properties") or {}
    pub_time = props.get("pubtime", "")
    if not pub_time:
        raise ValueError("Publication time not found")
    if not isinstance(pub_time, str):
        raise ValueError(f"Invalid pubtime format: {pub_time!r}")
    # datetime.fromisoformat() only accepts a "Z" suffix from Python 3.11 on.
    if pub_time[-1] in "Zz":
        pub_time = pub_time[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(pub_time)
    except ValueError as err:
        raise ValueError("Invalid pubtime format") from err
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
def get_day_dir(wigos_id, dt):
    """Return the per-station, per-day output directory, creating it if needed.

    The path is ``OUTPUT_DIR/{wigos_id}/{year}/{MM}/{DD}``: the year is
    ``str(dt.year)`` and month/day are zero-padded to two digits.
    """
    segments = (wigos_id, str(dt.year), f"{dt.month:02d}", f"{dt.day:02d}")
    target = OUTPUT_DIR.joinpath(*segments)
    target.mkdir(parents=True, exist_ok=True)
    return target
def load_messages_log(day_dir):
    """Load the daily ``messages.json`` log from ``day_dir``.

    Returns a dict with ``station``, ``date`` and ``messages`` keys.

    A fresh empty log is returned in these cases:
      * the file does not exist;
      * the file cannot be read or is not valid JSON;
      * the content is not an object with a list under ``messages``.

    For the unreadable and malformed cases a warning is logged. Saving the
    fresh log later would replace the old file and its download history.

    Message entries that are not objects with an ``id`` are dropped, so
    duplicate checks that index ``entry["id"]`` cannot fail on them.
    """
    log_path = day_dir / "messages.json"
    empty = {"station": "", "date": "", "messages": []}
    if not log_path.exists():
        return empty
    try:
        data = json.loads(log_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as err:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Ignoring unreadable message log %s: %s", log_path, err)
        return empty
    if not isinstance(data, dict) or not isinstance(data.get("messages", []), list):
        logger.warning("Ignoring malformed message log %s", log_path)
        return empty
    data.setdefault("station", "")
    data.setdefault("date", "")
    data["messages"] = [
        entry
        for entry in data.get("messages", [])
        if isinstance(entry, dict) and "id" in entry
    ]
    return data
def save_messages_log(day_dir, log_data):
    """Write ``log_data`` as indented JSON to ``day_dir/messages.json``.

    The JSON is first written to ``messages.json.tmp`` and then moved over the
    real file. Path.replace() (os.replace) is an atomic rename on POSIX, so an
    interrupted write cannot leave a truncated log that would later be
    discarded together with its download history.
    """
    log_path = day_dir / "messages.json"
    tmp_path = log_path.with_name(log_path.name + ".tmp")
    tmp_path.write_text(json.dumps(log_data, indent=2), encoding="utf-8")
    tmp_path.replace(log_path)
def is_already_downloaded(log_data, message_id):
    """Return True when an entry of ``log_data["messages"]`` has ``id == message_id``.

    Entries are scanned in order and the scan stops at the first match.
    """
    for entry in log_data["messages"]:
        if entry["id"] == message_id:
            return True
    return False
# ---------------------------------------------------------------------------
# MQTT Callbacks
# ---------------------------------------------------------------------------
def on_connect(client, userdata, flags, rc, properties=None):
    """paho-mqtt connect callback: subscribe to every topic in MQTT_TOPICS.

    Subscriptions are made here, on every successful (re)connect, so they are
    restored after paho reconnects.

    ``rc`` is an int with the paho-mqtt 1.x callback API and a ReasonCode
    object with the VERSION2 API. Both compare equal to 0 on success. ``rc``
    is logged with %s, which handles both; %d needs an int-like value.
    """
    if rc == 0:
        logger.info("Connected to %s:%d", MQTT_BROKER, MQTT_PORT)
        for mqtt_topic in MQTT_TOPICS:
            topic = mqtt_topic["topic"]
            qos = mqtt_topic.get("qos", 0)
            client.subscribe(topic, qos=qos)
            logger.info("Subscribed: %s", topic)
    else:
        logger.error("Connection failed: rc=%s", rc)
def on_disconnect(client, userdata, flags, rc=None, properties=None):
    """paho-mqtt disconnect callback: warn when the disconnection was unexpected.

    Accepts both signatures paho may use:
      * VERSION2 callback API (paho-mqtt >= 2.0):
        (client, userdata, flags, rc, properties)
      * paho-mqtt 1.x callback API (MQTT 3.1.1):
        (client, userdata, rc)

    In the 1.x form the reason code arrives in the ``flags`` position.
    ``rc`` may be an int or a ReasonCode and is logged with %s, which works
    for both. The actual reconnect is done by paho's network loop, not here.
    """
    if rc is None:
        rc = flags
    if rc != 0:
        logger.warning("Unexpected disconnection (rc=%s), reconnecting...", rc)
def _select_download_url(links):
    """Return the URL to download from a notification's ``links`` list, or None.

    Preference order:
      1. the ``rel == "canonical"`` link;
      2. the first link whose URL path ends in ``.bufr``/``.bufr4``;
      3. the first usable link.

    Entries that are not dicts, or lack a non-empty string ``href``, are ignored.
    """
    usable = [
        link
        for link in links
        if isinstance(link, dict) and isinstance(link.get("href"), str) and link["href"]
    ]
    for link in usable:
        if link.get("rel") == "canonical":
            return link["href"]
    for link in usable:
        # Check the path component so a query string cannot hide the extension.
        if urlsplit(link["href"]).path.endswith((".bufr", ".bufr4")):
            return link["href"]
    return usable[0]["href"] if usable else None


def _filename_from_url(url, fallback):
    """Derive a safe local file name ending in ``.bufr``/``.bufr4`` from a URL.

    Uses the last segment of the URL path; query and fragment are dropped.
    Characters other than letters, digits and ``._-`` become ``_``, and
    leading/trailing dots are stripped. The URL comes from an untrusted
    notification, so the name must not be able to carry path separators
    (such as a Windows backslash).

    ``fallback`` is sanitised the same way and used when the URL yields no
    name; ``"download"`` is the last resort.
    """

    def _clean(name):
        cleaned = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in name)
        return cleaned.strip(".")

    last_segment = urlsplit(url).path.rsplit("/", 1)[-1]
    name = _clean(last_segment) or _clean(str(fallback)) or "download"
    if not name.endswith((".bufr", ".bufr4")):
        name = f"{name}.bufr"
    return name


def _unique_path(day_dir, filename, dt):
    """Return a path for ``filename`` inside ``day_dir`` that does not exist yet.

    On a name clash (e.g. a republished message with the same file name), the
    publication time ``_HHMMSS`` is appended to the stem. If that name is also
    taken, a counter ``_N`` follows, so an existing file is never overwritten.
    """
    filepath = day_dir / filename
    if not filepath.exists():
        return filepath
    stem, suffix = filepath.stem, filepath.suffix
    stamp = dt.strftime("%H%M%S")
    candidate = day_dir / f"{stem}_{stamp}{suffix}"
    counter = 1
    while candidate.exists():
        candidate = day_dir / f"{stem}_{stamp}_{counter}{suffix}"
        counter += 1
    return candidate


def on_message(client, userdata, msg):
    """paho-mqtt message callback: download the BUFR file a notification points to.

    Steps:
      1. Parse the JSON payload and extract the WIGOS station ID and pubtime.
      2. Skip the message if its id is already in that day's messages.json.
      3. Otherwise download the selected link and save it under the day
         directory.
      4. Append a record of the download to messages.json.

    Every error is logged and swallowed, so one bad notification cannot stop
    the MQTT loop.

    NOTE(review): the download runs synchronously inside the callback, so
    paho's network processing waits for it (up to the 30 s request timeout).
    """
    try:
        notification = json.loads(msg.payload.decode("utf-8"))
        if not isinstance(notification, dict):
            logger.error("Notification is not a JSON object; ignoring.")
            return

        # Without an id there is nothing reliable to deduplicate on. Download
        # anyway rather than treating every id-less message as a duplicate of
        # the first one logged under "unknown".
        raw_id = notification.get("id")
        has_id = bool(raw_id)
        message_id = raw_id if has_id else "unknown"

        # Extract WIGOS ID and publication time for logging and directory structure
        wigos_id = extract_wigos_id(notification)
        dt = extract_pubtime(notification)
        date_str = dt.strftime("%Y-%m-%d")
        logger.info(
            "Notification: %s | Station: %s | %s",
            message_id, wigos_id, date_str,
        )
        if not has_id:
            logger.warning(" Notification has no id; duplicate check skipped.")

        # Set up the day directory and load the message log
        day_dir = get_day_dir(wigos_id, dt)
        log_data = load_messages_log(day_dir)
        log_data["station"] = wigos_id
        log_data["date"] = date_str

        if has_id and is_already_downloaded(log_data, message_id):
            logger.info(" Already downloaded, skipping.")
            return

        canonical_url = _select_download_url(notification.get("links") or [])
        if not canonical_url:
            logger.warning(" No download link found.")
            return

        logger.info(" Downloading: %s", canonical_url)
        response = requests.get(canonical_url, timeout=30)
        response.raise_for_status()
        content = response.content

        filename = _filename_from_url(canonical_url, fallback=message_id)
        filepath = _unique_path(day_dir, filename, dt)
        filepath.write_bytes(content)
        logger.info(" Saved: %s (%d bytes)", filepath, len(content))

        # Record in the daily message log
        props = notification.get("properties") or {}
        log_data["messages"].append({
            "id": message_id,
            "topic": msg.topic,
            "pubtime": props.get("pubtime", ""),
            "data_id": props.get("data_id", ""),
            "url": canonical_url,
            "filename": filepath.name,
            "size_bytes": len(content),
            "downloaded_at": datetime.now(timezone.utc).isoformat(),
        })
        save_messages_log(day_dir, log_data)
    except (UnicodeDecodeError, json.JSONDecodeError):
        logger.error("Failed to parse notification JSON")
    except requests.exceptions.RequestException as e:
        logger.error(" Download failed: %s", e)
    except Exception as e:
        logger.exception("Error: %s", e)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _connect_with_retry(client, max_delay=60):
    """Make the first broker connection, retrying with exponential backoff.

    client.connect() raises OSError (socket errors) when the broker cannot be
    reached. That happens before loop_forever() starts, so the
    reconnect_delay_set() backoff does not cover the first attempt. Without
    this retry, a network outage at startup would kill the downloader.
    """
    delay = 1
    while True:
        try:
            client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
            return
        except OSError as err:
            logger.warning(
                "Connection to %s:%d failed (%s); retrying in %ds",
                MQTT_BROKER, MQTT_PORT, err, delay,
            )
            time.sleep(delay)
            delay = min(delay * 2, max_delay)


def main():
    """Run the downloader.

    Creates the output directory and builds the MQTT client for either
    paho-mqtt major version. Then it connects (retrying until the broker is
    reachable) and processes notifications until interrupted.
    Subscriptions are made in on_connect.
    """
    logger.info("WIS2box BUFR Downloader - INAM Mozambique")
    logger.info("Broker: %s:%d", MQTT_BROKER, MQTT_PORT)
    logger.info("Output: %s", OUTPUT_DIR.resolve())
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    client_id = f"diana-dl-{int(time.time())}"
    # paho-mqtt >= 2.0 requires an explicit callback API version. paho-mqtt
    # 1.x has no CallbackAPIVersion attribute at all: referencing it raises
    # AttributeError while the arguments are evaluated, before any TypeError
    # from the call could occur. So test for it instead of catching TypeError.
    if hasattr(mqtt, "CallbackAPIVersion"):
        client = mqtt.Client(
            client_id=client_id,
            protocol=mqtt.MQTTv311,
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        )
    else:
        client = mqtt.Client(client_id=client_id)

    client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message
    client.reconnect_delay_set(min_delay=1, max_delay=60)

    try:
        _connect_with_retry(client)
        client.loop_forever()
    except KeyboardInterrupt:
        logger.info("Stopped by user.")
        client.disconnect()
    except Exception as e:
        logger.exception("Fatal: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment