diff --git a/src/client/api/rest/controller.go b/src/client/api/rest/controller.go index fcd359c..acd8de0 100644 --- a/src/client/api/rest/controller.go +++ b/src/client/api/rest/controller.go @@ -328,6 +328,30 @@ func (cont *Controller) GetLogs(ctx *gin.Context) { ctx.JSON(http.StatusOK, logs) } +// AllMessages godoc +// @Summary List all FIX application messages of the day +// @Description Returns every FIX application message recorded today (no admin: heartbeats/logon/logout/etc.), sorted by CreatedAt ascending +// @Tags fix +// @Produce json +// @Success 200 {array} domain.Message +// @Router /qfixdpl/v1/messages [post] +func (cont *Controller) AllMessages(ctx *gin.Context) { + setHeaders(ctx, cont.config) + + var req AllMessagesRequest + if err := ctx.ShouldBindJSON(&req); err != nil { + ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()}) + return + } + + if req.APIKey != "1234" { + ctx.JSON(http.StatusUnauthorized, HTTPError{Error: "Not allowed to perform this request"}) + return + } + + ctx.JSON(http.StatusOK, cont.tradeProvider.GetAllMessages()) +} + // GetPendingQuoteRequests godoc // @Summary List pending QuoteRequests // @Description Returns all QuoteRequests received from TW that have not been quoted yet by the dealer @@ -381,4 +405,3 @@ func (cont *Controller) SendQuote(ctx *gin.Context) { ctx.JSON(http.StatusOK, Msg{Text: "Quote sent"}) } - diff --git a/src/client/api/rest/model.go b/src/client/api/rest/model.go index 32223a6..d831c82 100644 --- a/src/client/api/rest/model.go +++ b/src/client/api/rest/model.go @@ -21,3 +21,7 @@ type SendQuoteRequest struct { QuoteReqID string `json:"QuoteReqID" binding:"required"` Price string `json:"Price" binding:"required" example:"99.6"` } + +type AllMessagesRequest struct { + APIKey string +} diff --git a/src/client/api/rest/routes.go b/src/client/api/rest/routes.go index 2e8c7c6..5003591 100644 --- a/src/client/api/rest/routes.go +++ b/src/client/api/rest/routes.go @@ -26,6 +26,9 @@ func SetRoutes(api *API) { qfixdpl.GET("/quote-requests", cont.GetPendingQuoteRequests) qfixdpl.POST("/quotes", cont.SendQuote) + msgs := v1.Group("/") + msgs.POST("/messages", cont.AllMessages) + backoffice := qfixdpl.Group("/backoffice") backoffice.Use(cont.BackOfficeUser) diff --git a/src/client/api/rest/server.go b/src/client/api/rest/server.go index 8d744da..80ff45c 100644 --- a/src/client/api/rest/server.go +++ b/src/client/api/rest/server.go @@ -22,6 +22,7 @@ type TradeProvider interface { GetTrades() []domain.ListTrade GetPendingQuoteRequests() []domain.ListTrade SendQuote(quoteReqID string, price decimal.Decimal) error + GetAllMessages() []domain.Message } const RedisMaxIdle = 3000 // In ms diff --git a/src/client/fix/manager.go b/src/client/fix/manager.go index caaf2e0..935fab1 100644 --- a/src/client/fix/manager.go +++ b/src/client/fix/manager.go @@ -3,10 +3,12 @@ package fix import ( "log/slog" "os" + "sort" "strings" "sync" "time" + uuid "github.com/satori/go.uuid" "github.com/shopspring/decimal" "quantex.com/qfixdpl/quickfix" @@ -20,6 +22,7 @@ import ( "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport" + "quantex.com/qfixdpl/quickfix/gen/tag" filelog "quantex.com/qfixdpl/quickfix/log/file" "quantex.com/qfixdpl/quickfix/store/file" "quantex.com/qfixdpl/src/app" @@ -42,6 +45,8 @@ type Manager struct { sessions map[string]quickfix.SessionID tradesMu sync.RWMutex trades map[string]*listTrade + messagesMu sync.RWMutex + messages []domain.Message store domain.PersistenceStore notify domain.Notifier cfg app.FIXConfig @@ -52,6 +57,7 @@ func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain. return &Manager{ sessions: make(map[string]quickfix.SessionID), trades: make(map[string]*listTrade), + messages: make([]domain.Message, 0), store: store, notify: notify, cfg: cfg, @@ -84,6 +90,11 @@ func (m *Manager) Start() error { slog.Error(err.Error()) } + if err := m.loadTodayMessages(); err != nil { + err = tracerr.Errorf("failed to load today messages from DB, starting with empty list: %w", err) + slog.Error(err.Error()) + } + f, err := os.Open(m.cfg.SettingsFile) if err != nil { err = tracerr.Errorf("error opening FIX settings file %q: %w", m.cfg.SettingsFile, err) @@ -255,16 +266,6 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu Quoted: false, } m.tradesMu.Unlock() - - // Persist incoming QuoteRequest. - m.persistMessage(quoteReqID, parsed) - - // Persist outgoing QuoteStatusReport. - m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]any{ - "QuoteReqID": quoteReqID, - "QuoteStatus": string(enum.QuoteStatus_ACCEPTED), - "OwnerTraderID": ownerTraderID, - })) } // handleQuoteAck handles an incoming QuoteAck (35=CW). @@ -273,8 +274,6 @@ func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.Sessi status, _ := msg.GetQuoteAckStatus() text, _ := msg.GetText() - m.persistMessage(quoteReqID, parseQuoteAck(msg, m.dict)) - if status != enum.QuoteAckStatus_ACCEPTED { err := tracerr.Errorf("handleQuoteAck: quote rejected by TW (quoteReqID=%s, quoteAckStatus=%s, text=%s)", quoteReqID, string(status), text) slog.Error(err.Error()) @@ -318,16 +317,6 @@ func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID } slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID) - // Persist incoming QuoteResponse. - m.persistMessage(quoteReqID, parseQuoteResponse(msg, m.dict)) - - // Persist outgoing ACK. - m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]any{ - "QuoteReqID": quoteReqID, - "QuoteRespID": quoteRespID, - "QuoteStatus": string(enum.QuoteStatus_ACCEPTED), - })) - // _TRDSUMM is the final message — clean up the trade. if isTrdSumm { slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID) @@ -380,17 +369,6 @@ func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, ses slog.Info("Trade result received from TW", "execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID) } - - // Persist incoming ExecutionReport. - m.persistMessage(clOrdID, parseExecutionReport(msg, m.dict)) - - // Persist outgoing ExecutionAck. - m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]any{ - "OrderID": orderID, - "ExecID": execID, - "ClOrdID": clOrdID, - "ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED), - })) } // handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW. @@ -528,17 +506,6 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error { slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol, "price", price.String()) - m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]any{ - "QuoteReqID": quoteReqID, - "QuoteID": quoteID, - "Symbol": symbol, - "Side": string(side), - "Price": price.String(), - "OrderQty": orderQty.String(), - "Currency": currency, - "SettlDate": settlDate, - })) - return nil } @@ -590,7 +557,11 @@ func (m *Manager) anyActiveSessionID() quickfix.SessionID { return quickfix.SessionID{} } -// handleRawMessage persists raw FIX message strings to the logs table. +// handleRawMessage is the single ingest path for every application-level FIX message +// (admin messages — Logon, Logout, Heartbeat, TestRequest, ResendRequest, SequenceReset — +// go through ToAdmin/FromAdmin and never reach this callback). It persists the raw +// envelope to the logs table, builds a structured Message and saves it to +// the messages table, and appends to the in-memory list. func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) { quoteReqID := extractIdentifier(msg) @@ -601,17 +572,60 @@ func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) { err = tracerr.Errorf("failed to persist raw log: %w", err) slog.Error(err.Error()) } -} -// persistMessage saves a structured FIX message to the messages table. -func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSON) { - if err := m.store.SaveMessage(domain.TradeMessage{ - QuoteReqID: quoteReqID, - JMessage: fixJSON, - }); err != nil { - err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", fixJSON.MsgType, quoteReqID, err) + msgTypeBytes, _ := msg.Header.GetBytes(tag.MsgType) + msgType := string(msgTypeBytes) + senderCompID, msgSeqNum, sendingTime := extractHeaderMeta(msg) + + fixJSON := buildFixMessageJSON(direction, msgType, quoteReqID, msg, m.dict) + + stored := domain.Message{ + ID: uuid.NewV4().String(), + SenderCompID: senderCompID, + MsgSeqNum: msgSeqNum, + SendingTime: sendingTime, + CreatedAt: time.Now(), + JMessage: fixJSON, + } + + if err := m.store.SaveMessage(stored); err != nil { + err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", msgType, quoteReqID, err) slog.Error(err.Error()) } + + m.messagesMu.Lock() + m.messages = append(m.messages, stored) + m.messagesMu.Unlock() +} + +// GetAllMessages returns a snapshot of every FIX application message recorded today, +// sorted ascending by CreatedAt. +func (m *Manager) GetAllMessages() []domain.Message { + m.messagesMu.RLock() + out := make([]domain.Message, len(m.messages)) + copy(out, m.messages) + m.messagesMu.RUnlock() + + sort.Slice(out, func(i, j int) bool { return out[i].CreatedAt.Before(out[j].CreatedAt) }) + + return out +} + +// loadTodayMessages rebuilds the in-memory message list from today's rows in the DB. +// Must be called before the FIX initiator starts so live ingest doesn't race with replay. +func (m *Manager) loadTodayMessages() error { + messages, err := m.store.GetTodayMessages() + if err != nil { + return err + } + + m.messagesMu.Lock() + m.messages = messages + m.messagesMu.Unlock() + + slog.Info("today messages loaded", "count", len(messages)) + + return nil } // loadActiveTrades reconstructs active trades from today's messages in the database. @@ -624,9 +638,11 @@ func (m *Manager) loadActiveTrades() error { activeTrades := make(map[string]*listTrade) for _, msg := range messages { + quoteReqID := msg.JMessage.QuoteReqID + switch msg.JMessage.MsgType { case "R": // QuoteRequest -> trade is born - if !strings.HasPrefix(msg.QuoteReqID, "LST_") { + if !strings.HasPrefix(quoteReqID, "LST_") { continue } @@ -638,24 +654,24 @@ func (m *Manager) loadActiveTrades() error { continue } - activeTrades[msg.QuoteReqID] = &listTrade{ + activeTrades[quoteReqID] = &listTrade{ QuoteRequest: msg.JMessage, } case "S": // Outgoing Quote — dealer has already quoted this trade - if t, ok := activeTrades[msg.QuoteReqID]; ok { + if t, ok := activeTrades[quoteReqID]; ok { t.Quoted = true t.Price = getDecimal(msg.JMessage.Body, "Price") } case "CW": // QuoteAck — if rejected, trade is dead if getString(msg.JMessage.Body, "QuoteAckStatus") != string(enum.QuoteAckStatus_ACCEPTED) { - delete(activeTrades, msg.QuoteReqID) + delete(activeTrades, quoteReqID) } case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6) if strings.HasSuffix(getString(msg.JMessage.Body, "QuoteRespID"), "_TRDSUMM") { - delete(activeTrades, msg.QuoteReqID) + delete(activeTrades, quoteReqID) } case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4) diff --git a/src/client/fix/parser.go b/src/client/fix/parser.go index c804cab..107c526 100644 --- a/src/client/fix/parser.go +++ b/src/client/fix/parser.go @@ -5,10 +5,7 @@ import ( "quantex.com/qfixdpl/quickfix" "quantex.com/qfixdpl/quickfix/datadictionary" - "quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport" - "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest" - "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse" "quantex.com/qfixdpl/quickfix/gen/tag" "quantex.com/qfixdpl/src/domain" ) @@ -50,21 +47,6 @@ func parseQuoteRequest(msg quoterequest.QuoteRequest, dd *datadictionary.DataDic return buildFixMessageJSON("IN", "R", quoteReqID, msg.Message, dd) } -func parseQuoteAck(msg quoteack.QuoteAck, dd *datadictionary.DataDictionary) domain.FixMessageJSON { - quoteReqID, _ := msg.GetQuoteReqID() - return buildFixMessageJSON("IN", "CW", quoteReqID, msg.Message, dd) -} - -func parseQuoteResponse(msg quoteresponse.QuoteResponse, dd *datadictionary.DataDictionary) domain.FixMessageJSON { - quoteReqID, _ := msg.GetQuoteReqID() - return buildFixMessageJSON("IN", "AJ", quoteReqID, msg.Message, dd) -} - -func parseExecutionReport(msg executionreport.ExecutionReport, dd *datadictionary.DataDictionary) domain.FixMessageJSON { - clOrdID, _ := msg.GetClOrdID() - return buildFixMessageJSON("IN", "8", clOrdID, msg.Message, dd) -} - // extractIdentifier extracts the trade identifier from a parsed FIX message. // For ExecutionReport (8) and ExecutionAck (BN), uses ClOrdID (tag 11). // For all other message types, uses QuoteReqID (tag 131). @@ -87,12 +69,17 @@ func extractIdentifier(msg *quickfix.Message) string { return "" } -func buildOutgoingMessageJSON(msgType, quoteReqID string, body map[string]any) domain.FixMessageJSON { - return domain.FixMessageJSON{ - Direction: "OUT", - MsgType: msgType, - QuoteReqID: quoteReqID, - Body: body, - ReceiveTime: time.Now(), +// extractHeaderMeta reads SenderCompID (49), MsgSeqNum (34) and SendingTime (52) +// from a quickfix.Message header. Returns zero values when a field is absent. +func extractHeaderMeta(msg *quickfix.Message) (senderCompID string, msgSeqNum int, sendingTime time.Time) { + if s, err := msg.Header.GetString(tag.SenderCompID); err == nil { + senderCompID = s } + if n, err := msg.Header.GetInt(tag.MsgSeqNum); err == nil { + msgSeqNum = n + } + if t, err := msg.Header.GetTime(tag.SendingTime); err == nil { + sendingTime = t + } + return } diff --git a/src/client/store/db.sql b/src/client/store/db.sql index 4f82781..6fd9ebb 100644 --- a/src/client/store/db.sql +++ b/src/client/store/db.sql @@ -1,11 +1,12 @@ CREATE TABLE IF NOT EXISTS qfixdpl_messages ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - quote_req_id TEXT NOT NULL, - j_message JSONB NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + id UUID PRIMARY KEY, + sender_comp_id TEXT NOT NULL, + msg_seq_num BIGINT NOT NULL, + j_message JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL ); -CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id); CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at); +CREATE INDEX IF NOT EXISTS idx_messages_session ON qfixdpl_messages(sender_comp_id, msg_seq_num); CREATE TABLE IF NOT EXISTS qfixdpl_logs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), diff --git a/src/client/store/persistence.go b/src/client/store/persistence.go index d5e0120..6475088 100644 --- a/src/client/store/persistence.go +++ b/src/client/store/persistence.go @@ -2,6 +2,7 @@ package store import ( "encoding/json" + "strconv" "strings" "time" @@ -9,15 +10,20 @@ import ( "quantex.com/qfixdpl/src/domain" ) -func (p *Store) SaveMessage(msg domain.TradeMessage) error { +func (p *Store) SaveMessage(msg domain.Message) error { jsonBytes, err := json.Marshal(msg.JMessage) if err != nil { return tracerr.Errorf("error marshaling j_message: %w", err) } _, err = p.db.Exec( - "INSERT INTO qfixdpl_messages (quote_req_id, j_message) VALUES ($1, $2)", - msg.QuoteReqID, string(jsonBytes), + `INSERT INTO qfixdpl_messages (id, sender_comp_id, msg_seq_num, j_message, created_at) + VALUES ($1, $2, $3, $4, $5)`, + msg.ID, + msg.SenderCompID, + strconv.Itoa(msg.MsgSeqNum), + string(jsonBytes), + msg.CreatedAt.UTC().Format(time.RFC3339Nano), ) if err != nil { return tracerr.Errorf("error inserting message: %w", err) @@ -41,25 +47,29 @@ func (p *Store) SaveLog(entry domain.LogEntry) error { return nil } -func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) { +func (p *Store) GetTodayMessages() ([]domain.Message, error) { rows, err := p.db.Query( - "SELECT id, quote_req_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC", + `SELECT id, sender_comp_id, msg_seq_num, j_message, created_at + FROM qfixdpl_messages + WHERE created_at >= current_date + ORDER BY created_at ASC`, ) if err != nil { return nil, tracerr.Errorf("error querying today messages: %w", err) } defer rows.Close() - var messages []domain.TradeMessage + var messages []domain.Message for rows.Next() { var ( - id, quoteReqID string - jMessageRaw []byte - createdAt time.Time + id, senderCompID string + msgSeqNum int + jMessageRaw []byte + createdAt time.Time ) - if err := rows.Scan(&id, "eReqID, &jMessageRaw, &createdAt); err != nil { + if err := rows.Scan(&id, &senderCompID, &msgSeqNum, &jMessageRaw, &createdAt); err != nil { return nil, tracerr.Errorf("error scanning message row: %w", err) } @@ -68,11 +78,22 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) { return nil, tracerr.Errorf("error unmarshaling j_message: %w", err) } - messages = append(messages, domain.TradeMessage{ - ID: id, - QuoteReqID: quoteReqID, - JMessage: jMessage, - CreatedAt: createdAt, + sendingTime, _ := jMessage.Header["SendingTime"].(time.Time) + if sendingTime.IsZero() { + if s, ok := jMessage.Header["SendingTime"].(string); ok { + if t, parseErr := time.Parse(time.RFC3339Nano, s); parseErr == nil { + sendingTime = t + } + } + } + + messages = append(messages, domain.Message{ + ID: id, + SenderCompID: senderCompID, + MsgSeqNum: msgSeqNum, + SendingTime: sendingTime, + CreatedAt: createdAt, + JMessage: jMessage, }) } diff --git a/src/domain/persistence.go b/src/domain/persistence.go index 096aa1f..4507320 100644 --- a/src/domain/persistence.go +++ b/src/domain/persistence.go @@ -19,12 +19,15 @@ type FixMessageJSON struct { ReceiveTime time.Time `json:"receive_time"` } -// TradeMessage es una fila de qfixdpl_messages. -type TradeMessage struct { - ID string `json:"id"` - QuoteReqID string `json:"quote_req_id"` - JMessage FixMessageJSON `json:"j_message"` - CreatedAt time.Time `json:"created_at"` +// Message es una fila de qfixdpl_messages, con la metadata del header FIX hoisted +// para que los consumidores puedan ordenar/filtrar sin parsear el JSON. +type Message struct { + ID string `json:"id"` + SenderCompID string `json:"sender_comp_id"` + MsgSeqNum int `json:"msg_seq_num"` + SendingTime time.Time `json:"sending_time"` + CreatedAt time.Time `json:"created_at"` + JMessage FixMessageJSON `json:"j_message"` } // LogEntry es el DTO para insertar/actualizar un log crudo en qfixdpl_logs. @@ -40,8 +43,8 @@ type Logs struct { // PersistenceStore define la interfaz de persistencia. type PersistenceStore interface { - SaveMessage(msg TradeMessage) error + SaveMessage(msg Message) error SaveLog(entry LogEntry) error - GetTodayMessages() ([]TradeMessage, error) + GetTodayMessages() ([]Message, error) GetLogsByQuoteReqID(quoteReqID string) (Logs, error) }