using Microsoft.AspNetCore.Identity.UI.Services;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using NLog;
using StackExchange.Redis;
using System.Diagnostics;
using WebDoorCreator.Core;
using WebDoorCreator.Data.DTO;
namespace WebDoorCreator.Data.Services
{
public class QueueDataService : IDisposable
{
#region Public Constructors
///
/// Init classe
///
///
///
///
public QueueDataService(IConfiguration configuration, IEmailSender emailSender, IConnectionMultiplexer redisConn)
{
Log.Info("QueueDataService starting...");
_configuration = configuration;
_emailSender = emailSender;
// setup componenti REDIS
var redConnString = _configuration.GetConnectionString("Redis");
if (redConnString == null)
{
Log.Error("REDIS ConnString empty!");
}
else
{
this.redisConn = ConnectionMultiplexer.Connect(redConnString);
redisDb = this.redisConn.GetDatabase();
}
// setup canali pub/sub
CalcReqPipe = new MessagePipe(redisConn, Constants.CALC_REQ_QUEUE);
CalcDonePipe = new MessagePipe(redisConn, Constants.CALC_DONE_QUEUE);
// json serializer... FIX errore loop circolare https://www.ryadel.com/en/jsonserializationexception-self-referencing-loop-detected-error-fix-entity-framework-asp-net-core/
JSSettings = new JsonSerializerSettings()
{
ReferenceLoopHandling = ReferenceLoopHandling.Ignore
};
// leggo conf speciali
vetoRemoveProcSec = _configuration.GetValue("RuntimeOpt:VetoRemoveProcessing");
Log.Info("QueueDataService started!");
}
#endregion Public Constructors
#region Public Properties
///
/// Message pipe esecuzione elaborazione CAM --> UI
///
public MessagePipe CalcDonePipe { get; set; } = null!;
///
/// Message pipe richieste elaborazione UI --> CAM
///
public MessagePipe CalcReqPipe { get; set; } = null!;
#endregion Public Properties
#region Public Methods
public void Dispose()
{
redisConn.Dispose();
}
///
/// Effettua pulizia cache dati calcolo porte
///
///
///
public async Task DoorCalcCacheCleanup()
{
/*----------------------------------
* Recupero Rev corrente porta o inizializza cercando
* - coda calcoli pending
* - coda errori
* - coda calcoli eseguiti
*
* se non trova riparte da 1
*
----------------------------------*/
bool fatto = false;
int numDayPre = 30;
if (!string.IsNullOrEmpty(_configuration.GetValue("RuntimeOpt:MaxDayCalcCache")))
{
numDayPre = _configuration.GetValue("RuntimeOpt:MaxDayCalcCache");
}
DateTime dtLimite = DateTime.Now.AddDays(-numDayPre);
// recupero elenco di tutti i refresh porte...
var refreshStatus = await DoorRefreshStatus();
// ciclo su tutti e cerco quelli + vecchi di x giorni
if (refreshStatus != null && refreshStatus.Count > 0)
{
DateTime dtCurr = DateTime.Now;
foreach (var item in refreshStatus)
{
// confronto data-ora della singola porta...
if (!string.IsNullOrEmpty(item.Key))
{
try
{
var rawDate = JsonConvert.DeserializeObject(item.Value);
// se esiste e SE è scaduto
if (rawDate > DateTime.MinValue && rawDate < dtLimite)
{
// cancello i dati!
await RequestProcessingRemove(item.Key);
await RequestErrRemove(item.Key);
await RequestDoneRemove(item.Key);
await DoorRefreshRemove(item.Key);
}
}
catch
{ }
}
}
}
await Task.Delay(1);
return fatto;
}
///
/// Restitusice gli errori della porta in oggetto
///
/// formato DoorId:Versione
///
public async Task DoorErrExists(string doorIdVers)
{
bool hasErr = false;
// cerco versione errore x la porta...
var doorData = doorIdVers.Split(":");
if (doorData.Length == 2)
{
RedisKey currErrKey = new RedisKey($"{Constants.CALC_REQ_ERRS}");
int currId = await RedHashGetInt(currErrKey, doorData[0]);
hasErr = doorData[1] == $"{currId}";
}
return hasErr;
}
///
/// Restitusice gli errori della porta in oggetto
///
/// formato DoorId:Versione
///
public async Task DoorErrGet(string doorIdVers)
{
RedisKey currErrKey = new RedisKey($"{Constants.CALC_REQ_ERRS}:{doorIdVers}");
RedisValue errVal = await redisDb.StringGetAsync(currErrKey);
return errVal.ToString();
}
///
/// Cerca nelle esecuzioni ultimo SVG della porta e lo restituisce...
///
///
///
public async Task DoorGetLastSvg(int DoorId)
{
string answ = "";
// cerco in hashtable dei task eseguiti ultimo x porta corrente...
var doorData = await RequestDoneGetSingle(DoorId);
if (doorData != null && doorData.Count > 0)
{
var firstRecord = doorData.FirstOrDefault();
// recupero da REDIS
var currSvgKey = new RedisKey($"{Constants.CALC_REQ_SVG_CACHE}:{firstRecord.Key}:{firstRecord.Value}");
var rawData = await redisDb.StringGetAsync(currSvgKey);
if (rawData.HasValue)
{
answ = $"{rawData}";
}
}
// se non trovata --> missing!
if (string.IsNullOrEmpty(answ))
{
// leggo missing standard...
string missingFilePath = Path.Combine(System.IO.Directory.GetCurrentDirectory(), "wwwroot/images", "MissingOrange.svg");
if (File.Exists(missingFilePath))
{
answ = File.ReadAllText(missingFilePath);
}
}
return answ;
}
///
/// Remove for single hash record
///
///
public async Task DoorRefreshRemove(string doorId)
{
RedisKey currKey = new RedisKey(Constants.LAST_DOOR_REFR_KEY);
bool fatto = await RedHashRemove(currKey, doorId);
return fatto;
}
///
/// Get Door (last) Refresh status
///
/// Dictionary of DoorId, saveVersNumb
public async Task> DoorRefreshStatus()
{
string source = "REDIS";
long numReq = 0;
Dictionary dictResult = new Dictionary();
// cerco da cache
RedisKey currKey = new RedisKey(Constants.LAST_DOOR_REFR_KEY);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
numReq = redisDb.HashLength(currKey);
if (numReq > 0)
{
var rawData = await redisDb.HashGetAllAsync(currKey);
foreach (var item in rawData)
{
dictResult.Add($"{item.Name}", $"{item.Value}");
}
}
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"DoorRefreshStatus | {source} in: {ts.TotalMilliseconds} ms");
return dictResult;
}
///
/// Upsert info ultimo update di una porta
///
/// Dictionary of DoorId, saveVersNumb
public async Task DoorRefreshUpsert(string doorId)
{
DateTime adesso = DateTime.Now;
string sAdesso = JsonConvert.SerializeObject(adesso, JSSettings);
RedisKey currKey = new RedisKey(Constants.LAST_DOOR_REFR_KEY);
long numReq = await RedHashUpsert(currKey, doorId, sAdesso);
return numReq;
}
///
/// Restitusice l'SVG della porta in oggetto
///
/// formato DoorId:Versione
///
public async Task DoorSvgGet(string doorIdVers)
{
RedisKey currSvgKey = new RedisKey($"{Constants.CALC_REQ_SVG_CACHE}:{doorIdVers}");
RedisValue svgVal = await redisDb.StringGetAsync(currSvgKey);
return svgVal.ToString();
}
///
/// Check if specified DoorId is in Template door list
///
/// Door Id to search
/// Found (true/false)
public async Task DoorTplListContainsDoor(string DoorId)
{
bool found = false;
RedisKey currKey = new RedisKey(Constants.DOOR_TPL_LIST);
var rawVal = await RedHashGetString(currKey, DoorId);
found = !string.IsNullOrEmpty(rawVal) && (rawVal == DoorId);
return found;
}
///
/// Add specified DoorIdList to Template door list
///
/// List to add
/// Num of items in list
public async Task DoorTplListUpsert(List DoorIdList)
{
RedisKey currKey = new RedisKey(Constants.DOOR_TPL_LIST);
long numReq = 0;
foreach (var item in DoorIdList)
{
numReq = await RedHashUpsert(currKey, item, item);
}
return numReq;
}
public async Task FlushRedisCache()
{
await Task.Delay(1);
RedisValue pattern = new RedisValue($"{Constants.BASE_HASH}:Cache*");
bool answ = await ExecFlushRedisPattern(pattern);
return answ;
}
///
/// Get # of calculation request done
///
public async Task NumRequestDone()
{
long numReq = 0;
string source = "REDIS";
Dictionary dictResult = new Dictionary();
// cerco da cache
RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
numReq = await redisDb.HashLengthAsync(currKey);
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"NumRequestDone | {source} in: {ts.TotalMilliseconds} ms");
return numReq;
}
///
/// Get # of calculation request with errors
///
public async Task NumRequestErrors()
{
long numReq = 0;
string source = "REDIS";
Dictionary dictResult = new Dictionary();
// cerco da cache
RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
numReq = await redisDb.HashLengthAsync(currKey);
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"NumRequestErrors | {source} in: {ts.TotalMilliseconds} ms");
return numReq;
}
///
/// Get # of calculation request pending
///
public async Task NumRequestPending()
{
long numReq = 0;
string source = "REDIS";
Dictionary dictResult = new Dictionary();
// cerco da cache
RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
numReq = await redisDb.HashLengthAsync(currKey);
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"NumRequestPending | {source} in: {ts.TotalMilliseconds} ms");
return numReq;
}
///
/// Get # of calculation request processing
///
public async Task NumRequestProcessing()
{
long numReq = 0;
string source = "REDIS";
Dictionary dictResult = new Dictionary();
// cerco da cache
RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
numReq = await redisDb.HashLengthAsync(currKey);
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"NumRequestProcessing | {source} in: {ts.TotalMilliseconds} ms");
return numReq;
}
///
/// Get single hash record
///
/// Redis Key for Hashlist
/// Requested key on list
/// Value as Int
public async Task RedHashGetInt(RedisKey currKey, string chiave)
{
int result = 0;
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
var hasVal = await redisDb.HashExistsAsync(currKey, chiave);
if (hasVal)
{
var rawRes = await redisDb.HashGetAsync(currKey, chiave);
if (rawRes.HasValue)
{
int.TryParse($"{rawRes}", out result);
}
}
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Trace($"RedHashGetInt | {currKey} | in: {ts.TotalMilliseconds} ms");
return result;
}
///
/// Get single hash record
///
/// Redis Key for Hashlist
/// Requested key on list
/// Value as string
public async Task RedHashGetString(RedisKey currKey, string chiave)
{
string result = "";
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
var hasVal = await redisDb.HashExistsAsync(currKey, chiave);
if (hasVal)
{
var rawRes = await redisDb.HashGetAsync(currKey, chiave);
if (rawRes.HasValue)
{
result = $"{rawRes}";
}
}
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Trace($"RedHashGetString | {currKey} | in: {ts.TotalMilliseconds} ms");
return result;
}
///
/// Remove for single hash record
///
/// Chiave redis della Hashlist
/// Chiave nella HashList
/// Esito rimozione
public async Task RedHashRemove(RedisKey currKey, string chiave)
{
bool fatto = false;
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
fatto = await redisDb.HashDeleteAsync(currKey, chiave);
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Trace($"RedHashRemove | {currKey} | in: {ts.TotalMilliseconds} ms");
return fatto;
}
///
/// Restitusice gli errori della porta in oggetto
///
/// formato DoorId:Versione
///
public async Task RedisBulkDelHashByKey(int doorId)
{
await DoorRefreshRemove($"{doorId}");
await RequestDoneRemove($"{doorId}");
await RequestErrRemove($"{doorId}");
await RequestPendingRemove($"{doorId}");
await RequestProcessingRemove($"{doorId}");
return true;
}
///
/// Get Queue request done
///
/// Dictionary of DoorId, saveVersNumb
public async Task> RequestDone()
{
string source = "REDIS";
long numReq = 0;
Dictionary dictResult = new Dictionary();
// cerco da cache
RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
numReq = redisDb.HashLength(currKey);
if (numReq > 0)
{
var rawData = await redisDb.HashGetAllAsync(currKey);
foreach (var item in rawData)
{
dictResult.Add($"{item.Name}", $"{item.Value}");
}
}
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"RequestDone | {source} in: {ts.TotalMilliseconds} ms");
return dictResult;
}
///
/// Get LAST request done for DoorId
///
///
///
public async Task> RequestDoneGetSingle(int DoorId)
{
string source = "REDIS";
long numReq = 0;
Dictionary dictResult = new Dictionary();
// cerco da cache
RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
numReq = redisDb.HashLength(currKey);
if (numReq > 0)
{
var rawData = await redisDb.HashGetAsync(currKey, $"{DoorId}");
if (rawData.HasValue)
{
dictResult.Add($"{DoorId}", $"{rawData}");
}
}
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"RequestDoneGetSingle | {source} in: {ts.TotalMilliseconds} ms");
return dictResult;
}
///
/// Remove for single hash record
///
/// Dictionary of DoorId, saveVersNumb
public async Task RequestDoneRemove(string doorId)
{
RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE);
bool fatto = await RedHashRemove(currKey, doorId);
return fatto;
}
///
/// Upsert for single hash record
///
/// Dictionary of DoorId, saveVersNumb
public async Task RequestDoneUpsert(string doorId, string vers)
{
RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE);
long numReq = await RedHashUpsert(currKey, doorId, vers);
return numReq;
}
///
/// Get Queue request with errors
///
/// Dictionary of DoorId, saveVersNumb
public async Task> RequestErr()
{
string source = "REDIS";
long numReq = 0;
Dictionary dictResult = new Dictionary();
// cerco da cache
RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
numReq = redisDb.HashLength(currKey);
if (numReq > 0)
{
var rawData = await redisDb.HashGetAllAsync(currKey);
foreach (var item in rawData)
{
dictResult.Add($"{item.Name}", $"{item.Value}");
}
}
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"RequestErr | {source} in: {ts.TotalMilliseconds} ms");
return dictResult;
}
///
/// Rimuove hash record errori
///
/// Dictionary of DoorId, saveVersNumb
public async Task RequestErrRemove(string doorId)
{
RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS);
bool fatto = await RedHashRemove(currKey, doorId);
return fatto;
}
///
/// Upsert record errori
///
/// Dictionary of DoorId, saveVersNumb
public async Task RequestErrUpsert(string doorId, string vers)
{
RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS);
long numReq = await RedHashUpsert(currKey, doorId, vers);
return numReq;
}
///
/// Get Queue request pending
///
/// Dictionary of DoorId, saveVersNumb
public async Task> RequestPending()
{
string source = "REDIS";
long numReq = 0;
Dictionary dictResult = new Dictionary();
// cerco da cache
RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
numReq = redisDb.HashLength(currKey);
if (numReq > 0)
{
var rawData = await redisDb.HashGetAllAsync(currKey);
foreach (var item in rawData)
{
dictResult.Add($"{item.Name}", $"{item.Value}");
}
}
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"RequestPending | {source} in: {ts.TotalMilliseconds} ms");
return dictResult;
}
///
/// Remove for single hash record
///
/// Dictionary of DoorId, saveVersNumb
public async Task RequestPendingRemove(string doorId)
{
RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND);
bool fatto = await RedHashRemove(currKey, doorId);
return fatto;
}
///
/// Upsert for single hash record
///
/// Dictionary of DoorId, saveVersNumb
public async Task RequestPendingUpsert(string doorId, string vers)
{
RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND);
long numReq = await RedHashUpsert(currKey, doorId, vers);
return numReq;
}
///
/// Get Queue request processing
///
/// Dictionary of DoorId, saveVersNumb
public async Task> RequestProcessing()
{
string source = "REDIS";
long numReq = 0;
Dictionary dictResult = new Dictionary();
// cerco da cache
RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
numReq = redisDb.HashLength(currKey);
if (numReq > 0)
{
var rawData = await redisDb.HashGetAllAsync(currKey);
foreach (var item in rawData)
{
dictResult.Add($"{item.Name}", $"{item.Value}");
}
}
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"RequestProcessing | {source} in: {ts.TotalMilliseconds} ms");
return dictResult;
}
///
/// Remove for single hash record
///
/// Dictionary of DoorId, saveVersNumb
public async Task RequestProcessingRemove(string doorId)
{
RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC);
bool fatto = await RedHashRemove(currKey, doorId);
return fatto;
}
///
/// Upsert for single hash record in processing
///
/// Dictionary of DoorId, saveVersNumb
public async Task RequestProcessingUpsert(string doorId, string vers)
{
RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC);
long numReq = await RedHashUpsert(currKey, doorId, vers);
// aggiungo cache veto rimozione da coda processing
RedisKey currVetoKey = new RedisKey($"{Constants.CALC_REQ_PROC}:{doorId}");
await redisDb.StringSetAsync(currVetoKey, vers, TimeSpan.FromSeconds(vetoRemoveProcSec));
return numReq;
}
///
/// Reset to queue request all processing/processed data
///
/// Dictionary of DoorId, saveVersNumb
public async Task ResetQueue()
{
bool fatto = false;
await Task.Delay(1);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
// cerco le richieste processing
RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC);
var rawData = await redisDb.HashGetAllAsync(currKey);
foreach (var item in rawData)
{
await RequestPendingUpsert(item.Name!, item.Value!);
await RequestProcessingRemove(item.Name!);
fatto = true;
}
// cerco le richieste con errori
currKey = new RedisKey(Constants.CALC_REQ_ERRS);
rawData = await redisDb.HashGetAllAsync(currKey);
foreach (var item in rawData)
{
await RequestPendingUpsert(item.Name!, item.Value!);
await RequestErrRemove(item.Name!);
fatto = true;
}
// cerco le richieste processed
currKey = new RedisKey(Constants.CALC_REQ_DONE);
rawData = await redisDb.HashGetAllAsync(currKey);
foreach (var item in rawData)
{
await RequestPendingUpsert(item.Name!, item.Value!);
await RequestDoneRemove(item.Name!);
fatto = true;
}
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"ResetQueue | REDIS EXEC in: {ts.TotalMilliseconds} ms");
return fatto;
}
///
/// Reset to queue request for specified DoorIdList
///
/// Dictionary of DoorId, saveVersNumb
public async Task> ResetQueueByDoorList(List DoorIdList)
{
int num2proc = DoorIdList.Count;
List listDone = new List();
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
// cerco le richieste processing
RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC);
var rawData = await redisDb.HashGetAllAsync(currKey);
foreach (var item in rawData)
{
// se è nell'elenco...
if (DoorIdList.Contains(item.Name.ToString()))
{
await RequestPendingUpsert(item.Name!, item.Value!);
await RequestProcessingRemove(item.Name!);
listDone.Add(item.Name.ToString());
}
}
// cerco le richieste con errori
currKey = new RedisKey(Constants.CALC_REQ_ERRS);
rawData = await redisDb.HashGetAllAsync(currKey);
foreach (var item in rawData)
{
// se è nell'elenco...
if (DoorIdList.Contains(item.Name.ToString()))
{
await RequestPendingUpsert(item.Name!, item.Value!);
await RequestErrRemove(item.Name!);
listDone.Add(item.Name.ToString());
}
}
// cerco le richieste processed
currKey = new RedisKey(Constants.CALC_REQ_DONE);
rawData = await redisDb.HashGetAllAsync(currKey);
foreach (var item in rawData)
{
// se è nell'elenco...
if (DoorIdList.Contains(item.Name.ToString()))
{
await RequestPendingUpsert(item.Name!, item.Value!);
await RequestDoneRemove(item.Name!);
listDone.Add(item.Name.ToString());
}
}
#if false
// calcolo eventuali porte NON ancora richieste...
foreach (var item in listDone)
{
DoorIdList.Remove(item);
}
#endif
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"ResetQueueByDoorList | doors #: {num2proc} --> {DoorIdList.Count} | EXEC in: {ts.TotalMilliseconds} ms");
return DoorIdList;
}
///
/// Reset to queue request all processing stuck data (veto check)
///
/// Boolean, executed
public async Task ResetQueueProcessing()
{
bool fatto = false;
await Task.Delay(1);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
// cerco le richieste processing
RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC);
RedisKey currVetoKey = new RedisKey($"{Constants.CALC_REQ_PROC}:0");
var rawData = await redisDb.HashGetAllAsync(currKey);
foreach (var item in rawData)
{
// verifico SE siano senza veto rimozione...
currVetoKey = new RedisKey($"{Constants.CALC_REQ_PROC}:{item.Name}");
var redisVal = await redisDb.StringGetAsync(currVetoKey);
if (!redisVal.HasValue)
{
await RequestPendingUpsert(item.Name!, item.Value!);
await RequestProcessingRemove(item.Name!);
fatto = true;
}
}
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"ResetQueueProcessing | REDIS EXEC in: {ts.TotalMilliseconds} ms");
return fatto;
}
///
/// Salvo elenco risultati elaborazioni (modalità boolean di esecuzione corretta)
///
/// Risultati elaborazioni in formato CalcResultDTO
///
public async Task SaveProcessingResult(List calcResults)
{
bool answ = true;
if (calcResults != null && calcResults.Count > 0)
{
string sDoorId = "";
string sCurrVers = "";
bool doorIsTpl = false;
foreach (var calcTask in calcResults)
{
RedisKey currSvgKey = new RedisKey("");
var doorData = calcTask.DoorIdVers.Split(".");
sDoorId = doorData.Length > 0 ? doorData[0] : "";
sCurrVers = doorData.Length > 0 ? doorData[1] : "";
// verifico se sia template --> durata indefinita...
doorIsTpl = await DoorTplListContainsDoor(sDoorId);
// se valido salvo SVG...
if (calcTask.Validated)
{
// salvo in area REDIS
currSvgKey = new RedisKey($"{Constants.CALC_REQ_SVG_CACHE}:{sDoorId}:{sCurrVers}");
// se template --> no scadenza
if (doorIsTpl)
{
await redisDb.StringSetAsync(currSvgKey, calcTask.SvgGen);
}
else
{
await redisDb.StringSetAsync(currSvgKey, calcTask.SvgGen, WeekLongCache);
}
// elimino dalle code proc/err
await RequestProcessingRemove(sDoorId);
await RequestErrRemove(sDoorId);
// metto in coda done
await RequestDoneUpsert(sDoorId, sCurrVers);
}
// altrimenti salvo errore e metto in coda errori
else
{
// salvo in area REDIS
currSvgKey = new RedisKey($"{Constants.CALC_REQ_ERRS}:{sDoorId}:{sCurrVers}");
if (doorIsTpl)
{
await redisDb.StringSetAsync(currSvgKey, calcTask.ErrorMsg);
}
else
{
await redisDb.StringSetAsync(currSvgKey, calcTask.ErrorMsg, WeekLongCache);
}
// elimino dalle code proc/done
await RequestProcessingRemove(sDoorId);
await RequestDoneRemove(sDoorId);
// metto in coda err
await RequestErrUpsert(sDoorId, sCurrVers);
}
// invio il messaggio di ritorno...
CalcDonePipe.saveAndSendMessage(Constants.LAST_CALC_DONE_KEY, calcTask.DoorIdVers);
}
}
else
{
await Task.Delay(1);
}
return answ;
}
///
/// Invio richiesta di calcolo per la porta indicata, dato il suo DDF
///
///
/// Contenuto completo del DDF
/// indice/versione del calcolo DDF
public async Task SendCalcReq(int DoorId, string FullDDF)
{
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
string sDoorId = $"{DoorId}";
int currVers = await DoorCalcRev(sDoorId);
string sCurrVers = $"{currVers}";
// elimino da code errori (SE fosse presente)
await RequestErrRemove(sDoorId);
// salvo nell'archivio REDIS delle porte il DDF corrente (key=doorId.versNumb), potrebbe
// venire buono anche x eventuale UNDO...
RedisKey currDdfKey = new RedisKey($"{Constants.CALC_REQ_DDF_CACHE}:{sDoorId}:{sCurrVers}");
await redisDb.StringSetAsync(currDdfKey, FullDDF, WeekLongCache);
// invio sul canale dei messaggi il numero di items in coda attuali x chiedere esecuzione...
long numPending = await NumRequestPending();
bool answ = CalcReqPipe.saveAndSendMessage(Constants.LAST_CALC_REQ_KEY, $"{numPending}");
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"SendCalcReq | DoorId: {DoorId} enqueued in: {ts.TotalMilliseconds} ms");
return currVers;
}
///
/// Get Queue request pending, removing from queue and putting on processing queue
///
/// Dictionary of DoorId, saveVersNumb
public async Task> TakeProcessingItems(int numItems)
{
int maxTake = Math.Min(10, numItems);
long numReq = 0;
Dictionary dictResult = new Dictionary();
// cerco da cache
RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
// calcolo il totale delle richieste pending
numReq = redisDb.HashLength(currKey);
if (numReq > 0)
{
// per prima cosa aspetto che NON sia locked la coda...
while (queueLock)
{
Thread.Sleep(rnd.Next(1, 20));
}
// blocco
queueLock = true;
var rawData = await redisDb.HashGetAllAsync(currKey);
// sposto fino a concorrenza...
foreach (var item in rawData)
{
// per prima cosa tolgo da item richiesti e metto in processing
await RequestPendingRemove(item.Name!);
await RequestProcessingUpsert(item.Name!, item.Value!);
maxTake--;
// recupero il DDF...
RedisKey currDdfKey = new RedisKey($"{Constants.CALC_REQ_DDF_CACHE}:{item.Name}:{item.Value}");
var rawDDF = await redisDb.StringGetAsync(currDdfKey);
dictResult.Add($"{item.Name}.{item.Value}", $"{rawDDF}");
if (maxTake <= 0)
{
break;
}
}
// sblocco
queueLock = false;
}
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Debug($"TakeProcessingItems | REDIS in: {ts.TotalMilliseconds} ms");
return dictResult;
}
#endregion Public Methods
#region Protected Fields
protected const string rKeyCalcOreFase = "Check:OreFasi";
protected const string rKeyFasiAct = "Check:FasiAct";
protected const string rKeyProjAct = "Check:ProjAct";
///
/// TTL da 1 min x cache Redis
///
protected const int shortTTL = 60 * 5;
protected int idxSim = 0;
protected Random rnd = new Random();
#endregion Protected Fields
#region Protected Properties
protected static bool queueLock { get; set; } = false;
#endregion Protected Properties
#region Protected Methods
///
/// Recupera revisione corrente della porta o inizializza
///
///
///
protected async Task DoorCalcRev(string sDoorId)
{
/*----------------------------------
* Recupero Rev corrente porta o inizializza cercando
* - coda calcoli pending
* - coda errori
* - coda calcoli eseguiti
*
* se non trova riparte da 1
*
----------------------------------*/
int currVers = 1;
string sCurrVers = "";
// per prima cosa controllo se ho GIA' in coda qualcosa come richieste
if (await NumRequestPending() > 0)
{
var currPending = await RequestPending();
if (currPending != null)
{
if (currPending.ContainsKey(sDoorId))
{
sCurrVers = currPending[sDoorId];
int.TryParse(sCurrVers, out currVers);
currVers++;
}
}
}
// cerco coda errori
if (await NumRequestErrors() > 0)
{
var currErrors = await RequestErr();
if (currErrors != null)
{
if (currErrors.ContainsKey(sDoorId))
{
sCurrVers = currErrors[sDoorId];
int.TryParse(sCurrVers, out currVers);
currVers++;
}
}
}
// cerco coda task completati
if (await NumRequestDone() > 0)
{
var currDone = await RequestDone();
if (currDone != null)
{
if (currDone.ContainsKey(sDoorId))
{
sCurrVers = currDone[sDoorId];
int.TryParse(sCurrVers, out currVers);
currVers++;
}
}
}
// inserisco in coda x calcoli
await RequestPendingUpsert(sDoorId, $"{currVers}");
// metto in coda "last calc" la porta corrente x pulizia successiva
await DoorRefreshUpsert(sDoorId);
return currVers;
}
///
/// Recupero chiave da redis
///
///
///
protected async Task getRSV(string rKey)
{
string answ = "";
var rawData = await redisDb.StringGetAsync(rKey);
if (rawData.HasValue)
{
answ = $"{rawData}";
}
return answ;
}
///
/// Effettua upsert in HasList redis
///
/// Chiave redis della Hashlist
/// Chiave nella HashList
/// Valore da salvare
/// Num record nella HashList
protected async Task RedHashUpsert(RedisKey currKey, string chiave, string valore)
{
long numReq = 0;
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
await redisDb.HashSetAsync(currKey, chiave, valore);
numReq = await redisDb.HashLengthAsync(currKey);
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
Log.Trace($"RedHashUpsert | {currKey} | in: {ts.TotalMilliseconds} ms");
return numReq;
}
///
/// Salvataggio chiave in redis
///
///
///
///
///
protected async Task setRSV(string rKey, string rVal, int ttlSec)
{
bool fatto = false;
await redisDb.StringSetAsync(rKey, rVal, TimeSpan.FromSeconds(ttlSec));
fatto = true;
return fatto;
}
///
/// Salvataggio chiave in redis
///
///
///
///
///
protected async Task setRSV(string rKey, int rValInt, int ttlSec)
{
bool fatto = false;
await redisDb.StringSetAsync(rKey, rValInt, TimeSpan.FromSeconds(ttlSec));
fatto = true;
return fatto;
}
///
/// Registra in cache chiave se non fosse già in elenco
///
///
protected void trackCache(string newKey)
{
if (!cachedDataList.Contains(newKey))
{
cachedDataList.Add(newKey);
}
}
#endregion Protected Methods
#region Private Fields
private static IConfiguration _configuration = null!;
private static JsonSerializerSettings? JSSettings;
private static Logger Log = LogManager.GetCurrentClassLogger();
private readonly IEmailSender _emailSender;
///
/// Elenco obj in cache
///
private List cachedDataList = new List();
///
/// Durata cache lunga IN SECONDI
///
private int cacheTtlLong = 60 * 5;
///
/// Durata cache breve IN SECONDI
///
private int cacheTtlShort = 60 * 1;
///
/// Oggetto per connessione a REDIS
///
private ConnectionMultiplexer redisConn = null!;
///
/// Oggetto DB redis da impiegare x chiamate R/W
///
private IDatabase redisDb = null!;
#endregion Private Fields
#region Private Properties
///
/// Durata cache di 24h
///
private TimeSpan DayLongCache
{
get => TimeSpan.FromHours(24);
}
///
/// Durata cache lunga (+ perturbazione percentuale +/-10%)
///
private TimeSpan FastCache
{
get => TimeSpan.FromSeconds(cacheTtlShort * rnd.Next(900, 1100) / 1000);
}
///
/// Durata cache lunga (+ perturbazione percentuale +/-10%)
///
private TimeSpan LongCache
{
get => TimeSpan.FromSeconds(cacheTtlLong * rnd.Next(900, 1100) / 1000);
}
///
/// Durata cache di 24h
///
private TimeSpan MonthLongCache
{
get => TimeSpan.FromDays(30);
}
///
/// Durata cache lunga (+ perturbazione percentuale +/-10%)
///
private TimeSpan UltraLongCache
{
get => TimeSpan.FromSeconds(cacheTtlLong * 10 * rnd.Next(900, 1100) / 1000);
}
private int vetoRemoveProcSec { get; set; } = 10;
///
/// Durata cache di 1 settimana
///
private TimeSpan WeekLongCache
{
get => TimeSpan.FromHours(24 * 7 * rnd.Next(1000, 1100) / 1000);
}
#endregion Private Properties
#region Private Methods
///
/// Esegue flush memoria redis dato pattern
///
///
///
private async Task ExecFlushRedisPattern(RedisValue pattern)
{
bool answ = false;
var listEndpoints = redisConn.GetEndPoints();
foreach (var endPoint in listEndpoints)
{
//var server = redisConnAdmin.GetServer(listEndpoints[0]);
var server = redisConn.GetServer(endPoint);
if (server != null)
{
var keyList = server.Keys(redisDb.Database, pattern);
foreach (var item in keyList)
{
await redisDb.KeyDeleteAsync(item);
}
// brutalmente rimuovo intero contenuto DB... DANGER
//await server.FlushDatabaseAsync();
answ = true;
}
}
return answ;
}
#endregion Private Methods
}
}