This commit is contained in:
Ramiro Paz
2026-03-09 15:09:06 -03:00
parent 8299f8bc96
commit 0e8fe168ef
85 changed files with 14079 additions and 0 deletions

View File

@ -0,0 +1,21 @@
// Package async defines functions to assist application with parallel jobs
package async
//go:generate go-enum -f=$GOFILE --lower --marshal
import (
"time"
)
//nolint:varcheck // This is ok to keep here for future use. Remove this comment when start using qos constant.
const (
qos = 2 // Quality of Service. 2 -> Only once
retryInterval = time.Second * 5
)
// Origin specify where is the server located
// ENUM(
// Local
// Server
// )
type Origin int //nolint:recvcheck // The methods of this are autogenerated

View File

@ -0,0 +1,186 @@
package async
import (
"encoding/json"
"fmt"
blog "log"
"log/slog"
"os"
"path"
"strconv"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/golang-jwt/jwt/v5"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/app/version"
"quantex.com/qfixdpl/src/common/tracerr"
"quantex.com/qfixdpl/src/domain"
)
const defaultTokenExpireSeconds = 60
type MQTTManager struct {
client mqtt.Client
config app.Async
notify domain.Notifier
}
func New(cfg app.Async, n domain.Notifier) *MQTTManager {
manager := &MQTTManager{
config: cfg,
notify: n,
}
manager.client = manager.newClient(version.AppName+strconv.FormatInt(time.Now().Unix(), 10), nil)
return manager
}
func (m *MQTTManager) Start() {
token := m.client.Connect()
if token.Wait() && token.Error() != nil {
slog.Error("Error trying to connect to broker = " + token.Error().Error())
return
}
slog.Info("MQTT Client successfully created!")
}
//nolint:ireturn,nolintlint // We don't control this
func (m *MQTTManager) newClient(clientID string, onConnectHandler mqtt.OnConnectHandler) mqtt.Client {
mqtt.ERROR = blog.New(os.Stdout, "", 0)
clientID = clientID + "_" + strconv.Itoa(time.Now().Nanosecond())
opts := mqtt.NewClientOptions()
err := checkMQTTConfig(m.config)
if err != nil {
panic(err.Error())
}
mqttBkr := fmt.Sprintf("%s://%s", m.config.Protocol, m.config.URL)
opts.AddBroker(mqttBkr).SetClientID(clientID)
opts.SetAutoReconnect(true)
opts.SetKeepAlive(5 * time.Second)
opts.SetPingTimeout(2 * time.Second)
opts.SetMaxReconnectInterval(2 * time.Second)
opts.SetDefaultPublishHandler(msgHandler)
opts.SetCredentialsProvider(m.credentialHandler)
opts.OnConnectionLost = m.connectionLostHandler
if onConnectHandler != nil {
opts.SetOnConnectHandler(onConnectHandler)
}
return mqtt.NewClient(opts)
}
func (m *MQTTManager) credentialHandler() (username, password string) {
token, err := generateMqttToken(version.AppName, m.config.Secret, defaultTokenExpireSeconds)
if err != nil {
msg := tracerr.Errorf("Error getting token = %w", err)
slog.Error(msg.Error())
m.notify.SendMsg(domain.MessageChannelError, msg.Error(), domain.MessageStatusStopper, nil)
return "", ""
}
return version.AppName, token
}
func (m *MQTTManager) Subscribe(topic string, handler func(topic string, msg []byte)) {
t := path.Join(m.config.Subdomain, "quantex/", topic)
token := m.client.Subscribe(t, qos, func(_ mqtt.Client, msg mqtt.Message) {
handler(msg.Topic(), msg.Payload())
})
m.CheckSubscribe(token, t)
}
func (m *MQTTManager) Publish(topic string, msg any) {
payload, err := json.Marshal(msg)
if err != nil {
slog.Error("error. could not send alert msg: " + err.Error())
return
}
// Publish a message
t := path.Join(m.config.Subdomain, "quantex/", topic)
token := m.client.Publish(t, qos, false, payload)
m.CheckPublish(token, t, payload)
}
func (m *MQTTManager) connectionLostHandler(client mqtt.Client, reason error) {
opts := client.OptionsReader()
msg := fmt.Sprintf("MQTT Connection lost for client: %s. Reason: %s", opts.ClientID(), reason.Error())
slog.Warn(msg)
m.notify.SendMsg(domain.MessageChannelError, msg, domain.MessageStatusWarning, nil)
}
func msgHandler(_ mqtt.Client, msg mqtt.Message) {
slog.Info(fmt.Sprintf("Message received: [mqtt] -> [A] | received: '%s' topic: '%s'",
msg.Payload(), msg.Topic()))
}
func generateMqttToken(user, secret string, exp int64) (string, error) {
// Create a new token object, specifying signing method and the claims
// you would like it to contain.
now := time.Now().Unix()
token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{
"username": user,
"exp": now + exp,
"iat": now,
})
// Sign and get the complete encoded token as a string using the secret
tokenString, err := token.SignedString([]byte(secret))
if err != nil {
e := tracerr.Errorf("error generating the MQTT token %w", err)
return "", e
}
return tokenString, nil
}
// CheckPublish check the result of publish action.
// For asynchronous check use:
// go CheckPublish(token, topic, payload)
func (m *MQTTManager) CheckPublish(token mqtt.Token, topic string, payload []byte) {
slog.Info("[mqtt: %s] <- %s\n", topic, string(payload))
errMsg := tracerr.Errorf("MQTT Publish Error. Topic: %s. Error: %w", topic, token.Error())
m.checkToken(token, errMsg)
}
// CheckSubscribe check the result of subscribe action.
// For asynchronous check use:
// go CheckSubscribe(token, topic)
func (m *MQTTManager) CheckSubscribe(token mqtt.Token, topic string) {
slog.Info("subscribing to [mqtt: " + topic + "]")
errMsg := tracerr.Errorf("MQTT Subscriber Error. Topic: %s. Error: %w", topic, token.Error())
m.checkToken(token, errMsg)
}
func (m *MQTTManager) checkToken(token mqtt.Token, errMsg error) {
if token.Wait() && token.Error() != nil {
err := tracerr.Errorf("checkToken error: %w", errMsg)
slog.Error(err.Error())
m.notify.SendMsg(domain.MessageChannelError, err.Error(), domain.MessageStatusWarning, nil)
}
}
func checkMQTTConfig(config app.Async) error {
if config.Protocol == "" ||
config.URL == "" ||
config.Subdomain == "" ||
config.Secret == "" {
return tracerr.Errorf("mqtt configuration is needed: Protocol, URL, Subdomain and/or Secret are empty")
}
return nil
}

View File

@ -0,0 +1,290 @@
package rest
import (
"fmt"
"log/slog"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/gomodule/redigo/redis"
"github.com/sasha-s/go-deadlock"
uuid "github.com/satori/go.uuid"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/app/version"
"quantex.com/qfixdpl/src/client/store"
"quantex.com/qfixdpl/src/common/tracerr"
"quantex.com/qfixdpl/src/domain"
)
const (
ProdEnv = "prod"
TokenExpireTime = 1200 * time.Second
)
const (
responseKey = "responseKey"
sessionTokenKey = "sessionTokenKey"
)
type Controller struct {
pool *redis.Pool
userData app.UserDataProvider
store *store.Store
config Config
notify domain.Notifier
authMutex deadlock.Mutex
}
func newController(pool *redis.Pool, userData app.UserDataProvider,
s *store.Store, config Config, n domain.Notifier,
) *Controller {
return &Controller{
pool: pool,
userData: userData,
store: s,
config: config,
notify: n,
}
}
func (cont *Controller) GetUser(ctx *gin.Context) app.User {
// This is set on the AuthRequired middleware
response, ok := ctx.Get(responseKey)
if !ok {
// TODO log this issue
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "internal server error"})
}
val, ok := response.([]uint8)
if !ok {
// TODO log this issue
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: "internal server error"})
}
return cont.userData.GetUserByEmail(string(val))
}
// Login godoc
// @Summary Login
// @Description Authenticate a User using credentials
// @Tags auth
// @Accept json
// @Produce json
// @Param credentials body Credentials true "Authentication"
// @Success 200 {object} Session
// @Failure 400 {object} HTTPError
// @Failure 401 {object} HTTPError
// @Failure 404 {object} HTTPError
// @Failure 500 {object} HTTPError
// @Router /auth/login [post]
func (cont *Controller) Login(ctx *gin.Context) {
defer cont.authMutex.Unlock()
cont.authMutex.Lock()
setHeaders(ctx, cont.config)
// Get the JSON body and decode into credentials
var creds Credentials
if err := ctx.ShouldBindJSON(&creds); err != nil {
ctx.JSON(http.StatusBadRequest, HTTPError{Error: err.Error()})
return
}
// Get the expected Password from our in memory map
expectedUser := cont.userData.GetUserByEmail(creds.Email)
// If a Password exists for the given User
// AND, if it is the same as the Password we received, the we can move ahead
// if NOT, then we return an "Unauthorized" status
if expectedUser.Email == "" || expectedUser.Password != creds.Password {
ctx.JSON(http.StatusUnauthorized, HTTPError{Error: "Invalid credentials"})
return
}
// Create a new random session token
sessionToken := uuid.NewV4().String()
// Set the token in the cache, along with the User whom it represents
// The token has an expiry time of 120 seconds
conn := cont.pool.Get()
defer func() {
err := conn.Close()
if err != nil {
e := tracerr.Errorf("error closing connection: %w", err)
slog.Error(e.Error())
}
}()
_, err := conn.Do("SETEX", sessionToken, TokenExpireTime, creds.Email)
if err != nil {
slog.Error(tracerr.Errorf("Error setting token in redis cache: %w", err).Error())
// If there is an error in setting the cache, return an internal server error
ctx.JSON(http.StatusInternalServerError, HTTPError{Error: err.Error()})
return
}
// Finally, we set the client cookie for "session_token" as the session token we just generated
// we also set an expiry time of 120 seconds, the same as the cache
// Finally, we set the client cookie for "session_token" as the session token we just generated
// we also set an expiry time of TokenExpireTime, the same as the cache
cookie := &http.Cookie{
Name: "session_token",
Value: sessionToken,
HttpOnly: true,
Path: "/",
Secure: true,
Expires: time.Now().Add(TokenExpireTime),
}
if version.Environment() == version.EnvironmentTypeDev {
cookie.SameSite = http.SameSiteNoneMode
}
http.SetCookie(ctx.Writer, cookie)
ctx.JSON(http.StatusOK, Session{Email: creds.Email})
}
// Refresh godoc
// @Summary Refresh the authorization token
// @Description This endpoint must be called periodically to get a new token before the current one expires
// @Tags auth
// @Accept json
// @Produce json
// @Success 200 {object} Msg
// @Failure 400 {object} HTTPError
// @Failure 401 {object} HTTPError
// @Failure 404 {object} HTTPError
// @Failure 500 {object} HTTPError
// @Router /auth/refresh [get]
func (cont *Controller) Refresh(ctx *gin.Context) {
defer cont.authMutex.Unlock()
cont.authMutex.Lock()
setHeaders(ctx, cont.config)
// (BEGIN) The code until this point is the same as the first part of the `Welcome` route
response, ok1 := ctx.Get(responseKey)
if !ok1 {
ctx.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"})
}
// (END) The code uptil this point is the same as the first part of the `Welcome` route
// Now, create a new session token for the current User
newSessionToken := uuid.NewV4().String()
conn := cont.pool.Get()
defer func() {
err := conn.Close()
if err != nil {
e := tracerr.Errorf("error closing connection: %w", err)
slog.Error(e.Error())
}
}()
_, err := conn.Do("SETEX", newSessionToken, "120", fmt.Sprintf("%s", response))
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
// Set the new token as the Users `session_token` cookie
cookie := &http.Cookie{
Name: "session_token",
Value: newSessionToken,
HttpOnly: true,
Path: "/",
Secure: true,
Expires: time.Now().Add(TokenExpireTime),
}
if version.Environment() == version.EnvironmentTypeDev {
cookie.SameSite = http.SameSiteNoneMode
}
http.SetCookie(ctx.Writer, cookie)
ctx.JSON(http.StatusOK, Msg{Text: "Token updated"})
}
// HealthCheck godoc
// @Summary Health check
// @Description Return service health
// @Tags health
// @Produce json
// @Success 200 {object} map[string]string
// @Router /health [get]
func (cont *Controller) HealthCheck(ctx *gin.Context) {
// ensure CORS and other headers are set consistently
setHeaders(ctx, cont.config)
status := struct {
Status string `json:"status"`
Build string `json:"build"`
Sha string `json:"sha"`
JwtAuthentications string `json:"jwtAuthentications,omitempty"`
}{
Status: "ok",
Build: version.BuildBranch(),
Sha: version.BuildHash(),
}
// Only check JWT authentication if enabled
if cont.config.EnableJWTAuth {
status.JwtAuthentications = "ok"
user, err := cont.store.UserByEmail("fede")
if err != nil || user == nil {
status.JwtAuthentications = "error"
status.Status = "degraded"
err = tracerr.Errorf("error fetching user: %w", err)
slog.Error(err.Error())
ctx.JSON(http.StatusInternalServerError, status)
return
}
}
// return a minimal JSON health response
ctx.JSON(http.StatusOK, status)
}
// revive:disable:cyclomatic // We need this complexity
func setHeaders(ctx *gin.Context, config Config) {
origin := ctx.Request.Header.Get("Origin")
if allowed(origin, config) {
ctx.Writer.Header().Set("Access-Control-Allow-Origin", origin)
ctx.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
ctx.Writer.Header().Set("Access-Control-Allow-Headers",
"Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin,"+
"UserCache-Control, X-Requested-With")
ctx.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT")
}
}
func allowed(origin string, config Config) bool {
if version.Environment() == version.EnvironmentTypeProd {
return origin == "https://monitor.quantex.com.ar"
}
for _, o := range config.AllowedOrigins {
if o == origin {
return true
}
}
return false
}

View File

@ -0,0 +1,254 @@
package rest
import (
"errors"
"net/http"
"strings"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
"quantex.com/qfixdpl/src/app"
jwttoken "quantex.com/qfixdpl/src/common/jwttoken"
"quantex.com/qfixdpl/src/common/tracerr"
)
const ErrorField = "error"
func (cont *Controller) PartyAdmin(c *gin.Context) {
if cont.GetUser(c).IsPartyAdmin {
return
}
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Unauthorized Admin"})
}
func (cont *Controller) CanSendOrder(c *gin.Context) {
if cont.GetUser(c).IsViewer {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Rol Viewer is unauthorized to send orders"})
}
}
func (cont *Controller) Middleman(c *gin.Context) {
if cont.GetUser(c).IsMiddleman {
return
}
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Unauthorized Middleman"})
}
func (cont *Controller) BackOfficeUser(c *gin.Context) {
if cont.GetUser(c).IsBackOffice {
return
}
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Unauthorized User"})
}
func (cont *Controller) SuperUser(c *gin.Context) {
if cont.GetUser(c).IsSuperUser {
return
}
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Unauthorized Admin User"})
}
func (cont *Controller) Options(c *gin.Context) {
if c.Request.Method != http.MethodOptions {
c.Next()
} else {
setHeaders(c, cont.config)
c.AbortWithStatus(http.StatusOK)
}
}
func (cont *Controller) AuthRequired(ctx *gin.Context) {
setHeaders(ctx, cont.config)
if c, err := ctx.Cookie("session_token"); c != "" && err == nil {
cont.SessionCookieAuth(ctx)
return
}
// check header for Token Auth
reqToken := ctx.GetHeader("Authorization")
if reqToken != "" {
cont.AuthorizationAuth(reqToken, ctx)
return
}
log.Error().Msg("Token Auth Unauthorized: missing session cookie and Authorization header")
ctx.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Unauthorized - AuthRequired 1"})
}
func (cont *Controller) AuthorizationAuth(reqToken string, ctx *gin.Context) {
token := strings.TrimSpace(strings.TrimPrefix(reqToken, "Bearer"))
if cont.config.EnableJWTAuth && jwttoken.IsJWT(token) {
cont.JWTTokenAuth(token, ctx)
return
}
cont.BearerTokenAuth(token, ctx)
}
func (cont *Controller) BearerTokenAuth(reqToken string, ctx *gin.Context) {
if !strings.HasPrefix(reqToken, "Bearer ") {
log.Error().Msg("Token Auth Unauthorized: missing Bearer prefix")
ctx.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Token Auth Unauthorized"})
return
}
token := strings.Split(reqToken, "Bearer ")
if len(token) != 2 {
log.Error().Msg("Token Auth Unauthorized at TokenAuth: invalid token format")
ctx.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Token Auth Unauthorized 1"})
return
}
user, err := cont.validateUserToken(token[1])
if err != nil {
err = errors.New("Token Auth Unauthorized at TokenAuth - %s" + err.Error())
log.Error().Msg(err.Error())
ctx.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Token Auth Unauthorized 2"})
return
}
log.Info().Msgf("User %s authenticated successfully at TokenAuth", user.Email)
ctx.Set(responseKey, []byte(user.Email))
ctx.Next()
}
func (cont *Controller) JWTTokenAuth(token string, ctx *gin.Context) {
from := ctx.Query("from")
serviceAuth, err := jwttoken.Validate(from, token, cont.config.AuthorizedServices)
if err != nil || serviceAuth == nil || serviceAuth.Token == nil {
err := tracerr.Errorf("invalid token or claims: %w", err)
log.Error().Msg(err.Error())
ctx.AbortWithStatusJSON(http.StatusUnauthorized,
gin.H{ErrorField: err.Error()})
return
}
user, err := cont.validateUserToken(*serviceAuth.Token)
if err != nil {
err = tracerr.Errorf("Token Auth Unauthorized at JWTTokenAuth: %w", err)
log.Error().Msg(err.Error())
ctx.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Invalid credentials"})
return
}
ctx.Set("authorized_service", serviceAuth)
ctx.Set(responseKey, []byte(user.Email)) // TODO: will services be treated as users? if not remove this line
log.Info().Str("issuer", serviceAuth.Name).Msg("Service authenticated successfully")
ctx.Next()
}
func (cont *Controller) validateUserToken(token string) (user *app.User, err error) {
userInfo := strings.Split(token, ":")
if len(userInfo) != 2 || userInfo[1] == "" {
err = tracerr.Errorf("invalid token format at validateUserToken")
log.Error().Msg(err.Error())
return nil, err
}
email := userInfo[0]
user, err = cont.store.UserByEmail(email)
if user == nil || err != nil {
err = tracerr.Errorf("user not found at validateUserToken: %w", err)
log.Error().Msg(err.Error())
return nil, err
}
tkn := userInfo[1]
if user.Token != tkn {
err = tracerr.Errorf("invalid token credentials at validateUserToken")
log.Error().Msg(err.Error())
return nil, err
}
log.Info().Str("email", user.Email).Msg("Service user validated successfully")
return user, nil
}
func (cont *Controller) SessionCookieAuth(ctx *gin.Context) {
// We can obtain the session token from the requests cookies, which come with every handler
sessionToken, err := ctx.Cookie("session_token")
if err != nil {
if errors.Is(err, http.ErrNoCookie) {
// If the cookie is not set, return an unauthorized status
ctx.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Unauthorized - AuthRequired 1"})
return
}
// For any other type of error, return a bad handler status
ctx.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ErrorField: "Bad handler - AuthRequired 2"})
return
}
cont.validateCookieToExternal(sessionToken, ctx)
}
func (cont *Controller) validateCookieToExternal(sessionToken string, ctx *gin.Context) {
ok, err := cont.store.ValidateSession(sessionToken)
if err != nil {
ctx.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Unauthorized - AuthRequired 4"})
return
}
if !ok {
// If the session token is not valid, return an unauthorized error
ctx.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ErrorField: "Unauthorized - AuthRequired 5"})
return
}
ctx.Next()
}
func IPWhiteList(whitelist map[string]bool) gin.HandlerFunc {
return func(c *gin.Context) {
if !whitelist[c.ClientIP()] {
c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
"status": http.StatusForbidden,
"message": "Permission denied",
})
return
}
}
}

View File

@ -0,0 +1,18 @@
package rest
type HTTPError struct {
Error string
}
type Msg struct {
Text string
}
type Credentials struct {
Email string `json:"email" binding:"required" example:"user1"`
Password string `json:"password" binding:"required" example:"password1"`
}
type Session struct {
Email string
}

View File

@ -0,0 +1,39 @@
package rest
import (
"github.com/gin-gonic/gin"
swaggerFiles "github.com/swaggo/files"
ginSwagger "github.com/swaggo/gin-swagger"
_ "quantex.com/qfixdpl/src/client/api/rest/docs" // Swag needs this import to work properly
)
func SetRoutes(api *API) {
cont := api.Controller
v1 := api.Router.Group("/qfixdpl/v1")
api.Router.Use(cont.Options)
{
auth := v1.Group("/auth")
auth.POST("/login", cont.Login)
}
qfixdpl := v1.Group("/")
qfixdpl.Use(cont.AuthRequired)
qfixdpl.GET("/health", cont.HealthCheck)
backoffice := qfixdpl.Group("/backoffice")
backoffice.Use(cont.BackOfficeUser)
admin := qfixdpl.Group("/admin")
admin.Use(cont.SuperUser)
SetSwagger(v1, cont)
}
func SetSwagger(path *gin.RouterGroup, cont *Controller) {
auth := path.Group("/")
auth.Use(cont.AuthRequired)
auth.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
}

View File

@ -0,0 +1,95 @@
// Package rest defines all API rest functionality
package rest
import (
"log/slog"
"time"
"github.com/gin-gonic/gin"
"github.com/gomodule/redigo/redis"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/app/version"
"quantex.com/qfixdpl/src/client/store"
"quantex.com/qfixdpl/src/common/logger"
"quantex.com/qfixdpl/src/common/tracerr"
"quantex.com/qfixdpl/src/domain"
)
const RedisMaxIdle = 3000 // In ms
type API struct {
Router *gin.Engine
Controller *Controller
Port string
}
type Config struct {
AllowedOrigins []string
External map[string]app.ExtAuth `toml:"External"`
AuthorizedServices map[string]app.AuthorizedService `toml:"AuthorizedServices"`
Port string
EnableJWTAuth bool
}
func New(userData app.UserDataProvider, storeInstance *store.Store, config Config, notify domain.Notifier) *API {
// Set up Gin
var engine *gin.Engine
if version.Environment() == version.EnvironmentTypeProd {
gin.SetMode(gin.ReleaseMode)
engine = gin.New()
engine.Use(gin.Recovery())
// Use a custom logger middleware
engine.Use(logger.GinLoggerMiddleware(slog.Default()))
} else {
gin.SetMode(gin.DebugMode)
engine = gin.New()
// Don't use recovery middleware in debug mode
engine.Use(gin.Logger())
}
err := engine.SetTrustedProxies([]string{"127.0.0.1"})
if err != nil {
panic("error setting trusted proxies: %v" + err.Error())
}
if config.Port == "" {
panic("API Base Port can not be empty!")
}
api := &API{
Controller: newController(NewPool(), userData, storeInstance, config, notify),
Router: engine,
Port: config.Port,
}
SetRoutes(api)
return api
}
func NewPool() *redis.Pool {
return &redis.Pool{
MaxIdle: RedisMaxIdle,
IdleTimeout: 1 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.DialURL("redis://localhost")
if err != nil {
return nil, tracerr.Errorf("error connecting to Redis: %w", err)
}
return c, nil
},
}
}
// Run starts the API
func (api *API) Run() {
// Gin blocks the calling gorutine, so we start it in its own gorutine
// calling directly go api.Router.Run doesn't prevent having to press double
// ctrl+c to stop the service
go func() {
// start the server
slog.Error(api.Router.Run("localhost:" + api.Port).Error())
}()
}

View File

@ -0,0 +1 @@
package rest

View File

@ -0,0 +1 @@
package rest

View File

@ -0,0 +1,31 @@
// Package config defines all application configuration
package config
import (
"os"
"path/filepath"
"github.com/BurntSushi/toml"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/common/tracerr"
)
// Read the config from disk
func Read(files []string) (cfg app.Config, err error) {
for _, file := range files {
var d []byte
if file != "" {
if d, err = os.ReadFile(filepath.Clean(file)); err != nil {
return app.Config{}, tracerr.Errorf("%v", err.Error())
}
}
if _, err = toml.Decode(string(d), &cfg); err != nil {
return app.Config{}, tracerr.Errorf("%v", err.Error())
}
}
return cfg, nil
}

16
src/client/data/data.go Normal file
View File

@ -0,0 +1,16 @@
// Package data defines data functions as cache
package data
import (
"quantex.com/qfixdpl/src/app"
)
type Data struct{}
func New() *Data {
return &Data{}
}
func (*Data) GetUserByEmail(string) app.User {
return app.User{}
}

View File

@ -0,0 +1,32 @@
// Package notifyall provides functionality for sending messages to multiple chat platforms
package notifyall
import (
"sync"
google "quantex.com/qfixdpl/src/client/notify/google"
"quantex.com/qfixdpl/src/client/notify/slack"
"quantex.com/qfixdpl/src/domain"
)
type AllNotify struct {
google *google.Notify
slack *slack.Notify
}
type Config struct {
Slack domain.Channels
Google domain.Channels
}
func New(cfg Config) *AllNotify {
return &AllNotify{
google: google.New(cfg.Google),
slack: slack.New(cfg.Slack),
}
}
func (an AllNotify) SendMsg(chat domain.MessageChannel, text string, status domain.MessageStatus, wg *sync.WaitGroup) {
an.google.SendMsg(chat, text, status, wg)
an.slack.SendMsg(chat, text, status, wg)
}

142
src/client/notify/common.go Normal file
View File

@ -0,0 +1,142 @@
// Package notify provides utilities to the notify packages
package notify
import (
"bytes"
"fmt"
"io"
"log/slog"
"net/http"
"sync"
"time"
"quantex.com/qfixdpl/src/common/logger"
"quantex.com/qfixdpl/src/common/tracerr"
"quantex.com/qfixdpl/src/domain"
)
// maxRetry is the send message maximum number of tries.
const maxRetry = 5
// waitBetweenTries is the time in seconds that the SendCo func should wait before try again.
const waitBetweenTries = 5
//revive:disable:argument-limit // need this length to pass the logger
func SendWithRetry(text string, status domain.MessageStatus, waitgroup *sync.WaitGroup, url string,
getMessage func(string, domain.MessageStatus) string,
) {
loc, err := time.LoadLocation("America/Argentina/Buenos_Aires")
if err != nil {
logger.ErrorWoNotifier("error loading timezone %v", err)
return
}
// TODO check why we have these, here it should be a better way to avoid this
start := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 7, 30, 0, 0, loc)
if time.Now().Before(start) {
slog.Info("skipping notification, inactive app")
return
}
for range maxRetry {
err = send(text, status, url, getMessage)
if err == nil {
slog.Info("msg sent with notifier: " + text)
if waitgroup != nil {
waitgroup.Done()
}
return
}
time.Sleep(time.Second * waitBetweenTries)
}
logger.ErrorWoNotifier("unable to send msg in %d tries", maxRetry)
if waitgroup != nil {
waitgroup.Done()
}
}
//revive:enable:argument-limit
func send(text string, status domain.MessageStatus, url string,
getMessage func(string, domain.MessageStatus) string,
) (errOut error) {
slog.Debug("URL:> " + url)
msg := getMessage(text, status) // this function changes
slog.Debug(msg)
jsonStr := []byte(msg)
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(jsonStr)) //nolint:noctx // we don't need context here
if err != nil {
logger.ErrorWoNotifier("%v", err.Error())
return err //nolint:wrapcheck // we don't need to wrap this error
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{
Timeout: time.Second * 10,
}
resp, err := client.Do(req)
if err != nil {
return err //nolint:wrapcheck // we don't need to wrap this error
}
if resp == nil {
err = tracerr.Errorf("error at send: '%s' response is nil", req.URL)
logger.ErrorWoNotifier("%v", err.Error())
return err
}
defer func() {
if err := resp.Body.Close(); err != nil {
err = tracerr.Errorf("error closing body request while sending msg at send: %v", err)
logger.ErrorWoNotifier("%v", err.Error())
}
}()
r := resp.Status
slog.Debug("response Status: " + r)
slog.Debug(fmt.Sprintf("response Headers: %+v", resp.Header))
body, err := io.ReadAll(resp.Body)
if err != nil {
logger.ErrorWoNotifier("error reading response body while sending msg, error: %s", err.Error())
}
slog.Debug("response Body: " + string(body))
if resp.StatusCode != http.StatusOK {
err = tracerr.Errorf("error sending msg statusCode: %d body: %s", resp.StatusCode, string(body))
logger.ErrorWoNotifier("%v", err.Error())
return err
}
return nil
}
func ValidateChannelConfig(channels domain.Channels) error {
if channels.Test == "" || channels.Web == "" || channels.Panic == "" || channels.Error == "" {
return tracerr.Errorf("channels configuration is needed: Test, Web, Panic and/or Error are empty. "+
"cfg: %v", channels)
}
return nil
}

View File

@ -0,0 +1,271 @@
// Package googlechat provides functionality to send google chat notifications
package googlechat
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"net"
"os"
"strings"
"sync"
"time"
"github.com/sasha-s/go-deadlock"
"quantex.com/qfixdpl/src/app/version"
common "quantex.com/qfixdpl/src/client/notify"
"quantex.com/qfixdpl/src/common/logger"
"quantex.com/qfixdpl/src/common/tracerr"
"quantex.com/qfixdpl/src/domain"
)
//revive:disable:line-length-limit It's just links
type SpaceID string
// spaces URLs
const googleChatURL = "https://chat.googleapis.com/v1/spaces/"
type Notify struct {
spaces map[domain.MessageChannel]SpaceID
messages map[string]time.Time
m deadlock.Mutex
}
//nolint:lll // It's just a link
func New(chls domain.Channels) *Notify {
if err := common.ValidateChannelConfig(chls); err != nil {
panic(tracerr.Errorf("google config error: %w", err))
}
messages := make(map[string]time.Time)
spaces := make(map[domain.MessageChannel]SpaceID)
spaces[domain.MessageChannelTest] = SpaceID(chls.Test)
spaces[domain.MessageChannelWeb] = SpaceID(chls.Web)
spaces[domain.MessageChannelPanic] = SpaceID(chls.Panic)
spaces[domain.MessageChannelError] = SpaceID(chls.Error)
return &Notify{
spaces: spaces,
messages: messages,
}
}
func (g *Notify) Example() {
wg := sync.WaitGroup{}
wg.Add(3)
go g.SendMsg(domain.MessageChannelTest, "Hello World!", domain.MessageStatusGood, &wg)
go g.SendMsg(domain.MessageChannelTest, "Error", domain.MessageStatusStopper, &wg)
go g.SendMsg(domain.MessageChannelTest, "Warning", domain.MessageStatusWarning, &wg)
wg.Wait()
}
func (g *Notify) SendMsg(chat domain.MessageChannel, text string, status domain.MessageStatus, waitgroup *sync.WaitGroup) {
var space SpaceID
if s, ok := g.spaces[chat]; !ok {
space = g.spaces[domain.MessageChannelError]
err := tracerr.Errorf("error sending google notification, there's no space id for chat: %s", chat)
logger.ErrorWoNotifier("%v", err.Error())
} else {
space = s
}
s := fmt.Sprintf("%s-%s-%s", text, status.String(), string(space))
if ok := g.shouldSendMessage(s); ok {
go common.SendWithRetry(text, status, waitgroup, googleChatURL+string(space), getMessage)
}
}
//nolint:lll // It's just a link
const (
errImg = "https://media.gettyimages.com/id/1359003186/es/foto/illustration-of-a-tick-or-an-x-indicating-right-and-wrong.jpg?s=612x612&w=0&k=20&c=FdYIXI1qCPpVcY6phcIJom8sRhZw4hgYVtwOC6lNlmA="
goodImg = "https://media.gettyimages.com/id/1352723074/es/foto/drawing-of-green-tick-check-mark.jpg?s=612x612&w=0&k=20&c=RgoCJ-n0OpZSClCWhJstoXAfiGrBZhnggUvOp2ooJqI="
warnImg = "https://media.gettyimages.com/id/1407160246/es/vector/icono-de-tri%C3%A1ngulo-de-peligro.jpg?s=612x612&w=0&k=20&c=rI0IYmZmkg62txtNQFhyTbHx5oW311_OupBkbWODfjg="
)
func getImage(status domain.MessageStatus) (string, error) {
switch status {
case domain.MessageStatusGood:
return goodImg, nil
case domain.MessageStatusWarning:
return warnImg, nil
case domain.MessageStatusStopper:
return errImg, nil
}
return "", tracerr.Errorf("unknown Message Status Type")
}
func getMessage(text string, status domain.MessageStatus) string {
image, err := getImage(status)
if err != nil {
err = tracerr.Errorf("error getting image at getMessage, error: %w", err)
logger.ErrorWoNotifier("%v", err.Error())
var e error
image, e = getImage(domain.MessageStatusWarning)
if e != nil {
e = tracerr.Errorf("error getting image at getMessage, error: %w", e)
logger.ErrorWoNotifier("%v", e.Error())
}
}
env := version.Environment()
appVersion := version.Base()
text = strings.ReplaceAll(text, "'", "\"")
msg := `
{
'cardsV2': [{
'cardId': 'createCardMessage',
'card': {
'header': {
'title': 'qfixdpl',
'subtitle': 'Notification',
'imageUrl': '%s',
'imageType': 'CIRCLE'
},
'sections': [
{
'widgets':[
{
'textParagraph': {
'text': '<b>Environment:</b> %s'
}
},
{
'textParagraph': {
'text': '<b>Message:</b> %s'
}
},
{
'textParagraph': {
'text': '<b>Build:</b> %s'
}
},
{
'textParagraph': {
'text': '<b>Time:</b> %s'
}
},
{
'textParagraph': {
'text': '<b>Hostname:</b> %s'
}
},
{
'textParagraph': {
'text': '<b>IP:</b> %s'
}
}
]
}
]
}
}]
}`
loc, err := time.LoadLocation("America/Argentina/Buenos_Aires")
if err != nil {
err := tracerr.Errorf("error loading timezone %v, setting default UTC", err)
logger.ErrorWoNotifier("%v", err.Error())
loc = time.UTC
}
now := time.Now().In(loc).Format("15:04:05 02-01-2006 -0700")
jsonMsg := fmt.Sprintf(msg, image, env, text, appVersion, now,
getHostname(), getOutboundIP())
return jsonMsg
}
func (g *Notify) shouldSendMessage(s string) (ok bool) {
sha, err := generateShaString(s)
if err != nil {
err = tracerr.Errorf("error generating sha string at shouldSendMessage, error: %w", err)
logger.ErrorWoNotifier("%v", err.Error())
return true
}
g.m.Lock()
defer g.m.Unlock()
if t, ok := g.messages[sha]; ok && time.Since(t) < time.Minute {
return false
}
g.messages[sha] = time.Now()
return true
}
func generateShaString(s string) (sha string, err error) {
hash := sha256.New()
if _, err := hash.Write([]byte(s)); err != nil {
err = tracerr.Errorf("error generating hash at generateShaString, error: %w", err)
logger.ErrorWoNotifier("%v", err.Error())
return sha, err
}
return hex.EncodeToString(hash.Sum(nil)), nil
}
//revive:enable:line-length-limit
//nolint:gochecknoglobals // need to be global
var outboundIP string
func getOutboundIP() string {
if outboundIP == "" {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
logger.ErrorWoNotifier("failed to get UDP address")
return ""
}
defer func(conn net.Conn) {
err = conn.Close()
if err != nil {
logger.ErrorWoNotifier("error closing net connection")
}
}(conn)
localAddr, ok := conn.LocalAddr().(*net.UDPAddr)
if !ok {
logger.ErrorWoNotifier("error: conn.LocalAddr() is not a *net.UDPAddr type")
}
outboundIP = localAddr.IP.String()
}
return outboundIP
}
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
logger.ErrorWoNotifier("failed to get hostname")
return ""
}
return hostname
}

View File

@ -0,0 +1,122 @@
// Package slack provides functionality to send slack notifications
package slack
import (
"fmt"
"sync"
"time"
"quantex.com/qfixdpl/src/app/version"
common "quantex.com/qfixdpl/src/client/notify"
"quantex.com/qfixdpl/src/common/logger"
"quantex.com/qfixdpl/src/common/tracerr"
"quantex.com/qfixdpl/src/domain"
)
type ChannelKey string
const slackBotURL = "https://hooks.slack.com/services/"
type Notify struct {
channels map[domain.MessageChannel]ChannelKey
}
func New(chls domain.Channels) *Notify {
if err := common.ValidateChannelConfig(chls); err != nil {
panic(tracerr.Errorf("slack config error: %w", err))
}
channels := make(map[domain.MessageChannel]ChannelKey)
channels[domain.MessageChannelTest] = ChannelKey(chls.Test)
channels[domain.MessageChannelWeb] = ChannelKey(chls.Web)
channels[domain.MessageChannelPanic] = ChannelKey(chls.Panic)
channels[domain.MessageChannelError] = ChannelKey(chls.Error)
return &Notify{
channels: channels,
}
}
func (s *Notify) Example() {
wg := sync.WaitGroup{}
wg.Add(1)
s.SendMsg(domain.MessageChannelTest, "Hello World!", domain.MessageStatusGood, &wg)
wg.Wait()
}
func (s *Notify) SendMsg(channel domain.MessageChannel, text string, status domain.MessageStatus, waitgroup *sync.WaitGroup) {
var chKey ChannelKey
if c, ok := s.channels[channel]; !ok {
chKey = s.channels[domain.MessageChannelError]
err := tracerr.Errorf("error sending slack notification, there's no channel key for channel: %s", channel)
logger.ErrorWoNotifier("%v", err.Error())
} else {
chKey = c
}
common.SendWithRetry(text, status, waitgroup, slackBotURL+string(chKey), getMessage)
}
func getStatusTypeString(status domain.MessageStatus) (str string, err error) {
switch status {
case domain.MessageStatusGood:
return "good", nil
case domain.MessageStatusWarning:
return "warning", nil
case domain.MessageStatusStopper:
return "danger", nil
}
return "", tracerr.Errorf("unknown Message Status Type")
}
func getMessage(text string, status domain.MessageStatus) string {
dt := time.Now()
dtf := dt.Format("15:04:05.000 -07 [02/01/2006]")
sts, err := getStatusTypeString(status)
if err != nil {
err := tracerr.Errorf("error getting status type string: %v", err)
logger.ErrorWoNotifier("%v", err.Error())
var e error
sts, e = getStatusTypeString(domain.MessageStatusWarning)
if e != nil {
e = tracerr.Errorf("error getting status type string: %v", e)
logger.ErrorWoNotifier("%v", e.Error())
}
}
appVersion := version.Base()
msg := `
{
"username": "SystemMonitor",
"attachments": [
{
"color": "%s",
"title": "qfixdpl",
"title_link": "https://api.slack.com/",
"text": "%s\n%s",
"fields": [
{
"title": "Build",
"value": "%s",
"short": false
}
]
}
]
}`
jsonMsg := fmt.Sprintf(msg, sts, text, dtf, appVersion)
return jsonMsg
}

118
src/client/res/resources.go Normal file
View File

@ -0,0 +1,118 @@
// Package res defines all resources
package res
import (
"embed"
"errors"
"io"
"io/fs"
"log/slog"
"net/http"
"quantex.com/qfixdpl/src/common/tracerr"
)
var resources fs.FS //nolint
func Set(r fs.FS) {
resources = r
}
func Get() fs.FS {
return resources
}
// ReadFile reads the named file and returns the contents.
// Got it from Go Standard Library: /usr/local/go/src/os/file.go
// A successful call returns err == nil, not err == EOF.
// Because ReadFile reads the whole file, it does not treat an EOF from Read
// as an error to be reported.
func ReadFile(name string) ([]byte, error) {
switch assets := resources.(type) {
case embed.FS:
b, err := assets.ReadFile(name)
if err != nil {
return nil, tracerr.Errorf("error reading file for embed.FS: %w", err)
}
return b, nil
case fs.FS:
b, err := readFileInternal(name)
if err != nil {
return nil, tracerr.Errorf("error reading file for embed.FS: %w", err)
}
return b, nil
default:
return nil, tracerr.Errorf("not allowed FS type: %T", assets)
}
}
//revive:disable:cognitive-complexity we need this level of complexity
//revive:disable:cyclomatic we need this level of complexity
func readFileInternal(name string) ([]byte, error) {
file, err := resources.Open(name)
if err != nil {
return nil, tracerr.Errorf("error opening file %s: %w", name, err)
}
defer func() {
e := file.Close()
if err != nil || e != nil {
err = tracerr.Errorf("error closing file %s: %w", e, err)
}
}()
var size int
if info, err := file.Stat(); err == nil {
size64 := info.Size()
if int64(int(size64)) == size64 {
size = int(size64)
}
}
size++ // one byte for final read at EOF
// If a file claims a small size, read at least 512 bytes.
// In particular, files in Linux's /proc claim size 0 but
// then do not work right if read in small pieces,
// so an initial read of 1 byte would not work correctly.
if size < 512 {
size = 512
}
data := make([]byte, 0, size)
for {
if len(data) >= cap(data) {
d := append(data[:cap(data)], 0)
data = d[:len(data)]
}
n, err := file.Read(data[len(data):cap(data)])
data = data[:len(data)+n]
if err != nil {
if errors.Is(err, io.EOF) {
err = nil
}
return data, err
}
}
}
//revive:enable:cyclomatic
//revive:enable:cognitive-complexity
func GetFileSystem(prefix string) http.FileSystem {
fsys, err := fs.Sub(resources, prefix)
if err != nil {
e := tracerr.Errorf("error with prefix %s: %w", prefix, err)
slog.Error(e.Error())
}
return http.FS(fsys)
}

42
src/client/store/external/auth.go vendored Normal file
View File

@ -0,0 +1,42 @@
package external
import (
"errors"
"fmt"
"log/slog"
"net/http"
"time"
)
func (s *Manager) ValidateSession(sessionToken string) (bool, error) {
path := "/api/v1/auth/session/validate"
auth, ok := s.config.External[QApixService]
if !ok {
err := errors.New("error validating auth to qapix service at ValidateSession")
slog.Error(err.Error())
return false, err
}
options := RequestOptions{
Method: http.MethodGet,
Path: path,
Body: nil,
Retries: 3,
Timeout: sTimeout * time.Second,
CacheDuration: time.Second * 10,
Auth: &auth,
SessionToken: sessionToken,
}
res, err := s.sendRequestToExternal(options)
if err != nil || res == nil {
err := fmt.Errorf("error making ValidateSession request to qapix server. error: %w", err)
slog.Error(err.Error())
return false, err
}
return true, nil
}

273
src/client/store/external/manager.go vendored Normal file
View File

@ -0,0 +1,273 @@
// Package external defines all external services access
package external
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"sync"
"time"
"github.com/rs/zerolog/log"
"github.com/sasha-s/go-deadlock"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/app/version"
jwttoken "quantex.com/qfixdpl/src/common/jwttoken"
"quantex.com/qfixdpl/src/domain"
)
const (
sTimeout = 10
lTimeout = 15
)
type Config struct {
QApixPort string
QApixHost string
External map[string]app.ExtAuth
QApixToken string
EnableJWTAuth bool
}
type cacheItem struct {
Time time.Time
Resp []byte
}
type Manager struct {
config Config
notifier domain.Notifier
cache map[string]cacheItem
mutex deadlock.RWMutex
}
type RequestOptions struct {
Method string
Path string
URL string
Token string
Secured bool
Body interface{}
Retries int
Timeout time.Duration
CacheDuration time.Duration
Auth *app.ExtAuth
SessionToken string
}
// NewManager create new Manager struct
func NewManager(n domain.Notifier, cfg Config) *Manager {
return &Manager{
config: cfg,
notifier: n,
cache: make(map[string]cacheItem),
}
}
//revive:disable:argument-limit we need this arguments
func (s *Manager) sendRequestToExternal(opts RequestOptions) ([]byte, error) {
host := fmt.Sprintf("http://localhost:%v", opts.Auth.Port)
if opts.Auth.Host != "" {
host = opts.Auth.Host
}
token := s.config.QApixToken
secured := false
if s.config.EnableJWTAuth && opts.Auth != nil {
t, err := jwttoken.Encrypt(*opts.Auth)
if err != nil {
e := fmt.Errorf("error encrypting quantexService: %w", err)
log.Error().Msg(e.Error())
return nil, e
}
token = t
secured = true
}
opts.Secured = secured
opts.Token = token
opts.URL = urlFrom(host, opts.Path)
return s.sendRequestWithCache(opts)
}
func (s *Manager) sendRequestWithCache(opts RequestOptions) ([]byte, error) {
sha := optionSha(opts)
if opts.CacheDuration > 0 && len(sha) > 0 {
s.mutex.RLock()
t, ok := s.cache[sha]
s.mutex.RUnlock()
if ok && time.Since(t.Time) < opts.CacheDuration {
return t.Resp, nil
}
}
resp, err := s.sendRequestWithRetries(opts)
if err == nil && len(sha) > 0 {
s.mutex.Lock()
s.cache[sha] = cacheItem{
time.Now(),
resp,
}
s.mutex.Unlock()
}
return resp, err
}
//revive:disable:flag-parameter we need this flag
//nolint:funlen //it's long but easy to read
func (s *Manager) sendRequestWithRetries(opts RequestOptions) ([]byte, error) {
client := &http.Client{
Timeout: opts.Timeout,
}
bodyBytes, err := json.Marshal(&opts.Body)
if err != nil {
e := fmt.Errorf("error encoding the body: %v, %w", opts.Body, err)
slog.Error(e.Error())
return nil, e
}
slog.Debug("sending request to: " + opts.URL)
request, err := http.NewRequest(opts.Method, opts.URL, bytes.NewBuffer(bodyBytes)) //nolint:noctx //no ctx needed
if err != nil {
e := fmt.Errorf("error creating new %s request to: %s, %w", opts.Method, opts.URL, err)
slog.Error(e.Error())
return nil, e
}
authorization := "Bearer " + opts.Token
if opts.Secured {
authorization = opts.Token
}
request.Header.Set("Authorization", authorization)
request.Header.Set("Content-Type", "application/json")
if opts.SessionToken != "" {
// Create a cookie
cookie := &http.Cookie{
Name: "session_token",
Value: opts.SessionToken,
Path: "/",
}
// Add the cookie to the request
request.AddCookie(cookie)
}
var resp *http.Response
for i := 0; i <= opts.Retries; i++ { //nolint:wsl // It's ok in this case
interval := time.Duration(i) * time.Second
slog.Debug(fmt.Sprintf("request to '%s' try #%v in %v", request.URL, i, interval))
time.Sleep(interval)
resp, err = client.Do(request)
if err != nil {
e := fmt.Errorf("error making request to %s. error: %w", request.URL, err)
slog.Error(e.Error())
// send notification if notifier is available
s.sendNotification(domain.MessageChannelError, e.Error(), domain.MessageStatusWarning, nil)
continue
}
break
}
if resp == nil {
err = fmt.Errorf("error: '%s' response is nil", request.URL)
slog.Error(err.Error())
return nil, err
}
defer func() {
if err := resp.Body.Close(); err != nil {
slog.Error("error closing response body at sendRequestWithRetries: " + err.Error())
}
}()
if resp.StatusCode != http.StatusOK {
msg := fmt.Sprintf("response code from %s is not 200. StatusCode: %d.", opts.URL, resp.StatusCode)
// send notification if notifier is available
s.sendNotification(domain.MessageChannelError, msg, domain.MessageStatusWarning, nil)
return nil, fmt.Errorf("error %s", msg)
}
bodyBytes, resErr := io.ReadAll(resp.Body)
if resErr != nil {
msg := fmt.Sprintf("error reading response from %s. error: %s", opts.URL, resErr.Error())
// send notification if notifier is available
s.sendNotification(domain.MessageChannelError, msg, domain.MessageStatusWarning, nil)
return nil, fmt.Errorf("error %s", msg)
}
return bodyBytes, nil
}
//revive:enable
// sendNotification is a nil-safe wrapper around the notifier's SendMsg method.
// Some call sites run in contexts where Manager.notifier may be nil, so guard the call
// to avoid runtime panics.
func (s *Manager) sendNotification(channel domain.MessageChannel, message string, status domain.MessageStatus, wg *sync.WaitGroup) {
if s == nil || s.notifier == nil {
slog.Debug("notifier is nil, skipping SendMsg")
return
}
s.notifier.SendMsg(channel, message, status, wg)
}
func optionSha(opts RequestOptions) string {
key := fmt.Sprintf("%s-%s-%s", opts.Method, opts.URL, opts.Body)
return shaString(key)
}
func shaString(s string) (sha string) {
hash := sha256.New()
if _, err := hash.Write([]byte(s)); err != nil {
err = fmt.Errorf("error generating hash at generateShaString, error: %w", err)
slog.Error(err.Error())
return ""
}
return hex.EncodeToString(hash.Sum(nil))
}
func urlFrom(host, path string) string {
from := strings.ToLower(version.AppName)
url := fmt.Sprintf("%s%s?from=%s", host, path, from)
if strings.Contains(path, "?") {
url = fmt.Sprintf("%s%s&from=%s", host, path, from)
}
return url
}

141
src/client/store/external/user.go vendored Normal file
View File

@ -0,0 +1,141 @@
package external
import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"time"
"quantex.com/qfixdpl/src/app"
)
const QApixService = "QApix"
func (s *Manager) Users(out any) (err error) {
auth, ok := s.config.External[QApixService]
if !ok {
err = errors.New("error getting auth data for qapix service at Users")
slog.Error(err.Error())
return err
}
options := RequestOptions{
Method: http.MethodGet,
Path: "/api/v1/auth/data/users",
Body: nil,
Retries: 0,
Timeout: lTimeout * time.Second,
CacheDuration: -1,
Auth: &auth,
}
response, err := s.sendRequestToExternal(options)
if err != nil {
err = fmt.Errorf("error making request to qapix server. error: %w", err)
slog.Error(err.Error())
return err
}
err = json.Unmarshal(response, out)
if err != nil {
err = fmt.Errorf("error making request to qapix server. error: %w", err)
slog.Error(err.Error())
return err
}
return nil
}
func (s *Manager) UserByID(userID string, out any) (err error) {
path := "/api/v1/auth/data/user_by_id/" + userID
auth, ok := s.config.External[QApixService]
if !ok {
err = errors.New("error getting auth data for qapix service at UserByEmail")
slog.Error(err.Error())
return err
}
options := RequestOptions{
Method: http.MethodGet,
Path: path,
Body: nil,
Retries: 3,
Timeout: sTimeout * time.Second,
CacheDuration: time.Second * 10,
Auth: &auth,
}
res, err := s.sendRequestToExternal(options)
if err != nil {
err = fmt.Errorf("error making user_by_id request to qapix server. error: %w", err)
slog.Error(err.Error())
return err
}
err = json.Unmarshal(res, out)
if err != nil {
err = fmt.Errorf("error unmarshalling user by id from qapix server. error: %w", err)
slog.Error(err.Error())
return err
}
return nil
}
func (s *Manager) UserByEmail(email string, out *app.User) (err error) {
path := "/api/v1/auth/data/user_by_email"
auth, ok := s.config.External[QApixService]
if !ok {
err = errors.New("error getting auth data for qapix service at UserByEmail")
slog.Error(err.Error())
return err
}
options := RequestOptions{
Method: http.MethodPost,
Path: path,
Body: map[string]string{"Email": email},
Retries: 3,
Timeout: sTimeout * time.Second,
CacheDuration: time.Second * 10,
Auth: &auth,
}
res, err := s.sendRequestToExternal(options)
if err != nil {
err = fmt.Errorf("error making user_by_email request to qapix server. error: %w", err)
slog.Error(err.Error())
return err
}
err = json.Unmarshal(res, out)
if err != nil {
err = fmt.Errorf("error unmarshalling user by email from qapix server. error: %w", err)
slog.Error(err.Error())
return err
}
return nil
}

View File

@ -0,0 +1,72 @@
// Package store defines database functions
package store
import (
"log/slog"
"time"
"quantex.com.ar/multidb"
"quantex.com/qfixdpl/src/app"
"quantex.com/qfixdpl/src/client/store/external"
"quantex.com/qfixdpl/src/common/tracerr"
)
const dbPingSeconds = 30
type Store struct {
db *multidb.MultiDB
ext *external.Manager
}
type Config struct {
MultiDB multidb.Config
External external.Config
}
// New NewStore creates Store object
func New(config Config) (*Store, error) {
database, err := multidb.New("postgres", config.MultiDB)
if err != nil {
return nil, tracerr.Errorf("error trying to create multidb: %w", err)
}
database.Start()
if err = database.Ping(); err != nil {
return nil, tracerr.Errorf("error ping to database: %w", err)
}
ext := external.NewManager(nil, config.External)
s := &Store{
db: database,
ext: ext,
}
go s.db.PeriodicDBPing(time.Second * dbPingSeconds)
return s, nil
}
func (p *Store) CloseDB() {
p.db.Close()
slog.Info("closing database connection.")
}
func (p *Store) UserByEmail(email string) (out *app.User, err error) {
var user app.User
if err := p.ext.UserByEmail(email, &user); err != nil {
return nil, tracerr.Errorf("error fetching user by email from external service: %w", err)
}
return &user, nil
}
func (p *Store) ValidateSession(sessionToken string) (bool, error) {
ok, err := p.ext.ValidateSession(sessionToken)
if err != nil {
return false, tracerr.Errorf("error validating session: %w", err)
}
return ok, nil
}