From 9a719b94cf7adcdc2ac11deb45bfaf7f263a604b Mon Sep 17 00:00:00 2001 From: Riccardo Elitropi Date: Mon, 22 Dec 2025 17:05:30 +0100 Subject: [PATCH] EgtExecutor : - in Redis aggiunta gestione per riconnessioni. --- EXE_Redis.cpp | 640 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 484 insertions(+), 156 deletions(-) diff --git a/EXE_Redis.cpp b/EXE_Redis.cpp index d0351bd..3c37fb1 100644 --- a/EXE_Redis.cpp +++ b/EXE_Redis.cpp @@ -117,6 +117,7 @@ struct RedisConnectionStatus { atomic nIdConnection ; atomic bConnected ; + atomic bAlive ; atomic bAuthenticated ; atomic bSelectedDB ; atomic bDisconnected ; @@ -128,13 +129,14 @@ struct RedisConnectionStatus { RedisConnectionStatus() : nIdConnection( 0), - bConnected( false), bAuthenticated( false), bSelectedDB( false), bDisconnected( false), + bConnected( false), bAlive( false), bAuthenticated( false), bSelectedDB( false), bDisconnected( false), bPublished( false), bSubscribed( false), bUnsubscribed( false), bMessage( false), sMessage( "") {} inline void Clear() { nIdConnection = 0 ; bConnected = false ; + bAlive = false ; bAuthenticated = false ; bSelectedDB = false ; bDisconnected = false ; @@ -154,6 +156,8 @@ class RedisAsync { atomic m_bFreeConnection ; redisAsyncContext* m_pRedisAsyncPubContext ; redisAsyncContext* m_pRedisAsyncSubContext ; + string m_sConnectionString ; + unordered_set m_set_SubChannels ; int m_nDataBase ; string m_sPassword ; string m_sUser ; @@ -162,28 +166,63 @@ class RedisAsync { RedisConnectionInfo m_ConnectionInfo ; RedisConnectionStatus m_ConnectionSubStatus ; RedisConnectionStatus m_ConnectionPubStatus ; + thread m_PubThread ; + thread m_SubThread ; WSADATA m_wsaData ; RedisAsync() : m_bFreeConnection( true), m_pRedisAsyncPubContext( nullptr), m_pRedisAsyncSubContext( nullptr), - m_nDataBase( REDIS_MIN_DB), m_sPassword( ""), m_sUser( DEFAULT_USER), + m_sConnectionString( ""), m_set_SubChannels(), m_nDataBase( REDIS_MIN_DB), m_sPassword( ""), m_sUser( DEFAULT_USER), m_bPubLoopRunning( false), m_bSubLoopRunning( false), m_ConnectionInfo(), m_ConnectionSubStatus(), m_ConnectionPubStatus() {} ; + // thread è già inizializzato con il suo costruttore di Default valido inline void Clear() { - m_bFreeConnection = true ; - redisAsyncFree( m_pRedisAsyncPubContext) ; - m_pRedisAsyncPubContext = nullptr ; - redisAsyncFree( m_pRedisAsyncSubContext) ; - m_pRedisAsyncSubContext = nullptr ; + + // Loop da terminare + m_bPubLoopRunning.store( false, memory_order_release) ; + m_bSubLoopRunning.store( false, memory_order_release) ; + + // Sveglio i Thread bloccanti + if ( m_pRedisAsyncPubContext) + redisAsyncDisconnect( m_pRedisAsyncPubContext) ; + if ( m_pRedisAsyncSubContext) + redisAsyncDisconnect( m_pRedisAsyncSubContext) ; + + // Join dei Thread + if ( m_PubThread.joinable()) + m_PubThread.join() ; + if ( m_SubThread.joinable()) + m_SubThread.join() ; + + // Rilascio dei contesti Redis + if ( m_pRedisAsyncPubContext) { + redisAsyncFree( m_pRedisAsyncPubContext) ; + m_pRedisAsyncPubContext = nullptr ; + } + if ( m_pRedisAsyncSubContext) { + redisAsyncFree( m_pRedisAsyncSubContext) ; + m_pRedisAsyncSubContext = nullptr ; + } + + // Reset dello Stato + m_bFreeConnection.store( true, memory_order_release) ; + m_sConnectionString.clear() ; m_nDataBase = REDIS_MIN_DB ; - m_sPassword = "" ; + m_sPassword.clear() ; m_sUser = DEFAULT_USER ; - m_bPubLoopRunning = false ; - m_bSubLoopRunning = false ; + m_ConnectionInfo.Clear() ; m_ConnectionSubStatus.Clear() ; m_ConnectionPubStatus.Clear() ; + m_set_SubChannels.clear() ; + } + + ~RedisAsync() { + if ( m_PubThread.joinable()) + m_PubThread.join() ; + if ( m_SubThread.joinable()) + m_SubThread.join(); } public : @@ -209,18 +248,12 @@ void RedisAsync::RedisEventLoop( bool bIsPub) const { // Verifica della validità del contesto - redisAsyncContext* ctx = nullptr ; - if ( bIsPub) - ctx = m_pRedisAsyncPubContext ; - else - ctx = m_pRedisAsyncSubContext ; + redisAsyncContext* ctx = ( bIsPub ? m_pRedisAsyncPubContext : m_pRedisAsyncSubContext) ; if ( ctx == nullptr || ctx->err != 0) return ; // se attivazione loop prima della inizializzazione callBack // Recupero del file descriptor del socket TCP usato da Redis. SOCKET sock = ctx->c.fd ; - // Imposto condizione di ascolto degli eventi - bool bLoopRunning = bIsPub ? m_bPubLoopRunning : m_bSubLoopRunning ; // Creazione dei due insiemi per il file descriptor ( lettura e scrittura ) // r = read, w = write fd_set rfds, wfds ; @@ -233,7 +266,8 @@ RedisAsync::RedisEventLoop( bool bIsPub) const timeval timeout = {0, 100000 } ; // 100ms // Ciclo - while ( bLoopRunning) { + while ( ( bIsPub ? m_bPubLoopRunning.load( memory_order_acquire) + : m_bSubLoopRunning.load( memory_order_acquire))) { // Se contesto non valido, interrompo il ciclo if ( ctx == nullptr || ( ctx->c.flags & REDIS_DISCONNECTING)) break ; @@ -259,7 +293,7 @@ RedisAsync::RedisEventLoop( bool bIsPub) const redisAsyncHandleWrite( ctx) ; } // Evito saturazione CPU - this_thread::sleep_for( chrono::milliseconds( 10)) ; + this_thread::sleep_for( chrono::milliseconds( 1)) ; } // Reset del timeOut ad ogni ciclo @@ -270,7 +304,6 @@ RedisAsync::RedisEventLoop( bool bIsPub) const return ; } - //---------------------------------------------------------------------------- // [Utility] Funzione di Controllo per Id Connessione // --------------------------------------------------------------------------- @@ -632,7 +665,9 @@ RedisDbSElectionCallBack( redisAsyncContext* ctx, void* r, void*) LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } - MyStatus->bSelectedDB = true ; + // Selezione riuscita + MyStatus->bSelectedDB.store( true, memory_order_release) ; + MyStatus->bSelectedDB.notify_one() ; } //---------------------------------------------------------------------------- @@ -654,7 +689,9 @@ RedisAuthenticationCallBack( redisAsyncContext* ctx, void* r, void*) LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } - MyStatus->bAuthenticated = true ; + // Autenticazione riuscita + MyStatus->bAuthenticated.store( true, memory_order_release) ; + MyStatus->bAuthenticated.notify_one() ; } // --------------------------------------------------------------------------- @@ -670,13 +707,14 @@ RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*) return ; } - // Recupero della risposta dal server redis ( nullptr nel caso di residui) + // Recupero della risposta dal server redis redisReply* reply = static_cast( r) ; if ( reply == nullptr) return ; LOG_INFO( GetCmdLogger(), " --- RedisMessageCallback invoked") + // Caso PUBLISH: hiredis risponde con REDIS_REPLY_INTEGER (numero di subscribers) if ( reply->type == REDIS_REPLY_INTEGER) { // Recupero lo Stato RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; @@ -685,11 +723,13 @@ RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*) return ; } // Imposto stato di pubblicazione - MyStatus->bPublished = true ; + MyStatus->bPublished.store( true, memory_order_release) ; + MyStatus->bPublished.notify_one() ; // Recupero il numero di clients iscritti al canale LOG_INFO( GetCmdLogger(), ( string{ "Message published to "} + ToString( reply->integer) + " clients !").c_str()) ; return ; } + // Caso ARRAY: Subscribe / Unsubscribe / Message else if ( reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) { // Recupero la tipologia di chiamata effettuata const char* msgType = ( reply->element[0] != nullptr && reply->element[0]->str != nullptr ? @@ -700,8 +740,8 @@ RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*) // Messaggio ricevuto da un canale if ( reply->element[1] != nullptr && reply->element[1]->str != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) { - string sChannel = reply->element[1]->str ; - string sMessage = reply->element[2]->str ; + string sChannel{ reply->element[1]->str, reply->element[1]->len} ; + string sMessage{ reply->element[2]->str, reply->element[2]->len} ; #if DEBUG LOG_INFO( GetCmdLogger(), ( string{ "Message Received on ["} + sChannel + "] : " + sMessage).c_str()) #endif @@ -729,7 +769,7 @@ RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*) return ; } } - // --- Se Subscribe + // --- Se SUBSCRIBE else if ( strcmp( msgType, "subscribe") == 0 && reply->elements >= 3) { if ( reply->element[1] != nullptr && reply->element[1] != nullptr && reply->element[2] != nullptr) { @@ -740,7 +780,8 @@ RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*) return ; } // Imposto stato di Subscribe - MyStatus->bSubscribed = true ; + MyStatus->bSubscribed.store( true, memory_order_release) ; + MyStatus->bSubscribed.notify_one() ; // Comunico il risultato string sChannel = reply->element[1]->str ; int nCount = reply->element[2]->integer ; @@ -762,8 +803,9 @@ RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*) LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } - // Imposto stato di pubblicazione - MyStatus->bUnsubscribed = true ; + // Imposto stato Unsubscribe + MyStatus->bUnsubscribed.store( true, memory_order_release) ; + MyStatus->bUnsubscribed.notify_one() ; // Comunico il risultato string sChannel = reply->element[1]->str ; int nCount = reply->element[2]->integer ; @@ -836,19 +878,32 @@ WaitMessageCallback( redisAsyncContext* ctx, void* r, void*) static void RedisConnectCallBack( const redisAsyncContext* ctx, int nStatus) { - // Verifico che il contesto e lo stato siano validi - if ( ctx == nullptr || nStatus != REDIS_OK) { - LOG_INFO( GetCmdLogger(), "Error : Async Disconnection CallBack failed ") ; + // Verifico che il Contesto sia valido + if ( ctx == nullptr) { + LOG_INFO( GetCmdLogger(), "Error : Async Connect CallBack failed ") ; return ; } - // Imposto flag di Connessione + // Recupero dal Contesto lo stato della connessione RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; if ( MyStatus == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } - MyStatus->bConnected = true ; + + // Verifico che lo stato internno al contesto Redis sia OK + if ( nStatus != REDIS_OK) { + LOG_INFO( GetCmdLogger(), "Error : Redis Async connection failed") ; + MyStatus->bAlive.store( false, memory_order_release) ; + MyStatus->bConnected.store( false, memory_order_release) ; + MyStatus->bConnected.notify_one() ; + return ; + } + + // Connessione riuscita + MyStatus->bAlive.store( true, memory_order_release) ; + MyStatus->bConnected.store( true, memory_order_release) ; + MyStatus->bConnected.notify_one() ; } // --------------------------------------------------------------------------- @@ -857,19 +912,29 @@ RedisConnectCallBack( const redisAsyncContext* ctx, int nStatus) static void RedisDisconnectCallBack( const redisAsyncContext* ctx, int nStatus) { - // Verifico che il contesto e lo stato siano validi - if ( ctx == nullptr || nStatus != REDIS_OK) { - LOG_INFO( GetCmdLogger(), "Error : Async Disconnection CallBack failed ") ; + // Verifico che il Contesto sia valido + if ( ctx == nullptr) { + LOG_INFO( GetCmdLogger(), "Error : Null Context for Disconnetion CallBack ") ; return ; } - // Imposto flag di Disconnessione + // Recupero lo stato della connessione RedisConnectionStatus* MyStatus = static_cast( ctx->data) ; if ( MyStatus == nullptr) { LOG_INFO( GetCmdLogger(), "Error : No Status detected") ; return ; } - MyStatus->bDisconnected = true ; + + // nStatus non è affidabile in questo contesto, può essere != REDIS_OK anche in disconnessione normale + // NB. nStatus è solo informativo, ignora tutte le disconnessioni reali + LOG_INFO( GetCmdLogger(), "Redis Async Disconnected ( x2, called for PUB and SUB)") ; + + // Aggiorno gli stati ed eventuali thread in attesa + MyStatus->bAlive.store( false, memory_order_release) ; + MyStatus->bConnected.store( false, memory_order_release) ; + MyStatus->bConnected.notify_one() ; + MyStatus->bDisconnected.store( true, memory_order_release) ; + MyStatus->bDisconnected.notify_one() ; } //---------------------------------------------------------------------------- @@ -881,7 +946,7 @@ ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection) // Inizializzo una nuova connessione, recuperando l'Id int nMyIdConnection = NO_CONNECTION ; for ( int i = 0 ; i < int( s_vAsyncRedisClients.size()) ; ++ i) { - if ( s_vAsyncRedisClients[i].m_bFreeConnection) { + if ( s_vAsyncRedisClients[i].m_bFreeConnection.load( memory_order_acquire)) { nMyIdConnection = i ; break ; } @@ -891,13 +956,16 @@ ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection) ToString( MAX_CONNECTION_NBR) + " )").c_str()) return false ; } + // Recupero lo Slot + RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; + AsyncRedisClient.Clear() ; // per possibile riconnessione + AsyncRedisClient.m_bFreeConnection.store( false, memory_order_release) ; // Recupero i riferimenti necessari e blocco la connessione corrente - RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; - AsyncRedisClient.m_bFreeConnection = false ; RedisConnectionInfo& ConnectionInfo = AsyncRedisClient.m_ConnectionInfo ; // Recupero i parametri dalla stringa di connessione + AsyncRedisClient.m_sConnectionString = sConnection ; // per riconnessione if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) { AsyncRedisClient.Clear() ; return false ; @@ -961,28 +1029,44 @@ ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection) } // Definizione e partenza del ciclo degli eventi in un thread separato ( in BackGround) - AsyncRedisClient.m_bPubLoopRunning = true ; - AsyncRedisClient.m_bSubLoopRunning = true ; - thread tPubEventLoop( &RedisAsync::RedisEventLoop, &AsyncRedisClient, true) ; - thread tSubEventLoop( &RedisAsync::RedisEventLoop, &AsyncRedisClient, false) ; - tPubEventLoop.detach() ; - tSubEventLoop.detach() ; + AsyncRedisClient.m_ConnectionPubStatus.Clear() ; + AsyncRedisClient.m_ConnectionSubStatus.Clear() ; + AsyncRedisClient.m_bPubLoopRunning.store( true, memory_order_release) ; + AsyncRedisClient.m_bSubLoopRunning.store( true, memory_order_release) ; + AsyncRedisClient.m_PubThread = thread( &RedisAsync::RedisEventLoop, &AsyncRedisClient, true) ; + AsyncRedisClient.m_SubThread = thread( &RedisAsync::RedisEventLoop, &AsyncRedisClient, false) ; - // Imposto e Definisco la funzione di CallBack per contesto PUB/SUB per Connessione asincrona + // Imposto e Definisco la funzione di CallBack per contesto PUB/SUB per Connessione/Disconnessione asincrona AsyncRedisClient.m_pRedisAsyncPubContext->data = &AsyncRedisClient.m_ConnectionPubStatus ; AsyncRedisClient.m_pRedisAsyncSubContext->data = &AsyncRedisClient.m_ConnectionSubStatus ; + AsyncRedisClient.m_ConnectionPubStatus.bConnected.store( false, memory_order_release) ; + AsyncRedisClient.m_ConnectionSubStatus.bConnected.store( false, memory_order_release) ; redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisConnectCallBack) ; redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisConnectCallBack) ; + redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDisconnectCallBack) ; + redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDisconnectCallBack) ; // Attendo la risposta di CallBack auto tStart = chrono::steady_clock::now() ; auto tTimeOut = chrono::milliseconds( static_cast( ConnectionInfo.nAsyncTimeout)) ; - while ( ! AsyncRedisClient.m_ConnectionPubStatus.bConnected || ! AsyncRedisClient.m_ConnectionSubStatus.bConnected) { - this_thread::sleep_for( chrono::milliseconds( 10)) ; + while ( ! AsyncRedisClient.m_ConnectionPubStatus.bConnected.load( memory_order_acquire)) { if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ; AsyncRedisClient.Clear() ; return false ; } + // NB. non usare la wait( false) !!! se in rete perdo qualche informazione + // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro + this_thread::sleep_for( chrono::milliseconds( 10)) ; + } + while ( ! AsyncRedisClient.m_ConnectionSubStatus.bConnected.load( memory_order_acquire)) { + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ; + AsyncRedisClient.Clear() ; + return false ; + } + // NB. non usare la wait( false) !!! se in rete perdo qualche informazione + // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro + this_thread::sleep_for( chrono::milliseconds( 10)) ; } LOG_INFO( GetCmdLogger(), "Connected to Redis !") #if DEBUG @@ -991,20 +1075,34 @@ ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection) #endif // Se presente una Password, eseguo l'autenticazione - if ( AsyncRedisClient.m_sPassword.empty()) { + if ( ! AsyncRedisClient.m_sPassword.empty()) { + AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.store( false, memory_order_relaxed) ; + AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.store( false, memory_order_relaxed) ; redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisAuthenticationCallBack, nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ; redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisAuthenticationCallBack, nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ; tStart = chrono::steady_clock::now() ; tTimeOut = chrono::milliseconds( static_cast( ConnectionInfo.nAsyncTimeout)) ; - while ( ! AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated || ! AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated) { - this_thread::sleep_for( chrono::milliseconds( 10)) ; + while ( ! AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.load( memory_order_acquire)) { if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Timeout Autentication exceeded") ; AsyncRedisClient.Clear() ; return false ; } + // NB. non usare la wait( false) !!! se in rete perdo qualche informazione + // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro + this_thread::sleep_for( chrono::milliseconds( 10)) ; + } + while ( ! AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.load( memory_order_acquire)) { + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Timeout Autentication exceeded") ; + AsyncRedisClient.Clear() ; + return false ; + } + // NB. non usare la wait( false) !!! se in rete perdo qualche informazione + // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro + this_thread::sleep_for( chrono::milliseconds( 10)) ; } LOG_INFO( GetCmdLogger(), "Authenticated to Redis !") #if DEBUG @@ -1013,25 +1111,39 @@ ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection) #endif } else { - AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated = true ; - AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated = true ; + AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.store( true, memory_order_release) ; + AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.store( true, memory_order_release) ; } // Se Presente una Selezione del Database, eseguo la selezione if ( AsyncRedisClient.m_nDataBase != DEFAULT_DATABASE) { + AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.store( false, memory_order_relaxed) ; + AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.store( false, memory_order_relaxed) ; redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDbSElectionCallBack, nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ; redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDbSElectionCallBack, nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ; tStart = chrono::steady_clock::now() ; tTimeOut = chrono::milliseconds( static_cast( ConnectionInfo.nAsyncTimeout)) ; - while ( ! AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB || ! AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB) { - this_thread::sleep_for( chrono::milliseconds( 10)) ; + while ( ! AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.load( memory_order_acquire)) { if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Timeout DB Selection exceeded") ; AsyncRedisClient.Clear() ; return false ; } + // NB. non usare la wait( false) !!! se in rete perdo qualche informazione + // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro + this_thread::sleep_for( chrono::milliseconds( 10)) ; + } + while ( ! AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.load( memory_order_acquire)) { + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Timeout DB Selection exceeded") ; + AsyncRedisClient.Clear() ; + return false ; + } + // NB. non usare la wait( false) !!! se in rete perdo qualche informazione + // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro + this_thread::sleep_for( chrono::milliseconds( 10)) ; } LOG_INFO( GetCmdLogger(), "DB Selected to Redis !") ; #if DEBUG @@ -1040,14 +1152,10 @@ ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection) #endif } else { - AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB = true ; - AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB = true ; + AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.store( true, memory_order_release) ; + AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.store( true, memory_order_release) ; } - // Imposto e Definisco la funzione di CallBack per la Disconnessione Pub/Sub asincrona - redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDisconnectCallBack) ; - redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDisconnectCallBack) ; - // Restituisco e salvo l'Id della connessione [1, MAX_CONNECTION_NBR] ( per Lua) nIdConnection = nMyIdConnection + 1 ; AsyncRedisClient.m_ConnectionPubStatus.nIdConnection = nIdConnection ; @@ -1065,14 +1173,21 @@ ExeRedisAsyncDisconnect( int nIdConnection) int nMyIdConnection = nIdConnection - 1 ; if ( ! CheckIdConnection( nMyIdConnection)) return false ; - // Verifico che la connessione sia attiva - if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) { + + // Recupero la connessione + RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; + + // Verifico che la connessione sia utilizzata, e non libera + if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection.load( memory_order_acquire)) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } - // Recupero la connessione - RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; + // Reset dei Flag prima della connessione + AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.store( false, memory_order_relaxed) ; + AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.store( false, memory_order_relaxed) ; + AsyncRedisClient.m_ConnectionPubStatus.bAlive.store( false, memory_order_relaxed) ; + AsyncRedisClient.m_ConnectionSubStatus.bAlive.store( false, memory_order_relaxed) ; // Effettuo la Disconnesione asincrona ( libera la memoria in automatico) redisAsyncDisconnect( AsyncRedisClient.m_pRedisAsyncPubContext) ; @@ -1081,9 +1196,15 @@ ExeRedisAsyncDisconnect( int nIdConnection) // Attendo la CallBack di disconnessione auto tStart = chrono::steady_clock::now() ; auto tTimeOut = chrono::milliseconds( static_cast( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ; - while ( ! AsyncRedisClient.m_ConnectionPubStatus.bDisconnected || - ! AsyncRedisClient.m_ConnectionSubStatus.bDisconnected) { - this_thread::sleep_for( chrono::milliseconds( 10)) ; + while ( ! AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.load( memory_order_acquire)) { + AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.wait( false) ; + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ; + return false ; + } + } + while ( ! AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.load( memory_order_acquire)) { + AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.wait( false) ; if ( chrono::steady_clock::now() - tStart > tTimeOut) { LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ; return false ; @@ -1091,15 +1212,12 @@ ExeRedisAsyncDisconnect( int nIdConnection) } // Rendo libera la connessione - AsyncRedisClient.m_bFreeConnection = true ; + AsyncRedisClient.m_bFreeConnection.store( true) ; AsyncRedisClient.Clear() ; - // Pulisco WinSock2 ( timer ?) - // ??? - // Elimino i Messaggi dalla mappa for ( auto Iter = s_vAsyncRedisMessages.begin() ; Iter != s_vAsyncRedisMessages.end() ; ) { - if ( Iter->first.compare( GetMessageMapKey( nIdConnection, "")) == 0) + if ( Iter->first.starts_with( GetMessageMapKey( nIdConnection, ""))) Iter = s_vAsyncRedisMessages.erase( Iter) ; else ++ Iter ; @@ -1123,8 +1241,10 @@ ExeRedisAsyncPublish( int nIdConnection, const string& sChannel, const string& s int nMyIdConnection = nIdConnection - 1 ; if ( ! CheckIdConnection( nMyIdConnection)) return false ; + // Verifico che la connessione sia attiva - if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) { + RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; + if ( AsyncRedisClient.m_bFreeConnection.load( memory_order_acquire)) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } @@ -1137,30 +1257,98 @@ ExeRedisAsyncPublish( int nIdConnection, const string& sChannel, const string& s if ( sMessage.empty()) LOG_INFO( GetCmdLogger(), "Warning : Empty Message") ; - // Recupero la connessione - RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; + // Numero massimo di Tentativi per effettuare PUBLISH + const int MAX_RETRY = 3 ; + for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) { - // Eseguo il comando PUBLISH - AsyncRedisClient.m_ConnectionPubStatus.bPublished = false ; - if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisPubSubUnsubMsgCallback, nullptr, - "PUBLISH %s %s", sChannel.c_str(), sMessage.c_str()) != REDIS_OK) { - LOG_INFO( GetCmdLogger(), "Error: redisAsyncCommand PUBLISH failed") - return false ; - } - auto tStart = chrono::steady_clock::now() ; - auto tTimeOut = chrono::milliseconds( static_cast( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ; - while ( ! AsyncRedisClient.m_ConnectionPubStatus.bPublished) { - this_thread::sleep_for( chrono::milliseconds( 10)) ; - if ( chrono::steady_clock::now() - tStart > tTimeOut) { - LOG_INFO( GetCmdLogger(), "Timeout Subscribe exceeded") ; - return false ; + // Rileggo il Client, dato che potrebbe essere cambiato dopo la riconnessione + RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ; + RedisConnectionStatus& curStatus = currClient.m_ConnectionPubStatus ; + + // Se la connessione non è attiva, provo a riconnetermi + if ( ! currClient.m_ConnectionPubStatus.bAlive.load( memory_order_acquire)) { + LOG_INFO( GetCmdLogger(), ( string{ "Publish: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ; + + // Recupero la stringa di connessione corrente + string sConnStr = s_vAsyncRedisClients[nMyIdConnection].m_sConnectionString ; + // Recupero i Canali su cui ero iscritto + unordered_set setSubChannels = currClient.m_set_SubChannels ; + + // Pulisco il contesto corrente + AsyncRedisClient.Clear() ; + + int nNewId = 0 ; + bool bOkReconnect = false ; + if ( ExeRedisAsyncConnect( sConnStr, nNewId)) { + // Se ho scelto lo stesso slot, aggiorno nMyIdConnection + int nNewIndex = nNewId - 1 ; + if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) { + // Sostituisco il riferimento Client con il nuovo + if ( nNewIndex != nMyIdConnection) + nMyIdConnection = nNewIndex ; + } + bOkReconnect = true ; + // Mi iscrivo a tutti i canali a cui ero precedentemente iscritto + for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) { + if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter)) + return false ; + } + } + else { + LOG_INFO( GetCmdLogger(), "Publish: reconnect attempt failed") ; + // riprovo al ciclo successivo... + } + // Se non mi sono riconesso metto un piccolo Delay per estrema sicureza + if ( ! bOkReconnect) { + this_thread::sleep_for( chrono::milliseconds( 100)) ; + continue ; + } } + + // Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione) + RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ; + RedisConnectionStatus& nextStatus = nextClient.m_ConnectionPubStatus ; + + // Ricalcolo timeout dal client aggiornato + auto tTimeOut = chrono::milliseconds( static_cast( nextClient.m_ConnectionInfo.nAsyncTimeout)) ; + + // Reset dello stato di pubblicazione + nextStatus.bPublished.store( false, memory_order_relaxed) ; + + // Invio del comando PUBLISH + int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncPubContext, RedisPubSubUnsubMsgCallback, nullptr, + "PUBLISH %s %s", sChannel.c_str(), sMessage.c_str()) ; + if ( nResult != REDIS_OK) { + LOG_INFO( GetCmdLogger(), "Publish: redisAsyncCommand failed") ; + nextStatus.bAlive.store( false, memory_order_release) ; + this_thread::sleep_for( chrono::milliseconds( 100)) ; + continue ; + } + auto tStart = chrono::steady_clock::now() ; + while ( ! nextStatus.bPublished.load( memory_order_acquire)) { + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Publish: timeout waiting for publish reply") ; + break ; + } + // NB. non usare la wait( false) !!! se in rete perdo qualche informazione + // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro + this_thread::sleep_for( chrono::milliseconds( 10)) ; + } + if ( nextStatus.bPublished.load( memory_order_acquire)) { + #if DEBUG + double dMsPublishTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; + LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsPublishTime) + " ms").c_str()) ; + #endif + return true ; + } + LOG_INFO( GetCmdLogger(), ( string{ "Publish failed, attempt "} + ToString( nAttempt)).c_str()) ; + + // Breve pausa prima del Retry + this_thread::sleep_for( chrono::milliseconds( 100)) ; } - #if DEBUG - double dMsPublishTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; - LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsPublishTime) + " ms").c_str()) - #endif - return true ; + + // Tutti i tentativi sono falliti + return false ; } //---------------------------------------------------------------------------- @@ -1174,7 +1362,8 @@ ExeRedisAsyncSubscribe( int nIdConnection, const string& sChannel) if ( ! CheckIdConnection( nMyIdConnection)) return false ; // Verifico che la connessione sia attiva - if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) { + RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; + if ( AsyncRedisClient.m_bFreeConnection.load( memory_order_acquire)) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } @@ -1184,36 +1373,105 @@ ExeRedisAsyncSubscribe( int nIdConnection, const string& sChannel) return false ; } - // Recupero la connessione - RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; + // Numero massimo di Tentativi per effettuare SUBSCRIBE + const int MAX_RETRY = 3 ; + for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) { - // Eseguo il comando SUBSCRIBE - AsyncRedisClient.m_ConnectionPubStatus.bSubscribed = false ; - if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr, - ( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) { - LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand SUBSCRIBE failed") - return false ; - } - auto tStart = chrono::steady_clock::now() ; - auto tTimeOut = chrono::milliseconds( static_cast( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ; - while ( ! AsyncRedisClient.m_ConnectionSubStatus.bSubscribed) { - this_thread::sleep_for( chrono::milliseconds( 10)) ; - if ( chrono::steady_clock::now() - tStart > tTimeOut) { - LOG_INFO( GetCmdLogger(), "Timeout Subscribe exceeded") ; - return false ; + // Rileggo il client e lo stato (potrebbero cambiare dopo reconnect) + RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ; + RedisConnectionStatus& currStatus = currClient.m_ConnectionSubStatus ; + + // Se la connessione non è viva, provo a riconnettere + if ( ! currStatus.bAlive.load( memory_order_acquire)) { + LOG_INFO( GetCmdLogger(), ( string{ "Subscribe: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ; + + // Recupero la stringa di connessione corrente + string sConnStr = currClient.m_sConnectionString ; + // Recupero i Canali su cui ero iscritto + unordered_set setSubChannels = currClient.m_set_SubChannels ; + + // Pulisco il contesto corrente + AsyncRedisClient.Clear() ; + + int nNewId = 0 ; + bool bOkReconnect = false ; + if ( ExeRedisAsyncConnect( sConnStr, nNewId)) { + // Se ho scelto lo stesso slot, aggiorno nMyIdConnection + int nNewIndex = nNewId - 1 ; + if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) { + // Sostituisco il riferimento Client con il nuovo + if ( nNewIndex != nMyIdConnection) + nMyIdConnection = nNewIndex ; + } + bOkReconnect = true ; + // Mi iscrivo a tutti i canali a cui ero precedentemente iscritto + for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) { + if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter)) + return false ; + } + } + else { + LOG_INFO( GetCmdLogger(), "Publish: reconnect attempt failed") ; + // riprovo al ciclo successivo... + } + // Se non mi sono riconesso metto un piccolo Delay per estrema sicureza + if ( ! bOkReconnect) { + this_thread::sleep_for( chrono::milliseconds( 100)) ; + continue ; + } } + + // Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione) + RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ; + RedisConnectionStatus& nextStatus = nextClient.m_ConnectionSubStatus ; + + // Ricalcolo timeout dal client aggiornato + auto tTimeOut = chrono::milliseconds( static_cast( nextClient.m_ConnectionInfo.nAsyncTimeout)) ; + + // Reset dello stato di pubblicazione + nextStatus.bSubscribed.store( false, memory_order_relaxed) ; + + int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr, + "SUBSCRIBE %s", sChannel.c_str()) ; + if ( nResult != REDIS_OK) { + LOG_INFO( GetCmdLogger(), "Subscribe: redisAsyncCommand failed") ; + nextStatus.bAlive.store( false, memory_order_release) ; + this_thread::sleep_for( chrono::milliseconds( 100)) ; + continue ; + } + auto tStart = chrono::steady_clock::now() ; + while ( ! nextStatus.bSubscribed.load( memory_order_acquire)) { + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Publish: timeout waiting for publish reply") ; + break ; + } + // NB. non usare la wait( false) !!! se in rete perdo qualche informazione + // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro + this_thread::sleep_for( chrono::milliseconds( 10)) ; + } + + if ( nextStatus.bSubscribed.load( memory_order_acquire)) { + // Uso l'ID corrente ( aggiornato) per la chiave messaggi + int nEffectiveId = nMyIdConnection + 1 ; + string sKey = GetMessageMapKey( nEffectiveId, sChannel) ; + s_vAsyncRedisMessages.try_emplace( sKey) ; + s_vAsyncRedisMessages[sKey].Lock.clear() ; + #if DEBUG + double dMsPublishTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; + LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsPublishTime) + " ms").c_str()) ; + #endif + // Memorizzo il canale nella set dei canali + nextClient.m_set_SubChannels.insert( sChannel) ; + return true ; + } + LOG_INFO( GetCmdLogger(), ( string{ "Subscribe failed, attempt "} + ToString( nAttempt)).c_str()) ; + + // Breve pausa prima del Retry + this_thread::sleep_for( chrono::milliseconds( 100)) ; } - // Aggiorno la Mappa per la lettura dei Messaggio - string sKey = GetMessageMapKey( nIdConnection, sChannel) ; - s_vAsyncRedisMessages.try_emplace( sKey) ; - s_vAsyncRedisMessages[sKey].Lock.clear() ; - - #if DEBUG - double dMsSubscribeTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; - LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsSubscribeTime) + " ms").c_str()) - #endif - return true ; + // Tutti i tentativi sono falliti + return false ; } // --------------------------------------------------------------------------- @@ -1227,7 +1485,8 @@ ExeRedisAsyncUnsubscribe( int nIdConnection, const string& sChannel) if ( ! CheckIdConnection( nMyIdConnection)) return false ; // Verifico che la connessione sia attiva - if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) { + RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; + if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection.load( memory_order_acquire)) { LOG_INFO( GetCmdLogger(), "Error : Id Connection not found") return false ; } @@ -1237,39 +1496,108 @@ ExeRedisAsyncUnsubscribe( int nIdConnection, const string& sChannel) return false ; } - // Recupero la connessione - RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ; + const int MAX_RETRY = 3 ; + for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) { - // Eseguo il comando UNSUBSCRIBE - AsyncRedisClient.m_ConnectionPubStatus.bUnsubscribed = false ; - if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr, - ( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) { - LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand UNSUBSCRIBE failed") - return false ; - } - auto tStart = chrono::steady_clock::now() ; - auto tTimeOut = chrono::milliseconds( static_cast( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ; - while ( ! AsyncRedisClient.m_ConnectionSubStatus.bUnsubscribed) { - this_thread::sleep_for( chrono::milliseconds( 10)) ; - if ( chrono::steady_clock::now() - tStart > tTimeOut) { - LOG_INFO( GetCmdLogger(), "Timeout Unsubscribe exceeded") ; - return false ; + // Rileggo il client e lo stato ( potrebbero cambiare dopo reconnect) + RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ; + RedisConnectionStatus& currStatus = currClient.m_ConnectionSubStatus ; + + // Se la connessione non è viva, provo a riconnettere + if ( ! currStatus.bAlive.load( memory_order_acquire)) { + LOG_INFO( GetCmdLogger(), ( string{ "Unubscribe: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ; + + // Recupero la stringa di connessione corrente + string sConnStr = currClient.m_sConnectionString ; + // Recupero i Canali su cui ero iscritto + unordered_set setSubChannels = currClient.m_set_SubChannels ; + + // Pulisco il contesto corrente + AsyncRedisClient.Clear() ; + + int nNewId = 0 ; + bool bOkReconnect = false ; + if ( ExeRedisAsyncConnect( sConnStr, nNewId)) { + // Se ho scelto lo stesso slot, aggiorno nMyIdConnection + int nNewIndex = nNewId - 1 ; + if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) { + // Sostituisco il riferimento Client con il nuovo + if ( nNewIndex != nMyIdConnection) + nMyIdConnection = nNewIndex ; + } + bOkReconnect = true ; + // Mi iscrivo a tutti i canali a cui ero precedentemente iscritto + for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) { + if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter)) + return false ; + } + } + else { + LOG_INFO( GetCmdLogger(), "Unsubscribe: reconnect attempt failed") ; + // riprovo al ciclo successivo... + } + // Se non mi sono riconesso metto un piccolo Delay per estrema sicureza + if ( ! bOkReconnect) { + this_thread::sleep_for( chrono::milliseconds( 100)) ; + continue ; + } } + + // Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione) + RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ; + RedisConnectionStatus& nextStatus = nextClient.m_ConnectionSubStatus ; + + // Ricalcolo timeout dal client aggiornato + auto tTimeOut = chrono::milliseconds( static_cast( nextClient.m_ConnectionInfo.nAsyncTimeout)) ; + + // Reset dello stato di pubblicazione + nextStatus.bUnsubscribed.store( false, memory_order_relaxed) ; + + int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr, + "UNSUBSCRIBE %s", sChannel.c_str()) ; + if ( nResult != REDIS_OK) { + LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand UNSUBSCRIBE failed") ; + nextStatus.bAlive.store( false, memory_order_release) ; + this_thread::sleep_for( chrono::milliseconds( 100)) ; + continue ; + } + auto tStart = chrono::steady_clock::now() ; + while ( ! nextStatus.bUnsubscribed.load( memory_order_acquire)) { + if ( chrono::steady_clock::now() - tStart > tTimeOut) { + LOG_INFO( GetCmdLogger(), "Unsubscribe: timeout waiting for publish reply") ; + break ; + } + // NB. non usare la wait( false) !!! se in rete perdo qualche informazione + // rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro + this_thread::sleep_for( chrono::milliseconds( 10)) ; + } + + if ( nextStatus.bUnsubscribed.load( memory_order_acquire)) { + // Rimuovo la chiave dalla mappa messaggi usando l'ID effettivo (aggiornato) + int nEffectiveId = nMyIdConnection + 1 ; + string keyToRemove = GetMessageMapKey( nEffectiveId, sChannel) ; + for ( auto it = s_vAsyncRedisMessages.begin() ; it != s_vAsyncRedisMessages.end() ; ) { + if ( it->first == keyToRemove) + it = s_vAsyncRedisMessages.erase( it) ; + else + ++ it ; + } + #if DEBUG + double dMsUnsubscribeTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; + LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsUnsubscribeTime) + " ms").c_str()) ; + #endif + // Tolgo il canale dalla lista + nextClient.m_set_SubChannels.erase( sChannel) ; + return true ; + } + LOG_INFO( GetCmdLogger(), ( string{ "Unsubscribe failed, attempt "} + ToString( nAttempt)).c_str()) ; + + // Breve pausa prima del Retry + this_thread::sleep_for( chrono::milliseconds( 100)) ; } - // Elimino i Messaggi dalla mappa - for ( auto Iter = s_vAsyncRedisMessages.begin() ; Iter != s_vAsyncRedisMessages.end() ; ) { - if ( Iter->first.compare( GetMessageMapKey( nIdConnection, sChannel)) == 0) - Iter = s_vAsyncRedisMessages.erase( Iter) ; - else - ++ Iter ; - } - - #if DEBUG - double dMsUnsubscribeTime = chrono::duration_cast( chrono::steady_clock::now() - tStart).count() ; - LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsUnsubscribeTime) + " ms").c_str()) - #endif - return true ; + // Tutti i tentativi sono falliti + return false ; } // ---------------------------------------------------------------------------