Files

364 lines
16 KiB
Python
Raw Permalink Normal View History

2026-06-28 09:27:59 +08:00
from __future__ import annotations
import logging
from typing import Any
import numpy as np
import pandas as pd
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.metrics import roc_auc_score
from trader_training.io_utils import read_parquet, run_root, write_json, write_text
from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT
# Every split, fit split first; iterated in this order by the split summary.
ALL_SPLITS = (FIT_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)
# Splits scored by the single-feature and tree-model diagnostics (fit split excluded).
EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)
def diagnose_good_trade_structure(args: Any) -> None:
    """Run the good-trade structure diagnostic and persist its artifacts.

    Loads ``dataset/entry_train.parquet`` from the run root, builds one
    labelled frame per side (LONG / SHORT), derives the split summary, the
    single-feature candidates and the tree-model top buckets, then writes a
    JSON result, three CSV files and a markdown report under ``diagnostics/``
    and logs a one-line summary.

    Args:
        args: Object exposing ``run_id``, ``min_good_edge_bps``,
            ``bad_edge_bps`` and ``top_fractions``; it is also passed to
            ``run_root`` to locate the run directory.

    Raises:
        ValueError: Propagated from ``_require_columns`` when the dataset is
            missing a required column.
    """
    base_dir = run_root(args)
    entries = read_parquet(base_dir / "dataset" / "entry_train.parquet")
    good_threshold = float(args.min_good_edge_bps)
    bad_threshold = float(args.bad_edge_bps)
    fractions = tuple(float(raw) for raw in args.top_fractions)
    _require_columns(entries)

    # One labelled frame per trade direction, LONG first.
    per_side: dict[str, pd.DataFrame] = {}
    for side in ("LONG", "SHORT"):
        per_side[side] = _side_frame(entries, side, good_threshold, bad_threshold)

    split_summary = pd.concat(
        [_split_summary(side_rows, side) for side, side_rows in per_side.items()],
        ignore_index=True,
    )
    feature_rows = pd.concat(
        [_feature_candidates(side_rows, side, fractions) for side, side_rows in per_side.items()],
        ignore_index=True,
    )
    model_rows = pd.concat(
        [_tree_model_top_rows(side_rows, side, fractions) for side, side_rows in per_side.items()],
        ignore_index=True,
    )

    # An empty candidate table has no flag columns to sum.
    if feature_rows.empty:
        stable_auc_total = 0
        stable_top_total = 0
    else:
        stable_auc_total = int(feature_rows["stable_auc"].sum())
        stable_top_total = int(feature_rows["stable_positive_top_edge"].sum())
    verdict = _tree_verdict(model_rows)

    result = {
        "run_id": args.run_id,
        "min_good_edge_bps": good_threshold,
        "bad_edge_bps": bad_threshold,
        "feature_count": len(FEATURE_ORDER),
        "feature_candidate_count": int(len(feature_rows)),
        "stable_feature_count": stable_auc_total,
        "stable_positive_top_feature_count": stable_top_total,
        "tree_model_verdict": verdict,
    }

    out_dir = base_dir / "diagnostics"
    write_json(out_dir / "good_trade_structure_result.json", _jsonable(result))
    csv_outputs = (
        ("good_trade_split_summary.csv", split_summary),
        ("good_trade_feature_candidates.csv", feature_rows),
        ("good_trade_tree_model_top.csv", model_rows),
    )
    for filename, table in csv_outputs:
        write_text(out_dir / filename, table.to_csv(index=False))
    report = _markdown_report(result, split_summary, feature_rows, model_rows)
    write_text(out_dir / "good_trade_structure_report.md", report)

    logging.info(
        "trader.training.good_trade_structure_written runId=%s stableFeatureCount=%s stablePositiveTopFeatureCount=%s treeVerdict=%s",
        args.run_id,
        result["stable_feature_count"],
        result["stable_positive_top_feature_count"],
        result["tree_model_verdict"]["status"],
    )
def _require_columns(dataset: pd.DataFrame) -> None:
    """Fail fast when the dataset lacks a column the diagnostic reads.

    The required set is ``sample_id``, ``split_id``, every name in
    ``FEATURE_ORDER`` and the two per-side net-edge columns. ``sample_id`` is
    included because ``_side_frame`` selects it; without this check a missing
    ``sample_id`` would surface later as a bare ``KeyError``.

    Args:
        dataset: Frame loaded from the training parquet.

    Raises:
        ValueError: If any required column is absent; the message lists the
            missing names in sorted order.
    """
    required = {
        "sample_id",
        "split_id",
        *FEATURE_ORDER,
        "long_actual_plan_net_edge_bps",
        "short_actual_plan_net_edge_bps",
    }
    missing = sorted(required - set(dataset.columns))
    if missing:
        raise ValueError(f"good trade structure diagnostic missing required columns: {missing}")
def _side_frame(dataset: pd.DataFrame, side: str, min_good_edge_bps: float, bad_edge_bps: float) -> pd.DataFrame:
    """Build the labelled per-side frame used by every diagnostic.

    Selects ``sample_id``, ``split_id``, the side's net-edge column (renamed
    to ``actual_edge_bps``) and all ``FEATURE_ORDER`` columns, then appends
    ``side`` plus three int8 flags: ``good_trade`` (edge >= min_good_edge_bps),
    ``breakeven_trade`` (edge >= 0) and ``bad_trade`` (edge <= bad_edge_bps).
    Rows whose edge is not numeric are dropped and the index is reset.

    Args:
        dataset: Source frame containing both per-side edge columns.
        side: ``"LONG"`` selects the long edge column; any other value
            selects the short edge column.
        min_good_edge_bps: Inclusive lower bound for a good trade.
        bad_edge_bps: Inclusive upper bound for a bad trade.

    Returns:
        A new frame; ``dataset`` is not modified.
    """
    if side == "LONG":
        source_col = "long_actual_plan_net_edge_bps"
    else:
        source_col = "short_actual_plan_net_edge_bps"
    selected = ["sample_id", "split_id", source_col, *FEATURE_ORDER]
    labelled = dataset[selected].copy().rename(columns={source_col: "actual_edge_bps"})

    edge = pd.to_numeric(labelled["actual_edge_bps"], errors="coerce")
    labelled["side"] = side
    labelled["actual_edge_bps"] = edge
    # Comparisons against NaN are False, so unparseable edges get 0 flags
    # here and are removed by the filter below.
    labelled["good_trade"] = (edge >= min_good_edge_bps).astype("int8")
    labelled["breakeven_trade"] = (edge >= 0.0).astype("int8")
    labelled["bad_trade"] = (edge <= bad_edge_bps).astype("int8")

    has_edge = labelled["actual_edge_bps"].notna()
    return labelled[has_edge].reset_index(drop=True)
def _split_summary(frame: pd.DataFrame, side: str) -> pd.DataFrame:
    """Summarise label rates and edge distribution for each split.

    Iterates ``ALL_SPLITS`` in order and skips splits with no rows. Each
    output row holds the side, split id, row count, the mean of the three
    trade flags, and the mean / p50 / p90 / p99 of ``actual_edge_bps``.

    Args:
        frame: Per-side frame produced by ``_side_frame``.
        side: Side label copied into every output row.

    Returns:
        One row per non-empty split; an empty frame if every split is empty.
    """
    quantile_columns = (
        ("p50_edge_bps", 0.50),
        ("p90_edge_bps", 0.90),
        ("p99_edge_bps", 0.99),
    )
    split_ids = frame["split_id"]
    records: list[dict[str, Any]] = []
    for split_id in ALL_SPLITS:
        subset = frame[split_ids.eq(split_id)]
        if subset.empty:
            continue
        edge = subset["actual_edge_bps"].astype(float)
        record: dict[str, Any] = {
            "side": side,
            "split_id": split_id,
            "rows": int(len(subset)),
        }
        for label in ("good", "breakeven", "bad"):
            record[f"{label}_rate"] = float(subset[f"{label}_trade"].mean())
        record["avg_edge_bps"] = float(edge.mean())
        for column, q in quantile_columns:
            record[column] = float(edge.quantile(q))
        records.append(record)
    return pd.DataFrame(records)
def _feature_candidates(frame: pd.DataFrame, side: str, top_fractions: tuple[float, ...]) -> pd.DataFrame:
    """Score every feature on how well it separates good trades on its own.

    For each feature in ``FEATURE_ORDER`` the raw AUC on the tune split
    decides the direction (``HIGH`` when AUC >= 0.5, else ``LOW``). Features
    for which ``_raw_auc`` returns ``None`` on the tune split are skipped.
    For each split in ``EVAL_SPLITS`` the directional AUC and the metrics of
    the top bucket (first entry of ``top_fractions``) are recorded, followed
    by the minimum across splits, two stability flags and a ranking score.

    Args:
        frame: Per-side frame produced by ``_side_frame``.
        side: Side label copied into every output row.
        top_fractions: Configured bucket fractions; only the first is used.

    Returns:
        Candidate rows sorted by ``score`` descending, or an empty frame when
        no feature qualifies.

    Raises:
        IndexError: If a feature qualifies but ``top_fractions`` is empty.
    """
    split_ids = frame["split_id"]
    tune = frame[split_ids.eq(TUNE_SPLIT)]
    # The per-split subsets do not depend on the feature, so filter the frame
    # once here instead of once per (feature, split) pair.
    eval_parts = [(split_id, frame[split_ids.eq(split_id)]) for split_id in EVAL_SPLITS]

    rows: list[dict[str, Any]] = []
    for feature in FEATURE_ORDER:
        tune_auc = _raw_auc(tune, feature)
        if tune_auc is None:
            continue
        direction = "HIGH" if tune_auc >= 0.5 else "LOW"
        fraction = top_fractions[0]
        top_label = _fraction_label(fraction)
        row: dict[str, Any] = {
            "side": side,
            "feature": feature,
            "better_when": direction,
            "tune_raw_auc": float(tune_auc),
        }
        directional_aucs: list[float] = []
        top_edges: list[float] = []
        top_good_rates: list[float] = []
        for split_id, part in eval_parts:
            directional_auc = _directional_auc(part, feature, direction)
            top_metrics = _feature_top_metrics(part, feature, direction, fraction)
            prefix = f"{split_id}_top{top_label}"
            row[f"{split_id}_directional_auc"] = directional_auc
            row[f"{prefix}_rows"] = top_metrics["rows"]
            row[f"{prefix}_good_rate"] = top_metrics["good_rate"]
            row[f"{prefix}_avg_edge_bps"] = top_metrics["avg_edge_bps"]
            if directional_auc is not None:
                directional_aucs.append(float(directional_auc))
            if top_metrics["rows"] > 0:
                top_edges.append(float(top_metrics["avg_edge_bps"]))
                top_good_rates.append(float(top_metrics["good_rate"]))

        min_auc = min(directional_aucs) if directional_aucs else np.nan
        min_edge = min(top_edges) if top_edges else np.nan
        min_good_rate = min(top_good_rates) if top_good_rates else np.nan
        # "Stable" means the metric exists in every evaluation split and its
        # worst value still clears the threshold.
        stable_auc = bool(len(directional_aucs) == len(eval_parts) and min_auc >= 0.53)
        stable_top_edge = bool(len(top_edges) == len(eval_parts) and min_edge > 0.0)

        row["min_eval_directional_auc"] = min_auc
        row["min_top_avg_edge_bps"] = min_edge
        row["min_top_good_rate"] = min_good_rate
        row["stable_auc"] = stable_auc
        row["stable_positive_top_edge"] = stable_top_edge
        if np.isfinite(min_auc) and np.isfinite(min_edge):
            row["score"] = (
                float(min_auc) * 10.0
                + float(min_edge) * 0.10
                + (2.0 if stable_auc else 0.0)
                + (3.0 if stable_top_edge else 0.0)
            )
        else:
            # Sentinel that pushes unscorable features to the bottom of the ranking.
            row["score"] = -999.0
        rows.append(row)

    if not rows:
        return pd.DataFrame()
    return pd.DataFrame(rows).sort_values("score", ascending=False).reset_index(drop=True)
def _raw_auc(frame: pd.DataFrame, feature: str) -> float | None:
values = pd.to_numeric(frame[feature], errors="coerce").replace([np.inf, -np.inf], np.nan)
working = pd.DataFrame({"x": values, "y": frame["good_trade"].astype(int)}).dropna()
if len(working) < 1000 or working["x"].nunique() < 2 or working["y"].nunique() < 2:
return None
return float(roc_auc_score(working["y"].to_numpy(), working["x"].to_numpy()))
def _directional_auc(frame: pd.DataFrame, feature: str, direction: str) -> float | None:
    """Return the feature's AUC oriented so that higher is better.

    Args:
        frame: Frame containing ``feature`` and ``good_trade`` columns.
        feature: Name of the column to score.
        direction: ``"HIGH"`` keeps the raw AUC; any other value returns
            ``1 - AUC``.

    Returns:
        The oriented AUC, or ``None`` when ``_raw_auc`` returns ``None``.
    """
    raw = _raw_auc(frame, feature)
    if raw is None:
        return None
    if direction == "HIGH":
        return float(raw)
    return float(1.0 - raw)
def _feature_top_metrics(frame: pd.DataFrame, feature: str, direction: str, fraction: float) -> dict[str, Any]:
values = pd.to_numeric(frame[feature], errors="coerce").replace([np.inf, -np.inf], np.nan)
working = pd.DataFrame(
{
"x": values,
"good_trade": frame["good_trade"].astype(int),
"actual_edge_bps": frame["actual_edge_bps"].astype(float),
}
).dropna()
if working.empty:
return {"rows": 0, "good_rate": 0.0, "avg_edge_bps": 0.0}
ascending = direction == "LOW"
top = working.sort_values("x", ascending=ascending).head(max(1, int(len(working) * fraction)))
return {
"rows": int(len(top)),
"good_rate": float(top["good_trade"].mean()),
"avg_edge_bps": float(top["actual_edge_bps"].mean()),
}
def _tree_model_top_rows(frame: pd.DataFrame, side: str, top_fractions: tuple[float, ...]) -> pd.DataFrame:
    """Fit a gradient-boosted classifier and report its top buckets.

    A ``HistGradientBoostingClassifier`` is trained on the fit split to
    predict ``good_trade``. For each non-empty split in ``EVAL_SPLITS`` the
    predicted positive-class probability is scored (AUC) and, for every
    fraction in ``top_fractions``, the highest-scored bucket is summarised.

    Args:
        frame: Per-side frame produced by ``_side_frame``.
        side: Side label; also selects the random seed (71 for ``"LONG"``,
            73 otherwise).
        top_fractions: Bucket fractions to report.

    Returns:
        One row per (split, fraction); an empty frame when the fit split is
        empty or contains a single class.
    """
    split_ids = frame["split_id"]
    fit_rows = frame[split_ids.eq(FIT_SPLIT)].copy()
    if fit_rows.empty or fit_rows["good_trade"].nunique() < 2:
        return pd.DataFrame()

    seed = 71 if side == "LONG" else 73
    classifier = HistGradientBoostingClassifier(
        max_iter=180,
        learning_rate=0.04,
        max_leaf_nodes=31,
        l2_regularization=0.02,
        early_stopping=True,
        random_state=seed,
    )
    classifier.fit(_x(fit_rows), fit_rows["good_trade"].astype(int).to_numpy())

    records: list[dict[str, Any]] = []
    for split_id in EVAL_SPLITS:
        holdout = frame[split_ids.eq(split_id)].copy()
        if holdout.empty:
            continue
        # Column 1 of predict_proba is the probability of the positive class.
        positive_proba = classifier.predict_proba(_x(holdout))[:, 1]
        split_auc = _model_auc(holdout["good_trade"].astype(int).to_numpy(), positive_proba)
        records.extend(
            {
                "side": side,
                "split_id": split_id,
                "model": "HistGradientBoostingClassifier",
                "auc": split_auc,
                "top_fraction": fraction,
                **_top_fraction_metrics(holdout, positive_proba, fraction),
            }
            for fraction in top_fractions
        )
    return pd.DataFrame(records)
def _model_auc(y_true: np.ndarray, proba: np.ndarray) -> float | None:
if len(np.unique(y_true)) < 2:
return None
return float(roc_auc_score(y_true, proba))
def _top_fraction_metrics(frame: pd.DataFrame, score: np.ndarray, fraction: float) -> dict[str, Any]:
working = frame[["good_trade", "actual_edge_bps"]].copy()
working["score"] = score
top = working.sort_values("score", ascending=False).head(max(1, int(len(working) * fraction)))
return {
"rows": int(len(top)),
"good_rate": float(top["good_trade"].mean()),
"avg_edge_bps": float(top["actual_edge_bps"].mean()),
"p50_edge_bps": float(top["actual_edge_bps"].quantile(0.50)),
"p90_edge_bps": float(top["actual_edge_bps"].quantile(0.90)),
}
def _tree_verdict(model_rows: pd.DataFrame) -> dict[str, Any]:
if model_rows.empty:
return {"status": "NO_MODEL_ROWS", "reason": "没有足够样本训练树模型诊断。"}
top10 = model_rows[model_rows["top_fraction"].eq(0.10)].copy()
if top10.empty:
return {"status": "NO_TOP10_ROWS", "reason": "没有 top10 诊断结果。"}
grouped = top10.groupby("side", observed=False)
promising = []
for side, part in grouped:
if set(part["split_id"]) >= set(EVAL_SPLITS) and part["avg_edge_bps"].min() > 0.0 and part["auc"].min() >= 0.56:
promising.append(str(side))
if promising:
return {"status": "PROMISING_TREE_STRUCTURE", "reason": f"树模型 top10 在这些方向三段为正: {promising}"}
return {"status": "NO_STABLE_TREE_STRUCTURE", "reason": "树模型 top10 也没有在 tune/validation/latest 三段同时转正。"}
def _x(frame: pd.DataFrame) -> np.ndarray:
    """Build the float32 feature matrix fed to the tree model.

    Columns are taken in ``FEATURE_ORDER``; values that cannot be parsed as
    numbers and infinities become NaN.

    Args:
        frame: Frame containing every ``FEATURE_ORDER`` column.

    Returns:
        A 2-D float32 array with one column per feature.
    """
    features = frame[FEATURE_ORDER]
    numeric = features.apply(pd.to_numeric, errors="coerce")
    finite = numeric.replace([np.inf, -np.inf], np.nan)
    return finite.astype("float32").to_numpy()
def _markdown_report(result: dict[str, Any], split_summary: pd.DataFrame, feature_rows: pd.DataFrame, model_rows: pd.DataFrame) -> str:
    """Render the diagnostic as a markdown document.

    The report contains the run parameters and verdict, the per-split
    distribution table, the 25 best single-feature candidates and the
    tree-model bucket table, followed by a list of the CSV artifacts.

    The feature count is read from ``result["feature_count"]`` (falling back
    to 54 when absent) and the bucket shown in the single-feature section is
    the one actually present in ``feature_rows``, so the heading and the
    displayed columns always agree with the data.

    Args:
        result: Summary dict; must contain ``run_id``, ``min_good_edge_bps``,
            ``bad_edge_bps``, ``tree_model_verdict``, ``stable_feature_count``
            and ``stable_positive_top_feature_count``.
        split_summary: Output of ``_split_summary`` for both sides.
        feature_rows: Output of ``_feature_candidates`` for both sides.
        model_rows: Output of ``_tree_model_top_rows`` for both sides.

    Returns:
        The markdown text, lines joined with ``\\n``.
    """
    top_fraction = _report_top_fraction(feature_rows)
    feature_count = result.get("feature_count", 54)
    lines = [
        "# 好单结构诊断报告",
        "",
        f"这份报告只看一件事:现有 {feature_count} 个特征能不能把真实赚钱单和亏钱单分开。",
        "",
        f"- run_id: `{result['run_id']}`",
        f"- 好单定义: 当前价格计划真实净收益 >= `{result['min_good_edge_bps']}` bps",
        f"- 坏单辅助定义: 当前价格计划真实净收益 <= `{result['bad_edge_bps']}` bps",
        f"- 树模型诊断结论: `{result['tree_model_verdict']['status']}`",
        f"- 结论说明: {result['tree_model_verdict']['reason']}",
        "",
        "## 基础分布",
        "",
        _markdown_table(split_summary),
        "",
        "## 单特征分辨力",
        "",
        f"- 稳定 AUC 特征数: `{result['stable_feature_count']}`",
        f"- top {_fraction_label(top_fraction)} 平均收益三段都为正的特征数: `{result['stable_positive_top_feature_count']}`",
        "",
    ]
    feature_display = _feature_display(feature_rows, top_fraction)
    lines.append(_markdown_table(feature_display.head(25)))
    lines.extend(["", "## 树模型 top 分桶", ""])
    model_display = model_rows.sort_values(["side", "top_fraction", "split_id"]).copy() if not model_rows.empty else pd.DataFrame()
    lines.append(_markdown_table(model_display))
    lines.extend(
        [
            "",
            "## 文件",
            "",
            "- `diagnostics/good_trade_split_summary.csv`: 好单/坏单基础分布。",
            "- `diagnostics/good_trade_feature_candidates.csv`: 单特征分辨力明细。",
            "- `diagnostics/good_trade_tree_model_top.csv`: 树模型 top 分桶明细。",
            "",
        ]
    )
    return "\n".join(lines)


def _report_top_fraction(feature_rows: pd.DataFrame, default: float = 0.10) -> float:
    """Recover the bucket fraction encoded in the feature table's column names.

    Looks for a column shaped like ``<split>_top<NN>_avg_edge_bps`` and
    returns ``NN / 100``; returns ``default`` when no such column exists
    (for example when the table is empty).
    """
    suffix = "_avg_edge_bps"
    for column in feature_rows.columns:
        name = str(column)
        if not name.endswith(suffix):
            continue
        # "min_top_avg_edge_bps" also ends with the suffix but carries no
        # digits after "_top", so it is rejected by the isdigit() check.
        _, marker, label = name[: -len(suffix)].rpartition("_top")
        if marker and label.isdigit():
            return int(label) / 100.0
    return default
def _feature_display(feature_rows: pd.DataFrame, top_fraction: float) -> pd.DataFrame:
if feature_rows.empty:
return pd.DataFrame()
label = _fraction_label(top_fraction)
columns = [
"side",
"feature",
"better_when",
"min_eval_directional_auc",
f"{TUNE_SPLIT}_top{label}_avg_edge_bps",
f"{VALIDATION_LOCKED_SPLIT}_top{label}_avg_edge_bps",
f"{LATEST_STRESS_SPLIT}_top{label}_avg_edge_bps",
"min_top_avg_edge_bps",
"min_top_good_rate",
"stable_auc",
"stable_positive_top_edge",
"score",
]
return feature_rows[[column for column in columns if column in feature_rows.columns]].copy()
def _markdown_table(frame: pd.DataFrame) -> str:
if frame.empty:
return "_无_"
columns = list(frame.columns)
lines = ["| " + " | ".join(columns) + " |", "| " + " | ".join(["---"] * len(columns)) + " |"]
for _, row in frame.iterrows():
lines.append("| " + " | ".join(_format_cell(row[column]) for column in columns) + " |")
return "\n".join(lines)
def _format_cell(value: Any) -> str:
if value is None or pd.isna(value):
return ""
if isinstance(value, (float, np.floating)):
return f"{float(value):.6g}"
if isinstance(value, (bool, np.bool_)):
return "true" if bool(value) else "false"
return str(value)
def _fraction_label(fraction: float) -> str:
return str(int(round(fraction * 100)))
def _jsonable(value: Any) -> Any:
if isinstance(value, dict):
return {str(key): _jsonable(item) for key, item in value.items()}
if isinstance(value, list):
return [_jsonable(item) for item in value]
if isinstance(value, (np.integer,)):
return int(value)
if isinstance(value, (np.floating,)):
return float(value)
if isinstance(value, np.ndarray):
return value.tolist()
return value