using MP.Data.DbModels.Utils; using MP.Data.Services.Utils; using NLog; using StackExchange.Redis; using System.Globalization; namespace MP.IOC.Services { public class MetricsDbFlushService : BackgroundService { private readonly IServiceScopeFactory _scopeFactory; private readonly RouteStatsManager _stats; private readonly IConfiguration _config; private readonly IDatabase _db; private readonly IConnectionMultiplexer _mux; private static readonly Logger Log = LogManager.GetCurrentClassLogger(); public MetricsDbFlushService( RouteStatsManager stats, IServiceScopeFactory scopeFactory, IConfiguration config, IConnectionMultiplexer mux) { _stats = stats; _scopeFactory = scopeFactory; _config = config; _mux = mux; _db = mux.GetDatabase(); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var interval = _config.GetValue("RouteMan:MetricFlushIntervalSeconds", 120); while (!stoppingToken.IsCancellationRequested) { try { await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken); await FlushLiveMetricsAsync(); } catch (TaskCanceledException) { break; } catch (Exception ex) { Log.Error(ex, "Error flushing metrics to database"); } } } public async Task RecoverHistoricalMetricsAsync() { try { var deleteConfirmed = _config.GetValue("RouteMan:DeleteExpiredMetrics", false); Log.Info("Starting historical metrics recovery from Redis..."); var baseKey = _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; var yesterday = DateTime.Now.AddDays(-1).Date; await RecoverHistoricalDailyAsync(baseKey, yesterday, deleteConfirmed); await RecoverHistoricalHourlyAsync(baseKey, yesterday, deleteConfirmed); Log.Info("Historical metrics recovery completed"); } catch (Exception ex) { Log.Error(ex, "Error recovering historical metrics from Redis"); } } private async Task RecoverHistoricalHourlyAsync(string baseKey, DateTime cutoff, bool deleteConfirmed) { try { var detailRecordsToInsert = new List(); var allHourIndexKeys = new List(); // Get all hours index keys by pattern matching on existing indices var dests = await GetAllDestinationsAsync(baseKey); foreach (var dest in dests) { var hourIndexKey = $"{baseKey}:stats:hours:{dest}:*"; try { var allKeys = await _db.KeysAsync(hourIndexKey, 0, -1); foreach (var key in allKeys) { if (!key.IsNullOrEmpty) { allHourIndexKeys.Add(key.ToString()); // Read bucket keys from sorted set index var bucketKeys = await _db.SortedSetRangeByRankAsync(key, 0, -1); foreach (var bucketKeyRedis in bucketKeys) { if (bucketKeyRedis.IsNullOrEmpty) continue; var bucketStr = bucketKeyRedis.ToString(); var parts = bucketStr.Split(':'); if (parts.Length < 7) continue; // Parse date from key: {base}:stats:hour:{dest}:{method}:{yyyyMMddHH} var dayHourPart = string.Join(":", parts.Skip(6)); if (!DateTime.TryParseExact(dayHourPart, "yyyyMMddHH", CultureInfo.InvariantCulture, DateTimeStyles.None, out var hourFromKey)) continue; if (hourFromKey > cutoff) continue; // Read hash data from bucket var hashData = await _db.HashGetAllAsync(bucketKeyRedis); if (hashData.Length == 0) continue; var countVal = hashData.FirstOrDefault(f => f.Name.ToString() == "count"); var totalMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "totalMs"); var maxMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "maxMs"); var minMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "minMs"); if (countVal.Value.IsNullOrEmpty || totalMsVal.Value.IsNullOrEmpty) continue; var count = long.TryParse(countVal.Value.ToString(), out var cp) ? cp : 0; var totalMs = double.TryParse(totalMsVal.Value.ToString(), out var tp) ? tp : 0; var maxMs = double.TryParse(maxMsVal.Value.ToString(), out var mp) ? mp : 0; var minMs = double.TryParse(minMsVal.Value.ToString(), out var mip) ? mip : 0; if (count == 0) continue; var avgDuration = totalMs / count; detailRecordsToInsert.Add(new StatsDetailModel { Destination = dest, Type = parts[5], Hour = hourFromKey.Date.AddHours(hourFromKey.Hour), RequestCount = count, AvgDuration = avgDuration, MinDuration = minMs, MaxDuration = maxMs, NoReply = 0 }); } } } } catch (Exception ex) { Log.Error(ex, "Error reading hourly index for dest {dest}", dest); } } if (detailRecordsToInsert.Count > 0) { await using var scope = _scopeFactory.CreateAsyncScope(); var detailService = scope.ServiceProvider.GetRequiredService(); await detailService.UpsertManyAsync(detailRecordsToInsert, false); Log.Info("[HISTORICAL] Recovered {count} hourly detail records from Redis", detailRecordsToInsert.Count); var sampleCount = Math.Min(5, detailRecordsToInsert.Count); for (int i = 0; i < sampleCount; i++) { var r = detailRecordsToInsert[i]; Log.Info("[HISTORICAL SAMPLE] Env={env}, Type={type}, Hour={hour}, Count={count}, Avg={avg:F2}ms", r.Destination, r.Type, r.Hour.ToString("yyyy-MM-dd HH:mm"), r.RequestCount, r.AvgDuration); } if (deleteConfirmed) { await DeleteExpiredHourlyKeysAsync(allHourIndexKeys); } } } catch (Exception ex) { Log.Error(ex, "Error recovering historical hourly data from Redis"); } } private async Task RecoverHistoricalDailyAsync(string baseKey, DateTime cutoff, bool deleteConfirmed) { try { var cursor = 0L; var pattern = $"{baseKey}:stats:days:*"; var allDayIndices = new List(); do { var result = await _db.KeyScanAsync(pattern, cursor); allDayIndices.AddRange(result.Keys.Where(k => k != null)); cursor = result.Cursor; } while (cursor > 0 && allDayIndices.Count < 10000); // Group hourly buckets by destination and day to compute proper min/max var dailyStatsByDest = new Dictionary<(string dest, DateTime day), (long totalCount, double totalMs, double minMs, double maxMs)>(); foreach (var daysIndexKey in allDayIndices) { var keyStr = daysIndexKey.ToString(); var parts = keyStr.Split(':'); if (parts.Length < 5) continue; var dest = parts[4]; var dayPart = string.Join(":", parts.Skip(5).TakeWhile(p => DateTime.TryParseExact(p, "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out _))); if (!DateTime.TryParseExact(dayPart, "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dayFromKey)) continue; if (dayFromKey.Date > cutoff) continue; var allBuckets = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1); foreach (var bucketKey in allBuckets) { if (!bucketKey.HasValue) continue; var hashData = await _db.HashGetAllAsync(bucketKey); if (hashData.Length == 0) continue; var countVal = hashData.FirstOrDefault(f => f.Name.ToString() == "count"); var totalMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "totalMs"); var maxMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "maxMs"); var minMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "minMs"); if (countVal.Value.IsNullOrEmpty || totalMsVal.Value.IsNullOrEmpty) continue; var count = long.TryParse(countVal.Value.ToString(), out var countParsed) ? countParsed : 0; var totalMs = double.TryParse(totalMsVal.Value.ToString(), out var totalMsParsed) ? totalMsParsed : 0; var maxMs = double.TryParse(maxMsVal.Value.ToString(), out var maxMsParsed) ? maxMsParsed : 0; var minMs = double.TryParse(minMsVal.Value.ToString(), out var minMsParsed) ? minMsParsed : 0; if (count == 0) continue; var key = (dest, dayFromKey.Date); if (!dailyStatsByDest.ContainsKey(key)) { dailyStatsByDest[key] = (0, 0, double.MaxValue, double.MinValue); } var current = dailyStatsByDest[key]; dailyStatsByDest[key] = ( totalCount: current.totalCount + count, totalMs: current.totalMs + totalMs, minMs: Math.Min(current.minMs, minMs), maxMs: Math.Max(current.maxMs, maxMs) ); } } var aggrRecordsToInsert = new List(); foreach (var kvp in dailyStatsByDest) { var key = kvp.Key; var stats = kvp.Value; if (stats.totalCount == 0) continue; var avgDuration = stats.totalMs / stats.totalCount; // Handle edge cases where no valid min/max found var finalMin = stats.minMs == double.MaxValue ? 0 : stats.minMs; var finalMax = stats.maxMs == double.MinValue ? 0 : stats.maxMs; aggrRecordsToInsert.Add(new StatsAggregatedModel { Destination = key.dest, Hour = key.day, RequestCount = stats.totalCount, AvgDuration = avgDuration, MinDuration = finalMin, MaxDuration = finalMax, NoReply = 0 }); } if (aggrRecordsToInsert.Count > 0) { await using var scope = _scopeFactory.CreateAsyncScope(); var aggrService = scope.ServiceProvider.GetRequiredService(); await aggrService.UpsertManyAsync(aggrRecordsToInsert, false); Log.Info("[HISTORICAL] Recovered {count} daily aggregated records from Redis", aggrRecordsToInsert.Count); var sampleCount = Math.Min(5, aggrRecordsToInsert.Count); for (int i = 0; i < sampleCount; i++) { var r = aggrRecordsToInsert[i]; Log.Info("[HISTORICAL SAMPLE] Env={env}, Date={date}, Count={count}, Avg={avg:F2}ms", r.Destination, r.Hour.ToString("yyyy-MM-dd"), r.RequestCount, r.AvgDuration); } if (deleteConfirmed) { await DeleteExpiredDailyKeysAsync(allDayIndices); } } } catch (Exception ex) { Log.Error(ex, "Error recovering historical daily data from Redis"); } } private async Task DeleteExpiredHourlyKeysAsync(List allHourIndices) { try { var keysToDelete = new List(); foreach (var hoursIndexKey in allHourIndices) { var allBuckets = await _db.SortedSetRangeByRankAsync(hoursIndexKey, 0, -1); keysToDelete.AddRange(allBuckets.Select(k => k.ToString()).Where(s => !string.IsNullOrEmpty(s))); } if (keysToDelete.Count > 0) { var uniqueKeys = keysToDelete.Distinct().ToList(); var deletedCount = await DeleteKeysAsync(uniqueKeys); Log.Info("[DELETE] Removed {count} expired hourly bucket keys from Redis", deletedCount); } } catch (Exception ex) { Log.Error(ex, "Error deleting expired hourly keys from Redis"); } } private async Task DeleteExpiredDailyKeysAsync(List allDayIndices) { try { var keysToDelete = new List(); foreach (var daysIndexKey in allDayIndices) { var allBuckets = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1); keysToDelete.AddRange(allBuckets.Select(k => k.ToString()).Where(s => !string.IsNullOrEmpty(s))); } if (keysToDelete.Count > 0) { var uniqueKeys = keysToDelete.Distinct().ToList(); var deletedCount = await DeleteKeysAsync(uniqueKeys); Log.Info("[DELETE] Removed {count} expired daily bucket keys from Redis", deletedCount); } } catch (Exception ex) { Log.Error(ex, "Error deleting expired daily keys from Redis"); } } public async Task FlushLiveMetricsAsync() { try { var utcNow = DateTime.Now; var hourStart = new DateTime(utcNow.Year, utcNow.Month, utcNow.Day, utcNow.Hour, 0, 0); await FlushCurrentHourlyDataAsync(utcNow, hourStart); } catch (Exception ex) { Log.Error(ex, "Error flushing live metrics from Redis"); } } private async Task FlushCurrentHourlyDataAsync(DateTime utcNow, DateTime hourStart) { try { var baseKey = _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; var cursor = 0L; var pattern = $"{baseKey}:stats:hours:*:{hourStart.ToString("yyyyMMddHH")}"; var matchingKeys = new List(); do { var result = await _db.KeyScanAsync(pattern, cursor); matchingKeys.AddRange(result.Keys.Where(k => k != null)); cursor = result.Cursor; } while (cursor > 0 && matchingKeys.Count < 10000); if (matchingKeys.Count == 0) { Log.Debug("No current hourly buckets found in Redis for flush"); return; } var detailRecords = new List(); foreach (var bucketKey in matchingKeys) { var keyStr = bucketKey.ToString(); var parts = keyStr.Split(':'); if (parts.Length < 7) continue; var dest = parts[4]; var method = parts[5]; try { var hashData = await _db.HashGetAllAsync(bucketKey); if (hashData.Length == 0) continue; var countVal = hashData.FirstOrDefault(f => f.Name.ToString() == "count"); var totalMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "totalMs"); var maxMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "maxMs"); var minMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "minMs"); if (countVal.Value.IsNullOrEmpty || totalMsVal.Value.IsNullOrEmpty) continue; var count = long.TryParse(countVal.Value.ToString(), out var countParsed) ? countParsed : 0; var totalMs = double.TryParse(totalMsVal.Value.ToString(), out var totalMsParsed) ? totalMsParsed : 0; var maxMs = double.TryParse(maxMsVal.Value.ToString(), out var maxMsParsed) ? maxMsParsed : 0; var minMs = double.TryParse(minMsVal.Value.ToString(), out var minMsParsed) ? minMsParsed : 0; if (count == 0) continue; var avgDuration = totalMs / count; detailRecords.Add(new StatsDetailModel { Destination = dest, Type = method, Hour = hourStart, RequestCount = count, AvgDuration = avgDuration, MinDuration = minMs, MaxDuration = maxMs, NoReply = 0 }); } catch (Exception ex) { Log.Error(ex, "Error processing hourly bucket key {key}", keyStr); } } if (detailRecords.Count > 0) { await using var scope = _scopeFactory.CreateAsyncScope(); var detailService = scope.ServiceProvider.GetRequiredService(); await detailService.UpsertManyAsync(detailRecords, false); Log.Info("[LIVE] Flushed {count} current hourly detail records from Redis", detailRecords.Count); var sampleCount = Math.Min(10, detailRecords.Count); for (int i = 0; i < sampleCount; i++) { var r = detailRecords[i]; Log.Info("[LIVE SAMPLE] Env={env}, Type={type}, Hour={hour}, Count={count}, Avg={avg:F2}ms, Min={min:F2}ms, Max={max:F2}ms", r.Destination, r.Type, r.Hour.ToString("yyyy-MM-dd HH:mm"), r.RequestCount, r.AvgDuration, r.MinDuration, r.MaxDuration); } await DeleteCurrentHourlyBucketKeysAsync(matchingKeys); } } catch (Exception ex) { Log.Error(ex, "Error flushing current hourly data from Redis"); } } private async Task FlushCurrentDailyDataAsync(DateTime utcNow, DateTime dayStart) { try { var baseKey = _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; var cursor = 0L; var pattern = $"{baseKey}:stats:days:*:{dayStart.ToString("yyyyMMdd")}"; var matchingKeys = new List(); do { var result = await _db.KeyScanAsync(pattern, cursor); matchingKeys.AddRange(result.Keys.Where(k => k != null)); cursor = result.Cursor; } while (cursor > 0 && matchingKeys.Count < 10000); if (matchingKeys.Count == 0) { Log.Debug("No current daily buckets found in Redis for flush"); return; } var aggrRecords = new List(); foreach (var bucketKey in matchingKeys) { var keyStr = bucketKey.ToString(); var parts = keyStr.Split(':'); if (parts.Length < 6) continue; var dest = parts[4]; try { var hashData = await _db.HashGetAllAsync(bucketKey); if (hashData.Length == 0) continue; var countVal = hashData.FirstOrDefault(f => f.Name.ToString() == "count"); var totalMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "totalMs"); var maxMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "maxMs"); var minMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "minMs"); if (countVal.Value.IsNullOrEmpty || totalMsVal.Value.IsNullOrEmpty) continue; var count = long.TryParse(countVal.Value.ToString(), out var countParsed) ? countParsed : 0; var totalMs = double.TryParse(totalMsVal.Value.ToString(), out var totalMsParsed) ? totalMsParsed : 0; var maxMs = double.TryParse(maxMsVal.Value.ToString(), out var maxMsParsed) ? maxMsParsed : 0; var minMs = double.TryParse(minMsVal.Value.ToString(), out var minMsParsed) ? minMsParsed : 0; if (count == 0) continue; var avgDuration = totalMs / count; aggrRecords.Add(new StatsAggregatedModel { Destination = dest, Hour = dayStart.Date, RequestCount = count, AvgDuration = avgDuration, MinDuration = minMs, MaxDuration = maxMs, NoReply = 0 }); } catch (Exception ex) { Log.Error(ex, "Error processing daily bucket key {key}", keyStr); } } if (aggrRecords.Count > 0) { await using var scope = _scopeFactory.CreateAsyncScope(); var aggrService = scope.ServiceProvider.GetRequiredService(); await aggrService.UpsertManyAsync(aggrRecords, false); Log.Info("[LIVE] Flushed {count} current daily aggregated records from Redis", aggrRecords.Count); var sampleCount = Math.Min(10, aggrRecords.Count); for (int i = 0; i < sampleCount; i++) { var r = aggrRecords[i]; Log.Info("[LIVE SAMPLE] Env={env}, Date={date}, Count={count}, Avg={avg:F2}ms", r.Destination, r.Hour.ToString("yyyy-MM-dd"), r.RequestCount, r.AvgDuration); } await DeleteCurrentDailyBucketKeysAsync(matchingKeys); } } catch (Exception ex) { Log.Error(ex, "Error flushing current daily data from Redis"); } } private async Task CleanExpiredRedisKeysAsync(DateTime hourStart, DateTime dayStart) { try { var lastHour = hourStart.AddHours(-1); var yesterday = dayStart.AddDays(-1); var deleteConfirmed = _config.GetValue("RouteMan:DeleteExpiredMetrics", false); if (!deleteConfirmed) { Log.Debug("Auto-delete of expired Redis keys disabled, skipping cleanup"); return; } var previewedKeys = new List(); var deletedCount = 0; var baseKey = _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; // Get all unique methods from current snapshot and Redis indices var methods = await GetAllMethodsAsync(); foreach (var method in methods) { var hoursIndexKey = $"{baseKey}:stats:hours:{method}"; var daysIndexKey = $"{baseKey}:stats:days:{method}"; if (!_db.KeyExists(hoursIndexKey) && !_db.KeyExists(daysIndexKey)) continue; // Preview expired hourly keys if (_db.KeyExists(hoursIndexKey)) { var expiredHourKeys = await GetExpiredHourKeysAsync(method, lastHour); previewedKeys.AddRange(expiredHourKeys); Log.Info("[PREVIEW] Would delete {count} expired hourly buckets for method {method}", expiredHourKeys.Count, method); } // Preview expired daily keys if (_db.KeyExists(daysIndexKey)) { var expiredDayKeys = await GetExpiredDayKeysAsync(method, yesterday); previewedKeys.AddRange(expiredDayKeys); Log.Info("[PREVIEW] Would delete {count} expired daily buckets for method {method}", expiredDayKeys.Count, method); } } // Delete keys if any found if (previewedKeys.Count > 0) { var uniqueKeys = previewedKeys.Distinct().ToList(); deletedCount = await DeleteKeysAsync(uniqueKeys); Log.Info("Deleted {count} expired Redis keys", deletedCount); } else { Log.Debug("No expired Redis keys to delete"); } } catch (Exception ex) { Log.Error(ex, "Error cleaning expired Redis keys"); } } private async Task> GetAllMethodsAsync() { var methods = new HashSet(); var snapshot = _stats.Snapshot(); foreach (var method in snapshot.Keys) { methods.Add(method); } // Try to get more methods from Redis indices if needed return methods.ToList(); } /// /// Recupera tutte le chiavi dati gli indici /// /// /// /// public async Task> GetDayIndicesAsync(string baseKey, int limit = 10000) { var allDayIndices = new List(); var pattern = $"{baseKey}:stats:days:*"; // Assicurati che il pattern sia corretto // 1. Otteniamo tutti gli endpoint (in caso di cluster o replica, potrebbero essere pių di uno) var endpoints = _mux.GetEndPoints(); foreach (var endpoint in endpoints) { // 2. Otteniamo l'oggetto Server per quell'endpoint var server = _mux.GetServer(endpoint); // 3. Usiamo KeysAsync che restituisce un IAsyncEnumerable // Questo metodo gestisce internamente il comando SCAN e il cursore di Redis! try { // L'iterazione č asincrona e molto efficiente await foreach (var key in server.KeysAsync(pattern: pattern)) { //if (key != null) //{ //} allDayIndices.Add(key); // Applichiamo il limite di sicurezza che avevi impostato if (allDayIndices.Count >= limit) break; } } catch (RedisException ex) { Log.Error(ex, $"Errore durante la scansione sull'endpoint {endpoint}"); } } return allDayIndices; } private async Task> GetExpiredHourKeysAsync(string method, DateTime cutoff) { var keys = new List(); var baseKey = _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; var hoursIndexKey = $"{baseKey}:stats:hours:{method}"; if (!_db.KeyExists(hoursIndexKey)) return keys; try { var allKeys = await _db.SortedSetRangeByRankAsync(hoursIndexKey, 0, -1); foreach (var key in allKeys) { if (!key.HasValue) continue; var parts = key.ToString().Split(':'); if (parts.Length >= 4 && DateTime.TryParseExact(parts[3], "yyyyMMddHH", CultureInfo.InvariantCulture, DateTimeStyles.None, out var hourFromKey)) { if (hourFromKey < cutoff) { keys.Add(key.ToString()); } } } } catch (Exception ex) { Log.Error(ex, "Error reading expired hourly keys for method {method}", method); } return keys; } private async Task> GetExpiredDayKeysAsync(string method, DateTime cutoff) { var keys = new List(); var baseKey = _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; var daysIndexKey = $"{baseKey}:stats:days:{method}"; if (!_db.KeyExists(daysIndexKey)) return keys; try { var allKeys = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1); foreach (var key in allKeys) { if (!key.HasValue) continue; var parts = key.ToString().Split(':'); if (parts.Length >= 4 && DateTime.TryParseExact(parts[3], "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dayFromKey)) { if (dayFromKey.Date < cutoff.Date) { keys.Add(key.ToString()); } } } } catch (Exception ex) { Log.Error(ex, "Error reading expired daily keys for method {method}", method); } return keys; } private async Task DeleteKeysAsync(List keys) { if (keys.Count == 0) return 0; var successCount = 0; var batch = _db.CreateBatch(); var tasks = new List>(); foreach (var key in keys) { tasks.Add(batch.KeyDeleteAsync(key)); } batch.Execute(); var results = await Task.WhenAll(tasks); successCount = results.Count(r => r == true); return successCount; } } }