import os import sys import time import subprocess import json import re import hashlib import datetime import shutil # Per rm -f e rmdir ricorsivo import socket # Per il client MQTT (se non usi paho-mqtt) # Per list_descendants try: import psutil except ImportError: print("Warning: psutil not found. Process descendant listing will be limited.", file=sys.stderr) psutil = None # --- CONFIGURAZIONE --- # Questa classe simula il caricamento da /etc/piGarden.conf # Dovrai popolare questi valori con la tua configurazione reale. class Config: def __init__(self): # Versione dello script self.VERSION = 0 self.SUB_VERSION = 6 self.RELEASE_VERSION = 5 # Percorsi e file self.DIR_SCRIPT = os.path.dirname(os.path.abspath(__file__)) self.NAME_SCRIPT = os.path.basename(os.path.abspath(__file__)) self.CONFIG_ETC = "/etc/piGarden.conf" # Il percorso del file di configurazione Bash originale # Percorso temporaneo (come in Bash, preferisce /run/shm) self.TMP_PATH = "/run/shm" if os.path.isdir("/run/shm") else "/tmp" self.TCPSERVER_PID_FILE = os.path.join(self.TMP_PATH, "piGardenTcpServer.pid") self.LOCK_FILE = os.path.join(self.TMP_PATH, "piGarden.dir.lock") self.TMP_CRON_FILE = os.path.join(self.TMP_PATH, f"pigarden.user.cron.{os.getpid()}") # PID dinamico # Log self.LOG_FILE = "/var/log/piGarden.log" # Esempio self.LOG_OUTPUT_DRV_FILE = "/dev/null" # Default, come in Bash self.LOG_FILE_MAX_SIZE = 10 * 1024 * 1024 # 10 MB, esempio self.LOG_URL = "" # URL per inviare i log (se LOG_API_TOKEN è impostato) self.LOG_API_TOKEN = "" self.LOG_CURL_PARAM = "-s -k" # Parametri per curl (es. -s per silenzioso, -k per ignorare SSL) # Stato delle elettrovalvole self.STATUS_DIR = os.path.join(self.TMP_PATH, "pigarden_status") # Directory per gli stati delle EV os.makedirs(self.STATUS_DIR, exist_ok=True) # Assicurati che la directory esista # Elettrovalvole (esempi, devi popolarli con i tuoi dati reali) # EV_TOTAL deve essere impostato in base a quante EV hai self.EV_TOTAL = 2 # Esempio: 2 elettrovalvole self.EV_MONOSTABLE = 0 # 0 per bistabile, 1 per monostabile (globale) # Dettagli per ogni elettrovalvola (simula EVx_ALIAS, EVx_GPIO, EVx_NORAIN, EVx_REMOTE, EVx_MONOSTABLE) self.EV_CONFIG = { 1: {"ALIAS": "Zona_1", "GPIO": 17, "NORAIN": 0, "REMOTE": 0, "MONOSTABLE": 0}, 2: {"ALIAS": "Zona_2", "GPIO": 27, "NORAIN": 1, "REMOTE": 0, "MONOSTABLE": 0}, # Aggiungi altre elettrovalvole qui } # Sensori (esempi) self.SENSOR_TOTAL = 0 # Esempio: 0 sensori self.SENSOR_CONFIG = { # 1: {"ALIAS": "Sensore_Umidita_1", "GPIO": 22, "TYPE": "moisture"}, # Aggiungi altri sensori qui } self.SENSOR_STATE_TYPE = "moisture temperature fertility illuminance" # Tipi di sensori supportati self.RAIN_GPIO = None # GPIO per il sensore di pioggia (es. 23) self.NOT_IRRIGATE_IF_RAIN_ONLINE = 0 # Secondi dopo l'ultima pioggia online per non irrigare self.NOT_IRRIGATE_IF_RAIN_SENSOR = 0 # Secondi dopo l'ultima pioggia sensore per non irrigare # MQTT self.MQTT_ENABLE = 0 # 1 per abilitare MQTT self.MQTT_HOST = "localhost" self.MQTT_PORT = 1883 self.MQTT_USER = "" self.MQTT_PWD = "" self.MQTT_CLIENT_ID = "piGarden" self.MQTT_TOPIC = "piGarden/status" # piGardenSched self.PIGARDENSCHED_PATH = "/usr/local/bin/piGardenSched" # Percorso dell'eseguibile piGardenSched self.PIGARDENSCHED = 0 # 1 se piGardenSched è configurato e eseguibile if os.path.exists(self.PIGARDENSCHED_PATH) and os.access(self.PIGARDENSCHED_PATH, os.X_OK): self.PIGARDENSCHED = 1 # Statistiche di utilizzo self.NO_SEND_IDENTIFIER = 0 # 1 per disabilitare l'invio dell'identificativo # Variabili di stato interne (equivalenti a MESSAGE_INFO, CURRENT_EVENT, ecc. in Bash) self.MESSAGE_INFO = "" self.MESSAGE_WARNING = "" self.MESSAGE_SUCCESS = "" self.CURRENT_EVENT = "" self.CURRENT_EVENT_ALIAS = "" self.PARENT_PID = 0 # Usato per json_status quando chiamato da un processo figlio # Servizio meteo online self.WEATHER_SERVICE = "drv:wunderground" # o "none" o "drv:your_service" # self.WEATHER_API_KEY = "YOUR_API_KEY" # Chiave API per il servizio meteo # self.WEATHER_LOCATION = "YOUR_LOCATION" # Località per il servizio meteo # Inizializza la configurazione globale config = Config() # --- FUNZIONI UTILITY GLOBALI --- def log_write(source, level, message): """ Scrive un messaggio nel file di log e gestisce la rotazione. Invia anche il log a piGardenWeb se configurato. """ if not os.path.exists(config.LOG_FILE): try: os.makedirs(os.path.dirname(config.LOG_FILE), exist_ok=True) with open(config.LOG_FILE, 'a'): # Crea il file se non esiste pass except OSError as e: print(f"Error creating log directory/file: {e}", file=sys.stderr) return # Gestione rotazione log try: if os.path.exists(config.LOG_FILE): actual_size = os.path.getsize(config.LOG_FILE) if actual_size >= config.LOG_FILE_MAX_SIZE: timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M") shutil.move(config.LOG_FILE, f"{config.LOG_FILE}.{timestamp}") # Non comprimiamo con gzip qui, lo script Bash lo faceva dopo lo spostamento # Se vuoi gzip, puoi aggiungere: subprocess.run(["gzip", f"{config.LOG_FILE}.{timestamp}"]) log_write("general", "info", f"Log file rotated to {config.LOG_FILE}.{timestamp}") except Exception as e: print(f"Error during log rotation: {e}", file=sys.stderr) # Scrivi sul file di log timestamp_log = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_entry = f"{timestamp_log}\t\t{source}\t{level}\t{message}\n" try: with open(config.LOG_FILE, "a") as f: f.write(log_entry) except IOError as e: print(f"Error writing to log file {config.LOG_FILE}: {e}", file=sys.stderr) # Invia a piGardenWeb (se configurato) log_send(source, level, timestamp_log, message) def log_send(log_type, level, datetime_log, message): """Invia un log verso piGardenWeb.""" if config.LOG_URL and config.LOG_API_TOKEN: try: # Usiamo requests per un'interazione HTTP più robusta # pip install requests import requests data = { "api_token": config.LOG_API_TOKEN, "type": log_type, "level": level, "datetime_log": datetime_log, "message": message } # subprocess.Popen per non bloccare il processo principale # In un'applicazione reale, useresti un thread separato o una coda di messaggi # per inviare log in background per non bloccare l'esecuzione. # Qui simulo il comportamento di `curl ... &` # Nota: requests.post blocca. Per non bloccare, dovresti usare un thread. # Per emulare `&> /dev/null &` di Bash, useremmo subprocess.Popen # con stdout/stderr a DEVNULL e un thread per la chiamata requests. # Per semplicità, qui faremo una chiamata bloccante, ma con un timeout. # Esempio con subprocess.Popen per emulare curl in background # Questo è più vicino al comportamento Bash originale cmd = ["curl"] + config.LOG_CURL_PARAM.split() + [config.LOG_URL, "-d", f"api_token={config.LOG_API_TOKEN}&type={log_type}&level={level}&datetime_log={datetime_log}&message={message}"] subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except ImportError: log_write("log_send", "warning", "requests module not found, cannot send logs via HTTP.") except Exception as e: log_write("log_send", "error", f"Error sending log to piGardenWeb: {e}") def message_write(msg_type, message): """Scrive una tipologia di messaggio da inviare via socket server (o memorizza in variabili globali).""" if msg_type == 'info': config.MESSAGE_INFO = message elif msg_type == "warning": config.MESSAGE_WARNING = message elif msg_type == "success": config.MESSAGE_SUCCESS = message else: return log_write("message_write", msg_type, message) # Anche i messaggi vengono loggati def json_error(code, description): """Mostra un json per una risposta di errore.""" return json.dumps({"error": {"code": code, "description": description}}) # --- FUNZIONI DI GESTIONE DEL LOCK --- def lock(current_time=0): """Gestisce l'apertura di un lock (file-based).""" max_time = 10 # Secondi if current_time > max_time: log_write("general", "error", "Maximum locked time reached") time.sleep(max_time) # Sleep prima di sbloccare e uscire unlock() sys.exit(1) try: os.mkdir(config.LOCK_FILE) # Lock acquisito except FileExistsError: log_write("general", "info", f"Sleep 1 second for locked state (attempt {current_time + 1})") time.sleep(1) lock(current_time + 1) # Riprova ricorsivamente except OSError as e: log_write("general", "error", f"Error creating lock directory {config.LOCK_FILE}: {e}") sys.exit(1) def unlock(): """Chiude un lock (file-based).""" try: os.rmdir(config.LOCK_FILE) except OSError as e: log_write("general", "error", f"Error removing lock directory {config.LOCK_FILE}: {e}") # --- FUNZIONI DI GESTIONE DELLE ELETTROVALVOLE (EV) --- def ev_set_state(ev_number, state): """Imposta lo stato di una elettrovalvola.""" state_file = os.path.join(config.STATUS_DIR, f"ev{ev_number}") try: with open(state_file, "w") as f: f.write(str(state)) except IOError as e: log_write("ev_set_state", "error", f"Error writing EV state to {state_file}: {e}") def ev_get_state(ev_number): """Legge lo stato di una elettrovalvola.""" state_file = os.path.join(config.STATUS_DIR, f"ev{ev_number}") if os.path.exists(state_file): try: with open(state_file, "r") as f: return int(f.read().strip()) except (IOError, ValueError) as e: log_write("ev_get_state", "error", f"Error reading EV state from {state_file}: {e}") return 0 # Default a chiuso in caso di errore return 0 # Default a chiuso se il file non esiste def ev_alias2number(alias): """Recupera il numero di un'elettrovalvola in base all'alias.""" for num, ev_data in config.EV_CONFIG.items(): if ev_data["ALIAS"] == alias: return num log_write("general", "error", f"ERROR solenoid alias not found: {alias}") message_write("warning", "Solenoid alias not found") json_status_wrapper() # Chiama la funzione per stampare JSON sys.exit(1) # Esce come faceva lo script Bash def alias_exists(alias): """Verifica se un alias di un'elettrovalvola esiste.""" for ev_data in config.EV_CONFIG.values(): if ev_data["ALIAS"] == alias: return True return False def ev_number2gpio(ev_number): """Recupera il numero di gpio associato a un'elettrovalvola.""" return config.EV_CONFIG.get(ev_number, {}).get("GPIO") def ev_number2norain(ev_number): """Recupera il valore norain associato a un'elettrovalvola.""" return config.EV_CONFIG.get(ev_number, {}).get("NORAIN", 0) # Default a 0 se non specificato def ev_number2remote(ev_number): """Recupera il valore remote associato a un'elettrovalvola.""" return config.EV_CONFIG.get(ev_number, {}).get("REMOTE", 0) def ev_number2monostable(ev_number): """Recupera il valore monostable associato a un'elettrovalvola.""" return config.EV_CONFIG.get(ev_number, {}).get("MONOSTABLE", 0) def ev_status_all(): """Mostra lo stato di tutte le elettrovalvole.""" for i in range(1, config.EV_TOTAL + 1): alias = config.EV_CONFIG.get(i, {}).get("ALIAS", f"EV{i}") state = ev_get_state(i) print(f"{alias}: {state}") def ev_status(alias): """Mostra lo stato di un'elettrovalvola.""" ev_num = ev_alias2number(alias) state = ev_get_state(ev_num) print(f"{state}") # Stampa solo lo stato, come in Bash return state def close_all(force=False): """Chiude tutte le elettrovalvole.""" for i in range(1, config.EV_TOTAL + 1): alias = config.EV_CONFIG.get(i, {}).get("ALIAS", f"EV{i}") state = ev_get_state(i) if state > 0 or force: ev_close(alias) log_write("irrigate", "info", f"close_all - Close solenoid '{alias}' for rain") def list_alias(): """Stampa la lista degli alias delle elettrovalvole.""" for ev_data in config.EV_CONFIG.values(): print(ev_data["ALIAS"]) # --- FUNZIONI DRIVER (PLACEHOLDERS) --- # Queste funzioni simulano le interazioni hardware. # DEVONO ESSERE IMPLEMENTATE CON LE LIBRERIE GPIO REALI (es. RPi.GPIO, gpiozero) def drv_init(): """Inizializza i driver GPIO.""" log_write("drv", "info", "Initializing GPIO drivers...") # Esempio: # import RPi.GPIO as GPIO # GPIO.setmode(GPIO.BCM) # GPIO.setwarnings(False) def drv_supply_bistable_init(gpio1, gpio2): """Inizializza l'alimentazione per elettrovalvole bistabili.""" log_write("drv", "info", f"Initializing bistable supply on GPIOs {gpio1}, {gpio2}") # Imposta i GPIO come output e a stato iniziale (es. LOW) def drv_rele_init(gpio): """Inizializza un GPIO per un relè (elettrovalvola).""" log_write("drv", "info", f"Initializing relay on GPIO {gpio}") # Imposta il GPIO come output def drv_rele_close(gpio): """Attiva il relè per chiudere (o aprire, a seconda della logica del relè/valvola).""" log_write("drv", "info", f"Activating relay (close) on GPIO {gpio}") # Esempio: GPIO.output(gpio, GPIO.HIGH) # O LOW, a seconda del relè return 0 # 0 per successo def drv_rele_open(gpio): """Disattiva il relè per aprire (o chiudere, a seconda della logica).""" log_write("drv", "info", f"Deactivating relay (open) on GPIO {gpio}") # Esempio: GPIO.output(gpio, GPIO.LOW) # O HIGH return 0 # 0 per successo def drv_supply_positive(gpio1, gpio2): """Imposta l'alimentazione con voltaggio positivo per bistabili.""" log_write("drv", "info", f"Setting positive supply on GPIOs {gpio1}, {gpio2}") # Logica per invertire la polarità def drv_supply_negative(gpio1, gpio2): """Imposta l'alimentazione con voltaggio negativo per bistabili.""" log_write("drv", "info", f"Setting negative supply on GPIOs {gpio1}, {gpio2}") # Logica per invertire la polarità def drv_rain_sensor_init(gpio): """Inizializza il sensore di pioggia.""" log_write("drv", "info", f"Initializing rain sensor on GPIO {gpio}") # Imposta il GPIO come input e aggiungi un event listener # --- FUNZIONI CRON (PLACEHOLDERS) --- # Queste funzioni simulano l'interazione con crontab. # Dovrai implementarle usando una libreria come `python-crontab` o manipolando il crontab. def cron_del(cron_type, alias): """Elimina le voci cron per un tipo e alias specifici.""" log_write("cron", "info", f"Deleting cron entries for type: {cron_type}, alias: {alias}") # Esempio: usa python-crontab per rimuovere le entry return "" # Simula output vuoto per successo def cron_add(cron_type, cron_schedule, alias, force=""): """Aggiunge una voce cron.""" log_write("cron", "info", f"Adding cron entry for type: {cron_type}, schedule: {cron_schedule}, alias: {alias}, force: {force}") # Esempio: usa python-crontab per aggiungere l'entry return "" def cron_get(cron_type, alias): """Recupera le voci cron per un tipo e alias specifici.""" log_write("cron", "info", f"Getting cron entries for type: {cron_type}, alias: {alias}") # Esempio: restituisce una stringa con le voci cron, separate da newline return "" def cron_disable_all_open_close(): """Disabilita tutte le schedulazioni cron di apertura/chiusura.""" log_write("cron", "info", "Disabling all open/close cron schedules.") return "" def cron_enable_all_open_close(): """Abilita tutte le schedulazioni cron di apertura/chiusura.""" log_write("cron", "info", "Enabling all open/close cron schedules.") return "" def set_cron_init(): """Imposta il crontab per l'inizializzazione dell'unità di controllo.""" log_write("cron", "info", "Setting cron for init.") return "" def del_cron_init(): """Rimuove il crontab per l'inizializzazione dell'unità di controllo.""" log_write("cron", "info", "Deleting cron for init.") return "" def set_cron_start_socket_server(): """Imposta il crontab per l'avvio del socket server.""" log_write("cron", "info", "Setting cron for socket server start.") return "" def del_cron_start_socket_server(): """Rimuove il crontab per l'avvio del socket server.""" log_write("cron", "info", "Deleting cron for socket server start.") return "" def set_cron_check_rain_sensor(): """Imposta il crontab per il controllo della pioggia dal sensore.""" log_write("cron", "info", "Setting cron for rain sensor check.") return "" def del_cron_check_rain_sensor(): """Rimuove il crontab per il controllo della pioggia dal sensore.""" log_write("cron", "info", "Deleting cron for rain sensor check.") return "" def set_cron_check_rain_online(): """Imposta il crontab per il controllo della pioggia dal servizio online.""" log_write("cron", "info", "Setting cron for online rain check.") return "" def del_cron_check_rain_online(): """Rimuove il crontab per il controllo della pioggia dal servizio online.""" log_write("cron", "info", "Deleting cron for online rain check.") return "" def set_cron_close_all_for_rain(): """Imposta il crontab per chiudere tutte le elettrovalvole quando piove.""" log_write("cron", "info", "Setting cron for close all for rain.") return "" def del_cron_close_all_for_rain(): """Rimuove il crontab per chiudere tutte le elettrovalvole quando piove.""" log_write("cron", "info", "Deleting cron for close all for rain.") return "" # --- FUNZIONI SENSORI (PLACEHOLDERS) --- def json_sensor_status_all(): """Restituisce lo stato di tutti i sensori in formato JSON.""" sensor_data = {} for num, sensor_config in config.SENSOR_CONFIG.items(): alias = sensor_config["ALIAS"] # Qui dovresti leggere lo stato reale del sensore # Per ora, simulo uno stato casuale o fisso status_value = 0 # Placeholder sensor_data[alias] = {"alias": alias, "status": status_value} return json.dumps(sensor_data) def list_alias_sensor(): """Stampa la lista degli alias dei sensori.""" for sensor_data in config.SENSOR_CONFIG.values(): print(sensor_data["ALIAS"]) def sensor_status(alias, sensor_type=None): """Mostra lo stato di un sensore.""" # Trova il sensore per alias sensor_num = None for num, s_config in config.SENSOR_CONFIG.items(): if s_config["ALIAS"] == alias: sensor_num = num break if sensor_num is None: log_write("sensor", "error", f"Sensor alias not found: {alias}") message_write("warning", "Sensor alias not found") json_status_wrapper() return 1 # Errore # Qui dovresti leggere il valore reale dal sensore # Per ora, restituisco un valore fisso o casuale value = 0 # Placeholder print(value) return value def sensor_status_set(alias, sensor_type, value): """Imposta lo stato di un sensore (se supportato, es. per sensori virtuali).""" log_write("sensor", "info", f"Setting sensor {alias} type {sensor_type} to value {value}") # Implementa la logica per impostare lo stato del sensore return 0 # Successo def sensor_status_all(): """Mostra lo stato di tutti i sensori.""" for num, sensor_config in config.SENSOR_CONFIG.items(): alias = sensor_config["ALIAS"] # Qui dovresti leggere lo stato reale del sensore status_value = 0 # Placeholder print(f"{alias}: {status_value}") # --- FUNZIONI PIOGGIA (PLACEHOLDERS) --- def ev_check_moisture(ev_num): """Verifica l'umidità del terreno per un'elettrovalvola.""" log_write("irrigate", "info", f"Checking moisture for EV {ev_num}") # Implementa la logica per leggere l'umidità del sensore associato all'EV # Restituisce 0 se l'umidità è bassa (ok per irrigare), >0 altrimenti return 0 # Placeholder: nessuna umidità alta def check_rain_sensor(): """Controlla la pioggia dal sensore hardware.""" if config.RAIN_GPIO: log_write("rain", "info", f"Checking rain from sensor on GPIO {config.RAIN_GPIO}") # Leggi lo stato del sensore GPIO # Se piove, aggiorna il timestamp in config.STATUS_DIR/last_rain_sensor # Esempio: # if GPIO.input(config.RAIN_GPIO) == GPIO.LOW: # Assumendo LOW = pioggia # with open(os.path.join(config.STATUS_DIR, "last_rain_sensor"), "w") as f: # f.write(str(int(time.time()))) # log_write("rain", "info", "Rain detected by sensor.") pass else: log_write("rain", "info", "Rain sensor not configured.") def check_rain_online(): """Controlla la pioggia da un servizio API online.""" if config.WEATHER_SERVICE and config.WEATHER_SERVICE != "none": log_write("rain", "info", f"Checking rain from online service: {config.WEATHER_SERVICE}") # Implementa la chiamata all'API meteo (es. con 'requests') # Se piove, aggiorna il timestamp in config.STATUS_DIR/last_rain_online # Esempio: # import requests # try: # response = requests.get(f"https://api.example.com/weather?location={config.WEATHER_LOCATION}&api_key={config.WEATHER_API_KEY}") # data = response.json() # if data.get("rain_detected"): # Logica specifica per il tuo servizio # with open(os.path.join(config.STATUS_DIR, "last_rain_online"), "w") as f: # f.write(str(int(time.time()))) # log_write("rain", "info", "Rain detected by online service.") # except Exception as e: # log_write("rain", "error", f"Error checking online rain: {e}") pass else: log_write("rain", "info", "Online rain service not configured.") def reset_last_rain_sensor_timestamp(): """Resetta il timestamp dell'ultima pioggia dal sensore.""" file_path = os.path.join(config.STATUS_DIR, "last_rain_sensor") if os.path.exists(file_path): os.remove(file_path) log_write("rain", "info", "Timestamp of last sensor rain successfully reset.") def reset_last_rain_online_timestamp(): """Resetta il timestamp dell'ultima pioggia online.""" file_path = os.path.join(config.STATUS_DIR, "last_rain_online") if os.path.exists(file_path): os.remove(file_path) log_write("rain", "info", "Timestamp of last online rain successfully reset.") def last_rain_sensor_timestamp(): """Mostra il timestamp dell'ultima pioggia dal sensore.""" file_path = os.path.join(config.STATUS_DIR, "last_rain_sensor") if os.path.exists(file_path): with open(file_path, "r") as f: print(f.read().strip()) else: print("") def last_rain_online_timestamp(): """Mostra il timestamp dell'ultima pioggia online.""" file_path = os.path.join(config.STATUS_DIR, "last_rain_online") if os.path.exists(file_path): with open(file_path, "r") as f: print(f.read().strip()) else: print("") # --- FUNZIONI EVENTI (PLACEHOLDERS) --- # Queste funzioni simulano il sistema di eventi. # Potresti voler implementare un sistema di eventi più robusto (es. con un dispatcher). def trigger_event(event_name, alias="", param1="", param2=""): """Triggera un evento.""" config.CURRENT_EVENT = event_name config.CURRENT_EVENT_ALIAS = alias log_write("event", "info", f"Triggering event: {event_name} (alias: {alias}, params: {param1}, {param2})") # Qui potresti chiamare script esterni o funzioni Python basate sull'evento # Esempio: # if event_name == "init_before": # # Esegui codice specifico per init_before # pass return 0 # 0 per successo, diverso da 0 per errore (come in Bash) # --- FUNZIONI PRINCIPALI DEL SISTEMA --- def initialize(): """Inizializza le elettrovalvole e l'alimentazione.""" log_write("general", "info", "Run initialize") unlock() # Assicurati che non ci siano lock pendenti trigger_event("init_before", "") # Inizializza i driver gpio # list_drv non è definito nello script Bash fornito, presumo sia una lista di driver # Per ora, useremo solo drv_init log_write("drv", "info", "Initializing all drivers...") drv_init() # Chiamata generica per inizializzare tutti i driver # Imposta l'alimentazione con voltaggio negativo e setta i gpio in scrittura per le elettrovalvole bistabili if config.EV_MONOSTABLE != 1: # SUPPLY_GPIO_1 e SUPPLY_GPIO_2 non sono definiti in config, dovresti aggiungerli # Esempio: config.SUPPLY_GPIO_1 = 5, config.SUPPLY_GPIO_2 = 6 drv_supply_bistable_init(getattr(config, 'SUPPLY_GPIO_1', None), getattr(config, 'SUPPLY_GPIO_2', None)) # Elimina tutti gli stati delle elettrovalvole preesistenti try: for f in os.listdir(config.STATUS_DIR): if f.startswith("ev"): os.remove(os.path.join(config.STATUS_DIR, f)) log_write("general", "info", "Removed existing EV status files.") except Exception as e: log_write("general", "error", f"Error removing EV status files: {e}") # Inizializza i gpio delle elettrovalvole e ne chiude l'alimentazione for i in range(1, config.EV_TOTAL + 1): gpio = ev_number2gpio(i) if gpio is not None: drv_rele_init(gpio) ev_set_state(i, 0) # Imposta lo stato iniziale a chiuso # Chiude tutte le elettrovalvole for i in range(1, config.EV_TOTAL + 1): alias = config.EV_CONFIG.get(i, {}).get("ALIAS") if alias: ev_close(alias) # Chiama ev_close per ogni valvola # Inizializza il sensore di rilevamento pioggia if config.RAIN_GPIO: drv_rain_sensor_init(config.RAIN_GPIO) log_write("rain", "info", "Rain sensor initialized") else: log_write("rain", "info", "Rain sensor not present") trigger_event("init_after", "") log_write("general", "info", "End initialize") def ev_open(alias, force=""): """Commuta un'elettrovalvola nello stato aperto.""" cron_del("open_in", alias) # Non catturiamo l'output, come &> /dev/null ev_num = ev_alias2number(alias) gpio = ev_number2gpio(ev_num) ev_norain = ev_number2norain(ev_num) ev_is_remote = ev_number2remote(ev_num) ev_is_monostable = ev_number2monostable(ev_num) if force != "force": moisture = ev_check_moisture(ev_num) if moisture > 0: message_write("warning", "solenoid not open because maximum soil moisture has been reached") trigger_event("ev_not_open_for_moisture", alias) log_write("irrigate", "warning", f"Solenoid '{alias}' not open because maximum soil moisture has been reached") return if ev_norain != 1: now = int(time.time()) # Controllo pioggia online last_rain_online_file = os.path.join(config.STATUS_DIR, "last_rain_online") if config.NOT_IRRIGATE_IF_RAIN_ONLINE > 0 and os.path.exists(last_rain_online_file): try: with open(last_rain_online_file, "r") as f: last_rain_online = int(f.read().strip()) diff = now - last_rain_online if diff < config.NOT_IRRIGATE_IF_RAIN_ONLINE and moisture == 0: # Bash aveva moisture -ne 0 message_write("warning", "Solenoid not open for rain") trigger_event("ev_not_open_for_rain_online", alias) trigger_event("ev_not_open_for_rain", alias) log_write("irrigate", "warning", f"Solenoid '{alias}' not open for rain (online check)") return except (IOError, ValueError): pass # Ignora errori di lettura del file # Controllo pioggia sensore check_rain_sensor() # Assicurati che aggiorni last_rain_sensor last_rain_sensor_file = os.path.join(config.STATUS_DIR, "last_rain_sensor") if config.NOT_IRRIGATE_IF_RAIN_SENSOR > 0 and os.path.exists(last_rain_sensor_file): try: with open(last_rain_sensor_file, "r") as f: last_rain_sensor = int(f.read().strip()) diff = now - last_rain_sensor if diff < config.NOT_IRRIGATE_IF_RAIN_SENSOR and moisture == 0: # Bash aveva moisture -ne 0 message_write("warning", "Solenoid not open for rain") trigger_event("ev_not_open_for_rain_sensor", alias) trigger_event("ev_not_open_for_rain", alias) log_write("irrigate", "warning", f"Solenoid '{alias}' not open for rain (sensor check)") return except (IOError, ValueError): pass # Ignora errori di lettura del file state = 1 if force == "force": state = 2 if trigger_event("ev_open_before", alias, force) != 0: log_write("irrigate", "warning", f"Solenoid '{alias}' not open due to external event") message_write('warning', "Solenoid not open due to external event") mqtt_status() # Chiamata a mqtt_status (placeholder) return lock() # Gestisce l'apertura dell'elettrovalvola in base alla tipologia (monostabile / bistabile) if config.EV_MONOSTABLE == 1 or ev_is_remote == 1 or ev_is_monostable == 1: if drv_rele_close(gpio) == 1: # Se drv_rele_close restituisce 1 per errore unlock() return else: supply_positive() # supply_positive(config.SUPPLY_GPIO_1, config.SUPPLY_GPIO_2) drv_rele_close(gpio) time.sleep(1) drv_rele_open(gpio) ev_set_state(ev_num, state) log_write("irrigate", "info", f"Solenoid '{alias}' open") message_write("success", "Solenoid open") trigger_event("ev_open_after", alias, force) unlock() def ev_open_in(minute_start_str, minute_stop_str, alias, force=""): """Commuta un'elettrovalvola nello stato aperto dopo un intervallo.""" try: minute_start = int(minute_start_str) minute_stop = int(minute_stop_str) except ValueError: print("Time start/stop of irrigation is wrong or not specified", file=sys.stderr) message_write("warning", "Time start/stop of irrigation is wrong or not specified") mqtt_status() return 1 if minute_stop < 1: print("Time stop of irrigation is wrong", file=sys.stderr) message_write("warning", "Time stop of irrigation is wrong") mqtt_status() return 1 if not alias: print("Alias solenoid not specified", file=sys.stderr) message_write("warning", "Alias solenoid not specified") mqtt_status() return 1 trigger_event("ev_open_in_before", alias, force, minute_start_str, minute_stop_str) # gpio_alias2number $alias > /dev/null 2>&1 # Questa riga in Bash sembra solo verificare l'esistenza dell'alias e uscire in caso di errore. # La funzione ev_alias2number già fa questo. ev_alias2number(alias) # Per verificare che l'alias esista minute_start_actual = minute_start + 1 minute_stop_actual = minute_start_actual + minute_stop # Calcola le date/ore per cron # In Python, non generiamo stringhe cron direttamente per `date -d`, # ma calcoliamo i timestamp e li passiamo alle funzioni cron_add/del. # Le funzioni cron_add/del dovranno poi convertire questi timestamp in stringhe cron se necessario. # Per semplicità, qui useremo un formato simile a quello di Bash per cron_start/stop # che le funzioni cron_add/del dovranno interpretare. # cron_start_dt = datetime.datetime.now() + datetime.timedelta(minutes=minute_start_actual) # cron_stop_dt = datetime.datetime.now() + datetime.timedelta(minutes=minute_stop_actual) # cron_start_str_for_cron = cron_start_dt.strftime("%M %H %d %m %w") # %w per giorno della settimana (0-6) # cron_stop_str_for_cron = cron_stop_dt.strftime("%M %H %d %m %w") # Replicando il formato Bash per cron_add/del # Il formato è "MM HH DD MM DW" (minuto, ora, giorno del mese, mese, giorno della settimana) # dove DW è 0-6 (domenica-sabato) per `date +%u` in Bash. # Python's datetime.strftime('%w') restituisce 0-6 (domenica-sabato) # Questo è un placeholder. Le funzioni cron_add/del dovrebbero gestire la logica cron reale. # Qui passiamo solo i minuti relativi. cron_del("open_in", alias) cron_del("open_in_stop", alias) if minute_start_actual == 1: ev_open(alias, force) cron_start_info = "- - - - -" # Come in Bash else: # In un sistema reale, cron_add riceverebbe la data/ora esatta o i minuti di ritardo # e si occuperebbe di schedulare il job. cron_add("open_in", f"+{minute_start_actual}m", alias, force) cron_start_info = f"Scheduled in {minute_start_actual} minutes" # Info per il log cron_add("open_in_stop", f"+{minute_stop_actual}m", alias) cron_stop_info = f"Scheduled stop in {minute_stop_actual} minutes" # Info per il log message_write("success", "Scheduled start successfully performed") trigger_event("ev_open_in_after", alias, force, cron_start_info, cron_stop_info) return 0 def ev_close(alias): """Commuta un'elettrovalvola nello stato chiuso.""" ev_num = ev_alias2number(alias) gpio = ev_number2gpio(ev_num) ev_is_remote = ev_number2remote(ev_num) trigger_event("ev_close_before", alias) lock() if config.EV_MONOSTABLE == 1 or ev_is_remote == 1: if drv_rele_open(gpio) == 1: # Se drv_rele_open restituisce 1 per errore unlock() return else: supply_negative() # supply_negative(config.SUPPLY_GPIO_1, config.SUPPLY_GPIO_2) drv_rele_close(gpio) time.sleep(1) drv_rele_open(gpio) ev_set_state(ev_num, 0) # 0 = chiuso log_write("irrigate", "info", f"Solenoid '{alias}' close") message_write("success", "Solenoid close") trigger_event("ev_close_after", alias) unlock() cron_del("open_in_stop", alias) # Non catturiamo l'output, come &> /dev/null def supply_positive(): """Imposta l'alimentazione delle elettrovalvole con voltaggio positivo.""" # SUPPLY_GPIO_1 e SUPPLY_GPIO_2 non sono definiti in config, dovresti aggiungerli drv_supply_positive(getattr(config, 'SUPPLY_GPIO_1', None), getattr(config, 'SUPPLY_GPIO_2', None)) def supply_negative(): """Imposta l'alimentazione delle elettrovalvole con voltaggio negativo.""" # SUPPLY_GPIO_1 e SUPPLY_GPIO_2 non sono definiti in config, dovresti aggiungerli drv_supply_negative(getattr(config, 'SUPPLY_GPIO_1', None), getattr(config, 'SUPPLY_GPIO_2', None)) def send_identifier(): """Invia l'identificativo univoco ad uso statistico di utilizzo.""" if config.NO_SEND_IDENTIFIER == 1: return file_id_path = "/tmp/pigarden.id" if os.path.exists(file_id_path): max_age_file = 86400 # 1 giorno in secondi try: time_file = os.path.getmtime(file_id_path) # Tempo dell'ultima modifica age_file = int(time.time()) - int(time_file) if age_file < max_age_file: return # ID troppo giovane, non inviare except OSError as e: log_write("send_identifier", "error", f"Error getting file modification time for {file_id_path}: {e}") return # Genera l'ID (come nello script Bash precedente) try: ifconfig_output = subprocess.check_output(['/sbin/ifconfig'], encoding='utf-8') mac_address_pattern = re.compile(r'([0-9a-fA-F]{1,2}:){5}[0-9a-fA-F]{1,2}') match = mac_address_pattern.search(ifconfig_output) if match: mac_address = match.group(0) id_hash = hashlib.md5(mac_address.encode('utf-8')).hexdigest() else: log_write("send_identifier", "warning", "No MAC address found for identifier.") return except (FileNotFoundError, subprocess.CalledProcessError) as e: log_write("send_identifier", "error", f"Error getting MAC address for identifier: {e}") return if not id_hash: return try: with open(file_id_path, "w") as f: f.write(id_hash) except IOError as e: log_write("send_identifier", "error", f"Error writing identifier to {file_id_path}: {e}") return log_write("general", "info", "Send installation identifier to collect usage") # Invio via CURL (subprocess.Popen per emulare nohup ... &) url = f"https://www.lejubila.net/statistic/collect_usage/piGarden/{id_hash}/{config.VERSION}/{config.SUB_VERSION}/{config.RELEASE_VERSION}" cmd = ["curl", url] subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) def exec_poweroff(): """Spegne il sistema.""" trigger_event("exec_poweroff_before", "") log_write("system", "warning", "Executing system poweroff...") # In un sistema reale, questo script potrebbe chiamare un altro script Python o un comando di sistema. # Per replicare il comportamento Bash, potremmo chiamare un comando di sistema. # Il sleep 15 è strano, forse per dare tempo al log di essere scritto. time.sleep(15) # Simula il sleep # Esegui lo script poweroff.sh se esiste, altrimenti il comando di sistema poweroff_script_path = os.path.join(config.DIR_SCRIPT, "scripts", "poweroff.sh") if os.path.exists(poweroff_script_path) and os.access(poweroff_script_path, os.X_OK): try: subprocess.run([poweroff_script_path], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError as e: log_write("system", "error", f"Error executing poweroff script: {e}") else: try: # Assicurati che l'utente abbia i permessi sudo senza password per 'poweroff' subprocess.run(["sudo", "poweroff"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError as e: log_write("system", "error", f"Error executing sudo poweroff: {e}") except FileNotFoundError: log_write("system", "error", "Comando 'sudo' o 'poweroff' non trovato.") trigger_event("exec_poweroff_after", "") def exec_reboot(): """Riavvia il sistema.""" trigger_event("exec_reboot_before", "") log_write("system", "warning", "Executing system reboot...") time.sleep(15) # Simula il sleep reboot_script_path = os.path.join(config.DIR_SCRIPT, "scripts", "reboot.sh") if os.path.exists(reboot_script_path) and os.access(reboot_script_path, os.X_OK): try: subprocess.run([reboot_script_path], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError as e: log_write("system", "error", f"Error executing reboot script: {e}") else: try: # Assicurati che l'utente abbia i permessi sudo senza password per 'reboot' subprocess.run(["sudo", "reboot"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError as e: log_write("system", "error", f"Error executing sudo reboot: {e}") except FileNotFoundError: log_write("system", "error", "Comando 'sudo' o 'reboot' non trovato.") trigger_event("exec_reboot_after", "") def cmd_pigardensched(*args): """Esegue un comando con piGardenSched.""" if config.PIGARDENSCHED == 0: print("piGardenSched not configured in piGarden", file=sys.stderr) log_write("piGardenSched", "error", "piGardenSched not configured in piGarden") return 1 # Errore try: # Passa tutti gli argomenti al sottoprocesso result = subprocess.run([config.PIGARDENSCHED_PATH] + list(args), capture_output=True, text=True, check=True) # print(result.stdout.strip()) # Stampa l'output di piGardenSched se necessario return 0 # Successo except subprocess.CalledProcessError as e: print("piGardenSched command failed", file=sys.stderr) log_write("piGardenSched", "error", f"piGardenSched command failed: {e.stderr.strip()}") return 1 # Errore except FileNotFoundError: print(f"piGardenSched executable not found at {config.PIGARDENSCHED_PATH}", file=sys.stderr) log_write("piGardenSched", "error", f"piGardenSched executable not found at {config.PIGARDENSCHED_PATH}") return 1 def deg2dir(degrees): """Converte da gradi a direzione.""" try: deg = int(float(degrees)) # Gestisce anche i decimali come in Bash (sed 's/\..*$//') except (ValueError, TypeError): return "" # Come in Bash per "null" o input non valido if deg <= 11: return "North" elif deg <= 33: return "NNE" elif deg <= 56: return "NE" elif deg <= 78: return "ENE" elif deg <= 101: return "East" elif deg <= 123: return "ESE" elif deg <= 146: return "SE" elif deg <= 168: return "SSE" elif deg <= 191: return "South" elif deg <= 213: return "SSW" elif deg <= 236: return "SW" elif deg <= 258: return "WSW" elif deg <= 281: return "West" elif deg <= 303: return "WNW" elif deg <= 326: return "NW" elif deg <= 348: return "NNW" else: return "North" def debug1(*args): """Esegue il codice di debug 1.""" log_write("debug", "info", f"Running debug1 with args: {args}") # Qui potresti importare ed eseguire un modulo Python dedicato al debug # Esempio: # from debug import debug1_script # debug1_script.run(*args) print("Debug1 executed (placeholder).") def debug2(*args): """Esegue il codice di debug 2.""" log_write("debug", "info", f"Running debug2 with args: {args}") # Esempio: # from debug import debug2_script # debug2_script.run(*args) print("Debug2 executed (placeholder).") # --- FUNZIONE JSON STATUS --- # Questa è una funzione complessa che richiede molte dipendenze. # Ho cercato di replicare la sua logica il più fedelmente possibile. def json_status_wrapper(*args_from_cli): """ Stampa un json contenente lo status della centralina. I parametri opzionali controllano quali dati aggiuntivi includere. """ json_data = {} # Inizializza i messaggi (come le variabili globali in Bash) json_data["info"] = config.MESSAGE_INFO json_data["warning"] = config.MESSAGE_WARNING json_data["success"] = config.MESSAGE_SUCCESS # Versione json_data["version"] = { "ver": config.VERSION, "sub": config.SUB_VERSION, "rel": config.RELEASE_VERSION } # Timestamp json_data["timestamp"] = int(time.time()) # Evento corrente json_data["event"] = { "event": config.CURRENT_EVENT, "alias": config.CURRENT_EVENT_ALIAS } # PID corrente (PARENT_PID se specificato) current_pid = os.getpid() if config.PARENT_PID > 0: current_pid = config.PARENT_PID json_data["pid"] = current_pid # Aggiunto per chiarezza, non esplicito in Bash ma utile # Parsing degli argomenti per includere dati aggiuntivi with_get_cron = "0" with_get_cron_open_in = "0" with_get_schedule = "0" for arg in args_from_cli: if arg == "get_cron": with_get_cron = "1" elif arg.startswith("get_cron:"): with_get_cron = arg.split(":")[1] elif arg == "get_cron_open_in": with_get_cron_open_in = "1" elif arg.startswith("get_cron_open_in:"): with_get_cron_open_in = arg.split(":")[1] elif arg == "get_schedule" and config.PIGARDENSCHED == 1: with_get_schedule = "1" # Stato delle zone/elettrovalvole zones_data = {} ev_data_aliases = {} for i in range(1, config.EV_TOTAL + 1): alias = config.EV_CONFIG.get(i, {}).get("ALIAS", f"EV{i}") state = ev_get_state(i) # ev_status $av > /dev/null; local sv=$? zones_data[alias] = {"name": alias, "state": state} ev_data_aliases[f"EV{i}"] = {"alias": alias} # EVx_ALIAS json_data["zones"] = zones_data json_data["ev"] = ev_data_aliases # Ultima pioggia (sensore e online) last_rain_sensor_file = os.path.join(config.STATUS_DIR, "last_rain_sensor") last_rain_online_file = os.path.join(config.STATUS_DIR, "last_rain_online") last_weather_online_file = os.path.join(config.STATUS_DIR, "last_weather_online") json_data["last_rain_sensor"] = "" if os.path.exists(last_rain_sensor_file): try: with open(last_rain_sensor_file, "r") as f: json_data["last_rain_sensor"] = f.read().strip() except IOError: pass json_data["last_rain_online"] = "" if os.path.exists(last_rain_online_file): try: with open(last_rain_online_file, "r") as f: json_data["last_rain_online"] = f.read().strip() except IOError: pass json_data["last_weather_online"] = "" # In Bash era un JSON string, qui lo mettiamo come stringa vuota o il contenuto del file if os.path.exists(last_weather_online_file): try: with open(last_weather_online_file, "r") as f: content = f.read().strip() # Se il contenuto è già un JSON valido, lo si può includere direttamente # Altrimenti, lo si lascia come stringa try: json.loads(content) # Prova a parsare come JSON json_data["last_weather_online"] = json.loads(content) except json.JSONDecodeError: json_data["last_weather_online"] = content # Lascia come stringa se non è JSON valido except IOError: pass # Errore (sempre presente nel JSON Bash) json_data["error"] = {"code": 0, "description": ""} # Placeholder, dovrebbe essere aggiornato dalle funzioni di errore # Dati cron (open/close) cron_open_data = {} cron_close_data = {} if with_get_cron != "0": element_for_cron = [] if with_get_cron == "1": element_for_cron = range(1, config.EV_TOTAL + 1) else: try: element_for_cron = [ev_alias2number(with_get_cron)] except SystemExit: # ev_alias2number esce in caso di errore pass # Non aggiungere cron data se l'alias non esiste for i in element_for_cron: alias = config.EV_CONFIG.get(i, {}).get("ALIAS") if alias: crn_open = cron_get("open", alias).replace("\n", "%%") cron_open_data[alias] = crn_open crn_close = cron_get("close", alias).replace("\n", "%%") cron_close_data[alias] = crn_close json_data["cron"] = {"open": cron_open_data, "close": cron_close_data} # Dati schedule (piGardenSched) schedule_data = {} if with_get_schedule == "1": try: # Esegui piGardenSched e parsa l'output result = subprocess.run([config.PIGARDENSCHED_PATH, "sched"], capture_output=True, text=True, check=True) for line in result.stdout.strip().split('\n'): if line: parts = line.split(';') if len(parts) >= 1: ev_key = parts[0] # Cerca l'alias corrispondente alias_found = "" for num, ev_conf in config.EV_CONFIG.items(): if f"EV{num}" == ev_key: alias_found = ev_conf["ALIAS"] break if alias_found: schedule_data[ev_key] = {"alias": alias_found, "entry": line} else: log_write("json_status", "warning", f"Alias not found for scheduled EV key: {ev_key}") json_data["schedule"] = schedule_data except (subprocess.CalledProcessError, FileNotFoundError) as e: log_write("json_status", "error", f"Error getting piGardenSched schedule: {e}") json_data["schedule"] = {} # In caso di errore, restituisci un oggetto vuoto else: json_data["schedule"] = {} # Sempre includi la chiave, anche se vuota # Dati cron_open_in cron_open_in_data = {} cron_open_in_stop_data = {} if with_get_cron_open_in != "0": element_for_cron_open_in = [] if with_get_cron_open_in == "1": element_for_cron_open_in = range(1, config.EV_TOTAL + 1) else: try: element_for_cron_open_in = [ev_alias2number(with_get_cron_open_in)] except SystemExit: pass for i in element_for_cron_open_in: alias = config.EV_CONFIG.get(i, {}).get("ALIAS") if alias: crn_open_in = cron_get("open_in", alias).replace("\n", "%%") cron_open_in_data[alias] = crn_open_in crn_open_in_stop = cron_get("open_in_stop", alias).replace("\n", "%%") cron_open_in_stop_data[alias] = crn_open_in_stop json_data["cron_open_in"] = {"open_in": cron_open_in_data, "open_in_stop": cron_open_in_stop_data} # Stato dei sensori json_data["sensors"] = json.loads(json_sensor_status_all()) # Assumendo che json_sensor_status_all restituisca JSON string # Stampa il JSON finale print(json.dumps(json_data, indent=2)) # indent=2 per una stampa leggibile # --- MQTT STATUS --- def mqtt_status(parent_pid=None): """Invia al broker mqtt il json contenente lo stato del sistema.""" if not config.MQTT_ENABLE == 1: return if parent_pid is not None: config.PARENT_PID = parent_pid # Genera il JSON di stato # Cattura l'output di json_status_wrapper() reindirizzando stdout old_stdout = sys.stdout sys.stdout = captured_output = io.StringIO() json_status_wrapper() sys.stdout = old_stdout js = captured_output.getvalue().strip() try: # Usa paho-mqtt per inviare il messaggio # pip install paho-mqtt import paho.mqtt.client as mqtt client = mqtt.Client(client_id=config.MQTT_CLIENT_ID) if config.MQTT_USER and config.MQTT_PWD: client.username_pw_set(config.MQTT_USER, config.MQTT_PWD) client.connect(config.MQTT_HOST, config.MQTT_PORT, 60) # 60 secondi di keepalive client.publish(config.MQTT_TOPIC, js, qos=1, retain=True) # retain=True come -r client.disconnect() log_write("mqtt", "info", "MQTT status published successfully.") except ImportError: log_write("mqtt", "error", "paho-mqtt module not found. Cannot publish MQTT status.") except Exception as e: log_write("mqtt", "error", f"Error publishing MQTT status: {e}") # --- FUNZIONE PRINCIPALE (CLI DISPATCHER) --- def show_usage(): """Mostra i parametri dello script.""" print(f"piGarden v. {config.VERSION}.{config.SUB_VERSION}.{config.RELEASE_VERSION}") print("\nUsage:") print(f"\t{config.NAME_SCRIPT} init initialize supply and solenoid in closed state") print(f"\t{config.NAME_SCRIPT} open alias [force] open a solenoid") print(f"\t{config.NAME_SCRIPT} open_in minute_start minute_stop alias [force] open a solenoid in minute_start for minute_stop") print(f"\t{config.NAME_SCRIPT} close alias close a solenoid") print(f"\t{config.NAME_SCRIPT} list_alias view list of aliases solenoid") print(f"\t{config.NAME_SCRIPT} ev_status alias show status solenoid") print(f"\t{config.NAME_SCRIPT} ev_status_all show status solenoids") print("\n") print(f"\t{config.NAME_SCRIPT} list_alias_sensor view list of aliases sensor") print(f"\t{config.NAME_SCRIPT} sensor_status alias [type] show status sensor (type: {config.SENSOR_STATE_TYPE})") print(f"\t{config.NAME_SCRIPT} sensor_status_set alias type value set status of sensor (type: {config.SENSOR_STATE_TYPE})") print(f"\t{config.NAME_SCRIPT} sensor_status_all show status of all sensors") print("\n") print(f"\t{config.NAME_SCRIPT} last_rain_sensor_timestamp show timestamp of last rain sensor") print(f"\t{config.NAME_SCRIPT} last_rain_online_timestamp show timestamp of last rain online") print(f"\t{config.NAME_SCRIPT} reset_last_rain_sensor_timestamp show timestamp of last rain sensor") print(f"\t{config.NAME_SCRIPT} reset_last_rain_online_timestamp show timestamp of last rain online") print("\n") print(f"\t{config.NAME_SCRIPT} json_status [get_cron|get_cron_open_in|get_schedule] show status in json format") print(f"\t{config.NAME_SCRIPT} mqtt_status send status in json format to mqtt broker") print("\n") print(f"\t{config.NAME_SCRIPT} check_rain_online check rain from online api service") print(f"\t{config.NAME_SCRIPT} check_rain_sensor check rain from hardware sensor") print("\n") print(f"\t{config.NAME_SCRIPT} close_all_for_rain close all solenoid if it's raining") print(f"\t{config.NAME_SCRIPT} close_all [force] close all solenoid") print("\n") print(f"\t{config.NAME_SCRIPT} start_socket_server [force] start socket server, with 'force' parameter force close socket server if already open") print(f"\t{config.NAME_SCRIPT} stop_socket_server stop socket server") print("\n") print(f"\t{config.NAME_SCRIPT} reboot reboot system") print(f"\t{config.NAME_SCRIPT} poweroff shutdown system") print("\n") print(f"\t{config.NAME_SCRIPT} set_cron_init set crontab for initialize control unit") print(f"\t{config.NAME_SCRIPT} del_cron_init remove crontab for initialize control unit") print(f"\t{config.NAME_SCRIPT} set_cron_start_socket_server set crontab for start socket server") print(f"\t{config.NAME_SCRIPT} del_cron_start_socket_server remove crontab for start socket server") print(f"\t{config.NAME_SCRIPT} set_cron_check_rain_sensor set crontab for check rein from sensor") print(f"\t{config.NAME_SCRIPT} del_cron_check_rain_sensor remove crontab for check rein from sensor") print(f"\t{config.NAME_SCRIPT} set_cron_check_rain_online set crontab for check rein from online service") print(f"\t{config.NAME_SCRIPT} del_cron_check_rain_online remove crontab for check rein from online service") print(f"\t{config.NAME_SCRIPT} set_cron_close_all_for_rain set crontab for close all solenoid when raining") print(f"\t{config.NAME_SCRIPT} del_cron_close_all_for_rain remove crontab for close all solenoid when raining") print("\n") print(f"\t{config.NAME_SCRIPT} add_cron_open alias m h dom mon dow [disabled] add crontab for open a solenoid") print(f"\t{config.NAME_SCRIPT} del_cron_open alias remove all crontab for open a solenoid") print(f"\t{config.NAME_SCRIPT} get_cron_open alias get all crontab for open a solenoid") print(f"\t{config.NAME_SCRIPT} del_cron_open_in alias remove all crontab for open_in a solenoid") print(f"\t{config.NAME_SCRIPT} add_cron_close alias m h dom mon dow [disabled] add crontab for close a solenoid") print(f"\t{config.NAME_SCRIPT} del_cron_close alias remove all crontab for close a solenoid") print(f"\t{config.NAME_SCRIPT} get_cron_close alias get all crontab for close a solenoid") print("\n") print(f"\t{config.NAME_SCRIPT} cmd_pigardensched [prm1 [prm2 [prm3]...]] performs a pigardensched command") print("\n") print(f"\t{config.NAME_SCRIPT} debug1 [parameter]|[parameter]|..] Run debug code 1") print(f"\t{config.NAME_SCRIPT} debug2 [parameter]|[parameter]|..] Run debug code 2") # --- MAIN EXECUTION BLOCK --- if __name__ == "__main__": # Carica la configurazione dal file /etc/piGarden.conf (simulazione) # In un'applicazione reale, useresti una libreria per parsare un file .conf o .ini # Per ora, assumiamo che la classe Config sia già popolata con i valori di default # o che tu la popoli manualmente qui. # Esempio di caricamento (molto semplificato): # try: # with open(config.CONFIG_ETC, 'r') as f: # # Parsa il file di configurazione e aggiorna gli attributi di 'config' # # Questo richiede un parser INI/JSON/YAML a seconda del formato del tuo file # pass # except FileNotFoundError: # print(f"Config file not found in {config.CONFIG_ETC}", file=sys.stderr) # sys.exit(1) # Importa le funzioni del server socket (dal file precedente) # Questo richiede che il file socket_server.py sia nello stesso PATH # o che tu lo abbia copiato/incollato qui. # Per evitare dipendenze circolari o problemi di import, # ho copiato le funzioni start_socket_server e stop_socket_server direttamente qui. # Ho bisogno di importare io per catturare l'output di json_status_wrapper import io # Funzioni del server socket (copiate dal file precedente per auto-contenimento) # Rimuovi questa sezione se preferisci importare da un modulo separato. # Globals per il server socket (dall'altro script) TCPSERVER_PID_FILE = config.TCPSERVER_PID_FILE TCPSERVER_IP = "0.0.0.0" # Ascolta su tutte le interfacce disponibili TCPSERVER_PORT = 12345 # Porta del server socket TCPSERVER_USER = config.TCPSERVER_USER # Credenziali opzionali per l'autenticazione TCPSERVER_PWD = config.TCPSERVER_PWD class MyTCPHandler(socketserver.BaseRequestHandler): def handle(self): global RUN_FROM_TCPSERVER RUN_FROM_TCPSERVER = True client_ip = self.client_address[0] log_write("socket_server", "info", f"Nuova connessione da: {client_ip}") response_to_client = "" try: if TCPSERVER_USER and TCPSERVER_PWD: self.request.settimeout(3) try: user_line = self.request.recv(1024).decode('utf-8').strip() password_line = self.request.recv(1024).decode('utf-8').strip() except socket.timeout: log_write("socket_server", "warning", f"socket connection from: {client_ip} - Timeout during credentials read") response_to_client = json_error(0, "Authentication timeout") self.request.sendall(response_to_client.encode('utf-8') + b'\n') return if user_line != TCPSERVER_USER or password_line != TCPSERVER_PWD: log_write("socket_server", "warning", f"socket connection from: {client_ip} - Bad socket server credentials - user:{user_line}") response_to_client = json_error(0, "Bad socket server credentials") self.request.sendall(response_to_client.encode('utf-8') + b'\n') return else: log_write("socket_server", "info", f"socket connection from: {client_ip} - Authentication successful") self.request.settimeout(None) command_line = self.request.recv(4096).decode('utf-8').strip() args = command_line.split(' ') # Mappa gli argomenti per facilitare il passaggio alle funzioni cmd_args = [args[i] if len(args) > i else "" for i in range(8)] log_write("socket_server", "info", f"socket connection from: {client_ip} - command: {command_line}") # Cattura l'output delle funzioni che stampano su stdout old_stdout = sys.stdout sys.stdout = captured_output = io.StringIO() # Chiama la funzione di dispatch principale con gli argomenti # Questo è il punto dove il server socket chiama le funzioni di piGarden dispatch_command(*cmd_args) sys.stdout = old_stdout response_to_client = captured_output.getvalue().strip() # Se non c'è una risposta esplicita (es. da json_status), invia un messaggio di successo generico if not response_to_client: response_to_client = json.dumps({"status": "success", "message": "Command executed."}) self.request.sendall(response_to_client.encode('utf-8') + b'\n') except socket.timeout: log_write("socket_server", "warning", f"socket connection from: {client_ip} - Timeout waiting for command.") response_to_client = json_error(0, "Timeout waiting for command") self.request.sendall(response_to_client.encode('utf-8') + b'\n') except Exception as e: log_write("socket_server", "error", f"Errore durante la gestione della connessione da {client_ip}: {e}") response_to_client = json_error(-1, f"Internal server error: {e}") self.request.sendall(response_to_client.encode('utf-8') + b'\n') finally: self.request.close() RUN_FROM_TCPSERVER = False def start_socket_server_internal(): if os.path.exists(TCPSERVER_PID_FILE): try: os.remove(TCPSERVER_PID_FILE) log_write("socket_server", "info", f"Rimosso file PID esistente: {TCPSERVER_PID_FILE}") except OSError as e: log_write("socket_server", "error", f"Errore nella rimozione del file PID: {e}") sys.exit(1) current_pid = os.getpid() try: with open(TCPSERVER_PID_FILE, "w") as f: f.write(str(current_pid)) log_write("socket_server", "info", f"Server PID {current_pid} scritto in {TCPSERVER_PID_FILE}") except IOError as e: log_write("socket_server", "error", f"Errore nella scrittura del file PID: {e}") sys.exit(1) try: server = socketserver.ThreadingTCPServer((TCPSERVER_IP, TCPSERVER_PORT), MyTCPHandler) log_write("socket_server", "info", f"Server socket avviato su {TCPSERVER_IP}:{TCPSERVER_PORT}") server.serve_forever() except Exception as e: log_write("socket_server", "error", f"Errore all'avvio del server socket: {e}") if os.path.exists(TCPSERVER_PID_FILE): os.remove(TCPSERVER_PID_FILE) sys.exit(1) def stop_socket_server_internal(): if not os.path.exists(TCPSERVER_PID_FILE): print("Daemon is not running") sys.exit(1) log_write("socket_server", "info", "Richiesta di stop del socket server.") with open(TCPSERVER_PID_FILE, "r") as f: try: pid = int(f.read().strip()) except ValueError: print(f"Errore: Il file PID '{TCPSERVER_PID_FILE}' contiene un PID non valido.") sys.exit(1) descendants = list_descendants(pid) for d_pid in descendants: try: os.kill(d_pid, 9) log_write("socket_server", "info", f"Terminato processo discendente {d_pid}") except ProcessLookupError: log_write("socket_server", "info", f"Processo discendente {d_pid} non trovato, probabilmente già terminato.") except Exception as e: log_write("socket_server", "error", f"Errore durante l'uccisione del processo discendente {d_pid}: {e}") try: os.kill(pid, 9) log_write("socket_server", "info", f"Terminato processo server {pid}") except ProcessLookupError: print(f"Processo con PID {pid} non trovato, probabilmente già terminato.") except Exception as e: log_write("socket_server", "error", f"Errore durante l'uccisione del processo server {pid}: {e}") if os.path.exists(TCPSERVER_PID_FILE): try: os.remove(TCPSERVER_PID_FILE) log_write("socket_server", "info", "File PID rimosso.") except OSError as e: log_write("socket_server", "error", f"Errore nella rimozione del file PID: {e}") # Funzione di dispatch centrale per i comandi CLI e socket def dispatch_command(cmd, *args): if cmd == "init": initialize() elif cmd == "open": ev_open(args[0], args[1] if len(args) > 1 else "") elif cmd == "open_in": ev_open_in(args[0], args[1], args[2], args[3] if len(args) > 3 else "") elif cmd == "close": ev_close(args[0]) elif cmd == "list_alias": list_alias() elif cmd == "ev_status": ev_status(args[0]) elif cmd == "ev_status_all": ev_status_all() elif cmd == "list_alias_sensor": list_alias_sensor() elif cmd == "sensor_status": sensor_status(args[0], args[1] if len(args) > 1 else None) elif cmd == "sensor_status_set": sensor_status_set(args[0], args[1], args[2]) elif cmd == "sensor_status_all": sensor_status_all() elif cmd == "last_rain_sensor_timestamp": last_rain_sensor_timestamp() elif cmd == "last_rain_online_timestamp": last_rain_online_timestamp() elif cmd == "reset_last_rain_sensor_timestamp": reset_last_rain_sensor_timestamp() message_write("success", "Timestamp of last sensor rain successfully reset") json_status_wrapper() elif cmd == "reset_last_rain_online_timestamp": reset_last_rain_online_timestamp() message_write("success", "Timestamp of last online rain successfully reset") json_status_wrapper() elif cmd == "json_status": json_status_wrapper(*args) elif cmd == "mqtt_status": mqtt_status() elif cmd == "check_rain_online": check_rain_online() message_write("success", "Online rain check performed") json_status_wrapper() elif cmd == "check_rain_sensor": check_rain_sensor() message_write("success", "Sensor rain check performed") json_status_wrapper() elif cmd == "close_all_for_rain": # La logica "for_rain" è implicita in close_all se non c'è "force" close_all() message_write("success", "All solenoid closed for rain") json_status_wrapper() elif cmd == "close_all": close_all(force=(args[0] == "force" if len(args) > 0 else False)) message_write("success", "All solenoid closed") json_status_wrapper() elif cmd == "start_socket_server": # Il parametro force nello script Bash era per forzare la chiusura se già aperto # Qui la logica di start_socket_server_internal già gestisce la rimozione del PID file start_socket_server_internal() elif cmd == "stop_socket_server": stop_socket_server_internal() elif cmd == "reboot": exec_reboot() elif cmd == "poweroff": exec_poweroff() elif cmd == "set_cron_init": vret = set_cron_init() if vret: json_error(0, "Cron set failed") else: message_write("success", "Cron set successful"); json_status_wrapper() elif cmd == "del_cron_init": vret = del_cron_init() if vret: json_error(0, "Cron del failed") else: message_write("success", "Cron deleted successful"); json_status_wrapper() elif cmd == "set_cron_start_socket_server": vret = set_cron_start_socket_server() if vret: json_error(0, "Cron set failed") else: message_write("success", "Cron set successful"); json_status_wrapper() elif cmd == "del_cron_start_socket_server": vret = del_cron_start_socket_server() if vret: json_error(0, "Cron del failed") else: message_write("success", "Cron deleted successful"); json_status_wrapper() elif cmd == "set_cron_check_rain_sensor": vret = set_cron_check_rain_sensor() if vret: json_error(0, "Cron set failed") else: message_write("success", "Cron set successful"); json_status_wrapper() elif cmd == "del_cron_check_rain_sensor": vret = del_cron_check_rain_sensor() if vret: json_error(0, "Cron del failed") else: message_write("success", "Cron deleted successful"); json_status_wrapper() elif cmd == "set_cron_check_rain_online": vret = set_cron_check_rain_online() if vret: json_error(0, "Cron set failed") else: message_write("success", "Cron set successful"); json_status_wrapper() elif cmd == "del_cron_check_rain_online": vret = del_cron_check_rain_online() if vret: json_error(0, "Cron del failed") else: message_write("success", "Cron deleted successful"); json_status_wrapper() elif cmd == "set_cron_close_all_for_rain": vret = set_cron_close_all_for_rain() if vret: json_error(0, "Cron set failed") else: message_write("success", "Cron set successful"); json_status_wrapper() elif cmd == "del_cron_close_all_for_rain": vret = del_cron_close_all_for_rain() if vret: json_error(0, "Cron del failed") else: message_write("success", "Cron deleted successful"); json_status_wrapper() elif cmd == "add_cron_open": vret = add_cron_open(args[0], args[1], args[2], args[3], args[4], args[5], args[6] if len(args) > 6 else "") if vret: json_error(0, "Cron set failed") else: message_write("success", "Cron set successful"); json_status_wrapper() elif cmd == "del_cron_open": vret = del_cron_open(args[0]) if vret: json_error(0, "Cron set failed") else: message_write("success", "Cron set successful"); json_status_wrapper() elif cmd == "get_cron_open": print(cron_get("open", args[0])) # Stampa diretta elif cmd == "del_cron_open_in": vret = del_cron_open_in(args[0]) if vret: json_error(0, "Cron del failed") else: message_write("success", "Scheduled start successfully deleted"); json_status_wrapper(f"get_cron_open_in:{args[0]}") elif cmd == "add_cron_close": vret = add_cron_close(args[0], args[1], args[2], args[3], args[4], args[5], args[6] if len(args) > 6 else "") if vret: json_error(0, "Cron set failed") else: message_write("success", "Cron set successful"); json_status_wrapper() elif cmd == "del_cron_close": vret = del_cron_close(args[0]) if vret: json_error(0, "Cron set failed") else: message_write("success", "Cron set successful"); json_status_wrapper() elif cmd == "get_cron_close": print(cron_get("close", args[0])) # Stampa diretta elif cmd == "cmd_pigardensched": vret = cmd_pigardensched(*args) if vret != 0: # cmd_pigardensched restituisce 0 per successo, 1 per errore json_error(0, "piGardenSched command failed") log_write("socket_server", "error", f"piGardenSched command failed: {vret}") else: message_write("success", "Schedule set successful") json_status_wrapper() elif cmd == "debug1": debug1(*args) elif cmd == "debug2": debug2(*args) else: show_usage() sys.exit(1) # Invio dell'identificativo all'avvio dello script (se non è un comando specifico) if len(sys.argv) == 1 or sys.argv[1] not in ["start_socket_server", "stop_socket_server"]: send_identifier() # Dispatcher per i comandi dalla riga di comando if len(sys.argv) > 1: dispatch_command(sys.argv[1], *sys.argv[2:]) else: show_usage() sys.exit(1)