diff --git a/pom.xml b/pom.xml
index 814705a..ef9ecd9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,11 @@
com.fasterxml.jackson.datatype
jackson-datatype-jsr310
+
+ org.apache.commons
+ commons-csv
+ 1.10.0
+
org.springframework.boot
spring-boot-starter-flyway
diff --git a/src/main/java/com/quantai/trader/brain/PlaybookCandidateEngine.java b/src/main/java/com/quantai/trader/brain/PlaybookCandidateEngine.java
index e07b995..0cce383 100644
--- a/src/main/java/com/quantai/trader/brain/PlaybookCandidateEngine.java
+++ b/src/main/java/com/quantai/trader/brain/PlaybookCandidateEngine.java
@@ -46,7 +46,7 @@ public class PlaybookCandidateEngine {
Ids.candidateId(cycle, playbook.playbookId()),
playbook.playbookId(),
playbook.playbookVersion(),
- TraderSide.LONG,
+ requiredSide(snapshot.setupFeatures(), "side"),
playbook.variant(),
snapshot.snapshotTime(),
pricePlan,
@@ -55,6 +55,20 @@ public class PlaybookCandidateEngine {
));
}
+ private TraderSide requiredSide(Map map, String key) {
+ Object value = map.get(key);
+ if (value instanceof TraderSide side) {
+ return side;
+ }
+ if (value instanceof String text && !text.isBlank()) {
+ return TraderSide.valueOf(text.trim().toUpperCase());
+ }
+ throw new TraderException(
+ TraderErrorCode.TRADER_ENTRY_PLAN_INCOMPLETE,
+ "setup feature is required when setupPass=true: " + key
+ );
+ }
+
private BigDecimal requiredDecimal(Map map, String key) {
Object value = map.get(key);
if (value instanceof Number number) {
diff --git a/src/main/java/com/quantai/trader/brain/TraderDecisionCycleRunner.java b/src/main/java/com/quantai/trader/brain/TraderDecisionCycleRunner.java
index b300595..bb5dad9 100644
--- a/src/main/java/com/quantai/trader/brain/TraderDecisionCycleRunner.java
+++ b/src/main/java/com/quantai/trader/brain/TraderDecisionCycleRunner.java
@@ -79,14 +79,14 @@ public class TraderDecisionCycleRunner {
StageDecision context = contextGate.evaluate(snapshot);
evidenceAppender.append(cycle, "CONTEXT_GATE", context);
if (context.blocked()) {
- TraderTrainingSample sample = sampleExporter.export(cycle.withState(TraderState.BLOCKED, "BLOCKED", context.blocker()), null, null, null);
+ TraderTrainingSample sample = sampleExporter.export(cycle.withState(TraderState.BLOCKED, "BLOCKED", context.blocker()), snapshot, null, null, null);
return new TraderCycleResult(cycle, null, null, sample);
}
List candidates = playbookCandidateEngine.generate(snapshot, cycle);
if (candidates.isEmpty()) {
evidenceAppender.append(cycle, "PLAYBOOK_CANDIDATE", StageDecision.block("NO_PLAYBOOK_CANDIDATE", "NO_PLAYBOOK_CANDIDATE"));
- TraderTrainingSample sample = sampleExporter.export(cycle, null, null, null);
+ TraderTrainingSample sample = sampleExporter.export(cycle, snapshot, null, null, null);
return new TraderCycleResult(cycle, null, null, sample);
}
@@ -94,7 +94,7 @@ public class TraderDecisionCycleRunner {
var trigger = triggerMarkoutService.evaluate(snapshot, selected);
evidenceAppender.append(cycle, "TRIGGER_MARKOUT", new StageDecision(trigger.pass(), trigger.reason(), trigger.blocker(), trigger.details()));
if (trigger.blocked()) {
- TraderTrainingSample sample = sampleExporter.export(cycle.withState(TraderState.TRIGGER_WAIT, "WAIT", trigger.blocker()), selected, null, null);
+ TraderTrainingSample sample = sampleExporter.export(cycle.withState(TraderState.TRIGGER_WAIT, "WAIT", trigger.blocker()), snapshot, selected, null, null);
return new TraderCycleResult(cycle, null, null, sample);
}
@@ -105,7 +105,7 @@ public class TraderDecisionCycleRunner {
RiskDecision risk = riskGate.evaluate(entryCycle, entryPlan, execution);
evidenceAppender.append(entryCycle, "RISK_GATE", new StageDecision(risk.allowAction(), risk.allowAction() ? "RISK_PASS" : "RISK_BLOCKED", risk.blocker(), risk.details()));
if (execution.blocked() || risk.blocked()) {
- TraderTrainingSample sample = sampleExporter.export(entryCycle.withState(TraderState.BLOCKED, "BLOCKED", risk.blocker()), selected, null, null);
+ TraderTrainingSample sample = sampleExporter.export(entryCycle.withState(TraderState.BLOCKED, "BLOCKED", risk.blocker()), snapshot, selected, null, null);
return new TraderCycleResult(entryCycle, null, null, sample);
}
@@ -115,6 +115,7 @@ public class TraderDecisionCycleRunner {
TraderLifecycleResult lifecycle = runPositionLifecycle(entryCycle, selected, action, path, snapshot);
TraderTrainingSample sample = sampleExporter.export(
lifecycle.finalCycle(),
+ snapshot,
selected,
lifecycle.lastAction(),
lifecycle.finalPath()
diff --git a/src/main/java/com/quantai/trader/domain/TraderMarketSnapshot.java b/src/main/java/com/quantai/trader/domain/TraderMarketSnapshot.java
index 2ab4c81..294b935 100644
--- a/src/main/java/com/quantai/trader/domain/TraderMarketSnapshot.java
+++ b/src/main/java/com/quantai/trader/domain/TraderMarketSnapshot.java
@@ -14,7 +14,8 @@ public record TraderMarketSnapshot(
Map setupFeatures,
Map triggerFeatures,
Map executionFeatures,
- Map dataQuality
+ Map dataQuality,
+ Map labelInputs
) {
public TraderMarketSnapshot {
@@ -23,5 +24,6 @@ public record TraderMarketSnapshot(
triggerFeatures = Maps.immutable(triggerFeatures);
executionFeatures = Maps.immutable(executionFeatures);
dataQuality = Maps.immutable(dataQuality);
+ labelInputs = Maps.immutable(labelInputs);
}
}
diff --git a/src/main/java/com/quantai/trader/market/SnapshotBuilder.java b/src/main/java/com/quantai/trader/market/SnapshotBuilder.java
index ca3d0ff..22343cb 100644
--- a/src/main/java/com/quantai/trader/market/SnapshotBuilder.java
+++ b/src/main/java/com/quantai/trader/market/SnapshotBuilder.java
@@ -31,7 +31,8 @@ public class SnapshotBuilder {
Objects.requireNonNull(tick.setupFeatures(), "setupFeatures is required"),
Objects.requireNonNull(tick.triggerFeatures(), "triggerFeatures is required"),
Objects.requireNonNull(tick.executionFeatures(), "executionFeatures is required"),
- Objects.requireNonNull(tick.dataQuality(), "dataQuality is required")
+ Objects.requireNonNull(tick.dataQuality(), "dataQuality is required"),
+ Objects.requireNonNull(tick.labelInputs(), "labelInputs is required")
);
}
diff --git a/src/main/java/com/quantai/trader/replay/CryptoLakeReplayCsvMarketEventReader.java b/src/main/java/com/quantai/trader/replay/CryptoLakeReplayCsvMarketEventReader.java
new file mode 100644
index 0000000..0f1ff0d
--- /dev/null
+++ b/src/main/java/com/quantai/trader/replay/CryptoLakeReplayCsvMarketEventReader.java
@@ -0,0 +1,498 @@
+package com.quantai.trader.replay;
+
+import com.quantai.trader.domain.TraderException;
+import com.quantai.trader.enums.TraderErrorCode;
+import com.quantai.trader.enums.TraderSide;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+@Component
+public class CryptoLakeReplayCsvMarketEventReader implements ReplayMarketEventReader {
+
+ private static final Logger log = LoggerFactory.getLogger(CryptoLakeReplayCsvMarketEventReader.class);
+ private static final MathContext MC = new MathContext(16, RoundingMode.HALF_UP);
+ private static final String REPLAY_SOURCE_KEY = "cryptoLakeReplay1m";
+ private static final String CANDIDATE_SOURCE_KEY = "candidateEvents";
+ private static final BigDecimal LONG_INVALID_BPS = new BigDecimal("12.0");
+ private static final BigDecimal LONG_STOP_BPS = new BigDecimal("8.0");
+ private static final BigDecimal LONG_TARGET_BPS = new BigDecimal("30.0");
+ private static final BigDecimal SHORT_INVALID_BPS = new BigDecimal("12.0");
+ private static final BigDecimal SHORT_STOP_BPS = new BigDecimal("8.0");
+ private static final BigDecimal SHORT_TARGET_BPS = new BigDecimal("30.0");
+
+ @Override
+ public boolean supports(ReplayRunConfig config) {
+ DataSourceSpec source = config.dataSources() == null ? null : config.dataSources().get(REPLAY_SOURCE_KEY);
+ return source != null && source.path() != null && source.path().endsWith(".csv");
+ }
+
+ @Override
+ public void validateReadable(ReplayRunConfig config) {
+ validateSource(selectReplaySource(config), REPLAY_SOURCE_KEY);
+ DataSourceSpec candidateSource = config.dataSources().get(CANDIDATE_SOURCE_KEY);
+ if (candidateSource != null) {
+ validateSource(candidateSource, CANDIDATE_SOURCE_KEY);
+ }
+ }
+
+ @Override
+ public List readTicks(ReplayRunConfig config) {
+ validateReadable(config);
+ NavigableMap bars = readReplayBars(config);
+ List ticks = config.dataSources().containsKey(CANDIDATE_SOURCE_KEY)
+ ? readCandidateTicks(config, bars)
+ : readMarketAuditTicks(config, bars);
+ if (ticks.isEmpty()) {
+ throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "crypto lake replay csv produced no ticks");
+ }
+ log.info(
+ "event=trader.replay.crypto_lake_csv.loaded runId={} symbol={} tickCount={} candidateMode={}",
+ config.runId(),
+ config.symbol(),
+ ticks.size(),
+ config.dataSources().containsKey(CANDIDATE_SOURCE_KEY)
+ );
+ return ticks;
+ }
+
+ private NavigableMap readReplayBars(ReplayRunConfig config) {
+ Path path = Path.of(selectReplaySource(config).path());
+ NavigableMap bars = new TreeMap<>();
+ try (CSVParser parser = CSVParser.parse(path, java.nio.charset.StandardCharsets.UTF_8,
+ CSVFormat.DEFAULT.builder().setHeader().setSkipHeaderRecord(true).build())) {
+ for (CSVRecord record : parser) {
+ if (!config.symbol().equals(required(record, "symbol"))) {
+ continue;
+ }
+ if (!"1m".equals(required(record, "timeframe"))) {
+ continue;
+ }
+ MarketBar bar = marketBar(record);
+ bars.put(bar.openTime(), bar);
+ }
+ } catch (IOException ex) {
+ throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "failed to read crypto lake replay csv: " + ex.getMessage());
+ }
+ if (bars.isEmpty()) {
+ throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "crypto lake replay csv has no rows for symbol: " + config.symbol());
+ }
+ return bars;
+ }
+
+ private List readMarketAuditTicks(ReplayRunConfig config, NavigableMap bars) {
+ List ticks = new ArrayList<>();
+ List ordered = List.copyOf(bars.values());
+ for (int i = 0; i < ordered.size(); i++) {
+ MarketBar bar = ordered.get(i);
+ if (outsideRunWindow(config, bar.openTime())) {
+ continue;
+ }
+ ticks.add(toTick(config, bar, null, labelInputs(ordered, i, null)));
+ }
+ return ticks.stream()
+ .sorted(Comparator.comparing(ReplayClockTick::eventTime))
+ .toList();
+ }
+
+ private List readCandidateTicks(ReplayRunConfig config, NavigableMap bars) {
+ Path path = Path.of(config.dataSources().get(CANDIDATE_SOURCE_KEY).path());
+ List ordered = List.copyOf(bars.values());
+ Map indexByTime = new LinkedHashMap<>();
+ for (int i = 0; i < ordered.size(); i++) {
+ indexByTime.put(ordered.get(i).openTime(), i);
+ }
+ List ticks = new ArrayList<>();
+ try (CSVParser parser = CSVParser.parse(path, java.nio.charset.StandardCharsets.UTF_8,
+ CSVFormat.DEFAULT.builder().setHeader().setSkipHeaderRecord(true).build())) {
+ for (CSVRecord record : parser) {
+ if (!config.symbol().equals(required(record, "symbol"))) {
+ continue;
+ }
+ Instant candidateTime = Instant.ofEpochMilli(requiredLong(record, "bar_time"));
+ if (outsideRunWindow(config, candidateTime)) {
+ continue;
+ }
+ Map.Entry entry = bars.ceilingEntry(candidateTime);
+ if (entry == null || outsideRunWindow(config, entry.getKey())) {
+ continue;
+ }
+ int barIndex = indexByTime.get(entry.getKey());
+ CandidateEvent event = candidateEvent(record, candidateTime);
+ ticks.add(toTick(config, entry.getValue(), event, labelInputs(ordered, barIndex, event.side())));
+ }
+ } catch (IOException ex) {
+ throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "failed to read candidate events csv: " + ex.getMessage());
+ }
+ return ticks.stream()
+ .sorted(Comparator.comparing(ReplayClockTick::eventTime))
+ .toList();
+ }
+
+ private ReplayClockTick toTick(
+ ReplayRunConfig config,
+ MarketBar bar,
+ CandidateEvent candidate,
+ Map labelInputs
+ ) {
+ List missing = missingFeatures(bar);
+ Map context = new LinkedHashMap<>();
+ context.put("contextPass", missing.isEmpty());
+ context.put("replaySourceType", "CRYPTO_LAKE_1M_CSV");
+ putDecimal(context, "sourceCoverage", bar.sourceCoverage());
+ putDecimal(context, "fundingBps", bar.fundingBps());
+ putDecimal(context, "openInterest", bar.openInterest());
+ putDecimal(context, "volume", bar.volume());
+
+ Map setup = new LinkedHashMap<>();
+ setup.put("setupPass", candidate != null);
+ setup.put("setupName", candidate == null ? "market_audit_only" : "candidate_event_replay");
+ if (candidate != null) {
+ if (bar.close() == null) {
+ throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "candidate event matched a replay bar without close price");
+ }
+ setup.put("candidateEventId", candidate.eventId());
+ setup.put("signalType", candidate.signalType());
+ setup.put("side", candidate.side().name());
+ setup.put("sourceService", candidate.sourceService());
+ putDecimal(setup, "entryPrice", bar.close());
+ putDecimal(setup, "invalidPrice", priceByBps(bar.close(), invalidBps(candidate.side()), adverseSign(candidate.side())));
+ putDecimal(setup, "stopPrice", priceByBps(bar.close(), stopBps(candidate.side()), adverseSign(candidate.side())));
+ putDecimal(setup, "targetPrice", priceByBps(bar.close(), targetBps(candidate.side()), favorableSign(candidate.side())));
+ putDecimal(setup, "executionQualityScore", executionQualityScore(bar));
+ }
+
+ Map trigger = new LinkedHashMap<>();
+ if (candidate != null && candidate.triggerScore() != null) {
+ putDecimal(trigger, "triggerScore", candidate.triggerScore());
+ }
+ trigger.put("replayTriggerSource", candidate == null ? "NONE" : "CANDIDATE_EVENT");
+
+ Map execution = new LinkedHashMap<>();
+ putDecimal(execution, "lastPrice", bar.close());
+ putDecimal(execution, "bestBidPrice", bar.bestBidPrice());
+ putDecimal(execution, "bestAskPrice", bar.bestAskPrice());
+ putDecimal(execution, "observedSpreadBps", bar.observedSpreadBps());
+ putDecimal(execution, "expectedSlippageBps", bar.expectedSlippageBps());
+ putDecimal(execution, "p95LatencyMs", bar.p95LatencyMs());
+
+ Map dataQuality = new LinkedHashMap<>();
+ dataQuality.put("missing_features", missing);
+ putDecimal(dataQuality, "sourceCoverage", bar.sourceCoverage());
+ dataQuality.put("replaySourcePath", selectReplaySource(config).path());
+
+ return new ReplayClockTick(
+ config.runId(),
+ config.symbol(),
+ bar.openTime(),
+ context,
+ setup,
+ trigger,
+ execution,
+ dataQuality,
+ labelInputs
+ );
+ }
+
+ private Map labelInputs(List bars, int index, TraderSide side) {
+ Map labels = new LinkedHashMap<>();
+ labels.put("labelSource", "CRYPTO_LAKE_1M_REPLAY");
+ if (side == null) {
+ labels.put("labelStatus", "MARKET_AUDIT_NO_SIDE");
+ return labels;
+ }
+ MarketBar entry = bars.get(index);
+ labels.put("side", side.name());
+ putDecimal(labels, "entryPrice", entry.close());
+ putIfPresent(labels, "markoutBps1m", markout(bars, index, side, 1));
+ putIfPresent(labels, "markoutBps5m", markout(bars, index, side, 5));
+ putIfPresent(labels, "markoutBps15m", markout(bars, index, side, 15));
+ putIfPresent(labels, "mfeBps15m", mfe(bars, index, side, 15));
+ putIfPresent(labels, "maeBps15m", mae(bars, index, side, 15));
+ putIfPresent(labels, "targetBeforeStop15m", targetBeforeStop(bars, index, side, targetBps(side), stopBps(side), 15));
+ putDecimal(labels, "expectedSlippageBps", entry.expectedSlippageBps());
+ labels.put("labelStatus", hasMandatoryLabels(labels) ? "REPLAY_MARKOUT_LABELED" : "FUTURE_WINDOW_INCOMPLETE");
+ return labels;
+ }
+
+ private boolean hasMandatoryLabels(Map labels) {
+ return labels.containsKey("markoutBps1m")
+ && labels.containsKey("markoutBps5m")
+ && labels.containsKey("markoutBps15m");
+ }
+
+ private String markout(List bars, int index, TraderSide side, int minutes) {
+ if (index + minutes >= bars.size()) {
+ return null;
+ }
+ BigDecimal entry = bars.get(index).close();
+ BigDecimal close = bars.get(index + minutes).close();
+ return decimalText(sideReturnBps(side, entry, close));
+ }
+
+ private String mfe(List bars, int index, TraderSide side, int minutes) {
+ if (index + minutes >= bars.size()) {
+ return null;
+ }
+ BigDecimal entry = bars.get(index).close();
+ BigDecimal best = BigDecimal.ZERO;
+ for (int i = index + 1; i <= index + minutes; i++) {
+ BigDecimal favorable = side == TraderSide.LONG ? bars.get(i).high() : bars.get(i).low();
+ best = best.max(sideReturnBps(side, entry, favorable));
+ }
+ return decimalText(best.max(BigDecimal.ZERO));
+ }
+
+ private String mae(List bars, int index, TraderSide side, int minutes) {
+ if (index + minutes >= bars.size()) {
+ return null;
+ }
+ BigDecimal entry = bars.get(index).close();
+ BigDecimal worst = BigDecimal.ZERO;
+ for (int i = index + 1; i <= index + minutes; i++) {
+ BigDecimal adverse = side == TraderSide.LONG ? bars.get(i).low() : bars.get(i).high();
+ BigDecimal signed = sideReturnBps(side, entry, adverse);
+ if (signed.compareTo(BigDecimal.ZERO) < 0) {
+ worst = worst.max(signed.abs());
+ }
+ }
+ return decimalText(worst);
+ }
+
+ private Boolean targetBeforeStop(List bars, int index, TraderSide side, BigDecimal targetBps, BigDecimal stopBps, int minutes) {
+ if (index + minutes >= bars.size()) {
+ return null;
+ }
+ BigDecimal entry = bars.get(index).close();
+ BigDecimal target = priceByBps(entry, targetBps, favorableSign(side));
+ BigDecimal stop = priceByBps(entry, stopBps, adverseSign(side));
+ for (int i = index + 1; i <= index + minutes; i++) {
+ MarketBar bar = bars.get(i);
+ boolean targetHit = side == TraderSide.LONG
+ ? bar.high().compareTo(target) >= 0
+ : bar.low().compareTo(target) <= 0;
+ boolean stopHit = side == TraderSide.LONG
+ ? bar.low().compareTo(stop) <= 0
+ : bar.high().compareTo(stop) >= 0;
+ if (targetHit) {
+ return true;
+ }
+ if (stopHit) {
+ return false;
+ }
+ }
+ return false;
+ }
+
+ private MarketBar marketBar(CSVRecord record) {
+ return new MarketBar(
+ Instant.parse(required(record, "open_time")),
+ decimal(record, "open"),
+ decimal(record, "high"),
+ decimal(record, "low"),
+ decimal(record, "close"),
+ decimal(record, "volume"),
+ decimal(record, "taker_buy_volume"),
+ decimal(record, "funding_bps"),
+ decimal(record, "open_interest"),
+ decimal(record, "best_bid_price"),
+ decimal(record, "best_ask_price"),
+ decimal(record, "observed_spread_bps"),
+ decimal(record, "expected_slippage_bps"),
+ decimal(record, "p95_latency_ms"),
+ decimal(record, "source_coverage")
+ );
+ }
+
+ private CandidateEvent candidateEvent(CSVRecord record, Instant candidateTime) {
+ String side = required(record, "direction").toUpperCase();
+ return new CandidateEvent(
+ required(record, "event_id"),
+ candidateTime,
+ required(record, "signal_type"),
+ TraderSide.valueOf(side),
+ required(record, "source_service"),
+ firstDecimal(record, "old_fusion_score", "legacy_fusion_score")
+ );
+ }
+
+ private List missingFeatures(MarketBar bar) {
+ List missing = new ArrayList<>();
+ requirePresent(missing, "open", bar.open());
+ requirePresent(missing, "high", bar.high());
+ requirePresent(missing, "low", bar.low());
+ requirePresent(missing, "close", bar.close());
+ requirePresent(missing, "taker_buy_volume", bar.takerBuyVolume());
+ requirePresent(missing, "expected_slippage_bps", bar.expectedSlippageBps());
+ requirePresent(missing, "source_coverage", bar.sourceCoverage());
+ return missing;
+ }
+
+ private void requirePresent(List missing, String field, BigDecimal value) {
+ if (value == null) {
+ missing.add(field);
+ }
+ }
+
+ private DataSourceSpec selectReplaySource(ReplayRunConfig config) {
+ DataSourceSpec source = config.dataSources().get(REPLAY_SOURCE_KEY);
+ if (source == null) {
+ throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "dataSources.cryptoLakeReplay1m is required");
+ }
+ return source;
+ }
+
+ private void validateSource(DataSourceSpec source, String sourceType) {
+ if (source.path() == null || source.path().isBlank()) {
+ throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "data source path is required: " + sourceType);
+ }
+ Path path = Path.of(source.path());
+ if (!Files.isRegularFile(path) || !Files.isReadable(path)) {
+ throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "data source is not readable: " + source.path());
+ }
+ }
+
+ private boolean outsideRunWindow(ReplayRunConfig config, Instant time) {
+ return time.isBefore(config.from()) || !time.isBefore(config.to());
+ }
+
+ private String required(CSVRecord record, String column) {
+ String value = record.get(column);
+ if (value == null || value.isBlank()) {
+ throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "csv column is required: " + column);
+ }
+ return value;
+ }
+
+ private long requiredLong(CSVRecord record, String column) {
+ return Long.parseLong(required(record, column));
+ }
+
+ private BigDecimal firstDecimal(CSVRecord record, String... columns) {
+ for (String column : columns) {
+ if (!record.isMapped(column)) {
+ continue;
+ }
+ BigDecimal value = decimal(record, column);
+ if (value != null) {
+ return value;
+ }
+ }
+ return null;
+ }
+
+ private BigDecimal decimal(CSVRecord record, String column) {
+ if (!record.isMapped(column)) {
+ return null;
+ }
+ String value = record.get(column);
+ if (value == null || value.isBlank()) {
+ return null;
+ }
+ return new BigDecimal(value);
+ }
+
+ private String decimalText(BigDecimal value) {
+ return value == null ? null : value.stripTrailingZeros().toPlainString();
+ }
+
+ private void putDecimal(Map target, String key, BigDecimal value) {
+ String text = decimalText(value);
+ if (text != null) {
+ target.put(key, text);
+ }
+ }
+
+ private void putIfPresent(Map target, String key, Object value) {
+ if (value != null) {
+ target.put(key, value);
+ }
+ }
+
+ private BigDecimal executionQualityScore(MarketBar bar) {
+ if (bar.expectedSlippageBps() == null) {
+ return null;
+ }
+ BigDecimal score = BigDecimal.ONE.subtract(bar.expectedSlippageBps().divide(new BigDecimal("20.0"), MC), MC);
+ return score.max(new BigDecimal("0.20")).min(BigDecimal.ONE).setScale(8, RoundingMode.HALF_UP);
+ }
+
+ private BigDecimal sideReturnBps(TraderSide side, BigDecimal entry, BigDecimal exit) {
+ BigDecimal gross = exit.subtract(entry, MC)
+ .divide(entry, MC)
+ .multiply(new BigDecimal("10000"), MC);
+ return side == TraderSide.LONG ? gross : gross.negate();
+ }
+
+ private BigDecimal priceByBps(BigDecimal entry, BigDecimal bps, int sign) {
+ BigDecimal multiplier = BigDecimal.ONE.add(BigDecimal.valueOf(sign).multiply(bps, MC).divide(new BigDecimal("10000"), MC), MC);
+ return entry.multiply(multiplier, MC).setScale(8, RoundingMode.HALF_UP);
+ }
+
+ private int favorableSign(TraderSide side) {
+ return side == TraderSide.LONG ? 1 : -1;
+ }
+
+ private int adverseSign(TraderSide side) {
+ return side == TraderSide.LONG ? -1 : 1;
+ }
+
+ private BigDecimal invalidBps(TraderSide side) {
+ return side == TraderSide.LONG ? LONG_INVALID_BPS : SHORT_INVALID_BPS;
+ }
+
+ private BigDecimal stopBps(TraderSide side) {
+ return side == TraderSide.LONG ? LONG_STOP_BPS : SHORT_STOP_BPS;
+ }
+
+ private BigDecimal targetBps(TraderSide side) {
+ return side == TraderSide.LONG ? LONG_TARGET_BPS : SHORT_TARGET_BPS;
+ }
+
+ private record MarketBar(
+ Instant openTime,
+ BigDecimal open,
+ BigDecimal high,
+ BigDecimal low,
+ BigDecimal close,
+ BigDecimal volume,
+ BigDecimal takerBuyVolume,
+ BigDecimal fundingBps,
+ BigDecimal openInterest,
+ BigDecimal bestBidPrice,
+ BigDecimal bestAskPrice,
+ BigDecimal observedSpreadBps,
+ BigDecimal expectedSlippageBps,
+ BigDecimal p95LatencyMs,
+ BigDecimal sourceCoverage
+ ) {
+ }
+
+ private record CandidateEvent(
+ String eventId,
+ Instant barTime,
+ String signalType,
+ TraderSide side,
+ String sourceService,
+ BigDecimal triggerScore
+ ) {
+ }
+}
diff --git a/src/main/java/com/quantai/trader/replay/JsonlReplayMarketEventReader.java b/src/main/java/com/quantai/trader/replay/JsonlReplayMarketEventReader.java
index a09a348..3f79494 100644
--- a/src/main/java/com/quantai/trader/replay/JsonlReplayMarketEventReader.java
+++ b/src/main/java/com/quantai/trader/replay/JsonlReplayMarketEventReader.java
@@ -22,6 +22,12 @@ public class JsonlReplayMarketEventReader implements ReplayMarketEventReader {
this.objectMapper = new ObjectMapper().findAndRegisterModules();
}
+ @Override
+ public boolean supports(ReplayRunConfig config) {
+ DataSourceSpec source = config.dataSources() == null ? null : config.dataSources().get("ticks");
+ return source != null && source.path() != null && source.path().endsWith(".jsonl");
+ }
+
@Override
public void validateReadable(ReplayRunConfig config) {
DataSourceSpec source = selectReplaySource(config);
@@ -76,7 +82,8 @@ public class JsonlReplayMarketEventReader implements ReplayMarketEventReader {
fixture.setupFeatures(),
fixture.triggerFeatures(),
fixture.executionFeatures(),
- fixture.dataQuality()
+ fixture.dataQuality(),
+ fixture.labelInputs() == null ? Map.of() : fixture.labelInputs()
);
} catch (IOException ex) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "invalid replay tick json: " + ex.getMessage());
@@ -100,7 +107,8 @@ public class JsonlReplayMarketEventReader implements ReplayMarketEventReader {
Map setupFeatures,
Map triggerFeatures,
Map executionFeatures,
- Map dataQuality
+ Map dataQuality,
+ Map labelInputs
) {
}
}
diff --git a/src/main/java/com/quantai/trader/replay/ReplayClockTick.java b/src/main/java/com/quantai/trader/replay/ReplayClockTick.java
index 7bcd2c9..229d5f9 100644
--- a/src/main/java/com/quantai/trader/replay/ReplayClockTick.java
+++ b/src/main/java/com/quantai/trader/replay/ReplayClockTick.java
@@ -1,6 +1,7 @@
package com.quantai.trader.replay;
import java.time.Instant;
+import java.util.LinkedHashMap;
import java.util.Map;
public record ReplayClockTick(
@@ -11,6 +12,23 @@ public record ReplayClockTick(
Map setupFeatures,
Map triggerFeatures,
Map executionFeatures,
- Map dataQuality
+ Map dataQuality,
+ Map labelInputs
) {
+
+ public ReplayClockTick {
+ contextFeatures = immutable(contextFeatures);
+ setupFeatures = immutable(setupFeatures);
+ triggerFeatures = immutable(triggerFeatures);
+ executionFeatures = immutable(executionFeatures);
+ dataQuality = immutable(dataQuality);
+ labelInputs = immutable(labelInputs);
+ }
+
+ private static Map immutable(Map value) {
+ if (value == null || value.isEmpty()) {
+ return Map.of();
+ }
+ return Map.copyOf(new LinkedHashMap<>(value));
+ }
}
diff --git a/src/main/java/com/quantai/trader/replay/ReplayMarketEventReader.java b/src/main/java/com/quantai/trader/replay/ReplayMarketEventReader.java
index 8c317a9..39aaea8 100644
--- a/src/main/java/com/quantai/trader/replay/ReplayMarketEventReader.java
+++ b/src/main/java/com/quantai/trader/replay/ReplayMarketEventReader.java
@@ -4,6 +4,8 @@ import java.util.List;
public interface ReplayMarketEventReader {
+ boolean supports(ReplayRunConfig config);
+
void validateReadable(ReplayRunConfig config);
List readTicks(ReplayRunConfig config);
diff --git a/src/main/java/com/quantai/trader/replay/ReplayRunService.java b/src/main/java/com/quantai/trader/replay/ReplayRunService.java
index ffc2c33..2d7db82 100644
--- a/src/main/java/com/quantai/trader/replay/ReplayRunService.java
+++ b/src/main/java/com/quantai/trader/replay/ReplayRunService.java
@@ -32,7 +32,7 @@ public class ReplayRunService {
private final TraderPlaybookCatalog catalog;
private final ReplayRunRepository repository;
private final ReplayReportWriter reportWriter;
- private final ReplayMarketEventReader eventReader;
+ private final List eventReaders;
private final TraderDecisionCycleRunner cycleRunner;
private final ExecutorService executorService = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = new Thread(runnable, "trader-replay-worker");
@@ -44,13 +44,13 @@ public class ReplayRunService {
TraderPlaybookCatalog catalog,
ReplayRunRepository repository,
ReplayReportWriter reportWriter,
- ReplayMarketEventReader eventReader,
+ List eventReaders,
TraderDecisionCycleRunner cycleRunner
) {
this.catalog = catalog;
this.repository = repository;
this.reportWriter = reportWriter;
- this.eventReader = eventReader;
+ this.eventReaders = List.copyOf(eventReaders);
this.cycleRunner = cycleRunner;
}
@@ -58,7 +58,7 @@ public class ReplayRunService {
validateRequest(request);
TraderPlaybookDefinitionSnapshot playbook = catalog.require(request.playbookId(), request.playbookVersion());
request.dataSources().forEach((sourceType, spec) -> validateDataSource(request, sourceType, spec));
- eventReader.validateReadable(request);
+ readerFor(request).validateReadable(request);
String runId = Ids.runId(Instant.now());
ReplayRunConfig config = request.withRunId(runId);
@@ -114,7 +114,7 @@ public class ReplayRunService {
playbook.playbookVersion(),
ReplayRunStatus.RUNNING
);
- List ticks = eventReader.readTicks(run.config());
+ List ticks = readerFor(run.config()).readTicks(run.config());
List results = new ArrayList<>(ticks.size());
TraderRuntimeState runtimeState = new TraderRuntimeState(
run.runId(),
@@ -186,6 +186,16 @@ public class ReplayRunService {
.orElseThrow(() -> new IllegalStateException("replay run disappeared: " + runId));
}
+ private ReplayMarketEventReader readerFor(ReplayRunConfig config) {
+ return eventReaders.stream()
+ .filter(reader -> reader.supports(config))
+ .findFirst()
+ .orElseThrow(() -> new TraderException(
+ TraderErrorCode.TRADER_DATA_SOURCE_MISSING,
+ "no replay reader supports the requested dataSources"
+ ));
+ }
+
private void validateDataSource(ReplayRunConfig request, String sourceType, DataSourceSpec spec) {
if (spec.timezone() == null || spec.timezone().isBlank()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "data source timezone is required: " + sourceType);
diff --git a/src/main/java/com/quantai/trader/report/ReplayReportWriter.java b/src/main/java/com/quantai/trader/report/ReplayReportWriter.java
index 8f552d8..92aacb3 100644
--- a/src/main/java/com/quantai/trader/report/ReplayReportWriter.java
+++ b/src/main/java/com/quantai/trader/report/ReplayReportWriter.java
@@ -2,6 +2,7 @@ package com.quantai.trader.report;
import com.quantai.trader.domain.TraderReplayReport;
import com.quantai.trader.brain.TraderCycleResult;
+import com.quantai.trader.domain.TraderTrainingSample;
import com.quantai.trader.persistence.ReplayReportRepository;
import com.quantai.trader.playbook.TraderPlaybookDefinitionSnapshot;
import com.quantai.trader.replay.ReplayRunConfig;
@@ -11,8 +12,10 @@ import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
@Component
public class ReplayReportWriter {
@@ -30,13 +33,25 @@ public class ReplayReportWriter {
) {
int actionCount = (int) results.stream().filter(result -> result.action() != null).count();
int sampleCount = (int) results.stream().filter(result -> result.sample() != null).count();
+ SampleAudit audit = audit(results);
int monthsCovered = Math.max(1, (int) ChronoUnit.MONTHS.between(
config.from().atZone(java.time.ZoneOffset.UTC).withDayOfMonth(1),
config.to().atZone(java.time.ZoneOffset.UTC).withDayOfMonth(1)
));
- List failureRisks = actionCount == 0
- ? List.of("no_action_generated", "proxy_only_execution")
- : List.of("proxy_only_execution");
+ List failureRisks = failureRisks(actionCount, audit);
+ Map auditReport = new LinkedHashMap<>();
+ auditReport.put("replayEngine", replayEngine(config));
+ auditReport.put("tickCount", results.size());
+ auditReport.put("sampleCount", sampleCount);
+ auditReport.put("actionCount", actionCount);
+ auditReport.put("labeledSampleCount", audit.labeledSampleCount());
+ auditReport.put("proxyOnlySampleCount", audit.proxyOnlySampleCount());
+ auditReport.put("positiveNetReturnCount", audit.positiveNetReturnCount());
+ auditReport.put("negativeNetReturnCount", audit.negativeNetReturnCount());
+ auditReport.put("missingNetReturnCount", audit.missingNetReturnCount());
+ putIfPresent(auditReport, "meanNetReturnBps1x", audit.meanNetReturnBps1x());
+ putIfPresent(auditReport, "meanNetReturnBps10x", audit.meanNetReturnBps10x());
+ auditReport.put("labelStatusDistribution", audit.labelStatusDistribution());
TraderReplayReport report = new TraderReplayReport(
config.runId(),
Ids.reportId(config.runId()),
@@ -45,21 +60,94 @@ public class ReplayReportWriter {
playbook.playbookVersion(),
actionCount,
monthsCovered,
- BigDecimal.ZERO,
- BigDecimal.ZERO,
- BigDecimal.ZERO,
- Map.of(
- "p0ReplayEngine", "jsonl_fixture",
- "tickCount", results.size(),
- "sampleCount", sampleCount,
- "actionCount", actionCount
- ),
+ audit.meanNetReturnBps1x(),
+ audit.meanNetReturnBps10x(),
+ null,
+ auditReport,
failureRisks,
- "P0_OBSERVE_ONLY",
+ audit.labeledSampleCount() > 0 ? "TRAINING_SAMPLE_AUDIT_ONLY" : "P0_OBSERVE_ONLY",
null,
Instant.now()
);
repository.insert(report);
return report;
}
+
+ private String replayEngine(ReplayRunConfig config) {
+ if (config.dataSources().containsKey("cryptoLakeReplay1m")) {
+ return "crypto_lake_1m_csv";
+ }
+ return "jsonl_fixture";
+ }
+
+ private List failureRisks(int actionCount, SampleAudit audit) {
+ java.util.ArrayList risks = new java.util.ArrayList<>();
+ if (actionCount == 0) {
+ risks.add("no_action_generated");
+ }
+ if (audit.labeledSampleCount() == 0) {
+ risks.add("no_replay_markout_labels");
+ }
+ if (audit.proxyOnlySampleCount() > 0) {
+ risks.add("proxy_only_samples_present");
+ }
+ if (audit.missingNetReturnCount() > 0) {
+ risks.add("missing_net_return_labels");
+ }
+ return risks;
+ }
+
+ private SampleAudit audit(List results) {
+ List samples = results.stream()
+ .map(TraderCycleResult::sample)
+ .filter(Objects::nonNull)
+ .toList();
+ int proxyOnly = (int) samples.stream().filter(TraderTrainingSample::proxyOnly).count();
+ int labeled = samples.size() - proxyOnly;
+ int missingNet = (int) samples.stream().filter(sample -> sample.netReturnBps1x() == null).count();
+ int positive = (int) samples.stream()
+ .filter(sample -> sample.netReturnBps1x() != null && sample.netReturnBps1x().compareTo(BigDecimal.ZERO) > 0)
+ .count();
+ int negative = (int) samples.stream()
+ .filter(sample -> sample.netReturnBps1x() != null && sample.netReturnBps1x().compareTo(BigDecimal.ZERO) < 0)
+ .count();
+ BigDecimal mean1x = mean(samples.stream()
+ .map(TraderTrainingSample::netReturnBps1x)
+ .filter(Objects::nonNull)
+ .toList());
+ BigDecimal mean10x = mean(samples.stream()
+ .map(TraderTrainingSample::netReturnBps10x)
+ .filter(Objects::nonNull)
+ .toList());
+ Map labelStatuses = samples.stream()
+ .map(sample -> String.valueOf(sample.labels().getOrDefault("label_status", "UNKNOWN")))
+ .collect(java.util.stream.Collectors.groupingBy(status -> status, LinkedHashMap::new, java.util.stream.Collectors.counting()));
+ return new SampleAudit(labeled, proxyOnly, positive, negative, missingNet, mean1x, mean10x, labelStatuses);
+ }
+
+ private BigDecimal mean(List values) {
+ if (values.isEmpty()) {
+ return null;
+ }
+ BigDecimal sum = values.stream().reduce(BigDecimal.ZERO, BigDecimal::add);
+ return sum.divide(BigDecimal.valueOf(values.size()), 8, java.math.RoundingMode.HALF_UP);
+ }
+
+ private void putIfPresent(Map target, String key, Object value) {
+ if (value != null) {
+ target.put(key, value);
+ }
+ }
+
+ private record SampleAudit(
+ int labeledSampleCount,
+ int proxyOnlySampleCount,
+ int positiveNetReturnCount,
+ int negativeNetReturnCount,
+ int missingNetReturnCount,
+ BigDecimal meanNetReturnBps1x,
+ BigDecimal meanNetReturnBps10x,
+ Map labelStatusDistribution
+ ) {
+ }
}
diff --git a/src/main/java/com/quantai/trader/sample/TrainingLabelSet.java b/src/main/java/com/quantai/trader/sample/TrainingLabelSet.java
new file mode 100644
index 0000000..9ed4aa5
--- /dev/null
+++ b/src/main/java/com/quantai/trader/sample/TrainingLabelSet.java
@@ -0,0 +1,12 @@
+package com.quantai.trader.sample;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+public record TrainingLabelSet(
+ Map labels,
+ BigDecimal netReturnBps1x,
+ BigDecimal netReturnBps10x,
+ boolean proxyOnly
+) {
+}
diff --git a/src/main/java/com/quantai/trader/sample/TrainingSampleExporter.java b/src/main/java/com/quantai/trader/sample/TrainingSampleExporter.java
index 016f8cc..ef0b39b 100644
--- a/src/main/java/com/quantai/trader/sample/TrainingSampleExporter.java
+++ b/src/main/java/com/quantai/trader/sample/TrainingSampleExporter.java
@@ -4,6 +4,7 @@ import com.quantai.trader.config.TraderProperties;
import com.quantai.trader.domain.PlaybookCandidate;
import com.quantai.trader.domain.TraderAction;
import com.quantai.trader.domain.TraderDecisionCycle;
+import com.quantai.trader.domain.TraderMarketSnapshot;
import com.quantai.trader.domain.TraderPositionPath;
import com.quantai.trader.domain.TraderTrainingSample;
import com.quantai.trader.persistence.TraderSampleRepository;
@@ -13,6 +14,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
+import java.util.LinkedHashMap;
import java.util.Map;
@Component
@@ -21,31 +23,23 @@ public class TrainingSampleExporter {
private static final Logger log = LoggerFactory.getLogger(TrainingSampleExporter.class);
private final TraderProperties properties;
private final TraderSampleRepository repository;
+ private final TriggerMarkoutLabeler labeler;
- public TrainingSampleExporter(TraderProperties properties, TraderSampleRepository repository) {
+ public TrainingSampleExporter(TraderProperties properties, TraderSampleRepository repository, TriggerMarkoutLabeler labeler) {
this.properties = properties;
this.repository = repository;
+ this.labeler = labeler;
}
public TraderTrainingSample export(
TraderDecisionCycle cycle,
+ TraderMarketSnapshot snapshot,
PlaybookCandidate candidate,
TraderAction action,
TraderPositionPath path
) {
- Map features = Map.of(
- "playbookId", candidate == null ? cycle.playbookId() : candidate.playbookId(),
- "playbookVersion", candidate == null ? cycle.playbookVersion() : candidate.playbookVersion(),
- "state", cycle.state().name(),
- "actionType", action == null ? "NONE" : action.actionType().name(),
- "proxyOnly", true
- );
- Map labels = Map.of(
- "trigger_acceptance", action != null,
- "target_before_stop", path != null && path.targetBeforeStop(),
- "stagnation_timeout_hit", path != null && path.stagnationTimeoutHit(),
- "best_counterfactual_action", action == null ? "WAIT" : action.actionType().name()
- );
+ TrainingLabelSet labelSet = labeler.label(snapshot, candidate, action, path);
+ Map features = features(cycle, snapshot, candidate, action);
TraderTrainingSample sample = new TraderTrainingSample(
cycle.runId(),
cycle.cycleId(),
@@ -56,13 +50,13 @@ public class TrainingSampleExporter {
properties.getLabelVersion(),
cycle.cycleTime(),
features,
- labels,
- BigDecimal.ZERO,
- BigDecimal.ZERO,
- true
+ labelSet.labels(),
+ labelSet.netReturnBps1x(),
+ labelSet.netReturnBps10x(),
+ labelSet.proxyOnly()
);
log.info(
- "event=trader.sample.export_start runId={} cycleId={} symbol={} playbookId={} playbookVersion={} state={} actionId={} positionId={} sampleId={} proxyOnly=true",
+ "event=trader.sample.export_start runId={} cycleId={} symbol={} playbookId={} playbookVersion={} state={} actionId={} positionId={} sampleId={} proxyOnly={} labelStatus={}",
cycle.runId(),
cycle.cycleId(),
cycle.symbol(),
@@ -71,11 +65,13 @@ public class TrainingSampleExporter {
cycle.state(),
sample.actionId(),
sample.positionId(),
- sample.sampleId()
+ sample.sampleId(),
+ sample.proxyOnly(),
+ sample.labels().get("label_status")
);
repository.insert(sample);
log.info(
- "event=trader.sample.exported runId={} cycleId={} symbol={} playbookId={} playbookVersion={} state={} actionId={} positionId={} sampleId={} proxyOnly=true",
+ "event=trader.sample.exported runId={} cycleId={} symbol={} playbookId={} playbookVersion={} state={} actionId={} positionId={} sampleId={} proxyOnly={} netReturnBps1x={}",
cycle.runId(),
cycle.cycleId(),
cycle.symbol(),
@@ -84,8 +80,35 @@ public class TrainingSampleExporter {
cycle.state(),
sample.actionId(),
sample.positionId(),
- sample.sampleId()
+ sample.sampleId(),
+ sample.proxyOnly(),
+ sample.netReturnBps1x()
);
return sample;
}
+
+ private Map features(
+ TraderDecisionCycle cycle,
+ TraderMarketSnapshot snapshot,
+ PlaybookCandidate candidate,
+ TraderAction action
+ ) {
+ Map features = new LinkedHashMap<>();
+ features.put("playbookId", candidate == null ? cycle.playbookId() : candidate.playbookId());
+ features.put("playbookVersion", candidate == null ? cycle.playbookVersion() : candidate.playbookVersion());
+ features.put("state", cycle.state().name());
+ features.put("actionType", action == null ? "NONE" : action.actionType().name());
+ if (candidate != null) {
+ features.put("candidateSide", candidate.side().name());
+ features.put("candidateVariant", candidate.variant());
+ }
+ if (snapshot != null) {
+ features.put("context", snapshot.contextFeatures());
+ features.put("setup", snapshot.setupFeatures());
+ features.put("trigger", snapshot.triggerFeatures());
+ features.put("execution", snapshot.executionFeatures());
+ features.put("dataQuality", snapshot.dataQuality());
+ }
+ return features;
+ }
}
diff --git a/src/main/java/com/quantai/trader/sample/TriggerMarkoutLabeler.java b/src/main/java/com/quantai/trader/sample/TriggerMarkoutLabeler.java
new file mode 100644
index 0000000..e4d1d02
--- /dev/null
+++ b/src/main/java/com/quantai/trader/sample/TriggerMarkoutLabeler.java
@@ -0,0 +1,90 @@
+package com.quantai.trader.sample;
+
+import com.quantai.trader.domain.PlaybookCandidate;
+import com.quantai.trader.domain.TraderAction;
+import com.quantai.trader.domain.TraderMarketSnapshot;
+import com.quantai.trader.domain.TraderPositionPath;
+import org.springframework.stereotype.Component;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+@Component
+public class TriggerMarkoutLabeler {
+
+ private static final MathContext MC = new MathContext(16, RoundingMode.HALF_UP);
+ private static final BigDecimal TAKER_FEE_ROUND_TRIP_BPS = new BigDecimal("8.0");
+
+ public TrainingLabelSet label(
+ TraderMarketSnapshot snapshot,
+ PlaybookCandidate candidate,
+ TraderAction action,
+ TraderPositionPath path
+ ) {
+ Map labels = new LinkedHashMap<>();
+ labels.put("label_family", "TRIGGER_MARKOUT");
+ labels.put("trigger_acceptance", action != null);
+ labels.put("target_before_stop", path != null && path.targetBeforeStop());
+ labels.put("stagnation_timeout_hit", path != null && path.stagnationTimeoutHit());
+ labels.put("action_type", action == null ? "NONE" : action.actionType().name());
+ if (candidate != null) {
+ labels.put("candidate_side", candidate.side().name());
+ }
+
+ if (snapshot == null || snapshot.labelInputs().isEmpty()) {
+ labels.put("label_status", "PROXY_ONLY_NO_REPLAY_LABEL");
+ labels.put("best_counterfactual_action", action == null ? "WAIT" : action.actionType().name());
+ return new TrainingLabelSet(labels, null, null, true);
+ }
+
+ Map labelInputs = snapshot.labelInputs();
+ labelInputs.forEach((key, value) -> labels.put("replay_" + key, value));
+ String labelStatus = String.valueOf(labelInputs.getOrDefault("labelStatus", "UNKNOWN"));
+ labels.put("label_status", labelStatus);
+ labels.put("best_counterfactual_action", counterfactualAction(labelInputs));
+
+ BigDecimal netReturn1x = netReturn1x(labelInputs);
+ BigDecimal netReturn10x = netReturn1x == null
+ ? null
+ : netReturn1x.multiply(BigDecimal.TEN, MC).setScale(8, RoundingMode.HALF_UP);
+ boolean proxyOnly = !"REPLAY_MARKOUT_LABELED".equals(labelStatus);
+ return new TrainingLabelSet(labels, netReturn1x, netReturn10x, proxyOnly);
+ }
+
+ private String counterfactualAction(Map labelInputs) {
+ BigDecimal netReturn = netReturn1x(labelInputs);
+ if (netReturn == null) {
+ return "WAIT";
+ }
+ return netReturn.compareTo(BigDecimal.ZERO) > 0 ? "OPEN_INITIAL" : "WAIT";
+ }
+
+ private BigDecimal netReturn1x(Map labelInputs) {
+ BigDecimal markout15m = decimal(labelInputs.get("markoutBps15m"));
+ BigDecimal expectedSlippage = decimal(labelInputs.get("expectedSlippageBps"));
+ if (markout15m == null || expectedSlippage == null) {
+ return null;
+ }
+ // TriggerMarkout is a market-path label. Round-trip taker fee and
+ // level_1 expected slippage keep the label cost-aware without pretending
+ // we have real App fill feedback.
+ BigDecimal executionCost = TAKER_FEE_ROUND_TRIP_BPS.add(expectedSlippage.multiply(BigDecimal.valueOf(2), MC), MC);
+ return markout15m.subtract(executionCost, MC).setScale(8, RoundingMode.HALF_UP);
+ }
+
+ private BigDecimal decimal(Object value) {
+ if (value instanceof BigDecimal decimal) {
+ return decimal;
+ }
+ if (value instanceof Number number) {
+ return BigDecimal.valueOf(number.doubleValue());
+ }
+ if (value instanceof String text && !text.isBlank()) {
+ return new BigDecimal(text);
+ }
+ return null;
+ }
+}
diff --git a/src/test/java/com/quantai/trader/TestFixtures.java b/src/test/java/com/quantai/trader/TestFixtures.java
index 643c452..4c10918 100644
--- a/src/test/java/com/quantai/trader/TestFixtures.java
+++ b/src/test/java/com/quantai/trader/TestFixtures.java
@@ -5,6 +5,7 @@ import com.quantai.trader.domain.PlaybookCandidate;
import com.quantai.trader.domain.TraderAction;
import com.quantai.trader.domain.TraderDecisionCycle;
import com.quantai.trader.domain.TraderEntryPlan;
+import com.quantai.trader.domain.TraderMarketSnapshot;
import com.quantai.trader.domain.TraderPositionPath;
import com.quantai.trader.domain.TraderPricePlan;
import com.quantai.trader.domain.TriggerDecision;
@@ -15,6 +16,7 @@ import com.quantai.trader.enums.TraderState;
import java.math.BigDecimal;
import java.time.Instant;
+import java.util.List;
import java.util.Map;
public final class TestFixtures {
@@ -72,6 +74,35 @@ public final class TestFixtures {
);
}
+ public static TraderMarketSnapshot labeledSnapshot() {
+ return new TraderMarketSnapshot(
+ "trader_run_test",
+ "trader_cycle_test",
+ "trader_snapshot_test",
+ "BTCUSDT",
+ NOW,
+ "trader_feature_v0",
+ Map.of("contextPass", true),
+ Map.of("setupPass", true, "side", "LONG"),
+ Map.of("triggerScore", "0.95"),
+ Map.of("lastPrice", "65010"),
+ Map.of("missing_features", List.of()),
+ Map.ofEntries(
+ Map.entry("labelSource", "CRYPTO_LAKE_1M_REPLAY"),
+ Map.entry("labelStatus", "REPLAY_MARKOUT_LABELED"),
+ Map.entry("side", "LONG"),
+ Map.entry("entryPrice", "65000"),
+ Map.entry("markoutBps1m", "5"),
+ Map.entry("markoutBps5m", "12"),
+ Map.entry("markoutBps15m", "24"),
+ Map.entry("mfeBps15m", "30"),
+ Map.entry("maeBps15m", "6"),
+ Map.entry("targetBeforeStop15m", true),
+ Map.entry("expectedSlippageBps", "1")
+ )
+ );
+ }
+
public static TriggerDecision strongTrigger() {
return new TriggerDecision(true, new BigDecimal("0.95"), "TRIGGER_ACCEPTED", null, Map.of());
}
diff --git a/src/test/java/com/quantai/trader/brain/TraderDecisionCycleRunnerTest.java b/src/test/java/com/quantai/trader/brain/TraderDecisionCycleRunnerTest.java
index f9c25c4..9d751ca 100644
--- a/src/test/java/com/quantai/trader/brain/TraderDecisionCycleRunnerTest.java
+++ b/src/test/java/com/quantai/trader/brain/TraderDecisionCycleRunnerTest.java
@@ -12,6 +12,7 @@ import org.springframework.boot.test.context.SpringBootTest;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Map;
+import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -24,13 +25,15 @@ class TraderDecisionCycleRunnerTest {
@Test
void exportsSampleForHappyPathReplayTick() {
+ String runId = "trader_run_runner_" + UUID.randomUUID().toString().replace("-", "").substring(0, 12);
ReplayClockTick tick = new ReplayClockTick(
- "trader_run_runner",
+ runId,
"BTCUSDT",
Instant.parse("2026-06-23T12:00:00Z"),
Map.of("contextPass", true),
Map.of(
"setupPass", true,
+ "side", "LONG",
"entryPrice", new BigDecimal("65000"),
"invalidPrice", new BigDecimal("64800"),
"stopPrice", new BigDecimal("64920"),
@@ -39,13 +42,14 @@ class TraderDecisionCycleRunnerTest {
),
Map.of("triggerScore", "0.95"),
Map.of("lastPrice", "65010"),
+ Map.of(),
Map.of()
);
TraderCycleResult result = runner.runReplayTick(
tick,
new TraderRuntimeState(
- "trader_run_runner",
+ runId,
TraderRunMode.REPLAY,
"BREAKOUT_RETEST_CONTINUATION",
"2026-06-22.p0"
@@ -64,9 +68,10 @@ class TraderDecisionCycleRunnerTest {
"BTCUSDT",
Instant.parse("2026-06-23T12:00:00Z"),
Map.of("contextPass", true),
- Map.of("setupPass", true),
+ Map.of("setupPass", true, "side", "LONG"),
Map.of("triggerScore", "0.95"),
Map.of("lastPrice", "65010"),
+ Map.of(),
Map.of()
);
diff --git a/src/test/java/com/quantai/trader/controller/TraderControllerTest.java b/src/test/java/com/quantai/trader/controller/TraderControllerTest.java
index d57363b..83164b4 100644
--- a/src/test/java/com/quantai/trader/controller/TraderControllerTest.java
+++ b/src/test/java/com/quantai/trader/controller/TraderControllerTest.java
@@ -88,7 +88,7 @@ class TraderControllerTest {
mockMvc.perform(get("/api/trader/replay/runs/{runId}/report", runId))
.andExpect(status().isOk())
.andExpect(jsonPath("$.candidateEvents").value(1))
- .andExpect(jsonPath("$.strictVsLoose.p0ReplayEngine").value("jsonl_fixture"));
+ .andExpect(jsonPath("$.strictVsLoose.replayEngine").value("jsonl_fixture"));
}
@Test
diff --git a/src/test/java/com/quantai/trader/persistence/MybatisReplayPersistenceTest.java b/src/test/java/com/quantai/trader/persistence/MybatisReplayPersistenceTest.java
index 320dbe2..ee0988d 100644
--- a/src/test/java/com/quantai/trader/persistence/MybatisReplayPersistenceTest.java
+++ b/src/test/java/com/quantai/trader/persistence/MybatisReplayPersistenceTest.java
@@ -47,7 +47,7 @@ class MybatisReplayPersistenceTest {
assertThat(run.status()).isEqualTo(ReplayRunStatus.COMPLETED);
assertThat(reportRepository.findByRunId(run.runId())).isPresent()
.get()
- .extracting(report -> report.strictVsLoose().get("p0ReplayEngine"))
+ .extracting(report -> report.strictVsLoose().get("replayEngine"))
.isEqualTo("jsonl_fixture");
var samples = sampleRepository.findByRunId(run.runId());
diff --git a/src/test/java/com/quantai/trader/position/TraderPositionManagerTest.java b/src/test/java/com/quantai/trader/position/TraderPositionManagerTest.java
index 3e08e47..6974aa1 100644
--- a/src/test/java/com/quantai/trader/position/TraderPositionManagerTest.java
+++ b/src/test/java/com/quantai/trader/position/TraderPositionManagerTest.java
@@ -56,6 +56,7 @@ class TraderPositionManagerTest {
Map.of(),
Map.of(),
Map.of("lastPrice", "65010"),
+ Map.of(),
Map.of()
);
}
diff --git a/src/test/java/com/quantai/trader/replay/CryptoLakeReplayCsvMarketEventReaderTest.java b/src/test/java/com/quantai/trader/replay/CryptoLakeReplayCsvMarketEventReaderTest.java
new file mode 100644
index 0000000..cc79423
--- /dev/null
+++ b/src/test/java/com/quantai/trader/replay/CryptoLakeReplayCsvMarketEventReaderTest.java
@@ -0,0 +1,76 @@
+package com.quantai.trader.replay;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.core.io.ClassPathResource;
+
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class CryptoLakeReplayCsvMarketEventReaderTest {
+
+ private final CryptoLakeReplayCsvMarketEventReader reader = new CryptoLakeReplayCsvMarketEventReader();
+
+ @Test
+ void readsCandidateTicksAndRecomputesReplayMarkoutLabels() throws Exception {
+ ReplayRunConfig config = config();
+
+ var ticks = reader.readTicks(config);
+
+ assertThat(ticks).hasSize(2);
+ assertThat(ticks.getFirst().setupFeatures())
+ .containsEntry("setupPass", true)
+ .containsEntry("side", "LONG")
+ .containsEntry("entryPrice", "100");
+ assertThat(ticks.getFirst().labelInputs())
+ .containsEntry("labelStatus", "REPLAY_MARKOUT_LABELED")
+ .containsEntry("side", "LONG")
+ .containsEntry("markoutBps15m", "200");
+ assertThat(ticks.get(1).setupFeatures())
+ .containsEntry("side", "SHORT");
+ assertThat(ticks.get(1).labelInputs())
+ .containsEntry("labelStatus", "REPLAY_MARKOUT_LABELED")
+ .containsEntry("side", "SHORT");
+ }
+
+ private ReplayRunConfig config() throws Exception {
+ Path replay = Path.of(new ClassPathResource("replay-fixtures/crypto-lake-replay-mini.csv").getFile().toURI());
+ Path candidates = Path.of(new ClassPathResource("replay-fixtures/crypto-lake-candidate-events-mini.csv").getFile().toURI());
+ return new ReplayRunConfig(
+ "trader_run_csv_test",
+ "BTCUSDT",
+ "BREAKOUT_RETEST_CONTINUATION",
+ "2026-06-22.p0",
+ Instant.parse("2026-01-01T00:00:00Z"),
+ Instant.parse("2026-01-01T00:05:00Z"),
+ "trader_feature_v0",
+ "trader_label_v0",
+ Map.of(
+ "cryptoLakeReplay1m", new DataSourceSpec(
+ "crypto-lake-mini",
+ replay.toString(),
+ "fixture-hash-not-used",
+ null,
+ 18L,
+ null,
+ null,
+ "UTC",
+ Map.of()
+ ),
+ "candidateEvents", new DataSourceSpec(
+ "candidate-events-mini",
+ candidates.toString(),
+ "fixture-hash-not-used",
+ null,
+ 2L,
+ null,
+ null,
+ "UTC",
+ Map.of()
+ )
+ )
+ );
+ }
+}
diff --git a/src/test/java/com/quantai/trader/replay/TraderReplayFixtureAcceptanceTest.java b/src/test/java/com/quantai/trader/replay/TraderReplayFixtureAcceptanceTest.java
index 49227b7..39353c4 100644
--- a/src/test/java/com/quantai/trader/replay/TraderReplayFixtureAcceptanceTest.java
+++ b/src/test/java/com/quantai/trader/replay/TraderReplayFixtureAcceptanceTest.java
@@ -44,15 +44,18 @@ class TraderReplayFixtureAcceptanceTest {
assertThat(run.status()).as(fixture.fileName()).isEqualTo(ReplayRunStatus.COMPLETED);
TraderReplayReport report = reportRepository.findByRunId(run.runId()).orElseThrow();
assertThat(report.strictVsLoose())
- .containsEntry("p0ReplayEngine", "jsonl_fixture")
+ .containsEntry("replayEngine", "jsonl_fixture")
.containsEntry("tickCount", 1)
.containsEntry("sampleCount", 1)
- .containsEntry("actionCount", fixture.expectedActionCount());
+ .containsEntry("actionCount", fixture.expectedActionCount())
+ .containsEntry("labeledSampleCount", 0)
+ .containsEntry("proxyOnlySampleCount", 1);
assertThat(report.candidateEvents()).isEqualTo(fixture.expectedActionCount());
var samples = sampleRepository.findByRunId(run.runId());
assertThat(samples).hasSize(1);
assertThat(samples.getFirst().features()).containsEntry("actionType", fixture.expectedSampleActionType());
+ assertThat(samples.getFirst().labels()).containsEntry("label_status", "PROXY_ONLY_NO_REPLAY_LABEL");
}
}
diff --git a/src/test/java/com/quantai/trader/sample/TrainingSampleExporterTest.java b/src/test/java/com/quantai/trader/sample/TrainingSampleExporterTest.java
index 1c317ae..dc766dc 100644
--- a/src/test/java/com/quantai/trader/sample/TrainingSampleExporterTest.java
+++ b/src/test/java/com/quantai/trader/sample/TrainingSampleExporterTest.java
@@ -13,20 +13,23 @@ import static org.assertj.core.api.Assertions.assertThat;
class TrainingSampleExporterTest {
@Test
- void exportsProxyOnlySampleWithFeatureAndLabelVersions() {
+ void exportsReplayMarkoutSampleWithFeatureAndLabelVersions() {
CapturingSampleRepository repository = new CapturingSampleRepository();
- TrainingSampleExporter exporter = new TrainingSampleExporter(TestFixtures.properties(), repository);
+ TrainingSampleExporter exporter = new TrainingSampleExporter(TestFixtures.properties(), repository, new TriggerMarkoutLabeler());
var sample = exporter.export(
TestFixtures.cycle(com.quantai.trader.enums.TraderState.SAMPLE_EXPORTED),
+ TestFixtures.labeledSnapshot(),
TestFixtures.candidate(),
TestFixtures.action(),
TestFixtures.openedPath(false)
);
- assertThat(sample.proxyOnly()).isTrue();
+ assertThat(sample.proxyOnly()).isFalse();
assertThat(sample.featureVersion()).isEqualTo("trader_feature_v0");
assertThat(sample.labelVersion()).isEqualTo("trader_label_v0");
+ assertThat(sample.netReturnBps1x()).isEqualByComparingTo("14.00000000");
+ assertThat(sample.labels()).containsEntry("label_status", "REPLAY_MARKOUT_LABELED");
assertThat(repository.findByRunId("trader_run_test")).hasSize(1);
}
diff --git a/src/test/resources/replay-fixtures/crypto-lake-candidate-events-mini.csv b/src/test/resources/replay-fixtures/crypto-lake-candidate-events-mini.csv
new file mode 100644
index 0000000..10b4740
--- /dev/null
+++ b/src/test/resources/replay-fixtures/crypto-lake-candidate-events-mini.csv
@@ -0,0 +1,3 @@
+event_id,bar_time,signal_type,direction,source_service,symbol,old_fusion_score
+mini-long-1,1767225600000,BREAKOUT_RETEST_CONTINUATION,LONG,TEST_CANDIDATE_EVENT,BTCUSDT,0.95
+mini-short-1,1767225720000,BREAKOUT_RETEST_CONTINUATION,SHORT,TEST_CANDIDATE_EVENT,BTCUSDT,0.90
diff --git a/src/test/resources/replay-fixtures/crypto-lake-replay-mini.csv b/src/test/resources/replay-fixtures/crypto-lake-replay-mini.csv
new file mode 100644
index 0000000..ba26988
--- /dev/null
+++ b/src/test/resources/replay-fixtures/crypto-lake-replay-mini.csv
@@ -0,0 +1,19 @@
+symbol,timeframe,open_time,open,high,low,close,volume,taker_buy_volume,funding_bps,open_interest,best_bid_price,best_ask_price,observed_spread_bps,observed_slippage_bps,expected_slippage_bps,p95_latency_ms,source_coverage
+BTCUSDT,1m,2026-01-01T00:00:00Z,100.0,100.4,99.8,100.0,10,5,0.1,1000,99.99,100.01,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:01:00Z,100.0,100.7,99.9,100.4,11,6,0.1,1001,100.39,100.41,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:02:00Z,100.4,100.6,100.0,100.2,12,7,0.1,1002,100.19,100.21,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:03:00Z,100.2,100.8,100.1,100.5,13,8,0.1,1003,100.49,100.51,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:04:00Z,100.5,101.0,100.2,100.8,14,9,0.1,1004,100.79,100.81,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:05:00Z,100.8,101.3,100.5,101.0,15,10,0.1,1005,100.99,101.01,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:06:00Z,101.0,101.6,100.7,101.2,16,11,0.1,1006,101.19,101.21,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:07:00Z,101.2,101.7,100.8,101.3,17,12,0.1,1007,101.29,101.31,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:08:00Z,101.3,101.8,100.9,101.4,18,13,0.1,1008,101.39,101.41,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:09:00Z,101.4,101.9,101.0,101.5,19,14,0.1,1009,101.49,101.51,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:10:00Z,101.5,102.0,101.1,101.6,20,15,0.1,1010,101.59,101.61,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:11:00Z,101.6,102.1,101.2,101.7,21,16,0.1,1011,101.69,101.71,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:12:00Z,101.7,102.2,101.3,101.8,22,17,0.1,1012,101.79,101.81,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:13:00Z,101.8,102.3,101.4,101.9,23,18,0.1,1013,101.89,101.91,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:14:00Z,101.9,102.4,101.5,102.0,24,19,0.1,1014,101.99,102.01,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:15:00Z,102.0,102.5,101.6,102.0,25,20,0.1,1015,101.99,102.01,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:16:00Z,102.0,102.1,99.5,99.8,26,6,0.1,1016,99.79,99.81,2,1,1,30,1
+BTCUSDT,1m,2026-01-01T00:17:00Z,99.8,100.0,98.8,99.0,27,5,0.1,1017,98.99,99.01,2,1,1,30,1
diff --git a/src/test/resources/replay-fixtures/false-breakout-trigger-wait.jsonl b/src/test/resources/replay-fixtures/false-breakout-trigger-wait.jsonl
index 7121d40..08286d8 100644
--- a/src/test/resources/replay-fixtures/false-breakout-trigger-wait.jsonl
+++ b/src/test/resources/replay-fixtures/false-breakout-trigger-wait.jsonl
@@ -1 +1 @@
-{"eventTime":"2026-01-01T03:00:00Z","contextFeatures":{"contextPass":true,"trendRegime":"UP","marketStructure":"weak_breakout_retest","timeframeAlignment":"1h_up_4h_flat"},"setupFeatures":{"setupPass":true,"setupName":"breakout_retest_continuation","entryPrice":"65120","invalidPrice":"64980","stopPrice":"65040","targetPrice":"65450","executionQualityScore":"0.76","volumeImpulse":"0.88","retestHold":true},"triggerFeatures":{"triggerScore":"0.32","triggerName":"false_breakout_probe","breakoutFollowThrough":false},"executionFeatures":{"lastPrice":"65105","spreadBps":"1.4","bookImbalance":"0.49"},"dataQuality":{"missing_features":[]}}
+{"eventTime":"2026-01-01T03:00:00Z","contextFeatures":{"contextPass":true,"trendRegime":"UP","marketStructure":"weak_breakout_retest","timeframeAlignment":"1h_up_4h_flat"},"setupFeatures":{"setupPass":true,"setupName":"breakout_retest_continuation","side":"LONG","entryPrice":"65120","invalidPrice":"64980","stopPrice":"65040","targetPrice":"65450","executionQualityScore":"0.76","volumeImpulse":"0.88","retestHold":true},"triggerFeatures":{"triggerScore":"0.32","triggerName":"false_breakout_probe","breakoutFollowThrough":false},"executionFeatures":{"lastPrice":"65105","spreadBps":"1.4","bookImbalance":"0.49"},"dataQuality":{"missing_features":[]}}
diff --git a/src/test/resources/replay-fixtures/incomplete-entry-plan-hard-fail.jsonl b/src/test/resources/replay-fixtures/incomplete-entry-plan-hard-fail.jsonl
index ef14beb..a6ba6a5 100644
--- a/src/test/resources/replay-fixtures/incomplete-entry-plan-hard-fail.jsonl
+++ b/src/test/resources/replay-fixtures/incomplete-entry-plan-hard-fail.jsonl
@@ -1 +1 @@
-{"eventTime":"2026-01-01T05:00:00Z","contextFeatures":{"contextPass":true,"trendRegime":"UP","marketStructure":"breakout_retest"},"setupFeatures":{"setupPass":true,"setupName":"breakout_retest_continuation","executionQualityScore":"0.90"},"triggerFeatures":{"triggerScore":"0.95","triggerName":"micro_continuation"},"executionFeatures":{"lastPrice":"65300"},"dataQuality":{"missing_features":[]}}
+{"eventTime":"2026-01-01T05:00:00Z","contextFeatures":{"contextPass":true,"trendRegime":"UP","marketStructure":"breakout_retest"},"setupFeatures":{"setupPass":true,"setupName":"breakout_retest_continuation","side":"LONG","executionQualityScore":"0.90"},"triggerFeatures":{"triggerScore":"0.95","triggerName":"micro_continuation"},"executionFeatures":{"lastPrice":"65300"},"dataQuality":{"missing_features":[]}}
diff --git a/src/test/resources/replay-fixtures/missing-features-data-quality.jsonl b/src/test/resources/replay-fixtures/missing-features-data-quality.jsonl
index 6b02b7d..060ee44 100644
--- a/src/test/resources/replay-fixtures/missing-features-data-quality.jsonl
+++ b/src/test/resources/replay-fixtures/missing-features-data-quality.jsonl
@@ -1 +1 @@
-{"eventTime":"2026-01-01T04:00:00Z","contextFeatures":{"contextPass":true,"trendRegime":"UP","marketStructure":"breakout_retest"},"setupFeatures":{"setupPass":true,"setupName":"breakout_retest_continuation","entryPrice":"65200","invalidPrice":"65020","stopPrice":"65100","targetPrice":"65580","executionQualityScore":"0.84"},"triggerFeatures":{"triggerScore":"0.91","triggerName":"micro_continuation"},"executionFeatures":{"lastPrice":"65210"},"dataQuality":{"missing_features":["level_1.best_bid","level_1.best_ask"]}}
+{"eventTime":"2026-01-01T04:00:00Z","contextFeatures":{"contextPass":true,"trendRegime":"UP","marketStructure":"breakout_retest"},"setupFeatures":{"setupPass":true,"setupName":"breakout_retest_continuation","side":"LONG","entryPrice":"65200","invalidPrice":"65020","stopPrice":"65100","targetPrice":"65580","executionQualityScore":"0.84"},"triggerFeatures":{"triggerScore":"0.91","triggerName":"micro_continuation"},"executionFeatures":{"lastPrice":"65210"},"dataQuality":{"missing_features":["level_1.best_bid","level_1.best_ask"]}}
diff --git a/src/test/resources/replay-fixtures/trend-up-breakout-happy.jsonl b/src/test/resources/replay-fixtures/trend-up-breakout-happy.jsonl
index 91e4369..444da3e 100644
--- a/src/test/resources/replay-fixtures/trend-up-breakout-happy.jsonl
+++ b/src/test/resources/replay-fixtures/trend-up-breakout-happy.jsonl
@@ -1 +1 @@
-{"eventTime":"2026-01-01T00:00:00Z","contextFeatures":{"contextPass":true,"trendRegime":"UP","marketStructure":"breakout_retest","timeframeAlignment":"1h_4h_up"},"setupFeatures":{"setupPass":true,"setupName":"breakout_retest_continuation","entryPrice":"65000","invalidPrice":"64800","stopPrice":"64920","targetPrice":"65350","executionQualityScore":"0.92","volumeImpulse":"1.48","retestHold":true},"triggerFeatures":{"triggerScore":"0.95","triggerName":"micro_continuation","breakoutFollowThrough":true},"executionFeatures":{"lastPrice":"65010","spreadBps":"0.8","bookImbalance":"0.57"},"dataQuality":{"missing_features":[]}}
+{"eventTime":"2026-01-01T00:00:00Z","contextFeatures":{"contextPass":true,"trendRegime":"UP","marketStructure":"breakout_retest","timeframeAlignment":"1h_4h_up"},"setupFeatures":{"setupPass":true,"setupName":"breakout_retest_continuation","side":"LONG","entryPrice":"65000","invalidPrice":"64800","stopPrice":"64920","targetPrice":"65350","executionQualityScore":"0.92","volumeImpulse":"1.48","retestHold":true},"triggerFeatures":{"triggerScore":"0.95","triggerName":"micro_continuation","breakoutFollowThrough":true},"executionFeatures":{"lastPrice":"65010","spreadBps":"0.8","bookImbalance":"0.57"},"dataQuality":{"missing_features":[]}}