from __future__ import annotations import logging from typing import Any import numpy as np import pandas as pd from sklearn.ensemble import HistGradientBoostingClassifier from sklearn.linear_model import LogisticRegression from sklearn.metrics import brier_score_loss, roc_auc_score from sklearn.preprocessing import StandardScaler from trader_training.entry_feature_screen import _markdown_table from trader_training.io_utils import read_parquet, run_root, write_json, write_text from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) def probe_conditional_entry_training(args: Any) -> None: root = run_root(args) dataset = read_parquet(root / "dataset" / "entry_train.parquet") _require_columns(dataset) condition_opportunities = tuple(float(item) for item in (args.condition_opportunity_bps or (6.0, 12.0, 20.0, 40.0, 60.0))) target_edges = tuple(float(item) for item in (args.target_edge_bps or (0.0, 3.0))) model_families = tuple(str(item).strip().lower() for item in (args.model_families or ("linear", "tree")) if str(item).strip()) top_fractions = tuple(float(item) for item in (args.top_fractions or (0.01, 0.02, 0.05, 0.10))) max_train_rows = int(args.max_train_rows or 0) rows: list[dict[str, Any]] = [] skipped: list[dict[str, Any]] = [] for side in ("LONG", "SHORT"): prefix = side.lower() actual_edge_col = f"{prefix}_actual_plan_net_edge_bps" opportunity_col = f"{prefix}_max_achievable_net_edge_bps" for condition_opportunity_bps in condition_opportunities: fit_condition = dataset["split_id"].eq(FIT_SPLIT) & (pd.to_numeric(dataset[opportunity_col], errors="coerce") >= condition_opportunity_bps) fit_frame = dataset.loc[fit_condition].copy() if max_train_rows > 0 and len(fit_frame) > max_train_rows: fit_frame = fit_frame.sort_values("event_time").tail(max_train_rows).copy() if "event_time" in fit_frame.columns else fit_frame.tail(max_train_rows).copy() if len(fit_frame) < int(args.min_train_rows or 1000): skipped.append( { "side": side, "condition_opportunity_bps": condition_opportunity_bps, "reason": "NOT_ENOUGH_TRAIN_ROWS", "train_rows": int(len(fit_frame)), } ) continue x_train = _x(fit_frame) for target_edge_bps in target_edges: y_train = (pd.to_numeric(fit_frame[actual_edge_col], errors="coerce") >= target_edge_bps).astype(int).to_numpy() if len(np.unique(y_train)) < 2: skipped.append( { "side": side, "condition_opportunity_bps": condition_opportunity_bps, "target_edge_bps": target_edge_bps, "reason": "ONE_CLASS_TRAIN", "train_rows": int(len(fit_frame)), "train_positive_rate": float(y_train.mean()) if len(y_train) else 0.0, } ) continue for model_family in model_families: model, scaler = _fit_model(model_family, x_train, y_train) for split_id in EVAL_SPLITS: eval_condition = dataset["split_id"].eq(split_id) & (pd.to_numeric(dataset[opportunity_col], errors="coerce") >= condition_opportunity_bps) eval_frame = dataset.loc[eval_condition].copy() if len(eval_frame) < int(args.min_eval_rows or 500): continue y_true = (pd.to_numeric(eval_frame[actual_edge_col], errors="coerce") >= target_edge_bps).astype(int).to_numpy() proba = _predict(model_family, model, scaler, _x(eval_frame)) for top_fraction in top_fractions: rows.append( _metric_row( eval_frame, y_true, proba, side, model_family, split_id, condition_opportunity_bps, target_edge_bps, top_fraction, actual_edge_col, float(y_train.mean()), len(fit_frame), ) ) logging.info( "trader.training.conditional_entry_probe_fitted side=%s conditionOpportunityBps=%s targetEdgeBps=%s modelFamily=%s trainRows=%s trainPositiveRate=%.6f", side, condition_opportunity_bps, target_edge_bps, model_family, len(fit_frame), float(y_train.mean()), ) metrics = pd.DataFrame(rows) candidates = _select_candidates(metrics) result = { "run_id": args.run_id, "purpose": "diagnostic_only_not_exported", "warning": "condition_opportunity_bps is an oracle future filter; use this only to decide whether conditional Entry training is worth implementing", "feature_count": len(FEATURE_ORDER), "condition_opportunity_bps": list(condition_opportunities), "target_edge_bps": list(target_edges), "model_families": list(model_families), "top_fractions": list(top_fractions), "max_train_rows": max_train_rows, "metric_count": int(len(metrics)), "candidate_count": int(len(candidates)), "stable_positive_count": int(candidates["stable_positive"].sum()) if not candidates.empty else 0, "skipped": skipped, } out_dir = root / "diagnostics" write_json(out_dir / "conditional_entry_probe_result.json", result) write_text(out_dir / "conditional_entry_probe_metrics.csv", metrics.to_csv(index=False)) write_text(out_dir / "conditional_entry_probe_candidates.csv", candidates.to_csv(index=False)) write_text(out_dir / "conditional_entry_probe_report.md", _markdown_report(result, candidates)) logging.info( "trader.training.conditional_entry_probe_written runId=%s metricCount=%s candidateCount=%s stablePositiveCount=%s reportPath=%s", args.run_id, len(metrics), len(candidates), result["stable_positive_count"], out_dir / "conditional_entry_probe_report.md", ) def _require_columns(dataset: pd.DataFrame) -> None: required = {"split_id", *FEATURE_ORDER} for side in ("long", "short"): required.update({f"{side}_actual_plan_net_edge_bps", f"{side}_max_achievable_net_edge_bps"}) missing = sorted(required.difference(dataset.columns)) if missing: raise ValueError(f"conditional entry probe missing required columns: {missing}") def _x(frame: pd.DataFrame) -> np.ndarray: values = frame[FEATURE_ORDER].apply(pd.to_numeric, errors="coerce").replace([np.inf, -np.inf], np.nan).astype("float32") if values.isna().any().any(): missing = values.columns[values.isna().any()].tolist() raise ValueError(f"conditional entry probe found non-finite feature values: {missing}") return values.to_numpy(dtype="float32") def _fit_model(model_family: str, x_train: np.ndarray, y_train: np.ndarray) -> tuple[Any, StandardScaler | None]: if model_family == "linear": scaler = StandardScaler() x_scaled = scaler.fit_transform(x_train) model = LogisticRegression(max_iter=500, class_weight="balanced") model.fit(x_scaled, y_train) return model, scaler if model_family == "tree": model = HistGradientBoostingClassifier( max_iter=120, learning_rate=0.04, max_leaf_nodes=31, l2_regularization=0.02, early_stopping=True, random_state=31, ) model.fit(x_train, y_train) return model, None raise ValueError(f"unsupported model family: {model_family}") def _predict(model_family: str, model: Any, scaler: StandardScaler | None, x: np.ndarray) -> np.ndarray: if model_family == "linear": if scaler is None: raise ValueError("linear model missing scaler") return model.predict_proba(scaler.transform(x))[:, 1] return model.predict_proba(x)[:, 1] def _metric_row( frame: pd.DataFrame, y_true: np.ndarray, proba: np.ndarray, side: str, model_family: str, split_id: str, condition_opportunity_bps: float, target_edge_bps: float, top_fraction: float, actual_edge_col: str, train_positive_rate: float, train_rows: int, ) -> dict[str, Any]: order = np.argsort(-proba) top_n = max(1, int(len(frame) * top_fraction)) top = frame.iloc[order[:top_n]] constant = np.full(len(y_true), np.clip(train_positive_rate, 1e-6, 1 - 1e-6)) row: dict[str, Any] = { "side": side, "model_family": model_family, "split_id": split_id, "condition_opportunity_bps": condition_opportunity_bps, "target_edge_bps": target_edge_bps, "top_fraction": top_fraction, "train_rows": int(train_rows), "train_positive_rate": train_positive_rate, "row_count": int(len(frame)), "positive_rate": float(y_true.mean()) if len(y_true) else 0.0, "brier": float(brier_score_loss(y_true, proba)) if len(y_true) else 0.0, "constant_brier": float(brier_score_loss(y_true, constant)) if len(y_true) else 0.0, "top_rows": int(len(top)), "top_positive_rate": float((top[actual_edge_col] >= target_edge_bps).mean()), "all_actual_edge_bps": float(frame[actual_edge_col].mean()), "top_actual_edge_bps": float(top[actual_edge_col].mean()), "top_probability_min": float(proba[order[:top_n]].min()) if len(proba) else 0.0, "top_probability_max": float(proba[order[:top_n]].max()) if len(proba) else 0.0, } row["auc"] = float(roc_auc_score(y_true, proba)) if len(np.unique(y_true)) == 2 else np.nan row["top_edge_lift_bps"] = row["top_actual_edge_bps"] - row["all_actual_edge_bps"] row["brier_beats_constant"] = bool(row["brier"] < row["constant_brier"]) return row def _select_candidates(metrics: pd.DataFrame) -> pd.DataFrame: if metrics.empty: return pd.DataFrame() key_columns = ["side", "model_family", "condition_opportunity_bps", "target_edge_bps", "top_fraction"] tune = metrics[metrics["split_id"].eq(TUNE_SPLIT)].copy() candidates = tune[ key_columns + [ "train_rows", "train_positive_rate", "row_count", "positive_rate", "auc", "brier_beats_constant", "top_rows", "top_positive_rate", "all_actual_edge_bps", "top_actual_edge_bps", "top_edge_lift_bps", ] ].rename( columns={ "row_count": "tune_rows", "positive_rate": "tune_positive_rate", "auc": "tune_auc", "brier_beats_constant": "tune_brier_beats_constant", "top_rows": "tune_top_rows", "top_positive_rate": "tune_top_positive_rate", "all_actual_edge_bps": "tune_all_actual_edge_bps", "top_actual_edge_bps": "tune_top_actual_edge_bps", "top_edge_lift_bps": "tune_top_edge_lift_bps", } ) for split_id in (VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT): split_rows = metrics[metrics["split_id"].eq(split_id)][ key_columns + ["row_count", "positive_rate", "auc", "brier_beats_constant", "top_rows", "top_positive_rate", "all_actual_edge_bps", "top_actual_edge_bps", "top_edge_lift_bps"] ].rename( columns={ "row_count": f"{split_id}_rows", "positive_rate": f"{split_id}_positive_rate", "auc": f"{split_id}_auc", "brier_beats_constant": f"{split_id}_brier_beats_constant", "top_rows": f"{split_id}_top_rows", "top_positive_rate": f"{split_id}_top_positive_rate", "all_actual_edge_bps": f"{split_id}_all_actual_edge_bps", "top_actual_edge_bps": f"{split_id}_top_actual_edge_bps", "top_edge_lift_bps": f"{split_id}_top_edge_lift_bps", } ) candidates = candidates.merge(split_rows, on=key_columns, how="left") top_edge_columns = ["tune_top_actual_edge_bps", f"{VALIDATION_LOCKED_SPLIT}_top_actual_edge_bps", f"{LATEST_STRESS_SPLIT}_top_actual_edge_bps"] auc_columns = ["tune_auc", f"{VALIDATION_LOCKED_SPLIT}_auc", f"{LATEST_STRESS_SPLIT}_auc"] lift_columns = ["tune_top_edge_lift_bps", f"{VALIDATION_LOCKED_SPLIT}_top_edge_lift_bps", f"{LATEST_STRESS_SPLIT}_top_edge_lift_bps"] candidates["min_top_edge_bps"] = candidates[top_edge_columns].min(axis=1) candidates["mean_top_edge_bps"] = candidates[top_edge_columns].mean(axis=1) candidates["min_auc"] = candidates[auc_columns].min(axis=1) candidates["stable_positive"] = candidates[top_edge_columns].gt(0.0).all(axis=1) candidates["stable_lift"] = candidates[lift_columns].gt(0.0).all(axis=1) candidates["score"] = candidates["min_top_edge_bps"].fillna(-999.0) + candidates["mean_top_edge_bps"].fillna(-999.0) * 0.25 + candidates["stable_positive"].astype(float) * 2.0 return candidates.sort_values("score", ascending=False).reset_index(drop=True) def _markdown_report(result: dict[str, Any], candidates: pd.DataFrame) -> str: lines = [ "# 条件化 Entry 训练诊断报告", "", "这份报告只做诊断,不导出上线模型。它先用未来机会做过滤,模拟“Direction 已经筛过一层”的训练人群。", "", "**注意:这里的过滤条件用了未来机会,不能直接上线,只能判断条件化 Entry 训练是否值得做。**", "", f"- run_id: `{result['run_id']}`", f"- 特征数: `{result['feature_count']}`", f"- 条件机会阈值: `{','.join(str(item) for item in result['condition_opportunity_bps'])}`", f"- 目标真实收益阈值: `{','.join(str(item) for item in result['target_edge_bps'])}`", f"- 模型类型: `{','.join(result['model_families'])}`", f"- top 档位: `{','.join(str(item) for item in result['top_fractions'])}`", f"- 候选数: `{result['candidate_count']}`", f"- 三段 top 真实收益都转正: `{result['stable_positive_count']}`", "", ] if candidates.empty: lines.extend(["## 候选", "", "没有候选。", ""]) return "\n".join(lines) display_columns = [ "side", "model_family", "condition_opportunity_bps", "target_edge_bps", "top_fraction", "tune_top_actual_edge_bps", f"{VALIDATION_LOCKED_SPLIT}_top_actual_edge_bps", f"{LATEST_STRESS_SPLIT}_top_actual_edge_bps", "min_top_edge_bps", "stable_positive", "stable_lift", "score", ] lines.extend( [ "## 候选", "", _markdown_table(candidates[display_columns].head(30)), "", "## 文件", "", "- `diagnostics/conditional_entry_probe_metrics.csv`: 每个组合、每个数据段的完整指标。", "- `diagnostics/conditional_entry_probe_candidates.csv`: 汇总后的候选排序。", "", ] ) return "\n".join(lines)