342 lines
12 KiB
C#
342 lines
12 KiB
C#
using EgwCoreLib.Utils;
|
|
using MP.Data.DbModels.Utils;
|
|
using MP.Data.Services.Utils;
|
|
using NLog;
|
|
using StackExchange.Redis;
|
|
using System.Globalization;
|
|
|
|
namespace MP.IOC.Services
|
|
{
|
|
public class MetricsDbFlushService : BackgroundService
|
|
{
|
|
/// <summary>Class-level NLog logger used by every method of this service.</summary>
private static readonly Logger Log = LogManager.GetCurrentClassLogger();

/// <summary>
/// Default number of seconds between flushes, used when
/// "RouteMan:FlushIntervalSeconds" is absent from configuration.
/// </summary>
private const int FlushIntervalSeconds = 30;

/// <summary>In-memory route statistics that are snapshotted and cleared on each flush.</summary>
private readonly RouteStatsManager _stats;

/// <summary>Store for the per-method aggregated statistics records.</summary>
private readonly IStatsAggrService _aggrService;

/// <summary>Store for the per-destination detail statistics records.</summary>
private readonly IStatsDetailService _detailService;

/// <summary>Application configuration (flush interval, delete switch, Redis base key).</summary>
private readonly IConfiguration _config;

/// <summary>Redis database handle obtained from the connection multiplexer.</summary>
private readonly IDatabase _db;
|
|
|
|
/// <summary>
/// Creates the flush service and resolves the Redis database from the multiplexer.
/// </summary>
/// <param name="stats">In-memory route statistics to be flushed.</param>
/// <param name="aggrService">Persistence service for aggregated statistics.</param>
/// <param name="detailService">Persistence service for detail statistics.</param>
/// <param name="config">Application configuration.</param>
/// <param name="mux">Redis connection multiplexer.</param>
/// <exception cref="ArgumentNullException">Any of the dependencies is <c>null</c>.</exception>
public MetricsDbFlushService(
    RouteStatsManager stats,
    IStatsAggrService aggrService,
    IStatsDetailService detailService,
    IConfiguration config,
    IConnectionMultiplexer mux)
{
    // Fail at construction time instead of with a NullReferenceException
    // somewhere inside the background loop, where it would only be logged.
    ArgumentNullException.ThrowIfNull(stats);
    ArgumentNullException.ThrowIfNull(aggrService);
    ArgumentNullException.ThrowIfNull(detailService);
    ArgumentNullException.ThrowIfNull(config);
    ArgumentNullException.ThrowIfNull(mux);

    _stats = stats;
    _aggrService = aggrService;
    _detailService = detailService;
    _config = config;
    _db = mux.GetDatabase();
}
|
|
|
|
/// <summary>
/// Background loop: waits for the configured interval, then runs
/// <see cref="FlushMetricsAsync"/>, until <paramref name="stoppingToken"/> is cancelled.
/// </summary>
/// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
/// <remarks>
/// The interval is read once from "RouteMan:FlushIntervalSeconds". Values that
/// <see cref="Task.Delay(TimeSpan, CancellationToken)"/> cannot honour (zero, negative, or
/// too large) fall back to <see cref="FlushIntervalSeconds"/>; otherwise the delay would
/// return immediately or throw on every iteration and the catch-all below would turn the
/// loop into a busy spin.
/// </remarks>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Conservative upper bound: keeps the delay below int.MaxValue milliseconds.
    const int maxIntervalSeconds = int.MaxValue / 1000;

    var interval = _config.GetValue<int>("RouteMan:FlushIntervalSeconds", FlushIntervalSeconds);
    if (interval <= 0 || interval > maxIntervalSeconds)
    {
        Log.Warn(
            "Invalid RouteMan:FlushIntervalSeconds value {interval}; using default of {fallback} seconds",
            interval,
            FlushIntervalSeconds);
        interval = FlushIntervalSeconds;
    }

    var delay = TimeSpan.FromSeconds(interval);

    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            await Task.Delay(delay, stoppingToken);
            await FlushMetricsAsync();
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Host shutdown: leave the loop quietly. A cancellation that is NOT
            // caused by the stopping token falls through to the handler below,
            // so it is logged and the service keeps running.
            break;
        }
        catch (Exception ex)
        {
            Log.Error(ex, "Error flushing metrics to database");
        }
    }
}
|
|
|
|
/// <summary>
/// Runs one flush cycle: first the expired-data pass, then the flush of the
/// current in-memory statistics. Both passes receive the same timestamp,
/// its top-of-hour and its midnight.
/// </summary>
public async Task FlushMetricsAsync()
{
    // NOTE(review): the callees name this parameter "utcNow", but the value is
    // local time (DateTime.Now). Confirm which time zone the stored records and
    // the Redis bucket keys are expected to use before changing it.
    var now = DateTime.Now;

    // Built from components so both values carry DateTimeKind.Unspecified.
    var midnight = new DateTime(now.Year, now.Month, now.Day);
    var topOfHour = midnight.AddHours(now.Hour);

    await ProcessExpiredDataAsync(now, topOfHour, midnight);
    await FlushCurrentDataAsync(now, topOfHour, midnight);
}
|
|
|
|
/// <summary>
/// Loads the statistics stored for the last 30 days, reports which Redis
/// hourly/daily bucket keys are older than the cutoffs, optionally deletes them,
/// and hands the loaded records to the aggregated/detail processing steps.
/// </summary>
/// <param name="utcNow">Timestamp of the current flush cycle (end of the 30-day window).</param>
/// <param name="hourStart">Start of the current hour; hourly cutoff is one hour earlier.</param>
/// <param name="dayStart">Start of the current day; daily cutoff is one day earlier.</param>
/// <remarks>
/// Deletion only happens when "RouteMan:DeleteExpiredMetrics" is <c>true</c>.
/// Any exception is logged and swallowed so the caller can continue with the
/// flush of current data.
/// </remarks>
private async Task ProcessExpiredDataAsync(DateTime utcNow, DateTime hourStart, DateTime dayStart)
{
    try
    {
        var windowStart = utcNow.AddDays(-30);
        var hourCutoff = hourStart.AddHours(-1);
        var dayCutoff = dayStart.AddDays(-1);

        var aggrRecords = await _aggrService.GetFiltAsync(windowStart, utcNow);
        var detailRecords = await _detailService.GetFiltAsync(windowStart, utcNow);

        if (aggrRecords.Count == 0 && detailRecords.Count == 0)
        {
            Log.Debug("No expired data found to process");
            return;
        }

        var keysToDelete = new List<string>();
        var logMessages = new List<string>();

        foreach (var method in aggrRecords.Select(r => r.Type).Distinct())
        {
            // The lookup helpers return an empty list when the index key does
            // not exist, so no separate (blocking) existence check is done here.
            var hourKeys = await GetExpiredHourKeysAsync(method, hourCutoff);
            if (hourKeys.Count > 0)
            {
                keysToDelete.AddRange(hourKeys);
                logMessages.Add($"[PREVIEW] Would delete {hourKeys.Count} expired hourly buckets for method {method}");
            }

            var dayKeys = await GetExpiredDayKeysAsync(method, dayCutoff);
            if (dayKeys.Count > 0)
            {
                keysToDelete.AddRange(dayKeys);
                logMessages.Add($"[PREVIEW] Would delete {dayKeys.Count} expired daily buckets for method {method}");
            }
        }

        Log.Info("Expired data preview: {count} records found", aggrRecords.Count + detailRecords.Count);
        foreach (var msg in logMessages)
        {
            Log.Info(msg);
        }

        var deleteConfirmed = _config.GetValue<bool>("RouteMan:DeleteExpiredMetrics", false);
        if (deleteConfirmed && keysToDelete.Count > 0)
        {
            // De-duplicate once and report the number of keys actually sent
            // for deletion rather than the raw (possibly repeated) count.
            var distinctKeys = keysToDelete.Distinct().ToList();
            await DeleteKeysAsync(distinctKeys);
            Log.Info("Deleted {count} expired Redis keys", distinctKeys.Count);
        }

        ProcessAggregatedRecords(aggrRecords, utcNow, hourStart);
        ProcessDetailRecords(detailRecords, utcNow, hourStart);
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error processing expired data");
    }
}
|
|
|
|
/// <summary>
/// Returns the members of the hourly index sorted set for <paramref name="method"/>
/// whose embedded hour is strictly earlier than <paramref name="cutoff"/>.
/// </summary>
/// <param name="method">Method name used to build the index key.</param>
/// <param name="cutoff">Members with an hour before this instant are returned.</param>
/// <returns>
/// The matching members as strings; empty when the index key does not exist or
/// no member carries a parseable hour before the cutoff.
/// </returns>
private async Task<List<string>> GetExpiredHourKeysAsync(string method, DateTime cutoff)
{
    var keys = new List<string>();
    var hoursIndexKey = GetHoursIndexKey(method);

    // Async existence check: the synchronous KeyExists would block the calling
    // thread for a full Redis round trip inside an async method.
    if (!await _db.KeyExistsAsync(hoursIndexKey))
    {
        return keys;
    }

    var members = await _db.SortedSetRangeByRankAsync(hoursIndexKey, 0, -1);
    foreach (var member in members)
    {
        if (member.IsNull)
        {
            continue;
        }

        var key = member.ToString();
        var hourFromKey = ExtractHourFromKey(key);
        if (hourFromKey.HasValue && hourFromKey.Value < cutoff)
        {
            keys.Add(key);
        }
    }

    return keys;
}
|
|
|
|
/// <summary>
/// Returns the members of the daily index sorted set for <paramref name="method"/>
/// whose embedded date is strictly earlier than the date of <paramref name="cutoff"/>.
/// </summary>
/// <param name="method">Method name used to build the index key.</param>
/// <param name="cutoff">Only its date part is compared.</param>
/// <returns>
/// The matching members as strings; empty when the index key does not exist or
/// no member carries a parseable date before the cutoff.
/// </returns>
private async Task<List<string>> GetExpiredDayKeysAsync(string method, DateTime cutoff)
{
    var keys = new List<string>();
    var daysIndexKey = GetDaysIndexKey(method);

    // Async existence check: the synchronous KeyExists would block the calling
    // thread for a full Redis round trip inside an async method.
    if (!await _db.KeyExistsAsync(daysIndexKey))
    {
        return keys;
    }

    var cutoffDate = cutoff.Date;
    var members = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1);
    foreach (var member in members)
    {
        if (member.IsNull)
        {
            continue;
        }

        var key = member.ToString();
        var dayFromKey = ExtractDayFromKey(key);
        if (dayFromKey.HasValue && dayFromKey.Value.Date < cutoffDate)
        {
            keys.Add(key);
        }
    }

    return keys;
}
|
|
|
|
/// <summary>
/// Deletes the given Redis keys through a single batch: one delete is queued
/// per key, the batch is executed, and the method completes once every queued
/// delete has completed.
/// </summary>
/// <param name="keys">Keys to delete.</param>
private async Task DeleteKeysAsync(List<string> keys)
{
    var pipeline = _db.CreateBatch();

    // Queue the deletes first; nothing is sent until Execute() is called.
    var pending = new List<Task<bool>>();
    foreach (var key in keys)
    {
        pending.Add(pipeline.KeyDeleteAsync(key));
    }

    pipeline.Execute();
    await Task.WhenAll(pending);
}
|
|
|
|
/// <summary>
/// Reads the hour encoded in the fourth colon-separated segment of
/// <paramref name="key"/>, which must be in <c>yyyyMMddHH</c> form.
/// </summary>
/// <param name="key">Colon-separated key.</param>
/// <returns>
/// The parsed hour, or <c>null</c> when the key is null/empty, has fewer than
/// four segments, or the fourth segment is not a valid <c>yyyyMMddHH</c> value.
/// </returns>
private static DateTime? ExtractHourFromKey(string key)
{
    if (string.IsNullOrEmpty(key))
    {
        return null;
    }

    var parts = key.Split(':');

    // Invariant culture: with a null provider the current culture's calendar is
    // used, so a non-Gregorian default calendar would shift the parsed year.
    if (parts.Length >= 4
        && DateTime.TryParseExact(parts[3], "yyyyMMddHH", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dt))
    {
        return dt;
    }

    return null;
}
|
|
|
|
/// <summary>
/// Reads the date encoded in the fourth colon-separated segment of
/// <paramref name="key"/>, which must be in <c>yyyyMMdd</c> form.
/// </summary>
/// <param name="key">Colon-separated key.</param>
/// <returns>
/// The parsed date, or <c>null</c> when the key is null/empty, has fewer than
/// four segments, or the fourth segment is not a valid <c>yyyyMMdd</c> value.
/// </returns>
private static DateTime? ExtractDayFromKey(string key)
{
    if (string.IsNullOrEmpty(key))
    {
        return null;
    }

    var parts = key.Split(':');

    // Invariant culture: with a null provider the current culture's calendar is
    // used, so a non-Gregorian default calendar would shift the parsed year.
    if (parts.Length >= 4
        && DateTime.TryParseExact(parts[3], "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dt))
    {
        return dt;
    }

    return null;
}
|
|
|
|
/// <summary>
/// Converts the current in-memory statistics snapshot into one aggregated
/// record per method and one detail record per method/destination, upserts
/// both sets, and then clears the in-memory statistics.
/// </summary>
/// <param name="utcNow">Timestamp of the current flush cycle (not used by this method).</param>
/// <param name="hourStart">Start of the current hour; stored as the records' hour.</param>
/// <param name="dayStart">Start of the current day (not used by this method).</param>
/// <remarks>
/// Any exception is logged and swallowed; in that case the in-memory statistics
/// are not cleared.
/// </remarks>
private async Task FlushCurrentDataAsync(DateTime utcNow, DateTime hourStart, DateTime dayStart)
{
    try
    {
        var snapshot = _stats.Snapshot();

        if (snapshot.Count == 0)
        {
            Log.Debug("No metrics in snapshot to flush");
            return;
        }

        var aggrRecords = new List<StatsAggregatedModel>();
        var detailRecords = new List<StatsDetailModel>();

        foreach (var kv in snapshot)
        {
            var method = kv.Key;
            var stat = kv.Value;
            var count = Interlocked.Read(ref stat.Count);
            var totalMs = stat.TotalDuration.TotalMilliseconds;

            if (count == 0)
            {
                continue;
            }

            aggrRecords.Add(new StatsAggregatedModel
            {
                Hour = hourStart,
                Type = method,
                RequestCount = count,
                AvgDuration = totalMs / count,
                // NOTE(review): this stores the accumulated total, not a true
                // maximum — confirm whether the snapshot can provide one.
                MaxDuration = totalMs,
                Perc05Duration = 0,
                Perc95Duration = 0,
                MinDuration = 0,
                NoReply = 0
            });

            foreach (var dest in stat.Destinations)
            {
                // Skip destinations without requests, mirroring the method-level
                // check above: dividing by a zero count would store an
                // Infinity/NaN average.
                if (dest.Value <= 0)
                {
                    continue;
                }

                detailRecords.Add(new StatsDetailModel
                {
                    Environment = dest.Key,
                    Type = method,
                    Hour = hourStart,
                    RequestCount = dest.Value,
                    // NOTE(review): divides the method-wide total by this
                    // destination's count — confirm this is the intended average.
                    AvgDuration = totalMs / dest.Value,
                    MaxDuration = totalMs,
                    Perc05Duration = 0,
                    Perc95Duration = 0,
                    MinDuration = 0,
                    NoReply = 0
                });
            }
        }

        if (aggrRecords.Count > 0)
        {
            await _aggrService.UpsertManyAsync(aggrRecords, false);
            Log.Info("Flushed {count} current aggregated stats records", aggrRecords.Count);
        }

        if (detailRecords.Count > 0)
        {
            await _detailService.UpsertManyAsync(detailRecords, false);
            Log.Info("Flushed {count} current detail stats records", detailRecords.Count);
        }

        // NOTE(review): depending on how RouteStatsManager implements Snapshot()
        // and Clear(), anything recorded between the two calls may be dropped —
        // confirm.
        _stats.Clear();
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error processing current metrics for database flush");
    }
}
|
|
|
|
/// <summary>
/// Upserts every aggregated record whose hour is earlier than
/// <paramref name="hourStart"/>, logging how many of them belong to the
/// current day.
/// </summary>
/// <param name="records">Aggregated records to examine.</param>
/// <param name="utcNow">Timestamp of the current flush cycle; only its date is used.</param>
/// <param name="hourStart">Start of the current hour.</param>
private void ProcessAggregatedRecords(List<StatsAggregatedModel> records, DateTime utcNow, DateTime hourStart)
{
    // Each qualifying record is sent exactly once. Concatenating the "today"
    // subset with the full filtered set would submit today's records twice.
    var toUpsert = records.Where(r => r.Hour < hourStart).ToList();

    var fromToday = toUpsert.Count(r => r.Hour.Date == utcNow.Date);
    if (fromToday > 0)
    {
        Log.Info("Processing {count} hourly records from previous hour", fromToday);
    }

    if (toUpsert.Count > 0)
    {
        // Still a blocking call because this method is synchronous, but
        // GetAwaiter().GetResult() rethrows the original exception instead of
        // wrapping it in an AggregateException as Wait() does.
        _aggrService.UpsertManyAsync(toUpsert, false).GetAwaiter().GetResult();
    }
}
|
|
|
|
/// <summary>
/// Upserts every detail record whose hour is earlier than
/// <paramref name="hourStart"/>, logging how many of them belong to the
/// current day.
/// </summary>
/// <param name="records">Detail records to examine.</param>
/// <param name="utcNow">Timestamp of the current flush cycle; only its date is used.</param>
/// <param name="hourStart">Start of the current hour.</param>
private void ProcessDetailRecords(List<StatsDetailModel> records, DateTime utcNow, DateTime hourStart)
{
    // Each qualifying record is sent exactly once. Concatenating the "today"
    // subset with the full filtered set would submit today's records twice.
    var toUpsert = records.Where(r => r.Hour < hourStart).ToList();

    var fromToday = toUpsert.Count(r => r.Hour.Date == utcNow.Date);
    if (fromToday > 0)
    {
        Log.Info("Processing {count} detail records from previous hour", fromToday);
    }

    if (toUpsert.Count > 0)
    {
        // Still a blocking call because this method is synchronous, but
        // GetAwaiter().GetResult() rethrows the original exception instead of
        // wrapping it in an AggregateException as Wait() does.
        _detailService.UpsertManyAsync(toUpsert, false).GetAwaiter().GetResult();
    }
}
|
|
|
|
/// <summary>
/// Builds the Redis key of the hourly index for <paramref name="method"/>:
/// the configured "ServerConf:RedisBaseKey" (or "MP_IOC" when unset),
/// followed by ":stats:hours:" and the method name.
/// </summary>
/// <param name="method">Method name appended to the key.</param>
/// <returns>The hourly index key.</returns>
private string GetHoursIndexKey(string method)
{
    var prefix = _config.GetValue<string>("ServerConf:RedisBaseKey");
    if (prefix is null)
    {
        prefix = "MP_IOC";
    }

    return $"{prefix}:stats:hours:{method}";
}
|
|
|
|
/// <summary>
/// Builds the Redis key of the daily index for <paramref name="method"/>:
/// the configured "ServerConf:RedisBaseKey" (or "MP_IOC" when unset),
/// followed by ":stats:days:" and the method name.
/// </summary>
/// <param name="method">Method name appended to the key.</param>
/// <returns>The daily index key.</returns>
private string GetDaysIndexKey(string method)
{
    var prefix = _config.GetValue<string>("ServerConf:RedisBaseKey");
    if (prefix is null)
    {
        prefix = "MP_IOC";
    }

    return $"{prefix}:stats:days:{method}";
}
|
|
}
|
|
}
|