Files
2026-04-28 21:09:13 +02:00

128 lines
4.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Dukascopy public CDN fetcher — no account required.
Downloads bi5 tick files, aggregates to OHLC at the requested timeframe.
Usage:
    from shared.fetcher import fetch
    from datetime import datetime
    df = fetch("EURUSD", datetime(2024, 1, 1), datetime(2025, 1, 1), "M15")
"""
import io
import logging
import lzma
import struct
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone

import pandas as pd
# Timeframe codes accepted by fetch(), mapped to the pandas resample frequency
# string used to build the candles. "D" and "D1" both select daily candles.
TIMEFRAME_RESAMPLE = {
    "M1": "1min",
    "M5": "5min",
    "M10": "10min",
    "M15": "15min",
    "M30": "30min",
    "H1": "1h",
    "H4": "4h",
    "D": "1D",
    "D1": "1D",
}

# Raw integer prices in the tick files are divided by one of these values to
# obtain decimal prices. The table is keyed by the last three characters of
# the symbol; symbols without an entry use the default.
_POINT_DIVISOR = {"JPY": 1000}
_DEFAULT_DIVISOR = 100_000

# One tick record, big-endian: three unsigned 32-bit integers
# (ms_offset, ask, bid) followed by two 32-bit floats (ask_vol, bid_vol).
_TICK_FMT = ">IIIff"
_TICK_SIZE = struct.calcsize(_TICK_FMT)
def _divisor(symbol: str) -> int:
    """Return the value raw integer prices of *symbol* are divided by.

    The last three characters of the symbol select an entry in
    ``_POINT_DIVISOR``; symbols without an entry use ``_DEFAULT_DIVISOR``.
    """
    suffix = symbol[-3:]
    if suffix in _POINT_DIVISOR:
        return _POINT_DIVISOR[suffix]
    return _DEFAULT_DIVISOR
def _cdn_url(symbol: str, dt: datetime) -> str:
    """Build the URL of the hourly ``.bi5`` tick file for *symbol* at *dt*.

    Only the year, month, day and hour of *dt* are used.
    """
    # The month path component is written zero-based (January -> "00"),
    # while the day and hour components are used as-is.
    month_index = dt.month - 1
    path = "/".join(
        [
            symbol,
            str(dt.year),
            f"{month_index:02d}",
            f"{dt.day:02d}",
            f"{dt.hour:02d}h_ticks.bi5",
        ]
    )
    return "https://datafeed.dukascopy.com/datafeed/" + path
def _download_hour(symbol: str, dt: datetime) -> list[tuple[int, float, float]]:
    """Download and decode one hour of ticks.

    Args:
        symbol: instrument name as it appears in the CDN path, e.g. "EURUSD".
        dt: start of the hour. Its wall-clock fields are interpreted as UTC.

    Returns:
        A list of ``(unix_ms, ask, bid)`` tuples in file order. The list is
        empty when the response body is empty or when downloading or decoding
        fails. Failures are logged as warnings rather than raised, so that one
        bad hour cannot abort a bulk download.
    """
    url = _cdn_url(symbol, dt)
    try:
        with urllib.request.urlopen(url, timeout=15) as resp:
            raw = resp.read()
        if not raw:
            # An empty body means there are no ticks for this hour.
            return []
        data = lzma.decompress(raw, format=lzma.FORMAT_AUTO)
        divisor = _divisor(symbol)
        # replace() overrides whatever tzinfo dt carries, so dt is always
        # read as UTC when converting to epoch milliseconds.
        hour_ms = int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)
        # iter_unpack requires a whole number of records; ignore any
        # trailing partial record instead of failing on it.
        usable = len(data) - len(data) % _TICK_SIZE
        records = struct.iter_unpack(_TICK_FMT, memoryview(data)[:usable])
        return [
            (hour_ms + ms_off, ask_raw / divisor, bid_raw / divisor)
            for ms_off, ask_raw, bid_raw, _ask_vol, _bid_vol in records
        ]
    except Exception as exc:
        # Deliberately best-effort: the hour is skipped, but the failure is
        # reported so that gaps in the resulting candles can be explained.
        logging.getLogger(__name__).warning(
            "Skipping %s ticks for hour %s (%s): %r", symbol, dt, url, exc
        )
        return []
def _trading_hours(start: datetime, end: datetime) -> list[datetime]:
    """List every whole hour from *start* up to (not including) *end*, Mon-Fri.

    *start* is rounded down to the top of its hour. Hours whose date falls on
    a Saturday or Sunday (by the datetime's own calendar fields) are left out.
    """
    step = timedelta(hours=1)
    cursor = start.replace(minute=0, second=0, microsecond=0)
    selected = []
    while cursor < end:
        # weekday(): Monday is 0 ... Sunday is 6.
        if cursor.weekday() not in (5, 6):
            selected.append(cursor)
        cursor = cursor + step
    return selected
def _ticks_to_ohlc(ticks: list[tuple[int, float, float]], freq: str) -> pd.DataFrame:
    """Aggregate ``(unix_ms, ask, bid)`` ticks into mid-price OHLC candles.

    *ticks* must be non-empty; it is sorted in place by timestamp to avoid
    copying a potentially large list. *freq* is a pandas resample frequency.
    """
    ticks.sort(key=lambda t: t[0])
    ts, asks, bids = zip(*ticks)
    mids = [(a + b) / 2 for a, b in zip(asks, bids)]
    # Timestamps are converted as UTC and then made naive, matching the
    # naive-UTC datetimes callers pass in.
    idx = pd.to_datetime(ts, unit="ms", utc=True).tz_localize(None)
    mid = pd.Series(mids, index=idx, name="mid")
    bins = mid.resample(freq)
    # Bins with no ticks have NaN OHLC values; dropping them and left-joining
    # the counts keeps only candles that contain at least one tick.
    ohlc = bins.ohlc().dropna()
    vol = bins.count().rename("tick_volume")
    df = ohlc.join(vol).reset_index()
    df.columns = ["time", "open", "high", "low", "close", "tick_volume"]
    return df


def fetch(
    symbol: str,
    start: datetime,
    end: datetime,
    timeframe: str,
    max_workers: int = 24,
    progress_cb=None,
) -> pd.DataFrame:
    """
    Download tick data from Dukascopy and resample to OHLC candles.

    Data is downloaded in whole hours: ``start`` is rounded down to the hour
    and the hour containing ``end`` is downloaded in full, so boundary candles
    may include ticks slightly outside ``[start, end)``. Saturdays and Sundays
    are skipped. Hours that fail to download contribute no ticks, so the
    result can contain gaps.

    Args:
        symbol: e.g. "EURUSD"
        start/end: naive UTC datetimes
        timeframe: a key of TIMEFRAME_RESAMPLE
            (M1, M5, M10, M15, M30, H1, H4, D, D1)
        max_workers: parallel HTTP threads
        progress_cb: optional callable(completed, total), called after each
            hour finishes downloading

    Returns:
        DataFrame with columns: time, open, high, low, close, tick_volume.
        Prices are mid prices, (ask + bid) / 2; tick_volume is the number of
        ticks in the candle; time is a naive timestamp.

    Raises:
        ValueError: if ``timeframe`` is not a key of TIMEFRAME_RESAMPLE.
        RuntimeError: if no ticks at all were obtained for the range.
    """
    if timeframe not in TIMEFRAME_RESAMPLE:
        raise ValueError(f"Unknown timeframe {timeframe!r}. Choose from {list(TIMEFRAME_RESAMPLE)}")

    hours = _trading_hours(start, end)
    total = len(hours)
    all_ticks: list[tuple[int, float, float]] = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(_download_hour, symbol, h) for h in hours]
        # Hours complete in arbitrary order; the ticks are sorted afterwards.
        for completed, fut in enumerate(as_completed(futures), start=1):
            all_ticks.extend(fut.result())
            if progress_cb:
                progress_cb(completed, total)

    if not all_ticks:
        raise RuntimeError(f"No tick data returned for {symbol} ({start} {end})")

    return _ticks_to_ohlc(all_ticks, TIMEFRAME_RESAMPLE[timeframe])