CalejoControl/src/core/auto_discovery.py

88 lines
3.0 KiB
Python
Raw Normal View History

"""
Auto-Discovery Module for Calejo Control Adapter.
Automatically discovers pump stations and pumps from database on startup.
"""
from typing import Dict, List
import structlog
from src.database.client import DatabaseClient
logger = structlog.get_logger()
class AutoDiscovery:
"""Auto-discovery module for pump stations and pumps."""
def __init__(self, db_client: DatabaseClient):
self.db_client = db_client
self.pump_stations: Dict[str, Dict] = {}
self.pumps: Dict[str, List[Dict]] = {}
async def discover(self):
"""Discover all pump stations and pumps from database."""
try:
# Discover pump stations
stations = self.db_client.get_pump_stations()
for station in stations:
self.pump_stations[station['station_id']] = station
# Discover pumps
pumps = self.db_client.get_pumps()
for pump in pumps:
station_id = pump['station_id']
if station_id not in self.pumps:
self.pumps[station_id] = []
self.pumps[station_id].append(pump)
logger.info(
"auto_discovery_completed",
station_count=len(self.pump_stations),
pump_count=len(pumps)
)
# Log discovered stations and pumps
for station_id, station in self.pump_stations.items():
station_pumps = self.pumps.get(station_id, [])
logger.info(
"station_discovered",
station_id=station_id,
station_name=station['station_name'],
pump_count=len(station_pumps)
)
for pump in station_pumps:
logger.info(
"pump_discovered",
station_id=station_id,
pump_id=pump['pump_id'],
pump_name=pump['pump_name'],
control_type=pump['control_type']
)
except Exception as e:
logger.error("auto_discovery_failed", error=str(e))
raise
def get_stations(self) -> Dict[str, Dict]:
"""Get all discovered pump stations."""
return self.pump_stations
def get_pumps(self, station_id: str = None) -> List[Dict]:
"""Get all discovered pumps, optionally filtered by station."""
if station_id:
return self.pumps.get(station_id, [])
else:
all_pumps = []
for station_pumps in self.pumps.values():
all_pumps.extend(station_pumps)
return all_pumps
def get_pump(self, station_id: str, pump_id: str) -> Dict:
"""Get a specific pump by station and pump ID."""
station_pumps = self.pumps.get(station_id, [])
for pump in station_pumps:
if pump['pump_id'] == pump_id:
return pump
return None