2 Commits

5 changed files with 44 additions and 82 deletions

View File

@ -51,9 +51,9 @@ func newController(pool *redis.Pool, userData app.UserDataProvider,
}
}
func (cont *Controller) GetTrades(ctx *gin.Context) {
func (cont *Controller) TradeCaptureReports(ctx *gin.Context) {
setHeaders(ctx, cont.config)
trades := cont.tradeProvider.GetTrades()
trades := cont.tradeProvider.TradeCaptureReports()
ctx.JSON(http.StatusOK, trades)
}

View File

@ -21,7 +21,7 @@ func SetRoutes(api *API) {
qfixpt := v1.Group("/")
qfixpt.Use(cont.AuthRequired)
qfixpt.GET("/health", cont.HealthCheck)
qfixpt.GET("/trades", cont.GetTrades)
qfixpt.GET("/trade_capture_reports", cont.TradeCaptureReports)
qfixpt.GET("/trades/all", cont.GetAllTrades)
qfixpt.GET("/trades/:tradeID/logs", cont.GetTradeLogs)

View File

@ -34,8 +34,8 @@ type Config struct {
// TradeProvider exposes trade data for the REST API.
type TradeProvider interface {
GetTrades() []domain.PostTrade
GetAllTrades() []domain.PostTrade
TradeCaptureReports() []domain.TradeCaptureReport
GetAllTrades() []domain.TradeCaptureReport
}
func New(userData app.UserDataProvider, storeInstance *store.Store, tradeProvider TradeProvider, config Config, notify domain.Notifier) *API {

View File

@ -27,13 +27,13 @@ import (
// Custom Tradeweb tags not in generated code.
const (
tagTestMessage = 23029
tagTWTradeID = 23068
tagTWOrigTradeID = 23096
tagTWClrID = 23025
tagDlrClrID = 23027
tagClearingStatus = 5440
tagTradingSystemID = 6731
tagTestMessage = 23029
tagTWTradeID = 23068
tagTWOrigTradeID = 23096
tagTWClrID = 23025
tagDlrClrID = 23027
tagClearingStatus = 5440
tagTradingSystemID = 6731
)
// postTrade is the internal representation of a trade in memory.
@ -55,7 +55,7 @@ type Manager struct {
initiator *quickfix.Initiator
app *application
tradesMu sync.RWMutex
trades map[string]*postTrade
trades map[string]domain.TradeCaptureReport
store domain.PersistenceStore
notify domain.Notifier
cfg app.FIXConfig
@ -63,7 +63,7 @@ type Manager struct {
func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager {
return &Manager{
trades: make(map[string]*postTrade),
trades: make(map[string]domain.TradeCaptureReport),
store: store,
notify: notify,
cfg: cfg,
@ -202,7 +202,7 @@ func (m *Manager) handleTradeCaptureReport(msg tradecapturereport.TradeCaptureRe
twTraderID = extractPartyByRole(s, "1007")
}
trade := domain.Trade{
trade := domain.TradeCaptureReport{
TradeReportID: tradeReportID,
TradeID: tradeID,
ExecID: execID,
@ -236,43 +236,8 @@ func (m *Manager) handleTradeCaptureReport(msg tradecapturereport.TradeCaptureRe
// Persist structured message.
m.persistMessage(tradeID, parseTradeCaptureReportJSON(msg))
// Update in-memory state.
var status domain.PostTradeStatus
switch string(tradeReportType) {
case "101": // TRDCONF — trade confirmation
status = domain.PostTradeStatusActive
slog.Info("TRDCONF: trade confirmation", "tradeReportID", tradeReportID, "tradeID", tradeID)
case "102": // TRDBLOCK — block trade, log only
status = domain.PostTradeStatusActive
slog.Info("TRDBLOCK: block trade", "tradeReportID", tradeReportID, "tradeID", tradeID)
case "103": // TRDCORR — trade correction
status = domain.PostTradeStatusCorrected
slog.Info("TRDCORR: trade correction", "tradeReportID", tradeReportID, "refID", tradeReportRefID)
case "104": // TRDCXL — trade cancellation
status = domain.PostTradeStatusCancelled
slog.Info("TRDCXL: trade cancel", "tradeReportID", tradeReportID, "refID", tradeReportRefID)
case "105", "106", "107", "108": // log and ack only, no business logic
status = domain.PostTradeStatusActive
slog.Info("trade report (no business logic)", "tradeReportType", string(tradeReportType), "tradeReportID", tradeReportID)
default:
status = domain.PostTradeStatusActive
slog.Warn("unexpected TradeReportType", "tradeReportType", string(tradeReportType), "tradeReportID", tradeReportID)
}
m.tradesMu.Lock()
m.trades[tradeID] = &postTrade{
TradeID: tradeID,
TradeReportID: tradeReportID,
TradeReportType: string(tradeReportType),
Side: side,
LastQty: lastQty,
LastPx: lastPx,
SettlDate: settlDate,
TradeDate: tradeDate,
Status: status,
TWTradeID: twTradeID,
}
m.trades[tradeID] = trade
m.tradesMu.Unlock()
}
@ -451,7 +416,7 @@ func (m *Manager) loadTrades() error {
return err
}
trades := make(map[string]*postTrade)
trades := make(map[string]domain.TradeCaptureReport)
for _, msg := range messages {
switch msg.JMessage.MsgType {
@ -466,9 +431,9 @@ func (m *Manager) loadTrades() error {
t, exists := trades[tradeID]
if !exists {
t = &postTrade{
t = domain.TradeCaptureReport{
TradeID: tradeID,
Status: domain.PostTradeStatusActive,
// Status: domain.PostTradeStatusActive,
}
trades[tradeID] = t
}
@ -503,60 +468,57 @@ func (m *Manager) loadTrades() error {
t.TWTradeID = v
}
switch tradeReportType {
case "103": // TRDCORR
t.Status = domain.PostTradeStatusCorrected
case "104": // TRDCXL
t.Status = domain.PostTradeStatusCancelled
default:
if t.Status != domain.PostTradeStatusCorrected && t.Status != domain.PostTradeStatusCancelled {
t.Status = domain.PostTradeStatusActive
}
}
// switch tradeReportType {
// case "103": // TRDCORR
// t.Status = domain.PostTradeStatusCorrected
// case "104": // TRDCXL
// t.Status = domain.PostTradeStatusCancelled
// default:
// if t.Status != domain.PostTradeStatusCorrected && t.Status != domain.PostTradeStatusCancelled {
// t.Status = domain.PostTradeStatusActive
// }
// }
}
}
active := 0
for _, t := range trades {
if t.Status == domain.PostTradeStatusActive {
active++
}
}
// active := 0
// for _, t := range trades {
// if t.Status == domain.PostTradeStatusActive {
// active++
// }
// }
m.tradesMu.Lock()
m.trades = trades
m.tradesMu.Unlock()
slog.Info("recovery completed", "totalTrades", len(trades), "activeTrades", active)
// slog.Info("recovery completed", "totalTrades", len(trades), "activeTrades", active)
return nil
}
// GetTrades returns only active trades.
func (m *Manager) GetTrades() []domain.PostTrade {
func (m *Manager) TradeCaptureReports() []domain.TradeCaptureReport {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
var result []domain.PostTrade
var result []domain.TradeCaptureReport
for _, t := range m.trades {
if t.Status == domain.PostTradeStatusActive {
result = append(result, t.toPostTrade())
}
result = append(result, t)
}
return result
}
// GetAllTrades returns all trades regardless of status.
func (m *Manager) GetAllTrades() []domain.PostTrade {
func (m *Manager) GetAllTrades() []domain.TradeCaptureReport {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
result := make([]domain.PostTrade, 0, len(m.trades))
result := make([]domain.TradeCaptureReport, 0, len(m.trades))
for _, t := range m.trades {
result = append(result, t.toPostTrade())
result = append(result, t)
}
return result

View File

@ -2,8 +2,8 @@ package domain
import "github.com/shopspring/decimal"
// Trade represents data extracted from a TradeCaptureReport (35=AE).
type Trade struct {
// TradeCaptureReport represents data extracted from a TradeCaptureReport (35=AE).
type TradeCaptureReport struct {
TradeReportID string // 571
TradeID string // 1003
ExecID string // 17