Add Direction label and PM probe diagnostics

This commit is contained in:
Codex
2026-06-28 09:00:15 +08:00
parent 5ad77ffe90
commit e8420f76fe
8 changed files with 634 additions and 16 deletions
@@ -0,0 +1,28 @@
from __future__ import annotations
import argparse
from pathlib import Path
import _bootstrap # noqa: F401
from trader_training.direction_opportunity_dataset import build_direction_opportunity_dataset
from trader_training.io_utils import add_common_args, setup_logging
def main() -> None:
parser = argparse.ArgumentParser()
add_common_args(parser)
parser.add_argument("--direction-dataset-path", type=Path)
parser.add_argument("--entry-dataset-path", type=Path)
parser.add_argument("--output-path", type=Path)
parser.add_argument("--opportunity-bps", type=float, required=True)
parser.add_argument("--min-advantage-bps", type=float, default=5.0)
parser.add_argument("--long-edge-column", default="long_max_achievable_net_edge_bps")
parser.add_argument("--short-edge-column", default="short_max_achievable_net_edge_bps")
parser.add_argument("--label-method", default="DIRECTION_OPPORTUNITY_FROM_ENTRY_MFE_V1")
args = parser.parse_args()
setup_logging()
build_direction_opportunity_dataset(args)
if __name__ == "__main__":
main()
+19
View File
@@ -0,0 +1,19 @@
from __future__ import annotations
import argparse
import _bootstrap # noqa: F401
from trader_training.io_utils import add_common_args, setup_logging
from trader_training.nonlinear_pm_probe import probe_nonlinear_pm
def main() -> None:
parser = argparse.ArgumentParser(description="Run diagnostic nonlinear Direction + Entry PM probe.")
add_common_args(parser)
args = parser.parse_args()
setup_logging()
probe_nonlinear_pm(args)
if __name__ == "__main__":
main()
+63
View File
@@ -15,17 +15,20 @@ if str(TRAINING_ROOT) not in sys.path:
from trader_training.onnx_export import LinearHead, export_heads
from trader_training.conditional_entry_probe import probe_conditional_entry_training
from trader_training.direction_opportunity_dataset import _opportunity_labels
from trader_training.dynamic_exit_search import search_dynamic_exit_plans
from trader_training.entry_condition_pair_screen import screen_entry_condition_pairs
from trader_training.entry_feature_screen import _bucket_edges, _screen_edge_column
from trader_training.entry_mae_label_diagnostic import diagnose_entry_mae_labels
from trader_training.io_utils import read_json, write_json
from trader_training.labels import ENTRY_LABEL_METHOD, _path_stats_for_group, build_entry_labels
from trader_training.nonlinear_pm_probe import _expanded_threshold_candidates
from trader_training.ofi_feature_experiment import _load_entry_dataset, l1_snapshot_diff_ofi_quote
from trader_training.promote import promote_artifact_bundle
from trader_training.replay import build_splits
from trader_training.schemas import FEATURE_ORDER, LATEST_STRESS_SPLIT, MODEL_OUTPUTS, OUTPUT_MAPPING, TRAINING_SPLITS, VALIDATION_LOCKED_SPLIT
from trader_training.training import TARGETS, _head_train_mask
from trader_training.diagnostics import _label_summary
class TrainingContractTest(unittest.TestCase):
@@ -40,6 +43,21 @@ class TrainingContractTest(unittest.TestCase):
self.assertEqual(set(fields), set(OUTPUT_MAPPING[model_name]))
self.assertEqual([f"prediction[{idx}]" for idx in range(len(fields))], [OUTPUT_MAPPING[model_name][field] for field in fields])
def test_nonlinear_pm_probe_expands_low_probability_thresholds(self) -> None:
candidates = _expanded_threshold_candidates()
self.assertIn(
{
"long_open_prob": 0.2,
"short_open_prob": 0.2,
"min_entry_prob": 0.05,
"max_market_risk_prob": 0.45,
"min_expected_edge_bps": -5.0,
"min_direction_margin": 0.0,
},
candidates,
)
def test_entry_feature_screen_prefers_actual_plan_edge(self) -> None:
dataset = pd.DataFrame(
{
@@ -79,6 +97,51 @@ class TrainingContractTest(unittest.TestCase):
self.assertEqual("ALL_FIT_ROWS", default_filter)
self.assertEqual([True, True, True, True], default_mask.tolist())
def test_direction_opportunity_labels_choose_clear_path_opportunity(self) -> None:
labels = _opportunity_labels(
np.array([45.0, 10.0, 45.0, 42.0, np.nan]),
np.array([20.0, 50.0, 43.0, 48.0, 50.0]),
opportunity_bps=40.0,
min_advantage_bps=5.0,
)
self.assertEqual([1, 0, 0, 0, 0], labels["long_target"].tolist())
self.assertEqual([0, 1, 0, 1, 1], labels["short_target"].tolist())
self.assertEqual([0, 0, 1, 0, 0], labels["neutral_target"].tolist())
def test_diagnostics_reads_actual_training_dataset_labels(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
dataset_dir = root / "dataset"
dataset_dir.mkdir(parents=True)
pd.DataFrame(
{
"sample_id": ["s1", "s2"],
"split_id": ["fit_inner", "fit_inner"],
"long_target": [1, 0],
"short_target": [0, 0],
"neutral_target": [0, 1],
"future_return_bps": [5.0, -1.0],
}
).to_parquet(dataset_dir / "direction_train.parquet", index=False)
pd.DataFrame(
{
"sample_id": ["s1", "s2"],
"split_id": ["fit_inner", "fit_inner"],
"long_entry_target": [1, 0],
"short_entry_target": [0, 1],
"long_actual_plan_net_edge_bps": [8.0, -6.0],
"short_actual_plan_net_edge_bps": [-5.0, 7.0],
}
).to_parquet(dataset_dir / "entry_train.parquet", index=False)
summary = _label_summary(root)
self.assertEqual("dataset/direction_train.parquet", summary["fit_inner"]["direction"]["source"])
self.assertEqual({"LONG": 0.5, "SHORT": 0.0, "NEUTRAL": 0.5}, summary["fit_inner"]["direction"]["label_ratio"])
self.assertEqual("dataset/entry_train.parquet", summary["fit_inner"]["entry"]["source"])
self.assertEqual(0.5, summary["fit_inner"]["entry"]["target_rate_by_side"]["LONG"])
def test_entry_feature_screen_keeps_zero_inflated_event_features(self) -> None:
values = np.concatenate((np.zeros(5000), np.linspace(1.0, 100.0, 500)))
edges = _bucket_edges(values)
+70 -11
View File
@@ -36,8 +36,8 @@ def diagnose_training_run(args: Any) -> None:
def _label_summary(root) -> dict[str, Any]:
direction = read_parquet(root / "label" / "direction_labels.parquet")
entry = read_parquet(root / "label" / "entry_labels.parquet")
direction = read_parquet(root / "dataset" / "direction_train.parquet")
entry = read_parquet(root / "dataset" / "entry_train.parquet")
summary: dict[str, Any] = {}
for split_id in DIAGNOSTIC_SPLITS:
direction_split = direction[direction["split_id"].eq(split_id)].copy()
@@ -45,28 +45,57 @@ def _label_summary(root) -> dict[str, Any]:
item: dict[str, Any] = {"direction": {}, "entry": {}}
if not direction_split.empty:
item["direction"] = {
"source": "dataset/direction_train.parquet",
"rows": len(direction_split),
"label_ratio": direction_split["direction_label"].value_counts(normalize=True).round(6).to_dict(),
"label_ratio": _direction_target_ratio(direction_split),
"future_return_bps_quantile": _quantiles(direction_split["future_return_bps"], (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99)),
}
if not entry_split.empty:
if "actual_plan_net_edge_bps" not in entry_split.columns:
raise ValueError("entry_labels is missing actual_plan_net_edge_bps for diagnostics")
grouped = entry_split.groupby("side", observed=False)
required = {
"long_entry_target",
"short_entry_target",
"long_actual_plan_net_edge_bps",
"short_actual_plan_net_edge_bps",
}
missing = sorted(required - set(entry_split.columns))
if missing:
raise ValueError(f"entry_train is missing columns required by diagnostics: {missing}")
item["entry"] = {
"source": "dataset/entry_train.parquet",
"rows": len(entry_split),
"target_rate_by_side": grouped["entry_target"].mean().round(6).to_dict(),
"target_rate_by_side": {
"LONG": float(entry_split["long_entry_target"].astype(float).mean()),
"SHORT": float(entry_split["short_entry_target"].astype(float).mean()),
},
"edge_column": "actual_plan_net_edge_bps",
"edge_mean_by_side": grouped["actual_plan_net_edge_bps"].mean().round(6).to_dict(),
"edge_mean_by_side": {
"LONG": float(entry_split["long_actual_plan_net_edge_bps"].astype(float).mean()),
"SHORT": float(entry_split["short_actual_plan_net_edge_bps"].astype(float).mean()),
},
"edge_quantile_by_side": {
str(side): _quantiles(group["actual_plan_net_edge_bps"], (0.05, 0.5, 0.95))
for side, group in grouped
"LONG": _quantiles(entry_split["long_actual_plan_net_edge_bps"], (0.05, 0.5, 0.95)),
"SHORT": _quantiles(entry_split["short_actual_plan_net_edge_bps"], (0.05, 0.5, 0.95)),
},
}
summary[split_id] = item
return summary
def _direction_target_ratio(frame: pd.DataFrame) -> dict[str, float]:
required = {"long_target", "short_target", "neutral_target"}
missing = sorted(required - set(frame.columns))
if missing:
raise ValueError(f"direction_train is missing target columns required by diagnostics: {missing}")
rows = len(frame)
if rows == 0:
return {"LONG": 0.0, "SHORT": 0.0, "NEUTRAL": 0.0}
return {
"LONG": float(frame["long_target"].astype(float).mean()),
"SHORT": float(frame["short_target"].astype(float).mean()),
"NEUTRAL": float(frame["neutral_target"].astype(float).mean()),
}
def _pm_summary(root) -> dict[str, Any]:
summary: dict[str, Any] = {}
config_path = root / "pm-search" / "position_manager_config.json"
@@ -259,7 +288,7 @@ def _diagnostic_conclusion(pm_summary: dict[str, Any]) -> dict[str, Any]:
if validation.get("avg_weighted_edge_bps", 0.0) <= 0 and stress.get("avg_weighted_edge_bps", 0.0) <= 0:
return {
"status": "PRICE_PLAN_OR_ENTRY_NOT_TRADABLE",
"plain_reason": "固定止盈止损真实收益算,验证集和压力集选出来的交易平均都不赚钱。",
"plain_reason": "当前价格计划真实收益算,验证集和压力集选出来的交易平均都不赚钱。",
"next_action": "优先重新搜索价格计划,再重建 Entry 标签和模型;不要只放松 PM 阈值。",
}
return {
@@ -296,10 +325,12 @@ def _markdown_report(payload: dict[str, Any]) -> str:
lines.append("")
if direction:
lines.append(f"- Direction 行数: {direction['rows']}")
lines.append(f"- Direction 来源: `{direction['source']}`")
lines.append(f"- Direction 标签比例: `{direction['label_ratio']}`")
lines.append(f"- 45 分钟未来收益分位: `{direction['future_return_bps_quantile']}`")
if entry:
lines.append(f"- Entry 行数: {entry['rows']}")
lines.append(f"- Entry 来源: `{entry['source']}`")
lines.append(f"- Entry 命中率: `{entry['target_rate_by_side']}`")
lines.append(f"- Entry 平均净收益: `{entry['edge_mean_by_side']}`")
lines.append("")
@@ -312,6 +343,7 @@ def _markdown_report(payload: dict[str, Any]) -> str:
lines.append(f"- 当前阈值: `{item['active_thresholds']}`")
lines.append(f"- 当前阈值选中交易: `{item['selected_trade_metrics']}`")
lines.append(f"- 网格里有交易的候选数: {item['grid_search_any_trade']['candidates_with_trade']} / {item['grid_search_any_trade']['candidate_count']}")
lines.extend(_score_distribution_markdown(item["score_distribution"]))
lines.append("")
for side in ("long", "short"):
lines.append(f"#### {side.upper()}")
@@ -334,6 +366,33 @@ def _markdown_report(payload: dict[str, Any]) -> str:
return "\n".join(lines) + "\n"
def _score_distribution_markdown(distribution: dict[str, dict[str, float]]) -> list[str]:
watched_columns = [
"long_prob",
"short_prob",
"long_entry_prob",
"short_entry_prob",
"market_risk_prob",
"pred_long_expected_net_edge_bps",
"pred_short_expected_net_edge_bps",
]
lines = ["", "#### 分数分布", "", "| 字段 | 最小 | 5% | 中位数 | 95% | 最大 |", "| --- | ---: | ---: | ---: | ---: | ---: |"]
for column in watched_columns:
quantiles = distribution.get(column)
if not quantiles:
continue
lines.append(
"| "
+ column
+ f" | {quantiles.get('0.0', 0.0):.4f}"
+ f" | {quantiles.get('0.05', 0.0):.4f}"
+ f" | {quantiles.get('0.5', 0.0):.4f}"
+ f" | {quantiles.get('0.95', 0.0):.4f}"
+ f" | {quantiles.get('1.0', 0.0):.4f} |"
)
return lines
def _jsonable(value: Any) -> Any:
if isinstance(value, dict):
return {str(key): _jsonable(item) for key, item in value.items()}
@@ -0,0 +1,140 @@
from __future__ import annotations
import logging
from typing import Any
import numpy as np
import pandas as pd
from trader_training.io_utils import manifest, read_parquet, require_columns, run_root, write_json, write_parquet, write_text
from trader_training.schemas import FEATURE_ORDER
def build_direction_opportunity_dataset(args: Any) -> None:
root = run_root(args)
direction_path = args.direction_dataset_path or root / "dataset" / "direction_train.parquet"
entry_path = args.entry_dataset_path or root / "dataset" / "entry_train.parquet"
output_path = args.output_path or root / "dataset" / "direction_train.parquet"
opportunity_bps = float(args.opportunity_bps)
min_advantage_bps = float(args.min_advantage_bps)
long_edge_column = str(args.long_edge_column)
short_edge_column = str(args.short_edge_column)
label_method = str(args.label_method)
direction = read_parquet(direction_path)
entry = read_parquet(entry_path)
require_columns(direction, ("sample_id", "split_id", *FEATURE_ORDER), "direction_train")
require_columns(entry, ("sample_id", long_edge_column, short_edge_column), "entry_train")
opportunity = entry[["sample_id", long_edge_column, short_edge_column]].copy()
merged = direction.drop(columns=["long_target", "short_target", "neutral_target"], errors="ignore").merge(opportunity, on="sample_id", how="inner", validate="one_to_one")
if len(merged) != len(direction):
raise ValueError(f"direction opportunity dataset lost rows: before={len(direction)} after={len(merged)}")
labels = _opportunity_labels(
pd.to_numeric(merged[long_edge_column], errors="coerce").to_numpy(dtype="float64"),
pd.to_numeric(merged[short_edge_column], errors="coerce").to_numpy(dtype="float64"),
opportunity_bps,
min_advantage_bps,
)
merged["long_target"] = labels["long_target"]
merged["short_target"] = labels["short_target"]
merged["neutral_target"] = labels["neutral_target"]
# 保留 future_return_bps 作为排查字段;训练目标以三列 target 为准。
ordered = [column for column in direction.columns if column in merged.columns and column not in {"long_target", "short_target", "neutral_target"}]
ordered.extend(["long_target", "short_target", "neutral_target"])
for column in (long_edge_column, short_edge_column):
if column not in ordered:
ordered.append(column)
out = merged[ordered].copy()
data_hash = write_parquet(output_path, out)
result = {
"dataset": manifest(
output_path,
{
"row_count": len(out),
"feature_count": len(FEATURE_ORDER),
"data_hash_sha256": data_hash,
"split_counts": out["split_id"].value_counts().to_dict(),
},
),
"label_method": label_method,
"long_edge_column": long_edge_column,
"short_edge_column": short_edge_column,
"opportunity_bps": opportunity_bps,
"min_advantage_bps": min_advantage_bps,
"target_counts": {
"long": int(out["long_target"].sum()),
"short": int(out["short_target"].sum()),
"neutral": int(out["neutral_target"].sum()),
},
"target_rates_by_split": _target_rates_by_split(out),
}
write_json(root / "dataset" / "direction_opportunity_dataset_result.json", result)
write_text(root / "dataset" / "direction_opportunity_dataset_report.md", _markdown_report(result))
logging.info(
"trader.training.direction_opportunity_dataset_written runId=%s opportunityBps=%.6f minAdvantageBps=%.6f rowCount=%s outputPath=%s",
args.run_id,
opportunity_bps,
min_advantage_bps,
len(out),
output_path,
)
def _opportunity_labels(long_edge: np.ndarray, short_edge: np.ndarray, opportunity_bps: float, min_advantage_bps: float) -> dict[str, np.ndarray]:
long_clean = np.nan_to_num(long_edge, nan=-np.inf)
short_clean = np.nan_to_num(short_edge, nan=-np.inf)
long_ok = long_clean >= opportunity_bps
short_ok = short_clean >= opportunity_bps
long_wins = long_ok & ((long_clean - short_clean) >= min_advantage_bps)
short_wins = short_ok & ((short_clean - long_clean) >= min_advantage_bps)
neutral = ~(long_wins | short_wins)
return {
"long_target": long_wins.astype("int8"),
"short_target": short_wins.astype("int8"),
"neutral_target": neutral.astype("int8"),
}
def _target_rates_by_split(frame: pd.DataFrame) -> dict[str, dict[str, float]]:
result: dict[str, dict[str, float]] = {}
for split_id, part in frame.groupby("split_id", observed=False):
rows = len(part)
result[str(split_id)] = {
"rows": float(rows),
"long_rate": float(part["long_target"].mean()) if rows else 0.0,
"short_rate": float(part["short_target"].mean()) if rows else 0.0,
"neutral_rate": float(part["neutral_target"].mean()) if rows else 0.0,
}
return result
def _markdown_report(result: dict[str, Any]) -> str:
lines = [
"# Direction 机会标签数据集报告",
"",
"这份数据集把 Direction 目标从“未来收盘收益方向”改为“未来路径里哪边有可交易空间”。",
"",
f"- label_method: `{result['label_method']}`",
f"- long_edge_column: `{result['long_edge_column']}`",
f"- short_edge_column: `{result['short_edge_column']}`",
f"- opportunity_bps: `{result['opportunity_bps']}`",
f"- min_advantage_bps: `{result['min_advantage_bps']}`",
f"- row_count: `{result['dataset']['row_count']}`",
"",
"## 标签数量",
"",
f"- long: `{result['target_counts']['long']}`",
f"- short: `{result['target_counts']['short']}`",
f"- neutral: `{result['target_counts']['neutral']}`",
"",
"## 分段比例",
"",
"| split | rows | long | short | neutral |",
"| --- | ---: | ---: | ---: | ---: |",
]
for split_id, item in result["target_rates_by_split"].items():
lines.append(f"| {split_id} | {int(item['rows'])} | {item['long_rate']:.4f} | {item['short_rate']:.4f} | {item['neutral_rate']:.4f} |")
lines.append("")
return "\n".join(lines)
@@ -0,0 +1,308 @@
from __future__ import annotations
import itertools
import logging
from typing import Any
import numpy as np
import pandas as pd
from sklearn.ensemble import HistGradientBoostingClassifier, HistGradientBoostingRegressor
from trader_training.io_utils import read_parquet, run_root, write_json, write_text
from trader_training.pm import _pm_config_from_thresholds, _pm_frame, _price_plan_context, _simulate_open_trades, _trade_metrics
from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT
EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)
def probe_nonlinear_pm(args: Any) -> None:
root = run_root(args)
direction_dataset = read_parquet(root / "dataset" / "direction_train.parquet")
entry_dataset = read_parquet(root / "dataset" / "entry_train.parquet")
direction_model = _fit_direction_model(direction_dataset)
entry_models = _fit_entry_models(direction_dataset, entry_dataset)
frames = {
split_id: _prediction_frame(root, split_id, direction_dataset, entry_dataset, direction_model, entry_models)
for split_id in EVAL_SPLITS
}
price_plan = _price_plan_context(root)
candidates = _expanded_threshold_candidates()
tune_rows: list[dict[str, Any]] = []
best_thresholds: dict[str, float] | None = None
best_tune_metrics: dict[str, Any] | None = None
best_score = -float("inf")
for thresholds in candidates:
trades = _simulate_open_trades(frames[TUNE_SPLIT], thresholds, _pm_config_from_thresholds(thresholds), price_plan)
metrics = _trade_metrics(trades)
score = _probe_score(metrics)
tune_rows.append({**thresholds, **metrics, "score": score})
if score > best_score:
best_score = score
best_thresholds = thresholds
best_tune_metrics = metrics
if best_thresholds is None or best_tune_metrics is None:
raise ValueError("nonlinear PM probe did not evaluate any threshold candidate")
split_metrics: dict[str, Any] = {}
for split_id, frame in frames.items():
trades = _simulate_open_trades(frame, best_thresholds, _pm_config_from_thresholds(best_thresholds), price_plan)
split_metrics[split_id] = _trade_metrics(trades)
tune_frame = pd.DataFrame(tune_rows).sort_values("score", ascending=False).reset_index(drop=True)
result = {
"run_id": args.run_id,
"purpose": "diagnostic_only_not_exported",
"model_family": "sklearn_hist_gradient_boosting",
"candidate_count": len(candidates),
"best_thresholds": best_thresholds,
"best_tune_metrics": best_tune_metrics,
"split_metrics": split_metrics,
"verdict": _verdict(split_metrics),
}
out_dir = root / "diagnostics"
write_json(out_dir / "nonlinear_pm_probe_result.json", _jsonable(result))
write_text(out_dir / "nonlinear_pm_probe_candidates.csv", tune_frame.head(200).to_csv(index=False))
write_text(out_dir / "nonlinear_pm_probe_report.md", _markdown_report(result, tune_frame.head(20)))
logging.info(
"trader.training.nonlinear_pm_probe_written runId=%s verdict=%s tuneTrades=%s validationTrades=%s stressTrades=%s",
args.run_id,
result["verdict"]["status"],
split_metrics[TUNE_SPLIT]["trade_count"],
split_metrics[VALIDATION_LOCKED_SPLIT]["trade_count"],
split_metrics[LATEST_STRESS_SPLIT]["trade_count"],
)
def _fit_direction_model(dataset: pd.DataFrame) -> HistGradientBoostingClassifier:
train = dataset[dataset["split_id"].eq(FIT_SPLIT)].copy()
y = train[["long_target", "short_target", "neutral_target"]].to_numpy().argmax(axis=1)
model = HistGradientBoostingClassifier(
max_iter=160,
learning_rate=0.04,
max_leaf_nodes=31,
l2_regularization=0.02,
early_stopping=True,
random_state=41,
)
model.fit(_x(train), y)
return model
def _fit_entry_models(direction_dataset: pd.DataFrame, entry_dataset: pd.DataFrame) -> dict[str, Any]:
merged = entry_dataset.merge(
direction_dataset[["sample_id", "long_target", "short_target"]],
on="sample_id",
how="inner",
validate="one_to_one",
)
train = merged[merged["split_id"].eq(FIT_SPLIT)].copy()
return {
"long_entry_prob": _fit_binary_head(train[train["long_target"].eq(1)], "long_entry_target", seed=43),
"short_entry_prob": _fit_binary_head(train[train["short_target"].eq(1)], "short_entry_target", seed=47),
"long_expected_net_edge_bps": _fit_regression_head(train[train["long_target"].eq(1)], "long_actual_plan_net_edge_bps", seed=53),
"short_expected_net_edge_bps": _fit_regression_head(train[train["short_target"].eq(1)], "short_actual_plan_net_edge_bps", seed=59),
}
def _fit_binary_head(train: pd.DataFrame, target: str, seed: int) -> HistGradientBoostingClassifier:
if len(train) < 1000:
raise ValueError(f"not enough rows to train nonlinear Entry head {target}: {len(train)}")
y = train[target].astype(int).to_numpy()
if len(np.unique(y)) < 2:
raise ValueError(f"nonlinear Entry head {target} has only one class")
model = HistGradientBoostingClassifier(
max_iter=180,
learning_rate=0.04,
max_leaf_nodes=31,
l2_regularization=0.02,
early_stopping=True,
random_state=seed,
)
model.fit(_x(train), y)
return model
def _fit_regression_head(train: pd.DataFrame, target: str, seed: int) -> HistGradientBoostingRegressor:
if len(train) < 1000:
raise ValueError(f"not enough rows to train nonlinear Entry head {target}: {len(train)}")
model = HistGradientBoostingRegressor(
max_iter=180,
learning_rate=0.04,
max_leaf_nodes=31,
l2_regularization=0.02,
early_stopping=True,
random_state=seed,
)
model.fit(_x(train), train[target].astype(float).to_numpy())
return model
def _prediction_frame(
root,
split_id: str,
direction_dataset: pd.DataFrame,
entry_dataset: pd.DataFrame,
direction_model: HistGradientBoostingClassifier,
entry_models: dict[str, Any],
) -> pd.DataFrame:
frame = _pm_frame(root, split_id).copy()
direction_split = direction_dataset[direction_dataset["split_id"].eq(split_id)].copy()
entry_split = entry_dataset[entry_dataset["split_id"].eq(split_id)].copy()
direction_proba = direction_model.predict_proba(_x(direction_split))
direction_pred = direction_split[["sample_id"]].copy()
direction_pred["long_prob"] = direction_proba[:, 0]
direction_pred["short_prob"] = direction_proba[:, 1]
direction_pred["neutral_prob"] = direction_proba[:, 2]
entry_pred = entry_split[["sample_id"]].copy()
entry_pred["long_entry_prob"] = entry_models["long_entry_prob"].predict_proba(_x(entry_split))[:, 1]
entry_pred["short_entry_prob"] = entry_models["short_entry_prob"].predict_proba(_x(entry_split))[:, 1]
entry_pred["pred_long_expected_net_edge_bps"] = entry_models["long_expected_net_edge_bps"].predict(_x(entry_split))
entry_pred["pred_short_expected_net_edge_bps"] = entry_models["short_expected_net_edge_bps"].predict(_x(entry_split))
replacements = direction_pred.merge(entry_pred, on="sample_id", how="inner", validate="one_to_one")
out = frame.drop(
columns=[
"long_prob",
"short_prob",
"neutral_prob",
"long_entry_prob",
"short_entry_prob",
"pred_long_expected_net_edge_bps",
"pred_short_expected_net_edge_bps",
],
errors="ignore",
).merge(replacements, on="sample_id", how="inner", validate="one_to_one")
if len(out) != len(frame):
raise ValueError(f"nonlinear prediction frame lost rows for {split_id}: before={len(frame)} after={len(out)}")
return out
def _expanded_threshold_candidates() -> list[dict[str, float]]:
values = itertools.product(
[0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, 0.60],
[0.05, 0.10, 0.20, 0.30, 0.40, 0.50],
[0.45, 0.65, 0.85, 1.00],
[-5.0, 0.0, 3.0, 5.0, 8.0],
[0.00, 0.01, 0.02, 0.05],
)
return [
{
"long_open_prob": direction_prob,
"short_open_prob": direction_prob,
"min_entry_prob": entry_prob,
"max_market_risk_prob": risk_prob,
"min_expected_edge_bps": edge_bps,
"min_direction_margin": margin,
}
for direction_prob, entry_prob, risk_prob, edge_bps, margin in values
]
def _probe_score(metrics: dict[str, Any]) -> float:
if metrics["trade_count"] == 0:
return -1_000_000.0
sample_penalty = max(0, 80 - int(metrics["trade_count"])) * 2.0
return (
float(metrics["avg_weighted_edge_bps"]) * np.sqrt(float(metrics["trade_count"]))
+ float(metrics["total_weighted_edge_bps"]) * 0.03
- float(metrics["max_drawdown_bps"]) * 0.20
- sample_penalty
)
def _verdict(metrics: dict[str, Any]) -> dict[str, Any]:
tune = metrics[TUNE_SPLIT]
validation = metrics[VALIDATION_LOCKED_SPLIT]
stress = metrics[LATEST_STRESS_SPLIT]
passed = (
tune["trade_count"] >= 80
and validation["trade_count"] >= 40
and stress["trade_count"] >= 10
and tune["avg_weighted_edge_bps"] > 0
and validation["avg_weighted_edge_bps"] > 0
and stress["avg_weighted_edge_bps"] > -1.0
)
return {
"status": "PROMISING_DIAGNOSTIC_ONLY" if passed else "NO_STABLE_NONLINEAR_PM_EDGE",
"reason": "只用于判断树模型方向是否值得继续工程化,不代表可上线。",
}
def _x(frame: pd.DataFrame) -> np.ndarray:
return frame[FEATURE_ORDER].apply(pd.to_numeric, errors="coerce").replace([np.inf, -np.inf], np.nan).astype("float32").to_numpy()
def _markdown_report(result: dict[str, Any], top_candidates: pd.DataFrame) -> str:
lines = [
"# Nonlinear PM Probe Report",
"",
"这份报告只做诊断,不导出上线模型。它回答:不加新特征,只换成树模型后,PM 能不能筛出稳定正收益。",
"",
f"- run_id: `{result['run_id']}`",
f"- verdict: `{result['verdict']['status']}`",
f"- candidate_count: `{result['candidate_count']}`",
f"- best_thresholds: `{result['best_thresholds']}`",
"",
"## Split Metrics",
"",
"| split | trades | win_rate | avg_actual_bps | avg_weighted_bps | total_weighted_bps | profit_factor |",
"| --- | ---: | ---: | ---: | ---: | ---: | ---: |",
]
for split_id, metrics in result["split_metrics"].items():
lines.append(
f"| {split_id} | {metrics['trade_count']} | {metrics['win_rate']:.4f} | "
f"{metrics['avg_actual_edge_bps']:.4f} | {metrics['avg_weighted_edge_bps']:.4f} | "
f"{metrics['total_weighted_edge_bps']:.4f} | {metrics['profit_factor']:.4f} |"
)
lines.extend(["", "## Top Tune Candidates", "", _candidate_table(top_candidates), ""])
return "\n".join(lines)
def _candidate_table(frame: pd.DataFrame) -> str:
if frame.empty:
return "无候选。"
columns = [
"long_open_prob",
"short_open_prob",
"min_entry_prob",
"max_market_risk_prob",
"min_expected_edge_bps",
"min_direction_margin",
"trade_count",
"avg_weighted_edge_bps",
"total_weighted_edge_bps",
"profit_factor",
"score",
]
available = [column for column in columns if column in frame.columns]
lines = [
"| " + " | ".join(available) + " |",
"| " + " | ".join(["---" for _ in available]) + " |",
]
for _, row in frame[available].iterrows():
values = []
for column in available:
value = row[column]
if isinstance(value, (float, np.floating)):
values.append(f"{float(value):.6f}")
else:
values.append(str(value))
lines.append("| " + " | ".join(values) + " |")
return "\n".join(lines)
def _jsonable(value: Any) -> Any:
if isinstance(value, dict):
return {str(key): _jsonable(item) for key, item in value.items()}
if isinstance(value, list):
return [_jsonable(item) for item in value]
if isinstance(value, (np.integer,)):
return int(value)
if isinstance(value, (np.floating,)):
return float(value)
if isinstance(value, np.ndarray):
return value.tolist()
return value
+5 -4
View File
@@ -165,7 +165,8 @@ def integrated_backtest(args: Any) -> None:
price_plan,
)
stress_trades["eval_split"] = LATEST_STRESS_SPLIT
trades = pd.concat([tune_trades, validation_locked_trades, stress_trades], ignore_index=True)
trade_parts = [part for part in (tune_trades, validation_locked_trades, stress_trades) if not part.empty]
trades = pd.concat(trade_parts, ignore_index=True) if trade_parts else _empty_trade_frame()
metrics = {
TUNE_SPLIT: _trade_metrics(tune_trades),
VALIDATION_LOCKED_SPLIT: _trade_metrics(validation_locked_trades),
@@ -281,7 +282,7 @@ def _probability_implied_edge(entry_prob: pd.Series, price_plan: dict[str, Any])
price_plan.get("costBps", DEFAULT_BACKTEST_PRICE_PLAN["costBps"])
)
probability = pd.to_numeric(entry_prob, errors="coerce").fillna(0.0).clip(lower=0.0, upper=1.0)
# Entry 的概率头比收益回归头稳定。这里用固定止盈止损的盈亏比把概率换成期望收益,
# Entry 的概率头比收益回归头稳定。这里用当前价格计划的盈亏比把概率换成期望收益,
# 让低命中、高赔率计划也能被 PM 正常搜索;真实结果仍由标签里的实际路径收益评估。
return probability * target_net_bps + (1.0 - probability) * stop_net_bps
@@ -690,7 +691,7 @@ def _write_pm_report(path, candidates: pd.DataFrame, best_thresholds: dict[str,
lines = [
"# PM Threshold Report",
"",
"本次不是固定写死阈值,而是在调参集上试一组可复现的阈值。PM 回测使用固定止盈止损后的真实净收益,并且开仓后按持仓结束时间加冷却时间阻止重叠开仓。",
"本次不是固定写死阈值,而是在调参集上试一组可复现的阈值。PM 回测使用当前价格计划的真实净收益,并且开仓后按持仓结束时间加冷却时间阻止重叠开仓。",
"",
"## Best Thresholds",
"",
@@ -716,7 +717,7 @@ def _write_backtest_report(path, result: dict[str, Any]) -> None:
lines = [
"# Integrated Backtest Report",
"",
"这里用验证集模型输出和 PM 阈值生成交易明细,统计净收益、胜率、回撤和分段表现。收益按固定止盈止损计划的真实净收益计算,不使用窗口内最大可拿收益。",
"这里用验证集模型输出和 PM 阈值生成交易明细,统计净收益、胜率、回撤和分段表现。收益按当前价格计划的真实净收益计算,不使用窗口内最大可拿收益。",
"",
"```json",
str(result).replace("'", '"'),
@@ -339,7 +339,7 @@ def _markdown_report(payload: dict[str, Any], summary: pd.DataFrame) -> str:
"",
_markdown_table(top),
"",
"说明:positive_label_rate 和 avg_price_plan_net_edge_bps 按固定止盈止损计划统计;avg_expected_net_edge_bps 只是辅助观察未来最大可拿空间,不能单独决定价格计划。这里选的是下一轮实验用的价格计划,不是上线结论。真正能不能上线仍然看模型训练、PM 搜索、validation_locked 和 latest_stress 回测。",
"说明:positive_label_rate 和 avg_price_plan_net_edge_bps 按当前价格计划统计;avg_expected_net_edge_bps 只是辅助观察未来最大可拿空间,不能单独决定价格计划。这里选的是下一轮实验用的价格计划,不是上线结论。真正能不能上线仍然看模型训练、PM 搜索、validation_locked 和 latest_stress 回测。",
"",
]
return "\n".join(lines)