2 Commits

24 changed files with 1454 additions and 1363 deletions

View File

@ -62,10 +62,7 @@ linux-build: check-env swag # Build a linux version for prod environment. Set e=
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
deploy: # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo; s=serverName; i=instance; e.g. make deploy e=dev s=nonprodFix i=dpl
make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/dpl/
open-demo:
make deploy e=open-demo s=nonprodFix
make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/$(i)/
fmt: download-versions # Apply the Go formatter to the code
cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt);

View File

@ -1,5 +0,0 @@
8=FIXT.1.1|9=861|35=R|34=413|49=TRADEWEB|52=20260508-16:51:56.674|56=BYMA_CORI_TEST_12345_DLRDPL|131=LST_20260508_BYMA_CORI_NY1600105.1_1|146=1|55=ARGENT 1.500 07/09/35|48=040114HT0|22=1|460=12|167=CORP|762=REGCORIINV|541=20350709|225=20200904|470=AR|223=1.5|106=ARGENTINE REPUBLIC|54=2|38=10000|64=20260511|15=USD|6110=Sov|60=20260508-16:51:56|423=1|44=-999999|236=-999999|5023=0.000010|66=NY1600105.1|6847=1|75=20260508|464=Y|20086=1|20074=Y|20075=Y|20077=[N/A]|20078=[N/A]|20079=300|20081=300|20090=0|20072=300|20098=0|5745=1|20073=RFQ|20076=Y|20156=Y|20130=20501720000|20138=HSB|561=1|562=1|20175=20210709|20223=American|22630=0|20265=LatAm|20227=Global|23068=TRWB|453=3|448=bymacust|447=C|452=3|802=3|523=BYMA CUST|803=2|523=NY|803=25|523=AR|803=4000|448=BYMA Customer|447=C|452=1|448=DTCC|447=C|452=4|454=1|455=US040114HT09|456=4|5114=2|5113=1|20169=Ca|5113=0|20169=CCC+|10=078
8=FIXT.1.1|9=203|35=AI|34=418|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:51:56.695|56=TRADEWEB|55=[N/A]|60=20260508-16:51:56.695|117=LST_20260508_BYMA_CORI_NY1600105.1_1|131=LST_20260508_BYMA_CORI_NY1600105.1_1|297=0|10=165
8=FIXT.1.1|9=292|35=AJ|34=414|49=TRADEWEB|52=20260508-16:52:05.343|56=BYMA_CORI_TEST_12345_DLRDPL|11=LST_20260508_BYMA_CORI_NY1600105.1_1|55=[N/A]|60=20260508-16:52:05|117=[N/A]|131=LST_20260508_BYMA_CORI_NY1600105.1_1|693=LST_20260508_BYMA_CORI_NY1600105.1_1_LISTEND|694=7|20086=1|20103=N/A|20110=NO|22636=Y|10=077
8=FIXT.1.1|9=211|35=AI|34=419|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:52:05.367|56=TRADEWEB|55=[N/A]|60=20260508-16:52:05.367|131=LST_20260508_BYMA_CORI_NY1600105.1_1|297=0|693=LST_20260508_BYMA_CORI_NY1600105.1_1_LISTEND|10=014

View File

@ -1,14 +0,0 @@
8=FIXT.1.1|9=862|35=R|34=393|49=TRADEWEB|52=20260508-16:44:06.978|56=BYMA_CORI_TEST_12345_DLRDPL|131=LST_20260508_BYMA_CORI_NY1600095.1_1|146=1|55=ARGENT 1.500 07/09/35|48=040114HT0|22=1|460=12|167=CORP|762=REGCORIINV|541=20350709|225=20200904|470=AR|223=1.5|106=ARGENTINE REPUBLIC|54=2|38=850000|64=20260511|15=USD|6110=Sov|60=20260508-16:44:06|423=1|44=-999999|236=-999999|5023=0.000010|66=NY1600095.1|6847=1|75=20260508|464=Y|20086=1|20074=Y|20075=Y|20077=[N/A]|20078=[N/A]|20079=300|20081=300|20090=0|20072=300|20098=0|5745=1|20073=RFQ|20076=Y|20156=Y|20130=20501720000|20138=HSB|561=1|562=1|20175=20210709|20223=American|22630=0|20265=LatAm|20227=Global|23068=TRWB|453=3|448=bymacust|447=C|452=3|802=3|523=BYMA CUST|803=2|523=NY|803=25|523=AR|803=4000|448=BYMA Customer|447=C|452=1|448=DTCC|447=C|452=4|454=1|455=US040114HT09|456=4|5114=2|5113=1|20169=Ca|5113=0|20169=CCC+|10=163
8=FIXT.1.1|9=203|35=AI|34=398|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:44:07.018|56=TRADEWEB|55=[N/A]|60=20260508-16:44:07.018|117=LST_20260508_BYMA_CORI_NY1600095.1_1|131=LST_20260508_BYMA_CORI_NY1600095.1_1|297=0|10=162
8=FIXT.1.1|9=293|35=S|34=399|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:44:34.096|56=TRADEWEB|15=USD|22=1|38=850000|44=91.00000000|48=040114HT0|54=2|55=[N/A]|60=20260508-16:44:34.096|64=20260511|117=LST_20260508_BYMA_CORI_NY1600095.1_1|131=LST_20260508_BYMA_CORI_NY1600095.1_1|132=91.00000000|423=1|537=211|10=127
8=FIXT.1.1|9=191|35=CW|34=394|49=TRADEWEB|52=20260508-16:44:34.137|56=BYMA_CORI_TEST_12345_DLRDPL|60=20260508-16:44:34|117=LST_20260508_BYMA_CORI_NY1600095.1_1|131=LST_20260508_BYMA_CORI_NY1600095.1_1|1865=1|10=004
8=FIXT.1.1|9=80|35=0|34=400|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:45:04.125|56=TRADEWEB|10=169
8=FIXT.1.1|9=80|35=0|34=395|49=TRADEWEB|52=20260508-16:45:04.142|56=BYMA_CORI_TEST_12345_DLRDPL|10=181
8=FIXT.1.1|9=391|35=AJ|34=396|49=TRADEWEB|52=20260508-16:45:10.511|56=BYMA_CORI_TEST_12345_DLRDPL|11=LST_20260508_BYMA_CORI_NY1600095.1_1|22=1|38=850000|44=91|48=040114HT0|54=2|55=[N/A]|60=20260508-16:45:10|117=LST_20260508_BYMA_CORI_NY1600095.1_1|131=LST_20260508_BYMA_CORI_NY1600095.1_1|423=1|693=LST_20260508_BYMA_CORI_NY1600095.1_1_TRDREQ|694=1|20074=N|20075=N|20076=N|20079=236|20082=60|20156=N|22630=0|10=127
8=FIXT.1.1|9=295|35=8|34=397|49=TRADEWEB|52=20260508-16:45:10.514|56=BYMA_CORI_TEST_12345_DLRDPL|6=0|11=LST_20260508_BYMA_CORI_NY1600095.1_1|14=0|17=LST_20260508_BYMA_CORI_NY1600095.1_1_LISTEND-124510.514|37=LST_20260508_BYMA_CORI_NY1600095.1_1|39=A|55=[N/A]|60=20260508-16:45:10|75=20260508|150=A|151=0|20086=1|10=213
8=FIXT.1.1|9=210|35=AI|34=401|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:45:10.539|56=TRADEWEB|55=[N/A]|60=20260508-16:45:10.539|131=LST_20260508_BYMA_CORI_NY1600095.1_1|297=0|693=LST_20260508_BYMA_CORI_NY1600095.1_1_TRDREQ|10=209
8=FIXT.1.1|9=261|35=BN|34=402|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:45:10.601|56=TRADEWEB|11=LST_20260508_BYMA_CORI_NY1600095.1_1|17=LST_20260508_BYMA_CORI_NY1600095.1_1_LISTEND-124510.514|37=LST_20260508_BYMA_CORI_NY1600095.1_1|55=[N/A]|60=20260508-16:45:10.601|1036=1|10=232
8=FIXT.1.1|9=303|35=8|34=398|49=TRADEWEB|52=20260508-16:45:10.666|56=BYMA_CORI_TEST_12345_DLRDPL|6=0|11=LST_20260508_BYMA_CORI_NY1600095.1_1|14=0|17=LST_20260508_BYMA_CORI_NY1600095.1_1_TRDEND-124510.660|37=LST_20260508_BYMA_CORI_NY1600095.1_1|39=2|44=91|54=2|55=[N/A]|60=20260508-16:45:10|75=20260508|150=F|151=0|423=1|10=252
8=FIXT.1.1|9=260|35=BN|34=403|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:45:10.690|56=TRADEWEB|11=LST_20260508_BYMA_CORI_NY1600095.1_1|17=LST_20260508_BYMA_CORI_NY1600095.1_1_TRDEND-124510.660|37=LST_20260508_BYMA_CORI_NY1600095.1_1|55=[N/A]|60=20260508-16:45:10.690|1036=1|10=168
8=FIXT.1.1|9=729|35=8|34=399|49=TRADEWEB|52=20260508-16:45:10.738|56=BYMA_CORI_TEST_12345_DLRDPL|6=0|11=LST_20260508_BYMA_CORI_NY1600095.1_1|14=0|17=LST_20260508_BYMA_CORI_NY1600095.1_1_TRDSUMM-124510.736|22=1|37=LST_20260508_BYMA_CORI_NY1600095.1_1|38=850000|39=2|44=91|48=040114HT0|54=2|55=[N/A]|60=20260508-16:45:10.535|64=20260511|75=20260508|150=F|151=0|167=CORP|236=2.61081622|423=1|453=2|448=bymacust|447=C|452=3|802=3|523=BYMA CUST|803=2|523=NY|803=25|523=AR|803=4000|448=BYMA Customer|447=C|452=1|802=1|523=YES|803=4003|454=1|455=US040114HT09|456=4|526=TRD_20260508_BYMA_CORI_213|1003=20260508.BYMA.CORI.213|1907=1|1903=20260508BYMACORI213|1906=5|6731=20260508.BYMA.CORI.213|20115=91|22630=0|22636=Y|23068=TRWB|23096=20260508BYMACORI213|10=051
8=FIXT.1.1|9=261|35=BN|34=404|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:45:10.781|56=TRADEWEB|11=LST_20260508_BYMA_CORI_NY1600095.1_1|17=LST_20260508_BYMA_CORI_NY1600095.1_1_TRDSUMM-124510.736|37=LST_20260508_BYMA_CORI_NY1600095.1_1|55=[N/A]|60=20260508-16:45:10.781|1036=1|10=027

View File

@ -1,10 +0,0 @@
8=FIXT.1.1|9=858|35=R|34=416|49=TRADEWEB|52=20260508-16:52:50.205|56=BYMA_CORI_TEST_12345_DLRDPL|131=LST_20260508_BYMA_CORI_NY1600105.2_1|146=1|55=ARGENT 1.500 07/09/35|48=040114HT0|22=1|460=12|167=CORP|762=REGCORIINV|541=20350709|225=20200904|470=AR|223=1.5|106=ARGENTINE REPUBLIC|54=2|38=15000|64=20260511|15=USD|6110=Sov|60=20260508-16:52:50|423=1|44=-999999|236=-999999|5023=0.000010|66=NY1600105.2|6847=1|75=20260508|464=Y|20086=1|20074=Y|20075=Y|20077=[N/A]|20078=[N/A]|20079=60|20081=60|20090=0|20072=60|20098=0|5745=1|20073=RFQ|20076=Y|20156=Y|20130=20501720000|20138=HSB|561=1|562=1|20175=20210709|20223=American|22630=0|20265=LatAm|20227=Global|23068=TRWB|453=3|448=bymacust|447=C|452=3|802=3|523=BYMA CUST|803=2|523=NY|803=25|523=AR|803=4000|448=BYMA Customer|447=C|452=1|448=DTCC|447=C|452=4|454=1|455=US040114HT09|456=4|5114=2|5113=1|20169=Ca|5113=0|20169=CCC+|10=195
8=FIXT.1.1|9=203|35=AI|34=421|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:52:50.226|56=TRADEWEB|55=[N/A]|60=20260508-16:52:50.226|117=LST_20260508_BYMA_CORI_NY1600105.2_1|131=LST_20260508_BYMA_CORI_NY1600105.2_1|297=0|10=131
8=FIXT.1.1|9=80|35=0|34=422|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:53:20.289|56=TRADEWEB|10=181
8=FIXT.1.1|9=80|35=0|34=417|49=TRADEWEB|52=20260508-16:53:20.291|56=BYMA_CORI_TEST_12345_DLRDPL|10=178
8=FIXT.1.1|9=80|35=0|34=423|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:53:50.295|56=TRADEWEB|10=182
8=FIXT.1.1|9=80|35=0|34=418|49=TRADEWEB|52=20260508-16:53:50.305|56=BYMA_CORI_TEST_12345_DLRDPL|10=178
8=FIXT.1.1|9=265|35=AJ|34=419|49=TRADEWEB|52=20260508-16:53:50.698|56=BYMA_CORI_TEST_12345_DLRDPL|11=LST_20260508_BYMA_CORI_NY1600105.2_1|55=[N/A]|60=20260508-16:53:50|117=[N/A]|131=LST_20260508_BYMA_CORI_NY1600105.2_1|693=LST_20260508_BYMA_CORI_NY1600105.2_1_LISTEND|694=8|20086=1|10=010
8=FIXT.1.1|9=211|35=AI|34=424|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:53:50.721|56=TRADEWEB|55=[N/A]|60=20260508-16:53:50.721|131=LST_20260508_BYMA_CORI_NY1600105.2_1|297=0|693=LST_20260508_BYMA_CORI_NY1600105.2_1_LISTEND|10=002
8=FIXT.1.1|9=292|35=AJ|34=420|49=TRADEWEB|52=20260508-16:53:53.741|56=BYMA_CORI_TEST_12345_DLRDPL|11=LST_20260508_BYMA_CORI_NY1600105.2_1|55=[N/A]|60=20260508-16:53:53|117=[N/A]|131=LST_20260508_BYMA_CORI_NY1600105.2_1|693=LST_20260508_BYMA_CORI_NY1600105.2_1_LISTEND|694=8|20086=1|20103=N/A|20110=NO|22636=Y|10=088
8=FIXT.1.1|9=211|35=AI|34=425|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:53:53.763|56=TRADEWEB|55=[N/A]|60=20260508-16:53:53.763|131=LST_20260508_BYMA_CORI_NY1600105.2_1|297=0|693=LST_20260508_BYMA_CORI_NY1600105.2_1_LISTEND|10=021

View File

@ -5,10 +5,6 @@ SenderCompID=QUANTEX
ResetOnLogon=Y
FileStorePath=fix_store
FileLogPath=fix_logs
TransportDataDictionary=spec/FIXT11.xml
AppDataDictionary=spec/FIX50SP2.xml
AllowUnknownMessageFields=Y
RejectInvalidMessage=N
[SESSION]
BeginString=FIXT.1.1

View File

@ -89,24 +89,6 @@ func (m FieldMap) Get(parser Field) MessageRejectError {
return m.GetField(parser.Tag(), parser)
}
// RawValues returns a copy of the underlying TagValue slice for a tag.
// For repeating groups, the first entry is the count and the remaining
// entries are the flattened inner-field values across all repetitions.
func (m FieldMap) RawValues(tag Tag) []TagValue {
m.rwLock.RLock()
defer m.rwLock.RUnlock()
f, ok := m.tagLookup[tag]
if !ok {
return nil
}
out := make([]TagValue, len(f))
copy(out, f)
return out
}
// Has returns true if the Tag is present in this FieldMap.
func (m FieldMap) Has(tag Tag) bool {
m.rwLock.RLock()

View File

@ -86,16 +86,6 @@ func (tv TagValue) String() string {
return string(tv.bytes)
}
// Tag returns the FIX tag for this TagValue.
func (tv TagValue) Tag() Tag {
return tv.tag
}
// Value returns the raw FIX value bytes for this TagValue.
func (tv TagValue) Value() []byte {
return tv.value
}
func bytesTotal(bytes []byte) (total int) {
for _, b := range bytes {
total += int(b)

View File

@ -1,313 +0,0 @@
<?xml version='1.0' encoding='UTF-8'?>
<fix type='FIXT' major='1' minor='1' servicepack='0'>
<header>
<field name='BeginString' required='Y'/>
<field name='BodyLength' required='Y'/>
<field name='MsgType' required='Y'/>
<field name='SenderCompID' required='Y'/>
<field name='TargetCompID' required='Y'/>
<field name='OnBehalfOfCompID' required='N'/>
<field name='DeliverToCompID' required='N'/>
<field name='SecureDataLen' required='N'/>
<field name='SecureData' required='N'/>
<field name='MsgSeqNum' required='Y'/>
<field name='SenderSubID' required='N'/>
<field name='SenderLocationID' required='N'/>
<field name='TargetSubID' required='N'/>
<field name='TargetLocationID' required='N'/>
<field name='OnBehalfOfSubID' required='N'/>
<field name='OnBehalfOfLocationID' required='N'/>
<field name='DeliverToSubID' required='N'/>
<field name='DeliverToLocationID' required='N'/>
<field name='PossDupFlag' required='N'/>
<field name='PossResend' required='N'/>
<field name='SendingTime' required='Y'/>
<field name='OrigSendingTime' required='N'/>
<field name='XmlDataLen' required='N'/>
<field name='XmlData' required='N'/>
<field name='MessageEncoding' required='N'/>
<field name='LastMsgSeqNumProcessed' required='N'/>
<component name='HopGrp' required='N'/>
<field name='ApplVerID' required='N'/>
<field name='CstmApplVerID' required='N'/>
</header>
<trailer>
<field name='SignatureLength' required='N'/>
<field name='Signature' required='N'/>
<field name='CheckSum' required='Y'/>
</trailer>
<messages>
<message msgcat='admin' msgtype='0' name='Heartbeat'>
<field name='TestReqID' required='N'/>
</message>
<message msgcat='admin' msgtype='1' name='TestRequest'>
<field name='TestReqID' required='Y'/>
</message>
<message msgcat='admin' msgtype='2' name='ResendRequest'>
<field name='BeginSeqNo' required='Y'/>
<field name='EndSeqNo' required='Y'/>
</message>
<message msgcat='admin' msgtype='3' name='Reject'>
<field name='RefSeqNum' required='Y'/>
<field name='RefTagID' required='N'/>
<field name='RefMsgType' required='N'/>
<field name='SessionRejectReason' required='N'/>
<field name='Text' required='N'/>
<field name='EncodedTextLen' required='N'/>
<field name='EncodedText' required='N'/>
</message>
<message msgcat='admin' msgtype='4' name='SequenceReset'>
<field name='GapFillFlag' required='N'/>
<field name='NewSeqNo' required='Y'/>
</message>
<message msgcat='admin' msgtype='5' name='Logout'>
<field name='Text' required='N'/>
<field name='EncodedTextLen' required='N'/>
<field name='EncodedText' required='N'/>
</message>
<message msgcat='admin' msgtype='A' name='Logon'>
<field name='EncryptMethod' required='Y'/>
<field name='HeartBtInt' required='Y'/>
<field name='RawDataLength' required='N'/>
<field name='RawData' required='N'/>
<field name='ResetSeqNumFlag' required='N'/>
<field name='NextExpectedMsgSeqNum' required='N'/>
<field name='MaxMessageSize' required='N'/>
<field name='TestMessageIndicator' required='N'/>
<field name='Username' required='N'/>
<field name='Password' required='N'/>
<field name='DefaultApplVerID' required='Y'/>
<component name='MsgTypeGrp' required='N'/>
</message>
</messages>
<components>
<component name='HopGrp'>
<group name='NoHops' required='N'>
<field name='HopCompID' required='N'/>
<field name='HopSendingTime' required='N'/>
<field name='HopRefID' required='N'/>
</group>
</component>
<component name='MsgTypeGrp'>
<group name='NoMsgTypes' required='N'>
<field name='RefMsgType' required='N'/>
<field name='MsgDirection' required='N'/>
<field name='RefApplVerID' required='N'/>
<field name='RefCstmApplVerID' required='N'/>
</group>
</component>
</components>
<fields>
<field name='BeginSeqNo' number='7' type='SEQNUM'/>
<field name='BeginString' number='8' type='STRING'/>
<field name='BodyLength' number='9' type='LENGTH'/>
<field name='CheckSum' number='10' type='STRING'/>
<field name='EndSeqNo' number='16' type='SEQNUM'/>
<field name='MsgSeqNum' number='34' type='SEQNUM'/>
<field number='35' name='MsgType' type='STRING'>
<value enum='0' description='HEARTBEAT'/>
<value enum='1' description='TEST_REQUEST'/>
<value enum='2' description='RESEND_REQUEST'/>
<value enum='3' description='REJECT'/>
<value enum='4' description='SEQUENCE_RESET'/>
<value enum='5' description='LOGOUT'/>
<value enum='6' description='INDICATION_OF_INTEREST'/>
<value enum='7' description='ADVERTISEMENT'/>
<value enum='8' description='EXECUTION_REPORT'/>
<value enum='9' description='ORDER_CANCEL_REJECT'/>
<value enum='A' description='LOGON'/>
<value enum='B' description='NEWS'/>
<value enum='C' description='EMAIL'/>
<value enum='D' description='ORDER_SINGLE'/>
<value enum='E' description='ORDER_LIST'/>
<value enum='F' description='ORDER_CANCEL_REQUEST'/>
<value enum='G' description='ORDER_CANCEL_REPLACE_REQUEST'/>
<value enum='H' description='ORDER_STATUS_REQUEST'/>
<value enum='J' description='ALLOCATION_INSTRUCTION'/>
<value enum='K' description='LIST_CANCEL_REQUEST'/>
<value enum='L' description='LIST_EXECUTE'/>
<value enum='M' description='LIST_STATUS_REQUEST'/>
<value enum='N' description='LIST_STATUS'/>
<value enum='P' description='ALLOCATION_INSTRUCTION_ACK'/>
<value enum='Q' description='DONT_KNOW_TRADE'/>
<value enum='R' description='QUOTE_REQUEST'/>
<value enum='S' description='QUOTE'/>
<value enum='T' description='SETTLEMENT_INSTRUCTIONS'/>
<value enum='V' description='MARKET_DATA_REQUEST'/>
<value enum='W' description='MARKET_DATA_SNAPSHOT_FULL_REFRESH'/>
<value enum='X' description='MARKET_DATA_INCREMENTAL_REFRESH'/>
<value enum='Y' description='MARKET_DATA_REQUEST_REJECT'/>
<value enum='Z' description='QUOTE_CANCEL'/>
<value enum='a' description='QUOTE_STATUS_REQUEST'/>
<value enum='b' description='MASS_QUOTE_ACKNOWLEDGEMENT'/>
<value enum='c' description='SECURITY_DEFINITION_REQUEST'/>
<value enum='d' description='SECURITY_DEFINITION'/>
<value enum='e' description='SECURITY_STATUS_REQUEST'/>
<value enum='f' description='SECURITY_STATUS'/>
<value enum='g' description='TRADING_SESSION_STATUS_REQUEST'/>
<value enum='h' description='TRADING_SESSION_STATUS'/>
<value enum='i' description='MASS_QUOTE'/>
<value enum='j' description='BUSINESS_MESSAGE_REJECT'/>
<value enum='k' description='BID_REQUEST'/>
<value enum='l' description='BID_RESPONSE'/>
<value enum='m' description='LIST_STRIKE_PRICE'/>
<value enum='n' description='XML_MESSAGE'/>
<value enum='o' description='REGISTRATION_INSTRUCTIONS'/>
<value enum='p' description='REGISTRATION_INSTRUCTIONS_RESPONSE'/>
<value enum='q' description='ORDER_MASS_CANCEL_REQUEST'/>
<value enum='r' description='ORDER_MASS_CANCEL_REPORT'/>
<value enum='s' description='NEW_ORDER_CROSS'/>
<value enum='t' description='CROSS_ORDER_CANCEL_REPLACE_REQUEST'/>
<value enum='u' description='CROSS_ORDER_CANCEL_REQUEST'/>
<value enum='v' description='SECURITY_TYPE_REQUEST'/>
<value enum='w' description='SECURITY_TYPES'/>
<value enum='x' description='SECURITY_LIST_REQUEST'/>
<value enum='y' description='SECURITY_LIST'/>
<value enum='z' description='DERIVATIVE_SECURITY_LIST_REQUEST'/>
<value enum='AA' description='DERIVATIVE_SECURITY_LIST'/>
<value enum='AB' description='NEW_ORDER_MULTILEG'/>
<value enum='AC' description='MULTILEG_ORDER_CANCEL_REPLACE'/>
<value enum='AD' description='TRADE_CAPTURE_REPORT_REQUEST'/>
<value enum='AE' description='TRADE_CAPTURE_REPORT'/>
<value enum='AF' description='ORDER_MASS_STATUS_REQUEST'/>
<value enum='AG' description='QUOTE_REQUEST_REJECT'/>
<value enum='AH' description='RFQ_REQUEST'/>
<value enum='AI' description='QUOTE_STATUS_REPORT'/>
<value enum='AJ' description='QUOTE_RESPONSE'/>
<value enum='AK' description='CONFIRMATION'/>
<value enum='AL' description='POSITION_MAINTENANCE_REQUEST'/>
<value enum='AM' description='POSITION_MAINTENANCE_REPORT'/>
<value enum='AN' description='REQUEST_FOR_POSITIONS'/>
<value enum='AO' description='REQUEST_FOR_POSITIONS_ACK'/>
<value enum='AP' description='POSITION_REPORT'/>
<value enum='AQ' description='TRADE_CAPTURE_REPORT_REQUEST_ACK'/>
<value enum='AR' description='TRADE_CAPTURE_REPORT_ACK'/>
<value enum='AS' description='ALLOCATION_REPORT'/>
<value enum='AT' description='ALLOCATION_REPORT_ACK'/>
<value enum='AU' description='CONFIRMATION_ACK'/>
<value enum='AV' description='SETTLEMENT_INSTRUCTION_REQUEST'/>
<value enum='AW' description='ASSIGNMENT_REPORT'/>
<value enum='AX' description='COLLATERAL_REQUEST'/>
<value enum='AY' description='COLLATERAL_ASSIGNMENT'/>
<value enum='AZ' description='COLLATERAL_RESPONSE'/>
<value enum='BA' description='COLLATERAL_REPORT'/>
<value enum='BB' description='COLLATERAL_INQUIRY'/>
<value enum='BC' description='NETWORK_STATUS_REQUEST'/>
<value enum='BD' description='NETWORK_STATUS_RESPONSE'/>
<value enum='BE' description='USER_REQUEST'/>
<value enum='BF' description='USER_RESPONSE'/>
<value enum='BG' description='COLLATERAL_INQUIRY_ACK'/>
<value enum='BH' description='CONFIRMATION_REQUEST'/>
<value enum='BI' description='TRADING_SESSION_LIST_REQUEST'/>
<value enum='BJ' description='TRADING_SESSION_LIST'/>
<value enum='BK' description='SECURITY_LIST_UPDATE_REPORT'/>
<value enum='BL' description='ADJUSTED_POSITION_REPORT'/>
<value enum='BM' description='ALLOCATION_INSTRUCTION_ALERT'/>
<value enum='BN' description='EXECUTION_ACKNOWLEDGEMENT'/>
<value enum='BO' description='CONTRARY_INTENTION_REPORT'/>
<value enum='BP' description='SECURITY_DEFINITION_UPDATE_REPORT'/>
</field>
<field name='NewSeqNo' number='36' type='SEQNUM'/>
<field name='PossDupFlag' number='43' type='BOOLEAN'/>
<field name='RefSeqNum' number='45' type='SEQNUM'/>
<field name='SenderCompID' number='49' type='STRING'/>
<field name='SenderSubID' number='50' type='STRING'/>
<field name='SendingTime' number='52' type='UTCTIMESTAMP'/>
<field name='TargetCompID' number='56' type='STRING'/>
<field name='TargetSubID' number='57' type='STRING'/>
<field name='Text' number='58' type='STRING'/>
<field name='Signature' number='89' type='DATA'/>
<field name='SecureDataLen' number='90' type='LENGTH'/>
<field name='SecureData' number='91' type='DATA'/>
<field name='SignatureLength' number='93' type='LENGTH'/>
<field name='RawDataLength' number='95' type='LENGTH'/>
<field name='RawData' number='96' type='DATA'/>
<field name='PossResend' number='97' type='BOOLEAN'/>
<field name='EncryptMethod' number='98' type='INT'>
<value description='NONE_OTHER' enum='0'/>
<value description='PKCS' enum='1'/>
<value description='DES' enum='2'/>
<value description='PKCS_DES' enum='3'/>
<value description='PGP_DES' enum='4'/>
<value description='PGP_DES_MD5' enum='5'/>
<value description='PEM_DES_MD5' enum='6'/>
</field>
<field name='HeartBtInt' number='108' type='INT'/>
<field name='TestReqID' number='112' type='STRING'/>
<field name='OnBehalfOfCompID' number='115' type='STRING'/>
<field name='OnBehalfOfSubID' number='116' type='STRING'/>
<field name='OrigSendingTime' number='122' type='UTCTIMESTAMP'/>
<field name='GapFillFlag' number='123' type='BOOLEAN'/>
<field name='DeliverToCompID' number='128' type='STRING'/>
<field name='DeliverToSubID' number='129' type='STRING'/>
<field name='ResetSeqNumFlag' number='141' type='BOOLEAN'/>
<field name='SenderLocationID' number='142' type='STRING'/>
<field name='TargetLocationID' number='143' type='STRING'/>
<field name='OnBehalfOfLocationID' number='144' type='STRING'/>
<field name='DeliverToLocationID' number='145' type='STRING'/>
<field name='XmlDataLen' number='212' type='LENGTH'/>
<field name='XmlData' number='213' type='DATA'/>
<field number='347' name='MessageEncoding' type='STRING'>
<value enum='ISO-2022-JP' description='ISO_2022_JP'/>
<value enum='EUC-JP' description='EUC_JP'/>
<value enum='SHIFT_JIS' description='SHIFT_JIS'/>
<value enum='UTF-8' description='UTF_8'/>
</field>
<field name='EncodedTextLen' number='354' type='LENGTH'/>
<field name='EncodedText' number='355' type='DATA'/>
<field name='LastMsgSeqNumProcessed' number='369' type='SEQNUM'/>
<field name='RefTagID' number='371' type='INT'/>
<field name='RefMsgType' number='372' type='STRING'/>
<field name='SessionRejectReason' number='373' type='INT'>
<value description='INVALID_TAG_NUMBER' enum='0'/>
<value description='REQUIRED_TAG_MISSING' enum='1'/>
<value description='SENDINGTIME_ACCURACY_PROBLEM' enum='10'/>
<value description='INVALID_MSGTYPE' enum='11'/>
<value description='XML_VALIDATION_ERROR' enum='12'/>
<value description='TAG_APPEARS_MORE_THAN_ONCE' enum='13'/>
<value description='TAG_SPECIFIED_OUT_OF_REQUIRED_ORDER' enum='14'/>
<value description='REPEATING_GROUP_FIELDS_OUT_OF_ORDER' enum='15'/>
<value description='INCORRECT_NUMINGROUP_COUNT_FOR_REPEATING_GROUP' enum='16'/>
<value description='NON_DATA_VALUE_INCLUDES_FIELD_DELIMITER' enum='17'/>
<value description='TAG_NOT_DEFINED_FOR_THIS_MESSAGE_TYPE' enum='2'/>
<value description='UNDEFINED_TAG' enum='3'/>
<value description='TAG_SPECIFIED_WITHOUT_A_VALUE' enum='4'/>
<value description='VALUE_IS_INCORRECT' enum='5'/>
<value description='INCORRECT_DATA_FORMAT_FOR_VALUE' enum='6'/>
<value description='DECRYPTION_PROBLEM' enum='7'/>
<value description='SIGNATURE_PROBLEM' enum='8'/>
<value description='COMPID_PROBLEM' enum='9'/>
<value description='OTHER' enum='99'/>
</field>
<field name='MaxMessageSize' number='383' type='LENGTH'/>
<field name='NoMsgTypes' number='384' type='NUMINGROUP'/>
<field name='MsgDirection' number='385' type='CHAR'>
<value description='RECEIVE' enum='R'/>
<value description='SEND' enum='S'/>
</field>
<field name='TestMessageIndicator' number='464' type='BOOLEAN'/>
<field name='Username' number='553' type='STRING'/>
<field name='Password' number='554' type='STRING'/>
<field name='NoHops' number='627' type='NUMINGROUP'/>
<field name='HopCompID' number='628' type='STRING'/>
<field name='HopSendingTime' number='629' type='UTCTIMESTAMP'/>
<field name='HopRefID' number='630' type='SEQNUM'/>
<field name='NextExpectedMsgSeqNum' number='789' type='SEQNUM'/>
<field name='ApplVerID' number='1128' type='STRING'>
<value description='FIX27' enum='0'/>
<value description='FIX30' enum='1'/>
<value description='FIX40' enum='2'/>
<value description='FIX41' enum='3'/>
<value description='FIX42' enum='4'/>
<value description='FIX43' enum='5'/>
<value description='FIX44' enum='6'/>
<value description='FIX50' enum='7'/>
<value description='FIX50SP1' enum='8'/>
<value description='FIX50SP2' enum='9'/>
</field>
<field name='CstmApplVerID' number='1129' type='STRING'/>
<field name='RefApplVerID' number='1130' type='STRING'/>
<field name='RefCstmApplVerID' number='1131' type='STRING'/>
<field name='DefaultApplVerID' number='1137' type='STRING'/>
</fields>
</fix>

View File

@ -36,16 +36,11 @@ type Service struct {
AuthorizedServices map[string]AuthorizedService `toml:"AuthorizedServices"`
APIBasePort string
EnableJWTAuth bool // Enable JWT authentication for service-to-service communication
// ServiceAPIKey is the shared secret that authenticates qbymarouter (and any
// other internal service) when posting to /qfixdpl/v1/quotes and
// /qfixdpl/v1/messages. Must match the DPLAPIKey configured on the caller.
ServiceAPIKey string
FIX FIXConfig
FIX FIXConfig
}
type FIXConfig struct {
SettingsFile string // path to fix.cfg file
DataDictionaryFile string // path to FIX data dictionary XML (e.g. spec/FIX50SP2.xml)
SettingsFile string // path to fix.cfg file
}
type ExtAuth struct {

View File

@ -4,14 +4,12 @@ import (
"fmt"
"log/slog"
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/gomodule/redigo/redis"
"github.com/sasha-s/go-deadlock"
uuid "github.com/satori/go.uuid"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/app/version"
@ -295,7 +293,7 @@ func allowed(origin string, config Config) bool {
// GetTrades godoc
// @Summary List active trades
// @Description Returns all active List Trading trades
// @Description Returns only active List Trading trades
// @Tags fix
// @Produce json
// @Success 200 {array} domain.ListTrade
@ -305,6 +303,18 @@ func (cont *Controller) GetTrades(ctx *gin.Context) {
ctx.JSON(http.StatusOK, trades)
}
// GetAllTrades godoc
// @Summary List all trades
// @Description Returns all List Trading trades (active, rejected, completed)
// @Tags fix
// @Produce json
// @Success 200 {array} domain.ListTrade
// @Router /qfixdpl/v1/trades/all [get]
func (cont *Controller) GetAllTrades(ctx *gin.Context) {
trades := cont.tradeProvider.GetAllTrades()
ctx.JSON(http.StatusOK, trades)
}
// GetLogs godoc
// @Summary Get raw FIX logs for a trade
// @Description Returns raw FIX message logs for a given QuoteReqID
@ -318,8 +328,7 @@ func (cont *Controller) GetLogs(ctx *gin.Context) {
logs, err := cont.store.GetLogsByQuoteReqID(quoteReqID)
if err != nil {
err = tracerr.Errorf("GetLogs: error fetching logs (quoteReqID=%s): %w", quoteReqID, err)
slog.Error(err.Error())
slog.Error("GetLogs: error fetching logs", "quoteReqID", quoteReqID, "error", err)
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"})
return
@ -328,98 +337,25 @@ func (cont *Controller) GetLogs(ctx *gin.Context) {
ctx.JSON(http.StatusOK, logs)
}
// AllMessages godoc
// @Summary List FIX application messages of the day after the caller's last-seen sequence
// @Description Returns today's FIX application messages (no admin: heartbeats/logon/logout/etc.) with MsgSeqNum greater than the caller's last-seen sequence per direction. "In" is the last MsgSeqNum the caller received on the IN side; "Out" is the same for OUT. Pass 0 to receive everything on that side. Sorted by CreatedAt ascending.
// @Tags fix
// @Accept json
// @Produce json
// @Param body body AllMessagesRequest true "API key and last-seen MsgSeqNum per direction"
// @Success 200 {array} domain.Message
// @Failure 400 {object} HTTPError
// @Failure 401 {object} HTTPError
// @Router /qfixdpl/v1/messages [post]
func (cont *Controller) AllMessages(ctx *gin.Context) {
setHeaders(ctx, cont.config)
var req AllMessagesRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return
}
if !cont.checkServiceAPIKey(req.APIKey) {
ctx.JSON(http.StatusUnauthorized, HTTPError{Error: "Not allowed to perform this request"})
return
}
ctx.JSON(http.StatusOK, cont.tradeProvider.GetAllMessages(req.In, req.Out))
}
// checkServiceAPIKey returns true when the provided key matches the configured
// service-to-service shared secret. Empty configured key is always rejected
// to avoid open authentication when misconfigured.
func (cont *Controller) checkServiceAPIKey(key string) bool {
return cont.config.ServiceAPIKey != "" && key == cont.config.ServiceAPIKey
}
// GetPendingQuoteRequests godoc
// @Summary List pending QuoteRequests
// @Description Returns all QuoteRequests received from TW that have not been quoted yet by the dealer
// GetFullTradeLog godoc
// @Summary Get full trade lifecycle log
// @Description Returns DPL and Post-Trade raw FIX logs for a given trade (by QuoteReqID)
// @Tags fix
// @Produce json
// @Success 200 {array} domain.ListTrade
// @Router /qfixdpl/v1/quote-requests [get]
func (cont *Controller) GetPendingQuoteRequests(ctx *gin.Context) {
pending := cont.tradeProvider.GetPendingQuoteRequests()
ctx.JSON(http.StatusOK, pending)
}
// @Param quoteReqID path string true "QuoteReqID"
// @Success 200 {object} domain.FullTradeLog
// @Router /qfixdpl/v1/trades/{quoteReqID}/full-log [get]
func (cont *Controller) GetFullTradeLog(ctx *gin.Context) {
quoteReqID := ctx.Param("quoteReqID")
// SendQuote godoc
// @Summary Send a Quote for a pending QuoteRequest
// @Description Builds and sends a Quote (35=S) to TW for an existing QuoteRequest at the given price
// @Tags fix
// @Accept json
// @Produce json
// @Param body body SendQuoteRequest true "Quote to send"
// @Success 200 {object} Msg
// @Failure 400 {object} HTTPError
// @Failure 404 {object} HTTPError
// @Failure 409 {object} HTTPError
// @Failure 500 {object} HTTPError
// @Router /qfixdpl/v1/quotes [post]
func (cont *Controller) SendQuote(ctx *gin.Context) {
setHeaders(ctx, cont.config)
var req SendQuoteRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return
}
if !cont.checkServiceAPIKey(req.APIKey) {
ctx.JSON(http.StatusUnauthorized, HTTPError{Error: "Not allowed to perform this request"})
return
}
price, err := decimal.NewFromString(req.Price)
fullLog, err := cont.store.GetFullTradeLog(quoteReqID)
if err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: "invalid price: " + err.Error()})
slog.Error("GetFullTradeLog: error fetching full trade log", "quoteReqID", quoteReqID, "error", err)
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching full trade log"})
return
}
if err := cont.tradeProvider.SendQuote(req.QuoteReqID, price); err != nil {
msg := err.Error()
switch {
case strings.Contains(msg, "not found"):
ctx.JSON(http.StatusNotFound, HTTPError{Error: "quoteReqID not found"})
case strings.Contains(msg, "already sent"):
ctx.JSON(http.StatusConflict, HTTPError{Error: "quote already sent for this quoteReqID"})
default:
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "failed to send quote"})
}
return
}
ctx.JSON(http.StatusOK, Msg{Text: "Quote sent"})
ctx.JSON(http.StatusOK, fullLog)
}

View File

@ -1,5 +1,6 @@
package rest
type HTTPError struct {
Error string
}
@ -17,14 +18,3 @@ type Session struct {
Email string
}
type SendQuoteRequest struct {
APIKey string `json:"APIKey" binding:"required"`
QuoteReqID string `json:"QuoteReqID" binding:"required"`
Price string `json:"Price" binding:"required" example:"99.6"`
}
type AllMessagesRequest struct {
APIKey string
In int
Out int
}

View File

@ -22,13 +22,8 @@ func SetRoutes(api *API) {
qfixdpl.Use(cont.AuthRequired)
qfixdpl.GET("/health", cont.HealthCheck)
qfixdpl.GET("/trades", cont.GetTrades)
qfixdpl.GET("/trades/:quoteReqID/logs", cont.GetLogs)
qfixdpl.GET("/quote-requests", cont.GetPendingQuoteRequests)
// services group: API-key auth via body, no session cookie required.
services := v1.Group("/")
services.POST("/messages", cont.AllMessages)
services.POST("/quotes", cont.SendQuote)
qfixdpl.GET("/trades/all", cont.GetAllTrades)
qfixdpl.GET("/trades/:quoteReqID/full-log", cont.GetFullTradeLog)
backoffice := qfixdpl.Group("/backoffice")
backoffice.Use(cont.BackOfficeUser)

View File

@ -7,7 +7,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/gomodule/redigo/redis"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/app/version"
@ -20,9 +19,7 @@ import (
// TradeProvider exposes trade data from the FIX manager.
type TradeProvider interface {
GetTrades() []domain.ListTrade
GetPendingQuoteRequests() []domain.ListTrade
SendQuote(quoteReqID string, price decimal.Decimal) error
GetAllMessages(inSeq, outSeq int) []domain.Message
GetAllTrades() []domain.ListTrade
}
const RedisMaxIdle = 3000 // In ms
@ -39,10 +36,6 @@ type Config struct {
AuthorizedServices map[string]app.AuthorizedService `toml:"AuthorizedServices"`
Port string
EnableJWTAuth bool
// ServiceAPIKey authenticates internal services (qbymarouter, etc.) calling
// /qfixdpl/v1/quotes and /qfixdpl/v1/messages. Compared against the APIKey
// field in the request body.
ServiceAPIKey string
}
func New(userData app.UserDataProvider, storeInstance *store.Store, tradeProvider TradeProvider, config Config, notify domain.Notifier) *API {

View File

@ -1,172 +0,0 @@
package fix
import (
"fmt"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/datadictionary"
)
// BuildFieldMap walks a quickfix.FieldMap and produces an enriched FieldMap
// keyed by FixField (Name + Tag + Type) with values typed per the FIX
// data dictionary. Repeating groups become []FieldMap; nested groups recurse.
func BuildFieldMap(qfMap quickfix.FieldMap, dd *datadictionary.DataDictionary, fieldsByTag map[int]*datadictionary.FieldDef) FieldMap {
out := FieldMap{}
for _, t := range qfMap.Tags() {
tagInt := int(t)
rawValues := qfMap.RawValues(t)
if len(rawValues) == 0 {
continue
}
ff, fd := resolveField(dd, fieldsByTag, tagInt)
if ff.Type == "NUMINGROUP" {
if fd != nil && fd.IsGroup() {
out[ff] = buildGroup(rawValues[1:], dd, fd.Fields)
} else {
// Count tag with no group structure available (dictionary lookup
// failed, or quickfix didn't nest the inner fields). Emit an empty
// group rather than an int so the consumer's type expectation holds.
out[ff] = []FieldMap{}
}
continue
}
out[ff] = parseScalar(rawValues[0].Value(), ff.Type)
}
return out
}
// buildGroup splits a flat slice of TagValues (the inner values of a NUMINGROUP,
// excluding the count) into per-repetition FieldMaps. The first element of
// innerFields is the delimiter that marks the start of each repetition.
func buildGroup(tvs []quickfix.TagValue, dd *datadictionary.DataDictionary, innerFields []*datadictionary.FieldDef) []FieldMap {
if len(innerFields) == 0 || len(tvs) == 0 {
return nil
}
delimiter := innerFields[0].Tag()
fdByTag := make(map[int]*datadictionary.FieldDef, len(innerFields))
for _, f := range innerFields {
fdByTag[f.Tag()] = f
}
var (
repetitions []FieldMap
current FieldMap
)
i := 0
for i < len(tvs) {
tv := tvs[i]
tagInt := int(tv.Tag())
if tagInt == delimiter {
current = FieldMap{}
repetitions = append(repetitions, current)
}
if current == nil {
// Field appeared before the first delimiter; skip.
i++
continue
}
fd, known := fdByTag[tagInt]
if !known {
current[FixField{Name: fmt.Sprintf("tag_%d", tagInt), Tag: tagInt, Type: "STRING"}] = string(tv.Value())
i++
continue
}
if fd.IsGroup() {
nestedAllowed := allowedTags(fd.Fields)
j := i + 1
for j < len(tvs) && nestedAllowed[int(tvs[j].Tag())] {
j++
}
nested := buildGroup(tvs[i+1:j], dd, fd.Fields)
current[FixField{Name: fd.Name(), Tag: tagInt, Type: "NUMINGROUP"}] = nested
i = j
continue
}
current[FixField{Name: fd.Name(), Tag: tagInt, Type: fd.Type}] = parseScalar(tv.Value(), fd.Type)
i++
}
return repetitions
}
// resolveField looks up the FixField metadata (name + type) for a tag.
// Prefers the message-level FieldDef so group structure is preserved;
// falls back to the global FieldTypeByTag, then to a synthetic "tag_<N>" STRING.
func resolveField(dd *datadictionary.DataDictionary, fieldsByTag map[int]*datadictionary.FieldDef, tagInt int) (FixField, *datadictionary.FieldDef) {
if fieldsByTag != nil {
if fd, ok := fieldsByTag[tagInt]; ok {
return FixField{Name: fd.Name(), Tag: tagInt, Type: fd.Type}, fd
}
}
if dd != nil {
if ft, ok := dd.FieldTypeByTag[tagInt]; ok {
return FixField{Name: ft.Name(), Tag: tagInt, Type: ft.Type}, nil
}
}
return FixField{Name: fmt.Sprintf("tag_%d", tagInt), Tag: tagInt, Type: "STRING"}, nil
}
// parseScalar converts raw FIX bytes into the Go type expected by the consumer
// (GetKeyValue): bool for BOOLEAN, int for INT/SEQNUM, time.Time for UTCTIMESTAMP,
// string for everything else.
func parseScalar(raw []byte, fixType string) interface{} {
s := string(raw)
switch fixType {
case "INT", "SEQNUM", "LENGTH":
var v quickfix.FIXInt
if err := v.Read(raw); err == nil {
return int(v)
}
return s
case "BOOLEAN":
var v quickfix.FIXBoolean
if err := v.Read(raw); err == nil {
return bool(v)
}
return s
case "UTCTIMESTAMP":
var v quickfix.FIXUTCTimestamp
if err := v.Read(raw); err == nil {
return v.Time
}
return s
default:
return s
}
}
// allowedTags returns the set of all tags valid inside a group (including
// nested-group counts and their descendants), used to detect where a flat
// nested-group slice ends within its parent.
func allowedTags(fields []*datadictionary.FieldDef) map[int]bool {
out := make(map[int]bool, len(fields))
var visit func(fs []*datadictionary.FieldDef)
visit = func(fs []*datadictionary.FieldDef) {
for _, f := range fs {
out[f.Tag()] = true
if f.IsGroup() {
visit(f.Fields)
}
}
}
visit(fields)
return out
}

View File

@ -1,106 +0,0 @@
package fix
import (
"log/slog"
"time"
"quantex.com/qfixdpl/src/common/tracerr"
)
// FixField identifies a single FIX field by its dictionary metadata.
type FixField struct {
Name string
Tag int
Type string
}
// FieldValue is the typed value for a FIX field.
type FieldValue interface{}
// FieldMap is the enriched representation of a FIX FieldMap (header, body, or trailer).
type FieldMap map[FixField]FieldValue
// GetMap converts a FieldMap into a JSON-friendly map keyed by field name.
func GetMap(fieldMap FieldMap) map[string]interface{} {
result := map[string]interface{}{}
for f, v := range fieldMap {
slog.Info("try to parse fieldMap: %+v, value: %+v", f, v)
k, val := GetKeyValue(f, v)
result[k] = val
}
return result
}
//nolint:funlen,gocyclo,cyclop //it's long but easy to read
func GetKeyValue(f FixField, value FieldValue) (key string, val interface{}) {
key = f.Name
switch f.Type {
case "NUMINGROUP":
var groups []map[string]interface{}
fMapLst, ok := value.([]FieldMap)
if !ok {
err := tracerr.Errorf("could not parse as []FieldMap, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
for _, fieldMap := range fMapLst {
groups = append(groups, GetMap(fieldMap))
}
val = groups
case "BOOLEAN":
b, ok := value.(bool)
if !ok {
err := tracerr.Errorf("could not parse as bool, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
val = b
case "INT", "LENGTH", "SEQNUM":
i, ok := value.(int)
if !ok {
err := tracerr.Errorf("could not parse as int, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
val = i
case "UTCTIMESTAMP":
t, ok := value.(time.Time)
if !ok {
err := tracerr.Errorf("could not parse as Time, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
val = t
case "STRING":
s, ok := value.(string)
if !ok {
err := tracerr.Errorf("could not parse as string, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
val = s
case "CHAR":
s, ok := value.(string)
if !ok {
err := tracerr.Errorf("could not parse as string, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
val = s
default:
s, ok := value.(string)
if !ok {
err := tracerr.Errorf("could not parse as string, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
val = s
}
return key, val
}

View File

@ -3,17 +3,14 @@ package fix
import (
"log/slog"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/rs/zerolog/log"
uuid "github.com/satori/go.uuid"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/datadictionary"
"quantex.com/qfixdpl/quickfix/gen/enum"
"quantex.com/qfixdpl/quickfix/gen/field"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack"
@ -23,7 +20,6 @@ import (
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport"
"quantex.com/qfixdpl/quickfix/gen/tag"
filelog "quantex.com/qfixdpl/quickfix/log/file"
"quantex.com/qfixdpl/quickfix/store/file"
"quantex.com/qfixdpl/src/app"
@ -32,10 +28,19 @@ import (
)
type listTrade struct {
QuoteRequest domain.FixMessageJSON
SessionID quickfix.SessionID
Quoted bool
Price decimal.Decimal
QuoteReqID string
TradeID string
ListID string
Symbol string
SecurityIDSrc enum.SecurityIDSource
Currency string
Side enum.Side
OrderQty decimal.Decimal
SettlDate string
Price decimal.Decimal
OwnerTraderID string
SessionID quickfix.SessionID
Status domain.TradeStatus
}
// Manager wraps the QuickFIX initiator and implements domain.FIXSender.
@ -46,19 +51,15 @@ type Manager struct {
sessions map[string]quickfix.SessionID
tradesMu sync.RWMutex
trades map[string]*listTrade
messagesMu sync.RWMutex
messages []domain.Message
store domain.PersistenceStore
notify domain.Notifier
cfg app.FIXConfig
dict *datadictionary.DataDictionary
}
func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager {
return &Manager{
sessions: make(map[string]quickfix.SessionID),
trades: make(map[string]*listTrade),
messages: make([]domain.Message, 0),
store: store,
notify: notify,
cfg: cfg,
@ -77,29 +78,14 @@ func (m *Manager) Start() error {
fixApp.onRawMessage = m.handleRawMessage
m.app = fixApp
dict, err := datadictionary.Parse(m.cfg.DataDictionaryFile)
if err != nil {
err = tracerr.Errorf("error parsing FIX data dictionary %q: %w", m.cfg.DataDictionaryFile, err)
slog.Error(err.Error())
return err
}
m.dict = dict
if err := m.loadActiveTrades(); err != nil {
err = tracerr.Errorf("failed to load active trades from DB, starting with empty state: %w", err)
slog.Error(err.Error())
}
if err := m.loadTodayMessages(); err != nil {
err = tracerr.Errorf("failed to load today messages from DB, starting with empty list: %w", err)
slog.Error(err.Error())
if err := m.loadTrades(); err != nil {
slog.Error("failed to load active trades from DB, starting with empty state", "error", err)
}
f, err := os.Open(m.cfg.SettingsFile)
if err != nil {
err = tracerr.Errorf("error opening FIX settings file %q: %w", m.cfg.SettingsFile, err)
slog.Error(err.Error())
err = tracerr.Errorf("error opening FIX settings file %q: %s", m.cfg.SettingsFile, err)
log.Error().Msg(err.Error())
return err
}
@ -107,8 +93,8 @@ func (m *Manager) Start() error {
settings, err := quickfix.ParseSettings(f)
if err != nil {
err = tracerr.Errorf("error parsing FIX settings: %w", err)
slog.Error(err.Error())
err = tracerr.Errorf("error parsing FIX settings: %s", err)
log.Error().Msg(err.Error())
return err
}
@ -116,16 +102,16 @@ func (m *Manager) Start() error {
storeFactory := file.NewStoreFactory(settings)
logFactory, err := filelog.NewLogFactory(settings)
if err != nil {
err = tracerr.Errorf("error creating file log factory: %w", err)
slog.Error(err.Error())
err = tracerr.Errorf("error creating file log factory: %s", err)
log.Error().Msg(err.Error())
return err
}
initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory)
if err != nil {
err = tracerr.Errorf("error creating FIX initiator: %w", err)
slog.Error(err.Error())
err = tracerr.Errorf("error creating FIX initiator: %s", err)
log.Error().Msg(err.Error())
return err
}
@ -133,8 +119,8 @@ func (m *Manager) Start() error {
m.initiator = initiator
if err = m.initiator.Start(); err != nil {
err = tracerr.Errorf("error starting FIX initiator: %w", err)
slog.Error(err.Error())
err = tracerr.Errorf("error starting FIX initiator: %s", err)
log.Error().Msg(err.Error())
return err
}
@ -155,6 +141,18 @@ func (m *Manager) onLogon(sessionID quickfix.SessionID) {
m.sessionsMu.Lock()
m.sessions[sessionID.String()] = sessionID
m.sessionsMu.Unlock()
// Assign the new session to all recovered trades that have no session yet.
// This covers the case where the service was restarted mid-trade: loadTrades()
// reconstructs the trade data but cannot recover the SessionID from the DB.
// Since this is a single-session initiator, all active trades belong to this session.
m.tradesMu.Lock()
for _, trade := range m.trades {
if trade.Status == domain.TradeStatusActive && trade.SessionID == (quickfix.SessionID{}) {
trade.SessionID = sessionID
}
}
m.tradesMu.Unlock()
}
func (m *Manager) onLogout(sessionID quickfix.SessionID) {
@ -206,233 +204,64 @@ func (m *Manager) sendExecutionAck(orderID, clOrdID, execID string, sessionID qu
return quickfix.SendToTarget(bn, sessionID)
}
// handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge).
// The Quote (35=S) is sent later via the REST endpoint when the dealer prices it.
// handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge) followed by a Quote at price 99.6.
func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) {
parsed := parseQuoteRequest(msg, m.dict)
quoteReqID := parsed.QuoteReqID
if quoteReqID == "" {
m.notify.SendMsg(
domain.MessageChannelError,
"quoteReqID missing in quote request",
domain.MessageStatusWarning,
nil,
)
err := tracerr.Errorf("handleQuoteRequest, missing QuoteReqID, quoteRequest: %+v", parsed)
slog.Error(err.Error())
quoteReqID, err := msg.GetQuoteReqID()
if err != nil {
slog.Error("handleQuoteRequest: missing QuoteReqID", "error", err.Error())
return
}
// Validate LST_ prefix for List Trading flow.
if !strings.HasPrefix(quoteReqID, "LST_") {
m.notify.SendMsg(
domain.MessageChannelError,
"quoteReqID ("+quoteReqID+") missing LST_ prefix",
domain.MessageStatusWarning,
nil,
)
slog.Warn("handleQuoteRequest: QuoteReqID missing LST_ prefix, ignoring", "quoteReqID", quoteReqID)
return
}
bodyKeys := make([]string, 0, len(parsed.Body))
for k := range parsed.Body {
bodyKeys = append(bodyKeys, k)
var (
symbol, currency, ownerTraderID, settlDate, listID, negotiationType string
side enum.Side
secIDSource enum.SecurityIDSource
orderQty decimal.Decimal
)
relatedSyms, relErr := msg.GetNoRelatedSym()
if relErr == nil && relatedSyms.Len() > 0 {
sym := relatedSyms.Get(0)
symbol, _ = sym.GetSecurityID()
secIDSource, _ = sym.GetSecurityIDSource()
currency, _ = sym.GetCurrency()
side, _ = sym.GetSide()
ownerTraderID, _ = sym.GetOwnerTraderID()
orderQty, _ = sym.GetOrderQty()
settlDate, _ = sym.GetSettlDate()
listID, _ = sym.GetListID()
negotiationType, _ = sym.GetNegotiationType()
}
slog.Info("handleQuoteRequest: parsed body keys", "quoteReqID", quoteReqID, "keys", bodyKeys)
relSym := firstGroup(parsed.Body, "NoRelatedSym")
relSymKeys := make([]string, 0, len(relSym))
for k := range relSym {
relSymKeys = append(relSymKeys, k)
if listID == "" {
slog.Warn("handleQuoteRequest: missing ListID", "quoteReqID", quoteReqID)
return
}
slog.Info("handleQuoteRequest: NoRelatedSym keys", "quoteReqID", quoteReqID, "keys", relSymKeys)
ownerTraderID := getString(relSym, "OwnerTraderID")
if negotiationType != "RFQ" {
slog.Warn("handleQuoteRequest: unexpected NegotiationType", "quoteReqID", quoteReqID, "negotiationType", negotiationType)
return
}
// Send QuoteStatusReport (35=AI) to acknowledge the inquiry.
// Step 1: Send QuoteStatusReport (35=AI) to acknowledge the inquiry.
if ackErr := m.sendQuoteStatusReport(quoteReqID, ownerTraderID, sessionID); ackErr != nil {
ackErr = tracerr.Errorf("handleQuoteRequest: failed to send QuoteStatusReport (quoteReqID=%s): %w", quoteReqID, ackErr)
slog.Error(ackErr.Error())
slog.Error("handleQuoteRequest: failed to send QuoteStatusReport", "quoteReqID", quoteReqID, "error", ackErr.Error())
return
}
slog.Info("QuoteStatusReport sent", "quoteReqID", quoteReqID)
// Store trade state as pending; Quote (35=S) is sent later via REST endpoint.
m.tradesMu.Lock()
m.trades[quoteReqID] = &listTrade{
QuoteRequest: parsed,
SessionID: sessionID,
Quoted: false,
}
m.tradesMu.Unlock()
}
// Step 2: Build and send Quote (35=S) with price.
price := decimal.NewFromFloat(99.6)
// handleQuoteAck handles an incoming QuoteAck (35=CW).
func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
status, _ := msg.GetQuoteAckStatus()
text, _ := msg.GetText()
if status != enum.QuoteAckStatus_ACCEPTED {
err := tracerr.Errorf("handleQuoteAck: quote rejected by TW (quoteReqID=%s, quoteAckStatus=%s, text=%s)", quoteReqID, string(status), text)
slog.Error(err.Error())
return
}
slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID)
}
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ).
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes.
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
quoteRespID, _ := msg.GetQuoteRespID()
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ")
isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND")
isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM")
isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND")
if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd {
slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID)
return
}
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future.
// Always send ACK regardless of whether the trade is in our map.
// TW will keep retrying until it receives an ACK.
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil {
ackErr = tracerr.Errorf("handleQuoteResponse: failed to send ACK (quoteReqID=%s, quoteRespID=%s): %w", quoteReqID, quoteRespID, ackErr)
slog.Error(ackErr.Error())
return
}
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
// _TRDSUMM is the final message — clean up the trade.
if isTrdSumm {
slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID)
}
}
// handleExecutionReport handles an incoming ExecutionReport (35=8).
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does.
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
clOrdID, _ := msg.GetClOrdID()
execType, _ := msg.GetExecType()
ordStatus, _ := msg.GetOrdStatus()
listID, _ := msg.GetListID()
slog.Info("handleExecutionReport received",
"execID", execID, "orderID", orderID, "clOrdID", clOrdID,
"execType", string(execType), "ordStatus", string(ordStatus), "listID", listID,
)
// Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW.
if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil {
ackErr = tracerr.Errorf("handleExecutionReport: failed to send ExecutionAck (execID=%s): %w", execID, ackErr)
slog.Error(ackErr.Error())
} else {
slog.Info("ExecutionAck sent", "execID", execID)
}
switch {
case strings.Contains(execID, "_LISTEND"):
slog.Info("List ended (due-in closed), awaiting trade result from TW",
"execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDEND"):
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDSUMM"):
slog.Info("Trade summary received from TW, cleaning up",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
case execType == enum.ExecType_TRADE:
slog.Info("Trade result received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
}
}
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
// Logged in application.go, no further action needed.
}
// GetTrades returns a snapshot of all active trades.
func (m *Manager) GetTrades() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
trades := make([]domain.ListTrade, 0, len(m.trades))
for _, t := range m.trades {
trades = append(trades, toDomainListTrade(t))
}
return trades
}
// GetPendingQuoteRequests returns trades that have received a QuoteRequest but not yet been quoted by the dealer.
func (m *Manager) GetPendingQuoteRequests() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
pending := make([]domain.ListTrade, 0)
for _, t := range m.trades {
pending = append(pending, toDomainListTrade(t))
}
return pending
}
func toDomainListTrade(t *listTrade) domain.ListTrade {
out := domain.ListTrade{
QuoteRequest: t.QuoteRequest,
}
return out
}
// SendQuote builds and sends a Quote (35=S) for an existing pending QuoteRequest at the given price.
func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
m.tradesMu.Lock()
t, ok := m.trades[quoteReqID]
if !ok {
m.tradesMu.Unlock()
err := tracerr.Errorf("SendQuote: quoteReqID %s not found", quoteReqID)
slog.Error(err.Error())
return err
}
sessionID := t.SessionID
if sessionID == (quickfix.SessionID{}) {
sessionID = m.anyActiveSessionID()
if sessionID == (quickfix.SessionID{}) {
m.tradesMu.Unlock()
err := tracerr.Errorf("SendQuote: no active FIX session for quoteReqID %s", quoteReqID)
slog.Error(err.Error())
return err
}
}
symbol := getString(t.QuoteRequest.Body, "SecurityID")
sIDSource := enum.SecurityIDSource(getString(t.QuoteRequest.Body, "SecurityIDSource"))
currency := getString(t.QuoteRequest.Body, "Currency")
side := enum.Side(getString(t.QuoteRequest.Body, "Side"))
orderQty := getDecimal(t.QuoteRequest.Body, "OrderQty")
settlDate := getString(t.QuoteRequest.Body, "SettlDate")
ownerTraderID := getString(t.QuoteRequest.Body, "OwnerTraderID")
m.tradesMu.Unlock()
if sIDSource != enum.SecurityIDSource_CUSIP {
sIDSource = enum.SecurityIDSource_ISIN_NUMBER
sIDSource := enum.SecurityIDSource_ISIN_NUMBER
if secIDSource == enum.SecurityIDSource_CUSIP {
sIDSource = enum.SecurityIDSource_CUSIP
}
quoteID := quoteReqID
@ -476,76 +305,254 @@ func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
}
if sendErr := quickfix.SendToTarget(q, sessionID); sendErr != nil {
sendErr = tracerr.Errorf("SendQuote: failed to send quote (quoteReqID=%s): %w", quoteReqID, sendErr)
slog.Error(sendErr.Error())
return sendErr
slog.Error("handleQuoteRequest: failed to send quote", "quoteReqID", quoteReqID, "error", sendErr.Error())
return
}
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol)
// Store trade state for subsequent steps.
m.tradesMu.Lock()
if t, ok := m.trades[quoteReqID]; ok {
t.Price = price
t.Quoted = true
m.trades[quoteReqID] = &listTrade{
QuoteReqID: quoteReqID,
ListID: listID,
Symbol: symbol,
SecurityIDSrc: sIDSource,
Currency: currency,
Side: side,
OrderQty: orderQty,
SettlDate: settlDate,
Price: price,
OwnerTraderID: ownerTraderID,
SessionID: sessionID,
Status: domain.TradeStatusActive,
}
m.tradesMu.Unlock()
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol, "price", price.String())
// Persist structured message (outside mutex).
m.persistMessage(quoteReqID, parseQuoteRequest(msg))
return nil
// Persist outgoing QuoteStatusReport.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
"OwnerTraderID": ownerTraderID,
}))
// Persist outgoing Quote.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteID": quoteID,
"Symbol": symbol,
"Side": string(side),
"Price": price.String(),
"OrderQty": orderQty.String(),
"Currency": currency,
"SettlDate": settlDate,
}))
}
// firstGroup returns the first repetition of a NUMINGROUP field, or nil.
func firstGroup(body map[string]any, name string) map[string]any {
if body == nil {
return nil
// handleQuoteAck handles an incoming QuoteAck (35=CW).
func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
status, _ := msg.GetQuoteAckStatus()
text, _ := msg.GetText()
m.persistMessage(quoteReqID, parseQuoteAck(msg))
// QuoteAckStatus only has two defined values in TW DPL:
// 1 = Accepted — quote delivered to client.
// 2 = Rejected — format error, late quote, viewer busy, or race condition.
if status == enum.QuoteAckStatus_REJECTED {
slog.Error("handleQuoteAck: quote rejected by TW", "quoteReqID", quoteReqID, "quoteAckStatus", string(status), "text", text)
m.tradesMu.Lock()
if t, ok := m.trades[quoteReqID]; ok {
t.Status = domain.TradeStatusRejected
}
m.tradesMu.Unlock()
return
}
groups, ok := body[name].([]map[string]any)
if !ok || len(groups) == 0 {
return nil
}
return groups[0]
slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID)
}
// getString reads a string value from a body map, tolerating nil maps and missing keys.
func getString(body map[string]any, name string) string {
if body == nil {
return ""
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ).
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes.
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
quoteRespID, _ := msg.GetQuoteRespID()
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ")
isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND")
isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM")
isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND")
if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd {
slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID)
return
}
if v, ok := body[name].(string); ok {
return v
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future.
// Always send ACK regardless of whether the trade is in our map.
// TW will keep retrying until it receives an ACK.
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil {
slog.Error("handleQuoteResponse: failed to send ACK", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID, "error", ackErr.Error())
return
}
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
// Persist incoming QuoteResponse.
m.persistMessage(quoteReqID, parseQuoteResponse(msg))
// Persist outgoing ACK.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteRespID": quoteRespID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
}))
// _TRDSUMM is the final message — mark trade as completed.
if isTrdSumm {
slog.Info("Trade summary received, marking completed", "quoteReqID", quoteReqID)
m.tradesMu.Lock()
if t, ok := m.trades[quoteReqID]; ok {
t.Status = domain.TradeStatusCompleted
}
m.tradesMu.Unlock()
}
return ""
}
// getDecimal reads a decimal value from a body map. Numeric FIX types come through
// as strings (e.g. "10000"); INT-typed counts may be int. Both are accepted.
func getDecimal(body map[string]any, name string) decimal.Decimal {
if body == nil {
return decimal.Decimal{}
// handleExecutionReport handles an incoming ExecutionReport (35=8).
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does.
// In TW DPL, ClOrdID (Tag 11) always equals the original QuoteReqID (Tag 131),
// so we use clOrdID directly as the map lookup key.
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
clOrdID, _ := msg.GetClOrdID()
execType, _ := msg.GetExecType()
ordStatus, _ := msg.GetOrdStatus()
listID, _ := msg.GetListID()
slog.Info("handleExecutionReport received",
"execID", execID, "orderID", orderID, "clOrdID", clOrdID,
"execType", string(execType), "ordStatus", string(ordStatus), "listID", listID,
)
// Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW.
if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil {
slog.Error("handleExecutionReport: failed to send ExecutionAck", "execID", execID, "error", ackErr.Error())
} else {
slog.Info("ExecutionAck sent", "execID", execID)
}
switch v := body[name].(type) {
case string:
d, _ := decimal.NewFromString(v)
return d
case int:
return decimal.NewFromInt(int64(v))
switch {
case strings.Contains(execID, "_LISTEND"):
slog.Info("List ended (due-in closed), awaiting trade result from TW",
"execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDEND"):
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDSUMM"):
slog.Info("Trade summary received from TW, marking completed",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
m.tradesMu.Lock()
if t, ok := m.trades[clOrdID]; ok {
t.Status = domain.TradeStatusCompleted
}
m.tradesMu.Unlock()
case execType == enum.ExecType_TRADE:
slog.Info("Trade result received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
}
// Persist incoming ExecutionReport.
m.persistMessage(clOrdID, parseExecutionReport(msg))
// Persist outgoing ExecutionAck.
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{
"OrderID": orderID,
"ExecID": execID,
"ClOrdID": clOrdID,
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
}))
// If the ExecutionReport carries a TradeID, persist it in qfixdpl_logs for cross-service correlation.
tradeID, _ := msg.GetTradeID()
if tradeID != "" {
m.tradesMu.Lock()
if t, ok := m.trades[clOrdID]; ok {
t.TradeID = tradeID
}
m.tradesMu.Unlock()
if err := m.store.UpdateLogTradeID(clOrdID, tradeID); err != nil {
slog.Error("handleExecutionReport: failed to update log trade_id", "clOrdID", clOrdID, "tradeID", tradeID, "error", err)
}
}
return decimal.Decimal{}
}
func (m *Manager) anyActiveSessionID() quickfix.SessionID {
m.sessionsMu.RLock()
defer m.sessionsMu.RUnlock()
for _, s := range m.sessions {
return s
}
return quickfix.SessionID{}
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
// Logged in application.go, no further action needed.
}
// handleRawMessage is the single ingest path for every application-level FIX message
// (admin messages — Logon, Logout, Heartbeat, TestRequest, ResendRequest, SequenceReset —
// go through ToAdmin/FromAdmin and never reach this callback). It persists the raw
// envelope to the logs table, builds a structured Message and saves it to
// the messages table, and appends to the in-memory list.
// GetTrades returns a snapshot of only active trades.
func (m *Manager) GetTrades() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
trades := make([]domain.ListTrade, 0, len(m.trades))
for _, t := range m.trades {
if t.Status != domain.TradeStatusActive {
continue
}
trades = append(trades, toListTrade(t))
}
return trades
}
// GetAllTrades returns a snapshot of all trades (active, rejected, completed).
func (m *Manager) GetAllTrades() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
trades := make([]domain.ListTrade, 0, len(m.trades))
for _, t := range m.trades {
trades = append(trades, toListTrade(t))
}
return trades
}
func toListTrade(t *listTrade) domain.ListTrade {
return domain.ListTrade{
QuoteReqID: t.QuoteReqID,
TradeID: t.TradeID,
ListID: t.ListID,
Symbol: t.Symbol,
SecurityIDSrc: string(t.SecurityIDSrc),
Currency: t.Currency,
Side: string(t.Side),
OrderQty: t.OrderQty.String(),
SettlDate: t.SettlDate,
Price: t.Price.String(),
OwnerTraderID: t.OwnerTraderID,
Status: t.Status,
}
}
// handleRawMessage persists raw FIX message strings to the logs table.
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
quoteReqID := extractIdentifier(msg)
@ -553,122 +560,128 @@ func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
QuoteReqID: quoteReqID,
RawMsg: "[" + direction + "] " + msg.String(),
}); err != nil {
err = tracerr.Errorf("failed to persist raw log: %w", err)
slog.Error(err.Error())
slog.Error("failed to persist raw log", "error", err)
}
msgTypeBytes, _ := msg.Header.GetBytes(tag.MsgType)
msgType := string(msgTypeBytes)
senderCompID, msgSeqNum, sendingTime := extractHeaderMeta(msg)
fixJSON := buildFixMessageJSON(direction, msgType, quoteReqID, msg, m.dict)
stored := domain.Message{
ID: uuid.NewV4().String(),
SenderCompID: senderCompID,
MsgSeqNum: msgSeqNum,
SendingTime: sendingTime,
CreatedAt: time.Now(),
JMessage: fixJSON,
}
if err := m.store.SaveMessage(stored); err != nil {
err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", msgType, quoteReqID, err)
slog.Error(err.Error())
}
m.messagesMu.Lock()
m.messages = append(m.messages, stored)
m.messagesMu.Unlock()
}
// GetAllMessages returns today's FIX application messages with MsgSeqNum greater than
// the caller's last-seen sequence per direction (inSeq for IN, outSeq for OUT), sorted
// ascending by CreatedAt. Passing 0 for either cursor returns all messages on that side.
func (m *Manager) GetAllMessages(inSeq, outSeq int) []domain.Message {
m.messagesMu.RLock()
log.Info().Msgf("request received, inSeq: %d, outSeq: %d", inSeq, outSeq)
filtered := make([]domain.Message, 0, len(m.messages))
for _, msg := range m.messages {
switch msg.JMessage.Direction {
case "IN":
if msg.MsgSeqNum > inSeq {
filtered = append(filtered, msg)
}
case "OUT":
if msg.MsgSeqNum > outSeq {
filtered = append(filtered, msg)
}
}
// persistMessage saves a structured FIX message to the messages table.
func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSON) {
if err := m.store.SaveMessage(domain.TradeMessage{
QuoteReqID: quoteReqID,
JMessage: fixJSON,
}); err != nil {
slog.Error("failed to persist message", "msgType", fixJSON.MsgType, "quoteReqID", quoteReqID, "error", err)
}
m.messagesMu.RUnlock()
sort.Slice(filtered, func(i, j int) bool { return filtered[i].CreatedAt.Before(filtered[j].CreatedAt) })
log.Info().Msgf("messages sent: %d", len(filtered))
return filtered
}
// loadTodayMessages rebuilds the in-memory message list from today's rows in the DB.
// Must be called before the FIX initiator starts so live ingest doesn't race with replay.
func (m *Manager) loadTodayMessages() error {
// loadTrades reconstructs all trades and their states from today's messages in the database.
func (m *Manager) loadTrades() error {
messages, err := m.store.GetTodayMessages()
if err != nil {
return err
}
m.messagesMu.Lock()
m.messages = messages
m.messagesMu.Unlock()
slog.Info("today messages loaded", "count", len(messages))
return nil
}
// loadActiveTrades reconstructs active trades from today's messages in the database.
func (m *Manager) loadActiveTrades() error {
messages, err := m.store.GetTodayMessages()
if err != nil {
return err
}
activeTrades := make(map[string]*listTrade)
trades := make(map[string]*listTrade)
for _, msg := range messages {
quoteReqID := msg.JMessage.QuoteReqID
switch msg.JMessage.MsgType {
case "R": // QuoteRequest -> trade is born
if !strings.HasPrefix(quoteReqID, "LST_") {
if !strings.HasPrefix(msg.QuoteReqID, "LST_") {
continue
}
relSym := firstGroup(msg.JMessage.Body, "NoRelatedSym")
if getString(relSym, "NegotiationType") != "RFQ" {
continue
}
if getString(relSym, "ListID") == "" {
body := msg.JMessage.Body
nt, _ := body["NegotiationType"].(string)
if nt != "RFQ" {
continue
}
activeTrades[quoteReqID] = &listTrade{
QuoteRequest: msg.JMessage,
listID, _ := body["ListID"].(string)
if listID == "" {
continue
}
case "S": // Outgoing Quote — dealer has already quoted this trade
if t, ok := activeTrades[quoteReqID]; ok {
t.Quoted = true
t.Price = getDecimal(msg.JMessage.Body, "Price")
trade := &listTrade{
QuoteReqID: msg.QuoteReqID,
ListID: listID,
Status: domain.TradeStatusActive,
}
if v, ok := body["SecurityID"].(string); ok {
trade.Symbol = v
}
if v, ok := body["Currency"].(string); ok {
trade.Currency = v
}
if v, ok := body["Side"].(string); ok {
trade.Side = enum.Side(v)
}
if v, ok := body["OrderQty"].(string); ok {
trade.OrderQty, _ = decimal.NewFromString(v)
}
if v, ok := body["SettlDate"].(string); ok {
trade.SettlDate = v
}
if v, ok := body["OwnerTraderID"].(string); ok {
trade.OwnerTraderID = v
}
trades[msg.QuoteReqID] = trade
case "CW": // QuoteAck — only status "2" (Rejected) marks the trade as rejected
body := msg.JMessage.Body
quoteAckStatus, _ := body["QuoteAckStatus"].(string)
if quoteAckStatus == string(enum.QuoteAckStatus_REJECTED) {
if t, ok := trades[msg.QuoteReqID]; ok {
t.Status = domain.TradeStatusRejected
}
}
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
body := msg.JMessage.Body
quoteRespID, _ := body["QuoteRespID"].(string)
if strings.HasSuffix(quoteRespID, "_TRDSUMM") {
if t, ok := trades[msg.QuoteReqID]; ok {
t.Status = domain.TradeStatusCompleted
}
}
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)
body := msg.JMessage.Body
execID, _ := body["ExecID"].(string)
clOrdID, _ := body["ClOrdID"].(string)
if tid, ok := body["TradeID"].(string); ok && tid != "" {
if t, ok := trades[clOrdID]; ok {
t.TradeID = tid
}
}
if strings.Contains(execID, "_TRDSUMM") {
if t, ok := trades[clOrdID]; ok {
t.Status = domain.TradeStatusCompleted
}
}
}
}
m.trades = activeTrades
slog.Info("recovery completed", "activeTrades", len(activeTrades))
active := 0
for _, t := range trades {
if t.Status == domain.TradeStatusActive {
active++
}
}
m.trades = trades
slog.Info("recovery completed", "totalTrades", len(trades), "activeTrades", active)
return nil
}

View File

@ -0,0 +1,647 @@
package fix
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/domain"
)
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
// newTestManager builds a Manager with the given store without calling Start().
func newTestManager(store domain.PersistenceStore) *Manager {
notify := &MockNotifier{}
return NewManager(app.FIXConfig{}, store, notify)
}
// makeMsg builds a TradeMessage with the given quoteReqID, msgType, and body.
func makeMsg(quoteReqID, msgType string, body map[string]interface{}) domain.TradeMessage {
return domain.TradeMessage{
ID: "test-id-" + quoteReqID + "-" + msgType,
QuoteReqID: quoteReqID,
JMessage: domain.FixMessageJSON{
MsgType: msgType,
QuoteReqID: quoteReqID,
Body: body,
ReceiveTime: time.Now(),
},
CreatedAt: time.Now(),
}
}
// makeQuoteRequest builds a "R" (QuoteRequest) TradeMessage.
func makeQuoteRequest(quoteReqID, listID, nt string, extras map[string]interface{}) domain.TradeMessage {
body := map[string]interface{}{
"NegotiationType": nt,
"ListID": listID,
}
for k, v := range extras {
body[k] = v
}
return makeMsg(quoteReqID, "R", body)
}
// makeQuoteAck builds a "CW" (QuoteAck) TradeMessage.
func makeQuoteAck(quoteReqID, status string) domain.TradeMessage {
return makeMsg(quoteReqID, "CW", map[string]interface{}{
"QuoteAckStatus": status,
})
}
// makeQuoteResponse builds an "AJ" (QuoteResponse) TradeMessage.
func makeQuoteResponse(quoteReqID, quoteRespID string) domain.TradeMessage {
return makeMsg(quoteReqID, "AJ", map[string]interface{}{
"QuoteRespID": quoteRespID,
})
}
// makeExecutionReport builds an "8" (ExecutionReport) TradeMessage.
// clOrdID maps to body["ClOrdID"]; it is also set as TradeMessage.QuoteReqID
// to mirror how persistMessage works in handleExecutionReport.
func makeExecutionReport(clOrdID, execID string) domain.TradeMessage {
return makeMsg(clOrdID, "8", map[string]interface{}{
"ClOrdID": clOrdID,
"ExecID": execID,
})
}
// makeOutgoing builds an outgoing message (AI, S, BN) for a given quoteReqID.
func makeOutgoing(quoteReqID, msgType string) domain.TradeMessage {
return makeMsg(quoteReqID, msgType, map[string]interface{}{})
}
// ---------------------------------------------------------------------------
// Group 1 — DB vacia
// ---------------------------------------------------------------------------
func TestLoadTrades_EmptyDB(t *testing.T) {
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.NotNil(t, m.trades)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 2 — Interrupcion despues de QuoteRequest
// ---------------------------------------------------------------------------
func TestLoadTrades_SingleR_CreatesOneTrade(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", map[string]interface{}{
"SecurityID": "US1234567890",
"Currency": "USD",
"Side": "1",
"OrderQty": "1000000",
"SettlDate": "20260320",
"OwnerTraderID": "trader1",
})
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
require.Len(t, m.trades, 1)
trade := m.trades["LST_ABC123"]
require.NotNil(t, trade)
assert.Equal(t, "LST_ABC123", trade.QuoteReqID)
assert.Equal(t, "LIST_1", trade.ListID)
assert.Equal(t, "US1234567890", trade.Symbol)
assert.Equal(t, "USD", trade.Currency)
assert.Equal(t, "1", string(trade.Side))
assert.Equal(t, "20260320", trade.SettlDate)
assert.Equal(t, "trader1", trade.OwnerTraderID)
assert.Equal(t, domain.TradeStatusActive, trade.Status)
store.AssertExpectations(t)
}
func TestLoadTrades_R_WithOutgoingMsgs_IgnoresAI_S(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
ai := makeOutgoing("LST_ABC123", "AI")
s := makeOutgoing("LST_ABC123", "S")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, ai, s}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_R_NonLSTPrefix_Ignored(t *testing.T) {
r := makeQuoteRequest("TRD_ABC123", "LIST_1", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
func TestLoadTrades_R_NonRFQ_NegotiationType_Ignored(t *testing.T) {
r := makeQuoteRequest("LST_X", "LIST_1", "DEALER", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
func TestLoadTrades_R_EmptyListID_Ignored(t *testing.T) {
r := makeQuoteRequest("LST_X", "", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
func TestLoadTrades_R_MissingOptionalFields_TradeCreated(t *testing.T) {
r := makeQuoteRequest("LST_MIN", "LIST_MIN", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
require.Len(t, m.trades, 1)
trade := m.trades["LST_MIN"]
require.NotNil(t, trade)
assert.Equal(t, "LST_MIN", trade.QuoteReqID)
assert.Equal(t, "LIST_MIN", trade.ListID)
assert.Equal(t, "", trade.Symbol)
assert.Equal(t, "", trade.Currency)
assert.Equal(t, "", trade.SettlDate)
assert.Equal(t, "", trade.OwnerTraderID)
assert.Equal(t, domain.TradeStatusActive, trade.Status)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 3 — Interrupcion despues de QuoteAck
// ---------------------------------------------------------------------------
func TestLoadTrades_CW_Accepted_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1") // ACCEPTED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_CW_Rejected_TradeMarkedRejected(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "2") // REJECTED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusRejected, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
// QuoteAckStatus "0" is RECEIVED_NOT_YET_PROCESSED — not a rejection.
// Only status "2" (Rejected) should mark the trade as rejected.
func TestLoadTrades_CW_ReceivedNotYetProcessed_TradeStaysActive(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "0") // RECEIVED_NOT_YET_PROCESSED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_CW_WithoutPriorR_NoCrash(t *testing.T) {
cw := makeQuoteAck("LST_ORPHAN", "2")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{cw}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 4 — Interrupcion despues de QuoteResponse
// ---------------------------------------------------------------------------
func TestLoadTrades_AJ_TRDREQ_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1")
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDREQ")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_AJ_TRDEND_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1")
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDEND")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_AJ_LISTEND_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1")
aj := makeQuoteResponse("LST_Q1", "LST_Q1_LISTEND")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_AJ_TRDSUMM_TradeMarkedCompleted(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1")
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_AJ_TRDSUMM_WithoutPriorR_NoCrash(t *testing.T) {
aj := makeQuoteResponse("LST_ORPHAN", "LST_ORPHAN_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 5 — Interrupcion despues de ExecutionReport
// ---------------------------------------------------------------------------
func TestLoadTrades_ExecReport_LISTEND_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_ABC123", "1")
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
exec := makeExecutionReport("LST_ABC123", "LST_ABC123_LISTEND")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, exec}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_ExecReport_TRDEND_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_ABC123", "1")
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
execListEnd := makeExecutionReport("LST_ABC123", "LST_ABC123_LISTEND")
execTrdEnd := makeExecutionReport("LST_ABC123", "LST_ABC123_TRDEND")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, execListEnd, execTrdEnd}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_ExecReport_TRDSUMM_TradeMarkedCompleted(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_ABC123", "1")
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
exec := makeExecutionReport("LST_ABC123", "LST_ABC123_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, exec}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_ABC123"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_ExecReport_TRDSUMM_WithoutPriorR_NoCrash(t *testing.T) {
exec := makeExecutionReport("LST_ORPHAN", "LST_ORPHAN_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{exec}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 6 — Multiples trades en paralelo
// ---------------------------------------------------------------------------
func TestLoadTrades_TwoTrades_BothActive(t *testing.T) {
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
cw1 := makeQuoteAck("LST_TRADE1", "1")
cw2 := makeQuoteAck("LST_TRADE2", "1")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, cw1, cw2}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 2)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE1"].Status)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE2"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_TwoTrades_OneCompleted_OneActive(t *testing.T) {
// TRADE1: fully completed via flow 8.4 (_TRDSUMM execution report)
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
cw1 := makeQuoteAck("LST_TRADE1", "1")
aj1 := makeQuoteResponse("LST_TRADE1", "LST_TRADE1_TRDREQ")
exec1 := makeExecutionReport("LST_TRADE1", "LST_TRADE1_TRDSUMM")
// TRADE2: still active
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
cw2 := makeQuoteAck("LST_TRADE2", "1")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return(
[]domain.TradeMessage{r1, cw1, aj1, exec1, r2, cw2}, nil,
)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 2)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE1"].Status)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE2"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_TwoTrades_BothCompleted(t *testing.T) {
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
cw1 := makeQuoteAck("LST_TRADE1", "1")
exec1 := makeExecutionReport("LST_TRADE1", "LST_TRADE1_TRDSUMM")
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
cw2 := makeQuoteAck("LST_TRADE2", "1")
exec2 := makeExecutionReport("LST_TRADE2", "LST_TRADE2_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return(
[]domain.TradeMessage{r1, cw1, exec1, r2, cw2, exec2}, nil,
)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 2)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE1"].Status)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE2"].Status)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 7 — SessionID se asigna en onLogon (solo a trades activos)
// ---------------------------------------------------------------------------
func TestLoadTrades_SessionID_IsZeroAfterRecovery(t *testing.T) {
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
require.Len(t, m.trades, 1)
trade := m.trades["LST_X"]
require.NotNil(t, trade)
assert.Equal(t, quickfix.SessionID{}, trade.SessionID)
store.AssertExpectations(t)
}
func TestOnLogon_AssignsSessionToRecoveredActiveTrades(t *testing.T) {
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
require.NoError(t, m.loadTrades())
sessionID := quickfix.SessionID{
BeginString: "FIXT.1.1",
SenderCompID: "QFIXDPL",
TargetCompID: "TRADEWEB",
}
m.onLogon(sessionID)
trade := m.trades["LST_X"]
require.NotNil(t, trade)
assert.Equal(t, sessionID, trade.SessionID)
}
func TestOnLogon_DoesNotAssignSessionToCompletedTrades(t *testing.T) {
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
exec := makeExecutionReport("LST_X", "LST_X_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, exec}, nil)
m := newTestManager(store)
require.NoError(t, m.loadTrades())
sessionID := quickfix.SessionID{
BeginString: "FIXT.1.1",
SenderCompID: "QFIXDPL",
TargetCompID: "TRADEWEB",
}
m.onLogon(sessionID)
trade := m.trades["LST_X"]
require.NotNil(t, trade)
assert.Equal(t, quickfix.SessionID{}, trade.SessionID, "completed trades should not get session assigned")
}
// ---------------------------------------------------------------------------
// Group 8 — GetTrades filtra por active, GetAllTrades devuelve todos
// ---------------------------------------------------------------------------
func TestGetTrades_ReturnsOnlyActive(t *testing.T) {
r1 := makeQuoteRequest("LST_ACTIVE", "LIST_1", "RFQ", nil)
r2 := makeQuoteRequest("LST_DONE", "LIST_2", "RFQ", nil)
exec := makeExecutionReport("LST_DONE", "LST_DONE_TRDSUMM")
r3 := makeQuoteRequest("LST_REJ", "LIST_3", "RFQ", nil)
cw := makeQuoteAck("LST_REJ", "2") // REJECTED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, exec, r3, cw}, nil)
m := newTestManager(store)
require.NoError(t, m.loadTrades())
trades := m.GetTrades()
assert.Len(t, trades, 1)
assert.Equal(t, "LST_ACTIVE", trades[0].QuoteReqID)
assert.Equal(t, domain.TradeStatusActive, trades[0].Status)
}
func TestGetAllTrades_ReturnsAll(t *testing.T) {
r1 := makeQuoteRequest("LST_ACTIVE", "LIST_1", "RFQ", nil)
r2 := makeQuoteRequest("LST_DONE", "LIST_2", "RFQ", nil)
exec := makeExecutionReport("LST_DONE", "LST_DONE_TRDSUMM")
r3 := makeQuoteRequest("LST_REJ", "LIST_3", "RFQ", nil)
cw := makeQuoteAck("LST_REJ", "2") // REJECTED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, exec, r3, cw}, nil)
m := newTestManager(store)
require.NoError(t, m.loadTrades())
trades := m.GetAllTrades()
assert.Len(t, trades, 3)
statusMap := map[string]domain.TradeStatus{}
for _, tr := range trades {
statusMap[tr.QuoteReqID] = tr.Status
}
assert.Equal(t, domain.TradeStatusActive, statusMap["LST_ACTIVE"])
assert.Equal(t, domain.TradeStatusCompleted, statusMap["LST_DONE"])
assert.Equal(t, domain.TradeStatusRejected, statusMap["LST_REJ"])
}
// ---------------------------------------------------------------------------
// Group 9 — Error en store
// ---------------------------------------------------------------------------
func TestLoadTrades_StoreError_ReturnsError_MapEmpty(t *testing.T) {
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage(nil), errors.New("db connection failed"))
m := newTestManager(store)
err := m.loadTrades()
assert.Error(t, err)
assert.NotNil(t, m.trades)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}

View File

@ -0,0 +1,56 @@
package fix
import (
"sync"
"github.com/stretchr/testify/mock"
"quantex.com/qfixdpl/src/domain"
)
// MockPersistenceStore is a testify mock implementing domain.PersistenceStore.
type MockPersistenceStore struct {
mock.Mock
}
func (m *MockPersistenceStore) SaveMessage(msg domain.TradeMessage) error {
args := m.Called(msg)
return args.Error(0)
}
func (m *MockPersistenceStore) SaveLog(entry domain.LogEntry) error {
args := m.Called(entry)
return args.Error(0)
}
func (m *MockPersistenceStore) GetTodayMessages() ([]domain.TradeMessage, error) {
args := m.Called()
msgs, _ := args.Get(0).([]domain.TradeMessage)
return msgs, args.Error(1)
}
func (m *MockPersistenceStore) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
args := m.Called(quoteReqID)
logs, _ := args.Get(0).(domain.Logs)
return logs, args.Error(1)
}
func (m *MockPersistenceStore) UpdateLogTradeID(quoteReqID, tradeID string) error {
args := m.Called(quoteReqID, tradeID)
return args.Error(0)
}
func (m *MockPersistenceStore) GetFullTradeLog(quoteReqID string) (domain.FullTradeLog, error) {
args := m.Called(quoteReqID)
log, _ := args.Get(0).(domain.FullTradeLog)
return log, args.Error(1)
}
// MockNotifier is a testify mock implementing domain.Notifier.
type MockNotifier struct {
mock.Mock
}
func (m *MockNotifier) SendMsg(chat domain.MessageChannel, text string, status domain.MessageStatus, wg *sync.WaitGroup) {
m.Called(chat, text, status, wg)
}

View File

@ -4,47 +4,199 @@ import (
"time"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/datadictionary"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/tag"
"quantex.com/qfixdpl/src/domain"
)
// buildFixMessageJSON walks the full FIX message (header + body + trailer)
// using the data dictionary and returns a fully populated FixMessageJSON.
func buildFixMessageJSON(direction, msgType, quoteReqID string, msg *quickfix.Message, dd *datadictionary.DataDictionary) domain.FixMessageJSON {
var (
headerFields map[int]*datadictionary.FieldDef
trailerFields map[int]*datadictionary.FieldDef
bodyFields map[int]*datadictionary.FieldDef
)
func extractHeader(msg *quickfix.Message) map[string]interface{} {
header := make(map[string]interface{})
if dd != nil {
if dd.Header != nil {
headerFields = dd.Header.Fields
if v, err := msg.Header.GetBytes(tag.BeginString); err == nil {
header["BeginString"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.MsgType); err == nil {
header["MsgType"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.SenderCompID); err == nil {
header["SenderCompID"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.TargetCompID); err == nil {
header["TargetCompID"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.MsgSeqNum); err == nil {
header["MsgSeqNum"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.SendingTime); err == nil {
header["SendingTime"] = string(v)
}
return header
}
func parseQuoteRequest(msg quoterequest.QuoteRequest) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
body := map[string]interface{}{"QuoteReqID": quoteReqID}
if relSyms, err := msg.GetNoRelatedSym(); err == nil && relSyms.Len() > 0 {
sym := relSyms.Get(0)
if v, e := sym.GetSecurityID(); e == nil {
body["SecurityID"] = v
}
if dd.Trailer != nil {
trailerFields = dd.Trailer.Fields
if v, e := sym.GetSecurityIDSource(); e == nil {
body["SecurityIDSource"] = string(v)
}
if md, ok := dd.Messages[msgType]; ok {
bodyFields = md.Fields
if v, e := sym.GetCurrency(); e == nil {
body["Currency"] = v
}
if v, e := sym.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := sym.GetOrderQty(); e == nil {
body["OrderQty"] = v.String()
}
if v, e := sym.GetSettlDate(); e == nil {
body["SettlDate"] = v
}
if v, e := sym.GetListID(); e == nil {
body["ListID"] = v
}
if v, e := sym.GetOwnerTraderID(); e == nil {
body["OwnerTraderID"] = v
}
if v, e := sym.GetNegotiationType(); e == nil {
body["NegotiationType"] = v
}
}
return domain.FixMessageJSON{
Direction: direction,
MsgType: msgType,
Direction: "IN",
MsgType: "R",
QuoteReqID: quoteReqID,
Header: GetMap(BuildFieldMap(msg.Header.FieldMap, dd, headerFields)),
Body: GetMap(BuildFieldMap(msg.Body.FieldMap, dd, bodyFields)),
Trailer: GetMap(BuildFieldMap(msg.Trailer.FieldMap, dd, trailerFields)),
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseQuoteRequest(msg quoterequest.QuoteRequest, dd *datadictionary.DataDictionary) domain.FixMessageJSON {
func parseQuoteAck(msg quoteack.QuoteAck) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
return buildFixMessageJSON("IN", "R", quoteReqID, msg.Message, dd)
body := map[string]interface{}{"QuoteReqID": quoteReqID}
if v, e := msg.GetQuoteID(); e == nil {
body["QuoteID"] = v
}
if v, e := msg.GetQuoteAckStatus(); e == nil {
body["QuoteAckStatus"] = string(v)
}
if v, e := msg.GetText(); e == nil {
body["Text"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "CW",
QuoteReqID: quoteReqID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseQuoteResponse(msg quoteresponse.QuoteResponse) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
body := map[string]interface{}{"QuoteReqID": quoteReqID}
if v, e := msg.GetQuoteRespID(); e == nil {
body["QuoteRespID"] = v
}
if v, e := msg.GetQuoteRespType(); e == nil {
body["QuoteRespType"] = string(v)
}
if v, e := msg.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := msg.GetPrice(); e == nil {
body["Price"] = v.String()
}
if v, e := msg.GetOrderQty(); e == nil {
body["OrderQty"] = v.String()
}
if v, e := msg.GetClOrdID(); e == nil {
body["ClOrdID"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "AJ",
QuoteReqID: quoteReqID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseExecutionReport(msg executionreport.ExecutionReport) domain.FixMessageJSON {
clOrdID, _ := msg.GetClOrdID()
body := map[string]interface{}{"ClOrdID": clOrdID}
if v, e := msg.GetExecID(); e == nil {
body["ExecID"] = v
}
if v, e := msg.GetOrderID(); e == nil {
body["OrderID"] = v
}
if v, e := msg.GetExecType(); e == nil {
body["ExecType"] = string(v)
}
if v, e := msg.GetOrdStatus(); e == nil {
body["OrdStatus"] = string(v)
}
if v, e := msg.GetListID(); e == nil {
body["ListID"] = v
}
if v, e := msg.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := msg.GetSymbol(); e == nil {
body["Symbol"] = v
}
if v, e := msg.GetSecurityID(); e == nil {
body["SecurityID"] = v
}
if v, e := msg.GetCurrency(); e == nil {
body["Currency"] = v
}
if v, e := msg.GetPrice(); e == nil {
body["Price"] = v.String()
}
if v, e := msg.GetLastPx(); e == nil {
body["LastPx"] = v.String()
}
if v, e := msg.GetLastQty(); e == nil {
body["LastQty"] = v.String()
}
if v, e := msg.GetOrderQty(); e == nil {
body["OrderQty"] = v.String()
}
if v, e := msg.GetSettlDate(); e == nil {
body["SettlDate"] = v
}
if v, e := msg.GetTradeID(); e == nil {
body["TradeID"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "8",
QuoteReqID: clOrdID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
// extractIdentifier extracts the trade identifier from a parsed FIX message.
@ -69,17 +221,12 @@ func extractIdentifier(msg *quickfix.Message) string {
return ""
}
// extractHeaderMeta reads SenderCompID (49), MsgSeqNum (34) and SendingTime (52)
// from a quickfix.Message header. Returns zero values when a field is absent.
func extractHeaderMeta(msg *quickfix.Message) (senderCompID string, msgSeqNum int, sendingTime time.Time) {
if s, err := msg.Header.GetString(tag.SenderCompID); err == nil {
senderCompID = s
func buildOutgoingMessageJSON(msgType, quoteReqID string, body map[string]interface{}) domain.FixMessageJSON {
return domain.FixMessageJSON{
Direction: "OUT",
MsgType: msgType,
QuoteReqID: quoteReqID,
Body: body,
ReceiveTime: time.Now(),
}
if n, err := msg.Header.GetInt(tag.MsgSeqNum); err == nil {
msgSeqNum = n
}
if t, err := msg.Header.GetTime(tag.SendingTime); err == nil {
sendingTime = t
}
return
}

View File

@ -1,104 +0,0 @@
Step 1
Direction: ← QuoteRequest (R)
Comentary: Tradeweb sends trade information to dealer.
FIX Message: 8=FIXT.1.1 9=869 35=R 34=2 49=TRADEWEB 52=20160401-16:53:19.992 56=TW1_CORI_TEST_12345_DLRDPL 131=LST_20160401_TW1_CORI_NY302485.18_1 146=1 55=AMXLMM 2.375 09/08/16 48=02364WBC8 22=1 460=12 167=CORP 762=REGCORIPRC 541=20160908 225=20110908 470=MX 223=2.375 106=AMERICA MOVIL SAB DE CV 54=2 38=1000000 64=20160406 15=USD 6110=Comms 60=20160401-16:53:19 662=99.98046875 22570=0.76 663=1 699=912828UR9 761=1 423=6 44=-999999 5023=0.001000 66=NY302485.18 6847=1 75=20160401 464=Y 20086=1 20074=Y 20075=Y 20077=LatAm Comms 20078=pddealer 20079=60 20081=60 20090=60 20072=60 20098=60 5745=1 20073=RFQ 20076=Y 20156=N 20130=2000000000 20138=JPM,MER 20175=20120308 2115=0 22630=0 20265=LatAm 453=3 448=emack 447=C 452=3 802=3 523=Dev Test 803=2 523=NY 803=25 523=USA 803=4000 448=Tradeweb 447=C 452=1 802=1 523=Tradeweb0001 803=4002 448=DTCC 447=C 452=4 5114=2 5113=1 20169=A2 5113=0 20169=A- 10=121
Step 2
Direction: QuoteStatusReport (AI) →
Comentary: Dealer acknowledges trade.
FIX Message: 8=FIXT.1.1 9=198 35=AI 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=2 52=20160401-16:53:19.988 131=LST_20160401_TW1_CORI_NY302485.18_1 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:53:19.988 297=0 10=106
Step 3
Direction: Quote (S) →
Comentary: Dealer sends quote.
FIX Message: 8=FIXT.1.1 9=231 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=3 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 6153=emackdlr 44=.99 423=6 60=20160401-16:53:19.990 10=247
Step 4
Direction: ← QuoteAck (CW)
Comentary: Tradeweb acknowledges quote.
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=3 49=TRADEWEB 52=20160401-16:53:25.102 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:25 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=154
Step 5
Direction: Quote (S) →
Comentary: Dealer sends quote.
FIX Message: 8=FIXT.1.1 9=239 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=4 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 20087=5 6153=emackdlr 44=.97 423=6 60=20160401-16:53:19.990 10=114
Step 6
Direction: ← QuoteAck (CW)
Comentary: Tradeweb acknowledges quote.
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=4 49=TRADEWEB 52=20160401-16:53:30.055 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:30 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=154
Step 7
Direction: Quote (S) →
Comentary: Dealer sends quote.
FIX Message: 8=FIXT.1.1 9=239 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=6 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 20087=5 6153=emackdlr 44=.98 423=6 60=20160401-16:53:19.990 10=117
Step 8
Direction: ← QuoteAck (CW)
Commentary: Tradeweb acknowledges quote.
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=5 49=TRADEWEB 52=20160401-16:53:35.071 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:35 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=163
Step 9
Direction: ← ExecutionReport (8)
Commentary: Tradeweb notifies dealer that the list trading (Due In time) has ended. ExecType=A
FIX Message: 8=FIXT.1.1 9=289 35=8 34=7 49=TRADEWEB 52=20160401-16:54:19.463 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125419.463 37=LST_20160401_TW1_CORI_NY302485.18_1 39=D 55=[N/A] 60=20160401-16:54:19 75=20160401 150=I 151=0 20086=1 10=073
Step 10
Direction: ExecutionAck (BN) →
Commentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=255 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=8 52=20160401-16:54:19.448 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125419.448 55=[N/A] 60=20160401-16:54:19.448 10=119
Step 11
Direction: ← QuoteResponse (AJ)
Commentary: Customer lifts after good-for time expires. Tradeweb informs dealer customer lift.
FIX Message: 8=FIXT.1.1 9=431 35=AJ 34=10 49=TRADEWEB 52=20160401-16:55:32.855 56=TW1_CORI_TEST_12345_DLRDPL 11=LST_20160401_TW1_CORI_NY302485.18_1 22=1 38=1000000 44=0.98 48=02364WBC8 54=2 55=[N/A] 60=20160401-16:55:32 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 423=6 662=99.98046875 693=LST_20160401_TW1_CORI_NY302485.18_1_TRDREQ 694=1 2115=1 20074=Y 20075=N 20076=N 20079=60 20082=60 20156=N 22570=0.760289080299 22630=0 10=183
Step 12
Direction: QuoteStatusReport (AI) →
Commentary: Dealer acknowledges customers trade acceptance.
FIX Message: 8=FIXT.1.1 9=206 35=AI 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=12 52=20160401-16:55:32.849 693=LST_20160401_TW1_CORI_NY302485.18_1_TRDREQ 131=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:55:32.849 297=0 10=189
Step 13
Direction: ← ExecutionReport (8)
Commentary: Tradeweb notifies the dealer that list has ended
FIX Message: incoming8=FIXT.1.1 9=290 35=8 34=11 49=TRADEWEB 52=20160401-16:55:32.855 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125532.855 37=LST_20160401_TW1_CORI_NY302485.18_1 39=A 55=[N/A] 60=20160401-16:55:32 75=20160401 150=A 151=0 20086=1 10=095
Step 14
Direction: ExecutionAck (BN) →
Commentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=256 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=13 52=20160401-16:55:32.852 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125532.852 55=[N/A] 60=20160401-16:55:32.852 10=155
Step 15
Direction: ExecutionReport (8) →
Commentary: Dealer accepts re-quote is not allowed. ExecType=F, OrdStatus=Filled
FIX Message: 8=FIXT.1.1 9=283 35=8 34=14 49=TW1_CORI_TEST_12345_DLRDPL 52=20160401-16:55:32.850 56=TRADEWEB 6=0 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1 37=LST_20160401_TW1_CORI_NY302485.18_1 39=2 54=2 55=[N/A] 150=F 151=0 6153=LST_20160401_TW1_CORI_NY302485.18_1 22631=POSTTRADE_STRING 22632=POSTTRADE_STRING 10=155
Step 16
Direction: ← ExecutionAck (BN)
Commentary: Tradeweb acknowledges ExecutionReport
FIX Message: 8=FIXT.1.1 9=233 35=BN 34=12 49=TRADEWEB 52=20160401-16:55:37.917 56=TW1_CORI_TEST_12345_DLRDPL 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1 37=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:55:37 1036=1 10=051
Step 17
Direction: ← ExecutionReport (8)
Commentary: Tradeweb informs Dealer of outcome of a list trade item. OrdStatus=Filled, ExecType=Trade
FIX Message: 8=FIXT.1.1 9=337 35=8 34=13 49=TRADEWEB 52=20160401-16:55:37.917 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDEND-125537.917 37=LST_20160401_TW1_CORI_NY302485.18_1 39=2 44=0.98 54=2 55=[N/A] 60=20160401-16:55:37 75=20160401 150=F 151=0 423=6 662=99.98046875 22570=0.760289080299 10=067
Step 18
Direction: ExecutionAck (BN) →
Comentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=255 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=15 52=20160401-16:55:37.910 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDEND-125537.917 55=[N/A] 60=20160401-16:55:37.910 10=078
Step 19
Direction:
Comentary: Autospot at Tradeweb Treasury composite.
FIX Message:
Step 20
Direction: ← ExecutionReport (8)
Comentary: Tradeweb sends ExecutionReport message to confirm final outcome of list trade item including applicable cover quote, spot, settlement money information, etc. OrdStatus=Filled, ExecType=Order Status TradeSummary = Y
FIX Message: 8=FIXT.1.1 9=733 35=8 34=14 49=TRADEWEB 52=20160401-16:55:40.292 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDSUMM-125540.277 22=1 37=LST_20160401_TW1_CORI_NY302485.18_1 38=1000000 39=2 44=0.98 48=02364WBC8 54=2 55=[N/A] 60=20160401-16:55:40 64=20160406 75=20160401 150=F 151=0 167=CORP 236=1.74 423=6 453=2 448=emack 447=C 452=3 802=3 523=Dev Test 803=2 523=NY 803=25 523=USA 803=4000 448=Tradeweb 447=C 452=1 802=2 523=Tradeweb0001 803=4002 523=YES 803=4003 526=TRD_20160401_TW1_CORI_23 662=99.98046875 1003=20160401.TW1.CORI.23 6153=emackdlr 6731=20160401.TW1.CORI.23 20115=100.265 20250=225000 22570=0.76 22630=0 22631=POSTTRADE_STRING 22634=160401.DLRX.TRSY.120 22636=Y 10=239
Step 21
Direction: ExecutionAck (BN) →
Comentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=256 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=16 52=20160401-16:55:40.287 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDSUMM-125540.277 55=[N/A] 60=20160401-16:55:40.287 10=182

View File

@ -1,17 +1,20 @@
CREATE TABLE IF NOT EXISTS qfixdpl_messages (
id UUID PRIMARY KEY,
sender_comp_id TEXT NOT NULL,
msg_seq_num BIGINT NOT NULL,
j_message JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
quote_req_id TEXT NOT NULL,
trade_id TEXT,
j_message JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id);
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at);
CREATE INDEX IF NOT EXISTS idx_messages_session ON qfixdpl_messages(sender_comp_id, msg_seq_num);
CREATE INDEX IF NOT EXISTS idx_messages_trade_id ON qfixdpl_messages(trade_id);
CREATE TABLE IF NOT EXISTS qfixdpl_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
quote_req_id TEXT NOT NULL UNIQUE,
trade_id TEXT,
raw_msg TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dpl_logs_trade_id ON qfixdpl_logs(trade_id);

View File

@ -2,7 +2,6 @@ package store
import (
"encoding/json"
"strconv"
"strings"
"time"
@ -10,20 +9,15 @@ import (
"quantex.com/qfixdpl/src/domain"
)
func (p *Store) SaveMessage(msg domain.Message) error {
func (p *Store) SaveMessage(msg domain.TradeMessage) error {
jsonBytes, err := json.Marshal(msg.JMessage)
if err != nil {
return tracerr.Errorf("error marshaling j_message: %w", err)
}
_, err = p.db.Exec(
`INSERT INTO qfixdpl_messages (id, sender_comp_id, msg_seq_num, j_message, created_at)
VALUES ($1, $2, $3, $4, $5)`,
msg.ID,
msg.SenderCompID,
strconv.Itoa(msg.MsgSeqNum),
string(jsonBytes),
msg.CreatedAt.UTC().Format(time.RFC3339Nano),
"INSERT INTO qfixdpl_messages (quote_req_id, trade_id, j_message) VALUES ($1, NULLIF($2, ''), $3)",
msg.QuoteReqID, msg.TradeID, string(jsonBytes),
)
if err != nil {
return tracerr.Errorf("error inserting message: %w", err)
@ -47,29 +41,26 @@ func (p *Store) SaveLog(entry domain.LogEntry) error {
return nil
}
func (p *Store) GetTodayMessages() ([]domain.Message, error) {
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
rows, err := p.db.Query(
`SELECT id, sender_comp_id, msg_seq_num, j_message, created_at
FROM qfixdpl_messages
WHERE created_at >= current_date
ORDER BY created_at ASC`,
"SELECT id, quote_req_id, trade_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC",
)
if err != nil {
return nil, tracerr.Errorf("error querying today messages: %w", err)
}
defer rows.Close()
var messages []domain.Message
var messages []domain.TradeMessage
for rows.Next() {
var (
id, senderCompID string
msgSeqNum int
jMessageRaw []byte
createdAt time.Time
id, quoteReqID string
tradeID *string
jMessageRaw []byte
createdAt time.Time
)
if err := rows.Scan(&id, &senderCompID, &msgSeqNum, &jMessageRaw, &createdAt); err != nil {
if err := rows.Scan(&id, &quoteReqID, &tradeID, &jMessageRaw, &createdAt); err != nil {
return nil, tracerr.Errorf("error scanning message row: %w", err)
}
@ -78,23 +69,18 @@ func (p *Store) GetTodayMessages() ([]domain.Message, error) {
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
}
sendingTime, _ := jMessage.Header["SendingTime"].(time.Time)
if sendingTime.IsZero() {
if s, ok := jMessage.Header["SendingTime"].(string); ok {
if t, parseErr := time.Parse(time.RFC3339Nano, s); parseErr == nil {
sendingTime = t
}
}
msg := domain.TradeMessage{
ID: id,
QuoteReqID: quoteReqID,
JMessage: jMessage,
CreatedAt: createdAt,
}
messages = append(messages, domain.Message{
ID: id,
SenderCompID: senderCompID,
MsgSeqNum: msgSeqNum,
SendingTime: sendingTime,
CreatedAt: createdAt,
JMessage: jMessage,
})
if tradeID != nil {
msg.TradeID = *tradeID
}
messages = append(messages, msg)
}
if err := rows.Err(); err != nil {
@ -104,6 +90,69 @@ func (p *Store) GetTodayMessages() ([]domain.Message, error) {
return messages, nil
}
func (p *Store) UpdateLogTradeID(quoteReqID, tradeID string) error {
_, err := p.db.Exec(
"UPDATE qfixdpl_logs SET trade_id = $1, updated_at = NOW() WHERE quote_req_id = $2",
tradeID, quoteReqID,
)
if err != nil {
return tracerr.Errorf("error updating log trade_id: %w", err)
}
return nil
}
func (p *Store) GetFullTradeLog(quoteReqID string) (domain.FullTradeLog, error) {
rows, err := p.db.Query(
"SELECT trade_id, raw_msg FROM qfixdpl_logs WHERE quote_req_id = $1", quoteReqID,
)
if err != nil {
return domain.FullTradeLog{}, tracerr.Errorf("error querying dpl log: %w", err)
}
defer rows.Close()
if !rows.Next() {
return domain.FullTradeLog{}, tracerr.Errorf("no log found for quoteReqID: %s", quoteReqID)
}
var (
tradeID *string
rawMsg string
)
if err := rows.Scan(&tradeID, &rawMsg); err != nil {
return domain.FullTradeLog{}, tracerr.Errorf("error scanning dpl log row: %w", err)
}
result := domain.FullTradeLog{
QuoteReqID: quoteReqID,
DPLEntries: strings.Split(rawMsg, "\n"),
}
if tradeID != nil && *tradeID != "" {
result.TradeID = *tradeID
ptRows, err := p.db.Query(
"SELECT raw_msg FROM qfixpt_logs WHERE trade_id = $1", *tradeID,
)
if err != nil {
return domain.FullTradeLog{}, tracerr.Errorf("error querying pt logs: %w", err)
}
defer ptRows.Close()
if ptRows.Next() {
var ptRawMsg string
if err := ptRows.Scan(&ptRawMsg); err != nil {
return domain.FullTradeLog{}, tracerr.Errorf("error scanning pt log row: %w", err)
}
result.PTEntries = strings.Split(ptRawMsg, "\n")
}
}
return result, nil
}
func (p *Store) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
selectStmt := "SELECT raw_msg FROM qfixdpl_logs WHERE quote_req_id = '" + quoteReqID + "';"

View File

@ -51,7 +51,6 @@ func Runner(cfg app.Config) error {
External: cfg.External,
AuthorizedServices: cfg.AuthorizedServices,
EnableJWTAuth: cfg.EnableJWTAuth,
ServiceAPIKey: cfg.ServiceAPIKey,
}
api := rest.New(userData, appStore, fixManager, apiConfig, notify)

View File

@ -3,31 +3,48 @@ package domain
import "time"
// TradeStatus represents the lifecycle state of a List Trading trade.
type TradeStatus string
const (
TradeStatusActive TradeStatus = "active"
TradeStatusRejected TradeStatus = "rejected"
TradeStatusCompleted TradeStatus = "completed"
)
// ListTrade es la representacion exportada de un trade de List Trading.
type ListTrade struct {
QuoteRequest FixMessageJSON `json:"quote_request"`
QuoteReqID string `json:"quote_req_id"`
TradeID string `json:"trade_id,omitempty"`
ListID string `json:"list_id"`
Symbol string `json:"symbol"`
SecurityIDSrc string `json:"security_id_src"`
Currency string `json:"currency"`
Side string `json:"side"`
OrderQty string `json:"order_qty"`
SettlDate string `json:"settl_date"`
Price string `json:"price"`
OwnerTraderID string `json:"owner_trader_id"`
Status TradeStatus `json:"status"`
}
// FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento.
type FixMessageJSON struct {
Direction string `json:"direction"`
MsgType string `json:"msg_type"`
QuoteReqID string `json:"quote_req_id"`
Header map[string]any `json:"header"`
Body map[string]any `json:"body"`
Trailer map[string]any `json:"trailer"`
ReceiveTime time.Time `json:"receive_time"`
Direction string `json:"direction"`
MsgType string `json:"msg_type"`
QuoteReqID string `json:"quote_req_id"`
Header map[string]interface{} `json:"header"`
Body map[string]interface{} `json:"body"`
ReceiveTime time.Time `json:"receive_time"`
}
// Message es una fila de qfixdpl_messages, con la metadata del header FIX hoisted
// para que los consumidores puedan ordenar/filtrar sin parsear el JSON.
type Message struct {
ID string `json:"id"`
SenderCompID string `json:"sender_comp_id"`
MsgSeqNum int `json:"msg_seq_num"`
SendingTime time.Time `json:"sending_time"`
CreatedAt time.Time `json:"created_at"`
JMessage FixMessageJSON `json:"j_message"`
// TradeMessage es una fila de qfixdpl_messages.
type TradeMessage struct {
ID string `json:"id"`
QuoteReqID string `json:"quote_req_id"`
TradeID string `json:"trade_id,omitempty"`
JMessage FixMessageJSON `json:"j_message"`
CreatedAt time.Time `json:"created_at"`
}
// LogEntry es el DTO para insertar/actualizar un log crudo en qfixdpl_logs.
@ -41,10 +58,20 @@ type Logs struct {
Entries []string `json:"entries"`
}
// FullTradeLog es la respuesta del endpoint GET /trades/:id/full-log.
type FullTradeLog struct {
QuoteReqID string `json:"quote_req_id"`
TradeID string `json:"trade_id,omitempty"`
DPLEntries []string `json:"dpl_entries"`
PTEntries []string `json:"pt_entries,omitempty"`
}
// PersistenceStore define la interfaz de persistencia.
type PersistenceStore interface {
SaveMessage(msg Message) error
SaveMessage(msg TradeMessage) error
SaveLog(entry LogEntry) error
GetTodayMessages() ([]Message, error)
GetTodayMessages() ([]TradeMessage, error)
GetLogsByQuoteReqID(quoteReqID string) (Logs, error)
UpdateLogTradeID(quoteReqID, tradeID string) error
GetFullTradeLog(id string) (FullTradeLog, error)
}