Files
mapo-core/MP.IOC/Services/MetricsDbFlushService.cs
T
2026-04-08 11:29:45 +02:00

800 lines
33 KiB
C#

using MP.Data.DbModels.Utils;
using MP.Data.Services.Utils;
using NLog;
using StackExchange.Redis;
using System.Globalization;
namespace MP.IOC.Services
{
public class MetricsDbFlushService : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly RouteStatsManager _stats;
private readonly IConfiguration _config;
private readonly IDatabase _db;
private readonly IConnectionMultiplexer _mux;
private static readonly Logger Log = LogManager.GetCurrentClassLogger();
public MetricsDbFlushService(
RouteStatsManager stats,
IServiceScopeFactory scopeFactory,
IConfiguration config,
IConnectionMultiplexer mux)
{
_stats = stats;
_scopeFactory = scopeFactory;
_config = config;
_mux = mux;
_db = mux.GetDatabase();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var interval = _config.GetValue<int>("RouteMan:MetricFlushIntervalSeconds", 120);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken);
await FlushLiveMetricsAsync();
}
catch (TaskCanceledException)
{
break;
}
catch (Exception ex)
{
Log.Error(ex, "Error flushing metrics to database");
}
}
}
public async Task RecoverHistoricalMetricsAsync()
{
try
{
var deleteConfirmed = _config.GetValue<bool>("RouteMan:DeleteExpiredMetrics", false);
Log.Info("Starting historical metrics recovery from Redis...");
var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
var yesterday = DateTime.Now.AddDays(-1).Date;
await RecoverHistoricalDailyAsync(baseKey, yesterday, deleteConfirmed);
await RecoverHistoricalHourlyAsync(baseKey, yesterday, deleteConfirmed);
Log.Info("Historical metrics recovery completed");
}
catch (Exception ex)
{
Log.Error(ex, "Error recovering historical metrics from Redis");
}
}
private async Task RecoverHistoricalHourlyAsync(string baseKey, DateTime cutoff, bool deleteConfirmed)
{
try
{
var detailRecordsToInsert = new List<StatsDetailModel>();
var allHourIndexKeys = new List<string>();
// Get all hours index keys by pattern matching on existing indices
var dests = await GetAllDestinationsAsync(baseKey);
foreach (var dest in dests)
{
var hourIndexKey = $"{baseKey}:stats:hours:{dest}:*";
try
{
var allKeys = await _db.KeysAsync(hourIndexKey, 0, -1);
foreach (var key in allKeys)
{
if (!key.IsNullOrEmpty)
{
allHourIndexKeys.Add(key.ToString());
// Read bucket keys from sorted set index
var bucketKeys = await _db.SortedSetRangeByRankAsync(key, 0, -1);
foreach (var bucketKeyRedis in bucketKeys)
{
if (bucketKeyRedis.IsNullOrEmpty) continue;
var bucketStr = bucketKeyRedis.ToString();
var parts = bucketStr.Split(':');
if (parts.Length < 7) continue;
// Parse date from key: {base}:stats:hour:{dest}:{method}:{yyyyMMddHH}
var dayHourPart = string.Join(":", parts.Skip(6));
if (!DateTime.TryParseExact(dayHourPart, "yyyyMMddHH",
CultureInfo.InvariantCulture, DateTimeStyles.None, out var hourFromKey))
continue;
if (hourFromKey > cutoff) continue;
// Read hash data from bucket
var hashData = await _db.HashGetAllAsync(bucketKeyRedis);
if (hashData.Length == 0) continue;
var countVal = hashData.FirstOrDefault(f => f.Name.ToString() == "count");
var totalMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "totalMs");
var maxMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "maxMs");
var minMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "minMs");
if (countVal.Value.IsNullOrEmpty || totalMsVal.Value.IsNullOrEmpty) continue;
var count = long.TryParse(countVal.Value.ToString(), out var cp) ? cp : 0;
var totalMs = double.TryParse(totalMsVal.Value.ToString(), out var tp) ? tp : 0;
var maxMs = double.TryParse(maxMsVal.Value.ToString(), out var mp) ? mp : 0;
var minMs = double.TryParse(minMsVal.Value.ToString(), out var mip) ? mip : 0;
if (count == 0) continue;
var avgDuration = totalMs / count;
detailRecordsToInsert.Add(new StatsDetailModel
{
Destination = dest,
Type = parts[5],
Hour = hourFromKey.Date.AddHours(hourFromKey.Hour),
RequestCount = count,
AvgDuration = avgDuration,
MinDuration = minMs,
MaxDuration = maxMs,
NoReply = 0
});
}
}
}
}
catch (Exception ex)
{
Log.Error(ex, "Error reading hourly index for dest {dest}", dest);
}
}
if (detailRecordsToInsert.Count > 0)
{
await using var scope = _scopeFactory.CreateAsyncScope();
var detailService = scope.ServiceProvider.GetRequiredService<IStatsDetailService>();
await detailService.UpsertManyAsync(detailRecordsToInsert, false);
Log.Info("[HISTORICAL] Recovered {count} hourly detail records from Redis", detailRecordsToInsert.Count);
var sampleCount = Math.Min(5, detailRecordsToInsert.Count);
for (int i = 0; i < sampleCount; i++)
{
var r = detailRecordsToInsert[i];
Log.Info("[HISTORICAL SAMPLE] Env={env}, Type={type}, Hour={hour}, Count={count}, Avg={avg:F2}ms",
r.Destination, r.Type, r.Hour.ToString("yyyy-MM-dd HH:mm"), r.RequestCount, r.AvgDuration);
}
if (deleteConfirmed)
{
await DeleteExpiredHourlyKeysAsync(allHourIndexKeys);
}
}
}
catch (Exception ex)
{
Log.Error(ex, "Error recovering historical hourly data from Redis");
}
}
private async Task RecoverHistoricalDailyAsync(string baseKey, DateTime cutoff, bool deleteConfirmed)
{
try
{
var cursor = 0L;
var pattern = $"{baseKey}:stats:days:*";
var allDayIndices = new List<RedisKey>();
do
{
var result = await _db.KeyScanAsync(pattern, cursor);
allDayIndices.AddRange(result.Keys.Where(k => k != null));
cursor = result.Cursor;
} while (cursor > 0 && allDayIndices.Count < 10000);
// Group hourly buckets by destination and day to compute proper min/max
var dailyStatsByDest = new Dictionary<(string dest, DateTime day), (long totalCount, double totalMs, double minMs, double maxMs)>();
foreach (var daysIndexKey in allDayIndices)
{
var keyStr = daysIndexKey.ToString();
var parts = keyStr.Split(':');
if (parts.Length < 5) continue;
var dest = parts[4];
var dayPart = string.Join(":", parts.Skip(5).TakeWhile(p => DateTime.TryParseExact(p, "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out _)));
if (!DateTime.TryParseExact(dayPart, "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dayFromKey))
continue;
if (dayFromKey.Date > cutoff) continue;
var allBuckets = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1);
foreach (var bucketKey in allBuckets)
{
if (!bucketKey.HasValue) continue;
var hashData = await _db.HashGetAllAsync(bucketKey);
if (hashData.Length == 0) continue;
var countVal = hashData.FirstOrDefault(f => f.Name.ToString() == "count");
var totalMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "totalMs");
var maxMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "maxMs");
var minMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "minMs");
if (countVal.Value.IsNullOrEmpty || totalMsVal.Value.IsNullOrEmpty) continue;
var count = long.TryParse(countVal.Value.ToString(), out var countParsed) ? countParsed : 0;
var totalMs = double.TryParse(totalMsVal.Value.ToString(), out var totalMsParsed) ? totalMsParsed : 0;
var maxMs = double.TryParse(maxMsVal.Value.ToString(), out var maxMsParsed) ? maxMsParsed : 0;
var minMs = double.TryParse(minMsVal.Value.ToString(), out var minMsParsed) ? minMsParsed : 0;
if (count == 0) continue;
var key = (dest, dayFromKey.Date);
if (!dailyStatsByDest.ContainsKey(key))
{
dailyStatsByDest[key] = (0, 0, double.MaxValue, double.MinValue);
}
var current = dailyStatsByDest[key];
dailyStatsByDest[key] = (
totalCount: current.totalCount + count,
totalMs: current.totalMs + totalMs,
minMs: Math.Min(current.minMs, minMs),
maxMs: Math.Max(current.maxMs, maxMs)
);
}
}
var aggrRecordsToInsert = new List<StatsAggregatedModel>();
foreach (var kvp in dailyStatsByDest)
{
var key = kvp.Key;
var stats = kvp.Value;
if (stats.totalCount == 0) continue;
var avgDuration = stats.totalMs / stats.totalCount;
// Handle edge cases where no valid min/max found
var finalMin = stats.minMs == double.MaxValue ? 0 : stats.minMs;
var finalMax = stats.maxMs == double.MinValue ? 0 : stats.maxMs;
aggrRecordsToInsert.Add(new StatsAggregatedModel
{
Destination = key.dest,
Hour = key.day,
RequestCount = stats.totalCount,
AvgDuration = avgDuration,
MinDuration = finalMin,
MaxDuration = finalMax,
NoReply = 0
});
}
if (aggrRecordsToInsert.Count > 0)
{
await using var scope = _scopeFactory.CreateAsyncScope();
var aggrService = scope.ServiceProvider.GetRequiredService<IStatsAggrService>();
await aggrService.UpsertManyAsync(aggrRecordsToInsert, false);
Log.Info("[HISTORICAL] Recovered {count} daily aggregated records from Redis", aggrRecordsToInsert.Count);
var sampleCount = Math.Min(5, aggrRecordsToInsert.Count);
for (int i = 0; i < sampleCount; i++)
{
var r = aggrRecordsToInsert[i];
Log.Info("[HISTORICAL SAMPLE] Env={env}, Date={date}, Count={count}, Avg={avg:F2}ms",
r.Destination, r.Hour.ToString("yyyy-MM-dd"), r.RequestCount, r.AvgDuration);
}
if (deleteConfirmed)
{
await DeleteExpiredDailyKeysAsync(allDayIndices);
}
}
}
catch (Exception ex)
{
Log.Error(ex, "Error recovering historical daily data from Redis");
}
}
private async Task DeleteExpiredHourlyKeysAsync(List<RedisKey> allHourIndices)
{
try
{
var keysToDelete = new List<string>();
foreach (var hoursIndexKey in allHourIndices)
{
var allBuckets = await _db.SortedSetRangeByRankAsync(hoursIndexKey, 0, -1);
keysToDelete.AddRange(allBuckets.Select(k => k.ToString()).Where(s => !string.IsNullOrEmpty(s)));
}
if (keysToDelete.Count > 0)
{
var uniqueKeys = keysToDelete.Distinct().ToList();
var deletedCount = await DeleteKeysAsync(uniqueKeys);
Log.Info("[DELETE] Removed {count} expired hourly bucket keys from Redis", deletedCount);
}
}
catch (Exception ex)
{
Log.Error(ex, "Error deleting expired hourly keys from Redis");
}
}
private async Task DeleteExpiredDailyKeysAsync(List<RedisKey> allDayIndices)
{
try
{
var keysToDelete = new List<string>();
foreach (var daysIndexKey in allDayIndices)
{
var allBuckets = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1);
keysToDelete.AddRange(allBuckets.Select(k => k.ToString()).Where(s => !string.IsNullOrEmpty(s)));
}
if (keysToDelete.Count > 0)
{
var uniqueKeys = keysToDelete.Distinct().ToList();
var deletedCount = await DeleteKeysAsync(uniqueKeys);
Log.Info("[DELETE] Removed {count} expired daily bucket keys from Redis", deletedCount);
}
}
catch (Exception ex)
{
Log.Error(ex, "Error deleting expired daily keys from Redis");
}
}
public async Task FlushLiveMetricsAsync()
{
try
{
var utcNow = DateTime.Now;
var hourStart = new DateTime(utcNow.Year, utcNow.Month, utcNow.Day, utcNow.Hour, 0, 0);
await FlushCurrentHourlyDataAsync(utcNow, hourStart);
}
catch (Exception ex)
{
Log.Error(ex, "Error flushing live metrics from Redis");
}
}
private async Task FlushCurrentHourlyDataAsync(DateTime utcNow, DateTime hourStart)
{
try
{
var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
var cursor = 0L;
var pattern = $"{baseKey}:stats:hours:*:{hourStart.ToString("yyyyMMddHH")}";
var matchingKeys = new List<RedisKey>();
do
{
var result = await _db.KeyScanAsync(pattern, cursor);
matchingKeys.AddRange(result.Keys.Where(k => k != null));
cursor = result.Cursor;
} while (cursor > 0 && matchingKeys.Count < 10000);
if (matchingKeys.Count == 0)
{
Log.Debug("No current hourly buckets found in Redis for flush");
return;
}
var detailRecords = new List<StatsDetailModel>();
foreach (var bucketKey in matchingKeys)
{
var keyStr = bucketKey.ToString();
var parts = keyStr.Split(':');
if (parts.Length < 7) continue;
var dest = parts[4];
var method = parts[5];
try
{
var hashData = await _db.HashGetAllAsync(bucketKey);
if (hashData.Length == 0) continue;
var countVal = hashData.FirstOrDefault(f => f.Name.ToString() == "count");
var totalMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "totalMs");
var maxMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "maxMs");
var minMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "minMs");
if (countVal.Value.IsNullOrEmpty || totalMsVal.Value.IsNullOrEmpty) continue;
var count = long.TryParse(countVal.Value.ToString(), out var countParsed) ? countParsed : 0;
var totalMs = double.TryParse(totalMsVal.Value.ToString(), out var totalMsParsed) ? totalMsParsed : 0;
var maxMs = double.TryParse(maxMsVal.Value.ToString(), out var maxMsParsed) ? maxMsParsed : 0;
var minMs = double.TryParse(minMsVal.Value.ToString(), out var minMsParsed) ? minMsParsed : 0;
if (count == 0) continue;
var avgDuration = totalMs / count;
detailRecords.Add(new StatsDetailModel
{
Destination = dest,
Type = method,
Hour = hourStart,
RequestCount = count,
AvgDuration = avgDuration,
MinDuration = minMs,
MaxDuration = maxMs,
NoReply = 0
});
}
catch (Exception ex)
{
Log.Error(ex, "Error processing hourly bucket key {key}", keyStr);
}
}
if (detailRecords.Count > 0)
{
await using var scope = _scopeFactory.CreateAsyncScope();
var detailService = scope.ServiceProvider.GetRequiredService<IStatsDetailService>();
await detailService.UpsertManyAsync(detailRecords, false);
Log.Info("[LIVE] Flushed {count} current hourly detail records from Redis", detailRecords.Count);
var sampleCount = Math.Min(10, detailRecords.Count);
for (int i = 0; i < sampleCount; i++)
{
var r = detailRecords[i];
Log.Info("[LIVE SAMPLE] Env={env}, Type={type}, Hour={hour}, Count={count}, Avg={avg:F2}ms, Min={min:F2}ms, Max={max:F2}ms",
r.Destination, r.Type, r.Hour.ToString("yyyy-MM-dd HH:mm"), r.RequestCount, r.AvgDuration, r.MinDuration, r.MaxDuration);
}
await DeleteCurrentHourlyBucketKeysAsync(matchingKeys);
}
}
catch (Exception ex)
{
Log.Error(ex, "Error flushing current hourly data from Redis");
}
}
private async Task FlushCurrentDailyDataAsync(DateTime utcNow, DateTime dayStart)
{
try
{
var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
var cursor = 0L;
var pattern = $"{baseKey}:stats:days:*:{dayStart.ToString("yyyyMMdd")}";
var matchingKeys = new List<RedisKey>();
do
{
var result = await _db.KeyScanAsync(pattern, cursor);
matchingKeys.AddRange(result.Keys.Where(k => k != null));
cursor = result.Cursor;
} while (cursor > 0 && matchingKeys.Count < 10000);
if (matchingKeys.Count == 0)
{
Log.Debug("No current daily buckets found in Redis for flush");
return;
}
var aggrRecords = new List<StatsAggregatedModel>();
foreach (var bucketKey in matchingKeys)
{
var keyStr = bucketKey.ToString();
var parts = keyStr.Split(':');
if (parts.Length < 6) continue;
var dest = parts[4];
try
{
var hashData = await _db.HashGetAllAsync(bucketKey);
if (hashData.Length == 0) continue;
var countVal = hashData.FirstOrDefault(f => f.Name.ToString() == "count");
var totalMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "totalMs");
var maxMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "maxMs");
var minMsVal = hashData.FirstOrDefault(f => f.Name.ToString() == "minMs");
if (countVal.Value.IsNullOrEmpty || totalMsVal.Value.IsNullOrEmpty) continue;
var count = long.TryParse(countVal.Value.ToString(), out var countParsed) ? countParsed : 0;
var totalMs = double.TryParse(totalMsVal.Value.ToString(), out var totalMsParsed) ? totalMsParsed : 0;
var maxMs = double.TryParse(maxMsVal.Value.ToString(), out var maxMsParsed) ? maxMsParsed : 0;
var minMs = double.TryParse(minMsVal.Value.ToString(), out var minMsParsed) ? minMsParsed : 0;
if (count == 0) continue;
var avgDuration = totalMs / count;
aggrRecords.Add(new StatsAggregatedModel
{
Destination = dest,
Hour = dayStart.Date,
RequestCount = count,
AvgDuration = avgDuration,
MinDuration = minMs,
MaxDuration = maxMs,
NoReply = 0
});
}
catch (Exception ex)
{
Log.Error(ex, "Error processing daily bucket key {key}", keyStr);
}
}
if (aggrRecords.Count > 0)
{
await using var scope = _scopeFactory.CreateAsyncScope();
var aggrService = scope.ServiceProvider.GetRequiredService<IStatsAggrService>();
await aggrService.UpsertManyAsync(aggrRecords, false);
Log.Info("[LIVE] Flushed {count} current daily aggregated records from Redis", aggrRecords.Count);
var sampleCount = Math.Min(10, aggrRecords.Count);
for (int i = 0; i < sampleCount; i++)
{
var r = aggrRecords[i];
Log.Info("[LIVE SAMPLE] Env={env}, Date={date}, Count={count}, Avg={avg:F2}ms",
r.Destination, r.Hour.ToString("yyyy-MM-dd"), r.RequestCount, r.AvgDuration);
}
await DeleteCurrentDailyBucketKeysAsync(matchingKeys);
}
}
catch (Exception ex)
{
Log.Error(ex, "Error flushing current daily data from Redis");
}
}
private async Task CleanExpiredRedisKeysAsync(DateTime hourStart, DateTime dayStart)
{
try
{
var lastHour = hourStart.AddHours(-1);
var yesterday = dayStart.AddDays(-1);
var deleteConfirmed = _config.GetValue<bool>("RouteMan:DeleteExpiredMetrics", false);
if (!deleteConfirmed)
{
Log.Debug("Auto-delete of expired Redis keys disabled, skipping cleanup");
return;
}
var previewedKeys = new List<string>();
var deletedCount = 0;
var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
// Get all unique methods from current snapshot and Redis indices
var methods = await GetAllMethodsAsync();
foreach (var method in methods)
{
var hoursIndexKey = $"{baseKey}:stats:hours:{method}";
var daysIndexKey = $"{baseKey}:stats:days:{method}";
if (!_db.KeyExists(hoursIndexKey) && !_db.KeyExists(daysIndexKey)) continue;
// Preview expired hourly keys
if (_db.KeyExists(hoursIndexKey))
{
var expiredHourKeys = await GetExpiredHourKeysAsync(method, lastHour);
previewedKeys.AddRange(expiredHourKeys);
Log.Info("[PREVIEW] Would delete {count} expired hourly buckets for method {method}",
expiredHourKeys.Count, method);
}
// Preview expired daily keys
if (_db.KeyExists(daysIndexKey))
{
var expiredDayKeys = await GetExpiredDayKeysAsync(method, yesterday);
previewedKeys.AddRange(expiredDayKeys);
Log.Info("[PREVIEW] Would delete {count} expired daily buckets for method {method}",
expiredDayKeys.Count, method);
}
}
// Delete keys if any found
if (previewedKeys.Count > 0)
{
var uniqueKeys = previewedKeys.Distinct().ToList();
deletedCount = await DeleteKeysAsync(uniqueKeys);
Log.Info("Deleted {count} expired Redis keys", deletedCount);
}
else
{
Log.Debug("No expired Redis keys to delete");
}
}
catch (Exception ex)
{
Log.Error(ex, "Error cleaning expired Redis keys");
}
}
private async Task<List<string>> GetAllMethodsAsync()
{
var methods = new HashSet<string>();
var snapshot = _stats.Snapshot();
foreach (var method in snapshot.Keys)
{
methods.Add(method);
}
// Try to get more methods from Redis indices if needed
return methods.ToList();
}
/// <summary>
/// Recupera tutte le chiavi dati gli indici
/// </summary>
/// <param name="baseKey"></param>
/// <param name="limit"></param>
/// <returns></returns>
public async Task<List<RedisKey>> GetDayIndicesAsync(string baseKey, int limit = 10000)
{
var allDayIndices = new List<RedisKey>();
var pattern = $"{baseKey}:stats:days:*"; // Assicurati che il pattern sia corretto
// 1. Otteniamo tutti gli endpoint (in caso di cluster o replica, potrebbero essere pi di uno)
var endpoints = _mux.GetEndPoints();
foreach (var endpoint in endpoints)
{
// 2. Otteniamo l'oggetto Server per quell'endpoint
var server = _mux.GetServer(endpoint);
// 3. Usiamo KeysAsync che restituisce un IAsyncEnumerable<RedisKey>
// Questo metodo gestisce internamente il comando SCAN e il cursore di Redis!
try
{
// L'iterazione asincrona e molto efficiente
await foreach (var key in server.KeysAsync(pattern: pattern))
{
//if (key != null)
//{
//}
allDayIndices.Add(key);
// Applichiamo il limite di sicurezza che avevi impostato
if (allDayIndices.Count >= limit)
break;
}
}
catch (RedisException ex)
{
Log.Error(ex, $"Errore durante la scansione sull'endpoint {endpoint}");
}
}
return allDayIndices;
}
private async Task<List<string>> GetExpiredHourKeysAsync(string method, DateTime cutoff)
{
var keys = new List<string>();
var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
var hoursIndexKey = $"{baseKey}:stats:hours:{method}";
if (!_db.KeyExists(hoursIndexKey)) return keys;
try
{
var allKeys = await _db.SortedSetRangeByRankAsync(hoursIndexKey, 0, -1);
foreach (var key in allKeys)
{
if (!key.HasValue) continue;
var parts = key.ToString().Split(':');
if (parts.Length >= 4 && DateTime.TryParseExact(parts[3], "yyyyMMddHH",
CultureInfo.InvariantCulture, DateTimeStyles.None, out var hourFromKey))
{
if (hourFromKey < cutoff)
{
keys.Add(key.ToString());
}
}
}
}
catch (Exception ex)
{
Log.Error(ex, "Error reading expired hourly keys for method {method}", method);
}
return keys;
}
private async Task<List<string>> GetExpiredDayKeysAsync(string method, DateTime cutoff)
{
var keys = new List<string>();
var baseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "MP_IOC";
var daysIndexKey = $"{baseKey}:stats:days:{method}";
if (!_db.KeyExists(daysIndexKey)) return keys;
try
{
var allKeys = await _db.SortedSetRangeByRankAsync(daysIndexKey, 0, -1);
foreach (var key in allKeys)
{
if (!key.HasValue) continue;
var parts = key.ToString().Split(':');
if (parts.Length >= 4 && DateTime.TryParseExact(parts[3], "yyyyMMdd",
CultureInfo.InvariantCulture, DateTimeStyles.None, out var dayFromKey))
{
if (dayFromKey.Date < cutoff.Date)
{
keys.Add(key.ToString());
}
}
}
}
catch (Exception ex)
{
Log.Error(ex, "Error reading expired daily keys for method {method}", method);
}
return keys;
}
private async Task<int> DeleteKeysAsync(List<string> keys)
{
if (keys.Count == 0) return 0;
var successCount = 0;
var batch = _db.CreateBatch();
var tasks = new List<Task<bool>>();
foreach (var key in keys)
{
tasks.Add(batch.KeyDeleteAsync(key));
}
batch.Execute();
var results = await Task.WhenAll(tasks);
successCount = results.Count(r => r == true);
return successCount;
}
}
}