from __future__ import annotations import json import logging from pathlib import Path from typing import Any import numpy as np import pandas as pd from sklearn.linear_model import HuberRegressor, LogisticRegression from sklearn.metrics import accuracy_score, brier_score_loss, log_loss, mean_absolute_error, roc_auc_score from sklearn.preprocessing import StandardScaler from trader_training.io_utils import ( DEFAULT_RAW_ROOT, read_json, read_parquet, run_root, sha256_json, to_utc_series, write_json, write_parquet, write_text, ) from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT OFI_METHOD = "SNAPSHOT_DIFF_V1" BOOK_OFI_FEATURES = [ "ofi_l1_1m", "ofi_l1_3m", "ofi_l1_5m", "ofi_l1_15m", "mlofi_l5_1m", "mlofi_l5_5m", "mlofi_l20_1m", "mlofi_l20_5m", "mlofi_l5_l20_gap_1m", "microprice_basis_change_1m_bps", "microprice_basis_change_5m_bps", "ofi_l1_5m_zscore_240m", "mlofi_l20_5m_zscore_240m", ] CROSS_OFI_FEATURES = [ "ofi_l1_5m_clipped", "ofi_l1_taker_5m", "ofi_l1_spread_rank_5m", ] OFI_FEATURES = [*BOOK_OFI_FEATURES, *CROSS_OFI_FEATURES] META_COLUMNS = [ "sample_id", "symbol", "event_time", "open_time_ms", "split_id", "walk_forward_fold", "data_quality_flag", ] ALL_SPLITS = (FIT_SPLIT, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) EVAL_SPLITS = (TUNE_SPLIT, VALIDATION_LOCKED_SPLIT, LATEST_STRESS_SPLIT) def run_ofi_feature_experiment(args: Any) -> None: root = run_root(args) baseline_root = args.data_root / "trader-v4" / "runs" / args.baseline_run_id out_dir = root / "experiments" / "ofi_b2_mlofi" raw_root = Path(args.raw_root or DEFAULT_RAW_ROOT) logging.info( "trader.training.ofi_experiment_started runId=%s baselineRunId=%s rawRoot=%s", args.run_id, args.baseline_run_id, raw_root, ) feature = _load_baseline_feature_frame(baseline_root) ofi_delta = build_snapshot_diff_l1_ofi_features(raw_root, feature[["symbol", "event_time", "open_time_ms"]]) dataset = _merge_feature_delta(feature, ofi_delta) if args.max_rows_per_split: dataset = _cap_rows_per_split(dataset, int(args.max_rows_per_split)) delta_hash = write_parquet(out_dir / "ofi_feature_delta.parquet", ofi_delta) dataset_hash = write_parquet(out_dir / "ofi_experiment_feature_frame.parquet", dataset) write_json(out_dir / "ofi_feature_order.json", OFI_FEATURES) write_json(out_dir / "ofi_feature_schema.json", _ofi_feature_schema()) write_json(out_dir / "experiment_manifest.json", _experiment_manifest(args, baseline_root, raw_root, ofi_delta, dataset, delta_hash, dataset_hash)) write_text(out_dir / "feature_delta_report.md", _feature_delta_report(ofi_delta, dataset)) direction = _load_direction_dataset(baseline_root, dataset) entry = _load_entry_dataset(baseline_root, dataset) results: dict[str, Any] = {} prediction_frames: list[pd.DataFrame] = [] for feature_set_name, columns in _feature_sets().items(): direction_result, direction_predictions = _train_direction(direction, columns) entry_result, entry_predictions = _train_entry(entry, columns) results[feature_set_name] = {"DIRECTION": direction_result, "ENTRY": entry_result} direction_predictions["model"] = "DIRECTION" direction_predictions["feature_set"] = feature_set_name entry_predictions["model"] = "ENTRY" entry_predictions["feature_set"] = feature_set_name prediction_frames.extend([direction_predictions, entry_predictions]) logging.info( "trader.training.ofi_feature_set_trained runId=%s featureSet=%s directionTuneShortAuc=%s entryTuneShortAuc=%s", args.run_id, feature_set_name, direction_result.get(TUNE_SPLIT, {}).get("short_auc"), entry_result.get("short_entry_prob", {}).get(TUNE_SPLIT, {}).get("auc"), ) predictions = pd.concat(prediction_frames, ignore_index=True) if prediction_frames else pd.DataFrame() write_parquet(out_dir / "direction_entry_predictions.parquet", predictions) write_json(out_dir / "ofi_experiment_result.json", results) write_text(out_dir / "model_compare_to_run10.md", _model_compare_report(args, baseline_root, results, dataset)) write_text(out_dir / "backtest_compare_to_run10.md", _backtest_placeholder_report(args, baseline_root)) write_text(out_dir / "contract_change_report.md", _contract_change_report()) write_text(out_dir / "failure_cases_compare.md", _failure_case_placeholder_report(args)) logging.info("trader.training.ofi_experiment_finished runId=%s report=%s", args.run_id, out_dir / "model_compare_to_run10.md") def _load_baseline_feature_frame(baseline_root: Path) -> pd.DataFrame: frame = read_parquet(baseline_root / "feature" / "feature_frame.parquet") required = set(META_COLUMNS + FEATURE_ORDER) missing = sorted(required.difference(frame.columns)) if missing: raise ValueError(f"baseline feature frame missing columns: {missing}") frame = frame[frame["data_quality_flag"].isin(["OK", "PARTIAL_OPTIONAL"])].copy() frame = frame[frame["split_id"].isin(ALL_SPLITS)].copy() frame["event_time"] = to_utc_series(frame["event_time"]) logging.info("trader.training.ofi_baseline_feature_loaded rowCount=%s splitCounts=%s", len(frame), frame["split_id"].value_counts().to_dict()) return frame def build_snapshot_diff_l1_ofi_features(raw_root: Path, replay_keys: pd.DataFrame) -> pd.DataFrame: if replay_keys.empty: return pd.DataFrame(columns=["symbol", "open_time_ms", *BOOK_OFI_FEATURES]) keys = replay_keys.copy() keys["event_time"] = to_utc_series(keys["event_time"]) keys["event_date"] = keys["event_time"].dt.strftime("%Y-%m-%d") frames: list[pd.DataFrame] = [] for (symbol, event_date), _ in keys.groupby(["symbol", "event_date"], sort=True, observed=False): path = raw_root / "table=book" / "exchange=BINANCE_FUTURES" / f"symbol={symbol}" / f"dt={event_date}" / "data.parquet" if not path.is_file(): logging.warning("trader.training.ofi_book_partition_missing symbol=%s eventDate=%s path=%s", symbol, event_date, path) continue day = _read_l1_book_day(path, symbol) if not day.empty: frames.append(day) logging.info("trader.training.ofi_book_partition_loaded symbol=%s eventDate=%s minuteRows=%s path=%s", symbol, event_date, len(day), path) if not frames: raise ValueError(f"no book partitions loaded from {raw_root}") minute_book = pd.concat(frames, ignore_index=True).sort_values(["symbol", "open_time_ms"]) minute_book = minute_book.drop_duplicates(["symbol", "open_time_ms"], keep="last").reset_index(drop=True) feature_frames = [] for symbol, group in minute_book.groupby("symbol", sort=False, observed=False): feature_frames.append(_compute_l1_ofi_for_symbol(group, str(symbol))) features = pd.concat(feature_frames, ignore_index=True) if feature_frames else pd.DataFrame(columns=["symbol", "open_time_ms", *BOOK_OFI_FEATURES]) wanted = keys[["symbol", "open_time_ms"]].drop_duplicates() out = wanted.merge(features, on=["symbol", "open_time_ms"], how="left") logging.info( "trader.training.ofi_features_built wantedRows=%s matchedRows=%s featureRows=%s", len(wanted), int(out[BOOK_OFI_FEATURES].notna().all(axis=1).sum()), len(features), ) return out def _read_l1_book_day(path: Path, symbol: str) -> pd.DataFrame: columns = ["origin_time"] for side in ("bid", "ask"): for level in range(20): columns.extend([f"{side}_{level}_price", f"{side}_{level}_size"]) book = pd.read_parquet(path, columns=columns) if book.empty: return pd.DataFrame(columns=["symbol", "open_time_ms", *columns[1:]]) required = ["origin_time", "bid_0_price", "bid_0_size", "ask_0_price", "ask_0_size"] book = book.dropna(subset=required).copy() book["origin_time"] = to_utc_series(book["origin_time"]) book["minute"] = book["origin_time"].dt.floor("min") book = book.sort_values("origin_time").drop_duplicates("minute", keep="last") if book.empty: return pd.DataFrame(columns=["symbol", "open_time_ms", *columns[1:]]) out = pd.DataFrame({"symbol": symbol, "open_time_ms": (book["minute"].astype("int64") // 1_000_000).astype("int64")}) for column in columns[1:]: out[column] = pd.to_numeric(book[column], errors="coerce").astype("float64") return out def _compute_l1_ofi_for_symbol(book: pd.DataFrame, symbol: str) -> pd.DataFrame: group = book.sort_values("open_time_ms").reset_index(drop=True).copy() gap = group["open_time_ms"].astype("int64").diff().ne(60_000) segment = gap.cumsum() level_ofi = [] for level in range(20): prev_bid_price = group.groupby(segment, sort=False)[f"bid_{level}_price"].shift(1) prev_bid_size = group.groupby(segment, sort=False)[f"bid_{level}_size"].shift(1) prev_ask_price = group.groupby(segment, sort=False)[f"ask_{level}_price"].shift(1) prev_ask_size = group.groupby(segment, sort=False)[f"ask_{level}_size"].shift(1) bid_part, ask_part = l1_snapshot_diff_ofi_quote( group[f"bid_{level}_price"], group[f"bid_{level}_size"], group[f"ask_{level}_price"], group[f"ask_{level}_size"], prev_bid_price, prev_bid_size, prev_ask_price, prev_ask_size, ) level_ofi.append((bid_part + ask_part).rename(f"ofi_level_{level}_quote")) level_frame = pd.concat(level_ofi, axis=1) group["ofi_l1_event_quote"] = level_frame["ofi_level_0_quote"] group["mlofi_l5_event_quote"] = level_frame[[f"ofi_level_{level}_quote" for level in range(5)]].sum(axis=1, min_count=5) group["mlofi_l20_event_quote"] = level_frame.sum(axis=1, min_count=20) group["l1_depth_quote"] = _depth_quote(group, 1) group["l5_depth_quote"] = _depth_quote(group, 5) group["l20_depth_quote"] = _depth_quote(group, 20) for window in (1, 3, 5, 15): group[f"ofi_l1_{window}m"] = _rolling_normalized(group, segment, "ofi_l1_event_quote", "l1_depth_quote", window) for window in (1, 5): group[f"mlofi_l5_{window}m"] = _rolling_normalized(group, segment, "mlofi_l5_event_quote", "l5_depth_quote", window) group[f"mlofi_l20_{window}m"] = _rolling_normalized(group, segment, "mlofi_l20_event_quote", "l20_depth_quote", window) group["mlofi_l5_l20_gap_1m"] = group["mlofi_l5_1m"] - group["mlofi_l20_1m"] mid = (group["bid_0_price"] + group["ask_0_price"]) / 2.0 microprice = (group["ask_0_price"] * group["bid_0_size"] + group["bid_0_price"] * group["ask_0_size"]) / (group["bid_0_size"] + group["ask_0_size"]).clip(lower=1e-12) group["microprice_basis_bps"] = (microprice / mid - 1.0) * 10000.0 group["microprice_basis_change_1m_bps"] = group.groupby(segment, sort=False)["microprice_basis_bps"].diff(1) group["microprice_basis_change_5m_bps"] = group.groupby(segment, sort=False)["microprice_basis_bps"].diff(5) group["ofi_l1_5m_zscore_240m"] = _rolling_zscore(group, segment, "ofi_l1_5m", 240) group["mlofi_l20_5m_zscore_240m"] = _rolling_zscore(group, segment, "mlofi_l20_5m", 240) out = group[["symbol", "open_time_ms", *BOOK_OFI_FEATURES]].replace([np.inf, -np.inf], np.nan) for column in BOOK_OFI_FEATURES: out[column] = pd.to_numeric(out[column], errors="coerce").astype("float32") logging.info( "trader.training.ofi_symbol_features_built symbol=%s minuteRows=%s fullFeatureRows=%s", symbol, len(out), int(out[BOOK_OFI_FEATURES].notna().all(axis=1).sum()), ) return out def _depth_quote(group: pd.DataFrame, level_count: int) -> pd.Series: total = pd.Series(0.0, index=group.index, dtype="float64") for level in range(level_count): total = total + group[f"bid_{level}_price"] * group[f"bid_{level}_size"] + group[f"ask_{level}_price"] * group[f"ask_{level}_size"] return total def _rolling_normalized(group: pd.DataFrame, segment: pd.Series, event_column: str, depth_column: str, window: int) -> pd.Series: summed = group.groupby(segment, sort=False)[event_column].rolling(window, min_periods=window).sum().reset_index(level=0, drop=True) averaged_depth = group.groupby(segment, sort=False)[depth_column].rolling(window, min_periods=window).mean().reset_index(level=0, drop=True) return summed / averaged_depth.clip(lower=1e-12) def _rolling_zscore(group: pd.DataFrame, segment: pd.Series, column: str, window: int) -> pd.Series: rolling = group.groupby(segment, sort=False)[column].rolling(window, min_periods=window) mean = rolling.mean().reset_index(level=0, drop=True) std = rolling.std().reset_index(level=0, drop=True).replace(0, np.nan) return (group[column] - mean) / std def l1_snapshot_diff_ofi_quote( bid_price: pd.Series, bid_size: pd.Series, ask_price: pd.Series, ask_size: pd.Series, prev_bid_price: pd.Series, prev_bid_size: pd.Series, prev_ask_price: pd.Series, prev_ask_size: pd.Series, ) -> tuple[pd.Series, pd.Series]: bid = pd.to_numeric(bid_price, errors="coerce") bid_sz = pd.to_numeric(bid_size, errors="coerce") ask = pd.to_numeric(ask_price, errors="coerce") ask_sz = pd.to_numeric(ask_size, errors="coerce") prev_bid = pd.to_numeric(prev_bid_price, errors="coerce") prev_bid_sz = pd.to_numeric(prev_bid_size, errors="coerce") prev_ask = pd.to_numeric(prev_ask_price, errors="coerce") prev_ask_sz = pd.to_numeric(prev_ask_size, errors="coerce") valid = prev_bid.notna() & prev_bid_sz.notna() & prev_ask.notna() & prev_ask_sz.notna() bid_part = pd.Series(np.nan, index=bid.index, dtype="float64") ask_part = pd.Series(np.nan, index=ask.index, dtype="float64") bid_up = valid & bid.gt(prev_bid) bid_same = valid & bid.eq(prev_bid) bid_down = valid & bid.lt(prev_bid) bid_part.loc[bid_up] = bid_sz.loc[bid_up] * bid.loc[bid_up] bid_part.loc[bid_same] = (bid_sz.loc[bid_same] - prev_bid_sz.loc[bid_same]) * bid.loc[bid_same] bid_part.loc[bid_down] = -prev_bid_sz.loc[bid_down] * prev_bid.loc[bid_down] ask_down = valid & ask.lt(prev_ask) ask_same = valid & ask.eq(prev_ask) ask_up = valid & ask.gt(prev_ask) ask_part.loc[ask_down] = -ask_sz.loc[ask_down] * ask.loc[ask_down] ask_part.loc[ask_same] = -(ask_sz.loc[ask_same] - prev_ask_sz.loc[ask_same]) * ask.loc[ask_same] ask_part.loc[ask_up] = prev_ask_sz.loc[ask_up] * prev_ask.loc[ask_up] return bid_part, ask_part def _merge_feature_delta(feature: pd.DataFrame, delta: pd.DataFrame) -> pd.DataFrame: merged = feature.merge(delta, on=["symbol", "open_time_ms"], how="left") merged["ofi_l1_5m_clipped"] = pd.to_numeric(merged["ofi_l1_5m"], errors="coerce").clip(-5.0, 5.0) merged["ofi_l1_taker_5m"] = merged["ofi_l1_5m_clipped"] * pd.to_numeric(merged["taker_imbalance_5m"], errors="coerce") merged["ofi_l1_spread_rank_5m"] = merged["ofi_l1_5m_clipped"] * pd.to_numeric(merged["spread_rank_24h_pct"], errors="coerce") before = len(merged) merged = merged.dropna(subset=OFI_FEATURES).copy() logging.info( "trader.training.ofi_feature_delta_merged rowBefore=%s rowAfter=%s droppedRows=%s splitCounts=%s", before, len(merged), before - len(merged), merged["split_id"].value_counts().to_dict(), ) if merged.empty: raise ValueError("OFI feature experiment has no rows after merging feature delta") return merged def _cap_rows_per_split(frame: pd.DataFrame, max_rows_per_split: int) -> pd.DataFrame: capped = [] for split_id, part in frame.sort_values("event_time").groupby("split_id", sort=False, observed=False): if len(part) > max_rows_per_split: part = part.tail(max_rows_per_split).copy() capped.append(part) logging.info("trader.training.ofi_split_capped splitId=%s rowCount=%s maxRows=%s", split_id, len(part), max_rows_per_split) return pd.concat(capped, ignore_index=True) def _load_direction_dataset(baseline_root: Path, feature: pd.DataFrame) -> pd.DataFrame: labels = read_parquet(baseline_root / "label" / "direction_labels.parquet") required = {"sample_id", "long_target", "short_target", "neutral_target", "future_return_bps"} missing = sorted(required.difference(labels.columns)) if missing: raise ValueError(f"direction labels missing columns: {missing}") dataset = feature.merge(labels[list(required)], on="sample_id", how="inner") logging.info("trader.training.ofi_direction_dataset_loaded rowCount=%s", len(dataset)) return dataset def _load_entry_dataset(baseline_root: Path, feature: pd.DataFrame) -> pd.DataFrame: labels = read_parquet(baseline_root / "label" / "entry_labels.parquet") required = {"sample_id", "side", "entry_target", "expected_net_edge_bps"} missing = sorted(required.difference(labels.columns)) if missing: raise ValueError(f"entry labels missing columns: {missing}") long = labels[labels["side"].eq("LONG")][["sample_id", "entry_target", "expected_net_edge_bps"]].rename( columns={"entry_target": "long_entry_target", "expected_net_edge_bps": "long_expected_net_edge_bps"} ) short = labels[labels["side"].eq("SHORT")][["sample_id", "entry_target", "expected_net_edge_bps"]].rename( columns={"entry_target": "short_entry_target", "expected_net_edge_bps": "short_expected_net_edge_bps"} ) pivot = long.merge(short, on="sample_id", how="inner") dataset = feature.merge(pivot, on="sample_id", how="inner") logging.info("trader.training.ofi_entry_dataset_loaded rowCount=%s", len(dataset)) return dataset def _feature_sets() -> dict[str, list[str]]: return { "market_only": FEATURE_ORDER, "market_plus_ofi": [*FEATURE_ORDER, *OFI_FEATURES], } def _train_direction(frame: pd.DataFrame, feature_columns: list[str]) -> tuple[dict[str, Any], pd.DataFrame]: train = frame[frame["split_id"].eq(FIT_SPLIT)].copy() if train.empty: raise ValueError("direction experiment has no fit_inner rows") scaler = StandardScaler() x_train = scaler.fit_transform(train[feature_columns].astype("float32")) y_train = train[["long_target", "short_target", "neutral_target"]].to_numpy().argmax(axis=1) model = LogisticRegression(max_iter=500) model.fit(x_train, y_train) train_prior = train[["long_target", "short_target", "neutral_target"]].to_numpy(dtype=float).mean(axis=0) metrics: dict[str, Any] = {"feature_count": len(feature_columns), "feature_hash": sha256_json(feature_columns)} prediction_frames = [] for split_id in ALL_SPLITS: part = frame[frame["split_id"].eq(split_id)].copy() if part.empty: continue x = scaler.transform(part[feature_columns].astype("float32")) proba = model.predict_proba(x) y = part[["long_target", "short_target", "neutral_target"]].to_numpy().argmax(axis=1) metrics[split_id] = _direction_metrics(y, proba, train_prior) pred = part[["sample_id", "symbol", "event_time", "split_id"]].copy() pred["long_prob"] = proba[:, 0].astype("float32") pred["short_prob"] = proba[:, 1].astype("float32") pred["neutral_prob"] = proba[:, 2].astype("float32") pred["label_long"] = (y == 0).astype("int8") pred["label_short"] = (y == 1).astype("int8") pred["label_neutral"] = (y == 2).astype("int8") prediction_frames.append(pred) return metrics, pd.concat(prediction_frames, ignore_index=True) def _train_entry(frame: pd.DataFrame, feature_columns: list[str]) -> tuple[dict[str, Any], pd.DataFrame]: train = frame[frame["split_id"].eq(FIT_SPLIT)].copy() if train.empty: raise ValueError("entry experiment has no fit_inner rows") scaler = StandardScaler() x_train = scaler.fit_transform(train[feature_columns].astype("float32")) x_by_split = { split_id: scaler.transform(frame[frame["split_id"].eq(split_id)][feature_columns].astype("float32")) for split_id in ALL_SPLITS if not frame[frame["split_id"].eq(split_id)].empty } specs = [ ("long_entry_prob", "binary", "long_entry_target"), ("short_entry_prob", "binary", "short_entry_target"), ("long_expected_net_edge_bps", "regression", "long_expected_net_edge_bps"), ("short_expected_net_edge_bps", "regression", "short_expected_net_edge_bps"), ] results: dict[str, Any] = {"feature_count": len(feature_columns), "feature_hash": sha256_json(feature_columns)} split_predictions: dict[str, pd.DataFrame] = { split_id: frame[frame["split_id"].eq(split_id)][["sample_id", "symbol", "event_time", "split_id"]].copy().reset_index(drop=True) for split_id in x_by_split } for name, kind, target in specs: y_train = pd.to_numeric(train[target], errors="coerce").fillna(0.0).to_numpy() if kind == "binary": model = LogisticRegression(max_iter=500) model.fit(x_train, y_train.astype(int)) else: model = HuberRegressor(alpha=0.001, epsilon=1.35, max_iter=500) model.fit(x_train, y_train.astype(float)) results[name] = {} for split_id, x in x_by_split.items(): part = frame[frame["split_id"].eq(split_id)].copy() y = pd.to_numeric(part[target], errors="coerce").fillna(0.0).to_numpy() if kind == "binary": pred = model.predict_proba(x)[:, 1] results[name][split_id] = _binary_metrics(y_train.astype(int), y.astype(int), pred) else: pred = model.predict(x) results[name][split_id] = _regression_metrics(y_train.astype(float), y.astype(float), pred) split_predictions[split_id][name] = pred.astype("float32") split_predictions[split_id][f"label_{target}"] = y return results, pd.concat(split_predictions.values(), ignore_index=True) def _direction_metrics(y: np.ndarray, proba: np.ndarray, train_prior: np.ndarray) -> dict[str, Any]: labels = [0, 1, 2] train_prior = np.asarray(train_prior, dtype=float) train_prior = train_prior / train_prior.sum() if train_prior.sum() > 0 else np.full(3, 1.0 / 3.0) constant = np.tile(train_prior.reshape(1, -1), (len(y), 1)) one_hot = np.eye(3, dtype=float)[y] clipped = _clip_normalize(proba) constant_clipped = _clip_normalize(constant) out: dict[str, Any] = { "row_count": int(len(y)), "accuracy": float(accuracy_score(y, proba.argmax(axis=1))), "logloss": float(log_loss(y, clipped, labels=labels)), "constant_logloss": float(log_loss(y, constant_clipped, labels=labels)), "brier_multiclass": float(np.mean(np.sum((one_hot - proba) ** 2, axis=1))), "constant_brier_multiclass": float(np.mean(np.sum((one_hot - constant) ** 2, axis=1))), } for idx, name in enumerate(("long", "short", "neutral")): target = (y == idx).astype(int) if target.sum() >= 200 and (len(target) - target.sum()) >= 200: out[f"{name}_auc"] = float(roc_auc_score(target, proba[:, idx])) top_count = max(1, int(len(y) * 0.10)) top_idx = np.argsort(proba.max(axis=1))[-top_count:] out["top10_hit_rate"] = float((proba.argmax(axis=1)[top_idx] == y[top_idx]).mean()) out["all_hit_rate"] = float((proba.argmax(axis=1) == y).mean()) out["logloss_vs_constant_ratio"] = float(out["logloss"] / out["constant_logloss"]) if out["constant_logloss"] > 0 else None out["brier_vs_constant_ratio"] = float(out["brier_multiclass"] / out["constant_brier_multiclass"]) if out["constant_brier_multiclass"] > 0 else None return out def _binary_metrics(y_train: np.ndarray, y: np.ndarray, proba: np.ndarray) -> dict[str, Any]: train_rate = float(np.mean(y_train)) constant = np.full(len(y), train_rate) out: dict[str, Any] = { "row_count": int(len(y)), "positive_rate": float(np.mean(y)) if len(y) else 0.0, "brier": float(brier_score_loss(y, proba)), "constant_brier": float(brier_score_loss(y, constant)), } if len(np.unique(y)) == 2: out["auc"] = float(roc_auc_score(y, proba)) top_count = max(1, int(len(y) * 0.10)) top_idx = np.argsort(proba)[-top_count:] out["top10_hit_rate"] = float(np.mean(y[top_idx])) out["all_hit_rate"] = float(np.mean(y)) out["brier_vs_constant_ratio"] = float(out["brier"] / out["constant_brier"]) if out["constant_brier"] > 0 else None return out def _regression_metrics(y_train: np.ndarray, y: np.ndarray, pred: np.ndarray) -> dict[str, Any]: train_median = float(np.median(y_train)) if len(y_train) else 0.0 constant = np.full(len(y), train_median) mae = float(mean_absolute_error(y, pred)) constant_mae = float(mean_absolute_error(y, constant)) return { "row_count": int(len(y)), "mae": mae, "constant_mae": constant_mae, "mae_vs_constant_ratio": float(mae / constant_mae) if constant_mae > 0 else None, "train_target_median": train_median, } def _clip_normalize(values: np.ndarray) -> np.ndarray: values = np.clip(np.asarray(values, dtype=float), 1e-6, 1.0) return values / values.sum(axis=1, keepdims=True) def _experiment_manifest( args: Any, baseline_root: Path, raw_root: Path, ofi_delta: pd.DataFrame, dataset: pd.DataFrame, delta_hash: str, dataset_hash: str, ) -> dict[str, Any]: return { "experiment": "ofi_l1_microprice_diagnostic_v1", "run_id": args.run_id, "baseline_run_id": args.baseline_run_id, "baseline_root": str(baseline_root), "raw_root": str(raw_root), "ofi_method": OFI_METHOD, "uses_event_stream_ofi": False, "normalization": "quote_notional_over_average_l1_depth_quote", "new_features": OFI_FEATURES, "formal_model_contract_changed": False, "java_contract_changed": False, "label_changed": False, "pm_threshold_changed": False, "delta_row_count": int(len(ofi_delta)), "trainable_row_count": int(len(dataset)), "split_counts": dataset["split_id"].value_counts().to_dict(), "ofi_delta_hash_sha256": delta_hash, "ofi_experiment_feature_frame_hash_sha256": dataset_hash, } def _ofi_feature_schema() -> list[dict[str, Any]]: rows = [ { "name": "ofi_l1_1m", "meaning": "买一卖一盘口变化强度,1分钟窗口", "unit": "ratio", "source": "Crypto Lake book snapshot", "window": "1m", "formula": "sum(l1 snapshot-diff quote OFI) / avg(l1 depth quote)", "ofi_method": OFI_METHOD, "null_handling": "first minute or gap warmup -> drop in experiment", "order": 1, }, { "name": "ofi_l1_3m", "meaning": "买一卖一盘口变化强度,3分钟窗口", "unit": "ratio", "source": "Crypto Lake book snapshot", "window": "3m", "formula": "sum(l1 snapshot-diff quote OFI over last 3 closed minutes) / avg(l1 depth quote)", "ofi_method": OFI_METHOD, "null_handling": "window warmup or gap warmup -> drop in experiment", "order": 2, }, { "name": "ofi_l1_5m", "meaning": "买一卖一盘口变化强度,5分钟窗口", "unit": "ratio", "source": "Crypto Lake book snapshot", "window": "5m", "formula": "sum(l1 snapshot-diff quote OFI over last 5 closed minutes) / avg(l1 depth quote)", "ofi_method": OFI_METHOD, "null_handling": "window warmup or gap warmup -> drop in experiment", "order": 3, }, { "name": "ofi_l1_15m", "meaning": "买一卖一盘口变化强度,15分钟窗口", "unit": "ratio", "source": "Crypto Lake book snapshot", "window": "15m", "formula": "sum(l1 snapshot-diff quote OFI over last 15 closed minutes) / avg(l1 depth quote)", "ofi_method": OFI_METHOD, "null_handling": "window warmup or gap warmup -> drop in experiment", "order": 4, }, { "name": "microprice_basis_change_1m_bps", "meaning": "微价格偏离值的1分钟变化", "unit": "bps", "source": "Crypto Lake book snapshot", "window": "1m", "formula": "microprice_basis_bps(t) - microprice_basis_bps(t-1m)", "ofi_method": OFI_METHOD, "null_handling": "first minute or gap warmup -> drop in experiment", "order": 5, }, { "name": "microprice_basis_change_5m_bps", "meaning": "微价格偏离值的5分钟变化", "unit": "bps", "source": "Crypto Lake book snapshot", "window": "5m", "formula": "microprice_basis_bps(t) - microprice_basis_bps(t-5m)", "ofi_method": OFI_METHOD, "null_handling": "window warmup or gap warmup -> drop in experiment", "order": 6, }, ] rows.extend( [ { "name": "mlofi_l5_1m", "meaning": "前5档盘口变化强度,1分钟窗口", "unit": "ratio", "source": "Crypto Lake book snapshot", "window": "1m", "formula": "sum(level0..4 snapshot-diff quote OFI) / avg(level0..4 depth quote)", "ofi_method": OFI_METHOD, "null_handling": "first minute or gap warmup -> drop in experiment", "order": 7, }, { "name": "mlofi_l5_5m", "meaning": "前5档盘口变化强度,5分钟窗口", "unit": "ratio", "source": "Crypto Lake book snapshot", "window": "5m", "formula": "sum(level0..4 snapshot-diff quote OFI over last 5 closed minutes) / avg(level0..4 depth quote)", "ofi_method": OFI_METHOD, "null_handling": "window warmup or gap warmup -> drop in experiment", "order": 8, }, { "name": "mlofi_l20_1m", "meaning": "前20档盘口变化强度,1分钟窗口", "unit": "ratio", "source": "Crypto Lake book snapshot", "window": "1m", "formula": "sum(level0..19 snapshot-diff quote OFI) / avg(level0..19 depth quote)", "ofi_method": OFI_METHOD, "null_handling": "first minute or gap warmup -> drop in experiment", "order": 9, }, { "name": "mlofi_l20_5m", "meaning": "前20档盘口变化强度,5分钟窗口", "unit": "ratio", "source": "Crypto Lake book snapshot", "window": "5m", "formula": "sum(level0..19 snapshot-diff quote OFI over last 5 closed minutes) / avg(level0..19 depth quote)", "ofi_method": OFI_METHOD, "null_handling": "window warmup or gap warmup -> drop in experiment", "order": 10, }, { "name": "mlofi_l5_l20_gap_1m", "meaning": "近档盘口变化和深档盘口变化的差", "unit": "ratio", "source": "Crypto Lake book snapshot", "window": "1m", "formula": "mlofi_l5_1m - mlofi_l20_1m", "ofi_method": OFI_METHOD, "null_handling": "dependency missing -> drop in experiment", "order": 11, }, { "name": "ofi_l1_5m_zscore_240m", "meaning": "L1 OFI 5分钟值相对最近240分钟的异常程度", "unit": "zscore", "source": "Crypto Lake book snapshot", "window": "240m", "formula": "(ofi_l1_5m - rolling_mean_240m) / rolling_std_240m", "ofi_method": OFI_METHOD, "null_handling": "window warmup or gap warmup -> drop in experiment", "order": 12, }, { "name": "mlofi_l20_5m_zscore_240m", "meaning": "L20 多层 OFI 5分钟值相对最近240分钟的异常程度", "unit": "zscore", "source": "Crypto Lake book snapshot", "window": "240m", "formula": "(mlofi_l20_5m - rolling_mean_240m) / rolling_std_240m", "ofi_method": OFI_METHOD, "null_handling": "window warmup or gap warmup -> drop in experiment", "order": 13, }, { "name": "ofi_l1_5m_clipped", "meaning": "截尾后的 L1 OFI 5分钟值", "unit": "ratio", "source": "derived from ofi_l1_5m", "window": "5m", "formula": "clip(ofi_l1_5m, -5, 5)", "ofi_method": OFI_METHOD, "null_handling": "dependency missing -> drop in experiment", "order": 14, }, { "name": "ofi_l1_taker_5m", "meaning": "L1 OFI 和5分钟主动成交是否同向", "unit": "ratio", "source": "book + trades", "window": "5m", "formula": "ofi_l1_5m_clipped * taker_imbalance_5m", "ofi_method": OFI_METHOD, "null_handling": "dependency missing -> drop in experiment", "order": 15, }, { "name": "ofi_l1_spread_rank_5m", "meaning": "L1 OFI 在高价差环境下的强度", "unit": "ratio", "source": "book + level_1", "window": "5m", "formula": "ofi_l1_5m_clipped * spread_rank_24h_pct", "ofi_method": OFI_METHOD, "null_handling": "dependency missing -> drop in experiment", "order": 16, }, ] ) return rows def _feature_delta_report(ofi_delta: pd.DataFrame, dataset: pd.DataFrame) -> str: rows = [] for feature in OFI_FEATURES: series = pd.to_numeric(dataset[feature], errors="coerce") q = series.quantile([0.01, 0.5, 0.99]) rows.append( { "feature": feature, "null_in_delta": int(ofi_delta[feature].isna().sum()) if feature in ofi_delta.columns else "derived_after_merge", "trainable_null": int(series.isna().sum()), "p01": round(float(q.loc[0.01]), 6), "p50": round(float(q.loc[0.5]), 6), "p99": round(float(q.loc[0.99]), 6), } ) lines = [ "# OFI Feature Delta Report", "", f"- ofi_method: `{OFI_METHOD}`", f"- delta_rows: `{len(ofi_delta)}`", f"- trainable_rows_after_drop: `{len(dataset)}`", f"- split_counts: `{dataset['split_id'].value_counts().to_dict()}`", "", "| feature | null_in_delta | trainable_null | p01 | p50 | p99 |", "| --- | ---: | ---: | ---: | ---: | ---: |", ] for row in rows: lines.append(f"| {row['feature']} | {row['null_in_delta']} | {row['trainable_null']} | {row['p01']} | {row['p50']} | {row['p99']} |") lines.extend( [ "", "## Leakage Check", "", "- 只用当前分钟及之前的 book 快照。", "- 第一条快照和断档后的窗口不补 0,直接作为 warmup 丢掉。", "- 分子和分母都使用 quote 金额口径。", "", ] ) return "\n".join(lines) def _model_compare_report(args: Any, baseline_root: Path, results: dict[str, Any], dataset: pd.DataFrame) -> str: baseline = read_json(baseline_root / "model" / "model_train_manifest.json") baseline_direction = baseline["DIRECTION"]["metrics"]["direction"] baseline_entry = baseline["ENTRY"]["metrics"] lines = [ "# OFI Model Compare To Run10", "", f"- run_id: `{args.run_id}`", f"- baseline_run_id: `{args.baseline_run_id}`", f"- ofi_method: `{OFI_METHOD}`", f"- rows: `{len(dataset)}`", "", "## Run10 Tune Baseline", "", "| model | metric | value |", "| --- | --- | ---: |", f"| Direction | long_auc | {baseline_direction.get('long_auc')} |", f"| Direction | short_auc | {baseline_direction.get('short_auc')} |", f"| Direction | neutral_auc | {baseline_direction.get('neutral_auc')} |", f"| Entry | long_auc | {baseline_entry['long_entry_prob'].get('auc')} |", f"| Entry | short_auc | {baseline_entry['short_entry_prob'].get('auc')} |", f"| Entry | long_edge_mae_ratio | {baseline_entry['long_expected_net_edge_bps'].get('mae_vs_constant_ratio')} |", f"| Entry | short_edge_mae_ratio | {baseline_entry['short_expected_net_edge_bps'].get('mae_vs_constant_ratio')} |", "", "## Diagnostic Direction Result", "", "| feature_set | split | long_auc | short_auc | neutral_auc | logloss_ratio | top10_hit_rate |", "| --- | --- | ---: | ---: | ---: | ---: | ---: |", ] for feature_set_name, payload in results.items(): direction = payload["DIRECTION"] for split_id in EVAL_SPLITS: metric = direction.get(split_id, {}) lines.append( f"| {feature_set_name} | {split_id} | {metric.get('long_auc')} | {metric.get('short_auc')} | {metric.get('neutral_auc')} | {metric.get('logloss_vs_constant_ratio')} | {metric.get('top10_hit_rate')} |" ) lines.extend( [ "", "## Diagnostic Entry Result", "", "| head | feature_set | split | auc/mae_ratio | brier_ratio | top10_hit_rate |", "| --- | --- | --- | ---: | ---: | ---: |", ] ) for feature_set_name, payload in results.items(): entry = payload["ENTRY"] for head in ("long_entry_prob", "short_entry_prob"): for split_id in EVAL_SPLITS: metric = entry.get(head, {}).get(split_id, {}) lines.append( f"| {head} | {feature_set_name} | {split_id} | {metric.get('auc')} | {metric.get('brier_vs_constant_ratio')} | {metric.get('top10_hit_rate')} |" ) for head in ("long_expected_net_edge_bps", "short_expected_net_edge_bps"): for split_id in EVAL_SPLITS: metric = entry.get(head, {}).get(split_id, {}) lines.append(f"| {head} | {feature_set_name} | {split_id} | {metric.get('mae_vs_constant_ratio')} | | |") lines.extend( [ "", "## Verdict Rule", "", "只有 `market_plus_ofi` 在 validation_locked 和 latest_stress 上同时好过 `market_only`,才进入正式特征链路。", "", ] ) return "\n".join(lines) def _backtest_placeholder_report(args: Any, baseline_root: Path) -> str: return "\n".join( [ "# Backtest Compare To Run10", "", f"- run_id: `{args.run_id}`", f"- baseline_run_id: `{args.baseline_run_id}`", "", "本轮是 Direction / Entry 特征诊断,没有导出正式 ONNX,也没有改 PM 阈值,所以不跑组合回测。", "", "如果诊断指标通过,下一步才把 OFI 特征纳入正式 `feature_schema.json`、导出模型包,再做 validation_locked 和 latest_stress 的完整回测。", "", f"- run10_baseline_root: `{baseline_root}`", "", ] ) def _contract_change_report() -> str: return "\n".join( [ "# Contract Change Report", "", "| 项 | 结论 |", "| --- | --- |", "| 正式 ONNX 输入 | 未改变 |", "| Java SHADOW 输入契约 | 未改变 |", "| 模型输出字段 | 未改变 |", "| 标签口径 | 未改变 |", "| PM 阈值 | 未改变 |", "", "原因:本轮只做旁路诊断。只有验证通过后,才会进入正式特征表和 Java 契约同步。", "", ] ) def _failure_case_placeholder_report(args: Any) -> str: return "\n".join( [ "# Failure Cases Compare", "", f"- run_id: `{args.run_id}`", "", "本轮没有产生正式交易决策,因此没有最差交易样本可比。", "", "下一步如果 OFI 进入正式模型包,必须用完整回测交易明细比较:", "", "1. validation_locked 最大亏损交易。", "2. latest_stress 最大亏损交易。", "3. 连续亏损段。", "4. 高 OFI 但反向亏损样本。", "", ] )