From 82d2e1b5f73092dec0bcc16a99007f2576b0471d Mon Sep 17 00:00:00 2001 From: Facundo Marion Date: Thu, 19 Mar 2026 13:23:23 -0300 Subject: [PATCH] Persistance and recovery --- docs/FLOW_8_4_LIST_TRADING.md | 137 +++++++++++++++- src/client/api/rest/controller.go | 66 +++++--- src/client/api/rest/routes.go | 3 +- src/client/api/rest/server.go | 9 +- src/client/data/orders.go | 41 ----- src/client/fix/application.go | 13 +- src/client/fix/manager.go | 255 +++++++++++++++++++++++++++--- src/client/fix/parser.go | 229 +++++++++++++++++++++++++++ src/client/store/db.sql | 16 ++ src/client/store/manager.go | 27 ++++ src/client/store/persistence.go | 105 ++++++++++++ src/cmd/service/service.go | 5 +- src/domain/order.go | 28 ---- src/domain/persistence.go | 55 +++++++ 14 files changed, 859 insertions(+), 130 deletions(-) delete mode 100644 src/client/data/orders.go create mode 100644 src/client/fix/parser.go create mode 100644 src/client/store/db.sql create mode 100644 src/client/store/persistence.go delete mode 100644 src/domain/order.go create mode 100644 src/domain/persistence.go diff --git a/docs/FLOW_8_4_LIST_TRADING.md b/docs/FLOW_8_4_LIST_TRADING.md index eb0731f..5375d26 100644 --- a/docs/FLOW_8_4_LIST_TRADING.md +++ b/docs/FLOW_8_4_LIST_TRADING.md @@ -10,6 +10,10 @@ The dealer's role is limited to: - Acknowledging messages (35=AI QuoteStatusReport, 35=BN ExecutionAck) - Sending a price quote (35=S) +This document covers the **happy path** (client accepts) and two **alternative flows**: +- **Flow 8.6 — Trade Ended:** Client cancels before or after receiving the quote +- **QuoteAck Rejected:** TW rejects the dealer's quote + ## Participants | Abbreviation | Role | @@ -113,7 +117,7 @@ TW confirms the quote was accepted. |-----|-------|-------| | 1865 | QuoteAckStatus | `1` (ACCEPTED) | -**Note:** If status is not ACCEPTED, the dealer should log a warning — the quote may have been rejected. +**Note:** If status is not ACCEPTED, the dealer logs the rejection (including the `Text` field) and cleans up the trade from memory. See [QuoteAck Rejected](#quoteack-rejected-quote-not-accepted) below. ### Step 5 — QuoteResponse (35=AJ) — TW → Dealer @@ -190,6 +194,115 @@ TW sends the full trade summary with additional details (parties, settlement inf Same format as Step 8. +--- + +## Alternative Flows + +### Flow 8.6 — Trade Ended (Client Cancels) + +The client changes their mind and ends the trade. This can happen at any point after the QuoteRequest — even before the dealer's quote arrives. TW informs the dealer via QuoteResponse (35=AJ) messages with `_TRDEND` and `_TRDSUMM` suffixes instead of the ExecutionReport chain. + +> **Critical:** The dealer MUST send a QuoteStatusReport (35=AI) ACK for every QuoteResponse. If no ACK is sent, TW will retry the message indefinitely (~every 11 seconds). + +``` + TW Dealer + │ │ + │ 1. QuoteRequest (35=R) │ + │ ─────────────────────────────────────────> │ + │ │ + │ 2. QuoteStatusReport (35=AI) [ACK] │ + │ <───────────────────────────────────────── │ + │ │ + │ ┌─── Client ends trade ───┐ │ + │ │ Meanwhile, dealer may │ │ + │ │ still send Quote (S) │ │ + │ └─────────────────────────┘ │ + │ │ + │ 3. QuoteResponse (35=AJ) [_TRDEND] │ + │ QuoteRespType=7 (End Trade) │ + │ ─────────────────────────────────────────> │ + │ │ + │ 4. QuoteStatusReport (35=AI) [ACK] │ + │ <───────────────────────────────────────── │ + │ │ + │ 5. QuoteAck (35=CW) [REJECTED] │ + │ (if quote was sent, TW rejects it) │ + │ ─────────────────────────────────────────> │ + │ │ + │ 6. QuoteResponse (35=AJ) [_TRDSUMM] │ + │ QuoteRespType=7, TradeSummary=Y │ + │ ─────────────────────────────────────────> │ + │ │ + │ 7. QuoteStatusReport (35=AI) [ACK] │ + │ <───────────────────────────────────────── │ + │ │ +``` + +#### Step 3 — QuoteResponse (35=AJ) `_TRDEND` — TW → Dealer + +TW notifies that the client ended the trade. + +| Tag | Field | Value | Notes | +|-----|-------|-------|-------| +| 693 | QuoteRespID | `..._TRDEND` | Suffix identifies this as trade end | +| 694 | QuoteRespType | `7` (End Trade) | | +| 131 | QuoteReqID | Same as original | | + +**Dealer action:** Send QuoteStatusReport (35=AI) with `693=QuoteRespID` and `297=0` (ACCEPTED). + +#### Step 5 — QuoteAck (35=CW) `REJECTED` — TW → Dealer + +If the dealer's Quote (35=S) crossed with the TRDEND, TW rejects it. The QuoteAckStatus will be `2` (REJECTED) with a text like "DPL DLRQUOTE received in an invalid state." + +**Dealer action:** Log the rejection and clean up the trade from memory. + +#### Step 6 — QuoteResponse (35=AJ) `_TRDSUMM` — TW → Dealer + +TW sends the final trade summary confirming the outcome. + +| Tag | Field | Value | Notes | +|-----|-------|-------|-------| +| 693 | QuoteRespID | `..._TRDSUMM` | Final summary message | +| 694 | QuoteRespType | `7` (End Trade) | | +| 22636 | TradeSummary | `Y` | Confirms this is the summary | + +**Dealer action:** Send QuoteStatusReport (35=AI) ACK. Clean up the trade from memory. This is the **terminal message** — no more messages will follow for this QuoteReqID. + +--- + +### QuoteAck Rejected (Quote Not Accepted) + +If TW rejects the dealer's Quote (35=CW with status != ACCEPTED), the trade is dead from the dealer's perspective. + +``` + TW Dealer + │ │ + │ 1-3. (same as happy path) │ + │ │ + │ 4. QuoteAck (35=CW) [REJECTED] │ + │ QuoteAckStatus != 1 │ + │ ─────────────────────────────────────────> │ + │ │ + │ Trade is terminated. │ + │ │ +``` + +**Dealer action:** Log the rejection (including the `Text` field with the reason) and remove the trade from memory. No further action needed — TW may or may not send subsequent messages for this QuoteReqID. + +--- + +## QuoteRespID Suffix Routing in `handleQuoteResponse` + +All QuoteResponse (35=AJ) messages are routed by the suffix of the `QuoteRespID` (tag 693): + +``` +QuoteRespID ends with "_TRDREQ" → Trade request (flow 8.4 happy path) — ACK +QuoteRespID ends with "_TRDEND" → Trade ended by client (flow 8.6) — ACK +QuoteRespID ends with "_TRDSUMM" → Trade summary (flow 8.6 final) — ACK + cleanup +QuoteRespID ends with "_LISTEND" → List ended — ACK +Other suffix → Ignored (logged) +``` + ## Code Reference The implementation lives in `src/client/fix/manager.go`: @@ -197,20 +310,32 @@ The implementation lives in `src/client/fix/manager.go`: | Handler | Triggers on | Action | |---------|------------|--------| | `handleQuoteRequest` | 35=R | Sends 35=AI (ack) + 35=S (quote) | -| `handleQuoteAck` | 35=CW | Logs status | -| `handleQuoteResponse` | 35=AJ | Sends 35=AI (TRDREQACK) | +| `handleQuoteAck` | 35=CW | If rejected: logs + cleans up trade. If accepted: logs | +| `handleQuoteResponse` | 35=AJ | Sends 35=AI (ACK). Routes by QuoteRespID suffix. Cleans up on `_TRDSUMM` | | `handleExecutionReport` | 35=8 | Sends 35=BN (ack) + routes by ExecID suffix | | `sendQuoteStatusReport` | — | Builds 35=AI for QuoteRequest ack | -| `sendTradeRequestAck` | — | Builds 35=AI for TRDREQACK | +| `sendTradeRequestAck` | — | Builds 35=AI for QuoteResponse ack (all suffixes) | | `sendExecutionAck` | — | Builds 35=BN for ExecutionReport ack | ### ExecID Routing in `handleExecutionReport` ``` ExecID contains "_LISTEND" → Log only, await trade result -ExecID contains "_TRDEND" → Log + cleanup trade from memory -ExecID contains "_TRDSUMM" → Log trade summary +ExecID contains "_TRDEND" → Log trade end +ExecID contains "_TRDSUMM" → Log trade summary + cleanup trade from memory ExecType = F (fallback) → Log generic trade result ``` The order matters: ExecID suffix checks run before ExecType checks, because `_TRDEND` and `_TRDSUMM` both have `ExecType=F`. + +### Trade Cleanup Paths + +A trade is removed from memory in any of these scenarios: + +| Trigger | Message | Condition | +|---------|---------|-----------| +| QuoteAck rejected | 35=CW | `QuoteAckStatus != ACCEPTED` | +| QuoteResponse summary | 35=AJ | `QuoteRespID` ends with `_TRDSUMM` (flow 8.6) | +| ExecutionReport summary | 35=8 | `ExecID` contains `_TRDSUMM` (flow 8.4) | + +The `loadActiveTrades` recovery function replays today's messages and applies the same cleanup rules to reconstruct accurate state on restart. diff --git a/src/client/api/rest/controller.go b/src/client/api/rest/controller.go index 0b515c2..913ebd0 100644 --- a/src/client/api/rest/controller.go +++ b/src/client/api/rest/controller.go @@ -29,25 +29,25 @@ const ( ) type Controller struct { - pool *redis.Pool - userData app.UserDataProvider - store *store.Store - orderStore domain.OrderStore - config Config - notify domain.Notifier - authMutex deadlock.Mutex + pool *redis.Pool + userData app.UserDataProvider + store *store.Store + tradeProvider TradeProvider + config Config + notify domain.Notifier + authMutex deadlock.Mutex } func newController(pool *redis.Pool, userData app.UserDataProvider, - s *store.Store, orderStore domain.OrderStore, config Config, n domain.Notifier, + s *store.Store, tradeProvider TradeProvider, config Config, n domain.Notifier, ) *Controller { return &Controller{ - pool: pool, - userData: userData, - store: s, - orderStore: orderStore, - config: config, - notify: n, + pool: pool, + userData: userData, + store: s, + tradeProvider: tradeProvider, + config: config, + notify: n, } } @@ -291,15 +291,37 @@ func allowed(origin string, config Config) bool { return false } -// GetOrders godoc -// @Summary List received FIX orders -// @Description Returns all NewOrderSingle messages received via FIX +// GetTrades godoc +// @Summary List active trades +// @Description Returns all active List Trading trades // @Tags fix // @Produce json -// @Success 200 {array} domain.Order -// @Router /qfixdpl/v1/orders [get] -func (cont *Controller) GetOrders(ctx *gin.Context) { - orders := cont.orderStore.GetOrders() - ctx.JSON(http.StatusOK, orders) +// @Success 200 {array} domain.ListTrade +// @Router /qfixdpl/v1/trades [get] +func (cont *Controller) GetTrades(ctx *gin.Context) { + trades := cont.tradeProvider.GetTrades() + ctx.JSON(http.StatusOK, trades) +} + +// GetLogs godoc +// @Summary Get raw FIX logs for a trade +// @Description Returns raw FIX message logs for a given QuoteReqID +// @Tags fix +// @Produce json +// @Param quoteReqID path string true "QuoteReqID" +// @Success 200 {object} domain.Logs +// @Router /qfixdpl/v1/trades/{quoteReqID}/logs [get] +func (cont *Controller) GetLogs(ctx *gin.Context) { + quoteReqID := ctx.Param("quoteReqID") + + logs, err := cont.store.GetLogsByQuoteReqID(quoteReqID) + if err != nil { + slog.Error("GetLogs: error fetching logs", "quoteReqID", quoteReqID, "error", err) + ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"}) + + return + } + + ctx.JSON(http.StatusOK, logs) } diff --git a/src/client/api/rest/routes.go b/src/client/api/rest/routes.go index c895dcc..3980d3a 100644 --- a/src/client/api/rest/routes.go +++ b/src/client/api/rest/routes.go @@ -21,7 +21,8 @@ func SetRoutes(api *API) { qfixdpl := v1.Group("/") qfixdpl.Use(cont.AuthRequired) qfixdpl.GET("/health", cont.HealthCheck) - qfixdpl.GET("/orders", cont.GetOrders) + qfixdpl.GET("/trades", cont.GetTrades) + qfixdpl.GET("/trades/:quoteReqID/logs", cont.GetLogs) backoffice := qfixdpl.Group("/backoffice") backoffice.Use(cont.BackOfficeUser) diff --git a/src/client/api/rest/server.go b/src/client/api/rest/server.go index f3e6cdb..db29f0f 100644 --- a/src/client/api/rest/server.go +++ b/src/client/api/rest/server.go @@ -16,6 +16,11 @@ import ( "quantex.com/qfixdpl/src/domain" ) +// TradeProvider exposes trade data from the FIX manager. +type TradeProvider interface { + GetTrades() []domain.ListTrade +} + const RedisMaxIdle = 3000 // In ms type API struct { @@ -32,7 +37,7 @@ type Config struct { EnableJWTAuth bool } -func New(userData app.UserDataProvider, storeInstance *store.Store, orderStore domain.OrderStore, config Config, notify domain.Notifier) *API { +func New(userData app.UserDataProvider, storeInstance *store.Store, tradeProvider TradeProvider, config Config, notify domain.Notifier) *API { // Set up Gin var engine *gin.Engine if version.Environment() == version.EnvironmentTypeProd { @@ -58,7 +63,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, orderStore d } api := &API{ - Controller: newController(NewPool(), userData, storeInstance, orderStore, config, notify), + Controller: newController(NewPool(), userData, storeInstance, tradeProvider, config, notify), Router: engine, Port: config.Port, } diff --git a/src/client/data/orders.go b/src/client/data/orders.go deleted file mode 100644 index 0d56a69..0000000 --- a/src/client/data/orders.go +++ /dev/null @@ -1,41 +0,0 @@ -package data - -import ( - "sync" - - "quantex.com/qfixdpl/src/domain" -) - -type InMemoryOrderStore struct { - mu sync.RWMutex - orders []domain.Order -} - -func NewOrderStore() *InMemoryOrderStore { - return &InMemoryOrderStore{} -} - -func (s *InMemoryOrderStore) SaveOrder(order domain.Order) { - s.mu.Lock() - defer s.mu.Unlock() - s.orders = append(s.orders, order) -} - -func (s *InMemoryOrderStore) GetOrders() []domain.Order { - s.mu.RLock() - defer s.mu.RUnlock() - result := make([]domain.Order, len(s.orders)) - copy(result, s.orders) - return result -} - -func (s *InMemoryOrderStore) GetOrderByClOrdID(id string) (domain.Order, bool) { - s.mu.RLock() - defer s.mu.RUnlock() - for _, o := range s.orders { - if o.ClOrdID == id { - return o, true - } - } - return domain.Order{}, false -} diff --git a/src/client/fix/application.go b/src/client/fix/application.go index 255ef55..163bac2 100644 --- a/src/client/fix/application.go +++ b/src/client/fix/application.go @@ -26,6 +26,7 @@ type application struct { onQuoteResponse func(quoteresponse.QuoteResponse, quickfix.SessionID) onExecutionReport func(executionreport.ExecutionReport, quickfix.SessionID) onExecutionAck func(executionack.ExecutionAck, quickfix.SessionID) + onRawMessage func(direction string, msg *quickfix.Message) } func newApplication(n domain.Notifier) *application { @@ -67,7 +68,13 @@ func (a *application) OnLogout(sessionID quickfix.SessionID) { func (a *application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {} -func (a *application) ToApp(_ *quickfix.Message, _ quickfix.SessionID) error { return nil } +func (a *application) ToApp(msg *quickfix.Message, _ quickfix.SessionID) error { + if a.onRawMessage != nil { + a.onRawMessage("OUT", msg) + } + + return nil +} func (a *application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError { return nil @@ -88,6 +95,10 @@ func (a *application) FromApp(msg *quickfix.Message, sessionID quickfix.SessionI "rawMsg", msg.String(), ) + if a.onRawMessage != nil { + a.onRawMessage("IN", msg) + } + rejErr := a.router.Route(msg, sessionID) if rejErr != nil { slog.Error("FIX FromApp routing failed", diff --git a/src/client/fix/manager.go b/src/client/fix/manager.go index 260a7d0..802ce81 100644 --- a/src/client/fix/manager.go +++ b/src/client/fix/manager.go @@ -48,18 +48,18 @@ type Manager struct { sessions map[string]quickfix.SessionID tradesMu sync.RWMutex trades map[string]*listTrade - orderStore domain.OrderStore + store domain.PersistenceStore notify domain.Notifier cfg app.FIXConfig } -func NewManager(cfg app.FIXConfig, orderStore domain.OrderStore, notify domain.Notifier) *Manager { +func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager { return &Manager{ - sessions: make(map[string]quickfix.SessionID), - trades: make(map[string]*listTrade), - orderStore: orderStore, - notify: notify, - cfg: cfg, + sessions: make(map[string]quickfix.SessionID), + trades: make(map[string]*listTrade), + store: store, + notify: notify, + cfg: cfg, } } @@ -72,8 +72,13 @@ func (m *Manager) Start() error { fixApp.onQuoteResponse = m.handleQuoteResponse fixApp.onExecutionReport = m.handleExecutionReport fixApp.onExecutionAck = m.handleExecutionAck + fixApp.onRawMessage = m.handleRawMessage m.app = fixApp + if err := m.loadActiveTrades(); err != nil { + slog.Error("failed to load active trades from DB, starting with empty state", "error", err) + } + f, err := os.Open(m.cfg.SettingsFile) if err != nil { err = tracerr.Errorf("error opening FIX settings file %q: %s", m.cfg.SettingsFile, err) @@ -307,47 +312,96 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu SessionID: sessionID, } m.tradesMu.Unlock() + + // Persist structured message (outside mutex). + m.persistMessage(quoteReqID, parseQuoteRequest(msg)) + + // Persist outgoing QuoteStatusReport. + m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{ + "QuoteReqID": quoteReqID, + "QuoteStatus": string(enum.QuoteStatus_ACCEPTED), + "OwnerTraderID": ownerTraderID, + })) + + // Persist outgoing Quote. + m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]interface{}{ + "QuoteReqID": quoteReqID, + "QuoteID": quoteID, + "Symbol": symbol, + "Side": string(side), + "Price": price.String(), + "OrderQty": orderQty.String(), + "Currency": currency, + "SettlDate": settlDate, + })) } // handleQuoteAck handles an incoming QuoteAck (35=CW). func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) { quoteReqID, _ := msg.GetQuoteReqID() status, _ := msg.GetQuoteAckStatus() + text, _ := msg.GetText() + + m.persistMessage(quoteReqID, parseQuoteAck(msg)) if status != enum.QuoteAckStatus_ACCEPTED { - slog.Warn("handleQuoteAck: unexpected status", "quoteReqID", quoteReqID, "quoteAckStatus", status) + slog.Error("handleQuoteAck: quote rejected by TW", "quoteReqID", quoteReqID, "quoteAckStatus", string(status), "text", text) + + m.tradesMu.Lock() + delete(m.trades, quoteReqID) + m.tradesMu.Unlock() + + return } + + slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID) } // handleQuoteResponse handles an incoming QuoteResponse (35=AJ). +// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes. func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) { quoteReqID, _ := msg.GetQuoteReqID() quoteRespID, _ := msg.GetQuoteRespID() slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID) - if !strings.HasSuffix(quoteRespID, "_TRDREQ") { - slog.Info("handleQuoteResponse: QuoteRespID does not end with _TRDREQ, ignoring", "quoteRespID", quoteRespID) - return - } + isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ") + isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND") + isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM") + isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND") - m.tradesMu.RLock() - _, ok := m.trades[quoteReqID] - m.tradesMu.RUnlock() - - if !ok { - slog.Warn("handleQuoteResponse: no trade found for QuoteReqID", "quoteReqID", quoteReqID) + if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd { + slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID) return } // TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future. - // Send QuoteStatusReport (35=AI) as TRDREQACK. + // Always send ACK regardless of whether the trade is in our map. + // TW will keep retrying until it receives an ACK. if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil { - slog.Error("handleQuoteResponse: failed to send TRDREQACK", "quoteReqID", quoteReqID, "error", ackErr.Error()) + slog.Error("handleQuoteResponse: failed to send ACK", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID, "error", ackErr.Error()) return } - slog.Info("TRDREQACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID) + slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID) + + // Persist incoming QuoteResponse. + m.persistMessage(quoteReqID, parseQuoteResponse(msg)) + + // Persist outgoing ACK. + m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{ + "QuoteReqID": quoteReqID, + "QuoteRespID": quoteRespID, + "QuoteStatus": string(enum.QuoteStatus_ACCEPTED), + })) + + // _TRDSUMM is the final message — clean up the trade. + if isTrdSumm { + slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID) + m.tradesMu.Lock() + delete(m.trades, quoteReqID) + m.tradesMu.Unlock() + } } // handleExecutionReport handles an incoming ExecutionReport (35=8). @@ -378,23 +432,172 @@ func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, ses "execID", execID, "clOrdID", clOrdID) case strings.Contains(execID, "_TRDEND"): - slog.Info("Trade ended, cleaning up", "execID", execID, "clOrdID", clOrdID) + slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID) + + case strings.Contains(execID, "_TRDSUMM"): + slog.Info("Trade summary received from TW, cleaning up", + "execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID) m.tradesMu.Lock() delete(m.trades, clOrdID) m.tradesMu.Unlock() - case strings.Contains(execID, "_TRDSUMM"): - slog.Info("Trade summary received from TW", - "execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID) - case execType == enum.ExecType_TRADE: slog.Info("Trade result received from TW", "execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID) } + + // Persist incoming ExecutionReport. + m.persistMessage(clOrdID, parseExecutionReport(msg)) + + // Persist outgoing ExecutionAck. + m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{ + "OrderID": orderID, + "ExecID": execID, + "ClOrdID": clOrdID, + "ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED), + })) } // handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW. func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) { // Logged in application.go, no further action needed. } + +// GetTrades returns a snapshot of all active trades. +func (m *Manager) GetTrades() []domain.ListTrade { + m.tradesMu.RLock() + defer m.tradesMu.RUnlock() + + trades := make([]domain.ListTrade, 0, len(m.trades)) + for _, t := range m.trades { + trades = append(trades, domain.ListTrade{ + QuoteReqID: t.QuoteReqID, + ListID: t.ListID, + Symbol: t.Symbol, + SecurityIDSrc: string(t.SecurityIDSrc), + Currency: t.Currency, + Side: string(t.Side), + OrderQty: t.OrderQty.String(), + SettlDate: t.SettlDate, + Price: t.Price.String(), + OwnerTraderID: t.OwnerTraderID, + }) + } + + return trades +} + +// handleRawMessage persists raw FIX message strings to the logs table. +func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) { + quoteReqID := extractIdentifier(msg) + + if err := m.store.SaveLog(domain.LogEntry{ + QuoteReqID: quoteReqID, + RawMsg: "[" + direction + "] " + msg.String(), + }); err != nil { + slog.Error("failed to persist raw log", "error", err) + } +} + +// persistMessage saves a structured FIX message to the messages table. +func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSON) { + if err := m.store.SaveMessage(domain.TradeMessage{ + QuoteReqID: quoteReqID, + JMessage: fixJSON, + }); err != nil { + slog.Error("failed to persist message", "msgType", fixJSON.MsgType, "quoteReqID", quoteReqID, "error", err) + } +} + +// loadActiveTrades reconstructs active trades from today's messages in the database. +func (m *Manager) loadActiveTrades() error { + messages, err := m.store.GetTodayMessages() + if err != nil { + return err + } + + activeTrades := make(map[string]*listTrade) + + for _, msg := range messages { + switch msg.JMessage.MsgType { + case "R": // QuoteRequest -> trade is born + if !strings.HasPrefix(msg.QuoteReqID, "LST_") { + continue + } + + body := msg.JMessage.Body + + nt, _ := body["NegotiationType"].(string) + if nt != "RFQ" { + continue + } + + listID, _ := body["ListID"].(string) + if listID == "" { + continue + } + + trade := &listTrade{ + QuoteReqID: msg.QuoteReqID, + ListID: listID, + } + + if v, ok := body["SecurityID"].(string); ok { + trade.Symbol = v + } + + if v, ok := body["Currency"].(string); ok { + trade.Currency = v + } + + if v, ok := body["Side"].(string); ok { + trade.Side = enum.Side(v) + } + + if v, ok := body["OrderQty"].(string); ok { + trade.OrderQty, _ = decimal.NewFromString(v) + } + + if v, ok := body["SettlDate"].(string); ok { + trade.SettlDate = v + } + + if v, ok := body["OwnerTraderID"].(string); ok { + trade.OwnerTraderID = v + } + + activeTrades[msg.QuoteReqID] = trade + + case "CW": // QuoteAck — if rejected, trade is dead + body := msg.JMessage.Body + quoteAckStatus, _ := body["QuoteAckStatus"].(string) + + if quoteAckStatus != string(enum.QuoteAckStatus_ACCEPTED) { + delete(activeTrades, msg.QuoteReqID) + } + + case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6) + body := msg.JMessage.Body + quoteRespID, _ := body["QuoteRespID"].(string) + + if strings.HasSuffix(quoteRespID, "_TRDSUMM") { + delete(activeTrades, msg.QuoteReqID) + } + + case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4) + body := msg.JMessage.Body + execID, _ := body["ExecID"].(string) + clOrdID, _ := body["ClOrdID"].(string) + + if strings.Contains(execID, "_TRDSUMM") { + delete(activeTrades, clOrdID) + } + } + } + + m.trades = activeTrades + slog.Info("recovery completed", "activeTrades", len(activeTrades)) + + return nil +} diff --git a/src/client/fix/parser.go b/src/client/fix/parser.go new file mode 100644 index 0000000..ef25cff --- /dev/null +++ b/src/client/fix/parser.go @@ -0,0 +1,229 @@ +package fix + +import ( + "time" + + "quantex.com/qfixdpl/quickfix" + "quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport" + "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack" + "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest" + "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse" + "quantex.com/qfixdpl/quickfix/gen/tag" + "quantex.com/qfixdpl/src/domain" +) + +func extractHeader(msg *quickfix.Message) map[string]interface{} { + header := make(map[string]interface{}) + + if v, err := msg.Header.GetBytes(tag.BeginString); err == nil { + header["BeginString"] = string(v) + } + if v, err := msg.Header.GetBytes(tag.MsgType); err == nil { + header["MsgType"] = string(v) + } + if v, err := msg.Header.GetBytes(tag.SenderCompID); err == nil { + header["SenderCompID"] = string(v) + } + if v, err := msg.Header.GetBytes(tag.TargetCompID); err == nil { + header["TargetCompID"] = string(v) + } + if v, err := msg.Header.GetBytes(tag.MsgSeqNum); err == nil { + header["MsgSeqNum"] = string(v) + } + if v, err := msg.Header.GetBytes(tag.SendingTime); err == nil { + header["SendingTime"] = string(v) + } + + return header +} + +func parseQuoteRequest(msg quoterequest.QuoteRequest) domain.FixMessageJSON { + quoteReqID, _ := msg.GetQuoteReqID() + body := map[string]interface{}{"QuoteReqID": quoteReqID} + + if relSyms, err := msg.GetNoRelatedSym(); err == nil && relSyms.Len() > 0 { + sym := relSyms.Get(0) + if v, e := sym.GetSecurityID(); e == nil { + body["SecurityID"] = v + } + if v, e := sym.GetSecurityIDSource(); e == nil { + body["SecurityIDSource"] = string(v) + } + if v, e := sym.GetCurrency(); e == nil { + body["Currency"] = v + } + if v, e := sym.GetSide(); e == nil { + body["Side"] = string(v) + } + if v, e := sym.GetOrderQty(); e == nil { + body["OrderQty"] = v.String() + } + if v, e := sym.GetSettlDate(); e == nil { + body["SettlDate"] = v + } + if v, e := sym.GetListID(); e == nil { + body["ListID"] = v + } + if v, e := sym.GetOwnerTraderID(); e == nil { + body["OwnerTraderID"] = v + } + if v, e := sym.GetNegotiationType(); e == nil { + body["NegotiationType"] = v + } + } + + return domain.FixMessageJSON{ + Direction: "IN", + MsgType: "R", + QuoteReqID: quoteReqID, + Header: extractHeader(msg.Message), + Body: body, + ReceiveTime: time.Now(), + } +} + +func parseQuoteAck(msg quoteack.QuoteAck) domain.FixMessageJSON { + quoteReqID, _ := msg.GetQuoteReqID() + body := map[string]interface{}{"QuoteReqID": quoteReqID} + + if v, e := msg.GetQuoteID(); e == nil { + body["QuoteID"] = v + } + if v, e := msg.GetQuoteAckStatus(); e == nil { + body["QuoteAckStatus"] = string(v) + } + if v, e := msg.GetText(); e == nil { + body["Text"] = v + } + + return domain.FixMessageJSON{ + Direction: "IN", + MsgType: "CW", + QuoteReqID: quoteReqID, + Header: extractHeader(msg.Message), + Body: body, + ReceiveTime: time.Now(), + } +} + +func parseQuoteResponse(msg quoteresponse.QuoteResponse) domain.FixMessageJSON { + quoteReqID, _ := msg.GetQuoteReqID() + body := map[string]interface{}{"QuoteReqID": quoteReqID} + + if v, e := msg.GetQuoteRespID(); e == nil { + body["QuoteRespID"] = v + } + if v, e := msg.GetQuoteRespType(); e == nil { + body["QuoteRespType"] = string(v) + } + if v, e := msg.GetSide(); e == nil { + body["Side"] = string(v) + } + if v, e := msg.GetPrice(); e == nil { + body["Price"] = v.String() + } + if v, e := msg.GetOrderQty(); e == nil { + body["OrderQty"] = v.String() + } + if v, e := msg.GetClOrdID(); e == nil { + body["ClOrdID"] = v + } + + return domain.FixMessageJSON{ + Direction: "IN", + MsgType: "AJ", + QuoteReqID: quoteReqID, + Header: extractHeader(msg.Message), + Body: body, + ReceiveTime: time.Now(), + } +} + +func parseExecutionReport(msg executionreport.ExecutionReport) domain.FixMessageJSON { + clOrdID, _ := msg.GetClOrdID() + body := map[string]interface{}{"ClOrdID": clOrdID} + + if v, e := msg.GetExecID(); e == nil { + body["ExecID"] = v + } + if v, e := msg.GetOrderID(); e == nil { + body["OrderID"] = v + } + if v, e := msg.GetExecType(); e == nil { + body["ExecType"] = string(v) + } + if v, e := msg.GetOrdStatus(); e == nil { + body["OrdStatus"] = string(v) + } + if v, e := msg.GetListID(); e == nil { + body["ListID"] = v + } + if v, e := msg.GetSide(); e == nil { + body["Side"] = string(v) + } + if v, e := msg.GetSymbol(); e == nil { + body["Symbol"] = v + } + if v, e := msg.GetSecurityID(); e == nil { + body["SecurityID"] = v + } + if v, e := msg.GetCurrency(); e == nil { + body["Currency"] = v + } + if v, e := msg.GetPrice(); e == nil { + body["Price"] = v.String() + } + if v, e := msg.GetLastPx(); e == nil { + body["LastPx"] = v.String() + } + if v, e := msg.GetLastQty(); e == nil { + body["LastQty"] = v.String() + } + if v, e := msg.GetOrderQty(); e == nil { + body["OrderQty"] = v.String() + } + if v, e := msg.GetSettlDate(); e == nil { + body["SettlDate"] = v + } + + return domain.FixMessageJSON{ + Direction: "IN", + MsgType: "8", + QuoteReqID: clOrdID, + Header: extractHeader(msg.Message), + Body: body, + ReceiveTime: time.Now(), + } +} + +// extractIdentifier extracts the trade identifier from a parsed FIX message. +// For ExecutionReport (8) and ExecutionAck (BN), uses ClOrdID (tag 11). +// For all other message types, uses QuoteReqID (tag 131). +func extractIdentifier(msg *quickfix.Message) string { + msgType, _ := msg.Header.GetBytes(tag.MsgType) + + switch string(msgType) { + case "8", "BN": + var clOrdID quickfix.FIXString + if err := msg.Body.GetField(tag.ClOrdID, &clOrdID); err == nil { + return string(clOrdID) + } + default: + var quoteReqID quickfix.FIXString + if err := msg.Body.GetField(tag.QuoteReqID, "eReqID); err == nil { + return string(quoteReqID) + } + } + + return "" +} + +func buildOutgoingMessageJSON(msgType, quoteReqID string, body map[string]interface{}) domain.FixMessageJSON { + return domain.FixMessageJSON{ + Direction: "OUT", + MsgType: msgType, + QuoteReqID: quoteReqID, + Body: body, + ReceiveTime: time.Now(), + } +} diff --git a/src/client/store/db.sql b/src/client/store/db.sql new file mode 100644 index 0000000..4f82781 --- /dev/null +++ b/src/client/store/db.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS qfixdpl_messages ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + quote_req_id TEXT NOT NULL, + j_message JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id); +CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at); + +CREATE TABLE IF NOT EXISTS qfixdpl_logs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + quote_req_id TEXT NOT NULL UNIQUE, + raw_msg TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); diff --git a/src/client/store/manager.go b/src/client/store/manager.go index b41e778..574e2c8 100644 --- a/src/client/store/manager.go +++ b/src/client/store/manager.go @@ -2,7 +2,9 @@ package store import ( + _ "embed" "log/slog" + "strings" "time" "quantex.com.ar/multidb" @@ -11,6 +13,9 @@ import ( "quantex.com/qfixdpl/src/common/tracerr" ) +//go:embed db.sql +var schemaSQL string + const dbPingSeconds = 30 type Store struct { @@ -45,9 +50,31 @@ func New(config Config) (*Store, error) { go s.db.PeriodicDBPing(time.Second * dbPingSeconds) + if err := s.ensureTables(); err != nil { + return nil, tracerr.Errorf("error ensuring tables: %w", err) + } + return s, nil } +func (p *Store) ensureTables() error { + statements := strings.Split(schemaSQL, ";") + for _, stmt := range statements { + stmt = strings.TrimSpace(stmt) + if stmt == "" { + continue + } + + if _, err := p.db.Exec(stmt); err != nil { + return tracerr.Errorf("error executing schema statement: %w", err) + } + } + + slog.Info("database tables ensured") + + return nil +} + func (p *Store) CloseDB() { p.db.Close() slog.Info("closing database connection.") diff --git a/src/client/store/persistence.go b/src/client/store/persistence.go new file mode 100644 index 0000000..d5e0120 --- /dev/null +++ b/src/client/store/persistence.go @@ -0,0 +1,105 @@ +package store + +import ( + "encoding/json" + "strings" + "time" + + "quantex.com/qfixdpl/src/common/tracerr" + "quantex.com/qfixdpl/src/domain" +) + +func (p *Store) SaveMessage(msg domain.TradeMessage) error { + jsonBytes, err := json.Marshal(msg.JMessage) + if err != nil { + return tracerr.Errorf("error marshaling j_message: %w", err) + } + + _, err = p.db.Exec( + "INSERT INTO qfixdpl_messages (quote_req_id, j_message) VALUES ($1, $2)", + msg.QuoteReqID, string(jsonBytes), + ) + if err != nil { + return tracerr.Errorf("error inserting message: %w", err) + } + + return nil +} + +func (p *Store) SaveLog(entry domain.LogEntry) error { + upsertStmt := `INSERT INTO qfixdpl_logs (quote_req_id, raw_msg) + VALUES ($1, $2) + ON CONFLICT (quote_req_id) DO UPDATE + SET raw_msg = qfixdpl_logs.raw_msg || E'\n' || EXCLUDED.raw_msg, + updated_at = NOW()` + + _, err := p.db.Exec(upsertStmt, entry.QuoteReqID, entry.RawMsg) + if err != nil { + return tracerr.Errorf("error upserting log: %w", err) + } + + return nil +} + +func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) { + rows, err := p.db.Query( + "SELECT id, quote_req_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC", + ) + if err != nil { + return nil, tracerr.Errorf("error querying today messages: %w", err) + } + defer rows.Close() + + var messages []domain.TradeMessage + + for rows.Next() { + var ( + id, quoteReqID string + jMessageRaw []byte + createdAt time.Time + ) + + if err := rows.Scan(&id, "eReqID, &jMessageRaw, &createdAt); err != nil { + return nil, tracerr.Errorf("error scanning message row: %w", err) + } + + var jMessage domain.FixMessageJSON + if err := json.Unmarshal(jMessageRaw, &jMessage); err != nil { + return nil, tracerr.Errorf("error unmarshaling j_message: %w", err) + } + + messages = append(messages, domain.TradeMessage{ + ID: id, + QuoteReqID: quoteReqID, + JMessage: jMessage, + CreatedAt: createdAt, + }) + } + + if err := rows.Err(); err != nil { + return nil, tracerr.Errorf("error iterating message rows: %w", err) + } + + return messages, nil +} + +func (p *Store) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) { + selectStmt := "SELECT raw_msg FROM qfixdpl_logs WHERE quote_req_id = '" + quoteReqID + "';" + + response, err := p.db.Query(selectStmt) + if err != nil { + return domain.Logs{}, tracerr.Errorf("error querying logs: %w", err) + } + defer response.Close() + + if !response.Next() { + return domain.Logs{}, nil + } + + var rawMsg string + if err := response.Scan(&rawMsg); err != nil { + return domain.Logs{}, tracerr.Errorf("error scanning log row: %w", err) + } + + return domain.Logs{Entries: strings.Split(rawMsg, "\n")}, nil +} diff --git a/src/cmd/service/service.go b/src/cmd/service/service.go index 73b0a2c..b2e79be 100644 --- a/src/cmd/service/service.go +++ b/src/cmd/service/service.go @@ -38,9 +38,8 @@ func Runner(cfg app.Config) error { } userData := data.New() - orderStore := data.NewOrderStore() - fixManager := fix.NewManager(cfg.FIX, orderStore, notify) + fixManager := fix.NewManager(cfg.FIX, appStore, notify) if err = fixManager.Start(); err != nil { return fmt.Errorf("error starting FIX acceptor: %w", err) } @@ -54,7 +53,7 @@ func Runner(cfg app.Config) error { EnableJWTAuth: cfg.EnableJWTAuth, } - api := rest.New(userData, appStore, orderStore, apiConfig, notify) + api := rest.New(userData, appStore, fixManager, apiConfig, notify) api.Run() cmd.WaitForInterruptSignal(nil) diff --git a/src/domain/order.go b/src/domain/order.go deleted file mode 100644 index 1ff45e1..0000000 --- a/src/domain/order.go +++ /dev/null @@ -1,28 +0,0 @@ -// Package domain defines all the domain models -package domain - -import ( - "time" - - "github.com/shopspring/decimal" -) - -// Order represents a FIX NewOrderSingle message received from a client. -type Order struct { - ClOrdID string - Symbol string - Side string - OrdType string - OrderQty decimal.Decimal - Price decimal.Decimal - SessionID string - ReceivedAt time.Time -} - -// OrderStore is the port for persisting and retrieving orders. -type OrderStore interface { - SaveOrder(order Order) - GetOrders() []Order - GetOrderByClOrdID(id string) (Order, bool) -} - diff --git a/src/domain/persistence.go b/src/domain/persistence.go new file mode 100644 index 0000000..12c2ca7 --- /dev/null +++ b/src/domain/persistence.go @@ -0,0 +1,55 @@ +// Package domain defines all the domain models +package domain + +import "time" + +// ListTrade es la representacion exportada de un trade de List Trading. +type ListTrade struct { + QuoteReqID string `json:"quote_req_id"` + ListID string `json:"list_id"` + Symbol string `json:"symbol"` + SecurityIDSrc string `json:"security_id_src"` + Currency string `json:"currency"` + Side string `json:"side"` + OrderQty string `json:"order_qty"` + SettlDate string `json:"settl_date"` + Price string `json:"price"` + OwnerTraderID string `json:"owner_trader_id"` +} + +// FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento. +type FixMessageJSON struct { + Direction string `json:"direction"` + MsgType string `json:"msg_type"` + QuoteReqID string `json:"quote_req_id"` + Header map[string]interface{} `json:"header"` + Body map[string]interface{} `json:"body"` + ReceiveTime time.Time `json:"receive_time"` +} + +// TradeMessage es una fila de qfixdpl_messages. +type TradeMessage struct { + ID string `json:"id"` + QuoteReqID string `json:"quote_req_id"` + JMessage FixMessageJSON `json:"j_message"` + CreatedAt time.Time `json:"created_at"` +} + +// LogEntry es el DTO para insertar/actualizar un log crudo en qfixdpl_logs. +type LogEntry struct { + QuoteReqID string + RawMsg string +} + +// Logs es la respuesta del endpoint GET /trades/:quoteReqID/logs. +type Logs struct { + Entries []string `json:"entries"` +} + +// PersistenceStore define la interfaz de persistencia. +type PersistenceStore interface { + SaveMessage(msg TradeMessage) error + SaveLog(entry LogEntry) error + GetTodayMessages() ([]TradeMessage, error) + GetLogsByQuoteReqID(quoteReqID string) (Logs, error) +}