generic json for fix messages

This commit is contained in:
Ramiro Paz
2026-05-07 17:37:17 -03:00
parent 36b841fc66
commit 45fad9de6c
10 changed files with 794 additions and 318 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/datadictionary"
"quantex.com/qfixdpl/quickfix/gen/enum"
"quantex.com/qfixdpl/quickfix/gen/field"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack"
@ -27,18 +28,10 @@ import (
)
type listTrade struct {
QuoteReqID string
ListID string
Symbol string
SecurityIDSrc enum.SecurityIDSource
Currency string
Side enum.Side
OrderQty decimal.Decimal
SettlDate string
Price decimal.Decimal
OwnerTraderID string
SessionID quickfix.SessionID
Quoted bool
QuoteRequest domain.FixMessageJSON
SessionID quickfix.SessionID
Quoted bool
Price decimal.Decimal
}
// Manager wraps the QuickFIX initiator and implements domain.FIXSender.
@ -52,6 +45,7 @@ type Manager struct {
store domain.PersistenceStore
notify domain.Notifier
cfg app.FIXConfig
dict *datadictionary.DataDictionary
}
func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager {
@ -76,6 +70,15 @@ func (m *Manager) Start() error {
fixApp.onRawMessage = m.handleRawMessage
m.app = fixApp
dict, err := datadictionary.Parse(m.cfg.DataDictionaryFile)
if err != nil {
err = tracerr.Errorf("error parsing FIX data dictionary %q: %w", m.cfg.DataDictionaryFile, err)
slog.Error(err.Error())
return err
}
m.dict = dict
if err := m.loadActiveTrades(); err != nil {
err = tracerr.Errorf("failed to load active trades from DB, starting with empty state: %w", err)
slog.Error(err.Error())
@ -191,13 +194,15 @@ func (m *Manager) sendExecutionAck(orderID, clOrdID, execID string, sessionID qu
return quickfix.SendToTarget(bn, sessionID)
}
// handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge) followed by a Quote at price 99.6.
// handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge).
// The Quote (35=S) is sent later via the REST endpoint when the dealer prices it.
func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) {
quoteReqID, err := msg.GetQuoteReqID()
if err != nil {
err := tracerr.Errorf("handleQuoteRequest: missing QuoteReqID: %w", err)
slog.Error(err.Error())
parsed := parseQuoteRequest(msg, m.dict)
quoteReqID := parsed.QuoteReqID
if quoteReqID == "" {
err := tracerr.Errorf("handleQuoteRequest: missing QuoteReqID")
slog.Error(err.Error())
return
}
@ -207,26 +212,22 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
return
}
var (
symbol, currency, ownerTraderID, settlDate, listID, negotiationType string
side enum.Side
secIDSource enum.SecurityIDSource
orderQty decimal.Decimal
)
relatedSyms, relErr := msg.GetNoRelatedSym()
if relErr == nil && relatedSyms.Len() > 0 {
sym := relatedSyms.Get(0)
symbol, _ = sym.GetSecurityID()
secIDSource, _ = sym.GetSecurityIDSource()
currency, _ = sym.GetCurrency()
side, _ = sym.GetSide()
ownerTraderID, _ = sym.GetOwnerTraderID()
orderQty, _ = sym.GetOrderQty()
settlDate, _ = sym.GetSettlDate()
listID, _ = sym.GetListID()
negotiationType, _ = sym.GetNegotiationType()
bodyKeys := make([]string, 0, len(parsed.Body))
for k := range parsed.Body {
bodyKeys = append(bodyKeys, k)
}
slog.Info("handleQuoteRequest: parsed body keys", "quoteReqID", quoteReqID, "keys", bodyKeys)
relSym := firstGroup(parsed.Body, "NoRelatedSym")
relSymKeys := make([]string, 0, len(relSym))
for k := range relSym {
relSymKeys = append(relSymKeys, k)
}
slog.Info("handleQuoteRequest: NoRelatedSym keys", "quoteReqID", quoteReqID, "keys", relSymKeys)
listID := getString(relSym, "ListID")
negotiationType := getString(relSym, "NegotiationType")
ownerTraderID := getString(relSym, "OwnerTraderID")
if listID == "" {
slog.Warn("handleQuoteRequest: missing ListID", "quoteReqID", quoteReqID)
@ -246,33 +247,20 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
}
slog.Info("QuoteStatusReport sent", "quoteReqID", quoteReqID)
sIDSource := enum.SecurityIDSource_ISIN_NUMBER
if secIDSource == enum.SecurityIDSource_CUSIP {
sIDSource = enum.SecurityIDSource_CUSIP
}
// Store trade state as pending; Quote (35=S) is sent later via REST endpoint.
m.tradesMu.Lock()
m.trades[quoteReqID] = &listTrade{
QuoteReqID: quoteReqID,
ListID: listID,
Symbol: symbol,
SecurityIDSrc: sIDSource,
Currency: currency,
Side: side,
OrderQty: orderQty,
SettlDate: settlDate,
OwnerTraderID: ownerTraderID,
SessionID: sessionID,
Quoted: false,
QuoteRequest: parsed,
SessionID: sessionID,
Quoted: false,
}
m.tradesMu.Unlock()
// Persist structured message (outside mutex).
m.persistMessage(quoteReqID, parseQuoteRequest(msg))
// Persist incoming QuoteRequest.
m.persistMessage(quoteReqID, parsed)
// Persist outgoing QuoteStatusReport.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]any{
"QuoteReqID": quoteReqID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
"OwnerTraderID": ownerTraderID,
@ -285,7 +273,7 @@ func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.Sessi
status, _ := msg.GetQuoteAckStatus()
text, _ := msg.GetText()
m.persistMessage(quoteReqID, parseQuoteAck(msg))
m.persistMessage(quoteReqID, parseQuoteAck(msg, m.dict))
if status != enum.QuoteAckStatus_ACCEPTED {
err := tracerr.Errorf("handleQuoteAck: quote rejected by TW (quoteReqID=%s, quoteAckStatus=%s, text=%s)", quoteReqID, string(status), text)
@ -331,10 +319,10 @@ func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
// Persist incoming QuoteResponse.
m.persistMessage(quoteReqID, parseQuoteResponse(msg))
m.persistMessage(quoteReqID, parseQuoteResponse(msg, m.dict))
// Persist outgoing ACK.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]any{
"QuoteReqID": quoteReqID,
"QuoteRespID": quoteRespID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
@ -394,10 +382,10 @@ func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, ses
}
// Persist incoming ExecutionReport.
m.persistMessage(clOrdID, parseExecutionReport(msg))
m.persistMessage(clOrdID, parseExecutionReport(msg, m.dict))
// Persist outgoing ExecutionAck.
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]any{
"OrderID": orderID,
"ExecID": execID,
"ClOrdID": clOrdID,
@ -439,18 +427,14 @@ func (m *Manager) GetPendingQuoteRequests() []domain.ListTrade {
}
func toDomainListTrade(t *listTrade) domain.ListTrade {
return domain.ListTrade{
QuoteReqID: t.QuoteReqID,
ListID: t.ListID,
Symbol: t.Symbol,
SecurityIDSrc: string(t.SecurityIDSrc),
Currency: t.Currency,
Side: string(t.Side),
OrderQty: t.OrderQty.String(),
SettlDate: t.SettlDate,
Price: t.Price.String(),
OwnerTraderID: t.OwnerTraderID,
out := domain.ListTrade{
QuoteRequest: t.QuoteRequest,
Quoted: t.Quoted,
}
if !t.Price.IsZero() {
out.Price = t.Price.String()
}
return out
}
// SendQuote builds and sends a Quote (35=S) for an existing pending QuoteRequest at the given price.
@ -481,15 +465,20 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
}
}
symbol := t.Symbol
sIDSource := t.SecurityIDSrc
currency := t.Currency
side := t.Side
orderQty := t.OrderQty
settlDate := t.SettlDate
ownerTraderID := t.OwnerTraderID
relSym := firstGroup(t.QuoteRequest.Body, "NoRelatedSym")
symbol := getString(relSym, "SecurityID")
sIDSource := enum.SecurityIDSource(getString(relSym, "SecurityIDSource"))
currency := getString(relSym, "Currency")
side := enum.Side(getString(relSym, "Side"))
orderQty := getDecimal(relSym, "OrderQty")
settlDate := getString(relSym, "SettlDate")
ownerTraderID := getString(relSym, "OwnerTraderID")
m.tradesMu.Unlock()
if sIDSource != enum.SecurityIDSource_CUSIP {
sIDSource = enum.SecurityIDSource_ISIN_NUMBER
}
quoteID := quoteReqID
q := quote.New(
field.NewQuoteID(quoteID),
@ -545,7 +534,7 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol, "price", price.String())
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]interface{}{
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]any{
"QuoteReqID": quoteReqID,
"QuoteID": quoteID,
"Symbol": symbol,
@ -559,6 +548,45 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
return nil
}
// firstGroup returns the first repetition of a NUMINGROUP field, or nil.
func firstGroup(body map[string]any, name string) map[string]any {
if body == nil {
return nil
}
groups, ok := body[name].([]map[string]any)
if !ok || len(groups) == 0 {
return nil
}
return groups[0]
}
// getString reads a string value from a body map, tolerating nil maps and missing keys.
func getString(body map[string]any, name string) string {
if body == nil {
return ""
}
if v, ok := body[name].(string); ok {
return v
}
return ""
}
// getDecimal reads a decimal value from a body map. Numeric FIX types come through
// as strings (e.g. "10000"); INT-typed counts may be int. Both are accepted.
func getDecimal(body map[string]any, name string) decimal.Decimal {
if body == nil {
return decimal.Decimal{}
}
switch v := body[name].(type) {
case string:
d, _ := decimal.NewFromString(v)
return d
case int:
return decimal.NewFromInt(int64(v))
}
return decimal.Decimal{}
}
func (m *Manager) anyActiveSessionID() quickfix.SessionID {
m.sessionsMu.RLock()
defer m.sessionsMu.RUnlock()
@ -608,84 +636,38 @@ func (m *Manager) loadActiveTrades() error {
continue
}
body := msg.JMessage.Body
nt, _ := body["NegotiationType"].(string)
if nt != "RFQ" {
relSym := firstGroup(msg.JMessage.Body, "NoRelatedSym")
if getString(relSym, "NegotiationType") != "RFQ" {
continue
}
if getString(relSym, "ListID") == "" {
continue
}
listID, _ := body["ListID"].(string)
if listID == "" {
continue
activeTrades[msg.QuoteReqID] = &listTrade{
QuoteRequest: msg.JMessage,
}
trade := &listTrade{
QuoteReqID: msg.QuoteReqID,
ListID: listID,
}
if v, ok := body["SecurityID"].(string); ok {
trade.Symbol = v
}
if v, ok := body["SecurityIDSource"].(string); ok {
trade.SecurityIDSrc = enum.SecurityIDSource(v)
}
if v, ok := body["Currency"].(string); ok {
trade.Currency = v
}
if v, ok := body["Side"].(string); ok {
trade.Side = enum.Side(v)
}
if v, ok := body["OrderQty"].(string); ok {
trade.OrderQty, _ = decimal.NewFromString(v)
}
if v, ok := body["SettlDate"].(string); ok {
trade.SettlDate = v
}
if v, ok := body["OwnerTraderID"].(string); ok {
trade.OwnerTraderID = v
}
activeTrades[msg.QuoteReqID] = trade
case "S": // Outgoing Quote — dealer has already quoted this trade
if t, ok := activeTrades[msg.QuoteReqID]; ok {
t.Quoted = true
if v, ok := msg.JMessage.Body["Price"].(string); ok {
t.Price, _ = decimal.NewFromString(v)
}
t.Price = getDecimal(msg.JMessage.Body, "Price")
}
case "CW": // QuoteAck — if rejected, trade is dead
body := msg.JMessage.Body
quoteAckStatus, _ := body["QuoteAckStatus"].(string)
if quoteAckStatus != string(enum.QuoteAckStatus_ACCEPTED) {
if getString(msg.JMessage.Body, "QuoteAckStatus") != string(enum.QuoteAckStatus_ACCEPTED) {
delete(activeTrades, msg.QuoteReqID)
}
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
body := msg.JMessage.Body
quoteRespID, _ := body["QuoteRespID"].(string)
if strings.HasSuffix(quoteRespID, "_TRDSUMM") {
if strings.HasSuffix(getString(msg.JMessage.Body, "QuoteRespID"), "_TRDSUMM") {
delete(activeTrades, msg.QuoteReqID)
}
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)
body := msg.JMessage.Body
execID, _ := body["ExecID"].(string)
clOrdID, _ := body["ClOrdID"].(string)
if strings.Contains(execID, "_TRDSUMM") {
delete(activeTrades, clOrdID)
if strings.Contains(getString(body, "ExecID"), "_TRDSUMM") {
delete(activeTrades, getString(body, "ClOrdID"))
}
}
}