diff --git a/EXE_Redis.cpp b/EXE_Redis.cpp index 1830f17..fd58047 100644 --- a/EXE_Redis.cpp +++ b/EXE_Redis.cpp @@ -6,7 +6,8 @@ // // // -// Modifiche : 17.09.25 RE Creazione modulo. +// Modifiche : 17.09.25 RE Creazione modulo. ( ver. 2.7i3) +// Modifiche : 31.10.25 RE Aggiunta gestione connessioni multiple // // //---------------------------------------------------------------------------- @@ -32,6 +33,11 @@ using namespace std ; +// --------------------------------------------------------------------------- +// Ambiente Debug +// --------------------------------------------------------------------------- +#define DEBUG 0 + // --------------------------------------------------------------------------- // Definizione variabili generali Redis // --------------------------------------------------------------------------- @@ -43,23 +49,8 @@ static const string DEFAULT_USER = "default" ; static const int DEFAULT_DATABASE = 0 ; static const int DEFAULT_TIMEOUT = 15000 ; // [ms] static const int DEFAULT_ALIVE_TIME = 180 ; // [s] - -// Contesto Sincrono -static redisContext* s_pRedisContext = nullptr ; -// Contesto Asincrono -static redisAsyncContext* s_pRedisAsyncSubContext = nullptr ; -static redisAsyncContext* s_pRedisAsyncPubContext = nullptr ; -static atomic s_nPendingDataBase = REDIS_MIN_DB ; // Default -static string s_sPassword = "" ; // Default -static string s_sUser = DEFAULT_USER ; // Default -static atomic s_bSubConnected = false ; -static atomic s_bPubConnected = false ; -static atomic s_bPubLoopRunning = false ; -static atomic s_bSubLoopRunning = false ; -static atomic s_bMessage = false ; -static string s_sMessage ; - -// Tipo di valore recuperabile mediante una chiave di tipo string +static const int MAX_CONNECTION_NBR = 10 ; +static const int NO_CONNECTION = - 1 ; static const string KEY_TYPE_STRING = "string" ; static const string KEY_TYPE_LIST = "list" ; // non usato static const string KEY_TYPE_SET = "set" ; // non usato @@ -67,7 +58,7 @@ static const string KEY_TYPE_HASH = "hash" ; // non usato static const string KEY_TYPE_ZSET = "zset" ; // non usato static const string KEY_TYPE_JSON = "ReJSON-RL" ; // non usato, disponibile se versione >= 8 ( noi usiamo la 6) -// struttura per parametri della stringa di Connessione +// Struttura per parametri della stringa di Connessione struct RedisConnectionInfo { string sHost, sServiceName, sUser, sPassword ; @@ -80,6 +71,7 @@ struct RedisConnectionInfo { nConnectTimeout = DEFAULT_TIMEOUT ; nSyncTimeout = DEFAULT_TIMEOUT ; nAsyncTimeout = DEFAULT_TIMEOUT ; bAbortConnect = false ; bSsl = false ; bAllowAdmin = false ; } ; + RedisConnectionInfo( string sMyHost, string sMyServiceName, string sMyUser, string sMyPassword, int nMyPort, int nMyDefaultDataBase, int dMyKeepAlive, int dMyConnectTimeout, int dMySyncTimeout, int dMyAsyncTimeout, bool bMyAbortConnect, bool bMySsl, bool bMyAllowAdmin) @@ -87,10 +79,183 @@ struct RedisConnectionInfo { nDefaultDataBase( nMyDefaultDataBase), nKeepAlive( dMyKeepAlive), nConnectTimeout( dMyConnectTimeout), nSyncTimeout( dMySyncTimeout), nAsyncTimeout( dMyAsyncTimeout), bAbortConnect( bMyAbortConnect), bSsl( bMySsl), bAllowAdmin( bMyAllowAdmin) {} + + inline void Clear() { + sHost = "" ; sServiceName = "" ; sUser = DEFAULT_USER ; sPassword = "" ; + nPort = DEFAULT_PORT ; nDefaultDataBase = DEFAULT_DATABASE ; nKeepAlive = DEFAULT_ALIVE_TIME ; + nConnectTimeout = DEFAULT_TIMEOUT ; nSyncTimeout = DEFAULT_TIMEOUT ; nAsyncTimeout = DEFAULT_TIMEOUT ; + bAbortConnect = false ; bSsl = false ; bAllowAdmin = false ; + } + } ; +// Struttura per Connessiona Sincrona +struct RedisSync { + + atomic bFreeConnection ; + redisContext* pRedisContext ; + RedisConnectionInfo ConnectionInfo ; + + RedisSync() + : bFreeConnection( true), pRedisContext( nullptr), ConnectionInfo() {} + + inline void Clear() { + bFreeConnection = true ; + redisFree( pRedisContext) ; + pRedisContext = nullptr ; + ConnectionInfo.Clear() ; + } + +} ; +static array vRedisClients ; + +// Struttura per stato di Connessione Asincrona +struct RedisConnectionStatus { + + atomic bConnected ; + atomic bAuthenticated ; + atomic bSelectedDB ; + atomic bDisconnected ; + atomic bPublished ; + atomic bSubscribed ; + atomic bUnsubscribed ; + atomic bMessage ; + string sMessage ; // univoco grazie a bMessage + + RedisConnectionStatus() + : bConnected( false), bAuthenticated( false), bSelectedDB( false), bDisconnected( false), + bPublished( false), bSubscribed( false), bUnsubscribed( false), bMessage( false), + sMessage( "") {} + + inline void Clear() { + bConnected = false ; + bAuthenticated = false ; + bSelectedDB = false ; + bDisconnected = false ; + bPublished = false ; + bSubscribed = false ; + bUnsubscribed = false ; + bMessage = false ; + sMessage = "" ; + } + +} ; + +// Classe per Connessione Asincrona +class RedisAsync { + + public : + atomic m_bFreeConnection ; + redisAsyncContext* m_pRedisAsyncPubContext ; + redisAsyncContext* m_pRedisAsyncSubContext ; + int m_nDataBase ; + string m_sPassword ; + string m_sUser ; + atomic m_bPubLoopRunning ; + atomic m_bSubLoopRunning ; + RedisConnectionInfo m_ConnectionInfo ; + RedisConnectionStatus m_ConnectionSubStatus ; + RedisConnectionStatus m_ConnectionPubStatus ; + WSADATA m_wsaData ; + + RedisAsync() + : m_bFreeConnection( true), m_pRedisAsyncPubContext( nullptr), m_pRedisAsyncSubContext( nullptr), + m_nDataBase( REDIS_MIN_DB), m_sPassword( ""), m_sUser( DEFAULT_USER), + m_bPubLoopRunning( false), m_bSubLoopRunning( false), + m_ConnectionInfo(), m_ConnectionSubStatus(), m_ConnectionPubStatus() {} ; + + inline void Clear() { + m_bFreeConnection = true ; + redisAsyncFree( m_pRedisAsyncPubContext) ; + m_pRedisAsyncPubContext = nullptr ; + redisAsyncFree( m_pRedisAsyncSubContext) ; + m_pRedisAsyncSubContext = nullptr ; + m_nDataBase = REDIS_MIN_DB ; + m_sPassword = "" ; + m_sUser = DEFAULT_USER ; + m_bPubLoopRunning = false ; + m_bSubLoopRunning = false ; + m_ConnectionInfo.Clear() ; + m_ConnectionSubStatus.Clear() ; + m_ConnectionPubStatus.Clear() ; + } + + public : + void RedisEventLoop( bool bIsSub) const ; + +} ; +static array vAsyncRedisClients ; + // --------------------------------------------------------------------------- -// -------- Funzione di interpretazione della stringa di connessione --------- +// [Redis] Funzione Ciclo degli eventi per Read/Write su Socket +// --------------------------------------------------------------------------- +void +RedisAsync::RedisEventLoop( bool bIsPub) const +{ + // Verifica della validità del contesto + redisAsyncContext* ctx = nullptr ; + if ( bIsPub) + ctx = m_pRedisAsyncPubContext ; + else + ctx = m_pRedisAsyncSubContext ; + if ( ctx == nullptr || ctx->err != 0) + return ; // se attivazione loop prima della inizializzazione callBack + + // Recupero del file descriptor del socket TCP usato da Redis. + SOCKET sock = ctx->c.fd ; + // Imposto condizione di ascolto degli eventi + bool bLoopRunning = bIsPub ? m_bPubLoopRunning : m_bSubLoopRunning ; + // Creazione dei due insiemi per il file descriptor ( lettura e scrittura ) + // r = read, w = write + fd_set rfds, wfds ; + // Timeout per funzione select { secondi, millisecondi} + // La funzione select attende che il socket sia pronto per lettura e scrittura + // Resituisce : + // SOCKET_ERROR in caso di errore + // 0 se scade il timeout + // > 0 se ci sono eventi da gestire + timeval timeout = {0, 100000 } ; // 100ms + + // Ciclo + while ( bLoopRunning) { + // Se contesto non valido, interrompo il ciclo + if ( ctx == nullptr || ( ctx->c.flags & REDIS_DISCONNECTING)) + break ; + // Recupero file descriptor di lettura e scrittura + FD_ZERO( &rfds) ; + FD_ZERO( &wfds) ; + FD_SET( sock, &rfds) ; + FD_SET( sock, &wfds) ; + // Controllo il risultato ottenuto + int nRet = select( 0, &rfds, &wfds, nullptr, &timeout) ; + if ( nRet == SOCKET_ERROR) { + // Se errore -> interruzione del ciclo + LOG_INFO( GetCmdLogger(), ( string{ "Error : Socket select() failed -> "} + + ToString( WSAGetLastError())).c_str()) ; + break ; + } + else if ( nRet > 0) { + // Se Socket pronto per lettura/ scrittua + if ( ctx != nullptr && ! ( ctx->c.flags & REDIS_DISCONNECTING) && ctx->c.fd != -1) { + if ( FD_ISSET( sock, &rfds)) + redisAsyncHandleRead( ctx) ; + if ( FD_ISSET( sock, &wfds)) + redisAsyncHandleWrite( ctx) ; + } + // Evito saturazione CPU + this_thread::sleep_for( chrono::milliseconds( 10)) ; + } + + // Reset del timeOut ad ogni ciclo + timeout.tv_sec = 0 ; + timeout.tv_usec = 100000 ; // 100ms + } + + return ; +} + +// --------------------------------------------------------------------------- +// [Utility] Funzione di Split parametri per Stringa di Connessione // --------------------------------------------------------------------------- static bool GetParamsFromConnectionString( const string& sConnection, RedisConnectionInfo& ConnectionInfo) @@ -129,6 +294,11 @@ GetParamsFromConnectionString( const string& sConnection, RedisConnectionInfo& C GetValInNotes( sParams, "user", ",", ConnectionInfo.sUser) ; GetValInNotes( sParams, "password", ",", ConnectionInfo.sPassword) ; GetValInNotes( sParams, "DefaultDatabase", ",", ConnectionInfo.nDefaultDataBase) ; + if ( ConnectionInfo.nDefaultDataBase < 0 || ConnectionInfo.nDefaultDataBase > REDIS_MAX_DB) { + ConnectionInfo.Clear() ; + LOG_INFO( GetCmdLogger(), "Error : DB Out of range [0, 15]") + return false ; + } GetValInNotes( sParams, "keepAlive", ",", ConnectionInfo.nKeepAlive) ; GetValInNotes( sParams, "connectTimeout", ",", ConnectionInfo.nConnectTimeout) ; GetValInNotes( sParams, "syncTimeout", ",", ConnectionInfo.nSyncTimeout) ; @@ -140,36 +310,61 @@ GetParamsFromConnectionString( const string& sConnection, RedisConnectionInfo& C return true ; } -// --------------------------------------------------------------------------- -// ------------------------------ Sync --------------------------------------- //---------------------------------------------------------------------------- +// [Utility] Funzione di Controllo per Id Connessione +// --------------------------------------------------------------------------- +static bool +CheckIdConnection( int nIdConnection) +{ + if ( nIdConnection < 0 || nIdConnection >= MAX_CONNECTION_NBR) { + LOG_INFO( GetCmdLogger(), ( string{ "Error : Not a Valid id Connestion : [1, "} + ToString( MAX_CONNECTION_NBR) + "]").c_str()) + return false ; + } + return true ; +} //---------------------------------------------------------------------------- -// Funzione di Connessione sincrona a Redis -//---------------------------------------------------------------------------- +// Funzione per comando CONNECT Sincrono +// --------------------------------------------------------------------------- bool -ExeRedisConnect( const string& sConnection) +ExeRedisConnect( const string& sConnection, int& nIdConnection) { - // Se connesione già presente, termino e creo la nuova - if ( s_pRedisContext != nullptr) - redisFree( s_pRedisContext) ; + // Inizializzo una nuova connessione, recuperando l'Id + int nMyIdConnection = NO_CONNECTION ; + for ( int i = 0 ; i < int( vRedisClients.size()) ; ++ i) { + if ( vRedisClients[i].bFreeConnection) { + nMyIdConnection = i ; + break ; + } + } + if ( nMyIdConnection == NO_CONNECTION) { + LOG_INFO( GetCmdLogger(), ( string{ "Error : Limit connection Exceeded ( "} + + ToString( MAX_CONNECTION_NBR) + " )").c_str()) + return false ; + } + + // Recupero i riferimenti necessari e blocco la connessione corrente + RedisSync& SyncRedisClient = vRedisClients[nMyIdConnection] ; + SyncRedisClient.bFreeConnection = false ; + RedisConnectionInfo& ConnectionInfo = SyncRedisClient.ConnectionInfo ; // Recupero i parametri dalla stringa di connessione - RedisConnectionInfo ConnectionInfo ; - if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) + if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) { + SyncRedisClient.Clear() ; return false ; + } // Se connessione senza TimeOut if ( ConnectionInfo.nSyncTimeout <= 0) { // Imposto il contesto per connesione sincrona - s_pRedisContext = redisConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ; - if ( s_pRedisContext == nullptr) { + SyncRedisClient.pRedisContext = redisConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ; + if ( SyncRedisClient.pRedisContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context") return false ; } - else if ( s_pRedisContext->err != 0) { - LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + s_pRedisContext->errstr).c_str()) - redisFree( s_pRedisContext) ; + else if ( SyncRedisClient.pRedisContext->err != 0) { + LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str()) + SyncRedisClient.Clear() ; return false ; } } @@ -181,14 +376,14 @@ ExeRedisConnect( const string& sConnection) TimeOutConnection.tv_usec = static_cast( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ; // Imposto il contesto per connesione sincrona con TimeOut - s_pRedisContext = redisConnectWithTimeout( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort, TimeOutConnection) ; - if ( s_pRedisContext == nullptr) { + SyncRedisClient.pRedisContext = redisConnectWithTimeout( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort, TimeOutConnection) ; + if ( SyncRedisClient.pRedisContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context") return false ; } - else if ( s_pRedisContext->err != 0) { - LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + s_pRedisContext->errstr).c_str()) - redisFree( s_pRedisContext) ; + else if ( SyncRedisClient.pRedisContext->err != 0) { + LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str()) + SyncRedisClient.Clear() ; return false ; } @@ -198,16 +393,15 @@ ExeRedisConnect( const string& sConnection) TimeOutIO.tv_usec = static_cast( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ; // Imposto TimeOut per I/O - int nResult = redisSetTimeout( s_pRedisContext, TimeOutIO) ; + int nResult = redisSetTimeout( SyncRedisClient.pRedisContext, TimeOutIO) ; if ( nResult != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error : Can't set timeout I/O") ; return false ; } } - LOG_INFO( GetCmdLogger(), "Sync connection to redis !") - // Verifico se la connessione è di tipo Sentinel - redisReply* replySentinel = ( redisReply*)redisCommand( s_pRedisContext, "SENTINEL get-master-addr-by-name %s", + // Verifico se la Connessione è di tipo Sentinel + redisReply* replySentinel = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "SENTINEL get-master-addr-by-name %s", ConnectionInfo.sServiceName.c_str()) ; if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) { if ( replySentinel != nullptr) @@ -222,132 +416,139 @@ ExeRedisConnect( const string& sConnection) freeReplyObject( replySentinel) ; // Effettuo la connessione al master redis - s_pRedisContext = redisConnect( sMasterHost.c_str(), nMasterPort) ; - if ( s_pRedisContext == nullptr) { + freeReplyObject( SyncRedisClient.pRedisContext) ; + SyncRedisClient.pRedisContext = redisConnect( sMasterHost.c_str(), nMasterPort) ; + if ( SyncRedisClient.pRedisContext == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context") return false ; } - else if ( s_pRedisContext->err != 0) { - LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + s_pRedisContext->errstr).c_str()) - redisFree( s_pRedisContext) ; + else if ( SyncRedisClient.pRedisContext->err != 0) { + LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str()) + SyncRedisClient.Clear() ; return false ; } } + LOG_INFO( GetCmdLogger(), "Connected to Redis !") // Verifico se richiesta autenticazione if ( ! ConnectionInfo.sPassword.empty()) { - redisReply* reply = ( redisReply*)redisCommand( s_pRedisContext, "AUTH %s %s", ConnectionInfo.sUser.c_str(), ConnectionInfo.sPassword.c_str()) ; + redisReply* reply = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "AUTH %s %s", + ConnectionInfo.sUser.c_str(), ConnectionInfo.sPassword.c_str()) ; if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { - LOG_INFO(GetCmdLogger(), "Error : Authentication failed") ; + LOG_INFO( GetCmdLogger(), "Error : Authentication failed") ; if ( reply == nullptr) freeReplyObject( reply) ; - redisFree( s_pRedisContext) ; + SyncRedisClient.Clear() ; return false ; } freeReplyObject( reply) ; } + LOG_INFO( GetCmdLogger(), "Authenticated to Redis !") // Seleziono il DataBase - redisReply* reply = ( redisReply*)redisCommand( s_pRedisContext, "SELECT %d", ConnectionInfo.nDefaultDataBase) ; + redisReply* reply = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "SELECT %d", ConnectionInfo.nDefaultDataBase) ; if ( reply == nullptr) { - LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + s_pRedisContext->errstr).c_str()) ; + LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str()) ; return false ; } else if ( reply->type == REDIS_REPLY_ERROR) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + reply->str).c_str()) freeReplyObject( reply) ; + SyncRedisClient.Clear() ; return false ; } - LOG_INFO( GetCmdLogger(), ( string{ "Connected to DB #"} + ToString( ConnectionInfo.nDefaultDataBase) + " !").c_str()) - freeReplyObject( reply) ; + LOG_INFO( GetCmdLogger(), "DB Selected to Redis !") ; + + // Restituisco l'Id della connessione [1, MAX_CONNECTION_NBR] ( per Lua) + nIdConnection = nMyIdConnection + 1 ; return true ; } //---------------------------------------------------------------------------- -// Funzione di Disconnessione sincrona a Redis -//---------------------------------------------------------------------------- +// Funzione per comando DISCONNECT Sincrono +// --------------------------------------------------------------------------- bool -ExeRedisDisconnect( void) +ExeRedisDisconnect( int nIdConnection) { - // Se connessione non presente - if ( s_pRedisContext == nullptr) { - LOG_INFO( GetCmdLogger(), "Error : Closing a Sync Connection never created") ; + // Verifico che l'Id di connesione sia valido + int nMyIdConnection = nIdConnection - 1 ; + if ( ! CheckIdConnection( nMyIdConnection)) + return false ; + // Verifico che la connessione sia attiva + if ( vRedisClients[nMyIdConnection].bFreeConnection) { + LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } - // Se connessione in stato di errore - if ( s_pRedisContext->err != 0) { - LOG_INFO( GetCmdLogger(), "Warning : Closing Redis Sync Connection in error") ; - redisFree( s_pRedisContext) ; - s_pRedisContext = nullptr ; - return false ; - } + // Pulisco la connessione corrente + vRedisClients[nMyIdConnection].Clear() ; + LOG_INFO( GetCmdLogger(), "Disconnected to Redis !") ; - // Effettuo disconnessione sincrona - redisFree( s_pRedisContext) ; - s_pRedisContext = nullptr ; - LOG_INFO( GetCmdLogger(), "Sync Connection Closed !") ; return true ; } //---------------------------------------------------------------------------- -// Funzione di scrittura sincrona chiave-valore +// Funzione per comando SET Sincrono ( sola scrittura {key:string, val:string}) //---------------------------------------------------------------------------- bool -ExeRedisSetValFromKey( const string& sKey, const string& sVal) +ExeRedisSetValFromKey( int nIdConnection, const string& sKey, const string& sVal) { - // Se connesione non presente, errore - if ( s_pRedisContext == nullptr) { - LOG_INFO( GetCmdLogger(), "Error : No Connection") ; - return false ; - } - // Se connessione in stato di errore, allora errore - if ( s_pRedisContext->err != 0) { - LOG_INFO( GetCmdLogger(), "Error : Invalid Connection") ; + // Verifico che l'Id di connesione sia valido + int nMyIdConnection = nIdConnection - 1 ; + if ( ! CheckIdConnection( nMyIdConnection)) + return false ; + // Verifico che la connessione sia attiva + if ( vRedisClients[nMyIdConnection].bFreeConnection) { + LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } + // Recupero la connessione corrente + RedisSync& RedisClient = vRedisClients[nMyIdConnection] ; + // Il valore associato alla chiave può essere solo di tipo stringa - redisReply* reply = ( redisReply*)redisCommand( s_pRedisContext, "SET %s %s", sKey.c_str(), sVal.c_str()) ; + redisReply* reply = ( redisReply*)redisCommand( RedisClient.pRedisContext, "SET %s %s", sKey.c_str(), sVal.c_str()) ; if ( reply == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Null reply") ; return false ; } - else if ( reply->type != REDIS_REPLY_STATUS || string( reply->str ) != "OK") { + else if ( reply->type != REDIS_REPLY_STATUS || string( reply->str ) != "OK") { LOG_INFO( GetCmdLogger(), "Error SET : No answer") ; freeReplyObject( reply) ; return false ; } - LOG_INFO( GetCmdLogger(), ( string{ "New Key Added : ["} + sKey + ":" + sVal + "]").c_str()) - freeReplyObject( reply) ; + + LOG_INFO( GetCmdLogger(), ( string{ "New Key Set : ["} + sKey + ":" + sVal + "]").c_str()) return true ; } //---------------------------------------------------------------------------- -// Funzione di lettura sincrona per valore di una chiave +// Funzione per comando Get Sincrono ( sola lettura {key:string, val:string}) //---------------------------------------------------------------------------- bool -ExeRedisGetValFromKey( const string& sKey, string& sVal) +ExeRedisGetValFromKey( int nIdConnection, const string& sKey, string& sVal) { sVal.clear() ; - // Se connesione non presente, errore - if ( s_pRedisContext == nullptr) { - LOG_INFO( GetCmdLogger(), "Error : No Connection") ; + // Verifico che l'Id di connesione sia valido + int nMyIdConnection = nIdConnection - 1 ; + if ( ! CheckIdConnection( nMyIdConnection)) return false ; - } - // Se connessione in stato di errore, allora errore - if ( s_pRedisContext->err != 0) { - LOG_INFO( GetCmdLogger(), "Error : Invalid Connection") ; + // Verifico che la connessione sia attiva + if ( vRedisClients[nMyIdConnection].bFreeConnection) { + LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } + // Recupero la connessione corrente + RedisSync& RedisClient = vRedisClients[nMyIdConnection] ; + // Recupero il tipo associato alla chiave da Redis - redisReply* typeReply = ( redisReply*)redisCommand( s_pRedisContext, "TYPE %s", sKey.c_str()) ; + redisReply* typeReply = ( redisReply*)redisCommand( RedisClient.pRedisContext, "TYPE %s", sKey.c_str()) ; if ( typeReply == nullptr || typeReply->type != REDIS_REPLY_STATUS) { string sErr = ( typeReply != nullptr ? string( typeReply->str) : "null reply") ; - if ( typeReply == nullptr) + if ( typeReply != nullptr) freeReplyObject( typeReply) ; LOG_INFO( GetCmdLogger(), ( "Error : Failed to get key type, " + sErr).c_str()) ; return false ; @@ -365,7 +566,7 @@ ExeRedisGetValFromKey( const string& sKey, string& sVal) // Effettuo la chiamata redisReply* reply = nullptr ; - reply = ( redisReply*)redisCommand( s_pRedisContext, sFormat.c_str(), sKey.c_str()) ; + reply = ( redisReply*)redisCommand( RedisClient.pRedisContext, sFormat.c_str(), sKey.c_str()) ; if ( reply == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Null reply") ; return false ; @@ -380,144 +581,86 @@ ExeRedisGetValFromKey( const string& sKey, string& sVal) } sVal = string( reply->str) ; } + LOG_INFO( GetCmdLogger(), ( string{ "Key Read : ["} + sKey + ":" + sVal + "]").c_str()) freeReplyObject( reply) ; return true ; } // --------------------------------------------------------------------------- -// ------------------------------ ASync -------------------------------------- -//---------------------------------------------------------------------------- - -//---------------------------------------------------------------------------- -// Funzione Ciclo degli eventi per chiamate CallBack -//---------------------------------------------------------------------------- +// [CB] Funzione di CallBack per comando SELECT Asincrono ( Selezione DB) +// --------------------------------------------------------------------------- static void -RedisEventLoop( redisAsyncContext* ctx, bool bIsPub) +RedisDbSElectionCallBack( redisAsyncContext* ctx, void* r, void*) { - // Verifica della validità del contesto - if ( ctx == nullptr || ctx->err != 0) { - LOG_INFO( GetCmdLogger(), ( string{ "Error: Invalid Redis context at start of event loop for " } + - ( bIsPub ? "Pub" : "Sub")).c_str()) + // Invio la richiesta + redisReply* reply = static_cast( r) ; + if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { + string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ; + LOG_INFO( GetCmdLogger(), ( string{ "Error : Db Selection -> "} + sErrMsg).c_str()) ; + return ; + } + // Imposto flag di Autenticazione + RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; + if ( MyStatus == nullptr) { + LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } - - // Recupero del file descriptor del socket TCP usato da Redis. - SOCKET sock = ctx->c.fd ; - // Imposto condizione di ascolto degli eventi - bool bLoopRunning = bIsPub ? s_bPubLoopRunning : s_bSubLoopRunning ; - // Creazione dei due insiemi per il file descriptor ( lettura e scrittura ) - // r = read, w = write - fd_set rfds, wfds ; - // Timeout per funzione select { secondi, millisecondi} - // La funzione select attende che il socket sia pronto per lettura e scrittura - // Resituisce : - // SOCKET_ERROR in caso di errore - // 0 se scade il timeout - // > 0 se ci sono eventi da gestire - timeval timeout = {0, 100000 } ; // 100ms - - // Ciclo - while ( bLoopRunning) { - // Se contesto non valido, interrompo il ciclo - if ( ctx == nullptr || ( ctx->c.flags & REDIS_DISCONNECTING)) { - LOG_INFO ( GetCmdLogger(), ( string{ "Redis event loop terminated for Async "} + - ( bIsPub ? "Pub" : "Sub") + " Events").c_str()) - break ; - } - // Recupero file descriptor di lettura e scrittura - FD_ZERO( &rfds) ; - FD_ZERO( &wfds) ; - FD_SET( sock, &rfds) ; - FD_SET( sock, &wfds) ; - // Controllo il risultato ottenuto - int nRet = select( 0, &rfds, &wfds, nullptr, &timeout) ; - if ( nRet == SOCKET_ERROR) { - // Se errore -> interruzione del ciclo - LOG_INFO( GetCmdLogger(), ( string{ "Error : Socket select() failed -> "} + - ToString( WSAGetLastError())).c_str()) ; - break ; - } - else if ( nRet > 0) { - // Se Socket pronto per lettura/ scrittua - if ( ctx != nullptr && ! ( ctx->c.flags & REDIS_DISCONNECTING) && ctx->c.fd != -1) { - if ( FD_ISSET( sock, &rfds)) - redisAsyncHandleRead( ctx) ; - if ( FD_ISSET( sock, &wfds)) - redisAsyncHandleWrite( ctx) ; - } - // Evito saturazione CPU - this_thread::sleep_for( chrono::milliseconds( 10)) ; - } - - // Reset del timeOut ad ogni ciclo - timeout.tv_sec = 0 ; - timeout.tv_usec = 100000 ; // 100ms - } - - return ; + MyStatus->bSelectedDB = true ; } //---------------------------------------------------------------------------- -// Funzione di controllo che la connessione sia attiva e il contesto valido +// [CB] Funzione di CallBack per comando AUTH Asincrono ( Autenticazione) // --------------------------------------------------------------------------- -static bool -CheckConnectionAndContext( redisAsyncContext* ctx, bool bIsPub) +static void +RedisAuthenticationCallBack( redisAsyncContext* ctx, void* r, void*) { - // Se contesto nullo -> Errore - if ( ctx == nullptr) { - LOG_INFO( GetCmdLogger(), ( string{ "Error : Invalid Context for "} + - ( bIsPub ? "Pub" : "Sub")).c_str()) - return false ; + // Invio la richiesta + redisReply* reply = static_cast( r) ; + if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { + string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ; + LOG_INFO( GetCmdLogger(), ( string{ "Error : Authentication -> "} + sErrMsg).c_str()) ; + return ; } - - // Se connessione non effettuata -> Errore - if ( bIsPub && ! s_bPubConnected) { - LOG_INFO( GetCmdLogger(), "Error : aSync Pub connection not connected") ; - redisAsyncFree( ctx) ; - ctx = nullptr ; - return false ; + // Imposto flag di Autenticazione + RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; + if ( MyStatus == nullptr) { + LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; + return ; } - if ( ! bIsPub && ! s_bSubConnected) { - LOG_INFO( GetCmdLogger(), "Error : aSync Sub connection not closed") ; - redisAsyncFree( ctx) ; - ctx = nullptr ; - return false ; - } - - // Se connessione effettuata ma in stato di errore -> Errore - if ( ctx->err != 0) { - LOG_INFO( GetCmdLogger(), ( string{ "Warning : Closing aSync Connection in error for "} + - ( bIsPub ? "Pub" : "Sub")).c_str()) ; - redisAsyncFree( ctx) ; - ctx = nullptr ; - return false ; - } - - return true ; + MyStatus->bAuthenticated = true ; } -//---------------------------------------------------------------------------- -// Funzione di CallBack per PUBLISH, SUBSCRIBE e UNSUBSCRIBE -//---------------------------------------------------------------------------- +// --------------------------------------------------------------------------- +// [CB] Funzione di CallBack per comandi PUBLISH, SUBSCRIBE, MESSAGE, UNSUBSCRIBE +// Asincroni +// --------------------------------------------------------------------------- static void -MessageCallback( redisAsyncContext* ctx, void* r, void*) +RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*) { // Verifico che il contesto sia definito if ( ctx == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack") return ; } - LOG_INFO( GetCmdLogger(), "RedisMessageCallback invoked") - // Recupero della risposta dal server redis + // Recupero della risposta dal server redis ( nullptr nel caso di residui) redisReply* reply = static_cast( r) ; - if ( reply == nullptr) { - LOG_INFO( GetCmdLogger(), "Error : Redis reply null in CallBack") + if ( reply == nullptr) return ; - } - else if ( reply->type == REDIS_REPLY_INTEGER) { - // Recupero il numero di clients iscritti al canale dove ho effettuato una publish + + LOG_INFO( GetCmdLogger(), " --- RedisMessageCallback invoked") + + if ( reply->type == REDIS_REPLY_INTEGER) { + // Recupero lo Stato + RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; + if ( MyStatus == nullptr) { + LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; + return ; + } + // Imposto stato di pubblicazione + MyStatus->bPublished = true ; + // Recupero il numero di clients iscritti al canale LOG_INFO( GetCmdLogger(), ( string{ "Message published to "} + ToString( reply->integer) + " clients !").c_str()) ; return ; } @@ -533,8 +676,7 @@ MessageCallback( redisAsyncContext* ctx, void* r, void*) reply->element[2] != nullptr && reply->element[2]->str != nullptr) { string sChannel = reply->element[1]->str ; string sMessage = reply->element[2]->str ; - LOG_INFO( GetCmdLogger(), ( string{ "Message Received on ["} + sChannel + "] : " + - sMessage).c_str()) + LOG_INFO( GetCmdLogger(), ( string{ "Message Received on ["} + sChannel + "] : " + sMessage).c_str()) } else { LOG_INFO( GetCmdLogger(), "Errror : Invalid Message received") @@ -545,6 +687,15 @@ MessageCallback( redisAsyncContext* ctx, void* r, void*) else if ( strcmp( msgType, "subscribe") == 0 && reply->elements >= 3) { if ( reply->element[1] != nullptr && reply->element[1] != nullptr && reply->element[2] != nullptr) { + // Recupero lo Stato + RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; + if ( MyStatus == nullptr) { + LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; + return ; + } + // Imposto stato di Subscribe + MyStatus->bSubscribed = true ; + // Comunico il risultato string sChannel = reply->element[1]->str ; int nCount = reply->element[2]->integer ; LOG_INFO( GetCmdLogger(), ( string{ "Subscribed to ["} + sChannel + "]," + @@ -559,19 +710,28 @@ MessageCallback( redisAsyncContext* ctx, void* r, void*) else if ( strcmp( msgType, "unsubscribe") == 0 && reply->elements >= 3) { if ( reply->element[1] != nullptr && reply->element[1] != nullptr && reply->element[2] != nullptr) { + // Recupero lo Stato + RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; + if ( MyStatus == nullptr) { + LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; + return ; + } + // Imposto stato di pubblicazione + MyStatus->bUnsubscribed = true ; + // Comunico il risultato string sChannel = reply->element[1]->str ; int nCount = reply->element[2]->integer ; LOG_INFO( GetCmdLogger(), ( string{ "Unsubscribed to ["} + sChannel + "]," + " subscriptions left : " + ToString( nCount)).c_str()) } else { - LOG_INFO( GetCmdLogger(), "Errror : Invalid Unbubscription reply") + LOG_INFO( GetCmdLogger(), "Errror : Invalid Unsubscription reply") return ; } } // --- Undefined else { - LOG_INFO( GetCmdLogger(), "Undefined reply in CallBack from Redis") + LOG_INFO( GetCmdLogger(), "Error : Undefined reply in CallBack from Redis") return ; } } @@ -579,9 +739,9 @@ MessageCallback( redisAsyncContext* ctx, void* r, void*) return ; } -//---------------------------------------------------------------------------- -// Funzione di CallBack per WaitForReadisMessage -//---------------------------------------------------------------------------- +// --------------------------------------------------------------------------- +// [CB] Funzione di CallBack per ricezione MESSAGE Asincrona +// --------------------------------------------------------------------------- static void WaitMessageCallback( redisAsyncContext* ctx, void* r, void*) { @@ -590,15 +750,14 @@ WaitMessageCallback( redisAsyncContext* ctx, void* r, void*) LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack") return ; } - LOG_INFO( GetCmdLogger(), "RedisMessageCallback invoked") + LOG_INFO( GetCmdLogger(), " --- RedisMessageCallback invoked") // Recupero della risposta dal server redis redisReply* reply = static_cast( r) ; - if ( reply == nullptr) { - LOG_INFO( GetCmdLogger(), "Error : Redis reply null in CallBack") + if ( reply == nullptr) return ; - } - else if ( reply->type != REDIS_REPLY_ARRAY || reply->elements < 3) + + if ( reply->type != REDIS_REPLY_ARRAY || reply->elements < 3) LOG_INFO( GetCmdLogger(), "Error : Invalid Message CallBack reply") else { // Recupero il Messaggio ( se callBack riferita al tipo "message") @@ -610,153 +769,114 @@ WaitMessageCallback( redisAsyncContext* ctx, void* r, void*) if ( reply->element[2] == nullptr || reply->element[2]->str == nullptr) LOG_INFO( GetCmdLogger(), "Error : Null message received") else { - s_sMessage = reply->element[2]->str ; - s_bMessage = true ; + // Recupero lo Stato + RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; + if ( MyStatus == nullptr) { + LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; + return ; + } + // Imposto stato di pubblicazione + MyStatus->bMessage = true ; + MyStatus->sMessage = reply->element[2]->str ; } } } } } -//---------------------------------------------------------------------------- -// Funzione per Memorizzare il DataBase Redis da selezionare [Callback] -//---------------------------------------------------------------------------- -static bool -SetPendingDataBase( int nDB) -{ - // I DataBase su Redis sono al più 15 - if ( REDIS_MIN_DB <= nDB && nDB <= REDIS_MAX_DB) { - s_nPendingDataBase = nDB ; - return true ; - } - return false ; -} - -//---------------------------------------------------------------------------- -// Funzione per Ricavare il numero di DataBase Redis da selezionare [Callback] -//---------------------------------------------------------------------------- -static int -GetPendingDataBase() -{ - return s_nPendingDataBase ; -} - // --------------------------------------------------------------------------- -// Funzione per Memorizzare l'utente di autenticazione [Callback] +// [CB] Funzione di CallBack per comando CONNECT Asincrono // --------------------------------------------------------------------------- static void -SetPendingUser( string sUsr) +RedisConnectCallBack( const redisAsyncContext* ctx, int nStatus) { - s_sUser = sUsr ; -} + // Verifico che il contesto e lo stato siano validi + if ( ctx == nullptr || nStatus != REDIS_OK) { + LOG_INFO( GetCmdLogger(), "Error : Async Disconnection CallBack failed ") ; + return ; + } -// -------------------------------------------------------------------------- -// Funzione per ottenere l'utente di autenticazione [Callback] -// -------------------------------------------------------------------------- -static string -GetPendingUser() -{ - return s_sUser ; + // Imposto flag di Connessione + RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; + if ( MyStatus == nullptr) { + LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; + return ; + } + MyStatus->bConnected = true ; } // --------------------------------------------------------------------------- -// Funzione per Memorizzare la Password di autenticazione [Callback] +// [CB] Funzione di CallBack per comando DISCONNECT Asincrono // --------------------------------------------------------------------------- static void -SetPendingPassword( string sPsw) +RedisDisconnectCallBack( const redisAsyncContext* ctx, int nStatus) { - s_sPassword = sPsw ; -} + // Verifico che il contesto e lo stato siano validi + if ( ctx == nullptr || nStatus != REDIS_OK) { + LOG_INFO( GetCmdLogger(), "Error : Async Disconnection CallBack failed ") ; + return ; + } -// -------------------------------------------------------------------------- -// Funzione per ottenere la Password di autenticazione [Callback] -// -------------------------------------------------------------------------- -static string -GetPendingPassword() -{ - return s_sPassword ; + // Imposto flag di Disconnessione + RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; + if ( MyStatus == nullptr) { + LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; + return ; + } + MyStatus->bDisconnected = true ; } //---------------------------------------------------------------------------- -static bool -GetConnectionContext( redisAsyncContext*& ctx, const string& sHost, int nPort) +// Funzione per comando CONNECT Asincrono +// --------------------------------------------------------------------------- +bool +ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection) { - ctx = redisAsyncConnect( sHost.c_str(), nPort) ; - if ( ctx == nullptr) { - LOG_INFO( GetCmdLogger(), "Error : Can't allocate Redis Async context") - return false ; - } - if ( ctx->err != 0) { - LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + ctx->errstr).c_str()) - redisAsyncFree( ctx) ; - ctx = nullptr ; - WSACleanup() ; - return false ; - } - - return true ; -} - -//---------------------------------------------------------------------------- -static void -CleanupWinsock( void) -{ - // entrambi i contesti devono essere nulli - if ( s_pRedisAsyncPubContext == nullptr && s_pRedisAsyncSubContext == nullptr) { - // entrambi i loop devono essere terminati - if ( ! s_bPubLoopRunning && ! s_bSubLoopRunning) { - WSACleanup() ; - LOG_INFO( GetCmdLogger(), "Winsock cleaned up") ; + // Inizializzo una nuova connessione, recuperando l'Id + int nMyIdConnection = NO_CONNECTION ; + for ( int i = 0 ; i < int( vAsyncRedisClients.size()) ; ++ i) { + if ( vAsyncRedisClients[i].m_bFreeConnection) { + nMyIdConnection = i ; + break ; } } -} + if ( nMyIdConnection == NO_CONNECTION) { + LOG_INFO( GetCmdLogger(), ( string{ "Error : Limit connection Exceeded ( "} + + ToString( MAX_CONNECTION_NBR) + " )").c_str()) + return false ; + } -//---------------------------------------------------------------------------- -// Funzione di Connessione asincrona a Redis -//---------------------------------------------------------------------------- -bool -ExeRedisAsyncConnect( const string& sConnection) -{ - // Flag di connessione - s_bPubConnected = false ; - s_bSubConnected = false ; + // Recupero i riferimenti necessari e blocco la connessione corrente + RedisAsync& AsyncRedisClient = vAsyncRedisClients[nMyIdConnection] ; + AsyncRedisClient.m_bFreeConnection = false ; + RedisConnectionInfo& ConnectionInfo = AsyncRedisClient.m_ConnectionInfo ; // Recupero i parametri dalla stringa di connessione - RedisConnectionInfo ConnectionInfo ; - if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) - return false ; - - // Se connesione già presente, termino e creo la nuova - if ( s_pRedisAsyncPubContext != nullptr || s_pRedisAsyncSubContext != nullptr) - ExeRedisAsyncDisconnect() ; - - // Memorizzo il valore dello User, della Password e del DataBase, serviranno una volta - // che la connessione è effettivamente stabilita e corretta ( -> CallBack di connessione) - SetPendingUser( ConnectionInfo.sUser) ; - SetPendingPassword( ConnectionInfo.sPassword) ; - if ( ! SetPendingDataBase( ConnectionInfo.nDefaultDataBase)) { - LOG_INFO( GetCmdLogger(), "Error : DataBase number must be in range [0,15]") ; + if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) { + AsyncRedisClient.Clear() ; return false ; } // Inizializzazione libreria Winsock, necessaria per usare le funzionalità di rete su Windows, // come socket TCP/IP. Versione utilizzata 2.2 - WSADATA wsaData ; - int nStatus = WSAStartup( MAKEWORD( 2, 2), &wsaData) ; + int nStatus = WSAStartup( MAKEWORD( 2, 2), &AsyncRedisClient.m_wsaData) ; if ( nStatus != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : WSAStartup -> "} + ToString( nStatus)).c_str()) ; + AsyncRedisClient.Clear() ; return false ; } // Verifico se si tratta di un nodo Sentinella - redisContext* sentinelCtx = redisConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ; + redisContext* sentinelCtx = redisConnect( AsyncRedisClient.m_ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ; if ( sentinelCtx == nullptr) { LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context") + AsyncRedisClient.Clear() ; return false ; } else if ( sentinelCtx->err != 0) { LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + sentinelCtx->errstr).c_str()) redisFree( sentinelCtx) ; + AsyncRedisClient.Clear() ; return false ; } redisReply* replySentinel = ( redisReply*)redisCommand( sentinelCtx, "SENTINEL get-master-addr-by-name %s", @@ -776,212 +896,182 @@ ExeRedisAsyncConnect( const string& sConnection) } // Definisco le 2 connessioni asincroni con Host e Porta ( per PUBLISH e SUBSCRIBE/UNSUBSCRIBE) - if ( ! GetConnectionContext( s_pRedisAsyncPubContext, ConnectionInfo.sHost, ConnectionInfo.nPort) || - ! GetConnectionContext( s_pRedisAsyncSubContext, ConnectionInfo.sHost, ConnectionInfo.nPort)) + AsyncRedisClient.m_pRedisAsyncPubContext = redisAsyncConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ; + AsyncRedisClient.m_pRedisAsyncSubContext = redisAsyncConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ; + if ( AsyncRedisClient.m_pRedisAsyncPubContext == nullptr || AsyncRedisClient.m_pRedisAsyncSubContext == nullptr) { + LOG_INFO( GetCmdLogger(), "Error : Can't allocate Redis Async context") + AsyncRedisClient.Clear() ; return false ; - LOG_INFO( GetCmdLogger(), "Connected to Redis ! ( Pub/Sub)") - - // Imposto e Definisco la funzione di CallBack per contesto PUB per Connessione asincrona - redisAsyncSetConnectCallback( - s_pRedisAsyncPubContext, - []( const redisAsyncContext* ctx, int nStatus) { - // --- Se contesto nullo, errore - if ( ctx == nullptr || nStatus != REDIS_OK) { - LOG_INFO( GetCmdLogger(), "Error : Async Connect CallBack Pub failed ") ; - return ; - } - // --- Se Autenticazione richiesta -> AUTH e SELECT - string sPassword = GetPendingPassword() ; - if ( ! sPassword.empty()) { - redisAsyncCommand( - s_pRedisAsyncPubContext, - []( redisAsyncContext* ctx, void* r, void*) { - // Invio la richiesta - redisReply* reply = static_cast( r) ; - if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { - string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null asnwer" ; - LOG_INFO( GetCmdLogger(), ( string{ " Error : Authentication -> "} + sErrMsg).c_str()) - return ; - } - LOG_INFO( GetCmdLogger(), "Valid Authentication for Pub connection") - redisAsyncCommand( - ctx, - []( redisAsyncContext* ctx, void* r, void*) { - redisReply* reply = static_cast( r) ; - if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { - string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ; - LOG_INFO( GetCmdLogger(), ( string{ "Error : DataBase selection -> "} + sErrMsg).c_str()) ; - return ; - } - LOG_INFO( GetCmdLogger(), ( string{ "Connected to DataBase #"} + ToString( GetPendingDataBase()) + " for Pub commands !").c_str()) ; - s_bPubConnected = true ; - }, - nullptr, "SELECT %d", GetPendingDataBase() - ) ; - }, - nullptr, "AUTH %s %s", GetPendingUser().c_str(), GetPendingPassword().c_str() - ) ; - } - // Se nessuna autenticazione -> SELECT - else { - redisAsyncCommand( - s_pRedisAsyncPubContext, - []( redisAsyncContext* ctx, void* r, void*) { - redisReply* reply = static_cast( r) ; - if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { - string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ; - LOG_INFO( GetCmdLogger(), ( string{ "Error : DataBase selection -> "} + sErrMsg).c_str()) ; - return ; - } - LOG_INFO( GetCmdLogger(), ( string{ "Connected to DataBase #"} + ToString( GetPendingDataBase()) + " for Pub commands !").c_str()) - s_bPubConnected = true ; - }, - nullptr, "SELECT %d", GetPendingDataBase() - ) ; - } - } - ) ; - - // Imposto e Definisco la funzione di CallBack per contesto SUB per Connessione asincrona - redisAsyncSetConnectCallback( - s_pRedisAsyncSubContext, - []( const redisAsyncContext* ctx, int nStatus) { - // --- Se contesto nullo, errore - if ( ctx == nullptr || nStatus != REDIS_OK) { - LOG_INFO( GetCmdLogger(), "Error : Async Connect CallBack Pub failed ") ; - return ; - } - // --- Se Autenticazione richiesta -> AUTH e SELECT - string sPassword = GetPendingPassword() ; - if ( ! sPassword.empty()) { - redisAsyncCommand( - s_pRedisAsyncSubContext, - []( redisAsyncContext* ctx, void* r, void*) { - // Invio la richiesta - redisReply* reply = static_cast( r) ; - if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { - string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null asnwer" ; - LOG_INFO( GetCmdLogger(), ( string{ " Error : Authentication -> "} + sErrMsg).c_str()) - return ; - } - LOG_INFO( GetCmdLogger(), "Valid Authentication for Pub connection") - redisAsyncCommand( - ctx, - []( redisAsyncContext* ctx, void* r, void*) { - redisReply* reply = static_cast( r) ; - if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { - string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ; - LOG_INFO( GetCmdLogger(), ( string{ "Error : DataBase selection -> "} + sErrMsg).c_str()) ; - return ; - } - LOG_INFO( GetCmdLogger(), ( string{ "Connected to DataBase #"} + ToString( GetPendingDataBase()) + " for Pub commands !").c_str()) ; - s_bSubConnected = true ; - }, - nullptr, "SELECT %d", GetPendingDataBase() - ) ; - }, - nullptr, "AUTH %s %s", GetPendingUser().c_str(), GetPendingPassword().c_str() - ) ; - } - // Se nessuna autenticazione -> SELECT - else { - redisAsyncCommand( - s_pRedisAsyncSubContext, - []( redisAsyncContext* ctx, void* r, void*) { - redisReply* reply = static_cast( r) ; - if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) { - string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ; - LOG_INFO( GetCmdLogger(), ( string{ "Error : DataBase selection -> "} + sErrMsg).c_str()) ; - return ; - } - LOG_INFO( GetCmdLogger(), ( string{ "Connected to DataBase #"} + ToString( GetPendingDataBase()) + " for Pub commands !").c_str()) - s_bSubConnected = true ; - }, - nullptr, "SELECT %d", GetPendingDataBase() - ) ; - } - } - ) ; - - // Imposto e Definisco la funzione di CallBack per la Disconnessione Pub asincrona - redisAsyncSetDisconnectCallback( - s_pRedisAsyncPubContext, - []( const redisAsyncContext* ctx, int status) { - LOG_INFO( GetCmdLogger(), "Disconnected from Redis ( Pub )") ; - s_bPubLoopRunning = false ; - s_pRedisAsyncPubContext = nullptr ; - this_thread::sleep_for( chrono::milliseconds( 50)) ; // per sicurezza - CleanupWinsock() ; - } - ) ; - // Imposto e Definisco la funzione di CallBack per la Disconnessione Sub asincrona - redisAsyncSetDisconnectCallback( - s_pRedisAsyncSubContext, - []( const redisAsyncContext* ctx, int status) { - LOG_INFO( GetCmdLogger(), "Disconnected from Redis ( Sub )") ; - s_pRedisAsyncSubContext = nullptr ; - s_bSubLoopRunning = false ; - this_thread::sleep_for( chrono::milliseconds( 50)) ; // per sicurezza - CleanupWinsock() ; - } - ) ; + } + else if ( AsyncRedisClient.m_pRedisAsyncPubContext->err != 0) { + LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + AsyncRedisClient.m_pRedisAsyncPubContext->errstr).c_str()) + AsyncRedisClient.Clear() ; + return false ; + } + else if ( AsyncRedisClient.m_pRedisAsyncSubContext->err != 0) { + LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + AsyncRedisClient.m_pRedisAsyncSubContext->errstr).c_str()) + AsyncRedisClient.Clear() ; + return false ; + } // Definizione e partenza del ciclo degli eventi in un thread separato ( in BackGround) - s_bPubLoopRunning = true ; - s_bSubLoopRunning = true ; - thread tPubEventLoop( RedisEventLoop, s_pRedisAsyncPubContext, true) ; - thread tSubEventLoop( RedisEventLoop, s_pRedisAsyncSubContext, false) ; + AsyncRedisClient.m_bPubLoopRunning = true ; + AsyncRedisClient.m_bSubLoopRunning = true ; + thread tPubEventLoop( &RedisAsync::RedisEventLoop, &AsyncRedisClient, true) ; + thread tSubEventLoop( &RedisAsync::RedisEventLoop, &AsyncRedisClient, false) ; tPubEventLoop.detach() ; tSubEventLoop.detach() ; - // Se richiesto massimo tempo di connessione, aspetto - if ( ConnectionInfo.nAsyncTimeout > 0.) { - auto tStart = chrono::steady_clock::now() ; - auto tTimeOut = chrono::milliseconds( static_cast( ConnectionInfo.nAsyncTimeout)) ; + // Imposto e Definisco la funzione di CallBack per contesto PUB/SUB per Connessione asincrona + AsyncRedisClient.m_pRedisAsyncPubContext->data = &AsyncRedisClient.m_ConnectionPubStatus ; + AsyncRedisClient.m_pRedisAsyncSubContext->data = &AsyncRedisClient.m_ConnectionSubStatus ; + redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisConnectCallBack) ; + redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisConnectCallBack) ; + // Attendo la risposta di CallBack + auto tStart = chrono::steady_clock::now() ; + auto tTimeOut = chrono::milliseconds( static_cast( ConnectionInfo.nAsyncTimeout)) ; + while ( ! AsyncRedisClient.m_ConnectionPubStatus.bConnected || ! AsyncRedisClient.m_ConnectionSubStatus.bConnected) { + this_thread::sleep_for( chrono::milliseconds( 10)) ; + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ; + AsyncRedisClient.Clear() ; + return false ; + } + } + LOG_INFO( GetCmdLogger(), "Connected to Redis !") + #if DEBUG + double dMsConnectionTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; + LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsConnectionTime) + " ms").c_str()) + #endif - while ( ! s_bSubConnected && ! s_bPubConnected) { + // Se presente una Password, eseguo l'autenticazione + if ( AsyncRedisClient.m_sPassword.empty()) { + redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisAuthenticationCallBack, + nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ; + redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisAuthenticationCallBack, + nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ; + tStart = chrono::steady_clock::now() ; + tTimeOut = chrono::milliseconds( static_cast( ConnectionInfo.nAsyncTimeout)) ; + while ( ! AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated || ! AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated) { this_thread::sleep_for( chrono::milliseconds( 10)) ; if ( chrono::steady_clock::now() - tStart > tTimeOut) { - LOG_INFO( GetCmdLogger(), "Error in Connection : TimeOut exceeded") + LOG_INFO( GetCmdLogger(), "Timeout Autentication exceeded") ; + AsyncRedisClient.Clear() ; return false ; } } - - auto tElapsed = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; - LOG_INFO( GetCmdLogger(), ( string{ "Connected in "} + ToString( tElapsed) + " ms").c_str()) + LOG_INFO( GetCmdLogger(), "Authenticated to Redis !") + #if DEBUG + double dMsAuthenticationTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; + LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsAuthenticationTime) + " ms").c_str()) + #endif + } + else { + AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated = true ; + AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated = true ; } + // Se Presente una Selezione del Database, eseguo la selezione + if ( AsyncRedisClient.m_nDataBase != DEFAULT_DATABASE) { + redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDbSElectionCallBack, + nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ; + redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDbSElectionCallBack, + nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ; + tStart = chrono::steady_clock::now() ; + tTimeOut = chrono::milliseconds( static_cast( ConnectionInfo.nAsyncTimeout)) ; + while ( ! AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB || ! AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB) { + this_thread::sleep_for( chrono::milliseconds( 10)) ; + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Timeout DB Selection exceeded") ; + AsyncRedisClient.Clear() ; + return false ; + } + } + LOG_INFO( GetCmdLogger(), "DB Selected to Redis !") ; + #if DEBUG + double dMsDbSelectionTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; + LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsDbSelectionTime) + " ms").c_str()) + #endif + } + else { + AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB = true ; + AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB = true ; + } + + // Imposto e Definisco la funzione di CallBack per la Disconnessione Pub/Sub asincrona + redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDisconnectCallBack) ; + redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDisconnectCallBack) ; + + // Restituisco l'Id della connessione [1, MAX_CONNECTION_NBR] ( per Lua) + nIdConnection = nMyIdConnection + 1 ; return true ; } //---------------------------------------------------------------------------- -// Funzione di Disconnessione asincrona a Redis -//---------------------------------------------------------------------------- +// Funzione per comando DISCONNECT Asincrono +// --------------------------------------------------------------------------- bool -ExeRedisAsyncDisconnect( void) +ExeRedisAsyncDisconnect( int nIdConnection) { - // Controllo che la connessione sia valida e il contesto ben definito - bool bOkPub = CheckConnectionAndContext( s_pRedisAsyncPubContext, true) ; - bool bOkSub = CheckConnectionAndContext( s_pRedisAsyncSubContext, false) ; - if ( ! bOkSub || ! bOkPub) + // Verifico che l'Id di connesione sia valido + int nMyIdConnection = nIdConnection - 1 ; + if ( ! CheckIdConnection( nMyIdConnection)) return false ; + // Verifico che la connessione sia attiva + if ( vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) { + LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") + return false ; + } - // Effettuo la Disconnesione asincrona - redisAsyncDisconnect( s_pRedisAsyncPubContext) ; // libera la memoria in automatico - redisAsyncDisconnect( s_pRedisAsyncSubContext) ; // libera la memoria in automatico - LOG_INFO( GetCmdLogger(), "Async Connection Closed !") ; + // Recupero la connessione + RedisAsync& AsyncRedisClient = vAsyncRedisClients[nMyIdConnection] ; + + // Effettuo la Disconnesione asincrona ( libera la memoria in automatico) + redisAsyncDisconnect( AsyncRedisClient.m_pRedisAsyncPubContext) ; + redisAsyncDisconnect( AsyncRedisClient.m_pRedisAsyncSubContext) ; + + // Attendo la CallBack di disconnessione + auto tStart = chrono::steady_clock::now() ; + auto tTimeOut = chrono::milliseconds( static_cast( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ; + while ( ! AsyncRedisClient.m_ConnectionPubStatus.bDisconnected || + ! AsyncRedisClient.m_ConnectionSubStatus.bDisconnected) { + this_thread::sleep_for( chrono::milliseconds( 10)) ; + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ; + return false ; + } + } + + // Rendo libera la connessione + AsyncRedisClient.m_bFreeConnection = true ; + AsyncRedisClient.Clear() ; + + // Pulisco WinSock2 ( timer ?) + // ??? + + LOG_INFO( GetCmdLogger(), "Disconnected to Redis !") ; + #if DEBUG + double dMsDisconnectionTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; + LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsDisconnectionTime) + " ms").c_str()) + #endif return true ; } -//---------------------------------------------------------------------------- -// Funzione di Publish asincrona a Redis -//---------------------------------------------------------------------------- +// --------------------------------------------------------------------------- +// Funzione per comando PUBLISH Asincrono +// --------------------------------------------------------------------------- bool -ExeRedisAsyncPublish( const string& sChannel, const string& sMessage) +ExeRedisAsyncPublish( int nIdConnection, const string& sChannel, const string& sMessage) { - // Controllo che la connessione Pub sia valida e il contesto Pub ben definito - if ( ! CheckConnectionAndContext( s_pRedisAsyncPubContext, true)) + // Verifico che l'Id di connesione sia valido + int nMyIdConnection = nIdConnection - 1 ; + if ( ! CheckIdConnection( nMyIdConnection)) return false ; + // Verifico che la connessione sia attiva + if ( vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) { + LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") + return false ; + } // Se non ho alcun canale -> errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") @@ -991,77 +1081,143 @@ ExeRedisAsyncPublish( const string& sChannel, const string& sMessage) if ( sMessage.empty()) LOG_INFO( GetCmdLogger(), "Warning : Empty Message") ; + // Recupero la connessione + RedisAsync& AsyncRedisClient = vAsyncRedisClients[nMyIdConnection] ; + // Eseguo il comando PUBLISH - if ( redisAsyncCommand( s_pRedisAsyncPubContext, MessageCallback, nullptr, + AsyncRedisClient.m_ConnectionPubStatus.bPublished = false ; + if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisPubSubUnsubMsgCallback, nullptr, "PUBLISH %s %s", sChannel.c_str(), sMessage.c_str()) != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error: redisAsyncCommand PUBLISH failed") return false ; } - + auto tStart = chrono::steady_clock::now() ; + auto tTimeOut = chrono::milliseconds( static_cast( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ; + while ( ! AsyncRedisClient.m_ConnectionPubStatus.bPublished) { + this_thread::sleep_for( chrono::milliseconds( 10)) ; + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Timeout Subscribe exceeded") ; + return false ; + } + } + #if DEBUG + double dMsPublishTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; + LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsPublishTime) + " ms").c_str()) + #endif return true ; } //---------------------------------------------------------------------------- -// Funzione di Subscribe asincrona a Redis -//---------------------------------------------------------------------------- +// Funzione per comando SUBSCRIBE Asincrono +// --------------------------------------------------------------------------- bool -ExeRedisAsyncSubscribe( const string& sChannel) +ExeRedisAsyncSubscribe( int nIdConnection, const string& sChannel) { - // Controllo che la connessione sia valida e il contesto ben definito - if ( ! CheckConnectionAndContext( s_pRedisAsyncSubContext, false)) + // Verifico che l'Id di connesione sia valido + int nMyIdConnection = nIdConnection - 1 ; + if ( ! CheckIdConnection( nMyIdConnection)) return false ; + // Verifico che la connessione sia attiva + if ( vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) { + LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") + return false ; + } // Se non ho alcun canale, errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") ; return false ; } - // Eseguo il comando Subscribe - if ( redisAsyncCommand( s_pRedisAsyncSubContext, MessageCallback, nullptr, + // Recupero la connessione + RedisAsync& AsyncRedisClient = vAsyncRedisClients[nMyIdConnection] ; + + // Eseguo il comando SUBSCRIBE + AsyncRedisClient.m_ConnectionPubStatus.bSubscribed = false ; + if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr, ( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand SUBSCRIBE failed") return false ; } - + auto tStart = chrono::steady_clock::now() ; + auto tTimeOut = chrono::milliseconds( static_cast( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ; + while ( ! AsyncRedisClient.m_ConnectionSubStatus.bSubscribed) { + this_thread::sleep_for( chrono::milliseconds( 10)) ; + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Timeout Subscribe exceeded") ; + return false ; + } + } + #if DEBUG + double dMsSubscribeTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; + LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsSubscribeTime) + " ms").c_str()) + #endif return true ; } -//---------------------------------------------------------------------------- -// Funzione di Unsubscribe asincrona a Redis -//---------------------------------------------------------------------------- +// --------------------------------------------------------------------------- +// Funzione per comando UNSUBSCRIBE Asincrono +// --------------------------------------------------------------------------- bool -ExeRedisAsyncUnsubscribe( const string& sChannel) +ExeRedisAsyncUnsubscribe( int nIdConnection, const string& sChannel) { - // Controllo che la connessione sia valida e il contesto ben definito - if ( ! CheckConnectionAndContext( s_pRedisAsyncSubContext, false)) + // Verifico che l'Id di connesione sia valido + int nMyIdConnection = nIdConnection - 1 ; + if ( ! CheckIdConnection( nMyIdConnection)) return false ; + // Verifico che la connessione sia attiva + if ( vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) { + LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") + return false ; + } // Se non ho alcun canale, errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") return false ; } - // Eseguo il comando Unsubscribe - if ( redisAsyncCommand( s_pRedisAsyncSubContext, MessageCallback, nullptr, + // Recupero la connessione + RedisAsync& AsyncRedisClient = vAsyncRedisClients[nMyIdConnection] ; + + // Eseguo il comando UNSUBSCRIBE + AsyncRedisClient.m_ConnectionPubStatus.bUnsubscribed = false ; + if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr, ( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) { LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand UNSUBSCRIBE failed") return false ; } - + auto tStart = chrono::steady_clock::now() ; + auto tTimeOut = chrono::milliseconds( static_cast( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ; + while ( ! AsyncRedisClient.m_ConnectionSubStatus.bUnsubscribed) { + this_thread::sleep_for( chrono::milliseconds( 10)) ; + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Timeout Unsubscribe exceeded") ; + return false ; + } + } + #if DEBUG + double dMsUnsubscribeTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; + LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsUnsubscribeTime) + " ms").c_str()) + #endif return true ; } -//---------------------------------------------------------------------------- -// Funzione di che aspetta un messaggio asincro su un canale Redis -//---------------------------------------------------------------------------- +// --------------------------------------------------------------------------- // La funzione esegue una Subscribe e una Unsubscribe // Se entro il tempo stabilito riceve un messaggio, lo comunica, altrimenti errore +// --------------------------------------------------------------------------- bool -ExeRedisAsyncSubscribeOneMessage( const string& sChannel, double dMaxTimeOut, string& sMessage) +ExeRedisAsyncSubscribeOneMessage( int nIdConnection, const string& sChannel, double dMaxTimeOut, + string& sMessage) { - // Controllo che la connessione sia valida e il contesto ben definito - if ( ! CheckConnectionAndContext( s_pRedisAsyncPubContext, true)) + // Verifico che l'Id di connesione sia valido + int nMyIdConnection = nIdConnection - 1 ; + if ( ! CheckIdConnection( nMyIdConnection)) return false ; + // Verifico che la connessione sia attiva + if ( vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) { + LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") + return false ; + } // Se non ho alcun canale, errore if ( sChannel.empty()) { LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") @@ -1073,12 +1229,13 @@ ExeRedisAsyncSubscribeOneMessage( const string& sChannel, double dMaxTimeOut, st return false ; } - // flag per ricezione del messaggio - s_bMessage = false ; - s_sMessage.clear() ; + // Recupero la connessione + RedisAsync& AsyncRedisClient = vAsyncRedisClients[nMyIdConnection] ; - // Invio del comando di SubScribe - if ( redisAsyncCommand( s_pRedisAsyncSubContext, WaitMessageCallback, nullptr, + // Invio del comando di SUBSCRIBE + AsyncRedisClient.m_ConnectionSubStatus.bMessage = false ; + AsyncRedisClient.m_ConnectionSubStatus.sMessage = "" ; + if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, WaitMessageCallback, nullptr, ( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) { LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to subscribe to"} + sChannel).c_str()) return false ; @@ -1087,7 +1244,7 @@ ExeRedisAsyncSubscribeOneMessage( const string& sChannel, double dMaxTimeOut, st // Attivo il TimeOut per l'attesa del messaggio auto tStart = chrono::steady_clock::now() ; auto tTimeOut = chrono::milliseconds( static_cast( dMaxTimeOut)) ; - while ( ! s_bMessage) { + while ( ! AsyncRedisClient.m_ConnectionSubStatus.bMessage) { this_thread::sleep_for( chrono::milliseconds( 10)) ; if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), ( string{ "Timeout : No Message received on ["} + sChannel + "]").c_str()) @@ -1095,16 +1252,16 @@ ExeRedisAsyncSubscribeOneMessage( const string& sChannel, double dMaxTimeOut, st } } - // Invio del comando Unsubscribe - if ( redisAsyncCommand( s_pRedisAsyncSubContext, WaitMessageCallback, nullptr, + // Invio del comando UNSUBSCRIBE + if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, WaitMessageCallback, nullptr, ( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) { LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to Unsubscribe to "} + sChannel).c_str()) return false ; } // Controllo se ho ricevuto un messaggio - if ( s_bMessage) { - sMessage = s_sMessage ; + if ( AsyncRedisClient.m_ConnectionSubStatus.bMessage) { + sMessage = AsyncRedisClient.m_ConnectionSubStatus.sMessage ; LOG_INFO( GetCmdLogger(), ( string{ "Message received on ["} + sChannel + "]: " + sMessage).c_str()) return true ; } diff --git a/EgtExecutor.rc b/EgtExecutor.rc index 188988d..01e79d7 100644 Binary files a/EgtExecutor.rc and b/EgtExecutor.rc differ diff --git a/LUA_Redis.cpp b/LUA_Redis.cpp index 30fc5d8..8aef4b4 100644 --- a/LUA_Redis.cpp +++ b/LUA_Redis.cpp @@ -28,20 +28,24 @@ LuaRedisConnect( lua_State* L) LuaCheckParam( L, 1, sConnection) LuaClearStack( L) ; // Imposto la connessione - bool bOk = ExeRedisConnect( sConnection) ; + int nIdConnection = 0 ; + bool bOk = ExeRedisConnect( sConnection, nIdConnection) ; // Restituisco il risultato LuaSetParam( L, bOk) ; - return 1 ; + LuaSetParam( L, nIdConnection) ; + return 2 ; } //------------------------------------------------------------------------------- static int LuaRedisDisconnect( lua_State* L) { - // Nessun paramtro + // 1 parametro : nIdConnection + int nIdConnection = 0 ; + LuaCheckParam( L, 1, nIdConnection) ; LuaClearStack( L) ; // Imposto la connessione - bool bOk = ExeRedisDisconnect() ; + bool bOk = ExeRedisDisconnect( nIdConnection) ; // Restituisco il risultato LuaSetParam( L, bOk) ; return 1 ; @@ -51,14 +55,16 @@ LuaRedisDisconnect( lua_State* L) static int LuaRedisSetValFromKey( lua_State* L) { - // 2 parametri : sKey, sVal + // 3 parametri : nIdConnection, sKey, sVal + int nIdConnection = 0 ; + LuaCheckParam( L, 1, nIdConnection) ; string sKey ; - LuaCheckParam( L, 1, sKey) ; + LuaCheckParam( L, 2, sKey) ; string sVal ; - LuaCheckParam( L, 2, sVal) ; + LuaCheckParam( L, 3, sVal) ; LuaClearStack( L) ; // Imposto il valore della chiave - bool bOk = ExeRedisSetValFromKey( sKey, sVal) ; + bool bOk = ExeRedisSetValFromKey( nIdConnection, sKey, sVal) ; // Restituisco il risultato LuaSetParam( L, bOk) ; return 1 ; @@ -68,13 +74,15 @@ LuaRedisSetValFromKey( lua_State* L) static int LuaRedisGetValFromKey( lua_State* L) { - // 1 parametro : sKey + // 2 parametri : nIdConnection, sKey + int nIdConnection = 0 ; + LuaCheckParam( L, 1, nIdConnection) ; string sKey ; - LuaCheckParam( L, 1, sKey) + LuaCheckParam( L, 2, sKey) LuaClearStack( L) ; // Recupero il valore della chiave string sVal, sType ; - bool bOk = ExeRedisGetValFromKey( sKey, sVal) ; + bool bOk = ExeRedisGetValFromKey( nIdConnection, sKey, sVal) ; // Restituisco il risultato LuaSetParam( L, bOk) ; LuaSetParam( L, sVal) ; @@ -85,25 +93,29 @@ LuaRedisGetValFromKey( lua_State* L) static int LuaRedisAsyncConnect( lua_State* L) { - // 1 parametro : sConnection + // 2 parametri : sConnection string sConnection ; LuaCheckParam( L, 1, sConnection) LuaClearStack( L) ; // Imposto la connessione - bool bOk = ExeRedisAsyncConnect( sConnection) ; + int nIdConnection = -1 ; + bool bOk = ExeRedisAsyncConnect( sConnection, nIdConnection) ; // Restituisco il risultato LuaSetParam( L, bOk) ; - return 1 ; + LuaSetParam( L, nIdConnection) ; + return 2 ; } //------------------------------------------------------------------------------- static int LuaRedisAsyncDiconnect( lua_State* L) { - // Nessun parametro + // 1 parametro : nIdConnection + int nIdConnection = 0 ; + LuaCheckParam( L, 1, nIdConnection) ; LuaClearStack( L) ; // Imposto la connessione - bool bOK = ExeRedisAsyncDisconnect() ; + bool bOK = ExeRedisAsyncDisconnect( nIdConnection) ; // Restituisco il risultato LuaSetParam( L, bOK) ; return 1 ; @@ -113,14 +125,16 @@ LuaRedisAsyncDiconnect( lua_State* L) static int LuaRedisAsyncPublish( lua_State* L) { - // 2 parametri : sChannel, sMessage + // 3 parametri : nIdConnection, sChannel, sMessage + int nIdConnection = 0 ; + LuaCheckParam( L, 1, nIdConnection) ; string sChannel ; - LuaCheckParam( L, 1, sChannel) ; + LuaCheckParam( L, 2, sChannel) ; string sMessage ; - LuaCheckParam( L, 2, sMessage) ; + LuaCheckParam( L, 3, sMessage) ; LuaClearStack( L) ; // Pubblico il messaggio sul canale - bool bOk = ExeRedisAsyncPublish( sChannel, sMessage) ; + bool bOk = ExeRedisAsyncPublish( nIdConnection, sChannel, sMessage) ; // Restituisco il risultato LuaSetParam( L, bOk) ; return 1 ; @@ -130,12 +144,14 @@ LuaRedisAsyncPublish( lua_State* L) static int LuaRedisAsyncSubscribe( lua_State* L) { - // 1 parametro : sChannel + // 2 parametri : nIdConnection, sChannel + int nIdConnection = 0 ; + LuaCheckParam( L, 1, nIdConnection) ; string sChannel ; - LuaCheckParam( L, 1, sChannel) ; + LuaCheckParam( L, 2, sChannel) ; LuaClearStack( L) ; // Iscrizione al canale - bool bOk = ExeRedisAsyncSubscribe( sChannel) ; + bool bOk = ExeRedisAsyncSubscribe( nIdConnection, sChannel) ; // Restituisco il risultato LuaSetParam( L, bOk) ; return 1 ; @@ -145,12 +161,14 @@ LuaRedisAsyncSubscribe( lua_State* L) static int LuaRedisAsyncUnsubscribe( lua_State* L) { - // 1 parametro : sChannel + // 2 parametri : nIdConnection, sChannel + int nIdConnection = 0 ; + LuaCheckParam( L, 1, nIdConnection) ; string sChannel ; - LuaCheckParam( L, 1, sChannel) ; + LuaCheckParam( L, 2, sChannel) ; LuaClearStack( L) ; // Disiscrizione - bool bOk = ExeRedisAsyncUnsubscribe( sChannel) ; + bool bOk = ExeRedisAsyncUnsubscribe( nIdConnection, sChannel) ; // Restituisco il risultato LuaSetParam( L, bOk) ; return 1 ; @@ -160,15 +178,17 @@ LuaRedisAsyncUnsubscribe( lua_State* L) static int LuaRedisAsyncSubscribeOneMessage( lua_State* L) { - // 2 parametri : sChannel, dMaxTimeOut + // 3 parametri : nIdConnection, sChannel, dMaxTimeOut + int nIdConnection = 0 ; + LuaCheckParam( L, 1, nIdConnection) ; string sChannel ; - LuaCheckParam( L, 1, sChannel) ; + LuaCheckParam( L, 2, sChannel) ; double dMaxTimeOut = 0. ; - LuaCheckParam( L, 2, dMaxTimeOut) ; + LuaCheckParam( L, 3, dMaxTimeOut) ; LuaClearStack( L) ; - // Disiscrizione + // Eseguo funzione speciale di iscrizione - attesa messaggio - disiscrizione string sMessage ; - bool bOk = ExeRedisAsyncSubscribeOneMessage( sChannel, dMaxTimeOut, sMessage) ; + bool bOk = ExeRedisAsyncSubscribeOneMessage( nIdConnection, sChannel, dMaxTimeOut, sMessage) ; // Restituisco il risultato LuaSetParam( L, bOk) ; LuaSetParam( L, sMessage) ;