Compare commits

..

18 Commits

Author SHA1 Message Date
c05621c508 Merge branch 'develop' of gitea.quantex.com.ar:Quantex/qfixdpl into FIXDPL-2/New_structure_and_persistance 2026-03-30 10:32:48 -03:00
149f18dea0 Structure, persistance, recovery 2026-03-30 10:27:03 -03:00
b58c8df905 fix log library 2026-03-26 12:05:18 -03:00
82d2e1b5f7 Persistance and recovery 2026-03-19 13:23:23 -03:00
51ef6e182d Documentation of 8.4 flow 2026-03-16 17:03:12 -03:00
e17675d973 Flow 8.4 list trading working 2026-03-16 12:44:52 -03:00
5f1d7038ac merging 2026-03-13 14:23:47 -03:00
710772b052 Add QuoteStatusReport and QuoteAck handlers 2026-03-13 14:20:38 -03:00
4e62548091 fix ids 2026-03-13 12:11:40 -03:00
fbcaac95f5 fixes in quotes 2026-03-13 11:35:37 -03:00
3998726100 respond automatically to quote requests 2026-03-12 17:10:31 -03:00
1f1c0afb9a QuoteRequest fix 2026-03-12 14:59:11 -03:00
d1aff0212e Merge pull request 'Add Quickfix library' (#1) from quickfix into develop
Reviewed-on: #1
2026-03-12 15:32:09 +00:00
0910b1e6c8 fixes 2026-03-12 10:23:19 -03:00
50c7f98c37 adding notifications 2026-03-11 12:40:24 -03:00
48373b6855 fix store logs 2026-03-10 17:38:47 -03:00
1d32854a09 fix logs 2026-03-10 17:16:51 -03:00
7e26addd80 improvement 2026-03-10 16:36:57 -03:00
18 changed files with 2408 additions and 222 deletions

View File

@ -0,0 +1,341 @@
# Flow 8.4 — List Trading (Dealer Response)
## Overview
Flow 8.4 defines how a **dealer** responds to a client's Request for Quote (RFQ) through Tradeweb (TW) in a List Trading context. The key rule is:
> **The dealer NEVER sends an ExecutionReport (35=8). Only Tradeweb does.**
The dealer's role is limited to:
- Acknowledging messages (35=AI QuoteStatusReport, 35=BN ExecutionAck)
- Sending a price quote (35=S)
This document covers the **happy path** (client accepts) and two **alternative flows**:
- **Flow 8.6 — Trade Ended:** Client cancels before or after receiving the quote
- **QuoteAck Rejected:** TW rejects the dealer's quote
## Participants
| Abbreviation | Role |
|---|---|
| **TW** | Tradeweb — the platform that orchestrates the trade |
| **Dealer** | Us — responds to RFQs with quotes and acknowledges TW messages |
| **Client** | The counterparty requesting a quote (we never communicate with them directly) |
## Message Flow
```
TW Dealer
│ │
│ 1. QuoteRequest (35=R) │
│ ─────────────────────────────────────────> │
│ │
│ 2. QuoteStatusReport (35=AI) [ACK] │
│ <───────────────────────────────────────── │
│ │
│ 3. Quote (35=S) [price] │
│ <───────────────────────────────────────── │
│ │
│ 4. QuoteAck (35=CW) [ACCEPTED] │
│ ─────────────────────────────────────────> │
│ │
│ 5. QuoteResponse (35=AJ) [Hit/Lift] │
│ ─────────────────────────────────────────> │
│ │
│ 6. QuoteStatusReport (35=AI) [TRDREQACK] │
│ <───────────────────────────────────────── │
│ │
│ 7. ExecutionReport (35=8) [_LISTEND] │
│ ─────────────────────────────────────────> │
│ │
│ 8. ExecutionAck (35=BN) │
│ <───────────────────────────────────────── │
│ │
│ 9. ExecutionReport (35=8) [_TRDEND] │
│ ─────────────────────────────────────────> │
│ │
│ 10. ExecutionAck (35=BN) │
│ <───────────────────────────────────────── │
│ │
│ 11. ExecutionReport (35=8) [_TRDSUMM] │
│ ─────────────────────────────────────────> │
│ │
│ 12. ExecutionAck (35=BN) │
│ <───────────────────────────────────────── │
│ │
```
## Step-by-Step Detail
### Step 1 — QuoteRequest (35=R) — TW → Dealer
TW sends an RFQ on behalf of the client. Key fields:
| Tag | Field | Example | Notes |
|-----|-------|---------|-------|
| 131 | QuoteReqID | `LST_20260316_BYMA_CORI_NY1567246.1_1` | Always starts with `LST_` for List Trading |
| 66 | ListID | `NY1567246.1` | Identifies the list/inquiry |
| 48 | SecurityID | `040114HT0` | Bond identifier |
| 22 | SecurityIDSource | `1` (CUSIP) | Could also be `4` (ISIN) |
| 54 | Side | `2` (SELL) | The client's side — if client sells, dealer buys |
| 38 | OrderQty | `10000` | Quantity requested |
| 15 | Currency | `USD` | Settlement currency |
| 64 | SettlDate | `20260317` | Settlement date |
| 20073 | NegotiationType | `RFQ` | Must be RFQ for this flow |
**Validation:** The dealer must verify `LST_` prefix, non-empty `ListID`, and `NegotiationType=RFQ` before proceeding.
### Step 2 — QuoteStatusReport (35=AI) — Dealer → TW
The dealer acknowledges receipt of the QuoteRequest.
| Tag | Field | Value |
|-----|-------|-------|
| 131 | QuoteReqID | Same as received |
| 117 | QuoteID | Same as QuoteReqID |
| 297 | QuoteStatus | `0` (ACCEPTED) |
### Step 3 — Quote (35=S) — Dealer → TW
The dealer sends a price quote.
| Tag | Field | Example | Notes |
|-----|-------|---------|-------|
| 117 | QuoteID | Same as QuoteReqID | |
| 131 | QuoteReqID | Same as received | |
| 132 | BidPx | `99.60000000` | Set when client side is SELL (dealer bids) |
| 133 | OfferPx | `99.60000000` | Set when client side is BUY (dealer offers) |
| 44 | Price | `99.60000000` | The quote price |
| 423 | PriceType | `1` (Percentage) | |
| 537 | QuoteType | `211` (SEND_QUOTE) | |
### Step 4 — QuoteAck (35=CW) — TW → Dealer
TW confirms the quote was accepted.
| Tag | Field | Value |
|-----|-------|-------|
| 1865 | QuoteAckStatus | `1` (ACCEPTED) |
**Note:** If status is not ACCEPTED, the dealer logs the rejection (including the `Text` field) and cleans up the trade from memory. See [QuoteAck Rejected](#quoteack-rejected-quote-not-accepted) below.
### Step 5 — QuoteResponse (35=AJ) — TW → Dealer
The client has decided to trade (Hit/Lift the quote).
| Tag | Field | Value | Notes |
|-----|-------|-------|-------|
| 694 | QuoteRespType | `1` (Hit/Lift) | `2` would be Counter — that's flow 8.5, not handled here |
| 693 | QuoteRespID | `..._TRDREQ` | Always ends with `_TRDREQ` |
### Step 6 — QuoteStatusReport (35=AI) — Dealer → TW
The dealer acknowledges the trade request (TRDREQACK).
| Tag | Field | Value |
|-----|-------|-------|
| 131 | QuoteReqID | Same as received |
| 693 | QuoteRespID | Same as received |
| 297 | QuoteStatus | `0` (ACCEPTED) |
### Step 7 — ExecutionReport (35=8) `_LISTEND` — TW → Dealer
TW signals that the due-in window for the list has closed.
| Tag | Field | Value | Notes |
|-----|-------|-------|-------|
| 17 | ExecID | `..._LISTEND-{timestamp}` | Contains `_LISTEND` |
| 150 | ExecType | `A` (PendingNew) | |
| 39 | OrdStatus | `A` (PendingNew) | |
**Dealer action:** Send ExecutionAck only. **Do NOT send an ExecutionReport back.**
### Step 8 — ExecutionAck (35=BN) — Dealer → TW
| Tag | Field | Value |
|-----|-------|-------|
| 37 | OrderID | Same as received |
| 17 | ExecID | Same as received |
| 1036 | ExecAckStatus | `1` (ACCEPTED) |
### Step 9 — ExecutionReport (35=8) `_TRDEND` — TW → Dealer
TW sends the trade result.
| Tag | Field | Value | Notes |
|-----|-------|-------|-------|
| 17 | ExecID | `..._TRDEND-{timestamp}` | Contains `_TRDEND` |
| 150 | ExecType | `F` (Trade) | The trade was executed |
| 39 | OrdStatus | `2` (Filled) | |
| 44 | Price | `99.6` | Final execution price |
**Dealer action:** Send ExecutionAck. Clean up internal trade tracking state.
### Step 10 — ExecutionAck (35=BN) — Dealer → TW
Same format as Step 8.
### Step 11 — ExecutionReport (35=8) `_TRDSUMM` — TW → Dealer
TW sends the full trade summary with additional details (parties, settlement info, trade IDs).
| Tag | Field | Value | Notes |
|-----|-------|-------|-------|
| 17 | ExecID | `..._TRDSUMM-{timestamp}` | Contains `_TRDSUMM` |
| 150 | ExecType | `F` (Trade) | |
| 39 | OrdStatus | `2` (Filled) | |
| 453 | NoPartyIDs | Party information | Counterparty details |
| 526 | SecondaryClOrdID | `TRD_...` | Tradeweb trade reference |
| 1003 | TradeID | `20260316.BYMA.CORI.230` | Unique trade identifier |
**Dealer action:** Send ExecutionAck. Log the summary for audit/reconciliation.
### Step 12 — ExecutionAck (35=BN) — Dealer → TW
Same format as Step 8.
---
## Alternative Flows
### Flow 8.6 — Trade Ended (Client Cancels)
The client changes their mind and ends the trade. This can happen at any point after the QuoteRequest — even before the dealer's quote arrives. TW informs the dealer via QuoteResponse (35=AJ) messages with `_TRDEND` and `_TRDSUMM` suffixes instead of the ExecutionReport chain.
> **Critical:** The dealer MUST send a QuoteStatusReport (35=AI) ACK for every QuoteResponse. If no ACK is sent, TW will retry the message indefinitely (~every 11 seconds).
```
TW Dealer
│ │
│ 1. QuoteRequest (35=R) │
│ ─────────────────────────────────────────> │
│ │
│ 2. QuoteStatusReport (35=AI) [ACK] │
│ <───────────────────────────────────────── │
│ │
│ ┌─── Client ends trade ───┐ │
│ │ Meanwhile, dealer may │ │
│ │ still send Quote (S) │ │
│ └─────────────────────────┘ │
│ │
│ 3. QuoteResponse (35=AJ) [_TRDEND] │
│ QuoteRespType=7 (End Trade) │
│ ─────────────────────────────────────────> │
│ │
│ 4. QuoteStatusReport (35=AI) [ACK] │
│ <───────────────────────────────────────── │
│ │
│ 5. QuoteAck (35=CW) [REJECTED] │
│ (if quote was sent, TW rejects it) │
│ ─────────────────────────────────────────> │
│ │
│ 6. QuoteResponse (35=AJ) [_TRDSUMM] │
│ QuoteRespType=7, TradeSummary=Y │
│ ─────────────────────────────────────────> │
│ │
│ 7. QuoteStatusReport (35=AI) [ACK] │
│ <───────────────────────────────────────── │
│ │
```
#### Step 3 — QuoteResponse (35=AJ) `_TRDEND` — TW → Dealer
TW notifies that the client ended the trade.
| Tag | Field | Value | Notes |
|-----|-------|-------|-------|
| 693 | QuoteRespID | `..._TRDEND` | Suffix identifies this as trade end |
| 694 | QuoteRespType | `7` (End Trade) | |
| 131 | QuoteReqID | Same as original | |
**Dealer action:** Send QuoteStatusReport (35=AI) with `693=QuoteRespID` and `297=0` (ACCEPTED).
#### Step 5 — QuoteAck (35=CW) `REJECTED` — TW → Dealer
If the dealer's Quote (35=S) crossed with the TRDEND, TW rejects it. The QuoteAckStatus will be `2` (REJECTED) with a text like "DPL DLRQUOTE received in an invalid state."
**Dealer action:** Log the rejection and clean up the trade from memory.
#### Step 6 — QuoteResponse (35=AJ) `_TRDSUMM` — TW → Dealer
TW sends the final trade summary confirming the outcome.
| Tag | Field | Value | Notes |
|-----|-------|-------|-------|
| 693 | QuoteRespID | `..._TRDSUMM` | Final summary message |
| 694 | QuoteRespType | `7` (End Trade) | |
| 22636 | TradeSummary | `Y` | Confirms this is the summary |
**Dealer action:** Send QuoteStatusReport (35=AI) ACK. Clean up the trade from memory. This is the **terminal message** — no more messages will follow for this QuoteReqID.
---
### QuoteAck Rejected (Quote Not Accepted)
If TW rejects the dealer's Quote (35=CW with status != ACCEPTED), the trade is dead from the dealer's perspective.
```
TW Dealer
│ │
│ 1-3. (same as happy path) │
│ │
│ 4. QuoteAck (35=CW) [REJECTED] │
│ QuoteAckStatus != 1 │
│ ─────────────────────────────────────────> │
│ │
│ Trade is terminated. │
│ │
```
**Dealer action:** Log the rejection (including the `Text` field with the reason) and remove the trade from memory. No further action needed — TW may or may not send subsequent messages for this QuoteReqID.
---
## QuoteRespID Suffix Routing in `handleQuoteResponse`
All QuoteResponse (35=AJ) messages are routed by the suffix of the `QuoteRespID` (tag 693):
```
QuoteRespID ends with "_TRDREQ" → Trade request (flow 8.4 happy path) — ACK
QuoteRespID ends with "_TRDEND" → Trade ended by client (flow 8.6) — ACK
QuoteRespID ends with "_TRDSUMM" → Trade summary (flow 8.6 final) — ACK + cleanup
QuoteRespID ends with "_LISTEND" → List ended — ACK
Other suffix → Ignored (logged)
```
## Code Reference
The implementation lives in `src/client/fix/manager.go`:
| Handler | Triggers on | Action |
|---------|------------|--------|
| `handleQuoteRequest` | 35=R | Sends 35=AI (ack) + 35=S (quote) |
| `handleQuoteAck` | 35=CW | If rejected: logs + cleans up trade. If accepted: logs |
| `handleQuoteResponse` | 35=AJ | Sends 35=AI (ACK). Routes by QuoteRespID suffix. Cleans up on `_TRDSUMM` |
| `handleExecutionReport` | 35=8 | Sends 35=BN (ack) + routes by ExecID suffix |
| `sendQuoteStatusReport` | — | Builds 35=AI for QuoteRequest ack |
| `sendTradeRequestAck` | — | Builds 35=AI for QuoteResponse ack (all suffixes) |
| `sendExecutionAck` | — | Builds 35=BN for ExecutionReport ack |
### ExecID Routing in `handleExecutionReport`
```
ExecID contains "_LISTEND" → Log only, await trade result
ExecID contains "_TRDEND" → Log trade end
ExecID contains "_TRDSUMM" → Log trade summary + cleanup trade from memory
ExecType = F (fallback) → Log generic trade result
```
The order matters: ExecID suffix checks run before ExecType checks, because `_TRDEND` and `_TRDSUMM` both have `ExecType=F`.
### Trade Cleanup Paths
A trade is removed from memory in any of these scenarios:
| Trigger | Message | Condition |
|---------|---------|-----------|
| QuoteAck rejected | 35=CW | `QuoteAckStatus != ACCEPTED` |
| QuoteResponse summary | 35=AJ | `QuoteRespID` ends with `_TRDSUMM` (flow 8.6) |
| ExecutionReport summary | 35=8 | `ExecID` contains `_TRDSUMM` (flow 8.4) |
The `loadActiveTrades` recovery function replays today's messages and applies the same cleanup rules to reconstruct accurate state on restart.

View File

@ -29,27 +29,25 @@ const (
) )
type Controller struct { type Controller struct {
pool *redis.Pool pool *redis.Pool
userData app.UserDataProvider userData app.UserDataProvider
store *store.Store store *store.Store
orderStore domain.OrderStore tradeProvider TradeProvider
fixSender domain.FIXSender config Config
config Config notify domain.Notifier
notify domain.Notifier authMutex deadlock.Mutex
authMutex deadlock.Mutex
} }
func newController(pool *redis.Pool, userData app.UserDataProvider, func newController(pool *redis.Pool, userData app.UserDataProvider,
s *store.Store, orderStore domain.OrderStore, fixSender domain.FIXSender, config Config, n domain.Notifier, s *store.Store, tradeProvider TradeProvider, config Config, n domain.Notifier,
) *Controller { ) *Controller {
return &Controller{ return &Controller{
pool: pool, pool: pool,
userData: userData, userData: userData,
store: s, store: s,
orderStore: orderStore, tradeProvider: tradeProvider,
fixSender: fixSender, config: config,
config: config, notify: n,
notify: n,
} }
} }
@ -293,50 +291,71 @@ func allowed(origin string, config Config) bool {
return false return false
} }
// GetOrders godoc // GetTrades godoc
// @Summary List received FIX orders // @Summary List active trades
// @Description Returns all NewOrderSingle messages received via FIX // @Description Returns only active List Trading trades
// @Tags fix // @Tags fix
// @Produce json // @Produce json
// @Success 200 {array} domain.Order // @Success 200 {array} domain.ListTrade
// @Router /qfixdpl/v1/orders [get] // @Router /qfixdpl/v1/trades [get]
func (cont *Controller) GetOrders(ctx *gin.Context) { func (cont *Controller) GetTrades(ctx *gin.Context) {
orders := cont.orderStore.GetOrders() trades := cont.tradeProvider.GetTrades()
ctx.JSON(http.StatusOK, orders) ctx.JSON(http.StatusOK, trades)
} }
// SendQuote godoc // GetAllTrades godoc
// @Summary Send a FIX Quote // @Summary List all trades
// @Description Sends a Quote (MsgType S) back to the FIX client for a given order // @Description Returns all List Trading trades (active, rejected, completed)
// @Tags fix // @Tags fix
// @Accept json
// @Produce json // @Produce json
// @Param quote body QuoteRequest true "Quote details" // @Success 200 {array} domain.ListTrade
// @Success 200 {object} Msg // @Router /qfixdpl/v1/trades/all [get]
// @Failure 400 {object} HTTPError func (cont *Controller) GetAllTrades(ctx *gin.Context) {
// @Failure 404 {object} HTTPError trades := cont.tradeProvider.GetAllTrades()
// @Failure 500 {object} HTTPError ctx.JSON(http.StatusOK, trades)
// @Router /qfixdpl/v1/quotes [post] }
func (cont *Controller) SendQuote(ctx *gin.Context) {
var req QuoteRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return // GetLogs godoc
} // @Summary Get raw FIX logs for a trade
// @Description Returns raw FIX message logs for a given QuoteReqID
// @Tags fix
// @Produce json
// @Param quoteReqID path string true "QuoteReqID"
// @Success 200 {object} domain.Logs
// @Router /qfixdpl/v1/trades/{quoteReqID}/logs [get]
func (cont *Controller) GetLogs(ctx *gin.Context) {
quoteReqID := ctx.Param("quoteReqID")
bidPx, offerPx, bidSize, offerSize, err := req.toDecimals() logs, err := cont.store.GetLogsByQuoteReqID(quoteReqID)
if err != nil { if err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()}) slog.Error("GetLogs: error fetching logs", "quoteReqID", quoteReqID, "error", err)
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"})
return return
} }
if err = cont.fixSender.SendQuote(req.ClOrdID, req.QuoteID, req.Symbol, req.Currency, bidPx, offerPx, bidSize, offerSize); err != nil { ctx.JSON(http.StatusOK, logs)
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: err.Error()})
return
}
ctx.JSON(http.StatusOK, Msg{Text: "quote sent"})
} }
// GetFullTradeLog godoc
// @Summary Get full trade lifecycle log
// @Description Returns DPL and Post-Trade raw FIX logs for a given trade (by QuoteReqID)
// @Tags fix
// @Produce json
// @Param quoteReqID path string true "QuoteReqID"
// @Success 200 {object} domain.FullTradeLog
// @Router /qfixdpl/v1/trades/{quoteReqID}/full-log [get]
func (cont *Controller) GetFullTradeLog(ctx *gin.Context) {
quoteReqID := ctx.Param("quoteReqID")
fullLog, err := cont.store.GetFullTradeLog(quoteReqID)
if err != nil {
slog.Error("GetFullTradeLog: error fetching full trade log", "quoteReqID", quoteReqID, "error", err)
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching full trade log"})
return
}
ctx.JSON(http.StatusOK, fullLog)
}

View File

@ -1,10 +1,5 @@
package rest package rest
import (
"fmt"
"github.com/shopspring/decimal"
)
type HTTPError struct { type HTTPError struct {
Error string Error string
@ -23,41 +18,3 @@ type Session struct {
Email string Email string
} }
type QuoteRequest struct {
ClOrdID string `json:"cl_ord_id" binding:"required"`
QuoteID string `json:"quote_id" binding:"required"`
Symbol string `json:"symbol" binding:"required"`
Currency string `json:"currency"`
BidPx string `json:"bid_px" binding:"required"`
OfferPx string `json:"offer_px" binding:"required"`
BidSize string `json:"bid_size"`
OfferSize string `json:"offer_size"`
}
func (r QuoteRequest) toDecimals() (bidPx, offerPx, bidSize, offerSize decimal.Decimal, err error) {
bidPx, err = decimal.NewFromString(r.BidPx)
if err != nil {
return bidPx, offerPx, bidSize, offerSize, fmt.Errorf("invalid bid_px: %w", err)
}
offerPx, err = decimal.NewFromString(r.OfferPx)
if err != nil {
return bidPx, offerPx, bidSize, offerSize, fmt.Errorf("invalid offer_px: %w", err)
}
if r.BidSize != "" {
bidSize, err = decimal.NewFromString(r.BidSize)
if err != nil {
return bidPx, offerPx, bidSize, offerSize, fmt.Errorf("invalid bid_size: %w", err)
}
}
if r.OfferSize != "" {
offerSize, err = decimal.NewFromString(r.OfferSize)
if err != nil {
return bidPx, offerPx, bidSize, offerSize, fmt.Errorf("invalid offer_size: %w", err)
}
}
return bidPx, offerPx, bidSize, offerSize, nil
}

View File

@ -21,8 +21,9 @@ func SetRoutes(api *API) {
qfixdpl := v1.Group("/") qfixdpl := v1.Group("/")
qfixdpl.Use(cont.AuthRequired) qfixdpl.Use(cont.AuthRequired)
qfixdpl.GET("/health", cont.HealthCheck) qfixdpl.GET("/health", cont.HealthCheck)
qfixdpl.GET("/orders", cont.GetOrders) qfixdpl.GET("/trades", cont.GetTrades)
qfixdpl.POST("/quotes", cont.SendQuote) qfixdpl.GET("/trades/all", cont.GetAllTrades)
qfixdpl.GET("/trades/:quoteReqID/full-log", cont.GetFullTradeLog)
backoffice := qfixdpl.Group("/backoffice") backoffice := qfixdpl.Group("/backoffice")
backoffice.Use(cont.BackOfficeUser) backoffice.Use(cont.BackOfficeUser)

View File

@ -16,6 +16,12 @@ import (
"quantex.com/qfixdpl/src/domain" "quantex.com/qfixdpl/src/domain"
) )
// TradeProvider exposes trade data from the FIX manager.
type TradeProvider interface {
GetTrades() []domain.ListTrade
GetAllTrades() []domain.ListTrade
}
const RedisMaxIdle = 3000 // In ms const RedisMaxIdle = 3000 // In ms
type API struct { type API struct {
@ -32,7 +38,7 @@ type Config struct {
EnableJWTAuth bool EnableJWTAuth bool
} }
func New(userData app.UserDataProvider, storeInstance *store.Store, orderStore domain.OrderStore, fixSender domain.FIXSender, config Config, notify domain.Notifier) *API { func New(userData app.UserDataProvider, storeInstance *store.Store, tradeProvider TradeProvider, config Config, notify domain.Notifier) *API {
// Set up Gin // Set up Gin
var engine *gin.Engine var engine *gin.Engine
if version.Environment() == version.EnvironmentTypeProd { if version.Environment() == version.EnvironmentTypeProd {
@ -58,7 +64,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, orderStore d
} }
api := &API{ api := &API{
Controller: newController(NewPool(), userData, storeInstance, orderStore, fixSender, config, notify), Controller: newController(NewPool(), userData, storeInstance, tradeProvider, config, notify),
Router: engine, Router: engine,
Port: config.Port, Port: config.Port,
} }

View File

@ -1,41 +0,0 @@
package data
import (
"sync"
"quantex.com/qfixdpl/src/domain"
)
type InMemoryOrderStore struct {
mu sync.RWMutex
orders []domain.Order
}
func NewOrderStore() *InMemoryOrderStore {
return &InMemoryOrderStore{}
}
func (s *InMemoryOrderStore) SaveOrder(order domain.Order) {
s.mu.Lock()
defer s.mu.Unlock()
s.orders = append(s.orders, order)
}
func (s *InMemoryOrderStore) GetOrders() []domain.Order {
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]domain.Order, len(s.orders))
copy(result, s.orders)
return result
}
func (s *InMemoryOrderStore) GetOrderByClOrdID(id string) (domain.Order, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, o := range s.orders {
if o.ClOrdID == id {
return o, true
}
}
return domain.Order{}, false
}

View File

@ -5,22 +5,42 @@ import (
"log/slog" "log/slog"
"quantex.com/qfixdpl/quickfix" "quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/tag"
"quantex.com/qfixdpl/src/domain"
) )
type application struct { type application struct {
router *quickfix.MessageRouter router *quickfix.MessageRouter
onLogon func(quickfix.SessionID) notifier domain.Notifier
onLogout func(quickfix.SessionID) onLogon func(quickfix.SessionID)
onQuote func(quote.Quote, quickfix.SessionID) onLogout func(quickfix.SessionID)
onQuote func(quote.Quote, quickfix.SessionID)
onQuoteRequest func(quoterequest.QuoteRequest, quickfix.SessionID)
onQuoteAck func(quoteack.QuoteAck, quickfix.SessionID)
onQuoteResponse func(quoteresponse.QuoteResponse, quickfix.SessionID)
onExecutionReport func(executionreport.ExecutionReport, quickfix.SessionID)
onExecutionAck func(executionack.ExecutionAck, quickfix.SessionID)
onRawMessage func(direction string, msg *quickfix.Message)
} }
func newApplication() *application { func newApplication(n domain.Notifier) *application {
app := &application{ app := &application{
router: quickfix.NewMessageRouter(), router: quickfix.NewMessageRouter(),
notifier: n,
} }
app.router.AddRoute(quote.Route(app.handleQuote)) app.router.AddRoute(quote.Route(app.handleQuote))
app.router.AddRoute(quoteack.Route(app.handleQuoteAck))
app.router.AddRoute(quoterequest.Route(app.handleQuoteRequest))
app.router.AddRoute(quoteresponse.Route(app.handleQuoteResponse))
app.router.AddRoute(executionack.Route(app.handleExecutionAck))
app.router.AddRoute(executionreport.Route(app.handleExecutionReport))
return app return app
} }
@ -38,6 +58,9 @@ func (a *application) OnLogon(sessionID quickfix.SessionID) {
func (a *application) OnLogout(sessionID quickfix.SessionID) { func (a *application) OnLogout(sessionID quickfix.SessionID) {
slog.Info("FIX session logged out", "session", sessionID.String()) slog.Info("FIX session logged out", "session", sessionID.String())
go a.notifier.SendMsg(domain.MessageChannelError, "Logout", domain.MessageStatusWarning, nil)
if a.onLogout != nil { if a.onLogout != nil {
a.onLogout(sessionID) a.onLogout(sessionID)
} }
@ -45,14 +68,87 @@ func (a *application) OnLogout(sessionID quickfix.SessionID) {
func (a *application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {} func (a *application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {}
func (a *application) ToApp(_ *quickfix.Message, _ quickfix.SessionID) error { return nil } func (a *application) ToApp(msg *quickfix.Message, _ quickfix.SessionID) error {
if a.onRawMessage != nil {
a.onRawMessage("OUT", msg)
}
return nil
}
func (a *application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError { func (a *application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError {
return nil return nil
} }
func (a *application) FromApp(msg *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError { func (a *application) FromApp(msg *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError {
return a.router.Route(msg, sessionID) beginString, _ := msg.Header.GetBytes(tag.BeginString)
msgType, _ := msg.Header.GetBytes(tag.MsgType)
var applVerID quickfix.FIXString
msg.Header.GetField(tag.ApplVerID, &applVerID)
slog.Info("FIX FromApp received",
"beginString", string(beginString),
"msgType", string(msgType),
"applVerID", string(applVerID),
"session", sessionID.String(),
"rawMsg", msg.String(),
)
if a.onRawMessage != nil {
a.onRawMessage("IN", msg)
}
rejErr := a.router.Route(msg, sessionID)
if rejErr != nil {
slog.Error("FIX FromApp routing failed",
"msgType", string(msgType),
"error", rejErr.Error(),
"isBusinessReject", rejErr.IsBusinessReject(),
)
}
return rejErr
}
func (a *application) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) quickfix.MessageRejectError {
quoteReqID, err := msg.GetQuoteReqID()
if err != nil {
slog.Error("QuoteRequest missing QuoteReqID", "error", err.Error())
return err
}
slog.Info("QuoteRequest received",
"quoteReqID", quoteReqID,
"session", sessionID.String(),
)
if a.onQuoteRequest != nil {
a.onQuoteRequest(msg, sessionID)
}
return nil
}
func (a *application) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) quickfix.MessageRejectError {
quoteReqID, _ := msg.GetQuoteReqID()
quoteID, _ := msg.GetQuoteID()
status, _ := msg.GetQuoteAckStatus()
text, _ := msg.GetText()
slog.Info("QuoteAck received",
"quoteReqID", quoteReqID,
"quoteID", quoteID,
"quoteAckStatus", status,
"text", text,
"session", sessionID.String(),
)
if a.onQuoteAck != nil {
a.onQuoteAck(msg, sessionID)
}
return nil
} }
func (a *application) handleQuote(msg quote.Quote, sessionID quickfix.SessionID) quickfix.MessageRejectError { func (a *application) handleQuote(msg quote.Quote, sessionID quickfix.SessionID) quickfix.MessageRejectError {
@ -71,3 +167,64 @@ func (a *application) handleQuote(msg quote.Quote, sessionID quickfix.SessionID)
return nil return nil
} }
func (a *application) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) quickfix.MessageRejectError {
quoteRespID, _ := msg.GetQuoteRespID()
quoteReqID, _ := msg.GetQuoteReqID()
quoteRespType, _ := msg.GetQuoteRespType()
slog.Info("QuoteResponse received",
"quoteRespID", quoteRespID,
"quoteReqID", quoteReqID,
"quoteRespType", quoteRespType,
"session", sessionID.String(),
)
if a.onQuoteResponse != nil {
a.onQuoteResponse(msg, sessionID)
}
return nil
}
func (a *application) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) quickfix.MessageRejectError {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
status, _ := msg.GetExecAckStatus()
slog.Info("ExecutionAck received",
"execID", execID,
"orderID", orderID,
"execAckStatus", status,
"session", sessionID.String(),
)
if a.onExecutionAck != nil {
a.onExecutionAck(msg, sessionID)
}
return nil
}
func (a *application) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) quickfix.MessageRejectError {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
listID, _ := msg.GetListID()
execType, _ := msg.GetExecType()
ordStatus, _ := msg.GetOrdStatus()
slog.Info("ExecutionReport received",
"execID", execID,
"orderID", orderID,
"listID", listID,
"execType", execType,
"ordStatus", ordStatus,
"session", sessionID.String(),
)
if a.onExecutionReport != nil {
a.onExecutionReport(msg, sessionID)
}
return nil
}

View File

@ -1,71 +1,128 @@
package fix package fix
import ( import (
"fmt"
"log/slog" "log/slog"
"os" "os"
"strings"
"sync" "sync"
"time" "time"
"github.com/rs/zerolog/log"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
"quantex.com/qfixdpl/quickfix" "quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/gen/enum" "quantex.com/qfixdpl/quickfix/gen/enum"
"quantex.com/qfixdpl/quickfix/gen/field" "quantex.com/qfixdpl/quickfix/gen/field"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote" "quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport"
filelog "quantex.com/qfixdpl/quickfix/log/file"
"quantex.com/qfixdpl/quickfix/store/file"
"quantex.com/qfixdpl/src/app" "quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/common/tracerr"
"quantex.com/qfixdpl/src/domain" "quantex.com/qfixdpl/src/domain"
) )
type listTrade struct {
QuoteReqID string
TradeID string
ListID string
Symbol string
SecurityIDSrc enum.SecurityIDSource
Currency string
Side enum.Side
OrderQty decimal.Decimal
SettlDate string
Price decimal.Decimal
OwnerTraderID string
SessionID quickfix.SessionID
Status domain.TradeStatus
}
// Manager wraps the QuickFIX initiator and implements domain.FIXSender. // Manager wraps the QuickFIX initiator and implements domain.FIXSender.
type Manager struct { type Manager struct {
initiator *quickfix.Initiator initiator *quickfix.Initiator
app *application app *application
sessionsMu sync.RWMutex sessionsMu sync.RWMutex
sessions map[string]quickfix.SessionID sessions map[string]quickfix.SessionID
orderStore domain.OrderStore tradesMu sync.RWMutex
trades map[string]*listTrade
store domain.PersistenceStore
notify domain.Notifier notify domain.Notifier
cfg app.FIXConfig cfg app.FIXConfig
} }
func NewManager(cfg app.FIXConfig, orderStore domain.OrderStore, notify domain.Notifier) *Manager { func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager {
return &Manager{ return &Manager{
sessions: make(map[string]quickfix.SessionID), sessions: make(map[string]quickfix.SessionID),
orderStore: orderStore, trades: make(map[string]*listTrade),
notify: notify, store: store,
cfg: cfg, notify: notify,
cfg: cfg,
} }
} }
func (m *Manager) Start() error { func (m *Manager) Start() error {
fixApp := newApplication() fixApp := newApplication(m.notify)
fixApp.onLogon = m.onLogon fixApp.onLogon = m.onLogon
fixApp.onLogout = m.onLogout fixApp.onLogout = m.onLogout
fixApp.onQuoteRequest = m.handleQuoteRequest
fixApp.onQuoteAck = m.handleQuoteAck
fixApp.onQuoteResponse = m.handleQuoteResponse
fixApp.onExecutionReport = m.handleExecutionReport
fixApp.onExecutionAck = m.handleExecutionAck
fixApp.onRawMessage = m.handleRawMessage
m.app = fixApp m.app = fixApp
if err := m.loadTrades(); err != nil {
slog.Error("failed to load active trades from DB, starting with empty state", "error", err)
}
f, err := os.Open(m.cfg.SettingsFile) f, err := os.Open(m.cfg.SettingsFile)
if err != nil { if err != nil {
return fmt.Errorf("opening FIX settings file %q: %w", m.cfg.SettingsFile, err) err = tracerr.Errorf("error opening FIX settings file %q: %s", m.cfg.SettingsFile, err)
log.Error().Msg(err.Error())
return err
} }
defer f.Close() defer f.Close()
settings, err := quickfix.ParseSettings(f) settings, err := quickfix.ParseSettings(f)
if err != nil { if err != nil {
return fmt.Errorf("parsing FIX settings: %w", err) err = tracerr.Errorf("error parsing FIX settings: %s", err)
log.Error().Msg(err.Error())
return err
} }
storeFactory := quickfix.NewMemoryStoreFactory() storeFactory := file.NewStoreFactory(settings)
logFactory := quickfix.NewNullLogFactory() logFactory, err := filelog.NewLogFactory(settings)
if err != nil {
err = tracerr.Errorf("error creating file log factory: %s", err)
log.Error().Msg(err.Error())
return err
}
initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory) initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory)
if err != nil { if err != nil {
return fmt.Errorf("creating FIX initiator: %w", err) err = tracerr.Errorf("error creating FIX initiator: %s", err)
log.Error().Msg(err.Error())
return err
} }
m.initiator = initiator m.initiator = initiator
if err = m.initiator.Start(); err != nil { if err = m.initiator.Start(); err != nil {
return fmt.Errorf("starting FIX initiator: %w", err) err = tracerr.Errorf("error starting FIX initiator: %s", err)
log.Error().Msg(err.Error())
return err
} }
slog.Info("FIX initiator started", "settings", m.cfg.SettingsFile) slog.Info("FIX initiator started", "settings", m.cfg.SettingsFile)
@ -84,6 +141,18 @@ func (m *Manager) onLogon(sessionID quickfix.SessionID) {
m.sessionsMu.Lock() m.sessionsMu.Lock()
m.sessions[sessionID.String()] = sessionID m.sessions[sessionID.String()] = sessionID
m.sessionsMu.Unlock() m.sessionsMu.Unlock()
// Assign the new session to all recovered trades that have no session yet.
// This covers the case where the service was restarted mid-trade: loadTrades()
// reconstructs the trade data but cannot recover the SessionID from the DB.
// Since this is a single-session initiator, all active trades belong to this session.
m.tradesMu.Lock()
for _, trade := range m.trades {
if trade.Status == domain.TradeStatusActive && trade.SessionID == (quickfix.SessionID{}) {
trade.SessionID = sessionID
}
}
m.tradesMu.Unlock()
} }
func (m *Manager) onLogout(sessionID quickfix.SessionID) { func (m *Manager) onLogout(sessionID quickfix.SessionID) {
@ -92,51 +161,527 @@ func (m *Manager) onLogout(sessionID quickfix.SessionID) {
m.sessionsMu.Unlock() m.sessionsMu.Unlock()
} }
// SendQuote implements domain.FIXSender. // sendQuoteStatusReport sends a QuoteStatusReport (35=AI) to acknowledge the incoming QuoteRequest.
func (m *Manager) SendQuote(clOrdID, quoteID, symbol, currency string, bidPx, offerPx, bidSize, offerSize decimal.Decimal) error { func (m *Manager) sendQuoteStatusReport(quoteReqID, ownerTraderID string, sessionID quickfix.SessionID) error {
m.sessionsMu.RLock() qsr := quotestatusreport.New(
var sessionID quickfix.SessionID field.NewTransactTime(time.Now()),
var ok bool field.NewQuoteStatus(enum.QuoteStatus_ACCEPTED),
for _, sid := range m.sessions { )
sessionID = sid qsr.SetQuoteReqID(quoteReqID)
ok = true qsr.SetQuoteID(quoteReqID)
break qsr.SetSymbol("[N/A]")
} if ownerTraderID != "" {
m.sessionsMu.RUnlock() qsr.SetOwnerTraderID(ownerTraderID)
if !ok {
return fmt.Errorf("no active FIX session")
} }
return quickfix.SendToTarget(qsr, sessionID)
}
// sendTradeRequestAck sends a QuoteStatusReport (35=AI) to acknowledge a trade request (TRDREQACK).
func (m *Manager) sendTradeRequestAck(quoteReqID, quoteRespID string, sessionID quickfix.SessionID) error {
qsr := quotestatusreport.New(
field.NewTransactTime(time.Now()),
field.NewQuoteStatus(enum.QuoteStatus_ACCEPTED),
)
qsr.SetQuoteReqID(quoteReqID)
qsr.SetQuoteRespID(quoteRespID)
qsr.SetSymbol("[N/A]")
return quickfix.SendToTarget(qsr, sessionID)
}
// sendExecutionAck sends an ExecutionAck (35=BN) to acknowledge an incoming ExecutionReport.
func (m *Manager) sendExecutionAck(orderID, clOrdID, execID string, sessionID quickfix.SessionID) error {
bn := executionack.New(
field.NewOrderID(orderID),
field.NewExecID(execID),
field.NewExecAckStatus(enum.ExecAckStatus_ACCEPTED),
)
bn.SetClOrdID(clOrdID)
bn.SetSymbol("[N/A]")
bn.SetTransactTime(time.Now())
return quickfix.SendToTarget(bn, sessionID)
}
// handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge) followed by a Quote at price 99.6.
func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) {
quoteReqID, err := msg.GetQuoteReqID()
if err != nil {
slog.Error("handleQuoteRequest: missing QuoteReqID", "error", err.Error())
return
}
// Validate LST_ prefix for List Trading flow.
if !strings.HasPrefix(quoteReqID, "LST_") {
slog.Warn("handleQuoteRequest: QuoteReqID missing LST_ prefix, ignoring", "quoteReqID", quoteReqID)
return
}
var (
symbol, currency, ownerTraderID, settlDate, listID, negotiationType string
side enum.Side
secIDSource enum.SecurityIDSource
orderQty decimal.Decimal
)
relatedSyms, relErr := msg.GetNoRelatedSym()
if relErr == nil && relatedSyms.Len() > 0 {
sym := relatedSyms.Get(0)
symbol, _ = sym.GetSecurityID()
secIDSource, _ = sym.GetSecurityIDSource()
currency, _ = sym.GetCurrency()
side, _ = sym.GetSide()
ownerTraderID, _ = sym.GetOwnerTraderID()
orderQty, _ = sym.GetOrderQty()
settlDate, _ = sym.GetSettlDate()
listID, _ = sym.GetListID()
negotiationType, _ = sym.GetNegotiationType()
}
if listID == "" {
slog.Warn("handleQuoteRequest: missing ListID", "quoteReqID", quoteReqID)
return
}
if negotiationType != "RFQ" {
slog.Warn("handleQuoteRequest: unexpected NegotiationType", "quoteReqID", quoteReqID, "negotiationType", negotiationType)
return
}
// Step 1: Send QuoteStatusReport (35=AI) to acknowledge the inquiry.
if ackErr := m.sendQuoteStatusReport(quoteReqID, ownerTraderID, sessionID); ackErr != nil {
slog.Error("handleQuoteRequest: failed to send QuoteStatusReport", "quoteReqID", quoteReqID, "error", ackErr.Error())
return
}
slog.Info("QuoteStatusReport sent", "quoteReqID", quoteReqID)
// Step 2: Build and send Quote (35=S) with price.
price := decimal.NewFromFloat(99.6)
sIDSource := enum.SecurityIDSource_ISIN_NUMBER
if secIDSource == enum.SecurityIDSource_CUSIP {
sIDSource = enum.SecurityIDSource_CUSIP
}
quoteID := quoteReqID
q := quote.New( q := quote.New(
field.NewQuoteID(quoteID), field.NewQuoteID(quoteID),
field.NewQuoteType(enum.QuoteType_INDICATIVE), field.NewQuoteType(enum.QuoteType_SEND_QUOTE),
field.NewTransactTime(time.Now()), field.NewTransactTime(time.Now()),
) )
q.SetSymbol(symbol) q.SetSymbol("[N/A]")
q.SetQuoteID(quoteID) q.SetSecurityID(symbol)
q.SetSecurityIDSource(sIDSource)
q.SetQuoteReqID(quoteReqID)
if currency != "" { if currency != "" {
q.SetCurrency(currency) q.SetCurrency(currency)
} }
q.SetBidPx(bidPx, 8) if !orderQty.IsZero() {
q.SetOfferPx(offerPx, 8) q.SetOrderQty(orderQty, 0)
if !bidSize.IsZero() {
q.SetBidSize(bidSize, 8)
} }
if !offerSize.IsZero() { if settlDate != "" {
q.SetOfferSize(offerSize, 8) q.SetSettlDate(settlDate)
} }
if err := quickfix.SendToTarget(q, sessionID); err != nil { q.SetPrice(price, 8)
return fmt.Errorf("sending FIX quote: %w", err)
if side == enum.Side_BUY {
q.SetOfferPx(price, 8)
q.SetSide(enum.Side_BUY)
} else {
q.SetBidPx(price, 8)
q.SetSide(enum.Side_SELL)
} }
slog.Info("Quote sent", "clOrdID", clOrdID, "quoteID", quoteID, "symbol", symbol) q.SetPriceType(enum.PriceType_PERCENTAGE)
if ownerTraderID != "" {
q.SetOwnerTraderID(ownerTraderID)
}
if sendErr := quickfix.SendToTarget(q, sessionID); sendErr != nil {
slog.Error("handleQuoteRequest: failed to send quote", "quoteReqID", quoteReqID, "error", sendErr.Error())
return
}
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol)
// Store trade state for subsequent steps.
m.tradesMu.Lock()
m.trades[quoteReqID] = &listTrade{
QuoteReqID: quoteReqID,
ListID: listID,
Symbol: symbol,
SecurityIDSrc: sIDSource,
Currency: currency,
Side: side,
OrderQty: orderQty,
SettlDate: settlDate,
Price: price,
OwnerTraderID: ownerTraderID,
SessionID: sessionID,
Status: domain.TradeStatusActive,
}
m.tradesMu.Unlock()
// Persist structured message (outside mutex).
m.persistMessage(quoteReqID, parseQuoteRequest(msg))
// Persist outgoing QuoteStatusReport.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
"OwnerTraderID": ownerTraderID,
}))
// Persist outgoing Quote.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteID": quoteID,
"Symbol": symbol,
"Side": string(side),
"Price": price.String(),
"OrderQty": orderQty.String(),
"Currency": currency,
"SettlDate": settlDate,
}))
}
// handleQuoteAck handles an incoming QuoteAck (35=CW).
func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
status, _ := msg.GetQuoteAckStatus()
text, _ := msg.GetText()
m.persistMessage(quoteReqID, parseQuoteAck(msg))
// QuoteAckStatus only has two defined values in TW DPL:
// 1 = Accepted — quote delivered to client.
// 2 = Rejected — format error, late quote, viewer busy, or race condition.
if status == enum.QuoteAckStatus_REJECTED {
slog.Error("handleQuoteAck: quote rejected by TW", "quoteReqID", quoteReqID, "quoteAckStatus", string(status), "text", text)
m.tradesMu.Lock()
if t, ok := m.trades[quoteReqID]; ok {
t.Status = domain.TradeStatusRejected
}
m.tradesMu.Unlock()
return
}
slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID)
}
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ).
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes.
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) {
quoteReqID, _ := msg.GetQuoteReqID()
quoteRespID, _ := msg.GetQuoteRespID()
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ")
isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND")
isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM")
isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND")
if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd {
slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID)
return
}
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future.
// Always send ACK regardless of whether the trade is in our map.
// TW will keep retrying until it receives an ACK.
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil {
slog.Error("handleQuoteResponse: failed to send ACK", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID, "error", ackErr.Error())
return
}
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
// Persist incoming QuoteResponse.
m.persistMessage(quoteReqID, parseQuoteResponse(msg))
// Persist outgoing ACK.
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
"QuoteReqID": quoteReqID,
"QuoteRespID": quoteRespID,
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
}))
// _TRDSUMM is the final message — mark trade as completed.
if isTrdSumm {
slog.Info("Trade summary received, marking completed", "quoteReqID", quoteReqID)
m.tradesMu.Lock()
if t, ok := m.trades[quoteReqID]; ok {
t.Status = domain.TradeStatusCompleted
}
m.tradesMu.Unlock()
}
}
// handleExecutionReport handles an incoming ExecutionReport (35=8).
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does.
// In TW DPL, ClOrdID (Tag 11) always equals the original QuoteReqID (Tag 131),
// so we use clOrdID directly as the map lookup key.
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) {
execID, _ := msg.GetExecID()
orderID, _ := msg.GetOrderID()
clOrdID, _ := msg.GetClOrdID()
execType, _ := msg.GetExecType()
ordStatus, _ := msg.GetOrdStatus()
listID, _ := msg.GetListID()
slog.Info("handleExecutionReport received",
"execID", execID, "orderID", orderID, "clOrdID", clOrdID,
"execType", string(execType), "ordStatus", string(ordStatus), "listID", listID,
)
// Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW.
if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil {
slog.Error("handleExecutionReport: failed to send ExecutionAck", "execID", execID, "error", ackErr.Error())
} else {
slog.Info("ExecutionAck sent", "execID", execID)
}
switch {
case strings.Contains(execID, "_LISTEND"):
slog.Info("List ended (due-in closed), awaiting trade result from TW",
"execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDEND"):
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
case strings.Contains(execID, "_TRDSUMM"):
slog.Info("Trade summary received from TW, marking completed",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
m.tradesMu.Lock()
if t, ok := m.trades[clOrdID]; ok {
t.Status = domain.TradeStatusCompleted
}
m.tradesMu.Unlock()
case execType == enum.ExecType_TRADE:
slog.Info("Trade result received from TW",
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
}
// Persist incoming ExecutionReport.
m.persistMessage(clOrdID, parseExecutionReport(msg))
// Persist outgoing ExecutionAck.
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{
"OrderID": orderID,
"ExecID": execID,
"ClOrdID": clOrdID,
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
}))
// If the ExecutionReport carries a TradeID, persist it in qfixdpl_logs for cross-service correlation.
tradeID, _ := msg.GetTradeID()
if tradeID != "" {
m.tradesMu.Lock()
if t, ok := m.trades[clOrdID]; ok {
t.TradeID = tradeID
}
m.tradesMu.Unlock()
if err := m.store.UpdateLogTradeID(clOrdID, tradeID); err != nil {
slog.Error("handleExecutionReport: failed to update log trade_id", "clOrdID", clOrdID, "tradeID", tradeID, "error", err)
}
}
}
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
// Logged in application.go, no further action needed.
}
// GetTrades returns a snapshot of only active trades.
func (m *Manager) GetTrades() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
trades := make([]domain.ListTrade, 0, len(m.trades))
for _, t := range m.trades {
if t.Status != domain.TradeStatusActive {
continue
}
trades = append(trades, toListTrade(t))
}
return trades
}
// GetAllTrades returns a snapshot of all trades (active, rejected, completed).
func (m *Manager) GetAllTrades() []domain.ListTrade {
m.tradesMu.RLock()
defer m.tradesMu.RUnlock()
trades := make([]domain.ListTrade, 0, len(m.trades))
for _, t := range m.trades {
trades = append(trades, toListTrade(t))
}
return trades
}
func toListTrade(t *listTrade) domain.ListTrade {
return domain.ListTrade{
QuoteReqID: t.QuoteReqID,
TradeID: t.TradeID,
ListID: t.ListID,
Symbol: t.Symbol,
SecurityIDSrc: string(t.SecurityIDSrc),
Currency: t.Currency,
Side: string(t.Side),
OrderQty: t.OrderQty.String(),
SettlDate: t.SettlDate,
Price: t.Price.String(),
OwnerTraderID: t.OwnerTraderID,
Status: t.Status,
}
}
// handleRawMessage persists raw FIX message strings to the logs table.
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
quoteReqID := extractIdentifier(msg)
if err := m.store.SaveLog(domain.LogEntry{
QuoteReqID: quoteReqID,
RawMsg: "[" + direction + "] " + msg.String(),
}); err != nil {
slog.Error("failed to persist raw log", "error", err)
}
}
// persistMessage saves a structured FIX message to the messages table.
func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSON) {
if err := m.store.SaveMessage(domain.TradeMessage{
QuoteReqID: quoteReqID,
JMessage: fixJSON,
}); err != nil {
slog.Error("failed to persist message", "msgType", fixJSON.MsgType, "quoteReqID", quoteReqID, "error", err)
}
}
// loadTrades reconstructs all trades and their states from today's messages in the database.
func (m *Manager) loadTrades() error {
messages, err := m.store.GetTodayMessages()
if err != nil {
return err
}
trades := make(map[string]*listTrade)
for _, msg := range messages {
switch msg.JMessage.MsgType {
case "R": // QuoteRequest -> trade is born
if !strings.HasPrefix(msg.QuoteReqID, "LST_") {
continue
}
body := msg.JMessage.Body
nt, _ := body["NegotiationType"].(string)
if nt != "RFQ" {
continue
}
listID, _ := body["ListID"].(string)
if listID == "" {
continue
}
trade := &listTrade{
QuoteReqID: msg.QuoteReqID,
ListID: listID,
Status: domain.TradeStatusActive,
}
if v, ok := body["SecurityID"].(string); ok {
trade.Symbol = v
}
if v, ok := body["Currency"].(string); ok {
trade.Currency = v
}
if v, ok := body["Side"].(string); ok {
trade.Side = enum.Side(v)
}
if v, ok := body["OrderQty"].(string); ok {
trade.OrderQty, _ = decimal.NewFromString(v)
}
if v, ok := body["SettlDate"].(string); ok {
trade.SettlDate = v
}
if v, ok := body["OwnerTraderID"].(string); ok {
trade.OwnerTraderID = v
}
trades[msg.QuoteReqID] = trade
case "CW": // QuoteAck — only status "2" (Rejected) marks the trade as rejected
body := msg.JMessage.Body
quoteAckStatus, _ := body["QuoteAckStatus"].(string)
if quoteAckStatus == string(enum.QuoteAckStatus_REJECTED) {
if t, ok := trades[msg.QuoteReqID]; ok {
t.Status = domain.TradeStatusRejected
}
}
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
body := msg.JMessage.Body
quoteRespID, _ := body["QuoteRespID"].(string)
if strings.HasSuffix(quoteRespID, "_TRDSUMM") {
if t, ok := trades[msg.QuoteReqID]; ok {
t.Status = domain.TradeStatusCompleted
}
}
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)
body := msg.JMessage.Body
execID, _ := body["ExecID"].(string)
clOrdID, _ := body["ClOrdID"].(string)
if tid, ok := body["TradeID"].(string); ok && tid != "" {
if t, ok := trades[clOrdID]; ok {
t.TradeID = tid
}
}
if strings.Contains(execID, "_TRDSUMM") {
if t, ok := trades[clOrdID]; ok {
t.Status = domain.TradeStatusCompleted
}
}
}
}
active := 0
for _, t := range trades {
if t.Status == domain.TradeStatusActive {
active++
}
}
m.trades = trades
slog.Info("recovery completed", "totalTrades", len(trades), "activeTrades", active)
return nil return nil
} }

View File

@ -0,0 +1,647 @@
package fix
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/domain"
)
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
// newTestManager builds a Manager with the given store without calling Start().
func newTestManager(store domain.PersistenceStore) *Manager {
notify := &MockNotifier{}
return NewManager(app.FIXConfig{}, store, notify)
}
// makeMsg builds a TradeMessage with the given quoteReqID, msgType, and body.
func makeMsg(quoteReqID, msgType string, body map[string]interface{}) domain.TradeMessage {
return domain.TradeMessage{
ID: "test-id-" + quoteReqID + "-" + msgType,
QuoteReqID: quoteReqID,
JMessage: domain.FixMessageJSON{
MsgType: msgType,
QuoteReqID: quoteReqID,
Body: body,
ReceiveTime: time.Now(),
},
CreatedAt: time.Now(),
}
}
// makeQuoteRequest builds a "R" (QuoteRequest) TradeMessage.
func makeQuoteRequest(quoteReqID, listID, nt string, extras map[string]interface{}) domain.TradeMessage {
body := map[string]interface{}{
"NegotiationType": nt,
"ListID": listID,
}
for k, v := range extras {
body[k] = v
}
return makeMsg(quoteReqID, "R", body)
}
// makeQuoteAck builds a "CW" (QuoteAck) TradeMessage.
func makeQuoteAck(quoteReqID, status string) domain.TradeMessage {
return makeMsg(quoteReqID, "CW", map[string]interface{}{
"QuoteAckStatus": status,
})
}
// makeQuoteResponse builds an "AJ" (QuoteResponse) TradeMessage.
func makeQuoteResponse(quoteReqID, quoteRespID string) domain.TradeMessage {
return makeMsg(quoteReqID, "AJ", map[string]interface{}{
"QuoteRespID": quoteRespID,
})
}
// makeExecutionReport builds an "8" (ExecutionReport) TradeMessage.
// clOrdID maps to body["ClOrdID"]; it is also set as TradeMessage.QuoteReqID
// to mirror how persistMessage works in handleExecutionReport.
func makeExecutionReport(clOrdID, execID string) domain.TradeMessage {
return makeMsg(clOrdID, "8", map[string]interface{}{
"ClOrdID": clOrdID,
"ExecID": execID,
})
}
// makeOutgoing builds an outgoing message (AI, S, BN) for a given quoteReqID.
func makeOutgoing(quoteReqID, msgType string) domain.TradeMessage {
return makeMsg(quoteReqID, msgType, map[string]interface{}{})
}
// ---------------------------------------------------------------------------
// Group 1 — DB vacia
// ---------------------------------------------------------------------------
func TestLoadTrades_EmptyDB(t *testing.T) {
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.NotNil(t, m.trades)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 2 — Interrupcion despues de QuoteRequest
// ---------------------------------------------------------------------------
func TestLoadTrades_SingleR_CreatesOneTrade(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", map[string]interface{}{
"SecurityID": "US1234567890",
"Currency": "USD",
"Side": "1",
"OrderQty": "1000000",
"SettlDate": "20260320",
"OwnerTraderID": "trader1",
})
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
require.Len(t, m.trades, 1)
trade := m.trades["LST_ABC123"]
require.NotNil(t, trade)
assert.Equal(t, "LST_ABC123", trade.QuoteReqID)
assert.Equal(t, "LIST_1", trade.ListID)
assert.Equal(t, "US1234567890", trade.Symbol)
assert.Equal(t, "USD", trade.Currency)
assert.Equal(t, "1", string(trade.Side))
assert.Equal(t, "20260320", trade.SettlDate)
assert.Equal(t, "trader1", trade.OwnerTraderID)
assert.Equal(t, domain.TradeStatusActive, trade.Status)
store.AssertExpectations(t)
}
func TestLoadTrades_R_WithOutgoingMsgs_IgnoresAI_S(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
ai := makeOutgoing("LST_ABC123", "AI")
s := makeOutgoing("LST_ABC123", "S")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, ai, s}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_R_NonLSTPrefix_Ignored(t *testing.T) {
r := makeQuoteRequest("TRD_ABC123", "LIST_1", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
func TestLoadTrades_R_NonRFQ_NegotiationType_Ignored(t *testing.T) {
r := makeQuoteRequest("LST_X", "LIST_1", "DEALER", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
func TestLoadTrades_R_EmptyListID_Ignored(t *testing.T) {
r := makeQuoteRequest("LST_X", "", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
func TestLoadTrades_R_MissingOptionalFields_TradeCreated(t *testing.T) {
r := makeQuoteRequest("LST_MIN", "LIST_MIN", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
require.Len(t, m.trades, 1)
trade := m.trades["LST_MIN"]
require.NotNil(t, trade)
assert.Equal(t, "LST_MIN", trade.QuoteReqID)
assert.Equal(t, "LIST_MIN", trade.ListID)
assert.Equal(t, "", trade.Symbol)
assert.Equal(t, "", trade.Currency)
assert.Equal(t, "", trade.SettlDate)
assert.Equal(t, "", trade.OwnerTraderID)
assert.Equal(t, domain.TradeStatusActive, trade.Status)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 3 — Interrupcion despues de QuoteAck
// ---------------------------------------------------------------------------
func TestLoadTrades_CW_Accepted_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1") // ACCEPTED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_CW_Rejected_TradeMarkedRejected(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "2") // REJECTED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusRejected, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
// QuoteAckStatus "0" is RECEIVED_NOT_YET_PROCESSED — not a rejection.
// Only status "2" (Rejected) should mark the trade as rejected.
func TestLoadTrades_CW_ReceivedNotYetProcessed_TradeStaysActive(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "0") // RECEIVED_NOT_YET_PROCESSED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_CW_WithoutPriorR_NoCrash(t *testing.T) {
cw := makeQuoteAck("LST_ORPHAN", "2")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{cw}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 4 — Interrupcion despues de QuoteResponse
// ---------------------------------------------------------------------------
func TestLoadTrades_AJ_TRDREQ_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1")
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDREQ")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_AJ_TRDEND_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1")
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDEND")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_AJ_LISTEND_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1")
aj := makeQuoteResponse("LST_Q1", "LST_Q1_LISTEND")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_AJ_TRDSUMM_TradeMarkedCompleted(t *testing.T) {
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_Q1", "1")
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_Q1"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_AJ_TRDSUMM_WithoutPriorR_NoCrash(t *testing.T) {
aj := makeQuoteResponse("LST_ORPHAN", "LST_ORPHAN_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{aj}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 5 — Interrupcion despues de ExecutionReport
// ---------------------------------------------------------------------------
func TestLoadTrades_ExecReport_LISTEND_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_ABC123", "1")
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
exec := makeExecutionReport("LST_ABC123", "LST_ABC123_LISTEND")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, exec}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_ExecReport_TRDEND_TradeRemains(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_ABC123", "1")
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
execListEnd := makeExecutionReport("LST_ABC123", "LST_ABC123_LISTEND")
execTrdEnd := makeExecutionReport("LST_ABC123", "LST_ABC123_TRDEND")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, execListEnd, execTrdEnd}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_ExecReport_TRDSUMM_TradeMarkedCompleted(t *testing.T) {
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
cw := makeQuoteAck("LST_ABC123", "1")
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
exec := makeExecutionReport("LST_ABC123", "LST_ABC123_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, exec}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 1)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_ABC123"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_ExecReport_TRDSUMM_WithoutPriorR_NoCrash(t *testing.T) {
exec := makeExecutionReport("LST_ORPHAN", "LST_ORPHAN_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{exec}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 6 — Multiples trades en paralelo
// ---------------------------------------------------------------------------
func TestLoadTrades_TwoTrades_BothActive(t *testing.T) {
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
cw1 := makeQuoteAck("LST_TRADE1", "1")
cw2 := makeQuoteAck("LST_TRADE2", "1")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, cw1, cw2}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 2)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE1"].Status)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE2"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_TwoTrades_OneCompleted_OneActive(t *testing.T) {
// TRADE1: fully completed via flow 8.4 (_TRDSUMM execution report)
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
cw1 := makeQuoteAck("LST_TRADE1", "1")
aj1 := makeQuoteResponse("LST_TRADE1", "LST_TRADE1_TRDREQ")
exec1 := makeExecutionReport("LST_TRADE1", "LST_TRADE1_TRDSUMM")
// TRADE2: still active
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
cw2 := makeQuoteAck("LST_TRADE2", "1")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return(
[]domain.TradeMessage{r1, cw1, aj1, exec1, r2, cw2}, nil,
)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 2)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE1"].Status)
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE2"].Status)
store.AssertExpectations(t)
}
func TestLoadTrades_TwoTrades_BothCompleted(t *testing.T) {
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
cw1 := makeQuoteAck("LST_TRADE1", "1")
exec1 := makeExecutionReport("LST_TRADE1", "LST_TRADE1_TRDSUMM")
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
cw2 := makeQuoteAck("LST_TRADE2", "1")
exec2 := makeExecutionReport("LST_TRADE2", "LST_TRADE2_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return(
[]domain.TradeMessage{r1, cw1, exec1, r2, cw2, exec2}, nil,
)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
assert.Len(t, m.trades, 2)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE1"].Status)
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE2"].Status)
store.AssertExpectations(t)
}
// ---------------------------------------------------------------------------
// Group 7 — SessionID se asigna en onLogon (solo a trades activos)
// ---------------------------------------------------------------------------
func TestLoadTrades_SessionID_IsZeroAfterRecovery(t *testing.T) {
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
err := m.loadTrades()
require.NoError(t, err)
require.Len(t, m.trades, 1)
trade := m.trades["LST_X"]
require.NotNil(t, trade)
assert.Equal(t, quickfix.SessionID{}, trade.SessionID)
store.AssertExpectations(t)
}
func TestOnLogon_AssignsSessionToRecoveredActiveTrades(t *testing.T) {
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
m := newTestManager(store)
require.NoError(t, m.loadTrades())
sessionID := quickfix.SessionID{
BeginString: "FIXT.1.1",
SenderCompID: "QFIXDPL",
TargetCompID: "TRADEWEB",
}
m.onLogon(sessionID)
trade := m.trades["LST_X"]
require.NotNil(t, trade)
assert.Equal(t, sessionID, trade.SessionID)
}
func TestOnLogon_DoesNotAssignSessionToCompletedTrades(t *testing.T) {
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
exec := makeExecutionReport("LST_X", "LST_X_TRDSUMM")
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, exec}, nil)
m := newTestManager(store)
require.NoError(t, m.loadTrades())
sessionID := quickfix.SessionID{
BeginString: "FIXT.1.1",
SenderCompID: "QFIXDPL",
TargetCompID: "TRADEWEB",
}
m.onLogon(sessionID)
trade := m.trades["LST_X"]
require.NotNil(t, trade)
assert.Equal(t, quickfix.SessionID{}, trade.SessionID, "completed trades should not get session assigned")
}
// ---------------------------------------------------------------------------
// Group 8 — GetTrades filtra por active, GetAllTrades devuelve todos
// ---------------------------------------------------------------------------
func TestGetTrades_ReturnsOnlyActive(t *testing.T) {
r1 := makeQuoteRequest("LST_ACTIVE", "LIST_1", "RFQ", nil)
r2 := makeQuoteRequest("LST_DONE", "LIST_2", "RFQ", nil)
exec := makeExecutionReport("LST_DONE", "LST_DONE_TRDSUMM")
r3 := makeQuoteRequest("LST_REJ", "LIST_3", "RFQ", nil)
cw := makeQuoteAck("LST_REJ", "2") // REJECTED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, exec, r3, cw}, nil)
m := newTestManager(store)
require.NoError(t, m.loadTrades())
trades := m.GetTrades()
assert.Len(t, trades, 1)
assert.Equal(t, "LST_ACTIVE", trades[0].QuoteReqID)
assert.Equal(t, domain.TradeStatusActive, trades[0].Status)
}
func TestGetAllTrades_ReturnsAll(t *testing.T) {
r1 := makeQuoteRequest("LST_ACTIVE", "LIST_1", "RFQ", nil)
r2 := makeQuoteRequest("LST_DONE", "LIST_2", "RFQ", nil)
exec := makeExecutionReport("LST_DONE", "LST_DONE_TRDSUMM")
r3 := makeQuoteRequest("LST_REJ", "LIST_3", "RFQ", nil)
cw := makeQuoteAck("LST_REJ", "2") // REJECTED
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, exec, r3, cw}, nil)
m := newTestManager(store)
require.NoError(t, m.loadTrades())
trades := m.GetAllTrades()
assert.Len(t, trades, 3)
statusMap := map[string]domain.TradeStatus{}
for _, tr := range trades {
statusMap[tr.QuoteReqID] = tr.Status
}
assert.Equal(t, domain.TradeStatusActive, statusMap["LST_ACTIVE"])
assert.Equal(t, domain.TradeStatusCompleted, statusMap["LST_DONE"])
assert.Equal(t, domain.TradeStatusRejected, statusMap["LST_REJ"])
}
// ---------------------------------------------------------------------------
// Group 9 — Error en store
// ---------------------------------------------------------------------------
func TestLoadTrades_StoreError_ReturnsError_MapEmpty(t *testing.T) {
store := &MockPersistenceStore{}
store.On("GetTodayMessages").Return([]domain.TradeMessage(nil), errors.New("db connection failed"))
m := newTestManager(store)
err := m.loadTrades()
assert.Error(t, err)
assert.NotNil(t, m.trades)
assert.Len(t, m.trades, 0)
store.AssertExpectations(t)
}

View File

@ -0,0 +1,56 @@
package fix
import (
"sync"
"github.com/stretchr/testify/mock"
"quantex.com/qfixdpl/src/domain"
)
// MockPersistenceStore is a testify mock implementing domain.PersistenceStore.
type MockPersistenceStore struct {
mock.Mock
}
func (m *MockPersistenceStore) SaveMessage(msg domain.TradeMessage) error {
args := m.Called(msg)
return args.Error(0)
}
func (m *MockPersistenceStore) SaveLog(entry domain.LogEntry) error {
args := m.Called(entry)
return args.Error(0)
}
func (m *MockPersistenceStore) GetTodayMessages() ([]domain.TradeMessage, error) {
args := m.Called()
msgs, _ := args.Get(0).([]domain.TradeMessage)
return msgs, args.Error(1)
}
func (m *MockPersistenceStore) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
args := m.Called(quoteReqID)
logs, _ := args.Get(0).(domain.Logs)
return logs, args.Error(1)
}
func (m *MockPersistenceStore) UpdateLogTradeID(quoteReqID, tradeID string) error {
args := m.Called(quoteReqID, tradeID)
return args.Error(0)
}
func (m *MockPersistenceStore) GetFullTradeLog(quoteReqID string) (domain.FullTradeLog, error) {
args := m.Called(quoteReqID)
log, _ := args.Get(0).(domain.FullTradeLog)
return log, args.Error(1)
}
// MockNotifier is a testify mock implementing domain.Notifier.
type MockNotifier struct {
mock.Mock
}
func (m *MockNotifier) SendMsg(chat domain.MessageChannel, text string, status domain.MessageStatus, wg *sync.WaitGroup) {
m.Called(chat, text, status, wg)
}

232
src/client/fix/parser.go Normal file
View File

@ -0,0 +1,232 @@
package fix
import (
"time"
"quantex.com/qfixdpl/quickfix"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
"quantex.com/qfixdpl/quickfix/gen/tag"
"quantex.com/qfixdpl/src/domain"
)
func extractHeader(msg *quickfix.Message) map[string]interface{} {
header := make(map[string]interface{})
if v, err := msg.Header.GetBytes(tag.BeginString); err == nil {
header["BeginString"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.MsgType); err == nil {
header["MsgType"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.SenderCompID); err == nil {
header["SenderCompID"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.TargetCompID); err == nil {
header["TargetCompID"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.MsgSeqNum); err == nil {
header["MsgSeqNum"] = string(v)
}
if v, err := msg.Header.GetBytes(tag.SendingTime); err == nil {
header["SendingTime"] = string(v)
}
return header
}
func parseQuoteRequest(msg quoterequest.QuoteRequest) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
body := map[string]interface{}{"QuoteReqID": quoteReqID}
if relSyms, err := msg.GetNoRelatedSym(); err == nil && relSyms.Len() > 0 {
sym := relSyms.Get(0)
if v, e := sym.GetSecurityID(); e == nil {
body["SecurityID"] = v
}
if v, e := sym.GetSecurityIDSource(); e == nil {
body["SecurityIDSource"] = string(v)
}
if v, e := sym.GetCurrency(); e == nil {
body["Currency"] = v
}
if v, e := sym.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := sym.GetOrderQty(); e == nil {
body["OrderQty"] = v.String()
}
if v, e := sym.GetSettlDate(); e == nil {
body["SettlDate"] = v
}
if v, e := sym.GetListID(); e == nil {
body["ListID"] = v
}
if v, e := sym.GetOwnerTraderID(); e == nil {
body["OwnerTraderID"] = v
}
if v, e := sym.GetNegotiationType(); e == nil {
body["NegotiationType"] = v
}
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "R",
QuoteReqID: quoteReqID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseQuoteAck(msg quoteack.QuoteAck) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
body := map[string]interface{}{"QuoteReqID": quoteReqID}
if v, e := msg.GetQuoteID(); e == nil {
body["QuoteID"] = v
}
if v, e := msg.GetQuoteAckStatus(); e == nil {
body["QuoteAckStatus"] = string(v)
}
if v, e := msg.GetText(); e == nil {
body["Text"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "CW",
QuoteReqID: quoteReqID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseQuoteResponse(msg quoteresponse.QuoteResponse) domain.FixMessageJSON {
quoteReqID, _ := msg.GetQuoteReqID()
body := map[string]interface{}{"QuoteReqID": quoteReqID}
if v, e := msg.GetQuoteRespID(); e == nil {
body["QuoteRespID"] = v
}
if v, e := msg.GetQuoteRespType(); e == nil {
body["QuoteRespType"] = string(v)
}
if v, e := msg.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := msg.GetPrice(); e == nil {
body["Price"] = v.String()
}
if v, e := msg.GetOrderQty(); e == nil {
body["OrderQty"] = v.String()
}
if v, e := msg.GetClOrdID(); e == nil {
body["ClOrdID"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "AJ",
QuoteReqID: quoteReqID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
func parseExecutionReport(msg executionreport.ExecutionReport) domain.FixMessageJSON {
clOrdID, _ := msg.GetClOrdID()
body := map[string]interface{}{"ClOrdID": clOrdID}
if v, e := msg.GetExecID(); e == nil {
body["ExecID"] = v
}
if v, e := msg.GetOrderID(); e == nil {
body["OrderID"] = v
}
if v, e := msg.GetExecType(); e == nil {
body["ExecType"] = string(v)
}
if v, e := msg.GetOrdStatus(); e == nil {
body["OrdStatus"] = string(v)
}
if v, e := msg.GetListID(); e == nil {
body["ListID"] = v
}
if v, e := msg.GetSide(); e == nil {
body["Side"] = string(v)
}
if v, e := msg.GetSymbol(); e == nil {
body["Symbol"] = v
}
if v, e := msg.GetSecurityID(); e == nil {
body["SecurityID"] = v
}
if v, e := msg.GetCurrency(); e == nil {
body["Currency"] = v
}
if v, e := msg.GetPrice(); e == nil {
body["Price"] = v.String()
}
if v, e := msg.GetLastPx(); e == nil {
body["LastPx"] = v.String()
}
if v, e := msg.GetLastQty(); e == nil {
body["LastQty"] = v.String()
}
if v, e := msg.GetOrderQty(); e == nil {
body["OrderQty"] = v.String()
}
if v, e := msg.GetSettlDate(); e == nil {
body["SettlDate"] = v
}
if v, e := msg.GetTradeID(); e == nil {
body["TradeID"] = v
}
return domain.FixMessageJSON{
Direction: "IN",
MsgType: "8",
QuoteReqID: clOrdID,
Header: extractHeader(msg.Message),
Body: body,
ReceiveTime: time.Now(),
}
}
// extractIdentifier extracts the trade identifier from a parsed FIX message.
// For ExecutionReport (8) and ExecutionAck (BN), uses ClOrdID (tag 11).
// For all other message types, uses QuoteReqID (tag 131).
func extractIdentifier(msg *quickfix.Message) string {
msgType, _ := msg.Header.GetBytes(tag.MsgType)
switch string(msgType) {
case "8", "BN":
var clOrdID quickfix.FIXString
if err := msg.Body.GetField(tag.ClOrdID, &clOrdID); err == nil {
return string(clOrdID)
}
default:
var quoteReqID quickfix.FIXString
if err := msg.Body.GetField(tag.QuoteReqID, &quoteReqID); err == nil {
return string(quoteReqID)
}
}
return ""
}
func buildOutgoingMessageJSON(msgType, quoteReqID string, body map[string]interface{}) domain.FixMessageJSON {
return domain.FixMessageJSON{
Direction: "OUT",
MsgType: msgType,
QuoteReqID: quoteReqID,
Body: body,
ReceiveTime: time.Now(),
}
}

View File

@ -132,7 +132,7 @@ func getMessage(text string, status domain.MessageStatus) string {
'cardId': 'createCardMessage', 'cardId': 'createCardMessage',
'card': { 'card': {
'header': { 'header': {
'title': 'qfixdpl', 'title': 'QFIXDPL',
'subtitle': 'Notification', 'subtitle': 'Notification',
'imageUrl': '%s', 'imageUrl': '%s',
'imageType': 'CIRCLE' 'imageType': 'CIRCLE'

20
src/client/store/db.sql Normal file
View File

@ -0,0 +1,20 @@
CREATE TABLE IF NOT EXISTS qfixdpl_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
quote_req_id TEXT NOT NULL,
trade_id TEXT,
j_message JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id);
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at);
CREATE INDEX IF NOT EXISTS idx_messages_trade_id ON qfixdpl_messages(trade_id);
CREATE TABLE IF NOT EXISTS qfixdpl_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
quote_req_id TEXT NOT NULL UNIQUE,
trade_id TEXT,
raw_msg TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dpl_logs_trade_id ON qfixdpl_logs(trade_id);

View File

@ -2,7 +2,9 @@
package store package store
import ( import (
_ "embed"
"log/slog" "log/slog"
"strings"
"time" "time"
"quantex.com.ar/multidb" "quantex.com.ar/multidb"
@ -11,6 +13,9 @@ import (
"quantex.com/qfixdpl/src/common/tracerr" "quantex.com/qfixdpl/src/common/tracerr"
) )
//go:embed db.sql
var schemaSQL string
const dbPingSeconds = 30 const dbPingSeconds = 30
type Store struct { type Store struct {
@ -45,9 +50,31 @@ func New(config Config) (*Store, error) {
go s.db.PeriodicDBPing(time.Second * dbPingSeconds) go s.db.PeriodicDBPing(time.Second * dbPingSeconds)
if err := s.ensureTables(); err != nil {
return nil, tracerr.Errorf("error ensuring tables: %w", err)
}
return s, nil return s, nil
} }
func (p *Store) ensureTables() error {
statements := strings.Split(schemaSQL, ";")
for _, stmt := range statements {
stmt = strings.TrimSpace(stmt)
if stmt == "" {
continue
}
if _, err := p.db.Exec(stmt); err != nil {
return tracerr.Errorf("error executing schema statement: %w", err)
}
}
slog.Info("database tables ensured")
return nil
}
func (p *Store) CloseDB() { func (p *Store) CloseDB() {
p.db.Close() p.db.Close()
slog.Info("closing database connection.") slog.Info("closing database connection.")

View File

@ -0,0 +1,175 @@
package store
import (
"encoding/json"
"strings"
"time"
"quantex.com/qfixdpl/src/common/tracerr"
"quantex.com/qfixdpl/src/domain"
)
func (p *Store) SaveMessage(msg domain.TradeMessage) error {
jsonBytes, err := json.Marshal(msg.JMessage)
if err != nil {
return tracerr.Errorf("error marshaling j_message: %w", err)
}
_, err = p.db.Exec(
"INSERT INTO qfixdpl_messages (quote_req_id, trade_id, j_message) VALUES ($1, NULLIF($2, ''), $3)",
msg.QuoteReqID, msg.TradeID, string(jsonBytes),
)
if err != nil {
return tracerr.Errorf("error inserting message: %w", err)
}
return nil
}
func (p *Store) SaveLog(entry domain.LogEntry) error {
upsertStmt := `INSERT INTO qfixdpl_logs (quote_req_id, raw_msg)
VALUES ($1, $2)
ON CONFLICT (quote_req_id) DO UPDATE
SET raw_msg = qfixdpl_logs.raw_msg || E'\n' || EXCLUDED.raw_msg,
updated_at = NOW()`
_, err := p.db.Exec(upsertStmt, entry.QuoteReqID, entry.RawMsg)
if err != nil {
return tracerr.Errorf("error upserting log: %w", err)
}
return nil
}
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
rows, err := p.db.Query(
"SELECT id, quote_req_id, trade_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC",
)
if err != nil {
return nil, tracerr.Errorf("error querying today messages: %w", err)
}
defer rows.Close()
var messages []domain.TradeMessage
for rows.Next() {
var (
id, quoteReqID string
tradeID *string
jMessageRaw []byte
createdAt time.Time
)
if err := rows.Scan(&id, &quoteReqID, &tradeID, &jMessageRaw, &createdAt); err != nil {
return nil, tracerr.Errorf("error scanning message row: %w", err)
}
var jMessage domain.FixMessageJSON
if err := json.Unmarshal(jMessageRaw, &jMessage); err != nil {
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
}
msg := domain.TradeMessage{
ID: id,
QuoteReqID: quoteReqID,
JMessage: jMessage,
CreatedAt: createdAt,
}
if tradeID != nil {
msg.TradeID = *tradeID
}
messages = append(messages, msg)
}
if err := rows.Err(); err != nil {
return nil, tracerr.Errorf("error iterating message rows: %w", err)
}
return messages, nil
}
func (p *Store) UpdateLogTradeID(quoteReqID, tradeID string) error {
_, err := p.db.Exec(
"UPDATE qfixdpl_logs SET trade_id = $1, updated_at = NOW() WHERE quote_req_id = $2",
tradeID, quoteReqID,
)
if err != nil {
return tracerr.Errorf("error updating log trade_id: %w", err)
}
return nil
}
func (p *Store) GetFullTradeLog(quoteReqID string) (domain.FullTradeLog, error) {
rows, err := p.db.Query(
"SELECT trade_id, raw_msg FROM qfixdpl_logs WHERE quote_req_id = $1", quoteReqID,
)
if err != nil {
return domain.FullTradeLog{}, tracerr.Errorf("error querying dpl log: %w", err)
}
defer rows.Close()
if !rows.Next() {
return domain.FullTradeLog{}, tracerr.Errorf("no log found for quoteReqID: %s", quoteReqID)
}
var (
tradeID *string
rawMsg string
)
if err := rows.Scan(&tradeID, &rawMsg); err != nil {
return domain.FullTradeLog{}, tracerr.Errorf("error scanning dpl log row: %w", err)
}
result := domain.FullTradeLog{
QuoteReqID: quoteReqID,
DPLEntries: strings.Split(rawMsg, "\n"),
}
if tradeID != nil && *tradeID != "" {
result.TradeID = *tradeID
ptRows, err := p.db.Query(
"SELECT raw_msg FROM qfixpt_logs WHERE trade_id = $1", *tradeID,
)
if err != nil {
return domain.FullTradeLog{}, tracerr.Errorf("error querying pt logs: %w", err)
}
defer ptRows.Close()
if ptRows.Next() {
var ptRawMsg string
if err := ptRows.Scan(&ptRawMsg); err != nil {
return domain.FullTradeLog{}, tracerr.Errorf("error scanning pt log row: %w", err)
}
result.PTEntries = strings.Split(ptRawMsg, "\n")
}
}
return result, nil
}
func (p *Store) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
selectStmt := "SELECT raw_msg FROM qfixdpl_logs WHERE quote_req_id = '" + quoteReqID + "';"
response, err := p.db.Query(selectStmt)
if err != nil {
return domain.Logs{}, tracerr.Errorf("error querying logs: %w", err)
}
defer response.Close()
if !response.Next() {
return domain.Logs{}, nil
}
var rawMsg string
if err := response.Scan(&rawMsg); err != nil {
return domain.Logs{}, tracerr.Errorf("error scanning log row: %w", err)
}
return domain.Logs{Entries: strings.Split(rawMsg, "\n")}, nil
}

View File

@ -38,9 +38,8 @@ func Runner(cfg app.Config) error {
} }
userData := data.New() userData := data.New()
orderStore := data.NewOrderStore()
fixManager := fix.NewManager(cfg.FIX, orderStore, notify) fixManager := fix.NewManager(cfg.FIX, appStore, notify)
if err = fixManager.Start(); err != nil { if err = fixManager.Start(); err != nil {
return fmt.Errorf("error starting FIX acceptor: %w", err) return fmt.Errorf("error starting FIX acceptor: %w", err)
} }
@ -54,7 +53,7 @@ func Runner(cfg app.Config) error {
EnableJWTAuth: cfg.EnableJWTAuth, EnableJWTAuth: cfg.EnableJWTAuth,
} }
api := rest.New(userData, appStore, orderStore, fixManager, apiConfig, notify) api := rest.New(userData, appStore, fixManager, apiConfig, notify)
api.Run() api.Run()
cmd.WaitForInterruptSignal(nil) cmd.WaitForInterruptSignal(nil)

View File

@ -1,32 +0,0 @@
// Package domain defines all the domain models
package domain
import (
"time"
"github.com/shopspring/decimal"
)
// Order represents a FIX NewOrderSingle message received from a client.
type Order struct {
ClOrdID string
Symbol string
Side string
OrdType string
OrderQty decimal.Decimal
Price decimal.Decimal
SessionID string
ReceivedAt time.Time
}
// OrderStore is the port for persisting and retrieving orders.
type OrderStore interface {
SaveOrder(order Order)
GetOrders() []Order
GetOrderByClOrdID(id string) (Order, bool)
}
// FIXSender is the port for sending FIX messages back to clients.
type FIXSender interface {
SendQuote(clOrdID, quoteID, symbol, currency string, bidPx, offerPx, bidSize, offerSize decimal.Decimal) error
}

77
src/domain/persistence.go Normal file
View File

@ -0,0 +1,77 @@
// Package domain defines all the domain models
package domain
import "time"
// TradeStatus represents the lifecycle state of a List Trading trade.
type TradeStatus string
const (
TradeStatusActive TradeStatus = "active"
TradeStatusRejected TradeStatus = "rejected"
TradeStatusCompleted TradeStatus = "completed"
)
// ListTrade es la representacion exportada de un trade de List Trading.
type ListTrade struct {
QuoteReqID string `json:"quote_req_id"`
TradeID string `json:"trade_id,omitempty"`
ListID string `json:"list_id"`
Symbol string `json:"symbol"`
SecurityIDSrc string `json:"security_id_src"`
Currency string `json:"currency"`
Side string `json:"side"`
OrderQty string `json:"order_qty"`
SettlDate string `json:"settl_date"`
Price string `json:"price"`
OwnerTraderID string `json:"owner_trader_id"`
Status TradeStatus `json:"status"`
}
// FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento.
type FixMessageJSON struct {
Direction string `json:"direction"`
MsgType string `json:"msg_type"`
QuoteReqID string `json:"quote_req_id"`
Header map[string]interface{} `json:"header"`
Body map[string]interface{} `json:"body"`
ReceiveTime time.Time `json:"receive_time"`
}
// TradeMessage es una fila de qfixdpl_messages.
type TradeMessage struct {
ID string `json:"id"`
QuoteReqID string `json:"quote_req_id"`
TradeID string `json:"trade_id,omitempty"`
JMessage FixMessageJSON `json:"j_message"`
CreatedAt time.Time `json:"created_at"`
}
// LogEntry es el DTO para insertar/actualizar un log crudo en qfixdpl_logs.
type LogEntry struct {
QuoteReqID string
RawMsg string
}
// Logs es la respuesta del endpoint GET /trades/:quoteReqID/logs.
type Logs struct {
Entries []string `json:"entries"`
}
// FullTradeLog es la respuesta del endpoint GET /trades/:id/full-log.
type FullTradeLog struct {
QuoteReqID string `json:"quote_req_id"`
TradeID string `json:"trade_id,omitempty"`
DPLEntries []string `json:"dpl_entries"`
PTEntries []string `json:"pt_entries,omitempty"`
}
// PersistenceStore define la interfaz de persistencia.
type PersistenceStore interface {
SaveMessage(msg TradeMessage) error
SaveLog(entry LogEntry) error
GetTodayMessages() ([]TradeMessage, error)
GetLogsByQuoteReqID(quoteReqID string) (Logs, error)
UpdateLogTradeID(quoteReqID, tradeID string) error
GetFullTradeLog(id string) (FullTradeLog, error)
}