"""
Setpoint Manager for Calejo Control Adapter.

Manages setpoint calculation for all pumps with safety integration.
"""

from typing import Any, Dict, Optional

import structlog

from src.core.auto_discovery import AutoDiscovery
from src.database.flexible_client import FlexibleDatabaseClient
from src.core.safety import SafetyLimitEnforcer
from src.core.emergency_stop import EmergencyStopManager
from src.monitoring.watchdog import DatabaseWatchdog

logger = structlog.get_logger()

# Speed used whenever neither the plan nor the pump configuration supplies one.
DEFAULT_SPEED_HZ = 35.0


def _float_or_default(value: Any, default: float) -> float:
    """Convert ``value`` to float, substituting ``default`` when it is None.

    ``dict.get(key, default)`` only applies its default when the key is
    absent; a key that is present with a ``None`` value would otherwise make
    ``float()`` raise ``TypeError``.
    """
    return default if value is None else float(value)


class SetpointCalculator:
    """Base class for setpoint calculators."""

    def calculate_setpoint(
        self,
        plan: Dict[str, Any],
        feedback: Optional[Dict[str, Any]],
        pump_info: Dict[str, Any],
    ) -> float:
        """
        Calculate setpoint from optimization plan.

        Args:
            plan: Optimization plan data
            feedback: Latest feedback data (optional)
            pump_info: Pump configuration information

        Returns:
            Calculated setpoint in Hz

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError("Subclasses must implement calculate_setpoint")


class DirectSpeedCalculator(SetpointCalculator):
    """Calculator for direct speed control pumps."""

    def calculate_setpoint(
        self,
        plan: Dict[str, Any],
        feedback: Optional[Dict[str, Any]],
        pump_info: Dict[str, Any],
    ) -> float:
        """
        Calculate setpoint for direct speed control.

        Uses ``suggested_speed_hz`` directly from the optimization plan and
        falls back to ``DEFAULT_SPEED_HZ`` when it is missing or None.
        ``feedback`` and ``pump_info`` are not used.
        """
        return _float_or_default(plan.get('suggested_speed_hz'), DEFAULT_SPEED_HZ)


class LevelControlledCalculator(SetpointCalculator):
    """Calculator for level-controlled pumps."""

    # Tuning values; class attributes so a subclass can override them.
    KP = 5.0  # Proportional gain applied to the level error
    BASE_SPEED_HZ = DEFAULT_SPEED_HZ
    DEFAULT_TARGET_LEVEL_M = 2.0

    def calculate_setpoint(
        self,
        plan: Dict[str, Any],
        feedback: Optional[Dict[str, Any]],
        pump_info: Dict[str, Any],
    ) -> float:
        """
        Calculate setpoint for level-controlled pumps.

        With a usable ``current_level_m`` in ``feedback`` the result is
        ``BASE_SPEED_HZ + KP * (target_level_m - current_level_m)``.
        Without it (no feedback, key absent, or value None) the plan's
        ``suggested_speed_hz`` is returned, or ``DEFAULT_SPEED_HZ`` if the
        plan has none.
        """
        target_level = _float_or_default(
            plan.get('target_level_m'), self.DEFAULT_TARGET_LEVEL_M
        )

        # A None reading is treated the same as having no feedback at all.
        current_raw = feedback.get('current_level_m') if feedback else None
        if current_raw is not None:
            level_error = target_level - float(current_raw)
            # Simple proportional control around the base speed
            return self.BASE_SPEED_HZ + self.KP * level_error

        # Fallback: use suggested speed or default
        return _float_or_default(plan.get('suggested_speed_hz'), DEFAULT_SPEED_HZ)


class PowerControlledCalculator(SetpointCalculator):
    """Calculator for power-controlled pumps."""

    # Tuning values; class attributes so a subclass can override them.
    KP = 2.0  # Proportional gain applied to the power error
    BASE_SPEED_HZ = DEFAULT_SPEED_HZ
    DEFAULT_TARGET_POWER_KW = 15.0

    def calculate_setpoint(
        self,
        plan: Dict[str, Any],
        feedback: Optional[Dict[str, Any]],
        pump_info: Dict[str, Any],
    ) -> float:
        """
        Calculate setpoint for power-controlled pumps.

        With a usable ``current_power_kw`` in ``feedback`` the result is
        ``BASE_SPEED_HZ + KP * (target_power_kw - current_power_kw)``.
        Without it (no feedback, key absent, or value None) the plan's
        ``suggested_speed_hz`` is returned, or ``DEFAULT_SPEED_HZ`` if the
        plan has none.
        """
        target_power = _float_or_default(
            plan.get('target_power_kw'), self.DEFAULT_TARGET_POWER_KW
        )

        # A None reading is treated the same as having no feedback at all.
        current_raw = feedback.get('current_power_kw') if feedback else None
        if current_raw is not None:
            power_error = target_power - float(current_raw)
            # Simple proportional control around the base speed
            return self.BASE_SPEED_HZ + self.KP * power_error

        # Fallback: use suggested speed or default
        return _float_or_default(plan.get('suggested_speed_hz'), DEFAULT_SPEED_HZ)


class SetpointManager:
    """
    Manages setpoint calculation for all pumps.

    Integrates with safety framework to enforce limits and handle failsafe mode.
    """

    def __init__(
        self,
        discovery: AutoDiscovery,
        db_client: FlexibleDatabaseClient,
        safety_enforcer: SafetyLimitEnforcer,
        emergency_stop_manager: EmergencyStopManager,
        watchdog: DatabaseWatchdog
    ):
        """Store the collaborators and build the control-type dispatch table."""
        self.discovery = discovery
        self.db_client = db_client
        self.safety_enforcer = safety_enforcer
        self.emergency_stop_manager = emergency_stop_manager
        self.watchdog = watchdog
        self.running = False

        # Calculator instances keyed by the pump's 'control_type' value
        self.calculators = {
            'DIRECT_SPEED': DirectSpeedCalculator(),
            'LEVEL_CONTROLLED': LevelControlledCalculator(),
            'POWER_CONTROLLED': PowerControlledCalculator()
        }

    async def start(self) -> None:
        """Start the Setpoint Manager."""
        self.running = True
        logger.info("setpoint_manager_started")

    async def stop(self) -> None:
        """Stop the Setpoint Manager."""
        self.running = False
        logger.info("setpoint_manager_stopped")

    def get_current_setpoint(self, station_id: str, pump_id: str) -> Optional[float]:
        """
        Get current setpoint for a pump.

        Integrates safety checks:
        1. Check if emergency stop is active
        2. Check if failsafe mode is active
        3. Calculate setpoint from optimization plan
        4. Enforce safety limits

        Returns:
            Setpoint in Hz. 0.0 while an emergency stop is active; the pump's
            default setpoint while failsafe mode is active or when there is
            no active plan; None if the pump is unknown or its control type
            has no calculator.
        """
        # Check emergency stop
        if self.emergency_stop_manager.is_emergency_stop_active(station_id, pump_id):
            logger.info(
                "emergency_stop_active",
                station_id=station_id,
                pump_id=pump_id
            )
            return 0.0  # Complete stop during emergency

        # Check failsafe mode
        if self.watchdog.is_failsafe_active(station_id, pump_id):
            logger.info(
                "failsafe_mode_active",
                station_id=station_id,
                pump_id=pump_id
            )
            return self._get_default_setpoint(station_id, pump_id)

        # Get pump info
        pump_info = self.discovery.get_pump(station_id, pump_id)
        if not pump_info:
            logger.error("pump_not_found", station_id=station_id, pump_id=pump_id)
            return None

        # Get current optimization plan
        plan = self.db_client.get_current_plan(station_id, pump_id)
        if not plan:
            logger.warning("no_active_plan", station_id=station_id, pump_id=pump_id)
            return self._get_default_setpoint(station_id, pump_id)

        # Get latest feedback (optional)
        feedback = self.db_client.get_latest_feedback(station_id, pump_id)

        # Get appropriate calculator. A missing 'control_type' key takes the
        # same error path as an unrecognised value instead of raising KeyError.
        control_type = pump_info.get('control_type')
        calculator = self.calculators.get(control_type)
        if not calculator:
            logger.error("unknown_control_type", control_type=control_type)
            return None

        # Calculate setpoint
        setpoint = calculator.calculate_setpoint(plan, feedback, pump_info)

        # Enforce safety limits (LAST LINE OF DEFENSE)
        safe_setpoint, _violations = self.safety_enforcer.enforce_setpoint(
            station_id, pump_id, setpoint
        )

        # Log if setpoint was modified
        if safe_setpoint != setpoint:
            logger.warning(
                "setpoint_limited_by_safety",
                station_id=station_id,
                pump_id=pump_id,
                original_setpoint=setpoint,
                safe_setpoint=safe_setpoint
            )

        return safe_setpoint

    def get_all_current_setpoints(self) -> Dict[str, Dict[str, Optional[float]]]:
        """
        Get current setpoints for all discovered pumps.

        Returns:
            Dictionary mapping station_id -> pump_id -> setpoint
        """
        setpoints: Dict[str, Dict[str, Optional[float]]] = {}

        for station in self.discovery.get_stations():
            station_id = station['station_id']
            station_setpoints: Dict[str, Optional[float]] = {}
            setpoints[station_id] = station_setpoints

            for pump in self.discovery.get_pumps(station_id):
                pump_id = pump['pump_id']
                station_setpoints[pump_id] = self.get_current_setpoint(
                    station_id, pump_id
                )

        return setpoints

    def _get_default_setpoint(self, station_id: str, pump_id: str) -> float:
        """
        Get default safe setpoint for pump.

        Returns the pump's configured default_setpoint_hz, or the
        conservative fallback ``DEFAULT_SPEED_HZ`` when no row or value is
        found or the lookup raises. A configured value of 0 is returned
        as-is; only a missing row or a None value counts as "not configured".
        """
        try:
            query = """
                SELECT default_setpoint_hz
                FROM pumps
                WHERE station_id = %s AND pump_id = %s
            """
            result = self.db_client.execute_query(query, (station_id, pump_id))

            if result:
                configured = result[0]['default_setpoint_hz']
                # Explicit None check: a truthiness test would discard a
                # legitimately configured 0 Hz default.
                if configured is not None:
                    return float(configured)

        except Exception as e:
            # Best-effort lookup: any failure falls through to the fallback.
            logger.error(
                "failed_to_get_default_setpoint",
                station_id=station_id,
                pump_id=pump_id,
                error=str(e)
            )

        # Ultimate fallback (should never reach here)
        logger.error(
            "no_default_setpoint_configured",
            station_id=station_id,
            pump_id=pump_id
        )
        return DEFAULT_SPEED_HZ  # Conservative fallback