#!/usr/bin/python
# -*- coding: utf-8 -*-
# readParallela v. 3.1.3 8/12 inputs
# - single instance timer
# - multiple sends per cycle for queued events
# - BLINKING signal handling
# - signal INVERSION handling cv 10-VII-2018
# - short-signal FILTERING handling cv 23-VII-2018
# - (2.3) 12-bit handling cv 14-I-2020
# - (2.4) fixed inputs and parallel-port opening config + handling of the
#         various filter bits for the new inputs + config updated with
#         12 bit parameters SEL 15-I-2020
# - (2.4.8) version suited to old-generation Raspberry Pi (short GPIO, 8 bit)
# - (2.5) Fix (hope) "wait send to complete" loop, timeout handling
#         (infinite retry if the IO restarts abnormally)
# - (2.5.1) Version number fix 18.05.2023
# - (2.5.2) Exception handling fix with detailed report
# - (2.5.3) String handling and print fix for Python 3.11 on Debian 12 /
#           Raspberry Pi OS 2025
# - (2.6.0) Added Redis handling for queues saved every minute and reloaded
#           at startup 2025.04.17
# - (2.6.1) General cleanup of the old queue after various tests
# - (2.6.2) Fix to the global to_retry in send_coda to avoid problems
# - (3.0.0) First code-assisted version: global rework and optimization
# - (3.1.2) Optimized GPIO read handling (again code-assisted)
# - (3.1.3) Fix 12bit output, test 8/12 bit OK

import time
import sys
import os
import logging
import logging.handlers
import threading
import configparser
from datetime import datetime, timezone
from array import array

import redis
import requests
import urllib3

# Silence urllib3 warnings and keep its logger at WARNING so that the
# application log stays clean.
urllib3.disable_warnings()
logging.getLogger("urllib3").setLevel(logging.WARNING)

# RPi.GPIO is optional at import time so the module can still be imported on
# non-Pi systems; setup_gpio() aborts at runtime when it is missing.
try:
    import RPi.GPIO as GPIO
except ImportError:
    GPIO = None


class ReadParallelaIOB:
    """Sample parallel GPIO inputs and forward value changes to a web server.

    Every sample goes through per-bit inversion, filtering and blink handling
    (all driven by the configuration file), is encoded as an upper-case hex
    string and is appended to a Redis list.  A background thread drains that
    list by issuing HTTP GET requests.
    """

    # Default Redis connection parameters.  An optional [redis] section in the
    # configuration file (host, port, db, password) overrides them.
    # NOTE(security): the default password is kept in source only so existing
    # installations keep working; prefer setting it in the [redis] section.
    REDIS_HOST = 'localhost'
    REDIS_PORT = 6379
    REDIS_DB = 0
    REDIS_PASSWORD = '24068Seriate'

    def __init__(self, config_path='IOB.cfg'):
        """Load the configuration, then set up state, Redis and logging.

        Args:
            config_path: path of the .cfg file read by load_config().
        """
        self.PROGRAM_NAME = "ReadPar IOB-pi v.3.1.3 (2026)"
        self.MAXRETRY = 10
        self.MAX_COUNTER_BLINK = 10

        # Parsed configuration; populated by load_config().
        self.config = None

        # Hardware pin maps (BOARD numbering, see setup_gpio)
        self._PINS_8 = [11, 12, 13, 15, 16, 18, 22, 7]
        self._PINS_12 = [11, 12, 13, 15, 16, 18, 22, 7, 29, 31, 32, 36]

        # Defaults, overwritten by load_config()
        self.num_params = 12
        self.input_pins = self._PINS_12
        self._init_state_arrays(self.num_params)

        self.load_config(config_path)

        # Control variables
        self.cont = 0
        self.onLine = '1'
        self.sending = False
        self.timer_busy = False
        self.to_enable = False
        self.to_short = self.TIMEOUTSHORT
        self.to_long = self.TIMEOUTLONG
        self.to_retry = self.MAXRETRY

        # Redis
        self.CodaR = redis.Redis(
            host=self.REDIS_HOST,
            port=self.REDIS_PORT,
            db=self.REDIS_DB,
            password=self.REDIS_PASSWORD,
        )
        self.queue_name = 'IOB'

        # Logging
        self.setup_logging()

    def _init_state_arrays(self, size):
        """Allocate the zero-filled per-bit state arrays for `size` inputs."""
        zeros = [0] * size
        self.i_counters = array('i', zeros)
        self.B_blinking = array('B', zeros)
        self.B_previous = array('B', zeros)
        self.B_input = array('B', zeros)
        self.B_output = array('B', zeros)
        self.B_inverting = array('B', zeros)
        self.B_filter = array('B', zeros)
        self.B_filter_prev = array('B', zeros)
        self.B_temp = array('B', zeros)
        self.i_filter_counters = array('i', zeros)

    @staticmethod
    def _timestamp():
        """Return the current UTC time as YYYYmmddHHMMSS plus milliseconds."""
        return datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S%f')[:-3]

    def load_config(self, path):
        """Load configuration parameters from the given .cfg file.

        Sets timing, URL and logging attributes plus the per-bit blinking,
        inversion and filtering settings, and re-allocates the state arrays
        for the configured number of inputs (8 or 12; anything else, or an
        unparsable value, falls back to 12).

        Exits the process with status 1 when the file does not exist.
        Missing mandatory options raise the usual configparser errors.
        """
        if not os.path.exists(path):
            print(f"Error: Config file {path} not found.")
            sys.exit(1)

        config = configparser.RawConfigParser()
        config.read(path)
        self.config = config

        # 1. Determine number of parameters
        try:
            requested = config.getint('id', 'numParams', fallback=12)
        except ValueError:
            requested = 12
        self.num_params = requested if requested in (8, 12) else 12

        # 2. Update hardware pins and arrays based on num_params
        self.input_pins = self._PINS_8 if self.num_params == 8 else self._PINS_12
        self._init_state_arrays(self.num_params)

        # 3. Load other parameters
        self.idxMacchina = config.get('id', 'idxMacchina')
        self.SAMPLETIME = config.getfloat('time', 'SAMPLETIME')
        self.TIMEOUTSHORT = config.getfloat('time', 'TIMEOUTSHORT')
        self.TIMEOUTLONG = config.getfloat('time', 'TIMEOUTLONG')
        self.SENDURLTIME = config.getfloat('time', 'SENDURLTIME')
        self.NMAXSEND = config.getint('time', 'NMAXSEND')
        self.URLBASE = config.get('web', 'URLBASE')
        self.URLENABLED = config.get('web', 'URLENABLED')
        self.URLALIVE = config.get('web', 'URLALIVE')
        self.URLADV1 = config.get('web', 'URLADV1')
        self.LOGFILE = config.get('log', 'LOGFILE')
        self.LOGLEVEL = config.get('log', 'LOGLEVEL')

        # Optional Redis overrides; the fallbacks are the class-level defaults
        # so configuration files without a [redis] section keep working.
        self.REDIS_HOST = config.get('redis', 'host', fallback=self.REDIS_HOST)
        self.REDIS_PORT = config.getint('redis', 'port', fallback=self.REDIS_PORT)
        self.REDIS_DB = config.getint('redis', 'db', fallback=self.REDIS_DB)
        self.REDIS_PASSWORD = config.get(
            'redis', 'password', fallback=self.REDIS_PASSWORD
        )

        # 4. Load bit settings
        for i in range(self.num_params):
            self.B_blinking[i] = config.getint('blink', f'bit{i}')
        self.MAX_COUNTER_BLINK = config.getint('blink', 'MAX_COUNTER_BLINK')

        for i in range(self.num_params):
            self.B_inverting[i] = config.getint('invert', f'bit{i}')
            self.B_filter[i] = config.getint('filter', f'bit{i}')
        self.MAX_COUNTER_FILTER = config.getint('filter', 'MAX_COUNTER_FILTER')

    def _log_level(self):
        """Translate the configured LOGLEVEL into a numeric logging level.

        Accepts a level name in any case (e.g. "info") or a non-negative
        integer; any other value falls back to DEBUG.
        """
        text = str(self.LOGLEVEL).strip()
        if text.isdigit():
            return int(text)
        level = logging.getLevelName(text.upper())
        # getLevelName returns a "Level X" string for unknown names.
        return level if isinstance(level, int) else logging.DEBUG

    def setup_logging(self):
        """Configure file logging and create the three named loggers.

        The log file comes from LOGFILE and the threshold from LOGLEVEL.
        """
        logging.basicConfig(
            level=self._log_level(),
            format='%(asctime)s %(name)-8s %(levelname)-8s %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
            filename=self.LOGFILE,
            filemode='a'
        )
        self.logQue = logging.getLogger('queue')
        self.logSnd = logging.getLogger('sendUrl')
        self.logPro = logging.getLogger('program')

    def setup_gpio(self):
        """Configure every pin in input_pins as a GPIO input.

        Exits the process with status 1 when RPi.GPIO is unavailable or the
        setup fails.
        """
        if GPIO is None:
            self.logPro.error("GPIO library not found. Are you on a Raspberry Pi?")
            sys.exit(1)

        try:
            GPIO.setmode(GPIO.BOARD)
            GPIO.setwarnings(False)
            for pin in self.input_pins:
                GPIO.setup(pin, GPIO.IN)
            self.logPro.info("GPIO initialized successfully.")
        except Exception as e:
            self.logPro.error(f"GPIO Setup Error: {e}")
            sys.exit(1)

    def rqEnqueue(self, item):
        """Append an item to the tail of the Redis queue."""
        self.CodaR.rpush(self.queue_name, item)

    def rqRequeue(self, item):
        """Put an item back at the head of the Redis queue.

        Used when sending fails, so the item is the next one popped and the
        original ordering is preserved.
        """
        self.CodaR.lpush(self.queue_name, item)

    def rqDequeue(self):
        """Pop the head of the Redis queue; return it as str, or None if empty."""
        item = self.CodaR.lpop(self.queue_name)
        return item.decode('utf-8') if item else None

    def rqLen(self):
        """Return the current length of the Redis queue."""
        return self.CodaR.llen(self.queue_name)

    def readParallelaFiltrata(self):
        """Read the inputs, apply inversion/filter/blink logic, return hex.

        Returns:
            The processed inputs packed with bit i = input i, formatted as
            2 upper-case hex digits (8 inputs) or 3 (12 inputs).  An empty
            string is returned, and the error logged, if anything fails.
        """
        try:
            count = self.num_params

            # 1. GPIO read.  Attribute lookups are bound to locals once
            #    because this method runs on every sample.
            read_pin = GPIO.input
            pins = self.input_pins
            inverting = self.B_inverting
            inputs = self.B_input

            for i in range(count):
                # A high pin reads as logical 0 (active-low), then the
                # optional per-bit inversion is applied.
                level = 0 if read_pin(pins[i]) else 1
                inputs[i] = 1 - level if inverting[i] else level

            # 2. Filtering and blinking
            filter_on = self.B_filter
            filter_prev = self.B_filter_prev
            filter_cnt = self.i_filter_counters
            max_filter = self.MAX_COUNTER_FILTER
            temp = self.B_temp
            blinking = self.B_blinking
            previous = self.B_previous
            outputs = self.B_output
            blink_cnt = self.i_counters
            max_blink = self.MAX_COUNTER_BLINK

            for i in range(count):
                if filter_on[i]:
                    curr = inputs[i]
                    if curr != filter_prev[i]:
                        # Edge: arm the counter when idle, cancel it when an
                        # edge arrives while it is already running.
                        filter_cnt[i] = max_filter if filter_cnt[i] == 0 else 0
                        hold = filter_cnt[i] == max_filter
                        filter_prev[i] = curr
                    else:
                        # Stable input: keep holding while the counter runs.
                        hold = filter_cnt[i] > 0
                        if hold:
                            filter_cnt[i] -= 1
                    # While holding, report the opposite (pre-edge) level.
                    temp[i] = curr ^ 1 if hold else curr
                    inputs[i] = temp[i]

                if blinking[i] == 0:
                    outputs[i] = inputs[i]
                elif previous[i] != inputs[i]:
                    previous[i] = inputs[i]
                    if inputs[i] == 1:
                        # Rising edge: raise the output and (re)load the
                        # release counter.
                        outputs[i] = 1
                        blink_cnt[i] = max_blink
                elif inputs[i] == 0 and blink_cnt[i] > 0:
                    # Input steadily low: release the output only after the
                    # counter has run down.
                    blink_cnt[i] -= 1
                    if blink_cnt[i] == 0:
                        outputs[i] = 0

            # 3. Pack the output bits into one integer
            packed = 0
            for i in range(count):
                if outputs[i]:
                    packed |= 1 << i

            # Different output width for 8 and 12 bit configurations
            return f"{packed:02X}" if count == 8 else f"{packed:03X}"

        except Exception as e:
            self.logPro.error(f"Error in readParallelaFiltrata: {e}")
            return ''

    def accoda(self, value):
        """Queue an event as "<UTC timestamp>#<value>#<counter>".

        Queue errors are logged, not raised.
        """
        try:
            self.rqEnqueue(f"{self._timestamp()}#{value}#{self.cont}")
        except Exception as e:
            self.logPro.error(f"QUEUE ERROR: {e}")

    def _check_online(self):
        """Query the alive/enabled URLs and update self.onLine ('1' or '0')."""
        try:
            enabled_ok = False
            alive = requests.get(self.URLALIVE, timeout=5)
            if alive.text == 'OK':
                enabled = requests.get(self.URLENABLED + self.idxMacchina, timeout=5)
                enabled_ok = enabled.text == 'OK'
        except Exception as e:
            self.logPro.error(f"Server Connection Error: {e}")
            self.onLine = '0'
            return

        if enabled_ok:
            if self.onLine == '0':
                self.logPro.info("IOB ONLINE!")
            self.onLine = '1'
        else:
            self.onLine = '0'

    def _send_batch(self):
        """Send up to NMAXSEND queued events to the server.

        An event whose request raises is put back at the head of the queue
        and the batch stops, so it is retried on the next cycle instead of
        being lost.  (If the server did receive a request that then timed
        out, the event may be delivered twice.)
        """
        for _ in range(self.NMAXSEND):
            item = self.rqDequeue()
            if not item:
                break

            parts = item.split("#")
            if len(parts) < 3:
                # Cannot build a URL from it; drop it rather than abort the
                # whole batch.
                self.logSnd.error(f"Malformed queue item dropped: {item!r}")
                continue
            dt_eve, val, cnt = parts[:3]

            dt_curr = self._timestamp()
            url = f"{self.URLBASE}{self.idxMacchina}{self.URLADV1}{val}&dtCurr={dt_curr}&dtEve={dt_eve}&cnt={cnt}"
            try:
                r = requests.get(url, timeout=5)
            except Exception as e:
                self.logSnd.error(f"Send Error: {e}")
                self.rqRequeue(item)
                break
            self.logSnd.info(f"{val} [{cnt}] R:{r.text}")

    def svuotaCoda(self):
        """Drain the Redis queue towards the server (one timer tick).

        Does nothing if a previous call is still running.  When the queue is
        not empty the server is probed first and events are sent only while
        it reports online and enabled.  All errors are logged, not raised.
        """
        if self.timer_busy:
            return
        self.timer_busy = True
        try:
            if self.rqLen() > 0:
                self._check_online()

                if self.onLine == '1' and not self.sending:
                    self.sending = True
                    try:
                        self._send_batch()
                    finally:
                        # Always clear the flag, otherwise an unexpected
                        # error would block sending until to_retry expires.
                        self.sending = False
                elif self.sending:
                    # Safety net: a send is flagged as active; wait a bounded
                    # number of ticks, then force the flag off.
                    if self.to_retry > 0:
                        self.to_retry -= 1
                        self.logPro.info("WAIT active send to complete")
                    else:
                        self.sending = False
                        self.to_retry = self.MAXRETRY
                        self.logPro.info("END WAIT, reset to_retry")
        except Exception as e:
            self.logPro.error(f"svuotaCoda Error: {e}")
        finally:
            self.timer_busy = False

    def run(self):
        """Run the sampling loop until interrupted from the keyboard.

        1. Initializes GPIO.
        2. Starts a daemon thread that calls svuotaCoda() every SENDURLTIME
           seconds.
        3. Samples the inputs every SAMPLETIME seconds, queues every value
           change, and re-queues the current value when the short timeout
           (once after a change) or the long timeout (periodically) expires.

        A failed read (empty value) skips the sample entirely, so an empty
        value is never queued.  GPIO is released on exit.
        """
        self.logPro.info(" ")
        self.logPro.info("-----------------------------")
        self.logPro.info(self.PROGRAM_NAME)
        self.logPro.info(f"v: {self.num_params} bit")
        self.logPro.info("-----------------------------")

        self.setup_gpio()

        # Background thread for queue emptying
        def timer_worker():
            while True:
                self.svuotaCoda()
                time.sleep(self.SENDURLTIME)

        threading.Thread(target=timer_worker, daemon=True).start()

        old_value = ''
        self.logPro.info("Starting main loop")
        self.logPro.info(" ")

        try:
            while True:
                try:
                    time.sleep(self.SAMPLETIME)
                    value = self.readParallelaFiltrata()
                    if value == '':
                        continue

                    if value != old_value:
                        self.logQue.info(f"{value} [{self.cont}]")
                        self.accoda(value)
                        self.cont = (self.cont + 1) % 10000
                        self.to_enable = True
                        self.to_short = self.TIMEOUTSHORT
                        self.to_long = self.TIMEOUTLONG
                        old_value = value

                    # Short timeout: one confirmation send after a change
                    if self.to_enable:
                        self.to_short -= self.SAMPLETIME
                        if self.to_short <= 0:
                            self.logQue.info(f">{value} [{self.cont}]")
                            self.accoda(value)
                            self.to_short = self.TIMEOUTSHORT
                            self.to_enable = False
                            self.to_long = self.TIMEOUTLONG

                    # Long timeout: periodic resend of the current value
                    self.to_long -= self.SAMPLETIME
                    if self.to_long <= 0:
                        self.logQue.info(f">>{value} [{self.cont}]")
                        self.accoda(value)
                        self.to_long = self.TIMEOUTLONG

                except KeyboardInterrupt:
                    self.logPro.info("Keyboard interrupt received. Exiting...")
                    break
                except Exception as e:
                    self.logPro.error(f"Main loop error: {e}")
        finally:
            # Release the GPIO channels claimed by setup_gpio().
            if GPIO is not None:
                GPIO.cleanup()


if __name__ == "__main__":
    app = ReadParallelaIOB()
    app.run()