|
from __future__ import annotations |
|
|
|
import argparse |
|
import asyncio |
|
import json |
|
import logging |
|
import re |
|
from dataclasses import asdict, dataclass |
|
from typing import Any, Iterable, Mapping, Sequence |
|
|
|
from playwright.async_api import async_playwright |
|
|
|
SEARCH_URL = "https://steuerberaterverzeichnis.berufs-org.de/?lang=de" |
|
DETAILS_LINK_SELECTOR = "a[href^='details/']" |
|
LOGGER = logging.getLogger(__name__) |
|
|
|
|
|
@dataclass(frozen=True)
class Lead:
    """A single search-result card scraped from the registry listing."""

    # Card lines holding the person/firm name portion.
    name_lines: tuple[str, ...]
    # Card lines holding the address portion.
    address_lines: tuple[str, ...]
    # Absolute URL of the card's detail page.
    details_url: str

    def format_line(self) -> str:
        """Render the lead as one pipe-separated line: name | address | URL."""
        joined_name = " ".join(self.name_lines)
        joined_address = ", ".join(self.address_lines)
        return f"{joined_name.strip()} | {joined_address.strip()} | {self.details_url}"
|
|
|
|
|
@dataclass(frozen=True)
class SummarySection:
    """Parsed top ("summary") block of a detail page."""

    # Name/title paragraphs that precede any labeled field.
    identity_lines: tuple[str, ...]
    # Value of the "Rechtsform" label, when present.
    legal_form: str | None
    # Street and postal-code/city lines.
    address_lines: tuple[str, ...]
    # Phone/mobile/fax/email/website values grouped together.
    contacts: "ContactsSection"
    # Value of the "Safe ID" label, when present.
    safe_id: str | None
|
|
|
|
|
@dataclass(frozen=True)
class ContactsSection:
    """Contact details parsed from labeled summary paragraphs."""

    phone: str | None  # "Telefon" label
    mobile: str | None  # "Mobil" label
    fax: str | None  # "Telefax" label
    email: str | None  # "E-Mail" label
    website: str | None  # "Internet" label
|
|
|
|
|
@dataclass(frozen=True)
class NoteEntry:
    """One "label: value" line from the "Hinweise" (notes) section."""

    label: str  # text before the first colon
    value: str  # text after the first colon
|
|
|
|
|
@dataclass(frozen=True)
class NotesSection:
    """The "Hinweise" (notes) section as parsed label/value entries."""

    # One entry per "label: value" line; lines without a colon are dropped.
    entries: tuple[NoteEntry, ...]
|
|
|
|
|
@dataclass(frozen=True)
class ChamberSection:
    """The responsible chamber ("Zuständige Steuerberaterkammer") block."""

    name: str  # first line of the chamber block
    address_lines: tuple[str, ...]  # following lines, up to the first sub-header
|
|
|
|
|
@dataclass(frozen=True)
class AdditionalOfficesSection:
    """Lines listed under the "Weitere Beratungsstelle(n)" sub-header."""

    offices: tuple[str, ...]
|
|
|
|
|
@dataclass(frozen=True)
class ShareholdersSection:
    """Lines listed under the "Gesellschafter" sub-header."""

    members: tuple[str, ...]
|
|
|
|
|
@dataclass(frozen=True)
class RegisterSection:
    """Register information starting at a line prefixed "Register"."""

    entries: tuple[str, ...]
|
|
|
|
|
@dataclass(frozen=True)
class RepresentativeGroup:
    """One titled group from the "Vertreter" (representatives) section."""

    title: str  # group heading text (may be empty)
    members: tuple[str, ...]  # member names listed under the heading
|
|
|
|
|
@dataclass(frozen=True)
class RepresentativesSection:
    """All representative groups found on a detail page."""

    groups: tuple[RepresentativeGroup, ...]
|
|
|
|
|
@dataclass(frozen=True)
class LeadDetails:
    """All sections parsed from a single detail page."""

    details_url: str  # absolute URL the data was scraped from
    summary: SummarySection
    # Optional sections are None when the page does not show them.
    notes: NotesSection | None
    chamber: ChamberSection | None
    additional_offices: AdditionalOfficesSection | None
    shareholders: ShareholdersSection | None
    register: RegisterSection | None
    representatives: RepresentativesSection | None
|
|
|
|
|
def _split_card_lines(lines: Iterable[str]) -> tuple[tuple[str, ...], tuple[str, ...]]: |
|
"""Split card text into name and address groups using a postal-code boundary.""" |
|
name_lines: list[str] = [] |
|
address_lines: list[str] = [] |
|
postal_seen = False |
|
postal_re = re.compile(r"\b\d{5}\b") |
|
|
|
for line in lines: |
|
cleaned = line.strip() |
|
if not cleaned: |
|
continue |
|
if postal_re.search(cleaned): |
|
postal_seen = True |
|
if postal_seen: |
|
address_lines.append(cleaned) |
|
else: |
|
name_lines.append(cleaned) |
|
|
|
return tuple(name_lines), tuple(address_lines) |
|
|
|
|
|
def _normalize_whitespace(value: str) -> str: |
|
return " ".join(value.split()).strip() |
|
|
|
|
|
def _split_address_line(value: str) -> tuple[str, ...]: |
|
match = re.search(r"\b\d{5}\s+\S.*", value) |
|
if not match: |
|
return (value.strip(),) |
|
return (value[: match.start()].strip(), value[match.start() :].strip()) |
|
|
|
|
|
def _extract_labeled_value( |
|
text: str, |
|
label: str, |
|
) -> str | None: |
|
pattern = re.compile(rf"^{re.escape(label)}\s*:\s*(.+)$", re.IGNORECASE) |
|
match = pattern.match(_normalize_whitespace(text)) |
|
if not match: |
|
return None |
|
return match.group(1).strip() |
|
|
|
|
|
# Labeled summary fields recognized on a detail page, in page order. At most
# one label can match a paragraph because the match is anchored at the start.
_SUMMARY_LABELS = ("Rechtsform", "Telefon", "Mobil", "Telefax", "E-Mail", "Internet", "Safe ID")

_POSTAL_CODE_RE = re.compile(r"\b\d{5}\b")


def _summary_from_paragraphs(summary_paragraphs: Sequence[str]) -> SummarySection:
    """Fold the paragraphs above the first heading into a SummarySection.

    Paragraphs are classified in document order:
    - plain text before any labeled field or postal code -> identity lines
    - "<Label>: value" paragraphs -> legal form / contacts / Safe ID
    - the first paragraph containing a 5-digit postal code -> address lines
    """
    identity_lines: tuple[str, ...] = ()
    address_lines: tuple[str, ...] = ()
    values: dict[str, str] = {}
    collecting_names = True
    for paragraph in summary_paragraphs:
        normalized = _normalize_whitespace(paragraph)
        if (
            collecting_names
            and ":" not in normalized
            and not _POSTAL_CODE_RE.search(normalized)
        ):
            # Keep the paragraph's own line breaks for multi-line names.
            identity_lines += tuple(
                part for part in paragraph.splitlines() if part.strip()
            )
            continue

        matched = False
        for label in _SUMMARY_LABELS:
            value = _extract_labeled_value(paragraph, label)
            if value:
                # A repeated label overwrites the earlier value, as before.
                values[label] = value
                collecting_names = False
                matched = True
                break
        if matched:
            continue

        # Only the first postal-code paragraph becomes the address.
        if _POSTAL_CODE_RE.search(normalized) and not address_lines:
            address_lines = _split_address_line(normalized)
            collecting_names = False

    contacts = ContactsSection(
        phone=values.get("Telefon"),
        mobile=values.get("Mobil"),
        fax=values.get("Telefax"),
        email=values.get("E-Mail"),
        website=values.get("Internet"),
    )
    return SummarySection(
        identity_lines=identity_lines,
        legal_form=values.get("Rechtsform"),
        address_lines=address_lines,
        contacts=contacts,
        safe_id=values.get("Safe ID"),
    )


def _notes_from_lines(notes_lines: Iterable[str]) -> NotesSection | None:
    """Parse "label: value" note lines; lines without a colon are dropped.

    Returns None when no line parses into an entry.
    """
    entries: list[NoteEntry] = []
    for line in notes_lines:
        normalized = _normalize_whitespace(line)
        if ":" in normalized:
            label, value = normalized.split(":", 1)
            entries.append(NoteEntry(label=label.strip(), value=value.strip()))
    return NotesSection(tuple(entries)) if entries else None


def _chamber_sections_from_lines(
    chamber_lines: Sequence[str],
) -> tuple[
    ChamberSection | None,
    AdditionalOfficesSection | None,
    ShareholdersSection | None,
    RegisterSection | None,
]:
    """Split the flat chamber line list into its four sub-sections.

    The first line is the chamber name; German sub-headers switch the bucket
    that subsequent lines are collected into. Empty buckets yield None.
    """
    if not chamber_lines:
        return None, None, None, None

    buckets: dict[str, list[str]] = {
        "address": [],
        "offices": [],
        "shareholders": [],
        "register": [],
    }
    mode = "address"
    for line in chamber_lines[1:]:
        if line == "Weitere Beratungsstelle(n)":
            mode = "offices"
            continue
        if line == "Gesellschafter":
            mode = "shareholders"
            continue
        if line.startswith("Register"):
            # The "Register..." header line is itself part of the data.
            mode = "register"
            buckets["register"].append(line)
            continue
        if line == "Alle ausblenden":
            # UI toggle caption, not data.
            continue
        buckets[mode].append(line)

    chamber = ChamberSection(
        name=chamber_lines[0], address_lines=tuple(buckets["address"])
    )
    offices = buckets["offices"]
    shareholders_entries = buckets["shareholders"]
    register_entries = buckets["register"]
    return (
        chamber,
        AdditionalOfficesSection(tuple(offices)) if offices else None,
        ShareholdersSection(tuple(shareholders_entries)) if shareholders_entries else None,
        RegisterSection(tuple(register_entries)) if register_entries else None,
    )


def _representatives_from_payload(
    groups: Iterable[Mapping[str, Any]],
) -> RepresentativesSection | None:
    """Normalize representative groups; returns None when nothing remains."""
    parsed: list[RepresentativeGroup] = []
    for group in groups:
        title = _normalize_whitespace(group.get("title", ""))
        members = tuple(
            cleaned
            for member in group.get("members", [])
            if (cleaned := _normalize_whitespace(member))
        )
        if title or members:
            parsed.append(RepresentativeGroup(title=title, members=members))
    return RepresentativesSection(tuple(parsed)) if parsed else None


def _lead_details_from_payload(
    details_url: str, payload: Mapping[str, Any]
) -> LeadDetails:
    """Map the raw text payload of a detail page into a LeadDetails record.

    Args:
        details_url: Absolute URL the payload was scraped from.
        payload: Mapping produced by `_extract_detail_payload`; missing keys
            are treated as empty sections.

    Returns:
        A fully-populated LeadDetails; optional sections are None when absent.
    """
    summary = _summary_from_paragraphs(list(payload.get("summary_paragraphs", [])))
    notes_section = _notes_from_lines(payload.get("notes_lines", []))
    chamber_lines = [
        _normalize_whitespace(line) for line in payload.get("chamber_lines", [])
    ]
    chamber_section, additional_offices, shareholders, register_section = (
        _chamber_sections_from_lines(chamber_lines)
    )
    representatives_section = _representatives_from_payload(
        payload.get("representatives", [])
    )
    return LeadDetails(
        details_url=details_url,
        summary=summary,
        notes=notes_section,
        chamber=chamber_section,
        additional_offices=additional_offices,
        shareholders=shareholders,
        register=register_section,
        representatives=representatives_section,
    )
|
|
|
|
|
async def _extract_detail_payload(page: Any) -> dict[str, Any]:
    """Return structured text blocks for the detail page so Python can map them.

    The returned mapping contains:
    - "summary_paragraphs": <p> innerText values that precede the first <h2>
      (or all <p> texts when the page has no <h2>)
    - "notes_lines": trimmed lines from elements under the "Hinweise" heading
    - "chamber_lines": trimmed lines under the chamber heading
    - "representatives": list of {"title": str, "members": [str, ...]} built
      from <span>/<ul> pairs under the "Vertreter" heading
    """
    # The page is rendered client-side, so the raw text is collected with an
    # in-page JS snippet; all interpretation happens later in Python.
    return await page.evaluate(
        """
        () => {
          const heading = (title) =>
            Array.from(document.querySelectorAll('h2')).find(
              (h) => h.textContent && h.textContent.trim() === title
            );

          const collectSectionLines = (title) => {
            const h2 = heading(title);
            if (!h2) return [];
            const lines = [];
            let el = h2.nextElementSibling;
            while (el && el.tagName !== 'H2') {
              if (el.innerText) {
                const split = el.innerText
                  .split('\\n')
                  .map((line) => line.trim())
                  .filter(Boolean);
                lines.push(...split);
              }
              el = el.nextElementSibling;
            }
            return lines;
          };

          const summaryParagraphs = () => {
            const firstH2 = document.querySelector('h2');
            if (!firstH2) {
              return Array.from(document.querySelectorAll('p'))
                .map((p) => p.innerText)
                .filter(Boolean);
            }
            return Array.from(document.querySelectorAll('p'))
              .filter((p) => {
                return (
                  p.compareDocumentPosition(firstH2) &
                  Node.DOCUMENT_POSITION_FOLLOWING
                );
              })
              .map((p) => p.innerText)
              .filter(Boolean);
          };

          const representatives = () => {
            const h2 = heading('Vertreter');
            const container = h2 ? h2.nextElementSibling : null;
            if (!container) return [];
            const groups = [];
            let currentTitle = '';
            for (const child of Array.from(container.children)) {
              if (child.tagName === 'SPAN') {
                currentTitle = child.textContent ? child.textContent.trim() : '';
                continue;
              }
              if (child.tagName === 'UL') {
                const members = Array.from(child.querySelectorAll('li'))
                  .map((li) => (li.textContent || '').trim())
                  .filter(Boolean);
                if (currentTitle || members.length) {
                  groups.push({ title: currentTitle, members });
                }
              }
            }
            return groups;
          };

          return {
            summary_paragraphs: summaryParagraphs(),
            notes_lines: collectSectionLines('Hinweise'),
            chamber_lines: collectSectionLines('Zuständige Steuerberaterkammer'),
            representatives: representatives(),
          };
        }
        """
    )
|
|
|
|
|
async def search_zip(zip_code: str) -> list[Lead]:
    """Search the registry by ZIP code and return the visible result cards.

    Args:
        zip_code: German postal code typed into the search form.

    Returns:
        One Lead per result card, with the card text split into name and
        address lines plus an absolute detail-page URL.
    """
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=True)
        try:
            page = await browser.new_page()
            LOGGER.info("Navigating to search page.")
            await page.goto(SEARCH_URL, wait_until="domcontentloaded")
            LOGGER.info("Filling ZIP code %s.", zip_code)
            await page.fill("#plz-text", zip_code)
            LOGGER.info("Submitting search.")
            await page.click("input.verzeichnis-btn.my-3")
            LOGGER.info("Waiting for results header.")
            await page.wait_for_selector("text=Treffer:")
            LOGGER.info("Waiting for results to settle.")
            # The result list renders incrementally after the header appears.
            await page.wait_for_timeout(500)

            cards = (
                page.locator(DETAILS_LINK_SELECTOR)
                .filter(has=page.locator("img[alt='next']"))
            )
            count = await cards.count()
            LOGGER.info("Found %s result cards.", count)
            leads: list[Lead] = []
            for idx in range(count):
                card = cards.nth(idx)
                details_url = await card.get_attribute("href") or ""
                # Hrefs on the results page are relative; absolutize them.
                if details_url.startswith("details/"):
                    details_url = f"https://steuerberaterverzeichnis.berufs-org.de/{details_url}"
                text = await card.inner_text()
                name_lines, address_lines = _split_card_lines(text.splitlines())
                leads.append(
                    Lead(
                        name_lines=name_lines,
                        address_lines=address_lines,
                        details_url=details_url,
                    )
                )
            return leads
        finally:
            # Fix: always release the browser, even when a navigation or
            # selector wait raises (previously leaked on any exception).
            await browser.close()
|
|
|
|
|
async def _load_detail_payload(detail_page: Any, details_url: str) -> dict[str, Any]:
    """Navigate to a detail URL (retrying once) and extract its text payload.

    Returns an empty dict when both attempts fail to produce summary text.
    """
    payload: dict[str, Any] = {}
    for _attempt in range(2):
        LOGGER.info("Navigating to detail page.")
        response = await detail_page.goto(
            details_url, wait_until="domcontentloaded"
        )
        if response is not None and response.status >= 400:
            LOGGER.warning(
                "Detail page returned %s, retrying.", response.status
            )
            LOGGER.info("Waiting before retry.")
            await detail_page.wait_for_timeout(1000)
            continue
        try:
            LOGGER.info("Waiting for Safe ID marker.")
            await detail_page.wait_for_selector("text=Safe ID", timeout=5000)
        except Exception:
            # Some pages lack a Safe ID; fall back to any section header.
            LOGGER.info("Safe ID not found, waiting for section header.")
            try:
                await detail_page.wait_for_selector("h2", timeout=3000)
            except Exception:
                pass
        LOGGER.info("Waiting for detail content to settle.")
        await detail_page.wait_for_timeout(500)
        LOGGER.info("Extracting detail payload.")
        payload = await _extract_detail_payload(detail_page)
        if payload.get("summary_paragraphs"):
            break
        LOGGER.info("Detail payload empty, retrying.")
        await detail_page.wait_for_timeout(1000)
    return payload


async def fetch_details(zip_code: str) -> list[LeadDetails]:
    """Search by ZIP code, follow each result detail page, and parse sections.

    Args:
        zip_code: German postal code typed into the search form.

    Returns:
        One LeadDetails per result card, in listing order.
    """
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=True)
        try:
            page = await browser.new_page()
            LOGGER.info("Navigating to search page.")
            await page.goto(SEARCH_URL, wait_until="domcontentloaded")
            LOGGER.info("Filling ZIP code %s.", zip_code)
            await page.fill("#plz-text", zip_code)
            LOGGER.info("Submitting search.")
            await page.click("input.verzeichnis-btn.my-3")
            LOGGER.info("Waiting for results header.")
            await page.wait_for_selector("text=Treffer:")
            LOGGER.info("Waiting for results to settle.")
            await page.wait_for_timeout(500)

            cards = (
                page.locator(DETAILS_LINK_SELECTOR)
                .filter(has=page.locator("img[alt='next']"))
            )
            count = await cards.count()
            LOGGER.info("Found %s result cards.", count)
            detail_urls: list[str] = []
            for idx in range(count):
                card = cards.nth(idx)
                details_url = await card.get_attribute("href") or ""
                # Hrefs on the results page are relative; absolutize them.
                if details_url.startswith("details/"):
                    details_url = f"https://steuerberaterverzeichnis.berufs-org.de/{details_url}"
                if details_url:
                    detail_urls.append(details_url)

            results: list[LeadDetails] = []
            for idx, details_url in enumerate(detail_urls, start=1):
                LOGGER.info("Fetching details %s/%s.", idx, len(detail_urls))
                detail_page = await browser.new_page()
                try:
                    payload = await _load_detail_payload(detail_page, details_url)
                finally:
                    # Fix: close the per-lead tab even when navigation or
                    # extraction raises (previously leaked the page).
                    await detail_page.close()
                results.append(_lead_details_from_payload(details_url, payload))
                LOGGER.info("Waiting between detail requests.")
                # Throttle to avoid hammering the registry.
                await page.wait_for_timeout(1000)
            return results
        finally:
            # Fix: always release the browser, even on exceptions mid-run.
            await browser.close()
|
|
|
|
|
async def _main() -> None:
    """Parse CLI arguments and run the requested scrape mode."""
    parser = argparse.ArgumentParser(
        description=(
            "Search the Steuerberaterverzeichnis by ZIP code using Playwright "
            "and print the result cards."
        )
    )
    parser.add_argument("--zip", dest="zip_code", required=True)
    parser.add_argument(
        "--details",
        action="store_true",
        help="Follow each detail page and print structured section data as JSON.",
    )
    options = parser.parse_args()

    if not options.details:
        # Default mode: one formatted line per result card.
        for lead in await search_zip(options.zip_code):
            print(lead.format_line())
        return

    # Detail mode: one JSON object per lead (JSON Lines output).
    for item in await fetch_details(options.zip_code):
        print(json.dumps(asdict(item), ensure_ascii=True))
|
|
|
|
|
def main() -> None:
    """Configure logging and run the async CLI entry point."""
    log_format = "%(asctime)s %(levelname)s %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
    asyncio.run(_main())
|
|
|
|
|
# Allow running the scraper directly as a script.
if __name__ == "__main__":
    main()