9a719b94cf
- in Redis aggiunta gestione per riconnessioni.
1714 lines
73 KiB
C++
1714 lines
73 KiB
C++
//----------------------------------------------------------------------------
|
|
// EgalTech 2025-2025
|
|
//----------------------------------------------------------------------------
|
|
// File : EXE_Redis.cpp Data : 17.09.25 Versione : 2.7i3
|
|
// Contenuto : Funzioni per interfacciarsi con server Redis.
|
|
//
|
|
//
|
|
//
|
|
// Modifiche : 17.09.25 RE Creazione modulo. ( ver. 2.7i3)
|
|
// Modifiche : 31.10.25 RE Aggiunta gestione connessioni multiple. ( ver. 2.7j4)
|
|
// Modifiche : 04.12.25 RE Aggiunta Memorizzazione messaggio. ( ver 2.7l1)
|
|
//
|
|
//
|
|
//----------------------------------------------------------------------------
|
|
|
|
//--------------------------- Include ----------------------------------------
|
|
#include <winsock2.h>
|
|
#include <WS2tcpip.h>
|
|
#include "stdafx.h"
|
|
#include "EXE.h"
|
|
#include "EXE_Macro.h"
|
|
#include "AuxTools.h"
|
|
#include "/EgtDev/Include/EXeExecutor.h"
|
|
#include "/EgtDev/Include/EGkStringUtils3d.h"
|
|
#include "/EgtDev/Include/EGnStringKeyVal.h"
|
|
#pragma warning( disable: 4244) // conversione da 'uint64_t' a 'size_t'
|
|
#pragma warning( disable: 4200) // utilizzata estensione non standard: matrice di dimensioni zero in struct/union
|
|
#include "/EgtDev/Extern/hiredis/Include/async.h"
|
|
#include <thread>
|
|
#include <atomic>
|
|
#include <map>
|
|
#include <sstream>
|
|
#include <future>
|
|
|
|
using namespace std ;
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Ambiente Debug
|
|
// ---------------------------------------------------------------------------
|
|
#define DEBUG 0
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Definizione variabili generali Redis
|
|
// ---------------------------------------------------------------------------
|
|
static const int REDIS_MIN_DB = 0 ;
|
|
static const int REDIS_MAX_DB = 15 ;
|
|
static const int DEFAULT_PORT = 6379 ;
|
|
static const int DEFAULT_SENTINEL_MASTER_PORT = 26379 ;
|
|
static const string DEFAULT_USER = "default" ;
|
|
static const int DEFAULT_DATABASE = 0 ;
|
|
static const int DEFAULT_TIMEOUT = 15000 ; // [ms]
|
|
static const int DEFAULT_ALIVE_TIME = 180 ; // [s]
|
|
static const int MAX_CONNECTION_NBR = 10 ;
|
|
static const int NO_CONNECTION = - 1 ;
|
|
static const string KEY_TYPE_STRING = "string" ;
|
|
static const string KEY_TYPE_LIST = "list" ; // non usato
|
|
static const string KEY_TYPE_SET = "set" ; // non usato
|
|
static const string KEY_TYPE_HASH = "hash" ; // non usato
|
|
static const string KEY_TYPE_ZSET = "zset" ; // non usato
|
|
static const string KEY_TYPE_JSON = "ReJSON-RL" ; // non usato, disponibile se versione >= 8 ( noi usiamo la 6)
|
|
|
|
// Struttura per parametri della stringa di Connessione
|
|
struct RedisConnectionInfo {
|
|
|
|
string sHost, sServiceName, sUser, sPassword ;
|
|
int nPort, nDefaultDataBase, nKeepAlive, nConnectTimeout, nSyncTimeout, nAsyncTimeout ;
|
|
bool bAbortConnect, bSsl, bAllowAdmin ;
|
|
|
|
RedisConnectionInfo() {
|
|
sHost = "" ; sServiceName = "" ; sUser = DEFAULT_USER ; sPassword = "" ;
|
|
nPort = DEFAULT_PORT ; nDefaultDataBase = DEFAULT_DATABASE ; nKeepAlive = DEFAULT_ALIVE_TIME ;
|
|
nConnectTimeout = DEFAULT_TIMEOUT ; nSyncTimeout = DEFAULT_TIMEOUT ; nAsyncTimeout = DEFAULT_TIMEOUT ;
|
|
bAbortConnect = false ; bSsl = false ; bAllowAdmin = false ;
|
|
} ;
|
|
|
|
RedisConnectionInfo( string sMyHost, string sMyServiceName, string sMyUser, string sMyPassword,
|
|
int nMyConnection, int nMyPort, int nMyDefaultDataBase, int dMyKeepAlive,
|
|
int dMyConnectTimeout, int dMySyncTimeout, int dMyAsyncTimeout,
|
|
bool bMyAbortConnect, bool bMySsl, bool bMyAllowAdmin)
|
|
: sHost( sMyHost), sServiceName( sMyServiceName), sUser( sMyUser), sPassword( sMyPassword),
|
|
nPort( nMyPort), nDefaultDataBase( nMyDefaultDataBase), nKeepAlive( dMyKeepAlive),
|
|
nConnectTimeout( dMyConnectTimeout), nSyncTimeout( dMySyncTimeout),
|
|
nAsyncTimeout( dMyAsyncTimeout), bAbortConnect( bMyAbortConnect), bSsl( bMySsl),
|
|
bAllowAdmin( bMyAllowAdmin) {}
|
|
|
|
inline void Clear() {
|
|
sHost = "" ; sServiceName = "" ; sUser = DEFAULT_USER ; sPassword = "" ;
|
|
nPort = DEFAULT_PORT ; nDefaultDataBase = DEFAULT_DATABASE ; nKeepAlive = DEFAULT_ALIVE_TIME ;
|
|
nConnectTimeout = DEFAULT_TIMEOUT ; nSyncTimeout = DEFAULT_TIMEOUT ; nAsyncTimeout = DEFAULT_TIMEOUT ;
|
|
bAbortConnect = false ; bSsl = false ; bAllowAdmin = false ;
|
|
}
|
|
|
|
} ;
|
|
|
|
// Struttura per Connessiona Sincrona
|
|
struct RedisSync {
|
|
|
|
atomic<bool> bFreeConnection ;
|
|
redisContext* pRedisContext ;
|
|
RedisConnectionInfo ConnectionInfo ;
|
|
|
|
RedisSync()
|
|
: bFreeConnection( true), pRedisContext( nullptr), ConnectionInfo() {}
|
|
|
|
inline void Clear() {
|
|
bFreeConnection = true ;
|
|
redisFree( pRedisContext) ;
|
|
pRedisContext = nullptr ;
|
|
ConnectionInfo.Clear() ;
|
|
}
|
|
|
|
} ;
|
|
static array<RedisSync, MAX_CONNECTION_NBR> s_vRedisClients ;
|
|
|
|
// Struttura per stato di Connessione Asincrona
|
|
struct RedisConnectionStatus {
|
|
|
|
atomic<int> nIdConnection ;
|
|
atomic<bool> bConnected ;
|
|
atomic<bool> bAlive ;
|
|
atomic<bool> bAuthenticated ;
|
|
atomic<bool> bSelectedDB ;
|
|
atomic<bool> bDisconnected ;
|
|
atomic<bool> bPublished ;
|
|
atomic<bool> bSubscribed ;
|
|
atomic<bool> bUnsubscribed ;
|
|
atomic<bool> bMessage ;
|
|
string sMessage ; // univoco grazie a bMessage
|
|
|
|
RedisConnectionStatus()
|
|
: nIdConnection( 0),
|
|
bConnected( false), bAlive( false), bAuthenticated( false), bSelectedDB( false), bDisconnected( false),
|
|
bPublished( false), bSubscribed( false), bUnsubscribed( false), bMessage( false),
|
|
sMessage( "") {}
|
|
|
|
inline void Clear() {
|
|
nIdConnection = 0 ;
|
|
bConnected = false ;
|
|
bAlive = false ;
|
|
bAuthenticated = false ;
|
|
bSelectedDB = false ;
|
|
bDisconnected = false ;
|
|
bPublished = false ;
|
|
bSubscribed = false ;
|
|
bUnsubscribed = false ;
|
|
bMessage = false ;
|
|
sMessage = "" ;
|
|
}
|
|
|
|
} ;
|
|
|
|
// Classe per Connessione Asincrona
|
|
class RedisAsync {
|
|
|
|
public :
|
|
atomic<bool> m_bFreeConnection ;
|
|
redisAsyncContext* m_pRedisAsyncPubContext ;
|
|
redisAsyncContext* m_pRedisAsyncSubContext ;
|
|
string m_sConnectionString ;
|
|
unordered_set<string> m_set_SubChannels ;
|
|
int m_nDataBase ;
|
|
string m_sPassword ;
|
|
string m_sUser ;
|
|
atomic<bool> m_bPubLoopRunning ;
|
|
atomic<bool> m_bSubLoopRunning ;
|
|
RedisConnectionInfo m_ConnectionInfo ;
|
|
RedisConnectionStatus m_ConnectionSubStatus ;
|
|
RedisConnectionStatus m_ConnectionPubStatus ;
|
|
thread m_PubThread ;
|
|
thread m_SubThread ;
|
|
WSADATA m_wsaData ;
|
|
|
|
RedisAsync()
|
|
: m_bFreeConnection( true), m_pRedisAsyncPubContext( nullptr), m_pRedisAsyncSubContext( nullptr),
|
|
m_sConnectionString( ""), m_set_SubChannels(), m_nDataBase( REDIS_MIN_DB), m_sPassword( ""), m_sUser( DEFAULT_USER),
|
|
m_bPubLoopRunning( false), m_bSubLoopRunning( false),
|
|
m_ConnectionInfo(), m_ConnectionSubStatus(), m_ConnectionPubStatus() {} ;
|
|
// thread è già inizializzato con il suo costruttore di Default valido
|
|
|
|
inline void Clear() {
|
|
|
|
// Loop da terminare
|
|
m_bPubLoopRunning.store( false, memory_order_release) ;
|
|
m_bSubLoopRunning.store( false, memory_order_release) ;
|
|
|
|
// Sveglio i Thread bloccanti
|
|
if ( m_pRedisAsyncPubContext)
|
|
redisAsyncDisconnect( m_pRedisAsyncPubContext) ;
|
|
if ( m_pRedisAsyncSubContext)
|
|
redisAsyncDisconnect( m_pRedisAsyncSubContext) ;
|
|
|
|
// Join dei Thread
|
|
if ( m_PubThread.joinable())
|
|
m_PubThread.join() ;
|
|
if ( m_SubThread.joinable())
|
|
m_SubThread.join() ;
|
|
|
|
// Rilascio dei contesti Redis
|
|
if ( m_pRedisAsyncPubContext) {
|
|
redisAsyncFree( m_pRedisAsyncPubContext) ;
|
|
m_pRedisAsyncPubContext = nullptr ;
|
|
}
|
|
if ( m_pRedisAsyncSubContext) {
|
|
redisAsyncFree( m_pRedisAsyncSubContext) ;
|
|
m_pRedisAsyncSubContext = nullptr ;
|
|
}
|
|
|
|
// Reset dello Stato
|
|
m_bFreeConnection.store( true, memory_order_release) ;
|
|
m_sConnectionString.clear() ;
|
|
m_nDataBase = REDIS_MIN_DB ;
|
|
m_sPassword.clear() ;
|
|
m_sUser = DEFAULT_USER ;
|
|
|
|
m_ConnectionInfo.Clear() ;
|
|
m_ConnectionSubStatus.Clear() ;
|
|
m_ConnectionPubStatus.Clear() ;
|
|
m_set_SubChannels.clear() ;
|
|
}
|
|
|
|
~RedisAsync() {
|
|
if ( m_PubThread.joinable())
|
|
m_PubThread.join() ;
|
|
if ( m_SubThread.joinable())
|
|
m_SubThread.join();
|
|
}
|
|
|
|
public :
|
|
void RedisEventLoop( bool bIsSub) const ;
|
|
|
|
} ;
|
|
static array<RedisAsync, MAX_CONNECTION_NBR> s_vAsyncRedisClients ;
|
|
|
|
// Mappa lettura Messaggi per Modalità Asicnrona
|
|
struct RedisAsyncMessage {
|
|
|
|
atomic_flag Lock = ATOMIC_FLAG_INIT ;
|
|
int nCount = 0 ;
|
|
string sMessage ;
|
|
|
|
} ;
|
|
static unordered_map<string, RedisAsyncMessage> s_vAsyncRedisMessages ;
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [Redis] Funzione Ciclo degli eventi per Read/Write su Socket
|
|
// ---------------------------------------------------------------------------
|
|
void
|
|
RedisAsync::RedisEventLoop( bool bIsPub) const
|
|
{
|
|
// Verifica della validità del contesto
|
|
redisAsyncContext* ctx = ( bIsPub ? m_pRedisAsyncPubContext : m_pRedisAsyncSubContext) ;
|
|
if ( ctx == nullptr || ctx->err != 0)
|
|
return ; // se attivazione loop prima della inizializzazione callBack
|
|
|
|
// Recupero del file descriptor del socket TCP usato da Redis.
|
|
SOCKET sock = ctx->c.fd ;
|
|
// Creazione dei due insiemi per il file descriptor ( lettura e scrittura )
|
|
// r = read, w = write
|
|
fd_set rfds, wfds ;
|
|
// Timeout per funzione select { secondi, millisecondi}
|
|
// La funzione select attende che il socket sia pronto per lettura e scrittura
|
|
// Resituisce :
|
|
// SOCKET_ERROR in caso di errore
|
|
// 0 se scade il timeout
|
|
// > 0 se ci sono eventi da gestire
|
|
timeval timeout = {0, 100000 } ; // 100ms
|
|
|
|
// Ciclo
|
|
while ( ( bIsPub ? m_bPubLoopRunning.load( memory_order_acquire)
|
|
: m_bSubLoopRunning.load( memory_order_acquire))) {
|
|
// Se contesto non valido, interrompo il ciclo
|
|
if ( ctx == nullptr || ( ctx->c.flags & REDIS_DISCONNECTING))
|
|
break ;
|
|
// Recupero file descriptor di lettura e scrittura
|
|
FD_ZERO( &rfds) ;
|
|
FD_ZERO( &wfds) ;
|
|
FD_SET( sock, &rfds) ;
|
|
FD_SET( sock, &wfds) ;
|
|
// Controllo il risultato ottenuto
|
|
int nRet = select( 0, &rfds, &wfds, nullptr, &timeout) ;
|
|
if ( nRet == SOCKET_ERROR) {
|
|
// Se errore -> interruzione del ciclo
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : Socket select() failed -> "} +
|
|
ToString( WSAGetLastError())).c_str()) ;
|
|
break ;
|
|
}
|
|
else if ( nRet > 0) {
|
|
// Se Socket pronto per lettura/ scrittua
|
|
if ( ctx != nullptr && ! ( ctx->c.flags & REDIS_DISCONNECTING) && ctx->c.fd != -1) {
|
|
if ( FD_ISSET( sock, &rfds))
|
|
redisAsyncHandleRead( ctx) ;
|
|
if ( FD_ISSET( sock, &wfds))
|
|
redisAsyncHandleWrite( ctx) ;
|
|
}
|
|
// Evito saturazione CPU
|
|
this_thread::sleep_for( chrono::milliseconds( 1)) ;
|
|
}
|
|
|
|
// Reset del timeOut ad ogni ciclo
|
|
timeout.tv_sec = 0 ;
|
|
timeout.tv_usec = 100000 ; // 100ms
|
|
}
|
|
|
|
return ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// [Utility] Funzione di Controllo per Id Connessione
|
|
// ---------------------------------------------------------------------------
|
|
static bool
|
|
CheckIdConnection( int nIdConnection)
|
|
{
|
|
if ( nIdConnection < 0 || nIdConnection >= MAX_CONNECTION_NBR) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : Not a Valid id Connestion : [1, "} + ToString( MAX_CONNECTION_NBR) + "]").c_str())
|
|
return false ;
|
|
}
|
|
return true ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [Utility] Funzione di Split parametri per Stringa di Connessione
|
|
// ---------------------------------------------------------------------------
|
|
static bool
|
|
GetParamsFromConnectionString( const string& sConnection, RedisConnectionInfo& ConnectionInfo)
|
|
{
|
|
// Se la stringa di connessione è vuota, errore
|
|
if ( sConnection.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Empty Connection String") ;
|
|
return false ;
|
|
}
|
|
|
|
// Ricerco le informazioni Splittando la stringa di connessione per ","
|
|
string sHostPort, sParams ;
|
|
SplitFirst( sConnection, ",", sHostPort, sParams) ;
|
|
if ( ! sHostPort.empty()) {
|
|
// Host e Porta
|
|
string sHost, sPort ;
|
|
SplitFirst( sHostPort, ":", sHost, sPort) ;
|
|
if ( sHost.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Invalid Host") ;
|
|
return false ;
|
|
}
|
|
int nPort = DEFAULT_PORT ;
|
|
if ( ! sPort.empty()) {
|
|
if ( ! FromString( sPort, nPort)) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Invalid Port") ;
|
|
return false ;
|
|
}
|
|
}
|
|
ConnectionInfo.sHost = sHost ;
|
|
ConnectionInfo.nPort = nPort ;
|
|
}
|
|
|
|
// Recupero i parametri ( rimuovo tutti gli spazi )
|
|
sParams.erase( remove( sParams.begin(), sParams.end(), ' '), sParams.end()) ;
|
|
GetValInNotes( sParams, "serviceName", ",", ConnectionInfo.sServiceName) ;
|
|
GetValInNotes( sParams, "user", ",", ConnectionInfo.sUser) ;
|
|
GetValInNotes( sParams, "password", ",", ConnectionInfo.sPassword) ;
|
|
GetValInNotes( sParams, "DefaultDatabase", ",", ConnectionInfo.nDefaultDataBase) ;
|
|
if ( ConnectionInfo.nDefaultDataBase < 0 || ConnectionInfo.nDefaultDataBase > REDIS_MAX_DB) {
|
|
ConnectionInfo.Clear() ;
|
|
LOG_INFO( GetCmdLogger(), "Error : DB Out of range [0, 15]")
|
|
return false ;
|
|
}
|
|
GetValInNotes( sParams, "keepAlive", ",", ConnectionInfo.nKeepAlive) ;
|
|
GetValInNotes( sParams, "connectTimeout", ",", ConnectionInfo.nConnectTimeout) ;
|
|
GetValInNotes( sParams, "syncTimeout", ",", ConnectionInfo.nSyncTimeout) ;
|
|
GetValInNotes( sParams, "asyncTimeout", ",", ConnectionInfo.nAsyncTimeout) ;
|
|
GetValInNotes( sParams, "abortConnect", ",", ConnectionInfo.bAbortConnect) ;
|
|
GetValInNotes( sParams, "ssl", "," ,ConnectionInfo.bSsl) ;
|
|
GetValInNotes( sParams, "allowAdmin", ",", ConnectionInfo.bAllowAdmin) ;
|
|
|
|
return true ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [Utility] Funzione di Creazione Chiave di accesso alla Mappa dei Messaggi
|
|
// ---------------------------------------------------------------------------
|
|
static string
|
|
GetMessageMapKey( int nIdConnection, const string& sChannel)
|
|
{
|
|
return ( ToString( nIdConnection) + "_" + sChannel) ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando CONNECT Sincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisConnect( const string& sConnection, int& nIdConnection)
|
|
{
|
|
// Inizializzo una nuova connessione, recuperando l'Id
|
|
int nMyIdConnection = NO_CONNECTION ;
|
|
for ( int i = 0 ; i < int( s_vRedisClients.size()) ; ++ i) {
|
|
if ( s_vRedisClients[i].bFreeConnection) {
|
|
nMyIdConnection = i ;
|
|
break ;
|
|
}
|
|
}
|
|
if ( nMyIdConnection == NO_CONNECTION) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : Limit connection Exceeded ( "} +
|
|
ToString( MAX_CONNECTION_NBR) + " )").c_str())
|
|
return false ;
|
|
}
|
|
|
|
// Recupero i riferimenti necessari e blocco la connessione corrente
|
|
RedisSync& SyncRedisClient = s_vRedisClients[nMyIdConnection] ;
|
|
SyncRedisClient.bFreeConnection = false ;
|
|
RedisConnectionInfo& ConnectionInfo = SyncRedisClient.ConnectionInfo ;
|
|
|
|
// Recupero i parametri dalla stringa di connessione
|
|
if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) {
|
|
SyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
|
|
// Se connessione senza TimeOut
|
|
if ( ConnectionInfo.nSyncTimeout <= 0) {
|
|
// Imposto il contesto per connesione sincrona
|
|
SyncRedisClient.pRedisContext = redisConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
|
|
if ( SyncRedisClient.pRedisContext == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
|
|
return false ;
|
|
}
|
|
else if ( SyncRedisClient.pRedisContext->err != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str())
|
|
SyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
}
|
|
// Se connessione con parametri
|
|
else {
|
|
// TimeOut per connessione
|
|
struct timeval TimeOutConnection{} ;
|
|
TimeOutConnection.tv_sec = static_cast<long>( ConnectionInfo.nSyncTimeout) ;
|
|
TimeOutConnection.tv_usec = static_cast<long>( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ;
|
|
|
|
// Imposto il contesto per connesione sincrona con TimeOut
|
|
SyncRedisClient.pRedisContext = redisConnectWithTimeout( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort, TimeOutConnection) ;
|
|
if ( SyncRedisClient.pRedisContext == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
|
|
return false ;
|
|
}
|
|
else if ( SyncRedisClient.pRedisContext->err != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str())
|
|
SyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
|
|
// TimeOut per richiesta I/O
|
|
struct timeval TimeOutIO{} ;
|
|
TimeOutIO.tv_sec = static_cast<long>( ConnectionInfo.nSyncTimeout) ;
|
|
TimeOutIO.tv_usec = static_cast<long>( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ;
|
|
|
|
// Imposto TimeOut per I/O
|
|
int nResult = redisSetTimeout( SyncRedisClient.pRedisContext, TimeOutIO) ;
|
|
if ( nResult != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Can't set timeout I/O") ;
|
|
return false ;
|
|
}
|
|
}
|
|
|
|
// Verifico se la Connessione è di tipo Sentinel
|
|
redisReply* replySentinel = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "SENTINEL get-master-addr-by-name %s",
|
|
ConnectionInfo.sServiceName.c_str()) ;
|
|
if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) {
|
|
if ( replySentinel != nullptr)
|
|
freeReplyObject( replySentinel) ;
|
|
}
|
|
else if ( replySentinel->element[0] != nullptr && replySentinel->element[0]->str != nullptr &&
|
|
replySentinel->element[1] != nullptr && replySentinel->element[1]->str != nullptr) {
|
|
// --- Nodo sentinella
|
|
string sMasterHost = replySentinel->element[0]->str ;
|
|
int nMasterPort = DEFAULT_SENTINEL_MASTER_PORT ;
|
|
FromString( replySentinel->element[1]->str, nMasterPort) ;
|
|
freeReplyObject( replySentinel) ;
|
|
|
|
// Effettuo la connessione al master redis
|
|
freeReplyObject( SyncRedisClient.pRedisContext) ;
|
|
SyncRedisClient.pRedisContext = redisConnect( sMasterHost.c_str(), nMasterPort) ;
|
|
if ( SyncRedisClient.pRedisContext == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
|
|
return false ;
|
|
}
|
|
else if ( SyncRedisClient.pRedisContext->err != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str())
|
|
SyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
}
|
|
LOG_INFO( GetCmdLogger(), "Connected to Redis !")
|
|
|
|
// Verifico se richiesta autenticazione
|
|
if ( ! ConnectionInfo.sPassword.empty()) {
|
|
redisReply* reply = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "AUTH %s %s",
|
|
ConnectionInfo.sUser.c_str(), ConnectionInfo.sPassword.c_str()) ;
|
|
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Authentication failed") ;
|
|
if ( reply == nullptr)
|
|
freeReplyObject( reply) ;
|
|
SyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
freeReplyObject( reply) ;
|
|
}
|
|
LOG_INFO( GetCmdLogger(), "Authenticated to Redis !")
|
|
|
|
// Seleziono il DataBase
|
|
redisReply* reply = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "SELECT %d", ConnectionInfo.nDefaultDataBase) ;
|
|
if ( reply == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str()) ;
|
|
return false ;
|
|
}
|
|
else if ( reply->type == REDIS_REPLY_ERROR) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + reply->str).c_str())
|
|
freeReplyObject( reply) ;
|
|
SyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
freeReplyObject( reply) ;
|
|
LOG_INFO( GetCmdLogger(), "DB Selected to Redis !") ;
|
|
|
|
// Restituisco l'Id della connessione [1, MAX_CONNECTION_NBR] ( per Lua)
|
|
nIdConnection = nMyIdConnection + 1 ;
|
|
return true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando DISCONNECT Sincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisDisconnect( int nIdConnection)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( s_vRedisClients[nMyIdConnection].bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
|
|
// Pulisco la connessione corrente
|
|
s_vRedisClients[nMyIdConnection].Clear() ;
|
|
LOG_INFO( GetCmdLogger(), "Disconnected to Redis !") ;
|
|
|
|
return true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando SET Sincrono ( sola scrittura {key:string, val:string})
|
|
//----------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisSetValFromKey( int nIdConnection, const string& sKey, const string& sVal)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( s_vRedisClients[nMyIdConnection].bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
|
|
// Recupero la connessione corrente
|
|
RedisSync& RedisClient = s_vRedisClients[nMyIdConnection] ;
|
|
|
|
// Il valore associato alla chiave può essere solo di tipo stringa
|
|
redisReply* reply = ( redisReply*)redisCommand( RedisClient.pRedisContext, "SET %s %s", sKey.c_str(), sVal.c_str()) ;
|
|
if ( reply == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Null reply") ;
|
|
return false ;
|
|
}
|
|
else if ( reply->type != REDIS_REPLY_STATUS || string( reply->str ) != "OK") {
|
|
LOG_INFO( GetCmdLogger(), "Error SET : No answer") ;
|
|
freeReplyObject( reply) ;
|
|
return false ;
|
|
}
|
|
freeReplyObject( reply) ;
|
|
|
|
LOG_INFO( GetCmdLogger(), ( string{ "New Key Set : ["} + sKey + ":" + sVal + "]").c_str())
|
|
return true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando Get Sincrono ( sola lettura {key:string, val:string})
|
|
//----------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisGetValFromKey( int nIdConnection, const string& sKey, string& sVal)
|
|
{
|
|
sVal.clear() ;
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( s_vRedisClients[nMyIdConnection].bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
|
|
// Recupero la connessione corrente
|
|
RedisSync& RedisClient = s_vRedisClients[nMyIdConnection] ;
|
|
|
|
// Recupero il tipo associato alla chiave da Redis
|
|
redisReply* typeReply = ( redisReply*)redisCommand( RedisClient.pRedisContext, "TYPE %s", sKey.c_str()) ;
|
|
if ( typeReply == nullptr || typeReply->type != REDIS_REPLY_STATUS) {
|
|
string sErr = ( typeReply != nullptr ? string( typeReply->str) : "null reply") ;
|
|
if ( typeReply != nullptr)
|
|
freeReplyObject( typeReply) ;
|
|
LOG_INFO( GetCmdLogger(), ( "Error : Failed to get key type, " + sErr).c_str()) ;
|
|
return false ;
|
|
}
|
|
string sType = typeReply->str ;
|
|
freeReplyObject( typeReply) ;
|
|
|
|
// Determino il formato per la chiamata al DataBase Redis
|
|
// Per ora accettate solamente valori string
|
|
string sFormat = ( sType == KEY_TYPE_STRING ? "Get %s" : "") ;
|
|
if ( sFormat.empty()) {
|
|
LOG_ERROR( GetCmdLogger(), "Error : Not valid Key Type") ;
|
|
return false ;
|
|
}
|
|
|
|
// Effettuo la chiamata
|
|
redisReply* reply = nullptr ;
|
|
reply = ( redisReply*)redisCommand( RedisClient.pRedisContext, sFormat.c_str(), sKey.c_str()) ;
|
|
if ( reply == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Null reply") ;
|
|
return false ;
|
|
}
|
|
|
|
// Recupero il Valore
|
|
if ( sType == KEY_TYPE_STRING) {
|
|
if ( reply->type != REDIS_REPLY_STRING) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Failed to read the value") ;
|
|
freeReplyObject( reply) ;
|
|
return false ;
|
|
}
|
|
sVal = string( reply->str) ;
|
|
}
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Key Read : ["} + sKey + ":" + sVal + "]").c_str())
|
|
|
|
freeReplyObject( reply) ;
|
|
return true ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [CB] Funzione di CallBack per comando SELECT Asincrono ( Selezione DB)
|
|
// ---------------------------------------------------------------------------
|
|
static void
|
|
RedisDbSElectionCallBack( redisAsyncContext* ctx, void* r, void*)
|
|
{
|
|
// Invio la richiesta
|
|
redisReply* reply = static_cast<redisReply*>( r) ;
|
|
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
|
|
string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : Db Selection -> "} + sErrMsg).c_str()) ;
|
|
return ;
|
|
}
|
|
// Imposto flag di Autenticazione
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
// Selezione riuscita
|
|
MyStatus->bSelectedDB.store( true, memory_order_release) ;
|
|
MyStatus->bSelectedDB.notify_one() ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// [CB] Funzione di CallBack per comando AUTH Asincrono ( Autenticazione)
|
|
// ---------------------------------------------------------------------------
|
|
static void
|
|
RedisAuthenticationCallBack( redisAsyncContext* ctx, void* r, void*)
|
|
{
|
|
// Invio la richiesta
|
|
redisReply* reply = static_cast<redisReply*>( r) ;
|
|
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
|
|
string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : Authentication -> "} + sErrMsg).c_str()) ;
|
|
return ;
|
|
}
|
|
// Imposto flag di Autenticazione
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
// Autenticazione riuscita
|
|
MyStatus->bAuthenticated.store( true, memory_order_release) ;
|
|
MyStatus->bAuthenticated.notify_one() ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [CB] Funzione di CallBack per comandi PUBLISH, SUBSCRIBE, MESSAGE, UNSUBSCRIBE
|
|
// Asincroni
|
|
// ---------------------------------------------------------------------------
|
|
static void
|
|
RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*)
|
|
{
|
|
// Verifico che il contesto sia definito
|
|
if ( ctx == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack")
|
|
return ;
|
|
}
|
|
|
|
// Recupero della risposta dal server redis
|
|
redisReply* reply = static_cast<redisReply*>( r) ;
|
|
if ( reply == nullptr)
|
|
return ;
|
|
|
|
LOG_INFO( GetCmdLogger(), " --- RedisMessageCallback invoked")
|
|
|
|
// Caso PUBLISH: hiredis risponde con REDIS_REPLY_INTEGER (numero di subscribers)
|
|
if ( reply->type == REDIS_REPLY_INTEGER) {
|
|
// Recupero lo Stato
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
// Imposto stato di pubblicazione
|
|
MyStatus->bPublished.store( true, memory_order_release) ;
|
|
MyStatus->bPublished.notify_one() ;
|
|
// Recupero il numero di clients iscritti al canale
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Message published to "} + ToString( reply->integer) + " clients !").c_str()) ;
|
|
return ;
|
|
}
|
|
// Caso ARRAY: Subscribe / Unsubscribe / Message
|
|
else if ( reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) {
|
|
// Recupero la tipologia di chiamata effettuata
|
|
const char* msgType = ( reply->element[0] != nullptr && reply->element[0]->str != nullptr ?
|
|
reply->element[0]->str : "") ;
|
|
|
|
// --- Se Messaggio
|
|
if ( strcmp( msgType, "message") == 0 && reply->elements == 3) {
|
|
// Messaggio ricevuto da un canale
|
|
if ( reply->element[1] != nullptr && reply->element[1]->str != nullptr &&
|
|
reply->element[2] != nullptr && reply->element[2]->str != nullptr) {
|
|
string sChannel{ reply->element[1]->str, reply->element[1]->len} ;
|
|
string sMessage{ reply->element[2]->str, reply->element[2]->len} ;
|
|
#if DEBUG
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Message Received on ["} + sChannel + "] : " + sMessage).c_str())
|
|
#endif
|
|
// Recupero lo Stato
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
// Aggiorno la Mappa dei Messaggi ( viene sempre sovrascritto all'ultimo)
|
|
string sKey = GetMessageMapKey( MyStatus->nIdConnection, sChannel) ;
|
|
auto Iter = s_vAsyncRedisMessages.find( sKey) ;
|
|
if ( Iter != s_vAsyncRedisMessages.end()) {
|
|
RedisAsyncMessage& LockedMessage = Iter->second ;
|
|
while ( LockedMessage.Lock.test_and_set( memory_order_acquire))
|
|
LockedMessage.Lock.wait( true, memory_order_relaxed) ;
|
|
++ LockedMessage.nCount ;
|
|
LockedMessage.sMessage = sMessage ;
|
|
LockedMessage.Lock.clear( memory_order_release) ;
|
|
LockedMessage.Lock.notify_one() ;
|
|
}
|
|
}
|
|
else {
|
|
LOG_INFO( GetCmdLogger(), "Errror : Invalid Message received")
|
|
return ;
|
|
}
|
|
}
|
|
// --- Se SUBSCRIBE
|
|
else if ( strcmp( msgType, "subscribe") == 0 && reply->elements >= 3) {
|
|
if ( reply->element[1] != nullptr && reply->element[1] != nullptr &&
|
|
reply->element[2] != nullptr) {
|
|
// Recupero lo Stato
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
// Imposto stato di Subscribe
|
|
MyStatus->bSubscribed.store( true, memory_order_release) ;
|
|
MyStatus->bSubscribed.notify_one() ;
|
|
// Comunico il risultato
|
|
string sChannel = reply->element[1]->str ;
|
|
int nCount = reply->element[2]->integer ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Subscribed to ["} + sChannel + "]," +
|
|
" total subscribtions : " + ToString( nCount)).c_str()) ;
|
|
}
|
|
else {
|
|
LOG_INFO( GetCmdLogger(), "Errror : Invalid Subscription reply")
|
|
return ;
|
|
}
|
|
}
|
|
// --- se Unsubscribe
|
|
else if ( strcmp( msgType, "unsubscribe") == 0 && reply->elements >= 3) {
|
|
if ( reply->element[1] != nullptr && reply->element[1] != nullptr &&
|
|
reply->element[2] != nullptr) {
|
|
// Recupero lo Stato
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
// Imposto stato Unsubscribe
|
|
MyStatus->bUnsubscribed.store( true, memory_order_release) ;
|
|
MyStatus->bUnsubscribed.notify_one() ;
|
|
// Comunico il risultato
|
|
string sChannel = reply->element[1]->str ;
|
|
int nCount = reply->element[2]->integer ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Unsubscribed to ["} + sChannel + "]," +
|
|
" subscriptions left : " + ToString( nCount)).c_str())
|
|
}
|
|
else {
|
|
LOG_INFO( GetCmdLogger(), "Errror : Invalid Unsubscription reply")
|
|
return ;
|
|
}
|
|
}
|
|
// --- Undefined
|
|
else {
|
|
LOG_INFO( GetCmdLogger(), "Error : Undefined reply in CallBack from Redis")
|
|
return ;
|
|
}
|
|
}
|
|
|
|
return ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [CB] Funzione di CallBack per ricezione MESSAGE Asincrona
|
|
// ---------------------------------------------------------------------------
|
|
static void
|
|
WaitMessageCallback( redisAsyncContext* ctx, void* r, void*)
|
|
{
|
|
// Verifico che il contesto sia definito
|
|
if ( ctx == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack")
|
|
return ;
|
|
}
|
|
LOG_INFO( GetCmdLogger(), " --- RedisMessageCallback invoked")
|
|
|
|
// Recupero della risposta dal server redis
|
|
redisReply* reply = static_cast<redisReply*>( r) ;
|
|
if ( reply == nullptr)
|
|
return ;
|
|
|
|
if ( reply->type != REDIS_REPLY_ARRAY || reply->elements < 3)
|
|
LOG_INFO( GetCmdLogger(), "Error : Invalid Message CallBack reply")
|
|
else {
|
|
// Recupero il Messaggio ( se callBack riferita al tipo "message")
|
|
if ( reply->element[0] == nullptr || reply->element[0]->str == nullptr)
|
|
LOG_INFO( GetCmdLogger(), "Error : Null message type received")
|
|
else {
|
|
const char* msgType = reply->element[0]->str ;
|
|
if ( strcmp( msgType, "message") == 0) {
|
|
if ( reply->element[2] == nullptr || reply->element[2]->str == nullptr)
|
|
LOG_INFO( GetCmdLogger(), "Error : Null message received")
|
|
else {
|
|
// Recupero lo Stato
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
// Imposto stato di pubblicazione
|
|
MyStatus->bMessage = true ;
|
|
MyStatus->sMessage = reply->element[2]->str ;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [CB] Funzione di CallBack per comando CONNECT Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
static void
|
|
RedisConnectCallBack( const redisAsyncContext* ctx, int nStatus)
|
|
{
|
|
// Verifico che il Contesto sia valido
|
|
if ( ctx == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Async Connect CallBack failed ") ;
|
|
return ;
|
|
}
|
|
|
|
// Recupero dal Contesto lo stato della connessione
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
|
|
// Verifico che lo stato internno al contesto Redis sia OK
|
|
if ( nStatus != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Redis Async connection failed") ;
|
|
MyStatus->bAlive.store( false, memory_order_release) ;
|
|
MyStatus->bConnected.store( false, memory_order_release) ;
|
|
MyStatus->bConnected.notify_one() ;
|
|
return ;
|
|
}
|
|
|
|
// Connessione riuscita
|
|
MyStatus->bAlive.store( true, memory_order_release) ;
|
|
MyStatus->bConnected.store( true, memory_order_release) ;
|
|
MyStatus->bConnected.notify_one() ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [CB] Funzione di CallBack per comando DISCONNECT Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
static void
|
|
RedisDisconnectCallBack( const redisAsyncContext* ctx, int nStatus)
|
|
{
|
|
// Verifico che il Contesto sia valido
|
|
if ( ctx == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Null Context for Disconnetion CallBack ") ;
|
|
return ;
|
|
}
|
|
|
|
// Recupero lo stato della connessione
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
|
|
// nStatus non è affidabile in questo contesto, può essere != REDIS_OK anche in disconnessione normale
|
|
// NB. nStatus è solo informativo, ignora tutte le disconnessioni reali
|
|
LOG_INFO( GetCmdLogger(), "Redis Async Disconnected ( x2, called for PUB and SUB)") ;
|
|
|
|
// Aggiorno gli stati ed eventuali thread in attesa
|
|
MyStatus->bAlive.store( false, memory_order_release) ;
|
|
MyStatus->bConnected.store( false, memory_order_release) ;
|
|
MyStatus->bConnected.notify_one() ;
|
|
MyStatus->bDisconnected.store( true, memory_order_release) ;
|
|
MyStatus->bDisconnected.notify_one() ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando CONNECT Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection)
|
|
{
|
|
// Inizializzo una nuova connessione, recuperando l'Id
|
|
int nMyIdConnection = NO_CONNECTION ;
|
|
for ( int i = 0 ; i < int( s_vAsyncRedisClients.size()) ; ++ i) {
|
|
if ( s_vAsyncRedisClients[i].m_bFreeConnection.load( memory_order_acquire)) {
|
|
nMyIdConnection = i ;
|
|
break ;
|
|
}
|
|
}
|
|
if ( nMyIdConnection == NO_CONNECTION) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : Limit connection Exceeded ( "} +
|
|
ToString( MAX_CONNECTION_NBR) + " )").c_str())
|
|
return false ;
|
|
}
|
|
// Recupero lo Slot
|
|
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
|
|
AsyncRedisClient.Clear() ; // per possibile riconnessione
|
|
AsyncRedisClient.m_bFreeConnection.store( false, memory_order_release) ;
|
|
|
|
// Recupero i riferimenti necessari e blocco la connessione corrente
|
|
RedisConnectionInfo& ConnectionInfo = AsyncRedisClient.m_ConnectionInfo ;
|
|
|
|
// Recupero i parametri dalla stringa di connessione
|
|
AsyncRedisClient.m_sConnectionString = sConnection ; // per riconnessione
|
|
if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) {
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
|
|
// Inizializzazione libreria Winsock, necessaria per usare le funzionalità di rete su Windows,
|
|
// come socket TCP/IP. Versione utilizzata 2.2
|
|
int nStatus = WSAStartup( MAKEWORD( 2, 2), &AsyncRedisClient.m_wsaData) ;
|
|
if ( nStatus != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : WSAStartup -> "} + ToString( nStatus)).c_str()) ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
|
|
// Verifico se si tratta di un nodo Sentinella
|
|
redisContext* sentinelCtx = redisConnect( AsyncRedisClient.m_ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
|
|
if ( sentinelCtx == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
else if ( sentinelCtx->err != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + sentinelCtx->errstr).c_str())
|
|
redisFree( sentinelCtx) ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
redisReply* replySentinel = ( redisReply*)redisCommand( sentinelCtx, "SENTINEL get-master-addr-by-name %s",
|
|
ConnectionInfo.sServiceName.c_str()) ;
|
|
if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) {
|
|
if ( replySentinel != nullptr)
|
|
freeReplyObject( replySentinel) ;
|
|
}
|
|
else if ( replySentinel->element[0] != nullptr && replySentinel->element[0]->str != nullptr &&
|
|
replySentinel->element[1] != nullptr && replySentinel->element[1]->str != nullptr) {
|
|
// --- Nodo sentinella
|
|
ConnectionInfo.sHost = replySentinel->element[0]->str ;
|
|
ConnectionInfo.nPort = DEFAULT_SENTINEL_MASTER_PORT ;
|
|
FromString( replySentinel->element[1]->str, ConnectionInfo.nPort) ;
|
|
freeReplyObject( replySentinel) ;
|
|
redisFree( sentinelCtx) ;
|
|
}
|
|
|
|
// Definisco le 2 connessioni asincroni con Host e Porta ( per PUBLISH e SUBSCRIBE/UNSUBSCRIBE)
|
|
AsyncRedisClient.m_pRedisAsyncPubContext = redisAsyncConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
|
|
AsyncRedisClient.m_pRedisAsyncSubContext = redisAsyncConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
|
|
if ( AsyncRedisClient.m_pRedisAsyncPubContext == nullptr || AsyncRedisClient.m_pRedisAsyncSubContext == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Can't allocate Redis Async context")
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
else if ( AsyncRedisClient.m_pRedisAsyncPubContext->err != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + AsyncRedisClient.m_pRedisAsyncPubContext->errstr).c_str())
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
else if ( AsyncRedisClient.m_pRedisAsyncSubContext->err != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + AsyncRedisClient.m_pRedisAsyncSubContext->errstr).c_str())
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
|
|
// Definizione e partenza del ciclo degli eventi in un thread separato ( in BackGround)
|
|
AsyncRedisClient.m_ConnectionPubStatus.Clear() ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.Clear() ;
|
|
AsyncRedisClient.m_bPubLoopRunning.store( true, memory_order_release) ;
|
|
AsyncRedisClient.m_bSubLoopRunning.store( true, memory_order_release) ;
|
|
AsyncRedisClient.m_PubThread = thread( &RedisAsync::RedisEventLoop, &AsyncRedisClient, true) ;
|
|
AsyncRedisClient.m_SubThread = thread( &RedisAsync::RedisEventLoop, &AsyncRedisClient, false) ;
|
|
|
|
// Imposto e Definisco la funzione di CallBack per contesto PUB/SUB per Connessione/Disconnessione asincrona
|
|
AsyncRedisClient.m_pRedisAsyncPubContext->data = &AsyncRedisClient.m_ConnectionPubStatus ;
|
|
AsyncRedisClient.m_pRedisAsyncSubContext->data = &AsyncRedisClient.m_ConnectionSubStatus ;
|
|
AsyncRedisClient.m_ConnectionPubStatus.bConnected.store( false, memory_order_release) ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.bConnected.store( false, memory_order_release) ;
|
|
redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisConnectCallBack) ;
|
|
redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisConnectCallBack) ;
|
|
redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDisconnectCallBack) ;
|
|
redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDisconnectCallBack) ;
|
|
// Attendo la risposta di CallBack
|
|
auto tStart = chrono::steady_clock::now() ;
|
|
auto tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bConnected.load( memory_order_acquire)) {
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
|
|
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
}
|
|
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bConnected.load( memory_order_acquire)) {
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
|
|
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
}
|
|
LOG_INFO( GetCmdLogger(), "Connected to Redis !")
|
|
#if DEBUG
|
|
double dMsConnectionTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsConnectionTime) + " ms").c_str())
|
|
#endif
|
|
|
|
// Se presente una Password, eseguo l'autenticazione
|
|
if ( ! AsyncRedisClient.m_sPassword.empty()) {
|
|
AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.store( false, memory_order_relaxed) ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.store( false, memory_order_relaxed) ;
|
|
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisAuthenticationCallBack,
|
|
nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ;
|
|
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisAuthenticationCallBack,
|
|
nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ;
|
|
tStart = chrono::steady_clock::now() ;
|
|
tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.load( memory_order_acquire)) {
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout Autentication exceeded") ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
|
|
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
}
|
|
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.load( memory_order_acquire)) {
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout Autentication exceeded") ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
|
|
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
}
|
|
LOG_INFO( GetCmdLogger(), "Authenticated to Redis !")
|
|
#if DEBUG
|
|
double dMsAuthenticationTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsAuthenticationTime) + " ms").c_str())
|
|
#endif
|
|
}
|
|
else {
|
|
AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated.store( true, memory_order_release) ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated.store( true, memory_order_release) ;
|
|
}
|
|
|
|
// Se Presente una Selezione del Database, eseguo la selezione
|
|
if ( AsyncRedisClient.m_nDataBase != DEFAULT_DATABASE) {
|
|
AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.store( false, memory_order_relaxed) ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.store( false, memory_order_relaxed) ;
|
|
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDbSElectionCallBack,
|
|
nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ;
|
|
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDbSElectionCallBack,
|
|
nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ;
|
|
tStart = chrono::steady_clock::now() ;
|
|
tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.load( memory_order_acquire)) {
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout DB Selection exceeded") ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
|
|
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
}
|
|
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.load( memory_order_acquire)) {
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout DB Selection exceeded") ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
|
|
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
}
|
|
LOG_INFO( GetCmdLogger(), "DB Selected to Redis !") ;
|
|
#if DEBUG
|
|
double dMsDbSelectionTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsDbSelectionTime) + " ms").c_str())
|
|
#endif
|
|
}
|
|
else {
|
|
AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB.store( true, memory_order_release) ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB.store( true, memory_order_release) ;
|
|
}
|
|
|
|
// Restituisco e salvo l'Id della connessione [1, MAX_CONNECTION_NBR] ( per Lua)
|
|
nIdConnection = nMyIdConnection + 1 ;
|
|
AsyncRedisClient.m_ConnectionPubStatus.nIdConnection = nIdConnection ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.nIdConnection = nIdConnection ;
|
|
return true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando DISCONNECT Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncDisconnect( int nIdConnection)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
|
|
// Recupero la connessione
|
|
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
|
|
|
|
// Verifico che la connessione sia utilizzata, e non libera
|
|
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection.load( memory_order_acquire)) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
|
|
// Reset dei Flag prima della connessione
|
|
AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.store( false, memory_order_relaxed) ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.store( false, memory_order_relaxed) ;
|
|
AsyncRedisClient.m_ConnectionPubStatus.bAlive.store( false, memory_order_relaxed) ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.bAlive.store( false, memory_order_relaxed) ;
|
|
|
|
// Effettuo la Disconnesione asincrona ( libera la memoria in automatico)
|
|
redisAsyncDisconnect( AsyncRedisClient.m_pRedisAsyncPubContext) ;
|
|
redisAsyncDisconnect( AsyncRedisClient.m_pRedisAsyncSubContext) ;
|
|
|
|
// Attendo la CallBack di disconnessione
|
|
auto tStart = chrono::steady_clock::now() ;
|
|
auto tTimeOut = chrono::milliseconds( static_cast<int>( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.load( memory_order_acquire)) {
|
|
AsyncRedisClient.m_ConnectionPubStatus.bDisconnected.wait( false) ;
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
|
|
return false ;
|
|
}
|
|
}
|
|
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.load( memory_order_acquire)) {
|
|
AsyncRedisClient.m_ConnectionSubStatus.bDisconnected.wait( false) ;
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
|
|
return false ;
|
|
}
|
|
}
|
|
|
|
// Rendo libera la connessione
|
|
AsyncRedisClient.m_bFreeConnection.store( true) ;
|
|
AsyncRedisClient.Clear() ;
|
|
|
|
// Elimino i Messaggi dalla mappa
|
|
for ( auto Iter = s_vAsyncRedisMessages.begin() ; Iter != s_vAsyncRedisMessages.end() ; ) {
|
|
if ( Iter->first.starts_with( GetMessageMapKey( nIdConnection, "")))
|
|
Iter = s_vAsyncRedisMessages.erase( Iter) ;
|
|
else
|
|
++ Iter ;
|
|
}
|
|
|
|
LOG_INFO( GetCmdLogger(), "Disconnected to Redis !") ;
|
|
#if DEBUG
|
|
double dMsDisconnectionTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsDisconnectionTime) + " ms").c_str())
|
|
#endif
|
|
return true ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Funzione per comando PUBLISH Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncPublish( int nIdConnection, const string& sChannel, const string& sMessage)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
|
|
// Verifico che la connessione sia attiva
|
|
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
|
|
if ( AsyncRedisClient.m_bFreeConnection.load( memory_order_acquire)) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
// Se non ho alcun canale -> errore
|
|
if ( sChannel.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
|
|
return false ;
|
|
}
|
|
// Se non ho alcun messaggio -> warning
|
|
if ( sMessage.empty())
|
|
LOG_INFO( GetCmdLogger(), "Warning : Empty Message") ;
|
|
|
|
// Numero massimo di Tentativi per effettuare PUBLISH
|
|
const int MAX_RETRY = 3 ;
|
|
for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) {
|
|
|
|
// Rileggo il Client, dato che potrebbe essere cambiato dopo la riconnessione
|
|
RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ;
|
|
RedisConnectionStatus& curStatus = currClient.m_ConnectionPubStatus ;
|
|
|
|
// Se la connessione non è attiva, provo a riconnetermi
|
|
if ( ! currClient.m_ConnectionPubStatus.bAlive.load( memory_order_acquire)) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Publish: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ;
|
|
|
|
// Recupero la stringa di connessione corrente
|
|
string sConnStr = s_vAsyncRedisClients[nMyIdConnection].m_sConnectionString ;
|
|
// Recupero i Canali su cui ero iscritto
|
|
unordered_set<string> setSubChannels = currClient.m_set_SubChannels ;
|
|
|
|
// Pulisco il contesto corrente
|
|
AsyncRedisClient.Clear() ;
|
|
|
|
int nNewId = 0 ;
|
|
bool bOkReconnect = false ;
|
|
if ( ExeRedisAsyncConnect( sConnStr, nNewId)) {
|
|
// Se ho scelto lo stesso slot, aggiorno nMyIdConnection
|
|
int nNewIndex = nNewId - 1 ;
|
|
if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) {
|
|
// Sostituisco il riferimento Client con il nuovo
|
|
if ( nNewIndex != nMyIdConnection)
|
|
nMyIdConnection = nNewIndex ;
|
|
}
|
|
bOkReconnect = true ;
|
|
// Mi iscrivo a tutti i canali a cui ero precedentemente iscritto
|
|
for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) {
|
|
if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter))
|
|
return false ;
|
|
}
|
|
}
|
|
else {
|
|
LOG_INFO( GetCmdLogger(), "Publish: reconnect attempt failed") ;
|
|
// riprovo al ciclo successivo...
|
|
}
|
|
// Se non mi sono riconesso metto un piccolo Delay per estrema sicureza
|
|
if ( ! bOkReconnect) {
|
|
this_thread::sleep_for( chrono::milliseconds( 100)) ;
|
|
continue ;
|
|
}
|
|
}
|
|
|
|
// Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione)
|
|
RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ;
|
|
RedisConnectionStatus& nextStatus = nextClient.m_ConnectionPubStatus ;
|
|
|
|
// Ricalcolo timeout dal client aggiornato
|
|
auto tTimeOut = chrono::milliseconds( static_cast<int>( nextClient.m_ConnectionInfo.nAsyncTimeout)) ;
|
|
|
|
// Reset dello stato di pubblicazione
|
|
nextStatus.bPublished.store( false, memory_order_relaxed) ;
|
|
|
|
// Invio del comando PUBLISH
|
|
int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncPubContext, RedisPubSubUnsubMsgCallback, nullptr,
|
|
"PUBLISH %s %s", sChannel.c_str(), sMessage.c_str()) ;
|
|
if ( nResult != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), "Publish: redisAsyncCommand failed") ;
|
|
nextStatus.bAlive.store( false, memory_order_release) ;
|
|
this_thread::sleep_for( chrono::milliseconds( 100)) ;
|
|
continue ;
|
|
}
|
|
auto tStart = chrono::steady_clock::now() ;
|
|
while ( ! nextStatus.bPublished.load( memory_order_acquire)) {
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Publish: timeout waiting for publish reply") ;
|
|
break ;
|
|
}
|
|
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
|
|
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
}
|
|
if ( nextStatus.bPublished.load( memory_order_acquire)) {
|
|
#if DEBUG
|
|
double dMsPublishTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsPublishTime) + " ms").c_str()) ;
|
|
#endif
|
|
return true ;
|
|
}
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Publish failed, attempt "} + ToString( nAttempt)).c_str()) ;
|
|
|
|
// Breve pausa prima del Retry
|
|
this_thread::sleep_for( chrono::milliseconds( 100)) ;
|
|
}
|
|
|
|
// Tutti i tentativi sono falliti
|
|
return false ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando SUBSCRIBE Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncSubscribe( int nIdConnection, const string& sChannel)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
|
|
if ( AsyncRedisClient.m_bFreeConnection.load( memory_order_acquire)) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
// Se non ho alcun canale, errore
|
|
if ( sChannel.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") ;
|
|
return false ;
|
|
}
|
|
|
|
// Numero massimo di Tentativi per effettuare SUBSCRIBE
|
|
const int MAX_RETRY = 3 ;
|
|
for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) {
|
|
|
|
// Rileggo il client e lo stato (potrebbero cambiare dopo reconnect)
|
|
RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ;
|
|
RedisConnectionStatus& currStatus = currClient.m_ConnectionSubStatus ;
|
|
|
|
// Se la connessione non è viva, provo a riconnettere
|
|
if ( ! currStatus.bAlive.load( memory_order_acquire)) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Subscribe: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ;
|
|
|
|
// Recupero la stringa di connessione corrente
|
|
string sConnStr = currClient.m_sConnectionString ;
|
|
// Recupero i Canali su cui ero iscritto
|
|
unordered_set<string> setSubChannels = currClient.m_set_SubChannels ;
|
|
|
|
// Pulisco il contesto corrente
|
|
AsyncRedisClient.Clear() ;
|
|
|
|
int nNewId = 0 ;
|
|
bool bOkReconnect = false ;
|
|
if ( ExeRedisAsyncConnect( sConnStr, nNewId)) {
|
|
// Se ho scelto lo stesso slot, aggiorno nMyIdConnection
|
|
int nNewIndex = nNewId - 1 ;
|
|
if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) {
|
|
// Sostituisco il riferimento Client con il nuovo
|
|
if ( nNewIndex != nMyIdConnection)
|
|
nMyIdConnection = nNewIndex ;
|
|
}
|
|
bOkReconnect = true ;
|
|
// Mi iscrivo a tutti i canali a cui ero precedentemente iscritto
|
|
for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) {
|
|
if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter))
|
|
return false ;
|
|
}
|
|
}
|
|
else {
|
|
LOG_INFO( GetCmdLogger(), "Publish: reconnect attempt failed") ;
|
|
// riprovo al ciclo successivo...
|
|
}
|
|
// Se non mi sono riconesso metto un piccolo Delay per estrema sicureza
|
|
if ( ! bOkReconnect) {
|
|
this_thread::sleep_for( chrono::milliseconds( 100)) ;
|
|
continue ;
|
|
}
|
|
}
|
|
|
|
// Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione)
|
|
RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ;
|
|
RedisConnectionStatus& nextStatus = nextClient.m_ConnectionSubStatus ;
|
|
|
|
// Ricalcolo timeout dal client aggiornato
|
|
auto tTimeOut = chrono::milliseconds( static_cast<int>( nextClient.m_ConnectionInfo.nAsyncTimeout)) ;
|
|
|
|
// Reset dello stato di pubblicazione
|
|
nextStatus.bSubscribed.store( false, memory_order_relaxed) ;
|
|
|
|
int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr,
|
|
"SUBSCRIBE %s", sChannel.c_str()) ;
|
|
if ( nResult != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), "Subscribe: redisAsyncCommand failed") ;
|
|
nextStatus.bAlive.store( false, memory_order_release) ;
|
|
this_thread::sleep_for( chrono::milliseconds( 100)) ;
|
|
continue ;
|
|
}
|
|
auto tStart = chrono::steady_clock::now() ;
|
|
while ( ! nextStatus.bSubscribed.load( memory_order_acquire)) {
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Publish: timeout waiting for publish reply") ;
|
|
break ;
|
|
}
|
|
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
|
|
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
}
|
|
|
|
if ( nextStatus.bSubscribed.load( memory_order_acquire)) {
|
|
// Uso l'ID corrente ( aggiornato) per la chiave messaggi
|
|
int nEffectiveId = nMyIdConnection + 1 ;
|
|
string sKey = GetMessageMapKey( nEffectiveId, sChannel) ;
|
|
s_vAsyncRedisMessages.try_emplace( sKey) ;
|
|
s_vAsyncRedisMessages[sKey].Lock.clear() ;
|
|
#if DEBUG
|
|
double dMsPublishTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsPublishTime) + " ms").c_str()) ;
|
|
#endif
|
|
// Memorizzo il canale nella set dei canali
|
|
nextClient.m_set_SubChannels.insert( sChannel) ;
|
|
return true ;
|
|
}
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Subscribe failed, attempt "} + ToString( nAttempt)).c_str()) ;
|
|
|
|
// Breve pausa prima del Retry
|
|
this_thread::sleep_for( chrono::milliseconds( 100)) ;
|
|
}
|
|
|
|
// Tutti i tentativi sono falliti
|
|
return false ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Funzione per comando UNSUBSCRIBE Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncUnsubscribe( int nIdConnection, const string& sChannel)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
|
|
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection.load( memory_order_acquire)) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
// Se non ho alcun canale, errore
|
|
if ( sChannel.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
|
|
return false ;
|
|
}
|
|
|
|
const int MAX_RETRY = 3 ;
|
|
for ( int nAttempt = 0 ; nAttempt < MAX_RETRY ; ++ nAttempt) {
|
|
|
|
// Rileggo il client e lo stato ( potrebbero cambiare dopo reconnect)
|
|
RedisAsync& currClient = s_vAsyncRedisClients[nMyIdConnection] ;
|
|
RedisConnectionStatus& currStatus = currClient.m_ConnectionSubStatus ;
|
|
|
|
// Se la connessione non è viva, provo a riconnettere
|
|
if ( ! currStatus.bAlive.load( memory_order_acquire)) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Unubscribe: connection not alive, reconnect attempt "} + ToString( nAttempt)).c_str()) ;
|
|
|
|
// Recupero la stringa di connessione corrente
|
|
string sConnStr = currClient.m_sConnectionString ;
|
|
// Recupero i Canali su cui ero iscritto
|
|
unordered_set<string> setSubChannels = currClient.m_set_SubChannels ;
|
|
|
|
// Pulisco il contesto corrente
|
|
AsyncRedisClient.Clear() ;
|
|
|
|
int nNewId = 0 ;
|
|
bool bOkReconnect = false ;
|
|
if ( ExeRedisAsyncConnect( sConnStr, nNewId)) {
|
|
// Se ho scelto lo stesso slot, aggiorno nMyIdConnection
|
|
int nNewIndex = nNewId - 1 ;
|
|
if ( nNewIndex >= 0 && nNewIndex < ssize( s_vAsyncRedisClients)) {
|
|
// Sostituisco il riferimento Client con il nuovo
|
|
if ( nNewIndex != nMyIdConnection)
|
|
nMyIdConnection = nNewIndex ;
|
|
}
|
|
bOkReconnect = true ;
|
|
// Mi iscrivo a tutti i canali a cui ero precedentemente iscritto
|
|
for ( auto Iter = setSubChannels.begin() ; Iter != setSubChannels.end() ; ++ Iter) {
|
|
if ( ! ExeRedisAsyncSubscribe( nMyIdConnection + 1, *Iter))
|
|
return false ;
|
|
}
|
|
}
|
|
else {
|
|
LOG_INFO( GetCmdLogger(), "Unsubscribe: reconnect attempt failed") ;
|
|
// riprovo al ciclo successivo...
|
|
}
|
|
// Se non mi sono riconesso metto un piccolo Delay per estrema sicureza
|
|
if ( ! bOkReconnect) {
|
|
this_thread::sleep_for( chrono::milliseconds( 100)) ;
|
|
continue ;
|
|
}
|
|
}
|
|
|
|
// Rileggo il Clinet e lo Stato ( in caso di eventuale Riconnessione)
|
|
RedisAsync& nextClient = s_vAsyncRedisClients[nMyIdConnection] ;
|
|
RedisConnectionStatus& nextStatus = nextClient.m_ConnectionSubStatus ;
|
|
|
|
// Ricalcolo timeout dal client aggiornato
|
|
auto tTimeOut = chrono::milliseconds( static_cast<int>( nextClient.m_ConnectionInfo.nAsyncTimeout)) ;
|
|
|
|
// Reset dello stato di pubblicazione
|
|
nextStatus.bUnsubscribed.store( false, memory_order_relaxed) ;
|
|
|
|
int nResult = redisAsyncCommand( nextClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr,
|
|
"UNSUBSCRIBE %s", sChannel.c_str()) ;
|
|
if ( nResult != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand UNSUBSCRIBE failed") ;
|
|
nextStatus.bAlive.store( false, memory_order_release) ;
|
|
this_thread::sleep_for( chrono::milliseconds( 100)) ;
|
|
continue ;
|
|
}
|
|
auto tStart = chrono::steady_clock::now() ;
|
|
while ( ! nextStatus.bUnsubscribed.load( memory_order_acquire)) {
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Unsubscribe: timeout waiting for publish reply") ;
|
|
break ;
|
|
}
|
|
// NB. non usare la wait( false) !!! se in rete perdo qualche informazione
|
|
// rimango bloccato all'infinito, meglio usare un timer, meno efficiente ma sicuro
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
}
|
|
|
|
if ( nextStatus.bUnsubscribed.load( memory_order_acquire)) {
|
|
// Rimuovo la chiave dalla mappa messaggi usando l'ID effettivo (aggiornato)
|
|
int nEffectiveId = nMyIdConnection + 1 ;
|
|
string keyToRemove = GetMessageMapKey( nEffectiveId, sChannel) ;
|
|
for ( auto it = s_vAsyncRedisMessages.begin() ; it != s_vAsyncRedisMessages.end() ; ) {
|
|
if ( it->first == keyToRemove)
|
|
it = s_vAsyncRedisMessages.erase( it) ;
|
|
else
|
|
++ it ;
|
|
}
|
|
#if DEBUG
|
|
double dMsUnsubscribeTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{" + "} + ToString( dMsUnsubscribeTime) + " ms").c_str()) ;
|
|
#endif
|
|
// Tolgo il canale dalla lista
|
|
nextClient.m_set_SubChannels.erase( sChannel) ;
|
|
return true ;
|
|
}
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Unsubscribe failed, attempt "} + ToString( nAttempt)).c_str()) ;
|
|
|
|
// Breve pausa prima del Retry
|
|
this_thread::sleep_for( chrono::milliseconds( 100)) ;
|
|
}
|
|
|
|
// Tutti i tentativi sono falliti
|
|
return false ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// La funzione esegue una Subscribe e una Unsubscribe
|
|
// Se entro il tempo stabilito riceve un messaggio, lo comunica, altrimenti errore
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncSubscribeOneMessage( int nIdConnection, const string& sChannel, double dMaxTimeOut,
|
|
string& sMessage)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
// Se non ho alcun canale, errore
|
|
if ( sChannel.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
|
|
return false ;
|
|
}
|
|
// Controllo validità tempo di TimeOut
|
|
if ( dMaxTimeOut <= 0.) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Invalid timeout value") ;
|
|
return false ;
|
|
}
|
|
|
|
// Recupero la connessione
|
|
RedisAsync& AsyncRedisClient = s_vAsyncRedisClients[nMyIdConnection] ;
|
|
|
|
// Invio del comando di SUBSCRIBE
|
|
AsyncRedisClient.m_ConnectionSubStatus.bMessage = false ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.sMessage = "" ;
|
|
if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, WaitMessageCallback, nullptr,
|
|
( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to subscribe to"} + sChannel).c_str())
|
|
return false ;
|
|
}
|
|
|
|
// Attivo il TimeOut per l'attesa del messaggio
|
|
auto tStart = chrono::steady_clock::now() ;
|
|
auto tTimeOut = chrono::milliseconds( static_cast<int>( dMaxTimeOut)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bMessage) {
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Timeout : No Message received on ["} + sChannel + "]").c_str())
|
|
return false ;
|
|
}
|
|
}
|
|
|
|
// Invio del comando UNSUBSCRIBE
|
|
if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, WaitMessageCallback, nullptr,
|
|
( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to Unsubscribe to "} + sChannel).c_str())
|
|
return false ;
|
|
}
|
|
|
|
// Controllo se ho ricevuto un messaggio
|
|
if ( AsyncRedisClient.m_ConnectionSubStatus.bMessage) {
|
|
sMessage = AsyncRedisClient.m_ConnectionSubStatus.sMessage ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Message received on ["} + sChannel + "]: " + sMessage).c_str())
|
|
return true ;
|
|
}
|
|
|
|
#if DEBUG
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Timeout: No message received on ["} + sChannel + "]").c_str()) ;
|
|
#endif
|
|
return false ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Funzione per lettura Asicnrona dell'ultimo messaggio su un canale
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncGetMessage( int nIdConnection, const string& sChannel, int& nCount, string& sMessage)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( s_vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
// Se non ho alcun canale, errore
|
|
if ( sChannel.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
|
|
return false ;
|
|
}
|
|
|
|
// Recupero la Chiave di accesso alla Mappa dei Messaggi
|
|
string sKey = GetMessageMapKey( nIdConnection, sChannel) ;
|
|
auto Iter = s_vAsyncRedisMessages.find( sKey) ;
|
|
if ( Iter == s_vAsyncRedisMessages.end()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Connection not Subscribed to this Channel") ;
|
|
return false ;
|
|
}
|
|
else {
|
|
RedisAsyncMessage& LockedMessage = Iter->second ;
|
|
while ( LockedMessage.Lock.test_and_set( memory_order_acquire))
|
|
LockedMessage.Lock.wait( true, memory_order_relaxed) ;
|
|
nCount = LockedMessage.nCount ;
|
|
sMessage = LockedMessage.sMessage ;
|
|
LockedMessage.Lock.clear( memory_order_release) ;
|
|
LockedMessage.Lock.notify_one() ;
|
|
}
|
|
|
|
return true ;
|
|
}
|