From 9fa1349afdd4c1112d42379a13f0bb6b2d8a77a1 Mon Sep 17 00:00:00 2001 From: Facundo Marion Date: Mon, 30 Mar 2026 10:35:30 -0300 Subject: [PATCH] New structure, persistance and recovery --- src/client/api/rest/controller.go | 54 ++++-- src/client/api/rest/routes.go | 3 + src/client/api/rest/server.go | 10 +- src/client/fix/application.go | 13 +- src/client/fix/manager.go | 251 +++++++++++++++++++++++-- src/client/fix/parser.go | 293 ++++++++++++++++++++++++++++++ src/client/store/db.sql | 16 ++ src/client/store/manager.go | 27 +++ src/client/store/persistence.go | 103 +++++++++++ src/cmd/service/service.go | 4 +- src/domain/persistence.go | 63 +++++++ 11 files changed, 805 insertions(+), 32 deletions(-) create mode 100644 src/client/store/db.sql create mode 100644 src/client/store/persistence.go create mode 100644 src/domain/persistence.go diff --git a/src/client/api/rest/controller.go b/src/client/api/rest/controller.go index ae605ea..52eee3c 100644 --- a/src/client/api/rest/controller.go +++ b/src/client/api/rest/controller.go @@ -29,26 +29,56 @@ const ( ) type Controller struct { - pool *redis.Pool - userData app.UserDataProvider - store *store.Store - config Config - notify domain.Notifier - authMutex deadlock.Mutex + pool *redis.Pool + userData app.UserDataProvider + store *store.Store + tradeProvider TradeProvider + config Config + notify domain.Notifier + authMutex deadlock.Mutex } func newController(pool *redis.Pool, userData app.UserDataProvider, - s *store.Store, config Config, n domain.Notifier, + s *store.Store, tp TradeProvider, config Config, n domain.Notifier, ) *Controller { return &Controller{ - pool: pool, - userData: userData, - store: s, - config: config, - notify: n, + pool: pool, + userData: userData, + store: s, + tradeProvider: tp, + config: config, + notify: n, } } +func (cont *Controller) GetTrades(ctx *gin.Context) { + setHeaders(ctx, cont.config) + trades := cont.tradeProvider.GetTrades() + ctx.JSON(http.StatusOK, trades) +} + +func (cont *Controller) GetAllTrades(ctx *gin.Context) { + setHeaders(ctx, cont.config) + trades := cont.tradeProvider.GetAllTrades() + ctx.JSON(http.StatusOK, trades) +} + +func (cont *Controller) GetTradeLogs(ctx *gin.Context) { + setHeaders(ctx, cont.config) + + tradeID := ctx.Param("tradeID") + + logs, err := cont.store.GetLogsByTradeID(tradeID) + if err != nil { + slog.Error("error fetching trade logs", "tradeID", tradeID, "error", err) + ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"}) + + return + } + + ctx.JSON(http.StatusOK, logs) +} + func (cont *Controller) GetUser(ctx *gin.Context) app.User { // This is set on the AuthRequired middleware response, ok := ctx.Get(responseKey) diff --git a/src/client/api/rest/routes.go b/src/client/api/rest/routes.go index 444e17f..29a708d 100644 --- a/src/client/api/rest/routes.go +++ b/src/client/api/rest/routes.go @@ -21,6 +21,9 @@ func SetRoutes(api *API) { qfixpt := v1.Group("/") qfixpt.Use(cont.AuthRequired) qfixpt.GET("/health", cont.HealthCheck) + qfixpt.GET("/trades", cont.GetTrades) + qfixpt.GET("/trades/all", cont.GetAllTrades) + qfixpt.GET("/trades/:tradeID/logs", cont.GetTradeLogs) backoffice := qfixpt.Group("/backoffice") backoffice.Use(cont.BackOfficeUser) diff --git a/src/client/api/rest/server.go b/src/client/api/rest/server.go index 41fa1de..a24e7dc 100644 --- a/src/client/api/rest/server.go +++ b/src/client/api/rest/server.go @@ -32,7 +32,13 @@ type Config struct { EnableJWTAuth bool } -func New(userData app.UserDataProvider, storeInstance *store.Store, config Config, notify domain.Notifier) *API { +// TradeProvider exposes trade data for the REST API. +type TradeProvider interface { + GetTrades() []domain.PostTrade + GetAllTrades() []domain.PostTrade +} + +func New(userData app.UserDataProvider, storeInstance *store.Store, tradeProvider TradeProvider, config Config, notify domain.Notifier) *API { // Set up Gin var engine *gin.Engine if version.Environment() == version.EnvironmentTypeProd { @@ -58,7 +64,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, config Confi } api := &API{ - Controller: newController(NewPool(), userData, storeInstance, config, notify), + Controller: newController(NewPool(), userData, storeInstance, tradeProvider, config, notify), Router: engine, Port: config.Port, } diff --git a/src/client/fix/application.go b/src/client/fix/application.go index ffc170c..439ef41 100644 --- a/src/client/fix/application.go +++ b/src/client/fix/application.go @@ -19,6 +19,7 @@ type application struct { onTradeCaptureReport func(tradecapturereport.TradeCaptureReport, quickfix.SessionID) onAllocationReport func(allocationreport.AllocationReport, quickfix.SessionID) onConfirmation func(confirmation.Confirmation, quickfix.SessionID) + onRawMessage func(direction string, msg *quickfix.Message) } func newApplication(n domain.Notifier) *application { @@ -57,13 +58,23 @@ func (a *application) OnLogout(sessionID quickfix.SessionID) { func (a *application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {} -func (a *application) ToApp(_ *quickfix.Message, _ quickfix.SessionID) error { return nil } +func (a *application) ToApp(msg *quickfix.Message, _ quickfix.SessionID) error { + if a.onRawMessage != nil { + a.onRawMessage("OUT", msg) + } + + return nil +} func (a *application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError { return nil } func (a *application) FromApp(msg *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError { + if a.onRawMessage != nil { + a.onRawMessage("IN", msg) + } + beginString, _ := msg.Header.GetBytes(tag.BeginString) msgType, _ := msg.Header.GetBytes(tag.MsgType) diff --git a/src/client/fix/manager.go b/src/client/fix/manager.go index dd37f48..3b1bb28 100644 --- a/src/client/fix/manager.go +++ b/src/client/fix/manager.go @@ -3,6 +3,7 @@ package fix import ( "log/slog" "os" + "sync" "time" "github.com/rs/zerolog/log" @@ -26,24 +27,44 @@ import ( // Custom Tradeweb tags not in generated code. const ( - tagTestMessage = 23029 - tagTWTradeID = 23068 - tagTWOrigTradeID = 23096 - tagTWClrID = 23025 - tagDlrClrID = 23027 - tagClearingStatus = 5440 + tagTestMessage = 23029 + tagTWTradeID = 23068 + tagTWOrigTradeID = 23096 + tagTWClrID = 23025 + tagDlrClrID = 23027 + tagClearingStatus = 5440 + tagTradingSystemID = 6731 ) +// postTrade is the internal representation of a trade in memory. +type postTrade struct { + TradeID string + TradeReportID string + TradeReportType string + Side string + LastQty decimal.Decimal + LastPx decimal.Decimal + SettlDate string + TradeDate string + Status domain.PostTradeStatus + TWTradeID string +} + // Manager wraps the QuickFIX initiator for post-trade message handling. type Manager struct { initiator *quickfix.Initiator app *application + tradesMu sync.RWMutex + trades map[string]*postTrade + store domain.PersistenceStore notify domain.Notifier cfg app.FIXConfig } -func NewManager(cfg app.FIXConfig, notify domain.Notifier) *Manager { +func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager { return &Manager{ + trades: make(map[string]*postTrade), + store: store, notify: notify, cfg: cfg, } @@ -56,8 +77,13 @@ func (m *Manager) Start() error { fixApp.onTradeCaptureReport = m.handleTradeCaptureReport fixApp.onAllocationReport = m.handleAllocationReport fixApp.onConfirmation = m.handleConfirmation + fixApp.onRawMessage = m.handleRawMessage m.app = fixApp + if err := m.loadTrades(); err != nil { + slog.Error("failed to load trades from DB, starting with empty state", "error", err) + } + f, err := os.Open(m.cfg.SettingsFile) if err != nil { err = tracerr.Errorf("error opening FIX settings file %q: %s", m.cfg.SettingsFile, err) @@ -207,23 +233,47 @@ func (m *Manager) handleTradeCaptureReport(msg tradecapturereport.TradeCaptureRe "settlDate", trade.SettlDate, ) + // Persist structured message. + m.persistMessage(tradeID, parseTradeCaptureReportJSON(msg)) + + // Update in-memory state. + var status domain.PostTradeStatus + switch string(tradeReportType) { case "101": // TRDCONF — trade confirmation - // TODO: upsert trade en BD + status = domain.PostTradeStatusActive slog.Info("TRDCONF: trade confirmation", "tradeReportID", tradeReportID, "tradeID", tradeID) case "102": // TRDBLOCK — block trade, log only + status = domain.PostTradeStatusActive slog.Info("TRDBLOCK: block trade", "tradeReportID", tradeReportID, "tradeID", tradeID) case "103": // TRDCORR — trade correction - // TODO: update/correct trade en BD (usando TradeReportRefID) + status = domain.PostTradeStatusCorrected slog.Info("TRDCORR: trade correction", "tradeReportID", tradeReportID, "refID", tradeReportRefID) case "104": // TRDCXL — trade cancellation - // TODO: cancel trade en BD (usando TradeReportRefID) + status = domain.PostTradeStatusCancelled slog.Info("TRDCXL: trade cancel", "tradeReportID", tradeReportID, "refID", tradeReportRefID) case "105", "106", "107", "108": // log and ack only, no business logic + status = domain.PostTradeStatusActive slog.Info("trade report (no business logic)", "tradeReportType", string(tradeReportType), "tradeReportID", tradeReportID) default: + status = domain.PostTradeStatusActive slog.Warn("unexpected TradeReportType", "tradeReportType", string(tradeReportType), "tradeReportID", tradeReportID) } + + m.tradesMu.Lock() + m.trades[tradeID] = &postTrade{ + TradeID: tradeID, + TradeReportID: tradeReportID, + TradeReportType: string(tradeReportType), + Side: side, + LastQty: lastQty, + LastPx: lastPx, + SettlDate: settlDate, + TradeDate: tradeDate, + Status: status, + TWTradeID: twTradeID, + } + m.tradesMu.Unlock() } // handleAllocationReport processes an incoming AllocationReport (35=AS). @@ -248,8 +298,15 @@ func (m *Manager) handleAllocationReport(msg allocationreport.AllocationReport, allocTransType, _ := msg.GetAllocTransType() - // Extract TradeID from the first NoExecs entry if available. + // Extract TradeID: try tag 1003 directly, fall back to SecondaryOrderID (198) from NoOrders. tradeID := getCustomString(&msg.Message.Body.FieldMap, 1003) + if tradeID == "" { + if orders, err := msg.GetNoOrders(); err == nil && orders.Len() > 0 { + if v, e := orders.Get(0).GetSecondaryOrderID(); e == nil && v != "" { + tradeID = v + } + } + } alloc := domain.Allocation{ AllocReportID: allocReportID, @@ -283,15 +340,15 @@ func (m *Manager) handleAllocationReport(msg allocationreport.AllocationReport, "numEntries", len(alloc.Entries), ) + // Persist structured message. + m.persistMessage(tradeID, parseAllocationReportJSON(msg)) + switch allocTransType { case enum.AllocTransType_NEW: - // TODO: insert allocations en BD slog.Info("new allocation", "allocReportID", allocReportID) case enum.AllocTransType_REPLACE: - // TODO: update allocations en BD slog.Info("replace allocation", "allocReportID", allocReportID) case enum.AllocTransType_CANCEL: - // TODO: cancel allocations en BD slog.Info("cancel allocation", "allocReportID", allocReportID) default: slog.Warn("unhandled AllocTransType", "allocTransType", string(allocTransType), "allocReportID", allocReportID) @@ -353,7 +410,171 @@ func (m *Manager) handleConfirmation(msg confirmation.Confirmation, sessionID qu "clearingStatus", conf.ClearingStatus, ) - // TODO: persistir/actualizar confirmation status en BD + // Persist structured message. Use TradeID if available, fall back to ConfirmID. + persistID := getCustomString(&msg.Message.Body.FieldMap, 1003) + if persistID == "" { + persistID = confirmID + } + + m.persistMessage(persistID, parseConfirmationJSON(msg)) +} + +// persistMessage saves a structured FIX message to the messages table. +func (m *Manager) persistMessage(tradeID string, fixJSON domain.FixMessageJSON) { + if err := m.store.SaveMessage(domain.TradeMessage{ + TradeID: tradeID, + JMessage: fixJSON, + }); err != nil { + slog.Error("failed to persist message", "msgType", fixJSON.MsgType, "tradeID", tradeID, "error", err) + } +} + +// handleRawMessage persists raw FIX message strings to the logs table. +func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) { + tradeID := extractTradeIdentifier(msg) + if tradeID == "" { + return + } + + if err := m.store.SaveLog(domain.LogEntry{ + TradeID: tradeID, + RawMsg: "[" + direction + "] " + msg.String(), + }); err != nil { + slog.Error("failed to persist raw log", "error", err) + } +} + +// loadTrades reconstructs all trades and their states from today's messages in the database. +func (m *Manager) loadTrades() error { + messages, err := m.store.GetTodayMessages() + if err != nil { + return err + } + + trades := make(map[string]*postTrade) + + for _, msg := range messages { + switch msg.JMessage.MsgType { + case "AE": // TradeCaptureReport + body := msg.JMessage.Body + tradeReportType, _ := body["TradeReportType"].(string) + tradeID := msg.TradeID + + if tradeID == "" { + continue + } + + t, exists := trades[tradeID] + if !exists { + t = &postTrade{ + TradeID: tradeID, + Status: domain.PostTradeStatusActive, + } + trades[tradeID] = t + } + + if v, ok := body["TradeReportID"].(string); ok { + t.TradeReportID = v + } + + t.TradeReportType = tradeReportType + + if v, ok := body["Side"].(string); ok { + t.Side = v + } + + if v, ok := body["LastQty"].(string); ok { + t.LastQty, _ = decimal.NewFromString(v) + } + + if v, ok := body["LastPx"].(string); ok { + t.LastPx, _ = decimal.NewFromString(v) + } + + if v, ok := body["SettlDate"].(string); ok { + t.SettlDate = v + } + + if v, ok := body["TradeDate"].(string); ok { + t.TradeDate = v + } + + if v, ok := body["TWTradeID"].(string); ok { + t.TWTradeID = v + } + + switch tradeReportType { + case "103": // TRDCORR + t.Status = domain.PostTradeStatusCorrected + case "104": // TRDCXL + t.Status = domain.PostTradeStatusCancelled + default: + if t.Status != domain.PostTradeStatusCorrected && t.Status != domain.PostTradeStatusCancelled { + t.Status = domain.PostTradeStatusActive + } + } + } + } + + active := 0 + for _, t := range trades { + if t.Status == domain.PostTradeStatusActive { + active++ + } + } + + m.tradesMu.Lock() + m.trades = trades + m.tradesMu.Unlock() + + slog.Info("recovery completed", "totalTrades", len(trades), "activeTrades", active) + + return nil +} + +// GetTrades returns only active trades. +func (m *Manager) GetTrades() []domain.PostTrade { + m.tradesMu.RLock() + defer m.tradesMu.RUnlock() + + var result []domain.PostTrade + + for _, t := range m.trades { + if t.Status == domain.PostTradeStatusActive { + result = append(result, t.toPostTrade()) + } + } + + return result +} + +// GetAllTrades returns all trades regardless of status. +func (m *Manager) GetAllTrades() []domain.PostTrade { + m.tradesMu.RLock() + defer m.tradesMu.RUnlock() + + result := make([]domain.PostTrade, 0, len(m.trades)) + + for _, t := range m.trades { + result = append(result, t.toPostTrade()) + } + + return result +} + +func (t *postTrade) toPostTrade() domain.PostTrade { + return domain.PostTrade{ + TradeID: t.TradeID, + TradeReportID: t.TradeReportID, + TradeReportType: t.TradeReportType, + Side: t.Side, + LastQty: t.LastQty.String(), + LastPx: t.LastPx.String(), + SettlDate: t.SettlDate, + TradeDate: t.TradeDate, + Status: t.Status, + TWTradeID: t.TWTradeID, + } } // Ensure transactTime has a sensible default if not present in the message. diff --git a/src/client/fix/parser.go b/src/client/fix/parser.go index 6952a6e..0bccfba 100644 --- a/src/client/fix/parser.go +++ b/src/client/fix/parser.go @@ -1,9 +1,15 @@ package fix import ( + "time" + "quantex.com/qfixpt/quickfix" "quantex.com/qfixpt/quickfix/gen/enum" + "quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreport" + "quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmation" "quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereport" + "quantex.com/qfixpt/quickfix/gen/tag" + "quantex.com/qfixpt/src/domain" ) func getCustomString(body *quickfix.FieldMap, t int) string { @@ -33,6 +39,293 @@ func getCustomBool(body *quickfix.FieldMap, t int) bool { return v } +func extractHeader(msg *quickfix.Message) map[string]interface{} { + h := make(map[string]interface{}) + + if v, err := msg.Header.GetBytes(tag.BeginString); err == nil { + h["BeginString"] = string(v) + } + + if v, err := msg.Header.GetBytes(tag.MsgType); err == nil { + h["MsgType"] = string(v) + } + + if v, err := msg.Header.GetBytes(tag.SenderCompID); err == nil { + h["SenderCompID"] = string(v) + } + + if v, err := msg.Header.GetBytes(tag.TargetCompID); err == nil { + h["TargetCompID"] = string(v) + } + + if v, err := msg.Header.GetBytes(tag.SendingTime); err == nil { + h["SendingTime"] = string(v) + } + + return h +} + +func parseTradeCaptureReportJSON(msg tradecapturereport.TradeCaptureReport) domain.FixMessageJSON { + tradeID, _ := msg.GetTradeID() + + body := map[string]interface{}{} + + if v, _ := msg.GetTradeReportID(); v != "" { + body["TradeReportID"] = v + } + + body["TradeID"] = tradeID + + if v, _ := msg.GetExecID(); v != "" { + body["ExecID"] = v + } + + if v, _ := msg.GetTradeReportType(); v != "" { + body["TradeReportType"] = string(v) + } + + if v, _ := msg.GetLastQty(); !v.IsZero() { + body["LastQty"] = v.String() + } + + if v, _ := msg.GetLastPx(); !v.IsZero() { + body["LastPx"] = v.String() + } + + if v, _ := msg.GetSpread(); !v.IsZero() { + body["Spread"] = v.String() + } + + if v, _ := msg.GetSettlDate(); v != "" { + body["SettlDate"] = v + } + + if v, _ := msg.GetTradeDate(); v != "" { + body["TradeDate"] = v + } + + if v, _ := msg.GetGrossTradeAmt(); !v.IsZero() { + body["GrossTradeAmt"] = v.String() + } + + if v, _ := msg.GetTradeReportRefID(); v != "" { + body["TradeReportRefID"] = v + } + + // Multi-leg support. + if v, _ := msg.GetTradeLegRefID(); v != "" { + body["TradeLegRefID"] = v + } + + if v := getCustomString(&msg.Message.Body.FieldMap, tagTradingSystemID); v != "" { + body["TradingSystemID"] = v + } + + // Custom fields. + if v := getCustomString(&msg.Message.Body.FieldMap, tagTWTradeID); v != "" { + body["TWTradeID"] = v + } + + if v := getCustomString(&msg.Message.Body.FieldMap, tagTWOrigTradeID); v != "" { + body["TWOrigTradeID"] = v + } + + // Side-level fields. + sides, sideErr := msg.GetNoSides() + if sideErr == nil && sides.Len() > 0 { + s := sides.Get(0) + + if v, e := s.GetSide(); e == nil { + body["Side"] = string(v) + } + + if v, e := s.GetAccruedInterestAmt(); e == nil { + body["AccruedInterestAmt"] = v.String() + } + + if v, e := s.GetNetMoney(); e == nil { + body["NetMoney"] = v.String() + } + + if v := extractPartyByRole(s, enum.PartyRole_CLEARING_FIRM); v != "" { + body["ClearingFirm"] = v + } + + if v := extractPartyByRole(s, "1007"); v != "" { + body["TradewebTraderID"] = v + } + } + + return domain.FixMessageJSON{ + Direction: "IN", + MsgType: "AE", + TradeID: tradeID, + Header: extractHeader(msg.Message), + Body: body, + ReceiveTime: time.Now(), + } +} + +func parseAllocationReportJSON(msg allocationreport.AllocationReport) domain.FixMessageJSON { + // Primary: tag 1003 directly on the body. + tradeID := getCustomString(&msg.Message.Body.FieldMap, 1003) + + // Fallback: SecondaryOrderID (198) from the NoOrders repeating group. + if tradeID == "" { + if orders, err := msg.GetNoOrders(); err == nil && orders.Len() > 0 { + if v, e := orders.Get(0).GetSecondaryOrderID(); e == nil && v != "" { + tradeID = v + } + } + } + + body := map[string]interface{}{} + + if v, _ := msg.GetAllocReportID(); v != "" { + body["AllocReportID"] = v + } + + if v, _ := msg.GetAllocTransType(); v != "" { + body["AllocTransType"] = string(v) + } + + body["TradeID"] = tradeID + + // TradingSystemID (6731) — for multi-leg, differentiates each leg. + if v := getCustomString(&msg.Message.Body.FieldMap, tagTradingSystemID); v != "" { + body["TradingSystemID"] = v + } + + // SecondaryOrderID (198) from NoOrders — explicit reference to original trade. + if orders, err := msg.GetNoOrders(); err == nil && orders.Len() > 0 { + if v, e := orders.Get(0).GetSecondaryOrderID(); e == nil && v != "" { + body["SecondaryOrderID"] = v + } + } + + allocs, allocErr := msg.GetNoAllocs() + if allocErr == nil { + var entries []map[string]interface{} + + for i := 0; i < allocs.Len(); i++ { + entry := allocs.Get(i) + e := map[string]interface{}{} + + if v, err := entry.GetAllocAccount(); err == nil { + e["AllocAccount"] = v + } + + if v, err := entry.GetAllocQty(); err == nil { + e["AllocQty"] = v.String() + } + + if v, err := entry.GetIndividualAllocID(); err == nil { + e["IndividualAllocID"] = v + } + + if v, err := entry.GetAllocPrice(); err == nil { + e["AllocPrice"] = v.String() + } + + entries = append(entries, e) + } + + body["Allocs"] = entries + } + + return domain.FixMessageJSON{ + Direction: "IN", + MsgType: "AS", + TradeID: tradeID, + Header: extractHeader(msg.Message), + Body: body, + ReceiveTime: time.Now(), + } +} + +func parseConfirmationJSON(msg confirmation.Confirmation) domain.FixMessageJSON { + // Try TradeID first, fall back to ConfirmID. + tradeID := getCustomString(&msg.Message.Body.FieldMap, 1003) + if tradeID == "" { + tradeID, _ = msg.GetConfirmID() + } + + body := map[string]interface{}{} + + if v, _ := msg.GetConfirmID(); v != "" { + body["ConfirmID"] = v + } + + if v, _ := msg.GetConfirmStatus(); v != "" { + body["ConfirmStatus"] = string(v) + } + + if v, _ := msg.GetConfirmTransType(); v != "" { + body["ConfirmTransType"] = string(v) + } + + if v, _ := msg.GetIndividualAllocID(); v != "" { + body["IndividualAllocID"] = v + } + + if v, _ := msg.GetTradeDate(); v != "" { + body["TradeDate"] = v + } + + if v := getCustomString(&msg.Message.Body.FieldMap, tagTWClrID); v != "" { + body["TWClrID"] = v + } + + if v := getCustomString(&msg.Message.Body.FieldMap, tagDlrClrID); v != "" { + body["DlrClrID"] = v + } + + if v := getCustomString(&msg.Message.Body.FieldMap, tagClearingStatus); v != "" { + body["ClearingStatus"] = v + } + + return domain.FixMessageJSON{ + Direction: "IN", + MsgType: "AK", + TradeID: tradeID, + Header: extractHeader(msg.Message), + Body: body, + ReceiveTime: time.Now(), + } +} + +func extractTradeIdentifier(msg *quickfix.Message) string { + // Primary: TradeID (1003). + var tradeID quickfix.FIXString + if err := msg.Body.GetField(tag.TradeID, &tradeID); err == nil && string(tradeID) != "" { + return string(tradeID) + } + + // Fallback: SecondaryOrderID (198) — used in AllocationReport to reference the trade. + var secOrderID quickfix.FIXString + if err := msg.Body.GetField(tag.SecondaryOrderID, &secOrderID); err == nil && string(secOrderID) != "" { + return string(secOrderID) + } + + // Last resort: ConfirmID (664) — for Confirmation messages without TradeID. + var confirmID quickfix.FIXString + if err := msg.Body.GetField(tag.ConfirmID, &confirmID); err == nil && string(confirmID) != "" { + return string(confirmID) + } + + return "" +} + +func buildOutgoingMessageJSON(msgType, tradeID string, body map[string]interface{}) domain.FixMessageJSON { + return domain.FixMessageJSON{ + Direction: "OUT", + MsgType: msgType, + TradeID: tradeID, + Body: body, + ReceiveTime: time.Now(), + } +} + // extractPartyByRole iterates the Parties repeating group inside a NoSides entry // and returns the PartyID for the given target role. func extractPartyByRole(side tradecapturereport.NoSides, targetRole enum.PartyRole) string { diff --git a/src/client/store/db.sql b/src/client/store/db.sql new file mode 100644 index 0000000..2132a28 --- /dev/null +++ b/src/client/store/db.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS qfixpt_messages ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + trade_id TEXT NOT NULL, + j_message JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_pt_messages_trade_id ON qfixpt_messages(trade_id); +CREATE INDEX IF NOT EXISTS idx_pt_messages_created_at ON qfixpt_messages(created_at); + +CREATE TABLE IF NOT EXISTS qfixpt_logs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + trade_id TEXT NOT NULL UNIQUE, + raw_msg TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); diff --git a/src/client/store/manager.go b/src/client/store/manager.go index 73d172f..b80630c 100644 --- a/src/client/store/manager.go +++ b/src/client/store/manager.go @@ -2,7 +2,9 @@ package store import ( + _ "embed" "log/slog" + "strings" "time" "quantex.com.ar/multidb" @@ -11,6 +13,9 @@ import ( "quantex.com/qfixpt/src/common/tracerr" ) +//go:embed db.sql +var schemaSQL string + const dbPingSeconds = 30 type Store struct { @@ -45,9 +50,31 @@ func New(config Config) (*Store, error) { go s.db.PeriodicDBPing(time.Second * dbPingSeconds) + if err := s.ensureTables(); err != nil { + return nil, tracerr.Errorf("error ensuring tables: %w", err) + } + return s, nil } +func (p *Store) ensureTables() error { + statements := strings.Split(schemaSQL, ";") + for _, stmt := range statements { + stmt = strings.TrimSpace(stmt) + if stmt == "" { + continue + } + + if _, err := p.db.Exec(stmt); err != nil { + return tracerr.Errorf("error executing schema statement: %w", err) + } + } + + slog.Info("database tables ensured") + + return nil +} + func (p *Store) CloseDB() { p.db.Close() slog.Info("closing database connection.") diff --git a/src/client/store/persistence.go b/src/client/store/persistence.go new file mode 100644 index 0000000..24c7b2b --- /dev/null +++ b/src/client/store/persistence.go @@ -0,0 +1,103 @@ +package store + +import ( + "encoding/json" + "strings" + "time" + + "quantex.com/qfixpt/src/common/tracerr" + "quantex.com/qfixpt/src/domain" +) + +func (p *Store) SaveMessage(msg domain.TradeMessage) error { + jsonBytes, err := json.Marshal(msg.JMessage) + if err != nil { + return tracerr.Errorf("error marshaling j_message: %w", err) + } + + _, err = p.db.Exec( + "INSERT INTO qfixpt_messages (trade_id, j_message) VALUES ($1, $2)", + msg.TradeID, string(jsonBytes), + ) + if err != nil { + return tracerr.Errorf("error inserting message: %w", err) + } + + return nil +} + +func (p *Store) SaveLog(entry domain.LogEntry) error { + upsertStmt := `INSERT INTO qfixpt_logs (trade_id, raw_msg) + VALUES ($1, $2) + ON CONFLICT (trade_id) DO UPDATE + SET raw_msg = qfixpt_logs.raw_msg || E'\n' || EXCLUDED.raw_msg, + updated_at = NOW()` + + _, err := p.db.Exec(upsertStmt, entry.TradeID, entry.RawMsg) + if err != nil { + return tracerr.Errorf("error upserting log: %w", err) + } + + return nil +} + +func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) { + rows, err := p.db.Query( + "SELECT id, trade_id, j_message, created_at FROM qfixpt_messages WHERE created_at >= current_date ORDER BY created_at ASC", + ) + if err != nil { + return nil, tracerr.Errorf("error querying today messages: %w", err) + } + defer rows.Close() + + var messages []domain.TradeMessage + + for rows.Next() { + var ( + id, tradeID string + jMessageRaw []byte + createdAt time.Time + ) + + if err := rows.Scan(&id, &tradeID, &jMessageRaw, &createdAt); err != nil { + return nil, tracerr.Errorf("error scanning message row: %w", err) + } + + var jMessage domain.FixMessageJSON + if err := json.Unmarshal(jMessageRaw, &jMessage); err != nil { + return nil, tracerr.Errorf("error unmarshaling j_message: %w", err) + } + + messages = append(messages, domain.TradeMessage{ + ID: id, + TradeID: tradeID, + JMessage: jMessage, + CreatedAt: createdAt, + }) + } + + if err := rows.Err(); err != nil { + return nil, tracerr.Errorf("error iterating message rows: %w", err) + } + + return messages, nil +} + +func (p *Store) GetLogsByTradeID(tradeID string) (domain.Logs, error) { + rows, err := p.db.Query("SELECT raw_msg FROM qfixpt_logs WHERE trade_id = $1", tradeID) + if err != nil { + return domain.Logs{}, tracerr.Errorf("error querying logs: %w", err) + } + defer rows.Close() + + if !rows.Next() { + return domain.Logs{}, nil + } + + var rawMsg string + if err := rows.Scan(&rawMsg); err != nil { + return domain.Logs{}, tracerr.Errorf("error scanning log row: %w", err) + } + + return domain.Logs{Entries: strings.Split(rawMsg, "\n")}, nil +} diff --git a/src/cmd/service/service.go b/src/cmd/service/service.go index 6e9a098..a3fe39a 100644 --- a/src/cmd/service/service.go +++ b/src/cmd/service/service.go @@ -40,7 +40,7 @@ func Runner(cfg app.Config) error { userData := data.New() // Initialize FIX Post-Trade Manager. - fixManager := fix.NewManager(cfg.FIX, notify) + fixManager := fix.NewManager(cfg.FIX, appStore, notify) if err = fixManager.Start(); err != nil { return fmt.Errorf("error starting FIX initiator: %w", err) } @@ -54,7 +54,7 @@ func Runner(cfg app.Config) error { EnableJWTAuth: cfg.EnableJWTAuth, } - api := rest.New(userData, appStore, apiConfig, notify) + api := rest.New(userData, appStore, fixManager, apiConfig, notify) api.Run() cmd.WaitForInterruptSignal(nil) diff --git a/src/domain/persistence.go b/src/domain/persistence.go new file mode 100644 index 0000000..2fa5046 --- /dev/null +++ b/src/domain/persistence.go @@ -0,0 +1,63 @@ +package domain + +import "time" + +// PostTradeStatus represents the lifecycle state of a post-trade record. +type PostTradeStatus string + +const ( + PostTradeStatusActive PostTradeStatus = "active" + PostTradeStatusCorrected PostTradeStatus = "corrected" + PostTradeStatusCancelled PostTradeStatus = "cancelled" +) + +// FixMessageJSON is the structured representation of a FIX message for storage. +type FixMessageJSON struct { + Direction string `json:"direction"` + MsgType string `json:"msg_type"` + TradeID string `json:"trade_id"` + Header map[string]interface{} `json:"header"` + Body map[string]interface{} `json:"body"` + ReceiveTime time.Time `json:"receive_time"` +} + +// TradeMessage is a row in qfixpt_messages. +type TradeMessage struct { + ID string `json:"id"` + TradeID string `json:"trade_id"` + JMessage FixMessageJSON `json:"j_message"` + CreatedAt time.Time `json:"created_at"` +} + +// LogEntry is the DTO for inserting/appending a raw log in qfixpt_logs. +type LogEntry struct { + TradeID string + RawMsg string +} + +// Logs is the response for GET /trades/:tradeID/logs. +type Logs struct { + Entries []string `json:"entries"` +} + +// PostTrade is the exported representation of a post-trade record for the API. +type PostTrade struct { + TradeID string `json:"trade_id"` + TradeReportID string `json:"trade_report_id"` + TradeReportType string `json:"trade_report_type"` + Side string `json:"side"` + LastQty string `json:"last_qty"` + LastPx string `json:"last_px"` + SettlDate string `json:"settl_date"` + TradeDate string `json:"trade_date"` + Status PostTradeStatus `json:"status"` + TWTradeID string `json:"tw_trade_id"` +} + +// PersistenceStore defines the persistence interface for post-trade messages. +type PersistenceStore interface { + SaveMessage(msg TradeMessage) error + SaveLog(entry LogEntry) error + GetTodayMessages() ([]TradeMessage, error) + GetLogsByTradeID(tradeID string) (Logs, error) +} -- 2.45.2