using NLog; using StackExchange.Redis; using System.Globalization; namespace MP.RIOC.Services { public class MetricsCalcService : BackgroundService { #region Public Constructors /// /// Metodo x calcolo metriche/statistiche di esecuzone realtime /// /// /// /// public MetricsCalcService(RouteStatsManager stats, LuaScriptProvider luaProvider, IConfiguration config, IConnectionMultiplexer mux) { _stats = stats; _config = config; _db = mux.GetDatabase(); _redisBaseKey = _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; _updateScript = luaProvider.Get("Update"); } #endregion Public Constructors #region Protected Methods /// /// Valore SentinelValue x valore minimo in reids/Lua /// Usiamo un valore molto grande (es. 10 milioni di secondi = ~115 giorni). /// E' impossibile che una singola richiesta HTTP duri cos� tanto, quindi usarlo come "SentinelValue" � sicuro. /// private const string SentinelValue = "999999999"; /// /// Script update Redis in Lua, recuperato dal provider script. /// Lo script gestisce l'incremento e la logica condizionale per Min/Max in un'unica operazione atomica. /// private readonly string _updateScript; // Classe di supporto per l'aggregazione locale dei valori Daily private class AggregatedStats { public long Count; public long NoReply; public double TotalMs; public double MaxMs; public double MinMs = double.MaxValue; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var interval = _config.GetValue("RouteMan:MetricCalcIntervalSeconds", 30); while (!stoppingToken.IsCancellationRequested) { try { await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken); var snapshot = _stats.Snapshot(); if (snapshot.Count == 0) continue; var adesso = DateTime.Now; var hourStart = new DateTime(adesso.Year, adesso.Month, adesso.Day, adesso.Hour, 0, 0); var dayStart = new DateTime(adesso.Year, adesso.Month, adesso.Day, 0, 0, 0); if (_db == null) continue; var batch = _db.CreateBatch(); var tasks = new List(); // Dizionario per aggregare i valori Daily prima di inviarli a Redis var dailyAggregates = new Dictionary(); foreach (var kv in snapshot) { string dest = "IO"; string method = "NA"; string machineId = "ALL"; string rawKey = kv.Key; if (rawKey.Contains("|")) { var splitVal = rawKey.Split("|"); dest = splitVal[0]; if (splitVal.Length > 1) method = splitVal[1]; if (splitVal.Length > 2) machineId = splitVal[2]; } else { method = rawKey; } var stat = kv.Value; var count = Interlocked.Read(ref stat.Count); var totalMs = stat.TotalDuration.TotalMilliseconds; var maxMs = stat.MaxDuration.TotalMilliseconds; // Se la durata è ancora MaxValue, usiamo la SentinelValue per dire a Redis "non aggiornare" var minMs = (stat.MinDuration == TimeSpan.MaxValue) ? double.Parse(SentinelValue) : stat.MinDuration.TotalMilliseconds; // --- LOGICA HOURLY (Per ogni route/method e' una chiave distinta) --- var hourKey = HourBucketKey(dest, method, hourStart); var hoursIndex = HoursIndexKey(dest, method); var hourScore = ToEpochSeconds(hourStart); // Calcolo NoReply: Somma di tutti i codici >= 400 o errori espliciti long noReplyCount = stat.ErrorMessages.Sum(x => x.Value); // 1. INVIO BUCKET PRINCIPALE (con NoReply) // Usiamo lo script Lua per l'aggiornamento atomico dell'ora tasks.Add(batch.ScriptEvaluateAsync(_updateScript, new RedisKey[] { hourKey }, new RedisValue[] { count.ToString(CultureInfo.InvariantCulture), totalMs.ToString(CultureInfo.InvariantCulture), maxMs.ToString(CultureInfo.InvariantCulture), minMs.ToString(CultureInfo.InvariantCulture), noReplyCount.ToString(CultureInfo.InvariantCulture), SentinelValue })); // 2. INVIO DISTRIBUZIONE STATUS CODES if (stat.StatusCodes.Any()) { var statusKey = hourKey + ":status"; foreach (var status in stat.StatusCodes) { tasks.Add(batch.HashIncrementAsync(statusKey, status.Key.ToString(), status.Value)); } // Aggiungiamo anche questa chiave all'indice per la pulizia automatica tasks.Add(batch.SortedSetAddAsync(hoursIndex, statusKey, hourScore)); } // 3. INVIO DETTAGLIO ERRORI if (stat.ErrorMessages.Any()) { var errorKey = hourKey + ":errors"; foreach (var error in stat.ErrorMessages) { // Usiamo HashIncrement per aggregare messaggi uguali tasks.Add(batch.HashIncrementAsync(errorKey, error.Key, error.Value)); } tasks.Add(batch.SortedSetAddAsync(hoursIndex, errorKey, hourScore)); } tasks.Add(batch.SortedSetAddAsync(hoursIndex, hourKey, hourScore)); // --- LOGICA DAILY (Aggregazione locale per evitare sovrascritture nel loop) --- var dayKey = DayBucketKey(dest, machineId, dayStart); var daysIndex = DaysIndexKey(dest, machineId); var dayScore = ToEpochSeconds(dayStart); if (!dailyAggregates.TryGetValue(dayKey, out var agg)) { agg = new AggregatedStats(); dailyAggregates[dayKey] = agg; } agg.Count += count; agg.NoReply += noReplyCount; agg.TotalMs += totalMs; agg.MaxMs = Math.Max(agg.MaxMs, maxMs); if (minMs != double.MaxValue) agg.MinMs = Math.Min(agg.MinMs, minMs); // Aggiungiamo l'indice al batch tasks.Add(batch.SortedSetAddAsync(daysIndex, dayKey, dayScore)); } // --- INVIO AGGREGATI DAILY A REDIS --- foreach (var dayKV in dailyAggregates) { var key = dayKV.Key; var agg = dayKV.Value; // Se agg.MinMs è inizializzato a double.MaxValue, lo portiamo alla SentinelValue double finalMin = (agg.MinMs == double.MaxValue || agg.MinMs >= double.Parse(SentinelValue)) ? double.Parse(SentinelValue) : agg.MinMs; tasks.Add(batch.ScriptEvaluateAsync(_updateScript, new RedisKey[] { key }, new RedisValue[] { agg.Count.ToString(CultureInfo.InvariantCulture), agg.TotalMs.ToString(CultureInfo.InvariantCulture), agg.MaxMs.ToString(CultureInfo.InvariantCulture), finalMin.ToString(CultureInfo.InvariantCulture), agg.NoReply.ToString(CultureInfo.InvariantCulture), SentinelValue })); } // Esegui tutto il batch in un unico round-trip batch.Execute(); await Task.WhenAll(tasks); _stats.Clear(); } catch (TaskCanceledException) { } catch (Exception ex) { Log.Error(ex, "Error flushing metrics"); } } } #endregion Protected Methods #region Private Fields private static string _redisBaseKey = ""; private static Logger Log = LogManager.GetCurrentClassLogger(); private readonly IConfiguration _config; private readonly IDatabase _db; private readonly RouteStatsManager _stats; #endregion Private Fields #region Private Methods private static string DayBucketKey(string dest, string machId, DateTime dtRif) { return $"{_redisBaseKey}:stats:day:{dest}:{machId}:{dtRif.ToString("yyyyMMdd", CultureInfo.InvariantCulture)}"; } private static string DaysIndexKey(string dest, string machId) { return $"{_redisBaseKey}:stats:days:{dest}:{machId}"; } private static string HourBucketKey(string dest, string method, DateTime dtRif) { return $"{_redisBaseKey}:stats:hour:{dest}:{method}:{dtRif.ToString("yyyyMMddHH", CultureInfo.InvariantCulture)}"; } private static string HoursIndexKey(string dest, string method) { return $"{_redisBaseKey}:stats:hours:{dest}:{method}"; } private static long ToEpochSeconds(DateTime dt) { return new DateTimeOffset(dt).ToUnixTimeSeconds(); } #endregion Private Methods } }