from __future__ import annotations

import itertools
import json
import logging
from typing import Any

import numpy as np
import pandas as pd

from trader_training.io_utils import read_parquet, run_root, write_json, write_text
from trader_training.labels import DEFAULT_COST_CONFIG, DEFAULT_LABEL_CONFIG, ENTRY_LABEL_METHOD, _load_config
from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT

# Splits every candidate plan is measured on (FIT is reported but not scored).
EVAL_SPLITS = (FIT_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)
# Default search grid: holding horizon in minutes, target / stop distance in bps.
DEFAULT_HORIZONS = (30, 45, 60, 90, 120)
DEFAULT_TARGETS = (12.0, 16.0, 20.0, 24.0, 32.0, 40.0)
DEFAULT_STOPS = (6.0, 8.0, 10.0, 12.0, 16.0)

# Held-out splits that drive the score and the best-plan gates.
_SCORE_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT)
# Per-split metrics carried from the per-symbol rows into the plan summary.
_SUMMARY_METRICS = (
    "positive_label_rate",
    "avg_expected_net_edge_bps",
    "avg_price_plan_net_edge_bps",
    "target_rate_margin",
    "target_hit_rate",
    "stop_hit_rate",
)
# Spacing of consecutive 1-minute replay bars; used to detect gaps in a window.
_BAR_MS = 60_000


def search_price_plans(args: Any) -> None:
    """Grid-search (horizon, target, stop, side) price plans and write the results.

    Reads the replay bars and the feature frame for the run, evaluates every
    candidate plan per symbol and evaluation split, summarises the plans, picks
    a plan for the next experiment, and writes four artifacts under
    ``<run_root>/price-plan-search/``: the JSON result, the per-symbol rows CSV,
    the per-plan summary CSV and a markdown report.

    Args:
        args: Parsed CLI namespace. Attributes read here: ``run_id``,
            ``replay_path``, ``feature_path``, ``label_config_path``,
            ``cost_config_path``, ``horizons``, ``targets`` and ``stops``
            (plus whatever ``run_root`` reads). Empty/None grid attributes fall
            back to the ``DEFAULT_*`` grids.

    Raises:
        ValueError: If no feature row is trainable, or the search produced no
            candidate rows.
    """
    root = run_root(args)
    replay = read_parquet(args.replay_path or root / "replay" / "replay_1m.parquet")
    features = read_parquet(args.feature_path or root / "feature" / "feature_frame.parquet")
    label_config = _load_config(args.label_config_path, DEFAULT_LABEL_CONFIG)
    cost_config = _load_config(args.cost_config_path, DEFAULT_COST_CONFIG)
    # Total cost charged once per plan, in basis points.
    cost_bps = (
        float(cost_config["fee_bps"])
        + float(cost_config["slippage_bps"])
        + float(cost_config["funding_cost_bps"])
    )
    min_expected_edge_bps = float(label_config["entry"]["min_expected_net_edge_bps"])

    trainable = features[
        features["data_quality_flag"].isin(["OK", "PARTIAL_OPTIONAL"])
        & features["split_id"].isin(EVAL_SPLITS)
    ][["symbol", "open_time_ms", "split_id"]].copy()
    if trainable.empty:
        raise ValueError("price plan search needs trainable feature rows")

    horizons = args.horizons or DEFAULT_HORIZONS
    targets = args.targets or DEFAULT_TARGETS
    stops = args.stops or DEFAULT_STOPS
    # Partition the trainable rows by symbol once, instead of re-scanning the
    # whole frame for every replay symbol.
    trainable_by_symbol = {
        symbol: symbol_rows
        for symbol, symbol_rows in trainable.groupby("symbol", sort=False, observed=True)
    }

    rows: list[dict[str, Any]] = []
    for symbol, group in replay.groupby("symbol", sort=False, observed=False):
        symbol_features = trainable_by_symbol.get(symbol)
        if symbol_features is None:
            continue
        # Bar open time -> split id; the first row wins on duplicate timestamps.
        feature_split_by_ms = (
            symbol_features.drop_duplicates("open_time_ms")
            .set_index("open_time_ms")["split_id"]
            .to_dict()
        )
        if not feature_split_by_ms:
            continue
        rows.extend(
            _symbol_plan_rows(
                symbol,
                group.sort_values("event_time").reset_index(drop=True),
                feature_split_by_ms,
                cost_bps,
                min_expected_edge_bps,
                horizons,
                targets,
                stops,
            )
        )

    result = pd.DataFrame(rows)
    if result.empty:
        raise ValueError("price plan search produced no candidate rows")
    summary = _plan_summary(result)
    best = _select_best_plan(summary)
    payload = {
        "run_id": args.run_id,
        "cost_bps": cost_bps,
        "min_expected_net_edge_bps": min_expected_edge_bps,
        "entry_label_method": ENTRY_LABEL_METHOD,
        "candidate_count": int(summary["plan_id"].nunique()),
        "best_plan": best,
    }
    out_dir = root / "price-plan-search"
    write_json(out_dir / "price_plan_search_result.json", _jsonable(payload))
    write_text(out_dir / "price_plan_search_rows.csv", result.to_csv(index=False))
    write_text(out_dir / "price_plan_search_summary.csv", summary.to_csv(index=False))
    write_text(out_dir / "price_plan_search_report.md", _markdown_report(payload, summary))
    logging.info(
        "trader.training.price_plan_searched runId=%s candidateCount=%s bestPlan=%s bestScore=%.6f",
        args.run_id,
        payload["candidate_count"],
        best["plan_id"],
        best["score"],
    )


def _symbol_plan_rows(
    symbol: str,
    replay: pd.DataFrame,
    feature_split_by_ms: dict[int, str],
    cost_bps: float,
    min_expected_edge_bps: float,
    horizons: tuple[int, ...],
    targets: tuple[float, ...],
    stops: tuple[float, ...],
) -> list[dict[str, Any]]:
    """Evaluate every (horizon, target, stop, side) plan for one symbol.

    An entry at bar ``i`` is filled at that bar's close and watches bars
    ``i+1 .. i+horizon``. An entry is evaluated only when both hold:

    * its look-ahead window is gap-free, meaning open times step by exactly
      one minute;
    * its own ``open_time_ms`` has a trainable feature row.

    Args:
        symbol: Symbol being evaluated (copied into every output row).
        replay: This symbol's bars, sorted in time order with a 0..n-1 index;
            must have ``close``, ``high``, ``low`` and ``open_time_ms`` columns.
        feature_split_by_ms: Bar open time -> split id of the trainable rows.
        cost_bps: Total cost per plan in bps.
        min_expected_edge_bps: Minimum net bps for a positive label.
        horizons, targets, stops: Grid to search.

    Returns:
        One summary dict per (plan, side, split) with at least one usable entry.
    """
    close = replay["close"].astype("float64").to_numpy()
    high = replay["high"].astype("float64").to_numpy()
    low = replay["low"].astype("float64").to_numpy()
    open_time_ms = replay["open_time_ms"].astype("int64").to_numpy()

    rows: list[dict[str, Any]] = []
    for horizon in horizons:
        if len(replay) <= horizon:
            continue
        # Window k spans bars k..k+horizon; dropping column 0 leaves the
        # look-ahead bars k+1..k+horizon for an entry at bar k.
        high_window = np.lib.stride_tricks.sliding_window_view(high, horizon + 1)[:, 1:]
        low_window = np.lib.stride_tricks.sliding_window_view(low, horizon + 1)[:, 1:]
        time_window = np.lib.stride_tricks.sliding_window_view(open_time_ms, horizon + 1)[:, 1:]
        entry_count = len(high_window)
        entry_price = close[:entry_count]
        exit_price = close[horizon:]
        current_ms = open_time_ms[:entry_count]

        steps = np.arange(1, horizon + 1, dtype=np.int64) * _BAR_MS
        expected_times = current_ms.reshape(-1, 1) + steps.reshape(1, -1)
        contiguous = np.all(time_window == expected_times, axis=1)
        # Object array: split id for entries with a trainable feature row, NaN otherwise.
        split_values = pd.Series(current_ms).map(feature_split_by_ms).to_numpy()
        usable = contiguous & pd.notna(split_values)
        if not usable.any():
            continue

        for target_bps, stop_bps in itertools.product(targets, stops):
            # A target whose net payout cannot reach the minimum edge can never
            # produce a positive label on a target hit, so it is not searched.
            if target_bps - cost_bps < min_expected_edge_bps:
                continue
            for side in ("LONG", "SHORT"):
                rows.extend(
                    _plan_side_rows(
                        symbol,
                        horizon,
                        target_bps,
                        stop_bps,
                        side,
                        entry_price,
                        exit_price,
                        high_window,
                        low_window,
                        split_values,
                        usable,
                        cost_bps,
                        min_expected_edge_bps,
                    )
                )
    return rows


def _plan_side_rows(
    symbol: str,
    horizon: int,
    target_bps: float,
    stop_bps: float,
    side: str,
    entry_price: np.ndarray,
    exit_price: np.ndarray,
    high_window: np.ndarray,
    low_window: np.ndarray,
    split_values: np.ndarray,
    usable: np.ndarray,
    cost_bps: float,
    min_expected_edge_bps: float,
) -> list[dict[str, Any]]:
    """Simulate one target/stop plan on one side and summarise it per split.

    Each entry ends at the first bar that touches the target or the stop.
    If neither level is touched, it is closed at ``exit_price`` when the
    horizon ends. All returns are in bps relative to the entry price, which
    is the same basis the target and stop levels are defined on.

    Args:
        side: ``"LONG"``; any other value is treated as short.
        entry_price, exit_price: 1-D, one value per entry.
        high_window, low_window: 2-D, one row of look-ahead bars per entry.
        split_values: Per-entry split id (NaN where there is no feature row).
        usable: Per-entry boolean mask of entries to evaluate.

    Returns:
        One dict per split in ``EVAL_SPLITS`` that has at least one usable entry.
    """
    entry_column = entry_price.reshape(-1, 1)
    if side == "LONG":
        target_price = entry_column * (1.0 + target_bps / 10000.0)
        stop_price = entry_column * (1.0 - stop_bps / 10000.0)
        target_matrix = high_window >= target_price
        stop_matrix = low_window <= stop_price
        timeout_return = (exit_price / entry_price - 1.0) * 10000.0
        max_achievable_gross = (np.nanmax(high_window, axis=1) / entry_price - 1.0) * 10000.0
    else:
        target_price = entry_column * (1.0 - target_bps / 10000.0)
        stop_price = entry_column * (1.0 + stop_bps / 10000.0)
        target_matrix = low_window <= target_price
        stop_matrix = high_window >= stop_price
        # Short returns use the entry price as the base, like the levels above.
        # Dividing by the exit price instead would inflate short gains, shrink
        # short losses, and bias the LONG/SHORT comparison.
        timeout_return = (1.0 - exit_price / entry_price) * 10000.0
        max_achievable_gross = (1.0 - np.nanmin(low_window, axis=1) / entry_price) * 10000.0

    # Index of the first touching bar. ``no_touch`` is larger than any real
    # index, so it marks "never touched".
    no_touch = target_matrix.shape[1] + 1
    target_any = target_matrix.any(axis=1)
    stop_any = stop_matrix.any(axis=1)
    target_index = np.where(target_any, target_matrix.argmax(axis=1), no_touch)
    stop_index = np.where(stop_any, stop_matrix.argmax(axis=1), no_touch)
    # Both levels touched in the same bar: the intrabar order is unknown, so
    # the entry is resolved conservatively as a stop.
    target_first = target_any & (~stop_any | (target_index < stop_index))
    stop_first = stop_any & (~target_any | (stop_index <= target_index))
    timeout = ~(target_first | stop_first)
    gross = np.where(target_first, target_bps, np.where(stop_first, -stop_bps, timeout_return))
    price_plan_net = gross - cost_bps
    # Best-case edge from the most favourable excursion; optimistic by design.
    expected_net = max_achievable_gross - cost_bps
    positive = price_plan_net >= min_expected_edge_bps
    ambiguous = target_any & stop_any & (target_index == stop_index)

    plan_id = f"h{horizon}_t{target_bps:g}_s{stop_bps:g}"
    # Break-even target hit rate, ignoring timeouts:
    # p * (T - c) = (1 - p) * (S + c)  =>  p = (S + c) / (T + S).
    required_target_hit_rate = float((stop_bps + cost_bps) / (target_bps + stop_bps))

    rows: list[dict[str, Any]] = []
    for split_id in EVAL_SPLITS:
        mask = usable & (split_values == split_id)
        if not mask.any():
            continue
        values = expected_net[mask]
        plan_values = price_plan_net[mask]
        target_rate = float(target_first[mask].mean())
        rows.append(
            {
                "plan_id": plan_id,
                "symbol": symbol,
                "split_id": split_id,
                "side": side,
                "horizon_minutes": horizon,
                "target_bps": target_bps,
                "stop_bps": stop_bps,
                "cost_bps": cost_bps,
                "positive_net_bps": target_bps - cost_bps,
                "stop_net_bps": -stop_bps - cost_bps,
                "rows": int(mask.sum()),
                "target_hit_rate": target_rate,
                "stop_hit_rate": float(stop_first[mask].mean()),
                "timeout_rate": float(timeout[mask].mean()),
                "ambiguous_rate": float(ambiguous[mask].mean()),
                "positive_label_rate": float(positive[mask].mean()),
                "avg_expected_net_edge_bps": float(values.mean()),
                "median_expected_net_edge_bps": float(np.median(values)),
                "p95_expected_net_edge_bps": float(np.quantile(values, 0.95)),
                "avg_price_plan_net_edge_bps": float(plan_values.mean()),
                "required_target_hit_rate": required_target_hit_rate,
                "target_rate_margin": float(target_rate - required_target_hit_rate),
            }
        )
    return rows


def _plan_summary(rows: pd.DataFrame) -> pd.DataFrame:
    """Collapse per-symbol rows into one scored row per (plan, side).

    Each metric in ``_SUMMARY_METRICS`` is averaged across symbols per split.
    The average is unweighted, i.e. not weighted by row counts. The result is
    spread into ``<metric>_<split>`` columns; missing splits become NaN columns.
    Aggregates over the held-out ``_SCORE_SPLITS`` feed a heuristic ``score``.

    Returns:
        The summary frame sorted by ``score`` descending, with a fresh index.
    """
    group_cols = ["plan_id", "horizon_minutes", "target_bps", "stop_bps", "side"]
    split_rows = rows.pivot_table(
        index=group_cols,
        columns="split_id",
        values=list(_SUMMARY_METRICS),
        aggfunc="mean",
    )
    split_rows.columns = [f"{metric}_{split}" for metric, split in split_rows.columns]
    split_rows = split_rows.reset_index()
    for split_id in EVAL_SPLITS:
        for metric in _SUMMARY_METRICS:
            column = f"{metric}_{split_id}"
            if column not in split_rows.columns:
                split_rows[column] = np.nan

    def eval_frame(metric: str) -> pd.DataFrame:
        """Columns of ``metric`` for the held-out scoring splits."""
        return split_rows[[f"{metric}_{split}" for split in _SCORE_SPLITS]]

    split_rows["min_positive_label_rate_eval"] = eval_frame("positive_label_rate").min(axis=1)
    split_rows["max_positive_label_rate_eval"] = eval_frame("positive_label_rate").max(axis=1)
    split_rows["avg_edge_eval"] = eval_frame("avg_expected_net_edge_bps").mean(axis=1)
    split_rows["avg_price_plan_edge_eval"] = eval_frame("avg_price_plan_net_edge_bps").mean(axis=1)
    split_rows["min_price_plan_edge_eval"] = eval_frame("avg_price_plan_net_edge_bps").min(axis=1)
    split_rows["min_margin_eval"] = eval_frame("target_rate_margin").min(axis=1)

    # The search score is not a go-live threshold; it only chooses the next
    # experiment. Fixed price-plan net edge is weighted most heavily. The max
    # future edge is kept only as a weak tie-breaker because it can be
    # optimistic.
    # Penalise positive-label rates below 3% or above 55% on the held-out splits.
    positive_rate_penalty = (
        (0.03 - split_rows["min_positive_label_rate_eval"]).clip(lower=0.0) * 120.0
        + (split_rows["max_positive_label_rate_eval"] - 0.55).clip(lower=0.0) * 20.0
    )
    # Small bonus for targets wider than stops.
    spread_bonus = np.log1p((split_rows["target_bps"] - split_rows["stop_bps"]).clip(lower=0.0))
    split_rows["score"] = (
        split_rows["avg_price_plan_edge_eval"] * 8.0
        + split_rows["min_price_plan_edge_eval"] * 3.0
        + split_rows["min_margin_eval"] * 80.0
        + split_rows["avg_edge_eval"] * 0.05
        - positive_rate_penalty
        + spread_bonus
    )
    return split_rows.sort_values("score", ascending=False).reset_index(drop=True)


def _select_best_plan(summary: pd.DataFrame) -> dict[str, Any]:
    """Pick the highest-scoring plan that passes the quality gates.

    A plan passes the gates when all of these hold:

    * its held-out positive-label rate stays within [3%, 55%];
    * its average price-plan edge is positive;
    * its worst split edge is above -1 bps;
    * its target is wider than its stop.

    If no plan passes, the target-wider-than-stop filter alone is applied.
    If that also yields nothing, the whole summary is used.

    Returns:
        The chosen plan's identifying fields and aggregate metrics, as
        JSON-friendly Python scalars.
    """
    candidates = summary[
        (summary["min_positive_label_rate_eval"] >= 0.03)
        & (summary["max_positive_label_rate_eval"] <= 0.55)
        & (summary["avg_price_plan_edge_eval"] > 0.0)
        & (summary["min_price_plan_edge_eval"] > -1.0)
        & (summary["target_bps"] > summary["stop_bps"])
    ]
    if candidates.empty:
        candidates = summary[summary["target_bps"] > summary["stop_bps"]]
    if candidates.empty:
        candidates = summary
    row = candidates.sort_values("score", ascending=False).iloc[0]
    return {
        "plan_id": str(row["plan_id"]),
        "horizon_minutes": int(row["horizon_minutes"]),
        "target_bps": float(row["target_bps"]),
        "stop_bps": float(row["stop_bps"]),
        "side": str(row["side"]),
        "score": float(row["score"]),
        "avg_edge_eval": float(row["avg_edge_eval"]),
        "avg_price_plan_edge_eval": float(row["avg_price_plan_edge_eval"]),
        "min_price_plan_edge_eval": float(row["min_price_plan_edge_eval"]),
        "min_margin_eval": float(row["min_margin_eval"]),
        "min_positive_label_rate_eval": float(row["min_positive_label_rate_eval"]),
        "max_positive_label_rate_eval": float(row["max_positive_label_rate_eval"]),
    }


def _markdown_report(payload: dict[str, Any], summary: pd.DataFrame) -> str:
    """Render the search payload and the top 20 summary rows as markdown."""
    top = summary.head(20)
    # Serialise with json rather than str(dict).replace(...): the quote-swapping
    # trick breaks on any value that contains a quote or apostrophe.
    best_plan_json = json.dumps(_jsonable(payload["best_plan"]), ensure_ascii=False)
    lines = [
        "# Price Plan Search Report",
        "",
        f"- run_id: `{payload['run_id']}`",
        f"- cost_bps: {payload['cost_bps']}",
        f"- min_expected_net_edge_bps: {payload['min_expected_net_edge_bps']}",
        f"- entry_label_method: `{payload['entry_label_method']}`",
        f"- candidate_count: {payload['candidate_count']}",
        "",
        "## Best Plan For Next Experiment",
        "",
        "```json",
        best_plan_json,
        "```",
        "",
        "## Top Plans",
        "",
        _markdown_table(top),
        "",
        "说明:positive_label_rate 和 avg_price_plan_net_edge_bps 按当前价格计划统计;avg_expected_net_edge_bps 只是辅助观察未来最大可拿空间,不能单独决定价格计划。这里选的是下一轮实验用的价格计划,不是上线结论。真正能不能上线仍然看模型训练、PM 搜索、validation_locked 和 latest_stress 回测。",
        "",
    ]
    return "\n".join(lines)


def _markdown_table(frame: pd.DataFrame) -> str:
    """Render ``frame`` as a GitHub-style markdown table.

    Floats are rounded to 6 decimals. An empty frame renders as a short
    "no data" placeholder.
    """
    if frame.empty:
        return "无数据。"
    columns = list(frame.columns)
    lines = ["| " + " | ".join(columns) + " |", "| " + " | ".join("---" for _ in columns) + " |"]
    for row in frame.to_dict("records"):
        values = []
        for column in columns:
            value = row.get(column, "")
            if isinstance(value, float):
                value = round(value, 6)
            values.append(str(value))
        lines.append("| " + " | ".join(values) + " |")
    return "\n".join(lines)


def _jsonable(value: Any) -> Any:
    """Recursively convert containers and NumPy scalars/arrays to JSON-friendly types.

    Dict keys become strings. Lists, tuples and ndarrays become lists. NumPy
    integers, floats and bools become the matching Python scalars. Anything
    else is returned unchanged.
    """
    if isinstance(value, dict):
        return {str(key): _jsonable(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_jsonable(item) for item in value]
    if isinstance(value, np.ndarray):
        return [_jsonable(item) for item in value.tolist()]
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    return value