10f2253a6a
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
128 lines
4.0 KiB
Python
128 lines
4.0 KiB
Python
"""
Dukascopy public CDN fetcher — no account required.
Downloads bi5 tick files, aggregates to OHLC at the requested timeframe.

Usage:
    from shared.fetcher import fetch
    from datetime import datetime

    df = fetch("EURUSD", datetime(2024, 1, 1), datetime(2025, 1, 1), "M15")
"""
|
||
import io
|
||
import lzma
|
||
import struct
|
||
import urllib.request
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
import pandas as pd
|
||
|
||
TIMEFRAME_RESAMPLE = {
|
||
"M1": "1min", "M5": "5min", "M10": "10min", "M15": "15min", "M30": "30min",
|
||
"H1": "1h", "H4": "4h", "D": "1D", "D1": "1D",
|
||
}
|
||
|
||
_POINT_DIVISOR = {"JPY": 1000}
|
||
_DEFAULT_DIVISOR = 100_000
|
||
|
||
_TICK_FMT = ">IIIff" # ms_offset, ask, bid, ask_vol, bid_vol
|
||
_TICK_SIZE = struct.calcsize(_TICK_FMT)
|
||
|
||
|
||
def _divisor(symbol: str) -> int:
    """Return the price divisor for *symbol*.

    The last three characters of the symbol are looked up in
    ``_POINT_DIVISOR``; symbols without an entry use ``_DEFAULT_DIVISOR``.
    """
    quote_code = symbol[-3:]
    if quote_code in _POINT_DIVISOR:
        return _POINT_DIVISOR[quote_code]
    return _DEFAULT_DIVISOR
|
||
|
||
|
||
def _cdn_url(symbol: str, dt: datetime) -> str:
|
||
return (
|
||
f"https://datafeed.dukascopy.com/datafeed/{symbol}/"
|
||
f"{dt.year}/{dt.month - 1:02d}/{dt.day:02d}/{dt.hour:02d}h_ticks.bi5"
|
||
)
|
||
|
||
|
||
def _download_hour(symbol: str, dt: datetime) -> list[tuple[int, float, float]]:
    """Fetch and decode the tick file covering the hour that starts at *dt*.

    Returns a list of ``(unix_ms, ask, bid)`` tuples. This is deliberately
    best-effort: an empty response, a failed download, a failed
    decompression or a decode error all yield an empty list instead of
    raising.
    """
    url = _cdn_url(symbol, dt)
    try:
        with urllib.request.urlopen(url, timeout=15) as response:
            payload = response.read()
        if not payload:
            return []

        decoded = lzma.decompress(payload, format=lzma.FORMAT_AUTO)
        scale = _divisor(symbol)
        # *dt* is interpreted as UTC wall-clock time; each record stores only
        # a millisecond offset from the start of its hour.
        base_ms = int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)
        # Only whole records are decoded; trailing bytes that do not fill a
        # complete record are ignored.
        usable_bytes = (len(decoded) // _TICK_SIZE) * _TICK_SIZE
        records = (
            struct.unpack_from(_TICK_FMT, decoded, offset)
            for offset in range(0, usable_bytes, _TICK_SIZE)
        )
        # Record fields: ms offset, raw ask, raw bid, then two volumes that
        # are not used here.
        return [
            (base_ms + rec[0], rec[1] / scale, rec[2] / scale)
            for rec in records
        ]
    except Exception:
        return []
|
||
|
||
|
||
def _trading_hours(start: datetime, end: datetime) -> list[datetime]:
|
||
"""All hourly UTC timestamps Mon–Fri in the date range."""
|
||
hours = []
|
||
dt = start.replace(minute=0, second=0, microsecond=0)
|
||
while dt < end:
|
||
if dt.weekday() < 5:
|
||
hours.append(dt)
|
||
dt += timedelta(hours=1)
|
||
return hours
|
||
|
||
|
||
def fetch(
    symbol: str,
    start: datetime,
    end: datetime,
    timeframe: str,
    max_workers: int = 24,
    progress_cb=None,
) -> pd.DataFrame:
    """
    Download tick data from Dukascopy and resample to OHLC candles.

    Hourly tick files are downloaded in parallel, each tick is reduced to the
    bid/ask mid price, ticks outside ``[start, end)`` are discarded, and the
    remaining mid prices are resampled to the requested timeframe.

    Args:
        symbol: e.g. "EURUSD"
        start/end: naive UTC datetimes; start is inclusive, end is exclusive
        timeframe: a key of TIMEFRAME_RESAMPLE
            (M1, M5, M10, M15, M30, H1, H4, D, D1)
        max_workers: parallel HTTP threads
        progress_cb: optional callable(completed, total) for progress reporting

    Returns:
        DataFrame with columns: time, open, high, low, close, tick_volume

    Raises:
        ValueError: if timeframe is not a key of TIMEFRAME_RESAMPLE
        RuntimeError: if no tick inside the requested range was obtained
    """
    if timeframe not in TIMEFRAME_RESAMPLE:
        raise ValueError(f"Unknown timeframe {timeframe!r}. Choose from {list(TIMEFRAME_RESAMPLE)}")

    hours = _trading_hours(start, end)
    all_ticks: list[tuple[int, float, float]] = []
    completed = 0

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(_download_hour, symbol, h): h for h in hours}
        # Results are consumed in completion order; chronological order is
        # restored by the sort below.
        for fut in as_completed(futures):
            all_ticks.extend(fut.result())
            completed += 1
            if progress_cb:
                progress_cb(completed, len(hours))

    if not all_ticks:
        raise RuntimeError(f"No tick data returned for {symbol} ({start} – {end})")

    all_ticks.sort(key=lambda t: t[0])
    ts, asks, bids = zip(*all_ticks)
    mids = [(a + b) / 2 for a, b in zip(asks, bids)]

    idx = pd.to_datetime(ts, unit="ms", utc=True).tz_localize(None)
    s = pd.Series(mids, index=idx, name="mid")

    # Tick files cover whole hours and the first hour is floored, so a start
    # or end that is not hour-aligned would otherwise leak ticks from outside
    # the requested range into the candles. Any tzinfo is dropped so the
    # bounds compare against the naive index (wall-clock treated as UTC, as
    # in the rest of this module).
    lower = start.replace(tzinfo=None)
    upper = end.replace(tzinfo=None)
    s = s[(idx >= lower) & (idx < upper)]
    if s.empty:
        raise RuntimeError(f"No tick data returned for {symbol} ({start} – {end})")

    freq = TIMEFRAME_RESAMPLE[timeframe]
    bars = s.resample(freq)
    # dropna removes the all-NaN rows produced for bins without ticks
    # (e.g. weekends); the join below keeps only the surviving bins.
    ohlc = bars.ohlc().dropna()
    vol = bars.count().rename("tick_volume")

    df = ohlc.join(vol).reset_index()
    df.columns = ["time", "open", "high", "low", "close", "tick_volume"]
    return df
|