"""
Paper trading client — drop-in replacement for mt5_client.

Simulates fills using real MT5 tick prices (if MT5 is available), the last
yfinance bar, or the last close from the local candles DB.
State persists in paper_state.json.
"""
import asyncio
import json
import logging
import math
from datetime import datetime, date
from pathlib import Path
from typing import Optional

import pandas as pd

from . import config

logger = logging.getLogger(__name__)

ORDER_TYPE_BUY = 0
ORDER_TYPE_SELL = 1

STATE_PATH = Path(__file__).parent.parent / "paper_state.json"
_state: dict = {}

# Try to use MT5 for live prices even in paper mode
try:
    from . import mt5_client as _mt5
    _MT5_FOR_PRICES = True
except Exception:
    _MT5_FOR_PRICES = False

# yfinance as live-price fallback when MT5 unavailable
try:
    import yfinance as _yf
    import warnings as _warnings
    _warnings.filterwarnings("ignore", category=FutureWarning)
    _YF_AVAILABLE = True
except ImportError:
    _YF_AVAILABLE = False

# candle cache: key → (fetched_at, DataFrame)
_candle_cache: dict[str, tuple[float, pd.DataFrame]] = {}
_CACHE_TTL = 60.0  # seconds

# Simulated full spread per symbol, in pips.
_SPREADS_PIPS: dict[str, float] = {
    "EURUSD": 0.8, "GBPUSD": 0.9, "USDJPY": 0.7, "USDCHF": 1.0,
    "USDCAD": 1.0, "AUDUSD": 0.9, "NZDUSD": 1.2, "EURJPY": 1.0,
    "GBPJPY": 1.5, "EURGBP": 1.0, "EURAUD": 1.5, "EURCAD": 1.5,
    "AUDCAD": 1.5, "CADJPY": 1.5, "CHFJPY": 1.5, "EURCHF": 1.0,
    "AUDCHF": 1.5, "CADCHF": 1.5, "AUDJPY": 1.2, "GBPCAD": 2.0,
    "GBPAUD": 2.0,
}


def _pip_size(symbol: str) -> float:
    """Return the price increment of one pip: 0.01 for *JPY symbols, else 0.0001."""
    return 0.01 if symbol.endswith("JPY") else 0.0001


def _half_spread(symbol: str) -> float:
    """Return half of the simulated spread for ``symbol``, in price units.

    Symbols missing from ``_SPREADS_PIPS`` are given a 1.5 pip spread.
    """
    pips = _SPREADS_PIPS.get(symbol, 1.5)
    return pips * _pip_size(symbol) / 2


# ── state ─────────────────────────────────────────────────────────────────────

def _load_state() -> dict:
    """Read the persisted paper state, or build a fresh one.

    Values found on disk are layered over the defaults, so a state file that
    lacks one of the expected keys ("balance", "positions", "history",
    "daily_pnl") still yields a complete state instead of causing a KeyError
    later on.

    Returns:
        The state dict.

    Raises:
        json.JSONDecodeError: if the state file exists but is not valid JSON.
            This is deliberately not swallowed: silently starting over would
            discard the recorded balance and history.
    """
    defaults = {
        "balance": config.PAPER_INITIAL_BALANCE,
        "positions": [],
        "history": [],
        "daily_pnl": {},
    }
    if not STATE_PATH.exists():
        return defaults
    with open(STATE_PATH, encoding="utf-8") as f:
        loaded = json.load(f)
    return {**defaults, **loaded}


def _save_state():
    """Write ``_state`` to ``STATE_PATH`` as indented JSON.

    The data is written to a sibling temp file first and then renamed over
    the real file, so an interruption mid-write cannot leave a truncated
    state file behind. Values json cannot encode natively are stringified
    (``default=str``).
    """
    tmp_path = STATE_PATH.with_name(STATE_PATH.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(_state, f, indent=2, default=str)
    tmp_path.replace(STATE_PATH)
def connect() -> bool:
    """Load the persisted paper state into the module-level ``_state``.

    Returns:
        Always ``True``.
    """
    global _state
    _state = _load_state()
    logger.info(
        "Paper trading — balance $%.2f open positions: %d",
        _state["balance"], len(_state["positions"]),
    )
    return True


def disconnect():
    """Persist the current paper state to ``STATE_PATH``."""
    _save_state()
    logger.info("Paper state saved → %s", STATE_PATH)


# ── price helpers ─────────────────────────────────────────────────────────────

def _yf_ticker(symbol: str) -> str:
    """Return the yfinance ticker for ``symbol`` (the symbol plus ``=X``)."""
    return f"{symbol}=X"


def _fetch_yf_candles(symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
    """Blocking yfinance fetch — run in executor.

    Args:
        symbol: FX symbol such as "EURUSD".
        timeframe: One of "M15", "M30", "H1", "H4", "D1".

    Returns:
        A DataFrame indexed by naive ``time`` with columns open/high/low/
        close/tick_volume, or ``None`` when yfinance is unavailable, the
        timeframe is not supported, no data came back, or the fetch failed.
    """
    if not _YF_AVAILABLE:
        return None

    interval_map = {
        "M15": "15m", "M30": "30m", "H1": "1h", "H4": "4h", "D1": "1d",
    }
    interval = interval_map.get(timeframe)
    if interval is None:
        # Do not substitute another interval: returning 15m bars for a
        # different timeframe would hand the caller mislabelled data.
        logger.debug("yfinance: unsupported timeframe %s for %s", timeframe, symbol)
        return None

    # Request max available to give EMA(200) on H1 enough warmup
    period = "60d" if interval in ("15m", "30m") else "365d"
    try:
        df = _yf.download(
            _yf_ticker(symbol),
            interval=interval,
            period=period,
            auto_adjust=True,
            progress=False,
        )
        if df is None or df.empty:
            return None
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)
        df = df.rename(columns=str.lower)
        df.index = pd.to_datetime(df.index, utc=True).tz_localize(None)
        df.index.name = "time"
        # A frame without a volume column gets zero volume rather than
        # being thrown away.
        if "volume" in df.columns:
            df["tick_volume"] = df["volume"].fillna(0).astype(int)
        else:
            df["tick_volume"] = 0
        return df[["open", "high", "low", "close", "tick_volume"]].dropna()
    except Exception as exc:
        logger.debug("yfinance fetch failed for %s: %s", symbol, exc)
        return None


async def _get_yf_cached(symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
    """Return yfinance candles, reusing a cached frame younger than ``_CACHE_TTL``.

    The blocking download runs in the default executor. Only non-empty
    results are cached.
    """
    import time

    key = f"{symbol}_{timeframe}"
    now = time.monotonic()
    cached = _candle_cache.get(key)
    if cached is not None:
        fetched_at, df = cached
        if now - fetched_at < _CACHE_TTL:
            return df

    # We are inside a coroutine, so a running loop is guaranteed.
    loop = asyncio.get_running_loop()
    df = await loop.run_in_executor(None, _fetch_yf_candles, symbol, timeframe)
    if df is not None and not df.empty:
        _candle_cache[key] = (now, df)
    return df


def _query_candles_db(sql: str, params: tuple) -> list:
    """Run one read-only query against the candles DB and return all rows.

    The connection is closed even when the query raises. Import or sqlite
    errors propagate to the caller.
    """
    import sqlite3
    from contextlib import closing

    from engine.data import DB_PATH

    with closing(sqlite3.connect(DB_PATH)) as conn:
        return conn.execute(sql, params).fetchall()


async def get_tick(symbol: str) -> Optional[dict]:
    """Return a ``{"bid", "ask"}`` quote for ``symbol``, or ``None``.

    Sources are tried in order: live MT5 tick, last yfinance M15 close, last
    M15 close in the candles DB. For the latter two the quote is the close
    plus/minus the simulated half spread.
    """
    # 1. prefer live MT5 tick
    if _MT5_FOR_PRICES:
        tick = await _mt5.get_tick(symbol)
        if tick:
            return tick

    # 2. yfinance last bar
    df = await _get_yf_cached(symbol, "M15")
    if df is not None and not df.empty:
        mid = float(df["close"].iloc[-1])
        hs = _half_spread(symbol)
        return {"bid": mid - hs, "ask": mid + hs}

    # 3. DB last close
    try:
        rows = _query_candles_db(
            "SELECT close FROM candles WHERE symbol=? AND timeframe='M15' "
            "ORDER BY time DESC LIMIT 1",
            (symbol,),
        )
        if rows:
            mid = rows[0][0]
            hs = _half_spread(symbol)
            return {"bid": mid - hs, "ask": mid + hs}
    except Exception as exc:
        # Best-effort fallback: report the failure instead of hiding it.
        logger.warning("DB tick fallback failed for %s: %s", symbol, exc)
    return None


async def get_candles(symbol: str, timeframe: str, count: int) -> Optional[pd.DataFrame]:
    """Return up to ``count`` most recent candles for ``symbol``/``timeframe``.

    Sources are tried in order: MT5, yfinance, local candles DB. Returns
    ``None`` when none of them yields data.
    """
    # 1. MT5
    if _MT5_FOR_PRICES:
        df = await _mt5.get_candles(symbol, timeframe, count)
        if df is not None:
            return df

    # 2. yfinance (returns the whole fetched period; truncate to count)
    df = await _get_yf_cached(symbol, timeframe)
    if df is not None and not df.empty:
        return df.tail(count)

    # 3. DB fallback
    try:
        rows = _query_candles_db(
            "SELECT time, open, high, low, close, tick_volume "
            "FROM candles WHERE symbol=? AND timeframe=? "
            "ORDER BY time DESC LIMIT ?",
            (symbol, timeframe, count),
        )
        if not rows:
            return None
        # Rows arrive newest-first; flip them into chronological order.
        df = pd.DataFrame(
            rows[::-1],
            columns=["time", "open", "high", "low", "close", "tick_volume"],
        )
        df["time"] = pd.to_datetime(df["time"])
        return df.set_index("time")
    except Exception as exc:
        logger.warning("DB candle fallback failed for %s: %s", symbol, exc)
        return None


async def get_account_info() -> Optional[dict]:
    """Return balance, equity and margin level of the paper account.

    Equity is reported equal to balance (open positions are not marked to
    market) and the margin level is always 0.0.
    """
    return {
        "balance": _state["balance"],
        "equity": _state["balance"],
        "margin_level": 0.0,
    }


def _exit_trigger(pos: dict, mid: float) -> Optional[tuple]:
    """Return ``(reason, exit_price)`` if ``mid`` has crossed SL or TP, else ``None``.

    The stop loss is checked before the take profit.
    """
    sl, tp = pos["sl"], pos["tp"]
    # NOTE(review): a level of 0/None is treated as "not set" (assumed to
    # mirror the MT5 request convention — confirm against callers). Without
    # this guard a zero TP on a BUY would exit immediately at price 0.
    if pos["direction"] == ORDER_TYPE_BUY:
        if sl and mid <= sl:
            return "SL", sl
        if tp and mid >= tp:
            return "TP", tp
    else:
        if sl and mid >= sl:
            return "SL", sl
        if tp and mid <= tp:
            return "TP", tp
    return None


async def get_open_positions(**_) -> list[dict]:
    """Settle any TP/SL hits, return surviving positions.

    Each open position is checked against the current mid price. A position
    whose SL or TP has been crossed is closed at that level: its P&L is
    added to the balance and to today's ``daily_pnl`` bucket, and a record is
    appended to ``history``. Positions with no available tick are left open.
    State is saved only when at least one position was closed.
    """
    still_open = []
    closed_any = False

    for pos in _state["positions"]:
        tick = await get_tick(pos["symbol"])
        if tick is None:
            still_open.append(pos)
            continue

        mid = (tick["bid"] + tick["ask"]) / 2
        hit = _exit_trigger(pos, mid)
        if hit is None:
            still_open.append(pos)
            continue

        reason, exit_px = hit
        is_buy = pos["direction"] == ORDER_TYPE_BUY
        ps = _pip_size(pos["symbol"])
        pv = pos["pip_value_per_lot"] * pos["volume"]
        pnl_pips = (exit_px - pos["entry"]) / ps * (1 if is_buy else -1)
        pnl = pnl_pips * pv

        _state["balance"] += pnl
        today = date.today().isoformat()
        # Fetch (or create) the bucket first: in a single assignment the
        # right-hand side is evaluated before setdefault() would run, which
        # raises KeyError when "daily_pnl" is absent.
        daily = _state.setdefault("daily_pnl", {})
        daily[today] = daily.get(today, 0) + pnl

        _state["history"].append({
            **pos,
            "exit": exit_px,
            "pnl": round(pnl, 2),
            "pnl_pips": round(pnl_pips, 1),
            "reason": reason,
            "closed_at": datetime.now().isoformat(),
        })
        logger.info(
            "%s %-8s %s pnl=$%+.2f (%.1f pips) balance=$%.2f",
            reason, pos["symbol"], "BUY" if is_buy else "SELL",
            pnl, pnl_pips, _state["balance"],
        )
        closed_any = True

    if closed_any:
        _state["positions"] = still_open
        _save_state()
    return list(_state["positions"])


async def _mid_price(symbol: str) -> Optional[float]:
    """Return the bid/ask midpoint for ``symbol``, or ``None`` without a tick."""
    tick = await get_tick(symbol)
    if not tick:
        return None
    return (tick["bid"] + tick["ask"]) / 2


async def get_symbol_info(symbol: str) -> Optional[dict]:
    """Return pip-value per standard lot in USD.

    The symbol is split into a 3-letter base and the remaining quote code;
    the pip value of a 100,000-unit lot is converted to USD via the symbol's
    own rate or a USD cross rate. Unrecognised quote currencies get a flat
    10.0.

    Returns:
        A dict with ``name``, ``trade_tick_size``, ``trade_tick_value``,
        ``volume_step`` and ``_pip_value_per_lot``; ``None`` when no price is
        available for the symbol or for the cross needed to convert to USD.
    """
    ps = _pip_size(symbol)
    base, quote = symbol[:3], symbol[3:]

    mid = await _mid_price(symbol)
    if mid is None:
        return None

    if quote == "USD":
        pip_value = 100_000 * ps
    elif base == "USD":
        pip_value = 100_000 * ps / mid
    elif quote in ("JPY", "CAD", "CHF"):
        rate = await _mid_price(f"USD{quote}")
        if rate is None:
            # Pretending the rate is 1.0 would misstate the pip value by
            # orders of magnitude; report "unknown" instead.
            logger.warning("No USD%s rate available to value %s", quote, symbol)
            return None
        pip_value = 100_000 * ps / rate
    elif quote in ("AUD", "NZD", "GBP", "EUR"):
        rate = await _mid_price(f"{quote}USD")
        if rate is None:
            logger.warning("No %sUSD rate available to value %s", quote, symbol)
            return None
        pip_value = 100_000 * ps * rate
    else:
        pip_value = 10.0

    tick_size = ps / 10
    return {
        "name": symbol,
        "trade_tick_size": tick_size,
        "trade_tick_value": pip_value / 10,
        "volume_step": 0.01,
        "_pip_value_per_lot": pip_value,
    }


async def place_order(
    symbol: str,
    order_type: int,
    volume: float,
    price: float,
    sl: float,
    tp: float,
    comment: str = "",
    **_,
) -> Optional[dict]:
    """Open a simulated position at the current quote and persist it.

    BUY orders fill at the ask, SELL orders at the bid. ``price`` is accepted
    for interface compatibility but not used. When symbol info is
    unavailable a pip value of 10.0 per lot is assumed.

    Returns:
        ``{"order": trade_id}``, or ``None`` when no tick is available.
    """
    tick = await get_tick(symbol)
    if tick is None:
        return None

    fill = tick["ask"] if order_type == ORDER_TYPE_BUY else tick["bid"]
    info = await get_symbol_info(symbol)
    pv = info["_pip_value_per_lot"] if info else 10.0

    now = datetime.now()
    base_id = f"{symbol}_{now.strftime('%Y%m%d%H%M%S')}"
    # The timestamp only has one-second resolution; add a numeric suffix if
    # another open position already carries this id.
    taken = {p.get("id") for p in _state["positions"]}
    trade_id, attempt = base_id, 1
    while trade_id in taken:
        attempt += 1
        trade_id = f"{base_id}_{attempt}"

    _state["positions"].append({
        "id": trade_id,
        "symbol": symbol,
        "direction": order_type,
        "entry": fill,
        "sl": sl,
        "tp": tp,
        "volume": volume,
        "pip_value_per_lot": pv,
        "comment": comment,
        "opened_at": now.isoformat(),
    })
    _save_state()
    logger.info(
        "PAPER %s %-8s %.2f lots @ %.5f SL=%.5f TP=%.5f",
        "BUY " if order_type == ORDER_TYPE_BUY else "SELL",
        symbol, volume, fill, sl, tp,
    )
    return {"order": trade_id}