From ee043f81be1f57262d1ebcead41d43b8cf1a178f Mon Sep 17 00:00:00 2001 From: Samuele Locatelli Date: Tue, 12 Jul 2022 18:44:57 +0200 Subject: [PATCH] Update MON: - gestione tramite REDIS CHANNEL - refresh sincrono --- MP.Data/Constants.cs | 5 +- MP.Data/MessagePipe.cs | 135 ++++++++++++++++++++++++++++++++ MP.Mon/Data/MpDataService.cs | 76 ++++++++++++++++-- MP.Mon/MP.Mon.csproj | 2 +- MP.Mon/Pages/Index.razor.cs | 124 ++++++++++++++++++++++++----- MP.Mon/Resources/ChangeLog.html | 2 +- MP.Mon/Resources/VersNum.txt | 2 +- MP.Mon/Resources/manifest.xml | 2 +- 8 files changed, 318 insertions(+), 30 deletions(-) create mode 100644 MP.Data/MessagePipe.cs diff --git a/MP.Data/Constants.cs b/MP.Data/Constants.cs index da0fdf49..10d0028c 100644 --- a/MP.Data/Constants.cs +++ b/MP.Data/Constants.cs @@ -14,7 +14,8 @@ namespace MP.Data // REDIS KEY Dati correnti public static readonly string CONF_MON_KEY = $"{BASE_HASH}:Conf:MonDispData"; - public static readonly string ACT_FLUX_DATA_KEY = $"{BASE_HASH}:Current:FluxData"; - + public static readonly string ACT_MSE_DATA_KEY = $"{BASE_HASH}:Current:MSE"; + public static readonly string ACT_BLINK_KEY = $"{BASE_HASH}:Current:Blink"; + } } diff --git a/MP.Data/MessagePipe.cs b/MP.Data/MessagePipe.cs new file mode 100644 index 00000000..44988e21 --- /dev/null +++ b/MP.Data/MessagePipe.cs @@ -0,0 +1,135 @@ +using NLog; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MP.Data +{ + public class MessagePipe + { + #region Private Fields + + private bool enableLog = false; + private IConnectionMultiplexer redis; + private IDatabase? redisDb; + + #endregion Private Fields + + #region Protected Fields + + protected static Logger Log = LogManager.GetCurrentClassLogger(); + + #endregion Protected Fields + + #region Public Constructors + + public MessagePipe(IConnectionMultiplexer redisConn, string channelName, bool enableLog = false) + { + _channel = channelName; + redis = redisConn; + redisDb = redis.GetDatabase(); + this.enableLog = enableLog; + // aggiungo sottoscrittore + setupSubscriber(); + } + + #endregion Public Constructors + + #region Public Events + + public event EventHandler EA_NewMessage = delegate { }; + + #endregion Public Events + + #region Private Properties + + /// + /// Canale associato al gestore pipeline messaggi + /// + private string _channel { get; set; } = ""; + + #endregion Private Properties + + #region Private Methods + + private void setupSubscriber() + { + ISubscriber sub = redis.GetSubscriber(); + //Subscribe to the channel named messages + sub.Subscribe(_channel, (channel, message) => + { + Log.Trace($"ch {channel} | {message}"); + // messaggio + PubSubEventArgs mea = new PubSubEventArgs(message); + // se qualcuno ascolta sollevo evento nuovo valore... + if (EA_NewMessage != null) + { + EA_NewMessage(this, mea); + } + }); + Log.Info($"Subscribed {_channel}"); + } + + #endregion Private Methods + + #region Public Methods + + public bool saveAndSendMessage(string memKey, string message) + { + bool answ = false; + // invio notifica tramite il canale richiesto + answ = sendMessage(message); + if (redisDb != null) + { + redisDb.StringSetAsync(memKey, message); + if (enableLog) + { + Log.Info($"Redis Cache Key: {memKey}"); + } + } + return answ; + } + + /// + /// Invio messaggio sul canale + /// + /// + /// + public bool sendMessage(string newMess) + { + bool answ = false; + ISubscriber sub = redis.GetSubscriber(); + sub.Publish(_channel, newMess); + return answ; + } + + #endregion Public Methods + + /// + /// Invio messaggio sul canale + salvataggio in cache REDIS + /// + /// Chiave REDIS x salvare valore + /// + } + + public class PubSubEventArgs : EventArgs + { + #region Public Constructors + + public PubSubEventArgs(string messaggio) + { + this.newMessage = messaggio; + } + + #endregion Public Constructors + + #region Public Properties + + public string newMessage { get; set; } = ""; + + #endregion Public Properties + } +} diff --git a/MP.Mon/Data/MpDataService.cs b/MP.Mon/Data/MpDataService.cs index ab191f88..a72b6f2a 100644 --- a/MP.Mon/Data/MpDataService.cs +++ b/MP.Mon/Data/MpDataService.cs @@ -1,4 +1,5 @@ -using MP.Data.Conf; +using MP.Data; +using MP.Data.Conf; using MP.Data.DatabaseModels; using Newtonsoft.Json; using NLog; @@ -18,10 +19,11 @@ namespace MP.Mon.Data _configuration = configuration; // setup compoenti REDIS - this.redisConn = ConnectionMultiplexer.Connect(_configuration.GetConnectionString("Redis")); - this.redisDb = this.redisConn.GetDatabase(); - //// setup canali pub/sub - //actLogPipe = new MessagePipe(redisConn, Constants.ACT_LOG_M_QUEUE); + redisConn = ConnectionMultiplexer.Connect(_configuration.GetConnectionString("Redis")); + redisDb = redisConn.GetDatabase(); + // setup canali pub/sub + dataPipe = new MessagePipe(redisConn, Constants.ACT_MSE_DATA_KEY); + blinkPipe = new MessagePipe(redisConn, Constants.ACT_BLINK_KEY); // conf DB string connStr = _configuration.GetConnectionString("Mp.Mon"); @@ -40,6 +42,67 @@ namespace MP.Mon.Data // setup conf IOB da dizionario tryLoadIobTags(); + + // avvio timers... + startTimers(); + } + + private void startTimers() + { + fastTimer = new System.Timers.Timer(fastRefreshMs); + fastTimer.Elapsed += ElapsedFastTimer; + fastTimer.Enabled = true; + fastTimer.Start(); + } + + private void ElapsedFastTimer(object? source, System.Timers.ElapsedEventArgs e) + { + var pUpd = Task.Run(async () => + { + // secondi pari --> blink, secondi dispari --> ricarica + DateTime adesso = DateTime.Now; + int resto = 0; + Math.DivRem(adesso.Second, 2, out resto); + if (resto == 0) + { + // invio in channel blink il segnale + blinkPipe.sendMessage("true"); + Log.Trace("Elapsed Fast Timer Blink"); + } + else + { + // invio in channel blink segnale false + blinkPipe.sendMessage("false"); + // rileggo dati ed invio + await ReloadData(); + Log.Trace("Elapsed Fast Timer reload"); + } + }); + pUpd.Wait(); + } + + private async Task ReloadData() + { + // legge i dati e li invia tramite redis channels + var newData = await MseGetAll(); + // invio tramite la pipe... + dataPipe.sendMessage(JsonConvert.SerializeObject(newData)); + + } + + /// + /// Limite in formato data-ora per inviare dati rapidamente (incrementato come now + 1 min ad ogni chiamata client) + /// + private DateTime fastLimit = DateTime.Now; + private int fastRefreshMs = 1000; + private static System.Timers.Timer fastTimer = new System.Timers.Timer(4000); + + /// + /// Richiesta attivazione --> sposto avanti 1 minuto il periodo limite x fast running + /// + public void doActivate() + { + fastLimit = DateTime.Now.AddMinutes(1); } #endregion Public Constructors @@ -132,6 +195,9 @@ namespace MP.Mon.Data #endregion Public Methods + public MessagePipe dataPipe { get; set; } = null!; + public MessagePipe blinkPipe { get; set; } = null!; + #region Private Fields private static IConfiguration _configuration = null!; diff --git a/MP.Mon/MP.Mon.csproj b/MP.Mon/MP.Mon.csproj index 8a237fff..8bd12d2b 100644 --- a/MP.Mon/MP.Mon.csproj +++ b/MP.Mon/MP.Mon.csproj @@ -4,7 +4,7 @@ net6.0 enable enable - 6.15.2207.1217 + 6.15.2207.1218 diff --git a/MP.Mon/Pages/Index.razor.cs b/MP.Mon/Pages/Index.razor.cs index c4409d3a..92063d26 100644 --- a/MP.Mon/Pages/Index.razor.cs +++ b/MP.Mon/Pages/Index.razor.cs @@ -1,7 +1,9 @@ using Microsoft.AspNetCore.Components; +using MP.Data; using MP.Data.Conf; using MP.Data.DatabaseModels; using MP.Mon.Data; +using Newtonsoft.Json; using NLog; namespace MP.Mon.Pages @@ -15,12 +17,13 @@ namespace MP.Mon.Pages #if false fastTimer.Stop(); fastTimer.Dispose(); +#endif slowTimer.Stop(); slowTimer.Dispose(); -#endif disposeTimers(); } +#if false public void ElapsedFastTimer(object? source, System.Timers.ElapsedEventArgs e) { var pUpd = Task.Run(async () => @@ -33,6 +36,8 @@ namespace MP.Mon.Pages { doBlink = true; Log.Trace("Elapsed Fast Timer Blink"); + await Task.Delay(1); + await InvokeAsync(StateHasChanged); } else { @@ -40,11 +45,10 @@ namespace MP.Mon.Pages await ReloadData(); Log.Trace("Elapsed Fast Timer reload"); } - await Task.Delay(1); - await InvokeAsync(StateHasChanged); }); pUpd.Wait(); } +#endif public async void ElapsedSlowTimer(object? source, System.Timers.ElapsedEventArgs e) { @@ -59,11 +63,13 @@ namespace MP.Mon.Pages public void StartTimer() { +#if false // timer veloce fastTimer = new System.Timers.Timer(fastRefreshMs); fastTimer.Elapsed += ElapsedFastTimer; fastTimer.Enabled = true; fastTimer.Start(); +#endif // timer lento slowTimer = new System.Timers.Timer(slowRefreshMs); slowTimer.Elapsed += ElapsedSlowTimer; @@ -185,7 +191,9 @@ namespace MP.Mon.Pages protected override async Task OnInitializedAsync() { await setupConf(); - await ReloadData(); + //await ReloadData(); + MMDataService.dataPipe.EA_NewMessage += DataPipe_EA_NewMessage; + MMDataService.blinkPipe.EA_NewMessage += BlinkPipe_EA_NewMessage; StartTimer(); } @@ -193,41 +201,119 @@ namespace MP.Mon.Pages #region Private Fields - private static System.Timers.Timer fastTimer = new System.Timers.Timer(4000); + //private static System.Timers.Timer fastTimer = new System.Timers.Timer(4000); private static NLog.Logger Log = LogManager.GetCurrentClassLogger(); + private static System.Timers.Timer slowTimer = new System.Timers.Timer(300000); + private List? CurrConfig = null; + private bool doBlink = false; + private List? listMSE = null; #endregion Private Fields #region Private Methods + private void BlinkPipe_EA_NewMessage(object? sender, EventArgs e) + { + PubSubEventArgs currArgs = (PubSubEventArgs)e; + // conversione on-the-fly List --> allarmi + if (!string.IsNullOrEmpty(currArgs.newMessage)) + { + try + { + var dataRaw = JsonConvert.DeserializeObject(currArgs.newMessage); + if (dataRaw != null) + { + bool.TryParse($"{dataRaw}", out doBlink); + } + } + catch + { } + InvokeAsync(() => + { + StateHasChanged(); + }); + } + } + + /// + /// Ricevuto nuovi dati da mostrare! + /// + /// + /// + /// + private void DataPipe_EA_NewMessage(object? sender, EventArgs e) + { + //fastTimer.Stop(); + PubSubEventArgs currArgs = (PubSubEventArgs)e; + // conversione on-the-fly List --> allarmi + if (!string.IsNullOrEmpty(currArgs.newMessage)) + { + try + { + var dataList = JsonConvert.DeserializeObject>(currArgs.newMessage); + if (dataList != null) + { +#if DEBUG + // hack: legge 4 volte i dati x stressare sistema + var singleData = dataList; + listMSE = new List(); + for (int i = 0; i < 4; i++) + { + listMSE.AddRange(singleData); + } +#else + listMSE = dataList; +#endif + } + } + catch + { } + } + InvokeAsync(() => + { + // attesa random 0-50ms... + Random rnd = new Random(); + Task.Delay(rnd.Next(5, 50)); + StateHasChanged(); + }); + //fastTimer.Start(); + } + private void disposeTimers() { - fastTimer.Elapsed -= ElapsedFastTimer; - fastTimer.Stop(); - fastTimer.Dispose(); + //fastTimer.Elapsed -= ElapsedFastTimer; + //fastTimer.Stop(); + //fastTimer.Dispose(); slowTimer.Elapsed -= ElapsedSlowTimer; slowTimer.Stop(); slowTimer.Dispose(); } +#if false private async Task ReloadData() { -#if DEBUG - // hack: legge 4 volte i dati x stressare sistema - var singleData = await MMDataService.MseGetAll(); - listMSE = new List(); - for (int i = 0; i < 4; i++) - { - listMSE.AddRange(singleData); - } -#else - listMSE = await MMDataService.MseGetAll(); -#endif + // legge i dati e li invia tramite redis channels + var newData = await MMDataService.MseGetAll(); + // invio tramite la pipe... + MMDataService.dataPipe.sendMessage(JsonConvert.SerializeObject(newData)); + + //#if DEBUG + // // hack: legge 4 volte i dati x stressare sistema + // var singleData = await MMDataService.MseGetAll(); + // listMSE = new List(); + // for (int i = 0; i < 4; i++) + // { + // listMSE.AddRange(singleData); + // } + //#else + // listMSE = await MMDataService.MseGetAll(); + //#endif } +#endif private async Task setupConf() { diff --git a/MP.Mon/Resources/ChangeLog.html b/MP.Mon/Resources/ChangeLog.html index e6795ccf..f9de4f4a 100644 --- a/MP.Mon/Resources/ChangeLog.html +++ b/MP.Mon/Resources/ChangeLog.html @@ -1,6 +1,6 @@ Modulo MON MAPO -

Versione: 6.15.2207.1217

+

Versione: 6.15.2207.1218


Note di rilascio:
  • diff --git a/MP.Mon/Resources/VersNum.txt b/MP.Mon/Resources/VersNum.txt index 0a825dff..97d4a6fd 100644 --- a/MP.Mon/Resources/VersNum.txt +++ b/MP.Mon/Resources/VersNum.txt @@ -1 +1 @@ -6.15.2207.1217 +6.15.2207.1218 diff --git a/MP.Mon/Resources/manifest.xml b/MP.Mon/Resources/manifest.xml index aeae4046..f9ff8551 100644 --- a/MP.Mon/Resources/manifest.xml +++ b/MP.Mon/Resources/manifest.xml @@ -1,6 +1,6 @@ - 6.15.2207.1217 + 6.15.2207.1218 https://nexus.steamware.net/repository/SWS/MP-MON/stable/LAST/MP.Mon.zip https://nexus.steamware.net/repository/SWS/MP-MON/stable/LAST/ChangeLog.html false