using Microsoft.AspNetCore.Identity.UI.Services; using Microsoft.Extensions.Configuration; using Newtonsoft.Json; using NLog; using StackExchange.Redis; using System.Diagnostics; using WebDoorCreator.Core; using WebDoorCreator.Data.DTO; namespace WebDoorCreator.Data.Services { public class QueueDataService : IDisposable { #region Public Constructors /// /// Init classe /// /// /// /// public QueueDataService(IConfiguration configuration, IEmailSender emailSender, IConnectionMultiplexer redisConn) { Log.Info("QueueDataService starting..."); _configuration = configuration; _emailSender = emailSender; // setup componenti REDIS var redConnString = _configuration.GetConnectionString("Redis"); if (redConnString == null) { Log.Error("REDIS ConnString empty!"); } else { this.redisConn = ConnectionMultiplexer.Connect(redConnString); redisDb = this.redisConn.GetDatabase(); } // setup canali pub/sub CalcReqPipe = new MessagePipe(redisConn, Constants.CALC_REQ_QUEUE); CalcDonePipe = new MessagePipe(redisConn, Constants.CALC_DONE_QUEUE); // json serializer... FIX errore loop circolare https://www.ryadel.com/en/jsonserializationexception-self-referencing-loop-detected-error-fix-entity-framework-asp-net-core/ JSSettings = new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore }; // leggo conf speciali vetoRemoveProcSec = _configuration.GetValue("RuntimeOpt:VetoRemoveProcessing"); Log.Info("QueueDataService started!"); } #endregion Public Constructors #region Public Properties /// /// Message pipe esecuzione elaborazione CAM --> UI /// public MessagePipe CalcDonePipe { get; set; } = null!; /// /// Message pipe richieste elaborazione UI --> CAM /// public MessagePipe CalcReqPipe { get; set; } = null!; #endregion Public Properties #region Public Methods public void Dispose() { redisConn.Dispose(); } /// /// Effettua pulizia cache dati calcolo porte /// /// /// public async Task DoorCalcCacheCleanup() { /*---------------------------------- * Recupero Rev corrente porta o inizializza cercando * - coda calcoli pending * - coda errori * - coda calcoli eseguiti * * se non trova riparte da 1 * ----------------------------------*/ bool fatto = false; int numDayPre = 30; if (!string.IsNullOrEmpty(_configuration.GetValue("RuntimeOpt:MaxDayCalcCache"))) { numDayPre = _configuration.GetValue("RuntimeOpt:MaxDayCalcCache"); } DateTime dtLimite = DateTime.Now.AddDays(-numDayPre); // recupero elenco di tutti i refresh porte... var refreshStatus = await DoorRefreshStatus(); // ciclo su tutti e cerco quelli + vecchi di x giorni if (refreshStatus != null && refreshStatus.Count > 0) { DateTime dtCurr = DateTime.Now; foreach (var item in refreshStatus) { // confronto data-ora della singola porta... if (!string.IsNullOrEmpty(item.Key)) { try { var rawDate = JsonConvert.DeserializeObject(item.Value); // se esiste e SE è scaduto if (rawDate > DateTime.MinValue && rawDate < dtLimite) { // cancello i dati! await RequestProcessingRemove(item.Key); await RequestErrRemove(item.Key); await RequestDoneRemove(item.Key); await DoorRefreshRemove(item.Key); } } catch { } } } } await Task.Delay(1); return fatto; } /// /// Restitusice gli errori della porta in oggetto /// /// formato DoorId:Versione /// public async Task DoorErrExists(string doorIdVers) { bool hasErr = false; // cerco versione errore x la porta... var doorData = doorIdVers.Split(":"); if (doorData.Length == 2) { RedisKey currErrKey = new RedisKey($"{Constants.CALC_REQ_ERRS}"); int currId = await RedHashGetInt(currErrKey, doorData[0]); hasErr = doorData[1] == $"{currId}"; } return hasErr; } /// /// Restitusice gli errori della porta in oggetto /// /// formato DoorId:Versione /// public async Task DoorErrGet(string doorIdVers) { RedisKey currErrKey = new RedisKey($"{Constants.CALC_REQ_ERRS}:{doorIdVers}"); RedisValue errVal = await redisDb.StringGetAsync(currErrKey); return errVal.ToString(); } /// /// Cerca nelle esecuzioni ultimo SVG della porta e lo restituisce... /// /// /// public async Task DoorGetLastSvg(int DoorId) { string answ = ""; // cerco in hashtable dei task eseguiti ultimo x porta corrente... var doorData = await RequestDoneGetSingle(DoorId); if (doorData != null && doorData.Count > 0) { var firstRecord = doorData.FirstOrDefault(); // recupero da REDIS var currSvgKey = new RedisKey($"{Constants.CALC_REQ_SVG_CACHE}:{firstRecord.Key}:{firstRecord.Value}"); var rawData = await redisDb.StringGetAsync(currSvgKey); if (rawData.HasValue) { answ = $"{rawData}"; } } // se non trovata --> missing! if (string.IsNullOrEmpty(answ)) { // leggo missing standard... string missingFilePath = Path.Combine(System.IO.Directory.GetCurrentDirectory(), "wwwroot/images", "MissingOrange.svg"); if (File.Exists(missingFilePath)) { answ = File.ReadAllText(missingFilePath); } } return answ; } /// /// Remove for single hash record /// /// public async Task DoorRefreshRemove(string doorId) { RedisKey currKey = new RedisKey(Constants.LAST_DOOR_REFR_KEY); bool fatto = await RedHashRemove(currKey, doorId); return fatto; } /// /// Get Door (last) Refresh status /// /// Dictionary of DoorId, saveVersNumb public async Task> DoorRefreshStatus() { string source = "REDIS"; long numReq = 0; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.LAST_DOOR_REFR_KEY); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); numReq = redisDb.HashLength(currKey); if (numReq > 0) { var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { dictResult.Add($"{item.Name}", $"{item.Value}"); } } stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"DoorRefreshStatus | {source} in: {ts.TotalMilliseconds} ms"); return dictResult; } /// /// Upsert info ultimo update di una porta /// /// Dictionary of DoorId, saveVersNumb public async Task DoorRefreshUpsert(string doorId) { DateTime adesso = DateTime.Now; string sAdesso = JsonConvert.SerializeObject(adesso, JSSettings); RedisKey currKey = new RedisKey(Constants.LAST_DOOR_REFR_KEY); long numReq = await RedHashUpsert(currKey, doorId, sAdesso); return numReq; } /// /// Restitusice l'SVG della porta in oggetto /// /// formato DoorId:Versione /// public async Task DoorSvgGet(string doorIdVers) { RedisKey currSvgKey = new RedisKey($"{Constants.CALC_REQ_SVG_CACHE}:{doorIdVers}"); RedisValue svgVal = await redisDb.StringGetAsync(currSvgKey); return svgVal.ToString(); } /// /// Check if specified DoorId is in Template door list /// /// Door Id to search /// Found (true/false) public async Task DoorTplListContainsDoor(string DoorId) { bool found = false; RedisKey currKey = new RedisKey(Constants.DOOR_TPL_LIST); var rawVal = await RedHashGetString(currKey, DoorId); found = !string.IsNullOrEmpty(rawVal) && (rawVal == DoorId); return found; } /// /// Add specified DoorIdList to Template door list /// /// List to add /// Num of items in list public async Task DoorTplListUpsert(List DoorIdList) { RedisKey currKey = new RedisKey(Constants.DOOR_TPL_LIST); long numReq = 0; foreach (var item in DoorIdList) { numReq = await RedHashUpsert(currKey, item, item); } return numReq; } public async Task FlushRedisCache() { await Task.Delay(1); RedisValue pattern = new RedisValue($"{Constants.BASE_HASH}:Cache*"); bool answ = await ExecFlushRedisPattern(pattern); return answ; } /// /// Get # of calculation request done /// public async Task NumRequestDone() { long numReq = 0; string source = "REDIS"; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); numReq = await redisDb.HashLengthAsync(currKey); stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"NumRequestDone | {source} in: {ts.TotalMilliseconds} ms"); return numReq; } /// /// Get # of calculation request with errors /// public async Task NumRequestErrors() { long numReq = 0; string source = "REDIS"; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); numReq = await redisDb.HashLengthAsync(currKey); stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"NumRequestErrors | {source} in: {ts.TotalMilliseconds} ms"); return numReq; } /// /// Get # of calculation request pending /// public async Task NumRequestPending() { long numReq = 0; string source = "REDIS"; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); numReq = await redisDb.HashLengthAsync(currKey); stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"NumRequestPending | {source} in: {ts.TotalMilliseconds} ms"); return numReq; } /// /// Get # of calculation request processing /// public async Task NumRequestProcessing() { long numReq = 0; string source = "REDIS"; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); numReq = await redisDb.HashLengthAsync(currKey); stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"NumRequestProcessing | {source} in: {ts.TotalMilliseconds} ms"); return numReq; } /// /// Get single hash record /// /// Redis Key for Hashlist /// Requested key on list /// Value as Int public async Task RedHashGetInt(RedisKey currKey, string chiave) { int result = 0; Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); var hasVal = await redisDb.HashExistsAsync(currKey, chiave); if (hasVal) { var rawRes = await redisDb.HashGetAsync(currKey, chiave); if (rawRes.HasValue) { int.TryParse($"{rawRes}", out result); } } stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Trace($"RedHashGetInt | {currKey} | in: {ts.TotalMilliseconds} ms"); return result; } /// /// Get single hash record /// /// Redis Key for Hashlist /// Requested key on list /// Value as string public async Task RedHashGetString(RedisKey currKey, string chiave) { string result = ""; Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); var hasVal = await redisDb.HashExistsAsync(currKey, chiave); if (hasVal) { var rawRes = await redisDb.HashGetAsync(currKey, chiave); if (rawRes.HasValue) { result = $"{rawRes}"; } } stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Trace($"RedHashGetString | {currKey} | in: {ts.TotalMilliseconds} ms"); return result; } /// /// Remove for single hash record /// /// Chiave redis della Hashlist /// Chiave nella HashList /// Esito rimozione public async Task RedHashRemove(RedisKey currKey, string chiave) { bool fatto = false; Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); fatto = await redisDb.HashDeleteAsync(currKey, chiave); stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Trace($"RedHashRemove | {currKey} | in: {ts.TotalMilliseconds} ms"); return fatto; } /// /// Restitusice gli errori della porta in oggetto /// /// formato DoorId:Versione /// public async Task RedisBulkDelHashByKey(int doorId) { await DoorRefreshRemove($"{doorId}"); await RequestDoneRemove($"{doorId}"); await RequestErrRemove($"{doorId}"); await RequestPendingRemove($"{doorId}"); await RequestProcessingRemove($"{doorId}"); return true; } /// /// Get Queue request done /// /// Dictionary of DoorId, saveVersNumb public async Task> RequestDone() { string source = "REDIS"; long numReq = 0; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); numReq = redisDb.HashLength(currKey); if (numReq > 0) { var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { dictResult.Add($"{item.Name}", $"{item.Value}"); } } stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"RequestDone | {source} in: {ts.TotalMilliseconds} ms"); return dictResult; } /// /// Get LAST request done for DoorId /// /// /// public async Task> RequestDoneGetSingle(int DoorId) { string source = "REDIS"; long numReq = 0; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); numReq = redisDb.HashLength(currKey); if (numReq > 0) { var rawData = await redisDb.HashGetAsync(currKey, $"{DoorId}"); if (rawData.HasValue) { dictResult.Add($"{DoorId}", $"{rawData}"); } } stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"RequestDoneGetSingle | {source} in: {ts.TotalMilliseconds} ms"); return dictResult; } /// /// Remove for single hash record /// /// Dictionary of DoorId, saveVersNumb public async Task RequestDoneRemove(string doorId) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE); bool fatto = await RedHashRemove(currKey, doorId); return fatto; } /// /// Upsert for single hash record /// /// Dictionary of DoorId, saveVersNumb public async Task RequestDoneUpsert(string doorId, string vers) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_DONE); long numReq = await RedHashUpsert(currKey, doorId, vers); return numReq; } /// /// Get Queue request with errors /// /// Dictionary of DoorId, saveVersNumb public async Task> RequestErr() { string source = "REDIS"; long numReq = 0; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); numReq = redisDb.HashLength(currKey); if (numReq > 0) { var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { dictResult.Add($"{item.Name}", $"{item.Value}"); } } stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"RequestErr | {source} in: {ts.TotalMilliseconds} ms"); return dictResult; } /// /// Rimuove hash record errori /// /// Dictionary of DoorId, saveVersNumb public async Task RequestErrRemove(string doorId) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS); bool fatto = await RedHashRemove(currKey, doorId); return fatto; } /// /// Upsert record errori /// /// Dictionary of DoorId, saveVersNumb public async Task RequestErrUpsert(string doorId, string vers) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_ERRS); long numReq = await RedHashUpsert(currKey, doorId, vers); return numReq; } /// /// Get Queue request pending /// /// Dictionary of DoorId, saveVersNumb public async Task> RequestPending() { string source = "REDIS"; long numReq = 0; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); numReq = redisDb.HashLength(currKey); if (numReq > 0) { var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { dictResult.Add($"{item.Name}", $"{item.Value}"); } } stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"RequestPending | {source} in: {ts.TotalMilliseconds} ms"); return dictResult; } /// /// Remove for single hash record /// /// Dictionary of DoorId, saveVersNumb public async Task RequestPendingRemove(string doorId) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND); bool fatto = await RedHashRemove(currKey, doorId); return fatto; } /// /// Upsert for single hash record /// /// Dictionary of DoorId, saveVersNumb public async Task RequestPendingUpsert(string doorId, string vers) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND); long numReq = await RedHashUpsert(currKey, doorId, vers); return numReq; } /// /// Get Queue request processing /// /// Dictionary of DoorId, saveVersNumb public async Task> RequestProcessing() { string source = "REDIS"; long numReq = 0; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); numReq = redisDb.HashLength(currKey); if (numReq > 0) { var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { dictResult.Add($"{item.Name}", $"{item.Value}"); } } stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"RequestProcessing | {source} in: {ts.TotalMilliseconds} ms"); return dictResult; } /// /// Remove for single hash record /// /// Dictionary of DoorId, saveVersNumb public async Task RequestProcessingRemove(string doorId) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); bool fatto = await RedHashRemove(currKey, doorId); return fatto; } /// /// Upsert for single hash record in processing /// /// Dictionary of DoorId, saveVersNumb public async Task RequestProcessingUpsert(string doorId, string vers) { RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); long numReq = await RedHashUpsert(currKey, doorId, vers); // aggiungo cache veto rimozione da coda processing RedisKey currVetoKey = new RedisKey($"{Constants.CALC_REQ_PROC}:{doorId}"); await redisDb.StringSetAsync(currVetoKey, vers, TimeSpan.FromSeconds(vetoRemoveProcSec)); return numReq; } /// /// Reset to queue request all processing/processed data /// /// Dictionary of DoorId, saveVersNumb public async Task ResetQueue() { bool fatto = false; await Task.Delay(1); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); // cerco le richieste processing RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestProcessingRemove(item.Name!); fatto = true; } // cerco le richieste con errori currKey = new RedisKey(Constants.CALC_REQ_ERRS); rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestErrRemove(item.Name!); fatto = true; } // cerco le richieste processed currKey = new RedisKey(Constants.CALC_REQ_DONE); rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestDoneRemove(item.Name!); fatto = true; } stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"ResetQueue | REDIS EXEC in: {ts.TotalMilliseconds} ms"); return fatto; } /// /// Reset to queue request for specified DoorIdList /// /// Dictionary of DoorId, saveVersNumb public async Task> ResetQueueByDoorList(List DoorIdList) { int num2proc = DoorIdList.Count; List listDone = new List(); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); // cerco le richieste processing RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { // se è nell'elenco... if (DoorIdList.Contains(item.Name.ToString())) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestProcessingRemove(item.Name!); listDone.Add(item.Name.ToString()); } } // cerco le richieste con errori currKey = new RedisKey(Constants.CALC_REQ_ERRS); rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { // se è nell'elenco... if (DoorIdList.Contains(item.Name.ToString())) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestErrRemove(item.Name!); listDone.Add(item.Name.ToString()); } } // cerco le richieste processed currKey = new RedisKey(Constants.CALC_REQ_DONE); rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { // se è nell'elenco... if (DoorIdList.Contains(item.Name.ToString())) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestDoneRemove(item.Name!); listDone.Add(item.Name.ToString()); } } #if false // calcolo eventuali porte NON ancora richieste... foreach (var item in listDone) { DoorIdList.Remove(item); } #endif stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"ResetQueueByDoorList | doors #: {num2proc} --> {DoorIdList.Count} | EXEC in: {ts.TotalMilliseconds} ms"); return DoorIdList; } /// /// Reset to queue request all processing stuck data (veto check) /// /// Boolean, executed public async Task ResetQueueProcessing() { bool fatto = false; await Task.Delay(1); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); // cerco le richieste processing RedisKey currKey = new RedisKey(Constants.CALC_REQ_PROC); RedisKey currVetoKey = new RedisKey($"{Constants.CALC_REQ_PROC}:0"); var rawData = await redisDb.HashGetAllAsync(currKey); foreach (var item in rawData) { // verifico SE siano senza veto rimozione... currVetoKey = new RedisKey($"{Constants.CALC_REQ_PROC}:{item.Name}"); var redisVal = await redisDb.StringGetAsync(currVetoKey); if (!redisVal.HasValue) { await RequestPendingUpsert(item.Name!, item.Value!); await RequestProcessingRemove(item.Name!); fatto = true; } } stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"ResetQueueProcessing | REDIS EXEC in: {ts.TotalMilliseconds} ms"); return fatto; } /// /// Salvo elenco risultati elaborazioni (modalità boolean di esecuzione corretta) /// /// Risultati elaborazioni in formato CalcResultDTO /// public async Task SaveProcessingResult(List calcResults) { bool answ = true; if (calcResults != null && calcResults.Count > 0) { string sDoorId = ""; string sCurrVers = ""; bool doorIsTpl = false; foreach (var calcTask in calcResults) { RedisKey currSvgKey = new RedisKey(""); var doorData = calcTask.DoorIdVers.Split("."); sDoorId = doorData.Length > 0 ? doorData[0] : ""; sCurrVers = doorData.Length > 0 ? doorData[1] : ""; // verifico se sia template --> durata indefinita... doorIsTpl = await DoorTplListContainsDoor(sDoorId); // se valido salvo SVG... if (calcTask.Validated) { // salvo in area REDIS currSvgKey = new RedisKey($"{Constants.CALC_REQ_SVG_CACHE}:{sDoorId}:{sCurrVers}"); // se template --> no scadenza if (doorIsTpl) { await redisDb.StringSetAsync(currSvgKey, calcTask.SvgGen); } else { await redisDb.StringSetAsync(currSvgKey, calcTask.SvgGen, WeekLongCache); } // elimino dalle code proc/err await RequestProcessingRemove(sDoorId); await RequestErrRemove(sDoorId); // metto in coda done await RequestDoneUpsert(sDoorId, sCurrVers); } // altrimenti salvo errore e metto in coda errori else { // salvo in area REDIS currSvgKey = new RedisKey($"{Constants.CALC_REQ_ERRS}:{sDoorId}:{sCurrVers}"); if (doorIsTpl) { await redisDb.StringSetAsync(currSvgKey, calcTask.ErrorMsg); } else { await redisDb.StringSetAsync(currSvgKey, calcTask.ErrorMsg, WeekLongCache); } // elimino dalle code proc/done await RequestProcessingRemove(sDoorId); await RequestDoneRemove(sDoorId); // metto in coda err await RequestErrUpsert(sDoorId, sCurrVers); } // invio il messaggio di ritorno... CalcDonePipe.saveAndSendMessage(Constants.LAST_CALC_DONE_KEY, calcTask.DoorIdVers); } } else { await Task.Delay(1); } return answ; } /// /// Invio richiesta di calcolo per la porta indicata, dato il suo DDF /// /// /// Contenuto completo del DDF /// indice/versione del calcolo DDF public async Task SendCalcReq(int DoorId, string FullDDF) { Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); string sDoorId = $"{DoorId}"; int currVers = await DoorCalcRev(sDoorId); string sCurrVers = $"{currVers}"; // elimino da code errori (SE fosse presente) await RequestErrRemove(sDoorId); // salvo nell'archivio REDIS delle porte il DDF corrente (key=doorId.versNumb), potrebbe // venire buono anche x eventuale UNDO... RedisKey currDdfKey = new RedisKey($"{Constants.CALC_REQ_DDF_CACHE}:{sDoorId}:{sCurrVers}"); await redisDb.StringSetAsync(currDdfKey, FullDDF, WeekLongCache); // invio sul canale dei messaggi il numero di items in coda attuali x chiedere esecuzione... long numPending = await NumRequestPending(); bool answ = CalcReqPipe.saveAndSendMessage(Constants.LAST_CALC_REQ_KEY, $"{numPending}"); stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"SendCalcReq | DoorId: {DoorId} enqueued in: {ts.TotalMilliseconds} ms"); return currVers; } /// /// Get Queue request pending, removing from queue and putting on processing queue /// /// Dictionary of DoorId, saveVersNumb public async Task> TakeProcessingItems(int numItems) { int maxTake = Math.Min(10, numItems); long numReq = 0; Dictionary dictResult = new Dictionary(); // cerco da cache RedisKey currKey = new RedisKey(Constants.CALC_REQ_PEND); Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); // calcolo il totale delle richieste pending numReq = redisDb.HashLength(currKey); if (numReq > 0) { // per prima cosa aspetto che NON sia locked la coda... while (queueLock) { Thread.Sleep(rnd.Next(1, 20)); } // blocco queueLock = true; var rawData = await redisDb.HashGetAllAsync(currKey); // sposto fino a concorrenza... foreach (var item in rawData) { // per prima cosa tolgo da item richiesti e metto in processing await RequestPendingRemove(item.Name!); await RequestProcessingUpsert(item.Name!, item.Value!); maxTake--; // recupero il DDF... RedisKey currDdfKey = new RedisKey($"{Constants.CALC_REQ_DDF_CACHE}:{item.Name}:{item.Value}"); var rawDDF = await redisDb.StringGetAsync(currDdfKey); dictResult.Add($"{item.Name}.{item.Value}", $"{rawDDF}"); if (maxTake <= 0) { break; } } // sblocco queueLock = false; } stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Debug($"TakeProcessingItems | REDIS in: {ts.TotalMilliseconds} ms"); return dictResult; } #endregion Public Methods #region Protected Fields protected const string rKeyCalcOreFase = "Check:OreFasi"; protected const string rKeyFasiAct = "Check:FasiAct"; protected const string rKeyProjAct = "Check:ProjAct"; /// /// TTL da 1 min x cache Redis /// protected const int shortTTL = 60 * 5; protected int idxSim = 0; protected Random rnd = new Random(); #endregion Protected Fields #region Protected Properties protected static bool queueLock { get; set; } = false; #endregion Protected Properties #region Protected Methods /// /// Recupera revisione corrente della porta o inizializza /// /// /// protected async Task DoorCalcRev(string sDoorId) { /*---------------------------------- * Recupero Rev corrente porta o inizializza cercando * - coda calcoli pending * - coda errori * - coda calcoli eseguiti * * se non trova riparte da 1 * ----------------------------------*/ int currVers = 1; string sCurrVers = ""; // per prima cosa controllo se ho GIA' in coda qualcosa come richieste if (await NumRequestPending() > 0) { var currPending = await RequestPending(); if (currPending != null) { if (currPending.ContainsKey(sDoorId)) { sCurrVers = currPending[sDoorId]; int.TryParse(sCurrVers, out currVers); currVers++; } } } // cerco coda errori if (await NumRequestErrors() > 0) { var currErrors = await RequestErr(); if (currErrors != null) { if (currErrors.ContainsKey(sDoorId)) { sCurrVers = currErrors[sDoorId]; int.TryParse(sCurrVers, out currVers); currVers++; } } } // cerco coda task completati if (await NumRequestDone() > 0) { var currDone = await RequestDone(); if (currDone != null) { if (currDone.ContainsKey(sDoorId)) { sCurrVers = currDone[sDoorId]; int.TryParse(sCurrVers, out currVers); currVers++; } } } // inserisco in coda x calcoli await RequestPendingUpsert(sDoorId, $"{currVers}"); // metto in coda "last calc" la porta corrente x pulizia successiva await DoorRefreshUpsert(sDoorId); return currVers; } /// /// Recupero chiave da redis /// /// /// protected async Task getRSV(string rKey) { string answ = ""; var rawData = await redisDb.StringGetAsync(rKey); if (rawData.HasValue) { answ = $"{rawData}"; } return answ; } /// /// Effettua upsert in HasList redis /// /// Chiave redis della Hashlist /// Chiave nella HashList /// Valore da salvare /// Num record nella HashList protected async Task RedHashUpsert(RedisKey currKey, string chiave, string valore) { long numReq = 0; Stopwatch stopWatch = new Stopwatch(); stopWatch.Start(); await redisDb.HashSetAsync(currKey, chiave, valore); numReq = await redisDb.HashLengthAsync(currKey); stopWatch.Stop(); TimeSpan ts = stopWatch.Elapsed; Log.Trace($"RedHashUpsert | {currKey} | in: {ts.TotalMilliseconds} ms"); return numReq; } /// /// Salvataggio chiave in redis /// /// /// /// /// protected async Task setRSV(string rKey, string rVal, int ttlSec) { bool fatto = false; await redisDb.StringSetAsync(rKey, rVal, TimeSpan.FromSeconds(ttlSec)); fatto = true; return fatto; } /// /// Salvataggio chiave in redis /// /// /// /// /// protected async Task setRSV(string rKey, int rValInt, int ttlSec) { bool fatto = false; await redisDb.StringSetAsync(rKey, rValInt, TimeSpan.FromSeconds(ttlSec)); fatto = true; return fatto; } /// /// Registra in cache chiave se non fosse già in elenco /// /// protected void trackCache(string newKey) { if (!cachedDataList.Contains(newKey)) { cachedDataList.Add(newKey); } } #endregion Protected Methods #region Private Fields private static IConfiguration _configuration = null!; private static JsonSerializerSettings? JSSettings; private static Logger Log = LogManager.GetCurrentClassLogger(); private readonly IEmailSender _emailSender; /// /// Elenco obj in cache /// private List cachedDataList = new List(); /// /// Durata cache lunga IN SECONDI /// private int cacheTtlLong = 60 * 5; /// /// Durata cache breve IN SECONDI /// private int cacheTtlShort = 60 * 1; /// /// Oggetto per connessione a REDIS /// private ConnectionMultiplexer redisConn = null!; /// /// Oggetto DB redis da impiegare x chiamate R/W /// private IDatabase redisDb = null!; #endregion Private Fields #region Private Properties /// /// Durata cache di 24h /// private TimeSpan DayLongCache { get => TimeSpan.FromHours(24); } /// /// Durata cache lunga (+ perturbazione percentuale +/-10%) /// private TimeSpan FastCache { get => TimeSpan.FromSeconds(cacheTtlShort * rnd.Next(900, 1100) / 1000); } /// /// Durata cache lunga (+ perturbazione percentuale +/-10%) /// private TimeSpan LongCache { get => TimeSpan.FromSeconds(cacheTtlLong * rnd.Next(900, 1100) / 1000); } /// /// Durata cache di 24h /// private TimeSpan MonthLongCache { get => TimeSpan.FromDays(30); } /// /// Durata cache lunga (+ perturbazione percentuale +/-10%) /// private TimeSpan UltraLongCache { get => TimeSpan.FromSeconds(cacheTtlLong * 10 * rnd.Next(900, 1100) / 1000); } private int vetoRemoveProcSec { get; set; } = 10; /// /// Durata cache di 1 settimana /// private TimeSpan WeekLongCache { get => TimeSpan.FromHours(24 * 7 * rnd.Next(1000, 1100) / 1000); } #endregion Private Properties #region Private Methods /// /// Esegue flush memoria redis dato pattern /// /// /// private async Task ExecFlushRedisPattern(RedisValue pattern) { bool answ = false; var listEndpoints = redisConn.GetEndPoints(); foreach (var endPoint in listEndpoints) { //var server = redisConnAdmin.GetServer(listEndpoints[0]); var server = redisConn.GetServer(endPoint); if (server != null) { var keyList = server.Keys(redisDb.Database, pattern); foreach (var item in keyList) { await redisDb.KeyDeleteAsync(item); } // brutalmente rimuovo intero contenuto DB... DANGER //await server.FlushDatabaseAsync(); answ = true; } } return answ; } #endregion Private Methods } }