using Microsoft.AspNetCore.Identity.UI.Services;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using NLog;
using NLog.LayoutRenderers.Wrappers;
using StackExchange.Redis;
using System.Diagnostics;
using WebDoorCreator.Core;
using WebDoorCreator.Data.DTO;
namespace WebDoorCreator.Data.Services
{
public class QueueDataService : IDisposable
{
#region Public Constructors
/// <summary>
/// Initializes the service: stores the injected dependencies, opens the Redis
/// connection from the "Redis" connection string, creates the request/done message
/// pipes, and loads the runtime options and the "missing image" SVG.
/// </summary>
/// <param name="configuration">Application configuration (connection string and "RuntimeOpt:*" values).</param>
/// <param name="emailSender">Email sender, stored in a field.</param>
/// <param name="redisConn">Injected Redis multiplexer; handed to the two message pipes.</param>
public QueueDataService(IConfiguration configuration, IEmailSender emailSender, IConnectionMultiplexer redisConn)
{
Log.Info("QueueDataService starting...");
_configuration = configuration;
_emailSender = emailSender;
// set up the Redis components
var redConnString = _configuration.GetConnectionString("Redis");
if (redConnString == null)
{
// NOTE(review): on this path the redisConn/redisDb fields are not assigned here,
// so later calls that use them would presumably fail - confirm against the field declarations.
Log.Error("REDIS ConnString empty!");
}
else
{
// NOTE(review): a new multiplexer is opened here; the injected one is not used for redisDb.
this.redisConn = ConnectionMultiplexer.Connect(redConnString);
redisDb = this.redisConn.GetDatabase();
}
// set up the pub/sub channels
// NOTE(review): inside this constructor the bare name "redisConn" is the parameter
// (it shadows the field), so the pipes use the injected multiplexer.
CalcReqPipe = new MessagePipe(redisConn, Constants.CALC_REQ_QUEUE);
CalcDonePipe = new MessagePipe(redisConn, Constants.CALC_DONE_QUEUE);
// JSON serializer settings: avoids the self-referencing loop error, see https://www.ryadel.com/en/jsonserializationexception-self-referencing-loop-detected-error-fix-entity-framework-asp-net-core/
JSSettings = new JsonSerializerSettings()
{
ReferenceLoopHandling = ReferenceLoopHandling.Ignore
};
// read the special runtime options
// NOTE(review): the GetValue calls show no type argument in this listing - verify against the real file.
vetoRemoveProcSec = _configuration.GetValue("RuntimeOpt:VetoRemoveProcessing");
logTimingEnable = configuration.GetValue("RuntimeOpt:LogTimingEnable");
statSampleSize = configuration.GetValue("RuntimeOpt:StatSampleSize");
// preload the placeholder SVG used when a door image is missing
missingFilePath = Path.Combine(Directory.GetCurrentDirectory(), "wwwroot/images", "MissingOrange.svg");
if (File.Exists(missingFilePath))
{
missingSvgContent = File.ReadAllText(missingFilePath);
}
Log.Info("QueueDataService started!");
}
#endregion Public Constructors
#region Public Properties
/// <summary>
/// Message pipe for completed calculations (CAM --> UI); created in the constructor
/// on <c>Constants.CALC_DONE_QUEUE</c>.
/// </summary>
public MessagePipe CalcDonePipe { get; set; } = null!;
/// <summary>
/// Message pipe for calculation requests (UI --> CAM); created in the constructor
/// on <c>Constants.CALC_REQ_QUEUE</c>.
/// </summary>
public MessagePipe CalcReqPipe { get; set; } = null!;
#endregion Public Properties
#region Public Methods
/// <summary>
/// Releases the Redis connection owned by this service.
/// </summary>
public void Dispose()
{
    // The connection is only created when a "Redis" connection string exists,
    // so it may never have been assigned: Dispose must not throw in that case.
    redisConn?.Dispose();
    GC.SuppressFinalize(this);
}
/// <summary>
/// Purges the queue records of every door whose last refresh timestamp is older than
/// the retention window ("RuntimeOpt:MaxDayCalcCache" days, default 30).
/// </summary>
/// <returns><c>true</c> when at least one door was purged; otherwise <c>false</c>.</returns>
public async Task<bool> DoorCalcCacheCleanup()
{
    bool fatto = false;

    // Retention window: the configured value is used only when the setting is present.
    int numDayPre = 30;
    if (!string.IsNullOrEmpty(_configuration.GetValue<string>("RuntimeOpt:MaxDayCalcCache")))
    {
        numDayPre = _configuration.GetValue<int>("RuntimeOpt:MaxDayCalcCache");
    }
    // Local time on purpose: DoorRefreshUpsert stores DateTime.Now.
    DateTime dtLimite = DateTime.Now.AddDays(-numDayPre);

    // Last-refresh timestamp of every known door (door id -> JSON date).
    var refreshStatus = await DoorRefreshStatus();
    if (refreshStatus == null)
    {
        return fatto;
    }

    foreach (var item in refreshStatus)
    {
        if (string.IsNullOrEmpty(item.Key))
        {
            continue;
        }
        try
        {
            DateTime rawDate = JsonConvert.DeserializeObject<DateTime>(item.Value);
            // Purge only doors with a real timestamp that has expired.
            if (rawDate > DateTime.MinValue && rawDate < dtLimite)
            {
                await RequestProcessingRemove(item.Key);
                await RequestErrRemove(item.Key);
                await RequestDoneRemove(item.Key);
                await DoorRefreshRemove(item.Key);
                fatto = true;
            }
        }
        catch (Exception ex)
        {
            // Best effort: one bad record must not stop the cleanup, but it must be visible.
            Log.Error(ex, $"DoorCalcCacheCleanup | cleanup failed for door {item.Key}");
        }
    }
    return fatto;
}
/// <summary>
/// Checks whether the given door version is the one currently recorded in the error hash.
/// </summary>
/// <param name="doorIdVers">Door reference in the form "DoorId:Version".</param>
/// <returns>
/// <c>true</c> when the version stored in the error hash for the door equals the requested
/// version; <c>false</c> otherwise, including when the argument is not in "DoorId:Version" form.
/// </returns>
public async Task<bool> DoorErrExists(string doorIdVers)
{
    string[] parts = doorIdVers.Split(":");
    if (parts.Length != 2)
    {
        return false;
    }
    RedisKey errHashKey = new RedisKey($"{Constants.CALC_REQ_ERRS}");
    // RedHashGetInt yields 0 when the door has no entry in the hash.
    int storedVers = await RedHashGetInt(errHashKey, parts[0]);
    return parts[1] == $"{storedVers}";
}
/// <summary>
/// Reads the error message stored for the given door version.
/// </summary>
/// <param name="doorIdVers">Door reference in the form "DoorId:Version".</param>
/// <returns>The string form of the Redis value found under the error key for that door version.</returns>
public async Task<string> DoorErrGet(string doorIdVers)
{
    var errKey = new RedisKey($"{Constants.CALC_REQ_ERRS}:{doorIdVers}");
    RedisValue stored = await redisDb.StringGetAsync(errKey);
    return stored.ToString();
}
/// <summary>
/// Returns the cached SVG of the last completed calculation of a door.
/// </summary>
/// <param name="DoorId">Door identifier.</param>
/// <returns>The SVG content, or an empty string when the door has no completed calculation or no cached SVG.</returns>
public async Task<string> DoorGetLastSvg(int DoorId)
{
    // The "done" hash holds, per door, the version of its last completed calculation.
    var doorData = await RequestDoneGetSingle(DoorId);
    if (doorData == null || doorData.Count == 0)
    {
        return "";
    }
    var lastDone = doorData.First();
    var svgKey = new RedisKey($"{Constants.CALC_REQ_SVG_CACHE}:{lastDone.Key}:{lastDone.Value}");
    RedisValue svg = await redisDb.StringGetAsync(svgKey);
    return svg.HasValue ? $"{svg}" : "";
}
/// <summary>
/// Returns the placeholder SVG used when a door image is missing, loading it from
/// "wwwroot/images/MissingOrange.svg" when it has not been loaded yet.
/// </summary>
/// <returns>The placeholder SVG content; the (null or empty) cached value when the file does not exist.</returns>
public string DoorGetMissingSvg()
{
    // Already loaded: serve the cached content.
    if (!string.IsNullOrEmpty(missingSvgContent))
    {
        return missingSvgContent;
    }
    // Not loaded yet (or empty): retry from disk.
    missingFilePath = Path.Combine(Directory.GetCurrentDirectory(), "wwwroot/images", "MissingOrange.svg");
    if (!File.Exists(missingFilePath))
    {
        return missingSvgContent;
    }
    missingSvgContent = File.ReadAllText(missingFilePath);
    return missingSvgContent;
}
/// <summary>
/// Reads the processing veto currently set for a door.
/// </summary>
/// <param name="DoorId">Door identifier.</param>
/// <returns>The stored veto value, or an empty string when no veto key exists for the door.</returns>
public async Task<string> DoorProcVetoGetAsync(int DoorId)
{
    var vetoKey = new RedisKey($"{Constants.CALC_REQ_VETO_REC}:{DoorId}");
    RedisValue veto = await redisDb.StringGetAsync(vetoKey);
    return veto.HasValue ? $"{veto}" : "";
}
/// <summary>
/// Sets a processing veto for a door that expires after the given time.
/// </summary>
/// <param name="DoorId">Identifier of the door to veto.</param>
/// <param name="msVeto">Veto duration in milliseconds.</param>
public async Task DoorProcVetoSetAsync(int DoorId, int msVeto)
{
    // The key carries its own expiry, so the veto disappears without any cleanup.
    TimeSpan vetoTtl = TimeSpan.FromMilliseconds(msVeto);
    await redisDb.StringSetAsync(new RedisKey($"{Constants.CALC_REQ_VETO_REC}:{DoorId}"), $"{DoorId}", vetoTtl);
}
/// <summary>
/// Removes a door from the "last refresh" hash.
/// </summary>
/// <param name="doorId">Door identifier (hash field).</param>
/// <returns>The result of the hash-field removal.</returns>
public async Task<bool> DoorRefreshRemove(string doorId)
{
    return await RedHashRemove(new RedisKey(Constants.LAST_DOOR_REFR_KEY), doorId);
}
/// <summary>
/// Reads the whole "last refresh" hash.
/// </summary>
/// <returns>Dictionary of door id -> stored refresh value; empty when the hash has no entries.</returns>
public async Task<Dictionary<string, string>> DoorRefreshStatus()
{
    RedisKey currKey = new RedisKey(Constants.LAST_DOOR_REFR_KEY);
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    // A missing or empty hash yields an empty array, so no separate (blocking) length check is needed.
    HashEntry[] rawData = await redisDb.HashGetAllAsync(currKey);
    Dictionary<string, string> dictResult = rawData.ToDictionary(e => $"{e.Name}", e => $"{e.Value}");

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("DoorRefreshStatus", sw.Elapsed, 1, 0.3);
    }
    return dictResult;
}
/// <summary>
/// Records "now" as the last refresh time of a door.
/// </summary>
/// <param name="doorId">Door identifier (hash field).</param>
/// <returns>Number of entries in the "last refresh" hash after the upsert.</returns>
public async Task<long> DoorRefreshUpsert(string doorId)
{
    // Stored as a JSON date; DoorCalcCacheCleanup deserializes it back.
    string stamp = JsonConvert.SerializeObject(DateTime.Now, JSSettings);
    return await RedHashUpsert(new RedisKey(Constants.LAST_DOOR_REFR_KEY), doorId, stamp);
}
/// <summary>
/// Reads the cached SVG of the given door version.
/// </summary>
/// <param name="doorIdVers">Door reference in the form "DoorId:Version".</param>
/// <returns>The string form of the Redis value found under the SVG cache key for that door version.</returns>
public async Task<string> DoorSvgGet(string doorIdVers)
{
    var svgKey = new RedisKey($"{Constants.CALC_REQ_SVG_CACHE}:{doorIdVers}");
    RedisValue stored = await redisDb.StringGetAsync(svgKey);
    return stored.ToString();
}
/// <summary>
/// Checks whether the given door id is in the template door list.
/// </summary>
/// <param name="DoorId">Door id to search.</param>
/// <returns><c>true</c> when the list holds an entry for the id whose value equals the id itself.</returns>
public async Task<bool> DoorTplListContainsDoor(string DoorId)
{
    // Template doors are stored as field == value (see DoorTplListUpsert).
    string stored = await RedHashGetString(new RedisKey(Constants.DOOR_TPL_LIST), DoorId, true);
    return !string.IsNullOrEmpty(stored) && stored == DoorId;
}
/// <summary>
/// Adds the given door ids to the template door list.
/// </summary>
/// <param name="DoorIdList">Door ids to add.</param>
/// <returns>Number of entries in the template list after the last upsert; 0 when the input list is empty.</returns>
public async Task<long> DoorTplListUpsert(List<string> DoorIdList)
{
    RedisKey tplKey = new RedisKey(Constants.DOOR_TPL_LIST);
    long total = 0;
    foreach (string doorId in DoorIdList)
    {
        // Each id is stored as field == value.
        total = await RedHashUpsert(tplKey, doorId, doorId);
    }
    return total;
}
/// <summary>
/// Flushes the Redis keys matching "{BASE_HASH}:Cache*".
/// </summary>
/// <returns>The result reported by the pattern flush.</returns>
public async Task<bool> FlushRedisCache()
{
    RedisValue pattern = new RedisValue($"{Constants.BASE_HASH}:Cache*");
    return await ExecFlushRedisPattern(pattern);
}
/// <summary>
/// Gets the number of completed calculation requests.
/// </summary>
/// <returns>Number of entries in the "done" hash.</returns>
public async Task<long> NumRequestDone()
{
    RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE);
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;
    long numReq = await redisDb.HashLengthAsync(currKey);
    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("NumRequestDone", sw.Elapsed, 2, 10);
    }
    return numReq;
}
/// <summary>
/// Gets the number of calculation requests that ended with errors.
/// </summary>
/// <returns>Number of entries in the "errors" hash.</returns>
public async Task<long> NumRequestErrors()
{
    RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS);
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;
    long numReq = await redisDb.HashLengthAsync(currKey);
    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("NumRequestErrors", sw.Elapsed, 2, 10);
    }
    return numReq;
}
/// <summary>
/// Gets the number of pending calculation requests.
/// </summary>
/// <returns>Number of entries in the "pending" hash.</returns>
public async Task<long> NumRequestPending()
{
    RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND);
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;
    long numReq = await redisDb.HashLengthAsync(currKey);
    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("NumRequestPending", sw.Elapsed, 2, 10);
    }
    return numReq;
}
/// <summary>
/// Gets the number of calculation requests currently being processed.
/// </summary>
/// <returns>Number of entries in the "processing" hash.</returns>
public async Task<long> NumRequestProcessing()
{
    RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC);
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;
    long numReq = await redisDb.HashLengthAsync(currKey);
    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("NumRequestProcessing", sw.Elapsed, 2, 10);
    }
    return numReq;
}
/// <summary>
/// Reads a single hash field as an integer.
/// </summary>
/// <param name="currKey">Redis key of the hash.</param>
/// <param name="chiave">Field to read.</param>
/// <returns>The field value as int; 0 when the field is missing or is not a valid integer.</returns>
public async Task<int> RedHashGetInt(RedisKey currKey, string chiave)
{
    int result = 0;
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    // One round trip: a missing key/field comes back as a value with HasValue == false,
    // so a separate existence check is not needed.
    RedisValue rawRes = await redisDb.HashGetAsync(currKey, chiave);
    if (rawRes.HasValue && !int.TryParse($"{rawRes}", out result))
    {
        result = 0;
    }

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("RedHashGetInt", sw.Elapsed, 0, 0.3);
    }
    return result;
}
/// <summary>
/// Reads a single hash field as a string.
/// </summary>
/// <param name="currKey">Redis key of the hash.</param>
/// <param name="chiave">Field to read.</param>
/// <param name="doTimeLog">When <c>true</c> (and timing is enabled) the call is timed and logged.</param>
/// <returns>The field value, or an empty string when the field is missing.</returns>
public async Task<string> RedHashGetString(RedisKey currKey, string chiave, bool doTimeLog)
{
    Stopwatch? sw = (logTimingEnable && doTimeLog) ? Stopwatch.StartNew() : null;

    // One round trip: a missing key/field comes back as a value with HasValue == false,
    // so a separate existence check is not needed.
    RedisValue rawRes = await redisDb.HashGetAsync(currKey, chiave);
    string result = rawRes.HasValue ? $"{rawRes}" : "";

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("RedHashGetString", sw.Elapsed, 0, 0.3);
    }
    return result;
}
/// <summary>
/// Removes a single field from a hash.
/// </summary>
/// <param name="currKey">Redis key of the hash.</param>
/// <param name="chiave">Field to remove.</param>
/// <returns>The result reported by Redis for the field deletion.</returns>
public async Task<bool> RedHashRemove(RedisKey currKey, string chiave)
{
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;
    bool removed = await redisDb.HashDeleteAsync(currKey, chiave);
    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("RedHashRemove", sw.Elapsed, 0, 0.3);
    }
    return removed;
}
/// <summary>
/// Inserts or updates a single field of a hash.
/// </summary>
/// <param name="currKey">Redis key of the hash.</param>
/// <param name="chiave">Field to write.</param>
/// <param name="valore">Value to store.</param>
/// <param name="doTimeLog">When <c>true</c> (and timing is enabled) the call is timed and logged.</param>
/// <returns>Number of entries in the hash after the write.</returns>
public async Task<long> RedHashUpsert(RedisKey currKey, string chiave, string valore, bool doTimeLog = true)
{
    Stopwatch? sw = (logTimingEnable && doTimeLog) ? Stopwatch.StartNew() : null;

    await redisDb.HashSetAsync(currKey, chiave, valore);
    long numReq = await redisDb.HashLengthAsync(currKey);

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("RedHashUpsert", sw.Elapsed, 0, 0.3);
    }
    return numReq;
}
/// <summary>
/// Removes a door from all queue hashes: last refresh, done, errors, type, pending and processing.
/// </summary>
/// <param name="doorId">Door identifier.</param>
/// <returns>Always <c>true</c>.</returns>
public async Task<bool> RedisBulkDelHashByKey(int doorId)
{
    string field = $"{doorId}";
    await DoorRefreshRemove(field);
    await RequestDoneRemove(field);
    await RequestErrRemove(field);
    await RequestTypeRemove(field);
    await RequestPendingRemove(field);
    await RequestProcessingRemove(field);
    return true;
}
/// <summary>
/// Reads the whole "done" hash.
/// </summary>
/// <returns>Dictionary of door id -> version; empty when the hash has no entries.</returns>
public async Task<Dictionary<string, string>> RequestDone()
{
    RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE);
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    // A missing or empty hash yields an empty array, so no separate (blocking) length check is needed.
    HashEntry[] rawData = await redisDb.HashGetAllAsync(currKey);
    Dictionary<string, string> dictResult = rawData.ToDictionary(e => $"{e.Name}", e => $"{e.Value}");

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("RequestDone", sw.Elapsed, 1, 10);
    }
    return dictResult;
}
/// <summary>
/// Reads the "done" entry of a single door.
/// </summary>
/// <param name="DoorId">Door identifier.</param>
/// <returns>Dictionary holding the single pair door id -> version, or empty when the door has no entry.</returns>
public async Task<Dictionary<string, string>> RequestDoneGetSingle(int DoorId)
{
    var dictResult = new Dictionary<string, string>();
    RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE);
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    // A missing key/field comes back with HasValue == false, so no separate
    // (blocking) length check is needed.
    RedisValue rawData = await redisDb.HashGetAsync(currKey, $"{DoorId}");
    if (rawData.HasValue)
    {
        dictResult.Add($"{DoorId}", $"{rawData}");
    }

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("RequestDoneGetSingle", sw.Elapsed, 1, 0.3);
    }
    return dictResult;
}
/// <summary>
/// Removes a door from the "done" hash.
/// </summary>
/// <param name="doorId">Door identifier (hash field).</param>
/// <returns>The result of the hash-field removal.</returns>
public async Task<bool> RequestDoneRemove(string doorId)
{
    return await RedHashRemove(new RedisKey(Constants.CALC_REQ_DONE), doorId);
}
/// <summary>
/// Inserts or updates a door in the "done" hash.
/// </summary>
/// <param name="doorId">Door identifier (hash field).</param>
/// <param name="vers">Version to store.</param>
/// <returns>Number of entries in the "done" hash after the write.</returns>
public async Task<long> RequestDoneUpsert(string doorId, string vers)
{
    return await RedHashUpsert(new RedisKey(Constants.CALC_REQ_DONE), doorId, vers);
}
/// <summary>
/// Reads the whole "errors" hash.
/// </summary>
/// <returns>Dictionary of door id -> version; empty when the hash has no entries.</returns>
public async Task<Dictionary<string, string>> RequestErr()
{
    RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS);
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    // A missing or empty hash yields an empty array, so no separate (blocking) length check is needed.
    HashEntry[] rawData = await redisDb.HashGetAllAsync(currKey);
    Dictionary<string, string> dictResult = rawData.ToDictionary(e => $"{e.Name}", e => $"{e.Value}");

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("RequestErr", sw.Elapsed, 1, 10);
    }
    return dictResult;
}
/// <summary>
/// Removes a door from the "errors" hash.
/// </summary>
/// <param name="doorId">Door identifier (hash field).</param>
/// <returns>The result of the hash-field removal.</returns>
public async Task<bool> RequestErrRemove(string doorId)
{
    return await RedHashRemove(new RedisKey(Constants.CALC_REQ_ERRS), doorId);
}
/// <summary>
/// Inserts or updates a door in the "errors" hash.
/// </summary>
/// <param name="doorId">Door identifier (hash field).</param>
/// <param name="vers">Version to store.</param>
/// <returns>Number of entries in the "errors" hash after the write.</returns>
public async Task<long> RequestErrUpsert(string doorId, string vers)
{
    return await RedHashUpsert(new RedisKey(Constants.CALC_REQ_ERRS), doorId, vers);
}
/// <summary>
/// Reads the whole "pending" hash.
/// </summary>
/// <returns>Dictionary of door id -> version; empty when the hash has no entries.</returns>
public async Task<Dictionary<string, string>> RequestPending()
{
    RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND);
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    // A missing or empty hash yields an empty array, so no separate (blocking) length check is needed.
    HashEntry[] rawData = await redisDb.HashGetAllAsync(currKey);
    Dictionary<string, string> dictResult = rawData.ToDictionary(e => $"{e.Name}", e => $"{e.Value}");

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("RequestPending", sw.Elapsed, 1, 10);
    }
    return dictResult;
}
/// <summary>
/// Removes a door from the "pending" hash.
/// </summary>
/// <param name="doorId">Door identifier (hash field).</param>
/// <returns>The result of the hash-field removal.</returns>
public async Task<bool> RequestPendingRemove(string doorId)
{
    return await RedHashRemove(new RedisKey(Constants.CALC_REQ_PEND), doorId);
}
/// <summary>
/// Inserts or updates a door in the "pending" hash.
/// </summary>
/// <param name="doorId">Door identifier (hash field).</param>
/// <param name="vers">Version to store.</param>
/// <returns>Number of entries in the "pending" hash after the write.</returns>
public async Task<long> RequestPendingUpsert(string doorId, string vers)
{
    return await RedHashUpsert(new RedisKey(Constants.CALC_REQ_PEND), doorId, vers);
}
/// <summary>
/// Reads the whole "processing" hash.
/// </summary>
/// <returns>Dictionary of door id -> version; empty when the hash has no entries.</returns>
public async Task<Dictionary<string, string>> RequestProcessing()
{
    RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC);
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    // A missing or empty hash yields an empty array, so no separate (blocking) length check is needed.
    HashEntry[] rawData = await redisDb.HashGetAllAsync(currKey);
    Dictionary<string, string> dictResult = rawData.ToDictionary(e => $"{e.Name}", e => $"{e.Value}");

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("RequestProcessing", sw.Elapsed, 1, 10);
    }
    return dictResult;
}
/// <summary>
/// Removes a door from the "processing" hash.
/// </summary>
/// <param name="doorId">Door identifier (hash field).</param>
/// <returns>The result of the hash-field removal.</returns>
public async Task<bool> RequestProcessingRemove(string doorId)
{
    return await RedHashRemove(new RedisKey(Constants.CALC_REQ_PROC), doorId);
}
/// <summary>
/// Inserts or updates a door in the "processing" hash and sets its removal veto.
/// </summary>
/// <param name="doorId">Door identifier (hash field).</param>
/// <param name="vers">Version to store.</param>
/// <returns>Number of entries in the "processing" hash after the write.</returns>
public async Task<long> RequestProcessingUpsert(string doorId, string vers)
{
    long numReq = await RedHashUpsert(new RedisKey(Constants.CALC_REQ_PROC), doorId, vers);

    // Expiring veto key: while it exists, ResetQueueProcessing leaves this door in processing.
    var vetoKey = new RedisKey($"{Constants.CALC_REQ_PROC}:{doorId}");
    await redisDb.StringSetAsync(vetoKey, vers, TimeSpan.FromSeconds(vetoRemoveProcSec));
    return numReq;
}
/// <summary>
/// Reads the whole "request type" hash.
/// </summary>
/// <returns>Dictionary of door id -> requested type; empty when the hash has no entries.</returns>
public async Task<Dictionary<string, string>> RequestType()
{
    RedisKey currKey = new RedisKey(Constants.CALC_REQ_TYPE);
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    // A missing or empty hash yields an empty array, so no separate (blocking) length check is needed.
    HashEntry[] rawData = await redisDb.HashGetAllAsync(currKey);
    Dictionary<string, string> dictResult = rawData.ToDictionary(e => $"{e.Name}", e => $"{e.Value}");

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("RequestType", sw.Elapsed, 1, 10);
    }
    return dictResult;
}
/// <summary>
/// Removes a door from the "request type" hash.
/// </summary>
/// <param name="doorId">Door identifier (hash field).</param>
/// <returns>The result of the hash-field removal.</returns>
public async Task<bool> RequestTypeRemove(string doorId)
{
    return await RedHashRemove(new RedisKey(Constants.CALC_REQ_TYPE), doorId);
}
/// <summary>
/// Stores the requested output type of a door, or clears it when the type is empty.
/// </summary>
/// <param name="doorId">Door identifier (hash field).</param>
/// <param name="type">Requested type; null or empty removes the door's entry.</param>
/// <returns>
/// For a non-empty type, <c>true</c> when the hash holds at least one entry after the write;
/// for an empty type, always <c>true</c>.
/// </returns>
public async Task<bool> RequestTypeUpsert(string doorId, string type)
{
    RedisKey typeKey = new RedisKey(Constants.CALC_REQ_TYPE);
    if (string.IsNullOrEmpty(type))
    {
        // An empty type means "no explicit type": drop the record.
        await RedHashRemove(typeKey, doorId);
        return true;
    }
    long numReq = await RedHashUpsert(typeKey, doorId, type);
    return numReq > 0;
}
/// <summary>
/// Moves every processing, errored and done request back to the pending queue and
/// empties the request-type hash.
/// </summary>
/// <returns><c>true</c> when at least one record was moved or removed.</returns>
public async Task<bool> ResetQueue()
{
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    // Moves all entries of one hash to "pending", removing them from the source hash.
    async Task<bool> RequeueAllAsync(RedisKey sourceKey, Func<string, Task<bool>> removeFromSource)
    {
        bool moved = false;
        foreach (var entry in await redisDb.HashGetAllAsync(sourceKey))
        {
            await RequestPendingUpsert(entry.Name!, entry.Value!);
            await removeFromSource(entry.Name!);
            moved = true;
        }
        return moved;
    }

    // Same order as before: processing, errors, done.
    bool fatto = await RequeueAllAsync(new RedisKey(Constants.CALC_REQ_PROC), RequestProcessingRemove);
    fatto |= await RequeueAllAsync(new RedisKey(Constants.CALC_REQ_ERRS), RequestErrRemove);
    fatto |= await RequeueAllAsync(new RedisKey(Constants.CALC_REQ_DONE), RequestDoneRemove);

    // The request-type records are dropped, not re-queued.
    foreach (var entry in await redisDb.HashGetAllAsync(new RedisKey(Constants.CALC_REQ_TYPE)))
    {
        await RequestTypeRemove(entry.Name!);
        fatto = true;
    }

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("ResetQueue", sw.Elapsed, 1, 0.3);
    }
    return fatto;
}
/// <summary>
/// Moves the processing, errored and done requests of the listed doors back to the
/// pending queue and removes their request-type records.
/// </summary>
/// <param name="DoorIdList">Door ids to reset.</param>
/// <returns>The same <paramref name="DoorIdList"/> instance that was passed in.</returns>
public async Task<List<string>> ResetQueueByDoorList(List<string> DoorIdList)
{
    int num2proc = DoorIdList.Count;
    // Hash-based membership test instead of a linear List.Contains per Redis entry.
    var wanted = new HashSet<string>(DoorIdList);
    // Doors for which at least one record was actually touched.
    var doneIds = new HashSet<string>();
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    // Moves the listed doors found in one hash to "pending", removing them from the source hash.
    async Task RequeueListedAsync(RedisKey sourceKey, Func<string, Task<bool>> removeFromSource)
    {
        foreach (var entry in await redisDb.HashGetAllAsync(sourceKey))
        {
            if (!wanted.Contains($"{entry.Name}"))
            {
                continue;
            }
            await RequestPendingUpsert(entry.Name!, entry.Value!);
            await removeFromSource(entry.Name!);
            doneIds.Add($"{entry.Name}");
        }
    }

    // Same order as before: processing, errors, done.
    await RequeueListedAsync(new RedisKey(Constants.CALC_REQ_PROC), RequestProcessingRemove);
    await RequeueListedAsync(new RedisKey(Constants.CALC_REQ_ERRS), RequestErrRemove);
    await RequeueListedAsync(new RedisKey(Constants.CALC_REQ_DONE), RequestDoneRemove);

    // The request-type records of the listed doors are dropped, not re-queued.
    foreach (var entry in await redisDb.HashGetAllAsync(new RedisKey(Constants.CALC_REQ_TYPE)))
    {
        if (wanted.Contains($"{entry.Name}"))
        {
            await RequestTypeRemove(entry.Name!);
            doneIds.Add($"{entry.Name}");
        }
    }

    if (sw != null)
    {
        sw.Stop();
        // Requested count --> count of doors actually touched (previously the input count was printed twice).
        Log.Debug($"ResetQueueByDoorList | doors #: {num2proc} --> {doneIds.Count} | EXEC in: {sw.Elapsed.TotalMilliseconds} ms");
    }
    // NOTE(review): the input list is returned, as before; the set of doors actually
    // processed is only logged. Confirm with callers whether that set should be returned instead.
    return DoorIdList;
}
/// <summary>
/// Moves back to the pending queue every processing request whose removal veto key
/// no longer exists.
/// </summary>
/// <returns><c>true</c> when at least one request was moved back to pending.</returns>
public async Task<bool> ResetQueueProcessing()
{
    bool fatto = false;
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    var rawData = await redisDb.HashGetAllAsync(new RedisKey(Constants.CALC_REQ_PROC));
    foreach (var item in rawData)
    {
        // The veto key is written by RequestProcessingUpsert with an expiry;
        // once it is gone the request is re-queued.
        var vetoKey = new RedisKey($"{Constants.CALC_REQ_PROC}:{item.Name}");
        RedisValue veto = await redisDb.StringGetAsync(vetoKey);
        if (veto.HasValue)
        {
            continue;
        }
        await RequestPendingUpsert(item.Name!, item.Value!);
        await RequestProcessingRemove(item.Name!);
        fatto = true;
    }

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("ResetQueueProcessing", sw.Elapsed, 1, 10);
    }
    return fatto;
}
/// <summary>
/// Stores a batch of calculation results: a validated result saves its content in the
/// SVG cache and moves the door to the "done" queue; a failed one saves the error
/// message and moves the door to the "errors" queue. A "calculation done" message is
/// sent for every result.
/// </summary>
/// <param name="calcResults">Calculation results; each <c>DoorIdVers</c> is in the form "DoorId.Version".</param>
/// <returns>Always <c>true</c>.</returns>
public async Task<bool> SaveProcessingResult(List<CalcResultDTO> calcResults)
{
    if (calcResults == null || calcResults.Count == 0)
    {
        return true;
    }

    foreach (var calcTask in calcResults)
    {
        string[] doorData = calcTask.DoorIdVers.Split(".");
        string sDoorId = doorData.Length > 0 ? doorData[0] : "";
        // The version is the SECOND token: it exists only when there are at least two
        // (the previous "> 0" check threw IndexOutOfRangeException on ids without a ".").
        string sCurrVers = doorData.Length > 1 ? doorData[1] : "";

        // Template doors are cached without expiry.
        bool doorIsTpl = await DoorTplListContainsDoor(sDoorId);

        if (calcTask.Validated)
        {
            var svgKey = new RedisKey($"{Constants.CALC_REQ_SVG_CACHE}:{sDoorId}:{sCurrVers}");
            if (doorIsTpl)
            {
                await redisDb.StringSetAsync(svgKey, calcTask.RawContent);
            }
            else
            {
                await redisDb.StringSetAsync(svgKey, calcTask.RawContent, DblWeekLongCache);
            }
            // leave the processing/errors queues, enter the done queue
            await RequestProcessingRemove(sDoorId);
            await RequestErrRemove(sDoorId);
            await RequestDoneUpsert(sDoorId, sCurrVers);
        }
        else
        {
            var errKey = new RedisKey($"{Constants.CALC_REQ_ERRS}:{sDoorId}:{sCurrVers}");
            if (doorIsTpl)
            {
                await redisDb.StringSetAsync(errKey, calcTask.ErrorMsg);
            }
            else
            {
                await redisDb.StringSetAsync(errKey, calcTask.ErrorMsg, WeekLongCache);
            }
            // leave the processing/done queues, enter the errors queue
            await RequestProcessingRemove(sDoorId);
            await RequestDoneRemove(sDoorId);
            await RequestErrUpsert(sDoorId, sCurrVers);
        }

        // notify that this door version has been processed
        CalcDonePipe.saveAndSendMessage(Constants.LAST_CALC_DONE_KEY, calcTask.DoorIdVers);
    }
    return true;
}
/// <summary>
/// Enqueues a calculation request for a door: assigns the next version, stores the
/// requested output type and the DDF, and notifies the request channel.
/// </summary>
/// <param name="DoorId">Door identifier.</param>
/// <param name="FullDDF">Full content of the DDF.</param>
/// <param name="MimeType">Requested output type (svg, 3dm, ...); null or empty stores no explicit type.</param>
/// <returns>The version number assigned to this calculation.</returns>
public async Task<int> SendCalcReq(int DoorId, string FullDDF, string? MimeType = "")
{
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    string sDoorId = $"{DoorId}";
    // DoorCalcRev also puts the door in the pending queue.
    int currVers = await DoorCalcRev(sDoorId);

    // An empty type removes the door's type record (see RequestTypeUpsert).
    await RequestTypeUpsert(sDoorId, MimeType ?? "");
    // a new request supersedes a previous error, if any
    await RequestErrRemove(sDoorId);

    // keep the DDF of this version in Redis (key = doorId:version)
    var ddfKey = new RedisKey($"{Constants.CALC_REQ_DDF_CACHE}:{sDoorId}:{currVers}");
    await redisDb.StringSetAsync(ddfKey, FullDDF, DblWeekLongCache);

    // publish the current number of pending items to ask for execution
    long numPending = await NumRequestPending();
    CalcReqPipe.saveAndSendMessage(Constants.LAST_CALC_REQ_KEY, $"{numPending}");

    if (sw != null)
    {
        sw.Stop();
        Log.Debug($"SendCalcReq | DoorId: {DoorId} enqueued in: {sw.Elapsed.TotalMilliseconds} ms");
    }
    return currVers;
}
/// <summary>
/// Reads the execution statistics stored under the given name.
/// </summary>
/// <param name="statName">Statistic name.</param>
/// <returns>The stored statistics, or a zeroed <c>ExecStats</c> when none is stored.</returns>
public async Task<ExecStats> StatGetAsync(string statName)
{
    string rawData = await RedHashGetString(new RedisKey(Constants.STATS_DATA), statName, false);
    if (!string.IsNullOrEmpty(rawData))
    {
        ExecStats? stored = JsonConvert.DeserializeObject<ExecStats>(rawData);
        if (stored != null)
        {
            return stored;
        }
    }
    return new ExecStats(0, new TimeSpan());
}
/// <summary>
/// Removes the execution statistics stored under the given name and returns the values
/// they held before the removal.
/// </summary>
/// <param name="statName">Statistic name.</param>
/// <returns>The statistics as they were before the reset.</returns>
public async Task<ExecStats> StatReset(string statName)
{
    ExecStats previous = await StatGetAsync(statName);
    await RedHashRemove(new RedisKey(Constants.STATS_DATA), statName);
    return previous;
}
/// <summary>
/// Adds one call and its duration to the execution statistics stored under the given name.
/// </summary>
/// <param name="statName">Statistic name.</param>
/// <param name="duration">Duration of the call being recorded.</param>
/// <param name="evMultipl">Multiplier applied to the configured sample size to obtain the threshold.</param>
/// <returns><c>true</c> when the accumulated call count has reached <c>statSampleSize * evMultipl</c>.</returns>
public async Task<bool> StatUpsert(string statName, TimeSpan duration, double evMultipl)
{
    RedisKey statsKey = new RedisKey(Constants.STATS_DATA);

    // First call for this name: start from a single sample.
    ExecStats newStat = new ExecStats(1, duration);

    string rawData = await RedHashGetString(statsKey, statName, false);
    if (!string.IsNullOrEmpty(rawData))
    {
        ExecStats? currStat = JsonConvert.DeserializeObject<ExecStats>(rawData);
        if (currStat != null)
        {
            currStat.NumCall++;
            currStat.TotalTime += duration;
            newStat = currStat;
        }
    }

    await RedHashUpsert(statsKey, statName, JsonConvert.SerializeObject(newStat), false);
    return newStat.NumCall >= (int)(statSampleSize * evMultipl);
}
/// <summary>
/// Takes up to <paramref name="numItems"/> (at most 10) pending requests, moves them to
/// the processing queue and returns them with their DDF and requested type.
/// </summary>
/// <param name="numItems">Maximum number of requests to take; values above 10 are capped, values below 1 take nothing.</param>
/// <returns>Dictionary of "DoorId.Version" -> request data; empty when nothing was taken.</returns>
public async Task<Dictionary<string, CalcReqtDTO>> TakeProcessingItems(int numItems)
{
    int maxTake = Math.Min(10, numItems);
    var dictResult = new Dictionary<string, CalcReqtDTO>();
    RedisKey pendingKey = new RedisKey(Constants.CALC_REQ_PEND);
    Stopwatch? sw = logTimingEnable ? Stopwatch.StartNew() : null;

    if (maxTake > 0 && await redisDb.HashLengthAsync(pendingKey) > 0)
    {
        // Wait, without blocking the thread, until no other caller holds the queue flag.
        while (queueLock)
        {
            await Task.Delay(rnd.Next(1, 20));
        }
        queueLock = true;
        try
        {
            var typeKey = new RedisKey(Constants.CALC_REQ_TYPE);
            foreach (var item in await redisDb.HashGetAllAsync(pendingKey))
            {
                // move the request from pending to processing first
                await RequestPendingRemove(item.Name!);
                await RequestProcessingUpsert(item.Name!, item.Value!);

                // DDF stored by SendCalcReq for this door version
                var ddfKey = new RedisKey($"{Constants.CALC_REQ_DDF_CACHE}:{item.Name}:{item.Value}");
                RedisValue rawDDF = await redisDb.StringGetAsync(ddfKey);

                // requested type; "svg" when none is recorded
                string rawType = await RedHashGetString(typeKey, item.Name!, true);
                string mimeType = string.IsNullOrEmpty(rawType) ? "svg" : $"{rawType}";

                dictResult.Add($"{item.Name}.{item.Value}", new CalcReqtDTO()
                {
                    MimeType = mimeType,
                    DDF = $"{rawDDF}"
                });

                if (--maxTake <= 0)
                {
                    break;
                }
            }
        }
        finally
        {
            // Always release the flag, otherwise one failure would block every later caller.
            queueLock = false;
        }
    }

    if (sw != null)
    {
        sw.Stop();
        // feed the timing statistics
        await ProcStatLog("TakeProcessingItems", sw.Elapsed, 1, 0.3);
    }
    return dictResult;
}
#endregion Public Methods
#region Protected Fields
// Presumably Redis key names ("rKey"); none of them is referenced in the visible part of this class.
protected const string rKeyCalcOreFase = "Check:OreFasi";
protected const string rKeyFasiAct = "Check:FasiAct";
protected const string rKeyProjAct = "Check:ProjAct";
/// <summary>
/// Short TTL for the Redis cache: 60 * 5 = 300 (presumably seconds, i.e. 5 minutes;
/// the previous comment said 1 minute - TODO confirm the intended value).
/// </summary>
protected const int shortTTL = 60 * 5;
// NOTE(review): no use of this counter is visible in this part of the file.
protected int idxSim = 0;
// Random source; TakeProcessingItems uses it to randomize its wait interval.
protected Random rnd = new Random();
#endregion Protected Fields
#region Protected Properties
// Static flag set by TakeProcessingItems while it moves items out of the pending queue;
// other callers wait until it is false again.
protected static bool queueLock { get; set; } = false;
#endregion Protected Properties
#region Protected Methods
/// <summary>
/// Computes the next calculation version of a door and enqueues it as pending.
/// </summary>
/// <param name="sDoorId">Door identifier.</param>
/// <returns>The new version: the last known version + 1, or 1 when no version is known.</returns>
protected async Task<int> DoorCalcRev(string sDoorId)
{
    // The last known version is searched, in order, in the pending queue, the errors
    // queue and the done queue; the first non-empty value found wins.
    string sCurrVers = "";

    if (await NumRequestPending() > 0)
    {
        var currPending = await RequestPending();
        if (currPending != null && currPending.TryGetValue(sDoorId, out string? pendingVers))
        {
            sCurrVers = pendingVers;
        }
    }

    if (string.IsNullOrEmpty(sCurrVers) && await NumRequestErrors() > 0)
    {
        var currErrors = await RequestErr();
        if (currErrors != null && currErrors.TryGetValue(sDoorId, out string? errVers))
        {
            sCurrVers = errVers;
        }
    }

    if (string.IsNullOrEmpty(sCurrVers) && await NumRequestDone() > 0)
    {
        var currDone = await RequestDone();
        if (currDone != null && currDone.TryGetValue(sDoorId, out string? doneVers))
        {
            sCurrVers = doneVers;
        }
    }

    // A missing or unparsable version counts as 0, so numbering restarts from 1.
    if (!int.TryParse(sCurrVers, out int currVers))
    {
        currVers = 0;
    }
    currVers++;

    // enqueue the new version for calculation
    await RequestPendingUpsert(sDoorId, $"{currVers}");
    // record the refresh time, used later by the cache cleanup
    await DoorRefreshUpsert(sDoorId);
    return currVers;
}
/// <summary>
/// Reads a string value from Redis.
/// </summary>
/// <param name="rKey">Redis key to read.</param>
/// <returns>The stored value as string, or an empty string when the key has no value.</returns>
protected async Task<string> getRSV(string rKey)
{
    var rawData = await redisDb.StringGetAsync(rKey);
    // a missing key is reported as an empty string rather than null
    return rawData.HasValue ? $"{rawData}" : "";
}
/// <summary>
/// Stores a string value in Redis with an expiry.
/// </summary>
/// <param name="rKey">Redis key to write.</param>
/// <param name="rVal">String value to store.</param>
/// <param name="ttlSec">Time-to-live of the key, in seconds.</param>
/// <returns>The result reported by the Redis string-set call.</returns>
protected async Task<bool> setRSV(string rKey, string rVal, int ttlSec)
{
    return await redisDb.StringSetAsync(rKey, rVal, TimeSpan.FromSeconds(ttlSec));
}
/// <summary>
/// Stores an integer value in Redis with an expiry.
/// </summary>
/// <param name="rKey">Redis key to write.</param>
/// <param name="rValInt">Integer value to store.</param>
/// <param name="ttlSec">Time-to-live of the key, in seconds.</param>
/// <returns>The result reported by the Redis string-set call.</returns>
protected async Task<bool> setRSV(string rKey, int rValInt, int ttlSec)
{
    return await redisDb.StringSetAsync(rKey, rValInt, TimeSpan.FromSeconds(ttlSec));
}
/// <summary>
/// Adds a key to the list of tracked cache entries, unless it is already listed.
/// </summary>
/// <param name="newKey">Cache key to register.</param>
protected void trackCache(string newKey)
{
    bool alreadyTracked = cachedDataList.Contains(newKey);
    if (alreadyTracked)
    {
        // nothing to do: the key is already in the list
        return;
    }
    cachedDataList.Add(newKey);
}
#endregion Protected Methods
#region Private Fields
/// <summary>
/// Application configuration; assigned in the constructor.
/// Declared static, so the value is shared by all instances.
/// </summary>
private static IConfiguration _configuration = null!;
/// <summary>
/// JSON serializer settings; the constructor sets ReferenceLoopHandling.Ignore
/// to avoid self-referencing loop errors during serialization.
/// </summary>
private static JsonSerializerSettings? JSSettings;
/// <summary>
/// NLog logger of this class.
/// </summary>
private static Logger Log = LogManager.GetCurrentClassLogger();
/// <summary>
/// Size of the sample to collect before it is written to the log;
/// read in the constructor from "RuntimeOpt:StatSampleSize".
/// </summary>
private static int statSampleSize = 1;
/// <summary>
/// Email sender received through the constructor.
/// </summary>
private readonly IEmailSender _emailSender;
/// <summary>
/// List of the keys registered as cached (string keys are added by trackCache).
/// </summary>
private List<string> cachedDataList = new List<string>();
/// <summary>
/// Long cache duration IN SECONDS (60 * 5 = 300).
/// </summary>
private int cacheTtlLong = 60 * 5;
/// <summary>
/// Short cache duration IN SECONDS (60 * 1 = 60).
/// </summary>
private int cacheTtlShort = 60 * 1;
/// <summary>
/// Enables timing measurement with a stopwatch and the related statistics log;
/// read in the constructor from "RuntimeOpt:LogTimingEnable".
/// </summary>
private bool logTimingEnable = false;
/// <summary>
/// Path of the "missing" image; the constructor sets it to
/// wwwroot/images/MissingOrange.svg under the current directory.
/// </summary>
private string missingFilePath = "";
/// <summary>
/// SVG content of the "missing" image; loaded in the constructor when the file exists,
/// otherwise left empty.
/// </summary>
private string missingSvgContent = "";
/// <summary>
/// Redis connection object; created in the constructor with ConnectionMultiplexer.Connect
/// when the "Redis" connection string is configured, otherwise left unassigned.
/// </summary>
private ConnectionMultiplexer redisConn = null!;
/// <summary>
/// Redis database object used for read/write calls; obtained from redisConn in the constructor.
/// </summary>
private IDatabase redisDb = null!;
#endregion Private Fields
#region Private Properties
/// <summary>
/// Fixed cache duration of 24 hours.
/// </summary>
private TimeSpan DayLongCache => TimeSpan.FromHours(24);
/// <summary>
/// Cache duration of 15 days (about two weeks), expressed in hours and stretched by a
/// random factor drawn from 1.000 (inclusive) to 1.100 (exclusive).
/// The computation uses integer arithmetic, so the result is a whole number of hours.
/// </summary>
private TimeSpan DblWeekLongCache
{
    get
    {
        int jitterPerMille = rnd.Next(1000, 1100);
        return TimeSpan.FromHours(24 * 15 * jitterPerMille / 1000);
    }
}
/// <summary>
/// Short cache duration: cacheTtlShort seconds scaled by a random factor drawn from
/// 0.900 (inclusive) to 1.100 (exclusive), i.e. roughly +/-10%.
/// The computation uses integer arithmetic, so the result is a whole number of seconds.
/// </summary>
private TimeSpan FastCache
{
    get
    {
        int jitterPerMille = rnd.Next(900, 1100);
        return TimeSpan.FromSeconds(cacheTtlShort * jitterPerMille / 1000);
    }
}
/// <summary>
/// Long cache duration: cacheTtlLong seconds scaled by a random factor drawn from
/// 0.900 (inclusive) to 1.100 (exclusive), i.e. roughly +/-10%.
/// The computation uses integer arithmetic, so the result is a whole number of seconds.
/// </summary>
private TimeSpan LongCache
{
    get
    {
        int jitterPerMille = rnd.Next(900, 1100);
        return TimeSpan.FromSeconds(cacheTtlLong * jitterPerMille / 1000);
    }
}
/// <summary>
/// Fixed cache duration of 30 days.
/// </summary>
private TimeSpan MonthLongCache => TimeSpan.FromDays(30);
/// <summary>
/// Very long cache duration: ten times cacheTtlLong seconds, scaled by a random factor
/// drawn from 0.900 (inclusive) to 1.100 (exclusive), i.e. roughly +/-10%.
/// The computation uses integer arithmetic, so the result is a whole number of seconds.
/// </summary>
private TimeSpan UltraLongCache
{
    get
    {
        int jitterPerMille = rnd.Next(900, 1100);
        return TimeSpan.FromSeconds(cacheTtlLong * 10 * jitterPerMille / 1000);
    }
}
/// <summary>
/// Value read in the constructor from "RuntimeOpt:VetoRemoveProcessing" (initial value 10).
/// NOTE(review): presumably the number of seconds during which an item in processing
/// must not be removed - confirm against its users.
/// </summary>
private int vetoRemoveProcSec { get; set; } = 10;
/// <summary>
/// Cache duration of one week, expressed in hours and stretched by a random factor
/// drawn from 1.000 (inclusive) to 1.100 (exclusive).
/// The computation uses integer arithmetic, so the result is a whole number of hours.
/// </summary>
private TimeSpan WeekLongCache
{
    get
    {
        int jitterPerMille = rnd.Next(1000, 1100);
        return TimeSpan.FromHours(24 * 7 * jitterPerMille / 1000);
    }
}
#endregion Private Properties
#region Private Methods
/// <summary>
/// Deletes from the current Redis database every key that matches the given pattern,
/// looking the keys up on each endpoint of the connection.
/// </summary>
/// <param name="pattern">Key pattern to match (e.g. "prefix:*").</param>
/// <returns>
/// true when at least one endpoint's server was processed; false otherwise
/// (for instance when the connection exposes no endpoints).
/// </returns>
private async Task<bool> ExecFlushRedisPattern(RedisValue pattern)
{
    bool answ = false;
    var listEndpoints = redisConn.GetEndPoints();
    foreach (var endPoint in listEndpoints)
    {
        var server = redisConn.GetServer(endPoint);
        if (server != null)
        {
            // only the matching keys are removed; the database is never flushed as a whole
            var keyList = server.Keys(redisDb.Database, pattern);
            foreach (var item in keyList)
            {
                await redisDb.KeyDeleteAsync(item);
            }
            answ = true;
        }
    }
    return answ;
}
/// <summary>
/// Processes a statistics sample:
/// - accumulates the measured duration through StatUpsert
/// - when StatUpsert reports that the statistic must be written, reads and resets it
///   through StatReset and writes a summary line to the log at the requested level.
/// </summary>
/// <param name="statName">Name of the statistic.</param>
/// <param name="elapsed">Duration of the tracked event.</param>
/// <param name="logLevReq">
/// Requested log level: 0:trace, 1:debug, 2:info, 3:warn, 4:error, 5:fatal, 6:off.
/// With 6 nothing is written (the statistic is still reset); any other value is logged as error.
/// </param>
/// <param name="evMultipl">Multiplier of the threshold for writing the statistic, forwarded to StatUpsert.</param>
private async Task ProcStatLog(string statName, TimeSpan elapsed, int logLevReq, double evMultipl)
{
    bool doWrite = await StatUpsert(statName, elapsed, evMultipl);
    if (!doWrite)
    {
        return;
    }

    // read the accumulated values and reset the statistic
    ExecStats statRec = await StatReset(statName);
    string logMsg = $"Eseguito {statName} x {statRec.NumCall} | {statRec.AvgTime.TotalMilliseconds:N3}ms";
    switch (logLevReq)
    {
        case 0:
            Log.Trace(logMsg);
            break;
        case 1:
            Log.Debug(logMsg);
            break;
        case 2:
            Log.Info(logMsg);
            break;
        case 3:
            Log.Warn(logMsg);
            break;
        case 4:
            Log.Error(logMsg);
            break;
        case 5:
            Log.Fatal(logMsg);
            break;
        case 6:
            // "off": the caller asked for no output, so nothing is logged
            break;
        default:
            // unknown level: make the message visible instead of dropping it
            Log.Error(logMsg);
            break;
    }
}
#endregion Private Methods
}
}