|
#!/usr/bin/python3 |
|
# -*- coding: utf-8 -*- |
|
# Script para scrapear productos de DIA con Playwright en headless e infinite scroll. |
|
|
|
import locale |
|
import re |
|
import sys |
|
import time |
|
from urllib.parse import quote_plus |
|
|
|
from bs4 import BeautifulSoup |
|
|
|
# Desktop Chrome 123 (macOS) user-agent string handed to the browser context.
USER_AGENT = " ".join(
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
        "AppleWebKit/537.36 (KHTML, like Gecko)",
        "Chrome/123.0.0.0 Safari/537.36",
    )
)
|
|
|
|
|
def _set_es_locale():
    """Switch the process locale to Spanish on a best-effort basis.

    Several spellings of the Spanish locale (and finally ``C.UTF-8``) are
    tried in order; the first one the platform accepts is kept.  When none of
    them is available the user's default locale (``""``) is requested.  If
    even that is rejected -- which happens when ``LANG``/``LC_ALL`` name a
    locale that is not installed -- the current locale is left untouched
    instead of letting ``locale.Error`` abort the script.

    Returns:
        None.
    """
    for candidate in ("es_ES.UTF-8", "es_ES.utf8", "es_ES", "C.UTF-8"):
        try:
            locale.setlocale(locale.LC_ALL, candidate)
        except locale.Error:
            continue
        return

    try:
        locale.setlocale(locale.LC_ALL, "")
    except locale.Error:
        # The environment points at a locale the system does not provide.
        # Locale selection is only a nicety here, so keep whatever is active.
        pass
|
|
|
|
|
def _to_number(raw):
    """Parse a price-like string into a float without consulting the locale.

    The previous implementation rewrote ``","`` to ``"."`` and then called
    ``locale.atof``.  Under a Spanish locale ``"."`` is the grouping
    separator, so ``"1.99"`` was read as ``199.0``; under the C locale a
    value such as ``"1.234,56"`` became ``"1.234.56"`` and was rejected.
    Parsing is now deterministic:

    * if the last separator is a comma, the comma is the decimal mark and any
      dots are grouping (``"1.234,56"`` -> ``1234.56``, ``"1,99"`` -> ``1.99``);
    * if the last separator is a dot and commas are present, the commas are
      grouping (``"1,234.56"`` -> ``1234.56``);
    * a value containing only dots is handed to ``float`` unchanged, so a
      single dot is read as a decimal point (``"1.99"`` -> ``1.99``).

    Args:
        raw: Text to parse, or ``None``.

    Returns:
        The parsed ``float``, or ``None`` when *raw* is ``None``, empty, or
        not a number.
    """
    if raw is None:
        return None

    # Non-breaking spaces are stripped out the same way the original did.
    text = raw.replace("\xa0", "").strip()
    if not text:
        return None

    last_comma = text.rfind(",")
    last_dot = text.rfind(".")
    if last_comma > last_dot:
        # Comma is the decimal mark; dots (if any) only group thousands.
        text = text.replace(".", "").replace(",", ".")
    elif last_comma != -1:
        # Dot is the decimal mark; commas only group thousands.
        text = text.replace(",", "")

    try:
        return float(text)
    except ValueError:
        return None
|
|
|
|
|
def _extract_products(html):
    """Turn the HTML of a search-results page into a list of product rows.

    Every ``.search-product-card`` that has a name element yields one tuple
    ``(name, unit_price, currency, price_per_measure, currency, quantity,
    measure_unit)``.  All fields are strings; a field that cannot be found is
    the empty string.  ``quantity`` is the unit price divided by the price per
    measure, rounded to two decimals, and is only filled in when both prices
    parse to non-zero numbers.

    Args:
        html: Markup of the results page.

    Returns:
        A list of 7-tuples of strings, in document order.
    """
    price_pattern = re.compile(r"([\d.,]+)\s*€?")
    per_measure_pattern = re.compile(r"([\d.,]+)\s*€?\s*/\s*([A-ZÁÉÍÓÚÜÑ]+)")

    def text_of(node):
        # Collapsed text of a node, or "" when the node is missing.
        return node.get_text(" ", strip=True) if node else ""

    products = []
    document = BeautifulSoup(html, "lxml")

    for card in document.select(".search-product-card"):
        name_node = card.select_one(".search-product-card__product-name")
        price_node = card.select_one(".search-product-card__active-price")
        per_measure_node = card.select_one(".search-product-card__price-per-unit")
        if not name_node:
            continue

        name = name_node.get_text(" ", strip=True)
        price_text = text_of(price_node)
        per_measure_text = text_of(per_measure_node)

        price_hit = price_pattern.search(price_text)
        # The measure unit is matched against the upper-cased text.
        per_measure_hit = (
            per_measure_pattern.search(per_measure_text.upper())
            if per_measure_text
            else None
        )

        unit_price_text = price_hit.group(1) if price_hit else ""
        if per_measure_hit:
            per_measure_price_text, measure = per_measure_hit.groups()
        else:
            per_measure_price_text = measure = ""
        currency = "€" if (price_hit or per_measure_hit) else ""

        unit_price = _to_number(unit_price_text)
        per_measure_price = _to_number(per_measure_price_text)
        if unit_price and per_measure_price:
            quantity = str(round(unit_price / per_measure_price, 2))
        else:
            quantity = ""

        products.append(
            (
                name,
                unit_price_text,
                currency,
                per_measure_price_text,
                currency,
                quantity,
                measure,
            )
        )

    return products
|
|
|
|
|
def _extract_declared_results_count(text):
    """Read the result count from a "Resultados para <query> (N)" heading.

    The heading is matched case-insensitively anywhere in *text*.

    Args:
        text: Visible text of the page.

    Returns:
        ``N`` as an ``int``, or ``None`` when no such heading is present.
    """
    heading = re.compile(r"Resultados\s+para\s+.+?\((\d+)\)", flags=re.IGNORECASE)
    found = heading.search(text)
    return int(found.group(1)) if found else None
|
|
|
|
|
def _is_access_denied(page):
    """Tell whether the loaded page looks like an "Access Denied" block page.

    The body text and the title are read from *page*, lower-cased (a missing
    value counts as empty) and searched for the usual block-page wording.

    Args:
        page: Object exposing ``inner_text(selector)`` and ``title()``.

    Returns:
        ``True`` when the title or body contains one of the markers.
    """
    body = (page.inner_text("body") or "").lower()
    heading = (page.title() or "").lower()

    if "access denied" in heading:
        return True

    body_markers = (
        "access denied",
        "don't have permission",
        "you don't have permission",
    )
    return any(marker in body for marker in body_markers)
|
|
|
|
|
def _scroll_to_load_all(page):
    """Keep scrolling the results until no further product cards show up.

    First waits up to 15 s for the ``.search-component__content`` container.
    If it appears, that element is the one scrolled; otherwise the window is
    scrolled instead.  Each round counts the ``.search-product-card``
    elements, scrolls to the bottom and pauses 1.2 s.  The loop stops once 8
    consecutive rounds have found the same count as the round before, or
    after 120 rounds.

    Args:
        page: Playwright page showing the search results.
    """
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    container_selector = ".search-component__content"
    try:
        page.wait_for_selector(container_selector, timeout=15000)
    except PlaywrightTimeoutError:
        scroll_container = False
    else:
        scroll_container = True

    unchanged_rounds = 0
    previous_total = -1

    for _round in range(120):
        total = page.locator(".search-product-card").count()
        unchanged_rounds = unchanged_rounds + 1 if total == previous_total else 0
        previous_total = total

        if scroll_container:
            page.eval_on_selector(
                container_selector,
                "el => { el.scrollTop = el.scrollHeight; }",
            )
        else:
            page.evaluate("window.scrollTo(0, document.body.scrollHeight)")

        # Pause after every scroll before counting again.
        page.wait_for_timeout(1200)
        if unchanged_rounds >= 8:
            break
|
|
|
|
|
def scrape_all_pages(search_text):
    """Search dia.es for *search_text* and return the products found.

    A headless Chrome (falling back to Playwright's Chromium when the
    ``chrome`` channel cannot be launched) opens the search URL, the results
    are scrolled until they stop growing, and the final markup is parsed.
    Duplicate rows are dropped, keeping the first occurrence.  When the page
    announces more results than were extracted, a warning is written to
    standard error.

    Args:
        search_text: Free-text query; it is URL-encoded before use.

    Returns:
        A list of product tuples as produced by ``_extract_products``.

    Raises:
        RuntimeError: The site answered with an "Access Denied" page.
    """
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
    from playwright.sync_api import sync_playwright

    url = f"https://www.dia.es/search?q={quote_plus(search_text)}"
    chromium_flags = [
        "--disable-blink-features=AutomationControlled",
        "--no-sandbox",
        "--disable-dev-shm-usage",
    ]

    all_rows = []
    declared_total = None

    with sync_playwright() as playwright:
        try:
            browser = playwright.chromium.launch(
                headless=True, channel="chrome", args=chromium_flags
            )
        except Exception:
            # Fall back to Playwright's bundled Chromium (DIA may block it).
            browser = playwright.chromium.launch(headless=True, args=chromium_flags)

        context = browser.new_context(
            user_agent=USER_AGENT,
            viewport={"width": 1920, "height": 1080},
            locale="es-ES",
        )
        # Reduce the obvious automation fingerprint.
        context.add_init_script(
            'Object.defineProperty(navigator, "webdriver", {get: () => undefined})'
        )
        page = context.new_page()

        try:
            page.goto(url, wait_until="domcontentloaded", timeout=60000)
            page.wait_for_selector("body", timeout=30000)

            if _is_access_denied(page):
                raise RuntimeError("DIA ha bloqueado el acceso automatizado (Access Denied).")

            try:
                page.wait_for_selector(".search-product-card", timeout=20000)
            except PlaywrightTimeoutError:
                # No card showed up in time; carry on and parse what is there.
                pass

            _scroll_to_load_all(page)
            html = page.content()
            declared_total = _extract_declared_results_count(page.inner_text("body"))

            # dict.fromkeys drops repeated rows while keeping first-seen order.
            all_rows = list(dict.fromkeys(_extract_products(html)))
        finally:
            context.close()
            browser.close()

    if declared_total is not None and len(all_rows) < declared_total:
        print(
            f"AVISO: DIA indica {declared_total} resultados, pero solo se pudieron extraer {len(all_rows)}.",
            file=sys.stderr,
        )

    return all_rows
|
|
|
|
|
def main():
    """Command-line entry point: scrape one query and print the rows.

    All command-line arguments are joined into a single query.  The result is
    printed to standard output as a header line followed by one line per
    product, with fields separated by `` ; ``.

    Exit codes: 1 when no query is given, 2 when scraping raises
    ``RuntimeError``, 3 when a required module is missing.
    """
    if len(sys.argv) < 2:
        print("Debe indicar un producto")
        sys.exit(1)

    _set_es_locale()
    query = " ".join(sys.argv[1:]).strip()

    try:
        rows = scrape_all_pages(query)
    except RuntimeError as exc:
        print(exc, file=sys.stderr)
        sys.exit(2)
    except ModuleNotFoundError:
        print(
            "Falta dependencia: instala Playwright con 'pip install playwright' y luego 'playwright install chromium'.",
            file=sys.stderr,
        )
        sys.exit(3)

    header = (
        "Producto",
        "Precio unitario",
        "Moneda",
        "Precio UME",
        "Moneda",
        "Cantidad",
        "UME",
    )
    print(*header, sep=" ; ")

    for row in rows:
        # Exactly the first seven fields are printed, in order.
        print(*(row[index] for index in range(7)), sep=" ; ")


if __name__ == "__main__":
    main()