8 Commits

Author SHA1 Message Date
96bf917191 Quote cancel 2026-06-23 10:56:11 -03:00
298e9c39e3 Api key configuracion and quotes endpooint 2026-06-01 13:03:06 -03:00
7cc4a96a03 fix 2026-05-19 15:38:43 -03:00
1676909cbf fixes 2026-05-19 15:31:39 -03:00
d06433e0f5 add log 2026-05-19 15:15:54 -03:00
0f3ac0dd8d sending the messages by seqnum 2026-05-15 17:03:54 -03:00
4270284362 Add endpoint for all messages 2026-05-12 13:27:13 -03:00
6e46fde5d2 fixes 2026-05-11 14:27:27 -03:00
12 changed files with 505 additions and 151 deletions

View File

@ -36,7 +36,11 @@ type Service struct {
AuthorizedServices map[string]AuthorizedService `toml:"AuthorizedServices"`
APIBasePort string
EnableJWTAuth bool // Enable JWT authentication for service-to-service communication
FIX FIXConfig
// ServiceAPIKey is the shared secret that authenticates qbymarouter (and any
// other internal service) when posting to /qfixdpl/v1/quotes and
// /qfixdpl/v1/messages. Must match the DPLAPIKey configured on the caller.
ServiceAPIKey string
FIX FIXConfig
}
type FIXConfig struct {

View File

@ -328,6 +328,41 @@ func (cont *Controller) GetLogs(ctx *gin.Context) {
ctx.JSON(http.StatusOK, logs)
}
// AllMessages godoc
// @Summary List FIX application messages of the day after the caller's last-seen sequence
// @Description Returns today's FIX application messages (no admin: heartbeats/logon/logout/etc.) with MsgSeqNum greater than the caller's last-seen sequence per direction. "In" is the last MsgSeqNum the caller received on the IN side; "Out" is the same for OUT. Pass 0 to receive everything on that side. Sorted by CreatedAt ascending.
// @Tags fix
// @Accept json
// @Produce json
// @Param body body AllMessagesRequest true "API key and last-seen MsgSeqNum per direction"
// @Success 200 {array} domain.Message
// @Failure 400 {object} HTTPError
// @Failure 401 {object} HTTPError
// @Router /qfixdpl/v1/messages [post]
func (cont *Controller) AllMessages(ctx *gin.Context) {
setHeaders(ctx, cont.config)
var req AllMessagesRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return
}
if !cont.checkServiceAPIKey(req.APIKey) {
ctx.JSON(http.StatusUnauthorized, HTTPError{Error: "Not allowed to perform this request"})
return
}
ctx.JSON(http.StatusOK, cont.tradeProvider.GetAllMessages(req.In, req.Out))
}
// checkServiceAPIKey returns true when the provided key matches the configured
// service-to-service shared secret. Empty configured key is always rejected
// to avoid open authentication when misconfigured.
func (cont *Controller) checkServiceAPIKey(key string) bool {
return cont.config.ServiceAPIKey != "" && key == cont.config.ServiceAPIKey
}
// GetPendingQuoteRequests godoc
// @Summary List pending QuoteRequests
// @Description Returns all QuoteRequests received from TW that have not been quoted yet by the dealer
@ -354,12 +389,19 @@ func (cont *Controller) GetPendingQuoteRequests(ctx *gin.Context) {
// @Failure 500 {object} HTTPError
// @Router /qfixdpl/v1/quotes [post]
func (cont *Controller) SendQuote(ctx *gin.Context) {
setHeaders(ctx, cont.config)
var req SendQuoteRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return
}
if !cont.checkServiceAPIKey(req.APIKey) {
ctx.JSON(http.StatusUnauthorized, HTTPError{Error: "Not allowed to perform this request"})
return
}
price, err := decimal.NewFromString(req.Price)
if err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: "invalid price: " + err.Error()})
@ -382,3 +424,46 @@ func (cont *Controller) SendQuote(ctx *gin.Context) {
ctx.JSON(http.StatusOK, Msg{Text: "Quote sent"})
}
// CancelQuote godoc
// @Summary Cancel a Quote for a QuoteRequest
// @Description Builds and sends a QuoteCancel (35=Z) to TW for an existing QuoteRequest
// @Tags fix
// @Accept json
// @Produce json
// @Param body body CancelQuoteRequest true "Quote cancel request"
// @Success 200 {object} Msg
// @Failure 400 {object} HTTPError
// @Failure 401 {object} HTTPError
// @Failure 404 {object} HTTPError
// @Failure 409 {object} HTTPError
// @Failure 500 {object} HTTPError
// @Router /qfixdpl/v1/quotes/cancel [post]
func (cont *Controller) CancelQuote(ctx *gin.Context) {
setHeaders(ctx, cont.config)
var req CancelQuoteRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return
}
if !cont.checkServiceAPIKey(req.APIKey) {
ctx.JSON(http.StatusUnauthorized, HTTPError{Error: "Not allowed to perform this request"})
return
}
if err := cont.tradeProvider.CancelQuote(req.QuoteReqID, req.Text); err != nil {
msg := err.Error()
switch {
case strings.Contains(msg, "not found"):
ctx.JSON(http.StatusNotFound, HTTPError{Error: "quoteReqID not found"})
case strings.Contains(msg, "cannot respond"):
ctx.JSON(http.StatusConflict, HTTPError{Error: "quote request cannot be cancelled"})
default:
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "failed to cancel quote"})
}
return
}
ctx.JSON(http.StatusOK, Msg{Text: "Quote cancel sent"})
}

View File

@ -18,6 +18,19 @@ type Session struct {
}
type SendQuoteRequest struct {
APIKey string `json:"APIKey" binding:"required"`
QuoteReqID string `json:"QuoteReqID" binding:"required"`
Price string `json:"Price" binding:"required" example:"99.6"`
}
type CancelQuoteRequest struct {
APIKey string `json:"APIKey" binding:"required"`
QuoteReqID string `json:"QuoteReqID" binding:"required"`
Text string `json:"Text,omitempty"`
}
type AllMessagesRequest struct {
APIKey string
In int
Out int
}

View File

@ -24,7 +24,12 @@ func SetRoutes(api *API) {
qfixdpl.GET("/trades", cont.GetTrades)
qfixdpl.GET("/trades/:quoteReqID/logs", cont.GetLogs)
qfixdpl.GET("/quote-requests", cont.GetPendingQuoteRequests)
qfixdpl.POST("/quotes", cont.SendQuote)
// services group: API-key auth via body, no session cookie required.
services := v1.Group("/")
services.POST("/messages", cont.AllMessages)
services.POST("/quotes", cont.SendQuote)
services.POST("/quotes/cancel", cont.CancelQuote)
backoffice := qfixdpl.Group("/backoffice")
backoffice.Use(cont.BackOfficeUser)

View File

@ -22,6 +22,8 @@ type TradeProvider interface {
GetTrades() []domain.ListTrade
GetPendingQuoteRequests() []domain.ListTrade
SendQuote(quoteReqID string, price decimal.Decimal) error
CancelQuote(quoteReqID, text string) error
GetAllMessages(inSeq, outSeq int) []domain.Message
}
const RedisMaxIdle = 3000 // In ms
@ -38,6 +40,10 @@ type Config struct {
AuthorizedServices map[string]app.AuthorizedService `toml:"AuthorizedServices"`
Port string
EnableJWTAuth bool
// ServiceAPIKey authenticates internal services (qbymarouter, etc.) calling
// /qfixdpl/v1/quotes and /qfixdpl/v1/messages. Compared against the APIKey
// field in the request body.
ServiceAPIKey string
}
func New(userData app.UserDataProvider, storeInstance *store.Store, tradeProvider TradeProvider, config Config, notify domain.Notifier) *API {

View File

@ -1,12 +1,16 @@
package fix
import (
"fmt"
"log/slog"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/rs/zerolog/log"
uuid "github.com/satori/go.uuid"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/quickfix"
@ -17,9 +21,11 @@ import (
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotecancel"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport"
"quantex.com/qfixdpl/quickfix/gen/tag"
filelog "quantex.com/qfixdpl/quickfix/log/file"
"quantex.com/qfixdpl/quickfix/store/file"
"quantex.com/qfixdpl/src/app"
@ -34,6 +40,8 @@ type listTrade struct {
Price decimal.Decimal
}
const defaultQuoteCancelText = "Quote withdrawn by dealer"
// Manager wraps the QuickFIX initiator and implements domain.FIXSender.
type Manager struct {
initiator *quickfix.Initiator
@ -42,6 +50,8 @@ type Manager struct {
sessions map[string]quickfix.SessionID
tradesMu sync.RWMutex
trades map[string]*listTrade
messagesMu sync.RWMutex
messages []domain.Message
store domain.PersistenceStore
notify domain.Notifier
cfg app.FIXConfig
@ -52,6 +62,7 @@ func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.
return &Manager{
sessions: make(map[string]quickfix.SessionID),
trades: make(map[string]*listTrade),
messages: make([]domain.Message, 0),
store: store,
notify: notify,
cfg: cfg,
@ -84,6 +95,11 @@ func (m *Manager) Start() error {
slog.Error(err.Error())
}
if err := m.loadTodayMessages(); err != nil {
err = tracerr.Errorf("failed to load today messages from DB, starting with empty list: %w", err)
slog.Error(err.Error())
}
f, err := os.Open(m.cfg.SettingsFile)
if err != nil {
err = tracerr.Errorf("error opening FIX settings file %q: %w", m.cfg.SettingsFile, err)
@ -255,16 +271,6 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
Quoted: false,
}
m.tradesMu.Unlock()
// Persist incoming QuoteRequest.
m.persistMessage(quoteReqID, parsed)
// Persist outgoing QuoteStatusReport.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]any{
"QuoteReqID": quoteReqID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
"OwnerTraderID": ownerTraderID,
}))
}
// handleQuoteAck handles an incoming QuoteAck (35=CW).
@ -273,16 +279,10 @@ func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.Sessi
status, _ := msg.GetQuoteAckStatus()
text, _ := msg.GetText()
m.persistMessage(quoteReqID, parseQuoteAck(msg, m.dict))
if status != enum.QuoteAckStatus_ACCEPTED {
err := tracerr.Errorf("handleQuoteAck: quote rejected by TW (quoteReqID=%s, quoteAckStatus=%s, text=%s)", quoteReqID, string(status), text)
slog.Error(err.Error())
m.tradesMu.Lock()
delete(m.trades, quoteReqID)
m.tradesMu.Unlock()
return
}
@ -318,22 +318,9 @@ func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID
}
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
// Persist incoming QuoteResponse.
m.persistMessage(quoteReqID, parseQuoteResponse(msg, m.dict))
// Persist outgoing ACK.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]any{
"QuoteReqID": quoteReqID,
"QuoteRespID": quoteRespID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
}))
// _TRDSUMM is the final message — clean up the trade.
if isTrdSumm {
slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID)
m.tradesMu.Lock()
delete(m.trades, quoteReqID)
m.tradesMu.Unlock()
}
}
@ -372,25 +359,10 @@ func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, ses
slog.Info("Trade summary received from TW, cleaning up",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
m.tradesMu.Lock()
delete(m.trades, clOrdID)
m.tradesMu.Unlock()
case execType == enum.ExecType_TRADE:
slog.Info("Trade result received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
}
// Persist incoming ExecutionReport.
m.persistMessage(clOrdID, parseExecutionReport(msg, m.dict))
// Persist outgoing ExecutionAck.
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]any{
"OrderID": orderID,
"ExecID": execID,
"ClOrdID": clOrdID,
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
}))
}
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
@ -428,9 +400,7 @@ func toDomainListTrade(t *listTrade) domain.ListTrade {
out := domain.ListTrade{
QuoteRequest: t.QuoteRequest,
}
if !t.Price.IsZero() {
out.Price = t.Price.String()
}
return out
}
@ -444,12 +414,6 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
slog.Error(err.Error())
return err
}
// if t.Quoted {
// m.tradesMu.Unlock()
// err := tracerr.Errorf("SendQuote: quote already sent for quoteReqID %s", quoteReqID)
// slog.Error(err.Error())
// return err
// }
sessionID := t.SessionID
if sessionID == (quickfix.SessionID{}) {
@ -530,16 +494,91 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol, "price", price.String())
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]any{
"QuoteReqID": quoteReqID,
"QuoteID": quoteID,
"Symbol": symbol,
"Side": string(side),
"Price": price.String(),
"OrderQty": orderQty.String(),
"Currency": currency,
"SettlDate": settlDate,
}))
return nil
}
// CancelQuote builds and sends a QuoteCancel (35=Z) for an existing QuoteRequest.
func (m *Manager) CancelQuote(quoteReqID, text string) error {
m.tradesMu.RLock()
t, ok := m.trades[quoteReqID]
if !ok {
m.tradesMu.RUnlock()
err := tracerr.Errorf("CancelQuote: quoteReqID %s not found", quoteReqID)
slog.Error(err.Error())
return err
}
sessionID := t.SessionID
if sessionID == (quickfix.SessionID{}) {
sessionID = m.anyActiveSessionID()
if sessionID == (quickfix.SessionID{}) {
m.tradesMu.RUnlock()
err := tracerr.Errorf("CancelQuote: no active FIX session for quoteReqID %s", quoteReqID)
slog.Error(err.Error())
return err
}
}
quoteRequestBody := t.QuoteRequest.Body
m.tradesMu.RUnlock()
ownerTraderID := quoteRequestString(quoteRequestBody, "OwnerTraderID")
canRespond, hasCanRespond, rawCanRespond := quoteRequestCanRespond(quoteRequestBody)
canRespondLogValue := rawCanRespond
if canRespondLogValue == "" {
canRespondLogValue = "missing"
}
if !hasCanRespond {
slog.Warn("CancelQuote: CanRespond missing or unparseable, sending QuoteCancel anyway",
"quoteReqID", quoteReqID,
"canRespond", canRespondLogValue,
)
} else if !canRespond {
err := tracerr.Errorf("CancelQuote: quoteReqID %s cannot respond", quoteReqID)
slog.Error(err.Error(),
"quoteReqID", quoteReqID,
"canRespond", canRespondLogValue,
)
return err
} else {
slog.Info("CancelQuote: CanRespond allows QuoteCancel",
"quoteReqID", quoteReqID,
"canRespond", canRespondLogValue,
)
}
quoteID := uuid.NewV4().String()
text = strings.TrimSpace(text)
if text == "" {
text = defaultQuoteCancelText
}
qc := quotecancel.New(
field.NewQuoteReqID(quoteReqID),
field.NewQuoteCancelType(enum.QuoteCancelType_CANCEL_SPECIFIED_SINGLE_QUOTE),
)
qc.SetQuoteID(quoteID)
qc.SetTransactTime(time.Now().UTC())
qc.SetText(text)
if ownerTraderID != "" {
qc.SetOwnerTraderID(ownerTraderID)
}
if sendErr := quickfix.SendToTarget(qc, sessionID); sendErr != nil {
sendErr = tracerr.Errorf("CancelQuote: failed to send quote cancel (quoteReqID=%s, quoteID=%s): %w", quoteReqID, quoteID, sendErr)
slog.Error(sendErr.Error())
return sendErr
}
slog.Info("QuoteCancel sent",
"quoteReqID", quoteReqID,
"quoteID", quoteID,
"canRespond", canRespondLogValue,
"ownerTraderID", ownerTraderID,
"text", text,
)
return nil
}
@ -549,11 +588,22 @@ func firstGroup(body map[string]any, name string) map[string]any {
if body == nil {
return nil
}
groups, ok := body[name].([]map[string]any)
if !ok || len(groups) == 0 {
return nil
switch groups := body[name].(type) {
case []map[string]any:
if len(groups) == 0 {
return nil
}
return groups[0]
case []any:
if len(groups) == 0 {
return nil
}
group, _ := groups[0].(map[string]any)
return group
}
return groups[0]
return nil
}
// getString reads a string value from a body map, tolerating nil maps and missing keys.
@ -567,6 +617,53 @@ func getString(body map[string]any, name string) string {
return ""
}
func quoteRequestString(body map[string]any, name string) string {
if v := getString(body, name); v != "" {
return v
}
return getString(firstGroup(body, "NoRelatedSym"), name)
}
func quoteRequestCanRespond(body map[string]any) (bool, bool, string) {
if value, ok := body["CanRespond"]; ok {
return parseCanRespondValue(value)
}
if value, ok := firstGroup(body, "NoRelatedSym")["CanRespond"]; ok {
return parseCanRespondValue(value)
}
return false, false, ""
}
func parseCanRespondValue(value any) (bool, bool, string) {
switch v := value.(type) {
case bool:
if v {
return true, true, "true"
}
return false, true, "false"
case string:
normalized := strings.ToUpper(strings.TrimSpace(v))
switch normalized {
case "Y", "YES", "TRUE", "1":
return true, true, v
case "N", "NO", "FALSE", "0":
return false, true, v
default:
return false, false, v
}
case int:
return v != 0, true, fmt.Sprint(v)
case int64:
return v != 0, true, fmt.Sprint(v)
case float64:
return v != 0, true, fmt.Sprint(v)
default:
return false, false, ""
}
}
// getDecimal reads a decimal value from a body map. Numeric FIX types come through
// as strings (e.g. "10000"); INT-typed counts may be int. Both are accepted.
func getDecimal(body map[string]any, name string) decimal.Decimal {
@ -592,7 +689,11 @@ func (m *Manager) anyActiveSessionID() quickfix.SessionID {
return quickfix.SessionID{}
}
// handleRawMessage persists raw FIX message strings to the logs table.
// handleRawMessage is the single ingest path for every application-level FIX message
// (admin messages — Logon, Logout, Heartbeat, TestRequest, ResendRequest, SequenceReset —
// go through ToAdmin/FromAdmin and never reach this callback). It persists the raw
// envelope to the logs table, builds a structured Message and saves it to
// the messages table, and appends to the in-memory list.
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
quoteReqID := extractIdentifier(msg)
@ -603,17 +704,77 @@ func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
err = tracerr.Errorf("failed to persist raw log: %w", err)
slog.Error(err.Error())
}
}
// persistMessage saves a structured FIX message to the messages table.
func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSON) {
if err := m.store.SaveMessage(domain.TradeMessage{
QuoteReqID: quoteReqID,
JMessage: fixJSON,
}); err != nil {
err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", fixJSON.MsgType, quoteReqID, err)
msgTypeBytes, _ := msg.Header.GetBytes(tag.MsgType)
msgType := string(msgTypeBytes)
senderCompID, msgSeqNum, sendingTime := extractHeaderMeta(msg)
fixJSON := buildFixMessageJSON(direction, msgType, quoteReqID, msg, m.dict)
stored := domain.Message{
ID: uuid.NewV4().String(),
SenderCompID: senderCompID,
MsgSeqNum: msgSeqNum,
SendingTime: sendingTime,
CreatedAt: time.Now(),
JMessage: fixJSON,
}
if err := m.store.SaveMessage(stored); err != nil {
err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", msgType, quoteReqID, err)
slog.Error(err.Error())
}
m.messagesMu.Lock()
m.messages = append(m.messages, stored)
m.messagesMu.Unlock()
}
// GetAllMessages returns today's FIX application messages with MsgSeqNum greater than
// the caller's last-seen sequence per direction (inSeq for IN, outSeq for OUT), sorted
// ascending by CreatedAt. Passing 0 for either cursor returns all messages on that side.
func (m *Manager) GetAllMessages(inSeq, outSeq int) []domain.Message {
m.messagesMu.RLock()
log.Info().Msgf("request received, inSeq: %d, outSeq: %d", inSeq, outSeq)
filtered := make([]domain.Message, 0, len(m.messages))
for _, msg := range m.messages {
switch msg.JMessage.Direction {
case "IN":
if msg.MsgSeqNum > inSeq {
filtered = append(filtered, msg)
}
case "OUT":
if msg.MsgSeqNum > outSeq {
filtered = append(filtered, msg)
}
}
}
m.messagesMu.RUnlock()
sort.Slice(filtered, func(i, j int) bool { return filtered[i].CreatedAt.Before(filtered[j].CreatedAt) })
log.Info().Msgf("messages sent: %d", len(filtered))
return filtered
}
// loadTodayMessages rebuilds the in-memory message list from today's rows in the DB.
// Must be called before the FIX initiator starts so live ingest doesn't race with replay.
func (m *Manager) loadTodayMessages() error {
messages, err := m.store.GetTodayMessages()
if err != nil {
return err
}
m.messagesMu.Lock()
m.messages = messages
m.messagesMu.Unlock()
slog.Info("today messages loaded", "count", len(messages))
return nil
}
// loadActiveTrades reconstructs active trades from today's messages in the database.
@ -626,9 +787,11 @@ func (m *Manager) loadActiveTrades() error {
activeTrades := make(map[string]*listTrade)
for _, msg := range messages {
quoteReqID := msg.JMessage.QuoteReqID
switch msg.JMessage.MsgType {
case "R": // QuoteRequest -> trade is born
if !strings.HasPrefix(msg.QuoteReqID, "LST_") {
if !strings.HasPrefix(quoteReqID, "LST_") {
continue
}
@ -640,31 +803,15 @@ func (m *Manager) loadActiveTrades() error {
continue
}
activeTrades[msg.QuoteReqID] = &listTrade{
activeTrades[quoteReqID] = &listTrade{
QuoteRequest: msg.JMessage,
}
case "S": // Outgoing Quote — dealer has already quoted this trade
if t, ok := activeTrades[msg.QuoteReqID]; ok {
if t, ok := activeTrades[quoteReqID]; ok {
t.Quoted = true
t.Price = getDecimal(msg.JMessage.Body, "Price")
}
case "CW": // QuoteAck — if rejected, trade is dead
if getString(msg.JMessage.Body, "QuoteAckStatus") != string(enum.QuoteAckStatus_ACCEPTED) {
delete(activeTrades, msg.QuoteReqID)
}
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
if strings.HasSuffix(getString(msg.JMessage.Body, "QuoteRespID"), "_TRDSUMM") {
delete(activeTrades, msg.QuoteReqID)
}
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)
body := msg.JMessage.Body
if strings.Contains(getString(body, "ExecID"), "_TRDSUMM") {
delete(activeTrades, getString(body, "ClOrdID"))
}
}
}

View File

@ -0,0 +1,82 @@
package fix
import "testing"
func TestQuoteRequestCanRespond(t *testing.T) {
t.Parallel()
tests := []struct {
name string
body map[string]any
want bool
wantOK bool
wantRaw string
}{
{
name: "top-level bool true",
body: map[string]any{"CanRespond": true},
want: true,
wantOK: true,
wantRaw: "true",
},
{
name: "group string yes",
body: map[string]any{
"NoRelatedSym": []map[string]any{{"CanRespond": "Y"}},
},
want: true,
wantOK: true,
wantRaw: "Y",
},
{
name: "json decoded group string yes",
body: map[string]any{
"NoRelatedSym": []any{map[string]any{"CanRespond": "Y"}},
},
want: true,
wantOK: true,
wantRaw: "Y",
},
{
name: "group false",
body: map[string]any{
"NoRelatedSym": []map[string]any{{"CanRespond": false}},
},
want: false,
wantOK: true,
wantRaw: "false",
},
{
name: "missing",
body: map[string]any{},
want: false,
wantOK: false,
wantRaw: "",
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
got, gotOK, gotRaw := quoteRequestCanRespond(tc.body)
if got != tc.want || gotOK != tc.wantOK || gotRaw != tc.wantRaw {
t.Fatalf("quoteRequestCanRespond() = (%v, %v, %q), want (%v, %v, %q)",
got, gotOK, gotRaw, tc.want, tc.wantOK, tc.wantRaw)
}
})
}
}
func TestQuoteRequestStringFallsBackToNoRelatedSym(t *testing.T) {
t.Parallel()
body := map[string]any{
"NoRelatedSym": []map[string]any{{"OwnerTraderID": "dealer-1"}},
}
if got := quoteRequestString(body, "OwnerTraderID"); got != "dealer-1" {
t.Fatalf("quoteRequestString() = %q, want dealer-1", got)
}
}

View File

@ -5,10 +5,7 @@ import (
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/datadictionary"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/tag"
"quantex.com/qfixdpl/src/domain"
)
@ -50,21 +47,6 @@ func parseQuoteRequest(msg quoterequest.QuoteRequest, dd *datadictionary.DataDic
return buildFixMessageJSON("IN", "R", quoteReqID, msg.Message, dd)
}
func parseQuoteAck(msg quoteack.QuoteAck, dd *datadictionary.DataDictionary) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
return buildFixMessageJSON("IN", "CW", quoteReqID, msg.Message, dd)
}
func parseQuoteResponse(msg quoteresponse.QuoteResponse, dd *datadictionary.DataDictionary) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
return buildFixMessageJSON("IN", "AJ", quoteReqID, msg.Message, dd)
}
func parseExecutionReport(msg executionreport.ExecutionReport, dd *datadictionary.DataDictionary) domain.FixMessageJSON {
clOrdID, _ := msg.GetClOrdID()
return buildFixMessageJSON("IN", "8", clOrdID, msg.Message, dd)
}
// extractIdentifier extracts the trade identifier from a parsed FIX message.
// For ExecutionReport (8) and ExecutionAck (BN), uses ClOrdID (tag 11).
// For all other message types, uses QuoteReqID (tag 131).
@ -87,12 +69,17 @@ func extractIdentifier(msg *quickfix.Message) string {
return ""
}
func buildOutgoingMessageJSON(msgType, quoteReqID string, body map[string]any) domain.FixMessageJSON {
return domain.FixMessageJSON{
Direction: "OUT",
MsgType: msgType,
QuoteReqID: quoteReqID,
Body: body,
ReceiveTime: time.Now(),
// extractHeaderMeta reads SenderCompID (49), MsgSeqNum (34) and SendingTime (52)
// from a quickfix.Message header. Returns zero values when a field is absent.
func extractHeaderMeta(msg *quickfix.Message) (senderCompID string, msgSeqNum int, sendingTime time.Time) {
if s, err := msg.Header.GetString(tag.SenderCompID); err == nil {
senderCompID = s
}
if n, err := msg.Header.GetInt(tag.MsgSeqNum); err == nil {
msgSeqNum = n
}
if t, err := msg.Header.GetTime(tag.SendingTime); err == nil {
sendingTime = t
}
return
}

View File

@ -1,11 +1,12 @@
CREATE TABLE IF NOT EXISTS qfixdpl_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
quote_req_id TEXT NOT NULL,
j_message JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
id UUID PRIMARY KEY,
sender_comp_id TEXT NOT NULL,
msg_seq_num BIGINT NOT NULL,
j_message JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id);
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at);
CREATE INDEX IF NOT EXISTS idx_messages_session ON qfixdpl_messages(sender_comp_id, msg_seq_num);
CREATE TABLE IF NOT EXISTS qfixdpl_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

View File

@ -2,6 +2,7 @@ package store
import (
"encoding/json"
"strconv"
"strings"
"time"
@ -9,15 +10,20 @@ import (
"quantex.com/qfixdpl/src/domain"
)
func (p *Store) SaveMessage(msg domain.TradeMessage) error {
func (p *Store) SaveMessage(msg domain.Message) error {
jsonBytes, err := json.Marshal(msg.JMessage)
if err != nil {
return tracerr.Errorf("error marshaling j_message: %w", err)
}
_, err = p.db.Exec(
"INSERT INTO qfixdpl_messages (quote_req_id, j_message) VALUES ($1, $2)",
msg.QuoteReqID, string(jsonBytes),
`INSERT INTO qfixdpl_messages (id, sender_comp_id, msg_seq_num, j_message, created_at)
VALUES ($1, $2, $3, $4, $5)`,
msg.ID,
msg.SenderCompID,
strconv.Itoa(msg.MsgSeqNum),
string(jsonBytes),
msg.CreatedAt.UTC().Format(time.RFC3339Nano),
)
if err != nil {
return tracerr.Errorf("error inserting message: %w", err)
@ -41,25 +47,29 @@ func (p *Store) SaveLog(entry domain.LogEntry) error {
return nil
}
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
func (p *Store) GetTodayMessages() ([]domain.Message, error) {
rows, err := p.db.Query(
"SELECT id, quote_req_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC",
`SELECT id, sender_comp_id, msg_seq_num, j_message, created_at
FROM qfixdpl_messages
WHERE created_at >= current_date
ORDER BY created_at ASC`,
)
if err != nil {
return nil, tracerr.Errorf("error querying today messages: %w", err)
}
defer rows.Close()
var messages []domain.TradeMessage
var messages []domain.Message
for rows.Next() {
var (
id, quoteReqID string
jMessageRaw []byte
createdAt time.Time
id, senderCompID string
msgSeqNum int
jMessageRaw []byte
createdAt time.Time
)
if err := rows.Scan(&id, &quoteReqID, &jMessageRaw, &createdAt); err != nil {
if err := rows.Scan(&id, &senderCompID, &msgSeqNum, &jMessageRaw, &createdAt); err != nil {
return nil, tracerr.Errorf("error scanning message row: %w", err)
}
@ -68,11 +78,22 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
}
messages = append(messages, domain.TradeMessage{
ID: id,
QuoteReqID: quoteReqID,
JMessage: jMessage,
CreatedAt: createdAt,
sendingTime, _ := jMessage.Header["SendingTime"].(time.Time)
if sendingTime.IsZero() {
if s, ok := jMessage.Header["SendingTime"].(string); ok {
if t, parseErr := time.Parse(time.RFC3339Nano, s); parseErr == nil {
sendingTime = t
}
}
}
messages = append(messages, domain.Message{
ID: id,
SenderCompID: senderCompID,
MsgSeqNum: msgSeqNum,
SendingTime: sendingTime,
CreatedAt: createdAt,
JMessage: jMessage,
})
}

View File

@ -51,6 +51,7 @@ func Runner(cfg app.Config) error {
External: cfg.External,
AuthorizedServices: cfg.AuthorizedServices,
EnableJWTAuth: cfg.EnableJWTAuth,
ServiceAPIKey: cfg.ServiceAPIKey,
}
api := rest.New(userData, appStore, fixManager, apiConfig, notify)

View File

@ -6,7 +6,6 @@ import "time"
// ListTrade es la representacion exportada de un trade de List Trading.
type ListTrade struct {
QuoteRequest FixMessageJSON `json:"quote_request"`
Price string `json:"price,omitempty"`
}
// FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento.
@ -20,12 +19,15 @@ type FixMessageJSON struct {
ReceiveTime time.Time `json:"receive_time"`
}
// TradeMessage es una fila de qfixdpl_messages.
type TradeMessage struct {
ID string `json:"id"`
QuoteReqID string `json:"quote_req_id"`
JMessage FixMessageJSON `json:"j_message"`
CreatedAt time.Time `json:"created_at"`
// Message es una fila de qfixdpl_messages, con la metadata del header FIX hoisted
// para que los consumidores puedan ordenar/filtrar sin parsear el JSON.
type Message struct {
ID string `json:"id"`
SenderCompID string `json:"sender_comp_id"`
MsgSeqNum int `json:"msg_seq_num"`
SendingTime time.Time `json:"sending_time"`
CreatedAt time.Time `json:"created_at"`
JMessage FixMessageJSON `json:"j_message"`
}
// LogEntry es el DTO para insertar/actualizar un log crudo en qfixdpl_logs.
@ -41,8 +43,8 @@ type Logs struct {
// PersistenceStore define la interfaz de persistencia.
type PersistenceStore interface {
SaveMessage(msg TradeMessage) error
SaveMessage(msg Message) error
SaveLog(entry LogEntry) error
GetTodayMessages() ([]TradeMessage, error)
GetTodayMessages() ([]Message, error)
GetLogsByQuoteReqID(quoteReqID string) (Logs, error)
}