433 lines
21 KiB
Python
433 lines
21 KiB
Python
import os
|
|
import time
|
|
import json
|
|
import logging
|
|
|
|
# --- Mock/Placeholder per le dipendenze esterne e configurazione ---
|
|
# In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale.
|
|
|
|
# Configura un logger di base per le funzioni di log
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
def log_write(log_type, level, message):
|
|
"""Simula la funzione log_write dal tuo script Bash."""
|
|
if level == "info":
|
|
logging.info(f"[{log_type}] {message}")
|
|
elif level == "warning":
|
|
logging.warning(f"[{log_type}] {message}")
|
|
elif level == "error":
|
|
logging.error(f"[{log_type}] {message}")
|
|
else:
|
|
logging.debug(f"[{log_type}] {message}")
|
|
|
|
# Mock per la classe EventManager (dal file event_manager_py)
|
|
class MockEventManager:
|
|
def __init__(self):
|
|
self.CURRENT_EVENT = ""
|
|
self.CURRENT_EVENT_ALIAS = ""
|
|
|
|
def trigger_event(self, event, *args):
|
|
log_write("event", "info", f"Mock EventManager: Triggered event: {event} with args: {args}")
|
|
self.CURRENT_EVENT = event
|
|
# Simula un codice di ritorno di successo
|
|
return 0
|
|
|
|
# Mock per la classe DriverManager (dal file driver_manager_py)
|
|
class MockDriverManager:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
|
|
def drv_rain_online_get(self, service_id):
|
|
"""Simula il recupero dello stato pioggia online."""
|
|
# Restituisce un timestamp se piove, un valore negativo se non piove, o 0 in caso di errore
|
|
# Per i test, simuliamo che non piova (-1) o che piova (timestamp attuale)
|
|
if "openweathermap" in service_id:
|
|
# Simula pioggia 50% delle volte
|
|
if time.time() % 2 == 0:
|
|
# Scrivi un mock di dati meteo online per il test
|
|
mock_weather_data = {
|
|
"weather": [{"description": "light rain"}],
|
|
"main": {"temp": 280, "humidity": 90}
|
|
}
|
|
with open(os.path.join(self.config["STATUS_DIR"], "last_weather_online"), "w") as f:
|
|
json.dump(mock_weather_data, f)
|
|
return str(int(time.time())) # Simula pioggia (timestamp)
|
|
else:
|
|
return "-1" # Simula non pioggia
|
|
return "0" # Errore o servizio non riconosciuto
|
|
|
|
def drv_rain_sensor_get(self, gpio_id):
|
|
"""Simula il recupero dello stato dal sensore pioggia."""
|
|
# Restituisce lo stato del GPIO (0 o 1)
|
|
# Per i test, simuliamo lo stato del sensore
|
|
if time.time() % 3 == 0:
|
|
return str(self.config.get("RAIN_GPIO_STATE", 0)) # Simula pioggia
|
|
return str(1 - self.config.get("RAIN_GPIO_STATE", 0)) # Simula non pioggia
|
|
|
|
# Mock per la classe PiGarden (per le dipendenze ev_*)
|
|
class MockPiGarden:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.solenoid_states = {} # Mappa alias a stato (0=chiuso, 1=aperto, 2=forzato)
|
|
# Inizializza stati fittizi
|
|
for i in range(1, self.config.get("EV_TOTAL", 0) + 1):
|
|
self.solenoid_states[str(i)] = 0 # Tutti chiusi di default
|
|
|
|
def ev_status(self, alias):
|
|
"""Simula ev_status, restituisce lo stato dell'elettrovalvola."""
|
|
return self.solenoid_states.get(alias, 0) # 0 se non trovato o chiuso
|
|
|
|
def ev_close(self, alias):
|
|
"""Simula ev_close, imposta lo stato dell'elettrovalvola a chiuso."""
|
|
self.solenoid_states[alias] = 0
|
|
log_write("irrigate", "info", f"MockPiGarden: Elettrovalvola '{alias}' chiusa.")
|
|
|
|
def ev_check_moisture(self, ev_num):
|
|
"""
|
|
Simula ev_check_moisture.
|
|
Ritorna 0 se l'umidità non è stata raggiunta (bisogno d'acqua),
|
|
>0 se l'umidità massima è stata raggiunta.
|
|
"""
|
|
# Per i test, simuliamo che l'umidità sia raggiunta per EV1, altrimenti no
|
|
if ev_num == 1:
|
|
return 1 # Umidità raggiunta
|
|
return 0 # Umidità non raggiunta
|
|
|
|
def ev_check_moisture_autoclose(self, ev_num):
|
|
"""
|
|
Simula ev_check_moisture_autoclose.
|
|
Ritorna 0 se non deve chiudere automaticamente, >0 se sì.
|
|
"""
|
|
# Per i test, simuliamo che l'autochiusura sia attiva per EV1 e l'umidità sia raggiunta
|
|
if self.config.get(f"EV{ev_num}_SENSOR_MOISTURE_AUTOCLOSE", "0") == "1":
|
|
return self.ev_check_moisture(ev_num) # Usa la logica di check_moisture
|
|
return 0
|
|
|
|
def ev_number2norain(self, ev_num):
|
|
"""Simula ev_number2norain."""
|
|
return self.config.get(f"EV{ev_num}_NORAIN", "0") == "1"
|
|
|
|
# Variabili di configurazione simulate (dal piGarden.conf)
|
|
mock_config_rain = {
|
|
"WEATHER_SERVICE": "drv:openweathermap", # Esempio di driver di servizio meteo
|
|
"RAIN_GPIO": 25,
|
|
"RAIN_GPIO_STATE": 0, # Stato del GPIO che indica pioggia
|
|
"NOT_IRRIGATE_IF_RAIN_ONLINE": 86400, # 24 ore in secondi
|
|
"NOT_IRRIGATE_IF_RAIN_SENSOR": 86400, # 24 ore in secondi
|
|
"EV_TOTAL": 6,
|
|
"STATUS_DIR": "/tmp/piGarden_status", # Directory per i file di stato
|
|
# Elettrovalvole di esempio per close_all_for_rain
|
|
"EV1_ALIAS": "Zona_1", "EV1_NORAIN": "0", "EV1_SENSOR_MOISTURE_AUTOCLOSE": "1",
|
|
"EV2_ALIAS": "Zona_2", "EV2_NORAIN": "1", # Questa zona non si chiude per pioggia
|
|
"EV3_ALIAS": "Zona_3", "EV3_NORAIN": "0",
|
|
"EV4_ALIAS": "Zona_4", "EV4_NORAIN": "0",
|
|
"EV5_ALIAS": "Zona_5", "EV5_NORAIN": "0",
|
|
"EV6_ALIAS": "Zona_6", "EV6_NORAIN": "0",
|
|
}
|
|
|
|
# Assicurati che la directory di stato esista per i test
|
|
os.makedirs(mock_config_rain["STATUS_DIR"], exist_ok=True)
|
|
|
|
# --- Classe RainManager ---
|
|
|
|
class RainManager:
|
|
def __init__(self, config, log_writer, event_manager, driver_manager, pigarden_core):
|
|
self.config = config
|
|
self.log_write = log_writer
|
|
self.event_manager = event_manager
|
|
self.driver_manager = driver_manager
|
|
self.pigarden_core = pigarden_core # Istanza della classe PiGarden principale
|
|
|
|
self.status_dir = self.config.get("STATUS_DIR")
|
|
self.weather_service = self.config.get("WEATHER_SERVICE")
|
|
self.rain_gpio = self.config.get("RAIN_GPIO")
|
|
self.rain_gpio_state = self.config.get("RAIN_GPIO_STATE")
|
|
self.not_irrigate_if_rain_online = self.config.get("NOT_IRRIGATE_IF_RAIN_ONLINE")
|
|
self.not_irrigate_if_rain_sensor = self.config.get("NOT_IRRIGATE_IF_RAIN_SENSOR")
|
|
self.ev_total = self.config.get("EV_TOTAL")
|
|
|
|
# Inizializza i file di stato se non esistono
|
|
self._init_status_files()
|
|
|
|
def _init_status_files(self):
|
|
"""Assicura che i file di stato esistano per evitare errori FileNotFoundError."""
|
|
for filename in ["last_state_rain_online", "last_rain_online",
|
|
"last_state_rain_sensor", "last_rain_sensor",
|
|
"last_weather_online"]:
|
|
file_path = os.path.join(self.status_dir, filename)
|
|
if not os.path.exists(file_path):
|
|
# Crea il file vuoto o con un valore di default appropriato
|
|
with open(file_path, 'w') as f:
|
|
if filename.startswith("last_rain_"):
|
|
f.write("0") # Timestamp 0
|
|
elif filename.startswith("last_state_rain_"):
|
|
f.write("unknown")
|
|
elif filename == "last_weather_online":
|
|
f.write("{}") # JSON vuoto
|
|
|
|
def _read_status_file(self, filename, default=""):
|
|
"""Legge il contenuto di un file di stato."""
|
|
file_path = os.path.join(self.status_dir, filename)
|
|
try:
|
|
with open(file_path, 'r') as f:
|
|
content = f.read().strip()
|
|
return content if content else default
|
|
except FileNotFoundError:
|
|
return default
|
|
except Exception as e:
|
|
self.log_write("rain", "error", f"Errore lettura {filename}: {e}")
|
|
return default
|
|
|
|
def _write_status_file(self, filename, content):
|
|
"""Scrive il contenuto in un file di stato."""
|
|
file_path = os.path.join(self.status_dir, filename)
|
|
try:
|
|
with open(file_path, 'w') as f:
|
|
f.write(str(content))
|
|
except Exception as e:
|
|
self.log_write("rain", "error", f"Errore scrittura {filename}: {e}")
|
|
|
|
def _delete_status_file(self, filename):
|
|
"""Elimina un file di stato."""
|
|
file_path = os.path.join(self.status_dir, filename)
|
|
try:
|
|
if os.path.exists(file_path):
|
|
os.remove(file_path)
|
|
except Exception as e:
|
|
self.log_write("rain", "error", f"Errore eliminazione {filename}: {e}")
|
|
|
|
def check_rain_online(self):
|
|
"""
|
|
Esegue il controllo meteo tramite il servizio online configurato.
|
|
"""
|
|
if self.weather_service == "none":
|
|
self.log_write("rain", "warning", "check_rain_online - servizio online disabilitato")
|
|
return
|
|
|
|
self.event_manager.trigger_event("check_rain_online_before", "")
|
|
|
|
local_epoch_str = self.driver_manager.drv_rain_online_get(self.weather_service)
|
|
current_state_rain_online = ""
|
|
last_state_rain_online = self._read_status_file("last_state_rain_online", default="norain")
|
|
weather_json = "{}" # Default a JSON vuoto
|
|
|
|
if local_epoch_str and local_epoch_str.lstrip('-').isdigit(): # Controlla se è un numero (anche negativo)
|
|
local_epoch = int(local_epoch_str)
|
|
if local_epoch == 0:
|
|
self.log_write("rain", "error", "check_rain_online - fallita lettura dati online (valore 0)")
|
|
else:
|
|
if local_epoch > 0:
|
|
current_state_rain_online = 'rain'
|
|
self._write_status_file("last_rain_online", local_epoch)
|
|
else:
|
|
current_state_rain_online = 'norain'
|
|
|
|
# Leggi il JSON meteo, se esiste e valido
|
|
weather_data_str = self._read_status_file("last_weather_online", default="{}")
|
|
try:
|
|
weather_json = json.loads(weather_data_str)
|
|
# Estrai solo la parte "weather" se presente, altrimenti l'intero JSON
|
|
weather_display = json.dumps(weather_json.get("weather", weather_json))
|
|
except json.JSONDecodeError:
|
|
weather_display = "null" # Se il file non è un JSON valido
|
|
|
|
self.log_write("rain", "info", f"check_rain_online - weather={weather_display}, local_epoch={local_epoch}")
|
|
|
|
if current_state_rain_online != last_state_rain_online:
|
|
self._write_status_file("last_state_rain_online", current_state_rain_online)
|
|
self.event_manager.trigger_event("check_rain_online_change", current_state_rain_online, weather_display)
|
|
else:
|
|
self.log_write("rain", "error", "check_rain_online - fallita lettura dati online (non un numero)")
|
|
|
|
self.event_manager.trigger_event("check_rain_online_after", current_state_rain_online, weather_json)
|
|
|
|
def check_rain_sensor(self):
|
|
"""
|
|
Controlla se piove tramite sensore hardware.
|
|
"""
|
|
if not self.rain_gpio:
|
|
self.log_write("rain", "warning", "Sensore pioggia non presente")
|
|
return
|
|
|
|
self.event_manager.trigger_event("check_rain_sensor_before", "")
|
|
|
|
current_state_rain_sensor = ""
|
|
last_state_rain_sensor = self._read_status_file("last_state_rain_sensor", default="norain")
|
|
|
|
s_str = self.driver_manager.drv_rain_sensor_get(self.rain_gpio)
|
|
s = int(s_str) if s_str and s_str.isdigit() else -1 # Converte in int, default a -1 se non valido
|
|
|
|
if s == self.rain_gpio_state: # Confronta con lo stato configurato per la pioggia
|
|
current_state_rain_sensor = 'rain'
|
|
local_epoch = int(time.time())
|
|
self._write_status_file("last_rain_sensor", local_epoch)
|
|
self.log_write("rain", "info", f"check_rain_sensor - ora sta piovendo ({local_epoch})")
|
|
else:
|
|
current_state_rain_sensor = 'norain'
|
|
self.log_write("rain", "info", "check_rain_sensor - ora non sta piovendo")
|
|
|
|
if current_state_rain_sensor != last_state_rain_sensor:
|
|
self._write_status_file("last_state_rain_sensor", current_state_rain_sensor)
|
|
self.event_manager.trigger_event("check_rain_sensor_change", current_state_rain_sensor)
|
|
|
|
self.event_manager.trigger_event("check_rain_sensor_after", current_state_rain_sensor)
|
|
|
|
def close_all_for_rain(self):
|
|
"""
|
|
Chiude tutte le elettrovalvole se sta piovendo o se hanno raggiunto l'umidità massima.
|
|
"""
|
|
# Chiude le elettrovalvole che hanno raggiunto l'umidità del terreno impostata
|
|
for i in range(1, self.ev_total + 1):
|
|
alias = self.config.get(f"EV{i}_ALIAS")
|
|
if not alias: continue # Salta se l'alias non è definito
|
|
|
|
state = self.pigarden_core.ev_status(alias)
|
|
moisture = self.pigarden_core.ev_check_moisture_autoclose(i)
|
|
|
|
# Se l'elettrovalvola è aperta (stato 1) e l'umidità massima è stata raggiunta (moisture > 0)
|
|
if state == 1 and moisture > 0:
|
|
self.pigarden_core.ev_close(alias)
|
|
self.log_write("irrigate", "warning", f"close_all_for_rain - Chiusa elettrovalvola '{alias}' perché l'umidità massima del terreno è stata raggiunta")
|
|
|
|
# Chiude le elettrovalvole in caso di pioggia (online o sensore)
|
|
close_all_flag = False
|
|
now = int(time.time())
|
|
|
|
# Controllo pioggia online
|
|
if self.not_irrigate_if_rain_online > 0:
|
|
last_rain_online_str = self._read_status_file("last_rain_online", default="0")
|
|
try:
|
|
last_rain_online = int(last_rain_online_str)
|
|
if now - last_rain_online < self.not_irrigate_if_rain_online:
|
|
close_all_flag = True
|
|
except ValueError:
|
|
pass # Ignora se il timestamp non è un numero
|
|
|
|
# Controllo pioggia sensore
|
|
if self.not_irrigate_if_rain_sensor > 0:
|
|
last_rain_sensor_str = self._read_status_file("last_rain_sensor", default="0")
|
|
try:
|
|
last_rain_sensor = int(last_rain_sensor_str)
|
|
if now - last_rain_sensor < self.not_irrigate_if_rain_sensor:
|
|
close_all_flag = True
|
|
except ValueError:
|
|
pass # Ignora se il timestamp non è un numero
|
|
|
|
if close_all_flag:
|
|
# Piove: valuta se chiudere le elettrovalvole
|
|
for i in range(1, self.ev_total + 1):
|
|
alias = self.config.get(f"EV{i}_ALIAS")
|
|
if not alias: continue
|
|
|
|
state = self.pigarden_core.ev_status(alias)
|
|
ev_norain = self.pigarden_core.ev_number2norain(i) # True se non deve chiudere per pioggia
|
|
moisture = self.pigarden_core.ev_check_moisture(i) # 0 se non ha raggiunto l'umidità ottimale
|
|
|
|
# Se l'elettrovalvola è aperta (stato 1), NON è impostata per ignorare la pioggia (ev_norain è False),
|
|
# E l'umidità non è ancora ottimale (moisture è 0 o non ha raggiunto il max)
|
|
# La logica Bash `[ "$moisture" -ne 0 ]` significa "se l'umidità NON è zero",
|
|
# che nel contesto di `ev_check_moisture` (che ritorna 0 per "non raggiunta", >0 per "raggiunta")
|
|
# significherebbe "se l'umidità è stata raggiunta".
|
|
# Tuttavia, il commento nello script Bash `if [ $moisture -gt 0 ]; then message_write "warning" "solenoid not open because maximum soil moisture has been reached"`
|
|
# suggerisce che >0 significa "umidità massima raggiunta".
|
|
# Quindi, per chiudere per pioggia, l'umidità NON deve essere già al massimo.
|
|
# Se `ev_check_moisture` ritorna 0 per "non raggiunto" e >0 per "raggiunto",
|
|
# allora `moisture == 0` significa "ha ancora bisogno d'acqua".
|
|
# Quindi, la condizione per chiudere per pioggia dovrebbe essere:
|
|
# `state == 1` (aperta) AND `not ev_norain` (non ignora pioggia) AND `moisture == 0` (ha ancora bisogno d'acqua)
|
|
# Il Bash `[ "$moisture" -ne 0 ]` nella seconda loop di close_all_for_rain è contro-intuitivo
|
|
# se `ev_check_moisture` ritorna >0 per "raggiunto".
|
|
# Assumo che `moisture -ne 0` in quel contesto significhi "se l'umidità non è perfetta, chiudi".
|
|
# Se `ev_check_moisture` ritorna 0 per "umidità OK/raggiunta" e 1 per "non OK", allora -ne 0 ha senso.
|
|
# Basandomi sulla funzione `ev_open` che usa `moisture -gt 0` per bloccare l'apertura (umidità già alta),
|
|
# `moisture -ne 0` qui dovrebbe significare "se l'umidità non è ancora ottimale/non è zero".
|
|
# Se 0 significa "umidità OK", allora -ne 0 significa "umidità NON OK".
|
|
# Adotterò la traduzione letterale di `moisture != 0` e lascerò al mock di `ev_check_moisture` di definire il comportamento.
|
|
|
|
if state == 1 and not ev_norain and moisture != 0:
|
|
self.pigarden_core.ev_close(alias)
|
|
self.log_write("irrigate", "warning", f"close_all_for_rain - Chiusa elettrovalvola '{alias}' per pioggia")
|
|
|
|
def last_rain_sensor_timestamp(self):
|
|
"""Mostra il timestamp dell'ultima pioggia rilevato dal sensore."""
|
|
return self._read_status_file("last_rain_sensor", default="0")
|
|
|
|
def last_rain_online_timestamp(self):
|
|
"""Mostra il timestamp dell'ultima pioggia rilevato dal servizio online."""
|
|
return self._read_status_file("last_rain_online", default="0")
|
|
|
|
def reset_last_rain_sensor_timestamp(self):
|
|
"""Resetta il timestamp dell'ultima pioggia rilevato dal sensore."""
|
|
self.event_manager.trigger_event("reset_last_rain_sensor_timestamp_before", "")
|
|
self._delete_status_file("last_rain_sensor")
|
|
self.event_manager.trigger_event("reset_last_rain_sensor_timestamp_after", "")
|
|
self.log_write("rain", "info", "reset_last_rain_sensor_timestamp")
|
|
|
|
def reset_last_rain_online_timestamp(self):
|
|
"""Resetta il timestamp dell'ultima pioggia rilevato dal servizio online."""
|
|
self.event_manager.trigger_event("reset_last_rain_online_timestamp_before", "")
|
|
self._delete_status_file("last_rain_online")
|
|
# Il Bash script chiama trigger_event due volte con _before, correggo a _after
|
|
self.event_manager.trigger_event("reset_last_rain_online_timestamp_after", "")
|
|
self.log_write("rain", "info", "reset_last_rain_online_timestamp")
|
|
|
|
# --- Esempio di utilizzo ---
|
|
if __name__ == "__main__":
|
|
print("--- Test RainManager ---")
|
|
|
|
# Inizializza le dipendenze mock
|
|
mock_event_manager = MockEventManager()
|
|
mock_driver_manager = MockDriverManager(mock_config_rain)
|
|
mock_pigarden_core = MockPiGarden(mock_config_rain)
|
|
|
|
# Inizializza RainManager
|
|
rain_manager = RainManager(
|
|
config=mock_config_rain,
|
|
log_writer=log_write,
|
|
event_manager=mock_event_manager,
|
|
driver_manager=mock_driver_manager,
|
|
pigarden_core=mock_pigarden_core
|
|
)
|
|
|
|
# --- Test check_rain_online ---
|
|
print("\n--- Test: check_rain_online (potrebbe simulare pioggia o no) ---")
|
|
rain_manager.check_rain_online()
|
|
print(f"Stato pioggia online (file): {rain_manager.last_rain_online_timestamp()}")
|
|
print(f"Stato ultimo rilevamento online (file): {rain_manager._read_status_file('last_state_rain_online')}")
|
|
|
|
# --- Test check_rain_sensor ---
|
|
print("\n--- Test: check_rain_sensor (potrebbe simulare pioggia o no) ---")
|
|
rain_manager.check_rain_sensor()
|
|
print(f"Stato pioggia sensore (file): {rain_manager.last_rain_sensor_timestamp()}")
|
|
print(f"Stato ultimo rilevamento sensore (file): {rain_manager._read_status_file('last_state_rain_sensor')}")
|
|
|
|
# --- Test close_all_for_rain ---
|
|
print("\n--- Test: close_all_for_rain ---")
|
|
# Imposta un'elettrovalvola aperta per il test
|
|
mock_pigarden_core.solenoid_states["Zona_1"] = 1
|
|
mock_pigarden_core.solenoid_states["Zona_3"] = 1
|
|
print(f"Stato iniziale Zona_1: {mock_pigarden_core.ev_status('Zona_1')}")
|
|
print(f"Stato iniziale Zona_3: {mock_pigarden_core.ev_status('Zona_3')}")
|
|
|
|
# Simula una pioggia recente per attivare la chiusura
|
|
rain_manager._write_status_file("last_rain_online", int(time.time()) - 100) # 100 secondi fa
|
|
rain_manager._write_status_file("last_rain_sensor", int(time.time()) - 50) # 50 secondi fa
|
|
|
|
rain_manager.close_all_for_rain()
|
|
print(f"Stato finale Zona_1: {mock_pigarden_core.ev_status('Zona_1')} (dovrebbe essere 0 se autoclose è attivo o piove)")
|
|
print(f"Stato finale Zona_3: {mock_pigarden_core.ev_status('Zona_3')} (dovrebbe essere 0 se piove)")
|
|
|
|
# --- Test reset timestamp ---
|
|
print("\n--- Test: reset_last_rain_sensor_timestamp ---")
|
|
rain_manager.reset_last_rain_sensor_timestamp()
|
|
print(f"Timestamp sensore dopo reset: {rain_manager.last_rain_sensor_timestamp()}")
|
|
|
|
print("\n--- Test: reset_last_rain_online_timestamp ---")
|
|
rain_manager.reset_last_rain_online_timestamp()
|
|
print(f"Timestamp online dopo reset: {rain_manager.last_rain_online_timestamp()}")
|
|
|
|
print("\n--- Test completato ---")
|
|
# Pulisci le directory di test (opzionale)
|
|
# import shutil
|
|
# shutil.rmtree(mock_config_rain["STATUS_DIR"])
|