""" Safety Framework for Calejo Control Adapter. Implements multi-layer safety mechanisms to prevent equipment damage and operational hazards. """ from typing import Tuple, List, Optional, Dict from dataclasses import dataclass import structlog from src.database.client import DatabaseClient from src.core.emergency_stop import EmergencyStopManager logger = structlog.get_logger() @dataclass class SafetyLimits: """Safety limits for a pump.""" hard_min_speed_hz: float hard_max_speed_hz: float hard_min_level_m: Optional[float] hard_max_level_m: Optional[float] hard_max_power_kw: Optional[float] max_speed_change_hz_per_min: float class SafetyLimitEnforcer: """ Enforces multi-layer safety limits on all setpoints. This is the LAST line of defense before setpoints are exposed to SCADA. ALL setpoints MUST pass through this enforcer. Three-Layer Architecture: - Layer 1: Physical Hard Limits (PLC/VFD) - 15-55 Hz - Layer 2: Station Safety Limits (Database) - 20-50 Hz (enforced here) - Layer 3: Optimization Constraints (Calejo Optimize) - 25-45 Hz """ def __init__(self, db_client: DatabaseClient, emergency_stop_manager: EmergencyStopManager = None): self.db_client = db_client self.emergency_stop_manager = emergency_stop_manager self.safety_limits_cache: Dict[Tuple[str, str], SafetyLimits] = {} self.previous_setpoints: Dict[Tuple[str, str], float] = {} async def load_safety_limits(self): """Load safety limits from database into cache.""" try: limits = self.db_client.get_safety_limits() for limit in limits: key = (limit['station_id'], limit['pump_id']) self.safety_limits_cache[key] = SafetyLimits( hard_min_speed_hz=limit['hard_min_speed_hz'], hard_max_speed_hz=limit['hard_max_speed_hz'], hard_min_level_m=limit.get('hard_min_level_m'), hard_max_level_m=limit.get('hard_max_level_m'), hard_max_power_kw=limit.get('hard_max_power_kw'), max_speed_change_hz_per_min=limit['max_speed_change_hz_per_min'] ) logger.info("safety_limits_loaded", pump_count=len(limits)) except Exception as e: logger.error("failed_to_load_safety_limits", error=str(e)) raise def enforce_setpoint( self, station_id: str, pump_id: str, setpoint: float ) -> Tuple[float, List[str]]: """ Enforce safety limits on setpoint. Args: station_id: Station identifier pump_id: Pump identifier setpoint: Proposed setpoint (Hz) Returns: Tuple of (enforced_setpoint, violations) - enforced_setpoint: Safe setpoint (clamped if necessary) - violations: List of safety violations (for logging/alerting) """ violations = [] enforced_setpoint = setpoint # Check emergency stop first (highest priority) if self.emergency_stop_manager and self.emergency_stop_manager.is_emergency_stop_active(station_id, pump_id): violations.append("EMERGENCY_STOP_ACTIVE") # Emergency stop overrides everything - set to 0 Hz return (0.0, violations) # Get safety limits key = (station_id, pump_id) limits = self.safety_limits_cache.get(key) if not limits: logger.error( "no_safety_limits", station_id=station_id, pump_id=pump_id ) # CRITICAL: No safety limits defined - reject setpoint return (0.0, ["NO_SAFETY_LIMITS_DEFINED"]) # Check minimum speed if enforced_setpoint < limits.hard_min_speed_hz: violations.append( f"BELOW_MIN_SPEED: {enforced_setpoint:.2f} < {limits.hard_min_speed_hz:.2f}" ) enforced_setpoint = limits.hard_min_speed_hz # Check maximum speed if enforced_setpoint > limits.hard_max_speed_hz: violations.append( f"ABOVE_MAX_SPEED: {enforced_setpoint:.2f} > {limits.hard_max_speed_hz:.2f}" ) enforced_setpoint = limits.hard_max_speed_hz # Check rate of change (prevent sudden speed changes that damage equipment) previous_setpoint = self.previous_setpoints.get(key) if previous_setpoint is not None: max_change = limits.max_speed_change_hz_per_min * 5.0 # 5-minute interval actual_change = abs(enforced_setpoint - previous_setpoint) if actual_change > max_change: # Limit rate of change direction = 1 if enforced_setpoint > previous_setpoint else -1 enforced_setpoint = previous_setpoint + (direction * max_change) violations.append( f"RATE_OF_CHANGE_LIMITED: {actual_change:.2f} Hz > {max_change:.2f} Hz" ) # Store current setpoint for next rate-of-change check self.previous_setpoints[key] = enforced_setpoint # Log violations if violations: logger.warning( "safety_limit_violation", station_id=station_id, pump_id=pump_id, requested_setpoint=setpoint, enforced_setpoint=enforced_setpoint, violations=violations ) # Record violation in database for audit self._record_violation(station_id, pump_id, setpoint, enforced_setpoint, violations) return (enforced_setpoint, violations) def get_safety_limits(self, station_id: str, pump_id: str) -> Optional[SafetyLimits]: """Get safety limits for a specific pump.""" key = (station_id, pump_id) return self.safety_limits_cache.get(key) def has_safety_limits(self, station_id: str, pump_id: str) -> bool: """Check if safety limits exist for a specific pump.""" key = (station_id, pump_id) return key in self.safety_limits_cache def clear_safety_limits(self, station_id: str, pump_id: str): """Clear safety limits for a specific pump.""" key = (station_id, pump_id) self.safety_limits_cache.pop(key, None) self.previous_setpoints.pop(key, None) def clear_all_safety_limits(self): """Clear all safety limits.""" self.safety_limits_cache.clear() self.previous_setpoints.clear() def get_loaded_limits_count(self) -> int: """Get the number of loaded safety limits.""" return len(self.safety_limits_cache) def _record_violation( self, station_id: str, pump_id: str, requested: float, enforced: float, violations: List[str] ): """Record safety limit violation in database.""" if not self.db_client: # Database client not available - skip recording return query = """ INSERT INTO safety_limit_violations (station_id, pump_id, requested_setpoint, enforced_setpoint, violations, timestamp) VALUES (:station_id, :pump_id, :requested, :enforced, :violations, datetime('now')) """ self.db_client.execute(query, { "station_id": station_id, "pump_id": pump_id, "requested": requested, "enforced": enforced, "violations": ", ".join(violations) # Convert list to string })