256 lines
11 KiB
C#
256 lines
11 KiB
C#
using NLog;
|
|
using StackExchange.Redis;
|
|
using System.Globalization;
|
|
|
|
namespace MP.RIOC.Services
|
|
{
|
|
public class MetricsCalcService : BackgroundService
|
|
{
|
|
#region Public Constructors
|
|
|
|
/// <summary>
|
|
/// Metodo x calcolo metriche/statistiche di esecuzone realtime
|
|
/// </summary>
|
|
/// <param name="stats"></param>
|
|
/// <param name="config"></param>
|
|
/// <param name="mux"></param>
|
|
public MetricsCalcService(RouteStatsManager stats,
|
|
LuaScriptProvider luaProvider,
|
|
IConfiguration config,
|
|
IConnectionMultiplexer mux)
|
|
{
|
|
_stats = stats;
|
|
_config = config;
|
|
_db = mux.GetDatabase();
|
|
_redisBaseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
|
|
_updateScript = luaProvider.Get("Update");
|
|
}
|
|
|
|
#endregion Public Constructors
|
|
|
|
#region Protected Methods
|
|
|
|
/// <summary>
|
|
/// Valore SentinelValue x valore minimo in reids/Lua
|
|
/// Usiamo un valore molto grande (es. 10 milioni di secondi = ~115 giorni).
|
|
/// E' impossibile che una singola richiesta HTTP duri cos� tanto, quindi usarlo come "SentinelValue" � sicuro.
|
|
/// </summary>
|
|
private const string SentinelValue = "999999999";
|
|
|
|
/// <summary>
|
|
/// Script update Redis in Lua, recuperato dal provider script.
|
|
/// Lo script gestisce l'incremento e la logica condizionale per Min/Max in un'unica operazione atomica.
|
|
/// </summary>
|
|
private readonly string _updateScript;
|
|
|
|
|
|
// Classe di supporto per l'aggregazione locale dei valori Daily
|
|
private class AggregatedStats
|
|
{
|
|
public long Count;
|
|
public long NoReply;
|
|
public double TotalMs;
|
|
public double MaxMs;
|
|
public double MinMs = double.MaxValue;
|
|
}
|
|
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
{
|
|
var interval = _config.GetValue<int>("RouteMan:MetricCalcIntervalSeconds", 30);
|
|
|
|
while (!stoppingToken.IsCancellationRequested)
|
|
{
|
|
try
|
|
{
|
|
await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken);
|
|
var snapshot = _stats.Snapshot();
|
|
if (snapshot.Count == 0) continue;
|
|
|
|
var adesso = DateTime.Now;
|
|
var hourStart = new DateTime(adesso.Year, adesso.Month, adesso.Day, adesso.Hour, 0, 0);
|
|
var dayStart = new DateTime(adesso.Year, adesso.Month, adesso.Day, 0, 0, 0);
|
|
|
|
if (_db == null) continue;
|
|
|
|
var batch = _db.CreateBatch();
|
|
var tasks = new List<Task>();
|
|
|
|
// Dizionario per aggregare i valori Daily prima di inviarli a Redis
|
|
var dailyAggregates = new Dictionary<string, AggregatedStats>();
|
|
|
|
foreach (var kv in snapshot)
|
|
{
|
|
string dest = "IO";
|
|
string method = "NA";
|
|
string machineId = "ALL";
|
|
string rawKey = kv.Key;
|
|
if (rawKey.Contains("|"))
|
|
{
|
|
var splitVal = rawKey.Split("|");
|
|
dest = splitVal[0];
|
|
if (splitVal.Length > 1)
|
|
method = splitVal[1];
|
|
if (splitVal.Length > 2)
|
|
machineId = splitVal[2];
|
|
}
|
|
else
|
|
{
|
|
method = rawKey;
|
|
}
|
|
|
|
var stat = kv.Value;
|
|
var count = Interlocked.Read(ref stat.Count);
|
|
var totalMs = stat.TotalDuration.TotalMilliseconds;
|
|
var maxMs = stat.MaxDuration.TotalMilliseconds;
|
|
// Se la durata è ancora MaxValue, usiamo la SentinelValue per dire a Redis "non aggiornare"
|
|
var minMs = (stat.MinDuration == TimeSpan.MaxValue)
|
|
? double.Parse(SentinelValue)
|
|
: stat.MinDuration.TotalMilliseconds;
|
|
|
|
// --- LOGICA HOURLY (Per ogni route/method e' una chiave distinta) ---
|
|
var hourKey = HourBucketKey(dest, method, hourStart);
|
|
var hoursIndex = HoursIndexKey(dest, method);
|
|
var hourScore = ToEpochSeconds(hourStart);
|
|
|
|
// Calcolo NoReply: Somma di tutti i codici >= 400 o errori espliciti
|
|
long noReplyCount = stat.ErrorMessages.Sum(x => x.Value);
|
|
|
|
// 1. INVIO BUCKET PRINCIPALE (con NoReply)
|
|
// Usiamo lo script Lua per l'aggiornamento atomico dell'ora
|
|
tasks.Add(batch.ScriptEvaluateAsync(_updateScript,
|
|
new RedisKey[] { hourKey },
|
|
new RedisValue[] {
|
|
count.ToString(CultureInfo.InvariantCulture),
|
|
totalMs.ToString(CultureInfo.InvariantCulture),
|
|
maxMs.ToString(CultureInfo.InvariantCulture),
|
|
minMs.ToString(CultureInfo.InvariantCulture),
|
|
noReplyCount.ToString(CultureInfo.InvariantCulture),
|
|
SentinelValue
|
|
}));
|
|
|
|
// 2. INVIO DISTRIBUZIONE STATUS CODES
|
|
if (stat.StatusCodes.Any())
|
|
{
|
|
var statusKey = hourKey + ":status";
|
|
foreach (var status in stat.StatusCodes)
|
|
{
|
|
tasks.Add(batch.HashIncrementAsync(statusKey, status.Key.ToString(), status.Value));
|
|
}
|
|
// Aggiungiamo anche questa chiave all'indice per la pulizia automatica
|
|
tasks.Add(batch.SortedSetAddAsync(hoursIndex, statusKey, hourScore));
|
|
}
|
|
|
|
// 3. INVIO DETTAGLIO ERRORI
|
|
if (stat.ErrorMessages.Any())
|
|
{
|
|
var errorKey = hourKey + ":errors";
|
|
foreach (var error in stat.ErrorMessages)
|
|
{
|
|
// Usiamo HashIncrement per aggregare messaggi uguali
|
|
tasks.Add(batch.HashIncrementAsync(errorKey, error.Key, error.Value));
|
|
}
|
|
tasks.Add(batch.SortedSetAddAsync(hoursIndex, errorKey, hourScore));
|
|
}
|
|
|
|
tasks.Add(batch.SortedSetAddAsync(hoursIndex, hourKey, hourScore));
|
|
|
|
// --- LOGICA DAILY (Aggregazione locale per evitare sovrascritture nel loop) ---
|
|
var dayKey = DayBucketKey(dest, machineId, dayStart);
|
|
var daysIndex = DaysIndexKey(dest, machineId);
|
|
var dayScore = ToEpochSeconds(dayStart);
|
|
|
|
if (!dailyAggregates.TryGetValue(dayKey, out var agg))
|
|
{
|
|
agg = new AggregatedStats();
|
|
dailyAggregates[dayKey] = agg;
|
|
}
|
|
|
|
agg.Count += count;
|
|
agg.NoReply += noReplyCount;
|
|
agg.TotalMs += totalMs;
|
|
agg.MaxMs = Math.Max(agg.MaxMs, maxMs);
|
|
if (minMs != double.MaxValue)
|
|
agg.MinMs = Math.Min(agg.MinMs, minMs);
|
|
|
|
// Aggiungiamo l'indice al batch
|
|
tasks.Add(batch.SortedSetAddAsync(daysIndex, dayKey, dayScore));
|
|
}
|
|
|
|
// --- INVIO AGGREGATI DAILY A REDIS ---
|
|
foreach (var dayKV in dailyAggregates)
|
|
{
|
|
var key = dayKV.Key;
|
|
var agg = dayKV.Value;
|
|
// Se agg.MinMs è inizializzato a double.MaxValue, lo portiamo alla SentinelValue
|
|
double finalMin = (agg.MinMs == double.MaxValue || agg.MinMs >= double.Parse(SentinelValue))
|
|
? double.Parse(SentinelValue)
|
|
: agg.MinMs;
|
|
|
|
tasks.Add(batch.ScriptEvaluateAsync(_updateScript,
|
|
new RedisKey[] { key },
|
|
new RedisValue[] {
|
|
agg.Count.ToString(CultureInfo.InvariantCulture),
|
|
agg.TotalMs.ToString(CultureInfo.InvariantCulture),
|
|
agg.MaxMs.ToString(CultureInfo.InvariantCulture),
|
|
finalMin.ToString(CultureInfo.InvariantCulture),
|
|
agg.NoReply.ToString(CultureInfo.InvariantCulture),
|
|
SentinelValue
|
|
}));
|
|
}
|
|
|
|
// Esegui tutto il batch in un unico round-trip
|
|
batch.Execute();
|
|
await Task.WhenAll(tasks);
|
|
|
|
_stats.Clear();
|
|
}
|
|
catch (TaskCanceledException) { }
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error flushing metrics");
|
|
}
|
|
}
|
|
}
|
|
#endregion Protected Methods
|
|
|
|
#region Private Fields
|
|
|
|
private static string _redisBaseKey = "";
|
|
private static Logger Log = LogManager.GetCurrentClassLogger();
|
|
private readonly IConfiguration _config;
|
|
private readonly IDatabase _db;
|
|
private readonly RouteStatsManager _stats;
|
|
|
|
#endregion Private Fields
|
|
|
|
#region Private Methods
|
|
|
|
private static string DayBucketKey(string dest, string machId, DateTime dtRif)
|
|
{
|
|
return $"{_redisBaseKey}:stats:day:{dest}:{machId}:{dtRif.ToString("yyyyMMdd", CultureInfo.InvariantCulture)}";
|
|
}
|
|
|
|
private static string DaysIndexKey(string dest, string machId)
|
|
{
|
|
return $"{_redisBaseKey}:stats:days:{dest}:{machId}";
|
|
}
|
|
|
|
private static string HourBucketKey(string dest, string method, DateTime dtRif)
|
|
{
|
|
return $"{_redisBaseKey}:stats:hour:{dest}:{method}:{dtRif.ToString("yyyyMMddHH", CultureInfo.InvariantCulture)}";
|
|
}
|
|
|
|
private static string HoursIndexKey(string dest, string method)
|
|
{
|
|
return $"{_redisBaseKey}:stats:hours:{dest}:{method}";
|
|
}
|
|
|
|
private static long ToEpochSeconds(DateTime dt)
|
|
{
|
|
return new DateTimeOffset(dt).ToUnixTimeSeconds();
|
|
}
|
|
|
|
#endregion Private Methods
|
|
}
|
|
}
|