drv.include.py

This commit is contained in:
2025-07-09 18:47:54 +02:00
parent caf698bfd5
commit a11db80ac0

487
Python/drv.include.py Normal file
View File

@@ -0,0 +1,487 @@
import os
import re
import logging
import subprocess # Per eseguire comandi esterni come 'gpio' se necessario
# --- Mock/Placeholder per le dipendenze esterne e configurazione ---
# In un'applicazione reale, queste verrebbero fornite dal tuo sistema piGarden principale.
# Configura un logger di base per le funzioni di log
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def log_write(log_type, level, message):
"""Simula la funzione log_write dal tuo script Bash."""
if level == "info":
logging.info(f"[{log_type}] {message}")
elif level == "warning":
logging.warning(f"[{log_type}] {message}")
elif level == "error":
logging.error(f"[{log_type}] {message}")
else:
logging.debug(f"[{log_type}] {message}")
def message_write(msg_type, message):
"""Simula la funzione message_write dal tuo script Bash."""
if msg_type == 'info':
logging.info(f"[message] INFO: {message}")
elif msg_type == 'warning':
logging.warning(f"[message] WARNING: {message}")
elif msg_type == 'success':
logging.info(f"[message] SUCCESS: {message}")
# Variabili di configurazione simulate (dovrebbero venire dal piGarden.conf)
# Queste saranno passate alla classe DriverManager
mock_config = {
"EV_TOTAL": 6,
"SUPPLY_GPIO_1": 2,
"SUPPLY_GPIO_2": 3,
"RAIN_GPIO": 25,
"WEATHER_SERVICE": "drv:openweathermap", # Esempio di driver di servizio meteo
"RELE_GPIO_CLOSE": 0,
"RELE_GPIO_OPEN": 1,
"SUPPLY_GPIO_POS": 0,
"SUPPLY_GPIO_NEG": 1,
"GPIO": "/usr/local/bin/gpio", # Percorso al comando gpio (wiringPi)
"CUT": "/usr/bin/cut", # Percorso al comando cut
"LOG_OUTPUT_DRV_FILE": "/tmp/piGarden.drv.log", # Percorso per il log dei driver
# Elettrovalvole di esempio per setup_drv
"EV1_GPIO": "17",
"EV2_GPIO": "drv:custom_rele", # Esempio di un GPIO gestito da un driver custom
"EV3_GPIO": "22",
"EV4_GPIO": "18",
"EV5_GPIO": "23",
"EV6_GPIO": "24",
}
# --- Classe DriverManager ---
class DriverManager:
def __init__(self, config, log_writer, message_writer):
self.config = config
self.log_write = log_writer
self.message_write = message_writer
self.list_drv = [] # Lista dei driver attivi rilevati
# Percorsi degli strumenti esterni (dal config)
self.gpio_cmd = self.config.get("GPIO")
self.cut_cmd = self.config.get("CUT")
self.log_output_drv_file = self.config.get("LOG_OUTPUT_DRV_FILE")
# Inizializza il file di log dei driver se non esiste
if not os.path.exists(self.log_output_drv_file):
open(self.log_output_drv_file, 'a').close()
# Configura un logger specifico per l'output dei driver, come nello script Bash
self.drv_logger = logging.getLogger('driver_output')
self.drv_logger.setLevel(logging.INFO)
# Rimuovi handler esistenti per evitare duplicati se chiamato più volte
if not self.drv_logger.handlers:
drv_handler = logging.FileHandler(self.log_output_drv_file, mode='a')
drv_formatter = logging.Formatter('%(asctime)s %(message)s')
drv_handler.setFormatter(drv_formatter)
self.drv_logger.addHandler(drv_handler)
self.drv_logger.propagate = False # Evita che i log vadano al logger root
# Placeholder per le funzioni GPIO dirette (sostituire con RPi.GPIO o gpiozero)
# Esempio con subprocess per il comando 'gpio' (meno Pythonico ma più fedele al Bash)
self._gpio_write = lambda gpio_id, value: self._run_gpio_command("write", gpio_id, value)
self._gpio_mode = lambda gpio_id, mode: self._run_gpio_command("mode", gpio_id, mode)
self._gpio_read = lambda gpio_id: self._run_gpio_command("read", gpio_id)
# Inizializza i driver al momento della creazione dell'istanza
self.setup_drv()
def _run_gpio_command(self, action, gpio_id, value=None):
"""Esegue un comando 'gpio' tramite subprocess."""
cmd = [self.gpio_cmd, "-g", action, str(gpio_id)]
if value is not None:
cmd.append(str(value))
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
self.drv_logger.info(f"GPIO command '{' '.join(cmd)}' output: {result.stdout.strip()}")
return result.stdout.strip()
except subprocess.CalledProcessError as e:
self.log_write("drv", "error", f"Errore GPIO command '{' '.join(cmd)}': {e.stderr.strip()}")
self.message_write("warning", f"Errore GPIO: {e.stderr.strip()}")
return None # O solleva un'eccezione
def setup_drv(self):
"""
Funzione eseguita ad ogni avvio, include i driver e lancia le funzioni di setup.
"""
self.list_drv = [] # Azzera la lista dei driver
# Raccoglie i nomi dei driver utilizzati per le elettrovalvole
ev_total = self.config.get("EV_TOTAL", 0)
for i in range(1, ev_total + 1):
gpio_val = self.config.get(f"EV{i}_GPIO", "")
if gpio_val.startswith("drv:"):
drv = gpio_val.split(":")[1]
if drv not in self.list_drv:
self.list_drv.append(drv)
# Raccoglie i nomi dei driver utilizzati per gli altri gpio e servizi
for key in ["SUPPLY_GPIO_1", "SUPPLY_GPIO_2", "RAIN_GPIO", "WEATHER_SERVICE"]:
gpio_val = self.config.get(key, "")
if isinstance(gpio_val, str) and gpio_val.startswith("drv:"):
drv = gpio_val.split(":")[1]
if drv not in self.list_drv:
self.list_drv.append(drv)
# Simula l'inclusione dei file dei driver e l'esecuzione della funzione di setup
for drv in self.list_drv:
# In un'applicazione reale, qui potresti caricare moduli Python specifici
# per ogni driver o chiamare metodi dedicati.
# Per ora, simuliamo la chiamata a drv_<drv>_setup
setup_func_name = f"drv_{drv}_setup"
if hasattr(self, setup_func_name) and callable(getattr(self, setup_func_name)):
self.drv_logger.info(f"{setup_func_name}")
try:
getattr(self, setup_func_name)()
except Exception as e:
self.log_write("drv", "error", f"Errore in {setup_func_name}: {e}")
else:
self.drv_logger.info(f"Nessuna funzione di setup trovata per driver: {drv}")
def get_driver_callback(self, function_name, driver_id):
"""
Restituisce il nome del metodo interno da richiamare per una specifica funzione del driver.
"""
if isinstance(driver_id, str) and driver_id.startswith("drv:"):
drv = driver_id.split(":")[1]
if drv not in self.list_drv:
return "drvnotfound"
return f"drv_{drv}_{function_name}"
return None # Nessun driver specifico, useremo il GPIO diretto
# --- Implementazioni delle funzioni drv_* ---
# Esempio di un driver custom (simulato)
def drv_custom_rele_rele_init(self, gpio_id):
self.drv_logger.info(f"Custom Relè Driver: Inizializzazione {gpio_id}")
# Logica specifica per il relè custom
# Esempio: self.custom_rele_board.init(gpio_id)
return True
def drv_custom_rele_rele_close(self, gpio_id):
self.drv_logger.info(f"Custom Relè Driver: Chiusura {gpio_id}")
# Logica specifica per il relè custom
# Esempio: self.custom_rele_board.set_state(gpio_id, 'closed')
return True
def drv_custom_rele_rele_open(self, gpio_id):
self.drv_logger.info(f"Custom Relè Driver: Apertura {gpio_id}")
# Logica specifica per il relè custom
# Esempio: self.custom_rele_board.set_state(gpio_id, 'open')
return True
def drv_openweathermap_rain_online_get(self, driver_id):
self.drv_logger.info(f"OpenWeatherMap Driver: Recupero dati meteo online per {driver_id}")
# Qui faresti una chiamata API reale a OpenWeatherMap
# Esempio:
# import requests
# api_key = self.config.get("OPENWEATHERMAP_KEY")
# location = self.config.get("OPENWEATHERMAP_LOCATION")
# url = f"http://api.openweathermap.org/data/2.5/weather?{location}&appid={api_key}"
# try:
# response = requests.get(url)
# response.raise_for_status() # Solleva un'eccezione per errori HTTP
# data = response.json()
# # Estrai lo stato della pioggia da 'data'
# if 'rain' in data and data['rain']:
# return "1" # Indica pioggia
# return "0" # Nessuna pioggia
# except requests.exceptions.RequestException as e:
# self.log_write("drv", "error", f"Errore OpenWeatherMap API: {e}")
# self.message_write("warning", "Errore servizio meteo online")
# return ""
return "0" # Mock return
def drv_rele_init(self, gpio_id):
"""Inizializza un relè e lo porta nello stato aperto."""
fnc_name = self.get_driver_callback("rele_init", gpio_id)
if fnc_name is None: # Nessun driver specifico, usa GPIO diretto
self._gpio_write(gpio_id, self.config.get("RELE_GPIO_OPEN"))
self._gpio_mode(gpio_id, "out")
elif fnc_name == "drvnotfound":
self.log_write("drv", "error", f"Driver non trovato: {gpio_id}")
self.message_write("warning", f"Driver non trovato: {gpio_id}")
else:
# Chiama la funzione del driver dinamico
if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)):
self.drv_logger.info(f"{fnc_name} arg:{gpio_id}")
try:
getattr(self, fnc_name)(gpio_id)
except Exception as e:
self.log_write("drv", "error", f"Errore in {fnc_name}: {e}")
else:
self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.")
self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.")
def drv_rele_close(self, gpio_id):
"""Chiude un relè."""
fnc_name = self.get_driver_callback("rele_close", gpio_id)
if fnc_name is None:
self._gpio_write(gpio_id, self.config.get("RELE_GPIO_CLOSE"))
elif fnc_name == "drvnotfound":
self.log_write("drv", "error", f"Driver non trovato: {gpio_id}")
self.message_write("warning", f"Driver non trovato: {gpio_id}")
return False # Fallimento
else:
if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)):
self.drv_logger.info(f"{fnc_name} arg:{gpio_id}")
try:
getattr(self, fnc_name)(gpio_id)
except Exception as e:
self.log_write("drv", "error", f"Errore in {fnc_name}: {e}")
return False
else:
self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.")
self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.")
return False
return True # Successo
def drv_rele_open(self, gpio_id):
"""Apre un relè."""
fnc_name = self.get_driver_callback("rele_open", gpio_id)
if fnc_name is None:
self._gpio_write(gpio_id, self.config.get("RELE_GPIO_OPEN"))
elif fnc_name == "drvnotfound":
self.log_write("drv", "error", f"Driver non trovato: {gpio_id}")
self.message_write("warning", f"Driver non trovato: {gpio_id}")
return False
else:
if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)):
self.drv_logger.info(f"{fnc_name} arg:{gpio_id}")
try:
getattr(self, fnc_name)(gpio_id)
except Exception as e:
self.log_write("drv", "error", f"Errore in {fnc_name}: {e}")
return False
else:
self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.")
self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.")
return False
return True
def drv_supply_bistable_init(self, idx1, idx2):
"""Inizializza i relè che gestiscono l'alimentazione per le valvole bistabili."""
fnc1_name = self.get_driver_callback("supply_bistable_init", idx1)
fnc2_name = self.get_driver_callback("supply_bistable_init", idx2)
if fnc1_name is None:
self._gpio_write(idx1, 0)
self._gpio_mode(idx1, "out")
elif fnc1_name == "drvnotfound":
self.log_write("drv", "error", f"Driver non trovato: {idx1}")
self.message_write("warning", f"Driver non trovato: {idx1}")
return
else:
if hasattr(self, fnc1_name) and callable(getattr(self, fnc1_name)):
self.drv_logger.info(f"{fnc1_name} arg:{idx1}")
try:
getattr(self, fnc1_name)(idx1)
except Exception as e:
self.log_write("drv", "error", f"Errore in {fnc1_name}: {e}")
else:
self.log_write("drv", "error", f"Funzione driver '{fnc1_name}' non implementata.")
self.message_write("warning", f"Funzione driver '{fnc1_name}' non implementata.")
if fnc2_name is None:
self._gpio_write(idx2, 0)
self._gpio_mode(idx2, "out")
elif fnc2_name == "drvnotfound":
self.log_write("drv", "error", f"Driver non trovato: {idx2}")
self.message_write("warning", f"Driver non trovato: {idx2}")
else:
if hasattr(self, fnc2_name) and callable(getattr(self, fnc2_name)):
self.drv_logger.info(f"{fnc2_name} arg:{idx2}")
try:
getattr(self, fnc2_name)(idx2)
except Exception as e:
self.log_write("drv", "error", f"Errore in {fnc2_name}: {e}")
else:
self.log_write("drv", "error", f"Funzione driver '{fnc2_name}' non implementata.")
self.message_write("warning", f"Funzione driver '{fnc2_name}' non implementata.")
def drv_supply_positive(self, idx1, idx2):
"""Imposta la tensione positiva per le elettrovalvole bistabili."""
fnc1_name = self.get_driver_callback("supply_positive", idx1)
fnc2_name = self.get_driver_callback("supply_positive", idx2)
if fnc1_name is None:
self._gpio_write(idx1, self.config.get("SUPPLY_GPIO_POS"))
elif fnc1_name == "drvnotfound":
self.log_write("drv", "error", f"Driver non trovato: {idx1}")
self.message_write("warning", f"Driver non trovato: {idx1}")
return
else:
if hasattr(self, fnc1_name) and callable(getattr(self, fnc1_name)):
self.drv_logger.info(f"{fnc1_name} arg:{idx1}")
try:
getattr(self, fnc1_name)(idx1)
except Exception as e:
self.log_write("drv", "error", f"Errore in {fnc1_name}: {e}")
else:
self.log_write("drv", "error", f"Funzione driver '{fnc1_name}' non implementata.")
self.message_write("warning", f"Funzione driver '{fnc1_name}' non implementata.")
if fnc2_name is None:
self._gpio_write(idx2, self.config.get("SUPPLY_GPIO_POS"))
elif fnc2_name == "drvnotfound":
self.log_write("drv", "error", f"Driver non trovato: {idx2}")
self.message_write("warning", f"Driver non trovato: {idx2}")
else:
if hasattr(self, fnc2_name) and callable(getattr(self, fnc2_name)):
self.drv_logger.info(f"{fnc2_name} arg:{idx2}")
try:
getattr(self, fnc2_name)(idx2)
except Exception as e:
self.log_write("drv", "error", f"Errore in {fnc2_name}: {e}")
else:
self.log_write("drv", "error", f"Funzione driver '{fnc2_name}' non implementata.")
self.message_write("warning", f"Funzione driver '{fnc2_name}' non implementata.")
def drv_supply_negative(self, idx1, idx2):
"""Imposta la tensione negativa per le elettrovalvole bistabili."""
fnc1_name = self.get_driver_callback("supply_negative", idx1)
fnc2_name = self.get_driver_callback("supply_negative", idx2)
if fnc1_name is None:
self._gpio_write(idx1, self.config.get("SUPPLY_GPIO_NEG"))
elif fnc1_name == "drvnotfound":
self.log_write("drv", "error", f"Driver non trovato: {idx1}")
self.message_write("warning", f"Driver non trovato: {idx1}")
return
else:
if hasattr(self, fnc1_name) and callable(getattr(self, fnc1_name)):
self.drv_logger.info(f"{fnc1_name} arg:{idx1}")
try:
getattr(self, fnc1_name)(idx1)
except Exception as e:
self.log_write("drv", "error", f"Errore in {fnc1_name}: {e}")
else:
self.log_write("drv", "error", f"Funzione driver '{fnc1_name}' non implementata.")
self.message_write("warning", f"Funzione driver '{fnc1_name}' non implementata.")
if fnc2_name is None:
self._gpio_write(idx2, self.config.get("SUPPLY_GPIO_NEG"))
elif fnc2_name == "drvnotfound":
self.log_write("drv", "error", f"Driver non trovato: {idx2}")
self.message_write("warning", f"Driver non trovato: {idx2}")
else:
if hasattr(self, fnc2_name) and callable(getattr(self, fnc2_name)):
self.drv_logger.info(f"{fnc2_name} arg:{idx2}")
try:
getattr(self, fnc2_name)(idx2)
except Exception as e:
self.log_write("drv", "error", f"Errore in {fnc2_name}: {e}")
else:
self.log_write("drv", "error", f"Funzione driver '{fnc2_name}' non implementata.")
self.message_write("warning", f"Funzione driver '{fnc2_name}' non implementata.")
def drv_rain_sensor_init(self, gpio_id):
"""Inizializza il sensore della pioggia."""
fnc_name = self.get_driver_callback("rain_sensor_init", gpio_id)
if fnc_name is None:
self._gpio_mode(gpio_id, "in")
elif fnc_name == "drvnotfound":
self.log_write("drv", "error", f"Driver non trovato: {gpio_id}")
self.message_write("warning", f"Driver non trovato: {gpio_id}")
else:
if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)):
self.drv_logger.info(f"{fnc_name} arg:{gpio_id}")
try:
getattr(self, fnc_name)(gpio_id)
except Exception as e:
self.log_write("drv", "error", f"Errore in {fnc_name}: {e}")
else:
self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.")
self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.")
def drv_rain_sensor_get(self, gpio_id):
"""Legge lo stato del sensore della pioggia."""
fnc_name = self.get_driver_callback("rain_sensor_get", gpio_id)
vret = ""
if fnc_name is None:
vret = self._gpio_read(gpio_id)
elif fnc_name == "drvnotfound":
self.log_write("drv", "error", f"Driver non trovato: {gpio_id}")
self.message_write("warning", f"Driver non trovato: {gpio_id}")
else:
if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)):
self.drv_logger.info(f"{fnc_name} arg:{gpio_id}")
try:
vret = getattr(self, fnc_name)(gpio_id)
except Exception as e:
self.log_write("drv", "error", f"Errore in {fnc_name}: {e}")
else:
self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.")
self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.")
return vret
def drv_rain_online_get(self, driver_id):
"""Legge lo stato delle condizioni meteo dal servizio online."""
fnc_name = self.get_driver_callback("rain_online_get", driver_id)
vret = ""
if fnc_name is None or fnc_name == "drvnotfound":
self.log_write("drv", "error", f"Driver non trovato o non specificato per il servizio meteo: {driver_id}")
self.message_write("warning", f"Driver non trovato per il servizio meteo: {driver_id}")
else:
if hasattr(self, fnc_name) and callable(getattr(self, fnc_name)):
self.drv_logger.info(f"{fnc_name} arg:{driver_id}")
try:
vret = getattr(self, fnc_name)(driver_id)
except Exception as e:
self.log_write("drv", "error", f"Errore in {fnc_name}: {e}")
else:
self.log_write("drv", "error", f"Funzione driver '{fnc_name}' non implementata.")
self.message_write("warning", f"Funzione driver '{fnc_name}' non implementata.")
return vret
# --- Esempio di utilizzo ---
if __name__ == "__main__":
print("--- Test DriverManager ---")
# Inizializza il DriverManager con la configurazione mock e le funzioni di log/messaggio
driver_manager = DriverManager(mock_config, log_write, message_write)
print("\n--- Test setup_drv (eseguito all'inizializzazione) ---")
print(f"Driver rilevati: {driver_manager.list_drv}")
print("\n--- Test drv_rele_init (GPIO diretto) ---")
driver_manager.drv_rele_init("17") # EV1_GPIO = 17
print("\n--- Test drv_rele_close (GPIO diretto) ---")
driver_manager.drv_rele_close("17")
print("\n--- Test drv_rele_open (GPIO diretto) ---")
driver_manager.drv_rele_open("17")
print("\n--- Test drv_rele_init (Custom Driver) ---")
driver_manager.drv_rele_init("drv:custom_rele") # EV2_GPIO = drv:custom_rele
print("\n--- Test drv_supply_positive ---")
driver_manager.drv_supply_positive(mock_config["SUPPLY_GPIO_1"], mock_config["SUPPLY_GPIO_2"])
print("\n--- Test drv_rain_sensor_init ---")
driver_manager.drv_rain_sensor_init(mock_config["RAIN_GPIO"])
print("\n--- Test drv_rain_sensor_get ---")
rain_sensor_state = driver_manager.drv_rain_sensor_get(mock_config["RAIN_GPIO"])
print(f"Stato sensore pioggia: {rain_sensor_state}")
print("\n--- Test drv_rain_online_get (OpenWeatherMap Driver) ---")
online_rain_state = driver_manager.drv_rain_online_get(mock_config["WEATHER_SERVICE"])
print(f"Stato pioggia online: {online_rain_state}")
print("\n--- Test completato ---")
print(f"Controlla il file di log dei driver: {driver_manager.log_output_drv_file}")