adding FIX stuff

This commit is contained in:
Ramiro Paz
2026-03-12 12:52:18 -03:00
parent ac285e662b
commit f940f5a0f9
9 changed files with 422 additions and 9 deletions

View File

@ -34,18 +34,20 @@ type Controller struct {
store *store.Store
config Config
notify domain.Notifier
orderRepo domain.OrderRepository
authMutex deadlock.Mutex
}
func newController(pool *redis.Pool, userData app.UserDataProvider,
s *store.Store, config Config, n domain.Notifier,
s *store.Store, config Config, n domain.Notifier, repo domain.OrderRepository,
) *Controller {
return &Controller{
pool: pool,
userData: userData,
store: s,
config: config,
notify: n,
pool: pool,
userData: userData,
store: s,
config: config,
notify: n,
orderRepo: repo,
}
}
@ -288,3 +290,82 @@ func allowed(origin string, config Config) bool {
return false
}
// GetExecutions godoc
// @Summary Get all execution reports
// @Description Returns all FIX ExecutionReport messages received by the initiator
// @Tags executions
// @Produce json
// @Success 200 {array} ExecutionReportResponse
// @Failure 500 {object} HTTPError
// @Router /executions [get]
func (cont *Controller) GetExecutions(ctx *gin.Context) {
setHeaders(ctx, cont.config)
reports := cont.orderRepo.GetAll()
response := make([]ExecutionReportResponse, 0, len(reports))
for _, r := range reports {
response = append(response, ExecutionReportResponse{
OrderID: r.OrderID,
ClOrdID: r.ClOrdID,
ExecID: r.ExecID,
ExecType: r.ExecType,
OrdStatus: r.OrdStatus,
Symbol: r.Symbol,
Side: r.Side,
OrderQty: r.OrderQty,
Price: r.Price,
LastPx: r.LastPx,
LastQty: r.LastQty,
CumQty: r.CumQty,
LeavesQty: r.LeavesQty,
AvgPx: r.AvgPx,
TransactTime: r.TransactTime,
Account: r.Account,
})
}
ctx.JSON(http.StatusOK, response)
}
// GetExecutionByOrderID godoc
// @Summary Get execution report by OrderID
// @Description Returns the FIX ExecutionReport for the given OrderID
// @Tags executions
// @Produce json
// @Param orderID path string true "OrderID"
// @Success 200 {object} ExecutionReportResponse
// @Failure 404 {object} HTTPError
// @Router /executions/{orderID} [get]
func (cont *Controller) GetExecutionByOrderID(ctx *gin.Context) {
setHeaders(ctx, cont.config)
orderID := ctx.Param("orderID")
r, ok := cont.orderRepo.GetByOrderID(orderID)
if !ok {
ctx.JSON(http.StatusNotFound, HTTPError{Error: "execution report not found"})
return
}
ctx.JSON(http.StatusOK, ExecutionReportResponse{
OrderID: r.OrderID,
ClOrdID: r.ClOrdID,
ExecID: r.ExecID,
ExecType: r.ExecType,
OrdStatus: r.OrdStatus,
Symbol: r.Symbol,
Side: r.Side,
OrderQty: r.OrderQty,
Price: r.Price,
LastPx: r.LastPx,
LastQty: r.LastQty,
CumQty: r.CumQty,
LeavesQty: r.LeavesQty,
AvgPx: r.AvgPx,
TransactTime: r.TransactTime,
Account: r.Account,
})
}

View File

@ -1,5 +1,11 @@
package rest
import (
"time"
"github.com/shopspring/decimal"
)
type HTTPError struct {
Error string
}
@ -16,3 +22,23 @@ type Credentials struct {
type Session struct {
Email string
}
// ExecutionReportResponse is the REST representation of a FIX ExecutionReport.
type ExecutionReportResponse struct {
OrderID string `json:"orderID"`
ClOrdID string `json:"clOrdID"`
ExecID string `json:"execID"`
ExecType string `json:"execType"`
OrdStatus string `json:"ordStatus"`
Symbol string `json:"symbol"`
Side string `json:"side"`
OrderQty decimal.Decimal `json:"orderQty"`
Price decimal.Decimal `json:"price"`
LastPx decimal.Decimal `json:"lastPx"`
LastQty decimal.Decimal `json:"lastQty"`
CumQty decimal.Decimal `json:"cumQty"`
LeavesQty decimal.Decimal `json:"leavesQty"`
AvgPx decimal.Decimal `json:"avgPx"`
TransactTime time.Time `json:"transactTime"`
Account string `json:"account,omitempty"`
}

View File

@ -21,6 +21,8 @@ func SetRoutes(api *API) {
qfixpt := v1.Group("/")
qfixpt.Use(cont.AuthRequired)
qfixpt.GET("/health", cont.HealthCheck)
qfixpt.GET("/executions", cont.GetExecutions)
qfixpt.GET("/executions/:orderID", cont.GetExecutionByOrderID)
backoffice := qfixpt.Group("/backoffice")
backoffice.Use(cont.BackOfficeUser)

View File

@ -32,7 +32,7 @@ type Config struct {
EnableJWTAuth bool
}
func New(userData app.UserDataProvider, storeInstance *store.Store, config Config, notify domain.Notifier) *API {
func New(userData app.UserDataProvider, storeInstance *store.Store, config Config, notify domain.Notifier, repo domain.OrderRepository) *API {
// Set up Gin
var engine *gin.Engine
if version.Environment() == version.EnvironmentTypeProd {
@ -58,7 +58,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, config Confi
}
api := &API{
Controller: newController(NewPool(), userData, storeInstance, config, notify),
Controller: newController(NewPool(), userData, storeInstance, config, notify, repo),
Router: engine,
Port: config.Port,
}

View File

@ -0,0 +1,137 @@
// Package fix implements the FIX protocol initiator and application handler.
package fix
import (
"log/slog"
"quantex.com/qfixpt/quickfix"
"quantex.com/qfixpt/quickfix/gen/fix44/executionreport"
"quantex.com/qfixpt/src/domain"
)
// Application implements quickfix.Application to handle incoming FIX messages.
type Application struct {
repo domain.OrderRepository
}
// NewApplication creates a new FIX Application backed by the given repository.
func NewApplication(repo domain.OrderRepository) *Application {
return &Application{repo: repo}
}
// OnCreate is called when a FIX session is created.
func (a *Application) OnCreate(sessionID quickfix.SessionID) {
slog.Info("FIX session created", "sessionID", sessionID)
}
// OnLogon is called when a FIX session logs on successfully.
func (a *Application) OnLogon(sessionID quickfix.SessionID) {
slog.Info("FIX session logged on", "sessionID", sessionID)
}
// OnLogout is called when a FIX session logs out or disconnects.
func (a *Application) OnLogout(sessionID quickfix.SessionID) {
slog.Info("FIX session logged out", "sessionID", sessionID)
}
// ToAdmin is called before sending an admin message.
func (a *Application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {}
// ToApp is called before sending an application message.
func (a *Application) ToApp(_ *quickfix.Message, _ quickfix.SessionID) error {
return nil
}
// FromAdmin is called when an admin message is received.
func (a *Application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError {
return nil
}
// FromApp is called when an application message is received.
// It routes ExecutionReport (MsgType=8) messages to the handler.
func (a *Application) FromApp(message *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError {
if message.IsMsgTypeOf("8") {
a.handleExecutionReport(message, sessionID)
}
return nil
}
func (a *Application) handleExecutionReport(msg *quickfix.Message, _ quickfix.SessionID) {
er := executionreport.FromMessage(msg)
report := domain.ExecutionReport{}
if v, err := er.GetOrderID(); err == nil {
report.OrderID = v
}
if v, err := er.GetClOrdID(); err == nil {
report.ClOrdID = v
}
if v, err := er.GetExecID(); err == nil {
report.ExecID = v
}
if v, err := er.GetExecType(); err == nil {
report.ExecType = string(v)
}
if v, err := er.GetOrdStatus(); err == nil {
report.OrdStatus = string(v)
}
if v, err := er.GetSymbol(); err == nil {
report.Symbol = v
}
if v, err := er.GetSide(); err == nil {
report.Side = string(v)
}
if v, err := er.GetOrderQty(); err == nil {
report.OrderQty = v
}
if v, err := er.GetPrice(); err == nil {
report.Price = v
}
if v, err := er.GetLastPx(); err == nil {
report.LastPx = v
}
if v, err := er.GetLastQty(); err == nil {
report.LastQty = v
}
if v, err := er.GetCumQty(); err == nil {
report.CumQty = v
}
if v, err := er.GetLeavesQty(); err == nil {
report.LeavesQty = v
}
if v, err := er.GetAvgPx(); err == nil {
report.AvgPx = v
}
if v, err := er.GetTransactTime(); err == nil {
report.TransactTime = v
}
if v, err := er.GetAccount(); err == nil {
report.Account = v
}
a.repo.Save(report)
slog.Info("ExecutionReport stored",
"orderID", report.OrderID,
"execType", report.ExecType,
"ordStatus", report.OrdStatus,
"symbol", report.Symbol,
)
}

49
src/client/fix/fix.go Normal file
View File

@ -0,0 +1,49 @@
package fix
import (
"fmt"
"os"
"quantex.com/qfixpt/quickfix"
"quantex.com/qfixpt/src/domain"
)
// Initiator wraps the quickfix Initiator lifecycle.
type Initiator struct {
inner *quickfix.Initiator
}
// NewInitiator creates and starts a FIX initiator using the given settings file path.
func NewInitiator(settingsFile string, repo domain.OrderRepository) (*Initiator, error) {
f, err := os.Open(settingsFile)
if err != nil {
return nil, fmt.Errorf("opening FIX settings file %q: %w", settingsFile, err)
}
defer f.Close()
settings, err := quickfix.ParseSettings(f)
if err != nil {
return nil, fmt.Errorf("parsing FIX settings: %w", err)
}
app := NewApplication(repo)
storeFactory := quickfix.NewMemoryStoreFactory()
logFactory := quickfix.NewNullLogFactory()
initiator, err := quickfix.NewInitiator(app, storeFactory, settings, logFactory)
if err != nil {
return nil, fmt.Errorf("creating FIX initiator: %w", err)
}
return &Initiator{inner: initiator}, nil
}
// Start begins connecting to the FIX counterparty.
func (i *Initiator) Start() error {
return i.inner.Start()
}
// Stop gracefully disconnects all FIX sessions.
func (i *Initiator) Stop() {
i.inner.Stop()
}

View File

@ -0,0 +1,58 @@
// Package repository provides in-memory storage implementations.
package repository
import (
"sync"
"quantex.com/qfixpt/src/domain"
)
// Memory is a thread-safe in-memory implementation of domain.OrderRepository.
type Memory struct {
mu sync.RWMutex
orders map[string]domain.ExecutionReport
ordered []string // maintains insertion order
}
// NewMemory creates a new in-memory order repository.
func NewMemory() *Memory {
return &Memory{
orders: make(map[string]domain.ExecutionReport),
ordered: []string{},
}
}
// Save stores an execution report, overwriting any existing entry with the same OrderID.
func (m *Memory) Save(report domain.ExecutionReport) {
m.mu.Lock()
defer m.mu.Unlock()
if _, exists := m.orders[report.OrderID]; !exists {
m.ordered = append(m.ordered, report.OrderID)
}
m.orders[report.OrderID] = report
}
// GetAll returns all stored execution reports in insertion order.
func (m *Memory) GetAll() []domain.ExecutionReport {
m.mu.RLock()
defer m.mu.RUnlock()
result := make([]domain.ExecutionReport, 0, len(m.ordered))
for _, id := range m.ordered {
result = append(result, m.orders[id])
}
return result
}
// GetByOrderID returns the execution report for the given OrderID.
func (m *Memory) GetByOrderID(orderID string) (domain.ExecutionReport, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
r, ok := m.orders[orderID]
return r, ok
}

View File

@ -8,6 +8,8 @@ import (
"quantex.com/qfixpt/src/app"
"quantex.com/qfixpt/src/client/api/rest"
"quantex.com/qfixpt/src/client/data"
"quantex.com/qfixpt/src/client/fix"
fixrepo "quantex.com/qfixpt/src/client/fix/repository"
googlechat "quantex.com/qfixpt/src/client/notify/google"
"quantex.com/qfixpt/src/client/store"
"quantex.com/qfixpt/src/client/store/external"
@ -36,6 +38,26 @@ func Runner(cfg app.Config) error {
return fmt.Errorf("error trying to create store %w", err)
}
// In-memory repository shared between the FIX initiator and the REST API.
orderRepo := fixrepo.NewMemory()
// Start the FIX initiator if a settings file is configured.
var fixInitiator *fix.Initiator
if cfg.FIX.SettingsFile != "" {
fixInitiator, err = fix.NewInitiator(cfg.FIX.SettingsFile, orderRepo)
if err != nil {
return fmt.Errorf("error creating FIX initiator: %w", err)
}
if err = fixInitiator.Start(); err != nil {
return fmt.Errorf("error starting FIX initiator: %w", err)
}
slog.Info("FIX initiator started", "settingsFile", cfg.FIX.SettingsFile)
} else {
slog.Warn("FIX.SettingsFile not configured — FIX initiator will not start")
}
userData := data.New()
apiConfig := rest.Config{
@ -46,10 +68,14 @@ func Runner(cfg app.Config) error {
EnableJWTAuth: cfg.EnableJWTAuth,
}
api := rest.New(userData, appStore, apiConfig, notify)
api := rest.New(userData, appStore, apiConfig, notify, orderRepo)
api.Run()
cmd.WaitForInterruptSignal(nil)
if fixInitiator != nil {
fixInitiator.Stop()
}
return nil
}

View File

@ -0,0 +1,34 @@
package domain
import (
"time"
"github.com/shopspring/decimal"
)
// ExecutionReport represents a FIX ExecutionReport (MsgType=8) received from the counterparty.
type ExecutionReport struct {
OrderID string
ClOrdID string
ExecID string
ExecType string
OrdStatus string
Symbol string
Side string
OrderQty decimal.Decimal
Price decimal.Decimal
LastPx decimal.Decimal
LastQty decimal.Decimal
CumQty decimal.Decimal
LeavesQty decimal.Decimal
AvgPx decimal.Decimal
TransactTime time.Time
Account string
}
// OrderRepository defines the interface for storing and retrieving execution reports.
type OrderRepository interface {
Save(report ExecutionReport)
GetAll() []ExecutionReport
GetByOrderID(orderID string) (ExecutionReport, bool)
}