Last modified: 13:37 10/24

Calejo Control Adapter - Implementation Specification v2.0

Project: Calejo Control Adapter
Version: 2.0 (with Safety & Security Framework)
Date: October 18, 2025
Author: Manus AI
Target Audience: Development LLM / Software Engineers

Document Revision History

Version  Date        Changes
1.0      2025-10-18  Initial specification
2.0      2025-10-18  Added comprehensive safety and security framework

1. Project Overview

1.1 Purpose

Implement a multi-protocol integration adapter that translates optimized pump control plans from Calejo Optimize into real-time control signals for municipal wastewater pump stations. The adapter must support diverse SCADA systems with minimal configuration through automatic discovery and multiple protocol support.

Critical Requirement: The adapter must implement comprehensive safety mechanisms to ensure pump operations remain within safe limits under all conditions, including system failures, communication loss, and cyber attacks.

1.2 Architecture Summary

┌─────────────────────────────────────────────────────────┐
│  Calejo Optimize Container (Existing)                   │
│  - Optimization Engine                                  │
│  - PostgreSQL Database (pump plans)                     │
└─────────────────────────────────────────────────────────┘
                             ↓
┌─────────────────────────────────────────────────────────┐
│  Calejo Control Adapter (NEW - TO BE IMPLEMENTED)       │
│                                                         │
│  ┌────────────────────────────────────────────────┐     │
│  │  Core Components:                              │     │
│  │  1. Auto-Discovery Module                      │     │
│  │  2. Security Layer                             │     │
│  │  3. Safety Framework ⚠️ NEW                    │     │
│  │  4. Plan-to-Setpoint Logic Engine              │     │
│  │  5. Multi-Protocol Server                      │     │
│  │     - OPC UA Server                            │     │
│  │     - Modbus TCP Server                        │     │
│  │     - REST API                                 │     │
│  └────────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────┘
                             ↓
                    (Multiple Protocols)
                             ↓
           ┌─────────────────┼─────────────────┐
           ↓                 ↓                 ↓
     Siemens WinCC  Schneider EcoStruxure  Rockwell FactoryTalk

1.3 Key Requirements

The implementation must provide:

- Auto-Discovery: Automatically discover pump stations and pumps from database
- Security: Authentication, authorization, encryption, audit logging
- Safety Framework ⚠️: Multi-layer limits, watchdogs, emergency stop, failsafe mechanisms
- Plan-to-Setpoint Logic: Transform optimization plans based on pump control type
- Multi-Protocol Support: OPC UA, Modbus TCP, REST API simultaneously
- High Availability: Caching, failover, health monitoring
- Compliance: IEC 62443, NIS2 Directive, ISO 27001
- Easy Deployment: Docker container, environment-based configuration

2. Technology Stack

2.1 Required Technologies

Component        Technology           Version  Justification
Language         Python               3.11+    Excellent library support for OPC UA, Modbus, REST
Database Client  psycopg2             2.9+     PostgreSQL driver for database queries
OPC UA Server    asyncua              1.0+     Modern async OPC UA implementation
Modbus Server    pymodbus             3.5+     Industry-standard Modbus TCP/RTU library
REST API         FastAPI              0.104+   High-performance async REST framework
Security         cryptography, PyJWT  Latest   TLS/SSL, JWT tokens, certificate management
Configuration    pydantic-settings    2.0+     Type-safe environment variable management
Logging          structlog            23.0+    Structured logging for production monitoring
Alerting         aiosmtplib, twilio   Latest   Email and SMS alerts
Container        Docker               24.0+    Containerization for deployment

2.2 Development Dependencies

# requirements.txt
asyncua==1.0.6
pymodbus==3.5.4
fastapi==0.104.1
uvicorn[standard]==0.24.0
psycopg2-binary==2.9.9
pydantic==2.5.0
pydantic-settings==2.1.0
cryptography==41.0.7
PyJWT==2.8.0
structlog==23.2.0
python-dotenv==1.0.0
redis==5.0.1  # For distributed caching (optional)
prometheus-client==0.19.0  # For metrics
aiosmtplib==3.0.1  # For email alerts
twilio==8.10.0  # For SMS alerts
httpx==0.25.0  # For webhook alerts

3. Database Schema

3.1 Core Tables

3.1.1 pump_stations

Metadata about pump stations.

CREATE TABLE pump_stations (
    station_id VARCHAR(50) PRIMARY KEY,
    station_name VARCHAR(200) NOT NULL,
    location VARCHAR(200),
    latitude DECIMAL(10, 8),
    longitude DECIMAL(11, 8),
    timezone VARCHAR(50) DEFAULT 'Europe/Rome',
    active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

3.1.2 pumps

Metadata about individual pumps.

CREATE TABLE pumps (
    pump_id VARCHAR(50) NOT NULL,
    station_id VARCHAR(50) NOT NULL,
    pump_name VARCHAR(200),
    pump_type VARCHAR(50),  -- 'SUBMERSIBLE', 'CENTRIFUGAL', etc.
    control_type VARCHAR(50) NOT NULL,  -- 'LEVEL_CONTROLLED', 'POWER_CONTROLLED', 'DIRECT_SPEED'
    manufacturer VARCHAR(100),
    model VARCHAR(100),
    rated_power_kw DECIMAL(10, 2),
    min_speed_hz DECIMAL(5, 2) DEFAULT 20.0,
    max_speed_hz DECIMAL(5, 2) DEFAULT 50.0,

    -- Default setpoint (used in failsafe mode)
    default_setpoint_hz DECIMAL(5, 2) NOT NULL DEFAULT 35.0,

    -- Control-specific parameters (JSON)
    control_parameters JSONB,

    active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),

    PRIMARY KEY (station_id, pump_id),
    FOREIGN KEY (station_id) REFERENCES pump_stations(station_id)
);

COMMENT ON COLUMN pumps.default_setpoint_hz IS 'Default safe setpoint used in failsafe mode (existing pump configuration)';

3.1.3 pump_plans

Optimization plans generated by Calejo Optimize.
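To make the role of this table concrete, the sketch below shows one way the adapter might pick the plan row that applies at a given instant. It assumes each row exposes interval_start, interval_end and plan_created_at (the columns of this table), treats intervals as half-open [start, end), and prefers the most recently created plan when intervals overlap. PlanRow and select_active_plan are hypothetical names for illustration, and the tie-breaking rule is an assumption rather than a stated requirement.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional


@dataclass(frozen=True)
class PlanRow:
    """Hypothetical in-memory view of one pump_plans row."""
    plan_id: int
    interval_start: datetime
    interval_end: datetime
    plan_created_at: datetime
    suggested_speed_hz: Optional[float] = None


def select_active_plan(plans: Iterable[PlanRow], now: datetime) -> Optional[PlanRow]:
    """Return the plan whose [interval_start, interval_end) contains `now`.

    Assumption: when several plans cover the same instant, the one created
    most recently wins. Returns None when no plan covers `now`.
    """
    candidates = [p for p in plans if p.interval_start <= now < p.interval_end]
    if not candidates:
        return None
    return max(candidates, key=lambda p: p.plan_created_at)
```

Returning None when nothing matches leaves the fallback decision (for example, the pump's default setpoint) to the caller.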
CREATE TABLE pump_plans (
    plan_id SERIAL PRIMARY KEY,
    station_id VARCHAR(50) NOT NULL,
    pump_id VARCHAR(50) NOT NULL,
    interval_start TIMESTAMP NOT NULL,
    interval_end TIMESTAMP NOT NULL,

    -- Optimization outputs
    target_flow_m3h DECIMAL(10, 2),
    target_power_kw DECIMAL(10, 2),
    target_level_m DECIMAL(5, 2),
    suggested_speed_hz DECIMAL(5, 2),

    -- Metadata
    plan_created_at TIMESTAMP DEFAULT NOW(),
    optimization_run_id INTEGER,

    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);

CREATE INDEX idx_pump_plans_active ON pump_plans(station_id, pump_id, interval_start, interval_end);

3.1.4 pump_feedback

Real-time feedback from pumps.

CREATE TABLE pump_feedback (
    feedback_id SERIAL PRIMARY KEY,
    station_id VARCHAR(50) NOT NULL,
    pump_id VARCHAR(50) NOT NULL,
    timestamp TIMESTAMP NOT NULL DEFAULT NOW(),

    -- Actual measurements
    actual_speed_hz DECIMAL(5, 2),
    actual_power_kw DECIMAL(10, 2),
    actual_flow_m3h DECIMAL(10, 2),
    wet_well_level_m DECIMAL(5, 2),
    pump_running BOOLEAN,

    -- Status
    alarm_active BOOLEAN DEFAULT FALSE,
    alarm_code VARCHAR(50),

    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);

CREATE INDEX idx_pump_feedback_latest ON pump_feedback(station_id, pump_id, timestamp DESC);

3.2 Safety and Security Tables

3.2.1 pump_safety_limits

Purpose: Define hard operational limits that cannot be exceeded by optimization or manual control.
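Because these limits override everything above them, a malformed limits record is itself a hazard. As an illustration only, the following sketch checks a proposed record before it is written: both speed bounds inside a 15 to 55 Hz physical envelope (the range used by this table's check constraint), minimum strictly below maximum, and an approver who differs from the person who set the limits. The function name validate_limits, its signature, and the two constants are assumptions made for this example.

```python
from typing import List, Optional

# Assumed physical envelope (PLC/VFD layer), mirroring the table's CHECK constraint
PHYSICAL_MIN_HZ = 15.0
PHYSICAL_MAX_HZ = 55.0


def validate_limits(
    hard_min_speed_hz: float,
    hard_max_speed_hz: float,
    set_by: str,
    approved_by: Optional[str] = None,
) -> List[str]:
    """Return a list of problems with a proposed limits record (empty list = valid)."""
    errors: List[str] = []
    if hard_min_speed_hz < PHYSICAL_MIN_HZ:
        errors.append(f"hard_min_speed_hz {hard_min_speed_hz} is below {PHYSICAL_MIN_HZ}")
    if hard_max_speed_hz > PHYSICAL_MAX_HZ:
        errors.append(f"hard_max_speed_hz {hard_max_speed_hz} is above {PHYSICAL_MAX_HZ}")
    if hard_min_speed_hz >= hard_max_speed_hz:
        errors.append("hard_min_speed_hz must be strictly below hard_max_speed_hz")
    # Dual approval: the approver, if present, must not be the person who set the limits
    if approved_by is not None and approved_by == set_by:
        errors.append("approved_by must differ from set_by")
    return errors
```

Running the same checks in application code gives the operator a readable error list before the database rejects the row.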
CREATE TABLE pump_safety_limits (
    station_id VARCHAR(50) NOT NULL,
    pump_id VARCHAR(50) NOT NULL,

    -- Speed limits (Layer 2: Station Safety Limits)
    hard_min_speed_hz DECIMAL(5, 2) NOT NULL,
    hard_max_speed_hz DECIMAL(5, 2) NOT NULL,

    -- Level limits
    hard_min_level_m DECIMAL(5, 2),
    hard_max_level_m DECIMAL(5, 2),
    emergency_stop_level_m DECIMAL(5, 2),      -- Emergency stop if exceeded
    dry_run_protection_level_m DECIMAL(5, 2),  -- Stop pump if level too low

    -- Power and flow limits
    hard_max_power_kw DECIMAL(10, 2),
    hard_max_flow_m3h DECIMAL(10, 2),

    -- Operational limits
    max_starts_per_hour INTEGER DEFAULT 6,
    min_run_time_seconds INTEGER DEFAULT 300,
    max_continuous_run_hours INTEGER DEFAULT 24,

    -- Rate of change limits (prevent sudden changes that damage equipment)
    max_speed_change_hz_per_min DECIMAL(5, 2) DEFAULT 5.0,

    -- Metadata
    set_by VARCHAR(100),
    set_at TIMESTAMP DEFAULT NOW(),
    approved_by VARCHAR(100),  -- Dual approval
    approved_at TIMESTAMP,
    notes TEXT,

    PRIMARY KEY (station_id, pump_id),
    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id),

    -- Constraints
    CONSTRAINT check_speed_limits CHECK (hard_min_speed_hz >= 15 AND hard_max_speed_hz <= 55),
    CONSTRAINT check_min_max CHECK (hard_min_speed_hz < hard_max_speed_hz),
    CONSTRAINT check_approved CHECK (approved_by IS NULL OR approved_by != set_by)  -- Dual approval
);

COMMENT ON TABLE pump_safety_limits IS 'Hard operational limits enforced by Calejo Control adapter (Layer 2)';
COMMENT ON COLUMN pump_safety_limits.hard_min_speed_hz IS 'Minimum speed enforced by adapter (must be >= PLC physical limit)';
COMMENT ON COLUMN pump_safety_limits.hard_max_speed_hz IS 'Maximum speed enforced by adapter (must be <= PLC physical limit)';

3.2.2 safety_limit_violations

Purpose: Audit trail of all safety limit violations.
CREATE TABLE safety_limit_violations (
    violation_id SERIAL PRIMARY KEY,
    station_id VARCHAR(50) NOT NULL,
    pump_id VARCHAR(50) NOT NULL,
    requested_setpoint DECIMAL(5, 2),
    enforced_setpoint DECIMAL(5, 2),
    violations TEXT[],  -- Array of violation descriptions
    timestamp TIMESTAMP DEFAULT NOW(),

    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);

CREATE INDEX idx_violations_timestamp ON safety_limit_violations(timestamp DESC);

COMMENT ON TABLE safety_limit_violations IS 'Audit trail of safety limit violations (immutable)';

3.2.3 failsafe_events

Purpose: Record when adapter enters failsafe mode (e.g., database timeout).

CREATE TABLE failsafe_events (
    event_id SERIAL PRIMARY KEY,
    station_id VARCHAR(50) NOT NULL,
    pump_id VARCHAR(50) NOT NULL,
    event_type VARCHAR(50) NOT NULL,  -- 'DATABASE_TIMEOUT', 'COMMUNICATION_LOSS', 'INVALID_DATA'
    default_setpoint DECIMAL(5, 2),
    triggered_by VARCHAR(100),
    timestamp TIMESTAMP DEFAULT NOW(),
    cleared_at TIMESTAMP,
    notes TEXT,

    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);

CREATE INDEX idx_failsafe_events_active ON failsafe_events(station_id, pump_id, timestamp DESC)
    WHERE cleared_at IS NULL;

COMMENT ON TABLE failsafe_events IS 'Record of failsafe mode activations';

3.2.4 emergency_stop_events

Purpose: Record emergency stop activations ("big red button").
CREATE TABLE emergency_stop_events (
    event_id SERIAL PRIMARY KEY,
    station_id VARCHAR(50),  -- NULL = all stations
    pump_id VARCHAR(50),     -- NULL = all pumps at station
    triggered_by VARCHAR(100) NOT NULL,
    reason TEXT NOT NULL,
    timestamp TIMESTAMP DEFAULT NOW(),
    cleared_at TIMESTAMP,
    cleared_by VARCHAR(100),
    clear_notes TEXT,

    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);

CREATE INDEX idx_emergency_stop_active ON emergency_stop_events(timestamp DESC)
    WHERE cleared_at IS NULL;

COMMENT ON TABLE emergency_stop_events IS 'Emergency stop event log (big red button)';

3.2.5 audit_log

Purpose: Immutable audit trail for compliance (IEC 62443, ISO 27001, NIS2).

CREATE TABLE audit_log (
    log_id BIGSERIAL PRIMARY KEY,
    timestamp TIMESTAMP DEFAULT NOW(),
    event_type VARCHAR(50) NOT NULL,
    severity VARCHAR(20) NOT NULL,  -- 'INFO', 'WARNING', 'CRITICAL'
    station_id VARCHAR(50),
    pump_id VARCHAR(50),
    user_id VARCHAR(100),
    ip_address INET,
    protocol VARCHAR(20),  -- 'OPC_UA', 'MODBUS', 'REST_API'
    action VARCHAR(100),
    resource VARCHAR(200),
    result VARCHAR(20),  -- 'SUCCESS', 'FAILURE', 'DENIED'
    event_data JSONB
);

CREATE INDEX idx_audit_log_timestamp ON audit_log(timestamp DESC);
CREATE INDEX idx_audit_log_severity ON audit_log(severity, timestamp DESC);
CREATE INDEX idx_audit_log_user ON audit_log(user_id, timestamp DESC);

-- Make audit log immutable (append-only)
CREATE RULE audit_log_no_update AS ON UPDATE TO audit_log DO INSTEAD NOTHING;
CREATE RULE audit_log_no_delete AS ON DELETE TO audit_log DO INSTEAD NOTHING;

COMMENT ON TABLE audit_log IS 'Immutable audit trail for compliance (IEC 62443, ISO 27001, NIS2 Directive)';

3.3 Database User Permissions

-- Create least-privilege user for adapter (read-only on plan and configuration tables)
CREATE USER control_reader WITH PASSWORD 'secure_password';

GRANT CONNECT ON DATABASE calejo TO control_reader;
GRANT USAGE ON SCHEMA public TO control_reader;
GRANT SELECT ON pump_stations, pumps, pump_plans, pump_feedback, pump_safety_limits TO control_reader;
GRANT INSERT ON safety_limit_violations, failsafe_events, emergency_stop_events, audit_log TO control_reader;
-- The adapter also clears failsafe and emergency stop events (UPDATE ... SET cleared_at),
-- which requires SELECT and UPDATE on those two tables
GRANT SELECT, UPDATE ON failsafe_events, emergency_stop_events TO control_reader;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO control_reader;

4. Core Component Specifications

4.1 Auto-Discovery Module

File: src/core/auto_discovery.py

Purpose: Automatically discover pump stations and pumps from database on startup.

(Implementation unchanged from v1.0 - see previous specification)

4.2 Security Layer

File: src/core/security.py

Purpose: Provide authentication, authorization, encryption, and audit logging.

(Implementation from v1.0, with additions below)

4.2.1 Compliance Requirements

The security layer must meet the following standards for Italian utilities:

IEC 62443 (Industrial Cybersecurity):
- Zone and conduit model (IT network, DMZ, OT network)
- Defense in depth
- Role-based access control (RBAC)
- Audit logging

ISO 27001 (Information Security Management):
- Risk assessment
- Access control
- Cryptography
- Incident management

NIS2 Directive (EU Critical Infrastructure):
- Risk management measures
- Incident reporting
- Supply chain security
- Cybersecurity training

Implementation:

class ComplianceManager:
    """
    Ensure compliance with IEC 62443, ISO 27001, and NIS2 Directive.
    """

    def __init__(self, audit_logger: AuditLogger):
        self.audit_logger = audit_logger

    def log_compliance_event(
        self,
        standard: str,  # 'IEC_62443', 'ISO_27001', 'NIS2'
        requirement: str,
        event_type: str,
        details: dict
    ):
        """Log compliance-related event."""
        self.audit_logger.log(
            event_type='COMPLIANCE',
            severity='INFO',
            event_data={
                'standard': standard,
                'requirement': requirement,
                'event_type': event_type,
                'details': details
            }
        )

4.3 Safety Framework ⚠️ NEW

File: src/core/safety.py

Purpose: Multi-layer safety mechanisms to prevent equipment damage and operational hazards.

4.3.1 Safety Limit Enforcer

Purpose: Enforce hard operational limits on all setpoints (last line of defense).
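Stripped of logging and database access, enforcement reduces to two steps: clamp the requested value into the hard range, then limit the step relative to the previous setpoint. The sketch below is a simplified, standalone illustration of that arithmetic; clamp_setpoint and its parameters are hypothetical names, and max_step_hz stands for the largest change allowed between two consecutive evaluations.

```python
from typing import Optional


def clamp_setpoint(
    requested_hz: float,
    hard_min_hz: float,
    hard_max_hz: float,
    previous_hz: Optional[float] = None,
    max_step_hz: Optional[float] = None,
) -> float:
    """Clamp a requested speed into [hard_min_hz, hard_max_hz], then rate-limit it."""
    # Step 1: absolute limits
    enforced = min(max(requested_hz, hard_min_hz), hard_max_hz)

    # Step 2: limit the change relative to the previous setpoint, if one is known
    if previous_hz is not None and max_step_hz is not None:
        delta = enforced - previous_hz
        if abs(delta) > max_step_hz:
            enforced = previous_hz + (max_step_hz if delta > 0 else -max_step_hz)

    return enforced
```

With limits of 20 to 50 Hz, a request of 60 Hz is clamped to 50 Hz; if the previous setpoint was 30 Hz and the step limit is 5 Hz, a request of 48 Hz yields 35 Hz.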
from typing import Tuple, List, Optional, Dict
from dataclasses import dataclass
import structlog

logger = structlog.get_logger()


def _to_float(value) -> Optional[float]:
    """Convert a DECIMAL column value (returned as decimal.Decimal) to float; keep NULL as None."""
    return None if value is None else float(value)


@dataclass
class SafetyLimits:
    """Safety limits for a pump."""
    hard_min_speed_hz: float
    hard_max_speed_hz: float
    hard_min_level_m: Optional[float]
    hard_max_level_m: Optional[float]
    hard_max_power_kw: Optional[float]
    max_speed_change_hz_per_min: float


class SafetyLimitEnforcer:
    """
    Enforces multi-layer safety limits on all setpoints.

    This is the LAST line of defense before setpoints are exposed to SCADA.
    ALL setpoints MUST pass through this enforcer.

    Three-Layer Architecture:
    - Layer 1: Physical Hard Limits (PLC/VFD) - 15-55 Hz
    - Layer 2: Station Safety Limits (Database) - 20-50 Hz (enforced here)
    - Layer 3: Optimization Constraints (Calejo Optimize) - 25-45 Hz
    """

    def __init__(self, db_client: DatabaseClient, audit_logger: AuditLogger):
        self.db_client = db_client
        self.audit_logger = audit_logger
        self.safety_limits_cache: Dict[Tuple[str, str], SafetyLimits] = {}
        self.previous_setpoints: Dict[Tuple[str, str], float] = {}
        self._load_safety_limits()

    def _load_safety_limits(self):
        """Load all safety limits from database into cache."""
        query = """
            SELECT station_id, pump_id,
                   hard_min_speed_hz, hard_max_speed_hz,
                   hard_min_level_m, hard_max_level_m,
                   hard_max_power_kw, max_speed_change_hz_per_min
            FROM pump_safety_limits
            WHERE approved_at IS NOT NULL  -- Only use approved limits
        """
        limits = self.db_client.execute_query(query)

        for limit in limits:
            key = (limit['station_id'], limit['pump_id'])
            # DECIMAL columns arrive as decimal.Decimal; convert to float so the
            # limits can be mixed with float setpoints in arithmetic
            self.safety_limits_cache[key] = SafetyLimits(
                hard_min_speed_hz=float(limit['hard_min_speed_hz']),
                hard_max_speed_hz=float(limit['hard_max_speed_hz']),
                hard_min_level_m=_to_float(limit.get('hard_min_level_m')),
                hard_max_level_m=_to_float(limit.get('hard_max_level_m')),
                hard_max_power_kw=_to_float(limit.get('hard_max_power_kw')),
                max_speed_change_hz_per_min=float(limit['max_speed_change_hz_per_min'])
            )

        logger.info("safety_limits_loaded", pump_count=len(limits))

    def enforce_setpoint(
        self,
        station_id: str,
        pump_id: str,
        setpoint: float
    ) -> Tuple[float, List[str]]:
        """
        Enforce safety limits on setpoint.

        Args:
            station_id: Station identifier
            pump_id: Pump identifier
            setpoint: Proposed setpoint (Hz)

        Returns:
            Tuple of (enforced_setpoint, violations)
            - enforced_setpoint: Safe setpoint (clamped if necessary)
            - violations: List of safety violations (for logging/alerting)
        """
        violations = []
        enforced_setpoint = setpoint

        # Get safety limits
        key = (station_id, pump_id)
        limits = self.safety_limits_cache.get(key)

        if not limits:
            logger.error(
                "no_safety_limits",
                station_id=station_id,
                pump_id=pump_id
            )
            # CRITICAL: No safety limits defined - reject setpoint
            self.audit_logger.log(
                event_type='SAFETY_ERROR',
                severity='CRITICAL',
                station_id=station_id,
                pump_id=pump_id,
                action='enforce_setpoint',
                result='REJECTED',
                event_data={'reason': 'NO_SAFETY_LIMITS_DEFINED'}
            )
            return (0.0, ["NO_SAFETY_LIMITS_DEFINED"])

        # Check minimum speed
        if enforced_setpoint < limits.hard_min_speed_hz:
            violations.append(
                f"BELOW_MIN_SPEED: {enforced_setpoint:.2f} < {limits.hard_min_speed_hz:.2f}"
            )
            enforced_setpoint = limits.hard_min_speed_hz

        # Check maximum speed
        if enforced_setpoint > limits.hard_max_speed_hz:
            violations.append(
                f"ABOVE_MAX_SPEED: {enforced_setpoint:.2f} > {limits.hard_max_speed_hz:.2f}"
            )
            enforced_setpoint = limits.hard_max_speed_hz

        # Check rate of change (prevent sudden speed changes that damage equipment)
        previous_setpoint = self.previous_setpoints.get(key)
        if previous_setpoint is not None:
            max_change = limits.max_speed_change_hz_per_min * 5.0  # 5-minute interval
            actual_change = abs(enforced_setpoint - previous_setpoint)

            if actual_change > max_change:
                # Limit rate of change
                direction = 1 if enforced_setpoint > previous_setpoint else -1
                enforced_setpoint = previous_setpoint + (direction * max_change)
                violations.append(
                    f"RATE_OF_CHANGE_LIMITED: {actual_change:.2f} Hz > {max_change:.2f} Hz"
                )

        # Store current setpoint for next rate-of-change check
        self.previous_setpoints[key] = enforced_setpoint

        # Log violations
        if violations:
            logger.warning(
                "safety_limit_violation",
                station_id=station_id,
                pump_id=pump_id,
                requested_setpoint=setpoint,
                enforced_setpoint=enforced_setpoint,
                violations=violations
            )

            # Record violation in database for audit
            self._record_violation(station_id, pump_id, setpoint, enforced_setpoint, violations)

            # Log to audit trail
            self.audit_logger.log(
                event_type='SAFETY_VIOLATION',
                severity='WARNING',
                station_id=station_id,
                pump_id=pump_id,
                action='enforce_setpoint',
                result='CLAMPED',
                event_data={
                    'requested': setpoint,
                    'enforced': enforced_setpoint,
                    'violations': violations
                }
            )

        return (enforced_setpoint, violations)

    def _record_violation(
        self,
        station_id: str,
        pump_id: str,
        requested: float,
        enforced: float,
        violations: List[str]
    ):
        """Record safety limit violation in database."""
        query = """
            INSERT INTO safety_limit_violations
            (station_id, pump_id, requested_setpoint, enforced_setpoint, violations, timestamp)
            VALUES (%s, %s, %s, %s, %s, NOW())
        """
        self.db_client.execute(query, (station_id, pump_id, requested, enforced, violations))

4.3.2 Database Watchdog

Purpose: Detect if optimization hasn't updated plans for 20 minutes and revert to safe defaults.

import asyncio
from datetime import datetime, timedelta


class DatabaseWatchdog:
    """
    Monitors database updates and triggers failsafe mode if updates stop.

    Requirement: Detect if Calejo Optimize hasn't updated pump plans for 20 minutes.
    Action: Revert to default safe setpoints (existing pump configuration).
    """

    def __init__(
        self,
        db_client: DatabaseClient,
        audit_logger: AuditLogger,
        alert_manager: 'AlertManager',  # Quoted: AlertManager is defined later in this file
        timeout_seconds: int = 1200,  # 20 minutes
        check_interval_seconds: int = 60  # Check every minute
    ):
        self.db_client = db_client
        self.audit_logger = audit_logger
        self.alert_manager = alert_manager
        self.timeout_seconds = timeout_seconds
        self.check_interval_seconds = check_interval_seconds
        self.failsafe_mode: Dict[Tuple[str, str], bool] = {}

    async def start(self):
        """Start watchdog monitoring loop."""
        logger.info(
            "database_watchdog_started",
            timeout_seconds=self.timeout_seconds,
            check_interval_seconds=self.check_interval_seconds
        )

        while True:
            await asyncio.sleep(self.check_interval_seconds)
            try:
                await self._check_updates()
            except Exception as e:
                # A failed check (e.g., database unreachable) must not stop the watchdog loop
                logger.error("database_watchdog_check_failed", error=str(e))

    async def _check_updates(self):
        """Check for stale data and trigger failsafe if needed."""
        # Assumption: plan_created_at is a naive TIMESTAMP in the same timezone
        # as the adapter host, so it can be compared with datetime.now()
        now = datetime.now()

        # Query last update time for all pumps
        query = """
            SELECT station_id, pump_id, MAX(plan_created_at) as last_update
            FROM pump_plans
            GROUP BY station_id, pump_id
        """
        results = self.db_client.execute_query(query)

        for row in results:
            key = (row['station_id'], row['pump_id'])
            last_update = row['last_update']

            if last_update:
                time_since_update = (now - last_update).total_seconds()

                # Check if update is stale
                if time_since_update > self.timeout_seconds:
                    if not self.failsafe_mode.get(key, False):
                        # Trigger failsafe mode
                        await self._trigger_failsafe(
                            row['station_id'],
                            row['pump_id'],
                            time_since_update,
                            last_update
                        )
                        self.failsafe_mode[key] = True
                else:
                    # Updates are current, exit failsafe mode if active
                    if self.failsafe_mode.get(key, False):
                        await self._clear_failsafe(row['station_id'], row['pump_id'])
                        self.failsafe_mode[key] = False

    async def _trigger_failsafe(
        self,
        station_id: str,
        pump_id: str,
        time_since_update: float,
        last_update: datetime
    ):
        """
        Trigger failsafe mode for a pump.

        Sets pump to default safe setpoint (existing configuration).
        """
        logger.critical(
            "watchdog_failsafe_triggered",
            station_id=station_id,
            pump_id=pump_id,
            time_since_update_seconds=time_since_update,
            last_update=last_update.isoformat()
        )

        # Get default safe setpoint from pump configuration
        default_setpoint = await self._get_default_setpoint(station_id, pump_id)

        # Record failsafe event
        query = """
            INSERT INTO failsafe_events
            (station_id, pump_id, event_type, default_setpoint, triggered_by, timestamp)
            VALUES (%s, %s, 'DATABASE_TIMEOUT', %s, 'DATABASE_WATCHDOG', NOW())
        """
        self.db_client.execute(query, (station_id, pump_id, default_setpoint))

        # Log to audit trail
        self.audit_logger.log(
            event_type='FAILSAFE_TRIGGERED',
            severity='CRITICAL',
            station_id=station_id,
            pump_id=pump_id,
            action='trigger_failsafe',
            result='SUCCESS',
            event_data={
                'reason': 'DATABASE_TIMEOUT',
                'time_since_update_seconds': time_since_update,
                'default_setpoint': default_setpoint
            }
        )

        # Send alert to operators
        await self.alert_manager.send_alert(
            severity='CRITICAL',
            title=f'FAILSAFE MODE: {station_id}/{pump_id}',
            message=(
                f"No database updates for {time_since_update/60:.1f} minutes. "
                f"Reverting to default setpoint: {default_setpoint} Hz. "
                f"Last update: {last_update.isoformat()}"
            ),
            station_id=station_id,
            pump_id=pump_id
        )

    async def _clear_failsafe(self, station_id: str, pump_id: str):
        """Clear failsafe mode when database updates resume."""
        logger.info(
            "watchdog_failsafe_cleared",
            station_id=station_id,
            pump_id=pump_id
        )

        # Update failsafe event
        query = """
            UPDATE failsafe_events
            SET cleared_at = NOW()
            WHERE station_id = %s AND pump_id = %s AND cleared_at IS NULL
        """
        self.db_client.execute(query, (station_id, pump_id))

        # Log to audit trail
        self.audit_logger.log(
            event_type='FAILSAFE_CLEARED',
            severity='INFO',
            station_id=station_id,
            pump_id=pump_id,
            action='clear_failsafe',
            result='SUCCESS',
            event_data={'reason': 'DATABASE_UPDATES_RESUMED'}
        )

        # Send alert
        await self.alert_manager.send_alert(
            severity='INFO',
            title=f'Failsafe Cleared: {station_id}/{pump_id}',
            message='Database updates resumed. Returning to normal operation.',
            station_id=station_id,
            pump_id=pump_id
        )

    async def _get_default_setpoint(self, station_id: str, pump_id: str) -> float:
        """
        Get default safe setpoint for pump (existing configuration).

        Returns pump's configured default_setpoint_hz.
        """
        query = """
            SELECT default_setpoint_hz
            FROM pumps
            WHERE station_id = %s AND pump_id = %s
        """
        result = self.db_client.execute_query(query, (station_id, pump_id))

        # Compare with None so that a configured default of 0 Hz is honored
        if result and result[0]['default_setpoint_hz'] is not None:
            return float(result[0]['default_setpoint_hz'])

        # Ultimate fallback (should never reach here)
        logger.error(
            "no_default_setpoint_configured",
            station_id=station_id,
            pump_id=pump_id
        )
        return 35.0  # Conservative fallback

4.3.3 Emergency Stop Manager

Purpose: Provide "big red button" to immediately halt optimization control.

class EmergencyStopManager:
    """
    Manages emergency stop functionality ("big red button").

    Provides immediate halt of optimization control and reversion to safe defaults.
    Supports both single-pump and system-wide emergency stop.
    """

    def __init__(
        self,
        db_client: DatabaseClient,
        audit_logger: AuditLogger,
        alert_manager: 'AlertManager'  # Quoted: AlertManager is defined later in this file
    ):
        self.db_client = db_client
        self.audit_logger = audit_logger
        self.alert_manager = alert_manager
        self.emergency_stop_active: Dict[Tuple[Optional[str], Optional[str]], int] = {}

    async def trigger_emergency_stop(
        self,
        triggered_by: str,
        reason: str,
        station_id: Optional[str] = None,
        pump_id: Optional[str] = None
    ) -> int:
        """
        Trigger emergency stop.

        Args:
            triggered_by: User or system that triggered stop
            reason: Reason for emergency stop
            station_id: Optional - limit to specific station (None = all stations)
            pump_id: Optional - limit to specific pump (None = all pumps at station)

        Returns:
            event_id: Emergency stop event ID
        """
        logger.critical(
            "emergency_stop_triggered",
            triggered_by=triggered_by,
            reason=reason,
            station_id=station_id,
            pump_id=pump_id
        )

        # Record event
        query = """
            INSERT INTO emergency_stop_events
            (station_id, pump_id, triggered_by, reason, timestamp)
            VALUES (%s, %s, %s, %s, NOW())
            RETURNING event_id
        """
        result = self.db_client.execute(query, (station_id, pump_id, triggered_by, reason))
        event_id = result[0]['event_id']

        # Set emergency stop flag
        key = (station_id, pump_id)
        self.emergency_stop_active[key] = event_id

        # Set all affected pumps to default setpoints
        affected_pumps = await self._get_affected_pumps(station_id, pump_id)
        for pump in affected_pumps:
            await self._set_pump_to_default(
                pump['station_id'],
                pump['pump_id'],
                event_id
            )

        # Log to audit trail
        self.audit_logger.log(
            event_type='EMERGENCY_STOP',
            severity='CRITICAL',
            station_id=station_id,
            pump_id=pump_id,
            user_id=triggered_by,
            action='trigger_emergency_stop',
            result='SUCCESS',
            event_data={
                'event_id': event_id,
                'reason': reason,
                'affected_pumps': len(affected_pumps)
            }
        )

        # Send alerts
        await self.alert_manager.send_alert(
            severity='CRITICAL',
            title='EMERGENCY STOP ACTIVATED',
            message=(
                f"Emergency stop triggered by {triggered_by}. "
                f"Reason: {reason}. "
                f"Scope: {self._get_scope_description(station_id, pump_id)}. "
                f"Affected pumps: {len(affected_pumps)}"
            ),
            station_id=station_id,
            pump_id=pump_id
        )

        return event_id

    async def clear_emergency_stop(
        self,
        event_id: int,
        cleared_by: str,
        notes: str
    ):
        """Clear emergency stop and resume normal operation."""
        logger.info(
            "emergency_stop_cleared",
            event_id=event_id,
            cleared_by=cleared_by
        )

        # Update event record
        query = """
            UPDATE emergency_stop_events
            SET cleared_at = NOW(), cleared_by = %s, clear_notes = %s
            WHERE event_id = %s
            RETURNING station_id, pump_id
        """
        result = self.db_client.execute(query, (cleared_by, notes, event_id))

        if result:
            station_id = result[0]['station_id']
            pump_id = result[0]['pump_id']

            # Clear emergency stop flag
            key = (station_id, pump_id)
            if key in self.emergency_stop_active:
                del self.emergency_stop_active[key]

            # Log to audit trail
            self.audit_logger.log(
                event_type='EMERGENCY_STOP_CLEARED',
                severity='INFO',
                station_id=station_id,
                pump_id=pump_id,
                user_id=cleared_by,
                action='clear_emergency_stop',
                result='SUCCESS',
                event_data={
                    'event_id': event_id,
                    'notes': notes
                }
            )

            # Send alert
            await self.alert_manager.send_alert(
                severity='INFO',
                title='Emergency Stop Cleared',
                message=f"Emergency stop cleared by {cleared_by}. Notes: {notes}",
                station_id=station_id,
                pump_id=pump_id
            )

    def is_emergency_stop_active(
        self,
        station_id: str,
        pump_id: str
    ) -> bool:
        """Check if emergency stop is active for a pump."""
        # Check specific pump
        if (station_id, pump_id) in self.emergency_stop_active:
            return True

        # Check station-wide
        if (station_id, None) in self.emergency_stop_active:
            return True

        # Check system-wide
        if (None, None) in self.emergency_stop_active:
            return True

        return False

    async def _get_affected_pumps(
        self,
        station_id: Optional[str],
        pump_id: Optional[str]
    ) -> List[Dict]:
        """Get list of pumps affected by emergency stop."""
        if station_id and pump_id:
            # Single pump
            query = """
                SELECT station_id, pump_id
                FROM pumps
                WHERE station_id = %s AND pump_id = %s AND active = TRUE
            """
            return self.db_client.execute_query(query, (station_id, pump_id))
        elif station_id:
            # All pumps at station
            query = """
                SELECT station_id, pump_id
                FROM pumps
                WHERE station_id = %s AND active = TRUE
            """
            return self.db_client.execute_query(query, (station_id,))
        else:
            # ALL pumps in system
            query = """
                SELECT station_id, pump_id
                FROM pumps
                WHERE active = TRUE
            """
            return self.db_client.execute_query(query)

    async def _set_pump_to_default(
        self,
        station_id: str,
        pump_id: str,
        event_id: int
    ):
        """Set pump to default safe setpoint."""
        # Get default setpoint
        query = """
            SELECT default_setpoint_hz
            FROM pumps
            WHERE station_id = %s AND pump_id = %s
        """
        result = self.db_client.execute_query(query, (station_id, pump_id))

        if result:
            default_setpoint = result[0]['default_setpoint_hz']
            logger.info(
                "pump_set_to_default",
                station_id=station_id,
                pump_id=pump_id,
                default_setpoint=default_setpoint,
                event_id=event_id
            )
            # Note: Actual setpoint override is handled by SetpointManager
            # This method just logs the action

    def _get_scope_description(
        self,
        station_id: Optional[str],
        pump_id: Optional[str]
    ) -> str:
        """Get human-readable description of emergency stop scope."""
        if station_id and pump_id:
            return f"Single pump ({station_id}/{pump_id})"
        elif station_id:
            return f"All pumps at station {station_id}"
        else:
            return "ALL PUMPS SYSTEM-WIDE"

4.3.4 Alert Manager

Purpose: Multi-channel alerting (email, SMS, SCADA, webhooks).

import asyncio
from datetime import datetime
from email.message import EmailMessage

import aiosmtplib
import httpx
from twilio.rest import Client as TwilioClient


class AlertManager:
    """
    Send alerts via multiple channels.

    Supports:
    - Email (SMTP)
    - SMS (Twilio)
    - SCADA HMI alarms (OPC UA)
    - Webhooks (HTTP POST)
    """

    def __init__(self, config: 'AlertConfig'):  # Quoted: AlertConfig is defined below
        self.config = config
        self.twilio_client = None

        if config.sms_enabled:
            self.twilio_client = TwilioClient(
                config.twilio_account_sid,
                config.twilio_auth_token
            )

    async def send_alert(
        self,
        severity: str,  # 'INFO', 'WARNING', 'CRITICAL'
        title: str,
        message: str,
        station_id: Optional[str] = None,
        pump_id: Optional[str] = None
    ):
        """Send alert via all configured channels."""
        # Email
        if self.config.email_enabled:
            await self._send_email(severity, title, message, station_id, pump_id)

        # SMS (for critical alerts only)
        if severity == 'CRITICAL' and self.config.sms_enabled:
            await self._send_sms(title, message)

        # SCADA HMI alarm
        if self.config.scada_alarms_enabled:
            await self._trigger_scada_alarm(severity, title, message, station_id, pump_id)

        # Webhook
        if self.config.webhook_enabled:
            await self._send_webhook(severity, title, message, station_id, pump_id)

    async def _send_email(
        self,
        severity: str,
        title: str,
        message: str,
        station_id: Optional[str],
        pump_id: Optional[str]
    ):
        """Send email alert via SMTP."""
        try:
            email = EmailMessage()
            email['From'] = self.config.email_from
            email['To'] = ', '.join(self.config.email_recipients or [])
            email['Subject'] = f"[{severity}] Calejo Control: {title}"

            body = f"""
            Calejo Control Alert

            Severity: {severity}
            Title: {title}
            Station: {station_id or 'N/A'}
            Pump: {pump_id or 'N/A'}
            Timestamp: {datetime.now().isoformat()}

            Message:
            {message}

            ---
            This is an automated alert from Calejo Control.
            """
            email.set_content(body)

            # Port 587 expects a STARTTLS upgrade; in aiosmtplib, use_tls means
            # implicit TLS from the first byte (typically port 465)
            await aiosmtplib.send(
                email,
                hostname=self.config.smtp_host,
                port=self.config.smtp_port,
                username=self.config.smtp_username,
                password=self.config.smtp_password,
                start_tls=self.config.smtp_use_tls
            )

            logger.info("email_alert_sent", severity=severity, title=title)
        except Exception as e:
            logger.error("email_alert_failed", error=str(e))

    async def _send_sms(self, title: str, message: str):
        """Send SMS alert via Twilio."""
        try:
            for phone_number in self.config.sms_recipients or []:
                # The Twilio client is blocking; run it in a worker thread
                # so the event loop is not stalled
                await asyncio.to_thread(
                    self.twilio_client.messages.create,
                    body=f"Calejo Control CRITICAL: {title}. {message[:100]}",
                    from_=self.config.twilio_phone_number,
                    to=phone_number
                )

            logger.info("sms_alert_sent", title=title)
        except Exception as e:
            logger.error("sms_alert_failed", error=str(e))

    async def _trigger_scada_alarm(
        self,
        severity: str,
        title: str,
        message: str,
        station_id: Optional[str],
        pump_id: Optional[str]
    ):
        """Trigger alarm in SCADA HMI via OPC UA."""
        # Implementation depends on SCADA system
        # Typically write to OPC UA alarm node
        pass

    async def _send_webhook(
        self,
        severity: str,
        title: str,
        message: str,
        station_id: Optional[str],
        pump_id: Optional[str]
    ):
        """Send webhook alert via HTTP POST."""
        try:
            payload = {
                'severity': severity,
                'title': title,
                'message': message,
                'station_id': station_id,
                'pump_id': pump_id,
                'timestamp': datetime.now().isoformat()
            }

            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.config.webhook_url,
                    json=payload,
                    headers={'Authorization': f'Bearer {self.config.webhook_token}'},
                    timeout=10.0
                )
                response.raise_for_status()

            logger.info("webhook_alert_sent", severity=severity, title=title)
        except Exception as e:
            logger.error("webhook_alert_failed", error=str(e))


@dataclass
class AlertConfig:
    """Alert configuration."""
    # Email
    email_enabled: bool = True
    email_from: str = "calejo-control@example.com"
    email_recipients: Optional[List[str]] = None
    smtp_host: str = "smtp.gmail.com"
    smtp_port: int = 587
    smtp_username: str = ""
    smtp_password: str = ""
    smtp_use_tls: bool = True

    # SMS
    sms_enabled: bool = True
    sms_recipients: Optional[List[str]] = None  # None means no recipients configured
    twilio_account_sid: str = ""
    twilio_auth_token: str = ""
    twilio_phone_number: str = ""

    # SCADA
    scada_alarms_enabled: bool = True

    # Webhook
    webhook_enabled: bool = True
    webhook_url: str = ""
    webhook_token: str = ""

4.4 Plan-to-Setpoint Logic Engine

File: src/core/setpoint_logic.py

(Implementation unchanged from v1.0 - see previous specification)

Integration with Safety Framework:

class SetpointManager:
    """
    Manages setpoint calculation for all pumps.

    Integrates with safety framework to enforce limits and handle failsafe mode.
    """

    def __init__(
        self,
        discovery: AdapterAutoDiscovery,
        db_client: DatabaseClient,
        safety_enforcer: SafetyLimitEnforcer,
        emergency_stop_manager: EmergencyStopManager,
        watchdog: DatabaseWatchdog
    ):
        self.discovery = discovery
        self.db_client = db_client
        self.safety_enforcer = safety_enforcer
        self.emergency_stop_manager = emergency_stop_manager
        self.watchdog = watchdog

        # Create calculator instances
        self.calculators = {
            'DIRECT_SPEED': DirectSpeedCalculator(),
            'LEVEL_CONTROLLED': LevelControlledCalculator(),
            'POWER_CONTROLLED': PowerControlledCalculator()
        }

    def get_current_setpoint(self, station_id: str, pump_id: str) -> Optional[float]:
        """
        Get current setpoint for a pump.

        Integrates safety checks:
        1. Check if emergency stop is active
        2. Check if failsafe mode is active
        3. Calculate setpoint from optimization plan
        4.
Enforce safety limits

        Returns:
            Setpoint in Hz. Falls back to the pump's default setpoint during
            emergency stop, in failsafe mode, or when no plan is active.
            Returns None if the pump or its control type is unknown.
        """
        # Check emergency stop
        if self.emergency_stop_manager.is_emergency_stop_active(station_id, pump_id):
            logger.info(
                "emergency_stop_active",
                station_id=station_id,
                pump_id=pump_id
            )
            return self._get_default_setpoint(station_id, pump_id)

        # Check failsafe mode
        if self.watchdog.failsafe_mode.get((station_id, pump_id), False):
            logger.info(
                "failsafe_mode_active",
                station_id=station_id,
                pump_id=pump_id
            )
            return self._get_default_setpoint(station_id, pump_id)

        # Get pump info
        pump_info = self.discovery.get_pump(station_id, pump_id)
        if not pump_info:
            logger.error("pump_not_found", station_id=station_id, pump_id=pump_id)
            return None

        # Get current optimization plan
        plan = self.db_client.get_current_plan(station_id, pump_id)
        if not plan:
            logger.warning("no_active_plan", station_id=station_id, pump_id=pump_id)
            return self._get_default_setpoint(station_id, pump_id)

        # Get latest feedback (optional)
        feedback = self.db_client.get_latest_feedback(station_id, pump_id)

        # Get appropriate calculator
        calculator = self.calculators.get(pump_info.control_type)
        if not calculator:
            logger.error("unknown_control_type", control_type=pump_info.control_type)
            return None

        # Calculate setpoint
        setpoint = calculator.calculate_setpoint(plan, feedback, pump_info)

        # Enforce safety limits (LAST LINE OF DEFENSE)
        enforced_setpoint, violations = self.safety_enforcer.enforce_setpoint(
            station_id, pump_id, setpoint
        )

        return enforced_setpoint

    def _get_default_setpoint(self, station_id: str, pump_id: str) -> float:
        """Get default safe setpoint from pump configuration."""
        query = """
            SELECT default_setpoint_hz
            FROM pumps
            WHERE station_id = %s AND pump_id = %s
        """
        # This path runs precisely when something has already gone wrong, so a
        # failing database query must not prevent a safe value from being returned.
        try:
            result = self.db_client.execute_query(query, (station_id, pump_id))
        except Exception as e:
            logger.error("default_setpoint_query_failed", error=str(e))
            result = None

        # Compare against None so a configured default of 0.0 Hz is respected.
        if result and result[0]['default_setpoint_hz'] is not None:
            return float(result[0]['default_setpoint_hz'])

        # Fallback
        return 35.0

4.5 Multi-Protocol Server

(Implementation unchanged from v1.0 - see previous
specification)

5. REST API Endpoints

5.1 Emergency Stop Endpoints

from datetime import datetime
from typing import Optional

from fastapi import FastAPI, Depends, HTTPException, status
from pydantic import BaseModel

app = FastAPI(title="Calejo Control API", version="2.0")


class EmergencyStopRequest(BaseModel):
    triggered_by: str
    reason: str
    station_id: Optional[str] = None
    pump_id: Optional[str] = None


class EmergencyStopClearRequest(BaseModel):
    cleared_by: str
    notes: str


@app.post(
    "/api/v1/emergency-stop",
    summary="Trigger emergency stop",
    description="Immediately halt optimization control and revert to safe defaults (BIG RED BUTTON)",
    status_code=status.HTTP_201_CREATED,
    dependencies=[Depends(verify_admin_permission)]
)
async def trigger_emergency_stop(request: EmergencyStopRequest):
    """
    Trigger emergency stop ("big red button").

    Requires admin permission.

    Scope:
    - If station_id and pump_id provided: Stop single pump
    - If station_id only: Stop all pumps at station
    - If neither: Stop ALL pumps system-wide
    """
    event_id = await emergency_stop_manager.trigger_emergency_stop(
        triggered_by=request.triggered_by,
        reason=request.reason,
        station_id=request.station_id,
        pump_id=request.pump_id
    )

    return {
        "status": "emergency_stop_triggered",
        "event_id": event_id,
        "scope": emergency_stop_manager._get_scope_description(
            request.station_id,
            request.pump_id
        ),
        "timestamp": datetime.now().isoformat()
    }


@app.post(
    "/api/v1/emergency-stop/{event_id}/clear",
    summary="Clear emergency stop",
    description="Resume normal operation after emergency stop",
    dependencies=[Depends(verify_admin_permission)]
)
async def clear_emergency_stop(event_id: int, request: EmergencyStopClearRequest):
    """Clear emergency stop and resume normal operation."""
    await emergency_stop_manager.clear_emergency_stop(
        event_id=event_id,
        cleared_by=request.cleared_by,
        notes=request.notes
    )

    return {
        "status": "emergency_stop_cleared",
        "event_id": event_id,
        "timestamp": datetime.now().isoformat()
    }


@app.get(
    "/api/v1/emergency-stop/status",
    summary="Get emergency stop status",
    description="Check if any emergency stops are active"
)
async def get_emergency_stop_status():
    """Get current emergency stop status."""
    query = """
        SELECT event_id, station_id, pump_id, triggered_by, reason, timestamp
        FROM emergency_stop_events
        WHERE cleared_at IS NULL
        ORDER BY timestamp DESC
    """
    active_stops = db_client.execute_query(query)

    return {
        "active_emergency_stops": len(active_stops),
        "events": active_stops
    }

5.2 Safety Status Endpoints

@app.get(
    "/api/v1/safety/status",
    summary="Get safety status",
    description="Get current safety status for all pumps"
)
async def get_safety_status():
    """Get safety status dashboard data."""
    # Get violations from the last hour, grouped per pump
    violations_query = """
        SELECT station_id, pump_id, COUNT(*) as violation_count
        FROM safety_limit_violations
        WHERE timestamp > NOW() - INTERVAL '1 hour'
        GROUP BY station_id, pump_id
    """
    violations = db_client.execute_query(violations_query)

    # Get failsafe mode pumps
    failsafe_query = """
        SELECT station_id, pump_id, event_type, timestamp
        FROM failsafe_events
        WHERE cleared_at IS NULL
    """
    failsafe_pumps = db_client.execute_query(failsafe_query)

    # Get emergency stops
    emergency_stops_query = """
        SELECT event_id, station_id, pump_id, triggered_by, reason, timestamp
        FROM emergency_stop_events
        WHERE cleared_at IS NULL
    """
    emergency_stops = db_client.execute_query(emergency_stops_query)

    return {
        "timestamp": datetime.now().isoformat(),
        # The query returns one row per pump, so sum the per-pump counts to get
        # the total number of violations (assumes rows are dict-like, as in
        # the other queries in this specification).
        "safety_violations_last_hour": sum(v["violation_count"] for v in violations),
        "pumps_in_failsafe_mode": len(failsafe_pumps),
        "active_emergency_stops": len(emergency_stops),
        "violations": violations,
        "failsafe_pumps": failsafe_pumps,
        "emergency_stops": emergency_stops
    }


@app.get(
    "/api/v1/safety/violations",
    summary="Get safety violations",
    description="Get recent safety limit violations"
)
async def get_safety_violations(hours: int = 24, limit: int = 100):
    """Get recent safety limit violations."""
    query = """
        SELECT station_id, pump_id, requested_setpoint, enforced_setpoint, violations, timestamp
        FROM
        safety_limit_violations
        WHERE timestamp > NOW() - %s * INTERVAL '1 hour'
        ORDER BY timestamp DESC
        LIMIT %s
    """
    # The hour count is bound as a regular integer parameter and multiplied by a
    # fixed interval, instead of being interpolated inside a quoted SQL literal.
    violations = db_client.execute_query(query, (hours, limit))

    return {
        "violations": violations,
        "count": len(violations)
    }

6. Configuration Management

6.1 Environment Variables

File: .env

# Database connection
DB_HOST=calejo-optimize
DB_PORT=5432
DB_NAME=calejo
DB_USER=control_reader
DB_PASSWORD=secure_password

# Station filter (optional)
STATION_FILTER=

# Security
API_KEY=your_api_key_here
TLS_ENABLED=true
TLS_CERT_PATH=/etc/calejo/ssl/cert.pem
TLS_KEY_PATH=/etc/calejo/ssl/key.pem

# OPC UA
OPCUA_ENABLED=true
OPCUA_PORT=4840
OPCUA_SECURITY_MODE=SignAndEncrypt
OPCUA_CERT_PATH=/etc/calejo/opcua/cert.der
OPCUA_KEY_PATH=/etc/calejo/opcua/key.pem

# Modbus TCP
MODBUS_ENABLED=true
MODBUS_PORT=502
MODBUS_SLAVE_ID=1

# REST API
REST_API_ENABLED=true
REST_API_PORT=8080
REST_API_CORS_ENABLED=true

# Safety - Watchdog
# Timeout is 1200 seconds (20 minutes); the check runs every 60 seconds.
# Comments are kept on their own lines because not every env-file loader
# strips inline comments from values.
WATCHDOG_ENABLED=true
WATCHDOG_TIMEOUT_SECONDS=1200
WATCHDOG_CHECK_INTERVAL_SECONDS=60

# Alerts - Email
ALERT_EMAIL_ENABLED=true
ALERT_EMAIL_FROM=calejo-control@example.com
ALERT_EMAIL_RECIPIENTS=operator1@utility.it,operator2@utility.it
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USERNAME=calejo-control@example.com
SMTP_PASSWORD=smtp_password
SMTP_USE_TLS=true

# Alerts - SMS
ALERT_SMS_ENABLED=true
ALERT_SMS_RECIPIENTS=+393401234567,+393407654321
TWILIO_ACCOUNT_SID=your_twilio_account_sid
TWILIO_AUTH_TOKEN=your_twilio_auth_token
TWILIO_PHONE_NUMBER=+15551234567

# Alerts - Webhook
ALERT_WEBHOOK_ENABLED=true
ALERT_WEBHOOK_URL=https://utility-monitoring.example.com/webhook
ALERT_WEBHOOK_TOKEN=webhook_bearer_token

# Alerts - SCADA
ALERT_SCADA_ENABLED=true

# Logging
LOG_LEVEL=INFO
LOG_FORMAT=json
AUDIT_LOG_ENABLED=true

# Compliance
COMPLIANCE_STANDARDS=IEC_62443,ISO_27001,NIS2

7.
Implementation Priorities (Updated)

Phase 1: Core Foundation (Week 1-2)
  - Project setup
  - Database client
  - Auto-discovery
  - Configuration management

Phase 2: Safety Framework (Week 2-3) ⚠️ NEW
  - Safety limit enforcer
  - Database watchdog (20-minute timeout)
  - Emergency stop manager
  - Alert manager (email, SMS, webhook)
  - Safety database tables
  - Audit logging

Phase 3: Setpoint Logic (Week 3-4)
  - Implement setpoint calculators
  - Implement SetpointManager with safety integration
  - Unit tests

Phase 4: Security Layer (Week 4-5)
  - Authentication
  - Authorization
  - TLS/SSL
  - Compliance logging (IEC 62443, ISO 27001, NIS2)

Phase 5: Protocol Servers (Week 5-7)
  - OPC UA Server
  - Modbus TCP Server
  - REST API (including emergency stop endpoints)

Phase 6: Integration and Testing (Week 7-8)
  - Integration testing
  - Docker containerization
  - Documentation

Phase 7: Production Hardening (Week 8-9)
  - Error handling
  - Monitoring
  - Security hardening

8. Success Criteria (Updated)

The implementation is considered complete when:

✅ Auto-discovery successfully discovers all stations and pumps
✅ Safety limit enforcer clamps all setpoints within hard limits
✅ Database watchdog detects 20-minute timeout and triggers failsafe
✅ Emergency stop API works for single pump and system-wide
✅ Multi-channel alerts (email, SMS, webhook) are sent for critical events
✅ All three control types calculate correct setpoints
✅ OPC UA server exposes hierarchical structure
✅ Modbus TCP server responds correctly
✅ REST API provides all endpoints with OpenAPI docs
✅ Security layer enforces authentication and authorization
✅ Audit logging records all safety events (immutable)
✅ Compliance requirements met (IEC 62443, ISO 27001, NIS2)
✅ Container deploys successfully
✅ Integration tests pass with real SCADA systems
✅ Performance meets requirements

9.
Compliance Documentation

9.1 IEC 62443 (Industrial Cybersecurity)

Implemented Controls:

Requirement | Implementation
SR 1.1 - Human user identification | API key authentication, user tracking in audit log
SR 1.2 - Software process identification | Process-level logging, component identification
SR 1.3 - Device identification | IP address logging, geofencing
SR 1.5 - Authenticator management | Secure API key storage, certificate management
SR 2.1 - Authorization enforcement | Role-based access control (read, write, admin)
SR 3.1 - Communication integrity | TLS/SSL encryption, OPC UA SignAndEncrypt
SR 3.3 - Security functionality verification | Health checks, integrity monitoring
SR 4.1 - Information confidentiality | Encryption at rest and in transit
SR 5.1 - Network segmentation | DMZ deployment, firewall rules
SR 6.1 - Audit log accessibility | Immutable audit log, query API
SR 7.1 - Denial of service protection | Rate limiting, resource monitoring

9.2 ISO 27001 (Information Security)

Implemented Controls:

Control | Implementation
A.9 - Access control | Authentication, authorization, RBAC
A.10 - Cryptography | TLS/SSL, certificate-based auth
A.12 - Operations security | Logging, monitoring, change management
A.16 - Incident management | Alert system, emergency stop, audit trail
A.18 - Compliance | Audit logging, compliance reporting

9.3 NIS2 Directive (EU Critical Infrastructure)

Implemented Measures:

Requirement | Implementation
Risk management | Multi-layer safety limits, failsafe mechanisms
Incident handling | Emergency stop, alert system, audit trail
Business continuity | Failover to default setpoints, caching
Supply chain security | Dependency management, security updates
Security monitoring | Real-time monitoring, anomaly detection
Incident reporting | Audit log, compliance reports

10.
Additional Notes

10.1 Safety Philosophy

The implementation follows the "Defense in Depth" principle with multiple independent safety layers:

  - Physical Layer: PLC/VFD hard limits (15-55 Hz)
  - Adapter Layer: Safety limit enforcer (20-50 Hz)
  - Optimization Layer: Optimization constraints (25-45 Hz)
  - Watchdog Layer: Database timeout detection (20 minutes)
  - Emergency Layer: Big red button (immediate halt)

The three speed ranges are nested: each inner range lies entirely within the next outer one, so a fault in one layer is still bounded by the layer outside it.

Fail-Safe Design: On any failure, the system reverts to safe defaults (existing pump configuration).

10.2 Utility Sales Benefits

Key selling points for Italian utilities:

✅ "Cannot damage equipment" - Three independent speed-limit layers (PLC/VFD, adapter, optimization)
✅ "Fails safe" - Automatic reversion to safe defaults
✅ "Complete audit trail" - Meets regulatory requirements
✅ "Operators have final control" - Emergency stop always available
✅ "Proven security" - Follows IEC 62443, ISO 27001, NIS2
✅ "Immediate alerts" - Email, SMS, SCADA, webhooks

END OF SPECIFICATION v2.0

This specification provides complete requirements for implementing the Calejo Control Adapter with a comprehensive safety and security framework. Begin with Phase 1 and proceed sequentially.
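
Illustrative sketch for Section 10.1: the nested speed limits can be pictured as a chain of clamps, innermost first. The following is a minimal, hypothetical example and not part of the specified implementation. The names LimitLayer, LAYERS, layers_are_nested and apply_layers are assumptions made for illustration; only the Hz ranges come from Section 10.1. In the real system the physical limit lives in the PLC/VFD rather than in adapter code, so the last entry only models it.

```python
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass(frozen=True)
class LimitLayer:
    """One speed-limit layer (hypothetical helper, for illustration only)."""
    name: str
    min_hz: float
    max_hz: float

    def clamp(self, hz: float) -> float:
        """Return hz limited to this layer's [min_hz, max_hz] range."""
        return max(self.min_hz, min(self.max_hz, hz))


# Ranges taken from Section 10.1, ordered innermost to outermost.
LAYERS = [
    LimitLayer("optimization", 25.0, 45.0),
    LimitLayer("adapter", 20.0, 50.0),
    LimitLayer("physical", 15.0, 55.0),
]


def layers_are_nested(layers: Sequence[LimitLayer]) -> bool:
    """Check that every inner range lies within the next outer range."""
    return all(
        outer.min_hz <= inner.min_hz and inner.max_hz <= outer.max_hz
        for inner, outer in zip(layers, layers[1:])
    )


def apply_layers(
    requested_hz: float,
    layers: Sequence[LimitLayer] = LAYERS,
    failed: Sequence[str] = (),
) -> Tuple[float, List[Tuple[str, float, float]]]:
    """Pass a setpoint through each layer in turn.

    Layers named in `failed` are skipped to simulate a layer that is not
    working. Returns the final setpoint and a trace of
    (layer name, value before, value after) for every clamp that fired.
    """
    hz = requested_hz
    trace = []
    for layer in layers:
        if layer.name in failed:
            continue
        clamped = layer.clamp(hz)
        if clamped != hz:
            trace.append((layer.name, hz, clamped))
        hz = clamped
    return hz, trace


if __name__ == "__main__":
    print(layers_are_nested(LAYERS))
    print(apply_layers(60.0))
    print(apply_layers(60.0, failed=("optimization",)))
    print(apply_layers(60.0, failed=("optimization", "adapter")))
```

The point of the sketch is that an out-of-range request stays bounded even when inner layers are skipped: each failed layer widens the effective range only to the next outer one.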