diff --git a/src/client/api/rest/controller.go b/src/client/api/rest/controller.go index f739218..0b515c2 100644 --- a/src/client/api/rest/controller.go +++ b/src/client/api/rest/controller.go @@ -33,21 +33,19 @@ type Controller struct { userData app.UserDataProvider store *store.Store orderStore domain.OrderStore - fixSender domain.FIXSender config Config notify domain.Notifier authMutex deadlock.Mutex } func newController(pool *redis.Pool, userData app.UserDataProvider, - s *store.Store, orderStore domain.OrderStore, fixSender domain.FIXSender, config Config, n domain.Notifier, + s *store.Store, orderStore domain.OrderStore, config Config, n domain.Notifier, ) *Controller { return &Controller{ pool: pool, userData: userData, store: s, orderStore: orderStore, - fixSender: fixSender, config: config, notify: n, } @@ -305,39 +303,3 @@ func (cont *Controller) GetOrders(ctx *gin.Context) { ctx.JSON(http.StatusOK, orders) } -// SendQuote godoc -// @Summary Send a FIX Quote -// @Description Sends a Quote (MsgType S) back to the FIX client for a given order -// @Tags fix -// @Accept json -// @Produce json -// @Param quote body QuoteRequest true "Quote details" -// @Success 200 {object} Msg -// @Failure 400 {object} HTTPError -// @Failure 404 {object} HTTPError -// @Failure 500 {object} HTTPError -// @Router /qfixdpl/v1/quotes [post] -func (cont *Controller) SendQuote(ctx *gin.Context) { - var req QuoteRequest - if err := ctx.ShouldBindJSON(&req); err != nil { - ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()}) - - return - } - - bidPx, offerPx, _, _, err := req.toDecimals() - if err != nil { - ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()}) - - return - } - - if err = cont.fixSender.SendQuote( - req.ClOrdID, req.QuoteID, req.Symbol, req.SecurityIDSource, req.Currency, bidPx, offerPx); err != nil { - ctx.JSON(http.StatusInternalServerError, HTTPError{Error: err.Error()}) - - return - } - - ctx.JSON(http.StatusOK, Msg{Text: "quote sent"}) -} diff --git a/src/client/api/rest/model.go b/src/client/api/rest/model.go index 35d4316..6db83a4 100644 --- a/src/client/api/rest/model.go +++ b/src/client/api/rest/model.go @@ -1,10 +1,5 @@ package rest -import ( - "fmt" - - "github.com/shopspring/decimal" -) type HTTPError struct { Error string @@ -23,26 +18,3 @@ type Session struct { Email string } -type QuoteRequest struct { - ClOrdID string `json:"cl_ord_id" binding:"required"` - QuoteID string `json:"quote_id" binding:"required"` - Symbol string `json:"symbol" binding:"required"` - Currency string `json:"currency"` - BidPx string `json:"bid_px" binding:"required"` - OfferPx string `json:"offer_px" binding:"required"` - SecurityIDSource string `json:"security_id_source" binding:"required"` -} - -func (r QuoteRequest) toDecimals() (bidPx, offerPx, bidSize, offerSize decimal.Decimal, err error) { - bidPx, err = decimal.NewFromString(r.BidPx) - if err != nil { - return bidPx, offerPx, bidSize, offerSize, fmt.Errorf("invalid bid_px: %w", err) - } - - offerPx, err = decimal.NewFromString(r.OfferPx) - if err != nil { - return bidPx, offerPx, bidSize, offerSize, fmt.Errorf("invalid offer_px: %w", err) - } - - return bidPx, offerPx, bidSize, offerSize, nil -} diff --git a/src/client/api/rest/routes.go b/src/client/api/rest/routes.go index 7e3d346..c895dcc 100644 --- a/src/client/api/rest/routes.go +++ b/src/client/api/rest/routes.go @@ -22,7 +22,6 @@ func SetRoutes(api *API) { qfixdpl.Use(cont.AuthRequired) qfixdpl.GET("/health", cont.HealthCheck) qfixdpl.GET("/orders", cont.GetOrders) - qfixdpl.POST("/quotes", cont.SendQuote) backoffice := qfixdpl.Group("/backoffice") backoffice.Use(cont.BackOfficeUser) diff --git a/src/client/api/rest/server.go b/src/client/api/rest/server.go index de5c3f8..f3e6cdb 100644 --- a/src/client/api/rest/server.go +++ b/src/client/api/rest/server.go @@ -32,7 +32,7 @@ type Config struct { EnableJWTAuth bool } -func New(userData app.UserDataProvider, storeInstance *store.Store, orderStore domain.OrderStore, fixSender domain.FIXSender, config Config, notify domain.Notifier) *API { +func New(userData app.UserDataProvider, storeInstance *store.Store, orderStore domain.OrderStore, config Config, notify domain.Notifier) *API { // Set up Gin var engine *gin.Engine if version.Environment() == version.EnvironmentTypeProd { @@ -58,7 +58,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, orderStore d } api := &API{ - Controller: newController(NewPool(), userData, storeInstance, orderStore, fixSender, config, notify), + Controller: newController(NewPool(), userData, storeInstance, orderStore, config, notify), Router: engine, Port: config.Port, } diff --git a/src/client/fix/application.go b/src/client/fix/application.go index e615843..255ef55 100644 --- a/src/client/fix/application.go +++ b/src/client/fix/application.go @@ -5,9 +5,12 @@ import ( "log/slog" "quantex.com/qfixdpl/quickfix" + "quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack" + "quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest" + "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse" "quantex.com/qfixdpl/quickfix/gen/tag" "quantex.com/qfixdpl/src/domain" ) @@ -15,11 +18,14 @@ import ( type application struct { router *quickfix.MessageRouter notifier domain.Notifier - onLogon func(quickfix.SessionID) - onLogout func(quickfix.SessionID) - onQuote func(quote.Quote, quickfix.SessionID) - onQuoteRequest func(quoterequest.QuoteRequest, quickfix.SessionID) - onQuoteAck func(quoteack.QuoteAck, quickfix.SessionID) + onLogon func(quickfix.SessionID) + onLogout func(quickfix.SessionID) + onQuote func(quote.Quote, quickfix.SessionID) + onQuoteRequest func(quoterequest.QuoteRequest, quickfix.SessionID) + onQuoteAck func(quoteack.QuoteAck, quickfix.SessionID) + onQuoteResponse func(quoteresponse.QuoteResponse, quickfix.SessionID) + onExecutionReport func(executionreport.ExecutionReport, quickfix.SessionID) + onExecutionAck func(executionack.ExecutionAck, quickfix.SessionID) } func newApplication(n domain.Notifier) *application { @@ -31,6 +37,9 @@ func newApplication(n domain.Notifier) *application { app.router.AddRoute(quote.Route(app.handleQuote)) app.router.AddRoute(quoteack.Route(app.handleQuoteAck)) app.router.AddRoute(quoterequest.Route(app.handleQuoteRequest)) + app.router.AddRoute(quoteresponse.Route(app.handleQuoteResponse)) + app.router.AddRoute(executionack.Route(app.handleExecutionAck)) + app.router.AddRoute(executionreport.Route(app.handleExecutionReport)) return app } @@ -147,3 +156,64 @@ func (a *application) handleQuote(msg quote.Quote, sessionID quickfix.SessionID) return nil } + +func (a *application) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) quickfix.MessageRejectError { + quoteRespID, _ := msg.GetQuoteRespID() + quoteReqID, _ := msg.GetQuoteReqID() + quoteRespType, _ := msg.GetQuoteRespType() + + slog.Info("QuoteResponse received", + "quoteRespID", quoteRespID, + "quoteReqID", quoteReqID, + "quoteRespType", quoteRespType, + "session", sessionID.String(), + ) + + if a.onQuoteResponse != nil { + a.onQuoteResponse(msg, sessionID) + } + + return nil +} + +func (a *application) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) quickfix.MessageRejectError { + execID, _ := msg.GetExecID() + orderID, _ := msg.GetOrderID() + status, _ := msg.GetExecAckStatus() + + slog.Info("ExecutionAck received", + "execID", execID, + "orderID", orderID, + "execAckStatus", status, + "session", sessionID.String(), + ) + + if a.onExecutionAck != nil { + a.onExecutionAck(msg, sessionID) + } + + return nil +} + +func (a *application) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) quickfix.MessageRejectError { + execID, _ := msg.GetExecID() + orderID, _ := msg.GetOrderID() + listID, _ := msg.GetListID() + execType, _ := msg.GetExecType() + ordStatus, _ := msg.GetOrdStatus() + + slog.Info("ExecutionReport received", + "execID", execID, + "orderID", orderID, + "listID", listID, + "execType", execType, + "ordStatus", ordStatus, + "session", sessionID.String(), + ) + + if a.onExecutionReport != nil { + a.onExecutionReport(msg, sessionID) + } + + return nil +} diff --git a/src/client/fix/manager.go b/src/client/fix/manager.go index 8085cb9..260a7d0 100644 --- a/src/client/fix/manager.go +++ b/src/client/fix/manager.go @@ -1,21 +1,24 @@ package fix import ( - "fmt" "log/slog" "os" + "strings" "sync" "time" - "github.com/google/uuid" "github.com/rs/zerolog/log" "github.com/shopspring/decimal" "quantex.com/qfixdpl/quickfix" "quantex.com/qfixdpl/quickfix/gen/enum" "quantex.com/qfixdpl/quickfix/gen/field" + "quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack" + "quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote" + "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest" + "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport" "quantex.com/qfixdpl/quickfix/store/file" "quantex.com/qfixdpl/src/app" @@ -23,12 +26,28 @@ import ( "quantex.com/qfixdpl/src/domain" ) +type listTrade struct { + QuoteReqID string + ListID string + Symbol string + SecurityIDSrc enum.SecurityIDSource + Currency string + Side enum.Side + OrderQty decimal.Decimal + SettlDate string + Price decimal.Decimal + OwnerTraderID string + SessionID quickfix.SessionID +} + // Manager wraps the QuickFIX initiator and implements domain.FIXSender. type Manager struct { initiator *quickfix.Initiator app *application sessionsMu sync.RWMutex sessions map[string]quickfix.SessionID + tradesMu sync.RWMutex + trades map[string]*listTrade orderStore domain.OrderStore notify domain.Notifier cfg app.FIXConfig @@ -37,6 +56,7 @@ type Manager struct { func NewManager(cfg app.FIXConfig, orderStore domain.OrderStore, notify domain.Notifier) *Manager { return &Manager{ sessions: make(map[string]quickfix.SessionID), + trades: make(map[string]*listTrade), orderStore: orderStore, notify: notify, cfg: cfg, @@ -48,6 +68,10 @@ func (m *Manager) Start() error { fixApp.onLogon = m.onLogon fixApp.onLogout = m.onLogout fixApp.onQuoteRequest = m.handleQuoteRequest + fixApp.onQuoteAck = m.handleQuoteAck + fixApp.onQuoteResponse = m.handleQuoteResponse + fixApp.onExecutionReport = m.handleExecutionReport + fixApp.onExecutionAck = m.handleExecutionAck m.app = fixApp f, err := os.Open(m.cfg.SettingsFile) @@ -117,72 +141,6 @@ func (m *Manager) onLogout(sessionID quickfix.SessionID) { m.sessionsMu.Unlock() } -// SendQuote implements domain.FIXSender. -func (m *Manager) SendQuote( - clOrdID, quoteID, symbol string, - secIDSource string, - currency string, - bidPx, offerPx decimal.Decimal, -) error { - m.sessionsMu.RLock() - var sessionID quickfix.SessionID - var ok bool - for _, sid := range m.sessions { - sessionID = sid - ok = true - break - } - m.sessionsMu.RUnlock() - - if !ok { - err := tracerr.Errorf("error sending quote: no active FIX session") - log.Error().Msg(err.Error()) - - return err - } - - q := quote.New( - field.NewQuoteID(uuid.NewString()), - field.NewQuoteType(enum.QuoteType_INDICATIVE), - field.NewTransactTime(time.Now()), - ) - - sIDSource := enum.SecurityIDSource_ISIN_NUMBER - if secIDSource == "1" { - sIDSource = enum.SecurityIDSource_CUSIP - } - - q.SetSymbol("[N/A]") - q.SetSecurityID(symbol) - q.SetSecurityIDSource(sIDSource) - q.SetQuoteReqID(quoteID) - - if currency != "" { - q.SetCurrency(currency) - } - - if !bidPx.IsZero() { - q.SetBidPx(bidPx, 8) - q.SetSide(enum.Side_SELL) - } else { - q.SetOfferPx(offerPx, 8) - q.SetSide(enum.Side_BUY) - } - - q.SetPriceType(enum.PriceType_PERCENTAGE) - - if err := quickfix.SendToTarget(q, sessionID); err != nil { - err = tracerr.Errorf("error sending FIX quote: %s", err) - log.Error().Msg(err.Error()) - - return err - } - - slog.Info("Quote sent", "clOrdID", clOrdID, "quoteID", quoteID, "symbol", symbol) - - return nil -} - // sendQuoteStatusReport sends a QuoteStatusReport (35=AI) to acknowledge the incoming QuoteRequest. func (m *Manager) sendQuoteStatusReport(quoteReqID, ownerTraderID string, sessionID quickfix.SessionID) error { qsr := quotestatusreport.New( @@ -199,6 +157,33 @@ func (m *Manager) sendQuoteStatusReport(quoteReqID, ownerTraderID string, sessio return quickfix.SendToTarget(qsr, sessionID) } +// sendTradeRequestAck sends a QuoteStatusReport (35=AI) to acknowledge a trade request (TRDREQACK). +func (m *Manager) sendTradeRequestAck(quoteReqID, quoteRespID string, sessionID quickfix.SessionID) error { + qsr := quotestatusreport.New( + field.NewTransactTime(time.Now()), + field.NewQuoteStatus(enum.QuoteStatus_ACCEPTED), + ) + qsr.SetQuoteReqID(quoteReqID) + qsr.SetQuoteRespID(quoteRespID) + qsr.SetSymbol("[N/A]") + + return quickfix.SendToTarget(qsr, sessionID) +} + +// sendExecutionAck sends an ExecutionAck (35=BN) to acknowledge an incoming ExecutionReport. +func (m *Manager) sendExecutionAck(orderID, clOrdID, execID string, sessionID quickfix.SessionID) error { + bn := executionack.New( + field.NewOrderID(orderID), + field.NewExecID(execID), + field.NewExecAckStatus(enum.ExecAckStatus_ACCEPTED), + ) + bn.SetClOrdID(clOrdID) + bn.SetSymbol("[N/A]") + bn.SetTransactTime(time.Now()) + + return quickfix.SendToTarget(bn, sessionID) +} + // handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge) followed by a Quote at price 99.6. func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) { quoteReqID, err := msg.GetQuoteReqID() @@ -207,11 +192,17 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu return } + // Validate LST_ prefix for List Trading flow. + if !strings.HasPrefix(quoteReqID, "LST_") { + slog.Warn("handleQuoteRequest: QuoteReqID missing LST_ prefix, ignoring", "quoteReqID", quoteReqID) + return + } + var ( - symbol, currency, ownerTraderID, settlDate string - side enum.Side - secIDSource enum.SecurityIDSource - orderQty decimal.Decimal + symbol, currency, ownerTraderID, settlDate, listID, negotiationType string + side enum.Side + secIDSource enum.SecurityIDSource + orderQty decimal.Decimal ) relatedSyms, relErr := msg.GetNoRelatedSym() @@ -224,6 +215,18 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu ownerTraderID, _ = sym.GetOwnerTraderID() orderQty, _ = sym.GetOrderQty() settlDate, _ = sym.GetSettlDate() + listID, _ = sym.GetListID() + negotiationType, _ = sym.GetNegotiationType() + } + + if listID == "" { + slog.Warn("handleQuoteRequest: missing ListID", "quoteReqID", quoteReqID) + return + } + + if negotiationType != "RFQ" { + slog.Warn("handleQuoteRequest: unexpected NegotiationType", "quoteReqID", quoteReqID, "negotiationType", negotiationType) + return } // Step 1: Send QuoteStatusReport (35=AI) to acknowledge the inquiry. @@ -241,7 +244,7 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu sIDSource = enum.SecurityIDSource_CUSIP } - quoteID := fmt.Sprintf("Q-%d", time.Now().UnixMilli()) + quoteID := quoteReqID q := quote.New( field.NewQuoteID(quoteID), field.NewQuoteType(enum.QuoteType_SEND_QUOTE), @@ -265,6 +268,8 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu q.SetSettlDate(settlDate) } + q.SetPrice(price, 8) + if side == enum.Side_BUY { q.SetOfferPx(price, 8) q.SetSide(enum.Side_BUY) @@ -285,4 +290,111 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu } slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol) + + // Store trade state for subsequent steps. + m.tradesMu.Lock() + m.trades[quoteReqID] = &listTrade{ + QuoteReqID: quoteReqID, + ListID: listID, + Symbol: symbol, + SecurityIDSrc: sIDSource, + Currency: currency, + Side: side, + OrderQty: orderQty, + SettlDate: settlDate, + Price: price, + OwnerTraderID: ownerTraderID, + SessionID: sessionID, + } + m.tradesMu.Unlock() +} + +// handleQuoteAck handles an incoming QuoteAck (35=CW). +func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) { + quoteReqID, _ := msg.GetQuoteReqID() + status, _ := msg.GetQuoteAckStatus() + + if status != enum.QuoteAckStatus_ACCEPTED { + slog.Warn("handleQuoteAck: unexpected status", "quoteReqID", quoteReqID, "quoteAckStatus", status) + } +} + +// handleQuoteResponse handles an incoming QuoteResponse (35=AJ). +func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) { + quoteReqID, _ := msg.GetQuoteReqID() + quoteRespID, _ := msg.GetQuoteRespID() + + slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID) + + if !strings.HasSuffix(quoteRespID, "_TRDREQ") { + slog.Info("handleQuoteResponse: QuoteRespID does not end with _TRDREQ, ignoring", "quoteRespID", quoteRespID) + return + } + + m.tradesMu.RLock() + _, ok := m.trades[quoteReqID] + m.tradesMu.RUnlock() + + if !ok { + slog.Warn("handleQuoteResponse: no trade found for QuoteReqID", "quoteReqID", quoteReqID) + return + } + + // TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future. + + // Send QuoteStatusReport (35=AI) as TRDREQACK. + if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil { + slog.Error("handleQuoteResponse: failed to send TRDREQACK", "quoteReqID", quoteReqID, "error", ackErr.Error()) + return + } + slog.Info("TRDREQACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID) +} + +// handleExecutionReport handles an incoming ExecutionReport (35=8). +// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does. +func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) { + execID, _ := msg.GetExecID() + orderID, _ := msg.GetOrderID() + clOrdID, _ := msg.GetClOrdID() + execType, _ := msg.GetExecType() + ordStatus, _ := msg.GetOrdStatus() + listID, _ := msg.GetListID() + + slog.Info("handleExecutionReport received", + "execID", execID, "orderID", orderID, "clOrdID", clOrdID, + "execType", string(execType), "ordStatus", string(ordStatus), "listID", listID, + ) + + // Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW. + if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil { + slog.Error("handleExecutionReport: failed to send ExecutionAck", "execID", execID, "error", ackErr.Error()) + } else { + slog.Info("ExecutionAck sent", "execID", execID) + } + + switch { + case strings.Contains(execID, "_LISTEND"): + slog.Info("List ended (due-in closed), awaiting trade result from TW", + "execID", execID, "clOrdID", clOrdID) + + case strings.Contains(execID, "_TRDEND"): + slog.Info("Trade ended, cleaning up", "execID", execID, "clOrdID", clOrdID) + + m.tradesMu.Lock() + delete(m.trades, clOrdID) + m.tradesMu.Unlock() + + case strings.Contains(execID, "_TRDSUMM"): + slog.Info("Trade summary received from TW", + "execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID) + + case execType == enum.ExecType_TRADE: + slog.Info("Trade result received from TW", + "execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID) + } +} + +// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW. +func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) { + // Logged in application.go, no further action needed. } diff --git a/src/cmd/service/service.go b/src/cmd/service/service.go index e948375..73b0a2c 100644 --- a/src/cmd/service/service.go +++ b/src/cmd/service/service.go @@ -54,7 +54,7 @@ func Runner(cfg app.Config) error { EnableJWTAuth: cfg.EnableJWTAuth, } - api := rest.New(userData, appStore, orderStore, fixManager, apiConfig, notify) + api := rest.New(userData, appStore, orderStore, apiConfig, notify) api.Run() cmd.WaitForInterruptSignal(nil) diff --git a/src/domain/order.go b/src/domain/order.go index e7f677c..1ff45e1 100644 --- a/src/domain/order.go +++ b/src/domain/order.go @@ -26,12 +26,3 @@ type OrderStore interface { GetOrderByClOrdID(id string) (Order, bool) } -// FIXSender is the port for sending FIX messages back to clients. -type FIXSender interface { - SendQuote( - clOrdID, quoteID, symbol string, - secIDSource string, - currency string, - bidPx, offerPx decimal.Decimal, - ) error -}