419 lines
12 KiB
Go
419 lines
12 KiB
Go
// Copyright (c) quickfixengine.org All rights reserved.
//
// This file may be distributed under the terms of the quickfixengine.org
// license as defined by quickfixengine.org and appearing in the file
// LICENSE included in the packaging of this file.
//
// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
// PARTICULAR PURPOSE.
//
// See http://www.quickfixengine.org/LICENSE for licensing information.
//
// Contact ask@quickfixengine.org if any conditions of this licensing
// are not clear to you.
|
|
|
|
package quickfix
|
|
|
|
import (
|
|
"bytes"
|
|
"time"
|
|
|
|
"quantex.com/qfixdpl/quickfix/internal"
|
|
)
|
|
|
|
// inSession is the session state that reports itself as "In Session". It
// embeds loggedOn and supplies the inbound-message and timer handling
// implemented by the methods in this file.
type inSession struct {
	loggedOn
}
|
|
|
|
// String returns the human-readable name of this state.
func (state inSession) String() string {
	return "In Session"
}
|
|
|
|
func (state inSession) FixMsgIn(session *session, msg *Message) sessionState {
|
|
msgType, err := msg.Header.GetBytes(tagMsgType)
|
|
if err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
|
|
switch {
|
|
case bytes.Equal(msgTypeLogon, msgType):
|
|
if err := session.handleLogon(msg); err != nil {
|
|
if err := session.initiateLogoutInReplyTo("", msg); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
return logoutState{}
|
|
}
|
|
|
|
return state
|
|
case bytes.Equal(msgTypeLogout, msgType):
|
|
return state.handleLogout(session, msg)
|
|
case bytes.Equal(msgTypeResendRequest, msgType):
|
|
return state.handleResendRequest(session, msg)
|
|
case bytes.Equal(msgTypeSequenceReset, msgType):
|
|
return state.handleSequenceReset(session, msg)
|
|
case bytes.Equal(msgTypeTestRequest, msgType):
|
|
return state.handleTestRequest(session, msg)
|
|
default:
|
|
if err := session.verify(msg); err != nil {
|
|
return state.processReject(session, msg, err)
|
|
}
|
|
}
|
|
|
|
if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
|
|
return state
|
|
}
|
|
|
|
func (state inSession) Timeout(session *session, event internal.Event) (nextState sessionState) {
|
|
switch event {
|
|
case internal.NeedHeartbeat:
|
|
heartBt := NewMessage()
|
|
heartBt.Header.SetField(tagMsgType, FIXString("0"))
|
|
if err := session.send(heartBt); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
case internal.PeerTimeout:
|
|
testReq := NewMessage()
|
|
testReq.Header.SetField(tagMsgType, FIXString("1"))
|
|
testReq.Body.SetField(tagTestReqID, FIXString("TEST"))
|
|
if err := session.send(testReq); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
session.log.OnEvent("Sent test request TEST")
|
|
session.peerTimer.Reset(time.Duration(float64(1.2) * float64(session.HeartBtInt)))
|
|
return pendingTimeout{state}
|
|
}
|
|
|
|
return state
|
|
}
|
|
|
|
func (state inSession) handleLogout(session *session, msg *Message) (nextState sessionState) {
|
|
if err := session.verifySelect(msg, false, false, true); err != nil {
|
|
return state.processReject(session, msg, err)
|
|
}
|
|
|
|
if session.IsLoggedOn() {
|
|
session.log.OnEvent("Received logout request")
|
|
session.log.OnEvent("Sending logout response")
|
|
|
|
if err := session.sendLogoutInReplyTo("", msg); err != nil {
|
|
session.logError(err)
|
|
}
|
|
} else {
|
|
session.log.OnEvent("Received logout response")
|
|
}
|
|
|
|
if session.ResetOnLogout {
|
|
if err := session.dropAndReset(); err != nil {
|
|
session.logError(err)
|
|
}
|
|
return latentState{}
|
|
}
|
|
|
|
if err := session.checkTargetTooLow(msg); err != nil {
|
|
return latentState{}
|
|
}
|
|
|
|
if err := session.checkTargetTooHigh(msg); err != nil {
|
|
return latentState{}
|
|
}
|
|
|
|
if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
|
|
session.logError(err)
|
|
}
|
|
|
|
return latentState{}
|
|
}
|
|
|
|
func (state inSession) handleTestRequest(session *session, msg *Message) (nextState sessionState) {
|
|
if err := session.verify(msg); err != nil {
|
|
return state.processReject(session, msg, err)
|
|
}
|
|
var testReq FIXString
|
|
if err := msg.Body.GetField(tagTestReqID, &testReq); err != nil {
|
|
session.log.OnEvent("Test Request with no testRequestID")
|
|
} else {
|
|
heartBt := NewMessage()
|
|
heartBt.Header.SetField(tagMsgType, FIXString("0"))
|
|
heartBt.Body.SetField(tagTestReqID, testReq)
|
|
if err := session.sendInReplyTo(heartBt, msg); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
}
|
|
|
|
if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
return state
|
|
}
|
|
|
|
func (state inSession) handleSequenceReset(session *session, msg *Message) (nextState sessionState) {
|
|
var gapFillFlag FIXBoolean
|
|
if msg.Body.Has(tagGapFillFlag) {
|
|
if err := msg.Body.GetField(tagGapFillFlag, &gapFillFlag); err != nil {
|
|
return state.processReject(session, msg, err)
|
|
}
|
|
}
|
|
|
|
if err := session.verifySelect(msg, bool(gapFillFlag), bool(gapFillFlag), true); err != nil {
|
|
return state.processReject(session, msg, err)
|
|
}
|
|
|
|
var newSeqNo FIXInt
|
|
if err := msg.Body.GetField(tagNewSeqNo, &newSeqNo); err == nil {
|
|
expectedSeqNum := FIXInt(session.store.NextTargetMsgSeqNum())
|
|
session.log.OnEventf("Received SequenceReset FROM: %v TO: %v", expectedSeqNum, newSeqNo)
|
|
|
|
switch {
|
|
case newSeqNo > expectedSeqNum:
|
|
if err := session.store.SetNextTargetMsgSeqNum(int(newSeqNo)); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
case newSeqNo < expectedSeqNum:
|
|
// FIXME: to be compliant with legacy tests, do not include tag in reftagid? (11c_NewSeqNoLess).
|
|
if err := session.doReject(msg, valueIsIncorrectNoTag()); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
}
|
|
}
|
|
return state
|
|
}
|
|
|
|
func (state inSession) handleResendRequest(session *session, msg *Message) (nextState sessionState) {
|
|
if err := session.verifyIgnoreSeqNumTooHighOrLow(msg); err != nil {
|
|
return state.processReject(session, msg, err)
|
|
}
|
|
|
|
var err error
|
|
var beginSeqNoField FIXInt
|
|
if err = msg.Body.GetField(tagBeginSeqNo, &beginSeqNoField); err != nil {
|
|
return state.processReject(session, msg, RequiredTagMissing(tagBeginSeqNo))
|
|
}
|
|
|
|
beginSeqNo := beginSeqNoField
|
|
|
|
var endSeqNoField FIXInt
|
|
if err = msg.Body.GetField(tagEndSeqNo, &endSeqNoField); err != nil {
|
|
return state.processReject(session, msg, RequiredTagMissing(tagEndSeqNo))
|
|
}
|
|
|
|
endSeqNo := int(endSeqNoField)
|
|
|
|
session.log.OnEventf("Received ResendRequest FROM: %d TO: %d", beginSeqNo, endSeqNo)
|
|
expectedSeqNum := session.store.NextSenderMsgSeqNum()
|
|
|
|
if (session.sessionID.BeginString >= BeginStringFIX42 && endSeqNo == 0) ||
|
|
(session.sessionID.BeginString <= BeginStringFIX42 && endSeqNo == 999999) ||
|
|
(endSeqNo >= expectedSeqNum) {
|
|
endSeqNo = expectedSeqNum - 1
|
|
}
|
|
|
|
if err := state.resendMessages(session, int(beginSeqNo), endSeqNo, *msg); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
|
|
if err := session.checkTargetTooLow(msg); err != nil {
|
|
return state
|
|
}
|
|
|
|
if err := session.checkTargetTooHigh(msg); err != nil {
|
|
return state
|
|
}
|
|
|
|
if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
return state
|
|
}
|
|
|
|
func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int, inReplyTo Message) error {
|
|
if session.DisableMessagePersist {
|
|
return state.generateSequenceReset(session, beginSeqNo, endSeqNo+1, inReplyTo)
|
|
}
|
|
|
|
// resendMutex must always be locked before sendMutex to prevent a potential deadlock
|
|
// sendMutex is locked below in session.EnqueueBytesAndSend()
|
|
session.resendMutex.Lock()
|
|
defer session.resendMutex.Unlock()
|
|
|
|
seqNum := beginSeqNo
|
|
nextSeqNum := seqNum
|
|
msg := NewMessage()
|
|
err := session.store.IterateMessages(beginSeqNo, endSeqNo, func(msgBytes []byte) error {
|
|
err := ParseMessageWithDataDictionary(msg, bytes.NewBuffer(msgBytes), session.transportDataDictionary, session.appDataDictionary)
|
|
if err != nil {
|
|
session.log.OnEventf("Resend Msg Parse Error: %v, %v", err.Error(), bytes.NewBuffer(msgBytes).String())
|
|
return err // We cant continue with a message that cant be parsed correctly.
|
|
}
|
|
msgType, _ := msg.Header.GetBytes(tagMsgType)
|
|
sentMessageSeqNum, _ := msg.Header.GetInt(tagMsgSeqNum)
|
|
|
|
if isAdminMessageType(msgType) {
|
|
nextSeqNum = sentMessageSeqNum + 1
|
|
return nil
|
|
}
|
|
|
|
if !session.resend(msg) {
|
|
nextSeqNum = sentMessageSeqNum + 1
|
|
return nil
|
|
}
|
|
|
|
if seqNum != sentMessageSeqNum {
|
|
if err = state.generateSequenceReset(session, seqNum, sentMessageSeqNum, inReplyTo); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
session.log.OnEventf("Resending Message: %v", sentMessageSeqNum)
|
|
msgBytes = msg.buildWithBodyBytes(msg.bodyBytes) // workaround for maintaining repeating group field order
|
|
session.EnqueueBytesAndSend(msgBytes)
|
|
|
|
seqNum = sentMessageSeqNum + 1
|
|
nextSeqNum = seqNum
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
session.log.OnEventf("error retrieving messages from store: %s", err.Error())
|
|
return err
|
|
}
|
|
|
|
if seqNum != nextSeqNum { // gapfill for catch-up
|
|
if err = state.generateSequenceReset(session, seqNum, nextSeqNum, inReplyTo); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (state inSession) processReject(session *session, msg *Message, rej MessageRejectError) sessionState {
|
|
switch TypedError := rej.(type) {
|
|
case targetTooHigh:
|
|
|
|
var nextState resendState
|
|
switch currentState := session.State.(type) {
|
|
case resendState:
|
|
// Assumes target too high reject already sent.
|
|
nextState = currentState
|
|
default:
|
|
var err error
|
|
if nextState, err = session.doTargetTooHigh(TypedError); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
}
|
|
|
|
if nextState.messageStash == nil {
|
|
nextState.messageStash = make(map[int]*Message)
|
|
}
|
|
|
|
nextState.messageStash[TypedError.ReceivedTarget] = msg
|
|
|
|
return nextState
|
|
|
|
case targetTooLow:
|
|
return state.doTargetTooLow(session, msg, TypedError)
|
|
case incorrectBeginString:
|
|
if err := session.initiateLogout(rej.Error()); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
return logoutState{}
|
|
}
|
|
|
|
switch rej.RejectReason() {
|
|
case rejectReasonCompIDProblem, rejectReasonSendingTimeAccuracyProblem:
|
|
if err := session.doReject(msg, rej); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
|
|
if err := session.initiateLogout(""); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
return logoutState{}
|
|
default:
|
|
if err := session.doReject(msg, rej); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
|
|
if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
return state
|
|
}
|
|
}
|
|
|
|
func (state inSession) doTargetTooLow(session *session, msg *Message, rej targetTooLow) (nextState sessionState) {
|
|
var posDupFlag FIXBoolean
|
|
if msg.Header.Has(tagPossDupFlag) {
|
|
if err := msg.Header.GetField(tagPossDupFlag, &posDupFlag); err != nil {
|
|
if rejErr := session.doReject(msg, err); rejErr != nil {
|
|
return handleStateError(session, rejErr)
|
|
}
|
|
return state
|
|
}
|
|
}
|
|
|
|
if !posDupFlag.Bool() {
|
|
if err := session.initiateLogout(rej.Error()); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
return logoutState{}
|
|
}
|
|
|
|
if !msg.Header.Has(tagOrigSendingTime) {
|
|
if err := session.doReject(msg, RequiredTagMissing(tagOrigSendingTime)); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
return state
|
|
}
|
|
|
|
var origSendingTime FIXUTCTimestamp
|
|
if err := msg.Header.GetField(tagOrigSendingTime, &origSendingTime); err != nil {
|
|
if rejErr := session.doReject(msg, err); rejErr != nil {
|
|
return handleStateError(session, rejErr)
|
|
}
|
|
return state
|
|
}
|
|
|
|
sendingTime := new(FIXUTCTimestamp)
|
|
if err := msg.Header.GetField(tagSendingTime, sendingTime); err != nil {
|
|
return state.processReject(session, msg, err)
|
|
}
|
|
|
|
if sendingTime.Before(origSendingTime.Time) {
|
|
if err := session.doReject(msg, sendingTimeAccuracyProblem()); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
|
|
if err := session.initiateLogout(""); err != nil {
|
|
return handleStateError(session, err)
|
|
}
|
|
return logoutState{}
|
|
}
|
|
|
|
return state
|
|
}
|
|
|
|
func (state *inSession) generateSequenceReset(session *session, beginSeqNo int, endSeqNo int, inReplyTo Message) (err error) {
|
|
sequenceReset := NewMessage()
|
|
session.fillDefaultHeader(sequenceReset, &inReplyTo)
|
|
|
|
sequenceReset.Header.SetField(tagMsgType, FIXString("4"))
|
|
sequenceReset.Header.SetField(tagMsgSeqNum, FIXInt(beginSeqNo))
|
|
sequenceReset.Header.SetField(tagPossDupFlag, FIXBoolean(true))
|
|
sequenceReset.Body.SetField(tagNewSeqNo, FIXInt(endSeqNo))
|
|
sequenceReset.Body.SetField(tagGapFillFlag, FIXBoolean(true))
|
|
|
|
var origSendingTime FIXString
|
|
if err := sequenceReset.Header.GetField(tagSendingTime, &origSendingTime); err == nil {
|
|
sequenceReset.Header.SetField(tagOrigSendingTime, origSendingTime)
|
|
}
|
|
|
|
session.application.ToAdmin(sequenceReset, session.sessionID)
|
|
|
|
msgBytes := sequenceReset.build()
|
|
|
|
session.EnqueueBytesAndSend(msgBytes)
|
|
session.log.OnEventf("Sent SequenceReset TO: %v", endSeqNo)
|
|
|
|
return
|
|
}
|