Compare commits
21 Commits
f4ef52e154
...
develop
| Author | SHA1 | Date | |
|---|---|---|---|
| 34b1cd2148 | |||
| c05621c508 | |||
| 149f18dea0 | |||
| b58c8df905 | |||
| 82d2e1b5f7 | |||
| 51ef6e182d | |||
| e17675d973 | |||
| 5f1d7038ac | |||
| 710772b052 | |||
| 4e62548091 | |||
| fbcaac95f5 | |||
| 3998726100 | |||
| 1f1c0afb9a | |||
| d1aff0212e | |||
| 0910b1e6c8 | |||
| 50c7f98c37 | |||
| 48373b6855 | |||
| 1d32854a09 | |||
| 7e26addd80 | |||
| 5053bfa9af | |||
| 557c04436d |
6
Makefile
6
Makefile
@ -56,13 +56,13 @@ build: check-env swag vendor only-build # Build a native version. Set e=environm
|
||||
|
||||
only-build: check-env
|
||||
@echo "Building for $(e) environment..."
|
||||
env OUT_PATH=$(DEFAULT_OUT_PATH) tools/build.sh $(e)
|
||||
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
|
||||
|
||||
linux-build: check-env swag # Build a linux version for prod environment. Set e=environment: prod, dev, demo, open-demo
|
||||
env OUT_PATH=$(DEFAULT_OUT_PATH) GOARCH=amd64 GOOS=linux tools/build.sh $(e)
|
||||
|
||||
deploy: check-env # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo
|
||||
tools/deploy.sh $(e)
|
||||
deploy: # Deploy to remote server. Set e=environment: prod, dev, demo, open-demo; s=serverName; i=instance; e.g. make deploy e=dev s=nonprodFix i=dpl
|
||||
make build e=$(e) && qscp build/out/distribution/qfixdpl.gz $(s):/home/quantex/qfixtb/$(i)/
|
||||
|
||||
fmt: download-versions # Apply the Go formatter to the code
|
||||
cd tools/check; unset GOPATH; GOBIN=$$PWD/../bin go install mvdan.cc/gofumpt@$(call get_version,gofumpt);
|
||||
|
||||
341
docs/FLOW_8_4_LIST_TRADING.md
Normal file
341
docs/FLOW_8_4_LIST_TRADING.md
Normal file
@ -0,0 +1,341 @@
|
||||
# Flow 8.4 — List Trading (Dealer Response)
|
||||
|
||||
## Overview
|
||||
|
||||
Flow 8.4 defines how a **dealer** responds to a client's Request for Quote (RFQ) through Tradeweb (TW) in a List Trading context. The key rule is:
|
||||
|
||||
> **The dealer NEVER sends an ExecutionReport (35=8). Only Tradeweb does.**
|
||||
|
||||
The dealer's role is limited to:
|
||||
- Acknowledging messages (35=AI QuoteStatusReport, 35=BN ExecutionAck)
|
||||
- Sending a price quote (35=S)
|
||||
|
||||
This document covers the **happy path** (client accepts) and two **alternative flows**:
|
||||
- **Flow 8.6 — Trade Ended:** Client cancels before or after receiving the quote
|
||||
- **QuoteAck Rejected:** TW rejects the dealer's quote
|
||||
|
||||
## Participants
|
||||
|
||||
| Abbreviation | Role |
|
||||
|---|---|
|
||||
| **TW** | Tradeweb — the platform that orchestrates the trade |
|
||||
| **Dealer** | Us — responds to RFQs with quotes and acknowledges TW messages |
|
||||
| **Client** | The counterparty requesting a quote (we never communicate with them directly) |
|
||||
|
||||
## Message Flow
|
||||
|
||||
```
|
||||
TW Dealer
|
||||
│ │
|
||||
│ 1. QuoteRequest (35=R) │
|
||||
│ ─────────────────────────────────────────> │
|
||||
│ │
|
||||
│ 2. QuoteStatusReport (35=AI) [ACK] │
|
||||
│ <───────────────────────────────────────── │
|
||||
│ │
|
||||
│ 3. Quote (35=S) [price] │
|
||||
│ <───────────────────────────────────────── │
|
||||
│ │
|
||||
│ 4. QuoteAck (35=CW) [ACCEPTED] │
|
||||
│ ─────────────────────────────────────────> │
|
||||
│ │
|
||||
│ 5. QuoteResponse (35=AJ) [Hit/Lift] │
|
||||
│ ─────────────────────────────────────────> │
|
||||
│ │
|
||||
│ 6. QuoteStatusReport (35=AI) [TRDREQACK] │
|
||||
│ <───────────────────────────────────────── │
|
||||
│ │
|
||||
│ 7. ExecutionReport (35=8) [_LISTEND] │
|
||||
│ ─────────────────────────────────────────> │
|
||||
│ │
|
||||
│ 8. ExecutionAck (35=BN) │
|
||||
│ <───────────────────────────────────────── │
|
||||
│ │
|
||||
│ 9. ExecutionReport (35=8) [_TRDEND] │
|
||||
│ ─────────────────────────────────────────> │
|
||||
│ │
|
||||
│ 10. ExecutionAck (35=BN) │
|
||||
│ <───────────────────────────────────────── │
|
||||
│ │
|
||||
│ 11. ExecutionReport (35=8) [_TRDSUMM] │
|
||||
│ ─────────────────────────────────────────> │
|
||||
│ │
|
||||
│ 12. ExecutionAck (35=BN) │
|
||||
│ <───────────────────────────────────────── │
|
||||
│ │
|
||||
```
|
||||
|
||||
## Step-by-Step Detail
|
||||
|
||||
### Step 1 — QuoteRequest (35=R) — TW → Dealer
|
||||
|
||||
TW sends an RFQ on behalf of the client. Key fields:
|
||||
|
||||
| Tag | Field | Example | Notes |
|
||||
|-----|-------|---------|-------|
|
||||
| 131 | QuoteReqID | `LST_20260316_BYMA_CORI_NY1567246.1_1` | Always starts with `LST_` for List Trading |
|
||||
| 66 | ListID | `NY1567246.1` | Identifies the list/inquiry |
|
||||
| 48 | SecurityID | `040114HT0` | Bond identifier |
|
||||
| 22 | SecurityIDSource | `1` (CUSIP) | Could also be `4` (ISIN) |
|
||||
| 54 | Side | `2` (SELL) | The client's side — if client sells, dealer buys |
|
||||
| 38 | OrderQty | `10000` | Quantity requested |
|
||||
| 15 | Currency | `USD` | Settlement currency |
|
||||
| 64 | SettlDate | `20260317` | Settlement date |
|
||||
| 20073 | NegotiationType | `RFQ` | Must be RFQ for this flow |
|
||||
|
||||
**Validation:** The dealer must verify `LST_` prefix, non-empty `ListID`, and `NegotiationType=RFQ` before proceeding.
|
||||
|
||||
### Step 2 — QuoteStatusReport (35=AI) — Dealer → TW
|
||||
|
||||
The dealer acknowledges receipt of the QuoteRequest.
|
||||
|
||||
| Tag | Field | Value |
|
||||
|-----|-------|-------|
|
||||
| 131 | QuoteReqID | Same as received |
|
||||
| 117 | QuoteID | Same as QuoteReqID |
|
||||
| 297 | QuoteStatus | `0` (ACCEPTED) |
|
||||
|
||||
### Step 3 — Quote (35=S) — Dealer → TW
|
||||
|
||||
The dealer sends a price quote.
|
||||
|
||||
| Tag | Field | Example | Notes |
|
||||
|-----|-------|---------|-------|
|
||||
| 117 | QuoteID | Same as QuoteReqID | |
|
||||
| 131 | QuoteReqID | Same as received | |
|
||||
| 132 | BidPx | `99.60000000` | Set when client side is SELL (dealer bids) |
|
||||
| 133 | OfferPx | `99.60000000` | Set when client side is BUY (dealer offers) |
|
||||
| 44 | Price | `99.60000000` | The quote price |
|
||||
| 423 | PriceType | `1` (Percentage) | |
|
||||
| 537 | QuoteType | `211` (SEND_QUOTE) | |
|
||||
|
||||
### Step 4 — QuoteAck (35=CW) — TW → Dealer
|
||||
|
||||
TW confirms the quote was accepted.
|
||||
|
||||
| Tag | Field | Value |
|
||||
|-----|-------|-------|
|
||||
| 1865 | QuoteAckStatus | `1` (ACCEPTED) |
|
||||
|
||||
**Note:** If status is not ACCEPTED, the dealer logs the rejection (including the `Text` field) and cleans up the trade from memory. See [QuoteAck Rejected](#quoteack-rejected-quote-not-accepted) below.
|
||||
|
||||
### Step 5 — QuoteResponse (35=AJ) — TW → Dealer
|
||||
|
||||
The client has decided to trade (Hit/Lift the quote).
|
||||
|
||||
| Tag | Field | Value | Notes |
|
||||
|-----|-------|-------|-------|
|
||||
| 694 | QuoteRespType | `1` (Hit/Lift) | `2` would be Counter — that's flow 8.5, not handled here |
|
||||
| 693 | QuoteRespID | `..._TRDREQ` | Always ends with `_TRDREQ` |
|
||||
|
||||
### Step 6 — QuoteStatusReport (35=AI) — Dealer → TW
|
||||
|
||||
The dealer acknowledges the trade request (TRDREQACK).
|
||||
|
||||
| Tag | Field | Value |
|
||||
|-----|-------|-------|
|
||||
| 131 | QuoteReqID | Same as received |
|
||||
| 693 | QuoteRespID | Same as received |
|
||||
| 297 | QuoteStatus | `0` (ACCEPTED) |
|
||||
|
||||
### Step 7 — ExecutionReport (35=8) `_LISTEND` — TW → Dealer
|
||||
|
||||
TW signals that the due-in window for the list has closed.
|
||||
|
||||
| Tag | Field | Value | Notes |
|
||||
|-----|-------|-------|-------|
|
||||
| 17 | ExecID | `..._LISTEND-{timestamp}` | Contains `_LISTEND` |
|
||||
| 150 | ExecType | `A` (PendingNew) | |
|
||||
| 39 | OrdStatus | `A` (PendingNew) | |
|
||||
|
||||
**Dealer action:** Send ExecutionAck only. **Do NOT send an ExecutionReport back.**
|
||||
|
||||
### Step 8 — ExecutionAck (35=BN) — Dealer → TW
|
||||
|
||||
| Tag | Field | Value |
|
||||
|-----|-------|-------|
|
||||
| 37 | OrderID | Same as received |
|
||||
| 17 | ExecID | Same as received |
|
||||
| 1036 | ExecAckStatus | `1` (ACCEPTED) |
|
||||
|
||||
### Step 9 — ExecutionReport (35=8) `_TRDEND` — TW → Dealer
|
||||
|
||||
TW sends the trade result.
|
||||
|
||||
| Tag | Field | Value | Notes |
|
||||
|-----|-------|-------|-------|
|
||||
| 17 | ExecID | `..._TRDEND-{timestamp}` | Contains `_TRDEND` |
|
||||
| 150 | ExecType | `F` (Trade) | The trade was executed |
|
||||
| 39 | OrdStatus | `2` (Filled) | |
|
||||
| 44 | Price | `99.6` | Final execution price |
|
||||
|
||||
**Dealer action:** Send ExecutionAck. Clean up internal trade tracking state.
|
||||
|
||||
### Step 10 — ExecutionAck (35=BN) — Dealer → TW
|
||||
|
||||
Same format as Step 8.
|
||||
|
||||
### Step 11 — ExecutionReport (35=8) `_TRDSUMM` — TW → Dealer
|
||||
|
||||
TW sends the full trade summary with additional details (parties, settlement info, trade IDs).
|
||||
|
||||
| Tag | Field | Value | Notes |
|
||||
|-----|-------|-------|-------|
|
||||
| 17 | ExecID | `..._TRDSUMM-{timestamp}` | Contains `_TRDSUMM` |
|
||||
| 150 | ExecType | `F` (Trade) | |
|
||||
| 39 | OrdStatus | `2` (Filled) | |
|
||||
| 453 | NoPartyIDs | Party information | Counterparty details |
|
||||
| 526 | SecondaryClOrdID | `TRD_...` | Tradeweb trade reference |
|
||||
| 1003 | TradeID | `20260316.BYMA.CORI.230` | Unique trade identifier |
|
||||
|
||||
**Dealer action:** Send ExecutionAck. Log the summary for audit/reconciliation.
|
||||
|
||||
### Step 12 — ExecutionAck (35=BN) — Dealer → TW
|
||||
|
||||
Same format as Step 8.
|
||||
|
||||
---
|
||||
|
||||
## Alternative Flows
|
||||
|
||||
### Flow 8.6 — Trade Ended (Client Cancels)
|
||||
|
||||
The client changes their mind and ends the trade. This can happen at any point after the QuoteRequest — even before the dealer's quote arrives. TW informs the dealer via QuoteResponse (35=AJ) messages with `_TRDEND` and `_TRDSUMM` suffixes instead of the ExecutionReport chain.
|
||||
|
||||
> **Critical:** The dealer MUST send a QuoteStatusReport (35=AI) ACK for every QuoteResponse. If no ACK is sent, TW will retry the message indefinitely (~every 11 seconds).
|
||||
|
||||
```
|
||||
TW Dealer
|
||||
│ │
|
||||
│ 1. QuoteRequest (35=R) │
|
||||
│ ─────────────────────────────────────────> │
|
||||
│ │
|
||||
│ 2. QuoteStatusReport (35=AI) [ACK] │
|
||||
│ <───────────────────────────────────────── │
|
||||
│ │
|
||||
│ ┌─── Client ends trade ───┐ │
|
||||
│ │ Meanwhile, dealer may │ │
|
||||
│ │ still send Quote (S) │ │
|
||||
│ └─────────────────────────┘ │
|
||||
│ │
|
||||
│ 3. QuoteResponse (35=AJ) [_TRDEND] │
|
||||
│ QuoteRespType=7 (End Trade) │
|
||||
│ ─────────────────────────────────────────> │
|
||||
│ │
|
||||
│ 4. QuoteStatusReport (35=AI) [ACK] │
|
||||
│ <───────────────────────────────────────── │
|
||||
│ │
|
||||
│ 5. QuoteAck (35=CW) [REJECTED] │
|
||||
│ (if quote was sent, TW rejects it) │
|
||||
│ ─────────────────────────────────────────> │
|
||||
│ │
|
||||
│ 6. QuoteResponse (35=AJ) [_TRDSUMM] │
|
||||
│ QuoteRespType=7, TradeSummary=Y │
|
||||
│ ─────────────────────────────────────────> │
|
||||
│ │
|
||||
│ 7. QuoteStatusReport (35=AI) [ACK] │
|
||||
│ <───────────────────────────────────────── │
|
||||
│ │
|
||||
```
|
||||
|
||||
#### Step 3 — QuoteResponse (35=AJ) `_TRDEND` — TW → Dealer
|
||||
|
||||
TW notifies that the client ended the trade.
|
||||
|
||||
| Tag | Field | Value | Notes |
|
||||
|-----|-------|-------|-------|
|
||||
| 693 | QuoteRespID | `..._TRDEND` | Suffix identifies this as trade end |
|
||||
| 694 | QuoteRespType | `7` (End Trade) | |
|
||||
| 131 | QuoteReqID | Same as original | |
|
||||
|
||||
**Dealer action:** Send QuoteStatusReport (35=AI) with `693=QuoteRespID` and `297=0` (ACCEPTED).
|
||||
|
||||
#### Step 5 — QuoteAck (35=CW) `REJECTED` — TW → Dealer
|
||||
|
||||
If the dealer's Quote (35=S) crossed with the TRDEND, TW rejects it. The QuoteAckStatus will be `2` (REJECTED) with a text like "DPL DLRQUOTE received in an invalid state."
|
||||
|
||||
**Dealer action:** Log the rejection and clean up the trade from memory.
|
||||
|
||||
#### Step 6 — QuoteResponse (35=AJ) `_TRDSUMM` — TW → Dealer
|
||||
|
||||
TW sends the final trade summary confirming the outcome.
|
||||
|
||||
| Tag | Field | Value | Notes |
|
||||
|-----|-------|-------|-------|
|
||||
| 693 | QuoteRespID | `..._TRDSUMM` | Final summary message |
|
||||
| 694 | QuoteRespType | `7` (End Trade) | |
|
||||
| 22636 | TradeSummary | `Y` | Confirms this is the summary |
|
||||
|
||||
**Dealer action:** Send QuoteStatusReport (35=AI) ACK. Clean up the trade from memory. This is the **terminal message** — no more messages will follow for this QuoteReqID.
|
||||
|
||||
---
|
||||
|
||||
### QuoteAck Rejected (Quote Not Accepted)
|
||||
|
||||
If TW rejects the dealer's Quote (35=CW with status != ACCEPTED), the trade is dead from the dealer's perspective.
|
||||
|
||||
```
|
||||
TW Dealer
|
||||
│ │
|
||||
│ 1-3. (same as happy path) │
|
||||
│ │
|
||||
│ 4. QuoteAck (35=CW) [REJECTED] │
|
||||
│ QuoteAckStatus != 1 │
|
||||
│ ─────────────────────────────────────────> │
|
||||
│ │
|
||||
│ Trade is terminated. │
|
||||
│ │
|
||||
```
|
||||
|
||||
**Dealer action:** Log the rejection (including the `Text` field with the reason) and remove the trade from memory. No further action needed — TW may or may not send subsequent messages for this QuoteReqID.
|
||||
|
||||
---
|
||||
|
||||
## QuoteRespID Suffix Routing in `handleQuoteResponse`
|
||||
|
||||
All QuoteResponse (35=AJ) messages are routed by the suffix of the `QuoteRespID` (tag 693):
|
||||
|
||||
```
|
||||
QuoteRespID ends with "_TRDREQ" → Trade request (flow 8.4 happy path) — ACK
|
||||
QuoteRespID ends with "_TRDEND" → Trade ended by client (flow 8.6) — ACK
|
||||
QuoteRespID ends with "_TRDSUMM" → Trade summary (flow 8.6 final) — ACK + cleanup
|
||||
QuoteRespID ends with "_LISTEND" → List ended — ACK
|
||||
Other suffix → Ignored (logged)
|
||||
```
|
||||
|
||||
## Code Reference
|
||||
|
||||
The implementation lives in `src/client/fix/manager.go`:
|
||||
|
||||
| Handler | Triggers on | Action |
|
||||
|---------|------------|--------|
|
||||
| `handleQuoteRequest` | 35=R | Sends 35=AI (ack) + 35=S (quote) |
|
||||
| `handleQuoteAck` | 35=CW | If rejected: logs + cleans up trade. If accepted: logs |
|
||||
| `handleQuoteResponse` | 35=AJ | Sends 35=AI (ACK). Routes by QuoteRespID suffix. Cleans up on `_TRDSUMM` |
|
||||
| `handleExecutionReport` | 35=8 | Sends 35=BN (ack) + routes by ExecID suffix |
|
||||
| `sendQuoteStatusReport` | — | Builds 35=AI for QuoteRequest ack |
|
||||
| `sendTradeRequestAck` | — | Builds 35=AI for QuoteResponse ack (all suffixes) |
|
||||
| `sendExecutionAck` | — | Builds 35=BN for ExecutionReport ack |
|
||||
|
||||
### ExecID Routing in `handleExecutionReport`
|
||||
|
||||
```
|
||||
ExecID contains "_LISTEND" → Log only, await trade result
|
||||
ExecID contains "_TRDEND" → Log trade end
|
||||
ExecID contains "_TRDSUMM" → Log trade summary + cleanup trade from memory
|
||||
ExecType = F (fallback) → Log generic trade result
|
||||
```
|
||||
|
||||
The order matters: ExecID suffix checks run before ExecType checks, because `_TRDEND` and `_TRDSUMM` both have `ExecType=F`.
|
||||
|
||||
### Trade Cleanup Paths
|
||||
|
||||
A trade is removed from memory in any of these scenarios:
|
||||
|
||||
| Trigger | Message | Condition |
|
||||
|---------|---------|-----------|
|
||||
| QuoteAck rejected | 35=CW | `QuoteAckStatus != ACCEPTED` |
|
||||
| QuoteResponse summary | 35=AJ | `QuoteRespID` ends with `_TRDSUMM` (flow 8.6) |
|
||||
| ExecutionReport summary | 35=8 | `ExecID` contains `_TRDSUMM` (flow 8.4) |
|
||||
|
||||
The `loadActiveTrades` recovery function replays today's messages and applies the same cleanup rules to reconstruct accurate state on restart.
|
||||
15
fix.cfg
Normal file
15
fix.cfg
Normal file
@ -0,0 +1,15 @@
|
||||
[DEFAULT]
|
||||
ConnectionType=acceptor
|
||||
HeartBtInt=30
|
||||
SenderCompID=QUANTEX
|
||||
ResetOnLogon=Y
|
||||
FileStorePath=fix_store
|
||||
FileLogPath=fix_logs
|
||||
|
||||
[SESSION]
|
||||
BeginString=FIXT.1.1
|
||||
DefaultApplVerID=FIX.5.0SP2
|
||||
TargetCompID=CLIENT
|
||||
StartTime=00:00:00
|
||||
EndTime=00:00:00
|
||||
SocketAcceptPort=5001
|
||||
4
main.go
4
main.go
@ -178,6 +178,10 @@ func parseLogLevel(level string) (slog.Level, error) {
|
||||
|
||||
func startRunner(runner, globalCfg, serviceCfg string) {
|
||||
var fn func(cfg app.Config) error
|
||||
|
||||
if runner == "" {
|
||||
runner = "service"
|
||||
}
|
||||
switch runner {
|
||||
case "service":
|
||||
fn = service.Runner
|
||||
|
||||
@ -36,6 +36,11 @@ type Service struct {
|
||||
AuthorizedServices map[string]AuthorizedService `toml:"AuthorizedServices"`
|
||||
APIBasePort string
|
||||
EnableJWTAuth bool // Enable JWT authentication for service-to-service communication
|
||||
FIX FIXConfig
|
||||
}
|
||||
|
||||
type FIXConfig struct {
|
||||
SettingsFile string // path to fix.cfg file
|
||||
}
|
||||
|
||||
type ExtAuth struct {
|
||||
|
||||
@ -3,7 +3,6 @@ package version
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
)
|
||||
@ -38,17 +37,17 @@ type EnvironmentType int //nolint:recvcheck // The methods of this are autogener
|
||||
var environment EnvironmentType //nolint:gochecknoglobals // Just keept this global to avoid having to create an instance
|
||||
|
||||
func init() {
|
||||
aux := os.Getenv(quantexEnvironment)
|
||||
if aux == "" {
|
||||
panic("QUANTEX_ENVIRONMENT is not set")
|
||||
}
|
||||
// aux := os.Getenv(EnvironmentTypeDev)
|
||||
// if aux == "" {
|
||||
// panic("QUANTEX_ENVIRONMENT is not set")
|
||||
// }
|
||||
|
||||
env, err := ParseEnvironmentType(aux)
|
||||
if err != nil {
|
||||
panic("Invalid QUANTEX_ENVIRONMENT value: " + aux + " " + err.Error())
|
||||
}
|
||||
// env, err := ParseEnvironmentType(aux)
|
||||
// if err != nil {
|
||||
// panic("Invalid QUANTEX_ENVIRONMENT value: " + aux + " " + err.Error())
|
||||
// }
|
||||
|
||||
environment = env
|
||||
environment = EnvironmentTypeDev
|
||||
}
|
||||
|
||||
// Base returns the version base name
|
||||
|
||||
@ -29,23 +29,25 @@ const (
|
||||
)
|
||||
|
||||
type Controller struct {
|
||||
pool *redis.Pool
|
||||
userData app.UserDataProvider
|
||||
store *store.Store
|
||||
config Config
|
||||
notify domain.Notifier
|
||||
authMutex deadlock.Mutex
|
||||
pool *redis.Pool
|
||||
userData app.UserDataProvider
|
||||
store *store.Store
|
||||
tradeProvider TradeProvider
|
||||
config Config
|
||||
notify domain.Notifier
|
||||
authMutex deadlock.Mutex
|
||||
}
|
||||
|
||||
func newController(pool *redis.Pool, userData app.UserDataProvider,
|
||||
s *store.Store, config Config, n domain.Notifier,
|
||||
s *store.Store, tradeProvider TradeProvider, config Config, n domain.Notifier,
|
||||
) *Controller {
|
||||
return &Controller{
|
||||
pool: pool,
|
||||
userData: userData,
|
||||
store: s,
|
||||
config: config,
|
||||
notify: n,
|
||||
pool: pool,
|
||||
userData: userData,
|
||||
store: s,
|
||||
tradeProvider: tradeProvider,
|
||||
config: config,
|
||||
notify: n,
|
||||
}
|
||||
}
|
||||
|
||||
@ -288,3 +290,72 @@ func allowed(origin string, config Config) bool {
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// GetTrades godoc
|
||||
// @Summary List active trades
|
||||
// @Description Returns only active List Trading trades
|
||||
// @Tags fix
|
||||
// @Produce json
|
||||
// @Success 200 {array} domain.ListTrade
|
||||
// @Router /qfixdpl/v1/trades [get]
|
||||
func (cont *Controller) GetTrades(ctx *gin.Context) {
|
||||
trades := cont.tradeProvider.GetTrades()
|
||||
ctx.JSON(http.StatusOK, trades)
|
||||
}
|
||||
|
||||
// GetAllTrades godoc
|
||||
// @Summary List all trades
|
||||
// @Description Returns all List Trading trades (active, rejected, completed)
|
||||
// @Tags fix
|
||||
// @Produce json
|
||||
// @Success 200 {array} domain.ListTrade
|
||||
// @Router /qfixdpl/v1/trades/all [get]
|
||||
func (cont *Controller) GetAllTrades(ctx *gin.Context) {
|
||||
trades := cont.tradeProvider.GetAllTrades()
|
||||
ctx.JSON(http.StatusOK, trades)
|
||||
}
|
||||
|
||||
// GetLogs godoc
|
||||
// @Summary Get raw FIX logs for a trade
|
||||
// @Description Returns raw FIX message logs for a given QuoteReqID
|
||||
// @Tags fix
|
||||
// @Produce json
|
||||
// @Param quoteReqID path string true "QuoteReqID"
|
||||
// @Success 200 {object} domain.Logs
|
||||
// @Router /qfixdpl/v1/trades/{quoteReqID}/logs [get]
|
||||
func (cont *Controller) GetLogs(ctx *gin.Context) {
|
||||
quoteReqID := ctx.Param("quoteReqID")
|
||||
|
||||
logs, err := cont.store.GetLogsByQuoteReqID(quoteReqID)
|
||||
if err != nil {
|
||||
slog.Error("GetLogs: error fetching logs", "quoteReqID", quoteReqID, "error", err)
|
||||
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching logs"})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, logs)
|
||||
}
|
||||
|
||||
// GetFullTradeLog godoc
|
||||
// @Summary Get full trade lifecycle log
|
||||
// @Description Returns DPL and Post-Trade raw FIX logs for a given trade (by QuoteReqID)
|
||||
// @Tags fix
|
||||
// @Produce json
|
||||
// @Param quoteReqID path string true "QuoteReqID"
|
||||
// @Success 200 {object} domain.FullTradeLog
|
||||
// @Router /qfixdpl/v1/trades/{quoteReqID}/full-log [get]
|
||||
func (cont *Controller) GetFullTradeLog(ctx *gin.Context) {
|
||||
quoteReqID := ctx.Param("quoteReqID")
|
||||
|
||||
fullLog, err := cont.store.GetFullTradeLog(quoteReqID)
|
||||
if err != nil {
|
||||
slog.Error("GetFullTradeLog: error fetching full trade log", "quoteReqID", quoteReqID, "error", err)
|
||||
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "error fetching full trade log"})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, fullLog)
|
||||
}
|
||||
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package rest
|
||||
|
||||
|
||||
type HTTPError struct {
|
||||
Error string
|
||||
}
|
||||
@ -16,3 +17,4 @@ type Credentials struct {
|
||||
type Session struct {
|
||||
Email string
|
||||
}
|
||||
|
||||
|
||||
@ -21,6 +21,9 @@ func SetRoutes(api *API) {
|
||||
qfixdpl := v1.Group("/")
|
||||
qfixdpl.Use(cont.AuthRequired)
|
||||
qfixdpl.GET("/health", cont.HealthCheck)
|
||||
qfixdpl.GET("/trades", cont.GetTrades)
|
||||
qfixdpl.GET("/trades/all", cont.GetAllTrades)
|
||||
qfixdpl.GET("/trades/:quoteReqID/full-log", cont.GetFullTradeLog)
|
||||
|
||||
backoffice := qfixdpl.Group("/backoffice")
|
||||
backoffice.Use(cont.BackOfficeUser)
|
||||
|
||||
@ -16,6 +16,12 @@ import (
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
|
||||
// TradeProvider exposes trade data from the FIX manager.
|
||||
type TradeProvider interface {
|
||||
GetTrades() []domain.ListTrade
|
||||
GetAllTrades() []domain.ListTrade
|
||||
}
|
||||
|
||||
const RedisMaxIdle = 3000 // In ms
|
||||
|
||||
type API struct {
|
||||
@ -32,7 +38,7 @@ type Config struct {
|
||||
EnableJWTAuth bool
|
||||
}
|
||||
|
||||
func New(userData app.UserDataProvider, storeInstance *store.Store, config Config, notify domain.Notifier) *API {
|
||||
func New(userData app.UserDataProvider, storeInstance *store.Store, tradeProvider TradeProvider, config Config, notify domain.Notifier) *API {
|
||||
// Set up Gin
|
||||
var engine *gin.Engine
|
||||
if version.Environment() == version.EnvironmentTypeProd {
|
||||
@ -58,7 +64,7 @@ func New(userData app.UserDataProvider, storeInstance *store.Store, config Confi
|
||||
}
|
||||
|
||||
api := &API{
|
||||
Controller: newController(NewPool(), userData, storeInstance, config, notify),
|
||||
Controller: newController(NewPool(), userData, storeInstance, tradeProvider, config, notify),
|
||||
Router: engine,
|
||||
Port: config.Port,
|
||||
}
|
||||
|
||||
230
src/client/fix/application.go
Normal file
230
src/client/fix/application.go
Normal file
@ -0,0 +1,230 @@
|
||||
// Package fix implements the QuickFIX initiator application.
|
||||
package fix
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
|
||||
"quantex.com/qfixdpl/quickfix"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
|
||||
"quantex.com/qfixdpl/quickfix/gen/tag"
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
|
||||
type application struct {
|
||||
router *quickfix.MessageRouter
|
||||
notifier domain.Notifier
|
||||
onLogon func(quickfix.SessionID)
|
||||
onLogout func(quickfix.SessionID)
|
||||
onQuote func(quote.Quote, quickfix.SessionID)
|
||||
onQuoteRequest func(quoterequest.QuoteRequest, quickfix.SessionID)
|
||||
onQuoteAck func(quoteack.QuoteAck, quickfix.SessionID)
|
||||
onQuoteResponse func(quoteresponse.QuoteResponse, quickfix.SessionID)
|
||||
onExecutionReport func(executionreport.ExecutionReport, quickfix.SessionID)
|
||||
onExecutionAck func(executionack.ExecutionAck, quickfix.SessionID)
|
||||
onRawMessage func(direction string, msg *quickfix.Message)
|
||||
}
|
||||
|
||||
func newApplication(n domain.Notifier) *application {
|
||||
app := &application{
|
||||
router: quickfix.NewMessageRouter(),
|
||||
notifier: n,
|
||||
}
|
||||
|
||||
app.router.AddRoute(quote.Route(app.handleQuote))
|
||||
app.router.AddRoute(quoteack.Route(app.handleQuoteAck))
|
||||
app.router.AddRoute(quoterequest.Route(app.handleQuoteRequest))
|
||||
app.router.AddRoute(quoteresponse.Route(app.handleQuoteResponse))
|
||||
app.router.AddRoute(executionack.Route(app.handleExecutionAck))
|
||||
app.router.AddRoute(executionreport.Route(app.handleExecutionReport))
|
||||
|
||||
return app
|
||||
}
|
||||
|
||||
func (a *application) OnCreate(sessionID quickfix.SessionID) {
|
||||
slog.Info("FIX session created", "session", sessionID.String())
|
||||
}
|
||||
|
||||
func (a *application) OnLogon(sessionID quickfix.SessionID) {
|
||||
slog.Info("FIX session logged on", "session", sessionID.String())
|
||||
if a.onLogon != nil {
|
||||
a.onLogon(sessionID)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *application) OnLogout(sessionID quickfix.SessionID) {
|
||||
slog.Info("FIX session logged out", "session", sessionID.String())
|
||||
|
||||
go a.notifier.SendMsg(domain.MessageChannelError, "Logout", domain.MessageStatusWarning, nil)
|
||||
|
||||
if a.onLogout != nil {
|
||||
a.onLogout(sessionID)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *application) ToAdmin(_ *quickfix.Message, _ quickfix.SessionID) {}
|
||||
|
||||
func (a *application) ToApp(msg *quickfix.Message, _ quickfix.SessionID) error {
|
||||
if a.onRawMessage != nil {
|
||||
a.onRawMessage("OUT", msg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *application) FromAdmin(_ *quickfix.Message, _ quickfix.SessionID) quickfix.MessageRejectError {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *application) FromApp(msg *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError {
|
||||
beginString, _ := msg.Header.GetBytes(tag.BeginString)
|
||||
msgType, _ := msg.Header.GetBytes(tag.MsgType)
|
||||
|
||||
var applVerID quickfix.FIXString
|
||||
msg.Header.GetField(tag.ApplVerID, &applVerID)
|
||||
|
||||
slog.Info("FIX FromApp received",
|
||||
"beginString", string(beginString),
|
||||
"msgType", string(msgType),
|
||||
"applVerID", string(applVerID),
|
||||
"session", sessionID.String(),
|
||||
"rawMsg", msg.String(),
|
||||
)
|
||||
|
||||
if a.onRawMessage != nil {
|
||||
a.onRawMessage("IN", msg)
|
||||
}
|
||||
|
||||
rejErr := a.router.Route(msg, sessionID)
|
||||
if rejErr != nil {
|
||||
slog.Error("FIX FromApp routing failed",
|
||||
"msgType", string(msgType),
|
||||
"error", rejErr.Error(),
|
||||
"isBusinessReject", rejErr.IsBusinessReject(),
|
||||
)
|
||||
}
|
||||
|
||||
return rejErr
|
||||
}
|
||||
|
||||
func (a *application) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) quickfix.MessageRejectError {
|
||||
quoteReqID, err := msg.GetQuoteReqID()
|
||||
if err != nil {
|
||||
slog.Error("QuoteRequest missing QuoteReqID", "error", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Info("QuoteRequest received",
|
||||
"quoteReqID", quoteReqID,
|
||||
"session", sessionID.String(),
|
||||
)
|
||||
|
||||
if a.onQuoteRequest != nil {
|
||||
a.onQuoteRequest(msg, sessionID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *application) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) quickfix.MessageRejectError {
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
quoteID, _ := msg.GetQuoteID()
|
||||
status, _ := msg.GetQuoteAckStatus()
|
||||
text, _ := msg.GetText()
|
||||
|
||||
slog.Info("QuoteAck received",
|
||||
"quoteReqID", quoteReqID,
|
||||
"quoteID", quoteID,
|
||||
"quoteAckStatus", status,
|
||||
"text", text,
|
||||
"session", sessionID.String(),
|
||||
)
|
||||
|
||||
if a.onQuoteAck != nil {
|
||||
a.onQuoteAck(msg, sessionID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *application) handleQuote(msg quote.Quote, sessionID quickfix.SessionID) quickfix.MessageRejectError {
|
||||
quoteID, err := msg.GetQuoteID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
symbol, _ := msg.GetSymbol()
|
||||
|
||||
slog.Info("Quote received", "quoteID", quoteID, "symbol", symbol, "session", sessionID.String())
|
||||
|
||||
if a.onQuote != nil {
|
||||
a.onQuote(msg, sessionID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *application) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) quickfix.MessageRejectError {
|
||||
quoteRespID, _ := msg.GetQuoteRespID()
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
quoteRespType, _ := msg.GetQuoteRespType()
|
||||
|
||||
slog.Info("QuoteResponse received",
|
||||
"quoteRespID", quoteRespID,
|
||||
"quoteReqID", quoteReqID,
|
||||
"quoteRespType", quoteRespType,
|
||||
"session", sessionID.String(),
|
||||
)
|
||||
|
||||
if a.onQuoteResponse != nil {
|
||||
a.onQuoteResponse(msg, sessionID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *application) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) quickfix.MessageRejectError {
|
||||
execID, _ := msg.GetExecID()
|
||||
orderID, _ := msg.GetOrderID()
|
||||
status, _ := msg.GetExecAckStatus()
|
||||
|
||||
slog.Info("ExecutionAck received",
|
||||
"execID", execID,
|
||||
"orderID", orderID,
|
||||
"execAckStatus", status,
|
||||
"session", sessionID.String(),
|
||||
)
|
||||
|
||||
if a.onExecutionAck != nil {
|
||||
a.onExecutionAck(msg, sessionID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *application) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) quickfix.MessageRejectError {
|
||||
execID, _ := msg.GetExecID()
|
||||
orderID, _ := msg.GetOrderID()
|
||||
listID, _ := msg.GetListID()
|
||||
execType, _ := msg.GetExecType()
|
||||
ordStatus, _ := msg.GetOrdStatus()
|
||||
|
||||
slog.Info("ExecutionReport received",
|
||||
"execID", execID,
|
||||
"orderID", orderID,
|
||||
"listID", listID,
|
||||
"execType", execType,
|
||||
"ordStatus", ordStatus,
|
||||
"session", sessionID.String(),
|
||||
)
|
||||
|
||||
if a.onExecutionReport != nil {
|
||||
a.onExecutionReport(msg, sessionID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
687
src/client/fix/manager.go
Normal file
687
src/client/fix/manager.go
Normal file
@ -0,0 +1,687 @@
|
||||
package fix
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/shopspring/decimal"
|
||||
|
||||
"quantex.com/qfixdpl/quickfix"
|
||||
"quantex.com/qfixdpl/quickfix/gen/enum"
|
||||
"quantex.com/qfixdpl/quickfix/gen/field"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionack"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quote"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quotestatusreport"
|
||||
filelog "quantex.com/qfixdpl/quickfix/log/file"
|
||||
"quantex.com/qfixdpl/quickfix/store/file"
|
||||
"quantex.com/qfixdpl/src/app"
|
||||
"quantex.com/qfixdpl/src/common/tracerr"
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
|
||||
type listTrade struct {
|
||||
QuoteReqID string
|
||||
TradeID string
|
||||
ListID string
|
||||
Symbol string
|
||||
SecurityIDSrc enum.SecurityIDSource
|
||||
Currency string
|
||||
Side enum.Side
|
||||
OrderQty decimal.Decimal
|
||||
SettlDate string
|
||||
Price decimal.Decimal
|
||||
OwnerTraderID string
|
||||
SessionID quickfix.SessionID
|
||||
Status domain.TradeStatus
|
||||
}
|
||||
|
||||
// Manager wraps the QuickFIX initiator and implements domain.FIXSender.
|
||||
type Manager struct {
|
||||
initiator *quickfix.Initiator
|
||||
app *application
|
||||
sessionsMu sync.RWMutex
|
||||
sessions map[string]quickfix.SessionID
|
||||
tradesMu sync.RWMutex
|
||||
trades map[string]*listTrade
|
||||
store domain.PersistenceStore
|
||||
notify domain.Notifier
|
||||
cfg app.FIXConfig
|
||||
}
|
||||
|
||||
func NewManager(cfg app.FIXConfig, store domain.PersistenceStore, notify domain.Notifier) *Manager {
|
||||
return &Manager{
|
||||
sessions: make(map[string]quickfix.SessionID),
|
||||
trades: make(map[string]*listTrade),
|
||||
store: store,
|
||||
notify: notify,
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) Start() error {
|
||||
fixApp := newApplication(m.notify)
|
||||
fixApp.onLogon = m.onLogon
|
||||
fixApp.onLogout = m.onLogout
|
||||
fixApp.onQuoteRequest = m.handleQuoteRequest
|
||||
fixApp.onQuoteAck = m.handleQuoteAck
|
||||
fixApp.onQuoteResponse = m.handleQuoteResponse
|
||||
fixApp.onExecutionReport = m.handleExecutionReport
|
||||
fixApp.onExecutionAck = m.handleExecutionAck
|
||||
fixApp.onRawMessage = m.handleRawMessage
|
||||
m.app = fixApp
|
||||
|
||||
if err := m.loadTrades(); err != nil {
|
||||
slog.Error("failed to load active trades from DB, starting with empty state", "error", err)
|
||||
}
|
||||
|
||||
f, err := os.Open(m.cfg.SettingsFile)
|
||||
if err != nil {
|
||||
err = tracerr.Errorf("error opening FIX settings file %q: %s", m.cfg.SettingsFile, err)
|
||||
log.Error().Msg(err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
settings, err := quickfix.ParseSettings(f)
|
||||
if err != nil {
|
||||
err = tracerr.Errorf("error parsing FIX settings: %s", err)
|
||||
log.Error().Msg(err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
storeFactory := file.NewStoreFactory(settings)
|
||||
logFactory, err := filelog.NewLogFactory(settings)
|
||||
if err != nil {
|
||||
err = tracerr.Errorf("error creating file log factory: %s", err)
|
||||
log.Error().Msg(err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
initiator, err := quickfix.NewInitiator(fixApp, storeFactory, settings, logFactory)
|
||||
if err != nil {
|
||||
err = tracerr.Errorf("error creating FIX initiator: %s", err)
|
||||
log.Error().Msg(err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
m.initiator = initiator
|
||||
|
||||
if err = m.initiator.Start(); err != nil {
|
||||
err = tracerr.Errorf("error starting FIX initiator: %s", err)
|
||||
log.Error().Msg(err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Info("FIX initiator started", "settings", m.cfg.SettingsFile)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) Stop() {
|
||||
if m.initiator != nil {
|
||||
m.initiator.Stop()
|
||||
slog.Info("FIX initiator stopped")
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) onLogon(sessionID quickfix.SessionID) {
|
||||
m.sessionsMu.Lock()
|
||||
m.sessions[sessionID.String()] = sessionID
|
||||
m.sessionsMu.Unlock()
|
||||
|
||||
// Assign the new session to all recovered trades that have no session yet.
|
||||
// This covers the case where the service was restarted mid-trade: loadTrades()
|
||||
// reconstructs the trade data but cannot recover the SessionID from the DB.
|
||||
// Since this is a single-session initiator, all active trades belong to this session.
|
||||
m.tradesMu.Lock()
|
||||
for _, trade := range m.trades {
|
||||
if trade.Status == domain.TradeStatusActive && trade.SessionID == (quickfix.SessionID{}) {
|
||||
trade.SessionID = sessionID
|
||||
}
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
}
|
||||
|
||||
func (m *Manager) onLogout(sessionID quickfix.SessionID) {
|
||||
m.sessionsMu.Lock()
|
||||
delete(m.sessions, sessionID.String())
|
||||
m.sessionsMu.Unlock()
|
||||
}
|
||||
|
||||
// sendQuoteStatusReport sends a QuoteStatusReport (35=AI) to acknowledge the incoming QuoteRequest.
|
||||
func (m *Manager) sendQuoteStatusReport(quoteReqID, ownerTraderID string, sessionID quickfix.SessionID) error {
|
||||
qsr := quotestatusreport.New(
|
||||
field.NewTransactTime(time.Now()),
|
||||
field.NewQuoteStatus(enum.QuoteStatus_ACCEPTED),
|
||||
)
|
||||
qsr.SetQuoteReqID(quoteReqID)
|
||||
qsr.SetQuoteID(quoteReqID)
|
||||
qsr.SetSymbol("[N/A]")
|
||||
if ownerTraderID != "" {
|
||||
qsr.SetOwnerTraderID(ownerTraderID)
|
||||
}
|
||||
|
||||
return quickfix.SendToTarget(qsr, sessionID)
|
||||
}
|
||||
|
||||
// sendTradeRequestAck sends a QuoteStatusReport (35=AI) to acknowledge a trade request (TRDREQACK).
|
||||
func (m *Manager) sendTradeRequestAck(quoteReqID, quoteRespID string, sessionID quickfix.SessionID) error {
|
||||
qsr := quotestatusreport.New(
|
||||
field.NewTransactTime(time.Now()),
|
||||
field.NewQuoteStatus(enum.QuoteStatus_ACCEPTED),
|
||||
)
|
||||
qsr.SetQuoteReqID(quoteReqID)
|
||||
qsr.SetQuoteRespID(quoteRespID)
|
||||
qsr.SetSymbol("[N/A]")
|
||||
|
||||
return quickfix.SendToTarget(qsr, sessionID)
|
||||
}
|
||||
|
||||
// sendExecutionAck sends an ExecutionAck (35=BN) to acknowledge an incoming ExecutionReport.
|
||||
func (m *Manager) sendExecutionAck(orderID, clOrdID, execID string, sessionID quickfix.SessionID) error {
|
||||
bn := executionack.New(
|
||||
field.NewOrderID(orderID),
|
||||
field.NewExecID(execID),
|
||||
field.NewExecAckStatus(enum.ExecAckStatus_ACCEPTED),
|
||||
)
|
||||
bn.SetClOrdID(clOrdID)
|
||||
bn.SetSymbol("[N/A]")
|
||||
bn.SetTransactTime(time.Now())
|
||||
|
||||
return quickfix.SendToTarget(bn, sessionID)
|
||||
}
|
||||
|
||||
// handleQuoteRequest auto-responds to an incoming QuoteRequest with a QuoteStatusReport (acknowledge) followed by a Quote at price 99.6.
|
||||
func (m *Manager) handleQuoteRequest(msg quoterequest.QuoteRequest, sessionID quickfix.SessionID) {
|
||||
quoteReqID, err := msg.GetQuoteReqID()
|
||||
if err != nil {
|
||||
slog.Error("handleQuoteRequest: missing QuoteReqID", "error", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Validate LST_ prefix for List Trading flow.
|
||||
if !strings.HasPrefix(quoteReqID, "LST_") {
|
||||
slog.Warn("handleQuoteRequest: QuoteReqID missing LST_ prefix, ignoring", "quoteReqID", quoteReqID)
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
symbol, currency, ownerTraderID, settlDate, listID, negotiationType string
|
||||
side enum.Side
|
||||
secIDSource enum.SecurityIDSource
|
||||
orderQty decimal.Decimal
|
||||
)
|
||||
|
||||
relatedSyms, relErr := msg.GetNoRelatedSym()
|
||||
if relErr == nil && relatedSyms.Len() > 0 {
|
||||
sym := relatedSyms.Get(0)
|
||||
symbol, _ = sym.GetSecurityID()
|
||||
secIDSource, _ = sym.GetSecurityIDSource()
|
||||
currency, _ = sym.GetCurrency()
|
||||
side, _ = sym.GetSide()
|
||||
ownerTraderID, _ = sym.GetOwnerTraderID()
|
||||
orderQty, _ = sym.GetOrderQty()
|
||||
settlDate, _ = sym.GetSettlDate()
|
||||
listID, _ = sym.GetListID()
|
||||
negotiationType, _ = sym.GetNegotiationType()
|
||||
}
|
||||
|
||||
if listID == "" {
|
||||
slog.Warn("handleQuoteRequest: missing ListID", "quoteReqID", quoteReqID)
|
||||
return
|
||||
}
|
||||
|
||||
if negotiationType != "RFQ" {
|
||||
slog.Warn("handleQuoteRequest: unexpected NegotiationType", "quoteReqID", quoteReqID, "negotiationType", negotiationType)
|
||||
return
|
||||
}
|
||||
|
||||
// Step 1: Send QuoteStatusReport (35=AI) to acknowledge the inquiry.
|
||||
if ackErr := m.sendQuoteStatusReport(quoteReqID, ownerTraderID, sessionID); ackErr != nil {
|
||||
slog.Error("handleQuoteRequest: failed to send QuoteStatusReport", "quoteReqID", quoteReqID, "error", ackErr.Error())
|
||||
return
|
||||
}
|
||||
slog.Info("QuoteStatusReport sent", "quoteReqID", quoteReqID)
|
||||
|
||||
// Step 2: Build and send Quote (35=S) with price.
|
||||
price := decimal.NewFromFloat(99.6)
|
||||
|
||||
sIDSource := enum.SecurityIDSource_ISIN_NUMBER
|
||||
if secIDSource == enum.SecurityIDSource_CUSIP {
|
||||
sIDSource = enum.SecurityIDSource_CUSIP
|
||||
}
|
||||
|
||||
quoteID := quoteReqID
|
||||
q := quote.New(
|
||||
field.NewQuoteID(quoteID),
|
||||
field.NewQuoteType(enum.QuoteType_SEND_QUOTE),
|
||||
field.NewTransactTime(time.Now()),
|
||||
)
|
||||
|
||||
q.SetSymbol("[N/A]")
|
||||
q.SetSecurityID(symbol)
|
||||
q.SetSecurityIDSource(sIDSource)
|
||||
q.SetQuoteReqID(quoteReqID)
|
||||
|
||||
if currency != "" {
|
||||
q.SetCurrency(currency)
|
||||
}
|
||||
|
||||
if !orderQty.IsZero() {
|
||||
q.SetOrderQty(orderQty, 0)
|
||||
}
|
||||
|
||||
if settlDate != "" {
|
||||
q.SetSettlDate(settlDate)
|
||||
}
|
||||
|
||||
q.SetPrice(price, 8)
|
||||
|
||||
if side == enum.Side_BUY {
|
||||
q.SetOfferPx(price, 8)
|
||||
q.SetSide(enum.Side_BUY)
|
||||
} else {
|
||||
q.SetBidPx(price, 8)
|
||||
q.SetSide(enum.Side_SELL)
|
||||
}
|
||||
|
||||
q.SetPriceType(enum.PriceType_PERCENTAGE)
|
||||
|
||||
if ownerTraderID != "" {
|
||||
q.SetOwnerTraderID(ownerTraderID)
|
||||
}
|
||||
|
||||
if sendErr := quickfix.SendToTarget(q, sessionID); sendErr != nil {
|
||||
slog.Error("handleQuoteRequest: failed to send quote", "quoteReqID", quoteReqID, "error", sendErr.Error())
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("Quote sent", "quoteReqID", quoteReqID, "quoteID", quoteID, "symbol", symbol)
|
||||
|
||||
// Store trade state for subsequent steps.
|
||||
m.tradesMu.Lock()
|
||||
m.trades[quoteReqID] = &listTrade{
|
||||
QuoteReqID: quoteReqID,
|
||||
ListID: listID,
|
||||
Symbol: symbol,
|
||||
SecurityIDSrc: sIDSource,
|
||||
Currency: currency,
|
||||
Side: side,
|
||||
OrderQty: orderQty,
|
||||
SettlDate: settlDate,
|
||||
Price: price,
|
||||
OwnerTraderID: ownerTraderID,
|
||||
SessionID: sessionID,
|
||||
Status: domain.TradeStatusActive,
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
// Persist structured message (outside mutex).
|
||||
m.persistMessage(quoteReqID, parseQuoteRequest(msg))
|
||||
|
||||
// Persist outgoing QuoteStatusReport.
|
||||
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
|
||||
"QuoteReqID": quoteReqID,
|
||||
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
|
||||
"OwnerTraderID": ownerTraderID,
|
||||
}))
|
||||
|
||||
// Persist outgoing Quote.
|
||||
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("S", quoteReqID, map[string]interface{}{
|
||||
"QuoteReqID": quoteReqID,
|
||||
"QuoteID": quoteID,
|
||||
"Symbol": symbol,
|
||||
"Side": string(side),
|
||||
"Price": price.String(),
|
||||
"OrderQty": orderQty.String(),
|
||||
"Currency": currency,
|
||||
"SettlDate": settlDate,
|
||||
}))
|
||||
}
|
||||
|
||||
// handleQuoteAck handles an incoming QuoteAck (35=CW).
|
||||
func (m *Manager) handleQuoteAck(msg quoteack.QuoteAck, sessionID quickfix.SessionID) {
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
status, _ := msg.GetQuoteAckStatus()
|
||||
text, _ := msg.GetText()
|
||||
|
||||
m.persistMessage(quoteReqID, parseQuoteAck(msg))
|
||||
|
||||
// QuoteAckStatus only has two defined values in TW DPL:
|
||||
// 1 = Accepted — quote delivered to client.
|
||||
// 2 = Rejected — format error, late quote, viewer busy, or race condition.
|
||||
if status == enum.QuoteAckStatus_REJECTED {
|
||||
slog.Error("handleQuoteAck: quote rejected by TW", "quoteReqID", quoteReqID, "quoteAckStatus", string(status), "text", text)
|
||||
|
||||
m.tradesMu.Lock()
|
||||
if t, ok := m.trades[quoteReqID]; ok {
|
||||
t.Status = domain.TradeStatusRejected
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("handleQuoteAck: accepted", "quoteReqID", quoteReqID)
|
||||
}
|
||||
|
||||
// handleQuoteResponse handles an incoming QuoteResponse (35=AJ).
|
||||
// Supports _TRDREQ (trade request), _TRDEND (trade ended), and _TRDSUMM (trade summary) suffixes.
|
||||
func (m *Manager) handleQuoteResponse(msg quoteresponse.QuoteResponse, sessionID quickfix.SessionID) {
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
quoteRespID, _ := msg.GetQuoteRespID()
|
||||
|
||||
slog.Info("handleQuoteResponse", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
|
||||
|
||||
isTrdReq := strings.HasSuffix(quoteRespID, "_TRDREQ")
|
||||
isTrdEnd := strings.HasSuffix(quoteRespID, "_TRDEND")
|
||||
isTrdSumm := strings.HasSuffix(quoteRespID, "_TRDSUMM")
|
||||
isListEnd := strings.HasSuffix(quoteRespID, "_LISTEND")
|
||||
|
||||
if !isTrdReq && !isTrdEnd && !isTrdSumm && !isListEnd {
|
||||
slog.Info("handleQuoteResponse: QuoteRespID has unrecognized suffix, ignoring", "quoteRespID", quoteRespID)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: handle 694=2 (Counter) for flow 8.5 (Client Countering) in the future.
|
||||
|
||||
// Always send ACK regardless of whether the trade is in our map.
|
||||
// TW will keep retrying until it receives an ACK.
|
||||
if ackErr := m.sendTradeRequestAck(quoteReqID, quoteRespID, sessionID); ackErr != nil {
|
||||
slog.Error("handleQuoteResponse: failed to send ACK", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID, "error", ackErr.Error())
|
||||
return
|
||||
}
|
||||
slog.Info("QuoteResponse ACK sent", "quoteReqID", quoteReqID, "quoteRespID", quoteRespID)
|
||||
|
||||
// Persist incoming QuoteResponse.
|
||||
m.persistMessage(quoteReqID, parseQuoteResponse(msg))
|
||||
|
||||
// Persist outgoing ACK.
|
||||
m.persistMessage(quoteReqID, buildOutgoingMessageJSON("AI", quoteReqID, map[string]interface{}{
|
||||
"QuoteReqID": quoteReqID,
|
||||
"QuoteRespID": quoteRespID,
|
||||
"QuoteStatus": string(enum.QuoteStatus_ACCEPTED),
|
||||
}))
|
||||
|
||||
// _TRDSUMM is the final message — mark trade as completed.
|
||||
if isTrdSumm {
|
||||
slog.Info("Trade summary received, marking completed", "quoteReqID", quoteReqID)
|
||||
m.tradesMu.Lock()
|
||||
if t, ok := m.trades[quoteReqID]; ok {
|
||||
t.Status = domain.TradeStatusCompleted
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// handleExecutionReport handles an incoming ExecutionReport (35=8).
|
||||
// In flow 8.4 (List Trading), the dealer NEVER sends an ExecutionReport — only TW does.
|
||||
// In TW DPL, ClOrdID (Tag 11) always equals the original QuoteReqID (Tag 131),
|
||||
// so we use clOrdID directly as the map lookup key.
|
||||
func (m *Manager) handleExecutionReport(msg executionreport.ExecutionReport, sessionID quickfix.SessionID) {
|
||||
execID, _ := msg.GetExecID()
|
||||
orderID, _ := msg.GetOrderID()
|
||||
clOrdID, _ := msg.GetClOrdID()
|
||||
execType, _ := msg.GetExecType()
|
||||
ordStatus, _ := msg.GetOrdStatus()
|
||||
listID, _ := msg.GetListID()
|
||||
|
||||
slog.Info("handleExecutionReport received",
|
||||
"execID", execID, "orderID", orderID, "clOrdID", clOrdID,
|
||||
"execType", string(execType), "ordStatus", string(ordStatus), "listID", listID,
|
||||
)
|
||||
|
||||
// Send ExecutionAck (35=BN) for every incoming ExecutionReport from TW.
|
||||
if ackErr := m.sendExecutionAck(orderID, clOrdID, execID, sessionID); ackErr != nil {
|
||||
slog.Error("handleExecutionReport: failed to send ExecutionAck", "execID", execID, "error", ackErr.Error())
|
||||
} else {
|
||||
slog.Info("ExecutionAck sent", "execID", execID)
|
||||
}
|
||||
|
||||
switch {
|
||||
case strings.Contains(execID, "_LISTEND"):
|
||||
slog.Info("List ended (due-in closed), awaiting trade result from TW",
|
||||
"execID", execID, "clOrdID", clOrdID)
|
||||
|
||||
case strings.Contains(execID, "_TRDEND"):
|
||||
slog.Info("Trade ended", "execID", execID, "clOrdID", clOrdID)
|
||||
|
||||
case strings.Contains(execID, "_TRDSUMM"):
|
||||
slog.Info("Trade summary received from TW, marking completed",
|
||||
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
|
||||
|
||||
m.tradesMu.Lock()
|
||||
if t, ok := m.trades[clOrdID]; ok {
|
||||
t.Status = domain.TradeStatusCompleted
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
case execType == enum.ExecType_TRADE:
|
||||
slog.Info("Trade result received from TW",
|
||||
"execID", execID, "clOrdID", clOrdID, "ordStatus", string(ordStatus), "listID", listID)
|
||||
}
|
||||
|
||||
// Persist incoming ExecutionReport.
|
||||
m.persistMessage(clOrdID, parseExecutionReport(msg))
|
||||
|
||||
// Persist outgoing ExecutionAck.
|
||||
m.persistMessage(clOrdID, buildOutgoingMessageJSON("BN", clOrdID, map[string]interface{}{
|
||||
"OrderID": orderID,
|
||||
"ExecID": execID,
|
||||
"ClOrdID": clOrdID,
|
||||
"ExecAckStatus": string(enum.ExecAckStatus_ACCEPTED),
|
||||
}))
|
||||
|
||||
// If the ExecutionReport carries a TradeID, persist it in qfixdpl_logs for cross-service correlation.
|
||||
tradeID, _ := msg.GetTradeID()
|
||||
if tradeID != "" {
|
||||
m.tradesMu.Lock()
|
||||
if t, ok := m.trades[clOrdID]; ok {
|
||||
t.TradeID = tradeID
|
||||
}
|
||||
m.tradesMu.Unlock()
|
||||
|
||||
if err := m.store.UpdateLogTradeID(clOrdID, tradeID); err != nil {
|
||||
slog.Error("handleExecutionReport: failed to update log trade_id", "clOrdID", clOrdID, "tradeID", tradeID, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleExecutionAck handles an incoming ExecutionAck (35=BN) from TW.
|
||||
func (m *Manager) handleExecutionAck(msg executionack.ExecutionAck, sessionID quickfix.SessionID) {
|
||||
// Logged in application.go, no further action needed.
|
||||
}
|
||||
|
||||
// GetTrades returns a snapshot of only active trades.
|
||||
func (m *Manager) GetTrades() []domain.ListTrade {
|
||||
m.tradesMu.RLock()
|
||||
defer m.tradesMu.RUnlock()
|
||||
|
||||
trades := make([]domain.ListTrade, 0, len(m.trades))
|
||||
for _, t := range m.trades {
|
||||
if t.Status != domain.TradeStatusActive {
|
||||
continue
|
||||
}
|
||||
|
||||
trades = append(trades, toListTrade(t))
|
||||
}
|
||||
|
||||
return trades
|
||||
}
|
||||
|
||||
// GetAllTrades returns a snapshot of all trades (active, rejected, completed).
|
||||
func (m *Manager) GetAllTrades() []domain.ListTrade {
|
||||
m.tradesMu.RLock()
|
||||
defer m.tradesMu.RUnlock()
|
||||
|
||||
trades := make([]domain.ListTrade, 0, len(m.trades))
|
||||
for _, t := range m.trades {
|
||||
trades = append(trades, toListTrade(t))
|
||||
}
|
||||
|
||||
return trades
|
||||
}
|
||||
|
||||
func toListTrade(t *listTrade) domain.ListTrade {
|
||||
return domain.ListTrade{
|
||||
QuoteReqID: t.QuoteReqID,
|
||||
TradeID: t.TradeID,
|
||||
ListID: t.ListID,
|
||||
Symbol: t.Symbol,
|
||||
SecurityIDSrc: string(t.SecurityIDSrc),
|
||||
Currency: t.Currency,
|
||||
Side: string(t.Side),
|
||||
OrderQty: t.OrderQty.String(),
|
||||
SettlDate: t.SettlDate,
|
||||
Price: t.Price.String(),
|
||||
OwnerTraderID: t.OwnerTraderID,
|
||||
Status: t.Status,
|
||||
}
|
||||
}
|
||||
|
||||
// handleRawMessage persists raw FIX message strings to the logs table.
|
||||
func (m *Manager) handleRawMessage(direction string, msg *quickfix.Message) {
|
||||
quoteReqID := extractIdentifier(msg)
|
||||
|
||||
if err := m.store.SaveLog(domain.LogEntry{
|
||||
QuoteReqID: quoteReqID,
|
||||
RawMsg: "[" + direction + "] " + msg.String(),
|
||||
}); err != nil {
|
||||
slog.Error("failed to persist raw log", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// persistMessage saves a structured FIX message to the messages table.
|
||||
func (m *Manager) persistMessage(quoteReqID string, fixJSON domain.FixMessageJSON) {
|
||||
if err := m.store.SaveMessage(domain.TradeMessage{
|
||||
QuoteReqID: quoteReqID,
|
||||
JMessage: fixJSON,
|
||||
}); err != nil {
|
||||
slog.Error("failed to persist message", "msgType", fixJSON.MsgType, "quoteReqID", quoteReqID, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// loadTrades reconstructs all trades and their states from today's messages in the database.
|
||||
func (m *Manager) loadTrades() error {
|
||||
messages, err := m.store.GetTodayMessages()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
trades := make(map[string]*listTrade)
|
||||
|
||||
for _, msg := range messages {
|
||||
switch msg.JMessage.MsgType {
|
||||
case "R": // QuoteRequest -> trade is born
|
||||
if !strings.HasPrefix(msg.QuoteReqID, "LST_") {
|
||||
continue
|
||||
}
|
||||
|
||||
body := msg.JMessage.Body
|
||||
|
||||
nt, _ := body["NegotiationType"].(string)
|
||||
if nt != "RFQ" {
|
||||
continue
|
||||
}
|
||||
|
||||
listID, _ := body["ListID"].(string)
|
||||
if listID == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
trade := &listTrade{
|
||||
QuoteReqID: msg.QuoteReqID,
|
||||
ListID: listID,
|
||||
Status: domain.TradeStatusActive,
|
||||
}
|
||||
|
||||
if v, ok := body["SecurityID"].(string); ok {
|
||||
trade.Symbol = v
|
||||
}
|
||||
|
||||
if v, ok := body["Currency"].(string); ok {
|
||||
trade.Currency = v
|
||||
}
|
||||
|
||||
if v, ok := body["Side"].(string); ok {
|
||||
trade.Side = enum.Side(v)
|
||||
}
|
||||
|
||||
if v, ok := body["OrderQty"].(string); ok {
|
||||
trade.OrderQty, _ = decimal.NewFromString(v)
|
||||
}
|
||||
|
||||
if v, ok := body["SettlDate"].(string); ok {
|
||||
trade.SettlDate = v
|
||||
}
|
||||
|
||||
if v, ok := body["OwnerTraderID"].(string); ok {
|
||||
trade.OwnerTraderID = v
|
||||
}
|
||||
|
||||
trades[msg.QuoteReqID] = trade
|
||||
|
||||
case "CW": // QuoteAck — only status "2" (Rejected) marks the trade as rejected
|
||||
body := msg.JMessage.Body
|
||||
quoteAckStatus, _ := body["QuoteAckStatus"].(string)
|
||||
|
||||
if quoteAckStatus == string(enum.QuoteAckStatus_REJECTED) {
|
||||
if t, ok := trades[msg.QuoteReqID]; ok {
|
||||
t.Status = domain.TradeStatusRejected
|
||||
}
|
||||
}
|
||||
|
||||
case "AJ": // QuoteResponse — _TRDSUMM means trade is done (flow 8.6)
|
||||
body := msg.JMessage.Body
|
||||
quoteRespID, _ := body["QuoteRespID"].(string)
|
||||
|
||||
if strings.HasSuffix(quoteRespID, "_TRDSUMM") {
|
||||
if t, ok := trades[msg.QuoteReqID]; ok {
|
||||
t.Status = domain.TradeStatusCompleted
|
||||
}
|
||||
}
|
||||
|
||||
case "8": // ExecutionReport — _TRDSUMM means trade is done (flow 8.4)
|
||||
body := msg.JMessage.Body
|
||||
execID, _ := body["ExecID"].(string)
|
||||
clOrdID, _ := body["ClOrdID"].(string)
|
||||
|
||||
if tid, ok := body["TradeID"].(string); ok && tid != "" {
|
||||
if t, ok := trades[clOrdID]; ok {
|
||||
t.TradeID = tid
|
||||
}
|
||||
}
|
||||
|
||||
if strings.Contains(execID, "_TRDSUMM") {
|
||||
if t, ok := trades[clOrdID]; ok {
|
||||
t.Status = domain.TradeStatusCompleted
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
active := 0
|
||||
for _, t := range trades {
|
||||
if t.Status == domain.TradeStatusActive {
|
||||
active++
|
||||
}
|
||||
}
|
||||
|
||||
m.trades = trades
|
||||
slog.Info("recovery completed", "totalTrades", len(trades), "activeTrades", active)
|
||||
|
||||
return nil
|
||||
}
|
||||
647
src/client/fix/manager_test.go
Normal file
647
src/client/fix/manager_test.go
Normal file
@ -0,0 +1,647 @@
|
||||
package fix
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"quantex.com/qfixdpl/quickfix"
|
||||
"quantex.com/qfixdpl/src/app"
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// newTestManager builds a Manager with the given store without calling Start().
|
||||
func newTestManager(store domain.PersistenceStore) *Manager {
|
||||
notify := &MockNotifier{}
|
||||
return NewManager(app.FIXConfig{}, store, notify)
|
||||
}
|
||||
|
||||
// makeMsg builds a TradeMessage with the given quoteReqID, msgType, and body.
|
||||
func makeMsg(quoteReqID, msgType string, body map[string]interface{}) domain.TradeMessage {
|
||||
return domain.TradeMessage{
|
||||
ID: "test-id-" + quoteReqID + "-" + msgType,
|
||||
QuoteReqID: quoteReqID,
|
||||
JMessage: domain.FixMessageJSON{
|
||||
MsgType: msgType,
|
||||
QuoteReqID: quoteReqID,
|
||||
Body: body,
|
||||
ReceiveTime: time.Now(),
|
||||
},
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// makeQuoteRequest builds a "R" (QuoteRequest) TradeMessage.
|
||||
func makeQuoteRequest(quoteReqID, listID, nt string, extras map[string]interface{}) domain.TradeMessage {
|
||||
body := map[string]interface{}{
|
||||
"NegotiationType": nt,
|
||||
"ListID": listID,
|
||||
}
|
||||
for k, v := range extras {
|
||||
body[k] = v
|
||||
}
|
||||
return makeMsg(quoteReqID, "R", body)
|
||||
}
|
||||
|
||||
// makeQuoteAck builds a "CW" (QuoteAck) TradeMessage.
|
||||
func makeQuoteAck(quoteReqID, status string) domain.TradeMessage {
|
||||
return makeMsg(quoteReqID, "CW", map[string]interface{}{
|
||||
"QuoteAckStatus": status,
|
||||
})
|
||||
}
|
||||
|
||||
// makeQuoteResponse builds an "AJ" (QuoteResponse) TradeMessage.
|
||||
func makeQuoteResponse(quoteReqID, quoteRespID string) domain.TradeMessage {
|
||||
return makeMsg(quoteReqID, "AJ", map[string]interface{}{
|
||||
"QuoteRespID": quoteRespID,
|
||||
})
|
||||
}
|
||||
|
||||
// makeExecutionReport builds an "8" (ExecutionReport) TradeMessage.
|
||||
// clOrdID maps to body["ClOrdID"]; it is also set as TradeMessage.QuoteReqID
|
||||
// to mirror how persistMessage works in handleExecutionReport.
|
||||
func makeExecutionReport(clOrdID, execID string) domain.TradeMessage {
|
||||
return makeMsg(clOrdID, "8", map[string]interface{}{
|
||||
"ClOrdID": clOrdID,
|
||||
"ExecID": execID,
|
||||
})
|
||||
}
|
||||
|
||||
// makeOutgoing builds an outgoing message (AI, S, BN) for a given quoteReqID.
|
||||
func makeOutgoing(quoteReqID, msgType string) domain.TradeMessage {
|
||||
return makeMsg(quoteReqID, msgType, map[string]interface{}{})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 1 — DB vacia
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_EmptyDB(t *testing.T) {
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, m.trades)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 2 — Interrupcion despues de QuoteRequest
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_SingleR_CreatesOneTrade(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", map[string]interface{}{
|
||||
"SecurityID": "US1234567890",
|
||||
"Currency": "USD",
|
||||
"Side": "1",
|
||||
"OrderQty": "1000000",
|
||||
"SettlDate": "20260320",
|
||||
"OwnerTraderID": "trader1",
|
||||
})
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, m.trades, 1)
|
||||
|
||||
trade := m.trades["LST_ABC123"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, "LST_ABC123", trade.QuoteReqID)
|
||||
assert.Equal(t, "LIST_1", trade.ListID)
|
||||
assert.Equal(t, "US1234567890", trade.Symbol)
|
||||
assert.Equal(t, "USD", trade.Currency)
|
||||
assert.Equal(t, "1", string(trade.Side))
|
||||
assert.Equal(t, "20260320", trade.SettlDate)
|
||||
assert.Equal(t, "trader1", trade.OwnerTraderID)
|
||||
assert.Equal(t, domain.TradeStatusActive, trade.Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_WithOutgoingMsgs_IgnoresAI_S(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
|
||||
ai := makeOutgoing("LST_ABC123", "AI")
|
||||
s := makeOutgoing("LST_ABC123", "S")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, ai, s}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_NonLSTPrefix_Ignored(t *testing.T) {
|
||||
r := makeQuoteRequest("TRD_ABC123", "LIST_1", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_NonRFQ_NegotiationType_Ignored(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "LIST_1", "DEALER", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_EmptyListID_Ignored(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_R_MissingOptionalFields_TradeCreated(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_MIN", "LIST_MIN", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, m.trades, 1)
|
||||
|
||||
trade := m.trades["LST_MIN"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, "LST_MIN", trade.QuoteReqID)
|
||||
assert.Equal(t, "LIST_MIN", trade.ListID)
|
||||
assert.Equal(t, "", trade.Symbol)
|
||||
assert.Equal(t, "", trade.Currency)
|
||||
assert.Equal(t, "", trade.SettlDate)
|
||||
assert.Equal(t, "", trade.OwnerTraderID)
|
||||
assert.Equal(t, domain.TradeStatusActive, trade.Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 3 — Interrupcion despues de QuoteAck
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_CW_Accepted_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1") // ACCEPTED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_CW_Rejected_TradeMarkedRejected(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "2") // REJECTED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusRejected, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// QuoteAckStatus "0" is RECEIVED_NOT_YET_PROCESSED — not a rejection.
|
||||
// Only status "2" (Rejected) should mark the trade as rejected.
|
||||
func TestLoadTrades_CW_ReceivedNotYetProcessed_TradeStaysActive(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "0") // RECEIVED_NOT_YET_PROCESSED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_CW_WithoutPriorR_NoCrash(t *testing.T) {
|
||||
cw := makeQuoteAck("LST_ORPHAN", "2")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 4 — Interrupcion despues de QuoteResponse
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_AJ_TRDREQ_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1")
|
||||
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDREQ")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_AJ_TRDEND_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1")
|
||||
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDEND")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_AJ_LISTEND_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1")
|
||||
aj := makeQuoteResponse("LST_Q1", "LST_Q1_LISTEND")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_AJ_TRDSUMM_TradeMarkedCompleted(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_Q1", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_Q1", "1")
|
||||
aj := makeQuoteResponse("LST_Q1", "LST_Q1_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_Q1"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_AJ_TRDSUMM_WithoutPriorR_NoCrash(t *testing.T) {
|
||||
aj := makeQuoteResponse("LST_ORPHAN", "LST_ORPHAN_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{aj}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 5 — Interrupcion despues de ExecutionReport
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_ExecReport_LISTEND_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_ABC123", "1")
|
||||
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
|
||||
exec := makeExecutionReport("LST_ABC123", "LST_ABC123_LISTEND")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, exec}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_ExecReport_TRDEND_TradeRemains(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_ABC123", "1")
|
||||
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
|
||||
execListEnd := makeExecutionReport("LST_ABC123", "LST_ABC123_LISTEND")
|
||||
execTrdEnd := makeExecutionReport("LST_ABC123", "LST_ABC123_TRDEND")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, execListEnd, execTrdEnd}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_ABC123"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_ExecReport_TRDSUMM_TradeMarkedCompleted(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_ABC123", "LIST_1", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_ABC123", "1")
|
||||
aj := makeQuoteResponse("LST_ABC123", "LST_ABC123_TRDREQ")
|
||||
exec := makeExecutionReport("LST_ABC123", "LST_ABC123_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, cw, aj, exec}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 1)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_ABC123"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_ExecReport_TRDSUMM_WithoutPriorR_NoCrash(t *testing.T) {
|
||||
exec := makeExecutionReport("LST_ORPHAN", "LST_ORPHAN_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{exec}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 6 — Multiples trades en paralelo
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_TwoTrades_BothActive(t *testing.T) {
|
||||
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
|
||||
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
|
||||
cw1 := makeQuoteAck("LST_TRADE1", "1")
|
||||
cw2 := makeQuoteAck("LST_TRADE2", "1")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, cw1, cw2}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 2)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE1"].Status)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE2"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_TwoTrades_OneCompleted_OneActive(t *testing.T) {
|
||||
// TRADE1: fully completed via flow 8.4 (_TRDSUMM execution report)
|
||||
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
|
||||
cw1 := makeQuoteAck("LST_TRADE1", "1")
|
||||
aj1 := makeQuoteResponse("LST_TRADE1", "LST_TRADE1_TRDREQ")
|
||||
exec1 := makeExecutionReport("LST_TRADE1", "LST_TRADE1_TRDSUMM")
|
||||
|
||||
// TRADE2: still active
|
||||
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
|
||||
cw2 := makeQuoteAck("LST_TRADE2", "1")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return(
|
||||
[]domain.TradeMessage{r1, cw1, aj1, exec1, r2, cw2}, nil,
|
||||
)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 2)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE1"].Status)
|
||||
assert.Equal(t, domain.TradeStatusActive, m.trades["LST_TRADE2"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestLoadTrades_TwoTrades_BothCompleted(t *testing.T) {
|
||||
r1 := makeQuoteRequest("LST_TRADE1", "LIST_1", "RFQ", nil)
|
||||
cw1 := makeQuoteAck("LST_TRADE1", "1")
|
||||
exec1 := makeExecutionReport("LST_TRADE1", "LST_TRADE1_TRDSUMM")
|
||||
|
||||
r2 := makeQuoteRequest("LST_TRADE2", "LIST_2", "RFQ", nil)
|
||||
cw2 := makeQuoteAck("LST_TRADE2", "1")
|
||||
exec2 := makeExecutionReport("LST_TRADE2", "LST_TRADE2_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return(
|
||||
[]domain.TradeMessage{r1, cw1, exec1, r2, cw2, exec2}, nil,
|
||||
)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, m.trades, 2)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE1"].Status)
|
||||
assert.Equal(t, domain.TradeStatusCompleted, m.trades["LST_TRADE2"].Status)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 7 — SessionID se asigna en onLogon (solo a trades activos)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_SessionID_IsZeroAfterRecovery(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, m.trades, 1)
|
||||
|
||||
trade := m.trades["LST_X"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, quickfix.SessionID{}, trade.SessionID)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestOnLogon_AssignsSessionToRecoveredActiveTrades(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
require.NoError(t, m.loadTrades())
|
||||
|
||||
sessionID := quickfix.SessionID{
|
||||
BeginString: "FIXT.1.1",
|
||||
SenderCompID: "QFIXDPL",
|
||||
TargetCompID: "TRADEWEB",
|
||||
}
|
||||
m.onLogon(sessionID)
|
||||
|
||||
trade := m.trades["LST_X"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, sessionID, trade.SessionID)
|
||||
}
|
||||
|
||||
func TestOnLogon_DoesNotAssignSessionToCompletedTrades(t *testing.T) {
|
||||
r := makeQuoteRequest("LST_X", "LIST_1", "RFQ", nil)
|
||||
exec := makeExecutionReport("LST_X", "LST_X_TRDSUMM")
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r, exec}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
require.NoError(t, m.loadTrades())
|
||||
|
||||
sessionID := quickfix.SessionID{
|
||||
BeginString: "FIXT.1.1",
|
||||
SenderCompID: "QFIXDPL",
|
||||
TargetCompID: "TRADEWEB",
|
||||
}
|
||||
m.onLogon(sessionID)
|
||||
|
||||
trade := m.trades["LST_X"]
|
||||
require.NotNil(t, trade)
|
||||
assert.Equal(t, quickfix.SessionID{}, trade.SessionID, "completed trades should not get session assigned")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 8 — GetTrades filtra por active, GetAllTrades devuelve todos
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestGetTrades_ReturnsOnlyActive(t *testing.T) {
|
||||
r1 := makeQuoteRequest("LST_ACTIVE", "LIST_1", "RFQ", nil)
|
||||
r2 := makeQuoteRequest("LST_DONE", "LIST_2", "RFQ", nil)
|
||||
exec := makeExecutionReport("LST_DONE", "LST_DONE_TRDSUMM")
|
||||
r3 := makeQuoteRequest("LST_REJ", "LIST_3", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_REJ", "2") // REJECTED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, exec, r3, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
require.NoError(t, m.loadTrades())
|
||||
|
||||
trades := m.GetTrades()
|
||||
assert.Len(t, trades, 1)
|
||||
assert.Equal(t, "LST_ACTIVE", trades[0].QuoteReqID)
|
||||
assert.Equal(t, domain.TradeStatusActive, trades[0].Status)
|
||||
}
|
||||
|
||||
func TestGetAllTrades_ReturnsAll(t *testing.T) {
|
||||
r1 := makeQuoteRequest("LST_ACTIVE", "LIST_1", "RFQ", nil)
|
||||
r2 := makeQuoteRequest("LST_DONE", "LIST_2", "RFQ", nil)
|
||||
exec := makeExecutionReport("LST_DONE", "LST_DONE_TRDSUMM")
|
||||
r3 := makeQuoteRequest("LST_REJ", "LIST_3", "RFQ", nil)
|
||||
cw := makeQuoteAck("LST_REJ", "2") // REJECTED
|
||||
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage{r1, r2, exec, r3, cw}, nil)
|
||||
|
||||
m := newTestManager(store)
|
||||
require.NoError(t, m.loadTrades())
|
||||
|
||||
trades := m.GetAllTrades()
|
||||
assert.Len(t, trades, 3)
|
||||
|
||||
statusMap := map[string]domain.TradeStatus{}
|
||||
for _, tr := range trades {
|
||||
statusMap[tr.QuoteReqID] = tr.Status
|
||||
}
|
||||
|
||||
assert.Equal(t, domain.TradeStatusActive, statusMap["LST_ACTIVE"])
|
||||
assert.Equal(t, domain.TradeStatusCompleted, statusMap["LST_DONE"])
|
||||
assert.Equal(t, domain.TradeStatusRejected, statusMap["LST_REJ"])
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Group 9 — Error en store
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestLoadTrades_StoreError_ReturnsError_MapEmpty(t *testing.T) {
|
||||
store := &MockPersistenceStore{}
|
||||
store.On("GetTodayMessages").Return([]domain.TradeMessage(nil), errors.New("db connection failed"))
|
||||
|
||||
m := newTestManager(store)
|
||||
err := m.loadTrades()
|
||||
|
||||
assert.Error(t, err)
|
||||
assert.NotNil(t, m.trades)
|
||||
assert.Len(t, m.trades, 0)
|
||||
store.AssertExpectations(t)
|
||||
}
|
||||
56
src/client/fix/mock_test.go
Normal file
56
src/client/fix/mock_test.go
Normal file
@ -0,0 +1,56 @@
|
||||
package fix
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
|
||||
// MockPersistenceStore is a testify mock implementing domain.PersistenceStore.
|
||||
type MockPersistenceStore struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) SaveMessage(msg domain.TradeMessage) error {
|
||||
args := m.Called(msg)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) SaveLog(entry domain.LogEntry) error {
|
||||
args := m.Called(entry)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
args := m.Called()
|
||||
msgs, _ := args.Get(0).([]domain.TradeMessage)
|
||||
return msgs, args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
|
||||
args := m.Called(quoteReqID)
|
||||
logs, _ := args.Get(0).(domain.Logs)
|
||||
return logs, args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) UpdateLogTradeID(quoteReqID, tradeID string) error {
|
||||
args := m.Called(quoteReqID, tradeID)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockPersistenceStore) GetFullTradeLog(quoteReqID string) (domain.FullTradeLog, error) {
|
||||
args := m.Called(quoteReqID)
|
||||
log, _ := args.Get(0).(domain.FullTradeLog)
|
||||
return log, args.Error(1)
|
||||
}
|
||||
|
||||
// MockNotifier is a testify mock implementing domain.Notifier.
|
||||
type MockNotifier struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockNotifier) SendMsg(chat domain.MessageChannel, text string, status domain.MessageStatus, wg *sync.WaitGroup) {
|
||||
m.Called(chat, text, status, wg)
|
||||
}
|
||||
232
src/client/fix/parser.go
Normal file
232
src/client/fix/parser.go
Normal file
@ -0,0 +1,232 @@
|
||||
package fix
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"quantex.com/qfixdpl/quickfix"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/executionreport"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteack"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoterequest"
|
||||
"quantex.com/qfixdpl/quickfix/gen/fix50sp2/quoteresponse"
|
||||
"quantex.com/qfixdpl/quickfix/gen/tag"
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
|
||||
func extractHeader(msg *quickfix.Message) map[string]interface{} {
|
||||
header := make(map[string]interface{})
|
||||
|
||||
if v, err := msg.Header.GetBytes(tag.BeginString); err == nil {
|
||||
header["BeginString"] = string(v)
|
||||
}
|
||||
if v, err := msg.Header.GetBytes(tag.MsgType); err == nil {
|
||||
header["MsgType"] = string(v)
|
||||
}
|
||||
if v, err := msg.Header.GetBytes(tag.SenderCompID); err == nil {
|
||||
header["SenderCompID"] = string(v)
|
||||
}
|
||||
if v, err := msg.Header.GetBytes(tag.TargetCompID); err == nil {
|
||||
header["TargetCompID"] = string(v)
|
||||
}
|
||||
if v, err := msg.Header.GetBytes(tag.MsgSeqNum); err == nil {
|
||||
header["MsgSeqNum"] = string(v)
|
||||
}
|
||||
if v, err := msg.Header.GetBytes(tag.SendingTime); err == nil {
|
||||
header["SendingTime"] = string(v)
|
||||
}
|
||||
|
||||
return header
|
||||
}
|
||||
|
||||
func parseQuoteRequest(msg quoterequest.QuoteRequest) domain.FixMessageJSON {
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
body := map[string]interface{}{"QuoteReqID": quoteReqID}
|
||||
|
||||
if relSyms, err := msg.GetNoRelatedSym(); err == nil && relSyms.Len() > 0 {
|
||||
sym := relSyms.Get(0)
|
||||
if v, e := sym.GetSecurityID(); e == nil {
|
||||
body["SecurityID"] = v
|
||||
}
|
||||
if v, e := sym.GetSecurityIDSource(); e == nil {
|
||||
body["SecurityIDSource"] = string(v)
|
||||
}
|
||||
if v, e := sym.GetCurrency(); e == nil {
|
||||
body["Currency"] = v
|
||||
}
|
||||
if v, e := sym.GetSide(); e == nil {
|
||||
body["Side"] = string(v)
|
||||
}
|
||||
if v, e := sym.GetOrderQty(); e == nil {
|
||||
body["OrderQty"] = v.String()
|
||||
}
|
||||
if v, e := sym.GetSettlDate(); e == nil {
|
||||
body["SettlDate"] = v
|
||||
}
|
||||
if v, e := sym.GetListID(); e == nil {
|
||||
body["ListID"] = v
|
||||
}
|
||||
if v, e := sym.GetOwnerTraderID(); e == nil {
|
||||
body["OwnerTraderID"] = v
|
||||
}
|
||||
if v, e := sym.GetNegotiationType(); e == nil {
|
||||
body["NegotiationType"] = v
|
||||
}
|
||||
}
|
||||
|
||||
return domain.FixMessageJSON{
|
||||
Direction: "IN",
|
||||
MsgType: "R",
|
||||
QuoteReqID: quoteReqID,
|
||||
Header: extractHeader(msg.Message),
|
||||
Body: body,
|
||||
ReceiveTime: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func parseQuoteAck(msg quoteack.QuoteAck) domain.FixMessageJSON {
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
body := map[string]interface{}{"QuoteReqID": quoteReqID}
|
||||
|
||||
if v, e := msg.GetQuoteID(); e == nil {
|
||||
body["QuoteID"] = v
|
||||
}
|
||||
if v, e := msg.GetQuoteAckStatus(); e == nil {
|
||||
body["QuoteAckStatus"] = string(v)
|
||||
}
|
||||
if v, e := msg.GetText(); e == nil {
|
||||
body["Text"] = v
|
||||
}
|
||||
|
||||
return domain.FixMessageJSON{
|
||||
Direction: "IN",
|
||||
MsgType: "CW",
|
||||
QuoteReqID: quoteReqID,
|
||||
Header: extractHeader(msg.Message),
|
||||
Body: body,
|
||||
ReceiveTime: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func parseQuoteResponse(msg quoteresponse.QuoteResponse) domain.FixMessageJSON {
|
||||
quoteReqID, _ := msg.GetQuoteReqID()
|
||||
body := map[string]interface{}{"QuoteReqID": quoteReqID}
|
||||
|
||||
if v, e := msg.GetQuoteRespID(); e == nil {
|
||||
body["QuoteRespID"] = v
|
||||
}
|
||||
if v, e := msg.GetQuoteRespType(); e == nil {
|
||||
body["QuoteRespType"] = string(v)
|
||||
}
|
||||
if v, e := msg.GetSide(); e == nil {
|
||||
body["Side"] = string(v)
|
||||
}
|
||||
if v, e := msg.GetPrice(); e == nil {
|
||||
body["Price"] = v.String()
|
||||
}
|
||||
if v, e := msg.GetOrderQty(); e == nil {
|
||||
body["OrderQty"] = v.String()
|
||||
}
|
||||
if v, e := msg.GetClOrdID(); e == nil {
|
||||
body["ClOrdID"] = v
|
||||
}
|
||||
|
||||
return domain.FixMessageJSON{
|
||||
Direction: "IN",
|
||||
MsgType: "AJ",
|
||||
QuoteReqID: quoteReqID,
|
||||
Header: extractHeader(msg.Message),
|
||||
Body: body,
|
||||
ReceiveTime: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func parseExecutionReport(msg executionreport.ExecutionReport) domain.FixMessageJSON {
|
||||
clOrdID, _ := msg.GetClOrdID()
|
||||
body := map[string]interface{}{"ClOrdID": clOrdID}
|
||||
|
||||
if v, e := msg.GetExecID(); e == nil {
|
||||
body["ExecID"] = v
|
||||
}
|
||||
if v, e := msg.GetOrderID(); e == nil {
|
||||
body["OrderID"] = v
|
||||
}
|
||||
if v, e := msg.GetExecType(); e == nil {
|
||||
body["ExecType"] = string(v)
|
||||
}
|
||||
if v, e := msg.GetOrdStatus(); e == nil {
|
||||
body["OrdStatus"] = string(v)
|
||||
}
|
||||
if v, e := msg.GetListID(); e == nil {
|
||||
body["ListID"] = v
|
||||
}
|
||||
if v, e := msg.GetSide(); e == nil {
|
||||
body["Side"] = string(v)
|
||||
}
|
||||
if v, e := msg.GetSymbol(); e == nil {
|
||||
body["Symbol"] = v
|
||||
}
|
||||
if v, e := msg.GetSecurityID(); e == nil {
|
||||
body["SecurityID"] = v
|
||||
}
|
||||
if v, e := msg.GetCurrency(); e == nil {
|
||||
body["Currency"] = v
|
||||
}
|
||||
if v, e := msg.GetPrice(); e == nil {
|
||||
body["Price"] = v.String()
|
||||
}
|
||||
if v, e := msg.GetLastPx(); e == nil {
|
||||
body["LastPx"] = v.String()
|
||||
}
|
||||
if v, e := msg.GetLastQty(); e == nil {
|
||||
body["LastQty"] = v.String()
|
||||
}
|
||||
if v, e := msg.GetOrderQty(); e == nil {
|
||||
body["OrderQty"] = v.String()
|
||||
}
|
||||
if v, e := msg.GetSettlDate(); e == nil {
|
||||
body["SettlDate"] = v
|
||||
}
|
||||
if v, e := msg.GetTradeID(); e == nil {
|
||||
body["TradeID"] = v
|
||||
}
|
||||
|
||||
return domain.FixMessageJSON{
|
||||
Direction: "IN",
|
||||
MsgType: "8",
|
||||
QuoteReqID: clOrdID,
|
||||
Header: extractHeader(msg.Message),
|
||||
Body: body,
|
||||
ReceiveTime: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// extractIdentifier extracts the trade identifier from a parsed FIX message.
|
||||
// For ExecutionReport (8) and ExecutionAck (BN), uses ClOrdID (tag 11).
|
||||
// For all other message types, uses QuoteReqID (tag 131).
|
||||
func extractIdentifier(msg *quickfix.Message) string {
|
||||
msgType, _ := msg.Header.GetBytes(tag.MsgType)
|
||||
|
||||
switch string(msgType) {
|
||||
case "8", "BN":
|
||||
var clOrdID quickfix.FIXString
|
||||
if err := msg.Body.GetField(tag.ClOrdID, &clOrdID); err == nil {
|
||||
return string(clOrdID)
|
||||
}
|
||||
default:
|
||||
var quoteReqID quickfix.FIXString
|
||||
if err := msg.Body.GetField(tag.QuoteReqID, "eReqID); err == nil {
|
||||
return string(quoteReqID)
|
||||
}
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func buildOutgoingMessageJSON(msgType, quoteReqID string, body map[string]interface{}) domain.FixMessageJSON {
|
||||
return domain.FixMessageJSON{
|
||||
Direction: "OUT",
|
||||
MsgType: msgType,
|
||||
QuoteReqID: quoteReqID,
|
||||
Body: body,
|
||||
ReceiveTime: time.Now(),
|
||||
}
|
||||
}
|
||||
@ -132,7 +132,7 @@ func getMessage(text string, status domain.MessageStatus) string {
|
||||
'cardId': 'createCardMessage',
|
||||
'card': {
|
||||
'header': {
|
||||
'title': 'qfixdpl',
|
||||
'title': 'QFIXDPL',
|
||||
'subtitle': 'Notification',
|
||||
'imageUrl': '%s',
|
||||
'imageType': 'CIRCLE'
|
||||
|
||||
20
src/client/store/db.sql
Normal file
20
src/client/store/db.sql
Normal file
@ -0,0 +1,20 @@
|
||||
CREATE TABLE IF NOT EXISTS qfixdpl_messages (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
quote_req_id TEXT NOT NULL,
|
||||
trade_id TEXT,
|
||||
j_message JSONB NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_quote_req_id ON qfixdpl_messages(quote_req_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON qfixdpl_messages(created_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_trade_id ON qfixdpl_messages(trade_id);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS qfixdpl_logs (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
quote_req_id TEXT NOT NULL UNIQUE,
|
||||
trade_id TEXT,
|
||||
raw_msg TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_dpl_logs_trade_id ON qfixdpl_logs(trade_id);
|
||||
@ -2,7 +2,9 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
_ "embed"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"quantex.com.ar/multidb"
|
||||
@ -11,6 +13,9 @@ import (
|
||||
"quantex.com/qfixdpl/src/common/tracerr"
|
||||
)
|
||||
|
||||
//go:embed db.sql
|
||||
var schemaSQL string
|
||||
|
||||
const dbPingSeconds = 30
|
||||
|
||||
type Store struct {
|
||||
@ -45,9 +50,31 @@ func New(config Config) (*Store, error) {
|
||||
|
||||
go s.db.PeriodicDBPing(time.Second * dbPingSeconds)
|
||||
|
||||
if err := s.ensureTables(); err != nil {
|
||||
return nil, tracerr.Errorf("error ensuring tables: %w", err)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (p *Store) ensureTables() error {
|
||||
statements := strings.Split(schemaSQL, ";")
|
||||
for _, stmt := range statements {
|
||||
stmt = strings.TrimSpace(stmt)
|
||||
if stmt == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err := p.db.Exec(stmt); err != nil {
|
||||
return tracerr.Errorf("error executing schema statement: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
slog.Info("database tables ensured")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Store) CloseDB() {
|
||||
p.db.Close()
|
||||
slog.Info("closing database connection.")
|
||||
|
||||
175
src/client/store/persistence.go
Normal file
175
src/client/store/persistence.go
Normal file
@ -0,0 +1,175 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"quantex.com/qfixdpl/src/common/tracerr"
|
||||
"quantex.com/qfixdpl/src/domain"
|
||||
)
|
||||
|
||||
func (p *Store) SaveMessage(msg domain.TradeMessage) error {
|
||||
jsonBytes, err := json.Marshal(msg.JMessage)
|
||||
if err != nil {
|
||||
return tracerr.Errorf("error marshaling j_message: %w", err)
|
||||
}
|
||||
|
||||
_, err = p.db.Exec(
|
||||
"INSERT INTO qfixdpl_messages (quote_req_id, trade_id, j_message) VALUES ($1, NULLIF($2, ''), $3)",
|
||||
msg.QuoteReqID, msg.TradeID, string(jsonBytes),
|
||||
)
|
||||
if err != nil {
|
||||
return tracerr.Errorf("error inserting message: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Store) SaveLog(entry domain.LogEntry) error {
|
||||
upsertStmt := `INSERT INTO qfixdpl_logs (quote_req_id, raw_msg)
|
||||
VALUES ($1, $2)
|
||||
ON CONFLICT (quote_req_id) DO UPDATE
|
||||
SET raw_msg = qfixdpl_logs.raw_msg || E'\n' || EXCLUDED.raw_msg,
|
||||
updated_at = NOW()`
|
||||
|
||||
_, err := p.db.Exec(upsertStmt, entry.QuoteReqID, entry.RawMsg)
|
||||
if err != nil {
|
||||
return tracerr.Errorf("error upserting log: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Store) GetTodayMessages() ([]domain.TradeMessage, error) {
|
||||
rows, err := p.db.Query(
|
||||
"SELECT id, quote_req_id, trade_id, j_message, created_at FROM qfixdpl_messages WHERE created_at >= current_date ORDER BY created_at ASC",
|
||||
)
|
||||
if err != nil {
|
||||
return nil, tracerr.Errorf("error querying today messages: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var messages []domain.TradeMessage
|
||||
|
||||
for rows.Next() {
|
||||
var (
|
||||
id, quoteReqID string
|
||||
tradeID *string
|
||||
jMessageRaw []byte
|
||||
createdAt time.Time
|
||||
)
|
||||
|
||||
if err := rows.Scan(&id, "eReqID, &tradeID, &jMessageRaw, &createdAt); err != nil {
|
||||
return nil, tracerr.Errorf("error scanning message row: %w", err)
|
||||
}
|
||||
|
||||
var jMessage domain.FixMessageJSON
|
||||
if err := json.Unmarshal(jMessageRaw, &jMessage); err != nil {
|
||||
return nil, tracerr.Errorf("error unmarshaling j_message: %w", err)
|
||||
}
|
||||
|
||||
msg := domain.TradeMessage{
|
||||
ID: id,
|
||||
QuoteReqID: quoteReqID,
|
||||
JMessage: jMessage,
|
||||
CreatedAt: createdAt,
|
||||
}
|
||||
|
||||
if tradeID != nil {
|
||||
msg.TradeID = *tradeID
|
||||
}
|
||||
|
||||
messages = append(messages, msg)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, tracerr.Errorf("error iterating message rows: %w", err)
|
||||
}
|
||||
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
func (p *Store) UpdateLogTradeID(quoteReqID, tradeID string) error {
|
||||
_, err := p.db.Exec(
|
||||
"UPDATE qfixdpl_logs SET trade_id = $1, updated_at = NOW() WHERE quote_req_id = $2",
|
||||
tradeID, quoteReqID,
|
||||
)
|
||||
if err != nil {
|
||||
return tracerr.Errorf("error updating log trade_id: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Store) GetFullTradeLog(quoteReqID string) (domain.FullTradeLog, error) {
|
||||
rows, err := p.db.Query(
|
||||
"SELECT trade_id, raw_msg FROM qfixdpl_logs WHERE quote_req_id = $1", quoteReqID,
|
||||
)
|
||||
if err != nil {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("error querying dpl log: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
if !rows.Next() {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("no log found for quoteReqID: %s", quoteReqID)
|
||||
}
|
||||
|
||||
var (
|
||||
tradeID *string
|
||||
rawMsg string
|
||||
)
|
||||
|
||||
if err := rows.Scan(&tradeID, &rawMsg); err != nil {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("error scanning dpl log row: %w", err)
|
||||
}
|
||||
|
||||
result := domain.FullTradeLog{
|
||||
QuoteReqID: quoteReqID,
|
||||
DPLEntries: strings.Split(rawMsg, "\n"),
|
||||
}
|
||||
|
||||
if tradeID != nil && *tradeID != "" {
|
||||
result.TradeID = *tradeID
|
||||
|
||||
ptRows, err := p.db.Query(
|
||||
"SELECT raw_msg FROM qfixpt_logs WHERE trade_id = $1", *tradeID,
|
||||
)
|
||||
if err != nil {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("error querying pt logs: %w", err)
|
||||
}
|
||||
defer ptRows.Close()
|
||||
|
||||
if ptRows.Next() {
|
||||
var ptRawMsg string
|
||||
if err := ptRows.Scan(&ptRawMsg); err != nil {
|
||||
return domain.FullTradeLog{}, tracerr.Errorf("error scanning pt log row: %w", err)
|
||||
}
|
||||
|
||||
result.PTEntries = strings.Split(ptRawMsg, "\n")
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (p *Store) GetLogsByQuoteReqID(quoteReqID string) (domain.Logs, error) {
|
||||
selectStmt := "SELECT raw_msg FROM qfixdpl_logs WHERE quote_req_id = '" + quoteReqID + "';"
|
||||
|
||||
response, err := p.db.Query(selectStmt)
|
||||
if err != nil {
|
||||
return domain.Logs{}, tracerr.Errorf("error querying logs: %w", err)
|
||||
}
|
||||
defer response.Close()
|
||||
|
||||
if !response.Next() {
|
||||
return domain.Logs{}, nil
|
||||
}
|
||||
|
||||
var rawMsg string
|
||||
if err := response.Scan(&rawMsg); err != nil {
|
||||
return domain.Logs{}, tracerr.Errorf("error scanning log row: %w", err)
|
||||
}
|
||||
|
||||
return domain.Logs{Entries: strings.Split(rawMsg, "\n")}, nil
|
||||
}
|
||||
@ -8,6 +8,7 @@ import (
|
||||
"quantex.com/qfixdpl/src/app"
|
||||
"quantex.com/qfixdpl/src/client/api/rest"
|
||||
"quantex.com/qfixdpl/src/client/data"
|
||||
"quantex.com/qfixdpl/src/client/fix"
|
||||
googlechat "quantex.com/qfixdpl/src/client/notify/google"
|
||||
"quantex.com/qfixdpl/src/client/store"
|
||||
"quantex.com/qfixdpl/src/client/store/external"
|
||||
@ -38,6 +39,12 @@ func Runner(cfg app.Config) error {
|
||||
|
||||
userData := data.New()
|
||||
|
||||
fixManager := fix.NewManager(cfg.FIX, appStore, notify)
|
||||
if err = fixManager.Start(); err != nil {
|
||||
return fmt.Errorf("error starting FIX acceptor: %w", err)
|
||||
}
|
||||
defer fixManager.Stop()
|
||||
|
||||
apiConfig := rest.Config{
|
||||
Port: cfg.APIBasePort,
|
||||
AllowedOrigins: cfg.AllowedOrigins,
|
||||
@ -46,7 +53,7 @@ func Runner(cfg app.Config) error {
|
||||
EnableJWTAuth: cfg.EnableJWTAuth,
|
||||
}
|
||||
|
||||
api := rest.New(userData, appStore, apiConfig, notify)
|
||||
api := rest.New(userData, appStore, fixManager, apiConfig, notify)
|
||||
api.Run()
|
||||
|
||||
cmd.WaitForInterruptSignal(nil)
|
||||
|
||||
77
src/domain/persistence.go
Normal file
77
src/domain/persistence.go
Normal file
@ -0,0 +1,77 @@
|
||||
// Package domain defines all the domain models
|
||||
package domain
|
||||
|
||||
import "time"
|
||||
|
||||
// TradeStatus represents the lifecycle state of a List Trading trade.
|
||||
type TradeStatus string
|
||||
|
||||
const (
|
||||
TradeStatusActive TradeStatus = "active"
|
||||
TradeStatusRejected TradeStatus = "rejected"
|
||||
TradeStatusCompleted TradeStatus = "completed"
|
||||
)
|
||||
|
||||
// ListTrade es la representacion exportada de un trade de List Trading.
|
||||
type ListTrade struct {
|
||||
QuoteReqID string `json:"quote_req_id"`
|
||||
TradeID string `json:"trade_id,omitempty"`
|
||||
ListID string `json:"list_id"`
|
||||
Symbol string `json:"symbol"`
|
||||
SecurityIDSrc string `json:"security_id_src"`
|
||||
Currency string `json:"currency"`
|
||||
Side string `json:"side"`
|
||||
OrderQty string `json:"order_qty"`
|
||||
SettlDate string `json:"settl_date"`
|
||||
Price string `json:"price"`
|
||||
OwnerTraderID string `json:"owner_trader_id"`
|
||||
Status TradeStatus `json:"status"`
|
||||
}
|
||||
|
||||
// FixMessageJSON es la representacion estructurada de un mensaje FIX para almacenamiento.
|
||||
type FixMessageJSON struct {
|
||||
Direction string `json:"direction"`
|
||||
MsgType string `json:"msg_type"`
|
||||
QuoteReqID string `json:"quote_req_id"`
|
||||
Header map[string]interface{} `json:"header"`
|
||||
Body map[string]interface{} `json:"body"`
|
||||
ReceiveTime time.Time `json:"receive_time"`
|
||||
}
|
||||
|
||||
// TradeMessage es una fila de qfixdpl_messages.
|
||||
type TradeMessage struct {
|
||||
ID string `json:"id"`
|
||||
QuoteReqID string `json:"quote_req_id"`
|
||||
TradeID string `json:"trade_id,omitempty"`
|
||||
JMessage FixMessageJSON `json:"j_message"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
|
||||
// LogEntry es el DTO para insertar/actualizar un log crudo en qfixdpl_logs.
|
||||
type LogEntry struct {
|
||||
QuoteReqID string
|
||||
RawMsg string
|
||||
}
|
||||
|
||||
// Logs es la respuesta del endpoint GET /trades/:quoteReqID/logs.
|
||||
type Logs struct {
|
||||
Entries []string `json:"entries"`
|
||||
}
|
||||
|
||||
// FullTradeLog es la respuesta del endpoint GET /trades/:id/full-log.
|
||||
type FullTradeLog struct {
|
||||
QuoteReqID string `json:"quote_req_id"`
|
||||
TradeID string `json:"trade_id,omitempty"`
|
||||
DPLEntries []string `json:"dpl_entries"`
|
||||
PTEntries []string `json:"pt_entries,omitempty"`
|
||||
}
|
||||
|
||||
// PersistenceStore define la interfaz de persistencia.
|
||||
type PersistenceStore interface {
|
||||
SaveMessage(msg TradeMessage) error
|
||||
SaveLog(entry LogEntry) error
|
||||
GetTodayMessages() ([]TradeMessage, error)
|
||||
GetLogsByQuoteReqID(quoteReqID string) (Logs, error)
|
||||
UpdateLogTradeID(quoteReqID, tradeID string) error
|
||||
GetFullTradeLog(id string) (FullTradeLog, error)
|
||||
}
|
||||
@ -42,7 +42,7 @@ if COMMIT_MSG=$(QUANTEX_ENVIRONMENT=$ENV "${OUT_PATH}/qfixdpl" -v 2>/dev/null);
|
||||
echo "$COMMIT_MSG"
|
||||
else
|
||||
echo "---------------------------------"
|
||||
echo "Skeleton"
|
||||
echo "QFIXDPL"
|
||||
echo "Built at: ${BUILT_TIME}"
|
||||
echo "Branch: ${BUILD_BRANCH}"
|
||||
echo "SHA: ${BUILD_HASH}"
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
set -e
|
||||
|
||||
read -r -p "Issuer: " ISSUER
|
||||
read -r -p "Service (e.g. SKELETON): " SERVICE
|
||||
read -r -p "Service (e.g. QFIXDPL): " SERVICE
|
||||
read -r -p "Token: " TOKEN
|
||||
read -r -p "Expire (e.g. 24h) [none]: " EXPIRY
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
|
||||
systemctl daemon-reload
|
||||
systemctl restart skeleton.service
|
||||
systemctl restart qfixdpl.service
|
||||
Reference in New Issue
Block a user