Rewrite trader service for V4 P0

This commit is contained in:
Codex
2026-06-26 21:53:22 +08:00
parent 2fe4077164
commit 5d210053d0
184 changed files with 2780 additions and 6945 deletions
@@ -1,498 +0,0 @@
package com.quantai.trader.replay;
import com.quantai.trader.domain.TraderException;
import com.quantai.trader.enums.TraderErrorCode;
import com.quantai.trader.enums.TraderSide;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
@Component
public class CryptoLakeReplayCsvMarketEventReader implements ReplayMarketEventReader {
private static final Logger log = LoggerFactory.getLogger(CryptoLakeReplayCsvMarketEventReader.class);
private static final MathContext MC = new MathContext(16, RoundingMode.HALF_UP);
private static final String REPLAY_SOURCE_KEY = "cryptoLakeReplay1m";
private static final String CANDIDATE_SOURCE_KEY = "candidateEvents";
private static final BigDecimal LONG_INVALID_BPS = new BigDecimal("12.0");
private static final BigDecimal LONG_STOP_BPS = new BigDecimal("8.0");
private static final BigDecimal LONG_TARGET_BPS = new BigDecimal("30.0");
private static final BigDecimal SHORT_INVALID_BPS = new BigDecimal("12.0");
private static final BigDecimal SHORT_STOP_BPS = new BigDecimal("8.0");
private static final BigDecimal SHORT_TARGET_BPS = new BigDecimal("30.0");
@Override
public boolean supports(ReplayRunConfig config) {
DataSourceSpec source = config.dataSources() == null ? null : config.dataSources().get(REPLAY_SOURCE_KEY);
return source != null && source.path() != null && source.path().endsWith(".csv");
}
@Override
public void validateReadable(ReplayRunConfig config) {
validateSource(selectReplaySource(config), REPLAY_SOURCE_KEY);
DataSourceSpec candidateSource = config.dataSources().get(CANDIDATE_SOURCE_KEY);
if (candidateSource != null) {
validateSource(candidateSource, CANDIDATE_SOURCE_KEY);
}
}
@Override
public List<ReplayClockTick> readTicks(ReplayRunConfig config) {
validateReadable(config);
NavigableMap<Instant, MarketBar> bars = readReplayBars(config);
List<ReplayClockTick> ticks = config.dataSources().containsKey(CANDIDATE_SOURCE_KEY)
? readCandidateTicks(config, bars)
: readMarketAuditTicks(config, bars);
if (ticks.isEmpty()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "crypto lake replay csv produced no ticks");
}
log.info(
"event=trader.replay.crypto_lake_csv.loaded runId={} symbol={} tickCount={} candidateMode={}",
config.runId(),
config.symbol(),
ticks.size(),
config.dataSources().containsKey(CANDIDATE_SOURCE_KEY)
);
return ticks;
}
private NavigableMap<Instant, MarketBar> readReplayBars(ReplayRunConfig config) {
Path path = Path.of(selectReplaySource(config).path());
NavigableMap<Instant, MarketBar> bars = new TreeMap<>();
try (CSVParser parser = CSVParser.parse(path, java.nio.charset.StandardCharsets.UTF_8,
CSVFormat.DEFAULT.builder().setHeader().setSkipHeaderRecord(true).build())) {
for (CSVRecord record : parser) {
if (!config.symbol().equals(required(record, "symbol"))) {
continue;
}
if (!"1m".equals(required(record, "timeframe"))) {
continue;
}
MarketBar bar = marketBar(record);
bars.put(bar.openTime(), bar);
}
} catch (IOException ex) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "failed to read crypto lake replay csv: " + ex.getMessage());
}
if (bars.isEmpty()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "crypto lake replay csv has no rows for symbol: " + config.symbol());
}
return bars;
}
private List<ReplayClockTick> readMarketAuditTicks(ReplayRunConfig config, NavigableMap<Instant, MarketBar> bars) {
List<ReplayClockTick> ticks = new ArrayList<>();
List<MarketBar> ordered = List.copyOf(bars.values());
for (int i = 0; i < ordered.size(); i++) {
MarketBar bar = ordered.get(i);
if (outsideRunWindow(config, bar.openTime())) {
continue;
}
ticks.add(toTick(config, bar, null, labelInputs(ordered, i, null)));
}
return ticks.stream()
.sorted(Comparator.comparing(ReplayClockTick::eventTime))
.toList();
}
private List<ReplayClockTick> readCandidateTicks(ReplayRunConfig config, NavigableMap<Instant, MarketBar> bars) {
Path path = Path.of(config.dataSources().get(CANDIDATE_SOURCE_KEY).path());
List<MarketBar> ordered = List.copyOf(bars.values());
Map<Instant, Integer> indexByTime = new LinkedHashMap<>();
for (int i = 0; i < ordered.size(); i++) {
indexByTime.put(ordered.get(i).openTime(), i);
}
List<ReplayClockTick> ticks = new ArrayList<>();
try (CSVParser parser = CSVParser.parse(path, java.nio.charset.StandardCharsets.UTF_8,
CSVFormat.DEFAULT.builder().setHeader().setSkipHeaderRecord(true).build())) {
for (CSVRecord record : parser) {
if (!config.symbol().equals(required(record, "symbol"))) {
continue;
}
Instant candidateTime = Instant.ofEpochMilli(requiredLong(record, "bar_time"));
if (outsideRunWindow(config, candidateTime)) {
continue;
}
Map.Entry<Instant, MarketBar> entry = bars.ceilingEntry(candidateTime);
if (entry == null || outsideRunWindow(config, entry.getKey())) {
continue;
}
int barIndex = indexByTime.get(entry.getKey());
CandidateEvent event = candidateEvent(record, candidateTime);
ticks.add(toTick(config, entry.getValue(), event, labelInputs(ordered, barIndex, event.side())));
}
} catch (IOException ex) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "failed to read candidate events csv: " + ex.getMessage());
}
return ticks.stream()
.sorted(Comparator.comparing(ReplayClockTick::eventTime))
.toList();
}
private ReplayClockTick toTick(
ReplayRunConfig config,
MarketBar bar,
CandidateEvent candidate,
Map<String, Object> labelInputs
) {
List<String> missing = missingFeatures(bar);
Map<String, Object> context = new LinkedHashMap<>();
context.put("contextPass", missing.isEmpty());
context.put("replaySourceType", "CRYPTO_LAKE_1M_CSV");
putDecimal(context, "sourceCoverage", bar.sourceCoverage());
putDecimal(context, "fundingBps", bar.fundingBps());
putDecimal(context, "openInterest", bar.openInterest());
putDecimal(context, "volume", bar.volume());
Map<String, Object> setup = new LinkedHashMap<>();
setup.put("setupPass", candidate != null);
setup.put("setupName", candidate == null ? "market_audit_only" : "candidate_event_replay");
if (candidate != null) {
if (bar.close() == null) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "candidate event matched a replay bar without close price");
}
setup.put("candidateEventId", candidate.eventId());
setup.put("signalType", candidate.signalType());
setup.put("side", candidate.side().name());
setup.put("sourceService", candidate.sourceService());
putDecimal(setup, "entryPrice", bar.close());
putDecimal(setup, "invalidPrice", priceByBps(bar.close(), invalidBps(candidate.side()), adverseSign(candidate.side())));
putDecimal(setup, "stopPrice", priceByBps(bar.close(), stopBps(candidate.side()), adverseSign(candidate.side())));
putDecimal(setup, "targetPrice", priceByBps(bar.close(), targetBps(candidate.side()), favorableSign(candidate.side())));
putDecimal(setup, "executionQualityScore", executionQualityScore(bar));
}
Map<String, Object> trigger = new LinkedHashMap<>();
if (candidate != null && candidate.triggerScore() != null) {
putDecimal(trigger, "triggerScore", candidate.triggerScore());
}
trigger.put("replayTriggerSource", candidate == null ? "NONE" : "CANDIDATE_EVENT");
Map<String, Object> execution = new LinkedHashMap<>();
putDecimal(execution, "lastPrice", bar.close());
putDecimal(execution, "bestBidPrice", bar.bestBidPrice());
putDecimal(execution, "bestAskPrice", bar.bestAskPrice());
putDecimal(execution, "observedSpreadBps", bar.observedSpreadBps());
putDecimal(execution, "expectedSlippageBps", bar.expectedSlippageBps());
putDecimal(execution, "p95LatencyMs", bar.p95LatencyMs());
Map<String, Object> dataQuality = new LinkedHashMap<>();
dataQuality.put("missing_features", missing);
putDecimal(dataQuality, "sourceCoverage", bar.sourceCoverage());
dataQuality.put("replaySourcePath", selectReplaySource(config).path());
return new ReplayClockTick(
config.runId(),
config.symbol(),
bar.openTime(),
context,
setup,
trigger,
execution,
dataQuality,
labelInputs
);
}
private Map<String, Object> labelInputs(List<MarketBar> bars, int index, TraderSide side) {
Map<String, Object> labels = new LinkedHashMap<>();
labels.put("labelSource", "CRYPTO_LAKE_1M_REPLAY");
if (side == null) {
labels.put("labelStatus", "MARKET_AUDIT_NO_SIDE");
return labels;
}
MarketBar entry = bars.get(index);
labels.put("side", side.name());
putDecimal(labels, "entryPrice", entry.close());
putIfPresent(labels, "markoutBps1m", markout(bars, index, side, 1));
putIfPresent(labels, "markoutBps5m", markout(bars, index, side, 5));
putIfPresent(labels, "markoutBps15m", markout(bars, index, side, 15));
putIfPresent(labels, "mfeBps15m", mfe(bars, index, side, 15));
putIfPresent(labels, "maeBps15m", mae(bars, index, side, 15));
putIfPresent(labels, "targetBeforeStop15m", targetBeforeStop(bars, index, side, targetBps(side), stopBps(side), 15));
putDecimal(labels, "expectedSlippageBps", entry.expectedSlippageBps());
labels.put("labelStatus", hasMandatoryLabels(labels) ? "REPLAY_MARKOUT_LABELED" : "FUTURE_WINDOW_INCOMPLETE");
return labels;
}
private boolean hasMandatoryLabels(Map<String, Object> labels) {
return labels.containsKey("markoutBps1m")
&& labels.containsKey("markoutBps5m")
&& labels.containsKey("markoutBps15m");
}
private String markout(List<MarketBar> bars, int index, TraderSide side, int minutes) {
if (index + minutes >= bars.size()) {
return null;
}
BigDecimal entry = bars.get(index).close();
BigDecimal close = bars.get(index + minutes).close();
return decimalText(sideReturnBps(side, entry, close));
}
private String mfe(List<MarketBar> bars, int index, TraderSide side, int minutes) {
if (index + minutes >= bars.size()) {
return null;
}
BigDecimal entry = bars.get(index).close();
BigDecimal best = BigDecimal.ZERO;
for (int i = index + 1; i <= index + minutes; i++) {
BigDecimal favorable = side == TraderSide.LONG ? bars.get(i).high() : bars.get(i).low();
best = best.max(sideReturnBps(side, entry, favorable));
}
return decimalText(best.max(BigDecimal.ZERO));
}
private String mae(List<MarketBar> bars, int index, TraderSide side, int minutes) {
if (index + minutes >= bars.size()) {
return null;
}
BigDecimal entry = bars.get(index).close();
BigDecimal worst = BigDecimal.ZERO;
for (int i = index + 1; i <= index + minutes; i++) {
BigDecimal adverse = side == TraderSide.LONG ? bars.get(i).low() : bars.get(i).high();
BigDecimal signed = sideReturnBps(side, entry, adverse);
if (signed.compareTo(BigDecimal.ZERO) < 0) {
worst = worst.max(signed.abs());
}
}
return decimalText(worst);
}
private Boolean targetBeforeStop(List<MarketBar> bars, int index, TraderSide side, BigDecimal targetBps, BigDecimal stopBps, int minutes) {
if (index + minutes >= bars.size()) {
return null;
}
BigDecimal entry = bars.get(index).close();
BigDecimal target = priceByBps(entry, targetBps, favorableSign(side));
BigDecimal stop = priceByBps(entry, stopBps, adverseSign(side));
for (int i = index + 1; i <= index + minutes; i++) {
MarketBar bar = bars.get(i);
boolean targetHit = side == TraderSide.LONG
? bar.high().compareTo(target) >= 0
: bar.low().compareTo(target) <= 0;
boolean stopHit = side == TraderSide.LONG
? bar.low().compareTo(stop) <= 0
: bar.high().compareTo(stop) >= 0;
if (targetHit) {
return true;
}
if (stopHit) {
return false;
}
}
return false;
}
private MarketBar marketBar(CSVRecord record) {
return new MarketBar(
Instant.parse(required(record, "open_time")),
decimal(record, "open"),
decimal(record, "high"),
decimal(record, "low"),
decimal(record, "close"),
decimal(record, "volume"),
decimal(record, "taker_buy_volume"),
decimal(record, "funding_bps"),
decimal(record, "open_interest"),
decimal(record, "best_bid_price"),
decimal(record, "best_ask_price"),
decimal(record, "observed_spread_bps"),
decimal(record, "expected_slippage_bps"),
decimal(record, "p95_latency_ms"),
decimal(record, "source_coverage")
);
}
private CandidateEvent candidateEvent(CSVRecord record, Instant candidateTime) {
String side = required(record, "direction").toUpperCase();
return new CandidateEvent(
required(record, "event_id"),
candidateTime,
required(record, "signal_type"),
TraderSide.valueOf(side),
required(record, "source_service"),
firstDecimal(record, "old_fusion_score", "legacy_fusion_score")
);
}
private List<String> missingFeatures(MarketBar bar) {
List<String> missing = new ArrayList<>();
requirePresent(missing, "open", bar.open());
requirePresent(missing, "high", bar.high());
requirePresent(missing, "low", bar.low());
requirePresent(missing, "close", bar.close());
requirePresent(missing, "taker_buy_volume", bar.takerBuyVolume());
requirePresent(missing, "expected_slippage_bps", bar.expectedSlippageBps());
requirePresent(missing, "source_coverage", bar.sourceCoverage());
return missing;
}
private void requirePresent(List<String> missing, String field, BigDecimal value) {
if (value == null) {
missing.add(field);
}
}
private DataSourceSpec selectReplaySource(ReplayRunConfig config) {
DataSourceSpec source = config.dataSources().get(REPLAY_SOURCE_KEY);
if (source == null) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "dataSources.cryptoLakeReplay1m is required");
}
return source;
}
private void validateSource(DataSourceSpec source, String sourceType) {
if (source.path() == null || source.path().isBlank()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "data source path is required: " + sourceType);
}
Path path = Path.of(source.path());
if (!Files.isRegularFile(path) || !Files.isReadable(path)) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "data source is not readable: " + source.path());
}
}
private boolean outsideRunWindow(ReplayRunConfig config, Instant time) {
return time.isBefore(config.from()) || !time.isBefore(config.to());
}
private String required(CSVRecord record, String column) {
String value = record.get(column);
if (value == null || value.isBlank()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "csv column is required: " + column);
}
return value;
}
private long requiredLong(CSVRecord record, String column) {
return Long.parseLong(required(record, column));
}
private BigDecimal firstDecimal(CSVRecord record, String... columns) {
for (String column : columns) {
if (!record.isMapped(column)) {
continue;
}
BigDecimal value = decimal(record, column);
if (value != null) {
return value;
}
}
return null;
}
private BigDecimal decimal(CSVRecord record, String column) {
if (!record.isMapped(column)) {
return null;
}
String value = record.get(column);
if (value == null || value.isBlank()) {
return null;
}
return new BigDecimal(value);
}
private String decimalText(BigDecimal value) {
return value == null ? null : value.stripTrailingZeros().toPlainString();
}
private void putDecimal(Map<String, Object> target, String key, BigDecimal value) {
String text = decimalText(value);
if (text != null) {
target.put(key, text);
}
}
private void putIfPresent(Map<String, Object> target, String key, Object value) {
if (value != null) {
target.put(key, value);
}
}
private BigDecimal executionQualityScore(MarketBar bar) {
if (bar.expectedSlippageBps() == null) {
return null;
}
BigDecimal score = BigDecimal.ONE.subtract(bar.expectedSlippageBps().divide(new BigDecimal("20.0"), MC), MC);
return score.max(new BigDecimal("0.20")).min(BigDecimal.ONE).setScale(8, RoundingMode.HALF_UP);
}
private BigDecimal sideReturnBps(TraderSide side, BigDecimal entry, BigDecimal exit) {
BigDecimal gross = exit.subtract(entry, MC)
.divide(entry, MC)
.multiply(new BigDecimal("10000"), MC);
return side == TraderSide.LONG ? gross : gross.negate();
}
private BigDecimal priceByBps(BigDecimal entry, BigDecimal bps, int sign) {
BigDecimal multiplier = BigDecimal.ONE.add(BigDecimal.valueOf(sign).multiply(bps, MC).divide(new BigDecimal("10000"), MC), MC);
return entry.multiply(multiplier, MC).setScale(8, RoundingMode.HALF_UP);
}
private int favorableSign(TraderSide side) {
return side == TraderSide.LONG ? 1 : -1;
}
private int adverseSign(TraderSide side) {
return side == TraderSide.LONG ? -1 : 1;
}
private BigDecimal invalidBps(TraderSide side) {
return side == TraderSide.LONG ? LONG_INVALID_BPS : SHORT_INVALID_BPS;
}
private BigDecimal stopBps(TraderSide side) {
return side == TraderSide.LONG ? LONG_STOP_BPS : SHORT_STOP_BPS;
}
private BigDecimal targetBps(TraderSide side) {
return side == TraderSide.LONG ? LONG_TARGET_BPS : SHORT_TARGET_BPS;
}
private record MarketBar(
Instant openTime,
BigDecimal open,
BigDecimal high,
BigDecimal low,
BigDecimal close,
BigDecimal volume,
BigDecimal takerBuyVolume,
BigDecimal fundingBps,
BigDecimal openInterest,
BigDecimal bestBidPrice,
BigDecimal bestAskPrice,
BigDecimal observedSpreadBps,
BigDecimal expectedSlippageBps,
BigDecimal p95LatencyMs,
BigDecimal sourceCoverage
) {
}
private record CandidateEvent(
String eventId,
Instant barTime,
String signalType,
TraderSide side,
String sourceService,
BigDecimal triggerScore
) {
}
}
@@ -1,22 +0,0 @@
package com.quantai.trader.replay;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
import java.util.Map;
public record DataSourceSpec(
@JsonProperty("sourceId")
String sourceId,
String path,
@JsonProperty("hashSha256")
String hashSha256,
@JsonProperty("schemaHashSha256")
String schemaHashSha256,
Long rowCount,
Instant minEventTime,
Instant maxEventTime,
String timezone,
Map<String, Object> missingSummary
) {
}
@@ -1,114 +0,0 @@
package com.quantai.trader.replay;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.quantai.trader.domain.TraderException;
import com.quantai.trader.enums.TraderErrorCode;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@Component
public class JsonlReplayMarketEventReader implements ReplayMarketEventReader {
private final ObjectMapper objectMapper;
public JsonlReplayMarketEventReader() {
this.objectMapper = new ObjectMapper().findAndRegisterModules();
}
@Override
public boolean supports(ReplayRunConfig config) {
DataSourceSpec source = config.dataSources() == null ? null : config.dataSources().get("ticks");
return source != null && source.path() != null && source.path().endsWith(".jsonl");
}
@Override
public void validateReadable(ReplayRunConfig config) {
DataSourceSpec source = selectReplaySource(config);
if (source.path() == null || source.path().isBlank()) {
throw new TraderException(
TraderErrorCode.TRADER_DATA_SOURCE_MISSING,
"replay source path is required"
);
}
Path path = Path.of(source.path());
if (!Files.isRegularFile(path) || !Files.isReadable(path)) {
throw new TraderException(
TraderErrorCode.TRADER_DATA_SOURCE_MISSING,
"replay source is not readable: " + source.path()
);
}
}
@Override
public List<ReplayClockTick> readTicks(ReplayRunConfig config) {
DataSourceSpec source = selectReplaySource(config);
Path path = Path.of(source.path());
validateReadable(config);
try (var lines = Files.lines(path)) {
List<ReplayClockTick> ticks = lines
.map(String::trim)
.filter(line -> !line.isEmpty())
.map(line -> parseLine(config, line))
.sorted(Comparator.comparing(ReplayClockTick::eventTime))
.toList();
if (ticks.isEmpty()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "replay source produced no ticks");
}
return ticks;
} catch (IOException ex) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "failed to read replay source: " + ex.getMessage());
}
}
private ReplayClockTick parseLine(ReplayRunConfig config, String line) {
try {
ReplayTickFixture fixture = objectMapper.readValue(line, ReplayTickFixture.class);
if (fixture.eventTime() == null) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "replay tick eventTime is required");
}
Instant eventTime = Instant.parse(fixture.eventTime());
return new ReplayClockTick(
config.runId(),
config.symbol(),
eventTime,
fixture.contextFeatures(),
fixture.setupFeatures(),
fixture.triggerFeatures(),
fixture.executionFeatures(),
fixture.dataQuality(),
fixture.labelInputs() == null ? Map.of() : fixture.labelInputs()
);
} catch (IOException ex) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "invalid replay tick json: " + ex.getMessage());
}
}
private DataSourceSpec selectReplaySource(ReplayRunConfig config) {
DataSourceSpec explicit = config.dataSources().get("ticks");
if (explicit != null) {
return explicit;
}
throw new TraderException(
TraderErrorCode.TRADER_DATA_SOURCE_MISSING,
"P0 replay requires dataSources.ticks"
);
}
public record ReplayTickFixture(
String eventTime,
Map<String, Object> contextFeatures,
Map<String, Object> setupFeatures,
Map<String, Object> triggerFeatures,
Map<String, Object> executionFeatures,
Map<String, Object> dataQuality,
Map<String, Object> labelInputs
) {
}
}
@@ -1,34 +0,0 @@
package com.quantai.trader.replay;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
public record ReplayClockTick(
String runId,
String symbol,
Instant eventTime,
Map<String, Object> contextFeatures,
Map<String, Object> setupFeatures,
Map<String, Object> triggerFeatures,
Map<String, Object> executionFeatures,
Map<String, Object> dataQuality,
Map<String, Object> labelInputs
) {
public ReplayClockTick {
contextFeatures = immutable(contextFeatures);
setupFeatures = immutable(setupFeatures);
triggerFeatures = immutable(triggerFeatures);
executionFeatures = immutable(executionFeatures);
dataQuality = immutable(dataQuality);
labelInputs = immutable(labelInputs);
}
private static Map<String, Object> immutable(Map<String, Object> value) {
if (value == null || value.isEmpty()) {
return Map.of();
}
return Map.copyOf(new LinkedHashMap<>(value));
}
}
@@ -0,0 +1,15 @@
package com.quantai.trader.replay;
import java.math.BigDecimal;
import java.time.Instant;
public record ReplayMarketEvent(
String runId,
String symbol,
Instant eventTime,
BigDecimal markPrice,
BigDecimal indexPrice,
BigDecimal spreadBps,
BigDecimal depthNotional5Bps
) {
}
@@ -1,12 +0,0 @@
package com.quantai.trader.replay;
import java.util.List;
public interface ReplayMarketEventReader {
boolean supports(ReplayRunConfig config);
void validateReadable(ReplayRunConfig config);
List<ReplayClockTick> readTicks(ReplayRunConfig config);
}
@@ -1,44 +0,0 @@
package com.quantai.trader.replay;
import com.quantai.trader.enums.ReplayRunStatus;
import java.time.Instant;
public record ReplayRun(
String runId,
ReplayRunStatus status,
ReplayRunConfig config,
String playbookDefinitionHash,
Instant createdAt,
Instant startedAt,
Instant finishedAt,
String failureReason
) {
public ReplayRun withStatus(ReplayRunStatus nextStatus) {
Instant now = Instant.now();
return new ReplayRun(
runId,
nextStatus,
config,
playbookDefinitionHash,
createdAt,
nextStatus == ReplayRunStatus.RUNNING ? now : startedAt,
nextStatus == ReplayRunStatus.COMPLETED || nextStatus == ReplayRunStatus.CANCELLED || nextStatus == ReplayRunStatus.FAILED ? now : finishedAt,
failureReason
);
}
public ReplayRun failed(String reason) {
return new ReplayRun(
runId,
ReplayRunStatus.FAILED,
config,
playbookDefinitionHash,
createdAt,
startedAt,
Instant.now(),
reason
);
}
}
@@ -1,34 +0,0 @@
package com.quantai.trader.replay;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
import java.util.Map;
public record ReplayRunConfig(
String runId,
String symbol,
String playbookId,
String playbookVersion,
Instant from,
Instant to,
String featureVersion,
String labelVersion,
@JsonProperty("dataSources")
Map<String, DataSourceSpec> dataSources
) {
public ReplayRunConfig withRunId(String nextRunId) {
return new ReplayRunConfig(
nextRunId,
symbol,
playbookId,
playbookVersion,
from,
to,
featureVersion,
labelVersion,
Map.copyOf(dataSources)
);
}
}
@@ -1,9 +0,0 @@
package com.quantai.trader.replay;
import com.quantai.trader.enums.ReplayRunStatus;
public record ReplayRunResponse(
String runId,
ReplayRunStatus status
) {
}
@@ -1,225 +0,0 @@
package com.quantai.trader.replay;
import com.quantai.trader.brain.TraderCycleResult;
import com.quantai.trader.brain.TraderDecisionCycleRunner;
import com.quantai.trader.domain.TraderDataSourceManifest;
import com.quantai.trader.domain.TraderException;
import com.quantai.trader.enums.ReplayRunStatus;
import com.quantai.trader.enums.TraderErrorCode;
import com.quantai.trader.enums.TraderRunMode;
import com.quantai.trader.persistence.ReplayRunRepository;
import com.quantai.trader.playbook.TraderPlaybookCatalog;
import com.quantai.trader.playbook.TraderPlaybookDefinitionSnapshot;
import com.quantai.trader.report.ReplayReportWriter;
import com.quantai.trader.state.TraderRuntimeState;
import com.quantai.trader.util.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Service
public class ReplayRunService {
private static final Logger log = LoggerFactory.getLogger(ReplayRunService.class);
private final TraderPlaybookCatalog catalog;
private final ReplayRunRepository repository;
private final ReplayReportWriter reportWriter;
private final List<ReplayMarketEventReader> eventReaders;
private final TraderDecisionCycleRunner cycleRunner;
private final ExecutorService executorService = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = new Thread(runnable, "trader-replay-worker");
thread.setDaemon(true);
return thread;
});
public ReplayRunService(
TraderPlaybookCatalog catalog,
ReplayRunRepository repository,
ReplayReportWriter reportWriter,
List<ReplayMarketEventReader> eventReaders,
TraderDecisionCycleRunner cycleRunner
) {
this.catalog = catalog;
this.repository = repository;
this.reportWriter = reportWriter;
this.eventReaders = List.copyOf(eventReaders);
this.cycleRunner = cycleRunner;
}
public ReplayRunResponse createRun(ReplayRunConfig request) {
validateRequest(request);
TraderPlaybookDefinitionSnapshot playbook = catalog.require(request.playbookId(), request.playbookVersion());
request.dataSources().forEach((sourceType, spec) -> validateDataSource(request, sourceType, spec));
readerFor(request).validateReadable(request);
String runId = Ids.runId(Instant.now());
ReplayRunConfig config = request.withRunId(runId);
ReplayRun run = new ReplayRun(
runId,
ReplayRunStatus.CREATED,
config,
playbook.definitionHashSha256(),
Instant.now(),
null,
null,
null
);
repository.insert(run);
log.info(
"event=trader.replay.created runId={} symbol={} playbookId={} playbookVersion={} status={}",
runId,
config.symbol(),
config.playbookId(),
config.playbookVersion(),
ReplayRunStatus.CREATED
);
executorService.submit(() -> execute(run, playbook));
return new ReplayRunResponse(runId, ReplayRunStatus.CREATED);
}
public Optional<ReplayRun> find(String runId) {
return repository.findByRunId(runId);
}
public ReplayRun cancel(String runId) {
ReplayRun run = repository.findByRunId(runId)
.orElseThrow(() -> new IllegalArgumentException("replay run not found: " + runId));
if (run.status() == ReplayRunStatus.COMPLETED
|| run.status() == ReplayRunStatus.CANCELLED
|| run.status() == ReplayRunStatus.FAILED) {
return run;
}
ReplayRun cancelled = run.withStatus(ReplayRunStatus.CANCEL_REQUESTED);
repository.update(cancelled);
log.info("event=trader.replay.cancel_requested runId={} status={}", runId, cancelled.status());
return cancelled;
}
private void execute(ReplayRun run, TraderPlaybookDefinitionSnapshot playbook) {
try {
repository.update(run.withStatus(ReplayRunStatus.RUNNING));
log.info(
"event=trader.replay.start runId={} symbol={} playbookId={} playbookVersion={} status={}",
run.runId(),
run.config().symbol(),
playbook.playbookId(),
playbook.playbookVersion(),
ReplayRunStatus.RUNNING
);
List<ReplayClockTick> ticks = readerFor(run.config()).readTicks(run.config());
List<TraderCycleResult> results = new ArrayList<>(ticks.size());
TraderRuntimeState runtimeState = new TraderRuntimeState(
run.runId(),
TraderRunMode.REPLAY,
playbook.playbookId(),
playbook.playbookVersion()
);
for (ReplayClockTick tick : ticks) {
ReplayRun current = currentRun(run.runId());
if (current.status() == ReplayRunStatus.CANCEL_REQUESTED) {
repository.update(current.withStatus(ReplayRunStatus.CANCELLED));
log.info("event=trader.replay.cancelled runId={} status={}", run.runId(), ReplayRunStatus.CANCELLED);
return;
}
results.add(cycleRunner.runReplayTick(tick, runtimeState));
}
reportWriter.writeReport(run.config(), playbook, results);
ReplayRun current = currentRun(run.runId());
repository.update(current.withStatus(ReplayRunStatus.COMPLETED));
log.info(
"event=trader.replay.completed runId={} symbol={} playbookId={} playbookVersion={} status={} tickCount={} resultCount={}",
run.runId(),
run.config().symbol(),
playbook.playbookId(),
playbook.playbookVersion(),
ReplayRunStatus.COMPLETED,
ticks.size(),
results.size()
);
} catch (RuntimeException ex) {
log.warn(
"event=trader.replay.failed_detected runId={} symbol={} playbookId={} playbookVersion={} status={} reason={}",
run.runId(),
run.config().symbol(),
playbook.playbookId(),
playbook.playbookVersion(),
ReplayRunStatus.FAILED,
ex.getMessage(),
ex
);
try {
repository.update(run.failed(ex.toString()));
} catch (RuntimeException updateFailure) {
log.error(
"event=trader.replay.failed_status_update_failed runId={} originalReason={} updateReason={}",
run.runId(),
ex.getMessage(),
updateFailure.getMessage(),
updateFailure
);
}
}
}
private void validateRequest(ReplayRunConfig request) {
if (request.symbol() == null || request.symbol().isBlank()
|| request.playbookId() == null || request.playbookId().isBlank()
|| request.playbookVersion() == null || request.playbookVersion().isBlank()
|| request.from() == null
|| request.to() == null
|| request.dataSources() == null
|| request.dataSources().isEmpty()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "replay request is missing required lineage fields");
}
}
private ReplayRun currentRun(String runId) {
return repository.findByRunId(runId)
.orElseThrow(() -> new IllegalStateException("replay run disappeared: " + runId));
}
private ReplayMarketEventReader readerFor(ReplayRunConfig config) {
return eventReaders.stream()
.filter(reader -> reader.supports(config))
.findFirst()
.orElseThrow(() -> new TraderException(
TraderErrorCode.TRADER_DATA_SOURCE_MISSING,
"no replay reader supports the requested dataSources"
));
}
private void validateDataSource(ReplayRunConfig request, String sourceType, DataSourceSpec spec) {
if (spec.timezone() == null || spec.timezone().isBlank()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "data source timezone is required: " + sourceType);
}
if (spec.missingSummary() == null) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "data source missingSummary is required: " + sourceType);
}
new TraderDataSourceManifest(
spec.sourceId(),
request.symbol(),
sourceType,
"BINANCE",
sourceType.equals("candles") ? "1m" : "event",
spec.path(),
spec.hashSha256(),
spec.schemaHashSha256(),
request.from(),
request.to(),
spec.minEventTime(),
spec.maxEventTime(),
spec.timezone(),
spec.rowCount(),
spec.missingSummary(),
"P0_ACCEPTED"
);
}
}
@@ -0,0 +1,14 @@
package com.quantai.trader.replay;
import com.quantai.trader.domain.TraderAction;
import com.quantai.trader.domain.TraderPositionManagerDecision;
import com.quantai.trader.domain.TraderRiskDecision;
public record TraderCycleResult(
String runId,
String cycleId,
TraderPositionManagerDecision pmDecision,
TraderRiskDecision riskDecision,
TraderAction action
) {
}
@@ -0,0 +1,108 @@
package com.quantai.trader.replay;
import com.quantai.trader.artifact.TraderArtifactBundle;
import com.quantai.trader.artifact.TraderArtifactLoader;
import com.quantai.trader.config.TraderProperties;
import com.quantai.trader.domain.*;
import com.quantai.trader.enums.PositionSide;
import com.quantai.trader.evidence.EvidenceAppender;
import com.quantai.trader.model.TraderModelService;
import com.quantai.trader.outbox.InMemoryOutboxRepository;
import com.quantai.trader.outbox.TraderOutboxEvent;
import com.quantai.trader.position.TraderPositionManager;
import com.quantai.trader.risk.RiskGateInput;
import com.quantai.trader.risk.RiskLimits;
import com.quantai.trader.risk.TraderRiskGate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Map;
@Service
public class TraderP0CycleRunner {
private static final Logger log = LoggerFactory.getLogger(TraderP0CycleRunner.class);
private final TraderProperties properties;
private final TraderArtifactLoader artifactLoader;
private final TraderModelService modelService;
private final TraderPositionManager positionManager;
private final TraderRiskGate riskGate;
private final TraderActionFactory actionFactory;
private final EvidenceAppender evidenceAppender;
private final InMemoryOutboxRepository outboxRepository;
public TraderP0CycleRunner(TraderProperties properties,
TraderArtifactLoader artifactLoader,
TraderModelService modelService,
TraderPositionManager positionManager,
TraderRiskGate riskGate,
TraderActionFactory actionFactory,
EvidenceAppender evidenceAppender,
InMemoryOutboxRepository outboxRepository) {
this.properties = properties;
this.artifactLoader = artifactLoader;
this.modelService = modelService;
this.positionManager = positionManager;
this.riskGate = riskGate;
this.actionFactory = actionFactory;
this.evidenceAppender = evidenceAppender;
this.outboxRepository = outboxRepository;
}
public TraderCycleResult runFlatCycle(ReplayMarketEvent event) {
String cycleId = "cycle_" + event.runId() + "_" + event.eventTime().toEpochMilli();
TraderArtifactBundle bundle = artifactLoader.loadActiveBundle();
TraderDecisionCycle cycle = new TraderDecisionCycle(event.runId(), cycleId, event.symbol(), event.eventTime(),
properties.runMode(), bundle.modelBundleVersion(), bundle.calibrationBundleVersion(), bundle.pmConfigVersion());
TraderMarketSnapshot snapshot = snapshot(event, cycleId);
evidenceAppender.append(cycle.runId(), cycle.cycleId(), "MARKET_SNAPSHOT", snapshot.dataReady(), "SNAPSHOT_BUILT", null, Map.of());
TraderModelOutput modelOutput = modelService.evaluate(snapshot, bundle);
evidenceAppender.append(cycle.runId(), cycle.cycleId(), "MODEL_OUTPUT", true, "MODEL_EVALUATED", null, Map.of("modelOutputId", modelOutput.modelOutputId()));
PositionManagerInput pmInput = new PositionManagerInput(cycle, snapshot, modelOutput,
flatPosition(cycle, snapshot), account(cycle), execution(cycle), bundle.pmConfig());
TraderPositionManagerDecision pmDecision = positionManager.decide(pmInput);
evidenceAppender.append(cycle.runId(), cycle.cycleId(), "PM_DECISION", true, pmDecision.reason(), null, Map.of("action", pmDecision.candidateAction().name()));
TraderRiskDecision riskDecision = riskGate.evaluate(new RiskGateInput(pmDecision, pmInput.positionState(), pmInput.accountState(),
pmInput.executionState(), snapshot, riskLimits()));
evidenceAppender.append(cycle.runId(), cycle.cycleId(), "RISK_DECISION", riskDecision.allowAction(), riskDecision.allowAction() ? "RISK_PASS" : riskDecision.blocker(), riskDecision.blocker(), Map.of());
TraderAction action = actionFactory.create(pmDecision, riskDecision, event.symbol());
outboxRepository.insert(new TraderOutboxEvent("outbox_" + action.actionId(), action.runId(), action.cycleId(),
"TRADER_ACTION", action.actionId(), "ACTION_CREATED", properties.runMode().name() + "_RECORDER",
Map.of("actionType", action.actionType().name()), action.idempotencyKey(), "PENDING", Instant.now()));
log.info("event=trader.cycle.completed runId={} cycleId={} action={} outbox=PENDING", action.runId(), action.cycleId(), action.actionType());
return new TraderCycleResult(cycle.runId(), cycle.cycleId(), pmDecision, riskDecision, action);
}
private TraderMarketSnapshot snapshot(ReplayMarketEvent event, String cycleId) {
return new TraderMarketSnapshot("snapshot_" + cycleId, event.runId(), cycleId, event.symbol(), event.eventTime(),
"feature-v4-p0", event.markPrice(), event.indexPrice(), event.spreadBps(), BigDecimal.ZERO,
event.depthNotional5Bps(), event.depthNotional5Bps(), event.depthNotional5Bps(),
event.depthNotional5Bps().compareTo(BigDecimal.ZERO) > 0, Map.of(), Map.of());
}
private TraderPositionState flatPosition(TraderDecisionCycle cycle, TraderMarketSnapshot snapshot) {
return new TraderPositionState("position_state_" + cycle.cycleId(), cycle.runId(), cycle.cycleId(), cycle.symbol(),
PositionSide.NONE, BigDecimal.ZERO, null, snapshot.markPrice(), BigDecimal.ZERO, new BigDecimal("1000"),
0, BigDecimal.ONE, null);
}
private TraderAccountState account(TraderDecisionCycle cycle) {
return new TraderAccountState("account_state_" + cycle.cycleId(), cycle.runId(), cycle.cycleId(),
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ONE, 0);
}
private TraderExecutionState execution(TraderDecisionCycle cycle) {
return new TraderExecutionState("execution_state_" + cycle.cycleId(), cycle.runId(), cycle.cycleId(), cycle.symbol(),
java.util.List.of(), new BigDecimal("1.5"), 10, 0, new BigDecimal("1"), new BigDecimal("4"),
new BigDecimal("5"), new BigDecimal("0.1"), new BigDecimal("0.001"), new BigDecimal("0.001"), BigDecimal.ONE);
}
private RiskLimits riskLimits() {
return new RiskLimits(properties.risk().maxDailyLossBps(), properties.risk().maxTotalExposureRatio(),
properties.risk().minLiquidationBufferBps(), properties.execution().maxApiErrorCount(),
properties.execution().maxExchangeLatencyMs(), false, false);
}
}