Flow 8.4 list trading working

This commit is contained in:
2026-03-16 12:44:52 -03:00
parent 5f1d7038ac
commit e17675d973
8 changed files with 264 additions and 158 deletions

View File

@ -33,21 +33,19 @@ type Controller struct {
userData app.UserDataProvider
store *store.Store
orderStore domain.OrderStore
fixSender domain.FIXSender
config Config
notify domain.Notifier
authMutex deadlock.Mutex
}
func newController(pool *redis.Pool, userData app.UserDataProvider,
s *store.Store, orderStore domain.OrderStore, fixSender domain.FIXSender, config Config, n domain.Notifier,
s *store.Store, orderStore domain.OrderStore, config Config, n domain.Notifier,
) *Controller {
return &Controller{
pool: pool,
userData: userData,
store: s,
orderStore: orderStore,
fixSender: fixSender,
config: config,
notify: n,
}
@ -305,39 +303,3 @@ func (cont *Controller) GetOrders(ctx *gin.Context) {
ctx.JSON(http.StatusOK, orders)
}
// SendQuote godoc
// @Summary Send a FIX Quote
// @Description Sends a Quote (MsgType S) back to the FIX client for a given order
// @Tags fix
// @Accept json
// @Produce json
// @Param quote body QuoteRequest true "Quote details"
// @Success 200 {object} Msg
// @Failure 400 {object} HTTPError
// @Failure 404 {object} HTTPError
// @Failure 500 {object} HTTPError
// @Router /qfixdpl/v1/quotes [post]
func (cont *Controller) SendQuote(ctx *gin.Context) {
var req QuoteRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return
}
bidPx, offerPx, _, _, err := req.toDecimals()
if err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return
}
if err = cont.fixSender.SendQuote(
req.ClOrdID, req.QuoteID, req.Symbol, req.SecurityIDSource, req.Currency, bidPx, offerPx); err != nil {
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: err.Error()})
return
}
ctx.JSON(http.StatusOK, Msg{Text: "quote sent"})
}

View File

@ -1,10 +1,5 @@
package rest
import (
"fmt"
"github.com/shopspring/decimal"
)
type HTTPError struct {
Error string
@ -23,26 +18,3 @@ type Session struct {
Email string
}
type QuoteRequest struct {
ClOrdID string `json:"cl_ord_id" binding:"required"`
QuoteID string `json:"quote_id" binding:"required"`
Symbol string `json:"symbol" binding:"required"`
Currency string `json:"currency"`
BidPx string `json:"bid_px" binding:"required"`
OfferPx string `json:"offer_px" binding:"required"`
SecurityIDSource string `json:"security_id_source" binding:"required"`
}
func (r QuoteRequest) toDecimals() (bidPx, offerPx, bidSize, offerSize decimal.Decimal, err error) {
bidPx, err = decimal.NewFromString(r.BidPx)
if err != nil {
return bidPx, offerPx, bidSize, offerSize, fmt.Errorf("invalid bid_px: %w", err)
}
offerPx, err = decimal.NewFromString(r.OfferPx)
if err != nil {
return bidPx, offerPx, bidSize, offerSize, fmt.Errorf("invalid offer_px: %w", err)
}
return bidPx, offerPx, bidSize, offerSize, nil
}

View File

@ -22,7 +22,6 @@ func SetRoutes(api *API) {
qfixdpl.Use(cont.AuthRequired)
qfixdpl.GET("/health", cont.HealthCheck)
qfixdpl.GET("/orders", cont.GetOrders)
qfixdpl.POST("/quotes", cont.SendQuote)
backoffice := qfixdpl.Group("/backoffice")
backoffice.Use(cont.BackOfficeUser)

View File

@ -32,7 +32,7 @@ type Config struct {
EnableJWTAuth bool
}
func New(userData app.UserDataProvider, storeInstance *store.Store, orderStore domain.OrderStore, fixSender domain.FIXSender, config Config, notify domain.Notifier) *API {
func New(userData app.UserDataProvider, storeInstance *store.Store, orderStore domain.OrderStore, config Config, notify domain.Notifier) *API {
// Set up Gin
var engine *gin.Engine
if version.Environment() == version.EnvironmentTypeProd {
@ -58,7 +58,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, orderStore d
}
api := &API{
Controller: newController(NewPool(), userData, storeInstance, orderStore, fixSender, config, notify),
Controller: newController(NewPool(), userData, storeInstance, orderStore, config, notify),
Router: engine,
Port: config.Port,
}

View File

@ -5,9 +5,12 @@ import (
"log/slog"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/tag"
"quantex.com/qfixdpl/src/domain"
)
@ -15,11 +18,14 @@ import (
type application struct {
router *quickfix.MessageRouter
notifier domain.Notifier
onLogon func(quickfix.SessionID)
onLogout func(quickfix.SessionID)
onQuote func(quote.Quote, quickfix.SessionID)
onQuoteRequest func(quoterequest.QuoteRequest, quickfix.SessionID)
onQuoteAck func(quoteack.QuoteAck, quickfix.SessionID)
onLogon func(quickfix.SessionID)
onLogout func(quickfix.SessionID)
onQuote func(quote.Quote, quickfix.SessionID)
onQuoteRequest func(quoterequest.QuoteRequest, quickfix.SessionID)
onQuoteAck func(quoteack.QuoteAck, quickfix.SessionID)
onQuoteResponse func(quoteresponse.QuoteResponse, quickfix.SessionID)
onExecutionReport func(executionreport.ExecutionReport, quickfix.SessionID)
onExecutionAck func(executionack.ExecutionAck, quickfix.SessionID)
}
func newApplication(n domain.Notifier) *application {
@ -31,6 +37,9 @@ func newApplication(n domain.Notifier) *application {
app.router.AddRoute(quote.Route(app.handleQuote))
app.router.AddRoute(quoteack.Route(app.handleQuoteAck))
app.router.AddRoute(quoterequest.Route(app.handleQuoteRequest))
app.router.AddRoute(quoteresponse.Route(app.handleQuoteResponse))
app.router.AddRoute(executionack.Route(app.handleExecutionAck))
app.router.AddRoute(executionreport.Route(app.handleExecutionReport))
return app
}
@ -147,3 +156,64 @@ func (a *application) handleQuote(msg quote.Quote, sessionID quickfix.SessionID)
return nil
}
func (a *application) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) quickfix.MessageRejectError {
quoteRespID, _ := msg.GetQuoteRespID()
quoteReqID, _ := msg.GetQuoteReqID()
quoteRespType, _ := msg.GetQuoteRespType()
slog.Info("QuoteResponse received",
"quoteRespID", quoteRespID,
"quoteReqID", quoteReqID,
"quoteRespType", quoteRespType,
"session", sessionID.String(),
)
if a.onQuoteResponse != nil {
a.onQuoteResponse(msg, sessionID)
}
return nil
}
func (a *application) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) quickfix.MessageRejectError {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
status, _ := msg.GetExecAckStatus()
slog.Info("ExecutionAck received",
"execID", execID,
"orderID", orderID,
"execAckStatus", status,
"session", sessionID.String(),
)
if a.onExecutionAck != nil {
a.onExecutionAck(msg, sessionID)
}
return nil
}
func (a *application) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) quickfix.MessageRejectError {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
listID, _ := msg.GetListID()
execType, _ := msg.GetExecType()
ordStatus, _ := msg.GetOrdStatus()
slog.Info("ExecutionReport received",
"execID", execID,
"orderID", orderID,
"listID", listID,
"execType", execType,
"ordStatus", ordStatus,
"session", sessionID.String(),
)
if a.onExecutionReport != nil {
a.onExecutionReport(msg, sessionID)
}
return nil
}

View File

@ -1,21 +1,24 @@
package fix
import (
"fmt"
"log/slog"
"os"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/gen/enum"
"quantex.com/qfixdpl/quickfix/gen/field"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport"
"quantex.com/qfixdpl/quickfix/store/file"
"quantex.com/qfixdpl/src/app"
@ -23,12 +26,28 @@ import (
"quantex.com/qfixdpl/src/domain"
)
type listTrade struct {
QuoteReqID string
ListID string
Symbol string
SecurityIDSrc enum.SecurityIDSource
Currency string
Side enum.Side
OrderQty decimal.Decimal
SettlDate string
Price decimal.Decimal
OwnerTraderID string
SessionID quickfix.SessionID
}
// Manager wraps the QuickFIX initiator and implements domain.FIXSender.
type Manager struct {
initiator *quickfix.Initiator
app *application
sessionsMu sync.RWMutex
sessions map[string]quickfix.SessionID
tradesMu sync.RWMutex
trades map[string]*listTrade
orderStore domain.OrderStore
notify domain.Notifier
cfg app.FIXConfig
@ -37,6 +56,7 @@ type Manager struct {
func NewManager(cfg app.FIXConfig, orderStore domain.OrderStore, notify domain.Notifier) *Manager {
return &Manager{
sessions: make(map[string]quickfix.SessionID),
trades: make(map[string]*listTrade),
orderStore: orderStore,
notify: notify,
cfg: cfg,
@ -48,6 +68,10 @@ func (m *Manager) Start() error {
fixApp.onLogon = m.onLogon
fixApp.onLogout = m.onLogout
fixApp.onQuoteRequest = m.handleQuoteRequest
fixApp.onQuoteAck = m.handleQuoteAck
fixApp.onQuoteResponse = m.handleQuoteResponse
fixApp.onExecutionReport = m.handleExecutionReport
fixApp.onExecutionAck = m.handleExecutionAck
m.app = fixApp
f, err := os.Open(m.cfg.SettingsFile)
@ -117,72 +141,6 @@ func (m *Manager) onLogout(sessionID quickfix.SessionID) {
m.sessionsMu.Unlock()
}
// SendQuote implements domain.FIXSender.
func (m *Manager) SendQuote(
clOrdID, quoteID, symbol string,
secIDSource string,
currency string,
bidPx, offerPx decimal.Decimal,
) error {
m.sessionsMu.RLock()
var sessionID quickfix.SessionID
var ok bool
for _, sid := range m.sessions {
sessionID = sid
ok = true
break
}
m.sessionsMu.RUnlock()
if !ok {
err := tracerr.Errorf("error sending quote: no active FIX session")
log.Error().Msg(err.Error())
return err
}
q := quote.New(
field.NewQuoteID(uuid.NewString()),
field.NewQuoteType(enum.QuoteType_INDICATIVE),
field.NewTransactTime(time.Now()),
)
sIDSource := enum.SecurityIDSource_ISIN_NUMBER
if secIDSource == "1" {
sIDSource = enum.SecurityIDSource_CUSIP
}
q.SetSymbol("[N/A]")
q.SetSecurityID(symbol)
q.SetSecurityIDSource(sIDSource)
q.SetQuoteReqID(quoteID)
if currency != "" {
q.SetCurrency(currency)
}
if !bidPx.IsZero() {
q.SetBidPx(bidPx, 8)
q.SetSide(enum.Side_SELL)
} else {
q.SetOfferPx(offerPx, 8)
q.SetSide(enum.Side_BUY)
}
q.SetPriceType(enum.PriceType_PERCENTAGE)
if err := quickfix.SendToTarget(q, sessionID); err != nil {
err = tracerr.Errorf("error sending FIX quote: %s", err)
log.Error().Msg(err.Error())
return err
}
slog.Info("Quote sent", "clOrdID", clOrdID, "quoteID", quoteID, "symbol", symbol)
return nil
}
// sendQuoteStatusReport sends a QuoteStatusReport (35=AI) to acknowledge the incoming QuoteRequest.
func (m *Manager) sendQuoteStatusReport(quoteReqID, ownerTraderID string, sessionID quickfix.SessionID) error {
qsr := quotestatusreport.New(
@ -199,6 +157,33 @@ func (m *Manager) sendQuoteStatusReport(quoteReqID, ownerTraderID string, sessio
return quickfix.SendToTarget(qsr, sessionID)
}
// sendTradeRequestAck sends a QuoteStatusReport (35=AI) to acknowledge a trade request (TRDREQACK).
func (m *Manager) sendTradeRequestAck(quoteReqID, quoteRespID string, sessionID quickfix.SessionID) error {
qsr := quotestatusreport.New(
field.NewTransactTime(time.Now()),
field.NewQuoteStatus(enum.QuoteStatus_ACCEPTED),
)
qsr.SetQuoteReqID(quoteReqID)
qsr.SetQuoteRespID(quoteRespID)
qsr.SetSymbol("[N/A]")
return quickfix.SendToTarget(qsr, sessionID)
}
// sendExecutionAck sends an ExecutionAck (35=BN) to acknowledge an incoming ExecutionReport.
func (m *Manager) sendExecutionAck(orderID, clOrdID, execID string, sessionID quickfix.SessionID) error {
bn := executionack.New(
field.NewOrderID(orderID),
field.NewExecID(execID),
field.NewExecAckStatus(enum.ExecAckStatus_ACCEPTED),
)
bn.SetClOrdID(clOrdID)
bn.SetSymbol("[N/A]")
bn.SetTransactTime(time.Now())
return quickfix.SendToTarget(bn, sessionID)
}
// handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge) followed by a Quote at price 99.6.
func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) {
quoteReqID, err := msg.GetQuoteReqID()
@ -207,11 +192,17 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
return
}
// Validate LST_ prefix for List Trading flow.
if !strings.HasPrefix(quoteReqID, "LST_") {
slog.Warn("handleQuoteRequest: QuoteReqID missing LST_ prefix, ignoring", "quoteReqID", quoteReqID)
return
}
var (
symbol, currency, ownerTraderID, settlDate string
side enum.Side
secIDSource enum.SecurityIDSource
orderQty decimal.Decimal
symbol, currency, ownerTraderID, settlDate, listID, negotiationType string
side enum.Side
secIDSource enum.SecurityIDSource
orderQty decimal.Decimal
)
relatedSyms, relErr := msg.GetNoRelatedSym()
@ -224,6 +215,18 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
ownerTraderID, _ = sym.GetOwnerTraderID()
orderQty, _ = sym.GetOrderQty()
settlDate, _ = sym.GetSettlDate()
listID, _ = sym.GetListID()
negotiationType, _ = sym.GetNegotiationType()
}
if listID == "" {
slog.Warn("handleQuoteRequest: missing ListID", "quoteReqID", quoteReqID)
return
}
if negotiationType != "RFQ" {
slog.Warn("handleQuoteRequest: unexpected NegotiationType", "quoteReqID", quoteReqID, "negotiationType", negotiationType)
return
}
// Step 1: Send QuoteStatusReport (35=AI) to acknowledge the inquiry.
@ -241,7 +244,7 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
sIDSource = enum.SecurityIDSource_CUSIP
}
quoteID := fmt.Sprintf("Q-%d", time.Now().UnixMilli())
quoteID := quoteReqID
q := quote.New(
field.NewQuoteID(quoteID),
field.NewQuoteType(enum.QuoteType_SEND_QUOTE),
@ -265,6 +268,8 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
q.SetSettlDate(settlDate)
}
q.SetPrice(price, 8)
if side == enum.Side_BUY {
q.SetOfferPx(price, 8)
q.SetSide(enum.Side_BUY)
@ -285,4 +290,111 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
}
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol)
// Store trade state for subsequent steps.
m.tradesMu.Lock()
m.trades[quoteReqID] = &listTrade{
QuoteReqID: quoteReqID,
ListID: listID,
Symbol: symbol,
SecurityIDSrc: sIDSource,
Currency: currency,
Side: side,
OrderQty: orderQty,
SettlDate: settlDate,
Price: price,
OwnerTraderID: ownerTraderID,
SessionID: sessionID,
}
m.tradesMu.Unlock()
}
// handleQuoteAck handles an incoming QuoteAck (35=CW).
func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
status, _ := msg.GetQuoteAckStatus()
if status != enum.QuoteAckStatus_ACCEPTED {
slog.Warn("handleQuoteAck: unexpected status", "quoteReqID", quoteReqID, "quoteAckStatus", status)
}
}
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ).
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
quoteRespID, _ := msg.GetQuoteRespID()
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
if !strings.HasSuffix(quoteRespID, "_TRDREQ") {
slog.Info("handleQuoteResponse: QuoteRespID does not end with _TRDREQ, ignoring", "quoteRespID", quoteRespID)
return
}
m.tradesMu.RLock()
_, ok := m.trades[quoteReqID]
m.tradesMu.RUnlock()
if !ok {
slog.Warn("handleQuoteResponse: no trade found for QuoteReqID", "quoteReqID", quoteReqID)
return
}
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future.
// Send QuoteStatusReport (35=AI) as TRDREQACK.
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil {
slog.Error("handleQuoteResponse: failed to send TRDREQACK", "quoteReqID", quoteReqID, "error", ackErr.Error())
return
}
slog.Info("TRDREQACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
}
// handleExecutionReport handles an incoming ExecutionReport (35=8).
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does.
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
clOrdID, _ := msg.GetClOrdID()
execType, _ := msg.GetExecType()
ordStatus, _ := msg.GetOrdStatus()
listID, _ := msg.GetListID()
slog.Info("handleExecutionReport received",
"execID", execID, "orderID", orderID, "clOrdID", clOrdID,
"execType", string(execType), "ordStatus", string(ordStatus), "listID", listID,
)
// Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW.
if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil {
slog.Error("handleExecutionReport: failed to send ExecutionAck", "execID", execID, "error", ackErr.Error())
} else {
slog.Info("ExecutionAck sent", "execID", execID)
}
switch {
case strings.Contains(execID, "_LISTEND"):
slog.Info("List ended (due-in closed), awaiting trade result from TW",
"execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDEND"):
slog.Info("Trade ended, cleaning up", "execID", execID, "clOrdID", clOrdID)
m.tradesMu.Lock()
delete(m.trades, clOrdID)
m.tradesMu.Unlock()
case strings.Contains(execID, "_TRDSUMM"):
slog.Info("Trade summary received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
case execType == enum.ExecType_TRADE:
slog.Info("Trade result received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
}
}
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
// Logged in application.go, no further action needed.
}