Files
Riccardo Elitropi 9a719b94cf EgtExecutor :
- in Redis aggiunta gestione per riconnessioni.
2025-12-22 17:05:30 +01:00

1714 lines
73 KiB
C++

//----------------------------------------------------------------------------
// EgalTech 2025-2025
//----------------------------------------------------------------------------
// File : EXE_Redis.cpp Data : 17.09.25 Versione : 2.7i3
// Contenuto : Funzioni per interfacciarsi con server Redis.
//
//
//
// Modifiche : 17.09.25 RE Creazione modulo. ( ver. 2.7i3)
// Modifiche : 31.10.25 RE Aggiunta gestione connessioni multiple. ( ver. 2.7j4)
// Modifiche : 04.12.25 RE Aggiunta Memorizzazione messaggio. ( ver 2.7l1)
//
//
//----------------------------------------------------------------------------
//--------------------------- Include ----------------------------------------
#include <winsock2.h>
#include <WS2tcpip.h>
#include "stdafx.h"
#include "EXE.h"
#include "EXE_Macro.h"
#include "AuxTools.h"
#include "/EgtDev/Include/EXeExecutor.h"
#include "/EgtDev/Include/EGkStringUtils3d.h"
#include "/EgtDev/Include/EGnStringKeyVal.h"
#pragma warning( disable: 4244) // conversione da 'uint64_t' a 'size_t'
#pragma warning( disable: 4200) // utilizzata estensione non standard: matrice di dimensioni zero in struct/union
#include "/EgtDev/Extern/hiredis/Include/async.h"
#include <thread>
#include <atomic>
#include <map>
#include <sstream>
#include <future>
using namespace std ;
// ---------------------------------------------------------------------------
// Ambiente Debug
// ---------------------------------------------------------------------------
#define DEBUG 0
// ---------------------------------------------------------------------------
// Definizione variabili generali Redis
// ---------------------------------------------------------------------------
static const int REDIS_MIN_DB = 0 ;
static const int REDIS_MAX_DB = 15 ;
static const int DEFAULT_PORT = 6379 ;
static const int DEFAULT_SENTINEL_MASTER_PORT = 26379 ;
static const string DEFAULT_USER = "default" ;
static const int DEFAULT_DATABASE = 0 ;
static const int DEFAULT_TIMEOUT = 15000 ; // [ms]
static const int DEFAULT_ALIVE_TIME = 180 ; // [s]
static const int MAX_CONNECTION_NBR = 10 ;
static const int NO_CONNECTION = - 1 ;
static const string KEY_TYPE_STRING = "string" ;
static const string KEY_TYPE_LIST = "list" ; // non usato
static const string KEY_TYPE_SET = "set" ; // non usato
static const string KEY_TYPE_HASH = "hash" ; // non usato
static const string KEY_TYPE_ZSET = "zset" ; // non usato
static const string KEY_TYPE_JSON = "ReJSON-RL" ; // non usato, disponibile se versione >= 8 ( noi usiamo la 6)
// Struttura per parametri della stringa di Connessione
struct RedisConnectionInfo {
string sHost, sServiceName, sUser, sPassword ;
int nPort, nDefaultDataBase, nKeepAlive, nConnectTimeout, nSyncTimeout, nAsyncTimeout ;
bool bAbortConnect, bSsl, bAllowAdmin ;
RedisConnectionInfo() {
sHost = "" ; sServiceName = "" ; sUser = DEFAULT_USER ; sPassword = "" ;
nPort = DEFAULT_PORT ; nDefaultDataBase = DEFAULT_DATABASE ; nKeepAlive = DEFAULT_ALIVE_TIME ;
nConnectTimeout = DEFAULT_TIMEOUT ; nSyncTimeout = DEFAULT_TIMEOUT ; nAsyncTimeout = DEFAULT_TIMEOUT ;
bAbortConnect = false ; bSsl = false ; bAllowAdmin = false ;
} ;
RedisConnectionInfo( string sMyHost, string sMyServiceName, string sMyUser, string sMyPassword,
int nMyConnection, int nMyPort, int nMyDefaultDataBase, int dMyKeepAlive,
int dMyConnectTimeout, int dMySyncTimeout, int dMyAsyncTimeout,
bool bMyAbortConnect, bool bMySsl, bool bMyAllowAdmin)
: sHost( sMyHost), sServiceName( sMyServiceName), sUser( sMyUser), sPassword( sMyPassword),
nPort( nMyPort), nDefaultDataBase( nMyDefaultDataBase), nKeepAlive( dMyKeepAlive),
nConnectTimeout( dMyConnectTimeout), nSyncTimeout( dMySyncTimeout),
nAsyncTimeout( dMyAsyncTimeout), bAbortConnect( bMyAbortConnect), bSsl( bMySsl),
bAllowAdmin( bMyAllowAdmin) {}
inline void Clear() {
sHost = "" ; sServiceName = "" ; sUser = DEFAULT_USER ; sPassword = "" ;
nPort = DEFAULT_PORT ; nDefaultDataBase = DEFAULT_DATABASE ; nKeepAlive = DEFAULT_ALIVE_TIME ;
nConnectTimeout = DEFAULT_TIMEOUT ; nSyncTimeout = DEFAULT_TIMEOUT ; nAsyncTimeout = DEFAULT_TIMEOUT ;
bAbortConnect = false ; bSsl = false ; bAllowAdmin = false ;
}
} ;
// Struttura per Connessiona Sincrona
struct RedisSync {
atomic<bool> bFreeConnection ;
redisContext* pRedisContext ;
RedisConnectionInfo ConnectionInfo ;
RedisSync()
: bFreeConnection( true), pRedisContext( nullptr), ConnectionInfo() {}
inline void Clear() {
bFreeConnection = true ;
redisFree( pRedisContext) ;
pRedisContext = nullptr ;
ConnectionInfo.Clear() ;
}
} ;
static array<RedisSync, MAX_CONNECTION_NBR> s_vRedisClients ;
// Struttura per stato di Connessione Asincrona
struct RedisConnectionStatus {
atomic<int> nIdConnection ;
atomic<bool> bConnected ;
atomic<bool> bAlive ;
atomic<bool> bAuthenticated ;
atomic<bool> bSelectedDB ;
atomic<bool> bDisconnected ;
atomic<bool> bPublished ;
atomic<bool> bSubscribed ;
atomic<bool> bUnsubscribed ;
atomic<bool> bMessage ;
string sMessage ; // univoco grazie a bMessage
RedisConnectionStatus()
: nIdConnection( 0),
bConnected( false), bAlive( false), bAuthenticated( false), bSelectedDB( false), bDisconnected( false),
bPublished( false), bSubscribed( false), bUnsubscribed( false), bMessage( false),
sMessage( "") {}
inline void Clear() {
nIdConnection = 0 ;
bConnected = false ;
bAlive = false ;
bAuthenticated = false ;
bSelectedDB = false ;
bDisconnected = false ;
bPublished = false ;
bSubscribed = false ;
bUnsubscribed = false ;
bMessage = false ;
sMessage = "" ;
}
} ;
// Classe per Connessione Asincrona
class RedisAsync {
public :
atomic<bool> m_bFreeConnection ;
redisAsyncContext* m_pRedisAsyncPubContext ;
redisAsyncContext* m_pRedisAsyncSubContext ;
string m_sConnectionString ;
unordered_set<string> m_set_SubChannels ;
int m_nDataBase ;
string m_sPassword ;
string m_sUser ;
atomic<bool> m_bPubLoopRunning ;
atomic<bool> m_bSubLoopRunning ;
RedisConnectionInfo m_ConnectionInfo ;
RedisConnectionStatus m_ConnectionSubStatus ;
RedisConnectionStatus m_ConnectionPubStatus ;
thread m_PubThread ;
thread m_SubThread ;
WSADATA m_wsaData ;
RedisAsync()
: m_bFreeConnection( true), m_pRedisAsyncPubContext( nullptr), m_pRedisAsyncSubContext( nullptr),
m_sConnectionString( ""), m_set_SubChannels(), m_nDataBase( REDIS_MIN_DB), m_sPassword( ""), m_sUser( DEFAULT_USER),
m_bPubLoopRunning( false), m_bSubLoopRunning( false),
m_ConnectionInfo(), m_ConnectionSubStatus(), m_ConnectionPubStatus() {} ;
// thread è già inizializzato con il suo costruttore di Default valido
inline void Clear() {
// Loop da terminare
m_bPubLoopRunning.store( false, memory_order_release) ;
m_bSubLoopRunning.store( false, memory_order_release) ;
// Sveglio i Thread bloccanti
if ( m_pRedisAsyncPubContext)
redisAsyncDisconnect( m_pRedisAsyncPubContext) ;
if ( m_pRedisAsyncSubContext)
redisAsyncDisconnect( m_pRedisAsyncSubContext) ;
// Join dei Thread
if ( m_PubThread.joinable())
m_PubThread.join() ;
if ( m_SubThread.joinable())
m_SubThread.join() ;
// Rilascio dei contesti Redis
if ( m_pRedisAsyncPubContext) {
redisAsyncFree( m_pRedisAsyncPubContext) ;
m_pRedisAsyncPubContext = nullptr ;
}
if ( m_pRedisAsyncSubContext) {
redisAsyncFree( m_pRedisAsyncSubContext) ;
m_pRedisAsyncSubContext = nullptr ;
}
// Reset dello Stato
m_bFreeConnection.store( true, memory_order_release) ;
m_sConnectionString.clear() ;
m_nDataBase = REDIS_MIN_DB ;
m_sPassword.clear() ;
m_sUser = DEFAULT_USER ;
m_ConnectionInfo.Clear() ;
m_ConnectionSubStatus.Clear() ;
m_ConnectionPubStatus.Clear() ;
m_set_SubChannels.clear() ;
}
~RedisAsync() {
if ( m_PubThread.joinable())
m_PubThread.join() ;
if ( m_SubThread.joinable())
m_SubThread.join();
}
public :
void RedisEventLoop( bool bIsSub) const ;
} ;
static array<RedisAsync, MAX_CONNECTION_NBR> s_vAsyncRedisClients ;
// Mappa lettura Messaggi per Modalità Asicnrona
struct RedisAsyncMessage {
atomic_flag Lock = ATOMIC_FLAG_INIT ;
int nCount = 0 ;
string sMessage ;
} ;
static unordered_map<string, RedisAsyncMessage> s_vAsyncRedisMessages ;
// ---------------------------------------------------------------------------
// [Redis] Funzione Ciclo degli eventi per Read/Write su Socket
// ---------------------------------------------------------------------------
void
RedisAsync::RedisEventLoop( bool bIsPub) const
{
// Verifica della validità del contesto
redisAsyncContext* ctx = ( bIsPub ? m_pRedisAsyncPubContext : m_pRedisAsyncSubContext) ;
if ( ctx == nullptr || ctx->err != 0)
return ; // se attivazione loop prima della inizializzazione callBack
// Recupero del file descriptor del socket TCP usato da Redis.
SOCKET sock = ctx->c.fd ;
// Creazione dei due insiemi per il file descriptor ( lettura e scrittura )
// r = read, w = write
fd_set rfds, wfds ;
// Timeout per funzione select { secondi, millisecondi}
// La funzione select attende che il socket sia pronto per lettura e scrittura
// Resituisce :
// SOCKET_ERROR in caso di errore
// 0 se scade il timeout
// > 0 se ci sono eventi da gestire
timeval timeout = {0, 100000 } ; // 100ms
// Ciclo
while ( ( bIsPub ? m_bPubLoopRunning.load( memory_order_acquire)
: m_bSubLoopRunning.load( memory_order_acquire))) {
// Se contesto non valido, interrompo il ciclo
if ( ctx == nullptr || ( ctx->c.flags & REDIS_DISCONNECTING))
break ;
// Recupero file descriptor di lettura e scrittura
FD_ZERO( &rfds) ;
FD_ZERO( &wfds) ;
FD_SET( sock, &rfds) ;
FD_SET( sock, &wfds) ;
// Controllo il risultato ottenuto
int nRet = select( 0, &rfds, &wfds, nullptr, &timeout) ;
if ( nRet == SOCKET_ERROR) {
// Se errore -> interruzione del ciclo
LOG_INFO( GetCmdLogger(), ( string{ "Error : Socket select() failed -> "} +
ToString( WSAGetLastError())).c_str()) ;
break ;
}
else if ( nRet > 0) {
// Se Socket pronto per lettura/ scrittua
if ( ctx != nullptr && ! ( ctx->c.flags & REDIS_DISCONNECTING) && ctx->c.fd != -1) {
if ( FD_ISSET( sock, &rfds))
redisAsyncHandleRead( ctx) ;
if ( FD_ISSET( sock, &wfds))
redisAsyncHandleWrite( ctx) ;
}
// Evito saturazione CPU
this_thread::sleep_for( chrono::milliseconds( 1)) ;
}
// Reset del timeOut ad ogni ciclo
timeout.tv_sec = 0 ;
timeout.tv_usec = 100000 ; // 100ms
}
return ;
}
//----------------------------------------------------------------------------
// [Utility] Funzione di Controllo per Id Connessione
// ---------------------------------------------------------------------------
static bool
CheckIdConnection( int nIdConnection)
{
if ( nIdConnection < 0 || nIdConnection >= MAX_CONNECTION_NBR) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : Not a Valid id Connestion : [1, "} + ToString( MAX_CONNECTION_NBR) + "]").c_str())
return false ;
}
return true ;
}
// ---------------------------------------------------------------------------
// [Utility] Funzione di Split parametri per Stringa di Connessione
// ---------------------------------------------------------------------------
static bool
GetParamsFromConnectionString( const string& sConnection, RedisConnectionInfo& ConnectionInfo)
{
// Se la stringa di connessione è vuota, errore
if ( sConnection.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Empty Connection String") ;
return false ;
}
// Ricerco le informazioni Splittando la stringa di connessione per ","
string sHostPort, sParams ;
SplitFirst( sConnection, ",", sHostPort, sParams) ;
if ( ! sHostPort.empty()) {
// Host e Porta
string sHost, sPort ;
SplitFirst( sHostPort, ":", sHost, sPort) ;
if ( sHost.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Invalid Host") ;
return false ;
}
int nPort = DEFAULT_PORT ;
if ( ! sPort.empty()) {
if ( ! FromString( sPort, nPort)) {
LOG_INFO( GetCmdLogger(), "Error : Invalid Port") ;
return false ;
}
}
ConnectionInfo.sHost = sHost ;
ConnectionInfo.nPort = nPort ;
}
// Recupero i parametri ( rimuovo tutti gli spazi )
sParams.erase( remove( sParams.begin(), sParams.end(), ' '), sParams.end()) ;
GetValInNotes( sParams, "serviceName", ",", ConnectionInfo.sServiceName) ;
GetValInNotes( sParams, "user", ",", ConnectionInfo.sUser) ;
GetValInNotes( sParams, "password", ",", ConnectionInfo.sPassword) ;
GetValInNotes( sParams, "DefaultDatabase", ",", ConnectionInfo.nDefaultDataBase) ;
if ( ConnectionInfo.nDefaultDataBase < 0 || ConnectionInfo.nDefaultDataBase > REDIS_MAX_DB) {
ConnectionInfo.Clear() ;
LOG_INFO( GetCmdLogger(), "Error : DB Out of range [0, 15]")
return false ;
}
GetValInNotes( sParams, "keepAlive", ",", ConnectionInfo.nKeepAlive) ;
GetValInNotes( sParams, "connectTimeout", ",", ConnectionInfo.nConnectTimeout) ;
GetValInNotes( sParams, "syncTimeout", ",", ConnectionInfo.nSyncTimeout) ;
GetValInNotes( sParams, "asyncTimeout", ",", ConnectionInfo.nAsyncTimeout) ;
GetValInNotes( sParams, "abortConnect", ",", ConnectionInfo.bAbortConnect) ;
GetValInNotes( sParams, "ssl", "," ,ConnectionInfo.bSsl) ;
GetValInNotes( sParams, "allowAdmin", ",", ConnectionInfo.bAllowAdmin) ;
return true ;
}
// ---------------------------------------------------------------------------
// [Utility] Funzione di Creazione Chiave di accesso alla Mappa dei Messaggi
// ---------------------------------------------------------------------------
static string
GetMessageMapKey( int nIdConnection, const string& sChannel)
{
return ( ToString( nIdConnection) + "_" + sChannel) ;
}
//----------------------------------------------------------------------------
// Funzione per comando CONNECT Sincrono
// ---------------------------------------------------------------------------
bool
ExeRedisConnect( const string& sConnection, int& nIdConnection)
{
// Inizializzo una nuova connessione, recuperando l'Id
int nMyIdConnection = NO_CONNECTION ;
for ( int i = 0 ; i < int( s_vRedisClients.size()) ; ++ i) {
if ( s_vRedisClients[i].bFreeConnection) {
nMyIdConnection = i ;
break ;
}
}
if ( nMyIdConnection == NO_CONNECTION) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : Limit connection Exceeded ( "} +
ToString( MAX_CONNECTION_NBR) + " )").c_str())
return false ;
}
// Recupero i riferimenti necessari e blocco la connessione corrente
RedisSync& SyncRedisClient = s_vRedisClients[nMyIdConnection] ;
SyncRedisClient.bFreeConnection = false ;
RedisConnectionInfo& ConnectionInfo = SyncRedisClient.ConnectionInfo ;
// Recupero i parametri dalla stringa di connessione
if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) {
SyncRedisClient.Clear() ;
return false ;
}
// Se connessione senza TimeOut
if ( ConnectionInfo.nSyncTimeout <= 0) {
// Imposto il contesto per connesione sincrona
SyncRedisClient.pRedisContext = redisConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
if ( SyncRedisClient.pRedisContext == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
return false ;
}
else if ( SyncRedisClient.pRedisContext->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str())
SyncRedisClient.Clear() ;
return false ;
}
}
// Se connessione con parametri
else {
// TimeOut per connessione
struct timeval TimeOutConnection{} ;
TimeOutConnection.tv_sec = static_cast<long>( ConnectionInfo.nSyncTimeout) ;
TimeOutConnection.tv_usec = static_cast<long>( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ;
// Imposto il contesto per connesione sincrona con TimeOut
SyncRedisClient.pRedisContext = redisConnectWithTimeout( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort, TimeOutConnection) ;
if ( SyncRedisClient.pRedisContext == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
return false ;
}
else if ( SyncRedisClient.pRedisContext->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str())
SyncRedisClient.Clear() ;
return false ;
}
// TimeOut per richiesta I/O
struct timeval TimeOutIO{} ;
TimeOutIO.tv_sec = static_cast<long>( ConnectionInfo.nSyncTimeout) ;
TimeOutIO.tv_usec = static_cast<long>( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ;
// Imposto TimeOut per I/O
int nResult = redisSetTimeout( SyncRedisClient.pRedisContext, TimeOutIO) ;
if ( nResult != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : Can't set timeout I/O") ;
return false ;
}
}
// Verifico se la Connessione è di tipo Sentinel
redisReply* replySentinel = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "SENTINEL get-master-addr-by-name %s",
ConnectionInfo.sServiceName.c_str()) ;
if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) {
if ( replySentinel != nullptr)
freeReplyObject( replySentinel) ;
}
else if ( replySentinel->element[0] != nullptr && replySentinel->element[0]->str != nullptr &&
replySentinel->element[1] != nullptr && replySentinel->element[1]->str != nullptr) {
// --- Nodo sentinella
string sMasterHost = replySentinel->element[0]->str ;
int nMasterPort = DEFAULT_SENTINEL_MASTER_PORT ;
FromString( replySentinel->element[1]->str, nMasterPort) ;
freeReplyObject( replySentinel) ;
// Effettuo la connessione al master redis
freeReplyObject( SyncRedisClient.pRedisContext) ;
SyncRedisClient.pRedisContext = redisConnect( sMasterHost.c_str(), nMasterPort) ;
if ( SyncRedisClient.pRedisContext == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
return false ;
}
else if ( SyncRedisClient.pRedisContext->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str())
SyncRedisClient.Clear() ;
return false ;
}
}
LOG_INFO( GetCmdLogger(), "Connected to Redis !")
// Verifico se richiesta autenticazione
if ( ! ConnectionInfo.sPassword.empty()) {
redisReply* reply = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "AUTH %s %s",
ConnectionInfo.sUser.c_str(), ConnectionInfo.sPassword.c_str()) ;
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
LOG_INFO( GetCmdLogger(), "Error : Authentication failed") ;
if ( reply == nullptr)
freeReplyObject( reply) ;
SyncRedisClient.Clear() ;
return false ;
}
freeReplyObject( reply) ;
}
LOG_INFO( GetCmdLogger(), "Authenticated to Redis !")
// Seleziono il DataBase
redisReply* reply = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "SELECT %d", ConnectionInfo.nDefaultDataBase) ;
if ( reply == nullptr) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str()) ;
return false ;
}
else if ( reply->type == REDIS_REPLY_ERROR) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + reply->str).c_str())
freeReplyObject( reply) ;
SyncRedisClient.Clear() ;
return false ;
}
freeReplyObject( reply) ;
LOG_INFO( GetCmdLogger(), "DB Selected to Redis !") ;
// Restituisco l'Id della connessione [1, MAX_CONNECTION_NBR] ( per Lua)
nIdConnection = nMyIdConnection + 1 ;
return true ;
}
//----------------------------------------------------------------------------
// Funzione per comando DISCONNECT Sincrono
// ---------------------------------------------------------------------------
bool
ExeRedisDisconnect( int nIdConnection)
{
// Verifico che l'Id di connesione sia valido
int nMyIdConnection = nIdConnection - 1 ;
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Verifico che la connessione sia attiva
if ( s_vRedisClients[nMyIdConnection].bFreeConnection) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
// Pulisco la connessione corrente
s_vRedisClients[nMyIdConnection].Clear() ;
LOG_INFO( GetCmdLogger(), "Disconnected to Redis !") ;
return true ;
}
//----------------------------------------------------------------------------
// Funzione per comando SET Sincrono ( sola scrittura {key:string, val:string})
//----------------------------------------------------------------------------
bool
ExeRedisSetValFromKey( int nIdConnection, const string& sKey, const string& sVal)
{
// Verifico che l'Id di connesione sia valido
int nMyIdConnection = nIdConnection - 1 ;
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Verifico che la connessione sia attiva
if ( s_vRedisClients[nMyIdConnection].bFreeConnection) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
// Recupero la connessione corrente
RedisSync& RedisClient = s_vRedisClients[nMyIdConnection] ;
// Il valore associato alla chiave può essere solo di tipo stringa
redisReply* reply = ( redisReply*)redisCommand( RedisClient.pRedisContext, "SET %s %s", sKey.c_str(), sVal.c_str()) ;
if ( reply == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Null reply") ;
return false ;
}
else if ( reply->type != REDIS_REPLY_STATUS || string( reply->str ) != "OK") {
LOG_INFO( GetCmdLogger(), "Error SET : No answer") ;
freeReplyObject( reply) ;
return false ;
}
freeReplyObject( reply) ;
LOG_INFO( GetCmdLogger(), ( string{ "New Key Set : ["} + sKey + ":" + sVal + "]").c_str())
return true ;
}
//----------------------------------------------------------------------------
// Funzione per comando Get Sincrono ( sola lettura {key:string, val:string})
//----------------------------------------------------------------------------
bool
ExeRedisGetValFromKey( int nIdConnection, const string& sKey, string& sVal)
{
sVal.clear() ;
// Verifico che l'Id di connesione sia valido
int nMyIdConnection = nIdConnection - 1 ;
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Verifico che la connessione sia attiva
if ( s_vRedisClients[nMyIdConnection].bFreeConnection) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
// Recupero la connessione corrente
RedisSync& RedisClient = s_vRedisClients[nMyIdConnection] ;
// Recupero il tipo associato alla chiave da Redis
redisReply* typeReply = ( redisReply*)redisCommand( RedisClient.pRedisContext, "TYPE %s", sKey.c_str()) ;
if ( typeReply == nullptr || typeReply->type != REDIS_REPLY_STATUS) {
string sErr = ( typeReply != nullptr ? string( typeReply->str) : "null reply") ;
if ( typeReply != nullptr)
freeReplyObject( typeReply) ;
LOG_INFO( GetCmdLogger(), ( "Error : Failed to get key type, " + sErr).c_str()) ;
return false ;
}
string sType = typeReply->str ;
freeReplyObject( typeReply) ;
// Determino il formato per la chiamata al DataBase Redis
// Per ora accettate solamente valori string
string sFormat = ( sType == KEY_TYPE_STRING ? "Get %s" : "") ;
if ( sFormat.empty()) {
LOG_ERROR( GetCmdLogger(), "Error : Not valid Key Type") ;
return false ;
}
// Effettuo la chiamata
redisReply* reply = nullptr ;
reply = ( redisReply*)redisCommand( RedisClient.pRedisContext, sFormat.c_str(), sKey.c_str()) ;
if ( reply == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Null reply") ;
return false ;
}
// Recupero il Valore
if ( sType == KEY_TYPE_STRING) {
if ( reply->type != REDIS_REPLY_STRING) {
LOG_INFO( GetCmdLogger(), "Error : Failed to read the value") ;
freeReplyObject( reply) ;
return false ;
}
sVal = string( reply->str) ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Key Read : ["} + sKey + ":" + sVal + "]").c_str())
freeReplyObject( reply) ;
return true ;
}
// ---------------------------------------------------------------------------
// [CB] Funzione di CallBack per comando SELECT Asincrono ( Selezione DB)
// ---------------------------------------------------------------------------
static void
RedisDbSElectionCallBack( redisAsyncContext* ctx, void* r, void*)
{
// Invio la richiesta
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ;
LOG_INFO( GetCmdLogger(), ( string{ "Error : Db Selection -> "} + sErrMsg).c_str()) ;
return ;
}
// Imposto flag di Autenticazione
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
if ( MyStatus == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
// Selezione riuscita
MyStatus->bSelectedDB.store( true, memory_order_release) ;
MyStatus->bSelectedDB.notify_one() ;
}
//----------------------------------------------------------------------------
// [CB] Funzione di CallBack per comando AUTH Asincrono ( Autenticazione)
// ---------------------------------------------------------------------------
static void
RedisAuthenticationCallBack( redisAsyncContext* ctx, void* r, void*)
{
// Invio la richiesta
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ;
LOG_INFO( GetCmdLogger(), ( string{ "Error : Authentication -> "} + sErrMsg).c_str()) ;
return ;
}
// Imposto flag di Autenticazione
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
if ( MyStatus == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
// Autenticazione riuscita
MyStatus->bAuthenticated.store( true, memory_order_release) ;
MyStatus->bAuthenticated.notify_one() ;
}
// ---------------------------------------------------------------------------
// [CB] Funzione di CallBack per comandi PUBLISH, SUBSCRIBE, MESSAGE, UNSUBSCRIBE
// Asincroni
// ---------------------------------------------------------------------------
static void
RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*)
{
// Verifico che il contesto sia definito
if ( ctx == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack")
return ;
}
// Recupero della risposta dal server redis
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr)
return ;
LOG_INFO( GetCmdLogger(), " --- RedisMessageCallback invoked")
// Caso PUBLISH: hiredis risponde con REDIS_REPLY_INTEGER (numero di subscribers)
if ( reply->type == REDIS_REPLY_INTEGER) {
// Recupero lo Stato
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
if ( MyStatus == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
// Imposto stato di pubblicazione
MyStatus->bPublished.store( true, memory_order_release) ;
MyStatus->bPublished.notify_one() ;
// Recupero il numero di clients iscritti al canale
LOG_INFO( GetCmdLogger(), ( string{ "Message published to "} + ToString( reply->integer) + " clients !").c_str()) ;
return ;
}
// Caso ARRAY: Subscribe / Unsubscribe / Message
else if ( reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) {
// Recupero la tipologia di chiamata effettuata
const char* msgType = ( reply->element[0] != nullptr && reply->element[0]->str != nullptr ?
reply->element[0]->str : "") ;
// --- Se Messaggio
if ( strcmp( msgType, "message") == 0 && reply->elements == 3) {
// Messaggio ricevuto da un canale
if ( reply->element[1] != nullptr && reply->element[1]->str != nullptr &&
reply->element[2] != nullptr && reply->element[2]->str != nullptr) {
string sChannel{ reply->element[1]->str, reply->element[1]->len} ;
string sMessage{ reply->element[2]->str, reply->element[2]->len} ;
#if DEBUG
LOG_INFO( GetCmdLogger(), ( string{ "Message Received on ["} + sChannel + "] : " + sMessage).c_str())
#endif
// Recupero lo Stato
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
if ( MyStatus == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
// Aggiorno la Mappa dei Messaggi ( viene sempre sovrascritto all'ultimo)
string sKey = GetMessageMapKey( MyStatus->nIdConnection, sChannel) ;
auto Iter = s_vAsyncRedisMessages.find( sKey) ;
if ( Iter != s_vAsyncRedisMessages.end()) {
RedisAsyncMessage& LockedMessage = Iter->second ;
while ( LockedMessage.Lock.test_and_set( memory_order_acquire))
LockedMessage.Lock.wait( true, memory_order_relaxed) ;
++ LockedMessage.nCount ;
LockedMessage.sMessage = sMessage ;
LockedMessage.Lock.clear( memory_order_release) ;
LockedMessage.Lock.notify_one() ;
}
}
else {
LOG_INFO( GetCmdLogger(), "Errror : Invalid Message received")
return ;
}
}
// --- Se SUBSCRIBE
else if ( strcmp( msgType, "subscribe") == 0 && reply->elements >= 3) {
if ( reply->element[1] != nullptr && reply->element[1] != nullptr &&
reply->element[2] != nullptr) {
// Recupero lo Stato
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
if ( MyStatus == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
// Imposto stato di Subscribe
MyStatus->bSubscribed.store( true, memory_order_release) ;
MyStatus->bSubscribed.notify_one() ;
// Comunico il risultato
string sChannel = reply->element[1]->str ;
int nCount = reply->element[2]->integer ;
LOG_INFO( GetCmdLogger(), ( string{ "Subscribed to ["} + sChannel + "]," +
" total subscribtions : " + ToString( nCount)).c_str()) ;
}
else {
LOG_INFO( GetCmdLogger(), "Errror : Invalid Subscription reply")
return ;
}
}
// --- se Unsubscribe
else if ( strcmp( msgType, "unsubscribe") == 0 && reply->elements >= 3) {
if ( reply->element[1] != nullptr && reply->element[1] != nullptr &&
reply->element[2] != nullptr) {
// Recupero lo Stato
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
if ( MyStatus == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
// Imposto stato Unsubscribe
MyStatus->bUnsubscribed.store( true, memory_order_release) ;
MyStatus->bUnsubscribed.notify_one() ;
// Comunico il risultato
string sChannel = reply->element[1]->str ;
int nCount = reply->element[2]->integer ;
LOG_INFO( GetCmdLogger(), ( string{ "Unsubscribed to ["} + sChannel + "]," +
" subscriptions left : " + ToString( nCount)).c_str())
}
else {
LOG_INFO( GetCmdLogger(), "Errror : Invalid Unsubscription reply")
return ;
}
}
// --- Undefined
else {
LOG_INFO( GetCmdLogger(), "Error : Undefined reply in CallBack from Redis")
return ;
}
}
return ;
}
// ---------------------------------------------------------------------------
// [CB] Funzione di CallBack per ricezione MESSAGE Asincrona
// ---------------------------------------------------------------------------
static void
WaitMessageCallback( redisAsyncContext* ctx, void* r, void*)
{
// Verifico che il contesto sia definito
if ( ctx == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack")
return ;
}
LOG_INFO( GetCmdLogger(), " --- RedisMessageCallback invoked")
// Recupero della risposta dal server redis
redisReply* reply = static_cast<redisReply*>( r) ;
if ( reply == nullptr)
return ;
if ( reply->type != REDIS_REPLY_ARRAY || reply->elements < 3)
LOG_INFO( GetCmdLogger(), "Error : Invalid Message CallBack reply")
else {
// Recupero il Messaggio ( se callBack riferita al tipo "message")
if ( reply->element[0] == nullptr || reply->element[0]->str == nullptr)
LOG_INFO( GetCmdLogger(), "Error : Null message type received")
else {
const char* msgType = reply->element[0]->str ;
if ( strcmp( msgType, "message") == 0) {
if ( reply->element[2] == nullptr || reply->element[2]->str == nullptr)
LOG_INFO( GetCmdLogger(), "Error : Null message received")
else {
// Recupero lo Stato
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
if ( MyStatus == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
// Imposto stato di pubblicazione
MyStatus->bMessage = true ;
MyStatus->sMessage = reply->element[2]->str ;
}
}
}
}
}
// ---------------------------------------------------------------------------
// [CB] Funzione di CallBack per comando CONNECT Asincrono
// ---------------------------------------------------------------------------
static void
RedisConnectCallBack( const redisAsyncContext* ctx, int nStatus)
{
// Verifico che il Contesto sia valido
if ( ctx == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Async Connect CallBack failed ") ;
return ;
}
// Recupero dal Contesto lo stato della connessione
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
if ( MyStatus == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
// Verifico che lo stato internno al contesto Redis sia OK
if ( nStatus != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : Redis Async connection failed") ;
MyStatus->bAlive.store( false, memory_order_release) ;
MyStatus->bConnected.store( false, memory_order_release) ;
MyStatus->bConnected.notify_one() ;
return ;
}
// Connessione riuscita
MyStatus->bAlive.store( true, memory_order_release) ;
MyStatus->bConnected.store( true, memory_order_release) ;
MyStatus->bConnected.notify_one() ;
}
// ---------------------------------------------------------------------------
// [CB] Funzione di CallBack per comando DISCONNECT Asincrono
// ---------------------------------------------------------------------------
static void
RedisDisconnectCallBack( const redisAsyncContext* ctx, int nStatus)
{
// Verifico che il Contesto sia valido
if ( ctx == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Null Context for Disconnetion CallBack ") ;
return ;
}
// Recupero lo stato della connessione
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
if ( MyStatus == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
return ;
}
// nStatus non è affidabile in questo contesto, può essere != REDIS_OK anche in disconnessione normale
// NB. nStatus è solo informativo, ignora tutte le disconnessioni reali
LOG_INFO( GetCmdLogger(), "Redis Async Disconnected ( x2, called for PUB and SUB)") ;
// Aggiorno gli stati ed eventuali thread in attesa
MyStatus->bAlive.store( false, memory_order_release) ;
MyStatus->bConnected.store( false, memory_order_release) ;
MyStatus->bConnected.notify_one() ;
MyStatus->bDisconnected.store( true, memory_order_release) ;
MyStatus->bDisconnected.notify_one() ;
}
//----------------------------------------------------------------------------
// Funzione per comando CONNECT Asincrono
// ---------------------------------------------------------------------------
bool
ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection)
{
// Inizializzo una nuova connessione, recuperando l'Id
int nMyIdConnection = NO_CONNECTION ;
for ( int i = 0 ; i < int( s_vAsyncRedisClients.size()) ; ++ i) {
if ( s_vAsyncRedisClients[i].m_bFreeConnection.load( memory_order_acquire)) {
nMyIdConnection = i ;
break ;
}
}
if ( nMyIdConnection == NO_CONNECTION) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : Limit connection Exceeded ( "} +
ToString( MAX_CONNECTION_NBR) + " )").c_str())
return false ;
}
// Recupero lo Slot
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
AsyncRedisClient.Clear() ; // per possibile riconnessione
AsyncRedisClient.m_bFreeConnection.store( false, memory_order_release) ;
// Recupero i riferimenti necessari e blocco la connessione corrente
RedisConnectionInfo& ConnectionInfo = AsyncRedisClient.m_ConnectionInfo ;
// Recupero i parametri dalla stringa di connessione
AsyncRedisClient.m_sConnectionString = sConnection ; // per riconnessione
if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) {
AsyncRedisClient.Clear() ;
return false ;
}
// Inizializzazione libreria Winsock, necessaria per usare le funzionalità di rete su Windows,
// come socket TCP/IP. Versione utilizzata 2.2
int nStatus = WSAStartup( MAKEWORD( 2, 2), &AsyncRedisClient.m_wsaData) ;
if ( nStatus != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : WSAStartup -> "} + ToString( nStatus)).c_str()) ;
AsyncRedisClient.Clear() ;
return false ;
}
// Verifico se si tratta di un nodo Sentinella
redisContext* sentinelCtx = redisConnect( AsyncRedisClient.m_ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
if ( sentinelCtx == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
AsyncRedisClient.Clear() ;
return false ;
}
else if ( sentinelCtx->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + sentinelCtx->errstr).c_str())
redisFree( sentinelCtx) ;
AsyncRedisClient.Clear() ;
return false ;
}
redisReply* replySentinel = ( redisReply*)redisCommand( sentinelCtx, "SENTINEL get-master-addr-by-name %s",
ConnectionInfo.sServiceName.c_str()) ;
if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) {
if ( replySentinel != nullptr)
freeReplyObject( replySentinel) ;
}
else if ( replySentinel->element[0] != nullptr && replySentinel->element[0]->str != nullptr &&
replySentinel->element[1] != nullptr && replySentinel->element[1]->str != nullptr) {
// --- Nodo sentinella
ConnectionInfo.sHost = replySentinel->element[0]->str ;
ConnectionInfo.nPort = DEFAULT_SENTINEL_MASTER_PORT ;
FromString( replySentinel->element[1]->str, ConnectionInfo.nPort) ;
freeReplyObject( replySentinel) ;
redisFree( sentinelCtx) ;
}
// Definisco le 2 connessioni asincroni con Host e Porta ( per PUBLISH e SUBSCRIBE/UNSUBSCRIBE)
AsyncRedisClient.m_pRedisAsyncPubContext = redisAsyncConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
AsyncRedisClient.m_pRedisAsyncSubContext = redisAsyncConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
if ( AsyncRedisClient.m_pRedisAsyncPubContext == nullptr || AsyncRedisClient.m_pRedisAsyncSubContext == nullptr) {
LOG_INFO( GetCmdLogger(), "Error : Can't allocate Redis Async context")
AsyncRedisClient.Clear() ;
return false ;
}
else if ( AsyncRedisClient.m_pRedisAsyncPubContext->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + AsyncRedisClient.m_pRedisAsyncPubContext->errstr).c_str())
AsyncRedisClient.Clear() ;
return false ;
}
else if ( AsyncRedisClient.m_pRedisAsyncSubContext->err != 0) {
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + AsyncRedisClient.m_pRedisAsyncSubContext->errstr).c_str())
AsyncRedisClient.Clear() ;
return false ;
}
// Definizione e partenza del ciclo degli eventi in un thread separato ( in BackGround)
AsyncRedisClient.m_ConnectionPubStatus.Clear() ;
AsyncRedisClient.m_ConnectionSubStatus.Clear() ;
AsyncRedisClient.m_bPubLoopRunning.store( true, memory_order_release) ;
AsyncRedisClient.m_bSubLoopRunning.store( true, memory_order_release) ;
AsyncRedisClient.m_PubThread = thread( &RedisAsync::RedisEventLoop, &AsyncRedisClient, true) ;
AsyncRedisClient.m_SubThread = thread( &RedisAsync::RedisEventLoop, &AsyncRedisClient, false) ;
// Imposto e Definisco la funzione di CallBack per contesto PUB/SUB per Connessione/Disconnessione asincrona
AsyncRedisClient.m_pRedisAsyncPubContext->data = &AsyncRedisClient.m_ConnectionPubStatus ;
AsyncRedisClient.m_pRedisAsyncSubContext->data = &AsyncRedisClient.m_ConnectionSubStatus ;
AsyncRedisClient.m_ConnectionPubStatus.bConnected.store( false, memory_order_release) ;
AsyncRedisClient.m_ConnectionSubStatus.bConnected.store( false, memory_order_release) ;
redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisConnectCallBack) ;
redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisConnectCallBack) ;
redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDisconnectCallBack) ;
redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDisconnectCallBack) ;
// Attendo la risposta di CallBack
auto tStart = chrono::steady_clock::now() ;
auto tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bConnected.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
AsyncRedisClient.Clear() ;
return false ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bConnected.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
AsyncRedisClient.Clear() ;
return false ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
LOG_INFO( GetCmdLogger(), "Connected to Redis !")
#if DEBUG
double dMsConnectionTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsConnectionTime) + " ms").c_str())
#endif
// Se presente una Password, eseguo l'autenticazione
if ( ! AsyncRedisClient.m_sPassword.empty()) {
AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.store( false, memory_order_relaxed) ;
AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.store( false, memory_order_relaxed) ;
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisAuthenticationCallBack,
nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ;
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisAuthenticationCallBack,
nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ;
tStart = chrono::steady_clock::now() ;
tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Autentication exceeded") ;
AsyncRedisClient.Clear() ;
return false ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Autentication exceeded") ;
AsyncRedisClient.Clear() ;
return false ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
LOG_INFO( GetCmdLogger(), "Authenticated to Redis !")
#if DEBUG
double dMsAuthenticationTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsAuthenticationTime) + " ms").c_str())
#endif
}
else {
AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.store( true, memory_order_release) ;
AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.store( true, memory_order_release) ;
}
// Se Presente una Selezione del Database, eseguo la selezione
if ( AsyncRedisClient.m_nDataBase != DEFAULT_DATABASE) {
AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.store( false, memory_order_relaxed) ;
AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.store( false, memory_order_relaxed) ;
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDbSElectionCallBack,
nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ;
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDbSElectionCallBack,
nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ;
tStart = chrono::steady_clock::now() ;
tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout DB Selection exceeded") ;
AsyncRedisClient.Clear() ;
return false ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout DB Selection exceeded") ;
AsyncRedisClient.Clear() ;
return false ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
LOG_INFO( GetCmdLogger(), "DB Selected to Redis !") ;
#if DEBUG
double dMsDbSelectionTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsDbSelectionTime) + " ms").c_str())
#endif
}
else {
AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.store( true, memory_order_release) ;
AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.store( true, memory_order_release) ;
}
// Restituisco e salvo l'Id della connessione [1, MAX_CONNECTION_NBR] ( per Lua)
nIdConnection = nMyIdConnection + 1 ;
AsyncRedisClient.m_ConnectionPubStatus.nIdConnection = nIdConnection ;
AsyncRedisClient.m_ConnectionSubStatus.nIdConnection = nIdConnection ;
return true ;
}
//----------------------------------------------------------------------------
// Funzione per comando DISCONNECT Asincrono
// ---------------------------------------------------------------------------
bool
ExeRedisAsyncDisconnect( int nIdConnection)
{
// Verifico che l'Id di connesione sia valido
int nMyIdConnection = nIdConnection - 1 ;
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Recupero la connessione
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
// Verifico che la connessione sia utilizzata, e non libera
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
// Reset dei Flag prima della connessione
AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.store( false, memory_order_relaxed) ;
AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.store( false, memory_order_relaxed) ;
AsyncRedisClient.m_ConnectionPubStatus.bAlive.store( false, memory_order_relaxed) ;
AsyncRedisClient.m_ConnectionSubStatus.bAlive.store( false, memory_order_relaxed) ;
// Effettuo la Disconnesione asincrona ( libera la memoria in automatico)
redisAsyncDisconnect( AsyncRedisClient.m_pRedisAsyncPubContext) ;
redisAsyncDisconnect( AsyncRedisClient.m_pRedisAsyncSubContext) ;
// Attendo la CallBack di disconnessione
auto tStart = chrono::steady_clock::now() ;
auto tTimeOut = chrono::milliseconds( static_cast<int>( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ;
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.load( memory_order_acquire)) {
AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.wait( false) ;
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
return false ;
}
}
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.load( memory_order_acquire)) {
AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.wait( false) ;
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
return false ;
}
}
// Rendo libera la connessione
AsyncRedisClient.m_bFreeConnection.store( true) ;
AsyncRedisClient.Clear() ;
// Elimino i Messaggi dalla mappa
for ( auto Iter = s_vAsyncRedisMessages.begin() ; Iter != s_vAsyncRedisMessages.end() ; ) {
if ( Iter->first.starts_with( GetMessageMapKey( nIdConnection, "")))
Iter = s_vAsyncRedisMessages.erase( Iter) ;
else
++ Iter ;
}
LOG_INFO( GetCmdLogger(), "Disconnected to Redis !") ;
#if DEBUG
double dMsDisconnectionTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsDisconnectionTime) + " ms").c_str())
#endif
return true ;
}
// ---------------------------------------------------------------------------
// Funzione per comando PUBLISH Asincrono
// ---------------------------------------------------------------------------
bool
ExeRedisAsyncPublish( int nIdConnection, const string& sChannel, const string& sMessage)
{
// Verifico che l'Id di connesione sia valido
int nMyIdConnection = nIdConnection - 1 ;
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Verifico che la connessione sia attiva
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
if ( AsyncRedisClient.m_bFreeConnection.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
// Se non ho alcun canale -> errore
if ( sChannel.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
return false ;
}
// Se non ho alcun messaggio -> warning
if ( sMessage.empty())
LOG_INFO( GetCmdLogger(), "Warning : Empty Message") ;
// Numero massimo di Tentativi per effettuare PUBLISH
const int MAX_RETRY = 3 ;
for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) {
// Rileggo il Client, dato che potrebbe essere cambiato dopo la riconnessione
RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ;
RedisConnectionStatus& curStatus = currClient.m_ConnectionPubStatus ;
// Se la connessione non è attiva, provo a riconnetermi
if ( ! currClient.m_ConnectionPubStatus.bAlive.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), ( string{ "Publish: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ;
// Recupero la stringa di connessione corrente
string sConnStr = s_vAsyncRedisClients[nMyIdConnection].m_sConnectionString ;
// Recupero i Canali su cui ero iscritto
unordered_set<string> setSubChannels = currClient.m_set_SubChannels ;
// Pulisco il contesto corrente
AsyncRedisClient.Clear() ;
int nNewId = 0 ;
bool bOkReconnect = false ;
if ( ExeRedisAsyncConnect( sConnStr, nNewId)) {
// Se ho scelto lo stesso slot, aggiorno nMyIdConnection
int nNewIndex = nNewId - 1 ;
if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) {
// Sostituisco il riferimento Client con il nuovo
if ( nNewIndex != nMyIdConnection)
nMyIdConnection = nNewIndex ;
}
bOkReconnect = true ;
// Mi iscrivo a tutti i canali a cui ero precedentemente iscritto
for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) {
if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter))
return false ;
}
}
else {
LOG_INFO( GetCmdLogger(), "Publish: reconnect attempt failed") ;
// riprovo al ciclo successivo...
}
// Se non mi sono riconesso metto un piccolo Delay per estrema sicureza
if ( ! bOkReconnect) {
this_thread::sleep_for( chrono::milliseconds( 100)) ;
continue ;
}
}
// Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione)
RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ;
RedisConnectionStatus& nextStatus = nextClient.m_ConnectionPubStatus ;
// Ricalcolo timeout dal client aggiornato
auto tTimeOut = chrono::milliseconds( static_cast<int>( nextClient.m_ConnectionInfo.nAsyncTimeout)) ;
// Reset dello stato di pubblicazione
nextStatus.bPublished.store( false, memory_order_relaxed) ;
// Invio del comando PUBLISH
int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncPubContext, RedisPubSubUnsubMsgCallback, nullptr,
"PUBLISH %s %s", sChannel.c_str(), sMessage.c_str()) ;
if ( nResult != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Publish: redisAsyncCommand failed") ;
nextStatus.bAlive.store( false, memory_order_release) ;
this_thread::sleep_for( chrono::milliseconds( 100)) ;
continue ;
}
auto tStart = chrono::steady_clock::now() ;
while ( ! nextStatus.bPublished.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Publish: timeout waiting for publish reply") ;
break ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
if ( nextStatus.bPublished.load( memory_order_acquire)) {
#if DEBUG
double dMsPublishTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsPublishTime) + " ms").c_str()) ;
#endif
return true ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Publish failed, attempt "} + ToString( nAttempt)).c_str()) ;
// Breve pausa prima del Retry
this_thread::sleep_for( chrono::milliseconds( 100)) ;
}
// Tutti i tentativi sono falliti
return false ;
}
//----------------------------------------------------------------------------
// Funzione per comando SUBSCRIBE Asincrono
// ---------------------------------------------------------------------------
bool
ExeRedisAsyncSubscribe( int nIdConnection, const string& sChannel)
{
// Verifico che l'Id di connesione sia valido
int nMyIdConnection = nIdConnection - 1 ;
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Verifico che la connessione sia attiva
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
if ( AsyncRedisClient.m_bFreeConnection.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
// Se non ho alcun canale, errore
if ( sChannel.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") ;
return false ;
}
// Numero massimo di Tentativi per effettuare SUBSCRIBE
const int MAX_RETRY = 3 ;
for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) {
// Rileggo il client e lo stato (potrebbero cambiare dopo reconnect)
RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ;
RedisConnectionStatus& currStatus = currClient.m_ConnectionSubStatus ;
// Se la connessione non è viva, provo a riconnettere
if ( ! currStatus.bAlive.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), ( string{ "Subscribe: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ;
// Recupero la stringa di connessione corrente
string sConnStr = currClient.m_sConnectionString ;
// Recupero i Canali su cui ero iscritto
unordered_set<string> setSubChannels = currClient.m_set_SubChannels ;
// Pulisco il contesto corrente
AsyncRedisClient.Clear() ;
int nNewId = 0 ;
bool bOkReconnect = false ;
if ( ExeRedisAsyncConnect( sConnStr, nNewId)) {
// Se ho scelto lo stesso slot, aggiorno nMyIdConnection
int nNewIndex = nNewId - 1 ;
if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) {
// Sostituisco il riferimento Client con il nuovo
if ( nNewIndex != nMyIdConnection)
nMyIdConnection = nNewIndex ;
}
bOkReconnect = true ;
// Mi iscrivo a tutti i canali a cui ero precedentemente iscritto
for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) {
if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter))
return false ;
}
}
else {
LOG_INFO( GetCmdLogger(), "Publish: reconnect attempt failed") ;
// riprovo al ciclo successivo...
}
// Se non mi sono riconesso metto un piccolo Delay per estrema sicureza
if ( ! bOkReconnect) {
this_thread::sleep_for( chrono::milliseconds( 100)) ;
continue ;
}
}
// Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione)
RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ;
RedisConnectionStatus& nextStatus = nextClient.m_ConnectionSubStatus ;
// Ricalcolo timeout dal client aggiornato
auto tTimeOut = chrono::milliseconds( static_cast<int>( nextClient.m_ConnectionInfo.nAsyncTimeout)) ;
// Reset dello stato di pubblicazione
nextStatus.bSubscribed.store( false, memory_order_relaxed) ;
int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr,
"SUBSCRIBE %s", sChannel.c_str()) ;
if ( nResult != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Subscribe: redisAsyncCommand failed") ;
nextStatus.bAlive.store( false, memory_order_release) ;
this_thread::sleep_for( chrono::milliseconds( 100)) ;
continue ;
}
auto tStart = chrono::steady_clock::now() ;
while ( ! nextStatus.bSubscribed.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Publish: timeout waiting for publish reply") ;
break ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
if ( nextStatus.bSubscribed.load( memory_order_acquire)) {
// Uso l'ID corrente ( aggiornato) per la chiave messaggi
int nEffectiveId = nMyIdConnection + 1 ;
string sKey = GetMessageMapKey( nEffectiveId, sChannel) ;
s_vAsyncRedisMessages.try_emplace( sKey) ;
s_vAsyncRedisMessages[sKey].Lock.clear() ;
#if DEBUG
double dMsPublishTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsPublishTime) + " ms").c_str()) ;
#endif
// Memorizzo il canale nella set dei canali
nextClient.m_set_SubChannels.insert( sChannel) ;
return true ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Subscribe failed, attempt "} + ToString( nAttempt)).c_str()) ;
// Breve pausa prima del Retry
this_thread::sleep_for( chrono::milliseconds( 100)) ;
}
// Tutti i tentativi sono falliti
return false ;
}
// ---------------------------------------------------------------------------
// Funzione per comando UNSUBSCRIBE Asincrono
// ---------------------------------------------------------------------------
bool
ExeRedisAsyncUnsubscribe( int nIdConnection, const string& sChannel)
{
// Verifico che l'Id di connesione sia valido
int nMyIdConnection = nIdConnection - 1 ;
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Verifico che la connessione sia attiva
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
// Se non ho alcun canale, errore
if ( sChannel.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
return false ;
}
const int MAX_RETRY = 3 ;
for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) {
// Rileggo il client e lo stato ( potrebbero cambiare dopo reconnect)
RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ;
RedisConnectionStatus& currStatus = currClient.m_ConnectionSubStatus ;
// Se la connessione non è viva, provo a riconnettere
if ( ! currStatus.bAlive.load( memory_order_acquire)) {
LOG_INFO( GetCmdLogger(), ( string{ "Unubscribe: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ;
// Recupero la stringa di connessione corrente
string sConnStr = currClient.m_sConnectionString ;
// Recupero i Canali su cui ero iscritto
unordered_set<string> setSubChannels = currClient.m_set_SubChannels ;
// Pulisco il contesto corrente
AsyncRedisClient.Clear() ;
int nNewId = 0 ;
bool bOkReconnect = false ;
if ( ExeRedisAsyncConnect( sConnStr, nNewId)) {
// Se ho scelto lo stesso slot, aggiorno nMyIdConnection
int nNewIndex = nNewId - 1 ;
if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) {
// Sostituisco il riferimento Client con il nuovo
if ( nNewIndex != nMyIdConnection)
nMyIdConnection = nNewIndex ;
}
bOkReconnect = true ;
// Mi iscrivo a tutti i canali a cui ero precedentemente iscritto
for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) {
if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter))
return false ;
}
}
else {
LOG_INFO( GetCmdLogger(), "Unsubscribe: reconnect attempt failed") ;
// riprovo al ciclo successivo...
}
// Se non mi sono riconesso metto un piccolo Delay per estrema sicureza
if ( ! bOkReconnect) {
this_thread::sleep_for( chrono::milliseconds( 100)) ;
continue ;
}
}
// Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione)
RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ;
RedisConnectionStatus& nextStatus = nextClient.m_ConnectionSubStatus ;
// Ricalcolo timeout dal client aggiornato
auto tTimeOut = chrono::milliseconds( static_cast<int>( nextClient.m_ConnectionInfo.nAsyncTimeout)) ;
// Reset dello stato di pubblicazione
nextStatus.bUnsubscribed.store( false, memory_order_relaxed) ;
int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr,
"UNSUBSCRIBE %s", sChannel.c_str()) ;
if ( nResult != REDIS_OK) {
LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand UNSUBSCRIBE failed") ;
nextStatus.bAlive.store( false, memory_order_release) ;
this_thread::sleep_for( chrono::milliseconds( 100)) ;
continue ;
}
auto tStart = chrono::steady_clock::now() ;
while ( ! nextStatus.bUnsubscribed.load( memory_order_acquire)) {
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), "Unsubscribe: timeout waiting for publish reply") ;
break ;
}
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
this_thread::sleep_for( chrono::milliseconds( 10)) ;
}
if ( nextStatus.bUnsubscribed.load( memory_order_acquire)) {
// Rimuovo la chiave dalla mappa messaggi usando l'ID effettivo (aggiornato)
int nEffectiveId = nMyIdConnection + 1 ;
string keyToRemove = GetMessageMapKey( nEffectiveId, sChannel) ;
for ( auto it = s_vAsyncRedisMessages.begin() ; it != s_vAsyncRedisMessages.end() ; ) {
if ( it->first == keyToRemove)
it = s_vAsyncRedisMessages.erase( it) ;
else
++ it ;
}
#if DEBUG
double dMsUnsubscribeTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsUnsubscribeTime) + " ms").c_str()) ;
#endif
// Tolgo il canale dalla lista
nextClient.m_set_SubChannels.erase( sChannel) ;
return true ;
}
LOG_INFO( GetCmdLogger(), ( string{ "Unsubscribe failed, attempt "} + ToString( nAttempt)).c_str()) ;
// Breve pausa prima del Retry
this_thread::sleep_for( chrono::milliseconds( 100)) ;
}
// Tutti i tentativi sono falliti
return false ;
}
// ---------------------------------------------------------------------------
// La funzione esegue una Subscribe e una Unsubscribe
// Se entro il tempo stabilito riceve un messaggio, lo comunica, altrimenti errore
// ---------------------------------------------------------------------------
bool
ExeRedisAsyncSubscribeOneMessage( int nIdConnection, const string& sChannel, double dMaxTimeOut,
string& sMessage)
{
// Verifico che l'Id di connesione sia valido
int nMyIdConnection = nIdConnection - 1 ;
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Verifico che la connessione sia attiva
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
// Se non ho alcun canale, errore
if ( sChannel.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
return false ;
}
// Controllo validità tempo di TimeOut
if ( dMaxTimeOut <= 0.) {
LOG_INFO( GetCmdLogger(), "Error : Invalid timeout value") ;
return false ;
}
// Recupero la connessione
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
// Invio del comando di SUBSCRIBE
AsyncRedisClient.m_ConnectionSubStatus.bMessage = false ;
AsyncRedisClient.m_ConnectionSubStatus.sMessage = "" ;
if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, WaitMessageCallback, nullptr,
( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to subscribe to"} + sChannel).c_str())
return false ;
}
// Attivo il TimeOut per l'attesa del messaggio
auto tStart = chrono::steady_clock::now() ;
auto tTimeOut = chrono::milliseconds( static_cast<int>( dMaxTimeOut)) ;
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bMessage) {
this_thread::sleep_for( chrono::milliseconds( 10)) ;
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
LOG_INFO( GetCmdLogger(), ( string{ "Timeout : No Message received on ["} + sChannel + "]").c_str())
return false ;
}
}
// Invio del comando UNSUBSCRIBE
if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, WaitMessageCallback, nullptr,
( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to Unsubscribe to "} + sChannel).c_str())
return false ;
}
// Controllo se ho ricevuto un messaggio
if ( AsyncRedisClient.m_ConnectionSubStatus.bMessage) {
sMessage = AsyncRedisClient.m_ConnectionSubStatus.sMessage ;
LOG_INFO( GetCmdLogger(), ( string{ "Message received on ["} + sChannel + "]: " + sMessage).c_str())
return true ;
}
#if DEBUG
LOG_INFO( GetCmdLogger(), ( string{ "Timeout: No message received on ["} + sChannel + "]").c_str()) ;
#endif
return false ;
}
// ---------------------------------------------------------------------------
// Funzione per lettura Asicnrona dell'ultimo messaggio su un canale
// ---------------------------------------------------------------------------
bool
ExeRedisAsyncGetMessage( int nIdConnection, const string& sChannel, int& nCount, string& sMessage)
{
// Verifico che l'Id di connesione sia valido
int nMyIdConnection = nIdConnection - 1 ;
if ( ! CheckIdConnection( nMyIdConnection))
return false ;
// Verifico che la connessione sia attiva
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
return false ;
}
// Se non ho alcun canale, errore
if ( sChannel.empty()) {
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
return false ;
}
// Recupero la Chiave di accesso alla Mappa dei Messaggi
string sKey = GetMessageMapKey( nIdConnection, sChannel) ;
auto Iter = s_vAsyncRedisMessages.find( sKey) ;
if ( Iter == s_vAsyncRedisMessages.end()) {
LOG_INFO( GetCmdLogger(), "Error : Connection not Subscribed to this Channel") ;
return false ;
}
else {
RedisAsyncMessage& LockedMessage = Iter->second ;
while ( LockedMessage.Lock.test_and_set( memory_order_acquire))
LockedMessage.Lock.wait( true, memory_order_relaxed) ;
nCount = LockedMessage.nCount ;
sMessage = LockedMessage.sMessage ;
LockedMessage.Lock.clear( memory_order_release) ;
LockedMessage.Lock.notify_one() ;
}
return true ;
}