From 390dd41dac4cf6f6aba54a391dcbe6ab686230f3 Mon Sep 17 00:00:00 2001 From: roberto Date: Wed, 9 Jul 2025 18:50:48 +0200 Subject: [PATCH] event_include.py --- Python/event_include.py | 255 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 255 insertions(+) create mode 100644 Python/event_include.py diff --git a/Python/event_include.py b/Python/event_include.py new file mode 100644 index 0000000..6e2a26f --- /dev/null +++ b/Python/event_include.py @@ -0,0 +1,255 @@ +import os +import subprocess +import time +import threading +import logging + +# --- Mock/Placeholder per le dipendenze esterne e configurazione --- +# In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale. + +# Configura un logger di base per le funzioni di log +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +def log_write(log_type, level, message): + """Simula la funzione log_write dal tuo script Bash.""" + if level == "info": + logging.info(f"[{log_type}] {message}") + elif level == "warning": + logging.warning(f"[{log_type}] {message}") + elif level == "error": + logging.error(f"[{log_type}] {message}") + else: + logging.debug(f"[{log_type}] {message}") + +def message_write(msg_type, message): + """Simula la funzione message_write dal tuo script Bash.""" + if msg_type == 'info': + logging.info(f"[message] INFO: {message}") + elif msg_type == 'warning': + logging.warning(f"[message] WARNING: {message}") + elif msg_type == 'success': + logging.info(f"[message] SUCCESS: {message}") + +def mqtt_status_mock(*args): + """Simula la funzione mqtt_status. In un'applicazione reale, chiamerebbe il tuo modulo MQTT.""" + log_write("mqtt", "info", f"Simulando mqtt_status con args: {args}") + +# Variabili di configurazione simulate +mock_config = { + "EVENT_DIR": "/tmp/piGarden_events", # Assicurati che questa directory esista per i test +} + +# --- Classe EventManager --- + +class EventManager: + def __init__(self, event_dir, log_writer, mqtt_status_func, message_writer_func=None): + self.event_dir = event_dir + self.log_write = log_writer + self.mqtt_status = mqtt_status_func + self.message_write = message_writer_func if message_writer_func else log_writer # Fallback if not provided + + self.CURRENT_EVENT = "" + self.CURRENT_EVENT_ALIAS = "" + + # Assicurati che la directory degli eventi esista per i test + os.makedirs(self.event_dir, exist_ok=True) + + def _build_script_args(self, event, *args): + """ + Costruisce la lista di argomenti per lo script esterno basandosi sul tipo di evento. + """ + script_args = [event] # Il primo argomento è sempre il nome dell'evento + + # Aggiungi il timestamp corrente come ultimo argomento per tutti i casi + current_timestamp = str(int(time.time())) + + if event in ["ev_open_before", "ev_open_after"]: + # ALIAS="$2", FORCE="$3" + alias = args[0] if len(args) > 0 else "" + force = args[1] if len(args) > 1 else "" + self.CURRENT_EVENT_ALIAS = alias + script_args.extend([alias, force, current_timestamp]) + elif event == "ev_open_in_before": + # ALIAS="$2", FORCE="$3", MINUTE_START="$4", MINUTE_STOP="$5" + alias = args[0] if len(args) > 0 else "" + force = args[1] if len(args) > 1 else "" + minute_start = args[2] if len(args) > 2 else "" + minute_stop = args[3] if len(args) > 3 else "" + self.CURRENT_EVENT_ALIAS = alias + script_args.extend([alias, force, minute_start, minute_stop, current_timestamp]) + elif event == "ev_open_in_after": + # ALIAS="$2", FORCE="$3", CRON_START="$4", CRON_STOP="$5" + alias = args[0] if len(args) > 0 else "" + force = args[1] if len(args) > 1 else "" + cron_start = args[2] if len(args) > 2 else "" + cron_stop = args[3] if len(args) > 3 else "" + self.CURRENT_EVENT_ALIAS = alias + script_args.extend([alias, force, cron_start, cron_stop, current_timestamp]) + elif event in ["ev_close_before", "ev_close_after"]: + # ALIAS="$2" + alias = args[0] if len(args) > 0 else "" + self.CURRENT_EVENT_ALIAS = alias + script_args.extend([alias, current_timestamp]) + elif event in ["check_rain_sensor_before", "check_rain_sensor_after", "check_rain_sensor_change"]: + # STATE="$2" + state = args[0] if len(args) > 0 else "" + script_args.extend([state, current_timestamp]) + elif event == "check_rain_online_before": + # STATE="$2" + state = args[0] if len(args) > 0 else "" + script_args.extend([state, current_timestamp]) + elif event in ["check_rain_online_after", "check_rain_online_change"]: + # STATE="$2", WEATHER="$3" + state = args[0] if len(args) > 0 else "" + weather = args[1] if len(args) > 1 else "" + script_args.extend([state, weather, current_timestamp]) + elif event in ["init_before", "init_after", "exec_poweroff_before", "exec_poweroff_after", + "exec_reboot_before", "exec_reboot_after"]: + # Nessun argomento specifico oltre l'evento, ma il Bash script passa $2 come CAUSE (spesso vuoto) + # e il timestamp. Qui passiamo solo il timestamp. + script_args.extend([current_timestamp]) + elif event in ["cron_add_before", "cron_add_after"]: + # CRON_TYPE="$2", CRON_ARG="$3", CRON_ELEMENT="$4" + cron_type = args[0] if len(args) > 0 else "" + cron_arg = args[1] if len(args) > 1 else "" + cron_element = args[2] if len(args) > 2 else "" + script_args.extend([cron_type, cron_arg, cron_element, current_timestamp]) + elif event in ["cron_del_before", "cron_del_after"]: + # CRON_TYPE="$2", CRON_ARG="$3" + cron_type = args[0] if len(args) > 0 else "" + cron_arg = args[1] if len(args) > 1 else "" + script_args.extend([cron_type, cron_arg, current_timestamp]) + else: # Caso generico + # EVENT="$1", CAUSE="$2" + cause = args[0] if len(args) > 0 else "" + script_args.extend([cause, current_timestamp]) + + return script_args + + def trigger_event(self, event, *args): + """ + Attiva un evento ed esegue gli script associati. + :param event: Nome dell'evento da attivare. + :param args: Argomenti aggiuntivi da passare agli script dell'evento. + :return: Codice di uscita dell'ultimo script eseguito, o 0 se tutto ok. + """ + self.log_write("event", "info", f"Triggering event: {event} with args: {args}") + + current_event_dir = os.path.join(self.event_dir, event) + return_code = 0 + + if os.path.isdir(current_event_dir): + # Ordina i file per garantire un ordine di esecuzione consistente + files_in_dir = sorted(os.listdir(current_event_dir)) + for f_name in files_in_dir: + script_path = os.path.join(current_event_dir, f_name) + + # Controlla se è un file eseguibile + if os.path.isfile(script_path) and os.access(script_path, os.X_OK): + script_args = self._build_script_args(event, *args) + + self.log_write("event", "info", f"Executing event script: {script_path} with args: {script_args}") + try: + # Esegue lo script esterno, reindirizzando stdout/stderr a DEVNULL per replicare &> /dev/null + result = subprocess.run( + [script_path] + script_args, + capture_output=True, + text=True, + check=True, # Solleva CalledProcessError per codici di uscita non zero + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) + return_code = result.returncode + except subprocess.CalledProcessError as e: + return_code = e.returncode + self.log_write("event", "error", + f"Script evento '{script_path}' fallito con codice {return_code}. " + f"Output: {e.stdout.strip()} Errore: {e.stderr.strip()}") + + # Aggiorna CURRENT_EVENT e chiama mqtt_status in background + self.CURRENT_EVENT = event + threading.Thread(target=self.mqtt_status, args=(self.CURRENT_EVENT,)).start() + + self.log_write("event", "error", + f"Stop catena di eventi per codice di uscita {return_code} in {script_path}") + return return_code # Ferma l'esecuzione degli script successivi + + except FileNotFoundError: + self.log_write("event", "error", f"Script evento non trovato: {script_path}") + return_code = 127 # Codice di errore comune per "comando non trovato" + break # Ferma l'esecuzione degli script successivi + except Exception as e: + self.log_write("event", "error", f"Errore generico durante l'esecuzione di {script_path}: {e}") + return_code = 1 # Errore generico + break # Ferma l'esecuzione degli script successivi + else: + self.log_write("event", "info", f"Nessuna directory eventi trovata per: {event}") + + # Aggiorna CURRENT_EVENT e chiama mqtt_status in background, indipendentemente dal successo + self.CURRENT_EVENT = event + threading.Thread(target=self.mqtt_status, args=(self.CURRENT_EVENT,)).start() + + return return_code + +# --- Esempio di utilizzo --- +if __name__ == "__main__": + print("--- Test EventManager ---") + + # Crea una directory di test per gli eventi e alcuni script fittizi + test_event_dir = mock_config["EVENT_DIR"] + os.makedirs(os.path.join(test_event_dir, "test_event"), exist_ok=True) + os.makedirs(os.path.join(test_event_dir, "error_event"), exist_ok=True) + os.makedirs(os.path.join(test_event_dir, "ev_open_before"), exist_ok=True) + + # Script fittizio che ha successo + with open(os.path.join(test_event_dir, "test_event", "script_successo.sh"), "w") as f: + f.write("#!/bin/bash\n") + f.write("echo 'Script di successo eseguito per $1'\n") + f.write("exit 0\n") + os.chmod(os.path.join(test_event_dir, "test_event", "script_successo.sh"), 0o755) + + # Script fittizio che fallisce + with open(os.path.join(test_event_dir, "error_event", "script_fallimento.sh"), "w") as f: + f.write("#!/bin/bash\n") + f.write("echo 'Script di fallimento eseguito per $1'\n") + f.write("exit 10\n") # Codice di uscita non zero + os.chmod(os.path.join(test_event_dir, "error_event", "script_fallimento.sh"), 0o755) + + # Script fittizio per ev_open_before + with open(os.path.join(test_event_dir, "ev_open_before", "log_open_before.sh"), "w") as f: + f.write("#!/bin/bash\n") + f.write("echo 'ev_open_before: Evento=$1, Alias=$2, Force=$3, Timestamp=$4' >> /tmp/event_test.log\n") + f.write("exit 0\n") + os.chmod(os.path.join(test_event_dir, "ev_open_before", "log_open_before.sh"), 0o755) + + + event_manager = EventManager( + event_dir=mock_config["EVENT_DIR"], + log_writer=log_write, + mqtt_status_func=mqtt_status_mock, + message_writer_func=message_write + ) + + print("\n--- Esecuzione di un evento di successo ---") + result = event_manager.trigger_event("test_event", "some_cause") + print(f"Codice di ritorno: {result}") + print(f"CURRENT_EVENT: {event_manager.CURRENT_EVENT}") + print(f"CURRENT_EVENT_ALIAS: {event_manager.CURRENT_EVENT_ALIAS}") + + print("\n--- Esecuzione di un evento che fallisce ---") + result = event_manager.trigger_event("error_event", "another_cause") + print(f"Codice di ritorno: {result}") + print(f"CURRENT_EVENT: {event_manager.CURRENT_EVENT}") + print(f"CURRENT_EVENT_ALIAS: {event_manager.CURRENT_EVENT_ALIAS}") + + print("\n--- Esecuzione di un evento specifico (ev_open_before) ---") + result = event_manager.trigger_event("ev_open_before", "Zona_1", "true") + print(f"Codice di ritorno: {result}") + print(f"CURRENT_EVENT: {event_manager.CURRENT_EVENT}") + print(f"CURRENT_EVENT_ALIAS: {event_manager.CURRENT_EVENT_ALIAS}") + print("Controlla /tmp/event_test.log per l'output dello script.") + + print("\n--- Test completato ---") + # Pulisci le directory di test (opzionale) + # import shutil + # shutil.rmtree(test_event_dir)