"""Screen pairs of single-feature bucket conditions for entry quality.

Single-feature buckets ("seeds") are ranked on the tune split, then every
cross-feature pair of seeds is measured on all splits and summarised into a
candidate table plus a markdown report under ``<run_root>/diagnostics``.
"""

from __future__ import annotations

import logging
from itertools import combinations
from typing import Any

import numpy as np
import pandas as pd

from trader_training.entry_feature_screen import _bucket_edges, _markdown_table
from trader_training.io_utils import read_parquet, run_root, write_json, write_text
from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT

EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)
ALL_SPLITS = (FIT_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)

# Columns that identify one (side, left condition, right condition) pair.
_PAIR_KEY_COLUMNS = ["side", "left_feature", "left_bucket_index", "right_feature", "right_bucket_index"]
# Per-split metrics carried from the pair-metric rows into the candidate table.
_PAIR_METRIC_COLUMNS = [
    "row_count",
    "positive_rate",
    "positive_rate_lift",
    "avg_edge_bps",
    "avg_edge_lift_bps",
    "avg_mae_bps",
    "avg_mae_lift_bps",
]


def screen_entry_condition_pairs(args: Any) -> None:
    """Run the pair screen for both sides and write the diagnostics files.

    Reads ``dataset/entry_train.parquet`` under the run root, and writes a JSON
    summary, three CSV files and a markdown report under ``diagnostics``.

    Args:
        args: Object exposing ``run_id``, ``min_seed_rows``, ``min_pair_rows``,
            ``max_seed_conditions_per_side`` and ``max_buckets_per_feature``
            (falsy values fall back to 300 / 150 / 32 / 2), plus whatever
            ``run_root`` needs.

    Raises:
        ValueError: If the dataset lacks a required column.
    """
    root = run_root(args)
    dataset_path = root / "dataset" / "entry_train.parquet"
    diagnostics_dir = root / "diagnostics"
    report_path = diagnostics_dir / "entry_condition_pair_screen_report.md"

    dataset = read_parquet(dataset_path)
    _require_columns(dataset)

    # `or` means both None and 0 select the default.
    min_seed_rows = int(args.min_seed_rows or 300)
    min_pair_rows = int(args.min_pair_rows or 150)
    max_seed_conditions_per_side = int(args.max_seed_conditions_per_side or 32)
    max_buckets_per_feature = int(args.max_buckets_per_feature or 2)

    rows: list[dict[str, Any]] = []
    seed_frames: list[pd.DataFrame] = []
    bucketed_features = _bucketed_features(dataset)

    for side in ("LONG", "SHORT"):
        target_col = "long_entry_target" if side == "LONG" else "short_entry_target"
        edge_col = _actual_edge_column(side)
        mae_col = "long_mae_bps" if side == "LONG" else "short_mae_bps"
        baselines = _split_baselines(dataset, target_col, edge_col, mae_col)
        seeds = _seed_conditions(
            dataset,
            bucketed_features,
            side,
            target_col,
            edge_col,
            mae_col,
            baselines,
            min_seed_rows,
            max_buckets_per_feature,
            max_seed_conditions_per_side,
        )
        seed_frames.append(seeds)
        side_rows = _condition_pair_rows(
            dataset,
            bucketed_features,
            seeds,
            side,
            target_col,
            edge_col,
            mae_col,
            baselines,
            min_pair_rows,
        )
        rows.extend(side_rows)
        logging.info(
            "trader.training.entry_condition_pair_side_screened side=%s seedCount=%s pairMetricRows=%s",
            side,
            len(seeds),
            len(side_rows),
        )

    pair_metrics = pd.DataFrame(rows)
    candidates = _select_candidates(pair_metrics, min_pair_rows) if not pair_metrics.empty else pd.DataFrame()
    seeds_all = pd.concat(seed_frames, ignore_index=True) if seed_frames else pd.DataFrame()

    if candidates.empty:
        stable_candidate_count = 0
    else:
        stable_edge = candidates.get("stable_positive_edge", pd.Series(dtype=bool))
        stable_lift = candidates.get("stable_lift", pd.Series(dtype=bool))
        stable_candidate_count = int((stable_edge & stable_lift).sum())

    result = {
        "run_id": args.run_id,
        "dataset_path": str(dataset_path),
        "feature_count": len(FEATURE_ORDER),
        "seed_count": int(len(seeds_all)),
        "pair_metric_count": int(len(pair_metrics)),
        "candidate_count": int(len(candidates)),
        "stable_candidate_count": stable_candidate_count,
        "min_seed_rows": min_seed_rows,
        "min_pair_rows": min_pair_rows,
        "max_seed_conditions_per_side": max_seed_conditions_per_side,
        "max_buckets_per_feature": max_buckets_per_feature,
        "selection_rule": "single buckets are chosen on tune_inner, then feature-pair intersections are checked on tune_inner/validation_locked/latest_stress",
    }

    write_json(diagnostics_dir / "entry_condition_pair_screen_result.json", result)
    write_text(diagnostics_dir / "entry_condition_pair_seeds.csv", seeds_all.to_csv(index=False))
    write_text(diagnostics_dir / "entry_condition_pair_metrics.csv", pair_metrics.to_csv(index=False))
    write_text(diagnostics_dir / "entry_condition_pair_candidates.csv", candidates.to_csv(index=False))
    write_text(report_path, _markdown_report(result, candidates))
    logging.info(
        "trader.training.entry_condition_pair_screened runId=%s seedCount=%s pairMetricCount=%s candidateCount=%s reportPath=%s",
        args.run_id,
        len(seeds_all),
        len(pair_metrics),
        len(candidates),
        report_path,
    )


def _require_columns(dataset: pd.DataFrame) -> None:
    """Raise ``ValueError`` listing every required column missing from *dataset*."""
    required = {
        "split_id",
        *FEATURE_ORDER,
        "long_entry_target",
        "short_entry_target",
        "long_actual_plan_net_edge_bps",
        "short_actual_plan_net_edge_bps",
        "long_mae_bps",
        "short_mae_bps",
    }
    missing = sorted(required.difference(dataset.columns))
    if missing:
        raise ValueError(f"entry condition pair screen missing required columns: {missing}")


def _actual_edge_column(side: str) -> str:
    """Return the net-edge column name for ``"LONG"`` or ``"SHORT"``.

    Raises:
        ValueError: For any other *side* value.
    """
    if side == "LONG":
        return "long_actual_plan_net_edge_bps"
    if side == "SHORT":
        return "short_actual_plan_net_edge_bps"
    raise ValueError(f"unsupported side: {side}")


def _bucketed_features(dataset: pd.DataFrame) -> dict[str, pd.Series]:
    """Assign each row a bucket index per feature, using edges from the fit split.

    Returns:
        Mapping of feature name to a float Series aligned with *dataset*; NaN
        marks rows that are non-numeric, infinite, or that ``pd.cut`` places
        outside the edges. Features with fewer than three edges are omitted.
    """
    bucketed: dict[str, pd.Series] = {}
    fit_mask = dataset["split_id"].eq(FIT_SPLIT)
    for feature in FEATURE_ORDER:
        # Coerce once per feature; the fit subset is sliced from the same values.
        values = pd.to_numeric(dataset[feature], errors="coerce").replace([np.inf, -np.inf], np.nan)
        train_values = values.loc[fit_mask].dropna()
        # NOTE(review): _bucket_edges is defined elsewhere; presumably it
        # returns ascending bin edges - confirm.
        edges = _bucket_edges(train_values.to_numpy(dtype="float64"))
        if len(edges) < 3:
            # Fewer than three edges means fewer than two buckets.
            continue
        bucket = pd.cut(values, bins=edges, include_lowest=True, labels=False, duplicates="drop")
        bucketed[feature] = bucket.astype("float")
    logging.info("trader.training.entry_condition_pair_bucketed featureCount=%s", len(bucketed))
    return bucketed


def _split_baselines(dataset: pd.DataFrame, target_col: str, edge_col: str, mae_col: str) -> dict[str, dict[str, float]]:
    """Compute unconditional per-split means used as the lift reference.

    Returns:
        Mapping of split id to ``rows``, ``positive_rate``, ``avg_edge_bps``
        and ``avg_mae_bps``. Splits with no rows are left out.
    """
    baselines: dict[str, dict[str, float]] = {}
    for split_id in ALL_SPLITS:
        part = dataset[dataset["split_id"].eq(split_id)]
        if part.empty:
            continue
        baselines[split_id] = {
            "rows": float(len(part)),
            "positive_rate": float(part[target_col].mean()),
            "avg_edge_bps": float(part[edge_col].mean()),
            "avg_mae_bps": float(part[mae_col].mean()),
        }
    return baselines


def _seed_conditions(
    dataset: pd.DataFrame,
    bucketed_features: dict[str, pd.Series],
    side: str,
    target_col: str,
    edge_col: str,
    mae_col: str,
    baselines: dict[str, dict[str, float]],
    min_seed_rows: int,
    max_buckets_per_feature: int,
    max_seed_conditions_per_side: int,
) -> pd.DataFrame:
    """Pick the best single-feature buckets on the tune split.

    Buckets with fewer than *min_seed_rows* tune rows are dropped. At most
    *max_buckets_per_feature* buckets are kept per feature, ranked by edge
    lift; the overall top *max_seed_conditions_per_side* are returned.

    Returns:
        One row per kept (feature, bucket) with tune metrics and lifts, or an
        empty DataFrame when nothing qualifies or the tune split has no rows.
    """
    baseline = baselines.get(TUNE_SPLIT)
    if baseline is None:
        # _split_baselines leaves out empty splits, so a missing entry means
        # there are no tune rows to rank on.
        logging.warning("trader.training.entry_condition_pair_no_tune_rows side=%s", side)
        return pd.DataFrame()

    tune_mask = dataset["split_id"].eq(TUNE_SPLIT)
    tune_frame = dataset.loc[tune_mask, [target_col, edge_col, mae_col]]
    rows: list[dict[str, Any]] = []
    for feature, bucket in bucketed_features.items():
        working = tune_frame.copy()
        working["bucket_index"] = bucket.loc[tune_mask].to_numpy()
        working = working.dropna(subset=["bucket_index"])
        if working.empty:
            continue
        working["bucket_index"] = working["bucket_index"].astype(int)
        for bucket_index, part in working.groupby("bucket_index", sort=True, observed=False):
            if len(part) < min_seed_rows:
                continue
            avg_edge = float(part[edge_col].mean())
            positive_rate = float(part[target_col].mean())
            avg_mae = float(part[mae_col].mean())
            rows.append(
                {
                    "side": side,
                    "feature": feature,
                    "bucket_index": int(bucket_index),
                    "tune_rows": int(len(part)),
                    "tune_positive_rate": positive_rate,
                    "tune_positive_rate_lift": positive_rate - baseline["positive_rate"],
                    "tune_avg_edge_bps": avg_edge,
                    "tune_avg_edge_lift_bps": avg_edge - baseline["avg_edge_bps"],
                    "tune_avg_mae_bps": avg_mae,
                    "tune_avg_mae_lift_bps": avg_mae - baseline["avg_mae_bps"],
                }
            )
    if not rows:
        return pd.DataFrame()

    # Best buckets first within each feature, so head() keeps the top ones.
    seeds = pd.DataFrame(rows).sort_values(
        ["feature", "tune_avg_edge_lift_bps", "tune_avg_edge_bps"],
        ascending=[True, False, False],
    )
    seeds = seeds.groupby("feature", as_index=False, observed=False).head(max_buckets_per_feature)
    seeds = seeds.sort_values(
        ["tune_avg_edge_lift_bps", "tune_avg_edge_bps", "tune_rows"],
        ascending=[False, False, False],
    )
    return seeds.head(max_seed_conditions_per_side).reset_index(drop=True)


def _condition_pair_rows(
    dataset: pd.DataFrame,
    bucketed_features: dict[str, pd.Series],
    seeds: pd.DataFrame,
    side: str,
    target_col: str,
    edge_col: str,
    mae_col: str,
    baselines: dict[str, dict[str, float]],
    min_pair_rows: int,
) -> list[dict[str, Any]]:
    """Measure every cross-feature pair of seed conditions on each split.

    Pairs on the same feature, and pairs with fewer than *min_pair_rows* rows
    in the tune split, are skipped.

    Returns:
        One dict per (pair, split) where the intersection is non-empty and the
        split has a baseline, holding metrics and lifts over that baseline.
    """
    if seeds.empty:
        return []

    # Loop-invariant work, computed once rather than once per pair.
    metrics_frame = dataset[[target_col, edge_col, mae_col]]
    tune_mask = dataset["split_id"].eq(TUNE_SPLIT)
    split_masks = {
        split_id: dataset["split_id"].eq(split_id)
        for split_id in ALL_SPLITS
        if split_id in baselines
    }
    seed_conditions = [
        (str(record["feature"]), int(record["bucket_index"]))
        for record in seeds.to_dict("records")
    ]
    seed_masks = [bucketed_features[feature].eq(bucket) for feature, bucket in seed_conditions]

    rows: list[dict[str, Any]] = []
    for left_pos, right_pos in combinations(range(len(seed_conditions)), 2):
        left_feature, left_bucket = seed_conditions[left_pos]
        right_feature, right_bucket = seed_conditions[right_pos]
        if left_feature == right_feature:
            # Two buckets of the same feature cannot hold for the same row.
            continue
        pair_mask = seed_masks[left_pos] & seed_masks[right_pos]
        tune_rows = int((pair_mask & tune_mask).sum())
        if tune_rows < min_pair_rows:
            continue
        for split_id, split_mask in split_masks.items():
            part = metrics_frame.loc[pair_mask & split_mask]
            if part.empty:
                continue
            baseline = baselines[split_id]
            avg_edge = float(part[edge_col].mean())
            positive_rate = float(part[target_col].mean())
            avg_mae = float(part[mae_col].mean())
            rows.append(
                {
                    "side": side,
                    "left_feature": left_feature,
                    "left_bucket_index": left_bucket,
                    "right_feature": right_feature,
                    "right_bucket_index": right_bucket,
                    "split_id": split_id,
                    "row_count": int(len(part)),
                    "positive_rate": positive_rate,
                    "baseline_positive_rate": baseline["positive_rate"],
                    "positive_rate_lift": positive_rate - baseline["positive_rate"],
                    "avg_edge_bps": avg_edge,
                    "baseline_avg_edge_bps": baseline["avg_edge_bps"],
                    "avg_edge_lift_bps": avg_edge - baseline["avg_edge_bps"],
                    "avg_mae_bps": avg_mae,
                    "baseline_avg_mae_bps": baseline["avg_mae_bps"],
                    "avg_mae_lift_bps": avg_mae - baseline["avg_mae_bps"],
                    "median_edge_bps": float(part[edge_col].median()),
                }
            )
    return rows


def _metric_rename_map(prefix: str) -> dict[str, str]:
    """Map pair-metric column names to their ``<prefix>_`` candidate names."""
    renamed = {name: f"{prefix}_{name}" for name in _PAIR_METRIC_COLUMNS}
    renamed["row_count"] = f"{prefix}_rows"
    return renamed


def _select_candidates(pair_metrics: pd.DataFrame, min_pair_rows: int) -> pd.DataFrame:
    """Pivot pair metrics to one row per pair and score cross-split stability.

    Pairs are selected on the tune split (at least *min_pair_rows* rows), then
    left-joined with their validation-locked and latest-stress metrics. A pair
    with no rows in a held-out split has NaN metrics for it; that split counts
    as 0 rows and makes the min/mean aggregates NaN, so the ``-999`` penalty in
    ``screen_score`` applies instead of the split being ignored.

    Returns:
        Candidates sorted by descending ``screen_score``, or an empty DataFrame
        when no pair qualifies on the tune split.
    """
    is_tune = pair_metrics["split_id"].eq(TUNE_SPLIT)
    tune = pair_metrics[is_tune & (pair_metrics["row_count"] >= min_pair_rows)]
    if tune.empty:
        return pd.DataFrame()

    selected = _PAIR_KEY_COLUMNS + _PAIR_METRIC_COLUMNS
    candidates = tune[selected].rename(columns=_metric_rename_map("tune"))
    for split_id in (VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT):
        split_rows = pair_metrics.loc[pair_metrics["split_id"].eq(split_id), selected].rename(
            columns=_metric_rename_map(split_id)
        )
        candidates = candidates.merge(split_rows, on=_PAIR_KEY_COLUMNS, how="left")

    prefixes = ("tune", VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)
    edge_columns = [f"{prefix}_avg_edge_bps" for prefix in prefixes]
    lift_columns = [f"{prefix}_avg_edge_lift_bps" for prefix in prefixes]
    row_columns = [f"{prefix}_rows" for prefix in prefixes]
    positive_columns = [f"{prefix}_positive_rate" for prefix in prefixes]

    # NaN > 0 is False, so a missing split already fails both stability flags.
    candidates["stable_positive_edge"] = candidates[edge_columns].gt(0.0).all(axis=1)
    candidates["stable_lift"] = candidates[lift_columns].gt(0.0).all(axis=1)
    # skipna=False: a missing split makes the aggregate NaN instead of being
    # dropped from it.
    candidates["min_eval_edge_bps"] = candidates[edge_columns].min(axis=1, skipna=False)
    candidates["mean_eval_edge_bps"] = candidates[edge_columns].mean(axis=1, skipna=False)
    # A split with no matching rows has zero rows, not an unknown count.
    candidates["min_eval_rows"] = candidates[row_columns].fillna(0).min(axis=1)
    candidates["min_eval_positive_rate"] = candidates[positive_columns].min(axis=1, skipna=False)
    candidates["stable_enough_rows"] = candidates["min_eval_rows"].ge(min_pair_rows)
    candidates["usable_candidate"] = (
        candidates["stable_positive_edge"] & candidates["stable_lift"] & candidates["stable_enough_rows"]
    )
    candidates["screen_score"] = (
        candidates["min_eval_edge_bps"].fillna(-999.0)
        + candidates["mean_eval_edge_bps"].fillna(-999.0) * 0.25
        + candidates["stable_lift"].astype(float) * 2.0
        + candidates["stable_enough_rows"].astype(float)
    )
    return candidates.sort_values("screen_score", ascending=False).reset_index(drop=True)


def _markdown_report(result: dict[str, Any], candidates: pd.DataFrame) -> str:
    """Render the markdown report for one screening run.

    Args:
        result: Summary dict built by ``screen_entry_condition_pairs``.
        candidates: Output of ``_select_candidates``; may be empty.

    Returns:
        The report text. At most the first 25 candidates are tabulated.
    """
    lines = [
        "# Entry 组合条件筛查报告",
        "",
        "## 结论怎么读",
        "",
        "这份报告只回答一个问题:两个特征条件同时出现时,能不能稳定筛掉坏开仓点。",
        "",
        "- 只使用真实计划收益,不使用旧的最大可拿收益。",
        "- `tune_inner` 用来挑条件组合。",
        "- `validation_locked` 和 `latest_stress` 用来检查组合是否还能站住。",
        "- `usable_candidate=true` 才表示这个组合既三段正收益、三段比大盘好、三段样本数也够。",
        "",
        "## 本次结果",
        "",
        f"- run_id: `{result['run_id']}`",
        f"- 特征数: `{result['feature_count']}`",
        f"- 种子条件数: `{result['seed_count']}`",
        f"- 组合明细数: `{result['pair_metric_count']}`",
        f"- 候选组合数: `{result['candidate_count']}`",
        f"- 稳定候选数: `{result['stable_candidate_count']}`",
        f"- 单条件最小行数: `{result['min_seed_rows']}`",
        f"- 组合最小行数: `{result['min_pair_rows']}`",
        "",
    ]
    if candidates.empty:
        lines.extend(["## 候选组合", "", "没有找到满足最小样本数的组合条件。", ""])
        return "\n".join(lines)

    display_columns = [
        "side",
        "left_feature",
        "left_bucket_index",
        "right_feature",
        "right_bucket_index",
        "tune_avg_edge_bps",
        f"{VALIDATION_LOCKED_SPLIT}_avg_edge_bps",
        f"{LATEST_STRESS_SPLIT}_avg_edge_bps",
        "min_eval_edge_bps",
        "min_eval_rows",
        "stable_positive_edge",
        "stable_lift",
        "usable_candidate",
        "screen_score",
    ]
    lines.extend(
        [
            "## 候选组合",
            "",
            _markdown_table(candidates[display_columns].head(25)),
            "",
            "## 文件",
            "",
            "- `diagnostics/entry_condition_pair_seeds.csv`: 进入组合筛查的单条件。",
            "- `diagnostics/entry_condition_pair_metrics.csv`: 每个组合在每个数据段的完整明细。",
            "- `diagnostics/entry_condition_pair_candidates.csv`: 按调参集挑出的组合候选,以及封存验证/压力检查结果。",
            "",
        ]
    )
    return "\n".join(lines)