Persistance and recovery

This commit is contained in:
2026-03-19 13:23:23 -03:00
parent 51ef6e182d
commit 82d2e1b5f7
14 changed files with 859 additions and 130 deletions

View File

@ -48,18 +48,18 @@ type Manager struct {
sessions map[string]quickfix.SessionID
tradesMu sync.RWMutex
trades map[string]*listTrade
orderStore domain.OrderStore
store domain.PersistenceStore
notify domain.Notifier
cfg app.FIXConfig
}
func NewManager(cfg app.FIXConfig, orderStore domain.OrderStore, notify domain.Notifier) *Manager {
func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager {
return &Manager{
sessions: make(map[string]quickfix.SessionID),
trades: make(map[string]*listTrade),
orderStore: orderStore,
notify: notify,
cfg: cfg,
sessions: make(map[string]quickfix.SessionID),
trades: make(map[string]*listTrade),
store: store,
notify: notify,
cfg: cfg,
}
}
@ -72,8 +72,13 @@ func (m *Manager) Start() error {
fixApp.onQuoteResponse = m.handleQuoteResponse
fixApp.onExecutionReport = m.handleExecutionReport
fixApp.onExecutionAck = m.handleExecutionAck
fixApp.onRawMessage = m.handleRawMessage
m.app = fixApp
if err := m.loadActiveTrades(); err != nil {
slog.Error("failed to load active trades from DB, starting with empty state", "error", err)
}
f, err := os.Open(m.cfg.SettingsFile)
if err != nil {
err = tracerr.Errorf("error opening FIX settings file %q: %s", m.cfg.SettingsFile, err)
@ -307,47 +312,96 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
SessionID: sessionID,
}
m.tradesMu.Unlock()
// Persist structured message (outside mutex).
m.persistMessage(quoteReqID, parseQuoteRequest(msg))
// Persist outgoing QuoteStatusReport.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
"OwnerTraderID": ownerTraderID,
}))
// Persist outgoing Quote.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteID": quoteID,
"Symbol": symbol,
"Side": string(side),
"Price": price.String(),
"OrderQty": orderQty.String(),
"Currency": currency,
"SettlDate": settlDate,
}))
}
// handleQuoteAck handles an incoming QuoteAck (35=CW).
func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
status, _ := msg.GetQuoteAckStatus()
text, _ := msg.GetText()
m.persistMessage(quoteReqID, parseQuoteAck(msg))
if status != enum.QuoteAckStatus_ACCEPTED {
slog.Warn("handleQuoteAck: unexpected status", "quoteReqID", quoteReqID, "quoteAckStatus", status)
slog.Error("handleQuoteAck: quote rejected by TW", "quoteReqID", quoteReqID, "quoteAckStatus", string(status), "text", text)
m.tradesMu.Lock()
delete(m.trades, quoteReqID)
m.tradesMu.Unlock()
return
}
slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID)
}
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ).
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes.
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
quoteRespID, _ := msg.GetQuoteRespID()
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
if !strings.HasSuffix(quoteRespID, "_TRDREQ") {
slog.Info("handleQuoteResponse: QuoteRespID does not end with _TRDREQ, ignoring", "quoteRespID", quoteRespID)
return
}
isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ")
isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND")
isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM")
isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND")
m.tradesMu.RLock()
_, ok := m.trades[quoteReqID]
m.tradesMu.RUnlock()
if !ok {
slog.Warn("handleQuoteResponse: no trade found for QuoteReqID", "quoteReqID", quoteReqID)
if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd {
slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID)
return
}
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future.
// Send QuoteStatusReport (35=AI) as TRDREQACK.
// Always send ACK regardless of whether the trade is in our map.
// TW will keep retrying until it receives an ACK.
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil {
slog.Error("handleQuoteResponse: failed to send TRDREQACK", "quoteReqID", quoteReqID, "error", ackErr.Error())
slog.Error("handleQuoteResponse: failed to send ACK", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID, "error", ackErr.Error())
return
}
slog.Info("TRDREQACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
// Persist incoming QuoteResponse.
m.persistMessage(quoteReqID, parseQuoteResponse(msg))
// Persist outgoing ACK.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteRespID": quoteRespID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
}))
// _TRDSUMM is the final message — clean up the trade.
if isTrdSumm {
slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID)
m.tradesMu.Lock()
delete(m.trades, quoteReqID)
m.tradesMu.Unlock()
}
}
// handleExecutionReport handles an incoming ExecutionReport (35=8).
@ -378,23 +432,172 @@ func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, ses
"execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDEND"):
slog.Info("Trade ended, cleaning up", "execID", execID, "clOrdID", clOrdID)
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDSUMM"):
slog.Info("Trade summary received from TW, cleaning up",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
m.tradesMu.Lock()
delete(m.trades, clOrdID)
m.tradesMu.Unlock()
case strings.Contains(execID, "_TRDSUMM"):
slog.Info("Trade summary received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
case execType == enum.ExecType_TRADE:
slog.Info("Trade result received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
}
// Persist incoming ExecutionReport.
m.persistMessage(clOrdID, parseExecutionReport(msg))
// Persist outgoing ExecutionAck.
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{
"OrderID": orderID,
"ExecID": execID,
"ClOrdID": clOrdID,
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
}))
}
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
// Logged in application.go, no further action needed.
}
// GetTrades returns a snapshot of all active trades.
func (m *Manager) GetTrades() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
trades := make([]domain.ListTrade, 0, len(m.trades))
for _, t := range m.trades {
trades = append(trades, domain.ListTrade{
QuoteReqID: t.QuoteReqID,
ListID: t.ListID,
Symbol: t.Symbol,
SecurityIDSrc: string(t.SecurityIDSrc),
Currency: t.Currency,
Side: string(t.Side),
OrderQty: t.OrderQty.String(),
SettlDate: t.SettlDate,
Price: t.Price.String(),
OwnerTraderID: t.OwnerTraderID,
})
}
return trades
}
// handleRawMessage persists raw FIX message strings to the logs table.
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
quoteReqID := extractIdentifier(msg)
if err := m.store.SaveLog(domain.LogEntry{
QuoteReqID: quoteReqID,
RawMsg: "[" + direction + "] " + msg.String(),
}); err != nil {
slog.Error("failed to persist raw log", "error", err)
}
}
// persistMessage saves a structured FIX message to the messages table.
func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSON) {
if err := m.store.SaveMessage(domain.TradeMessage{
QuoteReqID: quoteReqID,
JMessage: fixJSON,
}); err != nil {
slog.Error("failed to persist message", "msgType", fixJSON.MsgType, "quoteReqID", quoteReqID, "error", err)
}
}
// loadActiveTrades reconstructs active trades from today's messages in the database.
func (m *Manager) loadActiveTrades() error {
messages, err := m.store.GetTodayMessages()
if err != nil {
return err
}
activeTrades := make(map[string]*listTrade)
for _, msg := range messages {
switch msg.JMessage.MsgType {
case "R": // QuoteRequest -> trade is born
if !strings.HasPrefix(msg.QuoteReqID, "LST_") {
continue
}
body := msg.JMessage.Body
nt, _ := body["NegotiationType"].(string)
if nt != "RFQ" {
continue
}
listID, _ := body["ListID"].(string)
if listID == "" {
continue
}
trade := &listTrade{
QuoteReqID: msg.QuoteReqID,
ListID: listID,
}
if v, ok := body["SecurityID"].(string); ok {
trade.Symbol = v
}
if v, ok := body["Currency"].(string); ok {
trade.Currency = v
}
if v, ok := body["Side"].(string); ok {
trade.Side = enum.Side(v)
}
if v, ok := body["OrderQty"].(string); ok {
trade.OrderQty, _ = decimal.NewFromString(v)
}
if v, ok := body["SettlDate"].(string); ok {
trade.SettlDate = v
}
if v, ok := body["OwnerTraderID"].(string); ok {
trade.OwnerTraderID = v
}
activeTrades[msg.QuoteReqID] = trade
case "CW": // QuoteAck — if rejected, trade is dead
body := msg.JMessage.Body
quoteAckStatus, _ := body["QuoteAckStatus"].(string)
if quoteAckStatus != string(enum.QuoteAckStatus_ACCEPTED) {
delete(activeTrades, msg.QuoteReqID)
}
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
body := msg.JMessage.Body
quoteRespID, _ := body["QuoteRespID"].(string)
if strings.HasSuffix(quoteRespID, "_TRDSUMM") {
delete(activeTrades, msg.QuoteReqID)
}
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)
body := msg.JMessage.Body
execID, _ := body["ExecID"].(string)
clOrdID, _ := body["ClOrdID"].(string)
if strings.Contains(execID, "_TRDSUMM") {
delete(activeTrades, clOrdID)
}
}
}
m.trades = activeTrades
slog.Info("recovery completed", "activeTrades", len(activeTrades))
return nil
}