Merge pull request 'Structure, persistance, recovery' (#2) from FIXDPL-2/New_structure_and_persistance into develop
Reviewed-on: #2
This commit is contained in:
@ -293,7 +293,7 @@ func allowed(origin string, config Config) bool {
|
||||
|
||||
// GetTrades godoc
|
||||
// @Summary List active trades
|
||||
// @Description Returns all active List Trading trades
|
||||
// @Description Returns only active List Trading trades
|
||||
// @Tags fix
|
||||
// @Produce json
|
||||
// @Success 200 {array} domain.ListTrade
|
||||
@ -303,6 +303,18 @@ func (cont *Controller) GetTrades(ctx *gin.Context) {
|
||||
ctx.JSON(http.StatusOK, trades)
|
||||
}
|
||||
|
||||
// GetAllTrades godoc
|
||||
// @Summary List all trades
|
||||
// @Description Returns all List Trading trades (active, rejected, completed)
|
||||
// @Tags fix
|
||||
// @Produce json
|
||||
// @Success 200 {array} domain.ListTrade
|
||||
// @Router /qfixdpl/v1/trades/all [get]
|
||||
func (cont *Controller) GetAllTrades(ctx *gin.Context) {
|
||||
trades := cont.tradeProvider.GetAllTrades()
|
||||
ctx.JSON(http.StatusOK, trades)
|
||||
}
|
||||
|
||||
// GetLogs godoc
|
||||
// @Summary Get raw FIX logs for a trade
|
||||
// @Description Returns raw FIX message logs for a given QuoteReqID
|
||||
@ -325,3 +337,25 @@ func (cont *Controller) GetLogs(ctx *gin.Context) {
|
||||
ctx.JSON(http.StatusOK, logs)
|
||||
}
|
||||
|
||||
// GetFullTradeLog godoc
|
||||
// @Summary Get full trade lifecycle log
|
||||
// @Description Returns DPL and Post-Trade raw FIX logs for a given trade (by QuoteReqID)
|
||||
// @Tags fix
|
||||
// @Produce json
|
||||
// @Param quoteReqID path string true "QuoteReqID"
|
||||
// @Success 200 {object} domain.FullTradeLog
|
||||
// @Router /qfixdpl/v1/trades/{quoteReqID}/full-log [get]
|
||||
func (cont *Controller) GetFullTradeLog(ctx *gin.Context) {
|
||||
quoteReqID := ctx.Param("quoteReqID")
|
||||
|
||||
fullLog, err := cont.store.GetFullTradeLog(quoteReqID)
|
||||
if err != nil {
|
||||
slog.Error("GetFullTradeLog: error fetching full trade log", "quoteReqID", quoteReqID, "error", err)
|
||||
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching full trade log"})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, fullLog)
|
||||
}
|
||||
|
||||
|
||||
@ -22,7 +22,8 @@ func SetRoutes(api *API) {
|
||||
qfixdpl.Use(cont.AuthRequired)
|
||||
qfixdpl.GET("/health", cont.HealthCheck)
|
||||
qfixdpl.GET("/trades", cont.GetTrades)
|
||||
qfixdpl.GET("/trades/:quoteReqID/logs", cont.GetLogs)
|
||||
qfixdpl.GET("/trades/all", cont.GetAllTrades)
|
||||
qfixdpl.GET("/trades/:quoteReqID/full-log", cont.GetFullTradeLog)
|
||||
|
||||
backoffice := qfixdpl.Group("/backoffice")
|
||||
backoffice.Use(cont.BackOfficeUser)
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
// TradeProvider exposes trade data from the FIX manager.
|
||||
type TradeProvider interface {
|
||||
GetTrades() []domain.ListTrade
|
||||
GetAllTrades() []domain.ListTrade
|
||||
}
|
||||
|
||||
const RedisMaxIdle = 3000 // In ms
|
||||
|
||||
@ -29,6 +29,7 @@ import (
|
||||
|
||||
type listTrade struct {
|
||||
QuoteReqID string
|
||||
TradeID string
|
||||
ListID string
|
||||
Symbol string
|
||||
SecurityIDSrc enum.SecurityIDSource
|
||||
@ -39,6 +40,7 @@ type listTrade struct {
|
||||
Price decimal.Decimal
|
||||
OwnerTraderID string
|
||||
SessionID quickfix.SessionID
|
||||
Status domain.TradeStatus
|
||||
}
|
||||
|
||||
// Manager wraps the QuickFIX initiator and implements domain.FIXSender.
|
||||
@ -76,7 +78,7 @@ func (m *Manager) Start() error {
|
||||
fixApp.onRawMessage = m.handleRawMessage
|
||||
m.app = fixApp
|
||||
|
||||
if err := m.loadActiveTrades(); err != nil {
|
||||
if err := m.loadTrades(); err != nil {
|
||||
slog.Error("failed to load active trades from DB, starting with empty state", "error", err)
|
||||
}
|
||||
|
||||
@ -139,6 +141,18 @@ func (m *Manager) onLogon(sessionID quickfix.SessionID) {
|
||||
m.sessionsMu.Lock()
|
||||
m.sessions[sessionID.String()] = sessionID
|
||||
m.sessionsMu.Unlock()
|
||||
|
||||
// Assign the new session to all recovered trades that have no session yet.
|
||||
// This covers the case where the service was restarted mid-trade: loadTrades()
|
||||
// reconstructs the trade data but cannot recover the SessionID from the DB.
|
||||
// Since this is a single-session initiator, all active trades belong to this session.
|
||||
m.tradesMu.Lock()
|
||||
for _, trade := range m.trades {
|
||||
if trade.Status == domain.TradeStatusActive && trade.SessionID == (quickfix.SessionID{}) {
|
||||
trade.SessionID = sessionID
|
||||
}
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
}
|
||||
|
||||
func (m *Manager) onLogout(sessionID quickfix.SessionID) {
|
||||
@ -311,6 +325,7 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
|
||||
Price: price,
|
||||
OwnerTraderID: ownerTraderID,
|
||||
SessionID: sessionID,
|
||||
Status: domain.TradeStatusActive,
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
@ -345,11 +360,16 @@ func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.Sessi
|
||||
|
||||
m.persistMessage(quoteReqID, parseQuoteAck(msg))
|
||||
|
||||
if status != enum.QuoteAckStatus_ACCEPTED {
|
||||
// QuoteAckStatus only has two defined values in TW DPL:
|
||||
// 1 = Accepted — quote delivered to client.
|
||||
// 2 = Rejected — format error, late quote, viewer busy, or race condition.
|
||||
if status == enum.QuoteAckStatus_REJECTED {
|
||||
slog.Error("handleQuoteAck: quote rejected by TW", "quoteReqID", quoteReqID, "quoteAckStatus", string(status), "text", text)
|
||||
|
||||
m.tradesMu.Lock()
|
||||
delete(m.trades, quoteReqID)
|
||||
if t, ok := m.trades[quoteReqID]; ok {
|
||||
t.Status = domain.TradeStatusRejected
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
return
|
||||
@ -396,17 +416,21 @@ func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID
|
||||
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
|
||||
}))
|
||||
|
||||
// _TRDSUMM is the final message — clean up the trade.
|
||||
// _TRDSUMM is the final message — mark trade as completed.
|
||||
if isTrdSumm {
|
||||
slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID)
|
||||
slog.Info("Trade summary received, marking completed", "quoteReqID", quoteReqID)
|
||||
m.tradesMu.Lock()
|
||||
delete(m.trades, quoteReqID)
|
||||
if t, ok := m.trades[quoteReqID]; ok {
|
||||
t.Status = domain.TradeStatusCompleted
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// handleExecutionReport handles an incoming ExecutionReport (35=8).
|
||||
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does.
|
||||
// In TW DPL, ClOrdID (Tag 11) always equals the original QuoteReqID (Tag 131),
|
||||
// so we use clOrdID directly as the map lookup key.
|
||||
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) {
|
||||
execID, _ := msg.GetExecID()
|
||||
orderID, _ := msg.GetOrderID()
|
||||
@ -436,11 +460,13 @@ func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, ses
|
||||
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
|
||||
|
||||
case strings.Contains(execID, "_TRDSUMM"):
|
||||
slog.Info("Trade summary received from TW, cleaning up",
|
||||
slog.Info("Trade summary received from TW, marking completed",
|
||||
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
|
||||
|
||||
m.tradesMu.Lock()
|
||||
delete(m.trades, clOrdID)
|
||||
if t, ok := m.trades[clOrdID]; ok {
|
||||
t.Status = domain.TradeStatusCompleted
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
case execType == enum.ExecType_TRADE:
|
||||
@ -458,6 +484,20 @@ func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, ses
|
||||
"ClOrdID": clOrdID,
|
||||
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
|
||||
}))
|
||||
|
||||
// If the ExecutionReport carries a TradeID, persist it in qfixdpl_logs for cross-service correlation.
|
||||
tradeID, _ := msg.GetTradeID()
|
||||
if tradeID != "" {
|
||||
m.tradesMu.Lock()
|
||||
if t, ok := m.trades[clOrdID]; ok {
|
||||
t.TradeID = tradeID
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
if err := m.store.UpdateLogTradeID(clOrdID, tradeID); err != nil {
|
||||
slog.Error("handleExecutionReport: failed to update log trade_id", "clOrdID", clOrdID, "tradeID", tradeID, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
|
||||
@ -465,30 +505,53 @@ func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID qu
|
||||
// Logged in application.go, no further action needed.
|
||||
}
|
||||
|
||||
// GetTrades returns a snapshot of all active trades.
|
||||
// GetTrades returns a snapshot of only active trades.
|
||||
func (m *Manager) GetTrades() []domain.ListTrade {
|
||||
m.tradesMu.RLock()
|
||||
defer m.tradesMu.RUnlock()
|
||||
|
||||
trades := make([]domain.ListTrade, 0, len(m.trades))
|
||||
for _, t := range m.trades {
|
||||
trades = append(trades, domain.ListTrade{
|
||||
QuoteReqID: t.QuoteReqID,
|
||||
ListID: t.ListID,
|
||||
Symbol: t.Symbol,
|
||||
SecurityIDSrc: string(t.SecurityIDSrc),
|
||||
Currency: t.Currency,
|
||||
Side: string(t.Side),
|
||||
OrderQty: t.OrderQty.String(),
|
||||
SettlDate: t.SettlDate,
|
||||
Price: t.Price.String(),
|
||||
OwnerTraderID: t.OwnerTraderID,
|
||||
})
|
||||
if t.Status != domain.TradeStatusActive {
|
||||
continue
|
||||
}
|
||||
|
||||
trades = append(trades, toListTrade(t))
|
||||
}
|
||||
|
||||
return trades
|
||||
}
|
||||
|
||||
// GetAllTrades returns a snapshot of all trades (active, rejected, completed).
|
||||
func (m *Manager) GetAllTrades() []domain.ListTrade {
|
||||
m.tradesMu.RLock()
|
||||
defer m.tradesMu.RUnlock()
|
||||
|
||||
trades := make([]domain.ListTrade, 0, len(m.trades))
|
||||
for _, t := range m.trades {
|
||||
trades = append(trades, toListTrade(t))
|
||||
}
|
||||
|
||||
return trades
|
||||
}
|
||||
|
||||
func toListTrade(t *listTrade) domain.ListTrade {
|
||||
return domain.ListTrade{
|
||||
QuoteReqID: t.QuoteReqID,
|
||||
TradeID: t.TradeID,
|
||||
ListID: t.ListID,
|
||||
Symbol: t.Symbol,
|
||||
SecurityIDSrc: string(t.SecurityIDSrc),
|
||||
Currency: t.Currency,
|
||||
Side: string(t.Side),
|
||||
OrderQty: t.OrderQty.String(),
|
||||
SettlDate: t.SettlDate,
|
||||
Price: t.Price.String(),
|
||||
OwnerTraderID: t.OwnerTraderID,
|
||||
Status: t.Status,
|
||||
}
|
||||
}
|
||||
|
||||
// handleRawMessage persists raw FIX message strings to the logs table.
|
||||
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
|
||||
quoteReqID := extractIdentifier(msg)
|
||||
@ -511,14 +574,14 @@ func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSO
|
||||
}
|
||||
}
|
||||
|
||||
// loadActiveTrades reconstructs active trades from today's messages in the database.
|
||||
func (m *Manager) loadActiveTrades() error {
|
||||
// loadTrades reconstructs all trades and their states from today's messages in the database.
|
||||
func (m *Manager) loadTrades() error {
|
||||
messages, err := m.store.GetTodayMessages()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
activeTrades := make(map[string]*listTrade)
|
||||
trades := make(map[string]*listTrade)
|
||||
|
||||
for _, msg := range messages {
|
||||
switch msg.JMessage.MsgType {
|
||||
@ -542,6 +605,7 @@ func (m *Manager) loadActiveTrades() error {
|
||||
trade := &listTrade{
|
||||
QuoteReqID: msg.QuoteReqID,
|
||||
ListID: listID,
|
||||
Status: domain.TradeStatusActive,
|
||||
}
|
||||
|
||||
if v, ok := body["SecurityID"].(string); ok {
|
||||
@ -568,14 +632,16 @@ func (m *Manager) loadActiveTrades() error {
|
||||
trade.OwnerTraderID = v
|
||||
}
|
||||
|
||||
activeTrades[msg.QuoteReqID] = trade
|
||||
trades[msg.QuoteReqID] = trade
|
||||
|
||||
case "CW": // QuoteAck — if rejected, trade is dead
|
||||
case "CW": // QuoteAck — only status "2" (Rejected) marks the trade as rejected
|
||||
body := msg.JMessage.Body
|
||||
quoteAckStatus, _ := body["QuoteAckStatus"].(string)
|
||||
|
||||
if quoteAckStatus != string(enum.QuoteAckStatus_ACCEPTED) {
|
||||
delete(activeTrades, msg.QuoteReqID)
|
||||
if quoteAckStatus == string(enum.QuoteAckStatus_REJECTED) {
|
||||
if t, ok := trades[msg.QuoteReqID]; ok {
|
||||
t.Status = domain.TradeStatusRejected
|
||||
}
|
||||
}
|
||||
|
||||
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
|
||||
@ -583,7 +649,9 @@ func (m *Manager) loadActiveTrades() error {
|
||||
quoteRespID, _ := body["QuoteRespID"].(string)
|
||||
|
||||
if strings.HasSuffix(quoteRespID, "_TRDSUMM") {
|
||||
delete(activeTrades, msg.QuoteReqID)
|
||||
if t, ok := trades[msg.QuoteReqID]; ok {
|
||||
t.Status = domain.TradeStatusCompleted
|
||||
}
|
||||
}
|
||||
|
||||
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)
|
||||
@ -591,14 +659,29 @@ func (m *Manager) loadActiveTrades() error {
|
||||
execID, _ := body["ExecID"].(string)
|
||||
clOrdID, _ := body["ClOrdID"].(string)
|
||||
|
||||
if tid, ok := body["TradeID"].(string); ok && tid != "" {
|
||||
if t, ok := trades[clOrdID]; ok {
|
||||
t.TradeID = tid
|
||||
}
|
||||
}
|
||||
|
||||
if strings.Contains(execID, "_TRDSUMM") {
|
||||
delete(activeTrades, clOrdID)
|
||||
if t, ok := trades[clOrdID]; ok {
|
||||
t.Status = domain.TradeStatusCompleted
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
m.trades = activeTrades
|
||||
slog.Info("recovery completed", "activeTrades", len(activeTrades))
|
||||
active := 0
|
||||
for _, t := range trades {
|
||||
if t.Status == domain.TradeStatusActive {
|
||||
active++
|
||||
}
|
||||
}
|
||||
|
||||
m.trades = trades
|
||||
slog.Info("recovery completed", "totalTrades", len(trades), "activeTrades", active)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
647
src/client/fix/manager_test.go
Normal file
647
src/client/fix/manager_test.go
Normal file
@ -0,0 +1,647 @@
|
||||
package fix
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"quantex.com/qfixdpl/quickfix"
|
||||
"quantex.com/qfixdpl/src/app"
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// newTestManager builds a Manager with the given store without calling Start().
|
||||
func newTestManager(store domain.PersistenceStore) *Manager {
|
||||
notify := &MockNotifier{}
|
||||
return NewManager(app.FIXConfig{}, store, notify)
|
||||
}
|
||||
|
||||
// makeMsg builds a TradeMessage with the given quoteReqID, msgType, and body.
|
||||
func makeMsg(quoteReqID, msgType string, body map[string]interface{}) domain.TradeMessage {
|
||||
return domain.TradeMessage{
|
||||
ID: "test-id-" + quoteReqID + "-" + msgType,
|
||||
QuoteReqID: quoteReqID,
|
||||
JMessage: domain.FixMessageJSON{
|
||||
MsgType: msgType,
|
||||
QuoteReqID: quoteReqID,
|
||||
Body: body,
|
||||
ReceiveTime: time.Now(),
|
||||
},
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// makeQuoteRequest builds a "R" (QuoteRequest) TradeMessage.
|
||||
func makeQuoteRequest(quoteReqID, listID, nt string, extras map[string]interface{}) domain.TradeMessage {
|
||||
body := map[string]interface{}{
|
||||
"NegotiationType": nt,
|
||||
"ListID": listID,
|
||||
}
|
||||
for k, v := range extras {
|
||||
body[k] = v
|
||||
}
|
||||
return makeMsg(quoteReqID, "R", body)
|
||||
}
|
||||
|
||||
// makeQuoteAck builds a "CW" (QuoteAck) TradeMessage.
|
||||
func makeQuoteAck(quoteReqID, status string) domain.TradeMessage {
|
||||
return makeMsg(quoteReqID, "CW", map[string]interface{}{
|
||||
"QuoteAckStatus": status,
|
||||
})
|
||||
}
|
||||
|
||||
// makeQuoteResponse builds an "AJ" (QuoteResponse) TradeMessage.
|
||||
func makeQuoteResponse(quoteReqID, quoteRespID string) domain.TradeMessage {
|
||||
return makeMsg(quoteReqID, "AJ", map[string]interface{}{
|
||||
"QuoteRespID": quoteRespID,
|
||||
})
|
||||
}
|
||||
|
||||
// makeExecutionReport builds an "8" (ExecutionReport) TradeMessage.
|
||||
// clOrdID maps to body["ClOrdID"]; it is also set as TradeMessage.QuoteReqID
|
||||
// to mirror how persistMessage works in handleExecutionReport.
|
||||
func makeExecutionReport(clOrdID, execID string) domain.TradeMessage {
|
||||
return makeMsg(clOrdID, "8", map[string]interface{}{
|
||||
"ClOrdID": clOrdID,
|
||||
"ExecID": execID,
|
||||
})
|
||||
}
|
||||
|
||||
// makeOutgoing builds an outgoing message (AI, S, BN) for a given quoteReqID.
|
||||
func makeOutgoing(quoteReqID, msgType string) domain.TradeMessage {
|
||||
return makeMsg(quoteReqID, msgType, map[string]interface{}{})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 1 — DB vacia
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_EmptyDB(t *testing.T) {
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, m.trades)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 2 — Interrupcion despues de QuoteRequest
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_SingleR_CreatesOneTrade(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", map[string]interface{}{
|
||||
"SecurityID": "US1234567890",
|
||||
"Currency": "USD",
|
||||
"Side": "1",
|
||||
"OrderQty": "1000000",
|
||||
"SettlDate": "20260320",
|
||||
"OwnerTraderID": "trader1",
|
||||
})
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, m.trades, 1)
|
||||
|
||||
trade := m.trades["LST_ABC123"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, "LST_ABC123", trade.QuoteReqID)
|
||||
assert.Equal(t, "LIST_1", trade.ListID)
|
||||
assert.Equal(t, "US1234567890", trade.Symbol)
|
||||
assert.Equal(t, "USD", trade.Currency)
|
||||
assert.Equal(t, "1", string(trade.Side))
|
||||
assert.Equal(t, "20260320", trade.SettlDate)
|
||||
assert.Equal(t, "trader1", trade.OwnerTraderID)
|
||||
assert.Equal(t, domain.TradeStatusActive, trade.Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_WithOutgoingMsgs_IgnoresAI_S(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
|
||||
ai := makeOutgoing("LST_ABC123", "AI")
|
||||
s := makeOutgoing("LST_ABC123", "S")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, ai, s}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_NonLSTPrefix_Ignored(t *testing.T) {
|
||||
r := makeQuoteRequest("TRD_ABC123", "LIST_1", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_NonRFQ_NegotiationType_Ignored(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "LIST_1", "DEALER", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_EmptyListID_Ignored(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_MissingOptionalFields_TradeCreated(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_MIN", "LIST_MIN", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, m.trades, 1)
|
||||
|
||||
trade := m.trades["LST_MIN"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, "LST_MIN", trade.QuoteReqID)
|
||||
assert.Equal(t, "LIST_MIN", trade.ListID)
|
||||
assert.Equal(t, "", trade.Symbol)
|
||||
assert.Equal(t, "", trade.Currency)
|
||||
assert.Equal(t, "", trade.SettlDate)
|
||||
assert.Equal(t, "", trade.OwnerTraderID)
|
||||
assert.Equal(t, domain.TradeStatusActive, trade.Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 3 — Interrupcion despues de QuoteAck
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_CW_Accepted_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1") // ACCEPTED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_CW_Rejected_TradeMarkedRejected(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "2") // REJECTED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusRejected, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// QuoteAckStatus "0" is RECEIVED_NOT_YET_PROCESSED — not a rejection.
|
||||
// Only status "2" (Rejected) should mark the trade as rejected.
|
||||
func TestLoadTrades_CW_ReceivedNotYetProcessed_TradeStaysActive(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "0") // RECEIVED_NOT_YET_PROCESSED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_CW_WithoutPriorR_NoCrash(t *testing.T) {
|
||||
cw := makeQuoteAck("LST_ORPHAN", "2")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 4 — Interrupcion despues de QuoteResponse
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_AJ_TRDREQ_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1")
|
||||
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDREQ")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_AJ_TRDEND_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1")
|
||||
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDEND")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_AJ_LISTEND_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1")
|
||||
aj := makeQuoteResponse("LST_Q1", "LST_Q1_LISTEND")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_AJ_TRDSUMM_TradeMarkedCompleted(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1")
|
||||
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_AJ_TRDSUMM_WithoutPriorR_NoCrash(t *testing.T) {
|
||||
aj := makeQuoteResponse("LST_ORPHAN", "LST_ORPHAN_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 5 — Interrupcion despues de ExecutionReport
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_ExecReport_LISTEND_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_ABC123", "1")
|
||||
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
|
||||
exec := makeExecutionReport("LST_ABC123", "LST_ABC123_LISTEND")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, exec}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_ExecReport_TRDEND_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_ABC123", "1")
|
||||
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
|
||||
execListEnd := makeExecutionReport("LST_ABC123", "LST_ABC123_LISTEND")
|
||||
execTrdEnd := makeExecutionReport("LST_ABC123", "LST_ABC123_TRDEND")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, execListEnd, execTrdEnd}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_ExecReport_TRDSUMM_TradeMarkedCompleted(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_ABC123", "1")
|
||||
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
|
||||
exec := makeExecutionReport("LST_ABC123", "LST_ABC123_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, exec}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_ABC123"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_ExecReport_TRDSUMM_WithoutPriorR_NoCrash(t *testing.T) {
|
||||
exec := makeExecutionReport("LST_ORPHAN", "LST_ORPHAN_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{exec}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 6 — Multiples trades en paralelo
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_TwoTrades_BothActive(t *testing.T) {
|
||||
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
|
||||
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
|
||||
cw1 := makeQuoteAck("LST_TRADE1", "1")
|
||||
cw2 := makeQuoteAck("LST_TRADE2", "1")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, cw1, cw2}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 2)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE1"].Status)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE2"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_TwoTrades_OneCompleted_OneActive(t *testing.T) {
|
||||
// TRADE1: fully completed via flow 8.4 (_TRDSUMM execution report)
|
||||
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
|
||||
cw1 := makeQuoteAck("LST_TRADE1", "1")
|
||||
aj1 := makeQuoteResponse("LST_TRADE1", "LST_TRADE1_TRDREQ")
|
||||
exec1 := makeExecutionReport("LST_TRADE1", "LST_TRADE1_TRDSUMM")
|
||||
|
||||
// TRADE2: still active
|
||||
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
|
||||
cw2 := makeQuoteAck("LST_TRADE2", "1")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return(
|
||||
[]domain.TradeMessage{r1, cw1, aj1, exec1, r2, cw2}, nil,
|
||||
)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 2)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE1"].Status)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE2"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_TwoTrades_BothCompleted(t *testing.T) {
|
||||
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
|
||||
cw1 := makeQuoteAck("LST_TRADE1", "1")
|
||||
exec1 := makeExecutionReport("LST_TRADE1", "LST_TRADE1_TRDSUMM")
|
||||
|
||||
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
|
||||
cw2 := makeQuoteAck("LST_TRADE2", "1")
|
||||
exec2 := makeExecutionReport("LST_TRADE2", "LST_TRADE2_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return(
|
||||
[]domain.TradeMessage{r1, cw1, exec1, r2, cw2, exec2}, nil,
|
||||
)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 2)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE1"].Status)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE2"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 7 — SessionID se asigna en onLogon (solo a trades activos)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_SessionID_IsZeroAfterRecovery(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, m.trades, 1)
|
||||
|
||||
trade := m.trades["LST_X"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, quickfix.SessionID{}, trade.SessionID)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestOnLogon_AssignsSessionToRecoveredActiveTrades(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
require.NoError(t, m.loadTrades())
|
||||
|
||||
sessionID := quickfix.SessionID{
|
||||
BeginString: "FIXT.1.1",
|
||||
SenderCompID: "QFIXDPL",
|
||||
TargetCompID: "TRADEWEB",
|
||||
}
|
||||
m.onLogon(sessionID)
|
||||
|
||||
trade := m.trades["LST_X"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, sessionID, trade.SessionID)
|
||||
}
|
||||
|
||||
func TestOnLogon_DoesNotAssignSessionToCompletedTrades(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
|
||||
exec := makeExecutionReport("LST_X", "LST_X_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, exec}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
require.NoError(t, m.loadTrades())
|
||||
|
||||
sessionID := quickfix.SessionID{
|
||||
BeginString: "FIXT.1.1",
|
||||
SenderCompID: "QFIXDPL",
|
||||
TargetCompID: "TRADEWEB",
|
||||
}
|
||||
m.onLogon(sessionID)
|
||||
|
||||
trade := m.trades["LST_X"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, quickfix.SessionID{}, trade.SessionID, "completed trades should not get session assigned")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 8 — GetTrades filtra por active, GetAllTrades devuelve todos
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestGetTrades_ReturnsOnlyActive(t *testing.T) {
|
||||
r1 := makeQuoteRequest("LST_ACTIVE", "LIST_1", "RFQ", nil)
|
||||
r2 := makeQuoteRequest("LST_DONE", "LIST_2", "RFQ", nil)
|
||||
exec := makeExecutionReport("LST_DONE", "LST_DONE_TRDSUMM")
|
||||
r3 := makeQuoteRequest("LST_REJ", "LIST_3", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_REJ", "2") // REJECTED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, exec, r3, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
require.NoError(t, m.loadTrades())
|
||||
|
||||
trades := m.GetTrades()
|
||||
assert.Len(t, trades, 1)
|
||||
assert.Equal(t, "LST_ACTIVE", trades[0].QuoteReqID)
|
||||
assert.Equal(t, domain.TradeStatusActive, trades[0].Status)
|
||||
}
|
||||
|
||||
func TestGetAllTrades_ReturnsAll(t *testing.T) {
|
||||
r1 := makeQuoteRequest("LST_ACTIVE", "LIST_1", "RFQ", nil)
|
||||
r2 := makeQuoteRequest("LST_DONE", "LIST_2", "RFQ", nil)
|
||||
exec := makeExecutionReport("LST_DONE", "LST_DONE_TRDSUMM")
|
||||
r3 := makeQuoteRequest("LST_REJ", "LIST_3", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_REJ", "2") // REJECTED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, exec, r3, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
require.NoError(t, m.loadTrades())
|
||||
|
||||
trades := m.GetAllTrades()
|
||||
assert.Len(t, trades, 3)
|
||||
|
||||
statusMap := map[string]domain.TradeStatus{}
|
||||
for _, tr := range trades {
|
||||
statusMap[tr.QuoteReqID] = tr.Status
|
||||
}
|
||||
|
||||
assert.Equal(t, domain.TradeStatusActive, statusMap["LST_ACTIVE"])
|
||||
assert.Equal(t, domain.TradeStatusCompleted, statusMap["LST_DONE"])
|
||||
assert.Equal(t, domain.TradeStatusRejected, statusMap["LST_REJ"])
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 9 — Error en store
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_StoreError_ReturnsError_MapEmpty(t *testing.T) {
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage(nil), errors.New("db connection failed"))
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
assert.Error(t, err)
|
||||
assert.NotNil(t, m.trades)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
56
src/client/fix/mock_test.go
Normal file
56
src/client/fix/mock_test.go
Normal file
@ -0,0 +1,56 @@
|
||||
package fix
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
|
||||
// MockPersistenceStore is a testify mock implementing domain.PersistenceStore.
|
||||
type MockPersistenceStore struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) SaveMessage(msg domain.TradeMessage) error {
|
||||
args := m.Called(msg)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) SaveLog(entry domain.LogEntry) error {
|
||||
args := m.Called(entry)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
args := m.Called()
|
||||
msgs, _ := args.Get(0).([]domain.TradeMessage)
|
||||
return msgs, args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
|
||||
args := m.Called(quoteReqID)
|
||||
logs, _ := args.Get(0).(domain.Logs)
|
||||
return logs, args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) UpdateLogTradeID(quoteReqID, tradeID string) error {
|
||||
args := m.Called(quoteReqID, tradeID)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) GetFullTradeLog(quoteReqID string) (domain.FullTradeLog, error) {
|
||||
args := m.Called(quoteReqID)
|
||||
log, _ := args.Get(0).(domain.FullTradeLog)
|
||||
return log, args.Error(1)
|
||||
}
|
||||
|
||||
// MockNotifier is a testify mock implementing domain.Notifier.
|
||||
type MockNotifier struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockNotifier) SendMsg(chat domain.MessageChannel, text string, status domain.MessageStatus, wg *sync.WaitGroup) {
|
||||
m.Called(chat, text, status, wg)
|
||||
}
|
||||
@ -185,6 +185,9 @@ func parseExecutionReport(msg executionreport.ExecutionReport) domain.FixMessage
|
||||
if v, e := msg.GetSettlDate(); e == nil {
|
||||
body["SettlDate"] = v
|
||||
}
|
||||
if v, e := msg.GetTradeID(); e == nil {
|
||||
body["TradeID"] = v
|
||||
}
|
||||
|
||||
return domain.FixMessageJSON{
|
||||
Direction: "IN",
|
||||
|
||||
@ -1,16 +1,20 @@
|
||||
CREATE TABLE IF NOT EXISTS qfixdpl_messages (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
quote_req_id TEXT NOT NULL,
|
||||
trade_id TEXT,
|
||||
j_message JSONB NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_trade_id ON qfixdpl_messages(trade_id);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS qfixdpl_logs (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
quote_req_id TEXT NOT NULL UNIQUE,
|
||||
trade_id TEXT,
|
||||
raw_msg TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_dpl_logs_trade_id ON qfixdpl_logs(trade_id);
|
||||
|
||||
@ -16,8 +16,8 @@ func (p *Store) SaveMessage(msg domain.TradeMessage) error {
|
||||
}
|
||||
|
||||
_, err = p.db.Exec(
|
||||
"INSERT INTO qfixdpl_messages (quote_req_id, j_message) VALUES ($1, $2)",
|
||||
msg.QuoteReqID, string(jsonBytes),
|
||||
"INSERT INTO qfixdpl_messages (quote_req_id, trade_id, j_message) VALUES ($1, NULLIF($2, ''), $3)",
|
||||
msg.QuoteReqID, msg.TradeID, string(jsonBytes),
|
||||
)
|
||||
if err != nil {
|
||||
return tracerr.Errorf("error inserting message: %w", err)
|
||||
@ -43,7 +43,7 @@ func (p *Store) SaveLog(entry domain.LogEntry) error {
|
||||
|
||||
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
rows, err := p.db.Query(
|
||||
"SELECT id, quote_req_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC",
|
||||
"SELECT id, quote_req_id, trade_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC",
|
||||
)
|
||||
if err != nil {
|
||||
return nil, tracerr.Errorf("error querying today messages: %w", err)
|
||||
@ -55,11 +55,12 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
for rows.Next() {
|
||||
var (
|
||||
id, quoteReqID string
|
||||
tradeID *string
|
||||
jMessageRaw []byte
|
||||
createdAt time.Time
|
||||
)
|
||||
|
||||
if err := rows.Scan(&id, "eReqID, &jMessageRaw, &createdAt); err != nil {
|
||||
if err := rows.Scan(&id, "eReqID, &tradeID, &jMessageRaw, &createdAt); err != nil {
|
||||
return nil, tracerr.Errorf("error scanning message row: %w", err)
|
||||
}
|
||||
|
||||
@ -68,12 +69,18 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
|
||||
}
|
||||
|
||||
messages = append(messages, domain.TradeMessage{
|
||||
msg := domain.TradeMessage{
|
||||
ID: id,
|
||||
QuoteReqID: quoteReqID,
|
||||
JMessage: jMessage,
|
||||
CreatedAt: createdAt,
|
||||
})
|
||||
}
|
||||
|
||||
if tradeID != nil {
|
||||
msg.TradeID = *tradeID
|
||||
}
|
||||
|
||||
messages = append(messages, msg)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
@ -83,6 +90,69 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
func (p *Store) UpdateLogTradeID(quoteReqID, tradeID string) error {
|
||||
_, err := p.db.Exec(
|
||||
"UPDATE qfixdpl_logs SET trade_id = $1, updated_at = NOW() WHERE quote_req_id = $2",
|
||||
tradeID, quoteReqID,
|
||||
)
|
||||
if err != nil {
|
||||
return tracerr.Errorf("error updating log trade_id: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Store) GetFullTradeLog(quoteReqID string) (domain.FullTradeLog, error) {
|
||||
rows, err := p.db.Query(
|
||||
"SELECT trade_id, raw_msg FROM qfixdpl_logs WHERE quote_req_id = $1", quoteReqID,
|
||||
)
|
||||
if err != nil {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("error querying dpl log: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
if !rows.Next() {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("no log found for quoteReqID: %s", quoteReqID)
|
||||
}
|
||||
|
||||
var (
|
||||
tradeID *string
|
||||
rawMsg string
|
||||
)
|
||||
|
||||
if err := rows.Scan(&tradeID, &rawMsg); err != nil {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("error scanning dpl log row: %w", err)
|
||||
}
|
||||
|
||||
result := domain.FullTradeLog{
|
||||
QuoteReqID: quoteReqID,
|
||||
DPLEntries: strings.Split(rawMsg, "\n"),
|
||||
}
|
||||
|
||||
if tradeID != nil && *tradeID != "" {
|
||||
result.TradeID = *tradeID
|
||||
|
||||
ptRows, err := p.db.Query(
|
||||
"SELECT raw_msg FROM qfixpt_logs WHERE trade_id = $1", *tradeID,
|
||||
)
|
||||
if err != nil {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("error querying pt logs: %w", err)
|
||||
}
|
||||
defer ptRows.Close()
|
||||
|
||||
if ptRows.Next() {
|
||||
var ptRawMsg string
|
||||
if err := ptRows.Scan(&ptRawMsg); err != nil {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("error scanning pt log row: %w", err)
|
||||
}
|
||||
|
||||
result.PTEntries = strings.Split(ptRawMsg, "\n")
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (p *Store) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
|
||||
selectStmt := "SELECT raw_msg FROM qfixdpl_logs WHERE quote_req_id = '" + quoteReqID + "';"
|
||||
|
||||
|
||||
@ -3,18 +3,29 @@ package domain
|
||||
|
||||
import "time"
|
||||
|
||||
// TradeStatus represents the lifecycle state of a List Trading trade.
|
||||
type TradeStatus string
|
||||
|
||||
const (
|
||||
TradeStatusActive TradeStatus = "active"
|
||||
TradeStatusRejected TradeStatus = "rejected"
|
||||
TradeStatusCompleted TradeStatus = "completed"
|
||||
)
|
||||
|
||||
// ListTrade es la representacion exportada de un trade de List Trading.
|
||||
type ListTrade struct {
|
||||
QuoteReqID string `json:"quote_req_id"`
|
||||
ListID string `json:"list_id"`
|
||||
Symbol string `json:"symbol"`
|
||||
SecurityIDSrc string `json:"security_id_src"`
|
||||
Currency string `json:"currency"`
|
||||
Side string `json:"side"`
|
||||
OrderQty string `json:"order_qty"`
|
||||
SettlDate string `json:"settl_date"`
|
||||
Price string `json:"price"`
|
||||
OwnerTraderID string `json:"owner_trader_id"`
|
||||
QuoteReqID string `json:"quote_req_id"`
|
||||
TradeID string `json:"trade_id,omitempty"`
|
||||
ListID string `json:"list_id"`
|
||||
Symbol string `json:"symbol"`
|
||||
SecurityIDSrc string `json:"security_id_src"`
|
||||
Currency string `json:"currency"`
|
||||
Side string `json:"side"`
|
||||
OrderQty string `json:"order_qty"`
|
||||
SettlDate string `json:"settl_date"`
|
||||
Price string `json:"price"`
|
||||
OwnerTraderID string `json:"owner_trader_id"`
|
||||
Status TradeStatus `json:"status"`
|
||||
}
|
||||
|
||||
// FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento.
|
||||
@ -31,6 +42,7 @@ type FixMessageJSON struct {
|
||||
type TradeMessage struct {
|
||||
ID string `json:"id"`
|
||||
QuoteReqID string `json:"quote_req_id"`
|
||||
TradeID string `json:"trade_id,omitempty"`
|
||||
JMessage FixMessageJSON `json:"j_message"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
@ -46,10 +58,20 @@ type Logs struct {
|
||||
Entries []string `json:"entries"`
|
||||
}
|
||||
|
||||
// FullTradeLog es la respuesta del endpoint GET /trades/:id/full-log.
|
||||
type FullTradeLog struct {
|
||||
QuoteReqID string `json:"quote_req_id"`
|
||||
TradeID string `json:"trade_id,omitempty"`
|
||||
DPLEntries []string `json:"dpl_entries"`
|
||||
PTEntries []string `json:"pt_entries,omitempty"`
|
||||
}
|
||||
|
||||
// PersistenceStore define la interfaz de persistencia.
|
||||
type PersistenceStore interface {
|
||||
SaveMessage(msg TradeMessage) error
|
||||
SaveLog(entry LogEntry) error
|
||||
GetTodayMessages() ([]TradeMessage, error)
|
||||
GetLogsByQuoteReqID(quoteReqID string) (Logs, error)
|
||||
UpdateLogTradeID(quoteReqID, tradeID string) error
|
||||
GetFullTradeLog(id string) (FullTradeLog, error)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user