153 lines
4.2 KiB
C#
153 lines
4.2 KiB
C#
using NLog;
|
|
using StackExchange.Redis;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace MagMan.Core
|
|
{
|
|
public class MessagePipe
|
|
{
|
|
#region Public Constructors
|
|
|
|
public MessagePipe(IConnectionMultiplexer redisConn, string channelName, bool enableLog = false)
|
|
{
|
|
_channel = new RedisChannel(channelName, RedisChannel.PatternMode.Literal); ;
|
|
redis = redisConn;
|
|
redisDb = redis.GetDatabase();
|
|
this.enableLog = enableLog;
|
|
// aggiungo sottoscrittore
|
|
setupSubscriber();
|
|
}
|
|
|
|
#endregion Public Constructors
|
|
|
|
#region Public Events
|
|
|
|
public event EventHandler EA_NewMessage = delegate { };
|
|
|
|
#endregion Public Events
|
|
|
|
#region Public Methods
|
|
|
|
/// <summary>
|
|
/// Invio messaggio sul canale + salvataggio in cache REDIS
|
|
/// </summary>
|
|
/// <param name="memKey">Chiave REDIS x salvare valore</param>
|
|
/// <param name="message"></param>
|
|
public bool saveAndSendMessage(string memKey, string message)
|
|
{
|
|
bool answ = false;
|
|
Stopwatch stopWatch = new Stopwatch();
|
|
stopWatch.Start();
|
|
|
|
// invio notifica tramite il canale richiesto
|
|
answ = sendMessage(message);
|
|
if (redisDb != null)
|
|
{
|
|
redisDb.StringSetAsync(memKey, message);
|
|
}
|
|
stopWatch.Stop();
|
|
TimeSpan ts = stopWatch.Elapsed;
|
|
if (numSent.ContainsKey(memKey))
|
|
{
|
|
numSent[memKey]++;
|
|
}
|
|
else
|
|
{
|
|
numSent.Add(memKey, 1);
|
|
}
|
|
if (enableLog || numSent[memKey] > 30)
|
|
{
|
|
Log.Info($"saveAndSendMessage| mKey {memKey} x {numSent[memKey]} | {message.Length} size | {ts.TotalMilliseconds} ms");
|
|
|
|
numSent[memKey] = 0;
|
|
}
|
|
return answ;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Invio messaggio sul canale
|
|
/// </summary>
|
|
/// <param name="newMess"></param>
|
|
/// <returns></returns>
|
|
public bool sendMessage(string newMess)
|
|
{
|
|
bool answ = false;
|
|
ISubscriber sub = redis.GetSubscriber();
|
|
sub.Publish(_channel, newMess);
|
|
return answ;
|
|
}
|
|
|
|
#endregion Public Methods
|
|
|
|
#region Protected Fields
|
|
|
|
protected static Logger Log = LogManager.GetCurrentClassLogger();
|
|
|
|
#endregion Protected Fields
|
|
|
|
#region Private Fields
|
|
|
|
private bool enableLog = false;
|
|
private Dictionary<string, int> numSent = new Dictionary<string, int>();
|
|
private IConnectionMultiplexer redis;
|
|
private IDatabase? redisDb;
|
|
|
|
#endregion Private Fields
|
|
|
|
#region Private Properties
|
|
|
|
/// <summary>
|
|
/// Canale associato al gestore pipeline messaggi
|
|
/// </summary>
|
|
private RedisChannel _channel { get; set; } = new RedisChannel("Default", RedisChannel.PatternMode.Literal);
|
|
|
|
#endregion Private Properties
|
|
|
|
#region Private Methods
|
|
|
|
private void setupSubscriber()
|
|
{
|
|
ISubscriber sub = redis.GetSubscriber();
|
|
//Subscribe to the channel named messages
|
|
sub.Subscribe(_channel, (channel, message) =>
|
|
{
|
|
Log.Trace($"ch {channel} | {message}");
|
|
// messaggio
|
|
PubSubEventArgs mea = new PubSubEventArgs($"{message}");
|
|
// se qualcuno ascolta sollevo evento nuovo valore...
|
|
if (EA_NewMessage != null)
|
|
{
|
|
EA_NewMessage(this, mea);
|
|
}
|
|
});
|
|
Log.Info($"Subscribed {_channel}");
|
|
}
|
|
|
|
#endregion Private Methods
|
|
|
|
}
|
|
|
|
public class PubSubEventArgs : EventArgs
|
|
{
|
|
#region Public Constructors
|
|
|
|
public PubSubEventArgs(string messaggio)
|
|
{
|
|
this.newMessage = messaggio;
|
|
}
|
|
|
|
#endregion Public Constructors
|
|
|
|
#region Public Properties
|
|
|
|
public string newMessage { get; set; } = "";
|
|
|
|
#endregion Public Properties
|
|
}
|
|
}
|