Initial project scaffold

Full-stack Dutch supermarket price tracker with FastAPI backend,
PostgreSQL/SQLAlchemy, Albert Heijn scraper, and Next.js frontend.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-04 22:27:24 +02:00
commit 486749a890
40 changed files with 1596 additions and 0 deletions
View File
+11
View File
@@ -0,0 +1,11 @@
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
    """Application settings; every field has a hard-coded default value."""

    # Database URL; the database module passes this to create_engine().
    database_url: str = "postgresql://postgres:postgres@db:5432/food_prices"
    # NOTE(review): presumably the client id for the Albert Heijn API. The
    # scraper module defines its own AH_CLIENT_ID constant with the same
    # value — confirm which of the two is meant to be authoritative.
    ah_client_id: str = "appie"

    # Tell pydantic-settings to also read values from a ".env" file.
    model_config = {"env_file": ".env"}


# Module-level settings instance, created once at import time.
settings = Settings()
+19
View File
@@ -0,0 +1,19 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker
from .config import settings
# Engine built from the configured database URL; created once at import time.
engine = create_engine(settings.database_url)
# Session factory bound to that engine. Autoflush is disabled, so code that
# needs pending rows written before a query must call flush() itself.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class Base(DeclarativeBase):
    """Declarative base class that the project's ORM models inherit from."""
def get_db():
    """Yield a fresh database session and close it when the consumer is done.

    The routers use this generator as a FastAPI dependency.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs whether the consumer finished normally or raised.
        session.close()
+26
View File
@@ -0,0 +1,26 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .database import Base, engine
from .routers import prices, products, scrape_runs, stores
# Create the tables of all models registered on Base; runs at import time.
Base.metadata.create_all(bind=engine)

app = FastAPI(title="Dutch Food Price Tracker", version="0.1.0")

# CORS is fully open: every origin, method and header is allowed.
# NOTE(review): presumably a development convenience — confirm before
# exposing the API publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount the API routers.
app.include_router(products.router)
app.include_router(stores.router)
app.include_router(prices.router)
app.include_router(scrape_runs.router)
@app.get("/")
def root():
    """Return a static payload identifying the service."""
    payload = {"status": "ok", "service": "dutch-food-price-tracker"}
    return payload
+83
View File
@@ -0,0 +1,83 @@
from datetime import datetime
import sqlalchemy as sa
from sqlalchemy.orm import Mapped, mapped_column, relationship
from .database import Base
class Store(Base):
    """ORM model for the "stores" table."""

    __tablename__ = "stores"

    id: Mapped[int] = mapped_column(sa.Integer, primary_key=True)
    name: Mapped[str] = mapped_column(sa.String(100), nullable=False)
    # Unique short identifier; the scraper looks its store up by this value.
    slug: Mapped[str] = mapped_column(sa.String(50), unique=True, nullable=False)
    # Two-character code; defaults to "NL".
    country: Mapped[str] = mapped_column(sa.String(2), default="NL")
    website: Mapped[str | None] = mapped_column(sa.String(255))

    # Reverse sides of Product.store and ScrapeRun.store.
    products: Mapped[list["Product"]] = relationship(back_populates="store")
    scrape_runs: Mapped[list["ScrapeRun"]] = relationship(back_populates="store")
class Product(Base):
    """ORM model for the "products" table: one product as listed by one store."""

    __tablename__ = "products"
    __table_args__ = (
        # A store-issued product id may occur only once per store.
        sa.UniqueConstraint("store_id", "external_id", name="uq_products_store_external"),
    )

    id: Mapped[int] = mapped_column(sa.Integer, primary_key=True)
    store_id: Mapped[int] = mapped_column(sa.Integer, sa.ForeignKey("stores.id"), nullable=False)
    # Identifier issued by the store; the scraper fills it from the
    # "webshopId" field of the search response.
    external_id: Mapped[str] = mapped_column(sa.String(50), nullable=False)
    ean: Mapped[str | None] = mapped_column(sa.String(20), index=True)
    name: Mapped[str] = mapped_column(sa.String(255), nullable=False)
    brand: Mapped[str | None] = mapped_column(sa.String(100))
    category: Mapped[str | None] = mapped_column(sa.String(100))
    unit_size: Mapped[str | None] = mapped_column(sa.String(50))
    url: Mapped[str | None] = mapped_column(sa.String(500))
    # Both timestamps default to datetime.utcnow (naive UTC values);
    # updated_at is additionally refreshed on every UPDATE via onupdate.
    created_at: Mapped[datetime] = mapped_column(sa.DateTime, default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        sa.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
    )

    store: Mapped["Store"] = relationship(back_populates="products")
    price_snapshots: Mapped[list["PriceSnapshot"]] = relationship(back_populates="product")
class ScrapeRun(Base):
    """ORM model for the "scrape_runs" table: one scrape of one search query."""

    __tablename__ = "scrape_runs"

    id: Mapped[int] = mapped_column(sa.Integer, primary_key=True)
    store_id: Mapped[int] = mapped_column(sa.Integer, sa.ForeignKey("stores.id"), nullable=False)
    # The search term that was scraped.
    query: Mapped[str] = mapped_column(sa.String(255), nullable=False)
    started_at: Mapped[datetime] = mapped_column(sa.DateTime, default=datetime.utcnow)
    # Stays NULL until the scraper records a finish time.
    finished_at: Mapped[datetime | None] = mapped_column(sa.DateTime)
    # Defaults to "running"; the scraper later sets "success" or "failed".
    status: Mapped[str] = mapped_column(sa.String(20), default="running")
    products_found: Mapped[int] = mapped_column(sa.Integer, default=0)
    # The scraper stores a truncated exception text here for failed runs.
    error_message: Mapped[str | None] = mapped_column(sa.String(1000))

    store: Mapped["Store"] = relationship(back_populates="scrape_runs")
    price_snapshots: Mapped[list["PriceSnapshot"]] = relationship(back_populates="scrape_run")
class PriceSnapshot(Base):
    """ORM model for the "price_snapshots" table: one observed price of a product.

    All monetary columns hold integer euro cents.
    """

    __tablename__ = "price_snapshots"
    __table_args__ = (
        # Price history and "latest price" lookups filter on product_id and
        # order by timestamp. PostgreSQL does not index foreign keys on its
        # own, so without this index each of those lookups scans the table.
        sa.Index("ix_price_snapshots_product_id_timestamp", "product_id", "timestamp"),
    )

    id: Mapped[int] = mapped_column(sa.Integer, primary_key=True)
    product_id: Mapped[int] = mapped_column(
        sa.Integer, sa.ForeignKey("products.id"), nullable=False
    )
    scrape_run_id: Mapped[int] = mapped_column(
        sa.Integer, sa.ForeignKey("scrape_runs.id"), nullable=False
    )
    price: Mapped[int] = mapped_column(sa.Integer, nullable=False)  # euro cents
    unit_price: Mapped[int | None] = mapped_column(sa.Integer)  # euro cents
    unit_description: Mapped[str | None] = mapped_column(sa.String(50))
    currency: Mapped[str] = mapped_column(sa.String(3), default="EUR")
    discount_label: Mapped[str | None] = mapped_column(sa.String(100))
    discount_description: Mapped[str | None] = mapped_column(sa.String(255))
    was_price: Mapped[int | None] = mapped_column(sa.Integer)  # euro cents, original price
    is_on_sale: Mapped[bool] = mapped_column(sa.Boolean, default=False)
    # Naive UTC value (datetime.utcnow); indexed for the per-day price query.
    timestamp: Mapped[datetime] = mapped_column(sa.DateTime, default=datetime.utcnow, index=True)

    product: Mapped["Product"] = relationship(back_populates="price_snapshots")
    scrape_run: Mapped["ScrapeRun"] = relationship(back_populates="price_snapshots")
View File
+57
View File
@@ -0,0 +1,57 @@
from datetime import date, datetime
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select
from sqlalchemy.orm import Session, selectinload
from ..database import get_db
from ..models import PriceSnapshot, Product
from ..schemas import CheapestProduct, Product as ProductSchema
# Router for the price endpoints; all paths are served under /api/prices.
router = APIRouter(prefix="/api/prices", tags=["prices"])
@router.get("/cheapest", response_model=list[CheapestProduct])
def get_cheapest(
    date_filter: date | None = Query(default=None, alias="date"),
    limit: int = Query(default=20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    """Return the cheapest price snapshot of each product for one calendar day.

    Each product appears at most once, represented by its lowest-priced
    snapshot of the day (the most recent one when several share that price).
    Results are ordered by ascending price.

    Args:
        date_filter: Day to inspect (query parameter ``date``); defaults to
            the current UTC date.
        limit: Maximum number of products to return (1-100).
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A list of ``CheapestProduct`` entries, cheapest first.
    """
    from datetime import timedelta, timezone

    # Snapshot timestamps are stored as naive UTC values, so "today" has to be
    # the UTC date rather than the server's local date.
    target = date_filter if date_filter is not None else datetime.now(timezone.utc).date()

    # Half-open interval [day_start, day_end): unlike a 23:59:59 upper bound
    # it also covers the final second's microseconds.
    day_start = datetime(target.year, target.month, target.day)
    day_end = day_start + timedelta(days=1)

    # Rank the day's snapshots per product, cheapest first and newest first
    # among equal prices. Keeping only rank 1 yields exactly one row per
    # product and can never pick up a snapshot from another day.
    ranked = (
        select(
            PriceSnapshot.id.label("snapshot_id"),
            func.row_number()
            .over(
                partition_by=PriceSnapshot.product_id,
                order_by=[PriceSnapshot.price.asc(), PriceSnapshot.timestamp.desc()],
            )
            .label("price_rank"),
        )
        .where(
            PriceSnapshot.timestamp >= day_start,
            PriceSnapshot.timestamp < day_end,
        )
        .subquery()
    )

    rows = db.execute(
        select(PriceSnapshot, Product)
        .join(ranked, PriceSnapshot.id == ranked.c.snapshot_id)
        .join(Product, PriceSnapshot.product_id == Product.id)
        .where(ranked.c.price_rank == 1)
        .options(selectinload(Product.store))
        # The id tie-breaker keeps the ordering stable across requests.
        .order_by(PriceSnapshot.price.asc(), PriceSnapshot.id.asc())
        .limit(limit)
    ).all()

    return [
        CheapestProduct(
            product=ProductSchema.model_validate(product),
            price=snapshot.price,
            unit_price=snapshot.unit_price,
            unit_description=snapshot.unit_description,
            is_on_sale=snapshot.is_on_sale,
            timestamp=snapshot.timestamp,
        )
        for snapshot, product in rows
    ]
+64
View File
@@ -0,0 +1,64 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload
from ..database import get_db
from ..models import PriceSnapshot, Product
from ..schemas import PriceSnapshot as PriceSnapshotSchema, ProductWithLatestPrice
# Router for the product endpoints; all paths are served under /api/products.
router = APIRouter(prefix="/api/products", tags=["products"])
def _attach_latest_price(product: Product, db: Session) -> ProductWithLatestPrice:
    """Build the API schema for *product*, filled in from its newest snapshot.

    When the product has no snapshots the price fields keep their schema
    defaults.
    """
    enriched = ProductWithLatestPrice.model_validate(product)

    newest_first = (
        select(PriceSnapshot)
        .where(PriceSnapshot.product_id == product.id)
        .order_by(PriceSnapshot.timestamp.desc())
        .limit(1)
    )
    newest = db.scalar(newest_first)
    if not newest:
        return enriched

    enriched.latest_price = newest.price
    enriched.latest_price_timestamp = newest.timestamp
    enriched.is_on_sale = newest.is_on_sale
    return enriched
@router.get("", response_model=list[ProductWithLatestPrice])
def search_products(
    search: str = Query(default=""),
    limit: int = Query(default=20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    """List products, optionally filtered by a case-insensitive name substring.

    Args:
        search: Text that must occur in the product name; empty means no filter.
        limit: Maximum number of products to return (1-100).
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        Products ordered by name, each with its latest price attached.
    """
    q = select(Product).options(selectinload(Product.store))
    if search:
        # Escape the LIKE metacharacters so the input is matched literally.
        # Unescaped, "%" and "_" act as wildcards, and a trailing backslash
        # (PostgreSQL's default escape character) makes the pattern invalid.
        escaped = search.replace("/", "//").replace("%", "/%").replace("_", "/_")
        q = q.where(Product.name.ilike(f"%{escaped}%", escape="/"))
    q = q.order_by(Product.name).limit(limit)
    products = db.scalars(q).all()
    # NOTE: _attach_latest_price issues one extra query per product.
    return [_attach_latest_price(p, db) for p in products]
@router.get("/{product_id}", response_model=ProductWithLatestPrice)
def get_product(product_id: int, db: Session = Depends(get_db)):
    """Return one product with its latest price, or respond 404 if it is unknown."""
    lookup = (
        select(Product)
        .options(selectinload(Product.store))
        .where(Product.id == product_id)
    )
    found = db.scalar(lookup)
    if found:
        return _attach_latest_price(found, db)
    raise HTTPException(status_code=404, detail="Product not found")
@router.get("/{product_id}/prices", response_model=list[PriceSnapshotSchema])
def get_product_prices(
    product_id: int,
    limit: int = Query(default=200, ge=1, le=1000),
    db: Session = Depends(get_db),
):
    """Return the price history of a product in chronological order.

    Args:
        product_id: Primary key of the product.
        limit: Maximum number of snapshots to return (1-1000).
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The most recent ``limit`` snapshots, oldest first. An unknown product
        id yields an empty list.
    """
    # Select newest-first so the limit drops the OLDEST snapshots. Limiting an
    # ascending query would return the first `limit` rows ever recorded and
    # hide all recent prices once the history outgrows the limit.
    newest_first = db.scalars(
        select(PriceSnapshot)
        .where(PriceSnapshot.product_id == product_id)
        .order_by(PriceSnapshot.timestamp.desc(), PriceSnapshot.id.desc())
        .limit(limit)
    ).all()
    # Callers still receive the rows in ascending time order.
    return list(reversed(newest_first))
+19
View File
@@ -0,0 +1,19 @@
from fastapi import APIRouter, Depends, Query
from sqlalchemy import select
from sqlalchemy.orm import Session
from ..database import get_db
from ..models import ScrapeRun
from ..schemas import ScrapeRun as ScrapeRunSchema
# Router for the scrape-run endpoints; all paths are served under /api/scrape-runs.
router = APIRouter(prefix="/api/scrape-runs", tags=["scrape-runs"])
@router.get("", response_model=list[ScrapeRunSchema])
def list_scrape_runs(
    # ge=1 rejects zero/negative limits at validation time instead of passing
    # them through to the database's LIMIT clause.
    limit: int = Query(default=20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    """Return the most recently started scrape runs, newest first.

    Args:
        limit: Maximum number of runs to return (1-100).
        db: Database session supplied by the ``get_db`` dependency.
    """
    return db.scalars(
        select(ScrapeRun).order_by(ScrapeRun.started_at.desc()).limit(limit)
    ).all()
+14
View File
@@ -0,0 +1,14 @@
from fastapi import APIRouter, Depends
from sqlalchemy import select
from sqlalchemy.orm import Session
from ..database import get_db
from ..models import Store
from ..schemas import Store as StoreSchema
# Router for the store endpoints; all paths are served under /api/stores.
router = APIRouter(prefix="/api/stores", tags=["stores"])
@router.get("", response_model=list[StoreSchema])
def list_stores(db: Session = Depends(get_db)):
    """Return every store, ordered by name."""
    by_name = select(Store).order_by(Store.name)
    result = db.scalars(by_name)
    return result.all()
+75
View File
@@ -0,0 +1,75 @@
from datetime import datetime
from pydantic import BaseModel
class Store(BaseModel):
    """API schema for a store."""

    id: int
    name: str
    slug: str
    country: str
    website: str | None = None

    # Allow validation from attribute-bearing objects such as ORM instances.
    model_config = {"from_attributes": True}
class Product(BaseModel):
    """API schema for a product, optionally embedding its store."""

    id: int
    store_id: int
    external_id: str
    ean: str | None = None
    name: str
    brand: str | None = None
    category: str | None = None
    unit_size: str | None = None
    url: str | None = None
    created_at: datetime
    updated_at: datetime
    # Nested store schema; None when no store is supplied.
    store: Store | None = None

    # Allow validation from attribute-bearing objects such as ORM instances.
    model_config = {"from_attributes": True}
class ProductWithLatestPrice(Product):
    """Product schema extended with fields describing its latest price.

    The extra fields keep their defaults until a caller fills them in.
    """

    latest_price: int | None = None
    latest_price_timestamp: datetime | None = None
    is_on_sale: bool = False
class PriceSnapshot(BaseModel):
    """API schema for a single recorded price of a product."""

    id: int
    product_id: int
    scrape_run_id: int
    # Monetary fields are integers; the ORM model stores them as euro cents.
    price: int
    unit_price: int | None = None
    unit_description: str | None = None
    currency: str
    discount_label: str | None = None
    discount_description: str | None = None
    was_price: int | None = None
    is_on_sale: bool
    timestamp: datetime

    # Allow validation from attribute-bearing objects such as ORM instances.
    model_config = {"from_attributes": True}
class ScrapeRun(BaseModel):
    """API schema for one scrape run."""

    id: int
    store_id: int
    query: str
    started_at: datetime
    finished_at: datetime | None = None
    status: str
    products_found: int
    error_message: str | None = None

    # Allow validation from attribute-bearing objects such as ORM instances.
    model_config = {"from_attributes": True}
class CheapestProduct(BaseModel):
    """API schema pairing a product with one of its price observations."""

    product: Product
    price: int
    unit_price: int | None = None
    unit_description: str | None = None
    is_on_sale: bool
    timestamp: datetime
View File
+138
View File
@@ -0,0 +1,138 @@
from datetime import datetime
import httpx
from sqlalchemy import and_, select
from sqlalchemy.orm import Session
from ..models import PriceSnapshot, Product, ScrapeRun, Store
# Endpoint that _get_token posts to in order to obtain an access token.
AH_AUTH_URL = "https://api.ah.nl/mobile-auth/v1/auth/token/anonymous"
# Product search endpoint queried by _search.
AH_SEARCH_URL = "https://api.ah.nl/mobile-services/product/search/v2"
# Prefix prepended to the "link" of a product to form its stored URL.
AH_BASE_URL = "https://www.ah.nl"
# Sent as "clientId" in the token request.
# NOTE(review): the Settings class has an `ah_client_id` field with the same
# value — confirm whether this constant should be read from there.
AH_CLIENT_ID = "appie"
def _get_token(client: httpx.Client) -> str:
    """Request an access token from the auth endpoint and return it.

    Raises:
        httpx.HTTPStatusError: if the endpoint answers with an error status.
        KeyError: if the response body has no "access_token" entry.
    """
    response = client.post(AH_AUTH_URL, json={"clientId": AH_CLIENT_ID})
    response.raise_for_status()
    payload = response.json()
    return payload["access_token"]
def _search(client: httpx.Client, token: str, query: str, page: int = 0, size: int = 30) -> dict:
    """Fetch one page of search results for *query* and return the decoded JSON.

    Raises:
        httpx.HTTPStatusError: if the endpoint answers with an error status.
    """
    search_params = {"query": query, "page": page, "size": size}
    auth_headers = {"Authorization": f"Bearer {token}"}
    response = client.get(AH_SEARCH_URL, params=search_params, headers=auth_headers)
    response.raise_for_status()
    return response.json()
def _to_cents(value: float | int | None) -> int | None:
if value is None:
return None
return round(float(value) * 100)
def _upsert_store(db: Session) -> Store:
    """Return the Albert Heijn store row, inserting it first if it is missing."""
    existing = db.scalar(select(Store).where(Store.slug == "albert-heijn"))
    if existing:
        return existing

    created = Store(
        name="Albert Heijn",
        slug="albert-heijn",
        country="NL",
        website="https://www.ah.nl",
    )
    db.add(created)
    db.commit()
    # Reload the row so database-generated values (the id) are populated.
    db.refresh(created)
    return created
def scrape_query(db: Session, query: str) -> ScrapeRun:
    """Run one Albert Heijn search and persist the products and prices found.

    A ScrapeRun row is committed before any network traffic so that a failed
    scrape still leaves a record. Every product in the response is inserted or
    updated, and a PriceSnapshot is stored for each product that has a
    current price.

    Args:
        db: Open database session; this function commits on it.
        query: Search term sent to the product search endpoint.

    Returns:
        The ScrapeRun with status "success"; ``products_found`` is the number
        of price snapshots stored.

    Raises:
        Exception: Any error raised while fetching or storing is re-raised
            after the run has been marked "failed" with the error text.
    """
    store = _upsert_store(db)

    run = ScrapeRun(store_id=store.id, query=query, started_at=datetime.utcnow())
    db.add(run)
    db.commit()
    db.refresh(run)

    try:
        # The HTTP client is only needed for the two requests.
        with httpx.Client(timeout=30.0) as client:
            token = _get_token(client)
            data = _search(client, token, query)

        count = 0
        # One shared timestamp for every row written by this run.
        now = datetime.utcnow()

        # "or []" also covers keys that are present but null in the payload.
        for card in data.get("cards") or []:
            for raw in card.get("products") or []:
                raw_id = raw.get("webshopId")
                # A null id must count as missing: str(None) would otherwise
                # create a product with the literal external id "None".
                external_id = "" if raw_id is None else str(raw_id).strip()
                if not external_id:
                    continue

                product = db.scalar(
                    select(Product).where(
                        and_(
                            Product.store_id == store.id,
                            Product.external_id == external_id,
                        )
                    )
                )
                if not product:
                    product = Product(store_id=store.id, external_id=external_id)
                    db.add(product)

                link = raw.get("link", "") or ""
                product.name = raw.get("title", "") or ""
                product.brand = raw.get("brand") or None
                product.category = raw.get("category") or None
                product.ean = raw.get("eanCode") or None
                product.url = f"{AH_BASE_URL}{link}" if link else None
                product.updated_at = now

                price_info = raw.get("price") or {}
                price_cents = _to_cents(price_info.get("now"))
                was_cents = _to_cents(price_info.get("was"))

                unit_info = price_info.get("unitInfo") or {}
                unit_price_cents = _to_cents(unit_info.get("price"))
                unit_description = unit_info.get("description") or None

                discount = raw.get("discount") or {}
                discount_label = discount.get("label") or None
                discount_description = discount.get("description") or None

                # The session does not autoflush. Flushing assigns product.id
                # for new rows and makes the product visible to the lookup
                # above if the same id shows up again in this response.
                db.flush()

                if price_cents is not None:
                    snapshot = PriceSnapshot(
                        product_id=product.id,
                        scrape_run_id=run.id,
                        price=price_cents,
                        unit_price=unit_price_cents,
                        unit_description=unit_description,
                        was_price=was_cents,
                        is_on_sale=was_cents is not None or discount_label is not None,
                        discount_label=discount_label,
                        discount_description=discount_description,
                        timestamp=now,
                    )
                    db.add(snapshot)
                    count += 1

        db.commit()

        run.status = "success"
        run.products_found = count
        run.finished_at = datetime.utcnow()
        db.commit()

    except Exception as exc:
        # Discard the partial product/snapshot writes, then record the failure
        # on the already-committed run row before propagating the error.
        db.rollback()
        run.status = "failed"
        # Truncate so the text fits the error_message column.
        run.error_message = str(exc)[:900]
        run.finished_at = datetime.utcnow()
        db.commit()
        raise

    return run