8fcf079126
- in Redis aggiunta gestione di connessioni multiple.
1272 lines
53 KiB
C++
1272 lines
53 KiB
C++
//----------------------------------------------------------------------------
|
|
// EgalTech 2025-2025
|
|
//----------------------------------------------------------------------------
|
|
// File : EXE_Redis.cpp Data : 17.09.25 Versione : 2.7i3
|
|
// Contenuto : Funzioni per interfacciarsi con server Redis.
|
|
//
|
|
//
|
|
//
|
|
// Modifiche : 17.09.25 RE Creazione modulo. ( ver. 2.7i3)
|
|
// Modifiche : 31.10.25 RE Aggiunta gestione connessioni multiple
|
|
//
|
|
//
|
|
//----------------------------------------------------------------------------
|
|
|
|
//--------------------------- Include ----------------------------------------
|
|
#include <winsock2.h>
|
|
#include <WS2tcpip.h>
|
|
#include "stdafx.h"
|
|
#include "EXE.h"
|
|
#include "EXE_Macro.h"
|
|
#include "AuxTools.h"
|
|
#include "/EgtDev/Include/EXeExecutor.h"
|
|
#include "/EgtDev/Include/EGkStringUtils3d.h"
|
|
#include "/EgtDev/Include/EGnStringKeyVal.h"
|
|
#pragma warning( disable: 4244) // conversione da 'uint64_t' a 'size_t'
|
|
#pragma warning( disable: 4200) // utilizzata estensione non standard: matrice di dimensioni zero in struct/union
|
|
#include "/EgtDev/Extern/hiredis/Include/async.h"
|
|
#include <thread>
|
|
#include <atomic>
|
|
#include <map>
|
|
#include <sstream>
|
|
#include <future>
|
|
|
|
using namespace std ;
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Ambiente Debug
|
|
// ---------------------------------------------------------------------------
|
|
#define DEBUG 0
|
|
|
|
// ---------------------------------------------------------------------------
// General Redis constants
// ---------------------------------------------------------------------------
// Range of database indexes accepted by this module
static constexpr int REDIS_MIN_DB = 0 ;
static constexpr int REDIS_MAX_DB = 15 ;

// Defaults used when the connection string does not provide a value
static constexpr int DEFAULT_PORT = 6379 ;
static constexpr int DEFAULT_SENTINEL_MASTER_PORT = 26379 ;
static const string DEFAULT_USER = "default" ;
static constexpr int DEFAULT_DATABASE = 0 ;
static constexpr int DEFAULT_TIMEOUT = 15000 ;      // [ms]
static constexpr int DEFAULT_ALIVE_TIME = 180 ;     // [s]

// Size of the connection tables and marker for "no slot found"
static constexpr int MAX_CONNECTION_NBR = 10 ;
static constexpr int NO_CONNECTION = -1 ;

// Key type names
static const string KEY_TYPE_STRING = "string" ;
static const string KEY_TYPE_LIST = "list" ;        // unused
static const string KEY_TYPE_SET = "set" ;          // unused
static const string KEY_TYPE_HASH = "hash" ;        // unused
static const string KEY_TYPE_ZSET = "zset" ;        // unused
static const string KEY_TYPE_JSON = "ReJSON-RL" ;   // unused, available from server version 8 ( version 6 is in use)
|
|
|
|
// Struttura per parametri della stringa di Connessione
|
|
struct RedisConnectionInfo {
|
|
|
|
string sHost, sServiceName, sUser, sPassword ;
|
|
int nPort, nDefaultDataBase, nKeepAlive, nConnectTimeout, nSyncTimeout, nAsyncTimeout ;
|
|
bool bAbortConnect, bSsl, bAllowAdmin ;
|
|
|
|
RedisConnectionInfo() {
|
|
sHost = "" ; sServiceName = "" ; sUser = DEFAULT_USER ; sPassword = "" ;
|
|
nPort = DEFAULT_PORT ; nDefaultDataBase = DEFAULT_DATABASE ; nKeepAlive = DEFAULT_ALIVE_TIME ;
|
|
nConnectTimeout = DEFAULT_TIMEOUT ; nSyncTimeout = DEFAULT_TIMEOUT ; nAsyncTimeout = DEFAULT_TIMEOUT ;
|
|
bAbortConnect = false ; bSsl = false ; bAllowAdmin = false ;
|
|
} ;
|
|
|
|
RedisConnectionInfo( string sMyHost, string sMyServiceName, string sMyUser, string sMyPassword, int nMyPort,
|
|
int nMyDefaultDataBase, int dMyKeepAlive, int dMyConnectTimeout, int dMySyncTimeout,
|
|
int dMyAsyncTimeout, bool bMyAbortConnect, bool bMySsl, bool bMyAllowAdmin)
|
|
: sHost( sMyHost), sServiceName( sMyServiceName), sUser( sMyUser), sPassword( sMyPassword), nPort( nMyPort),
|
|
nDefaultDataBase( nMyDefaultDataBase), nKeepAlive( dMyKeepAlive), nConnectTimeout( dMyConnectTimeout),
|
|
nSyncTimeout( dMySyncTimeout), nAsyncTimeout( dMyAsyncTimeout), bAbortConnect( bMyAbortConnect),
|
|
bSsl( bMySsl), bAllowAdmin( bMyAllowAdmin) {}
|
|
|
|
inline void Clear() {
|
|
sHost = "" ; sServiceName = "" ; sUser = DEFAULT_USER ; sPassword = "" ;
|
|
nPort = DEFAULT_PORT ; nDefaultDataBase = DEFAULT_DATABASE ; nKeepAlive = DEFAULT_ALIVE_TIME ;
|
|
nConnectTimeout = DEFAULT_TIMEOUT ; nSyncTimeout = DEFAULT_TIMEOUT ; nAsyncTimeout = DEFAULT_TIMEOUT ;
|
|
bAbortConnect = false ; bSsl = false ; bAllowAdmin = false ;
|
|
}
|
|
|
|
} ;
|
|
|
|
// Struttura per Connessiona Sincrona
|
|
struct RedisSync {
|
|
|
|
atomic<bool> bFreeConnection ;
|
|
redisContext* pRedisContext ;
|
|
RedisConnectionInfo ConnectionInfo ;
|
|
|
|
RedisSync()
|
|
: bFreeConnection( true), pRedisContext( nullptr), ConnectionInfo() {}
|
|
|
|
inline void Clear() {
|
|
bFreeConnection = true ;
|
|
redisFree( pRedisContext) ;
|
|
pRedisContext = nullptr ;
|
|
ConnectionInfo.Clear() ;
|
|
}
|
|
|
|
} ;
|
|
static array<RedisSync, MAX_CONNECTION_NBR> vRedisClients ;
|
|
|
|
// Status flags of an asynchronous connection.
// The flags are raised by the asynchronous callbacks; sMessage carries the
// text associated with bMessage.
struct RedisConnectionStatus {

    atomic<bool> bConnected{ false} ;
    atomic<bool> bAuthenticated{ false} ;
    atomic<bool> bSelectedDB{ false} ;
    atomic<bool> bDisconnected{ false} ;
    atomic<bool> bPublished{ false} ;
    atomic<bool> bSubscribed{ false} ;
    atomic<bool> bUnsubscribed{ false} ;
    atomic<bool> bMessage{ false} ;
    string sMessage ;   // meaningful only while bMessage is set

    // Every flag lowered, empty message
    RedisConnectionStatus() {}

    // Lowers every flag and empties the message
    inline void Clear()
    {
        atomic<bool>* const vpFlags[] = { &bConnected, &bAuthenticated, &bSelectedDB, &bDisconnected,
                                          &bPublished, &bSubscribed, &bUnsubscribed, &bMessage} ;
        for ( atomic<bool>* pFlag : vpFlags)
            *pFlag = false ;
        sMessage = "" ;
    }

} ;
|
|
|
|
// Classe per Connessione Asincrona
|
|
class RedisAsync {
|
|
|
|
public :
|
|
atomic<bool> m_bFreeConnection ;
|
|
redisAsyncContext* m_pRedisAsyncPubContext ;
|
|
redisAsyncContext* m_pRedisAsyncSubContext ;
|
|
int m_nDataBase ;
|
|
string m_sPassword ;
|
|
string m_sUser ;
|
|
atomic<bool> m_bPubLoopRunning ;
|
|
atomic<bool> m_bSubLoopRunning ;
|
|
RedisConnectionInfo m_ConnectionInfo ;
|
|
RedisConnectionStatus m_ConnectionSubStatus ;
|
|
RedisConnectionStatus m_ConnectionPubStatus ;
|
|
WSADATA m_wsaData ;
|
|
|
|
RedisAsync()
|
|
: m_bFreeConnection( true), m_pRedisAsyncPubContext( nullptr), m_pRedisAsyncSubContext( nullptr),
|
|
m_nDataBase( REDIS_MIN_DB), m_sPassword( ""), m_sUser( DEFAULT_USER),
|
|
m_bPubLoopRunning( false), m_bSubLoopRunning( false),
|
|
m_ConnectionInfo(), m_ConnectionSubStatus(), m_ConnectionPubStatus() {} ;
|
|
|
|
inline void Clear() {
|
|
m_bFreeConnection = true ;
|
|
redisAsyncFree( m_pRedisAsyncPubContext) ;
|
|
m_pRedisAsyncPubContext = nullptr ;
|
|
redisAsyncFree( m_pRedisAsyncSubContext) ;
|
|
m_pRedisAsyncSubContext = nullptr ;
|
|
m_nDataBase = REDIS_MIN_DB ;
|
|
m_sPassword = "" ;
|
|
m_sUser = DEFAULT_USER ;
|
|
m_bPubLoopRunning = false ;
|
|
m_bSubLoopRunning = false ;
|
|
m_ConnectionInfo.Clear() ;
|
|
m_ConnectionSubStatus.Clear() ;
|
|
m_ConnectionPubStatus.Clear() ;
|
|
}
|
|
|
|
public :
|
|
void RedisEventLoop( bool bIsSub) const ;
|
|
|
|
} ;
|
|
static array<RedisAsync, MAX_CONNECTION_NBR> vAsyncRedisClients ;
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [Redis] Funzione Ciclo degli eventi per Read/Write su Socket
|
|
// ---------------------------------------------------------------------------
|
|
void
|
|
RedisAsync::RedisEventLoop( bool bIsPub) const
|
|
{
|
|
// Verifica della validità del contesto
|
|
redisAsyncContext* ctx = nullptr ;
|
|
if ( bIsPub)
|
|
ctx = m_pRedisAsyncPubContext ;
|
|
else
|
|
ctx = m_pRedisAsyncSubContext ;
|
|
if ( ctx == nullptr || ctx->err != 0)
|
|
return ; // se attivazione loop prima della inizializzazione callBack
|
|
|
|
// Recupero del file descriptor del socket TCP usato da Redis.
|
|
SOCKET sock = ctx->c.fd ;
|
|
// Imposto condizione di ascolto degli eventi
|
|
bool bLoopRunning = bIsPub ? m_bPubLoopRunning : m_bSubLoopRunning ;
|
|
// Creazione dei due insiemi per il file descriptor ( lettura e scrittura )
|
|
// r = read, w = write
|
|
fd_set rfds, wfds ;
|
|
// Timeout per funzione select { secondi, millisecondi}
|
|
// La funzione select attende che il socket sia pronto per lettura e scrittura
|
|
// Resituisce :
|
|
// SOCKET_ERROR in caso di errore
|
|
// 0 se scade il timeout
|
|
// > 0 se ci sono eventi da gestire
|
|
timeval timeout = {0, 100000 } ; // 100ms
|
|
|
|
// Ciclo
|
|
while ( bLoopRunning) {
|
|
// Se contesto non valido, interrompo il ciclo
|
|
if ( ctx == nullptr || ( ctx->c.flags & REDIS_DISCONNECTING))
|
|
break ;
|
|
// Recupero file descriptor di lettura e scrittura
|
|
FD_ZERO( &rfds) ;
|
|
FD_ZERO( &wfds) ;
|
|
FD_SET( sock, &rfds) ;
|
|
FD_SET( sock, &wfds) ;
|
|
// Controllo il risultato ottenuto
|
|
int nRet = select( 0, &rfds, &wfds, nullptr, &timeout) ;
|
|
if ( nRet == SOCKET_ERROR) {
|
|
// Se errore -> interruzione del ciclo
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : Socket select() failed -> "} +
|
|
ToString( WSAGetLastError())).c_str()) ;
|
|
break ;
|
|
}
|
|
else if ( nRet > 0) {
|
|
// Se Socket pronto per lettura/ scrittua
|
|
if ( ctx != nullptr && ! ( ctx->c.flags & REDIS_DISCONNECTING) && ctx->c.fd != -1) {
|
|
if ( FD_ISSET( sock, &rfds))
|
|
redisAsyncHandleRead( ctx) ;
|
|
if ( FD_ISSET( sock, &wfds))
|
|
redisAsyncHandleWrite( ctx) ;
|
|
}
|
|
// Evito saturazione CPU
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
}
|
|
|
|
// Reset del timeOut ad ogni ciclo
|
|
timeout.tv_sec = 0 ;
|
|
timeout.tv_usec = 100000 ; // 100ms
|
|
}
|
|
|
|
return ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [Utility] Funzione di Split parametri per Stringa di Connessione
|
|
// ---------------------------------------------------------------------------
|
|
static bool
|
|
GetParamsFromConnectionString( const string& sConnection, RedisConnectionInfo& ConnectionInfo)
|
|
{
|
|
// Se la stringa di connessione è vuota, errore
|
|
if ( sConnection.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Empty Connection String") ;
|
|
return false ;
|
|
}
|
|
|
|
// Ricerco le informazioni Splittando la stringa di connessione per ","
|
|
string sHostPort, sParams ;
|
|
SplitFirst( sConnection, ",", sHostPort, sParams) ;
|
|
if ( ! sHostPort.empty()) {
|
|
// Host e Porta
|
|
string sHost, sPort ;
|
|
SplitFirst( sHostPort, ":", sHost, sPort) ;
|
|
if ( sHost.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Invalid Host") ;
|
|
return false ;
|
|
}
|
|
int nPort = DEFAULT_PORT ;
|
|
if ( ! sPort.empty()) {
|
|
if ( ! FromString( sPort, nPort)) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Invalid Port") ;
|
|
return false ;
|
|
}
|
|
}
|
|
ConnectionInfo.sHost = sHost ;
|
|
ConnectionInfo.nPort = nPort ;
|
|
}
|
|
|
|
// Recupero i parametri ( rimuovo tutti gli spazi )
|
|
sParams.erase( remove( sParams.begin(), sParams.end(), ' '), sParams.end()) ;
|
|
GetValInNotes( sParams, "serviceName", ",", ConnectionInfo.sServiceName) ;
|
|
GetValInNotes( sParams, "user", ",", ConnectionInfo.sUser) ;
|
|
GetValInNotes( sParams, "password", ",", ConnectionInfo.sPassword) ;
|
|
GetValInNotes( sParams, "DefaultDatabase", ",", ConnectionInfo.nDefaultDataBase) ;
|
|
if ( ConnectionInfo.nDefaultDataBase < 0 || ConnectionInfo.nDefaultDataBase > REDIS_MAX_DB) {
|
|
ConnectionInfo.Clear() ;
|
|
LOG_INFO( GetCmdLogger(), "Error : DB Out of range [0, 15]")
|
|
return false ;
|
|
}
|
|
GetValInNotes( sParams, "keepAlive", ",", ConnectionInfo.nKeepAlive) ;
|
|
GetValInNotes( sParams, "connectTimeout", ",", ConnectionInfo.nConnectTimeout) ;
|
|
GetValInNotes( sParams, "syncTimeout", ",", ConnectionInfo.nSyncTimeout) ;
|
|
GetValInNotes( sParams, "asyncTimeout", ",", ConnectionInfo.nAsyncTimeout) ;
|
|
GetValInNotes( sParams, "abortConnect", ",", ConnectionInfo.bAbortConnect) ;
|
|
GetValInNotes( sParams, "ssl", "," ,ConnectionInfo.bSsl) ;
|
|
GetValInNotes( sParams, "allowAdmin", ",", ConnectionInfo.bAllowAdmin) ;
|
|
|
|
return true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// [Utility] Funzione di Controllo per Id Connessione
|
|
// ---------------------------------------------------------------------------
|
|
static bool
|
|
CheckIdConnection( int nIdConnection)
|
|
{
|
|
if ( nIdConnection < 0 || nIdConnection >= MAX_CONNECTION_NBR) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : Not a Valid id Connestion : [1, "} + ToString( MAX_CONNECTION_NBR) + "]").c_str())
|
|
return false ;
|
|
}
|
|
return true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando CONNECT Sincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisConnect( const string& sConnection, int& nIdConnection)
|
|
{
|
|
// Inizializzo una nuova connessione, recuperando l'Id
|
|
int nMyIdConnection = NO_CONNECTION ;
|
|
for ( int i = 0 ; i < int( vRedisClients.size()) ; ++ i) {
|
|
if ( vRedisClients[i].bFreeConnection) {
|
|
nMyIdConnection = i ;
|
|
break ;
|
|
}
|
|
}
|
|
if ( nMyIdConnection == NO_CONNECTION) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : Limit connection Exceeded ( "} +
|
|
ToString( MAX_CONNECTION_NBR) + " )").c_str())
|
|
return false ;
|
|
}
|
|
|
|
// Recupero i riferimenti necessari e blocco la connessione corrente
|
|
RedisSync& SyncRedisClient = vRedisClients[nMyIdConnection] ;
|
|
SyncRedisClient.bFreeConnection = false ;
|
|
RedisConnectionInfo& ConnectionInfo = SyncRedisClient.ConnectionInfo ;
|
|
|
|
// Recupero i parametri dalla stringa di connessione
|
|
if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) {
|
|
SyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
|
|
// Se connessione senza TimeOut
|
|
if ( ConnectionInfo.nSyncTimeout <= 0) {
|
|
// Imposto il contesto per connesione sincrona
|
|
SyncRedisClient.pRedisContext = redisConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
|
|
if ( SyncRedisClient.pRedisContext == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
|
|
return false ;
|
|
}
|
|
else if ( SyncRedisClient.pRedisContext->err != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str())
|
|
SyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
}
|
|
// Se connessione con parametri
|
|
else {
|
|
// TimeOut per connessione
|
|
struct timeval TimeOutConnection{} ;
|
|
TimeOutConnection.tv_sec = static_cast<long>( ConnectionInfo.nSyncTimeout) ;
|
|
TimeOutConnection.tv_usec = static_cast<long>( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ;
|
|
|
|
// Imposto il contesto per connesione sincrona con TimeOut
|
|
SyncRedisClient.pRedisContext = redisConnectWithTimeout( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort, TimeOutConnection) ;
|
|
if ( SyncRedisClient.pRedisContext == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
|
|
return false ;
|
|
}
|
|
else if ( SyncRedisClient.pRedisContext->err != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str())
|
|
SyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
|
|
// TimeOut per richiesta I/O
|
|
struct timeval TimeOutIO{} ;
|
|
TimeOutIO.tv_sec = static_cast<long>( ConnectionInfo.nSyncTimeout) ;
|
|
TimeOutIO.tv_usec = static_cast<long>( ( ConnectionInfo.nSyncTimeout - TimeOutConnection.tv_sec) * 1e6) ;
|
|
|
|
// Imposto TimeOut per I/O
|
|
int nResult = redisSetTimeout( SyncRedisClient.pRedisContext, TimeOutIO) ;
|
|
if ( nResult != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Can't set timeout I/O") ;
|
|
return false ;
|
|
}
|
|
}
|
|
|
|
// Verifico se la Connessione è di tipo Sentinel
|
|
redisReply* replySentinel = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "SENTINEL get-master-addr-by-name %s",
|
|
ConnectionInfo.sServiceName.c_str()) ;
|
|
if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) {
|
|
if ( replySentinel != nullptr)
|
|
freeReplyObject( replySentinel) ;
|
|
}
|
|
else if ( replySentinel->element[0] != nullptr && replySentinel->element[0]->str != nullptr &&
|
|
replySentinel->element[1] != nullptr && replySentinel->element[1]->str != nullptr) {
|
|
// --- Nodo sentinella
|
|
string sMasterHost = replySentinel->element[0]->str ;
|
|
int nMasterPort = DEFAULT_SENTINEL_MASTER_PORT ;
|
|
FromString( replySentinel->element[1]->str, nMasterPort) ;
|
|
freeReplyObject( replySentinel) ;
|
|
|
|
// Effettuo la connessione al master redis
|
|
freeReplyObject( SyncRedisClient.pRedisContext) ;
|
|
SyncRedisClient.pRedisContext = redisConnect( sMasterHost.c_str(), nMasterPort) ;
|
|
if ( SyncRedisClient.pRedisContext == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
|
|
return false ;
|
|
}
|
|
else if ( SyncRedisClient.pRedisContext->err != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str())
|
|
SyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
}
|
|
LOG_INFO( GetCmdLogger(), "Connected to Redis !")
|
|
|
|
// Verifico se richiesta autenticazione
|
|
if ( ! ConnectionInfo.sPassword.empty()) {
|
|
redisReply* reply = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "AUTH %s %s",
|
|
ConnectionInfo.sUser.c_str(), ConnectionInfo.sPassword.c_str()) ;
|
|
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Authentication failed") ;
|
|
if ( reply == nullptr)
|
|
freeReplyObject( reply) ;
|
|
SyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
freeReplyObject( reply) ;
|
|
}
|
|
LOG_INFO( GetCmdLogger(), "Authenticated to Redis !")
|
|
|
|
// Seleziono il DataBase
|
|
redisReply* reply = ( redisReply*)redisCommand( SyncRedisClient.pRedisContext, "SELECT %d", ConnectionInfo.nDefaultDataBase) ;
|
|
if ( reply == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + SyncRedisClient.pRedisContext->errstr).c_str()) ;
|
|
return false ;
|
|
}
|
|
else if ( reply->type == REDIS_REPLY_ERROR) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + reply->str).c_str())
|
|
freeReplyObject( reply) ;
|
|
SyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
freeReplyObject( reply) ;
|
|
LOG_INFO( GetCmdLogger(), "DB Selected to Redis !") ;
|
|
|
|
// Restituisco l'Id della connessione [1, MAX_CONNECTION_NBR] ( per Lua)
|
|
nIdConnection = nMyIdConnection + 1 ;
|
|
return true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando DISCONNECT Sincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisDisconnect( int nIdConnection)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( vRedisClients[nMyIdConnection].bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
|
|
// Pulisco la connessione corrente
|
|
vRedisClients[nMyIdConnection].Clear() ;
|
|
LOG_INFO( GetCmdLogger(), "Disconnected to Redis !") ;
|
|
|
|
return true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando SET Sincrono ( sola scrittura {key:string, val:string})
|
|
//----------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisSetValFromKey( int nIdConnection, const string& sKey, const string& sVal)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( vRedisClients[nMyIdConnection].bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
|
|
// Recupero la connessione corrente
|
|
RedisSync& RedisClient = vRedisClients[nMyIdConnection] ;
|
|
|
|
// Il valore associato alla chiave può essere solo di tipo stringa
|
|
redisReply* reply = ( redisReply*)redisCommand( RedisClient.pRedisContext, "SET %s %s", sKey.c_str(), sVal.c_str()) ;
|
|
if ( reply == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Null reply") ;
|
|
return false ;
|
|
}
|
|
else if ( reply->type != REDIS_REPLY_STATUS || string( reply->str ) != "OK") {
|
|
LOG_INFO( GetCmdLogger(), "Error SET : No answer") ;
|
|
freeReplyObject( reply) ;
|
|
return false ;
|
|
}
|
|
freeReplyObject( reply) ;
|
|
|
|
LOG_INFO( GetCmdLogger(), ( string{ "New Key Set : ["} + sKey + ":" + sVal + "]").c_str())
|
|
return true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando Get Sincrono ( sola lettura {key:string, val:string})
|
|
//----------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisGetValFromKey( int nIdConnection, const string& sKey, string& sVal)
|
|
{
|
|
sVal.clear() ;
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( vRedisClients[nMyIdConnection].bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
|
|
// Recupero la connessione corrente
|
|
RedisSync& RedisClient = vRedisClients[nMyIdConnection] ;
|
|
|
|
// Recupero il tipo associato alla chiave da Redis
|
|
redisReply* typeReply = ( redisReply*)redisCommand( RedisClient.pRedisContext, "TYPE %s", sKey.c_str()) ;
|
|
if ( typeReply == nullptr || typeReply->type != REDIS_REPLY_STATUS) {
|
|
string sErr = ( typeReply != nullptr ? string( typeReply->str) : "null reply") ;
|
|
if ( typeReply != nullptr)
|
|
freeReplyObject( typeReply) ;
|
|
LOG_INFO( GetCmdLogger(), ( "Error : Failed to get key type, " + sErr).c_str()) ;
|
|
return false ;
|
|
}
|
|
string sType = typeReply->str ;
|
|
freeReplyObject( typeReply) ;
|
|
|
|
// Determino il formato per la chiamata al DataBase Redis
|
|
// Per ora accettate solamente valori string
|
|
string sFormat = ( sType == KEY_TYPE_STRING ? "Get %s" : "") ;
|
|
if ( sFormat.empty()) {
|
|
LOG_ERROR( GetCmdLogger(), "Error : Not valid Key Type") ;
|
|
return false ;
|
|
}
|
|
|
|
// Effettuo la chiamata
|
|
redisReply* reply = nullptr ;
|
|
reply = ( redisReply*)redisCommand( RedisClient.pRedisContext, sFormat.c_str(), sKey.c_str()) ;
|
|
if ( reply == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Null reply") ;
|
|
return false ;
|
|
}
|
|
|
|
// Recupero il Valore
|
|
if ( sType == KEY_TYPE_STRING) {
|
|
if ( reply->type != REDIS_REPLY_STRING) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Failed to read the value") ;
|
|
freeReplyObject( reply) ;
|
|
return false ;
|
|
}
|
|
sVal = string( reply->str) ;
|
|
}
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Key Read : ["} + sKey + ":" + sVal + "]").c_str())
|
|
|
|
freeReplyObject( reply) ;
|
|
return true ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [CB] Funzione di CallBack per comando SELECT Asincrono ( Selezione DB)
|
|
// ---------------------------------------------------------------------------
|
|
static void
|
|
RedisDbSElectionCallBack( redisAsyncContext* ctx, void* r, void*)
|
|
{
|
|
// Invio la richiesta
|
|
redisReply* reply = static_cast<redisReply*>( r) ;
|
|
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
|
|
string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : Db Selection -> "} + sErrMsg).c_str()) ;
|
|
return ;
|
|
}
|
|
// Imposto flag di Autenticazione
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
MyStatus->bSelectedDB = true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// [CB] Funzione di CallBack per comando AUTH Asincrono ( Autenticazione)
|
|
// ---------------------------------------------------------------------------
|
|
static void
|
|
RedisAuthenticationCallBack( redisAsyncContext* ctx, void* r, void*)
|
|
{
|
|
// Invio la richiesta
|
|
redisReply* reply = static_cast<redisReply*>( r) ;
|
|
if ( reply == nullptr || reply->type == REDIS_REPLY_ERROR) {
|
|
string sErrMsg = ( reply != nullptr && reply->str != nullptr) ? reply->str : "Null answer" ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : Authentication -> "} + sErrMsg).c_str()) ;
|
|
return ;
|
|
}
|
|
// Imposto flag di Autenticazione
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
MyStatus->bAuthenticated = true ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [CB] Funzione di CallBack per comandi PUBLISH, SUBSCRIBE, MESSAGE, UNSUBSCRIBE
|
|
// Asincroni
|
|
// ---------------------------------------------------------------------------
|
|
static void
|
|
RedisPubSubUnsubMsgCallback( redisAsyncContext* ctx, void* r, void*)
|
|
{
|
|
// Verifico che il contesto sia definito
|
|
if ( ctx == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack")
|
|
return ;
|
|
}
|
|
|
|
// Recupero della risposta dal server redis ( nullptr nel caso di residui)
|
|
redisReply* reply = static_cast<redisReply*>( r) ;
|
|
if ( reply == nullptr)
|
|
return ;
|
|
|
|
LOG_INFO( GetCmdLogger(), " --- RedisMessageCallback invoked")
|
|
|
|
if ( reply->type == REDIS_REPLY_INTEGER) {
|
|
// Recupero lo Stato
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
// Imposto stato di pubblicazione
|
|
MyStatus->bPublished = true ;
|
|
// Recupero il numero di clients iscritti al canale
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Message published to "} + ToString( reply->integer) + " clients !").c_str()) ;
|
|
return ;
|
|
}
|
|
else if ( reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) {
|
|
// Recupero la tipologia di chiamata effettuata
|
|
const char* msgType = ( reply->element[0] != nullptr && reply->element[0]->str != nullptr ?
|
|
reply->element[0]->str : "") ;
|
|
|
|
// --- Se Messaggio
|
|
if ( strcmp( msgType, "message") == 0 && reply->elements == 3) {
|
|
// Messaggio ricevut da un canale
|
|
if ( reply->element[1] != nullptr && reply->element[1]->str != nullptr &&
|
|
reply->element[2] != nullptr && reply->element[2]->str != nullptr) {
|
|
string sChannel = reply->element[1]->str ;
|
|
string sMessage = reply->element[2]->str ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Message Received on ["} + sChannel + "] : " + sMessage).c_str())
|
|
}
|
|
else {
|
|
LOG_INFO( GetCmdLogger(), "Errror : Invalid Message received")
|
|
return ;
|
|
}
|
|
}
|
|
// --- Se Subscribe
|
|
else if ( strcmp( msgType, "subscribe") == 0 && reply->elements >= 3) {
|
|
if ( reply->element[1] != nullptr && reply->element[1] != nullptr &&
|
|
reply->element[2] != nullptr) {
|
|
// Recupero lo Stato
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
// Imposto stato di Subscribe
|
|
MyStatus->bSubscribed = true ;
|
|
// Comunico il risultato
|
|
string sChannel = reply->element[1]->str ;
|
|
int nCount = reply->element[2]->integer ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Subscribed to ["} + sChannel + "]," +
|
|
" total subscribtions : " + ToString( nCount)).c_str()) ;
|
|
}
|
|
else {
|
|
LOG_INFO( GetCmdLogger(), "Errror : Invalid Subscription reply")
|
|
return ;
|
|
}
|
|
}
|
|
// --- se Unsubscribe
|
|
else if ( strcmp( msgType, "unsubscribe") == 0 && reply->elements >= 3) {
|
|
if ( reply->element[1] != nullptr && reply->element[1] != nullptr &&
|
|
reply->element[2] != nullptr) {
|
|
// Recupero lo Stato
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
// Imposto stato di pubblicazione
|
|
MyStatus->bUnsubscribed = true ;
|
|
// Comunico il risultato
|
|
string sChannel = reply->element[1]->str ;
|
|
int nCount = reply->element[2]->integer ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Unsubscribed to ["} + sChannel + "]," +
|
|
" subscriptions left : " + ToString( nCount)).c_str())
|
|
}
|
|
else {
|
|
LOG_INFO( GetCmdLogger(), "Errror : Invalid Unsubscription reply")
|
|
return ;
|
|
}
|
|
}
|
|
// --- Undefined
|
|
else {
|
|
LOG_INFO( GetCmdLogger(), "Error : Undefined reply in CallBack from Redis")
|
|
return ;
|
|
}
|
|
}
|
|
|
|
return ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [CB] Funzione di CallBack per ricezione MESSAGE Asincrona
|
|
// ---------------------------------------------------------------------------
|
|
static void
|
|
WaitMessageCallback( redisAsyncContext* ctx, void* r, void*)
|
|
{
|
|
// Verifico che il contesto sia definito
|
|
if ( ctx == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Redis context null in CallBack")
|
|
return ;
|
|
}
|
|
LOG_INFO( GetCmdLogger(), " --- RedisMessageCallback invoked")
|
|
|
|
// Recupero della risposta dal server redis
|
|
redisReply* reply = static_cast<redisReply*>( r) ;
|
|
if ( reply == nullptr)
|
|
return ;
|
|
|
|
if ( reply->type != REDIS_REPLY_ARRAY || reply->elements < 3)
|
|
LOG_INFO( GetCmdLogger(), "Error : Invalid Message CallBack reply")
|
|
else {
|
|
// Recupero il Messaggio ( se callBack riferita al tipo "message")
|
|
if ( reply->element[0] == nullptr || reply->element[0]->str == nullptr)
|
|
LOG_INFO( GetCmdLogger(), "Error : Null message type received")
|
|
else {
|
|
const char* msgType = reply->element[0]->str ;
|
|
if ( strcmp( msgType, "message") == 0) {
|
|
if ( reply->element[2] == nullptr || reply->element[2]->str == nullptr)
|
|
LOG_INFO( GetCmdLogger(), "Error : Null message received")
|
|
else {
|
|
// Recupero lo Stato
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
// Imposto stato di pubblicazione
|
|
MyStatus->bMessage = true ;
|
|
MyStatus->sMessage = reply->element[2]->str ;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [CB] Funzione di CallBack per comando CONNECT Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
static void
|
|
RedisConnectCallBack( const redisAsyncContext* ctx, int nStatus)
|
|
{
|
|
// Verifico che il contesto e lo stato siano validi
|
|
if ( ctx == nullptr || nStatus != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Async Disconnection CallBack failed ") ;
|
|
return ;
|
|
}
|
|
|
|
// Imposto flag di Connessione
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
MyStatus->bConnected = true ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// [CB] Funzione di CallBack per comando DISCONNECT Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
static void
|
|
RedisDisconnectCallBack( const redisAsyncContext* ctx, int nStatus)
|
|
{
|
|
// Verifico che il contesto e lo stato siano validi
|
|
if ( ctx == nullptr || nStatus != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Async Disconnection CallBack failed ") ;
|
|
return ;
|
|
}
|
|
|
|
// Imposto flag di Disconnessione
|
|
RedisConnectionStatus* MyStatus = static_cast<RedisConnectionStatus*>( ctx->data) ;
|
|
if ( MyStatus == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : No Status detected") ;
|
|
return ;
|
|
}
|
|
MyStatus->bDisconnected = true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando CONNECT Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncConnect( const string& sConnection, int& nIdConnection)
|
|
{
|
|
// Inizializzo una nuova connessione, recuperando l'Id
|
|
int nMyIdConnection = NO_CONNECTION ;
|
|
for ( int i = 0 ; i < int( vAsyncRedisClients.size()) ; ++ i) {
|
|
if ( vAsyncRedisClients[i].m_bFreeConnection) {
|
|
nMyIdConnection = i ;
|
|
break ;
|
|
}
|
|
}
|
|
if ( nMyIdConnection == NO_CONNECTION) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : Limit connection Exceeded ( "} +
|
|
ToString( MAX_CONNECTION_NBR) + " )").c_str())
|
|
return false ;
|
|
}
|
|
|
|
// Recupero i riferimenti necessari e blocco la connessione corrente
|
|
RedisAsync& AsyncRedisClient = vAsyncRedisClients[nMyIdConnection] ;
|
|
AsyncRedisClient.m_bFreeConnection = false ;
|
|
RedisConnectionInfo& ConnectionInfo = AsyncRedisClient.m_ConnectionInfo ;
|
|
|
|
// Recupero i parametri dalla stringa di connessione
|
|
if ( ! GetParamsFromConnectionString( sConnection, ConnectionInfo)) {
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
|
|
// Inizializzazione libreria Winsock, necessaria per usare le funzionalità di rete su Windows,
|
|
// come socket TCP/IP. Versione utilizzata 2.2
|
|
int nStatus = WSAStartup( MAKEWORD( 2, 2), &AsyncRedisClient.m_wsaData) ;
|
|
if ( nStatus != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : WSAStartup -> "} + ToString( nStatus)).c_str()) ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
|
|
// Verifico se si tratta di un nodo Sentinella
|
|
redisContext* sentinelCtx = redisConnect( AsyncRedisClient.m_ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
|
|
if ( sentinelCtx == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Can't allocate redis context")
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
else if ( sentinelCtx->err != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + sentinelCtx->errstr).c_str())
|
|
redisFree( sentinelCtx) ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
redisReply* replySentinel = ( redisReply*)redisCommand( sentinelCtx, "SENTINEL get-master-addr-by-name %s",
|
|
ConnectionInfo.sServiceName.c_str()) ;
|
|
if ( replySentinel == nullptr || replySentinel->type != REDIS_REPLY_ARRAY || replySentinel->elements != 2) {
|
|
if ( replySentinel != nullptr)
|
|
freeReplyObject( replySentinel) ;
|
|
}
|
|
else if ( replySentinel->element[0] != nullptr && replySentinel->element[0]->str != nullptr &&
|
|
replySentinel->element[1] != nullptr && replySentinel->element[1]->str != nullptr) {
|
|
// --- Nodo sentinella
|
|
ConnectionInfo.sHost = replySentinel->element[0]->str ;
|
|
ConnectionInfo.nPort = DEFAULT_SENTINEL_MASTER_PORT ;
|
|
FromString( replySentinel->element[1]->str, ConnectionInfo.nPort) ;
|
|
freeReplyObject( replySentinel) ;
|
|
redisFree( sentinelCtx) ;
|
|
}
|
|
|
|
// Definisco le 2 connessioni asincroni con Host e Porta ( per PUBLISH e SUBSCRIBE/UNSUBSCRIBE)
|
|
AsyncRedisClient.m_pRedisAsyncPubContext = redisAsyncConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
|
|
AsyncRedisClient.m_pRedisAsyncSubContext = redisAsyncConnect( ConnectionInfo.sHost.c_str(), ConnectionInfo.nPort) ;
|
|
if ( AsyncRedisClient.m_pRedisAsyncPubContext == nullptr || AsyncRedisClient.m_pRedisAsyncSubContext == nullptr) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Can't allocate Redis Async context")
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
else if ( AsyncRedisClient.m_pRedisAsyncPubContext->err != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + AsyncRedisClient.m_pRedisAsyncPubContext->errstr).c_str())
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
else if ( AsyncRedisClient.m_pRedisAsyncSubContext->err != 0) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error : "} + AsyncRedisClient.m_pRedisAsyncSubContext->errstr).c_str())
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
|
|
// Definizione e partenza del ciclo degli eventi in un thread separato ( in BackGround)
|
|
AsyncRedisClient.m_bPubLoopRunning = true ;
|
|
AsyncRedisClient.m_bSubLoopRunning = true ;
|
|
thread tPubEventLoop( &RedisAsync::RedisEventLoop, &AsyncRedisClient, true) ;
|
|
thread tSubEventLoop( &RedisAsync::RedisEventLoop, &AsyncRedisClient, false) ;
|
|
tPubEventLoop.detach() ;
|
|
tSubEventLoop.detach() ;
|
|
|
|
// Imposto e Definisco la funzione di CallBack per contesto PUB/SUB per Connessione asincrona
|
|
AsyncRedisClient.m_pRedisAsyncPubContext->data = &AsyncRedisClient.m_ConnectionPubStatus ;
|
|
AsyncRedisClient.m_pRedisAsyncSubContext->data = &AsyncRedisClient.m_ConnectionSubStatus ;
|
|
redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisConnectCallBack) ;
|
|
redisAsyncSetConnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisConnectCallBack) ;
|
|
// Attendo la risposta di CallBack
|
|
auto tStart = chrono::steady_clock::now() ;
|
|
auto tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bConnected || ! AsyncRedisClient.m_ConnectionSubStatus.bConnected) {
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
}
|
|
LOG_INFO( GetCmdLogger(), "Connected to Redis !")
|
|
#if DEBUG
|
|
double dMsConnectionTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsConnectionTime) + " ms").c_str())
|
|
#endif
|
|
|
|
// Se presente una Password, eseguo l'autenticazione
|
|
if ( AsyncRedisClient.m_sPassword.empty()) {
|
|
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisAuthenticationCallBack,
|
|
nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ;
|
|
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisAuthenticationCallBack,
|
|
nullptr, "AUTH %s %s", AsyncRedisClient.m_sUser.c_str(), AsyncRedisClient.m_sPassword.c_str()) ;
|
|
tStart = chrono::steady_clock::now() ;
|
|
tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated || ! AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated) {
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout Autentication exceeded") ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
}
|
|
LOG_INFO( GetCmdLogger(), "Authenticated to Redis !")
|
|
#if DEBUG
|
|
double dMsAuthenticationTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsAuthenticationTime) + " ms").c_str())
|
|
#endif
|
|
}
|
|
else {
|
|
AsyncRedisClient.m_ConnectionPubStatus.bAuthenticated = true ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.bAuthenticated = true ;
|
|
}
|
|
|
|
// Se Presente una Selezione del Database, eseguo la selezione
|
|
if ( AsyncRedisClient.m_nDataBase != DEFAULT_DATABASE) {
|
|
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDbSElectionCallBack,
|
|
nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ;
|
|
redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDbSElectionCallBack,
|
|
nullptr, "SELECT %d", AsyncRedisClient.m_nDataBase) ;
|
|
tStart = chrono::steady_clock::now() ;
|
|
tTimeOut = chrono::milliseconds( static_cast<int>( ConnectionInfo.nAsyncTimeout)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB || ! AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB) {
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout DB Selection exceeded") ;
|
|
AsyncRedisClient.Clear() ;
|
|
return false ;
|
|
}
|
|
}
|
|
LOG_INFO( GetCmdLogger(), "DB Selected to Redis !") ;
|
|
#if DEBUG
|
|
double dMsDbSelectionTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsDbSelectionTime) + " ms").c_str())
|
|
#endif
|
|
}
|
|
else {
|
|
AsyncRedisClient.m_ConnectionPubStatus.bSelectedDB = true ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.bSelectedDB = true ;
|
|
}
|
|
|
|
// Imposto e Definisco la funzione di CallBack per la Disconnessione Pub/Sub asincrona
|
|
redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncPubContext, RedisDisconnectCallBack) ;
|
|
redisAsyncSetDisconnectCallback( AsyncRedisClient.m_pRedisAsyncSubContext, RedisDisconnectCallBack) ;
|
|
|
|
// Restituisco l'Id della connessione [1, MAX_CONNECTION_NBR] ( per Lua)
|
|
nIdConnection = nMyIdConnection + 1 ;
|
|
return true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando DISCONNECT Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncDisconnect( int nIdConnection)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
|
|
// Recupero la connessione
|
|
RedisAsync& AsyncRedisClient = vAsyncRedisClients[nMyIdConnection] ;
|
|
|
|
// Effettuo la Disconnesione asincrona ( libera la memoria in automatico)
|
|
redisAsyncDisconnect( AsyncRedisClient.m_pRedisAsyncPubContext) ;
|
|
redisAsyncDisconnect( AsyncRedisClient.m_pRedisAsyncSubContext) ;
|
|
|
|
// Attendo la CallBack di disconnessione
|
|
auto tStart = chrono::steady_clock::now() ;
|
|
auto tTimeOut = chrono::milliseconds( static_cast<int>( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bDisconnected ||
|
|
! AsyncRedisClient.m_ConnectionSubStatus.bDisconnected) {
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout Connection exceeded") ;
|
|
return false ;
|
|
}
|
|
}
|
|
|
|
// Rendo libera la connessione
|
|
AsyncRedisClient.m_bFreeConnection = true ;
|
|
AsyncRedisClient.Clear() ;
|
|
|
|
// Pulisco WinSock2 ( timer ?)
|
|
// ???
|
|
|
|
LOG_INFO( GetCmdLogger(), "Disconnected to Redis !") ;
|
|
#if DEBUG
|
|
double dMsDisconnectionTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsDisconnectionTime) + " ms").c_str())
|
|
#endif
|
|
return true ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Funzione per comando PUBLISH Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncPublish( int nIdConnection, const string& sChannel, const string& sMessage)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
// Se non ho alcun canale -> errore
|
|
if ( sChannel.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
|
|
return false ;
|
|
}
|
|
// Se non ho alcun messaggio -> warning
|
|
if ( sMessage.empty())
|
|
LOG_INFO( GetCmdLogger(), "Warning : Empty Message") ;
|
|
|
|
// Recupero la connessione
|
|
RedisAsync& AsyncRedisClient = vAsyncRedisClients[nMyIdConnection] ;
|
|
|
|
// Eseguo il comando PUBLISH
|
|
AsyncRedisClient.m_ConnectionPubStatus.bPublished = false ;
|
|
if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncPubContext, RedisPubSubUnsubMsgCallback, nullptr,
|
|
"PUBLISH %s %s", sChannel.c_str(), sMessage.c_str()) != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), "Error: redisAsyncCommand PUBLISH failed")
|
|
return false ;
|
|
}
|
|
auto tStart = chrono::steady_clock::now() ;
|
|
auto tTimeOut = chrono::milliseconds( static_cast<int>( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionPubStatus.bPublished) {
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout Subscribe exceeded") ;
|
|
return false ;
|
|
}
|
|
}
|
|
#if DEBUG
|
|
double dMsPublishTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsPublishTime) + " ms").c_str())
|
|
#endif
|
|
return true ;
|
|
}
|
|
|
|
//----------------------------------------------------------------------------
|
|
// Funzione per comando SUBSCRIBE Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncSubscribe( int nIdConnection, const string& sChannel)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
// Se non ho alcun canale, errore
|
|
if ( sChannel.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel") ;
|
|
return false ;
|
|
}
|
|
|
|
// Recupero la connessione
|
|
RedisAsync& AsyncRedisClient = vAsyncRedisClients[nMyIdConnection] ;
|
|
|
|
// Eseguo il comando SUBSCRIBE
|
|
AsyncRedisClient.m_ConnectionPubStatus.bSubscribed = false ;
|
|
if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr,
|
|
( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand SUBSCRIBE failed")
|
|
return false ;
|
|
}
|
|
auto tStart = chrono::steady_clock::now() ;
|
|
auto tTimeOut = chrono::milliseconds( static_cast<int>( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bSubscribed) {
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout Subscribe exceeded") ;
|
|
return false ;
|
|
}
|
|
}
|
|
#if DEBUG
|
|
double dMsSubscribeTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsSubscribeTime) + " ms").c_str())
|
|
#endif
|
|
return true ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Funzione per comando UNSUBSCRIBE Asincrono
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncUnsubscribe( int nIdConnection, const string& sChannel)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
// Se non ho alcun canale, errore
|
|
if ( sChannel.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
|
|
return false ;
|
|
}
|
|
|
|
// Recupero la connessione
|
|
RedisAsync& AsyncRedisClient = vAsyncRedisClients[nMyIdConnection] ;
|
|
|
|
// Eseguo il comando UNSUBSCRIBE
|
|
AsyncRedisClient.m_ConnectionPubStatus.bUnsubscribed = false ;
|
|
if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, RedisPubSubUnsubMsgCallback, nullptr,
|
|
( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), "Error : redisAsyncCommand UNSUBSCRIBE failed")
|
|
return false ;
|
|
}
|
|
auto tStart = chrono::steady_clock::now() ;
|
|
auto tTimeOut = chrono::milliseconds( static_cast<int>( AsyncRedisClient.m_ConnectionInfo.nAsyncTimeout)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bUnsubscribed) {
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), "Timeout Unsubscribe exceeded") ;
|
|
return false ;
|
|
}
|
|
}
|
|
#if DEBUG
|
|
double dMsUnsubscribeTime = chrono::duration_cast<chrono::milliseconds>( chrono::steady_clock::now() - tStart).count() ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ " + "} + ToString( dMsUnsubscribeTime) + " ms").c_str())
|
|
#endif
|
|
return true ;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// La funzione esegue una Subscribe e una Unsubscribe
|
|
// Se entro il tempo stabilito riceve un messaggio, lo comunica, altrimenti errore
|
|
// ---------------------------------------------------------------------------
|
|
bool
|
|
ExeRedisAsyncSubscribeOneMessage( int nIdConnection, const string& sChannel, double dMaxTimeOut,
|
|
string& sMessage)
|
|
{
|
|
// Verifico che l'Id di connesione sia valido
|
|
int nMyIdConnection = nIdConnection - 1 ;
|
|
if ( ! CheckIdConnection( nMyIdConnection))
|
|
return false ;
|
|
// Verifico che la connessione sia attiva
|
|
if ( vAsyncRedisClients[nMyIdConnection].m_bFreeConnection) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Id Connection not found")
|
|
return false ;
|
|
}
|
|
// Se non ho alcun canale, errore
|
|
if ( sChannel.empty()) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Not a valid Channel")
|
|
return false ;
|
|
}
|
|
// Controllo validità tempo di TimeOut
|
|
if ( dMaxTimeOut <= 0.) {
|
|
LOG_INFO( GetCmdLogger(), "Error : Invalid timeout value") ;
|
|
return false ;
|
|
}
|
|
|
|
// Recupero la connessione
|
|
RedisAsync& AsyncRedisClient = vAsyncRedisClients[nMyIdConnection] ;
|
|
|
|
// Invio del comando di SUBSCRIBE
|
|
AsyncRedisClient.m_ConnectionSubStatus.bMessage = false ;
|
|
AsyncRedisClient.m_ConnectionSubStatus.sMessage = "" ;
|
|
if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, WaitMessageCallback, nullptr,
|
|
( "SUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to subscribe to"} + sChannel).c_str())
|
|
return false ;
|
|
}
|
|
|
|
// Attivo il TimeOut per l'attesa del messaggio
|
|
auto tStart = chrono::steady_clock::now() ;
|
|
auto tTimeOut = chrono::milliseconds( static_cast<int>( dMaxTimeOut)) ;
|
|
while ( ! AsyncRedisClient.m_ConnectionSubStatus.bMessage) {
|
|
this_thread::sleep_for( chrono::milliseconds( 10)) ;
|
|
if ( chrono::steady_clock::now() - tStart > tTimeOut) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Timeout : No Message received on ["} + sChannel + "]").c_str())
|
|
return false ;
|
|
}
|
|
}
|
|
|
|
// Invio del comando UNSUBSCRIBE
|
|
if ( redisAsyncCommand( AsyncRedisClient.m_pRedisAsyncSubContext, WaitMessageCallback, nullptr,
|
|
( "UNSUBSCRIBE " + sChannel).c_str()) != REDIS_OK) {
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Error: Failed to Unsubscribe to "} + sChannel).c_str())
|
|
return false ;
|
|
}
|
|
|
|
// Controllo se ho ricevuto un messaggio
|
|
if ( AsyncRedisClient.m_ConnectionSubStatus.bMessage) {
|
|
sMessage = AsyncRedisClient.m_ConnectionSubStatus.sMessage ;
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Message received on ["} + sChannel + "]: " + sMessage).c_str())
|
|
return true ;
|
|
}
|
|
|
|
LOG_INFO( GetCmdLogger(), ( string{ "Timeout: No message received on ["} + sChannel + "]").c_str()) ;
|
|
return false ;
|
|
}
|