Files
Samuele Locatelli 932e6477d3 Update massivo CORE x gestione REDIS
- nuova gestione flushPattern redis (solo master e async e con scan + furbo)
- update nuget
2025-09-12 11:27:32 +02:00

526 lines
19 KiB
C#

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using MP.TaskMan.Controllers;
using MP.TaskMan.Models;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NLog;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using static MP.TaskMan.Objects.Enums;
namespace MP.TaskMan.Services
{
public class TaskService : BaseServ, IDisposable
{
#region Public Constructors
/// <summary>
/// Initializes the task service: opens the Redis connection, resolves the connection
/// string of the task database, creates the task controller and builds the Redis base key.
/// </summary>
/// <param name="configuration">
/// Application configuration; the "Redis" connection string and the "SpecialConf" section are read from it.
/// </param>
public TaskService(IConfiguration configuration)
{
    _configuration = configuration;
    // Redis components setup
    redisConn = ConnectionMultiplexer.Connect(_configuration.GetConnectionString("Redis"));
    redisDb = redisConn.GetDatabase();
    // "SpecialConf:TaskManConn" holds the NAME of the connection string used for task management
    string taskDbCS = _configuration.GetValue<string>("SpecialConf:TaskManConn");
    ConnStr = _configuration.GetConnectionString(taskDbCS);
    if (string.IsNullOrEmpty(ConnStr))
    {
        Log.Error("ConnString empty!");
    }
    else
    {
        StringBuilder sb = new StringBuilder();
        MLController = new MpTaskController(configuration);
        sb.AppendLine($"TaskService | MpTaskController OK");
        Log.Info(sb.ToString());
        // Collect the parameters needed to build the Redis base key
        CodModulo = _configuration.GetValue<string>("SpecialConf:CodModulo");
        foreach (var item in ConnStr.Split(';'))
        {
            // Split on the first '=' only, so a value that itself contains '=' is kept whole
            var cData = item.Split('=', 2);
            if (cData.Length != 2)
            {
                continue;
            }
            string paramName = cData[0].Trim();
            if (paramName.Length > 0)
            {
                // The first occurrence of a parameter wins, later duplicates are ignored
                connStrParams.TryAdd(paramName, cData[1].Trim());
            }
        }
        // A missing entry is logged instead of failing construction with a KeyNotFoundException;
        // the corresponding property keeps its default empty value.
        if (connStrParams.TryGetValue("Server", out var serverName))
        {
            DataSource = serverName;
        }
        else
        {
            Log.Error("TaskService | 'Server' not found in connection string");
        }
        if (connStrParams.TryGetValue("Database", out var dbName))
        {
            DataBase = dbName;
        }
        else
        {
            Log.Error("TaskService | 'Database' not found in connection string");
        }
        string rbc = _configuration.GetValue<string>("SpecialConf:RedisBaseConf");
        if (string.IsNullOrEmpty(rbc))
        {
            rbc = "MP:TaskMan";
        }
        RedisBaseKey = $"{rbc}:{DataSource}:{DataBase}";
    }
    // REST call service setup
    RCallService = new RestCallService(_configuration);
}
#endregion Public Constructors
#region Public Events
/// <summary>
/// Event raised to ask listeners to reload page data (used to refresh open pages).
/// It is initialized with an empty handler, so the invocation list is never empty.
/// </summary>
public event EventHandler ReloadRequest = delegate { };
#endregion Public Events
#region Public Methods
/// <summary>
/// Releases the database controller and the Redis connection created by the constructor.
/// </summary>
public void Dispose()
{
    // The controller is only created when a connection string was found, so it may be missing
    MLController?.Dispose();
    MLController = null!;
    // Close the multiplexer created in the constructor instead of just dropping the
    // reference, otherwise its connections stay open until finalization
    redisConn?.Dispose();
    redisConn = null!;
    redisDb = null!;
}
/// <summary>
/// Executes a single scheduled task and then flushes the Redis cache.
/// Only <c>SqlStored</c> and <c>RestCallGet</c> task types are handled; any other type
/// returns the default "Task Not Recognized" result.
/// </summary>
/// <param name="TaskRec">Task to execute</param>
/// <param name="SchedNext">When true the next execution is rescheduled</param>
/// <returns>
/// Outcome of the execution; <c>ExecResult</c> is -1 when the task is disabled,
/// not recognized, or the REST health check did not answer "OK".
/// </returns>
public async Task<TaskResultModel> ExecuteTask(TaskListModel TaskRec, bool SchedNext)
{
    TaskResultModel answ = new TaskResultModel()
    {
        Task = $"TaskId: {TaskRec.TaskId} | {TaskRec.TType}",
        ExecResult = -1,
        TextResult = TaskRec.Enabled ? "Task Not Recognized" : "Task Not Enabled for Execution"
    };
    // A disabled task is never executed and does not touch the cache
    if (!TaskRec.Enabled)
    {
        return answ;
    }
    // Dispatch on the task type
    switch (TaskRec.TType)
    {
        case Task2ExeType.SqlStored:
            answ = MLController.ExecuteSqlTask(TaskRec.TaskId, SchedNext);
            break;
        case Task2ExeType.RestCallGet:
            {
                DateTime dtStart = DateTime.Now;
                // First probe the Health service, the real call is made only when it answers OK
                string rAnsw = await RCallService.CheckServer();
                // Null-safe, case-insensitive comparison (a null answer counts as a failed check)
                if (string.Equals(rAnsw, "OK", StringComparison.OrdinalIgnoreCase))
                {
                    var callResp = await RCallService.CallRestGet(TaskRec.Command, TaskRec.Args);
                    DateTime dtEnd = DateTime.Now;
                    // Store the body as indented JSON when possible; an empty or non-JSON body
                    // (e.g. an error page) is stored as-is instead of aborting the execution
                    string rawContent = $"{callResp.Content}";
                    string formattedJson = rawContent;
                    if (!string.IsNullOrWhiteSpace(rawContent))
                    {
                        try
                        {
                            formattedJson = JToken.Parse(rawContent).ToString(Formatting.Indented);
                        }
                        catch (JsonReaderException)
                        {
                            formattedJson = rawContent;
                        }
                    }
                    TaskExecModel tExeMod = new TaskExecModel()
                    {
                        DtEnd = dtEnd,
                        DtStart = dtStart,
                        IsError = callResp.StatusCode != System.Net.HttpStatusCode.OK,
                        TaskId = TaskRec.TaskId,
                        Result = formattedJson
                    };
                    // Persist the execution on the DB
                    answ = MLController.TaskExecSaveExecuted(TaskRec.TaskId, SchedNext, tExeMod);
                }
                else
                {
                    // The task type WAS recognized: report the real reason for the failure
                    answ.TextResult = $"Health check failed: {rAnsw}";
                }
                break;
            }
        default:
            break;
    }
    // Invalidate the cache so readers see the new execution state
    await FlushCacheAsync();
    return answ;
}
/// <summary>
/// Clears every Redis cache entry stored under <c>RedisBaseKey</c>.
/// </summary>
/// <returns>The outcome reported by <c>ExecFlushRedisPattern</c></returns>
public bool FlushCache()
{
    return ExecFlushRedisPattern(new RedisValue($"{RedisBaseKey}:*"));
}
/// <summary>
/// Clears the Redis cache entries stored under a specific sub-key of <c>RedisBaseKey</c>.
/// </summary>
/// <param name="KeyReq">Sub-key (relative to <c>RedisBaseKey</c>) whose entries are removed</param>
/// <returns>The outcome reported by <c>ExecFlushRedisPattern</c></returns>
public bool FlushCache(string KeyReq)
{
    return ExecFlushRedisPattern(new RedisValue($"{RedisBaseKey}:{KeyReq}:*"));
}
/// <summary>
/// Clears every Redis cache entry stored under <c>RedisBaseKey</c> (async version).
/// </summary>
/// <returns>The outcome reported by <c>ExecFlushRedisPatternAsync</c></returns>
public async Task<bool> FlushCacheAsync()
{
    return await ExecFlushRedisPatternAsync(new RedisValue($"{RedisBaseKey}:*"));
}
/// <summary>
/// Clears the Redis cache entries stored under a specific sub-key of <c>RedisBaseKey</c> (async version).
/// </summary>
/// <param name="KeyReq">Sub-key (relative to <c>RedisBaseKey</c>) whose entries are removed</param>
/// <returns>The outcome reported by <c>ExecFlushRedisPatternAsync</c></returns>
public async Task<bool> FlushCacheAsync(string KeyReq)
{
    return await ExecFlushRedisPatternAsync(new RedisValue($"{RedisBaseKey}:{KeyReq}:*"));
}
/// <summary>
/// Raises <c>ReloadRequest</c>, passing the given message to the subscribers.
/// </summary>
/// <param name="message">Text wrapped in the <c>ReloadEventArgs</c> delivered to listeners</param>
public void NotifyReloadRequest(string message)
{
    // The event arguments are only built when there is something to invoke
    ReloadRequest?.Invoke(this, new ReloadEventArgs(message));
}
/// <summary>
/// Asks the task controller to roll back the pending edits of an entity.
/// </summary>
/// <param name="item">Entity forwarded to <c>MLController.RollBackEntity</c></param>
public void rollBackEdit(object item) => MLController.RollBackEntity(item);
/// <summary>
/// Returns the executions recorded for a task, served from Redis when a cached copy
/// exists and from the DB otherwise (the DB result is then cached).
/// </summary>
/// <param name="TaskId">Id of the task whose executions are requested</param>
/// <param name="maxRec">Maximum number of records, forwarded to the controller</param>
/// <param name="searchVal">Search value; NOTE(review): not used by this method at present</param>
/// <returns>The list of executions, never null</returns>
public async Task<List<TaskExecModel>> TaskExecGetFilt(int TaskId, int maxRec, string searchVal)
{
    Stopwatch timer = Stopwatch.StartNew();
    // The key embeds the current date and minute, so a new key is used every minute
    DateTime now = DateTime.Now;
    string cacheKey = $"{RedisBaseKey}:ExecList:{TaskId}:{now:yyMMdd}:{now:HHmm}:{maxRec}";
    string origin;
    List<TaskExecModel>? execList;
    RedisValue cached = await redisDb.StringGetAsync(cacheKey);
    if (!cached.HasValue)
    {
        // Cache miss: read from the DB, then serialize and store the result
        origin = "DB";
        execList = MLController.TaskExecGetFilt(TaskId, maxRec);
        cached = JsonConvert.SerializeObject(execList);
        await redisDb.StringSetAsync(cacheKey, cached, FastCache);
    }
    else
    {
        // Cache hit: rebuild the list from the stored JSON
        execList = JsonConvert.DeserializeObject<List<TaskExecModel>>($"{cached}");
        origin = "REDIS";
    }
    execList ??= new List<TaskExecModel>();
    timer.Stop();
    Log.Debug($"TaskExecGetFilt | {origin} | {timer.Elapsed.TotalMilliseconds}ms");
    return execList;
}
/// <summary>
/// Returns the managed tasks of a given type, optionally served through the Redis cache
/// and optionally filtered by a search value.
/// </summary>
/// <param name="TType">Type of the tasks to retrieve</param>
/// <param name="enabRedis">When true the Redis cache is used, otherwise data always comes from the DB</param>
/// <param name="searchVal">
/// Optional text searched (case-insensitive) in task name and description; empty means no filter
/// </param>
/// <returns>The list of tasks, never null</returns>
public async Task<List<TaskListModel>> TaskListAll(Task2ExeType TType, bool enabRedis, string searchVal = "")
{
    string source = "DB";
    Stopwatch sw = Stopwatch.StartNew();
    List<TaskListModel>? result;
    if (enabRedis)
    {
        string currKey = $"{RedisBaseKey}:List:{TType}";
        RedisValue rawData = await redisDb.StringGetAsync(currKey);
        // Payloads of two characters or fewer (e.g. an empty JSON array "[]") count as a miss
        if (rawData.HasValue && rawData.Length() > 2)
        {
            result = JsonConvert.DeserializeObject<List<TaskListModel>>($"{rawData}");
            source = "REDIS";
        }
        else
        {
            result = MLController.TaskListGetAll(TType);
            // Serialize and store for the next readers
            rawData = JsonConvert.SerializeObject(result);
            await redisDb.StringSetAsync(currKey, rawData, FastCache);
        }
    }
    else
    {
        result = MLController.TaskListGetAll(TType);
    }
    result ??= new List<TaskListModel>();
    // Apply the text filter when requested
    if (!string.IsNullOrEmpty(searchVal))
    {
        // Null-safe: a record lacking name or description simply does not match on that field
        result = result
            .Where(x => (x.Name?.Contains(searchVal, StringComparison.InvariantCultureIgnoreCase) ?? false)
                || (x.Descript?.Contains(searchVal, StringComparison.InvariantCultureIgnoreCase) ?? false))
            .ToList();
    }
    sw.Stop();
    Log.Debug($"TaskListAll | {source} | {sw.Elapsed.TotalMilliseconds}ms");
    return result;
}
/// <summary>
/// Changes the ordering of a task, then flushes the Redis cache.
/// </summary>
/// <param name="rec2upd">Record to move in the priority order</param>
/// <param name="moveUp">Direction flag forwarded to the controller</param>
/// <returns>The result returned by the controller</returns>
public async Task<bool> TaskListMove(TaskListModel rec2upd, bool moveUp)
{
    bool moved = MLController.TaskListMove(rec2upd, moveUp);
    // Short pause before the cache is invalidated
    await Task.Delay(TimeSpan.FromMilliseconds(50));
    _ = await FlushCacheAsync();
    return moved;
}
/// <summary>
/// Reorders the records of a given group (to be called after a group change),
/// then flushes the Redis cache.
/// </summary>
/// <param name="codGroup">Code of the group to reorder</param>
/// <returns>The result returned by the controller</returns>
public async Task<bool> TaskListReorder(int codGroup)
{
    bool reordered = MLController.TaskListReorder(codGroup);
    // Short pause before the cache is invalidated
    await Task.Delay(TimeSpan.FromMilliseconds(50));
    _ = await FlushCacheAsync();
    return reordered;
}
/// <summary>
/// Updates or inserts a TaskList record, then flushes the Redis cache.
/// </summary>
/// <param name="rec2upd">Record to update or insert</param>
/// <returns>The result returned by the controller</returns>
public async Task<bool> TaskListUpsert(TaskListModel rec2upd)
{
    bool saved = MLController.TaskListUpsert(rec2upd);
    // Short pause before the cache is invalidated
    await Task.Delay(TimeSpan.FromMilliseconds(50));
    _ = await FlushCacheAsync();
    return saved;
}
#endregion Public Methods
#region Protected Fields
/// <summary>
/// Redis connection object (created in the constructor)
/// </summary>
protected ConnectionMultiplexer redisConn = null!;
/// <summary>
/// Redis database object used for read/write calls
/// </summary>
protected IDatabase redisDb = null!;
#endregion Protected Fields
#region Private Fields
// Class-wide NLog logger
private static Logger Log = LogManager.GetCurrentClassLogger();
// Value of "SpecialConf:CodModulo" read in the constructor
private string CodModulo = "";
// Connection string of the task-management DB, resolved in the constructor
private string ConnStr = "";
// Name/value pairs obtained by splitting ConnStr on ';' and '='
private Dictionary<string, string> connStrParams = new Dictionary<string, string>();
#endregion Private Fields
#region Private Properties
// "Database" entry of the connection string; part of the Redis base key
private string DataBase { get; set; } = "";
// "Server" entry of the connection string; part of the Redis base key
private string DataSource { get; set; } = "";
// Task DB controller; only created by the constructor when ConnStr is not empty
private MpTaskController MLController { get; set; } = null!;
// Service used for the REST calls (health check and GET)
private RestCallService RCallService { get; set; } = null!;
// Prefix of every Redis key used by this service; rebuilt in the constructor when ConnStr is available
private string RedisBaseKey { get; set; } = "MP:TASK";
#endregion Private Properties
#region Private Methods
/// <summary>
/// Removes from Redis every key matching the given pattern, working on the first
/// connected non-replica endpoint, then notifies listeners through <c>NotifyReloadRequest</c>.
/// </summary>
/// <param name="pat2Flush">
/// Key pattern to remove; "*" or a null value flushes the whole current database
/// </param>
/// <returns>
/// True when the flush was executed, false when no connected master endpoint was available
/// </returns>
private bool ExecFlushRedisPattern(RedisValue pat2Flush)
{
    const int batchSize = 1000;
    // Pick the first endpoint that is connected and is not a replica
    var masterEndpoint = redisConn.GetEndPoints()
        .FirstOrDefault(ep =>
        {
            var candidate = redisConn.GetServer(ep);
            return candidate.IsConnected && !candidate.IsReplica;
        });
    if (masterEndpoint == null)
    {
        // Without a master nothing can be scanned or flushed: report it instead of
        // failing inside GetServer with a null endpoint
        Log.Error($"FlushRedisCache | no connected master endpoint | {pat2Flush}");
        return false;
    }
    var server = redisConn.GetServer(masterEndpoint);
    if (pat2Flush.Equals(new RedisValue("*")) || pat2Flush == RedisValue.Null)
    {
        // Whole-database flush.
        // NOTE(review): server-level flush commands normally need admin mode enabled on the connection - confirm configuration
        server.FlushDatabase(database: redisDb.Database);
    }
    else
    {
        // Keys are buffered per scan page and then deleted one by one.
        // NOTE(review): a multi-key delete would save round-trips but may not be usable on a cluster - verify before changing
        var batch = new List<RedisKey>(batchSize);
        void DeleteBuffered()
        {
            foreach (var item in batch)
            {
                redisDb.KeyDelete(item);
            }
            batch.Clear();
        }
        foreach (var key in server.Keys(database: redisDb.Database, pattern: pat2Flush, pageSize: batchSize))
        {
            batch.Add(key);
            if (batch.Count >= batchSize)
            {
                DeleteBuffered();
            }
        }
        // Remaining keys of the last, partial batch
        DeleteBuffered();
    }
    // Tell listening clients that the cache was reset
    NotifyReloadRequest($"FlushRedisCache | {pat2Flush}");
    return true;
}
/// <summary>
/// Async version: removes from Redis every key matching the given pattern, working on the
/// first connected non-replica endpoint, then notifies listeners through <c>NotifyReloadRequest</c>.
/// </summary>
/// <param name="pat2Flush">
/// Key pattern to remove; "*" or a null value flushes the whole current database
/// </param>
/// <returns>
/// True when the flush was executed, false when no connected master endpoint was available
/// </returns>
private async Task<bool> ExecFlushRedisPatternAsync(RedisValue pat2Flush)
{
    const int batchSize = 1000;
    // Pick the first endpoint that is connected and is not a replica
    var masterEndpoint = redisConn.GetEndPoints()
        .FirstOrDefault(ep =>
        {
            var candidate = redisConn.GetServer(ep);
            return candidate.IsConnected && !candidate.IsReplica;
        });
    if (masterEndpoint == null)
    {
        // Without a master nothing can be scanned or flushed: report it instead of
        // failing inside GetServer with a null endpoint
        Log.Error($"FlushRedisCache | no connected master endpoint | {pat2Flush}");
        return false;
    }
    var server = redisConn.GetServer(masterEndpoint);
    if (pat2Flush.Equals(new RedisValue("*")) || pat2Flush == RedisValue.Null)
    {
        // Whole-database flush, awaited instead of blocking the calling thread.
        // NOTE(review): server-level flush commands normally need admin mode enabled on the connection - confirm configuration
        await server.FlushDatabaseAsync(database: redisDb.Database);
    }
    else
    {
        // Keys are scanned asynchronously; deletes are issued concurrently and awaited in batches
        var deleteTasks = new List<Task>(batchSize);
        await foreach (var key in server.KeysAsync(database: redisDb.Database, pattern: pat2Flush, pageSize: batchSize))
        {
            deleteTasks.Add(redisDb.KeyDeleteAsync(key));
            if (deleteTasks.Count >= batchSize)
            {
                await Task.WhenAll(deleteTasks);
                deleteTasks.Clear();
            }
        }
        // Remaining deletes of the last, partial batch
        if (deleteTasks.Count > 0)
        {
            await Task.WhenAll(deleteTasks);
        }
    }
    // Tell listening clients that the cache was reset
    NotifyReloadRequest($"FlushRedisCache | {pat2Flush}");
    return true;
}
#endregion Private Methods
}
}