From 2a86a6e2fa202dfa71d1f5989e3a041137411c2e Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 28 Jun 2026 07:26:59 +0800 Subject: [PATCH] Use actual plan edge for Entry PM training --- training/tests/test_risk_pm_fix.py | 86 ++++++++++++++++--- training/tests/test_training_contract.py | 7 ++ training/trader_training/diagnostics.py | 24 +++--- .../trader_training/ofi_feature_experiment.py | 41 +++------ training/trader_training/pm.py | 23 ++--- training/trader_training/training.py | 11 ++- 6 files changed, 129 insertions(+), 63 deletions(-) diff --git a/training/tests/test_risk_pm_fix.py b/training/tests/test_risk_pm_fix.py index 2544222..f95a370 100644 --- a/training/tests/test_risk_pm_fix.py +++ b/training/tests/test_risk_pm_fix.py @@ -1,6 +1,7 @@ from __future__ import annotations import sys +import tempfile import unittest from pathlib import Path @@ -12,7 +13,7 @@ if str(TRAINING_ROOT) not in sys.path: sys.path.insert(0, str(TRAINING_ROOT)) from trader_training.labels import DEFAULT_LABEL_CONFIG, _path_stats_for_group -from trader_training.pm import _probability_implied_edge, _simulate_open_trades, _threshold_candidates, default_pm_config +from trader_training.pm import _pm_frame, _probability_implied_edge, _simulate_open_trades, _threshold_candidates, default_pm_config class RiskPmFixTest(unittest.TestCase): @@ -44,13 +45,13 @@ class RiskPmFixTest(unittest.TestCase): self.assertEqual(80.0, DEFAULT_LABEL_CONFIG["risk"]["spike_bps"]) self.assertEqual(1.8, DEFAULT_LABEL_CONFIG["risk"]["vol_expansion_ratio"]) - def test_pm_search_covers_low_entry_probability_without_allowing_negative_edge(self) -> None: + def test_pm_search_uses_strict_entry_probability_and_positive_edge(self) -> None: candidates = _threshold_candidates() self.assertTrue(candidates) - self.assertLessEqual(max(item["max_market_risk_prob"] for item in candidates), 0.98) - self.assertLessEqual(min(item["min_entry_prob"] for item in candidates), 0.03) - self.assertGreaterEqual(min(item["min_expected_edge_bps"] for item in candidates), 0.0) + self.assertLessEqual(max(item["max_market_risk_prob"] for item in candidates), 0.65) + self.assertGreaterEqual(min(item["min_entry_prob"] for item in candidates), 0.30) + self.assertGreaterEqual(min(item["min_expected_edge_bps"] for item in candidates), 3.0) def test_probability_implied_edge_uses_price_plan_payoff(self) -> None: edge = _probability_implied_edge( @@ -61,6 +62,71 @@ class RiskPmFixTest(unittest.TestCase): self.assertAlmostEqual(3.7, float(edge.iloc[0]), places=6) self.assertAlmostEqual(52.5, float(edge.iloc[1]), places=6) + def test_pm_frame_reads_actual_plan_edge_not_old_opportunity_edge(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + (root / "model" / "direction").mkdir(parents=True) + (root / "model" / "entry").mkdir(parents=True) + (root / "model" / "risk").mkdir(parents=True) + (root / "dataset").mkdir(parents=True) + (root / "label").mkdir(parents=True) + common = { + "sample_id": ["s0"], + "symbol": ["BTC-USDT-PERP"], + "event_time": pd.to_datetime(["2026-01-01T00:00:00Z"]), + "split_id": ["tune_inner"], + } + pd.DataFrame({**common, "long_prob": [0.70], "short_prob": [0.10], "neutral_prob": [0.20]}).to_parquet( + root / "model" / "direction" / "tune_predictions.parquet", + index=False, + ) + pd.DataFrame( + { + **common, + "long_entry_prob": [0.80], + "short_entry_prob": [0.20], + "long_expected_net_edge_bps": [12.0], + "short_expected_net_edge_bps": [1.0], + } + ).to_parquet(root / "model" / "entry" / "tune_predictions.parquet", index=False) + pd.DataFrame( + { + **common, + "market_risk_prob": [0.20], + "long_position_risk_prob": [0.10], + "short_position_risk_prob": [0.10], + } + ).to_parquet(root / "model" / "risk" / "tune_predictions.parquet", index=False) + pd.DataFrame( + { + "sample_id": ["s0"], + "long_entry_target": [0], + "short_entry_target": [0], + "long_expected_net_edge_bps": [99.0], + "short_expected_net_edge_bps": [88.0], + "long_actual_plan_net_edge_bps": [-6.5], + "short_actual_plan_net_edge_bps": [-6.5], + } + ).to_parquet(root / "dataset" / "entry_train.parquet", index=False) + pd.DataFrame( + { + "sample_id": ["s0", "s0"], + "side": ["LONG", "SHORT"], + "gross_edge_bps": [0.0, 0.0], + "cost_bps": [6.5, 6.5], + "target_hit": [0, 0], + "stop_hit": [0, 0], + "time_to_target_ms": [-1, -1], + "time_to_stop_ms": [-1, -1], + "time_to_exit_ms": [2_700_000, 2_700_000], + } + ).to_parquet(root / "label" / "entry_labels.parquet", index=False) + + frame = _pm_frame(root, "tune_inner") + + self.assertAlmostEqual(-6.5, float(frame.loc[0, "actual_long_plan_edge_bps"])) + self.assertAlmostEqual(-6.5, float(frame.loc[0, "actual_short_plan_edge_bps"])) + def test_pm_backtest_sizing_uses_position_manager_formula_not_fixed_floor(self) -> None: frame = pd.DataFrame( { @@ -78,8 +144,8 @@ class RiskPmFixTest(unittest.TestCase): "short_position_risk_prob": [0.10], "pred_long_expected_net_edge_bps": [40.0], "pred_short_expected_net_edge_bps": [1.0], - "actual_long_expected_net_edge_bps": [30.0], - "actual_short_expected_net_edge_bps": [-10.0], + "actual_long_plan_edge_bps": [30.0], + "actual_short_plan_edge_bps": [-10.0], "long_trade_net_edge_bps": [11.0], "short_trade_net_edge_bps": [-14.5], "long_target_hit": [1], @@ -112,7 +178,7 @@ class RiskPmFixTest(unittest.TestCase): self.assertEqual(1, len(trades)) self.assertAlmostEqual(11.0, float(trades.iloc[0]["actual_edge_bps"])) - self.assertAlmostEqual(30.0, float(trades.iloc[0]["label_max_edge_bps"])) + self.assertAlmostEqual(30.0, float(trades.iloc[0]["label_actual_plan_edge_bps"])) self.assertGreater(float(trades.iloc[0]["planned_ratio"]), 0.05) self.assertLessEqual(float(trades.iloc[0]["planned_ratio"]), 0.20) @@ -133,8 +199,8 @@ class RiskPmFixTest(unittest.TestCase): "short_position_risk_prob": [0.10, 0.10], "pred_long_expected_net_edge_bps": [40.0, 42.0], "pred_short_expected_net_edge_bps": [1.0, 1.0], - "actual_long_expected_net_edge_bps": [30.0, 31.0], - "actual_short_expected_net_edge_bps": [-10.0, -10.0], + "actual_long_plan_edge_bps": [30.0, 31.0], + "actual_short_plan_edge_bps": [-10.0, -10.0], "long_trade_net_edge_bps": [11.0, 12.0], "short_trade_net_edge_bps": [-14.5, -14.5], "long_target_hit": [1, 1], diff --git a/training/tests/test_training_contract.py b/training/tests/test_training_contract.py index be61aec..290dfab 100644 --- a/training/tests/test_training_contract.py +++ b/training/tests/test_training_contract.py @@ -22,6 +22,7 @@ from trader_training.ofi_feature_experiment import _load_entry_dataset, l1_snaps from trader_training.promote import promote_artifact_bundle from trader_training.replay import build_splits from trader_training.schemas import FEATURE_ORDER, LATEST_STRESS_SPLIT, MODEL_OUTPUTS, OUTPUT_MAPPING, TRAINING_SPLITS, VALIDATION_LOCKED_SPLIT +from trader_training.training import TARGETS class TrainingContractTest(unittest.TestCase): @@ -49,6 +50,12 @@ class TrainingContractTest(unittest.TestCase): self.assertEqual("long_actual_plan_net_edge_bps", _screen_edge_column(dataset, "LONG")) self.assertEqual("short_actual_plan_net_edge_bps", _screen_edge_column(dataset, "SHORT")) + def test_entry_regression_heads_train_on_actual_plan_edge(self) -> None: + heads = {head[0]: head[2] for head in TARGETS["ENTRY"]["heads"]} + + self.assertEqual("long_actual_plan_net_edge_bps", heads["long_expected_net_edge_bps"]) + self.assertEqual("short_actual_plan_net_edge_bps", heads["short_expected_net_edge_bps"]) + def test_entry_feature_screen_keeps_zero_inflated_event_features(self) -> None: values = np.concatenate((np.zeros(5000), np.linspace(1.0, 100.0, 500))) edges = _bucket_edges(values) diff --git a/training/trader_training/diagnostics.py b/training/trader_training/diagnostics.py index ae052b7..63e1323 100644 --- a/training/trader_training/diagnostics.py +++ b/training/trader_training/diagnostics.py @@ -50,13 +50,16 @@ def _label_summary(root) -> dict[str, Any]: "future_return_bps_quantile": _quantiles(direction_split["future_return_bps"], (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99)), } if not entry_split.empty: + if "actual_plan_net_edge_bps" not in entry_split.columns: + raise ValueError("entry_labels is missing actual_plan_net_edge_bps for diagnostics") grouped = entry_split.groupby("side", observed=False) item["entry"] = { "rows": len(entry_split), "target_rate_by_side": grouped["entry_target"].mean().round(6).to_dict(), - "edge_mean_by_side": grouped["expected_net_edge_bps"].mean().round(6).to_dict(), + "edge_column": "actual_plan_net_edge_bps", + "edge_mean_by_side": grouped["actual_plan_net_edge_bps"].mean().round(6).to_dict(), "edge_quantile_by_side": { - str(side): _quantiles(group["expected_net_edge_bps"], (0.05, 0.5, 0.95)) + str(side): _quantiles(group["actual_plan_net_edge_bps"], (0.05, 0.5, 0.95)) for side, group in grouped }, } @@ -98,8 +101,8 @@ def _score_distribution(frame: pd.DataFrame) -> dict[str, dict[str, float]]: "pred_short_expected_net_edge_bps", "model_pred_long_expected_net_edge_bps", "model_pred_short_expected_net_edge_bps", - "actual_long_expected_net_edge_bps", - "actual_short_expected_net_edge_bps", + "actual_long_plan_edge_bps", + "actual_short_plan_edge_bps", ] return {column: _quantiles(frame[column], (0.0, 0.05, 0.5, 0.95, 1.0)) for column in columns if column in frame.columns} @@ -141,11 +144,10 @@ def _cumulative_gate_counts(steps: dict[str, pd.Series], total_rows: int) -> dic def _relaxed_variants(frame: pd.DataFrame) -> dict[str, Any]: variants = { - "no_risk_no_edge": {"prob": 0.54, "entry": 0.50, "margin": 0.02, "risk": 1.0, "edge": -99.0}, - "rare_entry_low_prob": {"prob": 0.50, "entry": 0.03, "margin": 0.02, "risk": 0.98, "edge": 0.0}, - "entry_only_55": {"prob": 0.0, "entry": 0.55, "margin": -99.0, "risk": 1.0, "edge": -99.0}, - "direction_only_54": {"prob": 0.54, "entry": 0.0, "margin": 0.02, "risk": 1.0, "edge": -99.0}, - "very_loose": {"prob": 0.50, "entry": 0.45, "margin": 0.0, "risk": 1.0, "edge": -99.0}, + "entry_30_positive_edge": {"prob": 0.50, "entry": 0.30, "margin": 0.02, "risk": 0.65, "edge": 3.0}, + "entry_50_positive_edge": {"prob": 0.50, "entry": 0.50, "margin": 0.02, "risk": 0.65, "edge": 3.0}, + "entry_70_positive_edge": {"prob": 0.50, "entry": 0.70, "margin": 0.02, "risk": 0.65, "edge": 3.0}, + "direction_only_control": {"prob": 0.54, "entry": 0.0, "margin": 0.02, "risk": 1.0, "edge": -99.0}, } result: dict[str, Any] = {} for name, thresholds in variants.items(): @@ -200,8 +202,8 @@ def _top_bucket_edge(frame: pd.DataFrame) -> dict[str, Any]: direction_top[str(fraction)] = _plain_trade_metrics(top.rename(columns={"actual_edge_bps": "actual_edge_bps"})) return { "direction_top_score": direction_top, - "long_entry_prob_deciles": _decile_edge(frame, "long_entry_prob", "actual_long_expected_net_edge_bps", "long_entry_target"), - "short_entry_prob_deciles": _decile_edge(frame, "short_entry_prob", "actual_short_expected_net_edge_bps", "short_entry_target"), + "long_entry_prob_deciles": _decile_edge(frame, "long_entry_prob", "actual_long_plan_edge_bps", "long_entry_target"), + "short_entry_prob_deciles": _decile_edge(frame, "short_entry_prob", "actual_short_plan_edge_bps", "short_entry_target"), } diff --git a/training/trader_training/ofi_feature_experiment.py b/training/trader_training/ofi_feature_experiment.py index 9357de0..a7c999a 100644 --- a/training/trader_training/ofi_feature_experiment.py +++ b/training/trader_training/ofi_feature_experiment.py @@ -338,36 +338,21 @@ def _load_direction_dataset(baseline_root: Path, feature: pd.DataFrame) -> pd.Da def _load_entry_dataset(baseline_root: Path, feature: pd.DataFrame) -> pd.DataFrame: dataset_path = baseline_root / "dataset" / "entry_train.parquet" - if dataset_path.is_file(): - labels = read_parquet(dataset_path) - required = { - "sample_id", - "long_entry_target", - "short_entry_target", - "long_actual_plan_net_edge_bps", - "short_actual_plan_net_edge_bps", - } - missing = sorted(required.difference(labels.columns)) - if missing: - raise ValueError(f"entry_train dataset missing columns: {missing}") - dataset = feature.merge(labels[list(required)], on="sample_id", how="inner") - logging.info("trader.training.ofi_entry_dataset_loaded source=entry_train rowCount=%s", len(dataset)) - return dataset - - labels = read_parquet(baseline_root / "label" / "entry_labels.parquet") - required = {"sample_id", "side", "entry_target", "expected_net_edge_bps"} + if not dataset_path.is_file(): + raise FileNotFoundError(f"entry_train dataset is required for OFI experiment: {dataset_path}") + labels = read_parquet(dataset_path) + required = { + "sample_id", + "long_entry_target", + "short_entry_target", + "long_actual_plan_net_edge_bps", + "short_actual_plan_net_edge_bps", + } missing = sorted(required.difference(labels.columns)) if missing: - raise ValueError(f"entry labels missing columns: {missing}") - long = labels[labels["side"].eq("LONG")][["sample_id", "entry_target", "expected_net_edge_bps"]].rename( - columns={"entry_target": "long_entry_target", "expected_net_edge_bps": "long_expected_net_edge_bps"} - ) - short = labels[labels["side"].eq("SHORT")][["sample_id", "entry_target", "expected_net_edge_bps"]].rename( - columns={"entry_target": "short_entry_target", "expected_net_edge_bps": "short_expected_net_edge_bps"} - ) - pivot = long.merge(short, on="sample_id", how="inner") - dataset = feature.merge(pivot, on="sample_id", how="inner") - logging.info("trader.training.ofi_entry_dataset_loaded source=entry_labels_legacy rowCount=%s", len(dataset)) + raise ValueError(f"entry_train dataset missing columns: {missing}") + dataset = feature.merge(labels[list(required)], on="sample_id", how="inner") + logging.info("trader.training.ofi_entry_dataset_loaded source=entry_train rowCount=%s", len(dataset)) return dataset diff --git a/training/trader_training/pm.py b/training/trader_training/pm.py index a8124a1..ad89315 100644 --- a/training/trader_training/pm.py +++ b/training/trader_training/pm.py @@ -232,8 +232,8 @@ def _pm_frame(root, split_id: str) -> pd.DataFrame: price_plan = _price_plan_context(root) entry_dataset = read_parquet(root / "dataset" / "entry_train.parquet").rename( columns={ - "long_expected_net_edge_bps": "actual_long_expected_net_edge_bps", - "short_expected_net_edge_bps": "actual_short_expected_net_edge_bps", + "long_actual_plan_net_edge_bps": "actual_long_plan_edge_bps", + "short_actual_plan_net_edge_bps": "actual_short_plan_edge_bps", } ) entry_plan_outcome = _entry_plan_outcome_frame(root) @@ -245,7 +245,10 @@ def _pm_frame(root, split_id: str) -> pd.DataFrame: "pred_short_expected_net_edge_bps", ] risk_cols = ["sample_id", "market_risk_prob", "long_position_risk_prob", "short_position_risk_prob"] - actual_cols = ["sample_id", "actual_long_expected_net_edge_bps", "actual_short_expected_net_edge_bps", "long_entry_target", "short_entry_target"] + actual_cols = ["sample_id", "actual_long_plan_edge_bps", "actual_short_plan_edge_bps", "long_entry_target", "short_entry_target"] + missing_actual_cols = sorted(set(actual_cols) - set(entry_dataset.columns)) + if missing_actual_cols: + raise ValueError(f"entry_train is missing actual plan edge columns for PM: {missing_actual_cols}") frame = ( direction[["sample_id", "symbol", "event_time", "split_id", "long_prob", "short_prob", "neutral_prob"]] .merge(entry[entry_cols], on="sample_id", how="inner") @@ -257,7 +260,7 @@ def _pm_frame(root, split_id: str) -> pd.DataFrame: raise ValueError(f"PM frame is empty for {split_id}; check model predictions and entry dataset") frame["model_pred_long_expected_net_edge_bps"] = frame["pred_long_expected_net_edge_bps"] frame["model_pred_short_expected_net_edge_bps"] = frame["pred_short_expected_net_edge_bps"] - edge_mode = "MODEL_EXPECTED_NET_EDGE" + edge_mode = "MODEL_ACTUAL_PLAN_EDGE" if price_plan.get("entryTargetMethod") not in {"OPPORTUNITY_MFE_V1", "OPPORTUNITY_QUALITY_V1"}: frame["pred_long_expected_net_edge_bps"] = _probability_implied_edge(frame["long_entry_prob"], price_plan) frame["pred_short_expected_net_edge_bps"] = _probability_implied_edge(frame["short_entry_prob"], price_plan) @@ -333,9 +336,9 @@ def _threshold_candidates() -> list[dict[str, float]]: values = itertools.product( [0.50, 0.60, 0.70, 1.01], [0.50, 0.60, 0.70, 1.01], - [0.03, 0.50, 0.70, 0.85], - [0.45, 0.65, 0.85], - [0.0, 8.0, 15.0, 25.0], + [0.30, 0.50, 0.70, 0.85], + [0.45, 0.65], + [3.0, 8.0, 15.0, 25.0], [0.02, 0.06, 0.10], ) return [ @@ -394,7 +397,7 @@ def _simulate_open_trades( trades["entry_prob"] = np.where(is_long, trades["long_entry_prob"], trades["short_entry_prob"]) trades["predicted_edge_bps"] = np.where(is_long, trades["pred_long_expected_net_edge_bps"], trades["pred_short_expected_net_edge_bps"]) trades["actual_edge_bps"] = np.where(is_long, trades["long_trade_net_edge_bps"], trades["short_trade_net_edge_bps"]) - trades["label_max_edge_bps"] = np.where(is_long, trades["actual_long_expected_net_edge_bps"], trades["actual_short_expected_net_edge_bps"]) + trades["label_actual_plan_edge_bps"] = np.where(is_long, trades["actual_long_plan_edge_bps"], trades["actual_short_plan_edge_bps"]) trades["entry_target"] = np.where(is_long, trades["long_entry_target"], trades["short_entry_target"]) effective_pm_config = pm_config or _pm_config_from_thresholds(thresholds) effective_price_plan = price_plan or DEFAULT_BACKTEST_PRICE_PLAN @@ -417,7 +420,7 @@ def _simulate_open_trades( "entry_prob", "market_risk_prob", "predicted_edge_bps", - "label_max_edge_bps", + "label_actual_plan_edge_bps", "actual_edge_bps", "entry_target", "time_to_exit_ms", @@ -440,7 +443,7 @@ def _empty_trade_frame() -> pd.DataFrame: "entry_prob", "market_risk_prob", "predicted_edge_bps", - "label_max_edge_bps", + "label_actual_plan_edge_bps", "actual_edge_bps", "entry_target", "time_to_exit_ms", diff --git a/training/trader_training/training.py b/training/trader_training/training.py index 60a1613..3afa581 100644 --- a/training/trader_training/training.py +++ b/training/trader_training/training.py @@ -39,8 +39,8 @@ TARGETS = { "heads": [ ("long_entry_prob", "binary", "long_entry_target", ["long_entry_prob"], ["longEntryProb"]), ("short_entry_prob", "binary", "short_entry_target", ["short_entry_prob"], ["shortEntryProb"]), - ("long_expected_net_edge_bps", "regression", "long_expected_net_edge_bps", ["long_expected_net_edge_bps"], [None]), - ("short_expected_net_edge_bps", "regression", "short_expected_net_edge_bps", ["short_expected_net_edge_bps"], [None]), + ("long_expected_net_edge_bps", "regression", "long_actual_plan_net_edge_bps", ["long_expected_net_edge_bps"], [None]), + ("short_expected_net_edge_bps", "regression", "short_actual_plan_net_edge_bps", ["short_expected_net_edge_bps"], [None]), ], }, "CONTINUE": { @@ -119,11 +119,12 @@ def train_small_models(args: Any) -> None: head_results.extend(_fit_head(item, x_train_scaled, x_tune_scaled, train, tune, scaler)) for result in head_results: logging.info( - "trader.training.model_head_trained runId=%s model=%s head=%s kind=%s metrics=%s", + "trader.training.model_head_trained runId=%s model=%s head=%s kind=%s targetSource=%s metrics=%s", args.run_id, model_name, result.field, result.kind, + result.metrics.get("target_source"), result.metrics, ) for result in head_results: @@ -221,7 +222,9 @@ def _fit_head(item, x_train, x_tune, train: pd.DataFrame, tune: pd.DataFrame, sc model.fit(x_train, y_train) pred = model.predict(x_tune) weight, bias = _fold_scaler(model.coef_.reshape(1, -1).T, np.array([model.intercept_]), scaler) - return [HeadResult(fields[0], None, "identity", weight, bias, _regression_metrics(y_train, y_val, pred), pred.reshape(-1, 1), y_val)] + metrics = _regression_metrics(y_train, y_val, pred) + metrics["target_source"] = target + return [HeadResult(fields[0], None, "identity", weight, bias, metrics, pred.reshape(-1, 1), y_val)] raise ValueError(f"unsupported head kind: {kind}")