From 45fad9de6c825f493fd50ec3602d5cc9bfcb5263 Mon Sep 17 00:00:00 2001 From: Ramiro Paz Date: Thu, 7 May 2026 17:37:17 -0300 Subject: [PATCH] generic json for fix messages --- fix.cfg | 4 + quickfix/field_map.go | 18 +++ quickfix/tag_value.go | 10 ++ spec/FIXT11.xml | 313 ++++++++++++++++++++++++++++++++++++++ src/app/model.go | 3 +- src/client/fix/builder.go | 172 +++++++++++++++++++++ src/client/fix/dict.go | 113 ++++++++++++++ src/client/fix/manager.go | 256 +++++++++++++++---------------- src/client/fix/parser.go | 197 ++++-------------------- src/domain/persistence.go | 26 ++-- 10 files changed, 794 insertions(+), 318 deletions(-) create mode 100644 spec/FIXT11.xml create mode 100644 src/client/fix/builder.go create mode 100644 src/client/fix/dict.go diff --git a/fix.cfg b/fix.cfg index d45ecfe..38ae765 100644 --- a/fix.cfg +++ b/fix.cfg @@ -5,6 +5,10 @@ SenderCompID=QUANTEX ResetOnLogon=Y FileStorePath=fix_store FileLogPath=fix_logs +TransportDataDictionary=spec/FIXT11.xml +AppDataDictionary=spec/FIX50SP2.xml +AllowUnknownMessageFields=Y +RejectInvalidMessage=N [SESSION] BeginString=FIXT.1.1 diff --git a/quickfix/field_map.go b/quickfix/field_map.go index 4aac64b..adb4d8e 100644 --- a/quickfix/field_map.go +++ b/quickfix/field_map.go @@ -89,6 +89,24 @@ func (m FieldMap) Get(parser Field) MessageRejectError { return m.GetField(parser.Tag(), parser) } +// RawValues returns a copy of the underlying TagValue slice for a tag. +// For repeating groups, the first entry is the count and the remaining +// entries are the flattened inner-field values across all repetitions. +func (m FieldMap) RawValues(tag Tag) []TagValue { + m.rwLock.RLock() + defer m.rwLock.RUnlock() + + f, ok := m.tagLookup[tag] + if !ok { + return nil + } + + out := make([]TagValue, len(f)) + copy(out, f) + + return out +} + // Has returns true if the Tag is present in this FieldMap. func (m FieldMap) Has(tag Tag) bool { m.rwLock.RLock() diff --git a/quickfix/tag_value.go b/quickfix/tag_value.go index 441635c..901079c 100644 --- a/quickfix/tag_value.go +++ b/quickfix/tag_value.go @@ -86,6 +86,16 @@ func (tv TagValue) String() string { return string(tv.bytes) } +// Tag returns the FIX tag for this TagValue. +func (tv TagValue) Tag() Tag { + return tv.tag +} + +// Value returns the raw FIX value bytes for this TagValue. +func (tv TagValue) Value() []byte { + return tv.value +} + func bytesTotal(bytes []byte) (total int) { for _, b := range bytes { total += int(b) diff --git a/spec/FIXT11.xml b/spec/FIXT11.xml new file mode 100644 index 0000000..a7d733f --- /dev/null +++ b/spec/FIXT11.xml @@ -0,0 +1,313 @@ + + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
diff --git a/src/app/model.go b/src/app/model.go index 63f9496..ce29f04 100644 --- a/src/app/model.go +++ b/src/app/model.go @@ -40,7 +40,8 @@ type Service struct { } type FIXConfig struct { - SettingsFile string // path to fix.cfg file + SettingsFile string // path to fix.cfg file + DataDictionaryFile string // path to FIX data dictionary XML (e.g. spec/FIX50SP2.xml) } type ExtAuth struct { diff --git a/src/client/fix/builder.go b/src/client/fix/builder.go new file mode 100644 index 0000000..0d428b9 --- /dev/null +++ b/src/client/fix/builder.go @@ -0,0 +1,172 @@ +package fix + +import ( + "fmt" + + "quantex.com/qfixdpl/quickfix" + "quantex.com/qfixdpl/quickfix/datadictionary" +) + +// BuildFieldMap walks a quickfix.FieldMap and produces an enriched FieldMap +// keyed by FixField (Name + Tag + Type) with values typed per the FIX +// data dictionary. Repeating groups become []FieldMap; nested groups recurse. +func BuildFieldMap(qfMap quickfix.FieldMap, dd *datadictionary.DataDictionary, fieldsByTag map[int]*datadictionary.FieldDef) FieldMap { + out := FieldMap{} + + for _, t := range qfMap.Tags() { + tagInt := int(t) + rawValues := qfMap.RawValues(t) + if len(rawValues) == 0 { + continue + } + + ff, fd := resolveField(dd, fieldsByTag, tagInt) + + if ff.Type == "NUMINGROUP" { + if fd != nil && fd.IsGroup() { + out[ff] = buildGroup(rawValues[1:], dd, fd.Fields) + } else { + // Count tag with no group structure available (dictionary lookup + // failed, or quickfix didn't nest the inner fields). Emit an empty + // group rather than an int so the consumer's type expectation holds. + out[ff] = []FieldMap{} + } + continue + } + + out[ff] = parseScalar(rawValues[0].Value(), ff.Type) + } + + return out +} + +// buildGroup splits a flat slice of TagValues (the inner values of a NUMINGROUP, +// excluding the count) into per-repetition FieldMaps. The first element of +// innerFields is the delimiter that marks the start of each repetition. +func buildGroup(tvs []quickfix.TagValue, dd *datadictionary.DataDictionary, innerFields []*datadictionary.FieldDef) []FieldMap { + if len(innerFields) == 0 || len(tvs) == 0 { + return nil + } + + delimiter := innerFields[0].Tag() + fdByTag := make(map[int]*datadictionary.FieldDef, len(innerFields)) + for _, f := range innerFields { + fdByTag[f.Tag()] = f + } + + var ( + repetitions []FieldMap + current FieldMap + ) + + i := 0 + for i < len(tvs) { + tv := tvs[i] + tagInt := int(tv.Tag()) + + if tagInt == delimiter { + current = FieldMap{} + repetitions = append(repetitions, current) + } + + if current == nil { + // Field appeared before the first delimiter; skip. + i++ + continue + } + + fd, known := fdByTag[tagInt] + if !known { + current[FixField{Name: fmt.Sprintf("tag_%d", tagInt), Tag: tagInt, Type: "STRING"}] = string(tv.Value()) + i++ + continue + } + + if fd.IsGroup() { + nestedAllowed := allowedTags(fd.Fields) + j := i + 1 + for j < len(tvs) && nestedAllowed[int(tvs[j].Tag())] { + j++ + } + nested := buildGroup(tvs[i+1:j], dd, fd.Fields) + current[FixField{Name: fd.Name(), Tag: tagInt, Type: "NUMINGROUP"}] = nested + i = j + continue + } + + current[FixField{Name: fd.Name(), Tag: tagInt, Type: fd.Type}] = parseScalar(tv.Value(), fd.Type) + i++ + } + + return repetitions +} + +// resolveField looks up the FixField metadata (name + type) for a tag. +// Prefers the message-level FieldDef so group structure is preserved; +// falls back to the global FieldTypeByTag, then to a synthetic "tag_" STRING. +func resolveField(dd *datadictionary.DataDictionary, fieldsByTag map[int]*datadictionary.FieldDef, tagInt int) (FixField, *datadictionary.FieldDef) { + if fieldsByTag != nil { + if fd, ok := fieldsByTag[tagInt]; ok { + return FixField{Name: fd.Name(), Tag: tagInt, Type: fd.Type}, fd + } + } + + if dd != nil { + if ft, ok := dd.FieldTypeByTag[tagInt]; ok { + return FixField{Name: ft.Name(), Tag: tagInt, Type: ft.Type}, nil + } + } + + return FixField{Name: fmt.Sprintf("tag_%d", tagInt), Tag: tagInt, Type: "STRING"}, nil +} + +// parseScalar converts raw FIX bytes into the Go type expected by the consumer +// (GetKeyValue): bool for BOOLEAN, int for INT/SEQNUM, time.Time for UTCTIMESTAMP, +// string for everything else. +func parseScalar(raw []byte, fixType string) interface{} { + s := string(raw) + + switch fixType { + case "INT", "SEQNUM", "LENGTH", "DAYOFMONTH": + var v quickfix.FIXInt + if err := v.Read(raw); err == nil { + return int(v) + } + return s + case "BOOLEAN": + var v quickfix.FIXBoolean + if err := v.Read(raw); err == nil { + return bool(v) + } + return s + case "UTCTIMESTAMP": + var v quickfix.FIXUTCTimestamp + if err := v.Read(raw); err == nil { + return v.Time + } + return s + default: + return s + } +} + +// allowedTags returns the set of all tags valid inside a group (including +// nested-group counts and their descendants), used to detect where a flat +// nested-group slice ends within its parent. +func allowedTags(fields []*datadictionary.FieldDef) map[int]bool { + out := make(map[int]bool, len(fields)) + + var visit func(fs []*datadictionary.FieldDef) + visit = func(fs []*datadictionary.FieldDef) { + for _, f := range fs { + out[f.Tag()] = true + if f.IsGroup() { + visit(f.Fields) + } + } + } + + visit(fields) + + return out +} diff --git a/src/client/fix/dict.go b/src/client/fix/dict.go new file mode 100644 index 0000000..f7fe371 --- /dev/null +++ b/src/client/fix/dict.go @@ -0,0 +1,113 @@ +package fix + +import ( + "log/slog" + "time" + + "quantex.com/qfixdpl/src/common/tracerr" +) + +// FixField identifies a single FIX field by its dictionary metadata. +type FixField struct { + Name string + Tag int + Type string +} + +// FieldValue is the typed value for a FIX field. +type FieldValue interface{} + +// FieldMap is the enriched representation of a FIX FieldMap (header, body, or trailer). +type FieldMap map[FixField]FieldValue + +// GetMap converts a FieldMap into a JSON-friendly map keyed by field name. +func GetMap(fieldMap FieldMap) map[string]interface{} { + result := map[string]interface{}{} + + for f, v := range fieldMap { + k, val := GetKeyValue(f, v) + result[k] = val + } + + return result +} + +//nolint:funlen,gocyclo,cyclop //it's long but easy to read +func GetKeyValue(f FixField, value FieldValue) (key string, val interface{}) { + key = f.Name + + switch f.Type { + case "NUMINGROUP": + var groups []map[string]interface{} + + fMapLst, ok := value.([]FieldMap) + if !ok { + err := tracerr.Errorf("could not parse as []FieldMap, value: %+v", value) + slog.Error(err.Error()) + } + + for _, fieldMap := range fMapLst { + groups = append(groups, GetMap(fieldMap)) + } + + val = groups + case "BOOLEAN": + b, ok := value.(bool) + if !ok { + err := tracerr.Errorf("could not parse as bool, value: %+v", value) + slog.Error(err.Error()) + } + + val = b + case "INT": + i, ok := value.(int) + if !ok { + err := tracerr.Errorf("could not parse as int, value: %+v", value) + slog.Error(err.Error()) + } + + val = i + case "SEQNUM": + i, ok := value.(int) + if !ok { + err := tracerr.Errorf("could not parse as int, value: %+v", value) + slog.Error(err.Error()) + } + + val = i + case "UTCTIMESTAMP": + t, ok := value.(time.Time) + if !ok { + err := tracerr.Errorf("could not parse as Time, value: %+v", value) + slog.Error(err.Error()) + } + + val = t + case "STRING": + s, ok := value.(string) + if !ok { + err := tracerr.Errorf("could not parse as string, value: %+v", value) + slog.Error(err.Error()) + } + + val = s + case "CHAR": + s, ok := value.(string) + if !ok { + err := tracerr.Errorf("could not parse as string, value: %+v", value) + slog.Error(err.Error()) + } + + val = s + default: + s, ok := value.(string) + if !ok { + err := tracerr.Errorf("could not parse as string, value: %+v", value) + slog.Error(err.Error()) + } + + val = s + } + + return key, val +} diff --git a/src/client/fix/manager.go b/src/client/fix/manager.go index d5a914f..20003fe 100644 --- a/src/client/fix/manager.go +++ b/src/client/fix/manager.go @@ -10,6 +10,7 @@ import ( "github.com/shopspring/decimal" "quantex.com/qfixdpl/quickfix" + "quantex.com/qfixdpl/quickfix/datadictionary" "quantex.com/qfixdpl/quickfix/gen/enum" "quantex.com/qfixdpl/quickfix/gen/field" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack" @@ -27,18 +28,10 @@ import ( ) type listTrade struct { - QuoteReqID string - ListID string - Symbol string - SecurityIDSrc enum.SecurityIDSource - Currency string - Side enum.Side - OrderQty decimal.Decimal - SettlDate string - Price decimal.Decimal - OwnerTraderID string - SessionID quickfix.SessionID - Quoted bool + QuoteRequest domain.FixMessageJSON + SessionID quickfix.SessionID + Quoted bool + Price decimal.Decimal } // Manager wraps the QuickFIX initiator and implements domain.FIXSender. @@ -52,6 +45,7 @@ type Manager struct { store domain.PersistenceStore notify domain.Notifier cfg app.FIXConfig + dict *datadictionary.DataDictionary } func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager { @@ -76,6 +70,15 @@ func (m *Manager) Start() error { fixApp.onRawMessage = m.handleRawMessage m.app = fixApp + dict, err := datadictionary.Parse(m.cfg.DataDictionaryFile) + if err != nil { + err = tracerr.Errorf("error parsing FIX data dictionary %q: %w", m.cfg.DataDictionaryFile, err) + slog.Error(err.Error()) + + return err + } + m.dict = dict + if err := m.loadActiveTrades(); err != nil { err = tracerr.Errorf("failed to load active trades from DB, starting with empty state: %w", err) slog.Error(err.Error()) @@ -191,13 +194,15 @@ func (m *Manager) sendExecutionAck(orderID, clOrdID, execID string, sessionID qu return quickfix.SendToTarget(bn, sessionID) } -// handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge) followed by a Quote at price 99.6. +// handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge). +// The Quote (35=S) is sent later via the REST endpoint when the dealer prices it. func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) { - quoteReqID, err := msg.GetQuoteReqID() - if err != nil { - err := tracerr.Errorf("handleQuoteRequest: missing QuoteReqID: %w", err) - slog.Error(err.Error()) + parsed := parseQuoteRequest(msg, m.dict) + quoteReqID := parsed.QuoteReqID + if quoteReqID == "" { + err := tracerr.Errorf("handleQuoteRequest: missing QuoteReqID") + slog.Error(err.Error()) return } @@ -207,26 +212,22 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu return } - var ( - symbol, currency, ownerTraderID, settlDate, listID, negotiationType string - side enum.Side - secIDSource enum.SecurityIDSource - orderQty decimal.Decimal - ) - - relatedSyms, relErr := msg.GetNoRelatedSym() - if relErr == nil && relatedSyms.Len() > 0 { - sym := relatedSyms.Get(0) - symbol, _ = sym.GetSecurityID() - secIDSource, _ = sym.GetSecurityIDSource() - currency, _ = sym.GetCurrency() - side, _ = sym.GetSide() - ownerTraderID, _ = sym.GetOwnerTraderID() - orderQty, _ = sym.GetOrderQty() - settlDate, _ = sym.GetSettlDate() - listID, _ = sym.GetListID() - negotiationType, _ = sym.GetNegotiationType() + bodyKeys := make([]string, 0, len(parsed.Body)) + for k := range parsed.Body { + bodyKeys = append(bodyKeys, k) } + slog.Info("handleQuoteRequest: parsed body keys", "quoteReqID", quoteReqID, "keys", bodyKeys) + + relSym := firstGroup(parsed.Body, "NoRelatedSym") + relSymKeys := make([]string, 0, len(relSym)) + for k := range relSym { + relSymKeys = append(relSymKeys, k) + } + slog.Info("handleQuoteRequest: NoRelatedSym keys", "quoteReqID", quoteReqID, "keys", relSymKeys) + + listID := getString(relSym, "ListID") + negotiationType := getString(relSym, "NegotiationType") + ownerTraderID := getString(relSym, "OwnerTraderID") if listID == "" { slog.Warn("handleQuoteRequest: missing ListID", "quoteReqID", quoteReqID) @@ -246,33 +247,20 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu } slog.Info("QuoteStatusReport sent", "quoteReqID", quoteReqID) - sIDSource := enum.SecurityIDSource_ISIN_NUMBER - if secIDSource == enum.SecurityIDSource_CUSIP { - sIDSource = enum.SecurityIDSource_CUSIP - } - // Store trade state as pending; Quote (35=S) is sent later via REST endpoint. m.tradesMu.Lock() m.trades[quoteReqID] = &listTrade{ - QuoteReqID: quoteReqID, - ListID: listID, - Symbol: symbol, - SecurityIDSrc: sIDSource, - Currency: currency, - Side: side, - OrderQty: orderQty, - SettlDate: settlDate, - OwnerTraderID: ownerTraderID, - SessionID: sessionID, - Quoted: false, + QuoteRequest: parsed, + SessionID: sessionID, + Quoted: false, } m.tradesMu.Unlock() - // Persist structured message (outside mutex). - m.persistMessage(quoteReqID, parseQuoteRequest(msg)) + // Persist incoming QuoteRequest. + m.persistMessage(quoteReqID, parsed) // Persist outgoing QuoteStatusReport. - m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{ + m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]any{ "QuoteReqID": quoteReqID, "QuoteStatus": string(enum.QuoteStatus_ACCEPTED), "OwnerTraderID": ownerTraderID, @@ -285,7 +273,7 @@ func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.Sessi status, _ := msg.GetQuoteAckStatus() text, _ := msg.GetText() - m.persistMessage(quoteReqID, parseQuoteAck(msg)) + m.persistMessage(quoteReqID, parseQuoteAck(msg, m.dict)) if status != enum.QuoteAckStatus_ACCEPTED { err := tracerr.Errorf("handleQuoteAck: quote rejected by TW (quoteReqID=%s, quoteAckStatus=%s, text=%s)", quoteReqID, string(status), text) @@ -331,10 +319,10 @@ func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID) // Persist incoming QuoteResponse. - m.persistMessage(quoteReqID, parseQuoteResponse(msg)) + m.persistMessage(quoteReqID, parseQuoteResponse(msg, m.dict)) // Persist outgoing ACK. - m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{ + m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]any{ "QuoteReqID": quoteReqID, "QuoteRespID": quoteRespID, "QuoteStatus": string(enum.QuoteStatus_ACCEPTED), @@ -394,10 +382,10 @@ func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, ses } // Persist incoming ExecutionReport. - m.persistMessage(clOrdID, parseExecutionReport(msg)) + m.persistMessage(clOrdID, parseExecutionReport(msg, m.dict)) // Persist outgoing ExecutionAck. - m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{ + m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]any{ "OrderID": orderID, "ExecID": execID, "ClOrdID": clOrdID, @@ -439,18 +427,14 @@ func (m *Manager) GetPendingQuoteRequests() []domain.ListTrade { } func toDomainListTrade(t *listTrade) domain.ListTrade { - return domain.ListTrade{ - QuoteReqID: t.QuoteReqID, - ListID: t.ListID, - Symbol: t.Symbol, - SecurityIDSrc: string(t.SecurityIDSrc), - Currency: t.Currency, - Side: string(t.Side), - OrderQty: t.OrderQty.String(), - SettlDate: t.SettlDate, - Price: t.Price.String(), - OwnerTraderID: t.OwnerTraderID, + out := domain.ListTrade{ + QuoteRequest: t.QuoteRequest, + Quoted: t.Quoted, } + if !t.Price.IsZero() { + out.Price = t.Price.String() + } + return out } // SendQuote builds and sends a Quote (35=S) for an existing pending QuoteRequest at the given price. @@ -481,15 +465,20 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error { } } - symbol := t.Symbol - sIDSource := t.SecurityIDSrc - currency := t.Currency - side := t.Side - orderQty := t.OrderQty - settlDate := t.SettlDate - ownerTraderID := t.OwnerTraderID + relSym := firstGroup(t.QuoteRequest.Body, "NoRelatedSym") + symbol := getString(relSym, "SecurityID") + sIDSource := enum.SecurityIDSource(getString(relSym, "SecurityIDSource")) + currency := getString(relSym, "Currency") + side := enum.Side(getString(relSym, "Side")) + orderQty := getDecimal(relSym, "OrderQty") + settlDate := getString(relSym, "SettlDate") + ownerTraderID := getString(relSym, "OwnerTraderID") m.tradesMu.Unlock() + if sIDSource != enum.SecurityIDSource_CUSIP { + sIDSource = enum.SecurityIDSource_ISIN_NUMBER + } + quoteID := quoteReqID q := quote.New( field.NewQuoteID(quoteID), @@ -545,7 +534,7 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error { slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol, "price", price.String()) - m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]interface{}{ + m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]any{ "QuoteReqID": quoteReqID, "QuoteID": quoteID, "Symbol": symbol, @@ -559,6 +548,45 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error { return nil } +// firstGroup returns the first repetition of a NUMINGROUP field, or nil. +func firstGroup(body map[string]any, name string) map[string]any { + if body == nil { + return nil + } + groups, ok := body[name].([]map[string]any) + if !ok || len(groups) == 0 { + return nil + } + return groups[0] +} + +// getString reads a string value from a body map, tolerating nil maps and missing keys. +func getString(body map[string]any, name string) string { + if body == nil { + return "" + } + if v, ok := body[name].(string); ok { + return v + } + return "" +} + +// getDecimal reads a decimal value from a body map. Numeric FIX types come through +// as strings (e.g. "10000"); INT-typed counts may be int. Both are accepted. +func getDecimal(body map[string]any, name string) decimal.Decimal { + if body == nil { + return decimal.Decimal{} + } + switch v := body[name].(type) { + case string: + d, _ := decimal.NewFromString(v) + return d + case int: + return decimal.NewFromInt(int64(v)) + } + return decimal.Decimal{} +} + func (m *Manager) anyActiveSessionID() quickfix.SessionID { m.sessionsMu.RLock() defer m.sessionsMu.RUnlock() @@ -608,84 +636,38 @@ func (m *Manager) loadActiveTrades() error { continue } - body := msg.JMessage.Body - - nt, _ := body["NegotiationType"].(string) - if nt != "RFQ" { + relSym := firstGroup(msg.JMessage.Body, "NoRelatedSym") + if getString(relSym, "NegotiationType") != "RFQ" { + continue + } + if getString(relSym, "ListID") == "" { continue } - listID, _ := body["ListID"].(string) - if listID == "" { - continue + activeTrades[msg.QuoteReqID] = &listTrade{ + QuoteRequest: msg.JMessage, } - trade := &listTrade{ - QuoteReqID: msg.QuoteReqID, - ListID: listID, - } - - if v, ok := body["SecurityID"].(string); ok { - trade.Symbol = v - } - - if v, ok := body["SecurityIDSource"].(string); ok { - trade.SecurityIDSrc = enum.SecurityIDSource(v) - } - - if v, ok := body["Currency"].(string); ok { - trade.Currency = v - } - - if v, ok := body["Side"].(string); ok { - trade.Side = enum.Side(v) - } - - if v, ok := body["OrderQty"].(string); ok { - trade.OrderQty, _ = decimal.NewFromString(v) - } - - if v, ok := body["SettlDate"].(string); ok { - trade.SettlDate = v - } - - if v, ok := body["OwnerTraderID"].(string); ok { - trade.OwnerTraderID = v - } - - activeTrades[msg.QuoteReqID] = trade - case "S": // Outgoing Quote — dealer has already quoted this trade if t, ok := activeTrades[msg.QuoteReqID]; ok { t.Quoted = true - if v, ok := msg.JMessage.Body["Price"].(string); ok { - t.Price, _ = decimal.NewFromString(v) - } + t.Price = getDecimal(msg.JMessage.Body, "Price") } case "CW": // QuoteAck — if rejected, trade is dead - body := msg.JMessage.Body - quoteAckStatus, _ := body["QuoteAckStatus"].(string) - - if quoteAckStatus != string(enum.QuoteAckStatus_ACCEPTED) { + if getString(msg.JMessage.Body, "QuoteAckStatus") != string(enum.QuoteAckStatus_ACCEPTED) { delete(activeTrades, msg.QuoteReqID) } case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6) - body := msg.JMessage.Body - quoteRespID, _ := body["QuoteRespID"].(string) - - if strings.HasSuffix(quoteRespID, "_TRDSUMM") { + if strings.HasSuffix(getString(msg.JMessage.Body, "QuoteRespID"), "_TRDSUMM") { delete(activeTrades, msg.QuoteReqID) } case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4) body := msg.JMessage.Body - execID, _ := body["ExecID"].(string) - clOrdID, _ := body["ClOrdID"].(string) - - if strings.Contains(execID, "_TRDSUMM") { - delete(activeTrades, clOrdID) + if strings.Contains(getString(body, "ExecID"), "_TRDSUMM") { + delete(activeTrades, getString(body, "ClOrdID")) } } } diff --git a/src/client/fix/parser.go b/src/client/fix/parser.go index ef25cff..c804cab 100644 --- a/src/client/fix/parser.go +++ b/src/client/fix/parser.go @@ -4,6 +4,7 @@ import ( "time" "quantex.com/qfixdpl/quickfix" + "quantex.com/qfixdpl/quickfix/datadictionary" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest" @@ -12,188 +13,56 @@ import ( "quantex.com/qfixdpl/src/domain" ) -func extractHeader(msg *quickfix.Message) map[string]interface{} { - header := make(map[string]interface{}) +// buildFixMessageJSON walks the full FIX message (header + body + trailer) +// using the data dictionary and returns a fully populated FixMessageJSON. +func buildFixMessageJSON(direction, msgType, quoteReqID string, msg *quickfix.Message, dd *datadictionary.DataDictionary) domain.FixMessageJSON { + var ( + headerFields map[int]*datadictionary.FieldDef + trailerFields map[int]*datadictionary.FieldDef + bodyFields map[int]*datadictionary.FieldDef + ) - if v, err := msg.Header.GetBytes(tag.BeginString); err == nil { - header["BeginString"] = string(v) - } - if v, err := msg.Header.GetBytes(tag.MsgType); err == nil { - header["MsgType"] = string(v) - } - if v, err := msg.Header.GetBytes(tag.SenderCompID); err == nil { - header["SenderCompID"] = string(v) - } - if v, err := msg.Header.GetBytes(tag.TargetCompID); err == nil { - header["TargetCompID"] = string(v) - } - if v, err := msg.Header.GetBytes(tag.MsgSeqNum); err == nil { - header["MsgSeqNum"] = string(v) - } - if v, err := msg.Header.GetBytes(tag.SendingTime); err == nil { - header["SendingTime"] = string(v) - } - - return header -} - -func parseQuoteRequest(msg quoterequest.QuoteRequest) domain.FixMessageJSON { - quoteReqID, _ := msg.GetQuoteReqID() - body := map[string]interface{}{"QuoteReqID": quoteReqID} - - if relSyms, err := msg.GetNoRelatedSym(); err == nil && relSyms.Len() > 0 { - sym := relSyms.Get(0) - if v, e := sym.GetSecurityID(); e == nil { - body["SecurityID"] = v + if dd != nil { + if dd.Header != nil { + headerFields = dd.Header.Fields } - if v, e := sym.GetSecurityIDSource(); e == nil { - body["SecurityIDSource"] = string(v) + if dd.Trailer != nil { + trailerFields = dd.Trailer.Fields } - if v, e := sym.GetCurrency(); e == nil { - body["Currency"] = v - } - if v, e := sym.GetSide(); e == nil { - body["Side"] = string(v) - } - if v, e := sym.GetOrderQty(); e == nil { - body["OrderQty"] = v.String() - } - if v, e := sym.GetSettlDate(); e == nil { - body["SettlDate"] = v - } - if v, e := sym.GetListID(); e == nil { - body["ListID"] = v - } - if v, e := sym.GetOwnerTraderID(); e == nil { - body["OwnerTraderID"] = v - } - if v, e := sym.GetNegotiationType(); e == nil { - body["NegotiationType"] = v + if md, ok := dd.Messages[msgType]; ok { + bodyFields = md.Fields } } return domain.FixMessageJSON{ - Direction: "IN", - MsgType: "R", + Direction: direction, + MsgType: msgType, QuoteReqID: quoteReqID, - Header: extractHeader(msg.Message), - Body: body, + Header: GetMap(BuildFieldMap(msg.Header.FieldMap, dd, headerFields)), + Body: GetMap(BuildFieldMap(msg.Body.FieldMap, dd, bodyFields)), + Trailer: GetMap(BuildFieldMap(msg.Trailer.FieldMap, dd, trailerFields)), ReceiveTime: time.Now(), } } -func parseQuoteAck(msg quoteack.QuoteAck) domain.FixMessageJSON { +func parseQuoteRequest(msg quoterequest.QuoteRequest, dd *datadictionary.DataDictionary) domain.FixMessageJSON { quoteReqID, _ := msg.GetQuoteReqID() - body := map[string]interface{}{"QuoteReqID": quoteReqID} - - if v, e := msg.GetQuoteID(); e == nil { - body["QuoteID"] = v - } - if v, e := msg.GetQuoteAckStatus(); e == nil { - body["QuoteAckStatus"] = string(v) - } - if v, e := msg.GetText(); e == nil { - body["Text"] = v - } - - return domain.FixMessageJSON{ - Direction: "IN", - MsgType: "CW", - QuoteReqID: quoteReqID, - Header: extractHeader(msg.Message), - Body: body, - ReceiveTime: time.Now(), - } + return buildFixMessageJSON("IN", "R", quoteReqID, msg.Message, dd) } -func parseQuoteResponse(msg quoteresponse.QuoteResponse) domain.FixMessageJSON { +func parseQuoteAck(msg quoteack.QuoteAck, dd *datadictionary.DataDictionary) domain.FixMessageJSON { quoteReqID, _ := msg.GetQuoteReqID() - body := map[string]interface{}{"QuoteReqID": quoteReqID} - - if v, e := msg.GetQuoteRespID(); e == nil { - body["QuoteRespID"] = v - } - if v, e := msg.GetQuoteRespType(); e == nil { - body["QuoteRespType"] = string(v) - } - if v, e := msg.GetSide(); e == nil { - body["Side"] = string(v) - } - if v, e := msg.GetPrice(); e == nil { - body["Price"] = v.String() - } - if v, e := msg.GetOrderQty(); e == nil { - body["OrderQty"] = v.String() - } - if v, e := msg.GetClOrdID(); e == nil { - body["ClOrdID"] = v - } - - return domain.FixMessageJSON{ - Direction: "IN", - MsgType: "AJ", - QuoteReqID: quoteReqID, - Header: extractHeader(msg.Message), - Body: body, - ReceiveTime: time.Now(), - } + return buildFixMessageJSON("IN", "CW", quoteReqID, msg.Message, dd) } -func parseExecutionReport(msg executionreport.ExecutionReport) domain.FixMessageJSON { +func parseQuoteResponse(msg quoteresponse.QuoteResponse, dd *datadictionary.DataDictionary) domain.FixMessageJSON { + quoteReqID, _ := msg.GetQuoteReqID() + return buildFixMessageJSON("IN", "AJ", quoteReqID, msg.Message, dd) +} + +func parseExecutionReport(msg executionreport.ExecutionReport, dd *datadictionary.DataDictionary) domain.FixMessageJSON { clOrdID, _ := msg.GetClOrdID() - body := map[string]interface{}{"ClOrdID": clOrdID} - - if v, e := msg.GetExecID(); e == nil { - body["ExecID"] = v - } - if v, e := msg.GetOrderID(); e == nil { - body["OrderID"] = v - } - if v, e := msg.GetExecType(); e == nil { - body["ExecType"] = string(v) - } - if v, e := msg.GetOrdStatus(); e == nil { - body["OrdStatus"] = string(v) - } - if v, e := msg.GetListID(); e == nil { - body["ListID"] = v - } - if v, e := msg.GetSide(); e == nil { - body["Side"] = string(v) - } - if v, e := msg.GetSymbol(); e == nil { - body["Symbol"] = v - } - if v, e := msg.GetSecurityID(); e == nil { - body["SecurityID"] = v - } - if v, e := msg.GetCurrency(); e == nil { - body["Currency"] = v - } - if v, e := msg.GetPrice(); e == nil { - body["Price"] = v.String() - } - if v, e := msg.GetLastPx(); e == nil { - body["LastPx"] = v.String() - } - if v, e := msg.GetLastQty(); e == nil { - body["LastQty"] = v.String() - } - if v, e := msg.GetOrderQty(); e == nil { - body["OrderQty"] = v.String() - } - if v, e := msg.GetSettlDate(); e == nil { - body["SettlDate"] = v - } - - return domain.FixMessageJSON{ - Direction: "IN", - MsgType: "8", - QuoteReqID: clOrdID, - Header: extractHeader(msg.Message), - Body: body, - ReceiveTime: time.Now(), - } + return buildFixMessageJSON("IN", "8", clOrdID, msg.Message, dd) } // extractIdentifier extracts the trade identifier from a parsed FIX message. @@ -218,7 +87,7 @@ func extractIdentifier(msg *quickfix.Message) string { return "" } -func buildOutgoingMessageJSON(msgType, quoteReqID string, body map[string]interface{}) domain.FixMessageJSON { +func buildOutgoingMessageJSON(msgType, quoteReqID string, body map[string]any) domain.FixMessageJSON { return domain.FixMessageJSON{ Direction: "OUT", MsgType: msgType, diff --git a/src/domain/persistence.go b/src/domain/persistence.go index dc06d38..e3db254 100644 --- a/src/domain/persistence.go +++ b/src/domain/persistence.go @@ -5,26 +5,20 @@ import "time" // ListTrade es la representacion exportada de un trade de List Trading. type ListTrade struct { - QuoteReqID string - ListID string - Symbol string - SecurityIDSrc string - Currency string - Side string - OrderQty string - SettlDate string - Price string - OwnerTraderID string + QuoteRequest FixMessageJSON `json:"quote_request"` + Quoted bool `json:"quoted"` + Price string `json:"price,omitempty"` } // FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento. type FixMessageJSON struct { - Direction string `json:"direction"` - MsgType string `json:"msg_type"` - QuoteReqID string `json:"quote_req_id"` - Header map[string]interface{} `json:"header"` - Body map[string]interface{} `json:"body"` - ReceiveTime time.Time `json:"receive_time"` + Direction string `json:"direction"` + MsgType string `json:"msg_type"` + QuoteReqID string `json:"quote_req_id"` + Header map[string]any `json:"header"` + Body map[string]any `json:"body"` + Trailer map[string]any `json:"trailer"` + ReceiveTime time.Time `json:"receive_time"` } // TradeMessage es una fila de qfixdpl_messages.