sending the messages by seqnum
This commit is contained in:
@ -329,11 +329,15 @@ func (cont *Controller) GetLogs(ctx *gin.Context) {
|
||||
}
|
||||
|
||||
// AllMessages godoc
|
||||
// @Summary List all FIX application messages of the day
|
||||
// @Description Returns every FIX application message recorded today (no admin: heartbeats/logon/logout/etc.), sorted by CreatedAt ascending
|
||||
// @Summary List FIX application messages of the day after the caller's last-seen sequence
|
||||
// @Description Returns today's FIX application messages (no admin: heartbeats/logon/logout/etc.) with MsgSeqNum greater than the caller's last-seen sequence per direction. "In" is the last MsgSeqNum the caller received on the IN side; "Out" is the same for OUT. Pass 0 to receive everything on that side. Sorted by CreatedAt ascending.
|
||||
// @Tags fix
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param body body AllMessagesRequest true "API key and last-seen MsgSeqNum per direction"
|
||||
// @Success 200 {array} domain.Message
|
||||
// @Failure 400 {object} HTTPError
|
||||
// @Failure 401 {object} HTTPError
|
||||
// @Router /qfixdpl/v1/messages [post]
|
||||
func (cont *Controller) AllMessages(ctx *gin.Context) {
|
||||
setHeaders(ctx, cont.config)
|
||||
@ -349,7 +353,7 @@ func (cont *Controller) AllMessages(ctx *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, cont.tradeProvider.GetAllMessages())
|
||||
ctx.JSON(http.StatusOK, cont.tradeProvider.GetAllMessages(req.In, req.Out))
|
||||
}
|
||||
|
||||
// GetPendingQuoteRequests godoc
|
||||
|
||||
@ -24,4 +24,6 @@ type SendQuoteRequest struct {
|
||||
|
||||
type AllMessagesRequest struct {
|
||||
APIKey string
|
||||
In int
|
||||
Out int
|
||||
}
|
||||
|
||||
@ -22,7 +22,7 @@ type TradeProvider interface {
|
||||
GetTrades() []domain.ListTrade
|
||||
GetPendingQuoteRequests() []domain.ListTrade
|
||||
SendQuote(quoteReqID string, price decimal.Decimal) error
|
||||
GetAllMessages() []domain.Message
|
||||
GetAllMessages(inSeq, outSeq int) []domain.Message
|
||||
}
|
||||
|
||||
const RedisMaxIdle = 3000 // In ms
|
||||
|
||||
@ -598,17 +598,29 @@ func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
|
||||
m.messagesMu.Unlock()
|
||||
}
|
||||
|
||||
// GetAllMessages returns a snapshot of every FIX application message recorded today,
|
||||
// sorted ascending by CreatedAt.
|
||||
func (m *Manager) GetAllMessages() []domain.Message {
|
||||
// GetAllMessages returns today's FIX application messages with MsgSeqNum greater than
|
||||
// the caller's last-seen sequence per direction (inSeq for IN, outSeq for OUT), sorted
|
||||
// ascending by CreatedAt. Passing 0 for either cursor returns all messages on that side.
|
||||
func (m *Manager) GetAllMessages(inSeq, outSeq int) []domain.Message {
|
||||
m.messagesMu.RLock()
|
||||
out := make([]domain.Message, len(m.messages))
|
||||
copy(out, m.messages)
|
||||
filtered := make([]domain.Message, 0, len(m.messages))
|
||||
for _, msg := range m.messages {
|
||||
switch msg.JMessage.Direction {
|
||||
case "IN":
|
||||
if msg.MsgSeqNum > inSeq {
|
||||
filtered = append(filtered, msg)
|
||||
}
|
||||
case "OUT":
|
||||
if msg.MsgSeqNum > outSeq {
|
||||
filtered = append(filtered, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
m.messagesMu.RUnlock()
|
||||
|
||||
sort.Slice(out, func(i, j int) bool { return out[i].CreatedAt.Before(out[j].CreatedAt) })
|
||||
sort.Slice(filtered, func(i, j int) bool { return filtered[i].CreatedAt.Before(filtered[j].CreatedAt) })
|
||||
|
||||
return out
|
||||
return filtered
|
||||
}
|
||||
|
||||
// loadTodayMessages rebuilds the in-memory message list from today's rows in the DB.
|
||||
|
||||
Reference in New Issue
Block a user