using Newtonsoft.Json; using StackExchange.Redis; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace IOB_UT_NEXT { public class RedisIobCache { #region Public Fields /// /// Hash REDIS x dati IOB /// public string redIobKey = ""; /// /// Hash REDIS x dati IOB /// public string redIobTrackKey = ""; #endregion Public Fields #region Public Constructors /// /// init classe "implicita" /// /// /// public RedisIobCache() { initRedisConn(); initMemKeys(); setVetoPeriod(); } /// /// init classe gestione dati IOB su Redis /// /// IP/nome server /// Cod IOB /// Tipo di IOB /// /// Minima differenza in secondi x considerare variazione dati DataOra /// public RedisIobCache(string codServer, string codIob, string tipoIob, int minDeltaS) { initRedisConn(); // init dati di base... currCodIob = codIob; currIobType = tipoIob; initMemKeys(); setVetoPeriod(); // init oggetti gestiti ServerMpStatus newSrvStatus = new ServerMpStatus() { IP = codServer, online = false }; servStatus = newSrvStatus; // init oggetto IOB IobWinStatus newIobStatus = new IobWinStatus() { CodIob = currCodIob, IobType = currIobType, online = false, minDeltaSec = minDeltaS }; iobStatus = newIobStatus; } #endregion Public Constructors #region Public Enums /// /// Tipologia di ordinamento x liste KVP /// public enum kvpOrderBy { /// /// Ordinamento ASCending per KEY /// KeyAsc, /// /// Ordinamento DESCending per KEY /// KeyDesc, /// /// Ordinamento ASCending per VAL /// ValAsc, /// /// Ordinamento DESCending per VAL /// ValDesc } #endregion Public Enums #region Public Properties /// /// Oggetto DB REDIS corrente /// public IDatabase currDb { get { IDatabase answ; // se già valorizzato uso oggetto private... if (_currDB != null) { answ = _currDB; } else { // init DB (sullo 0) answ = connRedis.GetDatabase(); int dbNum = baseUtils.CRI("redisDb"); // gestione override... if (dbNum >= 0) { // in questo caso uso il DB configurato in app.config... answ = connRedis.GetDatabase(dbNum); } _currDB = answ; Logging.Instance.Info($"Apertura di un Redis Database: {dbNum}"); } // restituisco oggetto DB return answ; } } /// /// Accesso all'oggetto stato IOB da esterno /// public IobWinStatus iobStatus { get { IobWinStatus answ = null; // cerco in REDIS string rawData = getRSV(redIobKey); // se non ci fosse creo e salvo if (string.IsNullOrEmpty(rawData)) { answ = new IobWinStatus() { CodIob = currCodIob, IobType = currIobType, online = false }; // salvo serializzando... rawData = JsonConvert.SerializeObject(answ); setRSV(redIobKey, rawData); } else { // deserializzo answ = JsonConvert.DeserializeObject(rawData); } return answ; } set { string rawData = JsonConvert.SerializeObject(value); saveAndSendMessage(redIobKey, redIobChannel, rawData); } } /// /// Message Dispatcher: oggetto comunicazione pub/sub via REDIS channels corrente /// public ISubscriber messageDisp { get { ISubscriber answ; // se già valorizzato uso oggetto private... if (_currSub != null) { answ = _currSub; } else { // init DB (sullo 0) answ = connRedis.GetSubscriber(); _currSub = answ; Logging.Instance.Info($"Apertura di un Redis Message Subscriber"); } // restituisco oggetto DB return answ; } } /// /// Restituisce numero record in Redis DB /// public long numRecRedis { get { long answ = 0; try { foreach (var ep in connRedis.GetEndPoints()) { var server = connRedis.GetServer(ep); answ += server.DatabaseSize(); } } catch (Exception exc) { Logging.Instance.Error($"numRecRedis {exc}"); } return answ; } } /// /// Accesso all'oggetto stato server da esterno /// public ServerMpStatus servStatus { get { ServerMpStatus answ = null; // cerco in REDIS string rawData = getRSV(redServKey); // se non ci fosse creo e salvo if (string.IsNullOrEmpty(rawData)) { answ = new ServerMpStatus() { online = false }; // salvo serializzando... rawData = JsonConvert.SerializeObject(answ); setRSV(redServKey, rawData); } else { // deserializzo answ = JsonConvert.DeserializeObject(rawData); } return answ; } set { string rawData = JsonConvert.SerializeObject(value); setRSV(redServKey, rawData); } } #endregion Public Properties #region Public Methods /// /// Pulisce gli eventi più vecchi di un certo numero di minuti sia dal Sorted Set che dalla Hash Table. /// /// Chiave ZSet eventi tracciati per recupero recenti /// Chiave hashset valore eventi /// Il numero di minuti oltre il quale gli eventi sono considerati "scaduti". public async Task CleanExpiredEventsAsync(string rkeyTS, string rkeyHash, int minutesOldThreshold) { long cutoffTimeMillis = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - (long)TimeSpan.FromMinutes(minutesOldThreshold).TotalMilliseconds; // PRIMA: Recupera gli ID degli eventi che saranno rimossi dal Sorted Set. // Facciamo questo per poter poi rimuovere i loro dettagli dalla Hash Table. // Utilizziamo ZRangeByScoreAsync con un intervallo da -infinito fino al cutoffTimeMillis. RedisValue[] eventIdsToClean = await currDb.SortedSetRangeByScoreAsync( rkeyTS, //start: double.NegativeInfinity, // Per ottenere tutti gli elementi dal più vecchio, uso default stop: cutoffTimeMillis, exclude: Exclude.None, order: Order.Ascending // Non strettamente necessario, ma può aiutare per debug ); if (eventIdsToClean == null || eventIdsToClean.Length == 0) { Logging.Instance.Debug("Nessun evento scaduto da pulire."); return; } // Operazione 1 nel batch: Rimuovi gli eventi scaduti dal Sorted Set. // ZRemRangeByScoreAsync rimuove tutti i membri con score <= al max specificato. await currDb.SortedSetRemoveRangeByScoreAsync(rkeyTS, double.NegativeInfinity, cutoffTimeMillis); // Operazione 2 nel batch: Rimuovi i dettagli corrispondenti dalla Hash Table. // Dobbiamo convertire i RedisValue[] in string[] per HDelAsync che accetta params RedisValue[] RedisValue[] eventIdsAsRedisValues = eventIdsToClean.Select(id => (RedisValue)id.ToString()).ToArray(); await currDb.HashDeleteAsync(rkeyHash, eventIdsAsRedisValues); Logging.Instance.Debug($"Pulizia completata. Rimossi {eventIdsToClean.Length} eventi più vecchi di {minutesOldThreshold} minuti."); } /// /// Effettua comaprazione x VALORE in KVP ASC /// /// /// /// public int CompareVal(KeyValuePair x, KeyValuePair y) { return x.Value.CompareTo(y.Value); } /// /// Effettua comaprazione x VALORE in KVP DESC /// /// /// /// public int CompareValDesc(KeyValuePair x, KeyValuePair y) { return y.Value.CompareTo(x.Value); } /// /// Deserializzazione di un valore string in oggetto generico /// /// /// public object deserializeVal(string serVal) { object answ = ""; try { answ = JsonConvert.DeserializeObject(serVal); } catch (Exception exc) { Logging.Instance.Error($"deserializeVal {exc}"); } return answ; } public int getKReqCount(string keyReq) { string keyReqHash = redHash($"IOB:{currCodIob}:KRCOUNT:{keyReq}"); int answ = 0; string rawData = getRSV(keyReqHash); if (!string.IsNullOrEmpty(rawData)) { int.TryParse(rawData, out answ); } return answ; } /// /// Restituisce una chiave COUNTER in RedisCache /// /// /// public int getRCnt(string chiave) { int answInt = 0; string answ = ""; try { answ = currDb.StringGet(chiave); answInt = Convert.ToInt32(answ); } catch (Exception exc) { Logging.Instance.Error($"getRCnt {exc}"); } return answInt; } /// /// Recupera gli eventi degli ultimi minuti specificati e ne somma la quantità di dati. /// /// Chiave ZSet eventi tracciati per recupero recenti /// Chiave hashset valore eventi /// Il numero di minuti da cui recuperare gli eventi. /// Una tupla contenente la quantità totale di dati scambiati e un elenco degli eventi trovati. public async Task<(long TotalDataExchanged, List<(string EventId, long Data)> EventList)> GetRecentEventsAndSumAsync(string rkeyTS, string rkeyHash, int minutesBack) { long currentTimeMillis = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); long fiveMinutesAgoMillis = currentTimeMillis - (long)TimeSpan.FromMinutes(minutesBack).TotalMilliseconds; // Recupera gli ID degli eventi dal Sorted Set nell'intervallo di tempo // `SortedSetRangeByScoreAsync` restituisce i membri il cui score rientra nell'intervallo. // I membri sono RedisValue, quindi potrebbero essere necessari .ToString() o .ToByteArray() RedisValue[] eventIds = await currDb.SortedSetRangeByScoreAsync( rkeyTS, start: fiveMinutesAgoMillis, stop: currentTimeMillis, exclude: Exclude.None, // Includi entrambi gli estremi order: Order.Ascending // Ordina per timestamp crescente ); if (eventIds == null || eventIds.Length == 0) { return (0, new List<(string, long)>()); } // Prepara l'array di RedisValue per la query HMGET RedisValue[] eventIdsAsRedisValues = eventIds.ToList().Select(id => (RedisValue)id.ToString()).ToArray(); // Recupera le quantità di dati dalla Hash Table per gli ID trovati // HMGET restituisce un array di RedisValue corrispondente ai field richiesti. RedisValue[] dataQuantities = await currDb.HashGetAsync(rkeyHash, eventIdsAsRedisValues); long totalDataExchanged = 0; var eventList = new List<(string EventId, long Data)>(); for (int i = 0; i < eventIds.Length; i++) { string eventId = eventIds[i].ToString(); RedisValue quantityRedisValue = dataQuantities[i]; if (quantityRedisValue.HasValue && long.TryParse(quantityRedisValue.ToString(), out long quantity)) { totalDataExchanged += quantity; eventList.Add((eventId, quantity)); } else { Logging.Instance.Debug("Avviso: Dati mancanti o non validi per l'evento ID: {eventId}"); } } Logging.Instance.Debug("Totale dati scambiati negli ultimi {minutesBack} minuti: {totalDataExchanged}"); Logging.Instance.Debug("Elenco eventi trovati: {eventList.Count}"); return (totalDataExchanged, eventList); } /// /// Restituisce un pò di info sul server redis connesso /// /// public string getRedisInfoData() { string answ = ""; StringBuilder sb = new StringBuilder(); try { sb.AppendLine($"Configuration: {connRedis.Configuration}"); sb.AppendLine($"Connected: {connRedis.IsConnected}"); sb.AppendLine($"ClientName: {connRedis.ClientName}"); sb.AppendLine($"Total Ops: {connRedis.OperationCount}"); sb.AppendLine($"Status: {connRedis.GetStatus()}"); answ = sb.ToString(); } catch (Exception exc) { Logging.Instance.Error($"getRedisInfoData {exc}"); } return answ; } /// /// Restituisce un set KVP (Key Value Pair) salvati in RedisCache /// /// /// public RedisValue[] getRKeys(RedisKey[] chiavi) { RedisValue[] answ = null; try { answ = currDb.StringGet(chiavi); } catch (Exception exc) { Logging.Instance.Error($"getRKeys {exc}"); } return answ; } /// /// Restituisce una chiave salvata in RedisCache /// /// /// public string getRSV(string chiave) { string answ = ""; try { answ = currDb.StringGet(chiave); } catch (Exception exc) { Logging.Instance.Info($"getRSV {exc}"); } return answ; } /// /// Conta num oggetti currDb redis che rispondono a pattern /// /// ** = tutti /// public int redCountKey(string keyPattern) { int answ = 0; // cerco se ci sia valore in redis... se vuoto = ALL... keyPattern = string.IsNullOrEmpty(keyPattern) ? "**" : keyPattern; try { foreach (var ep in connRedis.GetEndPoints()) { var server = connRedis.GetServer(ep); foreach (var key in server.Keys(pattern: keyPattern)) { answ++; } } } catch (Exception exc) { Logging.Instance.Error($"redCountKey {exc}"); } return answ; } /// /// Elimina una key (hash, string) /// /// /// public bool redDelKey(string key) { bool answ = false; // cerco se ci sia valore in redis... try { RedisKey chiave = key; currDb.KeyDelete(chiave); answ = true; } catch (Exception exc) { Logging.Instance.Error($"redDelKey {exc}"); } return answ; } /// /// Flush completo currDb redis /// /// ** = tutti /// public bool redFlushKey(string keyPattern) { bool answ = false; // cerco se ci sia valore in redis... se vuoto = ALL... keyPattern = string.IsNullOrEmpty(keyPattern) ? "**" : keyPattern; try { foreach (var ep in connRedis.GetEndPoints()) { var server = connRedis.GetServer(ep); var keys = server.Keys(database: baseUtils.CRI("redisDb"), pattern: $"{keyPattern}*"); foreach (var key in keys) { currDb.KeyDelete(key); } } answ = true; } catch (Exception exc) { Logging.Instance.Error($"redFlushKey {exc}"); } return answ; } /// /// Restituisce oggetti currDb redis che rispondono a pattern /// /// ** = tutti /// Tipo di ordinamento per kvp /// public List> redGetCounterByKey(string keyPattern, kvpOrderBy orderBy) { int numAnsw = redCountKey(keyPattern); RedisKey[] chiavi = new RedisKey[numAnsw]; List> answ = new List>(); // se vuoto = ALL... keyPattern = string.IsNullOrEmpty(keyPattern) ? "**" : keyPattern; // recupero in primis elenco chiavi try { int i = 0; foreach (var ep in connRedis.GetEndPoints()) { var server = connRedis.GetServer(ep); foreach (var key in server.Keys(pattern: keyPattern)) { chiavi[i] = key; i++; } } } catch (Exception exc) { Logging.Instance.Error($"redGetCounterByKey 01 {exc}"); } // ora recupero valori! var valori = getRKeys(chiavi); int currVal = 0; // popolo rispsota try { for (int i = 0; i < numAnsw; i++) { Int32.TryParse(valori[i], out currVal); answ.Add(new KeyValuePair(chiavi[i], currVal)); } } catch (Exception exc) { Logging.Instance.Error($"redGetCounterByKey 02 {exc}"); } // se richiesto riordino... switch (orderBy) { case kvpOrderBy.KeyAsc: answ.Sort(CompareKey); break; case kvpOrderBy.KeyDesc: answ.Sort(CompareKeyDesc); break; case kvpOrderBy.ValAsc: answ.Sort(CompareVal); break; case kvpOrderBy.ValDesc: answ.Sort(CompareValDesc); break; default: break; } return answ; } /// /// Recupera tutti i valori dalla hash /// /// /// public KeyValuePair[] redGetHash(string hashKey) { KeyValuePair[] answ = new KeyValuePair[1]; // cerco se ci sia valore in redis... try { RedisKey chiave = hashKey; HashEntry[] valori = currDb.HashGetAll(chiave); answ = new KeyValuePair[valori.Length]; int i = 0; foreach (HashEntry item in valori) { answ[i] = new KeyValuePair(item.Name, item.Value); i++; } } catch (Exception exc) { Logging.Instance.Error($"redGetHash {exc}"); } return answ; } /// /// Recupera tutti i valori dalla hash in formato Dictionary /// /// /// public Dictionary redGetHashDict(string hashKey) { Dictionary answ = new Dictionary(); // cerco se ci sia valore in redis... try { RedisKey chiave = hashKey; HashEntry[] valori = currDb.HashGetAll(chiave); foreach (HashEntry item in valori) { answ.Add(item.Name, item.Value); } } catch (Exception exc) { Logging.Instance.Error($"redGetHashDict {exc}"); } return answ; } /// /// Recupera UN SINGOLO VALORE dalla hash per un dato field /// /// /// /// public string redGetHashField(string hashKey, string hashField) { string answ = ""; // cerco se ci sia valore in redis... try { RedisKey chiave = hashKey; RedisValue campo = hashField; RedisValue valOut = currDb.HashGet(chiave, campo); answ = valOut.ToString(); } catch (Exception exc) { Logging.Instance.Error($"redGetHashField {exc}"); } return answ; } /// /// Nome della variabile HASH da utilizzare (dato CodModulo / Server / DB impiegato da /// funzionalita' DbConfig) + keyName richiesto... /// public string redHash(string keyName) { keyName = keyName.Replace("\\", "_"); string answ = keyName; try { answ = string.Format($"{baseUtils.CRS("appName")}:{keyName}"); } catch (Exception exc) { Logging.Instance.Error($"redHash {exc}"); } return answ; } /// /// Verifica se ci siano valori nella hash indicata... /// /// /// public bool redHashPresent(RedisKey key) { bool answ = false; // cerco se ci sia valore in redis... try { answ = currDb.HashGetAll(key).Length > 0; } catch (Exception exc) { Logging.Instance.Error($"redHashPresent {exc}"); } return answ; } /// /// Verifica se ci siano valori nella hash indicata (string) /// /// /// public bool redHashPresentSz(string key) { bool answ = false; try { RedisKey chiave = key; answ = redHashPresent(chiave); } catch (Exception exc) { Logging.Instance.Error($"redHashPresentSz {exc}"); } return answ; } /// /// Incremento conteggio di un valore dentro una hash /// /// chiave /// valore con conteggio da tracciare /// scadenza preimpostata hash datetime, se null NON scade /// valore incremento desiderato (default 1) /// public bool redIncrHashCount(string hashKey, string hashField, DateTime? hashExpire, double valIncr = 1) { bool answ = false; // cerco se ci sia valore in redis... try { RedisKey chiave = hashKey; currDb.HashIncrement(chiave, hashField, valIncr); if (hashExpire != null) { currDb.KeyExpire(chiave, hashExpire); } } catch (Exception exc) { Logging.Instance.Error($"redIncrHashCount {exc}"); } return answ; } /// /// Verifica se ci siano valori nella KEY indicata... /// /// /// public bool redKeyPresent(RedisKey key) { bool answ = false; // cerco se ci sia valore in redis... try { answ = currDb.KeyExists(key); } catch (Exception exc) { Logging.Instance.Error($"redKeyPresent {exc}"); } return answ; } /// /// Verifica se ci siano valori nella KEY indicata (string) /// /// /// public bool redKeyPresentSz(string key) { bool answ = false; try { RedisKey chiave = key; answ = redKeyPresent(chiave); } catch (Exception exc) { Logging.Instance.Error($"redKeyPresentSz {exc}"); } return answ; } /// /// Conteggio elementi in QUEUE (FIFO) /// /// public long redQueueCount(RedisKey queueName) { return currDb.ListLength(queueName); } /// /// Recupero valore in QUEUE (FIFO) /// /// public RedisValue redQueuePop(RedisKey queueName) { return currDb.ListLeftPop(queueName); } /// /// Recupero list di TUTTI i valori in QUEUE (FIFO) /// /// /// num max di elementi da recuperare public List redQueuePopAll(RedisKey queueName) { // vecchio metodo (NON resetta) #if false long nCount = currDb.ListLength(queueName); List listData = currDb.ListRange(queueName, 0, nCount).ToList(); return listData; #endif // lettura + reset in blocco var listData = currDb.ListRange(queueName, 0, -1).ToList(); if (listData.Count > 0) { currDb.KeyDelete(queueName); // remove the entire list } return listData; // in alternativa lettura 1:1 #if false var results = new List(); RedisValue item; while ((item = currDb.ListLeftPop(queueName)) != RedisValue.Null) { results.Add(item); } return results; #endif } /// /// Recupero una list di valori in QUEUE (FIFO) /// /// /// num max di elementi da recuperare public List redQueuePopList(RedisKey queueName, int maxElem) { // vecchia modalità senza rimozione #if false long nCount = currDb.ListLength(queueName); nCount = nCount < maxElem ? nCount : maxElem; List listData = currDb.ListRange(queueName, 0, nCount).ToList(); return listData; #endif // nuovo metodo con rimozione var results = new List(maxElem); for (int i = 0; i < maxElem; i++) { var item = currDb.ListLeftPop(queueName); if (item.IsNull) break; // queue empty results.Add(item); } return results; } /// /// Scrittura valore in QUEUE (FIFO) /// /// /// public long redQueuePush(RedisKey queueName, RedisValue value) { long qLen = currDb.ListRightPush(queueName, value); return qLen; } /// /// RIMUOVE un singolo valore (riga) dalla hash /// /// /// /// public string redRemoveHashField(string hashKey, string hashField) { string answ = ""; // cerco se ci sia valore in redis... try { RedisKey chiave = hashKey; RedisValue campo = hashField; RedisValue valOut = currDb.HashDelete(chiave, campo); answ = valOut.ToString(); } catch (Exception exc) { Logging.Instance.Error($"redRemoveHashField{Environment.NewLine}{exc}"); } return answ; } /// /// Salvataggio di una hash di valori /// /// chiave /// valori /// scadenza preimpostata hash datetime, se null NON scade /// public bool redSaveHash(string hashKey, KeyValuePair[] hashFields, DateTime? hashExpire) { bool answ = false; // cerco se ci sia valore in redis... try { RedisKey chiave = hashKey; answ = redSaveHash(hashKey, hashFields); if (hashExpire != null) { currDb.KeyExpire(chiave, hashExpire); } } catch (Exception exc) { Logging.Instance.Error($"redSaveHash {exc}"); } return answ; } /// /// Salvataggio di una hash di valori /// /// chiave /// valori /// public bool redSaveHash(string hashKey, KeyValuePair[] hashFields) { bool answ = false; // cerco se ci sia valore in redis... try { RedisKey chiave = hashKey; HashEntry[] valori = new HashEntry[hashFields.Length]; int i = 0; foreach (KeyValuePair kvp in hashFields) { valori[i] = new HashEntry(kvp.Key, kvp.Value); i++; } currDb.HashSet(chiave, valori); answ = true; } catch (Exception exc) { Logging.Instance.Error($"redSaveHash {exc}"); } return answ; } /// /// Salvataggio di una hash di valori /// /// chiave /// valori /// /// scadenza preimpostata hash (secondi) | default = -1 (non scade) /// /// public bool redSaveHash(string hashKey, KeyValuePair[] hashFields, double expireSeconds = -1) { bool answ = false; // cerco se ci sia valore in redis... try { RedisKey chiave = hashKey; answ = redSaveHash(hashKey, hashFields); if (expireSeconds > 0) { currDb.KeyExpire(chiave, DateTime.Now.AddSeconds(expireSeconds)); } } catch (Exception exc) { Logging.Instance.Error($"redSaveHash {exc}"); } return answ; } /// /// Salvataggio di una hash di valori in formato Dictionary /// /// chiave /// valori /// public bool redSaveHashDict(string hashKey, Dictionary hashFields) { bool answ = false; // cerco se ci sia valore in redis... try { RedisKey chiave = hashKey; HashEntry[] valori = new HashEntry[hashFields.Count]; int i = 0; foreach (KeyValuePair kvp in hashFields) { valori[i] = new HashEntry(kvp.Key, kvp.Value); i++; } currDb.HashSet(chiave, valori); answ = true; } catch (Exception exc) { Logging.Instance.Error($"redSaveHashDict {exc}"); } return answ; } /// /// Salvataggio di una hash di valori in formato Dictionary /// /// chiave /// valori /// /// scadenza preimpostata hash (secondi) | default = -1 (non scade) /// /// public bool redSaveHashDict(string hashKey, Dictionary hashFields, double expireSeconds = -1) { bool answ = false; // cerco se ci sia valore in redis... try { RedisKey chiave = hashKey; answ = redSaveHashDict(hashKey, hashFields); if (expireSeconds > 0) { currDb.KeyExpire(chiave, DateTime.Now.AddSeconds(expireSeconds)); } //answ = true; } catch (Exception exc) { Logging.Instance.Error($"redSaveHashDict {exc}"); } return answ; } /// /// Salvataggio di una hash di valori in formato Dictionary /// /// chiave /// valori /// scadenza hash COME DATETIME (NULL = NON SCADE) /// public bool redSaveHashDict(string hashKey, Dictionary hashFields, DateTime? expireDT) { bool answ = false; // cerco se ci sia valore in redis... try { RedisKey chiave = hashKey; answ = redSaveHashDict(hashKey, hashFields); if (expireDT != null) { currDb.KeyExpire(chiave, expireDT); } //answ = true; } catch (Exception exc) { Logging.Instance.Error($"redSaveHashDict {exc}"); } return answ; } /// /// Salvataggio di una hash di valori /// /// chiave /// valori come lista KVP /// public bool redSaveHashList(string hashKey, List> hashListKVP) { bool answ = false; if (connRedis.IsConnected) { // cerco se ci sia valore in redis... IDatabase cache = connRedis.GetDatabase(); try { RedisKey chiave = hashKey; HashEntry[] valori = new HashEntry[hashListKVP.Count]; int i = 0; foreach (KeyValuePair kvp in hashListKVP) { valori[i] = new HashEntry(kvp.Key, kvp.Value); i++; } cache.HashSet(chiave, valori); answ = true; } catch (Exception exc) { Logging.Instance.Error($"redSaveHashList {exc}"); } } return answ; } /// /// Conteggio elementi in QUEUE (LIFO) /// /// /// public long redStackCount(RedisKey queueName) { return currDb.ListLength(queueName); } /// /// Recupero valore in STACK (LIFO) /// /// /// public RedisValue redStackPop(RedisKey stackName) { return currDb.ListRightPop(stackName); } /// /// Scrittura valore in STACK (LIFO) /// /// /// public void redStackPush(RedisKey stackName, RedisValue value) { currDb.ListRightPush(stackName, value); } /// /// Resetta (elimina) un contatore in Redis /// /// /// public bool resetRCnt(string chiave) { bool answ = false; try { answ = currDb.KeyDelete(chiave); } catch (Exception exc) { Logging.Instance.Error($"resetRCnt {exc}"); } return answ; } /// /// Invia un messaggio tramite notify channel + salva in currDb (se scaduto periodo veto) /// /// /// /// /// public bool saveAndSendMessage(string memKey, string notifyChannel, string message) { bool fatto = false; // effettuo la scrittura nell'area di memoria indicata SE passato intervallo minimo di 60 sec bool doSave = false; if (LastKeySave.ContainsKey(memKey)) { if (DateTime.Now.Subtract(LastKeySave[memKey]).TotalSeconds > vetoSavePeriod) { doSave = true; } } else { LastKeySave.Add(memKey, DateTime.Now); doSave = true; } // se scaduto veto periodo salvo if (doSave) { currDb.StringSet(memKey, message); LastKeySave[memKey] = DateTime.Now; Logging.Instance.Trace($"Redis Cache Key saved: {memKey}"); } // invio notifica tramite il canale richiesto RedisChannel pubNotifyChannel = new RedisChannel(notifyChannel, RedisChannel.PatternMode.Literal); messageDisp.Publish(pubNotifyChannel, message); fatto = true; // restituisco! return fatto; } /// /// Serializzazione di un oggetto generico /// /// /// public string serializeVal(object origVal) { string answ = ""; try { answ = JsonConvert.SerializeObject(origVal); } catch (Exception exc) { Logging.Instance.Error($"serializeVal {exc}"); } return answ; } public bool setKReqCount(string keyReq, int val) { string keyReqHash = redHash($"IOB:{currCodIob}:KRCOUNT:{keyReq}"); // salvo ogni chiave x 25 h (in sec 60*60*25) bool done = setRSV(keyReqHash, $"{val}", 60 * 60 * 25); return done; } /// /// Decrementa un contatore in Redis /// /// /// public long setRCntD(string chiave) { long answ = 0; try { answ = currDb.StringDecrement(chiave, 1); } catch (Exception exc) { Logging.Instance.Error($"setRCntD {exc}"); } return answ; } /// /// Incrementa un contatore in Redis /// /// /// public long setRCntI(string chiave) { long answ = 0; try { answ = currDb.StringIncrement(chiave, 1); } catch (Exception exc) { Logging.Instance.Error($"setRCntI {exc}"); } return answ; } /// /// Salva un set KVP (Key Value Pair) in RedisCache /// /// Set KVP chiave-valore da salvare /// public bool setRKeys(KeyValuePair[] valori) { bool answ = false; try { currDb.StringSet(valori); answ = true; } catch (Exception exc) { Logging.Instance.Error($"setRKeys {exc}"); } return answ; } /// /// Salva una chiave in RedisCache /// /// /// /// public bool setRSV(string chiave, string valore) { bool answ = false; try { currDb.StringSet(chiave, valore); answ = true; } catch (Exception exc) { Logging.Instance.Error($"setRSV {exc}"); } return answ; } /// /// Salva una chiave in RedisCache /// /// /// /// in secondi /// public bool setRSV(string chiave, string valore, int TTL_sec) { bool answ = false; try { TimeSpan expT = new TimeSpan(0, 0, TTL_sec); // salvo con expyry... currDb.StringSet(chiave, valore, expT); answ = true; } catch (Exception exc) { Logging.Instance.Error($"setRSV {exc}"); } return answ; } /// /// Registra un nuovo evento di scambio dati su redis /// /// Chiave ZSet eventi tracciati per recupero recenti /// Chiave hashset valore eventi /// Quantità di dati scambiati (byte) /// L'ID univoco dell'evento generato. public string TrackExchData(string rkeyTS, string rkeyHash, long dataQty) { // Usa un timestamp in millisecondi per maggiore granularità e corrispondenza con JS/Java long timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); string eventId = Guid.NewGuid().ToString(); // Genera un ID univoco // Aggiungi al Sorted Set: score = timestamp, member = eventId currDb.SortedSetAdd(rkeyTS, eventId, timestamp); // Memorizza i dettagli nella Hash Table: field = eventId, value = dataQty currDb.HashSet(rkeyHash, eventId, dataQty.ToString()); return eventId; } /// /// Registra un nuovo evento di scambio dati su redis /// /// Chiave ZSet eventi tracciati per recupero recenti /// Chiave hashset valore eventi /// Quantità di dati scambiati (byte) /// L'ID univoco dell'evento generato. public async Task TrackExchDataAsync(string rkeyTS, string rkeyHash, long dataQty) { // Usa un timestamp in millisecondi per maggiore granularità e corrispondenza con JS/Java long timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); string eventId = Guid.NewGuid().ToString(); // Genera un ID univoco // Aggiungi al Sorted Set: score = timestamp, member = eventId await currDb.SortedSetAddAsync(rkeyTS, eventId, timestamp); // Memorizza i dettagli nella Hash Table: field = eventId, value = dataQty await currDb.HashSetAsync(rkeyHash, eventId, dataQty.ToString()); #if false // Crea un batch per eseguire le operazioni in modo transazionale var batch = cache.CreateBatch(); // Aggiungi al Sorted Set: score = timestamp, member = eventId await batch.SortedSetAddAsync(EventsTimeSeriesKey, eventId, timestamp); // Memorizza i dettagli nella Hash Table: field = eventId, value = dataExchanged await batch.HashSetAsync(EventDetailsHashKey, eventId, dataExchanged.ToString()); // StackExchange.Redis memorizza RedisValue // Esegui il batch in modo sincrono (o non va...) batch.Execute(); //await batch.ExecuteAsync(); #endif Logging.Instance.Debug($"Evento registrato: ID={eventId}, Timestamp={timestamp}, Dati={dataQty}"); return eventId; } #endregion Public Methods #region Protected Fields /// /// Cod IOB /// protected string currCodIob = "000"; /// /// Tipo IOB /// protected string currIobType = "ND"; /// /// Nome del channel REDIS x dati IOB /// protected string redIobChannel = "IobChannel"; /// /// /Chiave varabili scambiate con IOB-MAN /// protected string redManKey = ""; /// /// Hash REDIS x dati server /// protected string redServKey = ""; #endregion Protected Fields #region Private Fields /// /// Dizionario delle ultime scritture in currDb redis dei messaggi /// private Dictionary LastKeySave = new Dictionary(); /// /// veriodo di veto salvataggio chaivi in REDIS quando inviate tramite pub/sub in realtime /// private int vetoSavePeriod = 10; #endregion Private Fields #region Private Properties /// /// Oggetto currentDb REDIS locale /// private IDatabase _currDB { get; set; } /// /// Oggetto subscriber x pubblicazione/sottoscrizione canali REDIS /// private ISubscriber _currSub { get; set; } /// /// Oggetto statico connessione redis /// private ConnectionMultiplexer connRedis { get; set; } #endregion Private Properties #region Private Methods /// /// Effettua comaprazione x CHIAVE in KVP ASC /// /// /// /// private int CompareKey(KeyValuePair x, KeyValuePair y) { return x.Key.CompareTo(y.Key); } /// /// Effettua comaprazione x CHIAVE in KVP DESC /// /// /// /// private int CompareKeyDesc(KeyValuePair x, KeyValuePair y) { return y.Key.CompareTo(x.Key); } private void initMemKeys() { // init dati di base... redServKey = redHash($"MP"); redManKey = redHash($"MAN"); redIobKey = redHash($"IOB:{currCodIob}"); redIobTrackKey = redHash($"IOB:FluxLog:{currCodIob}"); redIobChannel = $"IobChannel_{currCodIob}"; } /// /// Init oggetti connessione REDIS /// private void initRedisConn() { // init obj standard string RedisConn = baseUtils.CRS("RedisConn"); if (string.IsNullOrEmpty(RedisConn)) { RedisConn = "localhost,abortConnect=false,ssl=false"; } connRedis = ConnectionMultiplexer.Connect(RedisConn); Logging.Instance.Info($"Apertura di un Redis Multiplexer"); } /// /// Impostazione periodo veto secondo il comportamento di IOB-MAN /// private void setVetoPeriod() { // cerco in redis la chiave di veto (scritta da IOB-MAN) string rKey = $"{redManKey}:vetoSave"; string rawVal = getRSV(rKey); // se la trovo la metto da qui if (!string.IsNullOrEmpty(rawVal)) { int.TryParse(rawVal, out vetoSavePeriod); } //se non la trovo di default è 15 else { vetoSavePeriod = 15; // salvo in redis il valore impostato setRSV(rKey, $"{vetoSavePeriod}"); } } #endregion Private Methods } }