From 0323fb3caf6972d2e61839fce03d0269cb95eca7 Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 28 Jun 2026 08:33:49 +0800 Subject: [PATCH] Add conditional Entry training probe --- .../27_probe_conditional_entry_training.py | 34 ++ training/tests/test_training_contract.py | 45 +++ .../conditional_entry_probe.py | 337 ++++++++++++++++++ 3 files changed, 416 insertions(+) create mode 100644 training/scripts/27_probe_conditional_entry_training.py create mode 100644 training/trader_training/conditional_entry_probe.py diff --git a/training/scripts/27_probe_conditional_entry_training.py b/training/scripts/27_probe_conditional_entry_training.py new file mode 100644 index 0000000..cb6d0e0 --- /dev/null +++ b/training/scripts/27_probe_conditional_entry_training.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +import argparse + +import _bootstrap # noqa: F401 +from trader_training.conditional_entry_probe import probe_conditional_entry_training +from trader_training.io_utils import add_common_args, setup_logging + + +def _float_tuple(value: str) -> tuple[float, ...]: + return tuple(float(item.strip()) for item in value.split(",") if item.strip()) + + +def _str_tuple(value: str) -> tuple[str, ...]: + return tuple(item.strip() for item in value.split(",") if item.strip()) + + +def main() -> None: + parser = argparse.ArgumentParser() + add_common_args(parser) + parser.add_argument("--condition-opportunity-bps", type=_float_tuple, default=(6.0, 12.0, 20.0, 40.0, 60.0)) + parser.add_argument("--target-edge-bps", type=_float_tuple, default=(0.0, 3.0)) + parser.add_argument("--model-families", type=_str_tuple, default=("linear", "tree")) + parser.add_argument("--top-fractions", type=_float_tuple, default=(0.01, 0.02, 0.05, 0.10)) + parser.add_argument("--max-train-rows", type=int, default=0) + parser.add_argument("--min-train-rows", type=int, default=1000) + parser.add_argument("--min-eval-rows", type=int, default=500) + args = parser.parse_args() + setup_logging() + probe_conditional_entry_training(args) + + +if __name__ == "__main__": + main() diff --git a/training/tests/test_training_contract.py b/training/tests/test_training_contract.py index 37bfc55..41b3659 100644 --- a/training/tests/test_training_contract.py +++ b/training/tests/test_training_contract.py @@ -14,6 +14,7 @@ if str(TRAINING_ROOT) not in sys.path: sys.path.insert(0, str(TRAINING_ROOT)) from trader_training.onnx_export import LinearHead, export_heads +from trader_training.conditional_entry_probe import probe_conditional_entry_training from trader_training.dynamic_exit_search import search_dynamic_exit_plans from trader_training.entry_condition_pair_screen import screen_entry_condition_pairs from trader_training.entry_feature_screen import _bucket_edges, _screen_edge_column @@ -163,6 +164,50 @@ class TrainingContractTest(unittest.TestCase): self.assertEqual("LONG", best["side"]) self.assertTrue(bool(best["stable_top_edge_positive"])) + def test_conditional_entry_probe_finds_positive_oracle_direction_subset(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + data_root = Path(tmp) + run_root = data_root / "trader-v4" / "runs" / "unit-conditional-entry" + dataset_path = run_root / "dataset" / "entry_train.parquet" + dataset_path.parent.mkdir(parents=True) + + frames = [] + row_count = 900 + base_feature_values = np.linspace(0.0, 0.999, row_count) + for split_id in TRAINING_SPLITS: + frame = pd.DataFrame({feature: 0.0 for feature in FEATURE_ORDER}, index=np.arange(row_count)) + frame["split_id"] = split_id + frame["ret_1m_bps"] = base_feature_values + good_mask = frame["ret_1m_bps"] > 0.85 + opportunity_mask = frame["ret_1m_bps"] > 0.50 + frame["long_actual_plan_net_edge_bps"] = np.where(good_mask, 10.0, -6.0) + frame["short_actual_plan_net_edge_bps"] = -6.0 + frame["long_max_achievable_net_edge_bps"] = np.where(opportunity_mask, 40.0, 2.0) + frame["short_max_achievable_net_edge_bps"] = 2.0 + frames.append(frame) + pd.concat(frames, ignore_index=True).to_parquet(dataset_path, index=False) + + probe_conditional_entry_training( + Namespace( + data_root=data_root, + run_id="unit-conditional-entry", + condition_opportunity_bps=(20.0,), + target_edge_bps=(0.0,), + model_families=("linear",), + top_fractions=(0.10,), + max_train_rows=0, + min_train_rows=50, + min_eval_rows=50, + ) + ) + + result = read_json(run_root / "diagnostics" / "conditional_entry_probe_result.json") + candidates = pd.read_csv(run_root / "diagnostics" / "conditional_entry_probe_candidates.csv") + + self.assertGreater(result["stable_positive_count"], 0) + self.assertTrue(candidates.iloc[0]["stable_positive"]) + self.assertGreater(float(candidates.iloc[0]["min_top_edge_bps"]), 0.0) + def test_dynamic_exit_search_writes_plan_diagnostics(self) -> None: with tempfile.TemporaryDirectory() as tmp: data_root = Path(tmp) diff --git a/training/trader_training/conditional_entry_probe.py b/training/trader_training/conditional_entry_probe.py new file mode 100644 index 0000000..66be4f8 --- /dev/null +++ b/training/trader_training/conditional_entry_probe.py @@ -0,0 +1,337 @@ +from __future__ import annotations + +import logging +from typing import Any + +import numpy as np +import pandas as pd +from sklearn.ensemble import HistGradientBoostingClassifier +from sklearn.linear_model import LogisticRegression +from sklearn.metrics import brier_score_loss, roc_auc_score +from sklearn.preprocessing import StandardScaler + +from trader_training.entry_feature_screen import _markdown_table +from trader_training.io_utils import read_parquet, run_root, write_json, write_text +from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT + + +EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) + + +def probe_conditional_entry_training(args: Any) -> None: + root = run_root(args) + dataset = read_parquet(root / "dataset" / "entry_train.parquet") + _require_columns(dataset) + + condition_opportunities = tuple(float(item) for item in (args.condition_opportunity_bps or (6.0, 12.0, 20.0, 40.0, 60.0))) + target_edges = tuple(float(item) for item in (args.target_edge_bps or (0.0, 3.0))) + model_families = tuple(str(item).strip().lower() for item in (args.model_families or ("linear", "tree")) if str(item).strip()) + top_fractions = tuple(float(item) for item in (args.top_fractions or (0.01, 0.02, 0.05, 0.10))) + max_train_rows = int(args.max_train_rows or 0) + + rows: list[dict[str, Any]] = [] + skipped: list[dict[str, Any]] = [] + for side in ("LONG", "SHORT"): + prefix = side.lower() + actual_edge_col = f"{prefix}_actual_plan_net_edge_bps" + opportunity_col = f"{prefix}_max_achievable_net_edge_bps" + for condition_opportunity_bps in condition_opportunities: + fit_condition = dataset["split_id"].eq(FIT_SPLIT) & (pd.to_numeric(dataset[opportunity_col], errors="coerce") >= condition_opportunity_bps) + fit_frame = dataset.loc[fit_condition].copy() + if max_train_rows > 0 and len(fit_frame) > max_train_rows: + fit_frame = fit_frame.sort_values("event_time").tail(max_train_rows).copy() if "event_time" in fit_frame.columns else fit_frame.tail(max_train_rows).copy() + if len(fit_frame) < int(args.min_train_rows or 1000): + skipped.append( + { + "side": side, + "condition_opportunity_bps": condition_opportunity_bps, + "reason": "NOT_ENOUGH_TRAIN_ROWS", + "train_rows": int(len(fit_frame)), + } + ) + continue + x_train = _x(fit_frame) + for target_edge_bps in target_edges: + y_train = (pd.to_numeric(fit_frame[actual_edge_col], errors="coerce") >= target_edge_bps).astype(int).to_numpy() + if len(np.unique(y_train)) < 2: + skipped.append( + { + "side": side, + "condition_opportunity_bps": condition_opportunity_bps, + "target_edge_bps": target_edge_bps, + "reason": "ONE_CLASS_TRAIN", + "train_rows": int(len(fit_frame)), + "train_positive_rate": float(y_train.mean()) if len(y_train) else 0.0, + } + ) + continue + for model_family in model_families: + model, scaler = _fit_model(model_family, x_train, y_train) + for split_id in EVAL_SPLITS: + eval_condition = dataset["split_id"].eq(split_id) & (pd.to_numeric(dataset[opportunity_col], errors="coerce") >= condition_opportunity_bps) + eval_frame = dataset.loc[eval_condition].copy() + if len(eval_frame) < int(args.min_eval_rows or 500): + continue + y_true = (pd.to_numeric(eval_frame[actual_edge_col], errors="coerce") >= target_edge_bps).astype(int).to_numpy() + proba = _predict(model_family, model, scaler, _x(eval_frame)) + for top_fraction in top_fractions: + rows.append( + _metric_row( + eval_frame, + y_true, + proba, + side, + model_family, + split_id, + condition_opportunity_bps, + target_edge_bps, + top_fraction, + actual_edge_col, + float(y_train.mean()), + len(fit_frame), + ) + ) + logging.info( + "trader.training.conditional_entry_probe_fitted side=%s conditionOpportunityBps=%s targetEdgeBps=%s modelFamily=%s trainRows=%s trainPositiveRate=%.6f", + side, + condition_opportunity_bps, + target_edge_bps, + model_family, + len(fit_frame), + float(y_train.mean()), + ) + + metrics = pd.DataFrame(rows) + candidates = _select_candidates(metrics) + result = { + "run_id": args.run_id, + "purpose": "diagnostic_only_not_exported", + "warning": "condition_opportunity_bps is an oracle future filter; use this only to decide whether conditional Entry training is worth implementing", + "feature_count": len(FEATURE_ORDER), + "condition_opportunity_bps": list(condition_opportunities), + "target_edge_bps": list(target_edges), + "model_families": list(model_families), + "top_fractions": list(top_fractions), + "max_train_rows": max_train_rows, + "metric_count": int(len(metrics)), + "candidate_count": int(len(candidates)), + "stable_positive_count": int(candidates["stable_positive"].sum()) if not candidates.empty else 0, + "skipped": skipped, + } + out_dir = root / "diagnostics" + write_json(out_dir / "conditional_entry_probe_result.json", result) + write_text(out_dir / "conditional_entry_probe_metrics.csv", metrics.to_csv(index=False)) + write_text(out_dir / "conditional_entry_probe_candidates.csv", candidates.to_csv(index=False)) + write_text(out_dir / "conditional_entry_probe_report.md", _markdown_report(result, candidates)) + logging.info( + "trader.training.conditional_entry_probe_written runId=%s metricCount=%s candidateCount=%s stablePositiveCount=%s reportPath=%s", + args.run_id, + len(metrics), + len(candidates), + result["stable_positive_count"], + out_dir / "conditional_entry_probe_report.md", + ) + + +def _require_columns(dataset: pd.DataFrame) -> None: + required = {"split_id", *FEATURE_ORDER} + for side in ("long", "short"): + required.update({f"{side}_actual_plan_net_edge_bps", f"{side}_max_achievable_net_edge_bps"}) + missing = sorted(required.difference(dataset.columns)) + if missing: + raise ValueError(f"conditional entry probe missing required columns: {missing}") + + +def _x(frame: pd.DataFrame) -> np.ndarray: + values = frame[FEATURE_ORDER].apply(pd.to_numeric, errors="coerce").replace([np.inf, -np.inf], np.nan).astype("float32") + if values.isna().any().any(): + missing = values.columns[values.isna().any()].tolist() + raise ValueError(f"conditional entry probe found non-finite feature values: {missing}") + return values.to_numpy(dtype="float32") + + +def _fit_model(model_family: str, x_train: np.ndarray, y_train: np.ndarray) -> tuple[Any, StandardScaler | None]: + if model_family == "linear": + scaler = StandardScaler() + x_scaled = scaler.fit_transform(x_train) + model = LogisticRegression(max_iter=500, class_weight="balanced") + model.fit(x_scaled, y_train) + return model, scaler + if model_family == "tree": + model = HistGradientBoostingClassifier( + max_iter=120, + learning_rate=0.04, + max_leaf_nodes=31, + l2_regularization=0.02, + early_stopping=True, + random_state=31, + ) + model.fit(x_train, y_train) + return model, None + raise ValueError(f"unsupported model family: {model_family}") + + +def _predict(model_family: str, model: Any, scaler: StandardScaler | None, x: np.ndarray) -> np.ndarray: + if model_family == "linear": + if scaler is None: + raise ValueError("linear model missing scaler") + return model.predict_proba(scaler.transform(x))[:, 1] + return model.predict_proba(x)[:, 1] + + +def _metric_row( + frame: pd.DataFrame, + y_true: np.ndarray, + proba: np.ndarray, + side: str, + model_family: str, + split_id: str, + condition_opportunity_bps: float, + target_edge_bps: float, + top_fraction: float, + actual_edge_col: str, + train_positive_rate: float, + train_rows: int, +) -> dict[str, Any]: + order = np.argsort(-proba) + top_n = max(1, int(len(frame) * top_fraction)) + top = frame.iloc[order[:top_n]] + constant = np.full(len(y_true), np.clip(train_positive_rate, 1e-6, 1 - 1e-6)) + row: dict[str, Any] = { + "side": side, + "model_family": model_family, + "split_id": split_id, + "condition_opportunity_bps": condition_opportunity_bps, + "target_edge_bps": target_edge_bps, + "top_fraction": top_fraction, + "train_rows": int(train_rows), + "train_positive_rate": train_positive_rate, + "row_count": int(len(frame)), + "positive_rate": float(y_true.mean()) if len(y_true) else 0.0, + "brier": float(brier_score_loss(y_true, proba)) if len(y_true) else 0.0, + "constant_brier": float(brier_score_loss(y_true, constant)) if len(y_true) else 0.0, + "top_rows": int(len(top)), + "top_positive_rate": float((top[actual_edge_col] >= target_edge_bps).mean()), + "all_actual_edge_bps": float(frame[actual_edge_col].mean()), + "top_actual_edge_bps": float(top[actual_edge_col].mean()), + "top_probability_min": float(proba[order[:top_n]].min()) if len(proba) else 0.0, + "top_probability_max": float(proba[order[:top_n]].max()) if len(proba) else 0.0, + } + row["auc"] = float(roc_auc_score(y_true, proba)) if len(np.unique(y_true)) == 2 else np.nan + row["top_edge_lift_bps"] = row["top_actual_edge_bps"] - row["all_actual_edge_bps"] + row["brier_beats_constant"] = bool(row["brier"] < row["constant_brier"]) + return row + + +def _select_candidates(metrics: pd.DataFrame) -> pd.DataFrame: + if metrics.empty: + return pd.DataFrame() + key_columns = ["side", "model_family", "condition_opportunity_bps", "target_edge_bps", "top_fraction"] + tune = metrics[metrics["split_id"].eq(TUNE_SPLIT)].copy() + candidates = tune[ + key_columns + + [ + "train_rows", + "train_positive_rate", + "row_count", + "positive_rate", + "auc", + "brier_beats_constant", + "top_rows", + "top_positive_rate", + "all_actual_edge_bps", + "top_actual_edge_bps", + "top_edge_lift_bps", + ] + ].rename( + columns={ + "row_count": "tune_rows", + "positive_rate": "tune_positive_rate", + "auc": "tune_auc", + "brier_beats_constant": "tune_brier_beats_constant", + "top_rows": "tune_top_rows", + "top_positive_rate": "tune_top_positive_rate", + "all_actual_edge_bps": "tune_all_actual_edge_bps", + "top_actual_edge_bps": "tune_top_actual_edge_bps", + "top_edge_lift_bps": "tune_top_edge_lift_bps", + } + ) + for split_id in (VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT): + split_rows = metrics[metrics["split_id"].eq(split_id)][ + key_columns + ["row_count", "positive_rate", "auc", "brier_beats_constant", "top_rows", "top_positive_rate", "all_actual_edge_bps", "top_actual_edge_bps", "top_edge_lift_bps"] + ].rename( + columns={ + "row_count": f"{split_id}_rows", + "positive_rate": f"{split_id}_positive_rate", + "auc": f"{split_id}_auc", + "brier_beats_constant": f"{split_id}_brier_beats_constant", + "top_rows": f"{split_id}_top_rows", + "top_positive_rate": f"{split_id}_top_positive_rate", + "all_actual_edge_bps": f"{split_id}_all_actual_edge_bps", + "top_actual_edge_bps": f"{split_id}_top_actual_edge_bps", + "top_edge_lift_bps": f"{split_id}_top_edge_lift_bps", + } + ) + candidates = candidates.merge(split_rows, on=key_columns, how="left") + top_edge_columns = ["tune_top_actual_edge_bps", f"{VALIDATION_LOCKED_SPLIT}_top_actual_edge_bps", f"{LATEST_STRESS_SPLIT}_top_actual_edge_bps"] + auc_columns = ["tune_auc", f"{VALIDATION_LOCKED_SPLIT}_auc", f"{LATEST_STRESS_SPLIT}_auc"] + lift_columns = ["tune_top_edge_lift_bps", f"{VALIDATION_LOCKED_SPLIT}_top_edge_lift_bps", f"{LATEST_STRESS_SPLIT}_top_edge_lift_bps"] + candidates["min_top_edge_bps"] = candidates[top_edge_columns].min(axis=1) + candidates["mean_top_edge_bps"] = candidates[top_edge_columns].mean(axis=1) + candidates["min_auc"] = candidates[auc_columns].min(axis=1) + candidates["stable_positive"] = candidates[top_edge_columns].gt(0.0).all(axis=1) + candidates["stable_lift"] = candidates[lift_columns].gt(0.0).all(axis=1) + candidates["score"] = candidates["min_top_edge_bps"].fillna(-999.0) + candidates["mean_top_edge_bps"].fillna(-999.0) * 0.25 + candidates["stable_positive"].astype(float) * 2.0 + return candidates.sort_values("score", ascending=False).reset_index(drop=True) + + +def _markdown_report(result: dict[str, Any], candidates: pd.DataFrame) -> str: + lines = [ + "# 条件化 Entry 训练诊断报告", + "", + "这份报告只做诊断,不导出上线模型。它先用未来机会做过滤,模拟“Direction 已经筛过一层”的训练人群。", + "", + "**注意:这里的过滤条件用了未来机会,不能直接上线,只能判断条件化 Entry 训练是否值得做。**", + "", + f"- run_id: `{result['run_id']}`", + f"- 特征数: `{result['feature_count']}`", + f"- 条件机会阈值: `{','.join(str(item) for item in result['condition_opportunity_bps'])}`", + f"- 目标真实收益阈值: `{','.join(str(item) for item in result['target_edge_bps'])}`", + f"- 模型类型: `{','.join(result['model_families'])}`", + f"- top 档位: `{','.join(str(item) for item in result['top_fractions'])}`", + f"- 候选数: `{result['candidate_count']}`", + f"- 三段 top 真实收益都转正: `{result['stable_positive_count']}`", + "", + ] + if candidates.empty: + lines.extend(["## 候选", "", "没有候选。", ""]) + return "\n".join(lines) + display_columns = [ + "side", + "model_family", + "condition_opportunity_bps", + "target_edge_bps", + "top_fraction", + "tune_top_actual_edge_bps", + f"{VALIDATION_LOCKED_SPLIT}_top_actual_edge_bps", + f"{LATEST_STRESS_SPLIT}_top_actual_edge_bps", + "min_top_edge_bps", + "stable_positive", + "stable_lift", + "score", + ] + lines.extend( + [ + "## 候选", + "", + _markdown_table(candidates[display_columns].head(30)), + "", + "## 文件", + "", + "- `diagnostics/conditional_entry_probe_metrics.csv`: 每个组合、每个数据段的完整指标。", + "- `diagnostics/conditional_entry_probe_candidates.csv`: 汇总后的候选排序。", + "", + ] + ) + return "\n".join(lines) +