diff --git a/IOB-PI/readParallela.py b/IOB-PI/readParallela.py index 2acce22..44f0e12 100644 --- a/IOB-PI/readParallela.py +++ b/IOB-PI/readParallela.py @@ -121,8 +121,7 @@ i_filter_counters = array ( 'i',[0,0,0,0,0,0,0,0,0,0,0,0]) # Gestione coda (condivisa) x registrazione eventi ed invio URL #print ("Creazione coda illimitata") -Coda = queue.Queue(0) -#Coda = Queue.Queue(0) +#Coda = queue.Queue(0) CodaR = redis.Redis(host='localhost', port=6379, db=0, password='24068Seriate') queue_name = 'IOB' @@ -133,27 +132,28 @@ queue_name = 'IOB' # per sostituzione 1:1 con coda in ram # Function to add an item to the queue (enqueue) -def enqueue(item): +def rqEnqueue(item): CodaR.rpush(queue_name, item) - print(f"Enqueued: {item}") + #logQue.info(f"Enqueued: {item}") # Function to remove an item from the queue (dequeue) -def dequeue(): +def rqDequeue(): item = CodaR.lpop(queue_name) if item: - print(f"Dequeued: {item.decode('utf-8')}") + #logQue.info(f"Dequeued: {item.decode('utf-8')}") return item.decode('utf-8') else: - print("Queue is empty") + logQue.info("Queue is empty, nothing to retrieve!") # Function to count queue actual lenght in Redis -def rqlen(): - clen = CodaR.llen(queue_name) - if clen: - print(f"Queue len: {clen}") - return clen +def rqLen(): + cLen = CodaR.llen(queue_name) + if cLen: + #logQue.info(f"Queue len: {cLen}") + return cLen else: - print("Queue is empty") + #logQue.info("Queue is empty") + return 0 #--------------------------------------------------------------- # lettura parallela @@ -390,18 +390,19 @@ def accoda(): try: dtEve = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3] - Coda.put(dtEve + '#' + value + '#' + cont) + rqEnqueue(dtEve + '#' + value + '#' + cont) + #Coda.put(dtEve + '#' + value + '#' + cont) - except queue.Full: - #except Queue.Full: - logPro.error( "Queue full" + str(dtEve) + '#' + str(value) + '#' + str(cont) ) + #except queue.Full: + ##except Queue.Full: + # logPro.error( "Queue full" + str(dtEve) + '#' + str(value) + '#' + str(cont) ) except Exception as e: - logPro.error( "NETWORK:Errore http-no com rete-timeout" + url ) + logPro.error( "QUEUE:Errore coda \n\n" ) logPro.error(str(e)) #-------------------------------------------------------------- # svuotaCoda x invio dati al server -def svuota_coda(): +def svuotaCoda(): global onLine global sending @@ -415,7 +416,8 @@ def svuota_coda(): #print ("start timer ok ") try: - if not Coda.empty(): + if not rqLen() == 0: + #if not Coda.empty(): #print ("coda da svuotare!") response = urllib.request.urlopen(URLALIVE) answ = response.read().decode('utf-8') @@ -454,13 +456,15 @@ def svuota_coda(): i = NMAXSEND while i >= 0: - if not Coda.empty(): + if not rqLen() == 0: + #if not Coda.empty(): # formatto dataOra corrente dtCurr = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3] #prendo primo elemento dalla coda - resp = Coda.get() + resp = rqDequeue() + #resp = Coda.get() # recupero valori da elemento coda! dtEve = resp.split("#")[0] @@ -756,12 +760,10 @@ avviaParallela() #-------------------------------------------------------------- -# MARCO: qui inserire avvio thread di "svuotaCoda" +# Qui avvio thread periodico di "svuotaCoda" -# avviaSvuotaCoda #print ("Avvia svuota coda") - -do_every ( SENDURLTIME , svuota_coda ); +do_every ( SENDURLTIME , svuotaCoda ); #--------------------------------------------------------------- # ciclo forever and ever diff --git a/IOB-PI/readParallela_12.py b/IOB-PI/readParallela_12.py index d8ddded..44f0e12 100644 --- a/IOB-PI/readParallela_12.py +++ b/IOB-PI/readParallela_12.py @@ -1,7 +1,7 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -# readParallela v. 2.5.3 12 Ingressi +# readParallela v. 2.6.0 12 Ingressi # - single instance timer # - invio multiplo x send eventi accodati # - gestione segnali BLINKING @@ -14,6 +14,7 @@ # - (2.5.1) Fix numero versione 18.05.2023 # - (2.5.2) Fix gestione eccezioni con report dettagliato # - (2.5.3) Fix gestione stringhe e print x python 3.11 in debian 12 / raspberry OS 2025 +# - (2.6.0) Aggiunto gestione Redis x code salvate ogni minuto e ricaricate all'avvio 2025.04.17 #--------------------------------------------------------------- # levare locking @@ -38,6 +39,7 @@ import logging.handlers import threading import queue #import Queue +import redis from array import * @@ -51,7 +53,7 @@ MAXRETRY = 10 # numero campioni filtraggio segnale ballerino MAX_COUNTER_BLINK = 10 -PROGRAM_NAME ="ReadPar IOB-pi v.2.5.3" +PROGRAM_NAME ="ReadPar IOB-pi v.2.6.0" # DA FILE CONF idxMacchina = "1001" @@ -119,11 +121,40 @@ i_filter_counters = array ( 'i',[0,0,0,0,0,0,0,0,0,0,0,0]) # Gestione coda (condivisa) x registrazione eventi ed invio URL #print ("Creazione coda illimitata") -Coda = queue.Queue(0) -#Coda = Queue.Queue(0) +#Coda = queue.Queue(0) +CodaR = redis.Redis(host='localhost', port=6379, db=0, password='24068Seriate') +queue_name = 'IOB' #queueLock = threading.Lock() +#----------------------------------- +# Gestione code REDIS +# per sostituzione 1:1 con coda in ram + +# Function to add an item to the queue (enqueue) +def rqEnqueue(item): + CodaR.rpush(queue_name, item) + #logQue.info(f"Enqueued: {item}") + +# Function to remove an item from the queue (dequeue) +def rqDequeue(): + item = CodaR.lpop(queue_name) + if item: + #logQue.info(f"Dequeued: {item.decode('utf-8')}") + return item.decode('utf-8') + else: + logQue.info("Queue is empty, nothing to retrieve!") + +# Function to count queue actual lenght in Redis +def rqLen(): + cLen = CodaR.llen(queue_name) + if cLen: + #logQue.info(f"Queue len: {cLen}") + return cLen + else: + #logQue.info("Queue is empty") + return 0 + #--------------------------------------------------------------- # lettura parallela # ritorna il byte letto pulito ( due char hex ) @@ -359,18 +390,19 @@ def accoda(): try: dtEve = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3] - Coda.put(dtEve + '#' + value + '#' + cont) + rqEnqueue(dtEve + '#' + value + '#' + cont) + #Coda.put(dtEve + '#' + value + '#' + cont) - #except Queue.Full: - except queue.Full: - logPro.error( "Queue full" + str(dtEve) + '#' + str(value) + '#' + str(cont) ) + #except queue.Full: + ##except Queue.Full: + # logPro.error( "Queue full" + str(dtEve) + '#' + str(value) + '#' + str(cont) ) except Exception as e: - logPro.error( "NETWORK:Errore http-no com rete-timeout" + url ) + logPro.error( "QUEUE:Errore coda \n\n" ) logPro.error(str(e)) #-------------------------------------------------------------- # svuotaCoda x invio dati al server -def svuota_coda(): +def svuotaCoda(): global onLine global sending @@ -384,7 +416,8 @@ def svuota_coda(): #print ("start timer ok ") try: - if not Coda.empty(): + if not rqLen() == 0: + #if not Coda.empty(): #print ("coda da svuotare!") response = urllib.request.urlopen(URLALIVE) answ = response.read().decode('utf-8') @@ -423,13 +456,15 @@ def svuota_coda(): i = NMAXSEND while i >= 0: - if not Coda.empty(): + if not rqLen() == 0: + #if not Coda.empty(): # formatto dataOra corrente dtCurr = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3] #prendo primo elemento dalla coda - resp = Coda.get() + resp = rqDequeue() + #resp = Coda.get() # recupero valori da elemento coda! dtEve = resp.split("#")[0] @@ -566,8 +601,8 @@ def avviaParallela(): # MAIN try: - #config = ConfigParser.RawConfigParser() config = configparser.RawConfigParser() + #config = ConfigParser.RawConfigParser() config.read ( 'IOB.cfg' ) SAMPLETIME = config.getfloat ( 'time' , 'SAMPLETIME' ) @@ -725,12 +760,10 @@ avviaParallela() #-------------------------------------------------------------- -# MARCO: qui inserire avvio thread di "svuotaCoda" +# Qui avvio thread periodico di "svuotaCoda" -# avviaSvuotaCoda #print ("Avvia svuota coda") - -do_every ( SENDURLTIME , svuota_coda ); +do_every ( SENDURLTIME , svuotaCoda ); #--------------------------------------------------------------- # ciclo forever and ever diff --git a/IOB-PI/readParallela_8.py b/IOB-PI/readParallela_8.py index 5665a20..6b6aeb1 100644 --- a/IOB-PI/readParallela_8.py +++ b/IOB-PI/readParallela_8.py @@ -1,7 +1,7 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -# readParallela v. 2.5.3 8 Ingressi +# readParallela v. 2.6.0 8 Ingressi # - single instance timer # - invio multiplo x send eventi accodati # - gestione segnali BLINKING @@ -14,6 +14,7 @@ # - (2.5.1) Fix numero versione 18.05.2023 # - (2.5.2) Fix gestione eccezioni con report dettagliato # - (2.5.3) Fix gestione stringhe e print x python 3.11 in debian 12 / raspberry OS 2025 +# - (2.6.0) Aggiunto gestione Redis x code salvate ogni minuto e ricaricate all'avvio 2025.04.17 #--------------------------------------------------------------- # levare locking @@ -38,6 +39,7 @@ import logging.handlers import threading import queue #import Queue +import redis from array import * @@ -51,7 +53,7 @@ MAXRETRY = 10 # numero campioni filtraggio segnale ballerino MAX_COUNTER_BLINK = 10 -PROGRAM_NAME ="ReadPar IOB-pi v.2.5.3" +PROGRAM_NAME ="ReadPar IOB-pi v.2.6.0" # DA FILE CONF idxMacchina = "1001" @@ -115,11 +117,40 @@ i_filter_counters = array ( 'i',[0,0,0,0,0,0,0,0]) # Gestione coda (condivisa) x registrazione eventi ed invio URL #print ("Creazione coda illimitata") -Coda = queue.Queue(0) -#Coda = Queue.Queue(0) +#Coda = queue.Queue(0) +CodaR = redis.Redis(host='localhost', port=6379, db=0, password='24068Seriate') +queue_name = 'IOB' #queueLock = threading.Lock() +#----------------------------------- +# Gestione code REDIS +# per sostituzione 1:1 con coda in ram + +# Function to add an item to the queue (enqueue) +def rqEnqueue(item): + CodaR.rpush(queue_name, item) + #logQue.info(f"Enqueued: {item}") + +# Function to remove an item from the queue (dequeue) +def rqDequeue(): + item = CodaR.lpop(queue_name) + if item: + #logQue.info(f"Dequeued: {item.decode('utf-8')}") + return item.decode('utf-8') + else: + logQue.info("Queue is empty, nothing to retrieve!") + +# Function to count queue actual lenght in Redis +def rqLen(): + cLen = CodaR.llen(queue_name) + if cLen: + #logQue.info(f"Queue len: {cLen}") + return cLen + else: + #logQue.info("Queue is empty") + return 0 + #--------------------------------------------------------------- # lettura parallela # ritorna il byte letto pulito ( due char hex ) @@ -316,18 +347,19 @@ def accoda(): try: dtEve = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3] - Coda.put(dtEve + '#' + value + '#' + cont) + rqEnqueue(dtEve + '#' + value + '#' + cont) + #Coda.put(dtEve + '#' + value + '#' + cont) - #except Queue.Full: - except queue.Full: - logPro.error( "Queue full" + str(dtEve) + '#' + str(value) + '#' + str(cont) ) + #except queue.Full: + ##except Queue.Full: + # logPro.error( "Queue full" + str(dtEve) + '#' + str(value) + '#' + str(cont) ) except Exception as e: - logPro.error( "NETWORK:Errore http-no com rete-timeout" + url ) + logPro.error( "QUEUE:Errore coda \n\n" ) logPro.error(str(e)) #-------------------------------------------------------------- # svuotaCoda x invio dati al server -def svuota_coda(): +def svuotaCoda(): global onLine global sending @@ -341,7 +373,8 @@ def svuota_coda(): #print ("start timer ok ") try: - if not Coda.empty(): + if not rqLen() == 0: + #if not Coda.empty(): #print ("coda da svuotare!") response = urllib.request.urlopen(URLALIVE) answ = response.read().decode('utf-8') @@ -380,13 +413,15 @@ def svuota_coda(): i = NMAXSEND while i >= 0: - if not Coda.empty(): + if not rqLen() == 0: + #if not Coda.empty(): # formatto dataOra corrente dtCurr = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3] #prendo primo elemento dalla coda - resp = Coda.get() + resp = rqDequeue() + #resp = Coda.get() # recupero valori da elemento coda! dtEve = resp.split("#")[0] @@ -515,8 +550,8 @@ def avviaParallela(): # MAIN try: - #config = ConfigParser.RawConfigParser() config = configparser.RawConfigParser() + #config = ConfigParser.RawConfigParser() config.read ( 'IOB.cfg' ) SAMPLETIME = config.getfloat ( 'time' , 'SAMPLETIME' ) @@ -662,12 +697,10 @@ avviaParallela() #-------------------------------------------------------------- -# MARCO: qui inserire avvio thread di "svuotaCoda" +# Qui avvio thread periodico di "svuotaCoda" -# avviaSvuotaCoda #print ("Avvia svuota coda") - -do_every ( SENDURLTIME , svuota_coda ); +do_every ( SENDURLTIME , svuotaCoda ); #--------------------------------------------------------------- # ciclo forever and ever