changing to initiator

This commit is contained in:
Ramiro Paz
2026-03-10 16:28:09 -03:00
parent 557c04436d
commit 5053bfa9af
8 changed files with 58 additions and 86 deletions

View File

@ -56,13 +56,13 @@ build: check-env swag vendor only-build # Build a native version. Set e=environm
only-build: check-env only-build: check-env
@echo "Building for $(e) environment..." @echo "Building for $(e) environment..."
env OUT_PATH=$(DEFAULT_OUT_PATH) tools/build.sh $(e) env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
linux-build: check-env swag # Build a linux version for prod environment. Set e=environment: prod, dev, demo, open-demo linux-build: check-env swag # Build a linux version for prod environment. Set e=environment: prod, dev, demo, open-demo
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e) env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
deploy: check-env # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo deploy: # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo; s=serverName; i=instance; e.g. make deploy e=dev s=nonprodFix i=dpl
tools/deploy.sh $(e) make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/$(i)/
fmt: download-versions # Apply the Go formatter to the code fmt: download-versions # Apply the Go formatter to the code
cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt); cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt);

View File

@ -178,6 +178,10 @@ func parseLogLevel(level string) (slog.Level, error) {
func startRunner(runner, globalCfg, serviceCfg string) { func startRunner(runner, globalCfg, serviceCfg string) {
var fn func(cfg app.Config) error var fn func(cfg app.Config) error
if runner == "" {
runner = "service"
}
switch runner { switch runner {
case "service": case "service":
fn = service.Runner fn = service.Runner

View File

@ -3,7 +3,6 @@ package version
import ( import (
"fmt" "fmt"
"os"
"runtime" "runtime"
"strings" "strings"
) )
@ -38,17 +37,17 @@ type EnvironmentType int //nolint:recvcheck // The methods of this are autogener
var environment EnvironmentType //nolint:gochecknoglobals // Just keept this global to avoid having to create an instance var environment EnvironmentType //nolint:gochecknoglobals // Just keept this global to avoid having to create an instance
func init() { func init() {
aux := os.Getenv(quantexEnvironment) // aux := os.Getenv(EnvironmentTypeDev)
if aux == "" { // if aux == "" {
panic("QUANTEX_ENVIRONMENT is not set") // panic("QUANTEX_ENVIRONMENT is not set")
} // }
env, err := ParseEnvironmentType(aux) // env, err := ParseEnvironmentType(aux)
if err != nil { // if err != nil {
panic("Invalid QUANTEX_ENVIRONMENT value: " + aux + " " + err.Error()) // panic("Invalid QUANTEX_ENVIRONMENT value: " + aux + " " + err.Error())
} // }
environment = env environment = EnvironmentTypeDev
} }
// Base returns the version base name // Base returns the version base name

View File

@ -1,29 +1,26 @@
// Package fix implements the QuickFIX acceptor application. // Package fix implements the QuickFIX initiator application.
package fix package fix
import ( import (
"log/slog" "log/slog"
"time"
"quantex.com/qfixdpl/quickfix" "quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/newordersingle" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
"quantex.com/qfixdpl/src/domain"
) )
type application struct { type application struct {
router *quickfix.MessageRouter router *quickfix.MessageRouter
orderStore domain.OrderStore onLogon func(quickfix.SessionID)
onLogon func(quickfix.SessionID) onLogout func(quickfix.SessionID)
onLogout func(quickfix.SessionID) onQuote func(quote.Quote, quickfix.SessionID)
} }
func newApplication(orderStore domain.OrderStore) *application { func newApplication() *application {
app := &application{ app := &application{
router: quickfix.NewMessageRouter(), router: quickfix.NewMessageRouter(),
orderStore: orderStore,
} }
app.router.AddRoute(newordersingle.Route(app.handleNewOrderSingle)) app.router.AddRoute(quote.Route(app.handleQuote))
return app return app
} }
@ -33,14 +30,14 @@ func (a *application) OnCreate(sessionID quickfix.SessionID) {
} }
func (a *application) OnLogon(sessionID quickfix.SessionID) { func (a *application) OnLogon(sessionID quickfix.SessionID) {
slog.Info("FIX client logged on", "session", sessionID.String()) slog.Info("FIX session logged on", "session", sessionID.String())
if a.onLogon != nil { if a.onLogon != nil {
a.onLogon(sessionID) a.onLogon(sessionID)
} }
} }
func (a *application) OnLogout(sessionID quickfix.SessionID) { func (a *application) OnLogout(sessionID quickfix.SessionID) {
slog.Info("FIX client logged out", "session", sessionID.String()) slog.Info("FIX session logged out", "session", sessionID.String())
if a.onLogout != nil { if a.onLogout != nil {
a.onLogout(sessionID) a.onLogout(sessionID)
} }
@ -58,48 +55,19 @@ func (a *application) FromApp(msg *quickfix.Message, sessionID quickfix.SessionI
return a.router.Route(msg, sessionID) return a.router.Route(msg, sessionID)
} }
func (a *application) handleNewOrderSingle(msg newordersingle.NewOrderSingle, sessionID quickfix.SessionID) quickfix.MessageRejectError { func (a *application) handleQuote(msg quote.Quote, sessionID quickfix.SessionID) quickfix.MessageRejectError {
clOrdID, err := msg.GetClOrdID() quoteID, err := msg.GetQuoteID()
if err != nil { if err != nil {
return err return err
} }
symbol, err := msg.GetSymbol() symbol, _ := msg.GetSymbol()
if err != nil {
return err slog.Info("Quote received", "quoteID", quoteID, "symbol", symbol, "session", sessionID.String())
if a.onQuote != nil {
a.onQuote(msg, sessionID)
} }
side, err := msg.GetSide()
if err != nil {
return err
}
ordType, err := msg.GetOrdType()
if err != nil {
return err
}
orderQty, err := msg.GetOrderQty()
if err != nil {
return err
}
price, _ := msg.GetPrice() // Price is optional for some OrdTypes
order := domain.Order{
ClOrdID: clOrdID,
Symbol: symbol,
Side: string(side),
OrdType: string(ordType),
OrderQty: orderQty,
Price: price,
SessionID: sessionID.String(),
ReceivedAt: time.Now(),
}
a.orderStore.SaveOrder(order)
slog.Info("NewOrderSingle received", "clOrdID", clOrdID, "symbol", symbol, "side", order.Side)
return nil return nil
} }

View File

@ -17,9 +17,9 @@ import (
"quantex.com/qfixdpl/src/domain" "quantex.com/qfixdpl/src/domain"
) )
// Manager wraps the QuickFIX acceptor and implements domain.FIXSender. // Manager wraps the QuickFIX initiator and implements domain.FIXSender.
type Manager struct { type Manager struct {
acceptor *quickfix.Acceptor initiator *quickfix.Initiator
app *application app *application
sessionsMu sync.RWMutex sessionsMu sync.RWMutex
sessions map[string]quickfix.SessionID sessions map[string]quickfix.SessionID
@ -38,7 +38,7 @@ func NewManager(cfg app.FIXConfig, orderStore domain.OrderStore, notify domain.N
} }
func (m *Manager) Start() error { func (m *Manager) Start() error {
fixApp := newApplication(m.orderStore) fixApp := newApplication()
fixApp.onLogon = m.onLogon fixApp.onLogon = m.onLogon
fixApp.onLogout = m.onLogout fixApp.onLogout = m.onLogout
m.app = fixApp m.app = fixApp
@ -57,26 +57,26 @@ func (m *Manager) Start() error {
storeFactory := quickfix.NewMemoryStoreFactory() storeFactory := quickfix.NewMemoryStoreFactory()
logFactory := quickfix.NewNullLogFactory() logFactory := quickfix.NewNullLogFactory()
acceptor, err := quickfix.NewAcceptor(fixApp, storeFactory, settings, logFactory) initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory)
if err != nil { if err != nil {
return fmt.Errorf("creating FIX acceptor: %w", err) return fmt.Errorf("creating FIX initiator: %w", err)
} }
m.acceptor = acceptor m.initiator = initiator
if err = m.acceptor.Start(); err != nil { if err = m.initiator.Start(); err != nil {
return fmt.Errorf("starting FIX acceptor: %w", err) return fmt.Errorf("starting FIX initiator: %w", err)
} }
slog.Info("FIX acceptor started", "settings", m.cfg.SettingsFile) slog.Info("FIX initiator started", "settings", m.cfg.SettingsFile)
return nil return nil
} }
func (m *Manager) Stop() { func (m *Manager) Stop() {
if m.acceptor != nil { if m.initiator != nil {
m.acceptor.Stop() m.initiator.Stop()
slog.Info("FIX acceptor stopped") slog.Info("FIX initiator stopped")
} }
} }
@ -94,17 +94,18 @@ func (m *Manager) onLogout(sessionID quickfix.SessionID) {
// SendQuote implements domain.FIXSender. // SendQuote implements domain.FIXSender.
func (m *Manager) SendQuote(clOrdID, quoteID, symbol, currency string, bidPx, offerPx, bidSize, offerSize decimal.Decimal) error { func (m *Manager) SendQuote(clOrdID, quoteID, symbol, currency string, bidPx, offerPx, bidSize, offerSize decimal.Decimal) error {
order, ok := m.orderStore.GetOrderByClOrdID(clOrdID)
if !ok {
return fmt.Errorf("order not found: %s", clOrdID)
}
m.sessionsMu.RLock() m.sessionsMu.RLock()
sessionID, ok := m.sessions[order.SessionID] var sessionID quickfix.SessionID
var ok bool
for _, sid := range m.sessions {
sessionID = sid
ok = true
break
}
m.sessionsMu.RUnlock() m.sessionsMu.RUnlock()
if !ok { if !ok {
return fmt.Errorf("session not active for order %s (session %s)", clOrdID, order.SessionID) return fmt.Errorf("no active FIX session")
} }
q := quote.New( q := quote.New(

View File

@ -42,7 +42,7 @@ if COMMIT_MSG=$(QUANTEX_ENVIRONMENT=$ENV "${OUT_PATH}/qfixdpl" -v 2>/dev/null);
echo "$COMMIT_MSG" echo "$COMMIT_MSG"
else else
echo "---------------------------------" echo "---------------------------------"
echo "Skeleton" echo "QFIXDPL"
echo "Built at: ${BUILT_TIME}" echo "Built at: ${BUILT_TIME}"
echo "Branch: ${BUILD_BRANCH}" echo "Branch: ${BUILD_BRANCH}"
echo "SHA: ${BUILD_HASH}" echo "SHA: ${BUILD_HASH}"

View File

@ -3,7 +3,7 @@
set -e set -e
read -r -p "Issuer: " ISSUER read -r -p "Issuer: " ISSUER
read -r -p "Service (e.g. SKELETON): " SERVICE read -r -p "Service (e.g. QFIXDPL): " SERVICE
read -r -p "Token: " TOKEN read -r -p "Token: " TOKEN
read -r -p "Expire (e.g. 24h) [none]: " EXPIRY read -r -p "Expire (e.g. 24h) [none]: " EXPIRY

View File

@ -1,4 +1,4 @@
#!/bin/bash #!/bin/bash
systemctl daemon-reload systemctl daemon-reload
systemctl restart skeleton.service systemctl restart qfixdpl.service