From 9acb3460a120b2ae059031d277d27a44432951af Mon Sep 17 00:00:00 2001 From: Codex Date: Sat, 27 Jun 2026 19:57:29 +0800 Subject: [PATCH] Improve Trader V4 training pipeline Align entry labels with max future edge, tune direction labeling, and harden regression evaluation. Add training diagnostics, price-plan search, feature screening, and nonlinear benchmark scripts. --- .../trader/artifact/TraderArtifactLoader.java | 2 +- .../feature/TraderFeatureVectorBuilder.java | 2 +- .../java/com/quantai/trader/TestFixtures.java | 50 +- .../artifact/TraderArtifactLoaderTest.java | 4 +- .../TraderFeatureVectorBuilderTest.java | 8 +- .../model/OnnxTraderModelServiceTest.java | 2 +- .../OrtTraderOnnxInferenceClientTest.java | 2 +- training/README.md | 16 +- training/scripts/03_build_splits.py | 16 +- training/scripts/04_build_feature_frame.py | 3 +- training/scripts/18_diagnose_training_run.py | 19 + training/scripts/19_search_price_plan.py | 35 ++ training/scripts/20_screen_entry_features.py | 20 + .../scripts/21_benchmark_nonlinear_models.py | 21 + training/tests/test_training_contract.py | 114 +++- training/trader_training/diagnostics.py | 351 +++++++++++++ .../trader_training/entry_feature_screen.py | 306 +++++++++++ training/trader_training/features.py | 131 ++++- training/trader_training/labels.py | 490 ++++++++++-------- .../trader_training/nonlinear_benchmark.py | 167 ++++++ training/trader_training/onnx_export.py | 4 +- training/trader_training/pm.py | 20 +- training/trader_training/price_plan_search.py | 368 +++++++++++++ training/trader_training/replay.py | 204 +++++--- training/trader_training/schemas.py | 19 +- training/trader_training/training.py | 20 +- training/trader_training/validator.py | 6 +- 27 files changed, 2059 insertions(+), 341 deletions(-) create mode 100644 training/scripts/18_diagnose_training_run.py create mode 100644 training/scripts/19_search_price_plan.py create mode 100644 training/scripts/20_screen_entry_features.py create mode 100644 
training/scripts/21_benchmark_nonlinear_models.py create mode 100644 training/trader_training/diagnostics.py create mode 100644 training/trader_training/entry_feature_screen.py create mode 100644 training/trader_training/nonlinear_benchmark.py create mode 100644 training/trader_training/price_plan_search.py diff --git a/src/main/java/com/quantai/trader/artifact/TraderArtifactLoader.java b/src/main/java/com/quantai/trader/artifact/TraderArtifactLoader.java index 3162f57..af2f24f 100644 --- a/src/main/java/com/quantai/trader/artifact/TraderArtifactLoader.java +++ b/src/main/java/com/quantai/trader/artifact/TraderArtifactLoader.java @@ -29,7 +29,7 @@ import java.util.stream.StreamSupport; public class TraderArtifactLoader { private static final Logger log = LoggerFactory.getLogger(TraderArtifactLoader.class); private static final Set REQUIRED_MODELS = Set.of("DIRECTION", "ENTRY", "CONTINUE", "EXIT", "RISK"); - private static final int REQUIRED_FEATURE_COUNT = 39; + private static final int REQUIRED_FEATURE_COUNT = 54; private static final int REQUIRED_ONNX_OPSET_VERSION = 17; private static final Map> REQUIRED_OUTPUT_MAPPING_KEYS = Map.of( "DIRECTION", Set.of("long_prob", "short_prob", "neutral_prob"), diff --git a/src/main/java/com/quantai/trader/feature/TraderFeatureVectorBuilder.java b/src/main/java/com/quantai/trader/feature/TraderFeatureVectorBuilder.java index 11843ff..0fe958d 100644 --- a/src/main/java/com/quantai/trader/feature/TraderFeatureVectorBuilder.java +++ b/src/main/java/com/quantai/trader/feature/TraderFeatureVectorBuilder.java @@ -27,7 +27,7 @@ import java.util.stream.StreamSupport; @Component public class TraderFeatureVectorBuilder { private static final Logger log = LoggerFactory.getLogger(TraderFeatureVectorBuilder.class); - private static final int REQUIRED_FEATURE_COUNT = 39; + private static final int REQUIRED_FEATURE_COUNT = 54; private final TraderProperties properties; private final ObjectMapper objectMapper; diff --git 
a/src/test/java/com/quantai/trader/TestFixtures.java b/src/test/java/com/quantai/trader/TestFixtures.java index b2fbd36..23bc04c 100644 --- a/src/test/java/com/quantai/trader/TestFixtures.java +++ b/src/test/java/com/quantai/trader/TestFixtures.java @@ -106,7 +106,7 @@ public final class TestFixtures { public static TraderMarketSnapshot snapshot(boolean dataReady, String depthNotional5Bps) { return new TraderMarketSnapshot("snapshot-1", "run-1", "cycle-1", "BTC-USDT-PERP", T0, - "feature-v4-p0", bd("100"), bd("99.5"), bd("1.2"), BigDecimal.ZERO, + "feature-v4-p2-book-cross", bd("100"), bd("99.5"), bd("1.2"), BigDecimal.ZERO, bd(depthNotional5Bps), bd(depthNotional5Bps), bd(depthNotional5Bps), dataReady, featureJson(), dataQualityJson()); } @@ -114,7 +114,7 @@ public final class TestFixtures { public static ReplayMarketEvent replayEvent(String runId, Instant eventTime, String markPrice, String indexPrice, String depthNotional) { BigDecimal depth = bd(depthNotional); - return new ReplayMarketEvent(runId, "BTC-USDT-PERP", eventTime, "feature-v4-p0", + return new ReplayMarketEvent(runId, "BTC-USDT-PERP", eventTime, "feature-v4-p2-book-cross", bd(markPrice), bd(indexPrice), bd("1.2"), bd("0.5"), depth, depth.multiply(new BigDecimal("1.4")), depth.multiply(new BigDecimal("2.2")), depth.compareTo(BigDecimal.ZERO) > 0, featureJson(), dataQualityJson()); @@ -165,6 +165,21 @@ public final class TestFixtures { features.put("minute_of_day_sin", bd("0.0")); features.put("minute_of_day_cos", bd("1.0")); features.put("minutes_to_next_funding", bd("120")); + features.put("book_top_imbalance", bd("0.18")); + features.put("book_microprice_basis_bps", bd("0.04")); + features.put("book_bid_depth_l5_quote", bd("5200000")); + features.put("book_ask_depth_l5_quote", bd("4700000")); + features.put("book_depth_imbalance_l5", bd("0.05")); + features.put("book_depth_imbalance_l20", bd("0.08")); + features.put("book_depth_concentration_l5_l20", bd("0.03")); + 
features.put("book_pressure_spread_ratio", bd("0.033333")); + features.put("book_pressure_taker_1m", bd("0.0032")); + features.put("book_pressure_taker_5m", bd("0.0048")); + features.put("book_l20_imbalance_taker_15m", bd("0.0032")); + features.put("book_l20_imbalance_ret_15m", bd("0.656")); + features.put("book_pressure_vol_adjusted", bd("0.004255")); + features.put("book_depth_pressure_gap", bd("-0.03")); + features.put("book_pressure_reversal_15m", bd("-0.328")); return Map.copyOf(features); } @@ -344,7 +359,14 @@ public final class TestFixtures { "liquidation_buy_notional_1m","liquidation_sell_notional_1m", "liquidation_imbalance_15m","liquidation_notional_zscore_15m", "liquidation_available","minute_of_day_sin","minute_of_day_cos", - "minutes_to_next_funding" + "minutes_to_next_funding","book_top_imbalance","book_microprice_basis_bps", + "book_bid_depth_l5_quote","book_ask_depth_l5_quote", + "book_depth_imbalance_l5","book_depth_imbalance_l20", + "book_depth_concentration_l5_l20","book_pressure_spread_ratio", + "book_pressure_taker_1m","book_pressure_taker_5m", + "book_l20_imbalance_taker_15m","book_l20_imbalance_ret_15m", + "book_pressure_vol_adjusted","book_depth_pressure_gap", + "book_pressure_reversal_15m" ] """); String outputSchemaHash = writeArtifact(artifactRoot.resolve("schemas/outputs.json"), """ @@ -418,7 +440,7 @@ public final class TestFixtures { "manifest_schema_version": "trader-model-bundle-v1", "model_bundle_version": "trader-v4-btc-p0", "calibration_bundle_version": "cal-v4-btc-p0", - "feature_version": "feature-v4-p0", + "feature_version": "feature-v4-p2-book-cross", "label_version": "label-v4-p0", "split_version": "split-v4-p0", "training_run_id": "train-run-p0", @@ -441,9 +463,9 @@ public final class TestFixtures { "symbol_scope_json":["BTC-USDT-PERP"],"bar_interval":"1m","horizon_minutes":45, "model_format":"ONNX","model_runtime":"ONNX_RUNTIME_JAVA","model_runtime_version":"1.22.0", 
"onnx_opset_version":17,"producer_name":"test-exporter","producer_version":"p0", - "feature_version":"feature-v4-p0","feature_schema_path":"schemas/features.json", + "feature_version":"feature-v4-p2-book-cross","feature_schema_path":"schemas/features.json", "feature_schema_hash":"%6$s","feature_order_path":"schemas/feature_order.json","feature_order_hash":"%8$s", - "input_tensor_name":"features","input_dtype":"FLOAT32","input_shape_json":{"batch":1,"features":39}, + "input_tensor_name":"features","input_dtype":"FLOAT32","input_shape_json":{"batch":1,"features":54}, "input_example_path":"examples/input.json","output_schema_path":"schemas/outputs.json", "output_schema_hash":"%7$s","output_tensor_names_json":["probabilities"], "output_mapping_json":{"long_prob":"probabilities[0]","short_prob":"probabilities[1]","neutral_prob":"probabilities[2]"}, @@ -461,9 +483,9 @@ public final class TestFixtures { "symbol_scope_json":["BTC-USDT-PERP"],"bar_interval":"1m","horizon_minutes":45, "model_format":"ONNX","model_runtime":"ONNX_RUNTIME_JAVA","model_runtime_version":"1.22.0", "onnx_opset_version":17,"producer_name":"test-exporter","producer_version":"p0", - "feature_version":"feature-v4-p0","feature_schema_path":"schemas/features.json", + "feature_version":"feature-v4-p2-book-cross","feature_schema_path":"schemas/features.json", "feature_schema_hash":"%6$s","feature_order_path":"schemas/feature_order.json","feature_order_hash":"%8$s", - "input_tensor_name":"features","input_dtype":"FLOAT32","input_shape_json":{"batch":1,"features":39}, + "input_tensor_name":"features","input_dtype":"FLOAT32","input_shape_json":{"batch":1,"features":54}, "input_example_path":"examples/input.json","output_schema_path":"schemas/outputs.json", "output_schema_hash":"%7$s","output_tensor_names_json":["entry"], "output_mapping_json":{"long_entry_prob":"entry[0]","short_entry_prob":"entry[1]","long_expected_net_edge_bps":"entry[2]","short_expected_net_edge_bps":"entry[3]"}, @@ -481,9 +503,9 @@ public 
final class TestFixtures { "symbol_scope_json":["BTC-USDT-PERP"],"bar_interval":"1m","horizon_minutes":45, "model_format":"ONNX","model_runtime":"ONNX_RUNTIME_JAVA","model_runtime_version":"1.22.0", "onnx_opset_version":17,"producer_name":"test-exporter","producer_version":"p0", - "feature_version":"feature-v4-p0","feature_schema_path":"schemas/features.json", + "feature_version":"feature-v4-p2-book-cross","feature_schema_path":"schemas/features.json", "feature_schema_hash":"%6$s","feature_order_path":"schemas/feature_order.json","feature_order_hash":"%8$s", - "input_tensor_name":"features","input_dtype":"FLOAT32","input_shape_json":{"batch":1,"features":39}, + "input_tensor_name":"features","input_dtype":"FLOAT32","input_shape_json":{"batch":1,"features":54}, "input_example_path":"examples/input.json","output_schema_path":"schemas/outputs.json", "output_schema_hash":"%7$s","output_tensor_names_json":["continue"], "output_mapping_json":{"long_continue_prob":"continue[0]","short_continue_prob":"continue[1]","long_expected_continue_edge_bps":"continue[2]","short_expected_continue_edge_bps":"continue[3]"}, @@ -501,9 +523,9 @@ public final class TestFixtures { "symbol_scope_json":["BTC-USDT-PERP"],"bar_interval":"1m","horizon_minutes":45, "model_format":"ONNX","model_runtime":"ONNX_RUNTIME_JAVA","model_runtime_version":"1.22.0", "onnx_opset_version":17,"producer_name":"test-exporter","producer_version":"p0", - "feature_version":"feature-v4-p0","feature_schema_path":"schemas/features.json", + "feature_version":"feature-v4-p2-book-cross","feature_schema_path":"schemas/features.json", "feature_schema_hash":"%6$s","feature_order_path":"schemas/feature_order.json","feature_order_hash":"%8$s", - "input_tensor_name":"features","input_dtype":"FLOAT32","input_shape_json":{"batch":1,"features":39}, + "input_tensor_name":"features","input_dtype":"FLOAT32","input_shape_json":{"batch":1,"features":54}, 
"input_example_path":"examples/input.json","output_schema_path":"schemas/outputs.json", "output_schema_hash":"%7$s","output_tensor_names_json":["exit"], "output_mapping_json":{"long_exit_prob":"exit[0]","short_exit_prob":"exit[1]","long_adverse_move_bps":"exit[2]","short_adverse_move_bps":"exit[3]","adverse_move_prob":"exit[4]","reversal_prob":"exit[5]","stop_hit_prob":"exit[6]","stagnation_prob":"exit[7]"}, @@ -521,9 +543,9 @@ public final class TestFixtures { "symbol_scope_json":["BTC-USDT-PERP"],"bar_interval":"1m","horizon_minutes":45, "model_format":"ONNX","model_runtime":"ONNX_RUNTIME_JAVA","model_runtime_version":"1.22.0", "onnx_opset_version":17,"producer_name":"test-exporter","producer_version":"p0", - "feature_version":"feature-v4-p0","feature_schema_path":"schemas/features.json", + "feature_version":"feature-v4-p2-book-cross","feature_schema_path":"schemas/features.json", "feature_schema_hash":"%6$s","feature_order_path":"schemas/feature_order.json","feature_order_hash":"%8$s", - "input_tensor_name":"features","input_dtype":"FLOAT32","input_shape_json":{"batch":1,"features":39}, + "input_tensor_name":"features","input_dtype":"FLOAT32","input_shape_json":{"batch":1,"features":54}, "input_example_path":"examples/input.json","output_schema_path":"schemas/outputs.json", "output_schema_hash":"%7$s","output_tensor_names_json":["risk"], "output_mapping_json":{"market_risk_prob":"risk[0]","long_position_risk_prob":"risk[1]","short_position_risk_prob":"risk[2]","market_path_risk_bps":"risk[3]","long_position_path_risk_bps":"risk[4]","short_position_path_risk_bps":"risk[5]","market_drawdown_prob":"risk[6]","volatility_expansion_prob":"risk[7]","spike_prob":"risk[8]","liquidity_deterioration_prob":"risk[9]","position_drawdown_prob":"risk[10]"}, diff --git a/src/test/java/com/quantai/trader/artifact/TraderArtifactLoaderTest.java b/src/test/java/com/quantai/trader/artifact/TraderArtifactLoaderTest.java index 635a6e4..6190aac 100644 --- 
a/src/test/java/com/quantai/trader/artifact/TraderArtifactLoaderTest.java +++ b/src/test/java/com/quantai/trader/artifact/TraderArtifactLoaderTest.java @@ -29,7 +29,7 @@ class TraderArtifactLoaderTest { assertThat(bundle.pricePlanContext().pricePlanConfigHash()).isEqualTo("p0-price-plan-hash"); assertThat(bundle.modelManifests()).allSatisfy(manifest -> { assertThat(manifest.featureOrderPath()).isEqualTo("schemas/feature_order.json"); - assertThat(manifest.inputShapeJson()).containsEntry("features", 39); + assertThat(manifest.inputShapeJson()).containsEntry("features", 54); assertThat(manifest.onnxOpsetVersion()).isEqualTo(17); }); assertThat(bundle.requireReplayModelFixture().entry().longExpectedNetEdgeBps()).isEqualByComparingTo("12.0"); @@ -89,7 +89,7 @@ class TraderArtifactLoaderTest { void rejectsNonV4InputShape() throws IOException { writeArtifactBundle(artifactRoot); Path manifest = artifactRoot.resolve("manifests/model_manifest.json"); - Files.writeString(manifest, Files.readString(manifest).replace("\"features\":39", "\"features\":38")); + Files.writeString(manifest, Files.readString(manifest).replace("\"features\":54", "\"features\":53")); assertThatThrownBy(() -> new TraderArtifactLoader(propertiesWithArtifactRoot(artifactRoot), objectMapper()).loadActiveBundle()) .isInstanceOf(com.quantai.trader.domain.TraderException.class) diff --git a/src/test/java/com/quantai/trader/feature/TraderFeatureVectorBuilderTest.java b/src/test/java/com/quantai/trader/feature/TraderFeatureVectorBuilderTest.java index 50af260..37fe74a 100644 --- a/src/test/java/com/quantai/trader/feature/TraderFeatureVectorBuilderTest.java +++ b/src/test/java/com/quantai/trader/feature/TraderFeatureVectorBuilderTest.java @@ -27,9 +27,11 @@ class TraderFeatureVectorBuilderTest { float[] values = builder.build(snapshot(), bundle); - assertThat(values).hasSize(39); + assertThat(values).hasSize(54); assertThat(values[0]).isEqualTo(1.1f); assertThat(values[38]).isEqualTo(120.0f); + 
assertThat(values[45]).isEqualTo(0.03f); + assertThat(values[53]).isEqualTo(-0.328f); } @Test @@ -41,7 +43,7 @@ class TraderFeatureVectorBuilderTest { Map features = new LinkedHashMap<>(featureJson()); features.remove("ret_1m_bps"); TraderMarketSnapshot badSnapshot = new TraderMarketSnapshot("snapshot-1", "run-1", "cycle-1", - "BTC-USDT-PERP", T0, "feature-v4-p0", bd("100"), bd("99.5"), bd("1.2"), + "BTC-USDT-PERP", T0, "feature-v4-p2-book-cross", bd("100"), bd("99.5"), bd("1.2"), bd("0.5"), bd("1000"), bd("1400"), bd("2200"), true, features, dataQualityJson()); assertThatThrownBy(() -> builder.build(badSnapshot, bundle)) @@ -58,7 +60,7 @@ class TraderFeatureVectorBuilderTest { Map features = new LinkedHashMap<>(featureJson()); features.put("account_balance", bd("1000")); TraderMarketSnapshot badSnapshot = new TraderMarketSnapshot("snapshot-1", "run-1", "cycle-1", - "BTC-USDT-PERP", T0, "feature-v4-p0", bd("100"), bd("99.5"), bd("1.2"), + "BTC-USDT-PERP", T0, "feature-v4-p2-book-cross", bd("100"), bd("99.5"), bd("1.2"), bd("0.5"), bd("1000"), bd("1400"), bd("2200"), true, features, dataQualityJson()); assertThatThrownBy(() -> builder.build(badSnapshot, bundle)) diff --git a/src/test/java/com/quantai/trader/model/OnnxTraderModelServiceTest.java b/src/test/java/com/quantai/trader/model/OnnxTraderModelServiceTest.java index 0aa01f2..98a6a39 100644 --- a/src/test/java/com/quantai/trader/model/OnnxTraderModelServiceTest.java +++ b/src/test/java/com/quantai/trader/model/OnnxTraderModelServiceTest.java @@ -52,7 +52,7 @@ class OnnxTraderModelServiceTest { new FakeInferenceClient()); var snapshot = new com.quantai.trader.domain.TraderMarketSnapshot("snapshot-1", "run-1", "cycle-1", - "BTC-USDT-PERP", T0, "feature-v4-p0", bd("100"), bd("99.5"), bd("1.2"), + "BTC-USDT-PERP", T0, "feature-v4-p2-book-cross", bd("100"), bd("99.5"), bd("1.2"), bd("0.5"), bd("1000"), bd("1400"), bd("2200"), true, featureJson(), Map.of()); assertThatThrownBy(() -> service.evaluate(snapshot, 
bundle)) diff --git a/src/test/java/com/quantai/trader/model/OrtTraderOnnxInferenceClientTest.java b/src/test/java/com/quantai/trader/model/OrtTraderOnnxInferenceClientTest.java index 8474af6..0e8a728 100644 --- a/src/test/java/com/quantai/trader/model/OrtTraderOnnxInferenceClientTest.java +++ b/src/test/java/com/quantai/trader/model/OrtTraderOnnxInferenceClientTest.java @@ -25,7 +25,7 @@ class OrtTraderOnnxInferenceClientTest { .orElseThrow(); assertThatThrownBy(() -> new OrtTraderOnnxInferenceClient() - .infer(manifest, artifactRoot.resolve(manifest.artifactPath()), new float[39])) + .infer(manifest, artifactRoot.resolve(manifest.artifactPath()), new float[54])) .isInstanceOf(com.quantai.trader.domain.TraderException.class) .hasMessageContaining("ONNX model cannot be loaded"); } diff --git a/training/README.md b/training/README.md index 8ee6952..d69598c 100644 --- a/training/README.md +++ b/training/README.md @@ -10,8 +10,8 @@ PY=/Users/zach/IdeaProjects/quant-trading-ai/quant-strategy-server/.venv/bin/pyt RUN_ID=btc-v4-p0-001 ROOT=/Users/zach/Desktop/quant-strategy-training-data -$PY training/scripts/01_audit_source_data.py --run-id $RUN_ID --data-root $ROOT --symbol BTC-USDT-PERP --start-date 2025-06-20 --end-date 2026-06-19 -$PY training/scripts/02_build_replay_1m.py --run-id $RUN_ID --data-root $ROOT --symbol BTC-USDT-PERP --start-date 2025-06-20 --end-date 2026-06-19 +$PY training/scripts/01_audit_source_data.py --run-id $RUN_ID --data-root $ROOT --symbol BTC-USDT-PERP --start-date 2025-05-01 --end-date 2026-06-25 +$PY training/scripts/02_build_replay_1m.py --run-id $RUN_ID --data-root $ROOT --symbol BTC-USDT-PERP --start-date 2025-05-01 --end-date 2026-06-25 $PY training/scripts/03_build_splits.py --run-id $RUN_ID --data-root $ROOT $PY training/scripts/04_build_feature_frame.py --run-id $RUN_ID --data-root $ROOT $PY training/scripts/05_build_price_plan_context.py --run-id $RUN_ID --data-root $ROOT @@ -28,6 +28,18 @@ $PY 
training/scripts/15_export_artifact_bundle.py --run-id $RUN_ID --data-root $ $PY training/scripts/16_validate_artifact_bundle.py --artifact-root $ROOT/trader-v4/runs/$RUN_ID/export/trader-model-bundle-$RUN_ID/artifact_bundle $PY training/scripts/17_promote_artifact_bundle.py --artifact-root $ROOT/trader-v4/runs/$RUN_ID/export/trader-model-bundle-$RUN_ID/artifact_bundle --reason "validation_locked and latest_stress passed for SHADOW" $PY training/scripts/16_validate_artifact_bundle.py --artifact-root $ROOT/trader-v4/runs/$RUN_ID/export/trader-model-bundle-$RUN_ID/artifact_bundle --require-active --run-onnx +$PY training/scripts/18_diagnose_training_run.py --run-id $RUN_ID --data-root $ROOT +$PY training/scripts/19_search_price_plan.py --run-id $RUN_ID --data-root $ROOT +$PY training/scripts/20_screen_entry_features.py --run-id $RUN_ID --data-root $ROOT +$PY training/scripts/21_benchmark_nonlinear_models.py --run-id $RUN_ID --data-root $ROOT ``` Java SHADOW 只加载 `ACTIVE` 包。15 号脚本永远只生成 `CANDIDATE`,16 号校验通过且上线门槛通过后,17 号脚本才允许把包提升为 `ACTIVE`。 + +如果 16/17 号显示包完整但上线门槛不过,就跑 18 号脚本。18 号只做诊断,不改模型、不改阈值,用来说明是标签、模型分数,还是 PM 规则把交易挡住。 + +如果 18 号诊断显示 Entry 净收益为负,就跑 19 号脚本。19 号只搜索下一轮实验用的价格计划,不代表上线结论;选出的价格计划仍然必须重新生成标签、重新训练、重新回测。 + +如果 19 号换了价格计划以后还是没有稳定盈利交易,就跑 20 号脚本。20 号会检查每个 Entry 特征的高低区间,看历史里到底有没有稳定信号;如果连单特征区间都没有稳定改善,就不要继续盲目调阈值,应优先补特征或换模型表达能力。 + +如果 20 号仍然没有稳定正收益区间,就跑 21 号脚本。21 号只做诊断,不导出上线模型;它用更强一点的树模型检查同一批特征和标签里到底有没有可学习信号。如果树模型也没有稳定信号,下一步应回到特征和标签定义,而不是继续放宽 PM 阈值。 diff --git a/training/scripts/03_build_splits.py b/training/scripts/03_build_splits.py index 7418d3e..7ad4e6b 100644 --- a/training/scripts/03_build_splits.py +++ b/training/scripts/03_build_splits.py @@ -12,14 +12,14 @@ def main() -> None: parser = argparse.ArgumentParser() add_common_args(parser) parser.add_argument("--replay-path", type=Path) - parser.add_argument("--fit-inner-start", default="2025-06-20") - parser.add_argument("--fit-inner-end", default="2026-01-15") - parser.add_argument("--tune-inner-start", 
default="2026-01-16") - parser.add_argument("--tune-inner-end", default="2026-02-28") - parser.add_argument("--validation-locked-start", default="2026-03-01") - parser.add_argument("--validation-locked-end", default="2026-04-30") - parser.add_argument("--latest-stress-start", default="2026-05-01") - parser.add_argument("--latest-stress-end", default="2026-06-19") + parser.add_argument("--fit-inner-start", default="2025-05-01") + parser.add_argument("--fit-inner-end", default="2026-01-31") + parser.add_argument("--tune-inner-start", default="2026-02-01") + parser.add_argument("--tune-inner-end", default="2026-03-15") + parser.add_argument("--validation-locked-start", default="2026-03-16") + parser.add_argument("--validation-locked-end", default="2026-05-15") + parser.add_argument("--latest-stress-start", default="2026-05-16") + parser.add_argument("--latest-stress-end", default="2026-06-25") parser.add_argument("--gap-minutes", type=int, default=60) parser.add_argument("--fold-count", type=int, default=3) args = parser.parse_args() diff --git a/training/scripts/04_build_feature_frame.py b/training/scripts/04_build_feature_frame.py index 800b6b2..7e08380 100644 --- a/training/scripts/04_build_feature_frame.py +++ b/training/scripts/04_build_feature_frame.py @@ -5,7 +5,7 @@ from pathlib import Path import _bootstrap # noqa: F401 from trader_training.features import build_feature_frame -from trader_training.io_utils import add_common_args, setup_logging +from trader_training.io_utils import DEFAULT_RAW_ROOT, add_common_args, setup_logging def main() -> None: @@ -13,6 +13,7 @@ def main() -> None: add_common_args(parser) parser.add_argument("--replay-path", type=Path) parser.add_argument("--split-manifest-path", type=Path) + parser.add_argument("--raw-root", type=Path, default=DEFAULT_RAW_ROOT) parser.add_argument("--allow-incomplete-days", action="store_true") args = parser.parse_args() setup_logging() diff --git a/training/scripts/18_diagnose_training_run.py 
b/training/scripts/18_diagnose_training_run.py new file mode 100644 index 0000000..9bced43 --- /dev/null +++ b/training/scripts/18_diagnose_training_run.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +import argparse + +import _bootstrap # noqa: F401 +from trader_training.diagnostics import diagnose_training_run +from trader_training.io_utils import add_common_args, setup_logging + + +def main() -> None: + parser = argparse.ArgumentParser() + add_common_args(parser) + args = parser.parse_args() + setup_logging() + diagnose_training_run(args) + + +if __name__ == "__main__": + main() diff --git a/training/scripts/19_search_price_plan.py b/training/scripts/19_search_price_plan.py new file mode 100644 index 0000000..f29df18 --- /dev/null +++ b/training/scripts/19_search_price_plan.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import argparse +from pathlib import Path + +import _bootstrap # noqa: F401 +from trader_training.io_utils import add_common_args, setup_logging +from trader_training.price_plan_search import search_price_plans + + +def _float_tuple(value: str) -> tuple[float, ...]: + return tuple(float(item.strip()) for item in value.split(",") if item.strip()) + + +def _int_tuple(value: str) -> tuple[int, ...]: + return tuple(int(item.strip()) for item in value.split(",") if item.strip()) + + +def main() -> None: + parser = argparse.ArgumentParser() + add_common_args(parser) + parser.add_argument("--feature-path", type=Path) + parser.add_argument("--replay-path", type=Path) + parser.add_argument("--label-config-path", type=Path) + parser.add_argument("--cost-config-path", type=Path) + parser.add_argument("--horizons", type=_int_tuple) + parser.add_argument("--targets", type=_float_tuple) + parser.add_argument("--stops", type=_float_tuple) + args = parser.parse_args() + setup_logging() + search_price_plans(args) + + +if __name__ == "__main__": + main() diff --git a/training/scripts/20_screen_entry_features.py 
b/training/scripts/20_screen_entry_features.py new file mode 100644 index 0000000..0a729bc --- /dev/null +++ b/training/scripts/20_screen_entry_features.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +import argparse + +import _bootstrap # noqa: F401 +from trader_training.entry_feature_screen import screen_entry_features +from trader_training.io_utils import add_common_args, setup_logging + + +def main() -> None: + parser = argparse.ArgumentParser() + add_common_args(parser) + parser.add_argument("--min-bucket-rows", type=int, default=300) + args = parser.parse_args() + setup_logging() + screen_entry_features(args) + + +if __name__ == "__main__": + main() diff --git a/training/scripts/21_benchmark_nonlinear_models.py b/training/scripts/21_benchmark_nonlinear_models.py new file mode 100644 index 0000000..8f475d9 --- /dev/null +++ b/training/scripts/21_benchmark_nonlinear_models.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +import argparse +import logging +from pathlib import Path + +import _bootstrap # noqa: F401 +from trader_training.nonlinear_benchmark import benchmark_nonlinear_models + + +def main() -> None: + parser = argparse.ArgumentParser(description="Run diagnostic nonlinear benchmarks for Direction and Entry.") + parser.add_argument("--run-id", required=True) + parser.add_argument("--data-root", required=True, type=Path) + args = parser.parse_args() + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s event=%(message)s") + benchmark_nonlinear_models(args) + + +if __name__ == "__main__": + main() diff --git a/training/tests/test_training_contract.py b/training/tests/test_training_contract.py index 0a28e43..30bca83 100644 --- a/training/tests/test_training_contract.py +++ b/training/tests/test_training_contract.py @@ -15,6 +15,7 @@ if str(TRAINING_ROOT) not in sys.path: from trader_training.onnx_export import LinearHead, export_heads from trader_training.io_utils import read_json, write_json +from 
trader_training.labels import ENTRY_LABEL_METHOD, _path_stats_for_group, build_entry_labels from trader_training.promote import promote_artifact_bundle from trader_training.replay import build_splits from trader_training.schemas import FEATURE_ORDER, LATEST_STRESS_SPLIT, MODEL_OUTPUTS, OUTPUT_MAPPING, TRAINING_SPLITS, VALIDATION_LOCKED_SPLIT @@ -22,10 +23,10 @@ from trader_training.schemas import FEATURE_ORDER, LATEST_STRESS_SPLIT, MODEL_OU class TrainingContractTest(unittest.TestCase): def test_feature_order_is_v4_contract_size(self) -> None: - self.assertEqual(39, len(FEATURE_ORDER)) + self.assertEqual(54, len(FEATURE_ORDER)) self.assertEqual(len(FEATURE_ORDER), len(set(FEATURE_ORDER))) self.assertEqual("ret_1m_bps", FEATURE_ORDER[0]) - self.assertEqual("minutes_to_next_funding", FEATURE_ORDER[-1]) + self.assertEqual("book_pressure_reversal_15m", FEATURE_ORDER[-1]) def test_output_mapping_matches_model_outputs(self) -> None: for model_name, fields in MODEL_OUTPUTS.items(): @@ -67,6 +68,110 @@ class TrainingContractTest(unittest.TestCase): self.assertEqual([VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT], manifest["sealed_splits"]) self.assertEqual("FINAL_GATE_ONLY", manifest["latest_stress_policy"]) + def test_path_stats_keeps_same_bar_target_stop_as_stop_first(self) -> None: + frame = pd.DataFrame( + { + "event_time": pd.date_range("2026-01-01", periods=6, freq="min", tz="UTC"), + "open_time_ms": np.arange(6, dtype=np.int64) * 60_000, + "symbol": "BTC-USDT-PERP", + "close": [100.0, 100.0, 100.0, 100.0, 100.0, 100.0], + "high": [100.0, 100.05, 100.20, 100.0, 100.0, 100.0], + "low": [100.0, 99.95, 99.70, 100.0, 100.0, 100.0], + "spread_bps": [1.0, 1.1, 1.2, 1.3, 1.4, 1.5], + } + ) + + stats = _path_stats_for_group(frame, "LONG", horizon=3, target_bps=10.0, stop_bps=8.0) + first = stats.loc[stats["open_time_ms"].eq(0)].iloc[0] + + self.assertEqual(0, first["target_hit"]) + self.assertEqual(1, first["stop_hit"]) + self.assertEqual(1, first["ambiguous_hit"]) + 
self.assertEqual(120_000, first["time_to_stop_ms"]) + self.assertAlmostEqual(-8.0, first["gross_edge_bps"]) + + def test_entry_label_uses_max_future_edge_not_fixed_target_hit(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + data_root = Path(tmp) + run_root = data_root / "trader-v4" / "runs" / "unit-entry" + feature_path = run_root / "feature" / "feature_frame.parquet" + replay_path = run_root / "replay" / "replay_1m.parquet" + plan_path = run_root / "label" / "price_plan_context.json" + config_path = data_root / "label_config.json" + feature_path.parent.mkdir(parents=True) + replay_path.parent.mkdir(parents=True) + + times = pd.date_range("2026-01-01", periods=5, freq="min", tz="UTC") + pd.DataFrame( + { + "sample_id": ["s0", "s1"], + "symbol": "BTC-USDT-PERP", + "event_time": times[:2], + "open_time_ms": [0, 60_000], + "split_id": "fit_inner", + "walk_forward_fold": 0, + "data_quality_flag": "OK", + "spread_bps": 1.0, + "spread_rank_24h_pct": 0.1, + "realized_vol_15m_bps": 2.0, + } + ).to_parquet(feature_path, index=False) + pd.DataFrame( + { + "event_time": times, + "open_time_ms": np.arange(5, dtype=np.int64) * 60_000, + "symbol": "BTC-USDT-PERP", + "open": [100.0, 100.0, 100.0, 100.0, 100.0], + "high": [100.0, 100.05, 100.19, 100.20, 100.0], + "low": [100.0, 99.99, 99.98, 99.97, 100.0], + "close": [100.0, 100.0, 100.0, 100.0, 100.0], + "spread_bps": 1.0, + } + ).to_parquet(replay_path, index=False) + write_json( + config_path, + { + "entry": { + "max_hold_minutes": 3, + "target_bps": 50.0, + "stop_bps": 50.0, + "min_expected_net_edge_bps": 3.0, + } + }, + ) + write_json( + plan_path, + { + "pricePlanId": "unit-plan", + "pricePlanConfigHash": "unit-hash", + "targetDistanceBps": 50.0, + "stopDistanceBps": 50.0, + "maxHoldMinutes": 3, + "costBps": 6.5, + "entryLabelMethod": ENTRY_LABEL_METHOD, + }, + ) + + build_entry_labels( + Namespace( + data_root=data_root, + run_id="unit-entry", + feature_path=feature_path, + replay_path=replay_path, + 
label_config_path=config_path, + cost_config_path=None, + price_plan_context_path=plan_path, + ) + ) + + labels = pd.read_parquet(run_root / "label" / "entry_labels.parquet") + row = labels[labels["sample_id"].eq("s0") & labels["side"].eq("LONG")].iloc[0] + self.assertEqual(0, row["target_hit"]) + self.assertEqual(1, row["entry_target"]) + self.assertEqual(ENTRY_LABEL_METHOD, row["label_method"]) + self.assertAlmostEqual(13.5, row["expected_net_edge_bps"], places=6) + self.assertAlmostEqual(row["mfe_bps"] - row["cost_bps"], row["max_achievable_net_edge_bps"], places=6) + def test_exported_onnx_accepts_java_feature_shape(self) -> None: import onnxruntime as ort @@ -78,13 +183,14 @@ class TrainingContractTest(unittest.TestCase): LinearHead( "direction", "softmax", - np.zeros((39, 3), dtype=np.float32), + np.zeros((len(FEATURE_ORDER), 3), dtype=np.float32), np.array([0.1, 0.2, 0.3], dtype=np.float32), ) ], + feature_count=len(FEATURE_ORDER), ) session = ort.InferenceSession(str(path)) - output = session.run(None, {"features": np.zeros((1, 39), dtype=np.float32)})[0] + output = session.run(None, {"features": np.zeros((1, len(FEATURE_ORDER)), dtype=np.float32)})[0] self.assertEqual((1, 3), output.shape) self.assertAlmostEqual(1.0, float(output.sum()), places=6) diff --git a/training/trader_training/diagnostics.py b/training/trader_training/diagnostics.py new file mode 100644 index 0000000..f6afb07 --- /dev/null +++ b/training/trader_training/diagnostics.py @@ -0,0 +1,351 @@ +from __future__ import annotations + +import logging +from typing import Any + +import numpy as np +import pandas as pd + +from trader_training.io_utils import read_parquet, run_root, write_json, write_text +from trader_training.pm import _pm_frame, _simulate_open_trades, _threshold_candidates, _trade_metrics +from trader_training.schemas import FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT + + +DIAGNOSTIC_SPLITS = (FIT_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, 
LATEST_STRESS_SPLIT) +PM_EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) + + +def diagnose_training_run(args: Any) -> None: + root = run_root(args) + label_summary = _label_summary(root) + pm_summary = _pm_summary(root) + payload = { + "run_id": args.run_id, + "label_summary": label_summary, + "pm_summary": pm_summary, + "conclusion": _diagnostic_conclusion(pm_summary), + } + write_json(root / "diagnostics" / "training_failure_diagnostics.json", _jsonable(payload)) + write_text(root / "diagnostics" / "training_failure_diagnostics.md", _markdown_report(payload)) + logging.info( + "trader.training.diagnostics_written runId=%s conclusion=%s path=%s", + args.run_id, + payload["conclusion"]["status"], + root / "diagnostics" / "training_failure_diagnostics.md", + ) + + +def _label_summary(root) -> dict[str, Any]: + direction = read_parquet(root / "label" / "direction_labels.parquet") + entry = read_parquet(root / "label" / "entry_labels.parquet") + summary: dict[str, Any] = {} + for split_id in DIAGNOSTIC_SPLITS: + direction_split = direction[direction["split_id"].eq(split_id)].copy() + entry_split = entry[entry["split_id"].eq(split_id)].copy() + item: dict[str, Any] = {"direction": {}, "entry": {}} + if not direction_split.empty: + item["direction"] = { + "rows": len(direction_split), + "label_ratio": direction_split["direction_label"].value_counts(normalize=True).round(6).to_dict(), + "future_return_bps_quantile": _quantiles(direction_split["future_return_bps"], (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99)), + } + if not entry_split.empty: + grouped = entry_split.groupby("side", observed=False) + item["entry"] = { + "rows": len(entry_split), + "target_rate_by_side": grouped["entry_target"].mean().round(6).to_dict(), + "edge_mean_by_side": grouped["expected_net_edge_bps"].mean().round(6).to_dict(), + "edge_quantile_by_side": { + str(side): _quantiles(group["expected_net_edge_bps"], (0.05, 0.5, 0.95)) + for side, group in grouped + }, + } + 
summary[split_id] = item + return summary + + +def _pm_summary(root) -> dict[str, Any]: + summary: dict[str, Any] = {} + for split_id in PM_EVAL_SPLITS: + frame = _pm_frame(root, split_id) + item = { + "rows": len(frame), + "score_distribution": _score_distribution(frame), + "gate_funnel": _gate_funnel(frame), + "relaxed_variants": _relaxed_variants(frame), + "top_bucket_edge": _top_bucket_edge(frame), + "grid_search_any_trade": _grid_trade_summary(frame), + } + summary[split_id] = item + return summary + + +def _score_distribution(frame: pd.DataFrame) -> dict[str, dict[str, float]]: + columns = [ + "long_prob", + "short_prob", + "long_entry_prob", + "short_entry_prob", + "market_risk_prob", + "pred_long_expected_net_edge_bps", + "pred_short_expected_net_edge_bps", + "actual_long_expected_net_edge_bps", + "actual_short_expected_net_edge_bps", + ] + return {column: _quantiles(frame[column], (0.0, 0.05, 0.5, 0.95, 1.0)) for column in columns} + + +def _gate_funnel(frame: pd.DataFrame) -> dict[str, Any]: + thresholds = { + "long_open_prob": 0.54, + "short_open_prob": 0.54, + "min_entry_prob": 0.50, + "max_market_risk_prob": 0.55, + "min_expected_edge_bps": 1.0, + "min_direction_margin": 0.02, + } + long_steps = { + "long_prob >= 0.54": frame["long_prob"] >= thresholds["long_open_prob"], + "long_prob - short_prob >= 0.02": (frame["long_prob"] - frame["short_prob"]) >= thresholds["min_direction_margin"], + "long_entry_prob >= 0.50": frame["long_entry_prob"] >= thresholds["min_entry_prob"], + "market_risk_prob <= 0.55": frame["market_risk_prob"] <= thresholds["max_market_risk_prob"], + "pred_long_expected_net_edge_bps >= 1.0": frame["pred_long_expected_net_edge_bps"] >= thresholds["min_expected_edge_bps"], + } + short_steps = { + "short_prob >= 0.54": frame["short_prob"] >= thresholds["short_open_prob"], + "short_prob - long_prob >= 0.02": (frame["short_prob"] - frame["long_prob"]) >= thresholds["min_direction_margin"], + "short_entry_prob >= 0.50": 
frame["short_entry_prob"] >= thresholds["min_entry_prob"], + "market_risk_prob <= 0.55": frame["market_risk_prob"] <= thresholds["max_market_risk_prob"], + "pred_short_expected_net_edge_bps >= 1.0": frame["pred_short_expected_net_edge_bps"] >= thresholds["min_expected_edge_bps"], + } + return { + "thresholds": thresholds, + "long": _cumulative_gate_counts(long_steps, len(frame)), + "short": _cumulative_gate_counts(short_steps, len(frame)), + } + + +def _cumulative_gate_counts(steps: dict[str, pd.Series], total_rows: int) -> dict[str, Any]: + mask = np.ones(total_rows, dtype=bool) + cumulative = [] + single = {} + for name, step in steps.items(): + values = step.to_numpy(dtype=bool) + single[name] = int(values.sum()) + mask &= values + cumulative.append({"gate": name, "rows_after_gate": int(mask.sum())}) + return {"single_gate_pass": single, "cumulative": cumulative} + + +def _relaxed_variants(frame: pd.DataFrame) -> dict[str, Any]: + variants = { + "no_risk_no_edge": {"prob": 0.54, "entry": 0.50, "margin": 0.02, "risk": 1.0, "edge": -99.0}, + "entry_only_55": {"prob": 0.0, "entry": 0.55, "margin": -99.0, "risk": 1.0, "edge": -99.0}, + "direction_only_54": {"prob": 0.54, "entry": 0.0, "margin": 0.02, "risk": 1.0, "edge": -99.0}, + "very_loose": {"prob": 0.50, "entry": 0.45, "margin": 0.0, "risk": 1.0, "edge": -99.0}, + } + result: dict[str, Any] = {} + for name, thresholds in variants.items(): + trades = _variant_trades(frame, thresholds) + result[name] = _plain_trade_metrics(trades) + return result + + +def _variant_trades(frame: pd.DataFrame, thresholds: dict[str, float]) -> pd.DataFrame: + long_mask = ( + (frame["long_prob"] >= thresholds["prob"]) + & ((frame["long_prob"] - frame["short_prob"]) >= thresholds["margin"]) + & (frame["long_entry_prob"] >= thresholds["entry"]) + & (frame["market_risk_prob"] <= thresholds["risk"]) + & (frame["pred_long_expected_net_edge_bps"] >= thresholds["edge"]) + ) + short_mask = ( + (frame["short_prob"] >= thresholds["prob"]) + & 
((frame["short_prob"] - frame["long_prob"]) >= thresholds["margin"]) + & (frame["short_entry_prob"] >= thresholds["entry"]) + & (frame["market_risk_prob"] <= thresholds["risk"]) + & (frame["pred_short_expected_net_edge_bps"] >= thresholds["edge"]) + ) + long = frame.loc[long_mask].copy() + long["side"] = "LONG" + long["actual_edge_bps"] = long["actual_long_expected_net_edge_bps"] + short = frame.loc[short_mask].copy() + short["side"] = "SHORT" + short["actual_edge_bps"] = short["actual_short_expected_net_edge_bps"] + return pd.concat([long, short], ignore_index=True) + + +def _plain_trade_metrics(trades: pd.DataFrame) -> dict[str, Any]: + if trades.empty: + return {"rows": 0, "win_rate": 0.0, "avg_actual_edge_bps": 0.0} + return { + "rows": len(trades), + "win_rate": float((trades["actual_edge_bps"].astype(float) > 0).mean()), + "avg_actual_edge_bps": float(trades["actual_edge_bps"].astype(float).mean()), + "side_counts": trades["side"].value_counts().to_dict(), + } + + +def _top_bucket_edge(frame: pd.DataFrame) -> dict[str, Any]: + side = np.where(frame["long_prob"] >= frame["short_prob"], "LONG", "SHORT") + side_prob = np.where(side == "LONG", frame["long_prob"], frame["short_prob"]) + side_edge = np.where(side == "LONG", frame["actual_long_expected_net_edge_bps"], frame["actual_short_expected_net_edge_bps"]) + direction_frame = pd.DataFrame({"score": side_prob, "actual_edge_bps": side_edge, "side": side}) + direction_top = {} + for fraction in (0.01, 0.02, 0.05, 0.10): + top = direction_frame.sort_values("score", ascending=False).head(max(1, int(len(direction_frame) * fraction))) + direction_top[str(fraction)] = _plain_trade_metrics(top.rename(columns={"actual_edge_bps": "actual_edge_bps"})) + return { + "direction_top_score": direction_top, + "long_entry_prob_deciles": _decile_edge(frame, "long_entry_prob", "actual_long_expected_net_edge_bps", "long_entry_target"), + "short_entry_prob_deciles": _decile_edge(frame, "short_entry_prob", 
"actual_short_expected_net_edge_bps", "short_entry_target"), + } + + +def _decile_edge(frame: pd.DataFrame, score_col: str, edge_col: str, target_col: str) -> list[dict[str, Any]]: + sample = frame[[score_col, edge_col, target_col]].dropna().copy() + if sample.empty: + return [] + sample["bucket"] = pd.qcut(sample[score_col].rank(method="first"), 10, labels=False) + 1 + rows = [] + for bucket, group in sample.groupby("bucket", observed=False): + rows.append( + { + "bucket": int(bucket), + "rows": len(group), + "score_min": float(group[score_col].min()), + "score_max": float(group[score_col].max()), + "hit_rate": float(group[target_col].mean()), + "avg_actual_edge_bps": float(group[edge_col].mean()), + } + ) + return rows + + +def _grid_trade_summary(frame: pd.DataFrame) -> dict[str, Any]: + nonzero = 0 + best_by_count = None + best_metrics = None + for thresholds in _threshold_candidates(): + trades = _simulate_open_trades(frame, thresholds) + metrics = _trade_metrics(trades) + if metrics["trade_count"] > 0: + nonzero += 1 + if best_metrics is None or metrics["trade_count"] > best_metrics["trade_count"]: + best_by_count = thresholds + best_metrics = metrics + return { + "candidate_count": len(_threshold_candidates()), + "candidates_with_trade": nonzero, + "best_by_trade_count": best_by_count, + "best_metrics": best_metrics, + } + + +def _diagnostic_conclusion(pm_summary: dict[str, Any]) -> dict[str, Any]: + tune = pm_summary.get(TUNE_SPLIT, {}) + gate = tune.get("gate_funnel", {}) + long_single = gate.get("long", {}).get("single_gate_pass", {}) + short_single = gate.get("short", {}).get("single_gate_pass", {}) + pred_edge_blocked = ( + long_single.get("pred_long_expected_net_edge_bps >= 1.0", 0) == 0 + and short_single.get("pred_short_expected_net_edge_bps >= 1.0", 0) == 0 + ) + relaxed = tune.get("relaxed_variants", {}) + any_relaxed_positive = any(item.get("avg_actual_edge_bps", 0.0) > 0 for item in relaxed.values()) + if pred_edge_blocked and not 
any_relaxed_positive: + return { + "status": "MODEL_SIGNAL_NOT_TRADABLE", + "plain_reason": "Entry 预测的净收益基本都是负数;即使放松风险和收益门槛,选出来的样本平均仍亏。", + "next_action": "优先重查 Entry 标签和价格计划,再考虑更强模型;不要直接放松 PM 阈值上线。", + } + if pred_edge_blocked: + return { + "status": "ENTRY_EDGE_GATE_BLOCKED", + "plain_reason": "PM 没有交易主要是 Entry 预测净收益过低。", + "next_action": "重训 Entry 或调整价格计划后再搜索 PM 阈值。", + } + return { + "status": "NEEDS_MANUAL_REVIEW", + "plain_reason": "没有发现单一硬挡板,需要人工继续看各模型分数和回测明细。", + "next_action": "查看 diagnostics 报告中的漏斗和放松阈值结果。", + } + + +def _quantiles(series: pd.Series, points: tuple[float, ...]) -> dict[str, float]: + values = pd.to_numeric(series, errors="coerce").replace([np.inf, -np.inf], np.nan).dropna() + if values.empty: + return {} + result = values.quantile(list(points)).round(6).to_dict() + return {str(key): float(value) for key, value in result.items()} + + +def _markdown_report(payload: dict[str, Any]) -> str: + lines = [ + "# Trader Training Failure Diagnostics", + "", + f"- run_id: `{payload['run_id']}`", + f"- status: `{payload['conclusion']['status']}`", + f"- 结论: {payload['conclusion']['plain_reason']}", + f"- 下一步: {payload['conclusion']['next_action']}", + "", + "## 标签分布", + "", + ] + for split_id, item in payload["label_summary"].items(): + direction = item.get("direction", {}) + entry = item.get("entry", {}) + lines.append(f"### {split_id}") + lines.append("") + if direction: + lines.append(f"- Direction 行数: {direction['rows']}") + lines.append(f"- Direction 标签比例: `{direction['label_ratio']}`") + lines.append(f"- 45 分钟未来收益分位: `{direction['future_return_bps_quantile']}`") + if entry: + lines.append(f"- Entry 行数: {entry['rows']}") + lines.append(f"- Entry 命中率: `{entry['target_rate_by_side']}`") + lines.append(f"- Entry 平均净收益: `{entry['edge_mean_by_side']}`") + lines.append("") + + lines.extend(["## PM 挡单漏斗", ""]) + for split_id, item in payload["pm_summary"].items(): + lines.append(f"### {split_id}") + lines.append("") + lines.append(f"- 样本数: 
{item['rows']}") + lines.append(f"- 网格里有交易的候选数: {item['grid_search_any_trade']['candidates_with_trade']} / {item['grid_search_any_trade']['candidate_count']}") + lines.append("") + for side in ("long", "short"): + lines.append(f"#### {side.upper()}") + lines.append("") + rows = item["gate_funnel"][side]["cumulative"] + lines.append("| 条件 | 剩余样本 |") + lines.append("| --- | ---: |") + for row in rows: + lines.append(f"| {row['gate']} | {row['rows_after_gate']} |") + lines.append("") + lines.append("#### 放松条件后的结果") + lines.append("") + lines.append("| 方案 | 样本数 | 胜率 | 平均真实净收益bps |") + lines.append("| --- | ---: | ---: | ---: |") + for name, metrics in item["relaxed_variants"].items(): + lines.append( + f"| {name} | {metrics['rows']} | {metrics['win_rate']:.4f} | {metrics['avg_actual_edge_bps']:.4f} |" + ) + lines.append("") + return "\n".join(lines) + "\n" + + +def _jsonable(value: Any) -> Any: + if isinstance(value, dict): + return {str(key): _jsonable(item) for key, item in value.items()} + if isinstance(value, list): + return [_jsonable(item) for item in value] + if isinstance(value, tuple): + return [_jsonable(item) for item in value] + if isinstance(value, (np.integer,)): + return int(value) + if isinstance(value, (np.floating,)): + return float(value) + if isinstance(value, np.ndarray): + return value.tolist() + return value diff --git a/training/trader_training/entry_feature_screen.py b/training/trader_training/entry_feature_screen.py new file mode 100644 index 0000000..0d28113 --- /dev/null +++ b/training/trader_training/entry_feature_screen.py @@ -0,0 +1,306 @@ +from __future__ import annotations + +import logging +from typing import Any + +import numpy as np +import pandas as pd + +from trader_training.io_utils import read_parquet, run_root, write_json, write_text +from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT + + +EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) 
+ALL_SPLITS = (FIT_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) + + +def screen_entry_features(args: Any) -> None: + root = run_root(args) + dataset = read_parquet(root / "dataset" / "entry_train.parquet") + required = {"split_id", *FEATURE_ORDER, "long_entry_target", "short_entry_target", "long_expected_net_edge_bps", "short_expected_net_edge_bps"} + missing = sorted(required.difference(dataset.columns)) + if missing: + raise ValueError(f"entry feature screen missing required columns: {missing}") + + min_bucket_rows = int(args.min_bucket_rows or 300) + rows: list[dict[str, Any]] = [] + for side in ("LONG", "SHORT"): + target_col = "long_entry_target" if side == "LONG" else "short_entry_target" + edge_col = "long_expected_net_edge_bps" if side == "LONG" else "short_expected_net_edge_bps" + baselines = _split_baselines(dataset, target_col, edge_col) + for feature in FEATURE_ORDER: + rows.extend(_feature_rows(dataset, feature, side, target_col, edge_col, baselines)) + + bucket_metrics = pd.DataFrame(rows) + if bucket_metrics.empty: + raise ValueError("entry feature screen produced no bucket metrics") + + candidates = _select_candidates(bucket_metrics, min_bucket_rows) + result = { + "run_id": args.run_id, + "dataset_path": str(root / "dataset" / "entry_train.parquet"), + "feature_count": len(FEATURE_ORDER), + "bucket_metric_count": int(len(bucket_metrics)), + "candidate_count": int(len(candidates)), + "min_bucket_rows": min_bucket_rows, + "selection_rule": "bucket boundaries are learned on fit_inner; candidate is picked by tune_inner and checked on validation_locked/latest_stress", + } + write_json(root / "diagnostics" / "entry_feature_screen_result.json", result) + write_text(root / "diagnostics" / "entry_feature_bucket_metrics.csv", bucket_metrics.to_csv(index=False)) + write_text(root / "diagnostics" / "entry_feature_signal_candidates.csv", candidates.to_csv(index=False)) + write_text(root / "diagnostics" / "entry_feature_screen_report.md", 
_markdown_report(result, candidates)) + logging.info( + "trader.training.entry_feature_screened runId=%s featureCount=%s bucketMetricCount=%s candidateCount=%s reportPath=%s", + args.run_id, + len(FEATURE_ORDER), + len(bucket_metrics), + len(candidates), + root / "diagnostics" / "entry_feature_screen_report.md", + ) + + +def _split_baselines(dataset: pd.DataFrame, target_col: str, edge_col: str) -> dict[str, dict[str, float]]: + baselines: dict[str, dict[str, float]] = {} + for split_id in ALL_SPLITS: + part = dataset[dataset["split_id"].eq(split_id)] + if part.empty: + continue + baselines[split_id] = { + "rows": float(len(part)), + "positive_rate": float(part[target_col].mean()), + "avg_edge_bps": float(part[edge_col].mean()), + } + return baselines + + +def _feature_rows( + dataset: pd.DataFrame, + feature: str, + side: str, + target_col: str, + edge_col: str, + baselines: dict[str, dict[str, float]], +) -> list[dict[str, Any]]: + train_values = pd.to_numeric(dataset.loc[dataset["split_id"].eq(FIT_SPLIT), feature], errors="coerce").replace([np.inf, -np.inf], np.nan).dropna() + edges = _bucket_edges(train_values.to_numpy(dtype="float64")) + if len(edges) < 3: + logging.info("trader.training.entry_feature_screen_skipped feature=%s reason=not_enough_unique_values", feature) + return [] + + values = pd.to_numeric(dataset[feature], errors="coerce").replace([np.inf, -np.inf], np.nan) + bucket = pd.cut(values, bins=edges, include_lowest=True, labels=False, duplicates="drop") + working = dataset[["split_id", target_col, edge_col]].copy() + working["bucket_index"] = bucket.astype("float") + working = working.dropna(subset=["bucket_index"]) + if working.empty: + return [] + working["bucket_index"] = working["bucket_index"].astype(int) + + rows: list[dict[str, Any]] = [] + for (split_id, bucket_index), part in working.groupby(["split_id", "bucket_index"], sort=True, observed=False): + if split_id not in baselines: + continue + lower = float(edges[bucket_index]) + upper = 
float(edges[bucket_index + 1]) + baseline = baselines[split_id] + avg_edge = float(part[edge_col].mean()) + positive_rate = float(part[target_col].mean()) + rows.append( + { + "side": side, + "feature": feature, + "split_id": split_id, + "bucket_index": int(bucket_index), + "bucket_count": int(len(edges) - 1), + "bucket_lower": lower, + "bucket_upper": upper, + "row_count": int(len(part)), + "positive_rate": positive_rate, + "baseline_positive_rate": baseline["positive_rate"], + "positive_rate_lift": positive_rate - baseline["positive_rate"], + "avg_edge_bps": avg_edge, + "baseline_avg_edge_bps": baseline["avg_edge_bps"], + "avg_edge_lift_bps": avg_edge - baseline["avg_edge_bps"], + "median_edge_bps": float(part[edge_col].median()), + } + ) + return rows + + +def _bucket_edges(values: np.ndarray) -> np.ndarray: + clean = values[np.isfinite(values)] + if clean.size < 1000: + return np.array([], dtype="float64") + quantiles = np.linspace(0.0, 1.0, 11) + edges = np.quantile(clean, quantiles) + edges = np.unique(edges) + if edges.size < 3: + return np.array([], dtype="float64") + edges[0] = -np.inf + edges[-1] = np.inf + return edges + + +def _select_candidates(bucket_metrics: pd.DataFrame, min_bucket_rows: int) -> pd.DataFrame: + tune = bucket_metrics[bucket_metrics["split_id"].eq(TUNE_SPLIT) & (bucket_metrics["row_count"] >= min_bucket_rows)].copy() + if tune.empty: + return pd.DataFrame() + tune = tune.sort_values(["side", "feature", "avg_edge_lift_bps", "positive_rate_lift"], ascending=[True, True, False, False]) + picked = tune.groupby(["side", "feature"], as_index=False, observed=False).head(1) + + candidates = picked[ + [ + "side", + "feature", + "bucket_index", + "bucket_count", + "bucket_lower", + "bucket_upper", + "row_count", + "positive_rate", + "positive_rate_lift", + "avg_edge_bps", + "avg_edge_lift_bps", + ] + ].rename( + columns={ + "row_count": "tune_rows", + "positive_rate": "tune_positive_rate", + "positive_rate_lift": "tune_positive_rate_lift", + 
"avg_edge_bps": "tune_avg_edge_bps", + "avg_edge_lift_bps": "tune_avg_edge_lift_bps", + } + ) + for split_id in (VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT): + split_rows = bucket_metrics[bucket_metrics["split_id"].eq(split_id)][ + ["side", "feature", "bucket_index", "row_count", "positive_rate", "positive_rate_lift", "avg_edge_bps", "avg_edge_lift_bps"] + ].rename( + columns={ + "row_count": f"{split_id}_rows", + "positive_rate": f"{split_id}_positive_rate", + "positive_rate_lift": f"{split_id}_positive_rate_lift", + "avg_edge_bps": f"{split_id}_avg_edge_bps", + "avg_edge_lift_bps": f"{split_id}_avg_edge_lift_bps", + } + ) + candidates = candidates.merge(split_rows, on=["side", "feature", "bucket_index"], how="left") + + for column in ( + f"{VALIDATION_LOCKED_SPLIT}_rows", + f"{LATEST_STRESS_SPLIT}_rows", + f"{VALIDATION_LOCKED_SPLIT}_avg_edge_bps", + f"{LATEST_STRESS_SPLIT}_avg_edge_bps", + f"{VALIDATION_LOCKED_SPLIT}_avg_edge_lift_bps", + f"{LATEST_STRESS_SPLIT}_avg_edge_lift_bps", + ): + if column not in candidates.columns: + candidates[column] = np.nan + + candidates["stable_positive_edge"] = ( + (candidates["tune_avg_edge_bps"] > 0.0) + & (candidates[f"{VALIDATION_LOCKED_SPLIT}_avg_edge_bps"] > 0.0) + & (candidates[f"{LATEST_STRESS_SPLIT}_avg_edge_bps"] > 0.0) + ) + candidates["stable_lift"] = ( + (candidates["tune_avg_edge_lift_bps"] > 0.0) + & (candidates[f"{VALIDATION_LOCKED_SPLIT}_avg_edge_lift_bps"] > 0.0) + & (candidates[f"{LATEST_STRESS_SPLIT}_avg_edge_lift_bps"] > 0.0) + ) + candidates["min_eval_edge_bps"] = candidates[["tune_avg_edge_bps", f"{VALIDATION_LOCKED_SPLIT}_avg_edge_bps", f"{LATEST_STRESS_SPLIT}_avg_edge_bps"]].min(axis=1) + candidates["mean_eval_edge_bps"] = candidates[["tune_avg_edge_bps", f"{VALIDATION_LOCKED_SPLIT}_avg_edge_bps", f"{LATEST_STRESS_SPLIT}_avg_edge_bps"]].mean(axis=1) + candidates["min_eval_rows"] = candidates[["tune_rows", f"{VALIDATION_LOCKED_SPLIT}_rows", f"{LATEST_STRESS_SPLIT}_rows"]].min(axis=1) + 
candidates["screen_score"] = ( + candidates["min_eval_edge_bps"].fillna(-999.0) + + candidates["mean_eval_edge_bps"].fillna(-999.0) * 0.25 + + candidates["stable_lift"].astype(float) * 2.0 + ) + return candidates.sort_values("screen_score", ascending=False).reset_index(drop=True) + + +def _markdown_report(result: dict[str, Any], candidates: pd.DataFrame) -> str: + lines = [ + "# Entry 特征筛查报告", + "", + "## 结论怎么读", + "", + "这份报告只回答一个问题:历史数据里,单个特征的某些区间有没有稳定变好。", + "", + "- `tune_inner` 用来挑候选区间。", + "- `validation_locked` 和 `latest_stress` 用来检查这个区间是不是出了训练样本也还能站住。", + "- `stable_positive_edge=true` 代表这个区间在三个检查集里的平均净收益都大于 0。", + "- `stable_lift=true` 代表这个区间在三个检查集里都比对应大盘样本平均值更好。", + "", + "## 本次结果", + "", + f"- run_id: `{result['run_id']}`", + f"- 特征数: `{result['feature_count']}`", + f"- 分桶明细数: `{result['bucket_metric_count']}`", + f"- 候选数: `{result['candidate_count']}`", + f"- 最小分桶行数: `{result['min_bucket_rows']}`", + "", + ] + if candidates.empty: + lines.extend( + [ + "## 候选特征", + "", + "没有找到满足最小样本数的候选区间。下一步应先扩大数据或重新检查标签/价格计划,不建议直接继续调模型。", + "", + ] + ) + return "\n".join(lines) + stable = candidates[candidates["stable_positive_edge"] & candidates["stable_lift"]] + lines.extend( + [ + "## 稳定候选", + "", + f"- 同时满足正收益和正提升的候选数: `{len(stable)}`", + "", + ] + ) + display_columns = [ + "side", + "feature", + "bucket_index", + "bucket_lower", + "bucket_upper", + "tune_avg_edge_bps", + f"{VALIDATION_LOCKED_SPLIT}_avg_edge_bps", + f"{LATEST_STRESS_SPLIT}_avg_edge_bps", + "stable_positive_edge", + "stable_lift", + "screen_score", + ] + lines.append(_markdown_table(candidates[display_columns].head(20))) + lines.extend( + [ + "", + "## 文件", + "", + "- `diagnostics/entry_feature_bucket_metrics.csv`: 每个特征、每个桶、每个数据段的完整明细。", + "- `diagnostics/entry_feature_signal_candidates.csv`: 每个特征按调参集挑出的最好区间,以及封存验证/压力检查结果。", + "", + ] + ) + return "\n".join(lines) + + +def _markdown_table(frame: pd.DataFrame) -> str: + if frame.empty: + return "_无_" + columns = list(frame.columns) + lines = ["| " + 
" | ".join(columns) + " |", "| " + " | ".join(["---"] * len(columns)) + " |"] + for _, row in frame.iterrows(): + values = [_format_cell(row[column]) for column in columns] + lines.append("| " + " | ".join(values) + " |") + return "\n".join(lines) + + +def _format_cell(value: Any) -> str: + if pd.isna(value): + return "" + if isinstance(value, (float, np.floating)): + return f"{float(value):.6g}" + if isinstance(value, (bool, np.bool_)): + return "true" if bool(value) else "false" + return str(value) diff --git a/training/trader_training/features.py b/training/trader_training/features.py index ce5b2a4..a59b333 100644 --- a/training/trader_training/features.py +++ b/training/trader_training/features.py @@ -1,12 +1,14 @@ from __future__ import annotations import logging +from pathlib import Path from typing import Any import numpy as np import pandas as pd from trader_training.io_utils import ( + DEFAULT_RAW_ROOT, manifest, read_parquet, require_columns, @@ -49,7 +51,7 @@ def _rolling_rank_last(values: pd.Series, window: int) -> pd.Series: def _complete_days(frame: pd.DataFrame) -> pd.DataFrame: frame = frame.copy() frame["event_date"] = frame["event_time"].dt.strftime("%Y-%m-%d") - counts = frame.groupby(["symbol", "event_date"])["event_time"].count() + counts = frame.groupby(["symbol", "event_date"], observed=False)["event_time"].count() complete = counts[counts == 1440].reset_index()[["symbol", "event_date"]] return frame.merge(complete, on=["symbol", "event_date"], how="inner").drop(columns=["event_date"]) @@ -90,9 +92,18 @@ def build_feature_frame(args: Any) -> None: before = len(replay) replay = _complete_days(replay) logging.info("trader.training.feature_complete_days rowBefore=%s rowAfter=%s", before, len(replay)) + raw_root = Path(getattr(args, "raw_root", None) or DEFAULT_RAW_ROOT) + book_features = _load_book_minute_features(raw_root, replay[["symbol", "event_time", "open_time_ms"]]) + replay = replay.merge(book_features, on=["symbol", "open_time_ms"], 
how="left") + logging.info( + "trader.training.book_features_merged rowCount=%s bookRows=%s bookAvailableRows=%s", + len(replay), + len(book_features), + int(replay["book_top_imbalance"].notna().sum()), + ) frames: list[pd.DataFrame] = [] - for symbol, group in replay.groupby("symbol", sort=False): + for symbol, group in replay.groupby("symbol", sort=False, observed=False): group = group.sort_values("event_time").reset_index(drop=True).copy() close = group["close"].astype(float) high = group["high"].astype(float) @@ -140,6 +151,14 @@ def build_feature_frame(args: Any) -> None: group["oi_delta_15m_bps"] = (group["open_interest"].astype(float) / group["open_interest"].astype(float).shift(15) - 1.0) * 10000.0 group["oi_delta_60m_bps"] = (group["open_interest"].astype(float) / group["open_interest"].astype(float).shift(60) - 1.0) * 10000.0 group["mark_index_basis_bps"] = (group["mark_price"].astype(float) / group["index_price"].astype(float) - 1.0) * 10000.0 + group["book_pressure_spread_ratio"] = _safe_divide(group["book_microprice_basis_bps"].astype(float), group["spread_bps"].astype(float).abs().clip(lower=0.01)) + group["book_pressure_taker_1m"] = group["book_microprice_basis_bps"].astype(float) * group["taker_imbalance_1m"].astype(float) + group["book_pressure_taker_5m"] = group["book_microprice_basis_bps"].astype(float) * group["taker_imbalance_5m"].astype(float) + group["book_l20_imbalance_taker_15m"] = group["book_depth_imbalance_l20"].astype(float) * group["taker_imbalance_15m"].astype(float) + group["book_l20_imbalance_ret_15m"] = group["book_depth_imbalance_l20"].astype(float) * group["ret_15m_bps"].astype(float) + group["book_pressure_vol_adjusted"] = _safe_divide(group["book_microprice_basis_bps"].astype(float), group["realized_vol_15m_bps"].astype(float).clip(lower=1.0)) + group["book_depth_pressure_gap"] = group["book_depth_imbalance_l5"].astype(float) - group["book_depth_imbalance_l20"].astype(float) + group["book_pressure_reversal_15m"] = 
-group["book_microprice_basis_bps"].astype(float) * group["ret_15m_bps"].astype(float) liq_buy = group["liquidation_buy_notional_1m"].astype(float) liq_sell = group["liquidation_sell_notional_1m"].astype(float) liq_total_15 = (liq_buy + liq_sell).rolling(15, min_periods=1).sum() @@ -159,12 +178,13 @@ def build_feature_frame(args: Any) -> None: frame["split_id"] = assign_split(frame["event_time"], split_manifest_path) frame["walk_forward_fold"] = np.where(frame["split_id"].eq(FIT_SPLIT), "fold_01", "NO_FOLD") frame["feature_version"] = FEATURE_VERSION - hard_na = frame[FEATURE_ORDER].isna().any(axis=1) + numeric_features = frame[FEATURE_ORDER].apply(pd.to_numeric, errors="coerce").replace([np.inf, -np.inf], np.nan) + hard_na = numeric_features.isna().any(axis=1) optional_missing = frame["liquidation_available"].fillna(0).eq(0) frame["data_quality_flag"] = np.where(hard_na, "WARMUP", np.where(optional_missing, "PARTIAL_OPTIONAL", "OK")) ordered = frame[META_COLUMNS + FEATURE_ORDER].copy() for feature in FEATURE_ORDER: - ordered[feature] = pd.to_numeric(ordered[feature], errors="coerce").astype("float32") + ordered[feature] = pd.to_numeric(ordered[feature], errors="coerce").replace([np.inf, -np.inf], np.nan).astype("float32") feature_dir = root / "feature" data_hash = write_parquet(feature_dir / "feature_frame.parquet", ordered) @@ -202,7 +222,7 @@ def build_feature_frame(args: Any) -> None: def write_feature_report(path, frame: pd.DataFrame, feature_schema_hash: str, feature_order_hash: str) -> None: split_rows = [] - for split_id, group in frame.groupby("split_id", sort=True): + for split_id, group in frame.groupby("split_id", sort=True, observed=False): split_rows.append( { "split_id": split_id, @@ -246,6 +266,8 @@ def write_feature_report(path, frame: pd.DataFrame, feature_schema_hash: str, fe "", f"- replay_1m_required_columns: present", f"- liquidation_available_share: {float(frame['liquidation_available'].mean()):.6f}", + f"- book_available_share: 
{float(frame['book_top_imbalance'].notna().mean()):.6f}", + f"- feature_rows_with_book_missing: {int(frame['book_top_imbalance'].isna().sum())}", f"- feature_rows_with_optional_liquidation_missing: {int(frame['data_quality_flag'].eq('PARTIAL_OPTIONAL').sum())}", "", "## Leakage Check", @@ -286,6 +308,105 @@ def feature_order_hash() -> str: return sha256_json(FEATURE_ORDER) +def _load_book_minute_features(raw_root: Path, replay_keys: pd.DataFrame) -> pd.DataFrame: + if replay_keys.empty: + return pd.DataFrame(columns=["symbol", "open_time_ms", *_book_feature_columns()]) + keys = replay_keys.copy() + keys["event_time"] = to_utc_series(keys["event_time"]) + keys["event_date"] = keys["event_time"].dt.strftime("%Y-%m-%d") + frames: list[pd.DataFrame] = [] + for (symbol, event_date), _ in keys.groupby(["symbol", "event_date"], sort=True, observed=False): + path = raw_root / "table=book" / "exchange=BINANCE_FUTURES" / f"symbol={symbol}" / f"dt={event_date}" / "data.parquet" + if not path.is_file(): + logging.warning("trader.training.book_partition_missing symbol=%s eventDate=%s path=%s", symbol, event_date, path) + continue + day_features = _read_book_day(path, symbol) + frames.append(day_features) + logging.info( + "trader.training.book_partition_loaded symbol=%s eventDate=%s minuteRows=%s path=%s", + symbol, + event_date, + len(day_features), + path, + ) + if not frames: + return pd.DataFrame(columns=["symbol", "open_time_ms", *_book_feature_columns()]) + out = pd.concat(frames, ignore_index=True).drop_duplicates(["symbol", "open_time_ms"], keep="last") + wanted = keys[["symbol", "open_time_ms"]].drop_duplicates() + return wanted.merge(out, on=["symbol", "open_time_ms"], how="inner") + + +def _read_book_day(path: Path, symbol: str) -> pd.DataFrame: + columns = ["origin_time"] + for side in ("bid", "ask"): + for level in range(20): + columns.extend([f"{side}_{level}_price", f"{side}_{level}_size"]) + book = pd.read_parquet(path, columns=columns) + if book.empty: + return 
pd.DataFrame(columns=["symbol", "open_time_ms", *_book_feature_columns()]) + book = book.dropna(subset=["origin_time", "bid_0_price", "ask_0_price", "bid_0_size", "ask_0_size"]).copy() + book["origin_time"] = to_utc_series(book["origin_time"]) + book["minute"] = book["origin_time"].dt.floor("min") + book = book.sort_values("origin_time").drop_duplicates("minute", keep="last").reset_index(drop=True) + if book.empty: + return pd.DataFrame(columns=["symbol", "open_time_ms", *_book_feature_columns()]) + + bid0 = book["bid_0_price"].astype("float64").to_numpy() + ask0 = book["ask_0_price"].astype("float64").to_numpy() + bid0_size = book["bid_0_size"].astype("float64").to_numpy() + ask0_size = book["ask_0_size"].astype("float64").to_numpy() + mid = (bid0 + ask0) / 2.0 + top_denominator = np.maximum(bid0_size + ask0_size, 1e-12) + microprice = (bid0 * ask0_size + ask0 * bid0_size) / top_denominator + + bid_depth_l5 = _book_level_notional(book, "bid", 5) + ask_depth_l5 = _book_level_notional(book, "ask", 5) + bid_depth_l20 = _book_level_notional(book, "bid", 20) + ask_depth_l20 = _book_level_notional(book, "ask", 20) + total_l5 = bid_depth_l5 + ask_depth_l5 + total_l20 = bid_depth_l20 + ask_depth_l20 + + minute_ms = (book["minute"].astype("int64") // 1_000_000).astype("int64") + return pd.DataFrame( + { + "symbol": symbol, + "open_time_ms": minute_ms, + "book_top_imbalance": (bid0_size - ask0_size) / top_denominator, + "book_microprice_basis_bps": (microprice / mid - 1.0) * 10000.0, + "book_bid_depth_l5_quote": bid_depth_l5, + "book_ask_depth_l5_quote": ask_depth_l5, + "book_depth_imbalance_l5": _depth_imbalance(bid_depth_l5, ask_depth_l5), + "book_depth_imbalance_l20": _depth_imbalance(bid_depth_l20, ask_depth_l20), + "book_depth_concentration_l5_l20": total_l5 / np.maximum(total_l20, 1e-12), + } + ) + + +def _book_level_notional(book: pd.DataFrame, side: str, level_count: int) -> np.ndarray: + total = np.zeros(len(book), dtype="float64") + for level in 
range(level_count): + price = pd.to_numeric(book[f"{side}_{level}_price"], errors="coerce").fillna(0.0).to_numpy(dtype="float64") + size = pd.to_numeric(book[f"{side}_{level}_size"], errors="coerce").fillna(0.0).to_numpy(dtype="float64") + total += price * size + return total + + +def _depth_imbalance(bid_depth: np.ndarray, ask_depth: np.ndarray) -> np.ndarray: + return (bid_depth - ask_depth) / np.maximum(bid_depth + ask_depth, 1e-12) + + +def _book_feature_columns() -> list[str]: + return [ + "book_top_imbalance", + "book_microprice_basis_bps", + "book_bid_depth_l5_quote", + "book_ask_depth_l5_quote", + "book_depth_imbalance_l5", + "book_depth_imbalance_l20", + "book_depth_concentration_l5_l20", + ] + + def _high_correlation_rows(frame: pd.DataFrame) -> list[dict[str, object]]: sample = frame[FEATURE_ORDER].apply(pd.to_numeric, errors="coerce").dropna() if len(sample) > 5000: diff --git a/training/trader_training/labels.py b/training/trader_training/labels.py index f4f5e64..36db829 100644 --- a/training/trader_training/labels.py +++ b/training/trader_training/labels.py @@ -5,6 +5,7 @@ from typing import Any import numpy as np import pandas as pd +from numpy.lib.stride_tricks import sliding_window_view from trader_training.io_utils import ( manifest, @@ -36,6 +37,8 @@ DEFAULT_COST_CONFIG = { "funding_cost_bps": 0.5, } +ENTRY_LABEL_METHOD = "MAX_FUTURE_EDGE_V1" + def _load_config(path, default): if path is None: @@ -66,94 +69,144 @@ def _base_frames(args: Any) -> tuple[pd.DataFrame, pd.DataFrame]: return features, replay -def _future_path(group: pd.DataFrame, index: int, horizon: int) -> pd.DataFrame: - start = index + 1 - end = min(len(group), index + horizon + 1) - return group.iloc[start:end] +PATH_STAT_COLUMNS = [ + "symbol", + "open_time_ms", + "side", + "target_hit", + "stop_hit", + "timeout_hit", + "ambiguous_hit", + "time_to_target_ms", + "time_to_stop_ms", + "gross_edge_bps", + "future_return_bps", + "mfe_bps", + "mae_bps", + "future_spread_p80", + 
"future_realized_vol_bps", +] -def _contiguous_future_path(group: pd.DataFrame, index: int, horizon: int) -> pd.DataFrame: - path = _future_path(group, index, horizon) - if len(path) < horizon: - return pd.DataFrame() - current_ms = int(group.iloc[index]["open_time_ms"]) - expected = current_ms + np.arange(1, horizon + 1, dtype=np.int64) * 60_000 - actual = path["open_time_ms"].astype("int64").to_numpy() - if len(actual) != len(expected) or not np.array_equal(actual, expected): - return pd.DataFrame() - return path +def _empty_path_stats_frame() -> pd.DataFrame: + return pd.DataFrame(columns=PATH_STAT_COLUMNS) -def _side_return_bps(side: str, entry_price: float, exit_price: float) -> float: - if side == "LONG": - return (exit_price / entry_price - 1.0) * 10000.0 - return (entry_price / exit_price - 1.0) * 10000.0 +def _first_hit_index(hit_window: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + hit_any = hit_window.any(axis=1) + first_idx = np.argmax(hit_window, axis=1) + first_idx = np.where(hit_any, first_idx, hit_window.shape[1] + 1) + return hit_any, first_idx -def _path_stats(group: pd.DataFrame, index: int, side: str, horizon: int, target_bps: float, stop_bps: float) -> dict[str, Any]: - current = group.iloc[index] - entry = float(current["close"]) - path = _contiguous_future_path(group, index, horizon) - if path.empty: - return {"valid": False} - target_price = entry * (1.0 + target_bps / 10000.0) if side == "LONG" else entry * (1.0 - target_bps / 10000.0) - stop_price = entry * (1.0 - stop_bps / 10000.0) if side == "LONG" else entry * (1.0 + stop_bps / 10000.0) - target_hit = False - stop_hit = False - ambiguous = False - time_to_target_ms = -1 - time_to_stop_ms = -1 - for _, row in path.iterrows(): - high = float(row["high"]) - low = float(row["low"]) - if side == "LONG": - target_now = high >= target_price - stop_now = low <= stop_price - else: - target_now = low <= target_price - stop_now = high >= stop_price - if target_now and stop_now: - ambiguous = 
True - stop_hit = True - time_to_stop_ms = int(row["open_time_ms"] - current["open_time_ms"]) - break - if target_now: - target_hit = True - time_to_target_ms = int(row["open_time_ms"] - current["open_time_ms"]) - break - if stop_now: - stop_hit = True - time_to_stop_ms = int(row["open_time_ms"] - current["open_time_ms"]) - break - exit_price = float(path.iloc[-1]["close"]) - final_return_bps = _side_return_bps(side, entry, exit_price) - if side == "LONG": - mfe = (path["high"].max() / entry - 1.0) * 10000.0 - mae = (entry / path["low"].min() - 1.0) * 10000.0 +def _path_stats_for_group(group: pd.DataFrame, side: str, horizon: int, target_bps: float, stop_bps: float) -> pd.DataFrame: + if len(group) <= horizon: + return _empty_path_stats_frame() + + grouped = group.sort_values("event_time").reset_index(drop=True) + open_ms = grouped["open_time_ms"].astype("int64").to_numpy() + close = grouped["close"].astype("float64").to_numpy() + high = grouped["high"].astype("float64").to_numpy() + low = grouped["low"].astype("float64").to_numpy() + spread = grouped["spread_bps"].astype("float64").to_numpy() + + entry = close[:-horizon] + exit_price = close[horizon:] + current_open_ms = open_ms[:-horizon] + + bad_gap = (np.diff(open_ms) != 60_000).astype("int64") + gap_cumsum = np.concatenate(([0], np.cumsum(bad_gap))) + contiguous = (gap_cumsum[horizon:] - gap_cumsum[:-horizon]) == 0 + finite = np.isfinite(entry) & np.isfinite(exit_price) + valid = contiguous & finite + + future_high = sliding_window_view(high[1:], horizon) + future_low = sliding_window_view(low[1:], horizon) + future_spread = sliding_window_view(spread[1:], horizon) + + with np.errstate(all="ignore"): + high_max = np.nanmax(future_high, axis=1) + low_min = np.nanmin(future_low, axis=1) + spread_p80 = np.nanquantile(future_spread, 0.8, axis=1) + + if horizon > 1: + log_close = np.log(np.clip(close, 1e-12, None)) + log_return = np.diff(log_close) + future_log_return = sliding_window_view(log_return[1:], horizon - 
1) + with np.errstate(all="ignore"): + realized_vol_bps = np.nanstd(future_log_return, axis=1, ddof=1) * 10000.0 else: - mfe = (entry / path["low"].min() - 1.0) * 10000.0 - mae = (path["high"].max() / entry - 1.0) * 10000.0 - if target_hit: - gross = target_bps - elif stop_hit: - gross = -stop_bps + realized_vol_bps = np.full(len(entry), np.nan) + + if side == "LONG": + target_price = entry * (1.0 + target_bps / 10000.0) + stop_price = entry * (1.0 - stop_bps / 10000.0) + target_window = future_high >= target_price[:, None] + stop_window = future_low <= stop_price[:, None] + future_return_bps = (exit_price / entry - 1.0) * 10000.0 + mfe_bps = (high_max / entry - 1.0) * 10000.0 + mae_bps = (entry / low_min - 1.0) * 10000.0 else: - gross = final_return_bps - return { - "valid": True, - "target_hit": int(target_hit), - "stop_hit": int(stop_hit), - "timeout_hit": int(not target_hit and not stop_hit), - "ambiguous_hit": int(ambiguous), - "time_to_target_ms": time_to_target_ms, - "time_to_stop_ms": time_to_stop_ms, - "gross_edge_bps": float(gross), - "future_return_bps": float(final_return_bps), - "mfe_bps": float(mfe), - "mae_bps": float(mae), - "future_spread_p80": float(path["spread_bps"].quantile(0.8)), - "future_realized_vol_bps": float(np.log(path["close"].astype(float) / path["close"].astype(float).shift(1)).std() * 10000.0), - } + target_price = entry * (1.0 - target_bps / 10000.0) + stop_price = entry * (1.0 + stop_bps / 10000.0) + target_window = future_low <= target_price[:, None] + stop_window = future_high >= stop_price[:, None] + future_return_bps = (entry / exit_price - 1.0) * 10000.0 + mfe_bps = (entry / low_min - 1.0) * 10000.0 + mae_bps = (high_max / entry - 1.0) * 10000.0 + + target_any, first_target_idx = _first_hit_index(target_window) + stop_any, first_stop_idx = _first_hit_index(stop_window) + ambiguous_hit = target_any & stop_any & (first_target_idx == first_stop_idx) + target_hit = target_any & (first_target_idx < first_stop_idx) + stop_hit = 
stop_any & (first_stop_idx <= first_target_idx) + timeout_hit = ~(target_hit | stop_hit) + gross_edge_bps = np.where(target_hit, target_bps, np.where(stop_hit, -stop_bps, future_return_bps)) + + out = pd.DataFrame( + { + "symbol": grouped["symbol"].iloc[0], + "open_time_ms": current_open_ms, + "side": side, + "target_hit": target_hit.astype("int8"), + "stop_hit": stop_hit.astype("int8"), + "timeout_hit": timeout_hit.astype("int8"), + "ambiguous_hit": ambiguous_hit.astype("int8"), + "time_to_target_ms": np.where(target_hit, (first_target_idx + 1) * 60_000, -1).astype("int64"), + "time_to_stop_ms": np.where(stop_hit, (first_stop_idx + 1) * 60_000, -1).astype("int64"), + "gross_edge_bps": gross_edge_bps.astype("float64"), + "future_return_bps": future_return_bps.astype("float64"), + "mfe_bps": mfe_bps.astype("float64"), + "mae_bps": mae_bps.astype("float64"), + "future_spread_p80": spread_p80.astype("float64"), + "future_realized_vol_bps": realized_vol_bps.astype("float64"), + } + ) + return out.loc[valid, PATH_STAT_COLUMNS].reset_index(drop=True) + + +def _build_path_stats(replay: pd.DataFrame, horizon: int, target_bps: float, stop_bps: float) -> pd.DataFrame: + frames: list[pd.DataFrame] = [] + for symbol, group in replay.groupby("symbol", sort=False, observed=False): + logging.info( + "trader.training.path_stats_group_start symbol=%s horizonMinutes=%s rowCount=%s", + symbol, + horizon, + len(group), + ) + for side in ("LONG", "SHORT"): + stats = _path_stats_for_group(group, side, horizon, target_bps, stop_bps) + frames.append(stats) + logging.info( + "trader.training.path_stats_side_done symbol=%s side=%s horizonMinutes=%s rowCount=%s", + symbol, + side, + horizon, + len(stats), + ) + out = pd.concat(frames, ignore_index=True) if frames else _empty_path_stats_frame() + logging.info("trader.training.path_stats_built horizonMinutes=%s rowCount=%s", horizon, len(out)) + return out def write_price_plan_context(args: Any) -> None: @@ -164,11 +217,13 @@ def 
write_price_plan_context(args: Any) -> None: cost_bps = float(cost["fee_bps"]) + float(cost["slippage_bps"]) + float(cost["funding_cost_bps"]) context = { "pricePlanId": args.price_plan_id, - "pricePlanConfigHash": sha256_json({"entry": entry, "cost": cost}), + "pricePlanConfigHash": sha256_json({"entry": entry, "cost": cost, "entry_label_method": ENTRY_LABEL_METHOD}), "stopDistanceBps": float(entry["stop_bps"]), "targetDistanceBps": float(entry["target_bps"]), "maxHoldMinutes": int(entry["max_hold_minutes"]), + "minExpectedNetEdgeBps": float(entry["min_expected_net_edge_bps"]), "costBps": cost_bps, + "entryLabelMethod": ENTRY_LABEL_METHOD, } path = root / "label" / "price_plan_context.json" write_json(path, context) @@ -178,7 +233,9 @@ def write_price_plan_context(args: Any) -> None: "target_bps": context["targetDistanceBps"], "stop_bps": context["stopDistanceBps"], "max_hold_minutes": context["maxHoldMinutes"], + "min_expected_net_edge_bps": context["minExpectedNetEdgeBps"], "cost_bps": context["costBps"], + "entry_label_method": context["entryLabelMethod"], }]) write_parquet(root / "label" / "price_plan_context.parquet", frame) logging.info("trader.training.price_plan_written runId=%s path=%s", args.run_id, path) @@ -233,43 +290,62 @@ def build_entry_labels(args: Any) -> None: features, replay = _base_frames(args) entry_conf = labels["entry"] cost_bps = float(cost["fee_bps"]) + float(cost["slippage_bps"]) + float(cost["funding_cost_bps"]) - rows: list[dict[str, Any]] = [] - groups, index_by_key = _group_replay_with_index(replay) - for feature in features.itertuples(index=False): - key = (feature.symbol, int(feature.open_time_ms)) - index = index_by_key.get(key) - if index is None: - continue - group = groups[feature.symbol] - for side in ("LONG", "SHORT"): - stats = _path_stats(group, index, side, int(entry_conf["max_hold_minutes"]), float(entry_conf["target_bps"]), float(entry_conf["stop_bps"])) - if not stats["valid"]: - continue - expected = 
stats["gross_edge_bps"] - cost_bps - rows.append( - { - "sample_id": feature.sample_id, - "symbol": feature.symbol, - "event_time": feature.event_time, - "side": side, - "price_plan_id": plan["pricePlanId"], - "price_plan_hash": plan["pricePlanConfigHash"], - "target_hit": stats["target_hit"], - "stop_hit": stats["stop_hit"], - "timeout_hit": stats["timeout_hit"], - "ambiguous_hit": stats["ambiguous_hit"], - "time_to_target_ms": stats["time_to_target_ms"], - "time_to_stop_ms": stats["time_to_stop_ms"], - "gross_edge_bps": stats["gross_edge_bps"], - "cost_bps": cost_bps, - "expected_net_edge_bps": expected, - "entry_target": int(stats["target_hit"] == 1 and expected >= float(entry_conf["min_expected_net_edge_bps"])), - "split_id": feature.split_id, - "walk_forward_fold": feature.walk_forward_fold, - "label_version": LABEL_VERSION, - } - ) - out = pd.DataFrame(rows) + stats = _build_path_stats( + replay, + int(entry_conf["max_hold_minutes"]), + float(entry_conf["target_bps"]), + float(entry_conf["stop_bps"]), + ) + feature_columns = [ + "sample_id", + "symbol", + "event_time", + "open_time_ms", + "split_id", + "walk_forward_fold", + "spread_bps", + "spread_rank_24h_pct", + "realized_vol_15m_bps", + ] + merged = features[feature_columns].merge(stats, on=["symbol", "open_time_ms"], how="inner") + merged["max_achievable_gross_edge_bps"] = merged["mfe_bps"] + merged["max_achievable_net_edge_bps"] = merged["max_achievable_gross_edge_bps"] - cost_bps + merged["expected_net_edge_bps"] = merged["max_achievable_net_edge_bps"] + merged["entry_target"] = (merged["max_achievable_net_edge_bps"] >= float(entry_conf["min_expected_net_edge_bps"])).astype("int8") + merged["price_plan_id"] = plan["pricePlanId"] + merged["price_plan_hash"] = plan["pricePlanConfigHash"] + merged["cost_bps"] = cost_bps + merged["label_method"] = ENTRY_LABEL_METHOD + merged["label_version"] = LABEL_VERSION + out = merged[ + [ + "sample_id", + "symbol", + "event_time", + "side", + "price_plan_id", + 
"price_plan_hash", + "target_hit", + "stop_hit", + "timeout_hit", + "ambiguous_hit", + "time_to_target_ms", + "time_to_stop_ms", + "gross_edge_bps", + "future_return_bps", + "mfe_bps", + "mae_bps", + "max_achievable_gross_edge_bps", + "max_achievable_net_edge_bps", + "cost_bps", + "expected_net_edge_bps", + "entry_target", + "label_method", + "split_id", + "walk_forward_fold", + "label_version", + ] + ].copy() path = root / "label" / "entry_labels.parquet" data_hash = write_parquet(path, out) _write_label_manifest(root / "label" / "entry_labels.manifest.json", path, out, data_hash) @@ -286,8 +362,8 @@ def build_position_state_samples(args: Any) -> None: samples = entry[entry["entry_target"] == 1].copy() samples["position_age_minutes"] = 0 samples["unrealized_pnl_bps"] = 0.0 - samples["mfe_bps"] = samples["gross_edge_bps"].clip(lower=0) - samples["mae_bps"] = (-samples["gross_edge_bps"]).clip(lower=0) + samples["mfe_bps"] = pd.to_numeric(samples["mfe_bps"], errors="coerce").fillna(0.0).clip(lower=0) + samples["mae_bps"] = pd.to_numeric(samples["mae_bps"], errors="coerce").fillna(0.0).clip(lower=0) path = root / "label" / "position_state_samples.parquet" data_hash = write_parquet(path, samples) write_json(root / "label" / "position_state_samples.manifest.json", manifest(path, {"row_count": len(samples), "data_hash_sha256": data_hash})) @@ -304,80 +380,95 @@ def build_continue_exit_risk_labels(args: Any) -> None: horizon = int(labels["continue"]["horizon_minutes"]) target_bps = float(plan["targetDistanceBps"]) stop_bps = float(plan["stopDistanceBps"]) - rows_continue: list[dict[str, Any]] = [] - rows_exit: list[dict[str, Any]] = [] - rows_risk: list[dict[str, Any]] = [] - groups, index_by_key = _group_replay_with_index(replay) - for feature in features.itertuples(index=False): - key = (feature.symbol, int(feature.open_time_ms)) - index = index_by_key.get(key) - if index is None: - continue - group = groups[feature.symbol] - long_stats = _path_stats(group, index, 
"LONG", horizon, target_bps, stop_bps) - short_stats = _path_stats(group, index, "SHORT", horizon, target_bps, stop_bps) - if not long_stats["valid"] or not short_stats["valid"]: - continue - long_edge = long_stats["future_return_bps"] - cost_bps - short_edge = short_stats["future_return_bps"] - cost_bps - min_continue = float(labels["continue"]["min_expected_continue_edge_bps"]) - adverse_threshold = float(labels["exit"]["adverse_move_bps"]) - rows_continue.append( - { - "sample_id": feature.sample_id, - "symbol": feature.symbol, - "event_time": feature.event_time, - "long_continue_target": int(long_edge >= min_continue and long_stats["mae_bps"] < stop_bps), - "short_continue_target": int(short_edge >= min_continue and short_stats["mae_bps"] < stop_bps), - "long_expected_continue_edge_bps": long_edge, - "short_expected_continue_edge_bps": short_edge, - "split_id": feature.split_id, - "walk_forward_fold": feature.walk_forward_fold, - "label_version": LABEL_VERSION, - } - ) - stagnation = int(abs(long_stats["future_return_bps"]) <= float(labels["exit"]["stagnation_abs_return_bps"])) - rows_exit.append( - { - "sample_id": feature.sample_id, - "symbol": feature.symbol, - "event_time": feature.event_time, - "long_exit_target": int(long_stats["stop_hit"] == 1 or long_stats["mae_bps"] >= adverse_threshold), - "short_exit_target": int(short_stats["stop_hit"] == 1 or short_stats["mae_bps"] >= adverse_threshold), - "long_adverse_move_bps": long_stats["mae_bps"], - "short_adverse_move_bps": short_stats["mae_bps"], - "adverse_move_prob_label": int(max(long_stats["mae_bps"], short_stats["mae_bps"]) >= adverse_threshold), - "reversal_prob_label": int(np.sign(long_stats["future_return_bps"]) != np.sign(feature.ret_15m_bps) if hasattr(feature, "ret_15m_bps") else 0), - "stop_hit_prob_label": int(long_stats["stop_hit"] == 1 or short_stats["stop_hit"] == 1), - "stagnation_prob_label": stagnation, - "split_id": feature.split_id, - "walk_forward_fold": feature.walk_forward_fold, - 
"label_version": LABEL_VERSION, - } - ) - path_risk = max(long_stats["mae_bps"], short_stats["mae_bps"]) - vol_ratio = 0.0 if long_stats["future_realized_vol_bps"] != long_stats["future_realized_vol_bps"] else long_stats["future_realized_vol_bps"] - rows_risk.append( - { - "sample_id": feature.sample_id, - "symbol": feature.symbol, - "event_time": feature.event_time, - "market_risk_target": int(path_risk >= float(labels["risk"]["market_drawdown_bps"])), - "market_path_risk_bps": path_risk, - "long_position_path_risk_bps": long_stats["mae_bps"], - "short_position_path_risk_bps": short_stats["mae_bps"], - "long_position_risk_target": int(long_stats["mae_bps"] >= stop_bps), - "short_position_risk_target": int(short_stats["mae_bps"] >= stop_bps), - "market_drawdown_prob_label": int(path_risk >= float(labels["risk"]["market_drawdown_bps"])), - "volatility_expansion_prob_label": int(vol_ratio >= float(labels["risk"]["spike_bps"])), - "spike_prob_label": int(max(long_stats["mfe_bps"], short_stats["mfe_bps"], path_risk) >= float(labels["risk"]["spike_bps"])), - "liquidity_deterioration_prob_label": int(long_stats["future_spread_p80"] >= float(replay["spread_bps"].quantile(0.9))), - "position_drawdown_prob_label": int(max(long_stats["mae_bps"], short_stats["mae_bps"]) >= stop_bps), - "split_id": feature.split_id, - "walk_forward_fold": feature.walk_forward_fold, - "label_version": LABEL_VERSION, - } - ) + stats = _build_path_stats(replay, horizon, target_bps, stop_bps) + long_stats = stats[stats["side"] == "LONG"].drop(columns=["side"]).add_prefix("long_") + short_stats = stats[stats["side"] == "SHORT"].drop(columns=["side"]).add_prefix("short_") + long_stats = long_stats.rename(columns={"long_symbol": "symbol", "long_open_time_ms": "open_time_ms"}) + short_stats = short_stats.rename(columns={"short_symbol": "symbol", "short_open_time_ms": "open_time_ms"}) + feature_columns = [ + "sample_id", + "symbol", + "event_time", + "open_time_ms", + "split_id", + "walk_forward_fold", 
+ "spread_bps", + "spread_rank_24h_pct", + "realized_vol_15m_bps", + ] + if "ret_15m_bps" in features.columns: + feature_columns.append("ret_15m_bps") + merged = features[feature_columns].merge(long_stats, on=["symbol", "open_time_ms"], how="inner") + merged = merged.merge(short_stats, on=["symbol", "open_time_ms"], how="inner") + min_continue = float(labels["continue"]["min_expected_continue_edge_bps"]) + adverse_threshold = float(labels["exit"]["adverse_move_bps"]) + current_vol = merged["realized_vol_15m_bps"].astype(float).fillna(0.0).clip(lower=1.0) + + long_edge = merged["long_future_return_bps"] - cost_bps + short_edge = merged["short_future_return_bps"] - cost_bps + path_risk = np.maximum(merged["long_mae_bps"], merged["short_mae_bps"]) + max_path_move = np.maximum.reduce([merged["long_mfe_bps"], merged["short_mfe_bps"], path_risk]) + if "ret_15m_bps" in merged.columns: + reversal = (np.sign(merged["long_future_return_bps"]) != np.sign(merged["ret_15m_bps"])).astype("int8") + else: + reversal = pd.Series(0, index=merged.index, dtype="int8") + future_vol = merged["long_future_realized_vol_bps"].fillna(0.0) + volatility_expansion = future_vol >= current_vol * float(labels["risk"]["vol_expansion_ratio"]) + liquidity_deterioration = merged["spread_rank_24h_pct"].astype(float).fillna(0.0) >= 0.90 + + rows_continue = pd.DataFrame( + { + "sample_id": merged["sample_id"], + "symbol": merged["symbol"], + "event_time": merged["event_time"], + "long_continue_target": ((long_edge >= min_continue) & (merged["long_mae_bps"] < stop_bps)).astype("int8"), + "short_continue_target": ((short_edge >= min_continue) & (merged["short_mae_bps"] < stop_bps)).astype("int8"), + "long_expected_continue_edge_bps": long_edge, + "short_expected_continue_edge_bps": short_edge, + "split_id": merged["split_id"], + "walk_forward_fold": merged["walk_forward_fold"], + "label_version": LABEL_VERSION, + } + ) + rows_exit = pd.DataFrame( + { + "sample_id": merged["sample_id"], + "symbol": 
merged["symbol"], + "event_time": merged["event_time"], + "long_exit_target": ((merged["long_stop_hit"] == 1) | (merged["long_mae_bps"] >= adverse_threshold)).astype("int8"), + "short_exit_target": ((merged["short_stop_hit"] == 1) | (merged["short_mae_bps"] >= adverse_threshold)).astype("int8"), + "long_adverse_move_bps": merged["long_mae_bps"], + "short_adverse_move_bps": merged["short_mae_bps"], + "adverse_move_prob_label": (path_risk >= adverse_threshold).astype("int8"), + "reversal_prob_label": reversal, + "stop_hit_prob_label": ((merged["long_stop_hit"] == 1) | (merged["short_stop_hit"] == 1)).astype("int8"), + "stagnation_prob_label": (merged["long_future_return_bps"].abs() <= float(labels["exit"]["stagnation_abs_return_bps"])).astype("int8"), + "split_id": merged["split_id"], + "walk_forward_fold": merged["walk_forward_fold"], + "label_version": LABEL_VERSION, + } + ) + rows_risk = pd.DataFrame( + { + "sample_id": merged["sample_id"], + "symbol": merged["symbol"], + "event_time": merged["event_time"], + "market_risk_target": (path_risk >= float(labels["risk"]["market_drawdown_bps"])).astype("int8"), + "market_path_risk_bps": path_risk, + "long_position_path_risk_bps": merged["long_mae_bps"], + "short_position_path_risk_bps": merged["short_mae_bps"], + "long_position_risk_target": (merged["long_mae_bps"] >= stop_bps).astype("int8"), + "short_position_risk_target": (merged["short_mae_bps"] >= stop_bps).astype("int8"), + "market_drawdown_prob_label": (path_risk >= float(labels["risk"]["market_drawdown_bps"])).astype("int8"), + "volatility_expansion_prob_label": volatility_expansion.astype("int8"), + "spike_prob_label": (max_path_move >= float(labels["risk"]["spike_bps"])).astype("int8"), + "liquidity_deterioration_prob_label": liquidity_deterioration.astype("int8"), + "position_drawdown_prob_label": (path_risk >= stop_bps).astype("int8"), + "split_id": merged["split_id"], + "walk_forward_fold": merged["walk_forward_fold"], + "label_version": LABEL_VERSION, + } 
+ ) outputs = [ ("continue", pd.DataFrame(rows_continue), "long_continue_target"), ("exit", pd.DataFrame(rows_exit), "long_exit_target"), @@ -404,14 +495,3 @@ def _write_distribution_report(path, frame: pd.DataFrame, column: str) -> None: counts = frame[column].value_counts(dropna=False).to_dict() if not frame.empty else {} lines = ["# Label Report", "", f"- row_count: {len(frame)}", f"- target_column: {column}", f"- distribution: {counts}", ""] write_text(path, "\n".join(lines)) - - -def _group_replay_with_index(replay: pd.DataFrame) -> tuple[dict[str, pd.DataFrame], dict[tuple[str, int], int]]: - groups: dict[str, pd.DataFrame] = {} - index_by_key: dict[tuple[str, int], int] = {} - for symbol, group in replay.groupby("symbol", sort=False): - grouped = group.sort_values("event_time").reset_index(drop=True) - groups[symbol] = grouped - for idx, row in grouped.iterrows(): - index_by_key[(symbol, int(row["open_time_ms"]))] = idx - return groups, index_by_key diff --git a/training/trader_training/nonlinear_benchmark.py b/training/trader_training/nonlinear_benchmark.py new file mode 100644 index 0000000..97d5eae --- /dev/null +++ b/training/trader_training/nonlinear_benchmark.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +import json +import logging +from typing import Any + +import numpy as np +import pandas as pd +from sklearn.ensemble import HistGradientBoostingClassifier +from sklearn.metrics import accuracy_score, brier_score_loss, log_loss, roc_auc_score + +from trader_training.io_utils import read_parquet, run_root, write_json, write_text +from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT + + +EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) + + +def benchmark_nonlinear_models(args: Any) -> None: + root = run_root(args) + result = { + "run_id": args.run_id, + "purpose": "diagnostic_only_not_exported", + "model_family": "sklearn_hist_gradient_boosting", + 
"feature_count": len(FEATURE_ORDER), + "direction": _benchmark_direction(root), + "entry": _benchmark_entry(root), + } + out_dir = root / "diagnostics" + write_json(out_dir / "nonlinear_benchmark_result.json", result) + _write_report(out_dir / "nonlinear_benchmark_report.md", result) + logging.info( + "trader.training.nonlinear_benchmark_written runId=%s path=%s", + args.run_id, + out_dir / "nonlinear_benchmark_report.md", + ) + + +def _benchmark_direction(root) -> dict[str, Any]: + dataset = read_parquet(root / "dataset" / "direction_train.parquet") + train = dataset[dataset["split_id"] == FIT_SPLIT].copy() + x_train = _x(train) + y_train = train[["long_target", "short_target", "neutral_target"]].to_numpy().argmax(axis=1) + model = HistGradientBoostingClassifier( + max_iter=120, + learning_rate=0.05, + max_leaf_nodes=31, + l2_regularization=0.01, + early_stopping=True, + random_state=7, + ) + model.fit(x_train, y_train) + train_prior = np.bincount(y_train, minlength=3).astype(float) + train_prior = train_prior / train_prior.sum() + metrics = {} + for split in EVAL_SPLITS: + frame = dataset[dataset["split_id"] == split].copy() + if frame.empty: + continue + y_true = frame[["long_target", "short_target", "neutral_target"]].to_numpy().argmax(axis=1) + proba = model.predict_proba(_x(frame)) + metrics[split] = _multiclass_metrics(y_true, proba, train_prior) + return {"metrics": metrics} + + +def _benchmark_entry(root) -> dict[str, Any]: + dataset = read_parquet(root / "dataset" / "entry_train.parquet") + train = dataset[dataset["split_id"] == FIT_SPLIT].copy() + result: dict[str, Any] = {} + for target in ("long_entry_target", "short_entry_target"): + y_train = train[target].astype(int).to_numpy() + if len(np.unique(y_train)) < 2: + result[target] = {"status": "SKIPPED_ONE_CLASS_TRAIN"} + continue + model = HistGradientBoostingClassifier( + max_iter=160, + learning_rate=0.04, + max_leaf_nodes=31, + l2_regularization=0.02, + early_stopping=True, + random_state=11, + ) + 
model.fit(_x(train), y_train) + train_prior = float(y_train.mean()) + split_metrics = {} + for split in EVAL_SPLITS: + frame = dataset[dataset["split_id"] == split].copy() + if frame.empty: + continue + y_true = frame[target].astype(int).to_numpy() + proba = model.predict_proba(_x(frame))[:, 1] + split_metrics[split] = _binary_metrics(y_true, proba, train_prior) + result[target] = {"metrics": split_metrics} + return result + + +def _x(frame: pd.DataFrame) -> np.ndarray: + return frame[FEATURE_ORDER].apply(pd.to_numeric, errors="coerce").replace([np.inf, -np.inf], np.nan).astype("float32").to_numpy() + + +def _binary_metrics(y_true: np.ndarray, proba: np.ndarray, train_prior: float) -> dict[str, Any]: + prior = float(np.clip(train_prior, 1e-6, 1 - 1e-6)) + constant = np.full(len(y_true), prior) + order = np.argsort(-proba) + top_n = max(1, int(len(y_true) * 0.10)) + metrics = { + "row_count": int(len(y_true)), + "positive_rate": float(y_true.mean()) if len(y_true) else 0.0, + "brier": float(brier_score_loss(y_true, proba)) if len(y_true) else 0.0, + "constant_brier": float(brier_score_loss(y_true, constant)) if len(y_true) else 0.0, + "top10_hit_rate": float(y_true[order[:top_n]].mean()) if len(y_true) else 0.0, + "all_hit_rate": float(y_true.mean()) if len(y_true) else 0.0, + } + if len(np.unique(y_true)) == 2: + metrics["auc"] = float(roc_auc_score(y_true, proba)) + metrics["tradable_signal"] = bool( + metrics.get("auc", 0.0) >= 0.56 + and metrics["brier"] < metrics["constant_brier"] + and metrics["top10_hit_rate"] > metrics["all_hit_rate"] + ) + return metrics + + +def _multiclass_metrics(y_true: np.ndarray, proba: np.ndarray, train_prior: np.ndarray) -> dict[str, Any]: + constant = np.tile(train_prior.reshape(1, -1), (len(y_true), 1)) + pred = proba.argmax(axis=1) + metrics = { + "row_count": int(len(y_true)), + "accuracy": float(accuracy_score(y_true, pred)), + "logloss": float(log_loss(y_true, proba, labels=[0, 1, 2])), + "constant_logloss": 
float(log_loss(y_true, constant, labels=[0, 1, 2])), + } + for class_id, name in enumerate(("long_auc", "short_auc", "neutral_auc")): + binary = (y_true == class_id).astype(int) + if len(np.unique(binary)) == 2: + metrics[name] = float(roc_auc_score(binary, proba[:, class_id])) + metrics["tradable_signal"] = bool( + metrics.get("long_auc", 0.0) >= 0.56 + and metrics.get("short_auc", 0.0) >= 0.56 + and metrics["logloss"] < metrics["constant_logloss"] + ) + return metrics + + +def _write_report(path, result: dict[str, Any]) -> None: + lines = [ + "# Nonlinear Benchmark Report", + "", + "这份报告只做诊断,不导出上线模型。它回答:同样的特征给更强一点的树模型,能不能找到稳定信号。", + "", + f"- run_id: `{result['run_id']}`", + f"- feature_count: `{result['feature_count']}`", + "", + "## Direction", + "", + _json_block(result["direction"]["metrics"]), + "", + "## Entry", + "", + _json_block(result["entry"]), + "", + ] + write_text(path, "\n".join(lines)) + + +def _json_block(value: Any) -> str: + return "```json\n" + json.dumps(value, ensure_ascii=False, indent=2) + "\n```" diff --git a/training/trader_training/onnx_export.py b/training/trader_training/onnx_export.py index e9618a9..4e164fc 100644 --- a/training/trader_training/onnx_export.py +++ b/training/trader_training/onnx_export.py @@ -5,6 +5,8 @@ from pathlib import Path import numpy as np +from trader_training.schemas import FEATURE_ORDER + @dataclass(frozen=True) class LinearHead: @@ -23,7 +25,7 @@ def require_onnx(): return onnx, TensorProto, helper, numpy_helper -def export_heads(path: Path, heads: list[LinearHead], feature_count: int = 39, opset: int = 17) -> None: +def export_heads(path: Path, heads: list[LinearHead], feature_count: int = len(FEATURE_ORDER), opset: int = 17) -> None: onnx, TensorProto, helper, numpy_helper = require_onnx() nodes = [] initializers = [] diff --git a/training/trader_training/pm.py b/training/trader_training/pm.py index c24df89..9e91f0f 100644 --- a/training/trader_training/pm.py +++ b/training/trader_training/pm.py @@ 
-228,12 +228,12 @@ def _pm_frame(root, split_id: str) -> pd.DataFrame: def _threshold_candidates() -> list[dict[str, float]]: values = itertools.product( - [0.54, 0.56, 0.58, 0.60], - [0.54, 0.56, 0.58, 0.60], - [0.50, 0.52, 0.55, 0.58], - [0.35, 0.45, 0.55], - [1.0, 2.0, 3.0, 5.0], - [0.02, 0.03, 0.05], + [0.50, 0.52, 0.54, 0.56, 0.58], + [0.50, 0.52, 0.54, 0.56, 0.58], + [0.10, 0.12, 0.14, 0.16, 0.20, 0.30, 0.50], + [0.55, 0.75, 0.90, 1.00], + [-8.0, -4.0, 0.0, 1.0, 3.0], + [0.00, 0.01, 0.02, 0.05], ) return [ { @@ -398,12 +398,18 @@ def _backtest_status(metrics: dict[str, dict[str, Any]]) -> tuple[str, list[str] def _score_thresholds(metrics: dict[str, Any]) -> float: if metrics["trade_count"] == 0: return -1_000_000.0 - low_sample_penalty = max(0, 20 - int(metrics["trade_count"])) * 5.0 + # 最终上线门槛要求 validation_locked 至少 80 笔;调参区如果只挑几十笔, + # 很容易是运气好,不是稳定规则,所以这里提前惩罚小样本阈值。 + low_sample_penalty = max(0, 120 - int(metrics["trade_count"])) * 1.5 + profit_factor_penalty = max(0.0, 1.15 - float(metrics["profit_factor"])) * 20.0 + negative_edge_penalty = max(0.0, -float(metrics["avg_weighted_edge_bps"])) * 40.0 return ( metrics["avg_weighted_edge_bps"] * np.sqrt(metrics["trade_count"]) + metrics["total_weighted_edge_bps"] * 0.05 - metrics["max_drawdown_bps"] * 0.25 - low_sample_penalty + - profit_factor_penalty + - negative_edge_penalty ) diff --git a/training/trader_training/price_plan_search.py b/training/trader_training/price_plan_search.py new file mode 100644 index 0000000..8f604db --- /dev/null +++ b/training/trader_training/price_plan_search.py @@ -0,0 +1,368 @@ +from __future__ import annotations + +import itertools +import logging +from typing import Any + +import numpy as np +import pandas as pd + +from trader_training.io_utils import read_parquet, run_root, write_json, write_text +from trader_training.labels import DEFAULT_COST_CONFIG, DEFAULT_LABEL_CONFIG, ENTRY_LABEL_METHOD, _load_config +from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, 
# Splits that get a row per plan, in reporting order.
EVAL_SPLITS = (FIT_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)
# Default search grid: holding horizons in minutes, targets and stops in bps.
DEFAULT_HORIZONS = (30, 45, 60, 90, 120)
DEFAULT_TARGETS = (12.0, 16.0, 20.0, 24.0, 32.0, 40.0)
DEFAULT_STOPS = (6.0, 8.0, 10.0, 12.0, 16.0)


def search_price_plans(args: Any) -> None:
    """Grid-search (horizon, target, stop) price plans and write the results.

    Loads the 1-minute replay and the feature frame, keeps feature rows whose
    quality flag is ``OK`` or ``PARTIAL_OPTIONAL`` and whose split is in
    ``EVAL_SPLITS``, scores every plan per symbol / split / side, then writes a
    JSON result, two CSV files and a Markdown report under
    ``<run root>/price-plan-search``.

    Raises:
        ValueError: if no trainable feature rows exist, or if the search
            produced no candidate rows.
    """
    root = run_root(args)
    replay = read_parquet(args.replay_path or root / "replay" / "replay_1m.parquet")
    features = read_parquet(args.feature_path or root / "feature" / "feature_frame.parquet")
    label_config = _load_config(args.label_config_path, DEFAULT_LABEL_CONFIG)
    cost_config = _load_config(args.cost_config_path, DEFAULT_COST_CONFIG)

    # Round-trip cost is the sum of the three configured cost components.
    fee_bps = float(cost_config["fee_bps"])
    slippage_bps = float(cost_config["slippage_bps"])
    funding_cost_bps = float(cost_config["funding_cost_bps"])
    cost_bps = fee_bps + slippage_bps + funding_cost_bps
    min_expected_edge_bps = float(label_config["entry"]["min_expected_net_edge_bps"])

    quality_ok = features["data_quality_flag"].isin(["OK", "PARTIAL_OPTIONAL"])
    in_eval_split = features["split_id"].isin(EVAL_SPLITS)
    trainable = features.loc[quality_ok & in_eval_split, ["symbol", "open_time_ms", "split_id"]].copy()
    if trainable.empty:
        raise ValueError("price plan search needs trainable feature rows")

    rows: list[dict[str, Any]] = []
    for symbol, group in replay.groupby("symbol", sort=False, observed=False):
        # Map each feature minute of this symbol to its split id.
        symbol_features = trainable[trainable["symbol"].eq(symbol)]
        unique_minutes = symbol_features.drop_duplicates("open_time_ms")
        feature_split_by_ms = unique_minutes.set_index("open_time_ms")["split_id"].to_dict()
        if not feature_split_by_ms:
            continue
        ordered = group.sort_values("event_time").reset_index(drop=True)
        rows.extend(
            _symbol_plan_rows(
                symbol,
                ordered,
                feature_split_by_ms,
                cost_bps,
                min_expected_edge_bps,
                args.horizons or DEFAULT_HORIZONS,
                args.targets or DEFAULT_TARGETS,
                args.stops or DEFAULT_STOPS,
            )
        )

    result = pd.DataFrame(rows)
    if result.empty:
        raise ValueError("price plan search produced no candidate rows")

    summary = _plan_summary(result)
    best = _select_best_plan(summary)
    payload = {
        "run_id": args.run_id,
        "cost_bps": cost_bps,
        "min_expected_net_edge_bps": min_expected_edge_bps,
        "entry_label_method": ENTRY_LABEL_METHOD,
        "candidate_count": int(summary["plan_id"].nunique()),
        "best_plan": best,
    }

    out_dir = root / "price-plan-search"
    write_json(out_dir / "price_plan_search_result.json", _jsonable(payload))
    write_text(out_dir / "price_plan_search_rows.csv", result.to_csv(index=False))
    write_text(out_dir / "price_plan_search_summary.csv", summary.to_csv(index=False))
    write_text(out_dir / "price_plan_search_report.md", _markdown_report(payload, summary))
    logging.info(
        "trader.training.price_plan_searched runId=%s candidateCount=%s bestPlan=%s bestScore=%.6f",
        args.run_id,
        payload["candidate_count"],
        best["plan_id"],
        best["score"],
    )


def _symbol_plan_rows(
    symbol: str,
    replay: pd.DataFrame,
    feature_split_by_ms: dict[int, str],
    cost_bps: float,
    min_expected_edge_bps: float,
    horizons: tuple[int, ...],
    targets: tuple[float, ...],
    stops: tuple[float, ...],
) -> list[dict[str, Any]]:
    """Build the per-split plan rows of one symbol for every grid combination.

    For each horizon, forward windows covering bars ``t+1 .. t+horizon`` are
    taken from ``replay``. A start bar is usable only when those bars are
    spaced exactly 60,000 ms apart and its ``open_time_ms`` is a key of
    ``feature_split_by_ms``. Plans whose ``target_bps - cost_bps`` is below
    ``min_expected_edge_bps`` are skipped. Both sides are evaluated, LONG
    first.
    """
    window_view = np.lib.stride_tricks.sliding_window_view
    close = replay["close"].astype("float64").to_numpy()
    high = replay["high"].astype("float64").to_numpy()
    low = replay["low"].astype("float64").to_numpy()
    open_time_ms = replay["open_time_ms"].astype("int64").to_numpy()

    collected: list[dict[str, Any]] = []
    for horizon in horizons:
        if len(replay) <= horizon:
            continue
        span = horizon + 1
        # Drop column 0 (the entry bar itself); keep only the future bars.
        high_window = window_view(high, span)[:, 1:]
        low_window = window_view(low, span)[:, 1:]
        time_window = window_view(open_time_ms, span)[:, 1:]
        window_count = len(high_window)
        entry_price = close[:window_count]
        exit_price = close[horizon:]
        current_ms = open_time_ms[:window_count]

        minute_offsets = np.arange(1, horizon + 1, dtype=np.int64).reshape(1, -1) * 60_000
        expected_times = current_ms.reshape(-1, 1) + minute_offsets
        contiguous = np.all(time_window == expected_times, axis=1)
        split_values = pd.Series(current_ms).map(feature_split_by_ms).to_numpy()
        usable = contiguous & pd.notna(split_values)
        if not usable.any():
            continue

        for target_bps, stop_bps in itertools.product(targets, stops):
            if target_bps - cost_bps < min_expected_edge_bps:
                continue
            for side in ("LONG", "SHORT"):
                collected.extend(
                    _plan_side_rows(
                        symbol,
                        horizon,
                        target_bps,
                        stop_bps,
                        side,
                        entry_price,
                        exit_price,
                        high_window,
                        low_window,
                        split_values,
                        usable,
                        cost_bps,
                        min_expected_edge_bps,
                    )
                )
    return collected


def _plan_side_rows(
    symbol: str,
    horizon: int,
    target_bps: float,
    stop_bps: float,
    side: str,
    entry_price: np.ndarray,
    exit_price: np.ndarray,
    high_window: np.ndarray,
    low_window: np.ndarray,
    split_values: np.ndarray,
    usable: np.ndarray,
    cost_bps: float,
    min_expected_edge_bps: float,
) -> list[dict[str, Any]]:
    """Evaluate one (horizon, target, stop, side) plan; return one row per split.

    ``side == "LONG"`` selects the long branch; any other value is treated as
    a short. When target and stop are first touched on the same bar the stop
    is taken (and the row counts towards ``ambiguous_rate``). Splits with no
    usable rows are omitted from the result.
    """
    entry_column = entry_price.reshape(-1, 1)
    if side == "LONG":
        target_price = entry_column * (1.0 + target_bps / 10000.0)
        stop_price = entry_column * (1.0 - stop_bps / 10000.0)
        target_matrix = high_window >= target_price
        stop_matrix = low_window <= stop_price
        timeout_return = (exit_price / entry_price - 1.0) * 10000.0
        max_achievable_gross = (np.nanmax(high_window, axis=1) / entry_price - 1.0) * 10000.0
    else:
        target_price = entry_column * (1.0 - target_bps / 10000.0)
        stop_price = entry_column * (1.0 + stop_bps / 10000.0)
        target_matrix = low_window <= target_price
        stop_matrix = high_window >= stop_price
        timeout_return = (entry_price / exit_price - 1.0) * 10000.0
        max_achievable_gross = (entry_price / np.nanmin(low_window, axis=1) - 1.0) * 10000.0

    # Index of the first touching bar; rows that never touch get an index past
    # the end of the window so they lose every "who came first" comparison.
    never_hit = target_matrix.shape[1] + 1
    target_any = target_matrix.any(axis=1)
    stop_any = stop_matrix.any(axis=1)
    target_index = np.where(target_any, target_matrix.argmax(axis=1), never_hit)
    stop_index = np.where(stop_any, stop_matrix.argmax(axis=1), never_hit)
    target_first = target_any & (~stop_any | (target_index < stop_index))
    stop_first = stop_any & (~target_any | (stop_index <= target_index))
    timeout = ~(target_first | stop_first)

    gross = np.where(target_first, target_bps, np.where(stop_first, -stop_bps, timeout_return))
    price_plan_net = gross - cost_bps
    expected_net = max_achievable_gross - cost_bps
    positive = expected_net >= min_expected_edge_bps
    ambiguous = target_any & stop_any & (target_index == stop_index)

    plan_id = f"h{horizon}_t{target_bps:g}_s{stop_bps:g}"
    split_rows: list[dict[str, Any]] = []
    for split_id in EVAL_SPLITS:
        mask = usable & (split_values == split_id)
        if not mask.any():
            continue
        edge_values = expected_net[mask]
        plan_edge_values = price_plan_net[mask]
        target_rate = float(target_first[mask].mean())
        stop_rate = float(stop_first[mask].mean())
        timeout_rate = float(timeout[mask].mean())
        # Target hit rate at which target wins and stop losses net to zero
        # after cost.
        required_rate = (stop_bps + cost_bps) / (target_bps + stop_bps)
        split_rows.append(
            {
                "plan_id": plan_id,
                "symbol": symbol,
                "split_id": split_id,
                "side": side,
                "horizon_minutes": horizon,
                "target_bps": target_bps,
                "stop_bps": stop_bps,
                "cost_bps": cost_bps,
                "positive_net_bps": target_bps - cost_bps,
                "stop_net_bps": -stop_bps - cost_bps,
                "rows": int(mask.sum()),
                "target_hit_rate": target_rate,
                "stop_hit_rate": stop_rate,
                "timeout_rate": timeout_rate,
                "ambiguous_rate": float(ambiguous[mask].mean()),
                "positive_label_rate": float(positive[mask].mean()),
                "avg_expected_net_edge_bps": float(edge_values.mean()),
                "median_expected_net_edge_bps": float(np.median(edge_values)),
                "p95_expected_net_edge_bps": float(np.quantile(edge_values, 0.95)),
                "avg_price_plan_net_edge_bps": float(plan_edge_values.mean()),
                "required_target_hit_rate": float(required_rate),
                "target_rate_margin": float(target_rate - required_rate),
            }
        )
    return split_rows
+ positive_rate_penalty = ( + (0.08 - split_rows["min_positive_label_rate_eval"]).clip(lower=0.0) * 80.0 + + (split_rows["max_positive_label_rate_eval"] - 0.45).clip(lower=0.0) * 30.0 + ) + spread_bonus = np.log1p((split_rows["target_bps"] - split_rows["stop_bps"]).clip(lower=0.0)) + split_rows["score"] = ( + split_rows["avg_edge_eval"] + + split_rows["avg_price_plan_edge_eval"] * 0.5 + + split_rows["min_margin_eval"] * 20.0 + - positive_rate_penalty + + spread_bonus + ) + return split_rows.sort_values("score", ascending=False).reset_index(drop=True) + + +def _select_best_plan(summary: pd.DataFrame) -> dict[str, Any]: + candidates = summary[ + (summary["min_positive_label_rate_eval"] >= 0.08) + & (summary["max_positive_label_rate_eval"] <= 0.45) + & (summary["target_bps"] > summary["stop_bps"]) + ] + if candidates.empty: + candidates = summary[summary["target_bps"] > summary["stop_bps"]] + if candidates.empty: + candidates = summary + row = candidates.sort_values("score", ascending=False).iloc[0] + return { + "plan_id": str(row["plan_id"]), + "horizon_minutes": int(row["horizon_minutes"]), + "target_bps": float(row["target_bps"]), + "stop_bps": float(row["stop_bps"]), + "side": str(row["side"]), + "score": float(row["score"]), + "avg_edge_eval": float(row["avg_edge_eval"]), + "avg_price_plan_edge_eval": float(row["avg_price_plan_edge_eval"]), + "min_margin_eval": float(row["min_margin_eval"]), + "min_positive_label_rate_eval": float(row["min_positive_label_rate_eval"]), + "max_positive_label_rate_eval": float(row["max_positive_label_rate_eval"]), + } + + +def _markdown_report(payload: dict[str, Any], summary: pd.DataFrame) -> str: + top = summary.head(20) + lines = [ + "# Price Plan Search Report", + "", + f"- run_id: `{payload['run_id']}`", + f"- cost_bps: {payload['cost_bps']}", + f"- min_expected_net_edge_bps: {payload['min_expected_net_edge_bps']}", + f"- entry_label_method: `{payload['entry_label_method']}`", + f"- candidate_count: 
{payload['candidate_count']}", + "", + "## Best Plan For Next Experiment", + "", + "```json", + str(payload["best_plan"]).replace("'", '"'), + "```", + "", + "## Top Plans", + "", + _markdown_table(top), + "", + "说明:positive_label_rate 和 avg_expected_net_edge_bps 按“未来窗口最大可拿净收益”统计;target_hit_rate、stop_hit_rate、avg_price_plan_net_edge_bps 只用来检查固定止盈止损计划是否顺手。这里选的是下一轮实验用的价格计划,不是上线结论。真正能不能上线仍然看模型训练、PM 搜索、validation_locked 和 latest_stress 回测。", + "", + ] + return "\n".join(lines) + + +def _markdown_table(frame: pd.DataFrame) -> str: + if frame.empty: + return "无数据。" + columns = list(frame.columns) + lines = ["| " + " | ".join(columns) + " |", "| " + " | ".join("---" for _ in columns) + " |"] + for row in frame.to_dict("records"): + values = [] + for column in columns: + value = row.get(column, "") + if isinstance(value, float): + value = round(value, 6) + values.append(str(value)) + lines.append("| " + " | ".join(values) + " |") + return "\n".join(lines) + + +def _jsonable(value: Any) -> Any: + if isinstance(value, dict): + return {str(key): _jsonable(item) for key, item in value.items()} + if isinstance(value, list): + return [_jsonable(item) for item in value] + if isinstance(value, tuple): + return [_jsonable(item) for item in value] + if isinstance(value, (np.integer,)): + return int(value) + if isinstance(value, (np.floating,)): + return float(value) + return value diff --git a/training/trader_training/replay.py b/training/trader_training/replay.py index 537b4d4..2877314 100644 --- a/training/trader_training/replay.py +++ b/training/trader_training/replay.py @@ -275,107 +275,161 @@ def _asof_column( return merged -def build_replay_1m(args: Any) -> None: - root = run_root(args) - raw_root = args.raw_root or DEFAULT_RAW_ROOT - logging.info("trader.training.replay_started runId=%s symbol=%s rawRoot=%s", args.run_id, args.symbol, raw_root) - replay = _read_candles(raw_root, args.symbol, args.start_date, args.end_date) - trades = _read_trades(raw_root, args.symbol, 
args.start_date, args.end_date) - level1 = _read_level1(raw_root, args.symbol, args.start_date, args.end_date) - liquidations = _read_liquidations(raw_root, args.symbol, args.start_date, args.end_date) +REPLAY_REQUIRED_COLUMNS = [ + "open", + "high", + "low", + "close", + "volume", + "best_bid_price", + "best_ask_price", + "spread_bps", + "level1_ofi_1m", + "funding_bps", + "mark_price", + "index_price", + "open_interest", +] + +REPLAY_OUTPUT_COLUMNS = [ + "symbol", + "timeframe", + "event_time", + "open_time_ms", + "open", + "high", + "low", + "close", + "volume", + "taker_buy_volume", + "taker_sell_volume", + "funding_bps", + "mark_price", + "index_price", + "next_funding_time", + "open_interest", + "best_bid_price", + "best_ask_price", + "spread_bps", + "level1_ofi_1m", + "liquidation_buy_notional_1m", + "liquidation_sell_notional_1m", + "liquidation_available", + "source_coverage", +] + + +def _replay_date_texts(raw_root: Path, symbol: str, start_date: str | None, end_date: str | None) -> list[str]: + if start_date and end_date: + return [day.strftime("%Y-%m-%d") for day in pd.date_range(pd.Timestamp(start_date), pd.Timestamp(end_date), freq="D")] + files = partition_files(raw_root, "candles", symbol, start_date, end_date) + dates = sorted({next((part.split("=", 1)[1] for part in file.parts if part.startswith("dt=")), "") for file in files}) + return [date for date in dates if date] + + +def _previous_date_text(day: str) -> str: + return (pd.Timestamp(day) - pd.Timedelta(days=1)).strftime("%Y-%m-%d") + + +def _build_replay_day(raw_root: Path, symbol: str, day: str) -> pd.DataFrame: + replay = _read_candles(raw_root, symbol, day, day) + replay = replay[replay["event_time"].dt.strftime("%Y-%m-%d").eq(day)].copy() + trades = _read_trades(raw_root, symbol, day, day) + level1 = _read_level1(raw_root, symbol, day, day) + liquidations = _read_liquidations(raw_root, symbol, day, day) replay = replay.merge(trades, on=["symbol", "event_time", "open_time_ms"], how="left") 
replay = replay.merge(level1, on=["symbol", "event_time", "open_time_ms"], how="left") replay = replay.merge(liquidations, on=["symbol", "event_time", "open_time_ms"], how="left") replay[["taker_buy_volume", "taker_sell_volume"]] = replay[["taker_buy_volume", "taker_sell_volume"]].fillna(0.0) for column in ("liquidation_buy_notional_1m", "liquidation_sell_notional_1m", "liquidation_available"): - replay[column] = replay[column].fillna(0.0) + replay[column] = pd.to_numeric(replay[column], errors="coerce").fillna(0.0) - funding = _asof_column(replay, raw_root, "funding", args.symbol, args.start_date, args.end_date, ("rate", "mark_price", "index_price", "next_funding_time")) + # Funding and open interest are as-of values. Include the previous UTC day so + # the first minutes of a day can use the last known value without reading the + # whole training window into memory. + lookback_start = _previous_date_text(day) + funding = _asof_column(replay, raw_root, "funding", symbol, lookback_start, day, ("rate", "mark_price", "index_price", "next_funding_time")) funding = funding.rename(columns={"rate": "funding_rate"}) funding["funding_bps"] = pd.to_numeric(funding["funding_rate"], errors="coerce") * 10000.0 replay = replay.merge(funding.drop(columns=["funding_rate"]), on=["symbol", "event_time"], how="left") replay["next_funding_time"] = to_utc_series(replay["next_funding_time"]) - oi = _asof_column(replay, raw_root, "open_interest", args.symbol, args.start_date, args.end_date, ("open_interest",)) + oi = _asof_column(replay, raw_root, "open_interest", symbol, lookback_start, day, ("open_interest",)) replay = replay.merge(oi, on=["symbol", "event_time"], how="left") replay["timeframe"] = "1m" replay["source_coverage"] = "crypto_lake_raw" - - required = [ - "open", - "high", - "low", - "close", - "volume", - "best_bid_price", - "best_ask_price", - "spread_bps", - "level1_ofi_1m", - "funding_bps", - "mark_price", - "index_price", - "open_interest", - ] replay["event_date"] = 
replay["event_time"].dt.strftime("%Y-%m-%d") - missing_required = replay[required].isna().any(axis=1) - day_quality = ( - replay.assign(missing_required=missing_required.astype(int)) - .groupby("event_date", as_index=False, observed=True) - .agg(row_count=("event_time", "count"), missing_required_rows=("missing_required", "sum")) - ) - day_quality["ready"] = (day_quality["row_count"] >= int(args.min_minutes_per_day)) & day_quality["missing_required_rows"].eq(0) - ready_days = sorted(day_quality.loc[day_quality["ready"], "event_date"].astype(str).tolist()) - excluded_days = [ - { - "date": row.event_date, - "row_count": int(row.row_count), - "missing_required_rows": int(row.missing_required_rows), - "reason": "MISSING_REQUIRED_MARKET_FIELDS" if int(row.missing_required_rows) else "INCOMPLETE_MINUTE_COUNT", - } - for row in day_quality.loc[~day_quality["ready"]].itertuples(index=False) - ] + return replay + + +def build_replay_1m(args: Any) -> None: + root = run_root(args) + raw_root = args.raw_root or DEFAULT_RAW_ROOT + logging.info("trader.training.replay_started runId=%s symbol=%s rawRoot=%s", args.run_id, args.symbol, raw_root) + dates = _replay_date_texts(raw_root, args.symbol, args.start_date, args.end_date) + if not dates: + raise ValueError("no candle dates are available for replay_1m") + + ready_days: list[str] = [] + excluded_days: list[dict[str, Any]] = [] + ready_frames: list[pd.DataFrame] = [] + row_before_filter = 0 + for index, day in enumerate(dates, start=1): + logging.info("trader.training.replay_day_started runId=%s day=%s index=%s total=%s", args.run_id, day, index, len(dates)) + try: + day_replay = _build_replay_day(raw_root, args.symbol, day) + except Exception as exc: + excluded_days.append( + { + "date": day, + "row_count": 0, + "missing_required_rows": 0, + "reason": "DAY_BUILD_FAILED", + "error": str(exc), + } + ) + logging.warning("trader.training.replay_day_failed runId=%s day=%s error=%s", args.run_id, day, exc) + continue + + row_count = 
len(day_replay) + row_before_filter += row_count + missing_required_rows = int(day_replay[REPLAY_REQUIRED_COLUMNS].isna().any(axis=1).sum()) + ready = row_count >= int(args.min_minutes_per_day) and missing_required_rows == 0 + if ready: + ready_days.append(day) + ready_frames.append(day_replay[REPLAY_OUTPUT_COLUMNS].copy()) + else: + excluded_days.append( + { + "date": day, + "row_count": int(row_count), + "missing_required_rows": missing_required_rows, + "reason": "MISSING_REQUIRED_MARKET_FIELDS" if missing_required_rows else "INCOMPLETE_MINUTE_COUNT", + } + ) + logging.info( + "trader.training.replay_day_finished runId=%s day=%s ready=%s rows=%s missingRequiredRows=%s", + args.run_id, + day, + ready, + row_count, + missing_required_rows, + ) + if len(ready_days) < int(args.min_replay_ready_days): write_json(root / "replay" / "excluded_days.json", excluded_days) write_text(root / "replay" / "replay_ready_days.txt", "\n".join(ready_days) + ("\n" if ready_days else "")) raise ValueError(f"replay_1m has only {len(ready_days)} replay-ready days, required {args.min_replay_ready_days}") - before_filter = len(replay) - replay = replay[replay["event_date"].isin(ready_days)].copy() + replay = pd.concat(ready_frames, ignore_index=True) logging.info( "trader.training.replay_ready_days_selected runId=%s readyDays=%s excludedDays=%s rowBefore=%s rowAfter=%s", args.run_id, len(ready_days), len(excluded_days), - before_filter, + row_before_filter, len(replay), ) - - columns = [ - "symbol", - "timeframe", - "event_time", - "open_time_ms", - "open", - "high", - "low", - "close", - "volume", - "taker_buy_volume", - "taker_sell_volume", - "funding_bps", - "mark_price", - "index_price", - "next_funding_time", - "open_interest", - "best_bid_price", - "best_ask_price", - "spread_bps", - "level1_ofi_1m", - "liquidation_buy_notional_1m", - "liquidation_sell_notional_1m", - "liquidation_available", - "source_coverage", - ] - replay = replay[columns].sort_values(["symbol", 
"event_time"]).reset_index(drop=True) + replay = replay[REPLAY_OUTPUT_COLUMNS].sort_values(["symbol", "event_time"]).reset_index(drop=True) path = root / "replay" / "replay_1m.parquet" data_hash = write_parquet(path, replay) write_json( diff --git a/training/trader_training/schemas.py b/training/trader_training/schemas.py index 8a7ff2c..14bfaaa 100644 --- a/training/trader_training/schemas.py +++ b/training/trader_training/schemas.py @@ -4,8 +4,8 @@ from dataclasses import dataclass from typing import Any -FEATURE_VERSION = "feature-v4-p0" -LABEL_VERSION = "label-v4-p0" +FEATURE_VERSION = "feature-v4-p2-book-cross" +LABEL_VERSION = "label-v4-p1-max-edge" SPLIT_VERSION = "split-v4-p0" MODEL_BUNDLE_VERSION = "trader-v4-btc-p0" CALIBRATION_BUNDLE_VERSION = "cal-v4-btc-p0" @@ -96,6 +96,21 @@ FEATURES: tuple[FeatureDef, ...] = ( FeatureDef(37, "minute_of_day_sin", "日内时间正弦", "Time of day cyclic feature.", ("event_time",), "sin(2*pi*minute_of_day/1440)", "event_time", "ratio", "float32", "never null", True, "event timestamp only", ("Direction", "Entry", "Risk")), FeatureDef(38, "minute_of_day_cos", "日内时间余弦", "Time of day cyclic feature.", ("event_time",), "cos(2*pi*minute_of_day/1440)", "event_time", "ratio", "float32", "never null", True, "event timestamp only", ("Direction", "Entry", "Risk")), FeatureDef(39, "minutes_to_next_funding", "距离下次资金费分钟", "Minutes to next funding settlement.", ("funding", "replay_1m"), "clip((next_funding_time - event_time) / 60000, 0, 480)", "as-of", "minute", "float32", "as-of > 12h -> fail", True, "backward as-of only", ("Entry", "Continue", "Risk")), + FeatureDef(40, "book_top_imbalance", "盘口一档强弱", "Top bid/ask size imbalance.", ("book",), "(bid_0_size - ask_0_size) / max(bid_0_size + ask_0_size, eps)", "last book snapshot in 1m", "ratio", "float32", "missing book -> WARMUP", True, "uses last book snapshot within current closed minute only", ("Direction", "Entry", "Risk")), + FeatureDef(41, "book_microprice_basis_bps", "微价格偏离", "Microprice 
distance from mid price.", ("book",), "((bid_0_price * ask_0_size + ask_0_price * bid_0_size) / max(bid_0_size + ask_0_size, eps) / mid - 1) * 10000", "last book snapshot in 1m", "bps", "float32", "missing book -> WARMUP", True, "uses last book snapshot within current closed minute only", ("Direction", "Entry", "Risk")), + FeatureDef(42, "book_bid_depth_l5_quote", "前5档买盘深度", "Bid notional in levels 0..4.", ("book",), "sum(bid_i_price * bid_i_size, i=0..4)", "last book snapshot in 1m", "quote", "float32", "missing book -> WARMUP", True, "uses last book snapshot within current closed minute only", ("Entry", "Risk")), + FeatureDef(43, "book_ask_depth_l5_quote", "前5档卖盘深度", "Ask notional in levels 0..4.", ("book",), "sum(ask_i_price * ask_i_size, i=0..4)", "last book snapshot in 1m", "quote", "float32", "missing book -> WARMUP", True, "uses last book snapshot within current closed minute only", ("Entry", "Risk")), + FeatureDef(44, "book_depth_imbalance_l5", "前5档盘口深度差", "Bid/ask notional imbalance in levels 0..4.", ("book",), "(bid_depth_l5 - ask_depth_l5) / max(bid_depth_l5 + ask_depth_l5, eps)", "last book snapshot in 1m", "ratio", "float32", "missing book -> WARMUP", True, "uses last book snapshot within current closed minute only", ("Direction", "Entry", "Risk")), + FeatureDef(45, "book_depth_imbalance_l20", "前20档盘口深度差", "Bid/ask notional imbalance in levels 0..19.", ("book",), "(bid_depth_l20 - ask_depth_l20) / max(bid_depth_l20 + ask_depth_l20, eps)", "last book snapshot in 1m", "ratio", "float32", "missing book -> WARMUP", True, "uses last book snapshot within current closed minute only", ("Direction", "Entry", "Risk")), + FeatureDef(46, "book_depth_concentration_l5_l20", "前5档深度集中度", "How much total book notional is concentrated in top five levels.", ("book",), "(bid_depth_l5 + ask_depth_l5) / max(bid_depth_l20 + ask_depth_l20, eps)", "last book snapshot in 1m", "ratio", "float32", "missing book -> WARMUP", True, "uses last book snapshot within current closed 
minute only", ("Entry", "Risk")), + FeatureDef(47, "book_pressure_spread_ratio", "盘口压力相对价差", "Microprice pressure normalized by current spread.", ("book", "level_1"), "book_microprice_basis_bps / max(abs(spread_bps), 0.01)", "1m", "ratio", "float32", "missing dependency -> WARMUP", True, "uses <= t book and spread only", ("Direction", "Entry", "Risk")), + FeatureDef(48, "book_pressure_taker_1m", "盘口压力和1分钟主动成交共振", "Microprice pressure confirmed by current taker imbalance.", ("book", "trades"), "book_microprice_basis_bps * taker_imbalance_1m", "1m", "bps", "float32", "missing dependency -> WARMUP", True, "uses <= t book and trades only", ("Direction", "Entry")), + FeatureDef(49, "book_pressure_taker_5m", "盘口压力和5分钟主动成交共振", "Microprice pressure confirmed by short taker imbalance.", ("book", "trades"), "book_microprice_basis_bps * taker_imbalance_5m", "5m", "bps", "float32", "missing dependency -> WARMUP", True, "uses <= t book and trades only", ("Direction", "Entry")), + FeatureDef(50, "book_l20_imbalance_taker_15m", "20档深度和15分钟主动成交共振", "Deep book imbalance confirmed by near taker flow.", ("book", "trades"), "book_depth_imbalance_l20 * taker_imbalance_15m", "15m", "ratio", "float32", "missing dependency -> WARMUP", True, "uses <= t book and trades only", ("Direction", "Entry")), + FeatureDef(51, "book_l20_imbalance_ret_15m", "20档深度和15分钟走势关系", "Deep book pressure interacting with near return.", ("book", "replay_1m"), "book_depth_imbalance_l20 * ret_15m_bps", "15m", "bps", "float32", "missing dependency -> WARMUP", True, "uses <= t book and returns only", ("Direction", "Entry")), + FeatureDef(52, "book_pressure_vol_adjusted", "波动调整后的盘口压力", "Microprice pressure divided by near realized volatility.", ("book", "replay_1m"), "book_microprice_basis_bps / max(realized_vol_15m_bps, 1)", "15m", "ratio", "float32", "missing dependency -> WARMUP", True, "uses <= t book and volatility only", ("Direction", "Entry", "Risk")), + FeatureDef(53, "book_depth_pressure_gap", "近档和深档压力差", 
def _regression_metrics(y_train: np.ndarray, y_val: np.ndarray, pred: np.ndarray) -> dict[str, Any]:
    """Compute regression metrics on the validation targets plus a constant baseline.

    The baseline predicts the training-target median for every validation row
    (0.0 when ``y_train`` is empty). The two ratio entries are ``None`` when
    their denominator is not positive. The metrics dict is passed through
    ``_with_quality`` before being returned.
    """
    mae = float(mean_absolute_error(y_val, pred))
    train_std = float(np.std(y_train))

    if len(y_train):
        train_median = float(np.median(y_train))
    else:
        train_median = 0.0

    if len(y_val):
        baseline_pred = np.full(len(y_val), train_median)
        constant_mae = float(mean_absolute_error(y_val, baseline_pred))
    else:
        constant_mae = 0.0

    def _mae_ratio(denominator: float) -> float | None:
        # Ratio of the model MAE to a reference scale; undefined when it is <= 0.
        return float(mae / denominator) if denominator > 0 else None

    metrics: dict[str, Any] = {
        "mae": mae,
        "constant_mae": constant_mae,
        "train_target_median": train_median,
        "train_target_std": train_std,
        "mae_vs_train_std_ratio": _mae_ratio(train_std),
        "mae_vs_constant_ratio": _mae_ratio(constant_mae),
    }
    return _with_quality(metrics)
keepdims=True) +def _sigmoid(values: np.ndarray) -> np.ndarray: + clipped = np.clip(values, -50.0, 50.0) + return 1.0 / (1.0 + np.exp(-clipped)) + + def _write_training_report(path: Path, model_name: str, metrics: dict[str, Any], quality_status: str, quality_reasons: list[str]) -> None: lines = [ "# Trader Model Training Report", diff --git a/training/trader_training/validator.py b/training/trader_training/validator.py index 2ddeb1b..fd9bf0a 100644 --- a/training/trader_training/validator.py +++ b/training/trader_training/validator.py @@ -52,7 +52,7 @@ def validate_artifact_bundle(args: Any) -> None: def _validate_content(root: Path, errors: list[str], require_active: bool, run_onnx: bool) -> None: feature_order = read_json(root / "schemas/feature_order.json") if feature_order != FEATURE_ORDER: - errors.append("feature_order.json does not match V4 39-feature order") + errors.append(f"feature_order.json does not match V4 {len(FEATURE_ORDER)}-feature order") model_bundle = read_json(root / "manifests/model_bundle_manifest.json") if require_active and model_bundle.get("status") != "ACTIVE": errors.append("model_bundle_manifest.status must be ACTIVE for Java SHADOW") @@ -70,8 +70,8 @@ def _validate_content(root: Path, errors: list[str], require_active: bool, run_o errors.append(f"{model_type} model_format must be ONNX") if item.get("input_tensor_name") != "features": errors.append(f"{model_type} input tensor must be features") - if item.get("input_shape_json", {}).get("features") != 39: - errors.append(f"{model_type} input_shape_json.features must be 39") + if item.get("input_shape_json", {}).get("features") != len(FEATURE_ORDER): + errors.append(f"{model_type} input_shape_json.features must be {len(FEATURE_ORDER)}") if item.get("onnx_opset_version") != 17: errors.append(f"{model_type} opset must be 17") if item.get("output_mapping_json") != OUTPUT_MAPPING.get(model_type):