Files
2024-07-01 15:21:53 +02:00

153 lines
4.2 KiB
C#

using NLog;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MagMan.Core
{
public class MessagePipe
{
#region Public Constructors
public MessagePipe(IConnectionMultiplexer redisConn, string channelName, bool enableLog = false)
{
_channel = new RedisChannel(channelName, RedisChannel.PatternMode.Literal); ;
redis = redisConn;
redisDb = redis.GetDatabase();
this.enableLog = enableLog;
// aggiungo sottoscrittore
setupSubscriber();
}
#endregion Public Constructors
#region Public Events
public event EventHandler EA_NewMessage = delegate { };
#endregion Public Events
#region Public Methods
/// <summary>
/// Invio messaggio sul canale + salvataggio in cache REDIS
/// </summary>
/// <param name="memKey">Chiave REDIS x salvare valore</param>
/// <param name="message"></param>
public bool saveAndSendMessage(string memKey, string message)
{
bool answ = false;
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
// invio notifica tramite il canale richiesto
answ = sendMessage(message);
if (redisDb != null)
{
redisDb.StringSetAsync(memKey, message);
}
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
if (numSent.ContainsKey(memKey))
{
numSent[memKey]++;
}
else
{
numSent.Add(memKey, 1);
}
if (enableLog || numSent[memKey] > 30)
{
Log.Info($"saveAndSendMessage| mKey {memKey} x {numSent[memKey]} | {message.Length} size | {ts.TotalMilliseconds} ms");
numSent[memKey] = 0;
}
return answ;
}
/// <summary>
/// Invio messaggio sul canale
/// </summary>
/// <param name="newMess"></param>
/// <returns></returns>
public bool sendMessage(string newMess)
{
bool answ = false;
ISubscriber sub = redis.GetSubscriber();
sub.Publish(_channel, newMess);
return answ;
}
#endregion Public Methods
#region Protected Fields
protected static Logger Log = LogManager.GetCurrentClassLogger();
#endregion Protected Fields
#region Private Fields
private bool enableLog = false;
private Dictionary<string, int> numSent = new Dictionary<string, int>();
private IConnectionMultiplexer redis;
private IDatabase? redisDb;
#endregion Private Fields
#region Private Properties
/// <summary>
/// Canale associato al gestore pipeline messaggi
/// </summary>
private RedisChannel _channel { get; set; } = new RedisChannel("Default", RedisChannel.PatternMode.Literal);
#endregion Private Properties
#region Private Methods
private void setupSubscriber()
{
ISubscriber sub = redis.GetSubscriber();
//Subscribe to the channel named messages
sub.Subscribe(_channel, (channel, message) =>
{
Log.Trace($"ch {channel} | {message}");
// messaggio
PubSubEventArgs mea = new PubSubEventArgs($"{message}");
// se qualcuno ascolta sollevo evento nuovo valore...
if (EA_NewMessage != null)
{
EA_NewMessage(this, mea);
}
});
Log.Info($"Subscribed {_channel}");
}
#endregion Private Methods
}
public class PubSubEventArgs : EventArgs
{
#region Public Constructors
public PubSubEventArgs(string messaggio)
{
this.newMessage = messaggio;
}
#endregion Public Constructors
#region Public Properties
public string newMessage { get; set; } = "";
#endregion Public Properties
}
}