roberto-patch-1 #2

Merged
roberto merged 17 commits from roberto-patch-1 into master 2025-07-09 19:01:17 +02:00
Showing only changes of commit a8800280f3 - Show all commits

432
Python/sensor_include.py Normal file
View File

@@ -0,0 +1,432 @@
import os
import time
import json
import logging
# --- Mock/Placeholder per le dipendenze esterne e configurazione ---
# In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale.
# Configura un logger di base per le funzioni di log
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def log_write(log_type, level, message):
"""Simula la funzione log_write dal tuo script Bash."""
if level == "info":
logging.info(f"[{log_type}] {message}")
elif level == "warning":
logging.warning(f"[{log_type}] {message}")
elif level == "error":
logging.error(f"[{log_type}] {message}")
else:
logging.debug(f"[{log_type}] {message}")
# Mock per la classe EventManager (dal file event_manager_py)
class MockEventManager:
def __init__(self):
self.CURRENT_EVENT = ""
self.CURRENT_EVENT_ALIAS = ""
def trigger_event(self, event, *args):
log_write("event", "info", f"Mock EventManager: Triggered event: {event} with args: {args}")
self.CURRENT_EVENT = event
# Simula un codice di ritorno di successo
return 0
# Mock per la classe DriverManager (dal file driver_manager_py)
class MockDriverManager:
def __init__(self, config):
self.config = config
def drv_rain_online_get(self, service_id):
"""Simula il recupero dello stato pioggia online."""
# Restituisce un timestamp se piove, un valore negativo se non piove, o 0 in caso di errore
# Per i test, simuliamo che non piova (-1) o che piova (timestamp attuale)
if "openweathermap" in service_id:
# Simula pioggia 50% delle volte
if time.time() % 2 == 0:
# Scrivi un mock di dati meteo online per il test
mock_weather_data = {
"weather": [{"description": "light rain"}],
"main": {"temp": 280, "humidity": 90}
}
with open(os.path.join(self.config["STATUS_DIR"], "last_weather_online"), "w") as f:
json.dump(mock_weather_data, f)
return str(int(time.time())) # Simula pioggia (timestamp)
else:
return "-1" # Simula non pioggia
return "0" # Errore o servizio non riconosciuto
def drv_rain_sensor_get(self, gpio_id):
"""Simula il recupero dello stato dal sensore pioggia."""
# Restituisce lo stato del GPIO (0 o 1)
# Per i test, simuliamo lo stato del sensore
if time.time() % 3 == 0:
return str(self.config.get("RAIN_GPIO_STATE", 0)) # Simula pioggia
return str(1 - self.config.get("RAIN_GPIO_STATE", 0)) # Simula non pioggia
# Mock per la classe PiGarden (per le dipendenze ev_*)
class MockPiGarden:
def __init__(self, config):
self.config = config
self.solenoid_states = {} # Mappa alias a stato (0=chiuso, 1=aperto, 2=forzato)
# Inizializza stati fittizi
for i in range(1, self.config.get("EV_TOTAL", 0) + 1):
self.solenoid_states[str(i)] = 0 # Tutti chiusi di default
def ev_status(self, alias):
"""Simula ev_status, restituisce lo stato dell'elettrovalvola."""
return self.solenoid_states.get(alias, 0) # 0 se non trovato o chiuso
def ev_close(self, alias):
"""Simula ev_close, imposta lo stato dell'elettrovalvola a chiuso."""
self.solenoid_states[alias] = 0
log_write("irrigate", "info", f"MockPiGarden: Elettrovalvola '{alias}' chiusa.")
def ev_check_moisture(self, ev_num):
"""
Simula ev_check_moisture.
Ritorna 0 se l'umidità non è stata raggiunta (bisogno d'acqua),
>0 se l'umidità massima è stata raggiunta.
"""
# Per i test, simuliamo che l'umidità sia raggiunta per EV1, altrimenti no
if ev_num == 1:
return 1 # Umidità raggiunta
return 0 # Umidità non raggiunta
def ev_check_moisture_autoclose(self, ev_num):
"""
Simula ev_check_moisture_autoclose.
Ritorna 0 se non deve chiudere automaticamente, >0 se sì.
"""
# Per i test, simuliamo che l'autochiusura sia attiva per EV1 e l'umidità sia raggiunta
if self.config.get(f"EV{ev_num}_SENSOR_MOISTURE_AUTOCLOSE", "0") == "1":
return self.ev_check_moisture(ev_num) # Usa la logica di check_moisture
return 0
def ev_number2norain(self, ev_num):
"""Simula ev_number2norain."""
return self.config.get(f"EV{ev_num}_NORAIN", "0") == "1"
# Variabili di configurazione simulate (dal piGarden.conf)
mock_config_rain = {
"WEATHER_SERVICE": "drv:openweathermap", # Esempio di driver di servizio meteo
"RAIN_GPIO": 25,
"RAIN_GPIO_STATE": 0, # Stato del GPIO che indica pioggia
"NOT_IRRIGATE_IF_RAIN_ONLINE": 86400, # 24 ore in secondi
"NOT_IRRIGATE_IF_RAIN_SENSOR": 86400, # 24 ore in secondi
"EV_TOTAL": 6,
"STATUS_DIR": "/tmp/piGarden_status", # Directory per i file di stato
# Elettrovalvole di esempio per close_all_for_rain
"EV1_ALIAS": "Zona_1", "EV1_NORAIN": "0", "EV1_SENSOR_MOISTURE_AUTOCLOSE": "1",
"EV2_ALIAS": "Zona_2", "EV2_NORAIN": "1", # Questa zona non si chiude per pioggia
"EV3_ALIAS": "Zona_3", "EV3_NORAIN": "0",
"EV4_ALIAS": "Zona_4", "EV4_NORAIN": "0",
"EV5_ALIAS": "Zona_5", "EV5_NORAIN": "0",
"EV6_ALIAS": "Zona_6", "EV6_NORAIN": "0",
}
# Assicurati che la directory di stato esista per i test
os.makedirs(mock_config_rain["STATUS_DIR"], exist_ok=True)
# --- Classe RainManager ---
class RainManager:
def __init__(self, config, log_writer, event_manager, driver_manager, pigarden_core):
self.config = config
self.log_write = log_writer
self.event_manager = event_manager
self.driver_manager = driver_manager
self.pigarden_core = pigarden_core # Istanza della classe PiGarden principale
self.status_dir = self.config.get("STATUS_DIR")
self.weather_service = self.config.get("WEATHER_SERVICE")
self.rain_gpio = self.config.get("RAIN_GPIO")
self.rain_gpio_state = self.config.get("RAIN_GPIO_STATE")
self.not_irrigate_if_rain_online = self.config.get("NOT_IRRIGATE_IF_RAIN_ONLINE")
self.not_irrigate_if_rain_sensor = self.config.get("NOT_IRRIGATE_IF_RAIN_SENSOR")
self.ev_total = self.config.get("EV_TOTAL")
# Inizializza i file di stato se non esistono
self._init_status_files()
def _init_status_files(self):
"""Assicura che i file di stato esistano per evitare errori FileNotFoundError."""
for filename in ["last_state_rain_online", "last_rain_online",
"last_state_rain_sensor", "last_rain_sensor",
"last_weather_online"]:
file_path = os.path.join(self.status_dir, filename)
if not os.path.exists(file_path):
# Crea il file vuoto o con un valore di default appropriato
with open(file_path, 'w') as f:
if filename.startswith("last_rain_"):
f.write("0") # Timestamp 0
elif filename.startswith("last_state_rain_"):
f.write("unknown")
elif filename == "last_weather_online":
f.write("{}") # JSON vuoto
def _read_status_file(self, filename, default=""):
"""Legge il contenuto di un file di stato."""
file_path = os.path.join(self.status_dir, filename)
try:
with open(file_path, 'r') as f:
content = f.read().strip()
return content if content else default
except FileNotFoundError:
return default
except Exception as e:
self.log_write("rain", "error", f"Errore lettura {filename}: {e}")
return default
def _write_status_file(self, filename, content):
"""Scrive il contenuto in un file di stato."""
file_path = os.path.join(self.status_dir, filename)
try:
with open(file_path, 'w') as f:
f.write(str(content))
except Exception as e:
self.log_write("rain", "error", f"Errore scrittura {filename}: {e}")
def _delete_status_file(self, filename):
"""Elimina un file di stato."""
file_path = os.path.join(self.status_dir, filename)
try:
if os.path.exists(file_path):
os.remove(file_path)
except Exception as e:
self.log_write("rain", "error", f"Errore eliminazione {filename}: {e}")
def check_rain_online(self):
"""
Esegue il controllo meteo tramite il servizio online configurato.
"""
if self.weather_service == "none":
self.log_write("rain", "warning", "check_rain_online - servizio online disabilitato")
return
self.event_manager.trigger_event("check_rain_online_before", "")
local_epoch_str = self.driver_manager.drv_rain_online_get(self.weather_service)
current_state_rain_online = ""
last_state_rain_online = self._read_status_file("last_state_rain_online", default="norain")
weather_json = "{}" # Default a JSON vuoto
if local_epoch_str and local_epoch_str.lstrip('-').isdigit(): # Controlla se è un numero (anche negativo)
local_epoch = int(local_epoch_str)
if local_epoch == 0:
self.log_write("rain", "error", "check_rain_online - fallita lettura dati online (valore 0)")
else:
if local_epoch > 0:
current_state_rain_online = 'rain'
self._write_status_file("last_rain_online", local_epoch)
else:
current_state_rain_online = 'norain'
# Leggi il JSON meteo, se esiste e valido
weather_data_str = self._read_status_file("last_weather_online", default="{}")
try:
weather_json = json.loads(weather_data_str)
# Estrai solo la parte "weather" se presente, altrimenti l'intero JSON
weather_display = json.dumps(weather_json.get("weather", weather_json))
except json.JSONDecodeError:
weather_display = "null" # Se il file non è un JSON valido
self.log_write("rain", "info", f"check_rain_online - weather={weather_display}, local_epoch={local_epoch}")
if current_state_rain_online != last_state_rain_online:
self._write_status_file("last_state_rain_online", current_state_rain_online)
self.event_manager.trigger_event("check_rain_online_change", current_state_rain_online, weather_display)
else:
self.log_write("rain", "error", "check_rain_online - fallita lettura dati online (non un numero)")
self.event_manager.trigger_event("check_rain_online_after", current_state_rain_online, weather_json)
def check_rain_sensor(self):
"""
Controlla se piove tramite sensore hardware.
"""
if not self.rain_gpio:
self.log_write("rain", "warning", "Sensore pioggia non presente")
return
self.event_manager.trigger_event("check_rain_sensor_before", "")
current_state_rain_sensor = ""
last_state_rain_sensor = self._read_status_file("last_state_rain_sensor", default="norain")
s_str = self.driver_manager.drv_rain_sensor_get(self.rain_gpio)
s = int(s_str) if s_str and s_str.isdigit() else -1 # Converte in int, default a -1 se non valido
if s == self.rain_gpio_state: # Confronta con lo stato configurato per la pioggia
current_state_rain_sensor = 'rain'
local_epoch = int(time.time())
self._write_status_file("last_rain_sensor", local_epoch)
self.log_write("rain", "info", f"check_rain_sensor - ora sta piovendo ({local_epoch})")
else:
current_state_rain_sensor = 'norain'
self.log_write("rain", "info", "check_rain_sensor - ora non sta piovendo")
if current_state_rain_sensor != last_state_rain_sensor:
self._write_status_file("last_state_rain_sensor", current_state_rain_sensor)
self.event_manager.trigger_event("check_rain_sensor_change", current_state_rain_sensor)
self.event_manager.trigger_event("check_rain_sensor_after", current_state_rain_sensor)
def close_all_for_rain(self):
"""
Chiude tutte le elettrovalvole se sta piovendo o se hanno raggiunto l'umidità massima.
"""
# Chiude le elettrovalvole che hanno raggiunto l'umidità del terreno impostata
for i in range(1, self.ev_total + 1):
alias = self.config.get(f"EV{i}_ALIAS")
if not alias: continue # Salta se l'alias non è definito
state = self.pigarden_core.ev_status(alias)
moisture = self.pigarden_core.ev_check_moisture_autoclose(i)
# Se l'elettrovalvola è aperta (stato 1) e l'umidità massima è stata raggiunta (moisture > 0)
if state == 1 and moisture > 0:
self.pigarden_core.ev_close(alias)
self.log_write("irrigate", "warning", f"close_all_for_rain - Chiusa elettrovalvola '{alias}' perché l'umidità massima del terreno è stata raggiunta")
# Chiude le elettrovalvole in caso di pioggia (online o sensore)
close_all_flag = False
now = int(time.time())
# Controllo pioggia online
if self.not_irrigate_if_rain_online > 0:
last_rain_online_str = self._read_status_file("last_rain_online", default="0")
try:
last_rain_online = int(last_rain_online_str)
if now - last_rain_online < self.not_irrigate_if_rain_online:
close_all_flag = True
except ValueError:
pass # Ignora se il timestamp non è un numero
# Controllo pioggia sensore
if self.not_irrigate_if_rain_sensor > 0:
last_rain_sensor_str = self._read_status_file("last_rain_sensor", default="0")
try:
last_rain_sensor = int(last_rain_sensor_str)
if now - last_rain_sensor < self.not_irrigate_if_rain_sensor:
close_all_flag = True
except ValueError:
pass # Ignora se il timestamp non è un numero
if close_all_flag:
# Piove: valuta se chiudere le elettrovalvole
for i in range(1, self.ev_total + 1):
alias = self.config.get(f"EV{i}_ALIAS")
if not alias: continue
state = self.pigarden_core.ev_status(alias)
ev_norain = self.pigarden_core.ev_number2norain(i) # True se non deve chiudere per pioggia
moisture = self.pigarden_core.ev_check_moisture(i) # 0 se non ha raggiunto l'umidità ottimale
# Se l'elettrovalvola è aperta (stato 1), NON è impostata per ignorare la pioggia (ev_norain è False),
# E l'umidità non è ancora ottimale (moisture è 0 o non ha raggiunto il max)
# La logica Bash `[ "$moisture" -ne 0 ]` significa "se l'umidità NON è zero",
# che nel contesto di `ev_check_moisture` (che ritorna 0 per "non raggiunta", >0 per "raggiunta")
# significherebbe "se l'umidità è stata raggiunta".
# Tuttavia, il commento nello script Bash `if [ $moisture -gt 0 ]; then message_write "warning" "solenoid not open because maximum soil moisture has been reached"`
# suggerisce che >0 significa "umidità massima raggiunta".
# Quindi, per chiudere per pioggia, l'umidità NON deve essere già al massimo.
# Se `ev_check_moisture` ritorna 0 per "non raggiunto" e >0 per "raggiunto",
# allora `moisture == 0` significa "ha ancora bisogno d'acqua".
# Quindi, la condizione per chiudere per pioggia dovrebbe essere:
# `state == 1` (aperta) AND `not ev_norain` (non ignora pioggia) AND `moisture == 0` (ha ancora bisogno d'acqua)
# Il Bash `[ "$moisture" -ne 0 ]` nella seconda loop di close_all_for_rain è contro-intuitivo
# se `ev_check_moisture` ritorna >0 per "raggiunto".
# Assumo che `moisture -ne 0` in quel contesto significhi "se l'umidità non è perfetta, chiudi".
# Se `ev_check_moisture` ritorna 0 per "umidità OK/raggiunta" e 1 per "non OK", allora -ne 0 ha senso.
# Basandomi sulla funzione `ev_open` che usa `moisture -gt 0` per bloccare l'apertura (umidità già alta),
# `moisture -ne 0` qui dovrebbe significare "se l'umidità non è ancora ottimale/non è zero".
# Se 0 significa "umidità OK", allora -ne 0 significa "umidità NON OK".
# Adotterò la traduzione letterale di `moisture != 0` e lascerò al mock di `ev_check_moisture` di definire il comportamento.
if state == 1 and not ev_norain and moisture != 0:
self.pigarden_core.ev_close(alias)
self.log_write("irrigate", "warning", f"close_all_for_rain - Chiusa elettrovalvola '{alias}' per pioggia")
def last_rain_sensor_timestamp(self):
"""Mostra il timestamp dell'ultima pioggia rilevato dal sensore."""
return self._read_status_file("last_rain_sensor", default="0")
def last_rain_online_timestamp(self):
"""Mostra il timestamp dell'ultima pioggia rilevato dal servizio online."""
return self._read_status_file("last_rain_online", default="0")
def reset_last_rain_sensor_timestamp(self):
"""Resetta il timestamp dell'ultima pioggia rilevato dal sensore."""
self.event_manager.trigger_event("reset_last_rain_sensor_timestamp_before", "")
self._delete_status_file("last_rain_sensor")
self.event_manager.trigger_event("reset_last_rain_sensor_timestamp_after", "")
self.log_write("rain", "info", "reset_last_rain_sensor_timestamp")
def reset_last_rain_online_timestamp(self):
"""Resetta il timestamp dell'ultima pioggia rilevato dal servizio online."""
self.event_manager.trigger_event("reset_last_rain_online_timestamp_before", "")
self._delete_status_file("last_rain_online")
# Il Bash script chiama trigger_event due volte con _before, correggo a _after
self.event_manager.trigger_event("reset_last_rain_online_timestamp_after", "")
self.log_write("rain", "info", "reset_last_rain_online_timestamp")
# --- Esempio di utilizzo ---
if __name__ == "__main__":
print("--- Test RainManager ---")
# Inizializza le dipendenze mock
mock_event_manager = MockEventManager()
mock_driver_manager = MockDriverManager(mock_config_rain)
mock_pigarden_core = MockPiGarden(mock_config_rain)
# Inizializza RainManager
rain_manager = RainManager(
config=mock_config_rain,
log_writer=log_write,
event_manager=mock_event_manager,
driver_manager=mock_driver_manager,
pigarden_core=mock_pigarden_core
)
# --- Test check_rain_online ---
print("\n--- Test: check_rain_online (potrebbe simulare pioggia o no) ---")
rain_manager.check_rain_online()
print(f"Stato pioggia online (file): {rain_manager.last_rain_online_timestamp()}")
print(f"Stato ultimo rilevamento online (file): {rain_manager._read_status_file('last_state_rain_online')}")
# --- Test check_rain_sensor ---
print("\n--- Test: check_rain_sensor (potrebbe simulare pioggia o no) ---")
rain_manager.check_rain_sensor()
print(f"Stato pioggia sensore (file): {rain_manager.last_rain_sensor_timestamp()}")
print(f"Stato ultimo rilevamento sensore (file): {rain_manager._read_status_file('last_state_rain_sensor')}")
# --- Test close_all_for_rain ---
print("\n--- Test: close_all_for_rain ---")
# Imposta un'elettrovalvola aperta per il test
mock_pigarden_core.solenoid_states["Zona_1"] = 1
mock_pigarden_core.solenoid_states["Zona_3"] = 1
print(f"Stato iniziale Zona_1: {mock_pigarden_core.ev_status('Zona_1')}")
print(f"Stato iniziale Zona_3: {mock_pigarden_core.ev_status('Zona_3')}")
# Simula una pioggia recente per attivare la chiusura
rain_manager._write_status_file("last_rain_online", int(time.time()) - 100) # 100 secondi fa
rain_manager._write_status_file("last_rain_sensor", int(time.time()) - 50) # 50 secondi fa
rain_manager.close_all_for_rain()
print(f"Stato finale Zona_1: {mock_pigarden_core.ev_status('Zona_1')} (dovrebbe essere 0 se autoclose è attivo o piove)")
print(f"Stato finale Zona_3: {mock_pigarden_core.ev_status('Zona_3')} (dovrebbe essere 0 se piove)")
# --- Test reset timestamp ---
print("\n--- Test: reset_last_rain_sensor_timestamp ---")
rain_manager.reset_last_rain_sensor_timestamp()
print(f"Timestamp sensore dopo reset: {rain_manager.last_rain_sensor_timestamp()}")
print("\n--- Test: reset_last_rain_online_timestamp ---")
rain_manager.reset_last_rain_online_timestamp()
print(f"Timestamp online dopo reset: {rain_manager.last_rain_online_timestamp()}")
print("\n--- Test completato ---")
# Pulisci le directory di test (opzionale)
# import shutil
# shutil.rmtree(mock_config_rain["STATUS_DIR"])