Calejo Control Adapter - Implementation Specification v2.0
|
|
Project: Calejo Control Adapter
|
|
Version: 2.0 (with Safety & Security Framework)
|
|
Date: October 18, 2025
|
|
Author: Manus AI
|
|
Target Audience: Development LLM / Software Engineers
|
|
Document Revision History
|
|
Version | Date       | Changes
1.0     | 2025-10-18 | Initial specification
2.0     | 2025-10-18 | Added comprehensive safety and security framework
|
|
1. Project Overview
|
|
1.1 Purpose
|
|
Implement a multi-protocol integration adapter that translates optimized pump control plans from Calejo Optimize into real-time control signals for municipal wastewater pump stations. The adapter must support diverse SCADA systems with minimal configuration, using automatic discovery and simultaneous support for multiple protocols.
|
|
Critical Requirement: The adapter must implement comprehensive safety mechanisms to ensure pump operations remain within safe limits under all conditions, including system failures, communication loss, and cyber attacks.
|
|
1.2 Architecture Summary
|
|
┌─────────────────────────────────────────────────────────┐
|
|
│ Calejo Optimize Container (Existing) │
|
|
│ - Optimization Engine │
|
|
│ - PostgreSQL Database (pump plans) │
|
|
└─────────────────────────────────────────────────────────┘
|
|
↓
|
|
┌─────────────────────────────────────────────────────────┐
|
|
│ Calejo Control Adapter (NEW - TO BE IMPLEMENTED) │
|
|
│ │
|
|
│ ┌────────────────────────────────────────────────┐ │
|
|
│ │ Core Components: │ │
|
|
│ │ 1. Auto-Discovery Module │ │
|
|
│ │ 2. Security Layer │ │
|
|
│ │ 3. Safety Framework ⚠️ NEW │ │
|
|
│ │ 4. Plan-to-Setpoint Logic Engine │ │
|
|
│ │ 5. Multi-Protocol Server │ │
|
|
│ │ - OPC UA Server │ │
|
|
│ │ - Modbus TCP Server │ │
|
|
│ │ - REST API │ │
|
|
│ └────────────────────────────────────────────────┘ │
|
|
└─────────────────────────────────────────────────────────┘
|
|
↓
|
|
(Multiple Protocols)
|
|
↓
|
|
┌─────────────────┼─────────────────┐
|
|
↓ ↓ ↓
|
|
Siemens WinCC Schneider EcoStruxure Rockwell FactoryTalk
|
|
1.3 Key Requirements
|
|
The implementation must provide:
|
|
Auto-Discovery: Automatically discover pump stations and pumps from database
|
|
Security: Authentication, authorization, encryption, audit logging
|
|
Safety Framework ⚠️: Multi-layer limits, watchdogs, emergency stop, failsafe mechanisms
|
|
Plan-to-Setpoint Logic: Transform optimization plans based on pump control type
|
|
Multi-Protocol Support: OPC UA, Modbus TCP, REST API simultaneously
|
|
High Availability: Caching, failover, health monitoring
|
|
Compliance: IEC 62443, NIS2 Directive, ISO 27001
|
|
Easy Deployment: Docker container, environment-based configuration
|
|
2. Technology Stack
|
|
2.1 Required Technologies
|
|
Component        | Technology           | Version | Justification
Language         | Python               | 3.11+   | Excellent library support for OPC UA, Modbus, REST
Database Client  | psycopg2             | 2.9+    | PostgreSQL driver for database queries
OPC UA Server    | asyncua              | 1.0+    | Modern async OPC UA implementation
Modbus Server    | pymodbus             | 3.5+    | Industry-standard Modbus TCP/RTU library
REST API         | FastAPI              | 0.104+  | High-performance async REST framework
Security         | cryptography, PyJWT  | Latest  | TLS/SSL, JWT tokens, certificate management
Configuration    | pydantic-settings    | 2.0+    | Type-safe environment variable management
Logging          | structlog            | 23.0+   | Structured logging for production monitoring
Alerting         | aiosmtplib, twilio   | Latest  | Email and SMS alerts
Container        | Docker               | 24.0+   | Containerization for deployment
|
|
2.2 Development Dependencies
|
|
# requirements.txt
|
|
asyncua==1.0.6
|
|
pymodbus==3.5.4
|
|
fastapi==0.104.1
|
|
uvicorn[standard]==0.24.0
|
|
psycopg2-binary==2.9.9
|
|
pydantic==2.5.0
|
|
pydantic-settings==2.1.0
|
|
cryptography==41.0.7
|
|
PyJWT==2.8.0
|
|
structlog==23.2.0
|
|
python-dotenv==1.0.0
|
|
redis==5.0.1 # For distributed caching (optional)
|
|
prometheus-client==0.19.0 # For metrics
|
|
aiosmtplib==3.0.1 # For email alerts
|
|
twilio==8.10.0 # For SMS alerts
|
|
httpx==0.25.0 # For webhook alerts
|
|
3. Database Schema
|
|
3.1 Core Tables
|
|
3.1.1 pump_stations
|
|
Metadata about pump stations.
|
|
CREATE TABLE pump_stations (
|
|
station_id VARCHAR(50) PRIMARY KEY,
|
|
station_name VARCHAR(200) NOT NULL,
|
|
location VARCHAR(200),
|
|
latitude DECIMAL(10, 8),
|
|
longitude DECIMAL(11, 8),
|
|
timezone VARCHAR(50) DEFAULT 'Europe/Rome',
|
|
active BOOLEAN DEFAULT TRUE,
|
|
created_at TIMESTAMP DEFAULT NOW(),
|
|
updated_at TIMESTAMP DEFAULT NOW()
|
|
);
|
|
3.1.2 pumps
|
|
Metadata about individual pumps.
|
|
CREATE TABLE pumps (
|
|
pump_id VARCHAR(50) NOT NULL,
|
|
station_id VARCHAR(50) NOT NULL,
|
|
pump_name VARCHAR(200),
|
|
pump_type VARCHAR(50), -- 'SUBMERSIBLE', 'CENTRIFUGAL', etc.
|
|
control_type VARCHAR(50) NOT NULL, -- 'LEVEL_CONTROLLED', 'POWER_CONTROLLED', 'DIRECT_SPEED'
|
|
manufacturer VARCHAR(100),
|
|
model VARCHAR(100),
|
|
rated_power_kw DECIMAL(10, 2),
|
|
min_speed_hz DECIMAL(5, 2) DEFAULT 20.0,
|
|
max_speed_hz DECIMAL(5, 2) DEFAULT 50.0,
|
|
|
|
-- Default setpoint (used in failsafe mode)
|
|
default_setpoint_hz DECIMAL(5, 2) NOT NULL DEFAULT 35.0,
|
|
|
|
-- Control-specific parameters (JSON)
|
|
control_parameters JSONB,
|
|
|
|
active BOOLEAN DEFAULT TRUE,
|
|
created_at TIMESTAMP DEFAULT NOW(),
|
|
updated_at TIMESTAMP DEFAULT NOW(),
|
|
|
|
PRIMARY KEY (station_id, pump_id),
|
|
FOREIGN KEY (station_id) REFERENCES pump_stations(station_id)
|
|
);
|
|
|
|
COMMENT ON COLUMN pumps.default_setpoint_hz IS 'Default safe setpoint used in failsafe mode (existing pump configuration)';
|
|
3.1.3 pump_plans
|
|
Optimization plans generated by Calejo Optimize.
|
|
CREATE TABLE pump_plans (
|
|
plan_id SERIAL PRIMARY KEY,
|
|
station_id VARCHAR(50) NOT NULL,
|
|
pump_id VARCHAR(50) NOT NULL,
|
|
interval_start TIMESTAMP NOT NULL,
|
|
interval_end TIMESTAMP NOT NULL,
|
|
|
|
-- Optimization outputs
|
|
target_flow_m3h DECIMAL(10, 2),
|
|
target_power_kw DECIMAL(10, 2),
|
|
target_level_m DECIMAL(5, 2),
|
|
suggested_speed_hz DECIMAL(5, 2),
|
|
|
|
-- Metadata
|
|
plan_created_at TIMESTAMP DEFAULT NOW(),
|
|
optimization_run_id INTEGER,
|
|
|
|
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
|
|
);
|
|
|
|
CREATE INDEX idx_pump_plans_active ON pump_plans(station_id, pump_id, interval_start, interval_end);
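
The adapter reads the plan row whose interval covers the current time. A minimal lookup sketch (the DatabaseClient wrapper that runs it is described elsewhere in this spec; the tie-break on plan_created_at is an assumption for the case where re-optimization has written overlapping intervals):

SELECT target_flow_m3h, target_power_kw, target_level_m, suggested_speed_hz
FROM pump_plans
WHERE station_id = %s
  AND pump_id = %s
  AND interval_start <= NOW()
  AND interval_end > NOW()
ORDER BY plan_created_at DESC
LIMIT 1;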
|
|
3.1.4 pump_feedback
|
|
Real-time feedback from pumps.
|
|
CREATE TABLE pump_feedback (
|
|
feedback_id SERIAL PRIMARY KEY,
|
|
station_id VARCHAR(50) NOT NULL,
|
|
pump_id VARCHAR(50) NOT NULL,
|
|
timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
|
|
|
|
-- Actual measurements
|
|
actual_speed_hz DECIMAL(5, 2),
|
|
actual_power_kw DECIMAL(10, 2),
|
|
actual_flow_m3h DECIMAL(10, 2),
|
|
wet_well_level_m DECIMAL(5, 2),
|
|
pump_running BOOLEAN,
|
|
|
|
-- Status
|
|
alarm_active BOOLEAN DEFAULT FALSE,
|
|
alarm_code VARCHAR(50),
|
|
|
|
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
|
|
);
|
|
|
|
CREATE INDEX idx_pump_feedback_latest ON pump_feedback(station_id, pump_id, timestamp DESC);
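
The index above supports a latest-value lookup per pump. A sketch of the query that the adapter's get_latest_feedback() helper (referenced in Section 4.4) is expected to run; the exact column list is illustrative:

SELECT actual_speed_hz, actual_power_kw, actual_flow_m3h,
       wet_well_level_m, pump_running, alarm_active, alarm_code, timestamp
FROM pump_feedback
WHERE station_id = %s
  AND pump_id = %s
ORDER BY timestamp DESC
LIMIT 1;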
|
|
3.2 Safety and Security Tables
|
|
3.2.1 pump_safety_limits
|
|
Purpose: Define hard operational limits that cannot be exceeded by optimization or manual control.
|
|
CREATE TABLE pump_safety_limits (
|
|
station_id VARCHAR(50) NOT NULL,
|
|
pump_id VARCHAR(50) NOT NULL,
|
|
|
|
-- Speed limits (Layer 2: Station Safety Limits)
|
|
hard_min_speed_hz DECIMAL(5, 2) NOT NULL,
|
|
hard_max_speed_hz DECIMAL(5, 2) NOT NULL,
|
|
|
|
-- Level limits
|
|
hard_min_level_m DECIMAL(5, 2),
|
|
hard_max_level_m DECIMAL(5, 2),
|
|
emergency_stop_level_m DECIMAL(5, 2), -- Emergency stop if exceeded
|
|
dry_run_protection_level_m DECIMAL(5, 2), -- Stop pump if level too low
|
|
|
|
-- Power and flow limits
|
|
hard_max_power_kw DECIMAL(10, 2),
|
|
hard_max_flow_m3h DECIMAL(10, 2),
|
|
|
|
-- Operational limits
|
|
max_starts_per_hour INTEGER DEFAULT 6,
|
|
min_run_time_seconds INTEGER DEFAULT 300,
|
|
max_continuous_run_hours INTEGER DEFAULT 24,
|
|
|
|
-- Rate of change limits (prevent sudden changes that damage equipment)
|
|
max_speed_change_hz_per_min DECIMAL(5, 2) DEFAULT 5.0,
|
|
|
|
-- Metadata
|
|
set_by VARCHAR(100),
|
|
set_at TIMESTAMP DEFAULT NOW(),
|
|
approved_by VARCHAR(100), -- Dual approval
|
|
approved_at TIMESTAMP,
|
|
notes TEXT,
|
|
|
|
PRIMARY KEY (station_id, pump_id),
|
|
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id),
|
|
|
|
-- Constraints
|
|
CONSTRAINT check_speed_limits CHECK (hard_min_speed_hz >= 15 AND hard_max_speed_hz <= 55),
|
|
CONSTRAINT check_min_max CHECK (hard_min_speed_hz < hard_max_speed_hz),
|
|
CONSTRAINT check_approved CHECK (approved_by IS NULL OR approved_by != set_by) -- Dual approval
|
|
);
|
|
|
|
COMMENT ON TABLE pump_safety_limits IS 'Hard operational limits enforced by Calejo Control adapter (Layer 2)';
|
|
COMMENT ON COLUMN pump_safety_limits.hard_min_speed_hz IS 'Minimum speed enforced by adapter (must be >= PLC physical limit)';
|
|
COMMENT ON COLUMN pump_safety_limits.hard_max_speed_hz IS 'Maximum speed enforced by adapter (must be <= PLC physical limit)';
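
Illustrative workflow for seeding limits with dual approval (station/pump IDs and user names are hypothetical; note that the SafetyLimitEnforcer in Section 4.3.1 only loads rows where approved_at IS NOT NULL):

-- First engineer proposes the limits
INSERT INTO pump_safety_limits
    (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
     hard_min_level_m, hard_max_level_m, max_speed_change_hz_per_min, set_by)
VALUES
    ('ST01', 'P1', 20.0, 50.0, 0.5, 4.5, 5.0, 'engineer_a');

-- Second engineer approves (check_approved enforces approved_by != set_by)
UPDATE pump_safety_limits
SET approved_by = 'engineer_b', approved_at = NOW()
WHERE station_id = 'ST01' AND pump_id = 'P1';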
|
|
3.2.2 safety_limit_violations
|
|
Purpose: Audit trail of all safety limit violations.
|
|
CREATE TABLE safety_limit_violations (
|
|
violation_id SERIAL PRIMARY KEY,
|
|
station_id VARCHAR(50) NOT NULL,
|
|
pump_id VARCHAR(50) NOT NULL,
|
|
requested_setpoint DECIMAL(5, 2),
|
|
enforced_setpoint DECIMAL(5, 2),
|
|
violations TEXT[], -- Array of violation descriptions
|
|
timestamp TIMESTAMP DEFAULT NOW(),
|
|
|
|
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
|
|
);
|
|
|
|
CREATE INDEX idx_violations_timestamp ON safety_limit_violations(timestamp DESC);
|
|
|
|
COMMENT ON TABLE safety_limit_violations IS 'Audit trail of safety limit violations (immutable)';
|
|
3.2.3 failsafe_events
|
|
Purpose: Record when adapter enters failsafe mode (e.g., database timeout).
|
|
CREATE TABLE failsafe_events (
|
|
event_id SERIAL PRIMARY KEY,
|
|
station_id VARCHAR(50) NOT NULL,
|
|
pump_id VARCHAR(50) NOT NULL,
|
|
event_type VARCHAR(50) NOT NULL, -- 'DATABASE_TIMEOUT', 'COMMUNICATION_LOSS', 'INVALID_DATA'
|
|
default_setpoint DECIMAL(5, 2),
|
|
triggered_by VARCHAR(100),
|
|
timestamp TIMESTAMP DEFAULT NOW(),
|
|
cleared_at TIMESTAMP,
|
|
notes TEXT,
|
|
|
|
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
|
|
);
|
|
|
|
CREATE INDEX idx_failsafe_events_active ON failsafe_events(station_id, pump_id, timestamp DESC)
|
|
WHERE cleared_at IS NULL;
|
|
|
|
COMMENT ON TABLE failsafe_events IS 'Record of failsafe mode activations';
|
|
3.2.4 emergency_stop_events
|
|
Purpose: Record emergency stop activations ("big red button").
|
|
CREATE TABLE emergency_stop_events (
|
|
event_id SERIAL PRIMARY KEY,
|
|
station_id VARCHAR(50), -- NULL = all stations
|
|
pump_id VARCHAR(50), -- NULL = all pumps at station
|
|
triggered_by VARCHAR(100) NOT NULL,
|
|
reason TEXT NOT NULL,
|
|
timestamp TIMESTAMP DEFAULT NOW(),
|
|
cleared_at TIMESTAMP,
|
|
cleared_by VARCHAR(100),
|
|
clear_notes TEXT,
|
|
|
|
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
|
|
);
|
|
|
|
CREATE INDEX idx_emergency_stop_active ON emergency_stop_events(timestamp DESC)
|
|
WHERE cleared_at IS NULL;
|
|
|
|
COMMENT ON TABLE emergency_stop_events IS 'Emergency stop event log (big red button)';
|
|
3.2.5 audit_log
|
|
Purpose: Immutable audit trail for compliance (IEC 62443, ISO 27001, NIS2).
|
|
CREATE TABLE audit_log (
|
|
log_id BIGSERIAL PRIMARY KEY,
|
|
timestamp TIMESTAMP DEFAULT NOW(),
|
|
event_type VARCHAR(50) NOT NULL,
|
|
severity VARCHAR(20) NOT NULL, -- 'INFO', 'WARNING', 'CRITICAL'
|
|
station_id VARCHAR(50),
|
|
pump_id VARCHAR(50),
|
|
user_id VARCHAR(100),
|
|
ip_address INET,
|
|
protocol VARCHAR(20), -- 'OPC_UA', 'MODBUS', 'REST_API'
|
|
action VARCHAR(100),
|
|
resource VARCHAR(200),
|
|
result VARCHAR(20), -- 'SUCCESS', 'FAILURE', 'DENIED'
|
|
event_data JSONB
|
|
);
|
|
|
|
CREATE INDEX idx_audit_log_timestamp ON audit_log(timestamp DESC);
|
|
CREATE INDEX idx_audit_log_severity ON audit_log(severity, timestamp DESC);
|
|
CREATE INDEX idx_audit_log_user ON audit_log(user_id, timestamp DESC);
|
|
|
|
-- Make audit log immutable (append-only)
|
|
CREATE RULE audit_log_no_update AS ON UPDATE TO audit_log DO INSTEAD NOTHING;
|
|
CREATE RULE audit_log_no_delete AS ON DELETE TO audit_log DO INSTEAD NOTHING;
|
|
|
|
COMMENT ON TABLE audit_log IS 'Immutable audit trail for compliance (IEC 62443, ISO 27001, NIS2 Directive)';
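
For reference, a single AuditLogger.log() call reduces to an append-only INSERT such as the following (values are illustrative only; the AuditLogger component itself is specified in v1.0):

INSERT INTO audit_log
    (event_type, severity, station_id, pump_id, user_id, ip_address,
     protocol, action, resource, result, event_data)
VALUES
    ('SAFETY_VIOLATION', 'WARNING', 'ST01', 'P1', 'system', '10.0.0.5',
     'REST_API', 'enforce_setpoint', 'pumps/ST01/P1', 'CLAMPED',
     '{"requested": 52.0, "enforced": 50.0}'::jsonb);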
|
|
3.3 Database User Permissions
|
|
-- Create least-privilege user for the adapter (SELECT on configuration/plan tables, INSERT-only on audit and safety event tables)
|
|
CREATE USER control_reader WITH PASSWORD 'secure_password';
|
|
|
|
GRANT CONNECT ON DATABASE calejo TO control_reader;
|
|
GRANT USAGE ON SCHEMA public TO control_reader;
|
|
GRANT SELECT ON pump_stations, pumps, pump_plans, pump_feedback, pump_safety_limits TO control_reader;
|
|
GRANT INSERT ON safety_limit_violations, failsafe_events, emergency_stop_events, audit_log TO control_reader;
|
|
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO control_reader;
|
|
4. Core Component Specifications
|
|
4.1 Auto-Discovery Module
|
|
File: src/core/auto_discovery.py
|
|
Purpose: Automatically discover pump stations and pumps from database on startup.
|
|
(Implementation unchanged from v1.0 - see previous specification)
|
|
4.2 Security Layer
|
|
File: src/core/security.py
|
|
Purpose: Provide authentication, authorization, encryption, and audit logging.
|
|
(Implementation from v1.0, with additions below)
|
|
4.2.1 Compliance Requirements
|
|
The security layer must meet the following standards for Italian utilities:
|
|
IEC 62443 (Industrial Cybersecurity):
|
|
Zone and conduit model (IT network, DMZ, OT network)
|
|
Defense in depth
|
|
Role-based access control (RBAC)
|
|
Audit logging
|
|
ISO 27001 (Information Security Management):
|
|
Risk assessment
|
|
Access control
|
|
Cryptography
|
|
Incident management
|
|
NIS2 Directive (EU Critical Infrastructure):
|
|
Risk management measures
|
|
Incident reporting
|
|
Supply chain security
|
|
Cybersecurity training
|
|
Implementation:
|
|
class ComplianceManager:
|
|
"""
|
|
Ensure compliance with IEC 62443, ISO 27001, and NIS2 Directive.
|
|
"""
|
|
|
|
def __init__(self, audit_logger: AuditLogger):
|
|
self.audit_logger = audit_logger
|
|
|
|
def log_compliance_event(
|
|
self,
|
|
standard: str, # 'IEC_62443', 'ISO_27001', 'NIS2'
|
|
requirement: str,
|
|
event_type: str,
|
|
details: dict
|
|
):
|
|
"""Log compliance-related event."""
|
|
self.audit_logger.log(
|
|
event_type='COMPLIANCE',
|
|
severity='INFO',
|
|
event_data={
|
|
'standard': standard,
|
|
'requirement': requirement,
|
|
'event_type': event_type,
|
|
'details': details
|
|
}
|
|
)
|
|
4.3 Safety Framework ⚠️ NEW
|
|
File: src/core/safety.py
|
|
Purpose: Multi-layer safety mechanisms to prevent equipment damage and operational hazards.
|
|
4.3.1 Safety Limit Enforcer
|
|
Purpose: Enforce hard operational limits on all setpoints (last line of defense).
|
|
from typing import Tuple, List, Optional, Dict
|
|
from dataclasses import dataclass
|
|
import structlog
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
@dataclass
|
|
class SafetyLimits:
|
|
"""Safety limits for a pump."""
|
|
hard_min_speed_hz: float
|
|
hard_max_speed_hz: float
|
|
hard_min_level_m: Optional[float]
|
|
hard_max_level_m: Optional[float]
|
|
hard_max_power_kw: Optional[float]
|
|
max_speed_change_hz_per_min: float
|
|
|
|
class SafetyLimitEnforcer:
|
|
"""
|
|
Enforces multi-layer safety limits on all setpoints.
|
|
|
|
This is the LAST line of defense before setpoints are exposed to SCADA.
|
|
ALL setpoints MUST pass through this enforcer.
|
|
|
|
Three-Layer Architecture:
|
|
- Layer 1: Physical Hard Limits (PLC/VFD) - 15-55 Hz
|
|
- Layer 2: Station Safety Limits (Database) - 20-50 Hz (enforced here)
|
|
- Layer 3: Optimization Constraints (Calejo Optimize) - 25-45 Hz
|
|
"""
|
|
|
|
def __init__(self, db_client: DatabaseClient, audit_logger: AuditLogger):
|
|
self.db_client = db_client
|
|
self.audit_logger = audit_logger
|
|
self.safety_limits_cache: Dict[Tuple[str, str], SafetyLimits] = {}
|
|
self.previous_setpoints: Dict[Tuple[str, str], float] = {}
|
|
self._load_safety_limits()
|
|
|
|
def _load_safety_limits(self):
|
|
"""Load all safety limits from database into cache."""
|
|
query = """
|
|
SELECT station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
|
|
hard_min_level_m, hard_max_level_m, hard_max_power_kw,
|
|
max_speed_change_hz_per_min
|
|
FROM pump_safety_limits
|
|
WHERE approved_at IS NOT NULL -- Only use approved limits
|
|
"""
|
|
limits = self.db_client.execute_query(query)
|
|
|
|
for limit in limits:
|
|
key = (limit['station_id'], limit['pump_id'])
|
|
self.safety_limits_cache[key] = SafetyLimits(
|
|
hard_min_speed_hz=limit['hard_min_speed_hz'],
|
|
hard_max_speed_hz=limit['hard_max_speed_hz'],
|
|
hard_min_level_m=limit.get('hard_min_level_m'),
|
|
hard_max_level_m=limit.get('hard_max_level_m'),
|
|
hard_max_power_kw=limit.get('hard_max_power_kw'),
|
|
max_speed_change_hz_per_min=limit['max_speed_change_hz_per_min']
|
|
)
|
|
|
|
logger.info("safety_limits_loaded", pump_count=len(limits))
|
|
|
|
def enforce_setpoint(
|
|
self,
|
|
station_id: str,
|
|
pump_id: str,
|
|
setpoint: float
|
|
) -> Tuple[float, List[str]]:
|
|
"""
|
|
Enforce safety limits on setpoint.
|
|
|
|
Args:
|
|
station_id: Station identifier
|
|
pump_id: Pump identifier
|
|
setpoint: Proposed setpoint (Hz)
|
|
|
|
Returns:
|
|
Tuple of (enforced_setpoint, violations)
|
|
- enforced_setpoint: Safe setpoint (clamped if necessary)
|
|
- violations: List of safety violations (for logging/alerting)
|
|
"""
|
|
violations = []
|
|
enforced_setpoint = setpoint
|
|
|
|
# Get safety limits
|
|
key = (station_id, pump_id)
|
|
limits = self.safety_limits_cache.get(key)
|
|
|
|
if not limits:
|
|
logger.error(
|
|
"no_safety_limits",
|
|
station_id=station_id,
|
|
pump_id=pump_id
|
|
)
|
|
# CRITICAL: No safety limits defined - reject setpoint
|
|
self.audit_logger.log(
|
|
event_type='SAFETY_ERROR',
|
|
severity='CRITICAL',
|
|
station_id=station_id,
|
|
pump_id=pump_id,
|
|
action='enforce_setpoint',
|
|
result='REJECTED',
|
|
event_data={'reason': 'NO_SAFETY_LIMITS_DEFINED'}
|
|
)
|
|
return (0.0, ["NO_SAFETY_LIMITS_DEFINED"])
|
|
|
|
# Check minimum speed
|
|
if enforced_setpoint < limits.hard_min_speed_hz:
|
|
violations.append(
|
|
f"BELOW_MIN_SPEED: {enforced_setpoint:.2f} < {limits.hard_min_speed_hz:.2f}"
|
|
)
|
|
enforced_setpoint = limits.hard_min_speed_hz
|
|
|
|
# Check maximum speed
|
|
if enforced_setpoint > limits.hard_max_speed_hz:
|
|
violations.append(
|
|
f"ABOVE_MAX_SPEED: {enforced_setpoint:.2f} > {limits.hard_max_speed_hz:.2f}"
|
|
)
|
|
enforced_setpoint = limits.hard_max_speed_hz
|
|
|
|
# Check rate of change (prevent sudden speed changes that damage equipment)
|
|
previous_setpoint = self.previous_setpoints.get(key)
|
|
if previous_setpoint is not None:
|
|
max_change = limits.max_speed_change_hz_per_min * 5.0 # 5-minute interval
|
|
actual_change = abs(enforced_setpoint - previous_setpoint)
|
|
|
|
if actual_change > max_change:
|
|
# Limit rate of change
|
|
direction = 1 if enforced_setpoint > previous_setpoint else -1
|
|
enforced_setpoint = previous_setpoint + (direction * max_change)
|
|
violations.append(
|
|
f"RATE_OF_CHANGE_LIMITED: {actual_change:.2f} Hz > {max_change:.2f} Hz"
|
|
)
|
|
|
|
# Store current setpoint for next rate-of-change check
|
|
self.previous_setpoints[key] = enforced_setpoint
|
|
|
|
# Log violations
|
|
if violations:
|
|
logger.warning(
|
|
"safety_limit_violation",
|
|
station_id=station_id,
|
|
pump_id=pump_id,
|
|
requested_setpoint=setpoint,
|
|
enforced_setpoint=enforced_setpoint,
|
|
violations=violations
|
|
)
|
|
|
|
# Record violation in database for audit
|
|
self._record_violation(station_id, pump_id, setpoint, enforced_setpoint, violations)
|
|
|
|
# Log to audit trail
|
|
self.audit_logger.log(
|
|
event_type='SAFETY_VIOLATION',
|
|
severity='WARNING',
|
|
station_id=station_id,
|
|
pump_id=pump_id,
|
|
action='enforce_setpoint',
|
|
result='CLAMPED',
|
|
event_data={
|
|
'requested': setpoint,
|
|
'enforced': enforced_setpoint,
|
|
'violations': violations
|
|
}
|
|
)
|
|
|
|
return (enforced_setpoint, violations)
|
|
|
|
def _record_violation(
|
|
self,
|
|
station_id: str,
|
|
pump_id: str,
|
|
requested: float,
|
|
enforced: float,
|
|
violations: List[str]
|
|
):
|
|
"""Record safety limit violation in database."""
|
|
query = """
|
|
INSERT INTO safety_limit_violations
|
|
(station_id, pump_id, requested_setpoint, enforced_setpoint, violations, timestamp)
|
|
VALUES (%s, %s, %s, %s, %s, NOW())
|
|
"""
|
|
self.db_client.execute(query, (station_id, pump_id, requested, enforced, violations))
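
Usage sketch (the station/pump IDs and the requested value are hypothetical; db_client and audit_logger are the adapter components described elsewhere in this spec):

enforcer = SafetyLimitEnforcer(db_client, audit_logger)

safe_hz, violations = enforcer.enforce_setpoint("ST01", "P1", 52.0)
if violations:
    # e.g. ["ABOVE_MAX_SPEED: 52.00 > 50.00"] - only the clamped value is exposed to SCADA
    logger.warning("setpoint_clamped", setpoint_hz=safe_hz, violations=violations)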
|
|
4.3.2 Database Watchdog
|
|
Purpose: Detect if optimization hasn't updated plans for 20 minutes and revert to safe defaults.
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
|
|
class DatabaseWatchdog:
|
|
"""
|
|
Monitors database updates and triggers failsafe mode if updates stop.
|
|
|
|
Requirement: Detect if Calejo Optimize hasn't updated pump plans for 20 minutes.
|
|
Action: Revert to default safe setpoints (existing pump configuration).
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
db_client: DatabaseClient,
|
|
audit_logger: AuditLogger,
|
|
alert_manager: AlertManager,
|
|
timeout_seconds: int = 1200, # 20 minutes
|
|
check_interval_seconds: int = 60 # Check every minute
|
|
):
|
|
self.db_client = db_client
|
|
self.audit_logger = audit_logger
|
|
self.alert_manager = alert_manager
|
|
self.timeout_seconds = timeout_seconds
|
|
self.check_interval_seconds = check_interval_seconds
|
|
self.failsafe_mode: Dict[Tuple[str, str], bool] = {}
|
|
|
|
async def start(self):
|
|
"""Start watchdog monitoring loop."""
|
|
logger.info(
|
|
"database_watchdog_started",
|
|
timeout_seconds=self.timeout_seconds,
|
|
check_interval_seconds=self.check_interval_seconds
|
|
)
|
|
|
|
while True:
|
|
await asyncio.sleep(self.check_interval_seconds)
|
|
await self._check_updates()
|
|
|
|
async def _check_updates(self):
|
|
"""Check for stale data and trigger failsafe if needed."""
|
|
now = datetime.now()
|
|
|
|
# Query last update time for all pumps
|
|
query = """
|
|
SELECT station_id, pump_id, MAX(plan_created_at) as last_update
|
|
FROM pump_plans
|
|
GROUP BY station_id, pump_id
|
|
"""
|
|
|
|
results = self.db_client.execute_query(query)
|
|
|
|
for row in results:
|
|
key = (row['station_id'], row['pump_id'])
|
|
last_update = row['last_update']
|
|
|
|
if last_update:
|
|
time_since_update = (now - last_update).total_seconds()
|
|
|
|
# Check if update is stale
|
|
if time_since_update > self.timeout_seconds:
|
|
if not self.failsafe_mode.get(key, False):
|
|
# Trigger failsafe mode
|
|
await self._trigger_failsafe(
|
|
row['station_id'],
|
|
row['pump_id'],
|
|
time_since_update,
|
|
last_update
|
|
)
|
|
self.failsafe_mode[key] = True
|
|
else:
|
|
# Updates are current, exit failsafe mode if active
|
|
if self.failsafe_mode.get(key, False):
|
|
await self._clear_failsafe(row['station_id'], row['pump_id'])
|
|
self.failsafe_mode[key] = False
|
|
|
|
async def _trigger_failsafe(
|
|
self,
|
|
station_id: str,
|
|
pump_id: str,
|
|
time_since_update: float,
|
|
last_update: datetime
|
|
):
|
|
"""
|
|
Trigger failsafe mode for a pump.
|
|
|
|
Sets pump to default safe setpoint (existing configuration).
|
|
"""
|
|
logger.critical(
|
|
"watchdog_failsafe_triggered",
|
|
station_id=station_id,
|
|
pump_id=pump_id,
|
|
time_since_update_seconds=time_since_update,
|
|
last_update=last_update.isoformat()
|
|
)
|
|
|
|
# Get default safe setpoint from pump configuration
|
|
default_setpoint = await self._get_default_setpoint(station_id, pump_id)
|
|
|
|
# Record failsafe event
|
|
query = """
|
|
INSERT INTO failsafe_events
|
|
(station_id, pump_id, event_type, default_setpoint, triggered_by, timestamp)
|
|
VALUES (%s, %s, 'DATABASE_TIMEOUT', %s, 'DATABASE_WATCHDOG', NOW())
|
|
"""
|
|
self.db_client.execute(query, (station_id, pump_id, default_setpoint))
|
|
|
|
# Log to audit trail
|
|
self.audit_logger.log(
|
|
event_type='FAILSAFE_TRIGGERED',
|
|
severity='CRITICAL',
|
|
station_id=station_id,
|
|
pump_id=pump_id,
|
|
action='trigger_failsafe',
|
|
result='SUCCESS',
|
|
event_data={
|
|
'reason': 'DATABASE_TIMEOUT',
|
|
'time_since_update_seconds': time_since_update,
|
|
'default_setpoint': default_setpoint
|
|
}
|
|
)
|
|
|
|
# Send alert to operators
|
|
await self.alert_manager.send_alert(
|
|
severity='CRITICAL',
|
|
title=f'FAILSAFE MODE: {station_id}/{pump_id}',
|
|
message=(
|
|
f"No database updates for {time_since_update/60:.1f} minutes. "
|
|
f"Reverting to default setpoint: {default_setpoint} Hz. "
|
|
f"Last update: {last_update.isoformat()}"
|
|
),
|
|
station_id=station_id,
|
|
pump_id=pump_id
|
|
)
|
|
|
|
async def _clear_failsafe(self, station_id: str, pump_id: str):
|
|
"""Clear failsafe mode when database updates resume."""
|
|
logger.info(
|
|
"watchdog_failsafe_cleared",
|
|
station_id=station_id,
|
|
pump_id=pump_id
|
|
)
|
|
|
|
# Update failsafe event
|
|
query = """
|
|
UPDATE failsafe_events
|
|
SET cleared_at = NOW()
|
|
WHERE station_id = %s AND pump_id = %s AND cleared_at IS NULL
|
|
"""
|
|
self.db_client.execute(query, (station_id, pump_id))
|
|
|
|
# Log to audit trail
|
|
self.audit_logger.log(
|
|
event_type='FAILSAFE_CLEARED',
|
|
severity='INFO',
|
|
station_id=station_id,
|
|
pump_id=pump_id,
|
|
action='clear_failsafe',
|
|
result='SUCCESS',
|
|
event_data={'reason': 'DATABASE_UPDATES_RESUMED'}
|
|
)
|
|
|
|
# Send alert
|
|
await self.alert_manager.send_alert(
|
|
severity='INFO',
|
|
title=f'Failsafe Cleared: {station_id}/{pump_id}',
|
|
message='Database updates resumed. Returning to normal operation.',
|
|
station_id=station_id,
|
|
pump_id=pump_id
|
|
)
|
|
|
|
async def _get_default_setpoint(self, station_id: str, pump_id: str) -> float:
|
|
"""
|
|
Get default safe setpoint for pump (existing configuration).
|
|
|
|
Returns pump's configured default_setpoint_hz.
|
|
"""
|
|
query = """
|
|
SELECT default_setpoint_hz
|
|
FROM pumps
|
|
WHERE station_id = %s AND pump_id = %s
|
|
"""
|
|
result = self.db_client.execute_query(query, (station_id, pump_id))
|
|
|
|
if result and result[0]['default_setpoint_hz']:
|
|
return float(result[0]['default_setpoint_hz'])
|
|
|
|
# Ultimate fallback (should never reach here)
|
|
logger.error(
|
|
"no_default_setpoint_configured",
|
|
station_id=station_id,
|
|
pump_id=pump_id
|
|
)
|
|
return 35.0 # Conservative fallback
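
The watchdog runs as a long-lived coroutine. A minimal wiring sketch (component construction at adapter startup is assumed; the values mirror the defaults above):

watchdog = DatabaseWatchdog(
    db_client=db_client,
    audit_logger=audit_logger,
    alert_manager=alert_manager,
    timeout_seconds=1200,          # 20 minutes
    check_interval_seconds=60,     # check once per minute
)
# scheduled from inside the adapter's running event loop
watchdog_task = asyncio.create_task(watchdog.start())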
|
|
4.3.3 Emergency Stop Manager
|
|
Purpose: Provide "big red button" to immediately halt optimization control.
|
|
class EmergencyStopManager:
|
|
"""
|
|
Manages emergency stop functionality ("big red button").
|
|
|
|
Provides immediate halt of optimization control and reversion to safe defaults.
|
|
Supports both single-pump and system-wide emergency stop.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
db_client: DatabaseClient,
|
|
audit_logger: AuditLogger,
|
|
alert_manager: AlertManager
|
|
):
|
|
self.db_client = db_client
|
|
self.audit_logger = audit_logger
|
|
self.alert_manager = alert_manager
|
|
self.emergency_stop_active: Dict[Tuple[Optional[str], Optional[str]], int] = {}
|
|
|
|
async def trigger_emergency_stop(
|
|
self,
|
|
triggered_by: str,
|
|
reason: str,
|
|
station_id: Optional[str] = None,
|
|
pump_id: Optional[str] = None
|
|
) -> int:
|
|
"""
|
|
Trigger emergency stop.
|
|
|
|
Args:
|
|
triggered_by: User or system that triggered stop
|
|
reason: Reason for emergency stop
|
|
station_id: Optional - limit to specific station (None = all stations)
|
|
pump_id: Optional - limit to specific pump (None = all pumps at station)
|
|
|
|
Returns:
|
|
event_id: Emergency stop event ID
|
|
"""
|
|
logger.critical(
|
|
"emergency_stop_triggered",
|
|
triggered_by=triggered_by,
|
|
reason=reason,
|
|
station_id=station_id,
|
|
pump_id=pump_id
|
|
)
|
|
|
|
# Record event
|
|
query = """
|
|
INSERT INTO emergency_stop_events
|
|
(station_id, pump_id, triggered_by, reason, timestamp)
|
|
VALUES (%s, %s, %s, %s, NOW())
|
|
RETURNING event_id
|
|
"""
|
|
result = self.db_client.execute(query, (station_id, pump_id, triggered_by, reason))
|
|
event_id = result[0]['event_id']
|
|
|
|
# Set emergency stop flag
|
|
key = (station_id, pump_id)
|
|
self.emergency_stop_active[key] = event_id
|
|
|
|
# Set all affected pumps to default setpoints
|
|
affected_pumps = await self._get_affected_pumps(station_id, pump_id)
|
|
|
|
for pump in affected_pumps:
|
|
await self._set_pump_to_default(
|
|
pump['station_id'],
|
|
pump['pump_id'],
|
|
event_id
|
|
)
|
|
|
|
# Log to audit trail
|
|
self.audit_logger.log(
|
|
event_type='EMERGENCY_STOP',
|
|
severity='CRITICAL',
|
|
station_id=station_id,
|
|
pump_id=pump_id,
|
|
user_id=triggered_by,
|
|
action='trigger_emergency_stop',
|
|
result='SUCCESS',
|
|
event_data={
|
|
'event_id': event_id,
|
|
'reason': reason,
|
|
'affected_pumps': len(affected_pumps)
|
|
}
|
|
)
|
|
|
|
# Send alerts
|
|
await self.alert_manager.send_alert(
|
|
severity='CRITICAL',
|
|
title='EMERGENCY STOP ACTIVATED',
|
|
message=(
|
|
f"Emergency stop triggered by {triggered_by}. "
|
|
f"Reason: {reason}. "
|
|
f"Scope: {self._get_scope_description(station_id, pump_id)}. "
|
|
f"Affected pumps: {len(affected_pumps)}"
|
|
),
|
|
station_id=station_id,
|
|
pump_id=pump_id
|
|
)
|
|
|
|
return event_id
|
|
|
|
async def clear_emergency_stop(
|
|
self,
|
|
event_id: int,
|
|
cleared_by: str,
|
|
notes: str
|
|
):
|
|
"""Clear emergency stop and resume normal operation."""
|
|
logger.info(
|
|
"emergency_stop_cleared",
|
|
event_id=event_id,
|
|
cleared_by=cleared_by
|
|
)
|
|
|
|
# Update event record
|
|
query = """
|
|
UPDATE emergency_stop_events
|
|
SET cleared_at = NOW(), cleared_by = %s, clear_notes = %s
|
|
WHERE event_id = %s
|
|
RETURNING station_id, pump_id
|
|
"""
|
|
result = self.db_client.execute(query, (cleared_by, notes, event_id))
|
|
|
|
if result:
|
|
station_id = result[0]['station_id']
|
|
pump_id = result[0]['pump_id']
|
|
|
|
# Clear emergency stop flag
|
|
key = (station_id, pump_id)
|
|
if key in self.emergency_stop_active:
|
|
del self.emergency_stop_active[key]
|
|
|
|
# Log to audit trail
|
|
self.audit_logger.log(
|
|
event_type='EMERGENCY_STOP_CLEARED',
|
|
severity='INFO',
|
|
station_id=station_id,
|
|
pump_id=pump_id,
|
|
user_id=cleared_by,
|
|
action='clear_emergency_stop',
|
|
result='SUCCESS',
|
|
event_data={
|
|
'event_id': event_id,
|
|
'notes': notes
|
|
}
|
|
)
|
|
|
|
# Send alert
|
|
await self.alert_manager.send_alert(
|
|
severity='INFO',
|
|
title='Emergency Stop Cleared',
|
|
message=f"Emergency stop cleared by {cleared_by}. Notes: {notes}",
|
|
station_id=station_id,
|
|
pump_id=pump_id
|
|
)
|
|
|
|
def is_emergency_stop_active(
|
|
self,
|
|
station_id: str,
|
|
pump_id: str
|
|
) -> bool:
|
|
"""Check if emergency stop is active for a pump."""
|
|
# Check specific pump
|
|
if (station_id, pump_id) in self.emergency_stop_active:
|
|
return True
|
|
|
|
# Check station-wide
|
|
if (station_id, None) in self.emergency_stop_active:
|
|
return True
|
|
|
|
# Check system-wide
|
|
if (None, None) in self.emergency_stop_active:
|
|
return True
|
|
|
|
return False
|
|
|
|
async def _get_affected_pumps(
|
|
self,
|
|
station_id: Optional[str],
|
|
pump_id: Optional[str]
|
|
) -> List[Dict]:
|
|
"""Get list of pumps affected by emergency stop."""
|
|
if station_id and pump_id:
|
|
# Single pump
|
|
query = """
|
|
SELECT station_id, pump_id
|
|
FROM pumps
|
|
WHERE station_id = %s AND pump_id = %s AND active = TRUE
|
|
"""
|
|
return self.db_client.execute_query(query, (station_id, pump_id))
|
|
elif station_id:
|
|
# All pumps at station
|
|
query = """
|
|
SELECT station_id, pump_id
|
|
FROM pumps
|
|
WHERE station_id = %s AND active = TRUE
|
|
"""
|
|
return self.db_client.execute_query(query, (station_id,))
|
|
else:
|
|
# ALL pumps in system
|
|
query = """
|
|
SELECT station_id, pump_id
|
|
FROM pumps
|
|
WHERE active = TRUE
|
|
"""
|
|
return self.db_client.execute_query(query)
|
|
|
|
async def _set_pump_to_default(
|
|
self,
|
|
station_id: str,
|
|
pump_id: str,
|
|
event_id: int
|
|
):
|
|
"""Set pump to default safe setpoint."""
|
|
# Get default setpoint
|
|
query = """
|
|
SELECT default_setpoint_hz
|
|
FROM pumps
|
|
WHERE station_id = %s AND pump_id = %s
|
|
"""
|
|
result = self.db_client.execute_query(query, (station_id, pump_id))
|
|
|
|
if result:
|
|
default_setpoint = result[0]['default_setpoint_hz']
|
|
|
|
logger.info(
|
|
"pump_set_to_default",
|
|
station_id=station_id,
|
|
pump_id=pump_id,
|
|
default_setpoint=default_setpoint,
|
|
event_id=event_id
|
|
)
|
|
|
|
# Note: Actual setpoint override is handled by SetpointManager
|
|
# This method just logs the action
|
|
|
|
def _get_scope_description(
|
|
self,
|
|
station_id: Optional[str],
|
|
pump_id: Optional[str]
|
|
) -> str:
|
|
"""Get human-readable description of emergency stop scope."""
|
|
if station_id and pump_id:
|
|
return f"Single pump ({station_id}/{pump_id})"
|
|
elif station_id:
|
|
return f"All pumps at station {station_id}"
|
|
else:
|
|
return "ALL PUMPS SYSTEM-WIDE"
|
|
4.3.4 Alert Manager
|
|
Purpose: Multi-channel alerting (email, SMS, SCADA, webhooks).
|
|
import aiosmtplib
|
|
from email.message import EmailMessage
|
|
from twilio.rest import Client as TwilioClient
|
|
import httpx
|
|
|
|
class AlertManager:
|
|
"""
|
|
Send alerts via multiple channels.
|
|
|
|
Supports:
|
|
- Email (SMTP)
|
|
- SMS (Twilio)
|
|
- SCADA HMI alarms (OPC UA)
|
|
- Webhooks (HTTP POST)
|
|
"""
|
|
|
|
def __init__(self, config: AlertConfig):
|
|
self.config = config
|
|
self.twilio_client = None
|
|
if config.sms_enabled:
|
|
self.twilio_client = TwilioClient(
|
|
config.twilio_account_sid,
|
|
config.twilio_auth_token
|
|
)
|
|
|
|
async def send_alert(
|
|
self,
|
|
severity: str, # 'INFO', 'WARNING', 'CRITICAL'
|
|
title: str,
|
|
message: str,
|
|
station_id: Optional[str] = None,
|
|
pump_id: Optional[str] = None
|
|
):
|
|
"""Send alert via all configured channels."""
|
|
|
|
# Email
|
|
if self.config.email_enabled:
|
|
await self._send_email(severity, title, message, station_id, pump_id)
|
|
|
|
# SMS (for critical alerts only)
|
|
if severity == 'CRITICAL' and self.config.sms_enabled:
|
|
await self._send_sms(title, message)
|
|
|
|
# SCADA HMI alarm
|
|
if self.config.scada_alarms_enabled:
|
|
await self._trigger_scada_alarm(severity, title, message, station_id, pump_id)
|
|
|
|
# Webhook
|
|
if self.config.webhook_enabled:
|
|
await self._send_webhook(severity, title, message, station_id, pump_id)
|
|
|
|
async def _send_email(
|
|
self,
|
|
severity: str,
|
|
title: str,
|
|
message: str,
|
|
station_id: Optional[str],
|
|
pump_id: Optional[str]
|
|
):
|
|
"""Send email alert via SMTP."""
|
|
try:
|
|
email = EmailMessage()
|
|
email['From'] = self.config.email_from
|
|
email['To'] = ', '.join(self.config.email_recipients)
|
|
email['Subject'] = f"[{severity}] Calejo Control: {title}"
|
|
|
|
body = f"""
|
|
Calejo Control Alert
|
|
|
|
Severity: {severity}
|
|
Title: {title}
|
|
Station: {station_id or 'N/A'}
|
|
Pump: {pump_id or 'N/A'}
|
|
Timestamp: {datetime.now().isoformat()}
|
|
|
|
Message:
|
|
{message}
|
|
|
|
---
|
|
This is an automated alert from Calejo Control.
|
|
"""
|
|
email.set_content(body)
|
|
|
|
await aiosmtplib.send(
|
|
email,
|
|
hostname=self.config.smtp_host,
|
|
port=self.config.smtp_port,
|
|
username=self.config.smtp_username,
|
|
password=self.config.smtp_password,
|
|
use_tls=self.config.smtp_use_tls
|
|
)
|
|
|
|
logger.info("email_alert_sent", severity=severity, title=title)
|
|
except Exception as e:
|
|
logger.error("email_alert_failed", error=str(e))
|
|
|
|
async def _send_sms(self, title: str, message: str):
|
|
"""Send SMS alert via Twilio."""
|
|
try:
|
|
for phone_number in self.config.sms_recipients:
|
|
self.twilio_client.messages.create(
|
|
body=f"Calejo Control CRITICAL: {title}. {message[:100]}",
|
|
from_=self.config.twilio_phone_number,
|
|
to=phone_number
|
|
)
|
|
|
|
logger.info("sms_alert_sent", title=title)
|
|
except Exception as e:
|
|
logger.error("sms_alert_failed", error=str(e))
|
|
|
|
async def _trigger_scada_alarm(
|
|
self,
|
|
severity: str,
|
|
title: str,
|
|
message: str,
|
|
station_id: Optional[str],
|
|
pump_id: Optional[str]
|
|
):
|
|
"""Trigger alarm in SCADA HMI via OPC UA."""
|
|
# Implementation depends on SCADA system
|
|
# Typically write to OPC UA alarm node
|
|
pass
|
|
|
|
async def _send_webhook(
|
|
self,
|
|
severity: str,
|
|
title: str,
|
|
message: str,
|
|
station_id: Optional[str],
|
|
pump_id: Optional[str]
|
|
):
|
|
"""Send webhook alert via HTTP POST."""
|
|
try:
|
|
payload = {
|
|
'severity': severity,
|
|
'title': title,
|
|
'message': message,
|
|
'station_id': station_id,
|
|
'pump_id': pump_id,
|
|
'timestamp': datetime.now().isoformat()
|
|
}
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.post(
|
|
self.config.webhook_url,
|
|
json=payload,
|
|
headers={'Authorization': f'Bearer {self.config.webhook_token}'},
|
|
timeout=10.0
|
|
)
|
|
response.raise_for_status()
|
|
|
|
logger.info("webhook_alert_sent", severity=severity, title=title)
|
|
except Exception as e:
|
|
logger.error("webhook_alert_failed", error=str(e))
|
|
|
|
@dataclass
|
|
class AlertConfig:
|
|
"""Alert configuration."""
|
|
# Email
|
|
email_enabled: bool = True
|
|
email_from: str = "calejo-control@example.com"
|
|
email_recipients: Optional[List[str]] = None
|
|
smtp_host: str = "smtp.gmail.com"
|
|
smtp_port: int = 587
|
|
smtp_username: str = ""
|
|
smtp_password: str = ""
|
|
smtp_use_tls: bool = True
|
|
|
|
# SMS
|
|
sms_enabled: bool = True
|
|
sms_recipients: Optional[List[str]] = None
|
|
twilio_account_sid: str = ""
|
|
twilio_auth_token: str = ""
|
|
twilio_phone_number: str = ""
|
|
|
|
# SCADA
|
|
scada_alarms_enabled: bool = True
|
|
|
|
# Webhook
|
|
webhook_enabled: bool = True
|
|
webhook_url: str = ""
|
|
webhook_token: str = ""
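
Wiring sketch for the alert path (recipient addresses and credentials are placeholders; channels that are not configured should simply be disabled):

alert_config = AlertConfig(
    email_enabled=True,
    email_recipients=["operator1@utility.it", "operator2@utility.it"],
    smtp_username="calejo-control@example.com",
    smtp_password="smtp_password",
    sms_enabled=False,            # disable channels without credentials
    scada_alarms_enabled=False,
    webhook_enabled=False,
)
alert_manager = AlertManager(alert_config)

await alert_manager.send_alert(
    severity="WARNING",
    title="Alert path test",
    message="Verifying e-mail delivery during commissioning",
    station_id="ST01",
)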
|
|
4.4 Plan-to-Setpoint Logic Engine
|
|
File: src/core/setpoint_logic.py
|
|
(Implementation unchanged from v1.0 - see previous specification)
|
|
Integration with Safety Framework:
|
|
class SetpointManager:
|
|
"""
|
|
Manages setpoint calculation for all pumps.
|
|
|
|
Integrates with safety framework to enforce limits and handle failsafe mode.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
discovery: AdapterAutoDiscovery,
|
|
db_client: DatabaseClient,
|
|
safety_enforcer: SafetyLimitEnforcer,
|
|
emergency_stop_manager: EmergencyStopManager,
|
|
watchdog: DatabaseWatchdog
|
|
):
|
|
self.discovery = discovery
|
|
self.db_client = db_client
|
|
self.safety_enforcer = safety_enforcer
|
|
self.emergency_stop_manager = emergency_stop_manager
|
|
self.watchdog = watchdog
|
|
|
|
# Create calculator instances
|
|
self.calculators = {
|
|
'DIRECT_SPEED': DirectSpeedCalculator(),
|
|
'LEVEL_CONTROLLED': LevelControlledCalculator(),
|
|
'POWER_CONTROLLED': PowerControlledCalculator()
|
|
}
|
|
|
|
def get_current_setpoint(self, station_id: str, pump_id: str) -> Optional[float]:
|
|
"""
|
|
Get current setpoint for a pump.
|
|
|
|
Integrates safety checks:
|
|
1. Check if emergency stop is active
|
|
2. Check if failsafe mode is active
|
|
3. Calculate setpoint from optimization plan
|
|
4. Enforce safety limits
|
|
|
|
Returns:
|
|
Setpoint in Hz, or None if no valid plan exists
|
|
"""
|
|
# Check emergency stop
|
|
if self.emergency_stop_manager.is_emergency_stop_active(station_id, pump_id):
|
|
logger.info(
|
|
"emergency_stop_active",
|
|
station_id=station_id,
|
|
pump_id=pump_id
|
|
)
|
|
return self._get_default_setpoint(station_id, pump_id)
|
|
|
|
# Check failsafe mode
|
|
if self.watchdog.failsafe_mode.get((station_id, pump_id), False):
|
|
logger.info(
|
|
"failsafe_mode_active",
|
|
station_id=station_id,
|
|
pump_id=pump_id
|
|
)
|
|
return self._get_default_setpoint(station_id, pump_id)
|
|
|
|
# Get pump info
|
|
pump_info = self.discovery.get_pump(station_id, pump_id)
|
|
if not pump_info:
|
|
logger.error("pump_not_found", station_id=station_id, pump_id=pump_id)
|
|
return None
|
|
|
|
# Get current optimization plan
|
|
plan = self.db_client.get_current_plan(station_id, pump_id)
|
|
if not plan:
|
|
logger.warning("no_active_plan", station_id=station_id, pump_id=pump_id)
|
|
return self._get_default_setpoint(station_id, pump_id)
|
|
|
|
# Get latest feedback (optional)
|
|
feedback = self.db_client.get_latest_feedback(station_id, pump_id)
|
|
|
|
# Get appropriate calculator
|
|
calculator = self.calculators.get(pump_info.control_type)
|
|
if not calculator:
|
|
logger.error("unknown_control_type", control_type=pump_info.control_type)
|
|
return None
|
|
|
|
# Calculate setpoint
|
|
setpoint = calculator.calculate_setpoint(plan, feedback, pump_info)
|
|
|
|
# Enforce safety limits (LAST LINE OF DEFENSE)
|
|
enforced_setpoint, violations = self.safety_enforcer.enforce_setpoint(
|
|
station_id,
|
|
pump_id,
|
|
setpoint
|
|
)
|
|
|
|
return enforced_setpoint
|
|
|
|
def _get_default_setpoint(self, station_id: str, pump_id: str) -> float:
|
|
"""Get default safe setpoint from pump configuration."""
|
|
query = """
|
|
SELECT default_setpoint_hz
|
|
FROM pumps
|
|
WHERE station_id = %s AND pump_id = %s
|
|
"""
|
|
result = self.db_client.execute_query(query, (station_id, pump_id))
|
|
|
|
if result and result[0]['default_setpoint_hz']:
|
|
return float(result[0]['default_setpoint_hz'])
|
|
|
|
# Fallback
|
|
return 35.0
|
|
4.5 Multi-Protocol Server
|
|
(Implementation unchanged from v1.0 - see previous specification)
|
|
5. REST API Endpoints
|
|
5.1 Emergency Stop Endpoints
|
|
from fastapi import FastAPI, Depends, HTTPException, status
|
|
from pydantic import BaseModel
|
|
|
|
app = FastAPI(title="Calejo Control API", version="2.0")
|
|
|
|
class EmergencyStopRequest(BaseModel):
|
|
triggered_by: str
|
|
reason: str
|
|
station_id: Optional[str] = None
|
|
pump_id: Optional[str] = None
|
|
|
|
class EmergencyStopClearRequest(BaseModel):
|
|
cleared_by: str
|
|
notes: str
|
|
|
|
@app.post(
|
|
"/api/v1/emergency-stop",
|
|
summary="Trigger emergency stop",
|
|
description="Immediately halt optimization control and revert to safe defaults (BIG RED BUTTON)",
|
|
status_code=status.HTTP_201_CREATED,
|
|
dependencies=[Depends(verify_admin_permission)]
|
|
)
|
|
async def trigger_emergency_stop(request: EmergencyStopRequest):
|
|
"""
|
|
Trigger emergency stop ("big red button").
|
|
|
|
Requires admin permission.
|
|
|
|
Scope:
|
|
- If station_id and pump_id provided: Stop single pump
|
|
- If station_id only: Stop all pumps at station
|
|
- If neither: Stop ALL pumps system-wide
|
|
"""
|
|
event_id = await emergency_stop_manager.trigger_emergency_stop(
|
|
triggered_by=request.triggered_by,
|
|
reason=request.reason,
|
|
station_id=request.station_id,
|
|
pump_id=request.pump_id
|
|
)
|
|
|
|
return {
|
|
"status": "emergency_stop_triggered",
|
|
"event_id": event_id,
|
|
"scope": emergency_stop_manager._get_scope_description(
|
|
request.station_id,
|
|
request.pump_id
|
|
),
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
@app.post(
|
|
"/api/v1/emergency-stop/{event_id}/clear",
|
|
summary="Clear emergency stop",
|
|
description="Resume normal operation after emergency stop",
|
|
dependencies=[Depends(verify_admin_permission)]
|
|
)
|
|
async def clear_emergency_stop(event_id: int, request: EmergencyStopClearRequest):
|
|
"""Clear emergency stop and resume normal operation."""
|
|
await emergency_stop_manager.clear_emergency_stop(
|
|
event_id=event_id,
|
|
cleared_by=request.cleared_by,
|
|
notes=request.notes
|
|
)
|
|
|
|
return {
|
|
"status": "emergency_stop_cleared",
|
|
"event_id": event_id,
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
@app.get(
|
|
"/api/v1/emergency-stop/status",
|
|
summary="Get emergency stop status",
|
|
description="Check if any emergency stops are active"
|
|
)
|
|
async def get_emergency_stop_status():
|
|
"""Get current emergency stop status."""
|
|
query = """
|
|
SELECT event_id, station_id, pump_id, triggered_by, reason, timestamp
|
|
FROM emergency_stop_events
|
|
WHERE cleared_at IS NULL
|
|
ORDER BY timestamp DESC
|
|
"""
|
|
active_stops = db_client.execute_query(query)
|
|
|
|
return {
|
|
"active_emergency_stops": len(active_stops),
|
|
"events": active_stops
|
|
}
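
Client-side sketch of triggering a station-wide stop over the REST API (base URL and the bearer-token header are assumptions; adapt them to the deployed security layer):

import httpx

async def stop_station(station_id: str) -> int:
    async with httpx.AsyncClient(base_url="https://adapter.example.local:8080") as client:
        response = await client.post(
            "/api/v1/emergency-stop",
            json={
                "triggered_by": "operator_rossi",
                "reason": "Manual intervention during maintenance",
                "station_id": station_id,
            },
            headers={"Authorization": "Bearer <api-key>"},
        )
        response.raise_for_status()
        return response.json()["event_id"]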
|
|
5.2 Safety Status Endpoints
|
|
@app.get(
|
|
"/api/v1/safety/status",
|
|
summary="Get safety status",
|
|
description="Get current safety status for all pumps"
|
|
)
|
|
async def get_safety_status():
|
|
"""Get safety status dashboard data."""
|
|
# Get active violations
|
|
violations_query = """
|
|
SELECT station_id, pump_id, COUNT(*) as violation_count
|
|
FROM safety_limit_violations
|
|
WHERE timestamp > NOW() - INTERVAL '1 hour'
|
|
GROUP BY station_id, pump_id
|
|
"""
|
|
violations = db_client.execute_query(violations_query)
|
|
|
|
# Get failsafe mode pumps
|
|
failsafe_query = """
|
|
SELECT station_id, pump_id, event_type, timestamp
|
|
FROM failsafe_events
|
|
WHERE cleared_at IS NULL
|
|
"""
|
|
failsafe_pumps = db_client.execute_query(failsafe_query)
|
|
|
|
# Get emergency stops
|
|
emergency_stops_query = """
|
|
SELECT event_id, station_id, pump_id, triggered_by, reason, timestamp
|
|
FROM emergency_stop_events
|
|
WHERE cleared_at IS NULL
|
|
"""
|
|
emergency_stops = db_client.execute_query(emergency_stops_query)
|
|
|
|
return {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"safety_violations_last_hour": len(violations),
|
|
"pumps_in_failsafe_mode": len(failsafe_pumps),
|
|
"active_emergency_stops": len(emergency_stops),
|
|
"violations": violations,
|
|
"failsafe_pumps": failsafe_pumps,
|
|
"emergency_stops": emergency_stops
|
|
}
|
|
|
|
@app.get(
|
|
"/api/v1/safety/violations",
|
|
summary="Get safety violations",
|
|
description="Get recent safety limit violations"
|
|
)
|
|
async def get_safety_violations(hours: int = 24, limit: int = 100):
|
|
"""Get recent safety limit violations."""
|
|
query = """
|
|
SELECT station_id, pump_id, requested_setpoint, enforced_setpoint,
|
|
violations, timestamp
|
|
FROM safety_limit_violations
|
|
WHERE timestamp > NOW() - make_interval(hours => %s)
|
|
ORDER BY timestamp DESC
|
|
LIMIT %s
|
|
"""
|
|
violations = db_client.execute_query(query, (hours, limit))
|
|
|
|
return {
|
|
"violations": violations,
|
|
"count": len(violations)
|
|
}
|
|
6. Configuration Management
|
|
6.1 Environment Variables
|
|
File: .env
|
|
# Database connection
|
|
DB_HOST=calejo-optimize
|
|
DB_PORT=5432
|
|
DB_NAME=calejo
|
|
DB_USER=control_reader
|
|
DB_PASSWORD=secure_password
|
|
|
|
# Station filter (optional)
|
|
STATION_FILTER=
|
|
|
|
# Security
|
|
API_KEY=your_api_key_here
|
|
TLS_ENABLED=true
|
|
TLS_CERT_PATH=/etc/calejo/ssl/cert.pem
|
|
TLS_KEY_PATH=/etc/calejo/ssl/key.pem
|
|
|
|
# OPC UA
|
|
OPCUA_ENABLED=true
|
|
OPCUA_PORT=4840
|
|
OPCUA_SECURITY_MODE=SignAndEncrypt
|
|
OPCUA_CERT_PATH=/etc/calejo/opcua/cert.der
|
|
OPCUA_KEY_PATH=/etc/calejo/opcua/key.pem
|
|
|
|
# Modbus TCP
|
|
MODBUS_ENABLED=true
|
|
MODBUS_PORT=502
|
|
MODBUS_SLAVE_ID=1
|
|
|
|
# REST API
|
|
REST_API_ENABLED=true
|
|
REST_API_PORT=8080
|
|
REST_API_CORS_ENABLED=true
|
|
|
|
# Safety - Watchdog
|
|
WATCHDOG_ENABLED=true
|
|
WATCHDOG_TIMEOUT_SECONDS=1200 # 20 minutes
|
|
WATCHDOG_CHECK_INTERVAL_SECONDS=60 # Check every minute
|
|
|
|
# Alerts - Email
|
|
ALERT_EMAIL_ENABLED=true
|
|
ALERT_EMAIL_FROM=calejo-control@example.com
|
|
ALERT_EMAIL_RECIPIENTS=operator1@utility.it,operator2@utility.it
|
|
SMTP_HOST=smtp.gmail.com
|
|
SMTP_PORT=587
|
|
SMTP_USERNAME=calejo-control@example.com
|
|
SMTP_PASSWORD=smtp_password
|
|
SMTP_USE_TLS=true
|
|
|
|
# Alerts - SMS
|
|
ALERT_SMS_ENABLED=true
|
|
ALERT_SMS_RECIPIENTS=+393401234567,+393407654321
|
|
TWILIO_ACCOUNT_SID=your_twilio_account_sid
|
|
TWILIO_AUTH_TOKEN=your_twilio_auth_token
|
|
TWILIO_PHONE_NUMBER=+15551234567
|
|
|
|
# Alerts - Webhook
|
|
ALERT_WEBHOOK_ENABLED=true
|
|
ALERT_WEBHOOK_URL=https://utility-monitoring.example.com/webhook
|
|
ALERT_WEBHOOK_TOKEN=webhook_bearer_token
|
|
|
|
# Alerts - SCADA
|
|
ALERT_SCADA_ENABLED=true
|
|
|
|
# Logging
|
|
LOG_LEVEL=INFO
|
|
LOG_FORMAT=json
|
|
AUDIT_LOG_ENABLED=true
|
|
|
|
# Compliance
|
|
COMPLIANCE_STANDARDS=IEC_62443,ISO_27001,NIS2
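
Since pydantic-settings is the designated configuration library, these variables map onto a typed settings class. A partial sketch covering the database and watchdog groups (class and field names are illustrative; the remaining groups follow the same pattern):

from pydantic_settings import BaseSettings, SettingsConfigDict

class AdapterSettings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    # Database connection (DB_HOST, DB_PORT, ... map case-insensitively)
    db_host: str = "calejo-optimize"
    db_port: int = 5432
    db_name: str = "calejo"
    db_user: str = "control_reader"
    db_password: str = ""

    # Safety - watchdog
    watchdog_enabled: bool = True
    watchdog_timeout_seconds: int = 1200
    watchdog_check_interval_seconds: int = 60

settings = AdapterSettings()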
|
|
7. Implementation Priorities (Updated)
|
|
Phase 1: Core Foundation (Week 1-2)
|
|
Project setup
|
|
Database client
|
|
Auto-discovery
|
|
Configuration management
|
|
Phase 2: Safety Framework (Week 2-3) ⚠️ NEW
|
|
Safety limit enforcer
|
|
Database watchdog (20-minute timeout)
|
|
Emergency stop manager
|
|
Alert manager (email, SMS, webhook)
|
|
Safety database tables
|
|
Audit logging
|
|
Phase 3: Setpoint Logic (Week 3-4)
|
|
Implement setpoint calculators
|
|
Implement SetpointManager with safety integration
|
|
Unit tests
|
|
Phase 4: Security Layer (Week 4-5)
|
|
Authentication
|
|
Authorization
|
|
TLS/SSL
|
|
Compliance logging (IEC 62443, ISO 27001, NIS2)
|
|
Phase 5: Protocol Servers (Week 5-7)
|
|
OPC UA Server
|
|
Modbus TCP Server
|
|
REST API (including emergency stop endpoints)
|
|
Phase 6: Integration and Testing (Week 7-8)
|
|
Integration testing
|
|
Docker containerization
|
|
Documentation
|
|
Phase 7: Production Hardening (Week 8-9)
|
|
Error handling
|
|
Monitoring
|
|
Security hardening
|
|
8. Success Criteria (Updated)
|
|
The implementation is considered complete when:
|
|
✅ Auto-discovery successfully discovers all stations and pumps
|
|
✅ Safety limit enforcer clamps all setpoints within hard limits
|
|
✅ Database watchdog detects 20-minute timeout and triggers failsafe
|
|
✅ Emergency stop API works for single pump and system-wide
|
|
✅ Multi-channel alerts (email, SMS, webhook) are sent for critical events
|
|
✅ All three control types calculate correct setpoints
|
|
✅ OPC UA server exposes hierarchical structure
|
|
✅ Modbus TCP server responds correctly
|
|
✅ REST API provides all endpoints with OpenAPI docs
|
|
✅ Security layer enforces authentication and authorization
|
|
✅ Audit logging records all safety events (immutable)
|
|
✅ Compliance requirements met (IEC 62443, ISO 27001, NIS2)
|
|
✅ Container deploys successfully
|
|
✅ Integration tests pass with real SCADA systems
|
|
✅ Performance meets requirements
|
|
9. Compliance Documentation
|
|
9.1 IEC 62443 (Industrial Cybersecurity)
|
|
Implemented Controls:
|
|
Requirement                                  | Implementation
SR 1.1 - Human user identification           | API key authentication, user tracking in audit log
SR 1.2 - Software process identification     | Process-level logging, component identification
SR 1.3 - Device identification               | IP address logging, geofencing
SR 1.5 - Authenticator management            | Secure API key storage, certificate management
SR 2.1 - Authorization enforcement           | Role-based access control (read, write, admin)
SR 3.1 - Communication integrity             | TLS/SSL encryption, OPC UA SignAndEncrypt
SR 3.3 - Security functionality verification | Health checks, integrity monitoring
SR 4.1 - Information confidentiality         | Encryption at rest and in transit
SR 5.1 - Network segmentation                | DMZ deployment, firewall rules
SR 6.1 - Audit log accessibility             | Immutable audit log, query API
SR 7.1 - Denial of service protection        | Rate limiting, resource monitoring
|
|
9.2 ISO 27001 (Information Security)
|
|
Implemented Controls:
|
|
Control                    | Implementation
A.9 - Access control       | Authentication, authorization, RBAC
A.10 - Cryptography        | TLS/SSL, certificate-based auth
A.12 - Operations security | Logging, monitoring, change management
A.16 - Incident management | Alert system, emergency stop, audit trail
A.18 - Compliance          | Audit logging, compliance reporting
|
|
9.3 NIS2 Directive (EU Critical Infrastructure)
|
|
Implemented Measures:
|
|
Requirement           | Implementation
Risk management       | Multi-layer safety limits, failsafe mechanisms
Incident handling     | Emergency stop, alert system, audit trail
Business continuity   | Failover to default setpoints, caching
Supply chain security | Dependency management, security updates
Security monitoring   | Real-time monitoring, anomaly detection
Incident reporting    | Audit log, compliance reports
|
|
10. Additional Notes
|
|
10.1 Safety Philosophy
|
|
The implementation follows the "Defense in Depth" principle with multiple independent safety layers:
|
|
Physical Layer: PLC/VFD hard limits (15-55 Hz)
|
|
Adapter Layer: Safety limit enforcer (20-50 Hz)
|
|
Optimization Layer: Optimization constraints (25-45 Hz)
|
|
Watchdog Layer: Database timeout detection (20 minutes)
|
|
Emergency Layer: Big red button (immediate halt)
|
|
Fail-Safe Design: On any failure, the system reverts to safe defaults (existing pump configuration).
|
|
10.2 Utility Sales Benefits
|
|
Key selling points for Italian utilities:
|
|
✅ "Cannot damage equipment" - Three independent safety layers
|
|
✅ "Fails safe" - Automatic reversion to safe defaults
|
|
✅ "Complete audit trail" - Meets regulatory requirements
|
|
✅ "Operators have final control" - Emergency stop always available
|
|
✅ "Proven security" - Follows IEC 62443, ISO 27001, NIS2
|
|
✅ "Immediate alerts" - Email, SMS, SCADA, webhooks
|
|
END OF SPECIFICATION v2.0
|
|
This specification provides complete requirements for implementing Calejo Control adapter with comprehensive safety and security framework. Begin with Phase 1 and proceed sequentially.
|
|
|