class StateContinueExperimentTest(unittest.TestCase):
    """Unit tests for the state-continue diagnostic experiment helpers."""

    def test_state_rows_include_required_position_and_frozen_entry_features(self) -> None:
        """One LONG candidate row must produce every state feature and the continue labels."""
        candidate = {
            "current_sample_id": "s0",
            "symbol": "BTC-USDT-PERP",
            "current_event_time": pd.Timestamp("2026-01-01T00:05:00Z"),
            "current_open_time_ms": 300_000,
            "side": "LONG",
            "split_id": "fit_inner",
            "walk_forward_fold": 0,
            "time_in_position_minutes": 5,
            "entry_price": 100.0,
            "current_price": 100.1,
            "high_since_entry": 100.2,
            "low_since_entry": 99.95,
            "future_return_bps": 12.0,
            "mae_bps": 3.0,
            "entry_predicted_edge_bps": 8.5,
            "entry_direction_prob": 0.64,
            "add_count": 0.0,
            "minutes_since_last_add": 9999.0,
        }
        # The market features do not matter for this check, so zero-fill all of them.
        candidate.update(dict.fromkeys(FEATURE_ORDER, 0.0))

        out = _state_rows_for_age(pd.DataFrame([candidate]), stop_bps=8.0, target_bps=12.0, cost_bps=6.5)
        first = out.iloc[0]

        # Every declared state feature has to be present as an output column.
        self.assertLessEqual(set(STATE_FEATURES), set(out.columns))
        # 12.0 bps future return minus 6.5 bps cost.
        self.assertAlmostEqual(5.5, float(first["expected_continue_edge_bps"]))
        self.assertEqual(1, int(first["continue_target"]))
        self.assertAlmostEqual(8.5, float(first["entry_predicted_edge_bps"]))
        self.assertAlmostEqual(0.64, float(first["entry_direction_prob"]), places=6)
        self.assertAlmostEqual(0.0, float(first["add_count"]))
        self.assertAlmostEqual(9999.0, float(first["minutes_since_last_add"]))

    def test_frozen_linear_onnx_weights_are_read_without_row_by_row_runtime(self) -> None:
        """Heads exported to ONNX are scored straight from the stored weights and biases."""
        feature_count = len(FEATURE_ORDER)
        # All-zero weights make every prediction a pure function of the bias vector.
        exported_heads = [
            LinearHead(
                "direction",
                "softmax",
                np.zeros((feature_count, 3), dtype=np.float32),
                np.array([0.0, 1.0, 2.0], dtype=np.float32),
            ),
            LinearHead(
                "long_expected_net_edge_bps",
                "identity",
                np.zeros((feature_count, 1), dtype=np.float32),
                np.array([7.25], dtype=np.float32),
            ),
        ]
        requested_heads = {
            "direction": ("softmax", ("long_prob", "short_prob", "neutral_prob")),
            "long_expected_net_edge_bps": ("identity", ("long_edge",)),
        }
        frame = pd.DataFrame({"sample_id": ["s0", "s1"], **{name: 0.0 for name in FEATURE_ORDER}})

        with tempfile.TemporaryDirectory() as tmp:
            model_path = Path(tmp) / "direction.onnx"
            export_heads(model_path, exported_heads, feature_count=feature_count)
            out = _predict_frozen_linear_model(model_path, frame, requested_heads)

        first = out.iloc[0]
        self.assertEqual(["s0", "s1"], out["sample_id"].tolist())
        self.assertTrue(np.allclose(1.0, out[["long_prob", "short_prob", "neutral_prob"]].sum(axis=1)))
        # Biases (0, 1, 2) rank the third class above the first one.
        self.assertLess(float(first["long_prob"]), float(first["neutral_prob"]))
        self.assertAlmostEqual(7.25, float(first["long_edge"]), places=6)

    def test_verdict_refuses_state_continue_when_edge_mae_is_not_good_enough(self) -> None:
        """AUC improves over market_only, but the edge MAE ratio stays above the cut-off."""

        def converged(validation_auc: float, validation_mae: float, stress_auc: float, stress_mae: float) -> dict:
            # Metrics payload for one side / feature-set combination.
            return {
                "validation_locked": {"continue_auc": validation_auc, "edge_mae_vs_constant_ratio": validation_mae},
                "latest_stress": {"continue_auc": stress_auc, "edge_mae_vs_constant_ratio": stress_mae},
                "regressor_converged": True,
            }

        results = {}
        for side in ("long", "short"):
            results[f"{side}_market_only"] = converged(0.61, 0.985, 0.62, 0.984)
            results[f"{side}_market_plus_state"] = converged(0.63, 0.979, 0.64, 0.978)

        verdict = _verdict(results)

        self.assertEqual("NOT_READY_FOR_FORMAL_CHAIN", verdict["status"])
        self.assertTrue(any("above 0.97" in reason for reason in verdict["reasons"]))

    def test_train_side_models_supports_ridge_regressor_diagnostic(self) -> None:
        """The ridge regressor option trains and its settings are echoed in the metrics."""
        all_columns = [*FEATURE_ORDER, *STATE_FEATURES]
        start = pd.Timestamp("2026-01-01T00:00:00Z")
        rows = []
        for split_id in ("fit_inner", "tune_inner", "validation_locked", "latest_stress"):
            # Two rows per split: one negative and one positive continue target.
            for index, target in enumerate((0, 1)):
                row = {
                    "sample_id": f"{split_id}-{index}",
                    "symbol": "BTC-USDT-PERP",
                    "event_time": start + pd.Timedelta(minutes=len(rows)),
                    "split_id": split_id,
                    "position_side": "LONG",
                    "continue_target": target,
                    "expected_continue_edge_bps": -3.0 if target == 0 else 6.0,
                }
                row.update(dict.fromkeys(all_columns, float(index)))
                rows.append(row)

        metrics, predictions = _train_side_models(
            pd.DataFrame(rows),
            "LONG",
            all_columns,
            regressor_kind="ridge",
            ridge_alpha=1.0,
            regression_target_clip_bps=5.0,
        )

        self.assertEqual("ridge", metrics["regressor_kind"])
        self.assertEqual(5.0, metrics["regression_target_clip_bps"])
        self.assertTrue(metrics["regressor_converged"])
        self.assertEqual(8, len(predictions))
        self.assertIn("time_in_position_minutes", predictions.columns)
10.0)) + huber_max_iter = int(getattr(args, "huber_max_iter", 1000)) + regression_target_clip_bps = float(getattr(args, "regression_target_clip_bps", 0.0)) logging.info( - "trader.training.state_continue_experiment_started runId=%s baselineRunId=%s ages=%s", + "trader.training.state_continue_experiment_started runId=%s baselineRunId=%s ages=%s regressorKind=%s ridgeAlpha=%s huberMaxIter=%s regressionTargetClipBps=%s", args.run_id, args.baseline_run_id, ages, + regressor_kind, + ridge_alpha, + huber_max_iter, + regression_target_clip_bps, ) feature = _load_feature_frame(baseline_root) - entry = _load_entry_labels(baseline_root) + entry = _load_entry_labels(baseline_root, feature) replay = _load_replay(baseline_root) plan = read_json(baseline_root / "label" / "price_plan_context.json") stop_bps = float(plan["stopDistanceBps"]) target_bps = float(plan["targetDistanceBps"]) cost_bps = float(plan["costBps"]) + continue_horizon = int(DEFAULT_LABEL_CONFIG["continue"]["horizon_minutes"]) + min_continue_edge_bps = float(DEFAULT_LABEL_CONFIG["continue"]["min_expected_continue_edge_bps"]) - state_frame = _build_state_frame(feature, entry, replay, ages, stop_bps, target_bps, cost_bps) + state_frame = _build_state_frame(feature, entry, replay, ages, stop_bps, target_bps, cost_bps, continue_horizon, min_continue_edge_bps) if args.max_rows_per_split: state_frame = _cap_rows_per_split(state_frame, int(args.max_rows_per_split)) dataset_hash = write_parquet(out_dir / "state_continue_train.parquet", state_frame) @@ -62,17 +76,33 @@ def run_state_continue_experiment(args: Any) -> None: out_dir / "state_continue_train.parquet", ) - source_manifest = _source_manifest(args, baseline_root, ages, stop_bps, target_bps, cost_bps, state_frame, dataset_hash) + source_manifest = _source_manifest( + args, + baseline_root, + ages, + stop_bps, + target_bps, + cost_bps, + continue_horizon, + min_continue_edge_bps, + state_frame, + dataset_hash, + regressor_kind, + ridge_alpha, + huber_max_iter, + 
regression_target_clip_bps, + ) write_json(out_dir / "experiment_manifest.json", source_manifest) write_json(out_dir / "position_state_feature_schema.json", _state_feature_schema()) order_hash = write_json(out_dir / "position_state_feature_order.json", STATE_FEATURES) write_json( out_dir / "position_state_source_manifest.json", { - "entry_predicted_edge_bps": "NOT_USED_IN_THIS_DIAGNOSTIC", - "entry_direction_prob": "NOT_USED_IN_THIS_DIAGNOSTIC", + "entry_predicted_edge_bps": "run-10 frozen ENTRY ONNX output selected by entry side", + "entry_direction_prob": "run-10 frozen DIRECTION ONNX output selected by entry side", "out_of_fold_used": False, - "frozen_model_output_used": False, + "frozen_model_output_used": True, + "frozen_model_output_policy": "baseline model is fixed and is not retrained inside this experiment", "replay_decision_trace_used": False, "state_feature_order_hash": order_hash, "row_count": len(state_frame), @@ -90,7 +120,7 @@ def run_state_continue_experiment(args: Any) -> None: side_frame = state_frame[state_frame["position_side"].eq(side)].copy() for feature_set_name, feature_columns in feature_sets.items(): key = f"{side.lower()}_{feature_set_name}" - result, predictions = _train_side_models(side_frame, side, feature_columns) + result, predictions = _train_side_models(side_frame, side, feature_columns, regressor_kind, ridge_alpha, huber_max_iter, regression_target_clip_bps) results[key] = result predictions["side"] = side predictions["feature_set"] = feature_set_name @@ -105,9 +135,11 @@ def run_state_continue_experiment(args: Any) -> None: ) predictions = pd.concat(prediction_frames, ignore_index=True) if prediction_frames else pd.DataFrame() + verdict = _verdict(results) write_parquet(out_dir / "state_continue_predictions.parquet", predictions) write_json(out_dir / "state_continue_result.json", results) - write_text(out_dir / "state_continue_experiment_report.md", _report(args, baseline_root, source_manifest, results)) + write_json(out_dir / 
"state_continue_verdict.json", verdict) + write_text(out_dir / "state_continue_experiment_report.md", _report(args, baseline_root, source_manifest, results, verdict)) logging.info("trader.training.state_continue_experiment_finished runId=%s report=%s", args.run_id, out_dir / "state_continue_experiment_report.md") @@ -129,7 +161,7 @@ def _load_feature_frame(baseline_root: Path) -> pd.DataFrame: return feature -def _load_entry_labels(baseline_root: Path) -> pd.DataFrame: +def _load_entry_labels(baseline_root: Path, feature: pd.DataFrame) -> pd.DataFrame: entry = read_parquet(baseline_root / "label" / "entry_labels.parquet") required = {"sample_id", "symbol", "event_time", "side", "entry_target", "split_id", "walk_forward_fold"} missing = sorted(required.difference(entry.columns)) @@ -137,7 +169,82 @@ def _load_entry_labels(baseline_root: Path) -> pd.DataFrame: raise ValueError(f"baseline entry labels missing columns: {missing}") entry = entry[(entry["entry_target"] == 1) & (entry["side"].isin(["LONG", "SHORT"]))].copy() entry["entry_open_time_ms"] = pd.to_datetime(entry["event_time"], utc=True).astype("int64") // 1_000_000 - return entry[["sample_id", "symbol", "event_time", "side", "entry_open_time_ms"]].copy() + entry_scores = _frozen_entry_scores_by_sample(baseline_root, feature) + entry = entry.merge(entry_scores, on="sample_id", how="inner") + if entry.empty: + raise ValueError("state continue entry set is empty after merging frozen baseline model outputs") + long_mask = entry["side"].eq("LONG") + entry["entry_predicted_edge_bps"] = np.where( + long_mask, + entry["frozen_long_expected_net_edge_bps"], + entry["frozen_short_expected_net_edge_bps"], + ) + entry["entry_direction_prob"] = np.where(long_mask, entry["frozen_long_prob"], entry["frozen_short_prob"]) + return entry[["sample_id", "symbol", "event_time", "side", "entry_open_time_ms", "entry_predicted_edge_bps", "entry_direction_prob"]].copy() + + +def _frozen_entry_scores_by_sample(baseline_root: Path, 
def _predict_frozen_linear_model(model_path: Path, frame: pd.DataFrame, heads: dict[str, tuple[str, tuple[str, ...]]]) -> pd.DataFrame:
    """Score ``frame`` with linear heads whose weights are stored in an ONNX file.

    The ONNX graph is never executed. For each requested head the ``<head>_W``
    and ``<head>_B`` initializers are read and ``x @ W + B`` is evaluated with
    NumPy for all rows at once.

    Args:
        model_path: Location of the frozen ONNX model.
        frame: Rows to score; must carry ``sample_id`` and every ``FEATURE_ORDER`` column.
        heads: Maps a head name to ``(kind, output_columns)`` where ``kind`` is
            ``"softmax"``, ``"sigmoid"`` or ``"identity"``.

    Returns:
        A frame holding ``sample_id`` plus one float32 column per requested output.

    Raises:
        SystemExit: The ``onnx`` package is not installed.
        FileNotFoundError: ``model_path`` is not an existing file.
        ValueError: A head's initializers are absent, its kind is unknown, or
            its output width differs from the number of requested columns.
    """
    try:
        import onnx
        from onnx import numpy_helper
    except ModuleNotFoundError as exc:
        raise SystemExit("Python package 'onnx' is required to read frozen baseline ONNX weights.") from exc
    if not model_path.is_file():
        raise FileNotFoundError(f"frozen model is missing: {model_path}")

    graph = onnx.load(model_path).graph
    tensors = {tensor.name: numpy_helper.to_array(tensor) for tensor in graph.initializer}

    # Non-numeric, NaN and infinite feature values are all scored as 0.0.
    numeric = frame[FEATURE_ORDER].apply(pd.to_numeric, errors="coerce")
    numeric = numeric.replace([np.inf, -np.inf], np.nan).fillna(0.0)
    features = numeric.astype("float32").to_numpy()

    scored = pd.DataFrame({"sample_id": frame["sample_id"].to_numpy()})
    for head_name, (kind, output_columns) in heads.items():
        weight_key, bias_key = f"{head_name}_W", f"{head_name}_B"
        if not (weight_key in tensors and bias_key in tensors):
            raise ValueError(f"frozen model {model_path} is missing head initializers: {head_name}")
        weights = np.asarray(tensors[weight_key], dtype=np.float32)
        bias = np.asarray(tensors[bias_key], dtype=np.float32).reshape(1, -1)
        values = features @ weights + bias

        if kind == "identity":
            pass
        elif kind == "softmax":
            values = _softmax(values)
        elif kind == "sigmoid":
            values = _sigmoid(values)
        else:
            raise ValueError(f"unsupported frozen head kind: {kind}")

        if values.shape[1] != len(output_columns):
            raise ValueError(f"head {head_name} output width mismatch: {values.shape[1]} != {len(output_columns)}")
        # values.T iterates the output columns in order.
        for column, column_values in zip(output_columns, values.T):
            scored[column] = column_values.astype("float32")
    return scored


def _softmax(values: np.ndarray) -> np.ndarray:
    """Return the row-wise softmax of a 2-D array.

    The row maximum is subtracted before exponentiating so large logits do not
    overflow.
    """
    row_peak = np.max(values, axis=1, keepdims=True)
    weights = np.exp(values - row_peak)
    return weights / np.sum(weights, axis=1, keepdims=True)


def _sigmoid(values: np.ndarray) -> np.ndarray:
    """Return the element-wise logistic function, with inputs clipped to [-50, 50]."""
    bounded = np.clip(values, -50.0, 50.0)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator
candidates.empty: continue - frames.append(_state_rows_for_age(candidates, stop_bps, target_bps, cost_bps)) + frames.append(_state_rows_for_age(candidates, stop_bps, target_bps, cost_bps, min_continue_edge_bps)) logging.info("trader.training.state_continue_age_built ageMinutes=%s rowCount=%s", age, len(candidates)) if not frames: raise ValueError("state continue experiment produced no rows") @@ -216,7 +327,7 @@ def _state_source_by_age(replay: pd.DataFrame, ages: list[int]) -> pd.DataFrame: return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame() -def _state_rows_for_age(frame: pd.DataFrame, stop_bps: float, target_bps: float, cost_bps: float) -> pd.DataFrame: +def _state_rows_for_age(frame: pd.DataFrame, stop_bps: float, target_bps: float, cost_bps: float, min_continue_edge_bps: float = 5.0) -> pd.DataFrame: side_sign = np.where(frame["side"].eq("LONG"), 1.0, -1.0) entry_price = frame["entry_price"].astype(float) current_price = frame["current_price"].astype(float) @@ -232,7 +343,7 @@ def _state_rows_for_age(frame: pd.DataFrame, stop_bps: float, target_bps: float, distance_to_stop = np.where(long_mask, (current_price / stop_price - 1.0) * 10000.0, (stop_price / current_price - 1.0) * 10000.0) distance_to_target = np.where(long_mask, (target_price / current_price - 1.0) * 10000.0, (current_price / target_price - 1.0) * 10000.0) expected_edge = frame["future_return_bps"].astype(float) - cost_bps - continue_target = ((expected_edge >= 2.0) & (frame["mae_bps"].astype(float) < stop_bps)).astype("int8") + continue_target = ((expected_edge >= min_continue_edge_bps) & (frame["mae_bps"].astype(float) < stop_bps)).astype("int8") out = frame[ [ @@ -261,6 +372,10 @@ def _state_rows_for_age(frame: pd.DataFrame, stop_bps: float, target_bps: float, out["mae_since_entry_bps"] = np.maximum(mae, 0.0).astype("float32") out["distance_to_stop_bps"] = distance_to_stop.astype("float32") out["distance_to_target_bps"] = distance_to_target.astype("float32") + 
out["entry_predicted_edge_bps"] = frame["entry_predicted_edge_bps"].astype("float32") + out["entry_direction_prob"] = frame["entry_direction_prob"].astype("float32") + out["add_count"] = frame["add_count"].astype("float32") + out["minutes_since_last_add"] = frame["minutes_since_last_add"].astype("float32") out["continue_target"] = continue_target out["expected_continue_edge_bps"] = expected_edge.astype("float32") return out @@ -276,7 +391,15 @@ def _cap_rows_per_split(frame: pd.DataFrame, max_rows_per_split: int) -> pd.Data return pd.concat(capped, ignore_index=True) -def _train_side_models(frame: pd.DataFrame, side: str, feature_columns: list[str]) -> tuple[dict[str, Any], pd.DataFrame]: +def _train_side_models( + frame: pd.DataFrame, + side: str, + feature_columns: list[str], + regressor_kind: str = "huber", + ridge_alpha: float = 10.0, + huber_max_iter: int = 1000, + regression_target_clip_bps: float = 0.0, +) -> tuple[dict[str, Any], pd.DataFrame]: train = frame[frame["split_id"].eq(FIT_SPLIT)].copy() if train.empty: raise ValueError(f"state continue {side} has no fit_inner rows") @@ -284,11 +407,20 @@ def _train_side_models(frame: pd.DataFrame, side: str, feature_columns: list[str x_train = scaler.fit_transform(train[feature_columns].astype("float32")) y_train_cls = train["continue_target"].astype(int).to_numpy() y_train_reg = train["expected_continue_edge_bps"].astype(float).to_numpy() + y_train_fit = y_train_reg + if regression_target_clip_bps > 0: + y_train_fit = np.clip(y_train_reg, -regression_target_clip_bps, regression_target_clip_bps) clf = LogisticRegression(max_iter=500) clf.fit(x_train, y_train_cls) - reg = HuberRegressor(alpha=0.001, epsilon=1.35, max_iter=300) - reg.fit(x_train, y_train_reg) + reg_max_iter = huber_max_iter + if regressor_kind == "huber": + reg = HuberRegressor(alpha=0.001, epsilon=1.35, max_iter=reg_max_iter) + elif regressor_kind == "ridge": + reg = Ridge(alpha=ridge_alpha) + else: + raise ValueError(f"unsupported state continue 
regressor kind: {regressor_kind}") + reg.fit(x_train, y_train_fit) metrics: dict[str, Any] = {} prediction_frames: list[pd.DataFrame] = [] @@ -301,14 +433,39 @@ def _train_side_models(frame: pd.DataFrame, side: str, feature_columns: list[str y_reg = part["expected_continue_edge_bps"].astype(float).to_numpy() proba = clf.predict_proba(x)[:, 1] pred_edge = reg.predict(x) + if regression_target_clip_bps > 0: + pred_edge = np.clip(pred_edge, -regression_target_clip_bps, regression_target_clip_bps) metrics[split_id] = _split_metrics(y_train_cls, y_train_reg, y_cls, y_reg, proba, pred_edge) - pred_frame = part[["sample_id", "symbol", "event_time", "split_id", "position_side", "continue_target", "expected_continue_edge_bps"]].copy() + pred_frame = part[ + [ + "sample_id", + "symbol", + "event_time", + "split_id", + "position_side", + "time_in_position_minutes", + "unrealized_pnl_bps", + "mfe_since_entry_bps", + "mae_since_entry_bps", + "entry_predicted_edge_bps", + "entry_direction_prob", + "continue_target", + "expected_continue_edge_bps", + ] + ].copy() pred_frame["continue_prob"] = proba.astype("float32") pred_frame["predicted_continue_edge_bps"] = pred_edge.astype("float32") prediction_frames.append(pred_frame) metrics["row_count"] = int(len(frame)) metrics["feature_count"] = len(feature_columns) metrics["feature_hash"] = sha256_json(feature_columns) + n_iter = getattr(reg, "n_iter_", None) + metrics["regressor_kind"] = regressor_kind + metrics["ridge_alpha"] = ridge_alpha if regressor_kind == "ridge" else None + metrics["regressor_iterations"] = int(n_iter) if n_iter is not None else 0 + metrics["regressor_max_iter"] = reg_max_iter + metrics["regressor_converged"] = True if n_iter is None else 0 <= metrics["regressor_iterations"] < reg_max_iter + metrics["regression_target_clip_bps"] = regression_target_clip_bps if regression_target_clip_bps > 0 else None return metrics, pd.concat(prediction_frames, ignore_index=True) @@ -346,8 +503,14 @@ def _source_manifest( 
stop_bps: float, target_bps: float, cost_bps: float, + continue_horizon: int, + min_continue_edge_bps: float, state_frame: pd.DataFrame, dataset_hash: str, + regressor_kind: str, + ridge_alpha: float, + huber_max_iter: int, + regression_target_clip_bps: float, ) -> dict[str, Any]: return { "experiment": "state_continue_diagnostic_v1", @@ -358,6 +521,12 @@ def _source_manifest( "target_bps": target_bps, "stop_bps": stop_bps, "cost_bps": cost_bps, + "continue_horizon_minutes": continue_horizon, + "min_continue_edge_bps": min_continue_edge_bps, + "regressor_kind": regressor_kind, + "ridge_alpha": ridge_alpha if regressor_kind == "ridge" else None, + "huber_max_iter": huber_max_iter if regressor_kind == "huber" else None, + "regression_target_clip_bps": regression_target_clip_bps if regression_target_clip_bps > 0 else None, "dataset_hash_sha256": dataset_hash, "row_count": int(len(state_frame)), "split_counts": state_frame["split_id"].value_counts().to_dict(), @@ -370,8 +539,10 @@ def _source_manifest( "leakage_policy": { "uses_future_entry_label_as_feature": False, "uses_same_round_model_prediction_as_feature": False, - "entry_predicted_edge_bps": "not used", - "entry_direction_prob": "not used", + "entry_predicted_edge_bps": "baseline frozen ENTRY ONNX output selected by side", + "entry_direction_prob": "baseline frozen DIRECTION ONNX output selected by side", + "add_count": "synthetic first-position diagnostic, fixed to 0", + "minutes_since_last_add": "synthetic first-position diagnostic, fixed to 9999", }, } @@ -385,10 +556,54 @@ def _state_feature_schema() -> list[dict[str, Any]]: {"name": "mae_since_entry_bps", "unit": "bps", "source": "low/high since entry", "leakage_check": "uses only entry..current low/high"}, {"name": "distance_to_stop_bps", "unit": "bps", "source": "price plan and current close", "leakage_check": "uses fixed plan and current price"}, {"name": "distance_to_target_bps", "unit": "bps", "source": "price plan and current close", "leakage_check": 
"uses fixed plan and current price"}, + {"name": "entry_predicted_edge_bps", "unit": "bps", "source": "baseline frozen ENTRY ONNX", "leakage_check": "baseline model is fixed before this experiment"}, + {"name": "entry_direction_prob", "unit": "probability", "source": "baseline frozen DIRECTION ONNX", "leakage_check": "baseline model is fixed before this experiment"}, + {"name": "add_count", "unit": "count", "source": "synthetic position state", "leakage_check": "known at current position time"}, + {"name": "minutes_since_last_add", "unit": "minute", "source": "synthetic position state", "leakage_check": "known at current position time"}, ] -def _report(args: Any, baseline_root: Path, manifest: dict[str, Any], results: dict[str, Any]) -> str: +def _verdict(results: dict[str, Any]) -> dict[str, Any]: + reasons: list[str] = [] + passed_checks: list[str] = [] + for side in ("long", "short"): + plus = results[f"{side}_market_plus_state"] + base = results[f"{side}_market_only"] + if not plus.get("regressor_converged"): + reasons.append(f"{side} market_plus_state regressor did not converge") + for split_id in (VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT): + plus_metric = plus.get(split_id, {}) + base_metric = base.get(split_id, {}) + plus_auc = plus_metric.get("continue_auc") + base_auc = base_metric.get("continue_auc") + plus_mae = plus_metric.get("edge_mae_vs_constant_ratio") + base_mae = base_metric.get("edge_mae_vs_constant_ratio") + if plus_auc is None or plus_auc < 0.60: + reasons.append(f"{side} {split_id} continue_auc below 0.60: {plus_auc}") + elif base_auc is not None and plus_auc <= base_auc: + reasons.append(f"{side} {split_id} continue_auc not better than market_only: {plus_auc} <= {base_auc}") + else: + passed_checks.append(f"{side} {split_id} continue_auc") + if plus_mae is None or plus_mae > 0.97: + reasons.append(f"{side} {split_id} edge_mae_vs_constant_ratio above 0.97: {plus_mae}") + elif base_mae is not None and plus_mae >= base_mae: + 
reasons.append(f"{side} {split_id} edge_mae_vs_constant_ratio not better than market_only: {plus_mae} >= {base_mae}") + else: + passed_checks.append(f"{side} {split_id} edge_mae_vs_constant_ratio") + return { + "status": "PASS_TO_FORMAL_CHAIN" if not reasons else "NOT_READY_FOR_FORMAL_CHAIN", + "acceptance_rule": { + "validation_and_latest_auc_min": 0.60, + "validation_and_latest_edge_mae_vs_constant_max": 0.97, + "must_beat_market_only": True, + "regressor_must_converge": True, + }, + "passed_checks": passed_checks, + "reasons": reasons, + } + + +def _report(args: Any, baseline_root: Path, manifest: dict[str, Any], results: dict[str, Any], verdict: dict[str, Any]) -> str: baseline = read_json(baseline_root / "model" / "model_train_manifest.json") continue_metrics = baseline["CONTINUE"]["metrics"] lines = [ @@ -398,6 +613,11 @@ def _report(args: Any, baseline_root: Path, manifest: dict[str, Any], results: d f"- baseline_run_id: `{args.baseline_run_id}`", f"- row_count: `{manifest['row_count']}`", f"- ages_minutes: `{manifest['ages_minutes']}`", + f"- regressor_kind: `{manifest['regressor_kind']}`", + f"- huber_max_iter: `{manifest['huber_max_iter']}`", + f"- regression_target_clip_bps: `{manifest['regression_target_clip_bps']}`", + f"- continue_horizon_minutes: `{manifest['continue_horizon_minutes']}`", + f"- min_continue_edge_bps: `{manifest['min_continue_edge_bps']}`", "", "## Baseline run-10 Continue", "", @@ -427,6 +647,17 @@ def _report(args: Any, baseline_root: Path, manifest: dict[str, Any], results: d "", "状态特征只有在 `market_plus_state` 同时好过 `market_only`,并且 validation_locked / latest_stress 没有反向变差时,才进入正式链路。", "", + "## Verdict", + "", + f"- status: `{verdict['status']}`", + f"- reasons: `{len(verdict['reasons'])}`", + "", ] ) + for reason in verdict["reasons"]: + lines.append(f"- {reason}") + if verdict["passed_checks"]: + lines.extend(["", "## Passed Checks", ""]) + for item in verdict["passed_checks"]: + lines.append(f"- {item}") return "\n".join(lines)