Compare commits
2 Commits
setup
...
FIXPT-2/Ne
| Author | SHA1 | Date | |
|---|---|---|---|
| 9fa1349afd | |||
| 01e04df0f6 |
6
Makefile
6
Makefile
@ -56,13 +56,13 @@ build: check-env swag vendor only-build # Build a native version. Set e=environm
|
|||||||
|
|
||||||
only-build: check-env
|
only-build: check-env
|
||||||
@echo "Building for $(e) environment..."
|
@echo "Building for $(e) environment..."
|
||||||
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
|
env OUT_PATH=$(DEFAULT_OUT_PATH) tools/build.sh $(e)
|
||||||
|
|
||||||
linux-build: check-env swag # Build a linux version for prod environment. Set e=environment: prod, dev, demo, open-demo
|
linux-build: check-env swag # Build a linux version for prod environment. Set e=environment: prod, dev, demo, open-demo
|
||||||
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
|
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
|
||||||
|
|
||||||
deploy: check-env # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo; s=serverName; i=instance; e.g. make deploy e=dev s=nonprodFix i=qfixpt
|
deploy: # Deploy to remote server. Set s=serverName, e=environment. e.g. make deploy e=dev s=nonprodFix
|
||||||
make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/$(i)/
|
make linux-build e=$(e) && qscp build/out/distribution/qfixpt.gz $(s):/home/quantex/qfixtb/qfixpt/
|
||||||
|
|
||||||
fmt: download-versions # Apply the Go formatter to the code
|
fmt: download-versions # Apply the Go formatter to the code
|
||||||
cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt);
|
cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt);
|
||||||
|
|||||||
19
fix_pt.cfg
Normal file
19
fix_pt.cfg
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
[DEFAULT]
|
||||||
|
ConnectionType=initiator
|
||||||
|
HeartBtInt=30
|
||||||
|
ReconnectInterval=30
|
||||||
|
SenderCompID=<CONFIGURAR>
|
||||||
|
ResetOnLogon=Y
|
||||||
|
FileStorePath=fix_store
|
||||||
|
FileLogPath=fix_logs
|
||||||
|
|
||||||
|
[SESSION]
|
||||||
|
BeginString=FIXT.1.1
|
||||||
|
DefaultApplVerID=FIX.5.0SP2
|
||||||
|
TargetCompID=<CONFIGURAR>
|
||||||
|
TransportDataDictionary=spec/FIXT11.xml
|
||||||
|
AppDataDictionary=spec/FIX50SP2.xml
|
||||||
|
StartTime=00:00:00
|
||||||
|
EndTime=00:00:00
|
||||||
|
SocketConnectHost=<CONFIGURAR>
|
||||||
|
SocketConnectPort=<CONFIGURAR>
|
||||||
5
main.go
5
main.go
@ -178,11 +178,6 @@ func parseLogLevel(level string) (slog.Level, error) {
|
|||||||
|
|
||||||
func startRunner(runner, globalCfg, serviceCfg string) {
|
func startRunner(runner, globalCfg, serviceCfg string) {
|
||||||
var fn func(cfg app.Config) error
|
var fn func(cfg app.Config) error
|
||||||
|
|
||||||
if runner == "" {
|
|
||||||
runner = "service"
|
|
||||||
}
|
|
||||||
|
|
||||||
switch runner {
|
switch runner {
|
||||||
case "service":
|
case "service":
|
||||||
fn = service.Runner
|
fn = service.Runner
|
||||||
|
|||||||
@ -38,8 +38,9 @@ type Service struct {
|
|||||||
EnableJWTAuth bool // Enable JWT authentication for service-to-service communication
|
EnableJWTAuth bool // Enable JWT authentication for service-to-service communication
|
||||||
FIX FIXConfig
|
FIX FIXConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
type FIXConfig struct {
|
type FIXConfig struct {
|
||||||
SettingsFile string // path to fix.cfg file
|
SettingsFile string // path to fix_pt.cfg file
|
||||||
}
|
}
|
||||||
|
|
||||||
type ExtAuth struct {
|
type ExtAuth struct {
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package version
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
@ -37,17 +38,17 @@ type EnvironmentType int //nolint:recvcheck // The methods of this are autogener
|
|||||||
var environment EnvironmentType //nolint:gochecknoglobals // Just keept this global to avoid having to create an instance
|
var environment EnvironmentType //nolint:gochecknoglobals // Just keept this global to avoid having to create an instance
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
// aux := os.Getenv(quantexEnvironment)
|
aux := os.Getenv(quantexEnvironment)
|
||||||
// if aux == "" {
|
if aux == "" {
|
||||||
// panic("QUANTEX_ENVIRONMENT is not set")
|
panic("QUANTEX_ENVIRONMENT is not set")
|
||||||
// }
|
}
|
||||||
|
|
||||||
// env, err := ParseEnvironmentType(aux)
|
env, err := ParseEnvironmentType(aux)
|
||||||
// if err != nil {
|
if err != nil {
|
||||||
// panic("Invalid QUANTEX_ENVIRONMENT value: " + aux + " " + err.Error())
|
panic("Invalid QUANTEX_ENVIRONMENT value: " + aux + " " + err.Error())
|
||||||
// }
|
}
|
||||||
|
|
||||||
environment = EnvironmentTypeDev
|
environment = env
|
||||||
}
|
}
|
||||||
|
|
||||||
// Base returns the version base name
|
// Base returns the version base name
|
||||||
|
|||||||
@ -29,28 +29,56 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Controller struct {
|
type Controller struct {
|
||||||
pool *redis.Pool
|
pool *redis.Pool
|
||||||
userData app.UserDataProvider
|
userData app.UserDataProvider
|
||||||
store *store.Store
|
store *store.Store
|
||||||
config Config
|
tradeProvider TradeProvider
|
||||||
notify domain.Notifier
|
config Config
|
||||||
orderRepo domain.OrderRepository
|
notify domain.Notifier
|
||||||
authMutex deadlock.Mutex
|
authMutex deadlock.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func newController(pool *redis.Pool, userData app.UserDataProvider,
|
func newController(pool *redis.Pool, userData app.UserDataProvider,
|
||||||
s *store.Store, config Config, n domain.Notifier, repo domain.OrderRepository,
|
s *store.Store, tp TradeProvider, config Config, n domain.Notifier,
|
||||||
) *Controller {
|
) *Controller {
|
||||||
return &Controller{
|
return &Controller{
|
||||||
pool: pool,
|
pool: pool,
|
||||||
userData: userData,
|
userData: userData,
|
||||||
store: s,
|
store: s,
|
||||||
config: config,
|
tradeProvider: tp,
|
||||||
notify: n,
|
config: config,
|
||||||
orderRepo: repo,
|
notify: n,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cont *Controller) GetTrades(ctx *gin.Context) {
|
||||||
|
setHeaders(ctx, cont.config)
|
||||||
|
trades := cont.tradeProvider.GetTrades()
|
||||||
|
ctx.JSON(http.StatusOK, trades)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cont *Controller) GetAllTrades(ctx *gin.Context) {
|
||||||
|
setHeaders(ctx, cont.config)
|
||||||
|
trades := cont.tradeProvider.GetAllTrades()
|
||||||
|
ctx.JSON(http.StatusOK, trades)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cont *Controller) GetTradeLogs(ctx *gin.Context) {
|
||||||
|
setHeaders(ctx, cont.config)
|
||||||
|
|
||||||
|
tradeID := ctx.Param("tradeID")
|
||||||
|
|
||||||
|
logs, err := cont.store.GetLogsByTradeID(tradeID)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("error fetching trade logs", "tradeID", tradeID, "error", err)
|
||||||
|
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"})
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.JSON(http.StatusOK, logs)
|
||||||
|
}
|
||||||
|
|
||||||
func (cont *Controller) GetUser(ctx *gin.Context) app.User {
|
func (cont *Controller) GetUser(ctx *gin.Context) app.User {
|
||||||
// This is set on the AuthRequired middleware
|
// This is set on the AuthRequired middleware
|
||||||
response, ok := ctx.Get(responseKey)
|
response, ok := ctx.Get(responseKey)
|
||||||
@ -290,82 +318,3 @@ func allowed(origin string, config Config) bool {
|
|||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetExecutions godoc
|
|
||||||
// @Summary Get all execution reports
|
|
||||||
// @Description Returns all FIX ExecutionReport messages received by the initiator
|
|
||||||
// @Tags executions
|
|
||||||
// @Produce json
|
|
||||||
// @Success 200 {array} ExecutionReportResponse
|
|
||||||
// @Failure 500 {object} HTTPError
|
|
||||||
// @Router /executions [get]
|
|
||||||
func (cont *Controller) GetExecutions(ctx *gin.Context) {
|
|
||||||
setHeaders(ctx, cont.config)
|
|
||||||
|
|
||||||
reports := cont.orderRepo.GetAll()
|
|
||||||
|
|
||||||
response := make([]ExecutionReportResponse, 0, len(reports))
|
|
||||||
for _, r := range reports {
|
|
||||||
response = append(response, ExecutionReportResponse{
|
|
||||||
OrderID: r.OrderID,
|
|
||||||
ClOrdID: r.ClOrdID,
|
|
||||||
ExecID: r.ExecID,
|
|
||||||
ExecType: r.ExecType,
|
|
||||||
OrdStatus: r.OrdStatus,
|
|
||||||
Symbol: r.Symbol,
|
|
||||||
Side: r.Side,
|
|
||||||
OrderQty: r.OrderQty,
|
|
||||||
Price: r.Price,
|
|
||||||
LastPx: r.LastPx,
|
|
||||||
LastQty: r.LastQty,
|
|
||||||
CumQty: r.CumQty,
|
|
||||||
LeavesQty: r.LeavesQty,
|
|
||||||
AvgPx: r.AvgPx,
|
|
||||||
TransactTime: r.TransactTime,
|
|
||||||
Account: r.Account,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx.JSON(http.StatusOK, response)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetExecutionByOrderID godoc
|
|
||||||
// @Summary Get execution report by OrderID
|
|
||||||
// @Description Returns the FIX ExecutionReport for the given OrderID
|
|
||||||
// @Tags executions
|
|
||||||
// @Produce json
|
|
||||||
// @Param orderID path string true "OrderID"
|
|
||||||
// @Success 200 {object} ExecutionReportResponse
|
|
||||||
// @Failure 404 {object} HTTPError
|
|
||||||
// @Router /executions/{orderID} [get]
|
|
||||||
func (cont *Controller) GetExecutionByOrderID(ctx *gin.Context) {
|
|
||||||
setHeaders(ctx, cont.config)
|
|
||||||
|
|
||||||
orderID := ctx.Param("orderID")
|
|
||||||
|
|
||||||
r, ok := cont.orderRepo.GetByOrderID(orderID)
|
|
||||||
if !ok {
|
|
||||||
ctx.JSON(http.StatusNotFound, HTTPError{Error: "execution report not found"})
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx.JSON(http.StatusOK, ExecutionReportResponse{
|
|
||||||
OrderID: r.OrderID,
|
|
||||||
ClOrdID: r.ClOrdID,
|
|
||||||
ExecID: r.ExecID,
|
|
||||||
ExecType: r.ExecType,
|
|
||||||
OrdStatus: r.OrdStatus,
|
|
||||||
Symbol: r.Symbol,
|
|
||||||
Side: r.Side,
|
|
||||||
OrderQty: r.OrderQty,
|
|
||||||
Price: r.Price,
|
|
||||||
LastPx: r.LastPx,
|
|
||||||
LastQty: r.LastQty,
|
|
||||||
CumQty: r.CumQty,
|
|
||||||
LeavesQty: r.LeavesQty,
|
|
||||||
AvgPx: r.AvgPx,
|
|
||||||
TransactTime: r.TransactTime,
|
|
||||||
Account: r.Account,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|||||||
@ -1,11 +1,5 @@
|
|||||||
package rest
|
package rest
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/shopspring/decimal"
|
|
||||||
)
|
|
||||||
|
|
||||||
type HTTPError struct {
|
type HTTPError struct {
|
||||||
Error string
|
Error string
|
||||||
}
|
}
|
||||||
@ -22,23 +16,3 @@ type Credentials struct {
|
|||||||
type Session struct {
|
type Session struct {
|
||||||
Email string
|
Email string
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecutionReportResponse is the REST representation of a FIX ExecutionReport.
|
|
||||||
type ExecutionReportResponse struct {
|
|
||||||
OrderID string `json:"orderID"`
|
|
||||||
ClOrdID string `json:"clOrdID"`
|
|
||||||
ExecID string `json:"execID"`
|
|
||||||
ExecType string `json:"execType"`
|
|
||||||
OrdStatus string `json:"ordStatus"`
|
|
||||||
Symbol string `json:"symbol"`
|
|
||||||
Side string `json:"side"`
|
|
||||||
OrderQty decimal.Decimal `json:"orderQty"`
|
|
||||||
Price decimal.Decimal `json:"price"`
|
|
||||||
LastPx decimal.Decimal `json:"lastPx"`
|
|
||||||
LastQty decimal.Decimal `json:"lastQty"`
|
|
||||||
CumQty decimal.Decimal `json:"cumQty"`
|
|
||||||
LeavesQty decimal.Decimal `json:"leavesQty"`
|
|
||||||
AvgPx decimal.Decimal `json:"avgPx"`
|
|
||||||
TransactTime time.Time `json:"transactTime"`
|
|
||||||
Account string `json:"account,omitempty"`
|
|
||||||
}
|
|
||||||
|
|||||||
@ -21,8 +21,9 @@ func SetRoutes(api *API) {
|
|||||||
qfixpt := v1.Group("/")
|
qfixpt := v1.Group("/")
|
||||||
qfixpt.Use(cont.AuthRequired)
|
qfixpt.Use(cont.AuthRequired)
|
||||||
qfixpt.GET("/health", cont.HealthCheck)
|
qfixpt.GET("/health", cont.HealthCheck)
|
||||||
qfixpt.GET("/executions", cont.GetExecutions)
|
qfixpt.GET("/trades", cont.GetTrades)
|
||||||
qfixpt.GET("/executions/:orderID", cont.GetExecutionByOrderID)
|
qfixpt.GET("/trades/all", cont.GetAllTrades)
|
||||||
|
qfixpt.GET("/trades/:tradeID/logs", cont.GetTradeLogs)
|
||||||
|
|
||||||
backoffice := qfixpt.Group("/backoffice")
|
backoffice := qfixpt.Group("/backoffice")
|
||||||
backoffice.Use(cont.BackOfficeUser)
|
backoffice.Use(cont.BackOfficeUser)
|
||||||
|
|||||||
@ -32,7 +32,13 @@ type Config struct {
|
|||||||
EnableJWTAuth bool
|
EnableJWTAuth bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(userData app.UserDataProvider, storeInstance *store.Store, config Config, notify domain.Notifier, repo domain.OrderRepository) *API {
|
// TradeProvider exposes trade data for the REST API.
|
||||||
|
type TradeProvider interface {
|
||||||
|
GetTrades() []domain.PostTrade
|
||||||
|
GetAllTrades() []domain.PostTrade
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(userData app.UserDataProvider, storeInstance *store.Store, tradeProvider TradeProvider, config Config, notify domain.Notifier) *API {
|
||||||
// Set up Gin
|
// Set up Gin
|
||||||
var engine *gin.Engine
|
var engine *gin.Engine
|
||||||
if version.Environment() == version.EnvironmentTypeProd {
|
if version.Environment() == version.EnvironmentTypeProd {
|
||||||
@ -58,7 +64,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, config Confi
|
|||||||
}
|
}
|
||||||
|
|
||||||
api := &API{
|
api := &API{
|
||||||
Controller: newController(NewPool(), userData, storeInstance, config, notify, repo),
|
Controller: newController(NewPool(), userData, storeInstance, tradeProvider, config, notify),
|
||||||
Router: engine,
|
Router: engine,
|
||||||
Port: config.Port,
|
Port: config.Port,
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,137 +1,147 @@
|
|||||||
// Package fix implements the FIX protocol initiator and application handler.
|
|
||||||
package fix
|
package fix
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
|
||||||
"quantex.com/qfixpt/quickfix"
|
"quantex.com/qfixpt/quickfix"
|
||||||
"quantex.com/qfixpt/quickfix/gen/fix44/executionreport"
|
"quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreport"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmation"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereport"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/tag"
|
||||||
"quantex.com/qfixpt/src/domain"
|
"quantex.com/qfixpt/src/domain"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Application implements quickfix.Application to handle incoming FIX messages.
|
type application struct {
|
||||||
type Application struct {
|
router *quickfix.MessageRouter
|
||||||
repo domain.OrderRepository
|
notifier domain.Notifier
|
||||||
|
onLogon func(quickfix.SessionID)
|
||||||
|
onLogout func(quickfix.SessionID)
|
||||||
|
onTradeCaptureReport func(tradecapturereport.TradeCaptureReport, quickfix.SessionID)
|
||||||
|
onAllocationReport func(allocationreport.AllocationReport, quickfix.SessionID)
|
||||||
|
onConfirmation func(confirmation.Confirmation, quickfix.SessionID)
|
||||||
|
onRawMessage func(direction string, msg *quickfix.Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewApplication creates a new FIX Application backed by the given repository.
|
func newApplication(n domain.Notifier) *application {
|
||||||
func NewApplication(repo domain.OrderRepository) *Application {
|
app := &application{
|
||||||
return &Application{repo: repo}
|
router: quickfix.NewMessageRouter(),
|
||||||
|
notifier: n,
|
||||||
|
}
|
||||||
|
|
||||||
|
app.router.AddRoute(tradecapturereport.Route(app.handleTradeCaptureReport))
|
||||||
|
app.router.AddRoute(allocationreport.Route(app.handleAllocationReport))
|
||||||
|
app.router.AddRoute(confirmation.Route(app.handleConfirmation))
|
||||||
|
|
||||||
|
return app
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnCreate is called when a FIX session is created.
|
func (a *application) OnCreate(sessionID quickfix.SessionID) {
|
||||||
func (a *Application) OnCreate(sessionID quickfix.SessionID) {
|
slog.Info("FIX session created", "session", sessionID.String())
|
||||||
slog.Info("FIX session created", "sessionID", sessionID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnLogon is called when a FIX session logs on successfully.
|
func (a *application) OnLogon(sessionID quickfix.SessionID) {
|
||||||
func (a *Application) OnLogon(sessionID quickfix.SessionID) {
|
slog.Info("FIX session logged on", "session", sessionID.String())
|
||||||
slog.Info("FIX session logged on", "sessionID", sessionID)
|
if a.onLogon != nil {
|
||||||
|
a.onLogon(sessionID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnLogout is called when a FIX session logs out or disconnects.
|
func (a *application) OnLogout(sessionID quickfix.SessionID) {
|
||||||
func (a *Application) OnLogout(sessionID quickfix.SessionID) {
|
slog.Info("FIX session logged out", "session", sessionID.String())
|
||||||
slog.Info("FIX session logged out", "sessionID", sessionID)
|
|
||||||
|
go a.notifier.SendMsg(domain.MessageChannelError, "Logout", domain.MessageStatusWarning, nil)
|
||||||
|
|
||||||
|
if a.onLogout != nil {
|
||||||
|
a.onLogout(sessionID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ToAdmin is called before sending an admin message.
|
func (a *application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {}
|
||||||
func (a *Application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {}
|
|
||||||
|
|
||||||
// ToApp is called before sending an application message.
|
func (a *application) ToApp(msg *quickfix.Message, _ quickfix.SessionID) error {
|
||||||
func (a *Application) ToApp(_ *quickfix.Message, _ quickfix.SessionID) error {
|
if a.onRawMessage != nil {
|
||||||
return nil
|
a.onRawMessage("OUT", msg)
|
||||||
}
|
|
||||||
|
|
||||||
// FromAdmin is called when an admin message is received.
|
|
||||||
func (a *Application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FromApp is called when an application message is received.
|
|
||||||
// It routes ExecutionReport (MsgType=8) messages to the handler.
|
|
||||||
func (a *Application) FromApp(message *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError {
|
|
||||||
if message.IsMsgTypeOf("8") {
|
|
||||||
a.handleExecutionReport(message, sessionID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Application) handleExecutionReport(msg *quickfix.Message, _ quickfix.SessionID) {
|
func (a *application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError {
|
||||||
er := executionreport.FromMessage(msg)
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
report := domain.ExecutionReport{}
|
func (a *application) FromApp(msg *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError {
|
||||||
|
if a.onRawMessage != nil {
|
||||||
if v, err := er.GetOrderID(); err == nil {
|
a.onRawMessage("IN", msg)
|
||||||
report.OrderID = v
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if v, err := er.GetClOrdID(); err == nil {
|
beginString, _ := msg.Header.GetBytes(tag.BeginString)
|
||||||
report.ClOrdID = v
|
msgType, _ := msg.Header.GetBytes(tag.MsgType)
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetExecID(); err == nil {
|
var applVerID quickfix.FIXString
|
||||||
report.ExecID = v
|
msg.Header.GetField(tag.ApplVerID, &applVerID)
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetExecType(); err == nil {
|
slog.Info("FIX FromApp received",
|
||||||
report.ExecType = string(v)
|
"beginString", string(beginString),
|
||||||
}
|
"msgType", string(msgType),
|
||||||
|
"applVerID", string(applVerID),
|
||||||
if v, err := er.GetOrdStatus(); err == nil {
|
"session", sessionID.String(),
|
||||||
report.OrdStatus = string(v)
|
"rawMsg", msg.String(),
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetSymbol(); err == nil {
|
|
||||||
report.Symbol = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetSide(); err == nil {
|
|
||||||
report.Side = string(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetOrderQty(); err == nil {
|
|
||||||
report.OrderQty = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetPrice(); err == nil {
|
|
||||||
report.Price = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetLastPx(); err == nil {
|
|
||||||
report.LastPx = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetLastQty(); err == nil {
|
|
||||||
report.LastQty = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetCumQty(); err == nil {
|
|
||||||
report.CumQty = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetLeavesQty(); err == nil {
|
|
||||||
report.LeavesQty = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetAvgPx(); err == nil {
|
|
||||||
report.AvgPx = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetTransactTime(); err == nil {
|
|
||||||
report.TransactTime = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, err := er.GetAccount(); err == nil {
|
|
||||||
report.Account = v
|
|
||||||
}
|
|
||||||
|
|
||||||
a.repo.Save(report)
|
|
||||||
|
|
||||||
slog.Info("ExecutionReport stored",
|
|
||||||
"orderID", report.OrderID,
|
|
||||||
"execType", report.ExecType,
|
|
||||||
"ordStatus", report.OrdStatus,
|
|
||||||
"symbol", report.Symbol,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
rejErr := a.router.Route(msg, sessionID)
|
||||||
|
if rejErr != nil {
|
||||||
|
slog.Error("FIX FromApp routing failed",
|
||||||
|
"msgType", string(msgType),
|
||||||
|
"error", rejErr.Error(),
|
||||||
|
"isBusinessReject", rejErr.IsBusinessReject(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
return rejErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *application) handleTradeCaptureReport(msg tradecapturereport.TradeCaptureReport, sessionID quickfix.SessionID) quickfix.MessageRejectError {
|
||||||
|
tradeReportID, _ := msg.GetTradeReportID()
|
||||||
|
|
||||||
|
slog.Info("TradeCaptureReport received",
|
||||||
|
"tradeReportID", tradeReportID,
|
||||||
|
"session", sessionID.String(),
|
||||||
|
)
|
||||||
|
|
||||||
|
if a.onTradeCaptureReport != nil {
|
||||||
|
a.onTradeCaptureReport(msg, sessionID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *application) handleAllocationReport(msg allocationreport.AllocationReport, sessionID quickfix.SessionID) quickfix.MessageRejectError {
|
||||||
|
allocReportID, _ := msg.GetAllocReportID()
|
||||||
|
|
||||||
|
slog.Info("AllocationReport received",
|
||||||
|
"allocReportID", allocReportID,
|
||||||
|
"session", sessionID.String(),
|
||||||
|
)
|
||||||
|
|
||||||
|
if a.onAllocationReport != nil {
|
||||||
|
a.onAllocationReport(msg, sessionID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *application) handleConfirmation(msg confirmation.Confirmation, sessionID quickfix.SessionID) quickfix.MessageRejectError {
|
||||||
|
confirmID, _ := msg.GetConfirmID()
|
||||||
|
|
||||||
|
slog.Info("Confirmation received",
|
||||||
|
"confirmID", confirmID,
|
||||||
|
"session", sessionID.String(),
|
||||||
|
)
|
||||||
|
|
||||||
|
if a.onConfirmation != nil {
|
||||||
|
a.onConfirmation(msg, sessionID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,49 +0,0 @@
|
|||||||
package fix
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"quantex.com/qfixpt/quickfix"
|
|
||||||
"quantex.com/qfixpt/src/domain"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Initiator wraps the quickfix Initiator lifecycle.
|
|
||||||
type Initiator struct {
|
|
||||||
inner *quickfix.Initiator
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewInitiator creates and starts a FIX initiator using the given settings file path.
|
|
||||||
func NewInitiator(settingsFile string, repo domain.OrderRepository) (*Initiator, error) {
|
|
||||||
f, err := os.Open(settingsFile)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("opening FIX settings file %q: %w", settingsFile, err)
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
settings, err := quickfix.ParseSettings(f)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("parsing FIX settings: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
app := NewApplication(repo)
|
|
||||||
storeFactory := quickfix.NewMemoryStoreFactory()
|
|
||||||
logFactory := quickfix.NewNullLogFactory()
|
|
||||||
|
|
||||||
initiator, err := quickfix.NewInitiator(app, storeFactory, settings, logFactory)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("creating FIX initiator: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Initiator{inner: initiator}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start begins connecting to the FIX counterparty.
|
|
||||||
func (i *Initiator) Start() error {
|
|
||||||
return i.inner.Start()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop gracefully disconnects all FIX sessions.
|
|
||||||
func (i *Initiator) Stop() {
|
|
||||||
i.inner.Stop()
|
|
||||||
}
|
|
||||||
581
src/client/fix/manager.go
Normal file
581
src/client/fix/manager.go
Normal file
@ -0,0 +1,581 @@
|
|||||||
|
package fix
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
"github.com/shopspring/decimal"
|
||||||
|
|
||||||
|
"quantex.com/qfixpt/quickfix"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/enum"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/field"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreport"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreportack"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmation"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmationack"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereport"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereportack"
|
||||||
|
filelog "quantex.com/qfixpt/quickfix/log/file"
|
||||||
|
filestore "quantex.com/qfixpt/quickfix/store/file"
|
||||||
|
"quantex.com/qfixpt/src/app"
|
||||||
|
"quantex.com/qfixpt/src/common/tracerr"
|
||||||
|
"quantex.com/qfixpt/src/domain"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Custom Tradeweb tags not in generated code.
|
||||||
|
const (
|
||||||
|
tagTestMessage = 23029
|
||||||
|
tagTWTradeID = 23068
|
||||||
|
tagTWOrigTradeID = 23096
|
||||||
|
tagTWClrID = 23025
|
||||||
|
tagDlrClrID = 23027
|
||||||
|
tagClearingStatus = 5440
|
||||||
|
tagTradingSystemID = 6731
|
||||||
|
)
|
||||||
|
|
||||||
|
// postTrade is the internal representation of a trade in memory.
|
||||||
|
type postTrade struct {
|
||||||
|
TradeID string
|
||||||
|
TradeReportID string
|
||||||
|
TradeReportType string
|
||||||
|
Side string
|
||||||
|
LastQty decimal.Decimal
|
||||||
|
LastPx decimal.Decimal
|
||||||
|
SettlDate string
|
||||||
|
TradeDate string
|
||||||
|
Status domain.PostTradeStatus
|
||||||
|
TWTradeID string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manager wraps the QuickFIX initiator for post-trade message handling.
|
||||||
|
type Manager struct {
|
||||||
|
initiator *quickfix.Initiator
|
||||||
|
app *application
|
||||||
|
tradesMu sync.RWMutex
|
||||||
|
trades map[string]*postTrade
|
||||||
|
store domain.PersistenceStore
|
||||||
|
notify domain.Notifier
|
||||||
|
cfg app.FIXConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager {
|
||||||
|
return &Manager{
|
||||||
|
trades: make(map[string]*postTrade),
|
||||||
|
store: store,
|
||||||
|
notify: notify,
|
||||||
|
cfg: cfg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) Start() error {
|
||||||
|
fixApp := newApplication(m.notify)
|
||||||
|
fixApp.onLogon = m.onLogon
|
||||||
|
fixApp.onLogout = m.onLogout
|
||||||
|
fixApp.onTradeCaptureReport = m.handleTradeCaptureReport
|
||||||
|
fixApp.onAllocationReport = m.handleAllocationReport
|
||||||
|
fixApp.onConfirmation = m.handleConfirmation
|
||||||
|
fixApp.onRawMessage = m.handleRawMessage
|
||||||
|
m.app = fixApp
|
||||||
|
|
||||||
|
if err := m.loadTrades(); err != nil {
|
||||||
|
slog.Error("failed to load trades from DB, starting with empty state", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := os.Open(m.cfg.SettingsFile)
|
||||||
|
if err != nil {
|
||||||
|
err = tracerr.Errorf("error opening FIX settings file %q: %s", m.cfg.SettingsFile, err)
|
||||||
|
log.Error().Msg(err.Error())
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
settings, err := quickfix.ParseSettings(f)
|
||||||
|
if err != nil {
|
||||||
|
err = tracerr.Errorf("error parsing FIX settings: %s", err)
|
||||||
|
log.Error().Msg(err.Error())
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
storeFactory := filestore.NewStoreFactory(settings)
|
||||||
|
logFactory, err := filelog.NewLogFactory(settings)
|
||||||
|
if err != nil {
|
||||||
|
err = tracerr.Errorf("error creating file log factory: %s", err)
|
||||||
|
log.Error().Msg(err.Error())
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory)
|
||||||
|
if err != nil {
|
||||||
|
err = tracerr.Errorf("error creating FIX initiator: %s", err)
|
||||||
|
log.Error().Msg(err.Error())
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
m.initiator = initiator
|
||||||
|
|
||||||
|
if err = m.initiator.Start(); err != nil {
|
||||||
|
err = tracerr.Errorf("error starting FIX initiator: %s", err)
|
||||||
|
log.Error().Msg(err.Error())
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("FIX initiator started", "settings", m.cfg.SettingsFile)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) Stop() {
|
||||||
|
if m.initiator != nil {
|
||||||
|
m.initiator.Stop()
|
||||||
|
slog.Info("FIX initiator stopped")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) onLogon(sessionID quickfix.SessionID) {
|
||||||
|
slog.Info("FIX PT session logged on", "session", sessionID.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) onLogout(sessionID quickfix.SessionID) {
|
||||||
|
slog.Info("FIX PT session logged out", "session", sessionID.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleTradeCaptureReport processes an incoming TradeCaptureReport (35=AE).
|
||||||
|
func (m *Manager) handleTradeCaptureReport(msg tradecapturereport.TradeCaptureReport, sessionID quickfix.SessionID) {
|
||||||
|
tradeReportID, _ := msg.GetTradeReportID()
|
||||||
|
|
||||||
|
// ACK immediately before any validation.
|
||||||
|
ack := tradecapturereportack.New()
|
||||||
|
ack.SetTradeReportID(tradeReportID)
|
||||||
|
|
||||||
|
if err := quickfix.SendToTarget(ack, sessionID); err != nil {
|
||||||
|
slog.Error("failed to send TradeCaptureReportAck", "tradeReportID", tradeReportID, "error", err.Error())
|
||||||
|
} else {
|
||||||
|
slog.Info("TradeCaptureReportAck sent", "tradeReportID", tradeReportID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if test message.
|
||||||
|
testMessage := getCustomBool(&msg.Message.Body.FieldMap, tagTestMessage)
|
||||||
|
if testMessage {
|
||||||
|
slog.Info("test TradeCaptureReport, skipping", "tradeReportID", tradeReportID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse trade data.
|
||||||
|
tradeReportType, _ := msg.GetTradeReportType()
|
||||||
|
tradeID, _ := msg.GetTradeID()
|
||||||
|
execID, _ := msg.GetExecID()
|
||||||
|
lastQty, _ := msg.GetLastQty()
|
||||||
|
lastPx, _ := msg.GetLastPx()
|
||||||
|
spread, _ := msg.GetSpread()
|
||||||
|
settlDate, _ := msg.GetSettlDate()
|
||||||
|
tradeDate, _ := msg.GetTradeDate()
|
||||||
|
grossTradeAmt, _ := msg.GetGrossTradeAmt()
|
||||||
|
tradeReportRefID, _ := msg.GetTradeReportRefID()
|
||||||
|
|
||||||
|
// Custom fields.
|
||||||
|
twTradeID := getCustomString(&msg.Message.Body.FieldMap, tagTWTradeID)
|
||||||
|
twOrigTradeID := getCustomString(&msg.Message.Body.FieldMap, tagTWOrigTradeID)
|
||||||
|
|
||||||
|
// Extract side-level fields (parties, accrued interest, net money).
|
||||||
|
var side string
|
||||||
|
var accruedInterest, netMoney decimal.Decimal
|
||||||
|
var clearingFirm, twTraderID string
|
||||||
|
|
||||||
|
sides, sideErr := msg.GetNoSides()
|
||||||
|
if sideErr == nil && sides.Len() > 0 {
|
||||||
|
s := sides.Get(0)
|
||||||
|
sideVal, _ := s.GetSide()
|
||||||
|
side = string(sideVal)
|
||||||
|
accruedInterest, _ = s.GetAccruedInterestAmt()
|
||||||
|
netMoney, _ = s.GetNetMoney()
|
||||||
|
|
||||||
|
clearingFirm = extractPartyByRole(s, enum.PartyRole_CLEARING_FIRM)
|
||||||
|
// 1007 is a custom Tradeweb role, access via string comparison.
|
||||||
|
twTraderID = extractPartyByRole(s, "1007")
|
||||||
|
}
|
||||||
|
|
||||||
|
trade := domain.Trade{
|
||||||
|
TradeReportID: tradeReportID,
|
||||||
|
TradeID: tradeID,
|
||||||
|
ExecID: execID,
|
||||||
|
LastQty: lastQty,
|
||||||
|
LastPx: lastPx,
|
||||||
|
Spread: spread,
|
||||||
|
Side: side,
|
||||||
|
AccruedInterest: accruedInterest,
|
||||||
|
NetMoney: netMoney,
|
||||||
|
GrossTradeAmt: grossTradeAmt,
|
||||||
|
SettlDate: settlDate,
|
||||||
|
TradeDate: tradeDate,
|
||||||
|
TradeReportType: string(tradeReportType),
|
||||||
|
TradeReportRefID: tradeReportRefID,
|
||||||
|
ClearingFirm: clearingFirm,
|
||||||
|
TradewebTraderID: twTraderID,
|
||||||
|
TWTradeID: twTradeID,
|
||||||
|
TWOrigTradeID: twOrigTradeID,
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("TradeCaptureReport parsed",
|
||||||
|
"tradeReportID", trade.TradeReportID,
|
||||||
|
"tradeReportType", trade.TradeReportType,
|
||||||
|
"tradeID", trade.TradeID,
|
||||||
|
"lastQty", trade.LastQty.String(),
|
||||||
|
"lastPx", trade.LastPx.String(),
|
||||||
|
"side", trade.Side,
|
||||||
|
"settlDate", trade.SettlDate,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Persist structured message.
|
||||||
|
m.persistMessage(tradeID, parseTradeCaptureReportJSON(msg))
|
||||||
|
|
||||||
|
// Update in-memory state.
|
||||||
|
var status domain.PostTradeStatus
|
||||||
|
|
||||||
|
switch string(tradeReportType) {
|
||||||
|
case "101": // TRDCONF — trade confirmation
|
||||||
|
status = domain.PostTradeStatusActive
|
||||||
|
slog.Info("TRDCONF: trade confirmation", "tradeReportID", tradeReportID, "tradeID", tradeID)
|
||||||
|
case "102": // TRDBLOCK — block trade, log only
|
||||||
|
status = domain.PostTradeStatusActive
|
||||||
|
slog.Info("TRDBLOCK: block trade", "tradeReportID", tradeReportID, "tradeID", tradeID)
|
||||||
|
case "103": // TRDCORR — trade correction
|
||||||
|
status = domain.PostTradeStatusCorrected
|
||||||
|
slog.Info("TRDCORR: trade correction", "tradeReportID", tradeReportID, "refID", tradeReportRefID)
|
||||||
|
case "104": // TRDCXL — trade cancellation
|
||||||
|
status = domain.PostTradeStatusCancelled
|
||||||
|
slog.Info("TRDCXL: trade cancel", "tradeReportID", tradeReportID, "refID", tradeReportRefID)
|
||||||
|
case "105", "106", "107", "108": // log and ack only, no business logic
|
||||||
|
status = domain.PostTradeStatusActive
|
||||||
|
slog.Info("trade report (no business logic)", "tradeReportType", string(tradeReportType), "tradeReportID", tradeReportID)
|
||||||
|
default:
|
||||||
|
status = domain.PostTradeStatusActive
|
||||||
|
slog.Warn("unexpected TradeReportType", "tradeReportType", string(tradeReportType), "tradeReportID", tradeReportID)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.tradesMu.Lock()
|
||||||
|
m.trades[tradeID] = &postTrade{
|
||||||
|
TradeID: tradeID,
|
||||||
|
TradeReportID: tradeReportID,
|
||||||
|
TradeReportType: string(tradeReportType),
|
||||||
|
Side: side,
|
||||||
|
LastQty: lastQty,
|
||||||
|
LastPx: lastPx,
|
||||||
|
SettlDate: settlDate,
|
||||||
|
TradeDate: tradeDate,
|
||||||
|
Status: status,
|
||||||
|
TWTradeID: twTradeID,
|
||||||
|
}
|
||||||
|
m.tradesMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleAllocationReport processes an incoming AllocationReport (35=AS).
|
||||||
|
func (m *Manager) handleAllocationReport(msg allocationreport.AllocationReport, sessionID quickfix.SessionID) {
|
||||||
|
allocReportID, _ := msg.GetAllocReportID()
|
||||||
|
|
||||||
|
// ACK immediately.
|
||||||
|
ack := allocationreportack.New(field.NewAllocReportID(allocReportID))
|
||||||
|
|
||||||
|
if err := quickfix.SendToTarget(ack, sessionID); err != nil {
|
||||||
|
slog.Error("failed to send AllocationReportAck", "allocReportID", allocReportID, "error", err.Error())
|
||||||
|
} else {
|
||||||
|
slog.Info("AllocationReportAck sent", "allocReportID", allocReportID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if test message.
|
||||||
|
testMessage := getCustomBool(&msg.Message.Body.FieldMap, tagTestMessage)
|
||||||
|
if testMessage {
|
||||||
|
slog.Info("test AllocationReport, skipping", "allocReportID", allocReportID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
allocTransType, _ := msg.GetAllocTransType()
|
||||||
|
|
||||||
|
// Extract TradeID: try tag 1003 directly, fall back to SecondaryOrderID (198) from NoOrders.
|
||||||
|
tradeID := getCustomString(&msg.Message.Body.FieldMap, 1003)
|
||||||
|
if tradeID == "" {
|
||||||
|
if orders, err := msg.GetNoOrders(); err == nil && orders.Len() > 0 {
|
||||||
|
if v, e := orders.Get(0).GetSecondaryOrderID(); e == nil && v != "" {
|
||||||
|
tradeID = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
alloc := domain.Allocation{
|
||||||
|
AllocReportID: allocReportID,
|
||||||
|
AllocTransType: string(allocTransType),
|
||||||
|
TradeID: tradeID,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse NoAllocs repeating group.
|
||||||
|
allocs, allocErr := msg.GetNoAllocs()
|
||||||
|
if allocErr == nil {
|
||||||
|
for i := 0; i < allocs.Len(); i++ {
|
||||||
|
entry := allocs.Get(i)
|
||||||
|
account, _ := entry.GetAllocAccount()
|
||||||
|
qty, _ := entry.GetAllocQty()
|
||||||
|
individualID, _ := entry.GetIndividualAllocID()
|
||||||
|
price, _ := entry.GetAllocPrice()
|
||||||
|
|
||||||
|
alloc.Entries = append(alloc.Entries, domain.AllocEntry{
|
||||||
|
AllocAccount: account,
|
||||||
|
AllocQty: qty,
|
||||||
|
IndividualAllocID: individualID,
|
||||||
|
AllocPrice: price,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("AllocationReport parsed",
|
||||||
|
"allocReportID", alloc.AllocReportID,
|
||||||
|
"allocTransType", alloc.AllocTransType,
|
||||||
|
"tradeID", alloc.TradeID,
|
||||||
|
"numEntries", len(alloc.Entries),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Persist structured message.
|
||||||
|
m.persistMessage(tradeID, parseAllocationReportJSON(msg))
|
||||||
|
|
||||||
|
switch allocTransType {
|
||||||
|
case enum.AllocTransType_NEW:
|
||||||
|
slog.Info("new allocation", "allocReportID", allocReportID)
|
||||||
|
case enum.AllocTransType_REPLACE:
|
||||||
|
slog.Info("replace allocation", "allocReportID", allocReportID)
|
||||||
|
case enum.AllocTransType_CANCEL:
|
||||||
|
slog.Info("cancel allocation", "allocReportID", allocReportID)
|
||||||
|
default:
|
||||||
|
slog.Warn("unhandled AllocTransType", "allocTransType", string(allocTransType), "allocReportID", allocReportID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleConfirmation processes an incoming Confirmation (35=AK).
|
||||||
|
func (m *Manager) handleConfirmation(msg confirmation.Confirmation, sessionID quickfix.SessionID) {
|
||||||
|
confirmID, _ := msg.GetConfirmID()
|
||||||
|
tradeDate, _ := msg.GetTradeDate()
|
||||||
|
transactTime, _ := msg.GetTransactTime()
|
||||||
|
|
||||||
|
// ACK immediately.
|
||||||
|
ack := confirmationack.New(
|
||||||
|
field.NewConfirmID(confirmID),
|
||||||
|
field.NewTradeDate(tradeDate),
|
||||||
|
field.NewTransactTime(transactTime),
|
||||||
|
field.NewAffirmStatus(enum.AffirmStatus_RECEIVED),
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := quickfix.SendToTarget(ack, sessionID); err != nil {
|
||||||
|
slog.Error("failed to send ConfirmationAck", "confirmID", confirmID, "error", err.Error())
|
||||||
|
} else {
|
||||||
|
slog.Info("ConfirmationAck sent", "confirmID", confirmID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if test message.
|
||||||
|
testMessage := getCustomBool(&msg.Message.Body.FieldMap, tagTestMessage)
|
||||||
|
if testMessage {
|
||||||
|
slog.Info("test Confirmation, skipping", "confirmID", confirmID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
confirmStatus, _ := msg.GetConfirmStatus()
|
||||||
|
confirmTransType, _ := msg.GetConfirmTransType()
|
||||||
|
individualAllocID, _ := msg.GetIndividualAllocID()
|
||||||
|
|
||||||
|
// Custom fields.
|
||||||
|
twClrID := getCustomString(&msg.Message.Body.FieldMap, tagTWClrID)
|
||||||
|
dlrClrID := getCustomString(&msg.Message.Body.FieldMap, tagDlrClrID)
|
||||||
|
clearingStatus := getCustomString(&msg.Message.Body.FieldMap, tagClearingStatus)
|
||||||
|
|
||||||
|
conf := domain.ConfirmationMsg{
|
||||||
|
ConfirmID: confirmID,
|
||||||
|
ConfirmStatus: string(confirmStatus),
|
||||||
|
ConfirmTransType: string(confirmTransType),
|
||||||
|
IndividualAllocID: individualAllocID,
|
||||||
|
TWClrID: twClrID,
|
||||||
|
DlrClrID: dlrClrID,
|
||||||
|
ClearingStatus: clearingStatus,
|
||||||
|
TradeDate: tradeDate,
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("Confirmation parsed",
|
||||||
|
"confirmID", conf.ConfirmID,
|
||||||
|
"confirmStatus", conf.ConfirmStatus,
|
||||||
|
"confirmTransType", conf.ConfirmTransType,
|
||||||
|
"individualAllocID", conf.IndividualAllocID,
|
||||||
|
"clearingStatus", conf.ClearingStatus,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Persist structured message. Use TradeID if available, fall back to ConfirmID.
|
||||||
|
persistID := getCustomString(&msg.Message.Body.FieldMap, 1003)
|
||||||
|
if persistID == "" {
|
||||||
|
persistID = confirmID
|
||||||
|
}
|
||||||
|
|
||||||
|
m.persistMessage(persistID, parseConfirmationJSON(msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
// persistMessage saves a structured FIX message to the messages table.
|
||||||
|
func (m *Manager) persistMessage(tradeID string, fixJSON domain.FixMessageJSON) {
|
||||||
|
if err := m.store.SaveMessage(domain.TradeMessage{
|
||||||
|
TradeID: tradeID,
|
||||||
|
JMessage: fixJSON,
|
||||||
|
}); err != nil {
|
||||||
|
slog.Error("failed to persist message", "msgType", fixJSON.MsgType, "tradeID", tradeID, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleRawMessage persists raw FIX message strings to the logs table.
|
||||||
|
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
|
||||||
|
tradeID := extractTradeIdentifier(msg)
|
||||||
|
if tradeID == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := m.store.SaveLog(domain.LogEntry{
|
||||||
|
TradeID: tradeID,
|
||||||
|
RawMsg: "[" + direction + "] " + msg.String(),
|
||||||
|
}); err != nil {
|
||||||
|
slog.Error("failed to persist raw log", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadTrades reconstructs all trades and their states from today's messages in the database.
|
||||||
|
func (m *Manager) loadTrades() error {
|
||||||
|
messages, err := m.store.GetTodayMessages()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
trades := make(map[string]*postTrade)
|
||||||
|
|
||||||
|
for _, msg := range messages {
|
||||||
|
switch msg.JMessage.MsgType {
|
||||||
|
case "AE": // TradeCaptureReport
|
||||||
|
body := msg.JMessage.Body
|
||||||
|
tradeReportType, _ := body["TradeReportType"].(string)
|
||||||
|
tradeID := msg.TradeID
|
||||||
|
|
||||||
|
if tradeID == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
t, exists := trades[tradeID]
|
||||||
|
if !exists {
|
||||||
|
t = &postTrade{
|
||||||
|
TradeID: tradeID,
|
||||||
|
Status: domain.PostTradeStatusActive,
|
||||||
|
}
|
||||||
|
trades[tradeID] = t
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := body["TradeReportID"].(string); ok {
|
||||||
|
t.TradeReportID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
t.TradeReportType = tradeReportType
|
||||||
|
|
||||||
|
if v, ok := body["Side"].(string); ok {
|
||||||
|
t.Side = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := body["LastQty"].(string); ok {
|
||||||
|
t.LastQty, _ = decimal.NewFromString(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := body["LastPx"].(string); ok {
|
||||||
|
t.LastPx, _ = decimal.NewFromString(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := body["SettlDate"].(string); ok {
|
||||||
|
t.SettlDate = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := body["TradeDate"].(string); ok {
|
||||||
|
t.TradeDate = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := body["TWTradeID"].(string); ok {
|
||||||
|
t.TWTradeID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
switch tradeReportType {
|
||||||
|
case "103": // TRDCORR
|
||||||
|
t.Status = domain.PostTradeStatusCorrected
|
||||||
|
case "104": // TRDCXL
|
||||||
|
t.Status = domain.PostTradeStatusCancelled
|
||||||
|
default:
|
||||||
|
if t.Status != domain.PostTradeStatusCorrected && t.Status != domain.PostTradeStatusCancelled {
|
||||||
|
t.Status = domain.PostTradeStatusActive
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
active := 0
|
||||||
|
for _, t := range trades {
|
||||||
|
if t.Status == domain.PostTradeStatusActive {
|
||||||
|
active++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m.tradesMu.Lock()
|
||||||
|
m.trades = trades
|
||||||
|
m.tradesMu.Unlock()
|
||||||
|
|
||||||
|
slog.Info("recovery completed", "totalTrades", len(trades), "activeTrades", active)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTrades returns only active trades.
|
||||||
|
func (m *Manager) GetTrades() []domain.PostTrade {
|
||||||
|
m.tradesMu.RLock()
|
||||||
|
defer m.tradesMu.RUnlock()
|
||||||
|
|
||||||
|
var result []domain.PostTrade
|
||||||
|
|
||||||
|
for _, t := range m.trades {
|
||||||
|
if t.Status == domain.PostTradeStatusActive {
|
||||||
|
result = append(result, t.toPostTrade())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAllTrades returns all trades regardless of status.
|
||||||
|
func (m *Manager) GetAllTrades() []domain.PostTrade {
|
||||||
|
m.tradesMu.RLock()
|
||||||
|
defer m.tradesMu.RUnlock()
|
||||||
|
|
||||||
|
result := make([]domain.PostTrade, 0, len(m.trades))
|
||||||
|
|
||||||
|
for _, t := range m.trades {
|
||||||
|
result = append(result, t.toPostTrade())
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *postTrade) toPostTrade() domain.PostTrade {
|
||||||
|
return domain.PostTrade{
|
||||||
|
TradeID: t.TradeID,
|
||||||
|
TradeReportID: t.TradeReportID,
|
||||||
|
TradeReportType: t.TradeReportType,
|
||||||
|
Side: t.Side,
|
||||||
|
LastQty: t.LastQty.String(),
|
||||||
|
LastPx: t.LastPx.String(),
|
||||||
|
SettlDate: t.SettlDate,
|
||||||
|
TradeDate: t.TradeDate,
|
||||||
|
Status: t.Status,
|
||||||
|
TWTradeID: t.TWTradeID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure transactTime has a sensible default if not present in the message.
|
||||||
|
var _ = time.Now // used by confirmationack.New via field.NewTransactTime
|
||||||
356
src/client/fix/parser.go
Normal file
356
src/client/fix/parser.go
Normal file
@ -0,0 +1,356 @@
|
|||||||
|
package fix
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"quantex.com/qfixpt/quickfix"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/enum"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/fix50sp2/allocationreport"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/fix50sp2/confirmation"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/fix50sp2/tradecapturereport"
|
||||||
|
"quantex.com/qfixpt/quickfix/gen/tag"
|
||||||
|
"quantex.com/qfixpt/src/domain"
|
||||||
|
)
|
||||||
|
|
||||||
|
func getCustomString(body *quickfix.FieldMap, t int) string {
|
||||||
|
v, err := body.GetString(quickfix.Tag(t))
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCustomInt(body *quickfix.FieldMap, t int) int {
|
||||||
|
v, err := body.GetInt(quickfix.Tag(t))
|
||||||
|
if err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCustomBool(body *quickfix.FieldMap, t int) bool {
|
||||||
|
v, err := body.GetBool(quickfix.Tag(t))
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractHeader(msg *quickfix.Message) map[string]interface{} {
|
||||||
|
h := make(map[string]interface{})
|
||||||
|
|
||||||
|
if v, err := msg.Header.GetBytes(tag.BeginString); err == nil {
|
||||||
|
h["BeginString"] = string(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := msg.Header.GetBytes(tag.MsgType); err == nil {
|
||||||
|
h["MsgType"] = string(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := msg.Header.GetBytes(tag.SenderCompID); err == nil {
|
||||||
|
h["SenderCompID"] = string(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := msg.Header.GetBytes(tag.TargetCompID); err == nil {
|
||||||
|
h["TargetCompID"] = string(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := msg.Header.GetBytes(tag.SendingTime); err == nil {
|
||||||
|
h["SendingTime"] = string(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseTradeCaptureReportJSON(msg tradecapturereport.TradeCaptureReport) domain.FixMessageJSON {
|
||||||
|
tradeID, _ := msg.GetTradeID()
|
||||||
|
|
||||||
|
body := map[string]interface{}{}
|
||||||
|
|
||||||
|
if v, _ := msg.GetTradeReportID(); v != "" {
|
||||||
|
body["TradeReportID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
body["TradeID"] = tradeID
|
||||||
|
|
||||||
|
if v, _ := msg.GetExecID(); v != "" {
|
||||||
|
body["ExecID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetTradeReportType(); v != "" {
|
||||||
|
body["TradeReportType"] = string(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetLastQty(); !v.IsZero() {
|
||||||
|
body["LastQty"] = v.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetLastPx(); !v.IsZero() {
|
||||||
|
body["LastPx"] = v.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetSpread(); !v.IsZero() {
|
||||||
|
body["Spread"] = v.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetSettlDate(); v != "" {
|
||||||
|
body["SettlDate"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetTradeDate(); v != "" {
|
||||||
|
body["TradeDate"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetGrossTradeAmt(); !v.IsZero() {
|
||||||
|
body["GrossTradeAmt"] = v.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetTradeReportRefID(); v != "" {
|
||||||
|
body["TradeReportRefID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Multi-leg support.
|
||||||
|
if v, _ := msg.GetTradeLegRefID(); v != "" {
|
||||||
|
body["TradeLegRefID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v := getCustomString(&msg.Message.Body.FieldMap, tagTradingSystemID); v != "" {
|
||||||
|
body["TradingSystemID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Custom fields.
|
||||||
|
if v := getCustomString(&msg.Message.Body.FieldMap, tagTWTradeID); v != "" {
|
||||||
|
body["TWTradeID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v := getCustomString(&msg.Message.Body.FieldMap, tagTWOrigTradeID); v != "" {
|
||||||
|
body["TWOrigTradeID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Side-level fields.
|
||||||
|
sides, sideErr := msg.GetNoSides()
|
||||||
|
if sideErr == nil && sides.Len() > 0 {
|
||||||
|
s := sides.Get(0)
|
||||||
|
|
||||||
|
if v, e := s.GetSide(); e == nil {
|
||||||
|
body["Side"] = string(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, e := s.GetAccruedInterestAmt(); e == nil {
|
||||||
|
body["AccruedInterestAmt"] = v.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, e := s.GetNetMoney(); e == nil {
|
||||||
|
body["NetMoney"] = v.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
if v := extractPartyByRole(s, enum.PartyRole_CLEARING_FIRM); v != "" {
|
||||||
|
body["ClearingFirm"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v := extractPartyByRole(s, "1007"); v != "" {
|
||||||
|
body["TradewebTraderID"] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return domain.FixMessageJSON{
|
||||||
|
Direction: "IN",
|
||||||
|
MsgType: "AE",
|
||||||
|
TradeID: tradeID,
|
||||||
|
Header: extractHeader(msg.Message),
|
||||||
|
Body: body,
|
||||||
|
ReceiveTime: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseAllocationReportJSON(msg allocationreport.AllocationReport) domain.FixMessageJSON {
|
||||||
|
// Primary: tag 1003 directly on the body.
|
||||||
|
tradeID := getCustomString(&msg.Message.Body.FieldMap, 1003)
|
||||||
|
|
||||||
|
// Fallback: SecondaryOrderID (198) from the NoOrders repeating group.
|
||||||
|
if tradeID == "" {
|
||||||
|
if orders, err := msg.GetNoOrders(); err == nil && orders.Len() > 0 {
|
||||||
|
if v, e := orders.Get(0).GetSecondaryOrderID(); e == nil && v != "" {
|
||||||
|
tradeID = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
body := map[string]interface{}{}
|
||||||
|
|
||||||
|
if v, _ := msg.GetAllocReportID(); v != "" {
|
||||||
|
body["AllocReportID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetAllocTransType(); v != "" {
|
||||||
|
body["AllocTransType"] = string(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
body["TradeID"] = tradeID
|
||||||
|
|
||||||
|
// TradingSystemID (6731) — for multi-leg, differentiates each leg.
|
||||||
|
if v := getCustomString(&msg.Message.Body.FieldMap, tagTradingSystemID); v != "" {
|
||||||
|
body["TradingSystemID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// SecondaryOrderID (198) from NoOrders — explicit reference to original trade.
|
||||||
|
if orders, err := msg.GetNoOrders(); err == nil && orders.Len() > 0 {
|
||||||
|
if v, e := orders.Get(0).GetSecondaryOrderID(); e == nil && v != "" {
|
||||||
|
body["SecondaryOrderID"] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
allocs, allocErr := msg.GetNoAllocs()
|
||||||
|
if allocErr == nil {
|
||||||
|
var entries []map[string]interface{}
|
||||||
|
|
||||||
|
for i := 0; i < allocs.Len(); i++ {
|
||||||
|
entry := allocs.Get(i)
|
||||||
|
e := map[string]interface{}{}
|
||||||
|
|
||||||
|
if v, err := entry.GetAllocAccount(); err == nil {
|
||||||
|
e["AllocAccount"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := entry.GetAllocQty(); err == nil {
|
||||||
|
e["AllocQty"] = v.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := entry.GetIndividualAllocID(); err == nil {
|
||||||
|
e["IndividualAllocID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := entry.GetAllocPrice(); err == nil {
|
||||||
|
e["AllocPrice"] = v.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
entries = append(entries, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
body["Allocs"] = entries
|
||||||
|
}
|
||||||
|
|
||||||
|
return domain.FixMessageJSON{
|
||||||
|
Direction: "IN",
|
||||||
|
MsgType: "AS",
|
||||||
|
TradeID: tradeID,
|
||||||
|
Header: extractHeader(msg.Message),
|
||||||
|
Body: body,
|
||||||
|
ReceiveTime: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseConfirmationJSON(msg confirmation.Confirmation) domain.FixMessageJSON {
|
||||||
|
// Try TradeID first, fall back to ConfirmID.
|
||||||
|
tradeID := getCustomString(&msg.Message.Body.FieldMap, 1003)
|
||||||
|
if tradeID == "" {
|
||||||
|
tradeID, _ = msg.GetConfirmID()
|
||||||
|
}
|
||||||
|
|
||||||
|
body := map[string]interface{}{}
|
||||||
|
|
||||||
|
if v, _ := msg.GetConfirmID(); v != "" {
|
||||||
|
body["ConfirmID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetConfirmStatus(); v != "" {
|
||||||
|
body["ConfirmStatus"] = string(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetConfirmTransType(); v != "" {
|
||||||
|
body["ConfirmTransType"] = string(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetIndividualAllocID(); v != "" {
|
||||||
|
body["IndividualAllocID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, _ := msg.GetTradeDate(); v != "" {
|
||||||
|
body["TradeDate"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v := getCustomString(&msg.Message.Body.FieldMap, tagTWClrID); v != "" {
|
||||||
|
body["TWClrID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v := getCustomString(&msg.Message.Body.FieldMap, tagDlrClrID); v != "" {
|
||||||
|
body["DlrClrID"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v := getCustomString(&msg.Message.Body.FieldMap, tagClearingStatus); v != "" {
|
||||||
|
body["ClearingStatus"] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
return domain.FixMessageJSON{
|
||||||
|
Direction: "IN",
|
||||||
|
MsgType: "AK",
|
||||||
|
TradeID: tradeID,
|
||||||
|
Header: extractHeader(msg.Message),
|
||||||
|
Body: body,
|
||||||
|
ReceiveTime: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractTradeIdentifier(msg *quickfix.Message) string {
|
||||||
|
// Primary: TradeID (1003).
|
||||||
|
var tradeID quickfix.FIXString
|
||||||
|
if err := msg.Body.GetField(tag.TradeID, &tradeID); err == nil && string(tradeID) != "" {
|
||||||
|
return string(tradeID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fallback: SecondaryOrderID (198) — used in AllocationReport to reference the trade.
|
||||||
|
var secOrderID quickfix.FIXString
|
||||||
|
if err := msg.Body.GetField(tag.SecondaryOrderID, &secOrderID); err == nil && string(secOrderID) != "" {
|
||||||
|
return string(secOrderID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Last resort: ConfirmID (664) — for Confirmation messages without TradeID.
|
||||||
|
var confirmID quickfix.FIXString
|
||||||
|
if err := msg.Body.GetField(tag.ConfirmID, &confirmID); err == nil && string(confirmID) != "" {
|
||||||
|
return string(confirmID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildOutgoingMessageJSON(msgType, tradeID string, body map[string]interface{}) domain.FixMessageJSON {
|
||||||
|
return domain.FixMessageJSON{
|
||||||
|
Direction: "OUT",
|
||||||
|
MsgType: msgType,
|
||||||
|
TradeID: tradeID,
|
||||||
|
Body: body,
|
||||||
|
ReceiveTime: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// extractPartyByRole iterates the Parties repeating group inside a NoSides entry
|
||||||
|
// and returns the PartyID for the given target role.
|
||||||
|
func extractPartyByRole(side tradecapturereport.NoSides, targetRole enum.PartyRole) string {
|
||||||
|
parties, err := side.GetNoPartyIDs()
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < parties.Len(); i++ {
|
||||||
|
party := parties.Get(i)
|
||||||
|
|
||||||
|
role, roleErr := party.GetPartyRole()
|
||||||
|
if roleErr != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if role == targetRole {
|
||||||
|
id, idErr := party.GetPartyID()
|
||||||
|
if idErr != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ""
|
||||||
|
}
|
||||||
@ -1,58 +0,0 @@
|
|||||||
// Package repository provides in-memory storage implementations.
|
|
||||||
package repository
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"quantex.com/qfixpt/src/domain"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Memory is a thread-safe in-memory implementation of domain.OrderRepository.
|
|
||||||
type Memory struct {
|
|
||||||
mu sync.RWMutex
|
|
||||||
orders map[string]domain.ExecutionReport
|
|
||||||
ordered []string // maintains insertion order
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewMemory creates a new in-memory order repository.
|
|
||||||
func NewMemory() *Memory {
|
|
||||||
return &Memory{
|
|
||||||
orders: make(map[string]domain.ExecutionReport),
|
|
||||||
ordered: []string{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save stores an execution report, overwriting any existing entry with the same OrderID.
|
|
||||||
func (m *Memory) Save(report domain.ExecutionReport) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
|
|
||||||
if _, exists := m.orders[report.OrderID]; !exists {
|
|
||||||
m.ordered = append(m.ordered, report.OrderID)
|
|
||||||
}
|
|
||||||
|
|
||||||
m.orders[report.OrderID] = report
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAll returns all stored execution reports in insertion order.
|
|
||||||
func (m *Memory) GetAll() []domain.ExecutionReport {
|
|
||||||
m.mu.RLock()
|
|
||||||
defer m.mu.RUnlock()
|
|
||||||
|
|
||||||
result := make([]domain.ExecutionReport, 0, len(m.ordered))
|
|
||||||
for _, id := range m.ordered {
|
|
||||||
result = append(result, m.orders[id])
|
|
||||||
}
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetByOrderID returns the execution report for the given OrderID.
|
|
||||||
func (m *Memory) GetByOrderID(orderID string) (domain.ExecutionReport, bool) {
|
|
||||||
m.mu.RLock()
|
|
||||||
defer m.mu.RUnlock()
|
|
||||||
|
|
||||||
r, ok := m.orders[orderID]
|
|
||||||
|
|
||||||
return r, ok
|
|
||||||
}
|
|
||||||
16
src/client/store/db.sql
Normal file
16
src/client/store/db.sql
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS qfixpt_messages (
|
||||||
|
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||||
|
trade_id TEXT NOT NULL,
|
||||||
|
j_message JSONB NOT NULL,
|
||||||
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_pt_messages_trade_id ON qfixpt_messages(trade_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_pt_messages_created_at ON qfixpt_messages(created_at);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS qfixpt_logs (
|
||||||
|
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||||
|
trade_id TEXT NOT NULL UNIQUE,
|
||||||
|
raw_msg TEXT NOT NULL,
|
||||||
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||||
|
);
|
||||||
@ -2,7 +2,9 @@
|
|||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
_ "embed"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"quantex.com.ar/multidb"
|
"quantex.com.ar/multidb"
|
||||||
@ -11,6 +13,9 @@ import (
|
|||||||
"quantex.com/qfixpt/src/common/tracerr"
|
"quantex.com/qfixpt/src/common/tracerr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
//go:embed db.sql
|
||||||
|
var schemaSQL string
|
||||||
|
|
||||||
const dbPingSeconds = 30
|
const dbPingSeconds = 30
|
||||||
|
|
||||||
type Store struct {
|
type Store struct {
|
||||||
@ -45,9 +50,31 @@ func New(config Config) (*Store, error) {
|
|||||||
|
|
||||||
go s.db.PeriodicDBPing(time.Second * dbPingSeconds)
|
go s.db.PeriodicDBPing(time.Second * dbPingSeconds)
|
||||||
|
|
||||||
|
if err := s.ensureTables(); err != nil {
|
||||||
|
return nil, tracerr.Errorf("error ensuring tables: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Store) ensureTables() error {
|
||||||
|
statements := strings.Split(schemaSQL, ";")
|
||||||
|
for _, stmt := range statements {
|
||||||
|
stmt = strings.TrimSpace(stmt)
|
||||||
|
if stmt == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := p.db.Exec(stmt); err != nil {
|
||||||
|
return tracerr.Errorf("error executing schema statement: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("database tables ensured")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Store) CloseDB() {
|
func (p *Store) CloseDB() {
|
||||||
p.db.Close()
|
p.db.Close()
|
||||||
slog.Info("closing database connection.")
|
slog.Info("closing database connection.")
|
||||||
|
|||||||
103
src/client/store/persistence.go
Normal file
103
src/client/store/persistence.go
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"quantex.com/qfixpt/src/common/tracerr"
|
||||||
|
"quantex.com/qfixpt/src/domain"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (p *Store) SaveMessage(msg domain.TradeMessage) error {
|
||||||
|
jsonBytes, err := json.Marshal(msg.JMessage)
|
||||||
|
if err != nil {
|
||||||
|
return tracerr.Errorf("error marshaling j_message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = p.db.Exec(
|
||||||
|
"INSERT INTO qfixpt_messages (trade_id, j_message) VALUES ($1, $2)",
|
||||||
|
msg.TradeID, string(jsonBytes),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return tracerr.Errorf("error inserting message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Store) SaveLog(entry domain.LogEntry) error {
|
||||||
|
upsertStmt := `INSERT INTO qfixpt_logs (trade_id, raw_msg)
|
||||||
|
VALUES ($1, $2)
|
||||||
|
ON CONFLICT (trade_id) DO UPDATE
|
||||||
|
SET raw_msg = qfixpt_logs.raw_msg || E'\n' || EXCLUDED.raw_msg,
|
||||||
|
updated_at = NOW()`
|
||||||
|
|
||||||
|
_, err := p.db.Exec(upsertStmt, entry.TradeID, entry.RawMsg)
|
||||||
|
if err != nil {
|
||||||
|
return tracerr.Errorf("error upserting log: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||||
|
rows, err := p.db.Query(
|
||||||
|
"SELECT id, trade_id, j_message, created_at FROM qfixpt_messages WHERE created_at >= current_date ORDER BY created_at ASC",
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, tracerr.Errorf("error querying today messages: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var messages []domain.TradeMessage
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var (
|
||||||
|
id, tradeID string
|
||||||
|
jMessageRaw []byte
|
||||||
|
createdAt time.Time
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := rows.Scan(&id, &tradeID, &jMessageRaw, &createdAt); err != nil {
|
||||||
|
return nil, tracerr.Errorf("error scanning message row: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var jMessage domain.FixMessageJSON
|
||||||
|
if err := json.Unmarshal(jMessageRaw, &jMessage); err != nil {
|
||||||
|
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
messages = append(messages, domain.TradeMessage{
|
||||||
|
ID: id,
|
||||||
|
TradeID: tradeID,
|
||||||
|
JMessage: jMessage,
|
||||||
|
CreatedAt: createdAt,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, tracerr.Errorf("error iterating message rows: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return messages, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Store) GetLogsByTradeID(tradeID string) (domain.Logs, error) {
|
||||||
|
rows, err := p.db.Query("SELECT raw_msg FROM qfixpt_logs WHERE trade_id = $1", tradeID)
|
||||||
|
if err != nil {
|
||||||
|
return domain.Logs{}, tracerr.Errorf("error querying logs: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
if !rows.Next() {
|
||||||
|
return domain.Logs{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var rawMsg string
|
||||||
|
if err := rows.Scan(&rawMsg); err != nil {
|
||||||
|
return domain.Logs{}, tracerr.Errorf("error scanning log row: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return domain.Logs{Entries: strings.Split(rawMsg, "\n")}, nil
|
||||||
|
}
|
||||||
@ -9,7 +9,6 @@ import (
|
|||||||
"quantex.com/qfixpt/src/client/api/rest"
|
"quantex.com/qfixpt/src/client/api/rest"
|
||||||
"quantex.com/qfixpt/src/client/data"
|
"quantex.com/qfixpt/src/client/data"
|
||||||
"quantex.com/qfixpt/src/client/fix"
|
"quantex.com/qfixpt/src/client/fix"
|
||||||
fixrepo "quantex.com/qfixpt/src/client/fix/repository"
|
|
||||||
googlechat "quantex.com/qfixpt/src/client/notify/google"
|
googlechat "quantex.com/qfixpt/src/client/notify/google"
|
||||||
"quantex.com/qfixpt/src/client/store"
|
"quantex.com/qfixpt/src/client/store"
|
||||||
"quantex.com/qfixpt/src/client/store/external"
|
"quantex.com/qfixpt/src/client/store/external"
|
||||||
@ -38,28 +37,15 @@ func Runner(cfg app.Config) error {
|
|||||||
return fmt.Errorf("error trying to create store %w", err)
|
return fmt.Errorf("error trying to create store %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// In-memory repository shared between the FIX initiator and the REST API.
|
|
||||||
orderRepo := fixrepo.NewMemory()
|
|
||||||
|
|
||||||
// Start the FIX initiator if a settings file is configured.
|
|
||||||
var fixInitiator *fix.Initiator
|
|
||||||
if cfg.FIX.SettingsFile != "" {
|
|
||||||
fixInitiator, err = fix.NewInitiator(cfg.FIX.SettingsFile, orderRepo)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error creating FIX initiator: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = fixInitiator.Start(); err != nil {
|
|
||||||
return fmt.Errorf("error starting FIX initiator: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
slog.Info("FIX initiator started", "settingsFile", cfg.FIX.SettingsFile)
|
|
||||||
} else {
|
|
||||||
slog.Warn("FIX.SettingsFile not configured — FIX initiator will not start")
|
|
||||||
}
|
|
||||||
|
|
||||||
userData := data.New()
|
userData := data.New()
|
||||||
|
|
||||||
|
// Initialize FIX Post-Trade Manager.
|
||||||
|
fixManager := fix.NewManager(cfg.FIX, appStore, notify)
|
||||||
|
if err = fixManager.Start(); err != nil {
|
||||||
|
return fmt.Errorf("error starting FIX initiator: %w", err)
|
||||||
|
}
|
||||||
|
defer fixManager.Stop()
|
||||||
|
|
||||||
apiConfig := rest.Config{
|
apiConfig := rest.Config{
|
||||||
Port: cfg.APIBasePort,
|
Port: cfg.APIBasePort,
|
||||||
AllowedOrigins: cfg.AllowedOrigins,
|
AllowedOrigins: cfg.AllowedOrigins,
|
||||||
@ -68,14 +54,10 @@ func Runner(cfg app.Config) error {
|
|||||||
EnableJWTAuth: cfg.EnableJWTAuth,
|
EnableJWTAuth: cfg.EnableJWTAuth,
|
||||||
}
|
}
|
||||||
|
|
||||||
api := rest.New(userData, appStore, apiConfig, notify, orderRepo)
|
api := rest.New(userData, appStore, fixManager, apiConfig, notify)
|
||||||
api.Run()
|
api.Run()
|
||||||
|
|
||||||
cmd.WaitForInterruptSignal(nil)
|
cmd.WaitForInterruptSignal(nil)
|
||||||
|
|
||||||
if fixInitiator != nil {
|
|
||||||
fixInitiator.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,34 +0,0 @@
|
|||||||
package domain
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/shopspring/decimal"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ExecutionReport represents a FIX ExecutionReport (MsgType=8) received from the counterparty.
|
|
||||||
type ExecutionReport struct {
|
|
||||||
OrderID string
|
|
||||||
ClOrdID string
|
|
||||||
ExecID string
|
|
||||||
ExecType string
|
|
||||||
OrdStatus string
|
|
||||||
Symbol string
|
|
||||||
Side string
|
|
||||||
OrderQty decimal.Decimal
|
|
||||||
Price decimal.Decimal
|
|
||||||
LastPx decimal.Decimal
|
|
||||||
LastQty decimal.Decimal
|
|
||||||
CumQty decimal.Decimal
|
|
||||||
LeavesQty decimal.Decimal
|
|
||||||
AvgPx decimal.Decimal
|
|
||||||
TransactTime time.Time
|
|
||||||
Account string
|
|
||||||
}
|
|
||||||
|
|
||||||
// OrderRepository defines the interface for storing and retrieving execution reports.
|
|
||||||
type OrderRepository interface {
|
|
||||||
Save(report ExecutionReport)
|
|
||||||
GetAll() []ExecutionReport
|
|
||||||
GetByOrderID(orderID string) (ExecutionReport, bool)
|
|
||||||
}
|
|
||||||
63
src/domain/persistence.go
Normal file
63
src/domain/persistence.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
package domain
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
// PostTradeStatus represents the lifecycle state of a post-trade record.
|
||||||
|
type PostTradeStatus string
|
||||||
|
|
||||||
|
const (
|
||||||
|
PostTradeStatusActive PostTradeStatus = "active"
|
||||||
|
PostTradeStatusCorrected PostTradeStatus = "corrected"
|
||||||
|
PostTradeStatusCancelled PostTradeStatus = "cancelled"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FixMessageJSON is the structured representation of a FIX message for storage.
|
||||||
|
type FixMessageJSON struct {
|
||||||
|
Direction string `json:"direction"`
|
||||||
|
MsgType string `json:"msg_type"`
|
||||||
|
TradeID string `json:"trade_id"`
|
||||||
|
Header map[string]interface{} `json:"header"`
|
||||||
|
Body map[string]interface{} `json:"body"`
|
||||||
|
ReceiveTime time.Time `json:"receive_time"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TradeMessage is a row in qfixpt_messages.
|
||||||
|
type TradeMessage struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
TradeID string `json:"trade_id"`
|
||||||
|
JMessage FixMessageJSON `json:"j_message"`
|
||||||
|
CreatedAt time.Time `json:"created_at"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogEntry is the DTO for inserting/appending a raw log in qfixpt_logs.
|
||||||
|
type LogEntry struct {
|
||||||
|
TradeID string
|
||||||
|
RawMsg string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Logs is the response for GET /trades/:tradeID/logs.
|
||||||
|
type Logs struct {
|
||||||
|
Entries []string `json:"entries"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// PostTrade is the exported representation of a post-trade record for the API.
|
||||||
|
type PostTrade struct {
|
||||||
|
TradeID string `json:"trade_id"`
|
||||||
|
TradeReportID string `json:"trade_report_id"`
|
||||||
|
TradeReportType string `json:"trade_report_type"`
|
||||||
|
Side string `json:"side"`
|
||||||
|
LastQty string `json:"last_qty"`
|
||||||
|
LastPx string `json:"last_px"`
|
||||||
|
SettlDate string `json:"settl_date"`
|
||||||
|
TradeDate string `json:"trade_date"`
|
||||||
|
Status PostTradeStatus `json:"status"`
|
||||||
|
TWTradeID string `json:"tw_trade_id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// PersistenceStore defines the persistence interface for post-trade messages.
|
||||||
|
type PersistenceStore interface {
|
||||||
|
SaveMessage(msg TradeMessage) error
|
||||||
|
SaveLog(entry LogEntry) error
|
||||||
|
GetTodayMessages() ([]TradeMessage, error)
|
||||||
|
GetLogsByTradeID(tradeID string) (Logs, error)
|
||||||
|
}
|
||||||
61
src/domain/posttrade.go
Normal file
61
src/domain/posttrade.go
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
package domain
|
||||||
|
|
||||||
|
import "github.com/shopspring/decimal"
|
||||||
|
|
||||||
|
// Trade represents data extracted from a TradeCaptureReport (35=AE).
|
||||||
|
type Trade struct {
|
||||||
|
TradeReportID string // 571
|
||||||
|
TradeID string // 1003
|
||||||
|
ExecID string // 17
|
||||||
|
OrigTradeID string // 1903
|
||||||
|
OrigSecondaryID string // 1906
|
||||||
|
LastQty decimal.Decimal // 32
|
||||||
|
LastPx decimal.Decimal // 31
|
||||||
|
Spread decimal.Decimal // 218 (236 = benchmark curve name)
|
||||||
|
Side string // 54
|
||||||
|
AccruedInterest decimal.Decimal // 159
|
||||||
|
NetMoney decimal.Decimal // 118
|
||||||
|
GrossTradeAmt decimal.Decimal // 381
|
||||||
|
SettlDate string // 64
|
||||||
|
TradeDate string // 75
|
||||||
|
ListID string // 66
|
||||||
|
TradeReportType string // 856
|
||||||
|
TradeHandlInst string // 487
|
||||||
|
TradeReportRefID string // 572
|
||||||
|
ClearingFirm string // Party role=4
|
||||||
|
TradewebTraderID string // Party role=1007 (custom)
|
||||||
|
TestMessage bool // 23029 (custom)
|
||||||
|
TWTradeID string // 23068 (custom)
|
||||||
|
TWOrigTradeID string // 23096 (custom)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allocation represents data extracted from an AllocationReport (35=AS).
|
||||||
|
type Allocation struct {
|
||||||
|
AllocReportID string // 755
|
||||||
|
AllocTransType string // 71
|
||||||
|
TradeID string // 1003
|
||||||
|
TestMessage bool // 23029 (custom)
|
||||||
|
Entries []AllocEntry // NoAllocs group (78)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AllocEntry represents a single entry in the NoAllocs repeating group.
|
||||||
|
type AllocEntry struct {
|
||||||
|
AllocAccount string // 79
|
||||||
|
AllocQty decimal.Decimal // 80
|
||||||
|
IndividualAllocID string // 467
|
||||||
|
AllocPrice decimal.Decimal // 154 (via custom tag access if needed)
|
||||||
|
CashSettlAmount decimal.Decimal // 742 (via custom tag access if needed)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConfirmationMsg represents data extracted from a Confirmation (35=AK).
|
||||||
|
type ConfirmationMsg struct {
|
||||||
|
ConfirmID string // 664
|
||||||
|
ConfirmStatus string // 665
|
||||||
|
ConfirmTransType string // 666
|
||||||
|
IndividualAllocID string // 467
|
||||||
|
TWClrID string // 23025 (custom)
|
||||||
|
DlrClrID string // 23027 (custom)
|
||||||
|
ClearingStatus string // 5440 (custom)
|
||||||
|
TradeDate string // 75
|
||||||
|
TestMessage bool // 23029 (custom)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user