Compare commits
8 Commits
QFIXDPL-4/
...
QFIXDPL-7/
| Author | SHA1 | Date | |
|---|---|---|---|
| 96bf917191 | |||
| 298e9c39e3 | |||
| 7cc4a96a03 | |||
| 1676909cbf | |||
| d06433e0f5 | |||
| 0f3ac0dd8d | |||
| 4270284362 | |||
| 6e46fde5d2 |
@ -36,7 +36,11 @@ type Service struct {
|
||||
AuthorizedServices map[string]AuthorizedService `toml:"AuthorizedServices"`
|
||||
APIBasePort string
|
||||
EnableJWTAuth bool // Enable JWT authentication for service-to-service communication
|
||||
FIX FIXConfig
|
||||
// ServiceAPIKey is the shared secret that authenticates qbymarouter (and any
|
||||
// other internal service) when posting to /qfixdpl/v1/quotes and
|
||||
// /qfixdpl/v1/messages. Must match the DPLAPIKey configured on the caller.
|
||||
ServiceAPIKey string
|
||||
FIX FIXConfig
|
||||
}
|
||||
|
||||
type FIXConfig struct {
|
||||
|
||||
@ -328,6 +328,41 @@ func (cont *Controller) GetLogs(ctx *gin.Context) {
|
||||
ctx.JSON(http.StatusOK, logs)
|
||||
}
|
||||
|
||||
// AllMessages godoc
|
||||
// @Summary List FIX application messages of the day after the caller's last-seen sequence
|
||||
// @Description Returns today's FIX application messages (no admin: heartbeats/logon/logout/etc.) with MsgSeqNum greater than the caller's last-seen sequence per direction. "In" is the last MsgSeqNum the caller received on the IN side; "Out" is the same for OUT. Pass 0 to receive everything on that side. Sorted by CreatedAt ascending.
|
||||
// @Tags fix
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param body body AllMessagesRequest true "API key and last-seen MsgSeqNum per direction"
|
||||
// @Success 200 {array} domain.Message
|
||||
// @Failure 400 {object} HTTPError
|
||||
// @Failure 401 {object} HTTPError
|
||||
// @Router /qfixdpl/v1/messages [post]
|
||||
func (cont *Controller) AllMessages(ctx *gin.Context) {
|
||||
setHeaders(ctx, cont.config)
|
||||
|
||||
var req AllMessagesRequest
|
||||
if err := ctx.ShouldBindJSON(&req); err != nil {
|
||||
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
if !cont.checkServiceAPIKey(req.APIKey) {
|
||||
ctx.JSON(http.StatusUnauthorized, HTTPError{Error: "Not allowed to perform this request"})
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, cont.tradeProvider.GetAllMessages(req.In, req.Out))
|
||||
}
|
||||
|
||||
// checkServiceAPIKey returns true when the provided key matches the configured
|
||||
// service-to-service shared secret. Empty configured key is always rejected
|
||||
// to avoid open authentication when misconfigured.
|
||||
func (cont *Controller) checkServiceAPIKey(key string) bool {
|
||||
return cont.config.ServiceAPIKey != "" && key == cont.config.ServiceAPIKey
|
||||
}
|
||||
|
||||
// GetPendingQuoteRequests godoc
|
||||
// @Summary List pending QuoteRequests
|
||||
// @Description Returns all QuoteRequests received from TW that have not been quoted yet by the dealer
|
||||
@ -354,12 +389,19 @@ func (cont *Controller) GetPendingQuoteRequests(ctx *gin.Context) {
|
||||
// @Failure 500 {object} HTTPError
|
||||
// @Router /qfixdpl/v1/quotes [post]
|
||||
func (cont *Controller) SendQuote(ctx *gin.Context) {
|
||||
setHeaders(ctx, cont.config)
|
||||
|
||||
var req SendQuoteRequest
|
||||
if err := ctx.ShouldBindJSON(&req); err != nil {
|
||||
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
if !cont.checkServiceAPIKey(req.APIKey) {
|
||||
ctx.JSON(http.StatusUnauthorized, HTTPError{Error: "Not allowed to perform this request"})
|
||||
return
|
||||
}
|
||||
|
||||
price, err := decimal.NewFromString(req.Price)
|
||||
if err != nil {
|
||||
ctx.JSON(http.StatusBadRequest, HTTPError{Error: "invalid price: " + err.Error()})
|
||||
@ -382,3 +424,46 @@ func (cont *Controller) SendQuote(ctx *gin.Context) {
|
||||
ctx.JSON(http.StatusOK, Msg{Text: "Quote sent"})
|
||||
}
|
||||
|
||||
// CancelQuote godoc
|
||||
// @Summary Cancel a Quote for a QuoteRequest
|
||||
// @Description Builds and sends a QuoteCancel (35=Z) to TW for an existing QuoteRequest
|
||||
// @Tags fix
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param body body CancelQuoteRequest true "Quote cancel request"
|
||||
// @Success 200 {object} Msg
|
||||
// @Failure 400 {object} HTTPError
|
||||
// @Failure 401 {object} HTTPError
|
||||
// @Failure 404 {object} HTTPError
|
||||
// @Failure 409 {object} HTTPError
|
||||
// @Failure 500 {object} HTTPError
|
||||
// @Router /qfixdpl/v1/quotes/cancel [post]
|
||||
func (cont *Controller) CancelQuote(ctx *gin.Context) {
|
||||
setHeaders(ctx, cont.config)
|
||||
|
||||
var req CancelQuoteRequest
|
||||
if err := ctx.ShouldBindJSON(&req); err != nil {
|
||||
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
if !cont.checkServiceAPIKey(req.APIKey) {
|
||||
ctx.JSON(http.StatusUnauthorized, HTTPError{Error: "Not allowed to perform this request"})
|
||||
return
|
||||
}
|
||||
|
||||
if err := cont.tradeProvider.CancelQuote(req.QuoteReqID, req.Text); err != nil {
|
||||
msg := err.Error()
|
||||
switch {
|
||||
case strings.Contains(msg, "not found"):
|
||||
ctx.JSON(http.StatusNotFound, HTTPError{Error: "quoteReqID not found"})
|
||||
case strings.Contains(msg, "cannot respond"):
|
||||
ctx.JSON(http.StatusConflict, HTTPError{Error: "quote request cannot be cancelled"})
|
||||
default:
|
||||
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "failed to cancel quote"})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, Msg{Text: "Quote cancel sent"})
|
||||
}
|
||||
|
||||
@ -18,6 +18,19 @@ type Session struct {
|
||||
}
|
||||
|
||||
type SendQuoteRequest struct {
|
||||
APIKey string `json:"APIKey" binding:"required"`
|
||||
QuoteReqID string `json:"QuoteReqID" binding:"required"`
|
||||
Price string `json:"Price" binding:"required" example:"99.6"`
|
||||
}
|
||||
|
||||
type CancelQuoteRequest struct {
|
||||
APIKey string `json:"APIKey" binding:"required"`
|
||||
QuoteReqID string `json:"QuoteReqID" binding:"required"`
|
||||
Text string `json:"Text,omitempty"`
|
||||
}
|
||||
|
||||
type AllMessagesRequest struct {
|
||||
APIKey string
|
||||
In int
|
||||
Out int
|
||||
}
|
||||
|
||||
@ -24,7 +24,12 @@ func SetRoutes(api *API) {
|
||||
qfixdpl.GET("/trades", cont.GetTrades)
|
||||
qfixdpl.GET("/trades/:quoteReqID/logs", cont.GetLogs)
|
||||
qfixdpl.GET("/quote-requests", cont.GetPendingQuoteRequests)
|
||||
qfixdpl.POST("/quotes", cont.SendQuote)
|
||||
|
||||
// services group: API-key auth via body, no session cookie required.
|
||||
services := v1.Group("/")
|
||||
services.POST("/messages", cont.AllMessages)
|
||||
services.POST("/quotes", cont.SendQuote)
|
||||
services.POST("/quotes/cancel", cont.CancelQuote)
|
||||
|
||||
backoffice := qfixdpl.Group("/backoffice")
|
||||
backoffice.Use(cont.BackOfficeUser)
|
||||
|
||||
@ -22,6 +22,8 @@ type TradeProvider interface {
|
||||
GetTrades() []domain.ListTrade
|
||||
GetPendingQuoteRequests() []domain.ListTrade
|
||||
SendQuote(quoteReqID string, price decimal.Decimal) error
|
||||
CancelQuote(quoteReqID, text string) error
|
||||
GetAllMessages(inSeq, outSeq int) []domain.Message
|
||||
}
|
||||
|
||||
const RedisMaxIdle = 3000 // In ms
|
||||
@ -38,6 +40,10 @@ type Config struct {
|
||||
AuthorizedServices map[string]app.AuthorizedService `toml:"AuthorizedServices"`
|
||||
Port string
|
||||
EnableJWTAuth bool
|
||||
// ServiceAPIKey authenticates internal services (qbymarouter, etc.) calling
|
||||
// /qfixdpl/v1/quotes and /qfixdpl/v1/messages. Compared against the APIKey
|
||||
// field in the request body.
|
||||
ServiceAPIKey string
|
||||
}
|
||||
|
||||
func New(userData app.UserDataProvider, storeInstance *store.Store, tradeProvider TradeProvider, config Config, notify domain.Notifier) *API {
|
||||
|
||||
@ -1,12 +1,16 @@
|
||||
package fix
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"github.com/shopspring/decimal"
|
||||
|
||||
"quantex.com/qfixdpl/quickfix"
|
||||
@ -17,9 +21,11 @@ import (
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotecancel"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport"
|
||||
"quantex.com/qfixdpl/quickfix/gen/tag"
|
||||
filelog "quantex.com/qfixdpl/quickfix/log/file"
|
||||
"quantex.com/qfixdpl/quickfix/store/file"
|
||||
"quantex.com/qfixdpl/src/app"
|
||||
@ -34,6 +40,8 @@ type listTrade struct {
|
||||
Price decimal.Decimal
|
||||
}
|
||||
|
||||
const defaultQuoteCancelText = "Quote withdrawn by dealer"
|
||||
|
||||
// Manager wraps the QuickFIX initiator and implements domain.FIXSender.
|
||||
type Manager struct {
|
||||
initiator *quickfix.Initiator
|
||||
@ -42,6 +50,8 @@ type Manager struct {
|
||||
sessions map[string]quickfix.SessionID
|
||||
tradesMu sync.RWMutex
|
||||
trades map[string]*listTrade
|
||||
messagesMu sync.RWMutex
|
||||
messages []domain.Message
|
||||
store domain.PersistenceStore
|
||||
notify domain.Notifier
|
||||
cfg app.FIXConfig
|
||||
@ -52,6 +62,7 @@ func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.
|
||||
return &Manager{
|
||||
sessions: make(map[string]quickfix.SessionID),
|
||||
trades: make(map[string]*listTrade),
|
||||
messages: make([]domain.Message, 0),
|
||||
store: store,
|
||||
notify: notify,
|
||||
cfg: cfg,
|
||||
@ -84,6 +95,11 @@ func (m *Manager) Start() error {
|
||||
slog.Error(err.Error())
|
||||
}
|
||||
|
||||
if err := m.loadTodayMessages(); err != nil {
|
||||
err = tracerr.Errorf("failed to load today messages from DB, starting with empty list: %w", err)
|
||||
slog.Error(err.Error())
|
||||
}
|
||||
|
||||
f, err := os.Open(m.cfg.SettingsFile)
|
||||
if err != nil {
|
||||
err = tracerr.Errorf("error opening FIX settings file %q: %w", m.cfg.SettingsFile, err)
|
||||
@ -255,16 +271,6 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
|
||||
Quoted: false,
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
// Persist incoming QuoteRequest.
|
||||
m.persistMessage(quoteReqID, parsed)
|
||||
|
||||
// Persist outgoing QuoteStatusReport.
|
||||
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]any{
|
||||
"QuoteReqID": quoteReqID,
|
||||
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
|
||||
"OwnerTraderID": ownerTraderID,
|
||||
}))
|
||||
}
|
||||
|
||||
// handleQuoteAck handles an incoming QuoteAck (35=CW).
|
||||
@ -273,16 +279,10 @@ func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.Sessi
|
||||
status, _ := msg.GetQuoteAckStatus()
|
||||
text, _ := msg.GetText()
|
||||
|
||||
m.persistMessage(quoteReqID, parseQuoteAck(msg, m.dict))
|
||||
|
||||
if status != enum.QuoteAckStatus_ACCEPTED {
|
||||
err := tracerr.Errorf("handleQuoteAck: quote rejected by TW (quoteReqID=%s, quoteAckStatus=%s, text=%s)", quoteReqID, string(status), text)
|
||||
slog.Error(err.Error())
|
||||
|
||||
m.tradesMu.Lock()
|
||||
delete(m.trades, quoteReqID)
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@ -318,22 +318,9 @@ func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID
|
||||
}
|
||||
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
|
||||
|
||||
// Persist incoming QuoteResponse.
|
||||
m.persistMessage(quoteReqID, parseQuoteResponse(msg, m.dict))
|
||||
|
||||
// Persist outgoing ACK.
|
||||
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]any{
|
||||
"QuoteReqID": quoteReqID,
|
||||
"QuoteRespID": quoteRespID,
|
||||
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
|
||||
}))
|
||||
|
||||
// _TRDSUMM is the final message — clean up the trade.
|
||||
if isTrdSumm {
|
||||
slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID)
|
||||
m.tradesMu.Lock()
|
||||
delete(m.trades, quoteReqID)
|
||||
m.tradesMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@ -372,25 +359,10 @@ func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, ses
|
||||
slog.Info("Trade summary received from TW, cleaning up",
|
||||
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
|
||||
|
||||
m.tradesMu.Lock()
|
||||
delete(m.trades, clOrdID)
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
case execType == enum.ExecType_TRADE:
|
||||
slog.Info("Trade result received from TW",
|
||||
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
|
||||
}
|
||||
|
||||
// Persist incoming ExecutionReport.
|
||||
m.persistMessage(clOrdID, parseExecutionReport(msg, m.dict))
|
||||
|
||||
// Persist outgoing ExecutionAck.
|
||||
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]any{
|
||||
"OrderID": orderID,
|
||||
"ExecID": execID,
|
||||
"ClOrdID": clOrdID,
|
||||
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
|
||||
}))
|
||||
}
|
||||
|
||||
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
|
||||
@ -428,9 +400,7 @@ func toDomainListTrade(t *listTrade) domain.ListTrade {
|
||||
out := domain.ListTrade{
|
||||
QuoteRequest: t.QuoteRequest,
|
||||
}
|
||||
if !t.Price.IsZero() {
|
||||
out.Price = t.Price.String()
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
@ -444,12 +414,6 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
|
||||
slog.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
// if t.Quoted {
|
||||
// m.tradesMu.Unlock()
|
||||
// err := tracerr.Errorf("SendQuote: quote already sent for quoteReqID %s", quoteReqID)
|
||||
// slog.Error(err.Error())
|
||||
// return err
|
||||
// }
|
||||
|
||||
sessionID := t.SessionID
|
||||
if sessionID == (quickfix.SessionID{}) {
|
||||
@ -530,16 +494,91 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
|
||||
|
||||
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol, "price", price.String())
|
||||
|
||||
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]any{
|
||||
"QuoteReqID": quoteReqID,
|
||||
"QuoteID": quoteID,
|
||||
"Symbol": symbol,
|
||||
"Side": string(side),
|
||||
"Price": price.String(),
|
||||
"OrderQty": orderQty.String(),
|
||||
"Currency": currency,
|
||||
"SettlDate": settlDate,
|
||||
}))
|
||||
return nil
|
||||
}
|
||||
|
||||
// CancelQuote builds and sends a QuoteCancel (35=Z) for an existing QuoteRequest.
|
||||
func (m *Manager) CancelQuote(quoteReqID, text string) error {
|
||||
m.tradesMu.RLock()
|
||||
t, ok := m.trades[quoteReqID]
|
||||
if !ok {
|
||||
m.tradesMu.RUnlock()
|
||||
err := tracerr.Errorf("CancelQuote: quoteReqID %s not found", quoteReqID)
|
||||
slog.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
sessionID := t.SessionID
|
||||
if sessionID == (quickfix.SessionID{}) {
|
||||
sessionID = m.anyActiveSessionID()
|
||||
if sessionID == (quickfix.SessionID{}) {
|
||||
m.tradesMu.RUnlock()
|
||||
err := tracerr.Errorf("CancelQuote: no active FIX session for quoteReqID %s", quoteReqID)
|
||||
slog.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
quoteRequestBody := t.QuoteRequest.Body
|
||||
m.tradesMu.RUnlock()
|
||||
|
||||
ownerTraderID := quoteRequestString(quoteRequestBody, "OwnerTraderID")
|
||||
canRespond, hasCanRespond, rawCanRespond := quoteRequestCanRespond(quoteRequestBody)
|
||||
canRespondLogValue := rawCanRespond
|
||||
if canRespondLogValue == "" {
|
||||
canRespondLogValue = "missing"
|
||||
}
|
||||
|
||||
if !hasCanRespond {
|
||||
slog.Warn("CancelQuote: CanRespond missing or unparseable, sending QuoteCancel anyway",
|
||||
"quoteReqID", quoteReqID,
|
||||
"canRespond", canRespondLogValue,
|
||||
)
|
||||
} else if !canRespond {
|
||||
err := tracerr.Errorf("CancelQuote: quoteReqID %s cannot respond", quoteReqID)
|
||||
slog.Error(err.Error(),
|
||||
"quoteReqID", quoteReqID,
|
||||
"canRespond", canRespondLogValue,
|
||||
)
|
||||
return err
|
||||
} else {
|
||||
slog.Info("CancelQuote: CanRespond allows QuoteCancel",
|
||||
"quoteReqID", quoteReqID,
|
||||
"canRespond", canRespondLogValue,
|
||||
)
|
||||
}
|
||||
|
||||
quoteID := uuid.NewV4().String()
|
||||
text = strings.TrimSpace(text)
|
||||
if text == "" {
|
||||
text = defaultQuoteCancelText
|
||||
}
|
||||
|
||||
qc := quotecancel.New(
|
||||
field.NewQuoteReqID(quoteReqID),
|
||||
field.NewQuoteCancelType(enum.QuoteCancelType_CANCEL_SPECIFIED_SINGLE_QUOTE),
|
||||
)
|
||||
qc.SetQuoteID(quoteID)
|
||||
qc.SetTransactTime(time.Now().UTC())
|
||||
qc.SetText(text)
|
||||
|
||||
if ownerTraderID != "" {
|
||||
qc.SetOwnerTraderID(ownerTraderID)
|
||||
}
|
||||
|
||||
if sendErr := quickfix.SendToTarget(qc, sessionID); sendErr != nil {
|
||||
sendErr = tracerr.Errorf("CancelQuote: failed to send quote cancel (quoteReqID=%s, quoteID=%s): %w", quoteReqID, quoteID, sendErr)
|
||||
slog.Error(sendErr.Error())
|
||||
return sendErr
|
||||
}
|
||||
|
||||
slog.Info("QuoteCancel sent",
|
||||
"quoteReqID", quoteReqID,
|
||||
"quoteID", quoteID,
|
||||
"canRespond", canRespondLogValue,
|
||||
"ownerTraderID", ownerTraderID,
|
||||
"text", text,
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -549,11 +588,22 @@ func firstGroup(body map[string]any, name string) map[string]any {
|
||||
if body == nil {
|
||||
return nil
|
||||
}
|
||||
groups, ok := body[name].([]map[string]any)
|
||||
if !ok || len(groups) == 0 {
|
||||
return nil
|
||||
|
||||
switch groups := body[name].(type) {
|
||||
case []map[string]any:
|
||||
if len(groups) == 0 {
|
||||
return nil
|
||||
}
|
||||
return groups[0]
|
||||
case []any:
|
||||
if len(groups) == 0 {
|
||||
return nil
|
||||
}
|
||||
group, _ := groups[0].(map[string]any)
|
||||
return group
|
||||
}
|
||||
return groups[0]
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getString reads a string value from a body map, tolerating nil maps and missing keys.
|
||||
@ -567,6 +617,53 @@ func getString(body map[string]any, name string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func quoteRequestString(body map[string]any, name string) string {
|
||||
if v := getString(body, name); v != "" {
|
||||
return v
|
||||
}
|
||||
|
||||
return getString(firstGroup(body, "NoRelatedSym"), name)
|
||||
}
|
||||
|
||||
func quoteRequestCanRespond(body map[string]any) (bool, bool, string) {
|
||||
if value, ok := body["CanRespond"]; ok {
|
||||
return parseCanRespondValue(value)
|
||||
}
|
||||
if value, ok := firstGroup(body, "NoRelatedSym")["CanRespond"]; ok {
|
||||
return parseCanRespondValue(value)
|
||||
}
|
||||
|
||||
return false, false, ""
|
||||
}
|
||||
|
||||
func parseCanRespondValue(value any) (bool, bool, string) {
|
||||
switch v := value.(type) {
|
||||
case bool:
|
||||
if v {
|
||||
return true, true, "true"
|
||||
}
|
||||
return false, true, "false"
|
||||
case string:
|
||||
normalized := strings.ToUpper(strings.TrimSpace(v))
|
||||
switch normalized {
|
||||
case "Y", "YES", "TRUE", "1":
|
||||
return true, true, v
|
||||
case "N", "NO", "FALSE", "0":
|
||||
return false, true, v
|
||||
default:
|
||||
return false, false, v
|
||||
}
|
||||
case int:
|
||||
return v != 0, true, fmt.Sprint(v)
|
||||
case int64:
|
||||
return v != 0, true, fmt.Sprint(v)
|
||||
case float64:
|
||||
return v != 0, true, fmt.Sprint(v)
|
||||
default:
|
||||
return false, false, ""
|
||||
}
|
||||
}
|
||||
|
||||
// getDecimal reads a decimal value from a body map. Numeric FIX types come through
|
||||
// as strings (e.g. "10000"); INT-typed counts may be int. Both are accepted.
|
||||
func getDecimal(body map[string]any, name string) decimal.Decimal {
|
||||
@ -592,7 +689,11 @@ func (m *Manager) anyActiveSessionID() quickfix.SessionID {
|
||||
return quickfix.SessionID{}
|
||||
}
|
||||
|
||||
// handleRawMessage persists raw FIX message strings to the logs table.
|
||||
// handleRawMessage is the single ingest path for every application-level FIX message
|
||||
// (admin messages — Logon, Logout, Heartbeat, TestRequest, ResendRequest, SequenceReset —
|
||||
// go through ToAdmin/FromAdmin and never reach this callback). It persists the raw
|
||||
// envelope to the logs table, builds a structured Message and saves it to
|
||||
// the messages table, and appends to the in-memory list.
|
||||
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
|
||||
quoteReqID := extractIdentifier(msg)
|
||||
|
||||
@ -603,17 +704,77 @@ func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
|
||||
err = tracerr.Errorf("failed to persist raw log: %w", err)
|
||||
slog.Error(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// persistMessage saves a structured FIX message to the messages table.
|
||||
func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSON) {
|
||||
if err := m.store.SaveMessage(domain.TradeMessage{
|
||||
QuoteReqID: quoteReqID,
|
||||
JMessage: fixJSON,
|
||||
}); err != nil {
|
||||
err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", fixJSON.MsgType, quoteReqID, err)
|
||||
msgTypeBytes, _ := msg.Header.GetBytes(tag.MsgType)
|
||||
msgType := string(msgTypeBytes)
|
||||
senderCompID, msgSeqNum, sendingTime := extractHeaderMeta(msg)
|
||||
|
||||
fixJSON := buildFixMessageJSON(direction, msgType, quoteReqID, msg, m.dict)
|
||||
|
||||
stored := domain.Message{
|
||||
ID: uuid.NewV4().String(),
|
||||
SenderCompID: senderCompID,
|
||||
MsgSeqNum: msgSeqNum,
|
||||
SendingTime: sendingTime,
|
||||
CreatedAt: time.Now(),
|
||||
JMessage: fixJSON,
|
||||
}
|
||||
|
||||
if err := m.store.SaveMessage(stored); err != nil {
|
||||
err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", msgType, quoteReqID, err)
|
||||
slog.Error(err.Error())
|
||||
}
|
||||
|
||||
m.messagesMu.Lock()
|
||||
m.messages = append(m.messages, stored)
|
||||
m.messagesMu.Unlock()
|
||||
}
|
||||
|
||||
// GetAllMessages returns today's FIX application messages with MsgSeqNum greater than
|
||||
// the caller's last-seen sequence per direction (inSeq for IN, outSeq for OUT), sorted
|
||||
// ascending by CreatedAt. Passing 0 for either cursor returns all messages on that side.
|
||||
func (m *Manager) GetAllMessages(inSeq, outSeq int) []domain.Message {
|
||||
m.messagesMu.RLock()
|
||||
log.Info().Msgf("request received, inSeq: %d, outSeq: %d", inSeq, outSeq)
|
||||
|
||||
filtered := make([]domain.Message, 0, len(m.messages))
|
||||
for _, msg := range m.messages {
|
||||
switch msg.JMessage.Direction {
|
||||
case "IN":
|
||||
if msg.MsgSeqNum > inSeq {
|
||||
filtered = append(filtered, msg)
|
||||
}
|
||||
case "OUT":
|
||||
if msg.MsgSeqNum > outSeq {
|
||||
filtered = append(filtered, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
m.messagesMu.RUnlock()
|
||||
|
||||
sort.Slice(filtered, func(i, j int) bool { return filtered[i].CreatedAt.Before(filtered[j].CreatedAt) })
|
||||
|
||||
log.Info().Msgf("messages sent: %d", len(filtered))
|
||||
|
||||
return filtered
|
||||
}
|
||||
|
||||
// loadTodayMessages rebuilds the in-memory message list from today's rows in the DB.
|
||||
// Must be called before the FIX initiator starts so live ingest doesn't race with replay.
|
||||
func (m *Manager) loadTodayMessages() error {
|
||||
messages, err := m.store.GetTodayMessages()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.messagesMu.Lock()
|
||||
m.messages = messages
|
||||
m.messagesMu.Unlock()
|
||||
|
||||
slog.Info("today messages loaded", "count", len(messages))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadActiveTrades reconstructs active trades from today's messages in the database.
|
||||
@ -626,9 +787,11 @@ func (m *Manager) loadActiveTrades() error {
|
||||
activeTrades := make(map[string]*listTrade)
|
||||
|
||||
for _, msg := range messages {
|
||||
quoteReqID := msg.JMessage.QuoteReqID
|
||||
|
||||
switch msg.JMessage.MsgType {
|
||||
case "R": // QuoteRequest -> trade is born
|
||||
if !strings.HasPrefix(msg.QuoteReqID, "LST_") {
|
||||
if !strings.HasPrefix(quoteReqID, "LST_") {
|
||||
continue
|
||||
}
|
||||
|
||||
@ -640,31 +803,15 @@ func (m *Manager) loadActiveTrades() error {
|
||||
continue
|
||||
}
|
||||
|
||||
activeTrades[msg.QuoteReqID] = &listTrade{
|
||||
activeTrades[quoteReqID] = &listTrade{
|
||||
QuoteRequest: msg.JMessage,
|
||||
}
|
||||
|
||||
case "S": // Outgoing Quote — dealer has already quoted this trade
|
||||
if t, ok := activeTrades[msg.QuoteReqID]; ok {
|
||||
if t, ok := activeTrades[quoteReqID]; ok {
|
||||
t.Quoted = true
|
||||
t.Price = getDecimal(msg.JMessage.Body, "Price")
|
||||
}
|
||||
|
||||
case "CW": // QuoteAck — if rejected, trade is dead
|
||||
if getString(msg.JMessage.Body, "QuoteAckStatus") != string(enum.QuoteAckStatus_ACCEPTED) {
|
||||
delete(activeTrades, msg.QuoteReqID)
|
||||
}
|
||||
|
||||
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
|
||||
if strings.HasSuffix(getString(msg.JMessage.Body, "QuoteRespID"), "_TRDSUMM") {
|
||||
delete(activeTrades, msg.QuoteReqID)
|
||||
}
|
||||
|
||||
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)
|
||||
body := msg.JMessage.Body
|
||||
if strings.Contains(getString(body, "ExecID"), "_TRDSUMM") {
|
||||
delete(activeTrades, getString(body, "ClOrdID"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
82
src/client/fix/manager_cancel_test.go
Normal file
82
src/client/fix/manager_cancel_test.go
Normal file
@ -0,0 +1,82 @@
|
||||
package fix
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestQuoteRequestCanRespond(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
body map[string]any
|
||||
want bool
|
||||
wantOK bool
|
||||
wantRaw string
|
||||
}{
|
||||
{
|
||||
name: "top-level bool true",
|
||||
body: map[string]any{"CanRespond": true},
|
||||
want: true,
|
||||
wantOK: true,
|
||||
wantRaw: "true",
|
||||
},
|
||||
{
|
||||
name: "group string yes",
|
||||
body: map[string]any{
|
||||
"NoRelatedSym": []map[string]any{{"CanRespond": "Y"}},
|
||||
},
|
||||
want: true,
|
||||
wantOK: true,
|
||||
wantRaw: "Y",
|
||||
},
|
||||
{
|
||||
name: "json decoded group string yes",
|
||||
body: map[string]any{
|
||||
"NoRelatedSym": []any{map[string]any{"CanRespond": "Y"}},
|
||||
},
|
||||
want: true,
|
||||
wantOK: true,
|
||||
wantRaw: "Y",
|
||||
},
|
||||
{
|
||||
name: "group false",
|
||||
body: map[string]any{
|
||||
"NoRelatedSym": []map[string]any{{"CanRespond": false}},
|
||||
},
|
||||
want: false,
|
||||
wantOK: true,
|
||||
wantRaw: "false",
|
||||
},
|
||||
{
|
||||
name: "missing",
|
||||
body: map[string]any{},
|
||||
want: false,
|
||||
wantOK: false,
|
||||
wantRaw: "",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
got, gotOK, gotRaw := quoteRequestCanRespond(tc.body)
|
||||
if got != tc.want || gotOK != tc.wantOK || gotRaw != tc.wantRaw {
|
||||
t.Fatalf("quoteRequestCanRespond() = (%v, %v, %q), want (%v, %v, %q)",
|
||||
got, gotOK, gotRaw, tc.want, tc.wantOK, tc.wantRaw)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestQuoteRequestStringFallsBackToNoRelatedSym(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
body := map[string]any{
|
||||
"NoRelatedSym": []map[string]any{{"OwnerTraderID": "dealer-1"}},
|
||||
}
|
||||
|
||||
if got := quoteRequestString(body, "OwnerTraderID"); got != "dealer-1" {
|
||||
t.Fatalf("quoteRequestString() = %q, want dealer-1", got)
|
||||
}
|
||||
}
|
||||
@ -5,10 +5,7 @@ import (
|
||||
|
||||
"quantex.com/qfixdpl/quickfix"
|
||||
"quantex.com/qfixdpl/quickfix/datadictionary"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
|
||||
"quantex.com/qfixdpl/quickfix/gen/tag"
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
@ -50,21 +47,6 @@ func parseQuoteRequest(msg quoterequest.QuoteRequest, dd *datadictionary.DataDic
|
||||
return buildFixMessageJSON("IN", "R", quoteReqID, msg.Message, dd)
|
||||
}
|
||||
|
||||
func parseQuoteAck(msg quoteack.QuoteAck, dd *datadictionary.DataDictionary) domain.FixMessageJSON {
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
return buildFixMessageJSON("IN", "CW", quoteReqID, msg.Message, dd)
|
||||
}
|
||||
|
||||
func parseQuoteResponse(msg quoteresponse.QuoteResponse, dd *datadictionary.DataDictionary) domain.FixMessageJSON {
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
return buildFixMessageJSON("IN", "AJ", quoteReqID, msg.Message, dd)
|
||||
}
|
||||
|
||||
func parseExecutionReport(msg executionreport.ExecutionReport, dd *datadictionary.DataDictionary) domain.FixMessageJSON {
|
||||
clOrdID, _ := msg.GetClOrdID()
|
||||
return buildFixMessageJSON("IN", "8", clOrdID, msg.Message, dd)
|
||||
}
|
||||
|
||||
// extractIdentifier extracts the trade identifier from a parsed FIX message.
|
||||
// For ExecutionReport (8) and ExecutionAck (BN), uses ClOrdID (tag 11).
|
||||
// For all other message types, uses QuoteReqID (tag 131).
|
||||
@ -87,12 +69,17 @@ func extractIdentifier(msg *quickfix.Message) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func buildOutgoingMessageJSON(msgType, quoteReqID string, body map[string]any) domain.FixMessageJSON {
|
||||
return domain.FixMessageJSON{
|
||||
Direction: "OUT",
|
||||
MsgType: msgType,
|
||||
QuoteReqID: quoteReqID,
|
||||
Body: body,
|
||||
ReceiveTime: time.Now(),
|
||||
// extractHeaderMeta reads SenderCompID (49), MsgSeqNum (34) and SendingTime (52)
|
||||
// from a quickfix.Message header. Returns zero values when a field is absent.
|
||||
func extractHeaderMeta(msg *quickfix.Message) (senderCompID string, msgSeqNum int, sendingTime time.Time) {
|
||||
if s, err := msg.Header.GetString(tag.SenderCompID); err == nil {
|
||||
senderCompID = s
|
||||
}
|
||||
if n, err := msg.Header.GetInt(tag.MsgSeqNum); err == nil {
|
||||
msgSeqNum = n
|
||||
}
|
||||
if t, err := msg.Header.GetTime(tag.SendingTime); err == nil {
|
||||
sendingTime = t
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -1,11 +1,12 @@
|
||||
CREATE TABLE IF NOT EXISTS qfixdpl_messages (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
quote_req_id TEXT NOT NULL,
|
||||
j_message JSONB NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
id UUID PRIMARY KEY,
|
||||
sender_comp_id TEXT NOT NULL,
|
||||
msg_seq_num BIGINT NOT NULL,
|
||||
j_message JSONB NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_session ON qfixdpl_messages(sender_comp_id, msg_seq_num);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS qfixdpl_logs (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
|
||||
@ -2,6 +2,7 @@ package store
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -9,15 +10,20 @@ import (
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
|
||||
func (p *Store) SaveMessage(msg domain.TradeMessage) error {
|
||||
func (p *Store) SaveMessage(msg domain.Message) error {
|
||||
jsonBytes, err := json.Marshal(msg.JMessage)
|
||||
if err != nil {
|
||||
return tracerr.Errorf("error marshaling j_message: %w", err)
|
||||
}
|
||||
|
||||
_, err = p.db.Exec(
|
||||
"INSERT INTO qfixdpl_messages (quote_req_id, j_message) VALUES ($1, $2)",
|
||||
msg.QuoteReqID, string(jsonBytes),
|
||||
`INSERT INTO qfixdpl_messages (id, sender_comp_id, msg_seq_num, j_message, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5)`,
|
||||
msg.ID,
|
||||
msg.SenderCompID,
|
||||
strconv.Itoa(msg.MsgSeqNum),
|
||||
string(jsonBytes),
|
||||
msg.CreatedAt.UTC().Format(time.RFC3339Nano),
|
||||
)
|
||||
if err != nil {
|
||||
return tracerr.Errorf("error inserting message: %w", err)
|
||||
@ -41,25 +47,29 @@ func (p *Store) SaveLog(entry domain.LogEntry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
func (p *Store) GetTodayMessages() ([]domain.Message, error) {
|
||||
rows, err := p.db.Query(
|
||||
"SELECT id, quote_req_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC",
|
||||
`SELECT id, sender_comp_id, msg_seq_num, j_message, created_at
|
||||
FROM qfixdpl_messages
|
||||
WHERE created_at >= current_date
|
||||
ORDER BY created_at ASC`,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, tracerr.Errorf("error querying today messages: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var messages []domain.TradeMessage
|
||||
var messages []domain.Message
|
||||
|
||||
for rows.Next() {
|
||||
var (
|
||||
id, quoteReqID string
|
||||
jMessageRaw []byte
|
||||
createdAt time.Time
|
||||
id, senderCompID string
|
||||
msgSeqNum int
|
||||
jMessageRaw []byte
|
||||
createdAt time.Time
|
||||
)
|
||||
|
||||
if err := rows.Scan(&id, "eReqID, &jMessageRaw, &createdAt); err != nil {
|
||||
if err := rows.Scan(&id, &senderCompID, &msgSeqNum, &jMessageRaw, &createdAt); err != nil {
|
||||
return nil, tracerr.Errorf("error scanning message row: %w", err)
|
||||
}
|
||||
|
||||
@ -68,11 +78,22 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
|
||||
}
|
||||
|
||||
messages = append(messages, domain.TradeMessage{
|
||||
ID: id,
|
||||
QuoteReqID: quoteReqID,
|
||||
JMessage: jMessage,
|
||||
CreatedAt: createdAt,
|
||||
sendingTime, _ := jMessage.Header["SendingTime"].(time.Time)
|
||||
if sendingTime.IsZero() {
|
||||
if s, ok := jMessage.Header["SendingTime"].(string); ok {
|
||||
if t, parseErr := time.Parse(time.RFC3339Nano, s); parseErr == nil {
|
||||
sendingTime = t
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
messages = append(messages, domain.Message{
|
||||
ID: id,
|
||||
SenderCompID: senderCompID,
|
||||
MsgSeqNum: msgSeqNum,
|
||||
SendingTime: sendingTime,
|
||||
CreatedAt: createdAt,
|
||||
JMessage: jMessage,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -51,6 +51,7 @@ func Runner(cfg app.Config) error {
|
||||
External: cfg.External,
|
||||
AuthorizedServices: cfg.AuthorizedServices,
|
||||
EnableJWTAuth: cfg.EnableJWTAuth,
|
||||
ServiceAPIKey: cfg.ServiceAPIKey,
|
||||
}
|
||||
|
||||
api := rest.New(userData, appStore, fixManager, apiConfig, notify)
|
||||
|
||||
@ -6,7 +6,6 @@ import "time"
|
||||
// ListTrade es la representacion exportada de un trade de List Trading.
|
||||
type ListTrade struct {
|
||||
QuoteRequest FixMessageJSON `json:"quote_request"`
|
||||
Price string `json:"price,omitempty"`
|
||||
}
|
||||
|
||||
// FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento.
|
||||
@ -20,12 +19,15 @@ type FixMessageJSON struct {
|
||||
ReceiveTime time.Time `json:"receive_time"`
|
||||
}
|
||||
|
||||
// TradeMessage es una fila de qfixdpl_messages.
|
||||
type TradeMessage struct {
|
||||
ID string `json:"id"`
|
||||
QuoteReqID string `json:"quote_req_id"`
|
||||
JMessage FixMessageJSON `json:"j_message"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
// Message es una fila de qfixdpl_messages, con la metadata del header FIX hoisted
|
||||
// para que los consumidores puedan ordenar/filtrar sin parsear el JSON.
|
||||
type Message struct {
|
||||
ID string `json:"id"`
|
||||
SenderCompID string `json:"sender_comp_id"`
|
||||
MsgSeqNum int `json:"msg_seq_num"`
|
||||
SendingTime time.Time `json:"sending_time"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
JMessage FixMessageJSON `json:"j_message"`
|
||||
}
|
||||
|
||||
// LogEntry es el DTO para insertar/actualizar un log crudo en qfixdpl_logs.
|
||||
@ -41,8 +43,8 @@ type Logs struct {
|
||||
|
||||
// PersistenceStore define la interfaz de persistencia.
|
||||
type PersistenceStore interface {
|
||||
SaveMessage(msg TradeMessage) error
|
||||
SaveMessage(msg Message) error
|
||||
SaveLog(entry LogEntry) error
|
||||
GetTodayMessages() ([]TradeMessage, error)
|
||||
GetTodayMessages() ([]Message, error)
|
||||
GetLogsByQuoteReqID(quoteReqID string) (Logs, error)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user