# Files
# Mapo-IOB/IOB-PI/readParallela.py
# 2026-05-13 09:39:50 +02:00
#
# 439 lines
# 17 KiB
# Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
# readParallela v. 3.1.3 8/12 Ingressi
# - single instance timer
# - invio multiplo x send eventi accodati
# - gestione segnali BLINKING
# - gestione INVERSIONE segnali cv 10-VII-2018
# - gestione FILTRAGGIO segnali brevi cv 23-VII-2018
# - (2.3) gestione 12 bit cv 14-I-2020
# - (2.4) fix ingressi e conf apertura parallela + gestione vari bit filtraggio x nuovi ingressi + update conf con 12 parametri bit SEL 15-I-2020
# - (2.4.8) versione adatta a raspberry PI vecchia generazione (GPIO corto, 8bit)
# - (2.5) Fix (hope) ciclo "wait send to complete", gestione timeout (retry infinito se IO riparte in modo anomalo)
# - (2.5.1) Fix numero versione 18.05.2023
# - (2.5.2) Fix gestione eccezioni con report dettagliato
# - (2.5.3) Fix gestione stringhe e print x python 3.11 in debian 12 / raspberry OS 2025
# - (2.6.0) Aggiunto gestione Redis x code salvate ogni minuto e ricaricate all'avvio 2025.04.17
# - (2.6.1) Cleanup generale vecchia queue post test vari
# - (2.6.2) Fix in global di to_retry in send_coda per evitare problemi
# - (3.0.0) Prima versione code-assisted: riformulazione e ottimizzazione globale programma
# - (3.1.2) Ottimizzazione gestione letture GPIO (sempre code-assisted)
# - (3.1.3) Fix 12bit output, test 8/12 bit OK
import time
import sys
import os
import logging
import logging.handlers
import threading
import configparser
from datetime import datetime, timezone
from array import array
import redis
import requests
import urllib3
# Disable urllib3 debug logging to keep application logs clean
urllib3.disable_warnings()
logging.getLogger("urllib3").setLevel(logging.WARNING)
# RPi.GPIO is imported at module level but treated as optional, so this module
# can still be loaded on a machine without GPIO support; GPIO is None in that
# case. RuntimeError is caught as well as ImportError because RPi.GPIO raises
# RuntimeError when it is installed but imported on non-Raspberry-Pi hardware.
try:
    import RPi.GPIO as GPIO
except (ImportError, RuntimeError):
    GPIO = None
class ReadParallelaIOB:
    """Sample 8 or 12 GPIO inputs and forward every change to a web server.

    The main loop (see ``run``) reads the inputs every ``SAMPLETIME`` seconds,
    applies per-bit inversion, short-pulse filtering and blink handling, and
    pushes each new value into a Redis list. A background thread drains that
    list and sends the queued events through HTTP GET requests.
    """

    # Hardware pin maps; setup_gpio() selects GPIO.BOARD numbering.
    _PINS_8 = [11, 12, 13, 15, 16, 18, 22, 7]
    _PINS_12 = [11, 12, 13, 15, 16, 18, 22, 7, 29, 31, 32, 36]

    # Accepted spellings for the LOGLEVEL configuration value.
    _LOG_LEVELS = {
        'CRITICAL': logging.CRITICAL,
        'ERROR': logging.ERROR,
        'WARNING': logging.WARNING,
        'WARN': logging.WARNING,
        'INFO': logging.INFO,
        'DEBUG': logging.DEBUG,
    }

    def __init__(self, config_path='IOB.cfg'):
        """Load the configuration, connect to Redis and configure logging.

        Args:
            config_path: Path of the .cfg file handed to ``load_config``.
        """
        self.PROGRAM_NAME = "ReadPar IOB-pi v.3.1.3 (2026)"
        self.MAXRETRY = 10
        self.MAX_COUNTER_BLINK = 10

        # NOTE(review): this stores the configparser *module*, not a parser
        # instance, and nothing in this class reads it. Kept for compatibility.
        self.config = configparser

        # Defaults; load_config() overwrites them from the file.
        self.num_params = 12
        self.input_pins = self._PINS_12
        self._alloc_state(self.num_params)

        self.load_config(config_path)

        # Control variables
        self.cont = 0
        self.onLine = '1'
        self.sending = False
        self.timer_busy = False
        self.to_enable = False
        self.to_short = self.TIMEOUTSHORT
        self.to_long = self.TIMEOUTLONG
        self.to_retry = self.MAXRETRY

        # Redis
        # NOTE(review): the password is hard-coded in source; consider moving
        # it to the configuration file or the environment.
        self.CodaR = redis.Redis(host='localhost', port=6379, db=0, password='24068Seriate')
        self.queue_name = 'IOB'

        # Logging
        self.setup_logging()

    def _alloc_state(self, size):
        """(Re)create every per-bit state array with ``size`` zeroed slots."""
        self.i_counters = array('i', [0] * size)
        self.B_blinking = array('B', [0] * size)
        self.B_previous = array('B', [0] * size)
        self.B_input = array('B', [0] * size)
        self.B_output = array('B', [0] * size)
        self.B_inverting = array('B', [0] * size)
        self.B_filter = array('B', [0] * size)
        self.B_filter_prev = array('B', [0] * size)
        self.B_temp = array('B', [0] * size)
        self.i_filter_counters = array('i', [0] * size)

    def load_config(self, path):
        """Load every setting from the .cfg file at ``path``.

        Sets the number of inputs (8 or 12, anything else falls back to 12),
        the matching pin map, fresh state arrays, timing values, URLs, log
        settings and the per-bit blink / invert / filter flags.

        Prints an error and exits with status 1 when the file does not exist.
        Missing sections or options raise the usual ``configparser`` errors.
        """
        config = configparser.RawConfigParser()
        if not os.path.exists(path):
            print(f"Error: Config file {path} not found.")
            sys.exit(1)
        config.read(path)

        # 1. Number of inputs: only 8 and 12 are supported.
        try:
            requested = config.getint('id', 'numParams', fallback=12)
        except ValueError:
            requested = 12
        self.num_params = requested if requested in (8, 12) else 12

        # 2. Pin map and state arrays follow the number of inputs.
        self.input_pins = self._PINS_8 if self.num_params == 8 else self._PINS_12
        self._alloc_state(self.num_params)

        # 3. Scalar parameters.
        self.idxMacchina = config.get('id', 'idxMacchina')
        self.SAMPLETIME = config.getfloat('time', 'SAMPLETIME')
        self.TIMEOUTSHORT = config.getfloat('time', 'TIMEOUTSHORT')
        self.TIMEOUTLONG = config.getfloat('time', 'TIMEOUTLONG')
        self.SENDURLTIME = config.getfloat('time', 'SENDURLTIME')
        self.NMAXSEND = config.getint('time', 'NMAXSEND')
        self.URLBASE = config.get('web', 'URLBASE')
        self.URLENABLED = config.get('web', 'URLENABLED')
        self.URLALIVE = config.get('web', 'URLALIVE')
        self.URLADV1 = config.get('web', 'URLADV1')
        self.LOGFILE = config.get('log', 'LOGFILE')
        self.LOGLEVEL = config.get('log', 'LOGLEVEL')

        # 4. Per-bit flags (options bit0 .. bit<num_params-1> in each section).
        for i in range(self.num_params):
            self.B_blinking[i] = config.getint('blink', f'bit{i}')
            self.B_inverting[i] = config.getint('invert', f'bit{i}')
            self.B_filter[i] = config.getint('filter', f'bit{i}')
        self.MAX_COUNTER_BLINK = config.getint('blink', 'MAX_COUNTER_BLINK')
        self.MAX_COUNTER_FILTER = config.getint('filter', 'MAX_COUNTER_FILTER')

    def _log_level(self):
        """Translate ``self.LOGLEVEL`` into a numeric logging level.

        Accepts a level name (case-insensitive) or a plain number; anything
        else falls back to ``logging.DEBUG``.
        """
        raw = str(self.LOGLEVEL).strip()
        if raw.isdigit():
            return int(raw)
        return self._LOG_LEVELS.get(raw.upper(), logging.DEBUG)

    def setup_logging(self):
        """Configure file logging and create the three named loggers.

        The level comes from the LOGLEVEL configuration value (see
        ``_log_level``) and the records are appended to ``self.LOGFILE``.
        """
        logging.basicConfig(
            level=self._log_level(),
            format='%(asctime)s %(name)-8s %(levelname)-8s %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
            filename=self.LOGFILE,
            filemode='a'
        )
        self.logQue = logging.getLogger('queue')
        self.logSnd = logging.getLogger('sendUrl')
        self.logPro = logging.getLogger('program')

    def setup_gpio(self):
        """Configure every pin in ``self.input_pins`` as a GPIO input.

        Logs an error and exits with status 1 when the GPIO library is not
        available or when the setup raises.
        """
        if GPIO is None:
            self.logPro.error("GPIO library not found. Are you on a Raspberry Pi?")
            sys.exit(1)
        try:
            GPIO.setmode(GPIO.BOARD)
            GPIO.setwarnings(False)
            for pin in self.input_pins:
                GPIO.setup(pin, GPIO.IN)
            self.logPro.info("GPIO initialized successfully.")
        except Exception as e:
            self.logPro.error(f"GPIO Setup Error: {e}")
            sys.exit(1)

    def rqEnqueue(self, item):
        """Append ``item`` to the tail of the Redis queue."""
        self.CodaR.rpush(self.queue_name, item)

    def rqRequeue(self, item):
        """Put ``item`` back at the head of the Redis queue.

        Used after a failed send so the event is retried first and the
        original ordering is kept.
        """
        self.CodaR.lpush(self.queue_name, item)

    def rqDequeue(self):
        """Pop the head of the Redis queue.

        Returns:
            The item decoded as UTF-8, or None when the queue is empty (or
            the popped item is an empty value).
        """
        item = self.CodaR.lpop(self.queue_name)
        return item.decode('utf-8') if item else None

    def rqLen(self):
        """Return the current length of the Redis queue."""
        return self.CodaR.llen(self.queue_name)

    @staticmethod
    def _utc_stamp():
        """Return the current UTC time as YYYYmmddHHMMSS plus milliseconds."""
        return datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S%f')[:-3]

    def _read_inputs(self):
        """Read every input pin into ``self.B_input``.

        A truthy GPIO level is stored as 0 and a falsy one as 1; bits flagged
        in ``self.B_inverting`` are then flipped.
        """
        gpio_input = GPIO.input
        pins = self.input_pins
        inverting = self.B_inverting
        inputs = self.B_input
        for i in range(self.num_params):
            bit = 0 if gpio_input(pins[i]) else 1
            if inverting[i]:
                bit = 1 - bit
            inputs[i] = bit

    def _filter_and_blink(self):
        """Apply short-pulse filtering and blink handling to ``self.B_input``.

        Results are written to ``self.B_output``; the filtered value also
        replaces the raw one in ``self.B_input``.
        """
        inputs = self.B_input
        outputs = self.B_output
        filtered = self.B_filter
        filter_prev = self.B_filter_prev
        filter_counters = self.i_filter_counters
        held = self.B_temp
        max_filter = self.MAX_COUNTER_FILTER
        blinking = self.B_blinking
        previous = self.B_previous
        blink_counters = self.i_counters

        for i in range(self.num_params):
            # --- Filter: a new level is reported only after it stayed stable
            # while the counter ran down; until then the opposite (old) level
            # is reported.
            if filtered[i]:
                curr = inputs[i]
                if curr != filter_prev[i]:
                    # Edge: start the hold-off if idle, otherwise the edge
                    # arrived during a hold-off and cancels it.
                    filter_counters[i] = max_filter if filter_counters[i] == 0 else 0
                    armed = filter_counters[i] == max_filter
                    held[i] = (1 - curr) if armed else curr
                    filter_prev[i] = curr
                elif filter_counters[i] > 0:
                    # Level unchanged while the hold-off is still running.
                    filter_counters[i] -= 1
                    held[i] = 1 - curr
                else:
                    held[i] = curr
                inputs[i] = held[i]

            # --- Blink: a rising edge sets the output at once; the output is
            # cleared only after the input stayed 0 for MAX_COUNTER_BLINK
            # samples following the falling edge.
            if blinking[i] == 0:
                outputs[i] = inputs[i]
            elif previous[i] != inputs[i]:
                previous[i] = inputs[i]
                if inputs[i] == 1:
                    outputs[i] = 1
                    blink_counters[i] = self.MAX_COUNTER_BLINK
            elif inputs[i] == 0 and blink_counters[i] > 0:
                blink_counters[i] -= 1
                if blink_counters[i] == 0:
                    outputs[i] = 0

    def _encode_output(self):
        """Pack ``self.B_output`` (bit i = input i) into an upper-case hex string.

        Two digits are produced for the 8-input setup, three otherwise.
        """
        value = 0
        outputs = self.B_output
        for i in range(self.num_params):
            if outputs[i]:
                value |= 1 << i
        if self.num_params == 8:
            return f"{value:02X}"
        return f"{value:03X}"

    def readParallelaFiltrata(self):
        """Read the inputs and return the processed value as a hex string.

        Returns:
            The hex string built by ``_encode_output``, or '' when any step
            raises (the error is logged).
        """
        try:
            self._read_inputs()
            self._filter_and_blink()
            return self._encode_output()
        except Exception as e:
            self.logPro.error(f"Error in readParallelaFiltrata: {e}")
            return ''

    def accoda(self, value):
        """Queue ``value`` as 'timestamp#value#counter'; queue errors are logged."""
        try:
            dt_eve = self._utc_stamp()
            self.rqEnqueue(f"{dt_eve}#{value}#{self.cont}")
        except Exception as e:
            self.logPro.error(f"QUEUE ERROR: {e}")

    def _refresh_online(self):
        """Update ``self.onLine`` from the alive and enabled URLs.

        The device counts as online only when both requests answer 'OK';
        any request error is logged and counts as offline.
        """
        online = False
        try:
            if requests.get(self.URLALIVE, timeout=5).text == 'OK':
                res_enabled = requests.get(self.URLENABLED + self.idxMacchina, timeout=5)
                online = res_enabled.text == 'OK'
        except Exception as e:
            self.logPro.error(f"Server Connection Error: {e}")
        if online and self.onLine == '0':
            self.logPro.info("IOB ONLINE!")
        self.onLine = '1' if online else '0'

    def _send_batch(self):
        """Send up to ``NMAXSEND`` queued events, oldest first.

        An event whose request raises is pushed back to the head of the queue
        and the batch stops, so it is retried on the next timer tick instead
        of being lost. Items that do not contain the three '#'-separated
        fields are logged and dropped.
        """
        for _ in range(self.NMAXSEND):
            item = self.rqDequeue()
            if not item:
                # Queue drained.
                break
            parts = item.split("#")
            if len(parts) < 3:
                self.logSnd.error(f"Malformed queue item dropped: {item!r}")
                continue
            dt_eve, val, cnt = parts[:3]
            dt_curr = self._utc_stamp()
            url = f"{self.URLBASE}{self.idxMacchina}{self.URLADV1}{val}&dtCurr={dt_curr}&dtEve={dt_eve}&cnt={cnt}"
            try:
                r = requests.get(url, timeout=5)
            except Exception as e:
                self.logSnd.error(f"Send Error: {e}")
                self.rqRequeue(item)
                break
            self.logSnd.info(f"{val} [{cnt}] R:{r.text}")

    def svuotaCoda(self):
        """Drain part of the Redis queue towards the server.

        Does nothing when a previous call is still running or when the queue
        is empty. Otherwise the server is probed and, if it is online, one
        batch is sent. Every exception is logged, never propagated.
        """
        if self.timer_busy:
            return
        self.timer_busy = True
        try:
            if self.rqLen() > 0:
                self._refresh_online()
                if self.onLine == '1' and not self.sending:
                    self.sending = True
                    try:
                        self._send_batch()
                    finally:
                        # Always release the flag, even when the batch raises,
                        # so the next tick can send again.
                        self.sending = False
                elif self.sending:
                    # Another send is flagged as active: wait up to MAXRETRY
                    # ticks, then force the flag back.
                    if self.to_retry > 0:
                        self.to_retry -= 1
                        self.logPro.info("WAIT active send to complete")
                    else:
                        self.sending = False
                        self.to_retry = self.MAXRETRY
                        self.logPro.info("END WAIT, reset to_retry")
        except Exception as e:
            self.logPro.error(f"svuotaCoda Error: {e}")
        finally:
            self.timer_busy = False

    def run(self):
        """Run the acquisition loop until interrupted from the keyboard.

        1. Initializes GPIO.
        2. Starts a daemon thread that calls ``svuotaCoda`` every
           ``SENDURLTIME`` seconds.
        3. Samples the inputs every ``SAMPLETIME`` seconds and queues each
           change, a confirmation ``TIMEOUTSHORT`` seconds after the last
           change, and a periodic repeat every ``TIMEOUTLONG`` seconds.

        Failed reads (empty value) are skipped. GPIO is released on exit.
        """
        self.logPro.info(" ")
        self.logPro.info("-----------------------------")
        self.logPro.info(self.PROGRAM_NAME)
        self.logPro.info(f"v: {self.num_params} bit")
        self.logPro.info("-----------------------------")
        self.setup_gpio()

        # Background thread for queue emptying
        def timer_worker():
            while True:
                self.svuotaCoda()
                time.sleep(self.SENDURLTIME)

        threading.Thread(target=timer_worker, daemon=True).start()

        old_value = ''
        self.logPro.info("Starting main loop")
        self.logPro.info(" ")
        try:
            while True:
                try:
                    time.sleep(self.SAMPLETIME)
                    value = self.readParallelaFiltrata()
                    if value == '':
                        # Read failed (already logged): nothing to queue.
                        continue

                    if value != old_value:
                        self.logQue.info(f"{value} [{self.cont}]")
                        self.accoda(value)
                        self.cont = (self.cont + 1) % 10000
                        self.to_enable = True
                        self.to_short = self.TIMEOUTSHORT
                        self.to_long = self.TIMEOUTLONG
                        old_value = value

                    # Short timeout: one confirmation after the last change.
                    if self.to_enable:
                        self.to_short -= self.SAMPLETIME
                        if self.to_short <= 0:
                            self.logQue.info(f">{value} [{self.cont}]")
                            self.accoda(value)
                            self.to_short = self.TIMEOUTSHORT
                            self.to_enable = False
                            self.to_long = self.TIMEOUTLONG

                    # Long timeout: periodic repeat of the current value.
                    self.to_long -= self.SAMPLETIME
                    if self.to_long <= 0:
                        self.logQue.info(f">>{value} [{self.cont}]")
                        self.accoda(value)
                        self.to_long = self.TIMEOUTLONG
                except KeyboardInterrupt:
                    self.logPro.info("Keyboard interrupt received. Exiting...")
                    break
                except Exception as e:
                    self.logPro.error(f"Main loop error: {e}")
        finally:
            # Release the channels configured by setup_gpio().
            if GPIO is not None:
                GPIO.cleanup()
# Script entry point: build the reader with its default configuration file
# and start the acquisition loop.
if __name__ == "__main__":
    ReadParallelaIOB().run()