import os import subprocess import time import threading import logging # --- Mock/Placeholder per le dipendenze esterne e configurazione --- # In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale. # Configura un logger di base per le funzioni di log logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def log_write(log_type, level, message): """Simula la funzione log_write dal tuo script Bash.""" if level == "info": logging.info(f"[{log_type}] {message}") elif level == "warning": logging.warning(f"[{log_type}] {message}") elif level == "error": logging.error(f"[{log_type}] {message}") else: logging.debug(f"[{log_type}] {message}") def message_write(msg_type, message): """Simula la funzione message_write dal tuo script Bash.""" if msg_type == 'info': logging.info(f"[message] INFO: {message}") elif msg_type == 'warning': logging.warning(f"[message] WARNING: {message}") elif msg_type == 'success': logging.info(f"[message] SUCCESS: {message}") def mqtt_status_mock(*args): """Simula la funzione mqtt_status. In un'applicazione reale, chiamerebbe il tuo modulo MQTT.""" log_write("mqtt", "info", f"Simulando mqtt_status con args: {args}") # Variabili di configurazione simulate mock_config = { "EVENT_DIR": "/tmp/piGarden_events", # Assicurati che questa directory esista per i test } # --- Classe EventManager --- class EventManager: def __init__(self, event_dir, log_writer, mqtt_status_func, message_writer_func=None): self.event_dir = event_dir self.log_write = log_writer self.mqtt_status = mqtt_status_func self.message_write = message_writer_func if message_writer_func else log_writer # Fallback if not provided self.CURRENT_EVENT = "" self.CURRENT_EVENT_ALIAS = "" # Assicurati che la directory degli eventi esista per i test os.makedirs(self.event_dir, exist_ok=True) def _build_script_args(self, event, *args): """ Costruisce la lista di argomenti per lo script esterno basandosi sul tipo di evento. """ script_args = [event] # Il primo argomento è sempre il nome dell'evento # Aggiungi il timestamp corrente come ultimo argomento per tutti i casi current_timestamp = str(int(time.time())) if event in ["ev_open_before", "ev_open_after"]: # ALIAS="$2", FORCE="$3" alias = args[0] if len(args) > 0 else "" force = args[1] if len(args) > 1 else "" self.CURRENT_EVENT_ALIAS = alias script_args.extend([alias, force, current_timestamp]) elif event == "ev_open_in_before": # ALIAS="$2", FORCE="$3", MINUTE_START="$4", MINUTE_STOP="$5" alias = args[0] if len(args) > 0 else "" force = args[1] if len(args) > 1 else "" minute_start = args[2] if len(args) > 2 else "" minute_stop = args[3] if len(args) > 3 else "" self.CURRENT_EVENT_ALIAS = alias script_args.extend([alias, force, minute_start, minute_stop, current_timestamp]) elif event == "ev_open_in_after": # ALIAS="$2", FORCE="$3", CRON_START="$4", CRON_STOP="$5" alias = args[0] if len(args) > 0 else "" force = args[1] if len(args) > 1 else "" cron_start = args[2] if len(args) > 2 else "" cron_stop = args[3] if len(args) > 3 else "" self.CURRENT_EVENT_ALIAS = alias script_args.extend([alias, force, cron_start, cron_stop, current_timestamp]) elif event in ["ev_close_before", "ev_close_after"]: # ALIAS="$2" alias = args[0] if len(args) > 0 else "" self.CURRENT_EVENT_ALIAS = alias script_args.extend([alias, current_timestamp]) elif event in ["check_rain_sensor_before", "check_rain_sensor_after", "check_rain_sensor_change"]: # STATE="$2" state = args[0] if len(args) > 0 else "" script_args.extend([state, current_timestamp]) elif event == "check_rain_online_before": # STATE="$2" state = args[0] if len(args) > 0 else "" script_args.extend([state, current_timestamp]) elif event in ["check_rain_online_after", "check_rain_online_change"]: # STATE="$2", WEATHER="$3" state = args[0] if len(args) > 0 else "" weather = args[1] if len(args) > 1 else "" script_args.extend([state, weather, current_timestamp]) elif event in ["init_before", "init_after", "exec_poweroff_before", "exec_poweroff_after", "exec_reboot_before", "exec_reboot_after"]: # Nessun argomento specifico oltre l'evento, ma il Bash script passa $2 come CAUSE (spesso vuoto) # e il timestamp. Qui passiamo solo il timestamp. script_args.extend([current_timestamp]) elif event in ["cron_add_before", "cron_add_after"]: # CRON_TYPE="$2", CRON_ARG="$3", CRON_ELEMENT="$4" cron_type = args[0] if len(args) > 0 else "" cron_arg = args[1] if len(args) > 1 else "" cron_element = args[2] if len(args) > 2 else "" script_args.extend([cron_type, cron_arg, cron_element, current_timestamp]) elif event in ["cron_del_before", "cron_del_after"]: # CRON_TYPE="$2", CRON_ARG="$3" cron_type = args[0] if len(args) > 0 else "" cron_arg = args[1] if len(args) > 1 else "" script_args.extend([cron_type, cron_arg, current_timestamp]) else: # Caso generico # EVENT="$1", CAUSE="$2" cause = args[0] if len(args) > 0 else "" script_args.extend([cause, current_timestamp]) return script_args def trigger_event(self, event, *args): """ Attiva un evento ed esegue gli script associati. :param event: Nome dell'evento da attivare. :param args: Argomenti aggiuntivi da passare agli script dell'evento. :return: Codice di uscita dell'ultimo script eseguito, o 0 se tutto ok. """ self.log_write("event", "info", f"Triggering event: {event} with args: {args}") current_event_dir = os.path.join(self.event_dir, event) return_code = 0 if os.path.isdir(current_event_dir): # Ordina i file per garantire un ordine di esecuzione consistente files_in_dir = sorted(os.listdir(current_event_dir)) for f_name in files_in_dir: script_path = os.path.join(current_event_dir, f_name) # Controlla se è un file eseguibile if os.path.isfile(script_path) and os.access(script_path, os.X_OK): script_args = self._build_script_args(event, *args) self.log_write("event", "info", f"Executing event script: {script_path} with args: {script_args}") try: # Esegue lo script esterno, reindirizzando stdout/stderr a DEVNULL per replicare &> /dev/null result = subprocess.run( [script_path] + script_args, capture_output=True, text=True, check=True, # Solleva CalledProcessError per codici di uscita non zero stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) return_code = result.returncode except subprocess.CalledProcessError as e: return_code = e.returncode self.log_write("event", "error", f"Script evento '{script_path}' fallito con codice {return_code}. " f"Output: {e.stdout.strip()} Errore: {e.stderr.strip()}") # Aggiorna CURRENT_EVENT e chiama mqtt_status in background self.CURRENT_EVENT = event threading.Thread(target=self.mqtt_status, args=(self.CURRENT_EVENT,)).start() self.log_write("event", "error", f"Stop catena di eventi per codice di uscita {return_code} in {script_path}") return return_code # Ferma l'esecuzione degli script successivi except FileNotFoundError: self.log_write("event", "error", f"Script evento non trovato: {script_path}") return_code = 127 # Codice di errore comune per "comando non trovato" break # Ferma l'esecuzione degli script successivi except Exception as e: self.log_write("event", "error", f"Errore generico durante l'esecuzione di {script_path}: {e}") return_code = 1 # Errore generico break # Ferma l'esecuzione degli script successivi else: self.log_write("event", "info", f"Nessuna directory eventi trovata per: {event}") # Aggiorna CURRENT_EVENT e chiama mqtt_status in background, indipendentemente dal successo self.CURRENT_EVENT = event threading.Thread(target=self.mqtt_status, args=(self.CURRENT_EVENT,)).start() return return_code # --- Esempio di utilizzo --- if __name__ == "__main__": print("--- Test EventManager ---") # Crea una directory di test per gli eventi e alcuni script fittizi test_event_dir = mock_config["EVENT_DIR"] os.makedirs(os.path.join(test_event_dir, "test_event"), exist_ok=True) os.makedirs(os.path.join(test_event_dir, "error_event"), exist_ok=True) os.makedirs(os.path.join(test_event_dir, "ev_open_before"), exist_ok=True) # Script fittizio che ha successo with open(os.path.join(test_event_dir, "test_event", "script_successo.sh"), "w") as f: f.write("#!/bin/bash\n") f.write("echo 'Script di successo eseguito per $1'\n") f.write("exit 0\n") os.chmod(os.path.join(test_event_dir, "test_event", "script_successo.sh"), 0o755) # Script fittizio che fallisce with open(os.path.join(test_event_dir, "error_event", "script_fallimento.sh"), "w") as f: f.write("#!/bin/bash\n") f.write("echo 'Script di fallimento eseguito per $1'\n") f.write("exit 10\n") # Codice di uscita non zero os.chmod(os.path.join(test_event_dir, "error_event", "script_fallimento.sh"), 0o755) # Script fittizio per ev_open_before with open(os.path.join(test_event_dir, "ev_open_before", "log_open_before.sh"), "w") as f: f.write("#!/bin/bash\n") f.write("echo 'ev_open_before: Evento=$1, Alias=$2, Force=$3, Timestamp=$4' >> /tmp/event_test.log\n") f.write("exit 0\n") os.chmod(os.path.join(test_event_dir, "ev_open_before", "log_open_before.sh"), 0o755) event_manager = EventManager( event_dir=mock_config["EVENT_DIR"], log_writer=log_write, mqtt_status_func=mqtt_status_mock, message_writer_func=message_write ) print("\n--- Esecuzione di un evento di successo ---") result = event_manager.trigger_event("test_event", "some_cause") print(f"Codice di ritorno: {result}") print(f"CURRENT_EVENT: {event_manager.CURRENT_EVENT}") print(f"CURRENT_EVENT_ALIAS: {event_manager.CURRENT_EVENT_ALIAS}") print("\n--- Esecuzione di un evento che fallisce ---") result = event_manager.trigger_event("error_event", "another_cause") print(f"Codice di ritorno: {result}") print(f"CURRENT_EVENT: {event_manager.CURRENT_EVENT}") print(f"CURRENT_EVENT_ALIAS: {event_manager.CURRENT_EVENT_ALIAS}") print("\n--- Esecuzione di un evento specifico (ev_open_before) ---") result = event_manager.trigger_event("ev_open_before", "Zona_1", "true") print(f"Codice di ritorno: {result}") print(f"CURRENT_EVENT: {event_manager.CURRENT_EVENT}") print(f"CURRENT_EVENT_ALIAS: {event_manager.CURRENT_EVENT_ALIAS}") print("Controlla /tmp/event_test.log per l'output dello script.") print("\n--- Test completato ---") # Pulisci le directory di test (opzionale) # import shutil # shutil.rmtree(test_event_dir)