NOTE(review): this patch was re-lined from a copy whose newlines and indentation
had been collapsed to single spaces. Tokens and hunk headers are unchanged and the
per-hunk line counts were re-checked against the @@ headers, but leading whitespace
is inferred from Python syntax -- confirm context lines against the target tree
(or apply with `git apply --ignore-whitespace`).

diff --git a/training/scripts/11_train_small_models.py b/training/scripts/11_train_small_models.py
index 3037fd0..70ff17c 100644
--- a/training/scripts/11_train_small_models.py
+++ b/training/scripts/11_train_small_models.py
@@ -11,6 +11,8 @@ def main() -> None:
     parser = argparse.ArgumentParser()
     add_common_args(parser)
     parser.add_argument("--max-rows", type=int, default=0)
+    parser.add_argument("--conditional-entry-direction-labels", action="store_true")
+    parser.add_argument("--conditional-entry-min-fit-rows", type=int, default=1000)
     args = parser.parse_args()
     setup_logging()
     train_small_models(args)
diff --git a/training/tests/test_training_contract.py b/training/tests/test_training_contract.py
index 41b3659..a06ddd8 100644
--- a/training/tests/test_training_contract.py
+++ b/training/tests/test_training_contract.py
@@ -25,7 +25,7 @@ from trader_training.ofi_feature_experiment import _load_entry_dataset, l1_snaps
 from trader_training.promote import promote_artifact_bundle
 from trader_training.replay import build_splits
 from trader_training.schemas import FEATURE_ORDER, LATEST_STRESS_SPLIT, MODEL_OUTPUTS, OUTPUT_MAPPING, TRAINING_SPLITS, VALIDATION_LOCKED_SPLIT
-from trader_training.training import TARGETS
+from trader_training.training import TARGETS, _head_train_mask
 
 
 class TrainingContractTest(unittest.TestCase):
@@ -65,6 +65,20 @@ class TrainingContractTest(unittest.TestCase):
         self.assertEqual("long_actual_plan_net_edge_bps", heads["long_expected_net_edge_bps"])
         self.assertEqual("short_actual_plan_net_edge_bps", heads["short_expected_net_edge_bps"])
 
+    def test_conditional_entry_training_uses_direction_label_rows(self) -> None:
+        train = pd.DataFrame({"long_target": [1, 0, 1, 0], "short_target": [0, 1, 0, 1]})
+
+        long_mask, long_filter = _head_train_mask("ENTRY", "long_entry_prob", train, Namespace(conditional_entry_direction_labels=True))
+        short_mask, short_filter = _head_train_mask("ENTRY", "short_expected_net_edge_bps", train, Namespace(conditional_entry_direction_labels=True))
+        default_mask, default_filter = _head_train_mask("ENTRY", "long_entry_prob", train, Namespace(conditional_entry_direction_labels=False))
+
+        self.assertEqual("DIRECTION_LABEL_LONG_FIT_ROWS", long_filter)
+        self.assertEqual([True, False, True, False], long_mask.tolist())
+        self.assertEqual("DIRECTION_LABEL_SHORT_FIT_ROWS", short_filter)
+        self.assertEqual([False, True, False, True], short_mask.tolist())
+        self.assertEqual("ALL_FIT_ROWS", default_filter)
+        self.assertEqual([True, True, True, True], default_mask.tolist())
+
     def test_entry_feature_screen_keeps_zero_inflated_event_features(self) -> None:
         values = np.concatenate((np.zeros(5000), np.linspace(1.0, 100.0, 500)))
         edges = _bucket_edges(values)
diff --git a/training/trader_training/training.py b/training/trader_training/training.py
index 3afa581..400437d 100644
--- a/training/trader_training/training.py
+++ b/training/trader_training/training.py
@@ -89,6 +89,8 @@ def train_small_models(args: Any) -> None:
     model_manifest: dict[str, Any] = {}
     for model_name, spec in TARGETS.items():
         dataset = read_parquet(root / "dataset" / spec["dataset"])
+        if model_name == "ENTRY" and _conditional_entry_enabled(args):
+            dataset = _attach_direction_fit_labels(root, dataset)
         if args.max_rows and len(dataset) > args.max_rows:
             dataset = dataset.sort_values("event_time").tail(args.max_rows).copy()
         if dataset.empty:
@@ -116,7 +118,9 @@ def train_small_models(args: Any) -> None:
         heads: list[LinearHead] = []
         head_results: list[HeadResult] = []
         for item in spec["heads"]:
-            head_results.extend(_fit_head(item, x_train_scaled, x_tune_scaled, train, tune, scaler))
+            head_name = item[0]
+            head_train_mask, head_filter = _head_train_mask(model_name, head_name, train, args)
+            head_results.extend(_fit_head(item, x_train_scaled, x_tune_scaled, train, tune, scaler, head_train_mask, head_filter, args))
         for result in head_results:
             logging.info(
                 "trader.training.model_head_trained runId=%s model=%s head=%s kind=%s targetSource=%s metrics=%s",
@@ -184,20 +188,83 @@ def train_small_models(args: Any) -> None:
     write_json(root / "model" / "model_train_manifest.json", model_manifest)
 
 
-def _fit_head(item, x_train, x_tune, train: pd.DataFrame, tune: pd.DataFrame, scaler: StandardScaler) -> list[HeadResult]:
+def _conditional_entry_enabled(args: Any) -> bool:
+    return bool(getattr(args, "conditional_entry_direction_labels", False))
+
+
+def _attach_direction_fit_labels(root: Path, entry_dataset: pd.DataFrame) -> pd.DataFrame:
+    direction = read_parquet(root / "dataset" / "direction_train.parquet")
+    required = {"sample_id", "long_target", "short_target"}
+    missing = sorted(required - set(direction.columns))
+    if missing:
+        raise ValueError(f"direction_train is missing columns required by conditional Entry training: {missing}")
+    merged = entry_dataset.merge(direction[list(required)], on="sample_id", how="inner", validate="one_to_one")
+    if len(merged) != len(entry_dataset):
+        raise ValueError(
+            f"conditional Entry training lost rows while attaching direction labels: before={len(entry_dataset)} after={len(merged)}"
+        )
+    logging.info(
+        "trader.training.entry_direction_labels_attached rowCount=%s longDirectionRows=%s shortDirectionRows=%s",
+        len(merged),
+        int(pd.to_numeric(merged["long_target"], errors="coerce").fillna(0).astype(int).sum()),
+        int(pd.to_numeric(merged["short_target"], errors="coerce").fillna(0).astype(int).sum()),
+    )
+    return merged
+
+
+def _head_train_mask(model_name: str, head_name: str, train: pd.DataFrame, args: Any) -> tuple[np.ndarray, str]:
+    if model_name != "ENTRY" or not _conditional_entry_enabled(args):
+        return np.ones(len(train), dtype=bool), "ALL_FIT_ROWS"
+    if head_name.startswith("long_"):
+        condition_column = "long_target"
+        filter_name = "DIRECTION_LABEL_LONG_FIT_ROWS"
+    elif head_name.startswith("short_"):
+        condition_column = "short_target"
+        filter_name = "DIRECTION_LABEL_SHORT_FIT_ROWS"
+    else:
+        return np.ones(len(train), dtype=bool), "ALL_FIT_ROWS"
+    if condition_column not in train.columns:
+        raise ValueError(f"conditional Entry training requires {condition_column} for head {head_name}")
+    mask = pd.to_numeric(train[condition_column], errors="coerce").fillna(0).astype(int).eq(1).to_numpy()
+    return mask, filter_name
+
+
+def _fit_head(
+    item,
+    x_train,
+    x_tune,
+    train: pd.DataFrame,
+    tune: pd.DataFrame,
+    scaler: StandardScaler,
+    head_train_mask: np.ndarray | None = None,
+    head_filter: str = "ALL_FIT_ROWS",
+    args: Any | None = None,
+) -> list[HeadResult]:
     name, kind, target, fields, target_names = item
+    if head_train_mask is None:
+        head_train_mask = np.ones(len(train), dtype=bool)
+    head_train_mask = np.asarray(head_train_mask, dtype=bool)
+    if len(head_train_mask) != len(train):
+        raise ValueError(f"head train mask length mismatch for {name}: mask={len(head_train_mask)} train={len(train)}")
+    min_fit_rows = int(getattr(args, "conditional_entry_min_fit_rows", 1000) or 1000) if head_filter != "ALL_FIT_ROWS" else 1
+    head_fit_rows = int(head_train_mask.sum())
+    if head_fit_rows < min_fit_rows:
+        raise ValueError(f"{name} has too few fit rows after {head_filter}: {head_fit_rows} < {min_fit_rows}")
+    head_train = train.loc[head_train_mask].copy()
+    x_head_train = x_train[head_train_mask]
     if kind == "multiclass":
-        y_train = train[target].to_numpy().argmax(axis=1)
+        y_train = head_train[target].to_numpy().argmax(axis=1)
         y_val = tune[target].to_numpy().argmax(axis=1)
         model = LogisticRegression(max_iter=500)
-        model.fit(x_train, y_train)
+        model.fit(x_head_train, y_train)
         proba = model.predict_proba(x_tune)
         weight, bias = _fold_scaler(model.coef_.T, model.intercept_, scaler)
-        train_prior = train[target].to_numpy().mean(axis=0)
+        train_prior = head_train[target].to_numpy().mean(axis=0)
         metrics = _multiclass_metrics(y_train, y_val, proba, train_prior)
+        _add_fit_filter_metrics(metrics, head_filter, head_fit_rows, len(train))
         return [HeadResult("direction", target_names[0], "softmax", weight, bias, metrics, proba, y_val)]
     if kind == "binary":
-        y_train = pd.to_numeric(train[target], errors="coerce").fillna(0).astype(int).to_numpy()
+        y_train = pd.to_numeric(head_train[target], errors="coerce").fillna(0).astype(int).to_numpy()
         y_val = pd.to_numeric(tune[target], errors="coerce").fillna(0).astype(int).to_numpy()
         if len(np.unique(y_train)) < 2:
             prevalence = float(np.clip(y_train.mean(), 1e-6, 1 - 1e-6))
@@ -206,7 +273,7 @@ def _fit_head(item, x_train, x_tune, train: pd.DataFrame, tune: pd.DataFrame, sc
             proba = np.full(len(y_val), prevalence, dtype=np.float32)
         else:
             model = LogisticRegression(max_iter=500)
-            model.fit(x_train, y_train)
+            model.fit(x_head_train, y_train)
             coef = model.coef_
             intercept = model.intercept_
             proba = model.predict_proba(x_tune)[:, 1]
@@ -214,20 +281,29 @@ def _fit_head(item, x_train, x_tune, train: pd.DataFrame, tune: pd.DataFrame, sc
         metrics = _binary_metrics(y_train, y_val, proba)
         if len(np.unique(y_val)) == 2:
             metrics["auc"] = float(roc_auc_score(y_val, proba))
+        _add_fit_filter_metrics(metrics, head_filter, head_fit_rows, len(train))
         return [HeadResult(fields[0], target_names[0], "sigmoid", weight, bias, metrics, proba.reshape(-1, 1), y_val)]
     if kind == "regression":
-        y_train = pd.to_numeric(train[target], errors="coerce").fillna(0.0).to_numpy()
+        y_train = pd.to_numeric(head_train[target], errors="coerce").fillna(0.0).to_numpy()
         y_val = pd.to_numeric(tune[target], errors="coerce").fillna(0.0).to_numpy()
         model = HuberRegressor(alpha=0.001, epsilon=1.35, max_iter=500)
-        model.fit(x_train, y_train)
+        model.fit(x_head_train, y_train)
         pred = model.predict(x_tune)
         weight, bias = _fold_scaler(model.coef_.reshape(1, -1).T, np.array([model.intercept_]), scaler)
         metrics = _regression_metrics(y_train, y_val, pred)
         metrics["target_source"] = target
+        _add_fit_filter_metrics(metrics, head_filter, head_fit_rows, len(train))
         return [HeadResult(fields[0], None, "identity", weight, bias, metrics, pred.reshape(-1, 1), y_val)]
     raise ValueError(f"unsupported head kind: {kind}")
 
 
+def _add_fit_filter_metrics(metrics: dict[str, Any], fit_filter: str, fit_rows: int, total_fit_rows: int) -> None:
+    metrics["fit_filter"] = fit_filter
+    metrics["fit_rows"] = int(fit_rows)
+    metrics["fit_total_rows"] = int(total_fit_rows)
+    metrics["fit_row_ratio"] = float(fit_rows / total_fit_rows) if total_fit_rows else 0.0
+
+
 def _fold_scaler(weight_scaled: np.ndarray, bias_scaled: np.ndarray, scaler: StandardScaler) -> tuple[np.ndarray, np.ndarray]:
     scale = np.where(scaler.scale_ == 0, 1.0, scaler.scale_)
     weight = weight_scaled / scale.reshape(-1, 1)