using MP.Data.DbModels.Utils; using MP.Data.Services.Utils; using NLog; using StackExchange.Redis; using System.Globalization; namespace MP.IOC.Services { public class MetricsDbFlushService : BackgroundService { private readonly IServiceScopeFactory _scopeFactory; private const int FlushIntervalSeconds = 30; private readonly RouteStatsManager _stats; //private readonly IStatsAggrService _aggrService; //private readonly IStatsDetailService _detailService; private readonly IConfiguration _config; private readonly IDatabase _db; private static readonly Logger Log = LogManager.GetCurrentClassLogger(); public MetricsDbFlushService( RouteStatsManager stats, IServiceScopeFactory scopeFactory, //IStatsAggrService aggrService, //IStatsDetailService detailService, IConfiguration config, IConnectionMultiplexer mux) { _stats = stats; _scopeFactory = scopeFactory; //_aggrService = aggrService; //_detailService = detailService; _config = config; _db = mux.GetDatabase(); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var interval = _config.GetValue("RouteMan:FlushIntervalSeconds", FlushIntervalSeconds); while (!stoppingToken.IsCancellationRequested) { try { await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken); await FlushMetricsAsync(); } catch (TaskCanceledException) { break; } catch (Exception ex) { Log.Error(ex, "Error flushing metrics to database"); } } } public async Task FlushMetricsAsync() { var utcNow = DateTime.Now; var hourStart = new DateTime(utcNow.Year, utcNow.Month, utcNow.Day, utcNow.Hour, 0, 0); var dayStart = new DateTime(utcNow.Year, utcNow.Month, utcNow.Day, 0, 0, 0); await FlushCurrentDataAsync(utcNow, hourStart, dayStart); await CleanExpiredRedisKeysAsync(hourStart, dayStart); } private async Task FlushCurrentDataAsync(DateTime utcNow, DateTime hourStart, DateTime dayStart) { try { var snapshot = _stats.Snapshot(); if (snapshot.Count == 0) { Log.Debug("No metrics in snapshot to flush"); return; } var aggrRecords = new List(); var detailRecords = new List(); foreach (var kv in snapshot) { var method = kv.Key; var stat = kv.Value; var count = Interlocked.Read(ref stat.Count); var totalMs = stat.TotalDuration.TotalMilliseconds; var maxMs = stat.MaxDuration.TotalMilliseconds; var minMs = (stat.MinDuration == TimeSpan.MaxValue) ? 0 : stat.MinDuration.TotalMilliseconds; if (count == 0) continue; var avgDuration = totalMs / count; var finalMaxMs = (maxMs == 0) ? avgDuration : maxMs; var finalMinMs = (minMs == 0) ? avgDuration : minMs; var aggrRecord = new StatsAggregatedModel { Hour = hourStart, Type = method, RequestCount = count, AvgDuration = avgDuration, MaxDuration = finalMaxMs, MinDuration = finalMinMs, NoReply = 0 }; aggrRecords.Add(aggrRecord); foreach (var destStat in stat.Destinations) { var detailCount = Interlocked.Read(ref destStat.Value.Count); var detailTotalMs = destStat.Value.TotalDuration.TotalMilliseconds; var detailMaxMs = destStat.Value.MaxDuration.TotalMilliseconds; var detailMinMs = (destStat.Value.MinDuration == TimeSpan.MaxValue) ? 0 : destStat.Value.MinDuration.TotalMilliseconds; if (detailCount == 0) continue; var finalDetailMaxMs = (detailMaxMs == 0) ? (detailTotalMs / detailCount) : detailMaxMs; var finalDetailMinMs = (detailMinMs == 0) ? (detailTotalMs / detailCount) : detailMinMs; var detailRecord = new StatsDetailModel { Environment = destStat.Key, Type = method, Hour = hourStart, RequestCount = detailCount, AvgDuration = detailTotalMs / detailCount, MaxDuration = finalDetailMaxMs, MinDuration = finalDetailMinMs, NoReply = 0 }; detailRecords.Add(detailRecord); } } await using var scope = _scopeFactory.CreateAsyncScope(); if (aggrRecords.Count > 0) { var _aggrService = scope.ServiceProvider.GetRequiredService(); await _aggrService.UpsertManyAsync(aggrRecords, false); Log.Info("Flushed {count} current aggregated stats records", aggrRecords.Count); var sampleCount = Math.Min(10, aggrRecords.Count); for (int i = 0; i < sampleCount; i++) { var r = aggrRecords[i]; Log.Info("[SAMPLE] INSERT Aggregated: Hour={hour}, Type={type}, Count={count}, Avg={avg:F2}ms, Min={min:F2}ms, Max={max:F2}ms", r.Hour.ToString("yyyy-MM-dd HH:mm"), r.Type, r.RequestCount, r.AvgDuration, r.MinDuration, r.MaxDuration); } } if (detailRecords.Count > 0) { var _detailService = scope.ServiceProvider.GetRequiredService(); await _detailService.UpsertManyAsync(detailRecords, false); Log.Info("Flushed {count} current detail stats records", detailRecords.Count); var sampleCount = Math.Min(10, detailRecords.Count); for (int i = 0; i < sampleCount; i++) { var r = detailRecords[i]; Log.Info("[SAMPLE] INSERT Detail: Env={env}, Type={type}, Hour={hour}, Count={count}, Avg={avg:F2}ms, Min={min:F2}ms, Max={max:F2}ms", r.Environment, r.Type, r.Hour.ToString("yyyy-MM-dd HH:mm"), r.RequestCount, r.AvgDuration, r.MinDuration, r.MaxDuration); } } _stats.Clear(); } catch (Exception ex) { Log.Error(ex, "Error processing current metrics for database flush"); } } private async Task CleanExpiredRedisKeysAsync(DateTime hourStart, DateTime dayStart) { try { var lastHour = hourStart.AddHours(-1); var yesterday = dayStart.AddDays(-1); var deleteConfirmed = _config.GetValue("RouteMan:DeleteExpiredMetrics", false); if (!deleteConfirmed) { Log.Debug("Auto-delete of expired Redis keys disabled, skipping cleanup"); return; } var previewedKeys = new List(); var deletedCount = 0; var baseKey = _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; // Get all unique methods from current snapshot and Redis indices var methods = await GetAllMethodsAsync(); foreach (var method in methods) { var hoursIndexKey = $"{baseKey}:stats:hours:{method}"; var daysIndexKey = $"{baseKey}:stats:days:{method}"; if (!_db.KeyExists(hoursIndexKey) && !_db.KeyExists(daysIndexKey)) continue; // Preview expired hourly keys if (_db.KeyExists(hoursIndexKey)) { var expiredHourKeys = await GetExpiredHourKeysAsync(method, lastHour); previewedKeys.AddRange(expiredHourKeys); Log.Info("[PREVIEW] Would delete {count} expired hourly buckets for method {method}", expiredHourKeys.Count, method); } // Preview expired daily keys if (_db.KeyExists(daysIndexKey)) { var expiredDayKeys = await GetExpiredDayKeysAsync(method, yesterday); previewedKeys.AddRange(expiredDayKeys); Log.Info("[PREVIEW] Would delete {count} expired daily buckets for method {method}", expiredDayKeys.Count, method); } } // Delete keys if any found if (previewedKeys.Count > 0) { var uniqueKeys = previewedKeys.Distinct().ToList(); deletedCount = await DeleteKeysAsync(uniqueKeys); Log.Info("Deleted {count} expired Redis keys", deletedCount); } else { Log.Debug("No expired Redis keys to delete"); } } catch (Exception ex) { Log.Error(ex, "Error cleaning expired Redis keys"); } } private async Task> GetAllMethodsAsync() { var methods = new HashSet(); var snapshot = _stats.Snapshot(); foreach (var method in snapshot.Keys) { methods.Add(method); } // Try to get more methods from Redis indices if needed return methods.ToList(); } private async Task> GetExpiredHourKeysAsync(string method, DateTime cutoff) { var keys = new List(); var baseKey = _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; var hoursIndexKey = $"{baseKey}:stats:hours:{method}"; if (!_db.KeyExists(hoursIndexKey)) return keys; try { var allKeys = await _db.SortedSetRangeByRankAsync(hoursIndexKey, 0, -1); foreach (var key in allKeys) { if (!key.HasValue) continue; var parts = key.ToString().Split(':'); if (parts.Length >= 4 && DateTime.TryParseExact(parts[3], "yyyyMMddHH", CultureInfo.InvariantCulture, DateTimeStyles.None, out var hourFromKey)) { if (hourFromKey < cutoff) { keys.Add(key.ToString()); } } } } catch (Exception ex) { Log.Error(ex, "Error reading expired hourly keys for method {method}", method); } return keys; } private async Task> GetExpiredDayKeysAsync(string method, DateTime cutoff) { var keys = new List(); var baseKey = _config.GetValue("ServerConf:RedisBaseKey") ?? "MP_IOC"; var daysIndexKey = $"{baseKey}:stats:days:{method}"; if (!_db.KeyExists(daysIndexKey)) return keys; try { var allKeys = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1); foreach (var key in allKeys) { if (!key.HasValue) continue; var parts = key.ToString().Split(':'); if (parts.Length >= 4 && DateTime.TryParseExact(parts[3], "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dayFromKey)) { if (dayFromKey.Date < cutoff.Date) { keys.Add(key.ToString()); } } } } catch (Exception ex) { Log.Error(ex, "Error reading expired daily keys for method {method}", method); } return keys; } private async Task DeleteKeysAsync(List keys) { if (keys.Count == 0) return 0; var successCount = 0; var batch = _db.CreateBatch(); var tasks = new List>(); foreach (var key in keys) { tasks.Add(batch.KeyDeleteAsync(key)); } batch.Execute(); var results = await Task.WhenAll(tasks); successCount = results.Count(r => r == true); return successCount; } } }