364 lines
16 KiB
Python
364 lines
16 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import logging
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
import numpy as np
|
||
|
|
import pandas as pd
|
||
|
|
from sklearn.ensemble import HistGradientBoostingClassifier
|
||
|
|
from sklearn.metrics import roc_auc_score
|
||
|
|
|
||
|
|
from trader_training.io_utils import read_parquet, run_root, write_json, write_text
|
||
|
|
from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT
|
||
|
|
|
||
|
|
|
||
|
|
# Every split this module reports on; _split_summary iterates them in this order.
ALL_SPLITS = (FIT_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)
|
||
|
|
# Splits used for evaluation only; the fit split is left out (the tree model in this module is trained on it).
EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)
|
||
|
|
|
||
|
|
|
||
|
|
def diagnose_good_trade_structure(args: Any) -> None:
    """Run the good-trade structure diagnostic and write its artifacts.

    Loads ``dataset/entry_train.parquet`` under the run root, labels every
    sample once per side (LONG and SHORT), and produces three tables: a
    per-split summary, single-feature candidates, and the top buckets of a
    gradient-boosted tree model. The tables, a JSON result and a Markdown
    report are written under ``<run root>/diagnostics``; a summary line is
    logged at the end.

    Args:
        args: Namespace-like object. ``run_id``, ``min_good_edge_bps``,
            ``bad_edge_bps`` and ``top_fractions`` are read from it, and it
            is also passed to ``run_root``.

    Raises:
        ValueError: If the dataset lacks a column checked by
            ``_require_columns``.
    """
    root = run_root(args)
    dataset = read_parquet(root / "dataset" / "entry_train.parquet")
    min_good_edge_bps = float(args.min_good_edge_bps)
    bad_edge_bps = float(args.bad_edge_bps)
    top_fractions = tuple(float(item) for item in args.top_fractions)
    _require_columns(dataset)

    # One labelled frame per trade direction; dict order fixes the row order
    # of every concatenated table below.
    side_frames: dict[str, pd.DataFrame] = {}
    for side in ("LONG", "SHORT"):
        side_frames[side] = _side_frame(dataset, side, min_good_edge_bps, bad_edge_bps)

    split_parts = [_split_summary(frame, side) for side, frame in side_frames.items()]
    split_summary = pd.concat(split_parts, ignore_index=True)

    feature_parts = [_feature_candidates(frame, side, top_fractions) for side, frame in side_frames.items()]
    feature_rows = pd.concat(feature_parts, ignore_index=True)

    model_parts = [_tree_model_top_rows(frame, side, top_fractions) for side, frame in side_frames.items()]
    model_rows = pd.concat(model_parts, ignore_index=True)

    # An empty candidate table has no flag columns, so the counts default to 0.
    has_features = not feature_rows.empty
    result = {
        "run_id": args.run_id,
        "min_good_edge_bps": min_good_edge_bps,
        "bad_edge_bps": bad_edge_bps,
        "feature_count": len(FEATURE_ORDER),
        "feature_candidate_count": int(len(feature_rows)),
        "stable_feature_count": int(feature_rows["stable_auc"].sum()) if has_features else 0,
        "stable_positive_top_feature_count": (
            int(feature_rows["stable_positive_top_edge"].sum()) if has_features else 0
        ),
        "tree_model_verdict": _tree_verdict(model_rows),
    }

    out_dir = root / "diagnostics"
    write_json(out_dir / "good_trade_structure_result.json", _jsonable(result))
    write_text(out_dir / "good_trade_split_summary.csv", split_summary.to_csv(index=False))
    write_text(out_dir / "good_trade_feature_candidates.csv", feature_rows.to_csv(index=False))
    write_text(out_dir / "good_trade_tree_model_top.csv", model_rows.to_csv(index=False))
    report = _markdown_report(result, split_summary, feature_rows, model_rows)
    write_text(out_dir / "good_trade_structure_report.md", report)

    logging.info(
        "trader.training.good_trade_structure_written runId=%s stableFeatureCount=%s stablePositiveTopFeatureCount=%s treeVerdict=%s",
        args.run_id,
        result["stable_feature_count"],
        result["stable_positive_top_feature_count"],
        result["tree_model_verdict"]["status"],
    )
|
||
|
|
|
||
|
|
|
||
|
|
def _require_columns(dataset: pd.DataFrame) -> None:
    """Fail fast when the dataset lacks a column the diagnostic reads.

    The required set is ``sample_id``, ``split_id``, both per-side edge
    columns and every name in ``FEATURE_ORDER``. ``sample_id`` is included
    because ``_side_frame`` selects it; without this check a dataset missing
    it would fail later with a bare ``KeyError`` instead of the descriptive
    error below.

    Args:
        dataset: The loaded training dataset.

    Raises:
        ValueError: If any required column is absent; the message lists the
            missing names in sorted order.
    """
    required = {
        "sample_id",
        "split_id",
        *FEATURE_ORDER,
        "long_actual_plan_net_edge_bps",
        "short_actual_plan_net_edge_bps",
    }
    missing = sorted(required - set(dataset.columns))
    if missing:
        raise ValueError(f"good trade structure diagnostic missing required columns: {missing}")
|
||
|
|
|
||
|
|
|
||
|
|
def _side_frame(dataset: pd.DataFrame, side: str, min_good_edge_bps: float, bad_edge_bps: float) -> pd.DataFrame:
    """Build the labelled per-side frame used by all downstream diagnostics.

    Args:
        dataset: Full dataset containing ids, split ids, features and both
            per-side edge columns.
        side: ``"LONG"`` selects the long edge column; any other value
            selects the short edge column.
        min_good_edge_bps: Edge at or above this value marks a good trade.
        bad_edge_bps: Edge at or below this value marks a bad trade.

    Returns:
        A copy with the edge column renamed to ``actual_edge_bps``, a
        ``side`` column, and int8 flags ``good_trade``, ``breakeven_trade``
        (edge >= 0) and ``bad_trade``. Rows whose edge is not numeric are
        dropped and the index is reset.
    """
    if side == "LONG":
        edge_col = "long_actual_plan_net_edge_bps"
    else:
        edge_col = "short_actual_plan_net_edge_bps"

    selected = ["sample_id", "split_id", edge_col, *FEATURE_ORDER]
    frame = dataset[selected].copy().rename(columns={edge_col: "actual_edge_bps"})
    frame["side"] = side

    # Non-numeric edges become NaN here; comparisons against NaN are False,
    # and those rows are removed by the dropna below anyway.
    edge = pd.to_numeric(frame["actual_edge_bps"], errors="coerce")
    frame["actual_edge_bps"] = edge
    frame["good_trade"] = (edge >= min_good_edge_bps).astype("int8")
    frame["breakeven_trade"] = (edge >= 0.0).astype("int8")
    frame["bad_trade"] = (edge <= bad_edge_bps).astype("int8")

    labelled = frame.dropna(subset=["actual_edge_bps"])
    return labelled.reset_index(drop=True)
|
||
|
|
|
||
|
|
|
||
|
|
def _split_summary(frame: pd.DataFrame, side: str) -> pd.DataFrame:
    """Summarise label rates and edge distribution for each split of one side.

    Args:
        frame: Labelled per-side frame with ``split_id``, ``actual_edge_bps``
            and the ``good_trade`` / ``breakeven_trade`` / ``bad_trade`` flags.
        side: Side label copied into every output row.

    Returns:
        One row per split in ``ALL_SPLITS`` that has data, holding the row
        count, the three label rates, and the mean, median, 90th and 99th
        percentile of the edge. Splits without rows are skipped.
    """
    split_ids = frame["split_id"]
    records: list[dict[str, Any]] = []
    for split_id in ALL_SPLITS:
        subset = frame[split_ids.eq(split_id)]
        if subset.empty:
            continue
        edge = subset["actual_edge_bps"].astype(float)
        record: dict[str, Any] = {
            "side": side,
            "split_id": split_id,
            "rows": int(len(subset)),
            "good_rate": float(subset["good_trade"].mean()),
            "breakeven_rate": float(subset["breakeven_trade"].mean()),
            "bad_rate": float(subset["bad_trade"].mean()),
            "avg_edge_bps": float(edge.mean()),
        }
        # Percentile columns follow the mean, in ascending quantile order.
        for key, quantile in (("p50_edge_bps", 0.50), ("p90_edge_bps", 0.90), ("p99_edge_bps", 0.99)):
            record[key] = float(edge.quantile(quantile))
        records.append(record)
    return pd.DataFrame(records)
|
||
|
|
|
||
|
|
|
||
|
|
def _feature_candidates(frame: pd.DataFrame, side: str, top_fractions: tuple[float, ...]) -> pd.DataFrame:
    """Rank single features by how consistently they separate good trades.

    For every feature in ``FEATURE_ORDER`` the tune split decides the
    direction (``HIGH`` when the raw AUC is >= 0.5, else ``LOW``). That
    direction is then checked on each split in ``EVAL_SPLITS`` through the
    directional AUC and the metrics of the top bucket. Only the first entry
    of ``top_fractions`` is used for the bucket.

    Args:
        frame: Labelled per-side frame.
        side: Side label copied into every output row.
        top_fractions: Bucket fractions; element 0 sizes the top bucket.

    Returns:
        One row per feature that has a computable tune AUC, sorted by
        ``score`` descending, or an empty frame when no feature qualifies.
    """
    tune_part = frame[frame["split_id"].eq(TUNE_SPLIT)]
    split_count = len(EVAL_SPLITS)
    candidates: list[dict[str, Any]] = []

    for feature in FEATURE_ORDER:
        tune_auc = _raw_auc(tune_part, feature)
        if tune_auc is None:
            continue

        if tune_auc >= 0.5:
            direction = "HIGH"
        else:
            direction = "LOW"

        fraction = top_fractions[0]
        label = _fraction_label(fraction)
        record: dict[str, Any] = {
            "side": side,
            "feature": feature,
            "better_when": direction,
            "tune_raw_auc": float(tune_auc),
        }

        aucs: list[float] = []
        edges: list[float] = []
        good_rates: list[float] = []
        for split_id in EVAL_SPLITS:
            part = frame[frame["split_id"].eq(split_id)]
            split_auc = _directional_auc(part, feature, direction)
            top = _feature_top_metrics(part, feature, direction, fraction)
            record[f"{split_id}_directional_auc"] = split_auc
            record[f"{split_id}_top{label}_rows"] = top["rows"]
            record[f"{split_id}_top{label}_good_rate"] = top["good_rate"]
            record[f"{split_id}_top{label}_avg_edge_bps"] = top["avg_edge_bps"]
            if split_auc is not None:
                aucs.append(float(split_auc))
            if top["rows"] > 0:
                edges.append(float(top["avg_edge_bps"]))
                good_rates.append(float(top["good_rate"]))

        min_auc = min(aucs) if aucs else np.nan
        min_edge = min(edges) if edges else np.nan
        record["min_eval_directional_auc"] = min_auc
        record["min_top_avg_edge_bps"] = min_edge
        record["min_top_good_rate"] = min(good_rates) if good_rates else np.nan

        # "Stable" means the metric exists on every evaluation split and its
        # worst value still clears the bar.
        stable_auc = bool(len(aucs) == split_count and min(aucs) >= 0.53)
        stable_edge = bool(len(edges) == split_count and min(edges) > 0.0)
        record["stable_auc"] = stable_auc
        record["stable_positive_top_edge"] = stable_edge

        if np.isfinite(min_auc) and np.isfinite(min_edge):
            record["score"] = (
                float(min_auc) * 10.0
                + float(min_edge) * 0.10
                + (2.0 if stable_auc else 0.0)
                + (3.0 if stable_edge else 0.0)
            )
        else:
            # Sentinel that pushes features with missing metrics to the bottom.
            record["score"] = -999.0
        candidates.append(record)

    if not candidates:
        return pd.DataFrame()
    ranked = pd.DataFrame(candidates).sort_values("score", ascending=False)
    return ranked.reset_index(drop=True)
|
||
|
|
|
||
|
|
|
||
|
|
def _raw_auc(frame: pd.DataFrame, feature: str) -> float | None:
|
||
|
|
values = pd.to_numeric(frame[feature], errors="coerce").replace([np.inf, -np.inf], np.nan)
|
||
|
|
working = pd.DataFrame({"x": values, "y": frame["good_trade"].astype(int)}).dropna()
|
||
|
|
if len(working) < 1000 or working["x"].nunique() < 2 or working["y"].nunique() < 2:
|
||
|
|
return None
|
||
|
|
return float(roc_auc_score(working["y"].to_numpy(), working["x"].to_numpy()))
|
||
|
|
|
||
|
|
|
||
|
|
def _directional_auc(frame: pd.DataFrame, feature: str, direction: str) -> float | None:
    """Return the feature's AUC oriented so that higher means better.

    Args:
        frame: Frame holding the feature column and ``good_trade``.
        feature: Name of the feature column to score.
        direction: ``"HIGH"`` keeps the raw AUC; any other value mirrors it
            to ``1 - AUC``.

    Returns:
        The oriented AUC, or ``None`` when the raw AUC cannot be computed.
    """
    raw = _raw_auc(frame, feature)
    if raw is None:
        return None
    if direction == "HIGH":
        return float(raw)
    return float(1.0 - raw)
|
||
|
|
|
||
|
|
|
||
|
|
def _feature_top_metrics(frame: pd.DataFrame, feature: str, direction: str, fraction: float) -> dict[str, Any]:
|
||
|
|
values = pd.to_numeric(frame[feature], errors="coerce").replace([np.inf, -np.inf], np.nan)
|
||
|
|
working = pd.DataFrame(
|
||
|
|
{
|
||
|
|
"x": values,
|
||
|
|
"good_trade": frame["good_trade"].astype(int),
|
||
|
|
"actual_edge_bps": frame["actual_edge_bps"].astype(float),
|
||
|
|
}
|
||
|
|
).dropna()
|
||
|
|
if working.empty:
|
||
|
|
return {"rows": 0, "good_rate": 0.0, "avg_edge_bps": 0.0}
|
||
|
|
ascending = direction == "LOW"
|
||
|
|
top = working.sort_values("x", ascending=ascending).head(max(1, int(len(working) * fraction)))
|
||
|
|
return {
|
||
|
|
"rows": int(len(top)),
|
||
|
|
"good_rate": float(top["good_trade"].mean()),
|
||
|
|
"avg_edge_bps": float(top["actual_edge_bps"].mean()),
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
def _tree_model_top_rows(frame: pd.DataFrame, side: str, top_fractions: tuple[float, ...]) -> pd.DataFrame:
    """Fit a gradient-boosted classifier and score its top buckets per split.

    The model is trained on the ``FIT_SPLIT`` rows to predict ``good_trade``
    and then applied to every split in ``EVAL_SPLITS``. For each evaluation
    split and each fraction, the rows with the highest predicted probability
    are summarised.

    Args:
        frame: Labelled per-side frame.
        side: Side label copied into every output row; it also selects the
            random seed (71 for ``"LONG"``, 73 otherwise).
        top_fractions: Bucket fractions to evaluate.

    Returns:
        One row per (evaluation split, fraction) pair, or an empty frame
        when the fit split is empty or contains a single label class.
    """
    train = frame[frame["split_id"].eq(FIT_SPLIT)].copy()
    if train.empty or train["good_trade"].nunique() < 2:
        return pd.DataFrame()

    seed = 71 if side == "LONG" else 73
    model = HistGradientBoostingClassifier(
        max_iter=180,
        learning_rate=0.04,
        max_leaf_nodes=31,
        l2_regularization=0.02,
        early_stopping=True,
        random_state=seed,
    )
    train_labels = train["good_trade"].astype(int).to_numpy()
    model.fit(_x(train), train_labels)

    records: list[dict[str, Any]] = []
    for split_id in EVAL_SPLITS:
        part = frame[frame["split_id"].eq(split_id)].copy()
        if part.empty:
            continue
        # Column 1 of predict_proba is the probability of the positive class.
        proba = model.predict_proba(_x(part))[:, 1]
        auc = _model_auc(part["good_trade"].astype(int).to_numpy(), proba)
        records.extend(
            {
                "side": side,
                "split_id": split_id,
                "model": "HistGradientBoostingClassifier",
                "auc": auc,
                "top_fraction": fraction,
                **_top_fraction_metrics(part, proba, fraction),
            }
            for fraction in top_fractions
        )
    return pd.DataFrame(records)
|
||
|
|
|
||
|
|
|
||
|
|
def _model_auc(y_true: np.ndarray, proba: np.ndarray) -> float | None:
|
||
|
|
if len(np.unique(y_true)) < 2:
|
||
|
|
return None
|
||
|
|
return float(roc_auc_score(y_true, proba))
|
||
|
|
|
||
|
|
|
||
|
|
def _top_fraction_metrics(frame: pd.DataFrame, score: np.ndarray, fraction: float) -> dict[str, Any]:
|
||
|
|
working = frame[["good_trade", "actual_edge_bps"]].copy()
|
||
|
|
working["score"] = score
|
||
|
|
top = working.sort_values("score", ascending=False).head(max(1, int(len(working) * fraction)))
|
||
|
|
return {
|
||
|
|
"rows": int(len(top)),
|
||
|
|
"good_rate": float(top["good_trade"].mean()),
|
||
|
|
"avg_edge_bps": float(top["actual_edge_bps"].mean()),
|
||
|
|
"p50_edge_bps": float(top["actual_edge_bps"].quantile(0.50)),
|
||
|
|
"p90_edge_bps": float(top["actual_edge_bps"].quantile(0.90)),
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
def _tree_verdict(model_rows: pd.DataFrame) -> dict[str, Any]:
|
||
|
|
if model_rows.empty:
|
||
|
|
return {"status": "NO_MODEL_ROWS", "reason": "没有足够样本训练树模型诊断。"}
|
||
|
|
top10 = model_rows[model_rows["top_fraction"].eq(0.10)].copy()
|
||
|
|
if top10.empty:
|
||
|
|
return {"status": "NO_TOP10_ROWS", "reason": "没有 top10 诊断结果。"}
|
||
|
|
grouped = top10.groupby("side", observed=False)
|
||
|
|
promising = []
|
||
|
|
for side, part in grouped:
|
||
|
|
if set(part["split_id"]) >= set(EVAL_SPLITS) and part["avg_edge_bps"].min() > 0.0 and part["auc"].min() >= 0.56:
|
||
|
|
promising.append(str(side))
|
||
|
|
if promising:
|
||
|
|
return {"status": "PROMISING_TREE_STRUCTURE", "reason": f"树模型 top10 在这些方向三段为正: {promising}"}
|
||
|
|
return {"status": "NO_STABLE_TREE_STRUCTURE", "reason": "树模型 top10 也没有在 tune/validation/latest 三段同时转正。"}
|
||
|
|
|
||
|
|
|
||
|
|
def _x(frame: pd.DataFrame) -> np.ndarray:
    """Return the model input matrix for the columns in ``FEATURE_ORDER``.

    Values are coerced to numeric, with non-numeric entries and infinities
    replaced by NaN, and the result is a float32 array whose columns follow
    ``FEATURE_ORDER``.
    """
    features = frame[FEATURE_ORDER].apply(pd.to_numeric, errors="coerce")
    features = features.replace([np.inf, -np.inf], np.nan)
    return features.astype("float32").to_numpy()
|
||
|
|
|
||
|
|
|
||
|
|
def _markdown_report(result: dict[str, Any], split_summary: pd.DataFrame, feature_rows: pd.DataFrame, model_rows: pd.DataFrame) -> str:
    """Render the diagnostic as a Markdown document.

    The feature count in the introduction comes from
    ``result["feature_count"]`` (falling back to ``len(FEATURE_ORDER)``)
    rather than a fixed number. The top fraction shown in the single-feature
    section is recovered from the feature table's column names, so the label
    and the displayed per-split columns match whatever fraction produced
    that table.

    Args:
        result: Result dict; ``run_id``, ``min_good_edge_bps``,
            ``bad_edge_bps``, ``tree_model_verdict``, ``stable_feature_count``
            and ``stable_positive_top_feature_count`` are read.
        split_summary: Per-split summary table.
        feature_rows: Single-feature candidate table (may be empty).
        model_rows: Tree-model top-bucket table (may be empty).

    Returns:
        The report text, with lines joined by newlines.
    """
    feature_count = result.get("feature_count", len(FEATURE_ORDER))
    top_fraction = _report_top_fraction(feature_rows)
    lines = [
        "# 好单结构诊断报告",
        "",
        f"这份报告只看一件事:现有 {feature_count} 个特征能不能把真实赚钱单和亏钱单分开。",
        "",
        f"- run_id: `{result['run_id']}`",
        f"- 好单定义: 当前价格计划真实净收益 >= `{result['min_good_edge_bps']}` bps",
        f"- 坏单辅助定义: 当前价格计划真实净收益 <= `{result['bad_edge_bps']}` bps",
        f"- 树模型诊断结论: `{result['tree_model_verdict']['status']}`",
        f"- 结论说明: {result['tree_model_verdict']['reason']}",
        "",
        "## 基础分布",
        "",
        _markdown_table(split_summary),
        "",
        "## 单特征分辨力",
        "",
        f"- 稳定 AUC 特征数: `{result['stable_feature_count']}`",
        f"- top {_fraction_label(top_fraction)} 平均收益三段都为正的特征数: `{result['stable_positive_top_feature_count']}`",
        "",
    ]
    feature_display = _feature_display(feature_rows, top_fraction)
    # Only the 25 best-scoring candidates are shown; the CSV has all of them.
    lines.append(_markdown_table(feature_display.head(25)))
    lines.extend(["", "## 树模型 top 分桶", ""])
    model_display = model_rows.sort_values(["side", "top_fraction", "split_id"]).copy() if not model_rows.empty else pd.DataFrame()
    lines.append(_markdown_table(model_display))
    lines.extend(
        [
            "",
            "## 文件",
            "",
            "- `diagnostics/good_trade_split_summary.csv`: 好单/坏单基础分布。",
            "- `diagnostics/good_trade_feature_candidates.csv`: 单特征分辨力明细。",
            "- `diagnostics/good_trade_tree_model_top.csv`: 树模型 top 分桶明细。",
            "",
        ]
    )
    return "\n".join(lines)


def _report_top_fraction(feature_rows: pd.DataFrame, default: float = 0.10) -> float:
    """Recover the top fraction encoded in the feature table's column names.

    Looks for a column named ``<TUNE_SPLIT>_top<digits>_avg_edge_bps`` and
    returns ``digits / 100``. Falls back to ``default`` when the table has no
    such column, for example when it is empty.
    """
    prefix = f"{TUNE_SPLIT}_top"
    suffix = "_avg_edge_bps"
    for column in feature_rows.columns:
        name = str(column)
        if name.startswith(prefix) and name.endswith(suffix):
            label = name[len(prefix):-len(suffix)]
            if label.isdigit():
                return int(label) / 100.0
    return default
|
||
|
|
|
||
|
|
|
||
|
|
def _feature_display(feature_rows: pd.DataFrame, top_fraction: float) -> pd.DataFrame:
|
||
|
|
if feature_rows.empty:
|
||
|
|
return pd.DataFrame()
|
||
|
|
label = _fraction_label(top_fraction)
|
||
|
|
columns = [
|
||
|
|
"side",
|
||
|
|
"feature",
|
||
|
|
"better_when",
|
||
|
|
"min_eval_directional_auc",
|
||
|
|
f"{TUNE_SPLIT}_top{label}_avg_edge_bps",
|
||
|
|
f"{VALIDATION_LOCKED_SPLIT}_top{label}_avg_edge_bps",
|
||
|
|
f"{LATEST_STRESS_SPLIT}_top{label}_avg_edge_bps",
|
||
|
|
"min_top_avg_edge_bps",
|
||
|
|
"min_top_good_rate",
|
||
|
|
"stable_auc",
|
||
|
|
"stable_positive_top_edge",
|
||
|
|
"score",
|
||
|
|
]
|
||
|
|
return feature_rows[[column for column in columns if column in feature_rows.columns]].copy()
|
||
|
|
|
||
|
|
|
||
|
|
def _markdown_table(frame: pd.DataFrame) -> str:
|
||
|
|
if frame.empty:
|
||
|
|
return "_无_"
|
||
|
|
columns = list(frame.columns)
|
||
|
|
lines = ["| " + " | ".join(columns) + " |", "| " + " | ".join(["---"] * len(columns)) + " |"]
|
||
|
|
for _, row in frame.iterrows():
|
||
|
|
lines.append("| " + " | ".join(_format_cell(row[column]) for column in columns) + " |")
|
||
|
|
return "\n".join(lines)
|
||
|
|
|
||
|
|
|
||
|
|
def _format_cell(value: Any) -> str:
|
||
|
|
if value is None or pd.isna(value):
|
||
|
|
return ""
|
||
|
|
if isinstance(value, (float, np.floating)):
|
||
|
|
return f"{float(value):.6g}"
|
||
|
|
if isinstance(value, (bool, np.bool_)):
|
||
|
|
return "true" if bool(value) else "false"
|
||
|
|
return str(value)
|
||
|
|
|
||
|
|
|
||
|
|
def _fraction_label(fraction: float) -> str:
|
||
|
|
return str(int(round(fraction * 100)))
|
||
|
|
|
||
|
|
|
||
|
|
def _jsonable(value: Any) -> Any:
|
||
|
|
if isinstance(value, dict):
|
||
|
|
return {str(key): _jsonable(item) for key, item in value.items()}
|
||
|
|
if isinstance(value, list):
|
||
|
|
return [_jsonable(item) for item in value]
|
||
|
|
if isinstance(value, (np.integer,)):
|
||
|
|
return int(value)
|
||
|
|
if isinstance(value, (np.floating,)):
|
||
|
|
return float(value)
|
||
|
|
if isinstance(value, np.ndarray):
|
||
|
|
return value.tolist()
|
||
|
|
return value
|