import os import re import logging import subprocess # Per eseguire comandi esterni come 'gpio' se necessario # --- Mock/Placeholder per le dipendenze esterne e configurazione --- # In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale. # Configura un logger di base per le funzioni di log logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def log_write(log_type, level, message): """Simula la funzione log_write dal tuo script Bash.""" if level == "info": logging.info(f"[{log_type}] {message}") elif level == "warning": logging.warning(f"[{log_type}] {message}") elif level == "error": logging.error(f"[{log_type}] {message}") else: logging.debug(f"[{log_type}] {message}") def message_write(msg_type, message): """Simula la funzione message_write dal tuo script Bash.""" if msg_type == 'info': logging.info(f"[message] INFO: {message}") elif msg_type == 'warning': logging.warning(f"[message] WARNING: {message}") elif msg_type == 'success': logging.info(f"[message] SUCCESS: {message}") # Variabili di configurazione simulate (dovrebbero venire dal piGarden.conf) # Queste saranno passate alla classe DriverManager mock_config = { "EV_TOTAL": 6, "SUPPLY_GPIO_1": 2, "SUPPLY_GPIO_2": 3, "RAIN_GPIO": 25, "WEATHER_SERVICE": "drv:openweathermap", # Esempio di driver di servizio meteo "RELE_GPIO_CLOSE": 0, "RELE_GPIO_OPEN": 1, "SUPPLY_GPIO_POS": 0, "SUPPLY_GPIO_NEG": 1, "GPIO": "/usr/local/bin/gpio", # Percorso al comando gpio (wiringPi) "CUT": "/usr/bin/cut", # Percorso al comando cut "LOG_OUTPUT_DRV_FILE": "/tmp/piGarden.drv.log", # Percorso per il log dei driver # Elettrovalvole di esempio per setup_drv "EV1_GPIO": "17", "EV2_GPIO": "drv:custom_rele", # Esempio di un GPIO gestito da un driver custom "EV3_GPIO": "22", "EV4_GPIO": "18", "EV5_GPIO": "23", "EV6_GPIO": "24", } # --- Classe DriverManager --- class DriverManager: def __init__(self, config, log_writer, message_writer): self.config = config self.log_write = log_writer self.message_write = message_writer self.list_drv = [] # Lista dei driver attivi rilevati # Percorsi degli strumenti esterni (dal config) self.gpio_cmd = self.config.get("GPIO") self.cut_cmd = self.config.get("CUT") self.log_output_drv_file = self.config.get("LOG_OUTPUT_DRV_FILE") # Inizializza il file di log dei driver se non esiste if not os.path.exists(self.log_output_drv_file): open(self.log_output_drv_file, 'a').close() # Configura un logger specifico per l'output dei driver, come nello script Bash self.drv_logger = logging.getLogger('driver_output') self.drv_logger.setLevel(logging.INFO) # Rimuovi handler esistenti per evitare duplicati se chiamato più volte if not self.drv_logger.handlers: drv_handler = logging.FileHandler(self.log_output_drv_file, mode='a') drv_formatter = logging.Formatter('%(asctime)s %(message)s') drv_handler.setFormatter(drv_formatter) self.drv_logger.addHandler(drv_handler) self.drv_logger.propagate = False # Evita che i log vadano al logger root # Placeholder per le funzioni GPIO dirette (sostituire con RPi.GPIO o gpiozero) # Esempio con subprocess per il comando 'gpio' (meno Pythonico ma più fedele al Bash) self._gpio_write = lambda gpio_id, value: self._run_gpio_command("write", gpio_id, value) self._gpio_mode = lambda gpio_id, mode: self._run_gpio_command("mode", gpio_id, mode) self._gpio_read = lambda gpio_id: self._run_gpio_command("read", gpio_id) # Inizializza i driver al momento della creazione dell'istanza self.setup_drv() def _run_gpio_command(self, action, gpio_id, value=None): """Esegue un comando 'gpio' tramite subprocess.""" cmd = [self.gpio_cmd, "-g", action, str(gpio_id)] if value is not None: cmd.append(str(value)) try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) self.drv_logger.info(f"GPIO command '{' '.join(cmd)}' output: {result.stdout.strip()}") return result.stdout.strip() except subprocess.CalledProcessError as e: self.log_write("drv", "error", f"Errore GPIO command '{' '.join(cmd)}': {e.stderr.strip()}") self.message_write("warning", f"Errore GPIO: {e.stderr.strip()}") return None # O solleva un'eccezione def setup_drv(self): """ Funzione eseguita ad ogni avvio, include i driver e lancia le funzioni di setup. """ self.list_drv = [] # Azzera la lista dei driver # Raccoglie i nomi dei driver utilizzati per le elettrovalvole ev_total = self.config.get("EV_TOTAL", 0) for i in range(1, ev_total + 1): gpio_val = self.config.get(f"EV{i}_GPIO", "") if gpio_val.startswith("drv:"): drv = gpio_val.split(":")[1] if drv not in self.list_drv: self.list_drv.append(drv) # Raccoglie i nomi dei driver utilizzati per gli altri gpio e servizi for key in ["SUPPLY_GPIO_1", "SUPPLY_GPIO_2", "RAIN_GPIO", "WEATHER_SERVICE"]: gpio_val = self.config.get(key, "") if isinstance(gpio_val, str) and gpio_val.startswith("drv:"): drv = gpio_val.split(":")[1] if drv not in self.list_drv: self.list_drv.append(drv) # Simula l'inclusione dei file dei driver e l'esecuzione della funzione di setup for drv in self.list_drv: # In un'applicazione reale, qui potresti caricare moduli Python specifici # per ogni driver o chiamare metodi dedicati. # Per ora, simuliamo la chiamata a drv__setup setup_func_name = f"drv_{drv}_setup" if hasattr(self, setup_func_name) and callable(getattr(self, setup_func_name)): self.drv_logger.info(f"{setup_func_name}") try: getattr(self, setup_func_name)() except Exception as e: self.log_write("drv", "error", f"Errore in {setup_func_name}: {e}") else: self.drv_logger.info(f"Nessuna funzione di setup trovata per driver: {drv}") def get_driver_callback(self, function_name, driver_id): """ Restituisce il nome del metodo interno da richiamare per una specifica funzione del driver. """ if isinstance(driver_id, str) and driver_id.startswith("drv:"): drv = driver_id.split(":")[1] if drv not in self.list_drv: return "drvnotfound" return f"drv_{drv}_{function_name}" return None # Nessun driver specifico, useremo il GPIO diretto # --- Implementazioni delle funzioni drv_* --- # Esempio di un driver custom (simulato) def drv_custom_rele_rele_init(self, gpio_id): self.drv_logger.info(f"Custom Relè Driver: Inizializzazione {gpio_id}") # Logica specifica per il relè custom # Esempio: self.custom_rele_board.init(gpio_id) return True def drv_custom_rele_rele_close(self, gpio_id): self.drv_logger.info(f"Custom Relè Driver: Chiusura {gpio_id}") # Logica specifica per il relè custom # Esempio: self.custom_rele_board.set_state(gpio_id, 'closed') return True def drv_custom_rele_rele_open(self, gpio_id): self.drv_logger.info(f"Custom Relè Driver: Apertura {gpio_id}") # Logica specifica per il relè custom # Esempio: self.custom_rele_board.set_state(gpio_id, 'open') return True def drv_openweathermap_rain_online_get(self, driver_id): self.drv_logger.info(f"OpenWeatherMap Driver: Recupero dati meteo online per {driver_id}") # Qui faresti una chiamata API reale a OpenWeatherMap # Esempio: # import requests # api_key = self.config.get("OPENWEATHERMAP_KEY") # location = self.config.get("OPENWEATHERMAP_LOCATION") # url = f"http://api.openweathermap.org/data/2.5/weather?{location}&appid={api_key}" # try: # response = requests.get(url) # response.raise_for_status() # Solleva un'eccezione per errori HTTP # data = response.json() # # Estrai lo stato della pioggia da 'data' # if 'rain' in data and data['rain']: # return "1" # Indica pioggia # return "0" # Nessuna pioggia # except requests.exceptions.RequestException as e: # self.log_write("drv", "error", f"Errore OpenWeatherMap API: {e}") # self.message_write("warning", "Errore servizio meteo online") # return "" return "0" # Mock return def drv_rele_init(self, gpio_id): """Inizializza un relè e lo porta nello stato aperto.""" fnc_name = self.get_driver_callback("rele_init", gpio_id) if fnc_name is None: # Nessun driver specifico, usa GPIO diretto self._gpio_write(gpio_id, self.config.get("RELE_GPIO_OPEN")) self._gpio_mode(gpio_id, "out") elif fnc_name == "drvnotfound": self.log_write("drv", "error", f"Driver non trovato: {gpio_id}") self.message_write("warning", f"Driver non trovato: {gpio_id}") else: # Chiama la funzione del driver dinamico if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)): self.drv_logger.info(f"{fnc_name} arg:{gpio_id}") try: getattr(self, fnc_name)(gpio_id) except Exception as e: self.log_write("drv", "error", f"Errore in {fnc_name}: {e}") else: self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.") self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.") def drv_rele_close(self, gpio_id): """Chiude un relè.""" fnc_name = self.get_driver_callback("rele_close", gpio_id) if fnc_name is None: self._gpio_write(gpio_id, self.config.get("RELE_GPIO_CLOSE")) elif fnc_name == "drvnotfound": self.log_write("drv", "error", f"Driver non trovato: {gpio_id}") self.message_write("warning", f"Driver non trovato: {gpio_id}") return False # Fallimento else: if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)): self.drv_logger.info(f"{fnc_name} arg:{gpio_id}") try: getattr(self, fnc_name)(gpio_id) except Exception as e: self.log_write("drv", "error", f"Errore in {fnc_name}: {e}") return False else: self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.") self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.") return False return True # Successo def drv_rele_open(self, gpio_id): """Apre un relè.""" fnc_name = self.get_driver_callback("rele_open", gpio_id) if fnc_name is None: self._gpio_write(gpio_id, self.config.get("RELE_GPIO_OPEN")) elif fnc_name == "drvnotfound": self.log_write("drv", "error", f"Driver non trovato: {gpio_id}") self.message_write("warning", f"Driver non trovato: {gpio_id}") return False else: if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)): self.drv_logger.info(f"{fnc_name} arg:{gpio_id}") try: getattr(self, fnc_name)(gpio_id) except Exception as e: self.log_write("drv", "error", f"Errore in {fnc_name}: {e}") return False else: self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.") self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.") return False return True def drv_supply_bistable_init(self, idx1, idx2): """Inizializza i relè che gestiscono l'alimentazione per le valvole bistabili.""" fnc1_name = self.get_driver_callback("supply_bistable_init", idx1) fnc2_name = self.get_driver_callback("supply_bistable_init", idx2) if fnc1_name is None: self._gpio_write(idx1, 0) self._gpio_mode(idx1, "out") elif fnc1_name == "drvnotfound": self.log_write("drv", "error", f"Driver non trovato: {idx1}") self.message_write("warning", f"Driver non trovato: {idx1}") return else: if hasattr(self, fnc1_name) and callable(getattr(self, fnc1_name)): self.drv_logger.info(f"{fnc1_name} arg:{idx1}") try: getattr(self, fnc1_name)(idx1) except Exception as e: self.log_write("drv", "error", f"Errore in {fnc1_name}: {e}") else: self.log_write("drv", "error", f"Funzione driver '{fnc1_name}' non implementata.") self.message_write("warning", f"Funzione driver '{fnc1_name}' non implementata.") if fnc2_name is None: self._gpio_write(idx2, 0) self._gpio_mode(idx2, "out") elif fnc2_name == "drvnotfound": self.log_write("drv", "error", f"Driver non trovato: {idx2}") self.message_write("warning", f"Driver non trovato: {idx2}") else: if hasattr(self, fnc2_name) and callable(getattr(self, fnc2_name)): self.drv_logger.info(f"{fnc2_name} arg:{idx2}") try: getattr(self, fnc2_name)(idx2) except Exception as e: self.log_write("drv", "error", f"Errore in {fnc2_name}: {e}") else: self.log_write("drv", "error", f"Funzione driver '{fnc2_name}' non implementata.") self.message_write("warning", f"Funzione driver '{fnc2_name}' non implementata.") def drv_supply_positive(self, idx1, idx2): """Imposta la tensione positiva per le elettrovalvole bistabili.""" fnc1_name = self.get_driver_callback("supply_positive", idx1) fnc2_name = self.get_driver_callback("supply_positive", idx2) if fnc1_name is None: self._gpio_write(idx1, self.config.get("SUPPLY_GPIO_POS")) elif fnc1_name == "drvnotfound": self.log_write("drv", "error", f"Driver non trovato: {idx1}") self.message_write("warning", f"Driver non trovato: {idx1}") return else: if hasattr(self, fnc1_name) and callable(getattr(self, fnc1_name)): self.drv_logger.info(f"{fnc1_name} arg:{idx1}") try: getattr(self, fnc1_name)(idx1) except Exception as e: self.log_write("drv", "error", f"Errore in {fnc1_name}: {e}") else: self.log_write("drv", "error", f"Funzione driver '{fnc1_name}' non implementata.") self.message_write("warning", f"Funzione driver '{fnc1_name}' non implementata.") if fnc2_name is None: self._gpio_write(idx2, self.config.get("SUPPLY_GPIO_POS")) elif fnc2_name == "drvnotfound": self.log_write("drv", "error", f"Driver non trovato: {idx2}") self.message_write("warning", f"Driver non trovato: {idx2}") else: if hasattr(self, fnc2_name) and callable(getattr(self, fnc2_name)): self.drv_logger.info(f"{fnc2_name} arg:{idx2}") try: getattr(self, fnc2_name)(idx2) except Exception as e: self.log_write("drv", "error", f"Errore in {fnc2_name}: {e}") else: self.log_write("drv", "error", f"Funzione driver '{fnc2_name}' non implementata.") self.message_write("warning", f"Funzione driver '{fnc2_name}' non implementata.") def drv_supply_negative(self, idx1, idx2): """Imposta la tensione negativa per le elettrovalvole bistabili.""" fnc1_name = self.get_driver_callback("supply_negative", idx1) fnc2_name = self.get_driver_callback("supply_negative", idx2) if fnc1_name is None: self._gpio_write(idx1, self.config.get("SUPPLY_GPIO_NEG")) elif fnc1_name == "drvnotfound": self.log_write("drv", "error", f"Driver non trovato: {idx1}") self.message_write("warning", f"Driver non trovato: {idx1}") return else: if hasattr(self, fnc1_name) and callable(getattr(self, fnc1_name)): self.drv_logger.info(f"{fnc1_name} arg:{idx1}") try: getattr(self, fnc1_name)(idx1) except Exception as e: self.log_write("drv", "error", f"Errore in {fnc1_name}: {e}") else: self.log_write("drv", "error", f"Funzione driver '{fnc1_name}' non implementata.") self.message_write("warning", f"Funzione driver '{fnc1_name}' non implementata.") if fnc2_name is None: self._gpio_write(idx2, self.config.get("SUPPLY_GPIO_NEG")) elif fnc2_name == "drvnotfound": self.log_write("drv", "error", f"Driver non trovato: {idx2}") self.message_write("warning", f"Driver non trovato: {idx2}") else: if hasattr(self, fnc2_name) and callable(getattr(self, fnc2_name)): self.drv_logger.info(f"{fnc2_name} arg:{idx2}") try: getattr(self, fnc2_name)(idx2) except Exception as e: self.log_write("drv", "error", f"Errore in {fnc2_name}: {e}") else: self.log_write("drv", "error", f"Funzione driver '{fnc2_name}' non implementata.") self.message_write("warning", f"Funzione driver '{fnc2_name}' non implementata.") def drv_rain_sensor_init(self, gpio_id): """Inizializza il sensore della pioggia.""" fnc_name = self.get_driver_callback("rain_sensor_init", gpio_id) if fnc_name is None: self._gpio_mode(gpio_id, "in") elif fnc_name == "drvnotfound": self.log_write("drv", "error", f"Driver non trovato: {gpio_id}") self.message_write("warning", f"Driver non trovato: {gpio_id}") else: if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)): self.drv_logger.info(f"{fnc_name} arg:{gpio_id}") try: getattr(self, fnc_name)(gpio_id) except Exception as e: self.log_write("drv", "error", f"Errore in {fnc_name}: {e}") else: self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.") self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.") def drv_rain_sensor_get(self, gpio_id): """Legge lo stato del sensore della pioggia.""" fnc_name = self.get_driver_callback("rain_sensor_get", gpio_id) vret = "" if fnc_name is None: vret = self._gpio_read(gpio_id) elif fnc_name == "drvnotfound": self.log_write("drv", "error", f"Driver non trovato: {gpio_id}") self.message_write("warning", f"Driver non trovato: {gpio_id}") else: if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)): self.drv_logger.info(f"{fnc_name} arg:{gpio_id}") try: vret = getattr(self, fnc_name)(gpio_id) except Exception as e: self.log_write("drv", "error", f"Errore in {fnc_name}: {e}") else: self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.") self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.") return vret def drv_rain_online_get(self, driver_id): """Legge lo stato delle condizioni meteo dal servizio online.""" fnc_name = self.get_driver_callback("rain_online_get", driver_id) vret = "" if fnc_name is None or fnc_name == "drvnotfound": self.log_write("drv", "error", f"Driver non trovato o non specificato per il servizio meteo: {driver_id}") self.message_write("warning", f"Driver non trovato per il servizio meteo: {driver_id}") else: if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)): self.drv_logger.info(f"{fnc_name} arg:{driver_id}") try: vret = getattr(self, fnc_name)(driver_id) except Exception as e: self.log_write("drv", "error", f"Errore in {fnc_name}: {e}") else: self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.") self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.") return vret # --- Esempio di utilizzo --- if __name__ == "__main__": print("--- Test DriverManager ---") # Inizializza il DriverManager con la configurazione mock e le funzioni di log/messaggio driver_manager = DriverManager(mock_config, log_write, message_write) print("\n--- Test setup_drv (eseguito all'inizializzazione) ---") print(f"Driver rilevati: {driver_manager.list_drv}") print("\n--- Test drv_rele_init (GPIO diretto) ---") driver_manager.drv_rele_init("17") # EV1_GPIO = 17 print("\n--- Test drv_rele_close (GPIO diretto) ---") driver_manager.drv_rele_close("17") print("\n--- Test drv_rele_open (GPIO diretto) ---") driver_manager.drv_rele_open("17") print("\n--- Test drv_rele_init (Custom Driver) ---") driver_manager.drv_rele_init("drv:custom_rele") # EV2_GPIO = drv:custom_rele print("\n--- Test drv_supply_positive ---") driver_manager.drv_supply_positive(mock_config["SUPPLY_GPIO_1"], mock_config["SUPPLY_GPIO_2"]) print("\n--- Test drv_rain_sensor_init ---") driver_manager.drv_rain_sensor_init(mock_config["RAIN_GPIO"]) print("\n--- Test drv_rain_sensor_get ---") rain_sensor_state = driver_manager.drv_rain_sensor_get(mock_config["RAIN_GPIO"]) print(f"Stato sensore pioggia: {rain_sensor_state}") print("\n--- Test drv_rain_online_get (OpenWeatherMap Driver) ---") online_rain_state = driver_manager.drv_rain_online_get(mock_config["WEATHER_SERVICE"]) print(f"Stato pioggia online: {online_rain_state}") print("\n--- Test completato ---") print(f"Controlla il file di log dei driver: {driver_manager.log_output_drv_file}")