From 0424e2ed797d077b9fc89aaa902b7d8bf74103f8 Mon Sep 17 00:00:00 2001 From: Ramiro Paz Date: Thu, 12 Mar 2026 12:25:10 -0300 Subject: [PATCH 1/3] makefile --- Makefile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index b149483..b22b004 100644 --- a/Makefile +++ b/Makefile @@ -56,13 +56,13 @@ build: check-env swag vendor only-build # Build a native version. Set e=environm only-build: check-env @echo "Building for $(e) environment..." - env OUT_PATH=$(DEFAULT_OUT_PATH) tools/build.sh $(e) + env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e) linux-build: check-env swag # Build a linux version for prod environment. Set e=environment: prod, dev, demo, open-demo env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e) -deploy: check-env # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo - tools/deploy.sh $(e) +deploy: check-env # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo; s=serverName; i=instance; e.g. make deploy e=dev s=nonprodFix i=qfixpt + make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/$(i)/ fmt: download-versions # Apply the Go formatter to the code cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt); -- 2.45.2 From ac285e662b155de4d885e0fd3dc9da7fae7c85cd Mon Sep 17 00:00:00 2001 From: Ramiro Paz Date: Thu, 12 Mar 2026 12:31:05 -0300 Subject: [PATCH 2/3] fixes --- main.go | 5 +++++ src/app/model.go | 4 ++++ src/app/version/version.go | 19 +++++++++---------- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/main.go b/main.go index 7817b1c..d8c199b 100644 --- a/main.go +++ b/main.go @@ -178,6 +178,11 @@ func parseLogLevel(level string) (slog.Level, error) { func startRunner(runner, globalCfg, serviceCfg string) { var fn func(cfg app.Config) error + + if runner == "" { + runner = "service" + } + switch runner { case "service": fn = service.Runner diff --git a/src/app/model.go b/src/app/model.go index 1c437db..8079dbc 100644 --- a/src/app/model.go +++ b/src/app/model.go @@ -36,6 +36,10 @@ type Service struct { AuthorizedServices map[string]AuthorizedService `toml:"AuthorizedServices"` APIBasePort string EnableJWTAuth bool // Enable JWT authentication for service-to-service communication + FIX FIXConfig +} +type FIXConfig struct { + SettingsFile string // path to fix.cfg file } type ExtAuth struct { diff --git a/src/app/version/version.go b/src/app/version/version.go index 720a634..0ad9b56 100644 --- a/src/app/version/version.go +++ b/src/app/version/version.go @@ -3,7 +3,6 @@ package version import ( "fmt" - "os" "runtime" "strings" ) @@ -38,17 +37,17 @@ type EnvironmentType int //nolint:recvcheck // The methods of this are autogener var environment EnvironmentType //nolint:gochecknoglobals // Just keept this global to avoid having to create an instance func init() { - aux := os.Getenv(quantexEnvironment) - if aux == "" { - panic("QUANTEX_ENVIRONMENT is not set") - } + // aux := os.Getenv(quantexEnvironment) + // if aux == "" { + // panic("QUANTEX_ENVIRONMENT is not set") + // } - env, err := ParseEnvironmentType(aux) - if err != nil { - panic("Invalid QUANTEX_ENVIRONMENT value: " + aux + " " + err.Error()) - } + // env, err := ParseEnvironmentType(aux) + // if err != nil { + // panic("Invalid QUANTEX_ENVIRONMENT value: " + aux + " " + err.Error()) + // } - environment = env + environment = EnvironmentTypeDev } // Base returns the version base name -- 2.45.2 From f940f5a0f9de76f150460b534ff8e81278b1f3e6 Mon Sep 17 00:00:00 2001 From: Ramiro Paz Date: Thu, 12 Mar 2026 12:52:18 -0300 Subject: [PATCH 3/3] adding FIX stuff --- src/client/api/rest/controller.go | 93 +++++++++++++++++-- src/client/api/rest/model.go | 26 ++++++ src/client/api/rest/routes.go | 2 + src/client/api/rest/server.go | 4 +- src/client/fix/application.go | 137 ++++++++++++++++++++++++++++ src/client/fix/fix.go | 49 ++++++++++ src/client/fix/repository/memory.go | 58 ++++++++++++ src/cmd/service/service.go | 28 +++++- src/domain/execution_report.go | 34 +++++++ 9 files changed, 422 insertions(+), 9 deletions(-) create mode 100644 src/client/fix/application.go create mode 100644 src/client/fix/fix.go create mode 100644 src/client/fix/repository/memory.go create mode 100644 src/domain/execution_report.go diff --git a/src/client/api/rest/controller.go b/src/client/api/rest/controller.go index ae605ea..ecb72a9 100644 --- a/src/client/api/rest/controller.go +++ b/src/client/api/rest/controller.go @@ -34,18 +34,20 @@ type Controller struct { store *store.Store config Config notify domain.Notifier + orderRepo domain.OrderRepository authMutex deadlock.Mutex } func newController(pool *redis.Pool, userData app.UserDataProvider, - s *store.Store, config Config, n domain.Notifier, + s *store.Store, config Config, n domain.Notifier, repo domain.OrderRepository, ) *Controller { return &Controller{ - pool: pool, - userData: userData, - store: s, - config: config, - notify: n, + pool: pool, + userData: userData, + store: s, + config: config, + notify: n, + orderRepo: repo, } } @@ -288,3 +290,82 @@ func allowed(origin string, config Config) bool { return false } + +// GetExecutions godoc +// @Summary Get all execution reports +// @Description Returns all FIX ExecutionReport messages received by the initiator +// @Tags executions +// @Produce json +// @Success 200 {array} ExecutionReportResponse +// @Failure 500 {object} HTTPError +// @Router /executions [get] +func (cont *Controller) GetExecutions(ctx *gin.Context) { + setHeaders(ctx, cont.config) + + reports := cont.orderRepo.GetAll() + + response := make([]ExecutionReportResponse, 0, len(reports)) + for _, r := range reports { + response = append(response, ExecutionReportResponse{ + OrderID: r.OrderID, + ClOrdID: r.ClOrdID, + ExecID: r.ExecID, + ExecType: r.ExecType, + OrdStatus: r.OrdStatus, + Symbol: r.Symbol, + Side: r.Side, + OrderQty: r.OrderQty, + Price: r.Price, + LastPx: r.LastPx, + LastQty: r.LastQty, + CumQty: r.CumQty, + LeavesQty: r.LeavesQty, + AvgPx: r.AvgPx, + TransactTime: r.TransactTime, + Account: r.Account, + }) + } + + ctx.JSON(http.StatusOK, response) +} + +// GetExecutionByOrderID godoc +// @Summary Get execution report by OrderID +// @Description Returns the FIX ExecutionReport for the given OrderID +// @Tags executions +// @Produce json +// @Param orderID path string true "OrderID" +// @Success 200 {object} ExecutionReportResponse +// @Failure 404 {object} HTTPError +// @Router /executions/{orderID} [get] +func (cont *Controller) GetExecutionByOrderID(ctx *gin.Context) { + setHeaders(ctx, cont.config) + + orderID := ctx.Param("orderID") + + r, ok := cont.orderRepo.GetByOrderID(orderID) + if !ok { + ctx.JSON(http.StatusNotFound, HTTPError{Error: "execution report not found"}) + + return + } + + ctx.JSON(http.StatusOK, ExecutionReportResponse{ + OrderID: r.OrderID, + ClOrdID: r.ClOrdID, + ExecID: r.ExecID, + ExecType: r.ExecType, + OrdStatus: r.OrdStatus, + Symbol: r.Symbol, + Side: r.Side, + OrderQty: r.OrderQty, + Price: r.Price, + LastPx: r.LastPx, + LastQty: r.LastQty, + CumQty: r.CumQty, + LeavesQty: r.LeavesQty, + AvgPx: r.AvgPx, + TransactTime: r.TransactTime, + Account: r.Account, + }) +} diff --git a/src/client/api/rest/model.go b/src/client/api/rest/model.go index 26f90dd..714c504 100644 --- a/src/client/api/rest/model.go +++ b/src/client/api/rest/model.go @@ -1,5 +1,11 @@ package rest +import ( + "time" + + "github.com/shopspring/decimal" +) + type HTTPError struct { Error string } @@ -16,3 +22,23 @@ type Credentials struct { type Session struct { Email string } + +// ExecutionReportResponse is the REST representation of a FIX ExecutionReport. +type ExecutionReportResponse struct { + OrderID string `json:"orderID"` + ClOrdID string `json:"clOrdID"` + ExecID string `json:"execID"` + ExecType string `json:"execType"` + OrdStatus string `json:"ordStatus"` + Symbol string `json:"symbol"` + Side string `json:"side"` + OrderQty decimal.Decimal `json:"orderQty"` + Price decimal.Decimal `json:"price"` + LastPx decimal.Decimal `json:"lastPx"` + LastQty decimal.Decimal `json:"lastQty"` + CumQty decimal.Decimal `json:"cumQty"` + LeavesQty decimal.Decimal `json:"leavesQty"` + AvgPx decimal.Decimal `json:"avgPx"` + TransactTime time.Time `json:"transactTime"` + Account string `json:"account,omitempty"` +} diff --git a/src/client/api/rest/routes.go b/src/client/api/rest/routes.go index 444e17f..bb766d7 100644 --- a/src/client/api/rest/routes.go +++ b/src/client/api/rest/routes.go @@ -21,6 +21,8 @@ func SetRoutes(api *API) { qfixpt := v1.Group("/") qfixpt.Use(cont.AuthRequired) qfixpt.GET("/health", cont.HealthCheck) + qfixpt.GET("/executions", cont.GetExecutions) + qfixpt.GET("/executions/:orderID", cont.GetExecutionByOrderID) backoffice := qfixpt.Group("/backoffice") backoffice.Use(cont.BackOfficeUser) diff --git a/src/client/api/rest/server.go b/src/client/api/rest/server.go index 41fa1de..d1f6cd1 100644 --- a/src/client/api/rest/server.go +++ b/src/client/api/rest/server.go @@ -32,7 +32,7 @@ type Config struct { EnableJWTAuth bool } -func New(userData app.UserDataProvider, storeInstance *store.Store, config Config, notify domain.Notifier) *API { +func New(userData app.UserDataProvider, storeInstance *store.Store, config Config, notify domain.Notifier, repo domain.OrderRepository) *API { // Set up Gin var engine *gin.Engine if version.Environment() == version.EnvironmentTypeProd { @@ -58,7 +58,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, config Confi } api := &API{ - Controller: newController(NewPool(), userData, storeInstance, config, notify), + Controller: newController(NewPool(), userData, storeInstance, config, notify, repo), Router: engine, Port: config.Port, } diff --git a/src/client/fix/application.go b/src/client/fix/application.go new file mode 100644 index 0000000..ce28b80 --- /dev/null +++ b/src/client/fix/application.go @@ -0,0 +1,137 @@ +// Package fix implements the FIX protocol initiator and application handler. +package fix + +import ( + "log/slog" + + "quantex.com/qfixpt/quickfix" + "quantex.com/qfixpt/quickfix/gen/fix44/executionreport" + "quantex.com/qfixpt/src/domain" +) + +// Application implements quickfix.Application to handle incoming FIX messages. +type Application struct { + repo domain.OrderRepository +} + +// NewApplication creates a new FIX Application backed by the given repository. +func NewApplication(repo domain.OrderRepository) *Application { + return &Application{repo: repo} +} + +// OnCreate is called when a FIX session is created. +func (a *Application) OnCreate(sessionID quickfix.SessionID) { + slog.Info("FIX session created", "sessionID", sessionID) +} + +// OnLogon is called when a FIX session logs on successfully. +func (a *Application) OnLogon(sessionID quickfix.SessionID) { + slog.Info("FIX session logged on", "sessionID", sessionID) +} + +// OnLogout is called when a FIX session logs out or disconnects. +func (a *Application) OnLogout(sessionID quickfix.SessionID) { + slog.Info("FIX session logged out", "sessionID", sessionID) +} + +// ToAdmin is called before sending an admin message. +func (a *Application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {} + +// ToApp is called before sending an application message. +func (a *Application) ToApp(_ *quickfix.Message, _ quickfix.SessionID) error { + return nil +} + +// FromAdmin is called when an admin message is received. +func (a *Application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError { + return nil +} + +// FromApp is called when an application message is received. +// It routes ExecutionReport (MsgType=8) messages to the handler. +func (a *Application) FromApp(message *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError { + if message.IsMsgTypeOf("8") { + a.handleExecutionReport(message, sessionID) + } + + return nil +} + +func (a *Application) handleExecutionReport(msg *quickfix.Message, _ quickfix.SessionID) { + er := executionreport.FromMessage(msg) + + report := domain.ExecutionReport{} + + if v, err := er.GetOrderID(); err == nil { + report.OrderID = v + } + + if v, err := er.GetClOrdID(); err == nil { + report.ClOrdID = v + } + + if v, err := er.GetExecID(); err == nil { + report.ExecID = v + } + + if v, err := er.GetExecType(); err == nil { + report.ExecType = string(v) + } + + if v, err := er.GetOrdStatus(); err == nil { + report.OrdStatus = string(v) + } + + if v, err := er.GetSymbol(); err == nil { + report.Symbol = v + } + + if v, err := er.GetSide(); err == nil { + report.Side = string(v) + } + + if v, err := er.GetOrderQty(); err == nil { + report.OrderQty = v + } + + if v, err := er.GetPrice(); err == nil { + report.Price = v + } + + if v, err := er.GetLastPx(); err == nil { + report.LastPx = v + } + + if v, err := er.GetLastQty(); err == nil { + report.LastQty = v + } + + if v, err := er.GetCumQty(); err == nil { + report.CumQty = v + } + + if v, err := er.GetLeavesQty(); err == nil { + report.LeavesQty = v + } + + if v, err := er.GetAvgPx(); err == nil { + report.AvgPx = v + } + + if v, err := er.GetTransactTime(); err == nil { + report.TransactTime = v + } + + if v, err := er.GetAccount(); err == nil { + report.Account = v + } + + a.repo.Save(report) + + slog.Info("ExecutionReport stored", + "orderID", report.OrderID, + "execType", report.ExecType, + "ordStatus", report.OrdStatus, + "symbol", report.Symbol, + ) +} diff --git a/src/client/fix/fix.go b/src/client/fix/fix.go new file mode 100644 index 0000000..7041bbb --- /dev/null +++ b/src/client/fix/fix.go @@ -0,0 +1,49 @@ +package fix + +import ( + "fmt" + "os" + + "quantex.com/qfixpt/quickfix" + "quantex.com/qfixpt/src/domain" +) + +// Initiator wraps the quickfix Initiator lifecycle. +type Initiator struct { + inner *quickfix.Initiator +} + +// NewInitiator creates and starts a FIX initiator using the given settings file path. +func NewInitiator(settingsFile string, repo domain.OrderRepository) (*Initiator, error) { + f, err := os.Open(settingsFile) + if err != nil { + return nil, fmt.Errorf("opening FIX settings file %q: %w", settingsFile, err) + } + defer f.Close() + + settings, err := quickfix.ParseSettings(f) + if err != nil { + return nil, fmt.Errorf("parsing FIX settings: %w", err) + } + + app := NewApplication(repo) + storeFactory := quickfix.NewMemoryStoreFactory() + logFactory := quickfix.NewNullLogFactory() + + initiator, err := quickfix.NewInitiator(app, storeFactory, settings, logFactory) + if err != nil { + return nil, fmt.Errorf("creating FIX initiator: %w", err) + } + + return &Initiator{inner: initiator}, nil +} + +// Start begins connecting to the FIX counterparty. +func (i *Initiator) Start() error { + return i.inner.Start() +} + +// Stop gracefully disconnects all FIX sessions. +func (i *Initiator) Stop() { + i.inner.Stop() +} diff --git a/src/client/fix/repository/memory.go b/src/client/fix/repository/memory.go new file mode 100644 index 0000000..568ee28 --- /dev/null +++ b/src/client/fix/repository/memory.go @@ -0,0 +1,58 @@ +// Package repository provides in-memory storage implementations. +package repository + +import ( + "sync" + + "quantex.com/qfixpt/src/domain" +) + +// Memory is a thread-safe in-memory implementation of domain.OrderRepository. +type Memory struct { + mu sync.RWMutex + orders map[string]domain.ExecutionReport + ordered []string // maintains insertion order +} + +// NewMemory creates a new in-memory order repository. +func NewMemory() *Memory { + return &Memory{ + orders: make(map[string]domain.ExecutionReport), + ordered: []string{}, + } +} + +// Save stores an execution report, overwriting any existing entry with the same OrderID. +func (m *Memory) Save(report domain.ExecutionReport) { + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.orders[report.OrderID]; !exists { + m.ordered = append(m.ordered, report.OrderID) + } + + m.orders[report.OrderID] = report +} + +// GetAll returns all stored execution reports in insertion order. +func (m *Memory) GetAll() []domain.ExecutionReport { + m.mu.RLock() + defer m.mu.RUnlock() + + result := make([]domain.ExecutionReport, 0, len(m.ordered)) + for _, id := range m.ordered { + result = append(result, m.orders[id]) + } + + return result +} + +// GetByOrderID returns the execution report for the given OrderID. +func (m *Memory) GetByOrderID(orderID string) (domain.ExecutionReport, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + + r, ok := m.orders[orderID] + + return r, ok +} diff --git a/src/cmd/service/service.go b/src/cmd/service/service.go index 6aef2cd..e96e155 100644 --- a/src/cmd/service/service.go +++ b/src/cmd/service/service.go @@ -8,6 +8,8 @@ import ( "quantex.com/qfixpt/src/app" "quantex.com/qfixpt/src/client/api/rest" "quantex.com/qfixpt/src/client/data" + "quantex.com/qfixpt/src/client/fix" + fixrepo "quantex.com/qfixpt/src/client/fix/repository" googlechat "quantex.com/qfixpt/src/client/notify/google" "quantex.com/qfixpt/src/client/store" "quantex.com/qfixpt/src/client/store/external" @@ -36,6 +38,26 @@ func Runner(cfg app.Config) error { return fmt.Errorf("error trying to create store %w", err) } + // In-memory repository shared between the FIX initiator and the REST API. + orderRepo := fixrepo.NewMemory() + + // Start the FIX initiator if a settings file is configured. + var fixInitiator *fix.Initiator + if cfg.FIX.SettingsFile != "" { + fixInitiator, err = fix.NewInitiator(cfg.FIX.SettingsFile, orderRepo) + if err != nil { + return fmt.Errorf("error creating FIX initiator: %w", err) + } + + if err = fixInitiator.Start(); err != nil { + return fmt.Errorf("error starting FIX initiator: %w", err) + } + + slog.Info("FIX initiator started", "settingsFile", cfg.FIX.SettingsFile) + } else { + slog.Warn("FIX.SettingsFile not configured — FIX initiator will not start") + } + userData := data.New() apiConfig := rest.Config{ @@ -46,10 +68,14 @@ func Runner(cfg app.Config) error { EnableJWTAuth: cfg.EnableJWTAuth, } - api := rest.New(userData, appStore, apiConfig, notify) + api := rest.New(userData, appStore, apiConfig, notify, orderRepo) api.Run() cmd.WaitForInterruptSignal(nil) + if fixInitiator != nil { + fixInitiator.Stop() + } + return nil } diff --git a/src/domain/execution_report.go b/src/domain/execution_report.go new file mode 100644 index 0000000..cc02535 --- /dev/null +++ b/src/domain/execution_report.go @@ -0,0 +1,34 @@ +package domain + +import ( + "time" + + "github.com/shopspring/decimal" +) + +// ExecutionReport represents a FIX ExecutionReport (MsgType=8) received from the counterparty. +type ExecutionReport struct { + OrderID string + ClOrdID string + ExecID string + ExecType string + OrdStatus string + Symbol string + Side string + OrderQty decimal.Decimal + Price decimal.Decimal + LastPx decimal.Decimal + LastQty decimal.Decimal + CumQty decimal.Decimal + LeavesQty decimal.Decimal + AvgPx decimal.Decimal + TransactTime time.Time + Account string +} + +// OrderRepository defines the interface for storing and retrieving execution reports. +type OrderRepository interface { + Save(report ExecutionReport) + GetAll() []ExecutionReport + GetByOrderID(orderID string) (ExecutionReport, bool) +} -- 2.45.2