Created
March 15, 2026 10:26
-
-
Save avsm/e563f84e68182246d6d16fffa5d91b08 to your computer and use it in GitHub Desktop.
Westminster bin collection Python script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.10" | |
| # /// | |
| """Look up Westminster bin collection schedules by street name. | |
| Usage: | |
| uv run westminster_bins.py "Abbey Road" | |
| uv run westminster_bins.py --list # dump all street names | |
| uv run westminster_bins.py --dump-streets # dump name→USRN mapping as JSON | |
| """ | |
import json
import sys
import urllib.parse
import urllib.request
from html.parser import HTMLParser
| BASE = "https://transact.westminster.gov.uk/env" | |
| STREET_LIST_URL = f"{BASE}/streetsearch.aspx" | |
| REPORT_URL = f"{BASE}/streetreport.aspx" | |
| # --------------------------------------------------------------------------- | |
| # 1. Scrape the street dropdown to build name → USRN mapping | |
| # --------------------------------------------------------------------------- | |
| class OptionParser(HTMLParser): | |
| """Extract <option value="USRN">Street Name</option> pairs.""" | |
| def __init__(self): | |
| super().__init__() | |
| self.streets: dict[str, str] = {} # name → USRN | |
| self._in_option = False | |
| self._current_value = None | |
| self._current_text = "" | |
| def handle_starttag(self, tag, attrs): | |
| if tag == "option": | |
| self._in_option = True | |
| self._current_value = dict(attrs).get("value", "") | |
| self._current_text = "" | |
| def handle_data(self, data): | |
| if self._in_option: | |
| self._current_text += data | |
| def handle_endtag(self, tag): | |
| if tag == "option" and self._in_option: | |
| name = self._current_text.strip() | |
| if name and self._current_value: | |
| self.streets[name] = self._current_value | |
| self._in_option = False | |
| def fetch_streets() -> dict[str, str]: | |
| """Return {street_name: usrn} for all Westminster streets.""" | |
| req = urllib.request.Request(STREET_LIST_URL, headers={"User-Agent": "westminster-bins/1.0"}) | |
| with urllib.request.urlopen(req) as resp: | |
| html = resp.read().decode("utf-8", errors="replace") | |
| parser = OptionParser() | |
| parser.feed(html) | |
| return parser.streets | |
| def find_street(query: str, streets: dict[str, str]) -> tuple[str, str]: | |
| """Fuzzy-match a query against the street list. Returns (name, usrn).""" | |
| q = query.lower().strip() | |
| # exact match first | |
| for name, usrn in streets.items(): | |
| if name.lower() == q: | |
| return name, usrn | |
| # prefix match | |
| matches = [(n, u) for n, u in streets.items() if n.lower().startswith(q)] | |
| if len(matches) == 1: | |
| return matches[0] | |
| # substring match | |
| if not matches: | |
| matches = [(n, u) for n, u in streets.items() if q in n.lower()] | |
| if len(matches) == 1: | |
| return matches[0] | |
| if len(matches) == 0: | |
| print(f"No street found matching '{query}'", file=sys.stderr) | |
| sys.exit(1) | |
| print(f"Ambiguous query '{query}' — {len(matches)} matches:", file=sys.stderr) | |
| for name, _ in matches[:20]: | |
| print(f" {name}", file=sys.stderr) | |
| if len(matches) > 20: | |
| print(f" ... and {len(matches) - 20} more", file=sys.stderr) | |
| sys.exit(1) | |
| # --------------------------------------------------------------------------- | |
| # 2. Fetch and parse the report page | |
| # --------------------------------------------------------------------------- | |
| class TableParser(HTMLParser): | |
| """Parse HTML tables into lists of row-dicts.""" | |
| def __init__(self): | |
| super().__init__() | |
| self.sections: list[dict] = [] | |
| self._current_section: str | None = None | |
| self._in_h3 = False | |
| self._h3_text = "" | |
| self._headers: list[str] = [] | |
| self._rows: list[list[str]] = [] | |
| self._in_table = False | |
| self._in_tr = False | |
| self._in_td = False | |
| self._in_th = False | |
| self._current_row: list[str] = [] | |
| self._cell_text = "" | |
| self._in_p_strong = False | |
| self._p_strong_text = "" | |
| self._sub_heading: str | None = None | |
| self._tag_stack: list[str] = [] | |
| def handle_starttag(self, tag, attrs): | |
| self._tag_stack.append(tag) | |
| if tag == "h3": | |
| self._in_h3 = True | |
| self._h3_text = "" | |
| elif tag == "table": | |
| self._flush_table() | |
| self._in_table = True | |
| self._headers = [] | |
| self._rows = [] | |
| elif tag == "tr" and self._in_table: | |
| self._in_tr = True | |
| self._current_row = [] | |
| elif tag == "th" and self._in_tr: | |
| self._in_th = True | |
| self._cell_text = "" | |
| elif tag == "td" and self._in_tr: | |
| self._in_td = True | |
| self._cell_text = "" | |
| elif tag == "strong" or tag == "b": | |
| if "p" in self._tag_stack and not self._in_table: | |
| self._in_p_strong = True | |
| self._p_strong_text = "" | |
| def handle_data(self, data): | |
| if self._in_h3: | |
| self._h3_text += data | |
| if self._in_th: | |
| self._cell_text += data | |
| if self._in_td: | |
| self._cell_text += data | |
| if self._in_p_strong: | |
| self._p_strong_text += data | |
| def handle_endtag(self, tag): | |
| if self._tag_stack and self._tag_stack[-1] == tag: | |
| self._tag_stack.pop() | |
| if tag == "h3": | |
| self._in_h3 = False | |
| text = self._h3_text.strip() | |
| # Extract section name like "Rubbish collections for X" | |
| self._flush_table() | |
| self._current_section = text | |
| elif tag == "table": | |
| self._flush_table() | |
| self._in_table = False | |
| elif tag == "tr" and self._in_tr: | |
| if self._current_row: | |
| self._rows.append(self._current_row) | |
| self._in_tr = False | |
| elif tag == "th" and self._in_th: | |
| self._headers.append(self._cell_text.strip()) | |
| self._in_th = False | |
| elif tag == "td" and self._in_td: | |
| self._current_row.append(self._cell_text.strip()) | |
| self._in_td = False | |
| elif tag in ("strong", "b") and self._in_p_strong: | |
| self._in_p_strong = False | |
| self._sub_heading = self._p_strong_text.strip().rstrip(":") | |
| def _flush_table(self): | |
| if not self._rows: | |
| return | |
| # Clean headers | |
| headers = [h for h in self._headers if h.strip()] | |
| if not headers: | |
| return | |
| section_name = self._current_section or "Unknown" | |
| rows_dicts = [] | |
| for row in self._rows: | |
| # Pad row to match headers length | |
| padded = row + [""] * (len(headers) - len(row)) | |
| entry = {} | |
| for i, h in enumerate(headers): | |
| val = padded[i] if i < len(padded) else "" | |
| if val: | |
| entry[h] = val | |
| if entry: | |
| if self._sub_heading: | |
| entry["_sub_heading"] = self._sub_heading | |
| rows_dicts.append(entry) | |
| if rows_dicts: | |
| self.sections.append({ | |
| "section": section_name, | |
| "entries": rows_dicts, | |
| }) | |
| self._rows = [] | |
| self._headers = [] | |
| self._sub_heading = None | |
| def fetch_report(street_name: str, usrn: str) -> dict: | |
| """Fetch the report page and return parsed schedule as a dict.""" | |
| url = f"{REPORT_URL}?Street={urllib.request.quote(street_name)}&USRN={usrn}" | |
| req = urllib.request.Request(url, headers={"User-Agent": "westminster-bins/1.0"}) | |
| with urllib.request.urlopen(req) as resp: | |
| html = resp.read().decode("utf-8", errors="replace") | |
| parser = TableParser() | |
| parser.feed(html) | |
| result: dict = { | |
| "street": street_name, | |
| "usrn": usrn, | |
| } | |
| for section in parser.sections: | |
| title = section["section"] | |
| entries = section["entries"] | |
| if "rubbish" in title.lower() or "waste" in title.lower(): | |
| result["rubbish_collections"] = _clean_rubbish(entries) | |
| elif "recycling" in title.lower(): | |
| result["recycling_collections"] = _clean_recycling(entries) | |
| elif "cleaning" in title.lower() or "sweep" in title.lower(): | |
| result["street_cleaning"] = _clean_cleaning(entries) | |
| return result | |
| def _clean_rubbish(entries: list[dict]) -> list[dict]: | |
| out = [] | |
| for e in entries: | |
| item: dict = {"location": e.get("Location", "")} | |
| sub = e.get("_sub_heading", "") | |
| if sub: | |
| item["type"] = sub | |
| if e.get("Week Days"): | |
| item["weekdays"] = e["Week Days"] | |
| if e.get("Week Times"): | |
| item["weekday_times"] = e["Week Times"] | |
| if e.get("Weekend Days"): | |
| item["weekend_days"] = e["Weekend Days"] | |
| if e.get("Weekend Times"): | |
| item["weekend_times"] = e["Weekend Times"] | |
| out.append(item) | |
| return out | |
| def _clean_recycling(entries: list[dict]) -> list[dict]: | |
| out = [] | |
| for e in entries: | |
| item: dict = {"location": e.get("Location", "")} | |
| if e.get("Service Description"): | |
| item["service"] = e["Service Description"] | |
| if e.get("Week Days"): | |
| item["weekdays"] = e["Week Days"] | |
| if e.get("Week Times"): | |
| item["weekday_times"] = e["Week Times"] | |
| if e.get("Weekend Days"): | |
| item["weekend_days"] = e["Weekend Days"] | |
| if e.get("Weekend Times"): | |
| item["weekend_times"] = e["Weekend Times"] | |
| out.append(item) | |
| return out | |
| def _clean_cleaning(entries: list[dict]) -> list[dict]: | |
| out = [] | |
| for e in entries: | |
| item: dict = {"location": e.get("Location", "")} | |
| if e.get("Service Description"): | |
| item["service"] = e["Service Description"] | |
| if e.get("Week Days"): | |
| item["weekdays"] = e["Week Days"] | |
| if e.get("Weekend Days"): | |
| item["weekend_days"] = e["Weekend Days"] | |
| out.append(item) | |
| return out | |
| # --------------------------------------------------------------------------- | |
| # CLI | |
| # --------------------------------------------------------------------------- | |
| def main(): | |
| if len(sys.argv) < 2 or sys.argv[1] in ("-h", "--help"): | |
| print(__doc__.strip()) | |
| sys.exit(0) | |
| if sys.argv[1] == "--dump-streets": | |
| streets = fetch_streets() | |
| json.dump(streets, sys.stdout, indent=2) | |
| print() | |
| sys.exit(0) | |
| if sys.argv[1] == "--list": | |
| streets = fetch_streets() | |
| for name in sorted(streets): | |
| print(name) | |
| sys.exit(0) | |
| query = " ".join(sys.argv[1:]) | |
| streets = fetch_streets() | |
| name, usrn = find_street(query, streets) | |
| report = fetch_report(name, usrn) | |
| json.dump(report, sys.stdout, indent=2) | |
| print() | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment