using Maat.Core;
using Maat.Core.CONF;
using Maat.Data;
using Maat.Data.Services;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.FileSystemGlobbing.Internal;
using Newtonsoft.Json;
using NLog;
using StackExchange.Redis;
using System;
using System.Diagnostics;
using System.Reflection;
using System.Threading;
using System.Xml.Linq;
using static Maat.Core.CONF.RestApiConf;

// Configuration bootstrap, see https://blog.hildenco.com/2020/05/configuration-in-net-core-console.html
var env = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT");
var builder = new ConfigurationBuilder()
    .AddJsonFile($"appsettings.json", true, true)
    .AddJsonFile($"appsettings.{env}.json", true, true)
    .AddEnvironmentVariables();
IConfigurationRoot config = builder.Build();

// Base settings: Redis connection string and configuration folder on the file system.
string lineSep = "---------------------------------------------";
string redisConf = config.GetConnectionString("Redis") ?? "???????????????????????????";
string confPath = Path.Combine(Directory.GetCurrentDirectory(), "conf");
Logger Log = LogManager.GetCurrentClassLogger();
string apiUrlStd = config.GetValue<string>("SrvConf:Prog.ApiUrl") ?? "";
if (string.IsNullOrEmpty(apiUrlStd))
{
    Log.Error($"Errore: ApiUrl vuota!");
}

// Main job configuration; replaced by setupConf() when a job config file is configured.
JobConfigConf currConf = new JobConfigConf();

// Raise the thread-pool minimum to avoid Redis calls collapsing under thread starvation.
ThreadPool.SetMinThreads(10, 10);
List<Thread> jobsThreadList = new List<Thread>();

// Shared state used by the job threads (always accessed under sharedStateLock).
Dictionary<string, DateTime> LastKeySave = new Dictionary<string, DateTime>();
object sharedStateLock = new object();
DateTime lastLog = DateTime.Now.AddMinutes(-1);

// Accessory settings; the numeric values are the defaults used when the key is not configured.
bool verboseLog = false;
string appName = "Maat";
int MinSaveIntSec = 60;
int SqlCmdTimeoutMinutes = 10;
int StepSec = 60;
string JobConfigFile = "";

// Startup banner.
Log.Info(lineSep);
Log.Info($"Starting Maat Egalware's Task & Update Manager");
Log.Info($"vers.{Assembly.GetExecutingAssembly().GetName().Version}");
Log.Info("");
// Only a prefix is logged; clamp the length so short connection strings do not throw.
Log.Info($"Redis server param: {redisConf.Substring(0, Math.Min(20, redisConf.Length))}...");
Log.Info(lineSep);
Log.Info("");
Log.Info("Running - press CTRL-C to stop");
Log.Info("");

// Redis setup.
ConnectionMultiplexer.SetFeatureFlag("preventthreadtheft", true);
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(redisConf);
ISubscriber sub = redis.GetSubscriber();
IDatabase? redisDb = redis.GetDatabase();

// Load the remaining configuration values and the job configuration.
setupConf();

/* --------------------------------------------------------
 * MAIN
 *
 * Thread preparation and startup
 * -------------------------------------------------------- */

// Prepare the threads according to the job configuration.
if (currConf != null)
{
    Log.Info("Starting job threads...");

    // One thread per (server, database) pair for MsSql jobs.
    if (currConf.MsSqlJobs == null)
    {
        Log.Info("No MsSqlJob, skipping...");
    }
    else if (currConf.MsSqlJobs.ServerList == null || currConf.MsSqlJobs.ServerList.Count == 0)
    {
        Log.Info("No Servers found in MsSqlJob.ServerList, skipping...");
    }
    else
    {
        foreach (var currSrv in currConf.MsSqlJobs.ServerList)
        {
            // A server without databases has nothing to process.
            if (currSrv.DbList == null || currSrv.DbList.Count == 0)
            {
                Log.Info("No DB found in MsSqlJob.ServerList.DbList, skipping...");
                continue;
            }

            foreach (var dbName in currSrv.DbList)
            {
                // Build the connection string for this database.
                string dbConnStr = $"Server={currSrv.ServerName}; Database={dbName}; User ID={currSrv.DbUser}; Password={currSrv.DbPasswd}; integrated security=False; MultipleActiveResultSets=True; App={appName}";
                string threadName = $"ThdMsSql_{currSrv.ServerName}_{dbName}";
                Thread newThread = new Thread(() => doCronDB(threadName, dbConnStr, SqlCmdTimeoutMinutes));
                newThread.Name = threadName;
                jobsThreadList.Add(newThread);
                Log.Info($"Added {newThread.Name} to jobsThreadList");
            }
        }
    }

    // One thread per server for REST jobs.
    if (currConf.RestApiJobs == null)
    {
        Log.Info("No RestApiJobs, skipping...");
    }
    else if (currConf.RestApiJobs.ServerList == null || currConf.RestApiJobs.ServerList.Count == 0)
    {
        Log.Info("No Servers found in RestApiJobs.ServerList, skipping...");
    }
    else
    {
        foreach (var currSrv in currConf.RestApiJobs.ServerList)
        {
            // A server without calls has nothing to process.
            if (currSrv.CallList == null || currSrv.CallList.Count == 0)
            {
                Log.Info("No DB found in RestApiJobs.ServerList.CallList, skipping...");
                continue;
            }

            // Single thread handling every call of this server.
            string threadName = $"ThdRest_{currSrv.SrvName}";
            Thread newThread = new Thread(() => doCronRest(threadName, currSrv.ApiUrl, currSrv.CallList));
            newThread.Name = threadName;
            jobsThreadList.Add(newThread);
            Log.Info($"Added {newThread.Name} to jobsThreadList");
        }
    }

    // Start every prepared thread.
    foreach (var cThread in jobsThreadList)
    {
        cThread.Start();
        Log.Info($"{cThread.Name} | Thread started!");
    }
}

/* --------------------------------------------------------
 * UTILS
 *
 * Helper functions and tasks run on separate threads
 * -------------------------------------------------------- */

// Reads the SrvConf:* settings and loads the job configuration file.
// Numeric settings keep their declared default when the key is missing
// (GetValue<int> without a default would silently reset them to 0).
// When SrvConf:JobConfigFile is missing an error is logged and currConf is left untouched.
void setupConf()
{
    /* ----------------------------------------------------------
     * Possible improvement:
     * - remove sensitive settings from files (e.g. user/pwd)
     * - look them up on REDIS
     * - if not found --> ask interactively (first run)
     * - write them to REDIS for subsequent runs
     * ---------------------------------------------------------- */
    ConfigManager configManager = new ConfigManager(redisConf, confPath);
    appName = config.GetValue<string>("SrvConf:AppName") ?? "Maat.Runner";
    StepSec = config.GetValue<int>("SrvConf:StepSec", StepSec);
    MinSaveIntSec = config.GetValue<int>("SrvConf:MinSaveIntSec", MinSaveIntSec);
    var rawJCF = config.GetValue<string>("SrvConf:JobConfigFile");
    SqlCmdTimeoutMinutes = config.GetValue<int>("SrvConf:SqlCmdTimeoutMinutes", SqlCmdTimeoutMinutes);

    if (string.IsNullOrEmpty(rawJCF))
    {
        Log.Error($"Cannot start: missing JobConfigFile configuration");
        return;
    }

    JobConfigFile = rawJCF;
    currConf = configManager.getJobConfig(JobConfigFile);
}

// Stores `message` under the Redis key `memKey` (at most once every MinSaveIntSec seconds
// per key) and always publishes it on `channelName`.
// Called concurrently by the job threads, so the shared throttling state
// (LastKeySave, lastLog) is only touched while holding sharedStateLock.
void saveAndSendMessage(string memKey, string channelName, string message)
{
    DateTime now = DateTime.Now;

    // Decide whether the minimum save interval has elapsed for this key.
    bool doSave;
    lock (sharedStateLock)
    {
        doSave = !LastKeySave.TryGetValue(memKey, out DateTime lastSave)
                 || now.Subtract(lastSave).TotalSeconds >= MinSaveIntSec;
        if (doSave && redisDb != null)
        {
            LastKeySave[memKey] = now;
        }
    }

    if (doSave && redisDb != null)
    {
        // The write is not awaited; observe failures so they are logged instead of lost.
        redisDb.StringSetAsync(memKey, message).ContinueWith(
            t => { Log.Error($"ERROR{Environment.NewLine}{t.Exception}"); },
            System.Threading.Tasks.TaskContinuationOptions.OnlyOnFaulted);
        Log.Info($"Redis Cache Key: {memKey}");
    }

    // Send the notification on the requested channel.
    RedisChannel notifyChannel = new RedisChannel(channelName, RedisChannel.PatternMode.Auto);
    sub.Publish(notifyChannel, message);

    if (verboseLog)
    {
        Log.Info($"[{channelName}] key: {memKey} | val/message: {message}");
        return;
    }

    try
    {
        // Non-verbose mode: emit a separator line at most once every 15 seconds.
        bool writeSeparator = false;
        lock (sharedStateLock)
        {
            if (now.Subtract(lastLog).TotalSeconds > 15)
            {
                lastLog = now;
                writeSeparator = true;
            }
        }
        if (writeSeparator)
        {
            Log.Info(lineSep);
        }
    }
    catch (Exception ex)
    {
        Log.Error($"ERROR{Environment.NewLine}{ex}");
    }
}

// Milliseconds to sleep until the next multiple of `stepPeriod` (milliseconds),
// measured from the current time. When the current time is already on a boundary
// the wait falls back to 90% of the period.
int getWaitMs(int stepPeriod)
{
    DateTime now = DateTime.Now;
    DateTime nextDt = now.Ceil(TimeSpan.FromMilliseconds(stepPeriod));
    double waitTime = nextDt.Subtract(now).TotalMilliseconds;
    if (waitTime <= 0)
    {
        waitTime = stepPeriod * 90 / 100;
    }
    return (int)waitTime;
}

// Thread body for one database: runs the MsSql task check once per period, forever.
// Non-empty results are serialized to JSON and sent through saveAndSendMessage.
// A failing iteration is logged and the loop continues, so one database error
// does not terminate the process and the other job threads.
void doCronDB(string threadName, string dbConnStr, int cmdTimeOut)
{
    Log.Info($"{threadName} | Starting dbCronDb task");

    // Standard period: 1 minute.
    int stepPeriod = 60000;
    MsSqlTaskService tRun = new MsSqlTaskService(threadName, dbConnStr, redisConf, apiUrlStd, cmdTimeOut);

    // Endless loop: run the tasks, then wait for the next period boundary.
    while (true)
    {
        try
        {
            var newStatus = tRun.CheckExecute();
            if (newStatus != null && newStatus.Count > 0)
            {
                // Serialize and send.
                string rawData = JsonConvert.SerializeObject(newStatus);
                saveAndSendMessage(Const.TASK_LOG_CURR_KEY, Const.TASK_LOG_M_QUEUE, rawData);
                Log.Info($"{threadName} | Executed {newStatus.Count} task");
            }
            else
            {
                Log.Trace($"{threadName} | Nothing to run...");
            }
        }
        catch (Exception ex)
        {
            Log.Error($"{threadName} | ERROR{Environment.NewLine}{ex}");
        }

        Thread.Sleep(getWaitMs(stepPeriod));
    }
}

// Thread body for one REST server: once per period, issues a GET for every call
// whose next execution time has passed, then reschedules it WaitMinutes later.
// A failing call is logged and still rescheduled, so the remaining calls keep running.
// NOTE(review): the generic type argument of `callList` is not visible in the reviewed
// copy of this file (angle-bracket content appears to have been stripped); restore the
// element type, which must expose CallId, WaitMinutes and Resource.
void doCronRest(string threadName, string baseUrl, List callList)
{
    Log.Info($"{threadName} | Starting doCronRest task");

    // Standard period: 1 minute.
    int stepPeriod = 60000;

    // Service used to perform the REST calls.
    RestCallService sRun = new RestCallService(baseUrl);

    // Build the wait table: the first execution of each call is due WaitMinutes from now.
    DateTime startTime = DateTime.Now;
    var waitList = callList
        .Select(x => new RestWaitConf()
        {
            CallId = x.CallId,
            WaitTime = x.WaitMinutes,
            NextExe = startTime.AddMinutes(x.WaitMinutes)
        })
        .ToList();

    // Endless loop: run the expired calls, then wait for the next period boundary.
    while (true)
    {
        DateTime exeTime = DateTime.Now;
        foreach (var item in waitList)
        {
            if (item.NextExe >= exeTime)
            {
                continue;
            }

            try
            {
                var currCall = callList.FirstOrDefault(x => x.CallId == item.CallId);
                if (currCall != null)
                {
                    // The response is currently not used.
                    _ = sRun.CallRestGet(baseUrl, currCall.Resource);
                }
            }
            catch (Exception ex)
            {
                Log.Error($"{threadName} | ERROR{Environment.NewLine}{ex}");
            }

            // Reschedule.
            item.NextExe = exeTime.AddMinutes(item.WaitTime);
        }

        // The wait is computed from the time AFTER the calls completed; using the
        // pre-call timestamp would overshoot the boundary by the calls' duration.
        Thread.Sleep(getWaitMs(stepPeriod));
    }
}