Use actual plan edge for Entry PM training

This commit is contained in:
Codex
2026-06-28 07:26:59 +08:00
parent 3c0f2d0d91
commit 2a86a6e2fa
6 changed files with 129 additions and 63 deletions
+76 -10
View File
@@ -1,6 +1,7 @@
from __future__ import annotations
import sys
import tempfile
import unittest
from pathlib import Path
@@ -12,7 +13,7 @@ if str(TRAINING_ROOT) not in sys.path:
sys.path.insert(0, str(TRAINING_ROOT))
from trader_training.labels import DEFAULT_LABEL_CONFIG, _path_stats_for_group
from trader_training.pm import _probability_implied_edge, _simulate_open_trades, _threshold_candidates, default_pm_config
from trader_training.pm import _pm_frame, _probability_implied_edge, _simulate_open_trades, _threshold_candidates, default_pm_config
class RiskPmFixTest(unittest.TestCase):
@@ -44,13 +45,13 @@ class RiskPmFixTest(unittest.TestCase):
self.assertEqual(80.0, DEFAULT_LABEL_CONFIG["risk"]["spike_bps"])
self.assertEqual(1.8, DEFAULT_LABEL_CONFIG["risk"]["vol_expansion_ratio"])
def test_pm_search_covers_low_entry_probability_without_allowing_negative_edge(self) -> None:
def test_pm_search_uses_strict_entry_probability_and_positive_edge(self) -> None:
candidates = _threshold_candidates()
self.assertTrue(candidates)
self.assertLessEqual(max(item["max_market_risk_prob"] for item in candidates), 0.98)
self.assertLessEqual(min(item["min_entry_prob"] for item in candidates), 0.03)
self.assertGreaterEqual(min(item["min_expected_edge_bps"] for item in candidates), 0.0)
self.assertLessEqual(max(item["max_market_risk_prob"] for item in candidates), 0.65)
self.assertGreaterEqual(min(item["min_entry_prob"] for item in candidates), 0.30)
self.assertGreaterEqual(min(item["min_expected_edge_bps"] for item in candidates), 3.0)
def test_probability_implied_edge_uses_price_plan_payoff(self) -> None:
edge = _probability_implied_edge(
@@ -61,6 +62,71 @@ class RiskPmFixTest(unittest.TestCase):
self.assertAlmostEqual(3.7, float(edge.iloc[0]), places=6)
self.assertAlmostEqual(52.5, float(edge.iloc[1]), places=6)
def test_pm_frame_reads_actual_plan_edge_not_old_opportunity_edge(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
(root / "model" / "direction").mkdir(parents=True)
(root / "model" / "entry").mkdir(parents=True)
(root / "model" / "risk").mkdir(parents=True)
(root / "dataset").mkdir(parents=True)
(root / "label").mkdir(parents=True)
common = {
"sample_id": ["s0"],
"symbol": ["BTC-USDT-PERP"],
"event_time": pd.to_datetime(["2026-01-01T00:00:00Z"]),
"split_id": ["tune_inner"],
}
pd.DataFrame({**common, "long_prob": [0.70], "short_prob": [0.10], "neutral_prob": [0.20]}).to_parquet(
root / "model" / "direction" / "tune_predictions.parquet",
index=False,
)
pd.DataFrame(
{
**common,
"long_entry_prob": [0.80],
"short_entry_prob": [0.20],
"long_expected_net_edge_bps": [12.0],
"short_expected_net_edge_bps": [1.0],
}
).to_parquet(root / "model" / "entry" / "tune_predictions.parquet", index=False)
pd.DataFrame(
{
**common,
"market_risk_prob": [0.20],
"long_position_risk_prob": [0.10],
"short_position_risk_prob": [0.10],
}
).to_parquet(root / "model" / "risk" / "tune_predictions.parquet", index=False)
pd.DataFrame(
{
"sample_id": ["s0"],
"long_entry_target": [0],
"short_entry_target": [0],
"long_expected_net_edge_bps": [99.0],
"short_expected_net_edge_bps": [88.0],
"long_actual_plan_net_edge_bps": [-6.5],
"short_actual_plan_net_edge_bps": [-6.5],
}
).to_parquet(root / "dataset" / "entry_train.parquet", index=False)
pd.DataFrame(
{
"sample_id": ["s0", "s0"],
"side": ["LONG", "SHORT"],
"gross_edge_bps": [0.0, 0.0],
"cost_bps": [6.5, 6.5],
"target_hit": [0, 0],
"stop_hit": [0, 0],
"time_to_target_ms": [-1, -1],
"time_to_stop_ms": [-1, -1],
"time_to_exit_ms": [2_700_000, 2_700_000],
}
).to_parquet(root / "label" / "entry_labels.parquet", index=False)
frame = _pm_frame(root, "tune_inner")
self.assertAlmostEqual(-6.5, float(frame.loc[0, "actual_long_plan_edge_bps"]))
self.assertAlmostEqual(-6.5, float(frame.loc[0, "actual_short_plan_edge_bps"]))
def test_pm_backtest_sizing_uses_position_manager_formula_not_fixed_floor(self) -> None:
frame = pd.DataFrame(
{
@@ -78,8 +144,8 @@ class RiskPmFixTest(unittest.TestCase):
"short_position_risk_prob": [0.10],
"pred_long_expected_net_edge_bps": [40.0],
"pred_short_expected_net_edge_bps": [1.0],
"actual_long_expected_net_edge_bps": [30.0],
"actual_short_expected_net_edge_bps": [-10.0],
"actual_long_plan_edge_bps": [30.0],
"actual_short_plan_edge_bps": [-10.0],
"long_trade_net_edge_bps": [11.0],
"short_trade_net_edge_bps": [-14.5],
"long_target_hit": [1],
@@ -112,7 +178,7 @@ class RiskPmFixTest(unittest.TestCase):
self.assertEqual(1, len(trades))
self.assertAlmostEqual(11.0, float(trades.iloc[0]["actual_edge_bps"]))
self.assertAlmostEqual(30.0, float(trades.iloc[0]["label_max_edge_bps"]))
self.assertAlmostEqual(30.0, float(trades.iloc[0]["label_actual_plan_edge_bps"]))
self.assertGreater(float(trades.iloc[0]["planned_ratio"]), 0.05)
self.assertLessEqual(float(trades.iloc[0]["planned_ratio"]), 0.20)
@@ -133,8 +199,8 @@ class RiskPmFixTest(unittest.TestCase):
"short_position_risk_prob": [0.10, 0.10],
"pred_long_expected_net_edge_bps": [40.0, 42.0],
"pred_short_expected_net_edge_bps": [1.0, 1.0],
"actual_long_expected_net_edge_bps": [30.0, 31.0],
"actual_short_expected_net_edge_bps": [-10.0, -10.0],
"actual_long_plan_edge_bps": [30.0, 31.0],
"actual_short_plan_edge_bps": [-10.0, -10.0],
"long_trade_net_edge_bps": [11.0, 12.0],
"short_trade_net_edge_bps": [-14.5, -14.5],
"long_target_hit": [1, 1],
+7
View File
@@ -22,6 +22,7 @@ from trader_training.ofi_feature_experiment import _load_entry_dataset, l1_snaps
from trader_training.promote import promote_artifact_bundle
from trader_training.replay import build_splits
from trader_training.schemas import FEATURE_ORDER, LATEST_STRESS_SPLIT, MODEL_OUTPUTS, OUTPUT_MAPPING, TRAINING_SPLITS, VALIDATION_LOCKED_SPLIT
from trader_training.training import TARGETS
class TrainingContractTest(unittest.TestCase):
@@ -49,6 +50,12 @@ class TrainingContractTest(unittest.TestCase):
self.assertEqual("long_actual_plan_net_edge_bps", _screen_edge_column(dataset, "LONG"))
self.assertEqual("short_actual_plan_net_edge_bps", _screen_edge_column(dataset, "SHORT"))
def test_entry_regression_heads_train_on_actual_plan_edge(self) -> None:
heads = {head[0]: head[2] for head in TARGETS["ENTRY"]["heads"]}
self.assertEqual("long_actual_plan_net_edge_bps", heads["long_expected_net_edge_bps"])
self.assertEqual("short_actual_plan_net_edge_bps", heads["short_expected_net_edge_bps"])
def test_entry_feature_screen_keeps_zero_inflated_event_features(self) -> None:
values = np.concatenate((np.zeros(5000), np.linspace(1.0, 100.0, 500)))
edges = _bucket_edges(values)
+13 -11
View File
@@ -50,13 +50,16 @@ def _label_summary(root) -> dict[str, Any]:
"future_return_bps_quantile": _quantiles(direction_split["future_return_bps"], (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99)),
}
if not entry_split.empty:
if "actual_plan_net_edge_bps" not in entry_split.columns:
raise ValueError("entry_labels is missing actual_plan_net_edge_bps for diagnostics")
grouped = entry_split.groupby("side", observed=False)
item["entry"] = {
"rows": len(entry_split),
"target_rate_by_side": grouped["entry_target"].mean().round(6).to_dict(),
"edge_mean_by_side": grouped["expected_net_edge_bps"].mean().round(6).to_dict(),
"edge_column": "actual_plan_net_edge_bps",
"edge_mean_by_side": grouped["actual_plan_net_edge_bps"].mean().round(6).to_dict(),
"edge_quantile_by_side": {
str(side): _quantiles(group["expected_net_edge_bps"], (0.05, 0.5, 0.95))
str(side): _quantiles(group["actual_plan_net_edge_bps"], (0.05, 0.5, 0.95))
for side, group in grouped
},
}
@@ -98,8 +101,8 @@ def _score_distribution(frame: pd.DataFrame) -> dict[str, dict[str, float]]:
"pred_short_expected_net_edge_bps",
"model_pred_long_expected_net_edge_bps",
"model_pred_short_expected_net_edge_bps",
"actual_long_expected_net_edge_bps",
"actual_short_expected_net_edge_bps",
"actual_long_plan_edge_bps",
"actual_short_plan_edge_bps",
]
return {column: _quantiles(frame[column], (0.0, 0.05, 0.5, 0.95, 1.0)) for column in columns if column in frame.columns}
@@ -141,11 +144,10 @@ def _cumulative_gate_counts(steps: dict[str, pd.Series], total_rows: int) -> dic
def _relaxed_variants(frame: pd.DataFrame) -> dict[str, Any]:
variants = {
"no_risk_no_edge": {"prob": 0.54, "entry": 0.50, "margin": 0.02, "risk": 1.0, "edge": -99.0},
"rare_entry_low_prob": {"prob": 0.50, "entry": 0.03, "margin": 0.02, "risk": 0.98, "edge": 0.0},
"entry_only_55": {"prob": 0.0, "entry": 0.55, "margin": -99.0, "risk": 1.0, "edge": -99.0},
"direction_only_54": {"prob": 0.54, "entry": 0.0, "margin": 0.02, "risk": 1.0, "edge": -99.0},
"very_loose": {"prob": 0.50, "entry": 0.45, "margin": 0.0, "risk": 1.0, "edge": -99.0},
"entry_30_positive_edge": {"prob": 0.50, "entry": 0.30, "margin": 0.02, "risk": 0.65, "edge": 3.0},
"entry_50_positive_edge": {"prob": 0.50, "entry": 0.50, "margin": 0.02, "risk": 0.65, "edge": 3.0},
"entry_70_positive_edge": {"prob": 0.50, "entry": 0.70, "margin": 0.02, "risk": 0.65, "edge": 3.0},
"direction_only_control": {"prob": 0.54, "entry": 0.0, "margin": 0.02, "risk": 1.0, "edge": -99.0},
}
result: dict[str, Any] = {}
for name, thresholds in variants.items():
@@ -200,8 +202,8 @@ def _top_bucket_edge(frame: pd.DataFrame) -> dict[str, Any]:
direction_top[str(fraction)] = _plain_trade_metrics(top.rename(columns={"actual_edge_bps": "actual_edge_bps"}))
return {
"direction_top_score": direction_top,
"long_entry_prob_deciles": _decile_edge(frame, "long_entry_prob", "actual_long_expected_net_edge_bps", "long_entry_target"),
"short_entry_prob_deciles": _decile_edge(frame, "short_entry_prob", "actual_short_expected_net_edge_bps", "short_entry_target"),
"long_entry_prob_deciles": _decile_edge(frame, "long_entry_prob", "actual_long_plan_edge_bps", "long_entry_target"),
"short_entry_prob_deciles": _decile_edge(frame, "short_entry_prob", "actual_short_plan_edge_bps", "short_entry_target"),
}
@@ -338,36 +338,21 @@ def _load_direction_dataset(baseline_root: Path, feature: pd.DataFrame) -> pd.Da
def _load_entry_dataset(baseline_root: Path, feature: pd.DataFrame) -> pd.DataFrame:
dataset_path = baseline_root / "dataset" / "entry_train.parquet"
if dataset_path.is_file():
labels = read_parquet(dataset_path)
required = {
"sample_id",
"long_entry_target",
"short_entry_target",
"long_actual_plan_net_edge_bps",
"short_actual_plan_net_edge_bps",
}
missing = sorted(required.difference(labels.columns))
if missing:
raise ValueError(f"entry_train dataset missing columns: {missing}")
dataset = feature.merge(labels[list(required)], on="sample_id", how="inner")
logging.info("trader.training.ofi_entry_dataset_loaded source=entry_train rowCount=%s", len(dataset))
return dataset
labels = read_parquet(baseline_root / "label" / "entry_labels.parquet")
required = {"sample_id", "side", "entry_target", "expected_net_edge_bps"}
if not dataset_path.is_file():
raise FileNotFoundError(f"entry_train dataset is required for OFI experiment: {dataset_path}")
labels = read_parquet(dataset_path)
required = {
"sample_id",
"long_entry_target",
"short_entry_target",
"long_actual_plan_net_edge_bps",
"short_actual_plan_net_edge_bps",
}
missing = sorted(required.difference(labels.columns))
if missing:
raise ValueError(f"entry labels missing columns: {missing}")
long = labels[labels["side"].eq("LONG")][["sample_id", "entry_target", "expected_net_edge_bps"]].rename(
columns={"entry_target": "long_entry_target", "expected_net_edge_bps": "long_expected_net_edge_bps"}
)
short = labels[labels["side"].eq("SHORT")][["sample_id", "entry_target", "expected_net_edge_bps"]].rename(
columns={"entry_target": "short_entry_target", "expected_net_edge_bps": "short_expected_net_edge_bps"}
)
pivot = long.merge(short, on="sample_id", how="inner")
dataset = feature.merge(pivot, on="sample_id", how="inner")
logging.info("trader.training.ofi_entry_dataset_loaded source=entry_labels_legacy rowCount=%s", len(dataset))
raise ValueError(f"entry_train dataset missing columns: {missing}")
dataset = feature.merge(labels[list(required)], on="sample_id", how="inner")
logging.info("trader.training.ofi_entry_dataset_loaded source=entry_train rowCount=%s", len(dataset))
return dataset
+13 -10
View File
@@ -232,8 +232,8 @@ def _pm_frame(root, split_id: str) -> pd.DataFrame:
price_plan = _price_plan_context(root)
entry_dataset = read_parquet(root / "dataset" / "entry_train.parquet").rename(
columns={
"long_expected_net_edge_bps": "actual_long_expected_net_edge_bps",
"short_expected_net_edge_bps": "actual_short_expected_net_edge_bps",
"long_actual_plan_net_edge_bps": "actual_long_plan_edge_bps",
"short_actual_plan_net_edge_bps": "actual_short_plan_edge_bps",
}
)
entry_plan_outcome = _entry_plan_outcome_frame(root)
@@ -245,7 +245,10 @@ def _pm_frame(root, split_id: str) -> pd.DataFrame:
"pred_short_expected_net_edge_bps",
]
risk_cols = ["sample_id", "market_risk_prob", "long_position_risk_prob", "short_position_risk_prob"]
actual_cols = ["sample_id", "actual_long_expected_net_edge_bps", "actual_short_expected_net_edge_bps", "long_entry_target", "short_entry_target"]
actual_cols = ["sample_id", "actual_long_plan_edge_bps", "actual_short_plan_edge_bps", "long_entry_target", "short_entry_target"]
missing_actual_cols = sorted(set(actual_cols) - set(entry_dataset.columns))
if missing_actual_cols:
raise ValueError(f"entry_train is missing actual plan edge columns for PM: {missing_actual_cols}")
frame = (
direction[["sample_id", "symbol", "event_time", "split_id", "long_prob", "short_prob", "neutral_prob"]]
.merge(entry[entry_cols], on="sample_id", how="inner")
@@ -257,7 +260,7 @@ def _pm_frame(root, split_id: str) -> pd.DataFrame:
raise ValueError(f"PM frame is empty for {split_id}; check model predictions and entry dataset")
frame["model_pred_long_expected_net_edge_bps"] = frame["pred_long_expected_net_edge_bps"]
frame["model_pred_short_expected_net_edge_bps"] = frame["pred_short_expected_net_edge_bps"]
edge_mode = "MODEL_EXPECTED_NET_EDGE"
edge_mode = "MODEL_ACTUAL_PLAN_EDGE"
if price_plan.get("entryTargetMethod") not in {"OPPORTUNITY_MFE_V1", "OPPORTUNITY_QUALITY_V1"}:
frame["pred_long_expected_net_edge_bps"] = _probability_implied_edge(frame["long_entry_prob"], price_plan)
frame["pred_short_expected_net_edge_bps"] = _probability_implied_edge(frame["short_entry_prob"], price_plan)
@@ -333,9 +336,9 @@ def _threshold_candidates() -> list[dict[str, float]]:
values = itertools.product(
[0.50, 0.60, 0.70, 1.01],
[0.50, 0.60, 0.70, 1.01],
[0.03, 0.50, 0.70, 0.85],
[0.45, 0.65, 0.85],
[0.0, 8.0, 15.0, 25.0],
[0.30, 0.50, 0.70, 0.85],
[0.45, 0.65],
[3.0, 8.0, 15.0, 25.0],
[0.02, 0.06, 0.10],
)
return [
@@ -394,7 +397,7 @@ def _simulate_open_trades(
trades["entry_prob"] = np.where(is_long, trades["long_entry_prob"], trades["short_entry_prob"])
trades["predicted_edge_bps"] = np.where(is_long, trades["pred_long_expected_net_edge_bps"], trades["pred_short_expected_net_edge_bps"])
trades["actual_edge_bps"] = np.where(is_long, trades["long_trade_net_edge_bps"], trades["short_trade_net_edge_bps"])
trades["label_max_edge_bps"] = np.where(is_long, trades["actual_long_expected_net_edge_bps"], trades["actual_short_expected_net_edge_bps"])
trades["label_actual_plan_edge_bps"] = np.where(is_long, trades["actual_long_plan_edge_bps"], trades["actual_short_plan_edge_bps"])
trades["entry_target"] = np.where(is_long, trades["long_entry_target"], trades["short_entry_target"])
effective_pm_config = pm_config or _pm_config_from_thresholds(thresholds)
effective_price_plan = price_plan or DEFAULT_BACKTEST_PRICE_PLAN
@@ -417,7 +420,7 @@ def _simulate_open_trades(
"entry_prob",
"market_risk_prob",
"predicted_edge_bps",
"label_max_edge_bps",
"label_actual_plan_edge_bps",
"actual_edge_bps",
"entry_target",
"time_to_exit_ms",
@@ -440,7 +443,7 @@ def _empty_trade_frame() -> pd.DataFrame:
"entry_prob",
"market_risk_prob",
"predicted_edge_bps",
"label_max_edge_bps",
"label_actual_plan_edge_bps",
"actual_edge_bps",
"entry_target",
"time_to_exit_ms",
+7 -4
View File
@@ -39,8 +39,8 @@ TARGETS = {
"heads": [
("long_entry_prob", "binary", "long_entry_target", ["long_entry_prob"], ["longEntryProb"]),
("short_entry_prob", "binary", "short_entry_target", ["short_entry_prob"], ["shortEntryProb"]),
("long_expected_net_edge_bps", "regression", "long_expected_net_edge_bps", ["long_expected_net_edge_bps"], [None]),
("short_expected_net_edge_bps", "regression", "short_expected_net_edge_bps", ["short_expected_net_edge_bps"], [None]),
("long_expected_net_edge_bps", "regression", "long_actual_plan_net_edge_bps", ["long_expected_net_edge_bps"], [None]),
("short_expected_net_edge_bps", "regression", "short_actual_plan_net_edge_bps", ["short_expected_net_edge_bps"], [None]),
],
},
"CONTINUE": {
@@ -119,11 +119,12 @@ def train_small_models(args: Any) -> None:
head_results.extend(_fit_head(item, x_train_scaled, x_tune_scaled, train, tune, scaler))
for result in head_results:
logging.info(
"trader.training.model_head_trained runId=%s model=%s head=%s kind=%s metrics=%s",
"trader.training.model_head_trained runId=%s model=%s head=%s kind=%s targetSource=%s metrics=%s",
args.run_id,
model_name,
result.field,
result.kind,
result.metrics.get("target_source"),
result.metrics,
)
for result in head_results:
@@ -221,7 +222,9 @@ def _fit_head(item, x_train, x_tune, train: pd.DataFrame, tune: pd.DataFrame, sc
model.fit(x_train, y_train)
pred = model.predict(x_tune)
weight, bias = _fold_scaler(model.coef_.reshape(1, -1).T, np.array([model.intercept_]), scaler)
return [HeadResult(fields[0], None, "identity", weight, bias, _regression_metrics(y_train, y_val, pred), pred.reshape(-1, 1), y_val)]
metrics = _regression_metrics(y_train, y_val, pred)
metrics["target_source"] = target
return [HeadResult(fields[0], None, "identity", weight, bias, metrics, pred.reshape(-1, 1), y_val)]
raise ValueError(f"unsupported head kind: {kind}")