from __future__ import annotations import itertools import logging from typing import Any import numpy as np import pandas as pd from sklearn.ensemble import HistGradientBoostingClassifier, HistGradientBoostingRegressor from trader_training.io_utils import read_parquet, run_root, write_json, write_text from trader_training.pm import _pm_config_from_thresholds, _pm_frame, _price_plan_context, _simulate_open_trades, _trade_metrics from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) def probe_nonlinear_pm(args: Any) -> None: root = run_root(args) direction_dataset = read_parquet(root / "dataset" / "direction_train.parquet") entry_dataset = read_parquet(root / "dataset" / "entry_train.parquet") direction_model = _fit_direction_model(direction_dataset) entry_models = _fit_entry_models(direction_dataset, entry_dataset) frames = { split_id: _prediction_frame(root, split_id, direction_dataset, entry_dataset, direction_model, entry_models) for split_id in EVAL_SPLITS } price_plan = _price_plan_context(root) candidates = _expanded_threshold_candidates() tune_rows: list[dict[str, Any]] = [] best_thresholds: dict[str, float] | None = None best_tune_metrics: dict[str, Any] | None = None best_score = -float("inf") for thresholds in candidates: trades = _simulate_open_trades(frames[TUNE_SPLIT], thresholds, _pm_config_from_thresholds(thresholds), price_plan) metrics = _trade_metrics(trades) score = _probe_score(metrics) tune_rows.append({**thresholds, **metrics, "score": score}) if score > best_score: best_score = score best_thresholds = thresholds best_tune_metrics = metrics if best_thresholds is None or best_tune_metrics is None: raise ValueError("nonlinear PM probe did not evaluate any threshold candidate") split_metrics: dict[str, Any] = {} for split_id, frame in frames.items(): trades = _simulate_open_trades(frame, best_thresholds, _pm_config_from_thresholds(best_thresholds), price_plan) split_metrics[split_id] = _trade_metrics(trades) tune_frame = pd.DataFrame(tune_rows).sort_values("score", ascending=False).reset_index(drop=True) result = { "run_id": args.run_id, "purpose": "diagnostic_only_not_exported", "model_family": "sklearn_hist_gradient_boosting", "candidate_count": len(candidates), "best_thresholds": best_thresholds, "best_tune_metrics": best_tune_metrics, "split_metrics": split_metrics, "verdict": _verdict(split_metrics), } out_dir = root / "diagnostics" write_json(out_dir / "nonlinear_pm_probe_result.json", _jsonable(result)) write_text(out_dir / "nonlinear_pm_probe_candidates.csv", tune_frame.head(200).to_csv(index=False)) write_text(out_dir / "nonlinear_pm_probe_report.md", _markdown_report(result, tune_frame.head(20))) logging.info( "trader.training.nonlinear_pm_probe_written runId=%s verdict=%s tuneTrades=%s validationTrades=%s stressTrades=%s", args.run_id, result["verdict"]["status"], split_metrics[TUNE_SPLIT]["trade_count"], split_metrics[VALIDATION_LOCKED_SPLIT]["trade_count"], split_metrics[LATEST_STRESS_SPLIT]["trade_count"], ) def _fit_direction_model(dataset: pd.DataFrame) -> HistGradientBoostingClassifier: train = dataset[dataset["split_id"].eq(FIT_SPLIT)].copy() y = train[["long_target", "short_target", "neutral_target"]].to_numpy().argmax(axis=1) model = HistGradientBoostingClassifier( max_iter=160, learning_rate=0.04, max_leaf_nodes=31, l2_regularization=0.02, early_stopping=True, random_state=41, ) model.fit(_x(train), y) return model def _fit_entry_models(direction_dataset: pd.DataFrame, entry_dataset: pd.DataFrame) -> dict[str, Any]: merged = entry_dataset.merge( direction_dataset[["sample_id", "long_target", "short_target"]], on="sample_id", how="inner", validate="one_to_one", ) train = merged[merged["split_id"].eq(FIT_SPLIT)].copy() return { "long_entry_prob": _fit_binary_head(train[train["long_target"].eq(1)], "long_entry_target", seed=43), "short_entry_prob": _fit_binary_head(train[train["short_target"].eq(1)], "short_entry_target", seed=47), "long_expected_net_edge_bps": _fit_regression_head(train[train["long_target"].eq(1)], "long_actual_plan_net_edge_bps", seed=53), "short_expected_net_edge_bps": _fit_regression_head(train[train["short_target"].eq(1)], "short_actual_plan_net_edge_bps", seed=59), } def _fit_binary_head(train: pd.DataFrame, target: str, seed: int) -> HistGradientBoostingClassifier: if len(train) < 1000: raise ValueError(f"not enough rows to train nonlinear Entry head {target}: {len(train)}") y = train[target].astype(int).to_numpy() if len(np.unique(y)) < 2: raise ValueError(f"nonlinear Entry head {target} has only one class") model = HistGradientBoostingClassifier( max_iter=180, learning_rate=0.04, max_leaf_nodes=31, l2_regularization=0.02, early_stopping=True, random_state=seed, ) model.fit(_x(train), y) return model def _fit_regression_head(train: pd.DataFrame, target: str, seed: int) -> HistGradientBoostingRegressor: if len(train) < 1000: raise ValueError(f"not enough rows to train nonlinear Entry head {target}: {len(train)}") model = HistGradientBoostingRegressor( max_iter=180, learning_rate=0.04, max_leaf_nodes=31, l2_regularization=0.02, early_stopping=True, random_state=seed, ) model.fit(_x(train), train[target].astype(float).to_numpy()) return model def _prediction_frame( root, split_id: str, direction_dataset: pd.DataFrame, entry_dataset: pd.DataFrame, direction_model: HistGradientBoostingClassifier, entry_models: dict[str, Any], ) -> pd.DataFrame: frame = _pm_frame(root, split_id).copy() direction_split = direction_dataset[direction_dataset["split_id"].eq(split_id)].copy() entry_split = entry_dataset[entry_dataset["split_id"].eq(split_id)].copy() direction_proba = direction_model.predict_proba(_x(direction_split)) direction_pred = direction_split[["sample_id"]].copy() direction_pred["long_prob"] = direction_proba[:, 0] direction_pred["short_prob"] = direction_proba[:, 1] direction_pred["neutral_prob"] = direction_proba[:, 2] entry_pred = entry_split[["sample_id"]].copy() entry_pred["long_entry_prob"] = entry_models["long_entry_prob"].predict_proba(_x(entry_split))[:, 1] entry_pred["short_entry_prob"] = entry_models["short_entry_prob"].predict_proba(_x(entry_split))[:, 1] entry_pred["pred_long_expected_net_edge_bps"] = entry_models["long_expected_net_edge_bps"].predict(_x(entry_split)) entry_pred["pred_short_expected_net_edge_bps"] = entry_models["short_expected_net_edge_bps"].predict(_x(entry_split)) replacements = direction_pred.merge(entry_pred, on="sample_id", how="inner", validate="one_to_one") out = frame.drop( columns=[ "long_prob", "short_prob", "neutral_prob", "long_entry_prob", "short_entry_prob", "pred_long_expected_net_edge_bps", "pred_short_expected_net_edge_bps", ], errors="ignore", ).merge(replacements, on="sample_id", how="inner", validate="one_to_one") if len(out) != len(frame): raise ValueError(f"nonlinear prediction frame lost rows for {split_id}: before={len(frame)} after={len(out)}") return out def _expanded_threshold_candidates() -> list[dict[str, float]]: values = itertools.product( [0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, 0.60], [0.05, 0.10, 0.20, 0.30, 0.40, 0.50], [0.45, 0.65, 0.85, 1.00], [-5.0, 0.0, 3.0, 5.0, 8.0], [0.00, 0.01, 0.02, 0.05], ) return [ { "long_open_prob": direction_prob, "short_open_prob": direction_prob, "min_entry_prob": entry_prob, "max_market_risk_prob": risk_prob, "min_expected_edge_bps": edge_bps, "min_direction_margin": margin, } for direction_prob, entry_prob, risk_prob, edge_bps, margin in values ] def _probe_score(metrics: dict[str, Any]) -> float: if metrics["trade_count"] == 0: return -1_000_000.0 sample_penalty = max(0, 80 - int(metrics["trade_count"])) * 2.0 return ( float(metrics["avg_weighted_edge_bps"]) * np.sqrt(float(metrics["trade_count"])) + float(metrics["total_weighted_edge_bps"]) * 0.03 - float(metrics["max_drawdown_bps"]) * 0.20 - sample_penalty ) def _verdict(metrics: dict[str, Any]) -> dict[str, Any]: tune = metrics[TUNE_SPLIT] validation = metrics[VALIDATION_LOCKED_SPLIT] stress = metrics[LATEST_STRESS_SPLIT] passed = ( tune["trade_count"] >= 80 and validation["trade_count"] >= 40 and stress["trade_count"] >= 10 and tune["avg_weighted_edge_bps"] > 0 and validation["avg_weighted_edge_bps"] > 0 and stress["avg_weighted_edge_bps"] > -1.0 ) return { "status": "PROMISING_DIAGNOSTIC_ONLY" if passed else "NO_STABLE_NONLINEAR_PM_EDGE", "reason": "只用于判断树模型方向是否值得继续工程化,不代表可上线。", } def _x(frame: pd.DataFrame) -> np.ndarray: return frame[FEATURE_ORDER].apply(pd.to_numeric, errors="coerce").replace([np.inf, -np.inf], np.nan).astype("float32").to_numpy() def _markdown_report(result: dict[str, Any], top_candidates: pd.DataFrame) -> str: lines = [ "# Nonlinear PM Probe Report", "", "这份报告只做诊断,不导出上线模型。它回答:不加新特征,只换成树模型后,PM 能不能筛出稳定正收益。", "", f"- run_id: `{result['run_id']}`", f"- verdict: `{result['verdict']['status']}`", f"- candidate_count: `{result['candidate_count']}`", f"- best_thresholds: `{result['best_thresholds']}`", "", "## Split Metrics", "", "| split | trades | win_rate | avg_actual_bps | avg_weighted_bps | total_weighted_bps | profit_factor |", "| --- | ---: | ---: | ---: | ---: | ---: | ---: |", ] for split_id, metrics in result["split_metrics"].items(): lines.append( f"| {split_id} | {metrics['trade_count']} | {metrics['win_rate']:.4f} | " f"{metrics['avg_actual_edge_bps']:.4f} | {metrics['avg_weighted_edge_bps']:.4f} | " f"{metrics['total_weighted_edge_bps']:.4f} | {metrics['profit_factor']:.4f} |" ) lines.extend(["", "## Top Tune Candidates", "", _candidate_table(top_candidates), ""]) return "\n".join(lines) def _candidate_table(frame: pd.DataFrame) -> str: if frame.empty: return "无候选。" columns = [ "long_open_prob", "short_open_prob", "min_entry_prob", "max_market_risk_prob", "min_expected_edge_bps", "min_direction_margin", "trade_count", "avg_weighted_edge_bps", "total_weighted_edge_bps", "profit_factor", "score", ] available = [column for column in columns if column in frame.columns] lines = [ "| " + " | ".join(available) + " |", "| " + " | ".join(["---" for _ in available]) + " |", ] for _, row in frame[available].iterrows(): values = [] for column in available: value = row[column] if isinstance(value, (float, np.floating)): values.append(f"{float(value):.6f}") else: values.append(str(value)) lines.append("| " + " | ".join(values) + " |") return "\n".join(lines) def _jsonable(value: Any) -> Any: if isinstance(value, dict): return {str(key): _jsonable(item) for key, item in value.items()} if isinstance(value, list): return [_jsonable(item) for item in value] if isinstance(value, (np.integer,)): return int(value) if isinstance(value, (np.floating,)): return float(value) if isinstance(value, np.ndarray): return value.tolist() return value