From 7268f640a612c30b347bf116b6769d598ec1e014 Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 28 Jun 2026 08:28:55 +0800 Subject: [PATCH] Add Entry low-drawdown diagnostics --- .../scripts/26_diagnose_entry_mae_labels.py | 33 ++ training/tests/test_training_contract.py | 47 +++ .../entry_mae_label_diagnostic.py | 366 ++++++++++++++++++ 3 files changed, 446 insertions(+) create mode 100644 training/scripts/26_diagnose_entry_mae_labels.py create mode 100644 training/trader_training/entry_mae_label_diagnostic.py diff --git a/training/scripts/26_diagnose_entry_mae_labels.py b/training/scripts/26_diagnose_entry_mae_labels.py new file mode 100644 index 0000000..080a89e --- /dev/null +++ b/training/scripts/26_diagnose_entry_mae_labels.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import argparse + +import _bootstrap # noqa: F401 +from trader_training.entry_mae_label_diagnostic import diagnose_entry_mae_labels +from trader_training.io_utils import add_common_args, setup_logging + + +def _float_tuple(value: str) -> tuple[float, ...]: + return tuple(float(item.strip()) for item in value.split(",") if item.strip()) + + +def _str_tuple(value: str) -> tuple[str, ...]: + return tuple(item.strip() for item in value.split(",") if item.strip()) + + +def main() -> None: + parser = argparse.ArgumentParser() + add_common_args(parser) + parser.add_argument("--max-mae-bps", type=_float_tuple, default=(4.0, 6.0, 8.0, 12.0)) + parser.add_argument("--min-opportunity-bps", type=_float_tuple, default=(6.0, 12.0, 20.0)) + parser.add_argument("--model-families", type=_str_tuple, default=("linear",)) + parser.add_argument("--top-fraction", type=float, default=0.10) + parser.add_argument("--top-fractions", type=_float_tuple) + parser.add_argument("--max-train-rows", type=int, default=0) + args = parser.parse_args() + setup_logging() + diagnose_entry_mae_labels(args) + + +if __name__ == "__main__": + main() diff --git a/training/tests/test_training_contract.py b/training/tests/test_training_contract.py index 7850e08..37bfc55 100644 --- a/training/tests/test_training_contract.py +++ b/training/tests/test_training_contract.py @@ -17,6 +17,7 @@ from trader_training.onnx_export import LinearHead, export_heads from trader_training.dynamic_exit_search import search_dynamic_exit_plans from trader_training.entry_condition_pair_screen import screen_entry_condition_pairs from trader_training.entry_feature_screen import _bucket_edges, _screen_edge_column +from trader_training.entry_mae_label_diagnostic import diagnose_entry_mae_labels from trader_training.io_utils import read_json, write_json from trader_training.labels import ENTRY_LABEL_METHOD, _path_stats_for_group, build_entry_labels from trader_training.ofi_feature_experiment import _load_entry_dataset, l1_snapshot_diff_ofi_quote @@ -116,6 +117,52 @@ class TrainingContractTest(unittest.TestCase): self.assertEqual("LONG", best["side"]) self.assertGreater(float(best["min_eval_edge_bps"]), 0.0) + def test_entry_mae_label_diagnostic_finds_low_drawdown_target(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + data_root = Path(tmp) + run_root = data_root / "trader-v4" / "runs" / "unit-mae-diagnostic" + dataset_path = run_root / "dataset" / "entry_train.parquet" + dataset_path.parent.mkdir(parents=True) + + frames = [] + row_count = 800 + base_feature_values = np.linspace(0.0, 0.999, row_count) + for split_id in TRAINING_SPLITS: + frame = pd.DataFrame({feature: 0.0 for feature in FEATURE_ORDER}, index=np.arange(row_count)) + frame["split_id"] = split_id + frame["ret_1m_bps"] = base_feature_values + good_mask = frame["ret_1m_bps"] > 0.85 + frame["long_entry_target"] = good_mask.astype(int) + frame["short_entry_target"] = 0 + frame["long_actual_plan_net_edge_bps"] = np.where(good_mask, 9.0, -6.0) + frame["short_actual_plan_net_edge_bps"] = -6.0 + frame["long_max_achievable_net_edge_bps"] = np.where(good_mask, 18.0, 2.0) + frame["short_max_achievable_net_edge_bps"] = 2.0 + frame["long_mae_bps"] = np.where(good_mask, 2.0, 15.0) + frame["short_mae_bps"] = 15.0 + frames.append(frame) + pd.concat(frames, ignore_index=True).to_parquet(dataset_path, index=False) + + diagnose_entry_mae_labels( + Namespace( + data_root=data_root, + run_id="unit-mae-diagnostic", + max_mae_bps=(4.0,), + min_opportunity_bps=(12.0,), + model_families=("linear",), + top_fraction=0.10, + max_train_rows=0, + ) + ) + + result = read_json(run_root / "diagnostics" / "entry_mae_label_diagnostic_result.json") + candidates = pd.read_csv(run_root / "diagnostics" / "entry_mae_label_diagnostic_candidates.csv") + + self.assertGreater(result["positive_top_edge_candidate_count"], 0) + best = candidates.iloc[0] + self.assertEqual("LONG", best["side"]) + self.assertTrue(bool(best["stable_top_edge_positive"])) + def test_dynamic_exit_search_writes_plan_diagnostics(self) -> None: with tempfile.TemporaryDirectory() as tmp: data_root = Path(tmp) diff --git a/training/trader_training/entry_mae_label_diagnostic.py b/training/trader_training/entry_mae_label_diagnostic.py new file mode 100644 index 0000000..d190332 --- /dev/null +++ b/training/trader_training/entry_mae_label_diagnostic.py @@ -0,0 +1,366 @@ +from __future__ import annotations + +import logging +from typing import Any + +import numpy as np +import pandas as pd +from sklearn.ensemble import HistGradientBoostingClassifier +from sklearn.linear_model import LogisticRegression +from sklearn.metrics import brier_score_loss, roc_auc_score +from sklearn.preprocessing import StandardScaler + +from trader_training.entry_feature_screen import _markdown_table +from trader_training.io_utils import read_parquet, run_root, write_json, write_text +from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT + + +EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) + + +def diagnose_entry_mae_labels(args: Any) -> None: + root = run_root(args) + dataset = read_parquet(root / "dataset" / "entry_train.parquet") + _require_columns(dataset) + + max_mae_values = tuple(float(item) for item in (args.max_mae_bps or (4.0, 6.0, 8.0, 12.0))) + min_opportunity_values = tuple(float(item) for item in (args.min_opportunity_bps or (6.0, 12.0, 20.0))) + model_families = tuple(str(item).strip().lower() for item in (args.model_families or ("linear",)) if str(item).strip()) + top_fractions = tuple(float(item) for item in (getattr(args, "top_fractions", None) or (float(args.top_fraction or 0.10),))) + max_train_rows = int(args.max_train_rows or 0) + + x_train_frame = dataset[dataset["split_id"].eq(FIT_SPLIT)].copy() + if x_train_frame.empty: + raise ValueError("entry mae label diagnostic needs fit_inner rows") + if max_train_rows > 0 and len(x_train_frame) > max_train_rows: + x_train_frame = x_train_frame.sort_values("event_time").tail(max_train_rows).copy() if "event_time" in x_train_frame.columns else x_train_frame.tail(max_train_rows).copy() + x_train = _x(x_train_frame) + + rows: list[dict[str, Any]] = [] + for side in ("LONG", "SHORT"): + actual_edge_col = f"{side.lower()}_actual_plan_net_edge_bps" + mae_col = f"{side.lower()}_mae_bps" + opportunity_col = f"{side.lower()}_max_achievable_net_edge_bps" + for max_mae_bps in max_mae_values: + for min_opportunity_bps in min_opportunity_values: + target_name = f"{side.lower()}_mae_le_{max_mae_bps:g}_opp_ge_{min_opportunity_bps:g}" + y_train = _target(x_train_frame, mae_col, opportunity_col, max_mae_bps, min_opportunity_bps) + if len(np.unique(y_train)) < 2: + rows.append( + { + "side": side, + "target_name": target_name, + "model_family": "SKIPPED", + "max_mae_bps": max_mae_bps, + "min_opportunity_bps": min_opportunity_bps, + "status": "SKIPPED_ONE_CLASS_TRAIN", + "train_rows": int(len(y_train)), + "train_positive_rate": float(y_train.mean()) if len(y_train) else 0.0, + } + ) + continue + for model_family in model_families: + model, scaler = _fit_model(model_family, x_train, y_train) + for split_id in EVAL_SPLITS: + split_frame = dataset[dataset["split_id"].eq(split_id)].copy() + if split_frame.empty: + continue + y_true = _target(split_frame, mae_col, opportunity_col, max_mae_bps, min_opportunity_bps) + proba = _predict(model_family, model, scaler, _x(split_frame)) + for top_fraction in top_fractions: + rows.append( + _metric_row( + split_frame, + y_true, + proba, + side, + target_name, + model_family, + split_id, + max_mae_bps, + min_opportunity_bps, + top_fraction, + actual_edge_col, + mae_col, + opportunity_col, + float(y_train.mean()), + ) + ) + logging.info( + "trader.training.entry_mae_label_diagnosed side=%s target=%s modelFamily=%s trainRows=%s trainPositiveRate=%.6f", + side, + target_name, + model_family, + len(y_train), + float(y_train.mean()), + ) + + metrics = pd.DataFrame(rows) + candidates = _select_candidates(metrics) + result = { + "run_id": args.run_id, + "feature_count": len(FEATURE_ORDER), + "max_mae_bps": list(max_mae_values), + "min_opportunity_bps": list(min_opportunity_values), + "model_families": list(model_families), + "top_fractions": list(top_fractions), + "max_train_rows": max_train_rows, + "metric_count": int(len(metrics)), + "candidate_count": int(len(candidates)), + "positive_top_edge_candidate_count": int(candidates["stable_top_edge_positive"].sum()) if not candidates.empty else 0, + "purpose": "diagnostic_only_not_exported", + "selection_rule": "fit on fit_inner; rank by top predicted low-MAE opportunity samples on tune_inner/validation_locked/latest_stress", + } + out_dir = root / "diagnostics" + write_json(out_dir / "entry_mae_label_diagnostic_result.json", result) + write_text(out_dir / "entry_mae_label_diagnostic_metrics.csv", metrics.to_csv(index=False)) + write_text(out_dir / "entry_mae_label_diagnostic_candidates.csv", candidates.to_csv(index=False)) + write_text(out_dir / "entry_mae_label_diagnostic_report.md", _markdown_report(result, candidates)) + logging.info( + "trader.training.entry_mae_label_diagnostic_written runId=%s metricCount=%s candidateCount=%s reportPath=%s", + args.run_id, + len(metrics), + len(candidates), + out_dir / "entry_mae_label_diagnostic_report.md", + ) + + +def _require_columns(dataset: pd.DataFrame) -> None: + required = {"split_id", *FEATURE_ORDER} + for side in ("long", "short"): + required.update( + { + f"{side}_actual_plan_net_edge_bps", + f"{side}_mae_bps", + f"{side}_max_achievable_net_edge_bps", + } + ) + missing = sorted(required.difference(dataset.columns)) + if missing: + raise ValueError(f"entry mae label diagnostic missing required columns: {missing}") + + +def _x(frame: pd.DataFrame) -> np.ndarray: + values = frame[FEATURE_ORDER].apply(pd.to_numeric, errors="coerce").replace([np.inf, -np.inf], np.nan).astype("float32") + if values.isna().any().any(): + missing = values.columns[values.isna().any()].tolist() + raise ValueError(f"entry mae label diagnostic found non-finite feature values: {missing}") + return values.to_numpy(dtype="float32") + + +def _target(frame: pd.DataFrame, mae_col: str, opportunity_col: str, max_mae_bps: float, min_opportunity_bps: float) -> np.ndarray: + mae = pd.to_numeric(frame[mae_col], errors="coerce") + opportunity = pd.to_numeric(frame[opportunity_col], errors="coerce") + return ((mae <= max_mae_bps) & (opportunity >= min_opportunity_bps)).astype(int).to_numpy() + + +def _fit_model(model_family: str, x_train: np.ndarray, y_train: np.ndarray) -> tuple[Any, StandardScaler | None]: + if model_family == "linear": + scaler = StandardScaler() + x_scaled = scaler.fit_transform(x_train) + model = LogisticRegression(max_iter=500, class_weight="balanced") + model.fit(x_scaled, y_train) + return model, scaler + if model_family == "tree": + model = HistGradientBoostingClassifier( + max_iter=120, + learning_rate=0.04, + max_leaf_nodes=31, + l2_regularization=0.02, + early_stopping=True, + random_state=23, + ) + model.fit(x_train, y_train) + return model, None + raise ValueError(f"unsupported model family: {model_family}") + + +def _predict(model_family: str, model: Any, scaler: StandardScaler | None, x: np.ndarray) -> np.ndarray: + if model_family == "linear": + if scaler is None: + raise ValueError("linear model missing scaler") + return model.predict_proba(scaler.transform(x))[:, 1] + return model.predict_proba(x)[:, 1] + + +def _metric_row( + frame: pd.DataFrame, + y_true: np.ndarray, + proba: np.ndarray, + side: str, + target_name: str, + model_family: str, + split_id: str, + max_mae_bps: float, + min_opportunity_bps: float, + top_fraction: float, + actual_edge_col: str, + mae_col: str, + opportunity_col: str, + train_positive_rate: float, +) -> dict[str, Any]: + order = np.argsort(-proba) + top_n = max(1, int(len(frame) * top_fraction)) + top_index = frame.index.to_numpy()[order[:top_n]] + top = frame.loc[top_index] + constant = np.full(len(y_true), np.clip(train_positive_rate, 1e-6, 1 - 1e-6)) + row: dict[str, Any] = { + "side": side, + "target_name": target_name, + "model_family": model_family, + "split_id": split_id, + "status": "OK", + "max_mae_bps": max_mae_bps, + "min_opportunity_bps": min_opportunity_bps, + "row_count": int(len(frame)), + "positive_rate": float(y_true.mean()) if len(y_true) else 0.0, + "train_positive_rate": train_positive_rate, + "brier": float(brier_score_loss(y_true, proba)) if len(y_true) else 0.0, + "constant_brier": float(brier_score_loss(y_true, constant)) if len(y_true) else 0.0, + "top_fraction": top_fraction, + "top_rows": int(len(top)), + "top_target_rate": float(y_true[order[:top_n]].mean()) if len(y_true) else 0.0, + "all_actual_edge_bps": float(frame[actual_edge_col].mean()), + "top_actual_edge_bps": float(top[actual_edge_col].mean()), + "top_mae_bps": float(top[mae_col].mean()), + "top_opportunity_bps": float(top[opportunity_col].mean()), + "top_probability_min": float(proba[order[:top_n]].min()) if len(proba) else 0.0, + "top_probability_max": float(proba[order[:top_n]].max()) if len(proba) else 0.0, + } + if len(np.unique(y_true)) == 2: + row["auc"] = float(roc_auc_score(y_true, proba)) + else: + row["auc"] = np.nan + row["brier_beats_constant"] = bool(row["brier"] < row["constant_brier"]) + row["top_edge_lift_bps"] = row["top_actual_edge_bps"] - row["all_actual_edge_bps"] + row["top_target_lift"] = row["top_target_rate"] - row["positive_rate"] + return row + + +def _select_candidates(metrics: pd.DataFrame) -> pd.DataFrame: + ok = metrics[metrics["status"].eq("OK")].copy() + if ok.empty: + return pd.DataFrame() + key_columns = ["side", "target_name", "model_family", "max_mae_bps", "min_opportunity_bps", "top_fraction"] + tune = ok[ok["split_id"].eq(TUNE_SPLIT)].copy() + candidates = tune[ + key_columns + + [ + "row_count", + "positive_rate", + "auc", + "brier", + "constant_brier", + "brier_beats_constant", + "top_target_rate", + "top_actual_edge_bps", + "top_edge_lift_bps", + "top_mae_bps", + "top_opportunity_bps", + ] + ].rename( + columns={ + "row_count": "tune_rows", + "positive_rate": "tune_positive_rate", + "auc": "tune_auc", + "brier": "tune_brier", + "constant_brier": "tune_constant_brier", + "brier_beats_constant": "tune_brier_beats_constant", + "top_target_rate": "tune_top_target_rate", + "top_actual_edge_bps": "tune_top_actual_edge_bps", + "top_edge_lift_bps": "tune_top_edge_lift_bps", + "top_mae_bps": "tune_top_mae_bps", + "top_opportunity_bps": "tune_top_opportunity_bps", + } + ) + for split_id in (VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT): + split_rows = ok[ok["split_id"].eq(split_id)][ + key_columns + ["row_count", "positive_rate", "auc", "brier", "constant_brier", "brier_beats_constant", "top_target_rate", "top_actual_edge_bps", "top_edge_lift_bps", "top_mae_bps", "top_opportunity_bps"] + ].rename( + columns={ + "row_count": f"{split_id}_rows", + "positive_rate": f"{split_id}_positive_rate", + "auc": f"{split_id}_auc", + "brier": f"{split_id}_brier", + "constant_brier": f"{split_id}_constant_brier", + "brier_beats_constant": f"{split_id}_brier_beats_constant", + "top_target_rate": f"{split_id}_top_target_rate", + "top_actual_edge_bps": f"{split_id}_top_actual_edge_bps", + "top_edge_lift_bps": f"{split_id}_top_edge_lift_bps", + "top_mae_bps": f"{split_id}_top_mae_bps", + "top_opportunity_bps": f"{split_id}_top_opportunity_bps", + } + ) + candidates = candidates.merge(split_rows, on=key_columns, how="left") + + top_edge_columns = ["tune_top_actual_edge_bps", f"{VALIDATION_LOCKED_SPLIT}_top_actual_edge_bps", f"{LATEST_STRESS_SPLIT}_top_actual_edge_bps"] + auc_columns = ["tune_auc", f"{VALIDATION_LOCKED_SPLIT}_auc", f"{LATEST_STRESS_SPLIT}_auc"] + lift_columns = ["tune_top_edge_lift_bps", f"{VALIDATION_LOCKED_SPLIT}_top_edge_lift_bps", f"{LATEST_STRESS_SPLIT}_top_edge_lift_bps"] + candidates["min_eval_top_edge_bps"] = candidates[top_edge_columns].min(axis=1) + candidates["mean_eval_top_edge_bps"] = candidates[top_edge_columns].mean(axis=1) + candidates["min_eval_auc"] = candidates[auc_columns].min(axis=1) + candidates["stable_top_edge_positive"] = candidates[top_edge_columns].gt(0.0).all(axis=1) + candidates["stable_lift"] = candidates[lift_columns].gt(0.0).all(axis=1) + brier_flag_columns = ["tune_brier_beats_constant", f"{VALIDATION_LOCKED_SPLIT}_brier_beats_constant", f"{LATEST_STRESS_SPLIT}_brier_beats_constant"] + for column in brier_flag_columns: + candidates[column] = candidates[column].map(lambda value: bool(value) if pd.notna(value) else False) + candidates["stable_brier_beats_constant"] = candidates[brier_flag_columns].all(axis=1) + candidates["diagnostic_score"] = ( + candidates["min_eval_top_edge_bps"].fillna(-999.0) + + candidates["mean_eval_top_edge_bps"].fillna(-999.0) * 0.25 + + candidates["min_eval_auc"].fillna(0.0) * 2.0 + + candidates["stable_lift"].astype(float) + ) + return candidates.sort_values("diagnostic_score", ascending=False).reset_index(drop=True) + + +def _markdown_report(result: dict[str, Any], candidates: pd.DataFrame) -> str: + lines = [ + "# Entry 低回撤标签诊断报告", + "", + "这份报告只做诊断,不导出上线模型。它回答:现有特征能不能识别“回撤小、同时有足够空间”的开仓点。", + "", + f"- run_id: `{result['run_id']}`", + f"- 特征数: `{result['feature_count']}`", + f"- 模型类型: `{','.join(result['model_families'])}`", + f"- top_fractions: `{','.join(str(item) for item in result['top_fractions'])}`", + f"- 指标行数: `{result['metric_count']}`", + f"- 候选数: `{result['candidate_count']}`", + f"- top 真实收益三段都转正的候选数: `{result['positive_top_edge_candidate_count']}`", + "", + ] + if candidates.empty: + lines.extend(["## 候选", "", "没有候选。", ""]) + return "\n".join(lines) + display_columns = [ + "side", + "model_family", + "top_fraction", + "max_mae_bps", + "min_opportunity_bps", + "tune_auc", + f"{VALIDATION_LOCKED_SPLIT}_auc", + f"{LATEST_STRESS_SPLIT}_auc", + "tune_top_actual_edge_bps", + f"{VALIDATION_LOCKED_SPLIT}_top_actual_edge_bps", + f"{LATEST_STRESS_SPLIT}_top_actual_edge_bps", + "min_eval_top_edge_bps", + "stable_top_edge_positive", + "stable_lift", + "stable_brier_beats_constant", + "diagnostic_score", + ] + lines.extend( + [ + "## 候选", + "", + _markdown_table(candidates[display_columns].head(25)), + "", + "## 文件", + "", + "- `diagnostics/entry_mae_label_diagnostic_metrics.csv`: 每个标签、方向、模型、数据段的完整指标。", + "- `diagnostics/entry_mae_label_diagnostic_candidates.csv`: 按三段 top 真实收益排序的候选。", + "", + ] + ) + return "\n".join(lines)