3 Commits

15 changed files with 1442 additions and 17 deletions

View File

@ -61,8 +61,8 @@ only-build: check-env
linux-build: check-env swag # Build a linux version for prod environment. Set e=environment: prod, dev, demo, open-demo linux-build: check-env swag # Build a linux version for prod environment. Set e=environment: prod, dev, demo, open-demo
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e) env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
deploy: check-env # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo deploy: # Deploy to remote server. Set s=serverName, e=environment. e.g. make deploy e=dev s=nonprodFix
tools/deploy.sh $(e) make linux-build e=$(e) && qscp build/out/distribution/qfixpt.gz $(s):/home/quantex/qfixtb/qfixpt/
fmt: download-versions # Apply the Go formatter to the code fmt: download-versions # Apply the Go formatter to the code
cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt); cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt);

19
fix_pt.cfg Normal file
View File

@ -0,0 +1,19 @@
[DEFAULT]
ConnectionType=initiator
HeartBtInt=30
ReconnectInterval=30
SenderCompID=<CONFIGURAR>
ResetOnLogon=Y
FileStorePath=fix_store
FileLogPath=fix_logs
[SESSION]
BeginString=FIXT.1.1
DefaultApplVerID=FIX.5.0SP2
TargetCompID=<CONFIGURAR>
TransportDataDictionary=spec/FIXT11.xml
AppDataDictionary=spec/FIX50SP2.xml
StartTime=00:00:00
EndTime=00:00:00
SocketConnectHost=<CONFIGURAR>
SocketConnectPort=<CONFIGURAR>

View File

@ -36,6 +36,11 @@ type Service struct {
AuthorizedServices map[string]AuthorizedService `toml:"AuthorizedServices"` AuthorizedServices map[string]AuthorizedService `toml:"AuthorizedServices"`
APIBasePort string APIBasePort string
EnableJWTAuth bool // Enable JWT authentication for service-to-service communication EnableJWTAuth bool // Enable JWT authentication for service-to-service communication
FIX FIXConfig
}
type FIXConfig struct {
SettingsFile string // path to fix_pt.cfg file
} }
type ExtAuth struct { type ExtAuth struct {

View File

@ -29,26 +29,56 @@ const (
) )
type Controller struct { type Controller struct {
pool *redis.Pool pool *redis.Pool
userData app.UserDataProvider userData app.UserDataProvider
store *store.Store store *store.Store
config Config tradeProvider TradeProvider
notify domain.Notifier config Config
authMutex deadlock.Mutex notify domain.Notifier
authMutex deadlock.Mutex
} }
func newController(pool *redis.Pool, userData app.UserDataProvider, func newController(pool *redis.Pool, userData app.UserDataProvider,
s *store.Store, config Config, n domain.Notifier, s *store.Store, tp TradeProvider, config Config, n domain.Notifier,
) *Controller { ) *Controller {
return &Controller{ return &Controller{
pool: pool, pool: pool,
userData: userData, userData: userData,
store: s, store: s,
config: config, tradeProvider: tp,
notify: n, config: config,
notify: n,
} }
} }
func (cont *Controller) GetTrades(ctx *gin.Context) {
setHeaders(ctx, cont.config)
trades := cont.tradeProvider.GetTrades()
ctx.JSON(http.StatusOK, trades)
}
func (cont *Controller) GetAllTrades(ctx *gin.Context) {
setHeaders(ctx, cont.config)
trades := cont.tradeProvider.GetAllTrades()
ctx.JSON(http.StatusOK, trades)
}
func (cont *Controller) GetTradeLogs(ctx *gin.Context) {
setHeaders(ctx, cont.config)
tradeID := ctx.Param("tradeID")
logs, err := cont.store.GetLogsByTradeID(tradeID)
if err != nil {
slog.Error("error fetching trade logs", "tradeID", tradeID, "error", err)
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"})
return
}
ctx.JSON(http.StatusOK, logs)
}
func (cont *Controller) GetUser(ctx *gin.Context) app.User { func (cont *Controller) GetUser(ctx *gin.Context) app.User {
// This is set on the AuthRequired middleware // This is set on the AuthRequired middleware
response, ok := ctx.Get(responseKey) response, ok := ctx.Get(responseKey)

View File

@ -21,6 +21,9 @@ func SetRoutes(api *API) {
qfixpt := v1.Group("/") qfixpt := v1.Group("/")
qfixpt.Use(cont.AuthRequired) qfixpt.Use(cont.AuthRequired)
qfixpt.GET("/health", cont.HealthCheck) qfixpt.GET("/health", cont.HealthCheck)
qfixpt.GET("/trades", cont.GetTrades)
qfixpt.GET("/trades/all", cont.GetAllTrades)
qfixpt.GET("/trades/:tradeID/logs", cont.GetTradeLogs)
backoffice := qfixpt.Group("/backoffice") backoffice := qfixpt.Group("/backoffice")
backoffice.Use(cont.BackOfficeUser) backoffice.Use(cont.BackOfficeUser)

View File

@ -32,7 +32,13 @@ type Config struct {
EnableJWTAuth bool EnableJWTAuth bool
} }
func New(userData app.UserDataProvider, storeInstance *store.Store, config Config, notify domain.Notifier) *API { // TradeProvider exposes trade data for the REST API.
type TradeProvider interface {
GetTrades() []domain.PostTrade
GetAllTrades() []domain.PostTrade
}
func New(userData app.UserDataProvider, storeInstance *store.Store, tradeProvider TradeProvider, config Config, notify domain.Notifier) *API {
// Set up Gin // Set up Gin
var engine *gin.Engine var engine *gin.Engine
if version.Environment() == version.EnvironmentTypeProd { if version.Environment() == version.EnvironmentTypeProd {
@ -58,7 +64,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, config Confi
} }
api := &API{ api := &API{
Controller: newController(NewPool(), userData, storeInstance, config, notify), Controller: newController(NewPool(), userData, storeInstance, tradeProvider, config, notify),
Router: engine, Router: engine,
Port: config.Port, Port: config.Port,
} }

View File

@ -0,0 +1,147 @@
package fix
import (
"log/slog"
"quantex.com/qfixpt/quickfix"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreport"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmation"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereport"
"quantex.com/qfixpt/quickfix/gen/tag"
"quantex.com/qfixpt/src/domain"
)
type application struct {
router *quickfix.MessageRouter
notifier domain.Notifier
onLogon func(quickfix.SessionID)
onLogout func(quickfix.SessionID)
onTradeCaptureReport func(tradecapturereport.TradeCaptureReport, quickfix.SessionID)
onAllocationReport func(allocationreport.AllocationReport, quickfix.SessionID)
onConfirmation func(confirmation.Confirmation, quickfix.SessionID)
onRawMessage func(direction string, msg *quickfix.Message)
}
func newApplication(n domain.Notifier) *application {
app := &application{
router: quickfix.NewMessageRouter(),
notifier: n,
}
app.router.AddRoute(tradecapturereport.Route(app.handleTradeCaptureReport))
app.router.AddRoute(allocationreport.Route(app.handleAllocationReport))
app.router.AddRoute(confirmation.Route(app.handleConfirmation))
return app
}
func (a *application) OnCreate(sessionID quickfix.SessionID) {
slog.Info("FIX session created", "session", sessionID.String())
}
func (a *application) OnLogon(sessionID quickfix.SessionID) {
slog.Info("FIX session logged on", "session", sessionID.String())
if a.onLogon != nil {
a.onLogon(sessionID)
}
}
func (a *application) OnLogout(sessionID quickfix.SessionID) {
slog.Info("FIX session logged out", "session", sessionID.String())
go a.notifier.SendMsg(domain.MessageChannelError, "Logout", domain.MessageStatusWarning, nil)
if a.onLogout != nil {
a.onLogout(sessionID)
}
}
func (a *application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {}
func (a *application) ToApp(msg *quickfix.Message, _ quickfix.SessionID) error {
if a.onRawMessage != nil {
a.onRawMessage("OUT", msg)
}
return nil
}
func (a *application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError {
return nil
}
func (a *application) FromApp(msg *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError {
if a.onRawMessage != nil {
a.onRawMessage("IN", msg)
}
beginString, _ := msg.Header.GetBytes(tag.BeginString)
msgType, _ := msg.Header.GetBytes(tag.MsgType)
var applVerID quickfix.FIXString
msg.Header.GetField(tag.ApplVerID, &applVerID)
slog.Info("FIX FromApp received",
"beginString", string(beginString),
"msgType", string(msgType),
"applVerID", string(applVerID),
"session", sessionID.String(),
"rawMsg", msg.String(),
)
rejErr := a.router.Route(msg, sessionID)
if rejErr != nil {
slog.Error("FIX FromApp routing failed",
"msgType", string(msgType),
"error", rejErr.Error(),
"isBusinessReject", rejErr.IsBusinessReject(),
)
}
return rejErr
}
func (a *application) handleTradeCaptureReport(msg tradecapturereport.TradeCaptureReport, sessionID quickfix.SessionID) quickfix.MessageRejectError {
tradeReportID, _ := msg.GetTradeReportID()
slog.Info("TradeCaptureReport received",
"tradeReportID", tradeReportID,
"session", sessionID.String(),
)
if a.onTradeCaptureReport != nil {
a.onTradeCaptureReport(msg, sessionID)
}
return nil
}
func (a *application) handleAllocationReport(msg allocationreport.AllocationReport, sessionID quickfix.SessionID) quickfix.MessageRejectError {
allocReportID, _ := msg.GetAllocReportID()
slog.Info("AllocationReport received",
"allocReportID", allocReportID,
"session", sessionID.String(),
)
if a.onAllocationReport != nil {
a.onAllocationReport(msg, sessionID)
}
return nil
}
func (a *application) handleConfirmation(msg confirmation.Confirmation, sessionID quickfix.SessionID) quickfix.MessageRejectError {
confirmID, _ := msg.GetConfirmID()
slog.Info("Confirmation received",
"confirmID", confirmID,
"session", sessionID.String(),
)
if a.onConfirmation != nil {
a.onConfirmation(msg, sessionID)
}
return nil
}

581
src/client/fix/manager.go Normal file
View File

@ -0,0 +1,581 @@
package fix
import (
"log/slog"
"os"
"sync"
"time"
"github.com/rs/zerolog/log"
"github.com/shopspring/decimal"
"quantex.com/qfixpt/quickfix"
"quantex.com/qfixpt/quickfix/gen/enum"
"quantex.com/qfixpt/quickfix/gen/field"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreport"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreportack"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmation"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmationack"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereport"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereportack"
filelog "quantex.com/qfixpt/quickfix/log/file"
filestore "quantex.com/qfixpt/quickfix/store/file"
"quantex.com/qfixpt/src/app"
"quantex.com/qfixpt/src/common/tracerr"
"quantex.com/qfixpt/src/domain"
)
// Custom Tradeweb tags not in generated code.
const (
tagTestMessage = 23029
tagTWTradeID = 23068
tagTWOrigTradeID = 23096
tagTWClrID = 23025
tagDlrClrID = 23027
tagClearingStatus = 5440
tagTradingSystemID = 6731
)
// postTrade is the internal representation of a trade in memory.
type postTrade struct {
TradeID string
TradeReportID string
TradeReportType string
Side string
LastQty decimal.Decimal
LastPx decimal.Decimal
SettlDate string
TradeDate string
Status domain.PostTradeStatus
TWTradeID string
}
// Manager wraps the QuickFIX initiator for post-trade message handling.
type Manager struct {
initiator *quickfix.Initiator
app *application
tradesMu sync.RWMutex
trades map[string]*postTrade
store domain.PersistenceStore
notify domain.Notifier
cfg app.FIXConfig
}
func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager {
return &Manager{
trades: make(map[string]*postTrade),
store: store,
notify: notify,
cfg: cfg,
}
}
func (m *Manager) Start() error {
fixApp := newApplication(m.notify)
fixApp.onLogon = m.onLogon
fixApp.onLogout = m.onLogout
fixApp.onTradeCaptureReport = m.handleTradeCaptureReport
fixApp.onAllocationReport = m.handleAllocationReport
fixApp.onConfirmation = m.handleConfirmation
fixApp.onRawMessage = m.handleRawMessage
m.app = fixApp
if err := m.loadTrades(); err != nil {
slog.Error("failed to load trades from DB, starting with empty state", "error", err)
}
f, err := os.Open(m.cfg.SettingsFile)
if err != nil {
err = tracerr.Errorf("error opening FIX settings file %q: %s", m.cfg.SettingsFile, err)
log.Error().Msg(err.Error())
return err
}
defer f.Close()
settings, err := quickfix.ParseSettings(f)
if err != nil {
err = tracerr.Errorf("error parsing FIX settings: %s", err)
log.Error().Msg(err.Error())
return err
}
storeFactory := filestore.NewStoreFactory(settings)
logFactory, err := filelog.NewLogFactory(settings)
if err != nil {
err = tracerr.Errorf("error creating file log factory: %s", err)
log.Error().Msg(err.Error())
return err
}
initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory)
if err != nil {
err = tracerr.Errorf("error creating FIX initiator: %s", err)
log.Error().Msg(err.Error())
return err
}
m.initiator = initiator
if err = m.initiator.Start(); err != nil {
err = tracerr.Errorf("error starting FIX initiator: %s", err)
log.Error().Msg(err.Error())
return err
}
slog.Info("FIX initiator started", "settings", m.cfg.SettingsFile)
return nil
}
func (m *Manager) Stop() {
if m.initiator != nil {
m.initiator.Stop()
slog.Info("FIX initiator stopped")
}
}
func (m *Manager) onLogon(sessionID quickfix.SessionID) {
slog.Info("FIX PT session logged on", "session", sessionID.String())
}
func (m *Manager) onLogout(sessionID quickfix.SessionID) {
slog.Info("FIX PT session logged out", "session", sessionID.String())
}
// handleTradeCaptureReport processes an incoming TradeCaptureReport (35=AE).
func (m *Manager) handleTradeCaptureReport(msg tradecapturereport.TradeCaptureReport, sessionID quickfix.SessionID) {
tradeReportID, _ := msg.GetTradeReportID()
// ACK immediately before any validation.
ack := tradecapturereportack.New()
ack.SetTradeReportID(tradeReportID)
if err := quickfix.SendToTarget(ack, sessionID); err != nil {
slog.Error("failed to send TradeCaptureReportAck", "tradeReportID", tradeReportID, "error", err.Error())
} else {
slog.Info("TradeCaptureReportAck sent", "tradeReportID", tradeReportID)
}
// Check if test message.
testMessage := getCustomBool(&msg.Message.Body.FieldMap, tagTestMessage)
if testMessage {
slog.Info("test TradeCaptureReport, skipping", "tradeReportID", tradeReportID)
return
}
// Parse trade data.
tradeReportType, _ := msg.GetTradeReportType()
tradeID, _ := msg.GetTradeID()
execID, _ := msg.GetExecID()
lastQty, _ := msg.GetLastQty()
lastPx, _ := msg.GetLastPx()
spread, _ := msg.GetSpread()
settlDate, _ := msg.GetSettlDate()
tradeDate, _ := msg.GetTradeDate()
grossTradeAmt, _ := msg.GetGrossTradeAmt()
tradeReportRefID, _ := msg.GetTradeReportRefID()
// Custom fields.
twTradeID := getCustomString(&msg.Message.Body.FieldMap, tagTWTradeID)
twOrigTradeID := getCustomString(&msg.Message.Body.FieldMap, tagTWOrigTradeID)
// Extract side-level fields (parties, accrued interest, net money).
var side string
var accruedInterest, netMoney decimal.Decimal
var clearingFirm, twTraderID string
sides, sideErr := msg.GetNoSides()
if sideErr == nil && sides.Len() > 0 {
s := sides.Get(0)
sideVal, _ := s.GetSide()
side = string(sideVal)
accruedInterest, _ = s.GetAccruedInterestAmt()
netMoney, _ = s.GetNetMoney()
clearingFirm = extractPartyByRole(s, enum.PartyRole_CLEARING_FIRM)
// 1007 is a custom Tradeweb role, access via string comparison.
twTraderID = extractPartyByRole(s, "1007")
}
trade := domain.Trade{
TradeReportID: tradeReportID,
TradeID: tradeID,
ExecID: execID,
LastQty: lastQty,
LastPx: lastPx,
Spread: spread,
Side: side,
AccruedInterest: accruedInterest,
NetMoney: netMoney,
GrossTradeAmt: grossTradeAmt,
SettlDate: settlDate,
TradeDate: tradeDate,
TradeReportType: string(tradeReportType),
TradeReportRefID: tradeReportRefID,
ClearingFirm: clearingFirm,
TradewebTraderID: twTraderID,
TWTradeID: twTradeID,
TWOrigTradeID: twOrigTradeID,
}
slog.Info("TradeCaptureReport parsed",
"tradeReportID", trade.TradeReportID,
"tradeReportType", trade.TradeReportType,
"tradeID", trade.TradeID,
"lastQty", trade.LastQty.String(),
"lastPx", trade.LastPx.String(),
"side", trade.Side,
"settlDate", trade.SettlDate,
)
// Persist structured message.
m.persistMessage(tradeID, parseTradeCaptureReportJSON(msg))
// Update in-memory state.
var status domain.PostTradeStatus
switch string(tradeReportType) {
case "101": // TRDCONF — trade confirmation
status = domain.PostTradeStatusActive
slog.Info("TRDCONF: trade confirmation", "tradeReportID", tradeReportID, "tradeID", tradeID)
case "102": // TRDBLOCK — block trade, log only
status = domain.PostTradeStatusActive
slog.Info("TRDBLOCK: block trade", "tradeReportID", tradeReportID, "tradeID", tradeID)
case "103": // TRDCORR — trade correction
status = domain.PostTradeStatusCorrected
slog.Info("TRDCORR: trade correction", "tradeReportID", tradeReportID, "refID", tradeReportRefID)
case "104": // TRDCXL — trade cancellation
status = domain.PostTradeStatusCancelled
slog.Info("TRDCXL: trade cancel", "tradeReportID", tradeReportID, "refID", tradeReportRefID)
case "105", "106", "107", "108": // log and ack only, no business logic
status = domain.PostTradeStatusActive
slog.Info("trade report (no business logic)", "tradeReportType", string(tradeReportType), "tradeReportID", tradeReportID)
default:
status = domain.PostTradeStatusActive
slog.Warn("unexpected TradeReportType", "tradeReportType", string(tradeReportType), "tradeReportID", tradeReportID)
}
m.tradesMu.Lock()
m.trades[tradeID] = &postTrade{
TradeID: tradeID,
TradeReportID: tradeReportID,
TradeReportType: string(tradeReportType),
Side: side,
LastQty: lastQty,
LastPx: lastPx,
SettlDate: settlDate,
TradeDate: tradeDate,
Status: status,
TWTradeID: twTradeID,
}
m.tradesMu.Unlock()
}
// handleAllocationReport processes an incoming AllocationReport (35=AS).
func (m *Manager) handleAllocationReport(msg allocationreport.AllocationReport, sessionID quickfix.SessionID) {
allocReportID, _ := msg.GetAllocReportID()
// ACK immediately.
ack := allocationreportack.New(field.NewAllocReportID(allocReportID))
if err := quickfix.SendToTarget(ack, sessionID); err != nil {
slog.Error("failed to send AllocationReportAck", "allocReportID", allocReportID, "error", err.Error())
} else {
slog.Info("AllocationReportAck sent", "allocReportID", allocReportID)
}
// Check if test message.
testMessage := getCustomBool(&msg.Message.Body.FieldMap, tagTestMessage)
if testMessage {
slog.Info("test AllocationReport, skipping", "allocReportID", allocReportID)
return
}
allocTransType, _ := msg.GetAllocTransType()
// Extract TradeID: try tag 1003 directly, fall back to SecondaryOrderID (198) from NoOrders.
tradeID := getCustomString(&msg.Message.Body.FieldMap, 1003)
if tradeID == "" {
if orders, err := msg.GetNoOrders(); err == nil && orders.Len() > 0 {
if v, e := orders.Get(0).GetSecondaryOrderID(); e == nil && v != "" {
tradeID = v
}
}
}
alloc := domain.Allocation{
AllocReportID: allocReportID,
AllocTransType: string(allocTransType),
TradeID: tradeID,
}
// Parse NoAllocs repeating group.
allocs, allocErr := msg.GetNoAllocs()
if allocErr == nil {
for i := 0; i < allocs.Len(); i++ {
entry := allocs.Get(i)
account, _ := entry.GetAllocAccount()
qty, _ := entry.GetAllocQty()
individualID, _ := entry.GetIndividualAllocID()
price, _ := entry.GetAllocPrice()
alloc.Entries = append(alloc.Entries, domain.AllocEntry{
AllocAccount: account,
AllocQty: qty,
IndividualAllocID: individualID,
AllocPrice: price,
})
}
}
slog.Info("AllocationReport parsed",
"allocReportID", alloc.AllocReportID,
"allocTransType", alloc.AllocTransType,
"tradeID", alloc.TradeID,
"numEntries", len(alloc.Entries),
)
// Persist structured message.
m.persistMessage(tradeID, parseAllocationReportJSON(msg))
switch allocTransType {
case enum.AllocTransType_NEW:
slog.Info("new allocation", "allocReportID", allocReportID)
case enum.AllocTransType_REPLACE:
slog.Info("replace allocation", "allocReportID", allocReportID)
case enum.AllocTransType_CANCEL:
slog.Info("cancel allocation", "allocReportID", allocReportID)
default:
slog.Warn("unhandled AllocTransType", "allocTransType", string(allocTransType), "allocReportID", allocReportID)
}
}
// handleConfirmation processes an incoming Confirmation (35=AK).
func (m *Manager) handleConfirmation(msg confirmation.Confirmation, sessionID quickfix.SessionID) {
confirmID, _ := msg.GetConfirmID()
tradeDate, _ := msg.GetTradeDate()
transactTime, _ := msg.GetTransactTime()
// ACK immediately.
ack := confirmationack.New(
field.NewConfirmID(confirmID),
field.NewTradeDate(tradeDate),
field.NewTransactTime(transactTime),
field.NewAffirmStatus(enum.AffirmStatus_RECEIVED),
)
if err := quickfix.SendToTarget(ack, sessionID); err != nil {
slog.Error("failed to send ConfirmationAck", "confirmID", confirmID, "error", err.Error())
} else {
slog.Info("ConfirmationAck sent", "confirmID", confirmID)
}
// Check if test message.
testMessage := getCustomBool(&msg.Message.Body.FieldMap, tagTestMessage)
if testMessage {
slog.Info("test Confirmation, skipping", "confirmID", confirmID)
return
}
confirmStatus, _ := msg.GetConfirmStatus()
confirmTransType, _ := msg.GetConfirmTransType()
individualAllocID, _ := msg.GetIndividualAllocID()
// Custom fields.
twClrID := getCustomString(&msg.Message.Body.FieldMap, tagTWClrID)
dlrClrID := getCustomString(&msg.Message.Body.FieldMap, tagDlrClrID)
clearingStatus := getCustomString(&msg.Message.Body.FieldMap, tagClearingStatus)
conf := domain.ConfirmationMsg{
ConfirmID: confirmID,
ConfirmStatus: string(confirmStatus),
ConfirmTransType: string(confirmTransType),
IndividualAllocID: individualAllocID,
TWClrID: twClrID,
DlrClrID: dlrClrID,
ClearingStatus: clearingStatus,
TradeDate: tradeDate,
}
slog.Info("Confirmation parsed",
"confirmID", conf.ConfirmID,
"confirmStatus", conf.ConfirmStatus,
"confirmTransType", conf.ConfirmTransType,
"individualAllocID", conf.IndividualAllocID,
"clearingStatus", conf.ClearingStatus,
)
// Persist structured message. Use TradeID if available, fall back to ConfirmID.
persistID := getCustomString(&msg.Message.Body.FieldMap, 1003)
if persistID == "" {
persistID = confirmID
}
m.persistMessage(persistID, parseConfirmationJSON(msg))
}
// persistMessage saves a structured FIX message to the messages table.
func (m *Manager) persistMessage(tradeID string, fixJSON domain.FixMessageJSON) {
if err := m.store.SaveMessage(domain.TradeMessage{
TradeID: tradeID,
JMessage: fixJSON,
}); err != nil {
slog.Error("failed to persist message", "msgType", fixJSON.MsgType, "tradeID", tradeID, "error", err)
}
}
// handleRawMessage persists raw FIX message strings to the logs table.
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
tradeID := extractTradeIdentifier(msg)
if tradeID == "" {
return
}
if err := m.store.SaveLog(domain.LogEntry{
TradeID: tradeID,
RawMsg: "[" + direction + "] " + msg.String(),
}); err != nil {
slog.Error("failed to persist raw log", "error", err)
}
}
// loadTrades reconstructs all trades and their states from today's messages in the database.
func (m *Manager) loadTrades() error {
messages, err := m.store.GetTodayMessages()
if err != nil {
return err
}
trades := make(map[string]*postTrade)
for _, msg := range messages {
switch msg.JMessage.MsgType {
case "AE": // TradeCaptureReport
body := msg.JMessage.Body
tradeReportType, _ := body["TradeReportType"].(string)
tradeID := msg.TradeID
if tradeID == "" {
continue
}
t, exists := trades[tradeID]
if !exists {
t = &postTrade{
TradeID: tradeID,
Status: domain.PostTradeStatusActive,
}
trades[tradeID] = t
}
if v, ok := body["TradeReportID"].(string); ok {
t.TradeReportID = v
}
t.TradeReportType = tradeReportType
if v, ok := body["Side"].(string); ok {
t.Side = v
}
if v, ok := body["LastQty"].(string); ok {
t.LastQty, _ = decimal.NewFromString(v)
}
if v, ok := body["LastPx"].(string); ok {
t.LastPx, _ = decimal.NewFromString(v)
}
if v, ok := body["SettlDate"].(string); ok {
t.SettlDate = v
}
if v, ok := body["TradeDate"].(string); ok {
t.TradeDate = v
}
if v, ok := body["TWTradeID"].(string); ok {
t.TWTradeID = v
}
switch tradeReportType {
case "103": // TRDCORR
t.Status = domain.PostTradeStatusCorrected
case "104": // TRDCXL
t.Status = domain.PostTradeStatusCancelled
default:
if t.Status != domain.PostTradeStatusCorrected && t.Status != domain.PostTradeStatusCancelled {
t.Status = domain.PostTradeStatusActive
}
}
}
}
active := 0
for _, t := range trades {
if t.Status == domain.PostTradeStatusActive {
active++
}
}
m.tradesMu.Lock()
m.trades = trades
m.tradesMu.Unlock()
slog.Info("recovery completed", "totalTrades", len(trades), "activeTrades", active)
return nil
}
// GetTrades returns only active trades.
func (m *Manager) GetTrades() []domain.PostTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
var result []domain.PostTrade
for _, t := range m.trades {
if t.Status == domain.PostTradeStatusActive {
result = append(result, t.toPostTrade())
}
}
return result
}
// GetAllTrades returns all trades regardless of status.
func (m *Manager) GetAllTrades() []domain.PostTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
result := make([]domain.PostTrade, 0, len(m.trades))
for _, t := range m.trades {
result = append(result, t.toPostTrade())
}
return result
}
func (t *postTrade) toPostTrade() domain.PostTrade {
return domain.PostTrade{
TradeID: t.TradeID,
TradeReportID: t.TradeReportID,
TradeReportType: t.TradeReportType,
Side: t.Side,
LastQty: t.LastQty.String(),
LastPx: t.LastPx.String(),
SettlDate: t.SettlDate,
TradeDate: t.TradeDate,
Status: t.Status,
TWTradeID: t.TWTradeID,
}
}
// Ensure transactTime has a sensible default if not present in the message.
var _ = time.Now // used by confirmationack.New via field.NewTransactTime

356
src/client/fix/parser.go Normal file
View File

@ -0,0 +1,356 @@
package fix
import (
"time"
"quantex.com/qfixpt/quickfix"
"quantex.com/qfixpt/quickfix/gen/enum"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreport"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmation"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereport"
"quantex.com/qfixpt/quickfix/gen/tag"
"quantex.com/qfixpt/src/domain"
)
func getCustomString(body *quickfix.FieldMap, t int) string {
v, err := body.GetString(quickfix.Tag(t))
if err != nil {
return ""
}
return v
}
func getCustomInt(body *quickfix.FieldMap, t int) int {
v, err := body.GetInt(quickfix.Tag(t))
if err != nil {
return 0
}
return v
}
func getCustomBool(body *quickfix.FieldMap, t int) bool {
v, err := body.GetBool(quickfix.Tag(t))
if err != nil {
return false
}
return v
}
func extractHeader(msg *quickfix.Message) map[string]interface{} {
h := make(map[string]interface{})
if v, err := msg.Header.GetBytes(tag.BeginString); err == nil {
h["BeginString"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.MsgType); err == nil {
h["MsgType"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.SenderCompID); err == nil {
h["SenderCompID"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.TargetCompID); err == nil {
h["TargetCompID"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.SendingTime); err == nil {
h["SendingTime"] = string(v)
}
return h
}
func parseTradeCaptureReportJSON(msg tradecapturereport.TradeCaptureReport) domain.FixMessageJSON {
tradeID, _ := msg.GetTradeID()
body := map[string]interface{}{}
if v, _ := msg.GetTradeReportID(); v != "" {
body["TradeReportID"] = v
}
body["TradeID"] = tradeID
if v, _ := msg.GetExecID(); v != "" {
body["ExecID"] = v
}
if v, _ := msg.GetTradeReportType(); v != "" {
body["TradeReportType"] = string(v)
}
if v, _ := msg.GetLastQty(); !v.IsZero() {
body["LastQty"] = v.String()
}
if v, _ := msg.GetLastPx(); !v.IsZero() {
body["LastPx"] = v.String()
}
if v, _ := msg.GetSpread(); !v.IsZero() {
body["Spread"] = v.String()
}
if v, _ := msg.GetSettlDate(); v != "" {
body["SettlDate"] = v
}
if v, _ := msg.GetTradeDate(); v != "" {
body["TradeDate"] = v
}
if v, _ := msg.GetGrossTradeAmt(); !v.IsZero() {
body["GrossTradeAmt"] = v.String()
}
if v, _ := msg.GetTradeReportRefID(); v != "" {
body["TradeReportRefID"] = v
}
// Multi-leg support.
if v, _ := msg.GetTradeLegRefID(); v != "" {
body["TradeLegRefID"] = v
}
if v := getCustomString(&msg.Message.Body.FieldMap, tagTradingSystemID); v != "" {
body["TradingSystemID"] = v
}
// Custom fields.
if v := getCustomString(&msg.Message.Body.FieldMap, tagTWTradeID); v != "" {
body["TWTradeID"] = v
}
if v := getCustomString(&msg.Message.Body.FieldMap, tagTWOrigTradeID); v != "" {
body["TWOrigTradeID"] = v
}
// Side-level fields.
sides, sideErr := msg.GetNoSides()
if sideErr == nil && sides.Len() > 0 {
s := sides.Get(0)
if v, e := s.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := s.GetAccruedInterestAmt(); e == nil {
body["AccruedInterestAmt"] = v.String()
}
if v, e := s.GetNetMoney(); e == nil {
body["NetMoney"] = v.String()
}
if v := extractPartyByRole(s, enum.PartyRole_CLEARING_FIRM); v != "" {
body["ClearingFirm"] = v
}
if v := extractPartyByRole(s, "1007"); v != "" {
body["TradewebTraderID"] = v
}
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "AE",
TradeID: tradeID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseAllocationReportJSON(msg allocationreport.AllocationReport) domain.FixMessageJSON {
// Primary: tag 1003 directly on the body.
tradeID := getCustomString(&msg.Message.Body.FieldMap, 1003)
// Fallback: SecondaryOrderID (198) from the NoOrders repeating group.
if tradeID == "" {
if orders, err := msg.GetNoOrders(); err == nil && orders.Len() > 0 {
if v, e := orders.Get(0).GetSecondaryOrderID(); e == nil && v != "" {
tradeID = v
}
}
}
body := map[string]interface{}{}
if v, _ := msg.GetAllocReportID(); v != "" {
body["AllocReportID"] = v
}
if v, _ := msg.GetAllocTransType(); v != "" {
body["AllocTransType"] = string(v)
}
body["TradeID"] = tradeID
// TradingSystemID (6731) — for multi-leg, differentiates each leg.
if v := getCustomString(&msg.Message.Body.FieldMap, tagTradingSystemID); v != "" {
body["TradingSystemID"] = v
}
// SecondaryOrderID (198) from NoOrders — explicit reference to original trade.
if orders, err := msg.GetNoOrders(); err == nil && orders.Len() > 0 {
if v, e := orders.Get(0).GetSecondaryOrderID(); e == nil && v != "" {
body["SecondaryOrderID"] = v
}
}
allocs, allocErr := msg.GetNoAllocs()
if allocErr == nil {
var entries []map[string]interface{}
for i := 0; i < allocs.Len(); i++ {
entry := allocs.Get(i)
e := map[string]interface{}{}
if v, err := entry.GetAllocAccount(); err == nil {
e["AllocAccount"] = v
}
if v, err := entry.GetAllocQty(); err == nil {
e["AllocQty"] = v.String()
}
if v, err := entry.GetIndividualAllocID(); err == nil {
e["IndividualAllocID"] = v
}
if v, err := entry.GetAllocPrice(); err == nil {
e["AllocPrice"] = v.String()
}
entries = append(entries, e)
}
body["Allocs"] = entries
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "AS",
TradeID: tradeID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseConfirmationJSON(msg confirmation.Confirmation) domain.FixMessageJSON {
// Try TradeID first, fall back to ConfirmID.
tradeID := getCustomString(&msg.Message.Body.FieldMap, 1003)
if tradeID == "" {
tradeID, _ = msg.GetConfirmID()
}
body := map[string]interface{}{}
if v, _ := msg.GetConfirmID(); v != "" {
body["ConfirmID"] = v
}
if v, _ := msg.GetConfirmStatus(); v != "" {
body["ConfirmStatus"] = string(v)
}
if v, _ := msg.GetConfirmTransType(); v != "" {
body["ConfirmTransType"] = string(v)
}
if v, _ := msg.GetIndividualAllocID(); v != "" {
body["IndividualAllocID"] = v
}
if v, _ := msg.GetTradeDate(); v != "" {
body["TradeDate"] = v
}
if v := getCustomString(&msg.Message.Body.FieldMap, tagTWClrID); v != "" {
body["TWClrID"] = v
}
if v := getCustomString(&msg.Message.Body.FieldMap, tagDlrClrID); v != "" {
body["DlrClrID"] = v
}
if v := getCustomString(&msg.Message.Body.FieldMap, tagClearingStatus); v != "" {
body["ClearingStatus"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "AK",
TradeID: tradeID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func extractTradeIdentifier(msg *quickfix.Message) string {
// Primary: TradeID (1003).
var tradeID quickfix.FIXString
if err := msg.Body.GetField(tag.TradeID, &tradeID); err == nil && string(tradeID) != "" {
return string(tradeID)
}
// Fallback: SecondaryOrderID (198) — used in AllocationReport to reference the trade.
var secOrderID quickfix.FIXString
if err := msg.Body.GetField(tag.SecondaryOrderID, &secOrderID); err == nil && string(secOrderID) != "" {
return string(secOrderID)
}
// Last resort: ConfirmID (664) — for Confirmation messages without TradeID.
var confirmID quickfix.FIXString
if err := msg.Body.GetField(tag.ConfirmID, &confirmID); err == nil && string(confirmID) != "" {
return string(confirmID)
}
return ""
}
func buildOutgoingMessageJSON(msgType, tradeID string, body map[string]interface{}) domain.FixMessageJSON {
return domain.FixMessageJSON{
Direction: "OUT",
MsgType: msgType,
TradeID: tradeID,
Body: body,
ReceiveTime: time.Now(),
}
}
// extractPartyByRole iterates the Parties repeating group inside a NoSides entry
// and returns the PartyID for the given target role.
func extractPartyByRole(side tradecapturereport.NoSides, targetRole enum.PartyRole) string {
parties, err := side.GetNoPartyIDs()
if err != nil {
return ""
}
for i := 0; i < parties.Len(); i++ {
party := parties.Get(i)
role, roleErr := party.GetPartyRole()
if roleErr != nil {
continue
}
if role == targetRole {
id, idErr := party.GetPartyID()
if idErr != nil {
return ""
}
return id
}
}
return ""
}

16
src/client/store/db.sql Normal file
View File

@ -0,0 +1,16 @@
CREATE TABLE IF NOT EXISTS qfixpt_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
trade_id TEXT NOT NULL,
j_message JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_pt_messages_trade_id ON qfixpt_messages(trade_id);
CREATE INDEX IF NOT EXISTS idx_pt_messages_created_at ON qfixpt_messages(created_at);
CREATE TABLE IF NOT EXISTS qfixpt_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
trade_id TEXT NOT NULL UNIQUE,
raw_msg TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

View File

@ -2,7 +2,9 @@
package store package store
import ( import (
_ "embed"
"log/slog" "log/slog"
"strings"
"time" "time"
"quantex.com.ar/multidb" "quantex.com.ar/multidb"
@ -11,6 +13,9 @@ import (
"quantex.com/qfixpt/src/common/tracerr" "quantex.com/qfixpt/src/common/tracerr"
) )
//go:embed db.sql
var schemaSQL string
const dbPingSeconds = 30 const dbPingSeconds = 30
type Store struct { type Store struct {
@ -45,9 +50,31 @@ func New(config Config) (*Store, error) {
go s.db.PeriodicDBPing(time.Second * dbPingSeconds) go s.db.PeriodicDBPing(time.Second * dbPingSeconds)
if err := s.ensureTables(); err != nil {
return nil, tracerr.Errorf("error ensuring tables: %w", err)
}
return s, nil return s, nil
} }
func (p *Store) ensureTables() error {
statements := strings.Split(schemaSQL, ";")
for _, stmt := range statements {
stmt = strings.TrimSpace(stmt)
if stmt == "" {
continue
}
if _, err := p.db.Exec(stmt); err != nil {
return tracerr.Errorf("error executing schema statement: %w", err)
}
}
slog.Info("database tables ensured")
return nil
}
func (p *Store) CloseDB() { func (p *Store) CloseDB() {
p.db.Close() p.db.Close()
slog.Info("closing database connection.") slog.Info("closing database connection.")

View File

@ -0,0 +1,103 @@
package store
import (
"encoding/json"
"strings"
"time"
"quantex.com/qfixpt/src/common/tracerr"
"quantex.com/qfixpt/src/domain"
)
func (p *Store) SaveMessage(msg domain.TradeMessage) error {
jsonBytes, err := json.Marshal(msg.JMessage)
if err != nil {
return tracerr.Errorf("error marshaling j_message: %w", err)
}
_, err = p.db.Exec(
"INSERT INTO qfixpt_messages (trade_id, j_message) VALUES ($1, $2)",
msg.TradeID, string(jsonBytes),
)
if err != nil {
return tracerr.Errorf("error inserting message: %w", err)
}
return nil
}
func (p *Store) SaveLog(entry domain.LogEntry) error {
upsertStmt := `INSERT INTO qfixpt_logs (trade_id, raw_msg)
VALUES ($1, $2)
ON CONFLICT (trade_id) DO UPDATE
SET raw_msg = qfixpt_logs.raw_msg || E'\n' || EXCLUDED.raw_msg,
updated_at = NOW()`
_, err := p.db.Exec(upsertStmt, entry.TradeID, entry.RawMsg)
if err != nil {
return tracerr.Errorf("error upserting log: %w", err)
}
return nil
}
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
rows, err := p.db.Query(
"SELECT id, trade_id, j_message, created_at FROM qfixpt_messages WHERE created_at >= current_date ORDER BY created_at ASC",
)
if err != nil {
return nil, tracerr.Errorf("error querying today messages: %w", err)
}
defer rows.Close()
var messages []domain.TradeMessage
for rows.Next() {
var (
id, tradeID string
jMessageRaw []byte
createdAt time.Time
)
if err := rows.Scan(&id, &tradeID, &jMessageRaw, &createdAt); err != nil {
return nil, tracerr.Errorf("error scanning message row: %w", err)
}
var jMessage domain.FixMessageJSON
if err := json.Unmarshal(jMessageRaw, &jMessage); err != nil {
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
}
messages = append(messages, domain.TradeMessage{
ID: id,
TradeID: tradeID,
JMessage: jMessage,
CreatedAt: createdAt,
})
}
if err := rows.Err(); err != nil {
return nil, tracerr.Errorf("error iterating message rows: %w", err)
}
return messages, nil
}
func (p *Store) GetLogsByTradeID(tradeID string) (domain.Logs, error) {
rows, err := p.db.Query("SELECT raw_msg FROM qfixpt_logs WHERE trade_id = $1", tradeID)
if err != nil {
return domain.Logs{}, tracerr.Errorf("error querying logs: %w", err)
}
defer rows.Close()
if !rows.Next() {
return domain.Logs{}, nil
}
var rawMsg string
if err := rows.Scan(&rawMsg); err != nil {
return domain.Logs{}, tracerr.Errorf("error scanning log row: %w", err)
}
return domain.Logs{Entries: strings.Split(rawMsg, "\n")}, nil
}

View File

@ -8,6 +8,7 @@ import (
"quantex.com/qfixpt/src/app" "quantex.com/qfixpt/src/app"
"quantex.com/qfixpt/src/client/api/rest" "quantex.com/qfixpt/src/client/api/rest"
"quantex.com/qfixpt/src/client/data" "quantex.com/qfixpt/src/client/data"
"quantex.com/qfixpt/src/client/fix"
googlechat "quantex.com/qfixpt/src/client/notify/google" googlechat "quantex.com/qfixpt/src/client/notify/google"
"quantex.com/qfixpt/src/client/store" "quantex.com/qfixpt/src/client/store"
"quantex.com/qfixpt/src/client/store/external" "quantex.com/qfixpt/src/client/store/external"
@ -38,6 +39,13 @@ func Runner(cfg app.Config) error {
userData := data.New() userData := data.New()
// Initialize FIX Post-Trade Manager.
fixManager := fix.NewManager(cfg.FIX, appStore, notify)
if err = fixManager.Start(); err != nil {
return fmt.Errorf("error starting FIX initiator: %w", err)
}
defer fixManager.Stop()
apiConfig := rest.Config{ apiConfig := rest.Config{
Port: cfg.APIBasePort, Port: cfg.APIBasePort,
AllowedOrigins: cfg.AllowedOrigins, AllowedOrigins: cfg.AllowedOrigins,
@ -46,7 +54,7 @@ func Runner(cfg app.Config) error {
EnableJWTAuth: cfg.EnableJWTAuth, EnableJWTAuth: cfg.EnableJWTAuth,
} }
api := rest.New(userData, appStore, apiConfig, notify) api := rest.New(userData, appStore, fixManager, apiConfig, notify)
api.Run() api.Run()
cmd.WaitForInterruptSignal(nil) cmd.WaitForInterruptSignal(nil)

63
src/domain/persistence.go Normal file
View File

@ -0,0 +1,63 @@
package domain
import "time"
// PostTradeStatus represents the lifecycle state of a post-trade record.
type PostTradeStatus string
const (
PostTradeStatusActive PostTradeStatus = "active"
PostTradeStatusCorrected PostTradeStatus = "corrected"
PostTradeStatusCancelled PostTradeStatus = "cancelled"
)
// FixMessageJSON is the structured representation of a FIX message for storage.
type FixMessageJSON struct {
Direction string `json:"direction"`
MsgType string `json:"msg_type"`
TradeID string `json:"trade_id"`
Header map[string]interface{} `json:"header"`
Body map[string]interface{} `json:"body"`
ReceiveTime time.Time `json:"receive_time"`
}
// TradeMessage is a row in qfixpt_messages.
type TradeMessage struct {
ID string `json:"id"`
TradeID string `json:"trade_id"`
JMessage FixMessageJSON `json:"j_message"`
CreatedAt time.Time `json:"created_at"`
}
// LogEntry is the DTO for inserting/appending a raw log in qfixpt_logs.
type LogEntry struct {
TradeID string
RawMsg string
}
// Logs is the response for GET /trades/:tradeID/logs.
type Logs struct {
Entries []string `json:"entries"`
}
// PostTrade is the exported representation of a post-trade record for the API.
type PostTrade struct {
TradeID string `json:"trade_id"`
TradeReportID string `json:"trade_report_id"`
TradeReportType string `json:"trade_report_type"`
Side string `json:"side"`
LastQty string `json:"last_qty"`
LastPx string `json:"last_px"`
SettlDate string `json:"settl_date"`
TradeDate string `json:"trade_date"`
Status PostTradeStatus `json:"status"`
TWTradeID string `json:"tw_trade_id"`
}
// PersistenceStore defines the persistence interface for post-trade messages.
type PersistenceStore interface {
SaveMessage(msg TradeMessage) error
SaveLog(entry LogEntry) error
GetTodayMessages() ([]TradeMessage, error)
GetLogsByTradeID(tradeID string) (Logs, error)
}

61
src/domain/posttrade.go Normal file
View File

@ -0,0 +1,61 @@
package domain
import "github.com/shopspring/decimal"
// Trade represents data extracted from a TradeCaptureReport (35=AE).
type Trade struct {
TradeReportID string // 571
TradeID string // 1003
ExecID string // 17
OrigTradeID string // 1903
OrigSecondaryID string // 1906
LastQty decimal.Decimal // 32
LastPx decimal.Decimal // 31
Spread decimal.Decimal // 218 (236 = benchmark curve name)
Side string // 54
AccruedInterest decimal.Decimal // 159
NetMoney decimal.Decimal // 118
GrossTradeAmt decimal.Decimal // 381
SettlDate string // 64
TradeDate string // 75
ListID string // 66
TradeReportType string // 856
TradeHandlInst string // 487
TradeReportRefID string // 572
ClearingFirm string // Party role=4
TradewebTraderID string // Party role=1007 (custom)
TestMessage bool // 23029 (custom)
TWTradeID string // 23068 (custom)
TWOrigTradeID string // 23096 (custom)
}
// Allocation represents data extracted from an AllocationReport (35=AS).
type Allocation struct {
AllocReportID string // 755
AllocTransType string // 71
TradeID string // 1003
TestMessage bool // 23029 (custom)
Entries []AllocEntry // NoAllocs group (78)
}
// AllocEntry represents a single entry in the NoAllocs repeating group.
type AllocEntry struct {
AllocAccount string // 79
AllocQty decimal.Decimal // 80
IndividualAllocID string // 467
AllocPrice decimal.Decimal // 154 (via custom tag access if needed)
CashSettlAmount decimal.Decimal // 742 (via custom tag access if needed)
}
// ConfirmationMsg represents data extracted from a Confirmation (35=AK).
type ConfirmationMsg struct {
ConfirmID string // 664
ConfirmStatus string // 665
ConfirmTransType string // 666
IndividualAllocID string // 467
TWClrID string // 23025 (custom)
DlrClrID string // 23027 (custom)
ClearingStatus string // 5440 (custom)
TradeDate string // 75
TestMessage bool // 23029 (custom)
}