roberto-patch-1 #2
255
Python/event_include.py
Normal file
255
Python/event_include.py
Normal file
@@ -0,0 +1,255 @@
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
import threading
|
||||
import logging
|
||||
|
||||
# --- Mock/Placeholder per le dipendenze esterne e configurazione ---
|
||||
# In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale.
|
||||
|
||||
# Configura un logger di base per le funzioni di log
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def log_write(log_type, level, message):
|
||||
"""Simula la funzione log_write dal tuo script Bash."""
|
||||
if level == "info":
|
||||
logging.info(f"[{log_type}] {message}")
|
||||
elif level == "warning":
|
||||
logging.warning(f"[{log_type}] {message}")
|
||||
elif level == "error":
|
||||
logging.error(f"[{log_type}] {message}")
|
||||
else:
|
||||
logging.debug(f"[{log_type}] {message}")
|
||||
|
||||
def message_write(msg_type, message):
|
||||
"""Simula la funzione message_write dal tuo script Bash."""
|
||||
if msg_type == 'info':
|
||||
logging.info(f"[message] INFO: {message}")
|
||||
elif msg_type == 'warning':
|
||||
logging.warning(f"[message] WARNING: {message}")
|
||||
elif msg_type == 'success':
|
||||
logging.info(f"[message] SUCCESS: {message}")
|
||||
|
||||
def mqtt_status_mock(*args):
|
||||
"""Simula la funzione mqtt_status. In un'applicazione reale, chiamerebbe il tuo modulo MQTT."""
|
||||
log_write("mqtt", "info", f"Simulando mqtt_status con args: {args}")
|
||||
|
||||
# Variabili di configurazione simulate
|
||||
mock_config = {
|
||||
"EVENT_DIR": "/tmp/piGarden_events", # Assicurati che questa directory esista per i test
|
||||
}
|
||||
|
||||
# --- Classe EventManager ---
|
||||
|
||||
class EventManager:
|
||||
def __init__(self, event_dir, log_writer, mqtt_status_func, message_writer_func=None):
|
||||
self.event_dir = event_dir
|
||||
self.log_write = log_writer
|
||||
self.mqtt_status = mqtt_status_func
|
||||
self.message_write = message_writer_func if message_writer_func else log_writer # Fallback if not provided
|
||||
|
||||
self.CURRENT_EVENT = ""
|
||||
self.CURRENT_EVENT_ALIAS = ""
|
||||
|
||||
# Assicurati che la directory degli eventi esista per i test
|
||||
os.makedirs(self.event_dir, exist_ok=True)
|
||||
|
||||
def _build_script_args(self, event, *args):
|
||||
"""
|
||||
Costruisce la lista di argomenti per lo script esterno basandosi sul tipo di evento.
|
||||
"""
|
||||
script_args = [event] # Il primo argomento è sempre il nome dell'evento
|
||||
|
||||
# Aggiungi il timestamp corrente come ultimo argomento per tutti i casi
|
||||
current_timestamp = str(int(time.time()))
|
||||
|
||||
if event in ["ev_open_before", "ev_open_after"]:
|
||||
# ALIAS="$2", FORCE="$3"
|
||||
alias = args[0] if len(args) > 0 else ""
|
||||
force = args[1] if len(args) > 1 else ""
|
||||
self.CURRENT_EVENT_ALIAS = alias
|
||||
script_args.extend([alias, force, current_timestamp])
|
||||
elif event == "ev_open_in_before":
|
||||
# ALIAS="$2", FORCE="$3", MINUTE_START="$4", MINUTE_STOP="$5"
|
||||
alias = args[0] if len(args) > 0 else ""
|
||||
force = args[1] if len(args) > 1 else ""
|
||||
minute_start = args[2] if len(args) > 2 else ""
|
||||
minute_stop = args[3] if len(args) > 3 else ""
|
||||
self.CURRENT_EVENT_ALIAS = alias
|
||||
script_args.extend([alias, force, minute_start, minute_stop, current_timestamp])
|
||||
elif event == "ev_open_in_after":
|
||||
# ALIAS="$2", FORCE="$3", CRON_START="$4", CRON_STOP="$5"
|
||||
alias = args[0] if len(args) > 0 else ""
|
||||
force = args[1] if len(args) > 1 else ""
|
||||
cron_start = args[2] if len(args) > 2 else ""
|
||||
cron_stop = args[3] if len(args) > 3 else ""
|
||||
self.CURRENT_EVENT_ALIAS = alias
|
||||
script_args.extend([alias, force, cron_start, cron_stop, current_timestamp])
|
||||
elif event in ["ev_close_before", "ev_close_after"]:
|
||||
# ALIAS="$2"
|
||||
alias = args[0] if len(args) > 0 else ""
|
||||
self.CURRENT_EVENT_ALIAS = alias
|
||||
script_args.extend([alias, current_timestamp])
|
||||
elif event in ["check_rain_sensor_before", "check_rain_sensor_after", "check_rain_sensor_change"]:
|
||||
# STATE="$2"
|
||||
state = args[0] if len(args) > 0 else ""
|
||||
script_args.extend([state, current_timestamp])
|
||||
elif event == "check_rain_online_before":
|
||||
# STATE="$2"
|
||||
state = args[0] if len(args) > 0 else ""
|
||||
script_args.extend([state, current_timestamp])
|
||||
elif event in ["check_rain_online_after", "check_rain_online_change"]:
|
||||
# STATE="$2", WEATHER="$3"
|
||||
state = args[0] if len(args) > 0 else ""
|
||||
weather = args[1] if len(args) > 1 else ""
|
||||
script_args.extend([state, weather, current_timestamp])
|
||||
elif event in ["init_before", "init_after", "exec_poweroff_before", "exec_poweroff_after",
|
||||
"exec_reboot_before", "exec_reboot_after"]:
|
||||
# Nessun argomento specifico oltre l'evento, ma il Bash script passa $2 come CAUSE (spesso vuoto)
|
||||
# e il timestamp. Qui passiamo solo il timestamp.
|
||||
script_args.extend([current_timestamp])
|
||||
elif event in ["cron_add_before", "cron_add_after"]:
|
||||
# CRON_TYPE="$2", CRON_ARG="$3", CRON_ELEMENT="$4"
|
||||
cron_type = args[0] if len(args) > 0 else ""
|
||||
cron_arg = args[1] if len(args) > 1 else ""
|
||||
cron_element = args[2] if len(args) > 2 else ""
|
||||
script_args.extend([cron_type, cron_arg, cron_element, current_timestamp])
|
||||
elif event in ["cron_del_before", "cron_del_after"]:
|
||||
# CRON_TYPE="$2", CRON_ARG="$3"
|
||||
cron_type = args[0] if len(args) > 0 else ""
|
||||
cron_arg = args[1] if len(args) > 1 else ""
|
||||
script_args.extend([cron_type, cron_arg, current_timestamp])
|
||||
else: # Caso generico
|
||||
# EVENT="$1", CAUSE="$2"
|
||||
cause = args[0] if len(args) > 0 else ""
|
||||
script_args.extend([cause, current_timestamp])
|
||||
|
||||
return script_args
|
||||
|
||||
def trigger_event(self, event, *args):
|
||||
"""
|
||||
Attiva un evento ed esegue gli script associati.
|
||||
:param event: Nome dell'evento da attivare.
|
||||
:param args: Argomenti aggiuntivi da passare agli script dell'evento.
|
||||
:return: Codice di uscita dell'ultimo script eseguito, o 0 se tutto ok.
|
||||
"""
|
||||
self.log_write("event", "info", f"Triggering event: {event} with args: {args}")
|
||||
|
||||
current_event_dir = os.path.join(self.event_dir, event)
|
||||
return_code = 0
|
||||
|
||||
if os.path.isdir(current_event_dir):
|
||||
# Ordina i file per garantire un ordine di esecuzione consistente
|
||||
files_in_dir = sorted(os.listdir(current_event_dir))
|
||||
for f_name in files_in_dir:
|
||||
script_path = os.path.join(current_event_dir, f_name)
|
||||
|
||||
# Controlla se è un file eseguibile
|
||||
if os.path.isfile(script_path) and os.access(script_path, os.X_OK):
|
||||
script_args = self._build_script_args(event, *args)
|
||||
|
||||
self.log_write("event", "info", f"Executing event script: {script_path} with args: {script_args}")
|
||||
try:
|
||||
# Esegue lo script esterno, reindirizzando stdout/stderr a DEVNULL per replicare &> /dev/null
|
||||
result = subprocess.run(
|
||||
[script_path] + script_args,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True, # Solleva CalledProcessError per codici di uscita non zero
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL
|
||||
)
|
||||
return_code = result.returncode
|
||||
except subprocess.CalledProcessError as e:
|
||||
return_code = e.returncode
|
||||
self.log_write("event", "error",
|
||||
f"Script evento '{script_path}' fallito con codice {return_code}. "
|
||||
f"Output: {e.stdout.strip()} Errore: {e.stderr.strip()}")
|
||||
|
||||
# Aggiorna CURRENT_EVENT e chiama mqtt_status in background
|
||||
self.CURRENT_EVENT = event
|
||||
threading.Thread(target=self.mqtt_status, args=(self.CURRENT_EVENT,)).start()
|
||||
|
||||
self.log_write("event", "error",
|
||||
f"Stop catena di eventi per codice di uscita {return_code} in {script_path}")
|
||||
return return_code # Ferma l'esecuzione degli script successivi
|
||||
|
||||
except FileNotFoundError:
|
||||
self.log_write("event", "error", f"Script evento non trovato: {script_path}")
|
||||
return_code = 127 # Codice di errore comune per "comando non trovato"
|
||||
break # Ferma l'esecuzione degli script successivi
|
||||
except Exception as e:
|
||||
self.log_write("event", "error", f"Errore generico durante l'esecuzione di {script_path}: {e}")
|
||||
return_code = 1 # Errore generico
|
||||
break # Ferma l'esecuzione degli script successivi
|
||||
else:
|
||||
self.log_write("event", "info", f"Nessuna directory eventi trovata per: {event}")
|
||||
|
||||
# Aggiorna CURRENT_EVENT e chiama mqtt_status in background, indipendentemente dal successo
|
||||
self.CURRENT_EVENT = event
|
||||
threading.Thread(target=self.mqtt_status, args=(self.CURRENT_EVENT,)).start()
|
||||
|
||||
return return_code
|
||||
|
||||
# --- Esempio di utilizzo ---
|
||||
if __name__ == "__main__":
|
||||
print("--- Test EventManager ---")
|
||||
|
||||
# Crea una directory di test per gli eventi e alcuni script fittizi
|
||||
test_event_dir = mock_config["EVENT_DIR"]
|
||||
os.makedirs(os.path.join(test_event_dir, "test_event"), exist_ok=True)
|
||||
os.makedirs(os.path.join(test_event_dir, "error_event"), exist_ok=True)
|
||||
os.makedirs(os.path.join(test_event_dir, "ev_open_before"), exist_ok=True)
|
||||
|
||||
# Script fittizio che ha successo
|
||||
with open(os.path.join(test_event_dir, "test_event", "script_successo.sh"), "w") as f:
|
||||
f.write("#!/bin/bash\n")
|
||||
f.write("echo 'Script di successo eseguito per $1'\n")
|
||||
f.write("exit 0\n")
|
||||
os.chmod(os.path.join(test_event_dir, "test_event", "script_successo.sh"), 0o755)
|
||||
|
||||
# Script fittizio che fallisce
|
||||
with open(os.path.join(test_event_dir, "error_event", "script_fallimento.sh"), "w") as f:
|
||||
f.write("#!/bin/bash\n")
|
||||
f.write("echo 'Script di fallimento eseguito per $1'\n")
|
||||
f.write("exit 10\n") # Codice di uscita non zero
|
||||
os.chmod(os.path.join(test_event_dir, "error_event", "script_fallimento.sh"), 0o755)
|
||||
|
||||
# Script fittizio per ev_open_before
|
||||
with open(os.path.join(test_event_dir, "ev_open_before", "log_open_before.sh"), "w") as f:
|
||||
f.write("#!/bin/bash\n")
|
||||
f.write("echo 'ev_open_before: Evento=$1, Alias=$2, Force=$3, Timestamp=$4' >> /tmp/event_test.log\n")
|
||||
f.write("exit 0\n")
|
||||
os.chmod(os.path.join(test_event_dir, "ev_open_before", "log_open_before.sh"), 0o755)
|
||||
|
||||
|
||||
event_manager = EventManager(
|
||||
event_dir=mock_config["EVENT_DIR"],
|
||||
log_writer=log_write,
|
||||
mqtt_status_func=mqtt_status_mock,
|
||||
message_writer_func=message_write
|
||||
)
|
||||
|
||||
print("\n--- Esecuzione di un evento di successo ---")
|
||||
result = event_manager.trigger_event("test_event", "some_cause")
|
||||
print(f"Codice di ritorno: {result}")
|
||||
print(f"CURRENT_EVENT: {event_manager.CURRENT_EVENT}")
|
||||
print(f"CURRENT_EVENT_ALIAS: {event_manager.CURRENT_EVENT_ALIAS}")
|
||||
|
||||
print("\n--- Esecuzione di un evento che fallisce ---")
|
||||
result = event_manager.trigger_event("error_event", "another_cause")
|
||||
print(f"Codice di ritorno: {result}")
|
||||
print(f"CURRENT_EVENT: {event_manager.CURRENT_EVENT}")
|
||||
print(f"CURRENT_EVENT_ALIAS: {event_manager.CURRENT_EVENT_ALIAS}")
|
||||
|
||||
print("\n--- Esecuzione di un evento specifico (ev_open_before) ---")
|
||||
result = event_manager.trigger_event("ev_open_before", "Zona_1", "true")
|
||||
print(f"Codice di ritorno: {result}")
|
||||
print(f"CURRENT_EVENT: {event_manager.CURRENT_EVENT}")
|
||||
print(f"CURRENT_EVENT_ALIAS: {event_manager.CURRENT_EVENT_ALIAS}")
|
||||
print("Controlla /tmp/event_test.log per l'output dello script.")
|
||||
|
||||
print("\n--- Test completato ---")
|
||||
# Pulisci le directory di test (opzionale)
|
||||
# import shutil
|
||||
# shutil.rmtree(test_event_dir)
|
||||
Reference in New Issue
Block a user