from __future__ import annotations import json import logging from dataclasses import dataclass from pathlib import Path from typing import Any import numpy as np import pandas as pd from sklearn.linear_model import HuberRegressor, LogisticRegression from sklearn.metrics import accuracy_score, log_loss, mean_absolute_error, roc_auc_score from sklearn.preprocessing import StandardScaler from trader_training.io_utils import read_parquet, run_root, sha256_file, write_json, write_parquet, write_text from trader_training.onnx_export import LinearHead, export_heads from trader_training.schemas import FEATURE_ORDER, FIT_SPLIT, LATEST_STRESS_SPLIT, MODEL_OUTPUTS, PROBABILITY_TARGET_NAMES, TUNE_SPLIT, VALIDATION_LOCKED_SPLIT @dataclass class HeadResult: field: str target_name: str | None kind: str weight: np.ndarray bias: np.ndarray metrics: dict[str, Any] tune_prediction: np.ndarray tune_target: np.ndarray | None TARGETS = { "DIRECTION": { "dataset": "direction_train.parquet", "heads": [("direction", "multiclass", ["long_target", "short_target", "neutral_target"], ["long_prob", "short_prob", "neutral_prob"], ["longProb", "shortProb", "neutralProb"])], }, "ENTRY": { "dataset": "entry_train.parquet", "heads": [ ("long_entry_prob", "binary", "long_entry_target", ["long_entry_prob"], ["longEntryProb"]), ("short_entry_prob", "binary", "short_entry_target", ["short_entry_prob"], ["shortEntryProb"]), ("long_expected_net_edge_bps", "regression", "long_actual_plan_net_edge_bps", ["long_expected_net_edge_bps"], [None]), ("short_expected_net_edge_bps", "regression", "short_actual_plan_net_edge_bps", ["short_expected_net_edge_bps"], [None]), ], }, "CONTINUE": { "dataset": "continue_train.parquet", "heads": [ ("long_continue_prob", "binary", "long_continue_target", ["long_continue_prob"], ["longContinueProb"]), ("short_continue_prob", "binary", "short_continue_target", ["short_continue_prob"], ["shortContinueProb"]), ("long_expected_continue_edge_bps", "regression", "long_expected_continue_edge_bps", ["long_expected_continue_edge_bps"], [None]), ("short_expected_continue_edge_bps", "regression", "short_expected_continue_edge_bps", ["short_expected_continue_edge_bps"], [None]), ], }, "EXIT": { "dataset": "exit_train.parquet", "heads": [ ("long_exit_prob", "binary", "long_exit_target", ["long_exit_prob"], ["longExitProb"]), ("short_exit_prob", "binary", "short_exit_target", ["short_exit_prob"], ["shortExitProb"]), ("long_adverse_move_bps", "regression", "long_adverse_move_bps", ["long_adverse_move_bps"], [None]), ("short_adverse_move_bps", "regression", "short_adverse_move_bps", ["short_adverse_move_bps"], [None]), ("adverse_move_prob", "binary", "adverse_move_prob_label", ["adverse_move_prob"], ["adverse_move_prob"]), ("reversal_prob", "binary", "reversal_prob_label", ["reversal_prob"], ["reversal_prob"]), ("stop_hit_prob", "binary", "stop_hit_prob_label", ["stop_hit_prob"], ["stop_hit_prob"]), ("stagnation_prob", "binary", "stagnation_prob_label", ["stagnation_prob"], ["stagnation_prob"]), ], }, "RISK": { "dataset": "risk_train.parquet", "heads": [ ("market_risk_prob", "binary", "market_risk_target", ["market_risk_prob"], ["marketRiskProb"]), ("long_position_risk_prob", "binary", "long_position_risk_target", ["long_position_risk_prob"], ["longPositionRiskProb"]), ("short_position_risk_prob", "binary", "short_position_risk_target", ["short_position_risk_prob"], ["shortPositionRiskProb"]), ("market_path_risk_bps", "regression", "market_path_risk_bps", ["market_path_risk_bps"], [None]), ("long_position_path_risk_bps", "regression", "long_position_path_risk_bps", ["long_position_path_risk_bps"], [None]), ("short_position_path_risk_bps", "regression", "short_position_path_risk_bps", ["short_position_path_risk_bps"], [None]), ("market_drawdown_prob", "binary", "market_drawdown_prob_label", ["market_drawdown_prob"], ["market_drawdown_prob"]), ("volatility_expansion_prob", "binary", "volatility_expansion_prob_label", ["volatility_expansion_prob"], ["volatility_expansion_prob"]), ("spike_prob", "binary", "spike_prob_label", ["spike_prob"], ["spike_prob"]), ("liquidity_deterioration_prob", "binary", "liquidity_deterioration_prob_label", ["liquidity_deterioration_prob"], ["liquidity_deterioration_prob"]), ("position_drawdown_prob", "binary", "position_drawdown_prob_label", ["position_drawdown_prob"], ["position_drawdown_prob"]), ], }, } def train_small_models(args: Any) -> None: root = run_root(args) model_manifest: dict[str, Any] = {} for model_name, spec in TARGETS.items(): dataset = read_parquet(root / "dataset" / spec["dataset"]) if model_name == "ENTRY" and _conditional_entry_source(args) == "direction_label": dataset = _attach_direction_fit_labels(root, dataset) if args.max_rows and len(dataset) > args.max_rows: dataset = dataset.sort_values("event_time").tail(args.max_rows).copy() if dataset.empty: raise ValueError(f"dataset is empty for {model_name}") train = dataset[dataset["split_id"] == FIT_SPLIT].copy() tune = dataset[dataset["split_id"] == TUNE_SPLIT].copy() validation_locked = dataset[dataset["split_id"] == VALIDATION_LOCKED_SPLIT].copy() latest_stress = dataset[dataset["split_id"] == LATEST_STRESS_SPLIT].copy() if train.empty or tune.empty: raise ValueError(f"{model_name} needs {FIT_SPLIT} and {TUNE_SPLIT} rows") logging.info( "trader.training.model_dataset_loaded runId=%s model=%s totalRows=%s trainRows=%s tuneRows=%s validationLockedRows=%s latestStressRows=%s splitCounts=%s", args.run_id, model_name, len(dataset), len(train), len(tune), len(validation_locked), len(latest_stress), dataset["split_id"].value_counts().to_dict(), ) scaler = StandardScaler() x_train_scaled = scaler.fit_transform(train[FEATURE_ORDER].astype("float32")) x_tune_scaled = scaler.transform(tune[FEATURE_ORDER].astype("float32")) heads: list[LinearHead] = [] head_results: list[HeadResult] = [] for item in spec["heads"]: head_name = item[0] head_train_mask, head_filter = _head_train_mask(model_name, head_name, train, args) head_results.extend(_fit_head(item, x_train_scaled, x_tune_scaled, train, tune, scaler, head_train_mask, head_filter, args)) for result in head_results: logging.info( "trader.training.model_head_trained runId=%s model=%s head=%s kind=%s targetSource=%s metrics=%s", args.run_id, model_name, result.field, result.kind, result.metrics.get("target_source"), result.metrics, ) for result in head_results: heads.append(LinearHead(result.field, _onnx_kind(result.kind), result.weight, result.bias)) model_dir = root / "model" / model_name.lower() model_path = model_dir / f"{model_name.lower()}.onnx" export_heads(model_path, heads, feature_count=len(FEATURE_ORDER), opset=17) predictions = _tune_prediction_frame(tune, head_results) write_parquet(model_dir / "tune_predictions.parquet", predictions) if not validation_locked.empty: write_parquet(model_dir / "validation_locked_predictions.parquet", _predict_frame(validation_locked, head_results, include_labels=True)) if not latest_stress.empty: write_parquet(model_dir / "latest_stress_predictions.parquet", _predict_frame(latest_stress, head_results, include_labels=False)) metrics = {result.field: result.metrics for result in head_results} model_hash = sha256_file(model_path) quality_status, quality_reasons = _model_quality(head_results) write_json( model_dir / "model_train_result.json", { "model_name": model_name, "metrics": metrics, "quality_status": quality_status, "quality_reasons": quality_reasons, "artifact_hash_sha256": model_hash, }, ) write_json( model_dir / "model_manifest.json", { "model_name": model_name, "model_path": str(model_path), "model_format": "ONNX", "input_tensor_name": "features", "input_feature_count": len(FEATURE_ORDER), "output_tensor_name": "prediction", "output_fields": MODEL_OUTPUTS[model_name], "quality_status": quality_status, "quality_reasons": quality_reasons, "artifact_hash_sha256": model_hash, }, ) _write_model_examples(model_dir, model_name, tune, predictions) _write_feature_importance(model_dir / "feature_importance.csv", head_results) _write_version_compare(model_dir, model_name, metrics) _write_training_report(model_dir / "training_report.md", model_name, metrics, quality_status, quality_reasons) model_manifest[model_name] = {"path": str(model_path), "hash_sha256": model_hash, "metrics": metrics, "quality_status": quality_status, "quality_reasons": quality_reasons} logging.info( "trader.training.model_trained runId=%s model=%s qualityStatus=%s qualityReasons=%s path=%s tunePredictionRows=%s featureImportancePath=%s", args.run_id, model_name, quality_status, quality_reasons, model_path, len(predictions), model_dir / "feature_importance.csv", ) write_json(root / "model" / "model_train_manifest.json", model_manifest) def _conditional_entry_enabled(args: Any) -> bool: return _conditional_entry_source(args) != "none" def _conditional_entry_source(args: Any) -> str: source = str(getattr(args, "conditional_entry_source", "none") or "none").strip().lower() if bool(getattr(args, "conditional_entry_direction_labels", False)): source = "direction_label" allowed = {"none", "direction_label", "side_opportunity"} if source not in allowed: raise ValueError(f"unsupported conditional Entry source: {source}") return source def _attach_direction_fit_labels(root: Path, entry_dataset: pd.DataFrame) -> pd.DataFrame: direction = read_parquet(root / "dataset" / "direction_train.parquet") required = {"sample_id", "long_target", "short_target"} missing = sorted(required - set(direction.columns)) if missing: raise ValueError(f"direction_train is missing columns required by conditional Entry training: {missing}") merged = entry_dataset.merge(direction[list(required)], on="sample_id", how="inner", validate="one_to_one") if len(merged) != len(entry_dataset): raise ValueError( f"conditional Entry training lost rows while attaching direction labels: before={len(entry_dataset)} after={len(merged)}" ) logging.info( "trader.training.entry_direction_labels_attached rowCount=%s longDirectionRows=%s shortDirectionRows=%s", len(merged), int(pd.to_numeric(merged["long_target"], errors="coerce").fillna(0).astype(int).sum()), int(pd.to_numeric(merged["short_target"], errors="coerce").fillna(0).astype(int).sum()), ) return merged def _head_train_mask(model_name: str, head_name: str, train: pd.DataFrame, args: Any) -> tuple[np.ndarray, str]: source = _conditional_entry_source(args) if model_name != "ENTRY" or source == "none": return np.ones(len(train), dtype=bool), "ALL_FIT_ROWS" if head_name.startswith("long_"): side = "LONG" direction_label_column = "long_target" opportunity_column = "long_max_achievable_net_edge_bps" elif head_name.startswith("short_"): side = "SHORT" direction_label_column = "short_target" opportunity_column = "short_max_achievable_net_edge_bps" else: return np.ones(len(train), dtype=bool), "ALL_FIT_ROWS" if source == "direction_label": if direction_label_column not in train.columns: raise ValueError(f"conditional Entry training requires {direction_label_column} for head {head_name}") mask = pd.to_numeric(train[direction_label_column], errors="coerce").fillna(0).astype(int).eq(1).to_numpy() return mask, f"DIRECTION_LABEL_{side}_FIT_ROWS" threshold = float(getattr(args, "conditional_entry_opportunity_bps", 40.0) or 40.0) if opportunity_column not in train.columns: raise ValueError(f"side opportunity Entry training requires {opportunity_column} for head {head_name}") mask = pd.to_numeric(train[opportunity_column], errors="coerce").ge(threshold).fillna(False).to_numpy() filter_name = f"SIDE_OPPORTUNITY_{side}_GE_{threshold:g}_BPS_FIT_ROWS" return mask, filter_name def _fit_head( item, x_train, x_tune, train: pd.DataFrame, tune: pd.DataFrame, scaler: StandardScaler, head_train_mask: np.ndarray | None = None, head_filter: str = "ALL_FIT_ROWS", args: Any | None = None, ) -> list[HeadResult]: name, kind, target, fields, target_names = item if head_train_mask is None: head_train_mask = np.ones(len(train), dtype=bool) head_train_mask = np.asarray(head_train_mask, dtype=bool) if len(head_train_mask) != len(train): raise ValueError(f"head train mask length mismatch for {name}: mask={len(head_train_mask)} train={len(train)}") min_fit_rows = int(getattr(args, "conditional_entry_min_fit_rows", 1000) or 1000) if head_filter != "ALL_FIT_ROWS" else 1 head_fit_rows = int(head_train_mask.sum()) if head_fit_rows < min_fit_rows: raise ValueError(f"{name} has too few fit rows after {head_filter}: {head_fit_rows} < {min_fit_rows}") head_train = train.loc[head_train_mask].copy() x_head_train = x_train[head_train_mask] if kind == "multiclass": y_train = head_train[target].to_numpy().argmax(axis=1) y_val = tune[target].to_numpy().argmax(axis=1) model = LogisticRegression(max_iter=500) model.fit(x_head_train, y_train) proba = model.predict_proba(x_tune) weight, bias = _fold_scaler(model.coef_.T, model.intercept_, scaler) train_prior = head_train[target].to_numpy().mean(axis=0) metrics = _multiclass_metrics(y_train, y_val, proba, train_prior) _add_fit_filter_metrics(metrics, head_filter, head_fit_rows, len(train)) return [HeadResult("direction", target_names[0], "softmax", weight, bias, metrics, proba, y_val)] if kind == "binary": y_train = pd.to_numeric(head_train[target], errors="coerce").fillna(0).astype(int).to_numpy() y_val = pd.to_numeric(tune[target], errors="coerce").fillna(0).astype(int).to_numpy() if len(np.unique(y_train)) < 2: prevalence = float(np.clip(y_train.mean(), 1e-6, 1 - 1e-6)) coef = np.zeros((1, len(FEATURE_ORDER)), dtype=np.float32) intercept = np.array([np.log(prevalence / (1 - prevalence))], dtype=np.float32) proba = np.full(len(y_val), prevalence, dtype=np.float32) else: model = LogisticRegression(max_iter=500) model.fit(x_head_train, y_train) coef = model.coef_ intercept = model.intercept_ proba = model.predict_proba(x_tune)[:, 1] weight, bias = _fold_scaler(coef.T, intercept, scaler) metrics = _binary_metrics(y_train, y_val, proba) if len(np.unique(y_val)) == 2: metrics["auc"] = float(roc_auc_score(y_val, proba)) _add_fit_filter_metrics(metrics, head_filter, head_fit_rows, len(train)) return [HeadResult(fields[0], target_names[0], "sigmoid", weight, bias, metrics, proba.reshape(-1, 1), y_val)] if kind == "regression": y_train = pd.to_numeric(head_train[target], errors="coerce").fillna(0.0).to_numpy() y_val = pd.to_numeric(tune[target], errors="coerce").fillna(0.0).to_numpy() model = HuberRegressor(alpha=0.001, epsilon=1.35, max_iter=500) model.fit(x_head_train, y_train) pred = model.predict(x_tune) weight, bias = _fold_scaler(model.coef_.reshape(1, -1).T, np.array([model.intercept_]), scaler) metrics = _regression_metrics(y_train, y_val, pred) metrics["target_source"] = target _add_fit_filter_metrics(metrics, head_filter, head_fit_rows, len(train)) return [HeadResult(fields[0], None, "identity", weight, bias, metrics, pred.reshape(-1, 1), y_val)] raise ValueError(f"unsupported head kind: {kind}") def _add_fit_filter_metrics(metrics: dict[str, Any], fit_filter: str, fit_rows: int, total_fit_rows: int) -> None: metrics["fit_filter"] = fit_filter metrics["fit_rows"] = int(fit_rows) metrics["fit_total_rows"] = int(total_fit_rows) metrics["fit_row_ratio"] = float(fit_rows / total_fit_rows) if total_fit_rows else 0.0 def _fold_scaler(weight_scaled: np.ndarray, bias_scaled: np.ndarray, scaler: StandardScaler) -> tuple[np.ndarray, np.ndarray]: scale = np.where(scaler.scale_ == 0, 1.0, scaler.scale_) weight = weight_scaled / scale.reshape(-1, 1) bias = bias_scaled - np.sum((scaler.mean_ / scale).reshape(-1, 1) * weight_scaled, axis=0) return weight.astype(np.float32), bias.astype(np.float32) def _onnx_kind(kind: str) -> str: if kind in ("softmax", "sigmoid", "identity"): return kind raise ValueError(f"unsupported result kind: {kind}") def _multiclass_metrics(y_train: np.ndarray, y_val: np.ndarray, proba: np.ndarray, train_prior: np.ndarray) -> dict[str, Any]: one_hot = np.eye(proba.shape[1], dtype=float)[y_val] train_prior = np.asarray(train_prior, dtype=float) train_prior = train_prior / train_prior.sum() if train_prior.sum() > 0 else np.full(proba.shape[1], 1.0 / proba.shape[1]) constant = np.tile(train_prior.reshape(1, -1), (len(y_val), 1)) proba_for_logloss = _clip_normalize(proba) constant_for_logloss = _clip_normalize(constant) metrics: dict[str, Any] = { "accuracy": float(accuracy_score(y_val, proba.argmax(axis=1))), "logloss": float(log_loss(y_val, proba_for_logloss, labels=list(range(proba.shape[1])))), "constant_logloss": float(log_loss(y_val, constant_for_logloss, labels=list(range(proba.shape[1])))), "brier_multiclass": float(np.mean(np.sum((one_hot - proba) ** 2, axis=1))), "constant_brier_multiclass": float(np.mean(np.sum((one_hot - constant) ** 2, axis=1))), } for idx, name in enumerate(("long", "short", "neutral")): binary_target = (y_val == idx).astype(int) positives = int(binary_target.sum()) negatives = int(len(binary_target) - positives) if positives >= 200 and negatives >= 200: metrics[f"{name}_auc"] = float(roc_auc_score(binary_target, proba[:, idx])) else: metrics[f"{name}_auc_status"] = "INSUFFICIENT_SAMPLE" metrics[f"{name}_positive_count"] = positives metrics[f"{name}_negative_count"] = negatives max_prob = proba.max(axis=1) predicted_class = proba.argmax(axis=1) top_count = max(1, int(len(y_val) * 0.10)) top_idx = np.argsort(max_prob)[-top_count:] metrics["top10_hit_rate"] = float((predicted_class[top_idx] == y_val[top_idx]).mean()) metrics["all_hit_rate"] = float((predicted_class == y_val).mean()) return _with_quality(metrics) def _clip_normalize(values: np.ndarray) -> np.ndarray: values = np.clip(np.asarray(values, dtype=float), 1e-6, 1.0) return values / values.sum(axis=1, keepdims=True) def _binary_metrics(y_train: np.ndarray, y_val: np.ndarray, proba: np.ndarray) -> dict[str, Any]: proba = np.asarray(proba, dtype=float) train_rate = float(np.mean(y_train)) if len(y_train) else 0.0 constant = np.full(len(y_val), train_rate) metrics: dict[str, Any] = { "positive_rate": train_rate, "tune_positive_rate": float(np.mean(y_val)) if len(y_val) else 0.0, "brier": float(np.mean((y_val - proba) ** 2)) if len(y_val) else 0.0, "constant_brier": float(np.mean((y_val - constant) ** 2)) if len(y_val) else 0.0, } if len(y_val): top_count = max(1, int(len(y_val) * 0.10)) top_idx = np.argsort(proba)[-top_count:] metrics["top10_hit_rate"] = float(np.mean(y_val[top_idx])) metrics["all_hit_rate"] = float(np.mean(y_val)) return _with_quality(metrics) def _regression_metrics(y_train: np.ndarray, y_val: np.ndarray, pred: np.ndarray) -> dict[str, Any]: mae = float(mean_absolute_error(y_val, pred)) train_std = float(np.std(y_train)) train_median = float(np.median(y_train)) if len(y_train) else 0.0 constant_mae = float(mean_absolute_error(y_val, np.full(len(y_val), train_median))) if len(y_val) else 0.0 metrics: dict[str, Any] = { "mae": mae, "constant_mae": constant_mae, "train_target_median": train_median, "train_target_std": train_std, "mae_vs_train_std_ratio": float(mae / train_std) if train_std > 0 else None, "mae_vs_constant_ratio": float(mae / constant_mae) if constant_mae > 0 else None, } return _with_quality(metrics) def _with_quality(metrics: dict[str, Any]) -> dict[str, Any]: reasons: list[str] = [] for key, value in metrics.items(): if key.endswith("_auc") and isinstance(value, float) and value < 0.53: reasons.append(f"{key}_below_0.53") if "brier" in metrics and metrics.get("constant_brier") is not None and metrics["brier"] >= metrics["constant_brier"]: reasons.append("brier_not_better_than_constant") if "brier_multiclass" in metrics and metrics["brier_multiclass"] >= metrics["constant_brier_multiclass"]: reasons.append("brier_not_better_than_constant") if "mae" in metrics and metrics.get("constant_mae") is not None and metrics["constant_mae"] > 0 and metrics["mae"] >= metrics["constant_mae"]: reasons.append("mae_not_better_than_constant") if "top10_hit_rate" in metrics and "all_hit_rate" in metrics and metrics["top10_hit_rate"] <= metrics["all_hit_rate"]: reasons.append("top10_not_better_than_all") metrics["quality_status"] = "REJECTED" if reasons else "PASS" metrics["quality_reasons"] = reasons return metrics def _model_quality(results: list[HeadResult]) -> tuple[str, list[str]]: reasons = [] for result in results: if result.metrics.get("quality_status") == "REJECTED": for reason in result.metrics.get("quality_reasons", []): reasons.append(f"{result.field}:{reason}") return ("REJECTED", reasons) if reasons else ("PASS", []) def _tune_prediction_frame(tune: pd.DataFrame, results: list[HeadResult]) -> pd.DataFrame: out = tune[["sample_id", "symbol", "event_time", "split_id"]].copy().reset_index(drop=True) for result in results: values = result.tune_prediction if result.kind == "softmax": for idx, field in enumerate(MODEL_OUTPUTS["DIRECTION"]): out[field] = values[:, idx] if result.tune_target is not None: out["label__longProb"] = (result.tune_target == 0).astype(int) out["label__shortProb"] = (result.tune_target == 1).astype(int) out["label__neutralProb"] = (result.tune_target == 2).astype(int) else: out[result.field] = values.reshape(-1) if result.kind != "softmax" and result.target_name and result.tune_target is not None: out[f"label__{result.target_name}"] = result.tune_target return out def _predict_frame(frame: pd.DataFrame, results: list[HeadResult], include_labels: bool) -> pd.DataFrame: out = frame[["sample_id", "symbol", "event_time", "split_id"]].copy().reset_index(drop=True) features = frame[FEATURE_ORDER].astype("float32").to_numpy() for result in results: values = features @ result.weight + result.bias.reshape(1, -1) if result.kind == "softmax": values = _softmax(values) for idx, field in enumerate(MODEL_OUTPUTS["DIRECTION"]): out[field] = values[:, idx] elif result.kind == "sigmoid": out[result.field] = _sigmoid(values).reshape(-1) else: out[result.field] = values.reshape(-1) if include_labels and result.kind != "softmax" and result.target_name and result.target_name in frame.columns: out[f"label__{result.target_name}"] = frame[result.target_name].to_numpy() return out def _softmax(values: np.ndarray) -> np.ndarray: shifted = values - np.max(values, axis=1, keepdims=True) exp = np.exp(shifted) return exp / exp.sum(axis=1, keepdims=True) def _sigmoid(values: np.ndarray) -> np.ndarray: clipped = np.clip(values, -50.0, 50.0) return 1.0 / (1.0 + np.exp(-clipped)) def _write_training_report(path: Path, model_name: str, metrics: dict[str, Any], quality_status: str, quality_reasons: list[str]) -> None: lines = [ "# Trader Model Training Report", "", f"- model: {model_name}", f"- quality_status: {quality_status}", f"- quality_reasons: {quality_reasons}", "", "```json", json.dumps(metrics, indent=2, sort_keys=True), "```", "", ] write_text(path, "\n".join(lines)) def _write_model_examples(model_dir: Path, model_name: str, tune: pd.DataFrame, predictions: pd.DataFrame) -> None: sample_input = {feature: float(tune.iloc[0][feature]) for feature in FEATURE_ORDER} sample_output = {field: float(predictions.iloc[0][field]) for field in MODEL_OUTPUTS[model_name]} write_json(model_dir / "sample_input.json", sample_input) write_json(model_dir / "sample_output.json", sample_output) def _write_feature_importance(path: Path, results: list[HeadResult]) -> None: rows = [] for result in results: importance = np.mean(np.abs(result.weight), axis=1) for feature, value in zip(FEATURE_ORDER, importance): rows.append({"head": result.field, "feature": feature, "abs_weight": float(value)}) frame = pd.DataFrame(rows).sort_values(["head", "abs_weight"], ascending=[True, False]) write_text(path, frame.to_csv(index=False)) def _write_version_compare(model_dir: Path, model_name: str, metrics: dict[str, Any]) -> None: payload = { "model_name": model_name, "status": "NO_BASELINE", "reason": "first executable V4 training chain has no previous approved artifact bundle under this run root", "current_metrics": metrics, } write_json(model_dir / "version_compare_metrics.json", payload) write_text(model_dir / "version_compare_by_regime.csv", "regime,status,reason\nNO_BASELINE,NO_BASELINE,no previous approved artifact bundle\n") write_text(model_dir / "version_compare_top_bucket.csv", "bucket,status,reason\nNO_BASELINE,NO_BASELINE,no previous approved artifact bundle\n") lines = [ "# Version Compare Report", "", f"- model: {model_name}", "- status: NO_BASELINE", "- reason: 当前 run 目录没有上一版已验收模型包,所以首版只能记录当前指标,不能做新旧优劣判断。", "", ] write_text(model_dir / "version_compare_report.md", "\n".join(lines)) def build_calibrators(args: Any) -> None: root = run_root(args) manifest_rows = [] for model_name, target_names in PROBABILITY_TARGET_NAMES.items(): prediction_path = root / "model" / model_name.lower() / "tune_predictions.parquet" predictions = read_parquet(prediction_path) targets = {} reliability_rows = [] quality_reasons = [] for target_name in target_names: raw_field = _target_to_raw_field(model_name, target_name) label_field = f"label__{target_name}" labels = predictions[label_field].to_numpy() if label_field in predictions.columns else None raw = predictions[raw_field].to_numpy() bins = _calibration_bins(raw, labels) metrics, rows = _calibration_metrics(raw, labels, bins, target_name) targets[target_name] = {"bins": bins, "metrics": metrics} reliability_rows.extend(rows) if metrics.get("quality_status") == "REJECTED": quality_reasons.append(f"{target_name}:{metrics.get('quality_reason')}") calibrator = { "calibrator_version": f"{model_name.lower()}-cal-v4-btc-p0", "method": "BINNING", "targets": targets, "clip": {"min": 0.0, "max": 1.0}, "fallback_policy": "FAIL_FAST", } path = root / "calibration" / model_name.lower() / "calibrator.json" cal_hash = write_json(path, calibrator) quality_status = "REJECTED" if quality_reasons else "PASS" manifest_rows.append( { "model_name": model_name, "calibrator_path": str(path), "calibrator_hash_sha256": cal_hash, "target_count": len(targets), "quality_status": quality_status, "quality_reasons": quality_reasons, } ) write_text(root / "calibration" / model_name.lower() / "reliability_curve.csv", pd.DataFrame(reliability_rows).to_csv(index=False)) _write_calibration_report(root / "calibration" / model_name.lower() / "calibration_report.md", model_name, targets, quality_status, quality_reasons) logging.info("trader.training.calibrator_written runId=%s model=%s path=%s", args.run_id, model_name, path) write_json(root / "calibration" / "calibration_train_manifest.json", {"calibrators": manifest_rows}) def _target_to_raw_field(model_name: str, target_name: str) -> str: mapping = { "longProb": "long_prob", "shortProb": "short_prob", "neutralProb": "neutral_prob", "longEntryProb": "long_entry_prob", "shortEntryProb": "short_entry_prob", "longContinueProb": "long_continue_prob", "shortContinueProb": "short_continue_prob", "longExitProb": "long_exit_prob", "shortExitProb": "short_exit_prob", "marketRiskProb": "market_risk_prob", "longPositionRiskProb": "long_position_risk_prob", "shortPositionRiskProb": "short_position_risk_prob", } return mapping.get(target_name, target_name) def _calibration_bins(raw: np.ndarray, labels: np.ndarray | None) -> list[dict[str, float]]: raw = np.asarray(raw, dtype=float) raw = np.clip(np.nan_to_num(raw, nan=0.5), 0.0, 1.0) if labels is None or len(labels) != len(raw): return [{"min": 0.0, "max": 1.0, "calibrated": 0.5}] labels = np.asarray(labels, dtype=float) bins = [] edges = np.linspace(0.0, 1.0, 11) for left, right in zip(edges[:-1], edges[1:]): if right == 1.0: mask = (raw >= left) & (raw <= right) else: mask = (raw >= left) & (raw < right) calibrated = float(labels[mask].mean()) if mask.any() else float((left + right) / 2.0) bins.append({"min": float(left), "max": float(right), "calibrated": float(np.clip(calibrated, 0.0, 1.0))}) return bins def _apply_calibration(raw: np.ndarray, bins: list[dict[str, float]]) -> np.ndarray: out = np.zeros_like(raw, dtype=float) for item in bins: left = float(item["min"]) right = float(item["max"]) if right >= 1.0: mask = (raw >= left) & (raw <= right) else: mask = (raw >= left) & (raw < right) out[mask] = float(item["calibrated"]) return np.clip(out, 0.0, 1.0) def _calibration_metrics(raw: np.ndarray, labels: np.ndarray | None, bins: list[dict[str, float]], target_name: str) -> tuple[dict[str, Any], list[dict[str, Any]]]: raw = np.clip(np.asarray(raw, dtype=float), 0.0, 1.0) if labels is None or len(labels) != len(raw): return {"quality_status": "REJECTED", "quality_reason": "missing_labels"}, [] labels = np.asarray(labels, dtype=float) calibrated = _apply_calibration(raw, bins) raw_ece, rows = _ece(raw, labels, target_name, "raw") calibrated_ece, calibrated_rows = _ece(calibrated, labels, target_name, "calibrated") rows.extend(calibrated_rows) quality_status = "PASS" if calibrated_ece <= raw_ece else "REJECTED" return ( { "raw_ece": raw_ece, "calibrated_ece": calibrated_ece, "quality_status": quality_status, "quality_reason": None if quality_status == "PASS" else "calibrated_ece_not_improved", }, rows, ) def _ece(values: np.ndarray, labels: np.ndarray, target_name: str, series_name: str) -> tuple[float, list[dict[str, Any]]]: rows = [] total = len(values) ece = 0.0 edges = np.linspace(0.0, 1.0, 11) for left, right in zip(edges[:-1], edges[1:]): mask = (values >= left) & (values <= right) if right >= 1.0 else (values >= left) & (values < right) count = int(mask.sum()) confidence = float(values[mask].mean()) if count else float((left + right) / 2.0) accuracy = float(labels[mask].mean()) if count else 0.0 ece += (count / total) * abs(confidence - accuracy) if total else 0.0 rows.append( { "target": target_name, "series": series_name, "bin_min": left, "bin_max": right, "count": count, "confidence": confidence, "accuracy": accuracy, } ) return float(ece), rows def _write_calibration_report(path: Path, model_name: str, targets: dict[str, Any], quality_status: str, quality_reasons: list[str]) -> None: lines = ["# Trader Calibration Report", "", f"- model: {model_name}", f"- target_count: {len(targets)}", f"- quality_status: {quality_status}", f"- quality_reasons: {quality_reasons}", ""] for target_name, payload in targets.items(): lines.append(f"## {target_name}") lines.append("") lines.append("```json") lines.append(json.dumps(payload.get("metrics", {}), indent=2, sort_keys=True)) lines.append("```") lines.append("") write_text(path, "\n".join(lines))