84 lines
2.9 KiB
C#
84 lines
2.9 KiB
C#
using EgwCoreLib.Lux.Data.Services.Internal;
|
|
|
|
namespace Lux.API.Services
|
|
{
|
|
public class RedisSubscriberService : BackgroundService
|
|
{
|
|
#region Public Constructors
|
|
|
|
public RedisSubscriberService(
|
|
IConfiguration config,
|
|
IRedisSubscriptionManager subManager,
|
|
IServiceScopeFactory scopeFactory)
|
|
{
|
|
_config = config;
|
|
_subManager = subManager;
|
|
_scopeFactory = scopeFactory;
|
|
|
|
_redisBaseKey = _config.GetValue<string>("ServerConf:RedisBaseKey") ?? "Lux";
|
|
_channel = _config.GetValue<string>("ServerConf:ChannelSub") ?? "";
|
|
}
|
|
|
|
#endregion Public Constructors
|
|
|
|
#region Protected Fields
|
|
|
|
/// <summary>
|
|
/// Path base chiavi REDIS
|
|
/// </summary>
|
|
protected readonly string _redisBaseKey = "Lux:Cache";
|
|
|
|
#endregion Protected Fields
|
|
|
|
#region Protected Methods
|
|
|
|
protected override Task ExecuteAsync(CancellationToken stoppingToken)
|
|
{
|
|
#if false
|
|
_subManager.Subscribe(_channel, async (ch, msg) =>
|
|
{
|
|
// Ogni messaggio → nuovo scope
|
|
using var scope = _scopeFactory.CreateScope();
|
|
|
|
var processor = scope.ServiceProvider.GetRequiredService<ExternalMessageProcessor>();
|
|
|
|
await processor.HandleResultMessageAsync($"{ch}", $"{msg}");
|
|
});
|
|
#endif
|
|
// gestione messaggio che evita doppia esecuzione alla ricezione
|
|
_subManager.Subscribe(_channel, async (ch, msg) =>
|
|
{
|
|
using var scope = _scopeFactory.CreateScope();
|
|
var db = scope.ServiceProvider.GetRequiredService<IConnectionMultiplexer>().GetDatabase();
|
|
|
|
// 1. Genera una lock key basata sull'ID del messaggio (o sul contenuto se univoco)
|
|
string lockKey = $"LOCK:msg:{msg.GetHashCode()}";
|
|
|
|
// 2. Prova a impostare la chiave con un tempo di scadenza (TTL) di 5 secondi
|
|
// NX = Set if Not Exists
|
|
bool isLeader = await db.StringSetAsync(lockKey, "locked", TimeSpan.FromSeconds(5), When.NotExists);
|
|
|
|
if (isLeader)
|
|
{
|
|
// Solo l'istanza che arriva per prima entra qui
|
|
var processor = scope.ServiceProvider.GetRequiredService<ExternalMessageProcessor>();
|
|
await processor.HandleResultMessageAsync($"{ch}", $"{msg}");
|
|
}
|
|
// Se isLeader è false, l'altra istanza sta già elaborando o ha finito.
|
|
});
|
|
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
#endregion Protected Methods
|
|
|
|
#region Private Fields
|
|
|
|
private readonly string _channel;
|
|
private readonly IConfiguration _config;
|
|
private readonly IServiceScopeFactory _scopeFactory;
|
|
private readonly IRedisSubscriptionManager _subManager;
|
|
|
|
#endregion Private Fields
|
|
}
|
|
} |