using Microsoft.AspNetCore.Identity.UI.Services;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using NLog;
using NLog.LayoutRenderers.Wrappers;
using StackExchange.Redis;
using System.Diagnostics;
using WebDoorCreator.Core;
using WebDoorCreator.Data.DTO;

namespace WebDoorCreator.Data.Services
{
    /// <summary>
    /// Redis-backed bookkeeping for door calculation requests: pending / processing /
    /// done / error hashes, cached SVG and DDF payloads, and the request/done message pipes.
    /// </summary>
    public class QueueDataService : IDisposable
    {
        #region Public Constructors

        /// <summary>
        /// Initializes the service: opens the Redis connection, creates the two message
        /// pipes, reads the runtime options and preloads the "missing image" SVG.
        /// </summary>
        /// <param name="configuration">Application configuration (connection string "Redis", section "RuntimeOpt").</param>
        /// <param name="emailSender">Email sender, stored for later use.</param>
        /// <param name="redisConn">Injected Redis multiplexer, used for the message pipes.</param>
        public QueueDataService(IConfiguration configuration, IEmailSender emailSender, IConnectionMultiplexer redisConn)
        {
            Log.Info("QueueDataService starting...");
            _configuration = configuration;
            _emailSender = emailSender;

            // Redis components setup
            var redConnString = _configuration.GetConnectionString("Redis");
            if (string.IsNullOrWhiteSpace(redConnString))
            {
                Log.Error("REDIS ConnString empty!");
                // Fall back to the injected multiplexer so that redisDb is never left null
                // (previously every later call failed with a NullReferenceException).
                redisDb = redisConn.GetDatabase();
            }
            else
            {
                this.redisConn = ConnectionMultiplexer.Connect(redConnString);
                redisDb = this.redisConn.GetDatabase();
            }

            // pub/sub channels setup
            CalcReqPipe = new MessagePipe(redisConn, Constants.CALC_REQ_QUEUE);
            CalcDonePipe = new MessagePipe(redisConn, Constants.CALC_DONE_QUEUE);

            // json serializer... FIX for the self-referencing loop error:
            // https://www.ryadel.com/en/jsonserializationexception-self-referencing-loop-detected-error-fix-entity-framework-asp-net-core/
            JSSettings = new JsonSerializerSettings()
            {
                ReferenceLoopHandling = ReferenceLoopHandling.Ignore
            };

            // Runtime options: the current field values are passed as defaults so that a
            // missing configuration key keeps them instead of resetting them to 0/false.
            vetoRemoveProcSec = _configuration.GetValue("RuntimeOpt:VetoRemoveProcessing", vetoRemoveProcSec);
            logTimingEnable = configuration.GetValue("RuntimeOpt:LogTimingEnable", logTimingEnable);
            statSampleSize = configuration.GetValue("RuntimeOpt:StatSampleSize", statSampleSize);

            // preload the "missing image" SVG
            LoadMissingSvg();

            Log.Info("QueueDataService started!");
        }

        #endregion Public Constructors

        #region Public Properties

        /// <summary>
        /// Message pipe for completed calculations (CAM to UI).
        /// </summary>
        public MessagePipe CalcDonePipe { get; set; } = null!;

        /// <summary>
        /// Message pipe for calculation requests (UI to CAM).
        /// </summary>
        public MessagePipe CalcReqPipe { get; set; } = null!;

        #endregion Public Properties

        #region Public Methods

        /// <summary>
        /// Disposes the Redis connection opened by the constructor, if any.
        /// </summary>
        public void Dispose()
        {
            // redisConn stays null when no connection string was configured
            redisConn?.Dispose();
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Removes the queue records (processing, errors, done, last-refresh) of every door
        /// whose last refresh is older than "RuntimeOpt:MaxDayCalcCache" days (default 30).
        /// </summary>
        /// <returns>True when at least one door has been cleaned up.</returns>
        public async Task<bool> DoorCalcCacheCleanup()
        {
            bool fatto = false;
            int numDayPre = 30;
            if (!string.IsNullOrEmpty(_configuration.GetValue<string>("RuntimeOpt:MaxDayCalcCache")))
            {
                numDayPre = _configuration.GetValue<int>("RuntimeOpt:MaxDayCalcCache");
            }
            DateTime dtLimite = DateTime.Now.AddDays(-numDayPre);

            // last-refresh timestamp of every door
            var refreshStatus = await DoorRefreshStatus();
            foreach (var item in refreshStatus)
            {
                if (string.IsNullOrEmpty(item.Key))
                {
                    continue;
                }

                try
                {
                    var rawDate = JsonConvert.DeserializeObject<DateTime>(item.Value);
                    // only when a real timestamp exists AND it is expired
                    if (rawDate > DateTime.MinValue && rawDate < dtLimite)
                    {
                        await RequestProcessingRemove(item.Key);
                        await RequestErrRemove(item.Key);
                        await RequestDoneRemove(item.Key);
                        await DoorRefreshRemove(item.Key);
                        fatto = true;
                    }
                }
                catch (Exception exc)
                {
                    // best effort: one bad record must not stop the cleanup, but leave a trace
                    Log.Warn(exc, $"DoorCalcCacheCleanup | cleanup skipped for door {item.Key}");
                }
            }

            return fatto;
        }

        /// <summary>
        /// Checks whether the errors hash holds exactly the given version for the door.
        /// </summary>
        /// <param name="doorIdVers">Door reference in the form DoorId:Version.</param>
        /// <returns>True when the version recorded in the errors hash equals the requested one.</returns>
        public async Task<bool> DoorErrExists(string doorIdVers)
        {
            var doorData = doorIdVers.Split(":");
            if (doorData.Length != 2)
            {
                return false;
            }

            RedisKey currErrKey = new RedisKey($"{Constants.CALC_REQ_ERRS}");
            int currId = await RedHashGetInt(currErrKey, doorData[0]);
            return doorData[1] == $"{currId}";
        }

        /// <summary>
        /// Returns the error message stored for the given door/version.
        /// </summary>
        /// <param name="doorIdVers">Door reference in the form DoorId:Version.</param>
        /// <returns>The stored value converted to string.</returns>
        public async Task<string> DoorErrGet(string doorIdVers)
        {
            RedisKey currErrKey = new RedisKey($"{Constants.CALC_REQ_ERRS}:{doorIdVers}");
            RedisValue errVal = await redisDb.StringGetAsync(currErrKey);
            return errVal.ToString();
        }

        /// <summary>
        /// Looks up the last completed calculation of the door and returns its cached SVG.
        /// </summary>
        /// <param name="DoorId">Door identifier.</param>
        /// <returns>The cached SVG, or an empty string when nothing is available.</returns>
        public async Task<string> DoorGetLastSvg(int DoorId)
        {
            string answ = "";
            // last completed task for this door (door id -> version)
            var doorData = await RequestDoneGetSingle(DoorId);
            if (doorData != null && doorData.Count > 0)
            {
                var firstRecord = doorData.First();
                var currSvgKey = new RedisKey($"{Constants.CALC_REQ_SVG_CACHE}:{firstRecord.Key}:{firstRecord.Value}");
                var rawData = await redisDb.StringGetAsync(currSvgKey);
                if (rawData.HasValue)
                {
                    answ = $"{rawData}";
                }
            }
            return answ;
        }

        /// <summary>
        /// Returns the SVG of the "missing image" placeholder, loading it from disk if it
        /// has not been loaded yet.
        /// </summary>
        /// <returns>The SVG content, or an empty string when the file does not exist.</returns>
        public string DoorGetMissingSvg()
        {
            return string.IsNullOrEmpty(missingSvgContent) ? LoadMissingSvg() : missingSvgContent;
        }

        /// <summary>
        /// Reads the processing veto of the given door, if any.
        /// </summary>
        /// <param name="DoorId">Door identifier.</param>
        /// <returns>The stored veto value, or an empty string when no veto is set.</returns>
        public async Task<string> DoorProcVetoGetAsync(int DoorId)
        {
            var currVetoKey = new RedisKey($"{Constants.CALC_REQ_VETO_REC}:{DoorId}");
            var rawData = await redisDb.StringGetAsync(currVetoKey);
            return rawData.HasValue ? $"{rawData}" : "";
        }

        /// <summary>
        /// Sets a processing veto for the given door with the requested duration.
        /// </summary>
        /// <param name="DoorId">Identifier of the door whose recalculation is vetoed.</param>
        /// <param name="msVeto">Veto duration in milliseconds.</param>
        public async Task DoorProcVetoSetAsync(int DoorId, int msVeto)
        {
            var currVetoKey = new RedisKey($"{Constants.CALC_REQ_VETO_REC}:{DoorId}");
            await redisDb.StringSetAsync(currVetoKey, $"{DoorId}", TimeSpan.FromMilliseconds(msVeto));
        }

        /// <summary>
        /// Removes the last-refresh record of a door.
        /// </summary>
        /// <param name="doorId">Door identifier (hash field).</param>
        /// <returns>Result of the hash removal.</returns>
        public async Task<bool> DoorRefreshRemove(string doorId)
        {
            RedisKey currKey = new RedisKey(Constants.LAST_DOOR_REFR_KEY);
            return await RedHashRemove(currKey, doorId);
        }

        /// <summary>
        /// Gets the last-refresh record of every door.
        /// </summary>
        /// <returns>Dictionary of door id to the value written by <c>DoorRefreshUpsert</c>.</returns>
        public async Task<Dictionary<string, string>> DoorRefreshStatus()
        {
            Dictionary<string, string> dictResult = new Dictionary<string, string>();
            RedisKey currKey = new RedisKey(Constants.LAST_DOOR_REFR_KEY);
            Stopwatch sw = new Stopwatch();
            if (logTimingEnable)
            {
                sw.Start();
            }

            // HashGetAllAsync already yields an empty array for a missing/empty hash, so the
            // former blocking HashLength pre-check is not needed.
            var rawData = await redisDb.HashGetAllAsync(currKey);
            foreach (var item in rawData)
            {
                dictResult[$"{item.Name}"] = $"{item.Value}";
            }

            if (logTimingEnable)
            {
                sw.Stop();
                // execution statistics
                await ProcStatLog("DoorRefreshStatus", sw.Elapsed, 1, 0.3);
            }
            return dictResult;
        }

        /// <summary>
        /// Records "now" as the last refresh of a door.
        /// </summary>
        /// <param name="doorId">Door identifier (hash field).</param>
        /// <returns>Number of records in the last-refresh hash after the upsert.</returns>
        public async Task<long> DoorRefreshUpsert(string doorId)
        {
            string sAdesso = JsonConvert.SerializeObject(DateTime.Now, JSSettings);
            RedisKey currKey = new RedisKey(Constants.LAST_DOOR_REFR_KEY);
            return await RedHashUpsert(currKey, doorId, sAdesso);
        }

        /// <summary>
        /// Returns the cached SVG of the given door/version.
        /// </summary>
        /// <param name="doorIdVers">Door reference in the form DoorId:Version.</param>
        /// <returns>The stored value converted to string.</returns>
        public async Task<string> DoorSvgGet(string doorIdVers)
        {
            RedisKey currSvgKey = new RedisKey($"{Constants.CALC_REQ_SVG_CACHE}:{doorIdVers}");
            RedisValue svgVal = await redisDb.StringGetAsync(currSvgKey);
            return svgVal.ToString();
        }

        /// <summary>
        /// Checks whether the given door id is in the template door list.
        /// </summary>
        /// <param name="DoorId">Door id to search.</param>
        /// <returns>True when found.</returns>
        public async Task<bool> DoorTplListContainsDoor(string DoorId)
        {
            RedisKey currKey = new RedisKey(Constants.DOOR_TPL_LIST);
            var rawVal = await RedHashGetString(currKey, DoorId, true);
            return !string.IsNullOrEmpty(rawVal) && (rawVal == DoorId);
        }

        /// <summary>
        /// Adds the given door ids to the template door list.
        /// </summary>
        /// <param name="DoorIdList">Door ids to add.</param>
        /// <returns>Number of items in the list after the last upsert (0 for an empty input).</returns>
        public async Task<long> DoorTplListUpsert(List<string> DoorIdList)
        {
            RedisKey currKey = new RedisKey(Constants.DOOR_TPL_LIST);
            long numReq = 0;
            foreach (var item in DoorIdList)
            {
                numReq = await RedHashUpsert(currKey, item, item);
            }
            return numReq;
        }

        /// <summary>
        /// Flushes every Redis key matching the cache pattern of this application.
        /// </summary>
        /// <returns>Result reported by the pattern flush.</returns>
        public async Task<bool> FlushRedisCache()
        {
            RedisValue pattern = new RedisValue($"{Constants.BASE_HASH}:Cache*");
            return await ExecFlushRedisPattern(pattern);
        }

        /// <summary>
        /// Gets the number of completed calculation requests.
        /// </summary>
        /// <returns>Number of records in the done hash.</returns>
        public Task<long> NumRequestDone()
        {
            return CountHashEntriesAsync(new RedisKey(Constants.CALC_REQ_DONE), "NumRequestDone");
        }

        /// <summary>
        /// Gets the number of calculation requests ended with errors.
        /// </summary>
        /// <returns>Number of records in the errors hash.</returns>
        public Task<long> NumRequestErrors()
        {
            return CountHashEntriesAsync(new RedisKey(Constants.CALC_REQ_ERRS), "NumRequestErrors");
        }

        /// <summary>
        /// Gets the number of pending calculation requests.
        /// </summary>
        /// <returns>Number of records in the pending hash.</returns>
        public Task<long> NumRequestPending()
        {
            return CountHashEntriesAsync(new RedisKey(Constants.CALC_REQ_PEND), "NumRequestPending");
        }

        /// <summary>
        /// Gets the number of calculation requests currently processing.
        /// </summary>
        /// <returns>Number of records in the processing hash.</returns>
        public Task<long> NumRequestProcessing()
        {
            return CountHashEntriesAsync(new RedisKey(Constants.CALC_REQ_PROC), "NumRequestProcessing");
        }

        /// <summary>
        /// Gets a single hash record as an integer.
        /// </summary>
        /// <param name="currKey">Redis key of the hash.</param>
        /// <param name="chiave">Requested field of the hash.</param>
        /// <returns>The parsed value, or 0 when the field is missing or not numeric.</returns>
        public async Task<int> RedHashGetInt(RedisKey currKey, string chiave)
        {
            int result = 0;
            Stopwatch sw = new Stopwatch();
            if (logTimingEnable)
            {
                sw.Start();
            }

            // a single HashGet is enough: a missing field comes back without a value
            var rawRes = await redisDb.HashGetAsync(currKey, chiave);
            if (rawRes.HasValue)
            {
                int.TryParse($"{rawRes}", out result);
            }

            if (logTimingEnable)
            {
                sw.Stop();
                // execution statistics
                await ProcStatLog("RedHashGetInt", sw.Elapsed, 0, 0.3);
            }
            return result;
        }

        /// <summary>
        /// Reads the length of a hash, reporting the elapsed time when timing is enabled.
        /// </summary>
        private async Task<long> CountHashEntriesAsync(RedisKey hashKey, string statName)
        {
            Stopwatch sw = new Stopwatch();
            if (logTimingEnable)
            {
                sw.Start();
            }

            long numReq = await redisDb.HashLengthAsync(hashKey);

            if (logTimingEnable)
            {
                sw.Stop();
                // execution statistics
                await ProcStatLog(statName, sw.Elapsed, 2, 10);
            }
            return numReq;
        }

        /// <summary>
        /// Loads the "missing image" SVG from wwwroot/images (when the file exists) and
        /// returns the currently cached content.
        /// </summary>
        private string LoadMissingSvg()
        {
            missingFilePath = Path.Combine(Directory.GetCurrentDirectory(), "wwwroot/images", "MissingOrange.svg");
            if (File.Exists(missingFilePath))
            {
                missingSvgContent = File.ReadAllText(missingFilePath);
            }
            return missingSvgContent;
        }
/// <summary>
        /// Gets a single hash record as a string.
        /// </summary>
        /// <param name="currKey">Redis key of the hash.</param>
        /// <param name="chiave">Requested field of the hash.</param>
        /// <param name="doTimeLog">When true (and timing is enabled) the elapsed time is reported.</param>
        /// <returns>The stored value, or an empty string when the field is missing.</returns>
        public async Task<string> RedHashGetString(RedisKey currKey, string chiave, bool doTimeLog)
        {
            string result = "";
            bool timed = logTimingEnable && doTimeLog;
            Stopwatch sw = new Stopwatch();
            if (timed)
            {
                sw.Start();
            }

            // a single HashGet is enough: a missing field comes back without a value
            var rawRes = await redisDb.HashGetAsync(currKey, chiave);
            if (rawRes.HasValue)
            {
                result = $"{rawRes}";
            }

            if (timed)
            {
                sw.Stop();
                // execution statistics
                await ProcStatLog("RedHashGetString", sw.Elapsed, 0, 0.3);
            }
            return result;
        }

        /// <summary>
        /// Removes a single hash record.
        /// </summary>
        /// <param name="currKey">Redis key of the hash.</param>
        /// <param name="chiave">Field to remove from the hash.</param>
        /// <returns>Result of the removal as reported by Redis.</returns>
        public async Task<bool> RedHashRemove(RedisKey currKey, string chiave)
        {
            Stopwatch sw = new Stopwatch();
            if (logTimingEnable)
            {
                sw.Start();
            }

            bool fatto = await redisDb.HashDeleteAsync(currKey, chiave);

            if (logTimingEnable)
            {
                sw.Stop();
                // execution statistics
                await ProcStatLog("RedHashRemove", sw.Elapsed, 0, 0.3);
            }
            return fatto;
        }

        /// <summary>
        /// Inserts or updates a single hash record.
        /// </summary>
        /// <param name="currKey">Redis key of the hash.</param>
        /// <param name="chiave">Field of the hash.</param>
        /// <param name="valore">Value to store.</param>
        /// <param name="doTimeLog">When true (and timing is enabled) the elapsed time is reported.</param>
        /// <returns>Number of records in the hash after the upsert.</returns>
        public async Task<long> RedHashUpsert(RedisKey currKey, string chiave, string valore, bool doTimeLog = true)
        {
            bool timed = logTimingEnable && doTimeLog;
            Stopwatch sw = new Stopwatch();
            if (timed)
            {
                sw.Start();
            }

            await redisDb.HashSetAsync(currKey, chiave, valore);
            long numReq = await redisDb.HashLengthAsync(currKey);

            if (timed)
            {
                sw.Stop();
                // execution statistics
                await ProcStatLog("RedHashUpsert", sw.Elapsed, 0, 0.3);
            }
            return numReq;
        }

        /// <summary>
        /// Removes every queue record of the given door (last refresh, done, errors,
        /// type, pending, processing).
        /// </summary>
        /// <param name="doorId">Door identifier.</param>
        /// <returns>Always true.</returns>
        public async Task<bool> RedisBulkDelHashByKey(int doorId)
        {
            string sDoorId = $"{doorId}";
            await DoorRefreshRemove(sDoorId);
            await RequestDoneRemove(sDoorId);
            await RequestErrRemove(sDoorId);
            await RequestTypeRemove(sDoorId);
            await RequestPendingRemove(sDoorId);
            await RequestProcessingRemove(sDoorId);
            return true;
        }

        /// <summary>
        /// Gets the queue of completed requests.
        /// </summary>
        /// <returns>Dictionary of DoorId, saveVersNumb.</returns>
        public async Task<Dictionary<string, string>> RequestDone()
        {
            Dictionary<string, string> dictResult = new Dictionary<string, string>();
            RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE);
            Stopwatch sw = new Stopwatch();
            if (logTimingEnable)
            {
                sw.Start();
            }

            // HashGetAllAsync already yields an empty array for a missing/empty hash, so the
            // former blocking HashLength pre-check is not needed.
            var rawData = await redisDb.HashGetAllAsync(currKey);
            foreach (var item in rawData)
            {
                dictResult[$"{item.Name}"] = $"{item.Value}";
            }

            if (logTimingEnable)
            {
                sw.Stop();
                // execution statistics
                await ProcStatLog("RequestDone", sw.Elapsed, 1, 10);
            }
            return dictResult;
        }

        /// <summary>
        /// Gets the LAST completed request of a single door.
        /// </summary>
        /// <param name="DoorId">Door identifier.</param>
        /// <returns>Dictionary with at most one entry (DoorId, saveVersNumb).</returns>
        public async Task<Dictionary<string, string>> RequestDoneGetSingle(int DoorId)
        {
            Dictionary<string, string> dictResult = new Dictionary<string, string>();
            RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE);
            Stopwatch sw = new Stopwatch();
            if (logTimingEnable)
            {
                sw.Start();
            }

            // HashGet on a missing hash/field simply comes back without a value
            var rawData = await redisDb.HashGetAsync(currKey, $"{DoorId}");
            if (rawData.HasValue)
            {
                dictResult.Add($"{DoorId}", $"{rawData}");
            }

            if (logTimingEnable)
            {
                sw.Stop();
                // execution statistics
                await ProcStatLog("RequestDoneGetSingle", sw.Elapsed, 1, 0.3);
            }
            return dictResult;
        }

        /// <summary>
        /// Removes a door from the completed requests hash.
        /// </summary>
        /// <param name="doorId">Door identifier (hash field).</param>
        /// <returns>Result of the hash removal.</returns>
        public async Task<bool> RequestDoneRemove(string doorId)
        {
            RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE);
            return await RedHashRemove(currKey, doorId);
        }

        /// <summary>
        /// Inserts or updates a door in the completed requests hash.
        /// </summary>
        /// <param name="doorId">Door identifier (hash field).</param>
        /// <param name="vers">Version to store.</param>
        /// <returns>Number of records in the hash after the upsert.</returns>
        public async Task<long> RequestDoneUpsert(string doorId, string vers)
        {
            RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE);
            return await RedHashUpsert(currKey, doorId, vers);
        }

        ///
/// Get Queue request with errors /// /// Dictionary of DoorId, saveVersNumb public async Task> RequestErr() { long numReq = 0; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS); Stopwatch sw = new Stopwatch(); if (logTimingEnable) { sw.Start(); } numReq = redisDb.HashLength(currKey); if (numReq > 0) { var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { dictResult.Add($"{item.Name}", $"{item.Value}"); } } if (logTimingEnable) { sw.Stop(); // gestione statistiche await ProcStatLog("RequestErr", sw.Elapsed, 1, 10); } return dictResult; } /// /// Rimuove hash record errori /// /// Dictionary of DoorId, saveVersNumb public async Task RequestErrRemove(string doorId) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS); bool fatto = await RedHashRemove(currKey, doorId); return fatto; } /// /// Upsert record errori /// /// Dictionary of DoorId, saveVersNumb public async Task RequestErrUpsert(string doorId, string vers) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS); long numReq = await RedHashUpsert(currKey, doorId, vers); return numReq; } /// /// Get Queue request pending /// /// Dictionary of DoorId, saveVersNumb public async Task> RequestPending() { long numReq = 0; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND); Stopwatch sw = new Stopwatch(); if (logTimingEnable) { sw.Start(); } numReq = redisDb.HashLength(currKey); if (numReq > 0) { var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { dictResult.Add($"{item.Name}", $"{item.Value}"); } } if (logTimingEnable) { sw.Stop(); // gestione statistiche await ProcStatLog("RequestPending", sw.Elapsed, 1, 10); } return dictResult; } /// /// Remove for single hash record /// /// Dictionary of DoorId, saveVersNumb public async Task RequestPendingRemove(string doorId) { RedisKey currKey = new 
RedisKey(Constants.CALC_REQ_PEND); bool fatto = await RedHashRemove(currKey, doorId); return fatto; } /// /// Upsert for single hash record of req pending /// /// Dictionary of DoorId, saveVersNumb public async Task RequestPendingUpsert(string doorId, string vers) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND); long numReq = await RedHashUpsert(currKey, doorId, vers); return numReq; } /// /// Get Queue request processing /// /// Dictionary of DoorId, saveVersNumb public async Task> RequestProcessing() { long numReq = 0; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); Stopwatch sw = new Stopwatch(); if (logTimingEnable) { sw.Start(); } numReq = redisDb.HashLength(currKey); if (numReq > 0) { var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { dictResult.Add($"{item.Name}", $"{item.Value}"); } } if (logTimingEnable) { sw.Stop(); // gestione statistiche await ProcStatLog("RequestProcessing", sw.Elapsed, 1, 10); } return dictResult; } /// /// Remove for single hash record /// /// Dictionary of DoorId, saveVersNumb public async Task RequestProcessingRemove(string doorId) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); bool fatto = await RedHashRemove(currKey, doorId); return fatto; } /// /// Upsert for single hash record in processing /// /// Dictionary of DoorId, saveVersNumb public async Task RequestProcessingUpsert(string doorId, string vers) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); long numReq = await RedHashUpsert(currKey, doorId, vers); // aggiungo cache veto rimozione da coda processing RedisKey currVetoKey = new RedisKey($"{Constants.CALC_REQ_PROC}:{doorId}"); await redisDb.StringSetAsync(currVetoKey, vers, TimeSpan.FromSeconds(vetoRemoveProcSec)); return numReq; } /// /// Get list of request type /// /// Dictionary of DoorId, saveVersNumb public async Task> RequestType() { long numReq = 0; Dictionary dictResult = new 
Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_TYPE); Stopwatch sw = new Stopwatch(); if (logTimingEnable) { sw.Start(); } numReq = redisDb.HashLength(currKey); if (numReq > 0) { var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { dictResult.Add($"{item.Name}", $"{item.Value}"); } } if (logTimingEnable) { sw.Stop(); // gestione statistiche await ProcStatLog("RequestType", sw.Elapsed, 1, 10); } return dictResult; } /// /// Remove for single hash record from TYPE hash table /// /// Dictionary of DoorId, saveVersNumb public async Task RequestTypeRemove(string doorId) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_TYPE); bool fatto = await RedHashRemove(currKey, doorId); return fatto; } /// /// Upsert for single hash record of TYPE /// /// Dictionary of DoorId, Type proc requested public async Task RequestTypeUpsert(string doorId, string type) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_TYPE); long numReq = 0; bool answ = false; if (!string.IsNullOrEmpty(type)) { numReq = await RedHashUpsert(currKey, doorId, type); answ = numReq > 0; } // se vuoto --> rimuovo! 
else { await RedHashRemove(currKey, doorId); answ = true; } return answ; } /// /// Reset to queue request all processing/processed data /// /// Dictionary of DoorId, saveVersNumb public async Task ResetQueue() { bool fatto = false; Stopwatch sw = new Stopwatch(); if (logTimingEnable) { sw.Start(); } // cerco le richieste processing RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestProcessingRemove(item.Name!); fatto = true; } // cerco le richieste con errori currKey = new RedisKey(Constants.CALC_REQ_ERRS); rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestErrRemove(item.Name!); fatto = true; } // cerco le richieste processed currKey = new RedisKey(Constants.CALC_REQ_DONE); rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestDoneRemove(item.Name!); fatto = true; } // svuoto in blocco hashatble tipo richieste... currKey = new RedisKey(Constants.CALC_REQ_TYPE); rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { await RequestTypeRemove(item.Name!); fatto = true; } if (logTimingEnable) { sw.Stop(); // gestione statistiche await ProcStatLog("ResetQueue", sw.Elapsed, 1, 0.3); } return fatto; } /// /// Reset to queue request for specified DoorIdList /// /// Dictionary of DoorId, saveVersNumb public async Task> ResetQueueByDoorList(List DoorIdList) { int num2proc = DoorIdList.Count; List listDone = new List(); Stopwatch sw = new Stopwatch(); if (logTimingEnable) { sw.Start(); } // cerco le richieste processing RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { // se è nell'elenco... 
if (DoorIdList.Contains($"{item.Name}")) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestProcessingRemove(item.Name!); listDone.Add($"{item.Name}"); } } // cerco le richieste con errori currKey = new RedisKey(Constants.CALC_REQ_ERRS); rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { // se è nell'elenco... if (DoorIdList.Contains($"{item.Name}")) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestErrRemove(item.Name!); listDone.Add($"{item.Name}"); } } // cerco le richieste processed currKey = new RedisKey(Constants.CALC_REQ_DONE); rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { // se è nell'elenco... if (DoorIdList.Contains($"{item.Name}")) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestDoneRemove(item.Name!); listDone.Add($"{item.Name}"); } } // svuoto hashatble tipo richieste... currKey = new RedisKey(Constants.CALC_REQ_TYPE); rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { // se è nell'elenco... if (DoorIdList.Contains($"{item.Name}")) { await RequestTypeRemove(item.Name!); listDone.Add($"{item.Name}"); } } if (logTimingEnable) { sw.Stop(); Log.Debug($"ResetQueueByDoorList | doors #: {num2proc} --> {DoorIdList.Count} | EXEC in: {sw.Elapsed.TotalMilliseconds} ms"); } return DoorIdList; } /// /// Reset to queue request all processing stuck data (veto check) /// /// Boolean, executed public async Task ResetQueueProcessing() { bool fatto = false; await Task.Delay(1); Stopwatch sw = new Stopwatch(); if (logTimingEnable) { sw.Start(); } // cerco le richieste processing RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); RedisKey currVetoKey = new RedisKey($"{Constants.CALC_REQ_PROC}:0"); var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { // verifico SE siano senza veto rimozione... 
currVetoKey = new RedisKey($"{Constants.CALC_REQ_PROC}:{item.Name}"); var redisVal = await redisDb.StringGetAsync(currVetoKey); if (!redisVal.HasValue) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestProcessingRemove(item.Name!); fatto = true; } } if (logTimingEnable) { sw.Stop(); // gestione statistiche await ProcStatLog("ResetQueueProcessing", sw.Elapsed, 1, 10); } return fatto; } /// /// Salvo elenco risultati elaborazioni (modalità boolean di esecuzione corretta) /// /// Risultati elaborazioni in formato CalcResultDTO /// public async Task SaveProcessingResult(List calcResults) { bool answ = true; if (calcResults != null && calcResults.Count > 0) { string sDoorId = ""; string sCurrVers = ""; bool doorIsTpl = false; foreach (var calcTask in calcResults) { RedisKey currSvgKey = new RedisKey(""); var doorData = calcTask.DoorIdVers.Split("."); sDoorId = doorData.Length > 0 ? doorData[0] : ""; sCurrVers = doorData.Length > 0 ? doorData[1] : ""; // verifico se sia template --> durata indefinita... doorIsTpl = await DoorTplListContainsDoor(sDoorId); // se valido salvo SVG... if (calcTask.Validated) { // salvo in area REDIS currSvgKey = new RedisKey($"{Constants.CALC_REQ_SVG_CACHE}:{sDoorId}:{sCurrVers}"); // se template --> no scadenza if (doorIsTpl) { await redisDb.StringSetAsync(currSvgKey, calcTask.RawContent); } else { // 2024.02.15 salvo in cache x 2 settimane, fare 1+ mesi?!? 
await redisDb.StringSetAsync(currSvgKey, calcTask.RawContent, DblWeekLongCache); } // elimino dalle code proc/err await RequestProcessingRemove(sDoorId); await RequestErrRemove(sDoorId); // metto in coda done await RequestDoneUpsert(sDoorId, sCurrVers); } // altrimenti salvo errore e metto in coda errori else { // salvo in area REDIS currSvgKey = new RedisKey($"{Constants.CALC_REQ_ERRS}:{sDoorId}:{sCurrVers}"); if (doorIsTpl) { await redisDb.StringSetAsync(currSvgKey, calcTask.ErrorMsg); } else { await redisDb.StringSetAsync(currSvgKey, calcTask.ErrorMsg, WeekLongCache); } // elimino dalle code proc/done await RequestProcessingRemove(sDoorId); await RequestDoneRemove(sDoorId); // metto in coda err await RequestErrUpsert(sDoorId, sCurrVers); } // invio il messaggio di ritorno... CalcDonePipe.saveAndSendMessage(Constants.LAST_CALC_DONE_KEY, calcTask.DoorIdVers); } } else { await Task.Delay(1); } return answ; } /// /// Invio richiesta di calcolo per la porta indicata, dato il suo DDF /// /// /// Contenuto completo del DDF /// Tipo di file richeisto in OUT: svg / 3dm... /// indice/versione del calcolo DDF public async Task SendCalcReq(int DoorId, string FullDDF, string? MimeType = "") { Stopwatch sw = new Stopwatch(); if (logTimingEnable) { sw.Start(); } string sDoorId = $"{DoorId}"; int currVers = await DoorCalcRev(sDoorId); string sCurrVers = $"{currVers}"; // salvo il mimeType (se nullo è "svg") string mimeType = MimeType ?? ""; await RequestTypeUpsert(sDoorId, mimeType); // elimino da code errori (SE fosse presente) await RequestErrRemove(sDoorId); // salvo nell'archivio REDIS delle porte il DDF corrente (key=doorId.versNumb), potrebbe // venire buono anche x eventuale UNDO... RedisKey currDdfKey = new RedisKey($"{Constants.CALC_REQ_DDF_CACHE}:{sDoorId}:{sCurrVers}"); await redisDb.StringSetAsync(currDdfKey, FullDDF, DblWeekLongCache); // invio sul canale dei messaggi il numero di items in coda attuali x chiedere esecuzione... 
long numPending = await NumRequestPending(); bool answ = CalcReqPipe.saveAndSendMessage(Constants.LAST_CALC_REQ_KEY, $"{numPending}"); if (logTimingEnable) { sw.Stop(); Log.Debug($"SendCalcReq | DoorId: {DoorId} enqueued in: {sw.Elapsed.TotalMilliseconds} ms"); } return currVers; } /// /// Restituisce dati statistica richiesta /// /// Nome statistica /// public async Task StatGetAsync(string statName) { ExecStats answ = new ExecStats(0, new TimeSpan()); RedisKey currKey = new RedisKey(Constants.STATS_DATA); // cerco nella hashTable... var rawData = await RedHashGetString(currKey, statName, false); // se c'è rileggo ed aggiorno if (!string.IsNullOrEmpty(rawData)) { var currStat = JsonConvert.DeserializeObject(rawData); if (currStat != null) { answ = currStat; } } return answ; } /// /// Resetta dati statistica esecuzione x log restituendo i valori precedentemente contenuti /// /// Nome statistica /// public async Task StatReset(string statName) { RedisKey currKey = new RedisKey(Constants.STATS_DATA); ExecStats answ = await StatGetAsync(statName); // ora rimuovo valore await RedHashRemove(currKey, statName); return answ; } /// /// Effettua upsert dati statistica esecuzione x log /// /// Nome statistica /// Durata esecuzione come timespan /// moltiplicatore soglia x log statistica (default = 1) /// public async Task StatUpsert(string statName, TimeSpan duration, double evMultipl) { bool answ = false; RedisKey currKey = new RedisKey(Constants.STATS_DATA); // cerco nella hashTable... var rawData = await RedHashGetString(currKey, statName, false); // di default inizializzo a null... 
ExecStats newStat = new ExecStats(1, duration); // se c'è rileggo ed aggiorno if (!string.IsNullOrEmpty(rawData)) { var currStat = JsonConvert.DeserializeObject(rawData); if (currStat != null) { currStat.NumCall++; currStat.TotalTime += duration; newStat = currStat; } } // ora aggiorno su redis rawData = JsonConvert.SerializeObject(newStat); var numRec = await RedHashUpsert(currKey, statName, rawData, false); // verifico se da considerare superato limite sample answ = newStat.NumCall >= (int)(statSampleSize * evMultipl); return answ; } /// /// Get Queue request pending, removing from queue and putting on processing queue /// /// Dictionary of DoorId, saveVersNumb public async Task> TakeProcessingItems(int numItems) { int maxTake = Math.Min(10, numItems); long numReq = 0; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND); Stopwatch sw = new Stopwatch(); if (logTimingEnable) { sw.Start(); } // calcolo il totale delle richieste pending numReq = redisDb.HashLength(currKey); if (numReq > 0) { // per prima cosa aspetto che NON sia locked la coda... while (queueLock) { Thread.Sleep(rnd.Next(1, 20)); } // blocco queueLock = true; var rawData = await redisDb.HashGetAllAsync(currKey); // sposto fino a concorrenza... foreach (var item in rawData) { // per prima cosa tolgo da item richiesti e metto in processing await RequestPendingRemove(item.Name!); await RequestProcessingUpsert(item.Name!, item.Value!); maxTake--; // recupero il DDF... RedisKey currDdfKey = new RedisKey($"{Constants.CALC_REQ_DDF_CACHE}:{item.Name}:{item.Value}"); var rawDDF = await redisDb.StringGetAsync(currDdfKey); // verifico il tpo richiesta, se non trovassi prendo "svg" default... var curTypeKey = new RedisKey(Constants.CALC_REQ_TYPE); var rawType = await RedHashGetString(curTypeKey, item.Name!, true); string mimeType = string.IsNullOrEmpty(rawType) ? 
"svg" : $"{rawType}"; // preparo DTO CalcReqtDTO currReq = new CalcReqtDTO() { MimeType = mimeType, DDF = $"{rawDDF}" }; dictResult.Add($"{item.Name}.{item.Value}", currReq); if (maxTake <= 0) { break; } } // sblocco queueLock = false; } if (logTimingEnable) { sw.Stop(); // gestione statistiche await ProcStatLog("TakeProcessingItems", sw.Elapsed, 1, 0.3); } return dictResult; } #endregion Public Methods #region Protected Fields protected const string rKeyCalcOreFase = "Check:OreFasi"; protected const string rKeyFasiAct = "Check:FasiAct"; protected const string rKeyProjAct = "Check:ProjAct"; /// /// TTL da 1 min x cache Redis /// protected const int shortTTL = 60 * 5; protected int idxSim = 0; protected Random rnd = new Random(); #endregion Protected Fields #region Protected Properties protected static bool queueLock { get; set; } = false; #endregion Protected Properties #region Protected Methods /// /// Recupera revisione corrente della porta o inizializza /// /// /// protected async Task DoorCalcRev(string sDoorId) { /*---------------------------------- * Recupero Rev corrente porta o inizializza cercando * - coda calcoli pending * - coda errori * - coda calcoli eseguiti * * se non trova riparte da 1 * ----------------------------------*/ int currVers = 0; string sCurrVers = ""; // per prima cosa controllo se ho GIA' in coda qualcosa come richieste if (await NumRequestPending() > 0) { var currPending = await RequestPending(); if (currPending != null) { if (currPending.ContainsKey(sDoorId)) { sCurrVers = currPending[sDoorId]; } } } // cerco coda errori if (string.IsNullOrEmpty(sCurrVers) && await NumRequestErrors() > 0) { var currErrors = await RequestErr(); if (currErrors != null) { if (currErrors.ContainsKey(sDoorId)) { sCurrVers = currErrors[sDoorId]; } } } // cerco coda task completati if (string.IsNullOrEmpty(sCurrVers) && await NumRequestDone() > 0) { var currDone = await RequestDone(); if (currDone != null) { if (currDone.ContainsKey(sDoorId)) { sCurrVers = 
currDone[sDoorId]; } } } // calcolo ed incremento int.TryParse(sCurrVers, out currVers); currVers++; // inserisco in coda x calcoli await RequestPendingUpsert(sDoorId, $"{currVers}"); // metto in coda "last calc" la porta corrente x pulizia successiva await DoorRefreshUpsert(sDoorId); return currVers; } /// /// Recupero chiave da redis /// /// /// protected async Task getRSV(string rKey) { string answ = ""; var rawData = await redisDb.StringGetAsync(rKey); if (rawData.HasValue) { answ = $"{rawData}"; } return answ; } /// /// Salvataggio chiave in redis /// /// /// /// /// protected async Task setRSV(string rKey, string rVal, int ttlSec) { bool fatto = false; fatto = await redisDb.StringSetAsync(rKey, rVal, TimeSpan.FromSeconds(ttlSec)); return fatto; } /// /// Salvataggio chiave in redis /// /// /// /// /// protected async Task setRSV(string rKey, int rValInt, int ttlSec) { bool fatto = false; fatto = await redisDb.StringSetAsync(rKey, rValInt, TimeSpan.FromSeconds(ttlSec)); return fatto; } /// /// Registra in cache chiave se non fosse già in elenco /// /// protected void trackCache(string newKey) { if (!cachedDataList.Contains(newKey)) { cachedDataList.Add(newKey); } } #endregion Protected Methods #region Private Fields private static IConfiguration _configuration = null!; private static JsonSerializerSettings? 
JSSettings;
        private static Logger Log = LogManager.GetCurrentClassLogger();

        /// <summary>
        /// Size of the sample to collect before writing it to the log
        /// </summary>
        private static int statSampleSize = 1;

        private readonly IEmailSender _emailSender;

        /// <summary>
        /// Keys of the objects currently tracked as cached (filled by trackCache)
        /// </summary>
        private List<string> cachedDataList = new List<string>();

        /// <summary>
        /// Long cache duration, IN SECONDS
        /// </summary>
        private int cacheTtlLong = 60 * 5;

        /// <summary>
        /// Short cache duration, IN SECONDS
        /// </summary>
        private int cacheTtlShort = 60 * 1;

        /// <summary>
        /// Enables stopwatch-based timing logs
        /// </summary>
        private bool logTimingEnable = false;

        /// <summary>
        /// Path of the "missing" placeholder image
        /// </summary>
        private string missingFilePath = "";

        /// <summary>
        /// SVG content of the "missing" placeholder image
        /// </summary>
        private string missingSvgContent = "";

        /// <summary>
        /// REDIS connection object
        /// </summary>
        private ConnectionMultiplexer redisConn = null!;

        /// <summary>
        /// Redis DB object used for R/W calls
        /// </summary>
        private IDatabase redisDb = null!;

        #endregion Private Fields

        #region Private Properties

        /// <summary>
        /// 24-hour cache duration
        /// </summary>
        private TimeSpan DayLongCache { get => TimeSpan.FromHours(24); }

        /// <summary>
        /// 15-day cache duration, stretched by a random 0..+10% (integer maths: whole hours)
        /// </summary>
        private TimeSpan DblWeekLongCache { get => TimeSpan.FromHours(24 * 15 * rnd.Next(1000, 1100) / 1000); }

        /// <summary>
        /// Short cache duration (cacheTtlShort with a random perturbation of about +/-10%)
        /// </summary>
        private TimeSpan FastCache { get => TimeSpan.FromSeconds(cacheTtlShort * rnd.Next(900, 1100) / 1000); }

        /// <summary>
        /// Long cache duration (cacheTtlLong with a random perturbation of about +/-10%)
        /// </summary>
        private TimeSpan LongCache { get => TimeSpan.FromSeconds(cacheTtlLong * rnd.Next(900, 1100) / 1000); }

        /// <summary>
        /// 30-day cache duration
        /// </summary>
        private TimeSpan MonthLongCache { get => TimeSpan.FromDays(30); }

        /// <summary>
        /// Ultra-long cache duration (ten times cacheTtlLong, with a random perturbation of about +/-10%)
        /// </summary>
        private TimeSpan UltraLongCache { get => TimeSpan.FromSeconds(cacheTtlLong * 10 * rnd.Next(900, 1100) / 1000); }

        /// <summary>
        /// Value read by the constructor from "RuntimeOpt:VetoRemoveProcessing" (default 10).
        /// NOTE(review): presumably a veto time in seconds before a processing item may be removed - confirm with the callers.
        /// </summary>
        private int vetoRemoveProcSec { get; set; } = 10;

        /// <summary>
        /// 1-week cache duration, stretched by a random 0..+10% (integer maths: whole hours)
        /// </summary>
        private TimeSpan WeekLongCache { get => TimeSpan.FromHours(24 * 7 * rnd.Next(1000, 1100) / 1000);
}

        #endregion Private Properties

        #region Private Methods

        /// <summary>
        /// Deletes from Redis every key of the current database that matches a pattern.
        /// </summary>
        /// <param name="pattern">Key pattern to search for</param>
        /// <returns>true when at least one server of the connection has been scanned</returns>
        private async Task<bool> ExecFlushRedisPattern(RedisValue pattern)
        {
            bool answ = false;
            foreach (var endPoint in redisConn.GetEndPoints())
            {
                var server = redisConn.GetServer(endPoint);
                if (server == null)
                {
                    continue;
                }

                // Keys are removed one by one on purpose: FlushDatabaseAsync would wipe
                // the WHOLE database, not only the keys matching the pattern.
                // KeysAsync keeps the scan asynchronous instead of blocking on each page.
                await foreach (var item in server.KeysAsync(redisDb.Database, pattern))
                {
                    await redisDb.KeyDeleteAsync(item);
                }
                answ = true;
            }
            return answ;
        }

        /// <summary>
        /// Processes a statistics sample:
        ///  - accumulates the data
        ///  - once the threshold (threshold x evMultipl) is exceeded, writes a log entry and resets the statistic
        /// </summary>
        /// <param name="statName">Name of the statistic</param>
        /// <param name="elapsed">Duration of the tracked event</param>
        /// <param name="logLevReq">
        /// Requested level: 0:trace, 1:debug, 2:info, 3:warn, 4:error, 5:fatal, 6:off.
        /// Level 6 writes nothing; any value outside 0..6 is logged as error.
        /// </param>
        /// <param name="evMultipl">Multiplier of the threshold that triggers the statistics log</param>
        private async Task ProcStatLog(string statName, TimeSpan elapsed, int logLevReq, double evMultipl)
        {
            bool doWrite = await StatUpsert(statName, elapsed, evMultipl);
            if (!doWrite)
            {
                return;
            }

            // fetch and reset the accumulated figures (also when logging is off,
            // so that the accumulator starts over)
            ExecStats statRec = await StatReset(statName);
            if (logLevReq == 6)
            {
                // level "off": nothing to write
                return;
            }

            string logMsg = $"Eseguito {statName} x {statRec.NumCall} | {statRec.AvgTime.TotalMilliseconds:N3}ms";
            switch (logLevReq)
            {
                case 0:
                    Log.Trace(logMsg);
                    break;

                case 1:
                    Log.Debug(logMsg);
                    break;

                case 2:
                    Log.Info(logMsg);
                    break;

                case 3:
                    Log.Warn(logMsg);
                    break;

                case 5:
                    Log.Fatal(logMsg);
                    break;

                case 4:
                default:
                    // unknown levels are reported as errors
                    Log.Error(logMsg);
                    break;
            }
        }

        #endregion Private Methods
    }
}