From 3f49af5ba67fb255e27ded0cca0f894d6803d3b4 Mon Sep 17 00:00:00 2001
From: Codex
Date: Sun, 28 Jun 2026 08:21:01 +0800
Subject: [PATCH] Add Entry condition pair diagnostics

---
 .../25_screen_entry_condition_pairs.py        |  24 ++
 training/tests/test_training_contract.py      |  46 +++
 .../entry_condition_pair_screen.py            | 429 ++++++++++++++++++
 3 files changed, 499 insertions(+)
 create mode 100644 training/scripts/25_screen_entry_condition_pairs.py
 create mode 100644 training/trader_training/entry_condition_pair_screen.py

diff --git a/training/scripts/25_screen_entry_condition_pairs.py b/training/scripts/25_screen_entry_condition_pairs.py
new file mode 100644
index 0000000..7f14b5b
--- /dev/null
+++ b/training/scripts/25_screen_entry_condition_pairs.py
@@ -0,0 +1,24 @@
+from __future__ import annotations
+
+import argparse
+
+import _bootstrap  # noqa: F401
+from trader_training.entry_condition_pair_screen import screen_entry_condition_pairs
+from trader_training.io_utils import add_common_args, setup_logging
+
+
+def main() -> None:
+    """Parse the screening thresholds from the command line and run the screen."""
+    parser = argparse.ArgumentParser()
+    add_common_args(parser)
+    parser.add_argument("--min-seed-rows", type=int, default=300)
+    parser.add_argument("--min-pair-rows", type=int, default=150)
+    parser.add_argument("--max-seed-conditions-per-side", type=int, default=32)
+    parser.add_argument("--max-buckets-per-feature", type=int, default=2)
+    args = parser.parse_args()
+    setup_logging()
+    screen_entry_condition_pairs(args)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/training/tests/test_training_contract.py b/training/tests/test_training_contract.py
index 0a9b21c..7850e08 100644
--- a/training/tests/test_training_contract.py
+++ b/training/tests/test_training_contract.py
@@ -15,6 +15,7 @@ if str(TRAINING_ROOT) not in sys.path:
 
 from trader_training.onnx_export import LinearHead, export_heads
 from trader_training.dynamic_exit_search import search_dynamic_exit_plans
+from trader_training.entry_condition_pair_screen import screen_entry_condition_pairs
 from trader_training.entry_feature_screen import _bucket_edges, _screen_edge_column
 from trader_training.io_utils import read_json, write_json
 from trader_training.labels import ENTRY_LABEL_METHOD, _path_stats_for_group, build_entry_labels
@@ -70,6 +71,51 @@ class TrainingContractTest(unittest.TestCase):
         self.assertEqual(-np.inf, edges[0])
         self.assertEqual(np.inf, edges[-1])
 
+    def test_entry_condition_pair_screen_finds_stable_two_feature_filter(self) -> None:
+        with tempfile.TemporaryDirectory() as tmp:
+            data_root = Path(tmp)
+            run_root = data_root / "trader-v4" / "runs" / "unit-condition-pair"
+            dataset_path = run_root / "dataset" / "entry_train.parquet"
+            dataset_path.parent.mkdir(parents=True)
+
+            frames = []
+            row_count = 1200
+            base_feature_values = np.linspace(0.0, 0.999, row_count)
+            for split_id in TRAINING_SPLITS:
+                frame = pd.DataFrame({feature: 0.0 for feature in FEATURE_ORDER}, index=np.arange(row_count))
+                frame["split_id"] = split_id
+                frame["ret_1m_bps"] = base_feature_values
+                frame["ret_5m_bps"] = base_feature_values
+                good_mask = (frame["ret_1m_bps"] > 0.9) & (frame["ret_5m_bps"] > 0.9)
+                frame["long_entry_target"] = good_mask.astype(int)
+                frame["short_entry_target"] = 0
+                frame["long_actual_plan_net_edge_bps"] = np.where(good_mask, 8.0, -6.0)
+                frame["short_actual_plan_net_edge_bps"] = -6.0
+                frame["long_mae_bps"] = np.where(good_mask, 2.0, 15.0)
+                frame["short_mae_bps"] = 15.0
+                frames.append(frame)
+            pd.concat(frames, ignore_index=True).to_parquet(dataset_path, index=False)
+
+            screen_entry_condition_pairs(
+                Namespace(
+                    data_root=data_root,
+                    run_id="unit-condition-pair",
+                    min_seed_rows=50,
+                    min_pair_rows=50,
+                    max_seed_conditions_per_side=8,
+                    max_buckets_per_feature=2,
+                )
+            )
+
+            result = read_json(run_root / "diagnostics" / "entry_condition_pair_screen_result.json")
+            candidates = pd.read_csv(run_root / "diagnostics" / "entry_condition_pair_candidates.csv")
+
+            self.assertGreater(result["stable_candidate_count"], 0)
+            self.assertTrue(candidates["usable_candidate"].any())
+            best = candidates.iloc[0]
+            self.assertEqual("LONG", best["side"])
+            self.assertGreater(float(best["min_eval_edge_bps"]), 0.0)
+
     def test_dynamic_exit_search_writes_plan_diagnostics(self) -> None:
         with tempfile.TemporaryDirectory() as tmp:
             data_root = Path(tmp)
diff --git a/training/trader_training/entry_condition_pair_screen.py b/training/trader_training/entry_condition_pair_screen.py
new file mode 100644
index 0000000..4dd20e3
--- /dev/null
+++ b/training/trader_training/entry_condition_pair_screen.py
@@ -0,0 +1,429 @@
+from __future__ import annotations
+
+import logging
+from itertools import combinations
+from typing import Any
+
+import numpy as np
+import pandas as pd
+
+from trader_training.entry_feature_screen import _bucket_edges, _markdown_table
+from trader_training.io_utils import read_parquet, run_root, write_json, write_text
+from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT
+
+
+EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)
+ALL_SPLITS = (FIT_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)
+
+
+def screen_entry_condition_pairs(args: Any) -> None:
+    """Screen two-feature bucket intersections and write diagnostics.
+
+    Reads ``dataset/entry_train.parquet`` under the run root, picks single-bucket
+    seed conditions on the tune split for each side, evaluates every
+    cross-feature seed pair on all splits, and writes JSON/CSV/Markdown outputs
+    under ``diagnostics/``.
+
+    Args:
+        args: Namespace-like object providing ``run_id``, ``min_seed_rows``,
+            ``min_pair_rows``, ``max_seed_conditions_per_side`` and
+            ``max_buckets_per_feature``, plus whatever ``run_root`` reads.
+            Falsy thresholds (``None`` or ``0``) fall back to the defaults.
+
+    Raises:
+        ValueError: If the dataset lacks a required column.
+    """
+    root = run_root(args)
+    dataset = read_parquet(root / "dataset" / "entry_train.parquet")
+    _require_columns(dataset)
+
+    min_seed_rows = int(args.min_seed_rows or 300)
+    min_pair_rows = int(args.min_pair_rows or 150)
+    max_seed_conditions_per_side = int(args.max_seed_conditions_per_side or 32)
+    max_buckets_per_feature = int(args.max_buckets_per_feature or 2)
+
+    rows: list[dict[str, Any]] = []
+    seed_frames: list[pd.DataFrame] = []
+    bucketed_features = _bucketed_features(dataset)
+    for side in ("LONG", "SHORT"):
+        target_col = "long_entry_target" if side == "LONG" else "short_entry_target"
+        edge_col = _actual_edge_column(side)
+        mae_col = "long_mae_bps" if side == "LONG" else "short_mae_bps"
+        baselines = _split_baselines(dataset, target_col, edge_col, mae_col)
+        seeds = _seed_conditions(
+            dataset,
+            bucketed_features,
+            side,
+            target_col,
+            edge_col,
+            mae_col,
+            baselines,
+            min_seed_rows,
+            max_buckets_per_feature,
+            max_seed_conditions_per_side,
+        )
+        seed_frames.append(seeds)
+        side_rows = _condition_pair_rows(
+            dataset,
+            bucketed_features,
+            seeds,
+            side,
+            target_col,
+            edge_col,
+            mae_col,
+            baselines,
+            min_pair_rows,
+        )
+        rows.extend(side_rows)
+        logging.info(
+            "trader.training.entry_condition_pair_side_screened side=%s seedCount=%s pairMetricRows=%s",
+            side,
+            len(seeds),
+            len(side_rows),
+        )
+
+    pair_metrics = pd.DataFrame(rows)
+    candidates = _select_candidates(pair_metrics, min_pair_rows) if not pair_metrics.empty else pd.DataFrame()
+    seeds_all = pd.concat(seed_frames, ignore_index=True) if seed_frames else pd.DataFrame()
+    result = {
+        "run_id": args.run_id,
+        "dataset_path": str(root / "dataset" / "entry_train.parquet"),
+        "feature_count": len(FEATURE_ORDER),
+        "seed_count": int(len(seeds_all)),
+        "pair_metric_count": int(len(pair_metrics)),
+        "candidate_count": int(len(candidates)),
+        "stable_candidate_count": int((candidates.get("stable_positive_edge", pd.Series(dtype=bool)) & candidates.get("stable_lift", pd.Series(dtype=bool))).sum()) if not candidates.empty else 0,
+        "min_seed_rows": min_seed_rows,
+        "min_pair_rows": min_pair_rows,
+        "max_seed_conditions_per_side": max_seed_conditions_per_side,
+        "max_buckets_per_feature": max_buckets_per_feature,
+        "selection_rule": "single buckets are chosen on tune_inner, then feature-pair intersections are checked on tune_inner/validation_locked/latest_stress",
+    }
+    write_json(root / "diagnostics" / "entry_condition_pair_screen_result.json", result)
+    write_text(root / "diagnostics" / "entry_condition_pair_seeds.csv", seeds_all.to_csv(index=False))
+    write_text(root / "diagnostics" / "entry_condition_pair_metrics.csv", pair_metrics.to_csv(index=False))
+    write_text(root / "diagnostics" / "entry_condition_pair_candidates.csv", candidates.to_csv(index=False))
+    write_text(root / "diagnostics" / "entry_condition_pair_screen_report.md", _markdown_report(result, candidates))
+    logging.info(
+        "trader.training.entry_condition_pair_screened runId=%s seedCount=%s pairMetricCount=%s candidateCount=%s reportPath=%s",
+        args.run_id,
+        len(seeds_all),
+        len(pair_metrics),
+        len(candidates),
+        root / "diagnostics" / "entry_condition_pair_screen_report.md",
+    )
+
+
+def _require_columns(dataset: pd.DataFrame) -> None:
+    """Raise ``ValueError`` if any column the screen reads is absent."""
+    required = {
+        "split_id",
+        *FEATURE_ORDER,
+        "long_entry_target",
+        "short_entry_target",
+        "long_actual_plan_net_edge_bps",
+        "short_actual_plan_net_edge_bps",
+        "long_mae_bps",
+        "short_mae_bps",
+    }
+    missing = sorted(required.difference(dataset.columns))
+    if missing:
+        raise ValueError(f"entry condition pair screen missing required columns: {missing}")
+
+
+def _actual_edge_column(side: str) -> str:
+    """Return the actual-plan net edge column name for ``side``."""
+    if side == "LONG":
+        return "long_actual_plan_net_edge_bps"
+    if side == "SHORT":
+        return "short_actual_plan_net_edge_bps"
+    raise ValueError(f"unsupported side: {side}")
+
+
+def _bucketed_features(dataset: pd.DataFrame) -> dict[str, pd.Series]:
+    """Assign every row a bucket index per feature using fit-split edges.
+
+    Bucket edges come from ``_bucket_edges`` on the finite fit-split values, so
+    the other splits do not influence the binning. Features yielding fewer than
+    three edges are skipped; non-numeric or infinite values get a NaN bucket.
+    """
+    bucketed: dict[str, pd.Series] = {}
+    fit_mask = dataset["split_id"].eq(FIT_SPLIT)
+    for feature in FEATURE_ORDER:
+        train_values = pd.to_numeric(dataset.loc[fit_mask, feature], errors="coerce").replace([np.inf, -np.inf], np.nan).dropna()
+        edges = _bucket_edges(train_values.to_numpy(dtype="float64"))
+        if len(edges) < 3:
+            continue
+        values = pd.to_numeric(dataset[feature], errors="coerce").replace([np.inf, -np.inf], np.nan)
+        bucket = pd.cut(values, bins=edges, include_lowest=True, labels=False, duplicates="drop")
+        bucketed[feature] = bucket.astype("float")
+    logging.info("trader.training.entry_condition_pair_bucketed featureCount=%s", len(bucketed))
+    return bucketed
+
+
+def _split_baselines(dataset: pd.DataFrame, target_col: str, edge_col: str, mae_col: str) -> dict[str, dict[str, float]]:
+    """Compute per-split row count and mean target/edge/MAE; empty splits are omitted."""
+    baselines: dict[str, dict[str, float]] = {}
+    for split_id in ALL_SPLITS:
+        part = dataset[dataset["split_id"].eq(split_id)]
+        if part.empty:
+            continue
+        baselines[split_id] = {
+            "rows": float(len(part)),
+            "positive_rate": float(part[target_col].mean()),
+            "avg_edge_bps": float(part[edge_col].mean()),
+            "avg_mae_bps": float(part[mae_col].mean()),
+        }
+    return baselines
+
+
+def _seed_conditions(
+    dataset: pd.DataFrame,
+    bucketed_features: dict[str, pd.Series],
+    side: str,
+    target_col: str,
+    edge_col: str,
+    mae_col: str,
+    baselines: dict[str, dict[str, float]],
+    min_seed_rows: int,
+    max_buckets_per_feature: int,
+    max_seed_conditions_per_side: int,
+) -> pd.DataFrame:
+    """Rank single-feature buckets on the tune split and keep the best seeds.
+
+    Buckets with fewer than ``min_seed_rows`` tune rows are dropped. The top
+    ``max_buckets_per_feature`` buckets per feature (by edge lift) are kept,
+    then the overall top ``max_seed_conditions_per_side`` are returned. An empty
+    frame is returned when the tune split has no baseline or nothing qualifies.
+    """
+    baseline = baselines.get(TUNE_SPLIT)
+    if baseline is None:
+        # _split_baselines omits empty splits; without tune rows nothing can be
+        # seeded, so return no seeds rather than failing with a KeyError.
+        logging.warning("trader.training.entry_condition_pair_tune_split_empty side=%s", side)
+        return pd.DataFrame()
+    tune_mask = dataset["split_id"].eq(TUNE_SPLIT)
+    rows: list[dict[str, Any]] = []
+    for feature, bucket in bucketed_features.items():
+        working = dataset.loc[tune_mask, [target_col, edge_col, mae_col]].copy()
+        working["bucket_index"] = bucket.loc[tune_mask].to_numpy()
+        working = working.dropna(subset=["bucket_index"])
+        if working.empty:
+            continue
+        working["bucket_index"] = working["bucket_index"].astype(int)
+        for bucket_index, part in working.groupby("bucket_index", sort=True, observed=False):
+            if len(part) < min_seed_rows:
+                continue
+            avg_edge = float(part[edge_col].mean())
+            positive_rate = float(part[target_col].mean())
+            avg_mae = float(part[mae_col].mean())
+            rows.append(
+                {
+                    "side": side,
+                    "feature": feature,
+                    "bucket_index": int(bucket_index),
+                    "tune_rows": int(len(part)),
+                    "tune_positive_rate": positive_rate,
+                    "tune_positive_rate_lift": positive_rate - baseline["positive_rate"],
+                    "tune_avg_edge_bps": avg_edge,
+                    "tune_avg_edge_lift_bps": avg_edge - baseline["avg_edge_bps"],
+                    "tune_avg_mae_bps": avg_mae,
+                    "tune_avg_mae_lift_bps": avg_mae - baseline["avg_mae_bps"],
+                }
+            )
+    if not rows:
+        return pd.DataFrame()
+    seeds = pd.DataFrame(rows).sort_values(["feature", "tune_avg_edge_lift_bps", "tune_avg_edge_bps"], ascending=[True, False, False])
+    seeds = seeds.groupby("feature", as_index=False, observed=False).head(max_buckets_per_feature)
+    seeds = seeds.sort_values(["tune_avg_edge_lift_bps", "tune_avg_edge_bps", "tune_rows"], ascending=[False, False, False])
+    return seeds.head(max_seed_conditions_per_side).reset_index(drop=True)
+
+
+def _condition_pair_rows(
+    dataset: pd.DataFrame,
+    bucketed_features: dict[str, pd.Series],
+    seeds: pd.DataFrame,
+    side: str,
+    target_col: str,
+    edge_col: str,
+    mae_col: str,
+    baselines: dict[str, dict[str, float]],
+    min_pair_rows: int,
+) -> list[dict[str, Any]]:
+    """Evaluate every cross-feature seed pair on each split.
+
+    A pair is the intersection of two seed buckets from different features. Pairs
+    with fewer than ``min_pair_rows`` tune rows are skipped; for the rest, one
+    metric row is emitted per split that has both matching rows and a baseline.
+    """
+    if seeds.empty:
+        return []
+    rows: list[dict[str, Any]] = []
+    seed_records = seeds.to_dict("records")
+    for left, right in combinations(seed_records, 2):
+        left_feature = str(left["feature"])
+        right_feature = str(right["feature"])
+        if left_feature == right_feature:
+            continue
+        left_bucket = int(left["bucket_index"])
+        right_bucket = int(right["bucket_index"])
+        left_mask = bucketed_features[left_feature].eq(left_bucket)
+        right_mask = bucketed_features[right_feature].eq(right_bucket)
+        pair_mask = left_mask & right_mask
+        tune_rows = int((pair_mask & dataset["split_id"].eq(TUNE_SPLIT)).sum())
+        if tune_rows < min_pair_rows:
+            continue
+        for split_id in ALL_SPLITS:
+            split_mask = pair_mask & dataset["split_id"].eq(split_id)
+            part = dataset.loc[split_mask, [target_col, edge_col, mae_col]]
+            if part.empty or split_id not in baselines:
+                continue
+            baseline = baselines[split_id]
+            avg_edge = float(part[edge_col].mean())
+            positive_rate = float(part[target_col].mean())
+            avg_mae = float(part[mae_col].mean())
+            rows.append(
+                {
+                    "side": side,
+                    "left_feature": left_feature,
+                    "left_bucket_index": left_bucket,
+                    "right_feature": right_feature,
+                    "right_bucket_index": right_bucket,
+                    "split_id": split_id,
+                    "row_count": int(len(part)),
+                    "positive_rate": positive_rate,
+                    "baseline_positive_rate": baseline["positive_rate"],
+                    "positive_rate_lift": positive_rate - baseline["positive_rate"],
+                    "avg_edge_bps": avg_edge,
+                    "baseline_avg_edge_bps": baseline["avg_edge_bps"],
+                    "avg_edge_lift_bps": avg_edge - baseline["avg_edge_bps"],
+                    "avg_mae_bps": avg_mae,
+                    "baseline_avg_mae_bps": baseline["avg_mae_bps"],
+                    "avg_mae_lift_bps": avg_mae - baseline["avg_mae_bps"],
+                    "median_edge_bps": float(part[edge_col].median()),
+                }
+            )
+    return rows
+
+
+def _select_candidates(pair_metrics: pd.DataFrame, min_pair_rows: int) -> pd.DataFrame:
+    """Pivot per-split pair metrics into one ranked row per candidate pair.
+
+    Candidates are the pairs with at least ``min_pair_rows`` tune rows; their
+    validation and stress metrics are left-joined on. A split in which the pair
+    matched no rows has no metric row, so its columns are NaN after the join.
+    """
+    tune = pair_metrics[pair_metrics["split_id"].eq(TUNE_SPLIT) & (pair_metrics["row_count"] >= min_pair_rows)].copy()
+    if tune.empty:
+        return pd.DataFrame()
+    key_columns = ["side", "left_feature", "left_bucket_index", "right_feature", "right_bucket_index"]
+    candidates = tune[key_columns + ["row_count", "positive_rate", "positive_rate_lift", "avg_edge_bps", "avg_edge_lift_bps", "avg_mae_bps", "avg_mae_lift_bps"]].rename(
+        columns={
+            "row_count": "tune_rows",
+            "positive_rate": "tune_positive_rate",
+            "positive_rate_lift": "tune_positive_rate_lift",
+            "avg_edge_bps": "tune_avg_edge_bps",
+            "avg_edge_lift_bps": "tune_avg_edge_lift_bps",
+            "avg_mae_bps": "tune_avg_mae_bps",
+            "avg_mae_lift_bps": "tune_avg_mae_lift_bps",
+        }
+    )
+    for split_id in (VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT):
+        split_rows = pair_metrics[pair_metrics["split_id"].eq(split_id)][
+            key_columns + ["row_count", "positive_rate", "positive_rate_lift", "avg_edge_bps", "avg_edge_lift_bps", "avg_mae_bps", "avg_mae_lift_bps"]
+        ].rename(
+            columns={
+                "row_count": f"{split_id}_rows",
+                "positive_rate": f"{split_id}_positive_rate",
+                "positive_rate_lift": f"{split_id}_positive_rate_lift",
+                "avg_edge_bps": f"{split_id}_avg_edge_bps",
+                "avg_edge_lift_bps": f"{split_id}_avg_edge_lift_bps",
+                "avg_mae_bps": f"{split_id}_avg_mae_bps",
+                "avg_mae_lift_bps": f"{split_id}_avg_mae_lift_bps",
+            }
+        )
+        candidates = candidates.merge(split_rows, on=key_columns, how="left")
+
+    edge_columns = ["tune_avg_edge_bps", f"{VALIDATION_LOCKED_SPLIT}_avg_edge_bps", f"{LATEST_STRESS_SPLIT}_avg_edge_bps"]
+    lift_columns = ["tune_avg_edge_lift_bps", f"{VALIDATION_LOCKED_SPLIT}_avg_edge_lift_bps", f"{LATEST_STRESS_SPLIT}_avg_edge_lift_bps"]
+    row_columns = ["tune_rows", f"{VALIDATION_LOCKED_SPLIT}_rows", f"{LATEST_STRESS_SPLIT}_rows"]
+    positive_columns = ["tune_positive_rate", f"{VALIDATION_LOCKED_SPLIT}_positive_rate", f"{LATEST_STRESS_SPLIT}_positive_rate"]
+    candidates["stable_positive_edge"] = candidates[edge_columns].gt(0.0).all(axis=1)
+    candidates["stable_lift"] = candidates[lift_columns].gt(0.0).all(axis=1)
+    candidates["min_eval_edge_bps"] = candidates[edge_columns].min(axis=1)
+    candidates["mean_eval_edge_bps"] = candidates[edge_columns].mean(axis=1)
+    # A NaN row count means the pair matched nothing in that split; count it as 0
+    # so min_eval_rows / stable_enough_rows do not silently skip the empty split.
+    candidates["min_eval_rows"] = candidates[row_columns].fillna(0).min(axis=1)
+    candidates["min_eval_positive_rate"] = candidates[positive_columns].min(axis=1)
+    candidates["stable_enough_rows"] = candidates["min_eval_rows"].ge(min_pair_rows)
+    candidates["usable_candidate"] = candidates["stable_positive_edge"] & candidates["stable_lift"] & candidates["stable_enough_rows"]
+    candidates["screen_score"] = (
+        candidates["min_eval_edge_bps"].fillna(-999.0)
+        + candidates["mean_eval_edge_bps"].fillna(-999.0) * 0.25
+        + candidates["stable_lift"].astype(float) * 2.0
+        + candidates["stable_enough_rows"].astype(float)
+    )
+    return candidates.sort_values("screen_score", ascending=False).reset_index(drop=True)
+
+
+def _markdown_report(result: dict[str, Any], candidates: pd.DataFrame) -> str:
+    """Render the Markdown summary of the screen result and top 25 candidates."""
+    lines = [
+        "# Entry 组合条件筛查报告",
+        "",
+        "## 结论怎么读",
+        "",
+        "这份报告只回答一个问题:两个特征条件同时出现时,能不能稳定筛掉坏开仓点。",
+        "",
+        "- 只使用真实计划收益,不使用旧的最大可拿收益。",
+        "- `tune_inner` 用来挑条件组合。",
+        "- `validation_locked` 和 `latest_stress` 用来检查组合是否还能站住。",
+        "- `usable_candidate=true` 才表示这个组合既三段正收益、三段比大盘好、三段样本数也够。",
+        "",
+        "## 本次结果",
+        "",
+        f"- run_id: `{result['run_id']}`",
+        f"- 特征数: `{result['feature_count']}`",
+        f"- 种子条件数: `{result['seed_count']}`",
+        f"- 组合明细数: `{result['pair_metric_count']}`",
+        f"- 候选组合数: `{result['candidate_count']}`",
+        f"- 稳定候选数: `{result['stable_candidate_count']}`",
+        f"- 单条件最小行数: `{result['min_seed_rows']}`",
+        f"- 组合最小行数: `{result['min_pair_rows']}`",
+        "",
+    ]
+    if candidates.empty:
+        lines.extend(["## 候选组合", "", "没有找到满足最小样本数的组合条件。", ""])
+        return "\n".join(lines)
+    display_columns = [
+        "side",
+        "left_feature",
+        "left_bucket_index",
+        "right_feature",
+        "right_bucket_index",
+        "tune_avg_edge_bps",
+        f"{VALIDATION_LOCKED_SPLIT}_avg_edge_bps",
+        f"{LATEST_STRESS_SPLIT}_avg_edge_bps",
+        "min_eval_edge_bps",
+        "min_eval_rows",
+        "stable_positive_edge",
+        "stable_lift",
+        "usable_candidate",
+        "screen_score",
+    ]
+    lines.extend(
+        [
+            "## 候选组合",
+            "",
+            _markdown_table(candidates[display_columns].head(25)),
+            "",
+            "## 文件",
+            "",
+            "- `diagnostics/entry_condition_pair_seeds.csv`: 进入组合筛查的单条件。",
+            "- `diagnostics/entry_condition_pair_metrics.csv`: 每个组合在每个数据段的完整明细。",
+            "- `diagnostics/entry_condition_pair_candidates.csv`: 按调参集挑出的组合候选,以及封存验证/压力检查结果。",
+            "",
+        ]
+    )
+    return "\n".join(lines)