EgtExecutor :

- in Redis aggiunta gestione per riconnessioni.
This commit is contained in:
Riccardo Elitropi
2025-12-22 17:05:30 +01:00
parent c50ecf7d53
commit 9a719b94cf
+484 -156
View File
@@ -117,6 +117,7 @@ struct RedisConnectionStatus {
atomic<int> nIdConnection ;
atomic<bool> bConnected ;
atomic<bool> bAlive ;
atomic<bool> bAuthenticated ;
atomic<bool> bSelectedDB ;
atomic<bool> bDisconnected ;
@@ -128,13 +129,14 @@ struct RedisConnectionStatus {
RedisConnectionStatus()
: nIdConnection( 0),
bConnected( false), bAuthenticated( false), bSelectedDB( false), bDisconnected( false),
bConnected( false), bAlive( false), bAuthenticated( false), bSelectedDB( false), bDisconnected( false),
bPublished( false), bSubscribed( false), bUnsubscribed( false), bMessage( false),
sMessage( "") {}
inline void Clear() {
nIdConnection = 0 ;
bConnected = false ;
bAlive = false ;
bAuthenticated = false ;
bSelectedDB = false ;
bDisconnected = false ;
@@ -154,6 +156,8 @@ class RedisAsync {
atomic<bool> m_bFreeConnection ;
redisAsyncContext* m_pRedisAsyncPubContext ;
redisAsyncContext* m_pRedisAsyncSubContext ;
string m_sConnectionString ;
unordered_set<string> m_set_SubChannels ;
int m_nDataBase ;
string m_sPassword ;
string m_sUser ;
@@ -162,28 +166,63 @@ class RedisAsync {
RedisConnectionInfo m_ConnectionInfo ;
RedisConnectionStatus m_ConnectionSubStatus ;
RedisConnectionStatus m_ConnectionPubStatus ;
thread m_PubThread ;
thread m_SubThread ;
WSADATA m_wsaData ;
RedisAsync()
: m_bFreeConnection( true), m_pRedisAsyncPubContext( nullptr), m_pRedisAsyncSubContext( nullptr),
m_nDataBase( REDIS_MIN_DB), m_sPassword( ""), m_sUser( DEFAULT_USER),
m_sConnectionString( ""), m_set_SubChannels(), m_nDataBase( REDIS_MIN_DB), m_sPassword( ""), m_sUser( DEFAULT_USER),
m_bPubLoopRunning( false), m_bSubLoopRunning( false),
m_ConnectionInfo(), m_ConnectionSubStatus(), m_ConnectionPubStatus() {} ;
// thread è già inizializzato con il suo costruttore di Default valido
inline void Clear() {
m_bFreeConnection = true ;
redisAsyncFree( m_pRedisAsyncPubContext) ;
m_pRedisAsyncPubContext = nullptr ;
redisAsyncFree( m_pRedisAsyncSubContext) ;
m_pRedisAsyncSubContext = nullptr ;
// Loop da terminare
m_bPubLoopRunning.store( false, memory_order_release) ;
m_bSubLoopRunning.store( false, memory_order_release) ;
// Sveglio i Thread bloccanti
if ( m_pRedisAsyncPubContext)
redisAsyncDisconnect( m_pRedisAsyncPubContext) ;
if ( m_pRedisAsyncSubContext)
redisAsyncDisconnect( m_pRedisAsyncSubContext) ;
// Join dei Thread
if ( m_PubThread.joinable())
m_PubThread.join() ;
if ( m_SubThread.joinable())
m_SubThread.join() ;
// Rilascio dei contesti Redis
if ( m_pRedisAsyncPubContext) {
redisAsyncFree( m_pRedisAsyncPubContext) ;
m_pRedisAsyncPubContext = nullptr ;
}
if ( m_pRedisAsyncSubContext) {
redisAsyncFree( m_pRedisAsyncSubContext) ;
m_pRedisAsyncSubContext = nullptr ;
}
// Reset dello Stato
m_bFreeConnection.store( true, memory_order_release) ;
m_sConnectionString.clear() ;
m_nDataBase = REDIS_MIN_DB ;
m_sPassword = "" ;
m_sPassword.clear() ;
m_sUser = DEFAULT_USER ;
m_bPubLoopRunning = false ;
m_bSubLoopRunning = false ;
m_ConnectionInfo.Clear() ;
m_ConnectionSubStatus.Clear() ;
m_ConnectionPubStatus.Clear() ;
m_set_SubChannels.clear() ;
}
~RedisAsync() {
if ( m_PubThread.joinable())
m_PubThread.join() ;
if ( m_SubThread.joinable())
m_SubThread.join();
}
public :
@@ -209,18 +248,12 @@ void
RedisAsync::RedisEventLoop( bool bIsPub) const
{
// Verifica della validità del contesto
redisAsyncContext* ctx = nullptr ;
if ( bIsPub)
ctx = m_pRedisAsyncPubContext ;
else
ctx = m_pRedisAsyncSubContext ;
redisAsyncContext* ctx = ( bIsPub ? m_pRedisAsyncPubContext : m_pRedisAsyncSubContext) ;
if ( ctx == nullptr || ctx->err != 0)
return ; // se attivazione loop prima della inizializzazione callBack
// Recupero del file descriptor del socket TCP usato da Redis.
SOCKET sock = ctx->c.fd ;
// Imposto condizione di ascolto degli eventi
bool bLoopRunning = bIsPub ? m_bPubLoopRunning : m_bSubLoopRunning ;
// Creazione dei due insiemi per il file descriptor ( lettura e scrittura )
// r = read, w = write
fd_set rfds, wfds ;
@@ -233,7 +266,8 @@ RedisAsync::RedisEventLoop( bool bIsPub) const
timeval timeout = {0, 100000 } ; // 100ms
// Ciclo
while ( bLoopRunning) {
while ( ( bIsPub ? m_bPubLoopRunning.load( memory_order_acquire)
: m_bSubLoopRunning.load( memory_order_acquire))) {
// Se contesto non valido, interrompo il ciclo
if ( ctx == nullptr || ( ctx->c.flags & REDIS_DISCONNECTING))
break ;
@@ -259,7 +293,7 @@ RedisAsync::RedisEventLoop( bool bIsPub) const
redisAsyncHandleWrite( ctx) ;
}
// Evito saturazione CPU
this_thread::sleep_for( chrono::milliseconds( 10)) ;
this_thread::sleep_for( chrono::milliseconds( 1)) ;
}
// Reset del timeOut ad ogni ciclo
@@ -270,7 +304,6 @@ RedisAsync::RedisEventLoop( bool bIsPub) const
return ;
}
//----------------------------------------------------------------------------
// [Utility] Funzione di Controllo per Id Connessione
// ---------------------------------------------------------------------------
@@ -632,7 +665,9 @@ RedisDbSElectionCallBack( redisAsyncContext* ctx, void* r, void*)
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
MyStatus->bSelectedDB = true ;
// Selezione riuscita
MyStatus->bSelectedDB.store( true, memory_order_release) ;
MyStatus->bSelectedDB.notify_one() ;
}
//----------------------------------------------------------------------------
@@ -654,7 +689,9 @@ RedisAuthenticationCallBack( redisAsyncContext* ctx, void* r, void*)
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
MyStatus->bAuthenticated = true ;
// Autenticazione riuscita
MyStatus->bAuthenticated.store( true, memory_order_release) ;
MyStatus->bAuthenticated.notify_one() ;
}
// ---------------------------------------------------------------------------
@@ -670,13 +707,14 @@ RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*)
return ;
}
// Recupero della risposta dal server redis ( nullptr nel caso di residui)
// Recupero della risposta dal server redis
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr)
return ;
LOG_INFO( GetCmdLogger(), " --- RedisMessageCallback invoked")
// Caso PUBLISH: hiredis risponde con REDIS_REPLY_INTEGER (numero di subscribers)
if ( reply->type == REDIS_REPLY_INTEGER) {
// Recupero lo Stato
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
@@ -685,11 +723,13 @@ RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*)
return ;
}
// Imposto stato di pubblicazione
MyStatus->bPublished = true ;
MyStatus->bPublished.store( true, memory_order_release) ;
MyStatus->bPublished.notify_one() ;
// Recupero il numero di clients iscritti al canale
LOG_INFO( GetCmdLogger(), ( string{ "Message published to "} + ToString( reply->integer) + " clients !").c_str()) ;
return ;
}
// Caso ARRAY: Subscribe / Unsubscribe / Message
else if ( reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) {
// Recupero la tipologia di chiamata effettuata
const char* msgType = ( reply->element[0] != nullptr && reply->element[0]->str != nullptr ?
@@ -700,8 +740,8 @@ RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*)
// Messaggio ricevuto da un canale
if ( reply->element[1] != nullptr && reply->element[1]->str != nullptr &&
reply->element[2] != nullptr && reply->element[2]->str != nullptr) {
string sChannel = reply->element[1]->str ;
string sMessage = reply->element[2]->str ;
string sChannel{ reply->element[1]->str, reply->element[1]->len} ;
string sMessage{ reply->element[2]->str, reply->element[2]->len} ;
#if DEBUG
LOG_INFO( GetCmdLogger(), ( string{ "Message Received on ["} + sChannel + "] : " + sMessage).c_str())
#endif
@@ -729,7 +769,7 @@ RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*)
return ;
}
}
// --- Se Subscribe
// --- Se SUBSCRIBE
else if ( strcmp( msgType, "subscribe") == 0 && reply->elements >= 3) {
if ( reply->element[1] != nullptr && reply->element[1] != nullptr &&
reply->element[2] != nullptr) {
@@ -740,7 +780,8 @@ RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*)
return ;
}
// Imposto stato di Subscribe
MyStatus->bSubscribed = true ;
MyStatus->bSubscribed.store( true, memory_order_release) ;
MyStatus->bSubscribed.notify_one() ;
// Comunico il risultato
string sChannel = reply->element[1]->str ;
int nCount = reply->element[2]->integer ;
@@ -762,8 +803,9 @@ RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*)
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
// Imposto stato di pubblicazione
MyStatus->bUnsubscribed = true ;
// Imposto stato Unsubscribe
MyStatus->bUnsubscribed.store( true, memory_order_release) ;
MyStatus->bUnsubscribed.notify_one() ;
// Comunico il risultato
string sChannel = reply->element[1]->str ;
int nCount = reply->element[2]->integer ;
@@ -836,19 +878,32 @@ WaitMessageCallback( redisAsyncContext* ctx, void* r, void*)
static void
RedisConnectCallBack( const redisAsyncContext* ctx, int nStatus)
{
// Verifico che il contesto e lo stato siano validi
if ( ctx == nullptr || nStatus != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : Async Disconnection CallBack failed ") ;
// Verifico che il Contesto sia valido
if ( ctx == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Async Connect CallBack failed ") ;
return ;
}
// Imposto flag di Connessione
// Recupero dal Contesto lo stato della connessione
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
if ( MyStatus == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
MyStatus->bConnected = true ;
// Verifico che lo stato internno al contesto Redis sia OK
if ( nStatus != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : Redis Async connection failed") ;
MyStatus->bAlive.store( false, memory_order_release) ;
MyStatus->bConnected.store( false, memory_order_release) ;
MyStatus->bConnected.notify_one() ;
return ;
}
// Connessione riuscita
MyStatus->bAlive.store( true, memory_order_release) ;
MyStatus->bConnected.store( true, memory_order_release) ;
MyStatus->bConnected.notify_one() ;
}
// ---------------------------------------------------------------------------
@@ -857,19 +912,29 @@ RedisConnectCallBack( const redisAsyncContext* ctx, int nStatus)
static void
RedisDisconnectCallBack( const redisAsyncContext* ctx, int nStatus)
{
// Verifico che il contesto e lo stato siano validi
if ( ctx == nullptr || nStatus != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : Async Disconnection CallBack failed ") ;
// Verifico che il Contesto sia valido
if ( ctx == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Null Context for Disconnetion CallBack ") ;
return ;
}
// Imposto flag di Disconnessione
// Recupero lo stato della connessione
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
if ( MyStatus == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
MyStatus->bDisconnected = true ;
// nStatus non è affidabile in questo contesto, può essere != REDIS_OK anche in disconnessione normale
// NB. nStatus è solo informativo, ignora tutte le disconnessioni reali
LOG_INFO( GetCmdLogger(), "Redis Async Disconnected ( x2, called for PUB and SUB)") ;
// Aggiorno gli stati ed eventuali thread in attesa
MyStatus->bAlive.store( false, memory_order_release) ;
MyStatus->bConnected.store( false, memory_order_release) ;
MyStatus->bConnected.notify_one() ;
MyStatus->bDisconnected.store( true, memory_order_release) ;
MyStatus->bDisconnected.notify_one() ;
}
//----------------------------------------------------------------------------
@@ -881,7 +946,7 @@ ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection)
// Inizializzo una nuova connessione, recuperando l'Id
int nMyIdConnection = NO_CONNECTION ;
for ( int i = 0 ; i < int( s_vAsyncRedisClients.size()) ; ++ i) {
if ( s_vAsyncRedisClients[i].m_bFreeConnection) {
if ( s_vAsyncRedisClients[i].m_bFreeConnection.load( memory_order_acquire)) {
nMyIdConnection = i ;
break ;
}
@@ -891,13 +956,16 @@ ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection)
ToString( MAX_CONNECTION_NBR) + " )").c_str())
return false ;
}
// Recupero lo Slot
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
AsyncRedisClient.Clear() ; // per possibile riconnessione
AsyncRedisClient.m_bFreeConnection.store( false, memory_order_release) ;
// Recupero i riferimenti necessari e blocco la connessione corrente
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
AsyncRedisClient.m_bFreeConnection = false ;
RedisConnectionInfo& ConnectionInfo = AsyncRedisClient.m_ConnectionInfo ;
// Recupero i parametri dalla stringa di connessione
AsyncRedisClient.m_sConnectionString = sConnection ; // per riconnessione
if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) {
AsyncRedisClient.Clear() ;
return false ;
@@ -961,28 +1029,44 @@ ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection)
}
// Definizione e partenza del ciclo degli eventi in un thread separato ( in BackGround)
AsyncRedisClient.m_bPubLoopRunning = true ;
AsyncRedisClient.m_bSubLoopRunning = true ;
thread tPubEventLoop( &RedisAsync::RedisEventLoop, &AsyncRedisClient, true) ;
thread tSubEventLoop( &RedisAsync::RedisEventLoop, &AsyncRedisClient, false) ;
tPubEventLoop.detach() ;
tSubEventLoop.detach() ;
AsyncRedisClient.m_ConnectionPubStatus.Clear() ;
AsyncRedisClient.m_ConnectionSubStatus.Clear() ;
AsyncRedisClient.m_bPubLoopRunning.store( true, memory_order_release) ;
AsyncRedisClient.m_bSubLoopRunning.store( true, memory_order_release) ;
AsyncRedisClient.m_PubThread = thread( &RedisAsync::RedisEventLoop, &AsyncRedisClient, true) ;
AsyncRedisClient.m_SubThread = thread( &RedisAsync::RedisEventLoop, &AsyncRedisClient, false) ;
// Imposto e Definisco la funzione di CallBack per contesto PUB/SUB per Connessione asincrona
// Imposto e Definisco la funzione di CallBack per contesto PUB/SUB per Connessione/Disconnessione asincrona
AsyncRedisClient.m_pRedisAsyncPubContext->data = &AsyncRedisClient.m_ConnectionPubStatus ;
AsyncRedisClient.m_pRedisAsyncSubContext->data = &AsyncRedisClient.m_ConnectionSubStatus ;
AsyncRedisClient.m_ConnectionPubStatus.bConnected.store( false, memory_order_release) ;
AsyncRedisClient.m_ConnectionSubStatus.bConnected.store( false, memory_order_release) ;
redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisConnectCallBack) ;
redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisConnectCallBack) ;
redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDisconnectCallBack) ;
redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDisconnectCallBack) ;
// Attendo la risposta di CallBack
auto tStart = chrono::steady_clock::now() ;
auto tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bConnected || ! AsyncRedisClient.m_ConnectionSubStatus.bConnected) {
this_thread::sleep_for( chrono::milliseconds( 10)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bConnected.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
AsyncRedisClient.Clear() ;
return false ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bConnected.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
AsyncRedisClient.Clear() ;
return false ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
LOG_INFO( GetCmdLogger(), "Connected to Redis !")
#if DEBUG
@@ -991,20 +1075,34 @@ ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection)
#endif
// Se presente una Password, eseguo l'autenticazione
if ( AsyncRedisClient.m_sPassword.empty()) {
if ( ! AsyncRedisClient.m_sPassword.empty()) {
AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.store( false, memory_order_relaxed) ;
AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.store( false, memory_order_relaxed) ;
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisAuthenticationCallBack,
nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ;
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisAuthenticationCallBack,
nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ;
tStart = chrono::steady_clock::now() ;
tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated || ! AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated) {
this_thread::sleep_for( chrono::milliseconds( 10)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Autentication exceeded") ;
AsyncRedisClient.Clear() ;
return false ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Autentication exceeded") ;
AsyncRedisClient.Clear() ;
return false ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
LOG_INFO( GetCmdLogger(), "Authenticated to Redis !")
#if DEBUG
@@ -1013,25 +1111,39 @@ ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection)
#endif
}
else {
AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated = true ;
AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated = true ;
AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.store( true, memory_order_release) ;
AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.store( true, memory_order_release) ;
}
// Se Presente una Selezione del Database, eseguo la selezione
if ( AsyncRedisClient.m_nDataBase != DEFAULT_DATABASE) {
AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.store( false, memory_order_relaxed) ;
AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.store( false, memory_order_relaxed) ;
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDbSElectionCallBack,
nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ;
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDbSElectionCallBack,
nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ;
tStart = chrono::steady_clock::now() ;
tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB || ! AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB) {
this_thread::sleep_for( chrono::milliseconds( 10)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout DB Selection exceeded") ;
AsyncRedisClient.Clear() ;
return false ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout DB Selection exceeded") ;
AsyncRedisClient.Clear() ;
return false ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
LOG_INFO( GetCmdLogger(), "DB Selected to Redis !") ;
#if DEBUG
@@ -1040,14 +1152,10 @@ ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection)
#endif
}
else {
AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB = true ;
AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB = true ;
AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.store( true, memory_order_release) ;
AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.store( true, memory_order_release) ;
}
// Imposto e Definisco la funzione di CallBack per la Disconnessione Pub/Sub asincrona
redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDisconnectCallBack) ;
redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDisconnectCallBack) ;
// Restituisco e salvo l'Id della connessione [1, MAX_CONNECTION_NBR] ( per Lua)
nIdConnection = nMyIdConnection + 1 ;
AsyncRedisClient.m_ConnectionPubStatus.nIdConnection = nIdConnection ;
@@ -1065,14 +1173,21 @@ ExeRedisAsyncDisconnect( int nIdConnection)
int nMyIdConnection = nIdConnection - 1 ;
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Verifico che la connessione sia attiva
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
// Recupero la connessione
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
// Verifico che la connessione sia utilizzata, e non libera
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
// Recupero la connessione
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
// Reset dei Flag prima della connessione
AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.store( false, memory_order_relaxed) ;
AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.store( false, memory_order_relaxed) ;
AsyncRedisClient.m_ConnectionPubStatus.bAlive.store( false, memory_order_relaxed) ;
AsyncRedisClient.m_ConnectionSubStatus.bAlive.store( false, memory_order_relaxed) ;
// Effettuo la Disconnesione asincrona ( libera la memoria in automatico)
redisAsyncDisconnect( AsyncRedisClient.m_pRedisAsyncPubContext) ;
@@ -1081,9 +1196,15 @@ ExeRedisAsyncDisconnect( int nIdConnection)
// Attendo la CallBack di disconnessione
auto tStart = chrono::steady_clock::now() ;
auto tTimeOut = chrono::milliseconds( static_cast<int>( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bDisconnected ||
! AsyncRedisClient.m_ConnectionSubStatus.bDisconnected) {
this_thread::sleep_for( chrono::milliseconds( 10)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.load( memory_order_acquire)) {
AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.wait( false) ;
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
return false ;
}
}
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.load( memory_order_acquire)) {
AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.wait( false) ;
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
return false ;
@@ -1091,15 +1212,12 @@ ExeRedisAsyncDisconnect( int nIdConnection)
}
// Rendo libera la connessione
AsyncRedisClient.m_bFreeConnection = true ;
AsyncRedisClient.m_bFreeConnection.store( true) ;
AsyncRedisClient.Clear() ;
// Pulisco WinSock2 ( timer ?)
// ???
// Elimino i Messaggi dalla mappa
for ( auto Iter = s_vAsyncRedisMessages.begin() ; Iter != s_vAsyncRedisMessages.end() ; ) {
if ( Iter->first.compare( GetMessageMapKey( nIdConnection, "")) == 0)
if ( Iter->first.starts_with( GetMessageMapKey( nIdConnection, "")))
Iter = s_vAsyncRedisMessages.erase( Iter) ;
else
++ Iter ;
@@ -1123,8 +1241,10 @@ ExeRedisAsyncPublish( int nIdConnection, const string& sChannel, const string& s
int nMyIdConnection = nIdConnection - 1 ;
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Verifico che la connessione sia attiva
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
if ( AsyncRedisClient.m_bFreeConnection.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
@@ -1137,30 +1257,98 @@ ExeRedisAsyncPublish( int nIdConnection, const string& sChannel, const string& s
if ( sMessage.empty())
LOG_INFO( GetCmdLogger(), "Warning : Empty Message") ;
// Recupero la connessione
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
// Numero massimo di Tentativi per effettuare PUBLISH
const int MAX_RETRY = 3 ;
for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) {
// Eseguo il comando PUBLISH
AsyncRedisClient.m_ConnectionPubStatus.bPublished = false ;
if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisPubSubUnsubMsgCallback, nullptr,
"PUBLISH %s %s", sChannel.c_str(), sMessage.c_str()) != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error: redisAsyncCommand PUBLISH failed")
return false ;
}
auto tStart = chrono::steady_clock::now() ;
auto tTimeOut = chrono::milliseconds( static_cast<int>( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bPublished) {
this_thread::sleep_for( chrono::milliseconds( 10)) ;
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Subscribe exceeded") ;
return false ;
// Rileggo il Client, dato che potrebbe essere cambiato dopo la riconnessione
RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ;
RedisConnectionStatus& curStatus = currClient.m_ConnectionPubStatus ;
// Se la connessione non è attiva, provo a riconnetermi
if ( ! currClient.m_ConnectionPubStatus.bAlive.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), ( string{ "Publish: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ;
// Recupero la stringa di connessione corrente
string sConnStr = s_vAsyncRedisClients[nMyIdConnection].m_sConnectionString ;
// Recupero i Canali su cui ero iscritto
unordered_set<string> setSubChannels = currClient.m_set_SubChannels ;
// Pulisco il contesto corrente
AsyncRedisClient.Clear() ;
int nNewId = 0 ;
bool bOkReconnect = false ;
if ( ExeRedisAsyncConnect( sConnStr, nNewId)) {
// Se ho scelto lo stesso slot, aggiorno nMyIdConnection
int nNewIndex = nNewId - 1 ;
if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) {
// Sostituisco il riferimento Client con il nuovo
if ( nNewIndex != nMyIdConnection)
nMyIdConnection = nNewIndex ;
}
bOkReconnect = true ;
// Mi iscrivo a tutti i canali a cui ero precedentemente iscritto
for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) {
if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter))
return false ;
}
}
else {
LOG_INFO( GetCmdLogger(), "Publish: reconnect attempt failed") ;
// riprovo al ciclo successivo...
}
// Se non mi sono riconesso metto un piccolo Delay per estrema sicureza
if ( ! bOkReconnect) {
this_thread::sleep_for( chrono::milliseconds( 100)) ;
continue ;
}
}
// Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione)
RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ;
RedisConnectionStatus& nextStatus = nextClient.m_ConnectionPubStatus ;
// Ricalcolo timeout dal client aggiornato
auto tTimeOut = chrono::milliseconds( static_cast<int>( nextClient.m_ConnectionInfo.nAsyncTimeout)) ;
// Reset dello stato di pubblicazione
nextStatus.bPublished.store( false, memory_order_relaxed) ;
// Invio del comando PUBLISH
int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncPubContext, RedisPubSubUnsubMsgCallback, nullptr,
"PUBLISH %s %s", sChannel.c_str(), sMessage.c_str()) ;
if ( nResult != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Publish: redisAsyncCommand failed") ;
nextStatus.bAlive.store( false, memory_order_release) ;
this_thread::sleep_for( chrono::milliseconds( 100)) ;
continue ;
}
auto tStart = chrono::steady_clock::now() ;
while ( ! nextStatus.bPublished.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Publish: timeout waiting for publish reply") ;
break ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
if ( nextStatus.bPublished.load( memory_order_acquire)) {
#if DEBUG
double dMsPublishTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsPublishTime) + " ms").c_str()) ;
#endif
return true ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Publish failed, attempt "} + ToString( nAttempt)).c_str()) ;
// Breve pausa prima del Retry
this_thread::sleep_for( chrono::milliseconds( 100)) ;
}
#if DEBUG
double dMsPublishTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsPublishTime) + " ms").c_str())
#endif
return true ;
// Tutti i tentativi sono falliti
return false ;
}
//----------------------------------------------------------------------------
@@ -1174,7 +1362,8 @@ ExeRedisAsyncSubscribe( int nIdConnection, const string& sChannel)
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Verifico che la connessione sia attiva
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
if ( AsyncRedisClient.m_bFreeConnection.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
@@ -1184,36 +1373,105 @@ ExeRedisAsyncSubscribe( int nIdConnection, const string& sChannel)
return false ;
}
// Recupero la connessione
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
// Numero massimo di Tentativi per effettuare SUBSCRIBE
const int MAX_RETRY = 3 ;
for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) {
// Eseguo il comando SUBSCRIBE
AsyncRedisClient.m_ConnectionPubStatus.bSubscribed = false ;
if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr,
( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand SUBSCRIBE failed")
return false ;
}
auto tStart = chrono::steady_clock::now() ;
auto tTimeOut = chrono::milliseconds( static_cast<int>( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ;
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bSubscribed) {
this_thread::sleep_for( chrono::milliseconds( 10)) ;
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Subscribe exceeded") ;
return false ;
// Rileggo il client e lo stato (potrebbero cambiare dopo reconnect)
RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ;
RedisConnectionStatus& currStatus = currClient.m_ConnectionSubStatus ;
// Se la connessione non è viva, provo a riconnettere
if ( ! currStatus.bAlive.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), ( string{ "Subscribe: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ;
// Recupero la stringa di connessione corrente
string sConnStr = currClient.m_sConnectionString ;
// Recupero i Canali su cui ero iscritto
unordered_set<string> setSubChannels = currClient.m_set_SubChannels ;
// Pulisco il contesto corrente
AsyncRedisClient.Clear() ;
int nNewId = 0 ;
bool bOkReconnect = false ;
if ( ExeRedisAsyncConnect( sConnStr, nNewId)) {
// Se ho scelto lo stesso slot, aggiorno nMyIdConnection
int nNewIndex = nNewId - 1 ;
if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) {
// Sostituisco il riferimento Client con il nuovo
if ( nNewIndex != nMyIdConnection)
nMyIdConnection = nNewIndex ;
}
bOkReconnect = true ;
// Mi iscrivo a tutti i canali a cui ero precedentemente iscritto
for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) {
if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter))
return false ;
}
}
else {
LOG_INFO( GetCmdLogger(), "Publish: reconnect attempt failed") ;
// riprovo al ciclo successivo...
}
// Se non mi sono riconesso metto un piccolo Delay per estrema sicureza
if ( ! bOkReconnect) {
this_thread::sleep_for( chrono::milliseconds( 100)) ;
continue ;
}
}
// Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione)
RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ;
RedisConnectionStatus& nextStatus = nextClient.m_ConnectionSubStatus ;
// Ricalcolo timeout dal client aggiornato
auto tTimeOut = chrono::milliseconds( static_cast<int>( nextClient.m_ConnectionInfo.nAsyncTimeout)) ;
// Reset dello stato di pubblicazione
nextStatus.bSubscribed.store( false, memory_order_relaxed) ;
int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr,
"SUBSCRIBE %s", sChannel.c_str()) ;
if ( nResult != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Subscribe: redisAsyncCommand failed") ;
nextStatus.bAlive.store( false, memory_order_release) ;
this_thread::sleep_for( chrono::milliseconds( 100)) ;
continue ;
}
auto tStart = chrono::steady_clock::now() ;
while ( ! nextStatus.bSubscribed.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Publish: timeout waiting for publish reply") ;
break ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
if ( nextStatus.bSubscribed.load( memory_order_acquire)) {
// Uso l'ID corrente ( aggiornato) per la chiave messaggi
int nEffectiveId = nMyIdConnection + 1 ;
string sKey = GetMessageMapKey( nEffectiveId, sChannel) ;
s_vAsyncRedisMessages.try_emplace( sKey) ;
s_vAsyncRedisMessages[sKey].Lock.clear() ;
#if DEBUG
double dMsPublishTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsPublishTime) + " ms").c_str()) ;
#endif
// Memorizzo il canale nella set dei canali
nextClient.m_set_SubChannels.insert( sChannel) ;
return true ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Subscribe failed, attempt "} + ToString( nAttempt)).c_str()) ;
// Breve pausa prima del Retry
this_thread::sleep_for( chrono::milliseconds( 100)) ;
}
// Aggiorno la Mappa per la lettura dei Messaggio
string sKey = GetMessageMapKey( nIdConnection, sChannel) ;
s_vAsyncRedisMessages.try_emplace( sKey) ;
s_vAsyncRedisMessages[sKey].Lock.clear() ;
#if DEBUG
double dMsSubscribeTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsSubscribeTime) + " ms").c_str())
#endif
return true ;
// Tutti i tentativi sono falliti
return false ;
}
// ---------------------------------------------------------------------------
@@ -1227,7 +1485,8 @@ ExeRedisAsyncUnsubscribe( int nIdConnection, const string& sChannel)
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Verifico che la connessione sia attiva
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
@@ -1237,39 +1496,108 @@ ExeRedisAsyncUnsubscribe( int nIdConnection, const string& sChannel)
return false ;
}
// Recupero la connessione
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
const int MAX_RETRY = 3 ;
for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) {
// Eseguo il comando UNSUBSCRIBE
AsyncRedisClient.m_ConnectionPubStatus.bUnsubscribed = false ;
if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr,
( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand UNSUBSCRIBE failed")
return false ;
}
auto tStart = chrono::steady_clock::now() ;
auto tTimeOut = chrono::milliseconds( static_cast<int>( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ;
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bUnsubscribed) {
this_thread::sleep_for( chrono::milliseconds( 10)) ;
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Unsubscribe exceeded") ;
return false ;
// Rileggo il client e lo stato ( potrebbero cambiare dopo reconnect)
RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ;
RedisConnectionStatus& currStatus = currClient.m_ConnectionSubStatus ;
// Se la connessione non è viva, provo a riconnettere
if ( ! currStatus.bAlive.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), ( string{ "Unubscribe: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ;
// Recupero la stringa di connessione corrente
string sConnStr = currClient.m_sConnectionString ;
// Recupero i Canali su cui ero iscritto
unordered_set<string> setSubChannels = currClient.m_set_SubChannels ;
// Pulisco il contesto corrente
AsyncRedisClient.Clear() ;
int nNewId = 0 ;
bool bOkReconnect = false ;
if ( ExeRedisAsyncConnect( sConnStr, nNewId)) {
// Se ho scelto lo stesso slot, aggiorno nMyIdConnection
int nNewIndex = nNewId - 1 ;
if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) {
// Sostituisco il riferimento Client con il nuovo
if ( nNewIndex != nMyIdConnection)
nMyIdConnection = nNewIndex ;
}
bOkReconnect = true ;
// Mi iscrivo a tutti i canali a cui ero precedentemente iscritto
for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) {
if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter))
return false ;
}
}
else {
LOG_INFO( GetCmdLogger(), "Unsubscribe: reconnect attempt failed") ;
// riprovo al ciclo successivo...
}
// Se non mi sono riconesso metto un piccolo Delay per estrema sicureza
if ( ! bOkReconnect) {
this_thread::sleep_for( chrono::milliseconds( 100)) ;
continue ;
}
}
// Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione)
RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ;
RedisConnectionStatus& nextStatus = nextClient.m_ConnectionSubStatus ;
// Ricalcolo timeout dal client aggiornato
auto tTimeOut = chrono::milliseconds( static_cast<int>( nextClient.m_ConnectionInfo.nAsyncTimeout)) ;
// Reset dello stato di pubblicazione
nextStatus.bUnsubscribed.store( false, memory_order_relaxed) ;
int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr,
"UNSUBSCRIBE %s", sChannel.c_str()) ;
if ( nResult != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand UNSUBSCRIBE failed") ;
nextStatus.bAlive.store( false, memory_order_release) ;
this_thread::sleep_for( chrono::milliseconds( 100)) ;
continue ;
}
auto tStart = chrono::steady_clock::now() ;
while ( ! nextStatus.bUnsubscribed.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Unsubscribe: timeout waiting for publish reply") ;
break ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
if ( nextStatus.bUnsubscribed.load( memory_order_acquire)) {
// Rimuovo la chiave dalla mappa messaggi usando l'ID effettivo (aggiornato)
int nEffectiveId = nMyIdConnection + 1 ;
string keyToRemove = GetMessageMapKey( nEffectiveId, sChannel) ;
for ( auto it = s_vAsyncRedisMessages.begin() ; it != s_vAsyncRedisMessages.end() ; ) {
if ( it->first == keyToRemove)
it = s_vAsyncRedisMessages.erase( it) ;
else
++ it ;
}
#if DEBUG
double dMsUnsubscribeTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsUnsubscribeTime) + " ms").c_str()) ;
#endif
// Tolgo il canale dalla lista
nextClient.m_set_SubChannels.erase( sChannel) ;
return true ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Unsubscribe failed, attempt "} + ToString( nAttempt)).c_str()) ;
// Breve pausa prima del Retry
this_thread::sleep_for( chrono::milliseconds( 100)) ;
}
// Elimino i Messaggi dalla mappa
for ( auto Iter = s_vAsyncRedisMessages.begin() ; Iter != s_vAsyncRedisMessages.end() ; ) {
if ( Iter->first.compare( GetMessageMapKey( nIdConnection, sChannel)) == 0)
Iter = s_vAsyncRedisMessages.erase( Iter) ;
else
++ Iter ;
}
#if DEBUG
double dMsUnsubscribeTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsUnsubscribeTime) + " ms").c_str())
#endif
return true ;
// Tutti i tentativi sono falliti
return false ;
}
// ---------------------------------------------------------------------------