XGBoost ensemble strategy with volatility regime detection. Production-grade ML pipeline with proper train/test split and feature engineering.
This is a complete, production-ready ML strategy using ensemble learning with volatility regime detection. For each volatility regime, several XGBoost models plus one Gaussian Naive Bayes model are combined by soft voting, and a per-regime decision threshold is chosen by maximizing the F1 score.
from typing import Any, Dict, Optional, Tuple
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.ensemble import VotingClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import f1_score
from rlxbt import Strategy, Backtester, rlx
class EnsembleRegimeMLStrategy(Strategy):
    """Soft-voting ensemble trained separately per volatility regime.

    Training rows are split into four regimes by the quartiles of the
    ``price_volatility`` feature. For every regime with enough rows, a
    ``VotingClassifier`` (five XGBoost variants plus a Gaussian Naive Bayes
    model) is fitted on a binary "top returns" label, and a probability
    cut-off is selected by F1 score.
    """

    # Regime labels, ordered from the lowest to the highest volatility quartile.
    _REGIMES = ("regime_1", "regime_2", "regime_3", "regime_4")
    # Regimes with fewer training rows than this are skipped.
    _MIN_REGIME_SAMPLES = 200
    # Rows whose forward return exceeds this quantile are labelled positive
    # (roughly the top 29% of returns inside the regime).
    _POSITIVE_QUANTILE = 0.714

    def __init__(
        self, regime_threshold_method: str = "quartiles", horizon_bars: int = 60
    ):
        """Set up an untrained strategy.

        Args:
            regime_threshold_method: Name of the regime-splitting scheme. It is
                stored on the instance; ``train_model`` below always splits by
                quartiles and does not read it.
            horizon_bars: Number of bars used both for the ATR window and for
                the forward-return target in ``calculate_features``.
        """
        super().__init__()
        self.name = "Ensemble Regime ML Strategy"
        self.description = "Multi-model ensemble with volatility regime detection"
        self.regime_threshold_method = regime_threshold_method
        self.regime_models: Dict[str, Any] = {}
        self.horizon_bars = int(horizon_bars)
        self.feature_columns = [
            "ret_5m", "ret_15m", "price_volatility",
            "bar_range_pct", "atr_h_pct", "ema5_slope_5",
            "rsi_5", "vwap_dev_15",
        ]
        self.regime_decision_thresholds: Dict[str, float] = {}
        self.global_model = None
        self.global_threshold = 0.5
        self.is_trained = False
        # (q1, q2, q3) of price_volatility, filled in by train_model.
        self.vol_thresholds: Optional[Tuple[float, float, float]] = None

    def create_small_ensemble_models(self):
        """Return a compact yet diverse list of ``(name, unfitted_estimator)`` pairs."""
        # Compact XGB variants with different hyperparams
        xgb_configs = [
            dict(n_estimators=600, learning_rate=0.05, max_depth=4,
                 subsample=0.9, colsample_bytree=0.9, random_state=1),
            dict(n_estimators=800, learning_rate=0.03, max_depth=5,
                 subsample=0.85, colsample_bytree=0.85, random_state=2),
            dict(n_estimators=400, learning_rate=0.08, max_depth=3,
                 subsample=1.0, colsample_bytree=0.8, random_state=3),
            dict(n_estimators=700, learning_rate=0.04, max_depth=6,
                 subsample=0.9, colsample_bytree=1.0, random_state=4),
            dict(n_estimators=500, learning_rate=0.06, max_depth=4,
                 subsample=0.95, colsample_bytree=0.9, random_state=5),
        ]
        models = [
            (
                f"xgb_{i}",
                xgb.XGBClassifier(
                    **cfg, eval_metric="logloss", n_jobs=-1,
                    tree_method="hist", objective="binary:logistic",
                ),
            )
            for i, cfg in enumerate(xgb_configs)
        ]
        # Add one very simple model for diversity
        models.append(("gnb", GaussianNB()))
        return models

    def calculate_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the model features and the forward-return target.

        Args:
            df: Bar data with ``close``, ``high`` and ``low`` columns and an
                optional ``volume`` column.

        Returns:
            A frame indexed like *df* holding the eight columns listed in
            ``self.feature_columns`` plus ``close_next`` and ``return_next``
            (the target, in percent). Leading/trailing rows are NaN where a
            rolling window or the forward shift is incomplete.
        """
        features = pd.DataFrame(index=df.index)
        close = df["close"]
        high = df["high"]
        low = df["low"]
        vol = df.get("volume", pd.Series(index=df.index, dtype=float))
        prev_close = close.shift(1)

        # Short-horizon returns: previous close relative to the close 5 / 15
        # bars back.
        features["ret_5m"] = (prev_close - close.shift(5)) / close.shift(5) * 100
        features["ret_15m"] = (prev_close - close.shift(15)) / close.shift(15) * 100

        # Micro volatility (15 bars): coefficient of variation of lagged closes.
        roll_std_15 = prev_close.rolling(window=15).std()
        roll_mean_15 = prev_close.rolling(window=15).mean()
        features["price_volatility"] = (roll_std_15 / roll_mean_15) * 100

        # One-bar range as % of price
        features["bar_range_pct"] = (high.shift(1) - low.shift(1)) / prev_close * 100

        # ATR over horizon_bars as % of price
        tr1 = (high - low).abs()
        tr2 = (high - prev_close).abs()
        tr3 = (low - prev_close).abs()
        tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
        atr_h = tr.rolling(window=self.horizon_bars).mean()
        features["atr_h_pct"] = (atr_h / close) * 100

        # EMA(5) slope over last 5 bars
        ema5 = close.ewm(span=5, adjust=False).mean()
        features["ema5_slope_5"] = (ema5.shift(1) - ema5.shift(6)) / ema5.shift(6) * 100

        # RSI(5). Written as 100 * gain / (gain + loss), which equals
        # 100 - 100 / (1 + gain / loss) but stays defined (= 100) when a window
        # has no losses; the ratio form turned those windows into NaN. Windows
        # with no movement at all (gain == loss == 0) remain NaN.
        delta = close.diff()
        avg_gain = delta.clip(lower=0).rolling(window=5).mean()
        avg_loss = (-delta.clip(upper=0)).rolling(window=5).mean()
        total_move = (avg_gain + avg_loss).replace(0, np.nan)
        features["rsi_5"] = (100 * avg_gain / total_move).shift(1)

        # VWAP deviation
        if not vol.isna().all():
            tp = (high + low + close) / 3.0
            # Zero traded volume over the window gives NaN rather than inf.
            vol_15 = vol.rolling(window=15).sum().replace(0, np.nan)
            tpv_15 = (tp * vol).rolling(window=15).sum()
            vwap_prev = (tpv_15 / vol_15).shift(1)
            features["vwap_dev_15"] = (prev_close - vwap_prev) / vwap_prev * 100
        else:
            features["vwap_dev_15"] = 0.0

        # Target
        features["close_next"] = close.shift(-self.horizon_bars)
        features["return_next"] = (features["close_next"] - close) / close * 100
        return features

    def train_model(self, X_train: pd.DataFrame, y_train: pd.Series) -> Dict[str, Any]:
        """Fit one voting ensemble and one decision threshold per volatility regime.

        Args:
            X_train: Training features; must contain ``price_volatility``.
                Every column of this frame is passed to the models.
            y_train: Forward returns for the same rows as *X_train*.

        Returns:
            A summary dict with ``"vol_thresholds"`` (the three quartile
            boundaries) and ``"regimes"``, mapping each trained regime to its
            ``n_samples``, ``decision_threshold`` and ``train_f1``.

        Side effects:
            Replaces ``self.regime_models`` / ``self.regime_decision_thresholds``
            contents, sets ``self.vol_thresholds`` and ``self.is_trained``.
        """
        # Drop results of any earlier fit so a regime skipped this time cannot
        # keep a stale model paired with the new quartile boundaries.
        self.regime_models.clear()
        self.regime_decision_thresholds.clear()

        # Determine regimes by volatility quartiles
        vol = X_train["price_volatility"]
        q1 = vol.quantile(0.25)
        q2 = vol.quantile(0.50)
        q3 = vol.quantile(0.75)
        self.vol_thresholds = (q1, q2, q3)

        # Vectorised regime assignment; rows with NaN volatility fail every
        # comparison and land in the last regime, as with a chained if/elif.
        labels = np.select(
            [
                (vol <= q1).to_numpy(),
                (vol <= q2).to_numpy(),
                (vol <= q3).to_numpy(),
            ],
            list(self._REGIMES[:3]),
            default=self._REGIMES[3],
        )

        summary: Dict[str, Any] = {"vol_thresholds": self.vol_thresholds, "regimes": {}}

        # Train separate model for each regime
        for regime in self._REGIMES:
            mask = labels == regime
            Xr, yr = X_train.loc[mask], y_train.loc[mask]
            if len(Xr) < self._MIN_REGIME_SAMPLES:
                continue

            # Create binary target
            cutoff = yr.quantile(self._POSITIVE_QUANTILE)
            yr_bin = (yr > cutoff).astype(int)
            if yr_bin.nunique() < 2:
                # A single-class label cannot train a binary classifier and
                # would leave predict_proba without a positive-class column.
                continue

            # Train ensemble. VotingClassifier.fit clones and fits every base
            # estimator itself, so the models are handed over unfitted.
            voting = VotingClassifier(self.create_small_ensemble_models(), voting="soft")
            voting.fit(Xr, yr_bin)

            # Optimize decision threshold via F1.
            # NOTE(review): probabilities are scored on the same rows the
            # ensemble was fitted on, so this F1 is in-sample.
            train_probs = voting.predict_proba(Xr)[:, 1]
            best_thr, best_f1 = 0.5, 0.0
            for candidate in np.linspace(0.4, 0.7, 16):
                preds = (train_probs >= candidate).astype(int)
                score = f1_score(yr_bin, preds, zero_division=0)
                if score > best_f1:
                    best_f1, best_thr = score, float(candidate)

            self.regime_models[regime] = voting
            self.regime_decision_thresholds[regime] = best_thr
            summary["regimes"][regime] = {
                "n_samples": len(Xr),
                "decision_threshold": best_thr,
                "train_f1": float(best_f1),
            }

        self.is_trained = True
        return summary


def main():
    """Run the complete analysis for the ensemble strategy on a BTCUSDT CSV."""
    data_path = "data/BTCUSDT_1m_2025-04-30_2025-07-29.csv"
    # NOTE(review): regime_threshold_method is stored on the strategy, but
    # train_model always splits by quartiles, so "median" has no effect there.
    strategy = EnsembleRegimeMLStrategy(
        regime_threshold_method="median",
        horizon_bars=15,  # Predict 15 bars ahead
    )
    # run_complete_analysis is not defined in the class above; presumably it is
    # inherited from Strategy - confirm.
    strategy.run_complete_analysis(
        data_path,
        test_start_date="2025-06-01",
        initial_capital=100_000,
    )
# Output:
# Training models...
# 🚀 Running Ensemble Strategy Backtest...
#
# 📊 BACKTEST REPORT
# ===========================
# Total Return: 23.45%
# Sharpe Ratio: 1.87
# Max Drawdown: -8.23%
# Trades: 142
# ===========================
#
# Generating Dashboard...

| Feature | Description | Window |
|---|---|---|
| ret_5m | 5-bar return percentage | 5 |
| ret_15m | 15-bar return percentage | 15 |
| price_volatility | Rolling std / mean (coefficient of variation) | 15 |
| bar_range_pct | High-Low range as % of close | 1 |
| atr_h_pct | ATR over horizon as % of price | horizon_bars (default 60) |
| ema5_slope_5 | EMA(5) slope over 5 bars | 5 |
| rsi_5 | Fast RSI indicator | 5 |
| vwap_dev_15 | Price deviation from 15-bar VWAP | 15 |
The modern way to implement ML in RLX. Uses the WalkForwardML trainer to automatically handle rolling training and testing windows.
from rlxbt.ml import BaseMLStrategy, WalkForwardML
from sklearn.ensemble import RandomForestRegressor
class MyStrategy(BaseMLStrategy):
    """Minimal ML strategy for walk-forward training: fit on one window, predict on the next.

    NOTE(review): ``self.model`` and ``self.prepare_features`` are not defined
    in this class; presumably they are supplied by ``BaseMLStrategy`` or must
    be set up by the subclass - confirm before use.
    """

    def train(self, data):
        """Fit ``self.model`` on the features and targets prepared from *data*."""
        X, y = self.prepare_features(data)
        self.model.fit(X, y)

    def generate_signals(self, data):
        """Return ``self.model``'s predictions for the features prepared from *data*."""
        # Only the feature part is needed for prediction; the target is ignored.
        X, _ = self.prepare_features(data)
        return self.model.predict(X)
# Walk-forward run: retrain on a rolling basis, advancing 500 bars per step.
# NOTE(review): `bt` (the backtester) and `df` (the input data) are not created
# in this snippet and must already exist in scope.
trainer = WalkForwardML(backtester=bt)
results = trainer.run(
    strategy_class=MyStrategy,
    data=df,
    train_size=2000,
    test_size=500,
    step_size=500,
)