Files

334 lines
13 KiB
Python

"""
grid.py — Core grid management logic.
======================================
This is the brain of the bot. Every loop it:
1. Manages the bottom two open positions (TP rules)
2. Audits the order grid for gaps and fills them
3. Cancels orders that fall outside the active window
THE GAP-FILLING FIX (key improvement over v1):
-----------------------------------------------
Old behaviour: bot anchored from lowest existing order and added to bottom.
Problem: if top order filled or was cancelled, gap was never filled at top.
Result: grid drifted downward over time, missing fills near current price.
New behaviour:
1. Calculate ALL expected grid levels from top (below lowest position)
down to bottom (survival floor).
2. Compare expected levels against ALL existing orders.
3. Place orders for any missing level, anywhere in the window.
4. Result: gaps are filled wherever they occur, not just at the bottom.
BOTTOM TWO POSITION RULE:
--------------------------
Lowest open position → TP DISABLED (hold until manual close)
Second lowest → TP ENABLED at standard level
Rationale: the lowest position represents a deep dip buy.
We don't know when price will recover to take profit from there.
Holding it open means maximum profit on recovery.
When a new even-lower position opens, the previous lowest gets its TP
re-enabled because recovery to that level is now more likely.
ORDER WINDOW:
-------------
Top of window = just below lowest open position entry price
Bottom of window = highest_open * (1 - survival_drop_pct / 100)
Orders outside this window are cancelled.
Orders above window_top are too close to open positions (confusing).
Orders below floor are beyond our risk tolerance.
"""
import logging
import time
from state import BotState, pos_level, pos_size, pos_id, pos_tp, ord_level, ord_id
from calculator import GridCalculator
from actions import ActionHandler
import config
log = logging.getLogger("maxbot.grid")
class GridManager:
def __init__(self, state: BotState, calculator: GridCalculator,
actions: ActionHandler):
self.state = state
self.calculator = calc = calculator
self.actions = actions
# Track deal IDs we have already attempted to disable TP on.
# Capital.com may not support removing profitLevel via API,
# so we avoid spamming the same call every loop.
self._tp_disabled_ids: set = set()
# ─────────────────────────────────────────
# BOTTOM TWO POSITIONS
# ─────────────────────────────────────────
def manage_bottom_two_positions(self):
"""
Enforce TP rules on the two lowest open positions.
Only activates when price has dropped 25%+ from highest open position.
RATIONALE:
If price is near the top (within 25% of highest open), positions are
likely to recover quickly and the standard TP will close them normally.
The lowest/second-lowest TP rule is only meaningful when price has
dropped significantly and we want to hold the lowest position for
maximum recovery rather than a small TP.
"""
s = self.state
sorted_pos = s.positions_sorted_asc()
if not sorted_pos:
log.info("No open positions.")
return
# ── Check 25% drop threshold ──
threshold_price = s.highest_open_price * 0.75
if s.current_price > threshold_price:
log.info(
f"Price ${s.current_price:.2f} is within 25% of highest open "
f"${s.highest_open_price:.2f} (threshold ${threshold_price:.2f}) "
f"— TP rules not active."
)
return
# ── Lowest position: TP must be DISABLED ──
lowest = sorted_pos[0]
l_id = pos_id(lowest)
l_price = pos_level(lowest)
l_tp = pos_tp(lowest)
if l_id in self._tp_disabled_ids:
log.info(f"Lowest position ${l_price:.2f} — TP disable already attempted. ✓")
elif l_tp is not None:
log.info(
f"Lowest position ${l_price:.2f} has TP ${l_tp:.2f} — disabling."
)
self.actions.disable_tp(l_id, l_price)
self._tp_disabled_ids.add(l_id)
else:
log.info(f"Lowest position ${l_price:.2f} — TP disabled. ✓")
self._tp_disabled_ids.add(l_id)
# Clear IDs that are no longer the lowest position
# (position closed or new lower opened)
current_ids = {pos_id(p) for p in sorted_pos}
self._tp_disabled_ids &= current_ids
# ── Second lowest position: TP must be ENABLED ──
if len(sorted_pos) < 2:
return
second = sorted_pos[1]
s_id = pos_id(second)
s_price = pos_level(second)
s_tp = pos_tp(second)
s_size = pos_size(second)
expected_tp = self.calculator.get_tp_price(s_price, s_size, s.gbpusd)
if s_tp is None:
log.info(
f"Second lowest ${s_price:.2f} has no TP — "
f"enabling at ${expected_tp:.2f}."
)
self.actions.enable_tp(s_id, s_price, expected_tp)
else:
log.info(
f"Second lowest ${s_price:.2f} — TP at ${s_tp:.2f}. ✓"
)
# ─────────────────────────────────────────
# ORDER GRID MANAGEMENT
# ─────────────────────────────────────────
def manage_orders(self):
"""
Main order management function. Called every loop.
NORMAL MODE (price within 25% of highest open):
- Anchor = lowest open position
- Grid fills below anchor down to floor
- depth orders maintained
DIP MODE (price dropped 25%+ from highest open):
- Lowest open position has TP removed (manual hold) — ignored as anchor
- Anchor = second lowest open position (lowest with TP)
- Grid fills below anchor down to floor
- ALSO fills gaps between no-TP position and anchor if spacing fits
- No orders placed at or above anchor
- No orders placed at or above the no-TP position level
Grid floor is always highest_open * 0.40 (60% drop), fixed.
"""
s = self.state
calc = self.calculator
equity = s.equity
# Get survival-gated spacing and depth
# Spacing first, then depth (depth requires spacing already tightened)
spacing_pct = calc.get_spacing_pct(
equity, s.highest_open_price, s.all_levels(), s.gbpusd
)
depth = calc.get_queue_depth(
equity, spacing_pct, s.highest_open_price,
s.all_levels(), s.gbpusd
)
GRID_FLOOR_PCT = 60.0
floor = s.highest_open_price * (1.0 - GRID_FLOOR_PCT / 100.0)
sorted_pos = s.positions_sorted_asc()
open_prices = [pos_level(p) for p in sorted_pos]
# ── Determine mode and anchor ──
dip_threshold = s.highest_open_price * 0.75 # 25% drop
in_dip_mode = s.current_price <= dip_threshold
no_tp_price = None # price of the no-TP (manual hold) position
if not open_prices:
anchor = s.current_price
log.info(f"No open positions — anchoring to current price ${anchor:.2f}")
elif in_dip_mode and len(sorted_pos) >= 2:
# Dip mode: anchor to second lowest (lowest with TP)
no_tp_price = pos_level(sorted_pos[0]) # lowest — no TP
anchor = pos_level(sorted_pos[1]) # second lowest — has TP
log.info(
f"DIP MODE — anchor: ${anchor:.2f} (second lowest, has TP) | "
f"no-TP position: ${no_tp_price:.2f} (manual hold)"
)
else:
# Normal mode: anchor to lowest open position
anchor = min(open_prices)
log.info(f"NORMAL MODE — anchor: ${anchor:.2f} (lowest open position)")
log.info(
f"Floor: ${floor:.2f} | depth: {depth} | "
f"spacing: {spacing_pct}% (survival-gated)"
)
# ── Generate target slots ──
# Main grid: below anchor down to floor
main_levels = calc.generate_grid(anchor, floor, equity, spacing_pct)
if not main_levels:
log.info("No grid levels fit within current window.")
return
# In dip mode: also generate levels in the gap between
# no-TP position and anchor (if gap is large enough)
gap_levels = []
if in_dip_mode and no_tp_price is not None:
spacing = spacing_pct / 100.0
# Generate levels from just below anchor down to just above no-TP
level = anchor * (1.0 - spacing)
while level > no_tp_price * (1.0 + spacing):
gap_levels.append(round(level, 2))
level = level * (1.0 - spacing)
if gap_levels:
log.info(
f"Gap levels between ${no_tp_price:.2f} and "
f"${anchor:.2f}: {[f'${l:.2f}' for l in gap_levels]}"
)
# Combined target: gap levels + main grid, capped at depth
all_target_levels = (gap_levels + main_levels)
target_slots = all_target_levels[:depth]
if target_slots:
log.info(
f"Target slots: ${target_slots[-1]:.2f} → ${target_slots[0]:.2f}"
)
# ── Scan existing orders — cancel any not matching targets ──
current_orders = {round(ord_level(o), 2): o for o in s.orders}
existing_prices = set(current_orders.keys())
orders_to_cancel = []
for price in sorted(existing_prices):
# Never keep orders at or above anchor
if price >= anchor:
orders_to_cancel.append(price)
continue
# In dip mode: never keep orders at or above no-TP position
if no_tp_price and price >= no_tp_price:
orders_to_cancel.append(price)
continue
# Cancel if not matching any target slot
matches = any(
abs(price - slot) / slot < 0.005
for slot in target_slots
)
if not matches:
orders_to_cancel.append(price)
if orders_to_cancel:
log.info(
f"Cancelling {len(orders_to_cancel)} order(s) "
f"not matching grid: "
f"{[f'${p:.2f}' for p in orders_to_cancel]}"
)
for price in orders_to_cancel:
oid = ord_id(current_orders[price])
self.actions.cancel_order(oid, price)
existing_prices.discard(price)
time.sleep(0.15)
# ── Find and fill gaps ──
missing = calc.find_missing_levels(
target_slots, existing_prices, depth
)
if not missing:
log.info(
f"Grid healthy — {len(existing_prices)}/{depth} "
f"orders in place."
)
return
log.info(
f"Filling {len(missing)} gap(s): "
f"{[f'${p:.2f}' for p in missing]}"
)
# Run survival check ONCE before placing any orders.
# No need to repeat for each order — the grid doesn't change
# between placements in the same loop.
full_sim = calc._simulate_grid(
s.highest_open_price, spacing_pct, s.gbpusd, s.all_levels()
)
survival = calc.survival_check(
s.equity, s.highest_open_price, full_sim, s.gbpusd,
override_pct=60.0
)
if not survival["safe"]:
log.warning(
f"Survival check failed — no orders placed. "
f"Margin at floor: {survival['margin_level_at_floor_pct']:.1f}%"
)
return
for target_price in missing:
# Safety: never place at or above current price
if target_price >= s.current_price:
log.info(f"Skipping ${target_price:.2f} — at/above current price.")
continue
# Safety: never place at or above anchor
if target_price >= anchor:
log.info(f"Skipping ${target_price:.2f} — at/above anchor ${anchor:.2f}.")
continue
# Safety: never place at or above no-TP position in dip mode
if no_tp_price and target_price >= no_tp_price:
log.info(f"Skipping ${target_price:.2f} — at/above no-TP position.")
continue
size = calc.get_size(target_price, s.highest_open_price)
tp_price = calc.get_tp_price(target_price, size, s.gbpusd)
self.actions.place_limit_order(size, target_price, tp_price)
existing_prices.add(target_price)
time.sleep(0.15)