313 lines
11 KiB
C#
313 lines
11 KiB
C#
using EgwCoreLib.Lux.Core.Generic;
|
|
using EgwCoreLib.Lux.Core.RestPayload;
|
|
using EgwMultiEngineManager.Data;
|
|
using Microsoft.Extensions.Configuration;
|
|
using Newtonsoft.Json;
|
|
using NLog;
|
|
using StackExchange.Redis;
|
|
using System.Diagnostics;
|
|
|
|
namespace EgwCoreLib.Lux.Data.Services
|
|
{
|
|
public class ProdService : BaseServ
|
|
{
|
|
#region Public Constructors
|
|
|
|
/// <summary>
/// Creates the service, stores the Redis helper and derives every Redis key
/// used by this class from <c>redisBaseKey</c>.
/// </summary>
/// <param name="configuration">Application configuration, forwarded to the base class.</param>
/// <param name="RedisConn">Redis connection multiplexer, forwarded to the base class.</param>
/// <param name="redisService">Redis helper used for cache, queue and publish operations.</param>
public ProdService(IConfiguration configuration, IConnectionMultiplexer RedisConn, IRedisService redisService) : base(configuration, RedisConn)
{
    // Redis helper service
    _redisService = redisService;

    // publish channel; falls back to an empty string when the setting is missing
    var configuredChannel = _config.GetValue<string>("ServerConf:ChannelPub");
    chPub = configuredChannel ?? "";

    // queue keys: the wait/run queues are children of the calc queue key
    string calcQueueName = $"{redisBaseKey}:CalcQueue";
    queueCalcKey = (RedisKey)calcQueueName;
    queueWaitKey = (RedisKey)$"{calcQueueName}:Wait";
    queueRunKey = (RedisKey)$"{calcQueueName}:Run";

    // cache key prefixes for the order payloads
    redisOrderReqKey = redisBaseKey + ":OrderReq";
    redisOrderRunKey = redisBaseKey + ":OrderRun";
    redisOrderDoneKey = redisBaseKey + ":OrderDone";

    Log.Info("ProdService | Init OK");
}
|
|
|
|
#endregion Public Constructors
|
|
|
|
#region Public Enums
|
|
|
|
/// <summary>
/// Queue stages. The numeric order matters: code in this class compares members
/// with <c>&lt;</c> and moves to the following stage with <c>+ 1</c>.
/// </summary>
public enum QueueType
{
    /// <summary>First stage (value 0).</summary>
    waiting = 0,

    /// <summary>Second stage (value 1).</summary>
    running = 1,

    /// <summary>Last stage (value 2).</summary>
    done = 2
}
|
|
|
|
#endregion Public Enums
|
|
|
|
#region Public Methods
|
|
|
|
/// <summary>
/// Queues a calculation request: caches the request payload under its UID, pushes the UID
/// onto the waiting queue and publishes the current waiting list on the Redis channel.
/// </summary>
/// <param name="reqType">Request type; not used inside this method.</param>
/// <param name="reqUid">Request UID; every "/" is replaced by ":" when the cache key is built.</param>
/// <param name="currRequest">Request data; its EnvType and DictExec are used to build the cached payload.</param>
/// <returns>The result reported by the cache write of the request payload.</returns>
public async Task<bool> EnqueueRequest(string reqType, string reqUid, CalcRequestDTO currRequest)
{
    int nId = 1;

    // payload kept in cache so that it can be re-sent later
    QuestionDTO calcRequest = new QuestionDTO(nId, currRequest.EnvType, currRequest.DictExec);

    // cache the request content under the UID
    string currKey = $"{redisOrderReqKey}:{reqUid.Replace("/", ":")}";
    bool done = await _redisService.SetAsync(currKey, calcRequest.sProcessArgs);

    // enqueue the UID
    _redisService.QueuePush(queueWaitKey, (RedisValue)reqUid);

    // request dictionary: the serialized list of UIDs waiting to be calculated
    List<RedisValue> currList = await _redisService.QueueListAllAsync(queueWaitKey);

    // the length must come from the waiting list; counting the (still empty)
    // dictionary would always publish "0"
    int reqLen = currList?.Count ?? 0;
    Dictionary<string, string> calcDict = new Dictionary<string, string>
    {
        { "ReqLen", $"{reqLen}" },
        { "ReqList", JsonConvert.SerializeObject(currList) }
    };

    // notification message announcing the pending UIDs
    QuestionDTO chRequest = new QuestionDTO(nId, currRequest.EnvType, calcDict);

    // publish the processing request on the Redis channel
    await _redisService.PublishAsync(chPub, chRequest.sProcessArgs);

    return done;
}
|
|
|
|
/// <summary>
/// Returns the cached payload of one specific job.
/// </summary>
/// <param name="id">Job UID; every "/" is replaced by ":" when the cache key is built.</param>
/// <returns>The cached payload, or an empty string when nothing is stored for the UID.</returns>
public async Task<string> GetJob(string id)
{
    Stopwatch timer = Stopwatch.StartNew();

    // read the serialized request from cache
    string jobKey = $"{redisOrderReqKey}:{id.Replace("/", ":")}";
    string payload = await _redisService.GetAsync(jobKey);
    if (string.IsNullOrEmpty(payload))
    {
        // normalize a missing value to an empty string
        payload = "";
    }

    timer.Stop();
    Log.Info($"GetJob | {id} | {timer.Elapsed.TotalMilliseconds:N3} ms");
    return payload;
}
|
|
|
|
/// <summary>
/// Takes the next request UID from the given queue and returns the cached payload
/// of that request.
/// </summary>
/// <param name="qType">Queue to read from; defaults to the waiting queue.</param>
/// <returns>The cached payload, or an empty string when the queue yields no value or nothing is cached for the UID.</returns>
public async Task<string> GetNext(QueueType qType = QueueType.waiting)
{
    Stopwatch timer = Stopwatch.StartNew();
    string payload = "";
    RedisKey sourceQueue = qKey(qType);

    // pop the first job from the queue (this removes it)
    var popped = await _redisService.QueuePopAsync(sourceQueue);
    if (popped.HasValue)
    {
        string reqUid = $"{popped}";

        // move the UID to the following queue, but only for stages before "running"
        if (QueueType.running > qType)
        {
            // FIXME/TODO: store a timestamp marking the start of the calculation
            QueueType nextStage = qType + 1;
            _redisService.QueuePush(qKey(nextStage), (RedisValue)reqUid);
        }

        // read the serialized request from cache
        string requestKey = $"{redisOrderReqKey}:{reqUid.Replace("/", ":")}";
        string stored = await _redisService.GetAsync(requestKey);
        if (!string.IsNullOrEmpty(stored))
        {
            payload = stored;
        }
    }

    timer.Stop();
    Log.Info($"GetNext | {sourceQueue} | {timer.Elapsed.TotalMilliseconds:N3} ms");
    return payload;
}
|
|
|
|
/// <summary>
/// Counts the jobs currently held in the given queue.
/// </summary>
/// <param name="qType">Queue to count; defaults to the waiting queue.</param>
/// <returns>The count reported by the Redis helper for that queue.</returns>
public async Task<long> QueueLen(QueueType qType = QueueType.waiting)
{
    Stopwatch timer = Stopwatch.StartNew();
    RedisKey targetQueue = qKey(qType);

    long jobCount = await _redisService.QueueCountAsync(targetQueue);

    timer.Stop();
    Log.Info($"QueueLen | {targetQueue} | {timer.Elapsed.TotalMilliseconds:N3} ms");
    return jobCount;
}
|
|
|
|
/// <summary>
/// Returns the current content of the given queue.
/// </summary>
/// <param name="qType">Queue to list.</param>
/// <returns>The list returned by the Redis helper for that queue.</returns>
public async Task<List<RedisValue>> QueueListAllAsync(QueueType qType)
{
    RedisKey targetQueue = qKey(qType);
    return await _redisService.QueueListAllAsync(targetQueue);
}
|
|
|
|
/// <summary>
/// Resets the queue of the requested type (synchronous variant).
/// </summary>
/// <param name="qType">Queue to reset.</param>
/// <returns>The outcome reported by the Redis helper.</returns>
public bool QueueReset(QueueType qType)
{
    RedisKey targetQueue = qKey(qType);
    return _redisService.QueueReset(targetQueue);
}
|
|
|
|
/// <summary>
/// Resets the queue of the requested type (asynchronous variant).
/// </summary>
/// <param name="qType">Queue to reset.</param>
/// <returns>The outcome reported by the Redis helper.</returns>
public async Task<bool> QueueResetAsync(QueueType qType)
{
    RedisKey targetQueue = qKey(qType);
    return await _redisService.QueueResetAsync(targetQueue);
}
|
|
|
|
/// <summary>
/// Empties the running queue and pushes every job it held back onto the waiting queue.
/// </summary>
/// <returns>
/// <c>true</c> when the running queue was read and every job with a value was pushed
/// back successfully (an empty running queue also yields <c>true</c>); <c>false</c>
/// when the running list could not be obtained or at least one push reported no insertion.
/// </returns>
public async Task<bool> ResetRunning()
{
    RedisKey runningQueue = qKey(QueueType.running);
    var listRunning = await _redisService.QueuePopAllAsync(runningQueue);
    if (listRunning == null)
    {
        return false;
    }

    // loop-invariant: resolve the destination queue once
    string waitingQueue = qKey(QueueType.waiting);

    // starts as success and is cleared by the first failed push; before this fix
    // the flag was never assigned, so the method reported failure unconditionally
    bool fatto = true;
    foreach (var item in listRunning)
    {
        if (!item.HasValue)
        {
            continue;
        }

        // FIXME/TODO: store a timestamp to track when the job was re-queued
        var numAdd = _redisService.QueuePush(waitingQueue, item);
        if (!(numAdd > 0))
        {
            fatto = false;
        }
    }

    return fatto;
}
|
|
|
|
|
|
/// <summary>
/// Re-queues a single job: removes it from the running queue and pushes it onto
/// the waiting queue.
/// </summary>
/// <param name="codJob">Code of the job to re-run.</param>
/// <returns>
/// <c>true</c> when the job was listed in the running queue, was removed from it and
/// the push onto the waiting queue reported an insertion; otherwise <c>false</c>.
/// </returns>
public async Task<bool> ReRunJob(string codJob)
{
    RedisKey runningQueue = qKey(QueueType.running);

    // read the whole running list WITHOUT removing anything
    var runningJobs = await _redisService.QueueListAllAsync(runningQueue);
    if (runningJobs == null || !runningJobs.Contains(codJob))
    {
        return false;
    }

    var removedCount = await _redisService.QueueRemoveAsync(runningQueue, (RedisValue)codJob);
    if (!(removedCount > 0))
    {
        return false;
    }

    var addedCount = _redisService.QueuePush(qKey(QueueType.waiting), codJob);
    return addedCount > 0;
}
|
|
|
|
/// <summary>
/// Request to execute a job of type Balance.
/// NOTE(review): the whole body is compiled out with <c>#if false</c>, so at present
/// the method ignores both arguments and always returns <c>false</c>.
/// </summary>
/// <param name="codJob">Job code (currently unused).</param>
/// <param name="balData">Balance request data (currently unused).</param>
/// <returns>Always <c>false</c> while the implementation is disabled.</returns>
public async Task<bool> BalanceJob(string codJob, BalanceReqDto balData)
{
    bool balanced = false;
#if false
    // NOTE(review): disabled draft; it mirrors the ReRunJob logic and does not use balData
    RedisKey runningQueue = qKey(QueueType.running);
    // read the whole running list WITHOUT removing anything
    var runningJobs = await _redisService.QueueListAllAsync(runningQueue);
    if (runningJobs != null && runningJobs.Contains(codJob))
    {
        var removedCount = await _redisService.QueueRemoveAsync(runningQueue, (RedisValue)codJob);
        if (removedCount > 0)
        {
            var addedCount = _redisService.QueuePush(qKey(QueueType.waiting), codJob);
            balanced = addedCount > 0;
        }
    }
#endif
    return balanced;
}
|
|
|
|
#endregion Public Methods
|
|
|
|
#region Private Fields
|
|
|
|
/// <summary>
/// Class-wide NLog logger; readonly so the shared instance cannot be swapped at runtime.
/// </summary>
private static readonly Logger Log = LogManager.GetCurrentClassLogger();

/// <summary>
/// Redis helper used for cache, queue and publish operations.
/// </summary>
private readonly IRedisService _redisService;

/// <summary>
/// Redis channel on which processing requests are published (set in the constructor).
/// </summary>
private readonly string chPub = "";

/// <summary>
/// Base key of the calculation queue; also the key qKey returns for queue types
/// other than waiting and running.
/// </summary>
private readonly RedisKey queueCalcKey;

/// <summary>
/// RUN queue key.
/// </summary>
private readonly RedisKey queueRunKey;

/// <summary>
/// Waiting queue key.
/// </summary>
private readonly RedisKey queueWaitKey;

/// <summary>
/// Root prefix of every Redis key built by this class.
/// </summary>
private readonly string redisBaseKey = "Lux:Prod";

/// <summary>
/// Key prefix "OrderDone" (assigned in the constructor; not read by the code visible in this class).
/// </summary>
private readonly string redisOrderDoneKey = "";

/// <summary>
/// Key prefix under which request payloads are cached, one entry per request UID.
/// </summary>
private readonly string redisOrderReqKey = "";

/// <summary>
/// Key prefix "OrderRun" (assigned in the constructor; not read by the code visible in this class).
/// </summary>
private readonly string redisOrderRunKey = "";
|
|
|
|
#endregion Private Fields
|
|
|
|
#region Private Methods
|
|
|
|
/// <summary>
/// Maps a queue type to its Redis key: waiting and running have dedicated keys,
/// every other value falls back to the base calc-queue key.
/// </summary>
/// <param name="qType">Queue type to resolve.</param>
/// <returns>The Redis key for the queue, converted to a string.</returns>
private string qKey(QueueType qType)
{
    RedisKey selectedKey = qType switch
    {
        QueueType.waiting => queueWaitKey,
        QueueType.running => queueRunKey,
        _ => queueCalcKey,
    };
    return selectedKey;
}
|
|
|
|
#endregion Private Methods
|
|
}
|
|
} |