Add crypto lake replay labels

This commit is contained in:
Codex
2026-06-23 22:21:56 +08:00
parent 7ff786f658
commit 2fe4077164
28 changed files with 977 additions and 64 deletions
@@ -0,0 +1,498 @@
package com.quantai.trader.replay;
import com.quantai.trader.domain.TraderException;
import com.quantai.trader.enums.TraderErrorCode;
import com.quantai.trader.enums.TraderSide;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
@Component
public class CryptoLakeReplayCsvMarketEventReader implements ReplayMarketEventReader {
private static final Logger log = LoggerFactory.getLogger(CryptoLakeReplayCsvMarketEventReader.class);
private static final MathContext MC = new MathContext(16, RoundingMode.HALF_UP);
private static final String REPLAY_SOURCE_KEY = "cryptoLakeReplay1m";
private static final String CANDIDATE_SOURCE_KEY = "candidateEvents";
private static final BigDecimal LONG_INVALID_BPS = new BigDecimal("12.0");
private static final BigDecimal LONG_STOP_BPS = new BigDecimal("8.0");
private static final BigDecimal LONG_TARGET_BPS = new BigDecimal("30.0");
private static final BigDecimal SHORT_INVALID_BPS = new BigDecimal("12.0");
private static final BigDecimal SHORT_STOP_BPS = new BigDecimal("8.0");
private static final BigDecimal SHORT_TARGET_BPS = new BigDecimal("30.0");
@Override
public boolean supports(ReplayRunConfig config) {
DataSourceSpec source = config.dataSources() == null ? null : config.dataSources().get(REPLAY_SOURCE_KEY);
return source != null && source.path() != null && source.path().endsWith(".csv");
}
@Override
public void validateReadable(ReplayRunConfig config) {
validateSource(selectReplaySource(config), REPLAY_SOURCE_KEY);
DataSourceSpec candidateSource = config.dataSources().get(CANDIDATE_SOURCE_KEY);
if (candidateSource != null) {
validateSource(candidateSource, CANDIDATE_SOURCE_KEY);
}
}
@Override
public List<ReplayClockTick> readTicks(ReplayRunConfig config) {
validateReadable(config);
NavigableMap<Instant, MarketBar> bars = readReplayBars(config);
List<ReplayClockTick> ticks = config.dataSources().containsKey(CANDIDATE_SOURCE_KEY)
? readCandidateTicks(config, bars)
: readMarketAuditTicks(config, bars);
if (ticks.isEmpty()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "crypto lake replay csv produced no ticks");
}
log.info(
"event=trader.replay.crypto_lake_csv.loaded runId={} symbol={} tickCount={} candidateMode={}",
config.runId(),
config.symbol(),
ticks.size(),
config.dataSources().containsKey(CANDIDATE_SOURCE_KEY)
);
return ticks;
}
private NavigableMap<Instant, MarketBar> readReplayBars(ReplayRunConfig config) {
Path path = Path.of(selectReplaySource(config).path());
NavigableMap<Instant, MarketBar> bars = new TreeMap<>();
try (CSVParser parser = CSVParser.parse(path, java.nio.charset.StandardCharsets.UTF_8,
CSVFormat.DEFAULT.builder().setHeader().setSkipHeaderRecord(true).build())) {
for (CSVRecord record : parser) {
if (!config.symbol().equals(required(record, "symbol"))) {
continue;
}
if (!"1m".equals(required(record, "timeframe"))) {
continue;
}
MarketBar bar = marketBar(record);
bars.put(bar.openTime(), bar);
}
} catch (IOException ex) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "failed to read crypto lake replay csv: " + ex.getMessage());
}
if (bars.isEmpty()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "crypto lake replay csv has no rows for symbol: " + config.symbol());
}
return bars;
}
private List<ReplayClockTick> readMarketAuditTicks(ReplayRunConfig config, NavigableMap<Instant, MarketBar> bars) {
List<ReplayClockTick> ticks = new ArrayList<>();
List<MarketBar> ordered = List.copyOf(bars.values());
for (int i = 0; i < ordered.size(); i++) {
MarketBar bar = ordered.get(i);
if (outsideRunWindow(config, bar.openTime())) {
continue;
}
ticks.add(toTick(config, bar, null, labelInputs(ordered, i, null)));
}
return ticks.stream()
.sorted(Comparator.comparing(ReplayClockTick::eventTime))
.toList();
}
private List<ReplayClockTick> readCandidateTicks(ReplayRunConfig config, NavigableMap<Instant, MarketBar> bars) {
Path path = Path.of(config.dataSources().get(CANDIDATE_SOURCE_KEY).path());
List<MarketBar> ordered = List.copyOf(bars.values());
Map<Instant, Integer> indexByTime = new LinkedHashMap<>();
for (int i = 0; i < ordered.size(); i++) {
indexByTime.put(ordered.get(i).openTime(), i);
}
List<ReplayClockTick> ticks = new ArrayList<>();
try (CSVParser parser = CSVParser.parse(path, java.nio.charset.StandardCharsets.UTF_8,
CSVFormat.DEFAULT.builder().setHeader().setSkipHeaderRecord(true).build())) {
for (CSVRecord record : parser) {
if (!config.symbol().equals(required(record, "symbol"))) {
continue;
}
Instant candidateTime = Instant.ofEpochMilli(requiredLong(record, "bar_time"));
if (outsideRunWindow(config, candidateTime)) {
continue;
}
Map.Entry<Instant, MarketBar> entry = bars.ceilingEntry(candidateTime);
if (entry == null || outsideRunWindow(config, entry.getKey())) {
continue;
}
int barIndex = indexByTime.get(entry.getKey());
CandidateEvent event = candidateEvent(record, candidateTime);
ticks.add(toTick(config, entry.getValue(), event, labelInputs(ordered, barIndex, event.side())));
}
} catch (IOException ex) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "failed to read candidate events csv: " + ex.getMessage());
}
return ticks.stream()
.sorted(Comparator.comparing(ReplayClockTick::eventTime))
.toList();
}
private ReplayClockTick toTick(
ReplayRunConfig config,
MarketBar bar,
CandidateEvent candidate,
Map<String, Object> labelInputs
) {
List<String> missing = missingFeatures(bar);
Map<String, Object> context = new LinkedHashMap<>();
context.put("contextPass", missing.isEmpty());
context.put("replaySourceType", "CRYPTO_LAKE_1M_CSV");
putDecimal(context, "sourceCoverage", bar.sourceCoverage());
putDecimal(context, "fundingBps", bar.fundingBps());
putDecimal(context, "openInterest", bar.openInterest());
putDecimal(context, "volume", bar.volume());
Map<String, Object> setup = new LinkedHashMap<>();
setup.put("setupPass", candidate != null);
setup.put("setupName", candidate == null ? "market_audit_only" : "candidate_event_replay");
if (candidate != null) {
if (bar.close() == null) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "candidate event matched a replay bar without close price");
}
setup.put("candidateEventId", candidate.eventId());
setup.put("signalType", candidate.signalType());
setup.put("side", candidate.side().name());
setup.put("sourceService", candidate.sourceService());
putDecimal(setup, "entryPrice", bar.close());
putDecimal(setup, "invalidPrice", priceByBps(bar.close(), invalidBps(candidate.side()), adverseSign(candidate.side())));
putDecimal(setup, "stopPrice", priceByBps(bar.close(), stopBps(candidate.side()), adverseSign(candidate.side())));
putDecimal(setup, "targetPrice", priceByBps(bar.close(), targetBps(candidate.side()), favorableSign(candidate.side())));
putDecimal(setup, "executionQualityScore", executionQualityScore(bar));
}
Map<String, Object> trigger = new LinkedHashMap<>();
if (candidate != null && candidate.triggerScore() != null) {
putDecimal(trigger, "triggerScore", candidate.triggerScore());
}
trigger.put("replayTriggerSource", candidate == null ? "NONE" : "CANDIDATE_EVENT");
Map<String, Object> execution = new LinkedHashMap<>();
putDecimal(execution, "lastPrice", bar.close());
putDecimal(execution, "bestBidPrice", bar.bestBidPrice());
putDecimal(execution, "bestAskPrice", bar.bestAskPrice());
putDecimal(execution, "observedSpreadBps", bar.observedSpreadBps());
putDecimal(execution, "expectedSlippageBps", bar.expectedSlippageBps());
putDecimal(execution, "p95LatencyMs", bar.p95LatencyMs());
Map<String, Object> dataQuality = new LinkedHashMap<>();
dataQuality.put("missing_features", missing);
putDecimal(dataQuality, "sourceCoverage", bar.sourceCoverage());
dataQuality.put("replaySourcePath", selectReplaySource(config).path());
return new ReplayClockTick(
config.runId(),
config.symbol(),
bar.openTime(),
context,
setup,
trigger,
execution,
dataQuality,
labelInputs
);
}
private Map<String, Object> labelInputs(List<MarketBar> bars, int index, TraderSide side) {
Map<String, Object> labels = new LinkedHashMap<>();
labels.put("labelSource", "CRYPTO_LAKE_1M_REPLAY");
if (side == null) {
labels.put("labelStatus", "MARKET_AUDIT_NO_SIDE");
return labels;
}
MarketBar entry = bars.get(index);
labels.put("side", side.name());
putDecimal(labels, "entryPrice", entry.close());
putIfPresent(labels, "markoutBps1m", markout(bars, index, side, 1));
putIfPresent(labels, "markoutBps5m", markout(bars, index, side, 5));
putIfPresent(labels, "markoutBps15m", markout(bars, index, side, 15));
putIfPresent(labels, "mfeBps15m", mfe(bars, index, side, 15));
putIfPresent(labels, "maeBps15m", mae(bars, index, side, 15));
putIfPresent(labels, "targetBeforeStop15m", targetBeforeStop(bars, index, side, targetBps(side), stopBps(side), 15));
putDecimal(labels, "expectedSlippageBps", entry.expectedSlippageBps());
labels.put("labelStatus", hasMandatoryLabels(labels) ? "REPLAY_MARKOUT_LABELED" : "FUTURE_WINDOW_INCOMPLETE");
return labels;
}
private boolean hasMandatoryLabels(Map<String, Object> labels) {
return labels.containsKey("markoutBps1m")
&& labels.containsKey("markoutBps5m")
&& labels.containsKey("markoutBps15m");
}
private String markout(List<MarketBar> bars, int index, TraderSide side, int minutes) {
if (index + minutes >= bars.size()) {
return null;
}
BigDecimal entry = bars.get(index).close();
BigDecimal close = bars.get(index + minutes).close();
return decimalText(sideReturnBps(side, entry, close));
}
private String mfe(List<MarketBar> bars, int index, TraderSide side, int minutes) {
if (index + minutes >= bars.size()) {
return null;
}
BigDecimal entry = bars.get(index).close();
BigDecimal best = BigDecimal.ZERO;
for (int i = index + 1; i <= index + minutes; i++) {
BigDecimal favorable = side == TraderSide.LONG ? bars.get(i).high() : bars.get(i).low();
best = best.max(sideReturnBps(side, entry, favorable));
}
return decimalText(best.max(BigDecimal.ZERO));
}
private String mae(List<MarketBar> bars, int index, TraderSide side, int minutes) {
if (index + minutes >= bars.size()) {
return null;
}
BigDecimal entry = bars.get(index).close();
BigDecimal worst = BigDecimal.ZERO;
for (int i = index + 1; i <= index + minutes; i++) {
BigDecimal adverse = side == TraderSide.LONG ? bars.get(i).low() : bars.get(i).high();
BigDecimal signed = sideReturnBps(side, entry, adverse);
if (signed.compareTo(BigDecimal.ZERO) < 0) {
worst = worst.max(signed.abs());
}
}
return decimalText(worst);
}
private Boolean targetBeforeStop(List<MarketBar> bars, int index, TraderSide side, BigDecimal targetBps, BigDecimal stopBps, int minutes) {
if (index + minutes >= bars.size()) {
return null;
}
BigDecimal entry = bars.get(index).close();
BigDecimal target = priceByBps(entry, targetBps, favorableSign(side));
BigDecimal stop = priceByBps(entry, stopBps, adverseSign(side));
for (int i = index + 1; i <= index + minutes; i++) {
MarketBar bar = bars.get(i);
boolean targetHit = side == TraderSide.LONG
? bar.high().compareTo(target) >= 0
: bar.low().compareTo(target) <= 0;
boolean stopHit = side == TraderSide.LONG
? bar.low().compareTo(stop) <= 0
: bar.high().compareTo(stop) >= 0;
if (targetHit) {
return true;
}
if (stopHit) {
return false;
}
}
return false;
}
private MarketBar marketBar(CSVRecord record) {
return new MarketBar(
Instant.parse(required(record, "open_time")),
decimal(record, "open"),
decimal(record, "high"),
decimal(record, "low"),
decimal(record, "close"),
decimal(record, "volume"),
decimal(record, "taker_buy_volume"),
decimal(record, "funding_bps"),
decimal(record, "open_interest"),
decimal(record, "best_bid_price"),
decimal(record, "best_ask_price"),
decimal(record, "observed_spread_bps"),
decimal(record, "expected_slippage_bps"),
decimal(record, "p95_latency_ms"),
decimal(record, "source_coverage")
);
}
private CandidateEvent candidateEvent(CSVRecord record, Instant candidateTime) {
String side = required(record, "direction").toUpperCase();
return new CandidateEvent(
required(record, "event_id"),
candidateTime,
required(record, "signal_type"),
TraderSide.valueOf(side),
required(record, "source_service"),
firstDecimal(record, "old_fusion_score", "legacy_fusion_score")
);
}
private List<String> missingFeatures(MarketBar bar) {
List<String> missing = new ArrayList<>();
requirePresent(missing, "open", bar.open());
requirePresent(missing, "high", bar.high());
requirePresent(missing, "low", bar.low());
requirePresent(missing, "close", bar.close());
requirePresent(missing, "taker_buy_volume", bar.takerBuyVolume());
requirePresent(missing, "expected_slippage_bps", bar.expectedSlippageBps());
requirePresent(missing, "source_coverage", bar.sourceCoverage());
return missing;
}
private void requirePresent(List<String> missing, String field, BigDecimal value) {
if (value == null) {
missing.add(field);
}
}
private DataSourceSpec selectReplaySource(ReplayRunConfig config) {
DataSourceSpec source = config.dataSources().get(REPLAY_SOURCE_KEY);
if (source == null) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "dataSources.cryptoLakeReplay1m is required");
}
return source;
}
private void validateSource(DataSourceSpec source, String sourceType) {
if (source.path() == null || source.path().isBlank()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "data source path is required: " + sourceType);
}
Path path = Path.of(source.path());
if (!Files.isRegularFile(path) || !Files.isReadable(path)) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "data source is not readable: " + source.path());
}
}
private boolean outsideRunWindow(ReplayRunConfig config, Instant time) {
return time.isBefore(config.from()) || !time.isBefore(config.to());
}
private String required(CSVRecord record, String column) {
String value = record.get(column);
if (value == null || value.isBlank()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "csv column is required: " + column);
}
return value;
}
private long requiredLong(CSVRecord record, String column) {
return Long.parseLong(required(record, column));
}
private BigDecimal firstDecimal(CSVRecord record, String... columns) {
for (String column : columns) {
if (!record.isMapped(column)) {
continue;
}
BigDecimal value = decimal(record, column);
if (value != null) {
return value;
}
}
return null;
}
private BigDecimal decimal(CSVRecord record, String column) {
if (!record.isMapped(column)) {
return null;
}
String value = record.get(column);
if (value == null || value.isBlank()) {
return null;
}
return new BigDecimal(value);
}
private String decimalText(BigDecimal value) {
return value == null ? null : value.stripTrailingZeros().toPlainString();
}
private void putDecimal(Map<String, Object> target, String key, BigDecimal value) {
String text = decimalText(value);
if (text != null) {
target.put(key, text);
}
}
private void putIfPresent(Map<String, Object> target, String key, Object value) {
if (value != null) {
target.put(key, value);
}
}
private BigDecimal executionQualityScore(MarketBar bar) {
if (bar.expectedSlippageBps() == null) {
return null;
}
BigDecimal score = BigDecimal.ONE.subtract(bar.expectedSlippageBps().divide(new BigDecimal("20.0"), MC), MC);
return score.max(new BigDecimal("0.20")).min(BigDecimal.ONE).setScale(8, RoundingMode.HALF_UP);
}
private BigDecimal sideReturnBps(TraderSide side, BigDecimal entry, BigDecimal exit) {
BigDecimal gross = exit.subtract(entry, MC)
.divide(entry, MC)
.multiply(new BigDecimal("10000"), MC);
return side == TraderSide.LONG ? gross : gross.negate();
}
private BigDecimal priceByBps(BigDecimal entry, BigDecimal bps, int sign) {
BigDecimal multiplier = BigDecimal.ONE.add(BigDecimal.valueOf(sign).multiply(bps, MC).divide(new BigDecimal("10000"), MC), MC);
return entry.multiply(multiplier, MC).setScale(8, RoundingMode.HALF_UP);
}
private int favorableSign(TraderSide side) {
return side == TraderSide.LONG ? 1 : -1;
}
private int adverseSign(TraderSide side) {
return side == TraderSide.LONG ? -1 : 1;
}
private BigDecimal invalidBps(TraderSide side) {
return side == TraderSide.LONG ? LONG_INVALID_BPS : SHORT_INVALID_BPS;
}
private BigDecimal stopBps(TraderSide side) {
return side == TraderSide.LONG ? LONG_STOP_BPS : SHORT_STOP_BPS;
}
private BigDecimal targetBps(TraderSide side) {
return side == TraderSide.LONG ? LONG_TARGET_BPS : SHORT_TARGET_BPS;
}
private record MarketBar(
Instant openTime,
BigDecimal open,
BigDecimal high,
BigDecimal low,
BigDecimal close,
BigDecimal volume,
BigDecimal takerBuyVolume,
BigDecimal fundingBps,
BigDecimal openInterest,
BigDecimal bestBidPrice,
BigDecimal bestAskPrice,
BigDecimal observedSpreadBps,
BigDecimal expectedSlippageBps,
BigDecimal p95LatencyMs,
BigDecimal sourceCoverage
) {
}
private record CandidateEvent(
String eventId,
Instant barTime,
String signalType,
TraderSide side,
String sourceService,
BigDecimal triggerScore
) {
}
}
@@ -22,6 +22,12 @@ public class JsonlReplayMarketEventReader implements ReplayMarketEventReader {
this.objectMapper = new ObjectMapper().findAndRegisterModules();
}
@Override
public boolean supports(ReplayRunConfig config) {
DataSourceSpec source = config.dataSources() == null ? null : config.dataSources().get("ticks");
return source != null && source.path() != null && source.path().endsWith(".jsonl");
}
@Override
public void validateReadable(ReplayRunConfig config) {
DataSourceSpec source = selectReplaySource(config);
@@ -76,7 +82,8 @@ public class JsonlReplayMarketEventReader implements ReplayMarketEventReader {
fixture.setupFeatures(),
fixture.triggerFeatures(),
fixture.executionFeatures(),
fixture.dataQuality()
fixture.dataQuality(),
fixture.labelInputs() == null ? Map.of() : fixture.labelInputs()
);
} catch (IOException ex) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "invalid replay tick json: " + ex.getMessage());
@@ -100,7 +107,8 @@ public class JsonlReplayMarketEventReader implements ReplayMarketEventReader {
Map<String, Object> setupFeatures,
Map<String, Object> triggerFeatures,
Map<String, Object> executionFeatures,
Map<String, Object> dataQuality
Map<String, Object> dataQuality,
Map<String, Object> labelInputs
) {
}
}
@@ -1,6 +1,7 @@
package com.quantai.trader.replay;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
public record ReplayClockTick(
@@ -11,6 +12,23 @@ public record ReplayClockTick(
Map<String, Object> setupFeatures,
Map<String, Object> triggerFeatures,
Map<String, Object> executionFeatures,
Map<String, Object> dataQuality
Map<String, Object> dataQuality,
Map<String, Object> labelInputs
) {
public ReplayClockTick {
contextFeatures = immutable(contextFeatures);
setupFeatures = immutable(setupFeatures);
triggerFeatures = immutable(triggerFeatures);
executionFeatures = immutable(executionFeatures);
dataQuality = immutable(dataQuality);
labelInputs = immutable(labelInputs);
}
private static Map<String, Object> immutable(Map<String, Object> value) {
if (value == null || value.isEmpty()) {
return Map.of();
}
return Map.copyOf(new LinkedHashMap<>(value));
}
}
@@ -4,6 +4,8 @@ import java.util.List;
public interface ReplayMarketEventReader {
boolean supports(ReplayRunConfig config);
void validateReadable(ReplayRunConfig config);
List<ReplayClockTick> readTicks(ReplayRunConfig config);
@@ -32,7 +32,7 @@ public class ReplayRunService {
private final TraderPlaybookCatalog catalog;
private final ReplayRunRepository repository;
private final ReplayReportWriter reportWriter;
private final ReplayMarketEventReader eventReader;
private final List<ReplayMarketEventReader> eventReaders;
private final TraderDecisionCycleRunner cycleRunner;
private final ExecutorService executorService = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = new Thread(runnable, "trader-replay-worker");
@@ -44,13 +44,13 @@ public class ReplayRunService {
TraderPlaybookCatalog catalog,
ReplayRunRepository repository,
ReplayReportWriter reportWriter,
ReplayMarketEventReader eventReader,
List<ReplayMarketEventReader> eventReaders,
TraderDecisionCycleRunner cycleRunner
) {
this.catalog = catalog;
this.repository = repository;
this.reportWriter = reportWriter;
this.eventReader = eventReader;
this.eventReaders = List.copyOf(eventReaders);
this.cycleRunner = cycleRunner;
}
@@ -58,7 +58,7 @@ public class ReplayRunService {
validateRequest(request);
TraderPlaybookDefinitionSnapshot playbook = catalog.require(request.playbookId(), request.playbookVersion());
request.dataSources().forEach((sourceType, spec) -> validateDataSource(request, sourceType, spec));
eventReader.validateReadable(request);
readerFor(request).validateReadable(request);
String runId = Ids.runId(Instant.now());
ReplayRunConfig config = request.withRunId(runId);
@@ -114,7 +114,7 @@ public class ReplayRunService {
playbook.playbookVersion(),
ReplayRunStatus.RUNNING
);
List<ReplayClockTick> ticks = eventReader.readTicks(run.config());
List<ReplayClockTick> ticks = readerFor(run.config()).readTicks(run.config());
List<TraderCycleResult> results = new ArrayList<>(ticks.size());
TraderRuntimeState runtimeState = new TraderRuntimeState(
run.runId(),
@@ -186,6 +186,16 @@ public class ReplayRunService {
.orElseThrow(() -> new IllegalStateException("replay run disappeared: " + runId));
}
private ReplayMarketEventReader readerFor(ReplayRunConfig config) {
return eventReaders.stream()
.filter(reader -> reader.supports(config))
.findFirst()
.orElseThrow(() -> new TraderException(
TraderErrorCode.TRADER_DATA_SOURCE_MISSING,
"no replay reader supports the requested dataSources"
));
}
private void validateDataSource(ReplayRunConfig request, String sourceType, DataSourceSpec spec) {
if (spec.timezone() == null || spec.timezone().isBlank()) {
throw new TraderException(TraderErrorCode.TRADER_DATA_SOURCE_MISSING, "data source timezone is required: " + sourceType);