Files
quant-trader-service/training/trader_training/nonlinear_pm_probe.py
2026-06-28 09:59:36 +08:00

475 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import itertools
import logging
from typing import Any
import numpy as np
import pandas as pd
from sklearn.ensemble import HistGradientBoostingClassifier, HistGradientBoostingRegressor
from trader_training.io_utils import read_parquet, run_root, write_json, write_parquet, write_text
from trader_training.pm import _pm_config_from_thresholds, _pm_frame, _price_plan_context, _simulate_open_trades, _trade_metrics
from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT
# Splits the probe is evaluated on (models are fit on FIT_SPLIT). The order is
# significant: it fixes the iteration order of the per-split prediction frames
# and therefore of every per-split metric and report table.
EVAL_SPLITS = (
    TUNE_SPLIT,
    VALIDATION_LOCKED_SPLIT,
    LATEST_STRESS_SPLIT,
)
def probe_nonlinear_pm(args: Any) -> None:
    """Run the diagnostic-only nonlinear (gradient-boosted tree) PM probe.

    Steps:
      1. Fit the Entry heads on the FIT split, plus a Direction classifier when
         ``probe_mode == "direction_entry_tree"``.
      2. Build prediction frames for every split in ``EVAL_SPLITS``.
      3. Grid-search PM thresholds on the tune split, ranked by ``_probe_score``.
      4. Replay the best thresholds on every evaluation split.
      5. Write JSON / CSV / Parquet / Markdown diagnostics to
         ``<run root>/diagnostics``.

    Nothing produced here is exported as a model (the result's ``purpose`` is
    ``diagnostic_only_not_exported``).

    Args:
        args: run arguments. ``run_id`` is read directly, the run directory comes
            from ``run_root(args)``, and ``probe_mode``, ``entry_train_filter``
            and ``entry_opportunity_bps`` are optional.

    Raises:
        ValueError: for an unsupported probe mode / Entry filter, or when no
            threshold candidate could be selected.
    """
    root = run_root(args)
    direction_dataset = read_parquet(root / "dataset" / "direction_train.parquet")
    entry_dataset = read_parquet(root / "dataset" / "entry_train.parquet")
    probe_mode = _probe_mode(args)
    entry_train_filter = _entry_train_filter(args)
    entry_opportunity_bps = _entry_opportunity_bps(args)
    # In "entry_tree_only" mode the Direction probabilities already present in
    # the PM frame are kept, so no Direction model is fitted.
    direction_model = (
        _fit_direction_model(direction_dataset) if probe_mode == "direction_entry_tree" else None
    )
    entry_models = _fit_entry_models(
        direction_dataset, entry_dataset, entry_train_filter, entry_opportunity_bps
    )
    frames = {
        split_id: _prediction_frame(
            root, split_id, direction_dataset, entry_dataset, direction_model, entry_models
        )
        for split_id in EVAL_SPLITS
    }
    price_plan = _price_plan_context(root)
    candidates = _expanded_threshold_candidates()
    best_thresholds, best_tune_metrics, tune_rows = _search_best_thresholds(
        frames[TUNE_SPLIT], candidates, price_plan
    )
    split_metrics, split_trade_frames = _replay_thresholds(frames, best_thresholds, price_plan)
    side_metrics = _side_metrics(split_trade_frames)
    tune_frame = (
        pd.DataFrame(tune_rows).sort_values("score", ascending=False).reset_index(drop=True)
    )
    result = {
        "run_id": args.run_id,
        "purpose": "diagnostic_only_not_exported",
        "model_family": "sklearn_hist_gradient_boosting",
        "probe_mode": probe_mode,
        "entry_train_filter": entry_train_filter,
        "entry_opportunity_bps": entry_opportunity_bps,
        "candidate_count": len(candidates),
        "candidate_summary": _candidate_summary(tune_frame),
        "best_thresholds": best_thresholds,
        "best_tune_metrics": best_tune_metrics,
        "split_metrics": split_metrics,
        "side_metrics": side_metrics,
        "verdict": _verdict(split_metrics),
    }
    _write_probe_outputs(
        root / "diagnostics",
        _output_stem(probe_mode),
        result,
        tune_frame,
        split_trade_frames,
        side_metrics,
    )
    logging.info(
        "trader.training.nonlinear_pm_probe_written runId=%s probeMode=%s entryTrainFilter=%s verdict=%s tuneTrades=%s validationTrades=%s stressTrades=%s",
        args.run_id,
        probe_mode,
        entry_train_filter,
        result["verdict"]["status"],
        split_metrics[TUNE_SPLIT]["trade_count"],
        split_metrics[VALIDATION_LOCKED_SPLIT]["trade_count"],
        split_metrics[LATEST_STRESS_SPLIT]["trade_count"],
    )


def _entry_opportunity_bps(args: Any, default: float = 40.0) -> float:
    """Return ``args.entry_opportunity_bps`` as a float, falling back to *default*.

    Only a missing / ``None`` / blank-string value uses the default. An explicit
    ``0`` is honoured: 0 bps is a legitimate side-opportunity threshold, and the
    former ``value or 40.0`` pattern silently replaced it with 40.
    """
    value = getattr(args, "entry_opportunity_bps", None)
    if value is None or (isinstance(value, str) and not value.strip()):
        return float(default)
    return float(value)


def _search_best_thresholds(
    tune_predictions: pd.DataFrame,
    candidates: list[dict[str, float]],
    price_plan: Any,
) -> tuple[dict[str, float], dict[str, Any], list[dict[str, Any]]]:
    """Simulate every candidate on the tune split and keep the best-scoring one.

    Ties keep the earliest candidate (strict ``>`` comparison).

    Returns:
        ``(best_thresholds, best_tune_metrics, tune_rows)``. Each tune row merges
        the candidate thresholds, its trade metrics and its ``score``.

    Raises:
        ValueError: if no candidate ever beat ``-inf`` (e.g. *candidates* is empty).
    """
    tune_rows: list[dict[str, Any]] = []
    best: tuple[dict[str, float], dict[str, Any]] | None = None
    best_score = -float("inf")
    for thresholds in candidates:
        trades = _simulate_open_trades(
            tune_predictions, thresholds, _pm_config_from_thresholds(thresholds), price_plan
        )
        metrics = _trade_metrics(trades)
        score = _probe_score(metrics)
        tune_rows.append({**thresholds, **metrics, "score": score})
        if score > best_score:
            best_score = score
            best = (thresholds, metrics)
    if best is None:
        raise ValueError("nonlinear PM probe did not evaluate any threshold candidate")
    return best[0], best[1], tune_rows


def _replay_thresholds(
    frames: dict[str, pd.DataFrame],
    thresholds: dict[str, float],
    price_plan: Any,
) -> tuple[dict[str, Any], dict[str, pd.DataFrame]]:
    """Simulate *thresholds* on every split; return (metrics per split, trades per split)."""
    split_metrics: dict[str, Any] = {}
    split_trade_frames: dict[str, pd.DataFrame] = {}
    for split_id, frame in frames.items():
        trades = _simulate_open_trades(
            frame, thresholds, _pm_config_from_thresholds(thresholds), price_plan
        ).copy()
        # Tag each trade with its split; the per-split frames are later
        # concatenated into a single best-trades parquet.
        trades["eval_split"] = split_id
        split_trade_frames[split_id] = trades
        split_metrics[split_id] = _trade_metrics(trades)
    return split_metrics, split_trade_frames


def _write_probe_outputs(
    out_dir: Any,
    output_stem: str,
    result: dict[str, Any],
    tune_frame: pd.DataFrame,
    split_trade_frames: dict[str, pd.DataFrame],
    side_metrics: dict[str, dict[str, dict[str, Any]]],
) -> None:
    """Write result JSON, top-200 candidates CSV, best-trades parquet, side CSV and report."""
    trade_parts = [trades for trades in split_trade_frames.values() if not trades.empty]
    best_trade_frame = pd.concat(trade_parts, ignore_index=True) if trade_parts else pd.DataFrame()
    write_json(out_dir / f"{output_stem}_result.json", _jsonable(result))
    write_text(out_dir / f"{output_stem}_candidates.csv", tune_frame.head(200).to_csv(index=False))
    write_parquet(out_dir / f"{output_stem}_best_trades.parquet", best_trade_frame)
    write_text(
        out_dir / f"{output_stem}_side_metrics.csv",
        _side_metrics_frame(side_metrics).to_csv(index=False),
    )
    write_text(out_dir / f"{output_stem}_report.md", _markdown_report(result, tune_frame.head(20)))
def _probe_mode(args: Any) -> str:
    """Return the normalised (stripped, lower-cased) ``probe_mode`` from *args*.

    A missing or falsy value falls back to ``"direction_entry_tree"``.

    Raises:
        ValueError: if the mode is neither ``"direction_entry_tree"`` nor
            ``"entry_tree_only"``.
    """
    default = "direction_entry_tree"
    raw = getattr(args, "probe_mode", default) or default
    mode = str(raw).strip().lower()
    if mode in ("direction_entry_tree", "entry_tree_only"):
        return mode
    raise ValueError(f"unsupported nonlinear PM probe mode: {mode}")
def _entry_train_filter(args: Any) -> str:
    """Return the normalised Entry training-row filter from *args*.

    ``"direction_label"`` (used when the attribute is missing or falsy) and
    ``"side_opportunity"`` are the two accepted values; see
    ``_entry_side_fit_frame`` for how each selects FIT rows.

    Raises:
        ValueError: for any other value.
    """
    fallback = "direction_label"
    candidate = getattr(args, "entry_train_filter", fallback)
    if not candidate:
        candidate = fallback
    value = str(candidate).strip().lower()
    if value not in frozenset(("direction_label", "side_opportunity")):
        raise ValueError(f"unsupported nonlinear Entry train filter: {value}")
    return value
def _output_stem(probe_mode: str) -> str:
    """Return the file-name prefix for this probe mode's diagnostic outputs.

    The default ``direction_entry_tree`` mode keeps the bare prefix; other modes
    append their name so their outputs do not overwrite each other.
    """
    base = "nonlinear_pm_probe"
    if probe_mode == "direction_entry_tree":
        return base
    return f"{base}_{probe_mode}"
def _fit_direction_model(dataset: pd.DataFrame) -> HistGradientBoostingClassifier:
    """Fit the three-class Direction classifier on the FIT-split rows of *dataset*.

    Each row's label is the column index of the largest of ``long_target``,
    ``short_target`` and ``neutral_target`` (0=long, 1=short, 2=neutral; on a
    tie ``np.argmax`` picks the earlier column).
    """
    fit_rows = dataset.loc[dataset["split_id"].eq(FIT_SPLIT)].copy()
    target_matrix = fit_rows[["long_target", "short_target", "neutral_target"]].to_numpy()
    labels = np.argmax(target_matrix, axis=1)
    # early_stopping=True makes sklearn hold out part of the fit rows internally
    # as a validation set; random_state keeps that split reproducible.
    classifier = HistGradientBoostingClassifier(
        random_state=41,
        early_stopping=True,
        l2_regularization=0.02,
        max_leaf_nodes=31,
        learning_rate=0.04,
        max_iter=160,
    )
    classifier.fit(_x(fit_rows), labels)
    return classifier
def _fit_entry_models(direction_dataset: pd.DataFrame, entry_dataset: pd.DataFrame, entry_train_filter: str, opportunity_bps: float) -> dict[str, Any]:
    """Fit the four Entry heads, each on its side-specific FIT-split subset.

    Returns a dict with two probability classifiers (``long_entry_prob``,
    ``short_entry_prob``) and two regressors (``long_expected_net_edge_bps``,
    ``short_expected_net_edge_bps``). The LONG heads are trained on the rows
    ``_entry_side_fit_frame`` selects for LONG, the SHORT heads on the SHORT rows.
    """
    side_frames = {
        side: _entry_side_fit_frame(direction_dataset, entry_dataset, side, entry_train_filter, opportunity_bps)
        for side in ("LONG", "SHORT")
    }
    long_rows = side_frames["LONG"]
    short_rows = side_frames["SHORT"]
    models: dict[str, Any] = {}
    models["long_entry_prob"] = _fit_binary_head(long_rows, "long_entry_target", seed=43)
    models["short_entry_prob"] = _fit_binary_head(short_rows, "short_entry_target", seed=47)
    models["long_expected_net_edge_bps"] = _fit_regression_head(
        long_rows, "long_actual_plan_net_edge_bps", seed=53
    )
    models["short_expected_net_edge_bps"] = _fit_regression_head(
        short_rows, "short_actual_plan_net_edge_bps", seed=59
    )
    return models
def _entry_side_fit_frame(
    direction_dataset: pd.DataFrame,
    entry_dataset: pd.DataFrame,
    side: str,
    entry_train_filter: str,
    opportunity_bps: float,
) -> pd.DataFrame:
    """Select the FIT-split Entry rows used to train one side's heads.

    Filters:
        ``direction_label``: inner-join the side's direction label
            (``<side>_target``) from *direction_dataset* on ``sample_id`` and
            keep rows where it equals 1 (non-numeric labels count as 0).
        ``side_opportunity``: keep rows whose
            ``<side>_max_achievable_net_edge_bps`` is >= *opportunity_bps*
            (non-numeric values are dropped).

    Raises:
        ValueError: on missing columns, an empty join result, or an unknown filter.
    """
    side_key = side.lower()
    fit_rows = entry_dataset[entry_dataset["split_id"].eq(FIT_SPLIT)].copy()

    if entry_train_filter == "direction_label":
        label_column = f"{side_key}_target"
        wanted = {"sample_id", label_column}
        not_found = sorted(wanted.difference(direction_dataset.columns))
        if not_found:
            raise ValueError(f"direction dataset missing columns for nonlinear Entry filter: {not_found}")
        fit_rows = fit_rows.merge(
            direction_dataset[list(wanted)],
            on="sample_id",
            how="inner",
            validate="one_to_one",
        )
        if len(fit_rows) == 0:
            raise ValueError(f"nonlinear Entry {side} direction-label filter produced no rows")
        label = pd.to_numeric(fit_rows[label_column], errors="coerce").fillna(0).astype(int)
        keep = label == 1
        filter_name = f"DIRECTION_LABEL_{side}_FIT_ROWS"
    elif entry_train_filter == "side_opportunity":
        opportunity_column = f"{side_key}_max_achievable_net_edge_bps"
        if opportunity_column not in fit_rows.columns:
            raise ValueError(f"entry dataset missing {opportunity_column} for nonlinear Entry side-opportunity filter")
        achievable = pd.to_numeric(fit_rows[opportunity_column], errors="coerce")
        keep = (achievable >= opportunity_bps).fillna(False)
        filter_name = f"SIDE_OPPORTUNITY_{side}_GE_{opportunity_bps:g}_BPS_FIT_ROWS"
    else:
        raise ValueError(f"unsupported nonlinear Entry train filter: {entry_train_filter}")

    selected = fit_rows.loc[keep].copy()
    # totalFitRows is the FIT-split count after the optional label join.
    logging.info(
        "trader.training.nonlinear_entry_fit_frame side=%s filter=%s rows=%s totalFitRows=%s",
        side,
        filter_name,
        len(selected),
        len(fit_rows),
    )
    return selected
def _fit_binary_head(train: pd.DataFrame, target: str, seed: int) -> HistGradientBoostingClassifier:
    """Fit a gradient-boosted classifier for one binary Entry head.

    Raises:
        ValueError: with fewer than 1000 training rows, or when *target* holds
            a single class after casting to int.
    """
    row_count = len(train)
    if row_count < 1000:
        raise ValueError(f"not enough rows to train nonlinear Entry head {target}: {row_count}")
    labels = train[target].astype(int).to_numpy()
    if np.unique(labels).size < 2:
        raise ValueError(f"nonlinear Entry head {target} has only one class")
    head = HistGradientBoostingClassifier(
        random_state=seed,
        early_stopping=True,
        l2_regularization=0.02,
        max_leaf_nodes=31,
        learning_rate=0.04,
        max_iter=180,
    )
    head.fit(_x(train), labels)
    return head
def _fit_regression_head(train: pd.DataFrame, target: str, seed: int) -> HistGradientBoostingRegressor:
    """Fit a gradient-boosted regressor for one expected-net-edge Entry head.

    Raises:
        ValueError: with fewer than 1000 training rows.
    """
    if len(train) < 1000:
        raise ValueError(f"not enough rows to train nonlinear Entry head {target}: {len(train)}")
    regressor = HistGradientBoostingRegressor(
        random_state=seed,
        early_stopping=True,
        l2_regularization=0.02,
        max_leaf_nodes=31,
        learning_rate=0.04,
        max_iter=180,
    )
    features = _x(train)
    values = train[target].astype(float).to_numpy()
    regressor.fit(features, values)
    return regressor
def _prediction_frame(
    root,
    split_id: str,
    direction_dataset: pd.DataFrame,
    entry_dataset: pd.DataFrame,
    direction_model: HistGradientBoostingClassifier | None,
    entry_models: dict[str, Any],
) -> pd.DataFrame:
    """Return the PM frame for *split_id* with model outputs replaced by tree predictions.

    The four Entry columns (``long_entry_prob``, ``short_entry_prob``,
    ``pred_long_expected_net_edge_bps``, ``pred_short_expected_net_edge_bps``)
    are always replaced. When *direction_model* is given, ``long_prob``,
    ``short_prob`` and ``neutral_prob`` are replaced too; otherwise the values
    already in the PM frame are kept.

    Raises:
        ValueError: if joining the predictions drops any PM row (pandas also
            raises a ``MergeError``, a ``ValueError`` subclass, when
            ``sample_id`` is duplicated on either side).
    """
    frame = _pm_frame(root, split_id).copy()
    entry_split = entry_dataset[entry_dataset["split_id"].eq(split_id)].copy()
    # _x coerces every feature column; build the matrix once for all four heads.
    entry_features = _x(entry_split)
    entry_pred = entry_split[["sample_id"]].copy()
    entry_pred["long_entry_prob"] = entry_models["long_entry_prob"].predict_proba(entry_features)[:, 1]
    entry_pred["short_entry_prob"] = entry_models["short_entry_prob"].predict_proba(entry_features)[:, 1]
    entry_pred["pred_long_expected_net_edge_bps"] = entry_models["long_expected_net_edge_bps"].predict(entry_features)
    entry_pred["pred_short_expected_net_edge_bps"] = entry_models["short_expected_net_edge_bps"].predict(entry_features)
    replacements = entry_pred
    drop_columns = [
        "long_entry_prob",
        "short_entry_prob",
        "pred_long_expected_net_edge_bps",
        "pred_short_expected_net_edge_bps",
    ]
    if direction_model is not None:
        direction_split = direction_dataset[direction_dataset["split_id"].eq(split_id)].copy()
        raw_proba = direction_model.predict_proba(_x(direction_split))
        # predict_proba columns follow ``classes_``, which only contains labels seen
        # during fit. Scatter them into fixed slots (0=long, 1=short, 2=neutral, the
        # argmax labels used for training) so a class absent from the FIT split
        # yields probability 0 instead of an IndexError or shifted columns.
        direction_proba = np.zeros((raw_proba.shape[0], 3), dtype=float)
        for column, label in enumerate(direction_model.classes_):
            direction_proba[:, int(label)] = raw_proba[:, column]
        direction_pred = direction_split[["sample_id"]].copy()
        direction_pred["long_prob"] = direction_proba[:, 0]
        direction_pred["short_prob"] = direction_proba[:, 1]
        direction_pred["neutral_prob"] = direction_proba[:, 2]
        replacements = direction_pred.merge(entry_pred, on="sample_id", how="inner", validate="one_to_one")
        drop_columns.extend(["long_prob", "short_prob", "neutral_prob"])
    out = frame.drop(columns=drop_columns, errors="ignore").merge(
        replacements, on="sample_id", how="inner", validate="one_to_one"
    )
    if len(out) != len(frame):
        raise ValueError(f"nonlinear prediction frame lost rows for {split_id}: before={len(frame)} after={len(out)}")
    return out
def _expanded_threshold_candidates() -> list[dict[str, float]]:
    """Return the full Cartesian grid of PM threshold candidates (17,280 dicts).

    Long and short reliability may differ completely across market segments, so
    the two open-probability thresholds are searched independently. A value of
    1.01 means that side never opens, which checks whether long-only or
    short-only trading is more stable. Candidates are ordered as
    ``itertools.product`` yields them (last key varies fastest).
    """
    grid: dict[str, tuple[float, ...]] = {
        "long_open_prob": (0.20, 0.30, 0.40, 0.50, 0.60, 1.01),
        "short_open_prob": (0.20, 0.30, 0.40, 0.50, 0.60, 1.01),
        "min_entry_prob": (0.05, 0.10, 0.20, 0.30, 0.40, 0.50),
        "max_market_risk_prob": (0.45, 0.65, 0.85, 1.00),
        "min_expected_edge_bps": (-5.0, 0.0, 3.0, 5.0, 8.0),
        "min_direction_margin": (0.00, 0.01, 0.02, 0.05),
    }
    keys = list(grid)
    return [dict(zip(keys, combo)) for combo in itertools.product(*grid.values())]
def _probe_score(metrics: dict[str, Any]) -> float:
    """Score one candidate's tune-split trade metrics (higher is better).

    score = avg_weighted_edge * sqrt(trade_count) + 0.03 * total_weighted_edge
            - 0.20 * max_drawdown - 2 * max(0, 80 - trade_count)

    A candidate with no trades gets a flat -1,000,000.
    """
    count = metrics["trade_count"]
    if count == 0:
        return -1_000_000.0
    # Each trade short of 80 costs 2 points, discouraging tiny samples.
    shortfall = max(0, 80 - int(count))
    reward = (
        float(metrics["avg_weighted_edge_bps"]) * np.sqrt(float(count))
        + float(metrics["total_weighted_edge_bps"]) * 0.03
    )
    drawdown_cost = float(metrics["max_drawdown_bps"]) * 0.20
    return reward - drawdown_cost - shortfall * 2.0
def _side_metrics(split_trade_frames: dict[str, pd.DataFrame]) -> dict[str, dict[str, dict[str, Any]]]:
    """Return ``{split_id: {"LONG"|"SHORT": trade metrics + exit metrics}}``.

    Each side's trades are the rows whose ``side`` column equals that side; an
    empty split frame is passed through as-is for both sides.
    """
    by_split: dict[str, dict[str, dict[str, Any]]] = {}
    for split_id, trades in split_trade_frames.items():
        per_side: dict[str, dict[str, Any]] = {}
        for side in ("LONG", "SHORT"):
            if trades.empty:
                subset = trades.copy()
            else:
                subset = trades[trades["side"].eq(side)].copy()
            per_side[side] = {**_trade_metrics(subset), **_exit_metrics(subset)}
        by_split[split_id] = per_side
    return by_split
def _exit_metrics(trades: pd.DataFrame) -> dict[str, float]:
    """Summarise how *trades* exited.

    Rates are fractions of trades with ``target_hit == 1``, ``stop_hit == 1``, or
    neither (counted as a timeout). Exit times are ``time_to_exit_ms`` converted
    to minutes. Non-numeric flags and times are treated as 0. An empty frame
    yields all zeros.
    """
    if trades.empty:
        return dict.fromkeys(
            (
                "target_hit_rate",
                "stop_hit_rate",
                "timeout_exit_rate",
                "avg_time_to_exit_min",
                "p50_time_to_exit_min",
            ),
            0.0,
        )

    def _flag(column: str) -> pd.Series:
        return pd.to_numeric(trades[column], errors="coerce").fillna(0).astype(int)

    hit_target = _flag("target_hit").eq(1)
    hit_stop = _flag("stop_hit").eq(1)
    exit_ms = pd.to_numeric(trades["time_to_exit_ms"], errors="coerce").fillna(0.0).astype(float)
    minutes = exit_ms / 60_000.0
    return {
        "target_hit_rate": float(hit_target.mean()),
        "stop_hit_rate": float(hit_stop.mean()),
        "timeout_exit_rate": float((~hit_target & ~hit_stop).mean()),
        "avg_time_to_exit_min": float(minutes.mean()),
        "p50_time_to_exit_min": float(minutes.median()),
    }
def _side_metrics_frame(side_metrics: dict[str, dict[str, dict[str, Any]]]) -> pd.DataFrame:
    """Flatten nested per-split, per-side metrics into one row per (split, side)."""
    return pd.DataFrame(
        [
            {"split_id": split_id, "side": side, **values}
            for split_id, per_side in side_metrics.items()
            for side, values in per_side.items()
        ]
    )
def _candidate_summary(tune_frame: pd.DataFrame) -> dict[str, Any]:
    """Summarise how many tune candidates were profitable, overall and among viable ones.

    A candidate is "viable" when it produced at least 80 tune trades. "Positive"
    counts use a strict ``> 0``. "Best" values are column maxima, or 0.0 when
    there is nothing to take a maximum over (empty frame / no viable rows).
    """
    min_trades = 80
    if tune_frame.empty:
        return {
            "positive_avg_weighted_candidates": 0,
            "positive_total_weighted_candidates": 0,
            "best_avg_weighted_edge_bps": 0.0,
            "best_total_weighted_edge_bps": 0.0,
            "min_viable_trade_count": min_trades,
            "positive_avg_weighted_viable_candidates": 0,
            "positive_total_weighted_viable_candidates": 0,
            "best_viable_avg_weighted_edge_bps": 0.0,
            "best_viable_total_weighted_edge_bps": 0.0,
        }

    def count_positive(rows: pd.DataFrame, column: str) -> int:
        return int(rows[column].gt(0).sum())

    def best_of(rows: pd.DataFrame, column: str) -> float:
        return 0.0 if rows.empty else float(rows[column].max())

    viable = tune_frame.loc[tune_frame["trade_count"].ge(min_trades)]
    return {
        "positive_avg_weighted_candidates": count_positive(tune_frame, "avg_weighted_edge_bps"),
        "positive_total_weighted_candidates": count_positive(tune_frame, "total_weighted_edge_bps"),
        "best_avg_weighted_edge_bps": best_of(tune_frame, "avg_weighted_edge_bps"),
        "best_total_weighted_edge_bps": best_of(tune_frame, "total_weighted_edge_bps"),
        "min_viable_trade_count": min_trades,
        "positive_avg_weighted_viable_candidates": count_positive(viable, "avg_weighted_edge_bps"),
        "positive_total_weighted_viable_candidates": count_positive(viable, "total_weighted_edge_bps"),
        "best_viable_avg_weighted_edge_bps": best_of(viable, "avg_weighted_edge_bps"),
        "best_viable_total_weighted_edge_bps": best_of(viable, "total_weighted_edge_bps"),
    }
def _verdict(metrics: dict[str, Any]) -> dict[str, Any]:
    """Classify the replayed best thresholds as promising or not.

    ``PROMISING_DIAGNOSTIC_ONLY`` requires all of:
      * at least 80 tune, 40 validation and 10 stress trades;
      * a positive average weighted edge on tune and validation;
      * a stress-split average weighted edge above -1 bps.
    Anything else is ``NO_STABLE_NONLINEAR_PM_EDGE``. The reason text (Chinese)
    says the result only indicates whether the tree approach is worth
    engineering further and does not mean it can go live.
    """
    tune = metrics[TUNE_SPLIT]
    validation = metrics[VALIDATION_LOCKED_SPLIT]
    stress = metrics[LATEST_STRESS_SPLIT]
    has_sample = (
        tune["trade_count"] >= 80
        and validation["trade_count"] >= 40
        and stress["trade_count"] >= 10
    )
    passed = has_sample and (
        tune["avg_weighted_edge_bps"] > 0
        and validation["avg_weighted_edge_bps"] > 0
        and stress["avg_weighted_edge_bps"] > -1.0
    )
    if passed:
        status = "PROMISING_DIAGNOSTIC_ONLY"
    else:
        status = "NO_STABLE_NONLINEAR_PM_EDGE"
    return {
        "status": status,
        "reason": "只用于判断树模型方向是否值得继续工程化,不代表可上线。",
    }
def _x(frame: pd.DataFrame) -> np.ndarray:
    """Return *frame*'s ``FEATURE_ORDER`` columns as a float32 NumPy matrix.

    Non-numeric entries are coerced to NaN and +/-inf is replaced by NaN;
    HistGradientBoosting estimators handle NaN natively as missing values.
    """
    numeric = frame[FEATURE_ORDER].apply(pd.to_numeric, errors="coerce")
    finite_or_nan = numeric.replace([np.inf, -np.inf], np.nan)
    return finite_or_nan.astype("float32").to_numpy()
def _markdown_report(result: dict[str, Any], top_candidates: pd.DataFrame) -> str:
    """Render the human-readable Markdown report for one probe run.

    Args:
        result: the probe result dict. This reads ``run_id``, ``probe_mode``,
            ``entry_train_filter``, ``entry_opportunity_bps``, ``verdict``,
            ``candidate_count``, ``candidate_summary``, ``best_thresholds``,
            ``split_metrics`` and ``side_metrics``.
        top_candidates: the top-ranked tune candidates to tabulate.

    Returns:
        The newline-joined report text: a header, a split-metrics table, a
        per-side breakdown table, and the top-candidates table.
    """
    if result["probe_mode"] == "entry_tree_only":
        # Only Entry is swapped for trees; Direction keeps the current model output.
        # (The text previously read "EntryDirection", missing the separator.)
        mode_text = "只替换 Entry,Direction 使用当前模型输出。"
    else:
        mode_text = "Direction 和 Entry 都替换成树模型。"
    summary = result["candidate_summary"]
    lines = [
        "# Nonlinear PM Probe Report",
        "",
        "这份报告只做诊断,不导出上线模型。它回答:不加新特征,换成树模型后,PM 能不能筛出稳定正收益。",
        "",
        f"- run_id: `{result['run_id']}`",
        f"- probe_mode: `{result['probe_mode']}`",
        f"- 说明: {mode_text}",
        f"- Entry 训练人群: `{result['entry_train_filter']}`",
        f"- Entry 机会阈值: `{result['entry_opportunity_bps']}` bps",
        f"- verdict: `{result['verdict']['status']}`",
        f"- candidate_count: `{result['candidate_count']}`",
        f"- 正收益候选数: `{summary['positive_avg_weighted_candidates']}`",
        f"- 至少 80 单的正收益候选数: `{summary['positive_avg_weighted_viable_candidates']}`",
        f"- 至少 80 单的最好单笔加权收益: `{summary['best_viable_avg_weighted_edge_bps']:.4f}` bps",
        f"- best_thresholds: `{result['best_thresholds']}`",
        "",
        "## Split Metrics",
        "",
        "| split | trades | win_rate | avg_actual_bps | avg_weighted_bps | total_weighted_bps | profit_factor |",
        "| --- | ---: | ---: | ---: | ---: | ---: | ---: |",
    ]
    for split_id, metrics in result["split_metrics"].items():
        lines.append(
            f"| {split_id} | {metrics['trade_count']} | {metrics['win_rate']:.4f} | "
            f"{metrics['avg_actual_edge_bps']:.4f} | {metrics['avg_weighted_edge_bps']:.4f} | "
            f"{metrics['total_weighted_edge_bps']:.4f} | {metrics['profit_factor']:.4f} |"
        )
    lines.extend(
        [
            "",
            "## Side Breakdown",
            "",
            "| split | side | trades | win_rate | avg_actual_bps | avg_weighted_bps | target_hit_rate | stop_hit_rate | timeout_rate | avg_exit_min |",
            "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |",
        ]
    )
    for split_id, side_metrics in result["side_metrics"].items():
        for side, metrics in side_metrics.items():
            lines.append(
                f"| {split_id} | {side} | {metrics['trade_count']} | {metrics['win_rate']:.4f} | "
                f"{metrics['avg_actual_edge_bps']:.4f} | {metrics['avg_weighted_edge_bps']:.4f} | "
                f"{metrics['target_hit_rate']:.4f} | {metrics['stop_hit_rate']:.4f} | "
                f"{metrics['timeout_exit_rate']:.4f} | {metrics['avg_time_to_exit_min']:.2f} |"
            )
    lines.extend(["", "## Top Tune Candidates", "", _candidate_table(top_candidates), ""])
    return "\n".join(lines)
def _candidate_table(frame: pd.DataFrame) -> str:
    """Render tune candidates as a Markdown table.

    Only the known threshold/metric columns present in *frame* are shown, in a
    fixed order. Floating-point cells use six decimals; all other cells (e.g.
    the integer ``trade_count``) use ``str``. Returns ``"无候选。"`` ("no
    candidates") when *frame* is empty.
    """
    if frame.empty:
        return "无候选。"
    columns = [
        "long_open_prob",
        "short_open_prob",
        "min_entry_prob",
        "max_market_risk_prob",
        "min_expected_edge_bps",
        "min_direction_margin",
        "trade_count",
        "avg_weighted_edge_bps",
        "total_weighted_edge_bps",
        "profit_factor",
        "score",
    ]
    available = [column for column in columns if column in frame.columns]
    lines = [
        "| " + " | ".join(available) + " |",
        "| " + " | ".join("---" for _ in available) + " |",
    ]
    # itertuples keeps each column's own dtype. iterrows builds one Series per
    # row, upcasting mixed int/float rows to float64, which rendered integer
    # counts such as trade_count as "120.000000".
    for row in frame[available].itertuples(index=False, name=None):
        cells = [
            f"{float(value):.6f}" if isinstance(value, (float, np.floating)) else str(value)
            for value in row
        ]
        lines.append("| " + " | ".join(cells) + " |")
    return "\n".join(lines)
def _jsonable(value: Any) -> Any:
    """Recursively convert *value* into plain Python types the ``json`` module accepts.

    Dict keys are stringified; lists and tuples become lists; NumPy scalars
    (bool, integer, floating) become the matching builtin; ndarrays become
    nested lists. Anything else is returned unchanged.
    """
    if isinstance(value, dict):
        return {str(key): _jsonable(item) for key, item in value.items()}
    # Tuples are serialised as JSON arrays anyway, but their items (e.g. NumPy
    # scalars) still need converting.
    if isinstance(value, (list, tuple)):
        return [_jsonable(item) for item in value]
    # np.bool_ is not an np.integer subclass and json.dumps rejects it.
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    if isinstance(value, np.ndarray):
        return value.tolist()
    return value