Files
maxbot/fetch_history.py
george 2be8b491d0 Initial commit — TSLA grid trading bot
- Grid strategy with survival-gated spacing and depth
- Full 60% drop simulation for all survival checks
- Calibration report with auto-updating survival threshold
- Transaction history sync from Capital.com
- Dip mode with bottom-two TP rules
2026-05-27 07:02:58 +01:00

492 lines
19 KiB
Python

#!/usr/bin/env python3
"""
fetch_history.py — Capital.com full transaction history fetcher.
================================================================
Fetches ALL account transactions from Capital.com and stores them
in a local SQLite database at data/history.db.
ENDPOINT USED:
--------------
GET /history/transactions
Much richer than /history/activity — returns actual GBP amounts,
deposits, trades, overnight fees, all in one place.
MODES:
------
python3 fetch_history.py --mode dryrun # preview only, no changes
python3 fetch_history.py --mode confirm # asks before starting bulk fetch
python3 fetch_history.py --mode live # fully automatic
FIRST RUN:
----------
Fetches from 28 Jan 2021 (account open date) to today.
~1,900 API calls at 1/second = ~32 minutes.
Progress shown every 10 days. Safe to interrupt — resumes from
last successful day on next run.
SUBSEQUENT RUNS:
----------------
Only fetches since last successful fetch date. Runs in seconds.
Called automatically from calibration report on every bot startup.
DATABASE: data/history.db
--------------------------
Table: transactions
id TEXT PRIMARY KEY (reference field from Capital.com)
date_utc TEXT (ISO datetime)
transaction_type TEXT (TRADE, SWAP, DEPOSIT, WITHDRAWAL,
TRADE_SLIPPAGE_PROTECTION)
note TEXT (e.g. "Trade closed", "Overnight fee")
instrument TEXT (e.g. "TSLA", "US100", empty for deposits)
size_gbp REAL (GBP amount — positive=credit, negative=cost)
currency TEXT (always GBP for this account)
status TEXT (PROCESSED)
raw TEXT (full JSON for future use)
Table: fetch_log
fetch_date TEXT PRIMARY KEY (YYYY-MM-DD)
fetched_at TEXT (when we fetched it)
record_count INTEGER (records found that day)
status TEXT (ok / error)
FINANCIAL LOGIC:
----------------
TRADE size → always positive (GBP profit from closed position)
SWAP size → always negative (overnight fee cost)
DEPOSIT size → always positive (money added to account)
WITHDRAWAL size → negative (money removed)
Net performance = sum(TRADE) + sum(SWAP)
Net deposited = sum(DEPOSIT) + sum(WITHDRAWAL)
Return on capital = net_performance / net_deposited * 100
FUTURE USE:
-----------
Calibration report → real profit/day, real fees, real deposits
ProQuant backtester → full trade history with entry/exit prices
Web dashboard → P&L charts, fee analysis, milestone tracking
"""
import argparse
import json
import logging
import os
import sqlite3
import sys
import time
from datetime import datetime, timedelta, date, timezone
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import config
from client import CapitalClient
# ─────────────────────────────────────────────
# CONSTANTS
# ─────────────────────────────────────────────
ACCOUNT_OPEN_DATE = date(2021, 1, 28)
DB_PATH = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "data", "history.db"
)
LOG_PATH = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "logs", "fetch_history.log"
)
FETCH_DELAY_SECS = 1.1 # slightly over 1s to respect rate limits
# ─────────────────────────────────────────────
# LOGGING
# ─────────────────────────────────────────────
def setup_logging():
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
fmt = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(
level=logging.INFO,
format=fmt,
handlers=[
logging.FileHandler(LOG_PATH),
logging.StreamHandler(sys.stdout),
]
)
log = logging.getLogger("fetch_history")
# ─────────────────────────────────────────────
# DATABASE
# ─────────────────────────────────────────────
class HistoryDB:
def __init__(self, db_path: str):
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row
self._create_tables()
def _create_tables(self):
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS transactions (
id TEXT PRIMARY KEY,
date_utc TEXT NOT NULL,
transaction_type TEXT,
note TEXT,
instrument TEXT,
size_gbp REAL,
currency TEXT,
status TEXT,
raw TEXT
);
CREATE INDEX IF NOT EXISTS idx_tx_date
ON transactions (date_utc);
CREATE INDEX IF NOT EXISTS idx_tx_type
ON transactions (transaction_type);
CREATE INDEX IF NOT EXISTS idx_tx_instrument
ON transactions (instrument);
CREATE TABLE IF NOT EXISTS fetch_log (
fetch_date TEXT PRIMARY KEY,
fetched_at TEXT NOT NULL,
record_count INTEGER DEFAULT 0,
status TEXT DEFAULT 'ok'
);
""")
self.conn.commit()
def date_already_fetched(self, fetch_date: date) -> bool:
row = self.conn.execute(
"SELECT status FROM fetch_log WHERE fetch_date = ? AND status = 'ok'",
(fetch_date.isoformat(),)
).fetchone()
return row is not None
def get_last_fetched_date(self):
row = self.conn.execute(
"SELECT MAX(fetch_date) as d FROM fetch_log WHERE status = 'ok'"
).fetchone()
if row and row["d"]:
return date.fromisoformat(row["d"])
return None
def save_transactions(self, transactions: list) -> int:
"""Insert transaction records. Returns number actually inserted."""
inserted = 0
for t in transactions:
try:
ref = t.get("reference", "")
# Build a unique ID from reference + date to avoid collisions
tx_id = f"{ref}_{t.get('dateUtc', t.get('date', ''))}"
self.conn.execute("""
INSERT OR IGNORE INTO transactions
(id, date_utc, transaction_type, note, instrument,
size_gbp, currency, status, raw)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
tx_id,
t.get("dateUtc", t.get("date", "")),
t.get("transactionType", ""),
t.get("note", ""),
t.get("instrumentName", ""),
float(t.get("size", 0)),
t.get("currency", "GBP"),
t.get("status", ""),
json.dumps(t),
))
if self.conn.execute("SELECT changes()").fetchone()[0] > 0:
inserted += 1
except Exception as e:
log.warning(f"Could not insert transaction: {e}{t}")
self.conn.commit()
return inserted
def log_fetch(self, fetch_date: date, record_count: int, status: str = "ok"):
self.conn.execute("""
INSERT OR REPLACE INTO fetch_log (fetch_date, fetched_at, record_count, status)
VALUES (?, ?, ?, ?)
""", (
fetch_date.isoformat(),
datetime.now(tz=timezone.utc).isoformat(),
record_count,
status,
))
self.conn.commit()
def get_stats(self) -> dict:
"""Return summary statistics from the database."""
def scalar(sql, default=0):
row = self.conn.execute(sql).fetchone()
return row[0] if row and row[0] is not None else default
total = scalar("SELECT COUNT(*) FROM transactions")
trades = scalar("SELECT COUNT(*) FROM transactions WHERE transaction_type='TRADE'")
swaps = scalar("SELECT COUNT(*) FROM transactions WHERE transaction_type='SWAP'")
deposits = scalar("SELECT COUNT(*) FROM transactions WHERE transaction_type='DEPOSIT'")
other = total - trades - swaps - deposits
fetched_days = scalar("SELECT COUNT(*) FROM fetch_log WHERE status='ok'")
first_date = scalar("SELECT MIN(date_utc) FROM transactions", None)
last_date = scalar("SELECT MAX(date_utc) FROM transactions", None)
trade_profit = scalar("SELECT SUM(size_gbp) FROM transactions WHERE transaction_type='TRADE'")
swap_fees = scalar("SELECT SUM(size_gbp) FROM transactions WHERE transaction_type='SWAP'")
total_deposits = scalar("SELECT SUM(size_gbp) FROM transactions WHERE transaction_type='DEPOSIT'")
slippage = scalar("SELECT SUM(size_gbp) FROM transactions WHERE transaction_type='TRADE_SLIPPAGE_PROTECTION'")
# Per-instrument breakdown
instruments = {}
for row in self.conn.execute("""
SELECT instrument, COUNT(*) as c, SUM(size_gbp) as total
FROM transactions
WHERE transaction_type='TRADE' AND instrument != ''
GROUP BY instrument
ORDER BY total DESC
"""):
instruments[row["instrument"]] = {
"trades": row["c"],
"profit": row["total"],
}
# Daily profit average (only days with trades)
daily_avg_row = self.conn.execute("""
SELECT AVG(daily_total) FROM (
SELECT DATE(date_utc) as d, SUM(size_gbp) as daily_total
FROM transactions
WHERE transaction_type='TRADE'
GROUP BY DATE(date_utc)
HAVING daily_total > 0
)
""").fetchone()
daily_avg = daily_avg_row[0] if daily_avg_row and daily_avg_row[0] else 0
return {
"total_records": total,
"trades": trades,
"swaps": swaps,
"deposits": deposits,
"other": other,
"fetched_days": fetched_days,
"first_date": first_date,
"last_date": last_date,
"trade_profit": trade_profit,
"swap_fees": swap_fees,
"total_deposits": total_deposits,
"slippage_refunds": slippage,
"net_performance": trade_profit + swap_fees + slippage,
"instruments": instruments,
"daily_avg_profit": daily_avg,
}
def close(self):
self.conn.close()
# ─────────────────────────────────────────────
# FETCHER
# ─────────────────────────────────────────────
class HistoryFetcher:
def __init__(self, mode: str):
self.mode = mode
self.client = CapitalClient()
self.db = HistoryDB(DB_PATH)
def fetch_day(self, fetch_date: date):
"""
Fetch all transactions for a single day.
Returns list of records, or None on API error.
Capital.com max date range = 1 day.
"""
from_dt = f"{fetch_date.isoformat()}T00:00:00"
to_dt = f"{fetch_date.isoformat()}T23:59:59"
try:
data = self.client._request(
"GET",
f"/history/transactions?from={from_dt}&to={to_dt}"
)
return data.get("transactions", [])
except Exception as e:
log.error(f"Failed to fetch {fetch_date}: {e}")
return None
def print_stats(self):
"""Print current database statistics."""
stats = self.db.get_stats()
log.info("")
log.info(" DATABASE SUMMARY")
log.info(f" Total records: {stats['total_records']}")
log.info(f" Closed trades: {stats['trades']}")
log.info(f" Overnight fees: {stats['swaps']}")
log.info(f" Deposits: {stats['deposits']}")
log.info(f" Other: {stats['other']}")
log.info(f" Days fetched: {stats['fetched_days']}")
if stats['first_date']:
log.info(f" Date range: {stats['first_date'][:10]}{stats['last_date'][:10]}")
log.info("")
log.info(" FINANCIAL SUMMARY")
log.info(f" Total deposited: £{stats['total_deposits']:.2f}")
log.info(f" Trade profit: £{stats['trade_profit']:.2f}")
log.info(f" Overnight fees: £{stats['swap_fees']:.2f}")
log.info(f" Slippage refunds: £{stats['slippage_refunds']:.2f}")
log.info(f" Net performance: £{stats['net_performance']:.2f}")
if stats['total_deposits'] > 0:
roi = stats['net_performance'] / stats['total_deposits'] * 100
log.info(f" Return on capital: {roi:.1f}%")
if stats['daily_avg_profit'] > 0:
log.info(f" Avg profit/day: £{stats['daily_avg_profit']:.2f} (days with trades)")
log.info("")
log.info(" BY INSTRUMENT")
for instrument, data in stats['instruments'].items():
log.info(f" {instrument:12} {data['trades']:4} trades £{data['profit']:.2f}")
def run(self):
log.info("=" * 60)
log.info(f" History Fetcher — {self.mode.upper()} MODE")
log.info(f" Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
log.info("=" * 60)
log.info("Connecting to Capital.com...")
try:
self.client.create_session()
except Exception as e:
log.error(f"Connection failed: {e}")
sys.exit(1)
log.info("Connected.")
# Determine start date
today = date.today()
last = self.db.get_last_fetched_date()
if last:
start = last + timedelta(days=1)
log.info(f"Resuming from {start} (last fetched: {last})")
else:
start = ACCOUNT_OPEN_DATE
log.info(f"First run — fetching from {start}")
total_days = max((today - start).days + 1, 0)
estimated_mins = total_days * FETCH_DELAY_SECS / 60
# Show current DB state
existing_stats = self.db.get_stats()
if existing_stats["total_records"] > 0:
log.info(
f"Database has {existing_stats['total_records']} records "
f"({existing_stats['fetched_days']} days already fetched)"
)
log.info(f"Days to fetch: {total_days}")
log.info(f"Est. time: {estimated_mins:.0f} minutes")
if self.mode == "dryrun":
log.info("\n[DRY RUN] No data will be fetched or stored.")
self.print_stats()
self.db.close()
return
if total_days == 0:
log.info("Already up to date.")
self.print_stats()
self.db.close()
return
# Confirm in confirm mode
if self.mode == "confirm":
print(f"\n Fetch {total_days} days from {start} to {today}")
print(f" Estimated time: {estimated_mins:.0f} minutes")
answer = input(" Start? (y/n): ").strip().lower()
if answer != "y":
log.info("Aborted.")
self.db.close()
return
# ── Main fetch loop ──
current = start
total_inserted = 0
days_done = 0
days_empty = 0
log.info("\nFetching...")
while current <= today:
# Skip already fetched
if self.db.date_already_fetched(current):
current += timedelta(days=1)
continue
# Progress every 10 days
if days_done % 10 == 0 and days_done > 0:
pct = (days_done / max(total_days, 1)) * 100
log.info(
f" {days_done}/{total_days} days ({pct:.0f}%) — "
f"{total_inserted} records inserted"
)
transactions = self.fetch_day(current)
if transactions is None:
self.db.log_fetch(current, 0, "error")
current += timedelta(days=1)
days_done += 1
time.sleep(FETCH_DELAY_SECS)
continue
count = len(transactions)
if count > 0:
inserted = self.db.save_transactions(transactions)
total_inserted += inserted
else:
days_empty += 1
self.db.log_fetch(current, count)
days_done += 1
current += timedelta(days=1)
time.sleep(FETCH_DELAY_SECS)
# ── Final summary ──
log.info("\n" + "=" * 60)
log.info(" FETCH COMPLETE")
log.info("=" * 60)
log.info(f" Days fetched: {days_done}")
log.info(f" Days empty: {days_empty}")
log.info(f" Records added: {total_inserted}")
self.print_stats()
self.db.close()
# ─────────────────────────────────────────────
# ENTRY POINT
# ─────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Capital.com Transaction History Fetcher",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Modes:
dryrun Show current DB state. No API calls beyond login. No writes.
confirm Ask before starting bulk fetch. Use for first run.
live Fully automatic. Use for incremental/scheduled runs.
To start fresh (recommended after switching from old activity endpoint):
rm ~/maxbot/data/history.db
python3 fetch_history.py --mode confirm
Database: data/history.db
Log: logs/fetch_history.log
"""
)
parser.add_argument(
"--mode",
choices=["dryrun", "confirm", "live"],
default="dryrun",
)
args = parser.parse_args()
setup_logging()
HistoryFetcher(mode=args.mode).run()
if __name__ == "__main__":
main()