Update flush x scrittura DB Daily (da verificare...)
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
<body>
|
||||
<i>Modulo MP-IOC </i>
|
||||
<h4>Versione: 6.16.2604.813</h4>
|
||||
<h4>Versione: 6.16.2604.817</h4>
|
||||
<br /> Note di rilascio:
|
||||
<ul>
|
||||
<li>
|
||||
|
||||
@@ -1 +1 @@
|
||||
6.16.2604.813
|
||||
6.16.2604.817
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<item>
|
||||
<version>6.16.2604.813</version>
|
||||
<version>6.16.2604.817</version>
|
||||
<url>https://nexus.steamware.net/repository/SWS/MP-IOC/stable/LAST/MP.IOC.zip</url>
|
||||
<changelog>https://nexus.steamware.net/repository/SWS/MP-IOC/stable/LAST/ChangeLog.html</changelog>
|
||||
<mandatory>false</mandatory>
|
||||
|
||||
@@ -34,7 +34,8 @@ namespace MP.IOC.Services
|
||||
{
|
||||
try
|
||||
{
|
||||
await FlushLiveMetricsAsync();
|
||||
await ProcessDayLiveMetricsAsync();
|
||||
await ProcessHourLiveMetricsAsync();
|
||||
await Task.Delay(TimeSpan.FromSeconds(interval), stoppingToken);
|
||||
}
|
||||
catch (TaskCanceledException) { break; }
|
||||
@@ -66,7 +67,131 @@ namespace MP.IOC.Services
|
||||
|
||||
#region Private Methods
|
||||
|
||||
private async Task FlushLiveMetricsAsync()
|
||||
/// <summary>
|
||||
/// Processing dati giornalieri (da Redis a DB)
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
private async Task ProcessDayLiveMetricsAsync()
|
||||
{
|
||||
var aggrRecordsToInsert = new List<StatsAggregatedModel>();
|
||||
var keysToDelete = new List<RedisKey>();
|
||||
|
||||
bool deleteConfirmed = _config.GetValue<bool>("RouteMan:DeleteExpiredMetrics", false);
|
||||
DateTime now = DateTime.Now;
|
||||
|
||||
// Confini temporali per proteggere i dati in corso
|
||||
DateTime currentDayStart = new DateTime(now.Year, now.Month, now.Day, 0, 0, 0);
|
||||
|
||||
var endpoints = _mux.GetEndPoints();
|
||||
|
||||
foreach (var endpoint in endpoints)
|
||||
{
|
||||
var server = _mux.GetServer(endpoint);
|
||||
|
||||
if (server.IsReplica)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
string[] patternsToScan = {
|
||||
$"{_redisBaseKey}:stats:days:*"
|
||||
};
|
||||
|
||||
foreach (var pattern in patternsToScan)
|
||||
{
|
||||
// Nota: KeyScanAsync/KeysAsync e' disponibile su IServer
|
||||
await foreach (var indexKey in server.KeysAsync(pattern: pattern))
|
||||
{
|
||||
if (string.IsNullOrEmpty($"{indexKey}")) continue;
|
||||
|
||||
// CORREZIONE: Utilizzo di SortedSetRangeByRankAsync con range 0 a -1 per prendere tutto il set
|
||||
var memberKeys = await _db.SortedSetRangeByRankAsync(indexKey, 0, -1);
|
||||
|
||||
foreach (var statKey in memberKeys)
|
||||
{
|
||||
var sKey = (RedisKey)$"{statKey}";
|
||||
if (!TryParseKeyMetadata(sKey, out string dest, out string method, out DateTime timestamp, out bool isHourType))
|
||||
continue;
|
||||
|
||||
// Verifica se la chiave fosse scaduta rispetto all'orario corrente
|
||||
bool isExpired = isHourType
|
||||
//? timestamp < currentHourStart
|
||||
? false
|
||||
: timestamp < currentDayStart;
|
||||
|
||||
// Se fosse scaduta e abbiamo il permesso, segnamola per la cancellazione
|
||||
if (isExpired && deleteConfirmed)
|
||||
{
|
||||
keysToDelete.Add(sKey);
|
||||
}
|
||||
|
||||
// Recupero dati dalla Hash
|
||||
var hashData = await _db.HashGetAllAsync(sKey);
|
||||
if (hashData.Length == 0) continue;
|
||||
|
||||
var dict = hashData.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
|
||||
|
||||
if (dict.TryGetValue("count", out var countStr) &&
|
||||
dict.TryGetValue("totalMs", out var totalMsStr))
|
||||
{
|
||||
long count = long.Parse(countStr);
|
||||
count = long.Parse(countStr);
|
||||
double totalMs = double.Parse(totalMsStr, CultureInfo.InvariantCulture);
|
||||
|
||||
double maxMs = dict.ContainsKey("maxMs") ? double.Parse(dict["maxMs"], CultureInfo.InvariantCulture) : 0;
|
||||
double minMs = dict.ContainsKey("minMs") ? double.Parse(dict["minMs"], CultureInfo.InvariantCulture) : 0;
|
||||
|
||||
// TUA CORREZIONE: Reset sentinella per evitare valori fuori scala nel DB
|
||||
if (minMs >= SentinelValue) minMs = SentinelValue;
|
||||
if (maxMs >= SentinelValue) maxMs = SentinelValue;
|
||||
|
||||
if (count <= 0) continue;
|
||||
|
||||
aggrRecordsToInsert.Add(new StatsAggregatedModel
|
||||
{
|
||||
Destination = dest,
|
||||
Hour = timestamp,
|
||||
RequestCount = count,
|
||||
AvgDuration = totalMs / count,
|
||||
MinDuration = minMs,
|
||||
MaxDuration = maxMs,
|
||||
NoReply = 0
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- FASE UPSERT DB ---
|
||||
if (aggrRecordsToInsert.Count > 0)
|
||||
{
|
||||
await using var scope = _scopeFactory.CreateAsyncScope();
|
||||
var aggrService = scope.ServiceProvider.GetRequiredService<IStatsAggrService>();
|
||||
await aggrService.UpsertManyAsync(aggrRecordsToInsert, true);
|
||||
Log.Info($"[DAY] Upserted {aggrRecordsToInsert.Count} records to DB");
|
||||
}
|
||||
|
||||
// --- FASE PULIZIA REDIS ---
|
||||
if (deleteConfirmed && keysToDelete.Count > 0)
|
||||
{
|
||||
var batch = _db.CreateBatch();
|
||||
int deletedCount = 0;
|
||||
foreach (var key in keysToDelete)
|
||||
{
|
||||
_ = batch.KeyDeleteAsync(key);
|
||||
deletedCount++;
|
||||
}
|
||||
batch.Execute();
|
||||
Log.Info($"[CLEANUP DAY] Deleted {deletedCount} expired metric keys from Redis");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Processing dati orari (da Redis a DB)
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
private async Task ProcessHourLiveMetricsAsync()
|
||||
{
|
||||
var detailRecordsToInsert = new List<StatsDetailModel>();
|
||||
var keysToDelete = new List<RedisKey>();
|
||||
@@ -90,8 +215,7 @@ namespace MP.IOC.Services
|
||||
}
|
||||
|
||||
string[] patternsToScan = {
|
||||
$"{_redisBaseKey}:stats:hours:*",
|
||||
$"{_redisBaseKey}:stats:days:*"
|
||||
$"{_redisBaseKey}:stats:hours:*"
|
||||
};
|
||||
|
||||
foreach (var pattern in patternsToScan)
|
||||
@@ -110,12 +234,13 @@ namespace MP.IOC.Services
|
||||
if (!TryParseKeyMetadata(sKey, out string dest, out string method, out DateTime timestamp, out bool isHourType))
|
||||
continue;
|
||||
|
||||
// Verifica se la chiave � scaduta rispetto all'orario corrente
|
||||
// Verifica se la chiave fosse scaduta rispetto all'orario corrente
|
||||
bool isExpired = isHourType
|
||||
? timestamp < currentHourStart
|
||||
: timestamp < currentDayStart;
|
||||
: false;
|
||||
//: timestamp < currentDayStart;
|
||||
|
||||
// Se � scaduta e abbiamo il permesso, segnamola per la cancellazione
|
||||
// Se fosse scaduta e abbiamo il permesso, segnamola per la cancellazione
|
||||
if (isExpired && deleteConfirmed)
|
||||
{
|
||||
keysToDelete.Add(sKey);
|
||||
@@ -166,7 +291,7 @@ namespace MP.IOC.Services
|
||||
await using var scope = _scopeFactory.CreateAsyncScope();
|
||||
var detailService = scope.ServiceProvider.GetRequiredService<IStatsDetailService>();
|
||||
await detailService.UpsertManyAsync(detailRecordsToInsert, true);
|
||||
Log.Info("[HISTORICAL] Upserted {count} records to DB", detailRecordsToInsert.Count);
|
||||
Log.Info($"[HOUR] Upserted {detailRecordsToInsert.Count} records to DB");
|
||||
}
|
||||
|
||||
// --- FASE PULIZIA REDIS ---
|
||||
@@ -180,7 +305,7 @@ namespace MP.IOC.Services
|
||||
deletedCount++;
|
||||
}
|
||||
batch.Execute();
|
||||
Log.Info("[CLEANUP] Deleted {count} expired metric keys from Redis", deletedCount);
|
||||
Log.Info($"[CLEANUP HOUR] Deleted {deletedCount} expired metric keys from Redis");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user