""" Dukascopy public CDN fetcher — no account required. Downloads bi5 tick files, aggregates to OHLC at the requested timeframe. Usage: from shared.fetcher import fetch from datetime import datetime df = fetch("EURUSD", datetime(2024, 1, 1), datetime(2025, 1, 1), "M15") """ import io import lzma import struct import urllib.request from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timedelta, timezone import pandas as pd TIMEFRAME_RESAMPLE = { "M1": "1min", "M5": "5min", "M10": "10min", "M15": "15min", "M30": "30min", "H1": "1h", "H4": "4h", "D": "1D", "D1": "1D", } _POINT_DIVISOR = {"JPY": 1000} _DEFAULT_DIVISOR = 100_000 _TICK_FMT = ">IIIff" # ms_offset, ask, bid, ask_vol, bid_vol _TICK_SIZE = struct.calcsize(_TICK_FMT) def _divisor(symbol: str) -> int: return _POINT_DIVISOR.get(symbol[-3:], _DEFAULT_DIVISOR) def _cdn_url(symbol: str, dt: datetime) -> str: return ( f"https://datafeed.dukascopy.com/datafeed/{symbol}/" f"{dt.year}/{dt.month - 1:02d}/{dt.day:02d}/{dt.hour:02d}h_ticks.bi5" ) def _download_hour(symbol: str, dt: datetime) -> list[tuple[int, float, float]]: """Return list of (unix_ms, ask, bid) for one hour. Empty on any failure.""" url = _cdn_url(symbol, dt) try: with urllib.request.urlopen(url, timeout=15) as resp: raw = resp.read() if not raw: return [] data = lzma.decompress(raw, format=lzma.FORMAT_AUTO) divisor = _divisor(symbol) hour_ms = int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000) n = len(data) // _TICK_SIZE ticks = [] for i in range(n): ms_off, ask_raw, bid_raw, _, _ = struct.unpack_from(_TICK_FMT, data, i * _TICK_SIZE) ticks.append((hour_ms + ms_off, ask_raw / divisor, bid_raw / divisor)) return ticks except Exception: return [] def _trading_hours(start: datetime, end: datetime) -> list[datetime]: """All hourly UTC timestamps Mon–Fri in the date range.""" hours = [] dt = start.replace(minute=0, second=0, microsecond=0) while dt < end: if dt.weekday() < 5: hours.append(dt) dt += timedelta(hours=1) return hours def fetch( symbol: str, start: datetime, end: datetime, timeframe: str, max_workers: int = 24, progress_cb=None, ) -> pd.DataFrame: """ Download tick data from Dukascopy and resample to OHLC candles. Args: symbol: e.g. "EURUSD" start/end: naive UTC datetimes timeframe: one of M1, M5, M15, M30, H1, H4, D1 max_workers: parallel HTTP threads progress_cb: optional callable(completed, total) for progress reporting Returns: DataFrame with columns: time, open, high, low, close, tick_volume """ if timeframe not in TIMEFRAME_RESAMPLE: raise ValueError(f"Unknown timeframe {timeframe!r}. Choose from {list(TIMEFRAME_RESAMPLE)}") hours = _trading_hours(start, end) all_ticks: list[tuple[int, float, float]] = [] completed = 0 with ThreadPoolExecutor(max_workers=max_workers) as pool: futures = {pool.submit(_download_hour, symbol, h): h for h in hours} for fut in as_completed(futures): all_ticks.extend(fut.result()) completed += 1 if progress_cb: progress_cb(completed, len(hours)) if not all_ticks: raise RuntimeError(f"No tick data returned for {symbol} ({start} – {end})") all_ticks.sort(key=lambda t: t[0]) ts, asks, bids = zip(*all_ticks) mids = [(a + b) / 2 for a, b in zip(asks, bids)] idx = pd.to_datetime(ts, unit="ms", utc=True).tz_localize(None) s = pd.Series(mids, index=idx, name="mid") freq = TIMEFRAME_RESAMPLE[timeframe] ohlc = s.resample(freq).ohlc().dropna() vol = s.resample(freq).count().rename("tick_volume") df = ohlc.join(vol).reset_index() df.columns = ["time", "open", "high", "low", "close", "tick_volume"] return df