718 lines
29 KiB
C#
718 lines
29 KiB
C#
using MP.Data.DbModels.Utils;
|
|
using MP.Data.Services.Utils;
|
|
using NLog;
|
|
using StackExchange.Redis;
|
|
using System.Globalization;
|
|
|
|
namespace MP.IOC.Services
|
|
{
|
|
public class MetricsDbFlushService : BackgroundService
|
|
{
|
|
private readonly IServiceScopeFactory _scopeFactory;
|
|
private readonly RouteStatsManager _stats;
|
|
private readonly IConfiguration _config;
|
|
private readonly IDatabase _db;
|
|
private static readonly Logger Log = LogManager.GetCurrentClassLogger();
|
|
|
|
public MetricsDbFlushService(
|
|
RouteStatsManager stats,
|
|
IServiceScopeFactory scopeFactory,
|
|
IConfiguration config,
|
|
IConnectionMultiplexer mux)
|
|
{
|
|
_stats = stats;
|
|
_scopeFactory = scopeFactory;
|
|
_config = config;
|
|
_db = mux.GetDatabase();
|
|
}
|
|
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
{
|
|
var interval = _config.GetValue<int>("RouteMan:MetricFlushIntervalSeconds", 120);
|
|
|
|
while (!stoppingToken.IsCancellationRequested)
|
|
{
|
|
try
|
|
{
|
|
await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken);
|
|
await FlushMetricsAsync();
|
|
}
|
|
catch (TaskCanceledException)
|
|
{
|
|
break;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error flushing metrics to database");
|
|
}
|
|
}
|
|
}
|
|
|
|
public async Task RecoverHistoricalMetricsAsync()
|
|
{
|
|
try
|
|
{
|
|
var deleteConfirmed = _config.GetValue<bool>("RouteMan:DeleteExpiredMetrics", false);
|
|
|
|
Log.Info("Starting historical metrics recovery from Redis...");
|
|
|
|
var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
|
|
var yesterday = DateTime.Now.AddDays(-1).Date;
|
|
|
|
await RecoverHistoricalDailyAsync(baseKey, yesterday, deleteConfirmed);
|
|
await RecoverHistoricalHourlyAsync(baseKey, yesterday, deleteConfirmed);
|
|
|
|
Log.Info("Historical metrics recovery completed");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error recovering historical metrics from Redis");
|
|
}
|
|
}
|
|
|
|
private async Task RecoverHistoricalHourlyAsync(string baseKey, DateTime cutoff, bool deleteConfirmed)
|
|
{
|
|
try
|
|
{
|
|
var allHoursIndices = await _db.KeyExistsAsync($"{baseKey}:stats:hours:*");
|
|
|
|
if (!allHoursIndices)
|
|
{
|
|
Log.Debug("No hourly indices found in Redis");
|
|
return;
|
|
}
|
|
|
|
var cursor = 0L;
|
|
var pattern = $"{baseKey}:stats:hours:*";
|
|
var allHourIndices = new List<RedisKey>();
|
|
|
|
do
|
|
{
|
|
var result = await _db.KeyScanAsync(pattern, cursor);
|
|
allHourIndices.AddRange(result.Keys.Where(k => k != null));
|
|
cursor = result.Cursor;
|
|
} while (cursor > 0 && allHourIndices.Count < 10000);
|
|
|
|
var detailRecordsToInsert = new List<StatsDetailModel>();
|
|
|
|
foreach (var hoursIndexKey in allHourIndices)
|
|
{
|
|
var keyStr = hoursIndexKey.ToString();
|
|
var parts = keyStr.Split(':');
|
|
|
|
if (parts.Length < 6) continue;
|
|
|
|
var dest = parts[4];
|
|
var method = parts[5];
|
|
var dayHourPart = string.Join(":", parts.Skip(6).TakeWhile(p => DateTime.TryParseExact(p, "yyyyMMddHH", CultureInfo.InvariantCulture, DateTimeStyles.None, out _)));
|
|
|
|
if (!DateTime.TryParseExact(dayHourPart, "yyyyMMddHH", CultureInfo.InvariantCulture, DateTimeStyles.None, out var hourFromKey))
|
|
continue;
|
|
|
|
if (hourFromKey > cutoff) continue;
|
|
|
|
var allBuckets = await _db.SortedSetRangeByRankAsync(hoursIndexKey, 0, -1);
|
|
|
|
foreach (var bucketKey in allBuckets)
|
|
{
|
|
if (!bucketKey.HasValue) continue;
|
|
|
|
var hashData = await _db.HashGetAllAsync(bucketKey);
|
|
|
|
if (hashData.Length == 0) continue;
|
|
|
|
var countVal = hashData.FirstOrDefault(f => f.Name.ToString() == "count");
|
|
var totalMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "totalMs");
|
|
var maxMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "maxMs");
|
|
var minMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "minMs");
|
|
|
|
if (countVal.Value.IsNullOrEmpty || totalMsVal.Value.IsNullOrEmpty) continue;
|
|
|
|
var count = long.TryParse(countVal.Value.ToString(), out var count) ? count : 0;
|
|
var totalMs = double.TryParse(totalMsVal.Value.ToString(), out var totalMs) ? totalMs : 0;
|
|
var maxMs = double.TryParse(maxMsVal.Value.ToString(), out var maxMs) ? maxMs : 0;
|
|
var minMs = double.TryParse(minMsVal.Value.ToString(), out var minMs) ? minMs : 0;
|
|
|
|
if (count == 0) continue;
|
|
|
|
var avgDuration = totalMs / count;
|
|
|
|
detailRecordsToInsert.Add(new StatsDetailModel
|
|
{
|
|
Destination = dest,
|
|
Type = method,
|
|
Hour = hourFromKey.Date.AddHours(hourFromKey.Hour),
|
|
RequestCount = count,
|
|
AvgDuration = avgDuration,
|
|
MinDuration = minMs,
|
|
MaxDuration = maxMs,
|
|
NoReply = 0
|
|
});
|
|
}
|
|
}
|
|
|
|
if (detailRecordsToInsert.Count > 0)
|
|
{
|
|
await using var scope = _scopeFactory.CreateAsyncScope();
|
|
var detailService = scope.ServiceProvider.GetRequiredService<IStatsDetailService>();
|
|
await detailService.UpsertManyAsync(detailRecordsToInsert, false);
|
|
|
|
Log.Info("[HISTORICAL] Recovered {count} hourly detail records from Redis", detailRecordsToInsert.Count);
|
|
|
|
var sampleCount = Math.Min(5, detailRecordsToInsert.Count);
|
|
for (int i = 0; i < sampleCount; i++)
|
|
{
|
|
var r = detailRecordsToInsert[i];
|
|
Log.Info("[HISTORICAL SAMPLE] Env={env}, Type={type}, Hour={hour}, Count={count}, Avg={avg:F2}ms",
|
|
r.Destination, r.Type, r.Hour.ToString("yyyy-MM-dd HH:mm"), r.RequestCount, r.AvgDuration);
|
|
}
|
|
|
|
if (deleteConfirmed)
|
|
{
|
|
await DeleteExpiredHourlyKeysAsync(allHourIndices);
|
|
}
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error recovering historical hourly data from Redis");
|
|
}
|
|
}
|
|
|
|
private async Task RecoverHistoricalDailyAsync(string baseKey, DateTime cutoff, bool deleteConfirmed)
|
|
{
|
|
try
|
|
{
|
|
var cursor = 0L;
|
|
var pattern = $"{baseKey}:stats:days:*";
|
|
var allDayIndices = new List<RedisKey>();
|
|
|
|
do
|
|
{
|
|
var result = await _db.KeyScanAsync(pattern, cursor);
|
|
allDayIndices.AddRange(result.Keys.Where(k => k != null));
|
|
cursor = result.Cursor;
|
|
} while (cursor > 0 && allDayIndices.Count < 10000);
|
|
|
|
var aggrRecordsToInsert = new List<StatsAggregatedModel>();
|
|
|
|
foreach (var daysIndexKey in allDayIndices)
|
|
{
|
|
var keyStr = daysIndexKey.ToString();
|
|
var parts = keyStr.Split(':');
|
|
|
|
if (parts.Length < 5) continue;
|
|
|
|
var dest = parts[4];
|
|
|
|
var dayPart = string.Join(":", parts.Skip(5).TakeWhile(p => DateTime.TryParseExact(p, "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out _)));
|
|
|
|
if (!DateTime.TryParseExact(dayPart, "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dayFromKey))
|
|
continue;
|
|
|
|
if (dayFromKey.Date > cutoff) continue;
|
|
|
|
var allBuckets = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1);
|
|
|
|
foreach (var bucketKey in allBuckets)
|
|
{
|
|
if (!bucketKey.HasValue) continue;
|
|
|
|
var hashData = await _db.HashGetAllAsync(bucketKey);
|
|
|
|
if (hashData.Length == 0) continue;
|
|
|
|
var countVal = hashData.FirstOrDefault(f => f.Name.ToString() == "count");
|
|
var totalMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "totalMs");
|
|
var maxMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "maxMs");
|
|
var minMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "minMs");
|
|
|
|
if (countVal.Value.IsNullOrEmpty || totalMsVal.Value.IsNullOrEmpty) continue;
|
|
|
|
var count = long.TryParse(countVal.Value.ToString(), out var countValParsed) ? countValParsed : 0;
|
|
var totalMs = double.TryParse(totalMsVal.Value.ToString(), out var totalMsValParsed) ? totalMsValParsed : 0;
|
|
var maxMs = double.TryParse(maxMsVal.Value.ToString(), out var maxMsValParsed) ? maxMsValParsed : 0;
|
|
var minMs = double.TryParse(minMsVal.Value.ToString(), out var minMsValParsed) ? minMsValParsed : 0;
|
|
|
|
if (count == 0) continue;
|
|
|
|
var avgDuration = totalMs / count;
|
|
|
|
aggrRecordsToInsert.Add(new StatsAggregatedModel
|
|
{
|
|
Destination = dest,
|
|
Hour = dayFromKey.Date,
|
|
RequestCount = count,
|
|
AvgDuration = avgDuration,
|
|
MinDuration = minMs,
|
|
MaxDuration = maxMs,
|
|
NoReply = 0
|
|
});
|
|
}
|
|
}
|
|
|
|
if (aggrRecordsToInsert.Count > 0)
|
|
{
|
|
await using var scope = _scopeFactory.CreateAsyncScope();
|
|
var aggrService = scope.ServiceProvider.GetRequiredService<IStatsAggrService>();
|
|
await aggrService.UpsertManyAsync(aggrRecordsToInsert, false);
|
|
|
|
Log.Info("[HISTORICAL] Recovered {count} daily aggregated records from Redis", aggrRecordsToInsert.Count);
|
|
|
|
var sampleCount = Math.Min(5, aggrRecordsToInsert.Count);
|
|
for (int i = 0; i < sampleCount; i++)
|
|
{
|
|
var r = aggrRecordsToInsert[i];
|
|
Log.Info("[HISTORICAL SAMPLE] Env={env}, Date={date}, Count={count}, Avg={avg:F2}ms",
|
|
r.Destination, r.Hour.ToString("yyyy-MM-dd"), r.RequestCount, r.AvgDuration);
|
|
}
|
|
|
|
if (deleteConfirmed)
|
|
{
|
|
await DeleteExpiredDailyKeysAsync(allDayIndices);
|
|
}
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error recovering historical daily data from Redis");
|
|
}
|
|
}
|
|
|
|
private async Task DeleteExpiredHourlyKeysAsync(List<RedisKey> allHourIndices)
|
|
{
|
|
try
|
|
{
|
|
var keysToDelete = new List<string>();
|
|
|
|
foreach (var hoursIndexKey in allHourIndices)
|
|
{
|
|
var allBuckets = await _db.SortedSetRangeByRankAsync(hoursIndexKey, 0, -1);
|
|
keysToDelete.AddRange(allBuckets.Select(k => k.ToString()).Where(s => !string.IsNullOrEmpty(s)));
|
|
}
|
|
|
|
if (keysToDelete.Count > 0)
|
|
{
|
|
var uniqueKeys = keysToDelete.Distinct().ToList();
|
|
var deletedCount = await DeleteKeysAsync(uniqueKeys);
|
|
Log.Info("[DELETE] Removed {count} expired hourly bucket keys from Redis", deletedCount);
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error deleting expired hourly keys from Redis");
|
|
}
|
|
}
|
|
|
|
private async Task DeleteExpiredDailyKeysAsync(List<RedisKey> allDayIndices)
|
|
{
|
|
try
|
|
{
|
|
var keysToDelete = new List<string>();
|
|
|
|
foreach (var daysIndexKey in allDayIndices)
|
|
{
|
|
var allBuckets = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1);
|
|
keysToDelete.AddRange(allBuckets.Select(k => k.ToString()).Where(s => !string.IsNullOrEmpty(s)));
|
|
}
|
|
|
|
if (keysToDelete.Count > 0)
|
|
{
|
|
var uniqueKeys = keysToDelete.Distinct().ToList();
|
|
var deletedCount = await DeleteKeysAsync(uniqueKeys);
|
|
Log.Info("[DELETE] Removed {count} expired daily bucket keys from Redis", deletedCount);
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error deleting expired daily keys from Redis");
|
|
}
|
|
}
|
|
|
|
public async Task FlushLiveMetricsAsync()
|
|
{
|
|
try
|
|
{
|
|
var utcNow = DateTime.Now;
|
|
var hourStart = new DateTime(utcNow.Year, utcNow.Month, utcNow.Day, utcNow.Hour, 0, 0);
|
|
|
|
await FlushCurrentHourlyDataAsync(utcNow, hourStart);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error flushing live metrics from Redis");
|
|
}
|
|
}
|
|
|
|
private async Task FlushCurrentHourlyDataAsync(DateTime utcNow, DateTime hourStart)
|
|
{
|
|
try
|
|
{
|
|
var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
|
|
|
|
var cursor = 0L;
|
|
var pattern = $"{baseKey}:stats:hours:*:{hourStart.ToString("yyyyMMddHH")}";
|
|
var matchingKeys = new List<RedisKey>();
|
|
|
|
do
|
|
{
|
|
var result = await _db.KeyScanAsync(pattern, cursor);
|
|
matchingKeys.AddRange(result.Keys.Where(k => k != null));
|
|
cursor = result.Cursor;
|
|
} while (cursor > 0 && matchingKeys.Count < 10000);
|
|
|
|
if (matchingKeys.Count == 0)
|
|
{
|
|
Log.Debug("No current hourly buckets found in Redis for flush");
|
|
return;
|
|
}
|
|
|
|
var detailRecords = new List<StatsDetailModel>();
|
|
|
|
foreach (var bucketKey in matchingKeys)
|
|
{
|
|
var keyStr = bucketKey.ToString();
|
|
var parts = keyStr.Split(':');
|
|
|
|
if (parts.Length < 7) continue;
|
|
|
|
var dest = parts[4];
|
|
var method = parts[5];
|
|
|
|
try
|
|
{
|
|
var hashData = await _db.HashGetAllAsync(bucketKey);
|
|
|
|
if (hashData.Length == 0) continue;
|
|
|
|
var countVal = hashData.FirstOrDefault(f => f.Name.ToString() == "count");
|
|
var totalMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "totalMs");
|
|
var maxMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "maxMs");
|
|
var minMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "minMs");
|
|
|
|
if (countVal.Value.IsNullOrEmpty || totalMsVal.Value.IsNullOrEmpty) continue;
|
|
|
|
var count = long.TryParse(countVal.Value.ToString(), out var countParsed) ? countParsed : 0;
|
|
var totalMs = double.TryParse(totalMsVal.Value.ToString(), out var totalMsParsed) ? totalMsParsed : 0;
|
|
var maxMs = double.TryParse(maxMsVal.Value.ToString(), out var maxMsParsed) ? maxMsParsed : 0;
|
|
var minMs = double.TryParse(minMsVal.Value.ToString(), out var minMsParsed) ? minMsParsed : 0;
|
|
|
|
if (count == 0) continue;
|
|
|
|
var avgDuration = totalMs / count;
|
|
|
|
detailRecords.Add(new StatsDetailModel
|
|
{
|
|
Destination = dest,
|
|
Type = method,
|
|
Hour = hourStart,
|
|
RequestCount = count,
|
|
AvgDuration = avgDuration,
|
|
MinDuration = minMs,
|
|
MaxDuration = maxMs,
|
|
NoReply = 0
|
|
});
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error processing hourly bucket key {key}", keyStr);
|
|
}
|
|
}
|
|
|
|
if (detailRecords.Count > 0)
|
|
{
|
|
await using var scope = _scopeFactory.CreateAsyncScope();
|
|
var detailService = scope.ServiceProvider.GetRequiredService<IStatsDetailService>();
|
|
await detailService.UpsertManyAsync(detailRecords, false);
|
|
|
|
Log.Info("[LIVE] Flushed {count} current hourly detail records from Redis", detailRecords.Count);
|
|
|
|
var sampleCount = Math.Min(10, detailRecords.Count);
|
|
for (int i = 0; i < sampleCount; i++)
|
|
{
|
|
var r = detailRecords[i];
|
|
Log.Info("[LIVE SAMPLE] Env={env}, Type={type}, Hour={hour}, Count={count}, Avg={avg:F2}ms, Min={min:F2}ms, Max={max:F2}ms",
|
|
r.Destination, r.Type, r.Hour.ToString("yyyy-MM-dd HH:mm"), r.RequestCount, r.AvgDuration, r.MinDuration, r.MaxDuration);
|
|
}
|
|
|
|
await DeleteCurrentHourlyBucketKeysAsync(matchingKeys);
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error flushing current hourly data from Redis");
|
|
}
|
|
}
|
|
|
|
private async Task FlushCurrentDailyDataAsync(DateTime utcNow, DateTime dayStart)
|
|
{
|
|
try
|
|
{
|
|
var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
|
|
|
|
var cursor = 0L;
|
|
var pattern = $"{baseKey}:stats:days:*:{dayStart.ToString("yyyyMMdd")}";
|
|
var matchingKeys = new List<RedisKey>();
|
|
|
|
do
|
|
{
|
|
var result = await _db.KeyScanAsync(pattern, cursor);
|
|
matchingKeys.AddRange(result.Keys.Where(k => k != null));
|
|
cursor = result.Cursor;
|
|
} while (cursor > 0 && matchingKeys.Count < 10000);
|
|
|
|
if (matchingKeys.Count == 0)
|
|
{
|
|
Log.Debug("No current daily buckets found in Redis for flush");
|
|
return;
|
|
}
|
|
|
|
var aggrRecords = new List<StatsAggregatedModel>();
|
|
|
|
foreach (var bucketKey in matchingKeys)
|
|
{
|
|
var keyStr = bucketKey.ToString();
|
|
var parts = keyStr.Split(':');
|
|
|
|
if (parts.Length < 6) continue;
|
|
|
|
var dest = parts[4];
|
|
|
|
try
|
|
{
|
|
var hashData = await _db.HashGetAllAsync(bucketKey);
|
|
|
|
if (hashData.Length == 0) continue;
|
|
|
|
var countVal = hashData.FirstOrDefault(f => f.Name.ToString() == "count");
|
|
var totalMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "totalMs");
|
|
var maxMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "maxMs");
|
|
var minMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "minMs");
|
|
|
|
if (countVal.Value.IsNullOrEmpty || totalMsVal.Value.IsNullOrEmpty) continue;
|
|
|
|
var count = long.TryParse(countVal.Value.ToString(), out var countParsed) ? countParsed : 0;
|
|
var totalMs = double.TryParse(totalMsVal.Value.ToString(), out var totalMsParsed) ? totalMsParsed : 0;
|
|
var maxMs = double.TryParse(maxMsVal.Value.ToString(), out var maxMsParsed) ? maxMsParsed : 0;
|
|
var minMs = double.TryParse(minMsVal.Value.ToString(), out var minMsParsed) ? minMsParsed : 0;
|
|
|
|
if (count == 0) continue;
|
|
|
|
var avgDuration = totalMs / count;
|
|
|
|
aggrRecords.Add(new StatsAggregatedModel
|
|
{
|
|
Destination = dest,
|
|
Hour = dayStart.Date,
|
|
RequestCount = count,
|
|
AvgDuration = avgDuration,
|
|
MinDuration = minMs,
|
|
MaxDuration = maxMs,
|
|
NoReply = 0
|
|
});
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error processing daily bucket key {key}", keyStr);
|
|
}
|
|
}
|
|
|
|
if (aggrRecords.Count > 0)
|
|
{
|
|
await using var scope = _scopeFactory.CreateAsyncScope();
|
|
var aggrService = scope.ServiceProvider.GetRequiredService<IStatsAggrService>();
|
|
await aggrService.UpsertManyAsync(aggrRecords, false);
|
|
|
|
Log.Info("[LIVE] Flushed {count} current daily aggregated records from Redis", aggrRecords.Count);
|
|
|
|
var sampleCount = Math.Min(10, aggrRecords.Count);
|
|
for (int i = 0; i < sampleCount; i++)
|
|
{
|
|
var r = aggrRecords[i];
|
|
Log.Info("[LIVE SAMPLE] Env={env}, Date={date}, Count={count}, Avg={avg:F2}ms",
|
|
r.Destination, r.Hour.ToString("yyyy-MM-dd"), r.RequestCount, r.AvgDuration);
|
|
}
|
|
|
|
await DeleteCurrentDailyBucketKeysAsync(matchingKeys);
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error flushing current daily data from Redis");
|
|
}
|
|
}
|
|
|
|
private async Task CleanExpiredRedisKeysAsync(DateTime hourStart, DateTime dayStart)
|
|
{
|
|
try
|
|
{
|
|
var lastHour = hourStart.AddHours(-1);
|
|
var yesterday = dayStart.AddDays(-1);
|
|
var deleteConfirmed = _config.GetValue<bool>("RouteMan:DeleteExpiredMetrics", false);
|
|
|
|
if (!deleteConfirmed)
|
|
{
|
|
Log.Debug("Auto-delete of expired Redis keys disabled, skipping cleanup");
|
|
return;
|
|
}
|
|
|
|
var previewedKeys = new List<string>();
|
|
var deletedCount = 0;
|
|
|
|
var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
|
|
|
|
// Get all unique methods from current snapshot and Redis indices
|
|
var methods = await GetAllMethodsAsync();
|
|
|
|
foreach (var method in methods)
|
|
{
|
|
var hoursIndexKey = $"{baseKey}:stats:hours:{method}";
|
|
var daysIndexKey = $"{baseKey}:stats:days:{method}";
|
|
|
|
if (!_db.KeyExists(hoursIndexKey) && !_db.KeyExists(daysIndexKey)) continue;
|
|
|
|
// Preview expired hourly keys
|
|
if (_db.KeyExists(hoursIndexKey))
|
|
{
|
|
var expiredHourKeys = await GetExpiredHourKeysAsync(method, lastHour);
|
|
previewedKeys.AddRange(expiredHourKeys);
|
|
|
|
Log.Info("[PREVIEW] Would delete {count} expired hourly buckets for method {method}",
|
|
expiredHourKeys.Count, method);
|
|
}
|
|
|
|
// Preview expired daily keys
|
|
if (_db.KeyExists(daysIndexKey))
|
|
{
|
|
var expiredDayKeys = await GetExpiredDayKeysAsync(method, yesterday);
|
|
previewedKeys.AddRange(expiredDayKeys);
|
|
|
|
Log.Info("[PREVIEW] Would delete {count} expired daily buckets for method {method}",
|
|
expiredDayKeys.Count, method);
|
|
}
|
|
}
|
|
|
|
// Delete keys if any found
|
|
if (previewedKeys.Count > 0)
|
|
{
|
|
var uniqueKeys = previewedKeys.Distinct().ToList();
|
|
deletedCount = await DeleteKeysAsync(uniqueKeys);
|
|
Log.Info("Deleted {count} expired Redis keys", deletedCount);
|
|
}
|
|
else
|
|
{
|
|
Log.Debug("No expired Redis keys to delete");
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error cleaning expired Redis keys");
|
|
}
|
|
}
|
|
|
|
private async Task<List<string>> GetAllMethodsAsync()
|
|
{
|
|
var methods = new HashSet<string>();
|
|
var snapshot = _stats.Snapshot();
|
|
|
|
foreach (var method in snapshot.Keys)
|
|
{
|
|
methods.Add(method);
|
|
}
|
|
|
|
// Try to get more methods from Redis indices if needed
|
|
return methods.ToList();
|
|
}
|
|
|
|
private async Task<List<string>> GetExpiredHourKeysAsync(string method, DateTime cutoff)
|
|
{
|
|
var keys = new List<string>();
|
|
var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
|
|
var hoursIndexKey = $"{baseKey}:stats:hours:{method}";
|
|
|
|
if (!_db.KeyExists(hoursIndexKey)) return keys;
|
|
|
|
try
|
|
{
|
|
var allKeys = await _db.SortedSetRangeByRankAsync(hoursIndexKey, 0, -1);
|
|
|
|
foreach (var key in allKeys)
|
|
{
|
|
if (!key.HasValue) continue;
|
|
|
|
var parts = key.ToString().Split(':');
|
|
if (parts.Length >= 4 && DateTime.TryParseExact(parts[3], "yyyyMMddHH",
|
|
CultureInfo.InvariantCulture, DateTimeStyles.None, out var hourFromKey))
|
|
{
|
|
if (hourFromKey < cutoff)
|
|
{
|
|
keys.Add(key.ToString());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error reading expired hourly keys for method {method}", method);
|
|
}
|
|
|
|
return keys;
|
|
}
|
|
|
|
private async Task<List<string>> GetExpiredDayKeysAsync(string method, DateTime cutoff)
|
|
{
|
|
var keys = new List<string>();
|
|
var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
|
|
var daysIndexKey = $"{baseKey}:stats:days:{method}";
|
|
|
|
if (!_db.KeyExists(daysIndexKey)) return keys;
|
|
|
|
try
|
|
{
|
|
var allKeys = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1);
|
|
|
|
foreach (var key in allKeys)
|
|
{
|
|
if (!key.HasValue) continue;
|
|
|
|
var parts = key.ToString().Split(':');
|
|
if (parts.Length >= 4 && DateTime.TryParseExact(parts[3], "yyyyMMdd",
|
|
CultureInfo.InvariantCulture, DateTimeStyles.None, out var dayFromKey))
|
|
{
|
|
if (dayFromKey.Date < cutoff.Date)
|
|
{
|
|
keys.Add(key.ToString());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Error(ex, "Error reading expired daily keys for method {method}", method);
|
|
}
|
|
|
|
return keys;
|
|
}
|
|
|
|
private async Task<int> DeleteKeysAsync(List<string> keys)
|
|
{
|
|
if (keys.Count == 0) return 0;
|
|
|
|
var successCount = 0;
|
|
var batch = _db.CreateBatch();
|
|
var tasks = new List<Task<bool>>();
|
|
|
|
foreach (var key in keys)
|
|
{
|
|
tasks.Add(batch.KeyDeleteAsync(key));
|
|
}
|
|
|
|
batch.Execute();
|
|
var results = await Task.WhenAll(tasks);
|
|
|
|
successCount = results.Count(r => r == true);
|
|
return successCount;
|
|
}
|
|
}
|
|
}
|