433 lines
18 KiB
C#
433 lines
18 KiB
C#
using MP.Data.DbModels.Utils;
|
|
using MP.Data.Services.Utils;
|
|
using NLog;
|
|
using StackExchange.Redis;
|
|
using System.Globalization;
|
|
|
|
namespace MP.RIOC.Services
|
|
{
|
|
public class MetricsDbFlushService : BackgroundService
|
|
{
|
|
#region Public Constructors
|
|
|
|
public MetricsDbFlushService(
|
|
IServiceScopeFactory scopeFactory,
|
|
IConfiguration config,
|
|
IConnectionMultiplexer mux)
|
|
{
|
|
_scopeFactory = scopeFactory;
|
|
_config = config;
|
|
_mux = mux;
|
|
_db = mux.GetDatabase();
|
|
}
|
|
|
|
#endregion Public Constructors
|
|
|
|
#region Protected Methods
|
|
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
{
|
|
var interval = _config.GetValue<int>("RouteMan:MetricFlushIntervalSeconds", 240);
|
|
|
|
while (!stoppingToken.IsCancellationRequested)
|
|
{
|
|
try
|
|
{
|
|
await ProcessDayLiveMetricsAsync();
|
|
await ProcessHourLiveMetricsAsync();
|
|
await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken);
|
|
}
|
|
catch (TaskCanceledException) { break; }
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error flushing metrics to database");
|
|
}
|
|
}
|
|
}
|
|
|
|
#endregion Protected Methods
|
|
|
|
#region Private Fields
|
|
|
|
private const double SentinelValue = 999999999;
|
|
private static readonly Logger Log = LogManager.GetCurrentClassLogger();
|
|
private readonly IConfiguration _config;
|
|
private readonly IDatabase _db;
|
|
private readonly IConnectionMultiplexer _mux;
|
|
private readonly IServiceScopeFactory _scopeFactory;
|
|
|
|
#endregion Private Fields
|
|
|
|
#region Private Properties
|
|
|
|
private string _redisBaseKey => _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
|
|
|
|
#endregion Private Properties
|
|
|
|
#region Private Methods
|
|
|
|
/// <summary>
|
|
/// Processing dati giornalieri (da Redis a DB)
|
|
/// </summary>
|
|
/// <returns></returns>
|
|
private async Task ProcessDayLiveMetricsAsync()
|
|
{
|
|
var aggrRecordsToInsert = new List<StatsAggregatedModel>();
|
|
var keysToDelete = new List<RedisKey>();
|
|
|
|
bool deleteConfirmed = _config.GetValue<bool>("RouteMan:DeleteExpiredMetrics", false);
|
|
DateTime now = DateTime.Now;
|
|
|
|
// Confini temporali per proteggere i dati in corso
|
|
DateTime currentDayStart = DateTime.Today;
|
|
|
|
var endpoints = _mux.GetEndPoints();
|
|
|
|
foreach (var endpoint in endpoints)
|
|
{
|
|
var server = _mux.GetServer(endpoint);
|
|
|
|
if (server.IsReplica)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
string[] patternsToScan = {
|
|
$"{_redisBaseKey}:stats:days:*"
|
|
};
|
|
|
|
var batch = _db.CreateBatch();
|
|
foreach (var pattern in patternsToScan)
|
|
{
|
|
// Nota: KeyScanAsync/KeysAsync e' disponibile su IServer
|
|
await foreach (var indexKey in server.KeysAsync(pattern: pattern))
|
|
{
|
|
if (string.IsNullOrEmpty($"{indexKey}")) continue;
|
|
|
|
// CORREZIONE: Utilizzo di SortedSetRangeByRankAsync con range 0 a -1 per prendere tutto il set
|
|
var memberKeys = await _db.SortedSetRangeByRankAsync(indexKey, 0, -1);
|
|
|
|
foreach (var statKey in memberKeys)
|
|
{
|
|
var sKey = (RedisKey)$"{statKey}";
|
|
#if false
|
|
if (!TryParseKeyMetadata(sKey, out string dest, out string method, out string machId, out DateTime timestamp, out bool isHourType))
|
|
continue;
|
|
#endif
|
|
if (!TryParseKeyMetadata(sKey, out var meta) || meta.IsHourType) continue;
|
|
|
|
// Se fosse scaduta e abbiamo il permesso, segnamola per la cancellazione
|
|
if (meta.Timestamp < currentDayStart && deleteConfirmed)
|
|
{
|
|
// 1. Segna la chiave Hash (Dati) per l'eliminazione
|
|
keysToDelete.Add(sKey);
|
|
|
|
// 2. CORREZIONE: Devi rimuovere il riferimento dal Sorted Set (Indice)
|
|
// Usiamo il batch per essere efficienti
|
|
_ = batch.SortedSetRemoveAsync(indexKey, statKey);
|
|
}
|
|
|
|
// Recupero dati dalla Hash
|
|
var hashData = await _db.HashGetAllAsync(sKey);
|
|
if (hashData.Length == 0) continue;
|
|
|
|
var dict = hashData.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
|
|
|
|
if (dict.TryGetValue("count", out var countStr) &&
|
|
dict.TryGetValue("totalMs", out var totalMsStr))
|
|
{
|
|
long count = long.Parse(countStr);
|
|
count = long.Parse(countStr);
|
|
double totalMs = double.Parse(totalMsStr, CultureInfo.InvariantCulture);
|
|
|
|
double maxMs = dict.ContainsKey("maxMs") ? double.Parse(dict["maxMs"], CultureInfo.InvariantCulture) : 0;
|
|
double minMs = dict.ContainsKey("minMs") ? double.Parse(dict["minMs"], CultureInfo.InvariantCulture) : 0;
|
|
|
|
// TUA CORREZIONE: Reset sentinella per evitare valori fuori scala nel DB
|
|
if (minMs >= SentinelValue) minMs = SentinelValue;
|
|
if (maxMs >= SentinelValue) maxMs = SentinelValue;
|
|
|
|
if (count <= 0) continue;
|
|
|
|
// recupero se presente noReply
|
|
long noReply = 0;
|
|
if (dict.TryGetValue("noReply", out var sNoReply))
|
|
{
|
|
noReply = long.Parse(sNoReply);
|
|
}
|
|
|
|
aggrRecordsToInsert.Add(new StatsAggregatedModel
|
|
{
|
|
Destination = meta.Dest,
|
|
MachineId = meta.MachId,
|
|
Hour = meta.Timestamp,
|
|
RequestCount = count,
|
|
AvgDuration = totalMs / count,
|
|
MinDuration = minMs,
|
|
MaxDuration = maxMs,
|
|
NoReply = noReply
|
|
});
|
|
}
|
|
}
|
|
}
|
|
}
|
|
batch.Execute();
|
|
}
|
|
|
|
// --- FASE UPSERT DB ---
|
|
if (aggrRecordsToInsert.Any())
|
|
{
|
|
await using var scope = _scopeFactory.CreateAsyncScope();
|
|
var aggrService = scope.ServiceProvider.GetRequiredService<IStatsAggrService>();
|
|
await aggrService.UpsertManyAsync(aggrRecordsToInsert, false);
|
|
Log.Info($"[DAY] Upserted {aggrRecordsToInsert.Count} records to DB");
|
|
}
|
|
|
|
// --- FASE PULIZIA REDIS ---
|
|
if (deleteConfirmed && keysToDelete.Any())
|
|
{
|
|
var batch = _db.CreateBatch();
|
|
int deletedCount = 0;
|
|
foreach (var key in keysToDelete)
|
|
{
|
|
_ = batch.KeyDeleteAsync(key);
|
|
deletedCount++;
|
|
}
|
|
batch.Execute();
|
|
Log.Info($"[CLEANUP DAY] Deleted {deletedCount} expired metric keys from Redis");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Processing dati orari (da Redis a DB)
|
|
/// </summary>
|
|
/// <returns></returns>
|
|
private async Task ProcessHourLiveMetricsAsync()
|
|
{
|
|
var detailRecordsToInsert = new List<StatsDetailModel>();
|
|
var keysToDelete = new List<RedisKey>();
|
|
|
|
bool deleteConfirmed = _config.GetValue<bool>("RouteMan:DeleteExpiredMetrics", false);
|
|
DateTime now = DateTime.Now;
|
|
|
|
// Confini temporali per proteggere i dati in corso
|
|
DateTime currentHourStart = new DateTime(now.Year, now.Month, now.Day, now.Hour, 0, 0);
|
|
DateTime currentDayStart = new DateTime(now.Year, now.Month, now.Day, 0, 0, 0);
|
|
|
|
var endpoints = _mux.GetEndPoints();
|
|
|
|
foreach (var endpoint in endpoints)
|
|
{
|
|
var server = _mux.GetServer(endpoint);
|
|
|
|
if (server.IsReplica)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
string[] patternsToScan = {
|
|
$"{_redisBaseKey}:stats:hours:*"
|
|
};
|
|
|
|
var batch = _db.CreateBatch();
|
|
foreach (var pattern in patternsToScan)
|
|
{
|
|
// Nota: KeyScanAsync/KeysAsync e' disponibile su IServer
|
|
await foreach (var indexKey in server.KeysAsync(pattern: pattern))
|
|
{
|
|
if (string.IsNullOrEmpty($"{indexKey}")) continue;
|
|
|
|
// CORREZIONE: Utilizzo di SortedSetRangeByRankAsync con range 0 a -1 per prendere tutto il set
|
|
var memberKeys = await _db.SortedSetRangeByRankAsync(indexKey, 0, -1);
|
|
|
|
foreach (var statKey in memberKeys)
|
|
{
|
|
var sKey = (RedisKey)$"{statKey}";
|
|
if (!TryParseKeyMetadata(sKey, out var meta) || !meta.IsHourType) continue;
|
|
#if false
|
|
if (!TryParseKeyMetadata(sKey, out string dest, out string method, out string machId, out DateTime timestamp, out bool isHourType))
|
|
continue;
|
|
|
|
// Verifica se la chiave fosse scaduta rispetto all'orario corrente
|
|
bool isExpired = isHourType
|
|
? timestamp < currentHourStart
|
|
: false;
|
|
//: timestamp < currentDayStart;
|
|
#endif
|
|
|
|
// Se fosse scaduta e abbiamo il permesso, segnamola per la cancellazione
|
|
if (meta.Timestamp < currentHourStart && deleteConfirmed)
|
|
{
|
|
// 1. Segna la chiave Hash (Dati) per l'eliminazione
|
|
keysToDelete.Add(sKey);
|
|
|
|
// 2. CORREZIONE: Devi rimuovere il riferimento dal Sorted Set (Indice)
|
|
// Usiamo il batch per essere efficienti
|
|
_ = batch.SortedSetRemoveAsync(indexKey, statKey);
|
|
}
|
|
|
|
// Recupero dati dalla Hash
|
|
var hashData = await _db.HashGetAllAsync(sKey);
|
|
if (hashData.Length == 0) continue;
|
|
|
|
var dict = hashData.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
|
|
|
|
if (dict.TryGetValue("count", out var countStr) &&
|
|
dict.TryGetValue("totalMs", out var totalMsStr))
|
|
{
|
|
long count = long.Parse(countStr);
|
|
count = long.Parse(countStr);
|
|
double totalMs = double.Parse(totalMsStr, CultureInfo.InvariantCulture);
|
|
|
|
double maxMs = dict.ContainsKey("maxMs") ? double.Parse(dict["maxMs"], CultureInfo.InvariantCulture) : 0;
|
|
double minMs = dict.ContainsKey("minMs") ? double.Parse(dict["minMs"], CultureInfo.InvariantCulture) : 0;
|
|
|
|
// TUA CORREZIONE: Reset sentinella per evitare valori fuori scala nel DB
|
|
if (minMs >= SentinelValue) minMs = SentinelValue;
|
|
if (maxMs >= SentinelValue) maxMs = SentinelValue;
|
|
|
|
if (count <= 0) continue;
|
|
// recupero se presente noReply
|
|
long noReply = 0;
|
|
if (dict.TryGetValue("noReply", out var sNoReply))
|
|
{
|
|
noReply = long.Parse(sNoReply);
|
|
}
|
|
|
|
detailRecordsToInsert.Add(new StatsDetailModel
|
|
{
|
|
Destination = meta.Dest,
|
|
Type = meta.Method,
|
|
Hour = meta.Timestamp,
|
|
RequestCount = count,
|
|
AvgDuration = totalMs / count,
|
|
MinDuration = minMs,
|
|
MaxDuration = maxMs,
|
|
NoReply = noReply
|
|
});
|
|
}
|
|
}
|
|
}
|
|
}
|
|
batch.Execute();
|
|
}
|
|
|
|
// --- FASE UPSERT DB ---
|
|
if (detailRecordsToInsert.Count > 0)
|
|
{
|
|
await using var scope = _scopeFactory.CreateAsyncScope();
|
|
var detailService = scope.ServiceProvider.GetRequiredService<IStatsDetailService>();
|
|
await detailService.UpsertManyAsync(detailRecordsToInsert, false);
|
|
Log.Info($"[HOUR] Upserted {detailRecordsToInsert.Count} records to DB");
|
|
}
|
|
|
|
// --- FASE PULIZIA REDIS ---
|
|
if (deleteConfirmed && keysToDelete.Count > 0)
|
|
{
|
|
var batch = _db.CreateBatch();
|
|
int deletedCount = 0;
|
|
foreach (var key in keysToDelete)
|
|
{
|
|
_ = batch.KeyDeleteAsync(key);
|
|
deletedCount++;
|
|
}
|
|
batch.Execute();
|
|
Log.Info($"[CLEANUP HOUR] Deleted {deletedCount} expired metric keys from Redis");
|
|
}
|
|
}
|
|
private bool TryParseKeyMetadata(RedisKey key, out KeyMeta meta)
|
|
{
|
|
meta = new KeyMeta();
|
|
try
|
|
{
|
|
string k = key.ToString();
|
|
string relativeKey = k.Replace($"{_redisBaseKey}:", "");
|
|
var p = relativeKey.Split(':');
|
|
if (p.Length < 4) return false;
|
|
|
|
meta.IsHourType = p[1].Equals("hour", StringComparison.InvariantCultureIgnoreCase);
|
|
meta.Dest = p[2];
|
|
if (meta.IsHourType)
|
|
{
|
|
meta.Method = p[3];
|
|
if (p.Length >= 5) DateTime.TryParseExact(p[4], "yyyyMMddHH", CultureInfo.InvariantCulture, DateTimeStyles.None, out meta.Timestamp);
|
|
}
|
|
else
|
|
{
|
|
meta.Method = "DAILY";
|
|
meta.MachId = p[3];
|
|
if (p.Length >= 5) DateTime.TryParseExact(p[4], "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out meta.Timestamp);
|
|
}
|
|
return meta.Timestamp != DateTime.MinValue;
|
|
}
|
|
catch { return false; }
|
|
}
|
|
|
|
#if false
|
|
private bool TryParseKeyMetadata(RedisKey key, out string dest, out string method, out string machId, out DateTime timestamp, out bool isHourType)
|
|
{
|
|
dest = "NA";
|
|
method = "NA";
|
|
machId = "ALL";
|
|
timestamp = DateTime.MinValue;
|
|
isHourType = true;
|
|
try
|
|
{
|
|
string k = key.ToString();
|
|
string relativeKey = k.Replace($"{_redisBaseKey}:", "");
|
|
var parts = relativeKey.Split(':');
|
|
|
|
if (parts.Length < 4) return false;
|
|
|
|
string type = parts[1]; // "hour" o "day"
|
|
dest = parts[2];
|
|
|
|
if (type == "hour")
|
|
{
|
|
isHourType = true;
|
|
method = parts[3];
|
|
if (parts.Length >= 5 && DateTime.TryParseExact(parts[4], "yyyyMMddHH", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dt))
|
|
{
|
|
timestamp = dt;
|
|
return true;
|
|
}
|
|
}
|
|
else if (type == "day")
|
|
{
|
|
isHourType = false;
|
|
method = "DAILY";
|
|
string rawDate = "";
|
|
if (parts.Length >= 5)
|
|
{
|
|
machId = parts[3];
|
|
rawDate = parts[4];
|
|
}
|
|
else
|
|
{
|
|
rawDate = parts[3];
|
|
}
|
|
if (DateTime.TryParseExact(rawDate, "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dt))
|
|
{
|
|
timestamp = dt;
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
catch { }
|
|
return false;
|
|
}
|
|
#endif
|
|
|
|
private record KeyMeta
|
|
{
|
|
public string Dest = "NA";
|
|
public string Method = "NA";
|
|
public string MachId = "ALL";
|
|
public DateTime Timestamp = DateTime.MinValue;
|
|
public bool IsHourType = true;
|
|
}
|
|
|
|
#endregion Private Methods
|
|
}
|
|
}
|