package fix

import (
	"log/slog"
	"os"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/rs/zerolog/log"
	uuid "github.com/satori/go.uuid"
	"github.com/shopspring/decimal"
	"quantex.com/qfixdpl/quickfix"
	"quantex.com/qfixdpl/quickfix/datadictionary"
	"quantex.com/qfixdpl/quickfix/gen/enum"
	"quantex.com/qfixdpl/quickfix/gen/field"
	"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack"
	"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
	"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
	"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
	"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
	"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
	"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport"
	"quantex.com/qfixdpl/quickfix/gen/tag"
	filelog "quantex.com/qfixdpl/quickfix/log/file"
	"quantex.com/qfixdpl/quickfix/store/file"
	"quantex.com/qfixdpl/src/app"
	"quantex.com/qfixdpl/src/common/tracerr"
	"quantex.com/qfixdpl/src/domain"
)

// listTrade is the in-memory state kept for one QuoteRequest; the Manager stores
// these in a map keyed by QuoteReqID.
type listTrade struct {
	// QuoteRequest is the parsed QuoteRequest that created the trade.
	QuoteRequest domain.FixMessageJSON
	// SessionID is the session the request arrived on. It is left at its zero
	// value for trades rebuilt from the database.
	SessionID quickfix.SessionID
	// Quoted is set once a Quote has been sent for this request.
	Quoted bool
	// Price is the price of the Quote that was sent.
	Price decimal.Decimal
}

// Manager wraps the QuickFIX initiator and implements domain.FIXSender.
type Manager struct { initiator *quickfix.Initiator app *application sessionsMu sync.RWMutex sessions map[string]quickfix.SessionID tradesMu sync.RWMutex trades map[string]*listTrade messagesMu sync.RWMutex messages []domain.Message store domain.PersistenceStore notify domain.Notifier cfg app.FIXConfig dict *datadictionary.DataDictionary } func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager { return &Manager{ sessions: make(map[string]quickfix.SessionID), trades: make(map[string]*listTrade), messages: make([]domain.Message, 0), store: store, notify: notify, cfg: cfg, } } func (m *Manager) Start() error { fixApp := newApplication(m.notify) fixApp.onLogon = m.onLogon fixApp.onLogout = m.onLogout fixApp.onQuoteRequest = m.handleQuoteRequest fixApp.onQuoteAck = m.handleQuoteAck fixApp.onQuoteResponse = m.handleQuoteResponse fixApp.onExecutionReport = m.handleExecutionReport fixApp.onExecutionAck = m.handleExecutionAck fixApp.onRawMessage = m.handleRawMessage m.app = fixApp dict, err := datadictionary.Parse(m.cfg.DataDictionaryFile) if err != nil { err = tracerr.Errorf("error parsing FIX data dictionary %q: %w", m.cfg.DataDictionaryFile, err) slog.Error(err.Error()) return err } m.dict = dict if err := m.loadActiveTrades(); err != nil { err = tracerr.Errorf("failed to load active trades from DB, starting with empty state: %w", err) slog.Error(err.Error()) } if err := m.loadTodayMessages(); err != nil { err = tracerr.Errorf("failed to load today messages from DB, starting with empty list: %w", err) slog.Error(err.Error()) } f, err := os.Open(m.cfg.SettingsFile) if err != nil { err = tracerr.Errorf("error opening FIX settings file %q: %w", m.cfg.SettingsFile, err) slog.Error(err.Error()) return err } defer f.Close() settings, err := quickfix.ParseSettings(f) if err != nil { err = tracerr.Errorf("error parsing FIX settings: %w", err) slog.Error(err.Error()) return err } storeFactory := file.NewStoreFactory(settings) 
logFactory, err := filelog.NewLogFactory(settings) if err != nil { err = tracerr.Errorf("error creating file log factory: %w", err) slog.Error(err.Error()) return err } initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory) if err != nil { err = tracerr.Errorf("error creating FIX initiator: %w", err) slog.Error(err.Error()) return err } m.initiator = initiator if err = m.initiator.Start(); err != nil { err = tracerr.Errorf("error starting FIX initiator: %w", err) slog.Error(err.Error()) return err } slog.Info("FIX initiator started", "settings", m.cfg.SettingsFile) return nil } func (m *Manager) Stop() { if m.initiator != nil { m.initiator.Stop() slog.Info("FIX initiator stopped") } } func (m *Manager) onLogon(sessionID quickfix.SessionID) { m.sessionsMu.Lock() m.sessions[sessionID.String()] = sessionID m.sessionsMu.Unlock() } func (m *Manager) onLogout(sessionID quickfix.SessionID) { m.sessionsMu.Lock() delete(m.sessions, sessionID.String()) m.sessionsMu.Unlock() } // sendQuoteStatusReport sends a QuoteStatusReport (35=AI) to acknowledge the incoming QuoteRequest. func (m *Manager) sendQuoteStatusReport(quoteReqID, ownerTraderID string, sessionID quickfix.SessionID) error { qsr := quotestatusreport.New( field.NewTransactTime(time.Now()), field.NewQuoteStatus(enum.QuoteStatus_ACCEPTED), ) qsr.SetQuoteReqID(quoteReqID) qsr.SetQuoteID(quoteReqID) qsr.SetSymbol("[N/A]") if ownerTraderID != "" { qsr.SetOwnerTraderID(ownerTraderID) } return quickfix.SendToTarget(qsr, sessionID) } // sendTradeRequestAck sends a QuoteStatusReport (35=AI) to acknowledge a trade request (TRDREQACK). 
func (m *Manager) sendTradeRequestAck(quoteReqID, quoteRespID string, sessionID quickfix.SessionID) error { qsr := quotestatusreport.New( field.NewTransactTime(time.Now()), field.NewQuoteStatus(enum.QuoteStatus_ACCEPTED), ) qsr.SetQuoteReqID(quoteReqID) qsr.SetQuoteRespID(quoteRespID) qsr.SetSymbol("[N/A]") return quickfix.SendToTarget(qsr, sessionID) } // sendExecutionAck sends an ExecutionAck (35=BN) to acknowledge an incoming ExecutionReport. func (m *Manager) sendExecutionAck(orderID, clOrdID, execID string, sessionID quickfix.SessionID) error { bn := executionack.New( field.NewOrderID(orderID), field.NewExecID(execID), field.NewExecAckStatus(enum.ExecAckStatus_ACCEPTED), ) bn.SetClOrdID(clOrdID) bn.SetSymbol("[N/A]") bn.SetTransactTime(time.Now()) return quickfix.SendToTarget(bn, sessionID) } // handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge). // The Quote (35=S) is sent later via the REST endpoint when the dealer prices it. func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) { parsed := parseQuoteRequest(msg, m.dict) quoteReqID := parsed.QuoteReqID if quoteReqID == "" { m.notify.SendMsg( domain.MessageChannelError, "quoteReqID missing in quote request", domain.MessageStatusWarning, nil, ) err := tracerr.Errorf("handleQuoteRequest, missing QuoteReqID, quoteRequest: %+v", parsed) slog.Error(err.Error()) return } // Validate LST_ prefix for List Trading flow. 
if !strings.HasPrefix(quoteReqID, "LST_") { m.notify.SendMsg( domain.MessageChannelError, "quoteReqID ("+quoteReqID+") missing LST_ prefix", domain.MessageStatusWarning, nil, ) slog.Warn("handleQuoteRequest: QuoteReqID missing LST_ prefix, ignoring", "quoteReqID", quoteReqID) return } bodyKeys := make([]string, 0, len(parsed.Body)) for k := range parsed.Body { bodyKeys = append(bodyKeys, k) } slog.Info("handleQuoteRequest: parsed body keys", "quoteReqID", quoteReqID, "keys", bodyKeys) relSym := firstGroup(parsed.Body, "NoRelatedSym") relSymKeys := make([]string, 0, len(relSym)) for k := range relSym { relSymKeys = append(relSymKeys, k) } slog.Info("handleQuoteRequest: NoRelatedSym keys", "quoteReqID", quoteReqID, "keys", relSymKeys) ownerTraderID := getString(relSym, "OwnerTraderID") // Send QuoteStatusReport (35=AI) to acknowledge the inquiry. if ackErr := m.sendQuoteStatusReport(quoteReqID, ownerTraderID, sessionID); ackErr != nil { ackErr = tracerr.Errorf("handleQuoteRequest: failed to send QuoteStatusReport (quoteReqID=%s): %w", quoteReqID, ackErr) slog.Error(ackErr.Error()) return } slog.Info("QuoteStatusReport sent", "quoteReqID", quoteReqID) // Store trade state as pending; Quote (35=S) is sent later via REST endpoint. m.tradesMu.Lock() m.trades[quoteReqID] = &listTrade{ QuoteRequest: parsed, SessionID: sessionID, Quoted: false, } m.tradesMu.Unlock() } // handleQuoteAck handles an incoming QuoteAck (35=CW). func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) { quoteReqID, _ := msg.GetQuoteReqID() status, _ := msg.GetQuoteAckStatus() text, _ := msg.GetText() if status != enum.QuoteAckStatus_ACCEPTED { err := tracerr.Errorf("handleQuoteAck: quote rejected by TW (quoteReqID=%s, quoteAckStatus=%s, text=%s)", quoteReqID, string(status), text) slog.Error(err.Error()) return } slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID) } // handleQuoteResponse handles an incoming QuoteResponse (35=AJ). 
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes. func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) { quoteReqID, _ := msg.GetQuoteReqID() quoteRespID, _ := msg.GetQuoteRespID() slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID) isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ") isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND") isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM") isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND") if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd { slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID) return } // TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future. // Always send ACK regardless of whether the trade is in our map. // TW will keep retrying until it receives an ACK. if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil { ackErr = tracerr.Errorf("handleQuoteResponse: failed to send ACK (quoteReqID=%s, quoteRespID=%s): %w", quoteReqID, quoteRespID, ackErr) slog.Error(ackErr.Error()) return } slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID) // _TRDSUMM is the final message — clean up the trade. if isTrdSumm { slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID) } } // handleExecutionReport handles an incoming ExecutionReport (35=8). // In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does. 
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) { execID, _ := msg.GetExecID() orderID, _ := msg.GetOrderID() clOrdID, _ := msg.GetClOrdID() execType, _ := msg.GetExecType() ordStatus, _ := msg.GetOrdStatus() listID, _ := msg.GetListID() slog.Info("handleExecutionReport received", "execID", execID, "orderID", orderID, "clOrdID", clOrdID, "execType", string(execType), "ordStatus", string(ordStatus), "listID", listID, ) // Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW. if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil { ackErr = tracerr.Errorf("handleExecutionReport: failed to send ExecutionAck (execID=%s): %w", execID, ackErr) slog.Error(ackErr.Error()) } else { slog.Info("ExecutionAck sent", "execID", execID) } switch { case strings.Contains(execID, "_LISTEND"): slog.Info("List ended (due-in closed), awaiting trade result from TW", "execID", execID, "clOrdID", clOrdID) case strings.Contains(execID, "_TRDEND"): slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID) case strings.Contains(execID, "_TRDSUMM"): slog.Info("Trade summary received from TW, cleaning up", "execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID) case execType == enum.ExecType_TRADE: slog.Info("Trade result received from TW", "execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID) } } // handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW. func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) { // Logged in application.go, no further action needed. } // GetTrades returns a snapshot of all active trades. 
func (m *Manager) GetTrades() []domain.ListTrade { m.tradesMu.RLock() defer m.tradesMu.RUnlock() trades := make([]domain.ListTrade, 0, len(m.trades)) for _, t := range m.trades { trades = append(trades, toDomainListTrade(t)) } return trades } // GetPendingQuoteRequests returns trades that have received a QuoteRequest but not yet been quoted by the dealer. func (m *Manager) GetPendingQuoteRequests() []domain.ListTrade { m.tradesMu.RLock() defer m.tradesMu.RUnlock() pending := make([]domain.ListTrade, 0) for _, t := range m.trades { pending = append(pending, toDomainListTrade(t)) } return pending } func toDomainListTrade(t *listTrade) domain.ListTrade { out := domain.ListTrade{ QuoteRequest: t.QuoteRequest, } return out } // SendQuote builds and sends a Quote (35=S) for an existing pending QuoteRequest at the given price. func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error { m.tradesMu.Lock() t, ok := m.trades[quoteReqID] if !ok { m.tradesMu.Unlock() err := tracerr.Errorf("SendQuote: quoteReqID %s not found", quoteReqID) slog.Error(err.Error()) return err } sessionID := t.SessionID if sessionID == (quickfix.SessionID{}) { sessionID = m.anyActiveSessionID() if sessionID == (quickfix.SessionID{}) { m.tradesMu.Unlock() err := tracerr.Errorf("SendQuote: no active FIX session for quoteReqID %s", quoteReqID) slog.Error(err.Error()) return err } } symbol := getString(t.QuoteRequest.Body, "SecurityID") sIDSource := enum.SecurityIDSource(getString(t.QuoteRequest.Body, "SecurityIDSource")) currency := getString(t.QuoteRequest.Body, "Currency") side := enum.Side(getString(t.QuoteRequest.Body, "Side")) orderQty := getDecimal(t.QuoteRequest.Body, "OrderQty") settlDate := getString(t.QuoteRequest.Body, "SettlDate") ownerTraderID := getString(t.QuoteRequest.Body, "OwnerTraderID") m.tradesMu.Unlock() if sIDSource != enum.SecurityIDSource_CUSIP { sIDSource = enum.SecurityIDSource_ISIN_NUMBER } quoteID := quoteReqID q := quote.New( field.NewQuoteID(quoteID), 
field.NewQuoteType(enum.QuoteType_SEND_QUOTE), field.NewTransactTime(time.Now()), ) q.SetSymbol("[N/A]") q.SetSecurityID(symbol) q.SetSecurityIDSource(sIDSource) q.SetQuoteReqID(quoteReqID) if currency != "" { q.SetCurrency(currency) } if !orderQty.IsZero() { q.SetOrderQty(orderQty, 0) } if settlDate != "" { q.SetSettlDate(settlDate) } q.SetPrice(price, 8) if side == enum.Side_BUY { q.SetOfferPx(price, 8) q.SetSide(enum.Side_BUY) } else { q.SetBidPx(price, 8) q.SetSide(enum.Side_SELL) } q.SetPriceType(enum.PriceType_PERCENTAGE) if ownerTraderID != "" { q.SetOwnerTraderID(ownerTraderID) } if sendErr := quickfix.SendToTarget(q, sessionID); sendErr != nil { sendErr = tracerr.Errorf("SendQuote: failed to send quote (quoteReqID=%s): %w", quoteReqID, sendErr) slog.Error(sendErr.Error()) return sendErr } m.tradesMu.Lock() if t, ok := m.trades[quoteReqID]; ok { t.Price = price t.Quoted = true } m.tradesMu.Unlock() slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol, "price", price.String()) return nil } // firstGroup returns the first repetition of a NUMINGROUP field, or nil. func firstGroup(body map[string]any, name string) map[string]any { if body == nil { return nil } groups, ok := body[name].([]map[string]any) if !ok || len(groups) == 0 { return nil } return groups[0] } // getString reads a string value from a body map, tolerating nil maps and missing keys. func getString(body map[string]any, name string) string { if body == nil { return "" } if v, ok := body[name].(string); ok { return v } return "" } // getDecimal reads a decimal value from a body map. Numeric FIX types come through // as strings (e.g. "10000"); INT-typed counts may be int. Both are accepted. 
func getDecimal(body map[string]any, name string) decimal.Decimal { if body == nil { return decimal.Decimal{} } switch v := body[name].(type) { case string: d, _ := decimal.NewFromString(v) return d case int: return decimal.NewFromInt(int64(v)) } return decimal.Decimal{} } func (m *Manager) anyActiveSessionID() quickfix.SessionID { m.sessionsMu.RLock() defer m.sessionsMu.RUnlock() for _, s := range m.sessions { return s } return quickfix.SessionID{} } // handleRawMessage is the single ingest path for every application-level FIX message // (admin messages — Logon, Logout, Heartbeat, TestRequest, ResendRequest, SequenceReset — // go through ToAdmin/FromAdmin and never reach this callback). It persists the raw // envelope to the logs table, builds a structured Message and saves it to // the messages table, and appends to the in-memory list. func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) { quoteReqID := extractIdentifier(msg) if err := m.store.SaveLog(domain.LogEntry{ QuoteReqID: quoteReqID, RawMsg: "[" + direction + "] " + msg.String(), }); err != nil { err = tracerr.Errorf("failed to persist raw log: %w", err) slog.Error(err.Error()) } msgTypeBytes, _ := msg.Header.GetBytes(tag.MsgType) msgType := string(msgTypeBytes) senderCompID, msgSeqNum, sendingTime := extractHeaderMeta(msg) fixJSON := buildFixMessageJSON(direction, msgType, quoteReqID, msg, m.dict) stored := domain.Message{ ID: uuid.NewV4().String(), SenderCompID: senderCompID, MsgSeqNum: msgSeqNum, SendingTime: sendingTime, CreatedAt: time.Now(), JMessage: fixJSON, } if err := m.store.SaveMessage(stored); err != nil { err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", msgType, quoteReqID, err) slog.Error(err.Error()) } m.messagesMu.Lock() m.messages = append(m.messages, stored) m.messagesMu.Unlock() } // GetAllMessages returns today's FIX application messages with MsgSeqNum greater than // the caller's last-seen sequence per direction (inSeq for IN, 
outSeq for OUT), sorted // ascending by CreatedAt. Passing 0 for either cursor returns all messages on that side. func (m *Manager) GetAllMessages(inSeq, outSeq int) []domain.Message { m.messagesMu.RLock() log.Info().Msgf("request received, inSeq: %d, outSeq: %d", inSeq, outSeq) filtered := make([]domain.Message, 0, len(m.messages)) for _, msg := range m.messages { switch msg.JMessage.Direction { case "IN": if msg.MsgSeqNum > inSeq { filtered = append(filtered, msg) } case "OUT": if msg.MsgSeqNum > outSeq { filtered = append(filtered, msg) } } } m.messagesMu.RUnlock() sort.Slice(filtered, func(i, j int) bool { return filtered[i].CreatedAt.Before(filtered[j].CreatedAt) }) log.Info().Msgf("messages sent: %d", len(filtered)) return filtered } // loadTodayMessages rebuilds the in-memory message list from today's rows in the DB. // Must be called before the FIX initiator starts so live ingest doesn't race with replay. func (m *Manager) loadTodayMessages() error { messages, err := m.store.GetTodayMessages() if err != nil { return err } m.messagesMu.Lock() m.messages = messages m.messagesMu.Unlock() slog.Info("today messages loaded", "count", len(messages)) return nil } // loadActiveTrades reconstructs active trades from today's messages in the database. 
func (m *Manager) loadActiveTrades() error { messages, err := m.store.GetTodayMessages() if err != nil { return err } activeTrades := make(map[string]*listTrade) for _, msg := range messages { quoteReqID := msg.JMessage.QuoteReqID switch msg.JMessage.MsgType { case "R": // QuoteRequest -> trade is born if !strings.HasPrefix(quoteReqID, "LST_") { continue } relSym := firstGroup(msg.JMessage.Body, "NoRelatedSym") if getString(relSym, "NegotiationType") != "RFQ" { continue } if getString(relSym, "ListID") == "" { continue } activeTrades[quoteReqID] = &listTrade{ QuoteRequest: msg.JMessage, } case "S": // Outgoing Quote — dealer has already quoted this trade if t, ok := activeTrades[quoteReqID]; ok { t.Quoted = true t.Price = getDecimal(msg.JMessage.Body, "Price") } } } m.trades = activeTrades slog.Info("recovery completed", "activeTrades", len(activeTrades)) return nil }