using MP.Data.DbModels.Utils;
using MP.Data.Services.Utils;
using NLog;

namespace MP.IOC.Services
{
    /// <summary>
    /// Background service that periodically takes a snapshot of the in-memory route
    /// statistics and upserts them into the database as aggregated (per method) and
    /// detail (per method and destination) records.
    /// </summary>
    public class MetricsDbFlushService : BackgroundService
    {
        /// <summary>
        /// Flush interval, in seconds, used when "RouteMan:FlushIntervalSeconds" is
        /// missing from configuration or is not a positive number.
        /// </summary>
        private const int FlushIntervalSeconds = 30;

        private static readonly Logger Log = LogManager.GetCurrentClassLogger();

        private readonly RouteStatsManager _stats;
        private readonly IStatsAggrService _aggrService;
        private readonly IStatsDetailService _detailService;
        private readonly IConfiguration _config;

        /// <summary>Creates the service with its collaborators.</summary>
        /// <param name="stats">In-memory statistics source that is snapshotted and cleared on each flush.</param>
        /// <param name="aggrService">Service used to upsert the aggregated records.</param>
        /// <param name="detailService">Service used to upsert the per-destination detail records.</param>
        /// <param name="config">Configuration from which "RouteMan:FlushIntervalSeconds" is read.</param>
        public MetricsDbFlushService(
            RouteStatsManager stats,
            IStatsAggrService aggrService,
            IStatsDetailService detailService,
            IConfiguration config)
        {
            _stats = stats;
            _aggrService = aggrService;
            _detailService = detailService;
            _config = config;
        }

        /// <summary>
        /// Waits for the configured interval and flushes the metrics, repeating until
        /// <paramref name="stoppingToken"/> is cancelled. A failed flush is logged and
        /// the loop continues with the next interval.
        /// </summary>
        /// <param name="stoppingToken">Token that signals the service should stop.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var interval = _config.GetValue("RouteMan:FlushIntervalSeconds", FlushIntervalSeconds);
            if (interval <= 0)
            {
                // A zero interval would flush in a hot loop; a negative one makes
                // Task.Delay throw on every iteration, flooding the error log.
                Log.Warn(
                    "Invalid RouteMan:FlushIntervalSeconds value {interval}; using default {defaultInterval}",
                    interval,
                    FlushIntervalSeconds);
                interval = FlushIntervalSeconds;
            }

            var delay = TimeSpan.FromSeconds(interval);

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(delay, stoppingToken);
                    await FlushMetricsAsync();
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Only a requested shutdown ends the loop. A cancellation raised for
                    // any other reason (e.g. a timeout inside the flush) is handled by
                    // the general handler below so the service keeps running.
                    break;
                }
                catch (Exception ex)
                {
                    Log.Error(ex, "Error flushing metrics to database");
                }
            }
        }

        /// <summary>
        /// Converts the current statistics snapshot into aggregated and detail records
        /// for the current UTC hour, upserts them, and then clears the in-memory
        /// statistics. Does nothing when the snapshot is empty.
        /// </summary>
        /// <returns>A task that completes when both upserts have finished and the statistics were cleared.</returns>
        /// <exception cref="Exception">Any failure while building or upserting the records is logged and rethrown.</exception>
        public async Task FlushMetricsAsync()
        {
            var snapshot = _stats.Snapshot();
            if (snapshot.Count == 0)
            {
                return;
            }

            try
            {
                // Bucket by UTC hour so the stored value does not depend on the
                // server's time zone or daylight-saving transitions.
                var utcNow = DateTime.UtcNow;
                var hourStart = new DateTime(
                    utcNow.Year, utcNow.Month, utcNow.Day, utcNow.Hour, 0, 0, DateTimeKind.Utc);

                var aggrRecords = new List<StatsAggregatedModel>();
                var detailRecords = new List<StatsDetailModel>();

                foreach (var kv in snapshot)
                {
                    var method = kv.Key;
                    var stat = kv.Value;

                    var count = Interlocked.Read(ref stat.Count);
                    var totalMs = stat.TotalDuration.TotalMilliseconds;

                    if (count == 0)
                    {
                        continue;
                    }

                    // NOTE(review): one aggregated record is produced per method, yet the
                    // record itself carries no method identifier - confirm this is intended.
                    // NOTE(review): MaxDuration receives the total duration and MinDuration
                    // is always 0; per-request min/max are not read here - confirm.
                    aggrRecords.Add(new StatsAggregatedModel
                    {
                        Hour = hourStart,
                        RequestCount = count,
                        AvgDuration = totalMs / count,
                        MaxDuration = totalMs,
                        MinDuration = 0,
                        NoReply = 0
                    });

                    foreach (var dest in stat.Destinations)
                    {
                        if (dest.Value == 0)
                        {
                            // Same rule as for the method-level count: nothing to report,
                            // and dividing by zero would store an infinite average.
                            continue;
                        }

                        // NOTE(review): the average divides the method-wide total duration
                        // by this destination's request count; no per-destination duration
                        // is available in this code - confirm the intended semantics.
                        detailRecords.Add(new StatsDetailModel
                        {
                            Environment = dest.Key,
                            Type = method,
                            Hour = hourStart,
                            RequestCount = dest.Value,
                            AvgDuration = totalMs / dest.Value,
                            MaxDuration = totalMs,
                            MinDuration = 0,
                            NoReply = 0
                        });
                    }
                }

                if (aggrRecords.Count > 0)
                {
                    await _aggrService.UpsertManyAsync(aggrRecords, true);
                    Log.Info("Flushed {count} aggregated stats records", aggrRecords.Count);
                }

                if (detailRecords.Count > 0)
                {
                    await _detailService.UpsertManyAsync(detailRecords, true);
                    Log.Info("Flushed {count} detail stats records", detailRecords.Count);
                }

                // NOTE(review): samples recorded between Snapshot() and this Clear() may be
                // dropped, and if the detail upsert above throws, the statistics are not
                // cleared so the aggregated records are sent again on the next flush -
                // verify against RouteStatsManager and the upsert semantics.
                _stats.Clear();
            }
            catch (Exception ex)
            {
                Log.Error(ex, "Error processing metrics for database flush");
                throw;
            }
        }
    }
}