""" calibration.py — Startup calibration report and analytics. ============================================================ Runs once after every bot startup, after the initial state fetch. Also triggered by fetch_history.py after each history sync. Writes to logs/calibration.log — each run appends a dated report so you can track progress over time. WHAT IT DOES: ------------- 1. Fetches latest transaction history (incremental — only new days) 2. Analyses TSLA-only performance from real historical data 3. Shows survival analysis with progress toward 60% target 4. Tracks all milestones with real estimated days to reach each 5. Recommends next config change (survival threshold increase) MILESTONE TARGETS: ------------------ Priority 1: 60% survival (account resilience target) Priority 2: £1,000 equity Priority 3: £10,000 equity Priority 4: £100,000 equity Priority 5: £1,000,000 equity (the dream) DATA SOURCE: ------------ All financial projections use real TSLA trade history from history.db. Monthly profit trend used for projections (recent months weighted more). Falls back to estimates if no history available. """ import logging import os import sqlite3 from datetime import datetime, date, timedelta from state import BotState, pos_level, pos_size, ord_level, ord_size from calculator import GridCalculator import config log = logging.getLogger("maxbot.calibration") DB_PATH = os.path.join( os.path.dirname(os.path.abspath(__file__)), "data", "history.db" ) def setup_calibration_log(): """Add calibration log handler. Called from bot.py setup_logging.""" base = os.path.dirname(os.path.abspath(__file__)) logs_dir = os.path.join(base, "logs") os.makedirs(logs_dir, exist_ok=True) handler = logging.FileHandler(os.path.join(logs_dir, "calibration.log")) handler.setFormatter(logging.Formatter("%(asctime)s %(message)s")) handler.setLevel(logging.INFO) logging.getLogger("maxbot.calibration").addHandler(handler) logging.getLogger("maxbot.calibration").propagate = True # ───────────────────────────────────────────── # HISTORY READER # ───────────────────────────────────────────── class HistoryReader: """Read TSLA performance data from history.db.""" def __init__(self): self.available = os.path.exists(DB_PATH) if self.available: try: self.conn = sqlite3.connect(DB_PATH) self.conn.row_factory = sqlite3.Row except Exception as e: log.warning(f"Could not open history DB: {e}") self.available = False def get_deposit_stats(self) -> dict: """ Analyse deposit history to detect recurring deposit pattern. Returns weekly average based on recent deposits if a pattern exists. Falls back to config.WEEKLY_DEPOSIT_GBP if no pattern detected. """ if not self.available: return { "total_deposited": 0, "deposit_count": 0, "last_deposit": None, "weekly_avg": getattr(config, "WEEKLY_DEPOSIT_GBP", 0.0), "pattern": "config", } try: # All deposits from 2024 onwards only row = self.conn.execute(""" SELECT COUNT(*) as c, SUM(size_gbp) as total, MAX(date_utc) as last FROM transactions WHERE transaction_type='DEPOSIT' AND date_utc >= '2024-01-01' """).fetchone() # Recent deposits — last 12 weeks (from 2024 onwards) recent = self.conn.execute(""" SELECT COUNT(*) as c, SUM(size_gbp) as total, MIN(date_utc) as first, MAX(date_utc) as last FROM transactions WHERE transaction_type='DEPOSIT' AND date_utc >= date('now', '-84 days') AND date_utc >= '2024-01-01' """).fetchone() # Detect pattern: 3+ deposits in last 12 weeks = regular weekly_avg = 0.0 pattern = "none" config_weekly = getattr(config, "WEEKLY_DEPOSIT_GBP", 0.0) if recent["c"] >= 3: # Regular deposit pattern detected weekly_avg = (recent["total"] or 0) / 12.0 pattern = f"detected ({recent['c']} deposits in 12 weeks)" elif config_weekly > 0: # Use config value weekly_avg = config_weekly pattern = "config" else: pattern = "none yet — will auto-detect when regular deposits start" return { "total_deposited": row["total"] or 0, "deposit_count": row["c"] or 0, "last_deposit": row["last"][:10] if row["last"] else None, "weekly_avg": weekly_avg, "pattern": pattern, } except Exception as e: log.warning(f"Could not read deposit stats: {e}") return { "total_deposited": 0, "deposit_count": 0, "last_deposit": None, "weekly_avg": getattr(config, "WEEKLY_DEPOSIT_GBP", 0.0), "pattern": "error", } def get_tsla_stats(self) -> dict: """ Return TSLA performance stats from new strategy start date. New strategy detected as starting July 2024 based on trade history — that's when consistent £1/trade pattern began after previous strategies. """ NEW_STRATEGY_DATE = "2024-01-01" if not self.available: return {} try: # New strategy period only row = self.conn.execute(""" SELECT COUNT(*) as trades, SUM(size_gbp) as profit, MIN(date_utc) as first_trade, MAX(date_utc) as last_trade FROM transactions WHERE transaction_type='TRADE' AND instrument='TSLA' AND date_utc >= ? """, (NEW_STRATEGY_DATE,)).fetchone() swap_row = self.conn.execute(""" SELECT SUM(size_gbp) as fees FROM transactions WHERE transaction_type='SWAP' AND instrument='TSLA' AND date_utc >= ? """, (NEW_STRATEGY_DATE,)).fetchone() deposit_row = None # deposits tracked separately # Monthly profit for last 6 months monthly = [] for m in self.conn.execute(""" SELECT SUBSTR(date_utc,1,7) as month, COUNT(*) as trades, SUM(size_gbp) as profit FROM transactions WHERE transaction_type='TRADE' AND instrument='TSLA' AND date_utc >= date('now','-6 months') AND date_utc >= '2024-01-01' GROUP BY month ORDER BY month """): monthly.append({ "month": m["month"], "trades": m["trades"], "profit": m["profit"], }) # Recent daily average (last 30 days with trades) recent_row = self.conn.execute(""" SELECT AVG(daily) FROM ( SELECT DATE(date_utc) as d, SUM(size_gbp) as daily FROM transactions WHERE transaction_type='TRADE' AND instrument='TSLA' AND date_utc >= date('now','-30 days') AND date_utc >= '2024-01-01' GROUP BY DATE(date_utc) ) """).fetchone() # Last 90 days daily average (more stable) avg90_row = self.conn.execute(""" SELECT AVG(daily) FROM ( SELECT DATE(date_utc) as d, SUM(size_gbp) as daily FROM transactions WHERE transaction_type='TRADE' AND instrument='TSLA' AND date_utc >= date('now','-90 days') AND date_utc >= '2024-01-01' GROUP BY DATE(date_utc) ) """).fetchone() swap_fees = swap_row["fees"] or 0 trade_profit = row["profit"] or 0 recent_daily = recent_row[0] or 0 avg90_daily = avg90_row[0] or 0 return { "trades": row["trades"] or 0, "trade_profit": trade_profit, "swap_fees": swap_fees, "net": trade_profit + swap_fees, "first_trade": row["first_trade"], "last_trade": row["last_trade"], "monthly": monthly, "recent_daily": recent_daily, "avg90_daily": avg90_daily, } except Exception as e: log.warning(f"Could not read TSLA stats: {e}") return {} def sync_history(self): """Run incremental history fetch to get latest data.""" try: import subprocess, sys, time script = os.path.join( os.path.dirname(os.path.abspath(__file__)), "fetch_history.py" ) if os.path.exists(script): log.info("Syncing transaction history...") # Wait 3 seconds — main bot already created a session moments ago # Capital.com rate limits POST /session to 1 request/second time.sleep(3) result = subprocess.run( [sys.executable, script, "--mode", "live"], capture_output=True, text=True, timeout=300 ) if result.returncode == 0: log.info("History sync complete.") else: output = result.stderr or result.stdout err_lines = [l for l in output.split("\n") if "ERROR" in l or "error" in l.lower()] err_msg = err_lines[-1].strip() if err_lines else output[:200] log.warning(f"History sync warning: {err_msg}") except Exception as e: log.warning(f"History sync failed: {e}") def close(self): if self.available: try: self.conn.close() except Exception: pass # ───────────────────────────────────────────── # CALIBRATOR # ───────────────────────────────────────────── class Calibrator: MILESTONES = [ ("£1,000", 1_000), ("£10,000", 10_000), ("£100,000", 100_000), ("£1,000,000", 1_000_000), ] SURVIVAL_MILESTONES = [30, 35, 40, 45, 50, 55, 60] def __init__(self, state: BotState, calculator: GridCalculator): self.state = state self.calculator = calculator def run(self): """Run full calibration and write report.""" s = self.state calc = self.calculator # Sync latest history first reader = HistoryReader() reader.sync_history() tsla = reader.get_tsla_stats() deposits = reader.get_deposit_stats() reader.close() # Pre-calculate everything needed all_levels = s.all_levels() # Daily profit rate if tsla and tsla.get("avg90_daily", 0) > 0: base_daily = tsla["avg90_daily"] recent_daily = tsla.get("recent_daily", base_daily) daily_rate = (recent_daily * 0.6) + (base_daily * 0.4) else: daily_rate = calc.get_queue_depth(s.equity) * config.TP_PROFIT_LARGE * 0.3 weekly_deposit = deposits.get("weekly_avg", 0.0) daily_deposit = weekly_deposit / 7.0 total_daily = daily_rate + daily_deposit # Survival max_safe = self._find_max_safe_pct(s, all_levels) next_survival = self._next_survival_milestone(max_safe) # Grid (survival-gated) spacing = calc.get_spacing_pct( s.equity, s.highest_open_price, all_levels, s.gbpusd ) depth = calc.get_queue_depth( s.equity, spacing, s.highest_open_price, all_levels, s.gbpusd ) # Full 60% simulation result (uses current spacing) full_sim_60 = calc._simulate_grid( s.highest_open_price, spacing, s.gbpusd, all_levels ) survival_60 = calc.survival_check( s.equity, s.highest_open_price, full_sim_60, s.gbpusd, override_pct=60.0 ) lines = [] lines.append("") lines.append("=" * 60) lines.append( f" CALIBRATION REPORT — " f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) lines.append("=" * 60) # ══════════════════════════════════════ # 1. ACCOUNT SNAPSHOT # ══════════════════════════════════════ lines.append("") lines.append(" 1. ACCOUNT") lines.append(f" Equity: £{s.equity:.2f}") lines.append(f" Available: £{s.available:.2f}") lines.append(f" Margin in use: £{s.total_margin_gbp:.2f}") lines.append(f" Margin level: {s.margin_level_pct:.1f}%") lines.append(f" TSLA price: ${s.current_price:.2f}") lines.append(f" Open positions: {len(s.positions)}") lines.append(f" Pending orders: {len(s.orders)}") if deposits.get("total_deposited"): lines.append(f" Deposited (2024+): £{deposits['total_deposited']:.2f}") if deposits.get("last_deposit"): lines.append(f" Last deposit: {deposits['last_deposit']}") # ══════════════════════════════════════ # 2. SURVIVAL (calculated first — everything else depends on it) # ══════════════════════════════════════ progress = min(max_safe / 60.0, 1.0) bar_filled = int(progress * 20) surv_bar = "█" * bar_filled + "░" * (20 - bar_filled) lines.append("") lines.append(" 2. SURVIVAL (target: 60% drop resistance)") lines.append(f" Can survive: {max_safe:.0f}% drop") lines.append(f" Bot setting: {config.SURVIVAL_DROP_PCT:.0f}%") lines.append(f" Progress to 60%: [{surv_bar}] {max_safe:.0f}/60%") # Auto-update logic — all in one place, clear and unambiguous old_pct = config.SURVIVAL_DROP_PCT if max_safe >= 60: lines.append(f" Status: ✓ TARGET REACHED") if config.SURVIVAL_DROP_PCT < 60: self._update_survival_pct(60.0, lines) elif max_safe > config.SURVIVAL_DROP_PCT: new_pct = self._next_survival_milestone(config.SURVIVAL_DROP_PCT) if new_pct <= max_safe: self._update_survival_pct(new_pct, lines) lines.append( f" Status: ↑ Updated {old_pct:.0f}% → " f"{config.SURVIVAL_DROP_PCT:.0f}% (auto)" ) else: lines.append( f" Status: → Holding at {config.SURVIVAL_DROP_PCT:.0f}%" ) else: equity_needed = self._equity_for_survival(s, all_levels, next_survival) gap = max(equity_needed - s.equity, 0) lines.append( f" Status: → Holding at {config.SURVIVAL_DROP_PCT:.0f}%" ) lines.append( f" Next milestone: {next_survival:.0f}% needs " f"~£{equity_needed:.0f} equity" ) if gap > 0 and daily_rate > 0: days = gap / daily_rate lines.append( f" Est. time: ~{days:.0f} days " f"(£{gap:.0f} at £{daily_rate:.2f}/day)" ) # ══════════════════════════════════════ # 3. GRID (frozen until 60% survival achieved) # Spacing tightens only when full 60% drop simulation passes. # Depth increases only after spacing tightens AND 60% passes. # ══════════════════════════════════════ lines.append("") lines.append(" 3. GRID (locked to 60% survival — spacing first, depth second)") lines.append(f" Spacing: {spacing}% {'← frozen, 60% survival not yet met' if not survival_60['safe'] else '← active'}") lines.append(f" Queue depth: {depth} orders") # Next spacing upgrade for threshold, sp in sorted(config.SPACING_TIERS): if sp < spacing: gap = max(threshold - s.equity, 0) days = gap / daily_rate if daily_rate > 0 else 0 sim = calc._simulate_grid( s.highest_open_price, sp, s.gbpusd, all_levels ) sv = calc.survival_check( s.equity, s.highest_open_price, sim, s.gbpusd, override_pct=60.0 ) sv_status = "✓ survival ok" if sv["safe"] else "✗ needs more equity" if gap > 0: lines.append( f" Spacing → {sp}%: £{threshold} needed " f"(£{gap:.0f} away ~{days:.0f}d) {sv_status}" ) else: lines.append( f" Spacing → {sp}%: equity met {sv_status}" ) break # Next depth upgrade for threshold, d, req_sp in sorted(config.QUEUE_TIERS): if d > depth: gap = max(threshold - s.equity, 0) days = gap / daily_rate if daily_rate > 0 else 0 if spacing > req_sp: lines.append( f" Depth → {d} orders: spacing {req_sp}% required first" ) elif gap > 0: lines.append( f" Depth → {d} orders: £{threshold} needed " f"(£{gap:.0f} away ~{days:.0f}d)" ) else: lines.append( f" Depth → {d} orders: equity met — survival check active" ) break # ══════════════════════════════════════ # 4. PERFORMANCE (the engine driving everything) # ══════════════════════════════════════ lines.append("") lines.append(" 4. TSLA PERFORMANCE (current strategy — from Jan 2024)") if tsla: lines.append(f" Trades: {tsla['trades']}") lines.append(f" Trade profit: £{tsla['trade_profit']:.2f}") lines.append(f" Overnight fees: £{tsla['swap_fees']:.2f}") lines.append(f" Net: £{tsla['net']:.2f}") lines.append(f" Last 30d avg: £{tsla.get('recent_daily', 0):.2f}/day") lines.append(f" Last 90d avg: £{tsla.get('avg90_daily', 0):.2f}/day") lines.append(f" Blended rate: £{daily_rate:.2f}/day (used for projections)") if tsla.get("monthly"): lines.append("") lines.append(" Monthly (last 6 months):") for m in tsla["monthly"]: bar_len = min(int(m["profit"] / 2), 25) bar = "█" * max(bar_len, 0) lines.append( f" {m['month']} " f"{m['trades']:3} trades " f"£{m['profit']:6.2f} {bar}" ) # ══════════════════════════════════════ # 5. MILESTONES (projections based on all the above) # ══════════════════════════════════════ lines.append("") lines.append(" 5. MILESTONES (target: £1,000,000)") lines.append(f" Current equity: £{s.equity:.2f}") if weekly_deposit > 0: lines.append( f" Weekly deposits: £{weekly_deposit:.2f}/week " f"({deposits['pattern']})" ) lines.append( f" Total rate: £{total_daily:.2f}/day " f"(£{daily_rate:.2f} trading + £{daily_deposit:.2f} deposits)" ) else: lines.append( f" Weekly deposits: none yet — " f"auto-detects when regular deposits start" ) lines.append("") for label, target in self.MILESTONES: gap = target - s.equity progress = min(s.equity / target, 1.0) filled = int(progress * 20) bar = "█" * filled + "░" * (20 - filled) pct = progress * 100 if s.equity >= target: lines.append(f" {label:12} [{bar}] ✓ REACHED") else: lines.append(f" {label:12} [{bar}] {pct:.1f}%") if total_daily > 0: days = gap / total_daily years = days / 365 if days < 365: eta = f"~{days:.0f} days" elif years < 2: eta = f"~{years:.1f} years" else: eta = f"~{years:.0f} years" rate_str = ( f"£{total_daily:.2f}/day" if weekly_deposit > 0 else f"£{daily_rate:.2f}/day" ) lines.append( f" {'':12} £{gap:,.0f} needed → {eta} at {rate_str}" ) lines.append("") lines.append("=" * 60) lines.append("") for line in lines: log.info(line) # ───────────────────────────────────────── # HELPERS # ───────────────────────────────────────── def _update_survival_pct(self, new_pct: float, lines: list): """ Automatically update SURVIVAL_DROP_PCT in config.py. Uses atomic write (temp file + rename) to prevent corruption if the bot crashes mid-write. """ try: config_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), "config.py" ) tmp_path = config_path + ".tmp" with open(config_path, "r") as f: content = f.read() old_line = f"SURVIVAL_DROP_PCT = {config.SURVIVAL_DROP_PCT:.1f}" new_line = f"SURVIVAL_DROP_PCT = {new_pct:.1f}" if old_line not in content: old_line = f"SURVIVAL_DROP_PCT = {int(config.SURVIVAL_DROP_PCT)}" if old_line in content: new_content = content.replace(old_line, new_line, 1) # Write to temp file first, then rename atomically with open(tmp_path, "w") as f: f.write(new_content) os.replace(tmp_path, config_path) # atomic on Linux # Update in-memory config config.SURVIVAL_DROP_PCT = new_pct lines.append( f" ✓ config.py updated: SURVIVAL_DROP_PCT = {new_pct:.1f}" ) log.warning( f"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n" f" ↑ SURVIVAL THRESHOLD AUTO-UPDATED: {new_pct:.0f}%\n" f"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" ) else: lines.append( f" ⚠ Could not auto-update config.py — " f"update manually: SURVIVAL_DROP_PCT = {new_pct:.1f}" ) except Exception as e: # Clean up temp file if it exists try: if os.path.exists(tmp_path): os.remove(tmp_path) except Exception: pass lines.append(f" ⚠ Auto-update failed: {e}") lines.append( f" Update manually in config.py: " f"SURVIVAL_DROP_PCT = {new_pct:.1f}" ) def _find_max_safe_pct(self, s: BotState, all_levels: list) -> float: """ Find maximum drop % the account can currently survive. IMPORTANT: Does NOT just use current orders — simulates the FULL grid that would exist at each drop level. In a real crash, as price falls the bot keeps placing orders on the way down. All of those fill. So we simulate all grid levels from current price down to the floor, not just the 10 orders currently queued. """ if not s.highest_open_price: return 0.0 max_safe = 0.0 for pct in range(10, 81, 5): floor = s.highest_open_price * (1.0 - pct / 100.0) spacing = self.calculator.get_spacing_pct(s.equity) / 100.0 # Build full simulated grid from current price down to floor # These are all the orders that would fill in a crash of this size simulated_levels = [] # Existing open positions always fill for p in s.positions: simulated_levels.append({ "price": pos_level(p), "size": pos_size(p) }) # Simulate all grid levels that would be placed between # lowest open position (or current price if no positions) and floor open_prices = [pos_level(p) for p in s.positions] start_price = min(open_prices) if open_prices else s.current_price level = start_price * (1.0 - spacing) while level >= floor: size = self.calculator.get_size(level, s.highest_open_price) simulated_levels.append({"price": level, "size": size}) level = level * (1.0 - spacing) result = self.calculator.survival_check( s.equity, s.highest_open_price, simulated_levels, s.gbpusd, override_pct=float(pct) ) if result["safe"]: max_safe = float(pct) else: return max_safe if pct > 10 else 0.0 return 80.0 # survived all tested levels def _next_survival_milestone(self, current_pct: float) -> float: for m in self.SURVIVAL_MILESTONES: if m > current_pct: return float(m) return 60.0 def _equity_for_survival(self, s: BotState, all_levels: list, target_pct: float) -> float: """ Estimate equity needed to survive target_pct% drop. Uses full simulated grid — all levels that would fill in a crash. Formula: equity_needed = 0.5 * total_margin - total_unrealised_loss (equity must be above 50% of margin to avoid Capital.com closeout) """ try: spacing = self.calculator.get_spacing_pct(s.equity) full_sim = self.calculator._simulate_grid( s.highest_open_price, spacing, s.gbpusd, all_levels ) drop = target_pct / 100.0 floor = s.highest_open_price * (1.0 - drop) total_margin = sum( lv["price"] * lv["size"] * config.MARGIN_RATE / s.gbpusd for lv in full_sim ) total_loss = sum( (floor - lv["price"]) * lv["size"] / s.gbpusd for lv in full_sim ) # equity_needed + total_loss >= 0.5 * total_margin # equity_needed >= 0.5 * total_margin - total_loss return max((0.5 * total_margin) - total_loss, 0) except Exception: return 0.0