Files
mapo-core/MP.RIOC/Services/MetricsDbFlushService.cs
T
2026-06-11 20:58:02 +02:00

470 lines
20 KiB
C#

using MP.Data.DbModels.Utils;
using MP.Data.Services.Utils;
using NLog;
using StackExchange.Redis;
using System.Globalization;
namespace MP.RIOC.Services
{
public class MetricsDbFlushService : BackgroundService
{
#region Public Constructors
public MetricsDbFlushService(
IServiceScopeFactory scopeFactory,
IConfiguration config,
IConnectionMultiplexer mux)
{
_scopeFactory = scopeFactory;
_config = config;
_mux = mux;
_db = mux.GetDatabase();
}
#endregion Public Constructors
#region Protected Methods
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var interval = _config.GetValue<int>("RouteMan:MetricFlushIntervalSeconds", 240);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await ProcessDayLiveMetricsAsync();
await ProcessHourLiveMetricsAsync();
await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken);
}
catch (TaskCanceledException) { break; }
catch (Exception ex)
{
Log.Error(ex, "Error flushing metrics to database");
}
}
}
#endregion Protected Methods
#region Private Fields
private const double SentinelValue = 999999999;
private static readonly Logger Log = LogManager.GetCurrentClassLogger();
private readonly IConfiguration _config;
private readonly IDatabase _db;
private readonly IConnectionMultiplexer _mux;
private readonly IServiceScopeFactory _scopeFactory;
#endregion Private Fields
#region Private Properties
private string _redisBaseKey => _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
#endregion Private Properties
#region Private Methods
/// <summary>
/// Cancellazione ricorsiva chiavi ausiliarie (:status, :errors) e rimozione dall'indice
/// </summary>
private async Task DeleteAuxKeysAndIndexAsync(RedisKey sKey, RedisKey indexKey, IBatch batch, List<Task> pendingOps)
{
string sKeyStr = sKey.ToString();
string keyDir = sKeyStr.Substring(0, sKeyStr.LastIndexOf(':'));
// Cancella :status dal sorted set e dalla hash
string statusKey = keyDir + ":status";
try
{
pendingOps.Add(batch.SortedSetRemoveAsync(indexKey, statusKey));
pendingOps.Add(batch.KeyDeleteAsync(statusKey));
}
catch { }
// Cancella :errors dal sorted set e dalla hash
string errorKey = keyDir + ":errors";
try
{
pendingOps.Add(batch.SortedSetRemoveAsync(indexKey, errorKey));
pendingOps.Add(batch.KeyDeleteAsync(errorKey));
}
catch { }
// Cancella chiave ausiliaria :status anche da un eventuale indice days se presente
if (statusKey.Contains(":stats:hours:"))
{
string daysIndex = statusKey.Replace(":stats:hours:", ":stats:days:");
try
{
pendingOps.Add(batch.SortedSetRemoveAsync(daysIndex, statusKey));
}
catch { }
}
// Cancella chiave ausiliaria :errors anche da un eventuale indice days se presente
if (errorKey.Contains(":stats:hours:"))
{
string daysIndex = errorKey.Replace(":stats:hours:", ":stats:days:");
try
{
pendingOps.Add(batch.SortedSetRemoveAsync(daysIndex, errorKey));
}
catch { }
}
}
/// <summary>
/// Recupera il TTL residuo di una chiave Redis (-1 = nessun TTL, -2 = chiave non esiste)
/// </summary>
private TimeSpan? GetKeyTtl(RedisKey key)
{
try
{
return _db.KeyTimeToLive(key);
}
catch { return null; }
}
/// <summary>
/// Processing dati giornalieri (da Redis a DB)
/// </summary>
/// <returns></returns>
private async Task ProcessDayLiveMetricsAsync()
{
var aggrRecordsToInsert = new List<StatsAggregatedModel>();
var keysToDelete = new List<RedisKey>();
bool deleteConfirmed = _config.GetValue<bool>("RouteMan:DeleteExpiredMetrics", false);
DateTime now = DateTime.Now;
// Confini temporali per proteggere i dati in corso
DateTime currentDayStart = DateTime.Today;
var endpoints = _mux.GetEndPoints();
foreach (var endpoint in endpoints)
{
var server = _mux.GetServer(endpoint);
if (server.IsReplica)
{
continue;
}
string[] patternsToScan = {
$"{_redisBaseKey}:stats:days:*"
};
var batch = _db.CreateBatch();
var pendingOps = new List<Task>();
foreach (var pattern in patternsToScan)
{
// Nota: KeyScanAsync/KeysAsync e' disponibile su IServer
await foreach (var indexKey in server.KeysAsync(pattern: pattern))
{
if (string.IsNullOrEmpty($"{indexKey}")) continue;
// CORREZIONE: Utilizzo di SortedSetRangeByRankAsync con range 0 a -1 per prendere tutto il set
var memberKeys = await _db.SortedSetRangeByRankAsync(indexKey, 0, -1);
foreach (var statKey in memberKeys)
{
var sKey = (RedisKey)$"{statKey}";
if (!TryParseKeyMetadata(sKey, out var meta) || meta.IsHourType) continue;
// Verifica se la chiave è "orfana"
bool isOrphanKey = false;
var keyTtl = GetKeyTtl(sKey);
if (keyTtl == null)
{
// Nessun TTL recuperato = chiave senza scadenza = orfana se > 30 gg
DateTime cutoffDay = DateTime.Today.AddDays(-30);
isOrphanKey = meta.Timestamp < cutoffDay;
}
else
{
isOrphanKey = keyTtl.Value.TotalSeconds < 0 || keyTtl.Value.TotalDays < 1;
}
// Se è orfana e abbiamo il permesso, segnamola per la cancellazione
// NOTA: non usiamo il confronto con currentDayStart perché taglierebbe fuori
// i dati delle ore/giorni precedenti che devono essere ancora processati dal DB
if (isOrphanKey && deleteConfirmed)
{
// 1. Segna la chiave Hash (Dati) per l'eliminazione
keysToDelete.Add(sKey);
// 2. Rimuovi il riferimento dal Sorted Set (Indice)
pendingOps.Add(batch.SortedSetRemoveAsync(indexKey, statKey));
// 3. Cancellazione ricorsiva delle chiavi ausiliarie (:status, :errors, :days)
await DeleteAuxKeysAndIndexAsync(sKey, indexKey, batch, pendingOps);
}
// Recupero dati dalla Hash
var hashData = await _db.HashGetAllAsync(sKey);
if (hashData.Length == 0) continue;
var dict = hashData.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
if (dict.TryGetValue("count", out var countStr) &&
dict.TryGetValue("totalMs", out var totalMsStr))
{
long count = long.Parse(countStr);
double totalMs = double.Parse(totalMsStr, CultureInfo.InvariantCulture);
double maxMs = dict.ContainsKey("maxMs") ? double.Parse(dict["maxMs"], CultureInfo.InvariantCulture) : 0;
double minMs = dict.ContainsKey("minMs") ? double.Parse(dict["minMs"], CultureInfo.InvariantCulture) : 0;
// TUA CORREZIONE: Reset sentinella per evitare valori fuori scala nel DB
if (minMs >= SentinelValue) minMs = SentinelValue;
if (maxMs >= SentinelValue) maxMs = SentinelValue;
if (count <= 0) continue;
// recupero se presente noReply
long noReply = 0;
if (dict.TryGetValue("noReply", out var sNoReply))
{
noReply = long.Parse(sNoReply);
}
aggrRecordsToInsert.Add(new StatsAggregatedModel
{
Destination = meta.Dest,
MachineId = meta.MachId,
Hour = meta.Timestamp,
RequestCount = count,
AvgDuration = totalMs / count,
MinDuration = minMs,
MaxDuration = maxMs,
NoReply = noReply
});
}
}
}
}
batch.Execute();
// attendo conclusione
await Task.WhenAll(pendingOps);
}
// --- FASE UPSERT DB ---
if (aggrRecordsToInsert.Any())
{
await using var scope = _scopeFactory.CreateAsyncScope();
var aggrService = scope.ServiceProvider.GetRequiredService<IStatsAggrService>();
await aggrService.UpsertManyAsync(aggrRecordsToInsert, false);
Log.Info($"[DAY] Upserted {aggrRecordsToInsert.Count} records to DB");
}
// --- FASE PULIZIA REDIS ---
if (deleteConfirmed && keysToDelete.Any())
{
var batch = _db.CreateBatch();
var pendingOps = new List<Task>();
int deletedCount = 0;
foreach (var key in keysToDelete)
{
pendingOps.Add(batch.KeyDeleteAsync(key));
deletedCount++;
}
batch.Execute();
// attendo conclusione
await Task.WhenAll(pendingOps);
Log.Info($"[CLEANUP DAY] Deleted {deletedCount} expired metric keys from Redis");
}
}
/// <summary>
/// Processing dati orari (da Redis a DB)
/// </summary>
/// <returns></returns>
private async Task ProcessHourLiveMetricsAsync()
{
var detailRecordsToInsert = new List<StatsDetailModel>();
var keysToDelete = new List<RedisKey>();
bool deleteConfirmed = _config.GetValue<bool>("RouteMan:DeleteExpiredMetrics", false);
DateTime now = DateTime.Now;
// Confini temporali per proteggere i dati in corso
DateTime currentHourStart = new DateTime(now.Year, now.Month, now.Day, now.Hour, 0, 0);
DateTime currentDayStart = new DateTime(now.Year, now.Month, now.Day, 0, 0, 0);
var endpoints = _mux.GetEndPoints();
foreach (var endpoint in endpoints)
{
var server = _mux.GetServer(endpoint);
if (server.IsReplica)
{
continue;
}
string[] patternsToScan = {
$"{_redisBaseKey}:stats:hours:*"
};
var batch = _db.CreateBatch();
var pendingOps = new List<Task>();
foreach (var pattern in patternsToScan)
{
// Nota: KeyScanAsync/KeysAsync e' disponibile su IServer
await foreach (var indexKey in server.KeysAsync(pattern: pattern))
{
if (string.IsNullOrEmpty($"{indexKey}")) continue;
// CORREZIONE: Utilizzo di SortedSetRangeByRankAsync con range 0 a -1 per prendere tutto il set
var memberKeys = await _db.SortedSetRangeByRankAsync(indexKey, 0, -1);
foreach (var statKey in memberKeys)
{
var sKey = (RedisKey)$"{statKey}";
if (!TryParseKeyMetadata(sKey, out var meta) || !meta.IsHourType) continue;
// Verifica se la chiave è "orfana"
bool isOrphanKey = false;
var keyTtl = GetKeyTtl(sKey);
if (keyTtl == null)
{
// Nessun TTL recuperato = chiave senza scadenza = orfana se > 7 gg
DateTime cutoffHour = DateTime.Now.AddDays(-7);
isOrphanKey = meta.Timestamp < cutoffHour;
}
else
{
isOrphanKey = keyTtl.Value.TotalSeconds < 0 || keyTtl.Value.TotalDays < 15;
}
// Se è orfana e abbiamo il permesso, segnamola per la cancellazione
// NOTA: non usiamo il confronto con currentHourStart perché taglierebbe fuori
// i dati delle ore precedenti che devono essere ancora processati dal DB
if (isOrphanKey && deleteConfirmed)
{
// 1. Segna la chiave Hash (Dati) per l'eliminazione
keysToDelete.Add(sKey);
// 2. Rimuovi il riferimento dal Sorted Set (Indice)
pendingOps.Add(batch.SortedSetRemoveAsync(indexKey, statKey));
// 3. Cancellazione ricorsiva chiavi ausiliarie
await DeleteAuxKeysAndIndexAsync(sKey, indexKey, batch, pendingOps);
}
// Recupero dati dalla Hash
var hashData = await _db.HashGetAllAsync(sKey);
if (hashData.Length == 0) continue;
var dict = hashData.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
if (dict.TryGetValue("count", out var countStr) &&
dict.TryGetValue("totalMs", out var totalMsStr))
{
long count = long.Parse(countStr);
double totalMs = double.Parse(totalMsStr, CultureInfo.InvariantCulture);
double maxMs = dict.ContainsKey("maxMs") ? double.Parse(dict["maxMs"], CultureInfo.InvariantCulture) : 0;
double minMs = dict.ContainsKey("minMs") ? double.Parse(dict["minMs"], CultureInfo.InvariantCulture) : 0;
// TUA CORREZIONE: Reset sentinella per evitare valori fuori scala nel DB
if (minMs >= SentinelValue) minMs = SentinelValue;
if (maxMs >= SentinelValue) maxMs = SentinelValue;
if (count <= 0) continue;
// recupero se presente noReply
long noReply = 0;
if (dict.TryGetValue("noReply", out var sNoReply))
{
noReply = long.Parse(sNoReply);
}
detailRecordsToInsert.Add(new StatsDetailModel
{
Destination = meta.Dest,
Type = meta.Method,
Hour = meta.Timestamp,
RequestCount = count,
AvgDuration = totalMs / count,
MinDuration = minMs,
MaxDuration = maxMs,
NoReply = noReply
});
}
}
}
}
batch.Execute();
// attendo conclusione
await Task.WhenAll(pendingOps);
}
// --- FASE UPSERT DB ---
if (detailRecordsToInsert.Count > 0)
{
await using var scope = _scopeFactory.CreateAsyncScope();
var detailService = scope.ServiceProvider.GetRequiredService<IStatsDetailService>();
await detailService.UpsertManyAsync(detailRecordsToInsert, false);
Log.Info($"[HOUR] Upserted {detailRecordsToInsert.Count} records to DB");
}
// --- FASE PULIZIA REDIS ---
if (deleteConfirmed && keysToDelete.Count > 0)
{
var batch = _db.CreateBatch();
var pendingOps = new List<Task>();
int deletedCount = 0;
foreach (var key in keysToDelete)
{
pendingOps.Add(batch.KeyDeleteAsync(key));
deletedCount++;
}
batch.Execute();
// attendo conclusione
await Task.WhenAll(pendingOps);
Log.Info($"[CLEANUP HOUR] Deleted {deletedCount} expired metric keys from Redis");
}
}
private bool TryParseKeyMetadata(RedisKey key, out KeyMeta meta)
{
meta = new KeyMeta();
try
{
string k = key.ToString();
string relativeKey = k.Replace($"{_redisBaseKey}:", "");
var p = relativeKey.Split(':');
if (p.Length < 4) return false;
meta.IsHourType = p[1].Contains("hour", StringComparison.InvariantCultureIgnoreCase);
meta.Dest = p[2];
if (meta.IsHourType)
{
meta.Method = p[3];
if (p.Length >= 5) DateTime.TryParseExact(p[4], "yyyyMMddHH", CultureInfo.InvariantCulture, DateTimeStyles.None, out meta.Timestamp);
}
else
{
meta.Method = "DAILY";
meta.MachId = p[3];
if (p.Length >= 5) DateTime.TryParseExact(p[4], "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out meta.Timestamp);
}
return meta.Timestamp != DateTime.MinValue;
}
catch { return false; }
}
#endregion Private Methods
private record KeyMeta
{
public string Dest = "NA";
public string Method = "NA";
public string MachId = "ALL";
public DateTime Timestamp = DateTime.MinValue;
public bool IsHourType = true;
}
}
}