Files

372 lines
12 KiB
C#

using Maat.Core;
using Maat.Core.CONF;
using Maat.Data;
using Maat.Data.Services;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.FileSystemGlobbing.Internal;
using Newtonsoft.Json;
using NLog;
using StackExchange.Redis;
using System;
using System.Diagnostics;
using System.Reflection;
using System.Threading;
using System.Xml.Linq;
using static Maat.Core.CONF.RestApiConf;
// init parte config, vedere https://blog.hildenco.com/2020/05/configuration-in-net-core-console.html
var env = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT");
var builder = new ConfigurationBuilder()
.AddJsonFile($"appsettings.json", true, true)
.AddJsonFile($"appsettings.{env}.json", true, true)
.AddEnvironmentVariables();
IConfigurationRoot? config = builder.Build();
// imposto variabili di base REDIS e FS
string lineSep = "---------------------------------------------";
string redisConf = config.GetConnectionString("Redis") ?? "???????????????????????????";
string confPath = Path.Combine(Directory.GetCurrentDirectory(), "conf");
Logger Log = LogManager.GetCurrentClassLogger();
string apiUrlStd = config.GetValue<string>("SrvConf:Prog.ApiUrl") ?? "";
if (string.IsNullOrEmpty(apiUrlStd))
{
Log.Error($"Errore: ApiUrl vuota!");
}
// gestore della configurazione principale da cui derivare gestione...
JobConfigConf currConf = new JobConfigConf();
// fix numero minimo dei thread pool x evitare collasso chiamate redis
ThreadPool.SetMinThreads(10, 10);
List<Thread> jobsThreadList = new List<Thread>();
// setup variabili accessorie
Dictionary<string, DateTime> LastKeySave = new Dictionary<string, DateTime>();
DateTime lastLog = DateTime.Now.AddMinutes(-1);
bool verboseLog = false;
bool logWriting = false;
string appName = "Maat";
int MinSaveIntSec = 60;
int SqlCmdTimeoutMinutes = 10;
int StepSec = 60;
string JobConfigFile = "";
// avvio e loggo
Log.Info(lineSep);
Log.Info($"Starting Maat Egalware's Task & Update Manager");
Log.Info($"vers.{Assembly.GetExecutingAssembly().GetName().Version}");
Log.Info("");
Log.Info($"Redis server param: {redisConf.Substring(0, 20)}...");
Log.Info(lineSep);
Log.Info("");
Log.Info("Running - press CTRL-C to stop");
Log.Info("");
// Setup REDIS
ConnectionMultiplexer.SetFeatureFlag("preventthreadtheft", true);
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(redisConf);
ISubscriber sub = redis.GetSubscriber();
IDatabase? redisDb = redis.GetDatabase();
// Effettuo setup configurazioni
setupConf();
/* --------------------------------------------------------
* MAIN
*
* Preparazione ed avvio dei thread
* -------------------------------------------------------- */
// preparo i thread secondo config
if (currConf != null)
{
Log.Info("Starting job threads...");
// cero i thread x tipo MsSql...
if (currConf.MsSqlJobs == null)
{
Log.Info("No MsSqlJob, skipping...");
}
else
{
if (currConf.MsSqlJobs.ServerList == null || currConf.MsSqlJobs.ServerList.Count == 0)
{
Log.Info("No Servers found in MsSqlJob.ServerList, skipping...");
}
else
{
foreach (var currSrv in currConf.MsSqlJobs.ServerList)
{
// verifico di avere i singoli DB da processare...
if (currSrv.DbList == null || currSrv.DbList.Count == 0)
{
Log.Info("No DB found in MsSqlJob.ServerList.DbList, skipping...");
}
else
{
foreach (var dbName in currSrv.DbList)
{
// compongo connstring
string dbConnStr = $"Server={currSrv.ServerName}; Database={dbName}; User ID={currSrv.DbUser}; Password={currSrv.DbPasswd}; integrated security=False; MultipleActiveResultSets=True; App={appName}";
string threadName = $"ThdMsSql_{currSrv.ServerName}_{dbName}";
Thread newThread = new Thread(() => doCronDB(threadName, dbConnStr, SqlCmdTimeoutMinutes));
newThread.Name = threadName;
jobsThreadList.Add(newThread);
Log.Info($"Added {newThread.Name} to jobsThreadList");
}
}
}
}
}
// cero i thread x tipo Rest...
if (currConf.RestApiJobs == null)
{
Log.Info("No RestApiJobs, skipping...");
}
else
{
if (currConf.RestApiJobs.ServerList == null || currConf.RestApiJobs.ServerList.Count == 0)
{
Log.Info("No Servers found in RestApiJobs.ServerList, skipping...");
}
else
{
foreach (var currSrv in currConf.RestApiJobs.ServerList)
{
// verifico di avere i singoli DB da processare...
if (currSrv.CallList == null || currSrv.CallList.Count == 0)
{
Log.Info("No DB found in RestApiJobs.ServerList.CallList, skipping...");
}
else
{
// thread unico x ogni server
string threadName = $"ThdRest_{currSrv.SrvName}";
Thread newThread = new Thread(() => doCronRest(threadName, currSrv.ApiUrl, currSrv.CallList));
newThread.Name = threadName;
jobsThreadList.Add(newThread);
Log.Info($"Added {newThread.Name} to jobsThreadList");
}
}
}
}
// avvio tutti i thread preparati...
foreach (var cThread in jobsThreadList)
{
cThread.Start();
Log.Info($"{cThread.Name} | Thread started!");
}
}
/* --------------------------------------------------------
* UTILS
*
* Funzioni sub e task da avviare su thread separati
* -------------------------------------------------------- */
/// <summary>
/// Setup + salvataggio redis delle conf (opzionale)
/// </summary>
void setupConf()
{
/* ----------------------------------------------------------
* Possibile miglioramento:
* - togliere conf sensibili da file (es user/pwd)
* - cercare su REDIS
* - se non trovate --> chiede in esecuzione interattiva (1° lancio)
* - scrivere su REDIS x successive esecuzioni
* ---------------------------------------------------------- */
ConfigManager configManager = new ConfigManager(redisConf, confPath);
appName = config.GetValue<string>("SrvConf:AppName") ?? "Maat.Runner";
StepSec = config.GetValue<int>("SrvConf:StepSec");
MinSaveIntSec = config.GetValue<int>("SrvConf:MinSaveIntSec");
var rawJCF = config.GetValue<string>("SrvConf:JobConfigFile");
SqlCmdTimeoutMinutes = config.GetValue<int>("SrvConf:SqlCmdTimeoutMinutes");
if (string.IsNullOrEmpty(rawJCF))
{
Log.Error($"Cannot start: missing JobConfigFile configuration");
}
else
{
JobConfigFile = rawJCF;
currConf = configManager.getJobConfig(JobConfigFile);
}
}
void saveAndSendMessage(string memKey, string channelName, string message)
{
// effettuo la scrittura nell'area di memoria indicata SE passato intervallo minimo
bool doSave = true;
if (LastKeySave.ContainsKey(memKey))
{
if (DateTime.Now.Subtract(LastKeySave[memKey]).TotalSeconds < MinSaveIntSec)
{
doSave = false;
}
}
else
{
LastKeySave.Add(memKey, DateTime.Now);
}
if (doSave)
{
if (redisDb != null)
{
redisDb.StringSetAsync(memKey, message);
LastKeySave[memKey] = DateTime.Now;
Log.Info($"Redis Cache Key: {memKey}");
}
}
// invio notifica tramite il canale richiesto
RedisChannel notifyChannel = new RedisChannel(channelName, RedisChannel.PatternMode.Auto);
sub.Publish(notifyChannel, message);
if (verboseLog)
{
Log.Info($"[{channelName}] key: {memKey} | val/message: {message}");
}
else
{
try
{
if (!logWriting)
{
logWriting = true;
// vedo se loggare...
DateTime adesso = DateTime.Now;
if (adesso.Subtract(lastLog).TotalSeconds > 15)
{
lastLog = adesso;
Log.Info(lineSep);
}
logWriting = false;
}
}
catch (Exception ex)
{
Log.Error($"ERROR{Environment.NewLine}{ex}");
}
}
}
void doCronDB(string threadName, string dbConnStr, int cmdTimeOut)
{
string tName = threadName;
Log.Info($"{tName} | Starting dbCronDb task");
// periodo standard 1 minuto
int stepPeriod = 60000;
MsSqlTaskService tRun = new MsSqlTaskService(tName, dbConnStr, redisConf, apiUrlStd, cmdTimeOut);
// ciclo infinito esegue task poi attende quanto manca a prox scadenza...
do
{
//var testServer = tRun.CheckRestServer();
// eseguo i miei task e recupero esito
var newStatus = tRun.CheckExecute();
// se ho qualcosa...
if (newStatus != null && newStatus.Count > 0)
{
// serializzo ed invio
string rawData = JsonConvert.SerializeObject(newStatus);
saveAndSendMessage(Const.TASK_LOG_CURR_KEY, Const.TASK_LOG_M_QUEUE, rawData);
Log.Info($"{tName} | Executed {newStatus.Count} task");
}
else
{
Log.Trace($"{tName} | Nothing to run...");
}
// calcolo quanto manca al prossimo step
DateTime adesso = DateTime.Now;
DateTime nextDt = adesso.Ceil(TimeSpan.FromMilliseconds(stepPeriod));
double waitTime = nextDt.Subtract(adesso).TotalMilliseconds;
if (waitTime == 0)
{
waitTime = stepPeriod * 90 / 100;
}
// attesa
Thread.Sleep((int)waitTime);
} while (true);
}
void doCronRest(string threadName, string baseUrl, List<CallConfig> callList)
{
string tName = threadName;
Log.Info($"{tName} | Starting doCronRest task");
// periodo standard 1 minuto
int stepPeriod = 60000;
// init servizio x chiamate REST...
RestCallService sRun = new RestCallService(baseUrl);
// il servizio usa la callList di oggetti da chiamare con indicazione tempistiche, inizializzo la tabella delle attese...
DateTime adesso = DateTime.Now;
var waitList = callList.Select(x => new RestWaitConf()
{
CallId = x.CallId,
WaitTime = x.WaitMinutes,
NextExe = adesso.AddMinutes(x.WaitMinutes)
}).ToList();
// ciclo infinito esegue task poi attende quanto manca al prox minuto...
do
{
// verifico in primis quale servizio sia eventualmente scaduto...
DateTime exeTime = DateTime.Now;
foreach (var item in waitList)
{
if (item.NextExe < exeTime)
{
// recupero la call...
var currCall = callList.Where(x => x.CallId == item.CallId).FirstOrDefault();
if (currCall != null)
{
// eseguo call
var callResp = sRun.CallRestGet(baseUrl, currCall.Resource);
}
// rischedulo!
item.NextExe = exeTime.AddMinutes(item.WaitTime);
}
}
#if false
// eseguo i miei task e recupero esito
var newStatus = tRun.CheckExecute();
// se ho qualcosa...
if (newStatus != null && newStatus.Count > 0)
{
// serializzo ed invio
string rawData = JsonConvert.SerializeObject(newStatus);
saveAndSendMessage(Const.TASK_LOG_CURR_KEY, Const.TASK_LOG_M_QUEUE, rawData);
}
else
{
Log.Trace("Nothing to run..");
}
#endif
// calcolo quanto manca al prossimo step
DateTime nextDt = exeTime.Ceil(TimeSpan.FromMilliseconds(stepPeriod));
double waitTime = nextDt.Subtract(exeTime).TotalMilliseconds;
if (waitTime <= 0)
{
waitTime = stepPeriod * 90 / 100;
}
// attesa
Thread.Sleep((int)waitTime);
} while (true);
}