using NLog;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Channels;

namespace MP.Data
{
    /// <summary>
    /// Wraps a single Redis pub/sub channel: publishes messages on it, optionally stores the
    /// last message under a Redis key, and raises <see cref="EA_NewMessage"/> for every message
    /// received on the channel.
    /// </summary>
    /// <remarks>
    /// The connection multiplexer is supplied by the caller and is NOT disposed by this class;
    /// <see cref="Dispose"/> only removes the subscription handler this instance registered.
    /// NOTE(review): the per-key send counters are kept in a plain dictionary with no locking,
    /// so <see cref="saveAndSendMessage"/> is not synchronized against concurrent callers.
    /// </remarks>
    public class MessagePipe : IDisposable
    {
        #region Public Constructors

        /// <summary>
        /// Creates the pipe and immediately subscribes to <paramref name="channelName"/>.
        /// </summary>
        /// <param name="redisConn">Open Redis connection used for both pub/sub and key storage.</param>
        /// <param name="channelName">Literal (non-pattern) channel name.</param>
        /// <param name="enableLog">When true, every send/receive is logged; otherwise sends are logged only periodically.</param>
        /// <exception cref="ArgumentNullException"><paramref name="redisConn"/> is null.</exception>
        public MessagePipe(IConnectionMultiplexer redisConn, string channelName, bool enableLog = false)
        {
            if (redisConn == null)
            {
                throw new ArgumentNullException(nameof(redisConn));
            }

            _channel = channelName;
            rChannel = new RedisChannel(_channel, RedisChannel.PatternMode.Literal);
            this.redisConn = redisConn;
            redisDb = this.redisConn.GetDatabase();
            redisSub = this.redisConn.GetSubscriber();
            this.enableLog = enableLog;

            // register the channel subscriber
            setupSubscriber();
        }

        #endregion Public Constructors

        #region Public Events

        /// <summary>
        /// Raised for every message received on the channel. The event argument is a
        /// <see cref="PubSubEventArgs"/> carrying the message text.
        /// </summary>
        public event EventHandler EA_NewMessage = delegate { };

        #endregion Public Events

        #region Public Methods

        /// <summary>
        /// Removes this instance's channel subscription and drops the Redis references.
        /// Safe to call more than once. The connection multiplexer itself is left open.
        /// </summary>
        public void Dispose()
        {
            // Take local copies and clear the fields first so a second Dispose() is a no-op
            // and later send calls see the "disposed" state.
            ISubscriber sub = redisSub;
            Action<RedisChannel, RedisValue> handler = messageHandler;
            redisSub = null;
            redisDb = null;
            messageHandler = null;

            if (sub == null || handler == null)
            {
                return;
            }

            try
            {
                // Unsubscribe only OUR handler: passing no handler would remove every handler
                // registered for this channel on the shared connection.
                sub.Unsubscribe(rChannel, handler, CommandFlags.FireAndForget);
            }
            catch (Exception ex)
            {
                // Dispose must not throw; a broken connection at teardown is only worth a warning.
                Log.Warn(ex, $"Unsubscribe failed for channel {_channel}");
            }
        }

        /// <summary>
        /// Publishes <paramref name="message"/> on the channel and stores it in Redis under
        /// <paramref name="memKey"/>.
        /// </summary>
        /// <param name="memKey">Redis key under which the message is stored.</param>
        /// <param name="message">Serialized message to publish and store.</param>
        /// <returns>The result of <see cref="sendMessage"/>: true when at least one client received the message.</returns>
        public bool saveAndSendMessage(string memKey, string message)
        {
            Stopwatch stopWatch = Stopwatch.StartNew();

            // notify through the channel first
            bool answ = sendMessage(message);

            IDatabase db = redisDb;
            if (db != null)
            {
                // The returned task is intentionally not awaited: the write completes in the
                // background and a failure is not reported to the caller.
                db.StringSetAsync(memKey, message);
            }

            stopWatch.Stop();

            // Single lookup: a missing key leaves the counter at 0.
            int sentCount;
            numSent.TryGetValue(memKey, out sentCount);
            sentCount++;

            // Log every call when logging is enabled, otherwise once per burst of sends per key.
            if (enableLog || sentCount > SendsBeforeLog)
            {
                Log.Info($"saveAndSendMessage| mKey {memKey} x {sentCount} | {message?.Length ?? 0} size | {stopWatch.Elapsed.TotalMilliseconds} ms");
                sentCount = 0;
            }

            numSent[memKey] = sentCount;
            return answ;
        }

        /// <summary>
        /// Publishes a message on the channel.
        /// </summary>
        /// <param name="newMess">Message to publish.</param>
        /// <returns>
        /// True when the publish call reports at least one receiving client; false when no client
        /// received it, the channel name is empty, or this instance has been disposed.
        /// </returns>
        public bool sendMessage(string newMess)
        {
            ISubscriber sub = redisSub;
            if (sub == null || string.IsNullOrEmpty(_channel))
            {
                return false;
            }

            long numCli = sub.Publish(rChannel, newMess);
            return numCli > 0;
        }

        #endregion Public Methods

        #region Protected Fields

        /// <summary>
        /// Class logger.
        /// </summary>
        protected static Logger Log = LogManager.GetCurrentClassLogger();

        #endregion Protected Fields

        #region Private Fields

        /// <summary>
        /// When logging is disabled, a key's sends are logged once its counter exceeds this value.
        /// </summary>
        private const int SendsBeforeLog = 30;

        /// <summary>
        /// Channel name associated with this message pipe.
        /// </summary>
        private readonly string _channel = "";

        private readonly bool enableLog = false;

        /// <summary>
        /// Number of sends per Redis key since the last log line for that key.
        /// </summary>
        private readonly Dictionary<string, int> numSent = new Dictionary<string, int>();

        /// <summary>
        /// Redis communication channel.
        /// </summary>
        private readonly RedisChannel rChannel;

        private readonly IConnectionMultiplexer redisConn;

        /// <summary>
        /// Handler registered on the channel; kept so Dispose can unsubscribe exactly this one.
        /// </summary>
        private Action<RedisChannel, RedisValue> messageHandler;

        private IDatabase redisDb;
        private ISubscriber redisSub;

        #endregion Private Fields

        #region Private Methods

        /// <summary>
        /// Subscribes to the channel; each received message is forwarded to
        /// <see cref="EA_NewMessage"/> wrapped in a <see cref="PubSubEventArgs"/>.
        /// </summary>
        private void setupSubscriber()
        {
            messageHandler = (channel, message) =>
            {
                if (enableLog)
                {
                    Log.Trace($"req setup ch {channel} | {message}");
                }

                // raise the new-message event if anyone is listening
                EA_NewMessage?.Invoke(this, new PubSubEventArgs(message));
            };

            redisSub.Subscribe(rChannel, messageHandler);

            if (enableLog)
            {
                Log.Info($"Subscribed {_channel}");
            }
        }

        #endregion Private Methods
    }

    /// <summary>
    /// Event data for a message received on a pub/sub channel.
    /// </summary>
    public class PubSubEventArgs : EventArgs
    {
        #region Public Constructors

        /// <summary>
        /// Creates the event data for a received message.
        /// </summary>
        /// <param name="messaggio">Text of the received message.</param>
        public PubSubEventArgs(string messaggio)
        {
            this.newMessage = messaggio;
        }

        #endregion Public Constructors

        #region Public Properties

        /// <summary>
        /// Text of the received message.
        /// </summary>
        public string newMessage { get; set; } = "";

        #endregion Public Properties
    }
}