Files
lux/EgwCoreLib.Lux.Data/Services/Internal/RedisSubscriptionManager.cs
T
2026-03-25 08:24:25 +01:00

107 lines
3.3 KiB
C#

namespace EgwCoreLib.Lux.Data.Services.Internal
{
public class RedisSubscriptionManager : IRedisSubscriptionManager, IDisposable
{
#region Public Constructors
public RedisSubscriptionManager(IConnectionMultiplexer connection)
{
_subscriber = connection.GetSubscriber();
}
#endregion Public Constructors
#region Public Methods
public void Dispose()
{
if (_disposed) return;
try
{
foreach (var channel in _subscriptions.Keys.ToList())
{
Unsubscribe(channel);
}
Log.Info($"RedisSubscriptionManager cleaned up all subscriptions");
}
catch (Exception ex)
{
Log.Error($"Error during RedisSubscriptionManager cleanup: {ex}");
}
_disposed = true;
}
public IEnumerable<string> GetActiveChannels() => _subscriptions.Keys;
public bool Subscribe(string channel, Action<RedisChannel, RedisValue> handler)
{
if (_disposed) throw new ObjectDisposedException(nameof(RedisSubscriptionManager));
lock (_subscriptions) // Prevent race condition
{
if (_subscriptions.ContainsKey(channel))
return false;
RedisChannel rChannel = new RedisChannel(channel, RedisChannel.PatternMode.Literal);
var subscription = new ChannelSubscription(channel, handler);
try
{
_subscriber.SubscribeAsync(rChannel, (channel, message) =>
{
handler(channel, message);
}).Wait(); // Wait for async operation
_subscriptions[channel] = subscription;
Log.Info($"✅ Subscribed to channel: {channel}");
}
catch (Exception ex)
{
Log.Error($"Failed to subscribe to channel {channel}: {ex}");
return false;
}
}
return true;
}
public bool TrySubscribe(string channel, Action<RedisChannel, RedisValue> handler)
{
try
{
return Subscribe(channel, handler);
}
catch
{
return false;
}
}
public bool Unsubscribe(string channel)
{
if (!_subscriptions.TryRemove(channel, out var sub))
return false;
RedisChannel rChannel = new RedisChannel(channel, RedisChannel.PatternMode.Literal);
_subscriber.Unsubscribe(rChannel, sub.Handler);
Log.Info($"❌ Unsubscribed from channel: {channel}");
return true;
}
#endregion Public Methods
#region Private Fields
private static Logger Log = LogManager.GetCurrentClassLogger();
private readonly ISubscriber _subscriber;
private readonly ConcurrentDictionary<string, ChannelSubscription> _subscriptions = new();
private bool _disposed = false;
#endregion Private Fields
}
public record ChannelSubscription(string Channel, Action<RedisChannel, RedisValue> Handler);
}