first commit
This commit is contained in:
273
src/client/store/external/manager.go
vendored
Normal file
273
src/client/store/external/manager.go
vendored
Normal file
@ -0,0 +1,273 @@
|
||||
// Package external defines all external services access
|
||||
package external
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/sasha-s/go-deadlock"
|
||||
|
||||
"quantex.com/qfixpt/src/app"
|
||||
"quantex.com/qfixpt/src/app/version"
|
||||
jwttoken "quantex.com/qfixpt/src/common/jwttoken"
|
||||
"quantex.com/qfixpt/src/domain"
|
||||
)
|
||||
|
||||
const (
|
||||
sTimeout = 10
|
||||
lTimeout = 15
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
QApixPort string
|
||||
QApixHost string
|
||||
External map[string]app.ExtAuth
|
||||
QApixToken string
|
||||
EnableJWTAuth bool
|
||||
}
|
||||
|
||||
type cacheItem struct {
|
||||
Time time.Time
|
||||
Resp []byte
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
config Config
|
||||
notifier domain.Notifier
|
||||
cache map[string]cacheItem
|
||||
mutex deadlock.RWMutex
|
||||
}
|
||||
|
||||
type RequestOptions struct {
|
||||
Method string
|
||||
Path string
|
||||
URL string
|
||||
Token string
|
||||
Secured bool
|
||||
Body interface{}
|
||||
Retries int
|
||||
Timeout time.Duration
|
||||
CacheDuration time.Duration
|
||||
Auth *app.ExtAuth
|
||||
SessionToken string
|
||||
}
|
||||
|
||||
// NewManager create new Manager struct
|
||||
func NewManager(n domain.Notifier, cfg Config) *Manager {
|
||||
return &Manager{
|
||||
config: cfg,
|
||||
notifier: n,
|
||||
cache: make(map[string]cacheItem),
|
||||
}
|
||||
}
|
||||
|
||||
//revive:disable:argument-limit we need this arguments
|
||||
func (s *Manager) sendRequestToExternal(opts RequestOptions) ([]byte, error) {
|
||||
host := fmt.Sprintf("http://localhost:%v", opts.Auth.Port)
|
||||
if opts.Auth.Host != "" {
|
||||
host = opts.Auth.Host
|
||||
}
|
||||
|
||||
token := s.config.QApixToken
|
||||
|
||||
secured := false
|
||||
|
||||
if s.config.EnableJWTAuth && opts.Auth != nil {
|
||||
t, err := jwttoken.Encrypt(*opts.Auth)
|
||||
if err != nil {
|
||||
e := fmt.Errorf("error encrypting quantexService: %w", err)
|
||||
log.Error().Msg(e.Error())
|
||||
|
||||
return nil, e
|
||||
}
|
||||
|
||||
token = t
|
||||
|
||||
secured = true
|
||||
}
|
||||
|
||||
opts.Secured = secured
|
||||
opts.Token = token
|
||||
opts.URL = urlFrom(host, opts.Path)
|
||||
|
||||
return s.sendRequestWithCache(opts)
|
||||
}
|
||||
|
||||
func (s *Manager) sendRequestWithCache(opts RequestOptions) ([]byte, error) {
|
||||
sha := optionSha(opts)
|
||||
|
||||
if opts.CacheDuration > 0 && len(sha) > 0 {
|
||||
s.mutex.RLock()
|
||||
t, ok := s.cache[sha]
|
||||
s.mutex.RUnlock()
|
||||
|
||||
if ok && time.Since(t.Time) < opts.CacheDuration {
|
||||
return t.Resp, nil
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := s.sendRequestWithRetries(opts)
|
||||
if err == nil && len(sha) > 0 {
|
||||
s.mutex.Lock()
|
||||
s.cache[sha] = cacheItem{
|
||||
time.Now(),
|
||||
resp,
|
||||
}
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
//revive:disable:flag-parameter we need this flag
|
||||
//nolint:funlen //it's long but easy to read
|
||||
func (s *Manager) sendRequestWithRetries(opts RequestOptions) ([]byte, error) {
|
||||
client := &http.Client{
|
||||
Timeout: opts.Timeout,
|
||||
}
|
||||
|
||||
bodyBytes, err := json.Marshal(&opts.Body)
|
||||
if err != nil {
|
||||
e := fmt.Errorf("error encoding the body: %v, %w", opts.Body, err)
|
||||
slog.Error(e.Error())
|
||||
|
||||
return nil, e
|
||||
}
|
||||
|
||||
slog.Debug("sending request to: " + opts.URL)
|
||||
|
||||
request, err := http.NewRequest(opts.Method, opts.URL, bytes.NewBuffer(bodyBytes)) //nolint:noctx //no ctx needed
|
||||
if err != nil {
|
||||
e := fmt.Errorf("error creating new %s request to: %s, %w", opts.Method, opts.URL, err)
|
||||
slog.Error(e.Error())
|
||||
|
||||
return nil, e
|
||||
}
|
||||
|
||||
authorization := "Bearer " + opts.Token
|
||||
if opts.Secured {
|
||||
authorization = opts.Token
|
||||
}
|
||||
|
||||
request.Header.Set("Authorization", authorization)
|
||||
|
||||
request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
if opts.SessionToken != "" {
|
||||
// Create a cookie
|
||||
cookie := &http.Cookie{
|
||||
Name: "session_token",
|
||||
Value: opts.SessionToken,
|
||||
Path: "/",
|
||||
}
|
||||
|
||||
// Add the cookie to the request
|
||||
request.AddCookie(cookie)
|
||||
}
|
||||
|
||||
var resp *http.Response
|
||||
for i := 0; i <= opts.Retries; i++ { //nolint:wsl // It's ok in this case
|
||||
interval := time.Duration(i) * time.Second
|
||||
slog.Debug(fmt.Sprintf("request to '%s' try #%v in %v", request.URL, i, interval))
|
||||
time.Sleep(interval)
|
||||
|
||||
resp, err = client.Do(request)
|
||||
if err != nil {
|
||||
e := fmt.Errorf("error making request to %s. error: %w", request.URL, err)
|
||||
slog.Error(e.Error())
|
||||
|
||||
// send notification if notifier is available
|
||||
s.sendNotification(domain.MessageChannelError, e.Error(), domain.MessageStatusWarning, nil)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
if resp == nil {
|
||||
err = fmt.Errorf("error: '%s' response is nil", request.URL)
|
||||
slog.Error(err.Error())
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
slog.Error("error closing response body at sendRequestWithRetries: " + err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
msg := fmt.Sprintf("response code from %s is not 200. StatusCode: %d.", opts.URL, resp.StatusCode)
|
||||
// send notification if notifier is available
|
||||
s.sendNotification(domain.MessageChannelError, msg, domain.MessageStatusWarning, nil)
|
||||
|
||||
return nil, fmt.Errorf("error %s", msg)
|
||||
}
|
||||
|
||||
bodyBytes, resErr := io.ReadAll(resp.Body)
|
||||
if resErr != nil {
|
||||
msg := fmt.Sprintf("error reading response from %s. error: %s", opts.URL, resErr.Error())
|
||||
// send notification if notifier is available
|
||||
s.sendNotification(domain.MessageChannelError, msg, domain.MessageStatusWarning, nil)
|
||||
|
||||
return nil, fmt.Errorf("error %s", msg)
|
||||
}
|
||||
|
||||
return bodyBytes, nil
|
||||
}
|
||||
|
||||
//revive:enable
|
||||
|
||||
// sendNotification is a nil-safe wrapper around the notifier's SendMsg method.
|
||||
// Some call sites run in contexts where Manager.notifier may be nil, so guard the call
|
||||
// to avoid runtime panics.
|
||||
func (s *Manager) sendNotification(channel domain.MessageChannel, message string, status domain.MessageStatus, wg *sync.WaitGroup) {
|
||||
if s == nil || s.notifier == nil {
|
||||
slog.Debug("notifier is nil, skipping SendMsg")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
s.notifier.SendMsg(channel, message, status, wg)
|
||||
}
|
||||
|
||||
func optionSha(opts RequestOptions) string {
|
||||
key := fmt.Sprintf("%s-%s-%s", opts.Method, opts.URL, opts.Body)
|
||||
|
||||
return shaString(key)
|
||||
}
|
||||
|
||||
func shaString(s string) (sha string) {
|
||||
hash := sha256.New()
|
||||
if _, err := hash.Write([]byte(s)); err != nil {
|
||||
err = fmt.Errorf("error generating hash at generateShaString, error: %w", err)
|
||||
slog.Error(err.Error())
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
return hex.EncodeToString(hash.Sum(nil))
|
||||
}
|
||||
|
||||
func urlFrom(host, path string) string {
|
||||
from := strings.ToLower(version.AppName)
|
||||
|
||||
url := fmt.Sprintf("%s%s?from=%s", host, path, from)
|
||||
if strings.Contains(path, "?") {
|
||||
url = fmt.Sprintf("%s%s&from=%s", host, path, from)
|
||||
}
|
||||
|
||||
return url
|
||||
}
|
||||
Reference in New Issue
Block a user