using MP.Data.DbModels.Utils; using MP.Data.Services.Utils; using NLog; using StackExchange.Redis; using System.Globalization; namespace MP.RIOC.Services { public class MetricsDbFlushService : BackgroundService { #region Public Constructors public MetricsDbFlushService( IServiceScopeFactory scopeFactory, IConfiguration config, IConnectionMultiplexer mux) { _scopeFactory = scopeFactory; _config = config; _mux = mux; _db = mux.GetDatabase(); } #endregion Public Constructors #region Protected Methods protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var interval = _config.GetValue("RouteMan:MetricFlushIntervalSeconds", 240); while (!stoppingToken.IsCancellationRequested) { try { await ProcessDayLiveMetricsAsync(); await ProcessHourLiveMetricsAsync(); await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken); } catch (TaskCanceledException) { break; } catch (Exception ex) { Log.Error(ex, "Error flushing metrics to database"); } } } #endregion Protected Methods #region Private Fields private const double SentinelValue = 999999999; private static readonly Logger Log = LogManager.GetCurrentClassLogger(); private readonly IConfiguration _config; private readonly IDatabase _db; private readonly IConnectionMultiplexer _mux; private readonly IServiceScopeFactory _scopeFactory; #endregion Private Fields #region Private Properties private string _redisBaseKey => _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; #endregion Private Properties #region Private Methods /// /// Processing dati giornalieri (da Redis a DB) /// /// private async Task ProcessDayLiveMetricsAsync() { var aggrRecordsToInsert = new List(); var keysToDelete = new List(); bool deleteConfirmed = _config.GetValue("RouteMan:DeleteExpiredMetrics", false); DateTime now = DateTime.Now; // Confini temporali per proteggere i dati in corso DateTime currentDayStart = DateTime.Today; var endpoints = _mux.GetEndPoints(); foreach (var endpoint in endpoints) { var server = _mux.GetServer(endpoint); if (server.IsReplica) { continue; } string[] patternsToScan = { $"{_redisBaseKey}:stats:days:*" }; var batch = _db.CreateBatch(); foreach (var pattern in patternsToScan) { // Nota: KeyScanAsync/KeysAsync e' disponibile su IServer await foreach (var indexKey in server.KeysAsync(pattern: pattern)) { if (string.IsNullOrEmpty($"{indexKey}")) continue; // CORREZIONE: Utilizzo di SortedSetRangeByRankAsync con range 0 a -1 per prendere tutto il set var memberKeys = await _db.SortedSetRangeByRankAsync(indexKey, 0, -1); foreach (var statKey in memberKeys) { var sKey = (RedisKey)$"{statKey}"; #if false if (!TryParseKeyMetadata(sKey, out string dest, out string method, out string machId, out DateTime timestamp, out bool isHourType)) continue; #endif if (!TryParseKeyMetadata(sKey, out var meta) || meta.IsHourType) continue; // Se fosse scaduta e abbiamo il permesso, segnamola per la cancellazione if (meta.Timestamp < currentDayStart && deleteConfirmed) { // 1. Segna la chiave Hash (Dati) per l'eliminazione keysToDelete.Add(sKey); // 2. CORREZIONE: Devi rimuovere il riferimento dal Sorted Set (Indice) // Usiamo il batch per essere efficienti _ = batch.SortedSetRemoveAsync(indexKey, statKey); } // Recupero dati dalla Hash var hashData = await _db.HashGetAllAsync(sKey); if (hashData.Length == 0) continue; var dict = hashData.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString()); if (dict.TryGetValue("count", out var countStr) && dict.TryGetValue("totalMs", out var totalMsStr)) { long count = long.Parse(countStr); count = long.Parse(countStr); double totalMs = double.Parse(totalMsStr, CultureInfo.InvariantCulture); double maxMs = dict.ContainsKey("maxMs") ? double.Parse(dict["maxMs"], CultureInfo.InvariantCulture) : 0; double minMs = dict.ContainsKey("minMs") ? double.Parse(dict["minMs"], CultureInfo.InvariantCulture) : 0; // TUA CORREZIONE: Reset sentinella per evitare valori fuori scala nel DB if (minMs >= SentinelValue) minMs = SentinelValue; if (maxMs >= SentinelValue) maxMs = SentinelValue; if (count <= 0) continue; // recupero se presente noReply long noReply = 0; if (dict.TryGetValue("noReply", out var sNoReply)) { noReply = long.Parse(sNoReply); } aggrRecordsToInsert.Add(new StatsAggregatedModel { Destination = meta.Dest, MachineId = meta.MachId, Hour = meta.Timestamp, RequestCount = count, AvgDuration = totalMs / count, MinDuration = minMs, MaxDuration = maxMs, NoReply = noReply }); } } } } batch.Execute(); } // --- FASE UPSERT DB --- if (aggrRecordsToInsert.Any()) { await using var scope = _scopeFactory.CreateAsyncScope(); var aggrService = scope.ServiceProvider.GetRequiredService(); await aggrService.UpsertManyAsync(aggrRecordsToInsert, false); Log.Info($"[DAY] Upserted {aggrRecordsToInsert.Count} records to DB"); } // --- FASE PULIZIA REDIS --- if (deleteConfirmed && keysToDelete.Any()) { var batch = _db.CreateBatch(); int deletedCount = 0; foreach (var key in keysToDelete) { _ = batch.KeyDeleteAsync(key); deletedCount++; } batch.Execute(); Log.Info($"[CLEANUP DAY] Deleted {deletedCount} expired metric keys from Redis"); } } /// /// Processing dati orari (da Redis a DB) /// /// private async Task ProcessHourLiveMetricsAsync() { var detailRecordsToInsert = new List(); var keysToDelete = new List(); bool deleteConfirmed = _config.GetValue("RouteMan:DeleteExpiredMetrics", false); DateTime now = DateTime.Now; // Confini temporali per proteggere i dati in corso DateTime currentHourStart = new DateTime(now.Year, now.Month, now.Day, now.Hour, 0, 0); DateTime currentDayStart = new DateTime(now.Year, now.Month, now.Day, 0, 0, 0); var endpoints = _mux.GetEndPoints(); foreach (var endpoint in endpoints) { var server = _mux.GetServer(endpoint); if (server.IsReplica) { continue; } string[] patternsToScan = { $"{_redisBaseKey}:stats:hours:*" }; var batch = _db.CreateBatch(); foreach (var pattern in patternsToScan) { // Nota: KeyScanAsync/KeysAsync e' disponibile su IServer await foreach (var indexKey in server.KeysAsync(pattern: pattern)) { if (string.IsNullOrEmpty($"{indexKey}")) continue; // CORREZIONE: Utilizzo di SortedSetRangeByRankAsync con range 0 a -1 per prendere tutto il set var memberKeys = await _db.SortedSetRangeByRankAsync(indexKey, 0, -1); foreach (var statKey in memberKeys) { var sKey = (RedisKey)$"{statKey}"; if (!TryParseKeyMetadata(sKey, out var meta) || !meta.IsHourType) continue; #if false if (!TryParseKeyMetadata(sKey, out string dest, out string method, out string machId, out DateTime timestamp, out bool isHourType)) continue; // Verifica se la chiave fosse scaduta rispetto all'orario corrente bool isExpired = isHourType ? timestamp < currentHourStart : false; //: timestamp < currentDayStart; #endif // Se fosse scaduta e abbiamo il permesso, segnamola per la cancellazione if (meta.Timestamp < currentHourStart && deleteConfirmed) { // 1. Segna la chiave Hash (Dati) per l'eliminazione keysToDelete.Add(sKey); // 2. CORREZIONE: Devi rimuovere il riferimento dal Sorted Set (Indice) // Usiamo il batch per essere efficienti _ = batch.SortedSetRemoveAsync(indexKey, statKey); } // Recupero dati dalla Hash var hashData = await _db.HashGetAllAsync(sKey); if (hashData.Length == 0) continue; var dict = hashData.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString()); if (dict.TryGetValue("count", out var countStr) && dict.TryGetValue("totalMs", out var totalMsStr)) { long count = long.Parse(countStr); count = long.Parse(countStr); double totalMs = double.Parse(totalMsStr, CultureInfo.InvariantCulture); double maxMs = dict.ContainsKey("maxMs") ? double.Parse(dict["maxMs"], CultureInfo.InvariantCulture) : 0; double minMs = dict.ContainsKey("minMs") ? double.Parse(dict["minMs"], CultureInfo.InvariantCulture) : 0; // TUA CORREZIONE: Reset sentinella per evitare valori fuori scala nel DB if (minMs >= SentinelValue) minMs = SentinelValue; if (maxMs >= SentinelValue) maxMs = SentinelValue; if (count <= 0) continue; // recupero se presente noReply long noReply = 0; if (dict.TryGetValue("noReply", out var sNoReply)) { noReply = long.Parse(sNoReply); } detailRecordsToInsert.Add(new StatsDetailModel { Destination = meta.Dest, Type = meta.Method, Hour = meta.Timestamp, RequestCount = count, AvgDuration = totalMs / count, MinDuration = minMs, MaxDuration = maxMs, NoReply = noReply }); } } } } batch.Execute(); } // --- FASE UPSERT DB --- if (detailRecordsToInsert.Count > 0) { await using var scope = _scopeFactory.CreateAsyncScope(); var detailService = scope.ServiceProvider.GetRequiredService(); await detailService.UpsertManyAsync(detailRecordsToInsert, false); Log.Info($"[HOUR] Upserted {detailRecordsToInsert.Count} records to DB"); } // --- FASE PULIZIA REDIS --- if (deleteConfirmed && keysToDelete.Count > 0) { var batch = _db.CreateBatch(); int deletedCount = 0; foreach (var key in keysToDelete) { _ = batch.KeyDeleteAsync(key); deletedCount++; } batch.Execute(); Log.Info($"[CLEANUP HOUR] Deleted {deletedCount} expired metric keys from Redis"); } } private bool TryParseKeyMetadata(RedisKey key, out KeyMeta meta) { meta = new KeyMeta(); try { string k = key.ToString(); string relativeKey = k.Replace($"{_redisBaseKey}:", ""); var p = relativeKey.Split(':'); if (p.Length < 4) return false; meta.IsHourType = p[1].Equals("hour", StringComparison.InvariantCultureIgnoreCase); meta.Dest = p[2]; if (meta.IsHourType) { meta.Method = p[3]; if (p.Length >= 5) DateTime.TryParseExact(p[4], "yyyyMMddHH", CultureInfo.InvariantCulture, DateTimeStyles.None, out meta.Timestamp); } else { meta.Method = "DAILY"; meta.MachId = p[3]; if (p.Length >= 5) DateTime.TryParseExact(p[4], "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out meta.Timestamp); } return meta.Timestamp != DateTime.MinValue; } catch { return false; } } #if false private bool TryParseKeyMetadata(RedisKey key, out string dest, out string method, out string machId, out DateTime timestamp, out bool isHourType) { dest = "NA"; method = "NA"; machId = "ALL"; timestamp = DateTime.MinValue; isHourType = true; try { string k = key.ToString(); string relativeKey = k.Replace($"{_redisBaseKey}:", ""); var parts = relativeKey.Split(':'); if (parts.Length < 4) return false; string type = parts[1]; // "hour" o "day" dest = parts[2]; if (type == "hour") { isHourType = true; method = parts[3]; if (parts.Length >= 5 && DateTime.TryParseExact(parts[4], "yyyyMMddHH", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dt)) { timestamp = dt; return true; } } else if (type == "day") { isHourType = false; method = "DAILY"; string rawDate = ""; if (parts.Length >= 5) { machId = parts[3]; rawDate = parts[4]; } else { rawDate = parts[3]; } if (DateTime.TryParseExact(rawDate, "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dt)) { timestamp = dt; return true; } } } catch { } return false; } #endif private record KeyMeta { public string Dest = "NA"; public string Method = "NA"; public string MachId = "ALL"; public DateTime Timestamp = DateTime.MinValue; public bool IsHourType = true; } #endregion Private Methods } }