diff --git a/src/client/api/rest/controller.go b/src/client/api/rest/controller.go index ae605ea..ecb72a9 100644 --- a/src/client/api/rest/controller.go +++ b/src/client/api/rest/controller.go @@ -34,18 +34,20 @@ type Controller struct { store *store.Store config Config notify domain.Notifier + orderRepo domain.OrderRepository authMutex deadlock.Mutex } func newController(pool *redis.Pool, userData app.UserDataProvider, - s *store.Store, config Config, n domain.Notifier, + s *store.Store, config Config, n domain.Notifier, repo domain.OrderRepository, ) *Controller { return &Controller{ - pool: pool, - userData: userData, - store: s, - config: config, - notify: n, + pool: pool, + userData: userData, + store: s, + config: config, + notify: n, + orderRepo: repo, } } @@ -288,3 +290,82 @@ func allowed(origin string, config Config) bool { return false } + +// GetExecutions godoc +// @Summary Get all execution reports +// @Description Returns all FIX ExecutionReport messages received by the initiator +// @Tags executions +// @Produce json +// @Success 200 {array} ExecutionReportResponse +// @Failure 500 {object} HTTPError +// @Router /executions [get] +func (cont *Controller) GetExecutions(ctx *gin.Context) { + setHeaders(ctx, cont.config) + + reports := cont.orderRepo.GetAll() + + response := make([]ExecutionReportResponse, 0, len(reports)) + for _, r := range reports { + response = append(response, ExecutionReportResponse{ + OrderID: r.OrderID, + ClOrdID: r.ClOrdID, + ExecID: r.ExecID, + ExecType: r.ExecType, + OrdStatus: r.OrdStatus, + Symbol: r.Symbol, + Side: r.Side, + OrderQty: r.OrderQty, + Price: r.Price, + LastPx: r.LastPx, + LastQty: r.LastQty, + CumQty: r.CumQty, + LeavesQty: r.LeavesQty, + AvgPx: r.AvgPx, + TransactTime: r.TransactTime, + Account: r.Account, + }) + } + + ctx.JSON(http.StatusOK, response) +} + +// GetExecutionByOrderID godoc +// @Summary Get execution report by OrderID +// @Description Returns the FIX ExecutionReport for the given OrderID +// @Tags executions +// @Produce json +// @Param orderID path string true "OrderID" +// @Success 200 {object} ExecutionReportResponse +// @Failure 404 {object} HTTPError +// @Router /executions/{orderID} [get] +func (cont *Controller) GetExecutionByOrderID(ctx *gin.Context) { + setHeaders(ctx, cont.config) + + orderID := ctx.Param("orderID") + + r, ok := cont.orderRepo.GetByOrderID(orderID) + if !ok { + ctx.JSON(http.StatusNotFound, HTTPError{Error: "execution report not found"}) + + return + } + + ctx.JSON(http.StatusOK, ExecutionReportResponse{ + OrderID: r.OrderID, + ClOrdID: r.ClOrdID, + ExecID: r.ExecID, + ExecType: r.ExecType, + OrdStatus: r.OrdStatus, + Symbol: r.Symbol, + Side: r.Side, + OrderQty: r.OrderQty, + Price: r.Price, + LastPx: r.LastPx, + LastQty: r.LastQty, + CumQty: r.CumQty, + LeavesQty: r.LeavesQty, + AvgPx: r.AvgPx, + TransactTime: r.TransactTime, + Account: r.Account, + }) +} diff --git a/src/client/api/rest/model.go b/src/client/api/rest/model.go index 26f90dd..714c504 100644 --- a/src/client/api/rest/model.go +++ b/src/client/api/rest/model.go @@ -1,5 +1,11 @@ package rest +import ( + "time" + + "github.com/shopspring/decimal" +) + type HTTPError struct { Error string } @@ -16,3 +22,23 @@ type Credentials struct { type Session struct { Email string } + +// ExecutionReportResponse is the REST representation of a FIX ExecutionReport. +type ExecutionReportResponse struct { + OrderID string `json:"orderID"` + ClOrdID string `json:"clOrdID"` + ExecID string `json:"execID"` + ExecType string `json:"execType"` + OrdStatus string `json:"ordStatus"` + Symbol string `json:"symbol"` + Side string `json:"side"` + OrderQty decimal.Decimal `json:"orderQty"` + Price decimal.Decimal `json:"price"` + LastPx decimal.Decimal `json:"lastPx"` + LastQty decimal.Decimal `json:"lastQty"` + CumQty decimal.Decimal `json:"cumQty"` + LeavesQty decimal.Decimal `json:"leavesQty"` + AvgPx decimal.Decimal `json:"avgPx"` + TransactTime time.Time `json:"transactTime"` + Account string `json:"account,omitempty"` +} diff --git a/src/client/api/rest/routes.go b/src/client/api/rest/routes.go index 444e17f..bb766d7 100644 --- a/src/client/api/rest/routes.go +++ b/src/client/api/rest/routes.go @@ -21,6 +21,8 @@ func SetRoutes(api *API) { qfixpt := v1.Group("/") qfixpt.Use(cont.AuthRequired) qfixpt.GET("/health", cont.HealthCheck) + qfixpt.GET("/executions", cont.GetExecutions) + qfixpt.GET("/executions/:orderID", cont.GetExecutionByOrderID) backoffice := qfixpt.Group("/backoffice") backoffice.Use(cont.BackOfficeUser) diff --git a/src/client/api/rest/server.go b/src/client/api/rest/server.go index 41fa1de..d1f6cd1 100644 --- a/src/client/api/rest/server.go +++ b/src/client/api/rest/server.go @@ -32,7 +32,7 @@ type Config struct { EnableJWTAuth bool } -func New(userData app.UserDataProvider, storeInstance *store.Store, config Config, notify domain.Notifier) *API { +func New(userData app.UserDataProvider, storeInstance *store.Store, config Config, notify domain.Notifier, repo domain.OrderRepository) *API { // Set up Gin var engine *gin.Engine if version.Environment() == version.EnvironmentTypeProd { @@ -58,7 +58,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, config Confi } api := &API{ - Controller: newController(NewPool(), userData, storeInstance, config, notify), + Controller: newController(NewPool(), userData, storeInstance, config, notify, repo), Router: engine, Port: config.Port, } diff --git a/src/client/fix/application.go b/src/client/fix/application.go new file mode 100644 index 0000000..ce28b80 --- /dev/null +++ b/src/client/fix/application.go @@ -0,0 +1,137 @@ +// Package fix implements the FIX protocol initiator and application handler. +package fix + +import ( + "log/slog" + + "quantex.com/qfixpt/quickfix" + "quantex.com/qfixpt/quickfix/gen/fix44/executionreport" + "quantex.com/qfixpt/src/domain" +) + +// Application implements quickfix.Application to handle incoming FIX messages. +type Application struct { + repo domain.OrderRepository +} + +// NewApplication creates a new FIX Application backed by the given repository. +func NewApplication(repo domain.OrderRepository) *Application { + return &Application{repo: repo} +} + +// OnCreate is called when a FIX session is created. +func (a *Application) OnCreate(sessionID quickfix.SessionID) { + slog.Info("FIX session created", "sessionID", sessionID) +} + +// OnLogon is called when a FIX session logs on successfully. +func (a *Application) OnLogon(sessionID quickfix.SessionID) { + slog.Info("FIX session logged on", "sessionID", sessionID) +} + +// OnLogout is called when a FIX session logs out or disconnects. +func (a *Application) OnLogout(sessionID quickfix.SessionID) { + slog.Info("FIX session logged out", "sessionID", sessionID) +} + +// ToAdmin is called before sending an admin message. +func (a *Application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {} + +// ToApp is called before sending an application message. +func (a *Application) ToApp(_ *quickfix.Message, _ quickfix.SessionID) error { + return nil +} + +// FromAdmin is called when an admin message is received. +func (a *Application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError { + return nil +} + +// FromApp is called when an application message is received. +// It routes ExecutionReport (MsgType=8) messages to the handler. +func (a *Application) FromApp(message *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError { + if message.IsMsgTypeOf("8") { + a.handleExecutionReport(message, sessionID) + } + + return nil +} + +func (a *Application) handleExecutionReport(msg *quickfix.Message, _ quickfix.SessionID) { + er := executionreport.FromMessage(msg) + + report := domain.ExecutionReport{} + + if v, err := er.GetOrderID(); err == nil { + report.OrderID = v + } + + if v, err := er.GetClOrdID(); err == nil { + report.ClOrdID = v + } + + if v, err := er.GetExecID(); err == nil { + report.ExecID = v + } + + if v, err := er.GetExecType(); err == nil { + report.ExecType = string(v) + } + + if v, err := er.GetOrdStatus(); err == nil { + report.OrdStatus = string(v) + } + + if v, err := er.GetSymbol(); err == nil { + report.Symbol = v + } + + if v, err := er.GetSide(); err == nil { + report.Side = string(v) + } + + if v, err := er.GetOrderQty(); err == nil { + report.OrderQty = v + } + + if v, err := er.GetPrice(); err == nil { + report.Price = v + } + + if v, err := er.GetLastPx(); err == nil { + report.LastPx = v + } + + if v, err := er.GetLastQty(); err == nil { + report.LastQty = v + } + + if v, err := er.GetCumQty(); err == nil { + report.CumQty = v + } + + if v, err := er.GetLeavesQty(); err == nil { + report.LeavesQty = v + } + + if v, err := er.GetAvgPx(); err == nil { + report.AvgPx = v + } + + if v, err := er.GetTransactTime(); err == nil { + report.TransactTime = v + } + + if v, err := er.GetAccount(); err == nil { + report.Account = v + } + + a.repo.Save(report) + + slog.Info("ExecutionReport stored", + "orderID", report.OrderID, + "execType", report.ExecType, + "ordStatus", report.OrdStatus, + "symbol", report.Symbol, + ) +} diff --git a/src/client/fix/fix.go b/src/client/fix/fix.go new file mode 100644 index 0000000..7041bbb --- /dev/null +++ b/src/client/fix/fix.go @@ -0,0 +1,49 @@ +package fix + +import ( + "fmt" + "os" + + "quantex.com/qfixpt/quickfix" + "quantex.com/qfixpt/src/domain" +) + +// Initiator wraps the quickfix Initiator lifecycle. +type Initiator struct { + inner *quickfix.Initiator +} + +// NewInitiator creates and starts a FIX initiator using the given settings file path. +func NewInitiator(settingsFile string, repo domain.OrderRepository) (*Initiator, error) { + f, err := os.Open(settingsFile) + if err != nil { + return nil, fmt.Errorf("opening FIX settings file %q: %w", settingsFile, err) + } + defer f.Close() + + settings, err := quickfix.ParseSettings(f) + if err != nil { + return nil, fmt.Errorf("parsing FIX settings: %w", err) + } + + app := NewApplication(repo) + storeFactory := quickfix.NewMemoryStoreFactory() + logFactory := quickfix.NewNullLogFactory() + + initiator, err := quickfix.NewInitiator(app, storeFactory, settings, logFactory) + if err != nil { + return nil, fmt.Errorf("creating FIX initiator: %w", err) + } + + return &Initiator{inner: initiator}, nil +} + +// Start begins connecting to the FIX counterparty. +func (i *Initiator) Start() error { + return i.inner.Start() +} + +// Stop gracefully disconnects all FIX sessions. +func (i *Initiator) Stop() { + i.inner.Stop() +} diff --git a/src/client/fix/repository/memory.go b/src/client/fix/repository/memory.go new file mode 100644 index 0000000..568ee28 --- /dev/null +++ b/src/client/fix/repository/memory.go @@ -0,0 +1,58 @@ +// Package repository provides in-memory storage implementations. +package repository + +import ( + "sync" + + "quantex.com/qfixpt/src/domain" +) + +// Memory is a thread-safe in-memory implementation of domain.OrderRepository. +type Memory struct { + mu sync.RWMutex + orders map[string]domain.ExecutionReport + ordered []string // maintains insertion order +} + +// NewMemory creates a new in-memory order repository. +func NewMemory() *Memory { + return &Memory{ + orders: make(map[string]domain.ExecutionReport), + ordered: []string{}, + } +} + +// Save stores an execution report, overwriting any existing entry with the same OrderID. +func (m *Memory) Save(report domain.ExecutionReport) { + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.orders[report.OrderID]; !exists { + m.ordered = append(m.ordered, report.OrderID) + } + + m.orders[report.OrderID] = report +} + +// GetAll returns all stored execution reports in insertion order. +func (m *Memory) GetAll() []domain.ExecutionReport { + m.mu.RLock() + defer m.mu.RUnlock() + + result := make([]domain.ExecutionReport, 0, len(m.ordered)) + for _, id := range m.ordered { + result = append(result, m.orders[id]) + } + + return result +} + +// GetByOrderID returns the execution report for the given OrderID. +func (m *Memory) GetByOrderID(orderID string) (domain.ExecutionReport, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + + r, ok := m.orders[orderID] + + return r, ok +} diff --git a/src/cmd/service/service.go b/src/cmd/service/service.go index 6aef2cd..e96e155 100644 --- a/src/cmd/service/service.go +++ b/src/cmd/service/service.go @@ -8,6 +8,8 @@ import ( "quantex.com/qfixpt/src/app" "quantex.com/qfixpt/src/client/api/rest" "quantex.com/qfixpt/src/client/data" + "quantex.com/qfixpt/src/client/fix" + fixrepo "quantex.com/qfixpt/src/client/fix/repository" googlechat "quantex.com/qfixpt/src/client/notify/google" "quantex.com/qfixpt/src/client/store" "quantex.com/qfixpt/src/client/store/external" @@ -36,6 +38,26 @@ func Runner(cfg app.Config) error { return fmt.Errorf("error trying to create store %w", err) } + // In-memory repository shared between the FIX initiator and the REST API. + orderRepo := fixrepo.NewMemory() + + // Start the FIX initiator if a settings file is configured. + var fixInitiator *fix.Initiator + if cfg.FIX.SettingsFile != "" { + fixInitiator, err = fix.NewInitiator(cfg.FIX.SettingsFile, orderRepo) + if err != nil { + return fmt.Errorf("error creating FIX initiator: %w", err) + } + + if err = fixInitiator.Start(); err != nil { + return fmt.Errorf("error starting FIX initiator: %w", err) + } + + slog.Info("FIX initiator started", "settingsFile", cfg.FIX.SettingsFile) + } else { + slog.Warn("FIX.SettingsFile not configured — FIX initiator will not start") + } + userData := data.New() apiConfig := rest.Config{ @@ -46,10 +68,14 @@ func Runner(cfg app.Config) error { EnableJWTAuth: cfg.EnableJWTAuth, } - api := rest.New(userData, appStore, apiConfig, notify) + api := rest.New(userData, appStore, apiConfig, notify, orderRepo) api.Run() cmd.WaitForInterruptSignal(nil) + if fixInitiator != nil { + fixInitiator.Stop() + } + return nil } diff --git a/src/domain/execution_report.go b/src/domain/execution_report.go new file mode 100644 index 0000000..cc02535 --- /dev/null +++ b/src/domain/execution_report.go @@ -0,0 +1,34 @@ +package domain + +import ( + "time" + + "github.com/shopspring/decimal" +) + +// ExecutionReport represents a FIX ExecutionReport (MsgType=8) received from the counterparty. +type ExecutionReport struct { + OrderID string + ClOrdID string + ExecID string + ExecType string + OrdStatus string + Symbol string + Side string + OrderQty decimal.Decimal + Price decimal.Decimal + LastPx decimal.Decimal + LastQty decimal.Decimal + CumQty decimal.Decimal + LeavesQty decimal.Decimal + AvgPx decimal.Decimal + TransactTime time.Time + Account string +} + +// OrderRepository defines the interface for storing and retrieving execution reports. +type OrderRepository interface { + Save(report ExecutionReport) + GetAll() []ExecutionReport + GetByOrderID(orderID string) (ExecutionReport, bool) +}