//---------------------------------------------------------------------------- // EgalTech 2025-2025 //---------------------------------------------------------------------------- // File : EXE_Redis.cpp Data : 17.09.25 Versione : 2.7i3 // Contenuto : Funzioni per interfacciarsi con server Redis. // // // // Modifiche : 17.09.25 RE Creazione modulo. ( ver. 2.7i3) // Modifiche : 31.10.25 RE Aggiunta gestione connessioni multiple. ( ver. 2.7j4) // Modifiche : 04.12.25 RE Aggiunta Memorizzazione messaggio. ( ver 2.7l1) // // //---------------------------------------------------------------------------- //--------------------------- Include ---------------------------------------- #include #include #include "stdafx.h" #include "EXE.h" #include "EXE_Macro.h" #include "AuxTools.h" #include "/EgtDev/Include/EXeExecutor.h" #include "/EgtDev/Include/EGkStringUtils3d.h" #include "/EgtDev/Include/EGnStringKeyVal.h" #pragma warning( disable: 4244) // conversione da 'uint64_t' a 'size_t' #pragma warning( disable: 4200) // utilizzata estensione non standard: matrice di dimensioni zero in struct/union #include "/EgtDev/Extern/hiredis/Include/async.h" #include #include #include #include #include using namespace std ; // --------------------------------------------------------------------------- // Ambiente Debug // --------------------------------------------------------------------------- #define DEBUG 0 // --------------------------------------------------------------------------- // Definizione variabili generali Redis // --------------------------------------------------------------------------- static const int REDIS_MIN_DB = 0 ; static const int REDIS_MAX_DB = 15 ; static const int DEFAULT_PORT = 6379 ; static const int DEFAULT_SENTINEL_MASTER_PORT = 26379 ; static const string DEFAULT_USER = "default" ; static const int DEFAULT_DATABASE = 0 ; static const int DEFAULT_TIMEOUT = 15000 ; // [ms] static const int DEFAULT_ALIVE_TIME = 180 ; // [s] static const int MAX_CONNECTION_NBR = 10 ; static const int NO_CONNECTION = - 1 ; static const string KEY_TYPE_STRING = "string" ; static const string KEY_TYPE_LIST = "list" ; // non usato static const string KEY_TYPE_SET = "set" ; // non usato static const string KEY_TYPE_HASH = "hash" ; // non usato static const string KEY_TYPE_ZSET = "zset" ; // non usato static const string KEY_TYPE_JSON = "ReJSON-RL" ; // non usato, disponibile se versione >= 8 ( noi usiamo la 6) // Struttura per parametri della stringa di Connessione struct RedisConnectionInfo { string sHost, sServiceName, sUser, sPassword ; int nPort, nDefaultDataBase, nKeepAlive, nConnectTimeout, nSyncTimeout, nAsyncTimeout ; bool bAbortConnect, bSsl, bAllowAdmin ; RedisConnectionInfo() { sHost = "" ; sServiceName = "" ; sUser = DEFAULT_USER ; sPassword = "" ; nPort = DEFAULT_PORT ; nDefaultDataBase = DEFAULT_DATABASE ; nKeepAlive = DEFAULT_ALIVE_TIME ; nConnectTimeout = DEFAULT_TIMEOUT ; nSyncTimeout = DEFAULT_TIMEOUT ; nAsyncTimeout = DEFAULT_TIMEOUT ; bAbortConnect = false ; bSsl = false ; bAllowAdmin = false ; } ; RedisConnectionInfo( string sMyHost, string sMyServiceName, string sMyUser, string sMyPassword, int nMyConnection, int nMyPort, int nMyDefaultDataBase, int dMyKeepAlive, int dMyConnectTimeout, int dMySyncTimeout, int dMyAsyncTimeout, bool bMyAbortConnect, bool bMySsl, bool bMyAllowAdmin) : sHost( sMyHost), sServiceName( sMyServiceName), sUser( sMyUser), sPassword( sMyPassword), nPort( nMyPort), nDefaultDataBase( nMyDefaultDataBase), nKeepAlive( dMyKeepAlive), nConnectTimeout( dMyConnectTimeout), nSyncTimeout( dMySyncTimeout), nAsyncTimeout( dMyAsyncTimeout), bAbortConnect( bMyAbortConnect), bSsl( bMySsl), bAllowAdmin( bMyAllowAdmin) {} inline void Clear() { sHost = "" ; sServiceName = "" ; sUser = DEFAULT_USER ; sPassword = "" ; nPort = DEFAULT_PORT ; nDefaultDataBase = DEFAULT_DATABASE ; nKeepAlive = DEFAULT_ALIVE_TIME ; nConnectTimeout = DEFAULT_TIMEOUT ; nSyncTimeout = DEFAULT_TIMEOUT ; nAsyncTimeout = DEFAULT_TIMEOUT ; bAbortConnect = false ; bSsl = false ; bAllowAdmin = false ; } } ; // Struttura per Connessiona Sincrona struct RedisSync { atomic bFreeConnection ; redisContext* pRedisContext ; RedisConnectionInfo ConnectionInfo ; RedisSync() : bFreeConnection( true), pRedisContext( nullptr), ConnectionInfo() {} inline void Clear() { bFreeConnection = true ; redisFree( pRedisContext) ; pRedisContext = nullptr ; ConnectionInfo.Clear() ; } } ; static array s_vRedisClients ; // Struttura per stato di Connessione Asincrona struct RedisConnectionStatus { atomic nIdConnection ; atomic bConnected ; atomic bAlive ; atomic bAuthenticated ; atomic bSelectedDB ; atomic bDisconnected ; atomic bPublished ; atomic bSubscribed ; atomic bUnsubscribed ; atomic bMessage ; string sMessage ; // univoco grazie a bMessage RedisConnectionStatus() : nIdConnection( 0), bConnected( false), bAlive( false), bAuthenticated( false), bSelectedDB( false), bDisconnected( false), bPublished( false), bSubscribed( false), bUnsubscribed( false), bMessage( false), sMessage( "") {} inline void Clear() { nIdConnection = 0 ; bConnected = false ; bAlive = false ; bAuthenticated = false ; bSelectedDB = false ; bDisconnected = false ; bPublished = false ; bSubscribed = false ; bUnsubscribed = false ; bMessage = false ; sMessage = "" ; } } ; // Classe per Connessione Asincrona class RedisAsync { public : atomic m_bFreeConnection ; redisAsyncContext* m_pRedisAsyncPubContext ; redisAsyncContext* m_pRedisAsyncSubContext ; string m_sConnectionString ; unordered_set m_set_SubChannels ; int m_nDataBase ; string m_sPassword ; string m_sUser ; atomic m_bPubLoopRunning ; atomic m_bSubLoopRunning ; RedisConnectionInfo m_ConnectionInfo ; RedisConnectionStatus m_ConnectionSubStatus ; RedisConnectionStatus m_ConnectionPubStatus ; thread m_PubThread ; thread m_SubThread ; WSADATA m_wsaData ; RedisAsync() : m_bFreeConnection( true), m_pRedisAsyncPubContext( nullptr), m_pRedisAsyncSubContext( nullptr), m_sConnectionString( ""), m_set_SubChannels(), m_nDataBase( REDIS_MIN_DB), m_sPassword( ""), m_sUser( DEFAULT_USER), m_bPubLoopRunning( false), m_bSubLoopRunning( false), m_ConnectionInfo(), m_ConnectionSubStatus(), m_ConnectionPubStatus() {} ; // thread è già inizializzato con il suo costruttore di Default valido inline void Clear() { // Loop da terminare m_bPubLoopRunning.store( false, memory_order_release) ; m_bSubLoopRunning.store( false, memory_order_release) ; // Sveglio i Thread bloccanti if ( m_pRedisAsyncPubContext) redisAsyncDisconnect( m_pRedisAsyncPubContext) ; if ( m_pRedisAsyncSubContext) redisAsyncDisconnect( m_pRedisAsyncSubContext) ; // Join dei Thread if ( m_PubThread.joinable()) m_PubThread.join() ; if ( m_SubThread.joinable()) m_SubThread.join() ; // Rilascio dei contesti Redis if ( m_pRedisAsyncPubContext) { redisAsyncFree( m_pRedisAsyncPubContext) ; m_pRedisAsyncPubContext = nullptr ; } if ( m_pRedisAsyncSubContext) { redisAsyncFree( m_pRedisAsyncSubContext) ; m_pRedisAsyncSubContext = nullptr ; } // Reset dello Stato m_bFreeConnection.store( true, memory_order_release) ; m_sConnectionString.clear() ; m_nDataBase = REDIS_MIN_DB ; m_sPassword.clear() ; m_sUser = DEFAULT_USER ; m_ConnectionInfo.Clear() ; m_ConnectionSubStatus.Clear() ; m_ConnectionPubStatus.Clear() ; m_set_SubChannels.clear() ; } ~RedisAsync() { if ( m_PubThread.joinable()) m_PubThread.join() ; if ( m_SubThread.joinable()) m_SubThread.join(); } public : void RedisEventLoop( bool bIsSub) const ; } ; static array s_vAsyncRedisClients ; // Mappa lettura Messaggi per Modalità Asicnrona struct RedisAsyncMessage { atomic_flag Lock = ATOMIC_FLAG_INIT ; int nCount = 0 ; string sMessage ; } ; static unordered_map s_vAsyncRedisMessages ; // --------------------------------------------------------------------------- // [Redis] Funzione Ciclo degli eventi per Read/Write su Socket // --------------------------------------------------------------------------- void RedisAsync::RedisEventLoop( bool bIsPub) const { // Verifica della validità del contesto redisAsyncContext* ctx = ( bIsPub ? m_pRedisAsyncPubContext : m_pRedisAsyncSubContext) ; if ( ctx == nullptr || ctx->err != 0) return ; // se attivazione loop prima della inizializzazione callBack // Recupero del file descriptor del socket TCP usato da Redis. SOCKET sock = ctx->c.fd ; // Creazione dei due insiemi per il file descriptor ( lettura e scrittura ) // r = read, w = write fd_set rfds, wfds ; // Timeout per funzione select { secondi, millisecondi} // La funzione select attende che il socket sia pronto per lettura e scrittura // Resituisce : // SOCKET_ERROR in caso di errore // 0 se scade il timeout // > 0 se ci sono eventi da gestire timeval timeout = {0, 100000 } ; // 100ms // Ciclo while ( ( bIsPub ? m_bPubLoopRunning.load( memory_order_acquire) : m_bSubLoopRunning.load( memory_order_acquire))) { // Se contesto non valido, interrompo il ciclo if ( ctx == nullptr || ( ctx->c.flags & REDIS_DISCONNECTING)) break ; // Recupero file descriptor di lettura e scrittura FD_ZERO( &rfds) ; FD_ZERO( &wfds) ; FD_SET( sock, &rfds) ; FD_SET( sock, &wfds) ; // Controllo il risultato ottenuto int nRet = select( 0, &rfds, &wfds, nullptr, &timeout) ; if ( nRet == SOCKET_ERROR) { // Se errore -> interruzione del ciclo LOG_INFO( GetCmdLogger(), ( string{ "Error : Socket select() failed -> "} + ToString( WSAGetLastError())).c_str()) ; break ; } else if ( nRet > 0) { // Se Socket pronto per lettura/ scrittua if ( ctx != nullptr && ! ( ctx->c.flags & REDIS_DISCONNECTING) && ctx->c.fd != -1) { if ( FD_ISSET( sock, &rfds)) redisAsyncHandleRead( ctx) ; if ( FD_ISSET( sock, &wfds)) redisAsyncHandleWrite( ctx) ; } // Evito saturazione CPU this_thread::sleep_for( chrono::milliseconds( 1)) ; } // Reset del timeOut ad ogni ciclo timeout.tv_sec = 0 ; timeout.tv_usec = 100000 ; // 100ms } return ; } //---------------------------------------------------------------------------- // [Utility] Funzione di Controllo per Id Connessione // --------------------------------------------------------------------------- static bool CheckIdConnection( int nIdConnection) { if ( nIdConnection < 0 || nIdConnection >= MAX_CONNECTION_NBR) { LOG_INFO( GetCmdLogger(), ( string{ "Error : Not a Valid id Connestion : [1, "} + ToString( MAX_CONNECTION_NBR) + "]").c_str()) return false ; } return true ; } // --------------------------------------------------------------------------- // [Utility] Funzione di Split parametri per Stringa di Connessione // --------------------------------------------------------------------------- static bool GetParamsFromConnectionString( const string& sConnection, RedisConnectionInfo& ConnectionInfo) { // Se la stringa di connessione è vuota, errore if ( sConnection.empty()) { LOG_INFO( GetCmdLogger(), "Error : Empty Connection String") ; return false ; } // Ricerco le informazioni Splittando la stringa di connessione per "," string sHostPort, sParams ; SplitFirst( sConnection, ",", sHostPort, sParams) ; if ( ! sHostPort.empty()) { // Host e Porta string sHost, sPort ; SplitFirst( sHostPort, ":", sHost, sPort) ; if ( sHost.empty()) { LOG_INFO( GetCmdLogger(), "Error : Invalid Host") ; return false ; } int nPort = DEFAULT_PORT ; if ( ! sPort.empty()) { if ( ! FromString( sPort, nPort)) { LOG_INFO( GetCmdLogger(), "Error : Invalid Port") ; return false ; } } ConnectionInfo.sHost = sHost ; ConnectionInfo.nPort = nPort ; } // Recupero i parametri ( rimuovo tutti gli spazi ) sParams.erase( remove( sParams.begin(), sParams.end(), ' '), sParams.end()) ; GetValInNotes( sParams, "serviceName", ",", ConnectionInfo.sServiceName) ; GetValInNotes( sParams, "user", ",", ConnectionInfo.sUser) ; GetValInNotes( sParams, "password", ",", ConnectionInfo.sPassword) ; GetValInNotes( sParams, "DefaultDatabase", ",", ConnectionInfo.nDefaultDataBase) ; if ( ConnectionInfo.nDefaultDataBase < 0 || ConnectionInfo.nDefaultDataBase > REDIS_MAX_DB) { ConnectionInfo.Clear() ; LOG_INFO( GetCmdLogger(), "Error : DB Out of range [0, 15]") return false ; } GetValInNotes( sParams, "keepAlive", ",", ConnectionInfo.nKeepAlive) ; GetValInNotes( sParams, "connectTimeout", ",", ConnectionInfo.nConnectTimeout) ; GetValInNotes( sParams, "syncTimeout", ",", ConnectionInfo.nSyncTimeout) ; GetValInNotes( sParams, "asyncTimeout", ",", ConnectionInfo.nAsyncTimeout) ; GetValInNotes( sParams, "abortConnect", ",", ConnectionInfo.bAbortConnect) ; GetValInNotes( sParams, "ssl", "," ,ConnectionInfo.bSsl) ; GetValInNotes( sParams, "allowAdmin", ",", ConnectionInfo.bAllowAdmin) ; return true ; } // --------------------------------------------------------------------------- // [Utility] Funzione di Creazione Chiave di accesso alla Mappa dei Messaggi // --------------------------------------------------------------------------- static string GetMessageMapKey( int nIdConnection, const string& sChannel) { return ( ToString( nIdConnection) + "_" + sChannel) ; } //---------------------------------------------------------------------------- // Funzione per comando CONNECT Sincrono // --------------------------------------------------------------------------- bool ExeRedisConnect( const string& sConnection, int& nIdConnection) { // Inizializzo una nuova connessione, recuperando l'Id int nMyIdConnection = NO_CONNECTION ; for ( int i = 0 ; i < int( s_vRedisClients.size()) ; ++ i) { if ( s_vRedisClients[i].bFreeConnection) { nMyIdConnection = i ; break ; } } if ( nMyIdConnection == NO_CONNECTION) { LOG_INFO( GetCmdLogger(), ( string{ "Error : Limit connection Exceeded ( "} + ToString( MAX_CONNECTION_NBR) + " )").c_str()) return false ; } // Recupero i riferimenti necessari e blocco la connessione corrente RedisSync& SyncRedisClient = s_vRedisClients[nMyIdConnection] ; SyncRedisClient.bFreeConnection = false ; RedisConnectionInfo& ConnectionInfo = SyncRedisClient.ConnectionInfo ; // Recupero i parametri dalla stringa di connessione if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) { SyncRedisClient.Clear() ; return false ; } // Se connessione senza TimeOut if ( ConnectionInfo.nSyncTimeout <= 0) { // Imposto il contesto per connesione sincrona SyncRedisClient.pRedisContext = redisConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ; if ( SyncRedisClient.pRedisContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context") return false ; } else if ( SyncRedisClient.pRedisContext->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str()) SyncRedisClient.Clear() ; return false ; } } // Se connessione con parametri else { // TimeOut per connessione struct timeval TimeOutConnection{} ; TimeOutConnection.tv_sec = static_cast( ConnectionInfo.nSyncTimeout) ; TimeOutConnection.tv_usec = static_cast( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ; // Imposto il contesto per connesione sincrona con TimeOut SyncRedisClient.pRedisContext = redisConnectWithTimeout( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort, TimeOutConnection) ; if ( SyncRedisClient.pRedisContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context") return false ; } else if ( SyncRedisClient.pRedisContext->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str()) SyncRedisClient.Clear() ; return false ; } // TimeOut per richiesta I/O struct timeval TimeOutIO{} ; TimeOutIO.tv_sec = static_cast( ConnectionInfo.nSyncTimeout) ; TimeOutIO.tv_usec = static_cast( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ; // Imposto TimeOut per I/O int nResult = redisSetTimeout( SyncRedisClient.pRedisContext, TimeOutIO) ; if ( nResult != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error : Can't set timeout I/O") ; return false ; } } // Verifico se la Connessione è di tipo Sentinel redisReply* replySentinel = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "SENTINEL get-master-addr-by-name %s", ConnectionInfo.sServiceName.c_str()) ; if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) { if ( replySentinel != nullptr) freeReplyObject( replySentinel) ; } else if ( replySentinel->element[0] != nullptr && replySentinel->element[0]->str != nullptr && replySentinel->element[1] != nullptr && replySentinel->element[1]->str != nullptr) { // --- Nodo sentinella string sMasterHost = replySentinel->element[0]->str ; int nMasterPort = DEFAULT_SENTINEL_MASTER_PORT ; FromString( replySentinel->element[1]->str, nMasterPort) ; freeReplyObject( replySentinel) ; // Effettuo la connessione al master redis freeReplyObject( SyncRedisClient.pRedisContext) ; SyncRedisClient.pRedisContext = redisConnect( sMasterHost.c_str(), nMasterPort) ; if ( SyncRedisClient.pRedisContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context") return false ; } else if ( SyncRedisClient.pRedisContext->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str()) SyncRedisClient.Clear() ; return false ; } } LOG_INFO( GetCmdLogger(), "Connected to Redis !") // Verifico se richiesta autenticazione if ( ! ConnectionInfo.sPassword.empty()) { redisReply* reply = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "AUTH %s %s", ConnectionInfo.sUser.c_str(), ConnectionInfo.sPassword.c_str()) ; if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { LOG_INFO( GetCmdLogger(), "Error : Authentication failed") ; if ( reply == nullptr) freeReplyObject( reply) ; SyncRedisClient.Clear() ; return false ; } freeReplyObject( reply) ; } LOG_INFO( GetCmdLogger(), "Authenticated to Redis !") // Seleziono il DataBase redisReply* reply = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "SELECT %d", ConnectionInfo.nDefaultDataBase) ; if ( reply == nullptr) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str()) ; return false ; } else if ( reply->type == REDIS_REPLY_ERROR) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + reply->str).c_str()) freeReplyObject( reply) ; SyncRedisClient.Clear() ; return false ; } freeReplyObject( reply) ; LOG_INFO( GetCmdLogger(), "DB Selected to Redis !") ; // Restituisco l'Id della connessione [1, MAX_CONNECTION_NBR] ( per Lua) nIdConnection = nMyIdConnection + 1 ; return true ; } //---------------------------------------------------------------------------- // Funzione per comando DISCONNECT Sincrono // --------------------------------------------------------------------------- bool ExeRedisDisconnect( int nIdConnection) { // Verifico che l'Id di connesione sia valido int nMyIdConnection = nIdConnection - 1 ; if ( ! CheckIdConnection( nMyIdConnection)) return false ; // Verifico che la connessione sia attiva if ( s_vRedisClients[nMyIdConnection].bFreeConnection) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } // Pulisco la connessione corrente s_vRedisClients[nMyIdConnection].Clear() ; LOG_INFO( GetCmdLogger(), "Disconnected to Redis !") ; return true ; } //---------------------------------------------------------------------------- // Funzione per comando SET Sincrono ( sola scrittura {key:string, val:string}) //---------------------------------------------------------------------------- bool ExeRedisSetValFromKey( int nIdConnection, const string& sKey, const string& sVal) { // Verifico che l'Id di connesione sia valido int nMyIdConnection = nIdConnection - 1 ; if ( ! CheckIdConnection( nMyIdConnection)) return false ; // Verifico che la connessione sia attiva if ( s_vRedisClients[nMyIdConnection].bFreeConnection) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } // Recupero la connessione corrente RedisSync& RedisClient = s_vRedisClients[nMyIdConnection] ; // Il valore associato alla chiave può essere solo di tipo stringa redisReply* reply = ( redisReply*)redisCommand( RedisClient.pRedisContext, "SET %s %s", sKey.c_str(), sVal.c_str()) ; if ( reply == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Null reply") ; return false ; } else if ( reply->type != REDIS_REPLY_STATUS || string( reply->str ) != "OK") { LOG_INFO( GetCmdLogger(), "Error SET : No answer") ; freeReplyObject( reply) ; return false ; } freeReplyObject( reply) ; LOG_INFO( GetCmdLogger(), ( string{ "New Key Set : ["} + sKey + ":" + sVal + "]").c_str()) return true ; } //---------------------------------------------------------------------------- // Funzione per comando Get Sincrono ( sola lettura {key:string, val:string}) //---------------------------------------------------------------------------- bool ExeRedisGetValFromKey( int nIdConnection, const string& sKey, string& sVal) { sVal.clear() ; // Verifico che l'Id di connesione sia valido int nMyIdConnection = nIdConnection - 1 ; if ( ! CheckIdConnection( nMyIdConnection)) return false ; // Verifico che la connessione sia attiva if ( s_vRedisClients[nMyIdConnection].bFreeConnection) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } // Recupero la connessione corrente RedisSync& RedisClient = s_vRedisClients[nMyIdConnection] ; // Recupero il tipo associato alla chiave da Redis redisReply* typeReply = ( redisReply*)redisCommand( RedisClient.pRedisContext, "TYPE %s", sKey.c_str()) ; if ( typeReply == nullptr || typeReply->type != REDIS_REPLY_STATUS) { string sErr = ( typeReply != nullptr ? string( typeReply->str) : "null reply") ; if ( typeReply != nullptr) freeReplyObject( typeReply) ; LOG_INFO( GetCmdLogger(), ( "Error : Failed to get key type, " + sErr).c_str()) ; return false ; } string sType = typeReply->str ; freeReplyObject( typeReply) ; // Determino il formato per la chiamata al DataBase Redis // Per ora accettate solamente valori string string sFormat = ( sType == KEY_TYPE_STRING ? "Get %s" : "") ; if ( sFormat.empty()) { LOG_ERROR( GetCmdLogger(), "Error : Not valid Key Type") ; return false ; } // Effettuo la chiamata redisReply* reply = nullptr ; reply = ( redisReply*)redisCommand( RedisClient.pRedisContext, sFormat.c_str(), sKey.c_str()) ; if ( reply == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Null reply") ; return false ; } // Recupero il Valore if ( sType == KEY_TYPE_STRING) { if ( reply->type != REDIS_REPLY_STRING) { LOG_INFO( GetCmdLogger(), "Error : Failed to read the value") ; freeReplyObject( reply) ; return false ; } sVal = string( reply->str) ; } LOG_INFO( GetCmdLogger(), ( string{ "Key Read : ["} + sKey + ":" + sVal + "]").c_str()) freeReplyObject( reply) ; return true ; } // --------------------------------------------------------------------------- // [CB] Funzione di CallBack per comando SELECT Asincrono ( Selezione DB) // --------------------------------------------------------------------------- static void RedisDbSElectionCallBack( redisAsyncContext* ctx, void* r, void*) { // Invio la richiesta redisReply* reply = static_cast( r) ; if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ; LOG_INFO( GetCmdLogger(), ( string{ "Error : Db Selection -> "} + sErrMsg).c_str()) ; return ; } // Imposto flag di Autenticazione RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; if ( MyStatus == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } // Selezione riuscita MyStatus->bSelectedDB.store( true, memory_order_release) ; MyStatus->bSelectedDB.notify_one() ; } //---------------------------------------------------------------------------- // [CB] Funzione di CallBack per comando AUTH Asincrono ( Autenticazione) // --------------------------------------------------------------------------- static void RedisAuthenticationCallBack( redisAsyncContext* ctx, void* r, void*) { // Invio la richiesta redisReply* reply = static_cast( r) ; if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ; LOG_INFO( GetCmdLogger(), ( string{ "Error : Authentication -> "} + sErrMsg).c_str()) ; return ; } // Imposto flag di Autenticazione RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; if ( MyStatus == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } // Autenticazione riuscita MyStatus->bAuthenticated.store( true, memory_order_release) ; MyStatus->bAuthenticated.notify_one() ; } // --------------------------------------------------------------------------- // [CB] Funzione di CallBack per comandi PUBLISH, SUBSCRIBE, MESSAGE, UNSUBSCRIBE // Asincroni // --------------------------------------------------------------------------- static void RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*) { // Verifico che il contesto sia definito if ( ctx == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack") return ; } // Recupero della risposta dal server redis redisReply* reply = static_cast( r) ; if ( reply == nullptr) return ; LOG_INFO( GetCmdLogger(), " --- RedisMessageCallback invoked") // Caso PUBLISH: hiredis risponde con REDIS_REPLY_INTEGER (numero di subscribers) if ( reply->type == REDIS_REPLY_INTEGER) { // Recupero lo Stato RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; if ( MyStatus == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } // Imposto stato di pubblicazione MyStatus->bPublished.store( true, memory_order_release) ; MyStatus->bPublished.notify_one() ; // Recupero il numero di clients iscritti al canale LOG_INFO( GetCmdLogger(), ( string{ "Message published to "} + ToString( reply->integer) + " clients !").c_str()) ; return ; } // Caso ARRAY: Subscribe / Unsubscribe / Message else if ( reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) { // Recupero la tipologia di chiamata effettuata const char* msgType = ( reply->element[0] != nullptr && reply->element[0]->str != nullptr ? reply->element[0]->str : "") ; // --- Se Messaggio if ( strcmp( msgType, "message") == 0 && reply->elements == 3) { // Messaggio ricevuto da un canale if ( reply->element[1] != nullptr && reply->element[1]->str != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) { string sChannel{ reply->element[1]->str, reply->element[1]->len} ; string sMessage{ reply->element[2]->str, reply->element[2]->len} ; #if DEBUG LOG_INFO( GetCmdLogger(), ( string{ "Message Received on ["} + sChannel + "] : " + sMessage).c_str()) #endif // Recupero lo Stato RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; if ( MyStatus == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } // Aggiorno la Mappa dei Messaggi ( viene sempre sovrascritto all'ultimo) string sKey = GetMessageMapKey( MyStatus->nIdConnection, sChannel) ; auto Iter = s_vAsyncRedisMessages.find( sKey) ; if ( Iter != s_vAsyncRedisMessages.end()) { RedisAsyncMessage& LockedMessage = Iter->second ; while ( LockedMessage.Lock.test_and_set( memory_order_acquire)) LockedMessage.Lock.wait( true, memory_order_relaxed) ; ++ LockedMessage.nCount ; LockedMessage.sMessage = sMessage ; LockedMessage.Lock.clear( memory_order_release) ; LockedMessage.Lock.notify_one() ; } } else { LOG_INFO( GetCmdLogger(), "Errror : Invalid Message received") return ; } } // --- Se SUBSCRIBE else if ( strcmp( msgType, "subscribe") == 0 && reply->elements >= 3) { if ( reply->element[1] != nullptr && reply->element[1] != nullptr && reply->element[2] != nullptr) { // Recupero lo Stato RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; if ( MyStatus == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } // Imposto stato di Subscribe MyStatus->bSubscribed.store( true, memory_order_release) ; MyStatus->bSubscribed.notify_one() ; // Comunico il risultato string sChannel = reply->element[1]->str ; int nCount = reply->element[2]->integer ; LOG_INFO( GetCmdLogger(), ( string{ "Subscribed to ["} + sChannel + "]," + " total subscribtions : " + ToString( nCount)).c_str()) ; } else { LOG_INFO( GetCmdLogger(), "Errror : Invalid Subscription reply") return ; } } // --- se Unsubscribe else if ( strcmp( msgType, "unsubscribe") == 0 && reply->elements >= 3) { if ( reply->element[1] != nullptr && reply->element[1] != nullptr && reply->element[2] != nullptr) { // Recupero lo Stato RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; if ( MyStatus == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } // Imposto stato Unsubscribe MyStatus->bUnsubscribed.store( true, memory_order_release) ; MyStatus->bUnsubscribed.notify_one() ; // Comunico il risultato string sChannel = reply->element[1]->str ; int nCount = reply->element[2]->integer ; LOG_INFO( GetCmdLogger(), ( string{ "Unsubscribed to ["} + sChannel + "]," + " subscriptions left : " + ToString( nCount)).c_str()) } else { LOG_INFO( GetCmdLogger(), "Errror : Invalid Unsubscription reply") return ; } } // --- Undefined else { LOG_INFO( GetCmdLogger(), "Error : Undefined reply in CallBack from Redis") return ; } } return ; } // --------------------------------------------------------------------------- // [CB] Funzione di CallBack per ricezione MESSAGE Asincrona // --------------------------------------------------------------------------- static void WaitMessageCallback( redisAsyncContext* ctx, void* r, void*) { // Verifico che il contesto sia definito if ( ctx == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack") return ; } LOG_INFO( GetCmdLogger(), " --- RedisMessageCallback invoked") // Recupero della risposta dal server redis redisReply* reply = static_cast( r) ; if ( reply == nullptr) return ; if ( reply->type != REDIS_REPLY_ARRAY || reply->elements < 3) LOG_INFO( GetCmdLogger(), "Error : Invalid Message CallBack reply") else { // Recupero il Messaggio ( se callBack riferita al tipo "message") if ( reply->element[0] == nullptr || reply->element[0]->str == nullptr) LOG_INFO( GetCmdLogger(), "Error : Null message type received") else { const char* msgType = reply->element[0]->str ; if ( strcmp( msgType, "message") == 0) { if ( reply->element[2] == nullptr || reply->element[2]->str == nullptr) LOG_INFO( GetCmdLogger(), "Error : Null message received") else { // Recupero lo Stato RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; if ( MyStatus == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } // Imposto stato di pubblicazione MyStatus->bMessage = true ; MyStatus->sMessage = reply->element[2]->str ; } } } } } // --------------------------------------------------------------------------- // [CB] Funzione di CallBack per comando CONNECT Asincrono // --------------------------------------------------------------------------- static void RedisConnectCallBack( const redisAsyncContext* ctx, int nStatus) { // Verifico che il Contesto sia valido if ( ctx == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Async Connect CallBack failed ") ; return ; } // Recupero dal Contesto lo stato della connessione RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; if ( MyStatus == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } // Verifico che lo stato internno al contesto Redis sia OK if ( nStatus != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error : Redis Async connection failed") ; MyStatus->bAlive.store( false, memory_order_release) ; MyStatus->bConnected.store( false, memory_order_release) ; MyStatus->bConnected.notify_one() ; return ; } // Connessione riuscita MyStatus->bAlive.store( true, memory_order_release) ; MyStatus->bConnected.store( true, memory_order_release) ; MyStatus->bConnected.notify_one() ; } // --------------------------------------------------------------------------- // [CB] Funzione di CallBack per comando DISCONNECT Asincrono // --------------------------------------------------------------------------- static void RedisDisconnectCallBack( const redisAsyncContext* ctx, int nStatus) { // Verifico che il Contesto sia valido if ( ctx == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Null Context for Disconnetion CallBack ") ; return ; } // Recupero lo stato della connessione RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; if ( MyStatus == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } // nStatus non è affidabile in questo contesto, può essere != REDIS_OK anche in disconnessione normale // NB. nStatus è solo informativo, ignora tutte le disconnessioni reali LOG_INFO( GetCmdLogger(), "Redis Async Disconnected ( x2, called for PUB and SUB)") ; // Aggiorno gli stati ed eventuali thread in attesa MyStatus->bAlive.store( false, memory_order_release) ; MyStatus->bConnected.store( false, memory_order_release) ; MyStatus->bConnected.notify_one() ; MyStatus->bDisconnected.store( true, memory_order_release) ; MyStatus->bDisconnected.notify_one() ; } //---------------------------------------------------------------------------- // Funzione per comando CONNECT Asincrono // --------------------------------------------------------------------------- bool ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection) { // Inizializzo una nuova connessione, recuperando l'Id int nMyIdConnection = NO_CONNECTION ; for ( int i = 0 ; i < int( s_vAsyncRedisClients.size()) ; ++ i) { if ( s_vAsyncRedisClients[i].m_bFreeConnection.load( memory_order_acquire)) { nMyIdConnection = i ; break ; } } if ( nMyIdConnection == NO_CONNECTION) { LOG_INFO( GetCmdLogger(), ( string{ "Error : Limit connection Exceeded ( "} + ToString( MAX_CONNECTION_NBR) + " )").c_str()) return false ; } // Recupero lo Slot RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; AsyncRedisClient.Clear() ; // per possibile riconnessione AsyncRedisClient.m_bFreeConnection.store( false, memory_order_release) ; // Recupero i riferimenti necessari e blocco la connessione corrente RedisConnectionInfo& ConnectionInfo = AsyncRedisClient.m_ConnectionInfo ; // Recupero i parametri dalla stringa di connessione AsyncRedisClient.m_sConnectionString = sConnection ; // per riconnessione if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) { AsyncRedisClient.Clear() ; return false ; } // Inizializzazione libreria Winsock, necessaria per usare le funzionalità di rete su Windows, // come socket TCP/IP. Versione utilizzata 2.2 int nStatus = WSAStartup( MAKEWORD( 2, 2), &AsyncRedisClient.m_wsaData) ; if ( nStatus != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : WSAStartup -> "} + ToString( nStatus)).c_str()) ; AsyncRedisClient.Clear() ; return false ; } // Verifico se si tratta di un nodo Sentinella redisContext* sentinelCtx = redisConnect( AsyncRedisClient.m_ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ; if ( sentinelCtx == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context") AsyncRedisClient.Clear() ; return false ; } else if ( sentinelCtx->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + sentinelCtx->errstr).c_str()) redisFree( sentinelCtx) ; AsyncRedisClient.Clear() ; return false ; } redisReply* replySentinel = ( redisReply*)redisCommand( sentinelCtx, "SENTINEL get-master-addr-by-name %s", ConnectionInfo.sServiceName.c_str()) ; if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) { if ( replySentinel != nullptr) freeReplyObject( replySentinel) ; } else if ( replySentinel->element[0] != nullptr && replySentinel->element[0]->str != nullptr && replySentinel->element[1] != nullptr && replySentinel->element[1]->str != nullptr) { // --- Nodo sentinella ConnectionInfo.sHost = replySentinel->element[0]->str ; ConnectionInfo.nPort = DEFAULT_SENTINEL_MASTER_PORT ; FromString( replySentinel->element[1]->str, ConnectionInfo.nPort) ; freeReplyObject( replySentinel) ; redisFree( sentinelCtx) ; } // Definisco le 2 connessioni asincroni con Host e Porta ( per PUBLISH e SUBSCRIBE/UNSUBSCRIBE) AsyncRedisClient.m_pRedisAsyncPubContext = redisAsyncConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ; AsyncRedisClient.m_pRedisAsyncSubContext = redisAsyncConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ; if ( AsyncRedisClient.m_pRedisAsyncPubContext == nullptr || AsyncRedisClient.m_pRedisAsyncSubContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate Redis Async context") AsyncRedisClient.Clear() ; return false ; } else if ( AsyncRedisClient.m_pRedisAsyncPubContext->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + AsyncRedisClient.m_pRedisAsyncPubContext->errstr).c_str()) AsyncRedisClient.Clear() ; return false ; } else if ( AsyncRedisClient.m_pRedisAsyncSubContext->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + AsyncRedisClient.m_pRedisAsyncSubContext->errstr).c_str()) AsyncRedisClient.Clear() ; return false ; } // Definizione e partenza del ciclo degli eventi in un thread separato ( in BackGround) AsyncRedisClient.m_ConnectionPubStatus.Clear() ; AsyncRedisClient.m_ConnectionSubStatus.Clear() ; AsyncRedisClient.m_bPubLoopRunning.store( true, memory_order_release) ; AsyncRedisClient.m_bSubLoopRunning.store( true, memory_order_release) ; AsyncRedisClient.m_PubThread = thread( &RedisAsync::RedisEventLoop, &AsyncRedisClient, true) ; AsyncRedisClient.m_SubThread = thread( &RedisAsync::RedisEventLoop, &AsyncRedisClient, false) ; // Imposto e Definisco la funzione di CallBack per contesto PUB/SUB per Connessione/Disconnessione asincrona AsyncRedisClient.m_pRedisAsyncPubContext->data = &AsyncRedisClient.m_ConnectionPubStatus ; AsyncRedisClient.m_pRedisAsyncSubContext->data = &AsyncRedisClient.m_ConnectionSubStatus ; AsyncRedisClient.m_ConnectionPubStatus.bConnected.store( false, memory_order_release) ; AsyncRedisClient.m_ConnectionSubStatus.bConnected.store( false, memory_order_release) ; redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisConnectCallBack) ; redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisConnectCallBack) ; redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDisconnectCallBack) ; redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDisconnectCallBack) ; // Attendo la risposta di CallBack auto tStart = chrono::steady_clock::now() ; auto tTimeOut = chrono::milliseconds( static_cast( ConnectionInfo.nAsyncTimeout)) ; while ( ! AsyncRedisClient.m_ConnectionPubStatus.bConnected.load( memory_order_acquire)) { if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ; AsyncRedisClient.Clear() ; return false ; } // NB. non usare la wait( false) !!! se in rete perdo qualche informazione // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro this_thread::sleep_for( chrono::milliseconds( 10)) ; } while ( ! AsyncRedisClient.m_ConnectionSubStatus.bConnected.load( memory_order_acquire)) { if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ; AsyncRedisClient.Clear() ; return false ; } // NB. non usare la wait( false) !!! se in rete perdo qualche informazione // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro this_thread::sleep_for( chrono::milliseconds( 10)) ; } LOG_INFO( GetCmdLogger(), "Connected to Redis !") #if DEBUG double dMsConnectionTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsConnectionTime) + " ms").c_str()) #endif // Se presente una Password, eseguo l'autenticazione if ( ! AsyncRedisClient.m_sPassword.empty()) { AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.store( false, memory_order_relaxed) ; AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.store( false, memory_order_relaxed) ; redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisAuthenticationCallBack, nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ; redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisAuthenticationCallBack, nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ; tStart = chrono::steady_clock::now() ; tTimeOut = chrono::milliseconds( static_cast( ConnectionInfo.nAsyncTimeout)) ; while ( ! AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.load( memory_order_acquire)) { if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Timeout Autentication exceeded") ; AsyncRedisClient.Clear() ; return false ; } // NB. non usare la wait( false) !!! se in rete perdo qualche informazione // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro this_thread::sleep_for( chrono::milliseconds( 10)) ; } while ( ! AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.load( memory_order_acquire)) { if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Timeout Autentication exceeded") ; AsyncRedisClient.Clear() ; return false ; } // NB. non usare la wait( false) !!! se in rete perdo qualche informazione // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro this_thread::sleep_for( chrono::milliseconds( 10)) ; } LOG_INFO( GetCmdLogger(), "Authenticated to Redis !") #if DEBUG double dMsAuthenticationTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsAuthenticationTime) + " ms").c_str()) #endif } else { AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.store( true, memory_order_release) ; AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.store( true, memory_order_release) ; } // Se Presente una Selezione del Database, eseguo la selezione if ( AsyncRedisClient.m_nDataBase != DEFAULT_DATABASE) { AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.store( false, memory_order_relaxed) ; AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.store( false, memory_order_relaxed) ; redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDbSElectionCallBack, nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ; redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDbSElectionCallBack, nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ; tStart = chrono::steady_clock::now() ; tTimeOut = chrono::milliseconds( static_cast( ConnectionInfo.nAsyncTimeout)) ; while ( ! AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.load( memory_order_acquire)) { if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Timeout DB Selection exceeded") ; AsyncRedisClient.Clear() ; return false ; } // NB. non usare la wait( false) !!! se in rete perdo qualche informazione // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro this_thread::sleep_for( chrono::milliseconds( 10)) ; } while ( ! AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.load( memory_order_acquire)) { if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Timeout DB Selection exceeded") ; AsyncRedisClient.Clear() ; return false ; } // NB. non usare la wait( false) !!! se in rete perdo qualche informazione // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro this_thread::sleep_for( chrono::milliseconds( 10)) ; } LOG_INFO( GetCmdLogger(), "DB Selected to Redis !") ; #if DEBUG double dMsDbSelectionTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsDbSelectionTime) + " ms").c_str()) #endif } else { AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.store( true, memory_order_release) ; AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.store( true, memory_order_release) ; } // Restituisco e salvo l'Id della connessione [1, MAX_CONNECTION_NBR] ( per Lua) nIdConnection = nMyIdConnection + 1 ; AsyncRedisClient.m_ConnectionPubStatus.nIdConnection = nIdConnection ; AsyncRedisClient.m_ConnectionSubStatus.nIdConnection = nIdConnection ; return true ; } //---------------------------------------------------------------------------- // Funzione per comando DISCONNECT Asincrono // --------------------------------------------------------------------------- bool ExeRedisAsyncDisconnect( int nIdConnection) { // Verifico che l'Id di connesione sia valido int nMyIdConnection = nIdConnection - 1 ; if ( ! CheckIdConnection( nMyIdConnection)) return false ; // Recupero la connessione RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; // Verifico che la connessione sia utilizzata, e non libera if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection.load( memory_order_acquire)) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } // Reset dei Flag prima della connessione AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.store( false, memory_order_relaxed) ; AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.store( false, memory_order_relaxed) ; AsyncRedisClient.m_ConnectionPubStatus.bAlive.store( false, memory_order_relaxed) ; AsyncRedisClient.m_ConnectionSubStatus.bAlive.store( false, memory_order_relaxed) ; // Effettuo la Disconnesione asincrona ( libera la memoria in automatico) redisAsyncDisconnect( AsyncRedisClient.m_pRedisAsyncPubContext) ; redisAsyncDisconnect( AsyncRedisClient.m_pRedisAsyncSubContext) ; // Attendo la CallBack di disconnessione auto tStart = chrono::steady_clock::now() ; auto tTimeOut = chrono::milliseconds( static_cast( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ; while ( ! AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.load( memory_order_acquire)) { AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.wait( false) ; if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ; return false ; } } while ( ! AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.load( memory_order_acquire)) { AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.wait( false) ; if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ; return false ; } } // Rendo libera la connessione AsyncRedisClient.m_bFreeConnection.store( true) ; AsyncRedisClient.Clear() ; // Elimino i Messaggi dalla mappa for ( auto Iter = s_vAsyncRedisMessages.begin() ; Iter != s_vAsyncRedisMessages.end() ; ) { if ( Iter->first.starts_with( GetMessageMapKey( nIdConnection, ""))) Iter = s_vAsyncRedisMessages.erase( Iter) ; else ++ Iter ; } LOG_INFO( GetCmdLogger(), "Disconnected to Redis !") ; #if DEBUG double dMsDisconnectionTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsDisconnectionTime) + " ms").c_str()) #endif return true ; } // --------------------------------------------------------------------------- // Funzione per comando PUBLISH Asincrono // --------------------------------------------------------------------------- bool ExeRedisAsyncPublish( int nIdConnection, const string& sChannel, const string& sMessage) { // Verifico che l'Id di connesione sia valido int nMyIdConnection = nIdConnection - 1 ; if ( ! CheckIdConnection( nMyIdConnection)) return false ; // Verifico che la connessione sia attiva RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; if ( AsyncRedisClient.m_bFreeConnection.load( memory_order_acquire)) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } // Se non ho alcun canale -> errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") return false ; } // Se non ho alcun messaggio -> warning if ( sMessage.empty()) LOG_INFO( GetCmdLogger(), "Warning : Empty Message") ; // Numero massimo di Tentativi per effettuare PUBLISH const int MAX_RETRY = 3 ; for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) { // Rileggo il Client, dato che potrebbe essere cambiato dopo la riconnessione RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ; RedisConnectionStatus& curStatus = currClient.m_ConnectionPubStatus ; // Se la connessione non è attiva, provo a riconnetermi if ( ! currClient.m_ConnectionPubStatus.bAlive.load( memory_order_acquire)) { LOG_INFO( GetCmdLogger(), ( string{ "Publish: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ; // Recupero la stringa di connessione corrente string sConnStr = s_vAsyncRedisClients[nMyIdConnection].m_sConnectionString ; // Recupero i Canali su cui ero iscritto unordered_set setSubChannels = currClient.m_set_SubChannels ; // Pulisco il contesto corrente AsyncRedisClient.Clear() ; int nNewId = 0 ; bool bOkReconnect = false ; if ( ExeRedisAsyncConnect( sConnStr, nNewId)) { // Se ho scelto lo stesso slot, aggiorno nMyIdConnection int nNewIndex = nNewId - 1 ; if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) { // Sostituisco il riferimento Client con il nuovo if ( nNewIndex != nMyIdConnection) nMyIdConnection = nNewIndex ; } bOkReconnect = true ; // Mi iscrivo a tutti i canali a cui ero precedentemente iscritto for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) { if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter)) return false ; } } else { LOG_INFO( GetCmdLogger(), "Publish: reconnect attempt failed") ; // riprovo al ciclo successivo... } // Se non mi sono riconesso metto un piccolo Delay per estrema sicureza if ( ! bOkReconnect) { this_thread::sleep_for( chrono::milliseconds( 100)) ; continue ; } } // Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione) RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ; RedisConnectionStatus& nextStatus = nextClient.m_ConnectionPubStatus ; // Ricalcolo timeout dal client aggiornato auto tTimeOut = chrono::milliseconds( static_cast( nextClient.m_ConnectionInfo.nAsyncTimeout)) ; // Reset dello stato di pubblicazione nextStatus.bPublished.store( false, memory_order_relaxed) ; // Invio del comando PUBLISH int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncPubContext, RedisPubSubUnsubMsgCallback, nullptr, "PUBLISH %s %s", sChannel.c_str(), sMessage.c_str()) ; if ( nResult != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Publish: redisAsyncCommand failed") ; nextStatus.bAlive.store( false, memory_order_release) ; this_thread::sleep_for( chrono::milliseconds( 100)) ; continue ; } auto tStart = chrono::steady_clock::now() ; while ( ! nextStatus.bPublished.load( memory_order_acquire)) { if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Publish: timeout waiting for publish reply") ; break ; } // NB. non usare la wait( false) !!! se in rete perdo qualche informazione // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro this_thread::sleep_for( chrono::milliseconds( 10)) ; } if ( nextStatus.bPublished.load( memory_order_acquire)) { #if DEBUG double dMsPublishTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsPublishTime) + " ms").c_str()) ; #endif return true ; } LOG_INFO( GetCmdLogger(), ( string{ "Publish failed, attempt "} + ToString( nAttempt)).c_str()) ; // Breve pausa prima del Retry this_thread::sleep_for( chrono::milliseconds( 100)) ; } // Tutti i tentativi sono falliti return false ; } //---------------------------------------------------------------------------- // Funzione per comando SUBSCRIBE Asincrono // --------------------------------------------------------------------------- bool ExeRedisAsyncSubscribe( int nIdConnection, const string& sChannel) { // Verifico che l'Id di connesione sia valido int nMyIdConnection = nIdConnection - 1 ; if ( ! CheckIdConnection( nMyIdConnection)) return false ; // Verifico che la connessione sia attiva RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; if ( AsyncRedisClient.m_bFreeConnection.load( memory_order_acquire)) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } // Se non ho alcun canale, errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") ; return false ; } // Numero massimo di Tentativi per effettuare SUBSCRIBE const int MAX_RETRY = 3 ; for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) { // Rileggo il client e lo stato (potrebbero cambiare dopo reconnect) RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ; RedisConnectionStatus& currStatus = currClient.m_ConnectionSubStatus ; // Se la connessione non è viva, provo a riconnettere if ( ! currStatus.bAlive.load( memory_order_acquire)) { LOG_INFO( GetCmdLogger(), ( string{ "Subscribe: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ; // Recupero la stringa di connessione corrente string sConnStr = currClient.m_sConnectionString ; // Recupero i Canali su cui ero iscritto unordered_set setSubChannels = currClient.m_set_SubChannels ; // Pulisco il contesto corrente AsyncRedisClient.Clear() ; int nNewId = 0 ; bool bOkReconnect = false ; if ( ExeRedisAsyncConnect( sConnStr, nNewId)) { // Se ho scelto lo stesso slot, aggiorno nMyIdConnection int nNewIndex = nNewId - 1 ; if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) { // Sostituisco il riferimento Client con il nuovo if ( nNewIndex != nMyIdConnection) nMyIdConnection = nNewIndex ; } bOkReconnect = true ; // Mi iscrivo a tutti i canali a cui ero precedentemente iscritto for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) { if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter)) return false ; } } else { LOG_INFO( GetCmdLogger(), "Publish: reconnect attempt failed") ; // riprovo al ciclo successivo... } // Se non mi sono riconesso metto un piccolo Delay per estrema sicureza if ( ! bOkReconnect) { this_thread::sleep_for( chrono::milliseconds( 100)) ; continue ; } } // Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione) RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ; RedisConnectionStatus& nextStatus = nextClient.m_ConnectionSubStatus ; // Ricalcolo timeout dal client aggiornato auto tTimeOut = chrono::milliseconds( static_cast( nextClient.m_ConnectionInfo.nAsyncTimeout)) ; // Reset dello stato di pubblicazione nextStatus.bSubscribed.store( false, memory_order_relaxed) ; int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr, "SUBSCRIBE %s", sChannel.c_str()) ; if ( nResult != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Subscribe: redisAsyncCommand failed") ; nextStatus.bAlive.store( false, memory_order_release) ; this_thread::sleep_for( chrono::milliseconds( 100)) ; continue ; } auto tStart = chrono::steady_clock::now() ; while ( ! nextStatus.bSubscribed.load( memory_order_acquire)) { if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Publish: timeout waiting for publish reply") ; break ; } // NB. non usare la wait( false) !!! se in rete perdo qualche informazione // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro this_thread::sleep_for( chrono::milliseconds( 10)) ; } if ( nextStatus.bSubscribed.load( memory_order_acquire)) { // Uso l'ID corrente ( aggiornato) per la chiave messaggi int nEffectiveId = nMyIdConnection + 1 ; string sKey = GetMessageMapKey( nEffectiveId, sChannel) ; s_vAsyncRedisMessages.try_emplace( sKey) ; s_vAsyncRedisMessages[sKey].Lock.clear() ; #if DEBUG double dMsPublishTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsPublishTime) + " ms").c_str()) ; #endif // Memorizzo il canale nella set dei canali nextClient.m_set_SubChannels.insert( sChannel) ; return true ; } LOG_INFO( GetCmdLogger(), ( string{ "Subscribe failed, attempt "} + ToString( nAttempt)).c_str()) ; // Breve pausa prima del Retry this_thread::sleep_for( chrono::milliseconds( 100)) ; } // Tutti i tentativi sono falliti return false ; } // --------------------------------------------------------------------------- // Funzione per comando UNSUBSCRIBE Asincrono // --------------------------------------------------------------------------- bool ExeRedisAsyncUnsubscribe( int nIdConnection, const string& sChannel) { // Verifico che l'Id di connesione sia valido int nMyIdConnection = nIdConnection - 1 ; if ( ! CheckIdConnection( nMyIdConnection)) return false ; // Verifico che la connessione sia attiva RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection.load( memory_order_acquire)) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } // Se non ho alcun canale, errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") return false ; } const int MAX_RETRY = 3 ; for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) { // Rileggo il client e lo stato ( potrebbero cambiare dopo reconnect) RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ; RedisConnectionStatus& currStatus = currClient.m_ConnectionSubStatus ; // Se la connessione non è viva, provo a riconnettere if ( ! currStatus.bAlive.load( memory_order_acquire)) { LOG_INFO( GetCmdLogger(), ( string{ "Unubscribe: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ; // Recupero la stringa di connessione corrente string sConnStr = currClient.m_sConnectionString ; // Recupero i Canali su cui ero iscritto unordered_set setSubChannels = currClient.m_set_SubChannels ; // Pulisco il contesto corrente AsyncRedisClient.Clear() ; int nNewId = 0 ; bool bOkReconnect = false ; if ( ExeRedisAsyncConnect( sConnStr, nNewId)) { // Se ho scelto lo stesso slot, aggiorno nMyIdConnection int nNewIndex = nNewId - 1 ; if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) { // Sostituisco il riferimento Client con il nuovo if ( nNewIndex != nMyIdConnection) nMyIdConnection = nNewIndex ; } bOkReconnect = true ; // Mi iscrivo a tutti i canali a cui ero precedentemente iscritto for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) { if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter)) return false ; } } else { LOG_INFO( GetCmdLogger(), "Unsubscribe: reconnect attempt failed") ; // riprovo al ciclo successivo... } // Se non mi sono riconesso metto un piccolo Delay per estrema sicureza if ( ! bOkReconnect) { this_thread::sleep_for( chrono::milliseconds( 100)) ; continue ; } } // Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione) RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ; RedisConnectionStatus& nextStatus = nextClient.m_ConnectionSubStatus ; // Ricalcolo timeout dal client aggiornato auto tTimeOut = chrono::milliseconds( static_cast( nextClient.m_ConnectionInfo.nAsyncTimeout)) ; // Reset dello stato di pubblicazione nextStatus.bUnsubscribed.store( false, memory_order_relaxed) ; int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr, "UNSUBSCRIBE %s", sChannel.c_str()) ; if ( nResult != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand UNSUBSCRIBE failed") ; nextStatus.bAlive.store( false, memory_order_release) ; this_thread::sleep_for( chrono::milliseconds( 100)) ; continue ; } auto tStart = chrono::steady_clock::now() ; while ( ! nextStatus.bUnsubscribed.load( memory_order_acquire)) { if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Unsubscribe: timeout waiting for publish reply") ; break ; } // NB. non usare la wait( false) !!! se in rete perdo qualche informazione // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro this_thread::sleep_for( chrono::milliseconds( 10)) ; } if ( nextStatus.bUnsubscribed.load( memory_order_acquire)) { // Rimuovo la chiave dalla mappa messaggi usando l'ID effettivo (aggiornato) int nEffectiveId = nMyIdConnection + 1 ; string keyToRemove = GetMessageMapKey( nEffectiveId, sChannel) ; for ( auto it = s_vAsyncRedisMessages.begin() ; it != s_vAsyncRedisMessages.end() ; ) { if ( it->first == keyToRemove) it = s_vAsyncRedisMessages.erase( it) ; else ++ it ; } #if DEBUG double dMsUnsubscribeTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsUnsubscribeTime) + " ms").c_str()) ; #endif // Tolgo il canale dalla lista nextClient.m_set_SubChannels.erase( sChannel) ; return true ; } LOG_INFO( GetCmdLogger(), ( string{ "Unsubscribe failed, attempt "} + ToString( nAttempt)).c_str()) ; // Breve pausa prima del Retry this_thread::sleep_for( chrono::milliseconds( 100)) ; } // Tutti i tentativi sono falliti return false ; } // --------------------------------------------------------------------------- // La funzione esegue una Subscribe e una Unsubscribe // Se entro il tempo stabilito riceve un messaggio, lo comunica, altrimenti errore // --------------------------------------------------------------------------- bool ExeRedisAsyncSubscribeOneMessage( int nIdConnection, const string& sChannel, double dMaxTimeOut, string& sMessage) { // Verifico che l'Id di connesione sia valido int nMyIdConnection = nIdConnection - 1 ; if ( ! CheckIdConnection( nMyIdConnection)) return false ; // Verifico che la connessione sia attiva if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } // Se non ho alcun canale, errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") return false ; } // Controllo validità tempo di TimeOut if ( dMaxTimeOut <= 0.) { LOG_INFO( GetCmdLogger(), "Error : Invalid timeout value") ; return false ; } // Recupero la connessione RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; // Invio del comando di SUBSCRIBE AsyncRedisClient.m_ConnectionSubStatus.bMessage = false ; AsyncRedisClient.m_ConnectionSubStatus.sMessage = "" ; if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, WaitMessageCallback, nullptr, ( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) { LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to subscribe to"} + sChannel).c_str()) return false ; } // Attivo il TimeOut per l'attesa del messaggio auto tStart = chrono::steady_clock::now() ; auto tTimeOut = chrono::milliseconds( static_cast( dMaxTimeOut)) ; while ( ! AsyncRedisClient.m_ConnectionSubStatus.bMessage) { this_thread::sleep_for( chrono::milliseconds( 10)) ; if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), ( string{ "Timeout : No Message received on ["} + sChannel + "]").c_str()) return false ; } } // Invio del comando UNSUBSCRIBE if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, WaitMessageCallback, nullptr, ( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) { LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to Unsubscribe to "} + sChannel).c_str()) return false ; } // Controllo se ho ricevuto un messaggio if ( AsyncRedisClient.m_ConnectionSubStatus.bMessage) { sMessage = AsyncRedisClient.m_ConnectionSubStatus.sMessage ; LOG_INFO( GetCmdLogger(), ( string{ "Message received on ["} + sChannel + "]: " + sMessage).c_str()) return true ; } #if DEBUG LOG_INFO( GetCmdLogger(), ( string{ "Timeout: No message received on ["} + sChannel + "]").c_str()) ; #endif return false ; } // --------------------------------------------------------------------------- // Funzione per lettura Asicnrona dell'ultimo messaggio su un canale // --------------------------------------------------------------------------- bool ExeRedisAsyncGetMessage( int nIdConnection, const string& sChannel, int& nCount, string& sMessage) { // Verifico che l'Id di connesione sia valido int nMyIdConnection = nIdConnection - 1 ; if ( ! CheckIdConnection( nMyIdConnection)) return false ; // Verifico che la connessione sia attiva if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } // Se non ho alcun canale, errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") return false ; } // Recupero la Chiave di accesso alla Mappa dei Messaggi string sKey = GetMessageMapKey( nIdConnection, sChannel) ; auto Iter = s_vAsyncRedisMessages.find( sKey) ; if ( Iter == s_vAsyncRedisMessages.end()) { LOG_INFO( GetCmdLogger(), "Error : Connection not Subscribed to this Channel") ; return false ; } else { RedisAsyncMessage& LockedMessage = Iter->second ; while ( LockedMessage.Lock.test_and_set( memory_order_acquire)) LockedMessage.Lock.wait( true, memory_order_relaxed) ; nCount = LockedMessage.nCount ; sMessage = LockedMessage.sMessage ; LockedMessage.Lock.clear( memory_order_release) ; LockedMessage.Lock.notify_one() ; } return true ; }