Update readParallela con redis + copia in 12/8 bit
This commit is contained in:
+28
-26
@@ -121,8 +121,7 @@ i_filter_counters = array ( 'i',[0,0,0,0,0,0,0,0,0,0,0,0])
|
||||
# Gestione coda (condivisa) x registrazione eventi ed invio URL
|
||||
#print ("Creazione coda illimitata")
|
||||
|
||||
Coda = queue.Queue(0)
|
||||
#Coda = Queue.Queue(0)
|
||||
#Coda = queue.Queue(0)
|
||||
CodaR = redis.Redis(host='localhost', port=6379, db=0, password='24068Seriate')
|
||||
queue_name = 'IOB'
|
||||
|
||||
@@ -133,27 +132,28 @@ queue_name = 'IOB'
|
||||
# per sostituzione 1:1 con coda in ram
|
||||
|
||||
# Function to add an item to the queue (enqueue)
|
||||
def enqueue(item):
|
||||
def rqEnqueue(item):
|
||||
CodaR.rpush(queue_name, item)
|
||||
print(f"Enqueued: {item}")
|
||||
#logQue.info(f"Enqueued: {item}")
|
||||
|
||||
# Function to remove an item from the queue (dequeue)
|
||||
def dequeue():
|
||||
def rqDequeue():
|
||||
item = CodaR.lpop(queue_name)
|
||||
if item:
|
||||
print(f"Dequeued: {item.decode('utf-8')}")
|
||||
#logQue.info(f"Dequeued: {item.decode('utf-8')}")
|
||||
return item.decode('utf-8')
|
||||
else:
|
||||
print("Queue is empty")
|
||||
logQue.info("Queue is empty, nothing to retrieve!")
|
||||
|
||||
# Function to count queue actual lenght in Redis
|
||||
def rqlen():
|
||||
clen = CodaR.llen(queue_name)
|
||||
if clen:
|
||||
print(f"Queue len: {clen}")
|
||||
return clen
|
||||
def rqLen():
|
||||
cLen = CodaR.llen(queue_name)
|
||||
if cLen:
|
||||
#logQue.info(f"Queue len: {cLen}")
|
||||
return cLen
|
||||
else:
|
||||
print("Queue is empty")
|
||||
#logQue.info("Queue is empty")
|
||||
return 0
|
||||
|
||||
#---------------------------------------------------------------
|
||||
# lettura parallela
|
||||
@@ -390,18 +390,19 @@ def accoda():
|
||||
|
||||
try:
|
||||
dtEve = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3]
|
||||
Coda.put(dtEve + '#' + value + '#' + cont)
|
||||
rqEnqueue(dtEve + '#' + value + '#' + cont)
|
||||
#Coda.put(dtEve + '#' + value + '#' + cont)
|
||||
|
||||
except queue.Full:
|
||||
#except Queue.Full:
|
||||
logPro.error( "Queue full" + str(dtEve) + '#' + str(value) + '#' + str(cont) )
|
||||
#except queue.Full:
|
||||
##except Queue.Full:
|
||||
# logPro.error( "Queue full" + str(dtEve) + '#' + str(value) + '#' + str(cont) )
|
||||
except Exception as e:
|
||||
logPro.error( "NETWORK:Errore http-no com rete-timeout" + url )
|
||||
logPro.error( "QUEUE:Errore coda \n\n" )
|
||||
logPro.error(str(e))
|
||||
|
||||
#--------------------------------------------------------------
|
||||
# svuotaCoda x invio dati al server
|
||||
def svuota_coda():
|
||||
def svuotaCoda():
|
||||
|
||||
global onLine
|
||||
global sending
|
||||
@@ -415,7 +416,8 @@ def svuota_coda():
|
||||
#print ("start timer ok ")
|
||||
|
||||
try:
|
||||
if not Coda.empty():
|
||||
if not rqLen() == 0:
|
||||
#if not Coda.empty():
|
||||
#print ("coda da svuotare!")
|
||||
response = urllib.request.urlopen(URLALIVE)
|
||||
answ = response.read().decode('utf-8')
|
||||
@@ -454,13 +456,15 @@ def svuota_coda():
|
||||
i = NMAXSEND
|
||||
|
||||
while i >= 0:
|
||||
if not Coda.empty():
|
||||
if not rqLen() == 0:
|
||||
#if not Coda.empty():
|
||||
|
||||
# formatto dataOra corrente
|
||||
dtCurr = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3]
|
||||
|
||||
#prendo primo elemento dalla coda
|
||||
resp = Coda.get()
|
||||
resp = rqDequeue()
|
||||
#resp = Coda.get()
|
||||
|
||||
# recupero valori da elemento coda!
|
||||
dtEve = resp.split("#")[0]
|
||||
@@ -756,12 +760,10 @@ avviaParallela()
|
||||
|
||||
|
||||
#--------------------------------------------------------------
|
||||
# MARCO: qui inserire avvio thread di "svuotaCoda"
|
||||
# Qui avvio thread periodico di "svuotaCoda"
|
||||
|
||||
# avviaSvuotaCoda
|
||||
#print ("Avvia svuota coda")
|
||||
|
||||
do_every ( SENDURLTIME , svuota_coda );
|
||||
do_every ( SENDURLTIME , svuotaCoda );
|
||||
|
||||
#---------------------------------------------------------------
|
||||
# ciclo forever and ever
|
||||
|
||||
+51
-18
@@ -1,7 +1,7 @@
|
||||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# readParallela v. 2.5.3 12 Ingressi
|
||||
# readParallela v. 2.6.0 12 Ingressi
|
||||
# - single instance timer
|
||||
# - invio multiplo x send eventi accodati
|
||||
# - gestione segnali BLINKING
|
||||
@@ -14,6 +14,7 @@
|
||||
# - (2.5.1) Fix numero versione 18.05.2023
|
||||
# - (2.5.2) Fix gestione eccezioni con report dettagliato
|
||||
# - (2.5.3) Fix gestione stringhe e print x python 3.11 in debian 12 / raspberry OS 2025
|
||||
# - (2.6.0) Aggiunto gestione Redis x code salvate ogni minuto e ricaricate all'avvio 2025.04.17
|
||||
|
||||
#---------------------------------------------------------------
|
||||
# levare locking
|
||||
@@ -38,6 +39,7 @@ import logging.handlers
|
||||
import threading
|
||||
import queue
|
||||
#import Queue
|
||||
import redis
|
||||
|
||||
from array import *
|
||||
|
||||
@@ -51,7 +53,7 @@ MAXRETRY = 10
|
||||
# numero campioni filtraggio segnale ballerino
|
||||
MAX_COUNTER_BLINK = 10
|
||||
|
||||
PROGRAM_NAME ="ReadPar IOB-pi v.2.5.3"
|
||||
PROGRAM_NAME ="ReadPar IOB-pi v.2.6.0"
|
||||
|
||||
# DA FILE CONF
|
||||
idxMacchina = "1001"
|
||||
@@ -119,11 +121,40 @@ i_filter_counters = array ( 'i',[0,0,0,0,0,0,0,0,0,0,0,0])
|
||||
# Gestione coda (condivisa) x registrazione eventi ed invio URL
|
||||
#print ("Creazione coda illimitata")
|
||||
|
||||
Coda = queue.Queue(0)
|
||||
#Coda = Queue.Queue(0)
|
||||
#Coda = queue.Queue(0)
|
||||
CodaR = redis.Redis(host='localhost', port=6379, db=0, password='24068Seriate')
|
||||
queue_name = 'IOB'
|
||||
|
||||
#queueLock = threading.Lock()
|
||||
|
||||
#-----------------------------------
|
||||
# Gestione code REDIS
|
||||
# per sostituzione 1:1 con coda in ram
|
||||
|
||||
# Function to add an item to the queue (enqueue)
|
||||
def rqEnqueue(item):
|
||||
CodaR.rpush(queue_name, item)
|
||||
#logQue.info(f"Enqueued: {item}")
|
||||
|
||||
# Function to remove an item from the queue (dequeue)
|
||||
def rqDequeue():
|
||||
item = CodaR.lpop(queue_name)
|
||||
if item:
|
||||
#logQue.info(f"Dequeued: {item.decode('utf-8')}")
|
||||
return item.decode('utf-8')
|
||||
else:
|
||||
logQue.info("Queue is empty, nothing to retrieve!")
|
||||
|
||||
# Function to count queue actual lenght in Redis
|
||||
def rqLen():
|
||||
cLen = CodaR.llen(queue_name)
|
||||
if cLen:
|
||||
#logQue.info(f"Queue len: {cLen}")
|
||||
return cLen
|
||||
else:
|
||||
#logQue.info("Queue is empty")
|
||||
return 0
|
||||
|
||||
#---------------------------------------------------------------
|
||||
# lettura parallela
|
||||
# ritorna il byte letto pulito ( due char hex )
|
||||
@@ -359,18 +390,19 @@ def accoda():
|
||||
|
||||
try:
|
||||
dtEve = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3]
|
||||
Coda.put(dtEve + '#' + value + '#' + cont)
|
||||
rqEnqueue(dtEve + '#' + value + '#' + cont)
|
||||
#Coda.put(dtEve + '#' + value + '#' + cont)
|
||||
|
||||
#except Queue.Full:
|
||||
except queue.Full:
|
||||
logPro.error( "Queue full" + str(dtEve) + '#' + str(value) + '#' + str(cont) )
|
||||
#except queue.Full:
|
||||
##except Queue.Full:
|
||||
# logPro.error( "Queue full" + str(dtEve) + '#' + str(value) + '#' + str(cont) )
|
||||
except Exception as e:
|
||||
logPro.error( "NETWORK:Errore http-no com rete-timeout" + url )
|
||||
logPro.error( "QUEUE:Errore coda \n\n" )
|
||||
logPro.error(str(e))
|
||||
|
||||
#--------------------------------------------------------------
|
||||
# svuotaCoda x invio dati al server
|
||||
def svuota_coda():
|
||||
def svuotaCoda():
|
||||
|
||||
global onLine
|
||||
global sending
|
||||
@@ -384,7 +416,8 @@ def svuota_coda():
|
||||
#print ("start timer ok ")
|
||||
|
||||
try:
|
||||
if not Coda.empty():
|
||||
if not rqLen() == 0:
|
||||
#if not Coda.empty():
|
||||
#print ("coda da svuotare!")
|
||||
response = urllib.request.urlopen(URLALIVE)
|
||||
answ = response.read().decode('utf-8')
|
||||
@@ -423,13 +456,15 @@ def svuota_coda():
|
||||
i = NMAXSEND
|
||||
|
||||
while i >= 0:
|
||||
if not Coda.empty():
|
||||
if not rqLen() == 0:
|
||||
#if not Coda.empty():
|
||||
|
||||
# formatto dataOra corrente
|
||||
dtCurr = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3]
|
||||
|
||||
#prendo primo elemento dalla coda
|
||||
resp = Coda.get()
|
||||
resp = rqDequeue()
|
||||
#resp = Coda.get()
|
||||
|
||||
# recupero valori da elemento coda!
|
||||
dtEve = resp.split("#")[0]
|
||||
@@ -566,8 +601,8 @@ def avviaParallela():
|
||||
# MAIN
|
||||
|
||||
try:
|
||||
#config = ConfigParser.RawConfigParser()
|
||||
config = configparser.RawConfigParser()
|
||||
#config = ConfigParser.RawConfigParser()
|
||||
config.read ( 'IOB.cfg' )
|
||||
|
||||
SAMPLETIME = config.getfloat ( 'time' , 'SAMPLETIME' )
|
||||
@@ -725,12 +760,10 @@ avviaParallela()
|
||||
|
||||
|
||||
#--------------------------------------------------------------
|
||||
# MARCO: qui inserire avvio thread di "svuotaCoda"
|
||||
# Qui avvio thread periodico di "svuotaCoda"
|
||||
|
||||
# avviaSvuotaCoda
|
||||
#print ("Avvia svuota coda")
|
||||
|
||||
do_every ( SENDURLTIME , svuota_coda );
|
||||
do_every ( SENDURLTIME , svuotaCoda );
|
||||
|
||||
#---------------------------------------------------------------
|
||||
# ciclo forever and ever
|
||||
|
||||
+51
-18
@@ -1,7 +1,7 @@
|
||||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# readParallela v. 2.5.3 8 Ingressi
|
||||
# readParallela v. 2.6.0 8 Ingressi
|
||||
# - single instance timer
|
||||
# - invio multiplo x send eventi accodati
|
||||
# - gestione segnali BLINKING
|
||||
@@ -14,6 +14,7 @@
|
||||
# - (2.5.1) Fix numero versione 18.05.2023
|
||||
# - (2.5.2) Fix gestione eccezioni con report dettagliato
|
||||
# - (2.5.3) Fix gestione stringhe e print x python 3.11 in debian 12 / raspberry OS 2025
|
||||
# - (2.6.0) Aggiunto gestione Redis x code salvate ogni minuto e ricaricate all'avvio 2025.04.17
|
||||
|
||||
#---------------------------------------------------------------
|
||||
# levare locking
|
||||
@@ -38,6 +39,7 @@ import logging.handlers
|
||||
import threading
|
||||
import queue
|
||||
#import Queue
|
||||
import redis
|
||||
|
||||
from array import *
|
||||
|
||||
@@ -51,7 +53,7 @@ MAXRETRY = 10
|
||||
# numero campioni filtraggio segnale ballerino
|
||||
MAX_COUNTER_BLINK = 10
|
||||
|
||||
PROGRAM_NAME ="ReadPar IOB-pi v.2.5.3"
|
||||
PROGRAM_NAME ="ReadPar IOB-pi v.2.6.0"
|
||||
|
||||
# DA FILE CONF
|
||||
idxMacchina = "1001"
|
||||
@@ -115,11 +117,40 @@ i_filter_counters = array ( 'i',[0,0,0,0,0,0,0,0])
|
||||
# Gestione coda (condivisa) x registrazione eventi ed invio URL
|
||||
#print ("Creazione coda illimitata")
|
||||
|
||||
Coda = queue.Queue(0)
|
||||
#Coda = Queue.Queue(0)
|
||||
#Coda = queue.Queue(0)
|
||||
CodaR = redis.Redis(host='localhost', port=6379, db=0, password='24068Seriate')
|
||||
queue_name = 'IOB'
|
||||
|
||||
#queueLock = threading.Lock()
|
||||
|
||||
#-----------------------------------
|
||||
# Gestione code REDIS
|
||||
# per sostituzione 1:1 con coda in ram
|
||||
|
||||
# Function to add an item to the queue (enqueue)
|
||||
def rqEnqueue(item):
|
||||
CodaR.rpush(queue_name, item)
|
||||
#logQue.info(f"Enqueued: {item}")
|
||||
|
||||
# Function to remove an item from the queue (dequeue)
|
||||
def rqDequeue():
|
||||
item = CodaR.lpop(queue_name)
|
||||
if item:
|
||||
#logQue.info(f"Dequeued: {item.decode('utf-8')}")
|
||||
return item.decode('utf-8')
|
||||
else:
|
||||
logQue.info("Queue is empty, nothing to retrieve!")
|
||||
|
||||
# Function to count queue actual lenght in Redis
|
||||
def rqLen():
|
||||
cLen = CodaR.llen(queue_name)
|
||||
if cLen:
|
||||
#logQue.info(f"Queue len: {cLen}")
|
||||
return cLen
|
||||
else:
|
||||
#logQue.info("Queue is empty")
|
||||
return 0
|
||||
|
||||
#---------------------------------------------------------------
|
||||
# lettura parallela
|
||||
# ritorna il byte letto pulito ( due char hex )
|
||||
@@ -316,18 +347,19 @@ def accoda():
|
||||
|
||||
try:
|
||||
dtEve = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3]
|
||||
Coda.put(dtEve + '#' + value + '#' + cont)
|
||||
rqEnqueue(dtEve + '#' + value + '#' + cont)
|
||||
#Coda.put(dtEve + '#' + value + '#' + cont)
|
||||
|
||||
#except Queue.Full:
|
||||
except queue.Full:
|
||||
logPro.error( "Queue full" + str(dtEve) + '#' + str(value) + '#' + str(cont) )
|
||||
#except queue.Full:
|
||||
##except Queue.Full:
|
||||
# logPro.error( "Queue full" + str(dtEve) + '#' + str(value) + '#' + str(cont) )
|
||||
except Exception as e:
|
||||
logPro.error( "NETWORK:Errore http-no com rete-timeout" + url )
|
||||
logPro.error( "QUEUE:Errore coda \n\n" )
|
||||
logPro.error(str(e))
|
||||
|
||||
#--------------------------------------------------------------
|
||||
# svuotaCoda x invio dati al server
|
||||
def svuota_coda():
|
||||
def svuotaCoda():
|
||||
|
||||
global onLine
|
||||
global sending
|
||||
@@ -341,7 +373,8 @@ def svuota_coda():
|
||||
#print ("start timer ok ")
|
||||
|
||||
try:
|
||||
if not Coda.empty():
|
||||
if not rqLen() == 0:
|
||||
#if not Coda.empty():
|
||||
#print ("coda da svuotare!")
|
||||
response = urllib.request.urlopen(URLALIVE)
|
||||
answ = response.read().decode('utf-8')
|
||||
@@ -380,13 +413,15 @@ def svuota_coda():
|
||||
i = NMAXSEND
|
||||
|
||||
while i >= 0:
|
||||
if not Coda.empty():
|
||||
if not rqLen() == 0:
|
||||
#if not Coda.empty():
|
||||
|
||||
# formatto dataOra corrente
|
||||
dtCurr = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3]
|
||||
|
||||
#prendo primo elemento dalla coda
|
||||
resp = Coda.get()
|
||||
resp = rqDequeue()
|
||||
#resp = Coda.get()
|
||||
|
||||
# recupero valori da elemento coda!
|
||||
dtEve = resp.split("#")[0]
|
||||
@@ -515,8 +550,8 @@ def avviaParallela():
|
||||
# MAIN
|
||||
|
||||
try:
|
||||
#config = ConfigParser.RawConfigParser()
|
||||
config = configparser.RawConfigParser()
|
||||
#config = ConfigParser.RawConfigParser()
|
||||
config.read ( 'IOB.cfg' )
|
||||
|
||||
SAMPLETIME = config.getfloat ( 'time' , 'SAMPLETIME' )
|
||||
@@ -662,12 +697,10 @@ avviaParallela()
|
||||
|
||||
|
||||
#--------------------------------------------------------------
|
||||
# MARCO: qui inserire avvio thread di "svuotaCoda"
|
||||
# Qui avvio thread periodico di "svuotaCoda"
|
||||
|
||||
# avviaSvuotaCoda
|
||||
#print ("Avvia svuota coda")
|
||||
|
||||
do_every ( SENDURLTIME , svuota_coda );
|
||||
do_every ( SENDURLTIME , svuotaCoda );
|
||||
|
||||
#---------------------------------------------------------------
|
||||
# ciclo forever and ever
|
||||
|
||||
Reference in New Issue
Block a user