Files
2026-04-29 10:49:00 +02:00

1591 lines
53 KiB
C#

using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace IOB_UT_NEXT
{
public class RedisIobCache
{
#region Public Fields
/// <summary>
/// Hash REDIS x dati IOB
/// </summary>
public string redIobKey = "";
/// <summary>
/// Hash REDIS x dati IOB
/// </summary>
public string redIobTrackKey = "";
#endregion Public Fields
#region Public Constructors
/// <summary>
/// init classe "implicita"
/// </summary>
/// <param name="codServer"></param>
/// <param name="codIob"></param>
public RedisIobCache()
{
initRedisConn();
initMemKeys();
setVetoPeriod();
}
/// <summary>
/// init classe gestione dati IOB su Redis
/// </summary>
/// <param name="codServer">IP/nome server</param>
/// <param name="codIob">Cod IOB</param>
/// <param name="tipoIob">Tipo di IOB</param>
/// <param name="minDeltaS">
/// Minima differenza in secondi x considerare variazione dati DataOra
/// </param>
public RedisIobCache(string codServer, string codIob, string tipoIob, int minDeltaS)
{
initRedisConn();
// init dati di base...
currCodIob = codIob;
currIobType = tipoIob;
initMemKeys();
setVetoPeriod();
// init oggetti gestiti
ServerMpStatus newSrvStatus = new ServerMpStatus()
{
IP = codServer,
online = false
};
servStatus = newSrvStatus;
// init oggetto IOB
IobWinStatus newIobStatus = new IobWinStatus()
{
CodIob = currCodIob,
IobType = currIobType,
online = false,
minDeltaSec = minDeltaS
};
iobStatus = newIobStatus;
}
#endregion Public Constructors
#region Public Enums
/// <summary>
/// Tipologia di ordinamento x liste KVP
/// </summary>
public enum kvpOrderBy
{
/// <summary>
/// Ordinamento ASCending per KEY
/// </summary>
KeyAsc,
/// <summary>
/// Ordinamento DESCending per KEY
/// </summary>
KeyDesc,
/// <summary>
/// Ordinamento ASCending per VAL
/// </summary>
ValAsc,
/// <summary>
/// Ordinamento DESCending per VAL
/// </summary>
ValDesc
}
#endregion Public Enums
#region Public Properties
/// <summary>
/// Oggetto DB REDIS corrente
/// </summary>
public IDatabase currDb
{
get
{
IDatabase answ;
// se già valorizzato uso oggetto private...
if (_currDB != null)
{
answ = _currDB;
}
else
{
// init DB (sullo 0)
answ = connRedis.GetDatabase();
int dbNum = baseUtils.CRI("redisDb");
// gestione override...
if (dbNum >= 0)
{
// in questo caso uso il DB configurato in app.config...
answ = connRedis.GetDatabase(dbNum);
}
_currDB = answ;
Logging.Instance.Info($"Apertura di un Redis Database: {dbNum}");
}
// restituisco oggetto DB
return answ;
}
}
/// <summary>
/// Accesso all'oggetto stato IOB da esterno
/// </summary>
public IobWinStatus iobStatus
{
get
{
IobWinStatus answ = null;
// cerco in REDIS
string rawData = getRSV(redIobKey);
// se non ci fosse creo e salvo
if (string.IsNullOrEmpty(rawData))
{
answ = new IobWinStatus()
{
CodIob = currCodIob,
IobType = currIobType,
online = false
};
// salvo serializzando...
rawData = JsonConvert.SerializeObject(answ);
setRSV(redIobKey, rawData);
}
else
{
// deserializzo
answ = JsonConvert.DeserializeObject<IobWinStatus>(rawData);
}
return answ;
}
set
{
string rawData = JsonConvert.SerializeObject(value);
saveAndSendMessage(redIobKey, redIobChannel, rawData);
}
}
/// <summary>
/// Message Dispatcher: oggetto comunicazione pub/sub via REDIS channels corrente
/// </summary>
public ISubscriber messageDisp
{
get
{
ISubscriber answ;
// se già valorizzato uso oggetto private...
if (_currSub != null)
{
answ = _currSub;
}
else
{
// init DB (sullo 0)
answ = connRedis.GetSubscriber();
_currSub = answ;
Logging.Instance.Info($"Apertura di un Redis Message Subscriber");
}
// restituisco oggetto DB
return answ;
}
}
/// <summary>
/// Restituisce numero record in Redis DB
/// </summary>
public long numRecRedis
{
get
{
long answ = 0;
try
{
foreach (var ep in connRedis.GetEndPoints())
{
var server = connRedis.GetServer(ep);
answ += server.DatabaseSize();
}
}
catch (Exception exc)
{
Logging.Instance.Error($"numRecRedis {exc}");
}
return answ;
}
}
/// <summary>
/// Accesso all'oggetto stato server da esterno
/// </summary>
public ServerMpStatus servStatus
{
get
{
ServerMpStatus answ = null;
// cerco in REDIS
string rawData = getRSV(redServKey);
// se non ci fosse creo e salvo
if (string.IsNullOrEmpty(rawData))
{
answ = new ServerMpStatus()
{
online = false
};
// salvo serializzando...
rawData = JsonConvert.SerializeObject(answ);
setRSV(redServKey, rawData);
}
else
{
// deserializzo
answ = JsonConvert.DeserializeObject<ServerMpStatus>(rawData);
}
return answ;
}
set
{
string rawData = JsonConvert.SerializeObject(value);
setRSV(redServKey, rawData);
}
}
#endregion Public Properties
#region Public Methods
/// <summary>
/// Pulisce gli eventi più vecchi di un certo numero di minuti sia dal Sorted Set che dalla Hash Table.
/// </summary>
/// <param name="rkeyTS">Chiave ZSet eventi tracciati per recupero recenti</param>
/// <param name="rkeyHash">Chiave hashset valore eventi</param>
/// <param name="minutesOldThreshold">Il numero di minuti oltre il quale gli eventi sono considerati "scaduti".</param>
public async Task CleanExpiredEventsAsync(string rkeyTS, string rkeyHash, int minutesOldThreshold)
{
long cutoffTimeMillis = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - (long)TimeSpan.FromMinutes(minutesOldThreshold).TotalMilliseconds;
// PRIMA: Recupera gli ID degli eventi che saranno rimossi dal Sorted Set.
// Facciamo questo per poter poi rimuovere i loro dettagli dalla Hash Table.
// Utilizziamo ZRangeByScoreAsync con un intervallo da -infinito fino al cutoffTimeMillis.
RedisValue[] eventIdsToClean = await currDb.SortedSetRangeByScoreAsync(
rkeyTS,
//start: double.NegativeInfinity, // Per ottenere tutti gli elementi dal più vecchio, uso default
stop: cutoffTimeMillis,
exclude: Exclude.None,
order: Order.Ascending // Non strettamente necessario, ma può aiutare per debug
);
if (eventIdsToClean == null || eventIdsToClean.Length == 0)
{
Logging.Instance.Debug("Nessun evento scaduto da pulire.");
return;
}
// Operazione 1 nel batch: Rimuovi gli eventi scaduti dal Sorted Set.
// ZRemRangeByScoreAsync rimuove tutti i membri con score <= al max specificato.
await currDb.SortedSetRemoveRangeByScoreAsync(rkeyTS, double.NegativeInfinity, cutoffTimeMillis);
// Operazione 2 nel batch: Rimuovi i dettagli corrispondenti dalla Hash Table.
// Dobbiamo convertire i RedisValue[] in string[] per HDelAsync che accetta params RedisValue[]
RedisValue[] eventIdsAsRedisValues = eventIdsToClean.Select(id => (RedisValue)id.ToString()).ToArray();
await currDb.HashDeleteAsync(rkeyHash, eventIdsAsRedisValues);
Logging.Instance.Debug($"Pulizia completata. Rimossi {eventIdsToClean.Length} eventi più vecchi di {minutesOldThreshold} minuti.");
}
/// <summary>
/// Effettua comaprazione x VALORE in KVP ASC
/// </summary>
/// <param name="x"></param>
/// <param name="y"></param>
/// <returns></returns>
public int CompareVal(KeyValuePair<string, int> x, KeyValuePair<string, int> y)
{
return x.Value.CompareTo(y.Value);
}
/// <summary>
/// Effettua comaprazione x VALORE in KVP DESC
/// </summary>
/// <param name="x"></param>
/// <param name="y"></param>
/// <returns></returns>
public int CompareValDesc(KeyValuePair<string, int> x, KeyValuePair<string, int> y)
{
return y.Value.CompareTo(x.Value);
}
/// <summary>
/// Deserializzazione di un valore string in oggetto generico
/// </summary>
/// <param name="serVal"></param>
/// <returns></returns>
public object deserializeVal(string serVal)
{
object answ = "";
try
{
answ = JsonConvert.DeserializeObject(serVal);
}
catch (Exception exc)
{
Logging.Instance.Error($"deserializeVal {exc}");
}
return answ;
}
public int getKReqCount(string keyReq)
{
string keyReqHash = redHash($"IOB:{currCodIob}:KRCOUNT:{keyReq}");
int answ = 0;
string rawData = getRSV(keyReqHash);
if (!string.IsNullOrEmpty(rawData))
{
int.TryParse(rawData, out answ);
}
return answ;
}
/// <summary>
/// Restituisce una chiave COUNTER in RedisCache
/// </summary>
/// <param name="chiave"></param>
/// <returns></returns>
public int getRCnt(string chiave)
{
int answInt = 0;
string answ = "";
try
{
answ = currDb.StringGet(chiave);
answInt = Convert.ToInt32(answ);
}
catch (Exception exc)
{
Logging.Instance.Error($"getRCnt {exc}");
}
return answInt;
}
/// <summary>
/// Recupera gli eventi degli ultimi minuti specificati e ne somma la quantità di dati.
/// </summary>
/// <param name="rkeyTS">Chiave ZSet eventi tracciati per recupero recenti</param>
/// <param name="rkeyHash">Chiave hashset valore eventi</param>
/// <param name="minutesBack">Il numero di minuti da cui recuperare gli eventi.</param>
/// <returns>Una tupla contenente la quantità totale di dati scambiati e un elenco degli eventi trovati.</returns>
public async Task<(long TotalDataExchanged, List<(string EventId, long Data)> EventList)> GetRecentEventsAndSumAsync(string rkeyTS, string rkeyHash, int minutesBack)
{
long currentTimeMillis = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
long fiveMinutesAgoMillis = currentTimeMillis - (long)TimeSpan.FromMinutes(minutesBack).TotalMilliseconds;
// Recupera gli ID degli eventi dal Sorted Set nell'intervallo di tempo
// `SortedSetRangeByScoreAsync` restituisce i membri il cui score rientra nell'intervallo.
// I membri sono RedisValue, quindi potrebbero essere necessari .ToString() o .ToByteArray()
RedisValue[] eventIds = await currDb.SortedSetRangeByScoreAsync(
rkeyTS,
start: fiveMinutesAgoMillis,
stop: currentTimeMillis,
exclude: Exclude.None, // Includi entrambi gli estremi
order: Order.Ascending // Ordina per timestamp crescente
);
if (eventIds == null || eventIds.Length == 0)
{
return (0, new List<(string, long)>());
}
// Prepara l'array di RedisValue per la query HMGET
RedisValue[] eventIdsAsRedisValues = eventIds.ToList().Select(id => (RedisValue)id.ToString()).ToArray();
// Recupera le quantità di dati dalla Hash Table per gli ID trovati
// HMGET restituisce un array di RedisValue corrispondente ai field richiesti.
RedisValue[] dataQuantities = await currDb.HashGetAsync(rkeyHash, eventIdsAsRedisValues);
long totalDataExchanged = 0;
var eventList = new List<(string EventId, long Data)>();
for (int i = 0; i < eventIds.Length; i++)
{
string eventId = eventIds[i].ToString();
RedisValue quantityRedisValue = dataQuantities[i];
if (quantityRedisValue.HasValue && long.TryParse(quantityRedisValue.ToString(), out long quantity))
{
totalDataExchanged += quantity;
eventList.Add((eventId, quantity));
}
else
{
Logging.Instance.Debug("Avviso: Dati mancanti o non validi per l'evento ID: {eventId}");
}
}
Logging.Instance.Debug("Totale dati scambiati negli ultimi {minutesBack} minuti: {totalDataExchanged}");
Logging.Instance.Debug("Elenco eventi trovati: {eventList.Count}");
return (totalDataExchanged, eventList);
}
/// <summary>
/// Restituisce un pò di info sul server redis connesso
/// </summary>
/// <returns></returns>
public string getRedisInfoData()
{
string answ = "";
StringBuilder sb = new StringBuilder();
try
{
sb.AppendLine($"Configuration: {connRedis.Configuration}");
sb.AppendLine($"Connected: {connRedis.IsConnected}");
sb.AppendLine($"ClientName: {connRedis.ClientName}");
sb.AppendLine($"Total Ops: {connRedis.OperationCount}");
sb.AppendLine($"Status: {connRedis.GetStatus()}");
answ = sb.ToString();
}
catch (Exception exc)
{
Logging.Instance.Error($"getRedisInfoData {exc}");
}
return answ;
}
/// <summary>
/// Restituisce un set KVP (Key Value Pair) salvati in RedisCache
/// </summary>
/// <param name="chiavi"></param>
/// <returns></returns>
public RedisValue[] getRKeys(RedisKey[] chiavi)
{
RedisValue[] answ = null;
try
{
answ = currDb.StringGet(chiavi);
}
catch (Exception exc)
{
Logging.Instance.Error($"getRKeys {exc}");
}
return answ;
}
/// <summary>
/// Restituisce una chiave salvata in RedisCache
/// </summary>
/// <param name="chiave"></param>
/// <returns></returns>
public string getRSV(string chiave)
{
string answ = "";
try
{
answ = currDb.StringGet(chiave);
}
catch (Exception exc)
{
Logging.Instance.Info($"getRSV {exc}");
}
return answ;
}
/// <summary>
/// Conta num oggetti currDb redis che rispondono a pattern
/// </summary>
/// <param name="keyPattern">** = tutti</param>
/// <returns></returns>
public int redCountKey(string keyPattern)
{
int answ = 0;
// cerco se ci sia valore in redis... se vuoto = ALL...
keyPattern = string.IsNullOrEmpty(keyPattern) ? "**" : keyPattern;
try
{
foreach (var ep in connRedis.GetEndPoints())
{
var server = connRedis.GetServer(ep);
foreach (var key in server.Keys(pattern: keyPattern))
{
answ++;
}
}
}
catch (Exception exc)
{
Logging.Instance.Error($"redCountKey {exc}");
}
return answ;
}
/// <summary>
/// Elimina una key (hash, string)
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public bool redDelKey(string key)
{
bool answ = false;
// cerco se ci sia valore in redis...
try
{
RedisKey chiave = key;
currDb.KeyDelete(chiave);
answ = true;
}
catch (Exception exc)
{
Logging.Instance.Error($"redDelKey {exc}");
}
return answ;
}
/// <summary>
/// Flush completo currDb redis
/// </summary>
/// <param name="keyPattern">** = tutti</param>
/// <returns></returns>
public bool redFlushKey(string keyPattern)
{
bool answ = false;
// cerco se ci sia valore in redis... se vuoto = ALL...
keyPattern = string.IsNullOrEmpty(keyPattern) ? "**" : keyPattern;
try
{
foreach (var ep in connRedis.GetEndPoints())
{
var server = connRedis.GetServer(ep);
var keys = server.Keys(database: baseUtils.CRI("redisDb"), pattern: $"{keyPattern}*");
foreach (var key in keys)
{
currDb.KeyDelete(key);
}
}
answ = true;
}
catch (Exception exc)
{
Logging.Instance.Error($"redFlushKey {exc}");
}
return answ;
}
/// <summary>
/// Restituisce oggetti currDb redis che rispondono a pattern
/// </summary>
/// <param name="keyPattern">** = tutti</param>
/// <param name="orderBy">Tipo di ordinamento per kvp</param>
/// <returns></returns>
public List<KeyValuePair<string, int>> redGetCounterByKey(string keyPattern, kvpOrderBy orderBy)
{
int numAnsw = redCountKey(keyPattern);
RedisKey[] chiavi = new RedisKey[numAnsw];
List<KeyValuePair<string, int>> answ = new List<KeyValuePair<string, int>>();
// se vuoto = ALL...
keyPattern = string.IsNullOrEmpty(keyPattern) ? "**" : keyPattern;
// recupero in primis elenco chiavi
try
{
int i = 0;
foreach (var ep in connRedis.GetEndPoints())
{
var server = connRedis.GetServer(ep);
foreach (var key in server.Keys(pattern: keyPattern))
{
chiavi[i] = key;
i++;
}
}
}
catch (Exception exc)
{
Logging.Instance.Error($"redGetCounterByKey 01 {exc}");
}
// ora recupero valori!
var valori = getRKeys(chiavi);
int currVal = 0;
// popolo rispsota
try
{
for (int i = 0; i < numAnsw; i++)
{
Int32.TryParse(valori[i], out currVal);
answ.Add(new KeyValuePair<string, int>(chiavi[i], currVal));
}
}
catch (Exception exc)
{
Logging.Instance.Error($"redGetCounterByKey 02 {exc}");
}
// se richiesto riordino...
switch (orderBy)
{
case kvpOrderBy.KeyAsc:
answ.Sort(CompareKey);
break;
case kvpOrderBy.KeyDesc:
answ.Sort(CompareKeyDesc);
break;
case kvpOrderBy.ValAsc:
answ.Sort(CompareVal);
break;
case kvpOrderBy.ValDesc:
answ.Sort(CompareValDesc);
break;
default:
break;
}
return answ;
}
/// <summary>
/// Recupera tutti i valori dalla hash
/// </summary>
/// <param name="hashKey"></param>
/// <returns></returns>
public KeyValuePair<string, string>[] redGetHash(string hashKey)
{
KeyValuePair<string, string>[] answ = new KeyValuePair<string, string>[1];
// cerco se ci sia valore in redis...
try
{
RedisKey chiave = hashKey;
HashEntry[] valori = currDb.HashGetAll(chiave);
answ = new KeyValuePair<string, string>[valori.Length];
int i = 0;
foreach (HashEntry item in valori)
{
answ[i] = new KeyValuePair<string, string>(item.Name, item.Value);
i++;
}
}
catch (Exception exc)
{
Logging.Instance.Error($"redGetHash {exc}");
}
return answ;
}
/// <summary>
/// Recupera tutti i valori dalla hash in formato Dictionary
/// </summary>
/// <param name="hashKey"></param>
/// <returns></returns>
public Dictionary<string, string> redGetHashDict(string hashKey)
{
Dictionary<string, string> answ = new Dictionary<string, string>();
// cerco se ci sia valore in redis...
try
{
RedisKey chiave = hashKey;
HashEntry[] valori = currDb.HashGetAll(chiave);
foreach (HashEntry item in valori)
{
answ.Add(item.Name, item.Value);
}
}
catch (Exception exc)
{
Logging.Instance.Error($"redGetHashDict {exc}");
}
return answ;
}
/// <summary>
/// Recupera UN SINGOLO VALORE dalla hash per un dato field
/// </summary>
/// <param name="hashKey"></param>
/// <param name="hashField"></param>
/// <returns></returns>
public string redGetHashField(string hashKey, string hashField)
{
string answ = "";
// cerco se ci sia valore in redis...
try
{
RedisKey chiave = hashKey;
RedisValue campo = hashField;
RedisValue valOut = currDb.HashGet(chiave, campo);
answ = valOut.ToString();
}
catch (Exception exc)
{
Logging.Instance.Error($"redGetHashField {exc}");
}
return answ;
}
/// <summary>
/// Nome della variabile HASH da utilizzare (dato CodModulo / Server / DB impiegato da
/// funzionalita' DbConfig) + keyName richiesto...
/// </summary>
public string redHash(string keyName)
{
keyName = keyName.Replace("\\", "_");
string answ = keyName;
try
{
answ = string.Format($"{baseUtils.CRS("appName")}:{keyName}");
}
catch (Exception exc)
{
Logging.Instance.Error($"redHash {exc}");
}
return answ;
}
/// <summary>
/// Verifica se ci siano valori nella hash indicata...
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public bool redHashPresent(RedisKey key)
{
bool answ = false;
// cerco se ci sia valore in redis...
try
{
answ = currDb.HashGetAll(key).Length > 0;
}
catch (Exception exc)
{
Logging.Instance.Error($"redHashPresent {exc}");
}
return answ;
}
/// <summary>
/// Verifica se ci siano valori nella hash indicata (string)
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public bool redHashPresentSz(string key)
{
bool answ = false;
try
{
RedisKey chiave = key;
answ = redHashPresent(chiave);
}
catch (Exception exc)
{
Logging.Instance.Error($"redHashPresentSz {exc}");
}
return answ;
}
/// <summary>
/// Incremento conteggio di un valore dentro una hash
/// </summary>
/// <param name="hashKey">chiave</param>
/// <param name="hashField">valore con conteggio da tracciare</param>
/// <param name="hashExpire">scadenza preimpostata hash datetime, se null NON scade</param>
/// <param name="valIncr">valore incremento desiderato (default 1)</param>
/// <returns></returns>
public bool redIncrHashCount(string hashKey, string hashField, DateTime? hashExpire, double valIncr = 1)
{
bool answ = false;
// cerco se ci sia valore in redis...
try
{
RedisKey chiave = hashKey;
currDb.HashIncrement(chiave, hashField, valIncr);
if (hashExpire != null)
{
currDb.KeyExpire(chiave, hashExpire);
}
}
catch (Exception exc)
{
Logging.Instance.Error($"redIncrHashCount {exc}");
}
return answ;
}
/// <summary>
/// Verifica se ci siano valori nella KEY indicata...
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public bool redKeyPresent(RedisKey key)
{
bool answ = false;
// cerco se ci sia valore in redis...
try
{
answ = currDb.KeyExists(key);
}
catch (Exception exc)
{
Logging.Instance.Error($"redKeyPresent {exc}");
}
return answ;
}
/// <summary>
/// Verifica se ci siano valori nella KEY indicata (string)
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public bool redKeyPresentSz(string key)
{
bool answ = false;
try
{
RedisKey chiave = key;
answ = redKeyPresent(chiave);
}
catch (Exception exc)
{
Logging.Instance.Error($"redKeyPresentSz {exc}");
}
return answ;
}
/// <summary>
/// Conteggio elementi in QUEUE (FIFO)
/// </summary>
/// <param name="queueName"></param>
public long redQueueCount(RedisKey queueName)
{
return currDb.ListLength(queueName);
}
/// <summary>
/// Recupero valore in QUEUE (FIFO)
/// </summary>
/// <param name="queueName"></param>
public RedisValue redQueuePop(RedisKey queueName)
{
return currDb.ListLeftPop(queueName);
}
/// <summary>
/// Recupero list di TUTTI i valori in QUEUE (FIFO)
/// </summary>
/// <param name="queueName"></param>
/// <param name="maxElem">num max di elementi da recuperare</param>
public List<RedisValue> redQueuePopAll(RedisKey queueName)
{
// vecchio metodo (NON resetta)
#if false
long nCount = currDb.ListLength(queueName);
List<RedisValue> listData = currDb.ListRange(queueName, 0, nCount).ToList();
return listData;
#endif
// lettura + reset in blocco
var listData = currDb.ListRange(queueName, 0, -1).ToList();
if (listData.Count > 0)
{
currDb.KeyDelete(queueName); // remove the entire list
}
return listData;
// in alternativa lettura 1:1
#if false
var results = new List<RedisValue>();
RedisValue item;
while ((item = currDb.ListLeftPop(queueName)) != RedisValue.Null)
{
results.Add(item);
}
return results;
#endif
}
/// <summary>
/// Recupero una list di valori in QUEUE (FIFO)
/// </summary>
/// <param name="queueName"></param>
/// <param name="maxElem">num max di elementi da recuperare</param>
public List<RedisValue> redQueuePopList(RedisKey queueName, int maxElem)
{
// vecchia modalità senza rimozione
#if false
long nCount = currDb.ListLength(queueName);
nCount = nCount < maxElem ? nCount : maxElem;
List<RedisValue> listData = currDb.ListRange(queueName, 0, nCount).ToList();
return listData;
#endif
// nuovo metodo con rimozione
var results = new List<RedisValue>(maxElem);
for (int i = 0; i < maxElem; i++)
{
var item = currDb.ListLeftPop(queueName);
if (item.IsNull) break; // queue empty
results.Add(item);
}
return results;
}
/// <summary>
/// Scrittura valore in QUEUE (FIFO)
/// </summary>
/// <param name="queueName"></param>
/// <param name="value"></param>
public long redQueuePush(RedisKey queueName, RedisValue value)
{
long qLen = currDb.ListRightPush(queueName, value);
return qLen;
}
/// <summary>
/// RIMUOVE un singolo valore (riga) dalla hash
/// </summary>
/// <param name="hashKey"></param>
/// <param name="hashField"></param>
/// <returns></returns>
public string redRemoveHashField(string hashKey, string hashField)
{
string answ = "";
// cerco se ci sia valore in redis...
try
{
RedisKey chiave = hashKey;
RedisValue campo = hashField;
RedisValue valOut = currDb.HashDelete(chiave, campo);
answ = valOut.ToString();
}
catch (Exception exc)
{
Logging.Instance.Error($"redRemoveHashField{Environment.NewLine}{exc}");
}
return answ;
}
/// <summary>
/// Salvataggio di una hash di valori
/// </summary>
/// <param name="hashKey">chiave</param>
/// <param name="hashFields">valori</param>
/// <param name="hashExpire">scadenza preimpostata hash datetime, se null NON scade</param>
/// <returns></returns>
public bool redSaveHash(string hashKey, KeyValuePair<string, string>[] hashFields, DateTime? hashExpire)
{
bool answ = false;
// cerco se ci sia valore in redis...
try
{
RedisKey chiave = hashKey;
answ = redSaveHash(hashKey, hashFields);
if (hashExpire != null)
{
currDb.KeyExpire(chiave, hashExpire);
}
}
catch (Exception exc)
{
Logging.Instance.Error($"redSaveHash {exc}");
}
return answ;
}
/// <summary>
/// Salvataggio di una hash di valori
/// </summary>
/// <param name="hashKey">chiave</param>
/// <param name="hashFields">valori</param>
/// <returns></returns>
public bool redSaveHash(string hashKey, KeyValuePair<string, string>[] hashFields)
{
bool answ = false;
// cerco se ci sia valore in redis...
try
{
RedisKey chiave = hashKey;
HashEntry[] valori = new HashEntry[hashFields.Length];
int i = 0;
foreach (KeyValuePair<string, string> kvp in hashFields)
{
valori[i] = new HashEntry(kvp.Key, kvp.Value);
i++;
}
currDb.HashSet(chiave, valori);
answ = true;
}
catch (Exception exc)
{
Logging.Instance.Error($"redSaveHash {exc}");
}
return answ;
}
/// <summary>
/// Salvataggio di una hash di valori
/// </summary>
/// <param name="hashKey">chiave</param>
/// <param name="hashFields">valori</param>
/// <param name="expireSeconds">
/// scadenza preimpostata hash (secondi) | default = -1 (non scade)
/// </param>
/// <returns></returns>
public bool redSaveHash(string hashKey, KeyValuePair<string, string>[] hashFields, double expireSeconds = -1)
{
bool answ = false;
// cerco se ci sia valore in redis...
try
{
RedisKey chiave = hashKey;
answ = redSaveHash(hashKey, hashFields);
if (expireSeconds > 0)
{
currDb.KeyExpire(chiave, DateTime.Now.AddSeconds(expireSeconds));
}
}
catch (Exception exc)
{
Logging.Instance.Error($"redSaveHash {exc}");
}
return answ;
}
/// <summary>
/// Salvataggio di una hash di valori in formato Dictionary
/// </summary>
/// <param name="hashKey">chiave</param>
/// <param name="hashFields">valori</param>
/// <returns></returns>
public bool redSaveHashDict(string hashKey, Dictionary<string, string> hashFields)
{
bool answ = false;
// cerco se ci sia valore in redis...
try
{
RedisKey chiave = hashKey;
HashEntry[] valori = new HashEntry[hashFields.Count];
int i = 0;
foreach (KeyValuePair<string, string> kvp in hashFields)
{
valori[i] = new HashEntry(kvp.Key, kvp.Value);
i++;
}
currDb.HashSet(chiave, valori);
answ = true;
}
catch (Exception exc)
{
Logging.Instance.Error($"redSaveHashDict {exc}");
}
return answ;
}
/// <summary>
/// Salvataggio di una hash di valori in formato Dictionary
/// </summary>
/// <param name="hashKey">chiave</param>
/// <param name="hashFields">valori</param>
/// <param name="expireSeconds">
/// scadenza preimpostata hash (secondi) | default = -1 (non scade)
/// </param>
/// <returns></returns>
public bool redSaveHashDict(string hashKey, Dictionary<string, string> hashFields, double expireSeconds = -1)
{
bool answ = false;
// cerco se ci sia valore in redis...
try
{
RedisKey chiave = hashKey;
answ = redSaveHashDict(hashKey, hashFields);
if (expireSeconds > 0)
{
currDb.KeyExpire(chiave, DateTime.Now.AddSeconds(expireSeconds));
}
//answ = true;
}
catch (Exception exc)
{
Logging.Instance.Error($"redSaveHashDict {exc}");
}
return answ;
}
/// <summary>
/// Salvataggio di una hash di valori in formato Dictionary
/// </summary>
/// <param name="hashKey">chiave</param>
/// <param name="hashFields">valori</param>
/// <param name="expireDT">scadenza hash COME DATETIME (NULL = NON SCADE)</param>
/// <returns></returns>
public bool redSaveHashDict(string hashKey, Dictionary<string, string> hashFields, DateTime? expireDT)
{
bool answ = false;
// cerco se ci sia valore in redis...
try
{
RedisKey chiave = hashKey;
answ = redSaveHashDict(hashKey, hashFields);
if (expireDT != null)
{
currDb.KeyExpire(chiave, expireDT);
}
//answ = true;
}
catch (Exception exc)
{
Logging.Instance.Error($"redSaveHashDict {exc}");
}
return answ;
}
/// <summary>
/// Salvataggio di una hash di valori
/// </summary>
/// <param name="hashKey">chiave</param>
/// <param name="hashListKVP">valori come lista KVP</param>
/// <returns></returns>
public bool redSaveHashList(string hashKey, List<KeyValuePair<string, string>> hashListKVP)
{
bool answ = false;
if (connRedis.IsConnected)
{
// cerco se ci sia valore in redis...
IDatabase cache = connRedis.GetDatabase();
try
{
RedisKey chiave = hashKey;
HashEntry[] valori = new HashEntry[hashListKVP.Count];
int i = 0;
foreach (KeyValuePair<string, string> kvp in hashListKVP)
{
valori[i] = new HashEntry(kvp.Key, kvp.Value);
i++;
}
cache.HashSet(chiave, valori);
answ = true;
}
catch (Exception exc)
{
Logging.Instance.Error($"redSaveHashList {exc}");
}
}
return answ;
}
/// <summary>
/// Conteggio elementi in QUEUE (LIFO)
/// </summary>
/// <param name="queueName"></param>
/// <param name="value"></param>
public long redStackCount(RedisKey queueName)
{
return currDb.ListLength(queueName);
}
/// <summary>
/// Recupero valore in STACK (LIFO)
/// </summary>
/// <param name="stackName"></param>
/// <returns></returns>
public RedisValue redStackPop(RedisKey stackName)
{
return currDb.ListRightPop(stackName);
}
/// <summary>
/// Scrittura valore in STACK (LIFO)
/// </summary>
/// <param name="stackName"></param>
/// <param name="value"></param>
public void redStackPush(RedisKey stackName, RedisValue value)
{
currDb.ListRightPush(stackName, value);
}
/// <summary>
/// Resetta (elimina) un contatore in Redis
/// </summary>
/// <param name="chiave"></param>
/// <returns></returns>
public bool resetRCnt(string chiave)
{
bool answ = false;
try
{
answ = currDb.KeyDelete(chiave);
}
catch (Exception exc)
{
Logging.Instance.Error($"resetRCnt {exc}");
}
return answ;
}
/// <summary>
/// Invia un messaggio tramite notify channel + salva in currDb (se scaduto periodo veto)
/// </summary>
/// <param name="memKey"></param>
/// <param name="notifyChannel"></param>
/// <param name="message"></param>
/// <returns></returns>
public bool saveAndSendMessage(string memKey, string notifyChannel, string message)
{
bool fatto = false;
// effettuo la scrittura nell'area di memoria indicata SE passato intervallo minimo di 60 sec
bool doSave = false;
if (LastKeySave.ContainsKey(memKey))
{
if (DateTime.Now.Subtract(LastKeySave[memKey]).TotalSeconds > vetoSavePeriod)
{
doSave = true;
}
}
else
{
LastKeySave.Add(memKey, DateTime.Now);
doSave = true;
}
// se scaduto veto periodo salvo
if (doSave)
{
currDb.StringSet(memKey, message);
LastKeySave[memKey] = DateTime.Now;
Logging.Instance.Trace($"Redis Cache Key saved: {memKey}");
}
// invio notifica tramite il canale richiesto
RedisChannel pubNotifyChannel = new RedisChannel(notifyChannel, RedisChannel.PatternMode.Literal);
messageDisp.Publish(pubNotifyChannel, message);
fatto = true;
// restituisco!
return fatto;
}
/// <summary>
/// Serializzazione di un oggetto generico
/// </summary>
/// <param name="origVal"></param>
/// <returns></returns>
public string serializeVal(object origVal)
{
string answ = "";
try
{
answ = JsonConvert.SerializeObject(origVal);
}
catch (Exception exc)
{
Logging.Instance.Error($"serializeVal {exc}");
}
return answ;
}
public bool setKReqCount(string keyReq, int val)
{
string keyReqHash = redHash($"IOB:{currCodIob}:KRCOUNT:{keyReq}");
// salvo ogni chiave x 25 h (in sec 60*60*25)
bool done = setRSV(keyReqHash, $"{val}", 60 * 60 * 25);
return done;
}
/// <summary>
/// Decrementa un contatore in Redis
/// </summary>
/// <param name="chiave"></param>
/// <returns></returns>
public long setRCntD(string chiave)
{
long answ = 0;
try
{
answ = currDb.StringDecrement(chiave, 1);
}
catch (Exception exc)
{
Logging.Instance.Error($"setRCntD {exc}");
}
return answ;
}
/// <summary>
/// Incrementa un contatore in Redis
/// </summary>
/// <param name="chiave"></param>
/// <returns></returns>
public long setRCntI(string chiave)
{
long answ = 0;
try
{
answ = currDb.StringIncrement(chiave, 1);
}
catch (Exception exc)
{
Logging.Instance.Error($"setRCntI {exc}");
}
return answ;
}
/// <summary>
/// Salva un set KVP (Key Value Pair) in RedisCache
/// </summary>
/// <param name="valori">Set KVP chiave-valore da salvare</param>
/// <returns></returns>
public bool setRKeys(KeyValuePair<RedisKey, RedisValue>[] valori)
{
bool answ = false;
try
{
currDb.StringSet(valori);
answ = true;
}
catch (Exception exc)
{
Logging.Instance.Error($"setRKeys {exc}");
}
return answ;
}
/// <summary>
/// Salva una chiave in RedisCache
/// </summary>
/// <param name="chiave"></param>
/// <param name="valore"></param>
/// <returns></returns>
public bool setRSV(string chiave, string valore)
{
bool answ = false;
try
{
currDb.StringSet(chiave, valore);
answ = true;
}
catch (Exception exc)
{
Logging.Instance.Error($"setRSV {exc}");
}
return answ;
}
/// <summary>
/// Salva una chiave in RedisCache
/// </summary>
/// <param name="chiave"></param>
/// <param name="valore"></param>
/// <param name="TTL_sec">in secondi</param>
/// <returns></returns>
public bool setRSV(string chiave, string valore, int TTL_sec)
{
bool answ = false;
try
{
TimeSpan expT = new TimeSpan(0, 0, TTL_sec);
// salvo con expyry...
currDb.StringSet(chiave, valore, expT);
answ = true;
}
catch (Exception exc)
{
Logging.Instance.Error($"setRSV {exc}");
}
return answ;
}
/// <summary>
/// Registra un nuovo evento di scambio dati su redis
/// </summary>
/// <param name="rkeyTS">Chiave ZSet eventi tracciati per recupero recenti</param>
/// <param name="rkeyHash">Chiave hashset valore eventi</param>
/// <param name="dataQty">Quantità di dati scambiati (byte)</param>
/// <returns>L'ID univoco dell'evento generato.</returns>
public string TrackExchData(string rkeyTS, string rkeyHash, long dataQty)
{
// Usa un timestamp in millisecondi per maggiore granularità e corrispondenza con JS/Java
long timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
string eventId = Guid.NewGuid().ToString(); // Genera un ID univoco
// Aggiungi al Sorted Set: score = timestamp, member = eventId
currDb.SortedSetAdd(rkeyTS, eventId, timestamp);
// Memorizza i dettagli nella Hash Table: field = eventId, value = dataQty
currDb.HashSet(rkeyHash, eventId, dataQty.ToString());
return eventId;
}
/// <summary>
/// Registra un nuovo evento di scambio dati su redis
/// </summary>
/// <param name="rkeyTS">Chiave ZSet eventi tracciati per recupero recenti</param>
/// <param name="rkeyHash">Chiave hashset valore eventi</param>
/// <param name="dataQty">Quantità di dati scambiati (byte)</param>
/// <returns>L'ID univoco dell'evento generato.</returns>
public async Task<string> TrackExchDataAsync(string rkeyTS, string rkeyHash, long dataQty)
{
// Usa un timestamp in millisecondi per maggiore granularità e corrispondenza con JS/Java
long timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
string eventId = Guid.NewGuid().ToString(); // Genera un ID univoco
// Aggiungi al Sorted Set: score = timestamp, member = eventId
await currDb.SortedSetAddAsync(rkeyTS, eventId, timestamp);
// Memorizza i dettagli nella Hash Table: field = eventId, value = dataQty
await currDb.HashSetAsync(rkeyHash, eventId, dataQty.ToString());
#if false
// Crea un batch per eseguire le operazioni in modo transazionale
var batch = cache.CreateBatch();
// Aggiungi al Sorted Set: score = timestamp, member = eventId
await batch.SortedSetAddAsync(EventsTimeSeriesKey, eventId, timestamp);
// Memorizza i dettagli nella Hash Table: field = eventId, value = dataExchanged
await batch.HashSetAsync(EventDetailsHashKey, eventId, dataExchanged.ToString()); // StackExchange.Redis memorizza RedisValue
// Esegui il batch in modo sincrono (o non va...)
batch.Execute();
//await batch.ExecuteAsync();
#endif
Logging.Instance.Debug($"Evento registrato: ID={eventId}, Timestamp={timestamp}, Dati={dataQty}");
return eventId;
}
#endregion Public Methods
#region Protected Fields
/// <summary>
/// Cod IOB
/// </summary>
protected string currCodIob = "000";
/// <summary>
/// Tipo IOB
/// </summary>
protected string currIobType = "ND";
/// <summary>
/// Nome del channel REDIS x dati IOB
/// </summary>
protected string redIobChannel = "IobChannel";
/// <summary>
/// /Chiave varabili scambiate con IOB-MAN
/// </summary>
protected string redManKey = "";
/// <summary>
/// Hash REDIS x dati server
/// </summary>
protected string redServKey = "";
#endregion Protected Fields
#region Private Fields
/// <summary>
/// Dizionario delle ultime scritture in currDb redis dei messaggi
/// </summary>
private Dictionary<string, DateTime> LastKeySave = new Dictionary<string, DateTime>();
/// <summary>
/// veriodo di veto salvataggio chaivi in REDIS quando inviate tramite pub/sub in realtime
/// </summary>
private int vetoSavePeriod = 10;
#endregion Private Fields
#region Private Properties
/// <summary>
/// Oggetto currentDb REDIS locale
/// </summary>
private IDatabase _currDB { get; set; }
/// <summary>
/// Oggetto subscriber x pubblicazione/sottoscrizione canali REDIS
/// </summary>
private ISubscriber _currSub { get; set; }
/// <summary>
/// Oggetto statico connessione redis
/// </summary>
private ConnectionMultiplexer connRedis { get; set; }
#endregion Private Properties
#region Private Methods
/// <summary>
/// Effettua comaprazione x CHIAVE in KVP ASC
/// </summary>
/// <param name="x"></param>
/// <param name="y"></param>
/// <returns></returns>
private int CompareKey(KeyValuePair<string, int> x, KeyValuePair<string, int> y)
{
return x.Key.CompareTo(y.Key);
}
/// <summary>
/// Effettua comaprazione x CHIAVE in KVP DESC
/// </summary>
/// <param name="x"></param>
/// <param name="y"></param>
/// <returns></returns>
private int CompareKeyDesc(KeyValuePair<string, int> x, KeyValuePair<string, int> y)
{
return y.Key.CompareTo(x.Key);
}
private void initMemKeys()
{
// init dati di base...
redServKey = redHash($"MP");
redManKey = redHash($"MAN");
redIobKey = redHash($"IOB:{currCodIob}");
redIobTrackKey = redHash($"IOB:FluxLog:{currCodIob}");
redIobChannel = $"IobChannel_{currCodIob}";
}
/// <summary>
/// Init oggetti connessione REDIS
/// </summary>
private void initRedisConn()
{
// init obj standard
string RedisConn = baseUtils.CRS("RedisConn");
if (string.IsNullOrEmpty(RedisConn))
{
RedisConn = "localhost,abortConnect=false,ssl=false";
}
connRedis = ConnectionMultiplexer.Connect(RedisConn);
Logging.Instance.Info($"Apertura di un Redis Multiplexer");
}
/// <summary>
/// Impostazione periodo veto secondo il comportamento di IOB-MAN
/// </summary>
private void setVetoPeriod()
{
// cerco in redis la chiave di veto (scritta da IOB-MAN)
string rKey = $"{redManKey}:vetoSave";
string rawVal = getRSV(rKey);
// se la trovo la metto da qui
if (!string.IsNullOrEmpty(rawVal))
{
int.TryParse(rawVal, out vetoSavePeriod);
}
//se non la trovo di default è 15
else
{
vetoSavePeriod = 15;
// salvo in redis il valore impostato
setRSV(rKey, $"{vetoSavePeriod}");
}
}
#endregion Private Methods
}
}