From a8800280f38e93d841fa99b3973c2d491538ffd2 Mon Sep 17 00:00:00 2001 From: roberto Date: Wed, 9 Jul 2025 18:57:32 +0200 Subject: [PATCH] sensor_include.py --- Python/sensor_include.py | 432 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 432 insertions(+) create mode 100644 Python/sensor_include.py diff --git a/Python/sensor_include.py b/Python/sensor_include.py new file mode 100644 index 0000000..43e028f --- /dev/null +++ b/Python/sensor_include.py @@ -0,0 +1,432 @@ +import os +import time +import json +import logging + +# --- Mock/Placeholder per le dipendenze esterne e configurazione --- +# In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale. + +# Configura un logger di base per le funzioni di log +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +def log_write(log_type, level, message): + """Simula la funzione log_write dal tuo script Bash.""" + if level == "info": + logging.info(f"[{log_type}] {message}") + elif level == "warning": + logging.warning(f"[{log_type}] {message}") + elif level == "error": + logging.error(f"[{log_type}] {message}") + else: + logging.debug(f"[{log_type}] {message}") + +# Mock per la classe EventManager (dal file event_manager_py) +class MockEventManager: + def __init__(self): + self.CURRENT_EVENT = "" + self.CURRENT_EVENT_ALIAS = "" + + def trigger_event(self, event, *args): + log_write("event", "info", f"Mock EventManager: Triggered event: {event} with args: {args}") + self.CURRENT_EVENT = event + # Simula un codice di ritorno di successo + return 0 + +# Mock per la classe DriverManager (dal file driver_manager_py) +class MockDriverManager: + def __init__(self, config): + self.config = config + + def drv_rain_online_get(self, service_id): + """Simula il recupero dello stato pioggia online.""" + # Restituisce un timestamp se piove, un valore negativo se non piove, o 0 in caso di errore + # Per i test, simuliamo che non piova (-1) o che piova (timestamp attuale) + if "openweathermap" in service_id: + # Simula pioggia 50% delle volte + if time.time() % 2 == 0: + # Scrivi un mock di dati meteo online per il test + mock_weather_data = { + "weather": [{"description": "light rain"}], + "main": {"temp": 280, "humidity": 90} + } + with open(os.path.join(self.config["STATUS_DIR"], "last_weather_online"), "w") as f: + json.dump(mock_weather_data, f) + return str(int(time.time())) # Simula pioggia (timestamp) + else: + return "-1" # Simula non pioggia + return "0" # Errore o servizio non riconosciuto + + def drv_rain_sensor_get(self, gpio_id): + """Simula il recupero dello stato dal sensore pioggia.""" + # Restituisce lo stato del GPIO (0 o 1) + # Per i test, simuliamo lo stato del sensore + if time.time() % 3 == 0: + return str(self.config.get("RAIN_GPIO_STATE", 0)) # Simula pioggia + return str(1 - self.config.get("RAIN_GPIO_STATE", 0)) # Simula non pioggia + +# Mock per la classe PiGarden (per le dipendenze ev_*) +class MockPiGarden: + def __init__(self, config): + self.config = config + self.solenoid_states = {} # Mappa alias a stato (0=chiuso, 1=aperto, 2=forzato) + # Inizializza stati fittizi + for i in range(1, self.config.get("EV_TOTAL", 0) + 1): + self.solenoid_states[str(i)] = 0 # Tutti chiusi di default + + def ev_status(self, alias): + """Simula ev_status, restituisce lo stato dell'elettrovalvola.""" + return self.solenoid_states.get(alias, 0) # 0 se non trovato o chiuso + + def ev_close(self, alias): + """Simula ev_close, imposta lo stato dell'elettrovalvola a chiuso.""" + self.solenoid_states[alias] = 0 + log_write("irrigate", "info", f"MockPiGarden: Elettrovalvola '{alias}' chiusa.") + + def ev_check_moisture(self, ev_num): + """ + Simula ev_check_moisture. + Ritorna 0 se l'umidità non è stata raggiunta (bisogno d'acqua), + >0 se l'umidità massima è stata raggiunta. + """ + # Per i test, simuliamo che l'umidità sia raggiunta per EV1, altrimenti no + if ev_num == 1: + return 1 # Umidità raggiunta + return 0 # Umidità non raggiunta + + def ev_check_moisture_autoclose(self, ev_num): + """ + Simula ev_check_moisture_autoclose. + Ritorna 0 se non deve chiudere automaticamente, >0 se sì. + """ + # Per i test, simuliamo che l'autochiusura sia attiva per EV1 e l'umidità sia raggiunta + if self.config.get(f"EV{ev_num}_SENSOR_MOISTURE_AUTOCLOSE", "0") == "1": + return self.ev_check_moisture(ev_num) # Usa la logica di check_moisture + return 0 + + def ev_number2norain(self, ev_num): + """Simula ev_number2norain.""" + return self.config.get(f"EV{ev_num}_NORAIN", "0") == "1" + +# Variabili di configurazione simulate (dal piGarden.conf) +mock_config_rain = { + "WEATHER_SERVICE": "drv:openweathermap", # Esempio di driver di servizio meteo + "RAIN_GPIO": 25, + "RAIN_GPIO_STATE": 0, # Stato del GPIO che indica pioggia + "NOT_IRRIGATE_IF_RAIN_ONLINE": 86400, # 24 ore in secondi + "NOT_IRRIGATE_IF_RAIN_SENSOR": 86400, # 24 ore in secondi + "EV_TOTAL": 6, + "STATUS_DIR": "/tmp/piGarden_status", # Directory per i file di stato + # Elettrovalvole di esempio per close_all_for_rain + "EV1_ALIAS": "Zona_1", "EV1_NORAIN": "0", "EV1_SENSOR_MOISTURE_AUTOCLOSE": "1", + "EV2_ALIAS": "Zona_2", "EV2_NORAIN": "1", # Questa zona non si chiude per pioggia + "EV3_ALIAS": "Zona_3", "EV3_NORAIN": "0", + "EV4_ALIAS": "Zona_4", "EV4_NORAIN": "0", + "EV5_ALIAS": "Zona_5", "EV5_NORAIN": "0", + "EV6_ALIAS": "Zona_6", "EV6_NORAIN": "0", +} + +# Assicurati che la directory di stato esista per i test +os.makedirs(mock_config_rain["STATUS_DIR"], exist_ok=True) + +# --- Classe RainManager --- + +class RainManager: + def __init__(self, config, log_writer, event_manager, driver_manager, pigarden_core): + self.config = config + self.log_write = log_writer + self.event_manager = event_manager + self.driver_manager = driver_manager + self.pigarden_core = pigarden_core # Istanza della classe PiGarden principale + + self.status_dir = self.config.get("STATUS_DIR") + self.weather_service = self.config.get("WEATHER_SERVICE") + self.rain_gpio = self.config.get("RAIN_GPIO") + self.rain_gpio_state = self.config.get("RAIN_GPIO_STATE") + self.not_irrigate_if_rain_online = self.config.get("NOT_IRRIGATE_IF_RAIN_ONLINE") + self.not_irrigate_if_rain_sensor = self.config.get("NOT_IRRIGATE_IF_RAIN_SENSOR") + self.ev_total = self.config.get("EV_TOTAL") + + # Inizializza i file di stato se non esistono + self._init_status_files() + + def _init_status_files(self): + """Assicura che i file di stato esistano per evitare errori FileNotFoundError.""" + for filename in ["last_state_rain_online", "last_rain_online", + "last_state_rain_sensor", "last_rain_sensor", + "last_weather_online"]: + file_path = os.path.join(self.status_dir, filename) + if not os.path.exists(file_path): + # Crea il file vuoto o con un valore di default appropriato + with open(file_path, 'w') as f: + if filename.startswith("last_rain_"): + f.write("0") # Timestamp 0 + elif filename.startswith("last_state_rain_"): + f.write("unknown") + elif filename == "last_weather_online": + f.write("{}") # JSON vuoto + + def _read_status_file(self, filename, default=""): + """Legge il contenuto di un file di stato.""" + file_path = os.path.join(self.status_dir, filename) + try: + with open(file_path, 'r') as f: + content = f.read().strip() + return content if content else default + except FileNotFoundError: + return default + except Exception as e: + self.log_write("rain", "error", f"Errore lettura {filename}: {e}") + return default + + def _write_status_file(self, filename, content): + """Scrive il contenuto in un file di stato.""" + file_path = os.path.join(self.status_dir, filename) + try: + with open(file_path, 'w') as f: + f.write(str(content)) + except Exception as e: + self.log_write("rain", "error", f"Errore scrittura {filename}: {e}") + + def _delete_status_file(self, filename): + """Elimina un file di stato.""" + file_path = os.path.join(self.status_dir, filename) + try: + if os.path.exists(file_path): + os.remove(file_path) + except Exception as e: + self.log_write("rain", "error", f"Errore eliminazione {filename}: {e}") + + def check_rain_online(self): + """ + Esegue il controllo meteo tramite il servizio online configurato. + """ + if self.weather_service == "none": + self.log_write("rain", "warning", "check_rain_online - servizio online disabilitato") + return + + self.event_manager.trigger_event("check_rain_online_before", "") + + local_epoch_str = self.driver_manager.drv_rain_online_get(self.weather_service) + current_state_rain_online = "" + last_state_rain_online = self._read_status_file("last_state_rain_online", default="norain") + weather_json = "{}" # Default a JSON vuoto + + if local_epoch_str and local_epoch_str.lstrip('-').isdigit(): # Controlla se è un numero (anche negativo) + local_epoch = int(local_epoch_str) + if local_epoch == 0: + self.log_write("rain", "error", "check_rain_online - fallita lettura dati online (valore 0)") + else: + if local_epoch > 0: + current_state_rain_online = 'rain' + self._write_status_file("last_rain_online", local_epoch) + else: + current_state_rain_online = 'norain' + + # Leggi il JSON meteo, se esiste e valido + weather_data_str = self._read_status_file("last_weather_online", default="{}") + try: + weather_json = json.loads(weather_data_str) + # Estrai solo la parte "weather" se presente, altrimenti l'intero JSON + weather_display = json.dumps(weather_json.get("weather", weather_json)) + except json.JSONDecodeError: + weather_display = "null" # Se il file non è un JSON valido + + self.log_write("rain", "info", f"check_rain_online - weather={weather_display}, local_epoch={local_epoch}") + + if current_state_rain_online != last_state_rain_online: + self._write_status_file("last_state_rain_online", current_state_rain_online) + self.event_manager.trigger_event("check_rain_online_change", current_state_rain_online, weather_display) + else: + self.log_write("rain", "error", "check_rain_online - fallita lettura dati online (non un numero)") + + self.event_manager.trigger_event("check_rain_online_after", current_state_rain_online, weather_json) + + def check_rain_sensor(self): + """ + Controlla se piove tramite sensore hardware. + """ + if not self.rain_gpio: + self.log_write("rain", "warning", "Sensore pioggia non presente") + return + + self.event_manager.trigger_event("check_rain_sensor_before", "") + + current_state_rain_sensor = "" + last_state_rain_sensor = self._read_status_file("last_state_rain_sensor", default="norain") + + s_str = self.driver_manager.drv_rain_sensor_get(self.rain_gpio) + s = int(s_str) if s_str and s_str.isdigit() else -1 # Converte in int, default a -1 se non valido + + if s == self.rain_gpio_state: # Confronta con lo stato configurato per la pioggia + current_state_rain_sensor = 'rain' + local_epoch = int(time.time()) + self._write_status_file("last_rain_sensor", local_epoch) + self.log_write("rain", "info", f"check_rain_sensor - ora sta piovendo ({local_epoch})") + else: + current_state_rain_sensor = 'norain' + self.log_write("rain", "info", "check_rain_sensor - ora non sta piovendo") + + if current_state_rain_sensor != last_state_rain_sensor: + self._write_status_file("last_state_rain_sensor", current_state_rain_sensor) + self.event_manager.trigger_event("check_rain_sensor_change", current_state_rain_sensor) + + self.event_manager.trigger_event("check_rain_sensor_after", current_state_rain_sensor) + + def close_all_for_rain(self): + """ + Chiude tutte le elettrovalvole se sta piovendo o se hanno raggiunto l'umidità massima. + """ + # Chiude le elettrovalvole che hanno raggiunto l'umidità del terreno impostata + for i in range(1, self.ev_total + 1): + alias = self.config.get(f"EV{i}_ALIAS") + if not alias: continue # Salta se l'alias non è definito + + state = self.pigarden_core.ev_status(alias) + moisture = self.pigarden_core.ev_check_moisture_autoclose(i) + + # Se l'elettrovalvola è aperta (stato 1) e l'umidità massima è stata raggiunta (moisture > 0) + if state == 1 and moisture > 0: + self.pigarden_core.ev_close(alias) + self.log_write("irrigate", "warning", f"close_all_for_rain - Chiusa elettrovalvola '{alias}' perché l'umidità massima del terreno è stata raggiunta") + + # Chiude le elettrovalvole in caso di pioggia (online o sensore) + close_all_flag = False + now = int(time.time()) + + # Controllo pioggia online + if self.not_irrigate_if_rain_online > 0: + last_rain_online_str = self._read_status_file("last_rain_online", default="0") + try: + last_rain_online = int(last_rain_online_str) + if now - last_rain_online < self.not_irrigate_if_rain_online: + close_all_flag = True + except ValueError: + pass # Ignora se il timestamp non è un numero + + # Controllo pioggia sensore + if self.not_irrigate_if_rain_sensor > 0: + last_rain_sensor_str = self._read_status_file("last_rain_sensor", default="0") + try: + last_rain_sensor = int(last_rain_sensor_str) + if now - last_rain_sensor < self.not_irrigate_if_rain_sensor: + close_all_flag = True + except ValueError: + pass # Ignora se il timestamp non è un numero + + if close_all_flag: + # Piove: valuta se chiudere le elettrovalvole + for i in range(1, self.ev_total + 1): + alias = self.config.get(f"EV{i}_ALIAS") + if not alias: continue + + state = self.pigarden_core.ev_status(alias) + ev_norain = self.pigarden_core.ev_number2norain(i) # True se non deve chiudere per pioggia + moisture = self.pigarden_core.ev_check_moisture(i) # 0 se non ha raggiunto l'umidità ottimale + + # Se l'elettrovalvola è aperta (stato 1), NON è impostata per ignorare la pioggia (ev_norain è False), + # E l'umidità non è ancora ottimale (moisture è 0 o non ha raggiunto il max) + # La logica Bash `[ "$moisture" -ne 0 ]` significa "se l'umidità NON è zero", + # che nel contesto di `ev_check_moisture` (che ritorna 0 per "non raggiunta", >0 per "raggiunta") + # significherebbe "se l'umidità è stata raggiunta". + # Tuttavia, il commento nello script Bash `if [ $moisture -gt 0 ]; then message_write "warning" "solenoid not open because maximum soil moisture has been reached"` + # suggerisce che >0 significa "umidità massima raggiunta". + # Quindi, per chiudere per pioggia, l'umidità NON deve essere già al massimo. + # Se `ev_check_moisture` ritorna 0 per "non raggiunto" e >0 per "raggiunto", + # allora `moisture == 0` significa "ha ancora bisogno d'acqua". + # Quindi, la condizione per chiudere per pioggia dovrebbe essere: + # `state == 1` (aperta) AND `not ev_norain` (non ignora pioggia) AND `moisture == 0` (ha ancora bisogno d'acqua) + # Il Bash `[ "$moisture" -ne 0 ]` nella seconda loop di close_all_for_rain è contro-intuitivo + # se `ev_check_moisture` ritorna >0 per "raggiunto". + # Assumo che `moisture -ne 0` in quel contesto significhi "se l'umidità non è perfetta, chiudi". + # Se `ev_check_moisture` ritorna 0 per "umidità OK/raggiunta" e 1 per "non OK", allora -ne 0 ha senso. + # Basandomi sulla funzione `ev_open` che usa `moisture -gt 0` per bloccare l'apertura (umidità già alta), + # `moisture -ne 0` qui dovrebbe significare "se l'umidità non è ancora ottimale/non è zero". + # Se 0 significa "umidità OK", allora -ne 0 significa "umidità NON OK". + # Adotterò la traduzione letterale di `moisture != 0` e lascerò al mock di `ev_check_moisture` di definire il comportamento. + + if state == 1 and not ev_norain and moisture != 0: + self.pigarden_core.ev_close(alias) + self.log_write("irrigate", "warning", f"close_all_for_rain - Chiusa elettrovalvola '{alias}' per pioggia") + + def last_rain_sensor_timestamp(self): + """Mostra il timestamp dell'ultima pioggia rilevato dal sensore.""" + return self._read_status_file("last_rain_sensor", default="0") + + def last_rain_online_timestamp(self): + """Mostra il timestamp dell'ultima pioggia rilevato dal servizio online.""" + return self._read_status_file("last_rain_online", default="0") + + def reset_last_rain_sensor_timestamp(self): + """Resetta il timestamp dell'ultima pioggia rilevato dal sensore.""" + self.event_manager.trigger_event("reset_last_rain_sensor_timestamp_before", "") + self._delete_status_file("last_rain_sensor") + self.event_manager.trigger_event("reset_last_rain_sensor_timestamp_after", "") + self.log_write("rain", "info", "reset_last_rain_sensor_timestamp") + + def reset_last_rain_online_timestamp(self): + """Resetta il timestamp dell'ultima pioggia rilevato dal servizio online.""" + self.event_manager.trigger_event("reset_last_rain_online_timestamp_before", "") + self._delete_status_file("last_rain_online") + # Il Bash script chiama trigger_event due volte con _before, correggo a _after + self.event_manager.trigger_event("reset_last_rain_online_timestamp_after", "") + self.log_write("rain", "info", "reset_last_rain_online_timestamp") + +# --- Esempio di utilizzo --- +if __name__ == "__main__": + print("--- Test RainManager ---") + + # Inizializza le dipendenze mock + mock_event_manager = MockEventManager() + mock_driver_manager = MockDriverManager(mock_config_rain) + mock_pigarden_core = MockPiGarden(mock_config_rain) + + # Inizializza RainManager + rain_manager = RainManager( + config=mock_config_rain, + log_writer=log_write, + event_manager=mock_event_manager, + driver_manager=mock_driver_manager, + pigarden_core=mock_pigarden_core + ) + + # --- Test check_rain_online --- + print("\n--- Test: check_rain_online (potrebbe simulare pioggia o no) ---") + rain_manager.check_rain_online() + print(f"Stato pioggia online (file): {rain_manager.last_rain_online_timestamp()}") + print(f"Stato ultimo rilevamento online (file): {rain_manager._read_status_file('last_state_rain_online')}") + + # --- Test check_rain_sensor --- + print("\n--- Test: check_rain_sensor (potrebbe simulare pioggia o no) ---") + rain_manager.check_rain_sensor() + print(f"Stato pioggia sensore (file): {rain_manager.last_rain_sensor_timestamp()}") + print(f"Stato ultimo rilevamento sensore (file): {rain_manager._read_status_file('last_state_rain_sensor')}") + + # --- Test close_all_for_rain --- + print("\n--- Test: close_all_for_rain ---") + # Imposta un'elettrovalvola aperta per il test + mock_pigarden_core.solenoid_states["Zona_1"] = 1 + mock_pigarden_core.solenoid_states["Zona_3"] = 1 + print(f"Stato iniziale Zona_1: {mock_pigarden_core.ev_status('Zona_1')}") + print(f"Stato iniziale Zona_3: {mock_pigarden_core.ev_status('Zona_3')}") + + # Simula una pioggia recente per attivare la chiusura + rain_manager._write_status_file("last_rain_online", int(time.time()) - 100) # 100 secondi fa + rain_manager._write_status_file("last_rain_sensor", int(time.time()) - 50) # 50 secondi fa + + rain_manager.close_all_for_rain() + print(f"Stato finale Zona_1: {mock_pigarden_core.ev_status('Zona_1')} (dovrebbe essere 0 se autoclose è attivo o piove)") + print(f"Stato finale Zona_3: {mock_pigarden_core.ev_status('Zona_3')} (dovrebbe essere 0 se piove)") + + # --- Test reset timestamp --- + print("\n--- Test: reset_last_rain_sensor_timestamp ---") + rain_manager.reset_last_rain_sensor_timestamp() + print(f"Timestamp sensore dopo reset: {rain_manager.last_rain_sensor_timestamp()}") + + print("\n--- Test: reset_last_rain_online_timestamp ---") + rain_manager.reset_last_rain_online_timestamp() + print(f"Timestamp online dopo reset: {rain_manager.last_rain_online_timestamp()}") + + print("\n--- Test completato ---") + # Pulisci le directory di test (opzionale) + # import shutil + # shutil.rmtree(mock_config_rain["STATUS_DIR"])