diff --git a/src/main/java/com/quantai/trader/config/JacksonConfig.java b/src/main/java/com/quantai/trader/config/JacksonConfig.java new file mode 100644 index 0000000..36287e4 --- /dev/null +++ b/src/main/java/com/quantai/trader/config/JacksonConfig.java @@ -0,0 +1,14 @@ +package com.quantai.trader.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class JacksonConfig { + @Bean + ObjectMapper traderObjectMapper() { + return JsonMapper.builder().findAndAddModules().build(); + } +} diff --git a/src/main/java/com/quantai/trader/enums/TraderErrorCode.java b/src/main/java/com/quantai/trader/enums/TraderErrorCode.java index 1028013..c02cf46 100644 --- a/src/main/java/com/quantai/trader/enums/TraderErrorCode.java +++ b/src/main/java/com/quantai/trader/enums/TraderErrorCode.java @@ -11,5 +11,6 @@ public enum TraderErrorCode { TRADER_FEEDBACK_INVALID, TRADER_P0_MODE_BLOCKED, TRADER_KILL_SWITCH_ACTIVE, - TRADER_ACTIVE_POINTER_MISMATCH + TRADER_ACTIVE_POINTER_MISMATCH, + TRADER_PERSISTENCE_FAILED } diff --git a/src/main/java/com/quantai/trader/evidence/EvidenceAppender.java b/src/main/java/com/quantai/trader/evidence/EvidenceAppender.java index 398c21c..8798eda 100644 --- a/src/main/java/com/quantai/trader/evidence/EvidenceAppender.java +++ b/src/main/java/com/quantai/trader/evidence/EvidenceAppender.java @@ -6,26 +6,25 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; @Component public class EvidenceAppender { private static final Logger log = LoggerFactory.getLogger(EvidenceAppender.class); - private final CopyOnWriteArrayList evidence = new CopyOnWriteArrayList<>(); + private final AtomicLong sequence = new AtomicLong(); + private final TraderEvidenceRepository repository; + + public EvidenceAppender(TraderEvidenceRepository repository) { + this.repository = repository; + } public TraderEvidence append(String runId, String cycleId, String stage, boolean pass, String reason, String blocker, Map details) { - TraderEvidence item = new TraderEvidence("evidence_" + cycleId + "_" + evidence.size(), runId, cycleId, + TraderEvidence item = new TraderEvidence("evidence_" + cycleId + "_" + sequence.getAndIncrement(), runId, cycleId, stage, pass, reason, blocker, Instant.now(), details); - evidence.add(item); + repository.insert(item); log.info("event=trader.evidence.appended runId={} cycleId={} stage={} pass={} reason={} blocker={}", runId, cycleId, stage, pass, reason, blocker); return item; } - - public List all() { - return new ArrayList<>(evidence); - } } diff --git a/src/main/java/com/quantai/trader/evidence/JdbcTraderEvidenceRepository.java b/src/main/java/com/quantai/trader/evidence/JdbcTraderEvidenceRepository.java new file mode 100644 index 0000000..2bda068 --- /dev/null +++ b/src/main/java/com/quantai/trader/evidence/JdbcTraderEvidenceRepository.java @@ -0,0 +1,32 @@ +package com.quantai.trader.evidence; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.quantai.trader.domain.TraderEvidence; +import com.quantai.trader.persistence.TraderJsonCodec; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; + +@Repository +public class JdbcTraderEvidenceRepository implements TraderEvidenceRepository { + private final JdbcTemplate jdbcTemplate; + private final TraderJsonCodec jsonCodec; + + public JdbcTraderEvidenceRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) { + this.jdbcTemplate = jdbcTemplate; + this.jsonCodec = new TraderJsonCodec(objectMapper); + } + + @Override + public void insert(TraderEvidence evidence) { + jdbcTemplate.update(""" + insert into trader_evidence + (run_id, cycle_id, evidence_id, stage, pass, reason, blocker, evidence_time, details_json) + values (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + evidence.runId(), evidence.cycleId(), evidence.evidenceId(), evidence.stage(), evidence.pass(), + evidence.reason(), evidence.blocker(), Timestamp.from(evidence.evidenceTime()), + jsonCodec.toJson(evidence.detailsJson())); + } +} diff --git a/src/main/java/com/quantai/trader/evidence/TraderEvidenceRepository.java b/src/main/java/com/quantai/trader/evidence/TraderEvidenceRepository.java new file mode 100644 index 0000000..543b1d4 --- /dev/null +++ b/src/main/java/com/quantai/trader/evidence/TraderEvidenceRepository.java @@ -0,0 +1,7 @@ +package com.quantai.trader.evidence; + +import com.quantai.trader.domain.TraderEvidence; + +public interface TraderEvidenceRepository { + void insert(TraderEvidence evidence); +} diff --git a/src/main/java/com/quantai/trader/outbox/InMemoryOutboxRepository.java b/src/main/java/com/quantai/trader/outbox/InMemoryOutboxRepository.java deleted file mode 100644 index 32ea3b7..0000000 --- a/src/main/java/com/quantai/trader/outbox/InMemoryOutboxRepository.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.quantai.trader.outbox; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Repository; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -@Repository -public class InMemoryOutboxRepository { - private static final Logger log = LoggerFactory.getLogger(InMemoryOutboxRepository.class); - private final CopyOnWriteArrayList events = new CopyOnWriteArrayList<>(); - - public void insert(TraderOutboxEvent event) { - boolean duplicate = events.stream().anyMatch(existing -> existing.destination().equals(event.destination()) - && existing.idempotencyKey().equals(event.idempotencyKey())); - if (duplicate) { - throw new IllegalArgumentException("duplicate outbox idempotency key: " + event.idempotencyKey()); - } - events.add(event); - log.info("event=trader.outbox.inserted runId={} cycleId={} destination={} aggregateType={} aggregateId={} status={}", - event.runId(), event.cycleId(), event.destination(), event.aggregateType(), event.aggregateId(), event.status()); - } - - public List all() { - return new ArrayList<>(events); - } -} diff --git a/src/main/java/com/quantai/trader/outbox/JdbcTraderOutboxRepository.java b/src/main/java/com/quantai/trader/outbox/JdbcTraderOutboxRepository.java new file mode 100644 index 0000000..3fa7f1e --- /dev/null +++ b/src/main/java/com/quantai/trader/outbox/JdbcTraderOutboxRepository.java @@ -0,0 +1,38 @@ +package com.quantai.trader.outbox; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.quantai.trader.persistence.TraderJsonCodec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; + +@Repository +public class JdbcTraderOutboxRepository implements TraderOutboxRepository { + private static final Logger log = LoggerFactory.getLogger(JdbcTraderOutboxRepository.class); + + private final JdbcTemplate jdbcTemplate; + private final TraderJsonCodec jsonCodec; + + public JdbcTraderOutboxRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) { + this.jdbcTemplate = jdbcTemplate; + this.jsonCodec = new TraderJsonCodec(objectMapper); + } + + @Override + public void insert(TraderOutboxEvent event) { + jdbcTemplate.update(""" + insert into trader_outbox + (outbox_id, run_id, cycle_id, aggregate_type, aggregate_id, event_type, destination, + payload_json, idempotency_key, status, created_at) + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + event.outboxId(), event.runId(), event.cycleId(), event.aggregateType(), event.aggregateId(), + event.eventType(), event.destination(), jsonCodec.toJson(event.payloadJson()), + event.idempotencyKey(), event.status(), Timestamp.from(event.createdAt())); + log.info("event=trader.outbox.inserted runId={} cycleId={} destination={} aggregateType={} aggregateId={} status={}", + event.runId(), event.cycleId(), event.destination(), event.aggregateType(), event.aggregateId(), event.status()); + } +} diff --git a/src/main/java/com/quantai/trader/outbox/TraderOutboxRepository.java b/src/main/java/com/quantai/trader/outbox/TraderOutboxRepository.java new file mode 100644 index 0000000..c31f150 --- /dev/null +++ b/src/main/java/com/quantai/trader/outbox/TraderOutboxRepository.java @@ -0,0 +1,5 @@ +package com.quantai.trader.outbox; + +public interface TraderOutboxRepository { + void insert(TraderOutboxEvent event); +} diff --git a/src/main/java/com/quantai/trader/persistence/JdbcTraderDecisionTraceWriter.java b/src/main/java/com/quantai/trader/persistence/JdbcTraderDecisionTraceWriter.java new file mode 100644 index 0000000..2352d5c --- /dev/null +++ b/src/main/java/com/quantai/trader/persistence/JdbcTraderDecisionTraceWriter.java @@ -0,0 +1,141 @@ +package com.quantai.trader.persistence; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.quantai.trader.config.TraderProperties; +import com.quantai.trader.domain.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; +import java.util.Map; + +@Repository +public class JdbcTraderDecisionTraceWriter implements TraderDecisionTraceWriter { + private static final Logger log = LoggerFactory.getLogger(JdbcTraderDecisionTraceWriter.class); + + private final JdbcTemplate jdbcTemplate; + private final TraderJsonCodec jsonCodec; + private final TraderProperties properties; + + public JdbcTraderDecisionTraceWriter(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, TraderProperties properties) { + this.jdbcTemplate = jdbcTemplate; + this.jsonCodec = new TraderJsonCodec(objectMapper); + this.properties = properties; + } + + @Override + public void persistCycleTrace(TraderDecisionCycle cycle, + TraderMarketSnapshot snapshot, + TraderModelOutput modelOutput, + TraderPositionState positionState, + TraderPositionManagerDecision pmDecision, + TraderRiskDecision riskDecision, + TraderAction action) { + upsertRun(cycle); + insertCycle(cycle, positionState, riskDecision); + insertModelOutput(modelOutput, snapshot); + insertPmDecision(pmDecision); + insertRiskDecision(riskDecision); + insertAction(action); + log.info("event=trader.trace.persisted runId={} cycleId={} action={} riskAllowed={}", + cycle.runId(), cycle.cycleId(), action.actionType(), riskDecision.allowAction()); + } + + void upsertRun(TraderDecisionCycle cycle) { + jdbcTemplate.update(""" + insert into trader_run + (run_id, run_mode, symbol, model_bundle_version, calibration_bundle_version, + pm_config_version, execution_mode, status, config_json, started_at) + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + on duplicate key update + model_bundle_version = values(model_bundle_version), + calibration_bundle_version = values(calibration_bundle_version), + pm_config_version = values(pm_config_version), + execution_mode = values(execution_mode), + status = values(status), + updated_at = current_timestamp(3) + """, + cycle.runId(), cycle.runMode().name(), cycle.symbol(), cycle.modelBundleVersion(), + cycle.calibrationBundleVersion(), cycle.pmConfigVersion(), properties.execution().mode().name(), + "RUNNING", jsonCodec.toJson(Map.of( + "serviceName", properties.serviceName(), + "artifactRoot", properties.artifact().artifactRoot(), + "redisKeyPrefix", properties.runtime().redisKeyPrefix())), + Timestamp.from(cycle.cycleTime())); + } + + void insertCycle(TraderDecisionCycle cycle, TraderPositionState positionState, TraderRiskDecision riskDecision) { + jdbcTemplate.update(""" + insert into trader_decision_cycle + (run_id, cycle_id, symbol, cycle_time, state, decision_status, blocker) + values (?, ?, ?, ?, ?, ?, ?) + """, + cycle.runId(), cycle.cycleId(), cycle.symbol(), Timestamp.from(cycle.cycleTime()), + positionState.isFlat() ? "FLAT" : "POSITION", + riskDecision.allowAction() ? "ACTION_ALLOWED" : "ACTION_BLOCKED", + riskDecision.blocker()); + } + + void insertModelOutput(TraderModelOutput modelOutput, TraderMarketSnapshot snapshot) { + jdbcTemplate.update(""" + insert into trader_model_output + (run_id, cycle_id, model_output_id, model_bundle_version, calibration_bundle_version, + direction_json, entry_json, continue_json, exit_json, risk_json, + uncertainty, ood_score, usable, blocker) + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + modelOutput.runId(), modelOutput.cycleId(), modelOutput.modelOutputId(), + modelOutput.modelBundleVersion(), modelOutput.calibrationBundleVersion(), + jsonCodec.toJson(modelOutput.direction()), jsonCodec.toJson(modelOutput.entry()), + jsonCodec.toJson(modelOutput.continuation()), jsonCodec.toJson(modelOutput.exit()), + jsonCodec.toJson(modelOutput.risk()), modelOutput.uncertainty(), modelOutput.oodScore(), + snapshot.dataReady(), snapshot.dataReady() ? null : "DATA_NOT_READY"); + } + + void insertPmDecision(TraderPositionManagerDecision decision) { + jdbcTemplate.update(""" + insert into trader_position_manager_decision + (run_id, cycle_id, pm_decision_id, model_output_id, position_state_id, account_state_id, + execution_state_id, candidate_action, side, price_plan_id, price_plan_config_hash, + target_position_ratio, add_ratio, reduce_ratio, stop_price, target_price, reason, decision_json) + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + decision.runId(), decision.cycleId(), decision.pmDecisionId(), decision.modelOutputId(), + decision.positionStateId(), decision.accountStateId(), decision.executionStateId(), + decision.candidateAction().name(), decision.side().name(), decision.pricePlanId(), + decision.pricePlanConfigHash(), decision.targetPositionRatio(), decision.addRatio(), + decision.reduceRatio(), decision.stopPrice(), decision.targetPrice(), decision.reason(), + jsonCodec.toJson(decision.decisionJson())); + } + + void insertRiskDecision(TraderRiskDecision decision) { + jdbcTemplate.update(""" + insert into trader_risk_decision + (run_id, cycle_id, risk_decision_id, pm_decision_id, original_action, final_action, + allow_action, blocker, decision_json) + values (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + decision.runId(), decision.cycleId(), decision.riskDecisionId(), decision.pmDecisionId(), + decision.originalAction().name(), decision.finalAction().name(), decision.allowAction(), + decision.blocker(), jsonCodec.toJson(decision.decisionJson())); + } + + void insertAction(TraderAction action) { + jdbcTemplate.update(""" + insert into trader_action + (run_id, cycle_id, action_id, model_output_id, pm_decision_id, risk_decision_id, + action_type, symbol, side, price_plan_id, price_plan_config_hash, position_ratio, + quantity, stop_price, target_price, reduce_only, idempotency_key, send_status, + reason, action_context_json) + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + action.runId(), action.cycleId(), action.actionId(), action.modelOutputId(), + action.pmDecisionId(), action.riskDecisionId(), action.actionType().name(), + action.symbol(), action.side().name(), action.pricePlanId(), action.pricePlanConfigHash(), + action.positionRatio(), action.quantity(), action.stopPrice(), action.targetPrice(), + action.reduceOnly(), action.idempotencyKey(), "PENDING", action.reason(), + jsonCodec.toJson(action.actionContextJson())); + } +} diff --git a/src/main/java/com/quantai/trader/persistence/TraderDecisionTraceWriter.java b/src/main/java/com/quantai/trader/persistence/TraderDecisionTraceWriter.java new file mode 100644 index 0000000..5deecbe --- /dev/null +++ b/src/main/java/com/quantai/trader/persistence/TraderDecisionTraceWriter.java @@ -0,0 +1,13 @@ +package com.quantai.trader.persistence; + +import com.quantai.trader.domain.*; + +public interface TraderDecisionTraceWriter { + void persistCycleTrace(TraderDecisionCycle cycle, + TraderMarketSnapshot snapshot, + TraderModelOutput modelOutput, + TraderPositionState positionState, + TraderPositionManagerDecision pmDecision, + TraderRiskDecision riskDecision, + TraderAction action); +} diff --git a/src/main/java/com/quantai/trader/persistence/TraderJsonCodec.java b/src/main/java/com/quantai/trader/persistence/TraderJsonCodec.java new file mode 100644 index 0000000..37c43b8 --- /dev/null +++ b/src/main/java/com/quantai/trader/persistence/TraderJsonCodec.java @@ -0,0 +1,23 @@ +package com.quantai.trader.persistence; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.quantai.trader.domain.TraderException; +import com.quantai.trader.enums.TraderErrorCode; + +public class TraderJsonCodec { + private final ObjectMapper objectMapper; + + public TraderJsonCodec(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + public String toJson(Object value) { + try { + return objectMapper.writeValueAsString(value); + } catch (JsonProcessingException exception) { + throw new TraderException(TraderErrorCode.TRADER_PERSISTENCE_FAILED, + "failed to serialize trader persistence payload"); + } + } +} diff --git a/src/main/java/com/quantai/trader/replay/TraderP0CycleRunner.java b/src/main/java/com/quantai/trader/replay/TraderP0CycleRunner.java index f12f13b..5becad9 100644 --- a/src/main/java/com/quantai/trader/replay/TraderP0CycleRunner.java +++ b/src/main/java/com/quantai/trader/replay/TraderP0CycleRunner.java @@ -7,8 +7,9 @@ import com.quantai.trader.domain.*; import com.quantai.trader.enums.PositionSide; import com.quantai.trader.evidence.EvidenceAppender; import com.quantai.trader.model.TraderModelService; -import com.quantai.trader.outbox.InMemoryOutboxRepository; +import com.quantai.trader.outbox.TraderOutboxRepository; import com.quantai.trader.outbox.TraderOutboxEvent; +import com.quantai.trader.persistence.TraderDecisionTraceWriter; import com.quantai.trader.position.TraderPositionManager; import com.quantai.trader.risk.RiskGateInput; import com.quantai.trader.risk.RiskLimits; @@ -32,7 +33,8 @@ public class TraderP0CycleRunner { private final TraderRiskGate riskGate; private final TraderActionFactory actionFactory; private final EvidenceAppender evidenceAppender; - private final InMemoryOutboxRepository outboxRepository; + private final TraderDecisionTraceWriter traceWriter; + private final TraderOutboxRepository outboxRepository; public TraderP0CycleRunner(TraderProperties properties, TraderArtifactLoader artifactLoader, @@ -41,7 +43,8 @@ public class TraderP0CycleRunner { TraderRiskGate riskGate, TraderActionFactory actionFactory, EvidenceAppender evidenceAppender, - InMemoryOutboxRepository outboxRepository) { + TraderDecisionTraceWriter traceWriter, + TraderOutboxRepository outboxRepository) { this.properties = properties; this.artifactLoader = artifactLoader; this.modelService = modelService; @@ -49,6 +52,7 @@ public class TraderP0CycleRunner { this.riskGate = riskGate; this.actionFactory = actionFactory; this.evidenceAppender = evidenceAppender; + this.traceWriter = traceWriter; this.outboxRepository = outboxRepository; } @@ -69,6 +73,7 @@ public class TraderP0CycleRunner { pmInput.executionState(), snapshot, riskLimits())); evidenceAppender.append(cycle.runId(), cycle.cycleId(), "RISK_DECISION", riskDecision.allowAction(), riskDecision.allowAction() ? "RISK_PASS" : riskDecision.blocker(), riskDecision.blocker(), Map.of()); TraderAction action = actionFactory.create(pmDecision, riskDecision, event.symbol()); + traceWriter.persistCycleTrace(cycle, snapshot, modelOutput, pmInput.positionState(), pmDecision, riskDecision, action); outboxRepository.insert(new TraderOutboxEvent("outbox_" + action.actionId(), action.runId(), action.cycleId(), "TRADER_ACTION", action.actionId(), "ACTION_CREATED", properties.runMode().name() + "_RECORDER", Map.of("actionType", action.actionType().name()), action.idempotencyKey(), "PENDING", Instant.now())); diff --git a/src/test/java/com/quantai/trader/evidence/EvidenceAppenderTest.java b/src/test/java/com/quantai/trader/evidence/EvidenceAppenderTest.java index 8679396..a8143e6 100644 --- a/src/test/java/com/quantai/trader/evidence/EvidenceAppenderTest.java +++ b/src/test/java/com/quantai/trader/evidence/EvidenceAppenderTest.java @@ -3,6 +3,8 @@ package com.quantai.trader.evidence; import com.quantai.trader.domain.TraderEvidence; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @@ -10,7 +12,8 @@ import static org.assertj.core.api.Assertions.assertThat; class EvidenceAppenderTest { @Test void appendsEvidenceWithStageReasonAndDetails() { - EvidenceAppender appender = new EvidenceAppender(); + RecordingEvidenceRepository repository = new RecordingEvidenceRepository(); + EvidenceAppender appender = new EvidenceAppender(repository); TraderEvidence item = appender.append("run-1", "cycle-1", "PM_DECISION", true, "OPEN_LONG_PM_PASS", null, Map.of("action", "OPEN_LONG")); @@ -18,6 +21,19 @@ class EvidenceAppenderTest { assertThat(item.evidenceId()).isEqualTo("evidence_cycle-1_0"); assertThat(item.pass()).isTrue(); assertThat(item.detailsJson()).containsEntry("action", "OPEN_LONG"); - assertThat(appender.all()).containsExactly(item); + assertThat(repository.items()).containsExactly(item); + } + + private static final class RecordingEvidenceRepository implements TraderEvidenceRepository { + private final List items = new ArrayList<>(); + + @Override + public void insert(TraderEvidence evidence) { + items.add(evidence); + } + + List items() { + return items; + } } } diff --git a/src/test/java/com/quantai/trader/outbox/InMemoryOutboxRepositoryTest.java b/src/test/java/com/quantai/trader/outbox/InMemoryOutboxRepositoryTest.java deleted file mode 100644 index 326ad30..0000000 --- a/src/test/java/com/quantai/trader/outbox/InMemoryOutboxRepositoryTest.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.quantai.trader.outbox; - -import org.junit.jupiter.api.Test; - -import java.time.Instant; -import java.util.Map; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -class InMemoryOutboxRepositoryTest { - @Test - void rejectsDuplicateDestinationAndIdempotencyKey() { - InMemoryOutboxRepository repository = new InMemoryOutboxRepository(); - TraderOutboxEvent event = event("outbox-1", "SHADOW_RECORDER", "idem-1"); - - repository.insert(event); - - assertThatThrownBy(() -> repository.insert(event("outbox-2", "SHADOW_RECORDER", "idem-1"))) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("duplicate outbox idempotency key"); - assertThat(repository.all()).containsExactly(event); - } - - @Test - void allowsSameIdempotencyKeyForDifferentDestination() { - InMemoryOutboxRepository repository = new InMemoryOutboxRepository(); - - repository.insert(event("outbox-1", "REPLAY_SIM_RECORDER", "idem-1")); - repository.insert(event("outbox-2", "SHADOW_RECORDER", "idem-1")); - - assertThat(repository.all()).hasSize(2); - } - - private TraderOutboxEvent event(String id, String destination, String idempotencyKey) { - return new TraderOutboxEvent(id, "run-1", "cycle-1", "TRADER_ACTION", "action-1", - "ACTION_CREATED", destination, Map.of("actionType", "OPEN_LONG"), idempotencyKey, - "PENDING", Instant.parse("2026-06-26T00:00:00Z")); - } -} diff --git a/src/test/java/com/quantai/trader/outbox/JdbcTraderOutboxRepositoryTest.java b/src/test/java/com/quantai/trader/outbox/JdbcTraderOutboxRepositoryTest.java new file mode 100644 index 0000000..8e6cb10 --- /dev/null +++ b/src/test/java/com/quantai/trader/outbox/JdbcTraderOutboxRepositoryTest.java @@ -0,0 +1,28 @@ +package com.quantai.trader.outbox; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.time.Instant; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +class JdbcTraderOutboxRepositoryTest { + @Test + void insertsOutboxEventThroughJdbcRepository() { + JdbcTemplate jdbcTemplate = mock(JdbcTemplate.class); + JdbcTraderOutboxRepository repository = new JdbcTraderOutboxRepository(jdbcTemplate, new ObjectMapper()); + + repository.insert(new TraderOutboxEvent("outbox-1", "run-1", "cycle-1", + "TRADER_ACTION", "action-1", "ACTION_CREATED", "SHADOW_RECORDER", + Map.of("actionType", "OPEN_LONG"), "idem-1", "PENDING", + Instant.parse("2026-06-26T00:00:00Z"))); + + verify(jdbcTemplate).update(contains("insert into trader_outbox"), any(Object[].class)); + } +} diff --git a/src/test/java/com/quantai/trader/persistence/JdbcTraderDecisionTraceWriterTest.java b/src/test/java/com/quantai/trader/persistence/JdbcTraderDecisionTraceWriterTest.java new file mode 100644 index 0000000..886d755 --- /dev/null +++ b/src/test/java/com/quantai/trader/persistence/JdbcTraderDecisionTraceWriterTest.java @@ -0,0 +1,34 @@ +package com.quantai.trader.persistence; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.quantai.trader.domain.TraderActionFactory; +import com.quantai.trader.domain.TraderRiskDecision; +import com.quantai.trader.enums.PositionSide; +import com.quantai.trader.enums.TraderActionType; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.util.Map; + +import static com.quantai.trader.TestFixtures.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +class JdbcTraderDecisionTraceWriterTest { + @Test + void persistsRunCycleModelPmRiskAndActionTrace() { + JdbcTemplate jdbcTemplate = mock(JdbcTemplate.class); + JdbcTraderDecisionTraceWriter writer = new JdbcTraderDecisionTraceWriter(jdbcTemplate, new ObjectMapper(), properties()); + TraderRiskDecision riskDecision = new TraderRiskDecision("risk-1", "run-1", "cycle-1", + "pm-cycle-1", true, TraderActionType.OPEN_LONG, TraderActionType.OPEN_LONG, null, Map.of()); + var action = new TraderActionFactory().create(pmDecision(TraderActionType.OPEN_LONG, PositionSide.LONG), riskDecision, "BTC-USDT-PERP"); + + writer.persistCycleTrace(cycle(), snapshot(), modelOutput(), flatPosition(), + pmDecision(TraderActionType.OPEN_LONG, PositionSide.LONG), riskDecision, action); + + verify(jdbcTemplate, times(6)).update(anyString(), any(Object[].class)); + } +} diff --git a/src/test/java/com/quantai/trader/replay/TraderP0CycleRunnerTest.java b/src/test/java/com/quantai/trader/replay/TraderP0CycleRunnerTest.java index de9235a..82fd393 100644 --- a/src/test/java/com/quantai/trader/replay/TraderP0CycleRunnerTest.java +++ b/src/test/java/com/quantai/trader/replay/TraderP0CycleRunnerTest.java @@ -1,15 +1,20 @@ package com.quantai.trader.replay; import com.quantai.trader.artifact.TraderArtifactLoader; -import com.quantai.trader.domain.TraderActionFactory; +import com.quantai.trader.domain.*; import com.quantai.trader.enums.TraderActionType; import com.quantai.trader.evidence.EvidenceAppender; +import com.quantai.trader.evidence.TraderEvidenceRepository; import com.quantai.trader.model.DeterministicTraderModelService; -import com.quantai.trader.outbox.InMemoryOutboxRepository; +import com.quantai.trader.outbox.TraderOutboxEvent; +import com.quantai.trader.outbox.TraderOutboxRepository; +import com.quantai.trader.persistence.TraderDecisionTraceWriter; import com.quantai.trader.position.TraderPositionManager; import com.quantai.trader.risk.TraderRiskGate; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.List; import java.math.BigDecimal; import static com.quantai.trader.TestFixtures.T0; @@ -19,8 +24,10 @@ import static org.assertj.core.api.Assertions.assertThat; class TraderP0CycleRunnerTest { @Test void runsReplayShadowCycleThroughModelPmRiskActionOutboxAndEvidence() { - EvidenceAppender evidenceAppender = new EvidenceAppender(); - InMemoryOutboxRepository outboxRepository = new InMemoryOutboxRepository(); + RecordingEvidenceRepository evidenceRepository = new RecordingEvidenceRepository(); + EvidenceAppender evidenceAppender = new EvidenceAppender(evidenceRepository); + RecordingTraceWriter traceWriter = new RecordingTraceWriter(); + RecordingOutboxRepository outboxRepository = new RecordingOutboxRepository(); TraderP0CycleRunner runner = new TraderP0CycleRunner( properties(), new TraderArtifactLoader(properties()), @@ -29,6 +36,7 @@ class TraderP0CycleRunnerTest { new TraderRiskGate(), new TraderActionFactory(), evidenceAppender, + traceWriter, outboxRepository); TraderCycleResult result = runner.runFlatCycle(new ReplayMarketEvent( @@ -37,16 +45,19 @@ class TraderP0CycleRunnerTest { assertThat(result.action().actionType()).isEqualTo(TraderActionType.OPEN_LONG); assertThat(result.action().reduceOnly()).isFalse(); - assertThat(outboxRepository.all()).hasSize(1); - assertThat(outboxRepository.all().getFirst().destination()).isEqualTo("SHADOW_RECORDER"); - assertThat(evidenceAppender.all()).extracting("stage") + assertThat(traceWriter.actions()).containsExactly(result.action()); + assertThat(outboxRepository.events()).hasSize(1); + assertThat(outboxRepository.events().getFirst().destination()).isEqualTo("SHADOW_RECORDER"); + assertThat(evidenceRepository.items()).extracting("stage") .containsExactly("MARKET_SNAPSHOT", "MODEL_OUTPUT", "PM_DECISION", "RISK_DECISION"); } @Test void recordsWaitActionWhenReplaySnapshotHasNoLiquidity() { - EvidenceAppender evidenceAppender = new EvidenceAppender(); - InMemoryOutboxRepository outboxRepository = new InMemoryOutboxRepository(); + RecordingEvidenceRepository evidenceRepository = new RecordingEvidenceRepository(); + EvidenceAppender evidenceAppender = new EvidenceAppender(evidenceRepository); + RecordingTraceWriter traceWriter = new RecordingTraceWriter(); + RecordingOutboxRepository outboxRepository = new RecordingOutboxRepository(); TraderP0CycleRunner runner = new TraderP0CycleRunner( properties(), new TraderArtifactLoader(properties()), @@ -55,6 +66,7 @@ class TraderP0CycleRunnerTest { new TraderRiskGate(), new TraderActionFactory(), evidenceAppender, + traceWriter, outboxRepository); TraderCycleResult result = runner.runFlatCycle(new ReplayMarketEvent( @@ -63,6 +75,49 @@ class TraderP0CycleRunnerTest { assertThat(result.action().actionType()).isEqualTo(TraderActionType.WAIT); assertThat(result.action().pricePlanId()).isNull(); - assertThat(outboxRepository.all()).hasSize(1); + assertThat(traceWriter.actions()).containsExactly(result.action()); + assertThat(outboxRepository.events()).hasSize(1); + assertThat(evidenceRepository.items()).hasSize(4); + } + + private static final class RecordingEvidenceRepository implements TraderEvidenceRepository { + private final List items = new ArrayList<>(); + + @Override + public void insert(TraderEvidence evidence) { + items.add(evidence); + } + + List items() { + return items; + } + } + + private static final class RecordingOutboxRepository implements TraderOutboxRepository { + private final List events = new ArrayList<>(); + + @Override + public void insert(TraderOutboxEvent event) { + events.add(event); + } + + List events() { + return events; + } + } + + private static final class RecordingTraceWriter implements TraderDecisionTraceWriter { + private final List actions = new ArrayList<>(); + + @Override + public void persistCycleTrace(TraderDecisionCycle cycle, TraderMarketSnapshot snapshot, TraderModelOutput modelOutput, + TraderPositionState positionState, TraderPositionManagerDecision pmDecision, + TraderRiskDecision riskDecision, TraderAction action) { + actions.add(action); + } + + List actions() { + return actions; + } } }