//----------------------------------------------------------------------------
// EgalTech 2025-2025
//----------------------------------------------------------------------------
// File     : EXE_Redis.cpp        Date : 17.09.25        Version : 2.7i3
// Contents : Functions for interfacing with a Redis server.
//
//
//
// Changes  : 17.09.25 RE Module created.
//
//
//----------------------------------------------------------------------------

//--------------------------- Include ----------------------------------------
// NOTE(review): in this copy of the file the header names of the bare
// "#include" directives below and the template arguments of the "atomic"
// declarations further down are missing; restore them from version control
// before building.
#include
#include
#include "stdafx.h"
#include "EXE.h"
#include "EXE_Macro.h"
#include "AuxTools.h"
#include "/EgtDev/Include/EXeExecutor.h"
#include "/EgtDev/Include/EGkStringUtils3d.h"
#include "/EgtDev/Include/EGnStringKeyVal.h"
#pragma warning( disable: 4244) // conversion from 'uint64_t' to 'size_t'
#pragma warning( disable: 4200) // nonstandard extension used: zero-sized array in struct/union
#include "/EgtDev/Extern/hiredis/Include/async.h"
#include
#include
#include
#include
#include

using namespace std ;

// ---------------------------------------------------------------------------
// General Redis definitions
// ---------------------------------------------------------------------------
// Valid range for the database index (checked by SetPendingDataBase)
static const int REDIS_MIN_DB = 0 ;
static const int REDIS_MAX_DB = 15 ;
// Values used when the connection string does not override them
static const int DEFAULT_PORT = 6379 ;
static const int DEFAULT_SENTINEL_MASTER_PORT = 26379 ;
static const string DEFAULT_USER = "default" ;
static const int DEFAULT_DATABASE = 0 ;
static const int DEFAULT_TIMEOUT = 15000 ; // [ms]
static const int DEFAULT_ALIVE_TIME = 180 ; // [s]
// Synchronous context
static redisContext* s_pRedisContext = nullptr ;
// Asynchronous contexts (one for SUBSCRIBE/UNSUBSCRIBE, one for PUBLISH)
static redisAsyncContext* s_pRedisAsyncSubContext = nullptr ;
static redisAsyncContext* s_pRedisAsyncPubContext = nullptr ;
// Database / credentials stored by ExeRedisAsyncConnect and read back by the
// asynchronous connect callbacks
static atomic s_nPendingDataBase = REDIS_MIN_DB ; // Default
static string s_sPassword = "" ; // Default
static string s_sUser = DEFAULT_USER ; // Default
// Set once the SELECT issued by the connect callback has succeeded
static atomic s_bSubConnected = false ;
static atomic s_bPubConnected = false ;
// Set while the corresponding event-loop thread is expected to run
static atomic s_bPubLoopRunning = false ;
static atomic s_bSubLoopRunning = false ;
// Last message stored by WaitMessageCallback and its "message available" flag
static atomic s_bMessage = false ;
static string s_sMessage ;

// Type names reported by Redis for the value stored under a key
static const string KEY_TYPE_STRING = "string" ;
static const string KEY_TYPE_LIST = "list" ; // unused
static const string KEY_TYPE_SET = "set" ; // unused
static const string KEY_TYPE_HASH = "hash" ; // unused
static const string KEY_TYPE_ZSET = "zset" ; // unused
static const string KEY_TYPE_JSON = "ReJSON-RL" ; // unused, available with version >= 8 ( we use version 6)

// Parameters extracted from the connection string
struct RedisConnectionInfo {
    string sHost, sServiceName, sUser, sPassword ;
    int nPort, nDefaultDataBase, nKeepAlive, nConnectTimeout, nSyncTimeout, nAsyncTimeout ;
    bool bAbortConnect, bSsl, bAllowAdmin ;
    // Default construction: every field takes the module default
    RedisConnectionInfo() {
        sHost = "" ;
        sServiceName = "" ;
        sUser = DEFAULT_USER ;
        sPassword = "" ;
        nPort = DEFAULT_PORT ;
        nDefaultDataBase = DEFAULT_DATABASE ;
        nKeepAlive = DEFAULT_ALIVE_TIME ;
        nConnectTimeout = DEFAULT_TIMEOUT ;
        nSyncTimeout = DEFAULT_TIMEOUT ;
        nAsyncTimeout = DEFAULT_TIMEOUT ;
        bAbortConnect = false ;
        bSsl = false ;
        bAllowAdmin = false ;
    } ;
    // Explicit construction: every field is supplied by the caller
    RedisConnectionInfo( string sMyHost, string sMyServiceName, string sMyUser, string sMyPassword,
                         int nMyPort, int nMyDefaultDataBase, int dMyKeepAlive, int dMyConnectTimeout,
                         int dMySyncTimeout, int dMyAsyncTimeout,
                         bool bMyAbortConnect, bool bMySsl, bool bMyAllowAdmin)
        : sHost( sMyHost), sServiceName( sMyServiceName), sUser( sMyUser), sPassword( sMyPassword),
          nPort( nMyPort), nDefaultDataBase( nMyDefaultDataBase), nKeepAlive( dMyKeepAlive),
          nConnectTimeout( dMyConnectTimeout), nSyncTimeout( dMySyncTimeout), nAsyncTimeout( dMyAsyncTimeout),
          bAbortConnect( bMyAbortConnect), bSsl( bMySsl), bAllowAdmin( bMyAllowAdmin) {}
} ;

// ---------------------------------------------------------------------------
// 
-------- Funzione di interpretazione della stringa di connessione --------- // --------------------------------------------------------------------------- static bool GetParamsFromConnectionString( const string& sConnection, RedisConnectionInfo& ConnectionInfo) { // Se la stringa di connessione è vuota, errore if ( sConnection.empty()) { LOG_INFO( GetCmdLogger(), "Error : Empty Connection String") ; return false ; } // Ricerco le informazioni Splittando la stringa di connessione per "," string sHostPort, sParams ; SplitFirst( sConnection, ",", sHostPort, sParams) ; if ( ! sHostPort.empty()) { // Host e Porta string sHost, sPort ; SplitFirst( sHostPort, ":", sHost, sPort) ; if ( sHost.empty()) { LOG_INFO( GetCmdLogger(), "Error : Invalid Host") ; return false ; } int nPort = DEFAULT_PORT ; if ( ! sPort.empty()) { if ( ! FromString( sPort, nPort)) { LOG_INFO( GetCmdLogger(), "Error : Invalid Port") ; return false ; } } ConnectionInfo.sHost = sHost ; ConnectionInfo.nPort = nPort ; } // Recupero i parametri ( rimuovo tutti gli spazi ) sParams.erase( remove( sParams.begin(), sParams.end(), ' '), sParams.end()) ; GetValInNotes( sParams, "serviceName", ",", ConnectionInfo.sServiceName) ; GetValInNotes( sParams, "user", ",", ConnectionInfo.sUser) ; GetValInNotes( sParams, "password", ",", ConnectionInfo.sPassword) ; GetValInNotes( sParams, "DefaultDatabase", ",", ConnectionInfo.nDefaultDataBase) ; GetValInNotes( sParams, "keepAlive", ",", ConnectionInfo.nKeepAlive) ; GetValInNotes( sParams, "connectTimeout", ",", ConnectionInfo.nConnectTimeout) ; GetValInNotes( sParams, "syncTimeout", ",", ConnectionInfo.nSyncTimeout) ; GetValInNotes( sParams, "asyncTimeout", ",", ConnectionInfo.nAsyncTimeout) ; GetValInNotes( sParams, "abortConnect", ",", ConnectionInfo.bAbortConnect) ; GetValInNotes( sParams, "ssl", "," ,ConnectionInfo.bSsl) ; GetValInNotes( sParams, "allowAdmin", ",", ConnectionInfo.bAllowAdmin) ; return true ; } // 
--------------------------------------------------------------------------- // ------------------------------ Sync --------------------------------------- //---------------------------------------------------------------------------- //---------------------------------------------------------------------------- // Funzione di Connessione sincrona a Redis //---------------------------------------------------------------------------- bool ExeRedisConnect( const string& sConnection) { // Se connesione già presente, termino e creo la nuova if ( s_pRedisContext != nullptr) redisFree( s_pRedisContext) ; // Recupero i parametri dalla stringa di connessione RedisConnectionInfo ConnectionInfo ; if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) return false ; // Se connessione senza TimeOut if ( ConnectionInfo.nSyncTimeout <= 0) { // Imposto il contesto per connesione sincrona s_pRedisContext = redisConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ; if ( s_pRedisContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context") return false ; } else if ( s_pRedisContext->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + s_pRedisContext->errstr).c_str()) redisFree( s_pRedisContext) ; return false ; } } // Se connessione con parametri else { // TimeOut per connessione struct timeval TimeOutConnection{} ; TimeOutConnection.tv_sec = static_cast( ConnectionInfo.nSyncTimeout) ; TimeOutConnection.tv_usec = static_cast( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ; // Imposto il contesto per connesione sincrona con TimeOut s_pRedisContext = redisConnectWithTimeout( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort, TimeOutConnection) ; if ( s_pRedisContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context") return false ; } else if ( s_pRedisContext->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + s_pRedisContext->errstr).c_str()) redisFree( s_pRedisContext) 
; return false ; } // TimeOut per richiesta I/O struct timeval TimeOutIO{} ; TimeOutIO.tv_sec = static_cast( ConnectionInfo.nSyncTimeout) ; TimeOutIO.tv_usec = static_cast( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ; // Imposto TimeOut per I/O int nResult = redisSetTimeout( s_pRedisContext, TimeOutIO) ; if ( nResult != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error : Can't set timeout I/O") ; return false ; } } LOG_INFO( GetCmdLogger(), "Sync connection to redis !") // Verifico se la connessione è di tipo Sentinel redisReply* replySentinel = ( redisReply*)redisCommand( s_pRedisContext, "SENTINEL get-master-addr-by-name %s", ConnectionInfo.sServiceName.c_str()) ; if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) { if ( replySentinel != nullptr) freeReplyObject( replySentinel) ; } else if ( replySentinel->element[0] != nullptr && replySentinel->element[0]->str != nullptr && replySentinel->element[1] != nullptr && replySentinel->element[1]->str != nullptr) { // --- Nodo sentinella string sMasterHost = replySentinel->element[0]->str ; int nMasterPort = DEFAULT_SENTINEL_MASTER_PORT ; FromString( replySentinel->element[1]->str, nMasterPort) ; freeReplyObject( replySentinel) ; // Effettuo la connessione al master redis s_pRedisContext = redisConnect( sMasterHost.c_str(), nMasterPort) ; if ( s_pRedisContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context") return false ; } else if ( s_pRedisContext->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + s_pRedisContext->errstr).c_str()) redisFree( s_pRedisContext) ; return false ; } } // Verifico se richiesta autenticazione if ( ! 
ConnectionInfo.sPassword.empty()) { redisReply* reply = ( redisReply*)redisCommand( s_pRedisContext, "AUTH %s %s", ConnectionInfo.sUser.c_str(), ConnectionInfo.sPassword.c_str()) ; if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { LOG_INFO(GetCmdLogger(), "Error : Authentication failed") ; if ( reply == nullptr) freeReplyObject( reply) ; redisFree( s_pRedisContext) ; return false ; } freeReplyObject( reply) ; } // Seleziono il DataBase redisReply* reply = ( redisReply*)redisCommand( s_pRedisContext, "SELECT %d", ConnectionInfo.nDefaultDataBase) ; if ( reply == nullptr) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + s_pRedisContext->errstr).c_str()) ; return false ; } else if ( reply->type == REDIS_REPLY_ERROR) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + reply->str).c_str()) freeReplyObject( reply) ; return false ; } LOG_INFO( GetCmdLogger(), ( string{ "Connected to DB #"} + ToString( ConnectionInfo.nDefaultDataBase) + " !").c_str()) freeReplyObject( reply) ; return true ; } //---------------------------------------------------------------------------- // Funzione di Disconnessione sincrona a Redis //---------------------------------------------------------------------------- bool ExeRedisDisconnect( void) { // Se connessione non presente if ( s_pRedisContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Closing a Sync Connection never created") ; return false ; } // Se connessione in stato di errore if ( s_pRedisContext->err != 0) { LOG_INFO( GetCmdLogger(), "Warning : Closing Redis Sync Connection in error") ; redisFree( s_pRedisContext) ; s_pRedisContext = nullptr ; return false ; } // Effettuo disconnessione sincrona redisFree( s_pRedisContext) ; s_pRedisContext = nullptr ; LOG_INFO( GetCmdLogger(), "Sync Connection Closed !") ; return true ; } //---------------------------------------------------------------------------- // Funzione di scrittura sincrona chiave-valore 
//---------------------------------------------------------------------------- bool ExeRedisSetValFromKey( const string& sKey, const string& sVal) { // Se connesione non presente, errore if ( s_pRedisContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Connection") ; return false ; } // Se connessione in stato di errore, allora errore if ( s_pRedisContext->err != 0) { LOG_INFO( GetCmdLogger(), "Error : Invalid Connection") ; return false ; } // Il valore associato alla chiave può essere solo di tipo stringa redisReply* reply = ( redisReply*)redisCommand( s_pRedisContext, "SET %s %s", sKey.c_str(), sVal.c_str()) ; if ( reply == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Null reply") ; return false ; } else if ( reply->type != REDIS_REPLY_STATUS || string( reply->str ) != "OK") { LOG_INFO( GetCmdLogger(), "Error SET : No answer") ; freeReplyObject( reply) ; return false ; } LOG_INFO( GetCmdLogger(), ( string{ "New Key Added : ["} + sKey + ":" + sVal + "]").c_str()) freeReplyObject( reply) ; return true ; } //---------------------------------------------------------------------------- // Funzione di lettura sincrona per valore di una chiave //---------------------------------------------------------------------------- bool ExeRedisGetValFromKey( const string& sKey, string& sVal) { sVal.clear() ; // Se connesione non presente, errore if ( s_pRedisContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Connection") ; return false ; } // Se connessione in stato di errore, allora errore if ( s_pRedisContext->err != 0) { LOG_INFO( GetCmdLogger(), "Error : Invalid Connection") ; return false ; } // Recupero il tipo associato alla chiave da Redis redisReply* typeReply = ( redisReply*)redisCommand( s_pRedisContext, "TYPE %s", sKey.c_str()) ; if ( typeReply == nullptr || typeReply->type != REDIS_REPLY_STATUS) { string sErr = ( typeReply != nullptr ? 
string( typeReply->str) : "null reply") ; if ( typeReply == nullptr) freeReplyObject( typeReply) ; LOG_INFO( GetCmdLogger(), ( "Error : Failed to get key type, " + sErr).c_str()) ; return false ; } string sType = typeReply->str ; freeReplyObject( typeReply) ; // Determino il formato per la chiamata al DataBase Redis // Per ora accettate solamente valori string string sFormat = ( sType == KEY_TYPE_STRING ? "Get %s" : "") ; if ( sFormat.empty()) { LOG_ERROR( GetCmdLogger(), "Error : Not valid Key Type") ; return false ; } // Effettuo la chiamata redisReply* reply = nullptr ; reply = ( redisReply*)redisCommand( s_pRedisContext, sFormat.c_str(), sKey.c_str()) ; if ( reply == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Null reply") ; return false ; } // Recupero il Valore if ( sType == KEY_TYPE_STRING) { if ( reply->type != REDIS_REPLY_STRING) { LOG_INFO( GetCmdLogger(), "Error : Failed to read the value") ; freeReplyObject( reply) ; return false ; } sVal = string( reply->str) ; } freeReplyObject( reply) ; return true ; } // --------------------------------------------------------------------------- // ------------------------------ ASync -------------------------------------- //---------------------------------------------------------------------------- //---------------------------------------------------------------------------- // Funzione Ciclo degli eventi per chiamate CallBack //---------------------------------------------------------------------------- static void RedisEventLoop( redisAsyncContext* ctx, bool bIsPub) { // Verifica della validità del contesto if ( ctx == nullptr || ctx->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error: Invalid Redis context at start of event loop for " } + ( bIsPub ? "Pub" : "Sub")).c_str()) return ; } // Recupero del file descriptor del socket TCP usato da Redis. SOCKET sock = ctx->c.fd ; // Imposto condizione di ascolto degli eventi bool bLoopRunning = bIsPub ? 
s_bPubLoopRunning : s_bSubLoopRunning ; // Creazione dei due insiemi per il file descriptor ( lettura e scrittura ) // r = read, w = write fd_set rfds, wfds ; // Timeout per funzione select { secondi, millisecondi} // La funzione select attende che il socket sia pronto per lettura e scrittura // Resituisce : // SOCKET_ERROR in caso di errore // 0 se scade il timeout // > 0 se ci sono eventi da gestire timeval timeout = {0, 100000 } ; // 100ms // Ciclo while ( bLoopRunning) { // Se contesto non valido, interrompo il ciclo if ( ctx == nullptr || ( ctx->c.flags & REDIS_DISCONNECTING)) { LOG_INFO ( GetCmdLogger(), ( string{ "Redis event loop terminated for Async "} + ( bIsPub ? "Pub" : "Sub") + " Events").c_str()) break ; } // Recupero file descriptor di lettura e scrittura FD_ZERO( &rfds) ; FD_ZERO( &wfds) ; FD_SET( sock, &rfds) ; FD_SET( sock, &wfds) ; // Controllo il risultato ottenuto int nRet = select( 0, &rfds, &wfds, nullptr, &timeout) ; if ( nRet == SOCKET_ERROR) { // Se errore -> interruzione del ciclo LOG_INFO( GetCmdLogger(), ( string{ "Error : Socket select() failed -> "} + ToString( WSAGetLastError())).c_str()) ; break ; } else if ( nRet > 0) { // Se Socket pronto per lettura/ scrittua if ( ctx != nullptr && ! 
( ctx->c.flags & REDIS_DISCONNECTING) && ctx->c.fd != -1) { if ( FD_ISSET( sock, &rfds)) redisAsyncHandleRead( ctx) ; if ( FD_ISSET( sock, &wfds)) redisAsyncHandleWrite( ctx) ; } // Evito saturazione CPU this_thread::sleep_for( chrono::milliseconds( 10)) ; } // Reset del timeOut ad ogni ciclo timeout.tv_sec = 0 ; timeout.tv_usec = 100000 ; // 100ms } return ; } //---------------------------------------------------------------------------- // Funzione di controllo che la connessione sia attiva e il contesto valido // --------------------------------------------------------------------------- static bool CheckConnectionAndContext( redisAsyncContext* ctx, bool bIsPub) { // Se contesto nullo -> Errore if ( ctx == nullptr) { LOG_INFO( GetCmdLogger(), ( string{ "Error : Invalid Context for "} + ( bIsPub ? "Pub" : "Sub")).c_str()) return false ; } // Se connessione non effettuata -> Errore if ( bIsPub && ! s_bPubConnected) { LOG_INFO( GetCmdLogger(), "Error : aSync Pub connection not connected") ; redisAsyncFree( ctx) ; ctx = nullptr ; return false ; } if ( ! bIsPub && ! s_bSubConnected) { LOG_INFO( GetCmdLogger(), "Error : aSync Sub connection not closed") ; redisAsyncFree( ctx) ; ctx = nullptr ; return false ; } // Se connessione effettuata ma in stato di errore -> Errore if ( ctx->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Warning : Closing aSync Connection in error for "} + ( bIsPub ? 
"Pub" : "Sub")).c_str()) ; redisAsyncFree( ctx) ; ctx = nullptr ; return false ; } return true ; } //---------------------------------------------------------------------------- // Funzione di CallBack per PUBLISH, SUBSCRIBE e UNSUBSCRIBE //---------------------------------------------------------------------------- static void MessageCallback( redisAsyncContext* ctx, void* r, void*) { // Verifico che il contesto sia definito if ( ctx == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack") return ; } LOG_INFO( GetCmdLogger(), "RedisMessageCallback invoked") // Recupero della risposta dal server redis redisReply* reply = static_cast( r) ; if ( reply == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Redis reply null in CallBack") return ; } else if ( reply->type == REDIS_REPLY_INTEGER) { // Recupero il numero di clients iscritti al canale dove ho effettuato una publish LOG_INFO( GetCmdLogger(), ( string{ "Message published to "} + ToString( reply->integer) + " clients !").c_str()) ; return ; } else if ( reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) { // Recupero la tipologia di chiamata effettuata const char* msgType = ( reply->element[0] != nullptr && reply->element[0]->str != nullptr ? 
reply->element[0]->str : "") ; // --- Se Messaggio if ( strcmp( msgType, "message") == 0 && reply->elements == 3) { // Messaggio ricevut da un canale if ( reply->element[1] != nullptr && reply->element[1]->str != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) { string sChannel = reply->element[1]->str ; string sMessage = reply->element[2]->str ; LOG_INFO( GetCmdLogger(), ( string{ "Message Received on ["} + sChannel + "] : " + sMessage).c_str()) } else { LOG_INFO( GetCmdLogger(), "Errror : Invalid Message received") return ; } } // --- Se Subscribe else if ( strcmp( msgType, "subscribe") == 0 && reply->elements >= 3) { if ( reply->element[1] != nullptr && reply->element[1] != nullptr && reply->element[2] != nullptr) { string sChannel = reply->element[1]->str ; int nCount = reply->element[2]->integer ; LOG_INFO( GetCmdLogger(), ( string{ "Subscribed to ["} + sChannel + "]," + " total subscribtions : " + ToString( nCount)).c_str()) ; } else { LOG_INFO( GetCmdLogger(), "Errror : Invalid Subscription reply") return ; } } // --- se Unsubscribe else if ( strcmp( msgType, "unsubscribe") == 0 && reply->elements >= 3) { if ( reply->element[1] != nullptr && reply->element[1] != nullptr && reply->element[2] != nullptr) { string sChannel = reply->element[1]->str ; int nCount = reply->element[2]->integer ; LOG_INFO( GetCmdLogger(), ( string{ "Unsubscribed to ["} + sChannel + "]," + " subscriptions left : " + ToString( nCount)).c_str()) } else { LOG_INFO( GetCmdLogger(), "Errror : Invalid Unbubscription reply") return ; } } // --- Undefined else { LOG_INFO( GetCmdLogger(), "Undefined reply in CallBack from Redis") return ; } } return ; } //---------------------------------------------------------------------------- // Funzione di CallBack per WaitForReadisMessage //---------------------------------------------------------------------------- static void WaitMessageCallback( redisAsyncContext* ctx, void* r, void*) { // Verifico che il contesto sia 
definito if ( ctx == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack") return ; } LOG_INFO( GetCmdLogger(), "RedisMessageCallback invoked") // Recupero della risposta dal server redis redisReply* reply = static_cast( r) ; if ( reply == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Redis reply null in CallBack") return ; } else if ( reply->type != REDIS_REPLY_ARRAY || reply->elements < 3) LOG_INFO( GetCmdLogger(), "Error : Invalid Message CallBack reply") else { // Recupero il Messaggio ( se callBack riferita al tipo "message") if ( reply->element[0] == nullptr || reply->element[0]->str == nullptr) LOG_INFO( GetCmdLogger(), "Error : Null message type received") else { const char* msgType = reply->element[0]->str ; if ( strcmp( msgType, "message") == 0) { if ( reply->element[2] == nullptr || reply->element[2]->str == nullptr) LOG_INFO( GetCmdLogger(), "Error : Null message received") else { s_sMessage = reply->element[2]->str ; s_bMessage = true ; } } } } } //---------------------------------------------------------------------------- // Funzione per Memorizzare il DataBase Redis da selezionare [Callback] //---------------------------------------------------------------------------- static bool SetPendingDataBase( int nDB) { // I DataBase su Redis sono al più 15 if ( REDIS_MIN_DB <= nDB && nDB <= REDIS_MAX_DB) { s_nPendingDataBase = nDB ; return true ; } return false ; } //---------------------------------------------------------------------------- // Funzione per Ricavare il numero di DataBase Redis da selezionare [Callback] //---------------------------------------------------------------------------- static int GetPendingDataBase() { return s_nPendingDataBase ; } // --------------------------------------------------------------------------- // Funzione per Memorizzare l'utente di autenticazione [Callback] // --------------------------------------------------------------------------- static void SetPendingUser( string sUsr) { s_sUser = 
sUsr ; } // -------------------------------------------------------------------------- // Funzione per ottenere l'utente di autenticazione [Callback] // -------------------------------------------------------------------------- static string GetPendingUser() { return s_sUser ; } // --------------------------------------------------------------------------- // Funzione per Memorizzare la Password di autenticazione [Callback] // --------------------------------------------------------------------------- static void SetPendingPassword( string sPsw) { s_sPassword = sPsw ; } // -------------------------------------------------------------------------- // Funzione per ottenere la Password di autenticazione [Callback] // -------------------------------------------------------------------------- static string GetPendingPassword() { return s_sPassword ; } //---------------------------------------------------------------------------- static bool GetConnectionContext( redisAsyncContext*& ctx, const string& sHost, int nPort) { ctx = redisAsyncConnect( sHost.c_str(), nPort) ; if ( ctx == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate Redis Async context") return false ; } if ( ctx->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + ctx->errstr).c_str()) redisAsyncFree( ctx) ; ctx = nullptr ; WSACleanup() ; return false ; } return true ; } //---------------------------------------------------------------------------- static void CleanupWinsock( void) { // entrambi i contesti devono essere nulli if ( s_pRedisAsyncPubContext == nullptr && s_pRedisAsyncSubContext == nullptr) { // entrambi i loop devono essere terminati if ( ! s_bPubLoopRunning && ! 
s_bSubLoopRunning) { WSACleanup() ; LOG_INFO( GetCmdLogger(), "Winsock cleaned up") ; } } } //---------------------------------------------------------------------------- // Funzione di Connessione asincrona a Redis //---------------------------------------------------------------------------- bool ExeRedisAsyncConnect( const string& sConnection) { // Flag di connessione s_bPubConnected = false ; s_bSubConnected = false ; // Recupero i parametri dalla stringa di connessione RedisConnectionInfo ConnectionInfo ; if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) return false ; // Se connesione già presente, termino e creo la nuova if ( s_pRedisAsyncPubContext != nullptr || s_pRedisAsyncSubContext != nullptr) ExeRedisAsyncDisconnect() ; // Memorizzo il valore dello User, della Password e del DataBase, serviranno una volta // che la connessione è effettivamente stabilita e corretta ( -> CallBack di connessione) SetPendingUser( ConnectionInfo.sUser) ; SetPendingPassword( ConnectionInfo.sPassword) ; if ( ! SetPendingDataBase( ConnectionInfo.nDefaultDataBase)) { LOG_INFO( GetCmdLogger(), "Error : DataBase number must be in range [0,15]") ; return false ; } // Inizializzazione libreria Winsock, necessaria per usare le funzionalità di rete su Windows, // come socket TCP/IP. 
Versione utilizzata 2.2 WSADATA wsaData ; int nStatus = WSAStartup( MAKEWORD( 2, 2), &wsaData) ; if ( nStatus != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : WSAStartup -> "} + ToString( nStatus)).c_str()) ; return false ; } // Verifico se si tratta di un nodo Sentinella redisContext* sentinelCtx = redisConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ; if ( sentinelCtx == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context") return false ; } else if ( sentinelCtx->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + sentinelCtx->errstr).c_str()) redisFree( sentinelCtx) ; return false ; } redisReply* replySentinel = ( redisReply*)redisCommand( sentinelCtx, "SENTINEL get-master-addr-by-name %s", ConnectionInfo.sServiceName.c_str()) ; if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) { if ( replySentinel != nullptr) freeReplyObject( replySentinel) ; } else if ( replySentinel->element[0] != nullptr && replySentinel->element[0]->str != nullptr && replySentinel->element[1] != nullptr && replySentinel->element[1]->str != nullptr) { // --- Nodo sentinella ConnectionInfo.sHost = replySentinel->element[0]->str ; ConnectionInfo.nPort = DEFAULT_SENTINEL_MASTER_PORT ; FromString( replySentinel->element[1]->str, ConnectionInfo.nPort) ; freeReplyObject( replySentinel) ; redisFree( sentinelCtx) ; } // Definisco le 2 connessioni asincroni con Host e Porta ( per PUBLISH e SUBSCRIBE/UNSUBSCRIBE) if ( ! GetConnectionContext( s_pRedisAsyncPubContext, ConnectionInfo.sHost, ConnectionInfo.nPort) || ! GetConnectionContext( s_pRedisAsyncSubContext, ConnectionInfo.sHost, ConnectionInfo.nPort)) return false ; LOG_INFO( GetCmdLogger(), "Connected to Redis ! 
( Pub/Sub)") // Imposto e Definisco la funzione di CallBack per contesto PUB per Connessione asincrona redisAsyncSetConnectCallback( s_pRedisAsyncPubContext, []( const redisAsyncContext* ctx, int nStatus) { // --- Se contesto nullo, errore if ( ctx == nullptr || nStatus != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error : Async Connect CallBack Pub failed ") ; return ; } // --- Se Autenticazione richiesta -> AUTH e SELECT string sPassword = GetPendingPassword() ; if ( ! sPassword.empty()) { redisAsyncCommand( s_pRedisAsyncPubContext, []( redisAsyncContext* ctx, void* r, void*) { // Invio la richiesta redisReply* reply = static_cast( r) ; if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null asnwer" ; LOG_INFO( GetCmdLogger(), ( string{ " Error : Authentication -> "} + sErrMsg).c_str()) return ; } LOG_INFO( GetCmdLogger(), "Valid Authentication for Pub connection") redisAsyncCommand( ctx, []( redisAsyncContext* ctx, void* r, void*) { redisReply* reply = static_cast( r) ; if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ; LOG_INFO( GetCmdLogger(), ( string{ "Error : DataBase selection -> "} + sErrMsg).c_str()) ; return ; } LOG_INFO( GetCmdLogger(), ( string{ "Connected to DataBase #"} + ToString( GetPendingDataBase()) + " for Pub commands !").c_str()) ; s_bPubConnected = true ; }, nullptr, "SELECT %d", GetPendingDataBase() ) ; }, nullptr, "AUTH %s %s", GetPendingUser().c_str(), GetPendingPassword().c_str() ) ; } // Se nessuna autenticazione -> SELECT else { redisAsyncCommand( s_pRedisAsyncPubContext, []( redisAsyncContext* ctx, void* r, void*) { redisReply* reply = static_cast( r) ; if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? 
reply->str : "Null answer" ; LOG_INFO( GetCmdLogger(), ( string{ "Error : DataBase selection -> "} + sErrMsg).c_str()) ; return ; } LOG_INFO( GetCmdLogger(), ( string{ "Connected to DataBase #"} + ToString( GetPendingDataBase()) + " for Pub commands !").c_str()) s_bPubConnected = true ; }, nullptr, "SELECT %d", GetPendingDataBase() ) ; } } ) ; // Imposto e Definisco la funzione di CallBack per contesto SUB per Connessione asincrona redisAsyncSetConnectCallback( s_pRedisAsyncSubContext, []( const redisAsyncContext* ctx, int nStatus) { // --- Se contesto nullo, errore if ( ctx == nullptr || nStatus != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error : Async Connect CallBack Pub failed ") ; return ; } // --- Se Autenticazione richiesta -> AUTH e SELECT string sPassword = GetPendingPassword() ; if ( ! sPassword.empty()) { redisAsyncCommand( s_pRedisAsyncSubContext, []( redisAsyncContext* ctx, void* r, void*) { // Invio la richiesta redisReply* reply = static_cast( r) ; if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null asnwer" ; LOG_INFO( GetCmdLogger(), ( string{ " Error : Authentication -> "} + sErrMsg).c_str()) return ; } LOG_INFO( GetCmdLogger(), "Valid Authentication for Pub connection") redisAsyncCommand( ctx, []( redisAsyncContext* ctx, void* r, void*) { redisReply* reply = static_cast( r) ; if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? 
reply->str : "Null answer" ; LOG_INFO( GetCmdLogger(), ( string{ "Error : DataBase selection -> "} + sErrMsg).c_str()) ; return ; } LOG_INFO( GetCmdLogger(), ( string{ "Connected to DataBase #"} + ToString( GetPendingDataBase()) + " for Pub commands !").c_str()) ; s_bSubConnected = true ; }, nullptr, "SELECT %d", GetPendingDataBase() ) ; }, nullptr, "AUTH %s %s", GetPendingUser().c_str(), GetPendingPassword().c_str() ) ; } // Se nessuna autenticazione -> SELECT else { redisAsyncCommand( s_pRedisAsyncSubContext, []( redisAsyncContext* ctx, void* r, void*) { redisReply* reply = static_cast( r) ; if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ; LOG_INFO( GetCmdLogger(), ( string{ "Error : DataBase selection -> "} + sErrMsg).c_str()) ; return ; } LOG_INFO( GetCmdLogger(), ( string{ "Connected to DataBase #"} + ToString( GetPendingDataBase()) + " for Pub commands !").c_str()) s_bSubConnected = true ; }, nullptr, "SELECT %d", GetPendingDataBase() ) ; } } ) ; // Imposto e Definisco la funzione di CallBack per la Disconnessione Pub asincrona redisAsyncSetDisconnectCallback( s_pRedisAsyncPubContext, []( const redisAsyncContext* ctx, int status) { LOG_INFO( GetCmdLogger(), "Disconnected from Redis ( Pub )") ; s_bPubLoopRunning = false ; s_pRedisAsyncPubContext = nullptr ; this_thread::sleep_for( chrono::milliseconds( 50)) ; // per sicurezza CleanupWinsock() ; } ) ; // Imposto e Definisco la funzione di CallBack per la Disconnessione Sub asincrona redisAsyncSetDisconnectCallback( s_pRedisAsyncSubContext, []( const redisAsyncContext* ctx, int status) { LOG_INFO( GetCmdLogger(), "Disconnected from Redis ( Sub )") ; s_pRedisAsyncSubContext = nullptr ; s_bSubLoopRunning = false ; this_thread::sleep_for( chrono::milliseconds( 50)) ; // per sicurezza CleanupWinsock() ; } ) ; // Definizione e partenza del ciclo degli eventi in un thread separato ( in BackGround) s_bPubLoopRunning 
= true ; s_bSubLoopRunning = true ; thread tPubEventLoop( RedisEventLoop, s_pRedisAsyncPubContext, true) ; thread tSubEventLoop( RedisEventLoop, s_pRedisAsyncSubContext, false) ; tPubEventLoop.detach() ; tSubEventLoop.detach() ; // Se richiesto massimo tempo di connessione, aspetto if ( ConnectionInfo.nAsyncTimeout > 0.) { auto tStart = chrono::steady_clock::now() ; auto tTimeOut = chrono::milliseconds( static_cast( ConnectionInfo.nAsyncTimeout)) ; while ( ! s_bSubConnected && ! s_bPubConnected) { this_thread::sleep_for( chrono::milliseconds( 10)) ; if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Error in Connection : TimeOut exceeded") return false ; } } auto tElapsed = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; LOG_INFO( GetCmdLogger(), ( string{ "Connected in "} + ToString( tElapsed) + " ms").c_str()) } return true ; } //---------------------------------------------------------------------------- // Funzione di Disconnessione asincrona a Redis //---------------------------------------------------------------------------- bool ExeRedisAsyncDisconnect( void) { // Controllo che la connessione sia valida e il contesto ben definito bool bOkPub = CheckConnectionAndContext( s_pRedisAsyncPubContext, true) ; bool bOkSub = CheckConnectionAndContext( s_pRedisAsyncSubContext, false) ; if ( ! bOkSub || ! 
bOkPub) return false ; // Effettuo la Disconnesione asincrona redisAsyncDisconnect( s_pRedisAsyncPubContext) ; // libera la memoria in automatico redisAsyncDisconnect( s_pRedisAsyncSubContext) ; // libera la memoria in automatico LOG_INFO( GetCmdLogger(), "Async Connection Closed !") ; return true ; } //---------------------------------------------------------------------------- // Funzione di Publish asincrona a Redis //---------------------------------------------------------------------------- bool ExeRedisAsyncPublish( const string& sChannel, const string& sMessage) { // Controllo che la connessione Pub sia valida e il contesto Pub ben definito if ( ! CheckConnectionAndContext( s_pRedisAsyncPubContext, true)) return false ; // Se non ho alcun canale -> errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") return false ; } // Se non ho alcun messaggio -> warning if ( sMessage.empty()) LOG_INFO( GetCmdLogger(), "Warning : Empty Message") ; // Eseguo il comando PUBLISH if ( redisAsyncCommand( s_pRedisAsyncPubContext, MessageCallback, nullptr, "PUBLISH %s %s", sChannel.c_str(), sMessage.c_str()) != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error: redisAsyncCommand PUBLISH failed") return false ; } return true ; } //---------------------------------------------------------------------------- // Funzione di Subscribe asincrona a Redis //---------------------------------------------------------------------------- bool ExeRedisAsyncSubscribe( const string& sChannel) { // Controllo che la connessione sia valida e il contesto ben definito if ( ! 
CheckConnectionAndContext( s_pRedisAsyncSubContext, false)) return false ; // Se non ho alcun canale, errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") ; return false ; } // Eseguo il comando Subscribe if ( redisAsyncCommand( s_pRedisAsyncSubContext, MessageCallback, nullptr, ( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand SUBSCRIBE failed") return false ; } return true ; } //---------------------------------------------------------------------------- // Funzione di Unsubscribe asincrona a Redis //---------------------------------------------------------------------------- bool ExeRedisAsyncUnsubscribe( const string& sChannel) { // Controllo che la connessione sia valida e il contesto ben definito if ( ! CheckConnectionAndContext( s_pRedisAsyncSubContext, false)) return false ; // Se non ho alcun canale, errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") return false ; } // Eseguo il comando Unsubscribe if ( redisAsyncCommand( s_pRedisAsyncSubContext, MessageCallback, nullptr, ( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand UNSUBSCRIBE failed") return false ; } return true ; } //---------------------------------------------------------------------------- // Funzione di che aspetta un messaggio asincro su un canale Redis //---------------------------------------------------------------------------- // La funzione esegue una Subscribe e una Unsubscribe // Se entro il tempo stabilito riceve un messaggio, lo comunica, altrimenti errore bool ExeRedisAsyncSubscribeOneMessage( const string& sChannel, double dMaxTimeOut, string& sMessage) { // Controllo che la connessione sia valida e il contesto ben definito if ( ! 
CheckConnectionAndContext( s_pRedisAsyncPubContext, true)) return false ; // Se non ho alcun canale, errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") return false ; } // Controllo validità tempo di TimeOut if ( dMaxTimeOut <= 0.) { LOG_INFO( GetCmdLogger(), "Error : Invalid timeout value") ; return false ; } // flag per ricezione del messaggio s_bMessage = false ; s_sMessage.clear() ; // Invio del comando di SubScribe if ( redisAsyncCommand( s_pRedisAsyncSubContext, WaitMessageCallback, nullptr, ( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) { LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to subscribe to"} + sChannel).c_str()) return false ; } // Attivo il TimeOut per l'attesa del messaggio auto tStart = chrono::steady_clock::now() ; auto tTimeOut = chrono::milliseconds( static_cast( dMaxTimeOut)) ; while ( ! s_bMessage) { this_thread::sleep_for( chrono::milliseconds( 10)) ; if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), ( string{ "Timeout : No Message received on ["} + sChannel + "]").c_str()) return false ; } } // Invio del comando Unsubscribe if ( redisAsyncCommand( s_pRedisAsyncSubContext, WaitMessageCallback, nullptr, ( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) { LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to Unsubscribe to "} + sChannel).c_str()) return false ; } // Controllo se ho ricevuto un messaggio if ( s_bMessage) { sMessage = s_sMessage ; LOG_INFO( GetCmdLogger(), ( string{ "Message received on ["} + sChannel + "]: " + sMessage).c_str()) return true ; } LOG_INFO( GetCmdLogger(), ( string{ "Timeout: No message received on ["} + sChannel + "]").c_str()) ; return false ; }