176 lines
4.8 KiB
C#
176 lines
4.8 KiB
C#
using NLog;
|
|
using StackExchange.Redis;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.Threading.Channels;
|
|
|
|
namespace MP.Data
|
|
{
|
|
/// <summary>
/// Wrapper around a single literal Redis pub/sub channel: publishes messages,
/// optionally stores the last payload under a Redis key, and raises
/// <see cref="EA_NewMessage"/> for every message received on the channel.
/// </summary>
public class MessagePipe : IDisposable
{
    #region Public Constructors

    /// <summary>
    /// Creates the pipe and immediately subscribes to <paramref name="channelName"/>.
    /// </summary>
    /// <param name="redisConn">Multiplexer used to obtain the database and the subscriber. It is not disposed by this class.</param>
    /// <param name="channelName">Channel name, used in literal (non-pattern) mode for both publish and subscribe.</param>
    /// <param name="enableLog">When true, every save/send is logged at Info level and every received message is traced.</param>
    /// <exception cref="ArgumentNullException"><paramref name="redisConn"/> is null.</exception>
    public MessagePipe(IConnectionMultiplexer redisConn, string channelName, bool enableLog = false)
    {
        if (redisConn == null)
        {
            throw new ArgumentNullException(nameof(redisConn));
        }

        _channel = channelName;
        rChannel = new RedisChannel(_channel, RedisChannel.PatternMode.Literal);
        this.redisConn = redisConn;
        redisDb = this.redisConn.GetDatabase();
        redisSub = this.redisConn.GetSubscriber();
        this.enableLog = enableLog;

        // register the channel subscriber
        setupSubscriber();
    }

    #endregion Public Constructors

    #region Public Events

    /// <summary>
    /// Raised for each message received on the subscribed channel. The event
    /// argument is a <see cref="PubSubEventArgs"/> carrying the payload.
    /// </summary>
    public event EventHandler EA_NewMessage = delegate { };

    #endregion Public Events

    #region Public Methods

    /// <summary>
    /// Removes this instance's handler from the Redis channel and drops the
    /// database/subscriber references. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        // Work on local copies: a second call finds the fields already null and returns.
        ISubscriber sub = redisSub;
        Action<RedisChannel, RedisValue> handler = subscriptionHandler;

        redisDb = null;
        redisSub = null;
        subscriptionHandler = null;

        if (sub == null || handler == null)
        {
            return;
        }

        try
        {
            // Pass the specific delegate so other subscribers registered on the same
            // channel through the shared multiplexer are left untouched.
            sub.Unsubscribe(rChannel, handler, CommandFlags.FireAndForget);
        }
        catch (Exception ex)
        {
            // Dispose must not throw; a failed unsubscribe is only reported.
            Log.Warn(ex, $"Unsubscribe failed for channel {_channel}");
        }
    }

    /// <summary>
    /// Publishes the message on the channel and stores it in the Redis cache
    /// under <paramref name="memKey"/>.
    /// </summary>
    /// <remarks>
    /// The cache write is started but not awaited; a failure is logged as a warning.
    /// NOTE(review): the per-key counter is a plain Dictionary with no locking;
    /// confirm callers do not invoke this concurrently on the same instance.
    /// </remarks>
    /// <param name="memKey">Redis key under which the value is stored.</param>
    /// <param name="message">Serialized message to send.</param>
    /// <returns>The result of <see cref="sendMessage"/> for <paramref name="message"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="memKey"/> is null.</exception>
    public bool saveAndSendMessage(string memKey, string message)
    {
        // Fail before publishing: a null key cannot be used for the cache or the counter.
        if (memKey == null)
        {
            throw new ArgumentNullException(nameof(memKey));
        }

        Stopwatch stopWatch = Stopwatch.StartNew();

        // notify through the configured channel
        bool answ = sendMessage(message);

        // Local copy so a concurrent Dispose cannot null the field between check and use.
        IDatabase db = redisDb;
        if (db != null)
        {
            // Not awaited on purpose; the continuation only observes and reports a fault.
            db.StringSetAsync(memKey, message).ContinueWith(
                t => Log.Warn(t.Exception, $"saveAndSendMessage| mKey {memKey} cache write failed"),
                System.Threading.Tasks.TaskContinuationOptions.OnlyOnFaulted);
        }

        stopWatch.Stop();
        TimeSpan ts = stopWatch.Elapsed;

        // TryGetValue leaves the counter at 0 for a key seen for the first time.
        int sentCount;
        numSent.TryGetValue(memKey, out sentCount);
        sentCount++;

        if (enableLog || sentCount > LogThreshold)
        {
            int size = message == null ? 0 : message.Length;
            Log.Info($"saveAndSendMessage| mKey {memKey} x {sentCount} | {size} size | {ts.TotalMilliseconds} ms");

            // restart the count after each log line
            sentCount = 0;
        }

        numSent[memKey] = sentCount;
        return answ;
    }

    /// <summary>
    /// Publishes a message on the channel.
    /// </summary>
    /// <param name="newMess">Message payload to publish.</param>
    /// <returns>
    /// true when the publish call reports a value greater than zero; false when
    /// it reports zero, the channel name is null/empty, or the pipe was disposed.
    /// </returns>
    public bool sendMessage(string newMess)
    {
        // Local copy: Dispose sets the field to null.
        ISubscriber sub = redisSub;
        if (sub == null || string.IsNullOrEmpty(_channel))
        {
            return false;
        }

        long numCli = sub.Publish(rChannel, newMess);
        return numCli > 0;
    }

    #endregion Public Methods

    #region Protected Fields

    protected static Logger Log = LogManager.GetCurrentClassLogger();

    #endregion Protected Fields

    #region Private Fields

    /// <summary>
    /// A save/send for a key is logged once its counter exceeds this value,
    /// even when logging is not enabled.
    /// </summary>
    private const int LogThreshold = 30;

    /// <summary>
    /// Name of the channel associated with this message pipe.
    /// </summary>
    private string _channel = "";

    private bool enableLog = false;

    /// <summary>
    /// Per-key count of save/send calls since the last log line for that key.
    /// </summary>
    private Dictionary<string, int> numSent = new Dictionary<string, int>();

    /// <summary>
    /// Redis communication channel.
    /// </summary>
    private RedisChannel rChannel;

    private IConnectionMultiplexer redisConn;

    private IDatabase redisDb;

    private ISubscriber redisSub;

    /// <summary>
    /// Handler registered on the channel; kept so Dispose can remove exactly this one.
    /// </summary>
    private Action<RedisChannel, RedisValue> subscriptionHandler;

    #endregion Private Fields

    #region Private Methods

    /// <summary>
    /// Subscribes to the channel and forwards every received message to
    /// <see cref="EA_NewMessage"/> wrapped in a <see cref="PubSubEventArgs"/>.
    /// </summary>
    private void setupSubscriber()
    {
        subscriptionHandler = (channel, message) =>
        {
            if (enableLog)
            {
                Log.Trace($"req setup ch {channel} | {message}");
            }

            // raise the new-message event for whoever is listening
            EA_NewMessage?.Invoke(this, new PubSubEventArgs(message));
        };

        redisSub.Subscribe(rChannel, subscriptionHandler);

        if (enableLog)
        {
            Log.Info($"Subscribed {_channel}");
        }
    }

    #endregion Private Methods
}
|
|
|
|
/// <summary>
/// Event payload carrying the text of a pub/sub message.
/// </summary>
public class PubSubEventArgs : EventArgs
{
    /// <summary>
    /// Text of the message. Set by the constructor and writable afterwards.
    /// </summary>
    public string newMessage { get; set; } = "";

    /// <summary>
    /// Builds the event arguments around the given message text.
    /// </summary>
    /// <param name="messaggio">Message text, stored as-is in <see cref="newMessage"/>.</param>
    public PubSubEventArgs(string messaggio)
    {
        newMessage = messaggio;
    }
}
|
|
} |