using MP.Data.DbModels.Utils; using MP.Data.Services.Utils; using NLog; using StackExchange.Redis; using System.Globalization; namespace MP.IOC.Services { public class MetricsDbFlushService : BackgroundService { #region Public Constructors public MetricsDbFlushService( IServiceScopeFactory scopeFactory, IConfiguration config, IConnectionMultiplexer mux) { _scopeFactory = scopeFactory; _config = config; _mux = mux; _db = mux.GetDatabase(); } #endregion Public Constructors #region Protected Methods protected override async Task ExecuteAsync(CancellationToken stoppingToken) { //var interval = _config.GetValue("RouteMan:MetricFlushIntervalSeconds", 120); var interval = _config.GetValue("RouteMan:MetricFlushIntervalSeconds", 300); while (!stoppingToken.IsCancellationRequested) { try { await ProcessDayLiveMetricsAsync(); await ProcessHourLiveMetricsAsync(); await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken); } catch (TaskCanceledException) { break; } catch (Exception ex) { Log.Error(ex, "Error flushing metrics to database"); } } } #endregion Protected Methods #region Private Fields private const double SentinelValue = 999999999; private static readonly Logger Log = LogManager.GetCurrentClassLogger(); private readonly IConfiguration _config; private readonly IDatabase _db; private readonly IConnectionMultiplexer _mux; private readonly IServiceScopeFactory _scopeFactory; #endregion Private Fields #region Private Properties private string _redisBaseKey => _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; #endregion Private Properties #region Private Methods /// /// Processing dati giornalieri (da Redis a DB) /// /// private async Task ProcessDayLiveMetricsAsync() { var aggrRecordsToInsert = new List(); var keysToDelete = new List(); bool deleteConfirmed = _config.GetValue("RouteMan:DeleteExpiredMetrics", false); DateTime now = DateTime.Now; // Confini temporali per proteggere i dati in corso DateTime currentDayStart = new DateTime(now.Year, now.Month, now.Day, 0, 0, 0); var endpoints = _mux.GetEndPoints(); foreach (var endpoint in endpoints) { var server = _mux.GetServer(endpoint); if (server.IsReplica) { continue; } string[] patternsToScan = { $"{_redisBaseKey}:stats:days:*" }; foreach (var pattern in patternsToScan) { // Nota: KeyScanAsync/KeysAsync e' disponibile su IServer await foreach (var indexKey in server.KeysAsync(pattern: pattern)) { if (string.IsNullOrEmpty($"{indexKey}")) continue; // CORREZIONE: Utilizzo di SortedSetRangeByRankAsync con range 0 a -1 per prendere tutto il set var memberKeys = await _db.SortedSetRangeByRankAsync(indexKey, 0, -1); foreach (var statKey in memberKeys) { var sKey = (RedisKey)$"{statKey}"; if (!TryParseKeyMetadata(sKey, out string dest, out string method, out DateTime timestamp, out bool isHourType)) continue; // Verifica se la chiave fosse scaduta rispetto all'orario corrente bool isExpired = isHourType //? timestamp < currentHourStart ? false : timestamp < currentDayStart; // Se fosse scaduta e abbiamo il permesso, segnamola per la cancellazione if (isExpired && deleteConfirmed) { keysToDelete.Add(sKey); } // Recupero dati dalla Hash var hashData = await _db.HashGetAllAsync(sKey); if (hashData.Length == 0) continue; var dict = hashData.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString()); if (dict.TryGetValue("count", out var countStr) && dict.TryGetValue("totalMs", out var totalMsStr)) { long count = long.Parse(countStr); count = long.Parse(countStr); double totalMs = double.Parse(totalMsStr, CultureInfo.InvariantCulture); double maxMs = dict.ContainsKey("maxMs") ? double.Parse(dict["maxMs"], CultureInfo.InvariantCulture) : 0; double minMs = dict.ContainsKey("minMs") ? double.Parse(dict["minMs"], CultureInfo.InvariantCulture) : 0; // TUA CORREZIONE: Reset sentinella per evitare valori fuori scala nel DB if (minMs >= SentinelValue) minMs = SentinelValue; if (maxMs >= SentinelValue) maxMs = SentinelValue; if (count <= 0) continue; aggrRecordsToInsert.Add(new StatsAggregatedModel { Destination = dest, Hour = timestamp, RequestCount = count, AvgDuration = totalMs / count, MinDuration = minMs, MaxDuration = maxMs, NoReply = 0 }); } } } } } // --- FASE UPSERT DB --- if (aggrRecordsToInsert.Count > 0) { await using var scope = _scopeFactory.CreateAsyncScope(); var aggrService = scope.ServiceProvider.GetRequiredService(); await aggrService.UpsertManyAsync(aggrRecordsToInsert, false); Log.Info($"[DAY] Upserted {aggrRecordsToInsert.Count} records to DB"); } // --- FASE PULIZIA REDIS --- if (deleteConfirmed && keysToDelete.Count > 0) { var batch = _db.CreateBatch(); int deletedCount = 0; foreach (var key in keysToDelete) { _ = batch.KeyDeleteAsync(key); deletedCount++; } batch.Execute(); Log.Info($"[CLEANUP DAY] Deleted {deletedCount} expired metric keys from Redis"); } } /// /// Processing dati orari (da Redis a DB) /// /// private async Task ProcessHourLiveMetricsAsync() { var detailRecordsToInsert = new List(); var keysToDelete = new List(); bool deleteConfirmed = _config.GetValue("RouteMan:DeleteExpiredMetrics", false); DateTime now = DateTime.Now; // Confini temporali per proteggere i dati in corso DateTime currentHourStart = new DateTime(now.Year, now.Month, now.Day, now.Hour, 0, 0); DateTime currentDayStart = new DateTime(now.Year, now.Month, now.Day, 0, 0, 0); var endpoints = _mux.GetEndPoints(); foreach (var endpoint in endpoints) { var server = _mux.GetServer(endpoint); if (server.IsReplica) { continue; } string[] patternsToScan = { $"{_redisBaseKey}:stats:hours:*" }; foreach (var pattern in patternsToScan) { // Nota: KeyScanAsync/KeysAsync e' disponibile su IServer await foreach (var indexKey in server.KeysAsync(pattern: pattern)) { if (string.IsNullOrEmpty($"{indexKey}")) continue; // CORREZIONE: Utilizzo di SortedSetRangeByRankAsync con range 0 a -1 per prendere tutto il set var memberKeys = await _db.SortedSetRangeByRankAsync(indexKey, 0, -1); foreach (var statKey in memberKeys) { var sKey = (RedisKey)$"{statKey}"; if (!TryParseKeyMetadata(sKey, out string dest, out string method, out DateTime timestamp, out bool isHourType)) continue; // Verifica se la chiave fosse scaduta rispetto all'orario corrente bool isExpired = isHourType ? timestamp < currentHourStart : false; //: timestamp < currentDayStart; // Se fosse scaduta e abbiamo il permesso, segnamola per la cancellazione if (isExpired && deleteConfirmed) { keysToDelete.Add(sKey); } // Recupero dati dalla Hash var hashData = await _db.HashGetAllAsync(sKey); if (hashData.Length == 0) continue; var dict = hashData.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString()); if (dict.TryGetValue("count", out var countStr) && dict.TryGetValue("totalMs", out var totalMsStr)) { long count = long.Parse(countStr); count = long.Parse(countStr); double totalMs = double.Parse(totalMsStr, CultureInfo.InvariantCulture); double maxMs = dict.ContainsKey("maxMs") ? double.Parse(dict["maxMs"], CultureInfo.InvariantCulture) : 0; double minMs = dict.ContainsKey("minMs") ? double.Parse(dict["minMs"], CultureInfo.InvariantCulture) : 0; // TUA CORREZIONE: Reset sentinella per evitare valori fuori scala nel DB if (minMs >= SentinelValue) minMs = SentinelValue; if (maxMs >= SentinelValue) maxMs = SentinelValue; if (count <= 0) continue; detailRecordsToInsert.Add(new StatsDetailModel { Destination = dest, Type = method, Hour = timestamp, RequestCount = count, AvgDuration = totalMs / count, MinDuration = minMs, MaxDuration = maxMs, NoReply = 0 }); } } } } } // --- FASE UPSERT DB --- if (detailRecordsToInsert.Count > 0) { await using var scope = _scopeFactory.CreateAsyncScope(); var detailService = scope.ServiceProvider.GetRequiredService(); await detailService.UpsertManyAsync(detailRecordsToInsert, false); Log.Info($"[HOUR] Upserted {detailRecordsToInsert.Count} records to DB"); } // --- FASE PULIZIA REDIS --- if (deleteConfirmed && keysToDelete.Count > 0) { var batch = _db.CreateBatch(); int deletedCount = 0; foreach (var key in keysToDelete) { _ = batch.KeyDeleteAsync(key); deletedCount++; } batch.Execute(); Log.Info($"[CLEANUP HOUR] Deleted {deletedCount} expired metric keys from Redis"); } } private bool TryParseKeyMetadata(RedisKey key, out string dest, out string method, out DateTime timestamp, out bool isHourType) { dest = "NA"; method = "NA"; timestamp = DateTime.MinValue; isHourType = true; try { string k = key.ToString(); string relativeKey = k.Replace($"{_redisBaseKey}:", ""); var parts = relativeKey.Split(':'); if (parts.Length < 4) return false; string type = parts[1]; // "hour" o "day" dest = parts[2]; if (type == "hour") { isHourType = true; method = parts[3]; if (parts.Length >= 5 && DateTime.TryParseExact(parts[4], "yyyyMMddHH", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dt)) { timestamp = dt; return true; } } else if (type == "day") { isHourType = false; method = "DAILY"; if (DateTime.TryParseExact(parts[3], "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dt)) { timestamp = dt; return true; } } } catch { } return false; } #endregion Private Methods } }