Files
EgtExecutor/EXE_Redis.cpp
T
Riccardo Elitropi cd5a82324b EgtExecutor :
- in Redis aggiunta autenticazione con utente e password per modalità sincrona e asincrona.
2025-10-01 13:32:54 +02:00

1115 lines
45 KiB
C++

//----------------------------------------------------------------------------
// EgalTech 2025-2025
//----------------------------------------------------------------------------
// File : EXE_Redis.cpp Data : 17.09.25 Versione : 2.7i3
// Contenuto : Funzioni per interfacciarsi con server Redis.
//
//
//
// Modifiche : 17.09.25 RE Creazione modulo.
//
//
//----------------------------------------------------------------------------
//--------------------------- Include ----------------------------------------
#include <winsock2.h>
#include <WS2tcpip.h>
#include "stdafx.h"
#include "EXE.h"
#include "EXE_Macro.h"
#include "AuxTools.h"
#include "/EgtDev/Include/EXeExecutor.h"
#include "/EgtDev/Include/EGkStringUtils3d.h"
#include "/EgtDev/Include/EGnStringKeyVal.h"
#pragma warning( disable: 4244) // conversione da 'uint64_t' a 'size_t'
#pragma warning( disable: 4200) // utilizzata estensione non standard: matrice di dimensioni zero in struct/union
#include "/EgtDev/Extern/hiredis/Include/async.h"
#include <thread>
#include <atomic>
#include <map>
#include <sstream>
#include <future>
using namespace std ;
// ---------------------------------------------------------------------------
// Definizione variabili generali Redis
// ---------------------------------------------------------------------------
static const int REDIS_MIN_DB = 0 ;
static const int REDIS_MAX_DB = 15 ;
static const int DEFAULT_PORT = 6379 ;
static const int DEFAULT_SENTINEL_MASTER_PORT = 26379 ;
static const string DEFAULT_USER = "default" ;
static const int DEFAULT_DATABASE = 0 ;
static const int DEFAULT_TIMEOUT = 15000 ; // [ms]
static const int DEFAULT_ALIVE_TIME = 180 ; // [s]
// Contesto Sincrono
static redisContext* s_pRedisContext = nullptr ;
// Contesto Asincrono
static redisAsyncContext* s_pRedisAsyncSubContext = nullptr ;
static redisAsyncContext* s_pRedisAsyncPubContext = nullptr ;
static atomic<int> s_nPendingDataBase = REDIS_MIN_DB ; // Default
static string s_sPassword = "" ; // Default
static string s_sUser = DEFAULT_USER ; // Default
static atomic<bool> s_bSubConnected = false ;
static atomic<bool> s_bPubConnected = false ;
static atomic<bool> s_bPubLoopRunning = false ;
static atomic<bool> s_bSubLoopRunning = false ;
static atomic<bool> s_bMessage = false ;
static string s_sMessage ;
// Tipo di valore recuperabile mediante una chiave di tipo string
static const string KEY_TYPE_STRING = "string" ;
static const string KEY_TYPE_LIST = "list" ; // non usato
static const string KEY_TYPE_SET = "set" ; // non usato
static const string KEY_TYPE_HASH = "hash" ; // non usato
static const string KEY_TYPE_ZSET = "zset" ; // non usato
static const string KEY_TYPE_JSON = "ReJSON-RL" ; // non usato, disponibile se versione >= 8 ( noi usiamo la 6)
// struttura per parametri della stringa di Connessione
struct RedisConnectionInfo {
string sHost, sServiceName, sUser, sPassword ;
int nPort, nDefaultDataBase, nKeepAlive, nConnectTimeout, nSyncTimeout, nAsyncTimeout ;
bool bAbortConnect, bSsl, bAllowAdmin ;
RedisConnectionInfo() {
sHost = "" ; sServiceName = "" ; sUser = DEFAULT_USER ; sPassword = "" ;
nPort = DEFAULT_PORT ; nDefaultDataBase = DEFAULT_DATABASE ; nKeepAlive = DEFAULT_ALIVE_TIME ;
nConnectTimeout = DEFAULT_TIMEOUT ; nSyncTimeout = DEFAULT_TIMEOUT ; nAsyncTimeout = DEFAULT_TIMEOUT ;
bAbortConnect = false ; bSsl = false ; bAllowAdmin = false ;
} ;
RedisConnectionInfo( string sMyHost, string sMyServiceName, string sMyUser, string sMyPassword, int nMyPort,
int nMyDefaultDataBase, int dMyKeepAlive, int dMyConnectTimeout, int dMySyncTimeout,
int dMyAsyncTimeout, bool bMyAbortConnect, bool bMySsl, bool bMyAllowAdmin)
: sHost( sMyHost), sServiceName( sMyServiceName), sUser( sMyUser), sPassword( sMyPassword), nPort( nMyPort),
nDefaultDataBase( nMyDefaultDataBase), nKeepAlive( dMyKeepAlive), nConnectTimeout( dMyConnectTimeout),
nSyncTimeout( dMySyncTimeout), nAsyncTimeout( dMyAsyncTimeout), bAbortConnect( bMyAbortConnect),
bSsl( bMySsl), bAllowAdmin( bMyAllowAdmin) {}
} ;
// ---------------------------------------------------------------------------
// -------- Funzione di interpretazione della stringa di connessione ---------
// ---------------------------------------------------------------------------
static bool
GetParamsFromConnectionString( const string& sConnection, RedisConnectionInfo& ConnectionInfo)
{
// Se la stringa di connessione è vuota, errore
if ( sConnection.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Empty Connection String") ;
return false ;
}
// Ricerco le informazioni Splittando la stringa di connessione per ","
string sHostPort, sParams ;
SplitFirst( sConnection, ",", sHostPort, sParams) ;
if ( ! sHostPort.empty()) {
// Host e Porta
string sHost, sPort ;
SplitFirst( sHostPort, ":", sHost, sPort) ;
if ( sHost.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Invalid Host") ;
return false ;
}
int nPort = DEFAULT_PORT ;
if ( ! sPort.empty()) {
if ( ! FromString( sPort, nPort)) {
LOG_INFO( GetCmdLogger(), "Error : Invalid Port") ;
return false ;
}
}
ConnectionInfo.sHost = sHost ;
ConnectionInfo.nPort = nPort ;
}
// Recupero i parametri ( rimuovo tutti gli spazi )
sParams.erase( remove( sParams.begin(), sParams.end(), ' '), sParams.end()) ;
GetValInNotes( sParams, "serviceName", ",", ConnectionInfo.sServiceName) ;
GetValInNotes( sParams, "user", ",", ConnectionInfo.sUser) ;
GetValInNotes( sParams, "password", ",", ConnectionInfo.sPassword) ;
GetValInNotes( sParams, "DefaultDatabase", ",", ConnectionInfo.nDefaultDataBase) ;
GetValInNotes( sParams, "keepAlive", ",", ConnectionInfo.nKeepAlive) ;
GetValInNotes( sParams, "connectTimeout", ",", ConnectionInfo.nConnectTimeout) ;
GetValInNotes( sParams, "syncTimeout", ",", ConnectionInfo.nSyncTimeout) ;
GetValInNotes( sParams, "asyncTimeout", ",", ConnectionInfo.nAsyncTimeout) ;
GetValInNotes( sParams, "abortConnect", ",", ConnectionInfo.bAbortConnect) ;
GetValInNotes( sParams, "ssl", "," ,ConnectionInfo.bSsl) ;
GetValInNotes( sParams, "allowAdmin", ",", ConnectionInfo.bAllowAdmin) ;
return true ;
}
// ---------------------------------------------------------------------------
// ------------------------------ Sync ---------------------------------------
//----------------------------------------------------------------------------
//----------------------------------------------------------------------------
// Funzione di Connessione sincrona a Redis
//----------------------------------------------------------------------------
bool
ExeRedisConnect( const string& sConnection)
{
// Se connesione già presente, termino e creo la nuova
if ( s_pRedisContext != nullptr)
redisFree( s_pRedisContext) ;
// Recupero i parametri dalla stringa di connessione
RedisConnectionInfo ConnectionInfo ;
if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo))
return false ;
// Se connessione senza TimeOut
if ( ConnectionInfo.nSyncTimeout <= 0) {
// Imposto il contesto per connesione sincrona
s_pRedisContext = redisConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
if ( s_pRedisContext == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
return false ;
}
else if ( s_pRedisContext->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + s_pRedisContext->errstr).c_str())
redisFree( s_pRedisContext) ;
return false ;
}
}
// Se connessione con parametri
else {
// TimeOut per connessione
struct timeval TimeOutConnection{} ;
TimeOutConnection.tv_sec = static_cast<long>( ConnectionInfo.nSyncTimeout) ;
TimeOutConnection.tv_usec = static_cast<long>( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ;
// Imposto il contesto per connesione sincrona con TimeOut
s_pRedisContext = redisConnectWithTimeout( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort, TimeOutConnection) ;
if ( s_pRedisContext == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
return false ;
}
else if ( s_pRedisContext->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + s_pRedisContext->errstr).c_str())
redisFree( s_pRedisContext) ;
return false ;
}
// TimeOut per richiesta I/O
struct timeval TimeOutIO{} ;
TimeOutIO.tv_sec = static_cast<long>( ConnectionInfo.nSyncTimeout) ;
TimeOutIO.tv_usec = static_cast<long>( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ;
// Imposto TimeOut per I/O
int nResult = redisSetTimeout( s_pRedisContext, TimeOutIO) ;
if ( nResult != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : Can't set timeout I/O") ;
return false ;
}
}
LOG_INFO( GetCmdLogger(), "Sync connection to redis !")
// Verifico se la connessione è di tipo Sentinel
redisReply* replySentinel = ( redisReply*)redisCommand( s_pRedisContext, "SENTINEL get-master-addr-by-name %s",
ConnectionInfo.sServiceName.c_str()) ;
if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) {
if ( replySentinel != nullptr)
freeReplyObject( replySentinel) ;
}
else if ( replySentinel->element[0] != nullptr && replySentinel->element[0]->str != nullptr &&
replySentinel->element[1] != nullptr && replySentinel->element[1]->str != nullptr) {
// --- Nodo sentinella
string sMasterHost = replySentinel->element[0]->str ;
int nMasterPort = DEFAULT_SENTINEL_MASTER_PORT ;
FromString( replySentinel->element[1]->str, nMasterPort) ;
freeReplyObject( replySentinel) ;
// Effettuo la connessione al master redis
s_pRedisContext = redisConnect( sMasterHost.c_str(), nMasterPort) ;
if ( s_pRedisContext == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
return false ;
}
else if ( s_pRedisContext->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + s_pRedisContext->errstr).c_str())
redisFree( s_pRedisContext) ;
return false ;
}
}
// Verifico se richiesta autenticazione
if ( ! ConnectionInfo.sPassword.empty()) {
redisReply* reply = ( redisReply*)redisCommand( s_pRedisContext, "AUTH %s %s", ConnectionInfo.sUser.c_str(), ConnectionInfo.sPassword.c_str()) ;
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
LOG_INFO(GetCmdLogger(), "Error : Authentication failed") ;
if ( reply == nullptr)
freeReplyObject( reply) ;
redisFree( s_pRedisContext) ;
return false ;
}
freeReplyObject( reply) ;
}
// Seleziono il DataBase
redisReply* reply = ( redisReply*)redisCommand( s_pRedisContext, "SELECT %d", ConnectionInfo.nDefaultDataBase) ;
if ( reply == nullptr) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + s_pRedisContext->errstr).c_str()) ;
return false ;
}
else if ( reply->type == REDIS_REPLY_ERROR) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + reply->str).c_str())
freeReplyObject( reply) ;
return false ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Connected to DB #"} + ToString( ConnectionInfo.nDefaultDataBase) + " !").c_str())
freeReplyObject( reply) ;
return true ;
}
//----------------------------------------------------------------------------
// Funzione di Disconnessione sincrona a Redis
//----------------------------------------------------------------------------
bool
ExeRedisDisconnect( void)
{
// Se connessione non presente
if ( s_pRedisContext == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Closing a Sync Connection never created") ;
return false ;
}
// Se connessione in stato di errore
if ( s_pRedisContext->err != 0) {
LOG_INFO( GetCmdLogger(), "Warning : Closing Redis Sync Connection in error") ;
redisFree( s_pRedisContext) ;
s_pRedisContext = nullptr ;
return false ;
}
// Effettuo disconnessione sincrona
redisFree( s_pRedisContext) ;
s_pRedisContext = nullptr ;
LOG_INFO( GetCmdLogger(), "Sync Connection Closed !") ;
return true ;
}
//----------------------------------------------------------------------------
// Funzione di scrittura sincrona chiave-valore
//----------------------------------------------------------------------------
bool
ExeRedisSetValFromKey( const string& sKey, const string& sVal)
{
// Se connesione non presente, errore
if ( s_pRedisContext == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Connection") ;
return false ;
}
// Se connessione in stato di errore, allora errore
if ( s_pRedisContext->err != 0) {
LOG_INFO( GetCmdLogger(), "Error : Invalid Connection") ;
return false ;
}
// Il valore associato alla chiave può essere solo di tipo stringa
redisReply* reply = ( redisReply*)redisCommand( s_pRedisContext, "SET %s %s", sKey.c_str(), sVal.c_str()) ;
if ( reply == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Null reply") ;
return false ;
}
else if ( reply->type != REDIS_REPLY_STATUS || string( reply->str ) != "OK") {
LOG_INFO( GetCmdLogger(), "Error SET : No answer") ;
freeReplyObject( reply) ;
return false ;
}
LOG_INFO( GetCmdLogger(), ( string{ "New Key Added : ["} + sKey + ":" + sVal + "]").c_str())
freeReplyObject( reply) ;
return true ;
}
//----------------------------------------------------------------------------
// Funzione di lettura sincrona per valore di una chiave
//----------------------------------------------------------------------------
bool
ExeRedisGetValFromKey( const string& sKey, string& sVal)
{
sVal.clear() ;
// Se connesione non presente, errore
if ( s_pRedisContext == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Connection") ;
return false ;
}
// Se connessione in stato di errore, allora errore
if ( s_pRedisContext->err != 0) {
LOG_INFO( GetCmdLogger(), "Error : Invalid Connection") ;
return false ;
}
// Recupero il tipo associato alla chiave da Redis
redisReply* typeReply = ( redisReply*)redisCommand( s_pRedisContext, "TYPE %s", sKey.c_str()) ;
if ( typeReply == nullptr || typeReply->type != REDIS_REPLY_STATUS) {
string sErr = ( typeReply != nullptr ? string( typeReply->str) : "null reply") ;
if ( typeReply == nullptr)
freeReplyObject( typeReply) ;
LOG_INFO( GetCmdLogger(), ( "Error : Failed to get key type, " + sErr).c_str()) ;
return false ;
}
string sType = typeReply->str ;
freeReplyObject( typeReply) ;
// Determino il formato per la chiamata al DataBase Redis
// Per ora accettate solamente valori string
string sFormat = ( sType == KEY_TYPE_STRING ? "Get %s" : "") ;
if ( sFormat.empty()) {
LOG_ERROR( GetCmdLogger(), "Error : Not valid Key Type") ;
return false ;
}
// Effettuo la chiamata
redisReply* reply = nullptr ;
reply = ( redisReply*)redisCommand( s_pRedisContext, sFormat.c_str(), sKey.c_str()) ;
if ( reply == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Null reply") ;
return false ;
}
// Recupero il Valore
if ( sType == KEY_TYPE_STRING) {
if ( reply->type != REDIS_REPLY_STRING) {
LOG_INFO( GetCmdLogger(), "Error : Failed to read the value") ;
freeReplyObject( reply) ;
return false ;
}
sVal = string( reply->str) ;
}
freeReplyObject( reply) ;
return true ;
}
// ---------------------------------------------------------------------------
// ------------------------------ ASync --------------------------------------
//----------------------------------------------------------------------------
//----------------------------------------------------------------------------
// Funzione Ciclo degli eventi per chiamate CallBack
//----------------------------------------------------------------------------
static void
RedisEventLoop( redisAsyncContext* ctx, bool bIsPub)
{
// Verifica della validità del contesto
if ( ctx == nullptr || ctx->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error: Invalid Redis context at start of event loop for " } +
( bIsPub ? "Pub" : "Sub")).c_str())
return ;
}
// Recupero del file descriptor del socket TCP usato da Redis.
SOCKET sock = ctx->c.fd ;
// Imposto condizione di ascolto degli eventi
bool bLoopRunning = bIsPub ? s_bPubLoopRunning : s_bSubLoopRunning ;
// Creazione dei due insiemi per il file descriptor ( lettura e scrittura )
// r = read, w = write
fd_set rfds, wfds ;
// Timeout per funzione select { secondi, millisecondi}
// La funzione select attende che il socket sia pronto per lettura e scrittura
// Resituisce :
// SOCKET_ERROR in caso di errore
// 0 se scade il timeout
// > 0 se ci sono eventi da gestire
timeval timeout = {0, 100000 } ; // 100ms
// Ciclo
while ( bLoopRunning) {
// Se contesto non valido, interrompo il ciclo
if ( ctx == nullptr || ( ctx->c.flags & REDIS_DISCONNECTING)) {
LOG_INFO ( GetCmdLogger(), ( string{ "Redis event loop terminated for Async "} +
( bIsPub ? "Pub" : "Sub") + " Events").c_str())
break ;
}
// Recupero file descriptor di lettura e scrittura
FD_ZERO( &rfds) ;
FD_ZERO( &wfds) ;
FD_SET( sock, &rfds) ;
FD_SET( sock, &wfds) ;
// Controllo il risultato ottenuto
int nRet = select( 0, &rfds, &wfds, nullptr, &timeout) ;
if ( nRet == SOCKET_ERROR) {
// Se errore -> interruzione del ciclo
LOG_INFO( GetCmdLogger(), ( string{ "Error : Socket select() failed -> "} +
ToString( WSAGetLastError())).c_str()) ;
break ;
}
else if ( nRet > 0) {
// Se Socket pronto per lettura/ scrittua
if ( ctx != nullptr && ! ( ctx->c.flags & REDIS_DISCONNECTING) && ctx->c.fd != -1) {
if ( FD_ISSET( sock, &rfds))
redisAsyncHandleRead( ctx) ;
if ( FD_ISSET( sock, &wfds))
redisAsyncHandleWrite( ctx) ;
}
// Evito saturazione CPU
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
// Reset del timeOut ad ogni ciclo
timeout.tv_sec = 0 ;
timeout.tv_usec = 100000 ; // 100ms
}
return ;
}
//----------------------------------------------------------------------------
// Funzione di controllo che la connessione sia attiva e il contesto valido
// ---------------------------------------------------------------------------
static bool
CheckConnectionAndContext( redisAsyncContext* ctx, bool bIsPub)
{
// Se contesto nullo -> Errore
if ( ctx == nullptr) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : Invalid Context for "} +
( bIsPub ? "Pub" : "Sub")).c_str())
return false ;
}
// Se connessione non effettuata -> Errore
if ( bIsPub && ! s_bPubConnected) {
LOG_INFO( GetCmdLogger(), "Error : aSync Pub connection not connected") ;
redisAsyncFree( ctx) ;
ctx = nullptr ;
return false ;
}
if ( ! bIsPub && ! s_bSubConnected) {
LOG_INFO( GetCmdLogger(), "Error : aSync Sub connection not closed") ;
redisAsyncFree( ctx) ;
ctx = nullptr ;
return false ;
}
// Se connessione effettuata ma in stato di errore -> Errore
if ( ctx->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Warning : Closing aSync Connection in error for "} +
( bIsPub ? "Pub" : "Sub")).c_str()) ;
redisAsyncFree( ctx) ;
ctx = nullptr ;
return false ;
}
return true ;
}
//----------------------------------------------------------------------------
// Funzione di CallBack per PUBLISH, SUBSCRIBE e UNSUBSCRIBE
//----------------------------------------------------------------------------
static void
MessageCallback( redisAsyncContext* ctx, void* r, void*)
{
// Verifico che il contesto sia definito
if ( ctx == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack")
return ;
}
LOG_INFO( GetCmdLogger(), "RedisMessageCallback invoked")
// Recupero della risposta dal server redis
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Redis reply null in CallBack")
return ;
}
else if ( reply->type == REDIS_REPLY_INTEGER) {
// Recupero il numero di clients iscritti al canale dove ho effettuato una publish
LOG_INFO( GetCmdLogger(), ( string{ "Message published to "} + ToString( reply->integer) + " clients !").c_str()) ;
return ;
}
else if ( reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) {
// Recupero la tipologia di chiamata effettuata
const char* msgType = ( reply->element[0] != nullptr && reply->element[0]->str != nullptr ?
reply->element[0]->str : "") ;
// --- Se Messaggio
if ( strcmp( msgType, "message") == 0 && reply->elements == 3) {
// Messaggio ricevut da un canale
if ( reply->element[1] != nullptr && reply->element[1]->str != nullptr &&
reply->element[2] != nullptr && reply->element[2]->str != nullptr) {
string sChannel = reply->element[1]->str ;
string sMessage = reply->element[2]->str ;
LOG_INFO( GetCmdLogger(), ( string{ "Message Received on ["} + sChannel + "] : " +
sMessage).c_str())
}
else {
LOG_INFO( GetCmdLogger(), "Errror : Invalid Message received")
return ;
}
}
// --- Se Subscribe
else if ( strcmp( msgType, "subscribe") == 0 && reply->elements >= 3) {
if ( reply->element[1] != nullptr && reply->element[1] != nullptr &&
reply->element[2] != nullptr) {
string sChannel = reply->element[1]->str ;
int nCount = reply->element[2]->integer ;
LOG_INFO( GetCmdLogger(), ( string{ "Subscribed to ["} + sChannel + "]," +
" total subscribtions : " + ToString( nCount)).c_str()) ;
}
else {
LOG_INFO( GetCmdLogger(), "Errror : Invalid Subscription reply")
return ;
}
}
// --- se Unsubscribe
else if ( strcmp( msgType, "unsubscribe") == 0 && reply->elements >= 3) {
if ( reply->element[1] != nullptr && reply->element[1] != nullptr &&
reply->element[2] != nullptr) {
string sChannel = reply->element[1]->str ;
int nCount = reply->element[2]->integer ;
LOG_INFO( GetCmdLogger(), ( string{ "Unsubscribed to ["} + sChannel + "]," +
" subscriptions left : " + ToString( nCount)).c_str())
}
else {
LOG_INFO( GetCmdLogger(), "Errror : Invalid Unbubscription reply")
return ;
}
}
// --- Undefined
else {
LOG_INFO( GetCmdLogger(), "Undefined reply in CallBack from Redis")
return ;
}
}
return ;
}
//----------------------------------------------------------------------------
// Funzione di CallBack per WaitForReadisMessage
//----------------------------------------------------------------------------
static void
WaitMessageCallback( redisAsyncContext* ctx, void* r, void*)
{
// Verifico che il contesto sia definito
if ( ctx == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack")
return ;
}
LOG_INFO( GetCmdLogger(), "RedisMessageCallback invoked")
// Recupero della risposta dal server redis
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Redis reply null in CallBack")
return ;
}
else if ( reply->type != REDIS_REPLY_ARRAY || reply->elements < 3)
LOG_INFO( GetCmdLogger(), "Error : Invalid Message CallBack reply")
else {
// Recupero il Messaggio ( se callBack riferita al tipo "message")
if ( reply->element[0] == nullptr || reply->element[0]->str == nullptr)
LOG_INFO( GetCmdLogger(), "Error : Null message type received")
else {
const char* msgType = reply->element[0]->str ;
if ( strcmp( msgType, "message") == 0) {
if ( reply->element[2] == nullptr || reply->element[2]->str == nullptr)
LOG_INFO( GetCmdLogger(), "Error : Null message received")
else {
s_sMessage = reply->element[2]->str ;
s_bMessage = true ;
}
}
}
}
}
//----------------------------------------------------------------------------
// Funzione per Memorizzare il DataBase Redis da selezionare [Callback]
//----------------------------------------------------------------------------
static bool
SetPendingDataBase( int nDB)
{
// I DataBase su Redis sono al più 15
if ( REDIS_MIN_DB <= nDB && nDB <= REDIS_MAX_DB) {
s_nPendingDataBase = nDB ;
return true ;
}
return false ;
}
//----------------------------------------------------------------------------
// Funzione per Ricavare il numero di DataBase Redis da selezionare [Callback]
//----------------------------------------------------------------------------
static int
GetPendingDataBase()
{
return s_nPendingDataBase ;
}
// ---------------------------------------------------------------------------
// Funzione per Memorizzare l'utente di autenticazione [Callback]
// ---------------------------------------------------------------------------
static void
SetPendingUser( string sUsr)
{
s_sUser = sUsr ;
}
// --------------------------------------------------------------------------
// Funzione per ottenere l'utente di autenticazione [Callback]
// --------------------------------------------------------------------------
static string
GetPendingUser()
{
return s_sUser ;
}
// ---------------------------------------------------------------------------
// Funzione per Memorizzare la Password di autenticazione [Callback]
// ---------------------------------------------------------------------------
static void
SetPendingPassword( string sPsw)
{
s_sPassword = sPsw ;
}
// --------------------------------------------------------------------------
// Funzione per ottenere la Password di autenticazione [Callback]
// --------------------------------------------------------------------------
static string
GetPendingPassword()
{
return s_sPassword ;
}
//----------------------------------------------------------------------------
static bool
GetConnectionContext( redisAsyncContext*& ctx, const string& sHost, int nPort)
{
ctx = redisAsyncConnect( sHost.c_str(), nPort) ;
if ( ctx == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Can't allocate Redis Async context")
return false ;
}
if ( ctx->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + ctx->errstr).c_str())
redisAsyncFree( ctx) ;
ctx = nullptr ;
WSACleanup() ;
return false ;
}
return true ;
}
//----------------------------------------------------------------------------
static void
CleanupWinsock( void)
{
// entrambi i contesti devono essere nulli
if ( s_pRedisAsyncPubContext == nullptr && s_pRedisAsyncSubContext == nullptr) {
// entrambi i loop devono essere terminati
if ( ! s_bPubLoopRunning && ! s_bSubLoopRunning) {
WSACleanup() ;
LOG_INFO( GetCmdLogger(), "Winsock cleaned up") ;
}
}
}
//----------------------------------------------------------------------------
// Funzione di Connessione asincrona a Redis
//----------------------------------------------------------------------------
bool
ExeRedisAsyncConnect( const string& sConnection)
{
// Flag di connessione
s_bPubConnected = false ;
s_bSubConnected = false ;
// Recupero i parametri dalla stringa di connessione
RedisConnectionInfo ConnectionInfo ;
if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo))
return false ;
// Se connesione già presente, termino e creo la nuova
if ( s_pRedisAsyncPubContext != nullptr || s_pRedisAsyncSubContext != nullptr)
ExeRedisAsyncDisconnect() ;
// Memorizzo il valore dello User, della Password e del DataBase, serviranno una volta
// che la connessione è effettivamente stabilita e corretta ( -> CallBack di connessione)
SetPendingUser( ConnectionInfo.sUser) ;
SetPendingPassword( ConnectionInfo.sPassword) ;
if ( ! SetPendingDataBase( ConnectionInfo.nDefaultDataBase)) {
LOG_INFO( GetCmdLogger(), "Error : DataBase number must be in range [0,15]") ;
return false ;
}
// Inizializzazione libreria Winsock, necessaria per usare le funzionalità di rete su Windows,
// come socket TCP/IP. Versione utilizzata 2.2
WSADATA wsaData ;
int nStatus = WSAStartup( MAKEWORD( 2, 2), &wsaData) ;
if ( nStatus != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : WSAStartup -> "} + ToString( nStatus)).c_str()) ;
return false ;
}
// Verifico se si tratta di un nodo Sentinella
redisContext* sentinelCtx = redisConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
if ( sentinelCtx == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
return false ;
}
else if ( sentinelCtx->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + sentinelCtx->errstr).c_str())
redisFree( sentinelCtx) ;
return false ;
}
redisReply* replySentinel = ( redisReply*)redisCommand( sentinelCtx, "SENTINEL get-master-addr-by-name %s",
ConnectionInfo.sServiceName.c_str()) ;
if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) {
if ( replySentinel != nullptr)
freeReplyObject( replySentinel) ;
}
else if ( replySentinel->element[0] != nullptr && replySentinel->element[0]->str != nullptr &&
replySentinel->element[1] != nullptr && replySentinel->element[1]->str != nullptr) {
// --- Nodo sentinella
ConnectionInfo.sHost = replySentinel->element[0]->str ;
ConnectionInfo.nPort = DEFAULT_SENTINEL_MASTER_PORT ;
FromString( replySentinel->element[1]->str, ConnectionInfo.nPort) ;
freeReplyObject( replySentinel) ;
redisFree( sentinelCtx) ;
}
// Definisco le 2 connessioni asincroni con Host e Porta ( per PUBLISH e SUBSCRIBE/UNSUBSCRIBE)
if ( ! GetConnectionContext( s_pRedisAsyncPubContext, ConnectionInfo.sHost, ConnectionInfo.nPort) ||
! GetConnectionContext( s_pRedisAsyncSubContext, ConnectionInfo.sHost, ConnectionInfo.nPort))
return false ;
LOG_INFO( GetCmdLogger(), "Connected to Redis ! ( Pub/Sub)")
// Imposto e Definisco la funzione di CallBack per contesto PUB per Connessione asincrona
redisAsyncSetConnectCallback(
s_pRedisAsyncPubContext,
[]( const redisAsyncContext* ctx, int nStatus) {
// --- Se contesto nullo, errore
if ( ctx == nullptr || nStatus != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : Async Connect CallBack Pub failed ") ;
return ;
}
// --- Se Autenticazione richiesta -> AUTH e SELECT
string sPassword = GetPendingPassword() ;
if ( ! sPassword.empty()) {
redisAsyncCommand(
s_pRedisAsyncPubContext,
[]( redisAsyncContext* ctx, void* r, void*) {
// Invio la richiesta
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null asnwer" ;
LOG_INFO( GetCmdLogger(), ( string{ " Error : Authentication -> "} + sErrMsg).c_str())
return ;
}
LOG_INFO( GetCmdLogger(), "Valid Authentication for Pub connection")
redisAsyncCommand(
ctx,
[]( redisAsyncContext* ctx, void* r, void*) {
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ;
LOG_INFO( GetCmdLogger(), ( string{ "Error : DataBase selection -> "} + sErrMsg).c_str()) ;
return ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Connected to DataBase #"} + ToString( GetPendingDataBase()) + " for Pub commands !").c_str()) ;
s_bPubConnected = true ;
},
nullptr, "SELECT %d", GetPendingDataBase()
) ;
},
nullptr, "AUTH %s %s", GetPendingUser().c_str(), GetPendingPassword().c_str()
) ;
}
// Se nessuna autenticazione -> SELECT
else {
redisAsyncCommand(
s_pRedisAsyncPubContext,
[]( redisAsyncContext* ctx, void* r, void*) {
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ;
LOG_INFO( GetCmdLogger(), ( string{ "Error : DataBase selection -> "} + sErrMsg).c_str()) ;
return ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Connected to DataBase #"} + ToString( GetPendingDataBase()) + " for Pub commands !").c_str())
s_bPubConnected = true ;
},
nullptr, "SELECT %d", GetPendingDataBase()
) ;
}
}
) ;
// Imposto e Definisco la funzione di CallBack per contesto SUB per Connessione asincrona
redisAsyncSetConnectCallback(
s_pRedisAsyncSubContext,
[]( const redisAsyncContext* ctx, int nStatus) {
// --- Se contesto nullo, errore
if ( ctx == nullptr || nStatus != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : Async Connect CallBack Pub failed ") ;
return ;
}
// --- Se Autenticazione richiesta -> AUTH e SELECT
string sPassword = GetPendingPassword() ;
if ( ! sPassword.empty()) {
redisAsyncCommand(
s_pRedisAsyncSubContext,
[]( redisAsyncContext* ctx, void* r, void*) {
// Invio la richiesta
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null asnwer" ;
LOG_INFO( GetCmdLogger(), ( string{ " Error : Authentication -> "} + sErrMsg).c_str())
return ;
}
LOG_INFO( GetCmdLogger(), "Valid Authentication for Pub connection")
redisAsyncCommand(
ctx,
[]( redisAsyncContext* ctx, void* r, void*) {
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ;
LOG_INFO( GetCmdLogger(), ( string{ "Error : DataBase selection -> "} + sErrMsg).c_str()) ;
return ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Connected to DataBase #"} + ToString( GetPendingDataBase()) + " for Pub commands !").c_str()) ;
s_bSubConnected = true ;
},
nullptr, "SELECT %d", GetPendingDataBase()
) ;
},
nullptr, "AUTH %s %s", GetPendingUser().c_str(), GetPendingPassword().c_str()
) ;
}
// Se nessuna autenticazione -> SELECT
else {
redisAsyncCommand(
s_pRedisAsyncSubContext,
[]( redisAsyncContext* ctx, void* r, void*) {
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ;
LOG_INFO( GetCmdLogger(), ( string{ "Error : DataBase selection -> "} + sErrMsg).c_str()) ;
return ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Connected to DataBase #"} + ToString( GetPendingDataBase()) + " for Pub commands !").c_str())
s_bSubConnected = true ;
},
nullptr, "SELECT %d", GetPendingDataBase()
) ;
}
}
) ;
// Imposto e Definisco la funzione di CallBack per la Disconnessione Pub asincrona
redisAsyncSetDisconnectCallback(
s_pRedisAsyncPubContext,
[]( const redisAsyncContext* ctx, int status) {
LOG_INFO( GetCmdLogger(), "Disconnected from Redis ( Pub )") ;
s_bPubLoopRunning = false ;
s_pRedisAsyncPubContext = nullptr ;
this_thread::sleep_for( chrono::milliseconds( 50)) ; // per sicurezza
CleanupWinsock() ;
}
) ;
// Imposto e Definisco la funzione di CallBack per la Disconnessione Sub asincrona
redisAsyncSetDisconnectCallback(
s_pRedisAsyncSubContext,
[]( const redisAsyncContext* ctx, int status) {
LOG_INFO( GetCmdLogger(), "Disconnected from Redis ( Sub )") ;
s_pRedisAsyncSubContext = nullptr ;
s_bSubLoopRunning = false ;
this_thread::sleep_for( chrono::milliseconds( 50)) ; // per sicurezza
CleanupWinsock() ;
}
) ;
// Definizione e partenza del ciclo degli eventi in un thread separato ( in BackGround)
s_bPubLoopRunning = true ;
s_bSubLoopRunning = true ;
thread tPubEventLoop( RedisEventLoop, s_pRedisAsyncPubContext, true) ;
thread tSubEventLoop( RedisEventLoop, s_pRedisAsyncSubContext, false) ;
tPubEventLoop.detach() ;
tSubEventLoop.detach() ;
// Se richiesto massimo tempo di connessione, aspetto
if ( ConnectionInfo.nAsyncTimeout > 0.) {
auto tStart = chrono::steady_clock::now() ;
auto tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
while ( ! s_bSubConnected && ! s_bPubConnected) {
this_thread::sleep_for( chrono::milliseconds( 10)) ;
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Error in Connection : TimeOut exceeded")
return false ;
}
}
auto tElapsed = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{ "Connected in "} + ToString( tElapsed) + " ms").c_str())
}
return true ;
}
//----------------------------------------------------------------------------
// Funzione di Disconnessione asincrona a Redis
//----------------------------------------------------------------------------
bool
ExeRedisAsyncDisconnect( void)
{
// Controllo che la connessione sia valida e il contesto ben definito
bool bOkPub = CheckConnectionAndContext( s_pRedisAsyncPubContext, true) ;
bool bOkSub = CheckConnectionAndContext( s_pRedisAsyncSubContext, false) ;
if ( ! bOkSub || ! bOkPub)
return false ;
// Effettuo la Disconnesione asincrona
redisAsyncDisconnect( s_pRedisAsyncPubContext) ; // libera la memoria in automatico
redisAsyncDisconnect( s_pRedisAsyncSubContext) ; // libera la memoria in automatico
LOG_INFO( GetCmdLogger(), "Async Connection Closed !") ;
return true ;
}
//----------------------------------------------------------------------------
// Funzione di Publish asincrona a Redis
//----------------------------------------------------------------------------
bool
ExeRedisAsyncPublish( const string& sChannel, const string& sMessage)
{
// Controllo che la connessione Pub sia valida e il contesto Pub ben definito
if ( ! CheckConnectionAndContext( s_pRedisAsyncPubContext, true))
return false ;
// Se non ho alcun canale -> errore
if ( sChannel.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
return false ;
}
// Se non ho alcun messaggio -> warning
if ( sMessage.empty())
LOG_INFO( GetCmdLogger(), "Warning : Empty Message") ;
// Eseguo il comando PUBLISH
if ( redisAsyncCommand( s_pRedisAsyncPubContext, MessageCallback, nullptr,
"PUBLISH %s %s", sChannel.c_str(), sMessage.c_str()) != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error: redisAsyncCommand PUBLISH failed")
return false ;
}
return true ;
}
//----------------------------------------------------------------------------
// Funzione di Subscribe asincrona a Redis
//----------------------------------------------------------------------------
bool
ExeRedisAsyncSubscribe( const string& sChannel)
{
// Controllo che la connessione sia valida e il contesto ben definito
if ( ! CheckConnectionAndContext( s_pRedisAsyncSubContext, false))
return false ;
// Se non ho alcun canale, errore
if ( sChannel.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") ;
return false ;
}
// Eseguo il comando Subscribe
if ( redisAsyncCommand( s_pRedisAsyncSubContext, MessageCallback, nullptr,
( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand SUBSCRIBE failed")
return false ;
}
return true ;
}
//----------------------------------------------------------------------------
// Funzione di Unsubscribe asincrona a Redis
//----------------------------------------------------------------------------
bool
ExeRedisAsyncUnsubscribe( const string& sChannel)
{
// Controllo che la connessione sia valida e il contesto ben definito
if ( ! CheckConnectionAndContext( s_pRedisAsyncSubContext, false))
return false ;
// Se non ho alcun canale, errore
if ( sChannel.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
return false ;
}
// Eseguo il comando Unsubscribe
if ( redisAsyncCommand( s_pRedisAsyncSubContext, MessageCallback, nullptr,
( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand UNSUBSCRIBE failed")
return false ;
}
return true ;
}
//----------------------------------------------------------------------------
// Funzione di che aspetta un messaggio asincro su un canale Redis
//----------------------------------------------------------------------------
// La funzione esegue una Subscribe e una Unsubscribe
// Se entro il tempo stabilito riceve un messaggio, lo comunica, altrimenti errore
bool
ExeRedisAsyncSubscribeOneMessage( const string& sChannel, double dMaxTimeOut, string& sMessage)
{
// Controllo che la connessione sia valida e il contesto ben definito
if ( ! CheckConnectionAndContext( s_pRedisAsyncPubContext, true))
return false ;
// Se non ho alcun canale, errore
if ( sChannel.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
return false ;
}
// Controllo validità tempo di TimeOut
if ( dMaxTimeOut <= 0.) {
LOG_INFO( GetCmdLogger(), "Error : Invalid timeout value") ;
return false ;
}
// flag per ricezione del messaggio
s_bMessage = false ;
s_sMessage.clear() ;
// Invio del comando di SubScribe
if ( redisAsyncCommand( s_pRedisAsyncSubContext, WaitMessageCallback, nullptr,
( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to subscribe to"} + sChannel).c_str())
return false ;
}
// Attivo il TimeOut per l'attesa del messaggio
auto tStart = chrono::steady_clock::now() ;
auto tTimeOut = chrono::milliseconds( static_cast<int>( dMaxTimeOut)) ;
while ( ! s_bMessage) {
this_thread::sleep_for( chrono::milliseconds( 10)) ;
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), ( string{ "Timeout : No Message received on ["} + sChannel + "]").c_str())
return false ;
}
}
// Invio del comando Unsubscribe
if ( redisAsyncCommand( s_pRedisAsyncSubContext, WaitMessageCallback, nullptr,
( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to Unsubscribe to "} + sChannel).c_str())
return false ;
}
// Controllo se ho ricevuto un messaggio
if ( s_bMessage) {
sMessage = s_sMessage ;
LOG_INFO( GetCmdLogger(), ( string{ "Message received on ["} + sChannel + "]: " + sMessage).c_str())
return true ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Timeout: No message received on ["} + sChannel + "]").c_str()) ;
return false ;
}