19 Commits

Author SHA1 Message Date
36b841fc66 pascal case 2026-05-06 14:11:50 -03:00
68238d309a adding endpoints 2026-05-06 11:56:12 -03:00
15a60bac92 handling errors 2026-05-05 15:34:01 -03:00
b58c8df905 fix log library 2026-03-26 12:05:18 -03:00
82d2e1b5f7 Persistance and recovery 2026-03-19 13:23:23 -03:00
51ef6e182d Documentation of 8.4 flow 2026-03-16 17:03:12 -03:00
e17675d973 Flow 8.4 list trading working 2026-03-16 12:44:52 -03:00
5f1d7038ac merging 2026-03-13 14:23:47 -03:00
710772b052 Add QuoteStatusReport and QuoteAck handlers 2026-03-13 14:20:38 -03:00
4e62548091 fix ids 2026-03-13 12:11:40 -03:00
fbcaac95f5 fixes in quotes 2026-03-13 11:35:37 -03:00
3998726100 respond automatically to quote requests 2026-03-12 17:10:31 -03:00
1f1c0afb9a QuoteRequest fix 2026-03-12 14:59:11 -03:00
d1aff0212e Merge pull request 'Add Quickfix library' (#1) from quickfix into develop
Reviewed-on: #1
2026-03-12 15:32:09 +00:00
0910b1e6c8 fixes 2026-03-12 10:23:19 -03:00
50c7f98c37 adding notifications 2026-03-11 12:40:24 -03:00
48373b6855 fix store logs 2026-03-10 17:38:47 -03:00
1d32854a09 fix logs 2026-03-10 17:16:51 -03:00
7e26addd80 improvement 2026-03-10 16:36:57 -03:00
18 changed files with 1733 additions and 206 deletions

View File

@ -62,7 +62,7 @@ linux-build: check-env swag # Build a linux version for prod environment. Set e=
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
deploy: # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo; s=serverName; i=instance; e.g. make deploy e=dev s=nonprodFix i=dpl
make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/$(i)/
make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/dpl/
fmt: download-versions # Apply the Go formatter to the code
cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt);

View File

@ -0,0 +1,341 @@
# Flow 8.4 — List Trading (Dealer Response)
## Overview
Flow 8.4 defines how a **dealer** responds to a client's Request for Quote (RFQ) through Tradeweb (TW) in a List Trading context. The key rule is:
> **The dealer NEVER sends an ExecutionReport (35=8). Only Tradeweb does.**
The dealer's role is limited to:
- Acknowledging messages (35=AI QuoteStatusReport, 35=BN ExecutionAck)
- Sending a price quote (35=S)
This document covers the **happy path** (client accepts) and two **alternative flows**:
- **Flow 8.6 — Trade Ended:** Client cancels before or after receiving the quote
- **QuoteAck Rejected:** TW rejects the dealer's quote
## Participants
| Abbreviation | Role |
|---|---|
| **TW** | Tradeweb — the platform that orchestrates the trade |
| **Dealer** | Us — responds to RFQs with quotes and acknowledges TW messages |
| **Client** | The counterparty requesting a quote (we never communicate with them directly) |
## Message Flow
```
TW Dealer
│ │
│ 1. QuoteRequest (35=R) │
│ ─────────────────────────────────────────> │
│ │
│ 2. QuoteStatusReport (35=AI) [ACK] │
│ <───────────────────────────────────────── │
│ │
│ 3. Quote (35=S) [price] │
│ <───────────────────────────────────────── │
│ │
│ 4. QuoteAck (35=CW) [ACCEPTED] │
│ ─────────────────────────────────────────> │
│ │
│ 5. QuoteResponse (35=AJ) [Hit/Lift] │
│ ─────────────────────────────────────────> │
│ │
│ 6. QuoteStatusReport (35=AI) [TRDREQACK] │
│ <───────────────────────────────────────── │
│ │
│ 7. ExecutionReport (35=8) [_LISTEND] │
│ ─────────────────────────────────────────> │
│ │
│ 8. ExecutionAck (35=BN) │
│ <───────────────────────────────────────── │
│ │
│ 9. ExecutionReport (35=8) [_TRDEND] │
│ ─────────────────────────────────────────> │
│ │
│ 10. ExecutionAck (35=BN) │
│ <───────────────────────────────────────── │
│ │
│ 11. ExecutionReport (35=8) [_TRDSUMM] │
│ ─────────────────────────────────────────> │
│ │
│ 12. ExecutionAck (35=BN) │
│ <───────────────────────────────────────── │
│ │
```
## Step-by-Step Detail
### Step 1 — QuoteRequest (35=R) — TW → Dealer
TW sends an RFQ on behalf of the client. Key fields:
| Tag | Field | Example | Notes |
|-----|-------|---------|-------|
| 131 | QuoteReqID | `LST_20260316_BYMA_CORI_NY1567246.1_1` | Always starts with `LST_` for List Trading |
| 66 | ListID | `NY1567246.1` | Identifies the list/inquiry |
| 48 | SecurityID | `040114HT0` | Bond identifier |
| 22 | SecurityIDSource | `1` (CUSIP) | Could also be `4` (ISIN) |
| 54 | Side | `2` (SELL) | The client's side — if client sells, dealer buys |
| 38 | OrderQty | `10000` | Quantity requested |
| 15 | Currency | `USD` | Settlement currency |
| 64 | SettlDate | `20260317` | Settlement date |
| 20073 | NegotiationType | `RFQ` | Must be RFQ for this flow |
**Validation:** The dealer must verify `LST_` prefix, non-empty `ListID`, and `NegotiationType=RFQ` before proceeding.
### Step 2 — QuoteStatusReport (35=AI) — Dealer → TW
The dealer acknowledges receipt of the QuoteRequest.
| Tag | Field | Value |
|-----|-------|-------|
| 131 | QuoteReqID | Same as received |
| 117 | QuoteID | Same as QuoteReqID |
| 297 | QuoteStatus | `0` (ACCEPTED) |
### Step 3 — Quote (35=S) — Dealer → TW
The dealer sends a price quote.
| Tag | Field | Example | Notes |
|-----|-------|---------|-------|
| 117 | QuoteID | Same as QuoteReqID | |
| 131 | QuoteReqID | Same as received | |
| 132 | BidPx | `99.60000000` | Set when client side is SELL (dealer bids) |
| 133 | OfferPx | `99.60000000` | Set when client side is BUY (dealer offers) |
| 44 | Price | `99.60000000` | The quote price |
| 423 | PriceType | `1` (Percentage) | |
| 537 | QuoteType | `211` (SEND_QUOTE) | |
### Step 4 — QuoteAck (35=CW) — TW → Dealer
TW confirms the quote was accepted.
| Tag | Field | Value |
|-----|-------|-------|
| 1865 | QuoteAckStatus | `1` (ACCEPTED) |
**Note:** If status is not ACCEPTED, the dealer logs the rejection (including the `Text` field) and cleans up the trade from memory. See [QuoteAck Rejected](#quoteack-rejected-quote-not-accepted) below.
### Step 5 — QuoteResponse (35=AJ) — TW → Dealer
The client has decided to trade (Hit/Lift the quote).
| Tag | Field | Value | Notes |
|-----|-------|-------|-------|
| 694 | QuoteRespType | `1` (Hit/Lift) | `2` would be Counter — that's flow 8.5, not handled here |
| 693 | QuoteRespID | `..._TRDREQ` | Always ends with `_TRDREQ` |
### Step 6 — QuoteStatusReport (35=AI) — Dealer → TW
The dealer acknowledges the trade request (TRDREQACK).
| Tag | Field | Value |
|-----|-------|-------|
| 131 | QuoteReqID | Same as received |
| 693 | QuoteRespID | Same as received |
| 297 | QuoteStatus | `0` (ACCEPTED) |
### Step 7 — ExecutionReport (35=8) `_LISTEND` — TW → Dealer
TW signals that the due-in window for the list has closed.
| Tag | Field | Value | Notes |
|-----|-------|-------|-------|
| 17 | ExecID | `..._LISTEND-{timestamp}` | Contains `_LISTEND` |
| 150 | ExecType | `A` (PendingNew) | |
| 39 | OrdStatus | `A` (PendingNew) | |
**Dealer action:** Send ExecutionAck only. **Do NOT send an ExecutionReport back.**
### Step 8 — ExecutionAck (35=BN) — Dealer → TW
| Tag | Field | Value |
|-----|-------|-------|
| 37 | OrderID | Same as received |
| 17 | ExecID | Same as received |
| 1036 | ExecAckStatus | `1` (ACCEPTED) |
### Step 9 — ExecutionReport (35=8) `_TRDEND` — TW → Dealer
TW sends the trade result.
| Tag | Field | Value | Notes |
|-----|-------|-------|-------|
| 17 | ExecID | `..._TRDEND-{timestamp}` | Contains `_TRDEND` |
| 150 | ExecType | `F` (Trade) | The trade was executed |
| 39 | OrdStatus | `2` (Filled) | |
| 44 | Price | `99.6` | Final execution price |
**Dealer action:** Send ExecutionAck. Clean up internal trade tracking state.
### Step 10 — ExecutionAck (35=BN) — Dealer → TW
Same format as Step 8.
### Step 11 — ExecutionReport (35=8) `_TRDSUMM` — TW → Dealer
TW sends the full trade summary with additional details (parties, settlement info, trade IDs).
| Tag | Field | Value | Notes |
|-----|-------|-------|-------|
| 17 | ExecID | `..._TRDSUMM-{timestamp}` | Contains `_TRDSUMM` |
| 150 | ExecType | `F` (Trade) | |
| 39 | OrdStatus | `2` (Filled) | |
| 453 | NoPartyIDs | Party information | Counterparty details |
| 526 | SecondaryClOrdID | `TRD_...` | Tradeweb trade reference |
| 1003 | TradeID | `20260316.BYMA.CORI.230` | Unique trade identifier |
**Dealer action:** Send ExecutionAck. Log the summary for audit/reconciliation.
### Step 12 — ExecutionAck (35=BN) — Dealer → TW
Same format as Step 8.
---
## Alternative Flows
### Flow 8.6 — Trade Ended (Client Cancels)
The client changes their mind and ends the trade. This can happen at any point after the QuoteRequest — even before the dealer's quote arrives. TW informs the dealer via QuoteResponse (35=AJ) messages with `_TRDEND` and `_TRDSUMM` suffixes instead of the ExecutionReport chain.
> **Critical:** The dealer MUST send a QuoteStatusReport (35=AI) ACK for every QuoteResponse. If no ACK is sent, TW will retry the message indefinitely (~every 11 seconds).
```
TW Dealer
│ │
│ 1. QuoteRequest (35=R) │
│ ─────────────────────────────────────────> │
│ │
│ 2. QuoteStatusReport (35=AI) [ACK] │
│ <───────────────────────────────────────── │
│ │
│ ┌─── Client ends trade ───┐ │
│ │ Meanwhile, dealer may │ │
│ │ still send Quote (S) │ │
│ └─────────────────────────┘ │
│ │
│ 3. QuoteResponse (35=AJ) [_TRDEND] │
│ QuoteRespType=7 (End Trade) │
│ ─────────────────────────────────────────> │
│ │
│ 4. QuoteStatusReport (35=AI) [ACK] │
│ <───────────────────────────────────────── │
│ │
│ 5. QuoteAck (35=CW) [REJECTED] │
│ (if quote was sent, TW rejects it) │
│ ─────────────────────────────────────────> │
│ │
│ 6. QuoteResponse (35=AJ) [_TRDSUMM] │
│ QuoteRespType=7, TradeSummary=Y │
│ ─────────────────────────────────────────> │
│ │
│ 7. QuoteStatusReport (35=AI) [ACK] │
│ <───────────────────────────────────────── │
│ │
```
#### Step 3 — QuoteResponse (35=AJ) `_TRDEND` — TW → Dealer
TW notifies that the client ended the trade.
| Tag | Field | Value | Notes |
|-----|-------|-------|-------|
| 693 | QuoteRespID | `..._TRDEND` | Suffix identifies this as trade end |
| 694 | QuoteRespType | `7` (End Trade) | |
| 131 | QuoteReqID | Same as original | |
**Dealer action:** Send QuoteStatusReport (35=AI) with `693=QuoteRespID` and `297=0` (ACCEPTED).
#### Step 5 — QuoteAck (35=CW) `REJECTED` — TW → Dealer
If the dealer's Quote (35=S) crossed with the TRDEND, TW rejects it. The QuoteAckStatus will be `2` (REJECTED) with a text like "DPL DLRQUOTE received in an invalid state."
**Dealer action:** Log the rejection and clean up the trade from memory.
#### Step 6 — QuoteResponse (35=AJ) `_TRDSUMM` — TW → Dealer
TW sends the final trade summary confirming the outcome.
| Tag | Field | Value | Notes |
|-----|-------|-------|-------|
| 693 | QuoteRespID | `..._TRDSUMM` | Final summary message |
| 694 | QuoteRespType | `7` (End Trade) | |
| 22636 | TradeSummary | `Y` | Confirms this is the summary |
**Dealer action:** Send QuoteStatusReport (35=AI) ACK. Clean up the trade from memory. This is the **terminal message** — no more messages will follow for this QuoteReqID.
---
### QuoteAck Rejected (Quote Not Accepted)
If TW rejects the dealer's Quote (35=CW with status != ACCEPTED), the trade is dead from the dealer's perspective.
```
TW Dealer
│ │
│ 1-3. (same as happy path) │
│ │
│ 4. QuoteAck (35=CW) [REJECTED] │
│ QuoteAckStatus != 1 │
│ ─────────────────────────────────────────> │
│ │
│ Trade is terminated. │
│ │
```
**Dealer action:** Log the rejection (including the `Text` field with the reason) and remove the trade from memory. No further action needed — TW may or may not send subsequent messages for this QuoteReqID.
---
## QuoteRespID Suffix Routing in `handleQuoteResponse`
All QuoteResponse (35=AJ) messages are routed by the suffix of the `QuoteRespID` (tag 693):
```
QuoteRespID ends with "_TRDREQ" → Trade request (flow 8.4 happy path) — ACK
QuoteRespID ends with "_TRDEND" → Trade ended by client (flow 8.6) — ACK
QuoteRespID ends with "_TRDSUMM" → Trade summary (flow 8.6 final) — ACK + cleanup
QuoteRespID ends with "_LISTEND" → List ended — ACK
Other suffix → Ignored (logged)
```
## Code Reference
The implementation lives in `src/client/fix/manager.go`:
| Handler | Triggers on | Action |
|---------|------------|--------|
| `handleQuoteRequest` | 35=R | Sends 35=AI (ack) + 35=S (quote) |
| `handleQuoteAck` | 35=CW | If rejected: logs + cleans up trade. If accepted: logs |
| `handleQuoteResponse` | 35=AJ | Sends 35=AI (ACK). Routes by QuoteRespID suffix. Cleans up on `_TRDSUMM` |
| `handleExecutionReport` | 35=8 | Sends 35=BN (ack) + routes by ExecID suffix |
| `sendQuoteStatusReport` | — | Builds 35=AI for QuoteRequest ack |
| `sendTradeRequestAck` | — | Builds 35=AI for QuoteResponse ack (all suffixes) |
| `sendExecutionAck` | — | Builds 35=BN for ExecutionReport ack |
### ExecID Routing in `handleExecutionReport`
```
ExecID contains "_LISTEND" → Log only, await trade result
ExecID contains "_TRDEND" → Log trade end
ExecID contains "_TRDSUMM" → Log trade summary + cleanup trade from memory
ExecType = F (fallback) → Log generic trade result
```
The order matters: ExecID suffix checks run before ExecType checks, because `_TRDEND` and `_TRDSUMM` both have `ExecType=F`.
### Trade Cleanup Paths
A trade is removed from memory in any of these scenarios:
| Trigger | Message | Condition |
|---------|---------|-----------|
| QuoteAck rejected | 35=CW | `QuoteAckStatus != ACCEPTED` |
| QuoteResponse summary | 35=AJ | `QuoteRespID` ends with `_TRDSUMM` (flow 8.6) |
| ExecutionReport summary | 35=8 | `ExecID` contains `_TRDSUMM` (flow 8.4) |
The `loadActiveTrades` recovery function replays today's messages and applies the same cleanup rules to reconstruct accurate state on restart.

View File

@ -4,12 +4,14 @@ import (
"fmt"
"log/slog"
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/gomodule/redigo/redis"
"github.com/sasha-s/go-deadlock"
uuid "github.com/satori/go.uuid"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/app/version"
@ -32,22 +34,20 @@ type Controller struct {
pool *redis.Pool
userData app.UserDataProvider
store *store.Store
orderStore domain.OrderStore
fixSender domain.FIXSender
tradeProvider TradeProvider
config Config
notify domain.Notifier
authMutex deadlock.Mutex
}
func newController(pool *redis.Pool, userData app.UserDataProvider,
s *store.Store, orderStore domain.OrderStore, fixSender domain.FIXSender, config Config, n domain.Notifier,
s *store.Store, tradeProvider TradeProvider, config Config, n domain.Notifier,
) *Controller {
return &Controller{
pool: pool,
userData: userData,
store: s,
orderStore: orderStore,
fixSender: fixSender,
tradeProvider: tradeProvider,
config: config,
notify: n,
}
@ -293,50 +293,92 @@ func allowed(origin string, config Config) bool {
return false
}
// GetOrders godoc
// @Summary List received FIX orders
// @Description Returns all NewOrderSingle messages received via FIX
// GetTrades godoc
// @Summary List active trades
// @Description Returns all active List Trading trades
// @Tags fix
// @Produce json
// @Success 200 {array} domain.Order
// @Router /qfixdpl/v1/orders [get]
func (cont *Controller) GetOrders(ctx *gin.Context) {
orders := cont.orderStore.GetOrders()
ctx.JSON(http.StatusOK, orders)
// @Success 200 {array} domain.ListTrade
// @Router /qfixdpl/v1/trades [get]
func (cont *Controller) GetTrades(ctx *gin.Context) {
trades := cont.tradeProvider.GetTrades()
ctx.JSON(http.StatusOK, trades)
}
// GetLogs godoc
// @Summary Get raw FIX logs for a trade
// @Description Returns raw FIX message logs for a given QuoteReqID
// @Tags fix
// @Produce json
// @Param quoteReqID path string true "QuoteReqID"
// @Success 200 {object} domain.Logs
// @Router /qfixdpl/v1/trades/{quoteReqID}/logs [get]
func (cont *Controller) GetLogs(ctx *gin.Context) {
quoteReqID := ctx.Param("quoteReqID")
logs, err := cont.store.GetLogsByQuoteReqID(quoteReqID)
if err != nil {
err = tracerr.Errorf("GetLogs: error fetching logs (quoteReqID=%s): %w", quoteReqID, err)
slog.Error(err.Error())
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"})
return
}
ctx.JSON(http.StatusOK, logs)
}
// GetPendingQuoteRequests godoc
// @Summary List pending QuoteRequests
// @Description Returns all QuoteRequests received from TW that have not been quoted yet by the dealer
// @Tags fix
// @Produce json
// @Success 200 {array} domain.ListTrade
// @Router /qfixdpl/v1/quote-requests [get]
func (cont *Controller) GetPendingQuoteRequests(ctx *gin.Context) {
pending := cont.tradeProvider.GetPendingQuoteRequests()
ctx.JSON(http.StatusOK, pending)
}
// SendQuote godoc
// @Summary Send a FIX Quote
// @Description Sends a Quote (MsgType S) back to the FIX client for a given order
// @Summary Send a Quote for a pending QuoteRequest
// @Description Builds and sends a Quote (35=S) to TW for an existing QuoteRequest at the given price
// @Tags fix
// @Accept json
// @Produce json
// @Param quote body QuoteRequest true "Quote details"
// @Param body body SendQuoteRequest true "Quote to send"
// @Success 200 {object} Msg
// @Failure 400 {object} HTTPError
// @Failure 404 {object} HTTPError
// @Failure 409 {object} HTTPError
// @Failure 500 {object} HTTPError
// @Router /qfixdpl/v1/quotes [post]
func (cont *Controller) SendQuote(ctx *gin.Context) {
var req QuoteRequest
var req SendQuoteRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return
}
bidPx, offerPx, bidSize, offerSize, err := req.toDecimals()
price, err := decimal.NewFromString(req.Price)
if err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
ctx.JSON(http.StatusBadRequest, HTTPError{Error: "invalid price: " + err.Error()})
return
}
if err = cont.fixSender.SendQuote(req.ClOrdID, req.QuoteID, req.Symbol, req.Currency, bidPx, offerPx, bidSize, offerSize); err != nil {
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: err.Error()})
if err := cont.tradeProvider.SendQuote(req.QuoteReqID, price); err != nil {
msg := err.Error()
switch {
case strings.Contains(msg, "not found"):
ctx.JSON(http.StatusNotFound, HTTPError{Error: "quoteReqID not found"})
case strings.Contains(msg, "already sent"):
ctx.JSON(http.StatusConflict, HTTPError{Error: "quote already sent for this quoteReqID"})
default:
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "failed to send quote"})
}
return
}
ctx.JSON(http.StatusOK, Msg{Text: "quote sent"})
ctx.JSON(http.StatusOK, Msg{Text: "Quote sent"})
}

View File

@ -1,11 +1,5 @@
package rest
import (
"fmt"
"github.com/shopspring/decimal"
)
type HTTPError struct {
Error string
}
@ -23,41 +17,7 @@ type Session struct {
Email string
}
type QuoteRequest struct {
ClOrdID string `json:"cl_ord_id" binding:"required"`
QuoteID string `json:"quote_id" binding:"required"`
Symbol string `json:"symbol" binding:"required"`
Currency string `json:"currency"`
BidPx string `json:"bid_px" binding:"required"`
OfferPx string `json:"offer_px" binding:"required"`
BidSize string `json:"bid_size"`
OfferSize string `json:"offer_size"`
}
func (r QuoteRequest) toDecimals() (bidPx, offerPx, bidSize, offerSize decimal.Decimal, err error) {
bidPx, err = decimal.NewFromString(r.BidPx)
if err != nil {
return bidPx, offerPx, bidSize, offerSize, fmt.Errorf("invalid bid_px: %w", err)
}
offerPx, err = decimal.NewFromString(r.OfferPx)
if err != nil {
return bidPx, offerPx, bidSize, offerSize, fmt.Errorf("invalid offer_px: %w", err)
}
if r.BidSize != "" {
bidSize, err = decimal.NewFromString(r.BidSize)
if err != nil {
return bidPx, offerPx, bidSize, offerSize, fmt.Errorf("invalid bid_size: %w", err)
}
}
if r.OfferSize != "" {
offerSize, err = decimal.NewFromString(r.OfferSize)
if err != nil {
return bidPx, offerPx, bidSize, offerSize, fmt.Errorf("invalid offer_size: %w", err)
}
}
return bidPx, offerPx, bidSize, offerSize, nil
type SendQuoteRequest struct {
QuoteReqID string `json:"QuoteReqID" binding:"required"`
Price string `json:"Price" binding:"required" example:"99.6"`
}

View File

@ -21,7 +21,9 @@ func SetRoutes(api *API) {
qfixdpl := v1.Group("/")
qfixdpl.Use(cont.AuthRequired)
qfixdpl.GET("/health", cont.HealthCheck)
qfixdpl.GET("/orders", cont.GetOrders)
qfixdpl.GET("/trades", cont.GetTrades)
qfixdpl.GET("/trades/:quoteReqID/logs", cont.GetLogs)
qfixdpl.GET("/quote-requests", cont.GetPendingQuoteRequests)
qfixdpl.POST("/quotes", cont.SendQuote)
backoffice := qfixdpl.Group("/backoffice")

View File

@ -7,6 +7,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/gomodule/redigo/redis"
"github.com/shopspring/decimal"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/app/version"
@ -16,6 +17,13 @@ import (
"quantex.com/qfixdpl/src/domain"
)
// TradeProvider exposes trade data from the FIX manager.
type TradeProvider interface {
GetTrades() []domain.ListTrade
GetPendingQuoteRequests() []domain.ListTrade
SendQuote(quoteReqID string, price decimal.Decimal) error
}
const RedisMaxIdle = 3000 // In ms
type API struct {
@ -32,7 +40,7 @@ type Config struct {
EnableJWTAuth bool
}
func New(userData app.UserDataProvider, storeInstance *store.Store, orderStore domain.OrderStore, fixSender domain.FIXSender, config Config, notify domain.Notifier) *API {
func New(userData app.UserDataProvider, storeInstance *store.Store, tradeProvider TradeProvider, config Config, notify domain.Notifier) *API {
// Set up Gin
var engine *gin.Engine
if version.Environment() == version.EnvironmentTypeProd {
@ -58,7 +66,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, orderStore d
}
api := &API{
Controller: newController(NewPool(), userData, storeInstance, orderStore, fixSender, config, notify),
Controller: newController(NewPool(), userData, storeInstance, tradeProvider, config, notify),
Router: engine,
Port: config.Port,
}

View File

@ -1,41 +0,0 @@
package data
import (
"sync"
"quantex.com/qfixdpl/src/domain"
)
type InMemoryOrderStore struct {
mu sync.RWMutex
orders []domain.Order
}
func NewOrderStore() *InMemoryOrderStore {
return &InMemoryOrderStore{}
}
func (s *InMemoryOrderStore) SaveOrder(order domain.Order) {
s.mu.Lock()
defer s.mu.Unlock()
s.orders = append(s.orders, order)
}
func (s *InMemoryOrderStore) GetOrders() []domain.Order {
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]domain.Order, len(s.orders))
copy(result, s.orders)
return result
}
func (s *InMemoryOrderStore) GetOrderByClOrdID(id string) (domain.Order, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, o := range s.orders {
if o.ClOrdID == id {
return o, true
}
}
return domain.Order{}, false
}

View File

@ -5,22 +5,42 @@ import (
"log/slog"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/tag"
"quantex.com/qfixdpl/src/domain"
)
type application struct {
router *quickfix.MessageRouter
notifier domain.Notifier
onLogon func(quickfix.SessionID)
onLogout func(quickfix.SessionID)
onQuote func(quote.Quote, quickfix.SessionID)
onQuoteRequest func(quoterequest.QuoteRequest, quickfix.SessionID)
onQuoteAck func(quoteack.QuoteAck, quickfix.SessionID)
onQuoteResponse func(quoteresponse.QuoteResponse, quickfix.SessionID)
onExecutionReport func(executionreport.ExecutionReport, quickfix.SessionID)
onExecutionAck func(executionack.ExecutionAck, quickfix.SessionID)
onRawMessage func(direction string, msg *quickfix.Message)
}
func newApplication() *application {
func newApplication(n domain.Notifier) *application {
app := &application{
router: quickfix.NewMessageRouter(),
notifier: n,
}
app.router.AddRoute(quote.Route(app.handleQuote))
app.router.AddRoute(quoteack.Route(app.handleQuoteAck))
app.router.AddRoute(quoterequest.Route(app.handleQuoteRequest))
app.router.AddRoute(quoteresponse.Route(app.handleQuoteResponse))
app.router.AddRoute(executionack.Route(app.handleExecutionAck))
app.router.AddRoute(executionreport.Route(app.handleExecutionReport))
return app
}
@ -38,6 +58,9 @@ func (a *application) OnLogon(sessionID quickfix.SessionID) {
func (a *application) OnLogout(sessionID quickfix.SessionID) {
slog.Info("FIX session logged out", "session", sessionID.String())
go a.notifier.SendMsg(domain.MessageChannelError, "Logout", domain.MessageStatusWarning, nil)
if a.onLogout != nil {
a.onLogout(sessionID)
}
@ -45,14 +68,87 @@ func (a *application) OnLogout(sessionID quickfix.SessionID) {
func (a *application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {}
func (a *application) ToApp(_ *quickfix.Message, _ quickfix.SessionID) error { return nil }
func (a *application) ToApp(msg *quickfix.Message, _ quickfix.SessionID) error {
if a.onRawMessage != nil {
a.onRawMessage("OUT", msg)
}
return nil
}
func (a *application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError {
return nil
}
func (a *application) FromApp(msg *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError {
return a.router.Route(msg, sessionID)
beginString, _ := msg.Header.GetBytes(tag.BeginString)
msgType, _ := msg.Header.GetBytes(tag.MsgType)
var applVerID quickfix.FIXString
msg.Header.GetField(tag.ApplVerID, &applVerID)
slog.Info("FIX FromApp received",
"beginString", string(beginString),
"msgType", string(msgType),
"applVerID", string(applVerID),
"session", sessionID.String(),
"rawMsg", msg.String(),
)
if a.onRawMessage != nil {
a.onRawMessage("IN", msg)
}
rejErr := a.router.Route(msg, sessionID)
if rejErr != nil {
slog.Error("FIX FromApp routing failed",
"msgType", string(msgType),
"error", rejErr.Error(),
"isBusinessReject", rejErr.IsBusinessReject(),
)
}
return rejErr
}
func (a *application) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) quickfix.MessageRejectError {
quoteReqID, err := msg.GetQuoteReqID()
if err != nil {
slog.Error("QuoteRequest missing QuoteReqID", "error", err.Error())
return err
}
slog.Info("QuoteRequest received",
"quoteReqID", quoteReqID,
"session", sessionID.String(),
)
if a.onQuoteRequest != nil {
a.onQuoteRequest(msg, sessionID)
}
return nil
}
func (a *application) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) quickfix.MessageRejectError {
quoteReqID, _ := msg.GetQuoteReqID()
quoteID, _ := msg.GetQuoteID()
status, _ := msg.GetQuoteAckStatus()
text, _ := msg.GetText()
slog.Info("QuoteAck received",
"quoteReqID", quoteReqID,
"quoteID", quoteID,
"quoteAckStatus", status,
"text", text,
"session", sessionID.String(),
)
if a.onQuoteAck != nil {
a.onQuoteAck(msg, sessionID)
}
return nil
}
func (a *application) handleQuote(msg quote.Quote, sessionID quickfix.SessionID) quickfix.MessageRejectError {
@ -71,3 +167,64 @@ func (a *application) handleQuote(msg quote.Quote, sessionID quickfix.SessionID)
return nil
}
func (a *application) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) quickfix.MessageRejectError {
quoteRespID, _ := msg.GetQuoteRespID()
quoteReqID, _ := msg.GetQuoteReqID()
quoteRespType, _ := msg.GetQuoteRespType()
slog.Info("QuoteResponse received",
"quoteRespID", quoteRespID,
"quoteReqID", quoteReqID,
"quoteRespType", quoteRespType,
"session", sessionID.String(),
)
if a.onQuoteResponse != nil {
a.onQuoteResponse(msg, sessionID)
}
return nil
}
func (a *application) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) quickfix.MessageRejectError {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
status, _ := msg.GetExecAckStatus()
slog.Info("ExecutionAck received",
"execID", execID,
"orderID", orderID,
"execAckStatus", status,
"session", sessionID.String(),
)
if a.onExecutionAck != nil {
a.onExecutionAck(msg, sessionID)
}
return nil
}
func (a *application) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) quickfix.MessageRejectError {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
listID, _ := msg.GetListID()
execType, _ := msg.GetExecType()
ordStatus, _ := msg.GetOrdStatus()
slog.Info("ExecutionReport received",
"execID", execID,
"orderID", orderID,
"listID", listID,
"execType", execType,
"ordStatus", ordStatus,
"session", sessionID.String(),
)
if a.onExecutionReport != nil {
a.onExecutionReport(msg, sessionID)
}
return nil
}

View File

@ -1,9 +1,9 @@
package fix
import (
"fmt"
"log/slog"
"os"
"strings"
"sync"
"time"
@ -12,60 +12,116 @@ import (
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/gen/enum"
"quantex.com/qfixdpl/quickfix/gen/field"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport"
filelog "quantex.com/qfixdpl/quickfix/log/file"
"quantex.com/qfixdpl/quickfix/store/file"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/common/tracerr"
"quantex.com/qfixdpl/src/domain"
)
type listTrade struct {
QuoteReqID string
ListID string
Symbol string
SecurityIDSrc enum.SecurityIDSource
Currency string
Side enum.Side
OrderQty decimal.Decimal
SettlDate string
Price decimal.Decimal
OwnerTraderID string
SessionID quickfix.SessionID
Quoted bool
}
// Manager wraps the QuickFIX initiator and implements domain.FIXSender.
type Manager struct {
initiator *quickfix.Initiator
app *application
sessionsMu sync.RWMutex
sessions map[string]quickfix.SessionID
orderStore domain.OrderStore
tradesMu sync.RWMutex
trades map[string]*listTrade
store domain.PersistenceStore
notify domain.Notifier
cfg app.FIXConfig
}
func NewManager(cfg app.FIXConfig, orderStore domain.OrderStore, notify domain.Notifier) *Manager {
func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager {
return &Manager{
sessions: make(map[string]quickfix.SessionID),
orderStore: orderStore,
trades: make(map[string]*listTrade),
store: store,
notify: notify,
cfg: cfg,
}
}
func (m *Manager) Start() error {
fixApp := newApplication()
fixApp := newApplication(m.notify)
fixApp.onLogon = m.onLogon
fixApp.onLogout = m.onLogout
fixApp.onQuoteRequest = m.handleQuoteRequest
fixApp.onQuoteAck = m.handleQuoteAck
fixApp.onQuoteResponse = m.handleQuoteResponse
fixApp.onExecutionReport = m.handleExecutionReport
fixApp.onExecutionAck = m.handleExecutionAck
fixApp.onRawMessage = m.handleRawMessage
m.app = fixApp
if err := m.loadActiveTrades(); err != nil {
err = tracerr.Errorf("failed to load active trades from DB, starting with empty state: %w", err)
slog.Error(err.Error())
}
f, err := os.Open(m.cfg.SettingsFile)
if err != nil {
return fmt.Errorf("opening FIX settings file %q: %w", m.cfg.SettingsFile, err)
err = tracerr.Errorf("error opening FIX settings file %q: %w", m.cfg.SettingsFile, err)
slog.Error(err.Error())
return err
}
defer f.Close()
settings, err := quickfix.ParseSettings(f)
if err != nil {
return fmt.Errorf("parsing FIX settings: %w", err)
err = tracerr.Errorf("error parsing FIX settings: %w", err)
slog.Error(err.Error())
return err
}
storeFactory := quickfix.NewMemoryStoreFactory()
logFactory := quickfix.NewNullLogFactory()
storeFactory := file.NewStoreFactory(settings)
logFactory, err := filelog.NewLogFactory(settings)
if err != nil {
err = tracerr.Errorf("error creating file log factory: %w", err)
slog.Error(err.Error())
return err
}
initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory)
if err != nil {
return fmt.Errorf("creating FIX initiator: %w", err)
err = tracerr.Errorf("error creating FIX initiator: %w", err)
slog.Error(err.Error())
return err
}
m.initiator = initiator
if err = m.initiator.Start(); err != nil {
return fmt.Errorf("starting FIX initiator: %w", err)
err = tracerr.Errorf("error starting FIX initiator: %w", err)
slog.Error(err.Error())
return err
}
slog.Info("FIX initiator started", "settings", m.cfg.SettingsFile)
@ -92,51 +148,550 @@ func (m *Manager) onLogout(sessionID quickfix.SessionID) {
m.sessionsMu.Unlock()
}
// SendQuote implements domain.FIXSender.
func (m *Manager) SendQuote(clOrdID, quoteID, symbol, currency string, bidPx, offerPx, bidSize, offerSize decimal.Decimal) error {
m.sessionsMu.RLock()
var sessionID quickfix.SessionID
var ok bool
for _, sid := range m.sessions {
sessionID = sid
ok = true
break
// sendQuoteStatusReport sends a QuoteStatusReport (35=AI) to acknowledge the incoming QuoteRequest.
func (m *Manager) sendQuoteStatusReport(quoteReqID, ownerTraderID string, sessionID quickfix.SessionID) error {
qsr := quotestatusreport.New(
field.NewTransactTime(time.Now()),
field.NewQuoteStatus(enum.QuoteStatus_ACCEPTED),
)
qsr.SetQuoteReqID(quoteReqID)
qsr.SetQuoteID(quoteReqID)
qsr.SetSymbol("[N/A]")
if ownerTraderID != "" {
qsr.SetOwnerTraderID(ownerTraderID)
}
m.sessionsMu.RUnlock()
return quickfix.SendToTarget(qsr, sessionID)
}
// sendTradeRequestAck sends a QuoteStatusReport (35=AI) to acknowledge a trade request (TRDREQACK).
func (m *Manager) sendTradeRequestAck(quoteReqID, quoteRespID string, sessionID quickfix.SessionID) error {
qsr := quotestatusreport.New(
field.NewTransactTime(time.Now()),
field.NewQuoteStatus(enum.QuoteStatus_ACCEPTED),
)
qsr.SetQuoteReqID(quoteReqID)
qsr.SetQuoteRespID(quoteRespID)
qsr.SetSymbol("[N/A]")
return quickfix.SendToTarget(qsr, sessionID)
}
// sendExecutionAck sends an ExecutionAck (35=BN) to acknowledge an incoming ExecutionReport.
func (m *Manager) sendExecutionAck(orderID, clOrdID, execID string, sessionID quickfix.SessionID) error {
bn := executionack.New(
field.NewOrderID(orderID),
field.NewExecID(execID),
field.NewExecAckStatus(enum.ExecAckStatus_ACCEPTED),
)
bn.SetClOrdID(clOrdID)
bn.SetSymbol("[N/A]")
bn.SetTransactTime(time.Now())
return quickfix.SendToTarget(bn, sessionID)
}
// handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge) followed by a Quote at price 99.6.
func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) {
quoteReqID, err := msg.GetQuoteReqID()
if err != nil {
err := tracerr.Errorf("handleQuoteRequest: missing QuoteReqID: %w", err)
slog.Error(err.Error())
return
}
// Validate LST_ prefix for List Trading flow.
if !strings.HasPrefix(quoteReqID, "LST_") {
slog.Warn("handleQuoteRequest: QuoteReqID missing LST_ prefix, ignoring", "quoteReqID", quoteReqID)
return
}
var (
symbol, currency, ownerTraderID, settlDate, listID, negotiationType string
side enum.Side
secIDSource enum.SecurityIDSource
orderQty decimal.Decimal
)
relatedSyms, relErr := msg.GetNoRelatedSym()
if relErr == nil && relatedSyms.Len() > 0 {
sym := relatedSyms.Get(0)
symbol, _ = sym.GetSecurityID()
secIDSource, _ = sym.GetSecurityIDSource()
currency, _ = sym.GetCurrency()
side, _ = sym.GetSide()
ownerTraderID, _ = sym.GetOwnerTraderID()
orderQty, _ = sym.GetOrderQty()
settlDate, _ = sym.GetSettlDate()
listID, _ = sym.GetListID()
negotiationType, _ = sym.GetNegotiationType()
}
if listID == "" {
slog.Warn("handleQuoteRequest: missing ListID", "quoteReqID", quoteReqID)
return
}
if negotiationType != "RFQ" {
slog.Warn("handleQuoteRequest: unexpected NegotiationType", "quoteReqID", quoteReqID, "negotiationType", negotiationType)
return
}
// Send QuoteStatusReport (35=AI) to acknowledge the inquiry.
if ackErr := m.sendQuoteStatusReport(quoteReqID, ownerTraderID, sessionID); ackErr != nil {
ackErr = tracerr.Errorf("handleQuoteRequest: failed to send QuoteStatusReport (quoteReqID=%s): %w", quoteReqID, ackErr)
slog.Error(ackErr.Error())
return
}
slog.Info("QuoteStatusReport sent", "quoteReqID", quoteReqID)
sIDSource := enum.SecurityIDSource_ISIN_NUMBER
if secIDSource == enum.SecurityIDSource_CUSIP {
sIDSource = enum.SecurityIDSource_CUSIP
}
// Store trade state as pending; Quote (35=S) is sent later via REST endpoint.
m.tradesMu.Lock()
m.trades[quoteReqID] = &listTrade{
QuoteReqID: quoteReqID,
ListID: listID,
Symbol: symbol,
SecurityIDSrc: sIDSource,
Currency: currency,
Side: side,
OrderQty: orderQty,
SettlDate: settlDate,
OwnerTraderID: ownerTraderID,
SessionID: sessionID,
Quoted: false,
}
m.tradesMu.Unlock()
// Persist structured message (outside mutex).
m.persistMessage(quoteReqID, parseQuoteRequest(msg))
// Persist outgoing QuoteStatusReport.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
"OwnerTraderID": ownerTraderID,
}))
}
// handleQuoteAck handles an incoming QuoteAck (35=CW).
func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
status, _ := msg.GetQuoteAckStatus()
text, _ := msg.GetText()
m.persistMessage(quoteReqID, parseQuoteAck(msg))
if status != enum.QuoteAckStatus_ACCEPTED {
err := tracerr.Errorf("handleQuoteAck: quote rejected by TW (quoteReqID=%s, quoteAckStatus=%s, text=%s)", quoteReqID, string(status), text)
slog.Error(err.Error())
m.tradesMu.Lock()
delete(m.trades, quoteReqID)
m.tradesMu.Unlock()
return
}
slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID)
}
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ).
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes.
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
quoteRespID, _ := msg.GetQuoteRespID()
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ")
isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND")
isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM")
isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND")
if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd {
slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID)
return
}
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future.
// Always send ACK regardless of whether the trade is in our map.
// TW will keep retrying until it receives an ACK.
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil {
ackErr = tracerr.Errorf("handleQuoteResponse: failed to send ACK (quoteReqID=%s, quoteRespID=%s): %w", quoteReqID, quoteRespID, ackErr)
slog.Error(ackErr.Error())
return
}
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
// Persist incoming QuoteResponse.
m.persistMessage(quoteReqID, parseQuoteResponse(msg))
// Persist outgoing ACK.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteRespID": quoteRespID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
}))
// _TRDSUMM is the final message — clean up the trade.
if isTrdSumm {
slog.Info("Trade summary received, cleaning up", "quoteReqID", quoteReqID)
m.tradesMu.Lock()
delete(m.trades, quoteReqID)
m.tradesMu.Unlock()
}
}
// handleExecutionReport handles an incoming ExecutionReport (35=8).
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does.
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
clOrdID, _ := msg.GetClOrdID()
execType, _ := msg.GetExecType()
ordStatus, _ := msg.GetOrdStatus()
listID, _ := msg.GetListID()
slog.Info("handleExecutionReport received",
"execID", execID, "orderID", orderID, "clOrdID", clOrdID,
"execType", string(execType), "ordStatus", string(ordStatus), "listID", listID,
)
// Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW.
if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil {
ackErr = tracerr.Errorf("handleExecutionReport: failed to send ExecutionAck (execID=%s): %w", execID, ackErr)
slog.Error(ackErr.Error())
} else {
slog.Info("ExecutionAck sent", "execID", execID)
}
switch {
case strings.Contains(execID, "_LISTEND"):
slog.Info("List ended (due-in closed), awaiting trade result from TW",
"execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDEND"):
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDSUMM"):
slog.Info("Trade summary received from TW, cleaning up",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
m.tradesMu.Lock()
delete(m.trades, clOrdID)
m.tradesMu.Unlock()
case execType == enum.ExecType_TRADE:
slog.Info("Trade result received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
}
// Persist incoming ExecutionReport.
m.persistMessage(clOrdID, parseExecutionReport(msg))
// Persist outgoing ExecutionAck.
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{
"OrderID": orderID,
"ExecID": execID,
"ClOrdID": clOrdID,
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
}))
}
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
// Logged in application.go, no further action needed.
}
// GetTrades returns a snapshot of all active trades.
func (m *Manager) GetTrades() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
trades := make([]domain.ListTrade, 0, len(m.trades))
for _, t := range m.trades {
trades = append(trades, toDomainListTrade(t))
}
return trades
}
// GetPendingQuoteRequests returns trades that have received a QuoteRequest but not yet been quoted by the dealer.
func (m *Manager) GetPendingQuoteRequests() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
pending := make([]domain.ListTrade, 0)
for _, t := range m.trades {
if !t.Quoted {
pending = append(pending, toDomainListTrade(t))
}
}
return pending
}
func toDomainListTrade(t *listTrade) domain.ListTrade {
return domain.ListTrade{
QuoteReqID: t.QuoteReqID,
ListID: t.ListID,
Symbol: t.Symbol,
SecurityIDSrc: string(t.SecurityIDSrc),
Currency: t.Currency,
Side: string(t.Side),
OrderQty: t.OrderQty.String(),
SettlDate: t.SettlDate,
Price: t.Price.String(),
OwnerTraderID: t.OwnerTraderID,
}
}
// SendQuote builds and sends a Quote (35=S) for an existing pending QuoteRequest at the given price.
func (m *Manager) SendQuote(quoteReqID string, price decimal.Decimal) error {
m.tradesMu.Lock()
t, ok := m.trades[quoteReqID]
if !ok {
return fmt.Errorf("no active FIX session")
m.tradesMu.Unlock()
err := tracerr.Errorf("SendQuote: quoteReqID %s not found", quoteReqID)
slog.Error(err.Error())
return err
}
if t.Quoted {
m.tradesMu.Unlock()
err := tracerr.Errorf("SendQuote: quote already sent for quoteReqID %s", quoteReqID)
slog.Error(err.Error())
return err
}
sessionID := t.SessionID
if sessionID == (quickfix.SessionID{}) {
sessionID = m.anyActiveSessionID()
if sessionID == (quickfix.SessionID{}) {
m.tradesMu.Unlock()
err := tracerr.Errorf("SendQuote: no active FIX session for quoteReqID %s", quoteReqID)
slog.Error(err.Error())
return err
}
}
symbol := t.Symbol
sIDSource := t.SecurityIDSrc
currency := t.Currency
side := t.Side
orderQty := t.OrderQty
settlDate := t.SettlDate
ownerTraderID := t.OwnerTraderID
m.tradesMu.Unlock()
quoteID := quoteReqID
q := quote.New(
field.NewQuoteID(quoteID),
field.NewQuoteType(enum.QuoteType_INDICATIVE),
field.NewQuoteType(enum.QuoteType_SEND_QUOTE),
field.NewTransactTime(time.Now()),
)
q.SetSymbol(symbol)
q.SetQuoteID(quoteID)
q.SetSymbol("[N/A]")
q.SetSecurityID(symbol)
q.SetSecurityIDSource(sIDSource)
q.SetQuoteReqID(quoteReqID)
if currency != "" {
q.SetCurrency(currency)
}
q.SetBidPx(bidPx, 8)
q.SetOfferPx(offerPx, 8)
if !bidSize.IsZero() {
q.SetBidSize(bidSize, 8)
if !orderQty.IsZero() {
q.SetOrderQty(orderQty, 0)
}
if !offerSize.IsZero() {
q.SetOfferSize(offerSize, 8)
if settlDate != "" {
q.SetSettlDate(settlDate)
}
if err := quickfix.SendToTarget(q, sessionID); err != nil {
return fmt.Errorf("sending FIX quote: %w", err)
q.SetPrice(price, 8)
if side == enum.Side_BUY {
q.SetOfferPx(price, 8)
q.SetSide(enum.Side_BUY)
} else {
q.SetBidPx(price, 8)
q.SetSide(enum.Side_SELL)
}
slog.Info("Quote sent", "clOrdID", clOrdID, "quoteID", quoteID, "symbol", symbol)
q.SetPriceType(enum.PriceType_PERCENTAGE)
if ownerTraderID != "" {
q.SetOwnerTraderID(ownerTraderID)
}
if sendErr := quickfix.SendToTarget(q, sessionID); sendErr != nil {
sendErr = tracerr.Errorf("SendQuote: failed to send quote (quoteReqID=%s): %w", quoteReqID, sendErr)
slog.Error(sendErr.Error())
return sendErr
}
m.tradesMu.Lock()
if t, ok := m.trades[quoteReqID]; ok {
t.Price = price
t.Quoted = true
}
m.tradesMu.Unlock()
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol, "price", price.String())
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteID": quoteID,
"Symbol": symbol,
"Side": string(side),
"Price": price.String(),
"OrderQty": orderQty.String(),
"Currency": currency,
"SettlDate": settlDate,
}))
return nil
}
func (m *Manager) anyActiveSessionID() quickfix.SessionID {
m.sessionsMu.RLock()
defer m.sessionsMu.RUnlock()
for _, s := range m.sessions {
return s
}
return quickfix.SessionID{}
}
// handleRawMessage persists raw FIX message strings to the logs table.
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
quoteReqID := extractIdentifier(msg)
if err := m.store.SaveLog(domain.LogEntry{
QuoteReqID: quoteReqID,
RawMsg: "[" + direction + "] " + msg.String(),
}); err != nil {
err = tracerr.Errorf("failed to persist raw log: %w", err)
slog.Error(err.Error())
}
}
// persistMessage saves a structured FIX message to the messages table.
func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSON) {
if err := m.store.SaveMessage(domain.TradeMessage{
QuoteReqID: quoteReqID,
JMessage: fixJSON,
}); err != nil {
err = tracerr.Errorf("failed to persist message (msgType=%s, quoteReqID=%s): %w", fixJSON.MsgType, quoteReqID, err)
slog.Error(err.Error())
}
}
// loadActiveTrades reconstructs active trades from today's messages in the database.
func (m *Manager) loadActiveTrades() error {
messages, err := m.store.GetTodayMessages()
if err != nil {
return err
}
activeTrades := make(map[string]*listTrade)
for _, msg := range messages {
switch msg.JMessage.MsgType {
case "R": // QuoteRequest -> trade is born
if !strings.HasPrefix(msg.QuoteReqID, "LST_") {
continue
}
body := msg.JMessage.Body
nt, _ := body["NegotiationType"].(string)
if nt != "RFQ" {
continue
}
listID, _ := body["ListID"].(string)
if listID == "" {
continue
}
trade := &listTrade{
QuoteReqID: msg.QuoteReqID,
ListID: listID,
}
if v, ok := body["SecurityID"].(string); ok {
trade.Symbol = v
}
if v, ok := body["SecurityIDSource"].(string); ok {
trade.SecurityIDSrc = enum.SecurityIDSource(v)
}
if v, ok := body["Currency"].(string); ok {
trade.Currency = v
}
if v, ok := body["Side"].(string); ok {
trade.Side = enum.Side(v)
}
if v, ok := body["OrderQty"].(string); ok {
trade.OrderQty, _ = decimal.NewFromString(v)
}
if v, ok := body["SettlDate"].(string); ok {
trade.SettlDate = v
}
if v, ok := body["OwnerTraderID"].(string); ok {
trade.OwnerTraderID = v
}
activeTrades[msg.QuoteReqID] = trade
case "S": // Outgoing Quote — dealer has already quoted this trade
if t, ok := activeTrades[msg.QuoteReqID]; ok {
t.Quoted = true
if v, ok := msg.JMessage.Body["Price"].(string); ok {
t.Price, _ = decimal.NewFromString(v)
}
}
case "CW": // QuoteAck — if rejected, trade is dead
body := msg.JMessage.Body
quoteAckStatus, _ := body["QuoteAckStatus"].(string)
if quoteAckStatus != string(enum.QuoteAckStatus_ACCEPTED) {
delete(activeTrades, msg.QuoteReqID)
}
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
body := msg.JMessage.Body
quoteRespID, _ := body["QuoteRespID"].(string)
if strings.HasSuffix(quoteRespID, "_TRDSUMM") {
delete(activeTrades, msg.QuoteReqID)
}
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)
body := msg.JMessage.Body
execID, _ := body["ExecID"].(string)
clOrdID, _ := body["ClOrdID"].(string)
if strings.Contains(execID, "_TRDSUMM") {
delete(activeTrades, clOrdID)
}
}
}
m.trades = activeTrades
slog.Info("recovery completed", "activeTrades", len(activeTrades))
return nil
}

229
src/client/fix/parser.go Normal file
View File

@ -0,0 +1,229 @@
package fix
import (
"time"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/tag"
"quantex.com/qfixdpl/src/domain"
)
func extractHeader(msg *quickfix.Message) map[string]interface{} {
header := make(map[string]interface{})
if v, err := msg.Header.GetBytes(tag.BeginString); err == nil {
header["BeginString"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.MsgType); err == nil {
header["MsgType"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.SenderCompID); err == nil {
header["SenderCompID"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.TargetCompID); err == nil {
header["TargetCompID"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.MsgSeqNum); err == nil {
header["MsgSeqNum"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.SendingTime); err == nil {
header["SendingTime"] = string(v)
}
return header
}
func parseQuoteRequest(msg quoterequest.QuoteRequest) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
body := map[string]interface{}{"QuoteReqID": quoteReqID}
if relSyms, err := msg.GetNoRelatedSym(); err == nil && relSyms.Len() > 0 {
sym := relSyms.Get(0)
if v, e := sym.GetSecurityID(); e == nil {
body["SecurityID"] = v
}
if v, e := sym.GetSecurityIDSource(); e == nil {
body["SecurityIDSource"] = string(v)
}
if v, e := sym.GetCurrency(); e == nil {
body["Currency"] = v
}
if v, e := sym.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := sym.GetOrderQty(); e == nil {
body["OrderQty"] = v.String()
}
if v, e := sym.GetSettlDate(); e == nil {
body["SettlDate"] = v
}
if v, e := sym.GetListID(); e == nil {
body["ListID"] = v
}
if v, e := sym.GetOwnerTraderID(); e == nil {
body["OwnerTraderID"] = v
}
if v, e := sym.GetNegotiationType(); e == nil {
body["NegotiationType"] = v
}
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "R",
QuoteReqID: quoteReqID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseQuoteAck(msg quoteack.QuoteAck) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
body := map[string]interface{}{"QuoteReqID": quoteReqID}
if v, e := msg.GetQuoteID(); e == nil {
body["QuoteID"] = v
}
if v, e := msg.GetQuoteAckStatus(); e == nil {
body["QuoteAckStatus"] = string(v)
}
if v, e := msg.GetText(); e == nil {
body["Text"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "CW",
QuoteReqID: quoteReqID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseQuoteResponse(msg quoteresponse.QuoteResponse) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
body := map[string]interface{}{"QuoteReqID": quoteReqID}
if v, e := msg.GetQuoteRespID(); e == nil {
body["QuoteRespID"] = v
}
if v, e := msg.GetQuoteRespType(); e == nil {
body["QuoteRespType"] = string(v)
}
if v, e := msg.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := msg.GetPrice(); e == nil {
body["Price"] = v.String()
}
if v, e := msg.GetOrderQty(); e == nil {
body["OrderQty"] = v.String()
}
if v, e := msg.GetClOrdID(); e == nil {
body["ClOrdID"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "AJ",
QuoteReqID: quoteReqID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseExecutionReport(msg executionreport.ExecutionReport) domain.FixMessageJSON {
clOrdID, _ := msg.GetClOrdID()
body := map[string]interface{}{"ClOrdID": clOrdID}
if v, e := msg.GetExecID(); e == nil {
body["ExecID"] = v
}
if v, e := msg.GetOrderID(); e == nil {
body["OrderID"] = v
}
if v, e := msg.GetExecType(); e == nil {
body["ExecType"] = string(v)
}
if v, e := msg.GetOrdStatus(); e == nil {
body["OrdStatus"] = string(v)
}
if v, e := msg.GetListID(); e == nil {
body["ListID"] = v
}
if v, e := msg.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := msg.GetSymbol(); e == nil {
body["Symbol"] = v
}
if v, e := msg.GetSecurityID(); e == nil {
body["SecurityID"] = v
}
if v, e := msg.GetCurrency(); e == nil {
body["Currency"] = v
}
if v, e := msg.GetPrice(); e == nil {
body["Price"] = v.String()
}
if v, e := msg.GetLastPx(); e == nil {
body["LastPx"] = v.String()
}
if v, e := msg.GetLastQty(); e == nil {
body["LastQty"] = v.String()
}
if v, e := msg.GetOrderQty(); e == nil {
body["OrderQty"] = v.String()
}
if v, e := msg.GetSettlDate(); e == nil {
body["SettlDate"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "8",
QuoteReqID: clOrdID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
// extractIdentifier extracts the trade identifier from a parsed FIX message.
// For ExecutionReport (8) and ExecutionAck (BN), uses ClOrdID (tag 11).
// For all other message types, uses QuoteReqID (tag 131).
func extractIdentifier(msg *quickfix.Message) string {
msgType, _ := msg.Header.GetBytes(tag.MsgType)
switch string(msgType) {
case "8", "BN":
var clOrdID quickfix.FIXString
if err := msg.Body.GetField(tag.ClOrdID, &clOrdID); err == nil {
return string(clOrdID)
}
default:
var quoteReqID quickfix.FIXString
if err := msg.Body.GetField(tag.QuoteReqID, &quoteReqID); err == nil {
return string(quoteReqID)
}
}
return ""
}
func buildOutgoingMessageJSON(msgType, quoteReqID string, body map[string]interface{}) domain.FixMessageJSON {
return domain.FixMessageJSON{
Direction: "OUT",
MsgType: msgType,
QuoteReqID: quoteReqID,
Body: body,
ReceiveTime: time.Now(),
}
}

104
src/client/fix/protocol.txt Normal file
View File

@ -0,0 +1,104 @@
Step 1
Direction: ← QuoteRequest (R)
Comentary: Tradeweb sends trade information to dealer.
FIX Message: 8=FIXT.1.1 9=869 35=R 34=2 49=TRADEWEB 52=20160401-16:53:19.992 56=TW1_CORI_TEST_12345_DLRDPL 131=LST_20160401_TW1_CORI_NY302485.18_1 146=1 55=AMXLMM 2.375 09/08/16 48=02364WBC8 22=1 460=12 167=CORP 762=REGCORIPRC 541=20160908 225=20110908 470=MX 223=2.375 106=AMERICA MOVIL SAB DE CV 54=2 38=1000000 64=20160406 15=USD 6110=Comms 60=20160401-16:53:19 662=99.98046875 22570=0.76 663=1 699=912828UR9 761=1 423=6 44=-999999 5023=0.001000 66=NY302485.18 6847=1 75=20160401 464=Y 20086=1 20074=Y 20075=Y 20077=LatAm Comms 20078=pddealer 20079=60 20081=60 20090=60 20072=60 20098=60 5745=1 20073=RFQ 20076=Y 20156=N 20130=2000000000 20138=JPM,MER 20175=20120308 2115=0 22630=0 20265=LatAm 453=3 448=emack 447=C 452=3 802=3 523=Dev Test 803=2 523=NY 803=25 523=USA 803=4000 448=Tradeweb 447=C 452=1 802=1 523=Tradeweb0001 803=4002 448=DTCC 447=C 452=4 5114=2 5113=1 20169=A2 5113=0 20169=A- 10=121
Step 2
Direction: QuoteStatusReport (AI) →
Comentary: Dealer acknowledges trade.
FIX Message: 8=FIXT.1.1 9=198 35=AI 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=2 52=20160401-16:53:19.988 131=LST_20160401_TW1_CORI_NY302485.18_1 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:53:19.988 297=0 10=106
Step 3
Direction: Quote (S) →
Comentary: Dealer sends quote.
FIX Message: 8=FIXT.1.1 9=231 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=3 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 6153=emackdlr 44=.99 423=6 60=20160401-16:53:19.990 10=247
Step 4
Direction: ← QuoteAck (CW)
Comentary: Tradeweb acknowledges quote.
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=3 49=TRADEWEB 52=20160401-16:53:25.102 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:25 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=154
Step 5
Direction: Quote (S) →
Comentary: Dealer sends quote.
FIX Message: 8=FIXT.1.1 9=239 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=4 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 20087=5 6153=emackdlr 44=.97 423=6 60=20160401-16:53:19.990 10=114
Step 6
Direction: ← QuoteAck (CW)
Comentary: Tradeweb acknowledges quote.
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=4 49=TRADEWEB 52=20160401-16:53:30.055 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:30 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=154
Step 7
Direction: Quote (S) →
Comentary: Dealer sends quote.
FIX Message: 8=FIXT.1.1 9=239 35=S 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=6 52=20160401-16:53:19.990 117=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 54=2 131=LST_20160401_TW1_CORI_NY302485.18_1 537=211 20087=5 6153=emackdlr 44=.98 423=6 60=20160401-16:53:19.990 10=117
Step 8
Direction: ← QuoteAck (CW)
Commentary: Tradeweb acknowledges quote.
FIX Message: 8=FIXT.1.1 9=186 35=CW 34=5 49=TRADEWEB 52=20160401-16:53:35.071 56=TW1_CORI_TEST_12345_DLRDPL 60=20160401-16:53:35 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 1865=1 10=163
Step 9
Direction: ← ExecutionReport (8)
Commentary: Tradeweb notifies dealer that the list trading (Due In time) has ended. ExecType=A
FIX Message: 8=FIXT.1.1 9=289 35=8 34=7 49=TRADEWEB 52=20160401-16:54:19.463 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125419.463 37=LST_20160401_TW1_CORI_NY302485.18_1 39=D 55=[N/A] 60=20160401-16:54:19 75=20160401 150=I 151=0 20086=1 10=073
Step 10
Direction: ExecutionAck (BN) →
Commentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=255 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=8 52=20160401-16:54:19.448 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125419.448 55=[N/A] 60=20160401-16:54:19.448 10=119
Step 11
Direction: ← QuoteResponse (AJ)
Commentary: Customer lifts after good-for time expires. Tradeweb informs dealer customer lift.
FIX Message: 8=FIXT.1.1 9=431 35=AJ 34=10 49=TRADEWEB 52=20160401-16:55:32.855 56=TW1_CORI_TEST_12345_DLRDPL 11=LST_20160401_TW1_CORI_NY302485.18_1 22=1 38=1000000 44=0.98 48=02364WBC8 54=2 55=[N/A] 60=20160401-16:55:32 117=LST_20160401_TW1_CORI_NY302485.18_1 131=LST_20160401_TW1_CORI_NY302485.18_1 423=6 662=99.98046875 693=LST_20160401_TW1_CORI_NY302485.18_1_TRDREQ 694=1 2115=1 20074=Y 20075=N 20076=N 20079=60 20082=60 20156=N 22570=0.760289080299 22630=0 10=183
Step 12
Direction: QuoteStatusReport (AI) →
Commentary: Dealer acknowledges customers trade acceptance.
FIX Message: 8=FIXT.1.1 9=206 35=AI 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=12 52=20160401-16:55:32.849 693=LST_20160401_TW1_CORI_NY302485.18_1_TRDREQ 131=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:55:32.849 297=0 10=189
Step 13
Direction: ← ExecutionReport (8)
Commentary: Tradeweb notifies the dealer that list has ended
FIX Message: incoming8=FIXT.1.1 9=290 35=8 34=11 49=TRADEWEB 52=20160401-16:55:32.855 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125532.855 37=LST_20160401_TW1_CORI_NY302485.18_1 39=A 55=[N/A] 60=20160401-16:55:32 75=20160401 150=A 151=0 20086=1 10=095
Step 14
Direction: ExecutionAck (BN) →
Commentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=256 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=13 52=20160401-16:55:32.852 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_LISTEND-125532.852 55=[N/A] 60=20160401-16:55:32.852 10=155
Step 15
Direction: ExecutionReport (8) →
Commentary: Dealer accepts re-quote is not allowed. ExecType=F, OrdStatus=Filled
FIX Message: 8=FIXT.1.1 9=283 35=8 34=14 49=TW1_CORI_TEST_12345_DLRDPL 52=20160401-16:55:32.850 56=TRADEWEB 6=0 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1 37=LST_20160401_TW1_CORI_NY302485.18_1 39=2 54=2 55=[N/A] 150=F 151=0 6153=LST_20160401_TW1_CORI_NY302485.18_1 22631=POSTTRADE_STRING 22632=POSTTRADE_STRING 10=155
Step 16
Direction: ← ExecutionAck (BN)
Commentary: Tradeweb acknowledges ExecutionReport
FIX Message: 8=FIXT.1.1 9=233 35=BN 34=12 49=TRADEWEB 52=20160401-16:55:37.917 56=TW1_CORI_TEST_12345_DLRDPL 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1 37=LST_20160401_TW1_CORI_NY302485.18_1 55=[N/A] 60=20160401-16:55:37 1036=1 10=051
Step 17
Direction: ← ExecutionReport (8)
Commentary: Tradeweb informs Dealer of outcome of a list trade item. OrdStatus=Filled, ExecType=Trade
FIX Message: 8=FIXT.1.1 9=337 35=8 34=13 49=TRADEWEB 52=20160401-16:55:37.917 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDEND-125537.917 37=LST_20160401_TW1_CORI_NY302485.18_1 39=2 44=0.98 54=2 55=[N/A] 60=20160401-16:55:37 75=20160401 150=F 151=0 423=6 662=99.98046875 22570=0.760289080299 10=067
Step 18
Direction: ExecutionAck (BN) →
Comentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=255 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=15 52=20160401-16:55:37.910 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDEND-125537.917 55=[N/A] 60=20160401-16:55:37.910 10=078
Step 19
Direction:
Comentary: Autospot at Tradeweb Treasury composite.
FIX Message:
Step 20
Direction: ← ExecutionReport (8)
Comentary: Tradeweb sends ExecutionReport message to confirm final outcome of list trade item including applicable cover quote, spot, settlement money information, etc. OrdStatus=Filled, ExecType=Order Status TradeSummary = Y
FIX Message: 8=FIXT.1.1 9=733 35=8 34=14 49=TRADEWEB 52=20160401-16:55:40.292 56=TW1_CORI_TEST_12345_DLRDPL 6=0 11=LST_20160401_TW1_CORI_NY302485.18_1 14=0 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDSUMM-125540.277 22=1 37=LST_20160401_TW1_CORI_NY302485.18_1 38=1000000 39=2 44=0.98 48=02364WBC8 54=2 55=[N/A] 60=20160401-16:55:40 64=20160406 75=20160401 150=F 151=0 167=CORP 236=1.74 423=6 453=2 448=emack 447=C 452=3 802=3 523=Dev Test 803=2 523=NY 803=25 523=USA 803=4000 448=Tradeweb 447=C 452=1 802=2 523=Tradeweb0001 803=4002 523=YES 803=4003 526=TRD_20160401_TW1_CORI_23 662=99.98046875 1003=20160401.TW1.CORI.23 6153=emackdlr 6731=20160401.TW1.CORI.23 20115=100.265 20250=225000 22570=0.76 22630=0 22631=POSTTRADE_STRING 22634=160401.DLRX.TRSY.120 22636=Y 10=239
Step 21
Direction: ExecutionAck (BN) →
Comentary: Dealer acknowledges ExecutionReport message.
FIX Message: 8=FIXT.1.1 9=256 35=BN 49=TW1_CORI_TEST_12345_DLRDPL 56=TRADEWEB 34=16 52=20160401-16:55:40.287 1036=1 37=LST_20160401_TW1_CORI_NY302485.18_1 11=LST_20160401_TW1_CORI_NY302485.18_1 17=LST_20160401_TW1_CORI_NY302485.18_1_TRDSUMM-125540.277 55=[N/A] 60=20160401-16:55:40.287 10=182

View File

@ -132,7 +132,7 @@ func getMessage(text string, status domain.MessageStatus) string {
'cardId': 'createCardMessage',
'card': {
'header': {
'title': 'qfixdpl',
'title': 'QFIXDPL',
'subtitle': 'Notification',
'imageUrl': '%s',
'imageType': 'CIRCLE'

16
src/client/store/db.sql Normal file
View File

@ -0,0 +1,16 @@
CREATE TABLE IF NOT EXISTS qfixdpl_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
quote_req_id TEXT NOT NULL,
j_message JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id);
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at);
CREATE TABLE IF NOT EXISTS qfixdpl_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
quote_req_id TEXT NOT NULL UNIQUE,
raw_msg TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

View File

@ -2,7 +2,9 @@
package store
import (
_ "embed"
"log/slog"
"strings"
"time"
"quantex.com.ar/multidb"
@ -11,6 +13,9 @@ import (
"quantex.com/qfixdpl/src/common/tracerr"
)
//go:embed db.sql
var schemaSQL string
const dbPingSeconds = 30
type Store struct {
@ -45,9 +50,31 @@ func New(config Config) (*Store, error) {
go s.db.PeriodicDBPing(time.Second * dbPingSeconds)
if err := s.ensureTables(); err != nil {
return nil, tracerr.Errorf("error ensuring tables: %w", err)
}
return s, nil
}
func (p *Store) ensureTables() error {
statements := strings.Split(schemaSQL, ";")
for _, stmt := range statements {
stmt = strings.TrimSpace(stmt)
if stmt == "" {
continue
}
if _, err := p.db.Exec(stmt); err != nil {
return tracerr.Errorf("error executing schema statement: %w", err)
}
}
slog.Info("database tables ensured")
return nil
}
func (p *Store) CloseDB() {
p.db.Close()
slog.Info("closing database connection.")

View File

@ -0,0 +1,105 @@
package store
import (
"encoding/json"
"strings"
"time"
"quantex.com/qfixdpl/src/common/tracerr"
"quantex.com/qfixdpl/src/domain"
)
func (p *Store) SaveMessage(msg domain.TradeMessage) error {
jsonBytes, err := json.Marshal(msg.JMessage)
if err != nil {
return tracerr.Errorf("error marshaling j_message: %w", err)
}
_, err = p.db.Exec(
"INSERT INTO qfixdpl_messages (quote_req_id, j_message) VALUES ($1, $2)",
msg.QuoteReqID, string(jsonBytes),
)
if err != nil {
return tracerr.Errorf("error inserting message: %w", err)
}
return nil
}
func (p *Store) SaveLog(entry domain.LogEntry) error {
upsertStmt := `INSERT INTO qfixdpl_logs (quote_req_id, raw_msg)
VALUES ($1, $2)
ON CONFLICT (quote_req_id) DO UPDATE
SET raw_msg = qfixdpl_logs.raw_msg || E'\n' || EXCLUDED.raw_msg,
updated_at = NOW()`
_, err := p.db.Exec(upsertStmt, entry.QuoteReqID, entry.RawMsg)
if err != nil {
return tracerr.Errorf("error upserting log: %w", err)
}
return nil
}
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
rows, err := p.db.Query(
"SELECT id, quote_req_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC",
)
if err != nil {
return nil, tracerr.Errorf("error querying today messages: %w", err)
}
defer rows.Close()
var messages []domain.TradeMessage
for rows.Next() {
var (
id, quoteReqID string
jMessageRaw []byte
createdAt time.Time
)
if err := rows.Scan(&id, &quoteReqID, &jMessageRaw, &createdAt); err != nil {
return nil, tracerr.Errorf("error scanning message row: %w", err)
}
var jMessage domain.FixMessageJSON
if err := json.Unmarshal(jMessageRaw, &jMessage); err != nil {
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
}
messages = append(messages, domain.TradeMessage{
ID: id,
QuoteReqID: quoteReqID,
JMessage: jMessage,
CreatedAt: createdAt,
})
}
if err := rows.Err(); err != nil {
return nil, tracerr.Errorf("error iterating message rows: %w", err)
}
return messages, nil
}
func (p *Store) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
selectStmt := "SELECT raw_msg FROM qfixdpl_logs WHERE quote_req_id = '" + quoteReqID + "';"
response, err := p.db.Query(selectStmt)
if err != nil {
return domain.Logs{}, tracerr.Errorf("error querying logs: %w", err)
}
defer response.Close()
if !response.Next() {
return domain.Logs{}, nil
}
var rawMsg string
if err := response.Scan(&rawMsg); err != nil {
return domain.Logs{}, tracerr.Errorf("error scanning log row: %w", err)
}
return domain.Logs{Entries: strings.Split(rawMsg, "\n")}, nil
}

View File

@ -38,9 +38,8 @@ func Runner(cfg app.Config) error {
}
userData := data.New()
orderStore := data.NewOrderStore()
fixManager := fix.NewManager(cfg.FIX, orderStore, notify)
fixManager := fix.NewManager(cfg.FIX, appStore, notify)
if err = fixManager.Start(); err != nil {
return fmt.Errorf("error starting FIX acceptor: %w", err)
}
@ -54,7 +53,7 @@ func Runner(cfg app.Config) error {
EnableJWTAuth: cfg.EnableJWTAuth,
}
api := rest.New(userData, appStore, orderStore, fixManager, apiConfig, notify)
api := rest.New(userData, appStore, fixManager, apiConfig, notify)
api.Run()
cmd.WaitForInterruptSignal(nil)

View File

@ -1,32 +0,0 @@
// Package domain defines all the domain models
package domain
import (
"time"
"github.com/shopspring/decimal"
)
// Order represents a FIX NewOrderSingle message received from a client.
type Order struct {
ClOrdID string
Symbol string
Side string
OrdType string
OrderQty decimal.Decimal
Price decimal.Decimal
SessionID string
ReceivedAt time.Time
}
// OrderStore is the port for persisting and retrieving orders.
type OrderStore interface {
SaveOrder(order Order)
GetOrders() []Order
GetOrderByClOrdID(id string) (Order, bool)
}
// FIXSender is the port for sending FIX messages back to clients.
type FIXSender interface {
SendQuote(clOrdID, quoteID, symbol, currency string, bidPx, offerPx, bidSize, offerSize decimal.Decimal) error
}

55
src/domain/persistence.go Normal file
View File

@ -0,0 +1,55 @@
// Package domain defines all the domain models
package domain
import "time"
// ListTrade es la representacion exportada de un trade de List Trading.
type ListTrade struct {
QuoteReqID string
ListID string
Symbol string
SecurityIDSrc string
Currency string
Side string
OrderQty string
SettlDate string
Price string
OwnerTraderID string
}
// FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento.
type FixMessageJSON struct {
Direction string `json:"direction"`
MsgType string `json:"msg_type"`
QuoteReqID string `json:"quote_req_id"`
Header map[string]interface{} `json:"header"`
Body map[string]interface{} `json:"body"`
ReceiveTime time.Time `json:"receive_time"`
}
// TradeMessage es una fila de qfixdpl_messages.
type TradeMessage struct {
ID string `json:"id"`
QuoteReqID string `json:"quote_req_id"`
JMessage FixMessageJSON `json:"j_message"`
CreatedAt time.Time `json:"created_at"`
}
// LogEntry es el DTO para insertar/actualizar un log crudo en qfixdpl_logs.
type LogEntry struct {
QuoteReqID string
RawMsg string
}
// Logs es la respuesta del endpoint GET /trades/:quoteReqID/logs.
type Logs struct {
Entries []string `json:"entries"`
}
// PersistenceStore define la interfaz de persistencia.
type PersistenceStore interface {
SaveMessage(msg TradeMessage) error
SaveLog(entry LogEntry) error
GetTodayMessages() ([]TradeMessage, error)
GetLogsByQuoteReqID(quoteReqID string) (Logs, error)
}