Compare commits
18 Commits
10045d5a11
...
4d2cdc81ef
| Author | SHA1 | Date | |
|---|---|---|---|
| 4d2cdc81ef | |||
| a8800280f3 | |||
| cb6220a059 | |||
| cdeb049b06 | |||
| 390dd41dac | |||
| a11db80ac0 | |||
| caf698bfd5 | |||
| 566b982e00 | |||
| 412c58e054 | |||
| cde121ced7 | |||
| 9d76bb3456 | |||
| ecef7fb368 | |||
| d81834131b | |||
| fa30f253be | |||
| 6326ec01fe | |||
| 9cf3ee0785 | |||
| fbd578f502 | |||
| 82a637d6d2 |
15
Python/Socket_server_x_test.py
Normal file
15
Python/Socket_server_x_test.py
Normal file
@@ -0,0 +1,15 @@
|
||||
import socket
|
||||
|
||||
HOST = 'localhost'
|
||||
PORT = 12345
|
||||
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.connect((HOST, PORT))
|
||||
|
||||
# Se l'autenticazione è abilitata
|
||||
s.sendall(b'admin\n')
|
||||
s.sendall(b'password123\n')
|
||||
|
||||
s.sendall(b'status\n') # Invia il comando
|
||||
data = s.recv(1024)
|
||||
print(f"Received: {data.decode()}")
|
||||
434
Python/cron_include.py
Normal file
434
Python/cron_include.py
Normal file
@@ -0,0 +1,434 @@
|
||||
import os
|
||||
import re
|
||||
import logging
|
||||
from crontab import CronTab, CronItem # Importa le classi necessarie da python-crontab
|
||||
|
||||
# --- Mock/Placeholder per le dipendenze esterne ---
|
||||
# In un'applicazione reale, queste funzioni verrebbero fornite dal tuo sistema piGarden principale.
|
||||
|
||||
# Configura un logger di base per le funzioni di log
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def log_write(log_type, level, message):
|
||||
"""
|
||||
Simula la funzione log_write dal tuo script Bash.
|
||||
In un'applicazione reale, useresti il modulo logging di Python.
|
||||
"""
|
||||
if level == "info":
|
||||
logging.info(f"[{log_type}] {message}")
|
||||
elif level == "warning":
|
||||
logging.warning(f"[{log_type}] {message}")
|
||||
elif level == "error":
|
||||
logging.error(f"[{log_type}] {message}")
|
||||
else:
|
||||
logging.debug(f"[{log_type}] {message}")
|
||||
|
||||
def trigger_event(event_name, *args):
|
||||
"""
|
||||
Simula la funzione trigger_event dal tuo script Bash.
|
||||
"""
|
||||
log_write("event", "info", f"Triggered event: {event_name} with args: {args}")
|
||||
# Qui potresti aggiungere la logica per chiamare handler di eventi reali
|
||||
|
||||
def alias_exists(alias_name):
|
||||
"""
|
||||
Simula la funzione alias_exists.
|
||||
Dovrebbe essere integrata con la tua configurazione delle elettrovalvole.
|
||||
Per questo esempio, restituisce True solo per alias "1" a "6".
|
||||
"""
|
||||
try:
|
||||
num = int(alias_name)
|
||||
return 1 <= num <= EV_TOTAL # Assumiamo EV_TOTAL sia definito globalmente o passato
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
# EV_TOTAL deve essere definito o passato, simuliamo un valore qui
|
||||
EV_TOTAL = 6
|
||||
# Percorso dello script principale (es. piGarden.py)
|
||||
# Questo dovrebbe essere il percorso assoluto del tuo script piGarden principale
|
||||
# In un'applicazione reale, lo passeresti dalla tua classe PiGarden
|
||||
PI_GARDEN_SCRIPT_PATH = "/home/pi/piGarden/piGarden.py"
|
||||
|
||||
# --- Classe CronManager ---
|
||||
|
||||
class CronManager:
|
||||
def __init__(self, script_path, ev_total_val, log_writer, event_trigger, alias_checker):
|
||||
self.script_path = script_path
|
||||
self.ev_total = ev_total_val
|
||||
self.log_write = log_writer
|
||||
self.trigger_event = event_trigger
|
||||
self.alias_exists = alias_checker
|
||||
self.cron_user = True # Gestisce il crontab dell'utente corrente
|
||||
|
||||
def _get_crontab(self):
|
||||
"""Ottiene l'oggetto CronTab per l'utente corrente."""
|
||||
try:
|
||||
return CronTab(user=self.cron_user)
|
||||
except Exception as e:
|
||||
self.log_write("cron", "error", f"Impossibile accedere al crontab: {e}")
|
||||
raise
|
||||
|
||||
def cron_del(self, cron_type, cron_arg=""):
|
||||
"""
|
||||
Elimina una tipologia di schedulazione dal crontab dell'utente.
|
||||
:param cron_type: Tipologia del cron (es. "init", "open", "close").
|
||||
:param cron_arg: Argomento della tipologia (es. alias dell'elettrovalvola).
|
||||
"""
|
||||
if not cron_type:
|
||||
self.log_write("cron", "error", "Tipo cron vuoto")
|
||||
print("Tipo cron vuoto", file=os.sys.stderr)
|
||||
return False
|
||||
|
||||
crontab = self._get_crontab()
|
||||
jobs_to_remove = []
|
||||
|
||||
# Il tuo script Bash usa commenti START/END.
|
||||
# Possiamo cercare lavori che contengono questi commenti.
|
||||
# Alternativamente, si potrebbe assegnare un commento specifico ad ogni job creato.
|
||||
start_comment_pattern = re.compile(rf"^# START cron {re.escape(cron_type)} {re.escape(cron_arg)}$")
|
||||
end_comment_pattern = re.compile(rf"^# END cron {re.escape(cron_type)} {re.escape(cron_arg)}$")
|
||||
|
||||
found_block = False
|
||||
in_block = False
|
||||
for job in list(crontab.jobs): # Iterate over a copy because we might modify
|
||||
if start_comment_pattern.match(job.comment or ""):
|
||||
in_block = True
|
||||
found_block = True
|
||||
jobs_to_remove.append(job) # Include the START comment job itself
|
||||
elif end_comment_pattern.match(job.comment or ""):
|
||||
if in_block:
|
||||
jobs_to_remove.append(job) # Include the END comment job itself
|
||||
in_block = False
|
||||
elif in_block:
|
||||
jobs_to_remove.append(job)
|
||||
|
||||
if not found_block:
|
||||
print(f"{cron_type} {cron_arg} cron non presente", file=os.sys.stderr)
|
||||
return True # Considerato un successo se non c'è nulla da eliminare
|
||||
|
||||
self.trigger_event("cron_del_before", cron_type, cron_arg)
|
||||
|
||||
for job in jobs_to_remove:
|
||||
crontab.remove(job)
|
||||
|
||||
try:
|
||||
crontab.write()
|
||||
self.log_write("cron", "info", f"Cron '{cron_type} {cron_arg}' eliminato con successo.")
|
||||
self.trigger_event("cron_del_after", cron_type, cron_arg)
|
||||
return True
|
||||
except Exception as e:
|
||||
self.log_write("cron", "error", f"Errore durante la scrittura del crontab: {e}")
|
||||
print(f"Errore durante la scrittura del crontab: {e}", file=os.sys.stderr)
|
||||
return False
|
||||
|
||||
def _get_cron_command(self, cron_type, cron_arg, cron_arg2):
|
||||
"""Determina il comando Bash da eseguire per il cron job."""
|
||||
base_command = f"{self.script_path}"
|
||||
|
||||
if cron_type == "init":
|
||||
return f"{base_command} init"
|
||||
elif cron_type == "start_socket_server":
|
||||
return f"{base_command} start_socket_server force"
|
||||
elif cron_type == "check_rain_online":
|
||||
return f"{base_command} check_rain_online 2> /tmp/check_rain_online.err"
|
||||
elif cron_type == "check_rain_sensor":
|
||||
return f"{base_command} check_rain_sensor 2> /tmp/check_rain_sensor.err"
|
||||
elif cron_type == "close_all_for_rain":
|
||||
return f"{base_command} close_all_for_rain 2> /tmp/close_all_for_rain.err 1> /dev/null"
|
||||
elif cron_type == "open":
|
||||
return f"{base_command} open {cron_arg}"
|
||||
elif cron_type == "open_in":
|
||||
return f"{base_command} open {cron_arg} {cron_arg2}"
|
||||
elif cron_type == "open_in_stop":
|
||||
return f"{base_command} close {cron_arg}"
|
||||
elif cron_type == "close":
|
||||
return f"{base_command} close {cron_arg}"
|
||||
else:
|
||||
self.log_write("cron", "error", f"Tipo cron errato: {cron_type}")
|
||||
print(f"Tipo cron errato: {cron_type}", file=os.sys.stderr)
|
||||
raise ValueError(f"Tipo cron errato: {cron_type}")
|
||||
|
||||
def cron_add(self, cron_type, minute="*", hour="*", dom="*", month="*", dow="*", cron_arg="", cron_arg2=""):
|
||||
"""
|
||||
Aggiunge una schedulazione nel crontab dell'utente.
|
||||
:param cron_type: Tipologia del cron.
|
||||
:param minute: Minuto (0-59, *, */n, @reboot).
|
||||
:param hour: Ora (0-23, *, */n).
|
||||
:param dom: Giorno del mese (1-31, *, */n).
|
||||
:param month: Mese (1-12, *, */n).
|
||||
:param dow: Giorno della settimana (0-6, *, */n).
|
||||
:param cron_arg: Primo argomento specifico della tipologia.
|
||||
:param cron_arg2: Secondo argomento specifico della tipologia (es. "disabled").
|
||||
"""
|
||||
if not cron_type:
|
||||
self.log_write("cron", "error", "Tipo cron vuoto")
|
||||
print("Tipo cron vuoto", file=os.sys.stderr)
|
||||
return False
|
||||
|
||||
# Elimina prima qualsiasi blocco esistente per garantire l'idempotenza
|
||||
self.cron_del(cron_type, cron_arg)
|
||||
|
||||
crontab = self._get_crontab()
|
||||
|
||||
# Determina il comando e se deve essere disabilitato
|
||||
cron_command = self._get_cron_command(cron_type, cron_arg, cron_arg2)
|
||||
cron_disabled = (cron_arg2 == "disabled")
|
||||
|
||||
# Crea i commenti START e END per il blocco
|
||||
start_comment = f"# START cron {cron_type} {cron_arg}"
|
||||
end_comment = f"# END cron {cron_type} {cron_arg}"
|
||||
|
||||
# Aggiungi il commento START
|
||||
job_start = crontab.new(command=f"echo '{start_comment}'", comment=start_comment)
|
||||
job_start.minute.every(1) # Un cron job fittizio per il commento START
|
||||
job_start.enabled = False # Disabilita il job commento
|
||||
|
||||
# Aggiungi il job principale
|
||||
job = crontab.new(command=cron_command)
|
||||
if minute == "@reboot":
|
||||
job.set_every("reboot")
|
||||
else:
|
||||
job.minute.on(minute)
|
||||
job.hour.on(hour)
|
||||
job.dom.on(dom)
|
||||
job.month.on(month)
|
||||
job.dow.on(dow)
|
||||
job.enabled = not cron_disabled
|
||||
job.comment = f"piGarden {cron_type} {cron_arg}" # Un commento più descrittivo per il job reale
|
||||
|
||||
# Aggiungi il commento END
|
||||
job_end = crontab.new(command=f"echo '{end_comment}'", comment=end_comment)
|
||||
job_end.minute.every(1) # Un cron job fittizio per il commento END
|
||||
job_end.enabled = False # Disabilita il job commento
|
||||
|
||||
try:
|
||||
crontab.write()
|
||||
self.log_write("cron", "info", f"Cron '{cron_type} {cron_arg}' aggiunto con successo: {job.render()}")
|
||||
self.trigger_event("cron_add_after", cron_type, cron_arg, job.render())
|
||||
return True
|
||||
except Exception as e:
|
||||
self.log_write("cron", "error", f"Errore durante la scrittura del crontab: {e}")
|
||||
print(f"Errore durante la scrittura del crontab: {e}", file=os.sys.stderr)
|
||||
return False
|
||||
|
||||
def cron_get(self, cron_type, cron_arg=""):
|
||||
"""
|
||||
Legge una tipologia di schedulazione dal crontab dell'utente.
|
||||
:param cron_type: Tipologia del cron.
|
||||
:param cron_arg: Argomento della tipologia.
|
||||
:return: Stringa contenente le schedulazioni trovate, separate da newline.
|
||||
"""
|
||||
if not cron_type:
|
||||
self.log_write("cron", "error", "Tipo cron vuoto")
|
||||
print("Tipo cron vuoto", file=os.sys.stderr)
|
||||
return ""
|
||||
|
||||
crontab = self._get_crontab()
|
||||
found_jobs = []
|
||||
|
||||
# Cerca i job principali che corrispondono al tipo e all'argomento
|
||||
for job in crontab.jobs:
|
||||
if job.comment and job.comment.startswith(f"piGarden {cron_type} {cron_arg}"):
|
||||
found_jobs.append(job.render())
|
||||
|
||||
return "\n".join(found_jobs)
|
||||
|
||||
# --- Funzioni wrapper per tipi di cron specifici ---
|
||||
|
||||
def set_cron_init(self):
|
||||
self.cron_del("init") # Assicurati che non ci siano duplicati
|
||||
self.cron_add("init", minute="@reboot")
|
||||
|
||||
def del_cron_init(self):
|
||||
self.cron_del("init")
|
||||
|
||||
def set_cron_start_socket_server(self):
|
||||
self.cron_del("start_socket_server")
|
||||
self.cron_add("start_socket_server", minute="@reboot")
|
||||
|
||||
def del_cron_start_socket_server(self):
|
||||
self.cron_del("start_socket_server")
|
||||
|
||||
def set_cron_check_rain_sensor(self):
|
||||
self.cron_del("check_rain_sensor")
|
||||
self.cron_add("check_rain_sensor", minute="*") # Ogni minuto
|
||||
|
||||
def del_cron_check_rain_sensor(self):
|
||||
self.cron_del("check_rain_sensor")
|
||||
|
||||
def set_cron_check_rain_online(self):
|
||||
self.cron_del("check_rain_online")
|
||||
self.cron_add("check_rain_online", minute="*/3") # Ogni 3 minuti
|
||||
|
||||
def del_cron_check_rain_online(self):
|
||||
self.cron_del("check_rain_online")
|
||||
|
||||
def set_cron_close_all_for_rain(self):
|
||||
self.cron_del("close_all_for_rain")
|
||||
self.cron_add("close_all_for_rain", minute="*/5") # Ogni 5 minuti
|
||||
|
||||
def del_cron_close_all_for_rain(self):
|
||||
self.cron_del("close_all_for_rain")
|
||||
|
||||
def add_cron_open(self, alias, minute, hour, dom, month, dow, disabled=""):
|
||||
if not self.alias_exists(alias):
|
||||
self.log_write("cron", "error", f"Alias '{alias}' non trovato")
|
||||
print(f"Alias '{alias}' non trovato", file=os.sys.stderr)
|
||||
return False
|
||||
self.cron_add("open", minute, hour, dom, month, dow, alias, disabled)
|
||||
return True
|
||||
|
||||
def del_cron_open(self, alias):
|
||||
if not self.alias_exists(alias):
|
||||
self.log_write("cron", "error", f"Alias '{alias}' non trovato")
|
||||
print(f"Alias '{alias}' non trovato", file=os.sys.stderr)
|
||||
return False
|
||||
self.cron_del("open", alias)
|
||||
return True
|
||||
|
||||
def get_cron_open(self, alias):
|
||||
if not self.alias_exists(alias):
|
||||
self.log_write("cron", "error", f"Alias '{alias}' non trovato")
|
||||
print(f"Alias '{alias}' non trovato", file=os.sys.stderr)
|
||||
return ""
|
||||
return self.cron_get("open", alias)
|
||||
|
||||
def del_cron_open_in(self, alias):
|
||||
if not self.alias_exists(alias):
|
||||
self.log_write("cron", "error", f"Alias '{alias}' non trovato")
|
||||
print(f"Alias '{alias}' non trovato", file=os.sys.stderr)
|
||||
return False
|
||||
self.cron_del("open_in", alias)
|
||||
self.cron_del("open_in_stop", alias)
|
||||
return True
|
||||
|
||||
def get_cron_close(self, alias):
|
||||
if not self.alias_exists(alias):
|
||||
self.log_write("cron", "error", f"Alias '{alias}' non trovato")
|
||||
print(f"Alias '{alias}' non trovato", file=os.sys.stderr)
|
||||
return ""
|
||||
return self.cron_get("close", alias)
|
||||
|
||||
def add_cron_close(self, alias, minute, hour, dom, month, dow, disabled=""):
|
||||
if not self.alias_exists(alias):
|
||||
self.log_write("cron", "error", f"Alias '{alias}' non trovato")
|
||||
print(f"Alias '{alias}' non trovato", file=os.sys.stderr)
|
||||
return False
|
||||
self.cron_add("close", minute, hour, dom, month, dow, alias, disabled)
|
||||
return True
|
||||
|
||||
def del_cron_close(self, alias):
|
||||
if not self.alias_exists(alias):
|
||||
self.log_write("cron", "error", f"Alias '{alias}' non trovato")
|
||||
print(f"Alias '{alias}' non trovato", file=os.sys.stderr)
|
||||
return False
|
||||
self.cron_del("close", alias)
|
||||
return True
|
||||
|
||||
def cron_disable_all_open_close(self):
|
||||
crontab = self._get_crontab()
|
||||
for i in range(1, self.ev_total + 1):
|
||||
alias = str(i) # Assumendo che gli alias siano i numeri delle EV
|
||||
# Disabilita le schedulazioni di apertura
|
||||
for job in list(crontab.jobs):
|
||||
if job.comment and job.comment.startswith(f"piGarden open {alias}") and job.enabled:
|
||||
job.enabled = False
|
||||
self.log_write("cron", "info", f"Disabilitato cron 'open' per alias {alias}: {job.render()}")
|
||||
|
||||
# Disabilita le schedulazioni di chiusura
|
||||
for job in list(crontab.jobs):
|
||||
if job.comment and job.comment.startswith(f"piGarden close {alias}") and job.enabled:
|
||||
job.enabled = False
|
||||
self.log_write("cron", "info", f"Disabilitato cron 'close' per alias {alias}: {job.render()}")
|
||||
try:
|
||||
crontab.write()
|
||||
self.log_write("cron", "info", "Tutte le schedulazioni di apertura/chiusura disabilitate.")
|
||||
return True
|
||||
except Exception as e:
|
||||
self.log_write("cron", "error", f"Errore durante la disabilitazione dei cron: {e}")
|
||||
return False
|
||||
|
||||
def cron_enable_all_open_close(self):
|
||||
crontab = self._get_crontab()
|
||||
for i in range(1, self.ev_total + 1):
|
||||
alias = str(i) # Assumendo che gli alias siano i numeri delle EV
|
||||
# Abilita le schedulazioni di apertura
|
||||
for job in list(crontab.jobs):
|
||||
if job.comment and job.comment.startswith(f"piGarden open {alias}") and not job.enabled:
|
||||
job.enabled = True
|
||||
self.log_write("cron", "info", f"Abilitato cron 'open' per alias {alias}: {job.render()}")
|
||||
|
||||
# Abilita le schedulazioni di chiusura
|
||||
for job in list(crontab.jobs):
|
||||
if job.comment and job.comment.startswith(f"piGarden close {alias}") and not job.enabled:
|
||||
job.enabled = True
|
||||
self.log_write("cron", "info", f"Abilitato cron 'close' per alias {alias}: {job.render()}")
|
||||
try:
|
||||
crontab.write()
|
||||
self.log_write("cron", "info", "Tutte le schedulazioni di apertura/chiusura abilitate.")
|
||||
return True
|
||||
except Exception as e:
|
||||
self.log_write("cron", "error", f"Errore durante l'abilitazione dei cron: {e}")
|
||||
return False
|
||||
|
||||
# --- Esempio di utilizzo (per testare la classe CronManager) ---
|
||||
if __name__ == "__main__":
|
||||
# Inizializza il gestore cron con le dipendenze mock
|
||||
cron_manager = CronManager(
|
||||
script_path=PI_GARDEN_SCRIPT_PATH,
|
||||
ev_total_val=EV_TOTAL,
|
||||
log_writer=log_write,
|
||||
event_trigger=trigger_event,
|
||||
alias_checker=alias_exists
|
||||
)
|
||||
|
||||
print("--- Test Cron Manager ---")
|
||||
|
||||
# Esempio: Aggiungi un cron per l'inizializzazione
|
||||
print("\nAggiungo cron 'init'...")
|
||||
cron_manager.set_cron_init()
|
||||
|
||||
# Esempio: Aggiungi un cron per aprire l'elettrovalvola "1" ogni giorno alle 7:00
|
||||
print("\nAggiungo cron 'open' per EV 1 alle 07:00...")
|
||||
cron_manager.add_cron_open("1", "0", "7", "*", "*", "*")
|
||||
|
||||
# Esempio: Aggiungi un cron per chiudere l'elettrovalvola "2" ogni 5 minuti (disabilitato)
|
||||
print("\nAggiungo cron 'close' per EV 2 ogni 5 minuti (disabilitato)...")
|
||||
cron_manager.add_cron_close("2", "*/5", "*", "*", "*", "*", "disabled")
|
||||
|
||||
# Esempio: Ottieni i cron per l'elettrovalvola "1"
|
||||
print("\nCron 'open' per EV 1:")
|
||||
print(cron_manager.get_cron_open("1"))
|
||||
|
||||
# Esempio: Ottieni i cron per l'elettrovalvola "2"
|
||||
print("\nCron 'close' per EV 2:")
|
||||
print(cron_manager.get_cron_close("2"))
|
||||
|
||||
# Esempio: Disabilita tutti i cron di apertura/chiusura
|
||||
print("\nDisabilito tutti i cron di apertura/chiusura...")
|
||||
cron_manager.cron_disable_all_open_close()
|
||||
|
||||
# Verifica lo stato dopo la disabilitazione
|
||||
print("\nCron 'open' per EV 1 dopo disabilitazione:")
|
||||
print(cron_manager.get_cron_open("1")) # Dovrebbe mostrare il job ma disabilitato
|
||||
|
||||
# Esempio: Abilita tutti i cron di apertura/chiusura
|
||||
print("\nAbilito tutti i cron di apertura/chiusura...")
|
||||
cron_manager.cron_enable_all_open_close()
|
||||
|
||||
# Verifica lo stato dopo l'abilitazione
|
||||
print("\nCron 'open' per EV 1 dopo abilitazione:")
|
||||
print(cron_manager.get_cron_open("1")) # Dovrebbe mostrare il job abilitato
|
||||
|
||||
# Esempio: Elimina un cron specifico
|
||||
print("\nElimino cron 'init'...")
|
||||
cron_manager.del_cron_init()
|
||||
|
||||
print("\nElimino cron 'open' per EV 1...")
|
||||
cron_manager.del_cron_open("1")
|
||||
|
||||
print("\nElimino cron 'close' per EV 2...")
|
||||
cron_manager.del_cron_close("2")
|
||||
|
||||
print("\n--- Test Completato ---")
|
||||
print("Controlla il tuo crontab con 'crontab -l' per vedere le modifiche.")
|
||||
487
Python/drv.include.py
Normal file
487
Python/drv.include.py
Normal file
@@ -0,0 +1,487 @@
|
||||
import os
|
||||
import re
|
||||
import logging
|
||||
import subprocess # Per eseguire comandi esterni come 'gpio' se necessario
|
||||
|
||||
# --- Mock/Placeholder per le dipendenze esterne e configurazione ---
|
||||
# In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale.
|
||||
|
||||
# Configura un logger di base per le funzioni di log
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def log_write(log_type, level, message):
|
||||
"""Simula la funzione log_write dal tuo script Bash."""
|
||||
if level == "info":
|
||||
logging.info(f"[{log_type}] {message}")
|
||||
elif level == "warning":
|
||||
logging.warning(f"[{log_type}] {message}")
|
||||
elif level == "error":
|
||||
logging.error(f"[{log_type}] {message}")
|
||||
else:
|
||||
logging.debug(f"[{log_type}] {message}")
|
||||
|
||||
def message_write(msg_type, message):
|
||||
"""Simula la funzione message_write dal tuo script Bash."""
|
||||
if msg_type == 'info':
|
||||
logging.info(f"[message] INFO: {message}")
|
||||
elif msg_type == 'warning':
|
||||
logging.warning(f"[message] WARNING: {message}")
|
||||
elif msg_type == 'success':
|
||||
logging.info(f"[message] SUCCESS: {message}")
|
||||
|
||||
# Variabili di configurazione simulate (dovrebbero venire dal piGarden.conf)
|
||||
# Queste saranno passate alla classe DriverManager
|
||||
mock_config = {
|
||||
"EV_TOTAL": 6,
|
||||
"SUPPLY_GPIO_1": 2,
|
||||
"SUPPLY_GPIO_2": 3,
|
||||
"RAIN_GPIO": 25,
|
||||
"WEATHER_SERVICE": "drv:openweathermap", # Esempio di driver di servizio meteo
|
||||
"RELE_GPIO_CLOSE": 0,
|
||||
"RELE_GPIO_OPEN": 1,
|
||||
"SUPPLY_GPIO_POS": 0,
|
||||
"SUPPLY_GPIO_NEG": 1,
|
||||
"GPIO": "/usr/local/bin/gpio", # Percorso al comando gpio (wiringPi)
|
||||
"CUT": "/usr/bin/cut", # Percorso al comando cut
|
||||
"LOG_OUTPUT_DRV_FILE": "/tmp/piGarden.drv.log", # Percorso per il log dei driver
|
||||
# Elettrovalvole di esempio per setup_drv
|
||||
"EV1_GPIO": "17",
|
||||
"EV2_GPIO": "drv:custom_rele", # Esempio di un GPIO gestito da un driver custom
|
||||
"EV3_GPIO": "22",
|
||||
"EV4_GPIO": "18",
|
||||
"EV5_GPIO": "23",
|
||||
"EV6_GPIO": "24",
|
||||
}
|
||||
|
||||
# --- Classe DriverManager ---
|
||||
|
||||
class DriverManager:
|
||||
def __init__(self, config, log_writer, message_writer):
|
||||
self.config = config
|
||||
self.log_write = log_writer
|
||||
self.message_write = message_writer
|
||||
self.list_drv = [] # Lista dei driver attivi rilevati
|
||||
|
||||
# Percorsi degli strumenti esterni (dal config)
|
||||
self.gpio_cmd = self.config.get("GPIO")
|
||||
self.cut_cmd = self.config.get("CUT")
|
||||
self.log_output_drv_file = self.config.get("LOG_OUTPUT_DRV_FILE")
|
||||
|
||||
# Inizializza il file di log dei driver se non esiste
|
||||
if not os.path.exists(self.log_output_drv_file):
|
||||
open(self.log_output_drv_file, 'a').close()
|
||||
|
||||
# Configura un logger specifico per l'output dei driver, come nello script Bash
|
||||
self.drv_logger = logging.getLogger('driver_output')
|
||||
self.drv_logger.setLevel(logging.INFO)
|
||||
# Rimuovi handler esistenti per evitare duplicati se chiamato più volte
|
||||
if not self.drv_logger.handlers:
|
||||
drv_handler = logging.FileHandler(self.log_output_drv_file, mode='a')
|
||||
drv_formatter = logging.Formatter('%(asctime)s %(message)s')
|
||||
drv_handler.setFormatter(drv_formatter)
|
||||
self.drv_logger.addHandler(drv_handler)
|
||||
self.drv_logger.propagate = False # Evita che i log vadano al logger root
|
||||
|
||||
# Placeholder per le funzioni GPIO dirette (sostituire con RPi.GPIO o gpiozero)
|
||||
# Esempio con subprocess per il comando 'gpio' (meno Pythonico ma più fedele al Bash)
|
||||
self._gpio_write = lambda gpio_id, value: self._run_gpio_command("write", gpio_id, value)
|
||||
self._gpio_mode = lambda gpio_id, mode: self._run_gpio_command("mode", gpio_id, mode)
|
||||
self._gpio_read = lambda gpio_id: self._run_gpio_command("read", gpio_id)
|
||||
|
||||
# Inizializza i driver al momento della creazione dell'istanza
|
||||
self.setup_drv()
|
||||
|
||||
def _run_gpio_command(self, action, gpio_id, value=None):
|
||||
"""Esegue un comando 'gpio' tramite subprocess."""
|
||||
cmd = [self.gpio_cmd, "-g", action, str(gpio_id)]
|
||||
if value is not None:
|
||||
cmd.append(str(value))
|
||||
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
||||
self.drv_logger.info(f"GPIO command '{' '.join(cmd)}' output: {result.stdout.strip()}")
|
||||
return result.stdout.strip()
|
||||
except subprocess.CalledProcessError as e:
|
||||
self.log_write("drv", "error", f"Errore GPIO command '{' '.join(cmd)}': {e.stderr.strip()}")
|
||||
self.message_write("warning", f"Errore GPIO: {e.stderr.strip()}")
|
||||
return None # O solleva un'eccezione
|
||||
|
||||
def setup_drv(self):
|
||||
"""
|
||||
Funzione eseguita ad ogni avvio, include i driver e lancia le funzioni di setup.
|
||||
"""
|
||||
self.list_drv = [] # Azzera la lista dei driver
|
||||
|
||||
# Raccoglie i nomi dei driver utilizzati per le elettrovalvole
|
||||
ev_total = self.config.get("EV_TOTAL", 0)
|
||||
for i in range(1, ev_total + 1):
|
||||
gpio_val = self.config.get(f"EV{i}_GPIO", "")
|
||||
if gpio_val.startswith("drv:"):
|
||||
drv = gpio_val.split(":")[1]
|
||||
if drv not in self.list_drv:
|
||||
self.list_drv.append(drv)
|
||||
|
||||
# Raccoglie i nomi dei driver utilizzati per gli altri gpio e servizi
|
||||
for key in ["SUPPLY_GPIO_1", "SUPPLY_GPIO_2", "RAIN_GPIO", "WEATHER_SERVICE"]:
|
||||
gpio_val = self.config.get(key, "")
|
||||
if isinstance(gpio_val, str) and gpio_val.startswith("drv:"):
|
||||
drv = gpio_val.split(":")[1]
|
||||
if drv not in self.list_drv:
|
||||
self.list_drv.append(drv)
|
||||
|
||||
# Simula l'inclusione dei file dei driver e l'esecuzione della funzione di setup
|
||||
for drv in self.list_drv:
|
||||
# In un'applicazione reale, qui potresti caricare moduli Python specifici
|
||||
# per ogni driver o chiamare metodi dedicati.
|
||||
# Per ora, simuliamo la chiamata a drv_<drv>_setup
|
||||
setup_func_name = f"drv_{drv}_setup"
|
||||
if hasattr(self, setup_func_name) and callable(getattr(self, setup_func_name)):
|
||||
self.drv_logger.info(f"{setup_func_name}")
|
||||
try:
|
||||
getattr(self, setup_func_name)()
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {setup_func_name}: {e}")
|
||||
else:
|
||||
self.drv_logger.info(f"Nessuna funzione di setup trovata per driver: {drv}")
|
||||
|
||||
def get_driver_callback(self, function_name, driver_id):
|
||||
"""
|
||||
Restituisce il nome del metodo interno da richiamare per una specifica funzione del driver.
|
||||
"""
|
||||
if isinstance(driver_id, str) and driver_id.startswith("drv:"):
|
||||
drv = driver_id.split(":")[1]
|
||||
if drv not in self.list_drv:
|
||||
return "drvnotfound"
|
||||
return f"drv_{drv}_{function_name}"
|
||||
return None # Nessun driver specifico, useremo il GPIO diretto
|
||||
|
||||
# --- Implementazioni delle funzioni drv_* ---
|
||||
|
||||
# Esempio di un driver custom (simulato)
|
||||
def drv_custom_rele_rele_init(self, gpio_id):
|
||||
self.drv_logger.info(f"Custom Relè Driver: Inizializzazione {gpio_id}")
|
||||
# Logica specifica per il relè custom
|
||||
# Esempio: self.custom_rele_board.init(gpio_id)
|
||||
return True
|
||||
|
||||
def drv_custom_rele_rele_close(self, gpio_id):
|
||||
self.drv_logger.info(f"Custom Relè Driver: Chiusura {gpio_id}")
|
||||
# Logica specifica per il relè custom
|
||||
# Esempio: self.custom_rele_board.set_state(gpio_id, 'closed')
|
||||
return True
|
||||
|
||||
def drv_custom_rele_rele_open(self, gpio_id):
|
||||
self.drv_logger.info(f"Custom Relè Driver: Apertura {gpio_id}")
|
||||
# Logica specifica per il relè custom
|
||||
# Esempio: self.custom_rele_board.set_state(gpio_id, 'open')
|
||||
return True
|
||||
|
||||
def drv_openweathermap_rain_online_get(self, driver_id):
|
||||
self.drv_logger.info(f"OpenWeatherMap Driver: Recupero dati meteo online per {driver_id}")
|
||||
# Qui faresti una chiamata API reale a OpenWeatherMap
|
||||
# Esempio:
|
||||
# import requests
|
||||
# api_key = self.config.get("OPENWEATHERMAP_KEY")
|
||||
# location = self.config.get("OPENWEATHERMAP_LOCATION")
|
||||
# url = f"http://api.openweathermap.org/data/2.5/weather?{location}&appid={api_key}"
|
||||
# try:
|
||||
# response = requests.get(url)
|
||||
# response.raise_for_status() # Solleva un'eccezione per errori HTTP
|
||||
# data = response.json()
|
||||
# # Estrai lo stato della pioggia da 'data'
|
||||
# if 'rain' in data and data['rain']:
|
||||
# return "1" # Indica pioggia
|
||||
# return "0" # Nessuna pioggia
|
||||
# except requests.exceptions.RequestException as e:
|
||||
# self.log_write("drv", "error", f"Errore OpenWeatherMap API: {e}")
|
||||
# self.message_write("warning", "Errore servizio meteo online")
|
||||
# return ""
|
||||
return "0" # Mock return
|
||||
|
||||
def drv_rele_init(self, gpio_id):
|
||||
"""Inizializza un relè e lo porta nello stato aperto."""
|
||||
fnc_name = self.get_driver_callback("rele_init", gpio_id)
|
||||
|
||||
if fnc_name is None: # Nessun driver specifico, usa GPIO diretto
|
||||
self._gpio_write(gpio_id, self.config.get("RELE_GPIO_OPEN"))
|
||||
self._gpio_mode(gpio_id, "out")
|
||||
elif fnc_name == "drvnotfound":
|
||||
self.log_write("drv", "error", f"Driver non trovato: {gpio_id}")
|
||||
self.message_write("warning", f"Driver non trovato: {gpio_id}")
|
||||
else:
|
||||
# Chiama la funzione del driver dinamico
|
||||
if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)):
|
||||
self.drv_logger.info(f"{fnc_name} arg:{gpio_id}")
|
||||
try:
|
||||
getattr(self, fnc_name)(gpio_id)
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {fnc_name}: {e}")
|
||||
else:
|
||||
self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.")
|
||||
self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.")
|
||||
|
||||
|
||||
def drv_rele_close(self, gpio_id):
|
||||
"""Chiude un relè."""
|
||||
fnc_name = self.get_driver_callback("rele_close", gpio_id)
|
||||
|
||||
if fnc_name is None:
|
||||
self._gpio_write(gpio_id, self.config.get("RELE_GPIO_CLOSE"))
|
||||
elif fnc_name == "drvnotfound":
|
||||
self.log_write("drv", "error", f"Driver non trovato: {gpio_id}")
|
||||
self.message_write("warning", f"Driver non trovato: {gpio_id}")
|
||||
return False # Fallimento
|
||||
else:
|
||||
if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)):
|
||||
self.drv_logger.info(f"{fnc_name} arg:{gpio_id}")
|
||||
try:
|
||||
getattr(self, fnc_name)(gpio_id)
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {fnc_name}: {e}")
|
||||
return False
|
||||
else:
|
||||
self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.")
|
||||
self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.")
|
||||
return False
|
||||
return True # Successo
|
||||
|
||||
def drv_rele_open(self, gpio_id):
|
||||
"""Apre un relè."""
|
||||
fnc_name = self.get_driver_callback("rele_open", gpio_id)
|
||||
|
||||
if fnc_name is None:
|
||||
self._gpio_write(gpio_id, self.config.get("RELE_GPIO_OPEN"))
|
||||
elif fnc_name == "drvnotfound":
|
||||
self.log_write("drv", "error", f"Driver non trovato: {gpio_id}")
|
||||
self.message_write("warning", f"Driver non trovato: {gpio_id}")
|
||||
return False
|
||||
else:
|
||||
if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)):
|
||||
self.drv_logger.info(f"{fnc_name} arg:{gpio_id}")
|
||||
try:
|
||||
getattr(self, fnc_name)(gpio_id)
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {fnc_name}: {e}")
|
||||
return False
|
||||
else:
|
||||
self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.")
|
||||
self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.")
|
||||
return False
|
||||
return True
|
||||
|
||||
def drv_supply_bistable_init(self, idx1, idx2):
|
||||
"""Inizializza i relè che gestiscono l'alimentazione per le valvole bistabili."""
|
||||
fnc1_name = self.get_driver_callback("supply_bistable_init", idx1)
|
||||
fnc2_name = self.get_driver_callback("supply_bistable_init", idx2)
|
||||
|
||||
if fnc1_name is None:
|
||||
self._gpio_write(idx1, 0)
|
||||
self._gpio_mode(idx1, "out")
|
||||
elif fnc1_name == "drvnotfound":
|
||||
self.log_write("drv", "error", f"Driver non trovato: {idx1}")
|
||||
self.message_write("warning", f"Driver non trovato: {idx1}")
|
||||
return
|
||||
else:
|
||||
if hasattr(self, fnc1_name) and callable(getattr(self, fnc1_name)):
|
||||
self.drv_logger.info(f"{fnc1_name} arg:{idx1}")
|
||||
try:
|
||||
getattr(self, fnc1_name)(idx1)
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {fnc1_name}: {e}")
|
||||
else:
|
||||
self.log_write("drv", "error", f"Funzione driver '{fnc1_name}' non implementata.")
|
||||
self.message_write("warning", f"Funzione driver '{fnc1_name}' non implementata.")
|
||||
|
||||
if fnc2_name is None:
|
||||
self._gpio_write(idx2, 0)
|
||||
self._gpio_mode(idx2, "out")
|
||||
elif fnc2_name == "drvnotfound":
|
||||
self.log_write("drv", "error", f"Driver non trovato: {idx2}")
|
||||
self.message_write("warning", f"Driver non trovato: {idx2}")
|
||||
else:
|
||||
if hasattr(self, fnc2_name) and callable(getattr(self, fnc2_name)):
|
||||
self.drv_logger.info(f"{fnc2_name} arg:{idx2}")
|
||||
try:
|
||||
getattr(self, fnc2_name)(idx2)
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {fnc2_name}: {e}")
|
||||
else:
|
||||
self.log_write("drv", "error", f"Funzione driver '{fnc2_name}' non implementata.")
|
||||
self.message_write("warning", f"Funzione driver '{fnc2_name}' non implementata.")
|
||||
|
||||
def drv_supply_positive(self, idx1, idx2):
|
||||
"""Imposta la tensione positiva per le elettrovalvole bistabili."""
|
||||
fnc1_name = self.get_driver_callback("supply_positive", idx1)
|
||||
fnc2_name = self.get_driver_callback("supply_positive", idx2)
|
||||
|
||||
if fnc1_name is None:
|
||||
self._gpio_write(idx1, self.config.get("SUPPLY_GPIO_POS"))
|
||||
elif fnc1_name == "drvnotfound":
|
||||
self.log_write("drv", "error", f"Driver non trovato: {idx1}")
|
||||
self.message_write("warning", f"Driver non trovato: {idx1}")
|
||||
return
|
||||
else:
|
||||
if hasattr(self, fnc1_name) and callable(getattr(self, fnc1_name)):
|
||||
self.drv_logger.info(f"{fnc1_name} arg:{idx1}")
|
||||
try:
|
||||
getattr(self, fnc1_name)(idx1)
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {fnc1_name}: {e}")
|
||||
else:
|
||||
self.log_write("drv", "error", f"Funzione driver '{fnc1_name}' non implementata.")
|
||||
self.message_write("warning", f"Funzione driver '{fnc1_name}' non implementata.")
|
||||
|
||||
if fnc2_name is None:
|
||||
self._gpio_write(idx2, self.config.get("SUPPLY_GPIO_POS"))
|
||||
elif fnc2_name == "drvnotfound":
|
||||
self.log_write("drv", "error", f"Driver non trovato: {idx2}")
|
||||
self.message_write("warning", f"Driver non trovato: {idx2}")
|
||||
else:
|
||||
if hasattr(self, fnc2_name) and callable(getattr(self, fnc2_name)):
|
||||
self.drv_logger.info(f"{fnc2_name} arg:{idx2}")
|
||||
try:
|
||||
getattr(self, fnc2_name)(idx2)
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {fnc2_name}: {e}")
|
||||
else:
|
||||
self.log_write("drv", "error", f"Funzione driver '{fnc2_name}' non implementata.")
|
||||
self.message_write("warning", f"Funzione driver '{fnc2_name}' non implementata.")
|
||||
|
||||
def drv_supply_negative(self, idx1, idx2):
|
||||
"""Imposta la tensione negativa per le elettrovalvole bistabili."""
|
||||
fnc1_name = self.get_driver_callback("supply_negative", idx1)
|
||||
fnc2_name = self.get_driver_callback("supply_negative", idx2)
|
||||
|
||||
if fnc1_name is None:
|
||||
self._gpio_write(idx1, self.config.get("SUPPLY_GPIO_NEG"))
|
||||
elif fnc1_name == "drvnotfound":
|
||||
self.log_write("drv", "error", f"Driver non trovato: {idx1}")
|
||||
self.message_write("warning", f"Driver non trovato: {idx1}")
|
||||
return
|
||||
else:
|
||||
if hasattr(self, fnc1_name) and callable(getattr(self, fnc1_name)):
|
||||
self.drv_logger.info(f"{fnc1_name} arg:{idx1}")
|
||||
try:
|
||||
getattr(self, fnc1_name)(idx1)
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {fnc1_name}: {e}")
|
||||
else:
|
||||
self.log_write("drv", "error", f"Funzione driver '{fnc1_name}' non implementata.")
|
||||
self.message_write("warning", f"Funzione driver '{fnc1_name}' non implementata.")
|
||||
|
||||
if fnc2_name is None:
|
||||
self._gpio_write(idx2, self.config.get("SUPPLY_GPIO_NEG"))
|
||||
elif fnc2_name == "drvnotfound":
|
||||
self.log_write("drv", "error", f"Driver non trovato: {idx2}")
|
||||
self.message_write("warning", f"Driver non trovato: {idx2}")
|
||||
else:
|
||||
if hasattr(self, fnc2_name) and callable(getattr(self, fnc2_name)):
|
||||
self.drv_logger.info(f"{fnc2_name} arg:{idx2}")
|
||||
try:
|
||||
getattr(self, fnc2_name)(idx2)
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {fnc2_name}: {e}")
|
||||
else:
|
||||
self.log_write("drv", "error", f"Funzione driver '{fnc2_name}' non implementata.")
|
||||
self.message_write("warning", f"Funzione driver '{fnc2_name}' non implementata.")
|
||||
|
||||
def drv_rain_sensor_init(self, gpio_id):
|
||||
"""Inizializza il sensore della pioggia."""
|
||||
fnc_name = self.get_driver_callback("rain_sensor_init", gpio_id)
|
||||
|
||||
if fnc_name is None:
|
||||
self._gpio_mode(gpio_id, "in")
|
||||
elif fnc_name == "drvnotfound":
|
||||
self.log_write("drv", "error", f"Driver non trovato: {gpio_id}")
|
||||
self.message_write("warning", f"Driver non trovato: {gpio_id}")
|
||||
else:
|
||||
if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)):
|
||||
self.drv_logger.info(f"{fnc_name} arg:{gpio_id}")
|
||||
try:
|
||||
getattr(self, fnc_name)(gpio_id)
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {fnc_name}: {e}")
|
||||
else:
|
||||
self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.")
|
||||
self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.")
|
||||
|
||||
def drv_rain_sensor_get(self, gpio_id):
|
||||
"""Legge lo stato del sensore della pioggia."""
|
||||
fnc_name = self.get_driver_callback("rain_sensor_get", gpio_id)
|
||||
vret = ""
|
||||
|
||||
if fnc_name is None:
|
||||
vret = self._gpio_read(gpio_id)
|
||||
elif fnc_name == "drvnotfound":
|
||||
self.log_write("drv", "error", f"Driver non trovato: {gpio_id}")
|
||||
self.message_write("warning", f"Driver non trovato: {gpio_id}")
|
||||
else:
|
||||
if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)):
|
||||
self.drv_logger.info(f"{fnc_name} arg:{gpio_id}")
|
||||
try:
|
||||
vret = getattr(self, fnc_name)(gpio_id)
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {fnc_name}: {e}")
|
||||
else:
|
||||
self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.")
|
||||
self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.")
|
||||
return vret
|
||||
|
||||
def drv_rain_online_get(self, driver_id):
|
||||
"""Legge lo stato delle condizioni meteo dal servizio online."""
|
||||
fnc_name = self.get_driver_callback("rain_online_get", driver_id)
|
||||
vret = ""
|
||||
|
||||
if fnc_name is None or fnc_name == "drvnotfound":
|
||||
self.log_write("drv", "error", f"Driver non trovato o non specificato per il servizio meteo: {driver_id}")
|
||||
self.message_write("warning", f"Driver non trovato per il servizio meteo: {driver_id}")
|
||||
else:
|
||||
if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)):
|
||||
self.drv_logger.info(f"{fnc_name} arg:{driver_id}")
|
||||
try:
|
||||
vret = getattr(self, fnc_name)(driver_id)
|
||||
except Exception as e:
|
||||
self.log_write("drv", "error", f"Errore in {fnc_name}: {e}")
|
||||
else:
|
||||
self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.")
|
||||
self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.")
|
||||
return vret
|
||||
|
||||
# --- Esempio di utilizzo ---
|
||||
if __name__ == "__main__":
|
||||
print("--- Test DriverManager ---")
|
||||
|
||||
# Inizializza il DriverManager con la configurazione mock e le funzioni di log/messaggio
|
||||
driver_manager = DriverManager(mock_config, log_write, message_write)
|
||||
|
||||
print("\n--- Test setup_drv (eseguito all'inizializzazione) ---")
|
||||
print(f"Driver rilevati: {driver_manager.list_drv}")
|
||||
|
||||
print("\n--- Test drv_rele_init (GPIO diretto) ---")
|
||||
driver_manager.drv_rele_init("17") # EV1_GPIO = 17
|
||||
|
||||
print("\n--- Test drv_rele_close (GPIO diretto) ---")
|
||||
driver_manager.drv_rele_close("17")
|
||||
|
||||
print("\n--- Test drv_rele_open (GPIO diretto) ---")
|
||||
driver_manager.drv_rele_open("17")
|
||||
|
||||
print("\n--- Test drv_rele_init (Custom Driver) ---")
|
||||
driver_manager.drv_rele_init("drv:custom_rele") # EV2_GPIO = drv:custom_rele
|
||||
|
||||
print("\n--- Test drv_supply_positive ---")
|
||||
driver_manager.drv_supply_positive(mock_config["SUPPLY_GPIO_1"], mock_config["SUPPLY_GPIO_2"])
|
||||
|
||||
print("\n--- Test drv_rain_sensor_init ---")
|
||||
driver_manager.drv_rain_sensor_init(mock_config["RAIN_GPIO"])
|
||||
|
||||
print("\n--- Test drv_rain_sensor_get ---")
|
||||
rain_sensor_state = driver_manager.drv_rain_sensor_get(mock_config["RAIN_GPIO"])
|
||||
print(f"Stato sensore pioggia: {rain_sensor_state}")
|
||||
|
||||
print("\n--- Test drv_rain_online_get (OpenWeatherMap Driver) ---")
|
||||
online_rain_state = driver_manager.drv_rain_online_get(mock_config["WEATHER_SERVICE"])
|
||||
print(f"Stato pioggia online: {online_rain_state}")
|
||||
|
||||
print("\n--- Test completato ---")
|
||||
print(f"Controlla il file di log dei driver: {driver_manager.log_output_drv_file}")
|
||||
255
Python/event_include.py
Normal file
255
Python/event_include.py
Normal file
@@ -0,0 +1,255 @@
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
import threading
|
||||
import logging
|
||||
|
||||
# --- Mock/Placeholder per le dipendenze esterne e configurazione ---
|
||||
# In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale.
|
||||
|
||||
# Configura un logger di base per le funzioni di log
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def log_write(log_type, level, message):
|
||||
"""Simula la funzione log_write dal tuo script Bash."""
|
||||
if level == "info":
|
||||
logging.info(f"[{log_type}] {message}")
|
||||
elif level == "warning":
|
||||
logging.warning(f"[{log_type}] {message}")
|
||||
elif level == "error":
|
||||
logging.error(f"[{log_type}] {message}")
|
||||
else:
|
||||
logging.debug(f"[{log_type}] {message}")
|
||||
|
||||
def message_write(msg_type, message):
|
||||
"""Simula la funzione message_write dal tuo script Bash."""
|
||||
if msg_type == 'info':
|
||||
logging.info(f"[message] INFO: {message}")
|
||||
elif msg_type == 'warning':
|
||||
logging.warning(f"[message] WARNING: {message}")
|
||||
elif msg_type == 'success':
|
||||
logging.info(f"[message] SUCCESS: {message}")
|
||||
|
||||
def mqtt_status_mock(*args):
|
||||
"""Simula la funzione mqtt_status. In un'applicazione reale, chiamerebbe il tuo modulo MQTT."""
|
||||
log_write("mqtt", "info", f"Simulando mqtt_status con args: {args}")
|
||||
|
||||
# Variabili di configurazione simulate
|
||||
mock_config = {
|
||||
"EVENT_DIR": "/tmp/piGarden_events", # Assicurati che questa directory esista per i test
|
||||
}
|
||||
|
||||
# --- Classe EventManager ---
|
||||
|
||||
class EventManager:
|
||||
def __init__(self, event_dir, log_writer, mqtt_status_func, message_writer_func=None):
|
||||
self.event_dir = event_dir
|
||||
self.log_write = log_writer
|
||||
self.mqtt_status = mqtt_status_func
|
||||
self.message_write = message_writer_func if message_writer_func else log_writer # Fallback if not provided
|
||||
|
||||
self.CURRENT_EVENT = ""
|
||||
self.CURRENT_EVENT_ALIAS = ""
|
||||
|
||||
# Assicurati che la directory degli eventi esista per i test
|
||||
os.makedirs(self.event_dir, exist_ok=True)
|
||||
|
||||
def _build_script_args(self, event, *args):
|
||||
"""
|
||||
Costruisce la lista di argomenti per lo script esterno basandosi sul tipo di evento.
|
||||
"""
|
||||
script_args = [event] # Il primo argomento è sempre il nome dell'evento
|
||||
|
||||
# Aggiungi il timestamp corrente come ultimo argomento per tutti i casi
|
||||
current_timestamp = str(int(time.time()))
|
||||
|
||||
if event in ["ev_open_before", "ev_open_after"]:
|
||||
# ALIAS="$2", FORCE="$3"
|
||||
alias = args[0] if len(args) > 0 else ""
|
||||
force = args[1] if len(args) > 1 else ""
|
||||
self.CURRENT_EVENT_ALIAS = alias
|
||||
script_args.extend([alias, force, current_timestamp])
|
||||
elif event == "ev_open_in_before":
|
||||
# ALIAS="$2", FORCE="$3", MINUTE_START="$4", MINUTE_STOP="$5"
|
||||
alias = args[0] if len(args) > 0 else ""
|
||||
force = args[1] if len(args) > 1 else ""
|
||||
minute_start = args[2] if len(args) > 2 else ""
|
||||
minute_stop = args[3] if len(args) > 3 else ""
|
||||
self.CURRENT_EVENT_ALIAS = alias
|
||||
script_args.extend([alias, force, minute_start, minute_stop, current_timestamp])
|
||||
elif event == "ev_open_in_after":
|
||||
# ALIAS="$2", FORCE="$3", CRON_START="$4", CRON_STOP="$5"
|
||||
alias = args[0] if len(args) > 0 else ""
|
||||
force = args[1] if len(args) > 1 else ""
|
||||
cron_start = args[2] if len(args) > 2 else ""
|
||||
cron_stop = args[3] if len(args) > 3 else ""
|
||||
self.CURRENT_EVENT_ALIAS = alias
|
||||
script_args.extend([alias, force, cron_start, cron_stop, current_timestamp])
|
||||
elif event in ["ev_close_before", "ev_close_after"]:
|
||||
# ALIAS="$2"
|
||||
alias = args[0] if len(args) > 0 else ""
|
||||
self.CURRENT_EVENT_ALIAS = alias
|
||||
script_args.extend([alias, current_timestamp])
|
||||
elif event in ["check_rain_sensor_before", "check_rain_sensor_after", "check_rain_sensor_change"]:
|
||||
# STATE="$2"
|
||||
state = args[0] if len(args) > 0 else ""
|
||||
script_args.extend([state, current_timestamp])
|
||||
elif event == "check_rain_online_before":
|
||||
# STATE="$2"
|
||||
state = args[0] if len(args) > 0 else ""
|
||||
script_args.extend([state, current_timestamp])
|
||||
elif event in ["check_rain_online_after", "check_rain_online_change"]:
|
||||
# STATE="$2", WEATHER="$3"
|
||||
state = args[0] if len(args) > 0 else ""
|
||||
weather = args[1] if len(args) > 1 else ""
|
||||
script_args.extend([state, weather, current_timestamp])
|
||||
elif event in ["init_before", "init_after", "exec_poweroff_before", "exec_poweroff_after",
|
||||
"exec_reboot_before", "exec_reboot_after"]:
|
||||
# Nessun argomento specifico oltre l'evento, ma il Bash script passa $2 come CAUSE (spesso vuoto)
|
||||
# e il timestamp. Qui passiamo solo il timestamp.
|
||||
script_args.extend([current_timestamp])
|
||||
elif event in ["cron_add_before", "cron_add_after"]:
|
||||
# CRON_TYPE="$2", CRON_ARG="$3", CRON_ELEMENT="$4"
|
||||
cron_type = args[0] if len(args) > 0 else ""
|
||||
cron_arg = args[1] if len(args) > 1 else ""
|
||||
cron_element = args[2] if len(args) > 2 else ""
|
||||
script_args.extend([cron_type, cron_arg, cron_element, current_timestamp])
|
||||
elif event in ["cron_del_before", "cron_del_after"]:
|
||||
# CRON_TYPE="$2", CRON_ARG="$3"
|
||||
cron_type = args[0] if len(args) > 0 else ""
|
||||
cron_arg = args[1] if len(args) > 1 else ""
|
||||
script_args.extend([cron_type, cron_arg, current_timestamp])
|
||||
else: # Caso generico
|
||||
# EVENT="$1", CAUSE="$2"
|
||||
cause = args[0] if len(args) > 0 else ""
|
||||
script_args.extend([cause, current_timestamp])
|
||||
|
||||
return script_args
|
||||
|
||||
def trigger_event(self, event, *args):
|
||||
"""
|
||||
Attiva un evento ed esegue gli script associati.
|
||||
:param event: Nome dell'evento da attivare.
|
||||
:param args: Argomenti aggiuntivi da passare agli script dell'evento.
|
||||
:return: Codice di uscita dell'ultimo script eseguito, o 0 se tutto ok.
|
||||
"""
|
||||
self.log_write("event", "info", f"Triggering event: {event} with args: {args}")
|
||||
|
||||
current_event_dir = os.path.join(self.event_dir, event)
|
||||
return_code = 0
|
||||
|
||||
if os.path.isdir(current_event_dir):
|
||||
# Ordina i file per garantire un ordine di esecuzione consistente
|
||||
files_in_dir = sorted(os.listdir(current_event_dir))
|
||||
for f_name in files_in_dir:
|
||||
script_path = os.path.join(current_event_dir, f_name)
|
||||
|
||||
# Controlla se è un file eseguibile
|
||||
if os.path.isfile(script_path) and os.access(script_path, os.X_OK):
|
||||
script_args = self._build_script_args(event, *args)
|
||||
|
||||
self.log_write("event", "info", f"Executing event script: {script_path} with args: {script_args}")
|
||||
try:
|
||||
# Esegue lo script esterno, reindirizzando stdout/stderr a DEVNULL per replicare &> /dev/null
|
||||
result = subprocess.run(
|
||||
[script_path] + script_args,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True, # Solleva CalledProcessError per codici di uscita non zero
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL
|
||||
)
|
||||
return_code = result.returncode
|
||||
except subprocess.CalledProcessError as e:
|
||||
return_code = e.returncode
|
||||
self.log_write("event", "error",
|
||||
f"Script evento '{script_path}' fallito con codice {return_code}. "
|
||||
f"Output: {e.stdout.strip()} Errore: {e.stderr.strip()}")
|
||||
|
||||
# Aggiorna CURRENT_EVENT e chiama mqtt_status in background
|
||||
self.CURRENT_EVENT = event
|
||||
threading.Thread(target=self.mqtt_status, args=(self.CURRENT_EVENT,)).start()
|
||||
|
||||
self.log_write("event", "error",
|
||||
f"Stop catena di eventi per codice di uscita {return_code} in {script_path}")
|
||||
return return_code # Ferma l'esecuzione degli script successivi
|
||||
|
||||
except FileNotFoundError:
|
||||
self.log_write("event", "error", f"Script evento non trovato: {script_path}")
|
||||
return_code = 127 # Codice di errore comune per "comando non trovato"
|
||||
break # Ferma l'esecuzione degli script successivi
|
||||
except Exception as e:
|
||||
self.log_write("event", "error", f"Errore generico durante l'esecuzione di {script_path}: {e}")
|
||||
return_code = 1 # Errore generico
|
||||
break # Ferma l'esecuzione degli script successivi
|
||||
else:
|
||||
self.log_write("event", "info", f"Nessuna directory eventi trovata per: {event}")
|
||||
|
||||
# Aggiorna CURRENT_EVENT e chiama mqtt_status in background, indipendentemente dal successo
|
||||
self.CURRENT_EVENT = event
|
||||
threading.Thread(target=self.mqtt_status, args=(self.CURRENT_EVENT,)).start()
|
||||
|
||||
return return_code
|
||||
|
||||
# --- Esempio di utilizzo ---
|
||||
if __name__ == "__main__":
|
||||
print("--- Test EventManager ---")
|
||||
|
||||
# Crea una directory di test per gli eventi e alcuni script fittizi
|
||||
test_event_dir = mock_config["EVENT_DIR"]
|
||||
os.makedirs(os.path.join(test_event_dir, "test_event"), exist_ok=True)
|
||||
os.makedirs(os.path.join(test_event_dir, "error_event"), exist_ok=True)
|
||||
os.makedirs(os.path.join(test_event_dir, "ev_open_before"), exist_ok=True)
|
||||
|
||||
# Script fittizio che ha successo
|
||||
with open(os.path.join(test_event_dir, "test_event", "script_successo.sh"), "w") as f:
|
||||
f.write("#!/bin/bash\n")
|
||||
f.write("echo 'Script di successo eseguito per $1'\n")
|
||||
f.write("exit 0\n")
|
||||
os.chmod(os.path.join(test_event_dir, "test_event", "script_successo.sh"), 0o755)
|
||||
|
||||
# Script fittizio che fallisce
|
||||
with open(os.path.join(test_event_dir, "error_event", "script_fallimento.sh"), "w") as f:
|
||||
f.write("#!/bin/bash\n")
|
||||
f.write("echo 'Script di fallimento eseguito per $1'\n")
|
||||
f.write("exit 10\n") # Codice di uscita non zero
|
||||
os.chmod(os.path.join(test_event_dir, "error_event", "script_fallimento.sh"), 0o755)
|
||||
|
||||
# Script fittizio per ev_open_before
|
||||
with open(os.path.join(test_event_dir, "ev_open_before", "log_open_before.sh"), "w") as f:
|
||||
f.write("#!/bin/bash\n")
|
||||
f.write("echo 'ev_open_before: Evento=$1, Alias=$2, Force=$3, Timestamp=$4' >> /tmp/event_test.log\n")
|
||||
f.write("exit 0\n")
|
||||
os.chmod(os.path.join(test_event_dir, "ev_open_before", "log_open_before.sh"), 0o755)
|
||||
|
||||
|
||||
event_manager = EventManager(
|
||||
event_dir=mock_config["EVENT_DIR"],
|
||||
log_writer=log_write,
|
||||
mqtt_status_func=mqtt_status_mock,
|
||||
message_writer_func=message_write
|
||||
)
|
||||
|
||||
print("\n--- Esecuzione di un evento di successo ---")
|
||||
result = event_manager.trigger_event("test_event", "some_cause")
|
||||
print(f"Codice di ritorno: {result}")
|
||||
print(f"CURRENT_EVENT: {event_manager.CURRENT_EVENT}")
|
||||
print(f"CURRENT_EVENT_ALIAS: {event_manager.CURRENT_EVENT_ALIAS}")
|
||||
|
||||
print("\n--- Esecuzione di un evento che fallisce ---")
|
||||
result = event_manager.trigger_event("error_event", "another_cause")
|
||||
print(f"Codice di ritorno: {result}")
|
||||
print(f"CURRENT_EVENT: {event_manager.CURRENT_EVENT}")
|
||||
print(f"CURRENT_EVENT_ALIAS: {event_manager.CURRENT_EVENT_ALIAS}")
|
||||
|
||||
print("\n--- Esecuzione di un evento specifico (ev_open_before) ---")
|
||||
result = event_manager.trigger_event("ev_open_before", "Zona_1", "true")
|
||||
print(f"Codice di ritorno: {result}")
|
||||
print(f"CURRENT_EVENT: {event_manager.CURRENT_EVENT}")
|
||||
print(f"CURRENT_EVENT_ALIAS: {event_manager.CURRENT_EVENT_ALIAS}")
|
||||
print("Controlla /tmp/event_test.log per l'output dello script.")
|
||||
|
||||
print("\n--- Test completato ---")
|
||||
# Pulisci le directory di test (opzionale)
|
||||
# import shutil
|
||||
# shutil.rmtree(test_event_dir)
|
||||
1616
Python/piGarden.py
Normal file
1616
Python/piGarden.py
Normal file
File diff suppressed because it is too large
Load Diff
432
Python/rain_include.py
Normal file
432
Python/rain_include.py
Normal file
@@ -0,0 +1,432 @@
|
||||
import os
|
||||
import time
|
||||
import json
|
||||
import logging
|
||||
|
||||
# --- Mock/Placeholder per le dipendenze esterne e configurazione ---
|
||||
# In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale.
|
||||
|
||||
# Configura un logger di base per le funzioni di log
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def log_write(log_type, level, message):
|
||||
"""Simula la funzione log_write dal tuo script Bash."""
|
||||
if level == "info":
|
||||
logging.info(f"[{log_type}] {message}")
|
||||
elif level == "warning":
|
||||
logging.warning(f"[{log_type}] {message}")
|
||||
elif level == "error":
|
||||
logging.error(f"[{log_type}] {message}")
|
||||
else:
|
||||
logging.debug(f"[{log_type}] {message}")
|
||||
|
||||
# Mock per la classe EventManager (dal file event_manager_py)
|
||||
class MockEventManager:
|
||||
def __init__(self):
|
||||
self.CURRENT_EVENT = ""
|
||||
self.CURRENT_EVENT_ALIAS = ""
|
||||
|
||||
def trigger_event(self, event, *args):
|
||||
log_write("event", "info", f"Mock EventManager: Triggered event: {event} with args: {args}")
|
||||
self.CURRENT_EVENT = event
|
||||
# Simula un codice di ritorno di successo
|
||||
return 0
|
||||
|
||||
# Mock per la classe DriverManager (dal file driver_manager_py)
|
||||
class MockDriverManager:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
||||
def drv_rain_online_get(self, service_id):
|
||||
"""Simula il recupero dello stato pioggia online."""
|
||||
# Restituisce un timestamp se piove, un valore negativo se non piove, o 0 in caso di errore
|
||||
# Per i test, simuliamo che non piova (-1) o che piova (timestamp attuale)
|
||||
if "openweathermap" in service_id:
|
||||
# Simula pioggia 50% delle volte
|
||||
if time.time() % 2 == 0:
|
||||
# Scrivi un mock di dati meteo online per il test
|
||||
mock_weather_data = {
|
||||
"weather": [{"description": "light rain"}],
|
||||
"main": {"temp": 280, "humidity": 90}
|
||||
}
|
||||
with open(os.path.join(self.config["STATUS_DIR"], "last_weather_online"), "w") as f:
|
||||
json.dump(mock_weather_data, f)
|
||||
return str(int(time.time())) # Simula pioggia (timestamp)
|
||||
else:
|
||||
return "-1" # Simula non pioggia
|
||||
return "0" # Errore o servizio non riconosciuto
|
||||
|
||||
def drv_rain_sensor_get(self, gpio_id):
|
||||
"""Simula il recupero dello stato dal sensore pioggia."""
|
||||
# Restituisce lo stato del GPIO (0 o 1)
|
||||
# Per i test, simuliamo lo stato del sensore
|
||||
if time.time() % 3 == 0:
|
||||
return str(self.config.get("RAIN_GPIO_STATE", 0)) # Simula pioggia
|
||||
return str(1 - self.config.get("RAIN_GPIO_STATE", 0)) # Simula non pioggia
|
||||
|
||||
# Mock per la classe PiGarden (per le dipendenze ev_*)
|
||||
class MockPiGarden:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
self.solenoid_states = {} # Mappa alias a stato (0=chiuso, 1=aperto, 2=forzato)
|
||||
# Inizializza stati fittizi
|
||||
for i in range(1, self.config.get("EV_TOTAL", 0) + 1):
|
||||
self.solenoid_states[str(i)] = 0 # Tutti chiusi di default
|
||||
|
||||
def ev_status(self, alias):
|
||||
"""Simula ev_status, restituisce lo stato dell'elettrovalvola."""
|
||||
return self.solenoid_states.get(alias, 0) # 0 se non trovato o chiuso
|
||||
|
||||
def ev_close(self, alias):
|
||||
"""Simula ev_close, imposta lo stato dell'elettrovalvola a chiuso."""
|
||||
self.solenoid_states[alias] = 0
|
||||
log_write("irrigate", "info", f"MockPiGarden: Elettrovalvola '{alias}' chiusa.")
|
||||
|
||||
def ev_check_moisture(self, ev_num):
|
||||
"""
|
||||
Simula ev_check_moisture.
|
||||
Ritorna 0 se l'umidità non è stata raggiunta (bisogno d'acqua),
|
||||
>0 se l'umidità massima è stata raggiunta.
|
||||
"""
|
||||
# Per i test, simuliamo che l'umidità sia raggiunta per EV1, altrimenti no
|
||||
if ev_num == 1:
|
||||
return 1 # Umidità raggiunta
|
||||
return 0 # Umidità non raggiunta
|
||||
|
||||
def ev_check_moisture_autoclose(self, ev_num):
|
||||
"""
|
||||
Simula ev_check_moisture_autoclose.
|
||||
Ritorna 0 se non deve chiudere automaticamente, >0 se sì.
|
||||
"""
|
||||
# Per i test, simuliamo che l'autochiusura sia attiva per EV1 e l'umidità sia raggiunta
|
||||
if self.config.get(f"EV{ev_num}_SENSOR_MOISTURE_AUTOCLOSE", "0") == "1":
|
||||
return self.ev_check_moisture(ev_num) # Usa la logica di check_moisture
|
||||
return 0
|
||||
|
||||
def ev_number2norain(self, ev_num):
|
||||
"""Simula ev_number2norain."""
|
||||
return self.config.get(f"EV{ev_num}_NORAIN", "0") == "1"
|
||||
|
||||
# Variabili di configurazione simulate (dal piGarden.conf)
|
||||
mock_config_rain = {
|
||||
"WEATHER_SERVICE": "drv:openweathermap", # Esempio di driver di servizio meteo
|
||||
"RAIN_GPIO": 25,
|
||||
"RAIN_GPIO_STATE": 0, # Stato del GPIO che indica pioggia
|
||||
"NOT_IRRIGATE_IF_RAIN_ONLINE": 86400, # 24 ore in secondi
|
||||
"NOT_IRRIGATE_IF_RAIN_SENSOR": 86400, # 24 ore in secondi
|
||||
"EV_TOTAL": 6,
|
||||
"STATUS_DIR": "/tmp/piGarden_status", # Directory per i file di stato
|
||||
# Elettrovalvole di esempio per close_all_for_rain
|
||||
"EV1_ALIAS": "Zona_1", "EV1_NORAIN": "0", "EV1_SENSOR_MOISTURE_AUTOCLOSE": "1",
|
||||
"EV2_ALIAS": "Zona_2", "EV2_NORAIN": "1", # Questa zona non si chiude per pioggia
|
||||
"EV3_ALIAS": "Zona_3", "EV3_NORAIN": "0",
|
||||
"EV4_ALIAS": "Zona_4", "EV4_NORAIN": "0",
|
||||
"EV5_ALIAS": "Zona_5", "EV5_NORAIN": "0",
|
||||
"EV6_ALIAS": "Zona_6", "EV6_NORAIN": "0",
|
||||
}
|
||||
|
||||
# Assicurati che la directory di stato esista per i test
|
||||
os.makedirs(mock_config_rain["STATUS_DIR"], exist_ok=True)
|
||||
|
||||
# --- Classe RainManager ---
|
||||
|
||||
class RainManager:
|
||||
def __init__(self, config, log_writer, event_manager, driver_manager, pigarden_core):
|
||||
self.config = config
|
||||
self.log_write = log_writer
|
||||
self.event_manager = event_manager
|
||||
self.driver_manager = driver_manager
|
||||
self.pigarden_core = pigarden_core # Istanza della classe PiGarden principale
|
||||
|
||||
self.status_dir = self.config.get("STATUS_DIR")
|
||||
self.weather_service = self.config.get("WEATHER_SERVICE")
|
||||
self.rain_gpio = self.config.get("RAIN_GPIO")
|
||||
self.rain_gpio_state = self.config.get("RAIN_GPIO_STATE")
|
||||
self.not_irrigate_if_rain_online = self.config.get("NOT_IRRIGATE_IF_RAIN_ONLINE")
|
||||
self.not_irrigate_if_rain_sensor = self.config.get("NOT_IRRIGATE_IF_RAIN_SENSOR")
|
||||
self.ev_total = self.config.get("EV_TOTAL")
|
||||
|
||||
# Inizializza i file di stato se non esistono
|
||||
self._init_status_files()
|
||||
|
||||
def _init_status_files(self):
|
||||
"""Assicura che i file di stato esistano per evitare errori FileNotFoundError."""
|
||||
for filename in ["last_state_rain_online", "last_rain_online",
|
||||
"last_state_rain_sensor", "last_rain_sensor",
|
||||
"last_weather_online"]:
|
||||
file_path = os.path.join(self.status_dir, filename)
|
||||
if not os.path.exists(file_path):
|
||||
# Crea il file vuoto o con un valore di default appropriato
|
||||
with open(file_path, 'w') as f:
|
||||
if filename.startswith("last_rain_"):
|
||||
f.write("0") # Timestamp 0
|
||||
elif filename.startswith("last_state_rain_"):
|
||||
f.write("unknown")
|
||||
elif filename == "last_weather_online":
|
||||
f.write("{}") # JSON vuoto
|
||||
|
||||
def _read_status_file(self, filename, default=""):
|
||||
"""Legge il contenuto di un file di stato."""
|
||||
file_path = os.path.join(self.status_dir, filename)
|
||||
try:
|
||||
with open(file_path, 'r') as f:
|
||||
content = f.read().strip()
|
||||
return content if content else default
|
||||
except FileNotFoundError:
|
||||
return default
|
||||
except Exception as e:
|
||||
self.log_write("rain", "error", f"Errore lettura {filename}: {e}")
|
||||
return default
|
||||
|
||||
def _write_status_file(self, filename, content):
|
||||
"""Scrive il contenuto in un file di stato."""
|
||||
file_path = os.path.join(self.status_dir, filename)
|
||||
try:
|
||||
with open(file_path, 'w') as f:
|
||||
f.write(str(content))
|
||||
except Exception as e:
|
||||
self.log_write("rain", "error", f"Errore scrittura {filename}: {e}")
|
||||
|
||||
def _delete_status_file(self, filename):
|
||||
"""Elimina un file di stato."""
|
||||
file_path = os.path.join(self.status_dir, filename)
|
||||
try:
|
||||
if os.path.exists(file_path):
|
||||
os.remove(file_path)
|
||||
except Exception as e:
|
||||
self.log_write("rain", "error", f"Errore eliminazione {filename}: {e}")
|
||||
|
||||
def check_rain_online(self):
|
||||
"""
|
||||
Esegue il controllo meteo tramite il servizio online configurato.
|
||||
"""
|
||||
if self.weather_service == "none":
|
||||
self.log_write("rain", "warning", "check_rain_online - servizio online disabilitato")
|
||||
return
|
||||
|
||||
self.event_manager.trigger_event("check_rain_online_before", "")
|
||||
|
||||
local_epoch_str = self.driver_manager.drv_rain_online_get(self.weather_service)
|
||||
current_state_rain_online = ""
|
||||
last_state_rain_online = self._read_status_file("last_state_rain_online", default="norain")
|
||||
weather_json = "{}" # Default a JSON vuoto
|
||||
|
||||
if local_epoch_str and local_epoch_str.lstrip('-').isdigit(): # Controlla se è un numero (anche negativo)
|
||||
local_epoch = int(local_epoch_str)
|
||||
if local_epoch == 0:
|
||||
self.log_write("rain", "error", "check_rain_online - fallita lettura dati online (valore 0)")
|
||||
else:
|
||||
if local_epoch > 0:
|
||||
current_state_rain_online = 'rain'
|
||||
self._write_status_file("last_rain_online", local_epoch)
|
||||
else:
|
||||
current_state_rain_online = 'norain'
|
||||
|
||||
# Leggi il JSON meteo, se esiste e valido
|
||||
weather_data_str = self._read_status_file("last_weather_online", default="{}")
|
||||
try:
|
||||
weather_json = json.loads(weather_data_str)
|
||||
# Estrai solo la parte "weather" se presente, altrimenti l'intero JSON
|
||||
weather_display = json.dumps(weather_json.get("weather", weather_json))
|
||||
except json.JSONDecodeError:
|
||||
weather_display = "null" # Se il file non è un JSON valido
|
||||
|
||||
self.log_write("rain", "info", f"check_rain_online - weather={weather_display}, local_epoch={local_epoch}")
|
||||
|
||||
if current_state_rain_online != last_state_rain_online:
|
||||
self._write_status_file("last_state_rain_online", current_state_rain_online)
|
||||
self.event_manager.trigger_event("check_rain_online_change", current_state_rain_online, weather_display)
|
||||
else:
|
||||
self.log_write("rain", "error", "check_rain_online - fallita lettura dati online (non un numero)")
|
||||
|
||||
self.event_manager.trigger_event("check_rain_online_after", current_state_rain_online, weather_json)
|
||||
|
||||
def check_rain_sensor(self):
|
||||
"""
|
||||
Controlla se piove tramite sensore hardware.
|
||||
"""
|
||||
if not self.rain_gpio:
|
||||
self.log_write("rain", "warning", "Sensore pioggia non presente")
|
||||
return
|
||||
|
||||
self.event_manager.trigger_event("check_rain_sensor_before", "")
|
||||
|
||||
current_state_rain_sensor = ""
|
||||
last_state_rain_sensor = self._read_status_file("last_state_rain_sensor", default="norain")
|
||||
|
||||
s_str = self.driver_manager.drv_rain_sensor_get(self.rain_gpio)
|
||||
s = int(s_str) if s_str and s_str.isdigit() else -1 # Converte in int, default a -1 se non valido
|
||||
|
||||
if s == self.rain_gpio_state: # Confronta con lo stato configurato per la pioggia
|
||||
current_state_rain_sensor = 'rain'
|
||||
local_epoch = int(time.time())
|
||||
self._write_status_file("last_rain_sensor", local_epoch)
|
||||
self.log_write("rain", "info", f"check_rain_sensor - ora sta piovendo ({local_epoch})")
|
||||
else:
|
||||
current_state_rain_sensor = 'norain'
|
||||
self.log_write("rain", "info", "check_rain_sensor - ora non sta piovendo")
|
||||
|
||||
if current_state_rain_sensor != last_state_rain_sensor:
|
||||
self._write_status_file("last_state_rain_sensor", current_state_rain_sensor)
|
||||
self.event_manager.trigger_event("check_rain_sensor_change", current_state_rain_sensor)
|
||||
|
||||
self.event_manager.trigger_event("check_rain_sensor_after", current_state_rain_sensor)
|
||||
|
||||
def close_all_for_rain(self):
|
||||
"""
|
||||
Chiude tutte le elettrovalvole se sta piovendo o se hanno raggiunto l'umidità massima.
|
||||
"""
|
||||
# Chiude le elettrovalvole che hanno raggiunto l'umidità del terreno impostata
|
||||
for i in range(1, self.ev_total + 1):
|
||||
alias = self.config.get(f"EV{i}_ALIAS")
|
||||
if not alias: continue # Salta se l'alias non è definito
|
||||
|
||||
state = self.pigarden_core.ev_status(alias)
|
||||
moisture = self.pigarden_core.ev_check_moisture_autoclose(i)
|
||||
|
||||
# Se l'elettrovalvola è aperta (stato 1) e l'umidità massima è stata raggiunta (moisture > 0)
|
||||
if state == 1 and moisture > 0:
|
||||
self.pigarden_core.ev_close(alias)
|
||||
self.log_write("irrigate", "warning", f"close_all_for_rain - Chiusa elettrovalvola '{alias}' perché l'umidità massima del terreno è stata raggiunta")
|
||||
|
||||
# Chiude le elettrovalvole in caso di pioggia (online o sensore)
|
||||
close_all_flag = False
|
||||
now = int(time.time())
|
||||
|
||||
# Controllo pioggia online
|
||||
if self.not_irrigate_if_rain_online > 0:
|
||||
last_rain_online_str = self._read_status_file("last_rain_online", default="0")
|
||||
try:
|
||||
last_rain_online = int(last_rain_online_str)
|
||||
if now - last_rain_online < self.not_irrigate_if_rain_online:
|
||||
close_all_flag = True
|
||||
except ValueError:
|
||||
pass # Ignora se il timestamp non è un numero
|
||||
|
||||
# Controllo pioggia sensore
|
||||
if self.not_irrigate_if_rain_sensor > 0:
|
||||
last_rain_sensor_str = self._read_status_file("last_rain_sensor", default="0")
|
||||
try:
|
||||
last_rain_sensor = int(last_rain_sensor_str)
|
||||
if now - last_rain_sensor < self.not_irrigate_if_rain_sensor:
|
||||
close_all_flag = True
|
||||
except ValueError:
|
||||
pass # Ignora se il timestamp non è un numero
|
||||
|
||||
if close_all_flag:
|
||||
# Piove: valuta se chiudere le elettrovalvole
|
||||
for i in range(1, self.ev_total + 1):
|
||||
alias = self.config.get(f"EV{i}_ALIAS")
|
||||
if not alias: continue
|
||||
|
||||
state = self.pigarden_core.ev_status(alias)
|
||||
ev_norain = self.pigarden_core.ev_number2norain(i) # True se non deve chiudere per pioggia
|
||||
moisture = self.pigarden_core.ev_check_moisture(i) # 0 se non ha raggiunto l'umidità ottimale
|
||||
|
||||
# Se l'elettrovalvola è aperta (stato 1), NON è impostata per ignorare la pioggia (ev_norain è False),
|
||||
# E l'umidità non è ancora ottimale (moisture è 0 o non ha raggiunto il max)
|
||||
# La logica Bash `[ "$moisture" -ne 0 ]` significa "se l'umidità NON è zero",
|
||||
# che nel contesto di `ev_check_moisture` (che ritorna 0 per "non raggiunta", >0 per "raggiunta")
|
||||
# significherebbe "se l'umidità è stata raggiunta".
|
||||
# Tuttavia, il commento nello script Bash `if [ $moisture -gt 0 ]; then message_write "warning" "solenoid not open because maximum soil moisture has been reached"`
|
||||
# suggerisce che >0 significa "umidità massima raggiunta".
|
||||
# Quindi, per chiudere per pioggia, l'umidità NON deve essere già al massimo.
|
||||
# Se `ev_check_moisture` ritorna 0 per "non raggiunto" e >0 per "raggiunto",
|
||||
# allora `moisture == 0` significa "ha ancora bisogno d'acqua".
|
||||
# Quindi, la condizione per chiudere per pioggia dovrebbe essere:
|
||||
# `state == 1` (aperta) AND `not ev_norain` (non ignora pioggia) AND `moisture == 0` (ha ancora bisogno d'acqua)
|
||||
# Il Bash `[ "$moisture" -ne 0 ]` nella seconda loop di close_all_for_rain è contro-intuitivo
|
||||
# se `ev_check_moisture` ritorna >0 per "raggiunto".
|
||||
# Assumo che `moisture -ne 0` in quel contesto significhi "se l'umidità non è perfetta, chiudi".
|
||||
# Se `ev_check_moisture` ritorna 0 per "umidità OK/raggiunta" e 1 per "non OK", allora -ne 0 ha senso.
|
||||
# Basandomi sulla funzione `ev_open` che usa `moisture -gt 0` per bloccare l'apertura (umidità già alta),
|
||||
# `moisture -ne 0` qui dovrebbe significare "se l'umidità non è ancora ottimale/non è zero".
|
||||
# Se 0 significa "umidità OK", allora -ne 0 significa "umidità NON OK".
|
||||
# Adotterò la traduzione letterale di `moisture != 0` e lascerò al mock di `ev_check_moisture` di definire il comportamento.
|
||||
|
||||
if state == 1 and not ev_norain and moisture != 0:
|
||||
self.pigarden_core.ev_close(alias)
|
||||
self.log_write("irrigate", "warning", f"close_all_for_rain - Chiusa elettrovalvola '{alias}' per pioggia")
|
||||
|
||||
def last_rain_sensor_timestamp(self):
|
||||
"""Mostra il timestamp dell'ultima pioggia rilevato dal sensore."""
|
||||
return self._read_status_file("last_rain_sensor", default="0")
|
||||
|
||||
def last_rain_online_timestamp(self):
|
||||
"""Mostra il timestamp dell'ultima pioggia rilevato dal servizio online."""
|
||||
return self._read_status_file("last_rain_online", default="0")
|
||||
|
||||
def reset_last_rain_sensor_timestamp(self):
|
||||
"""Resetta il timestamp dell'ultima pioggia rilevato dal sensore."""
|
||||
self.event_manager.trigger_event("reset_last_rain_sensor_timestamp_before", "")
|
||||
self._delete_status_file("last_rain_sensor")
|
||||
self.event_manager.trigger_event("reset_last_rain_sensor_timestamp_after", "")
|
||||
self.log_write("rain", "info", "reset_last_rain_sensor_timestamp")
|
||||
|
||||
def reset_last_rain_online_timestamp(self):
|
||||
"""Resetta il timestamp dell'ultima pioggia rilevato dal servizio online."""
|
||||
self.event_manager.trigger_event("reset_last_rain_online_timestamp_before", "")
|
||||
self._delete_status_file("last_rain_online")
|
||||
# Il Bash script chiama trigger_event due volte con _before, correggo a _after
|
||||
self.event_manager.trigger_event("reset_last_rain_online_timestamp_after", "")
|
||||
self.log_write("rain", "info", "reset_last_rain_online_timestamp")
|
||||
|
||||
# --- Esempio di utilizzo ---
|
||||
if __name__ == "__main__":
|
||||
print("--- Test RainManager ---")
|
||||
|
||||
# Inizializza le dipendenze mock
|
||||
mock_event_manager = MockEventManager()
|
||||
mock_driver_manager = MockDriverManager(mock_config_rain)
|
||||
mock_pigarden_core = MockPiGarden(mock_config_rain)
|
||||
|
||||
# Inizializza RainManager
|
||||
rain_manager = RainManager(
|
||||
config=mock_config_rain,
|
||||
log_writer=log_write,
|
||||
event_manager=mock_event_manager,
|
||||
driver_manager=mock_driver_manager,
|
||||
pigarden_core=mock_pigarden_core
|
||||
)
|
||||
|
||||
# --- Test check_rain_online ---
|
||||
print("\n--- Test: check_rain_online (potrebbe simulare pioggia o no) ---")
|
||||
rain_manager.check_rain_online()
|
||||
print(f"Stato pioggia online (file): {rain_manager.last_rain_online_timestamp()}")
|
||||
print(f"Stato ultimo rilevamento online (file): {rain_manager._read_status_file('last_state_rain_online')}")
|
||||
|
||||
# --- Test check_rain_sensor ---
|
||||
print("\n--- Test: check_rain_sensor (potrebbe simulare pioggia o no) ---")
|
||||
rain_manager.check_rain_sensor()
|
||||
print(f"Stato pioggia sensore (file): {rain_manager.last_rain_sensor_timestamp()}")
|
||||
print(f"Stato ultimo rilevamento sensore (file): {rain_manager._read_status_file('last_state_rain_sensor')}")
|
||||
|
||||
# --- Test close_all_for_rain ---
|
||||
print("\n--- Test: close_all_for_rain ---")
|
||||
# Imposta un'elettrovalvola aperta per il test
|
||||
mock_pigarden_core.solenoid_states["Zona_1"] = 1
|
||||
mock_pigarden_core.solenoid_states["Zona_3"] = 1
|
||||
print(f"Stato iniziale Zona_1: {mock_pigarden_core.ev_status('Zona_1')}")
|
||||
print(f"Stato iniziale Zona_3: {mock_pigarden_core.ev_status('Zona_3')}")
|
||||
|
||||
# Simula una pioggia recente per attivare la chiusura
|
||||
rain_manager._write_status_file("last_rain_online", int(time.time()) - 100) # 100 secondi fa
|
||||
rain_manager._write_status_file("last_rain_sensor", int(time.time()) - 50) # 50 secondi fa
|
||||
|
||||
rain_manager.close_all_for_rain()
|
||||
print(f"Stato finale Zona_1: {mock_pigarden_core.ev_status('Zona_1')} (dovrebbe essere 0 se autoclose è attivo o piove)")
|
||||
print(f"Stato finale Zona_3: {mock_pigarden_core.ev_status('Zona_3')} (dovrebbe essere 0 se piove)")
|
||||
|
||||
# --- Test reset timestamp ---
|
||||
print("\n--- Test: reset_last_rain_sensor_timestamp ---")
|
||||
rain_manager.reset_last_rain_sensor_timestamp()
|
||||
print(f"Timestamp sensore dopo reset: {rain_manager.last_rain_sensor_timestamp()}")
|
||||
|
||||
print("\n--- Test: reset_last_rain_online_timestamp ---")
|
||||
rain_manager.reset_last_rain_online_timestamp()
|
||||
print(f"Timestamp online dopo reset: {rain_manager.last_rain_online_timestamp()}")
|
||||
|
||||
print("\n--- Test completato ---")
|
||||
# Pulisci le directory di test (opzionale)
|
||||
# import shutil
|
||||
# shutil.rmtree(mock_config_rain["STATUS_DIR"])
|
||||
432
Python/sensor_include.py
Normal file
432
Python/sensor_include.py
Normal file
@@ -0,0 +1,432 @@
|
||||
import os
|
||||
import time
|
||||
import json
|
||||
import logging
|
||||
|
||||
# --- Mock/Placeholder per le dipendenze esterne e configurazione ---
|
||||
# In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale.
|
||||
|
||||
# Configura un logger di base per le funzioni di log
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def log_write(log_type, level, message):
|
||||
"""Simula la funzione log_write dal tuo script Bash."""
|
||||
if level == "info":
|
||||
logging.info(f"[{log_type}] {message}")
|
||||
elif level == "warning":
|
||||
logging.warning(f"[{log_type}] {message}")
|
||||
elif level == "error":
|
||||
logging.error(f"[{log_type}] {message}")
|
||||
else:
|
||||
logging.debug(f"[{log_type}] {message}")
|
||||
|
||||
# Mock per la classe EventManager (dal file event_manager_py)
|
||||
class MockEventManager:
|
||||
def __init__(self):
|
||||
self.CURRENT_EVENT = ""
|
||||
self.CURRENT_EVENT_ALIAS = ""
|
||||
|
||||
def trigger_event(self, event, *args):
|
||||
log_write("event", "info", f"Mock EventManager: Triggered event: {event} with args: {args}")
|
||||
self.CURRENT_EVENT = event
|
||||
# Simula un codice di ritorno di successo
|
||||
return 0
|
||||
|
||||
# Mock per la classe DriverManager (dal file driver_manager_py)
|
||||
class MockDriverManager:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
||||
def drv_rain_online_get(self, service_id):
|
||||
"""Simula il recupero dello stato pioggia online."""
|
||||
# Restituisce un timestamp se piove, un valore negativo se non piove, o 0 in caso di errore
|
||||
# Per i test, simuliamo che non piova (-1) o che piova (timestamp attuale)
|
||||
if "openweathermap" in service_id:
|
||||
# Simula pioggia 50% delle volte
|
||||
if time.time() % 2 == 0:
|
||||
# Scrivi un mock di dati meteo online per il test
|
||||
mock_weather_data = {
|
||||
"weather": [{"description": "light rain"}],
|
||||
"main": {"temp": 280, "humidity": 90}
|
||||
}
|
||||
with open(os.path.join(self.config["STATUS_DIR"], "last_weather_online"), "w") as f:
|
||||
json.dump(mock_weather_data, f)
|
||||
return str(int(time.time())) # Simula pioggia (timestamp)
|
||||
else:
|
||||
return "-1" # Simula non pioggia
|
||||
return "0" # Errore o servizio non riconosciuto
|
||||
|
||||
def drv_rain_sensor_get(self, gpio_id):
|
||||
"""Simula il recupero dello stato dal sensore pioggia."""
|
||||
# Restituisce lo stato del GPIO (0 o 1)
|
||||
# Per i test, simuliamo lo stato del sensore
|
||||
if time.time() % 3 == 0:
|
||||
return str(self.config.get("RAIN_GPIO_STATE", 0)) # Simula pioggia
|
||||
return str(1 - self.config.get("RAIN_GPIO_STATE", 0)) # Simula non pioggia
|
||||
|
||||
# Mock per la classe PiGarden (per le dipendenze ev_*)
|
||||
class MockPiGarden:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
self.solenoid_states = {} # Mappa alias a stato (0=chiuso, 1=aperto, 2=forzato)
|
||||
# Inizializza stati fittizi
|
||||
for i in range(1, self.config.get("EV_TOTAL", 0) + 1):
|
||||
self.solenoid_states[str(i)] = 0 # Tutti chiusi di default
|
||||
|
||||
def ev_status(self, alias):
|
||||
"""Simula ev_status, restituisce lo stato dell'elettrovalvola."""
|
||||
return self.solenoid_states.get(alias, 0) # 0 se non trovato o chiuso
|
||||
|
||||
def ev_close(self, alias):
|
||||
"""Simula ev_close, imposta lo stato dell'elettrovalvola a chiuso."""
|
||||
self.solenoid_states[alias] = 0
|
||||
log_write("irrigate", "info", f"MockPiGarden: Elettrovalvola '{alias}' chiusa.")
|
||||
|
||||
def ev_check_moisture(self, ev_num):
|
||||
"""
|
||||
Simula ev_check_moisture.
|
||||
Ritorna 0 se l'umidità non è stata raggiunta (bisogno d'acqua),
|
||||
>0 se l'umidità massima è stata raggiunta.
|
||||
"""
|
||||
# Per i test, simuliamo che l'umidità sia raggiunta per EV1, altrimenti no
|
||||
if ev_num == 1:
|
||||
return 1 # Umidità raggiunta
|
||||
return 0 # Umidità non raggiunta
|
||||
|
||||
def ev_check_moisture_autoclose(self, ev_num):
|
||||
"""
|
||||
Simula ev_check_moisture_autoclose.
|
||||
Ritorna 0 se non deve chiudere automaticamente, >0 se sì.
|
||||
"""
|
||||
# Per i test, simuliamo che l'autochiusura sia attiva per EV1 e l'umidità sia raggiunta
|
||||
if self.config.get(f"EV{ev_num}_SENSOR_MOISTURE_AUTOCLOSE", "0") == "1":
|
||||
return self.ev_check_moisture(ev_num) # Usa la logica di check_moisture
|
||||
return 0
|
||||
|
||||
def ev_number2norain(self, ev_num):
|
||||
"""Simula ev_number2norain."""
|
||||
return self.config.get(f"EV{ev_num}_NORAIN", "0") == "1"
|
||||
|
||||
# Variabili di configurazione simulate (dal piGarden.conf)
|
||||
mock_config_rain = {
|
||||
"WEATHER_SERVICE": "drv:openweathermap", # Esempio di driver di servizio meteo
|
||||
"RAIN_GPIO": 25,
|
||||
"RAIN_GPIO_STATE": 0, # Stato del GPIO che indica pioggia
|
||||
"NOT_IRRIGATE_IF_RAIN_ONLINE": 86400, # 24 ore in secondi
|
||||
"NOT_IRRIGATE_IF_RAIN_SENSOR": 86400, # 24 ore in secondi
|
||||
"EV_TOTAL": 6,
|
||||
"STATUS_DIR": "/tmp/piGarden_status", # Directory per i file di stato
|
||||
# Elettrovalvole di esempio per close_all_for_rain
|
||||
"EV1_ALIAS": "Zona_1", "EV1_NORAIN": "0", "EV1_SENSOR_MOISTURE_AUTOCLOSE": "1",
|
||||
"EV2_ALIAS": "Zona_2", "EV2_NORAIN": "1", # Questa zona non si chiude per pioggia
|
||||
"EV3_ALIAS": "Zona_3", "EV3_NORAIN": "0",
|
||||
"EV4_ALIAS": "Zona_4", "EV4_NORAIN": "0",
|
||||
"EV5_ALIAS": "Zona_5", "EV5_NORAIN": "0",
|
||||
"EV6_ALIAS": "Zona_6", "EV6_NORAIN": "0",
|
||||
}
|
||||
|
||||
# Assicurati che la directory di stato esista per i test
|
||||
os.makedirs(mock_config_rain["STATUS_DIR"], exist_ok=True)
|
||||
|
||||
# --- Classe RainManager ---
|
||||
|
||||
class RainManager:
|
||||
def __init__(self, config, log_writer, event_manager, driver_manager, pigarden_core):
|
||||
self.config = config
|
||||
self.log_write = log_writer
|
||||
self.event_manager = event_manager
|
||||
self.driver_manager = driver_manager
|
||||
self.pigarden_core = pigarden_core # Istanza della classe PiGarden principale
|
||||
|
||||
self.status_dir = self.config.get("STATUS_DIR")
|
||||
self.weather_service = self.config.get("WEATHER_SERVICE")
|
||||
self.rain_gpio = self.config.get("RAIN_GPIO")
|
||||
self.rain_gpio_state = self.config.get("RAIN_GPIO_STATE")
|
||||
self.not_irrigate_if_rain_online = self.config.get("NOT_IRRIGATE_IF_RAIN_ONLINE")
|
||||
self.not_irrigate_if_rain_sensor = self.config.get("NOT_IRRIGATE_IF_RAIN_SENSOR")
|
||||
self.ev_total = self.config.get("EV_TOTAL")
|
||||
|
||||
# Inizializza i file di stato se non esistono
|
||||
self._init_status_files()
|
||||
|
||||
def _init_status_files(self):
|
||||
"""Assicura che i file di stato esistano per evitare errori FileNotFoundError."""
|
||||
for filename in ["last_state_rain_online", "last_rain_online",
|
||||
"last_state_rain_sensor", "last_rain_sensor",
|
||||
"last_weather_online"]:
|
||||
file_path = os.path.join(self.status_dir, filename)
|
||||
if not os.path.exists(file_path):
|
||||
# Crea il file vuoto o con un valore di default appropriato
|
||||
with open(file_path, 'w') as f:
|
||||
if filename.startswith("last_rain_"):
|
||||
f.write("0") # Timestamp 0
|
||||
elif filename.startswith("last_state_rain_"):
|
||||
f.write("unknown")
|
||||
elif filename == "last_weather_online":
|
||||
f.write("{}") # JSON vuoto
|
||||
|
||||
def _read_status_file(self, filename, default=""):
|
||||
"""Legge il contenuto di un file di stato."""
|
||||
file_path = os.path.join(self.status_dir, filename)
|
||||
try:
|
||||
with open(file_path, 'r') as f:
|
||||
content = f.read().strip()
|
||||
return content if content else default
|
||||
except FileNotFoundError:
|
||||
return default
|
||||
except Exception as e:
|
||||
self.log_write("rain", "error", f"Errore lettura {filename}: {e}")
|
||||
return default
|
||||
|
||||
def _write_status_file(self, filename, content):
|
||||
"""Scrive il contenuto in un file di stato."""
|
||||
file_path = os.path.join(self.status_dir, filename)
|
||||
try:
|
||||
with open(file_path, 'w') as f:
|
||||
f.write(str(content))
|
||||
except Exception as e:
|
||||
self.log_write("rain", "error", f"Errore scrittura {filename}: {e}")
|
||||
|
||||
def _delete_status_file(self, filename):
|
||||
"""Elimina un file di stato."""
|
||||
file_path = os.path.join(self.status_dir, filename)
|
||||
try:
|
||||
if os.path.exists(file_path):
|
||||
os.remove(file_path)
|
||||
except Exception as e:
|
||||
self.log_write("rain", "error", f"Errore eliminazione {filename}: {e}")
|
||||
|
||||
def check_rain_online(self):
|
||||
"""
|
||||
Esegue il controllo meteo tramite il servizio online configurato.
|
||||
"""
|
||||
if self.weather_service == "none":
|
||||
self.log_write("rain", "warning", "check_rain_online - servizio online disabilitato")
|
||||
return
|
||||
|
||||
self.event_manager.trigger_event("check_rain_online_before", "")
|
||||
|
||||
local_epoch_str = self.driver_manager.drv_rain_online_get(self.weather_service)
|
||||
current_state_rain_online = ""
|
||||
last_state_rain_online = self._read_status_file("last_state_rain_online", default="norain")
|
||||
weather_json = "{}" # Default a JSON vuoto
|
||||
|
||||
if local_epoch_str and local_epoch_str.lstrip('-').isdigit(): # Controlla se è un numero (anche negativo)
|
||||
local_epoch = int(local_epoch_str)
|
||||
if local_epoch == 0:
|
||||
self.log_write("rain", "error", "check_rain_online - fallita lettura dati online (valore 0)")
|
||||
else:
|
||||
if local_epoch > 0:
|
||||
current_state_rain_online = 'rain'
|
||||
self._write_status_file("last_rain_online", local_epoch)
|
||||
else:
|
||||
current_state_rain_online = 'norain'
|
||||
|
||||
# Leggi il JSON meteo, se esiste e valido
|
||||
weather_data_str = self._read_status_file("last_weather_online", default="{}")
|
||||
try:
|
||||
weather_json = json.loads(weather_data_str)
|
||||
# Estrai solo la parte "weather" se presente, altrimenti l'intero JSON
|
||||
weather_display = json.dumps(weather_json.get("weather", weather_json))
|
||||
except json.JSONDecodeError:
|
||||
weather_display = "null" # Se il file non è un JSON valido
|
||||
|
||||
self.log_write("rain", "info", f"check_rain_online - weather={weather_display}, local_epoch={local_epoch}")
|
||||
|
||||
if current_state_rain_online != last_state_rain_online:
|
||||
self._write_status_file("last_state_rain_online", current_state_rain_online)
|
||||
self.event_manager.trigger_event("check_rain_online_change", current_state_rain_online, weather_display)
|
||||
else:
|
||||
self.log_write("rain", "error", "check_rain_online - fallita lettura dati online (non un numero)")
|
||||
|
||||
self.event_manager.trigger_event("check_rain_online_after", current_state_rain_online, weather_json)
|
||||
|
||||
def check_rain_sensor(self):
|
||||
"""
|
||||
Controlla se piove tramite sensore hardware.
|
||||
"""
|
||||
if not self.rain_gpio:
|
||||
self.log_write("rain", "warning", "Sensore pioggia non presente")
|
||||
return
|
||||
|
||||
self.event_manager.trigger_event("check_rain_sensor_before", "")
|
||||
|
||||
current_state_rain_sensor = ""
|
||||
last_state_rain_sensor = self._read_status_file("last_state_rain_sensor", default="norain")
|
||||
|
||||
s_str = self.driver_manager.drv_rain_sensor_get(self.rain_gpio)
|
||||
s = int(s_str) if s_str and s_str.isdigit() else -1 # Converte in int, default a -1 se non valido
|
||||
|
||||
if s == self.rain_gpio_state: # Confronta con lo stato configurato per la pioggia
|
||||
current_state_rain_sensor = 'rain'
|
||||
local_epoch = int(time.time())
|
||||
self._write_status_file("last_rain_sensor", local_epoch)
|
||||
self.log_write("rain", "info", f"check_rain_sensor - ora sta piovendo ({local_epoch})")
|
||||
else:
|
||||
current_state_rain_sensor = 'norain'
|
||||
self.log_write("rain", "info", "check_rain_sensor - ora non sta piovendo")
|
||||
|
||||
if current_state_rain_sensor != last_state_rain_sensor:
|
||||
self._write_status_file("last_state_rain_sensor", current_state_rain_sensor)
|
||||
self.event_manager.trigger_event("check_rain_sensor_change", current_state_rain_sensor)
|
||||
|
||||
self.event_manager.trigger_event("check_rain_sensor_after", current_state_rain_sensor)
|
||||
|
||||
def close_all_for_rain(self):
|
||||
"""
|
||||
Chiude tutte le elettrovalvole se sta piovendo o se hanno raggiunto l'umidità massima.
|
||||
"""
|
||||
# Chiude le elettrovalvole che hanno raggiunto l'umidità del terreno impostata
|
||||
for i in range(1, self.ev_total + 1):
|
||||
alias = self.config.get(f"EV{i}_ALIAS")
|
||||
if not alias: continue # Salta se l'alias non è definito
|
||||
|
||||
state = self.pigarden_core.ev_status(alias)
|
||||
moisture = self.pigarden_core.ev_check_moisture_autoclose(i)
|
||||
|
||||
# Se l'elettrovalvola è aperta (stato 1) e l'umidità massima è stata raggiunta (moisture > 0)
|
||||
if state == 1 and moisture > 0:
|
||||
self.pigarden_core.ev_close(alias)
|
||||
self.log_write("irrigate", "warning", f"close_all_for_rain - Chiusa elettrovalvola '{alias}' perché l'umidità massima del terreno è stata raggiunta")
|
||||
|
||||
# Chiude le elettrovalvole in caso di pioggia (online o sensore)
|
||||
close_all_flag = False
|
||||
now = int(time.time())
|
||||
|
||||
# Controllo pioggia online
|
||||
if self.not_irrigate_if_rain_online > 0:
|
||||
last_rain_online_str = self._read_status_file("last_rain_online", default="0")
|
||||
try:
|
||||
last_rain_online = int(last_rain_online_str)
|
||||
if now - last_rain_online < self.not_irrigate_if_rain_online:
|
||||
close_all_flag = True
|
||||
except ValueError:
|
||||
pass # Ignora se il timestamp non è un numero
|
||||
|
||||
# Controllo pioggia sensore
|
||||
if self.not_irrigate_if_rain_sensor > 0:
|
||||
last_rain_sensor_str = self._read_status_file("last_rain_sensor", default="0")
|
||||
try:
|
||||
last_rain_sensor = int(last_rain_sensor_str)
|
||||
if now - last_rain_sensor < self.not_irrigate_if_rain_sensor:
|
||||
close_all_flag = True
|
||||
except ValueError:
|
||||
pass # Ignora se il timestamp non è un numero
|
||||
|
||||
if close_all_flag:
|
||||
# Piove: valuta se chiudere le elettrovalvole
|
||||
for i in range(1, self.ev_total + 1):
|
||||
alias = self.config.get(f"EV{i}_ALIAS")
|
||||
if not alias: continue
|
||||
|
||||
state = self.pigarden_core.ev_status(alias)
|
||||
ev_norain = self.pigarden_core.ev_number2norain(i) # True se non deve chiudere per pioggia
|
||||
moisture = self.pigarden_core.ev_check_moisture(i) # 0 se non ha raggiunto l'umidità ottimale
|
||||
|
||||
# Se l'elettrovalvola è aperta (stato 1), NON è impostata per ignorare la pioggia (ev_norain è False),
|
||||
# E l'umidità non è ancora ottimale (moisture è 0 o non ha raggiunto il max)
|
||||
# La logica Bash `[ "$moisture" -ne 0 ]` significa "se l'umidità NON è zero",
|
||||
# che nel contesto di `ev_check_moisture` (che ritorna 0 per "non raggiunta", >0 per "raggiunta")
|
||||
# significherebbe "se l'umidità è stata raggiunta".
|
||||
# Tuttavia, il commento nello script Bash `if [ $moisture -gt 0 ]; then message_write "warning" "solenoid not open because maximum soil moisture has been reached"`
|
||||
# suggerisce che >0 significa "umidità massima raggiunta".
|
||||
# Quindi, per chiudere per pioggia, l'umidità NON deve essere già al massimo.
|
||||
# Se `ev_check_moisture` ritorna 0 per "non raggiunto" e >0 per "raggiunto",
|
||||
# allora `moisture == 0` significa "ha ancora bisogno d'acqua".
|
||||
# Quindi, la condizione per chiudere per pioggia dovrebbe essere:
|
||||
# `state == 1` (aperta) AND `not ev_norain` (non ignora pioggia) AND `moisture == 0` (ha ancora bisogno d'acqua)
|
||||
# Il Bash `[ "$moisture" -ne 0 ]` nella seconda loop di close_all_for_rain è contro-intuitivo
|
||||
# se `ev_check_moisture` ritorna >0 per "raggiunto".
|
||||
# Assumo che `moisture -ne 0` in quel contesto significhi "se l'umidità non è perfetta, chiudi".
|
||||
# Se `ev_check_moisture` ritorna 0 per "umidità OK/raggiunta" e 1 per "non OK", allora -ne 0 ha senso.
|
||||
# Basandomi sulla funzione `ev_open` che usa `moisture -gt 0` per bloccare l'apertura (umidità già alta),
|
||||
# `moisture -ne 0` qui dovrebbe significare "se l'umidità non è ancora ottimale/non è zero".
|
||||
# Se 0 significa "umidità OK", allora -ne 0 significa "umidità NON OK".
|
||||
# Adotterò la traduzione letterale di `moisture != 0` e lascerò al mock di `ev_check_moisture` di definire il comportamento.
|
||||
|
||||
if state == 1 and not ev_norain and moisture != 0:
|
||||
self.pigarden_core.ev_close(alias)
|
||||
self.log_write("irrigate", "warning", f"close_all_for_rain - Chiusa elettrovalvola '{alias}' per pioggia")
|
||||
|
||||
def last_rain_sensor_timestamp(self):
|
||||
"""Mostra il timestamp dell'ultima pioggia rilevato dal sensore."""
|
||||
return self._read_status_file("last_rain_sensor", default="0")
|
||||
|
||||
def last_rain_online_timestamp(self):
|
||||
"""Mostra il timestamp dell'ultima pioggia rilevato dal servizio online."""
|
||||
return self._read_status_file("last_rain_online", default="0")
|
||||
|
||||
def reset_last_rain_sensor_timestamp(self):
|
||||
"""Resetta il timestamp dell'ultima pioggia rilevato dal sensore."""
|
||||
self.event_manager.trigger_event("reset_last_rain_sensor_timestamp_before", "")
|
||||
self._delete_status_file("last_rain_sensor")
|
||||
self.event_manager.trigger_event("reset_last_rain_sensor_timestamp_after", "")
|
||||
self.log_write("rain", "info", "reset_last_rain_sensor_timestamp")
|
||||
|
||||
def reset_last_rain_online_timestamp(self):
|
||||
"""Resetta il timestamp dell'ultima pioggia rilevato dal servizio online."""
|
||||
self.event_manager.trigger_event("reset_last_rain_online_timestamp_before", "")
|
||||
self._delete_status_file("last_rain_online")
|
||||
# Il Bash script chiama trigger_event due volte con _before, correggo a _after
|
||||
self.event_manager.trigger_event("reset_last_rain_online_timestamp_after", "")
|
||||
self.log_write("rain", "info", "reset_last_rain_online_timestamp")
|
||||
|
||||
# --- Esempio di utilizzo ---
|
||||
if __name__ == "__main__":
|
||||
print("--- Test RainManager ---")
|
||||
|
||||
# Inizializza le dipendenze mock
|
||||
mock_event_manager = MockEventManager()
|
||||
mock_driver_manager = MockDriverManager(mock_config_rain)
|
||||
mock_pigarden_core = MockPiGarden(mock_config_rain)
|
||||
|
||||
# Inizializza RainManager
|
||||
rain_manager = RainManager(
|
||||
config=mock_config_rain,
|
||||
log_writer=log_write,
|
||||
event_manager=mock_event_manager,
|
||||
driver_manager=mock_driver_manager,
|
||||
pigarden_core=mock_pigarden_core
|
||||
)
|
||||
|
||||
# --- Test check_rain_online ---
|
||||
print("\n--- Test: check_rain_online (potrebbe simulare pioggia o no) ---")
|
||||
rain_manager.check_rain_online()
|
||||
print(f"Stato pioggia online (file): {rain_manager.last_rain_online_timestamp()}")
|
||||
print(f"Stato ultimo rilevamento online (file): {rain_manager._read_status_file('last_state_rain_online')}")
|
||||
|
||||
# --- Test check_rain_sensor ---
|
||||
print("\n--- Test: check_rain_sensor (potrebbe simulare pioggia o no) ---")
|
||||
rain_manager.check_rain_sensor()
|
||||
print(f"Stato pioggia sensore (file): {rain_manager.last_rain_sensor_timestamp()}")
|
||||
print(f"Stato ultimo rilevamento sensore (file): {rain_manager._read_status_file('last_state_rain_sensor')}")
|
||||
|
||||
# --- Test close_all_for_rain ---
|
||||
print("\n--- Test: close_all_for_rain ---")
|
||||
# Imposta un'elettrovalvola aperta per il test
|
||||
mock_pigarden_core.solenoid_states["Zona_1"] = 1
|
||||
mock_pigarden_core.solenoid_states["Zona_3"] = 1
|
||||
print(f"Stato iniziale Zona_1: {mock_pigarden_core.ev_status('Zona_1')}")
|
||||
print(f"Stato iniziale Zona_3: {mock_pigarden_core.ev_status('Zona_3')}")
|
||||
|
||||
# Simula una pioggia recente per attivare la chiusura
|
||||
rain_manager._write_status_file("last_rain_online", int(time.time()) - 100) # 100 secondi fa
|
||||
rain_manager._write_status_file("last_rain_sensor", int(time.time()) - 50) # 50 secondi fa
|
||||
|
||||
rain_manager.close_all_for_rain()
|
||||
print(f"Stato finale Zona_1: {mock_pigarden_core.ev_status('Zona_1')} (dovrebbe essere 0 se autoclose è attivo o piove)")
|
||||
print(f"Stato finale Zona_3: {mock_pigarden_core.ev_status('Zona_3')} (dovrebbe essere 0 se piove)")
|
||||
|
||||
# --- Test reset timestamp ---
|
||||
print("\n--- Test: reset_last_rain_sensor_timestamp ---")
|
||||
rain_manager.reset_last_rain_sensor_timestamp()
|
||||
print(f"Timestamp sensore dopo reset: {rain_manager.last_rain_sensor_timestamp()}")
|
||||
|
||||
print("\n--- Test: reset_last_rain_online_timestamp ---")
|
||||
rain_manager.reset_last_rain_online_timestamp()
|
||||
print(f"Timestamp online dopo reset: {rain_manager.last_rain_online_timestamp()}")
|
||||
|
||||
print("\n--- Test completato ---")
|
||||
# Pulisci le directory di test (opzionale)
|
||||
# import shutil
|
||||
# shutil.rmtree(mock_config_rain["STATUS_DIR"])
|
||||
525
Python/socket.include.py
Normal file
525
Python/socket.include.py
Normal file
@@ -0,0 +1,525 @@
|
||||
import os
|
||||
import subprocess
|
||||
import threading
|
||||
import socket
|
||||
import socketserver
|
||||
import time
|
||||
import sys
|
||||
|
||||
# --- CONFIGURAZIONE (DA ADATTARE ALLE TUE ESIGENZE) ---
|
||||
# Queste variabili dovrebbero essere configurate in un file di configurazione separato
|
||||
# o passate come argomenti al tuo script Python.
|
||||
|
||||
TCPSERVER_PID_FILE = "/var/run/my_socket_server.pid"
|
||||
TCPSERVER_IP = "0.0.0.0" # Ascolta su tutte le interfacce
|
||||
TCPSERVER_PORT = 12345 # Porta del server socket
|
||||
|
||||
TCPSERVER_USER = "admin" # Credenziali opzionali per l'autenticazione
|
||||
TCPSERVER_PWD = "password123"
|
||||
|
||||
# Variabili usate internamente dallo script Bash, che in Python diventano parametri o logica
|
||||
RUN_FROM_TCPSERVER = False # Sarà True quando il comando è eseguito dal server socket
|
||||
|
||||
# Definisci i percorsi per i comandi esterni (se non sono nel PATH di sistema)
|
||||
# In un ambiente di produzione, è meglio specificare percorsi assoluti.
|
||||
# Ad esempio, TR_COMMAND = "/usr/bin/tr"
|
||||
TR_COMMAND = "tr"
|
||||
CUT_COMMAND = "cut"
|
||||
READLINK_COMMAND = "readlink"
|
||||
|
||||
|
||||
# --- FUNZIONI UTILITY (PLACEHOLDERS) ---
|
||||
# Queste sono le funzioni che erano richiamate dallo script Bash
|
||||
# Dovrai implementarle in Python o collegarle alle tue librerie esistenti.
|
||||
|
||||
def log_write(source, level, message):
|
||||
"""Placeholder per la funzione di logging."""
|
||||
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
print(f"[{timestamp}] [{source}] [{level.upper()}] {message}")
|
||||
|
||||
def json_error(code, message):
|
||||
"""Placeholder per la funzione che genera un JSON di errore."""
|
||||
print(f'{{"status": "error", "code": {code}, "message": "{message}"}}')
|
||||
|
||||
def json_status(*args):
|
||||
"""Placeholder per la funzione che genera un JSON di stato."""
|
||||
if args:
|
||||
print(f'{{"status": "success", "data": "{", ".join(args)}"}}')
|
||||
else:
|
||||
print('{"status": "success"}')
|
||||
|
||||
def message_write(level, message):
|
||||
"""Placeholder per la funzione che scrive un messaggio."""
|
||||
print(f"[{level.upper()}] {message}")
|
||||
|
||||
def ev_open(alias, param=None):
|
||||
"""Placeholder per la funzione di apertura valvola."""
|
||||
log_write("socket_server", "info", f"Executing ev_open for alias: {alias}, param: {param}")
|
||||
# Qui andrebbe la logica per aprire la valvola
|
||||
pass
|
||||
|
||||
def ev_open_in(arg2, arg3, arg4, arg5):
|
||||
"""Placeholder per la funzione di apertura valvola in un intervallo."""
|
||||
log_write("socket_server", "info", f"Executing ev_open_in with args: {arg2}, {arg3}, {arg4}, {arg5}")
|
||||
# Qui andrebbe la logica per aprire la valvola in un intervallo
|
||||
pass
|
||||
|
||||
def ev_close(alias):
|
||||
"""Placeholder per la funzione di chiusura valvola."""
|
||||
log_write("socket_server", "info", f"Executing ev_close for alias: {alias}")
|
||||
# Qui andrebbe la logica per chiudere la valvola
|
||||
pass
|
||||
|
||||
def close_all():
|
||||
"""Placeholder per la funzione di chiusura di tutte le valvole."""
|
||||
log_write("socket_server", "info", "Executing close_all")
|
||||
# Qui andrebbe la logica per chiudere tutte le valvole
|
||||
pass
|
||||
|
||||
def cron_disable_all_open_close():
|
||||
"""Placeholder per la funzione di disabilitazione scheduling cron."""
|
||||
log_write("socket_server", "info", "Executing cron_disable_all_open_close")
|
||||
# Qui andrebbe la logica per disabilitare lo scheduling
|
||||
pass
|
||||
|
||||
def cron_enable_all_open_close():
|
||||
"""Placeholder per la funzione di abilitazione scheduling cron."""
|
||||
log_write("socket_server", "info", "Executing cron_enable_all_open_close")
|
||||
# Qui andrebbe la logica per abilitare lo scheduling
|
||||
pass
|
||||
|
||||
def set_cron_init():
|
||||
"""Placeholder per la funzione set_cron_init."""
|
||||
log_write("socket_server", "info", "Executing set_cron_init")
|
||||
return "" # Simula output vuoto per successo
|
||||
|
||||
def set_cron_start_socket_server():
|
||||
"""Placeholder per la funzione set_cron_start_socket_server."""
|
||||
log_write("socket_server", "info", "Executing set_cron_start_socket_server")
|
||||
return ""
|
||||
|
||||
def set_cron_check_rain_sensor():
|
||||
"""Placeholder per la funzione set_cron_check_rain_sensor."""
|
||||
log_write("socket_server", "info", "Executing set_cron_check_rain_sensor")
|
||||
return ""
|
||||
|
||||
def set_cron_check_rain_online():
|
||||
"""Placeholder per la funzione set_cron_check_rain_online."""
|
||||
log_write("socket_server", "info", "Executing set_cron_check_rain_online")
|
||||
return ""
|
||||
|
||||
def set_cron_close_all_for_rain():
|
||||
"""Placeholder per la funzione set_cron_close_all_for_rain."""
|
||||
log_write("socket_server", "info", "Executing set_cron_close_all_for_rain")
|
||||
return ""
|
||||
|
||||
def del_cron_open(arg):
|
||||
"""Placeholder per la funzione del_cron_open."""
|
||||
log_write("socket_server", "info", f"Executing del_cron_open with arg: {arg}")
|
||||
return ""
|
||||
|
||||
def del_cron_open_in(arg):
|
||||
"""Placeholder per la funzione del_cron_open_in."""
|
||||
log_write("socket_server", "info", f"Executing del_cron_open_in with arg: {arg}")
|
||||
return ""
|
||||
|
||||
def del_cron_close(arg):
|
||||
"""Placeholder per la funzione del_cron_close."""
|
||||
log_write("socket_server", "info", f"Executing del_cron_close with arg: {arg}")
|
||||
return ""
|
||||
|
||||
def add_cron_open(arg2, arg3, arg4, arg5, arg6, arg7, arg8):
|
||||
"""Placeholder per la funzione add_cron_open."""
|
||||
log_write("socket_server", "info", f"Executing add_cron_open with args: {arg2}, {arg3}, {arg4}, {arg5}, {arg6}, {arg7}, {arg8}")
|
||||
return ""
|
||||
|
||||
def add_cron_close(arg2, arg3, arg4, arg5, arg6, arg7, arg8):
|
||||
"""Placeholder per la funzione add_cron_close."""
|
||||
log_write("socket_server", "info", f"Executing add_cron_close with args: {arg2}, {arg3}, {arg4}, {arg5}, {arg6}, {arg7}, {arg8}")
|
||||
return ""
|
||||
|
||||
def cmd_pigardensched(arg2, arg3, arg4, arg5, arg6):
|
||||
"""Placeholder per la funzione cmd_pigardensched."""
|
||||
log_write("socket_server", "info", f"Executing cmd_pigardensched with args: {arg2}, {arg3}, {arg4}, {arg5}, {arg6}")
|
||||
return ""
|
||||
|
||||
def reset_last_rain_sensor_timestamp():
|
||||
"""Placeholder per la funzione reset_last_rain_sensor_timestamp."""
|
||||
log_write("socket_server", "info", "Executing reset_last_rain_sensor_timestamp")
|
||||
pass
|
||||
|
||||
def reset_last_rain_online_timestamp():
|
||||
"""Placeholder per la funzione reset_last_rain_online_timestamp."""
|
||||
log_write("socket_server", "info", "Executing reset_last_rain_online_timestamp")
|
||||
pass
|
||||
|
||||
def sensor_status_set(arg2, arg3, arg4):
|
||||
"""Placeholder per la funzione sensor_status_set."""
|
||||
log_write("socket_server", "info", f"Executing sensor_status_set with args: {arg2}, {arg3}, {arg4}")
|
||||
pass
|
||||
|
||||
# --- FUNZIONI DI GESTIONE DEI PROCESSI ---
|
||||
def list_descendants(pid):
|
||||
"""
|
||||
Simula la logica di 'list_descendants' Bash.
|
||||
In un sistema reale, dovresti usare librerie come 'psutil'
|
||||
per ottenere i processi figli. Qui è solo una simulazione.
|
||||
"""
|
||||
try:
|
||||
# psutil è l'opzione migliore, se disponibile
|
||||
# import psutil
|
||||
# parent = psutil.Process(pid)
|
||||
# return [child.pid for child in parent.children(recursive=True)]
|
||||
|
||||
# Fallback semplice per simulazione (non accurato per processi reali)
|
||||
# Se non hai psutil, questa funzione è difficile da replicare accuratamente in modo generico.
|
||||
# Potrebbe essere necessario un approccio specifico per il tuo sistema operativo.
|
||||
return [] # Nessun discendente noto senza psutil
|
||||
except Exception as e:
|
||||
log_write("process_manager", "error", f"Errore nel listing dei discendenti di {pid}: {e}")
|
||||
return []
|
||||
|
||||
def get_script_path():
|
||||
"""Restituisce il percorso assoluto dello script corrente."""
|
||||
return os.path.abspath(sys.argv[0])
|
||||
|
||||
# --- CLASSE GESTORE RICHIESTE SOCKET ---
|
||||
class MyTCPHandler(socketserver.BaseRequestHandler):
|
||||
"""
|
||||
La classe MyTCPHandler gestirà ogni nuova connessione client.
|
||||
Sovrascrive il metodo handle() per implementare la logica del server.
|
||||
"""
|
||||
def handle(self):
|
||||
global RUN_FROM_TCPSERVER
|
||||
RUN_FROM_TCPSERVER = True
|
||||
|
||||
# TCPREMOTEIP in Python è self.client_address[0]
|
||||
client_ip = self.client_address[0]
|
||||
log_write("socket_server", "info", f"Nuova connessione da: {client_ip}")
|
||||
|
||||
try:
|
||||
# Autenticazione (se configurata)
|
||||
if TCPSERVER_USER and TCPSERVER_PWD:
|
||||
# Leggi utente (timeout 3 secondi)
|
||||
self.request.settimeout(3)
|
||||
try:
|
||||
user_line = self.request.recv(1024).decode('utf-8').strip()
|
||||
password_line = self.request.recv(1024).decode('utf-8').strip()
|
||||
except socket.timeout:
|
||||
log_write("socket_server", "warning", f"socket connection from: {client_ip} - Timeout during credentials read")
|
||||
json_error(0, "Authentication timeout")
|
||||
return
|
||||
|
||||
if user_line != TCPSERVER_USER or password_line != TCPSERVER_PWD:
|
||||
log_write("socket_server", "warning", f"socket connection from: {client_ip} - Bad socket server credentials - user:{user_line}")
|
||||
self.request.sendall(f'{{"status": "error", "code": 0, "message": "Bad socket server credentials"}}\n'.encode('utf-8'))
|
||||
return
|
||||
else:
|
||||
log_write("socket_server", "info", f"socket connection from: {client_ip} - Authentication successful")
|
||||
|
||||
# Leggi il comando
|
||||
command_line = self.request.recv(4096).decode('utf-8').strip()
|
||||
|
||||
# Parsing del comando
|
||||
args = command_line.split(' ')
|
||||
arg1 = args[0] if len(args) > 0 else ""
|
||||
arg2 = args[1] if len(args) > 1 else ""
|
||||
arg3 = args[2] if len(args) > 2 else ""
|
||||
arg4 = args[3] if len(args) > 3 else ""
|
||||
arg5 = args[4] if len(args) > 4 else ""
|
||||
arg6 = args[5] if len(args) > 5 else ""
|
||||
arg7 = args[6] if len(args) > 6 else ""
|
||||
arg8 = args[7] if len(args) > 7 else ""
|
||||
|
||||
log_write("socket_server", "info", f"socket connection from: {client_ip} - command: {command_line}")
|
||||
|
||||
# Esegui il comando
|
||||
response = self.execute_command(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8)
|
||||
|
||||
# Invia la risposta al client (assumendo che le funzioni json_error/json_status stampino su stdout)
|
||||
# Potresti voler catturare l'output di quelle funzioni e inviarlo qui.
|
||||
# Per semplicità, qui si presuppone che inviino direttamente al client,
|
||||
# ma in un sistema reale dovresti costruire la risposta JSON e inviarla.
|
||||
self.request.sendall(response.encode('utf-8') + b'\n')
|
||||
|
||||
except socket.timeout:
|
||||
log_write("socket_server", "warning", f"socket connection from: {client_ip} - Timeout waiting for command.")
|
||||
self.request.sendall(f'{{"status": "error", "code": 0, "message": "Timeout waiting for command"}}\n'.encode('utf-8'))
|
||||
except Exception as e:
|
||||
log_write("socket_server", "error", f"Errore durante la gestione della connessione da {client_ip}: {e}")
|
||||
self.request.sendall(f'{{"status": "error", "code": -1, "message": "Internal server error"}}\n'.encode('utf-8'))
|
||||
finally:
|
||||
self.request.close() # Chiudi la connessione
|
||||
RUN_FROM_TCPSERVER = False # Reset per la prossima connessione o per operazioni interne
|
||||
|
||||
def execute_command(self, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8):
|
||||
"""Esegue il comando ricevuto dal socket server."""
|
||||
# Questo metodo replicherà la logica della case statement Bash
|
||||
vret = "" # Per catturare i risultati delle funzioni
|
||||
|
||||
if arg1 == "status":
|
||||
json_status(arg2, arg3, arg4, arg5, arg6, arg7)
|
||||
elif arg1 == "open":
|
||||
if not arg2: # "empty$arg2" == "empty"
|
||||
json_error(0, "Alias solenoid not specified")
|
||||
else:
|
||||
ev_open(arg2, arg3) # &> /dev/null è gestito non catturando l'output
|
||||
json_status(f"get_cron_open_in:{arg2}")
|
||||
elif arg1 == "open_in":
|
||||
ev_open_in(arg2, arg3, arg4, arg5)
|
||||
json_status(f"get_cron_open_in:{arg4}")
|
||||
elif arg1 == "close":
|
||||
if not arg2:
|
||||
json_error(0, "Alias solenoid not specified")
|
||||
else:
|
||||
ev_close(arg2)
|
||||
json_status(f"get_cron_open_in:{arg2}")
|
||||
elif arg1 == "close_all":
|
||||
if arg2 == "disable_scheduling":
|
||||
cron_disable_all_open_close()
|
||||
close_all()
|
||||
message_write("success", "All solenoid closed")
|
||||
json_status()
|
||||
elif arg1 == "cron_enable_all_open_close":
|
||||
cron_enable_all_open_close()
|
||||
message_write("success", "All solenoid enabled")
|
||||
json_status()
|
||||
elif arg1 == "set_general_cron":
|
||||
# Per i comandi di set_general_cron, chiami le funzioni e concateni i risultati
|
||||
# Ho ipotizzato che i risultati vuoti indichino successo e stringhe non vuote errori
|
||||
funcs_to_call = {
|
||||
"set_cron_init": set_cron_init,
|
||||
"set_cron_start_socket_server": set_cron_start_socket_server,
|
||||
"set_cron_check_rain_sensor": set_cron_check_rain_sensor,
|
||||
"set_cron_check_rain_online": set_cron_check_rain_online,
|
||||
"set_cron_close_all_for_rain": set_cron_close_all_for_rain,
|
||||
}
|
||||
|
||||
# Qui si itera sugli argomenti come nello script Bash
|
||||
for i_arg in [arg for arg in [arg2, arg3, arg4, arg5, arg6, arg7] if arg]:
|
||||
if i_arg in funcs_to_call:
|
||||
ret_val = funcs_to_call[i_arg]()
|
||||
if ret_val: # Se la funzione restituisce qualcosa (un errore in Bash)
|
||||
vret += ret_val
|
||||
|
||||
if vret: # Se c'è stato qualche errore in una delle chiamate
|
||||
json_error(0, "Cron set failed")
|
||||
log_write("socket_server", "error", f"Cron set failed: {vret}")
|
||||
else:
|
||||
message_write("success", "Cron set successful")
|
||||
json_status()
|
||||
|
||||
elif arg1 == "del_cron_open":
|
||||
vret = del_cron_open(arg2)
|
||||
if vret:
|
||||
json_error(0, "Cron set failed")
|
||||
log_write("socket_server", "error", f"Cron del failed: {vret}")
|
||||
else:
|
||||
message_write("success", "Cron set successful")
|
||||
json_status()
|
||||
|
||||
elif arg1 == "del_cron_open_in":
|
||||
vret = del_cron_open_in(arg2)
|
||||
if vret:
|
||||
json_error(0, "Cron del failed")
|
||||
log_write("socket_server", "error", f"Cron del failed: {vret}")
|
||||
else:
|
||||
message_write("success", "Scheduled start successfully deleted")
|
||||
json_status(f"get_cron_open_in:{arg2}")
|
||||
|
||||
elif arg1 == "del_cron_close":
|
||||
vret = del_cron_close(arg2)
|
||||
if vret:
|
||||
json_error(0, "Cron set failed")
|
||||
log_write("socket_server", "error", f"Cron set failed: {vret}")
|
||||
else:
|
||||
message_write("success", "Cron set successful")
|
||||
json_status()
|
||||
|
||||
elif arg1 == "add_cron_open":
|
||||
vret = add_cron_open(arg2, arg3, arg4, arg5, arg6, arg7, arg8)
|
||||
if vret:
|
||||
json_error(0, "Cron set failed")
|
||||
log_write("socket_server", "error", f"Cron set failed: {vret}")
|
||||
else:
|
||||
message_write("success", "Cron set successful")
|
||||
json_status()
|
||||
|
||||
elif arg1 == "add_cron_close":
|
||||
vret = add_cron_close(arg2, arg3, arg4, arg5, arg6, arg7, arg8)
|
||||
if vret:
|
||||
json_error(0, "Cron set failed")
|
||||
log_write("socket_server", "error", f"Cron set failed: {vret}")
|
||||
else:
|
||||
message_write("success", "Cron set successful")
|
||||
json_status()
|
||||
|
||||
elif arg1 == "cmd_pigardensched":
|
||||
vret = cmd_pigardensched(arg2, arg3, arg4, arg5, arg6)
|
||||
if vret:
|
||||
json_error(0, "piGardenSched command failed")
|
||||
log_write("socket_server", "error", f"piGardenSched command failed: {vret}")
|
||||
else:
|
||||
message_write("success", "Schedule set successful")
|
||||
json_status()
|
||||
|
||||
elif arg1 == "reboot":
|
||||
message_write("warning", "System reboot is started")
|
||||
json_status()
|
||||
current_script_path = get_script_path()
|
||||
# Esegui il reboot in un sottoprocesso separato per non bloccare il server
|
||||
# e con nohup-like behavior.
|
||||
# Questo è un esempio, la gestione di reboot/poweroff in Python è delicata.
|
||||
# Potresti voler chiamare un comando di sistema come `sudo reboot`.
|
||||
subprocess.Popen([current_script_path, "reboot_system_internal_cmd"],
|
||||
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
|
||||
preexec_fn=os.setsid) # setsid per disassociare dal gruppo di processi
|
||||
|
||||
elif arg1 == "poweroff":
|
||||
message_write("warning", "System shutdown is started")
|
||||
json_status()
|
||||
current_script_path = get_script_path()
|
||||
subprocess.Popen([current_script_path, "poweroff_system_internal_cmd"],
|
||||
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
|
||||
preexec_fn=os.setsid) # setsid per disassociare dal gruppo di processi
|
||||
|
||||
elif arg1 == "reset_last_rain_sensor_timestamp":
|
||||
reset_last_rain_sensor_timestamp()
|
||||
message_write("success", "Timestamp of last sensor rain successful reset")
|
||||
json_status()
|
||||
|
||||
elif arg1 == "reset_last_rain_online_timestamp":
|
||||
reset_last_rain_online_timestamp()
|
||||
message_write("success", "Timestamp of last online rain successful reset")
|
||||
json_status()
|
||||
|
||||
elif arg1 == "sensor_status_set":
|
||||
if not arg2:
|
||||
json_error(0, "Alias sensor not specified")
|
||||
else:
|
||||
sensor_status_set(arg2, arg3, arg4)
|
||||
json_status()
|
||||
|
||||
else:
|
||||
json_error(0, "invalid command")
|
||||
|
||||
# Le funzioni json_error, json_status, message_write stampano direttamente su stdout
|
||||
# In un contesto di server reale, dovresti catturare il loro output e inviarlo al client.
|
||||
# Per questa conversione, sto assumendo che tu voglia replicare il comportamento Bash
|
||||
# di "stampa e poi la pipeline gestisce l'invio". Potrebbe richiedere un refactoring
|
||||
# delle funzioni json_error/status per restituire stringhe invece di stampare.
|
||||
# Per ora, restituisco una stringa vuota o un placeholder.
|
||||
return "" # Potresti voler restituire il JSON o il messaggio qui
|
||||
|
||||
# --- FUNZIONI PRINCIPALI DEL SERVER ---
|
||||
|
||||
def start_socket_server():
|
||||
"""Avvia il server socket."""
|
||||
# Rimuovi il file PID esistente
|
||||
if os.path.exists(TCPSERVER_PID_FILE):
|
||||
os.remove(TCPSERVER_PID_FILE)
|
||||
|
||||
# Scrivi il PID dello script principale nel file PID
|
||||
# In un sistema reale, dovresti scrivere il PID del processo del server,
|
||||
# che potrebbe essere diverso se usi un manager di processi come systemd.
|
||||
current_pid = os.getpid()
|
||||
with open(TCPSERVER_PID_FILE, "w") as f:
|
||||
f.write(str(current_pid))
|
||||
|
||||
log_write("socket_server", "info", f"Server PID {current_pid} scritto in {TCPSERVER_PID_FILE}")
|
||||
|
||||
try:
|
||||
# Crea il server TCP
|
||||
# ThreadingTCPServer gestisce ogni richiesta in un thread separato
|
||||
with socketserver.ThreadingTCPServer((TCPSERVER_IP, TCPSERVER_PORT), MyTCPHandler) as server:
|
||||
log_write("socket_server", "info", f"Server socket avviato su {TCPSERVER_IP}:{TCPSERVER_PORT}")
|
||||
# Avvia il server, rimarrà in ascolto indefinitamente
|
||||
server.serve_forever()
|
||||
except Exception as e:
|
||||
log_write("socket_server", "error", f"Errore all'avvio del server socket: {e}")
|
||||
if os.path.exists(TCPSERVER_PID_FILE):
|
||||
os.remove(TCPSERVER_PID_FILE) # Pulisci il PID file in caso di errore
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def stop_socket_server():
|
||||
"""Ferma il server socket."""
|
||||
if not os.path.exists(TCPSERVER_PID_FILE):
|
||||
print("Daemon is not running")
|
||||
sys.exit(1)
|
||||
|
||||
log_write("socket_server", "info", "stop socket server")
|
||||
|
||||
with open(TCPSERVER_PID_FILE, "r") as f:
|
||||
try:
|
||||
pid = int(f.read().strip())
|
||||
except ValueError:
|
||||
print(f"Errore: Il file PID '{TCPSERVER_PID_FILE}' contiene un PID non valido.")
|
||||
sys.exit(1)
|
||||
|
||||
# Tentativo di killare i processi discendenti (se list_descendants è implementato)
|
||||
descendants = list_descendants(pid)
|
||||
for d_pid in descendants:
|
||||
try:
|
||||
os.kill(d_pid, 9) # SIGKILL
|
||||
log_write("socket_server", "info", f"Terminato processo discendente {d_pid}")
|
||||
except ProcessLookupError:
|
||||
pass # Il processo non esiste
|
||||
|
||||
# Tenta di killare il processo principale del server
|
||||
try:
|
||||
os.kill(pid, 9) # SIGKILL
|
||||
log_write("socket_server", "info", f"Terminato processo server {pid}")
|
||||
except ProcessLookupError:
|
||||
print(f"Processo con PID {pid} non trovato.")
|
||||
except Exception as e:
|
||||
log_write("socket_server", "error", f"Errore durante l'uccisione del processo server {pid}: {e}")
|
||||
|
||||
# Rimuovi il file PID
|
||||
if os.path.exists(TCPSERVER_PID_FILE):
|
||||
os.remove(TCPSERVER_PID_FILE)
|
||||
log_write("socket_server", "info", "File PID rimosso.")
|
||||
|
||||
|
||||
# --- FUNZIONE PRINCIPALE PER LA GESTIONE DEGLI ARGOMENTI DELLA CLI ---
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) > 1:
|
||||
command = sys.argv[1]
|
||||
|
||||
if command == "start_socket_server":
|
||||
# Per avviare in background come un vero daemon, dovresti implementare
|
||||
# la demonizzazione (forking) qui o usare una libreria come `python-daemon`.
|
||||
# Per ora, questo lo avvia nel terminale corrente.
|
||||
log_write("main", "info", "Richiesta di avvio del socket server.")
|
||||
start_socket_server()
|
||||
elif command == "stop_socket_server":
|
||||
log_write("main", "info", "Richiesta di stop del socket server.")
|
||||
stop_socket_server()
|
||||
elif command == "socket_server_command":
|
||||
# Questa parte dovrebbe essere gestita dal server TCP stesso.
|
||||
# Non viene chiamata direttamente dalla riga di comando in Python in questo modo.
|
||||
# Se hai bisogno di testare la logica 'socket_server_command' isolatamente,
|
||||
# dovresti chiamare MyTCPHandler.execute_command() con argomenti di test.
|
||||
print("Errore: 'socket_server_command' non può essere chiamato direttamente come script.")
|
||||
print("Questa logica è gestita internamente dal server TCP.")
|
||||
sys.exit(1)
|
||||
elif command == "reboot_system_internal_cmd":
|
||||
# Comando interno per il riavvio effettivo
|
||||
log_write("main", "warning", "Eseguo il riavvio del sistema...")
|
||||
# In un sistema Linux, questo è il modo per riavviare
|
||||
subprocess.run(["sudo", "reboot"])
|
||||
# Assicurati che l'utente che esegue lo script abbia i permessi sudo senza password per 'reboot'
|
||||
elif command == "poweroff_system_internal_cmd":
|
||||
# Comando interno per lo spegnimento effettivo
|
||||
log_write("main", "warning", "Eseguo lo spegnimento del sistema...")
|
||||
# In un sistema Linux, questo è il modo per spegnere
|
||||
subprocess.run(["sudo", "poweroff"])
|
||||
# Assicurati che l'utente che esegue lo script abbia i permessi sudo senza password per 'poweroff'
|
||||
else:
|
||||
print(f"Comando non riconosciuto: {command}")
|
||||
print("Utilizzo: python your_script_name.py [start_socket_server|stop_socket_server]")
|
||||
sys.exit(1)
|
||||
else:
|
||||
print("Nessun comando specificato.")
|
||||
print("Utilizzo: python your_script_name.py [start_socket_server|stop_socket_server]")
|
||||
sys.exit(1)
|
||||
Reference in New Issue
Block a user