Files
quant-trader-service/training/trader_training/promote.py
T

130 lines
5.2 KiB
Python

from __future__ import annotations
import logging
from typing import Any
from trader_training.io_utils import read_json, utc_now_text, write_json, write_text
def promote_artifact_bundle(args: Any) -> None:
artifact_root = args.artifact_root
validation_path = artifact_root.parent / "artifact_validation_result.json"
validation = read_json(validation_path)
if validation.get("status") != "PASS":
_refuse(artifact_root, args.reason, "validation result is not PASS", validation)
if validation.get("release_gate_status") != "PASS":
_refuse(artifact_root, args.reason, "release gate is not PASS", validation)
promotion = {
"promoted_at": utc_now_text(),
"reason": args.reason,
"validation_result_path": str(validation_path),
}
bundle_path = artifact_root / "manifests" / "model_bundle_manifest.json"
bundle = read_json(bundle_path)
model_manifest_path = artifact_root / "manifests" / "model_manifest.json"
model_rows = read_json(model_manifest_path)
calibration_manifest_path = artifact_root / "manifests" / "calibration_manifest.json"
calibration_rows = read_json(calibration_manifest_path)
pm_manifest_path = artifact_root / "manifests" / "position_manager_manifest.json"
pm_manifest = read_json(pm_manifest_path)
export_manifest_path = artifact_root / "manifests" / "training_export_manifest.json"
export_manifest = read_json(export_manifest_path)
# Check every manifest before writing any ACTIVE status, so a bad bundle
# cannot be left half-promoted if one file fails late.
_require_candidate("model_bundle_manifest", bundle.get("status"))
for row in model_rows:
_require_candidate(f"model_manifest.{row.get('model_name')}", row.get("status"))
for row in calibration_rows:
_require_candidate(f"calibration_manifest.{row.get('model_name')}", row.get("status"))
_require_candidate("position_manager_manifest", pm_manifest.get("status"))
_require_candidate("training_export_manifest", export_manifest.get("status"))
bundle["status"] = "ACTIVE"
bundle["promotion_json"] = promotion
for row in model_rows:
row["status"] = "ACTIVE"
row["promotion_json"] = promotion
for row in calibration_rows:
row["status"] = "ACTIVE"
row["promotion_json"] = promotion
pm_manifest["status"] = "ACTIVE"
pm_manifest["promotion_json"] = promotion
export_manifest["status"] = "ACTIVE"
export_manifest["promotion_json"] = promotion
write_json(bundle_path, bundle)
write_json(model_manifest_path, model_rows)
write_json(calibration_manifest_path, calibration_rows)
write_json(pm_manifest_path, pm_manifest)
write_json(export_manifest_path, export_manifest)
result = {"status": "ACTIVE", "artifact_root": str(artifact_root), "promotion": promotion}
write_json(artifact_root.parent / "artifact_promotion_result.json", result)
write_text(
artifact_root.parent / "artifact_promotion_report.md",
"\n".join(
[
"# Artifact Promotion Report",
"",
"- status: ACTIVE",
f"- artifact_root: {artifact_root}",
f"- reason: {args.reason}",
f"- promoted_at: {promotion['promoted_at']}",
"",
]
),
)
logging.info("trader.training.artifact_promoted status=ACTIVE path=%s reason=%s", artifact_root, args.reason)
def _require_candidate(name: str, status: str | None) -> None:
if status != "CANDIDATE":
raise SystemExit(f"artifact promotion refused: {name} status must be CANDIDATE, actual={status}")
def _refuse(artifact_root: Any, reason: str, message: str, validation: dict[str, Any]) -> None:
result = {
"status": "REFUSED",
"artifact_root": str(artifact_root),
"reason": reason,
"message": message,
"validation_status": validation.get("status"),
"release_gate_status": validation.get("release_gate_status"),
"release_gate_reasons": validation.get("release_gate_reasons", []),
"refused_at": utc_now_text(),
}
write_json(artifact_root.parent / "artifact_promotion_result.json", result)
write_text(
artifact_root.parent / "artifact_promotion_report.md",
"\n".join(
[
"# Artifact Promotion Report",
"",
"- status: REFUSED",
f"- artifact_root: {artifact_root}",
f"- reason: {reason}",
f"- message: {message}",
f"- validation_status: {result['validation_status']}",
f"- release_gate_status: {result['release_gate_status']}",
f"- release_gate_reasons: {result['release_gate_reasons']}",
f"- refused_at: {result['refused_at']}",
"",
]
),
)
logging.warning(
"trader.training.artifact_promotion_refused status=REFUSED path=%s reason=%s message=%s releaseGate=%s",
artifact_root,
reason,
message,
result["release_gate_status"],
)
raise SystemExit(f"artifact promotion refused: {message}, reasons={result['release_gate_reasons']}")