Files
qfixdpl/src/client/fix/manager.go
2026-03-10 17:38:47 -03:00

169 lines
3.7 KiB
Go

package fix
import (
"log/slog"
"os"
"sync"
"time"
"github.com/rs/zerolog/log"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/gen/enum"
"quantex.com/qfixdpl/quickfix/gen/field"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
"quantex.com/qfixdpl/quickfix/store/file"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/common/tracerr"
"quantex.com/qfixdpl/src/domain"
)
// Manager wraps the QuickFIX initiator and implements domain.FIXSender.
//
// It owns the initiator lifecycle (Start/Stop) and keeps a registry of the
// sessions that are currently logged on, which SendQuote uses to pick a
// destination for outgoing messages.
type Manager struct {
	// initiator is created and started by Start and stopped by Stop.
	// It is nil until Start has successfully built it.
	initiator *quickfix.Initiator
	// app is the application object handed to the initiator; Start wires its
	// onLogon/onLogout hooks to this Manager.
	app *application
	// sessionsMu guards sessions.
	sessionsMu sync.RWMutex
	// sessions holds the logged-on sessions keyed by SessionID.String().
	// Entries are added by onLogon and removed by onLogout.
	sessions map[string]quickfix.SessionID
	// orderStore is the dependency supplied to NewManager.
	// NOTE(review): not referenced by any method visible in this section.
	orderStore domain.OrderStore
	// notify is the dependency supplied to NewManager.
	// NOTE(review): not referenced by any method visible in this section.
	notify domain.Notifier
	// cfg is the FIX configuration; Start reads cfg.SettingsFile from it.
	cfg app.FIXConfig
}
// NewManager returns a Manager holding the given configuration and
// dependencies, with an empty session registry. The initiator is not created
// here; call Start to build and launch it.
func NewManager(cfg app.FIXConfig, orderStore domain.OrderStore, notify domain.Notifier) *Manager {
	m := new(Manager)
	m.cfg = cfg
	m.orderStore = orderStore
	m.notify = notify
	// The registry must be non-nil before onLogon writes to it.
	m.sessions = map[string]quickfix.SessionID{}

	return m
}
// Start builds the FIX application, loads the session settings from
// cfg.SettingsFile, creates the file-backed message store and log factories,
// and then creates and starts the initiator.
//
// Every failure is logged and returned as a tracerr error. The initiator is
// stored on the Manager before it is started, so it stays set even when the
// start call itself fails.
func (m *Manager) Start() error {
	// Hook the session registry into the application callbacks.
	m.app = newApplication()
	m.app.onLogon = m.onLogon
	m.app.onLogout = m.onLogout

	path := m.cfg.SettingsFile

	settingsFile, openErr := os.Open(path)
	if openErr != nil {
		wrapped := tracerr.Errorf("error opening FIX settings file %q: %s", path, openErr)
		log.Error().Msg(wrapped.Error())

		return wrapped
	}
	defer settingsFile.Close()

	settings, parseErr := quickfix.ParseSettings(settingsFile)
	if parseErr != nil {
		wrapped := tracerr.Errorf("error parsing FIX settings: %s", parseErr)
		log.Error().Msg(wrapped.Error())

		return wrapped
	}

	stores := file.NewStoreFactory(settings)

	logs, logErr := quickfix.NewFileLogFactory(settings)
	if logErr != nil {
		wrapped := tracerr.Errorf("error creating file log factory: %s", logErr)
		log.Error().Msg(wrapped.Error())

		return wrapped
	}

	created, createErr := quickfix.NewInitiator(m.app, stores, settings, logs)
	if createErr != nil {
		wrapped := tracerr.Errorf("error creating FIX initiator: %s", createErr)
		log.Error().Msg(wrapped.Error())

		return wrapped
	}

	// Publish the initiator before starting it so Stop can reach it.
	m.initiator = created

	if startErr := created.Start(); startErr != nil {
		wrapped := tracerr.Errorf("error starting FIX initiator: %s", startErr)
		log.Error().Msg(wrapped.Error())

		return wrapped
	}

	slog.Info("FIX initiator started", "settings", path)

	return nil
}
// Stop stops the initiator if Start has created one; otherwise it does
// nothing.
func (m *Manager) Stop() {
	if m.initiator == nil {
		return
	}

	m.initiator.Stop()
	slog.Info("FIX initiator stopped")
}
// onLogon records sessionID in the session registry under its string form.
// Start installs it as the application's onLogon hook.
func (m *Manager) onLogon(sessionID quickfix.SessionID) {
	key := sessionID.String()

	m.sessionsMu.Lock()
	defer m.sessionsMu.Unlock()

	m.sessions[key] = sessionID
}
// onLogout removes sessionID from the session registry. Start installs it as
// the application's onLogout hook. Removing an unknown session is a no-op.
func (m *Manager) onLogout(sessionID quickfix.SessionID) {
	key := sessionID.String()

	m.sessionsMu.Lock()
	defer m.sessionsMu.Unlock()

	delete(m.sessions, key)
}
// SendQuote implements domain.FIXSender.
//
// It builds an indicative Quote message stamped with the current time and
// sends it over one of the registered sessions. The session is whichever
// entry the map iteration yields first, so with more than one session logged
// on the choice is unspecified.
//
// Symbol, bid price and offer price are always set. Currency is set only when
// non-empty, and each size only when non-zero. clOrdID is not written into
// the message; it appears only in the success log line.
//
// An error is logged and returned when no session is registered or when the
// send fails.
func (m *Manager) SendQuote(clOrdID, quoteID, symbol, currency string, bidPx, offerPx, bidSize, offerSize decimal.Decimal) error {
	// Second argument of every decimal setter below.
	// NOTE(review): presumably the number of decimal places — confirm against
	// the quickfix field setters.
	const decimals = 8

	// Take any one registered session while holding the read lock.
	target, found := func() (quickfix.SessionID, bool) {
		m.sessionsMu.RLock()
		defer m.sessionsMu.RUnlock()

		for _, sid := range m.sessions {
			return sid, true
		}

		var none quickfix.SessionID

		return none, false
	}()
	if !found {
		noSession := tracerr.Errorf("error sending quote: no active FIX session")
		log.Error().Msg(noSession.Error())

		return noSession
	}

	msg := quote.New(
		field.NewQuoteID(quoteID),
		field.NewQuoteType(enum.QuoteType_INDICATIVE),
		field.NewTransactTime(time.Now()),
	)

	msg.SetSymbol(symbol)
	msg.SetQuoteID(quoteID)

	if currency != "" {
		msg.SetCurrency(currency)
	}

	msg.SetBidPx(bidPx, decimals)
	msg.SetOfferPx(offerPx, decimals)

	if !bidSize.IsZero() {
		msg.SetBidSize(bidSize, decimals)
	}

	if !offerSize.IsZero() {
		msg.SetOfferSize(offerSize, decimals)
	}

	if sendErr := quickfix.SendToTarget(msg, target); sendErr != nil {
		wrapped := tracerr.Errorf("error sending FIX quote: %s", sendErr)
		log.Error().Msg(wrapped.Error())

		return wrapped
	}

	slog.Info("Quote sent", "clOrdID", clOrdID, "quoteID", quoteID, "symbol", symbol)

	return nil
}