Skip to content

Instantly share code, notes, and snippets.

@kubrick06010
Last active March 10, 2026 12:47
Show Gist options
  • Select an option

  • Save kubrick06010/6574985f278914027a77ac592e1b6d79 to your computer and use it in GitHub Desktop.

Select an option

Save kubrick06010/6574985f278914027a77ac592e1b6d79 to your computer and use it in GitHub Desktop.
Scraping de precios para productos del DIA

DIA scraper - Playwright branch

This branch (playwright-implementation) contains the DIA product scraper implemented with Playwright in headless mode.

What this branch does

  • Queries DIA search (https://www.dia.es/search?q=...)
  • Handles infinite scrolling in the search results container
  • Extracts product name, unit price, unit-of-measure price, and derived quantity
  • Prints semicolon-separated output to stdout

Requirements

  • Python 3.9+
  • Python packages:
    • beautifulsoup4
    • lxml
    • playwright
  • Browser runtime for Playwright:
    • python3 -m playwright install chromium
  • A local Chrome install is recommended (this script first tries Playwright with channel="chrome" to reduce anti-bot blocking).

Install example:

python3 -m pip install beautifulsoup4 lxml playwright
python3 -m playwright install chromium

Usage

python3 productos_dia.py leche
python3 productos_dia.py "leche sin lactosa"

Output columns:

  • Producto
  • Precio unitario
  • Moneda
  • Precio UME
  • Moneda
  • Cantidad
  • UME

Typical usage scenarios

Use this branch when you need:

  • Better reliability on modern, dynamic pages (SPA/infinite scroll)
  • A scraper that must load all visible search results by scrolling
  • Easier future extension to network/event-based waits

Notes

  • DIA can change frontend selectors and anti-bot behavior over time.
  • If blocked, retry later or from a different environment/network profile.
  • For a Selenium-based implementation, check the selenium branch.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Script para scrapear productos de DIA con Playwright en headless e infinite scroll.
import locale
import re
import sys
import time
from urllib.parse import quote_plus
from bs4 import BeautifulSoup
# Realistic desktop Chrome user-agent sent with every request; the default
# Playwright UA is an obvious automation fingerprint that DIA's anti-bot
# layer is more likely to block.
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/123.0.0.0 Safari/537.36"
)
def _set_es_locale():
for candidate in ("es_ES.UTF-8", "es_ES.utf8", "es_ES", "C.UTF-8"):
try:
locale.setlocale(locale.LC_ALL, candidate)
return
except locale.Error:
continue
locale.setlocale(locale.LC_ALL, "")
def _to_number(raw):
if raw is None:
return None
normalized = raw.replace("\xa0", "").replace(",", ".").strip()
try:
return locale.atof(normalized)
except (ValueError, locale.Error):
try:
return float(normalized)
except ValueError:
return None
def _extract_products(html):
    """Parse DIA search-result HTML into product tuples.

    Each tuple is ``(name, unit price, currency, price per unit of measure,
    currency, derived quantity, unit of measure)``; fields that cannot be
    extracted are left as empty strings.
    """
    soup = BeautifulSoup(html, "lxml")
    products = []
    for card in soup.select(".search-product-card"):
        name_node = card.select_one(".search-product-card__product-name")
        if not name_node:
            # A card without a product name is not a usable result.
            continue
        name = name_node.get_text(" ", strip=True)

        def _text_of(selector):
            node = card.select_one(selector)
            return node.get_text(" ", strip=True) if node else ""

        price_text = _text_of(".search-product-card__active-price")
        ume_text = _text_of(".search-product-card__price-per-unit")

        unit_match = re.search(r"([\d.,]+)\s*€?", price_text)
        ume_match = None
        if ume_text:
            # e.g. "(1,05 €/L)" -> price "1,05" and unit "L".
            ume_match = re.search(
                r"([\d.,]+)\s*€?\s*/\s*([A-ZÁÉÍÓÚÜÑ]+)", ume_text.upper()
            )

        unit_price = unit_match.group(1) if unit_match else ""
        ume_price = ume_match.group(1) if ume_match else ""
        ume_label = ume_match.group(2) if ume_match else ""
        currency = "€" if (unit_match or ume_match) else ""

        # Quantity is derived: unit price divided by price-per-unit-of-measure.
        quantity = ""
        unit_value = _to_number(unit_price)
        ume_value = _to_number(ume_price)
        if unit_value and ume_value:
            quantity = str(round(unit_value / ume_value, 2))

        products.append(
            (name, unit_price, currency, ume_price, currency, quantity, ume_label)
        )
    return products
def _extract_declared_results_count(text):
match = re.search(r"Resultados\s+para\s+.+?\((\d+)\)", text, flags=re.IGNORECASE)
if match:
return int(match.group(1))
return None
def _is_access_denied(page):
body_text = (page.inner_text("body") or "").lower()
title = (page.title() or "").lower()
return (
"access denied" in title
or "access denied" in body_text
or "don't have permission" in body_text
or "you don't have permission" in body_text
)
def _scroll_to_load_all(page):
    """Scroll the results area until no new product cards appear.

    DIA's search page loads results via infinite scroll.  This repeatedly
    scrolls to the bottom — of the dedicated results container when it
    exists, otherwise of the window — until the product-card count has been
    stable for several consecutive rounds.
    """
    # Imported lazily so importing this module does not require Playwright.
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
    try:
        # Prefer scrolling the dedicated results container when present.
        page.wait_for_selector(".search-component__content", timeout=15000)
        use_container = True
    except PlaywrightTimeoutError:
        # Container never appeared: fall back to scrolling the whole window.
        use_container = False
    stable_rounds = 0
    last_count = -1  # -1 forces at least one "changed" round on the first pass
    # Hard cap of 120 rounds (~2.4 min of waits) so a misbehaving page
    # cannot loop forever.
    for _ in range(120):
        cards_count = page.locator(".search-product-card").count()
        if cards_count == last_count:
            stable_rounds += 1
        else:
            stable_rounds = 0
        last_count = cards_count
        if use_container:
            page.eval_on_selector(
                ".search-component__content",
                "el => { el.scrollTop = el.scrollHeight; }",
            )
        else:
            page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        # Give the page time to fetch and render the next batch.
        page.wait_for_timeout(1200)
        # Stop only after 8 consecutive unchanged rounds, which tolerates
        # slow batches that take a few scroll cycles to arrive.
        if stable_rounds >= 8:
            break
def scrape_all_pages(search_text):
    """Scrape every product DIA returns for *search_text*.

    Launches headless Chrome (falling back to Playwright's bundled
    Chromium), opens the search URL, scrolls until all result cards are
    rendered, and returns a list of unique product tuples as produced by
    ``_extract_products``.  A warning is printed to stderr when fewer rows
    were extracted than the page declared.

    Raises:
        RuntimeError: when DIA serves its "Access Denied" block page.
    """
    # Imported lazily so the CLI can report a missing Playwright dependency.
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
    from playwright.sync_api import sync_playwright
    encoded = quote_plus(search_text)
    url = f"https://www.dia.es/search?q={encoded}"
    all_rows = []
    seen_rows = set()  # de-duplicates identical rows rendered more than once
    declared_total = None
    with sync_playwright() as p:
        launch_args = ["--disable-blink-features=AutomationControlled", "--no-sandbox", "--disable-dev-shm-usage"]
        try:
            # Prefer a locally installed Chrome; it is less likely to be
            # blocked than Playwright's bundled Chromium.
            browser = p.chromium.launch(headless=True, channel="chrome", args=launch_args)
        except Exception:
            # Fall back to Playwright's bundled Chromium (may be blocked by DIA).
            browser = p.chromium.launch(headless=True, args=launch_args)
        context = browser.new_context(
            user_agent=USER_AGENT,
            viewport={"width": 1920, "height": 1080},
            locale="es-ES",
        )
        # Reduce obvious automation fingerprint.
        context.add_init_script('Object.defineProperty(navigator, "webdriver", {get: () => undefined})')
        page = context.new_page()
        try:
            page.goto(url, wait_until="domcontentloaded", timeout=60000)
            page.wait_for_selector("body", timeout=30000)
            if _is_access_denied(page):
                raise RuntimeError("DIA ha bloqueado el acceso automatizado (Access Denied).")
            try:
                page.wait_for_selector(".search-product-card", timeout=20000)
            except PlaywrightTimeoutError:
                # No cards within 20s: possibly an empty result set; continue
                # and let extraction return zero rows.
                pass
            _scroll_to_load_all(page)
            html = page.content()
            if declared_total is None:
                # Banner "Resultados para ... (N)" gives the expected total.
                declared_total = _extract_declared_results_count(page.inner_text("body"))
            page_rows = _extract_products(html)
            for row in page_rows:
                if row not in seen_rows:
                    seen_rows.add(row)
                    all_rows.append(row)
        finally:
            # Always release the browser, even when scraping fails midway.
            context.close()
            browser.close()
    if declared_total is not None and len(all_rows) < declared_total:
        print(
            f"AVISO: DIA indica {declared_total} resultados, pero solo se pudieron extraer {len(all_rows)}.",
            file=sys.stderr,
        )
    return all_rows
def main():
    """CLI entry point: scrape DIA for the product given on the command line.

    Prints a semicolon-separated header plus one line per product to stdout.
    Exit codes: 1 = missing argument, 2 = blocked by DIA ("Access Denied"),
    3 = Playwright not installed.

    Fixes: the usage error is now written to stderr (it previously went to
    stdout, polluting the parseable output), and rows are joined with a bare
    ";" — the previous ``print(a, ";", b, ...)`` padded every separator with
    spaces, contradicting the documented semicolon-separated format.
    """
    if len(sys.argv) < 2:
        # Usage errors belong on stderr so stdout stays machine-parseable.
        print("Debe indicar un producto", file=sys.stderr)
        sys.exit(1)
    _set_es_locale()
    query = " ".join(sys.argv[1:]).strip()
    try:
        rows = scrape_all_pages(query)
    except RuntimeError as exc:
        print(str(exc), file=sys.stderr)
        sys.exit(2)
    except ModuleNotFoundError:
        print(
            "Falta dependencia: instala Playwright con 'pip install playwright' y luego 'playwright install chromium'.",
            file=sys.stderr,
        )
        sys.exit(3)
    header = (
        "Producto",
        "Precio unitario",
        "Moneda",
        "Precio UME",
        "Moneda",
        "Cantidad",
        "UME",
    )
    print(";".join(header))
    for row in rows:
        # Every field in the row tuple is already a string.
        print(";".join(row))
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment