Initial quant trader service baseline

This commit is contained in:
Codex
2026-06-23 22:09:06 +08:00
commit 7ff786f658
137 changed files with 6664 additions and 0 deletions
@@ -0,0 +1,22 @@
package com.quantai.trader.replay;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
import java.util.Map;
public record DataSourceSpec(
@JsonProperty("sourceId")
String sourceId,
String path,
@JsonProperty("hashSha256")
String hashSha256,
@JsonProperty("schemaHashSha256")
String schemaHashSha256,
Long rowCount,
Instant minEventTime,
Instant maxEventTime,
String timezone,
Map<String, Object> missingSummary
) {
}
@@ -0,0 +1,106 @@
package com.quantai.trader.replay;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.quantai.trader.domain.TraderException;
import com.quantai.trader.enums.TraderErrorCode;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@Component
public class JsonlReplayMarketEventReader implements ReplayMarketEventReader {
private final ObjectMapper objectMapper;
public JsonlReplayMarketEventReader() {
this.objectMapper = new ObjectMapper().findAndRegisterModules();
}
@Override
public void validateReadable(ReplayRunConfig config) {
DataSourceSpec source = selectReplaySource(config);
if (source.path() == null || source.path().isBlank()) {
throw new TraderException(
TraderErrorCode.TRADER_DATA_SOURCE_MISSING,
"replay source path is required"
);
}
Path path = Path.of(source.path());
if (!Files.isRegularFile(path) || !Files.isReadable(path)) {
throw new TraderException(
TraderErrorCode.TRADER_DATA_SOURCE_MISSING,
"replay source is not readable: " + source.path()
);
}
}
@Override
public List<ReplayClockTick> readTicks(ReplayRunConfig config) {
DataSourceSpec source = selectReplaySource(config);
Path path = Path.of(source.path());
validateReadable(config);
try (var lines = Files.lines(path)) {
List<ReplayClockTick> ticks = lines
.map(String::trim)
.filter(line -> !line.isEmpty())
.map(line -> parseLine(config, line))
.sorted(Comparator.comparing(ReplayClockTick::eventTime))
.toList();
if (ticks.isEmpty()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "replay source produced no ticks");
}
return ticks;
} catch (IOException ex) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "failed to read replay source: " + ex.getMessage());
}
}
private ReplayClockTick parseLine(ReplayRunConfig config, String line) {
try {
ReplayTickFixture fixture = objectMapper.readValue(line, ReplayTickFixture.class);
if (fixture.eventTime() == null) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "replay tick eventTime is required");
}
Instant eventTime = Instant.parse(fixture.eventTime());
return new ReplayClockTick(
config.runId(),
config.symbol(),
eventTime,
fixture.contextFeatures(),
fixture.setupFeatures(),
fixture.triggerFeatures(),
fixture.executionFeatures(),
fixture.dataQuality()
);
} catch (IOException ex) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "invalid replay tick json: " + ex.getMessage());
}
}
private DataSourceSpec selectReplaySource(ReplayRunConfig config) {
DataSourceSpec explicit = config.dataSources().get("ticks");
if (explicit != null) {
return explicit;
}
throw new TraderException(
TraderErrorCode.TRADER_DATA_SOURCE_MISSING,
"P0 replay requires dataSources.ticks"
);
}
public record ReplayTickFixture(
String eventTime,
Map<String, Object> contextFeatures,
Map<String, Object> setupFeatures,
Map<String, Object> triggerFeatures,
Map<String, Object> executionFeatures,
Map<String, Object> dataQuality
) {
}
}
@@ -0,0 +1,16 @@
package com.quantai.trader.replay;
import java.time.Instant;
import java.util.Map;
public record ReplayClockTick(
String runId,
String symbol,
Instant eventTime,
Map<String, Object> contextFeatures,
Map<String, Object> setupFeatures,
Map<String, Object> triggerFeatures,
Map<String, Object> executionFeatures,
Map<String, Object> dataQuality
) {
}
@@ -0,0 +1,10 @@
package com.quantai.trader.replay;
import java.util.List;
public interface ReplayMarketEventReader {
void validateReadable(ReplayRunConfig config);
List<ReplayClockTick> readTicks(ReplayRunConfig config);
}
@@ -0,0 +1,44 @@
package com.quantai.trader.replay;
import com.quantai.trader.enums.ReplayRunStatus;
import java.time.Instant;
public record ReplayRun(
String runId,
ReplayRunStatus status,
ReplayRunConfig config,
String playbookDefinitionHash,
Instant createdAt,
Instant startedAt,
Instant finishedAt,
String failureReason
) {
public ReplayRun withStatus(ReplayRunStatus nextStatus) {
Instant now = Instant.now();
return new ReplayRun(
runId,
nextStatus,
config,
playbookDefinitionHash,
createdAt,
nextStatus == ReplayRunStatus.RUNNING ? now : startedAt,
nextStatus == ReplayRunStatus.COMPLETED || nextStatus == ReplayRunStatus.CANCELLED || nextStatus == ReplayRunStatus.FAILED ? now : finishedAt,
failureReason
);
}
public ReplayRun failed(String reason) {
return new ReplayRun(
runId,
ReplayRunStatus.FAILED,
config,
playbookDefinitionHash,
createdAt,
startedAt,
Instant.now(),
reason
);
}
}
@@ -0,0 +1,34 @@
package com.quantai.trader.replay;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
import java.util.Map;
public record ReplayRunConfig(
String runId,
String symbol,
String playbookId,
String playbookVersion,
Instant from,
Instant to,
String featureVersion,
String labelVersion,
@JsonProperty("dataSources")
Map<String, DataSourceSpec> dataSources
) {
public ReplayRunConfig withRunId(String nextRunId) {
return new ReplayRunConfig(
nextRunId,
symbol,
playbookId,
playbookVersion,
from,
to,
featureVersion,
labelVersion,
Map.copyOf(dataSources)
);
}
}
@@ -0,0 +1,9 @@
package com.quantai.trader.replay;
import com.quantai.trader.enums.ReplayRunStatus;
public record ReplayRunResponse(
String runId,
ReplayRunStatus status
) {
}
@@ -0,0 +1,215 @@
package com.quantai.trader.replay;
import com.quantai.trader.brain.TraderCycleResult;
import com.quantai.trader.brain.TraderDecisionCycleRunner;
import com.quantai.trader.domain.TraderDataSourceManifest;
import com.quantai.trader.domain.TraderException;
import com.quantai.trader.enums.ReplayRunStatus;
import com.quantai.trader.enums.TraderErrorCode;
import com.quantai.trader.enums.TraderRunMode;
import com.quantai.trader.persistence.ReplayRunRepository;
import com.quantai.trader.playbook.TraderPlaybookCatalog;
import com.quantai.trader.playbook.TraderPlaybookDefinitionSnapshot;
import com.quantai.trader.report.ReplayReportWriter;
import com.quantai.trader.state.TraderRuntimeState;
import com.quantai.trader.util.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Service
public class ReplayRunService {
private static final Logger log = LoggerFactory.getLogger(ReplayRunService.class);
private final TraderPlaybookCatalog catalog;
private final ReplayRunRepository repository;
private final ReplayReportWriter reportWriter;
private final ReplayMarketEventReader eventReader;
private final TraderDecisionCycleRunner cycleRunner;
private final ExecutorService executorService = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = new Thread(runnable, "trader-replay-worker");
thread.setDaemon(true);
return thread;
});
public ReplayRunService(
TraderPlaybookCatalog catalog,
ReplayRunRepository repository,
ReplayReportWriter reportWriter,
ReplayMarketEventReader eventReader,
TraderDecisionCycleRunner cycleRunner
) {
this.catalog = catalog;
this.repository = repository;
this.reportWriter = reportWriter;
this.eventReader = eventReader;
this.cycleRunner = cycleRunner;
}
public ReplayRunResponse createRun(ReplayRunConfig request) {
validateRequest(request);
TraderPlaybookDefinitionSnapshot playbook = catalog.require(request.playbookId(), request.playbookVersion());
request.dataSources().forEach((sourceType, spec) -> validateDataSource(request, sourceType, spec));
eventReader.validateReadable(request);
String runId = Ids.runId(Instant.now());
ReplayRunConfig config = request.withRunId(runId);
ReplayRun run = new ReplayRun(
runId,
ReplayRunStatus.CREATED,
config,
playbook.definitionHashSha256(),
Instant.now(),
null,
null,
null
);
repository.insert(run);
log.info(
"event=trader.replay.created runId={} symbol={} playbookId={} playbookVersion={} status={}",
runId,
config.symbol(),
config.playbookId(),
config.playbookVersion(),
ReplayRunStatus.CREATED
);
executorService.submit(() -> execute(run, playbook));
return new ReplayRunResponse(runId, ReplayRunStatus.CREATED);
}
public Optional<ReplayRun> find(String runId) {
return repository.findByRunId(runId);
}
public ReplayRun cancel(String runId) {
ReplayRun run = repository.findByRunId(runId)
.orElseThrow(() -> new IllegalArgumentException("replay run not found: " + runId));
if (run.status() == ReplayRunStatus.COMPLETED
|| run.status() == ReplayRunStatus.CANCELLED
|| run.status() == ReplayRunStatus.FAILED) {
return run;
}
ReplayRun cancelled = run.withStatus(ReplayRunStatus.CANCEL_REQUESTED);
repository.update(cancelled);
log.info("event=trader.replay.cancel_requested runId={} status={}", runId, cancelled.status());
return cancelled;
}
private void execute(ReplayRun run, TraderPlaybookDefinitionSnapshot playbook) {
try {
repository.update(run.withStatus(ReplayRunStatus.RUNNING));
log.info(
"event=trader.replay.start runId={} symbol={} playbookId={} playbookVersion={} status={}",
run.runId(),
run.config().symbol(),
playbook.playbookId(),
playbook.playbookVersion(),
ReplayRunStatus.RUNNING
);
List<ReplayClockTick> ticks = eventReader.readTicks(run.config());
List<TraderCycleResult> results = new ArrayList<>(ticks.size());
TraderRuntimeState runtimeState = new TraderRuntimeState(
run.runId(),
TraderRunMode.REPLAY,
playbook.playbookId(),
playbook.playbookVersion()
);
for (ReplayClockTick tick : ticks) {
ReplayRun current = currentRun(run.runId());
if (current.status() == ReplayRunStatus.CANCEL_REQUESTED) {
repository.update(current.withStatus(ReplayRunStatus.CANCELLED));
log.info("event=trader.replay.cancelled runId={} status={}", run.runId(), ReplayRunStatus.CANCELLED);
return;
}
results.add(cycleRunner.runReplayTick(tick, runtimeState));
}
reportWriter.writeReport(run.config(), playbook, results);
ReplayRun current = currentRun(run.runId());
repository.update(current.withStatus(ReplayRunStatus.COMPLETED));
log.info(
"event=trader.replay.completed runId={} symbol={} playbookId={} playbookVersion={} status={} tickCount={} resultCount={}",
run.runId(),
run.config().symbol(),
playbook.playbookId(),
playbook.playbookVersion(),
ReplayRunStatus.COMPLETED,
ticks.size(),
results.size()
);
} catch (RuntimeException ex) {
log.warn(
"event=trader.replay.failed_detected runId={} symbol={} playbookId={} playbookVersion={} status={} reason={}",
run.runId(),
run.config().symbol(),
playbook.playbookId(),
playbook.playbookVersion(),
ReplayRunStatus.FAILED,
ex.getMessage(),
ex
);
try {
repository.update(run.failed(ex.toString()));
} catch (RuntimeException updateFailure) {
log.error(
"event=trader.replay.failed_status_update_failed runId={} originalReason={} updateReason={}",
run.runId(),
ex.getMessage(),
updateFailure.getMessage(),
updateFailure
);
}
}
}
private void validateRequest(ReplayRunConfig request) {
if (request.symbol() == null || request.symbol().isBlank()
|| request.playbookId() == null || request.playbookId().isBlank()
|| request.playbookVersion() == null || request.playbookVersion().isBlank()
|| request.from() == null
|| request.to() == null
|| request.dataSources() == null
|| request.dataSources().isEmpty()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "replay request is missing required lineage fields");
}
}
private ReplayRun currentRun(String runId) {
return repository.findByRunId(runId)
.orElseThrow(() -> new IllegalStateException("replay run disappeared: " + runId));
}
private void validateDataSource(ReplayRunConfig request, String sourceType, DataSourceSpec spec) {
if (spec.timezone() == null || spec.timezone().isBlank()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "data source timezone is required: " + sourceType);
}
if (spec.missingSummary() == null) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "data source missingSummary is required: " + sourceType);
}
new TraderDataSourceManifest(
spec.sourceId(),
request.symbol(),
sourceType,
"BINANCE",
sourceType.equals("candles") ? "1m" : "event",
spec.path(),
spec.hashSha256(),
spec.schemaHashSha256(),
request.from(),
request.to(),
spec.minEventTime(),
spec.maxEventTime(),
spec.timezone(),
spec.rowCount(),
spec.missingSummary(),
"P0_ACCEPTED"
);
}
}