Files
Jonathan 486749a890 Initial project scaffold
Full-stack Dutch supermarket price tracker with FastAPI backend,
PostgreSQL/SQLAlchemy, Albert Heijn scraper, and Next.js frontend.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-04 22:27:24 +02:00

139 lines
4.5 KiB
Python

from datetime import datetime
import httpx
from sqlalchemy import and_, select
from sqlalchemy.orm import Session
from ..models import PriceSnapshot, Product, ScrapeRun, Store
# Endpoint that _get_token POSTs the client id to in order to obtain an access token.
AH_AUTH_URL = "https://api.ah.nl/mobile-auth/v1/auth/token/anonymous"
# Product search endpoint queried by _search with a bearer token.
AH_SEARCH_URL = "https://api.ah.nl/mobile-services/product/search/v2"
# Prefix prepended to the product "link" values from search results to build product URLs.
AH_BASE_URL = "https://www.ah.nl"
# Value sent as "clientId" in the token request body.
AH_CLIENT_ID = "appie"
def _get_token(client: httpx.Client) -> str:
    """Fetch an access token from the AH auth endpoint.

    Args:
        client: HTTP client used to send the POST request.

    Returns:
        The value stored under ``"access_token"`` in the JSON response.

    Raises:
        Whatever ``raise_for_status()`` raises for an error response, and
        ``KeyError`` if the decoded body has no ``"access_token"`` key.
    """
    payload = {"clientId": AH_CLIENT_ID}
    response = client.post(AH_AUTH_URL, json=payload)
    # Fail loudly on an HTTP error status before touching the body.
    response.raise_for_status()
    body = response.json()
    return body["access_token"]
def _search(client: httpx.Client, token: str, query: str, page: int = 0, size: int = 30) -> dict:
    """Run one product search request against the AH search endpoint.

    Args:
        client: HTTP client used to send the GET request.
        token: Access token sent as a ``Bearer`` authorization header.
        query: Search term.
        page: Page index passed through as the ``page`` query parameter.
        size: Page size passed through as the ``size`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``raise_for_status()`` raises for an error response.
    """
    search_params = {"query": query, "page": page, "size": size}
    auth_headers = {"Authorization": f"Bearer {token}"}
    response = client.get(AH_SEARCH_URL, params=search_params, headers=auth_headers)
    # Surface HTTP error statuses instead of parsing an error body.
    response.raise_for_status()
    return response.json()
def _to_cents(value: float | int | None) -> int | None:
if value is None:
return None
return round(float(value) * 100)
def _upsert_store(db: Session) -> Store:
    """Return the Albert Heijn ``Store`` row, creating it if it is missing.

    Args:
        db: Session used for the lookup and, when needed, the insert.

    Returns:
        The store whose slug is ``"albert-heijn"``. A newly created store is
        committed and refreshed before being returned.
    """
    lookup = select(Store).where(Store.slug == "albert-heijn")
    existing = db.scalar(lookup)
    if existing:
        return existing

    created = Store(
        name="Albert Heijn",
        slug="albert-heijn",
        country="NL",
        website="https://www.ah.nl",
    )
    db.add(created)
    db.commit()
    # Reload the row after the commit so the returned object is populated.
    db.refresh(created)
    return created
def scrape_query(db: Session, query: str) -> ScrapeRun:
    """Scrape Albert Heijn search results for *query* and persist them.

    A ``ScrapeRun`` row is committed up front so that a failure can still be
    recorded against it. One search request is then made (using the default
    page and size of ``_search``); every product in the response is created
    or updated, and a ``PriceSnapshot`` is stored for each product that
    reports a current price.

    Args:
        db: Session used for all reads and writes. This function commits on
            it and rolls it back on failure.
        query: Search term sent to the product search endpoint.

    Returns:
        The ``ScrapeRun`` with status ``"success"``, ``products_found`` and
        ``finished_at`` set.

    Raises:
        Exception: Any error raised while fetching or storing results is
            re-raised after the run has been marked ``"failed"`` with a
            truncated error message.
    """
    store = _upsert_store(db)

    run = ScrapeRun(store_id=store.id, query=query, started_at=datetime.utcnow())
    db.add(run)
    db.commit()
    db.refresh(run)

    try:
        with httpx.Client(timeout=30.0) as client:
            token = _get_token(client)
            data = _search(client, token, query)

        count = 0
        # One timestamp for the whole run so all rows written here agree.
        now = datetime.utcnow()

        # `or []` instead of a .get() default: a default only covers a missing
        # key, while an explicit JSON null would otherwise break the loop.
        for card in data.get("cards") or []:
            for raw in card.get("products") or []:
                raw_id = raw.get("webshopId")
                # Check for None before str(): str(None) is the truthy string
                # "None", which would be stored as a bogus external id.
                external_id = "" if raw_id is None else str(raw_id).strip()
                if not external_id:
                    continue

                product = db.scalar(
                    select(Product).where(
                        and_(
                            Product.store_id == store.id,
                            Product.external_id == external_id,
                        )
                    )
                )
                if not product:
                    product = Product(store_id=store.id, external_id=external_id)
                    db.add(product)

                # Refresh descriptive fields on both new and existing products.
                link = raw.get("link", "") or ""
                product.name = raw.get("title", "") or ""
                product.brand = raw.get("brand") or None
                product.category = raw.get("category") or None
                product.ean = raw.get("eanCode") or None
                product.url = f"{AH_BASE_URL}{link}" if link else None
                product.updated_at = now

                price_info = raw.get("price") or {}
                price_cents = _to_cents(price_info.get("now"))
                was_cents = _to_cents(price_info.get("was"))

                unit_info = price_info.get("unitInfo") or {}
                unit_price_cents = _to_cents(unit_info.get("price"))
                unit_description = unit_info.get("description") or None

                discount = raw.get("discount") or {}
                discount_label = discount.get("label") or None
                discount_description = discount.get("description") or None

                db.flush()  # get product.id if newly created

                # Without a current price there is nothing to snapshot.
                if price_cents is not None:
                    snapshot = PriceSnapshot(
                        product_id=product.id,
                        scrape_run_id=run.id,
                        price=price_cents,
                        unit_price=unit_price_cents,
                        unit_description=unit_description,
                        was_price=was_cents,
                        is_on_sale=was_cents is not None or discount_label is not None,
                        discount_label=discount_label,
                        discount_description=discount_description,
                        timestamp=now,
                    )
                    db.add(snapshot)

                # NOTE(review): counts every product processed, priced or not;
                # confirm this matches the intended meaning of products_found.
                count += 1

        db.commit()

        run.status = "success"
        run.products_found = count
        run.finished_at = datetime.utcnow()
        db.commit()
    except Exception as exc:
        # Discard partial product/snapshot writes, then record the failure on
        # the run row (committed earlier, so it survives the rollback).
        db.rollback()
        run.status = "failed"
        run.error_message = str(exc)[:900]
        run.finished_at = datetime.utcnow()
        db.commit()
        raise

    return run