Files

342 lines
11 KiB
Python

"""
Paper trading client — drop-in replacement for mt5_client.
Simulates fills using real MT5 tick prices (if MT5 available) or midpoints
from the local candles DB. State persists in paper_state.json.
"""
import asyncio
import json
import logging
import math
from datetime import datetime, date
from pathlib import Path
from typing import Optional
import pandas as pd
from . import config
logger = logging.getLogger(__name__)
# Order direction codes; place_order() stores one of these in a position's
# "direction" field and get_open_positions() compares against them.
ORDER_TYPE_BUY = 0
ORDER_TYPE_SELL = 1
# State file lives in the parent of the directory that contains this module.
STATE_PATH = Path(__file__).parent.parent / "paper_state.json"
# In-memory state; rebound by connect() to whatever _load_state() returns.
_state: dict = {}
# Try to use MT5 for live prices even in paper mode
try:
    from . import mt5_client as _mt5
except Exception:
    # Any failure while importing the MT5 client disables MT5-backed prices.
    _MT5_FOR_PRICES = False
else:
    _MT5_FOR_PRICES = True
# yfinance as live-price fallback when MT5 unavailable
try:
    import yfinance as _yf
except ImportError:
    _YF_AVAILABLE = False
else:
    import warnings as _warnings

    # Suppresses FutureWarning for the whole process once yfinance is usable.
    _warnings.filterwarnings("ignore", category=FutureWarning)
    _YF_AVAILABLE = True
# Candle cache: "<symbol>_<timeframe>" → (monotonic fetch time, DataFrame)
_candle_cache: dict[str, tuple[float, pd.DataFrame]] = {}
# Seconds a cached frame is served before a new download is attempted.
_CACHE_TTL = 60.0
# Simulated full spread per symbol, in pips. Symbols not listed here get the
# default chosen in _half_spread().
_SPREADS_PIPS: dict[str, float] = {
    "EURUSD": 0.8,
    "GBPUSD": 0.9,
    "USDJPY": 0.7,
    "USDCHF": 1.0,
    "USDCAD": 1.0,
    "AUDUSD": 0.9,
    "NZDUSD": 1.2,
    "EURJPY": 1.0,
    "GBPJPY": 1.5,
    "EURGBP": 1.0,
    "EURAUD": 1.5,
    "EURCAD": 1.5,
    "AUDCAD": 1.5,
    "CADJPY": 1.5,
    "CHFJPY": 1.5,
    "EURCHF": 1.0,
    "AUDCHF": 1.5,
    "CADCHF": 1.5,
    "AUDJPY": 1.2,
    "GBPCAD": 2.0,
    "GBPAUD": 2.0,
}
def _pip_size(symbol: str) -> float:
    """Return the price increment of one pip: 0.01 if *symbol* ends in "JPY", else 0.0001."""
    if symbol.endswith("JPY"):
        return 0.01
    return 0.0001
def _half_spread(symbol: str) -> float:
    """Return half of the simulated spread for *symbol*, expressed in price units."""
    # Symbols missing from the spread table are assumed to trade at 1.5 pips.
    spread_pips = _SPREADS_PIPS.get(symbol, 1.5)
    spread_in_price = spread_pips * _pip_size(symbol)
    return spread_in_price / 2
# ── state ─────────────────────────────────────────────────────────────────────
def _load_state() -> dict:
    """Load the persisted paper-trading state, filling in any missing keys.

    Reads ``STATE_PATH`` when it exists; otherwise starts from an empty state.
    Keys absent from the file ("balance", "positions", "history",
    "daily_pnl") are added with their initial values, so the rest of the
    module can index them without a KeyError on a partial state file.

    Raises:
        json.JSONDecodeError: if the state file exists but is not valid JSON.
            This is deliberately not swallowed, so a corrupt file is never
            silently replaced by a fresh balance.
    """
    state: dict = {}
    if STATE_PATH.exists():
        with open(STATE_PATH, encoding="utf-8") as f:
            state = json.load(f)
    # Only consult config when the balance is actually missing.
    if "balance" not in state:
        state["balance"] = config.PAPER_INITIAL_BALANCE
    state.setdefault("positions", [])
    state.setdefault("history", [])
    state.setdefault("daily_pnl", {})
    return state
def _save_state():
    """Persist the in-memory state to ``STATE_PATH`` as indented JSON.

    The JSON is written to a sibling ``*.tmp`` file first and then moved over
    the real file, so an interrupted write cannot leave a truncated state
    file behind. Values that json cannot encode are stringified
    (``default=str``).
    """
    tmp_path = STATE_PATH.with_name(STATE_PATH.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(_state, f, indent=2, default=str)
    # Path.replace overwrites an existing destination.
    tmp_path.replace(STATE_PATH)
def connect() -> bool:
    """Reload the paper state from disk into the module and log a summary.

    Always returns True.
    """
    global _state
    _state = _load_state()
    balance = _state["balance"]
    open_count = len(_state["positions"])
    logger.info("Paper trading — balance $%.2f open positions: %d", balance, open_count)
    return True
def disconnect():
    """Flush the in-memory paper state to disk and log the destination path."""
    _save_state()
    logger.info("Paper state saved → %s", STATE_PATH)
# ── price helpers ─────────────────────────────────────────────────────────────
def _yf_ticker(symbol: str) -> str:
    """Return the yfinance ticker for *symbol*: the symbol with "=X" appended."""
    return "{}=X".format(symbol)
def _fetch_yf_candles(symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
    """Blocking yfinance fetch — run in executor.

    Downloads candles for *symbol* and normalises them to lower-case
    ``open/high/low/close/tick_volume`` columns on a timezone-naive index
    named "time" (converted to UTC before the timezone is dropped).

    Returns None when yfinance is not installed, when the download is empty,
    or when anything in the download/normalisation raises (logged at debug).
    """
    if not _YF_AVAILABLE:
        return None
    interval_map = {"M15": "15m", "H1": "1h", "H4": "4h", "D1": "1d"}
    # NOTE(review): timeframes missing from the map are fetched as 15m bars —
    # confirm callers never pass other timeframes here.
    interval = interval_map.get(timeframe, "15m")
    # Request max available to give EMA(200) on H1 enough warmup
    period = "60d" if interval in ("15m", "30m") else "365d"
    try:
        df = _yf.download(
            _yf_ticker(symbol), interval=interval,
            period=period, auto_adjust=True, progress=False,
        )
        if df is None or df.empty:
            return None
        # Keep only the first level of a MultiIndex column header.
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)
        df = df.rename(columns=str.lower)
        df.index = pd.to_datetime(df.index, utc=True).tz_localize(None)
        df.index.name = "time"
        # A frame without a volume column still yields usable candles; the
        # previous df.get("volume", 0).fillna(...) raised on the int default.
        if "volume" in df.columns:
            df["tick_volume"] = df["volume"].fillna(0).astype(int)
        else:
            df["tick_volume"] = 0
        return df[["open", "high", "low", "close", "tick_volume"]].dropna()
    except Exception as exc:
        logger.debug("yfinance fetch failed for %s: %s", symbol, exc)
        return None
async def _get_yf_cached(symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
    """Return yfinance candles for *symbol*/*timeframe*, cached for ``_CACHE_TTL`` seconds.

    A cached frame younger than the TTL is returned as-is (the same object,
    not a copy). Otherwise the blocking download runs in the default executor
    and a non-empty result is stored in the cache. Failed or empty downloads
    are returned to the caller but never cached.
    """
    import time
    key = f"{symbol}_{timeframe}"
    now = time.monotonic()
    cached = _candle_cache.get(key)
    if cached is not None:
        fetched_at, cached_df = cached
        if now - fetched_at < _CACHE_TTL:
            return cached_df
    # We are inside a coroutine, so the running loop is the one to use.
    loop = asyncio.get_running_loop()
    df = await loop.run_in_executor(None, _fetch_yf_candles, symbol, timeframe)
    if df is not None and not df.empty:
        _candle_cache[key] = (now, df)
    return df
async def get_tick(symbol: str) -> Optional[dict]:
    """Return a ``{"bid", "ask"}`` quote for *symbol*, or None if no source has a price.

    Sources are tried in order:
      1. the MT5 client's tick, returned unchanged when truthy;
      2. the last close of the cached yfinance M15 candles;
      3. the last M15 close stored in the local SQLite ``candles`` table.
    For sources 2 and 3 the close is treated as the mid price and the
    simulated half spread is applied on either side.
    """
    # 1. prefer live MT5 tick
    if _MT5_FOR_PRICES:
        tick = await _mt5.get_tick(symbol)
        if tick:
            return tick
    # 2. yfinance last bar
    df = await _get_yf_cached(symbol, "M15")
    if df is not None and not df.empty:
        mid = float(df["close"].iloc[-1])
        hs = _half_spread(symbol)
        return {"bid": mid - hs, "ask": mid + hs}
    # 3. DB last close
    try:
        import sqlite3
        from engine.data import DB_PATH
        conn = sqlite3.connect(DB_PATH)
        try:
            row = conn.execute(
                "SELECT close FROM candles WHERE symbol=? AND timeframe='M15' "
                "ORDER BY time DESC LIMIT 1", (symbol,)
            ).fetchone()
        finally:
            # Close the connection even when the query itself fails.
            conn.close()
        if row:
            mid = row[0]
            hs = _half_spread(symbol)
            return {"bid": mid - hs, "ask": mid + hs}
    except Exception as exc:
        # Still best-effort, but leave a trace instead of failing silently.
        logger.debug("DB tick fallback failed for %s: %s", symbol, exc)
    return None
async def get_candles(symbol: str, timeframe: str, count: int) -> Optional[pd.DataFrame]:
    """Return up to *count* of the most recent candles for *symbol*/*timeframe*.

    Sources are tried in order: the MT5 client (any non-None result is
    returned as-is), the cached yfinance download truncated to the last
    *count* rows, then the local SQLite ``candles`` table. The DB result is
    ordered oldest-first and indexed by "time". Returns None when no source
    yields data or the DB lookup fails (logged as a warning).
    """
    # 1. MT5
    if _MT5_FOR_PRICES:
        df = await _mt5.get_candles(symbol, timeframe, count)
        if df is not None:
            return df
    # 2. yfinance (cached frame holds the whole download; truncate to count)
    df = await _get_yf_cached(symbol, timeframe)
    if df is not None and not df.empty:
        return df.tail(count)
    # 3. DB fallback
    try:
        import sqlite3
        from engine.data import DB_PATH
        conn = sqlite3.connect(DB_PATH)
        try:
            rows = conn.execute(
                "SELECT time, open, high, low, close, tick_volume "
                "FROM candles WHERE symbol=? AND timeframe=? "
                "ORDER BY time DESC LIMIT ?",
                (symbol, timeframe, count)
            ).fetchall()
        finally:
            # Close the connection even when the query itself fails.
            conn.close()
        if not rows:
            return None
        # The query returns newest-first; reverse into chronological order.
        df = pd.DataFrame(rows[::-1],
                          columns=["time", "open", "high", "low", "close", "tick_volume"])
        df["time"] = pd.to_datetime(df["time"])
        return df.set_index("time")
    except Exception as exc:
        logger.warning("DB candle fallback failed for %s: %s", symbol, exc)
        return None
async def get_account_info() -> Optional[dict]:
    """Return the paper account snapshot.

    Equity is reported equal to the stored balance (open positions are not
    marked to market here) and the margin level is always 0.0.
    """
    balance = _state["balance"]
    return {"balance": balance, "equity": balance, "margin_level": 0.0}
async def get_open_positions(**_) -> list[dict]:
    """Settle any TP/SL hits, return surviving positions.

    For every open position the current mid price is compared with its stop
    loss and take profit. A hit position is closed at the SL/TP level itself
    (not at the observed price). Its P&L is added to the balance and to
    today's "daily_pnl" bucket, and a record is appended to "history". State
    is written to disk only when at least one position was closed.

    Positions whose symbol currently has no price are left open. Keyword
    arguments are accepted and ignored.

    Returns:
        A shallow copy of the list of positions that remain open.
    """
    still_open = []
    for pos in _state["positions"]:
        tick = await get_tick(pos["symbol"])
        if tick is None:
            # No price available: exits cannot be evaluated, keep the position.
            still_open.append(pos)
            continue
        # NOTE(review): exits trigger on the mid price, while entries fill at
        # bid/ask — confirm that asymmetry is intended.
        mid = (tick["bid"] + tick["ask"]) / 2
        is_buy = pos["direction"] == ORDER_TYPE_BUY
        sl, tp = pos["sl"], pos["tp"]
        # A falsy level (0 / None) is treated as "not set"; otherwise a zero
        # TP/SL would close the position immediately at price 0.
        # NOTE(review): assumes callers use 0.0 for "no SL/TP" — confirm.
        if is_buy:
            hit_sl = bool(sl) and mid <= sl
            hit_tp = bool(tp) and mid >= tp
        else:
            hit_sl = bool(sl) and mid >= sl
            hit_tp = bool(tp) and mid <= tp
        # SL is checked first, as before.
        if hit_sl:
            reason, exit_px = "SL", sl
        elif hit_tp:
            reason, exit_px = "TP", tp
        else:
            still_open.append(pos)
            continue

        ps = _pip_size(pos["symbol"])
        pv = pos["pip_value_per_lot"] * pos["volume"]
        # Signed pip distance in the position's favour, then converted to money.
        pnl_pips = (exit_px - pos["entry"]) / ps * (1 if is_buy else -1)
        pnl = pnl_pips * pv

        _state["balance"] += pnl
        today = date.today().isoformat()
        # Bind the bucket first: in `a.setdefault(k, {})[d] = a[k].get(d) + x`
        # the right-hand side runs before setdefault and raises KeyError.
        daily = _state.setdefault("daily_pnl", {})
        daily[today] = daily.get(today, 0) + pnl
        _state.setdefault("history", []).append({
            **pos, "exit": exit_px, "pnl": round(pnl, 2),
            "pnl_pips": round(pnl_pips, 1),
            "reason": reason, "closed_at": datetime.now().isoformat(),
        })
        logger.info("%s %-8s %s pnl=$%+.2f (%.1f pips) balance=$%.2f",
                    reason, pos["symbol"],
                    "BUY" if is_buy else "SELL",
                    pnl, pnl_pips, _state["balance"])
    if len(still_open) != len(_state["positions"]):
        _state["positions"] = still_open
        _save_state()
    return list(_state["positions"])
async def get_symbol_info(symbol: str) -> Optional[dict]:
    """Return pip-value per standard lot in USD.

    The symbol is split into a 3-letter base and the remainder as quote. The
    pip value of a 100,000-unit lot is converted to USD using the symbol's
    own mid price (USD base), or a USD cross for JPY/CAD/CHF and
    AUD/NZD/GBP/EUR quotes. Unknown quote currencies get a generic 10.0.
    The same generic value is used, with a warning, when the required
    conversion rate is unavailable or zero. The previous silent fallback to
    a rate of 1.0 was off by two orders of magnitude for JPY crosses.

    Returns:
        None when *symbol* itself has no price. Otherwise a dict with
        "name", "trade_tick_size" (a tenth of a pip), "trade_tick_value"
        (a tenth of the pip value), "volume_step" and "_pip_value_per_lot".
    """
    ps = _pip_size(symbol)
    base, quote = symbol[:3], symbol[3:]
    tick = await get_tick(symbol)
    if tick is None:
        return None
    mid = (tick["bid"] + tick["ask"]) / 2

    async def cross_mid(pair: str) -> Optional[float]:
        """Mid price of *pair*, or None when no tick is available."""
        t = await get_tick(pair)
        return (t["bid"] + t["ask"]) / 2 if t else None

    default_pip_value = 10.0
    pip_value: Optional[float] = None
    if quote == "USD":
        pip_value = 100_000 * ps
    elif base == "USD":
        if mid:
            pip_value = 100_000 * ps / mid
    elif quote in ("JPY", "CAD", "CHF"):
        # Quote currency is priced as USD<quote>: divide to get USD.
        rate = await cross_mid(f"USD{quote}")
        if rate:
            pip_value = 100_000 * ps / rate
    elif quote in ("AUD", "NZD", "GBP", "EUR"):
        # Quote currency is priced as <quote>USD: multiply to get USD.
        rate = await cross_mid(f"{quote}USD")
        if rate:
            pip_value = 100_000 * ps * rate
    else:
        pip_value = default_pip_value
    if pip_value is None:
        logger.warning("No USD conversion rate for %s; using default pip value %.1f",
                       symbol, default_pip_value)
        pip_value = default_pip_value
    tick_size = ps / 10
    return {
        "name": symbol,
        "trade_tick_size": tick_size,
        "trade_tick_value": pip_value / 10,
        "volume_step": 0.01,
        "_pip_value_per_lot": pip_value,
    }
async def place_order(
    symbol: str,
    order_type: int,
    volume: float,
    price: float,
    sl: float,
    tp: float,
    comment: str = "",
    **_,
) -> Optional[dict]:
    """Open a simulated position at the current quote and persist it.

    Args:
        symbol: Instrument to trade.
        order_type: ``ORDER_TYPE_BUY`` fills at the ask; any other value
            fills at the bid.
        volume: Lot size; must be positive.
        price: Accepted for interface compatibility but ignored — the fill
            always comes from the current tick.
        sl: Stop-loss price stored on the position.
        tp: Take-profit price stored on the position.
        comment: Free-text note stored on the position.
        **_: Extra keyword arguments are accepted and ignored.

    Returns:
        ``{"order": trade_id}`` on success. None when no price is available
        for *symbol* or *volume* is not positive.
    """
    if volume is None or volume <= 0:
        logger.warning("PAPER order rejected for %s: invalid volume %r", symbol, volume)
        return None
    tick = await get_tick(symbol)
    if tick is None:
        return None
    fill = tick["ask"] if order_type == ORDER_TYPE_BUY else tick["bid"]
    info = await get_symbol_info(symbol)
    # Fall back to a generic pip value when symbol info cannot be computed.
    pv = info["_pip_value_per_lot"] if info else 10.0
    # Sample the clock once so the id and the open timestamp agree.
    now = datetime.now()
    base_id = f"{symbol}_{now.strftime('%Y%m%d%H%M%S')}"
    # Ids have one-second resolution; add a counter on a same-second clash
    # so every position and history record stays uniquely identifiable.
    taken = {p.get("id") for p in _state["positions"]}
    taken.update(h.get("id") for h in _state.get("history", []))
    trade_id, suffix = base_id, 1
    while trade_id in taken:
        suffix += 1
        trade_id = f"{base_id}_{suffix}"
    _state["positions"].append({
        "id": trade_id,
        "symbol": symbol,
        "direction": order_type,
        "entry": fill,
        "sl": sl,
        "tp": tp,
        "volume": volume,
        "pip_value_per_lot": pv,
        "comment": comment,
        "opened_at": now.isoformat(),
    })
    _save_state()
    logger.info("PAPER %s %-8s %.2f lots @ %.5f SL=%.5f TP=%.5f",
                "BUY " if order_type == ORDER_TYPE_BUY else "SELL",
                symbol, volume, fill, sl, tp)
    return {"order": trade_id}