#!/usr/bin/env python3 """ fetch_history.py — Capital.com full transaction history fetcher. ================================================================ Fetches ALL account transactions from Capital.com and stores them in a local SQLite database at data/history.db. ENDPOINT USED: -------------- GET /history/transactions Much richer than /history/activity — returns actual GBP amounts, deposits, trades, overnight fees, all in one place. MODES: ------ python3 fetch_history.py --mode dryrun # preview only, no changes python3 fetch_history.py --mode confirm # asks before starting bulk fetch python3 fetch_history.py --mode live # fully automatic FIRST RUN: ---------- Fetches from 28 Jan 2021 (account open date) to today. ~1,900 API calls at 1/second = ~32 minutes. Progress shown every 10 days. Safe to interrupt — resumes from last successful day on next run. SUBSEQUENT RUNS: ---------------- Only fetches since last successful fetch date. Runs in seconds. Called automatically from calibration report on every bot startup. DATABASE: data/history.db -------------------------- Table: transactions id TEXT PRIMARY KEY (reference field from Capital.com) date_utc TEXT (ISO datetime) transaction_type TEXT (TRADE, SWAP, DEPOSIT, WITHDRAWAL, TRADE_SLIPPAGE_PROTECTION) note TEXT (e.g. "Trade closed", "Overnight fee") instrument TEXT (e.g. "TSLA", "US100", empty for deposits) size_gbp REAL (GBP amount — positive=credit, negative=cost) currency TEXT (always GBP for this account) status TEXT (PROCESSED) raw TEXT (full JSON for future use) Table: fetch_log fetch_date TEXT PRIMARY KEY (YYYY-MM-DD) fetched_at TEXT (when we fetched it) record_count INTEGER (records found that day) status TEXT (ok / error) FINANCIAL LOGIC: ---------------- TRADE size → always positive (GBP profit from closed position) SWAP size → always negative (overnight fee cost) DEPOSIT size → always positive (money added to account) WITHDRAWAL size → negative (money removed) Net performance = sum(TRADE) + sum(SWAP) Net deposited = sum(DEPOSIT) + sum(WITHDRAWAL) Return on capital = net_performance / net_deposited * 100 FUTURE USE: ----------- Calibration report → real profit/day, real fees, real deposits ProQuant backtester → full trade history with entry/exit prices Web dashboard → P&L charts, fee analysis, milestone tracking """ import argparse import json import logging import os import sqlite3 import sys import time from datetime import datetime, timedelta, date, timezone sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) import config from client import CapitalClient # ───────────────────────────────────────────── # CONSTANTS # ───────────────────────────────────────────── ACCOUNT_OPEN_DATE = date(2021, 1, 28) DB_PATH = os.path.join( os.path.dirname(os.path.abspath(__file__)), "data", "history.db" ) LOG_PATH = os.path.join( os.path.dirname(os.path.abspath(__file__)), "logs", "fetch_history.log" ) FETCH_DELAY_SECS = 1.1 # slightly over 1s to respect rate limits # ───────────────────────────────────────────── # LOGGING # ───────────────────────────────────────────── def setup_logging(): os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True) fmt = "%(asctime)s [%(levelname)s] %(message)s" logging.basicConfig( level=logging.INFO, format=fmt, handlers=[ logging.FileHandler(LOG_PATH), logging.StreamHandler(sys.stdout), ] ) log = logging.getLogger("fetch_history") # ───────────────────────────────────────────── # DATABASE # ───────────────────────────────────────────── class HistoryDB: def __init__(self, db_path: str): os.makedirs(os.path.dirname(db_path), exist_ok=True) self.conn = sqlite3.connect(db_path) self.conn.row_factory = sqlite3.Row self._create_tables() def _create_tables(self): self.conn.executescript(""" CREATE TABLE IF NOT EXISTS transactions ( id TEXT PRIMARY KEY, date_utc TEXT NOT NULL, transaction_type TEXT, note TEXT, instrument TEXT, size_gbp REAL, currency TEXT, status TEXT, raw TEXT ); CREATE INDEX IF NOT EXISTS idx_tx_date ON transactions (date_utc); CREATE INDEX IF NOT EXISTS idx_tx_type ON transactions (transaction_type); CREATE INDEX IF NOT EXISTS idx_tx_instrument ON transactions (instrument); CREATE TABLE IF NOT EXISTS fetch_log ( fetch_date TEXT PRIMARY KEY, fetched_at TEXT NOT NULL, record_count INTEGER DEFAULT 0, status TEXT DEFAULT 'ok' ); """) self.conn.commit() def date_already_fetched(self, fetch_date: date) -> bool: row = self.conn.execute( "SELECT status FROM fetch_log WHERE fetch_date = ? AND status = 'ok'", (fetch_date.isoformat(),) ).fetchone() return row is not None def get_last_fetched_date(self): row = self.conn.execute( "SELECT MAX(fetch_date) as d FROM fetch_log WHERE status = 'ok'" ).fetchone() if row and row["d"]: return date.fromisoformat(row["d"]) return None def save_transactions(self, transactions: list) -> int: """Insert transaction records. Returns number actually inserted.""" inserted = 0 for t in transactions: try: ref = t.get("reference", "") # Build a unique ID from reference + date to avoid collisions tx_id = f"{ref}_{t.get('dateUtc', t.get('date', ''))}" self.conn.execute(""" INSERT OR IGNORE INTO transactions (id, date_utc, transaction_type, note, instrument, size_gbp, currency, status, raw) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( tx_id, t.get("dateUtc", t.get("date", "")), t.get("transactionType", ""), t.get("note", ""), t.get("instrumentName", ""), float(t.get("size", 0)), t.get("currency", "GBP"), t.get("status", ""), json.dumps(t), )) if self.conn.execute("SELECT changes()").fetchone()[0] > 0: inserted += 1 except Exception as e: log.warning(f"Could not insert transaction: {e} — {t}") self.conn.commit() return inserted def log_fetch(self, fetch_date: date, record_count: int, status: str = "ok"): self.conn.execute(""" INSERT OR REPLACE INTO fetch_log (fetch_date, fetched_at, record_count, status) VALUES (?, ?, ?, ?) """, ( fetch_date.isoformat(), datetime.now(tz=timezone.utc).isoformat(), record_count, status, )) self.conn.commit() def get_stats(self) -> dict: """Return summary statistics from the database.""" def scalar(sql, default=0): row = self.conn.execute(sql).fetchone() return row[0] if row and row[0] is not None else default total = scalar("SELECT COUNT(*) FROM transactions") trades = scalar("SELECT COUNT(*) FROM transactions WHERE transaction_type='TRADE'") swaps = scalar("SELECT COUNT(*) FROM transactions WHERE transaction_type='SWAP'") deposits = scalar("SELECT COUNT(*) FROM transactions WHERE transaction_type='DEPOSIT'") other = total - trades - swaps - deposits fetched_days = scalar("SELECT COUNT(*) FROM fetch_log WHERE status='ok'") first_date = scalar("SELECT MIN(date_utc) FROM transactions", None) last_date = scalar("SELECT MAX(date_utc) FROM transactions", None) trade_profit = scalar("SELECT SUM(size_gbp) FROM transactions WHERE transaction_type='TRADE'") swap_fees = scalar("SELECT SUM(size_gbp) FROM transactions WHERE transaction_type='SWAP'") total_deposits = scalar("SELECT SUM(size_gbp) FROM transactions WHERE transaction_type='DEPOSIT'") slippage = scalar("SELECT SUM(size_gbp) FROM transactions WHERE transaction_type='TRADE_SLIPPAGE_PROTECTION'") # Per-instrument breakdown instruments = {} for row in self.conn.execute(""" SELECT instrument, COUNT(*) as c, SUM(size_gbp) as total FROM transactions WHERE transaction_type='TRADE' AND instrument != '' GROUP BY instrument ORDER BY total DESC """): instruments[row["instrument"]] = { "trades": row["c"], "profit": row["total"], } # Daily profit average (only days with trades) daily_avg_row = self.conn.execute(""" SELECT AVG(daily_total) FROM ( SELECT DATE(date_utc) as d, SUM(size_gbp) as daily_total FROM transactions WHERE transaction_type='TRADE' GROUP BY DATE(date_utc) HAVING daily_total > 0 ) """).fetchone() daily_avg = daily_avg_row[0] if daily_avg_row and daily_avg_row[0] else 0 return { "total_records": total, "trades": trades, "swaps": swaps, "deposits": deposits, "other": other, "fetched_days": fetched_days, "first_date": first_date, "last_date": last_date, "trade_profit": trade_profit, "swap_fees": swap_fees, "total_deposits": total_deposits, "slippage_refunds": slippage, "net_performance": trade_profit + swap_fees + slippage, "instruments": instruments, "daily_avg_profit": daily_avg, } def close(self): self.conn.close() # ───────────────────────────────────────────── # FETCHER # ───────────────────────────────────────────── class HistoryFetcher: def __init__(self, mode: str): self.mode = mode self.client = CapitalClient() self.db = HistoryDB(DB_PATH) def fetch_day(self, fetch_date: date): """ Fetch all transactions for a single day. Returns list of records, or None on API error. Capital.com max date range = 1 day. """ from_dt = f"{fetch_date.isoformat()}T00:00:00" to_dt = f"{fetch_date.isoformat()}T23:59:59" try: data = self.client._request( "GET", f"/history/transactions?from={from_dt}&to={to_dt}" ) return data.get("transactions", []) except Exception as e: log.error(f"Failed to fetch {fetch_date}: {e}") return None def print_stats(self): """Print current database statistics.""" stats = self.db.get_stats() log.info("") log.info(" DATABASE SUMMARY") log.info(f" Total records: {stats['total_records']}") log.info(f" Closed trades: {stats['trades']}") log.info(f" Overnight fees: {stats['swaps']}") log.info(f" Deposits: {stats['deposits']}") log.info(f" Other: {stats['other']}") log.info(f" Days fetched: {stats['fetched_days']}") if stats['first_date']: log.info(f" Date range: {stats['first_date'][:10]} → {stats['last_date'][:10]}") log.info("") log.info(" FINANCIAL SUMMARY") log.info(f" Total deposited: £{stats['total_deposits']:.2f}") log.info(f" Trade profit: £{stats['trade_profit']:.2f}") log.info(f" Overnight fees: £{stats['swap_fees']:.2f}") log.info(f" Slippage refunds: £{stats['slippage_refunds']:.2f}") log.info(f" Net performance: £{stats['net_performance']:.2f}") if stats['total_deposits'] > 0: roi = stats['net_performance'] / stats['total_deposits'] * 100 log.info(f" Return on capital: {roi:.1f}%") if stats['daily_avg_profit'] > 0: log.info(f" Avg profit/day: £{stats['daily_avg_profit']:.2f} (days with trades)") log.info("") log.info(" BY INSTRUMENT") for instrument, data in stats['instruments'].items(): log.info(f" {instrument:12} {data['trades']:4} trades £{data['profit']:.2f}") def run(self): log.info("=" * 60) log.info(f" History Fetcher — {self.mode.upper()} MODE") log.info(f" Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") log.info("=" * 60) log.info("Connecting to Capital.com...") try: self.client.create_session() except Exception as e: log.error(f"Connection failed: {e}") sys.exit(1) log.info("Connected.") # Determine start date today = date.today() last = self.db.get_last_fetched_date() if last: start = last + timedelta(days=1) log.info(f"Resuming from {start} (last fetched: {last})") else: start = ACCOUNT_OPEN_DATE log.info(f"First run — fetching from {start}") total_days = max((today - start).days + 1, 0) estimated_mins = total_days * FETCH_DELAY_SECS / 60 # Show current DB state existing_stats = self.db.get_stats() if existing_stats["total_records"] > 0: log.info( f"Database has {existing_stats['total_records']} records " f"({existing_stats['fetched_days']} days already fetched)" ) log.info(f"Days to fetch: {total_days}") log.info(f"Est. time: {estimated_mins:.0f} minutes") if self.mode == "dryrun": log.info("\n[DRY RUN] No data will be fetched or stored.") self.print_stats() self.db.close() return if total_days == 0: log.info("Already up to date.") self.print_stats() self.db.close() return # Confirm in confirm mode if self.mode == "confirm": print(f"\n Fetch {total_days} days from {start} to {today}") print(f" Estimated time: {estimated_mins:.0f} minutes") answer = input(" Start? (y/n): ").strip().lower() if answer != "y": log.info("Aborted.") self.db.close() return # ── Main fetch loop ── current = start total_inserted = 0 days_done = 0 days_empty = 0 log.info("\nFetching...") while current <= today: # Skip already fetched if self.db.date_already_fetched(current): current += timedelta(days=1) continue # Progress every 10 days if days_done % 10 == 0 and days_done > 0: pct = (days_done / max(total_days, 1)) * 100 log.info( f" {days_done}/{total_days} days ({pct:.0f}%) — " f"{total_inserted} records inserted" ) transactions = self.fetch_day(current) if transactions is None: self.db.log_fetch(current, 0, "error") current += timedelta(days=1) days_done += 1 time.sleep(FETCH_DELAY_SECS) continue count = len(transactions) if count > 0: inserted = self.db.save_transactions(transactions) total_inserted += inserted else: days_empty += 1 self.db.log_fetch(current, count) days_done += 1 current += timedelta(days=1) time.sleep(FETCH_DELAY_SECS) # ── Final summary ── log.info("\n" + "=" * 60) log.info(" FETCH COMPLETE") log.info("=" * 60) log.info(f" Days fetched: {days_done}") log.info(f" Days empty: {days_empty}") log.info(f" Records added: {total_inserted}") self.print_stats() self.db.close() # ───────────────────────────────────────────── # ENTRY POINT # ───────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="Capital.com Transaction History Fetcher", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Modes: dryrun Show current DB state. No API calls beyond login. No writes. confirm Ask before starting bulk fetch. Use for first run. live Fully automatic. Use for incremental/scheduled runs. To start fresh (recommended after switching from old activity endpoint): rm ~/maxbot/data/history.db python3 fetch_history.py --mode confirm Database: data/history.db Log: logs/fetch_history.log """ ) parser.add_argument( "--mode", choices=["dryrun", "confirm", "live"], default="dryrun", ) args = parser.parse_args() setup_logging() HistoryFetcher(mode=args.mode).run() if __name__ == "__main__": main()