Skip to content

Instantly share code, notes, and snippets.

@avsm
Created March 15, 2026 10:26
Show Gist options
  • Select an option

  • Save avsm/e563f84e68182246d6d16fffa5d91b08 to your computer and use it in GitHub Desktop.

Select an option

Save avsm/e563f84e68182246d6d16fffa5d91b08 to your computer and use it in GitHub Desktop.
Westminster bin collection Python script
# /// script
# requires-python = ">=3.10"
# ///
"""Look up Westminster bin collection schedules by street name.
Usage:
uv run westminster_bins.py "Abbey Road"
uv run westminster_bins.py --list # dump all street names
uv run westminster_bins.py --dump-streets # dump name→USRN mapping as JSON
"""
import json
import sys
import urllib.parse
import urllib.request
from html.parser import HTMLParser
# Base of Westminster council's "transact" environmental-services site.
BASE = "https://transact.westminster.gov.uk/env"
# Street search page; its <option> dropdown maps street names to USRN codes.
STREET_LIST_URL = f"{BASE}/streetsearch.aspx"
# Per-street report page; takes ?Street=<name>&USRN=<code> query parameters.
REPORT_URL = f"{BASE}/streetreport.aspx"
# ---------------------------------------------------------------------------
# 1. Scrape the street dropdown to build name → USRN mapping
# ---------------------------------------------------------------------------
class OptionParser(HTMLParser):
    """Collect <option value="USRN">Street Name</option> pairs from a page.

    After feed(), the results are in ``self.streets`` as {name: usrn}.
    """

    def __init__(self):
        super().__init__()
        self.streets: dict[str, str] = {}  # street name -> USRN code
        self._inside = False   # currently between <option> and </option>
        self._value = None     # value attribute of the open <option>
        self._text = ""        # accumulated label text of the open <option>

    def handle_starttag(self, tag, attrs):
        # A new <option> begins: capture its value attribute and reset the label.
        if tag != "option":
            return
        self._inside = True
        self._value = dict(attrs).get("value", "")
        self._text = ""

    def handle_data(self, data):
        if self._inside:
            self._text += data

    def handle_endtag(self, tag):
        if tag != "option" or not self._inside:
            return
        label = self._text.strip()
        # Skip placeholder options that have an empty label or empty value.
        if label and self._value:
            self.streets[label] = self._value
        self._inside = False
def fetch_streets() -> dict[str, str]:
    """Download the street-search page and return {street_name: usrn}."""
    request = urllib.request.Request(
        STREET_LIST_URL,
        headers={"User-Agent": "westminster-bins/1.0"},
    )
    with urllib.request.urlopen(request) as response:
        page = response.read().decode("utf-8", errors="replace")
    option_parser = OptionParser()
    option_parser.feed(page)
    return option_parser.streets
def find_street(query: str, streets: dict[str, str]) -> tuple[str, str]:
    """Fuzzy-match *query* against the street list; return (name, usrn).

    Matching order: case-insensitive exact match, then unique prefix
    match, then (only when no prefix matched) unique substring match.
    Zero or multiple candidates print a diagnostic to stderr and exit 1.
    """
    needle = query.lower().strip()

    # Exact match wins outright.
    for name, usrn in streets.items():
        if name.lower() == needle:
            return name, usrn

    # Prefix candidates; fall back to substring only when there are none.
    candidates = [
        (name, usrn)
        for name, usrn in streets.items()
        if name.lower().startswith(needle)
    ]
    if not candidates:
        candidates = [
            (name, usrn)
            for name, usrn in streets.items()
            if needle in name.lower()
        ]
    if len(candidates) == 1:
        return candidates[0]

    if not candidates:
        print(f"No street found matching '{query}'", file=sys.stderr)
        sys.exit(1)

    print(f"Ambiguous query '{query}' — {len(candidates)} matches:", file=sys.stderr)
    for name, _ in candidates[:20]:
        print(f" {name}", file=sys.stderr)
    if len(candidates) > 20:
        print(f" ... and {len(candidates) - 20} more", file=sys.stderr)
    sys.exit(1)
# ---------------------------------------------------------------------------
# 2. Fetch and parse the report page
# ---------------------------------------------------------------------------
class TableParser(HTMLParser):
    """Turn the report page's <h3>-labelled HTML tables into row dictionaries.

    After feed(), ``self.sections`` holds records of the form
    {"section": <h3 text>, "entries": [ {header: cell, ...}, ... ]}.
    Bold text inside a <p> (outside any table) becomes a "_sub_heading"
    key on the rows of the table that follows it.
    """

    def __init__(self):
        super().__init__()
        self.sections: list[dict] = []    # finished {"section", "entries"} records
        self._section: str | None = None  # text of the most recent <h3>
        self._in_heading = False
        self._heading_text = ""
        self._headers: list[str] = []     # <th> texts of the current table
        self._rows: list[list[str]] = []  # accumulated <td> rows
        self._in_table = False
        self._in_tr = False
        self._in_td = False
        self._in_th = False
        self._row: list[str] = []
        self._cell = ""
        self._in_bold = False             # inside <strong>/<b> within a <p>
        self._bold_text = ""
        self._subtitle: str | None = None  # last bold <p> text, tagged onto rows
        self._open_tags: list[str] = []

    def handle_starttag(self, tag, attrs):
        self._open_tags.append(tag)
        if tag == "h3":
            self._in_heading = True
            self._heading_text = ""
        elif tag == "table":
            # A new table may begin before the previous one flushed; flush first.
            self._flush_table()
            self._in_table = True
            self._headers = []
            self._rows = []
        elif tag == "tr" and self._in_table:
            self._in_tr = True
            self._row = []
        elif tag == "th" and self._in_tr:
            self._in_th = True
            self._cell = ""
        elif tag == "td" and self._in_tr:
            self._in_td = True
            self._cell = ""
        elif tag in ("strong", "b"):
            # Bold text inside a <p>, outside any table, acts as a sub-heading.
            if "p" in self._open_tags and not self._in_table:
                self._in_bold = True
                self._bold_text = ""

    def handle_data(self, data):
        if self._in_heading:
            self._heading_text += data
        if self._in_th:
            self._cell += data
        if self._in_td:
            self._cell += data
        if self._in_bold:
            self._bold_text += data

    def handle_endtag(self, tag):
        if self._open_tags and self._open_tags[-1] == tag:
            self._open_tags.pop()
        if tag == "h3":
            self._in_heading = False
            text = self._heading_text.strip()
            # Flush any table still attributed to the previous heading,
            # then switch to the new section name (e.g. "Rubbish collections for X").
            self._flush_table()
            self._section = text
        elif tag == "table":
            self._flush_table()
            self._in_table = False
        elif tag == "tr" and self._in_tr:
            if self._row:
                self._rows.append(self._row)
            self._in_tr = False
        elif tag == "th" and self._in_th:
            self._headers.append(self._cell.strip())
            self._in_th = False
        elif tag == "td" and self._in_td:
            self._row.append(self._cell.strip())
            self._in_td = False
        elif tag in ("strong", "b") and self._in_bold:
            self._in_bold = False
            self._subtitle = self._bold_text.strip().rstrip(":")

    def _flush_table(self):
        """Convert buffered rows into dicts and append one section record."""
        if not self._rows:
            return
        headers = [h for h in self._headers if h.strip()]
        if not headers:
            return
        section_name = self._section or "Unknown"
        entries = []
        for row in self._rows:
            # Pad short rows so every header position indexes safely.
            padded = row + [""] * (len(headers) - len(row))
            record = {}
            for i, header in enumerate(headers):
                value = padded[i] if i < len(padded) else ""
                if value:
                    record[header] = value
            if record:
                if self._subtitle:
                    record["_sub_heading"] = self._subtitle
                entries.append(record)
        if entries:
            self.sections.append({
                "section": section_name,
                "entries": entries,
            })
        self._rows = []
        self._headers = []
        self._subtitle = None
def fetch_report(street_name: str, usrn: str) -> dict:
    """Fetch the report page for a street and return the parsed schedule.

    Returns a dict with "street" and "usrn" keys plus whichever of
    "rubbish_collections", "recycling_collections" and "street_cleaning"
    sections the page contains.
    """
    # Fix: quote() lives in urllib.parse — urllib.request.quote only works
    # through an undocumented internal re-export and may break.
    url = f"{REPORT_URL}?Street={urllib.parse.quote(street_name)}&USRN={usrn}"
    req = urllib.request.Request(url, headers={"User-Agent": "westminster-bins/1.0"})
    with urllib.request.urlopen(req) as resp:
        html = resp.read().decode("utf-8", errors="replace")
    parser = TableParser()
    parser.feed(html)
    result: dict = {
        "street": street_name,
        "usrn": usrn,
    }
    for section in parser.sections:
        # Lower-case once; classify the section by keywords in its <h3> title.
        title = section["section"].lower()
        entries = section["entries"]
        if "rubbish" in title or "waste" in title:
            result["rubbish_collections"] = _clean_rubbish(entries)
        elif "recycling" in title:
            result["recycling_collections"] = _clean_recycling(entries)
        elif "cleaning" in title or "sweep" in title:
            result["street_cleaning"] = _clean_cleaning(entries)
    return result
def _clean_rubbish(entries: list[dict]) -> list[dict]:
out = []
for e in entries:
item: dict = {"location": e.get("Location", "")}
sub = e.get("_sub_heading", "")
if sub:
item["type"] = sub
if e.get("Week Days"):
item["weekdays"] = e["Week Days"]
if e.get("Week Times"):
item["weekday_times"] = e["Week Times"]
if e.get("Weekend Days"):
item["weekend_days"] = e["Weekend Days"]
if e.get("Weekend Times"):
item["weekend_times"] = e["Weekend Times"]
out.append(item)
return out
def _clean_recycling(entries: list[dict]) -> list[dict]:
out = []
for e in entries:
item: dict = {"location": e.get("Location", "")}
if e.get("Service Description"):
item["service"] = e["Service Description"]
if e.get("Week Days"):
item["weekdays"] = e["Week Days"]
if e.get("Week Times"):
item["weekday_times"] = e["Week Times"]
if e.get("Weekend Days"):
item["weekend_days"] = e["Weekend Days"]
if e.get("Weekend Times"):
item["weekend_times"] = e["Weekend Times"]
out.append(item)
return out
def _clean_cleaning(entries: list[dict]) -> list[dict]:
out = []
for e in entries:
item: dict = {"location": e.get("Location", "")}
if e.get("Service Description"):
item["service"] = e["Service Description"]
if e.get("Week Days"):
item["weekdays"] = e["Week Days"]
if e.get("Weekend Days"):
item["weekend_days"] = e["Weekend Days"]
out.append(item)
return out
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: dispatch on the first argument, then print JSON."""
    args = sys.argv[1:]
    if not args or args[0] in ("-h", "--help"):
        print(__doc__.strip())
        sys.exit(0)
    if args[0] == "--dump-streets":
        # Emit the full name -> USRN mapping as JSON.
        json.dump(fetch_streets(), sys.stdout, indent=2)
        print()
        sys.exit(0)
    if args[0] == "--list":
        for street in sorted(fetch_streets()):
            print(street)
        sys.exit(0)
    # Everything else is treated as a street-name query (may span words).
    query = " ".join(args)
    streets = fetch_streets()
    name, usrn = find_street(query, streets)
    json.dump(fetch_report(name, usrn), sys.stdout, indent=2)
    print()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment