From e8420f76fe8a7db90cd23e2cd9edfb71e4c93c1b Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 28 Jun 2026 09:00:15 +0800 Subject: [PATCH] Add Direction label and PM probe diagnostics --- .../28_build_direction_opportunity_dataset.py | 28 ++ training/scripts/29_probe_nonlinear_pm.py | 19 ++ training/tests/test_training_contract.py | 63 ++++ training/trader_training/diagnostics.py | 81 ++++- .../direction_opportunity_dataset.py | 140 ++++++++ .../trader_training/nonlinear_pm_probe.py | 308 ++++++++++++++++++ training/trader_training/pm.py | 9 +- training/trader_training/price_plan_search.py | 2 +- 8 files changed, 634 insertions(+), 16 deletions(-) create mode 100644 training/scripts/28_build_direction_opportunity_dataset.py create mode 100644 training/scripts/29_probe_nonlinear_pm.py create mode 100644 training/trader_training/direction_opportunity_dataset.py create mode 100644 training/trader_training/nonlinear_pm_probe.py diff --git a/training/scripts/28_build_direction_opportunity_dataset.py b/training/scripts/28_build_direction_opportunity_dataset.py new file mode 100644 index 0000000..d33de7d --- /dev/null +++ b/training/scripts/28_build_direction_opportunity_dataset.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import argparse +from pathlib import Path + +import _bootstrap # noqa: F401 +from trader_training.direction_opportunity_dataset import build_direction_opportunity_dataset +from trader_training.io_utils import add_common_args, setup_logging + + +def main() -> None: + parser = argparse.ArgumentParser() + add_common_args(parser) + parser.add_argument("--direction-dataset-path", type=Path) + parser.add_argument("--entry-dataset-path", type=Path) + parser.add_argument("--output-path", type=Path) + parser.add_argument("--opportunity-bps", type=float, required=True) + parser.add_argument("--min-advantage-bps", type=float, default=5.0) + parser.add_argument("--long-edge-column", default="long_max_achievable_net_edge_bps") + parser.add_argument("--short-edge-column", default="short_max_achievable_net_edge_bps") + parser.add_argument("--label-method", default="DIRECTION_OPPORTUNITY_FROM_ENTRY_MFE_V1") + args = parser.parse_args() + setup_logging() + build_direction_opportunity_dataset(args) + + +if __name__ == "__main__": + main() diff --git a/training/scripts/29_probe_nonlinear_pm.py b/training/scripts/29_probe_nonlinear_pm.py new file mode 100644 index 0000000..fa504c8 --- /dev/null +++ b/training/scripts/29_probe_nonlinear_pm.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +import argparse + +import _bootstrap # noqa: F401 +from trader_training.io_utils import add_common_args, setup_logging +from trader_training.nonlinear_pm_probe import probe_nonlinear_pm + + +def main() -> None: + parser = argparse.ArgumentParser(description="Run diagnostic nonlinear Direction + Entry PM probe.") + add_common_args(parser) + args = parser.parse_args() + setup_logging() + probe_nonlinear_pm(args) + + +if __name__ == "__main__": + main() diff --git a/training/tests/test_training_contract.py b/training/tests/test_training_contract.py index a06ddd8..6854c4b 100644 --- a/training/tests/test_training_contract.py +++ b/training/tests/test_training_contract.py @@ -15,17 +15,20 @@ if str(TRAINING_ROOT) not in sys.path: from trader_training.onnx_export import LinearHead, export_heads from trader_training.conditional_entry_probe import probe_conditional_entry_training +from trader_training.direction_opportunity_dataset import _opportunity_labels from trader_training.dynamic_exit_search import search_dynamic_exit_plans from trader_training.entry_condition_pair_screen import screen_entry_condition_pairs from trader_training.entry_feature_screen import _bucket_edges, _screen_edge_column from trader_training.entry_mae_label_diagnostic import diagnose_entry_mae_labels from trader_training.io_utils import read_json, write_json from trader_training.labels import ENTRY_LABEL_METHOD, _path_stats_for_group, build_entry_labels +from trader_training.nonlinear_pm_probe import _expanded_threshold_candidates from trader_training.ofi_feature_experiment import _load_entry_dataset, l1_snapshot_diff_ofi_quote from trader_training.promote import promote_artifact_bundle from trader_training.replay import build_splits from trader_training.schemas import FEATURE_ORDER, LATEST_STRESS_SPLIT, MODEL_OUTPUTS, OUTPUT_MAPPING, TRAINING_SPLITS, VALIDATION_LOCKED_SPLIT from trader_training.training import TARGETS, _head_train_mask +from trader_training.diagnostics import _label_summary class TrainingContractTest(unittest.TestCase): @@ -40,6 +43,21 @@ class TrainingContractTest(unittest.TestCase): self.assertEqual(set(fields), set(OUTPUT_MAPPING[model_name])) self.assertEqual([f"prediction[{idx}]" for idx in range(len(fields))], [OUTPUT_MAPPING[model_name][field] for field in fields]) + def test_nonlinear_pm_probe_expands_low_probability_thresholds(self) -> None: + candidates = _expanded_threshold_candidates() + + self.assertIn( + { + "long_open_prob": 0.2, + "short_open_prob": 0.2, + "min_entry_prob": 0.05, + "max_market_risk_prob": 0.45, + "min_expected_edge_bps": -5.0, + "min_direction_margin": 0.0, + }, + candidates, + ) + def test_entry_feature_screen_prefers_actual_plan_edge(self) -> None: dataset = pd.DataFrame( { @@ -79,6 +97,51 @@ class TrainingContractTest(unittest.TestCase): self.assertEqual("ALL_FIT_ROWS", default_filter) self.assertEqual([True, True, True, True], default_mask.tolist()) + def test_direction_opportunity_labels_choose_clear_path_opportunity(self) -> None: + labels = _opportunity_labels( + np.array([45.0, 10.0, 45.0, 42.0, np.nan]), + np.array([20.0, 50.0, 43.0, 48.0, 50.0]), + opportunity_bps=40.0, + min_advantage_bps=5.0, + ) + + self.assertEqual([1, 0, 0, 0, 0], labels["long_target"].tolist()) + self.assertEqual([0, 1, 0, 1, 1], labels["short_target"].tolist()) + self.assertEqual([0, 0, 1, 0, 0], labels["neutral_target"].tolist()) + + def test_diagnostics_reads_actual_training_dataset_labels(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + dataset_dir = root / "dataset" + dataset_dir.mkdir(parents=True) + pd.DataFrame( + { + "sample_id": ["s1", "s2"], + "split_id": ["fit_inner", "fit_inner"], + "long_target": [1, 0], + "short_target": [0, 0], + "neutral_target": [0, 1], + "future_return_bps": [5.0, -1.0], + } + ).to_parquet(dataset_dir / "direction_train.parquet", index=False) + pd.DataFrame( + { + "sample_id": ["s1", "s2"], + "split_id": ["fit_inner", "fit_inner"], + "long_entry_target": [1, 0], + "short_entry_target": [0, 1], + "long_actual_plan_net_edge_bps": [8.0, -6.0], + "short_actual_plan_net_edge_bps": [-5.0, 7.0], + } + ).to_parquet(dataset_dir / "entry_train.parquet", index=False) + + summary = _label_summary(root) + + self.assertEqual("dataset/direction_train.parquet", summary["fit_inner"]["direction"]["source"]) + self.assertEqual({"LONG": 0.5, "SHORT": 0.0, "NEUTRAL": 0.5}, summary["fit_inner"]["direction"]["label_ratio"]) + self.assertEqual("dataset/entry_train.parquet", summary["fit_inner"]["entry"]["source"]) + self.assertEqual(0.5, summary["fit_inner"]["entry"]["target_rate_by_side"]["LONG"]) + def test_entry_feature_screen_keeps_zero_inflated_event_features(self) -> None: values = np.concatenate((np.zeros(5000), np.linspace(1.0, 100.0, 500))) edges = _bucket_edges(values) diff --git a/training/trader_training/diagnostics.py b/training/trader_training/diagnostics.py index 63e1323..c0fc082 100644 --- a/training/trader_training/diagnostics.py +++ b/training/trader_training/diagnostics.py @@ -36,8 +36,8 @@ def diagnose_training_run(args: Any) -> None: def _label_summary(root) -> dict[str, Any]: - direction = read_parquet(root / "label" / "direction_labels.parquet") - entry = read_parquet(root / "label" / "entry_labels.parquet") + direction = read_parquet(root / "dataset" / "direction_train.parquet") + entry = read_parquet(root / "dataset" / "entry_train.parquet") summary: dict[str, Any] = {} for split_id in DIAGNOSTIC_SPLITS: direction_split = direction[direction["split_id"].eq(split_id)].copy() @@ -45,28 +45,57 @@ def _label_summary(root) -> dict[str, Any]: item: dict[str, Any] = {"direction": {}, "entry": {}} if not direction_split.empty: item["direction"] = { + "source": "dataset/direction_train.parquet", "rows": len(direction_split), - "label_ratio": direction_split["direction_label"].value_counts(normalize=True).round(6).to_dict(), + "label_ratio": _direction_target_ratio(direction_split), "future_return_bps_quantile": _quantiles(direction_split["future_return_bps"], (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99)), } if not entry_split.empty: - if "actual_plan_net_edge_bps" not in entry_split.columns: - raise ValueError("entry_labels is missing actual_plan_net_edge_bps for diagnostics") - grouped = entry_split.groupby("side", observed=False) + required = { + "long_entry_target", + "short_entry_target", + "long_actual_plan_net_edge_bps", + "short_actual_plan_net_edge_bps", + } + missing = sorted(required - set(entry_split.columns)) + if missing: + raise ValueError(f"entry_train is missing columns required by diagnostics: {missing}") item["entry"] = { + "source": "dataset/entry_train.parquet", "rows": len(entry_split), - "target_rate_by_side": grouped["entry_target"].mean().round(6).to_dict(), + "target_rate_by_side": { + "LONG": float(entry_split["long_entry_target"].astype(float).mean()), + "SHORT": float(entry_split["short_entry_target"].astype(float).mean()), + }, "edge_column": "actual_plan_net_edge_bps", - "edge_mean_by_side": grouped["actual_plan_net_edge_bps"].mean().round(6).to_dict(), + "edge_mean_by_side": { + "LONG": float(entry_split["long_actual_plan_net_edge_bps"].astype(float).mean()), + "SHORT": float(entry_split["short_actual_plan_net_edge_bps"].astype(float).mean()), + }, "edge_quantile_by_side": { - str(side): _quantiles(group["actual_plan_net_edge_bps"], (0.05, 0.5, 0.95)) - for side, group in grouped + "LONG": _quantiles(entry_split["long_actual_plan_net_edge_bps"], (0.05, 0.5, 0.95)), + "SHORT": _quantiles(entry_split["short_actual_plan_net_edge_bps"], (0.05, 0.5, 0.95)), }, } summary[split_id] = item return summary +def _direction_target_ratio(frame: pd.DataFrame) -> dict[str, float]: + required = {"long_target", "short_target", "neutral_target"} + missing = sorted(required - set(frame.columns)) + if missing: + raise ValueError(f"direction_train is missing target columns required by diagnostics: {missing}") + rows = len(frame) + if rows == 0: + return {"LONG": 0.0, "SHORT": 0.0, "NEUTRAL": 0.0} + return { + "LONG": float(frame["long_target"].astype(float).mean()), + "SHORT": float(frame["short_target"].astype(float).mean()), + "NEUTRAL": float(frame["neutral_target"].astype(float).mean()), + } + + def _pm_summary(root) -> dict[str, Any]: summary: dict[str, Any] = {} config_path = root / "pm-search" / "position_manager_config.json" @@ -259,7 +288,7 @@ def _diagnostic_conclusion(pm_summary: dict[str, Any]) -> dict[str, Any]: if validation.get("avg_weighted_edge_bps", 0.0) <= 0 and stress.get("avg_weighted_edge_bps", 0.0) <= 0: return { "status": "PRICE_PLAN_OR_ENTRY_NOT_TRADABLE", - "plain_reason": "按固定止盈止损真实收益算,验证集和压力集选出来的交易平均都不赚钱。", + "plain_reason": "按当前价格计划真实收益算,验证集和压力集选出来的交易平均都不赚钱。", "next_action": "优先重新搜索价格计划,再重建 Entry 标签和模型;不要只放松 PM 阈值。", } return { @@ -296,10 +325,12 @@ def _markdown_report(payload: dict[str, Any]) -> str: lines.append("") if direction: lines.append(f"- Direction 行数: {direction['rows']}") + lines.append(f"- Direction 来源: `{direction['source']}`") lines.append(f"- Direction 标签比例: `{direction['label_ratio']}`") lines.append(f"- 45 分钟未来收益分位: `{direction['future_return_bps_quantile']}`") if entry: lines.append(f"- Entry 行数: {entry['rows']}") + lines.append(f"- Entry 来源: `{entry['source']}`") lines.append(f"- Entry 命中率: `{entry['target_rate_by_side']}`") lines.append(f"- Entry 平均净收益: `{entry['edge_mean_by_side']}`") lines.append("") @@ -312,6 +343,7 @@ def _markdown_report(payload: dict[str, Any]) -> str: lines.append(f"- 当前阈值: `{item['active_thresholds']}`") lines.append(f"- 当前阈值选中交易: `{item['selected_trade_metrics']}`") lines.append(f"- 网格里有交易的候选数: {item['grid_search_any_trade']['candidates_with_trade']} / {item['grid_search_any_trade']['candidate_count']}") + lines.extend(_score_distribution_markdown(item["score_distribution"])) lines.append("") for side in ("long", "short"): lines.append(f"#### {side.upper()}") @@ -334,6 +366,33 @@ def _markdown_report(payload: dict[str, Any]) -> str: return "\n".join(lines) + "\n" +def _score_distribution_markdown(distribution: dict[str, dict[str, float]]) -> list[str]: + watched_columns = [ + "long_prob", + "short_prob", + "long_entry_prob", + "short_entry_prob", + "market_risk_prob", + "pred_long_expected_net_edge_bps", + "pred_short_expected_net_edge_bps", + ] + lines = ["", "#### 分数分布", "", "| 字段 | 最小 | 5% | 中位数 | 95% | 最大 |", "| --- | ---: | ---: | ---: | ---: | ---: |"] + for column in watched_columns: + quantiles = distribution.get(column) + if not quantiles: + continue + lines.append( + "| " + + column + + f" | {quantiles.get('0.0', 0.0):.4f}" + + f" | {quantiles.get('0.05', 0.0):.4f}" + + f" | {quantiles.get('0.5', 0.0):.4f}" + + f" | {quantiles.get('0.95', 0.0):.4f}" + + f" | {quantiles.get('1.0', 0.0):.4f} |" + ) + return lines + + def _jsonable(value: Any) -> Any: if isinstance(value, dict): return {str(key): _jsonable(item) for key, item in value.items()} diff --git a/training/trader_training/direction_opportunity_dataset.py b/training/trader_training/direction_opportunity_dataset.py new file mode 100644 index 0000000..04693f2 --- /dev/null +++ b/training/trader_training/direction_opportunity_dataset.py @@ -0,0 +1,140 @@ +from __future__ import annotations + +import logging +from typing import Any + +import numpy as np +import pandas as pd + +from trader_training.io_utils import manifest, read_parquet, require_columns, run_root, write_json, write_parquet, write_text +from trader_training.schemas import FEATURE_ORDER + + +def build_direction_opportunity_dataset(args: Any) -> None: + root = run_root(args) + direction_path = args.direction_dataset_path or root / "dataset" / "direction_train.parquet" + entry_path = args.entry_dataset_path or root / "dataset" / "entry_train.parquet" + output_path = args.output_path or root / "dataset" / "direction_train.parquet" + opportunity_bps = float(args.opportunity_bps) + min_advantage_bps = float(args.min_advantage_bps) + long_edge_column = str(args.long_edge_column) + short_edge_column = str(args.short_edge_column) + label_method = str(args.label_method) + + direction = read_parquet(direction_path) + entry = read_parquet(entry_path) + require_columns(direction, ("sample_id", "split_id", *FEATURE_ORDER), "direction_train") + require_columns(entry, ("sample_id", long_edge_column, short_edge_column), "entry_train") + + opportunity = entry[["sample_id", long_edge_column, short_edge_column]].copy() + merged = direction.drop(columns=["long_target", "short_target", "neutral_target"], errors="ignore").merge(opportunity, on="sample_id", how="inner", validate="one_to_one") + if len(merged) != len(direction): + raise ValueError(f"direction opportunity dataset lost rows: before={len(direction)} after={len(merged)}") + + labels = _opportunity_labels( + pd.to_numeric(merged[long_edge_column], errors="coerce").to_numpy(dtype="float64"), + pd.to_numeric(merged[short_edge_column], errors="coerce").to_numpy(dtype="float64"), + opportunity_bps, + min_advantage_bps, + ) + merged["long_target"] = labels["long_target"] + merged["short_target"] = labels["short_target"] + merged["neutral_target"] = labels["neutral_target"] + # 保留 future_return_bps 作为排查字段;训练目标以三列 target 为准。 + ordered = [column for column in direction.columns if column in merged.columns and column not in {"long_target", "short_target", "neutral_target"}] + ordered.extend(["long_target", "short_target", "neutral_target"]) + for column in (long_edge_column, short_edge_column): + if column not in ordered: + ordered.append(column) + out = merged[ordered].copy() + data_hash = write_parquet(output_path, out) + result = { + "dataset": manifest( + output_path, + { + "row_count": len(out), + "feature_count": len(FEATURE_ORDER), + "data_hash_sha256": data_hash, + "split_counts": out["split_id"].value_counts().to_dict(), + }, + ), + "label_method": label_method, + "long_edge_column": long_edge_column, + "short_edge_column": short_edge_column, + "opportunity_bps": opportunity_bps, + "min_advantage_bps": min_advantage_bps, + "target_counts": { + "long": int(out["long_target"].sum()), + "short": int(out["short_target"].sum()), + "neutral": int(out["neutral_target"].sum()), + }, + "target_rates_by_split": _target_rates_by_split(out), + } + write_json(root / "dataset" / "direction_opportunity_dataset_result.json", result) + write_text(root / "dataset" / "direction_opportunity_dataset_report.md", _markdown_report(result)) + logging.info( + "trader.training.direction_opportunity_dataset_written runId=%s opportunityBps=%.6f minAdvantageBps=%.6f rowCount=%s outputPath=%s", + args.run_id, + opportunity_bps, + min_advantage_bps, + len(out), + output_path, + ) + + +def _opportunity_labels(long_edge: np.ndarray, short_edge: np.ndarray, opportunity_bps: float, min_advantage_bps: float) -> dict[str, np.ndarray]: + long_clean = np.nan_to_num(long_edge, nan=-np.inf) + short_clean = np.nan_to_num(short_edge, nan=-np.inf) + long_ok = long_clean >= opportunity_bps + short_ok = short_clean >= opportunity_bps + long_wins = long_ok & ((long_clean - short_clean) >= min_advantage_bps) + short_wins = short_ok & ((short_clean - long_clean) >= min_advantage_bps) + neutral = ~(long_wins | short_wins) + return { + "long_target": long_wins.astype("int8"), + "short_target": short_wins.astype("int8"), + "neutral_target": neutral.astype("int8"), + } + + +def _target_rates_by_split(frame: pd.DataFrame) -> dict[str, dict[str, float]]: + result: dict[str, dict[str, float]] = {} + for split_id, part in frame.groupby("split_id", observed=False): + rows = len(part) + result[str(split_id)] = { + "rows": float(rows), + "long_rate": float(part["long_target"].mean()) if rows else 0.0, + "short_rate": float(part["short_target"].mean()) if rows else 0.0, + "neutral_rate": float(part["neutral_target"].mean()) if rows else 0.0, + } + return result + + +def _markdown_report(result: dict[str, Any]) -> str: + lines = [ + "# Direction 机会标签数据集报告", + "", + "这份数据集把 Direction 目标从“未来收盘收益方向”改为“未来路径里哪边有可交易空间”。", + "", + f"- label_method: `{result['label_method']}`", + f"- long_edge_column: `{result['long_edge_column']}`", + f"- short_edge_column: `{result['short_edge_column']}`", + f"- opportunity_bps: `{result['opportunity_bps']}`", + f"- min_advantage_bps: `{result['min_advantage_bps']}`", + f"- row_count: `{result['dataset']['row_count']}`", + "", + "## 标签数量", + "", + f"- long: `{result['target_counts']['long']}`", + f"- short: `{result['target_counts']['short']}`", + f"- neutral: `{result['target_counts']['neutral']}`", + "", + "## 分段比例", + "", + "| split | rows | long | short | neutral |", + "| --- | ---: | ---: | ---: | ---: |", + ] + for split_id, item in result["target_rates_by_split"].items(): + lines.append(f"| {split_id} | {int(item['rows'])} | {item['long_rate']:.4f} | {item['short_rate']:.4f} | {item['neutral_rate']:.4f} |") + lines.append("") + return "\n".join(lines) diff --git a/training/trader_training/nonlinear_pm_probe.py b/training/trader_training/nonlinear_pm_probe.py new file mode 100644 index 0000000..7e9c584 --- /dev/null +++ b/training/trader_training/nonlinear_pm_probe.py @@ -0,0 +1,308 @@ +from __future__ import annotations + +import itertools +import logging +from typing import Any + +import numpy as np +import pandas as pd +from sklearn.ensemble import HistGradientBoostingClassifier, HistGradientBoostingRegressor + +from trader_training.io_utils import read_parquet, run_root, write_json, write_text +from trader_training.pm import _pm_config_from_thresholds, _pm_frame, _price_plan_context, _simulate_open_trades, _trade_metrics +from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT + + +EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) + + +def probe_nonlinear_pm(args: Any) -> None: + root = run_root(args) + direction_dataset = read_parquet(root / "dataset" / "direction_train.parquet") + entry_dataset = read_parquet(root / "dataset" / "entry_train.parquet") + direction_model = _fit_direction_model(direction_dataset) + entry_models = _fit_entry_models(direction_dataset, entry_dataset) + frames = { + split_id: _prediction_frame(root, split_id, direction_dataset, entry_dataset, direction_model, entry_models) + for split_id in EVAL_SPLITS + } + price_plan = _price_plan_context(root) + candidates = _expanded_threshold_candidates() + tune_rows: list[dict[str, Any]] = [] + best_thresholds: dict[str, float] | None = None + best_tune_metrics: dict[str, Any] | None = None + best_score = -float("inf") + + for thresholds in candidates: + trades = _simulate_open_trades(frames[TUNE_SPLIT], thresholds, _pm_config_from_thresholds(thresholds), price_plan) + metrics = _trade_metrics(trades) + score = _probe_score(metrics) + tune_rows.append({**thresholds, **metrics, "score": score}) + if score > best_score: + best_score = score + best_thresholds = thresholds + best_tune_metrics = metrics + + if best_thresholds is None or best_tune_metrics is None: + raise ValueError("nonlinear PM probe did not evaluate any threshold candidate") + + split_metrics: dict[str, Any] = {} + for split_id, frame in frames.items(): + trades = _simulate_open_trades(frame, best_thresholds, _pm_config_from_thresholds(best_thresholds), price_plan) + split_metrics[split_id] = _trade_metrics(trades) + + tune_frame = pd.DataFrame(tune_rows).sort_values("score", ascending=False).reset_index(drop=True) + result = { + "run_id": args.run_id, + "purpose": "diagnostic_only_not_exported", + "model_family": "sklearn_hist_gradient_boosting", + "candidate_count": len(candidates), + "best_thresholds": best_thresholds, + "best_tune_metrics": best_tune_metrics, + "split_metrics": split_metrics, + "verdict": _verdict(split_metrics), + } + out_dir = root / "diagnostics" + write_json(out_dir / "nonlinear_pm_probe_result.json", _jsonable(result)) + write_text(out_dir / "nonlinear_pm_probe_candidates.csv", tune_frame.head(200).to_csv(index=False)) + write_text(out_dir / "nonlinear_pm_probe_report.md", _markdown_report(result, tune_frame.head(20))) + logging.info( + "trader.training.nonlinear_pm_probe_written runId=%s verdict=%s tuneTrades=%s validationTrades=%s stressTrades=%s", + args.run_id, + result["verdict"]["status"], + split_metrics[TUNE_SPLIT]["trade_count"], + split_metrics[VALIDATION_LOCKED_SPLIT]["trade_count"], + split_metrics[LATEST_STRESS_SPLIT]["trade_count"], + ) + + +def _fit_direction_model(dataset: pd.DataFrame) -> HistGradientBoostingClassifier: + train = dataset[dataset["split_id"].eq(FIT_SPLIT)].copy() + y = train[["long_target", "short_target", "neutral_target"]].to_numpy().argmax(axis=1) + model = HistGradientBoostingClassifier( + max_iter=160, + learning_rate=0.04, + max_leaf_nodes=31, + l2_regularization=0.02, + early_stopping=True, + random_state=41, + ) + model.fit(_x(train), y) + return model + + +def _fit_entry_models(direction_dataset: pd.DataFrame, entry_dataset: pd.DataFrame) -> dict[str, Any]: + merged = entry_dataset.merge( + direction_dataset[["sample_id", "long_target", "short_target"]], + on="sample_id", + how="inner", + validate="one_to_one", + ) + train = merged[merged["split_id"].eq(FIT_SPLIT)].copy() + return { + "long_entry_prob": _fit_binary_head(train[train["long_target"].eq(1)], "long_entry_target", seed=43), + "short_entry_prob": _fit_binary_head(train[train["short_target"].eq(1)], "short_entry_target", seed=47), + "long_expected_net_edge_bps": _fit_regression_head(train[train["long_target"].eq(1)], "long_actual_plan_net_edge_bps", seed=53), + "short_expected_net_edge_bps": _fit_regression_head(train[train["short_target"].eq(1)], "short_actual_plan_net_edge_bps", seed=59), + } + + +def _fit_binary_head(train: pd.DataFrame, target: str, seed: int) -> HistGradientBoostingClassifier: + if len(train) < 1000: + raise ValueError(f"not enough rows to train nonlinear Entry head {target}: {len(train)}") + y = train[target].astype(int).to_numpy() + if len(np.unique(y)) < 2: + raise ValueError(f"nonlinear Entry head {target} has only one class") + model = HistGradientBoostingClassifier( + max_iter=180, + learning_rate=0.04, + max_leaf_nodes=31, + l2_regularization=0.02, + early_stopping=True, + random_state=seed, + ) + model.fit(_x(train), y) + return model + + +def _fit_regression_head(train: pd.DataFrame, target: str, seed: int) -> HistGradientBoostingRegressor: + if len(train) < 1000: + raise ValueError(f"not enough rows to train nonlinear Entry head {target}: {len(train)}") + model = HistGradientBoostingRegressor( + max_iter=180, + learning_rate=0.04, + max_leaf_nodes=31, + l2_regularization=0.02, + early_stopping=True, + random_state=seed, + ) + model.fit(_x(train), train[target].astype(float).to_numpy()) + return model + + +def _prediction_frame( + root, + split_id: str, + direction_dataset: pd.DataFrame, + entry_dataset: pd.DataFrame, + direction_model: HistGradientBoostingClassifier, + entry_models: dict[str, Any], +) -> pd.DataFrame: + frame = _pm_frame(root, split_id).copy() + direction_split = direction_dataset[direction_dataset["split_id"].eq(split_id)].copy() + entry_split = entry_dataset[entry_dataset["split_id"].eq(split_id)].copy() + direction_proba = direction_model.predict_proba(_x(direction_split)) + direction_pred = direction_split[["sample_id"]].copy() + direction_pred["long_prob"] = direction_proba[:, 0] + direction_pred["short_prob"] = direction_proba[:, 1] + direction_pred["neutral_prob"] = direction_proba[:, 2] + + entry_pred = entry_split[["sample_id"]].copy() + entry_pred["long_entry_prob"] = entry_models["long_entry_prob"].predict_proba(_x(entry_split))[:, 1] + entry_pred["short_entry_prob"] = entry_models["short_entry_prob"].predict_proba(_x(entry_split))[:, 1] + entry_pred["pred_long_expected_net_edge_bps"] = entry_models["long_expected_net_edge_bps"].predict(_x(entry_split)) + entry_pred["pred_short_expected_net_edge_bps"] = entry_models["short_expected_net_edge_bps"].predict(_x(entry_split)) + replacements = direction_pred.merge(entry_pred, on="sample_id", how="inner", validate="one_to_one") + out = frame.drop( + columns=[ + "long_prob", + "short_prob", + "neutral_prob", + "long_entry_prob", + "short_entry_prob", + "pred_long_expected_net_edge_bps", + "pred_short_expected_net_edge_bps", + ], + errors="ignore", + ).merge(replacements, on="sample_id", how="inner", validate="one_to_one") + if len(out) != len(frame): + raise ValueError(f"nonlinear prediction frame lost rows for {split_id}: before={len(frame)} after={len(out)}") + return out + + +def _expanded_threshold_candidates() -> list[dict[str, float]]: + values = itertools.product( + [0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50, 0.60], + [0.05, 0.10, 0.20, 0.30, 0.40, 0.50], + [0.45, 0.65, 0.85, 1.00], + [-5.0, 0.0, 3.0, 5.0, 8.0], + [0.00, 0.01, 0.02, 0.05], + ) + return [ + { + "long_open_prob": direction_prob, + "short_open_prob": direction_prob, + "min_entry_prob": entry_prob, + "max_market_risk_prob": risk_prob, + "min_expected_edge_bps": edge_bps, + "min_direction_margin": margin, + } + for direction_prob, entry_prob, risk_prob, edge_bps, margin in values + ] + + +def _probe_score(metrics: dict[str, Any]) -> float: + if metrics["trade_count"] == 0: + return -1_000_000.0 + sample_penalty = max(0, 80 - int(metrics["trade_count"])) * 2.0 + return ( + float(metrics["avg_weighted_edge_bps"]) * np.sqrt(float(metrics["trade_count"])) + + float(metrics["total_weighted_edge_bps"]) * 0.03 + - float(metrics["max_drawdown_bps"]) * 0.20 + - sample_penalty + ) + + +def _verdict(metrics: dict[str, Any]) -> dict[str, Any]: + tune = metrics[TUNE_SPLIT] + validation = metrics[VALIDATION_LOCKED_SPLIT] + stress = metrics[LATEST_STRESS_SPLIT] + passed = ( + tune["trade_count"] >= 80 + and validation["trade_count"] >= 40 + and stress["trade_count"] >= 10 + and tune["avg_weighted_edge_bps"] > 0 + and validation["avg_weighted_edge_bps"] > 0 + and stress["avg_weighted_edge_bps"] > -1.0 + ) + return { + "status": "PROMISING_DIAGNOSTIC_ONLY" if passed else "NO_STABLE_NONLINEAR_PM_EDGE", + "reason": "只用于判断树模型方向是否值得继续工程化,不代表可上线。", + } + + +def _x(frame: pd.DataFrame) -> np.ndarray: + return frame[FEATURE_ORDER].apply(pd.to_numeric, errors="coerce").replace([np.inf, -np.inf], np.nan).astype("float32").to_numpy() + + +def _markdown_report(result: dict[str, Any], top_candidates: pd.DataFrame) -> str: + lines = [ + "# Nonlinear PM Probe Report", + "", + "这份报告只做诊断,不导出上线模型。它回答:不加新特征,只换成树模型后,PM 能不能筛出稳定正收益。", + "", + f"- run_id: `{result['run_id']}`", + f"- verdict: `{result['verdict']['status']}`", + f"- candidate_count: `{result['candidate_count']}`", + f"- best_thresholds: `{result['best_thresholds']}`", + "", + "## Split Metrics", + "", + "| split | trades | win_rate | avg_actual_bps | avg_weighted_bps | total_weighted_bps | profit_factor |", + "| --- | ---: | ---: | ---: | ---: | ---: | ---: |", + ] + for split_id, metrics in result["split_metrics"].items(): + lines.append( + f"| {split_id} | {metrics['trade_count']} | {metrics['win_rate']:.4f} | " + f"{metrics['avg_actual_edge_bps']:.4f} | {metrics['avg_weighted_edge_bps']:.4f} | " + f"{metrics['total_weighted_edge_bps']:.4f} | {metrics['profit_factor']:.4f} |" + ) + lines.extend(["", "## Top Tune Candidates", "", _candidate_table(top_candidates), ""]) + return "\n".join(lines) + + +def _candidate_table(frame: pd.DataFrame) -> str: + if frame.empty: + return "无候选。" + columns = [ + "long_open_prob", + "short_open_prob", + "min_entry_prob", + "max_market_risk_prob", + "min_expected_edge_bps", + "min_direction_margin", + "trade_count", + "avg_weighted_edge_bps", + "total_weighted_edge_bps", + "profit_factor", + "score", + ] + available = [column for column in columns if column in frame.columns] + lines = [ + "| " + " | ".join(available) + " |", + "| " + " | ".join(["---" for _ in available]) + " |", + ] + for _, row in frame[available].iterrows(): + values = [] + for column in available: + value = row[column] + if isinstance(value, (float, np.floating)): + values.append(f"{float(value):.6f}") + else: + values.append(str(value)) + lines.append("| " + " | ".join(values) + " |") + return "\n".join(lines) + + +def _jsonable(value: Any) -> Any: + if isinstance(value, dict): + return {str(key): _jsonable(item) for key, item in value.items()} + if isinstance(value, list): + return [_jsonable(item) for item in value] + if isinstance(value, (np.integer,)): + return int(value) + if isinstance(value, (np.floating,)): + return float(value) + if isinstance(value, np.ndarray): + return value.tolist() + return value diff --git a/training/trader_training/pm.py b/training/trader_training/pm.py index ad89315..573175a 100644 --- a/training/trader_training/pm.py +++ b/training/trader_training/pm.py @@ -165,7 +165,8 @@ def integrated_backtest(args: Any) -> None: price_plan, ) stress_trades["eval_split"] = LATEST_STRESS_SPLIT - trades = pd.concat([tune_trades, validation_locked_trades, stress_trades], ignore_index=True) + trade_parts = [part for part in (tune_trades, validation_locked_trades, stress_trades) if not part.empty] + trades = pd.concat(trade_parts, ignore_index=True) if trade_parts else _empty_trade_frame() metrics = { TUNE_SPLIT: _trade_metrics(tune_trades), VALIDATION_LOCKED_SPLIT: _trade_metrics(validation_locked_trades), @@ -281,7 +282,7 @@ def _probability_implied_edge(entry_prob: pd.Series, price_plan: dict[str, Any]) price_plan.get("costBps", DEFAULT_BACKTEST_PRICE_PLAN["costBps"]) ) probability = pd.to_numeric(entry_prob, errors="coerce").fillna(0.0).clip(lower=0.0, upper=1.0) - # Entry 的概率头比收益回归头稳定。这里用固定止盈止损的盈亏比把概率换成期望收益, + # Entry 的概率头比收益回归头稳定。这里用当前价格计划的盈亏比把概率换成期望收益, # 让低命中、高赔率计划也能被 PM 正常搜索;真实结果仍由标签里的实际路径收益评估。 return probability * target_net_bps + (1.0 - probability) * stop_net_bps @@ -690,7 +691,7 @@ def _write_pm_report(path, candidates: pd.DataFrame, best_thresholds: dict[str, lines = [ "# PM Threshold Report", "", - "本次不是固定写死阈值,而是在调参集上试一组可复现的阈值。PM 回测使用固定止盈止损后的真实净收益,并且开仓后按持仓结束时间加冷却时间阻止重叠开仓。", + "本次不是固定写死阈值,而是在调参集上试一组可复现的阈值。PM 回测使用当前价格计划的真实净收益,并且开仓后按持仓结束时间加冷却时间阻止重叠开仓。", "", "## Best Thresholds", "", @@ -716,7 +717,7 @@ def _write_backtest_report(path, result: dict[str, Any]) -> None: lines = [ "# Integrated Backtest Report", "", - "这里用验证集模型输出和 PM 阈值生成交易明细,统计净收益、胜率、回撤和分段表现。收益按固定止盈止损计划的真实净收益计算,不使用窗口内最大可拿收益。", + "这里用验证集模型输出和 PM 阈值生成交易明细,统计净收益、胜率、回撤和分段表现。收益按当前价格计划的真实净收益计算,不使用窗口内最大可拿收益。", "", "```json", str(result).replace("'", '"'), diff --git a/training/trader_training/price_plan_search.py b/training/trader_training/price_plan_search.py index 151d568..0fc154c 100644 --- a/training/trader_training/price_plan_search.py +++ b/training/trader_training/price_plan_search.py @@ -339,7 +339,7 @@ def _markdown_report(payload: dict[str, Any], summary: pd.DataFrame) -> str: "", _markdown_table(top), "", - "说明:positive_label_rate 和 avg_price_plan_net_edge_bps 按固定止盈止损计划统计;avg_expected_net_edge_bps 只是辅助观察未来最大可拿空间,不能单独决定价格计划。这里选的是下一轮实验用的价格计划,不是上线结论。真正能不能上线仍然看模型训练、PM 搜索、validation_locked 和 latest_stress 回测。", + "说明:positive_label_rate 和 avg_price_plan_net_edge_bps 按当前价格计划统计;avg_expected_net_edge_bps 只是辅助观察未来最大可拿空间,不能单独决定价格计划。这里选的是下一轮实验用的价格计划,不是上线结论。真正能不能上线仍然看模型训练、PM 搜索、validation_locked 和 latest_stress 回测。", "", ] return "\n".join(lines)