From 340d1dd91b31bbb9c958ac8f7a2d593cfcef7b4d Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 28 Jun 2026 00:50:37 +0800 Subject: [PATCH] Improve Trader entry quality training diagnostics --- .../23_train_ofi_feature_experiment.py | 23 + training/tests/test_risk_pm_fix.py | 173 ++++ training/tests/test_training_contract.py | 210 ++++- training/trader_training/datasets.py | 22 +- training/trader_training/diagnostics.py | 85 +- .../trader_training/entry_feature_screen.py | 16 +- training/trader_training/labels.py | 281 +++++- .../trader_training/ofi_feature_experiment.py | 890 ++++++++++++++++++ training/trader_training/pm.py | 272 +++++- training/trader_training/price_plan_search.py | 31 +- training/trader_training/schemas.py | 2 +- 11 files changed, 1895 insertions(+), 110 deletions(-) create mode 100644 training/scripts/23_train_ofi_feature_experiment.py create mode 100644 training/tests/test_risk_pm_fix.py create mode 100644 training/trader_training/ofi_feature_experiment.py diff --git a/training/scripts/23_train_ofi_feature_experiment.py b/training/scripts/23_train_ofi_feature_experiment.py new file mode 100644 index 0000000..958cc97 --- /dev/null +++ b/training/scripts/23_train_ofi_feature_experiment.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +import argparse +from pathlib import Path + +import _bootstrap # noqa: F401 +from trader_training.io_utils import add_common_args, setup_logging +from trader_training.ofi_feature_experiment import run_ofi_feature_experiment + + +def main() -> None: + parser = argparse.ArgumentParser() + add_common_args(parser) + parser.add_argument("--baseline-run-id", required=True) + parser.add_argument("--raw-root", type=Path) + parser.add_argument("--max-rows-per-split", type=int, default=0) + args = parser.parse_args() + setup_logging() + run_ofi_feature_experiment(args) + + +if __name__ == "__main__": + main() diff --git a/training/tests/test_risk_pm_fix.py b/training/tests/test_risk_pm_fix.py new file mode 100644 index 0000000..2544222 --- /dev/null +++ b/training/tests/test_risk_pm_fix.py @@ -0,0 +1,173 @@ +from __future__ import annotations + +import sys +import unittest +from pathlib import Path + +import numpy as np +import pandas as pd + +TRAINING_ROOT = Path(__file__).resolve().parents[1] +if str(TRAINING_ROOT) not in sys.path: + sys.path.insert(0, str(TRAINING_ROOT)) + +from trader_training.labels import DEFAULT_LABEL_CONFIG, _path_stats_for_group +from trader_training.pm import _probability_implied_edge, _simulate_open_trades, _threshold_candidates, default_pm_config + + +class RiskPmFixTest(unittest.TestCase): + def test_path_stats_never_writes_negative_adverse_or_favorable_move(self) -> None: + frame = pd.DataFrame( + { + "event_time": pd.date_range("2026-01-01", periods=4, freq="min", tz="UTC"), + "open_time_ms": np.arange(4, dtype=np.int64) * 60_000, + "symbol": "BTC-USDT-PERP", + "close": [100.0, 101.0, 102.0, 103.0], + "high": [100.0, 101.0, 102.0, 103.0], + "low": [100.0, 101.0, 102.0, 103.0], + "spread_bps": [1.0, 1.0, 1.0, 1.0], + } + ) + + long_stats = _path_stats_for_group(frame, "LONG", horizon=2, target_bps=500.0, stop_bps=500.0) + short_stats = _path_stats_for_group(frame, "SHORT", horizon=2, target_bps=500.0, stop_bps=500.0) + + self.assertGreaterEqual(float(long_stats["mae_bps"].min()), 0.0) + self.assertGreaterEqual(float(long_stats["mfe_bps"].min()), 0.0) + self.assertGreaterEqual(float(short_stats["mae_bps"].min()), 0.0) + self.assertGreaterEqual(float(short_stats["mfe_bps"].min()), 0.0) + + def test_default_risk_labels_match_design_thresholds(self) -> None: + self.assertEqual(45, DEFAULT_LABEL_CONFIG["continue"]["horizon_minutes"]) + self.assertEqual(60.0, DEFAULT_LABEL_CONFIG["risk"]["market_drawdown_bps"]) + self.assertEqual(35.0, DEFAULT_LABEL_CONFIG["risk"]["position_path_risk_bps"]) + self.assertEqual(80.0, DEFAULT_LABEL_CONFIG["risk"]["spike_bps"]) + self.assertEqual(1.8, DEFAULT_LABEL_CONFIG["risk"]["vol_expansion_ratio"]) + + def test_pm_search_covers_low_entry_probability_without_allowing_negative_edge(self) -> None: + candidates = _threshold_candidates() + + self.assertTrue(candidates) + self.assertLessEqual(max(item["max_market_risk_prob"] for item in candidates), 0.98) + self.assertLessEqual(min(item["min_entry_prob"] for item in candidates), 0.03) + self.assertGreaterEqual(min(item["min_expected_edge_bps"] for item in candidates), 0.0) + + def test_probability_implied_edge_uses_price_plan_payoff(self) -> None: + edge = _probability_implied_edge( + pd.Series([0.10, 0.50]), + {"targetDistanceBps": 120.0, "stopDistanceBps": 2.0, "costBps": 6.5}, + ) + + self.assertAlmostEqual(3.7, float(edge.iloc[0]), places=6) + self.assertAlmostEqual(52.5, float(edge.iloc[1]), places=6) + + def test_pm_backtest_sizing_uses_position_manager_formula_not_fixed_floor(self) -> None: + frame = pd.DataFrame( + { + "sample_id": ["s0"], + "symbol": ["BTC-USDT-PERP"], + "event_time": pd.to_datetime(["2026-01-01T00:00:00Z"]), + "split_id": ["tune_inner"], + "long_prob": [0.70], + "short_prob": [0.10], + "neutral_prob": [0.20], + "long_entry_prob": [0.80], + "short_entry_prob": [0.20], + "market_risk_prob": [0.20], + "long_position_risk_prob": [0.10], + "short_position_risk_prob": [0.10], + "pred_long_expected_net_edge_bps": [40.0], + "pred_short_expected_net_edge_bps": [1.0], + "actual_long_expected_net_edge_bps": [30.0], + "actual_short_expected_net_edge_bps": [-10.0], + "long_trade_net_edge_bps": [11.0], + "short_trade_net_edge_bps": [-14.5], + "long_target_hit": [1], + "short_target_hit": [0], + "long_stop_hit": [0], + "short_stop_hit": [1], + "long_time_to_target_ms": [300_000], + "short_time_to_target_ms": [-1], + "long_time_to_stop_ms": [-1], + "short_time_to_stop_ms": [180_000], + "long_entry_target": [1], + "short_entry_target": [0], + } + ) + thresholds = { + "long_open_prob": 0.55, + "short_open_prob": 0.55, + "min_entry_prob": 0.55, + "max_market_risk_prob": 0.55, + "min_expected_edge_bps": 3.0, + "min_direction_margin": 0.02, + } + + trades = _simulate_open_trades( + frame, + thresholds, + default_pm_config(), + {"stopDistanceBps": 8.0, "costBps": 6.5}, + ) + + self.assertEqual(1, len(trades)) + self.assertAlmostEqual(11.0, float(trades.iloc[0]["actual_edge_bps"])) + self.assertAlmostEqual(30.0, float(trades.iloc[0]["label_max_edge_bps"])) + self.assertGreater(float(trades.iloc[0]["planned_ratio"]), 0.05) + self.assertLessEqual(float(trades.iloc[0]["planned_ratio"]), 0.20) + + def test_pm_backtest_blocks_overlapping_open_trades_until_exit_and_cooldown(self) -> None: + frame = pd.DataFrame( + { + "sample_id": ["s0", "s1"], + "symbol": ["BTC-USDT-PERP", "BTC-USDT-PERP"], + "event_time": pd.to_datetime(["2026-01-01T00:00:00Z", "2026-01-01T00:01:00Z"]), + "split_id": ["tune_inner", "tune_inner"], + "long_prob": [0.70, 0.72], + "short_prob": [0.10, 0.10], + "neutral_prob": [0.20, 0.18], + "long_entry_prob": [0.80, 0.82], + "short_entry_prob": [0.20, 0.20], + "market_risk_prob": [0.20, 0.20], + "long_position_risk_prob": [0.10, 0.10], + "short_position_risk_prob": [0.10, 0.10], + "pred_long_expected_net_edge_bps": [40.0, 42.0], + "pred_short_expected_net_edge_bps": [1.0, 1.0], + "actual_long_expected_net_edge_bps": [30.0, 31.0], + "actual_short_expected_net_edge_bps": [-10.0, -10.0], + "long_trade_net_edge_bps": [11.0, 12.0], + "short_trade_net_edge_bps": [-14.5, -14.5], + "long_target_hit": [1, 1], + "short_target_hit": [0, 0], + "long_stop_hit": [0, 0], + "short_stop_hit": [1, 1], + "long_time_to_target_ms": [300_000, 300_000], + "short_time_to_target_ms": [-1, -1], + "long_time_to_stop_ms": [-1, -1], + "short_time_to_stop_ms": [180_000, 180_000], + "long_entry_target": [1, 1], + "short_entry_target": [0, 0], + } + ) + thresholds = { + "long_open_prob": 0.55, + "short_open_prob": 0.55, + "min_entry_prob": 0.55, + "max_market_risk_prob": 0.55, + "min_expected_edge_bps": 3.0, + "min_direction_margin": 0.02, + } + + trades = _simulate_open_trades( + frame, + thresholds, + default_pm_config(), + {"stopDistanceBps": 8.0, "costBps": 6.5, "maxHoldMinutes": 45}, + ) + + self.assertEqual(1, len(trades)) + self.assertEqual("s0", trades.iloc[0]["sample_id"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/training/tests/test_training_contract.py b/training/tests/test_training_contract.py index 30bca83..7e23f95 100644 --- a/training/tests/test_training_contract.py +++ b/training/tests/test_training_contract.py @@ -14,8 +14,10 @@ if str(TRAINING_ROOT) not in sys.path: sys.path.insert(0, str(TRAINING_ROOT)) from trader_training.onnx_export import LinearHead, export_heads +from trader_training.entry_feature_screen import _screen_edge_column from trader_training.io_utils import read_json, write_json from trader_training.labels import ENTRY_LABEL_METHOD, _path_stats_for_group, build_entry_labels +from trader_training.ofi_feature_experiment import l1_snapshot_diff_ofi_quote from trader_training.promote import promote_artifact_bundle from trader_training.replay import build_splits from trader_training.schemas import FEATURE_ORDER, LATEST_STRESS_SPLIT, MODEL_OUTPUTS, OUTPUT_MAPPING, TRAINING_SPLITS, VALIDATION_LOCKED_SPLIT @@ -33,6 +35,19 @@ class TrainingContractTest(unittest.TestCase): self.assertEqual(set(fields), set(OUTPUT_MAPPING[model_name])) self.assertEqual([f"prediction[{idx}]" for idx in range(len(fields))], [OUTPUT_MAPPING[model_name][field] for field in fields]) + def test_entry_feature_screen_prefers_actual_plan_edge(self) -> None: + dataset = pd.DataFrame( + { + "long_expected_net_edge_bps": [20.0], + "short_expected_net_edge_bps": [15.0], + "long_actual_plan_net_edge_bps": [-3.0], + "short_actual_plan_net_edge_bps": [4.0], + } + ) + + self.assertEqual("long_actual_plan_net_edge_bps", _screen_edge_column(dataset, "LONG")) + self.assertEqual("short_actual_plan_net_edge_bps", _screen_edge_column(dataset, "SHORT")) + def test_split_builder_uses_locked_validation_contract(self) -> None: with tempfile.TemporaryDirectory() as tmp: data_root = Path(tmp) @@ -90,7 +105,7 @@ class TrainingContractTest(unittest.TestCase): self.assertEqual(120_000, first["time_to_stop_ms"]) self.assertAlmostEqual(-8.0, first["gross_edge_bps"]) - def test_entry_label_uses_max_future_edge_not_fixed_target_hit(self) -> None: + def test_entry_label_uses_price_plan_outcome_not_max_future_edge(self) -> None: with tempfile.TemporaryDirectory() as tmp: data_root = Path(tmp) run_root = data_root / "trader-v4" / "runs" / "unit-entry" @@ -167,11 +182,200 @@ class TrainingContractTest(unittest.TestCase): labels = pd.read_parquet(run_root / "label" / "entry_labels.parquet") row = labels[labels["sample_id"].eq("s0") & labels["side"].eq("LONG")].iloc[0] self.assertEqual(0, row["target_hit"]) - self.assertEqual(1, row["entry_target"]) + self.assertEqual(0, row["entry_target"]) self.assertEqual(ENTRY_LABEL_METHOD, row["label_method"]) - self.assertAlmostEqual(13.5, row["expected_net_edge_bps"], places=6) + self.assertAlmostEqual(-6.5, row["expected_net_edge_bps"], places=6) + self.assertAlmostEqual(row["gross_edge_bps"] - row["cost_bps"], row["expected_net_edge_bps"], places=6) self.assertAlmostEqual(row["mfe_bps"] - row["cost_bps"], row["max_achievable_net_edge_bps"], places=6) + def test_entry_opportunity_label_keeps_plan_outcome_for_pm(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + data_root = Path(tmp) + run_root = data_root / "trader-v4" / "runs" / "unit-entry-opportunity" + feature_path = run_root / "feature" / "feature_frame.parquet" + replay_path = run_root / "replay" / "replay_1m.parquet" + plan_path = run_root / "label" / "price_plan_context.json" + config_path = data_root / "label_config.json" + feature_path.parent.mkdir(parents=True) + replay_path.parent.mkdir(parents=True) + + times = pd.date_range("2026-01-01", periods=5, freq="min", tz="UTC") + pd.DataFrame( + { + "sample_id": ["s0"], + "symbol": "BTC-USDT-PERP", + "event_time": [times[0]], + "open_time_ms": [0], + "split_id": "fit_inner", + "walk_forward_fold": 0, + "data_quality_flag": "OK", + "spread_bps": 1.0, + "spread_rank_24h_pct": 0.1, + "realized_vol_15m_bps": 2.0, + } + ).to_parquet(feature_path, index=False) + pd.DataFrame( + { + "event_time": times, + "open_time_ms": np.arange(5, dtype=np.int64) * 60_000, + "symbol": "BTC-USDT-PERP", + "open": [100.0] * 5, + "high": [100.0, 100.05, 100.19, 100.20, 100.0], + "low": [100.0, 99.99, 99.98, 99.97, 100.0], + "close": [100.0] * 5, + "spread_bps": 1.0, + } + ).to_parquet(replay_path, index=False) + write_json( + config_path, + { + "entry": { + "max_hold_minutes": 3, + "target_bps": 50.0, + "stop_bps": 50.0, + "min_expected_net_edge_bps": 3.0, + "target_method": "OPPORTUNITY_MFE_V1", + } + }, + ) + write_json( + plan_path, + { + "pricePlanId": "unit-plan", + "pricePlanConfigHash": "unit-hash", + "targetDistanceBps": 50.0, + "stopDistanceBps": 50.0, + "maxHoldMinutes": 3, + "costBps": 6.5, + "entryLabelMethod": ENTRY_LABEL_METHOD, + "entryTargetMethod": "OPPORTUNITY_MFE_V1", + }, + ) + + build_entry_labels( + Namespace( + data_root=data_root, + run_id="unit-entry-opportunity", + feature_path=feature_path, + replay_path=replay_path, + label_config_path=config_path, + cost_config_path=None, + price_plan_context_path=plan_path, + ) + ) + + labels = pd.read_parquet(run_root / "label" / "entry_labels.parquet") + row = labels[labels["sample_id"].eq("s0") & labels["side"].eq("LONG")].iloc[0] + self.assertEqual(0, row["target_hit"]) + self.assertEqual(1, row["entry_target"]) + self.assertEqual("OPPORTUNITY_MFE_V1", row["label_method"]) + self.assertAlmostEqual(row["mfe_bps"] - row["cost_bps"], row["expected_net_edge_bps"], places=6) + self.assertAlmostEqual(-6.5, row["gross_edge_bps"] - row["cost_bps"], places=6) + + def test_entry_quality_label_rejects_untradable_opportunity(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + data_root = Path(tmp) + run_root = data_root / "trader-v4" / "runs" / "unit-entry-quality" + feature_path = run_root / "feature" / "feature_frame.parquet" + replay_path = run_root / "replay" / "replay_1m.parquet" + plan_path = run_root / "label" / "price_plan_context.json" + config_path = data_root / "label_config.json" + feature_path.parent.mkdir(parents=True) + replay_path.parent.mkdir(parents=True) + + times = pd.date_range("2026-01-01", periods=5, freq="min", tz="UTC") + pd.DataFrame( + { + "sample_id": ["s0"], + "symbol": "BTC-USDT-PERP", + "event_time": [times[0]], + "open_time_ms": [0], + "split_id": "fit_inner", + "walk_forward_fold": 0, + "data_quality_flag": "OK", + "spread_bps": 1.0, + "spread_rank_24h_pct": 0.1, + "realized_vol_15m_bps": 2.0, + } + ).to_parquet(feature_path, index=False) + pd.DataFrame( + { + "event_time": times, + "open_time_ms": np.arange(5, dtype=np.int64) * 60_000, + "symbol": "BTC-USDT-PERP", + "open": [100.0] * 5, + "high": [100.0, 100.05, 100.19, 100.20, 100.0], + "low": [100.0, 99.99, 99.98, 99.97, 100.0], + "close": [100.0] * 5, + "spread_bps": 1.0, + } + ).to_parquet(replay_path, index=False) + write_json( + config_path, + { + "entry": { + "max_hold_minutes": 3, + "target_bps": 50.0, + "stop_bps": 50.0, + "min_expected_net_edge_bps": 3.0, + "min_plan_net_edge_bps": 0.0, + "max_entry_mae_bps": 12.0, + "target_method": "OPPORTUNITY_QUALITY_V1", + } + }, + ) + write_json( + plan_path, + { + "pricePlanId": "unit-plan", + "pricePlanConfigHash": "unit-hash", + "targetDistanceBps": 50.0, + "stopDistanceBps": 50.0, + "maxHoldMinutes": 3, + "costBps": 6.5, + "entryLabelMethod": ENTRY_LABEL_METHOD, + "entryTargetMethod": "OPPORTUNITY_QUALITY_V1", + }, + ) + + build_entry_labels( + Namespace( + data_root=data_root, + run_id="unit-entry-quality", + feature_path=feature_path, + replay_path=replay_path, + label_config_path=config_path, + cost_config_path=None, + price_plan_context_path=plan_path, + ) + ) + + labels = pd.read_parquet(run_root / "label" / "entry_labels.parquet") + row = labels[labels["sample_id"].eq("s0") & labels["side"].eq("LONG")].iloc[0] + self.assertEqual("OPPORTUNITY_QUALITY_V1", row["label_method"]) + self.assertGreater(row["expected_net_edge_bps"], 3.0) + self.assertLess(row["actual_plan_net_edge_bps"], 0.0) + self.assertEqual(0, row["entry_target"]) + + def test_l1_snapshot_diff_ofi_uses_quote_notional_and_signed_ask_side(self) -> None: + bid_part, ask_part = l1_snapshot_diff_ofi_quote( + pd.Series([101.0, 101.0, 100.5]), + pd.Series([2.0, 3.0, 4.0]), + pd.Series([102.0, 101.5, 102.5]), + pd.Series([5.0, 6.0, 7.0]), + pd.Series([100.0, 101.0, 101.0]), + pd.Series([1.5, 2.0, 3.0]), + pd.Series([102.0, 102.0, 101.5]), + pd.Series([4.0, 5.0, 6.0]), + ) + + self.assertAlmostEqual(202.0, bid_part.iloc[0]) + self.assertAlmostEqual(-102.0, ask_part.iloc[0]) + self.assertAlmostEqual(101.0, bid_part.iloc[1]) + self.assertAlmostEqual(-609.0, ask_part.iloc[1]) + self.assertAlmostEqual(-303.0, bid_part.iloc[2]) + self.assertAlmostEqual(609.0, ask_part.iloc[2]) + def test_exported_onnx_accepts_java_feature_shape(self) -> None: import onnxruntime as ort diff --git a/training/trader_training/datasets.py b/training/trader_training/datasets.py index 5ea016c..6640d4b 100644 --- a/training/trader_training/datasets.py +++ b/training/trader_training/datasets.py @@ -83,11 +83,25 @@ def build_train_datasets(args: Any) -> None: def _entry_pivot(entry: pd.DataFrame) -> pd.DataFrame: require_columns(entry, ("sample_id", "side", "entry_target", "expected_net_edge_bps"), "entry_labels") - long = entry[entry["side"] == "LONG"][["sample_id", "entry_target", "expected_net_edge_bps"]].rename( - columns={"entry_target": "long_entry_target", "expected_net_edge_bps": "long_expected_net_edge_bps"} + optional_columns = [column for column in ("actual_plan_net_edge_bps", "max_achievable_net_edge_bps", "mae_bps") if column in entry.columns] + selected_columns = ["sample_id", "entry_target", "expected_net_edge_bps", *optional_columns] + long = entry[entry["side"] == "LONG"][selected_columns].rename( + columns={ + "entry_target": "long_entry_target", + "expected_net_edge_bps": "long_expected_net_edge_bps", + "actual_plan_net_edge_bps": "long_actual_plan_net_edge_bps", + "max_achievable_net_edge_bps": "long_max_achievable_net_edge_bps", + "mae_bps": "long_mae_bps", + } ) - short = entry[entry["side"] == "SHORT"][["sample_id", "entry_target", "expected_net_edge_bps"]].rename( - columns={"entry_target": "short_entry_target", "expected_net_edge_bps": "short_expected_net_edge_bps"} + short = entry[entry["side"] == "SHORT"][selected_columns].rename( + columns={ + "entry_target": "short_entry_target", + "expected_net_edge_bps": "short_expected_net_edge_bps", + "actual_plan_net_edge_bps": "short_actual_plan_net_edge_bps", + "max_achievable_net_edge_bps": "short_max_achievable_net_edge_bps", + "mae_bps": "short_mae_bps", + } ) return long.merge(short, on="sample_id", how="inner") diff --git a/training/trader_training/diagnostics.py b/training/trader_training/diagnostics.py index f6afb07..ae052b7 100644 --- a/training/trader_training/diagnostics.py +++ b/training/trader_training/diagnostics.py @@ -6,8 +6,8 @@ from typing import Any import numpy as np import pandas as pd -from trader_training.io_utils import read_parquet, run_root, write_json, write_text -from trader_training.pm import _pm_frame, _simulate_open_trades, _threshold_candidates, _trade_metrics +from trader_training.io_utils import read_json, read_parquet, run_root, write_json, write_text +from trader_training.pm import _pm_frame, _price_plan_context, _simulate_open_trades, _threshold_candidates, _thresholds_from_config, _trade_metrics, default_pm_config from trader_training.schemas import FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT @@ -66,12 +66,19 @@ def _label_summary(root) -> dict[str, Any]: def _pm_summary(root) -> dict[str, Any]: summary: dict[str, Any] = {} + config_path = root / "pm-search" / "position_manager_config.json" + config = read_json(config_path)["config"] if config_path.is_file() else default_pm_config() + thresholds = _thresholds_from_config(config) + price_plan = _price_plan_context(root) for split_id in PM_EVAL_SPLITS: frame = _pm_frame(root, split_id) + selected_trades = _simulate_open_trades(frame, thresholds, config, price_plan) item = { "rows": len(frame), "score_distribution": _score_distribution(frame), - "gate_funnel": _gate_funnel(frame), + "active_thresholds": thresholds, + "gate_funnel": _gate_funnel(frame, thresholds), + "selected_trade_metrics": _trade_metrics(selected_trades), "relaxed_variants": _relaxed_variants(frame), "top_bucket_edge": _top_bucket_edge(frame), "grid_search_any_trade": _grid_trade_summary(frame), @@ -89,34 +96,29 @@ def _score_distribution(frame: pd.DataFrame) -> dict[str, dict[str, float]]: "market_risk_prob", "pred_long_expected_net_edge_bps", "pred_short_expected_net_edge_bps", + "model_pred_long_expected_net_edge_bps", + "model_pred_short_expected_net_edge_bps", "actual_long_expected_net_edge_bps", "actual_short_expected_net_edge_bps", ] - return {column: _quantiles(frame[column], (0.0, 0.05, 0.5, 0.95, 1.0)) for column in columns} + return {column: _quantiles(frame[column], (0.0, 0.05, 0.5, 0.95, 1.0)) for column in columns if column in frame.columns} -def _gate_funnel(frame: pd.DataFrame) -> dict[str, Any]: - thresholds = { - "long_open_prob": 0.54, - "short_open_prob": 0.54, - "min_entry_prob": 0.50, - "max_market_risk_prob": 0.55, - "min_expected_edge_bps": 1.0, - "min_direction_margin": 0.02, - } +def _gate_funnel(frame: pd.DataFrame, thresholds: dict[str, float]) -> dict[str, Any]: + direction_margin = (frame["long_prob"] - frame["short_prob"]).abs() long_steps = { - "long_prob >= 0.54": frame["long_prob"] >= thresholds["long_open_prob"], - "long_prob - short_prob >= 0.02": (frame["long_prob"] - frame["short_prob"]) >= thresholds["min_direction_margin"], - "long_entry_prob >= 0.50": frame["long_entry_prob"] >= thresholds["min_entry_prob"], - "market_risk_prob <= 0.55": frame["market_risk_prob"] <= thresholds["max_market_risk_prob"], - "pred_long_expected_net_edge_bps >= 1.0": frame["pred_long_expected_net_edge_bps"] >= thresholds["min_expected_edge_bps"], + f"long_prob > {thresholds['long_open_prob']}": frame["long_prob"] > thresholds["long_open_prob"], + f"abs(long_prob - short_prob) > {thresholds['min_direction_margin']}": direction_margin > thresholds["min_direction_margin"], + f"long_entry_prob > {thresholds['min_entry_prob']}": frame["long_entry_prob"] > thresholds["min_entry_prob"], + f"market_risk_prob < {thresholds['max_market_risk_prob']}": frame["market_risk_prob"] < thresholds["max_market_risk_prob"], + f"pred_long_expected_net_edge_bps > {thresholds['min_expected_edge_bps']}": frame["pred_long_expected_net_edge_bps"] > thresholds["min_expected_edge_bps"], } short_steps = { - "short_prob >= 0.54": frame["short_prob"] >= thresholds["short_open_prob"], - "short_prob - long_prob >= 0.02": (frame["short_prob"] - frame["long_prob"]) >= thresholds["min_direction_margin"], - "short_entry_prob >= 0.50": frame["short_entry_prob"] >= thresholds["min_entry_prob"], - "market_risk_prob <= 0.55": frame["market_risk_prob"] <= thresholds["max_market_risk_prob"], - "pred_short_expected_net_edge_bps >= 1.0": frame["pred_short_expected_net_edge_bps"] >= thresholds["min_expected_edge_bps"], + f"short_prob > {thresholds['short_open_prob']}": frame["short_prob"] > thresholds["short_open_prob"], + f"abs(long_prob - short_prob) > {thresholds['min_direction_margin']}": direction_margin > thresholds["min_direction_margin"], + f"short_entry_prob > {thresholds['min_entry_prob']}": frame["short_entry_prob"] > thresholds["min_entry_prob"], + f"market_risk_prob < {thresholds['max_market_risk_prob']}": frame["market_risk_prob"] < thresholds["max_market_risk_prob"], + f"pred_short_expected_net_edge_bps > {thresholds['min_expected_edge_bps']}": frame["pred_short_expected_net_edge_bps"] > thresholds["min_expected_edge_bps"], } return { "thresholds": thresholds, @@ -140,6 +142,7 @@ def _cumulative_gate_counts(steps: dict[str, pd.Series], total_rows: int) -> dic def _relaxed_variants(frame: pd.DataFrame) -> dict[str, Any]: variants = { "no_risk_no_edge": {"prob": 0.54, "entry": 0.50, "margin": 0.02, "risk": 1.0, "edge": -99.0}, + "rare_entry_low_prob": {"prob": 0.50, "entry": 0.03, "margin": 0.02, "risk": 0.98, "edge": 0.0}, "entry_only_55": {"prob": 0.0, "entry": 0.55, "margin": -99.0, "risk": 1.0, "edge": -99.0}, "direction_only_54": {"prob": 0.54, "entry": 0.0, "margin": 0.02, "risk": 1.0, "edge": -99.0}, "very_loose": {"prob": 0.50, "entry": 0.45, "margin": 0.0, "risk": 1.0, "edge": -99.0}, @@ -168,10 +171,10 @@ def _variant_trades(frame: pd.DataFrame, thresholds: dict[str, float]) -> pd.Dat ) long = frame.loc[long_mask].copy() long["side"] = "LONG" - long["actual_edge_bps"] = long["actual_long_expected_net_edge_bps"] + long["actual_edge_bps"] = long["long_trade_net_edge_bps"] short = frame.loc[short_mask].copy() short["side"] = "SHORT" - short["actual_edge_bps"] = short["actual_short_expected_net_edge_bps"] + short["actual_edge_bps"] = short["short_trade_net_edge_bps"] return pd.concat([long, short], ignore_index=True) @@ -189,7 +192,7 @@ def _plain_trade_metrics(trades: pd.DataFrame) -> dict[str, Any]: def _top_bucket_edge(frame: pd.DataFrame) -> dict[str, Any]: side = np.where(frame["long_prob"] >= frame["short_prob"], "LONG", "SHORT") side_prob = np.where(side == "LONG", frame["long_prob"], frame["short_prob"]) - side_edge = np.where(side == "LONG", frame["actual_long_expected_net_edge_bps"], frame["actual_short_expected_net_edge_bps"]) + side_edge = np.where(side == "LONG", frame["long_trade_net_edge_bps"], frame["short_trade_net_edge_bps"]) direction_frame = pd.DataFrame({"score": side_prob, "actual_edge_bps": side_edge, "side": side}) direction_top = {} for fraction in (0.01, 0.02, 0.05, 0.10): @@ -243,27 +246,19 @@ def _grid_trade_summary(frame: pd.DataFrame) -> dict[str, Any]: def _diagnostic_conclusion(pm_summary: dict[str, Any]) -> dict[str, Any]: - tune = pm_summary.get(TUNE_SPLIT, {}) - gate = tune.get("gate_funnel", {}) - long_single = gate.get("long", {}).get("single_gate_pass", {}) - short_single = gate.get("short", {}).get("single_gate_pass", {}) - pred_edge_blocked = ( - long_single.get("pred_long_expected_net_edge_bps >= 1.0", 0) == 0 - and short_single.get("pred_short_expected_net_edge_bps >= 1.0", 0) == 0 - ) - relaxed = tune.get("relaxed_variants", {}) - any_relaxed_positive = any(item.get("avg_actual_edge_bps", 0.0) > 0 for item in relaxed.values()) - if pred_edge_blocked and not any_relaxed_positive: + validation = pm_summary.get(VALIDATION_LOCKED_SPLIT, {}).get("selected_trade_metrics", {}) + stress = pm_summary.get(LATEST_STRESS_SPLIT, {}).get("selected_trade_metrics", {}) + if validation.get("trade_count", 0) == 0: return { - "status": "MODEL_SIGNAL_NOT_TRADABLE", - "plain_reason": "Entry 预测的净收益基本都是负数;即使放松风险和收益门槛,选出来的样本平均仍亏。", - "next_action": "优先重查 Entry 标签和价格计划,再考虑更强模型;不要直接放松 PM 阈值上线。", + "status": "NO_VALIDATION_TRADE", + "plain_reason": "当前 PM 阈值在验证集没有选出交易,主要要看挡单漏斗。", + "next_action": "先看 Direction、Risk、Entry 哪个门槛挡住,再做阈值实验。", } - if pred_edge_blocked: + if validation.get("avg_weighted_edge_bps", 0.0) <= 0 and stress.get("avg_weighted_edge_bps", 0.0) <= 0: return { - "status": "ENTRY_EDGE_GATE_BLOCKED", - "plain_reason": "PM 没有交易主要是 Entry 预测净收益过低。", - "next_action": "重训 Entry 或调整价格计划后再搜索 PM 阈值。", + "status": "PRICE_PLAN_OR_ENTRY_NOT_TRADABLE", + "plain_reason": "按固定止盈止损真实收益算,验证集和压力集选出来的交易平均都不赚钱。", + "next_action": "优先重新搜索价格计划,再重建 Entry 标签和模型;不要只放松 PM 阈值。", } return { "status": "NEEDS_MANUAL_REVIEW", @@ -312,6 +307,8 @@ def _markdown_report(payload: dict[str, Any]) -> str: lines.append(f"### {split_id}") lines.append("") lines.append(f"- 样本数: {item['rows']}") + lines.append(f"- 当前阈值: `{item['active_thresholds']}`") + lines.append(f"- 当前阈值选中交易: `{item['selected_trade_metrics']}`") lines.append(f"- 网格里有交易的候选数: {item['grid_search_any_trade']['candidates_with_trade']} / {item['grid_search_any_trade']['candidate_count']}") lines.append("") for side in ("long", "short"): diff --git a/training/trader_training/entry_feature_screen.py b/training/trader_training/entry_feature_screen.py index 0d28113..783c31c 100644 --- a/training/trader_training/entry_feature_screen.py +++ b/training/trader_training/entry_feature_screen.py @@ -24,9 +24,11 @@ def screen_entry_features(args: Any) -> None: min_bucket_rows = int(args.min_bucket_rows or 300) rows: list[dict[str, Any]] = [] + edge_source_by_side: dict[str, str] = {} for side in ("LONG", "SHORT"): target_col = "long_entry_target" if side == "LONG" else "short_entry_target" - edge_col = "long_expected_net_edge_bps" if side == "LONG" else "short_expected_net_edge_bps" + edge_col = _screen_edge_column(dataset, side) + edge_source_by_side[side] = edge_col baselines = _split_baselines(dataset, target_col, edge_col) for feature in FEATURE_ORDER: rows.extend(_feature_rows(dataset, feature, side, target_col, edge_col, baselines)) @@ -43,6 +45,7 @@ def screen_entry_features(args: Any) -> None: "bucket_metric_count": int(len(bucket_metrics)), "candidate_count": int(len(candidates)), "min_bucket_rows": min_bucket_rows, + "edge_source_by_side": edge_source_by_side, "selection_rule": "bucket boundaries are learned on fit_inner; candidate is picked by tune_inner and checked on validation_locked/latest_stress", } write_json(root / "diagnostics" / "entry_feature_screen_result.json", result) @@ -59,6 +62,14 @@ def screen_entry_features(args: Any) -> None: ) +def _screen_edge_column(dataset: pd.DataFrame, side: str) -> str: + prefix = "long" if side == "LONG" else "short" + actual_col = f"{prefix}_actual_plan_net_edge_bps" + if actual_col in dataset.columns: + return actual_col + return f"{prefix}_expected_net_edge_bps" + + def _split_baselines(dataset: pd.DataFrame, target_col: str, edge_col: str) -> dict[str, dict[str, float]]: baselines: dict[str, dict[str, float]] = {} for split_id in ALL_SPLITS: @@ -225,6 +236,7 @@ def _markdown_report(result: dict[str, Any], candidates: pd.DataFrame) -> str: "", "这份报告只回答一个问题:历史数据里,单个特征的某些区间有没有稳定变好。", "", + "- 如果数据里有真实出场净收益,本报告用真实出场净收益;没有时才退回训练收益标签。", "- `tune_inner` 用来挑候选区间。", "- `validation_locked` 和 `latest_stress` 用来检查这个区间是不是出了训练样本也还能站住。", "- `stable_positive_edge=true` 代表这个区间在三个检查集里的平均净收益都大于 0。", @@ -237,6 +249,8 @@ def _markdown_report(result: dict[str, Any], candidates: pd.DataFrame) -> str: f"- 分桶明细数: `{result['bucket_metric_count']}`", f"- 候选数: `{result['candidate_count']}`", f"- 最小分桶行数: `{result['min_bucket_rows']}`", + f"- 做多收益来源: `{result['edge_source_by_side'].get('LONG')}`", + f"- 做空收益来源: `{result['edge_source_by_side'].get('SHORT')}`", "", ] if candidates.empty: diff --git a/training/trader_training/labels.py b/training/trader_training/labels.py index aa1b9ac..9253b7a 100644 --- a/training/trader_training/labels.py +++ b/training/trader_training/labels.py @@ -24,7 +24,21 @@ from trader_training.schemas import LABEL_VERSION DEFAULT_LABEL_CONFIG = { "direction": {"horizon_minutes": 45, "long_threshold_bps": 5.0, "short_threshold_bps": -5.0}, - "entry": {"max_hold_minutes": 45, "target_bps": 12.0, "stop_bps": 8.0, "min_expected_net_edge_bps": 3.0}, + "entry": { + "max_hold_minutes": 45, + "target_bps": 12.0, + "stop_bps": 8.0, + "min_expected_net_edge_bps": 3.0, + "plan_method": "FIXED_TARGET_STOP_V1", + "target_method": "PRICE_PLAN_OUTCOME_V1", + "min_plan_net_edge_bps": 0.0, + "max_entry_mae_bps": 12.0, + "partial_take_1_ratio": 0.50, + "partial_take_2_ratio": 0.25, + "second_target_bps": 24.0, + "trailing_stop_bps": 10.0, + "breakeven_after_first_target": True, + }, "continue": {"horizon_minutes": 45, "min_expected_continue_edge_bps": 5.0}, "exit": {"horizon_minutes": 45, "adverse_move_bps": 20.0, "stagnation_abs_return_bps": 5.0}, "risk": { @@ -92,6 +106,7 @@ PATH_STAT_COLUMNS = [ "ambiguous_hit", "time_to_target_ms", "time_to_stop_ms", + "time_to_exit_ms", "gross_edge_bps", "future_return_bps", "mfe_bps", @@ -112,7 +127,14 @@ def _first_hit_index(hit_window: np.ndarray) -> tuple[np.ndarray, np.ndarray]: return hit_any, first_idx -def _path_stats_for_group(group: pd.DataFrame, side: str, horizon: int, target_bps: float, stop_bps: float) -> pd.DataFrame: +def _path_stats_for_group( + group: pd.DataFrame, + side: str, + horizon: int, + target_bps: float, + stop_bps: float, + plan_config: dict[str, Any] | None = None, +) -> pd.DataFrame: if len(group) <= horizon: return _empty_path_stats_frame() @@ -151,6 +173,29 @@ def _path_stats_for_group(group: pd.DataFrame, side: str, horizon: int, target_b else: realized_vol_bps = np.full(len(entry), np.nan) + method = str((plan_config or {}).get("plan_method", "FIXED_TARGET_STOP_V1")) + if method == "DYNAMIC_TRAILING_V1": + return _dynamic_path_stats_for_group( + grouped, + side, + horizon, + target_bps, + stop_bps, + close, + high, + low, + spread, + open_ms, + valid, + future_high, + future_low, + future_spread, + future_realized_vol_bps=realized_vol_bps, + plan_config=plan_config or {}, + ) + if method != "FIXED_TARGET_STOP_V1": + raise ValueError(f"unsupported entry plan_method: {method}") + if side == "LONG": target_price = entry * (1.0 + target_bps / 10000.0) stop_price = entry * (1.0 - stop_bps / 10000.0) @@ -175,6 +220,11 @@ def _path_stats_for_group(group: pd.DataFrame, side: str, horizon: int, target_b stop_hit = stop_any & (first_stop_idx <= first_target_idx) timeout_hit = ~(target_hit | stop_hit) gross_edge_bps = np.where(target_hit, target_bps, np.where(stop_hit, -stop_bps, future_return_bps)) + time_to_exit_ms = np.where( + target_hit, + (first_target_idx + 1) * 60_000, + np.where(stop_hit, (first_stop_idx + 1) * 60_000, horizon * 60_000), + ) out = pd.DataFrame( { @@ -187,6 +237,7 @@ def _path_stats_for_group(group: pd.DataFrame, side: str, horizon: int, target_b "ambiguous_hit": ambiguous_hit.astype("int8"), "time_to_target_ms": np.where(target_hit, (first_target_idx + 1) * 60_000, -1).astype("int64"), "time_to_stop_ms": np.where(stop_hit, (first_stop_idx + 1) * 60_000, -1).astype("int64"), + "time_to_exit_ms": time_to_exit_ms.astype("int64"), "gross_edge_bps": gross_edge_bps.astype("float64"), "future_return_bps": future_return_bps.astype("float64"), "mfe_bps": mfe_bps.astype("float64"), @@ -198,27 +249,184 @@ def _path_stats_for_group(group: pd.DataFrame, side: str, horizon: int, target_b return out.loc[valid, PATH_STAT_COLUMNS].reset_index(drop=True) -def _build_path_stats(replay: pd.DataFrame, horizon: int, target_bps: float, stop_bps: float) -> pd.DataFrame: +def _dynamic_path_stats_for_group( + grouped: pd.DataFrame, + side: str, + horizon: int, + target_bps: float, + stop_bps: float, + close: np.ndarray, + high: np.ndarray, + low: np.ndarray, + spread: np.ndarray, + open_ms: np.ndarray, + valid: np.ndarray, + future_high: np.ndarray, + future_low: np.ndarray, + future_spread: np.ndarray, + future_realized_vol_bps: np.ndarray, + plan_config: dict[str, Any], +) -> pd.DataFrame: + entry = close[:-horizon] + exit_price = close[horizon:] + current_open_ms = open_ms[:-horizon] + future_close = sliding_window_view(close[1:], horizon) + with np.errstate(all="ignore"): + high_max = np.nanmax(future_high, axis=1) + low_min = np.nanmin(future_low, axis=1) + spread_p80 = np.nanquantile(future_spread, 0.8, axis=1) + + take1_ratio = float(plan_config.get("partial_take_1_ratio", 0.50)) + take2_ratio = float(plan_config.get("partial_take_2_ratio", 0.25)) + take1_ratio = float(np.clip(take1_ratio, 0.0, 1.0)) + take2_ratio = float(np.clip(take2_ratio, 0.0, max(0.0, 1.0 - take1_ratio))) + target2_bps = float(plan_config.get("second_target_bps", target_bps * 2.0)) + trailing_stop_bps = float(plan_config.get("trailing_stop_bps", stop_bps)) + breakeven_after_first = bool(plan_config.get("breakeven_after_first_target", True)) + + n = len(entry) + active = np.ones(n, dtype=bool) + first_target_done = np.zeros(n, dtype=bool) + second_target_done = np.zeros(n, dtype=bool) + bad_stop_done = np.zeros(n, dtype=bool) + trailing_exit_done = np.zeros(n, dtype=bool) + remaining = np.ones(n, dtype="float64") + gross = np.zeros(n, dtype="float64") + first_target_idx = np.full(n, horizon + 1, dtype="int64") + stop_idx = np.full(n, horizon + 1, dtype="int64") + exit_idx = np.full(n, horizon, dtype="int64") + + if side == "LONG": + high_water = entry.copy() + for step in range(horizon): + h = future_high[:, step] + l = future_low[:, step] + prior_high_water = high_water.copy() + trailing_stop_price = prior_high_water * (1.0 - trailing_stop_bps / 10000.0) + if breakeven_after_first: + trailing_stop_price = np.maximum(trailing_stop_price, entry) + stop_price = np.where(first_target_done, trailing_stop_price, entry * (1.0 - stop_bps / 10000.0)) + stop_now = active & (l <= stop_price) + stop_gross = (stop_price / entry - 1.0) * 10000.0 + gross = np.where(stop_now, gross + remaining * stop_gross, gross) + trailing_exit_done |= stop_now & first_target_done + bad_stop_done |= stop_now & (~first_target_done) + stop_idx = np.where(stop_now, step, stop_idx) + exit_idx = np.where(stop_now, step, exit_idx) + remaining = np.where(stop_now, 0.0, remaining) + active &= ~stop_now + + first_now = active & (~first_target_done) & (h >= entry * (1.0 + target_bps / 10000.0)) + gross = np.where(first_now, gross + take1_ratio * target_bps, gross) + remaining = np.where(first_now, remaining - take1_ratio, remaining) + first_target_done |= first_now + first_target_idx = np.where(first_now, step, first_target_idx) + + second_now = active & first_target_done & (~second_target_done) & (h >= entry * (1.0 + target2_bps / 10000.0)) + gross = np.where(second_now, gross + take2_ratio * target2_bps, gross) + remaining = np.where(second_now, remaining - take2_ratio, remaining) + second_target_done |= second_now + + high_water = np.maximum(high_water, h) + timeout_return = (exit_price / entry - 1.0) * 10000.0 + future_return_bps = timeout_return + mfe_bps = np.maximum((high_max / entry - 1.0) * 10000.0, 0.0) + mae_bps = np.maximum((entry / low_min - 1.0) * 10000.0, 0.0) + else: + low_water = entry.copy() + for step in range(horizon): + h = future_high[:, step] + l = future_low[:, step] + prior_low_water = low_water.copy() + trailing_stop_price = prior_low_water * (1.0 + trailing_stop_bps / 10000.0) + if breakeven_after_first: + trailing_stop_price = np.minimum(trailing_stop_price, entry) + stop_price = np.where(first_target_done, trailing_stop_price, entry * (1.0 + stop_bps / 10000.0)) + stop_now = active & (h >= stop_price) + stop_gross = (entry / stop_price - 1.0) * 10000.0 + gross = np.where(stop_now, gross + remaining * stop_gross, gross) + trailing_exit_done |= stop_now & first_target_done + bad_stop_done |= stop_now & (~first_target_done) + stop_idx = np.where(stop_now, step, stop_idx) + exit_idx = np.where(stop_now, step, exit_idx) + remaining = np.where(stop_now, 0.0, remaining) + active &= ~stop_now + + first_now = active & (~first_target_done) & (l <= entry * (1.0 - target_bps / 10000.0)) + gross = np.where(first_now, gross + take1_ratio * target_bps, gross) + remaining = np.where(first_now, remaining - take1_ratio, remaining) + first_target_done |= first_now + first_target_idx = np.where(first_now, step, first_target_idx) + + second_now = active & first_target_done & (~second_target_done) & (l <= entry * (1.0 - target2_bps / 10000.0)) + gross = np.where(second_now, gross + take2_ratio * target2_bps, gross) + remaining = np.where(second_now, remaining - take2_ratio, remaining) + second_target_done |= second_now + + low_water = np.minimum(low_water, l) + timeout_return = (entry / exit_price - 1.0) * 10000.0 + future_return_bps = timeout_return + mfe_bps = np.maximum((entry / low_min - 1.0) * 10000.0, 0.0) + mae_bps = np.maximum((high_max / entry - 1.0) * 10000.0, 0.0) + + timeout_now = active + gross = np.where(timeout_now, gross + remaining * timeout_return, gross) + exit_idx = np.where(timeout_now, horizon - 1, exit_idx) + target_hit = first_target_done + stop_hit = bad_stop_done + timeout_hit = timeout_now + ambiguous_hit = np.zeros(n, dtype=bool) + out = pd.DataFrame( + { + "symbol": grouped["symbol"].iloc[0], + "open_time_ms": current_open_ms, + "side": side, + "target_hit": target_hit.astype("int8"), + "stop_hit": stop_hit.astype("int8"), + "timeout_hit": timeout_hit.astype("int8"), + "ambiguous_hit": ambiguous_hit.astype("int8"), + "time_to_target_ms": np.where(target_hit, (first_target_idx + 1) * 60_000, -1).astype("int64"), + "time_to_stop_ms": np.where(stop_hit | trailing_exit_done, (stop_idx + 1) * 60_000, -1).astype("int64"), + "time_to_exit_ms": ((exit_idx + 1) * 60_000).astype("int64"), + "gross_edge_bps": gross.astype("float64"), + "future_return_bps": future_return_bps.astype("float64"), + "mfe_bps": mfe_bps.astype("float64"), + "mae_bps": mae_bps.astype("float64"), + "future_spread_p80": spread_p80.astype("float64"), + "future_realized_vol_bps": future_realized_vol_bps.astype("float64"), + } + ) + return out.loc[valid, PATH_STAT_COLUMNS].reset_index(drop=True) + + +def _build_path_stats(replay: pd.DataFrame, horizon: int, target_bps: float, stop_bps: float, plan_config: dict[str, Any] | None = None) -> pd.DataFrame: frames: list[pd.DataFrame] = [] for symbol, group in replay.groupby("symbol", sort=False, observed=False): logging.info( - "trader.training.path_stats_group_start symbol=%s horizonMinutes=%s rowCount=%s", + "trader.training.path_stats_group_start symbol=%s horizonMinutes=%s planMethod=%s rowCount=%s", symbol, horizon, + (plan_config or {}).get("plan_method", "FIXED_TARGET_STOP_V1"), len(group), ) for side in ("LONG", "SHORT"): - stats = _path_stats_for_group(group, side, horizon, target_bps, stop_bps) + stats = _path_stats_for_group(group, side, horizon, target_bps, stop_bps, plan_config=plan_config) frames.append(stats) logging.info( - "trader.training.path_stats_side_done symbol=%s side=%s horizonMinutes=%s rowCount=%s", + "trader.training.path_stats_side_done symbol=%s side=%s horizonMinutes=%s planMethod=%s rowCount=%s", symbol, side, horizon, + (plan_config or {}).get("plan_method", "FIXED_TARGET_STOP_V1"), len(stats), ) out = pd.concat(frames, ignore_index=True) if frames else _empty_path_stats_frame() - logging.info("trader.training.path_stats_built horizonMinutes=%s rowCount=%s", horizon, len(out)) + logging.info( + "trader.training.path_stats_built horizonMinutes=%s planMethod=%s rowCount=%s", + horizon, + (plan_config or {}).get("plan_method", "FIXED_TARGET_STOP_V1"), + len(out), + ) return out @@ -235,8 +443,17 @@ def write_price_plan_context(args: Any) -> None: "targetDistanceBps": float(entry["target_bps"]), "maxHoldMinutes": int(entry["max_hold_minutes"]), "minExpectedNetEdgeBps": float(entry["min_expected_net_edge_bps"]), + "minPlanNetEdgeBps": float(entry.get("min_plan_net_edge_bps", 0.0)), + "maxEntryMaeBps": float(entry.get("max_entry_mae_bps", entry["stop_bps"])), "costBps": cost_bps, "entryLabelMethod": ENTRY_LABEL_METHOD, + "entryTargetMethod": str(entry.get("target_method", ENTRY_LABEL_METHOD)), + "entryPlanMethod": str(entry.get("plan_method", "FIXED_TARGET_STOP_V1")), + "partialTake1Ratio": float(entry.get("partial_take_1_ratio", 0.50)), + "partialTake2Ratio": float(entry.get("partial_take_2_ratio", 0.25)), + "secondTargetBps": float(entry.get("second_target_bps", float(entry["target_bps"]) * 2.0)), + "trailingStopBps": float(entry.get("trailing_stop_bps", float(entry["stop_bps"]))), + "breakevenAfterFirstTarget": bool(entry.get("breakeven_after_first_target", True)), } path = root / "label" / "price_plan_context.json" write_json(path, context) @@ -247,8 +464,17 @@ def write_price_plan_context(args: Any) -> None: "stop_bps": context["stopDistanceBps"], "max_hold_minutes": context["maxHoldMinutes"], "min_expected_net_edge_bps": context["minExpectedNetEdgeBps"], + "min_plan_net_edge_bps": context["minPlanNetEdgeBps"], + "max_entry_mae_bps": context["maxEntryMaeBps"], "cost_bps": context["costBps"], "entry_label_method": context["entryLabelMethod"], + "entry_target_method": context["entryTargetMethod"], + "entry_plan_method": context["entryPlanMethod"], + "partial_take_1_ratio": context["partialTake1Ratio"], + "partial_take_2_ratio": context["partialTake2Ratio"], + "second_target_bps": context["secondTargetBps"], + "trailing_stop_bps": context["trailingStopBps"], + "breakeven_after_first_target": context["breakevenAfterFirstTarget"], }]) write_parquet(root / "label" / "price_plan_context.parquet", frame) logging.info("trader.training.price_plan_written runId=%s path=%s", args.run_id, path) @@ -308,6 +534,7 @@ def build_entry_labels(args: Any) -> None: int(entry_conf["max_hold_minutes"]), float(entry_conf["target_bps"]), float(entry_conf["stop_bps"]), + plan_config=entry_conf, ) feature_columns = [ "sample_id", @@ -321,14 +548,28 @@ def build_entry_labels(args: Any) -> None: "realized_vol_15m_bps", ] merged = features[feature_columns].merge(stats, on=["symbol", "open_time_ms"], how="inner") + merged["actual_plan_net_edge_bps"] = merged["gross_edge_bps"] - cost_bps merged["max_achievable_gross_edge_bps"] = merged["mfe_bps"] merged["max_achievable_net_edge_bps"] = merged["max_achievable_gross_edge_bps"] - cost_bps - merged["expected_net_edge_bps"] = merged["gross_edge_bps"] - cost_bps - merged["entry_target"] = (merged["expected_net_edge_bps"] >= float(entry_conf["min_expected_net_edge_bps"])).astype("int8") + target_method = str(entry_conf.get("target_method", ENTRY_LABEL_METHOD)) + if target_method == "PRICE_PLAN_OUTCOME_V1": + merged["expected_net_edge_bps"] = merged["actual_plan_net_edge_bps"] + elif target_method in {"OPPORTUNITY_MFE_V1", "OPPORTUNITY_QUALITY_V1"}: + merged["expected_net_edge_bps"] = merged["max_achievable_net_edge_bps"] + else: + raise ValueError(f"unsupported entry target_method: {target_method}") + opportunity = merged["expected_net_edge_bps"] >= float(entry_conf["min_expected_net_edge_bps"]) + if target_method == "OPPORTUNITY_QUALITY_V1": + # MFE 只说明价格曾经给过机会;真实开仓还要确认这笔机会按计划能拿走, + # 并且过程中没有先承受过大的反向波动。 + min_plan_net_edge_bps = float(entry_conf.get("min_plan_net_edge_bps", 0.0)) + max_entry_mae_bps = float(entry_conf.get("max_entry_mae_bps", entry_conf["stop_bps"])) + opportunity = opportunity & (merged["actual_plan_net_edge_bps"] >= min_plan_net_edge_bps) & (merged["mae_bps"] <= max_entry_mae_bps) + merged["entry_target"] = opportunity.astype("int8") merged["price_plan_id"] = plan["pricePlanId"] merged["price_plan_hash"] = plan["pricePlanConfigHash"] merged["cost_bps"] = cost_bps - merged["label_method"] = ENTRY_LABEL_METHOD + merged["label_method"] = target_method merged["label_version"] = LABEL_VERSION out = merged[ [ @@ -344,10 +585,12 @@ def build_entry_labels(args: Any) -> None: "ambiguous_hit", "time_to_target_ms", "time_to_stop_ms", + "time_to_exit_ms", "gross_edge_bps", "future_return_bps", "mfe_bps", "mae_bps", + "actual_plan_net_edge_bps", "max_achievable_gross_edge_bps", "max_achievable_net_edge_bps", "cost_bps", @@ -393,7 +636,18 @@ def build_continue_exit_risk_labels(args: Any) -> None: horizon = int(labels["continue"]["horizon_minutes"]) target_bps = float(plan["targetDistanceBps"]) stop_bps = float(plan["stopDistanceBps"]) - stats = _build_path_stats(replay, horizon, target_bps, stop_bps) + plan_config = { + "plan_method": plan.get("entryPlanMethod", labels["entry"].get("plan_method", "FIXED_TARGET_STOP_V1")), + "partial_take_1_ratio": plan.get("partialTake1Ratio", labels["entry"].get("partial_take_1_ratio", 0.50)), + "partial_take_2_ratio": plan.get("partialTake2Ratio", labels["entry"].get("partial_take_2_ratio", 0.25)), + "second_target_bps": plan.get("secondTargetBps", labels["entry"].get("second_target_bps", target_bps * 2.0)), + "trailing_stop_bps": plan.get("trailingStopBps", labels["entry"].get("trailing_stop_bps", stop_bps)), + "breakeven_after_first_target": plan.get( + "breakevenAfterFirstTarget", + labels["entry"].get("breakeven_after_first_target", True), + ), + } + stats = _build_path_stats(replay, horizon, target_bps, stop_bps, plan_config=plan_config) long_stats = stats[stats["side"] == "LONG"].drop(columns=["side"]).add_prefix("long_") short_stats = stats[stats["side"] == "SHORT"].drop(columns=["side"]).add_prefix("short_") long_stats = long_stats.rename(columns={"long_symbol": "symbol", "long_open_time_ms": "open_time_ms"}) @@ -432,6 +686,7 @@ def build_continue_exit_risk_labels(args: Any) -> None: long_edge = merged["long_gross_edge_bps"] - cost_bps short_edge = merged["short_gross_edge_bps"] - cost_bps + dynamic_plan = str(plan_config.get("plan_method")) == "DYNAMIC_TRAILING_V1" path_risk = np.maximum(merged["long_mae_bps"], merged["short_mae_bps"]) max_path_move = np.maximum.reduce([merged["long_mfe_bps"], merged["short_mfe_bps"], path_risk]) if "ret_15m_bps" in merged.columns: @@ -449,8 +704,8 @@ def build_continue_exit_risk_labels(args: Any) -> None: "sample_id": merged["sample_id"], "symbol": merged["symbol"], "event_time": merged["event_time"], - "long_continue_target": ((long_edge >= min_continue) & (merged["long_stop_hit"] == 0)).astype("int8"), - "short_continue_target": ((short_edge >= min_continue) & (merged["short_stop_hit"] == 0)).astype("int8"), + "long_continue_target": ((long_edge >= min_continue) & ((merged["long_stop_hit"] == 0) | dynamic_plan)).astype("int8"), + "short_continue_target": ((short_edge >= min_continue) & ((merged["short_stop_hit"] == 0) | dynamic_plan)).astype("int8"), "long_expected_continue_edge_bps": long_edge, "short_expected_continue_edge_bps": short_edge, "split_id": merged["split_id"], diff --git a/training/trader_training/ofi_feature_experiment.py b/training/trader_training/ofi_feature_experiment.py new file mode 100644 index 0000000..c49fe2b --- /dev/null +++ b/training/trader_training/ofi_feature_experiment.py @@ -0,0 +1,890 @@ +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Any + +import numpy as np +import pandas as pd +from sklearn.linear_model import HuberRegressor, LogisticRegression +from sklearn.metrics import accuracy_score, brier_score_loss, log_loss, mean_absolute_error, roc_auc_score +from sklearn.preprocessing import StandardScaler + +from trader_training.io_utils import ( + DEFAULT_RAW_ROOT, + read_json, + read_parquet, + run_root, + sha256_json, + to_utc_series, + write_json, + write_parquet, + write_text, +) +from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT + + +OFI_METHOD = "SNAPSHOT_DIFF_V1" +BOOK_OFI_FEATURES = [ + "ofi_l1_1m", + "ofi_l1_3m", + "ofi_l1_5m", + "ofi_l1_15m", + "mlofi_l5_1m", + "mlofi_l5_5m", + "mlofi_l20_1m", + "mlofi_l20_5m", + "mlofi_l5_l20_gap_1m", + "microprice_basis_change_1m_bps", + "microprice_basis_change_5m_bps", + "ofi_l1_5m_zscore_240m", + "mlofi_l20_5m_zscore_240m", +] +CROSS_OFI_FEATURES = [ + "ofi_l1_5m_clipped", + "ofi_l1_taker_5m", + "ofi_l1_spread_rank_5m", +] +OFI_FEATURES = [*BOOK_OFI_FEATURES, *CROSS_OFI_FEATURES] + +META_COLUMNS = [ + "sample_id", + "symbol", + "event_time", + "open_time_ms", + "split_id", + "walk_forward_fold", + "data_quality_flag", +] +ALL_SPLITS = (FIT_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) +EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) + + +def run_ofi_feature_experiment(args: Any) -> None: + root = run_root(args) + baseline_root = args.data_root / "trader-v4" / "runs" / args.baseline_run_id + out_dir = root / "experiments" / "ofi_b2_mlofi" + raw_root = Path(args.raw_root or DEFAULT_RAW_ROOT) + logging.info( + "trader.training.ofi_experiment_started runId=%s baselineRunId=%s rawRoot=%s", + args.run_id, + args.baseline_run_id, + raw_root, + ) + + feature = _load_baseline_feature_frame(baseline_root) + ofi_delta = build_snapshot_diff_l1_ofi_features(raw_root, feature[["symbol", "event_time", "open_time_ms"]]) + dataset = _merge_feature_delta(feature, ofi_delta) + if args.max_rows_per_split: + dataset = _cap_rows_per_split(dataset, int(args.max_rows_per_split)) + delta_hash = write_parquet(out_dir / "ofi_feature_delta.parquet", ofi_delta) + dataset_hash = write_parquet(out_dir / "ofi_experiment_feature_frame.parquet", dataset) + write_json(out_dir / "ofi_feature_order.json", OFI_FEATURES) + write_json(out_dir / "ofi_feature_schema.json", _ofi_feature_schema()) + write_json(out_dir / "experiment_manifest.json", _experiment_manifest(args, baseline_root, raw_root, ofi_delta, dataset, delta_hash, dataset_hash)) + write_text(out_dir / "feature_delta_report.md", _feature_delta_report(ofi_delta, dataset)) + + direction = _load_direction_dataset(baseline_root, dataset) + entry = _load_entry_dataset(baseline_root, dataset) + results: dict[str, Any] = {} + prediction_frames: list[pd.DataFrame] = [] + for feature_set_name, columns in _feature_sets().items(): + direction_result, direction_predictions = _train_direction(direction, columns) + entry_result, entry_predictions = _train_entry(entry, columns) + results[feature_set_name] = {"DIRECTION": direction_result, "ENTRY": entry_result} + direction_predictions["model"] = "DIRECTION" + direction_predictions["feature_set"] = feature_set_name + entry_predictions["model"] = "ENTRY" + entry_predictions["feature_set"] = feature_set_name + prediction_frames.extend([direction_predictions, entry_predictions]) + logging.info( + "trader.training.ofi_feature_set_trained runId=%s featureSet=%s directionTuneShortAuc=%s entryTuneShortAuc=%s", + args.run_id, + feature_set_name, + direction_result.get(TUNE_SPLIT, {}).get("short_auc"), + entry_result.get("short_entry_prob", {}).get(TUNE_SPLIT, {}).get("auc"), + ) + + predictions = pd.concat(prediction_frames, ignore_index=True) if prediction_frames else pd.DataFrame() + write_parquet(out_dir / "direction_entry_predictions.parquet", predictions) + write_json(out_dir / "ofi_experiment_result.json", results) + write_text(out_dir / "model_compare_to_run10.md", _model_compare_report(args, baseline_root, results, dataset)) + write_text(out_dir / "backtest_compare_to_run10.md", _backtest_placeholder_report(args, baseline_root)) + write_text(out_dir / "contract_change_report.md", _contract_change_report()) + write_text(out_dir / "failure_cases_compare.md", _failure_case_placeholder_report(args)) + logging.info("trader.training.ofi_experiment_finished runId=%s report=%s", args.run_id, out_dir / "model_compare_to_run10.md") + + +def _load_baseline_feature_frame(baseline_root: Path) -> pd.DataFrame: + frame = read_parquet(baseline_root / "feature" / "feature_frame.parquet") + required = set(META_COLUMNS + FEATURE_ORDER) + missing = sorted(required.difference(frame.columns)) + if missing: + raise ValueError(f"baseline feature frame missing columns: {missing}") + frame = frame[frame["data_quality_flag"].isin(["OK", "PARTIAL_OPTIONAL"])].copy() + frame = frame[frame["split_id"].isin(ALL_SPLITS)].copy() + frame["event_time"] = to_utc_series(frame["event_time"]) + logging.info("trader.training.ofi_baseline_feature_loaded rowCount=%s splitCounts=%s", len(frame), frame["split_id"].value_counts().to_dict()) + return frame + + +def build_snapshot_diff_l1_ofi_features(raw_root: Path, replay_keys: pd.DataFrame) -> pd.DataFrame: + if replay_keys.empty: + return pd.DataFrame(columns=["symbol", "open_time_ms", *BOOK_OFI_FEATURES]) + keys = replay_keys.copy() + keys["event_time"] = to_utc_series(keys["event_time"]) + keys["event_date"] = keys["event_time"].dt.strftime("%Y-%m-%d") + frames: list[pd.DataFrame] = [] + for (symbol, event_date), _ in keys.groupby(["symbol", "event_date"], sort=True, observed=False): + path = raw_root / "table=book" / "exchange=BINANCE_FUTURES" / f"symbol={symbol}" / f"dt={event_date}" / "data.parquet" + if not path.is_file(): + logging.warning("trader.training.ofi_book_partition_missing symbol=%s eventDate=%s path=%s", symbol, event_date, path) + continue + day = _read_l1_book_day(path, symbol) + if not day.empty: + frames.append(day) + logging.info("trader.training.ofi_book_partition_loaded symbol=%s eventDate=%s minuteRows=%s path=%s", symbol, event_date, len(day), path) + if not frames: + raise ValueError(f"no book partitions loaded from {raw_root}") + minute_book = pd.concat(frames, ignore_index=True).sort_values(["symbol", "open_time_ms"]) + minute_book = minute_book.drop_duplicates(["symbol", "open_time_ms"], keep="last").reset_index(drop=True) + feature_frames = [] + for symbol, group in minute_book.groupby("symbol", sort=False, observed=False): + feature_frames.append(_compute_l1_ofi_for_symbol(group, str(symbol))) + features = pd.concat(feature_frames, ignore_index=True) if feature_frames else pd.DataFrame(columns=["symbol", "open_time_ms", *BOOK_OFI_FEATURES]) + wanted = keys[["symbol", "open_time_ms"]].drop_duplicates() + out = wanted.merge(features, on=["symbol", "open_time_ms"], how="left") + logging.info( + "trader.training.ofi_features_built wantedRows=%s matchedRows=%s featureRows=%s", + len(wanted), + int(out[BOOK_OFI_FEATURES].notna().all(axis=1).sum()), + len(features), + ) + return out + + +def _read_l1_book_day(path: Path, symbol: str) -> pd.DataFrame: + columns = ["origin_time"] + for side in ("bid", "ask"): + for level in range(20): + columns.extend([f"{side}_{level}_price", f"{side}_{level}_size"]) + book = pd.read_parquet(path, columns=columns) + if book.empty: + return pd.DataFrame(columns=["symbol", "open_time_ms", *columns[1:]]) + required = ["origin_time", "bid_0_price", "bid_0_size", "ask_0_price", "ask_0_size"] + book = book.dropna(subset=required).copy() + book["origin_time"] = to_utc_series(book["origin_time"]) + book["minute"] = book["origin_time"].dt.floor("min") + book = book.sort_values("origin_time").drop_duplicates("minute", keep="last") + if book.empty: + return pd.DataFrame(columns=["symbol", "open_time_ms", *columns[1:]]) + out = pd.DataFrame({"symbol": symbol, "open_time_ms": (book["minute"].astype("int64") // 1_000_000).astype("int64")}) + for column in columns[1:]: + out[column] = pd.to_numeric(book[column], errors="coerce").astype("float64") + return out + + +def _compute_l1_ofi_for_symbol(book: pd.DataFrame, symbol: str) -> pd.DataFrame: + group = book.sort_values("open_time_ms").reset_index(drop=True).copy() + gap = group["open_time_ms"].astype("int64").diff().ne(60_000) + segment = gap.cumsum() + level_ofi = [] + for level in range(20): + prev_bid_price = group.groupby(segment, sort=False)[f"bid_{level}_price"].shift(1) + prev_bid_size = group.groupby(segment, sort=False)[f"bid_{level}_size"].shift(1) + prev_ask_price = group.groupby(segment, sort=False)[f"ask_{level}_price"].shift(1) + prev_ask_size = group.groupby(segment, sort=False)[f"ask_{level}_size"].shift(1) + bid_part, ask_part = l1_snapshot_diff_ofi_quote( + group[f"bid_{level}_price"], + group[f"bid_{level}_size"], + group[f"ask_{level}_price"], + group[f"ask_{level}_size"], + prev_bid_price, + prev_bid_size, + prev_ask_price, + prev_ask_size, + ) + level_ofi.append((bid_part + ask_part).rename(f"ofi_level_{level}_quote")) + level_frame = pd.concat(level_ofi, axis=1) + group["ofi_l1_event_quote"] = level_frame["ofi_level_0_quote"] + group["mlofi_l5_event_quote"] = level_frame[[f"ofi_level_{level}_quote" for level in range(5)]].sum(axis=1, min_count=5) + group["mlofi_l20_event_quote"] = level_frame.sum(axis=1, min_count=20) + group["l1_depth_quote"] = _depth_quote(group, 1) + group["l5_depth_quote"] = _depth_quote(group, 5) + group["l20_depth_quote"] = _depth_quote(group, 20) + for window in (1, 3, 5, 15): + group[f"ofi_l1_{window}m"] = _rolling_normalized(group, segment, "ofi_l1_event_quote", "l1_depth_quote", window) + for window in (1, 5): + group[f"mlofi_l5_{window}m"] = _rolling_normalized(group, segment, "mlofi_l5_event_quote", "l5_depth_quote", window) + group[f"mlofi_l20_{window}m"] = _rolling_normalized(group, segment, "mlofi_l20_event_quote", "l20_depth_quote", window) + group["mlofi_l5_l20_gap_1m"] = group["mlofi_l5_1m"] - group["mlofi_l20_1m"] + mid = (group["bid_0_price"] + group["ask_0_price"]) / 2.0 + microprice = (group["ask_0_price"] * group["bid_0_size"] + group["bid_0_price"] * group["ask_0_size"]) / (group["bid_0_size"] + group["ask_0_size"]).clip(lower=1e-12) + group["microprice_basis_bps"] = (microprice / mid - 1.0) * 10000.0 + group["microprice_basis_change_1m_bps"] = group.groupby(segment, sort=False)["microprice_basis_bps"].diff(1) + group["microprice_basis_change_5m_bps"] = group.groupby(segment, sort=False)["microprice_basis_bps"].diff(5) + group["ofi_l1_5m_zscore_240m"] = _rolling_zscore(group, segment, "ofi_l1_5m", 240) + group["mlofi_l20_5m_zscore_240m"] = _rolling_zscore(group, segment, "mlofi_l20_5m", 240) + out = group[["symbol", "open_time_ms", *BOOK_OFI_FEATURES]].replace([np.inf, -np.inf], np.nan) + for column in BOOK_OFI_FEATURES: + out[column] = pd.to_numeric(out[column], errors="coerce").astype("float32") + logging.info( + "trader.training.ofi_symbol_features_built symbol=%s minuteRows=%s fullFeatureRows=%s", + symbol, + len(out), + int(out[BOOK_OFI_FEATURES].notna().all(axis=1).sum()), + ) + return out + + +def _depth_quote(group: pd.DataFrame, level_count: int) -> pd.Series: + total = pd.Series(0.0, index=group.index, dtype="float64") + for level in range(level_count): + total = total + group[f"bid_{level}_price"] * group[f"bid_{level}_size"] + group[f"ask_{level}_price"] * group[f"ask_{level}_size"] + return total + + +def _rolling_normalized(group: pd.DataFrame, segment: pd.Series, event_column: str, depth_column: str, window: int) -> pd.Series: + summed = group.groupby(segment, sort=False)[event_column].rolling(window, min_periods=window).sum().reset_index(level=0, drop=True) + averaged_depth = group.groupby(segment, sort=False)[depth_column].rolling(window, min_periods=window).mean().reset_index(level=0, drop=True) + return summed / averaged_depth.clip(lower=1e-12) + + +def _rolling_zscore(group: pd.DataFrame, segment: pd.Series, column: str, window: int) -> pd.Series: + rolling = group.groupby(segment, sort=False)[column].rolling(window, min_periods=window) + mean = rolling.mean().reset_index(level=0, drop=True) + std = rolling.std().reset_index(level=0, drop=True).replace(0, np.nan) + return (group[column] - mean) / std + + +def l1_snapshot_diff_ofi_quote( + bid_price: pd.Series, + bid_size: pd.Series, + ask_price: pd.Series, + ask_size: pd.Series, + prev_bid_price: pd.Series, + prev_bid_size: pd.Series, + prev_ask_price: pd.Series, + prev_ask_size: pd.Series, +) -> tuple[pd.Series, pd.Series]: + bid = pd.to_numeric(bid_price, errors="coerce") + bid_sz = pd.to_numeric(bid_size, errors="coerce") + ask = pd.to_numeric(ask_price, errors="coerce") + ask_sz = pd.to_numeric(ask_size, errors="coerce") + prev_bid = pd.to_numeric(prev_bid_price, errors="coerce") + prev_bid_sz = pd.to_numeric(prev_bid_size, errors="coerce") + prev_ask = pd.to_numeric(prev_ask_price, errors="coerce") + prev_ask_sz = pd.to_numeric(prev_ask_size, errors="coerce") + valid = prev_bid.notna() & prev_bid_sz.notna() & prev_ask.notna() & prev_ask_sz.notna() + + bid_part = pd.Series(np.nan, index=bid.index, dtype="float64") + ask_part = pd.Series(np.nan, index=ask.index, dtype="float64") + bid_up = valid & bid.gt(prev_bid) + bid_same = valid & bid.eq(prev_bid) + bid_down = valid & bid.lt(prev_bid) + bid_part.loc[bid_up] = bid_sz.loc[bid_up] * bid.loc[bid_up] + bid_part.loc[bid_same] = (bid_sz.loc[bid_same] - prev_bid_sz.loc[bid_same]) * bid.loc[bid_same] + bid_part.loc[bid_down] = -prev_bid_sz.loc[bid_down] * prev_bid.loc[bid_down] + + ask_down = valid & ask.lt(prev_ask) + ask_same = valid & ask.eq(prev_ask) + ask_up = valid & ask.gt(prev_ask) + ask_part.loc[ask_down] = -ask_sz.loc[ask_down] * ask.loc[ask_down] + ask_part.loc[ask_same] = -(ask_sz.loc[ask_same] - prev_ask_sz.loc[ask_same]) * ask.loc[ask_same] + ask_part.loc[ask_up] = prev_ask_sz.loc[ask_up] * prev_ask.loc[ask_up] + return bid_part, ask_part + + +def _merge_feature_delta(feature: pd.DataFrame, delta: pd.DataFrame) -> pd.DataFrame: + merged = feature.merge(delta, on=["symbol", "open_time_ms"], how="left") + merged["ofi_l1_5m_clipped"] = pd.to_numeric(merged["ofi_l1_5m"], errors="coerce").clip(-5.0, 5.0) + merged["ofi_l1_taker_5m"] = merged["ofi_l1_5m_clipped"] * pd.to_numeric(merged["taker_imbalance_5m"], errors="coerce") + merged["ofi_l1_spread_rank_5m"] = merged["ofi_l1_5m_clipped"] * pd.to_numeric(merged["spread_rank_24h_pct"], errors="coerce") + before = len(merged) + merged = merged.dropna(subset=OFI_FEATURES).copy() + logging.info( + "trader.training.ofi_feature_delta_merged rowBefore=%s rowAfter=%s droppedRows=%s splitCounts=%s", + before, + len(merged), + before - len(merged), + merged["split_id"].value_counts().to_dict(), + ) + if merged.empty: + raise ValueError("OFI feature experiment has no rows after merging feature delta") + return merged + + +def _cap_rows_per_split(frame: pd.DataFrame, max_rows_per_split: int) -> pd.DataFrame: + capped = [] + for split_id, part in frame.sort_values("event_time").groupby("split_id", sort=False, observed=False): + if len(part) > max_rows_per_split: + part = part.tail(max_rows_per_split).copy() + capped.append(part) + logging.info("trader.training.ofi_split_capped splitId=%s rowCount=%s maxRows=%s", split_id, len(part), max_rows_per_split) + return pd.concat(capped, ignore_index=True) + + +def _load_direction_dataset(baseline_root: Path, feature: pd.DataFrame) -> pd.DataFrame: + labels = read_parquet(baseline_root / "label" / "direction_labels.parquet") + required = {"sample_id", "long_target", "short_target", "neutral_target", "future_return_bps"} + missing = sorted(required.difference(labels.columns)) + if missing: + raise ValueError(f"direction labels missing columns: {missing}") + dataset = feature.merge(labels[list(required)], on="sample_id", how="inner") + logging.info("trader.training.ofi_direction_dataset_loaded rowCount=%s", len(dataset)) + return dataset + + +def _load_entry_dataset(baseline_root: Path, feature: pd.DataFrame) -> pd.DataFrame: + labels = read_parquet(baseline_root / "label" / "entry_labels.parquet") + required = {"sample_id", "side", "entry_target", "expected_net_edge_bps"} + missing = sorted(required.difference(labels.columns)) + if missing: + raise ValueError(f"entry labels missing columns: {missing}") + long = labels[labels["side"].eq("LONG")][["sample_id", "entry_target", "expected_net_edge_bps"]].rename( + columns={"entry_target": "long_entry_target", "expected_net_edge_bps": "long_expected_net_edge_bps"} + ) + short = labels[labels["side"].eq("SHORT")][["sample_id", "entry_target", "expected_net_edge_bps"]].rename( + columns={"entry_target": "short_entry_target", "expected_net_edge_bps": "short_expected_net_edge_bps"} + ) + pivot = long.merge(short, on="sample_id", how="inner") + dataset = feature.merge(pivot, on="sample_id", how="inner") + logging.info("trader.training.ofi_entry_dataset_loaded rowCount=%s", len(dataset)) + return dataset + + +def _feature_sets() -> dict[str, list[str]]: + return { + "market_only": FEATURE_ORDER, + "market_plus_ofi": [*FEATURE_ORDER, *OFI_FEATURES], + } + + +def _train_direction(frame: pd.DataFrame, feature_columns: list[str]) -> tuple[dict[str, Any], pd.DataFrame]: + train = frame[frame["split_id"].eq(FIT_SPLIT)].copy() + if train.empty: + raise ValueError("direction experiment has no fit_inner rows") + scaler = StandardScaler() + x_train = scaler.fit_transform(train[feature_columns].astype("float32")) + y_train = train[["long_target", "short_target", "neutral_target"]].to_numpy().argmax(axis=1) + model = LogisticRegression(max_iter=500) + model.fit(x_train, y_train) + train_prior = train[["long_target", "short_target", "neutral_target"]].to_numpy(dtype=float).mean(axis=0) + + metrics: dict[str, Any] = {"feature_count": len(feature_columns), "feature_hash": sha256_json(feature_columns)} + prediction_frames = [] + for split_id in ALL_SPLITS: + part = frame[frame["split_id"].eq(split_id)].copy() + if part.empty: + continue + x = scaler.transform(part[feature_columns].astype("float32")) + proba = model.predict_proba(x) + y = part[["long_target", "short_target", "neutral_target"]].to_numpy().argmax(axis=1) + metrics[split_id] = _direction_metrics(y, proba, train_prior) + pred = part[["sample_id", "symbol", "event_time", "split_id"]].copy() + pred["long_prob"] = proba[:, 0].astype("float32") + pred["short_prob"] = proba[:, 1].astype("float32") + pred["neutral_prob"] = proba[:, 2].astype("float32") + pred["label_long"] = (y == 0).astype("int8") + pred["label_short"] = (y == 1).astype("int8") + pred["label_neutral"] = (y == 2).astype("int8") + prediction_frames.append(pred) + return metrics, pd.concat(prediction_frames, ignore_index=True) + + +def _train_entry(frame: pd.DataFrame, feature_columns: list[str]) -> tuple[dict[str, Any], pd.DataFrame]: + train = frame[frame["split_id"].eq(FIT_SPLIT)].copy() + if train.empty: + raise ValueError("entry experiment has no fit_inner rows") + scaler = StandardScaler() + x_train = scaler.fit_transform(train[feature_columns].astype("float32")) + x_by_split = { + split_id: scaler.transform(frame[frame["split_id"].eq(split_id)][feature_columns].astype("float32")) + for split_id in ALL_SPLITS + if not frame[frame["split_id"].eq(split_id)].empty + } + specs = [ + ("long_entry_prob", "binary", "long_entry_target"), + ("short_entry_prob", "binary", "short_entry_target"), + ("long_expected_net_edge_bps", "regression", "long_expected_net_edge_bps"), + ("short_expected_net_edge_bps", "regression", "short_expected_net_edge_bps"), + ] + results: dict[str, Any] = {"feature_count": len(feature_columns), "feature_hash": sha256_json(feature_columns)} + split_predictions: dict[str, pd.DataFrame] = { + split_id: frame[frame["split_id"].eq(split_id)][["sample_id", "symbol", "event_time", "split_id"]].copy().reset_index(drop=True) + for split_id in x_by_split + } + for name, kind, target in specs: + y_train = pd.to_numeric(train[target], errors="coerce").fillna(0.0).to_numpy() + if kind == "binary": + model = LogisticRegression(max_iter=500) + model.fit(x_train, y_train.astype(int)) + else: + model = HuberRegressor(alpha=0.001, epsilon=1.35, max_iter=500) + model.fit(x_train, y_train.astype(float)) + results[name] = {} + for split_id, x in x_by_split.items(): + part = frame[frame["split_id"].eq(split_id)].copy() + y = pd.to_numeric(part[target], errors="coerce").fillna(0.0).to_numpy() + if kind == "binary": + pred = model.predict_proba(x)[:, 1] + results[name][split_id] = _binary_metrics(y_train.astype(int), y.astype(int), pred) + else: + pred = model.predict(x) + results[name][split_id] = _regression_metrics(y_train.astype(float), y.astype(float), pred) + split_predictions[split_id][name] = pred.astype("float32") + split_predictions[split_id][f"label_{target}"] = y + return results, pd.concat(split_predictions.values(), ignore_index=True) + + +def _direction_metrics(y: np.ndarray, proba: np.ndarray, train_prior: np.ndarray) -> dict[str, Any]: + labels = [0, 1, 2] + train_prior = np.asarray(train_prior, dtype=float) + train_prior = train_prior / train_prior.sum() if train_prior.sum() > 0 else np.full(3, 1.0 / 3.0) + constant = np.tile(train_prior.reshape(1, -1), (len(y), 1)) + one_hot = np.eye(3, dtype=float)[y] + clipped = _clip_normalize(proba) + constant_clipped = _clip_normalize(constant) + out: dict[str, Any] = { + "row_count": int(len(y)), + "accuracy": float(accuracy_score(y, proba.argmax(axis=1))), + "logloss": float(log_loss(y, clipped, labels=labels)), + "constant_logloss": float(log_loss(y, constant_clipped, labels=labels)), + "brier_multiclass": float(np.mean(np.sum((one_hot - proba) ** 2, axis=1))), + "constant_brier_multiclass": float(np.mean(np.sum((one_hot - constant) ** 2, axis=1))), + } + for idx, name in enumerate(("long", "short", "neutral")): + target = (y == idx).astype(int) + if target.sum() >= 200 and (len(target) - target.sum()) >= 200: + out[f"{name}_auc"] = float(roc_auc_score(target, proba[:, idx])) + top_count = max(1, int(len(y) * 0.10)) + top_idx = np.argsort(proba.max(axis=1))[-top_count:] + out["top10_hit_rate"] = float((proba.argmax(axis=1)[top_idx] == y[top_idx]).mean()) + out["all_hit_rate"] = float((proba.argmax(axis=1) == y).mean()) + out["logloss_vs_constant_ratio"] = float(out["logloss"] / out["constant_logloss"]) if out["constant_logloss"] > 0 else None + out["brier_vs_constant_ratio"] = float(out["brier_multiclass"] / out["constant_brier_multiclass"]) if out["constant_brier_multiclass"] > 0 else None + return out + + +def _binary_metrics(y_train: np.ndarray, y: np.ndarray, proba: np.ndarray) -> dict[str, Any]: + train_rate = float(np.mean(y_train)) + constant = np.full(len(y), train_rate) + out: dict[str, Any] = { + "row_count": int(len(y)), + "positive_rate": float(np.mean(y)) if len(y) else 0.0, + "brier": float(brier_score_loss(y, proba)), + "constant_brier": float(brier_score_loss(y, constant)), + } + if len(np.unique(y)) == 2: + out["auc"] = float(roc_auc_score(y, proba)) + top_count = max(1, int(len(y) * 0.10)) + top_idx = np.argsort(proba)[-top_count:] + out["top10_hit_rate"] = float(np.mean(y[top_idx])) + out["all_hit_rate"] = float(np.mean(y)) + out["brier_vs_constant_ratio"] = float(out["brier"] / out["constant_brier"]) if out["constant_brier"] > 0 else None + return out + + +def _regression_metrics(y_train: np.ndarray, y: np.ndarray, pred: np.ndarray) -> dict[str, Any]: + train_median = float(np.median(y_train)) if len(y_train) else 0.0 + constant = np.full(len(y), train_median) + mae = float(mean_absolute_error(y, pred)) + constant_mae = float(mean_absolute_error(y, constant)) + return { + "row_count": int(len(y)), + "mae": mae, + "constant_mae": constant_mae, + "mae_vs_constant_ratio": float(mae / constant_mae) if constant_mae > 0 else None, + "train_target_median": train_median, + } + + +def _clip_normalize(values: np.ndarray) -> np.ndarray: + values = np.clip(np.asarray(values, dtype=float), 1e-6, 1.0) + return values / values.sum(axis=1, keepdims=True) + + +def _experiment_manifest( + args: Any, + baseline_root: Path, + raw_root: Path, + ofi_delta: pd.DataFrame, + dataset: pd.DataFrame, + delta_hash: str, + dataset_hash: str, +) -> dict[str, Any]: + return { + "experiment": "ofi_l1_microprice_diagnostic_v1", + "run_id": args.run_id, + "baseline_run_id": args.baseline_run_id, + "baseline_root": str(baseline_root), + "raw_root": str(raw_root), + "ofi_method": OFI_METHOD, + "uses_event_stream_ofi": False, + "normalization": "quote_notional_over_average_l1_depth_quote", + "new_features": OFI_FEATURES, + "formal_model_contract_changed": False, + "java_contract_changed": False, + "label_changed": False, + "pm_threshold_changed": False, + "delta_row_count": int(len(ofi_delta)), + "trainable_row_count": int(len(dataset)), + "split_counts": dataset["split_id"].value_counts().to_dict(), + "ofi_delta_hash_sha256": delta_hash, + "ofi_experiment_feature_frame_hash_sha256": dataset_hash, + } + + +def _ofi_feature_schema() -> list[dict[str, Any]]: + rows = [ + { + "name": "ofi_l1_1m", + "meaning": "买一卖一盘口变化强度,1分钟窗口", + "unit": "ratio", + "source": "Crypto Lake book snapshot", + "window": "1m", + "formula": "sum(l1 snapshot-diff quote OFI) / avg(l1 depth quote)", + "ofi_method": OFI_METHOD, + "null_handling": "first minute or gap warmup -> drop in experiment", + "order": 1, + }, + { + "name": "ofi_l1_3m", + "meaning": "买一卖一盘口变化强度,3分钟窗口", + "unit": "ratio", + "source": "Crypto Lake book snapshot", + "window": "3m", + "formula": "sum(l1 snapshot-diff quote OFI over last 3 closed minutes) / avg(l1 depth quote)", + "ofi_method": OFI_METHOD, + "null_handling": "window warmup or gap warmup -> drop in experiment", + "order": 2, + }, + { + "name": "ofi_l1_5m", + "meaning": "买一卖一盘口变化强度,5分钟窗口", + "unit": "ratio", + "source": "Crypto Lake book snapshot", + "window": "5m", + "formula": "sum(l1 snapshot-diff quote OFI over last 5 closed minutes) / avg(l1 depth quote)", + "ofi_method": OFI_METHOD, + "null_handling": "window warmup or gap warmup -> drop in experiment", + "order": 3, + }, + { + "name": "ofi_l1_15m", + "meaning": "买一卖一盘口变化强度,15分钟窗口", + "unit": "ratio", + "source": "Crypto Lake book snapshot", + "window": "15m", + "formula": "sum(l1 snapshot-diff quote OFI over last 15 closed minutes) / avg(l1 depth quote)", + "ofi_method": OFI_METHOD, + "null_handling": "window warmup or gap warmup -> drop in experiment", + "order": 4, + }, + { + "name": "microprice_basis_change_1m_bps", + "meaning": "微价格偏离值的1分钟变化", + "unit": "bps", + "source": "Crypto Lake book snapshot", + "window": "1m", + "formula": "microprice_basis_bps(t) - microprice_basis_bps(t-1m)", + "ofi_method": OFI_METHOD, + "null_handling": "first minute or gap warmup -> drop in experiment", + "order": 5, + }, + { + "name": "microprice_basis_change_5m_bps", + "meaning": "微价格偏离值的5分钟变化", + "unit": "bps", + "source": "Crypto Lake book snapshot", + "window": "5m", + "formula": "microprice_basis_bps(t) - microprice_basis_bps(t-5m)", + "ofi_method": OFI_METHOD, + "null_handling": "window warmup or gap warmup -> drop in experiment", + "order": 6, + }, + ] + rows.extend( + [ + { + "name": "mlofi_l5_1m", + "meaning": "前5档盘口变化强度,1分钟窗口", + "unit": "ratio", + "source": "Crypto Lake book snapshot", + "window": "1m", + "formula": "sum(level0..4 snapshot-diff quote OFI) / avg(level0..4 depth quote)", + "ofi_method": OFI_METHOD, + "null_handling": "first minute or gap warmup -> drop in experiment", + "order": 7, + }, + { + "name": "mlofi_l5_5m", + "meaning": "前5档盘口变化强度,5分钟窗口", + "unit": "ratio", + "source": "Crypto Lake book snapshot", + "window": "5m", + "formula": "sum(level0..4 snapshot-diff quote OFI over last 5 closed minutes) / avg(level0..4 depth quote)", + "ofi_method": OFI_METHOD, + "null_handling": "window warmup or gap warmup -> drop in experiment", + "order": 8, + }, + { + "name": "mlofi_l20_1m", + "meaning": "前20档盘口变化强度,1分钟窗口", + "unit": "ratio", + "source": "Crypto Lake book snapshot", + "window": "1m", + "formula": "sum(level0..19 snapshot-diff quote OFI) / avg(level0..19 depth quote)", + "ofi_method": OFI_METHOD, + "null_handling": "first minute or gap warmup -> drop in experiment", + "order": 9, + }, + { + "name": "mlofi_l20_5m", + "meaning": "前20档盘口变化强度,5分钟窗口", + "unit": "ratio", + "source": "Crypto Lake book snapshot", + "window": "5m", + "formula": "sum(level0..19 snapshot-diff quote OFI over last 5 closed minutes) / avg(level0..19 depth quote)", + "ofi_method": OFI_METHOD, + "null_handling": "window warmup or gap warmup -> drop in experiment", + "order": 10, + }, + { + "name": "mlofi_l5_l20_gap_1m", + "meaning": "近档盘口变化和深档盘口变化的差", + "unit": "ratio", + "source": "Crypto Lake book snapshot", + "window": "1m", + "formula": "mlofi_l5_1m - mlofi_l20_1m", + "ofi_method": OFI_METHOD, + "null_handling": "dependency missing -> drop in experiment", + "order": 11, + }, + { + "name": "ofi_l1_5m_zscore_240m", + "meaning": "L1 OFI 5分钟值相对最近240分钟的异常程度", + "unit": "zscore", + "source": "Crypto Lake book snapshot", + "window": "240m", + "formula": "(ofi_l1_5m - rolling_mean_240m) / rolling_std_240m", + "ofi_method": OFI_METHOD, + "null_handling": "window warmup or gap warmup -> drop in experiment", + "order": 12, + }, + { + "name": "mlofi_l20_5m_zscore_240m", + "meaning": "L20 多层 OFI 5分钟值相对最近240分钟的异常程度", + "unit": "zscore", + "source": "Crypto Lake book snapshot", + "window": "240m", + "formula": "(mlofi_l20_5m - rolling_mean_240m) / rolling_std_240m", + "ofi_method": OFI_METHOD, + "null_handling": "window warmup or gap warmup -> drop in experiment", + "order": 13, + }, + { + "name": "ofi_l1_5m_clipped", + "meaning": "截尾后的 L1 OFI 5分钟值", + "unit": "ratio", + "source": "derived from ofi_l1_5m", + "window": "5m", + "formula": "clip(ofi_l1_5m, -5, 5)", + "ofi_method": OFI_METHOD, + "null_handling": "dependency missing -> drop in experiment", + "order": 14, + }, + { + "name": "ofi_l1_taker_5m", + "meaning": "L1 OFI 和5分钟主动成交是否同向", + "unit": "ratio", + "source": "book + trades", + "window": "5m", + "formula": "ofi_l1_5m_clipped * taker_imbalance_5m", + "ofi_method": OFI_METHOD, + "null_handling": "dependency missing -> drop in experiment", + "order": 15, + }, + { + "name": "ofi_l1_spread_rank_5m", + "meaning": "L1 OFI 在高价差环境下的强度", + "unit": "ratio", + "source": "book + level_1", + "window": "5m", + "formula": "ofi_l1_5m_clipped * spread_rank_24h_pct", + "ofi_method": OFI_METHOD, + "null_handling": "dependency missing -> drop in experiment", + "order": 16, + }, + ] + ) + return rows + + +def _feature_delta_report(ofi_delta: pd.DataFrame, dataset: pd.DataFrame) -> str: + rows = [] + for feature in OFI_FEATURES: + series = pd.to_numeric(dataset[feature], errors="coerce") + q = series.quantile([0.01, 0.5, 0.99]) + rows.append( + { + "feature": feature, + "null_in_delta": int(ofi_delta[feature].isna().sum()) if feature in ofi_delta.columns else "derived_after_merge", + "trainable_null": int(series.isna().sum()), + "p01": round(float(q.loc[0.01]), 6), + "p50": round(float(q.loc[0.5]), 6), + "p99": round(float(q.loc[0.99]), 6), + } + ) + lines = [ + "# OFI Feature Delta Report", + "", + f"- ofi_method: `{OFI_METHOD}`", + f"- delta_rows: `{len(ofi_delta)}`", + f"- trainable_rows_after_drop: `{len(dataset)}`", + f"- split_counts: `{dataset['split_id'].value_counts().to_dict()}`", + "", + "| feature | null_in_delta | trainable_null | p01 | p50 | p99 |", + "| --- | ---: | ---: | ---: | ---: | ---: |", + ] + for row in rows: + lines.append(f"| {row['feature']} | {row['null_in_delta']} | {row['trainable_null']} | {row['p01']} | {row['p50']} | {row['p99']} |") + lines.extend( + [ + "", + "## Leakage Check", + "", + "- 只用当前分钟及之前的 book 快照。", + "- 第一条快照和断档后的窗口不补 0,直接作为 warmup 丢掉。", + "- 分子和分母都使用 quote 金额口径。", + "", + ] + ) + return "\n".join(lines) + + +def _model_compare_report(args: Any, baseline_root: Path, results: dict[str, Any], dataset: pd.DataFrame) -> str: + baseline = read_json(baseline_root / "model" / "model_train_manifest.json") + baseline_direction = baseline["DIRECTION"]["metrics"]["direction"] + baseline_entry = baseline["ENTRY"]["metrics"] + lines = [ + "# OFI Model Compare To Run10", + "", + f"- run_id: `{args.run_id}`", + f"- baseline_run_id: `{args.baseline_run_id}`", + f"- ofi_method: `{OFI_METHOD}`", + f"- rows: `{len(dataset)}`", + "", + "## Run10 Tune Baseline", + "", + "| model | metric | value |", + "| --- | --- | ---: |", + f"| Direction | long_auc | {baseline_direction.get('long_auc')} |", + f"| Direction | short_auc | {baseline_direction.get('short_auc')} |", + f"| Direction | neutral_auc | {baseline_direction.get('neutral_auc')} |", + f"| Entry | long_auc | {baseline_entry['long_entry_prob'].get('auc')} |", + f"| Entry | short_auc | {baseline_entry['short_entry_prob'].get('auc')} |", + f"| Entry | long_edge_mae_ratio | {baseline_entry['long_expected_net_edge_bps'].get('mae_vs_constant_ratio')} |", + f"| Entry | short_edge_mae_ratio | {baseline_entry['short_expected_net_edge_bps'].get('mae_vs_constant_ratio')} |", + "", + "## Diagnostic Direction Result", + "", + "| feature_set | split | long_auc | short_auc | neutral_auc | logloss_ratio | top10_hit_rate |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: |", + ] + for feature_set_name, payload in results.items(): + direction = payload["DIRECTION"] + for split_id in EVAL_SPLITS: + metric = direction.get(split_id, {}) + lines.append( + f"| {feature_set_name} | {split_id} | {metric.get('long_auc')} | {metric.get('short_auc')} | {metric.get('neutral_auc')} | {metric.get('logloss_vs_constant_ratio')} | {metric.get('top10_hit_rate')} |" + ) + lines.extend( + [ + "", + "## Diagnostic Entry Result", + "", + "| head | feature_set | split | auc/mae_ratio | brier_ratio | top10_hit_rate |", + "| --- | --- | --- | ---: | ---: | ---: |", + ] + ) + for feature_set_name, payload in results.items(): + entry = payload["ENTRY"] + for head in ("long_entry_prob", "short_entry_prob"): + for split_id in EVAL_SPLITS: + metric = entry.get(head, {}).get(split_id, {}) + lines.append( + f"| {head} | {feature_set_name} | {split_id} | {metric.get('auc')} | {metric.get('brier_vs_constant_ratio')} | {metric.get('top10_hit_rate')} |" + ) + for head in ("long_expected_net_edge_bps", "short_expected_net_edge_bps"): + for split_id in EVAL_SPLITS: + metric = entry.get(head, {}).get(split_id, {}) + lines.append(f"| {head} | {feature_set_name} | {split_id} | {metric.get('mae_vs_constant_ratio')} | | |") + lines.extend( + [ + "", + "## Verdict Rule", + "", + "只有 `market_plus_ofi` 在 validation_locked 和 latest_stress 上同时好过 `market_only`,才进入正式特征链路。", + "", + ] + ) + return "\n".join(lines) + + +def _backtest_placeholder_report(args: Any, baseline_root: Path) -> str: + return "\n".join( + [ + "# Backtest Compare To Run10", + "", + f"- run_id: `{args.run_id}`", + f"- baseline_run_id: `{args.baseline_run_id}`", + "", + "本轮是 Direction / Entry 特征诊断,没有导出正式 ONNX,也没有改 PM 阈值,所以不跑组合回测。", + "", + "如果诊断指标通过,下一步才把 OFI 特征纳入正式 `feature_schema.json`、导出模型包,再做 validation_locked 和 latest_stress 的完整回测。", + "", + f"- run10_baseline_root: `{baseline_root}`", + "", + ] + ) + + +def _contract_change_report() -> str: + return "\n".join( + [ + "# Contract Change Report", + "", + "| 项 | 结论 |", + "| --- | --- |", + "| 正式 ONNX 输入 | 未改变 |", + "| Java SHADOW 输入契约 | 未改变 |", + "| 模型输出字段 | 未改变 |", + "| 标签口径 | 未改变 |", + "| PM 阈值 | 未改变 |", + "", + "原因:本轮只做旁路诊断。只有验证通过后,才会进入正式特征表和 Java 契约同步。", + "", + ] + ) + + +def _failure_case_placeholder_report(args: Any) -> str: + return "\n".join( + [ + "# Failure Cases Compare", + "", + f"- run_id: `{args.run_id}`", + "", + "本轮没有产生正式交易决策,因此没有最差交易样本可比。", + "", + "下一步如果 OFI 进入正式模型包,必须用完整回测交易明细比较:", + "", + "1. validation_locked 最大亏损交易。", + "2. latest_stress 最大亏损交易。", + "3. 连续亏损段。", + "4. 高 OFI 但反向亏损样本。", + "", + ] + ) diff --git a/training/trader_training/pm.py b/training/trader_training/pm.py index 9e91f0f..a8124a1 100644 --- a/training/trader_training/pm.py +++ b/training/trader_training/pm.py @@ -11,6 +11,13 @@ from trader_training.io_utils import read_json, read_parquet, run_root, sha256_j from trader_training.schemas import LATEST_STRESS_SPLIT, PM_CONFIG_VERSION, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT +DEFAULT_BACKTEST_PRICE_PLAN = { + "stopDistanceBps": 35.0, + "costBps": 4.0, + "maxHoldMinutes": 45, +} + + def default_pm_config() -> dict: return { "pmConfigVersion": PM_CONFIG_VERSION, @@ -70,6 +77,7 @@ def default_pm_config() -> dict: def search_pm_thresholds(args: Any) -> None: root = run_root(args) frame = _pm_tune_frame(root) + price_plan = _price_plan_context(root) candidate_rows: list[dict[str, Any]] = [] best_score = -float("inf") best_thresholds: dict[str, float] | None = None @@ -77,7 +85,8 @@ def search_pm_thresholds(args: Any) -> None: best_trades = pd.DataFrame() for thresholds in _threshold_candidates(): - trades = _simulate_open_trades(frame, thresholds) + config = _pm_config_from_thresholds(thresholds) + trades = _simulate_open_trades(frame, thresholds, config, price_plan) metrics = _trade_metrics(trades) score = _score_thresholds(metrics) candidate_rows.append({**thresholds, **metrics, "score": score}) @@ -134,11 +143,27 @@ def integrated_backtest(args: Any) -> None: trades_path = root / "pm-search" / "pm_backtest_trades.parquet" # PM search is allowed to use tune_inner, but final acceptance must be # measured on the sealed validation_locked and latest_stress splits. - tune_trades = read_parquet(trades_path) if trades_path.is_file() else _simulate_open_trades(_pm_tune_frame(root), _thresholds_from_config(pm_payload["config"])) + price_plan = _price_plan_context(root) + tune_trades = read_parquet(trades_path) if trades_path.is_file() else _simulate_open_trades( + _pm_tune_frame(root), + _thresholds_from_config(pm_payload["config"]), + pm_payload["config"], + price_plan, + ) tune_trades["eval_split"] = TUNE_SPLIT - validation_locked_trades = _simulate_open_trades(_pm_frame(root, VALIDATION_LOCKED_SPLIT), _thresholds_from_config(pm_payload["config"])) + validation_locked_trades = _simulate_open_trades( + _pm_frame(root, VALIDATION_LOCKED_SPLIT), + _thresholds_from_config(pm_payload["config"]), + pm_payload["config"], + price_plan, + ) validation_locked_trades["eval_split"] = VALIDATION_LOCKED_SPLIT - stress_trades = _simulate_open_trades(_pm_frame(root, LATEST_STRESS_SPLIT), _thresholds_from_config(pm_payload["config"])) + stress_trades = _simulate_open_trades( + _pm_frame(root, LATEST_STRESS_SPLIT), + _thresholds_from_config(pm_payload["config"]), + pm_payload["config"], + price_plan, + ) stress_trades["eval_split"] = LATEST_STRESS_SPLIT trades = pd.concat([tune_trades, validation_locked_trades, stress_trades], ignore_index=True) metrics = { @@ -154,6 +179,8 @@ def integrated_backtest(args: Any) -> None: "backtest_manifest_id": f"backtest-{args.run_id}", "mode": "VALIDATION_PM_BACKTEST", "pm_config_hash_sha256": pm_payload["config_hash_sha256"], + "price_plan_id": price_plan.get("pricePlanId"), + "price_plan_config_hash": price_plan.get("pricePlanConfigHash"), "metrics": metrics, "status_reasons": status_reasons, "status": status, @@ -179,6 +206,14 @@ def _pm_tune_frame(root) -> pd.DataFrame: return _pm_frame(root, TUNE_SPLIT) +def _price_plan_context(root) -> dict[str, Any]: + path = root / "label" / "price_plan_context.json" + if path.is_file(): + return read_json(path) + logging.warning("trader.training.price_plan_missing_for_pm path=%s usingDefault=%s", path, DEFAULT_BACKTEST_PRICE_PLAN) + return DEFAULT_BACKTEST_PRICE_PLAN.copy() + + def _pm_frame(root, split_id: str) -> pd.DataFrame: prediction_files = { TUNE_SPLIT: "tune_predictions.parquet", @@ -194,12 +229,14 @@ def _pm_frame(root, split_id: str) -> pd.DataFrame: } ) risk = read_parquet(root / "model" / "risk" / prediction_file) + price_plan = _price_plan_context(root) entry_dataset = read_parquet(root / "dataset" / "entry_train.parquet").rename( columns={ "long_expected_net_edge_bps": "actual_long_expected_net_edge_bps", "short_expected_net_edge_bps": "actual_short_expected_net_edge_bps", } ) + entry_plan_outcome = _entry_plan_outcome_frame(root) entry_cols = [ "sample_id", "long_entry_prob", @@ -214,26 +251,92 @@ def _pm_frame(root, split_id: str) -> pd.DataFrame: .merge(entry[entry_cols], on="sample_id", how="inner") .merge(risk[risk_cols], on="sample_id", how="inner") .merge(entry_dataset[actual_cols], on="sample_id", how="inner") + .merge(entry_plan_outcome, on="sample_id", how="inner") ) if frame.empty: raise ValueError(f"PM frame is empty for {split_id}; check model predictions and entry dataset") + frame["model_pred_long_expected_net_edge_bps"] = frame["pred_long_expected_net_edge_bps"] + frame["model_pred_short_expected_net_edge_bps"] = frame["pred_short_expected_net_edge_bps"] + edge_mode = "MODEL_EXPECTED_NET_EDGE" + if price_plan.get("entryTargetMethod") not in {"OPPORTUNITY_MFE_V1", "OPPORTUNITY_QUALITY_V1"}: + frame["pred_long_expected_net_edge_bps"] = _probability_implied_edge(frame["long_entry_prob"], price_plan) + frame["pred_short_expected_net_edge_bps"] = _probability_implied_edge(frame["short_entry_prob"], price_plan) + edge_mode = "ENTRY_PROBABILITY_PAYOFF" logging.info( - "trader.training.pm_frame_loaded splitId=%s rowCount=%s splitCounts=%s", + "trader.training.pm_frame_loaded splitId=%s rowCount=%s splitCounts=%s edgeMode=%s", split_id, len(frame), frame["split_id"].value_counts().to_dict(), + edge_mode, ) return frame +def _probability_implied_edge(entry_prob: pd.Series, price_plan: dict[str, Any]) -> pd.Series: + target_net_bps = float(price_plan.get("targetDistanceBps", 0.0)) - float(price_plan.get("costBps", 0.0)) + stop_net_bps = -float(price_plan.get("stopDistanceBps", DEFAULT_BACKTEST_PRICE_PLAN["stopDistanceBps"])) - float( + price_plan.get("costBps", DEFAULT_BACKTEST_PRICE_PLAN["costBps"]) + ) + probability = pd.to_numeric(entry_prob, errors="coerce").fillna(0.0).clip(lower=0.0, upper=1.0) + # Entry 的概率头比收益回归头稳定。这里用固定止盈止损的盈亏比把概率换成期望收益, + # 让低命中、高赔率计划也能被 PM 正常搜索;真实结果仍由标签里的实际路径收益评估。 + return probability * target_net_bps + (1.0 - probability) * stop_net_bps + + +def _entry_plan_outcome_frame(root) -> pd.DataFrame: + labels = read_parquet(root / "label" / "entry_labels.parquet").copy() + required = { + "sample_id", + "side", + "gross_edge_bps", + "cost_bps", + "target_hit", + "stop_hit", + "time_to_target_ms", + "time_to_stop_ms", + "time_to_exit_ms", + } + missing = sorted(required - set(labels.columns)) + if missing: + raise ValueError(f"entry_labels is missing PM outcome columns: {missing}") + labels["trade_net_edge_bps"] = pd.to_numeric(labels["gross_edge_bps"], errors="coerce").fillna(0.0) - pd.to_numeric( + labels["cost_bps"], errors="coerce" + ).fillna(0.0) + + def side_frame(side: str, prefix: str) -> pd.DataFrame: + return labels[labels["side"].eq(side)][ + [ + "sample_id", + "trade_net_edge_bps", + "target_hit", + "stop_hit", + "time_to_target_ms", + "time_to_stop_ms", + "time_to_exit_ms", + ] + ].rename( + columns={ + "trade_net_edge_bps": f"{prefix}_trade_net_edge_bps", + "target_hit": f"{prefix}_target_hit", + "stop_hit": f"{prefix}_stop_hit", + "time_to_target_ms": f"{prefix}_time_to_target_ms", + "time_to_stop_ms": f"{prefix}_time_to_stop_ms", + "time_to_exit_ms": f"{prefix}_time_to_exit_ms", + } + ) + + return side_frame("LONG", "long").merge(side_frame("SHORT", "short"), on="sample_id", how="inner") + + def _threshold_candidates() -> list[dict[str, float]]: + # 1.01 表示这一侧不开仓,用来检查“只做多”或“只做空”是否更稳。 values = itertools.product( - [0.50, 0.52, 0.54, 0.56, 0.58], - [0.50, 0.52, 0.54, 0.56, 0.58], - [0.10, 0.12, 0.14, 0.16, 0.20, 0.30, 0.50], - [0.55, 0.75, 0.90, 1.00], - [-8.0, -4.0, 0.0, 1.0, 3.0], - [0.00, 0.01, 0.02, 0.05], + [0.50, 0.60, 0.70, 1.01], + [0.50, 0.60, 0.70, 1.01], + [0.03, 0.50, 0.70, 0.85], + [0.45, 0.65, 0.85], + [0.0, 8.0, 15.0, 25.0], + [0.02, 0.06, 0.10], ) return [ { @@ -248,23 +351,39 @@ def _threshold_candidates() -> list[dict[str, float]]: ] -def _simulate_open_trades(frame: pd.DataFrame, thresholds: dict[str, float]) -> pd.DataFrame: +def _simulate_open_trades( + frame: pd.DataFrame, + thresholds: dict[str, float], + pm_config: dict[str, Any] | None = None, + price_plan: dict[str, Any] | None = None, +) -> pd.DataFrame: + direction_margin = (frame["long_prob"] - frame["short_prob"]).abs() long_mask = ( - (frame["long_prob"] >= thresholds["long_open_prob"]) - & ((frame["long_prob"] - frame["short_prob"]) >= thresholds["min_direction_margin"]) - & (frame["long_entry_prob"] >= thresholds["min_entry_prob"]) - & (frame["market_risk_prob"] <= thresholds["max_market_risk_prob"]) - & (frame["pred_long_expected_net_edge_bps"] >= thresholds["min_expected_edge_bps"]) + (frame["long_prob"] > thresholds["long_open_prob"]) + & (direction_margin > thresholds["min_direction_margin"]) + & (frame["long_entry_prob"] > thresholds["min_entry_prob"]) + & (frame["market_risk_prob"] < thresholds["max_market_risk_prob"]) + & (frame["pred_long_expected_net_edge_bps"] > thresholds["min_expected_edge_bps"]) ) short_mask = ( - (frame["short_prob"] >= thresholds["short_open_prob"]) - & ((frame["short_prob"] - frame["long_prob"]) >= thresholds["min_direction_margin"]) - & (frame["short_entry_prob"] >= thresholds["min_entry_prob"]) - & (frame["market_risk_prob"] <= thresholds["max_market_risk_prob"]) - & (frame["pred_short_expected_net_edge_bps"] >= thresholds["min_expected_edge_bps"]) + (frame["short_prob"] > thresholds["short_open_prob"]) + & (direction_margin > thresholds["min_direction_margin"]) + & (frame["short_entry_prob"] > thresholds["min_entry_prob"]) + & (frame["market_risk_prob"] < thresholds["max_market_risk_prob"]) + & (frame["pred_short_expected_net_edge_bps"] > thresholds["min_expected_edge_bps"]) + ) + long_score = ( + frame["pred_long_expected_net_edge_bps"].clip(lower=0.0) + * frame["long_prob"] + * frame["long_entry_prob"] + * (1.0 - frame["market_risk_prob"].clip(lower=0.0, upper=1.0)) + ) + short_score = ( + frame["pred_short_expected_net_edge_bps"].clip(lower=0.0) + * frame["short_prob"] + * frame["short_entry_prob"] + * (1.0 - frame["market_risk_prob"].clip(lower=0.0, upper=1.0)) ) - long_score = frame["pred_long_expected_net_edge_bps"] + (frame["long_prob"] - frame["short_prob"]) * 10.0 - short_score = frame["pred_short_expected_net_edge_bps"] + (frame["short_prob"] - frame["long_prob"]) * 10.0 side = np.where(long_mask & (~short_mask | (long_score >= short_score)), "LONG", np.where(short_mask, "SHORT", "")) trades = frame.loc[side != ""].copy().reset_index(drop=True) if trades.empty: @@ -274,11 +393,19 @@ def _simulate_open_trades(frame: pd.DataFrame, thresholds: dict[str, float]) -> trades["direction_prob"] = np.where(is_long, trades["long_prob"], trades["short_prob"]) trades["entry_prob"] = np.where(is_long, trades["long_entry_prob"], trades["short_entry_prob"]) trades["predicted_edge_bps"] = np.where(is_long, trades["pred_long_expected_net_edge_bps"], trades["pred_short_expected_net_edge_bps"]) - trades["actual_edge_bps"] = np.where(is_long, trades["actual_long_expected_net_edge_bps"], trades["actual_short_expected_net_edge_bps"]) + trades["actual_edge_bps"] = np.where(is_long, trades["long_trade_net_edge_bps"], trades["short_trade_net_edge_bps"]) + trades["label_max_edge_bps"] = np.where(is_long, trades["actual_long_expected_net_edge_bps"], trades["actual_short_expected_net_edge_bps"]) trades["entry_target"] = np.where(is_long, trades["long_entry_target"], trades["short_entry_target"]) - trades["planned_ratio"] = _planned_ratio(trades["predicted_edge_bps"], trades["market_risk_prob"], thresholds["min_expected_edge_bps"]) + effective_pm_config = pm_config or _pm_config_from_thresholds(thresholds) + effective_price_plan = price_plan or DEFAULT_BACKTEST_PRICE_PLAN + trades["time_to_exit_ms"] = _time_to_exit_ms(trades, is_long, effective_price_plan) + trades["planned_ratio"] = _planned_ratio_like_position_manager(trades, effective_pm_config["sizing"], effective_price_plan) + trades = trades[trades["planned_ratio"] > 0].copy() + if trades.empty: + return _empty_trade_frame() trades["weighted_edge_bps"] = trades["actual_edge_bps"] * trades["planned_ratio"] trades["threshold_hash"] = sha256_json(thresholds)[:16] + trades = _enforce_non_overlapping_entries(trades, effective_pm_config, effective_price_plan) return trades[ [ "sample_id", @@ -290,8 +417,10 @@ def _simulate_open_trades(frame: pd.DataFrame, thresholds: dict[str, float]) -> "entry_prob", "market_risk_prob", "predicted_edge_bps", + "label_max_edge_bps", "actual_edge_bps", "entry_target", + "time_to_exit_ms", "planned_ratio", "weighted_edge_bps", "threshold_hash", @@ -311,8 +440,10 @@ def _empty_trade_frame() -> pd.DataFrame: "entry_prob", "market_risk_prob", "predicted_edge_bps", + "label_max_edge_bps", "actual_edge_bps", "entry_target", + "time_to_exit_ms", "planned_ratio", "weighted_edge_bps", "threshold_hash", @@ -320,10 +451,78 @@ def _empty_trade_frame() -> pd.DataFrame: ) -def _planned_ratio(predicted_edge: pd.Series, market_risk: pd.Series, min_edge: float) -> np.ndarray: - edge_strength = ((predicted_edge.astype(float) - min_edge) / 20.0).clip(lower=0.0, upper=1.5) - risk_discount = (1.0 - market_risk.astype(float)).clip(lower=0.0, upper=1.0) - return (edge_strength * risk_discount).clip(lower=0.05, upper=1.0).to_numpy() +def _time_to_exit_ms(trades: pd.DataFrame, is_long: pd.Series, price_plan: dict[str, Any]) -> np.ndarray: + max_hold_ms = int(price_plan.get("maxHoldMinutes", DEFAULT_BACKTEST_PRICE_PLAN["maxHoldMinutes"])) * 60_000 + long_exit_col = "long_time_to_exit_ms" + short_exit_col = "short_time_to_exit_ms" + if long_exit_col in trades.columns and short_exit_col in trades.columns: + label_exit_ms = np.where(is_long, trades[long_exit_col], trades[short_exit_col]).astype("float64") + return np.where(np.isfinite(label_exit_ms) & (label_exit_ms > 0), label_exit_ms, max_hold_ms) + target_hit = np.where(is_long, trades["long_target_hit"], trades["short_target_hit"]) + stop_hit = np.where(is_long, trades["long_stop_hit"], trades["short_stop_hit"]) + target_ms = np.where(is_long, trades["long_time_to_target_ms"], trades["short_time_to_target_ms"]).astype("float64") + stop_ms = np.where(is_long, trades["long_time_to_stop_ms"], trades["short_time_to_stop_ms"]).astype("float64") + return np.where((target_hit == 1) & (target_ms >= 0), target_ms, np.where((stop_hit == 1) & (stop_ms >= 0), stop_ms, max_hold_ms)) + + +def _enforce_non_overlapping_entries(trades: pd.DataFrame, pm_config: dict[str, Any], price_plan: dict[str, Any]) -> pd.DataFrame: + if trades.empty: + return trades + cooldown_ms = int(pm_config.get("add", {}).get("cooldownMinutes", 0)) * 60_000 + max_hold_ms = int(price_plan.get("maxHoldMinutes", DEFAULT_BACKTEST_PRICE_PLAN["maxHoldMinutes"])) * 60_000 + sort_columns = ["symbol", "event_time", "predicted_edge_bps"] + sorted_keys = trades[["symbol", "event_time", "predicted_edge_bps", "time_to_exit_ms"]].sort_values( + sort_columns, + ascending=[True, True, False], + ) + event_ns = pd.to_datetime(sorted_keys["event_time"], utc=True).astype("int64").to_numpy() + symbols = sorted_keys["symbol"].astype(str).to_numpy() + exit_delay_values = pd.to_numeric(sorted_keys["time_to_exit_ms"], errors="coerce").fillna(max_hold_ms).to_numpy(dtype="float64") + original_indices = sorted_keys.index.to_numpy() + + next_available_ns_by_symbol: dict[str, int] = {} + keep: list[int] = [] + for index, symbol, event_time_ns, exit_delay_ms in zip(original_indices, symbols, event_ns, exit_delay_values): + next_available_ns = next_available_ns_by_symbol.get(symbol) + if next_available_ns is not None and event_time_ns < next_available_ns: + continue + keep.append(index) + if not np.isfinite(exit_delay_ms) or exit_delay_ms <= 0: + exit_delay_ms = max_hold_ms + next_available_ns_by_symbol[symbol] = int(event_time_ns + (exit_delay_ms + cooldown_ms) * 1_000_000) + return trades.loc[keep].sort_values("event_time").reset_index(drop=True) + + +def _planned_ratio_like_position_manager(trades: pd.DataFrame, sizing: dict[str, Any], price_plan: dict[str, Any]) -> np.ndarray: + expected_edge = trades["predicted_edge_bps"].astype(float).clip(lower=0.0) + direction_strength = trades["direction_prob"].astype(float).clip(lower=0.0, upper=1.0) + entry_prob = trades["entry_prob"].astype(float).clip(lower=0.0, upper=1.0) + market_risk = trades["market_risk_prob"].astype(float).clip(lower=0.0, upper=1.0) + + min_edge = float(sizing["minEdgeBps"]) + stop_loss_budget = max( + float(price_plan.get("stopDistanceBps", DEFAULT_BACKTEST_PRICE_PLAN["stopDistanceBps"])) + + float(price_plan.get("costBps", DEFAULT_BACKTEST_PRICE_PLAN["costBps"])), + 1.0, + ) + raw = ( + float(sizing["baseRatio"]) + * (expected_edge / stop_loss_budget) + * direction_strength + * entry_prob + * (1.0 - market_risk) + ) + hard_cap = min( + float(sizing["maxSingleLegRatio"]), + float(sizing["maxTotalPositionRatio"]), + float(sizing["maxLiquidityUsageRatio"]), + float(sizing["maxLossPerTradeBps"]) / stop_loss_budget, + ) + min_ratio = float(sizing["minInitialRatio"]) + if hard_cap < min_ratio: + return np.zeros(len(trades), dtype="float64") + ratio = raw.clip(lower=min_ratio, upper=hard_cap) + return np.where(expected_edge >= min_edge, ratio, 0.0) def _trade_metrics(trades: pd.DataFrame) -> dict[str, Any]: @@ -336,6 +535,9 @@ def _trade_metrics(trades: pd.DataFrame) -> dict[str, Any]: "total_weighted_edge_bps": 0.0, "max_drawdown_bps": 0.0, "avg_planned_ratio": 0.0, + "min_planned_ratio": 0.0, + "p50_planned_ratio": 0.0, + "max_planned_ratio": 0.0, "profit_factor": 0.0, "max_consecutive_losses": 0, } @@ -351,6 +553,9 @@ def _trade_metrics(trades: pd.DataFrame) -> dict[str, Any]: "total_weighted_edge_bps": float(equity.iloc[-1]), "max_drawdown_bps": float(drawdown.max()), "avg_planned_ratio": float(trades["planned_ratio"].astype(float).mean()), + "min_planned_ratio": float(trades["planned_ratio"].astype(float).min()), + "p50_planned_ratio": float(trades["planned_ratio"].astype(float).median()), + "max_planned_ratio": float(trades["planned_ratio"].astype(float).max()), "profit_factor": float(gains / losses) if losses > 0 else float("inf"), "max_consecutive_losses": _max_consecutive_losses(trades["weighted_edge_bps"].astype(float).to_numpy()), } @@ -382,6 +587,8 @@ def _backtest_status(metrics: dict[str, dict[str, Any]]) -> tuple[str, list[str] reasons.append("validation_locked_avg_trade_edge_not_positive") if validation_locked["max_consecutive_losses"] > 8: reasons.append("validation_locked_max_consecutive_losses_above_8") + if validation_locked["trade_count"] > 0 and validation_locked["max_planned_ratio"] <= 0.050001: + reasons.append("validation_locked_sizing_collapsed_to_min_initial") if stress["trade_count"] < 20: reasons.append("latest_stress_trade_count_below_20") if stress["profit_factor"] < 1.0: @@ -427,6 +634,7 @@ def _pm_config_from_thresholds(thresholds: dict[str, float]) -> dict: } ) config["add"]["maxMarketRiskProb"] = thresholds["max_market_risk_prob"] + config["add"]["minEntryProb"] = thresholds["min_entry_prob"] config["add"]["minExpectedEdgeBps"] = thresholds["min_expected_edge_bps"] config["sizing"]["minEdgeBps"] = thresholds["min_expected_edge_bps"] config["sizing"]["maxSingleLegRatio"] = 1.0 @@ -479,7 +687,7 @@ def _write_pm_report(path, candidates: pd.DataFrame, best_thresholds: dict[str, lines = [ "# PM Threshold Report", "", - "本次不是固定写死阈值,而是在验证集上试一组可复现的阈值,选择净收益、回撤、交易数量综合更好的那组。", + "本次不是固定写死阈值,而是在调参集上试一组可复现的阈值。PM 回测使用固定止盈止损后的真实净收益,并且开仓后按持仓结束时间加冷却时间阻止重叠开仓。", "", "## Best Thresholds", "", @@ -505,7 +713,7 @@ def _write_backtest_report(path, result: dict[str, Any]) -> None: lines = [ "# Integrated Backtest Report", "", - "这里用验证集模型输出和 PM 阈值生成交易明细,统计净收益、胜率、回撤和分段表现。", + "这里用验证集模型输出和 PM 阈值生成交易明细,统计净收益、胜率、回撤和分段表现。收益按固定止盈止损计划的真实净收益计算,不使用窗口内最大可拿收益。", "", "```json", str(result).replace("'", '"'), diff --git a/training/trader_training/price_plan_search.py b/training/trader_training/price_plan_search.py index 8f604db..151d568 100644 --- a/training/trader_training/price_plan_search.py +++ b/training/trader_training/price_plan_search.py @@ -194,7 +194,7 @@ def _plan_side_rows( gross = np.where(target_first, target_bps, np.where(stop_first, -stop_bps, timeout_return)) price_plan_net = gross - cost_bps expected_net = max_achievable_gross - cost_bps - positive = expected_net >= min_expected_edge_bps + positive = price_plan_net >= min_expected_edge_bps ambiguous = target_any & stop_any & (target_index == stop_index) rows: list[dict[str, Any]] = [] @@ -264,21 +264,25 @@ def _plan_summary(rows: pd.DataFrame) -> pd.DataFrame: split_rows["avg_price_plan_edge_eval"] = split_rows[ [f"avg_price_plan_net_edge_bps_{split}" for split in (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)] ].mean(axis=1) + split_rows["min_price_plan_edge_eval"] = split_rows[ + [f"avg_price_plan_net_edge_bps_{split}" for split in (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)] + ].min(axis=1) split_rows["min_margin_eval"] = split_rows[ [f"target_rate_margin_{split}" for split in (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)] ].min(axis=1) - # The search score is not an上线门槛. It only chooses the next experiment: - # enough positive samples, less negative average edge, and stable behavior - # across tune/validation/stress. + # The search score is not an上线门槛. It only chooses the next experiment. + # Fixed price-plan net edge is weighted most heavily; max future edge is + # retained only as a weak tie-breaker because it can be optimistic. positive_rate_penalty = ( - (0.08 - split_rows["min_positive_label_rate_eval"]).clip(lower=0.0) * 80.0 - + (split_rows["max_positive_label_rate_eval"] - 0.45).clip(lower=0.0) * 30.0 + (0.03 - split_rows["min_positive_label_rate_eval"]).clip(lower=0.0) * 120.0 + + (split_rows["max_positive_label_rate_eval"] - 0.55).clip(lower=0.0) * 20.0 ) spread_bonus = np.log1p((split_rows["target_bps"] - split_rows["stop_bps"]).clip(lower=0.0)) split_rows["score"] = ( - split_rows["avg_edge_eval"] - + split_rows["avg_price_plan_edge_eval"] * 0.5 - + split_rows["min_margin_eval"] * 20.0 + split_rows["avg_price_plan_edge_eval"] * 8.0 + + split_rows["min_price_plan_edge_eval"] * 3.0 + + split_rows["min_margin_eval"] * 80.0 + + split_rows["avg_edge_eval"] * 0.05 - positive_rate_penalty + spread_bonus ) @@ -287,8 +291,10 @@ def _plan_summary(rows: pd.DataFrame) -> pd.DataFrame: def _select_best_plan(summary: pd.DataFrame) -> dict[str, Any]: candidates = summary[ - (summary["min_positive_label_rate_eval"] >= 0.08) - & (summary["max_positive_label_rate_eval"] <= 0.45) + (summary["min_positive_label_rate_eval"] >= 0.03) + & (summary["max_positive_label_rate_eval"] <= 0.55) + & (summary["avg_price_plan_edge_eval"] > 0.0) + & (summary["min_price_plan_edge_eval"] > -1.0) & (summary["target_bps"] > summary["stop_bps"]) ] if candidates.empty: @@ -305,6 +311,7 @@ def _select_best_plan(summary: pd.DataFrame) -> dict[str, Any]: "score": float(row["score"]), "avg_edge_eval": float(row["avg_edge_eval"]), "avg_price_plan_edge_eval": float(row["avg_price_plan_edge_eval"]), + "min_price_plan_edge_eval": float(row["min_price_plan_edge_eval"]), "min_margin_eval": float(row["min_margin_eval"]), "min_positive_label_rate_eval": float(row["min_positive_label_rate_eval"]), "max_positive_label_rate_eval": float(row["max_positive_label_rate_eval"]), @@ -332,7 +339,7 @@ def _markdown_report(payload: dict[str, Any], summary: pd.DataFrame) -> str: "", _markdown_table(top), "", - "说明:positive_label_rate 和 avg_expected_net_edge_bps 按“未来窗口最大可拿净收益”统计;target_hit_rate、stop_hit_rate、avg_price_plan_net_edge_bps 只用来检查固定止盈止损计划是否顺手。这里选的是下一轮实验用的价格计划,不是上线结论。真正能不能上线仍然看模型训练、PM 搜索、validation_locked 和 latest_stress 回测。", + "说明:positive_label_rate 和 avg_price_plan_net_edge_bps 按固定止盈止损计划统计;avg_expected_net_edge_bps 只是辅助观察未来最大可拿空间,不能单独决定价格计划。这里选的是下一轮实验用的价格计划,不是上线结论。真正能不能上线仍然看模型训练、PM 搜索、validation_locked 和 latest_stress 回测。", "", ] return "\n".join(lines) diff --git a/training/trader_training/schemas.py b/training/trader_training/schemas.py index 14bfaaa..e5f4426 100644 --- a/training/trader_training/schemas.py +++ b/training/trader_training/schemas.py @@ -5,7 +5,7 @@ from typing import Any FEATURE_VERSION = "feature-v4-p2-book-cross" -LABEL_VERSION = "label-v4-p1-max-edge" +LABEL_VERSION = "label-v4-p5-entry-quality" SPLIT_VERSION = "split-v4-p0" MODEL_BUNDLE_VERSION = "trader-v4-btc-p0" CALIBRATION_BUNDLE_VERSION = "cal-v4-btc-p0"