Add Entry low-drawdown diagnostics

This commit is contained in:
Codex
2026-06-28 08:28:55 +08:00
parent 3f49af5ba6
commit 7268f640a6
3 changed files with 446 additions and 0 deletions
+47
View File
@@ -17,6 +17,7 @@ from trader_training.onnx_export import LinearHead, export_heads
from trader_training.dynamic_exit_search import search_dynamic_exit_plans
from trader_training.entry_condition_pair_screen import screen_entry_condition_pairs
from trader_training.entry_feature_screen import _bucket_edges, _screen_edge_column
from trader_training.entry_mae_label_diagnostic import diagnose_entry_mae_labels
from trader_training.io_utils import read_json, write_json
from trader_training.labels import ENTRY_LABEL_METHOD, _path_stats_for_group, build_entry_labels
from trader_training.ofi_feature_experiment import _load_entry_dataset, l1_snapshot_diff_ofi_quote
@@ -116,6 +117,52 @@ class TrainingContractTest(unittest.TestCase):
self.assertEqual("LONG", best["side"])
self.assertGreater(float(best["min_eval_edge_bps"]), 0.0)
def test_entry_mae_label_diagnostic_finds_low_drawdown_target(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
data_root = Path(tmp)
run_root = data_root / "trader-v4" / "runs" / "unit-mae-diagnostic"
dataset_path = run_root / "dataset" / "entry_train.parquet"
dataset_path.parent.mkdir(parents=True)
frames = []
row_count = 800
base_feature_values = np.linspace(0.0, 0.999, row_count)
for split_id in TRAINING_SPLITS:
frame = pd.DataFrame({feature: 0.0 for feature in FEATURE_ORDER}, index=np.arange(row_count))
frame["split_id"] = split_id
frame["ret_1m_bps"] = base_feature_values
good_mask = frame["ret_1m_bps"] > 0.85
frame["long_entry_target"] = good_mask.astype(int)
frame["short_entry_target"] = 0
frame["long_actual_plan_net_edge_bps"] = np.where(good_mask, 9.0, -6.0)
frame["short_actual_plan_net_edge_bps"] = -6.0
frame["long_max_achievable_net_edge_bps"] = np.where(good_mask, 18.0, 2.0)
frame["short_max_achievable_net_edge_bps"] = 2.0
frame["long_mae_bps"] = np.where(good_mask, 2.0, 15.0)
frame["short_mae_bps"] = 15.0
frames.append(frame)
pd.concat(frames, ignore_index=True).to_parquet(dataset_path, index=False)
diagnose_entry_mae_labels(
Namespace(
data_root=data_root,
run_id="unit-mae-diagnostic",
max_mae_bps=(4.0,),
min_opportunity_bps=(12.0,),
model_families=("linear",),
top_fraction=0.10,
max_train_rows=0,
)
)
result = read_json(run_root / "diagnostics" / "entry_mae_label_diagnostic_result.json")
candidates = pd.read_csv(run_root / "diagnostics" / "entry_mae_label_diagnostic_candidates.csv")
self.assertGreater(result["positive_top_edge_candidate_count"], 0)
best = candidates.iloc[0]
self.assertEqual("LONG", best["side"])
self.assertTrue(bool(best["stable_top_edge_positive"]))
def test_dynamic_exit_search_writes_plan_diagnostics(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
data_root = Path(tmp)