Files
mapo-core/MP.RIOC/Services/MetricsCalcService.cs
2026-05-11 10:11:58 +02:00

256 lines
11 KiB
C#

using NLog;
using StackExchange.Redis;
using System.Globalization;
namespace MP.RIOC.Services
{
/// <summary>
/// Background service that periodically flushes in-memory route execution
/// statistics to Redis as hourly and daily aggregate buckets, using a Lua
/// script for atomic counter/min/max updates.
/// </summary>
public class MetricsCalcService : BackgroundService
{
    #region Public Constructors

    /// <summary>
    /// Wires up the service dependencies and resolves the Redis base key and the Lua update script.
    /// </summary>
    /// <param name="stats">In-memory statistics accumulator; snapshotted and cleared on every flush cycle.</param>
    /// <param name="luaProvider">Provider of the "Update" Lua script used for atomic Redis updates.</param>
    /// <param name="config">Application configuration (reads "ServerConf:RedisBaseKey" and "RouteMan:MetricCalcIntervalSeconds").</param>
    /// <param name="mux">Redis connection multiplexer used to obtain the database handle.</param>
    public MetricsCalcService(RouteStatsManager stats,
        LuaScriptProvider luaProvider,
        IConfiguration config,
        IConnectionMultiplexer mux)
    {
        ArgumentNullException.ThrowIfNull(stats);
        ArgumentNullException.ThrowIfNull(luaProvider);
        ArgumentNullException.ThrowIfNull(config);
        ArgumentNullException.ThrowIfNull(mux);

        _stats = stats;
        _config = config;
        _db = mux.GetDatabase();
        // NOTE(review): a static field is assigned from an instance constructor so the
        // static key-builder helpers below can reach it. This is only safe if the
        // service is registered as a singleton — confirm in the DI registration.
        _redisBaseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
        _updateScript = luaProvider.Get("Update");
    }

    #endregion Public Constructors

    #region Protected Methods

    /// <summary>
    /// Sentinel sent as the "minimum" when no sample was recorded, telling the Lua
    /// script not to update the stored minimum. The value is ~10 million seconds
    /// (~115 days); no single HTTP request can plausibly last that long, so it is
    /// safe to use as a sentinel.
    /// </summary>
    private const string SentinelValue = "999999999";

    /// <summary>
    /// Numeric twin of <see cref="SentinelValue"/>. Declared once as a constant
    /// instead of calling double.Parse(SentinelValue) on every loop iteration.
    /// </summary>
    private const double SentinelMs = 999999999d;

    /// <summary>
    /// Redis update script in Lua, retrieved from the script provider.
    /// The script handles the increment and the conditional Min/Max logic
    /// in a single atomic operation.
    /// </summary>
    private readonly string _updateScript;

    /// <summary>
    /// Local accumulator used to merge multiple hourly entries into a single
    /// daily value before sending it to Redis.
    /// </summary>
    private class AggregatedStats
    {
        public long Count;
        public long NoReply;
        public double TotalMs;
        public double MaxMs;
        // Starts at MaxValue so the first real sample always wins in Math.Min.
        public double MinMs = double.MaxValue;
    }

    /// <summary>
    /// Main loop: every "RouteMan:MetricCalcIntervalSeconds" seconds (default 30)
    /// snapshots the in-memory stats, aggregates them, pushes hourly and daily
    /// buckets to Redis in one batch, and clears the local stats.
    /// </summary>
    /// <param name="stoppingToken">Host shutdown token; honored by the delay and the loop condition.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var intervalSeconds = _config.GetValue<int>("RouteMan:MetricCalcIntervalSeconds", 30);
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(TimeSpan.FromSeconds(intervalSeconds), stoppingToken);

                var snapshot = _stats.Snapshot();
                if (snapshot.Count == 0) continue;
                if (_db == null) continue; // defensive; GetDatabase() is not expected to return null

                // Buckets are keyed on local time — presumably deliberate so day/hour
                // boundaries follow the server's local calendar; confirm before changing to UTC.
                var now = DateTime.Now;
                var hourStart = new DateTime(now.Year, now.Month, now.Day, now.Hour, 0, 0);
                var dayStart = new DateTime(now.Year, now.Month, now.Day, 0, 0, 0);

                var batch = _db.CreateBatch();
                var tasks = new List<Task>();
                // Daily values are aggregated locally first: several per-route entries map to
                // the same daily key, and sending them separately would overwrite each other.
                var dailyAggregates = new Dictionary<string, AggregatedStats>();

                foreach (var kv in snapshot)
                {
                    // Raw key format: "dest|method|machineId" (trailing parts optional);
                    // a key without '|' is treated as a bare method name.
                    string dest = "IO";
                    string method = "NA";
                    string machineId = "ALL";
                    string rawKey = kv.Key;
                    if (rawKey.Contains('|'))
                    {
                        var parts = rawKey.Split('|');
                        dest = parts[0];
                        if (parts.Length > 1)
                            method = parts[1];
                        if (parts.Length > 2)
                            machineId = parts[2];
                    }
                    else
                    {
                        method = rawKey;
                    }

                    var stat = kv.Value;
                    var count = Interlocked.Read(ref stat.Count);
                    var totalMs = stat.TotalDuration.TotalMilliseconds;
                    var maxMs = stat.MaxDuration.TotalMilliseconds;
                    // MinDuration still at TimeSpan.MaxValue means "no sample recorded":
                    // send the sentinel so the Lua script leaves the stored minimum untouched.
                    var minMs = (stat.MinDuration == TimeSpan.MaxValue)
                        ? SentinelMs
                        : stat.MinDuration.TotalMilliseconds;

                    // --- HOURLY logic (each route/method pair is a distinct key) ---
                    var hourKey = HourBucketKey(dest, method, hourStart);
                    var hoursIndex = HoursIndexKey(dest, method);
                    var hourScore = ToEpochSeconds(hourStart);

                    // NoReply count: sum of all explicit error occurrences.
                    long noReplyCount = stat.ErrorMessages.Sum(x => x.Value);

                    // 1. MAIN BUCKET (with NoReply): the Lua script applies
                    //    count/total/max/min/noReply atomically for the hour.
                    tasks.Add(batch.ScriptEvaluateAsync(_updateScript,
                        new RedisKey[] { hourKey },
                        new RedisValue[]
                        {
                            count.ToString(CultureInfo.InvariantCulture),
                            totalMs.ToString(CultureInfo.InvariantCulture),
                            maxMs.ToString(CultureInfo.InvariantCulture),
                            minMs.ToString(CultureInfo.InvariantCulture),
                            noReplyCount.ToString(CultureInfo.InvariantCulture),
                            SentinelValue
                        }));

                    // 2. STATUS-CODE DISTRIBUTION (hash per hourly bucket).
                    if (stat.StatusCodes.Any())
                    {
                        var statusKey = hourKey + ":status";
                        foreach (var status in stat.StatusCodes)
                        {
                            tasks.Add(batch.HashIncrementAsync(statusKey, status.Key.ToString(), status.Value));
                        }
                        // Index this key too so automatic cleanup can find it.
                        tasks.Add(batch.SortedSetAddAsync(hoursIndex, statusKey, hourScore));
                    }

                    // 3. ERROR DETAIL (HashIncrement aggregates identical messages).
                    if (stat.ErrorMessages.Any())
                    {
                        var errorKey = hourKey + ":errors";
                        foreach (var error in stat.ErrorMessages)
                        {
                            tasks.Add(batch.HashIncrementAsync(errorKey, error.Key, error.Value));
                        }
                        tasks.Add(batch.SortedSetAddAsync(hoursIndex, errorKey, hourScore));
                    }

                    tasks.Add(batch.SortedSetAddAsync(hoursIndex, hourKey, hourScore));

                    // --- DAILY logic (local aggregation to avoid overwrites inside the loop) ---
                    var dayKey = DayBucketKey(dest, machineId, dayStart);
                    var daysIndex = DaysIndexKey(dest, machineId);
                    var dayScore = ToEpochSeconds(dayStart);

                    if (!dailyAggregates.TryGetValue(dayKey, out var agg))
                    {
                        agg = new AggregatedStats();
                        dailyAggregates[dayKey] = agg;
                    }
                    agg.Count += count;
                    agg.NoReply += noReplyCount;
                    agg.TotalMs += totalMs;
                    agg.MaxMs = Math.Max(agg.MaxMs, maxMs);
                    // BUGFIX: the old guard compared minMs against double.MaxValue, which it can
                    // never equal (it is either a real sample or SentinelMs), so the sentinel was
                    // silently fed into the aggregate. Skip sentinel values explicitly instead.
                    if (minMs < SentinelMs)
                        agg.MinMs = Math.Min(agg.MinMs, minMs);

                    // Add the daily index entry to the batch.
                    tasks.Add(batch.SortedSetAddAsync(daysIndex, dayKey, dayScore));
                }

                // --- SEND DAILY AGGREGATES TO REDIS ---
                foreach (var dayKV in dailyAggregates)
                {
                    var key = dayKV.Key;
                    var agg = dayKV.Value;
                    // No real minimum collected (MinMs still MaxValue): send the sentinel
                    // so Redis keeps its stored value.
                    double finalMin = (agg.MinMs >= SentinelMs) ? SentinelMs : agg.MinMs;
                    tasks.Add(batch.ScriptEvaluateAsync(_updateScript,
                        new RedisKey[] { key },
                        new RedisValue[]
                        {
                            agg.Count.ToString(CultureInfo.InvariantCulture),
                            agg.TotalMs.ToString(CultureInfo.InvariantCulture),
                            agg.MaxMs.ToString(CultureInfo.InvariantCulture),
                            finalMin.ToString(CultureInfo.InvariantCulture),
                            agg.NoReply.ToString(CultureInfo.InvariantCulture),
                            SentinelValue
                        }));
                }

                // Execute the whole batch in a single round-trip, then observe all tasks.
                batch.Execute();
                await Task.WhenAll(tasks);
                _stats.Clear();
            }
            catch (OperationCanceledException)
            {
                // Shutdown requested while delaying or flushing (covers TaskCanceledException
                // too, which derives from it); the loop condition will exit.
            }
            catch (Exception ex)
            {
                Log.Error(ex, "Error flushing metrics");
            }
        }
    }

    #endregion Protected Methods

    #region Private Fields

    // Static so the static key-builder helpers can use it; assigned in the
    // constructor (see the review note there about the singleton assumption).
    private static string _redisBaseKey = "";
    private static readonly Logger Log = LogManager.GetCurrentClassLogger();
    private readonly IConfiguration _config;
    private readonly IDatabase _db;
    private readonly RouteStatsManager _stats;

    #endregion Private Fields

    #region Private Methods

    /// <summary>Redis key of a daily bucket: {base}:stats:day:{dest}:{machId}:{yyyyMMdd}.</summary>
    private static string DayBucketKey(string dest, string machId, DateTime dtRif)
    {
        return $"{_redisBaseKey}:stats:day:{dest}:{machId}:{dtRif.ToString("yyyyMMdd", CultureInfo.InvariantCulture)}";
    }

    /// <summary>Redis key of the sorted-set index of daily buckets for a dest/machine pair.</summary>
    private static string DaysIndexKey(string dest, string machId)
    {
        return $"{_redisBaseKey}:stats:days:{dest}:{machId}";
    }

    /// <summary>Redis key of an hourly bucket: {base}:stats:hour:{dest}:{method}:{yyyyMMddHH}.</summary>
    private static string HourBucketKey(string dest, string method, DateTime dtRif)
    {
        return $"{_redisBaseKey}:stats:hour:{dest}:{method}:{dtRif.ToString("yyyyMMddHH", CultureInfo.InvariantCulture)}";
    }

    /// <summary>Redis key of the sorted-set index of hourly buckets for a dest/method pair.</summary>
    private static string HoursIndexKey(string dest, string method)
    {
        return $"{_redisBaseKey}:stats:hours:{dest}:{method}";
    }

    /// <summary>Converts a (local) DateTime to Unix epoch seconds for use as a sorted-set score.</summary>
    private static long ToEpochSeconds(DateTime dt)
    {
        return new DateTimeOffset(dt).ToUnixTimeSeconds();
    }

    #endregion Private Methods
}
}