using EgwCoreLib.Utils; using MP.Data.DbModels.Utils; using MP.Data.Services.Utils; using NLog; using StackExchange.Redis; using System.Globalization; namespace MP.IOC.Services { public class MetricsDbFlushService : BackgroundService { private const int FlushIntervalSeconds = 30; private readonly RouteStatsManager _stats; private readonly IStatsAggrService _aggrService; private readonly IStatsDetailService _detailService; private readonly IConfiguration _config; private readonly IDatabase _db; private static readonly Logger Log = LogManager.GetCurrentClassLogger(); public MetricsDbFlushService( RouteStatsManager stats, IStatsAggrService aggrService, IStatsDetailService detailService, IConfiguration config, IConnectionMultiplexer mux) { _stats = stats; _aggrService = aggrService; _detailService = detailService; _config = config; _db = mux.GetDatabase(); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var interval = _config.GetValue("RouteMan:FlushIntervalSeconds", FlushIntervalSeconds); while (!stoppingToken.IsCancellationRequested) { try { await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken); await FlushMetricsAsync(); } catch (TaskCanceledException) { break; } catch (Exception ex) { Log.Error(ex, "Error flushing metrics to database"); } } } public async Task FlushMetricsAsync() { var utcNow = DateTime.Now; var hourStart = new DateTime(utcNow.Year, utcNow.Month, utcNow.Day, utcNow.Hour, 0, 0); var dayStart = new DateTime(utcNow.Year, utcNow.Month, utcNow.Day, 0, 0, 0); await ProcessExpiredDataAsync(utcNow, hourStart, dayStart); await FlushCurrentDataAsync(utcNow, hourStart, dayStart); } private async Task ProcessExpiredDataAsync(DateTime utcNow, DateTime hourStart, DateTime dayStart) { try { var thirtyDaysAgo = utcNow.AddDays(-30); var lastHour = hourStart.AddHours(-1); var yesterday = dayStart.AddDays(-1); var aggRange = new DtUtils.Periodo(DtUtils.PeriodSet.Today) { Inizio = thirtyDaysAgo, Fine = utcNow }; var aggrRecords = await _aggrService.GetFiltAsync(thirtyDaysAgo, utcNow); var detailRecords = await _detailService.GetFiltAsync(thirtyDaysAgo, utcNow); if (aggrRecords.Count == 0 && detailRecords.Count == 0) { Log.Debug("No expired data found to process"); return; } var keysToDelete = new List(); var logMessages = new List(); foreach (var method in aggrRecords.Select(r => r.Type).Distinct()) { var hoursIndexKey = GetHoursIndexKey(method); var daysIndexKey = GetDaysIndexKey(method); if (_db.KeyExists(hoursIndexKey)) { var lastHourKeys = await GetExpiredHourKeysAsync(method, lastHour); if (lastHourKeys.Count > 0) { keysToDelete.AddRange(lastHourKeys); logMessages.Add($"[PREVIEW] Would delete {lastHourKeys.Count} expired hourly buckets for method {method}"); } } if (_db.KeyExists(daysIndexKey)) { var yesterdayKeys = await GetExpiredDayKeysAsync(method, yesterday); if (yesterdayKeys.Count > 0) { keysToDelete.AddRange(yesterdayKeys); logMessages.Add($"[PREVIEW] Would delete {yesterdayKeys.Count} expired daily buckets for method {method}"); } } } Log.Info("Expired data preview: {count} records found", aggrRecords.Count + detailRecords.Count); foreach (var msg in logMessages) { Log.Info(msg); } var deleteConfirmed = _config.GetValue("RouteMan:DeleteExpiredMetrics", false); if (deleteConfirmed && keysToDelete.Count > 0) { await DeleteKeysAsync(keysToDelete.Distinct().ToList()); Log.Info("Deleted {count} expired Redis keys", keysToDelete.Count); } ProcessAggregatedRecords(aggrRecords, utcNow, hourStart); ProcessDetailRecords(detailRecords, utcNow, hourStart); } catch (Exception ex) { Log.Error(ex, "Error processing expired data"); } } private async Task> GetExpiredHourKeysAsync(string method, DateTime cutoff) { var keys = new List(); var hoursIndexKey = GetHoursIndexKey(method); if (!_db.KeyExists(hoursIndexKey)) return keys; var allKeys = await _db.SortedSetRangeByRankAsync(hoursIndexKey, 0, -1); foreach (var key in allKeys) { if (!key.HasValue) continue; var hourFromKey = ExtractHourFromKey(key); if (hourFromKey.HasValue && hourFromKey.Value < cutoff) { keys.Add(key.ToString()); } } return keys; } private async Task> GetExpiredDayKeysAsync(string method, DateTime cutoff) { var keys = new List(); var daysIndexKey = GetDaysIndexKey(method); if (!_db.KeyExists(daysIndexKey)) return keys; var allKeys = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1); foreach (var key in allKeys) { if (!key.HasValue) continue; var dayFromKey = ExtractDayFromKey(key); if (dayFromKey.HasValue && dayFromKey.Value.Date < cutoff.Date) { keys.Add(key.ToString()); } } return keys; } private async Task DeleteKeysAsync(List keys) { var batch = _db.CreateBatch(); var tasks = keys.Select(k => batch.KeyDeleteAsync(k)).ToList(); batch.Execute(); await Task.WhenAll(tasks.ToArray()); } private static DateTime? ExtractHourFromKey(string key) { try { var parts = key.Split(':'); if (parts.Length >= 4 && DateTime.TryParseExact(parts[3], "yyyyMMddHH", null, DateTimeStyles.None, out var dt)) { return dt; } } catch { } return null; } private static DateTime? ExtractDayFromKey(string key) { try { var parts = key.Split(':'); if (parts.Length >= 4 && DateTime.TryParseExact(parts[3], "yyyyMMdd", null, DateTimeStyles.None, out var dt)) { return dt; } } catch { } return null; } private async Task FlushCurrentDataAsync(DateTime utcNow, DateTime hourStart, DateTime dayStart) { try { var snapshot = _stats.Snapshot(); if (snapshot.Count == 0) { Log.Debug("No metrics in snapshot to flush"); return; } var aggrRecords = new List(); var detailRecords = new List(); foreach (var kv in snapshot) { var method = kv.Key; var stat = kv.Value; var count = Interlocked.Read(ref stat.Count); var totalMs = stat.TotalDuration.TotalMilliseconds; if (count == 0) continue; var avgDuration = totalMs / count; var aggrRecord = new StatsAggregatedModel { Hour = hourStart, Type = method, RequestCount = count, AvgDuration = avgDuration, MaxDuration = totalMs, MinDuration = 0, NoReply = 0 }; aggrRecords.Add(aggrRecord); foreach (var dest in stat.Destinations) { var detailRecord = new StatsDetailModel { Environment = dest.Key, Type = method, Hour = hourStart, RequestCount = dest.Value, AvgDuration = totalMs / dest.Value, MaxDuration = totalMs, MinDuration = 0, NoReply = 0 }; detailRecords.Add(detailRecord); } } if (aggrRecords.Count > 0) { await _aggrService.UpsertManyAsync(aggrRecords, false); Log.Info("Flushed {count} current aggregated stats records", aggrRecords.Count); } if (detailRecords.Count > 0) { await _detailService.UpsertManyAsync(detailRecords, false); Log.Info("Flushed {count} current detail stats records", detailRecords.Count); } _stats.Clear(); } catch (Exception ex) { Log.Error(ex, "Error processing current metrics for database flush"); } } private void ProcessAggregatedRecords(List records, DateTime utcNow, DateTime hourStart) { var expiredToday = records.Where(r => r.Hour < hourStart && r.Hour.Date == utcNow.Date).ToList(); if (expiredToday.Count > 0) { Log.Info("Processing {count} hourly records from previous hour", expiredToday.Count); } var toUpsert = expiredToday.Concat(records.Where(r => r.Hour < hourStart)).ToList(); if (toUpsert.Count > 0) { _aggrService.UpsertManyAsync(toUpsert, false).Wait(); } } private void ProcessDetailRecords(List records, DateTime utcNow, DateTime hourStart) { var expiredToday = records.Where(r => r.Hour < hourStart && r.Hour.Date == utcNow.Date).ToList(); if (expiredToday.Count > 0) { Log.Info("Processing {count} detail records from previous hour", expiredToday.Count); } var toUpsert = expiredToday.Concat(records.Where(r => r.Hour < hourStart)).ToList(); if (toUpsert.Count > 0) { _detailService.UpsertManyAsync(toUpsert, false).Wait(); } } private string GetHoursIndexKey(string method) { var baseKey = _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; return $"{baseKey}:stats:hours:{method}"; } private string GetDaysIndexKey(string method) { var baseKey = _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; return $"{baseKey}:stats:days:{method}"; } } }