Compare commits
3 Commits
develop
...
QFIXDPL-3/
| Author | SHA1 | Date | |
|---|---|---|---|
| 36b841fc66 | |||
| 68238d309a | |||
| 15a60bac92 |
2
Makefile
2
Makefile
@ -62,7 +62,7 @@ linux-build: check-env swag # Build a linux version for prod environment. Set e=
|
||||
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
|
||||
|
||||
deploy: # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo; s=serverName; i=instance; e.g. make deploy e=dev s=nonprodFix i=dpl
|
||||
make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/$(i)/
|
||||
make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/dpl/
|
||||
|
||||
fmt: download-versions # Apply the Go formatter to the code
|
||||
cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt);
|
||||
|
||||
@ -4,12 +4,14 @@ import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/sasha-s/go-deadlock"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"github.com/shopspring/decimal"
|
||||
|
||||
"quantex.com/qfixdpl/src/app"
|
||||
"quantex.com/qfixdpl/src/app/version"
|
||||
@ -293,7 +295,7 @@ func allowed(origin string, config Config) bool {
|
||||
|
||||
// GetTrades godoc
|
||||
// @Summary List active trades
|
||||
// @Description Returns only active List Trading trades
|
||||
// @Description Returns all active List Trading trades
|
||||
// @Tags fix
|
||||
// @Produce json
|
||||
// @Success 200 {array} domain.ListTrade
|
||||
@ -303,18 +305,6 @@ func (cont *Controller) GetTrades(ctx *gin.Context) {
|
||||
ctx.JSON(http.StatusOK, trades)
|
||||
}
|
||||
|
||||
// GetAllTrades godoc
|
||||
// @Summary List all trades
|
||||
// @Description Returns all List Trading trades (active, rejected, completed)
|
||||
// @Tags fix
|
||||
// @Produce json
|
||||
// @Success 200 {array} domain.ListTrade
|
||||
// @Router /qfixdpl/v1/trades/all [get]
|
||||
func (cont *Controller) GetAllTrades(ctx *gin.Context) {
|
||||
trades := cont.tradeProvider.GetAllTrades()
|
||||
ctx.JSON(http.StatusOK, trades)
|
||||
}
|
||||
|
||||
// GetLogs godoc
|
||||
// @Summary Get raw FIX logs for a trade
|
||||
// @Description Returns raw FIX message logs for a given QuoteReqID
|
||||
@ -328,7 +318,8 @@ func (cont *Controller) GetLogs(ctx *gin.Context) {
|
||||
|
||||
logs, err := cont.store.GetLogsByQuoteReqID(quoteReqID)
|
||||
if err != nil {
|
||||
slog.Error("GetLogs: error fetching logs", "quoteReqID", quoteReqID, "error", err)
|
||||
err = tracerr.Errorf("GetLogs: error fetching logs (quoteReqID=%s): %w", quoteReqID, err)
|
||||
slog.Error(err.Error())
|
||||
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"})
|
||||
|
||||
return
|
||||
@ -337,25 +328,57 @@ func (cont *Controller) GetLogs(ctx *gin.Context) {
|
||||
ctx.JSON(http.StatusOK, logs)
|
||||
}
|
||||
|
||||
// GetFullTradeLog godoc
|
||||
// @Summary Get full trade lifecycle log
|
||||
// @Description Returns DPL and Post-Trade raw FIX logs for a given trade (by QuoteReqID)
|
||||
// GetPendingQuoteRequests godoc
|
||||
// @Summary List pending QuoteRequests
|
||||
// @Description Returns all QuoteRequests received from TW that have not been quoted yet by the dealer
|
||||
// @Tags fix
|
||||
// @Produce json
|
||||
// @Param quoteReqID path string true "QuoteReqID"
|
||||
// @Success 200 {object} domain.FullTradeLog
|
||||
// @Router /qfixdpl/v1/trades/{quoteReqID}/full-log [get]
|
||||
func (cont *Controller) GetFullTradeLog(ctx *gin.Context) {
|
||||
quoteReqID := ctx.Param("quoteReqID")
|
||||
|
||||
fullLog, err := cont.store.GetFullTradeLog(quoteReqID)
|
||||
if err != nil {
|
||||
slog.Error("GetFullTradeLog: error fetching full trade log", "quoteReqID", quoteReqID, "error", err)
|
||||
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching full trade log"})
|
||||
// @Success 200 {array} domain.ListTrade
|
||||
// @Router /qfixdpl/v1/quote-requests [get]
|
||||
func (cont *Controller) GetPendingQuoteRequests(ctx *gin.Context) {
|
||||
pending := cont.tradeProvider.GetPendingQuoteRequests()
|
||||
ctx.JSON(http.StatusOK, pending)
|
||||
}
|
||||
|
||||
// SendQuote godoc
|
||||
// @Summary Send a Quote for a pending QuoteRequest
|
||||
// @Description Builds and sends a Quote (35=S) to TW for an existing QuoteRequest at the given price
|
||||
// @Tags fix
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param body body SendQuoteRequest true "Quote to send"
|
||||
// @Success 200 {object} Msg
|
||||
// @Failure 400 {object} HTTPError
|
||||
// @Failure 404 {object} HTTPError
|
||||
// @Failure 409 {object} HTTPError
|
||||
// @Failure 500 {object} HTTPError
|
||||
// @Router /qfixdpl/v1/quotes [post]
|
||||
func (cont *Controller) SendQuote(ctx *gin.Context) {
|
||||
var req SendQuoteRequest
|
||||
if err := ctx.ShouldBindJSON(&req); err != nil {
|
||||
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, fullLog)
|
||||
price, err := decimal.NewFromString(req.Price)
|
||||
if err != nil {
|
||||
ctx.JSON(http.StatusBadRequest, HTTPError{Error: "invalid price: " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
if err := cont.tradeProvider.SendQuote(req.QuoteReqID, price); err != nil {
|
||||
msg := err.Error()
|
||||
switch {
|
||||
case strings.Contains(msg, "not found"):
|
||||
ctx.JSON(http.StatusNotFound, HTTPError{Error: "quoteReqID not found"})
|
||||
case strings.Contains(msg, "already sent"):
|
||||
ctx.JSON(http.StatusConflict, HTTPError{Error: "quote already sent for this quoteReqID"})
|
||||
default:
|
||||
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "failed to send quote"})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, Msg{Text: "Quote sent"})
|
||||
}
|
||||
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
package rest
|
||||
|
||||
|
||||
type HTTPError struct {
|
||||
Error string
|
||||
}
|
||||
@ -18,3 +17,7 @@ type Session struct {
|
||||
Email string
|
||||
}
|
||||
|
||||
type SendQuoteRequest struct {
|
||||
QuoteReqID string `json:"QuoteReqID" binding:"required"`
|
||||
Price string `json:"Price" binding:"required" example:"99.6"`
|
||||
}
|
||||
|
||||
@ -22,8 +22,9 @@ func SetRoutes(api *API) {
|
||||
qfixdpl.Use(cont.AuthRequired)
|
||||
qfixdpl.GET("/health", cont.HealthCheck)
|
||||
qfixdpl.GET("/trades", cont.GetTrades)
|
||||
qfixdpl.GET("/trades/all", cont.GetAllTrades)
|
||||
qfixdpl.GET("/trades/:quoteReqID/full-log", cont.GetFullTradeLog)
|
||||
qfixdpl.GET("/trades/:quoteReqID/logs", cont.GetLogs)
|
||||
qfixdpl.GET("/quote-requests", cont.GetPendingQuoteRequests)
|
||||
qfixdpl.POST("/quotes", cont.SendQuote)
|
||||
|
||||
backoffice := qfixdpl.Group("/backoffice")
|
||||
backoffice.Use(cont.BackOfficeUser)
|
||||
|
||||
@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/shopspring/decimal"
|
||||
|
||||
"quantex.com/qfixdpl/src/app"
|
||||
"quantex.com/qfixdpl/src/app/version"
|
||||
@ -19,7 +20,8 @@ import (
|
||||
// TradeProvider exposes trade data from the FIX manager.
|
||||
type TradeProvider interface {
|
||||
GetTrades() []domain.ListTrade
|
||||
GetAllTrades() []domain.ListTrade
|
||||
GetPendingQuoteRequests() []domain.ListTrade
|
||||
SendQuote(quoteReqID string, price decimal.Decimal) error
|
||||
}
|
||||
|
||||
const RedisMaxIdle = 3000 // In ms
|
||||
|
||||
@ -7,7 +7,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/shopspring/decimal"
|
||||
|
||||
"quantex.com/qfixdpl/quickfix"
|
||||
@ -29,7 +28,6 @@ import (
|
||||
|
||||
type listTrade struct {
|
||||
QuoteReqID string
|
||||
TradeID string
|
||||
ListID string
|
||||
Symbol string
|
||||
SecurityIDSrc enum.SecurityIDSource
|
||||
@ -40,7 +38,7 @@ type listTrade struct {
|
||||
Price decimal.Decimal
|
||||
OwnerTraderID string
|
||||
SessionID quickfix.SessionID
|
||||
Status domain.TradeStatus
|
||||
Quoted bool
|
||||
}
|
||||
|
||||
// Manager wraps the QuickFIX initiator and implements domain.FIXSender.
|
||||
@ -78,14 +76,15 @@ func (m *Manager) Start() error {
|
||||
fixApp.onRawMessage = m.handleRawMessage
|
||||
m.app = fixApp
|
||||
|
||||
if err := m.loadTrades(); err != nil {
|
||||
slog.Error("failed to load active trades from DB, starting with empty state", "error", err)
|
||||
if err := m.loadActiveTrades(); err != nil {
|
||||
err = tracerr.Errorf("failed to load active trades from DB, starting with empty state: %w", err)
|
||||
slog.Error(err.Error())
|
||||
}
|
||||
|
||||
f, err := os.Open(m.cfg.SettingsFile)
|
||||
if err != nil {
|
||||
err = tracerr.Errorf("error opening FIX settings file %q: %s", m.cfg.SettingsFile, err)
|
||||
log.Error().Msg(err.Error())
|
||||
err = tracerr.Errorf("error opening FIX settings file %q: %w", m.cfg.SettingsFile, err)
|
||||
slog.Error(err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
@ -93,8 +92,8 @@ func (m *Manager) Start() error {
|
||||
|
||||
settings, err := quickfix.ParseSettings(f)
|
||||
if err != nil {
|
||||
err = tracerr.Errorf("error parsing FIX settings: %s", err)
|
||||
log.Error().Msg(err.Error())
|
||||
err = tracerr.Errorf("error parsing FIX settings: %w", err)
|
||||
slog.Error(err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
@ -102,16 +101,16 @@ func (m *Manager) Start() error {
|
||||
storeFactory := file.NewStoreFactory(settings)
|
||||
logFactory, err := filelog.NewLogFactory(settings)
|
||||
if err != nil {
|
||||
err = tracerr.Errorf("error creating file log factory: %s", err)
|
||||
log.Error().Msg(err.Error())
|
||||
err = tracerr.Errorf("error creating file log factory: %w", err)
|
||||
slog.Error(err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory)
|
||||
if err != nil {
|
||||
err = tracerr.Errorf("error creating FIX initiator: %s", err)
|
||||
log.Error().Msg(err.Error())
|
||||
err = tracerr.Errorf("error creating FIX initiator: %w", err)
|
||||
slog.Error(err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
@ -119,8 +118,8 @@ func (m *Manager) Start() error {
|
||||
m.initiator = initiator
|
||||
|
||||
if err = m.initiator.Start(); err != nil {
|
||||
err = tracerr.Errorf("error starting FIX initiator: %s", err)
|
||||
log.Error().Msg(err.Error())
|
||||
err = tracerr.Errorf("error starting FIX initiator: %w", err)
|
||||
slog.Error(err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
@ -141,18 +140,6 @@ func (m *Manager) onLogon(sessionID quickfix.SessionID) {
|
||||
m.sessionsMu.Lock()
|
||||
m.sessions[sessionID.String()] = sessionID
|
||||
m.sessionsMu.Unlock()
|
||||
|
||||
// Assign the new session to all recovered trades that have no session yet.
|
||||
// This covers the case where the service was restarted mid-trade: loadTrades()
|
||||
// reconstructs the trade data but cannot recover the SessionID from the DB.
|
||||
// Since this is a single-session initiator, all active trades belong to this session.
|
||||
m.tradesMu.Lock()
|
||||
for _, trade := range m.trades {
|
||||
if trade.Status == domain.TradeStatusActive && trade.SessionID == (quickfix.SessionID{}) {
|
||||
trade.SessionID = sessionID
|
||||
}
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
}
|
||||
|
||||
func (m *Manager) onLogout(sessionID quickfix.SessionID) {
|
||||
@ -208,7 +195,9 @@ func (m *Manager) sendExecutionAck(orderID, clOrdID, execID string, sessionID qu
|
||||
func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) {
|
||||
quoteReqID, err := msg.GetQuoteReqID()
|
||||
if err != nil {
|
||||
slog.Error("handleQuoteRequest: missing QuoteReqID", "error", err.Error())
|
||||
err := tracerr.Errorf("handleQuoteRequest: missing QuoteReqID: %w", err)
|
||||
slog.Error(err.Error())
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@ -249,21 +238,258 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
|
||||
return
|
||||
}
|
||||
|
||||
// Step 1: Send QuoteStatusReport (35=AI) to acknowledge the inquiry.
|
||||
// Send QuoteStatusReport (35=AI) to acknowledge the inquiry.
|
||||
if ackErr := m.sendQuoteStatusReport(quoteReqID, ownerTraderID, sessionID); ackErr != nil {
|
||||
slog.Error("handleQuoteRequest: failed to send QuoteStatusReport", "quoteReqID", quoteReqID, "error", ackErr.Error())
|
||||
ackErr = tracerr.Errorf("handleQuoteRequest: failed to send QuoteStatusReport (quoteReqID=%s): %w", quoteReqID, ackErr)
|
||||
slog.Error(ackErr.Error())
|
||||
return
|
||||
}
|
||||
slog.Info("QuoteStatusReport sent", "quoteReqID", quoteReqID)
|
||||
|
||||
// Step 2: Build and send Quote (35=S) with price.
|
||||
price := decimal.NewFromFloat(99.6)
|
||||
|
||||
sIDSource := enum.SecurityIDSource_ISIN_NUMBER
|
||||
if secIDSource == enum.SecurityIDSource_CUSIP {
|
||||
sIDSource = enum.SecurityIDSource_CUSIP
|
||||
}
|
||||
|
||||
// Store trade state as pending; Quote (35=S) is sent later via REST endpoint.
|
||||
m.tradesMu.Lock()
|
||||
m.trades[quoteReqID] = &listTrade{
|
||||
QuoteReqID: quoteReqID,
|
||||
ListID: listID,
|
||||
Symbol: symbol,
|
||||
SecurityIDSrc: sIDSource,
|
||||
Currency: currency,
|
||||
Side: side,
|
||||
OrderQty: orderQty,
|
||||
SettlDate: settlDate,
|
||||
OwnerTraderID: ownerTraderID,
|
||||
SessionID: sessionID,
|
||||
Quoted: false,
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
// Persist structured message (outside mutex).
|
||||
m.persistMessage(quoteReqID, parseQuoteRequest(msg))
|
||||
|
||||
// Persist outgoing QuoteStatusReport.
|
||||
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
|
||||
"QuoteReqID": quoteReqID,
|
||||
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
|
||||
"OwnerTraderID": ownerTraderID,
|
||||
}))
|
||||
}
|
||||
|
||||
// handleQuoteAck handles an incoming QuoteAck (35=CW).
|
||||
func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) {
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
status, _ := msg.GetQuoteAckStatus()
|
||||
text, _ := msg.GetText()
|
||||
|
||||
m.persistMessage(quoteReqID, parseQuoteAck(msg))
|
||||
|
||||
if status != enum.QuoteAckStatus_ACCEPTED {
|
||||
err := tracerr.Errorf("handleQuoteAck: quote rejected by TW (quoteReqID=%s, quoteAckStatus=%s, text=%s)", quoteReqID, string(status), text)
|
||||
slog.Error(err.Error())
|
||||
|
||||
m.tradesMu.Lock()
|
||||
delete(m.trades, quoteReqID)
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID)
|
||||
}
|
||||
|
||||
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ).
|
||||
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes.
|
||||
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) {
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
quoteRespID, _ := msg.GetQuoteRespID()
|
||||
|
||||
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
|
||||
|
||||
isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ")
|
||||
isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND")
|
||||
isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM")
|
||||
isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND")
|
||||
|
||||
if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd {
|
||||
slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future.
|
||||
|
||||
// Always send ACK regardless of whether the trade is in our map.
|
||||
// TW will keep retrying until it receives an ACK.
|
||||
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil {
|
||||
ackErr = tracerr.Errorf("handleQuoteResponse: failed to send ACK (quoteReqID=%s, quoteRespID=%s): %w", quoteReqID, quoteRespID, ackErr)
|
||||
slog.Error(ackErr.Error())
|
||||
return
|
||||
}
|
||||
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
|
||||
|
||||
// Persist incoming QuoteResponse.
|
||||
m.persistMessage(quoteReqID, parseQuoteResponse(msg))
|
||||
|
||||
// Persist outgoing ACK.
|
||||
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
|
||||
"QuoteReqID": quoteReqID,
|
||||
"QuoteRespID": quoteRespID,
|
||||
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
|
||||
}))
|
||||
|
||||
// _TRDSUMM is the final message — clean up the trade.
|
||||
if isTrdSumm {
|
||||
slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID)
|
||||
m.tradesMu.Lock()
|
||||
delete(m.trades, quoteReqID)
|
||||
m.tradesMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// handleExecutionReport handles an incoming ExecutionReport (35=8).
|
||||
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does.
|
||||
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) {
|
||||
execID, _ := msg.GetExecID()
|
||||
orderID, _ := msg.GetOrderID()
|
||||
clOrdID, _ := msg.GetClOrdID()
|
||||
execType, _ := msg.GetExecType()
|
||||
ordStatus, _ := msg.GetOrdStatus()
|
||||
listID, _ := msg.GetListID()
|
||||
|
||||
slog.Info("handleExecutionReport received",
|
||||
"execID", execID, "orderID", orderID, "clOrdID", clOrdID,
|
||||
"execType", string(execType), "ordStatus", string(ordStatus), "listID", listID,
|
||||
)
|
||||
|
||||
// Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW.
|
||||
if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil {
|
||||
ackErr = tracerr.Errorf("handleExecutionReport: failed to send ExecutionAck (execID=%s): %w", execID, ackErr)
|
||||
slog.Error(ackErr.Error())
|
||||
} else {
|
||||
slog.Info("ExecutionAck sent", "execID", execID)
|
||||
}
|
||||
|
||||
switch {
|
||||
case strings.Contains(execID, "_LISTEND"):
|
||||
slog.Info("List ended (due-in closed), awaiting trade result from TW",
|
||||
"execID", execID, "clOrdID", clOrdID)
|
||||
|
||||
case strings.Contains(execID, "_TRDEND"):
|
||||
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
|
||||
|
||||
case strings.Contains(execID, "_TRDSUMM"):
|
||||
slog.Info("Trade summary received from TW, cleaning up",
|
||||
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
|
||||
|
||||
m.tradesMu.Lock()
|
||||
delete(m.trades, clOrdID)
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
case execType == enum.ExecType_TRADE:
|
||||
slog.Info("Trade result received from TW",
|
||||
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
|
||||
}
|
||||
|
||||
// Persist incoming ExecutionReport.
|
||||
m.persistMessage(clOrdID, parseExecutionReport(msg))
|
||||
|
||||
// Persist outgoing ExecutionAck.
|
||||
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{
|
||||
"OrderID": orderID,
|
||||
"ExecID": execID,
|
||||
"ClOrdID": clOrdID,
|
||||
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
|
||||
}))
|
||||
}
|
||||
|
||||
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
|
||||
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
|
||||
// Logged in application.go, no further action needed.
|
||||
}
|
||||
|
||||
// GetTrades returns a snapshot of all active trades.
|
||||
func (m *Manager) GetTrades() []domain.ListTrade {
|
||||
m.tradesMu.RLock()
|
||||
defer m.tradesMu.RUnlock()
|
||||
|
||||
trades := make([]domain.ListTrade, 0, len(m.trades))
|
||||
for _, t := range m.trades {
|
||||
trades = append(trades, toDomainListTrade(t))
|
||||
}
|
||||
|
||||
return trades
|
||||
}
|
||||
|
||||
// GetPendingQuoteRequests returns trades that have received a QuoteRequest but not yet been quoted by the dealer.
|
||||
func (m *Manager) GetPendingQuoteRequests() []domain.ListTrade {
|
||||
m.tradesMu.RLock()
|
||||
defer m.tradesMu.RUnlock()
|
||||
|
||||
pending := make([]domain.ListTrade, 0)
|
||||
for _, t := range m.trades {
|
||||
if !t.Quoted {
|
||||
pending = append(pending, toDomainListTrade(t))
|
||||
}
|
||||
}
|
||||
|
||||
return pending
|
||||
}
|
||||
|
||||
func toDomainListTrade(t *listTrade) domain.ListTrade {
|
||||
return domain.ListTrade{
|
||||
QuoteReqID: t.QuoteReqID,
|
||||
ListID: t.ListID,
|
||||
Symbol: t.Symbol,
|
||||
SecurityIDSrc: string(t.SecurityIDSrc),
|
||||
Currency: t.Currency,
|
||||
Side: string(t.Side),
|
||||
OrderQty: t.OrderQty.String(),
|
||||
SettlDate: t.SettlDate,
|
||||
Price: t.Price.String(),
|
||||
OwnerTraderID: t.OwnerTraderID,
|
||||
}
|
||||
}
|
||||
|
||||
// SendQuote builds and sends a Quote (35=S) for an existing pending QuoteRequest at the given price.
|
||||
func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
|
||||
m.tradesMu.Lock()
|
||||
t, ok := m.trades[quoteReqID]
|
||||
if !ok {
|
||||
m.tradesMu.Unlock()
|
||||
err := tracerr.Errorf("SendQuote: quoteReqID %s not found", quoteReqID)
|
||||
slog.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
if t.Quoted {
|
||||
m.tradesMu.Unlock()
|
||||
err := tracerr.Errorf("SendQuote: quote already sent for quoteReqID %s", quoteReqID)
|
||||
slog.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
sessionID := t.SessionID
|
||||
if sessionID == (quickfix.SessionID{}) {
|
||||
sessionID = m.anyActiveSessionID()
|
||||
if sessionID == (quickfix.SessionID{}) {
|
||||
m.tradesMu.Unlock()
|
||||
err := tracerr.Errorf("SendQuote: no active FIX session for quoteReqID %s", quoteReqID)
|
||||
slog.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
symbol := t.Symbol
|
||||
sIDSource := t.SecurityIDSrc
|
||||
currency := t.Currency
|
||||
side := t.Side
|
||||
orderQty := t.OrderQty
|
||||
settlDate := t.SettlDate
|
||||
ownerTraderID := t.OwnerTraderID
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
quoteID := quoteReqID
|
||||
q := quote.New(
|
||||
field.NewQuoteID(quoteID),
|
||||
@ -305,41 +531,20 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
|
||||
}
|
||||
|
||||
if sendErr := quickfix.SendToTarget(q, sessionID); sendErr != nil {
|
||||
slog.Error("handleQuoteRequest: failed to send quote", "quoteReqID", quoteReqID, "error", sendErr.Error())
|
||||
return
|
||||
sendErr = tracerr.Errorf("SendQuote: failed to send quote (quoteReqID=%s): %w", quoteReqID, sendErr)
|
||||
slog.Error(sendErr.Error())
|
||||
return sendErr
|
||||
}
|
||||
|
||||
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol)
|
||||
|
||||
// Store trade state for subsequent steps.
|
||||
m.tradesMu.Lock()
|
||||
m.trades[quoteReqID] = &listTrade{
|
||||
QuoteReqID: quoteReqID,
|
||||
ListID: listID,
|
||||
Symbol: symbol,
|
||||
SecurityIDSrc: sIDSource,
|
||||
Currency: currency,
|
||||
Side: side,
|
||||
OrderQty: orderQty,
|
||||
SettlDate: settlDate,
|
||||
Price: price,
|
||||
OwnerTraderID: ownerTraderID,
|
||||
SessionID: sessionID,
|
||||
Status: domain.TradeStatusActive,
|
||||
if t, ok := m.trades[quoteReqID]; ok {
|
||||
t.Price = price
|
||||
t.Quoted = true
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
// Persist structured message (outside mutex).
|
||||
m.persistMessage(quoteReqID, parseQuoteRequest(msg))
|
||||
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol, "price", price.String())
|
||||
|
||||
// Persist outgoing QuoteStatusReport.
|
||||
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
|
||||
"QuoteReqID": quoteReqID,
|
||||
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
|
||||
"OwnerTraderID": ownerTraderID,
|
||||
}))
|
||||
|
||||
// Persist outgoing Quote.
|
||||
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]interface{}{
|
||||
"QuoteReqID": quoteReqID,
|
||||
"QuoteID": quoteID,
|
||||
@ -350,206 +555,17 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
|
||||
"Currency": currency,
|
||||
"SettlDate": settlDate,
|
||||
}))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleQuoteAck handles an incoming QuoteAck (35=CW).
|
||||
func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) {
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
status, _ := msg.GetQuoteAckStatus()
|
||||
text, _ := msg.GetText()
|
||||
|
||||
m.persistMessage(quoteReqID, parseQuoteAck(msg))
|
||||
|
||||
// QuoteAckStatus only has two defined values in TW DPL:
|
||||
// 1 = Accepted — quote delivered to client.
|
||||
// 2 = Rejected — format error, late quote, viewer busy, or race condition.
|
||||
if status == enum.QuoteAckStatus_REJECTED {
|
||||
slog.Error("handleQuoteAck: quote rejected by TW", "quoteReqID", quoteReqID, "quoteAckStatus", string(status), "text", text)
|
||||
|
||||
m.tradesMu.Lock()
|
||||
if t, ok := m.trades[quoteReqID]; ok {
|
||||
t.Status = domain.TradeStatusRejected
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID)
|
||||
}
|
||||
|
||||
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ).
|
||||
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes.
|
||||
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) {
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
quoteRespID, _ := msg.GetQuoteRespID()
|
||||
|
||||
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
|
||||
|
||||
isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ")
|
||||
isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND")
|
||||
isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM")
|
||||
isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND")
|
||||
|
||||
if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd {
|
||||
slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future.
|
||||
|
||||
// Always send ACK regardless of whether the trade is in our map.
|
||||
// TW will keep retrying until it receives an ACK.
|
||||
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil {
|
||||
slog.Error("handleQuoteResponse: failed to send ACK", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID, "error", ackErr.Error())
|
||||
return
|
||||
}
|
||||
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
|
||||
|
||||
// Persist incoming QuoteResponse.
|
||||
m.persistMessage(quoteReqID, parseQuoteResponse(msg))
|
||||
|
||||
// Persist outgoing ACK.
|
||||
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
|
||||
"QuoteReqID": quoteReqID,
|
||||
"QuoteRespID": quoteRespID,
|
||||
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
|
||||
}))
|
||||
|
||||
// _TRDSUMM is the final message — mark trade as completed.
|
||||
if isTrdSumm {
|
||||
slog.Info("Trade summary received, marking completed", "quoteReqID", quoteReqID)
|
||||
m.tradesMu.Lock()
|
||||
if t, ok := m.trades[quoteReqID]; ok {
|
||||
t.Status = domain.TradeStatusCompleted
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// handleExecutionReport handles an incoming ExecutionReport (35=8).
|
||||
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does.
|
||||
// In TW DPL, ClOrdID (Tag 11) always equals the original QuoteReqID (Tag 131),
|
||||
// so we use clOrdID directly as the map lookup key.
|
||||
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) {
|
||||
execID, _ := msg.GetExecID()
|
||||
orderID, _ := msg.GetOrderID()
|
||||
clOrdID, _ := msg.GetClOrdID()
|
||||
execType, _ := msg.GetExecType()
|
||||
ordStatus, _ := msg.GetOrdStatus()
|
||||
listID, _ := msg.GetListID()
|
||||
|
||||
slog.Info("handleExecutionReport received",
|
||||
"execID", execID, "orderID", orderID, "clOrdID", clOrdID,
|
||||
"execType", string(execType), "ordStatus", string(ordStatus), "listID", listID,
|
||||
)
|
||||
|
||||
// Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW.
|
||||
if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil {
|
||||
slog.Error("handleExecutionReport: failed to send ExecutionAck", "execID", execID, "error", ackErr.Error())
|
||||
} else {
|
||||
slog.Info("ExecutionAck sent", "execID", execID)
|
||||
}
|
||||
|
||||
switch {
|
||||
case strings.Contains(execID, "_LISTEND"):
|
||||
slog.Info("List ended (due-in closed), awaiting trade result from TW",
|
||||
"execID", execID, "clOrdID", clOrdID)
|
||||
|
||||
case strings.Contains(execID, "_TRDEND"):
|
||||
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
|
||||
|
||||
case strings.Contains(execID, "_TRDSUMM"):
|
||||
slog.Info("Trade summary received from TW, marking completed",
|
||||
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
|
||||
|
||||
m.tradesMu.Lock()
|
||||
if t, ok := m.trades[clOrdID]; ok {
|
||||
t.Status = domain.TradeStatusCompleted
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
case execType == enum.ExecType_TRADE:
|
||||
slog.Info("Trade result received from TW",
|
||||
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
|
||||
}
|
||||
|
||||
// Persist incoming ExecutionReport.
|
||||
m.persistMessage(clOrdID, parseExecutionReport(msg))
|
||||
|
||||
// Persist outgoing ExecutionAck.
|
||||
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{
|
||||
"OrderID": orderID,
|
||||
"ExecID": execID,
|
||||
"ClOrdID": clOrdID,
|
||||
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
|
||||
}))
|
||||
|
||||
// If the ExecutionReport carries a TradeID, persist it in qfixdpl_logs for cross-service correlation.
|
||||
tradeID, _ := msg.GetTradeID()
|
||||
if tradeID != "" {
|
||||
m.tradesMu.Lock()
|
||||
if t, ok := m.trades[clOrdID]; ok {
|
||||
t.TradeID = tradeID
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
if err := m.store.UpdateLogTradeID(clOrdID, tradeID); err != nil {
|
||||
slog.Error("handleExecutionReport: failed to update log trade_id", "clOrdID", clOrdID, "tradeID", tradeID, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
|
||||
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
|
||||
// Logged in application.go, no further action needed.
|
||||
}
|
||||
|
||||
// GetTrades returns a snapshot of only active trades.
|
||||
func (m *Manager) GetTrades() []domain.ListTrade {
|
||||
m.tradesMu.RLock()
|
||||
defer m.tradesMu.RUnlock()
|
||||
|
||||
trades := make([]domain.ListTrade, 0, len(m.trades))
|
||||
for _, t := range m.trades {
|
||||
if t.Status != domain.TradeStatusActive {
|
||||
continue
|
||||
}
|
||||
|
||||
trades = append(trades, toListTrade(t))
|
||||
}
|
||||
|
||||
return trades
|
||||
}
|
||||
|
||||
// GetAllTrades returns a snapshot of all trades (active, rejected, completed).
|
||||
func (m *Manager) GetAllTrades() []domain.ListTrade {
|
||||
m.tradesMu.RLock()
|
||||
defer m.tradesMu.RUnlock()
|
||||
|
||||
trades := make([]domain.ListTrade, 0, len(m.trades))
|
||||
for _, t := range m.trades {
|
||||
trades = append(trades, toListTrade(t))
|
||||
}
|
||||
|
||||
return trades
|
||||
}
|
||||
|
||||
func toListTrade(t *listTrade) domain.ListTrade {
|
||||
return domain.ListTrade{
|
||||
QuoteReqID: t.QuoteReqID,
|
||||
TradeID: t.TradeID,
|
||||
ListID: t.ListID,
|
||||
Symbol: t.Symbol,
|
||||
SecurityIDSrc: string(t.SecurityIDSrc),
|
||||
Currency: t.Currency,
|
||||
Side: string(t.Side),
|
||||
OrderQty: t.OrderQty.String(),
|
||||
SettlDate: t.SettlDate,
|
||||
Price: t.Price.String(),
|
||||
OwnerTraderID: t.OwnerTraderID,
|
||||
Status: t.Status,
|
||||
func (m *Manager) anyActiveSessionID() quickfix.SessionID {
|
||||
m.sessionsMu.RLock()
|
||||
defer m.sessionsMu.RUnlock()
|
||||
for _, s := range m.sessions {
|
||||
return s
|
||||
}
|
||||
return quickfix.SessionID{}
|
||||
}
|
||||
|
||||
// handleRawMessage persists raw FIX message strings to the logs table.
|
||||
@ -560,7 +576,8 @@ func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
|
||||
QuoteReqID: quoteReqID,
|
||||
RawMsg: "[" + direction + "] " + msg.String(),
|
||||
}); err != nil {
|
||||
slog.Error("failed to persist raw log", "error", err)
|
||||
err = tracerr.Errorf("failed to persist raw log: %w", err)
|
||||
slog.Error(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@ -570,18 +587,19 @@ func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSO
|
||||
QuoteReqID: quoteReqID,
|
||||
JMessage: fixJSON,
|
||||
}); err != nil {
|
||||
slog.Error("failed to persist message", "msgType", fixJSON.MsgType, "quoteReqID", quoteReqID, "error", err)
|
||||
err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", fixJSON.MsgType, quoteReqID, err)
|
||||
slog.Error(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// loadTrades reconstructs all trades and their states from today's messages in the database.
|
||||
func (m *Manager) loadTrades() error {
|
||||
// loadActiveTrades reconstructs active trades from today's messages in the database.
|
||||
func (m *Manager) loadActiveTrades() error {
|
||||
messages, err := m.store.GetTodayMessages()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
trades := make(map[string]*listTrade)
|
||||
activeTrades := make(map[string]*listTrade)
|
||||
|
||||
for _, msg := range messages {
|
||||
switch msg.JMessage.MsgType {
|
||||
@ -605,13 +623,16 @@ func (m *Manager) loadTrades() error {
|
||||
trade := &listTrade{
|
||||
QuoteReqID: msg.QuoteReqID,
|
||||
ListID: listID,
|
||||
Status: domain.TradeStatusActive,
|
||||
}
|
||||
|
||||
if v, ok := body["SecurityID"].(string); ok {
|
||||
trade.Symbol = v
|
||||
}
|
||||
|
||||
if v, ok := body["SecurityIDSource"].(string); ok {
|
||||
trade.SecurityIDSrc = enum.SecurityIDSource(v)
|
||||
}
|
||||
|
||||
if v, ok := body["Currency"].(string); ok {
|
||||
trade.Currency = v
|
||||
}
|
||||
@ -632,16 +653,22 @@ func (m *Manager) loadTrades() error {
|
||||
trade.OwnerTraderID = v
|
||||
}
|
||||
|
||||
trades[msg.QuoteReqID] = trade
|
||||
activeTrades[msg.QuoteReqID] = trade
|
||||
|
||||
case "CW": // QuoteAck — only status "2" (Rejected) marks the trade as rejected
|
||||
case "S": // Outgoing Quote — dealer has already quoted this trade
|
||||
if t, ok := activeTrades[msg.QuoteReqID]; ok {
|
||||
t.Quoted = true
|
||||
if v, ok := msg.JMessage.Body["Price"].(string); ok {
|
||||
t.Price, _ = decimal.NewFromString(v)
|
||||
}
|
||||
}
|
||||
|
||||
case "CW": // QuoteAck — if rejected, trade is dead
|
||||
body := msg.JMessage.Body
|
||||
quoteAckStatus, _ := body["QuoteAckStatus"].(string)
|
||||
|
||||
if quoteAckStatus == string(enum.QuoteAckStatus_REJECTED) {
|
||||
if t, ok := trades[msg.QuoteReqID]; ok {
|
||||
t.Status = domain.TradeStatusRejected
|
||||
}
|
||||
if quoteAckStatus != string(enum.QuoteAckStatus_ACCEPTED) {
|
||||
delete(activeTrades, msg.QuoteReqID)
|
||||
}
|
||||
|
||||
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
|
||||
@ -649,9 +676,7 @@ func (m *Manager) loadTrades() error {
|
||||
quoteRespID, _ := body["QuoteRespID"].(string)
|
||||
|
||||
if strings.HasSuffix(quoteRespID, "_TRDSUMM") {
|
||||
if t, ok := trades[msg.QuoteReqID]; ok {
|
||||
t.Status = domain.TradeStatusCompleted
|
||||
}
|
||||
delete(activeTrades, msg.QuoteReqID)
|
||||
}
|
||||
|
||||
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)
|
||||
@ -659,29 +684,14 @@ func (m *Manager) loadTrades() error {
|
||||
execID, _ := body["ExecID"].(string)
|
||||
clOrdID, _ := body["ClOrdID"].(string)
|
||||
|
||||
if tid, ok := body["TradeID"].(string); ok && tid != "" {
|
||||
if t, ok := trades[clOrdID]; ok {
|
||||
t.TradeID = tid
|
||||
}
|
||||
}
|
||||
|
||||
if strings.Contains(execID, "_TRDSUMM") {
|
||||
if t, ok := trades[clOrdID]; ok {
|
||||
t.Status = domain.TradeStatusCompleted
|
||||
}
|
||||
delete(activeTrades, clOrdID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
active := 0
|
||||
for _, t := range trades {
|
||||
if t.Status == domain.TradeStatusActive {
|
||||
active++
|
||||
}
|
||||
}
|
||||
|
||||
m.trades = trades
|
||||
slog.Info("recovery completed", "totalTrades", len(trades), "activeTrades", active)
|
||||
m.trades = activeTrades
|
||||
slog.Info("recovery completed", "activeTrades", len(activeTrades))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1,647 +0,0 @@
|
||||
package fix
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"quantex.com/qfixdpl/quickfix"
|
||||
"quantex.com/qfixdpl/src/app"
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// newTestManager builds a Manager with the given store without calling Start().
|
||||
func newTestManager(store domain.PersistenceStore) *Manager {
|
||||
notify := &MockNotifier{}
|
||||
return NewManager(app.FIXConfig{}, store, notify)
|
||||
}
|
||||
|
||||
// makeMsg builds a TradeMessage with the given quoteReqID, msgType, and body.
|
||||
func makeMsg(quoteReqID, msgType string, body map[string]interface{}) domain.TradeMessage {
|
||||
return domain.TradeMessage{
|
||||
ID: "test-id-" + quoteReqID + "-" + msgType,
|
||||
QuoteReqID: quoteReqID,
|
||||
JMessage: domain.FixMessageJSON{
|
||||
MsgType: msgType,
|
||||
QuoteReqID: quoteReqID,
|
||||
Body: body,
|
||||
ReceiveTime: time.Now(),
|
||||
},
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// makeQuoteRequest builds a "R" (QuoteRequest) TradeMessage.
|
||||
func makeQuoteRequest(quoteReqID, listID, nt string, extras map[string]interface{}) domain.TradeMessage {
|
||||
body := map[string]interface{}{
|
||||
"NegotiationType": nt,
|
||||
"ListID": listID,
|
||||
}
|
||||
for k, v := range extras {
|
||||
body[k] = v
|
||||
}
|
||||
return makeMsg(quoteReqID, "R", body)
|
||||
}
|
||||
|
||||
// makeQuoteAck builds a "CW" (QuoteAck) TradeMessage.
|
||||
func makeQuoteAck(quoteReqID, status string) domain.TradeMessage {
|
||||
return makeMsg(quoteReqID, "CW", map[string]interface{}{
|
||||
"QuoteAckStatus": status,
|
||||
})
|
||||
}
|
||||
|
||||
// makeQuoteResponse builds an "AJ" (QuoteResponse) TradeMessage.
|
||||
func makeQuoteResponse(quoteReqID, quoteRespID string) domain.TradeMessage {
|
||||
return makeMsg(quoteReqID, "AJ", map[string]interface{}{
|
||||
"QuoteRespID": quoteRespID,
|
||||
})
|
||||
}
|
||||
|
||||
// makeExecutionReport builds an "8" (ExecutionReport) TradeMessage.
|
||||
// clOrdID maps to body["ClOrdID"]; it is also set as TradeMessage.QuoteReqID
|
||||
// to mirror how persistMessage works in handleExecutionReport.
|
||||
func makeExecutionReport(clOrdID, execID string) domain.TradeMessage {
|
||||
return makeMsg(clOrdID, "8", map[string]interface{}{
|
||||
"ClOrdID": clOrdID,
|
||||
"ExecID": execID,
|
||||
})
|
||||
}
|
||||
|
||||
// makeOutgoing builds an outgoing message (AI, S, BN) for a given quoteReqID.
|
||||
func makeOutgoing(quoteReqID, msgType string) domain.TradeMessage {
|
||||
return makeMsg(quoteReqID, msgType, map[string]interface{}{})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 1 — DB vacia
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_EmptyDB(t *testing.T) {
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, m.trades)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 2 — Interrupcion despues de QuoteRequest
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_SingleR_CreatesOneTrade(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", map[string]interface{}{
|
||||
"SecurityID": "US1234567890",
|
||||
"Currency": "USD",
|
||||
"Side": "1",
|
||||
"OrderQty": "1000000",
|
||||
"SettlDate": "20260320",
|
||||
"OwnerTraderID": "trader1",
|
||||
})
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, m.trades, 1)
|
||||
|
||||
trade := m.trades["LST_ABC123"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, "LST_ABC123", trade.QuoteReqID)
|
||||
assert.Equal(t, "LIST_1", trade.ListID)
|
||||
assert.Equal(t, "US1234567890", trade.Symbol)
|
||||
assert.Equal(t, "USD", trade.Currency)
|
||||
assert.Equal(t, "1", string(trade.Side))
|
||||
assert.Equal(t, "20260320", trade.SettlDate)
|
||||
assert.Equal(t, "trader1", trade.OwnerTraderID)
|
||||
assert.Equal(t, domain.TradeStatusActive, trade.Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_WithOutgoingMsgs_IgnoresAI_S(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
|
||||
ai := makeOutgoing("LST_ABC123", "AI")
|
||||
s := makeOutgoing("LST_ABC123", "S")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, ai, s}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_NonLSTPrefix_Ignored(t *testing.T) {
|
||||
r := makeQuoteRequest("TRD_ABC123", "LIST_1", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_NonRFQ_NegotiationType_Ignored(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "LIST_1", "DEALER", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_EmptyListID_Ignored(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_MissingOptionalFields_TradeCreated(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_MIN", "LIST_MIN", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, m.trades, 1)
|
||||
|
||||
trade := m.trades["LST_MIN"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, "LST_MIN", trade.QuoteReqID)
|
||||
assert.Equal(t, "LIST_MIN", trade.ListID)
|
||||
assert.Equal(t, "", trade.Symbol)
|
||||
assert.Equal(t, "", trade.Currency)
|
||||
assert.Equal(t, "", trade.SettlDate)
|
||||
assert.Equal(t, "", trade.OwnerTraderID)
|
||||
assert.Equal(t, domain.TradeStatusActive, trade.Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 3 — Interrupcion despues de QuoteAck
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_CW_Accepted_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1") // ACCEPTED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_CW_Rejected_TradeMarkedRejected(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "2") // REJECTED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusRejected, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// QuoteAckStatus "0" is RECEIVED_NOT_YET_PROCESSED — not a rejection.
|
||||
// Only status "2" (Rejected) should mark the trade as rejected.
|
||||
func TestLoadTrades_CW_ReceivedNotYetProcessed_TradeStaysActive(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "0") // RECEIVED_NOT_YET_PROCESSED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_CW_WithoutPriorR_NoCrash(t *testing.T) {
|
||||
cw := makeQuoteAck("LST_ORPHAN", "2")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 4 — Interrupcion despues de QuoteResponse
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_AJ_TRDREQ_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1")
|
||||
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDREQ")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_AJ_TRDEND_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1")
|
||||
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDEND")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_AJ_LISTEND_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1")
|
||||
aj := makeQuoteResponse("LST_Q1", "LST_Q1_LISTEND")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_AJ_TRDSUMM_TradeMarkedCompleted(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1")
|
||||
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_AJ_TRDSUMM_WithoutPriorR_NoCrash(t *testing.T) {
|
||||
aj := makeQuoteResponse("LST_ORPHAN", "LST_ORPHAN_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 5 — Interrupcion despues de ExecutionReport
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_ExecReport_LISTEND_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_ABC123", "1")
|
||||
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
|
||||
exec := makeExecutionReport("LST_ABC123", "LST_ABC123_LISTEND")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, exec}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_ExecReport_TRDEND_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_ABC123", "1")
|
||||
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
|
||||
execListEnd := makeExecutionReport("LST_ABC123", "LST_ABC123_LISTEND")
|
||||
execTrdEnd := makeExecutionReport("LST_ABC123", "LST_ABC123_TRDEND")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, execListEnd, execTrdEnd}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_ExecReport_TRDSUMM_TradeMarkedCompleted(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_ABC123", "1")
|
||||
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
|
||||
exec := makeExecutionReport("LST_ABC123", "LST_ABC123_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, exec}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_ABC123"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_ExecReport_TRDSUMM_WithoutPriorR_NoCrash(t *testing.T) {
|
||||
exec := makeExecutionReport("LST_ORPHAN", "LST_ORPHAN_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{exec}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 6 — Multiples trades en paralelo
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_TwoTrades_BothActive(t *testing.T) {
|
||||
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
|
||||
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
|
||||
cw1 := makeQuoteAck("LST_TRADE1", "1")
|
||||
cw2 := makeQuoteAck("LST_TRADE2", "1")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, cw1, cw2}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 2)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE1"].Status)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE2"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_TwoTrades_OneCompleted_OneActive(t *testing.T) {
|
||||
// TRADE1: fully completed via flow 8.4 (_TRDSUMM execution report)
|
||||
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
|
||||
cw1 := makeQuoteAck("LST_TRADE1", "1")
|
||||
aj1 := makeQuoteResponse("LST_TRADE1", "LST_TRADE1_TRDREQ")
|
||||
exec1 := makeExecutionReport("LST_TRADE1", "LST_TRADE1_TRDSUMM")
|
||||
|
||||
// TRADE2: still active
|
||||
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
|
||||
cw2 := makeQuoteAck("LST_TRADE2", "1")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return(
|
||||
[]domain.TradeMessage{r1, cw1, aj1, exec1, r2, cw2}, nil,
|
||||
)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 2)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE1"].Status)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE2"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_TwoTrades_BothCompleted(t *testing.T) {
|
||||
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
|
||||
cw1 := makeQuoteAck("LST_TRADE1", "1")
|
||||
exec1 := makeExecutionReport("LST_TRADE1", "LST_TRADE1_TRDSUMM")
|
||||
|
||||
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
|
||||
cw2 := makeQuoteAck("LST_TRADE2", "1")
|
||||
exec2 := makeExecutionReport("LST_TRADE2", "LST_TRADE2_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return(
|
||||
[]domain.TradeMessage{r1, cw1, exec1, r2, cw2, exec2}, nil,
|
||||
)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 2)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE1"].Status)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE2"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 7 — SessionID se asigna en onLogon (solo a trades activos)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_SessionID_IsZeroAfterRecovery(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, m.trades, 1)
|
||||
|
||||
trade := m.trades["LST_X"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, quickfix.SessionID{}, trade.SessionID)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestOnLogon_AssignsSessionToRecoveredActiveTrades(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
require.NoError(t, m.loadTrades())
|
||||
|
||||
sessionID := quickfix.SessionID{
|
||||
BeginString: "FIXT.1.1",
|
||||
SenderCompID: "QFIXDPL",
|
||||
TargetCompID: "TRADEWEB",
|
||||
}
|
||||
m.onLogon(sessionID)
|
||||
|
||||
trade := m.trades["LST_X"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, sessionID, trade.SessionID)
|
||||
}
|
||||
|
||||
func TestOnLogon_DoesNotAssignSessionToCompletedTrades(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
|
||||
exec := makeExecutionReport("LST_X", "LST_X_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, exec}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
require.NoError(t, m.loadTrades())
|
||||
|
||||
sessionID := quickfix.SessionID{
|
||||
BeginString: "FIXT.1.1",
|
||||
SenderCompID: "QFIXDPL",
|
||||
TargetCompID: "TRADEWEB",
|
||||
}
|
||||
m.onLogon(sessionID)
|
||||
|
||||
trade := m.trades["LST_X"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, quickfix.SessionID{}, trade.SessionID, "completed trades should not get session assigned")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 8 — GetTrades filtra por active, GetAllTrades devuelve todos
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestGetTrades_ReturnsOnlyActive(t *testing.T) {
|
||||
r1 := makeQuoteRequest("LST_ACTIVE", "LIST_1", "RFQ", nil)
|
||||
r2 := makeQuoteRequest("LST_DONE", "LIST_2", "RFQ", nil)
|
||||
exec := makeExecutionReport("LST_DONE", "LST_DONE_TRDSUMM")
|
||||
r3 := makeQuoteRequest("LST_REJ", "LIST_3", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_REJ", "2") // REJECTED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, exec, r3, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
require.NoError(t, m.loadTrades())
|
||||
|
||||
trades := m.GetTrades()
|
||||
assert.Len(t, trades, 1)
|
||||
assert.Equal(t, "LST_ACTIVE", trades[0].QuoteReqID)
|
||||
assert.Equal(t, domain.TradeStatusActive, trades[0].Status)
|
||||
}
|
||||
|
||||
func TestGetAllTrades_ReturnsAll(t *testing.T) {
|
||||
r1 := makeQuoteRequest("LST_ACTIVE", "LIST_1", "RFQ", nil)
|
||||
r2 := makeQuoteRequest("LST_DONE", "LIST_2", "RFQ", nil)
|
||||
exec := makeExecutionReport("LST_DONE", "LST_DONE_TRDSUMM")
|
||||
r3 := makeQuoteRequest("LST_REJ", "LIST_3", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_REJ", "2") // REJECTED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, exec, r3, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
require.NoError(t, m.loadTrades())
|
||||
|
||||
trades := m.GetAllTrades()
|
||||
assert.Len(t, trades, 3)
|
||||
|
||||
statusMap := map[string]domain.TradeStatus{}
|
||||
for _, tr := range trades {
|
||||
statusMap[tr.QuoteReqID] = tr.Status
|
||||
}
|
||||
|
||||
assert.Equal(t, domain.TradeStatusActive, statusMap["LST_ACTIVE"])
|
||||
assert.Equal(t, domain.TradeStatusCompleted, statusMap["LST_DONE"])
|
||||
assert.Equal(t, domain.TradeStatusRejected, statusMap["LST_REJ"])
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 9 — Error en store
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_StoreError_ReturnsError_MapEmpty(t *testing.T) {
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage(nil), errors.New("db connection failed"))
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
assert.Error(t, err)
|
||||
assert.NotNil(t, m.trades)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
@ -1,56 +0,0 @@
|
||||
package fix
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
|
||||
// MockPersistenceStore is a testify mock implementing domain.PersistenceStore.
|
||||
type MockPersistenceStore struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) SaveMessage(msg domain.TradeMessage) error {
|
||||
args := m.Called(msg)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) SaveLog(entry domain.LogEntry) error {
|
||||
args := m.Called(entry)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
args := m.Called()
|
||||
msgs, _ := args.Get(0).([]domain.TradeMessage)
|
||||
return msgs, args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
|
||||
args := m.Called(quoteReqID)
|
||||
logs, _ := args.Get(0).(domain.Logs)
|
||||
return logs, args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) UpdateLogTradeID(quoteReqID, tradeID string) error {
|
||||
args := m.Called(quoteReqID, tradeID)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) GetFullTradeLog(quoteReqID string) (domain.FullTradeLog, error) {
|
||||
args := m.Called(quoteReqID)
|
||||
log, _ := args.Get(0).(domain.FullTradeLog)
|
||||
return log, args.Error(1)
|
||||
}
|
||||
|
||||
// MockNotifier is a testify mock implementing domain.Notifier.
|
||||
type MockNotifier struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockNotifier) SendMsg(chat domain.MessageChannel, text string, status domain.MessageStatus, wg *sync.WaitGroup) {
|
||||
m.Called(chat, text, status, wg)
|
||||
}
|
||||
@ -185,9 +185,6 @@ func parseExecutionReport(msg executionreport.ExecutionReport) domain.FixMessage
|
||||
if v, e := msg.GetSettlDate(); e == nil {
|
||||
body["SettlDate"] = v
|
||||
}
|
||||
if v, e := msg.GetTradeID(); e == nil {
|
||||
body["TradeID"] = v
|
||||
}
|
||||
|
||||
return domain.FixMessageJSON{
|
||||
Direction: "IN",
|
||||
|
||||
104
src/client/fix/protocol.txt
Normal file
104
src/client/fix/protocol.txt
Normal file
@ -0,0 +1,104 @@
|
||||
Step 1
|
||||
Direction: ← QuoteRequest (R)
|
||||
Comentary: Tradeweb sends trade information to dealer.
|
||||
FIX Message: 8=FIXT.1.1 9=869 35=R 34=2 49=TRADEWEB 52=20160401-16:53:19.992 56=TW1_CORI_TEST_12345_DLRDPL 131=LST_20160401_TW1_CORI_NY302485.18_1 146=1 55=AMXLMM 2.375 09/08/16 48=02364WBC8 22=1 460=12 167=CORP 762=REGCORIPRC 541=20160908 225=20110908 470=MX 223=2.375 106=AMERICA MOVIL SAB DE CV 54=2 38=1000000 64=20160406 15=USD 6110=Comms 60=20160401-16:53:19 662=99.98046875 22570=0.76 663=1 699=912828UR9 761=1 423=6 44=-999999 5023=0.001000 66=NY302485.18 6847=1 75=20160401 464=Y 20086=1 20074=Y 20075=Y 20077=LatAm Comms 20078=pddealer 20079=60 20081=60 20090=60 20072=60 20098=60 5745=1 20073=RFQ 20076=Y 20156=N 20130=2000000000 20138=JPM,MER 20175=20120308 2115=0 22630=0 20265=LatAm 453=3 448=emack 447=C 452=3 802=3 523=Dev Test 803=2 523=NY 803=25 523=USA 803=4000 448=Tradeweb 447=C 452=1 802=1 523=Tradeweb0001 803=4002 448=DTCC 447=C 452=4 5114=2 5113=1 20169=A2 5113=0 20169=A- 10=121
|
||||
|
||||
Step 2
|
||||
Direction: QuoteStatusReport (AI) →
|
||||
Comentary: Dealer acknowledges trade.
|
||||
FIX Message: 8=FIXT.1.1 9=198 35=AI 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=2 52=20160401-16:53:19.988 131=LST_20160401_TW1_CORI_NY302485.18_1 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:53:19.988 297=0 10=106
|
||||
|
||||
Step 3
|
||||
Direction: Quote (S) →
|
||||
Comentary: Dealer sends quote.
|
||||
FIX Message: 8=FIXT.1.1 9=231 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=3 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 6153=emackdlr 44=.99 423=6 60=20160401-16:53:19.990 10=247
|
||||
|
||||
Step 4
|
||||
Direction: ← QuoteAck (CW)
|
||||
Comentary: Tradeweb acknowledges quote.
|
||||
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=3 49=TRADEWEB 52=20160401-16:53:25.102 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:25 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=154
|
||||
|
||||
Step 5
|
||||
Direction: Quote (S) →
|
||||
Comentary: Dealer sends quote.
|
||||
FIX Message: 8=FIXT.1.1 9=239 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=4 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 20087=5 6153=emackdlr 44=.97 423=6 60=20160401-16:53:19.990 10=114
|
||||
|
||||
Step 6
|
||||
Direction: ← QuoteAck (CW)
|
||||
Comentary: Tradeweb acknowledges quote.
|
||||
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=4 49=TRADEWEB 52=20160401-16:53:30.055 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:30 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=154
|
||||
|
||||
Step 7
|
||||
Direction: Quote (S) →
|
||||
Comentary: Dealer sends quote.
|
||||
FIX Message: 8=FIXT.1.1 9=239 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=6 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 20087=5 6153=emackdlr 44=.98 423=6 60=20160401-16:53:19.990 10=117
|
||||
|
||||
Step 8
|
||||
Direction: ← QuoteAck (CW)
|
||||
Commentary: Tradeweb acknowledges quote.
|
||||
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=5 49=TRADEWEB 52=20160401-16:53:35.071 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:35 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=163
|
||||
|
||||
Step 9
|
||||
Direction: ← ExecutionReport (8)
|
||||
Commentary: Tradeweb notifies dealer that the list trading (Due In time) has ended. ExecType=A
|
||||
FIX Message: 8=FIXT.1.1 9=289 35=8 34=7 49=TRADEWEB 52=20160401-16:54:19.463 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125419.463 37=LST_20160401_TW1_CORI_NY302485.18_1 39=D 55=[N/A] 60=20160401-16:54:19 75=20160401 150=I 151=0 20086=1 10=073
|
||||
|
||||
Step 10
|
||||
Direction: ExecutionAck (BN) →
|
||||
Commentary: Dealer acknowledges ExecutionReport message.
|
||||
FIX Message: 8=FIXT.1.1 9=255 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=8 52=20160401-16:54:19.448 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125419.448 55=[N/A] 60=20160401-16:54:19.448 10=119
|
||||
|
||||
Step 11
|
||||
Direction: ← QuoteResponse (AJ)
|
||||
Commentary: Customer lifts after good-for time expires. Tradeweb informs dealer customer lift.
|
||||
FIX Message: 8=FIXT.1.1 9=431 35=AJ 34=10 49=TRADEWEB 52=20160401-16:55:32.855 56=TW1_CORI_TEST_12345_DLRDPL 11=LST_20160401_TW1_CORI_NY302485.18_1 22=1 38=1000000 44=0.98 48=02364WBC8 54=2 55=[N/A] 60=20160401-16:55:32 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 423=6 662=99.98046875 693=LST_20160401_TW1_CORI_NY302485.18_1_TRDREQ 694=1 2115=1 20074=Y 20075=N 20076=N 20079=60 20082=60 20156=N 22570=0.760289080299 22630=0 10=183
|
||||
|
||||
Step 12
|
||||
Direction: QuoteStatusReport (AI) →
|
||||
Commentary: Dealer acknowledges customer’s trade acceptance.
|
||||
FIX Message: 8=FIXT.1.1 9=206 35=AI 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=12 52=20160401-16:55:32.849 693=LST_20160401_TW1_CORI_NY302485.18_1_TRDREQ 131=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:55:32.849 297=0 10=189
|
||||
|
||||
Step 13
|
||||
Direction: ← ExecutionReport (8)
|
||||
Commentary: Tradeweb notifies the dealer that list has ended
|
||||
FIX Message: incoming8=FIXT.1.1 9=290 35=8 34=11 49=TRADEWEB 52=20160401-16:55:32.855 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125532.855 37=LST_20160401_TW1_CORI_NY302485.18_1 39=A 55=[N/A] 60=20160401-16:55:32 75=20160401 150=A 151=0 20086=1 10=095
|
||||
|
||||
Step 14
|
||||
Direction: ExecutionAck (BN) →
|
||||
Commentary: Dealer acknowledges ExecutionReport message.
|
||||
FIX Message: 8=FIXT.1.1 9=256 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=13 52=20160401-16:55:32.852 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125532.852 55=[N/A] 60=20160401-16:55:32.852 10=155
|
||||
|
||||
Step 15
|
||||
Direction: ExecutionReport (8) →
|
||||
Commentary: Dealer accepts – re-quote is not allowed. ExecType=F, OrdStatus=Filled
|
||||
FIX Message: 8=FIXT.1.1 9=283 35=8 34=14 49=TW1_CORI_TEST_12345_DLRDPL 52=20160401-16:55:32.850 56=TRADEWEB 6=0 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1 37=LST_20160401_TW1_CORI_NY302485.18_1 39=2 54=2 55=[N/A] 150=F 151=0 6153=LST_20160401_TW1_CORI_NY302485.18_1 22631=POSTTRADE_STRING 22632=POSTTRADE_STRING 10=155
|
||||
|
||||
Step 16
|
||||
Direction: ← ExecutionAck (BN)
|
||||
Commentary: Tradeweb acknowledges ExecutionReport
|
||||
FIX Message: 8=FIXT.1.1 9=233 35=BN 34=12 49=TRADEWEB 52=20160401-16:55:37.917 56=TW1_CORI_TEST_12345_DLRDPL 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1 37=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:55:37 1036=1 10=051
|
||||
|
||||
Step 17
|
||||
Direction: ← ExecutionReport (8)
|
||||
Commentary: Tradeweb informs Dealer of outcome of a list trade item. OrdStatus=Filled, ExecType=Trade
|
||||
FIX Message: 8=FIXT.1.1 9=337 35=8 34=13 49=TRADEWEB 52=20160401-16:55:37.917 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDEND-125537.917 37=LST_20160401_TW1_CORI_NY302485.18_1 39=2 44=0.98 54=2 55=[N/A] 60=20160401-16:55:37 75=20160401 150=F 151=0 423=6 662=99.98046875 22570=0.760289080299 10=067
|
||||
|
||||
Step 18
|
||||
Direction: ExecutionAck (BN) →
|
||||
Comentary: Dealer acknowledges ExecutionReport message.
|
||||
FIX Message: 8=FIXT.1.1 9=255 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=15 52=20160401-16:55:37.910 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDEND-125537.917 55=[N/A] 60=20160401-16:55:37.910 10=078
|
||||
|
||||
Step 19
|
||||
Direction:
|
||||
Comentary: Autospot at Tradeweb Treasury composite.
|
||||
FIX Message:
|
||||
|
||||
Step 20
|
||||
Direction: ← ExecutionReport (8)
|
||||
Comentary: Tradeweb sends ExecutionReport message to confirm final outcome of list trade item including applicable cover quote, spot, settlement money information, etc. OrdStatus=Filled, ExecType=Order Status TradeSummary = Y
|
||||
FIX Message: 8=FIXT.1.1 9=733 35=8 34=14 49=TRADEWEB 52=20160401-16:55:40.292 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDSUMM-125540.277 22=1 37=LST_20160401_TW1_CORI_NY302485.18_1 38=1000000 39=2 44=0.98 48=02364WBC8 54=2 55=[N/A] 60=20160401-16:55:40 64=20160406 75=20160401 150=F 151=0 167=CORP 236=1.74 423=6 453=2 448=emack 447=C 452=3 802=3 523=Dev Test 803=2 523=NY 803=25 523=USA 803=4000 448=Tradeweb 447=C 452=1 802=2 523=Tradeweb0001 803=4002 523=YES 803=4003 526=TRD_20160401_TW1_CORI_23 662=99.98046875 1003=20160401.TW1.CORI.23 6153=emackdlr 6731=20160401.TW1.CORI.23 20115=100.265 20250=225000 22570=0.76 22630=0 22631=POSTTRADE_STRING 22634=160401.DLRX.TRSY.120 22636=Y 10=239
|
||||
|
||||
Step 21
|
||||
Direction: ExecutionAck (BN) →
|
||||
Comentary: Dealer acknowledges ExecutionReport message.
|
||||
FIX Message: 8=FIXT.1.1 9=256 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=16 52=20160401-16:55:40.287 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDSUMM-125540.277 55=[N/A] 60=20160401-16:55:40.287 10=182
|
||||
@ -1,20 +1,16 @@
|
||||
CREATE TABLE IF NOT EXISTS qfixdpl_messages (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
quote_req_id TEXT NOT NULL,
|
||||
trade_id TEXT,
|
||||
j_message JSONB NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_trade_id ON qfixdpl_messages(trade_id);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS qfixdpl_logs (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
quote_req_id TEXT NOT NULL UNIQUE,
|
||||
trade_id TEXT,
|
||||
raw_msg TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_dpl_logs_trade_id ON qfixdpl_logs(trade_id);
|
||||
|
||||
@ -16,8 +16,8 @@ func (p *Store) SaveMessage(msg domain.TradeMessage) error {
|
||||
}
|
||||
|
||||
_, err = p.db.Exec(
|
||||
"INSERT INTO qfixdpl_messages (quote_req_id, trade_id, j_message) VALUES ($1, NULLIF($2, ''), $3)",
|
||||
msg.QuoteReqID, msg.TradeID, string(jsonBytes),
|
||||
"INSERT INTO qfixdpl_messages (quote_req_id, j_message) VALUES ($1, $2)",
|
||||
msg.QuoteReqID, string(jsonBytes),
|
||||
)
|
||||
if err != nil {
|
||||
return tracerr.Errorf("error inserting message: %w", err)
|
||||
@ -43,7 +43,7 @@ func (p *Store) SaveLog(entry domain.LogEntry) error {
|
||||
|
||||
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
rows, err := p.db.Query(
|
||||
"SELECT id, quote_req_id, trade_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC",
|
||||
"SELECT id, quote_req_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC",
|
||||
)
|
||||
if err != nil {
|
||||
return nil, tracerr.Errorf("error querying today messages: %w", err)
|
||||
@ -55,12 +55,11 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
for rows.Next() {
|
||||
var (
|
||||
id, quoteReqID string
|
||||
tradeID *string
|
||||
jMessageRaw []byte
|
||||
createdAt time.Time
|
||||
)
|
||||
|
||||
if err := rows.Scan(&id, "eReqID, &tradeID, &jMessageRaw, &createdAt); err != nil {
|
||||
if err := rows.Scan(&id, "eReqID, &jMessageRaw, &createdAt); err != nil {
|
||||
return nil, tracerr.Errorf("error scanning message row: %w", err)
|
||||
}
|
||||
|
||||
@ -69,18 +68,12 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
|
||||
}
|
||||
|
||||
msg := domain.TradeMessage{
|
||||
messages = append(messages, domain.TradeMessage{
|
||||
ID: id,
|
||||
QuoteReqID: quoteReqID,
|
||||
JMessage: jMessage,
|
||||
CreatedAt: createdAt,
|
||||
}
|
||||
|
||||
if tradeID != nil {
|
||||
msg.TradeID = *tradeID
|
||||
}
|
||||
|
||||
messages = append(messages, msg)
|
||||
})
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
@ -90,69 +83,6 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
func (p *Store) UpdateLogTradeID(quoteReqID, tradeID string) error {
|
||||
_, err := p.db.Exec(
|
||||
"UPDATE qfixdpl_logs SET trade_id = $1, updated_at = NOW() WHERE quote_req_id = $2",
|
||||
tradeID, quoteReqID,
|
||||
)
|
||||
if err != nil {
|
||||
return tracerr.Errorf("error updating log trade_id: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Store) GetFullTradeLog(quoteReqID string) (domain.FullTradeLog, error) {
|
||||
rows, err := p.db.Query(
|
||||
"SELECT trade_id, raw_msg FROM qfixdpl_logs WHERE quote_req_id = $1", quoteReqID,
|
||||
)
|
||||
if err != nil {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("error querying dpl log: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
if !rows.Next() {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("no log found for quoteReqID: %s", quoteReqID)
|
||||
}
|
||||
|
||||
var (
|
||||
tradeID *string
|
||||
rawMsg string
|
||||
)
|
||||
|
||||
if err := rows.Scan(&tradeID, &rawMsg); err != nil {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("error scanning dpl log row: %w", err)
|
||||
}
|
||||
|
||||
result := domain.FullTradeLog{
|
||||
QuoteReqID: quoteReqID,
|
||||
DPLEntries: strings.Split(rawMsg, "\n"),
|
||||
}
|
||||
|
||||
if tradeID != nil && *tradeID != "" {
|
||||
result.TradeID = *tradeID
|
||||
|
||||
ptRows, err := p.db.Query(
|
||||
"SELECT raw_msg FROM qfixpt_logs WHERE trade_id = $1", *tradeID,
|
||||
)
|
||||
if err != nil {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("error querying pt logs: %w", err)
|
||||
}
|
||||
defer ptRows.Close()
|
||||
|
||||
if ptRows.Next() {
|
||||
var ptRawMsg string
|
||||
if err := ptRows.Scan(&ptRawMsg); err != nil {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("error scanning pt log row: %w", err)
|
||||
}
|
||||
|
||||
result.PTEntries = strings.Split(ptRawMsg, "\n")
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (p *Store) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
|
||||
selectStmt := "SELECT raw_msg FROM qfixdpl_logs WHERE quote_req_id = '" + quoteReqID + "';"
|
||||
|
||||
|
||||
@ -3,29 +3,18 @@ package domain
|
||||
|
||||
import "time"
|
||||
|
||||
// TradeStatus represents the lifecycle state of a List Trading trade.
|
||||
type TradeStatus string
|
||||
|
||||
const (
|
||||
TradeStatusActive TradeStatus = "active"
|
||||
TradeStatusRejected TradeStatus = "rejected"
|
||||
TradeStatusCompleted TradeStatus = "completed"
|
||||
)
|
||||
|
||||
// ListTrade es la representacion exportada de un trade de List Trading.
|
||||
type ListTrade struct {
|
||||
QuoteReqID string `json:"quote_req_id"`
|
||||
TradeID string `json:"trade_id,omitempty"`
|
||||
ListID string `json:"list_id"`
|
||||
Symbol string `json:"symbol"`
|
||||
SecurityIDSrc string `json:"security_id_src"`
|
||||
Currency string `json:"currency"`
|
||||
Side string `json:"side"`
|
||||
OrderQty string `json:"order_qty"`
|
||||
SettlDate string `json:"settl_date"`
|
||||
Price string `json:"price"`
|
||||
OwnerTraderID string `json:"owner_trader_id"`
|
||||
Status TradeStatus `json:"status"`
|
||||
QuoteReqID string
|
||||
ListID string
|
||||
Symbol string
|
||||
SecurityIDSrc string
|
||||
Currency string
|
||||
Side string
|
||||
OrderQty string
|
||||
SettlDate string
|
||||
Price string
|
||||
OwnerTraderID string
|
||||
}
|
||||
|
||||
// FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento.
|
||||
@ -42,7 +31,6 @@ type FixMessageJSON struct {
|
||||
type TradeMessage struct {
|
||||
ID string `json:"id"`
|
||||
QuoteReqID string `json:"quote_req_id"`
|
||||
TradeID string `json:"trade_id,omitempty"`
|
||||
JMessage FixMessageJSON `json:"j_message"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
@ -58,20 +46,10 @@ type Logs struct {
|
||||
Entries []string `json:"entries"`
|
||||
}
|
||||
|
||||
// FullTradeLog es la respuesta del endpoint GET /trades/:id/full-log.
|
||||
type FullTradeLog struct {
|
||||
QuoteReqID string `json:"quote_req_id"`
|
||||
TradeID string `json:"trade_id,omitempty"`
|
||||
DPLEntries []string `json:"dpl_entries"`
|
||||
PTEntries []string `json:"pt_entries,omitempty"`
|
||||
}
|
||||
|
||||
// PersistenceStore define la interfaz de persistencia.
|
||||
type PersistenceStore interface {
|
||||
SaveMessage(msg TradeMessage) error
|
||||
SaveLog(entry LogEntry) error
|
||||
GetTodayMessages() ([]TradeMessage, error)
|
||||
GetLogsByQuoteReqID(quoteReqID string) (Logs, error)
|
||||
UpdateLogTradeID(quoteReqID, tradeID string) error
|
||||
GetFullTradeLog(id string) (FullTradeLog, error)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user