Persist trader V4 P0 decision trace

This commit is contained in:
Codex
2026-06-26 22:01:25 +08:00
parent 5d210053d0
commit 6bbedda97d
17 changed files with 437 additions and 96 deletions
@@ -0,0 +1,14 @@
package com.quantai.trader.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class JacksonConfig {
@Bean
ObjectMapper traderObjectMapper() {
return JsonMapper.builder().findAndAddModules().build();
}
}
@@ -11,5 +11,6 @@ public enum TraderErrorCode {
TRADER_FEEDBACK_INVALID,
TRADER_P0_MODE_BLOCKED,
TRADER_KILL_SWITCH_ACTIVE,
TRADER_ACTIVE_POINTER_MISMATCH
TRADER_ACTIVE_POINTER_MISMATCH,
TRADER_PERSISTENCE_FAILED
}
@@ -6,26 +6,25 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
@Component
public class EvidenceAppender {
private static final Logger log = LoggerFactory.getLogger(EvidenceAppender.class);
private final CopyOnWriteArrayList<TraderEvidence> evidence = new CopyOnWriteArrayList<>();
private final AtomicLong sequence = new AtomicLong();
private final TraderEvidenceRepository repository;
public EvidenceAppender(TraderEvidenceRepository repository) {
this.repository = repository;
}
public TraderEvidence append(String runId, String cycleId, String stage, boolean pass, String reason, String blocker, Map<String, Object> details) {
TraderEvidence item = new TraderEvidence("evidence_" + cycleId + "_" + evidence.size(), runId, cycleId,
TraderEvidence item = new TraderEvidence("evidence_" + cycleId + "_" + sequence.getAndIncrement(), runId, cycleId,
stage, pass, reason, blocker, Instant.now(), details);
evidence.add(item);
repository.insert(item);
log.info("event=trader.evidence.appended runId={} cycleId={} stage={} pass={} reason={} blocker={}",
runId, cycleId, stage, pass, reason, blocker);
return item;
}
public List<TraderEvidence> all() {
return new ArrayList<>(evidence);
}
}
@@ -0,0 +1,32 @@
package com.quantai.trader.evidence;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.quantai.trader.domain.TraderEvidence;
import com.quantai.trader.persistence.TraderJsonCodec;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.Timestamp;
@Repository
public class JdbcTraderEvidenceRepository implements TraderEvidenceRepository {
private final JdbcTemplate jdbcTemplate;
private final TraderJsonCodec jsonCodec;
public JdbcTraderEvidenceRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
this.jdbcTemplate = jdbcTemplate;
this.jsonCodec = new TraderJsonCodec(objectMapper);
}
@Override
public void insert(TraderEvidence evidence) {
jdbcTemplate.update("""
insert into trader_evidence
(run_id, cycle_id, evidence_id, stage, pass, reason, blocker, evidence_time, details_json)
values (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
evidence.runId(), evidence.cycleId(), evidence.evidenceId(), evidence.stage(), evidence.pass(),
evidence.reason(), evidence.blocker(), Timestamp.from(evidence.evidenceTime()),
jsonCodec.toJson(evidence.detailsJson()));
}
}
@@ -0,0 +1,7 @@
package com.quantai.trader.evidence;
import com.quantai.trader.domain.TraderEvidence;
public interface TraderEvidenceRepository {
void insert(TraderEvidence evidence);
}
@@ -1,30 +0,0 @@
package com.quantai.trader.outbox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@Repository
public class InMemoryOutboxRepository {
private static final Logger log = LoggerFactory.getLogger(InMemoryOutboxRepository.class);
private final CopyOnWriteArrayList<TraderOutboxEvent> events = new CopyOnWriteArrayList<>();
public void insert(TraderOutboxEvent event) {
boolean duplicate = events.stream().anyMatch(existing -> existing.destination().equals(event.destination())
&& existing.idempotencyKey().equals(event.idempotencyKey()));
if (duplicate) {
throw new IllegalArgumentException("duplicate outbox idempotency key: " + event.idempotencyKey());
}
events.add(event);
log.info("event=trader.outbox.inserted runId={} cycleId={} destination={} aggregateType={} aggregateId={} status={}",
event.runId(), event.cycleId(), event.destination(), event.aggregateType(), event.aggregateId(), event.status());
}
public List<TraderOutboxEvent> all() {
return new ArrayList<>(events);
}
}
@@ -0,0 +1,38 @@
package com.quantai.trader.outbox;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.quantai.trader.persistence.TraderJsonCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.Timestamp;
@Repository
public class JdbcTraderOutboxRepository implements TraderOutboxRepository {
private static final Logger log = LoggerFactory.getLogger(JdbcTraderOutboxRepository.class);
private final JdbcTemplate jdbcTemplate;
private final TraderJsonCodec jsonCodec;
public JdbcTraderOutboxRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
this.jdbcTemplate = jdbcTemplate;
this.jsonCodec = new TraderJsonCodec(objectMapper);
}
@Override
public void insert(TraderOutboxEvent event) {
jdbcTemplate.update("""
insert into trader_outbox
(outbox_id, run_id, cycle_id, aggregate_type, aggregate_id, event_type, destination,
payload_json, idempotency_key, status, created_at)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
event.outboxId(), event.runId(), event.cycleId(), event.aggregateType(), event.aggregateId(),
event.eventType(), event.destination(), jsonCodec.toJson(event.payloadJson()),
event.idempotencyKey(), event.status(), Timestamp.from(event.createdAt()));
log.info("event=trader.outbox.inserted runId={} cycleId={} destination={} aggregateType={} aggregateId={} status={}",
event.runId(), event.cycleId(), event.destination(), event.aggregateType(), event.aggregateId(), event.status());
}
}
@@ -0,0 +1,5 @@
package com.quantai.trader.outbox;
public interface TraderOutboxRepository {
void insert(TraderOutboxEvent event);
}
@@ -0,0 +1,141 @@
package com.quantai.trader.persistence;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.quantai.trader.config.TraderProperties;
import com.quantai.trader.domain.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.sql.Timestamp;
import java.util.Map;
@Repository
public class JdbcTraderDecisionTraceWriter implements TraderDecisionTraceWriter {
private static final Logger log = LoggerFactory.getLogger(JdbcTraderDecisionTraceWriter.class);
private final JdbcTemplate jdbcTemplate;
private final TraderJsonCodec jsonCodec;
private final TraderProperties properties;
public JdbcTraderDecisionTraceWriter(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, TraderProperties properties) {
this.jdbcTemplate = jdbcTemplate;
this.jsonCodec = new TraderJsonCodec(objectMapper);
this.properties = properties;
}
@Override
public void persistCycleTrace(TraderDecisionCycle cycle,
TraderMarketSnapshot snapshot,
TraderModelOutput modelOutput,
TraderPositionState positionState,
TraderPositionManagerDecision pmDecision,
TraderRiskDecision riskDecision,
TraderAction action) {
upsertRun(cycle);
insertCycle(cycle, positionState, riskDecision);
insertModelOutput(modelOutput, snapshot);
insertPmDecision(pmDecision);
insertRiskDecision(riskDecision);
insertAction(action);
log.info("event=trader.trace.persisted runId={} cycleId={} action={} riskAllowed={}",
cycle.runId(), cycle.cycleId(), action.actionType(), riskDecision.allowAction());
}
void upsertRun(TraderDecisionCycle cycle) {
jdbcTemplate.update("""
insert into trader_run
(run_id, run_mode, symbol, model_bundle_version, calibration_bundle_version,
pm_config_version, execution_mode, status, config_json, started_at)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
on duplicate key update
model_bundle_version = values(model_bundle_version),
calibration_bundle_version = values(calibration_bundle_version),
pm_config_version = values(pm_config_version),
execution_mode = values(execution_mode),
status = values(status),
updated_at = current_timestamp(3)
""",
cycle.runId(), cycle.runMode().name(), cycle.symbol(), cycle.modelBundleVersion(),
cycle.calibrationBundleVersion(), cycle.pmConfigVersion(), properties.execution().mode().name(),
"RUNNING", jsonCodec.toJson(Map.of(
"serviceName", properties.serviceName(),
"artifactRoot", properties.artifact().artifactRoot(),
"redisKeyPrefix", properties.runtime().redisKeyPrefix())),
Timestamp.from(cycle.cycleTime()));
}
void insertCycle(TraderDecisionCycle cycle, TraderPositionState positionState, TraderRiskDecision riskDecision) {
jdbcTemplate.update("""
insert into trader_decision_cycle
(run_id, cycle_id, symbol, cycle_time, state, decision_status, blocker)
values (?, ?, ?, ?, ?, ?, ?)
""",
cycle.runId(), cycle.cycleId(), cycle.symbol(), Timestamp.from(cycle.cycleTime()),
positionState.isFlat() ? "FLAT" : "POSITION",
riskDecision.allowAction() ? "ACTION_ALLOWED" : "ACTION_BLOCKED",
riskDecision.blocker());
}
void insertModelOutput(TraderModelOutput modelOutput, TraderMarketSnapshot snapshot) {
jdbcTemplate.update("""
insert into trader_model_output
(run_id, cycle_id, model_output_id, model_bundle_version, calibration_bundle_version,
direction_json, entry_json, continue_json, exit_json, risk_json,
uncertainty, ood_score, usable, blocker)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
modelOutput.runId(), modelOutput.cycleId(), modelOutput.modelOutputId(),
modelOutput.modelBundleVersion(), modelOutput.calibrationBundleVersion(),
jsonCodec.toJson(modelOutput.direction()), jsonCodec.toJson(modelOutput.entry()),
jsonCodec.toJson(modelOutput.continuation()), jsonCodec.toJson(modelOutput.exit()),
jsonCodec.toJson(modelOutput.risk()), modelOutput.uncertainty(), modelOutput.oodScore(),
snapshot.dataReady(), snapshot.dataReady() ? null : "DATA_NOT_READY");
}
void insertPmDecision(TraderPositionManagerDecision decision) {
jdbcTemplate.update("""
insert into trader_position_manager_decision
(run_id, cycle_id, pm_decision_id, model_output_id, position_state_id, account_state_id,
execution_state_id, candidate_action, side, price_plan_id, price_plan_config_hash,
target_position_ratio, add_ratio, reduce_ratio, stop_price, target_price, reason, decision_json)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
decision.runId(), decision.cycleId(), decision.pmDecisionId(), decision.modelOutputId(),
decision.positionStateId(), decision.accountStateId(), decision.executionStateId(),
decision.candidateAction().name(), decision.side().name(), decision.pricePlanId(),
decision.pricePlanConfigHash(), decision.targetPositionRatio(), decision.addRatio(),
decision.reduceRatio(), decision.stopPrice(), decision.targetPrice(), decision.reason(),
jsonCodec.toJson(decision.decisionJson()));
}
void insertRiskDecision(TraderRiskDecision decision) {
jdbcTemplate.update("""
insert into trader_risk_decision
(run_id, cycle_id, risk_decision_id, pm_decision_id, original_action, final_action,
allow_action, blocker, decision_json)
values (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
decision.runId(), decision.cycleId(), decision.riskDecisionId(), decision.pmDecisionId(),
decision.originalAction().name(), decision.finalAction().name(), decision.allowAction(),
decision.blocker(), jsonCodec.toJson(decision.decisionJson()));
}
void insertAction(TraderAction action) {
jdbcTemplate.update("""
insert into trader_action
(run_id, cycle_id, action_id, model_output_id, pm_decision_id, risk_decision_id,
action_type, symbol, side, price_plan_id, price_plan_config_hash, position_ratio,
quantity, stop_price, target_price, reduce_only, idempotency_key, send_status,
reason, action_context_json)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
action.runId(), action.cycleId(), action.actionId(), action.modelOutputId(),
action.pmDecisionId(), action.riskDecisionId(), action.actionType().name(),
action.symbol(), action.side().name(), action.pricePlanId(), action.pricePlanConfigHash(),
action.positionRatio(), action.quantity(), action.stopPrice(), action.targetPrice(),
action.reduceOnly(), action.idempotencyKey(), "PENDING", action.reason(),
jsonCodec.toJson(action.actionContextJson()));
}
}
@@ -0,0 +1,13 @@
package com.quantai.trader.persistence;
import com.quantai.trader.domain.*;
public interface TraderDecisionTraceWriter {
void persistCycleTrace(TraderDecisionCycle cycle,
TraderMarketSnapshot snapshot,
TraderModelOutput modelOutput,
TraderPositionState positionState,
TraderPositionManagerDecision pmDecision,
TraderRiskDecision riskDecision,
TraderAction action);
}
@@ -0,0 +1,23 @@
package com.quantai.trader.persistence;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.quantai.trader.domain.TraderException;
import com.quantai.trader.enums.TraderErrorCode;
public class TraderJsonCodec {
private final ObjectMapper objectMapper;
public TraderJsonCodec(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
public String toJson(Object value) {
try {
return objectMapper.writeValueAsString(value);
} catch (JsonProcessingException exception) {
throw new TraderException(TraderErrorCode.TRADER_PERSISTENCE_FAILED,
"failed to serialize trader persistence payload");
}
}
}
@@ -7,8 +7,9 @@ import com.quantai.trader.domain.*;
import com.quantai.trader.enums.PositionSide;
import com.quantai.trader.evidence.EvidenceAppender;
import com.quantai.trader.model.TraderModelService;
import com.quantai.trader.outbox.InMemoryOutboxRepository;
import com.quantai.trader.outbox.TraderOutboxRepository;
import com.quantai.trader.outbox.TraderOutboxEvent;
import com.quantai.trader.persistence.TraderDecisionTraceWriter;
import com.quantai.trader.position.TraderPositionManager;
import com.quantai.trader.risk.RiskGateInput;
import com.quantai.trader.risk.RiskLimits;
@@ -32,7 +33,8 @@ public class TraderP0CycleRunner {
private final TraderRiskGate riskGate;
private final TraderActionFactory actionFactory;
private final EvidenceAppender evidenceAppender;
private final InMemoryOutboxRepository outboxRepository;
private final TraderDecisionTraceWriter traceWriter;
private final TraderOutboxRepository outboxRepository;
public TraderP0CycleRunner(TraderProperties properties,
TraderArtifactLoader artifactLoader,
@@ -41,7 +43,8 @@ public class TraderP0CycleRunner {
TraderRiskGate riskGate,
TraderActionFactory actionFactory,
EvidenceAppender evidenceAppender,
InMemoryOutboxRepository outboxRepository) {
TraderDecisionTraceWriter traceWriter,
TraderOutboxRepository outboxRepository) {
this.properties = properties;
this.artifactLoader = artifactLoader;
this.modelService = modelService;
@@ -49,6 +52,7 @@ public class TraderP0CycleRunner {
this.riskGate = riskGate;
this.actionFactory = actionFactory;
this.evidenceAppender = evidenceAppender;
this.traceWriter = traceWriter;
this.outboxRepository = outboxRepository;
}
@@ -69,6 +73,7 @@ public class TraderP0CycleRunner {
pmInput.executionState(), snapshot, riskLimits()));
evidenceAppender.append(cycle.runId(), cycle.cycleId(), "RISK_DECISION", riskDecision.allowAction(), riskDecision.allowAction() ? "RISK_PASS" : riskDecision.blocker(), riskDecision.blocker(), Map.of());
TraderAction action = actionFactory.create(pmDecision, riskDecision, event.symbol());
traceWriter.persistCycleTrace(cycle, snapshot, modelOutput, pmInput.positionState(), pmDecision, riskDecision, action);
outboxRepository.insert(new TraderOutboxEvent("outbox_" + action.actionId(), action.runId(), action.cycleId(),
"TRADER_ACTION", action.actionId(), "ACTION_CREATED", properties.runMode().name() + "_RECORDER",
Map.of("actionType", action.actionType().name()), action.idempotencyKey(), "PENDING", Instant.now()));