import os import time import json import logging # --- Mock/Placeholder per le dipendenze esterne e configurazione --- # In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale. # Configura un logger di base per le funzioni di log logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def log_write(log_type, level, message): """Simula la funzione log_write dal tuo script Bash.""" if level == "info": logging.info(f"[{log_type}] {message}") elif level == "warning": logging.warning(f"[{log_type}] {message}") elif level == "error": logging.error(f"[{log_type}] {message}") else: logging.debug(f"[{log_type}] {message}") # Mock per la classe EventManager (dal file event_manager_py) class MockEventManager: def __init__(self): self.CURRENT_EVENT = "" self.CURRENT_EVENT_ALIAS = "" def trigger_event(self, event, *args): log_write("event", "info", f"Mock EventManager: Triggered event: {event} with args: {args}") self.CURRENT_EVENT = event # Simula un codice di ritorno di successo return 0 # Mock per la classe DriverManager (dal file driver_manager_py) class MockDriverManager: def __init__(self, config): self.config = config def drv_rain_online_get(self, service_id): """Simula il recupero dello stato pioggia online.""" # Restituisce un timestamp se piove, un valore negativo se non piove, o 0 in caso di errore # Per i test, simuliamo che non piova (-1) o che piova (timestamp attuale) if "openweathermap" in service_id: # Simula pioggia 50% delle volte if time.time() % 2 == 0: # Scrivi un mock di dati meteo online per il test mock_weather_data = { "weather": [{"description": "light rain"}], "main": {"temp": 280, "humidity": 90} } with open(os.path.join(self.config["STATUS_DIR"], "last_weather_online"), "w") as f: json.dump(mock_weather_data, f) return str(int(time.time())) # Simula pioggia (timestamp) else: return "-1" # Simula non pioggia return "0" # Errore o servizio non riconosciuto def drv_rain_sensor_get(self, gpio_id): """Simula il recupero dello stato dal sensore pioggia.""" # Restituisce lo stato del GPIO (0 o 1) # Per i test, simuliamo lo stato del sensore if time.time() % 3 == 0: return str(self.config.get("RAIN_GPIO_STATE", 0)) # Simula pioggia return str(1 - self.config.get("RAIN_GPIO_STATE", 0)) # Simula non pioggia # Mock per la classe PiGarden (per le dipendenze ev_*) class MockPiGarden: def __init__(self, config): self.config = config self.solenoid_states = {} # Mappa alias a stato (0=chiuso, 1=aperto, 2=forzato) # Inizializza stati fittizi for i in range(1, self.config.get("EV_TOTAL", 0) + 1): self.solenoid_states[str(i)] = 0 # Tutti chiusi di default def ev_status(self, alias): """Simula ev_status, restituisce lo stato dell'elettrovalvola.""" return self.solenoid_states.get(alias, 0) # 0 se non trovato o chiuso def ev_close(self, alias): """Simula ev_close, imposta lo stato dell'elettrovalvola a chiuso.""" self.solenoid_states[alias] = 0 log_write("irrigate", "info", f"MockPiGarden: Elettrovalvola '{alias}' chiusa.") def ev_check_moisture(self, ev_num): """ Simula ev_check_moisture. Ritorna 0 se l'umidità non è stata raggiunta (bisogno d'acqua), >0 se l'umidità massima è stata raggiunta. """ # Per i test, simuliamo che l'umidità sia raggiunta per EV1, altrimenti no if ev_num == 1: return 1 # Umidità raggiunta return 0 # Umidità non raggiunta def ev_check_moisture_autoclose(self, ev_num): """ Simula ev_check_moisture_autoclose. Ritorna 0 se non deve chiudere automaticamente, >0 se sì. """ # Per i test, simuliamo che l'autochiusura sia attiva per EV1 e l'umidità sia raggiunta if self.config.get(f"EV{ev_num}_SENSOR_MOISTURE_AUTOCLOSE", "0") == "1": return self.ev_check_moisture(ev_num) # Usa la logica di check_moisture return 0 def ev_number2norain(self, ev_num): """Simula ev_number2norain.""" return self.config.get(f"EV{ev_num}_NORAIN", "0") == "1" # Variabili di configurazione simulate (dal piGarden.conf) mock_config_rain = { "WEATHER_SERVICE": "drv:openweathermap", # Esempio di driver di servizio meteo "RAIN_GPIO": 25, "RAIN_GPIO_STATE": 0, # Stato del GPIO che indica pioggia "NOT_IRRIGATE_IF_RAIN_ONLINE": 86400, # 24 ore in secondi "NOT_IRRIGATE_IF_RAIN_SENSOR": 86400, # 24 ore in secondi "EV_TOTAL": 6, "STATUS_DIR": "/tmp/piGarden_status", # Directory per i file di stato # Elettrovalvole di esempio per close_all_for_rain "EV1_ALIAS": "Zona_1", "EV1_NORAIN": "0", "EV1_SENSOR_MOISTURE_AUTOCLOSE": "1", "EV2_ALIAS": "Zona_2", "EV2_NORAIN": "1", # Questa zona non si chiude per pioggia "EV3_ALIAS": "Zona_3", "EV3_NORAIN": "0", "EV4_ALIAS": "Zona_4", "EV4_NORAIN": "0", "EV5_ALIAS": "Zona_5", "EV5_NORAIN": "0", "EV6_ALIAS": "Zona_6", "EV6_NORAIN": "0", } # Assicurati che la directory di stato esista per i test os.makedirs(mock_config_rain["STATUS_DIR"], exist_ok=True) # --- Classe RainManager --- class RainManager: def __init__(self, config, log_writer, event_manager, driver_manager, pigarden_core): self.config = config self.log_write = log_writer self.event_manager = event_manager self.driver_manager = driver_manager self.pigarden_core = pigarden_core # Istanza della classe PiGarden principale self.status_dir = self.config.get("STATUS_DIR") self.weather_service = self.config.get("WEATHER_SERVICE") self.rain_gpio = self.config.get("RAIN_GPIO") self.rain_gpio_state = self.config.get("RAIN_GPIO_STATE") self.not_irrigate_if_rain_online = self.config.get("NOT_IRRIGATE_IF_RAIN_ONLINE") self.not_irrigate_if_rain_sensor = self.config.get("NOT_IRRIGATE_IF_RAIN_SENSOR") self.ev_total = self.config.get("EV_TOTAL") # Inizializza i file di stato se non esistono self._init_status_files() def _init_status_files(self): """Assicura che i file di stato esistano per evitare errori FileNotFoundError.""" for filename in ["last_state_rain_online", "last_rain_online", "last_state_rain_sensor", "last_rain_sensor", "last_weather_online"]: file_path = os.path.join(self.status_dir, filename) if not os.path.exists(file_path): # Crea il file vuoto o con un valore di default appropriato with open(file_path, 'w') as f: if filename.startswith("last_rain_"): f.write("0") # Timestamp 0 elif filename.startswith("last_state_rain_"): f.write("unknown") elif filename == "last_weather_online": f.write("{}") # JSON vuoto def _read_status_file(self, filename, default=""): """Legge il contenuto di un file di stato.""" file_path = os.path.join(self.status_dir, filename) try: with open(file_path, 'r') as f: content = f.read().strip() return content if content else default except FileNotFoundError: return default except Exception as e: self.log_write("rain", "error", f"Errore lettura {filename}: {e}") return default def _write_status_file(self, filename, content): """Scrive il contenuto in un file di stato.""" file_path = os.path.join(self.status_dir, filename) try: with open(file_path, 'w') as f: f.write(str(content)) except Exception as e: self.log_write("rain", "error", f"Errore scrittura {filename}: {e}") def _delete_status_file(self, filename): """Elimina un file di stato.""" file_path = os.path.join(self.status_dir, filename) try: if os.path.exists(file_path): os.remove(file_path) except Exception as e: self.log_write("rain", "error", f"Errore eliminazione {filename}: {e}") def check_rain_online(self): """ Esegue il controllo meteo tramite il servizio online configurato. """ if self.weather_service == "none": self.log_write("rain", "warning", "check_rain_online - servizio online disabilitato") return self.event_manager.trigger_event("check_rain_online_before", "") local_epoch_str = self.driver_manager.drv_rain_online_get(self.weather_service) current_state_rain_online = "" last_state_rain_online = self._read_status_file("last_state_rain_online", default="norain") weather_json = "{}" # Default a JSON vuoto if local_epoch_str and local_epoch_str.lstrip('-').isdigit(): # Controlla se è un numero (anche negativo) local_epoch = int(local_epoch_str) if local_epoch == 0: self.log_write("rain", "error", "check_rain_online - fallita lettura dati online (valore 0)") else: if local_epoch > 0: current_state_rain_online = 'rain' self._write_status_file("last_rain_online", local_epoch) else: current_state_rain_online = 'norain' # Leggi il JSON meteo, se esiste e valido weather_data_str = self._read_status_file("last_weather_online", default="{}") try: weather_json = json.loads(weather_data_str) # Estrai solo la parte "weather" se presente, altrimenti l'intero JSON weather_display = json.dumps(weather_json.get("weather", weather_json)) except json.JSONDecodeError: weather_display = "null" # Se il file non è un JSON valido self.log_write("rain", "info", f"check_rain_online - weather={weather_display}, local_epoch={local_epoch}") if current_state_rain_online != last_state_rain_online: self._write_status_file("last_state_rain_online", current_state_rain_online) self.event_manager.trigger_event("check_rain_online_change", current_state_rain_online, weather_display) else: self.log_write("rain", "error", "check_rain_online - fallita lettura dati online (non un numero)") self.event_manager.trigger_event("check_rain_online_after", current_state_rain_online, weather_json) def check_rain_sensor(self): """ Controlla se piove tramite sensore hardware. """ if not self.rain_gpio: self.log_write("rain", "warning", "Sensore pioggia non presente") return self.event_manager.trigger_event("check_rain_sensor_before", "") current_state_rain_sensor = "" last_state_rain_sensor = self._read_status_file("last_state_rain_sensor", default="norain") s_str = self.driver_manager.drv_rain_sensor_get(self.rain_gpio) s = int(s_str) if s_str and s_str.isdigit() else -1 # Converte in int, default a -1 se non valido if s == self.rain_gpio_state: # Confronta con lo stato configurato per la pioggia current_state_rain_sensor = 'rain' local_epoch = int(time.time()) self._write_status_file("last_rain_sensor", local_epoch) self.log_write("rain", "info", f"check_rain_sensor - ora sta piovendo ({local_epoch})") else: current_state_rain_sensor = 'norain' self.log_write("rain", "info", "check_rain_sensor - ora non sta piovendo") if current_state_rain_sensor != last_state_rain_sensor: self._write_status_file("last_state_rain_sensor", current_state_rain_sensor) self.event_manager.trigger_event("check_rain_sensor_change", current_state_rain_sensor) self.event_manager.trigger_event("check_rain_sensor_after", current_state_rain_sensor) def close_all_for_rain(self): """ Chiude tutte le elettrovalvole se sta piovendo o se hanno raggiunto l'umidità massima. """ # Chiude le elettrovalvole che hanno raggiunto l'umidità del terreno impostata for i in range(1, self.ev_total + 1): alias = self.config.get(f"EV{i}_ALIAS") if not alias: continue # Salta se l'alias non è definito state = self.pigarden_core.ev_status(alias) moisture = self.pigarden_core.ev_check_moisture_autoclose(i) # Se l'elettrovalvola è aperta (stato 1) e l'umidità massima è stata raggiunta (moisture > 0) if state == 1 and moisture > 0: self.pigarden_core.ev_close(alias) self.log_write("irrigate", "warning", f"close_all_for_rain - Chiusa elettrovalvola '{alias}' perché l'umidità massima del terreno è stata raggiunta") # Chiude le elettrovalvole in caso di pioggia (online o sensore) close_all_flag = False now = int(time.time()) # Controllo pioggia online if self.not_irrigate_if_rain_online > 0: last_rain_online_str = self._read_status_file("last_rain_online", default="0") try: last_rain_online = int(last_rain_online_str) if now - last_rain_online < self.not_irrigate_if_rain_online: close_all_flag = True except ValueError: pass # Ignora se il timestamp non è un numero # Controllo pioggia sensore if self.not_irrigate_if_rain_sensor > 0: last_rain_sensor_str = self._read_status_file("last_rain_sensor", default="0") try: last_rain_sensor = int(last_rain_sensor_str) if now - last_rain_sensor < self.not_irrigate_if_rain_sensor: close_all_flag = True except ValueError: pass # Ignora se il timestamp non è un numero if close_all_flag: # Piove: valuta se chiudere le elettrovalvole for i in range(1, self.ev_total + 1): alias = self.config.get(f"EV{i}_ALIAS") if not alias: continue state = self.pigarden_core.ev_status(alias) ev_norain = self.pigarden_core.ev_number2norain(i) # True se non deve chiudere per pioggia moisture = self.pigarden_core.ev_check_moisture(i) # 0 se non ha raggiunto l'umidità ottimale # Se l'elettrovalvola è aperta (stato 1), NON è impostata per ignorare la pioggia (ev_norain è False), # E l'umidità non è ancora ottimale (moisture è 0 o non ha raggiunto il max) # La logica Bash `[ "$moisture" -ne 0 ]` significa "se l'umidità NON è zero", # che nel contesto di `ev_check_moisture` (che ritorna 0 per "non raggiunta", >0 per "raggiunta") # significherebbe "se l'umidità è stata raggiunta". # Tuttavia, il commento nello script Bash `if [ $moisture -gt 0 ]; then message_write "warning" "solenoid not open because maximum soil moisture has been reached"` # suggerisce che >0 significa "umidità massima raggiunta". # Quindi, per chiudere per pioggia, l'umidità NON deve essere già al massimo. # Se `ev_check_moisture` ritorna 0 per "non raggiunto" e >0 per "raggiunto", # allora `moisture == 0` significa "ha ancora bisogno d'acqua". # Quindi, la condizione per chiudere per pioggia dovrebbe essere: # `state == 1` (aperta) AND `not ev_norain` (non ignora pioggia) AND `moisture == 0` (ha ancora bisogno d'acqua) # Il Bash `[ "$moisture" -ne 0 ]` nella seconda loop di close_all_for_rain è contro-intuitivo # se `ev_check_moisture` ritorna >0 per "raggiunto". # Assumo che `moisture -ne 0` in quel contesto significhi "se l'umidità non è perfetta, chiudi". # Se `ev_check_moisture` ritorna 0 per "umidità OK/raggiunta" e 1 per "non OK", allora -ne 0 ha senso. # Basandomi sulla funzione `ev_open` che usa `moisture -gt 0` per bloccare l'apertura (umidità già alta), # `moisture -ne 0` qui dovrebbe significare "se l'umidità non è ancora ottimale/non è zero". # Se 0 significa "umidità OK", allora -ne 0 significa "umidità NON OK". # Adotterò la traduzione letterale di `moisture != 0` e lascerò al mock di `ev_check_moisture` di definire il comportamento. if state == 1 and not ev_norain and moisture != 0: self.pigarden_core.ev_close(alias) self.log_write("irrigate", "warning", f"close_all_for_rain - Chiusa elettrovalvola '{alias}' per pioggia") def last_rain_sensor_timestamp(self): """Mostra il timestamp dell'ultima pioggia rilevato dal sensore.""" return self._read_status_file("last_rain_sensor", default="0") def last_rain_online_timestamp(self): """Mostra il timestamp dell'ultima pioggia rilevato dal servizio online.""" return self._read_status_file("last_rain_online", default="0") def reset_last_rain_sensor_timestamp(self): """Resetta il timestamp dell'ultima pioggia rilevato dal sensore.""" self.event_manager.trigger_event("reset_last_rain_sensor_timestamp_before", "") self._delete_status_file("last_rain_sensor") self.event_manager.trigger_event("reset_last_rain_sensor_timestamp_after", "") self.log_write("rain", "info", "reset_last_rain_sensor_timestamp") def reset_last_rain_online_timestamp(self): """Resetta il timestamp dell'ultima pioggia rilevato dal servizio online.""" self.event_manager.trigger_event("reset_last_rain_online_timestamp_before", "") self._delete_status_file("last_rain_online") # Il Bash script chiama trigger_event due volte con _before, correggo a _after self.event_manager.trigger_event("reset_last_rain_online_timestamp_after", "") self.log_write("rain", "info", "reset_last_rain_online_timestamp") # --- Esempio di utilizzo --- if __name__ == "__main__": print("--- Test RainManager ---") # Inizializza le dipendenze mock mock_event_manager = MockEventManager() mock_driver_manager = MockDriverManager(mock_config_rain) mock_pigarden_core = MockPiGarden(mock_config_rain) # Inizializza RainManager rain_manager = RainManager( config=mock_config_rain, log_writer=log_write, event_manager=mock_event_manager, driver_manager=mock_driver_manager, pigarden_core=mock_pigarden_core ) # --- Test check_rain_online --- print("\n--- Test: check_rain_online (potrebbe simulare pioggia o no) ---") rain_manager.check_rain_online() print(f"Stato pioggia online (file): {rain_manager.last_rain_online_timestamp()}") print(f"Stato ultimo rilevamento online (file): {rain_manager._read_status_file('last_state_rain_online')}") # --- Test check_rain_sensor --- print("\n--- Test: check_rain_sensor (potrebbe simulare pioggia o no) ---") rain_manager.check_rain_sensor() print(f"Stato pioggia sensore (file): {rain_manager.last_rain_sensor_timestamp()}") print(f"Stato ultimo rilevamento sensore (file): {rain_manager._read_status_file('last_state_rain_sensor')}") # --- Test close_all_for_rain --- print("\n--- Test: close_all_for_rain ---") # Imposta un'elettrovalvola aperta per il test mock_pigarden_core.solenoid_states["Zona_1"] = 1 mock_pigarden_core.solenoid_states["Zona_3"] = 1 print(f"Stato iniziale Zona_1: {mock_pigarden_core.ev_status('Zona_1')}") print(f"Stato iniziale Zona_3: {mock_pigarden_core.ev_status('Zona_3')}") # Simula una pioggia recente per attivare la chiusura rain_manager._write_status_file("last_rain_online", int(time.time()) - 100) # 100 secondi fa rain_manager._write_status_file("last_rain_sensor", int(time.time()) - 50) # 50 secondi fa rain_manager.close_all_for_rain() print(f"Stato finale Zona_1: {mock_pigarden_core.ev_status('Zona_1')} (dovrebbe essere 0 se autoclose è attivo o piove)") print(f"Stato finale Zona_3: {mock_pigarden_core.ev_status('Zona_3')} (dovrebbe essere 0 se piove)") # --- Test reset timestamp --- print("\n--- Test: reset_last_rain_sensor_timestamp ---") rain_manager.reset_last_rain_sensor_timestamp() print(f"Timestamp sensore dopo reset: {rain_manager.last_rain_sensor_timestamp()}") print("\n--- Test: reset_last_rain_online_timestamp ---") rain_manager.reset_last_rain_online_timestamp() print(f"Timestamp online dopo reset: {rain_manager.last_rain_online_timestamp()}") print("\n--- Test completato ---") # Pulisci le directory di test (opzionale) # import shutil # shutil.rmtree(mock_config_rain["STATUS_DIR"])