3 Commits

21 changed files with 1417 additions and 413 deletions

View File

@ -56,13 +56,13 @@ build: check-env swag vendor only-build # Build a native version. Set e=environm
only-build: check-env only-build: check-env
@echo "Building for $(e) environment..." @echo "Building for $(e) environment..."
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e) env OUT_PATH=$(DEFAULT_OUT_PATH) tools/build.sh $(e)
linux-build: check-env swag # Build a linux version for prod environment. Set e=environment: prod, dev, demo, open-demo linux-build: check-env swag # Build a linux version for prod environment. Set e=environment: prod, dev, demo, open-demo
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e) env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
deploy: check-env # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo; s=serverName; i=instance; e.g. make deploy e=dev s=nonprodFix i=qfixpt deploy: # Deploy to remote server. Set s=serverName, e=environment. e.g. make deploy e=dev s=nonprodFix
make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/$(i)/ make linux-build e=$(e) && qscp build/out/distribution/qfixpt.gz $(s):/home/quantex/qfixtb/qfixpt/
fmt: download-versions # Apply the Go formatter to the code fmt: download-versions # Apply the Go formatter to the code
cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt); cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt);

19
fix_pt.cfg Normal file
View File

@ -0,0 +1,19 @@
[DEFAULT]
ConnectionType=initiator
HeartBtInt=30
ReconnectInterval=30
SenderCompID=<CONFIGURAR>
ResetOnLogon=Y
FileStorePath=fix_store
FileLogPath=fix_logs
[SESSION]
BeginString=FIXT.1.1
DefaultApplVerID=FIX.5.0SP2
TargetCompID=<CONFIGURAR>
TransportDataDictionary=spec/FIXT11.xml
AppDataDictionary=spec/FIX50SP2.xml
StartTime=00:00:00
EndTime=00:00:00
SocketConnectHost=<CONFIGURAR>
SocketConnectPort=<CONFIGURAR>

View File

@ -178,11 +178,6 @@ func parseLogLevel(level string) (slog.Level, error) {
func startRunner(runner, globalCfg, serviceCfg string) { func startRunner(runner, globalCfg, serviceCfg string) {
var fn func(cfg app.Config) error var fn func(cfg app.Config) error
if runner == "" {
runner = "service"
}
switch runner { switch runner {
case "service": case "service":
fn = service.Runner fn = service.Runner

View File

@ -38,8 +38,9 @@ type Service struct {
EnableJWTAuth bool // Enable JWT authentication for service-to-service communication EnableJWTAuth bool // Enable JWT authentication for service-to-service communication
FIX FIXConfig FIX FIXConfig
} }
type FIXConfig struct { type FIXConfig struct {
SettingsFile string // path to fix.cfg file SettingsFile string // path to fix_pt.cfg file
} }
type ExtAuth struct { type ExtAuth struct {

View File

@ -3,6 +3,7 @@ package version
import ( import (
"fmt" "fmt"
"os"
"runtime" "runtime"
"strings" "strings"
) )
@ -37,17 +38,17 @@ type EnvironmentType int //nolint:recvcheck // The methods of this are autogener
var environment EnvironmentType //nolint:gochecknoglobals // Just keept this global to avoid having to create an instance var environment EnvironmentType //nolint:gochecknoglobals // Just keept this global to avoid having to create an instance
func init() { func init() {
// aux := os.Getenv(quantexEnvironment) aux := os.Getenv(quantexEnvironment)
// if aux == "" { if aux == "" {
// panic("QUANTEX_ENVIRONMENT is not set") panic("QUANTEX_ENVIRONMENT is not set")
// } }
// env, err := ParseEnvironmentType(aux) env, err := ParseEnvironmentType(aux)
// if err != nil { if err != nil {
// panic("Invalid QUANTEX_ENVIRONMENT value: " + aux + " " + err.Error()) panic("Invalid QUANTEX_ENVIRONMENT value: " + aux + " " + err.Error())
// } }
environment = EnvironmentTypeDev environment = env
} }
// Base returns the version base name // Base returns the version base name

View File

@ -32,25 +32,53 @@ type Controller struct {
pool *redis.Pool pool *redis.Pool
userData app.UserDataProvider userData app.UserDataProvider
store *store.Store store *store.Store
tradeProvider TradeProvider
config Config config Config
notify domain.Notifier notify domain.Notifier
orderRepo domain.OrderRepository
authMutex deadlock.Mutex authMutex deadlock.Mutex
} }
func newController(pool *redis.Pool, userData app.UserDataProvider, func newController(pool *redis.Pool, userData app.UserDataProvider,
s *store.Store, config Config, n domain.Notifier, repo domain.OrderRepository, s *store.Store, tp TradeProvider, config Config, n domain.Notifier,
) *Controller { ) *Controller {
return &Controller{ return &Controller{
pool: pool, pool: pool,
userData: userData, userData: userData,
store: s, store: s,
tradeProvider: tp,
config: config, config: config,
notify: n, notify: n,
orderRepo: repo,
} }
} }
func (cont *Controller) GetTrades(ctx *gin.Context) {
setHeaders(ctx, cont.config)
trades := cont.tradeProvider.GetTrades()
ctx.JSON(http.StatusOK, trades)
}
func (cont *Controller) GetAllTrades(ctx *gin.Context) {
setHeaders(ctx, cont.config)
trades := cont.tradeProvider.GetAllTrades()
ctx.JSON(http.StatusOK, trades)
}
func (cont *Controller) GetTradeLogs(ctx *gin.Context) {
setHeaders(ctx, cont.config)
tradeID := ctx.Param("tradeID")
logs, err := cont.store.GetLogsByTradeID(tradeID)
if err != nil {
slog.Error("error fetching trade logs", "tradeID", tradeID, "error", err)
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"})
return
}
ctx.JSON(http.StatusOK, logs)
}
func (cont *Controller) GetUser(ctx *gin.Context) app.User { func (cont *Controller) GetUser(ctx *gin.Context) app.User {
// This is set on the AuthRequired middleware // This is set on the AuthRequired middleware
response, ok := ctx.Get(responseKey) response, ok := ctx.Get(responseKey)
@ -290,82 +318,3 @@ func allowed(origin string, config Config) bool {
return false return false
} }
// GetExecutions godoc
// @Summary Get all execution reports
// @Description Returns all FIX ExecutionReport messages received by the initiator
// @Tags executions
// @Produce json
// @Success 200 {array} ExecutionReportResponse
// @Failure 500 {object} HTTPError
// @Router /executions [get]
func (cont *Controller) GetExecutions(ctx *gin.Context) {
setHeaders(ctx, cont.config)
reports := cont.orderRepo.GetAll()
response := make([]ExecutionReportResponse, 0, len(reports))
for _, r := range reports {
response = append(response, ExecutionReportResponse{
OrderID: r.OrderID,
ClOrdID: r.ClOrdID,
ExecID: r.ExecID,
ExecType: r.ExecType,
OrdStatus: r.OrdStatus,
Symbol: r.Symbol,
Side: r.Side,
OrderQty: r.OrderQty,
Price: r.Price,
LastPx: r.LastPx,
LastQty: r.LastQty,
CumQty: r.CumQty,
LeavesQty: r.LeavesQty,
AvgPx: r.AvgPx,
TransactTime: r.TransactTime,
Account: r.Account,
})
}
ctx.JSON(http.StatusOK, response)
}
// GetExecutionByOrderID godoc
// @Summary Get execution report by OrderID
// @Description Returns the FIX ExecutionReport for the given OrderID
// @Tags executions
// @Produce json
// @Param orderID path string true "OrderID"
// @Success 200 {object} ExecutionReportResponse
// @Failure 404 {object} HTTPError
// @Router /executions/{orderID} [get]
func (cont *Controller) GetExecutionByOrderID(ctx *gin.Context) {
setHeaders(ctx, cont.config)
orderID := ctx.Param("orderID")
r, ok := cont.orderRepo.GetByOrderID(orderID)
if !ok {
ctx.JSON(http.StatusNotFound, HTTPError{Error: "execution report not found"})
return
}
ctx.JSON(http.StatusOK, ExecutionReportResponse{
OrderID: r.OrderID,
ClOrdID: r.ClOrdID,
ExecID: r.ExecID,
ExecType: r.ExecType,
OrdStatus: r.OrdStatus,
Symbol: r.Symbol,
Side: r.Side,
OrderQty: r.OrderQty,
Price: r.Price,
LastPx: r.LastPx,
LastQty: r.LastQty,
CumQty: r.CumQty,
LeavesQty: r.LeavesQty,
AvgPx: r.AvgPx,
TransactTime: r.TransactTime,
Account: r.Account,
})
}

View File

@ -1,11 +1,5 @@
package rest package rest
import (
"time"
"github.com/shopspring/decimal"
)
type HTTPError struct { type HTTPError struct {
Error string Error string
} }
@ -22,23 +16,3 @@ type Credentials struct {
type Session struct { type Session struct {
Email string Email string
} }
// ExecutionReportResponse is the REST representation of a FIX ExecutionReport.
type ExecutionReportResponse struct {
OrderID string `json:"orderID"`
ClOrdID string `json:"clOrdID"`
ExecID string `json:"execID"`
ExecType string `json:"execType"`
OrdStatus string `json:"ordStatus"`
Symbol string `json:"symbol"`
Side string `json:"side"`
OrderQty decimal.Decimal `json:"orderQty"`
Price decimal.Decimal `json:"price"`
LastPx decimal.Decimal `json:"lastPx"`
LastQty decimal.Decimal `json:"lastQty"`
CumQty decimal.Decimal `json:"cumQty"`
LeavesQty decimal.Decimal `json:"leavesQty"`
AvgPx decimal.Decimal `json:"avgPx"`
TransactTime time.Time `json:"transactTime"`
Account string `json:"account,omitempty"`
}

View File

@ -21,8 +21,9 @@ func SetRoutes(api *API) {
qfixpt := v1.Group("/") qfixpt := v1.Group("/")
qfixpt.Use(cont.AuthRequired) qfixpt.Use(cont.AuthRequired)
qfixpt.GET("/health", cont.HealthCheck) qfixpt.GET("/health", cont.HealthCheck)
qfixpt.GET("/executions", cont.GetExecutions) qfixpt.GET("/trades", cont.GetTrades)
qfixpt.GET("/executions/:orderID", cont.GetExecutionByOrderID) qfixpt.GET("/trades/all", cont.GetAllTrades)
qfixpt.GET("/trades/:tradeID/logs", cont.GetTradeLogs)
backoffice := qfixpt.Group("/backoffice") backoffice := qfixpt.Group("/backoffice")
backoffice.Use(cont.BackOfficeUser) backoffice.Use(cont.BackOfficeUser)

View File

@ -32,7 +32,13 @@ type Config struct {
EnableJWTAuth bool EnableJWTAuth bool
} }
func New(userData app.UserDataProvider, storeInstance *store.Store, config Config, notify domain.Notifier, repo domain.OrderRepository) *API { // TradeProvider exposes trade data for the REST API.
type TradeProvider interface {
GetTrades() []domain.PostTrade
GetAllTrades() []domain.PostTrade
}
func New(userData app.UserDataProvider, storeInstance *store.Store, tradeProvider TradeProvider, config Config, notify domain.Notifier) *API {
// Set up Gin // Set up Gin
var engine *gin.Engine var engine *gin.Engine
if version.Environment() == version.EnvironmentTypeProd { if version.Environment() == version.EnvironmentTypeProd {
@ -58,7 +64,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, config Confi
} }
api := &API{ api := &API{
Controller: newController(NewPool(), userData, storeInstance, config, notify, repo), Controller: newController(NewPool(), userData, storeInstance, tradeProvider, config, notify),
Router: engine, Router: engine,
Port: config.Port, Port: config.Port,
} }

View File

@ -1,137 +1,147 @@
// Package fix implements the FIX protocol initiator and application handler.
package fix package fix
import ( import (
"log/slog" "log/slog"
"quantex.com/qfixpt/quickfix" "quantex.com/qfixpt/quickfix"
"quantex.com/qfixpt/quickfix/gen/fix44/executionreport" "quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreport"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmation"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereport"
"quantex.com/qfixpt/quickfix/gen/tag"
"quantex.com/qfixpt/src/domain" "quantex.com/qfixpt/src/domain"
) )
// Application implements quickfix.Application to handle incoming FIX messages. type application struct {
type Application struct { router *quickfix.MessageRouter
repo domain.OrderRepository notifier domain.Notifier
onLogon func(quickfix.SessionID)
onLogout func(quickfix.SessionID)
onTradeCaptureReport func(tradecapturereport.TradeCaptureReport, quickfix.SessionID)
onAllocationReport func(allocationreport.AllocationReport, quickfix.SessionID)
onConfirmation func(confirmation.Confirmation, quickfix.SessionID)
onRawMessage func(direction string, msg *quickfix.Message)
} }
// NewApplication creates a new FIX Application backed by the given repository. func newApplication(n domain.Notifier) *application {
func NewApplication(repo domain.OrderRepository) *Application { app := &application{
return &Application{repo: repo} router: quickfix.NewMessageRouter(),
notifier: n,
} }
// OnCreate is called when a FIX session is created. app.router.AddRoute(tradecapturereport.Route(app.handleTradeCaptureReport))
func (a *Application) OnCreate(sessionID quickfix.SessionID) { app.router.AddRoute(allocationreport.Route(app.handleAllocationReport))
slog.Info("FIX session created", "sessionID", sessionID) app.router.AddRoute(confirmation.Route(app.handleConfirmation))
return app
} }
// OnLogon is called when a FIX session logs on successfully. func (a *application) OnCreate(sessionID quickfix.SessionID) {
func (a *Application) OnLogon(sessionID quickfix.SessionID) { slog.Info("FIX session created", "session", sessionID.String())
slog.Info("FIX session logged on", "sessionID", sessionID)
} }
// OnLogout is called when a FIX session logs out or disconnects. func (a *application) OnLogon(sessionID quickfix.SessionID) {
func (a *Application) OnLogout(sessionID quickfix.SessionID) { slog.Info("FIX session logged on", "session", sessionID.String())
slog.Info("FIX session logged out", "sessionID", sessionID) if a.onLogon != nil {
a.onLogon(sessionID)
}
} }
// ToAdmin is called before sending an admin message. func (a *application) OnLogout(sessionID quickfix.SessionID) {
func (a *Application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {} slog.Info("FIX session logged out", "session", sessionID.String())
// ToApp is called before sending an application message. go a.notifier.SendMsg(domain.MessageChannelError, "Logout", domain.MessageStatusWarning, nil)
func (a *Application) ToApp(_ *quickfix.Message, _ quickfix.SessionID) error {
return nil if a.onLogout != nil {
a.onLogout(sessionID)
}
} }
// FromAdmin is called when an admin message is received. func (a *application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {}
func (a *Application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError {
return nil
}
// FromApp is called when an application message is received. func (a *application) ToApp(msg *quickfix.Message, _ quickfix.SessionID) error {
// It routes ExecutionReport (MsgType=8) messages to the handler. if a.onRawMessage != nil {
func (a *Application) FromApp(message *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError { a.onRawMessage("OUT", msg)
if message.IsMsgTypeOf("8") {
a.handleExecutionReport(message, sessionID)
} }
return nil return nil
} }
func (a *Application) handleExecutionReport(msg *quickfix.Message, _ quickfix.SessionID) { func (a *application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError {
er := executionreport.FromMessage(msg) return nil
report := domain.ExecutionReport{}
if v, err := er.GetOrderID(); err == nil {
report.OrderID = v
} }
if v, err := er.GetClOrdID(); err == nil { func (a *application) FromApp(msg *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError {
report.ClOrdID = v if a.onRawMessage != nil {
a.onRawMessage("IN", msg)
} }
if v, err := er.GetExecID(); err == nil { beginString, _ := msg.Header.GetBytes(tag.BeginString)
report.ExecID = v msgType, _ := msg.Header.GetBytes(tag.MsgType)
}
if v, err := er.GetExecType(); err == nil { var applVerID quickfix.FIXString
report.ExecType = string(v) msg.Header.GetField(tag.ApplVerID, &applVerID)
}
if v, err := er.GetOrdStatus(); err == nil { slog.Info("FIX FromApp received",
report.OrdStatus = string(v) "beginString", string(beginString),
} "msgType", string(msgType),
"applVerID", string(applVerID),
"session", sessionID.String(),
"rawMsg", msg.String(),
)
if v, err := er.GetSymbol(); err == nil { rejErr := a.router.Route(msg, sessionID)
report.Symbol = v if rejErr != nil {
} slog.Error("FIX FromApp routing failed",
"msgType", string(msgType),
if v, err := er.GetSide(); err == nil { "error", rejErr.Error(),
report.Side = string(v) "isBusinessReject", rejErr.IsBusinessReject(),
}
if v, err := er.GetOrderQty(); err == nil {
report.OrderQty = v
}
if v, err := er.GetPrice(); err == nil {
report.Price = v
}
if v, err := er.GetLastPx(); err == nil {
report.LastPx = v
}
if v, err := er.GetLastQty(); err == nil {
report.LastQty = v
}
if v, err := er.GetCumQty(); err == nil {
report.CumQty = v
}
if v, err := er.GetLeavesQty(); err == nil {
report.LeavesQty = v
}
if v, err := er.GetAvgPx(); err == nil {
report.AvgPx = v
}
if v, err := er.GetTransactTime(); err == nil {
report.TransactTime = v
}
if v, err := er.GetAccount(); err == nil {
report.Account = v
}
a.repo.Save(report)
slog.Info("ExecutionReport stored",
"orderID", report.OrderID,
"execType", report.ExecType,
"ordStatus", report.OrdStatus,
"symbol", report.Symbol,
) )
} }
return rejErr
}
func (a *application) handleTradeCaptureReport(msg tradecapturereport.TradeCaptureReport, sessionID quickfix.SessionID) quickfix.MessageRejectError {
tradeReportID, _ := msg.GetTradeReportID()
slog.Info("TradeCaptureReport received",
"tradeReportID", tradeReportID,
"session", sessionID.String(),
)
if a.onTradeCaptureReport != nil {
a.onTradeCaptureReport(msg, sessionID)
}
return nil
}
func (a *application) handleAllocationReport(msg allocationreport.AllocationReport, sessionID quickfix.SessionID) quickfix.MessageRejectError {
allocReportID, _ := msg.GetAllocReportID()
slog.Info("AllocationReport received",
"allocReportID", allocReportID,
"session", sessionID.String(),
)
if a.onAllocationReport != nil {
a.onAllocationReport(msg, sessionID)
}
return nil
}
func (a *application) handleConfirmation(msg confirmation.Confirmation, sessionID quickfix.SessionID) quickfix.MessageRejectError {
confirmID, _ := msg.GetConfirmID()
slog.Info("Confirmation received",
"confirmID", confirmID,
"session", sessionID.String(),
)
if a.onConfirmation != nil {
a.onConfirmation(msg, sessionID)
}
return nil
}

View File

@ -1,49 +0,0 @@
package fix
import (
"fmt"
"os"
"quantex.com/qfixpt/quickfix"
"quantex.com/qfixpt/src/domain"
)
// Initiator wraps the quickfix Initiator lifecycle.
type Initiator struct {
inner *quickfix.Initiator
}
// NewInitiator creates and starts a FIX initiator using the given settings file path.
func NewInitiator(settingsFile string, repo domain.OrderRepository) (*Initiator, error) {
f, err := os.Open(settingsFile)
if err != nil {
return nil, fmt.Errorf("opening FIX settings file %q: %w", settingsFile, err)
}
defer f.Close()
settings, err := quickfix.ParseSettings(f)
if err != nil {
return nil, fmt.Errorf("parsing FIX settings: %w", err)
}
app := NewApplication(repo)
storeFactory := quickfix.NewMemoryStoreFactory()
logFactory := quickfix.NewNullLogFactory()
initiator, err := quickfix.NewInitiator(app, storeFactory, settings, logFactory)
if err != nil {
return nil, fmt.Errorf("creating FIX initiator: %w", err)
}
return &Initiator{inner: initiator}, nil
}
// Start begins connecting to the FIX counterparty.
func (i *Initiator) Start() error {
return i.inner.Start()
}
// Stop gracefully disconnects all FIX sessions.
func (i *Initiator) Stop() {
i.inner.Stop()
}

581
src/client/fix/manager.go Normal file
View File

@ -0,0 +1,581 @@
package fix
import (
"log/slog"
"os"
"sync"
"time"
"github.com/rs/zerolog/log"
"github.com/shopspring/decimal"
"quantex.com/qfixpt/quickfix"
"quantex.com/qfixpt/quickfix/gen/enum"
"quantex.com/qfixpt/quickfix/gen/field"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreport"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreportack"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmation"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmationack"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereport"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereportack"
filelog "quantex.com/qfixpt/quickfix/log/file"
filestore "quantex.com/qfixpt/quickfix/store/file"
"quantex.com/qfixpt/src/app"
"quantex.com/qfixpt/src/common/tracerr"
"quantex.com/qfixpt/src/domain"
)
// Custom Tradeweb tags not in generated code.
const (
tagTestMessage = 23029
tagTWTradeID = 23068
tagTWOrigTradeID = 23096
tagTWClrID = 23025
tagDlrClrID = 23027
tagClearingStatus = 5440
tagTradingSystemID = 6731
)
// postTrade is the internal representation of a trade in memory.
type postTrade struct {
TradeID string
TradeReportID string
TradeReportType string
Side string
LastQty decimal.Decimal
LastPx decimal.Decimal
SettlDate string
TradeDate string
Status domain.PostTradeStatus
TWTradeID string
}
// Manager wraps the QuickFIX initiator for post-trade message handling.
type Manager struct {
initiator *quickfix.Initiator
app *application
tradesMu sync.RWMutex
trades map[string]*postTrade
store domain.PersistenceStore
notify domain.Notifier
cfg app.FIXConfig
}
func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager {
return &Manager{
trades: make(map[string]*postTrade),
store: store,
notify: notify,
cfg: cfg,
}
}
func (m *Manager) Start() error {
fixApp := newApplication(m.notify)
fixApp.onLogon = m.onLogon
fixApp.onLogout = m.onLogout
fixApp.onTradeCaptureReport = m.handleTradeCaptureReport
fixApp.onAllocationReport = m.handleAllocationReport
fixApp.onConfirmation = m.handleConfirmation
fixApp.onRawMessage = m.handleRawMessage
m.app = fixApp
if err := m.loadTrades(); err != nil {
slog.Error("failed to load trades from DB, starting with empty state", "error", err)
}
f, err := os.Open(m.cfg.SettingsFile)
if err != nil {
err = tracerr.Errorf("error opening FIX settings file %q: %s", m.cfg.SettingsFile, err)
log.Error().Msg(err.Error())
return err
}
defer f.Close()
settings, err := quickfix.ParseSettings(f)
if err != nil {
err = tracerr.Errorf("error parsing FIX settings: %s", err)
log.Error().Msg(err.Error())
return err
}
storeFactory := filestore.NewStoreFactory(settings)
logFactory, err := filelog.NewLogFactory(settings)
if err != nil {
err = tracerr.Errorf("error creating file log factory: %s", err)
log.Error().Msg(err.Error())
return err
}
initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory)
if err != nil {
err = tracerr.Errorf("error creating FIX initiator: %s", err)
log.Error().Msg(err.Error())
return err
}
m.initiator = initiator
if err = m.initiator.Start(); err != nil {
err = tracerr.Errorf("error starting FIX initiator: %s", err)
log.Error().Msg(err.Error())
return err
}
slog.Info("FIX initiator started", "settings", m.cfg.SettingsFile)
return nil
}
func (m *Manager) Stop() {
if m.initiator != nil {
m.initiator.Stop()
slog.Info("FIX initiator stopped")
}
}
func (m *Manager) onLogon(sessionID quickfix.SessionID) {
slog.Info("FIX PT session logged on", "session", sessionID.String())
}
func (m *Manager) onLogout(sessionID quickfix.SessionID) {
slog.Info("FIX PT session logged out", "session", sessionID.String())
}
// handleTradeCaptureReport processes an incoming TradeCaptureReport (35=AE).
func (m *Manager) handleTradeCaptureReport(msg tradecapturereport.TradeCaptureReport, sessionID quickfix.SessionID) {
tradeReportID, _ := msg.GetTradeReportID()
// ACK immediately before any validation.
ack := tradecapturereportack.New()
ack.SetTradeReportID(tradeReportID)
if err := quickfix.SendToTarget(ack, sessionID); err != nil {
slog.Error("failed to send TradeCaptureReportAck", "tradeReportID", tradeReportID, "error", err.Error())
} else {
slog.Info("TradeCaptureReportAck sent", "tradeReportID", tradeReportID)
}
// Check if test message.
testMessage := getCustomBool(&msg.Message.Body.FieldMap, tagTestMessage)
if testMessage {
slog.Info("test TradeCaptureReport, skipping", "tradeReportID", tradeReportID)
return
}
// Parse trade data.
tradeReportType, _ := msg.GetTradeReportType()
tradeID, _ := msg.GetTradeID()
execID, _ := msg.GetExecID()
lastQty, _ := msg.GetLastQty()
lastPx, _ := msg.GetLastPx()
spread, _ := msg.GetSpread()
settlDate, _ := msg.GetSettlDate()
tradeDate, _ := msg.GetTradeDate()
grossTradeAmt, _ := msg.GetGrossTradeAmt()
tradeReportRefID, _ := msg.GetTradeReportRefID()
// Custom fields.
twTradeID := getCustomString(&msg.Message.Body.FieldMap, tagTWTradeID)
twOrigTradeID := getCustomString(&msg.Message.Body.FieldMap, tagTWOrigTradeID)
// Extract side-level fields (parties, accrued interest, net money).
var side string
var accruedInterest, netMoney decimal.Decimal
var clearingFirm, twTraderID string
sides, sideErr := msg.GetNoSides()
if sideErr == nil && sides.Len() > 0 {
s := sides.Get(0)
sideVal, _ := s.GetSide()
side = string(sideVal)
accruedInterest, _ = s.GetAccruedInterestAmt()
netMoney, _ = s.GetNetMoney()
clearingFirm = extractPartyByRole(s, enum.PartyRole_CLEARING_FIRM)
// 1007 is a custom Tradeweb role, access via string comparison.
twTraderID = extractPartyByRole(s, "1007")
}
trade := domain.Trade{
TradeReportID: tradeReportID,
TradeID: tradeID,
ExecID: execID,
LastQty: lastQty,
LastPx: lastPx,
Spread: spread,
Side: side,
AccruedInterest: accruedInterest,
NetMoney: netMoney,
GrossTradeAmt: grossTradeAmt,
SettlDate: settlDate,
TradeDate: tradeDate,
TradeReportType: string(tradeReportType),
TradeReportRefID: tradeReportRefID,
ClearingFirm: clearingFirm,
TradewebTraderID: twTraderID,
TWTradeID: twTradeID,
TWOrigTradeID: twOrigTradeID,
}
slog.Info("TradeCaptureReport parsed",
"tradeReportID", trade.TradeReportID,
"tradeReportType", trade.TradeReportType,
"tradeID", trade.TradeID,
"lastQty", trade.LastQty.String(),
"lastPx", trade.LastPx.String(),
"side", trade.Side,
"settlDate", trade.SettlDate,
)
// Persist structured message.
m.persistMessage(tradeID, parseTradeCaptureReportJSON(msg))
// Update in-memory state.
var status domain.PostTradeStatus
switch string(tradeReportType) {
case "101": // TRDCONF — trade confirmation
status = domain.PostTradeStatusActive
slog.Info("TRDCONF: trade confirmation", "tradeReportID", tradeReportID, "tradeID", tradeID)
case "102": // TRDBLOCK — block trade, log only
status = domain.PostTradeStatusActive
slog.Info("TRDBLOCK: block trade", "tradeReportID", tradeReportID, "tradeID", tradeID)
case "103": // TRDCORR — trade correction
status = domain.PostTradeStatusCorrected
slog.Info("TRDCORR: trade correction", "tradeReportID", tradeReportID, "refID", tradeReportRefID)
case "104": // TRDCXL — trade cancellation
status = domain.PostTradeStatusCancelled
slog.Info("TRDCXL: trade cancel", "tradeReportID", tradeReportID, "refID", tradeReportRefID)
case "105", "106", "107", "108": // log and ack only, no business logic
status = domain.PostTradeStatusActive
slog.Info("trade report (no business logic)", "tradeReportType", string(tradeReportType), "tradeReportID", tradeReportID)
default:
status = domain.PostTradeStatusActive
slog.Warn("unexpected TradeReportType", "tradeReportType", string(tradeReportType), "tradeReportID", tradeReportID)
}
m.tradesMu.Lock()
m.trades[tradeID] = &postTrade{
TradeID: tradeID,
TradeReportID: tradeReportID,
TradeReportType: string(tradeReportType),
Side: side,
LastQty: lastQty,
LastPx: lastPx,
SettlDate: settlDate,
TradeDate: tradeDate,
Status: status,
TWTradeID: twTradeID,
}
m.tradesMu.Unlock()
}
// handleAllocationReport processes an incoming AllocationReport (35=AS).
func (m *Manager) handleAllocationReport(msg allocationreport.AllocationReport, sessionID quickfix.SessionID) {
allocReportID, _ := msg.GetAllocReportID()
// ACK immediately.
ack := allocationreportack.New(field.NewAllocReportID(allocReportID))
if err := quickfix.SendToTarget(ack, sessionID); err != nil {
slog.Error("failed to send AllocationReportAck", "allocReportID", allocReportID, "error", err.Error())
} else {
slog.Info("AllocationReportAck sent", "allocReportID", allocReportID)
}
// Check if test message.
testMessage := getCustomBool(&msg.Message.Body.FieldMap, tagTestMessage)
if testMessage {
slog.Info("test AllocationReport, skipping", "allocReportID", allocReportID)
return
}
allocTransType, _ := msg.GetAllocTransType()
// Extract TradeID: try tag 1003 directly, fall back to SecondaryOrderID (198) from NoOrders.
tradeID := getCustomString(&msg.Message.Body.FieldMap, 1003)
if tradeID == "" {
if orders, err := msg.GetNoOrders(); err == nil && orders.Len() > 0 {
if v, e := orders.Get(0).GetSecondaryOrderID(); e == nil && v != "" {
tradeID = v
}
}
}
alloc := domain.Allocation{
AllocReportID: allocReportID,
AllocTransType: string(allocTransType),
TradeID: tradeID,
}
// Parse NoAllocs repeating group.
allocs, allocErr := msg.GetNoAllocs()
if allocErr == nil {
for i := 0; i < allocs.Len(); i++ {
entry := allocs.Get(i)
account, _ := entry.GetAllocAccount()
qty, _ := entry.GetAllocQty()
individualID, _ := entry.GetIndividualAllocID()
price, _ := entry.GetAllocPrice()
alloc.Entries = append(alloc.Entries, domain.AllocEntry{
AllocAccount: account,
AllocQty: qty,
IndividualAllocID: individualID,
AllocPrice: price,
})
}
}
slog.Info("AllocationReport parsed",
"allocReportID", alloc.AllocReportID,
"allocTransType", alloc.AllocTransType,
"tradeID", alloc.TradeID,
"numEntries", len(alloc.Entries),
)
// Persist structured message.
m.persistMessage(tradeID, parseAllocationReportJSON(msg))
switch allocTransType {
case enum.AllocTransType_NEW:
slog.Info("new allocation", "allocReportID", allocReportID)
case enum.AllocTransType_REPLACE:
slog.Info("replace allocation", "allocReportID", allocReportID)
case enum.AllocTransType_CANCEL:
slog.Info("cancel allocation", "allocReportID", allocReportID)
default:
slog.Warn("unhandled AllocTransType", "allocTransType", string(allocTransType), "allocReportID", allocReportID)
}
}
// handleConfirmation processes an incoming Confirmation (35=AK).
func (m *Manager) handleConfirmation(msg confirmation.Confirmation, sessionID quickfix.SessionID) {
confirmID, _ := msg.GetConfirmID()
tradeDate, _ := msg.GetTradeDate()
transactTime, _ := msg.GetTransactTime()
// ACK immediately.
ack := confirmationack.New(
field.NewConfirmID(confirmID),
field.NewTradeDate(tradeDate),
field.NewTransactTime(transactTime),
field.NewAffirmStatus(enum.AffirmStatus_RECEIVED),
)
if err := quickfix.SendToTarget(ack, sessionID); err != nil {
slog.Error("failed to send ConfirmationAck", "confirmID", confirmID, "error", err.Error())
} else {
slog.Info("ConfirmationAck sent", "confirmID", confirmID)
}
// Check if test message.
testMessage := getCustomBool(&msg.Message.Body.FieldMap, tagTestMessage)
if testMessage {
slog.Info("test Confirmation, skipping", "confirmID", confirmID)
return
}
confirmStatus, _ := msg.GetConfirmStatus()
confirmTransType, _ := msg.GetConfirmTransType()
individualAllocID, _ := msg.GetIndividualAllocID()
// Custom fields.
twClrID := getCustomString(&msg.Message.Body.FieldMap, tagTWClrID)
dlrClrID := getCustomString(&msg.Message.Body.FieldMap, tagDlrClrID)
clearingStatus := getCustomString(&msg.Message.Body.FieldMap, tagClearingStatus)
conf := domain.ConfirmationMsg{
ConfirmID: confirmID,
ConfirmStatus: string(confirmStatus),
ConfirmTransType: string(confirmTransType),
IndividualAllocID: individualAllocID,
TWClrID: twClrID,
DlrClrID: dlrClrID,
ClearingStatus: clearingStatus,
TradeDate: tradeDate,
}
slog.Info("Confirmation parsed",
"confirmID", conf.ConfirmID,
"confirmStatus", conf.ConfirmStatus,
"confirmTransType", conf.ConfirmTransType,
"individualAllocID", conf.IndividualAllocID,
"clearingStatus", conf.ClearingStatus,
)
// Persist structured message. Use TradeID if available, fall back to ConfirmID.
persistID := getCustomString(&msg.Message.Body.FieldMap, 1003)
if persistID == "" {
persistID = confirmID
}
m.persistMessage(persistID, parseConfirmationJSON(msg))
}
// persistMessage saves a structured FIX message to the messages table.
func (m *Manager) persistMessage(tradeID string, fixJSON domain.FixMessageJSON) {
if err := m.store.SaveMessage(domain.TradeMessage{
TradeID: tradeID,
JMessage: fixJSON,
}); err != nil {
slog.Error("failed to persist message", "msgType", fixJSON.MsgType, "tradeID", tradeID, "error", err)
}
}
// handleRawMessage persists raw FIX message strings to the logs table.
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
tradeID := extractTradeIdentifier(msg)
if tradeID == "" {
return
}
if err := m.store.SaveLog(domain.LogEntry{
TradeID: tradeID,
RawMsg: "[" + direction + "] " + msg.String(),
}); err != nil {
slog.Error("failed to persist raw log", "error", err)
}
}
// loadTrades reconstructs all trades and their states from today's messages in the database.
func (m *Manager) loadTrades() error {
messages, err := m.store.GetTodayMessages()
if err != nil {
return err
}
trades := make(map[string]*postTrade)
for _, msg := range messages {
switch msg.JMessage.MsgType {
case "AE": // TradeCaptureReport
body := msg.JMessage.Body
tradeReportType, _ := body["TradeReportType"].(string)
tradeID := msg.TradeID
if tradeID == "" {
continue
}
t, exists := trades[tradeID]
if !exists {
t = &postTrade{
TradeID: tradeID,
Status: domain.PostTradeStatusActive,
}
trades[tradeID] = t
}
if v, ok := body["TradeReportID"].(string); ok {
t.TradeReportID = v
}
t.TradeReportType = tradeReportType
if v, ok := body["Side"].(string); ok {
t.Side = v
}
if v, ok := body["LastQty"].(string); ok {
t.LastQty, _ = decimal.NewFromString(v)
}
if v, ok := body["LastPx"].(string); ok {
t.LastPx, _ = decimal.NewFromString(v)
}
if v, ok := body["SettlDate"].(string); ok {
t.SettlDate = v
}
if v, ok := body["TradeDate"].(string); ok {
t.TradeDate = v
}
if v, ok := body["TWTradeID"].(string); ok {
t.TWTradeID = v
}
switch tradeReportType {
case "103": // TRDCORR
t.Status = domain.PostTradeStatusCorrected
case "104": // TRDCXL
t.Status = domain.PostTradeStatusCancelled
default:
if t.Status != domain.PostTradeStatusCorrected && t.Status != domain.PostTradeStatusCancelled {
t.Status = domain.PostTradeStatusActive
}
}
}
}
active := 0
for _, t := range trades {
if t.Status == domain.PostTradeStatusActive {
active++
}
}
m.tradesMu.Lock()
m.trades = trades
m.tradesMu.Unlock()
slog.Info("recovery completed", "totalTrades", len(trades), "activeTrades", active)
return nil
}
// GetTrades returns only active trades.
func (m *Manager) GetTrades() []domain.PostTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
var result []domain.PostTrade
for _, t := range m.trades {
if t.Status == domain.PostTradeStatusActive {
result = append(result, t.toPostTrade())
}
}
return result
}
// GetAllTrades returns all trades regardless of status.
func (m *Manager) GetAllTrades() []domain.PostTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
result := make([]domain.PostTrade, 0, len(m.trades))
for _, t := range m.trades {
result = append(result, t.toPostTrade())
}
return result
}
func (t *postTrade) toPostTrade() domain.PostTrade {
return domain.PostTrade{
TradeID: t.TradeID,
TradeReportID: t.TradeReportID,
TradeReportType: t.TradeReportType,
Side: t.Side,
LastQty: t.LastQty.String(),
LastPx: t.LastPx.String(),
SettlDate: t.SettlDate,
TradeDate: t.TradeDate,
Status: t.Status,
TWTradeID: t.TWTradeID,
}
}
// Ensure transactTime has a sensible default if not present in the message.
var _ = time.Now // used by confirmationack.New via field.NewTransactTime

356
src/client/fix/parser.go Normal file
View File

@ -0,0 +1,356 @@
package fix
import (
"time"
"quantex.com/qfixpt/quickfix"
"quantex.com/qfixpt/quickfix/gen/enum"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreport"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmation"
"quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereport"
"quantex.com/qfixpt/quickfix/gen/tag"
"quantex.com/qfixpt/src/domain"
)
func getCustomString(body *quickfix.FieldMap, t int) string {
v, err := body.GetString(quickfix.Tag(t))
if err != nil {
return ""
}
return v
}
func getCustomInt(body *quickfix.FieldMap, t int) int {
v, err := body.GetInt(quickfix.Tag(t))
if err != nil {
return 0
}
return v
}
func getCustomBool(body *quickfix.FieldMap, t int) bool {
v, err := body.GetBool(quickfix.Tag(t))
if err != nil {
return false
}
return v
}
func extractHeader(msg *quickfix.Message) map[string]interface{} {
h := make(map[string]interface{})
if v, err := msg.Header.GetBytes(tag.BeginString); err == nil {
h["BeginString"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.MsgType); err == nil {
h["MsgType"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.SenderCompID); err == nil {
h["SenderCompID"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.TargetCompID); err == nil {
h["TargetCompID"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.SendingTime); err == nil {
h["SendingTime"] = string(v)
}
return h
}
func parseTradeCaptureReportJSON(msg tradecapturereport.TradeCaptureReport) domain.FixMessageJSON {
tradeID, _ := msg.GetTradeID()
body := map[string]interface{}{}
if v, _ := msg.GetTradeReportID(); v != "" {
body["TradeReportID"] = v
}
body["TradeID"] = tradeID
if v, _ := msg.GetExecID(); v != "" {
body["ExecID"] = v
}
if v, _ := msg.GetTradeReportType(); v != "" {
body["TradeReportType"] = string(v)
}
if v, _ := msg.GetLastQty(); !v.IsZero() {
body["LastQty"] = v.String()
}
if v, _ := msg.GetLastPx(); !v.IsZero() {
body["LastPx"] = v.String()
}
if v, _ := msg.GetSpread(); !v.IsZero() {
body["Spread"] = v.String()
}
if v, _ := msg.GetSettlDate(); v != "" {
body["SettlDate"] = v
}
if v, _ := msg.GetTradeDate(); v != "" {
body["TradeDate"] = v
}
if v, _ := msg.GetGrossTradeAmt(); !v.IsZero() {
body["GrossTradeAmt"] = v.String()
}
if v, _ := msg.GetTradeReportRefID(); v != "" {
body["TradeReportRefID"] = v
}
// Multi-leg support.
if v, _ := msg.GetTradeLegRefID(); v != "" {
body["TradeLegRefID"] = v
}
if v := getCustomString(&msg.Message.Body.FieldMap, tagTradingSystemID); v != "" {
body["TradingSystemID"] = v
}
// Custom fields.
if v := getCustomString(&msg.Message.Body.FieldMap, tagTWTradeID); v != "" {
body["TWTradeID"] = v
}
if v := getCustomString(&msg.Message.Body.FieldMap, tagTWOrigTradeID); v != "" {
body["TWOrigTradeID"] = v
}
// Side-level fields.
sides, sideErr := msg.GetNoSides()
if sideErr == nil && sides.Len() > 0 {
s := sides.Get(0)
if v, e := s.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := s.GetAccruedInterestAmt(); e == nil {
body["AccruedInterestAmt"] = v.String()
}
if v, e := s.GetNetMoney(); e == nil {
body["NetMoney"] = v.String()
}
if v := extractPartyByRole(s, enum.PartyRole_CLEARING_FIRM); v != "" {
body["ClearingFirm"] = v
}
if v := extractPartyByRole(s, "1007"); v != "" {
body["TradewebTraderID"] = v
}
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "AE",
TradeID: tradeID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseAllocationReportJSON(msg allocationreport.AllocationReport) domain.FixMessageJSON {
// Primary: tag 1003 directly on the body.
tradeID := getCustomString(&msg.Message.Body.FieldMap, 1003)
// Fallback: SecondaryOrderID (198) from the NoOrders repeating group.
if tradeID == "" {
if orders, err := msg.GetNoOrders(); err == nil && orders.Len() > 0 {
if v, e := orders.Get(0).GetSecondaryOrderID(); e == nil && v != "" {
tradeID = v
}
}
}
body := map[string]interface{}{}
if v, _ := msg.GetAllocReportID(); v != "" {
body["AllocReportID"] = v
}
if v, _ := msg.GetAllocTransType(); v != "" {
body["AllocTransType"] = string(v)
}
body["TradeID"] = tradeID
// TradingSystemID (6731) — for multi-leg, differentiates each leg.
if v := getCustomString(&msg.Message.Body.FieldMap, tagTradingSystemID); v != "" {
body["TradingSystemID"] = v
}
// SecondaryOrderID (198) from NoOrders — explicit reference to original trade.
if orders, err := msg.GetNoOrders(); err == nil && orders.Len() > 0 {
if v, e := orders.Get(0).GetSecondaryOrderID(); e == nil && v != "" {
body["SecondaryOrderID"] = v
}
}
allocs, allocErr := msg.GetNoAllocs()
if allocErr == nil {
var entries []map[string]interface{}
for i := 0; i < allocs.Len(); i++ {
entry := allocs.Get(i)
e := map[string]interface{}{}
if v, err := entry.GetAllocAccount(); err == nil {
e["AllocAccount"] = v
}
if v, err := entry.GetAllocQty(); err == nil {
e["AllocQty"] = v.String()
}
if v, err := entry.GetIndividualAllocID(); err == nil {
e["IndividualAllocID"] = v
}
if v, err := entry.GetAllocPrice(); err == nil {
e["AllocPrice"] = v.String()
}
entries = append(entries, e)
}
body["Allocs"] = entries
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "AS",
TradeID: tradeID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseConfirmationJSON(msg confirmation.Confirmation) domain.FixMessageJSON {
// Try TradeID first, fall back to ConfirmID.
tradeID := getCustomString(&msg.Message.Body.FieldMap, 1003)
if tradeID == "" {
tradeID, _ = msg.GetConfirmID()
}
body := map[string]interface{}{}
if v, _ := msg.GetConfirmID(); v != "" {
body["ConfirmID"] = v
}
if v, _ := msg.GetConfirmStatus(); v != "" {
body["ConfirmStatus"] = string(v)
}
if v, _ := msg.GetConfirmTransType(); v != "" {
body["ConfirmTransType"] = string(v)
}
if v, _ := msg.GetIndividualAllocID(); v != "" {
body["IndividualAllocID"] = v
}
if v, _ := msg.GetTradeDate(); v != "" {
body["TradeDate"] = v
}
if v := getCustomString(&msg.Message.Body.FieldMap, tagTWClrID); v != "" {
body["TWClrID"] = v
}
if v := getCustomString(&msg.Message.Body.FieldMap, tagDlrClrID); v != "" {
body["DlrClrID"] = v
}
if v := getCustomString(&msg.Message.Body.FieldMap, tagClearingStatus); v != "" {
body["ClearingStatus"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "AK",
TradeID: tradeID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func extractTradeIdentifier(msg *quickfix.Message) string {
// Primary: TradeID (1003).
var tradeID quickfix.FIXString
if err := msg.Body.GetField(tag.TradeID, &tradeID); err == nil && string(tradeID) != "" {
return string(tradeID)
}
// Fallback: SecondaryOrderID (198) — used in AllocationReport to reference the trade.
var secOrderID quickfix.FIXString
if err := msg.Body.GetField(tag.SecondaryOrderID, &secOrderID); err == nil && string(secOrderID) != "" {
return string(secOrderID)
}
// Last resort: ConfirmID (664) — for Confirmation messages without TradeID.
var confirmID quickfix.FIXString
if err := msg.Body.GetField(tag.ConfirmID, &confirmID); err == nil && string(confirmID) != "" {
return string(confirmID)
}
return ""
}
func buildOutgoingMessageJSON(msgType, tradeID string, body map[string]interface{}) domain.FixMessageJSON {
return domain.FixMessageJSON{
Direction: "OUT",
MsgType: msgType,
TradeID: tradeID,
Body: body,
ReceiveTime: time.Now(),
}
}
// extractPartyByRole iterates the Parties repeating group inside a NoSides entry
// and returns the PartyID for the given target role.
func extractPartyByRole(side tradecapturereport.NoSides, targetRole enum.PartyRole) string {
parties, err := side.GetNoPartyIDs()
if err != nil {
return ""
}
for i := 0; i < parties.Len(); i++ {
party := parties.Get(i)
role, roleErr := party.GetPartyRole()
if roleErr != nil {
continue
}
if role == targetRole {
id, idErr := party.GetPartyID()
if idErr != nil {
return ""
}
return id
}
}
return ""
}

View File

@ -1,58 +0,0 @@
// Package repository provides in-memory storage implementations.
package repository
import (
"sync"
"quantex.com/qfixpt/src/domain"
)
// Memory is a thread-safe in-memory implementation of domain.OrderRepository.
type Memory struct {
mu sync.RWMutex
orders map[string]domain.ExecutionReport
ordered []string // maintains insertion order
}
// NewMemory creates a new in-memory order repository.
func NewMemory() *Memory {
return &Memory{
orders: make(map[string]domain.ExecutionReport),
ordered: []string{},
}
}
// Save stores an execution report, overwriting any existing entry with the same OrderID.
func (m *Memory) Save(report domain.ExecutionReport) {
m.mu.Lock()
defer m.mu.Unlock()
if _, exists := m.orders[report.OrderID]; !exists {
m.ordered = append(m.ordered, report.OrderID)
}
m.orders[report.OrderID] = report
}
// GetAll returns all stored execution reports in insertion order.
func (m *Memory) GetAll() []domain.ExecutionReport {
m.mu.RLock()
defer m.mu.RUnlock()
result := make([]domain.ExecutionReport, 0, len(m.ordered))
for _, id := range m.ordered {
result = append(result, m.orders[id])
}
return result
}
// GetByOrderID returns the execution report for the given OrderID.
func (m *Memory) GetByOrderID(orderID string) (domain.ExecutionReport, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
r, ok := m.orders[orderID]
return r, ok
}

16
src/client/store/db.sql Normal file
View File

@ -0,0 +1,16 @@
CREATE TABLE IF NOT EXISTS qfixpt_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
trade_id TEXT NOT NULL,
j_message JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_pt_messages_trade_id ON qfixpt_messages(trade_id);
CREATE INDEX IF NOT EXISTS idx_pt_messages_created_at ON qfixpt_messages(created_at);
CREATE TABLE IF NOT EXISTS qfixpt_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
trade_id TEXT NOT NULL UNIQUE,
raw_msg TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

View File

@ -2,7 +2,9 @@
package store package store
import ( import (
_ "embed"
"log/slog" "log/slog"
"strings"
"time" "time"
"quantex.com.ar/multidb" "quantex.com.ar/multidb"
@ -11,6 +13,9 @@ import (
"quantex.com/qfixpt/src/common/tracerr" "quantex.com/qfixpt/src/common/tracerr"
) )
//go:embed db.sql
var schemaSQL string
const dbPingSeconds = 30 const dbPingSeconds = 30
type Store struct { type Store struct {
@ -45,9 +50,31 @@ func New(config Config) (*Store, error) {
go s.db.PeriodicDBPing(time.Second * dbPingSeconds) go s.db.PeriodicDBPing(time.Second * dbPingSeconds)
if err := s.ensureTables(); err != nil {
return nil, tracerr.Errorf("error ensuring tables: %w", err)
}
return s, nil return s, nil
} }
func (p *Store) ensureTables() error {
statements := strings.Split(schemaSQL, ";")
for _, stmt := range statements {
stmt = strings.TrimSpace(stmt)
if stmt == "" {
continue
}
if _, err := p.db.Exec(stmt); err != nil {
return tracerr.Errorf("error executing schema statement: %w", err)
}
}
slog.Info("database tables ensured")
return nil
}
func (p *Store) CloseDB() { func (p *Store) CloseDB() {
p.db.Close() p.db.Close()
slog.Info("closing database connection.") slog.Info("closing database connection.")

View File

@ -0,0 +1,103 @@
package store
import (
"encoding/json"
"strings"
"time"
"quantex.com/qfixpt/src/common/tracerr"
"quantex.com/qfixpt/src/domain"
)
func (p *Store) SaveMessage(msg domain.TradeMessage) error {
jsonBytes, err := json.Marshal(msg.JMessage)
if err != nil {
return tracerr.Errorf("error marshaling j_message: %w", err)
}
_, err = p.db.Exec(
"INSERT INTO qfixpt_messages (trade_id, j_message) VALUES ($1, $2)",
msg.TradeID, string(jsonBytes),
)
if err != nil {
return tracerr.Errorf("error inserting message: %w", err)
}
return nil
}
func (p *Store) SaveLog(entry domain.LogEntry) error {
upsertStmt := `INSERT INTO qfixpt_logs (trade_id, raw_msg)
VALUES ($1, $2)
ON CONFLICT (trade_id) DO UPDATE
SET raw_msg = qfixpt_logs.raw_msg || E'\n' || EXCLUDED.raw_msg,
updated_at = NOW()`
_, err := p.db.Exec(upsertStmt, entry.TradeID, entry.RawMsg)
if err != nil {
return tracerr.Errorf("error upserting log: %w", err)
}
return nil
}
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
rows, err := p.db.Query(
"SELECT id, trade_id, j_message, created_at FROM qfixpt_messages WHERE created_at >= current_date ORDER BY created_at ASC",
)
if err != nil {
return nil, tracerr.Errorf("error querying today messages: %w", err)
}
defer rows.Close()
var messages []domain.TradeMessage
for rows.Next() {
var (
id, tradeID string
jMessageRaw []byte
createdAt time.Time
)
if err := rows.Scan(&id, &tradeID, &jMessageRaw, &createdAt); err != nil {
return nil, tracerr.Errorf("error scanning message row: %w", err)
}
var jMessage domain.FixMessageJSON
if err := json.Unmarshal(jMessageRaw, &jMessage); err != nil {
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
}
messages = append(messages, domain.TradeMessage{
ID: id,
TradeID: tradeID,
JMessage: jMessage,
CreatedAt: createdAt,
})
}
if err := rows.Err(); err != nil {
return nil, tracerr.Errorf("error iterating message rows: %w", err)
}
return messages, nil
}
func (p *Store) GetLogsByTradeID(tradeID string) (domain.Logs, error) {
rows, err := p.db.Query("SELECT raw_msg FROM qfixpt_logs WHERE trade_id = $1", tradeID)
if err != nil {
return domain.Logs{}, tracerr.Errorf("error querying logs: %w", err)
}
defer rows.Close()
if !rows.Next() {
return domain.Logs{}, nil
}
var rawMsg string
if err := rows.Scan(&rawMsg); err != nil {
return domain.Logs{}, tracerr.Errorf("error scanning log row: %w", err)
}
return domain.Logs{Entries: strings.Split(rawMsg, "\n")}, nil
}

View File

@ -9,7 +9,6 @@ import (
"quantex.com/qfixpt/src/client/api/rest" "quantex.com/qfixpt/src/client/api/rest"
"quantex.com/qfixpt/src/client/data" "quantex.com/qfixpt/src/client/data"
"quantex.com/qfixpt/src/client/fix" "quantex.com/qfixpt/src/client/fix"
fixrepo "quantex.com/qfixpt/src/client/fix/repository"
googlechat "quantex.com/qfixpt/src/client/notify/google" googlechat "quantex.com/qfixpt/src/client/notify/google"
"quantex.com/qfixpt/src/client/store" "quantex.com/qfixpt/src/client/store"
"quantex.com/qfixpt/src/client/store/external" "quantex.com/qfixpt/src/client/store/external"
@ -38,27 +37,14 @@ func Runner(cfg app.Config) error {
return fmt.Errorf("error trying to create store %w", err) return fmt.Errorf("error trying to create store %w", err)
} }
// In-memory repository shared between the FIX initiator and the REST API. userData := data.New()
orderRepo := fixrepo.NewMemory()
// Start the FIX initiator if a settings file is configured. // Initialize FIX Post-Trade Manager.
var fixInitiator *fix.Initiator fixManager := fix.NewManager(cfg.FIX, appStore, notify)
if cfg.FIX.SettingsFile != "" { if err = fixManager.Start(); err != nil {
fixInitiator, err = fix.NewInitiator(cfg.FIX.SettingsFile, orderRepo)
if err != nil {
return fmt.Errorf("error creating FIX initiator: %w", err)
}
if err = fixInitiator.Start(); err != nil {
return fmt.Errorf("error starting FIX initiator: %w", err) return fmt.Errorf("error starting FIX initiator: %w", err)
} }
defer fixManager.Stop()
slog.Info("FIX initiator started", "settingsFile", cfg.FIX.SettingsFile)
} else {
slog.Warn("FIX.SettingsFile not configured — FIX initiator will not start")
}
userData := data.New()
apiConfig := rest.Config{ apiConfig := rest.Config{
Port: cfg.APIBasePort, Port: cfg.APIBasePort,
@ -68,14 +54,10 @@ func Runner(cfg app.Config) error {
EnableJWTAuth: cfg.EnableJWTAuth, EnableJWTAuth: cfg.EnableJWTAuth,
} }
api := rest.New(userData, appStore, apiConfig, notify, orderRepo) api := rest.New(userData, appStore, fixManager, apiConfig, notify)
api.Run() api.Run()
cmd.WaitForInterruptSignal(nil) cmd.WaitForInterruptSignal(nil)
if fixInitiator != nil {
fixInitiator.Stop()
}
return nil return nil
} }

View File

@ -1,34 +0,0 @@
package domain
import (
"time"
"github.com/shopspring/decimal"
)
// ExecutionReport represents a FIX ExecutionReport (MsgType=8) received from the counterparty.
type ExecutionReport struct {
OrderID string
ClOrdID string
ExecID string
ExecType string
OrdStatus string
Symbol string
Side string
OrderQty decimal.Decimal
Price decimal.Decimal
LastPx decimal.Decimal
LastQty decimal.Decimal
CumQty decimal.Decimal
LeavesQty decimal.Decimal
AvgPx decimal.Decimal
TransactTime time.Time
Account string
}
// OrderRepository defines the interface for storing and retrieving execution reports.
type OrderRepository interface {
Save(report ExecutionReport)
GetAll() []ExecutionReport
GetByOrderID(orderID string) (ExecutionReport, bool)
}

63
src/domain/persistence.go Normal file
View File

@ -0,0 +1,63 @@
package domain
import "time"
// PostTradeStatus represents the lifecycle state of a post-trade record.
type PostTradeStatus string
const (
PostTradeStatusActive PostTradeStatus = "active"
PostTradeStatusCorrected PostTradeStatus = "corrected"
PostTradeStatusCancelled PostTradeStatus = "cancelled"
)
// FixMessageJSON is the structured representation of a FIX message for storage.
type FixMessageJSON struct {
Direction string `json:"direction"`
MsgType string `json:"msg_type"`
TradeID string `json:"trade_id"`
Header map[string]interface{} `json:"header"`
Body map[string]interface{} `json:"body"`
ReceiveTime time.Time `json:"receive_time"`
}
// TradeMessage is a row in qfixpt_messages.
type TradeMessage struct {
ID string `json:"id"`
TradeID string `json:"trade_id"`
JMessage FixMessageJSON `json:"j_message"`
CreatedAt time.Time `json:"created_at"`
}
// LogEntry is the DTO for inserting/appending a raw log in qfixpt_logs.
type LogEntry struct {
TradeID string
RawMsg string
}
// Logs is the response for GET /trades/:tradeID/logs.
type Logs struct {
Entries []string `json:"entries"`
}
// PostTrade is the exported representation of a post-trade record for the API.
type PostTrade struct {
TradeID string `json:"trade_id"`
TradeReportID string `json:"trade_report_id"`
TradeReportType string `json:"trade_report_type"`
Side string `json:"side"`
LastQty string `json:"last_qty"`
LastPx string `json:"last_px"`
SettlDate string `json:"settl_date"`
TradeDate string `json:"trade_date"`
Status PostTradeStatus `json:"status"`
TWTradeID string `json:"tw_trade_id"`
}
// PersistenceStore defines the persistence interface for post-trade messages.
type PersistenceStore interface {
SaveMessage(msg TradeMessage) error
SaveLog(entry LogEntry) error
GetTodayMessages() ([]TradeMessage, error)
GetLogsByTradeID(tradeID string) (Logs, error)
}

61
src/domain/posttrade.go Normal file
View File

@ -0,0 +1,61 @@
package domain
import "github.com/shopspring/decimal"
// Trade represents data extracted from a TradeCaptureReport (35=AE).
type Trade struct {
TradeReportID string // 571
TradeID string // 1003
ExecID string // 17
OrigTradeID string // 1903
OrigSecondaryID string // 1906
LastQty decimal.Decimal // 32
LastPx decimal.Decimal // 31
Spread decimal.Decimal // 218 (236 = benchmark curve name)
Side string // 54
AccruedInterest decimal.Decimal // 159
NetMoney decimal.Decimal // 118
GrossTradeAmt decimal.Decimal // 381
SettlDate string // 64
TradeDate string // 75
ListID string // 66
TradeReportType string // 856
TradeHandlInst string // 487
TradeReportRefID string // 572
ClearingFirm string // Party role=4
TradewebTraderID string // Party role=1007 (custom)
TestMessage bool // 23029 (custom)
TWTradeID string // 23068 (custom)
TWOrigTradeID string // 23096 (custom)
}
// Allocation represents data extracted from an AllocationReport (35=AS).
type Allocation struct {
AllocReportID string // 755
AllocTransType string // 71
TradeID string // 1003
TestMessage bool // 23029 (custom)
Entries []AllocEntry // NoAllocs group (78)
}
// AllocEntry represents a single entry in the NoAllocs repeating group.
type AllocEntry struct {
AllocAccount string // 79
AllocQty decimal.Decimal // 80
IndividualAllocID string // 467
AllocPrice decimal.Decimal // 154 (via custom tag access if needed)
CashSettlAmount decimal.Decimal // 742 (via custom tag access if needed)
}
// ConfirmationMsg represents data extracted from a Confirmation (35=AK).
type ConfirmationMsg struct {
ConfirmID string // 664
ConfirmStatus string // 665
ConfirmTransType string // 666
IndividualAllocID string // 467
TWClrID string // 23025 (custom)
DlrClrID string // 23027 (custom)
ClearingStatus string // 5440 (custom)
TradeDate string // 75
TestMessage bool // 23029 (custom)
}