Initial commit: multi-symbol bot with backtest engine and RSI trend strategy

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-28 21:09:12 +02:00
commit ad8dfa27d7
32 changed files with 2644 additions and 0 deletions
View File
+169
View File
@@ -0,0 +1,169 @@
"""
Backtest engine. Strategy-agnostic — the Strategy object owns all signal
and position-sizing logic; this engine handles order simulation and P&L
accounting.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Protocol
import numpy as np
import pandas as pd
PIP_SIZE: dict[str, float] = {
"JPY": 0.01, # e.g. EURJPY
"default": 0.0001,
}
def pip_size(symbol: str) -> float:
return PIP_SIZE["JPY"] if symbol.endswith("JPY") else PIP_SIZE["default"]
# ── trade record ─────────────────────────────────────────────────────────────
@dataclass
class Trade:
direction: str # 'long' | 'short'
entry_time: datetime
entry_price: float
sl: float
tp: float
risk_pct: float # fraction of balance risked (e.g. 0.01)
exit_time: Optional[datetime] = None
exit_price: Optional[float] = None
exit_reason: str = ""
pnl_pips: float = 0.0
pnl_r: float = 0.0 # P&L in units of risk (R)
@property
def closed(self) -> bool:
return self.exit_time is not None
@property
def duration_hours(self) -> float:
if not self.closed:
return 0.0
return (self.exit_time - self.entry_time).total_seconds() / 3600 # type: ignore[operator]
# ── strategy protocol ─────────────────────────────────────────────────────────
class Strategy(Protocol):
"""Minimal interface every strategy must satisfy."""
def prepare(self, df: pd.DataFrame) -> pd.DataFrame:
"""Compute and attach all indicator columns needed by `signal()`."""
...
def signal(
self,
df: pd.DataFrame,
i: int,
) -> Optional[tuple[str, float, float]]:
"""
Called on each bar when no position is open.
Returns (direction, sl_price, tp_price) or None.
direction is 'long' or 'short'.
Prices are for the bar at index i+1 (next open).
"""
...
# ── engine ────────────────────────────────────────────────────────────────────
class BacktestEngine:
def __init__(
self,
symbol: str,
initial_balance: float = 10_000.0,
risk_per_trade: float = 0.01, # 1 % per trade
spread_pips: float = 0.5, # half-spread each side
):
self.symbol = symbol
self.initial_balance = initial_balance
self.risk_per_trade = risk_per_trade
self.spread = spread_pips * pip_size(symbol)
def run(self, df: pd.DataFrame, strategy: Strategy) -> tuple[list[Trade], np.ndarray]:
"""
Run the strategy over `df`.
Returns (trades, equity_curve) where equity_curve has one value per bar.
"""
df = strategy.prepare(df.copy())
trades: list[Trade] = []
equity: list[float] = [self.initial_balance]
balance = self.initial_balance
position: Optional[Trade] = None
for i in range(len(df) - 1):
row = df.iloc[i]
next_bar = df.iloc[i + 1]
# ── manage open position ─────────────────────────────────────────
if position is not None:
hi, lo = row["high"], row["low"]
sl, tp = position.sl, position.tp
ps = pip_size(self.symbol)
hit_sl = (position.direction == "long" and lo <= sl) or \
(position.direction == "short" and hi >= sl)
hit_tp = (position.direction == "long" and hi >= tp) or \
(position.direction == "short" and lo <= tp)
if hit_sl or hit_tp:
exit_px = sl if hit_sl else tp
reason = "SL" if hit_sl else "TP"
raw = exit_px - position.entry_price
pips = (raw if position.direction == "long" else -raw) / ps
sl_dist = abs(position.entry_price - sl) / ps
r_mult = pips / sl_dist if sl_dist > 0 else 0
pnl_pct = r_mult * position.risk_pct
position.exit_time = row.name
position.exit_price = exit_px
position.exit_reason = reason
position.pnl_pips = pips
position.pnl_r = r_mult
balance *= (1 + pnl_pct)
position = None
equity.append(balance)
continue
# ── check for entry ──────────────────────────────────────────────
result = strategy.signal(df, i)
if result is None:
equity.append(balance)
continue
direction, sl_px, tp_px = result
entry_px = next_bar["open"]
# apply spread (widen SL, narrow TP)
if direction == "long":
entry_px += self.spread
sl_px -= self.spread
tp_px -= self.spread
else:
entry_px -= self.spread
sl_px += self.spread
tp_px += self.spread
trade = Trade(
direction = direction,
entry_time = next_bar.name,
entry_price = entry_px,
sl = sl_px,
tp = tp_px,
risk_pct = self.risk_per_trade,
)
trades.append(trade)
position = trade
equity.append(balance)
return trades, np.array(equity)
+40
View File
@@ -0,0 +1,40 @@
"""
Data loading and resampling utilities.
"""
import sqlite3
from pathlib import Path
import pandas as pd
DB_PATH = Path("/home/jonathan/Projects/ForexBots/data/candles.db")
_RESAMPLE_MAP = {
"open": "first",
"high": "max",
"low": "min",
"close": "last",
"tick_volume": "sum",
}
def load_candles(symbol: str, timeframe: str, db_path: Path = DB_PATH) -> pd.DataFrame:
conn = sqlite3.connect(db_path)
df = pd.read_sql_query(
"SELECT time, open, high, low, close, tick_volume "
"FROM candles WHERE symbol=? AND timeframe=? ORDER BY time",
conn, params=(symbol, timeframe),
)
conn.close()
df["time"] = pd.to_datetime(df["time"])
df.set_index("time", inplace=True)
return df
def resample(df: pd.DataFrame, rule: str) -> pd.DataFrame:
"""Resample an OHLCV dataframe to a coarser timeframe.
rule follows pandas offset alias (e.g. '1h', '4h', '1D').
"""
agg = {c: _RESAMPLE_MAP[c] for c in df.columns if c in _RESAMPLE_MAP}
return df.resample(rule, label="left", closed="left").agg(agg).dropna()
+40
View File
@@ -0,0 +1,40 @@
"""
Pure-function technical indicators. All operate on pandas Series/DataFrame
and return pandas objects so NaN propagation is handled automatically.
"""
import numpy as np
import pandas as pd
def ema(series: pd.Series, period: int) -> pd.Series:
return series.ewm(span=period, min_periods=period, adjust=False).mean()
def rsi(close: pd.Series, period: int = 14) -> pd.Series:
delta = close.diff()
gain = delta.clip(lower=0)
loss = (-delta).clip(lower=0)
avg_g = gain.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
avg_l = loss.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
rs = avg_g / avg_l.replace(0, np.inf)
return 100 - (100 / (1 + rs))
def atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
hi, lo, pc = df["high"], df["low"], df["close"].shift(1)
tr = pd.concat([(hi - lo), (hi - pc).abs(), (lo - pc).abs()], axis=1).max(axis=1)
return tr.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
def adx(df: pd.DataFrame, period: int = 14) -> pd.Series:
"""Average Directional Index."""
hi, lo, pc = df["high"], df["low"], df["close"].shift(1)
tr = pd.concat([(hi - lo), (hi - pc).abs(), (lo - pc).abs()], axis=1).max(axis=1)
dm_pos = (hi - hi.shift(1)).clip(lower=0).where((hi - hi.shift(1)) > (lo.shift(1) - lo), 0)
dm_neg = (lo.shift(1) - lo).clip(lower=0).where((lo.shift(1) - lo) > (hi - hi.shift(1)), 0)
atr_s = tr.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
di_pos = 100 * dm_pos.ewm(alpha=1 / period, min_periods=period, adjust=False).mean() / atr_s
di_neg = 100 * dm_neg.ewm(alpha=1 / period, min_periods=period, adjust=False).mean() / atr_s
dx = (100 * (di_pos - di_neg).abs() / (di_pos + di_neg).replace(0, np.nan))
return dx.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
+109
View File
@@ -0,0 +1,109 @@
"""
Performance metrics computed from a completed backtest.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Sequence
import numpy as np
from .backtest import Trade
@dataclass
class Metrics:
n_trades: int
n_wins: int
n_losses: int
win_rate: float # 0-1
avg_win_r: float # average win in R
avg_loss_r: float # average loss in R (negative)
profit_factor: float
expectancy_r: float # expected R per trade
total_pips: float
total_return: float # fractional (e.g. 0.12 = 12%)
max_drawdown: float # fractional (negative)
sharpe: float # annualised daily Sharpe
avg_duration: float # hours
n_longs: int
n_shorts: int
final_balance: float
def __str__(self) -> str:
lines = [
f" Trades : {self.n_trades} ({self.n_longs}L / {self.n_shorts}S)",
f" Win rate : {self.win_rate*100:.1f}%",
f" Avg win (R) : {self.avg_win_r:+.2f}R",
f" Avg loss (R) : {self.avg_loss_r:+.2f}R",
f" Expectancy : {self.expectancy_r:+.3f}R / trade",
f" Profit factor : {self.profit_factor:.2f}",
f" Total pips : {self.total_pips:+.1f}",
f" Total return : {self.total_return*100:+.2f}%",
f" Final balance : ${self.final_balance:,.2f}",
f" Max drawdown : {self.max_drawdown*100:.2f}%",
f" Sharpe : {self.sharpe:.2f}",
f" Avg duration : {self.avg_duration:.1f} h",
]
return "\n".join(lines)
def score(self) -> float:
"""Composite optimisation score (higher = better)."""
if self.n_trades < 20:
return -999.0
return self.sharpe * (1 + self.expectancy_r) * (1 + self.total_return)
def compute(
trades: list[Trade],
equity: np.ndarray,
initial_balance: float = 10_000.0,
) -> Metrics:
closed = [t for t in trades if t.closed]
if not closed:
return Metrics(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, initial_balance)
pips = np.array([t.pnl_pips for t in closed])
r_vals = np.array([t.pnl_r for t in closed])
wins = r_vals[r_vals > 0]
losses = r_vals[r_vals <= 0]
gross_profit = wins.sum() if len(wins) else 0.0
gross_loss = abs(losses.sum()) if len(losses) else 0.0
pf = gross_profit / gross_loss if gross_loss else float("inf")
# equity metrics
peak = np.maximum.accumulate(equity)
dd = (equity - peak) / peak
max_dd = dd.min()
# annualised Sharpe via daily returns
by_day: dict[str, float] = {}
for t in closed:
day = t.exit_time.date().isoformat() # type: ignore[union-attr]
by_day[day] = by_day.get(day, 0) + t.pnl_r * t.risk_pct
daily = np.array(list(by_day.values()))
sharpe = (daily.mean() / daily.std() * np.sqrt(252)) if (len(daily) > 1 and daily.std() > 0) else 0.0
durations = [t.duration_hours for t in closed]
return Metrics(
n_trades = len(closed),
n_wins = int((r_vals > 0).sum()),
n_losses = int((r_vals <= 0).sum()),
win_rate = float((r_vals > 0).mean()),
avg_win_r = float(wins.mean()) if len(wins) else 0.0,
avg_loss_r = float(losses.mean()) if len(losses) else 0.0,
profit_factor = pf,
expectancy_r = float(r_vals.mean()),
total_pips = float(pips.sum()),
total_return = float(equity[-1] / initial_balance - 1),
max_drawdown = float(max_dd),
sharpe = float(sharpe),
avg_duration = float(np.mean(durations)),
n_longs = sum(1 for t in closed if t.direction == "long"),
n_shorts = sum(1 for t in closed if t.direction == "short"),
final_balance = float(equity[-1]),
)
+62
View File
@@ -0,0 +1,62 @@
"""
Grid-search optimizer.
Iterates over a parameter grid, re-instantiates the strategy for each
combination, runs the backtest on the in-sample slice, and ranks by a
composite score (Sharpe × expectancy × return).
"""
from __future__ import annotations
import itertools
from dataclasses import dataclass
from typing import Any, Type
import numpy as np
import pandas as pd
from .backtest import BacktestEngine
from .metrics import Metrics, compute
@dataclass
class OptResult:
params: dict[str, Any]
metrics: Metrics
score: float
def grid_search(
StrategyClass: Type,
param_grid: dict[str, list],
df: pd.DataFrame,
engine: BacktestEngine,
*,
verbose: bool = False,
) -> list[OptResult]:
"""
Run every combination of parameters in `param_grid`.
Returns results sorted best-first by composite score.
"""
keys = list(param_grid.keys())
values = list(param_grid.values())
combos = list(itertools.product(*values))
results: list[OptResult] = []
for idx, combo in enumerate(combos):
params = dict(zip(keys, combo))
strategy = StrategyClass(**params)
trades, equity = engine.run(df, strategy)
m = compute(trades, equity, engine.initial_balance)
s = m.score()
results.append(OptResult(params=params, metrics=m, score=s))
if verbose and (idx + 1) % 50 == 0:
print(f" [{idx+1}/{len(combos)}] best so far: "
f"{max(results, key=lambda r: r.score).score:.4f}")
results.sort(key=lambda r: r.score, reverse=True)
return results