// File: mapo-core/MP.IOC/Services/MetricsDbFlushService.cs
// Snapshot: 2026-04-07 12:34:41 +02:00 (507 lines, 20 KiB, C#)

using MP.Data.DbModels.Utils;
using MP.Data.Services.Utils;
using NLog;
using StackExchange.Redis;
using System.Globalization;
namespace MP.IOC.Services
{
public class MetricsDbFlushService : BackgroundService
{
// Factory used to open a fresh DI scope around each database write.
private readonly IServiceScopeFactory _scopeFactory;
// Fallback flush period, in seconds, used when "RouteMan:FlushIntervalSeconds" is not configured.
private const int FlushIntervalSeconds = 30;
// In-memory route statistics; snapshotted and cleared by the flush path.
private readonly RouteStatsManager _stats;
//private readonly IStatsAggrService _aggrService;
//private readonly IStatsDetailService _detailService;
// Application configuration (flush interval, Redis base key, cleanup switch).
private readonly IConfiguration _config;
// Redis database handle obtained from the injected connection multiplexer.
private readonly IDatabase _db;
// NLog logger scoped to this class.
private static readonly Logger Log = LogManager.GetCurrentClassLogger();
/// <summary>
/// Creates the flush service and stores its collaborators.
/// </summary>
/// <param name="stats">In-memory route statistics source.</param>
/// <param name="scopeFactory">Factory for the DI scopes from which the stats services are resolved on demand.</param>
/// <param name="config">Application configuration.</param>
/// <param name="mux">Redis connection multiplexer; only the database handle returned by <c>GetDatabase()</c> is kept.</param>
public MetricsDbFlushService(
    RouteStatsManager stats,
    IServiceScopeFactory scopeFactory,
    IConfiguration config,
    IConnectionMultiplexer mux)
{
    // IStatsAggrService / IStatsDetailService are not injected here; the methods
    // that need them resolve them from a scope created via scopeFactory.
    (_stats, _scopeFactory, _config, _db) = (stats, scopeFactory, config, mux.GetDatabase());
}
/// <summary>
/// Background loop: runs the historical recovery once, then waits the configured
/// interval and flushes metrics repeatedly until <paramref name="stoppingToken"/> is cancelled.
/// </summary>
/// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await RecoverHistoricalMetricsAsync();

    var interval = _config.GetValue<int>("RouteMan:FlushIntervalSeconds", FlushIntervalSeconds);
    if (interval <= 0)
    {
        // A zero delay would spin the loop and a negative one makes Task.Delay throw on
        // every iteration (swallowed below), flooding the log. Use the built-in default.
        Log.Warn("Invalid RouteMan:FlushIntervalSeconds value {interval}, falling back to {fallback}s",
            interval, FlushIntervalSeconds);
        interval = FlushIntervalSeconds;
    }

    var delay = TimeSpan.FromSeconds(interval);

    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            await Task.Delay(delay, stoppingToken);
            await FlushMetricsAsync();
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Host shutdown: leave the loop quietly. Cancellations that are NOT caused by
            // the stopping token (e.g. a timeout surfaced by a dependency) fall through to
            // the generic handler so they cannot terminate the service permanently.
            break;
        }
        catch (Exception ex)
        {
            Log.Error(ex, "Error flushing metrics to database");
        }
    }
}
/// <summary>
/// For every method returned by <c>GetAllMethodsAsync</c>, reads the hourly and daily
/// Redis index sets and upserts the buckets older than the cutoffs into the database.
/// Any exception is logged and swallowed so that the caller is never interrupted.
/// </summary>
public async Task RecoverHistoricalMetricsAsync()
{
    try
    {
        // An opt-in gate on "RouteMan:RecoverHistoricalMetrics" was sketched here in the
        // past and is currently disabled: recovery always runs.
        Log.Info("Starting historical metrics recovery from Redis...");

        var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
        var lastHour = DateTime.UtcNow.AddHours(-1);
        var yesterday = DateTime.UtcNow.AddDays(-1).Date;

        var methods = await GetAllMethodsAsync();
        foreach (var method in methods)
        {
            var hoursIndexKey = $"{baseKey}:stats:hours:{method}";
            var daysIndexKey = $"{baseKey}:stats:days:{method}";

            // Async existence checks: the synchronous KeyExists would block the calling
            // thread on a network round trip inside an async method.
            if (!await _db.KeyExistsAsync(hoursIndexKey) && !await _db.KeyExistsAsync(daysIndexKey))
            {
                continue;
            }

            await RecoverHistoricalHourlyAsync(method, hoursIndexKey, lastHour);
            await RecoverHistoricalDailyAsync(method, daysIndexKey, yesterday);
        }

        Log.Info("Historical metrics recovery completed");
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error recovering historical metrics from Redis");
    }
}
/// <summary>
/// Reads every member of the hourly index sorted set and upserts one aggregated record
/// per member whose hour (4th ':'-separated segment, format yyyyMMddHH) is not after
/// <paramref name="cutoff"/>. Durations are not available from the index and are stored as 0.
/// </summary>
/// <param name="method">Method name written to the record's <c>Type</c>.</param>
/// <param name="hoursIndexKey">Redis key of the hourly index sorted set.</param>
/// <param name="cutoff">Latest hour (inclusive) that is considered historical.</param>
private async Task RecoverHistoricalHourlyAsync(string method, string hoursIndexKey, DateTime cutoff)
{
    try
    {
        // Fetch members and scores in a single round trip instead of one
        // SortedSetScore call per member.
        var entries = await _db.SortedSetRangeByRankWithScoresAsync(hoursIndexKey, 0, -1);
        var recordsToInsert = new List<StatsAggregatedModel>();

        foreach (var entry in entries)
        {
            if (!entry.Element.HasValue)
            {
                continue;
            }

            // NOTE(review): assumes the base key itself contains exactly one ':'-free
            // segment layout so that the timestamp lands in parts[3] - confirm.
            var parts = entry.Element.ToString().Split(':');
            if (parts.Length < 4)
            {
                continue;
            }

            if (!DateTime.TryParseExact(parts[3], "yyyyMMddHH",
                    CultureInfo.InvariantCulture, DateTimeStyles.None, out var hourFromKey))
            {
                continue;
            }

            if (hourFromKey > cutoff)
            {
                continue;
            }

            recordsToInsert.Add(new StatsAggregatedModel
            {
                // The yyyyMMddHH format carries no minutes/seconds, so the parsed
                // value is already truncated to the hour.
                Hour = hourFromKey,
                Type = method,
                // NOTE(review): the sorted-set score is used as the request count - confirm
                // that the index stores a counter there and not, e.g., a timestamp.
                RequestCount = (long)entry.Score,
                AvgDuration = 0,
                MinDuration = 0,
                MaxDuration = 0,
                NoReply = 0
            });
        }

        if (recordsToInsert.Count == 0)
        {
            return;
        }

        await using var scope = _scopeFactory.CreateAsyncScope();
        var aggrService = scope.ServiceProvider.GetRequiredService<IStatsAggrService>();
        await aggrService.UpsertManyAsync(recordsToInsert, false);
        Log.Info("[HISTORICAL] Recovered {count} hourly records for method {method}", recordsToInsert.Count, method);

        // Log at most the first five records as a sample.
        var sampleCount = Math.Min(5, recordsToInsert.Count);
        for (int i = 0; i < sampleCount; i++)
        {
            var r = recordsToInsert[i];
            Log.Info("[HISTORICAL SAMPLE] Hour={hour}, Type={type}, Count={count}",
                r.Hour.ToString("yyyy-MM-dd HH:mm"), r.Type, r.RequestCount);
        }
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error recovering historical hourly data for method {method}", method);
    }
}
/// <summary>
/// Reads every member of the daily index sorted set and upserts one detail record per
/// member whose day (4th ':'-separated segment, format yyyyMMdd) is not after
/// <paramref name="cutoff"/>. The 5th segment, when present, is used as the environment.
/// Durations are not available from the index and are stored as 0.
/// </summary>
/// <param name="method">Method name written to the record's <c>Type</c>.</param>
/// <param name="daysIndexKey">Redis key of the daily index sorted set.</param>
/// <param name="cutoff">Latest day (inclusive, date part only) that is considered historical.</param>
private async Task RecoverHistoricalDailyAsync(string method, string daysIndexKey, DateTime cutoff)
{
    try
    {
        // Fetch members and scores in a single round trip instead of one
        // SortedSetScore call per member.
        var entries = await _db.SortedSetRangeByRankWithScoresAsync(daysIndexKey, 0, -1);
        var recordsToInsert = new List<StatsDetailModel>();

        foreach (var entry in entries)
        {
            if (!entry.Element.HasValue)
            {
                continue;
            }

            var parts = entry.Element.ToString().Split(':');
            if (parts.Length < 4)
            {
                continue;
            }

            if (!DateTime.TryParseExact(parts[3], "yyyyMMdd",
                    CultureInfo.InvariantCulture, DateTimeStyles.None, out var dayFromKey))
            {
                continue;
            }

            if (dayFromKey.Date > cutoff.Date)
            {
                continue;
            }

            recordsToInsert.Add(new StatsDetailModel
            {
                Environment = parts.Length > 4 ? parts[4] : "unknown",
                Type = method,
                // Daily buckets are stored in the Hour column at midnight of that day.
                Hour = dayFromKey.Date,
                // NOTE(review): the sorted-set score is used as the request count - confirm
                // that the index stores a counter there and not, e.g., a timestamp.
                RequestCount = (long)entry.Score,
                AvgDuration = 0,
                MinDuration = 0,
                MaxDuration = 0,
                NoReply = 0
            });
        }

        if (recordsToInsert.Count == 0)
        {
            return;
        }

        await using var scope = _scopeFactory.CreateAsyncScope();
        var detailService = scope.ServiceProvider.GetRequiredService<IStatsDetailService>();
        await detailService.UpsertManyAsync(recordsToInsert, false);
        Log.Info("[HISTORICAL] Recovered {count} daily records for method {method}", recordsToInsert.Count, method);

        // Log at most the first five records as a sample.
        var sampleCount = Math.Min(5, recordsToInsert.Count);
        for (int i = 0; i < sampleCount; i++)
        {
            var r = recordsToInsert[i];
            Log.Info("[HISTORICAL SAMPLE] Env={env}, Type={type}, Date={date}, Count={count}",
                r.Environment, r.Type, r.Hour.ToString("yyyy-MM-dd"), r.RequestCount);
        }
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error recovering historical daily data for method {method}", method);
    }
}
/// <summary>
/// Performs one flush cycle: writes the current in-memory metrics to the database,
/// then removes expired Redis buckets (when enabled by configuration).
/// </summary>
public async Task FlushMetricsAsync()
{
    // Use UTC, as the variable name states and as the historical-recovery path does when
    // it computes its cutoffs; the previous DateTime.Now produced local-time buckets that
    // disagree with those cutoffs on any host whose time zone is not UTC.
    // NOTE(review): rows already stored with local-time hours will not line up with the
    // new UTC buckets - confirm whether a one-off migration is needed.
    var utcNow = DateTime.UtcNow;

    // Built from components on purpose: the resulting values keep DateTimeKind.Unspecified,
    // exactly like before, so the database layer sees the same kind of DateTime.
    var hourStart = new DateTime(utcNow.Year, utcNow.Month, utcNow.Day, utcNow.Hour, 0, 0);
    var dayStart = new DateTime(utcNow.Year, utcNow.Month, utcNow.Day, 0, 0, 0);

    await FlushCurrentDataAsync(utcNow, hourStart, dayStart);
    await CleanExpiredRedisKeysAsync(hourStart, dayStart);
}
/// <summary>
/// Converts the current statistics snapshot into one aggregated record per method and
/// one detail record per method/destination (both stamped with <paramref name="hourStart"/>),
/// upserts them, logs up to ten samples of each, and finally clears the in-memory stats.
/// Any exception is logged and swallowed; in that case the stats are not cleared.
/// </summary>
/// <param name="utcNow">Current timestamp (not used by this method).</param>
/// <param name="hourStart">Start of the current hour; written to every record.</param>
/// <param name="dayStart">Start of the current day (not used by this method).</param>
private async Task FlushCurrentDataAsync(DateTime utcNow, DateTime hourStart, DateTime dayStart)
{
    try
    {
        var current = _stats.Snapshot();
        if (current.Count == 0)
        {
            Log.Debug("No metrics in snapshot to flush");
            return;
        }

        // A min/max of exactly 0 is replaced by the average.
        static double OrAverage(double extremeMs, double averageMs)
        {
            return extremeMs == 0 ? averageMs : extremeMs;
        }

        var hourlyTotals = new List<StatsAggregatedModel>();
        var perDestination = new List<StatsDetailModel>();

        foreach (var entry in current)
        {
            var methodName = entry.Key;
            var methodStat = entry.Value;

            var requests = Interlocked.Read(ref methodStat.Count);
            var sumMs = methodStat.TotalDuration.TotalMilliseconds;
            var peakMs = methodStat.MaxDuration.TotalMilliseconds;
            // TimeSpan.MaxValue is treated as "no minimum recorded".
            var floorMs = (methodStat.MinDuration == TimeSpan.MaxValue)
                ? 0
                : methodStat.MinDuration.TotalMilliseconds;

            if (requests == 0)
            {
                continue;
            }

            var meanMs = sumMs / requests;
            hourlyTotals.Add(new StatsAggregatedModel
            {
                Hour = hourStart,
                Type = methodName,
                RequestCount = requests,
                AvgDuration = meanMs,
                MaxDuration = OrAverage(peakMs, meanMs),
                MinDuration = OrAverage(floorMs, meanMs),
                NoReply = 0
            });

            foreach (var destination in methodStat.Destinations)
            {
                var destStat = destination.Value;

                var destRequests = Interlocked.Read(ref destStat.Count);
                var destSumMs = destStat.TotalDuration.TotalMilliseconds;
                var destPeakMs = destStat.MaxDuration.TotalMilliseconds;
                var destFloorMs = (destStat.MinDuration == TimeSpan.MaxValue)
                    ? 0
                    : destStat.MinDuration.TotalMilliseconds;

                if (destRequests == 0)
                {
                    continue;
                }

                var destMeanMs = destSumMs / destRequests;
                perDestination.Add(new StatsDetailModel
                {
                    Environment = destination.Key,
                    Type = methodName,
                    Hour = hourStart,
                    RequestCount = destRequests,
                    AvgDuration = destMeanMs,
                    MaxDuration = OrAverage(destPeakMs, destMeanMs),
                    MinDuration = OrAverage(destFloorMs, destMeanMs),
                    NoReply = 0
                });
            }
        }

        // One DI scope is shared by both upserts.
        await using var scope = _scopeFactory.CreateAsyncScope();

        if (hourlyTotals.Count > 0)
        {
            var aggrService = scope.ServiceProvider.GetRequiredService<IStatsAggrService>();
            await aggrService.UpsertManyAsync(hourlyTotals, false);
            Log.Info("Flushed {count} current aggregated stats records", hourlyTotals.Count);

            foreach (var r in hourlyTotals.Take(10))
            {
                Log.Info("[SAMPLE] INSERT Aggregated: Hour={hour}, Type={type}, Count={count}, Avg={avg:F2}ms, Min={min:F2}ms, Max={max:F2}ms",
                    r.Hour.ToString("yyyy-MM-dd HH:mm"), r.Type, r.RequestCount, r.AvgDuration, r.MinDuration, r.MaxDuration);
            }
        }

        if (perDestination.Count > 0)
        {
            var detailService = scope.ServiceProvider.GetRequiredService<IStatsDetailService>();
            await detailService.UpsertManyAsync(perDestination, false);
            Log.Info("Flushed {count} current detail stats records", perDestination.Count);

            foreach (var r in perDestination.Take(10))
            {
                Log.Info("[SAMPLE] INSERT Detail: Env={env}, Type={type}, Hour={hour}, Count={count}, Avg={avg:F2}ms, Min={min:F2}ms, Max={max:F2}ms",
                    r.Environment, r.Type, r.Hour.ToString("yyyy-MM-dd HH:mm"), r.RequestCount, r.AvgDuration, r.MinDuration, r.MaxDuration);
            }
        }

        // NOTE(review): anything recorded between Snapshot() and this Clear() may be
        // discarded, depending on how RouteStatsManager implements them - confirm.
        _stats.Clear();
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error processing current metrics for database flush");
    }
}
/// <summary>
/// When "RouteMan:DeleteExpiredMetrics" is true, collects the hourly buckets older than
/// the previous hour and the daily buckets older than yesterday for every known method,
/// then deletes those Redis keys. Any exception is logged and swallowed.
/// </summary>
/// <param name="hourStart">Start of the current hour; the hourly cutoff is one hour earlier.</param>
/// <param name="dayStart">Start of the current day; the daily cutoff is one day earlier.</param>
private async Task CleanExpiredRedisKeysAsync(DateTime hourStart, DateTime dayStart)
{
    try
    {
        var deleteConfirmed = _config.GetValue<bool>("RouteMan:DeleteExpiredMetrics", false);
        if (!deleteConfirmed)
        {
            Log.Debug("Auto-delete of expired Redis keys disabled, skipping cleanup");
            return;
        }

        var lastHour = hourStart.AddHours(-1);
        var yesterday = dayStart.AddDays(-1);
        var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
        var previewedKeys = new List<string>();

        var methods = await GetAllMethodsAsync();
        foreach (var method in methods)
        {
            var hoursIndexKey = $"{baseKey}:stats:hours:{method}";
            var daysIndexKey = $"{baseKey}:stats:days:{method}";

            // One asynchronous existence check per index instead of up to four blocking
            // KeyExists calls per method.
            var hasHours = await _db.KeyExistsAsync(hoursIndexKey);
            var hasDays = await _db.KeyExistsAsync(daysIndexKey);

            if (hasHours)
            {
                var expiredHourKeys = await GetExpiredHourKeysAsync(method, lastHour);
                previewedKeys.AddRange(expiredHourKeys);
                Log.Info("[PREVIEW] Would delete {count} expired hourly buckets for method {method}",
                    expiredHourKeys.Count, method);
            }

            if (hasDays)
            {
                var expiredDayKeys = await GetExpiredDayKeysAsync(method, yesterday);
                previewedKeys.AddRange(expiredDayKeys);
                Log.Info("[PREVIEW] Would delete {count} expired daily buckets for method {method}",
                    expiredDayKeys.Count, method);
            }
        }

        if (previewedKeys.Count == 0)
        {
            Log.Debug("No expired Redis keys to delete");
            return;
        }

        // NOTE(review): only the bucket keys are deleted; the members stay in the index
        // sorted sets, so they will presumably be listed again on the next cycle - confirm
        // whether the index entries should be removed as well.
        var uniqueKeys = previewedKeys.Distinct().ToList();
        var deletedCount = await DeleteKeysAsync(uniqueKeys);
        Log.Info("Deleted {count} expired Redis keys", deletedCount);
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error cleaning expired Redis keys");
    }
}
/// <summary>
/// Returns the distinct method names present in the current statistics snapshot.
/// </summary>
/// <returns>List of method names; empty when the snapshot is empty.</returns>
private async Task<List<string>> GetAllMethodsAsync()
{
    // NOTE(review): only the in-memory snapshot is consulted; discovering additional
    // methods from the Redis indices is not implemented, so callers presumably see
    // nothing when the snapshot is empty (e.g. right after start-up) - confirm.
    var uniqueNames = new HashSet<string>(_stats.Snapshot().Keys);
    return uniqueNames.ToList();
}
/// <summary>
/// Lists the members of the hourly index sorted set for <paramref name="method"/> whose
/// hour (4th ':'-separated segment, format yyyyMMddHH) is strictly before <paramref name="cutoff"/>.
/// </summary>
/// <param name="method">Method whose hourly index is inspected.</param>
/// <param name="cutoff">Exclusive upper bound; buckets at or after it are kept.</param>
/// <returns>The expired member names; empty when the index is missing or reading it fails.</returns>
private async Task<List<string>> GetExpiredHourKeysAsync(string method, DateTime cutoff)
{
    var keys = new List<string>();
    var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
    var hoursIndexKey = $"{baseKey}:stats:hours:{method}";

    // Async check: the synchronous KeyExists would block on a network round trip.
    if (!await _db.KeyExistsAsync(hoursIndexKey))
    {
        return keys;
    }

    try
    {
        var members = await _db.SortedSetRangeByRankAsync(hoursIndexKey, 0, -1);
        foreach (var member in members)
        {
            if (!member.HasValue)
            {
                continue;
            }

            var name = member.ToString();
            var parts = name.Split(':');
            if (parts.Length >= 4
                && DateTime.TryParseExact(parts[3], "yyyyMMddHH",
                    CultureInfo.InvariantCulture, DateTimeStyles.None, out var hourFromKey)
                && hourFromKey < cutoff)
            {
                keys.Add(name);
            }
        }
    }
    catch (Exception ex)
    {
        // Best effort: return whatever was collected before the failure.
        Log.Error(ex, "Error reading expired hourly keys for method {method}", method);
    }

    return keys;
}
/// <summary>
/// Lists the members of the daily index sorted set for <paramref name="method"/> whose
/// day (4th ':'-separated segment, format yyyyMMdd) is strictly before the date of
/// <paramref name="cutoff"/>.
/// </summary>
/// <param name="method">Method whose daily index is inspected.</param>
/// <param name="cutoff">Exclusive upper bound (date part only); buckets on or after it are kept.</param>
/// <returns>The expired member names; empty when the index is missing or reading it fails.</returns>
private async Task<List<string>> GetExpiredDayKeysAsync(string method, DateTime cutoff)
{
    var keys = new List<string>();
    var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
    var daysIndexKey = $"{baseKey}:stats:days:{method}";

    // Async check: the synchronous KeyExists would block on a network round trip.
    if (!await _db.KeyExistsAsync(daysIndexKey))
    {
        return keys;
    }

    try
    {
        var members = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1);
        foreach (var member in members)
        {
            if (!member.HasValue)
            {
                continue;
            }

            var name = member.ToString();
            var parts = name.Split(':');
            if (parts.Length >= 4
                && DateTime.TryParseExact(parts[3], "yyyyMMdd",
                    CultureInfo.InvariantCulture, DateTimeStyles.None, out var dayFromKey)
                && dayFromKey.Date < cutoff.Date)
            {
                keys.Add(name);
            }
        }
    }
    catch (Exception ex)
    {
        // Best effort: return whatever was collected before the failure.
        Log.Error(ex, "Error reading expired daily keys for method {method}", method);
    }

    return keys;
}
/// <summary>
/// Deletes the given Redis keys through a single batch and reports how many
/// delete operations returned true.
/// </summary>
/// <param name="keys">Keys to delete; an empty list is a no-op.</param>
/// <returns>Number of keys whose delete returned true.</returns>
private async Task<int> DeleteKeysAsync(List<string> keys)
{
    if (keys.Count == 0)
    {
        return 0;
    }

    var pipeline = _db.CreateBatch();

    // Queue every delete before Execute(); ToList() forces the calls to be issued now.
    var pending = keys.Select(k => pipeline.KeyDeleteAsync(k)).ToList();
    pipeline.Execute();

    var outcomes = await Task.WhenAll(pending);
    return outcomes.Count(deleted => deleted);
}
}
}