Files
mapo-core/MP.IOC/Services/MetricsCalcService.cs
T
2026-04-10 09:55:53 +02:00

240 lines
10 KiB
C#

using NLog;
using StackExchange.Redis;
using System.Globalization;
namespace MP.IOC.Services
{
public class MetricsCalcService : BackgroundService
{
#region Public Constructors
/// <summary>
/// Metodo x calcolo metriche/statistiche di esecuzone realtime
/// </summary>
/// <param name="stats"></param>
/// <param name="config"></param>
/// <param name="mux"></param>
public MetricsCalcService(RouteStatsManager stats, IConfiguration config, IConnectionMultiplexer mux)
{
_stats = stats;
_config = config;
_db = mux.GetDatabase();
_redisBaseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
}
#endregion Public Constructors
#region Protected Methods
/// <summary>
/// Valore SentinelValue x valore minimo in reids/Lua
/// Usiamo un valore molto grande (es. 10 milioni di secondi = ~115 giorni).
/// E' impossibile che una singola richiesta HTTP duri cos tanto, quindi usarlo come "SentinelValue" sicuro.
/// </summary>
private const string SentinelValue = "999999999";
/// <summary>
/// Script update Redis in Lua, definito come costante per efficienza.
/// Lo script gestisce l'incremento e la logica condizionale per Min/Max in un'unica operazione atomica.
/// </summary>
private const string RedisUpdateScript = @"
local key = KEYS[1]
local countInc = tonumber(ARGV[1])
local totalMsInc = tonumber(ARGV[2])
local newMax = tonumber(ARGV[3])
local newMin = tonumber(ARGV[4])
local sentinel = tonumber(ARGV[5])
-- 1. Incremento Count e TotalDuration
redis.call('HINCRBY', key, 'count', countInc)
redis.call('HINCRBYFLOAT', key, 'totalMs', totalMsInc)
-- 2. Aggiornamento Max (solo se il nuovo valore è maggiore del precedente E non la SentinelValue)
local currentMax = redis.call('HGET', key, 'maxMs')
if newMax < sentinel and (not currentMax or newMax > tonumber(currentMax)) then
redis.call('HSET', key, 'maxMs', tostring(newMax))
end
-- 3. Aggiornamento Min (solo se il nuovo valore è minore del precedente E non la SentinelValue)
local currentMin = redis.call('HGET', key, 'minMs')
if newMin < sentinel and (not currentMin or newMin < tonumber(currentMin)) then
redis.call('HSET', key, 'minMs', tostring(newMin))
end
return 1
";
// Classe di supporto per l'aggregazione locale dei valori Daily
private class AggregatedStats
{
public long Count;
public double TotalMs;
public double MaxMs;
public double MinMs = double.MaxValue;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var interval = _config.GetValue<int>("RouteMan:MetricCalcIntervalSeconds", 20);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken);
var snapshot = _stats.Snapshot();
if (snapshot.Count == 0) continue;
var adesso = DateTime.Now;
var hourStart = new DateTime(adesso.Year, adesso.Month, adesso.Day, adesso.Hour, 0, 0);
var dayStart = new DateTime(adesso.Year, adesso.Month, adesso.Day, 0, 0, 0);
if (_db == null) continue;
var batch = _db.CreateBatch();
var tasks = new List<Task>();
// Dizionario per aggregare i valori Daily prima di inviarli a Redis
var dailyAggregates = new Dictionary<string, AggregatedStats>();
foreach (var kv in snapshot)
{
string dest = "IO";
string method = "NA";
string rawKey = kv.Key;
if (rawKey.Contains("|"))
{
var splitVal = rawKey.Split("|");
dest = splitVal[0];
method = splitVal[1];
}
else { method = rawKey; }
var stat = kv.Value;
var count = Interlocked.Read(ref stat.Count);
var totalMs = stat.TotalDuration.TotalMilliseconds;
var maxMs = stat.MaxDuration.TotalMilliseconds;
// Se la durata è ancora MaxValue, usiamo la SentinelValue per dire a Redis "non aggiornare"
var minMs = (stat.MinDuration == TimeSpan.MaxValue)
? double.Parse(SentinelValue)
: stat.MinDuration.TotalMilliseconds;
// --- LOGICA HOURLY (Per ogni route/method una chiave distinta) ---
var hourKey = HourBucketKey(dest, method, hourStart);
var hoursIndex = HoursIndexKey(dest, method);
var hourScore = ToEpochSeconds(hourStart);
// Usiamo lo script Lua per l'aggiornamento atomico dell'ora
tasks.Add(batch.ScriptEvaluateAsync(RedisUpdateScript,
new RedisKey[] { hourKey },
new RedisValue[] {
count.ToString(),
totalMs.ToString(),
maxMs.ToString(),
minMs.ToString(),
SentinelValue
}));
tasks.Add(batch.SortedSetAddAsync(hoursIndex, hourKey, hourScore));
// --- LOGICA DAILY (Aggregazione locale per evitare sovrascritture nel loop) ---
var dayKey = DayBucketKey(dest, dayStart);
var daysIndex = DaysIndexKey(dest);
var dayScore = ToEpochSeconds(dayStart);
if (!dailyAggregates.TryGetValue(dayKey, out var agg))
{
agg = new AggregatedStats();
dailyAggregates[dayKey] = agg;
}
agg.Count += count;
agg.TotalMs += totalMs;
agg.MaxMs = Math.Max(agg.MaxMs, maxMs);
if (minMs != double.MaxValue)
agg.MinMs = Math.Min(agg.MinMs, minMs);
// Aggiungiamo l'indice al batch
tasks.Add(batch.SortedSetAddAsync(daysIndex, dayKey, dayScore));
}
// --- INVIO AGGREGATI DAILY A REDIS ---
foreach (var dayKV in dailyAggregates)
{
var key = dayKV.Key;
var agg = dayKV.Value;
// Se agg.MinMs è inizializzato a double.MaxValue, lo portiamo alla SentinelValue
double finalMin = (agg.MinMs == double.MaxValue || agg.MinMs >= double.Parse(SentinelValue))
? double.Parse(SentinelValue)
: agg.MinMs;
tasks.Add(batch.ScriptEvaluateAsync(RedisUpdateScript,
new RedisKey[] { key },
new RedisValue[] {
agg.Count.ToString(),
agg.TotalMs.ToString(),
agg.MaxMs.ToString(),
finalMin.ToString(),
SentinelValue
}));
}
// Esegui tutto il batch in un unico round-trip
batch.Execute();
await Task.WhenAll(tasks);
_stats.Clear();
}
catch (TaskCanceledException) { }
catch (Exception ex)
{
Log.Error(ex, "Error flushing metrics");
}
}
}
#endregion Protected Methods
#region Private Fields
private static string _redisBaseKey = "";
private static Logger Log = LogManager.GetCurrentClassLogger();
private readonly IConfiguration _config;
private readonly IDatabase _db;
private readonly RouteStatsManager _stats;
#endregion Private Fields
#region Private Methods
//private static string DayBucketKey(string dest, string method, DateTime dtRif)
private static string DayBucketKey(string dest, DateTime dtRif)
{
return $"{_redisBaseKey}:stats:day:{dest}:{dtRif.ToString("yyyyMMdd", CultureInfo.InvariantCulture)}";
//return $"{_redisBaseKey}:stats:day:{dest}:{method}:{dtRif.ToString("yyyyMMdd", CultureInfo.InvariantCulture)}";
}
//private static string DaysIndexKey(string dest, string method)
private static string DaysIndexKey(string dest)
{
return $"{_redisBaseKey}:stats:days:{dest}";
//return $"{_redisBaseKey}:stats:days:{dest}:{method}";
}
private static string HourBucketKey(string dest, string method, DateTime dtRif)
{
return $"{_redisBaseKey}:stats:hour:{dest}:{method}:{dtRif.ToString("yyyyMMddHH", CultureInfo.InvariantCulture)}";
}
private static string HoursIndexKey(string dest, string method)
{
return $"{_redisBaseKey}:stats:hours:{dest}:{method}";
}
private static long ToEpochSeconds(DateTime dt)
{
return new DateTimeOffset(dt).ToUnixTimeSeconds();
}
#endregion Private Methods
}
}