11 Commits

Author SHA1 Message Date
7cc4a96a03 fix 2026-05-19 15:38:43 -03:00
1676909cbf fixes 2026-05-19 15:31:39 -03:00
d06433e0f5 add log 2026-05-19 15:15:54 -03:00
0f3ac0dd8d sending the messages by seqnum 2026-05-15 17:03:54 -03:00
4270284362 Add endpoint for all messages 2026-05-12 13:27:13 -03:00
6e46fde5d2 fixes 2026-05-11 14:27:27 -03:00
99c7f8ccb0 fixes 2026-05-11 12:34:55 -03:00
45fad9de6c generic json for fix messages 2026-05-07 17:37:17 -03:00
36b841fc66 pascal case 2026-05-06 14:11:50 -03:00
68238d309a adding endpoints 2026-05-06 11:56:12 -03:00
15a60bac92 handling errors 2026-05-05 15:34:01 -03:00
21 changed files with 1337 additions and 532 deletions

View File

@ -62,7 +62,10 @@ linux-build: check-env swag # Build a linux version for prod environment. Set e=
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e) env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
deploy: # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo; s=serverName; i=instance; e.g. make deploy e=dev s=nonprodFix i=dpl deploy: # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo; s=serverName; i=instance; e.g. make deploy e=dev s=nonprodFix i=dpl
make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/$(i)/ make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/dpl/
open-demo:
make deploy e=open-demo s=nonprodFix
fmt: download-versions # Apply the Go formatter to the code fmt: download-versions # Apply the Go formatter to the code
cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt); cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt);

5
docs/EndTrade.log Normal file
View File

@ -0,0 +1,5 @@
8=FIXT.1.1|9=861|35=R|34=413|49=TRADEWEB|52=20260508-16:51:56.674|56=BYMA_CORI_TEST_12345_DLRDPL|131=LST_20260508_BYMA_CORI_NY1600105.1_1|146=1|55=ARGENT 1.500 07/09/35|48=040114HT0|22=1|460=12|167=CORP|762=REGCORIINV|541=20350709|225=20200904|470=AR|223=1.5|106=ARGENTINE REPUBLIC|54=2|38=10000|64=20260511|15=USD|6110=Sov|60=20260508-16:51:56|423=1|44=-999999|236=-999999|5023=0.000010|66=NY1600105.1|6847=1|75=20260508|464=Y|20086=1|20074=Y|20075=Y|20077=[N/A]|20078=[N/A]|20079=300|20081=300|20090=0|20072=300|20098=0|5745=1|20073=RFQ|20076=Y|20156=Y|20130=20501720000|20138=HSB|561=1|562=1|20175=20210709|20223=American|22630=0|20265=LatAm|20227=Global|23068=TRWB|453=3|448=bymacust|447=C|452=3|802=3|523=BYMA CUST|803=2|523=NY|803=25|523=AR|803=4000|448=BYMA Customer|447=C|452=1|448=DTCC|447=C|452=4|454=1|455=US040114HT09|456=4|5114=2|5113=1|20169=Ca|5113=0|20169=CCC+|10=078
8=FIXT.1.1|9=203|35=AI|34=418|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:51:56.695|56=TRADEWEB|55=[N/A]|60=20260508-16:51:56.695|117=LST_20260508_BYMA_CORI_NY1600105.1_1|131=LST_20260508_BYMA_CORI_NY1600105.1_1|297=0|10=165
8=FIXT.1.1|9=292|35=AJ|34=414|49=TRADEWEB|52=20260508-16:52:05.343|56=BYMA_CORI_TEST_12345_DLRDPL|11=LST_20260508_BYMA_CORI_NY1600105.1_1|55=[N/A]|60=20260508-16:52:05|117=[N/A]|131=LST_20260508_BYMA_CORI_NY1600105.1_1|693=LST_20260508_BYMA_CORI_NY1600105.1_1_LISTEND|694=7|20086=1|20103=N/A|20110=NO|22636=Y|10=077
8=FIXT.1.1|9=211|35=AI|34=419|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:52:05.367|56=TRADEWEB|55=[N/A]|60=20260508-16:52:05.367|131=LST_20260508_BYMA_CORI_NY1600105.1_1|297=0|693=LST_20260508_BYMA_CORI_NY1600105.1_1_LISTEND|10=014

14
docs/Execution.log Normal file
View File

@ -0,0 +1,14 @@
8=FIXT.1.1|9=862|35=R|34=393|49=TRADEWEB|52=20260508-16:44:06.978|56=BYMA_CORI_TEST_12345_DLRDPL|131=LST_20260508_BYMA_CORI_NY1600095.1_1|146=1|55=ARGENT 1.500 07/09/35|48=040114HT0|22=1|460=12|167=CORP|762=REGCORIINV|541=20350709|225=20200904|470=AR|223=1.5|106=ARGENTINE REPUBLIC|54=2|38=850000|64=20260511|15=USD|6110=Sov|60=20260508-16:44:06|423=1|44=-999999|236=-999999|5023=0.000010|66=NY1600095.1|6847=1|75=20260508|464=Y|20086=1|20074=Y|20075=Y|20077=[N/A]|20078=[N/A]|20079=300|20081=300|20090=0|20072=300|20098=0|5745=1|20073=RFQ|20076=Y|20156=Y|20130=20501720000|20138=HSB|561=1|562=1|20175=20210709|20223=American|22630=0|20265=LatAm|20227=Global|23068=TRWB|453=3|448=bymacust|447=C|452=3|802=3|523=BYMA CUST|803=2|523=NY|803=25|523=AR|803=4000|448=BYMA Customer|447=C|452=1|448=DTCC|447=C|452=4|454=1|455=US040114HT09|456=4|5114=2|5113=1|20169=Ca|5113=0|20169=CCC+|10=163
8=FIXT.1.1|9=203|35=AI|34=398|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:44:07.018|56=TRADEWEB|55=[N/A]|60=20260508-16:44:07.018|117=LST_20260508_BYMA_CORI_NY1600095.1_1|131=LST_20260508_BYMA_CORI_NY1600095.1_1|297=0|10=162
8=FIXT.1.1|9=293|35=S|34=399|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:44:34.096|56=TRADEWEB|15=USD|22=1|38=850000|44=91.00000000|48=040114HT0|54=2|55=[N/A]|60=20260508-16:44:34.096|64=20260511|117=LST_20260508_BYMA_CORI_NY1600095.1_1|131=LST_20260508_BYMA_CORI_NY1600095.1_1|132=91.00000000|423=1|537=211|10=127
8=FIXT.1.1|9=191|35=CW|34=394|49=TRADEWEB|52=20260508-16:44:34.137|56=BYMA_CORI_TEST_12345_DLRDPL|60=20260508-16:44:34|117=LST_20260508_BYMA_CORI_NY1600095.1_1|131=LST_20260508_BYMA_CORI_NY1600095.1_1|1865=1|10=004
8=FIXT.1.1|9=80|35=0|34=400|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:45:04.125|56=TRADEWEB|10=169
8=FIXT.1.1|9=80|35=0|34=395|49=TRADEWEB|52=20260508-16:45:04.142|56=BYMA_CORI_TEST_12345_DLRDPL|10=181
8=FIXT.1.1|9=391|35=AJ|34=396|49=TRADEWEB|52=20260508-16:45:10.511|56=BYMA_CORI_TEST_12345_DLRDPL|11=LST_20260508_BYMA_CORI_NY1600095.1_1|22=1|38=850000|44=91|48=040114HT0|54=2|55=[N/A]|60=20260508-16:45:10|117=LST_20260508_BYMA_CORI_NY1600095.1_1|131=LST_20260508_BYMA_CORI_NY1600095.1_1|423=1|693=LST_20260508_BYMA_CORI_NY1600095.1_1_TRDREQ|694=1|20074=N|20075=N|20076=N|20079=236|20082=60|20156=N|22630=0|10=127
8=FIXT.1.1|9=295|35=8|34=397|49=TRADEWEB|52=20260508-16:45:10.514|56=BYMA_CORI_TEST_12345_DLRDPL|6=0|11=LST_20260508_BYMA_CORI_NY1600095.1_1|14=0|17=LST_20260508_BYMA_CORI_NY1600095.1_1_LISTEND-124510.514|37=LST_20260508_BYMA_CORI_NY1600095.1_1|39=A|55=[N/A]|60=20260508-16:45:10|75=20260508|150=A|151=0|20086=1|10=213
8=FIXT.1.1|9=210|35=AI|34=401|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:45:10.539|56=TRADEWEB|55=[N/A]|60=20260508-16:45:10.539|131=LST_20260508_BYMA_CORI_NY1600095.1_1|297=0|693=LST_20260508_BYMA_CORI_NY1600095.1_1_TRDREQ|10=209
8=FIXT.1.1|9=261|35=BN|34=402|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:45:10.601|56=TRADEWEB|11=LST_20260508_BYMA_CORI_NY1600095.1_1|17=LST_20260508_BYMA_CORI_NY1600095.1_1_LISTEND-124510.514|37=LST_20260508_BYMA_CORI_NY1600095.1_1|55=[N/A]|60=20260508-16:45:10.601|1036=1|10=232
8=FIXT.1.1|9=303|35=8|34=398|49=TRADEWEB|52=20260508-16:45:10.666|56=BYMA_CORI_TEST_12345_DLRDPL|6=0|11=LST_20260508_BYMA_CORI_NY1600095.1_1|14=0|17=LST_20260508_BYMA_CORI_NY1600095.1_1_TRDEND-124510.660|37=LST_20260508_BYMA_CORI_NY1600095.1_1|39=2|44=91|54=2|55=[N/A]|60=20260508-16:45:10|75=20260508|150=F|151=0|423=1|10=252
8=FIXT.1.1|9=260|35=BN|34=403|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:45:10.690|56=TRADEWEB|11=LST_20260508_BYMA_CORI_NY1600095.1_1|17=LST_20260508_BYMA_CORI_NY1600095.1_1_TRDEND-124510.660|37=LST_20260508_BYMA_CORI_NY1600095.1_1|55=[N/A]|60=20260508-16:45:10.690|1036=1|10=168
8=FIXT.1.1|9=729|35=8|34=399|49=TRADEWEB|52=20260508-16:45:10.738|56=BYMA_CORI_TEST_12345_DLRDPL|6=0|11=LST_20260508_BYMA_CORI_NY1600095.1_1|14=0|17=LST_20260508_BYMA_CORI_NY1600095.1_1_TRDSUMM-124510.736|22=1|37=LST_20260508_BYMA_CORI_NY1600095.1_1|38=850000|39=2|44=91|48=040114HT0|54=2|55=[N/A]|60=20260508-16:45:10.535|64=20260511|75=20260508|150=F|151=0|167=CORP|236=2.61081622|423=1|453=2|448=bymacust|447=C|452=3|802=3|523=BYMA CUST|803=2|523=NY|803=25|523=AR|803=4000|448=BYMA Customer|447=C|452=1|802=1|523=YES|803=4003|454=1|455=US040114HT09|456=4|526=TRD_20260508_BYMA_CORI_213|1003=20260508.BYMA.CORI.213|1907=1|1903=20260508BYMACORI213|1906=5|6731=20260508.BYMA.CORI.213|20115=91|22630=0|22636=Y|23068=TRWB|23096=20260508BYMACORI213|10=051
8=FIXT.1.1|9=261|35=BN|34=404|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:45:10.781|56=TRADEWEB|11=LST_20260508_BYMA_CORI_NY1600095.1_1|17=LST_20260508_BYMA_CORI_NY1600095.1_1_TRDSUMM-124510.736|37=LST_20260508_BYMA_CORI_NY1600095.1_1|55=[N/A]|60=20260508-16:45:10.781|1036=1|10=027

10
docs/Timeout.log Normal file
View File

@ -0,0 +1,10 @@
8=FIXT.1.1|9=858|35=R|34=416|49=TRADEWEB|52=20260508-16:52:50.205|56=BYMA_CORI_TEST_12345_DLRDPL|131=LST_20260508_BYMA_CORI_NY1600105.2_1|146=1|55=ARGENT 1.500 07/09/35|48=040114HT0|22=1|460=12|167=CORP|762=REGCORIINV|541=20350709|225=20200904|470=AR|223=1.5|106=ARGENTINE REPUBLIC|54=2|38=15000|64=20260511|15=USD|6110=Sov|60=20260508-16:52:50|423=1|44=-999999|236=-999999|5023=0.000010|66=NY1600105.2|6847=1|75=20260508|464=Y|20086=1|20074=Y|20075=Y|20077=[N/A]|20078=[N/A]|20079=60|20081=60|20090=0|20072=60|20098=0|5745=1|20073=RFQ|20076=Y|20156=Y|20130=20501720000|20138=HSB|561=1|562=1|20175=20210709|20223=American|22630=0|20265=LatAm|20227=Global|23068=TRWB|453=3|448=bymacust|447=C|452=3|802=3|523=BYMA CUST|803=2|523=NY|803=25|523=AR|803=4000|448=BYMA Customer|447=C|452=1|448=DTCC|447=C|452=4|454=1|455=US040114HT09|456=4|5114=2|5113=1|20169=Ca|5113=0|20169=CCC+|10=195
8=FIXT.1.1|9=203|35=AI|34=421|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:52:50.226|56=TRADEWEB|55=[N/A]|60=20260508-16:52:50.226|117=LST_20260508_BYMA_CORI_NY1600105.2_1|131=LST_20260508_BYMA_CORI_NY1600105.2_1|297=0|10=131
8=FIXT.1.1|9=80|35=0|34=422|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:53:20.289|56=TRADEWEB|10=181
8=FIXT.1.1|9=80|35=0|34=417|49=TRADEWEB|52=20260508-16:53:20.291|56=BYMA_CORI_TEST_12345_DLRDPL|10=178
8=FIXT.1.1|9=80|35=0|34=423|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:53:50.295|56=TRADEWEB|10=182
8=FIXT.1.1|9=80|35=0|34=418|49=TRADEWEB|52=20260508-16:53:50.305|56=BYMA_CORI_TEST_12345_DLRDPL|10=178
8=FIXT.1.1|9=265|35=AJ|34=419|49=TRADEWEB|52=20260508-16:53:50.698|56=BYMA_CORI_TEST_12345_DLRDPL|11=LST_20260508_BYMA_CORI_NY1600105.2_1|55=[N/A]|60=20260508-16:53:50|117=[N/A]|131=LST_20260508_BYMA_CORI_NY1600105.2_1|693=LST_20260508_BYMA_CORI_NY1600105.2_1_LISTEND|694=8|20086=1|10=010
8=FIXT.1.1|9=211|35=AI|34=424|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:53:50.721|56=TRADEWEB|55=[N/A]|60=20260508-16:53:50.721|131=LST_20260508_BYMA_CORI_NY1600105.2_1|297=0|693=LST_20260508_BYMA_CORI_NY1600105.2_1_LISTEND|10=002
8=FIXT.1.1|9=292|35=AJ|34=420|49=TRADEWEB|52=20260508-16:53:53.741|56=BYMA_CORI_TEST_12345_DLRDPL|11=LST_20260508_BYMA_CORI_NY1600105.2_1|55=[N/A]|60=20260508-16:53:53|117=[N/A]|131=LST_20260508_BYMA_CORI_NY1600105.2_1|693=LST_20260508_BYMA_CORI_NY1600105.2_1_LISTEND|694=8|20086=1|20103=N/A|20110=NO|22636=Y|10=088
8=FIXT.1.1|9=211|35=AI|34=425|49=BYMA_CORI_TEST_12345_DLRDPL|52=20260508-16:53:53.763|56=TRADEWEB|55=[N/A]|60=20260508-16:53:53.763|131=LST_20260508_BYMA_CORI_NY1600105.2_1|297=0|693=LST_20260508_BYMA_CORI_NY1600105.2_1_LISTEND|10=021

View File

@ -5,6 +5,10 @@ SenderCompID=QUANTEX
ResetOnLogon=Y ResetOnLogon=Y
FileStorePath=fix_store FileStorePath=fix_store
FileLogPath=fix_logs FileLogPath=fix_logs
TransportDataDictionary=spec/FIXT11.xml
AppDataDictionary=spec/FIX50SP2.xml
AllowUnknownMessageFields=Y
RejectInvalidMessage=N
[SESSION] [SESSION]
BeginString=FIXT.1.1 BeginString=FIXT.1.1

View File

@ -89,6 +89,24 @@ func (m FieldMap) Get(parser Field) MessageRejectError {
return m.GetField(parser.Tag(), parser) return m.GetField(parser.Tag(), parser)
} }
// RawValues returns a copy of the underlying TagValue slice for a tag.
// For repeating groups, the first entry is the count and the remaining
// entries are the flattened inner-field values across all repetitions.
func (m FieldMap) RawValues(tag Tag) []TagValue {
m.rwLock.RLock()
defer m.rwLock.RUnlock()
f, ok := m.tagLookup[tag]
if !ok {
return nil
}
out := make([]TagValue, len(f))
copy(out, f)
return out
}
// Has returns true if the Tag is present in this FieldMap. // Has returns true if the Tag is present in this FieldMap.
func (m FieldMap) Has(tag Tag) bool { func (m FieldMap) Has(tag Tag) bool {
m.rwLock.RLock() m.rwLock.RLock()

View File

@ -86,6 +86,16 @@ func (tv TagValue) String() string {
return string(tv.bytes) return string(tv.bytes)
} }
// Tag returns the FIX tag for this TagValue.
func (tv TagValue) Tag() Tag {
return tv.tag
}
// Value returns the raw FIX value bytes for this TagValue.
func (tv TagValue) Value() []byte {
return tv.value
}
func bytesTotal(bytes []byte) (total int) { func bytesTotal(bytes []byte) (total int) {
for _, b := range bytes { for _, b := range bytes {
total += int(b) total += int(b)

313
spec/FIXT11.xml Normal file
View File

@ -0,0 +1,313 @@
<?xml version='1.0' encoding='UTF-8'?>
<fix type='FIXT' major='1' minor='1' servicepack='0'>
<header>
<field name='BeginString' required='Y'/>
<field name='BodyLength' required='Y'/>
<field name='MsgType' required='Y'/>
<field name='SenderCompID' required='Y'/>
<field name='TargetCompID' required='Y'/>
<field name='OnBehalfOfCompID' required='N'/>
<field name='DeliverToCompID' required='N'/>
<field name='SecureDataLen' required='N'/>
<field name='SecureData' required='N'/>
<field name='MsgSeqNum' required='Y'/>
<field name='SenderSubID' required='N'/>
<field name='SenderLocationID' required='N'/>
<field name='TargetSubID' required='N'/>
<field name='TargetLocationID' required='N'/>
<field name='OnBehalfOfSubID' required='N'/>
<field name='OnBehalfOfLocationID' required='N'/>
<field name='DeliverToSubID' required='N'/>
<field name='DeliverToLocationID' required='N'/>
<field name='PossDupFlag' required='N'/>
<field name='PossResend' required='N'/>
<field name='SendingTime' required='Y'/>
<field name='OrigSendingTime' required='N'/>
<field name='XmlDataLen' required='N'/>
<field name='XmlData' required='N'/>
<field name='MessageEncoding' required='N'/>
<field name='LastMsgSeqNumProcessed' required='N'/>
<component name='HopGrp' required='N'/>
<field name='ApplVerID' required='N'/>
<field name='CstmApplVerID' required='N'/>
</header>
<trailer>
<field name='SignatureLength' required='N'/>
<field name='Signature' required='N'/>
<field name='CheckSum' required='Y'/>
</trailer>
<messages>
<message msgcat='admin' msgtype='0' name='Heartbeat'>
<field name='TestReqID' required='N'/>
</message>
<message msgcat='admin' msgtype='1' name='TestRequest'>
<field name='TestReqID' required='Y'/>
</message>
<message msgcat='admin' msgtype='2' name='ResendRequest'>
<field name='BeginSeqNo' required='Y'/>
<field name='EndSeqNo' required='Y'/>
</message>
<message msgcat='admin' msgtype='3' name='Reject'>
<field name='RefSeqNum' required='Y'/>
<field name='RefTagID' required='N'/>
<field name='RefMsgType' required='N'/>
<field name='SessionRejectReason' required='N'/>
<field name='Text' required='N'/>
<field name='EncodedTextLen' required='N'/>
<field name='EncodedText' required='N'/>
</message>
<message msgcat='admin' msgtype='4' name='SequenceReset'>
<field name='GapFillFlag' required='N'/>
<field name='NewSeqNo' required='Y'/>
</message>
<message msgcat='admin' msgtype='5' name='Logout'>
<field name='Text' required='N'/>
<field name='EncodedTextLen' required='N'/>
<field name='EncodedText' required='N'/>
</message>
<message msgcat='admin' msgtype='A' name='Logon'>
<field name='EncryptMethod' required='Y'/>
<field name='HeartBtInt' required='Y'/>
<field name='RawDataLength' required='N'/>
<field name='RawData' required='N'/>
<field name='ResetSeqNumFlag' required='N'/>
<field name='NextExpectedMsgSeqNum' required='N'/>
<field name='MaxMessageSize' required='N'/>
<field name='TestMessageIndicator' required='N'/>
<field name='Username' required='N'/>
<field name='Password' required='N'/>
<field name='DefaultApplVerID' required='Y'/>
<component name='MsgTypeGrp' required='N'/>
</message>
</messages>
<components>
<component name='HopGrp'>
<group name='NoHops' required='N'>
<field name='HopCompID' required='N'/>
<field name='HopSendingTime' required='N'/>
<field name='HopRefID' required='N'/>
</group>
</component>
<component name='MsgTypeGrp'>
<group name='NoMsgTypes' required='N'>
<field name='RefMsgType' required='N'/>
<field name='MsgDirection' required='N'/>
<field name='RefApplVerID' required='N'/>
<field name='RefCstmApplVerID' required='N'/>
</group>
</component>
</components>
<fields>
<field name='BeginSeqNo' number='7' type='SEQNUM'/>
<field name='BeginString' number='8' type='STRING'/>
<field name='BodyLength' number='9' type='LENGTH'/>
<field name='CheckSum' number='10' type='STRING'/>
<field name='EndSeqNo' number='16' type='SEQNUM'/>
<field name='MsgSeqNum' number='34' type='SEQNUM'/>
<field number='35' name='MsgType' type='STRING'>
<value enum='0' description='HEARTBEAT'/>
<value enum='1' description='TEST_REQUEST'/>
<value enum='2' description='RESEND_REQUEST'/>
<value enum='3' description='REJECT'/>
<value enum='4' description='SEQUENCE_RESET'/>
<value enum='5' description='LOGOUT'/>
<value enum='6' description='INDICATION_OF_INTEREST'/>
<value enum='7' description='ADVERTISEMENT'/>
<value enum='8' description='EXECUTION_REPORT'/>
<value enum='9' description='ORDER_CANCEL_REJECT'/>
<value enum='A' description='LOGON'/>
<value enum='B' description='NEWS'/>
<value enum='C' description='EMAIL'/>
<value enum='D' description='ORDER_SINGLE'/>
<value enum='E' description='ORDER_LIST'/>
<value enum='F' description='ORDER_CANCEL_REQUEST'/>
<value enum='G' description='ORDER_CANCEL_REPLACE_REQUEST'/>
<value enum='H' description='ORDER_STATUS_REQUEST'/>
<value enum='J' description='ALLOCATION_INSTRUCTION'/>
<value enum='K' description='LIST_CANCEL_REQUEST'/>
<value enum='L' description='LIST_EXECUTE'/>
<value enum='M' description='LIST_STATUS_REQUEST'/>
<value enum='N' description='LIST_STATUS'/>
<value enum='P' description='ALLOCATION_INSTRUCTION_ACK'/>
<value enum='Q' description='DONT_KNOW_TRADE'/>
<value enum='R' description='QUOTE_REQUEST'/>
<value enum='S' description='QUOTE'/>
<value enum='T' description='SETTLEMENT_INSTRUCTIONS'/>
<value enum='V' description='MARKET_DATA_REQUEST'/>
<value enum='W' description='MARKET_DATA_SNAPSHOT_FULL_REFRESH'/>
<value enum='X' description='MARKET_DATA_INCREMENTAL_REFRESH'/>
<value enum='Y' description='MARKET_DATA_REQUEST_REJECT'/>
<value enum='Z' description='QUOTE_CANCEL'/>
<value enum='a' description='QUOTE_STATUS_REQUEST'/>
<value enum='b' description='MASS_QUOTE_ACKNOWLEDGEMENT'/>
<value enum='c' description='SECURITY_DEFINITION_REQUEST'/>
<value enum='d' description='SECURITY_DEFINITION'/>
<value enum='e' description='SECURITY_STATUS_REQUEST'/>
<value enum='f' description='SECURITY_STATUS'/>
<value enum='g' description='TRADING_SESSION_STATUS_REQUEST'/>
<value enum='h' description='TRADING_SESSION_STATUS'/>
<value enum='i' description='MASS_QUOTE'/>
<value enum='j' description='BUSINESS_MESSAGE_REJECT'/>
<value enum='k' description='BID_REQUEST'/>
<value enum='l' description='BID_RESPONSE'/>
<value enum='m' description='LIST_STRIKE_PRICE'/>
<value enum='n' description='XML_MESSAGE'/>
<value enum='o' description='REGISTRATION_INSTRUCTIONS'/>
<value enum='p' description='REGISTRATION_INSTRUCTIONS_RESPONSE'/>
<value enum='q' description='ORDER_MASS_CANCEL_REQUEST'/>
<value enum='r' description='ORDER_MASS_CANCEL_REPORT'/>
<value enum='s' description='NEW_ORDER_CROSS'/>
<value enum='t' description='CROSS_ORDER_CANCEL_REPLACE_REQUEST'/>
<value enum='u' description='CROSS_ORDER_CANCEL_REQUEST'/>
<value enum='v' description='SECURITY_TYPE_REQUEST'/>
<value enum='w' description='SECURITY_TYPES'/>
<value enum='x' description='SECURITY_LIST_REQUEST'/>
<value enum='y' description='SECURITY_LIST'/>
<value enum='z' description='DERIVATIVE_SECURITY_LIST_REQUEST'/>
<value enum='AA' description='DERIVATIVE_SECURITY_LIST'/>
<value enum='AB' description='NEW_ORDER_MULTILEG'/>
<value enum='AC' description='MULTILEG_ORDER_CANCEL_REPLACE'/>
<value enum='AD' description='TRADE_CAPTURE_REPORT_REQUEST'/>
<value enum='AE' description='TRADE_CAPTURE_REPORT'/>
<value enum='AF' description='ORDER_MASS_STATUS_REQUEST'/>
<value enum='AG' description='QUOTE_REQUEST_REJECT'/>
<value enum='AH' description='RFQ_REQUEST'/>
<value enum='AI' description='QUOTE_STATUS_REPORT'/>
<value enum='AJ' description='QUOTE_RESPONSE'/>
<value enum='AK' description='CONFIRMATION'/>
<value enum='AL' description='POSITION_MAINTENANCE_REQUEST'/>
<value enum='AM' description='POSITION_MAINTENANCE_REPORT'/>
<value enum='AN' description='REQUEST_FOR_POSITIONS'/>
<value enum='AO' description='REQUEST_FOR_POSITIONS_ACK'/>
<value enum='AP' description='POSITION_REPORT'/>
<value enum='AQ' description='TRADE_CAPTURE_REPORT_REQUEST_ACK'/>
<value enum='AR' description='TRADE_CAPTURE_REPORT_ACK'/>
<value enum='AS' description='ALLOCATION_REPORT'/>
<value enum='AT' description='ALLOCATION_REPORT_ACK'/>
<value enum='AU' description='CONFIRMATION_ACK'/>
<value enum='AV' description='SETTLEMENT_INSTRUCTION_REQUEST'/>
<value enum='AW' description='ASSIGNMENT_REPORT'/>
<value enum='AX' description='COLLATERAL_REQUEST'/>
<value enum='AY' description='COLLATERAL_ASSIGNMENT'/>
<value enum='AZ' description='COLLATERAL_RESPONSE'/>
<value enum='BA' description='COLLATERAL_REPORT'/>
<value enum='BB' description='COLLATERAL_INQUIRY'/>
<value enum='BC' description='NETWORK_STATUS_REQUEST'/>
<value enum='BD' description='NETWORK_STATUS_RESPONSE'/>
<value enum='BE' description='USER_REQUEST'/>
<value enum='BF' description='USER_RESPONSE'/>
<value enum='BG' description='COLLATERAL_INQUIRY_ACK'/>
<value enum='BH' description='CONFIRMATION_REQUEST'/>
<value enum='BI' description='TRADING_SESSION_LIST_REQUEST'/>
<value enum='BJ' description='TRADING_SESSION_LIST'/>
<value enum='BK' description='SECURITY_LIST_UPDATE_REPORT'/>
<value enum='BL' description='ADJUSTED_POSITION_REPORT'/>
<value enum='BM' description='ALLOCATION_INSTRUCTION_ALERT'/>
<value enum='BN' description='EXECUTION_ACKNOWLEDGEMENT'/>
<value enum='BO' description='CONTRARY_INTENTION_REPORT'/>
<value enum='BP' description='SECURITY_DEFINITION_UPDATE_REPORT'/>
</field>
<field name='NewSeqNo' number='36' type='SEQNUM'/>
<field name='PossDupFlag' number='43' type='BOOLEAN'/>
<field name='RefSeqNum' number='45' type='SEQNUM'/>
<field name='SenderCompID' number='49' type='STRING'/>
<field name='SenderSubID' number='50' type='STRING'/>
<field name='SendingTime' number='52' type='UTCTIMESTAMP'/>
<field name='TargetCompID' number='56' type='STRING'/>
<field name='TargetSubID' number='57' type='STRING'/>
<field name='Text' number='58' type='STRING'/>
<field name='Signature' number='89' type='DATA'/>
<field name='SecureDataLen' number='90' type='LENGTH'/>
<field name='SecureData' number='91' type='DATA'/>
<field name='SignatureLength' number='93' type='LENGTH'/>
<field name='RawDataLength' number='95' type='LENGTH'/>
<field name='RawData' number='96' type='DATA'/>
<field name='PossResend' number='97' type='BOOLEAN'/>
<field name='EncryptMethod' number='98' type='INT'>
<value description='NONE_OTHER' enum='0'/>
<value description='PKCS' enum='1'/>
<value description='DES' enum='2'/>
<value description='PKCS_DES' enum='3'/>
<value description='PGP_DES' enum='4'/>
<value description='PGP_DES_MD5' enum='5'/>
<value description='PEM_DES_MD5' enum='6'/>
</field>
<field name='HeartBtInt' number='108' type='INT'/>
<field name='TestReqID' number='112' type='STRING'/>
<field name='OnBehalfOfCompID' number='115' type='STRING'/>
<field name='OnBehalfOfSubID' number='116' type='STRING'/>
<field name='OrigSendingTime' number='122' type='UTCTIMESTAMP'/>
<field name='GapFillFlag' number='123' type='BOOLEAN'/>
<field name='DeliverToCompID' number='128' type='STRING'/>
<field name='DeliverToSubID' number='129' type='STRING'/>
<field name='ResetSeqNumFlag' number='141' type='BOOLEAN'/>
<field name='SenderLocationID' number='142' type='STRING'/>
<field name='TargetLocationID' number='143' type='STRING'/>
<field name='OnBehalfOfLocationID' number='144' type='STRING'/>
<field name='DeliverToLocationID' number='145' type='STRING'/>
<field name='XmlDataLen' number='212' type='LENGTH'/>
<field name='XmlData' number='213' type='DATA'/>
<field number='347' name='MessageEncoding' type='STRING'>
<value enum='ISO-2022-JP' description='ISO_2022_JP'/>
<value enum='EUC-JP' description='EUC_JP'/>
<value enum='SHIFT_JIS' description='SHIFT_JIS'/>
<value enum='UTF-8' description='UTF_8'/>
</field>
<field name='EncodedTextLen' number='354' type='LENGTH'/>
<field name='EncodedText' number='355' type='DATA'/>
<field name='LastMsgSeqNumProcessed' number='369' type='SEQNUM'/>
<field name='RefTagID' number='371' type='INT'/>
<field name='RefMsgType' number='372' type='STRING'/>
<field name='SessionRejectReason' number='373' type='INT'>
<value description='INVALID_TAG_NUMBER' enum='0'/>
<value description='REQUIRED_TAG_MISSING' enum='1'/>
<value description='SENDINGTIME_ACCURACY_PROBLEM' enum='10'/>
<value description='INVALID_MSGTYPE' enum='11'/>
<value description='XML_VALIDATION_ERROR' enum='12'/>
<value description='TAG_APPEARS_MORE_THAN_ONCE' enum='13'/>
<value description='TAG_SPECIFIED_OUT_OF_REQUIRED_ORDER' enum='14'/>
<value description='REPEATING_GROUP_FIELDS_OUT_OF_ORDER' enum='15'/>
<value description='INCORRECT_NUMINGROUP_COUNT_FOR_REPEATING_GROUP' enum='16'/>
<value description='NON_DATA_VALUE_INCLUDES_FIELD_DELIMITER' enum='17'/>
<value description='TAG_NOT_DEFINED_FOR_THIS_MESSAGE_TYPE' enum='2'/>
<value description='UNDEFINED_TAG' enum='3'/>
<value description='TAG_SPECIFIED_WITHOUT_A_VALUE' enum='4'/>
<value description='VALUE_IS_INCORRECT' enum='5'/>
<value description='INCORRECT_DATA_FORMAT_FOR_VALUE' enum='6'/>
<value description='DECRYPTION_PROBLEM' enum='7'/>
<value description='SIGNATURE_PROBLEM' enum='8'/>
<value description='COMPID_PROBLEM' enum='9'/>
<value description='OTHER' enum='99'/>
</field>
<field name='MaxMessageSize' number='383' type='LENGTH'/>
<field name='NoMsgTypes' number='384' type='NUMINGROUP'/>
<field name='MsgDirection' number='385' type='CHAR'>
<value description='RECEIVE' enum='R'/>
<value description='SEND' enum='S'/>
</field>
<field name='TestMessageIndicator' number='464' type='BOOLEAN'/>
<field name='Username' number='553' type='STRING'/>
<field name='Password' number='554' type='STRING'/>
<field name='NoHops' number='627' type='NUMINGROUP'/>
<field name='HopCompID' number='628' type='STRING'/>
<field name='HopSendingTime' number='629' type='UTCTIMESTAMP'/>
<field name='HopRefID' number='630' type='SEQNUM'/>
<field name='NextExpectedMsgSeqNum' number='789' type='SEQNUM'/>
<field name='ApplVerID' number='1128' type='STRING'>
<value description='FIX27' enum='0'/>
<value description='FIX30' enum='1'/>
<value description='FIX40' enum='2'/>
<value description='FIX41' enum='3'/>
<value description='FIX42' enum='4'/>
<value description='FIX43' enum='5'/>
<value description='FIX44' enum='6'/>
<value description='FIX50' enum='7'/>
<value description='FIX50SP1' enum='8'/>
<value description='FIX50SP2' enum='9'/>
</field>
<field name='CstmApplVerID' number='1129' type='STRING'/>
<field name='RefApplVerID' number='1130' type='STRING'/>
<field name='RefCstmApplVerID' number='1131' type='STRING'/>
<field name='DefaultApplVerID' number='1137' type='STRING'/>
</fields>
</fix>

View File

@ -41,6 +41,7 @@ type Service struct {
type FIXConfig struct { type FIXConfig struct {
SettingsFile string // path to fix.cfg file SettingsFile string // path to fix.cfg file
DataDictionaryFile string // path to FIX data dictionary XML (e.g. spec/FIX50SP2.xml)
} }
type ExtAuth struct { type ExtAuth struct {

View File

@ -4,12 +4,14 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"strings"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/sasha-s/go-deadlock" "github.com/sasha-s/go-deadlock"
uuid "github.com/satori/go.uuid" uuid "github.com/satori/go.uuid"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/src/app" "quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/app/version" "quantex.com/qfixdpl/src/app/version"
@ -316,7 +318,8 @@ func (cont *Controller) GetLogs(ctx *gin.Context) {
logs, err := cont.store.GetLogsByQuoteReqID(quoteReqID) logs, err := cont.store.GetLogsByQuoteReqID(quoteReqID)
if err != nil { if err != nil {
slog.Error("GetLogs: error fetching logs", "quoteReqID", quoteReqID, "error", err) err = tracerr.Errorf("GetLogs: error fetching logs (quoteReqID=%s): %w", quoteReqID, err)
slog.Error(err.Error())
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"}) ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"})
return return
@ -325,3 +328,84 @@ func (cont *Controller) GetLogs(ctx *gin.Context) {
ctx.JSON(http.StatusOK, logs) ctx.JSON(http.StatusOK, logs)
} }
// AllMessages godoc
// @Summary List FIX application messages of the day after the caller's last-seen sequence
// @Description Returns today's FIX application messages (no admin: heartbeats/logon/logout/etc.) with MsgSeqNum greater than the caller's last-seen sequence per direction. "In" is the last MsgSeqNum the caller received on the IN side; "Out" is the same for OUT. Pass 0 to receive everything on that side. Sorted by CreatedAt ascending.
// @Tags fix
// @Accept json
// @Produce json
// @Param body body AllMessagesRequest true "API key and last-seen MsgSeqNum per direction"
// @Success 200 {array} domain.Message
// @Failure 400 {object} HTTPError
// @Failure 401 {object} HTTPError
// @Router /qfixdpl/v1/messages [post]
func (cont *Controller) AllMessages(ctx *gin.Context) {
setHeaders(ctx, cont.config)
var req AllMessagesRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return
}
if req.APIKey != "1234" {
ctx.JSON(http.StatusUnauthorized, HTTPError{Error: "Not allowed to perform this request"})
return
}
ctx.JSON(http.StatusOK, cont.tradeProvider.GetAllMessages(req.In, req.Out))
}
// GetPendingQuoteRequests godoc
// @Summary List pending QuoteRequests
// @Description Returns all QuoteRequests received from TW that have not been quoted yet by the dealer
// @Tags fix
// @Produce json
// @Success 200 {array} domain.ListTrade
// @Router /qfixdpl/v1/quote-requests [get]
func (cont *Controller) GetPendingQuoteRequests(ctx *gin.Context) {
pending := cont.tradeProvider.GetPendingQuoteRequests()
ctx.JSON(http.StatusOK, pending)
}
// SendQuote godoc
// @Summary Send a Quote for a pending QuoteRequest
// @Description Builds and sends a Quote (35=S) to TW for an existing QuoteRequest at the given price
// @Tags fix
// @Accept json
// @Produce json
// @Param body body SendQuoteRequest true "Quote to send"
// @Success 200 {object} Msg
// @Failure 400 {object} HTTPError
// @Failure 404 {object} HTTPError
// @Failure 409 {object} HTTPError
// @Failure 500 {object} HTTPError
// @Router /qfixdpl/v1/quotes [post]
func (cont *Controller) SendQuote(ctx *gin.Context) {
var req SendQuoteRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return
}
price, err := decimal.NewFromString(req.Price)
if err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: "invalid price: " + err.Error()})
return
}
if err := cont.tradeProvider.SendQuote(req.QuoteReqID, price); err != nil {
msg := err.Error()
switch {
case strings.Contains(msg, "not found"):
ctx.JSON(http.StatusNotFound, HTTPError{Error: "quoteReqID not found"})
case strings.Contains(msg, "already sent"):
ctx.JSON(http.StatusConflict, HTTPError{Error: "quote already sent for this quoteReqID"})
default:
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "failed to send quote"})
}
return
}
ctx.JSON(http.StatusOK, Msg{Text: "Quote sent"})
}

View File

@ -1,6 +1,5 @@
package rest package rest
type HTTPError struct { type HTTPError struct {
Error string Error string
} }
@ -18,3 +17,13 @@ type Session struct {
Email string Email string
} }
type SendQuoteRequest struct {
QuoteReqID string `json:"QuoteReqID" binding:"required"`
Price string `json:"Price" binding:"required" example:"99.6"`
}
type AllMessagesRequest struct {
APIKey string
In int
Out int
}

View File

@ -23,6 +23,11 @@ func SetRoutes(api *API) {
qfixdpl.GET("/health", cont.HealthCheck) qfixdpl.GET("/health", cont.HealthCheck)
qfixdpl.GET("/trades", cont.GetTrades) qfixdpl.GET("/trades", cont.GetTrades)
qfixdpl.GET("/trades/:quoteReqID/logs", cont.GetLogs) qfixdpl.GET("/trades/:quoteReqID/logs", cont.GetLogs)
qfixdpl.GET("/quote-requests", cont.GetPendingQuoteRequests)
qfixdpl.POST("/quotes", cont.SendQuote)
msgs := v1.Group("/")
msgs.POST("/messages", cont.AllMessages)
backoffice := qfixdpl.Group("/backoffice") backoffice := qfixdpl.Group("/backoffice")
backoffice.Use(cont.BackOfficeUser) backoffice.Use(cont.BackOfficeUser)

View File

@ -7,6 +7,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/src/app" "quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/app/version" "quantex.com/qfixdpl/src/app/version"
@ -19,6 +20,9 @@ import (
// TradeProvider exposes trade data from the FIX manager. // TradeProvider exposes trade data from the FIX manager.
type TradeProvider interface { type TradeProvider interface {
GetTrades() []domain.ListTrade GetTrades() []domain.ListTrade
GetPendingQuoteRequests() []domain.ListTrade
SendQuote(quoteReqID string, price decimal.Decimal) error
GetAllMessages(inSeq, outSeq int) []domain.Message
} }
const RedisMaxIdle = 3000 // In ms const RedisMaxIdle = 3000 // In ms

172
src/client/fix/builder.go Normal file
View File

@ -0,0 +1,172 @@
package fix
import (
"fmt"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/datadictionary"
)
// BuildFieldMap walks a quickfix.FieldMap and produces an enriched FieldMap
// keyed by FixField (Name + Tag + Type) with values typed per the FIX
// data dictionary. Repeating groups become []FieldMap; nested groups recurse.
func BuildFieldMap(qfMap quickfix.FieldMap, dd *datadictionary.DataDictionary, fieldsByTag map[int]*datadictionary.FieldDef) FieldMap {
out := FieldMap{}
for _, t := range qfMap.Tags() {
tagInt := int(t)
rawValues := qfMap.RawValues(t)
if len(rawValues) == 0 {
continue
}
ff, fd := resolveField(dd, fieldsByTag, tagInt)
if ff.Type == "NUMINGROUP" {
if fd != nil && fd.IsGroup() {
out[ff] = buildGroup(rawValues[1:], dd, fd.Fields)
} else {
// Count tag with no group structure available (dictionary lookup
// failed, or quickfix didn't nest the inner fields). Emit an empty
// group rather than an int so the consumer's type expectation holds.
out[ff] = []FieldMap{}
}
continue
}
out[ff] = parseScalar(rawValues[0].Value(), ff.Type)
}
return out
}
// buildGroup splits a flat slice of TagValues (the inner values of a NUMINGROUP,
// excluding the count) into per-repetition FieldMaps. The first element of
// innerFields is the delimiter that marks the start of each repetition.
func buildGroup(tvs []quickfix.TagValue, dd *datadictionary.DataDictionary, innerFields []*datadictionary.FieldDef) []FieldMap {
if len(innerFields) == 0 || len(tvs) == 0 {
return nil
}
delimiter := innerFields[0].Tag()
fdByTag := make(map[int]*datadictionary.FieldDef, len(innerFields))
for _, f := range innerFields {
fdByTag[f.Tag()] = f
}
var (
repetitions []FieldMap
current FieldMap
)
i := 0
for i < len(tvs) {
tv := tvs[i]
tagInt := int(tv.Tag())
if tagInt == delimiter {
current = FieldMap{}
repetitions = append(repetitions, current)
}
if current == nil {
// Field appeared before the first delimiter; skip.
i++
continue
}
fd, known := fdByTag[tagInt]
if !known {
current[FixField{Name: fmt.Sprintf("tag_%d", tagInt), Tag: tagInt, Type: "STRING"}] = string(tv.Value())
i++
continue
}
if fd.IsGroup() {
nestedAllowed := allowedTags(fd.Fields)
j := i + 1
for j < len(tvs) && nestedAllowed[int(tvs[j].Tag())] {
j++
}
nested := buildGroup(tvs[i+1:j], dd, fd.Fields)
current[FixField{Name: fd.Name(), Tag: tagInt, Type: "NUMINGROUP"}] = nested
i = j
continue
}
current[FixField{Name: fd.Name(), Tag: tagInt, Type: fd.Type}] = parseScalar(tv.Value(), fd.Type)
i++
}
return repetitions
}
// resolveField looks up the FixField metadata (name + type) for a tag.
// Prefers the message-level FieldDef so group structure is preserved;
// falls back to the global FieldTypeByTag, then to a synthetic "tag_<N>" STRING.
func resolveField(dd *datadictionary.DataDictionary, fieldsByTag map[int]*datadictionary.FieldDef, tagInt int) (FixField, *datadictionary.FieldDef) {
if fieldsByTag != nil {
if fd, ok := fieldsByTag[tagInt]; ok {
return FixField{Name: fd.Name(), Tag: tagInt, Type: fd.Type}, fd
}
}
if dd != nil {
if ft, ok := dd.FieldTypeByTag[tagInt]; ok {
return FixField{Name: ft.Name(), Tag: tagInt, Type: ft.Type}, nil
}
}
return FixField{Name: fmt.Sprintf("tag_%d", tagInt), Tag: tagInt, Type: "STRING"}, nil
}
// parseScalar converts raw FIX bytes into the Go type expected by the consumer
// (GetKeyValue): bool for BOOLEAN, int for INT/SEQNUM, time.Time for UTCTIMESTAMP,
// string for everything else.
func parseScalar(raw []byte, fixType string) interface{} {
s := string(raw)
switch fixType {
case "INT", "SEQNUM", "LENGTH":
var v quickfix.FIXInt
if err := v.Read(raw); err == nil {
return int(v)
}
return s
case "BOOLEAN":
var v quickfix.FIXBoolean
if err := v.Read(raw); err == nil {
return bool(v)
}
return s
case "UTCTIMESTAMP":
var v quickfix.FIXUTCTimestamp
if err := v.Read(raw); err == nil {
return v.Time
}
return s
default:
return s
}
}
// allowedTags returns the set of all tags valid inside a group (including
// nested-group counts and their descendants), used to detect where a flat
// nested-group slice ends within its parent.
func allowedTags(fields []*datadictionary.FieldDef) map[int]bool {
out := make(map[int]bool, len(fields))
var visit func(fs []*datadictionary.FieldDef)
visit = func(fs []*datadictionary.FieldDef) {
for _, f := range fs {
out[f.Tag()] = true
if f.IsGroup() {
visit(f.Fields)
}
}
}
visit(fields)
return out
}

106
src/client/fix/dict.go Normal file
View File

@ -0,0 +1,106 @@
package fix
import (
"log/slog"
"time"
"quantex.com/qfixdpl/src/common/tracerr"
)
// FixField identifies a single FIX field by its dictionary metadata.
type FixField struct {
Name string
Tag int
Type string
}
// FieldValue is the typed value for a FIX field.
type FieldValue interface{}
// FieldMap is the enriched representation of a FIX FieldMap (header, body, or trailer).
type FieldMap map[FixField]FieldValue
// GetMap converts a FieldMap into a JSON-friendly map keyed by field name.
func GetMap(fieldMap FieldMap) map[string]interface{} {
result := map[string]interface{}{}
for f, v := range fieldMap {
slog.Info("try to parse fieldMap: %+v, value: %+v", f, v)
k, val := GetKeyValue(f, v)
result[k] = val
}
return result
}
//nolint:funlen,gocyclo,cyclop //it's long but easy to read
func GetKeyValue(f FixField, value FieldValue) (key string, val interface{}) {
key = f.Name
switch f.Type {
case "NUMINGROUP":
var groups []map[string]interface{}
fMapLst, ok := value.([]FieldMap)
if !ok {
err := tracerr.Errorf("could not parse as []FieldMap, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
for _, fieldMap := range fMapLst {
groups = append(groups, GetMap(fieldMap))
}
val = groups
case "BOOLEAN":
b, ok := value.(bool)
if !ok {
err := tracerr.Errorf("could not parse as bool, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
val = b
case "INT", "LENGTH", "SEQNUM":
i, ok := value.(int)
if !ok {
err := tracerr.Errorf("could not parse as int, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
val = i
case "UTCTIMESTAMP":
t, ok := value.(time.Time)
if !ok {
err := tracerr.Errorf("could not parse as Time, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
val = t
case "STRING":
s, ok := value.(string)
if !ok {
err := tracerr.Errorf("could not parse as string, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
val = s
case "CHAR":
s, ok := value.(string)
if !ok {
err := tracerr.Errorf("could not parse as string, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
val = s
default:
s, ok := value.(string)
if !ok {
err := tracerr.Errorf("could not parse as string, value: %+v, key: %s", value, key)
slog.Error(err.Error())
}
val = s
}
return key, val
}

View File

@ -3,14 +3,17 @@ package fix
import ( import (
"log/slog" "log/slog"
"os" "os"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
uuid "github.com/satori/go.uuid"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
"quantex.com/qfixdpl/quickfix" "quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/datadictionary"
"quantex.com/qfixdpl/quickfix/gen/enum" "quantex.com/qfixdpl/quickfix/gen/enum"
"quantex.com/qfixdpl/quickfix/gen/field" "quantex.com/qfixdpl/quickfix/gen/field"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack"
@ -20,6 +23,7 @@ import (
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport"
"quantex.com/qfixdpl/quickfix/gen/tag"
filelog "quantex.com/qfixdpl/quickfix/log/file" filelog "quantex.com/qfixdpl/quickfix/log/file"
"quantex.com/qfixdpl/quickfix/store/file" "quantex.com/qfixdpl/quickfix/store/file"
"quantex.com/qfixdpl/src/app" "quantex.com/qfixdpl/src/app"
@ -28,17 +32,10 @@ import (
) )
type listTrade struct { type listTrade struct {
QuoteReqID string QuoteRequest domain.FixMessageJSON
ListID string
Symbol string
SecurityIDSrc enum.SecurityIDSource
Currency string
Side enum.Side
OrderQty decimal.Decimal
SettlDate string
Price decimal.Decimal
OwnerTraderID string
SessionID quickfix.SessionID SessionID quickfix.SessionID
Quoted bool
Price decimal.Decimal
} }
// Manager wraps the QuickFIX initiator and implements domain.FIXSender. // Manager wraps the QuickFIX initiator and implements domain.FIXSender.
@ -49,15 +46,19 @@ type Manager struct {
sessions map[string]quickfix.SessionID sessions map[string]quickfix.SessionID
tradesMu sync.RWMutex tradesMu sync.RWMutex
trades map[string]*listTrade trades map[string]*listTrade
messagesMu sync.RWMutex
messages []domain.Message
store domain.PersistenceStore store domain.PersistenceStore
notify domain.Notifier notify domain.Notifier
cfg app.FIXConfig cfg app.FIXConfig
dict *datadictionary.DataDictionary
} }
func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager { func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager {
return &Manager{ return &Manager{
sessions: make(map[string]quickfix.SessionID), sessions: make(map[string]quickfix.SessionID),
trades: make(map[string]*listTrade), trades: make(map[string]*listTrade),
messages: make([]domain.Message, 0),
store: store, store: store,
notify: notify, notify: notify,
cfg: cfg, cfg: cfg,
@ -76,14 +77,29 @@ func (m *Manager) Start() error {
fixApp.onRawMessage = m.handleRawMessage fixApp.onRawMessage = m.handleRawMessage
m.app = fixApp m.app = fixApp
dict, err := datadictionary.Parse(m.cfg.DataDictionaryFile)
if err != nil {
err = tracerr.Errorf("error parsing FIX data dictionary %q: %w", m.cfg.DataDictionaryFile, err)
slog.Error(err.Error())
return err
}
m.dict = dict
if err := m.loadActiveTrades(); err != nil { if err := m.loadActiveTrades(); err != nil {
slog.Error("failed to load active trades from DB, starting with empty state", "error", err) err = tracerr.Errorf("failed to load active trades from DB, starting with empty state: %w", err)
slog.Error(err.Error())
}
if err := m.loadTodayMessages(); err != nil {
err = tracerr.Errorf("failed to load today messages from DB, starting with empty list: %w", err)
slog.Error(err.Error())
} }
f, err := os.Open(m.cfg.SettingsFile) f, err := os.Open(m.cfg.SettingsFile)
if err != nil { if err != nil {
err = tracerr.Errorf("error opening FIX settings file %q: %s", m.cfg.SettingsFile, err) err = tracerr.Errorf("error opening FIX settings file %q: %w", m.cfg.SettingsFile, err)
log.Error().Msg(err.Error()) slog.Error(err.Error())
return err return err
} }
@ -91,8 +107,8 @@ func (m *Manager) Start() error {
settings, err := quickfix.ParseSettings(f) settings, err := quickfix.ParseSettings(f)
if err != nil { if err != nil {
err = tracerr.Errorf("error parsing FIX settings: %s", err) err = tracerr.Errorf("error parsing FIX settings: %w", err)
log.Error().Msg(err.Error()) slog.Error(err.Error())
return err return err
} }
@ -100,16 +116,16 @@ func (m *Manager) Start() error {
storeFactory := file.NewStoreFactory(settings) storeFactory := file.NewStoreFactory(settings)
logFactory, err := filelog.NewLogFactory(settings) logFactory, err := filelog.NewLogFactory(settings)
if err != nil { if err != nil {
err = tracerr.Errorf("error creating file log factory: %s", err) err = tracerr.Errorf("error creating file log factory: %w", err)
log.Error().Msg(err.Error()) slog.Error(err.Error())
return err return err
} }
initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory) initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory)
if err != nil { if err != nil {
err = tracerr.Errorf("error creating FIX initiator: %s", err) err = tracerr.Errorf("error creating FIX initiator: %w", err)
log.Error().Msg(err.Error()) slog.Error(err.Error())
return err return err
} }
@ -117,8 +133,8 @@ func (m *Manager) Start() error {
m.initiator = initiator m.initiator = initiator
if err = m.initiator.Start(); err != nil { if err = m.initiator.Start(); err != nil {
err = tracerr.Errorf("error starting FIX initiator: %s", err) err = tracerr.Errorf("error starting FIX initiator: %w", err)
log.Error().Msg(err.Error()) slog.Error(err.Error())
return err return err
} }
@ -190,64 +206,233 @@ func (m *Manager) sendExecutionAck(orderID, clOrdID, execID string, sessionID qu
return quickfix.SendToTarget(bn, sessionID) return quickfix.SendToTarget(bn, sessionID)
} }
// handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge) followed by a Quote at price 99.6. // handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge).
// The Quote (35=S) is sent later via the REST endpoint when the dealer prices it.
func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) { func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) {
quoteReqID, err := msg.GetQuoteReqID() parsed := parseQuoteRequest(msg, m.dict)
if err != nil { quoteReqID := parsed.QuoteReqID
slog.Error("handleQuoteRequest: missing QuoteReqID", "error", err.Error())
if quoteReqID == "" {
m.notify.SendMsg(
domain.MessageChannelError,
"quoteReqID missing in quote request",
domain.MessageStatusWarning,
nil,
)
err := tracerr.Errorf("handleQuoteRequest, missing QuoteReqID, quoteRequest: %+v", parsed)
slog.Error(err.Error())
return return
} }
// Validate LST_ prefix for List Trading flow. // Validate LST_ prefix for List Trading flow.
if !strings.HasPrefix(quoteReqID, "LST_") { if !strings.HasPrefix(quoteReqID, "LST_") {
m.notify.SendMsg(
domain.MessageChannelError,
"quoteReqID ("+quoteReqID+") missing LST_ prefix",
domain.MessageStatusWarning,
nil,
)
slog.Warn("handleQuoteRequest: QuoteReqID missing LST_ prefix, ignoring", "quoteReqID", quoteReqID) slog.Warn("handleQuoteRequest: QuoteReqID missing LST_ prefix, ignoring", "quoteReqID", quoteReqID)
return return
} }
var ( bodyKeys := make([]string, 0, len(parsed.Body))
symbol, currency, ownerTraderID, settlDate, listID, negotiationType string for k := range parsed.Body {
side enum.Side bodyKeys = append(bodyKeys, k)
secIDSource enum.SecurityIDSource
orderQty decimal.Decimal
)
relatedSyms, relErr := msg.GetNoRelatedSym()
if relErr == nil && relatedSyms.Len() > 0 {
sym := relatedSyms.Get(0)
symbol, _ = sym.GetSecurityID()
secIDSource, _ = sym.GetSecurityIDSource()
currency, _ = sym.GetCurrency()
side, _ = sym.GetSide()
ownerTraderID, _ = sym.GetOwnerTraderID()
orderQty, _ = sym.GetOrderQty()
settlDate, _ = sym.GetSettlDate()
listID, _ = sym.GetListID()
negotiationType, _ = sym.GetNegotiationType()
} }
slog.Info("handleQuoteRequest: parsed body keys", "quoteReqID", quoteReqID, "keys", bodyKeys)
if listID == "" { relSym := firstGroup(parsed.Body, "NoRelatedSym")
slog.Warn("handleQuoteRequest: missing ListID", "quoteReqID", quoteReqID) relSymKeys := make([]string, 0, len(relSym))
return for k := range relSym {
relSymKeys = append(relSymKeys, k)
} }
slog.Info("handleQuoteRequest: NoRelatedSym keys", "quoteReqID", quoteReqID, "keys", relSymKeys)
if negotiationType != "RFQ" { ownerTraderID := getString(relSym, "OwnerTraderID")
slog.Warn("handleQuoteRequest: unexpected NegotiationType", "quoteReqID", quoteReqID, "negotiationType", negotiationType)
return
}
// Step 1: Send QuoteStatusReport (35=AI) to acknowledge the inquiry. // Send QuoteStatusReport (35=AI) to acknowledge the inquiry.
if ackErr := m.sendQuoteStatusReport(quoteReqID, ownerTraderID, sessionID); ackErr != nil { if ackErr := m.sendQuoteStatusReport(quoteReqID, ownerTraderID, sessionID); ackErr != nil {
slog.Error("handleQuoteRequest: failed to send QuoteStatusReport", "quoteReqID", quoteReqID, "error", ackErr.Error()) ackErr = tracerr.Errorf("handleQuoteRequest: failed to send QuoteStatusReport (quoteReqID=%s): %w", quoteReqID, ackErr)
slog.Error(ackErr.Error())
return return
} }
slog.Info("QuoteStatusReport sent", "quoteReqID", quoteReqID) slog.Info("QuoteStatusReport sent", "quoteReqID", quoteReqID)
// Step 2: Build and send Quote (35=S) with price. // Store trade state as pending; Quote (35=S) is sent later via REST endpoint.
price := decimal.NewFromFloat(99.6) m.tradesMu.Lock()
m.trades[quoteReqID] = &listTrade{
QuoteRequest: parsed,
SessionID: sessionID,
Quoted: false,
}
m.tradesMu.Unlock()
}
sIDSource := enum.SecurityIDSource_ISIN_NUMBER // handleQuoteAck handles an incoming QuoteAck (35=CW).
if secIDSource == enum.SecurityIDSource_CUSIP { func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) {
sIDSource = enum.SecurityIDSource_CUSIP quoteReqID, _ := msg.GetQuoteReqID()
status, _ := msg.GetQuoteAckStatus()
text, _ := msg.GetText()
if status != enum.QuoteAckStatus_ACCEPTED {
err := tracerr.Errorf("handleQuoteAck: quote rejected by TW (quoteReqID=%s, quoteAckStatus=%s, text=%s)", quoteReqID, string(status), text)
slog.Error(err.Error())
return
}
slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID)
}
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ).
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes.
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
quoteRespID, _ := msg.GetQuoteRespID()
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ")
isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND")
isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM")
isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND")
if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd {
slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID)
return
}
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future.
// Always send ACK regardless of whether the trade is in our map.
// TW will keep retrying until it receives an ACK.
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil {
ackErr = tracerr.Errorf("handleQuoteResponse: failed to send ACK (quoteReqID=%s, quoteRespID=%s): %w", quoteReqID, quoteRespID, ackErr)
slog.Error(ackErr.Error())
return
}
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
// _TRDSUMM is the final message — clean up the trade.
if isTrdSumm {
slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID)
}
}
// handleExecutionReport handles an incoming ExecutionReport (35=8).
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does.
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
clOrdID, _ := msg.GetClOrdID()
execType, _ := msg.GetExecType()
ordStatus, _ := msg.GetOrdStatus()
listID, _ := msg.GetListID()
slog.Info("handleExecutionReport received",
"execID", execID, "orderID", orderID, "clOrdID", clOrdID,
"execType", string(execType), "ordStatus", string(ordStatus), "listID", listID,
)
// Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW.
if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil {
ackErr = tracerr.Errorf("handleExecutionReport: failed to send ExecutionAck (execID=%s): %w", execID, ackErr)
slog.Error(ackErr.Error())
} else {
slog.Info("ExecutionAck sent", "execID", execID)
}
switch {
case strings.Contains(execID, "_LISTEND"):
slog.Info("List ended (due-in closed), awaiting trade result from TW",
"execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDEND"):
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDSUMM"):
slog.Info("Trade summary received from TW, cleaning up",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
case execType == enum.ExecType_TRADE:
slog.Info("Trade result received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
}
}
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
// Logged in application.go, no further action needed.
}
// GetTrades returns a snapshot of all active trades.
func (m *Manager) GetTrades() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
trades := make([]domain.ListTrade, 0, len(m.trades))
for _, t := range m.trades {
trades = append(trades, toDomainListTrade(t))
}
return trades
}
// GetPendingQuoteRequests returns trades that have received a QuoteRequest but not yet been quoted by the dealer.
func (m *Manager) GetPendingQuoteRequests() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
pending := make([]domain.ListTrade, 0)
for _, t := range m.trades {
pending = append(pending, toDomainListTrade(t))
}
return pending
}
func toDomainListTrade(t *listTrade) domain.ListTrade {
out := domain.ListTrade{
QuoteRequest: t.QuoteRequest,
}
return out
}
// SendQuote builds and sends a Quote (35=S) for an existing pending QuoteRequest at the given price.
func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
m.tradesMu.Lock()
t, ok := m.trades[quoteReqID]
if !ok {
m.tradesMu.Unlock()
err := tracerr.Errorf("SendQuote: quoteReqID %s not found", quoteReqID)
slog.Error(err.Error())
return err
}
sessionID := t.SessionID
if sessionID == (quickfix.SessionID{}) {
sessionID = m.anyActiveSessionID()
if sessionID == (quickfix.SessionID{}) {
m.tradesMu.Unlock()
err := tracerr.Errorf("SendQuote: no active FIX session for quoteReqID %s", quoteReqID)
slog.Error(err.Error())
return err
}
}
symbol := getString(t.QuoteRequest.Body, "SecurityID")
sIDSource := enum.SecurityIDSource(getString(t.QuoteRequest.Body, "SecurityIDSource"))
currency := getString(t.QuoteRequest.Body, "Currency")
side := enum.Side(getString(t.QuoteRequest.Body, "Side"))
orderQty := getDecimal(t.QuoteRequest.Body, "OrderQty")
settlDate := getString(t.QuoteRequest.Body, "SettlDate")
ownerTraderID := getString(t.QuoteRequest.Body, "OwnerTraderID")
m.tradesMu.Unlock()
if sIDSource != enum.SecurityIDSource_CUSIP {
sIDSource = enum.SecurityIDSource_ISIN_NUMBER
} }
quoteID := quoteReqID quoteID := quoteReqID
@ -291,205 +476,76 @@ func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID qu
} }
if sendErr := quickfix.SendToTarget(q, sessionID); sendErr != nil { if sendErr := quickfix.SendToTarget(q, sessionID); sendErr != nil {
slog.Error("handleQuoteRequest: failed to send quote", "quoteReqID", quoteReqID, "error", sendErr.Error()) sendErr = tracerr.Errorf("SendQuote: failed to send quote (quoteReqID=%s): %w", quoteReqID, sendErr)
return slog.Error(sendErr.Error())
return sendErr
} }
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol)
// Store trade state for subsequent steps.
m.tradesMu.Lock() m.tradesMu.Lock()
m.trades[quoteReqID] = &listTrade{ if t, ok := m.trades[quoteReqID]; ok {
QuoteReqID: quoteReqID, t.Price = price
ListID: listID, t.Quoted = true
Symbol: symbol,
SecurityIDSrc: sIDSource,
Currency: currency,
Side: side,
OrderQty: orderQty,
SettlDate: settlDate,
Price: price,
OwnerTraderID: ownerTraderID,
SessionID: sessionID,
} }
m.tradesMu.Unlock() m.tradesMu.Unlock()
// Persist structured message (outside mutex). slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol, "price", price.String())
m.persistMessage(quoteReqID, parseQuoteRequest(msg))
// Persist outgoing QuoteStatusReport. return nil
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
"OwnerTraderID": ownerTraderID,
}))
// Persist outgoing Quote.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteID": quoteID,
"Symbol": symbol,
"Side": string(side),
"Price": price.String(),
"OrderQty": orderQty.String(),
"Currency": currency,
"SettlDate": settlDate,
}))
} }
// handleQuoteAck handles an incoming QuoteAck (35=CW). // firstGroup returns the first repetition of a NUMINGROUP field, or nil.
func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) { func firstGroup(body map[string]any, name string) map[string]any {
quoteReqID, _ := msg.GetQuoteReqID() if body == nil {
status, _ := msg.GetQuoteAckStatus() return nil
text, _ := msg.GetText() }
groups, ok := body[name].([]map[string]any)
m.persistMessage(quoteReqID, parseQuoteAck(msg)) if !ok || len(groups) == 0 {
return nil
if status != enum.QuoteAckStatus_ACCEPTED { }
slog.Error("handleQuoteAck: quote rejected by TW", "quoteReqID", quoteReqID, "quoteAckStatus", string(status), "text", text) return groups[0]
m.tradesMu.Lock()
delete(m.trades, quoteReqID)
m.tradesMu.Unlock()
return
} }
slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID) // getString reads a string value from a body map, tolerating nil maps and missing keys.
func getString(body map[string]any, name string) string {
if body == nil {
return ""
}
if v, ok := body[name].(string); ok {
return v
}
return ""
} }
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ). // getDecimal reads a decimal value from a body map. Numeric FIX types come through
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes. // as strings (e.g. "10000"); INT-typed counts may be int. Both are accepted.
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) { func getDecimal(body map[string]any, name string) decimal.Decimal {
quoteReqID, _ := msg.GetQuoteReqID() if body == nil {
quoteRespID, _ := msg.GetQuoteRespID() return decimal.Decimal{}
}
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID) switch v := body[name].(type) {
case string:
isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ") d, _ := decimal.NewFromString(v)
isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND") return d
isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM") case int:
isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND") return decimal.NewFromInt(int64(v))
}
if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd { return decimal.Decimal{}
slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID)
return
} }
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future. func (m *Manager) anyActiveSessionID() quickfix.SessionID {
m.sessionsMu.RLock()
// Always send ACK regardless of whether the trade is in our map. defer m.sessionsMu.RUnlock()
// TW will keep retrying until it receives an ACK. for _, s := range m.sessions {
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil { return s
slog.Error("handleQuoteResponse: failed to send ACK", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID, "error", ackErr.Error())
return
}
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
// Persist incoming QuoteResponse.
m.persistMessage(quoteReqID, parseQuoteResponse(msg))
// Persist outgoing ACK.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteRespID": quoteRespID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
}))
// _TRDSUMM is the final message — clean up the trade.
if isTrdSumm {
slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID)
m.tradesMu.Lock()
delete(m.trades, quoteReqID)
m.tradesMu.Unlock()
} }
return quickfix.SessionID{}
} }
// handleExecutionReport handles an incoming ExecutionReport (35=8). // handleRawMessage is the single ingest path for every application-level FIX message
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does. // (admin messages — Logon, Logout, Heartbeat, TestRequest, ResendRequest, SequenceReset —
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) { // go through ToAdmin/FromAdmin and never reach this callback). It persists the raw
execID, _ := msg.GetExecID() // envelope to the logs table, builds a structured Message and saves it to
orderID, _ := msg.GetOrderID() // the messages table, and appends to the in-memory list.
clOrdID, _ := msg.GetClOrdID()
execType, _ := msg.GetExecType()
ordStatus, _ := msg.GetOrdStatus()
listID, _ := msg.GetListID()
slog.Info("handleExecutionReport received",
"execID", execID, "orderID", orderID, "clOrdID", clOrdID,
"execType", string(execType), "ordStatus", string(ordStatus), "listID", listID,
)
// Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW.
if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil {
slog.Error("handleExecutionReport: failed to send ExecutionAck", "execID", execID, "error", ackErr.Error())
} else {
slog.Info("ExecutionAck sent", "execID", execID)
}
switch {
case strings.Contains(execID, "_LISTEND"):
slog.Info("List ended (due-in closed), awaiting trade result from TW",
"execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDEND"):
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDSUMM"):
slog.Info("Trade summary received from TW, cleaning up",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
m.tradesMu.Lock()
delete(m.trades, clOrdID)
m.tradesMu.Unlock()
case execType == enum.ExecType_TRADE:
slog.Info("Trade result received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
}
// Persist incoming ExecutionReport.
m.persistMessage(clOrdID, parseExecutionReport(msg))
// Persist outgoing ExecutionAck.
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{
"OrderID": orderID,
"ExecID": execID,
"ClOrdID": clOrdID,
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
}))
}
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
// Logged in application.go, no further action needed.
}
// GetTrades returns a snapshot of all active trades.
func (m *Manager) GetTrades() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
trades := make([]domain.ListTrade, 0, len(m.trades))
for _, t := range m.trades {
trades = append(trades, domain.ListTrade{
QuoteReqID: t.QuoteReqID,
ListID: t.ListID,
Symbol: t.Symbol,
SecurityIDSrc: string(t.SecurityIDSrc),
Currency: t.Currency,
Side: string(t.Side),
OrderQty: t.OrderQty.String(),
SettlDate: t.SettlDate,
Price: t.Price.String(),
OwnerTraderID: t.OwnerTraderID,
})
}
return trades
}
// handleRawMessage persists raw FIX message strings to the logs table.
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) { func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
quoteReqID := extractIdentifier(msg) quoteReqID := extractIdentifier(msg)
@ -497,18 +553,80 @@ func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
QuoteReqID: quoteReqID, QuoteReqID: quoteReqID,
RawMsg: "[" + direction + "] " + msg.String(), RawMsg: "[" + direction + "] " + msg.String(),
}); err != nil { }); err != nil {
slog.Error("failed to persist raw log", "error", err) err = tracerr.Errorf("failed to persist raw log: %w", err)
slog.Error(err.Error())
}
msgTypeBytes, _ := msg.Header.GetBytes(tag.MsgType)
msgType := string(msgTypeBytes)
senderCompID, msgSeqNum, sendingTime := extractHeaderMeta(msg)
fixJSON := buildFixMessageJSON(direction, msgType, quoteReqID, msg, m.dict)
stored := domain.Message{
ID: uuid.NewV4().String(),
SenderCompID: senderCompID,
MsgSeqNum: msgSeqNum,
SendingTime: sendingTime,
CreatedAt: time.Now(),
JMessage: fixJSON,
}
if err := m.store.SaveMessage(stored); err != nil {
err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", msgType, quoteReqID, err)
slog.Error(err.Error())
}
m.messagesMu.Lock()
m.messages = append(m.messages, stored)
m.messagesMu.Unlock()
}
// GetAllMessages returns today's FIX application messages with MsgSeqNum greater than
// the caller's last-seen sequence per direction (inSeq for IN, outSeq for OUT), sorted
// ascending by CreatedAt. Passing 0 for either cursor returns all messages on that side.
func (m *Manager) GetAllMessages(inSeq, outSeq int) []domain.Message {
m.messagesMu.RLock()
log.Info().Msgf("request received, inSeq: %d, outSeq: %d", inSeq, outSeq)
filtered := make([]domain.Message, 0, len(m.messages))
for _, msg := range m.messages {
switch msg.JMessage.Direction {
case "IN":
if msg.MsgSeqNum > inSeq {
filtered = append(filtered, msg)
}
case "OUT":
if msg.MsgSeqNum > outSeq {
filtered = append(filtered, msg)
}
} }
} }
// persistMessage saves a structured FIX message to the messages table. m.messagesMu.RUnlock()
func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSON) {
if err := m.store.SaveMessage(domain.TradeMessage{ sort.Slice(filtered, func(i, j int) bool { return filtered[i].CreatedAt.Before(filtered[j].CreatedAt) })
QuoteReqID: quoteReqID,
JMessage: fixJSON, log.Info().Msgf("messages sent: %d", len(filtered))
}); err != nil {
slog.Error("failed to persist message", "msgType", fixJSON.MsgType, "quoteReqID", quoteReqID, "error", err) return filtered
} }
// loadTodayMessages rebuilds the in-memory message list from today's rows in the DB.
// Must be called before the FIX initiator starts so live ingest doesn't race with replay.
func (m *Manager) loadTodayMessages() error {
messages, err := m.store.GetTodayMessages()
if err != nil {
return err
}
m.messagesMu.Lock()
m.messages = messages
m.messagesMu.Unlock()
slog.Info("today messages loaded", "count", len(messages))
return nil
} }
// loadActiveTrades reconstructs active trades from today's messages in the database. // loadActiveTrades reconstructs active trades from today's messages in the database.
@ -521,78 +639,30 @@ func (m *Manager) loadActiveTrades() error {
activeTrades := make(map[string]*listTrade) activeTrades := make(map[string]*listTrade)
for _, msg := range messages { for _, msg := range messages {
quoteReqID := msg.JMessage.QuoteReqID
switch msg.JMessage.MsgType { switch msg.JMessage.MsgType {
case "R": // QuoteRequest -> trade is born case "R": // QuoteRequest -> trade is born
if !strings.HasPrefix(msg.QuoteReqID, "LST_") { if !strings.HasPrefix(quoteReqID, "LST_") {
continue continue
} }
body := msg.JMessage.Body relSym := firstGroup(msg.JMessage.Body, "NoRelatedSym")
if getString(relSym, "NegotiationType") != "RFQ" {
nt, _ := body["NegotiationType"].(string) continue
if nt != "RFQ" { }
if getString(relSym, "ListID") == "" {
continue continue
} }
listID, _ := body["ListID"].(string) activeTrades[quoteReqID] = &listTrade{
if listID == "" { QuoteRequest: msg.JMessage,
continue
} }
trade := &listTrade{ case "S": // Outgoing Quote — dealer has already quoted this trade
QuoteReqID: msg.QuoteReqID, if t, ok := activeTrades[quoteReqID]; ok {
ListID: listID, t.Quoted = true
} t.Price = getDecimal(msg.JMessage.Body, "Price")
if v, ok := body["SecurityID"].(string); ok {
trade.Symbol = v
}
if v, ok := body["Currency"].(string); ok {
trade.Currency = v
}
if v, ok := body["Side"].(string); ok {
trade.Side = enum.Side(v)
}
if v, ok := body["OrderQty"].(string); ok {
trade.OrderQty, _ = decimal.NewFromString(v)
}
if v, ok := body["SettlDate"].(string); ok {
trade.SettlDate = v
}
if v, ok := body["OwnerTraderID"].(string); ok {
trade.OwnerTraderID = v
}
activeTrades[msg.QuoteReqID] = trade
case "CW": // QuoteAck — if rejected, trade is dead
body := msg.JMessage.Body
quoteAckStatus, _ := body["QuoteAckStatus"].(string)
if quoteAckStatus != string(enum.QuoteAckStatus_ACCEPTED) {
delete(activeTrades, msg.QuoteReqID)
}
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
body := msg.JMessage.Body
quoteRespID, _ := body["QuoteRespID"].(string)
if strings.HasSuffix(quoteRespID, "_TRDSUMM") {
delete(activeTrades, msg.QuoteReqID)
}
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)
body := msg.JMessage.Body
execID, _ := body["ExecID"].(string)
clOrdID, _ := body["ClOrdID"].(string)
if strings.Contains(execID, "_TRDSUMM") {
delete(activeTrades, clOrdID)
} }
} }
} }

View File

@ -4,196 +4,47 @@ import (
"time" "time"
"quantex.com/qfixdpl/quickfix" "quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport" "quantex.com/qfixdpl/quickfix/datadictionary"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/tag" "quantex.com/qfixdpl/quickfix/gen/tag"
"quantex.com/qfixdpl/src/domain" "quantex.com/qfixdpl/src/domain"
) )
func extractHeader(msg *quickfix.Message) map[string]interface{} { // buildFixMessageJSON walks the full FIX message (header + body + trailer)
header := make(map[string]interface{}) // using the data dictionary and returns a fully populated FixMessageJSON.
func buildFixMessageJSON(direction, msgType, quoteReqID string, msg *quickfix.Message, dd *datadictionary.DataDictionary) domain.FixMessageJSON {
var (
headerFields map[int]*datadictionary.FieldDef
trailerFields map[int]*datadictionary.FieldDef
bodyFields map[int]*datadictionary.FieldDef
)
if v, err := msg.Header.GetBytes(tag.BeginString); err == nil { if dd != nil {
header["BeginString"] = string(v) if dd.Header != nil {
headerFields = dd.Header.Fields
} }
if v, err := msg.Header.GetBytes(tag.MsgType); err == nil { if dd.Trailer != nil {
header["MsgType"] = string(v) trailerFields = dd.Trailer.Fields
} }
if v, err := msg.Header.GetBytes(tag.SenderCompID); err == nil { if md, ok := dd.Messages[msgType]; ok {
header["SenderCompID"] = string(v) bodyFields = md.Fields
}
if v, err := msg.Header.GetBytes(tag.TargetCompID); err == nil {
header["TargetCompID"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.MsgSeqNum); err == nil {
header["MsgSeqNum"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.SendingTime); err == nil {
header["SendingTime"] = string(v)
}
return header
}
func parseQuoteRequest(msg quoterequest.QuoteRequest) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
body := map[string]interface{}{"QuoteReqID": quoteReqID}
if relSyms, err := msg.GetNoRelatedSym(); err == nil && relSyms.Len() > 0 {
sym := relSyms.Get(0)
if v, e := sym.GetSecurityID(); e == nil {
body["SecurityID"] = v
}
if v, e := sym.GetSecurityIDSource(); e == nil {
body["SecurityIDSource"] = string(v)
}
if v, e := sym.GetCurrency(); e == nil {
body["Currency"] = v
}
if v, e := sym.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := sym.GetOrderQty(); e == nil {
body["OrderQty"] = v.String()
}
if v, e := sym.GetSettlDate(); e == nil {
body["SettlDate"] = v
}
if v, e := sym.GetListID(); e == nil {
body["ListID"] = v
}
if v, e := sym.GetOwnerTraderID(); e == nil {
body["OwnerTraderID"] = v
}
if v, e := sym.GetNegotiationType(); e == nil {
body["NegotiationType"] = v
} }
} }
return domain.FixMessageJSON{ return domain.FixMessageJSON{
Direction: "IN", Direction: direction,
MsgType: "R", MsgType: msgType,
QuoteReqID: quoteReqID, QuoteReqID: quoteReqID,
Header: extractHeader(msg.Message), Header: GetMap(BuildFieldMap(msg.Header.FieldMap, dd, headerFields)),
Body: body, Body: GetMap(BuildFieldMap(msg.Body.FieldMap, dd, bodyFields)),
Trailer: GetMap(BuildFieldMap(msg.Trailer.FieldMap, dd, trailerFields)),
ReceiveTime: time.Now(), ReceiveTime: time.Now(),
} }
} }
func parseQuoteAck(msg quoteack.QuoteAck) domain.FixMessageJSON { func parseQuoteRequest(msg quoterequest.QuoteRequest, dd *datadictionary.DataDictionary) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID() quoteReqID, _ := msg.GetQuoteReqID()
body := map[string]interface{}{"QuoteReqID": quoteReqID} return buildFixMessageJSON("IN", "R", quoteReqID, msg.Message, dd)
if v, e := msg.GetQuoteID(); e == nil {
body["QuoteID"] = v
}
if v, e := msg.GetQuoteAckStatus(); e == nil {
body["QuoteAckStatus"] = string(v)
}
if v, e := msg.GetText(); e == nil {
body["Text"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "CW",
QuoteReqID: quoteReqID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseQuoteResponse(msg quoteresponse.QuoteResponse) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
body := map[string]interface{}{"QuoteReqID": quoteReqID}
if v, e := msg.GetQuoteRespID(); e == nil {
body["QuoteRespID"] = v
}
if v, e := msg.GetQuoteRespType(); e == nil {
body["QuoteRespType"] = string(v)
}
if v, e := msg.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := msg.GetPrice(); e == nil {
body["Price"] = v.String()
}
if v, e := msg.GetOrderQty(); e == nil {
body["OrderQty"] = v.String()
}
if v, e := msg.GetClOrdID(); e == nil {
body["ClOrdID"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "AJ",
QuoteReqID: quoteReqID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseExecutionReport(msg executionreport.ExecutionReport) domain.FixMessageJSON {
clOrdID, _ := msg.GetClOrdID()
body := map[string]interface{}{"ClOrdID": clOrdID}
if v, e := msg.GetExecID(); e == nil {
body["ExecID"] = v
}
if v, e := msg.GetOrderID(); e == nil {
body["OrderID"] = v
}
if v, e := msg.GetExecType(); e == nil {
body["ExecType"] = string(v)
}
if v, e := msg.GetOrdStatus(); e == nil {
body["OrdStatus"] = string(v)
}
if v, e := msg.GetListID(); e == nil {
body["ListID"] = v
}
if v, e := msg.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := msg.GetSymbol(); e == nil {
body["Symbol"] = v
}
if v, e := msg.GetSecurityID(); e == nil {
body["SecurityID"] = v
}
if v, e := msg.GetCurrency(); e == nil {
body["Currency"] = v
}
if v, e := msg.GetPrice(); e == nil {
body["Price"] = v.String()
}
if v, e := msg.GetLastPx(); e == nil {
body["LastPx"] = v.String()
}
if v, e := msg.GetLastQty(); e == nil {
body["LastQty"] = v.String()
}
if v, e := msg.GetOrderQty(); e == nil {
body["OrderQty"] = v.String()
}
if v, e := msg.GetSettlDate(); e == nil {
body["SettlDate"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "8",
QuoteReqID: clOrdID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
} }
// extractIdentifier extracts the trade identifier from a parsed FIX message. // extractIdentifier extracts the trade identifier from a parsed FIX message.
@ -218,12 +69,17 @@ func extractIdentifier(msg *quickfix.Message) string {
return "" return ""
} }
func buildOutgoingMessageJSON(msgType, quoteReqID string, body map[string]interface{}) domain.FixMessageJSON { // extractHeaderMeta reads SenderCompID (49), MsgSeqNum (34) and SendingTime (52)
return domain.FixMessageJSON{ // from a quickfix.Message header. Returns zero values when a field is absent.
Direction: "OUT", func extractHeaderMeta(msg *quickfix.Message) (senderCompID string, msgSeqNum int, sendingTime time.Time) {
MsgType: msgType, if s, err := msg.Header.GetString(tag.SenderCompID); err == nil {
QuoteReqID: quoteReqID, senderCompID = s
Body: body,
ReceiveTime: time.Now(),
} }
if n, err := msg.Header.GetInt(tag.MsgSeqNum); err == nil {
msgSeqNum = n
}
if t, err := msg.Header.GetTime(tag.SendingTime); err == nil {
sendingTime = t
}
return
} }

104
src/client/fix/protocol.txt Normal file
View File

@ -0,0 +1,104 @@
Step 1
Direction: ← QuoteRequest (R)
Comentary: Tradeweb sends trade information to dealer.
FIX Message: 8=FIXT.1.1 9=869 35=R 34=2 49=TRADEWEB 52=20160401-16:53:19.992 56=TW1_CORI_TEST_12345_DLRDPL 131=LST_20160401_TW1_CORI_NY302485.18_1 146=1 55=AMXLMM 2.375 09/08/16 48=02364WBC8 22=1 460=12 167=CORP 762=REGCORIPRC 541=20160908 225=20110908 470=MX 223=2.375 106=AMERICA MOVIL SAB DE CV 54=2 38=1000000 64=20160406 15=USD 6110=Comms 60=20160401-16:53:19 662=99.98046875 22570=0.76 663=1 699=912828UR9 761=1 423=6 44=-999999 5023=0.001000 66=NY302485.18 6847=1 75=20160401 464=Y 20086=1 20074=Y 20075=Y 20077=LatAm Comms 20078=pddealer 20079=60 20081=60 20090=60 20072=60 20098=60 5745=1 20073=RFQ 20076=Y 20156=N 20130=2000000000 20138=JPM,MER 20175=20120308 2115=0 22630=0 20265=LatAm 453=3 448=emack 447=C 452=3 802=3 523=Dev Test 803=2 523=NY 803=25 523=USA 803=4000 448=Tradeweb 447=C 452=1 802=1 523=Tradeweb0001 803=4002 448=DTCC 447=C 452=4 5114=2 5113=1 20169=A2 5113=0 20169=A- 10=121
Step 2
Direction: QuoteStatusReport (AI) →
Comentary: Dealer acknowledges trade.
FIX Message: 8=FIXT.1.1 9=198 35=AI 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=2 52=20160401-16:53:19.988 131=LST_20160401_TW1_CORI_NY302485.18_1 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:53:19.988 297=0 10=106
Step 3
Direction: Quote (S) →
Comentary: Dealer sends quote.
FIX Message: 8=FIXT.1.1 9=231 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=3 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 6153=emackdlr 44=.99 423=6 60=20160401-16:53:19.990 10=247
Step 4
Direction: ← QuoteAck (CW)
Comentary: Tradeweb acknowledges quote.
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=3 49=TRADEWEB 52=20160401-16:53:25.102 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:25 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=154
Step 5
Direction: Quote (S) →
Comentary: Dealer sends quote.
FIX Message: 8=FIXT.1.1 9=239 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=4 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 20087=5 6153=emackdlr 44=.97 423=6 60=20160401-16:53:19.990 10=114
Step 6
Direction: ← QuoteAck (CW)
Comentary: Tradeweb acknowledges quote.
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=4 49=TRADEWEB 52=20160401-16:53:30.055 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:30 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=154
Step 7
Direction: Quote (S) →
Comentary: Dealer sends quote.
FIX Message: 8=FIXT.1.1 9=239 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=6 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 20087=5 6153=emackdlr 44=.98 423=6 60=20160401-16:53:19.990 10=117
Step 8
Direction: ← QuoteAck (CW)
Commentary: Tradeweb acknowledges quote.
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=5 49=TRADEWEB 52=20160401-16:53:35.071 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:35 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=163
Step 9
Direction: ← ExecutionReport (8)
Commentary: Tradeweb notifies dealer that the list trading (Due In time) has ended. ExecType=A
FIX Message: 8=FIXT.1.1 9=289 35=8 34=7 49=TRADEWEB 52=20160401-16:54:19.463 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125419.463 37=LST_20160401_TW1_CORI_NY302485.18_1 39=D 55=[N/A] 60=20160401-16:54:19 75=20160401 150=I 151=0 20086=1 10=073
Step 10
Direction: ExecutionAck (BN) →
Commentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=255 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=8 52=20160401-16:54:19.448 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125419.448 55=[N/A] 60=20160401-16:54:19.448 10=119
Step 11
Direction: ← QuoteResponse (AJ)
Commentary: Customer lifts after good-for time expires. Tradeweb informs dealer customer lift.
FIX Message: 8=FIXT.1.1 9=431 35=AJ 34=10 49=TRADEWEB 52=20160401-16:55:32.855 56=TW1_CORI_TEST_12345_DLRDPL 11=LST_20160401_TW1_CORI_NY302485.18_1 22=1 38=1000000 44=0.98 48=02364WBC8 54=2 55=[N/A] 60=20160401-16:55:32 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 423=6 662=99.98046875 693=LST_20160401_TW1_CORI_NY302485.18_1_TRDREQ 694=1 2115=1 20074=Y 20075=N 20076=N 20079=60 20082=60 20156=N 22570=0.760289080299 22630=0 10=183
Step 12
Direction: QuoteStatusReport (AI) →
Commentary: Dealer acknowledges customers trade acceptance.
FIX Message: 8=FIXT.1.1 9=206 35=AI 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=12 52=20160401-16:55:32.849 693=LST_20160401_TW1_CORI_NY302485.18_1_TRDREQ 131=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:55:32.849 297=0 10=189
Step 13
Direction: ← ExecutionReport (8)
Commentary: Tradeweb notifies the dealer that list has ended
FIX Message: incoming8=FIXT.1.1 9=290 35=8 34=11 49=TRADEWEB 52=20160401-16:55:32.855 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125532.855 37=LST_20160401_TW1_CORI_NY302485.18_1 39=A 55=[N/A] 60=20160401-16:55:32 75=20160401 150=A 151=0 20086=1 10=095
Step 14
Direction: ExecutionAck (BN) →
Commentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=256 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=13 52=20160401-16:55:32.852 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125532.852 55=[N/A] 60=20160401-16:55:32.852 10=155
Step 15
Direction: ExecutionReport (8) →
Commentary: Dealer accepts re-quote is not allowed. ExecType=F, OrdStatus=Filled
FIX Message: 8=FIXT.1.1 9=283 35=8 34=14 49=TW1_CORI_TEST_12345_DLRDPL 52=20160401-16:55:32.850 56=TRADEWEB 6=0 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1 37=LST_20160401_TW1_CORI_NY302485.18_1 39=2 54=2 55=[N/A] 150=F 151=0 6153=LST_20160401_TW1_CORI_NY302485.18_1 22631=POSTTRADE_STRING 22632=POSTTRADE_STRING 10=155
Step 16
Direction: ← ExecutionAck (BN)
Commentary: Tradeweb acknowledges ExecutionReport
FIX Message: 8=FIXT.1.1 9=233 35=BN 34=12 49=TRADEWEB 52=20160401-16:55:37.917 56=TW1_CORI_TEST_12345_DLRDPL 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1 37=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:55:37 1036=1 10=051
Step 17
Direction: ← ExecutionReport (8)
Commentary: Tradeweb informs Dealer of outcome of a list trade item. OrdStatus=Filled, ExecType=Trade
FIX Message: 8=FIXT.1.1 9=337 35=8 34=13 49=TRADEWEB 52=20160401-16:55:37.917 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDEND-125537.917 37=LST_20160401_TW1_CORI_NY302485.18_1 39=2 44=0.98 54=2 55=[N/A] 60=20160401-16:55:37 75=20160401 150=F 151=0 423=6 662=99.98046875 22570=0.760289080299 10=067
Step 18
Direction: ExecutionAck (BN) →
Comentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=255 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=15 52=20160401-16:55:37.910 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDEND-125537.917 55=[N/A] 60=20160401-16:55:37.910 10=078
Step 19
Direction:
Comentary: Autospot at Tradeweb Treasury composite.
FIX Message:
Step 20
Direction: ← ExecutionReport (8)
Comentary: Tradeweb sends ExecutionReport message to confirm final outcome of list trade item including applicable cover quote, spot, settlement money information, etc. OrdStatus=Filled, ExecType=Order Status TradeSummary = Y
FIX Message: 8=FIXT.1.1 9=733 35=8 34=14 49=TRADEWEB 52=20160401-16:55:40.292 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDSUMM-125540.277 22=1 37=LST_20160401_TW1_CORI_NY302485.18_1 38=1000000 39=2 44=0.98 48=02364WBC8 54=2 55=[N/A] 60=20160401-16:55:40 64=20160406 75=20160401 150=F 151=0 167=CORP 236=1.74 423=6 453=2 448=emack 447=C 452=3 802=3 523=Dev Test 803=2 523=NY 803=25 523=USA 803=4000 448=Tradeweb 447=C 452=1 802=2 523=Tradeweb0001 803=4002 523=YES 803=4003 526=TRD_20160401_TW1_CORI_23 662=99.98046875 1003=20160401.TW1.CORI.23 6153=emackdlr 6731=20160401.TW1.CORI.23 20115=100.265 20250=225000 22570=0.76 22630=0 22631=POSTTRADE_STRING 22634=160401.DLRX.TRSY.120 22636=Y 10=239
Step 21
Direction: ExecutionAck (BN) →
Comentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=256 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=16 52=20160401-16:55:40.287 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDSUMM-125540.277 55=[N/A] 60=20160401-16:55:40.287 10=182

View File

@ -1,11 +1,12 @@
CREATE TABLE IF NOT EXISTS qfixdpl_messages ( CREATE TABLE IF NOT EXISTS qfixdpl_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), id UUID PRIMARY KEY,
quote_req_id TEXT NOT NULL, sender_comp_id TEXT NOT NULL,
msg_seq_num BIGINT NOT NULL,
j_message JSONB NOT NULL, j_message JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() created_at TIMESTAMPTZ NOT NULL
); );
CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id);
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at); CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at);
CREATE INDEX IF NOT EXISTS idx_messages_session ON qfixdpl_messages(sender_comp_id, msg_seq_num);
CREATE TABLE IF NOT EXISTS qfixdpl_logs ( CREATE TABLE IF NOT EXISTS qfixdpl_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

View File

@ -2,6 +2,7 @@ package store
import ( import (
"encoding/json" "encoding/json"
"strconv"
"strings" "strings"
"time" "time"
@ -9,15 +10,20 @@ import (
"quantex.com/qfixdpl/src/domain" "quantex.com/qfixdpl/src/domain"
) )
func (p *Store) SaveMessage(msg domain.TradeMessage) error { func (p *Store) SaveMessage(msg domain.Message) error {
jsonBytes, err := json.Marshal(msg.JMessage) jsonBytes, err := json.Marshal(msg.JMessage)
if err != nil { if err != nil {
return tracerr.Errorf("error marshaling j_message: %w", err) return tracerr.Errorf("error marshaling j_message: %w", err)
} }
_, err = p.db.Exec( _, err = p.db.Exec(
"INSERT INTO qfixdpl_messages (quote_req_id, j_message) VALUES ($1, $2)", `INSERT INTO qfixdpl_messages (id, sender_comp_id, msg_seq_num, j_message, created_at)
msg.QuoteReqID, string(jsonBytes), VALUES ($1, $2, $3, $4, $5)`,
msg.ID,
msg.SenderCompID,
strconv.Itoa(msg.MsgSeqNum),
string(jsonBytes),
msg.CreatedAt.UTC().Format(time.RFC3339Nano),
) )
if err != nil { if err != nil {
return tracerr.Errorf("error inserting message: %w", err) return tracerr.Errorf("error inserting message: %w", err)
@ -41,25 +47,29 @@ func (p *Store) SaveLog(entry domain.LogEntry) error {
return nil return nil
} }
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) { func (p *Store) GetTodayMessages() ([]domain.Message, error) {
rows, err := p.db.Query( rows, err := p.db.Query(
"SELECT id, quote_req_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC", `SELECT id, sender_comp_id, msg_seq_num, j_message, created_at
FROM qfixdpl_messages
WHERE created_at >= current_date
ORDER BY created_at ASC`,
) )
if err != nil { if err != nil {
return nil, tracerr.Errorf("error querying today messages: %w", err) return nil, tracerr.Errorf("error querying today messages: %w", err)
} }
defer rows.Close() defer rows.Close()
var messages []domain.TradeMessage var messages []domain.Message
for rows.Next() { for rows.Next() {
var ( var (
id, quoteReqID string id, senderCompID string
msgSeqNum int
jMessageRaw []byte jMessageRaw []byte
createdAt time.Time createdAt time.Time
) )
if err := rows.Scan(&id, &quoteReqID, &jMessageRaw, &createdAt); err != nil { if err := rows.Scan(&id, &senderCompID, &msgSeqNum, &jMessageRaw, &createdAt); err != nil {
return nil, tracerr.Errorf("error scanning message row: %w", err) return nil, tracerr.Errorf("error scanning message row: %w", err)
} }
@ -68,11 +78,22 @@ func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err) return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
} }
messages = append(messages, domain.TradeMessage{ sendingTime, _ := jMessage.Header["SendingTime"].(time.Time)
if sendingTime.IsZero() {
if s, ok := jMessage.Header["SendingTime"].(string); ok {
if t, parseErr := time.Parse(time.RFC3339Nano, s); parseErr == nil {
sendingTime = t
}
}
}
messages = append(messages, domain.Message{
ID: id, ID: id,
QuoteReqID: quoteReqID, SenderCompID: senderCompID,
JMessage: jMessage, MsgSeqNum: msgSeqNum,
SendingTime: sendingTime,
CreatedAt: createdAt, CreatedAt: createdAt,
JMessage: jMessage,
}) })
} }

View File

@ -5,16 +5,7 @@ import "time"
// ListTrade es la representacion exportada de un trade de List Trading. // ListTrade es la representacion exportada de un trade de List Trading.
type ListTrade struct { type ListTrade struct {
QuoteReqID string `json:"quote_req_id"` QuoteRequest FixMessageJSON `json:"quote_request"`
ListID string `json:"list_id"`
Symbol string `json:"symbol"`
SecurityIDSrc string `json:"security_id_src"`
Currency string `json:"currency"`
Side string `json:"side"`
OrderQty string `json:"order_qty"`
SettlDate string `json:"settl_date"`
Price string `json:"price"`
OwnerTraderID string `json:"owner_trader_id"`
} }
// FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento. // FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento.
@ -22,17 +13,21 @@ type FixMessageJSON struct {
Direction string `json:"direction"` Direction string `json:"direction"`
MsgType string `json:"msg_type"` MsgType string `json:"msg_type"`
QuoteReqID string `json:"quote_req_id"` QuoteReqID string `json:"quote_req_id"`
Header map[string]interface{} `json:"header"` Header map[string]any `json:"header"`
Body map[string]interface{} `json:"body"` Body map[string]any `json:"body"`
Trailer map[string]any `json:"trailer"`
ReceiveTime time.Time `json:"receive_time"` ReceiveTime time.Time `json:"receive_time"`
} }
// TradeMessage es una fila de qfixdpl_messages. // Message es una fila de qfixdpl_messages, con la metadata del header FIX hoisted
type TradeMessage struct { // para que los consumidores puedan ordenar/filtrar sin parsear el JSON.
type Message struct {
ID string `json:"id"` ID string `json:"id"`
QuoteReqID string `json:"quote_req_id"` SenderCompID string `json:"sender_comp_id"`
JMessage FixMessageJSON `json:"j_message"` MsgSeqNum int `json:"msg_seq_num"`
SendingTime time.Time `json:"sending_time"`
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
JMessage FixMessageJSON `json:"j_message"`
} }
// LogEntry es el DTO para insertar/actualizar un log crudo en qfixdpl_logs. // LogEntry es el DTO para insertar/actualizar un log crudo en qfixdpl_logs.
@ -48,8 +43,8 @@ type Logs struct {
// PersistenceStore define la interfaz de persistencia. // PersistenceStore define la interfaz de persistencia.
type PersistenceStore interface { type PersistenceStore interface {
SaveMessage(msg TradeMessage) error SaveMessage(msg Message) error
SaveLog(entry LogEntry) error SaveLog(entry LogEntry) error
GetTodayMessages() ([]TradeMessage, error) GetTodayMessages() ([]Message, error)
GetLogsByQuoteReqID(quoteReqID string) (Logs, error) GetLogsByQuoteReqID(quoteReqID string) (Logs, error)
} }