Add endpoint for all messages

This commit is contained in:
Ramiro Paz
2026-05-12 13:27:13 -03:00
parent 6e46fde5d2
commit 4270284362
9 changed files with 171 additions and 112 deletions

View File

@ -3,10 +3,12 @@ package fix
import (
"log/slog"
"os"
"sort"
"strings"
"sync"
"time"
uuid "github.com/satori/go.uuid"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/quickfix"
@ -20,6 +22,7 @@ import (
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport"
"quantex.com/qfixdpl/quickfix/gen/tag"
filelog "quantex.com/qfixdpl/quickfix/log/file"
"quantex.com/qfixdpl/quickfix/store/file"
"quantex.com/qfixdpl/src/app"
@ -42,6 +45,8 @@ type Manager struct {
sessions map[string]quickfix.SessionID
tradesMu sync.RWMutex
trades map[string]*listTrade
messagesMu sync.RWMutex
messages []domain.Message
store domain.PersistenceStore
notify domain.Notifier
cfg app.FIXConfig
@ -52,6 +57,7 @@ func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.
return &Manager{
sessions: make(map[string]quickfix.SessionID),
trades: make(map[string]*listTrade),
messages: make([]domain.Message, 0),
store: store,
notify: notify,
cfg: cfg,
@ -84,6 +90,11 @@ func (m *Manager) Start() error {
slog.Error(err.Error())
}
if err := m.loadTodayMessages(); err != nil {
err = tracerr.Errorf("failed to load today messages from DB, starting with empty list: %w", err)
slog.Error(err.Error())
}
f, err := os.Open(m.cfg.SettingsFile)
if err != nil {
err = tracerr.Errorf("error opening FIX settings file %q: %w", m.cfg.SettingsFile, err)
@ -255,16 +266,6 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
Quoted: false,
}
m.tradesMu.Unlock()
// Persist incoming QuoteRequest.
m.persistMessage(quoteReqID, parsed)
// Persist outgoing QuoteStatusReport.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]any{
"QuoteReqID": quoteReqID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
"OwnerTraderID": ownerTraderID,
}))
}
// handleQuoteAck handles an incoming QuoteAck (35=CW).
@ -273,8 +274,6 @@ func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.Sessi
status, _ := msg.GetQuoteAckStatus()
text, _ := msg.GetText()
m.persistMessage(quoteReqID, parseQuoteAck(msg, m.dict))
if status != enum.QuoteAckStatus_ACCEPTED {
err := tracerr.Errorf("handleQuoteAck: quote rejected by TW (quoteReqID=%s, quoteAckStatus=%s, text=%s)", quoteReqID, string(status), text)
slog.Error(err.Error())
@ -318,16 +317,6 @@ func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID
}
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
// Persist incoming QuoteResponse.
m.persistMessage(quoteReqID, parseQuoteResponse(msg, m.dict))
// Persist outgoing ACK.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]any{
"QuoteReqID": quoteReqID,
"QuoteRespID": quoteRespID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
}))
// _TRDSUMM is the final message — clean up the trade.
if isTrdSumm {
slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID)
@ -380,17 +369,6 @@ func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, ses
slog.Info("Trade result received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
}
// Persist incoming ExecutionReport.
m.persistMessage(clOrdID, parseExecutionReport(msg, m.dict))
// Persist outgoing ExecutionAck.
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]any{
"OrderID": orderID,
"ExecID": execID,
"ClOrdID": clOrdID,
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
}))
}
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
@ -528,17 +506,6 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol, "price", price.String())
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]any{
"QuoteReqID": quoteReqID,
"QuoteID": quoteID,
"Symbol": symbol,
"Side": string(side),
"Price": price.String(),
"OrderQty": orderQty.String(),
"Currency": currency,
"SettlDate": settlDate,
}))
return nil
}
@ -590,7 +557,11 @@ func (m *Manager) anyActiveSessionID() quickfix.SessionID {
return quickfix.SessionID{}
}
// handleRawMessage persists raw FIX message strings to the logs table.
// handleRawMessage is the single ingest path for every application-level FIX message
// (admin messages — Logon, Logout, Heartbeat, TestRequest, ResendRequest, SequenceReset —
// go through ToAdmin/FromAdmin and never reach this callback). It persists the raw
// envelope to the logs table, builds a structured Message and saves it to
// the messages table, and appends to the in-memory list.
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
quoteReqID := extractIdentifier(msg)
@ -601,17 +572,60 @@ func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
err = tracerr.Errorf("failed to persist raw log: %w", err)
slog.Error(err.Error())
}
}
// persistMessage saves a structured FIX message to the messages table.
func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSON) {
if err := m.store.SaveMessage(domain.TradeMessage{
QuoteReqID: quoteReqID,
JMessage: fixJSON,
}); err != nil {
err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", fixJSON.MsgType, quoteReqID, err)
msgTypeBytes, _ := msg.Header.GetBytes(tag.MsgType)
msgType := string(msgTypeBytes)
senderCompID, msgSeqNum, sendingTime := extractHeaderMeta(msg)
fixJSON := buildFixMessageJSON(direction, msgType, quoteReqID, msg, m.dict)
stored := domain.Message{
ID: uuid.NewV4().String(),
SenderCompID: senderCompID,
MsgSeqNum: msgSeqNum,
SendingTime: sendingTime,
CreatedAt: time.Now(),
JMessage: fixJSON,
}
if err := m.store.SaveMessage(stored); err != nil {
err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", msgType, quoteReqID, err)
slog.Error(err.Error())
}
m.messagesMu.Lock()
m.messages = append(m.messages, stored)
m.messagesMu.Unlock()
}
// GetAllMessages returns a snapshot of every FIX application message recorded today,
// sorted ascending by CreatedAt.
func (m *Manager) GetAllMessages() []domain.Message {
m.messagesMu.RLock()
out := make([]domain.Message, len(m.messages))
copy(out, m.messages)
m.messagesMu.RUnlock()
sort.Slice(out, func(i, j int) bool { return out[i].CreatedAt.Before(out[j].CreatedAt) })
return out
}
// loadTodayMessages rebuilds the in-memory message list from today's rows in the DB.
// Must be called before the FIX initiator starts so live ingest doesn't race with replay.
func (m *Manager) loadTodayMessages() error {
messages, err := m.store.GetTodayMessages()
if err != nil {
return err
}
m.messagesMu.Lock()
m.messages = messages
m.messagesMu.Unlock()
slog.Info("today messages loaded", "count", len(messages))
return nil
}
// loadActiveTrades reconstructs active trades from today's messages in the database.
@ -624,9 +638,11 @@ func (m *Manager) loadActiveTrades() error {
activeTrades := make(map[string]*listTrade)
for _, msg := range messages {
quoteReqID := msg.JMessage.QuoteReqID
switch msg.JMessage.MsgType {
case "R": // QuoteRequest -> trade is born
if !strings.HasPrefix(msg.QuoteReqID, "LST_") {
if !strings.HasPrefix(quoteReqID, "LST_") {
continue
}
@ -638,24 +654,24 @@ func (m *Manager) loadActiveTrades() error {
continue
}
activeTrades[msg.QuoteReqID] = &listTrade{
activeTrades[quoteReqID] = &listTrade{
QuoteRequest: msg.JMessage,
}
case "S": // Outgoing Quote — dealer has already quoted this trade
if t, ok := activeTrades[msg.QuoteReqID]; ok {
if t, ok := activeTrades[quoteReqID]; ok {
t.Quoted = true
t.Price = getDecimal(msg.JMessage.Body, "Price")
}
case "CW": // QuoteAck — if rejected, trade is dead
if getString(msg.JMessage.Body, "QuoteAckStatus") != string(enum.QuoteAckStatus_ACCEPTED) {
delete(activeTrades, msg.QuoteReqID)
delete(activeTrades, quoteReqID)
}
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
if strings.HasSuffix(getString(msg.JMessage.Body, "QuoteRespID"), "_TRDSUMM") {
delete(activeTrades, msg.QuoteReqID)
delete(activeTrades, quoteReqID)
}
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)