Files
quant-trader-service/training/tests/test_state_continue_experiment.py

208 lines
9.2 KiB
Python

from __future__ import annotations
import sys
import tempfile
import unittest
from pathlib import Path
import numpy as np
import pandas as pd
# Directory one level above the folder that contains this test file.
TRAINING_ROOT = Path(__file__).resolve().parents[1]
# Prepend it to the import search path (only once) before the project imports
# that follow; presumably this is where the `trader_training` package lives.
if str(TRAINING_ROOT) not in sys.path:
    sys.path.insert(0, str(TRAINING_ROOT))
from trader_training.onnx_export import LinearHead, export_heads
from trader_training.schemas import FEATURE_ORDER
from trader_training.state_continue_experiment import STATE_FEATURES, _predict_frozen_linear_model, _state_rows_for_age, _train_side_models, _verdict
class StateContinueExperimentTest(unittest.TestCase):
    """Exercise the state-continue experiment helpers on small synthetic inputs.

    The helpers under test are ``_state_rows_for_age``,
    ``_predict_frozen_linear_model``, ``_verdict`` and ``_train_side_models``.
    """

    @staticmethod
    def _split_report(
        validation_auc: float,
        validation_ratio: float,
        stress_auc: float,
        stress_ratio: float,
    ) -> dict[str, object]:
        # Build one per-model entry of the mapping that the verdict tests pass
        # to ``_verdict``; a fresh dict is returned on every call.
        return {
            "validation_locked": {
                "continue_auc": validation_auc,
                "edge_mae_vs_constant_ratio": validation_ratio,
            },
            "latest_stress": {
                "continue_auc": stress_auc,
                "edge_mae_vs_constant_ratio": stress_ratio,
            },
            "regressor_converged": True,
        }

    def test_state_rows_include_required_position_path_and_side_market_features(self) -> None:
        # One LONG position snapshot: position/path fields first, then every
        # market feature zeroed, then a few market features set to distinct
        # non-zero values so each side_* assertion maps to a single input.
        snapshot = {
            "current_sample_id": "s0",
            "symbol": "BTC-USDT-PERP",
            "current_event_time": pd.Timestamp("2026-01-01T00:05:00Z"),
            "current_open_time_ms": 300_000,
            "side": "LONG",
            "split_id": "fit_inner",
            "walk_forward_fold": 0,
            "time_in_position_minutes": 5,
            "entry_price": 100.0,
            "current_price": 100.1,
            "high_since_entry": 100.2,
            "low_since_entry": 99.95,
            "future_return_bps": 12.0,
            "gross_edge_bps": 12.0,
            "mae_bps": 20.0,
            "stop_hit": 0,
            "entry_predicted_edge_bps": 8.5,
            "entry_direction_prob": 0.64,
            "add_count": 0.0,
            "minutes_since_last_add": 9999.0,
        }
        snapshot.update(dict.fromkeys(FEATURE_ORDER, 0.0))
        snapshot.update(
            {
                "ret_1m_bps": 2.0,
                "ret_5m_bps": 3.0,
                "taker_imbalance_1m": 0.1,
                "taker_imbalance_5m": 0.2,
                "book_microprice_basis_bps": 4.0,
                "book_pressure_taker_1m": 5.0,
                "book_pressure_taker_5m": 6.0,
            }
        )

        produced = _state_rows_for_age(
            pd.DataFrame([snapshot]),
            stop_bps=8.0,
            target_bps=12.0,
            cost_bps=6.5,
        )

        # Every declared state feature must be present as an output column.
        required = set(STATE_FEATURES)
        self.assertEqual(required, required.intersection(produced.columns))

        first = produced.iloc[0]
        self.assertAlmostEqual(5.5, float(first["expected_continue_edge_bps"]))
        self.assertEqual(1, int(first["continue_target"]))

        # (expected, column, places); ``None`` keeps assertAlmostEqual's default.
        for expected, column, places in (
            (8.5, "entry_predicted_edge_bps", None),
            (0.64, "entry_direction_prob", 6),
            (16.5, "giveback_from_mfe_bps", 4),
            (8.5025, "recovery_from_mae_bps", 4),
        ):
            self.assertAlmostEqual(expected, float(first[column]), places=places)

        for lower_bound, column in ((0.13, "path_efficiency"), (3.3, "mfe_mae_ratio")):
            self.assertGreater(float(first[column]), lower_bound)

        for expected, column, places in (
            (2.0, "side_ret_1m_bps", None),
            (3.0, "side_ret_5m_bps", None),
            (0.1, "side_taker_imbalance_1m", 6),
            (0.2, "side_taker_imbalance_5m", 6),
            (4.0, "side_book_microprice_basis_bps", None),
            (5.0, "side_book_pressure_taker_1m", None),
            (6.0, "side_book_pressure_taker_5m", None),
            (0.0, "add_count", None),
            (9999.0, "minutes_since_last_add", None),
        ):
            self.assertAlmostEqual(expected, float(first[column]), places=places)

    def test_frozen_linear_onnx_weights_are_read_without_row_by_row_runtime(self) -> None:
        # Export two heads with all-zero weights so that, on all-zero inputs,
        # the outputs are determined by the biases alone.
        feature_count = len(FEATURE_ORDER)
        direction_head = LinearHead(
            "direction",
            "softmax",
            np.zeros((feature_count, 3), dtype=np.float32),
            np.array([0.0, 1.0, 2.0], dtype=np.float32),
        )
        long_edge_head = LinearHead(
            "long_expected_net_edge_bps",
            "identity",
            np.zeros((feature_count, 1), dtype=np.float32),
            np.array([7.25], dtype=np.float32),
        )
        requested_outputs = {
            "direction": ("softmax", ("long_prob", "short_prob", "neutral_prob")),
            "long_expected_net_edge_bps": ("identity", ("long_edge",)),
        }

        with tempfile.TemporaryDirectory() as scratch_dir:
            onnx_path = Path(scratch_dir) / "direction.onnx"
            export_heads(
                onnx_path,
                [direction_head, long_edge_head],
                feature_count=feature_count,
            )

            inputs = pd.DataFrame({"sample_id": ["s0", "s1"]})
            for column in FEATURE_ORDER:
                inputs[column] = 0.0

            scored = _predict_frozen_linear_model(onnx_path, inputs, requested_outputs)

            self.assertEqual(["s0", "s1"], scored["sample_id"].tolist())
            probability_totals = scored[["long_prob", "short_prob", "neutral_prob"]].sum(axis=1)
            self.assertTrue(np.allclose(1.0, probability_totals))
            # The third softmax slot carries the largest bias (2.0 vs 0.0).
            top_row = scored.iloc[0]
            self.assertLess(float(top_row["long_prob"]), float(top_row["neutral_prob"]))
            self.assertAlmostEqual(7.25, float(top_row["long_edge"]), places=6)

    def test_verdict_refuses_state_continue_when_edge_mae_is_not_good_enough(self) -> None:
        # market_plus_state beats market_only on both metrics here, yet every
        # edge_mae_vs_constant_ratio stays above 0.97.
        results = {}
        for side in ("long", "short"):
            results[f"{side}_market_only"] = self._split_report(0.61, 0.985, 0.62, 0.984)
            results[f"{side}_market_plus_state"] = self._split_report(0.63, 0.979, 0.64, 0.978)

        verdict = _verdict(results)

        self.assertEqual("NOT_READY_FOR_FORMAL_CHAIN", verdict["status"])
        reasons = verdict["reasons"]
        self.assertTrue(any("above 0.97" in reason for reason in reasons))

    def test_verdict_reports_when_state_features_do_not_beat_market_only(self) -> None:
        # On validation_locked the market_plus_state arm is worse than
        # market_only on both metrics (AUC 0.63 vs 0.64, ratio 0.975 vs 0.965).
        results = {}
        for side in ("long", "short"):
            results[f"{side}_market_only"] = self._split_report(0.64, 0.965, 0.65, 0.964)
            results[f"{side}_market_plus_state"] = self._split_report(0.63, 0.975, 0.66, 0.963)

        verdict = _verdict(results)

        self.assertEqual("NOT_READY_FOR_FORMAL_CHAIN", verdict["status"])
        reasons = verdict["reasons"]
        self.assertTrue(
            any("continue_auc not better than market_only" in reason for reason in reasons)
        )
        self.assertTrue(
            any(
                "edge_mae_vs_constant_ratio not better than market_only" in reason
                for reason in reasons
            )
        )

    def test_train_side_models_supports_ridge_regressor_diagnostic(self) -> None:
        # Two rows per split (targets 0 and 1) across four splits: 8 rows,
        # one minute apart, with every feature equal to the row's index
        # within its split.
        start_time = pd.Timestamp("2026-01-01T00:00:00Z")
        records = []
        for split_id in ("fit_inner", "tune_inner", "validation_locked", "latest_stress"):
            for index, target in enumerate((0, 1)):
                record = {
                    "sample_id": f"{split_id}-{index}",
                    "symbol": "BTC-USDT-PERP",
                    "event_time": start_time + pd.Timedelta(minutes=len(records)),
                    "split_id": split_id,
                    "position_side": "LONG",
                    "continue_target": target,
                    "expected_continue_edge_bps": 6.0 if target == 1 else -3.0,
                }
                record.update(
                    dict.fromkeys([*FEATURE_ORDER, *STATE_FEATURES], float(index))
                )
                records.append(record)
        frame = pd.DataFrame(records)

        ridge_options = {
            "regressor_kind": "ridge",
            "ridge_alpha": 1.0,
            "regression_target_clip_bps": 5.0,
        }
        ridge_metrics, ridge_predictions = _train_side_models(
            frame,
            "LONG",
            [*FEATURE_ORDER, *STATE_FEATURES],
            **ridge_options,
        )
        self.assertEqual("ridge", ridge_metrics["regressor_kind"])
        self.assertEqual(5.0, ridge_metrics["regression_target_clip_bps"])
        self.assertTrue(ridge_metrics["regressor_converged"])
        self.assertEqual(8, len(ridge_predictions))
        self.assertIn("time_in_position_minutes", ridge_predictions.columns)

        # Same frame again with the huber regressor; only the echoed
        # hyper-parameters are checked.
        huber_options = {
            "regressor_kind": "huber",
            "huber_alpha": 0.002,
            "huber_epsilon": 1.10,
            "huber_max_iter": 100,
            "regression_target_clip_bps": 4.0,
        }
        huber_metrics, _ = _train_side_models(
            frame,
            "LONG",
            [*FEATURE_ORDER, *STATE_FEATURES],
            **huber_options,
        )
        self.assertEqual("huber", huber_metrics["regressor_kind"])
        self.assertEqual(0.002, huber_metrics["huber_alpha"])
        self.assertEqual(1.10, huber_metrics["huber_epsilon"])
        self.assertEqual(4.0, huber_metrics["regression_target_clip_bps"])
# Allow running this file directly as a script: hand control to unittest's
# command-line runner.
if __name__ == "__main__":
    unittest.main()