3 Commits

Author SHA1 Message Date
36b841fc66 pascal case 2026-05-06 14:11:50 -03:00
68238d309a adding endpoints 2026-05-06 11:56:12 -03:00
15a60bac92 handling errors 2026-05-05 15:34:01 -03:00
13 changed files with 483 additions and 1142 deletions

View File

@ -62,7 +62,7 @@ linux-build: check-env swag # Build a linux version for prod environment. Set e=
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e) env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
deploy: # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo; s=serverName; i=instance; e.g. make deploy e=dev s=nonprodFix i=dpl deploy: # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo; s=serverName; i=instance; e.g. make deploy e=dev s=nonprodFix i=dpl
make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/$(i)/ make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/dpl/
fmt: download-versions # Apply the Go formatter to the code fmt: download-versions # Apply the Go formatter to the code
cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt); cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt);

View File

@ -4,12 +4,14 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"strings"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/sasha-s/go-deadlock" "github.com/sasha-s/go-deadlock"
uuid "github.com/satori/go.uuid" uuid "github.com/satori/go.uuid"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/src/app" "quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/app/version" "quantex.com/qfixdpl/src/app/version"
@ -293,7 +295,7 @@ func allowed(origin string, config Config) bool {
// GetTrades godoc // GetTrades godoc
// @Summary List active trades // @Summary List active trades
// @Description Returns only active List Trading trades // @Description Returns all active List Trading trades
// @Tags fix // @Tags fix
// @Produce json // @Produce json
// @Success 200 {array} domain.ListTrade // @Success 200 {array} domain.ListTrade
@ -303,18 +305,6 @@ func (cont *Controller) GetTrades(ctx *gin.Context) {
ctx.JSON(http.StatusOK, trades) ctx.JSON(http.StatusOK, trades)
} }
// GetAllTrades godoc
// @Summary List all trades
// @Description Returns all List Trading trades (active, rejected, completed)
// @Tags fix
// @Produce json
// @Success 200 {array} domain.ListTrade
// @Router /qfixdpl/v1/trades/all [get]
func (cont *Controller) GetAllTrades(ctx *gin.Context) {
trades := cont.tradeProvider.GetAllTrades()
ctx.JSON(http.StatusOK, trades)
}
// GetLogs godoc // GetLogs godoc
// @Summary Get raw FIX logs for a trade // @Summary Get raw FIX logs for a trade
// @Description Returns raw FIX message logs for a given QuoteReqID // @Description Returns raw FIX message logs for a given QuoteReqID
@ -328,7 +318,8 @@ func (cont *Controller) GetLogs(ctx *gin.Context) {
logs, err := cont.store.GetLogsByQuoteReqID(quoteReqID) logs, err := cont.store.GetLogsByQuoteReqID(quoteReqID)
if err != nil { if err != nil {
slog.Error("GetLogs: error fetching logs", "quoteReqID", quoteReqID, "error", err) err = tracerr.Errorf("GetLogs: error fetching logs (quoteReqID=%s): %w", quoteReqID, err)
slog.Error(err.Error())
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"}) ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"})
return return
@ -337,25 +328,57 @@ func (cont *Controller) GetLogs(ctx *gin.Context) {
ctx.JSON(http.StatusOK, logs) ctx.JSON(http.StatusOK, logs)
} }
// GetFullTradeLog godoc // GetPendingQuoteRequests godoc
// @Summary Get full trade lifecycle log // @Summary List pending QuoteRequests
// @Description Returns DPL and Post-Trade raw FIX logs for a given trade (by QuoteReqID) // @Description Returns all QuoteRequests received from TW that have not been quoted yet by the dealer
// @Tags fix // @Tags fix
// @Produce json // @Produce json
// @Param quoteReqID path string true "QuoteReqID" // @Success 200 {array} domain.ListTrade
// @Success 200 {object} domain.FullTradeLog // @Router /qfixdpl/v1/quote-requests [get]
// @Router /qfixdpl/v1/trades/{quoteReqID}/full-log [get] func (cont *Controller) GetPendingQuoteRequests(ctx *gin.Context) {
func (cont *Controller) GetFullTradeLog(ctx *gin.Context) { pending := cont.tradeProvider.GetPendingQuoteRequests()
quoteReqID := ctx.Param("quoteReqID") ctx.JSON(http.StatusOK, pending)
}
fullLog, err := cont.store.GetFullTradeLog(quoteReqID)
if err != nil {
slog.Error("GetFullTradeLog: error fetching full trade log", "quoteReqID", quoteReqID, "error", err)
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching full trade log"})
// SendQuote godoc
// @Summary Send a Quote for a pending QuoteRequest
// @Description Builds and sends a Quote (35=S) to TW for an existing QuoteRequest at the given price
// @Tags fix
// @Accept json
// @Produce json
// @Param body body SendQuoteRequest true "Quote to send"
// @Success 200 {object} Msg
// @Failure 400 {object} HTTPError
// @Failure 404 {object} HTTPError
// @Failure 409 {object} HTTPError
// @Failure 500 {object} HTTPError
// @Router /qfixdpl/v1/quotes [post]
func (cont *Controller) SendQuote(ctx *gin.Context) {
var req SendQuoteRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return return
} }
ctx.JSON(http.StatusOK, fullLog) price, err := decimal.NewFromString(req.Price)
if err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: "invalid price: " + err.Error()})
return
}
if err := cont.tradeProvider.SendQuote(req.QuoteReqID, price); err != nil {
msg := err.Error()
switch {
case strings.Contains(msg, "not found"):
ctx.JSON(http.StatusNotFound, HTTPError{Error: "quoteReqID not found"})
case strings.Contains(msg, "already sent"):
ctx.JSON(http.StatusConflict, HTTPError{Error: "quote already sent for this quoteReqID"})
default:
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "failed to send quote"})
}
return
}
ctx.JSON(http.StatusOK, Msg{Text: "Quote sent"})
} }

View File

@ -1,6 +1,5 @@
package rest package rest
type HTTPError struct { type HTTPError struct {
Error string Error string
} }
@ -18,3 +17,7 @@ type Session struct {
Email string Email string
} }
type SendQuoteRequest struct {
QuoteReqID string `json:"QuoteReqID" binding:"required"`
Price string `json:"Price" binding:"required" example:"99.6"`
}

View File

@ -22,8 +22,9 @@ func SetRoutes(api *API) {
qfixdpl.Use(cont.AuthRequired) qfixdpl.Use(cont.AuthRequired)
qfixdpl.GET("/health", cont.HealthCheck) qfixdpl.GET("/health", cont.HealthCheck)
qfixdpl.GET("/trades", cont.GetTrades) qfixdpl.GET("/trades", cont.GetTrades)
qfixdpl.GET("/trades/all", cont.GetAllTrades) qfixdpl.GET("/trades/:quoteReqID/logs", cont.GetLogs)
qfixdpl.GET("/trades/:quoteReqID/full-log", cont.GetFullTradeLog) qfixdpl.GET("/quote-requests", cont.GetPendingQuoteRequests)
qfixdpl.POST("/quotes", cont.SendQuote)
backoffice := qfixdpl.Group("/backoffice") backoffice := qfixdpl.Group("/backoffice")
backoffice.Use(cont.BackOfficeUser) backoffice.Use(cont.BackOfficeUser)

View File

@ -7,6 +7,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/src/app" "quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/app/version" "quantex.com/qfixdpl/src/app/version"
@ -19,7 +20,8 @@ import (
// TradeProvider exposes trade data from the FIX manager. // TradeProvider exposes trade data from the FIX manager.
type TradeProvider interface { type TradeProvider interface {
GetTrades() []domain.ListTrade GetTrades() []domain.ListTrade
GetAllTrades() []domain.ListTrade GetPendingQuoteRequests() []domain.ListTrade
SendQuote(quoteReqID string, price decimal.Decimal) error
} }
const RedisMaxIdle = 3000 // In ms const RedisMaxIdle = 3000 // In ms

View File

@ -7,7 +7,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/rs/zerolog/log"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
"quantex.com/qfixdpl/quickfix" "quantex.com/qfixdpl/quickfix"
@ -29,7 +28,6 @@ import (
type listTrade struct { type listTrade struct {
QuoteReqID string QuoteReqID string
TradeID string
ListID string ListID string
Symbol string Symbol string
SecurityIDSrc enum.SecurityIDSource SecurityIDSrc enum.SecurityIDSource
@ -40,7 +38,7 @@ type listTrade struct {
Price decimal.Decimal Price decimal.Decimal
OwnerTraderID string OwnerTraderID string
SessionID quickfix.SessionID SessionID quickfix.SessionID
Status domain.TradeStatus Quoted bool
} }
// Manager wraps the QuickFIX initiator and implements domain.FIXSender. // Manager wraps the QuickFIX initiator and implements domain.FIXSender.
@ -78,14 +76,15 @@ func (m *Manager) Start() error {
fixApp.onRawMessage = m.handleRawMessage fixApp.onRawMessage = m.handleRawMessage
m.app = fixApp m.app = fixApp
if err := m.loadTrades(); err != nil { if err := m.loadActiveTrades(); err != nil {
slog.Error("failed to load active trades from DB, starting with empty state", "error", err) err = tracerr.Errorf("failed to load active trades from DB, starting with empty state: %w", err)
slog.Error(err.Error())
} }
f, err := os.Open(m.cfg.SettingsFile) f, err := os.Open(m.cfg.SettingsFile)
if err != nil { if err != nil {
err = tracerr.Errorf("error opening FIX settings file %q: %s", m.cfg.SettingsFile, err) err = tracerr.Errorf("error opening FIX settings file %q: %w", m.cfg.SettingsFile, err)
log.Error().Msg(err.Error()) slog.Error(err.Error())
return err return err
} }
@ -93,8 +92,8 @@ func (m *Manager) Start() error {
settings, err := quickfix.ParseSettings(f) settings, err := quickfix.ParseSettings(f)
if err != nil { if err != nil {
err = tracerr.Errorf("error parsing FIX settings: %s", err) err = tracerr.Errorf("error parsing FIX settings: %w", err)
log.Error().Msg(err.Error()) slog.Error(err.Error())
return err return err
} }
@ -102,16 +101,16 @@ func (m *Manager) Start() error {
storeFactory := file.NewStoreFactory(settings) storeFactory := file.NewStoreFactory(settings)
logFactory, err := filelog.NewLogFactory(settings) logFactory, err := filelog.NewLogFactory(settings)
if err != nil { if err != nil {
err = tracerr.Errorf("error creating file log factory: %s", err) err = tracerr.Errorf("error creating file log factory: %w", err)
log.Error().Msg(err.Error()) slog.Error(err.Error())
return err return err
} }
initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory) initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory)
if err != nil { if err != nil {
err = tracerr.Errorf("error creating FIX initiator: %s", err) err = tracerr.Errorf("error creating FIX initiator: %w", err)
log.Error().Msg(err.Error()) slog.Error(err.Error())
return err return err
} }
@ -119,8 +118,8 @@ func (m *Manager) Start() error {
m.initiator = initiator m.initiator = initiator
if err = m.initiator.Start(); err != nil { if err = m.initiator.Start(); err != nil {
err = tracerr.Errorf("error starting FIX initiator: %s", err) err = tracerr.Errorf("error starting FIX initiator: %w", err)
log.Error().Msg(err.Error()) slog.Error(err.Error())
return err return err
} }
@ -141,18 +140,6 @@ func (m *Manager) onLogon(sessionID quickfix.SessionID) {
m.sessionsMu.Lock() m.sessionsMu.Lock()
m.sessions[sessionID.String()] = sessionID m.sessions[sessionID.String()] = sessionID
m.sessionsMu.Unlock() m.sessionsMu.Unlock()
// Assign the new session to all recovered trades that have no session yet.
// This covers the case where the service was restarted mid-trade: loadTrades()
// reconstructs the trade data but cannot recover the SessionID from the DB.
// Since this is a single-session initiator, all active trades belong to this session.
m.tradesMu.Lock()
for _, trade := range m.trades {
if trade.Status == domain.TradeStatusActive && trade.SessionID == (quickfix.SessionID{}) {
trade.SessionID = sessionID
}
}
m.tradesMu.Unlock()
} }
func (m *Manager) onLogout(sessionID quickfix.SessionID) { func (m *Manager) onLogout(sessionID quickfix.SessionID) {
@ -208,7 +195,9 @@ func (m *Manager) sendExecutionAck(orderID, clOrdID, execID string, sessionID qu
func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) { func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) {
quoteReqID, err := msg.GetQuoteReqID() quoteReqID, err := msg.GetQuoteReqID()
if err != nil { if err != nil {
slog.Error("handleQuoteRequest: missing QuoteReqID", "error", err.Error()) err := tracerr.Errorf("handleQuoteRequest: missing QuoteReqID: %w", err)
slog.Error(err.Error())
return return
} }
@ -249,21 +238,258 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
return return
} }
// Step 1: Send QuoteStatusReport (35=AI) to acknowledge the inquiry. // Send QuoteStatusReport (35=AI) to acknowledge the inquiry.
if ackErr := m.sendQuoteStatusReport(quoteReqID, ownerTraderID, sessionID); ackErr != nil { if ackErr := m.sendQuoteStatusReport(quoteReqID, ownerTraderID, sessionID); ackErr != nil {
slog.Error("handleQuoteRequest: failed to send QuoteStatusReport", "quoteReqID", quoteReqID, "error", ackErr.Error()) ackErr = tracerr.Errorf("handleQuoteRequest: failed to send QuoteStatusReport (quoteReqID=%s): %w", quoteReqID, ackErr)
slog.Error(ackErr.Error())
return return
} }
slog.Info("QuoteStatusReport sent", "quoteReqID", quoteReqID) slog.Info("QuoteStatusReport sent", "quoteReqID", quoteReqID)
// Step 2: Build and send Quote (35=S) with price.
price := decimal.NewFromFloat(99.6)
sIDSource := enum.SecurityIDSource_ISIN_NUMBER sIDSource := enum.SecurityIDSource_ISIN_NUMBER
if secIDSource == enum.SecurityIDSource_CUSIP { if secIDSource == enum.SecurityIDSource_CUSIP {
sIDSource = enum.SecurityIDSource_CUSIP sIDSource = enum.SecurityIDSource_CUSIP
} }
// Store trade state as pending; Quote (35=S) is sent later via REST endpoint.
m.tradesMu.Lock()
m.trades[quoteReqID] = &listTrade{
QuoteReqID: quoteReqID,
ListID: listID,
Symbol: symbol,
SecurityIDSrc: sIDSource,
Currency: currency,
Side: side,
OrderQty: orderQty,
SettlDate: settlDate,
OwnerTraderID: ownerTraderID,
SessionID: sessionID,
Quoted: false,
}
m.tradesMu.Unlock()
// Persist structured message (outside mutex).
m.persistMessage(quoteReqID, parseQuoteRequest(msg))
// Persist outgoing QuoteStatusReport.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
"OwnerTraderID": ownerTraderID,
}))
}
// handleQuoteAck handles an incoming QuoteAck (35=CW).
func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
status, _ := msg.GetQuoteAckStatus()
text, _ := msg.GetText()
m.persistMessage(quoteReqID, parseQuoteAck(msg))
if status != enum.QuoteAckStatus_ACCEPTED {
err := tracerr.Errorf("handleQuoteAck: quote rejected by TW (quoteReqID=%s, quoteAckStatus=%s, text=%s)", quoteReqID, string(status), text)
slog.Error(err.Error())
m.tradesMu.Lock()
delete(m.trades, quoteReqID)
m.tradesMu.Unlock()
return
}
slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID)
}
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ).
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes.
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
quoteRespID, _ := msg.GetQuoteRespID()
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ")
isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND")
isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM")
isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND")
if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd {
slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID)
return
}
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future.
// Always send ACK regardless of whether the trade is in our map.
// TW will keep retrying until it receives an ACK.
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil {
ackErr = tracerr.Errorf("handleQuoteResponse: failed to send ACK (quoteReqID=%s, quoteRespID=%s): %w", quoteReqID, quoteRespID, ackErr)
slog.Error(ackErr.Error())
return
}
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
// Persist incoming QuoteResponse.
m.persistMessage(quoteReqID, parseQuoteResponse(msg))
// Persist outgoing ACK.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteRespID": quoteRespID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
}))
// _TRDSUMM is the final message — clean up the trade.
if isTrdSumm {
slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID)
m.tradesMu.Lock()
delete(m.trades, quoteReqID)
m.tradesMu.Unlock()
}
}
// handleExecutionReport handles an incoming ExecutionReport (35=8).
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does.
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
clOrdID, _ := msg.GetClOrdID()
execType, _ := msg.GetExecType()
ordStatus, _ := msg.GetOrdStatus()
listID, _ := msg.GetListID()
slog.Info("handleExecutionReport received",
"execID", execID, "orderID", orderID, "clOrdID", clOrdID,
"execType", string(execType), "ordStatus", string(ordStatus), "listID", listID,
)
// Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW.
if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil {
ackErr = tracerr.Errorf("handleExecutionReport: failed to send ExecutionAck (execID=%s): %w", execID, ackErr)
slog.Error(ackErr.Error())
} else {
slog.Info("ExecutionAck sent", "execID", execID)
}
switch {
case strings.Contains(execID, "_LISTEND"):
slog.Info("List ended (due-in closed), awaiting trade result from TW",
"execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDEND"):
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDSUMM"):
slog.Info("Trade summary received from TW, cleaning up",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
m.tradesMu.Lock()
delete(m.trades, clOrdID)
m.tradesMu.Unlock()
case execType == enum.ExecType_TRADE:
slog.Info("Trade result received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
}
// Persist incoming ExecutionReport.
m.persistMessage(clOrdID, parseExecutionReport(msg))
// Persist outgoing ExecutionAck.
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{
"OrderID": orderID,
"ExecID": execID,
"ClOrdID": clOrdID,
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
}))
}
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
// Logged in application.go, no further action needed.
}
// GetTrades returns a snapshot of all active trades.
func (m *Manager) GetTrades() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
trades := make([]domain.ListTrade, 0, len(m.trades))
for _, t := range m.trades {
trades = append(trades, toDomainListTrade(t))
}
return trades
}
// GetPendingQuoteRequests returns trades that have received a QuoteRequest but not yet been quoted by the dealer.
func (m *Manager) GetPendingQuoteRequests() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
pending := make([]domain.ListTrade, 0)
for _, t := range m.trades {
if !t.Quoted {
pending = append(pending, toDomainListTrade(t))
}
}
return pending
}
func toDomainListTrade(t *listTrade) domain.ListTrade {
return domain.ListTrade{
QuoteReqID: t.QuoteReqID,
ListID: t.ListID,
Symbol: t.Symbol,
SecurityIDSrc: string(t.SecurityIDSrc),
Currency: t.Currency,
Side: string(t.Side),
OrderQty: t.OrderQty.String(),
SettlDate: t.SettlDate,
Price: t.Price.String(),
OwnerTraderID: t.OwnerTraderID,
}
}
// SendQuote builds and sends a Quote (35=S) for an existing pending QuoteRequest at the given price.
func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
m.tradesMu.Lock()
t, ok := m.trades[quoteReqID]
if !ok {
m.tradesMu.Unlock()
err := tracerr.Errorf("SendQuote: quoteReqID %s not found", quoteReqID)
slog.Error(err.Error())
return err
}
if t.Quoted {
m.tradesMu.Unlock()
err := tracerr.Errorf("SendQuote: quote already sent for quoteReqID %s", quoteReqID)
slog.Error(err.Error())
return err
}
sessionID := t.SessionID
if sessionID == (quickfix.SessionID{}) {
sessionID = m.anyActiveSessionID()
if sessionID == (quickfix.SessionID{}) {
m.tradesMu.Unlock()
err := tracerr.Errorf("SendQuote: no active FIX session for quoteReqID %s", quoteReqID)
slog.Error(err.Error())
return err
}
}
symbol := t.Symbol
sIDSource := t.SecurityIDSrc
currency := t.Currency
side := t.Side
orderQty := t.OrderQty
settlDate := t.SettlDate
ownerTraderID := t.OwnerTraderID
m.tradesMu.Unlock()
quoteID := quoteReqID quoteID := quoteReqID
q := quote.New( q := quote.New(
field.NewQuoteID(quoteID), field.NewQuoteID(quoteID),
@ -305,41 +531,20 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
} }
if sendErr := quickfix.SendToTarget(q, sessionID); sendErr != nil { if sendErr := quickfix.SendToTarget(q, sessionID); sendErr != nil {
slog.Error("handleQuoteRequest: failed to send quote", "quoteReqID", quoteReqID, "error", sendErr.Error()) sendErr = tracerr.Errorf("SendQuote: failed to send quote (quoteReqID=%s): %w", quoteReqID, sendErr)
return slog.Error(sendErr.Error())
return sendErr
} }
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol)
// Store trade state for subsequent steps.
m.tradesMu.Lock() m.tradesMu.Lock()
m.trades[quoteReqID] = &listTrade{ if t, ok := m.trades[quoteReqID]; ok {
QuoteReqID: quoteReqID, t.Price = price
ListID: listID, t.Quoted = true
Symbol: symbol,
SecurityIDSrc: sIDSource,
Currency: currency,
Side: side,
OrderQty: orderQty,
SettlDate: settlDate,
Price: price,
OwnerTraderID: ownerTraderID,
SessionID: sessionID,
Status: domain.TradeStatusActive,
} }
m.tradesMu.Unlock() m.tradesMu.Unlock()
// Persist structured message (outside mutex). slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol, "price", price.String())
m.persistMessage(quoteReqID, parseQuoteRequest(msg))
// Persist outgoing QuoteStatusReport.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
"OwnerTraderID": ownerTraderID,
}))
// Persist outgoing Quote.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]interface{}{ m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID, "QuoteReqID": quoteReqID,
"QuoteID": quoteID, "QuoteID": quoteID,
@ -350,206 +555,17 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
"Currency": currency, "Currency": currency,
"SettlDate": settlDate, "SettlDate": settlDate,
})) }))
return nil
} }
// handleQuoteAck handles an incoming QuoteAck (35=CW). func (m *Manager) anyActiveSessionID() quickfix.SessionID {
func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) { m.sessionsMu.RLock()
quoteReqID, _ := msg.GetQuoteReqID() defer m.sessionsMu.RUnlock()
status, _ := msg.GetQuoteAckStatus() for _, s := range m.sessions {
text, _ := msg.GetText() return s
m.persistMessage(quoteReqID, parseQuoteAck(msg))
// QuoteAckStatus only has two defined values in TW DPL:
// 1 = Accepted — quote delivered to client.
// 2 = Rejected — format error, late quote, viewer busy, or race condition.
if status == enum.QuoteAckStatus_REJECTED {
slog.Error("handleQuoteAck: quote rejected by TW", "quoteReqID", quoteReqID, "quoteAckStatus", string(status), "text", text)
m.tradesMu.Lock()
if t, ok := m.trades[quoteReqID]; ok {
t.Status = domain.TradeStatusRejected
}
m.tradesMu.Unlock()
return
}
slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID)
}
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ).
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes.
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
quoteRespID, _ := msg.GetQuoteRespID()
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ")
isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND")
isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM")
isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND")
if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd {
slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID)
return
}
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future.
// Always send ACK regardless of whether the trade is in our map.
// TW will keep retrying until it receives an ACK.
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil {
slog.Error("handleQuoteResponse: failed to send ACK", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID, "error", ackErr.Error())
return
}
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
// Persist incoming QuoteResponse.
m.persistMessage(quoteReqID, parseQuoteResponse(msg))
// Persist outgoing ACK.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteRespID": quoteRespID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
}))
// _TRDSUMM is the final message — mark trade as completed.
if isTrdSumm {
slog.Info("Trade summary received, marking completed", "quoteReqID", quoteReqID)
m.tradesMu.Lock()
if t, ok := m.trades[quoteReqID]; ok {
t.Status = domain.TradeStatusCompleted
}
m.tradesMu.Unlock()
}
}
// handleExecutionReport handles an incoming ExecutionReport (35=8).
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does.
// In TW DPL, ClOrdID (Tag 11) always equals the original QuoteReqID (Tag 131),
// so we use clOrdID directly as the map lookup key.
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
clOrdID, _ := msg.GetClOrdID()
execType, _ := msg.GetExecType()
ordStatus, _ := msg.GetOrdStatus()
listID, _ := msg.GetListID()
slog.Info("handleExecutionReport received",
"execID", execID, "orderID", orderID, "clOrdID", clOrdID,
"execType", string(execType), "ordStatus", string(ordStatus), "listID", listID,
)
// Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW.
if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil {
slog.Error("handleExecutionReport: failed to send ExecutionAck", "execID", execID, "error", ackErr.Error())
} else {
slog.Info("ExecutionAck sent", "execID", execID)
}
switch {
case strings.Contains(execID, "_LISTEND"):
slog.Info("List ended (due-in closed), awaiting trade result from TW",
"execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDEND"):
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDSUMM"):
slog.Info("Trade summary received from TW, marking completed",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
m.tradesMu.Lock()
if t, ok := m.trades[clOrdID]; ok {
t.Status = domain.TradeStatusCompleted
}
m.tradesMu.Unlock()
case execType == enum.ExecType_TRADE:
slog.Info("Trade result received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
}
// Persist incoming ExecutionReport.
m.persistMessage(clOrdID, parseExecutionReport(msg))
// Persist outgoing ExecutionAck.
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{
"OrderID": orderID,
"ExecID": execID,
"ClOrdID": clOrdID,
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
}))
// If the ExecutionReport carries a TradeID, persist it in qfixdpl_logs for cross-service correlation.
tradeID, _ := msg.GetTradeID()
if tradeID != "" {
m.tradesMu.Lock()
if t, ok := m.trades[clOrdID]; ok {
t.TradeID = tradeID
}
m.tradesMu.Unlock()
if err := m.store.UpdateLogTradeID(clOrdID, tradeID); err != nil {
slog.Error("handleExecutionReport: failed to update log trade_id", "clOrdID", clOrdID, "tradeID", tradeID, "error", err)
}
}
}
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
// Logged in application.go, no further action needed.
}
// GetTrades returns a snapshot of only active trades.
func (m *Manager) GetTrades() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
trades := make([]domain.ListTrade, 0, len(m.trades))
for _, t := range m.trades {
if t.Status != domain.TradeStatusActive {
continue
}
trades = append(trades, toListTrade(t))
}
return trades
}
// GetAllTrades returns a snapshot of all trades (active, rejected, completed).
func (m *Manager) GetAllTrades() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
trades := make([]domain.ListTrade, 0, len(m.trades))
for _, t := range m.trades {
trades = append(trades, toListTrade(t))
}
return trades
}
func toListTrade(t *listTrade) domain.ListTrade {
return domain.ListTrade{
QuoteReqID: t.QuoteReqID,
TradeID: t.TradeID,
ListID: t.ListID,
Symbol: t.Symbol,
SecurityIDSrc: string(t.SecurityIDSrc),
Currency: t.Currency,
Side: string(t.Side),
OrderQty: t.OrderQty.String(),
SettlDate: t.SettlDate,
Price: t.Price.String(),
OwnerTraderID: t.OwnerTraderID,
Status: t.Status,
} }
return quickfix.SessionID{}
} }
// handleRawMessage persists raw FIX message strings to the logs table. // handleRawMessage persists raw FIX message strings to the logs table.
@ -560,7 +576,8 @@ func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
QuoteReqID: quoteReqID, QuoteReqID: quoteReqID,
RawMsg: "[" + direction + "] " + msg.String(), RawMsg: "[" + direction + "] " + msg.String(),
}); err != nil { }); err != nil {
slog.Error("failed to persist raw log", "error", err) err = tracerr.Errorf("failed to persist raw log: %w", err)
slog.Error(err.Error())
} }
} }
@ -570,18 +587,19 @@ func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSO
QuoteReqID: quoteReqID, QuoteReqID: quoteReqID,
JMessage: fixJSON, JMessage: fixJSON,
}); err != nil { }); err != nil {
slog.Error("failed to persist message", "msgType", fixJSON.MsgType, "quoteReqID", quoteReqID, "error", err) err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", fixJSON.MsgType, quoteReqID, err)
slog.Error(err.Error())
} }
} }
// loadTrades reconstructs all trades and their states from today's messages in the database. // loadActiveTrades reconstructs active trades from today's messages in the database.
func (m *Manager) loadTrades() error { func (m *Manager) loadActiveTrades() error {
messages, err := m.store.GetTodayMessages() messages, err := m.store.GetTodayMessages()
if err != nil { if err != nil {
return err return err
} }
trades := make(map[string]*listTrade) activeTrades := make(map[string]*listTrade)
for _, msg := range messages { for _, msg := range messages {
switch msg.JMessage.MsgType { switch msg.JMessage.MsgType {
@ -605,13 +623,16 @@ func (m *Manager) loadTrades() error {
trade := &listTrade{ trade := &listTrade{
QuoteReqID: msg.QuoteReqID, QuoteReqID: msg.QuoteReqID,
ListID: listID, ListID: listID,
Status: domain.TradeStatusActive,
} }
if v, ok := body["SecurityID"].(string); ok { if v, ok := body["SecurityID"].(string); ok {
trade.Symbol = v trade.Symbol = v
} }
if v, ok := body["SecurityIDSource"].(string); ok {
trade.SecurityIDSrc = enum.SecurityIDSource(v)
}
if v, ok := body["Currency"].(string); ok { if v, ok := body["Currency"].(string); ok {
trade.Currency = v trade.Currency = v
} }
@ -632,16 +653,22 @@ func (m *Manager) loadTrades() error {
trade.OwnerTraderID = v trade.OwnerTraderID = v
} }
trades[msg.QuoteReqID] = trade activeTrades[msg.QuoteReqID] = trade
case "CW": // QuoteAck — only status "2" (Rejected) marks the trade as rejected case "S": // Outgoing Quote — dealer has already quoted this trade
if t, ok := activeTrades[msg.QuoteReqID]; ok {
t.Quoted = true
if v, ok := msg.JMessage.Body["Price"].(string); ok {
t.Price, _ = decimal.NewFromString(v)
}
}
case "CW": // QuoteAck — if rejected, trade is dead
body := msg.JMessage.Body body := msg.JMessage.Body
quoteAckStatus, _ := body["QuoteAckStatus"].(string) quoteAckStatus, _ := body["QuoteAckStatus"].(string)
if quoteAckStatus == string(enum.QuoteAckStatus_REJECTED) { if quoteAckStatus != string(enum.QuoteAckStatus_ACCEPTED) {
if t, ok := trades[msg.QuoteReqID]; ok { delete(activeTrades, msg.QuoteReqID)
t.Status = domain.TradeStatusRejected
}
} }
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6) case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
@ -649,9 +676,7 @@ func (m *Manager) loadTrades() error {
quoteRespID, _ := body["QuoteRespID"].(string) quoteRespID, _ := body["QuoteRespID"].(string)
if strings.HasSuffix(quoteRespID, "_TRDSUMM") { if strings.HasSuffix(quoteRespID, "_TRDSUMM") {
if t, ok := trades[msg.QuoteReqID]; ok { delete(activeTrades, msg.QuoteReqID)
t.Status = domain.TradeStatusCompleted
}
} }
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4) case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)
@ -659,29 +684,14 @@ func (m *Manager) loadTrades() error {
execID, _ := body["ExecID"].(string) execID, _ := body["ExecID"].(string)
clOrdID, _ := body["ClOrdID"].(string) clOrdID, _ := body["ClOrdID"].(string)
if tid, ok := body["TradeID"].(string); ok && tid != "" {
if t, ok := trades[clOrdID]; ok {
t.TradeID = tid
}
}
if strings.Contains(execID, "_TRDSUMM") { if strings.Contains(execID, "_TRDSUMM") {
if t, ok := trades[clOrdID]; ok { delete(activeTrades, clOrdID)
t.Status = domain.TradeStatusCompleted
}
} }
} }
} }
active := 0 m.trades = activeTrades
for _, t := range trades { slog.Info("recovery completed", "activeTrades", len(activeTrades))
if t.Status == domain.TradeStatusActive {
active++
}
}
m.trades = trades
slog.Info("recovery completed", "totalTrades", len(trades), "activeTrades", active)
return nil return nil
} }

View File

@ -1,647 +0,0 @@
package fix
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/domain"
)
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
// newTestManager builds a Manager with the given store without calling Start().
func newTestManager(store domain.PersistenceStore) *Manager {
notify := &MockNotifier{}
return NewManager(app.FIXConfig{}, store, notify)
}
// makeMsg builds a TradeMessage with the given quoteReqID, msgType, and body.
func makeMsg(quoteReqID, msgType string, body map[string]interface{}) domain.TradeMessage {
return domain.TradeMessage{
ID: "test-id-" + quoteReqID + "-" + msgType,
QuoteReqID: quoteReqID,
JMessage: domain.FixMessageJSON{
MsgType: msgType,
QuoteReqID: quoteReqID,
Body: body,
ReceiveTime: time.Now(),
},
CreatedAt: time.Now(),
}
}
// makeQuoteRequest builds a "R" (QuoteRequest) TradeMessage.
func makeQuoteRequest(quoteReqID, listID, nt string, extras map[string]interface{}) domain.TradeMessage {
body := map[string]interface{}{
"NegotiationType": nt,
"ListID": listID,
}
for k, v := range extras {
body[k] = v
}
return makeMsg(quoteReqID, "R", body)
}
// makeQuoteAck builds a "CW" (QuoteAck) TradeMessage.
func makeQuoteAck(quoteReqID, status string) domain.TradeMessage {
return makeMsg(quoteReqID, "CW", map[string]interface{}{
"QuoteAckStatus": status,
})
}
// makeQuoteResponse builds an "AJ" (QuoteResponse) TradeMessage.
func makeQuoteResponse(quoteReqID, quoteRespID string) domain.TradeMessage {
return makeMsg(quoteReqID, "AJ", map[string]interface{}{
"QuoteRespID": quoteRespID,
})
}
// makeExecutionReport builds an "8" (ExecutionReport) TradeMessage.
// clOrdID maps to body["ClOrdID"]; it is also set as TradeMessage.QuoteReqID
// to mirror how persistMessage works in handleExecutionReport.
func makeExecutionReport(clOrdID, execID string) domain.TradeMessage {
return makeMsg(clOrdID, "8", map[string]interface{}{
"ClOrdID": clOrdID,
"ExecID": execID,
})
}
// makeOutgoing builds an outgoing message (AI, S, BN) for a given quoteReqID.
func makeOutgoing(quoteReqID, msgType string) domain.TradeMessage {
return makeMsg(quoteReqID, msgType, map[string]interface{}{})
}
// ---------------------------------------------------------------------------
// Group 1 — DB vacia
// ---------------------------------------------------------------------------
func TestLoadTrades_EmptyDB(t *testing.T) {
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.NotNil(t, m.trades)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 2 — Interrupcion despues de QuoteRequest
// ---------------------------------------------------------------------------
func TestLoadTrades_SingleR_CreatesOneTrade(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", map[string]interface{}{
"SecurityID": "US1234567890",
"Currency": "USD",
"Side": "1",
"OrderQty": "1000000",
"SettlDate": "20260320",
"OwnerTraderID": "trader1",
})
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
require.Len(t, m.trades, 1)
trade := m.trades["LST_ABC123"]
require.NotNil(t, trade)
assert.Equal(t, "LST_ABC123", trade.QuoteReqID)
assert.Equal(t, "LIST_1", trade.ListID)
assert.Equal(t, "US1234567890", trade.Symbol)
assert.Equal(t, "USD", trade.Currency)
assert.Equal(t, "1", string(trade.Side))
assert.Equal(t, "20260320", trade.SettlDate)
assert.Equal(t, "trader1", trade.OwnerTraderID)
assert.Equal(t, domain.TradeStatusActive, trade.Status)
store.AssertExpectations(t)
}
func TestLoadTrades_R_WithOutgoingMsgs_IgnoresAI_S(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
ai := makeOutgoing("LST_ABC123", "AI")
s := makeOutgoing("LST_ABC123", "S")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, ai, s}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_R_NonLSTPrefix_Ignored(t *testing.T) {
r := makeQuoteRequest("TRD_ABC123", "LIST_1", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
func TestLoadTrades_R_NonRFQ_NegotiationType_Ignored(t *testing.T) {
r := makeQuoteRequest("LST_X", "LIST_1", "DEALER", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
func TestLoadTrades_R_EmptyListID_Ignored(t *testing.T) {
r := makeQuoteRequest("LST_X", "", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
func TestLoadTrades_R_MissingOptionalFields_TradeCreated(t *testing.T) {
r := makeQuoteRequest("LST_MIN", "LIST_MIN", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
require.Len(t, m.trades, 1)
trade := m.trades["LST_MIN"]
require.NotNil(t, trade)
assert.Equal(t, "LST_MIN", trade.QuoteReqID)
assert.Equal(t, "LIST_MIN", trade.ListID)
assert.Equal(t, "", trade.Symbol)
assert.Equal(t, "", trade.Currency)
assert.Equal(t, "", trade.SettlDate)
assert.Equal(t, "", trade.OwnerTraderID)
assert.Equal(t, domain.TradeStatusActive, trade.Status)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 3 — Interrupcion despues de QuoteAck
// ---------------------------------------------------------------------------
func TestLoadTrades_CW_Accepted_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1") // ACCEPTED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_CW_Rejected_TradeMarkedRejected(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "2") // REJECTED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusRejected, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
// QuoteAckStatus "0" is RECEIVED_NOT_YET_PROCESSED — not a rejection.
// Only status "2" (Rejected) should mark the trade as rejected.
func TestLoadTrades_CW_ReceivedNotYetProcessed_TradeStaysActive(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "0") // RECEIVED_NOT_YET_PROCESSED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_CW_WithoutPriorR_NoCrash(t *testing.T) {
cw := makeQuoteAck("LST_ORPHAN", "2")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{cw}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 4 — Interrupcion despues de QuoteResponse
// ---------------------------------------------------------------------------
func TestLoadTrades_AJ_TRDREQ_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1")
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDREQ")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_AJ_TRDEND_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1")
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDEND")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_AJ_LISTEND_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1")
aj := makeQuoteResponse("LST_Q1", "LST_Q1_LISTEND")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_AJ_TRDSUMM_TradeMarkedCompleted(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1")
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_AJ_TRDSUMM_WithoutPriorR_NoCrash(t *testing.T) {
aj := makeQuoteResponse("LST_ORPHAN", "LST_ORPHAN_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 5 — Interrupcion despues de ExecutionReport
// ---------------------------------------------------------------------------
func TestLoadTrades_ExecReport_LISTEND_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_ABC123", "1")
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
exec := makeExecutionReport("LST_ABC123", "LST_ABC123_LISTEND")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, exec}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_ExecReport_TRDEND_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_ABC123", "1")
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
execListEnd := makeExecutionReport("LST_ABC123", "LST_ABC123_LISTEND")
execTrdEnd := makeExecutionReport("LST_ABC123", "LST_ABC123_TRDEND")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, execListEnd, execTrdEnd}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_ExecReport_TRDSUMM_TradeMarkedCompleted(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_ABC123", "1")
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
exec := makeExecutionReport("LST_ABC123", "LST_ABC123_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, exec}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_ABC123"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_ExecReport_TRDSUMM_WithoutPriorR_NoCrash(t *testing.T) {
exec := makeExecutionReport("LST_ORPHAN", "LST_ORPHAN_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{exec}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 6 — Multiples trades en paralelo
// ---------------------------------------------------------------------------
func TestLoadTrades_TwoTrades_BothActive(t *testing.T) {
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
cw1 := makeQuoteAck("LST_TRADE1", "1")
cw2 := makeQuoteAck("LST_TRADE2", "1")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, cw1, cw2}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 2)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE1"].Status)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE2"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_TwoTrades_OneCompleted_OneActive(t *testing.T) {
// TRADE1: fully completed via flow 8.4 (_TRDSUMM execution report)
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
cw1 := makeQuoteAck("LST_TRADE1", "1")
aj1 := makeQuoteResponse("LST_TRADE1", "LST_TRADE1_TRDREQ")
exec1 := makeExecutionReport("LST_TRADE1", "LST_TRADE1_TRDSUMM")
// TRADE2: still active
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
cw2 := makeQuoteAck("LST_TRADE2", "1")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return(
[]domain.TradeMessage{r1, cw1, aj1, exec1, r2, cw2}, nil,
)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 2)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE1"].Status)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE2"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_TwoTrades_BothCompleted(t *testing.T) {
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
cw1 := makeQuoteAck("LST_TRADE1", "1")
exec1 := makeExecutionReport("LST_TRADE1", "LST_TRADE1_TRDSUMM")
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
cw2 := makeQuoteAck("LST_TRADE2", "1")
exec2 := makeExecutionReport("LST_TRADE2", "LST_TRADE2_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return(
[]domain.TradeMessage{r1, cw1, exec1, r2, cw2, exec2}, nil,
)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 2)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE1"].Status)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE2"].Status)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 7 — SessionID se asigna en onLogon (solo a trades activos)
// ---------------------------------------------------------------------------
func TestLoadTrades_SessionID_IsZeroAfterRecovery(t *testing.T) {
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
require.Len(t, m.trades, 1)
trade := m.trades["LST_X"]
require.NotNil(t, trade)
assert.Equal(t, quickfix.SessionID{}, trade.SessionID)
store.AssertExpectations(t)
}
func TestOnLogon_AssignsSessionToRecoveredActiveTrades(t *testing.T) {
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
require.NoError(t, m.loadTrades())
sessionID := quickfix.SessionID{
BeginString: "FIXT.1.1",
SenderCompID: "QFIXDPL",
TargetCompID: "TRADEWEB",
}
m.onLogon(sessionID)
trade := m.trades["LST_X"]
require.NotNil(t, trade)
assert.Equal(t, sessionID, trade.SessionID)
}
func TestOnLogon_DoesNotAssignSessionToCompletedTrades(t *testing.T) {
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
exec := makeExecutionReport("LST_X", "LST_X_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, exec}, nil)
m := newTestManager(store)
require.NoError(t, m.loadTrades())
sessionID := quickfix.SessionID{
BeginString: "FIXT.1.1",
SenderCompID: "QFIXDPL",
TargetCompID: "TRADEWEB",
}
m.onLogon(sessionID)
trade := m.trades["LST_X"]
require.NotNil(t, trade)
assert.Equal(t, quickfix.SessionID{}, trade.SessionID, "completed trades should not get session assigned")
}
// ---------------------------------------------------------------------------
// Group 8 — GetTrades filtra por active, GetAllTrades devuelve todos
// ---------------------------------------------------------------------------
func TestGetTrades_ReturnsOnlyActive(t *testing.T) {
r1 := makeQuoteRequest("LST_ACTIVE", "LIST_1", "RFQ", nil)
r2 := makeQuoteRequest("LST_DONE", "LIST_2", "RFQ", nil)
exec := makeExecutionReport("LST_DONE", "LST_DONE_TRDSUMM")
r3 := makeQuoteRequest("LST_REJ", "LIST_3", "RFQ", nil)
cw := makeQuoteAck("LST_REJ", "2") // REJECTED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, exec, r3, cw}, nil)
m := newTestManager(store)
require.NoError(t, m.loadTrades())
trades := m.GetTrades()
assert.Len(t, trades, 1)
assert.Equal(t, "LST_ACTIVE", trades[0].QuoteReqID)
assert.Equal(t, domain.TradeStatusActive, trades[0].Status)
}
func TestGetAllTrades_ReturnsAll(t *testing.T) {
r1 := makeQuoteRequest("LST_ACTIVE", "LIST_1", "RFQ", nil)
r2 := makeQuoteRequest("LST_DONE", "LIST_2", "RFQ", nil)
exec := makeExecutionReport("LST_DONE", "LST_DONE_TRDSUMM")
r3 := makeQuoteRequest("LST_REJ", "LIST_3", "RFQ", nil)
cw := makeQuoteAck("LST_REJ", "2") // REJECTED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, exec, r3, cw}, nil)
m := newTestManager(store)
require.NoError(t, m.loadTrades())
trades := m.GetAllTrades()
assert.Len(t, trades, 3)
statusMap := map[string]domain.TradeStatus{}
for _, tr := range trades {
statusMap[tr.QuoteReqID] = tr.Status
}
assert.Equal(t, domain.TradeStatusActive, statusMap["LST_ACTIVE"])
assert.Equal(t, domain.TradeStatusCompleted, statusMap["LST_DONE"])
assert.Equal(t, domain.TradeStatusRejected, statusMap["LST_REJ"])
}
// ---------------------------------------------------------------------------
// Group 9 — Error en store
// ---------------------------------------------------------------------------
func TestLoadTrades_StoreError_ReturnsError_MapEmpty(t *testing.T) {
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage(nil), errors.New("db connection failed"))
m := newTestManager(store)
err := m.loadTrades()
assert.Error(t, err)
assert.NotNil(t, m.trades)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}

View File

@ -1,56 +0,0 @@
package fix
import (
"sync"
"github.com/stretchr/testify/mock"
"quantex.com/qfixdpl/src/domain"
)
// MockPersistenceStore is a testify mock implementing domain.PersistenceStore.
type MockPersistenceStore struct {
mock.Mock
}
func (m *MockPersistenceStore) SaveMessage(msg domain.TradeMessage) error {
args := m.Called(msg)
return args.Error(0)
}
func (m *MockPersistenceStore) SaveLog(entry domain.LogEntry) error {
args := m.Called(entry)
return args.Error(0)
}
func (m *MockPersistenceStore) GetTodayMessages() ([]domain.TradeMessage, error) {
args := m.Called()
msgs, _ := args.Get(0).([]domain.TradeMessage)
return msgs, args.Error(1)
}
func (m *MockPersistenceStore) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
args := m.Called(quoteReqID)
logs, _ := args.Get(0).(domain.Logs)
return logs, args.Error(1)
}
func (m *MockPersistenceStore) UpdateLogTradeID(quoteReqID, tradeID string) error {
args := m.Called(quoteReqID, tradeID)
return args.Error(0)
}
func (m *MockPersistenceStore) GetFullTradeLog(quoteReqID string) (domain.FullTradeLog, error) {
args := m.Called(quoteReqID)
log, _ := args.Get(0).(domain.FullTradeLog)
return log, args.Error(1)
}
// MockNotifier is a testify mock implementing domain.Notifier.
type MockNotifier struct {
mock.Mock
}
func (m *MockNotifier) SendMsg(chat domain.MessageChannel, text string, status domain.MessageStatus, wg *sync.WaitGroup) {
m.Called(chat, text, status, wg)
}

View File

@ -185,9 +185,6 @@ func parseExecutionReport(msg executionreport.ExecutionReport) domain.FixMessage
if v, e := msg.GetSettlDate(); e == nil { if v, e := msg.GetSettlDate(); e == nil {
body["SettlDate"] = v body["SettlDate"] = v
} }
if v, e := msg.GetTradeID(); e == nil {
body["TradeID"] = v
}
return domain.FixMessageJSON{ return domain.FixMessageJSON{
Direction: "IN", Direction: "IN",

104
src/client/fix/protocol.txt Normal file
View File

@ -0,0 +1,104 @@
Step 1
Direction: ← QuoteRequest (R)
Comentary: Tradeweb sends trade information to dealer.
FIX Message: 8=FIXT.1.1 9=869 35=R 34=2 49=TRADEWEB 52=20160401-16:53:19.992 56=TW1_CORI_TEST_12345_DLRDPL 131=LST_20160401_TW1_CORI_NY302485.18_1 146=1 55=AMXLMM 2.375 09/08/16 48=02364WBC8 22=1 460=12 167=CORP 762=REGCORIPRC 541=20160908 225=20110908 470=MX 223=2.375 106=AMERICA MOVIL SAB DE CV 54=2 38=1000000 64=20160406 15=USD 6110=Comms 60=20160401-16:53:19 662=99.98046875 22570=0.76 663=1 699=912828UR9 761=1 423=6 44=-999999 5023=0.001000 66=NY302485.18 6847=1 75=20160401 464=Y 20086=1 20074=Y 20075=Y 20077=LatAm Comms 20078=pddealer 20079=60 20081=60 20090=60 20072=60 20098=60 5745=1 20073=RFQ 20076=Y 20156=N 20130=2000000000 20138=JPM,MER 20175=20120308 2115=0 22630=0 20265=LatAm 453=3 448=emack 447=C 452=3 802=3 523=Dev Test 803=2 523=NY 803=25 523=USA 803=4000 448=Tradeweb 447=C 452=1 802=1 523=Tradeweb0001 803=4002 448=DTCC 447=C 452=4 5114=2 5113=1 20169=A2 5113=0 20169=A- 10=121
Step 2
Direction: QuoteStatusReport (AI) →
Comentary: Dealer acknowledges trade.
FIX Message: 8=FIXT.1.1 9=198 35=AI 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=2 52=20160401-16:53:19.988 131=LST_20160401_TW1_CORI_NY302485.18_1 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:53:19.988 297=0 10=106
Step 3
Direction: Quote (S) →
Comentary: Dealer sends quote.
FIX Message: 8=FIXT.1.1 9=231 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=3 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 6153=emackdlr 44=.99 423=6 60=20160401-16:53:19.990 10=247
Step 4
Direction: ← QuoteAck (CW)
Comentary: Tradeweb acknowledges quote.
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=3 49=TRADEWEB 52=20160401-16:53:25.102 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:25 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=154
Step 5
Direction: Quote (S) →
Comentary: Dealer sends quote.
FIX Message: 8=FIXT.1.1 9=239 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=4 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 20087=5 6153=emackdlr 44=.97 423=6 60=20160401-16:53:19.990 10=114
Step 6
Direction: ← QuoteAck (CW)
Comentary: Tradeweb acknowledges quote.
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=4 49=TRADEWEB 52=20160401-16:53:30.055 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:30 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=154
Step 7
Direction: Quote (S) →
Comentary: Dealer sends quote.
FIX Message: 8=FIXT.1.1 9=239 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=6 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 20087=5 6153=emackdlr 44=.98 423=6 60=20160401-16:53:19.990 10=117
Step 8
Direction: ← QuoteAck (CW)
Commentary: Tradeweb acknowledges quote.
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=5 49=TRADEWEB 52=20160401-16:53:35.071 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:35 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=163
Step 9
Direction: ← ExecutionReport (8)
Commentary: Tradeweb notifies dealer that the list trading (Due In time) has ended. ExecType=A
FIX Message: 8=FIXT.1.1 9=289 35=8 34=7 49=TRADEWEB 52=20160401-16:54:19.463 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125419.463 37=LST_20160401_TW1_CORI_NY302485.18_1 39=D 55=[N/A] 60=20160401-16:54:19 75=20160401 150=I 151=0 20086=1 10=073
Step 10
Direction: ExecutionAck (BN) →
Commentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=255 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=8 52=20160401-16:54:19.448 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125419.448 55=[N/A] 60=20160401-16:54:19.448 10=119
Step 11
Direction: ← QuoteResponse (AJ)
Commentary: Customer lifts after good-for time expires. Tradeweb informs dealer customer lift.
FIX Message: 8=FIXT.1.1 9=431 35=AJ 34=10 49=TRADEWEB 52=20160401-16:55:32.855 56=TW1_CORI_TEST_12345_DLRDPL 11=LST_20160401_TW1_CORI_NY302485.18_1 22=1 38=1000000 44=0.98 48=02364WBC8 54=2 55=[N/A] 60=20160401-16:55:32 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 423=6 662=99.98046875 693=LST_20160401_TW1_CORI_NY302485.18_1_TRDREQ 694=1 2115=1 20074=Y 20075=N 20076=N 20079=60 20082=60 20156=N 22570=0.760289080299 22630=0 10=183
Step 12
Direction: QuoteStatusReport (AI) →
Commentary: Dealer acknowledges customers trade acceptance.
FIX Message: 8=FIXT.1.1 9=206 35=AI 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=12 52=20160401-16:55:32.849 693=LST_20160401_TW1_CORI_NY302485.18_1_TRDREQ 131=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:55:32.849 297=0 10=189
Step 13
Direction: ← ExecutionReport (8)
Commentary: Tradeweb notifies the dealer that list has ended
FIX Message: incoming8=FIXT.1.1 9=290 35=8 34=11 49=TRADEWEB 52=20160401-16:55:32.855 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125532.855 37=LST_20160401_TW1_CORI_NY302485.18_1 39=A 55=[N/A] 60=20160401-16:55:32 75=20160401 150=A 151=0 20086=1 10=095
Step 14
Direction: ExecutionAck (BN) →
Commentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=256 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=13 52=20160401-16:55:32.852 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125532.852 55=[N/A] 60=20160401-16:55:32.852 10=155
Step 15
Direction: ExecutionReport (8) →
Commentary: Dealer accepts re-quote is not allowed. ExecType=F, OrdStatus=Filled
FIX Message: 8=FIXT.1.1 9=283 35=8 34=14 49=TW1_CORI_TEST_12345_DLRDPL 52=20160401-16:55:32.850 56=TRADEWEB 6=0 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1 37=LST_20160401_TW1_CORI_NY302485.18_1 39=2 54=2 55=[N/A] 150=F 151=0 6153=LST_20160401_TW1_CORI_NY302485.18_1 22631=POSTTRADE_STRING 22632=POSTTRADE_STRING 10=155
Step 16
Direction: ← ExecutionAck (BN)
Commentary: Tradeweb acknowledges ExecutionReport
FIX Message: 8=FIXT.1.1 9=233 35=BN 34=12 49=TRADEWEB 52=20160401-16:55:37.917 56=TW1_CORI_TEST_12345_DLRDPL 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1 37=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:55:37 1036=1 10=051
Step 17
Direction: ← ExecutionReport (8)
Commentary: Tradeweb informs Dealer of outcome of a list trade item. OrdStatus=Filled, ExecType=Trade
FIX Message: 8=FIXT.1.1 9=337 35=8 34=13 49=TRADEWEB 52=20160401-16:55:37.917 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDEND-125537.917 37=LST_20160401_TW1_CORI_NY302485.18_1 39=2 44=0.98 54=2 55=[N/A] 60=20160401-16:55:37 75=20160401 150=F 151=0 423=6 662=99.98046875 22570=0.760289080299 10=067
Step 18
Direction: ExecutionAck (BN) →
Comentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=255 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=15 52=20160401-16:55:37.910 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDEND-125537.917 55=[N/A] 60=20160401-16:55:37.910 10=078
Step 19
Direction:
Comentary: Autospot at Tradeweb Treasury composite.
FIX Message:
Step 20
Direction: ← ExecutionReport (8)
Comentary: Tradeweb sends ExecutionReport message to confirm final outcome of list trade item including applicable cover quote, spot, settlement money information, etc. OrdStatus=Filled, ExecType=Order Status TradeSummary = Y
FIX Message: 8=FIXT.1.1 9=733 35=8 34=14 49=TRADEWEB 52=20160401-16:55:40.292 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDSUMM-125540.277 22=1 37=LST_20160401_TW1_CORI_NY302485.18_1 38=1000000 39=2 44=0.98 48=02364WBC8 54=2 55=[N/A] 60=20160401-16:55:40 64=20160406 75=20160401 150=F 151=0 167=CORP 236=1.74 423=6 453=2 448=emack 447=C 452=3 802=3 523=Dev Test 803=2 523=NY 803=25 523=USA 803=4000 448=Tradeweb 447=C 452=1 802=2 523=Tradeweb0001 803=4002 523=YES 803=4003 526=TRD_20160401_TW1_CORI_23 662=99.98046875 1003=20160401.TW1.CORI.23 6153=emackdlr 6731=20160401.TW1.CORI.23 20115=100.265 20250=225000 22570=0.76 22630=0 22631=POSTTRADE_STRING 22634=160401.DLRX.TRSY.120 22636=Y 10=239
Step 21
Direction: ExecutionAck (BN) →
Comentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=256 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=16 52=20160401-16:55:40.287 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDSUMM-125540.277 55=[N/A] 60=20160401-16:55:40.287 10=182

View File

@ -1,20 +1,16 @@
CREATE TABLE IF NOT EXISTS qfixdpl_messages ( CREATE TABLE IF NOT EXISTS qfixdpl_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
quote_req_id TEXT NOT NULL, quote_req_id TEXT NOT NULL,
trade_id TEXT,
j_message JSONB NOT NULL, j_message JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
); );
CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id); CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id);
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at); CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at);
CREATE INDEX IF NOT EXISTS idx_messages_trade_id ON qfixdpl_messages(trade_id);
CREATE TABLE IF NOT EXISTS qfixdpl_logs ( CREATE TABLE IF NOT EXISTS qfixdpl_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
quote_req_id TEXT NOT NULL UNIQUE, quote_req_id TEXT NOT NULL UNIQUE,
trade_id TEXT,
raw_msg TEXT NOT NULL, raw_msg TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
); );
CREATE INDEX IF NOT EXISTS idx_dpl_logs_trade_id ON qfixdpl_logs(trade_id);

View File

@ -16,8 +16,8 @@ func (p *Store) SaveMessage(msg domain.TradeMessage) error {
} }
_, err = p.db.Exec( _, err = p.db.Exec(
"INSERT INTO qfixdpl_messages (quote_req_id, trade_id, j_message) VALUES ($1, NULLIF($2, ''), $3)", "INSERT INTO qfixdpl_messages (quote_req_id, j_message) VALUES ($1, $2)",
msg.QuoteReqID, msg.TradeID, string(jsonBytes), msg.QuoteReqID, string(jsonBytes),
) )
if err != nil { if err != nil {
return tracerr.Errorf("error inserting message: %w", err) return tracerr.Errorf("error inserting message: %w", err)
@ -43,7 +43,7 @@ func (p *Store) SaveLog(entry domain.LogEntry) error {
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) { func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
rows, err := p.db.Query( rows, err := p.db.Query(
"SELECT id, quote_req_id, trade_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC", "SELECT id, quote_req_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC",
) )
if err != nil { if err != nil {
return nil, tracerr.Errorf("error querying today messages: %w", err) return nil, tracerr.Errorf("error querying today messages: %w", err)
@ -55,12 +55,11 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
for rows.Next() { for rows.Next() {
var ( var (
id, quoteReqID string id, quoteReqID string
tradeID *string
jMessageRaw []byte jMessageRaw []byte
createdAt time.Time createdAt time.Time
) )
if err := rows.Scan(&id, &quoteReqID, &tradeID, &jMessageRaw, &createdAt); err != nil { if err := rows.Scan(&id, &quoteReqID, &jMessageRaw, &createdAt); err != nil {
return nil, tracerr.Errorf("error scanning message row: %w", err) return nil, tracerr.Errorf("error scanning message row: %w", err)
} }
@ -69,18 +68,12 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err) return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
} }
msg := domain.TradeMessage{ messages = append(messages, domain.TradeMessage{
ID: id, ID: id,
QuoteReqID: quoteReqID, QuoteReqID: quoteReqID,
JMessage: jMessage, JMessage: jMessage,
CreatedAt: createdAt, CreatedAt: createdAt,
} })
if tradeID != nil {
msg.TradeID = *tradeID
}
messages = append(messages, msg)
} }
if err := rows.Err(); err != nil { if err := rows.Err(); err != nil {
@ -90,69 +83,6 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
return messages, nil return messages, nil
} }
func (p *Store) UpdateLogTradeID(quoteReqID, tradeID string) error {
_, err := p.db.Exec(
"UPDATE qfixdpl_logs SET trade_id = $1, updated_at = NOW() WHERE quote_req_id = $2",
tradeID, quoteReqID,
)
if err != nil {
return tracerr.Errorf("error updating log trade_id: %w", err)
}
return nil
}
func (p *Store) GetFullTradeLog(quoteReqID string) (domain.FullTradeLog, error) {
rows, err := p.db.Query(
"SELECT trade_id, raw_msg FROM qfixdpl_logs WHERE quote_req_id = $1", quoteReqID,
)
if err != nil {
return domain.FullTradeLog{}, tracerr.Errorf("error querying dpl log: %w", err)
}
defer rows.Close()
if !rows.Next() {
return domain.FullTradeLog{}, tracerr.Errorf("no log found for quoteReqID: %s", quoteReqID)
}
var (
tradeID *string
rawMsg string
)
if err := rows.Scan(&tradeID, &rawMsg); err != nil {
return domain.FullTradeLog{}, tracerr.Errorf("error scanning dpl log row: %w", err)
}
result := domain.FullTradeLog{
QuoteReqID: quoteReqID,
DPLEntries: strings.Split(rawMsg, "\n"),
}
if tradeID != nil && *tradeID != "" {
result.TradeID = *tradeID
ptRows, err := p.db.Query(
"SELECT raw_msg FROM qfixpt_logs WHERE trade_id = $1", *tradeID,
)
if err != nil {
return domain.FullTradeLog{}, tracerr.Errorf("error querying pt logs: %w", err)
}
defer ptRows.Close()
if ptRows.Next() {
var ptRawMsg string
if err := ptRows.Scan(&ptRawMsg); err != nil {
return domain.FullTradeLog{}, tracerr.Errorf("error scanning pt log row: %w", err)
}
result.PTEntries = strings.Split(ptRawMsg, "\n")
}
}
return result, nil
}
func (p *Store) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) { func (p *Store) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
selectStmt := "SELECT raw_msg FROM qfixdpl_logs WHERE quote_req_id = '" + quoteReqID + "';" selectStmt := "SELECT raw_msg FROM qfixdpl_logs WHERE quote_req_id = '" + quoteReqID + "';"

View File

@ -3,29 +3,18 @@ package domain
import "time" import "time"
// TradeStatus represents the lifecycle state of a List Trading trade.
type TradeStatus string
const (
TradeStatusActive TradeStatus = "active"
TradeStatusRejected TradeStatus = "rejected"
TradeStatusCompleted TradeStatus = "completed"
)
// ListTrade es la representacion exportada de un trade de List Trading. // ListTrade es la representacion exportada de un trade de List Trading.
type ListTrade struct { type ListTrade struct {
QuoteReqID string `json:"quote_req_id"` QuoteReqID string
TradeID string `json:"trade_id,omitempty"` ListID string
ListID string `json:"list_id"` Symbol string
Symbol string `json:"symbol"` SecurityIDSrc string
SecurityIDSrc string `json:"security_id_src"` Currency string
Currency string `json:"currency"` Side string
Side string `json:"side"` OrderQty string
OrderQty string `json:"order_qty"` SettlDate string
SettlDate string `json:"settl_date"` Price string
Price string `json:"price"` OwnerTraderID string
OwnerTraderID string `json:"owner_trader_id"`
Status TradeStatus `json:"status"`
} }
// FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento. // FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento.
@ -42,7 +31,6 @@ type FixMessageJSON struct {
type TradeMessage struct { type TradeMessage struct {
ID string `json:"id"` ID string `json:"id"`
QuoteReqID string `json:"quote_req_id"` QuoteReqID string `json:"quote_req_id"`
TradeID string `json:"trade_id,omitempty"`
JMessage FixMessageJSON `json:"j_message"` JMessage FixMessageJSON `json:"j_message"`
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
} }
@ -58,20 +46,10 @@ type Logs struct {
Entries []string `json:"entries"` Entries []string `json:"entries"`
} }
// FullTradeLog es la respuesta del endpoint GET /trades/:id/full-log.
type FullTradeLog struct {
QuoteReqID string `json:"quote_req_id"`
TradeID string `json:"trade_id,omitempty"`
DPLEntries []string `json:"dpl_entries"`
PTEntries []string `json:"pt_entries,omitempty"`
}
// PersistenceStore define la interfaz de persistencia. // PersistenceStore define la interfaz de persistencia.
type PersistenceStore interface { type PersistenceStore interface {
SaveMessage(msg TradeMessage) error SaveMessage(msg TradeMessage) error
SaveLog(entry LogEntry) error SaveLog(entry LogEntry) error
GetTodayMessages() ([]TradeMessage, error) GetTodayMessages() ([]TradeMessage, error)
GetLogsByQuoteReqID(quoteReqID string) (Logs, error) GetLogsByQuoteReqID(quoteReqID string) (Logs, error)
UpdateLogTradeID(quoteReqID, tradeID string) error
GetFullTradeLog(id string) (FullTradeLog, error)
} }