diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md deleted file mode 100644 index ce1194c..0000000 --- a/DEVELOPMENT.md +++ /dev/null @@ -1,17 +0,0 @@ -# Development Setup - -## Next Steps - -1. **Review Specification**: Open `specification.html` in a browser to review the project requirements -2. **Technology Stack**: Determine the appropriate technology stack based on the specification -3. **Project Setup**: - - Set up development environment - - Install dependencies -4. **Implementation**: Begin implementing features according to the specification - -## Current Status - -- ✅ Specification downloaded and saved -- ✅ Basic repository structure created -- ✅ Git repository initialized and pushed to remote -- ⏳ Ready for development setup and implementation \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b16b8c6 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,35 @@ +# Calejo Control Adapter Dockerfile + +FROM python:3.11-slim + +# Set working directory +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements and install Python dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Create non-root user +RUN useradd -m -u 1000 calejo && chown -R calejo:calejo /app +USER calejo + +# Expose ports +EXPOSE 8080 # REST API +EXPOSE 4840 # OPC UA +EXPOSE 502 # Modbus TCP + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8080/health || exit 1 + +# Run the application +CMD ["python", "-m", "src.main"] \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..a47377e --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Calejo Control Team + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md index b836594..ca60bea 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,192 @@ -# Project +# Calejo Control Adapter -This repository contains the project implementation based on the provided specification. +**Multi-protocol integration adapter for municipal wastewater pump stations with comprehensive safety and security framework** -## Structure +[](https://www.python.org/) +[](https://opensource.org/licenses/MIT) +[](https://www.docker.com/) -- `specification.html` - Project specification document (HTML format) -- `specification.txt` - Information about the specification format -- `src/` - Source code for the project -- `tests/` - Test files -- `config/` - Configuration files +## Overview + +The Calejo Control Adapter translates optimized pump control plans from Calejo Optimize into real-time control signals for municipal wastewater pump stations. It supports diverse SCADA systems with minimal configuration through automatic discovery and multiple protocol support. + +### Key Features + +- **Multi-Protocol Support**: OPC UA, Modbus TCP, and REST API simultaneously +- **Auto-Discovery**: Automatically discovers pump stations and pumps from database +- **Safety Framework**: Multi-layer limits, watchdogs, emergency stop, failsafe mechanisms +- **Security**: Authentication, authorization, encryption, audit logging +- **Compliance**: IEC 62443, NIS2 Directive, ISO 27001 +- **High Availability**: Caching, failover, health monitoring + +## Architecture + +``` +┌─────────────────────────────────────────────────────────┐ +│ Calejo Optimize Container (Existing) │ +│ - Optimization Engine │ +│ - PostgreSQL Database (pump plans) │ +└─────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────┐ +│ Calejo Control Adapter (NEW - TO BE IMPLEMENTED) │ +│ │ +│ ┌────────────────────────────────────────────────┐ │ +│ │ Core Components: │ │ +│ │ 1. Auto-Discovery Module │ │ +│ │ 2. Security Layer │ │ +│ │ 3. Safety Framework ⚠️ NEW │ │ +│ │ 4. Plan-to-Setpoint Logic Engine │ │ +│ │ 5. Multi-Protocol Server │ │ +│ │ - OPC UA Server │ │ +│ │ - Modbus TCP Server │ │ +│ │ - REST API │ │ +│ └────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────┘ + ↓ + (Multiple Protocols) + ↓ + ┌─────────────────┼─────────────────┐ + ↓ ↓ ↓ + Siemens WinCC Schneider EcoStruxure Rockwell FactoryTalk +``` + +## Project Structure + +``` +calejo-control-adapter/ +├── src/ +│ ├── core/ +│ │ ├── auto_discovery.py # Auto-discovery module +│ │ ├── security.py # Security layer +│ │ ├── safety.py # Safety framework +│ │ └── plan_to_setpoint.py # Plan-to-setpoint logic +│ ├── protocols/ +│ │ ├── opc_ua_server.py # OPC UA server +│ │ ├── modbus_server.py # Modbus TCP server +│ │ └── rest_api.py # REST API server +│ ├── database/ +│ │ ├── client.py # Database client +│ │ └── models.py # Data models +│ ├── monitoring/ +│ │ ├── watchdog.py # Database watchdog +│ │ └── alerts.py # Alert manager +│ └── main.py # Main application +├── tests/ +├── config/ +│ ├── settings.py # Application settings +│ └── docker-compose.yml # Docker configuration +├── docs/ +│ └── specification.txt # Full implementation specification +├── requirements.txt # Python dependencies +├── Dockerfile # Docker container definition +└── README.md # This file +``` ## Getting Started -1. Review the specification in `specification.html` (requires JavaScript execution in browser) -2. Read `specification.txt` for information about the specification format -3. Set up development environment -4. Start implementing features according to the specification +### Prerequisites + +- Python 3.11+ +- PostgreSQL 14+ +- Docker (optional) + +### Installation + +1. **Clone the repository** + ```bash + git clone http://95.111.206.201:3000/calejocontrol/CalejoControl.git + cd calejo-control-adapter + ``` + +2. **Install dependencies** + ```bash + pip install -r requirements.txt + ``` + +3. **Set up environment variables** + ```bash + cp config/.env.example config/.env + # Edit config/.env with your database and security settings + ``` + +4. **Run the application** + ```bash + python -m src.main + ``` + +### Docker Deployment + +```bash +# Build the container +docker build -t calejo-control-adapter . + +# Run the container +docker run -p 8080:8080 -p 4840:4840 -p 502:502 calejo-control-adapter +``` + +## Configuration + +Key configuration options: + +- `DATABASE_URL`: PostgreSQL connection string +- `OPC_UA_ENDPOINT`: OPC UA server endpoint (default: opc.tcp://0.0.0.0:4840) +- `MODBUS_PORT`: Modbus TCP port (default: 502) +- `REST_API_PORT`: REST API port (default: 8080) +- `SAFETY_TIMEOUT_SECONDS`: Database watchdog timeout (default: 1200) + +## Safety Framework + +The adapter implements a comprehensive three-layer safety architecture: + +1. **Layer 1**: Physical Hard Limits (PLC/VFD) - 15-55 Hz +2. **Layer 2**: Station Safety Limits (Database) - 20-50 Hz (enforced by adapter) +3. **Layer 3**: Optimization Constraints (Calejo Optimize) - 25-45 Hz + +### Safety Features + +- **Hard Operational Limits**: Speed, level, power, and flow limits +- **Rate of Change Limits**: Prevent sudden speed changes +- **Database Watchdog**: Reverts to safe defaults if updates stop +- **Emergency Stop**: Manual override capability +- **Failsafe Mode**: Automatic fallback to default setpoints + +## Security & Compliance + +- **Authentication**: JWT tokens and certificate-based authentication +- **Authorization**: Role-based access control (RBAC) +- **Encryption**: TLS/SSL for all communications +- **Audit Logging**: Immutable audit trail for compliance +- **Standards**: IEC 62443, ISO 27001, NIS2 Directive ## Development -This project is in early development stages. The specification has been downloaded and the basic repository structure has been set up. \ No newline at end of file +### Running Tests + +```bash +pytest tests/ +``` + +### Code Quality + +```bash +flake8 src/ tests/ # Linting +mypy src/ # Type checking +black src/ tests/ # Code formatting +``` + +### Contributing + +1. Fork the repository +2. Create a feature branch +3. Make your changes +4. Add tests +5. Submit a pull request + +## License + +This project is licensed under the MIT License - see the LICENSE file for details. + +## Support + +For support and documentation, refer to the specification in `docs/specification.txt` or contact the development team. \ No newline at end of file diff --git a/config/.env.example b/config/.env.example new file mode 100644 index 0000000..efb6da9 --- /dev/null +++ b/config/.env.example @@ -0,0 +1,26 @@ +# Database Configuration +DATABASE_URL=postgresql://control_reader:secure_password@localhost:5432/calejo + +# Protocol Endpoints +OPC_UA_ENDPOINT=opc.tcp://0.0.0.0:4840 +MODBUS_PORT=502 +REST_API_PORT=8080 + +# Safety Settings +SAFETY_TIMEOUT_SECONDS=1200 + +# Security Settings +JWT_SECRET_KEY=your-secret-key-change-in-production +JWT_ALGORITHM=HS256 + +# Alert Settings (Optional) +SMTP_SERVER=smtp.example.com +SMTP_PORT=587 +SMTP_USERNAME=your-email@example.com +SMTP_PASSWORD=your-email-password +TWILIO_ACCOUNT_SID=your-twilio-account-sid +TWILIO_AUTH_TOKEN=your-twilio-auth-token +TWILIO_PHONE_NUMBER=+1234567890 + +# Logging +LOG_LEVEL=INFO \ No newline at end of file diff --git a/config/settings.py b/config/settings.py new file mode 100644 index 0000000..d3fb84c --- /dev/null +++ b/config/settings.py @@ -0,0 +1,41 @@ +""" +Application settings for Calejo Control Adapter. +""" + +from pydantic_settings import BaseSettings +from typing import Optional + + +class Settings(BaseSettings): + """Application settings loaded from environment variables.""" + + # Database + database_url: str = "postgresql://control_reader:secure_password@localhost:5432/calejo" + + # Protocol endpoints + opc_ua_endpoint: str = "opc.tcp://0.0.0.0:4840" + modbus_port: int = 502 + rest_api_port: int = 8080 + + # Safety settings + safety_timeout_seconds: int = 1200 # 20 minutes + + # Security settings + jwt_secret_key: str = "your-secret-key-change-in-production" + jwt_algorithm: str = "HS256" + + # Alert settings + smtp_server: Optional[str] = None + smtp_port: Optional[int] = 587 + smtp_username: Optional[str] = None + smtp_password: Optional[str] = None + twilio_account_sid: Optional[str] = None + twilio_auth_token: Optional[str] = None + twilio_phone_number: Optional[str] = None + + # Logging + log_level: str = "INFO" + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" \ No newline at end of file diff --git a/docs/specification.txt b/docs/specification.txt new file mode 100644 index 0000000..da73613 --- /dev/null +++ b/docs/specification.txt @@ -0,0 +1,1755 @@ +Last modified: 13:37 10/24 +Calejo Control Adapter - Implementation Specification v2.0 +Project: Calejo Control Adapter +Version: 2.0 (with Safety & Security Framework) +Date: October 18, 2025 +Author: Manus AI +Target Audience: Development LLM / Software Engineers +Document Revision History +Version + +Date + +Changes +1.0 + +2025-10-18 + +Initial specification +2.0 + +2025-10-18 + +Added comprehensive safety and security framework +1. Project Overview +1.1 Purpose +Implement a multi-protocol integration adapter that translates optimized pump control plans from Calejo Optimize into real-time control signals for municipal wastewater pump stations. The adapter must support diverse SCADA systems with minimal configuration through automatic discovery and multiple protocol support. +Critical Requirement: The adapter must implement comprehensive safety mechanisms to ensure pump operations remain within safe limits under all conditions, including system failures, communication loss, and cyber attacks. +1.2 Architecture Summary +┌─────────────────────────────────────────────────────────┐ +│ Calejo Optimize Container (Existing) │ +│ - Optimization Engine │ +│ - PostgreSQL Database (pump plans) │ +└─────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────┐ +│ Calejo Control Adapter (NEW - TO BE IMPLEMENTED) │ +│ │ +│ ┌────────────────────────────────────────────────┐ │ +│ │ Core Components: │ │ +│ │ 1. Auto-Discovery Module │ │ +│ │ 2. Security Layer │ │ +│ │ 3. Safety Framework ⚠️ NEW │ │ +│ │ 4. Plan-to-Setpoint Logic Engine │ │ +│ │ 5. Multi-Protocol Server │ │ +│ │ - OPC UA Server │ │ +│ │ - Modbus TCP Server │ │ +│ │ - REST API │ │ +│ └────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────┘ + ↓ + (Multiple Protocols) + ↓ + ┌─────────────────┼─────────────────┐ + ↓ ↓ ↓ + Siemens WinCC Schneider EcoStruxure Rockwell FactoryTalk +1.3 Key Requirements +The implementation must provide: +Auto-Discovery: Automatically discover pump stations and pumps from database +Security: Authentication, authorization, encryption, audit logging +Safety Framework ⚠️: Multi-layer limits, watchdogs, emergency stop, failsafe mechanisms +Plan-to-Setpoint Logic: Transform optimization plans based on pump control type +Multi-Protocol Support: OPC UA, Modbus TCP, REST API simultaneously +High Availability: Caching, failover, health monitoring +Compliance: IEC 62443, NIS2 Directive, ISO 27001 +Easy Deployment: Docker container, environment-based configuration +2. Technology Stack +2.1 Required Technologies +Component + +Technology + +Version + +Justification +Language + +Python + +3.11+ + +Excellent library support for OPC UA, Modbus, REST +Database Client + +psycopg2 + +2.9+ + +PostgreSQL driver for database queries +OPC UA Server + +asyncua + +1.0+ + +Modern async OPC UA implementation +Modbus Server + +pymodbus + +3.5+ + +Industry-standard Modbus TCP/RTU library +REST API + +FastAPI + +0.104+ + +High-performance async REST framework +Security + +cryptography, PyJWT + +Latest + +TLS/SSL, JWT tokens, certificate management +Configuration + +pydantic-settings + +2.0+ + +Type-safe environment variable management +Logging + +structlog + +23.0+ + +Structured logging for production monitoring +Alerting + +aiosmtplib, twilio + +Latest + +Email and SMS alerts +Container + +Docker + +24.0+ + +Containerization for deployment +2.2 Development Dependencies +# requirements.txt +asyncua==1.0.6 +pymodbus==3.5.4 +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +psycopg2-binary==2.9.9 +pydantic==2.5.0 +pydantic-settings==2.1.0 +cryptography==41.0.7 +PyJWT==2.8.0 +structlog==23.2.0 +python-dotenv==1.0.0 +redis==5.0.1 # For distributed caching (optional) +prometheus-client==0.19.0 # For metrics +aiosmtplib==3.0.1 # For email alerts +twilio==8.10.0 # For SMS alerts +httpx==0.25.0 # For webhook alerts +3. Database Schema +3.1 Core Tables +3.1.1 pump_stations +Metadata about pump stations. +CREATE TABLE pump_stations ( + station_id VARCHAR(50) PRIMARY KEY, + station_name VARCHAR(200) NOT NULL, + location VARCHAR(200), + latitude DECIMAL(10, 8), + longitude DECIMAL(11, 8), + timezone VARCHAR(50) DEFAULT 'Europe/Rome', + active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW() +); +3.1.2 pumps +Metadata about individual pumps. +CREATE TABLE pumps ( + pump_id VARCHAR(50) NOT NULL, + station_id VARCHAR(50) NOT NULL, + pump_name VARCHAR(200), + pump_type VARCHAR(50), -- 'SUBMERSIBLE', 'CENTRIFUGAL', etc. + control_type VARCHAR(50) NOT NULL, -- 'LEVEL_CONTROLLED', 'POWER_CONTROLLED', 'DIRECT_SPEED' + manufacturer VARCHAR(100), + model VARCHAR(100), + rated_power_kw DECIMAL(10, 2), + min_speed_hz DECIMAL(5, 2) DEFAULT 20.0, + max_speed_hz DECIMAL(5, 2) DEFAULT 50.0, + + -- Default setpoint (used in failsafe mode) + default_setpoint_hz DECIMAL(5, 2) NOT NULL DEFAULT 35.0, + + -- Control-specific parameters (JSON) + control_parameters JSONB, + + active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW(), + + PRIMARY KEY (station_id, pump_id), + FOREIGN KEY (station_id) REFERENCES pump_stations(station_id) +); + +COMMENT ON COLUMN pumps.default_setpoint_hz IS 'Default safe setpoint used in failsafe mode (existing pump configuration)'; +3.1.3 pump_plans +Optimization plans generated by Calejo Optimize. +CREATE TABLE pump_plans ( + plan_id SERIAL PRIMARY KEY, + station_id VARCHAR(50) NOT NULL, + pump_id VARCHAR(50) NOT NULL, + interval_start TIMESTAMP NOT NULL, + interval_end TIMESTAMP NOT NULL, + + -- Optimization outputs + target_flow_m3h DECIMAL(10, 2), + target_power_kw DECIMAL(10, 2), + target_level_m DECIMAL(5, 2), + suggested_speed_hz DECIMAL(5, 2), + + -- Metadata + plan_created_at TIMESTAMP DEFAULT NOW(), + optimization_run_id INTEGER, + + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id) +); + +CREATE INDEX idx_pump_plans_active ON pump_plans(station_id, pump_id, interval_start, interval_end); +3.1.4 pump_feedback +Real-time feedback from pumps. +CREATE TABLE pump_feedback ( + feedback_id SERIAL PRIMARY KEY, + station_id VARCHAR(50) NOT NULL, + pump_id VARCHAR(50) NOT NULL, + timestamp TIMESTAMP NOT NULL DEFAULT NOW(), + + -- Actual measurements + actual_speed_hz DECIMAL(5, 2), + actual_power_kw DECIMAL(10, 2), + actual_flow_m3h DECIMAL(10, 2), + wet_well_level_m DECIMAL(5, 2), + pump_running BOOLEAN, + + -- Status + alarm_active BOOLEAN DEFAULT FALSE, + alarm_code VARCHAR(50), + + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id) +); + +CREATE INDEX idx_pump_feedback_latest ON pump_feedback(station_id, pump_id, timestamp DESC); +3.2 Safety and Security Tables +3.2.1 pump_safety_limits +Purpose: Define hard operational limits that cannot be exceeded by optimization or manual control. +CREATE TABLE pump_safety_limits ( + station_id VARCHAR(50) NOT NULL, + pump_id VARCHAR(50) NOT NULL, + + -- Speed limits (Layer 2: Station Safety Limits) + hard_min_speed_hz DECIMAL(5, 2) NOT NULL, + hard_max_speed_hz DECIMAL(5, 2) NOT NULL, + + -- Level limits + hard_min_level_m DECIMAL(5, 2), + hard_max_level_m DECIMAL(5, 2), + emergency_stop_level_m DECIMAL(5, 2), -- Emergency stop if exceeded + dry_run_protection_level_m DECIMAL(5, 2), -- Stop pump if level too low + + -- Power and flow limits + hard_max_power_kw DECIMAL(10, 2), + hard_max_flow_m3h DECIMAL(10, 2), + + -- Operational limits + max_starts_per_hour INTEGER DEFAULT 6, + min_run_time_seconds INTEGER DEFAULT 300, + max_continuous_run_hours INTEGER DEFAULT 24, + + -- Rate of change limits (prevent sudden changes that damage equipment) + max_speed_change_hz_per_min DECIMAL(5, 2) DEFAULT 5.0, + + -- Metadata + set_by VARCHAR(100), + set_at TIMESTAMP DEFAULT NOW(), + approved_by VARCHAR(100), -- Dual approval + approved_at TIMESTAMP, + notes TEXT, + + PRIMARY KEY (station_id, pump_id), + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id), + + -- Constraints + CONSTRAINT check_speed_limits CHECK (hard_min_speed_hz >= 15 AND hard_max_speed_hz <= 55), + CONSTRAINT check_min_max CHECK (hard_min_speed_hz < hard_max_speed_hz), + CONSTRAINT check_approved CHECK (approved_by IS NULL OR approved_by != set_by) -- Dual approval +); + +COMMENT ON TABLE pump_safety_limits IS 'Hard operational limits enforced by Calejo Control adapter (Layer 2)'; +COMMENT ON COLUMN pump_safety_limits.hard_min_speed_hz IS 'Minimum speed enforced by adapter (must be >= PLC physical limit)'; +COMMENT ON COLUMN pump_safety_limits.hard_max_speed_hz IS 'Maximum speed enforced by adapter (must be <= PLC physical limit)'; +3.2.2 safety_limit_violations +Purpose: Audit trail of all safety limit violations. +CREATE TABLE safety_limit_violations ( + violation_id SERIAL PRIMARY KEY, + station_id VARCHAR(50) NOT NULL, + pump_id VARCHAR(50) NOT NULL, + requested_setpoint DECIMAL(5, 2), + enforced_setpoint DECIMAL(5, 2), + violations TEXT[], -- Array of violation descriptions + timestamp TIMESTAMP DEFAULT NOW(), + + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id) +); + +CREATE INDEX idx_violations_timestamp ON safety_limit_violations(timestamp DESC); + +COMMENT ON TABLE safety_limit_violations IS 'Audit trail of safety limit violations (immutable)'; +3.2.3 failsafe_events +Purpose: Record when adapter enters failsafe mode (e.g., database timeout). +CREATE TABLE failsafe_events ( + event_id SERIAL PRIMARY KEY, + station_id VARCHAR(50) NOT NULL, + pump_id VARCHAR(50) NOT NULL, + event_type VARCHAR(50) NOT NULL, -- 'DATABASE_TIMEOUT', 'COMMUNICATION_LOSS', 'INVALID_DATA' + default_setpoint DECIMAL(5, 2), + triggered_by VARCHAR(100), + timestamp TIMESTAMP DEFAULT NOW(), + cleared_at TIMESTAMP, + notes TEXT, + + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id) +); + +CREATE INDEX idx_failsafe_events_active ON failsafe_events(station_id, pump_id, timestamp DESC) + WHERE cleared_at IS NULL; + +COMMENT ON TABLE failsafe_events IS 'Record of failsafe mode activations'; +3.2.4 emergency_stop_events +Purpose: Record emergency stop activations ("big red button"). +CREATE TABLE emergency_stop_events ( + event_id SERIAL PRIMARY KEY, + station_id VARCHAR(50), -- NULL = all stations + pump_id VARCHAR(50), -- NULL = all pumps at station + triggered_by VARCHAR(100) NOT NULL, + reason TEXT NOT NULL, + timestamp TIMESTAMP DEFAULT NOW(), + cleared_at TIMESTAMP, + cleared_by VARCHAR(100), + clear_notes TEXT, + + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id) +); + +CREATE INDEX idx_emergency_stop_active ON emergency_stop_events(timestamp DESC) + WHERE cleared_at IS NULL; + +COMMENT ON TABLE emergency_stop_events IS 'Emergency stop event log (big red button)'; +3.2.5 audit_log +Purpose: Immutable audit trail for compliance (IEC 62443, ISO 27001, NIS2). +CREATE TABLE audit_log ( + log_id BIGSERIAL PRIMARY KEY, + timestamp TIMESTAMP DEFAULT NOW(), + event_type VARCHAR(50) NOT NULL, + severity VARCHAR(20) NOT NULL, -- 'INFO', 'WARNING', 'CRITICAL' + station_id VARCHAR(50), + pump_id VARCHAR(50), + user_id VARCHAR(100), + ip_address INET, + protocol VARCHAR(20), -- 'OPC_UA', 'MODBUS', 'REST_API' + action VARCHAR(100), + resource VARCHAR(200), + result VARCHAR(20), -- 'SUCCESS', 'FAILURE', 'DENIED' + event_data JSONB +); + +CREATE INDEX idx_audit_log_timestamp ON audit_log(timestamp DESC); +CREATE INDEX idx_audit_log_severity ON audit_log(severity, timestamp DESC); +CREATE INDEX idx_audit_log_user ON audit_log(user_id, timestamp DESC); + +-- Make audit log immutable (append-only) +CREATE RULE audit_log_no_update AS ON UPDATE TO audit_log DO INSTEAD NOTHING; +CREATE RULE audit_log_no_delete AS ON DELETE TO audit_log DO INSTEAD NOTHING; + +COMMENT ON TABLE audit_log IS 'Immutable audit trail for compliance (IEC 62443, ISO 27001, NIS2 Directive)'; +3.3 Database User Permissions +-- Create read-only user for adapter +CREATE USER control_reader WITH PASSWORD 'secure_password'; + +GRANT CONNECT ON DATABASE calejo TO control_reader; +GRANT USAGE ON SCHEMA public TO control_reader; +GRANT SELECT ON pump_stations, pumps, pump_plans, pump_feedback, pump_safety_limits TO control_reader; +GRANT INSERT ON safety_limit_violations, failsafe_events, emergency_stop_events, audit_log TO control_reader; +GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO control_reader; +4. Core Component Specifications +4.1 Auto-Discovery Module +File: src/core/auto_discovery.py +Purpose: Automatically discover pump stations and pumps from database on startup. +(Implementation unchanged from v1.0 - see previous specification) +4.2 Security Layer +File: src/core/security.py +Purpose: Provide authentication, authorization, encryption, and audit logging. +(Implementation from v1.0, with additions below) +4.2.1 Compliance Requirements +The security layer must meet the following standards for Italian utilities: +IEC 62443 (Industrial Cybersecurity): +Zone and conduit model (IT network, DMZ, OT network) +Defense in depth +Role-based access control (RBAC) +Audit logging +ISO 27001 (Information Security Management): +Risk assessment +Access control +Cryptography +Incident management +NIS2 Directive (EU Critical Infrastructure): +Risk management measures +Incident reporting +Supply chain security +Cybersecurity training +Implementation: +class ComplianceManager: + """ + Ensure compliance with IEC 62443, ISO 27001, and NIS2 Directive. + """ + + def __init__(self, audit_logger: AuditLogger): + self.audit_logger = audit_logger + + def log_compliance_event( + self, + standard: str, # 'IEC_62443', 'ISO_27001', 'NIS2' + requirement: str, + event_type: str, + details: dict + ): + """Log compliance-related event.""" + self.audit_logger.log( + event_type='COMPLIANCE', + severity='INFO', + event_data={ + 'standard': standard, + 'requirement': requirement, + 'event_type': event_type, + 'details': details + } + ) +4.3 Safety Framework ⚠️ NEW +File: src/core/safety.py +Purpose: Multi-layer safety mechanisms to prevent equipment damage and operational hazards. +4.3.1 Safety Limit Enforcer +Purpose: Enforce hard operational limits on all setpoints (last line of defense). +from typing import Tuple, List, Optional, Dict +from dataclasses import dataclass +import structlog + +logger = structlog.get_logger() + +@dataclass +class SafetyLimits: + """Safety limits for a pump.""" + hard_min_speed_hz: float + hard_max_speed_hz: float + hard_min_level_m: Optional[float] + hard_max_level_m: Optional[float] + hard_max_power_kw: Optional[float] + max_speed_change_hz_per_min: float + +class SafetyLimitEnforcer: + """ + Enforces multi-layer safety limits on all setpoints. + + This is the LAST line of defense before setpoints are exposed to SCADA. + ALL setpoints MUST pass through this enforcer. + + Three-Layer Architecture: + - Layer 1: Physical Hard Limits (PLC/VFD) - 15-55 Hz + - Layer 2: Station Safety Limits (Database) - 20-50 Hz (enforced here) + - Layer 3: Optimization Constraints (Calejo Optimize) - 25-45 Hz + """ + + def __init__(self, db_client: DatabaseClient, audit_logger: AuditLogger): + self.db_client = db_client + self.audit_logger = audit_logger + self.safety_limits_cache: Dict[Tuple[str, str], SafetyLimits] = {} + self.previous_setpoints: Dict[Tuple[str, str], float] = {} + self._load_safety_limits() + + def _load_safety_limits(self): + """Load all safety limits from database into cache.""" + query = """ + SELECT station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz, + hard_min_level_m, hard_max_level_m, hard_max_power_kw, + max_speed_change_hz_per_min + FROM pump_safety_limits + WHERE approved_at IS NOT NULL -- Only use approved limits + """ + limits = self.db_client.execute_query(query) + + for limit in limits: + key = (limit['station_id'], limit['pump_id']) + self.safety_limits_cache[key] = SafetyLimits( + hard_min_speed_hz=limit['hard_min_speed_hz'], + hard_max_speed_hz=limit['hard_max_speed_hz'], + hard_min_level_m=limit.get('hard_min_level_m'), + hard_max_level_m=limit.get('hard_max_level_m'), + hard_max_power_kw=limit.get('hard_max_power_kw'), + max_speed_change_hz_per_min=limit['max_speed_change_hz_per_min'] + ) + + logger.info("safety_limits_loaded", pump_count=len(limits)) + + def enforce_setpoint( + self, + station_id: str, + pump_id: str, + setpoint: float + ) -> Tuple[float, List[str]]: + """ + Enforce safety limits on setpoint. + + Args: + station_id: Station identifier + pump_id: Pump identifier + setpoint: Proposed setpoint (Hz) + + Returns: + Tuple of (enforced_setpoint, violations) + - enforced_setpoint: Safe setpoint (clamped if necessary) + - violations: List of safety violations (for logging/alerting) + """ + violations = [] + enforced_setpoint = setpoint + + # Get safety limits + key = (station_id, pump_id) + limits = self.safety_limits_cache.get(key) + + if not limits: + logger.error( + "no_safety_limits", + station_id=station_id, + pump_id=pump_id + ) + # CRITICAL: No safety limits defined - reject setpoint + self.audit_logger.log( + event_type='SAFETY_ERROR', + severity='CRITICAL', + station_id=station_id, + pump_id=pump_id, + action='enforce_setpoint', + result='REJECTED', + event_data={'reason': 'NO_SAFETY_LIMITS_DEFINED'} + ) + return (0.0, ["NO_SAFETY_LIMITS_DEFINED"]) + + # Check minimum speed + if enforced_setpoint < limits.hard_min_speed_hz: + violations.append( + f"BELOW_MIN_SPEED: {enforced_setpoint:.2f} < {limits.hard_min_speed_hz:.2f}" + ) + enforced_setpoint = limits.hard_min_speed_hz + + # Check maximum speed + if enforced_setpoint > limits.hard_max_speed_hz: + violations.append( + f"ABOVE_MAX_SPEED: {enforced_setpoint:.2f} > {limits.hard_max_speed_hz:.2f}" + ) + enforced_setpoint = limits.hard_max_speed_hz + + # Check rate of change (prevent sudden speed changes that damage equipment) + previous_setpoint = self.previous_setpoints.get(key) + if previous_setpoint is not None: + max_change = limits.max_speed_change_hz_per_min * 5.0 # 5-minute interval + actual_change = abs(enforced_setpoint - previous_setpoint) + + if actual_change > max_change: + # Limit rate of change + direction = 1 if enforced_setpoint > previous_setpoint else -1 + enforced_setpoint = previous_setpoint + (direction * max_change) + violations.append( + f"RATE_OF_CHANGE_LIMITED: {actual_change:.2f} Hz > {max_change:.2f} Hz" + ) + + # Store current setpoint for next rate-of-change check + self.previous_setpoints[key] = enforced_setpoint + + # Log violations + if violations: + logger.warning( + "safety_limit_violation", + station_id=station_id, + pump_id=pump_id, + requested_setpoint=setpoint, + enforced_setpoint=enforced_setpoint, + violations=violations + ) + + # Record violation in database for audit + self._record_violation(station_id, pump_id, setpoint, enforced_setpoint, violations) + + # Log to audit trail + self.audit_logger.log( + event_type='SAFETY_VIOLATION', + severity='WARNING', + station_id=station_id, + pump_id=pump_id, + action='enforce_setpoint', + result='CLAMPED', + event_data={ + 'requested': setpoint, + 'enforced': enforced_setpoint, + 'violations': violations + } + ) + + return (enforced_setpoint, violations) + + def _record_violation( + self, + station_id: str, + pump_id: str, + requested: float, + enforced: float, + violations: List[str] + ): + """Record safety limit violation in database.""" + query = """ + INSERT INTO safety_limit_violations + (station_id, pump_id, requested_setpoint, enforced_setpoint, violations, timestamp) + VALUES (%s, %s, %s, %s, %s, NOW()) + """ + self.db_client.execute(query, (station_id, pump_id, requested, enforced, violations)) +4.3.2 Database Watchdog +Purpose: Detect if optimization hasn't updated plans for 20 minutes and revert to safe defaults. +import asyncio +from datetime import datetime, timedelta + +class DatabaseWatchdog: + """ + Monitors database updates and triggers failsafe mode if updates stop. + + Requirement: Detect if Calejo Optimize hasn't updated pump plans for 20 minutes. + Action: Revert to default safe setpoints (existing pump configuration). + """ + + def __init__( + self, + db_client: DatabaseClient, + audit_logger: AuditLogger, + alert_manager: AlertManager, + timeout_seconds: int = 1200, # 20 minutes + check_interval_seconds: int = 60 # Check every minute + ): + self.db_client = db_client + self.audit_logger = audit_logger + self.alert_manager = alert_manager + self.timeout_seconds = timeout_seconds + self.check_interval_seconds = check_interval_seconds + self.failsafe_mode: Dict[Tuple[str, str], bool] = {} + + async def start(self): + """Start watchdog monitoring loop.""" + logger.info( + "database_watchdog_started", + timeout_seconds=self.timeout_seconds, + check_interval_seconds=self.check_interval_seconds + ) + + while True: + await asyncio.sleep(self.check_interval_seconds) + await self._check_updates() + + async def _check_updates(self): + """Check for stale data and trigger failsafe if needed.""" + now = datetime.now() + + # Query last update time for all pumps + query = """ + SELECT station_id, pump_id, MAX(plan_created_at) as last_update + FROM pump_plans + GROUP BY station_id, pump_id + """ + + results = self.db_client.execute_query(query) + + for row in results: + key = (row['station_id'], row['pump_id']) + last_update = row['last_update'] + + if last_update: + time_since_update = (now - last_update).total_seconds() + + # Check if update is stale + if time_since_update > self.timeout_seconds: + if not self.failsafe_mode.get(key, False): + # Trigger failsafe mode + await self._trigger_failsafe( + row['station_id'], + row['pump_id'], + time_since_update, + last_update + ) + self.failsafe_mode[key] = True + else: + # Updates are current, exit failsafe mode if active + if self.failsafe_mode.get(key, False): + await self._clear_failsafe(row['station_id'], row['pump_id']) + self.failsafe_mode[key] = False + + async def _trigger_failsafe( + self, + station_id: str, + pump_id: str, + time_since_update: float, + last_update: datetime + ): + """ + Trigger failsafe mode for a pump. + + Sets pump to default safe setpoint (existing configuration). + """ + logger.critical( + "watchdog_failsafe_triggered", + station_id=station_id, + pump_id=pump_id, + time_since_update_seconds=time_since_update, + last_update=last_update.isoformat() + ) + + # Get default safe setpoint from pump configuration + default_setpoint = await self._get_default_setpoint(station_id, pump_id) + + # Record failsafe event + query = """ + INSERT INTO failsafe_events + (station_id, pump_id, event_type, default_setpoint, triggered_by, timestamp) + VALUES (%s, %s, 'DATABASE_TIMEOUT', %s, 'DATABASE_WATCHDOG', NOW()) + """ + self.db_client.execute(query, (station_id, pump_id, default_setpoint)) + + # Log to audit trail + self.audit_logger.log( + event_type='FAILSAFE_TRIGGERED', + severity='CRITICAL', + station_id=station_id, + pump_id=pump_id, + action='trigger_failsafe', + result='SUCCESS', + event_data={ + 'reason': 'DATABASE_TIMEOUT', + 'time_since_update_seconds': time_since_update, + 'default_setpoint': default_setpoint + } + ) + + # Send alert to operators + await self.alert_manager.send_alert( + severity='CRITICAL', + title=f'FAILSAFE MODE: {station_id}/{pump_id}', + message=( + f"No database updates for {time_since_update/60:.1f} minutes. " + f"Reverting to default setpoint: {default_setpoint} Hz. " + f"Last update: {last_update.isoformat()}" + ), + station_id=station_id, + pump_id=pump_id + ) + + async def _clear_failsafe(self, station_id: str, pump_id: str): + """Clear failsafe mode when database updates resume.""" + logger.info( + "watchdog_failsafe_cleared", + station_id=station_id, + pump_id=pump_id + ) + + # Update failsafe event + query = """ + UPDATE failsafe_events + SET cleared_at = NOW() + WHERE station_id = %s AND pump_id = %s AND cleared_at IS NULL + """ + self.db_client.execute(query, (station_id, pump_id)) + + # Log to audit trail + self.audit_logger.log( + event_type='FAILSAFE_CLEARED', + severity='INFO', + station_id=station_id, + pump_id=pump_id, + action='clear_failsafe', + result='SUCCESS', + event_data={'reason': 'DATABASE_UPDATES_RESUMED'} + ) + + # Send alert + await self.alert_manager.send_alert( + severity='INFO', + title=f'Failsafe Cleared: {station_id}/{pump_id}', + message='Database updates resumed. Returning to normal operation.', + station_id=station_id, + pump_id=pump_id + ) + + async def _get_default_setpoint(self, station_id: str, pump_id: str) -> float: + """ + Get default safe setpoint for pump (existing configuration). + + Returns pump's configured default_setpoint_hz. + """ + query = """ + SELECT default_setpoint_hz + FROM pumps + WHERE station_id = %s AND pump_id = %s + """ + result = self.db_client.execute_query(query, (station_id, pump_id)) + + if result and result[0]['default_setpoint_hz']: + return float(result[0]['default_setpoint_hz']) + + # Ultimate fallback (should never reach here) + logger.error( + "no_default_setpoint_configured", + station_id=station_id, + pump_id=pump_id + ) + return 35.0 # Conservative fallback +4.3.3 Emergency Stop Manager +Purpose: Provide "big red button" to immediately halt optimization control. +class EmergencyStopManager: + """ + Manages emergency stop functionality ("big red button"). + + Provides immediate halt of optimization control and reversion to safe defaults. + Supports both single-pump and system-wide emergency stop. + """ + + def __init__( + self, + db_client: DatabaseClient, + audit_logger: AuditLogger, + alert_manager: AlertManager + ): + self.db_client = db_client + self.audit_logger = audit_logger + self.alert_manager = alert_manager + self.emergency_stop_active: Dict[Tuple[Optional[str], Optional[str]], int] = {} + + async def trigger_emergency_stop( + self, + triggered_by: str, + reason: str, + station_id: Optional[str] = None, + pump_id: Optional[str] = None + ) -> int: + """ + Trigger emergency stop. + + Args: + triggered_by: User or system that triggered stop + reason: Reason for emergency stop + station_id: Optional - limit to specific station (None = all stations) + pump_id: Optional - limit to specific pump (None = all pumps at station) + + Returns: + event_id: Emergency stop event ID + """ + logger.critical( + "emergency_stop_triggered", + triggered_by=triggered_by, + reason=reason, + station_id=station_id, + pump_id=pump_id + ) + + # Record event + query = """ + INSERT INTO emergency_stop_events + (station_id, pump_id, triggered_by, reason, timestamp) + VALUES (%s, %s, %s, %s, NOW()) + RETURNING event_id + """ + result = self.db_client.execute(query, (station_id, pump_id, triggered_by, reason)) + event_id = result[0]['event_id'] + + # Set emergency stop flag + key = (station_id, pump_id) + self.emergency_stop_active[key] = event_id + + # Set all affected pumps to default setpoints + affected_pumps = await self._get_affected_pumps(station_id, pump_id) + + for pump in affected_pumps: + await self._set_pump_to_default( + pump['station_id'], + pump['pump_id'], + event_id + ) + + # Log to audit trail + self.audit_logger.log( + event_type='EMERGENCY_STOP', + severity='CRITICAL', + station_id=station_id, + pump_id=pump_id, + user_id=triggered_by, + action='trigger_emergency_stop', + result='SUCCESS', + event_data={ + 'event_id': event_id, + 'reason': reason, + 'affected_pumps': len(affected_pumps) + } + ) + + # Send alerts + await self.alert_manager.send_alert( + severity='CRITICAL', + title='EMERGENCY STOP ACTIVATED', + message=( + f"Emergency stop triggered by {triggered_by}. " + f"Reason: {reason}. " + f"Scope: {self._get_scope_description(station_id, pump_id)}. " + f"Affected pumps: {len(affected_pumps)}" + ), + station_id=station_id, + pump_id=pump_id + ) + + return event_id + + async def clear_emergency_stop( + self, + event_id: int, + cleared_by: str, + notes: str + ): + """Clear emergency stop and resume normal operation.""" + logger.info( + "emergency_stop_cleared", + event_id=event_id, + cleared_by=cleared_by + ) + + # Update event record + query = """ + UPDATE emergency_stop_events + SET cleared_at = NOW(), cleared_by = %s, clear_notes = %s + WHERE event_id = %s + RETURNING station_id, pump_id + """ + result = self.db_client.execute(query, (cleared_by, notes, event_id)) + + if result: + station_id = result[0]['station_id'] + pump_id = result[0]['pump_id'] + + # Clear emergency stop flag + key = (station_id, pump_id) + if key in self.emergency_stop_active: + del self.emergency_stop_active[key] + + # Log to audit trail + self.audit_logger.log( + event_type='EMERGENCY_STOP_CLEARED', + severity='INFO', + station_id=station_id, + pump_id=pump_id, + user_id=cleared_by, + action='clear_emergency_stop', + result='SUCCESS', + event_data={ + 'event_id': event_id, + 'notes': notes + } + ) + + # Send alert + await self.alert_manager.send_alert( + severity='INFO', + title='Emergency Stop Cleared', + message=f"Emergency stop cleared by {cleared_by}. Notes: {notes}", + station_id=station_id, + pump_id=pump_id + ) + + def is_emergency_stop_active( + self, + station_id: str, + pump_id: str + ) -> bool: + """Check if emergency stop is active for a pump.""" + # Check specific pump + if (station_id, pump_id) in self.emergency_stop_active: + return True + + # Check station-wide + if (station_id, None) in self.emergency_stop_active: + return True + + # Check system-wide + if (None, None) in self.emergency_stop_active: + return True + + return False + + async def _get_affected_pumps( + self, + station_id: Optional[str], + pump_id: Optional[str] + ) -> List[Dict]: + """Get list of pumps affected by emergency stop.""" + if station_id and pump_id: + # Single pump + query = """ + SELECT station_id, pump_id + FROM pumps + WHERE station_id = %s AND pump_id = %s AND active = TRUE + """ + return self.db_client.execute_query(query, (station_id, pump_id)) + elif station_id: + # All pumps at station + query = """ + SELECT station_id, pump_id + FROM pumps + WHERE station_id = %s AND active = TRUE + """ + return self.db_client.execute_query(query, (station_id,)) + else: + # ALL pumps in system + query = """ + SELECT station_id, pump_id + FROM pumps + WHERE active = TRUE + """ + return self.db_client.execute_query(query) + + async def _set_pump_to_default( + self, + station_id: str, + pump_id: str, + event_id: int + ): + """Set pump to default safe setpoint.""" + # Get default setpoint + query = """ + SELECT default_setpoint_hz + FROM pumps + WHERE station_id = %s AND pump_id = %s + """ + result = self.db_client.execute_query(query, (station_id, pump_id)) + + if result: + default_setpoint = result[0]['default_setpoint_hz'] + + logger.info( + "pump_set_to_default", + station_id=station_id, + pump_id=pump_id, + default_setpoint=default_setpoint, + event_id=event_id + ) + + # Note: Actual setpoint override is handled by SetpointManager + # This method just logs the action + + def _get_scope_description( + self, + station_id: Optional[str], + pump_id: Optional[str] + ) -> str: + """Get human-readable description of emergency stop scope.""" + if station_id and pump_id: + return f"Single pump ({station_id}/{pump_id})" + elif station_id: + return f"All pumps at station {station_id}" + else: + return "ALL PUMPS SYSTEM-WIDE" +4.3.4 Alert Manager +Purpose: Multi-channel alerting (email, SMS, SCADA, webhooks). +import aiosmtplib +from email.message import EmailMessage +from twilio.rest import Client as TwilioClient +import httpx + +class AlertManager: + """ + Send alerts via multiple channels. + + Supports: + - Email (SMTP) + - SMS (Twilio) + - SCADA HMI alarms (OPC UA) + - Webhooks (HTTP POST) + """ + + def __init__(self, config: AlertConfig): + self.config = config + self.twilio_client = None + if config.sms_enabled: + self.twilio_client = TwilioClient( + config.twilio_account_sid, + config.twilio_auth_token + ) + + async def send_alert( + self, + severity: str, # 'INFO', 'WARNING', 'CRITICAL' + title: str, + message: str, + station_id: Optional[str] = None, + pump_id: Optional[str] = None + ): + """Send alert via all configured channels.""" + + # Email + if self.config.email_enabled: + await self._send_email(severity, title, message, station_id, pump_id) + + # SMS (for critical alerts only) + if severity == 'CRITICAL' and self.config.sms_enabled: + await self._send_sms(title, message) + + # SCADA HMI alarm + if self.config.scada_alarms_enabled: + await self._trigger_scada_alarm(severity, title, message, station_id, pump_id) + + # Webhook + if self.config.webhook_enabled: + await self._send_webhook(severity, title, message, station_id, pump_id) + + async def _send_email( + self, + severity: str, + title: str, + message: str, + station_id: Optional[str], + pump_id: Optional[str] + ): + """Send email alert via SMTP.""" + try: + email = EmailMessage() + email['From'] = self.config.email_from + email['To'] = ', '.join(self.config.email_recipients) + email['Subject'] = f"[{severity}] Calejo Control: {title}" + + body = f""" +Calejo Control Alert + +Severity: {severity} +Title: {title} +Station: {station_id or 'N/A'} +Pump: {pump_id or 'N/A'} +Timestamp: {datetime.now().isoformat()} + +Message: +{message} + +--- +This is an automated alert from Calejo Control. + """ + email.set_content(body) + + await aiosmtplib.send( + email, + hostname=self.config.smtp_host, + port=self.config.smtp_port, + username=self.config.smtp_username, + password=self.config.smtp_password, + use_tls=self.config.smtp_use_tls + ) + + logger.info("email_alert_sent", severity=severity, title=title) + except Exception as e: + logger.error("email_alert_failed", error=str(e)) + + async def _send_sms(self, title: str, message: str): + """Send SMS alert via Twilio.""" + try: + for phone_number in self.config.sms_recipients: + self.twilio_client.messages.create( + body=f"Calejo Control CRITICAL: {title}. {message[:100]}", + from_=self.config.twilio_phone_number, + to=phone_number + ) + + logger.info("sms_alert_sent", title=title) + except Exception as e: + logger.error("sms_alert_failed", error=str(e)) + + async def _trigger_scada_alarm( + self, + severity: str, + title: str, + message: str, + station_id: Optional[str], + pump_id: Optional[str] + ): + """Trigger alarm in SCADA HMI via OPC UA.""" + # Implementation depends on SCADA system + # Typically write to OPC UA alarm node + pass + + async def _send_webhook( + self, + severity: str, + title: str, + message: str, + station_id: Optional[str], + pump_id: Optional[str] + ): + """Send webhook alert via HTTP POST.""" + try: + payload = { + 'severity': severity, + 'title': title, + 'message': message, + 'station_id': station_id, + 'pump_id': pump_id, + 'timestamp': datetime.now().isoformat() + } + + async with httpx.AsyncClient() as client: + response = await client.post( + self.config.webhook_url, + json=payload, + headers={'Authorization': f'Bearer {self.config.webhook_token}'}, + timeout=10.0 + ) + response.raise_for_status() + + logger.info("webhook_alert_sent", severity=severity, title=title) + except Exception as e: + logger.error("webhook_alert_failed", error=str(e)) + +@dataclass +class AlertConfig: + """Alert configuration.""" + # Email + email_enabled: bool = True + email_from: str = "calejo-control@example.com" + email_recipients: List[str] = None + smtp_host: str = "smtp.gmail.com" + smtp_port: int = 587 + smtp_username: str = "" + smtp_password: str = "" + smtp_use_tls: bool = True + + # SMS + sms_enabled: bool = True + sms_recipients: List[str] = None + twilio_account_sid: str = "" + twilio_auth_token: str = "" + twilio_phone_number: str = "" + + # SCADA + scada_alarms_enabled: bool = True + + # Webhook + webhook_enabled: bool = True + webhook_url: str = "" + webhook_token: str = "" +4.4 Plan-to-Setpoint Logic Engine +File: src/core/setpoint_logic.py +(Implementation unchanged from v1.0 - see previous specification) +Integration with Safety Framework: +class SetpointManager: + """ + Manages setpoint calculation for all pumps. + + Integrates with safety framework to enforce limits and handle failsafe mode. + """ + + def __init__( + self, + discovery: AdapterAutoDiscovery, + db_client: DatabaseClient, + safety_enforcer: SafetyLimitEnforcer, + emergency_stop_manager: EmergencyStopManager, + watchdog: DatabaseWatchdog + ): + self.discovery = discovery + self.db_client = db_client + self.safety_enforcer = safety_enforcer + self.emergency_stop_manager = emergency_stop_manager + self.watchdog = watchdog + + # Create calculator instances + self.calculators = { + 'DIRECT_SPEED': DirectSpeedCalculator(), + 'LEVEL_CONTROLLED': LevelControlledCalculator(), + 'POWER_CONTROLLED': PowerControlledCalculator() + } + + def get_current_setpoint(self, station_id: str, pump_id: str) -> Optional[float]: + """ + Get current setpoint for a pump. + + Integrates safety checks: + 1. Check if emergency stop is active + 2. Check if failsafe mode is active + 3. Calculate setpoint from optimization plan + 4. Enforce safety limits + + Returns: + Setpoint in Hz, or None if no valid plan exists + """ + # Check emergency stop + if self.emergency_stop_manager.is_emergency_stop_active(station_id, pump_id): + logger.info( + "emergency_stop_active", + station_id=station_id, + pump_id=pump_id + ) + return self._get_default_setpoint(station_id, pump_id) + + # Check failsafe mode + if self.watchdog.failsafe_mode.get((station_id, pump_id), False): + logger.info( + "failsafe_mode_active", + station_id=station_id, + pump_id=pump_id + ) + return self._get_default_setpoint(station_id, pump_id) + + # Get pump info + pump_info = self.discovery.get_pump(station_id, pump_id) + if not pump_info: + logger.error("pump_not_found", station_id=station_id, pump_id=pump_id) + return None + + # Get current optimization plan + plan = self.db_client.get_current_plan(station_id, pump_id) + if not plan: + logger.warning("no_active_plan", station_id=station_id, pump_id=pump_id) + return self._get_default_setpoint(station_id, pump_id) + + # Get latest feedback (optional) + feedback = self.db_client.get_latest_feedback(station_id, pump_id) + + # Get appropriate calculator + calculator = self.calculators.get(pump_info.control_type) + if not calculator: + logger.error("unknown_control_type", control_type=pump_info.control_type) + return None + + # Calculate setpoint + setpoint = calculator.calculate_setpoint(plan, feedback, pump_info) + + # Enforce safety limits (LAST LINE OF DEFENSE) + enforced_setpoint, violations = self.safety_enforcer.enforce_setpoint( + station_id, + pump_id, + setpoint + ) + + return enforced_setpoint + + def _get_default_setpoint(self, station_id: str, pump_id: str) -> float: + """Get default safe setpoint from pump configuration.""" + query = """ + SELECT default_setpoint_hz + FROM pumps + WHERE station_id = %s AND pump_id = %s + """ + result = self.db_client.execute_query(query, (station_id, pump_id)) + + if result and result[0]['default_setpoint_hz']: + return float(result[0]['default_setpoint_hz']) + + # Fallback + return 35.0 +4.5 Multi-Protocol Server +(Implementation unchanged from v1.0 - see previous specification) +5. REST API Endpoints +5.1 Emergency Stop Endpoints +from fastapi import FastAPI, Depends, HTTPException, status +from pydantic import BaseModel + +app = FastAPI(title="Calejo Control API", version="2.0") + +class EmergencyStopRequest(BaseModel): + triggered_by: str + reason: str + station_id: Optional[str] = None + pump_id: Optional[str] = None + +class EmergencyStopClearRequest(BaseModel): + cleared_by: str + notes: str + +@app.post( + "/api/v1/emergency-stop", + summary="Trigger emergency stop", + description="Immediately halt optimization control and revert to safe defaults (BIG RED BUTTON)", + status_code=status.HTTP_201_CREATED, + dependencies=[Depends(verify_admin_permission)] +) +async def trigger_emergency_stop(request: EmergencyStopRequest): + """ + Trigger emergency stop ("big red button"). + + Requires admin permission. + + Scope: + - If station_id and pump_id provided: Stop single pump + - If station_id only: Stop all pumps at station + - If neither: Stop ALL pumps system-wide + """ + event_id = await emergency_stop_manager.trigger_emergency_stop( + triggered_by=request.triggered_by, + reason=request.reason, + station_id=request.station_id, + pump_id=request.pump_id + ) + + return { + "status": "emergency_stop_triggered", + "event_id": event_id, + "scope": emergency_stop_manager._get_scope_description( + request.station_id, + request.pump_id + ), + "timestamp": datetime.now().isoformat() + } + +@app.post( + "/api/v1/emergency-stop/{event_id}/clear", + summary="Clear emergency stop", + description="Resume normal operation after emergency stop", + dependencies=[Depends(verify_admin_permission)] +) +async def clear_emergency_stop(event_id: int, request: EmergencyStopClearRequest): + """Clear emergency stop and resume normal operation.""" + await emergency_stop_manager.clear_emergency_stop( + event_id=event_id, + cleared_by=request.cleared_by, + notes=request.notes + ) + + return { + "status": "emergency_stop_cleared", + "event_id": event_id, + "timestamp": datetime.now().isoformat() + } + +@app.get( + "/api/v1/emergency-stop/status", + summary="Get emergency stop status", + description="Check if any emergency stops are active" +) +async def get_emergency_stop_status(): + """Get current emergency stop status.""" + query = """ + SELECT event_id, station_id, pump_id, triggered_by, reason, timestamp + FROM emergency_stop_events + WHERE cleared_at IS NULL + ORDER BY timestamp DESC + """ + active_stops = db_client.execute_query(query) + + return { + "active_emergency_stops": len(active_stops), + "events": active_stops + } +5.2 Safety Status Endpoints +@app.get( + "/api/v1/safety/status", + summary="Get safety status", + description="Get current safety status for all pumps" +) +async def get_safety_status(): + """Get safety status dashboard data.""" + # Get active violations + violations_query = """ + SELECT station_id, pump_id, COUNT(*) as violation_count + FROM safety_limit_violations + WHERE timestamp > NOW() - INTERVAL '1 hour' + GROUP BY station_id, pump_id + """ + violations = db_client.execute_query(violations_query) + + # Get failsafe mode pumps + failsafe_query = """ + SELECT station_id, pump_id, event_type, timestamp + FROM failsafe_events + WHERE cleared_at IS NULL + """ + failsafe_pumps = db_client.execute_query(failsafe_query) + + # Get emergency stops + emergency_stops_query = """ + SELECT event_id, station_id, pump_id, triggered_by, reason, timestamp + FROM emergency_stop_events + WHERE cleared_at IS NULL + """ + emergency_stops = db_client.execute_query(emergency_stops_query) + + return { + "timestamp": datetime.now().isoformat(), + "safety_violations_last_hour": len(violations), + "pumps_in_failsafe_mode": len(failsafe_pumps), + "active_emergency_stops": len(emergency_stops), + "violations": violations, + "failsafe_pumps": failsafe_pumps, + "emergency_stops": emergency_stops + } + +@app.get( + "/api/v1/safety/violations", + summary="Get safety violations", + description="Get recent safety limit violations" +) +async def get_safety_violations(hours: int = 24, limit: int = 100): + """Get recent safety limit violations.""" + query = """ + SELECT station_id, pump_id, requested_setpoint, enforced_setpoint, + violations, timestamp + FROM safety_limit_violations + WHERE timestamp > NOW() - INTERVAL '%s hours' + ORDER BY timestamp DESC + LIMIT %s + """ + violations = db_client.execute_query(query, (hours, limit)) + + return { + "violations": violations, + "count": len(violations) + } +6. Configuration Management +6.1 Environment Variables +File: .env +# Database connection +DB_HOST=calejo-optimize +DB_PORT=5432 +DB_NAME=calejo +DB_USER=control_reader +DB_PASSWORD=secure_password + +# Station filter (optional) +STATION_FILTER= + +# Security +API_KEY=your_api_key_here +TLS_ENABLED=true +TLS_CERT_PATH=/etc/calejo/ssl/cert.pem +TLS_KEY_PATH=/etc/calejo/ssl/key.pem + +# OPC UA +OPCUA_ENABLED=true +OPCUA_PORT=4840 +OPCUA_SECURITY_MODE=SignAndEncrypt +OPCUA_CERT_PATH=/etc/calejo/opcua/cert.der +OPCUA_KEY_PATH=/etc/calejo/opcua/key.pem + +# Modbus TCP +MODBUS_ENABLED=true +MODBUS_PORT=502 +MODBUS_SLAVE_ID=1 + +# REST API +REST_API_ENABLED=true +REST_API_PORT=8080 +REST_API_CORS_ENABLED=true + +# Safety - Watchdog +WATCHDOG_ENABLED=true +WATCHDOG_TIMEOUT_SECONDS=1200 # 20 minutes +WATCHDOG_CHECK_INTERVAL_SECONDS=60 # Check every minute + +# Alerts - Email +ALERT_EMAIL_ENABLED=true +ALERT_EMAIL_FROM=calejo-control@example.com +ALERT_EMAIL_RECIPIENTS=operator1@utility.it,operator2@utility.it +SMTP_HOST=smtp.gmail.com +SMTP_PORT=587 +SMTP_USERNAME=calejo-control@example.com +SMTP_PASSWORD=smtp_password +SMTP_USE_TLS=true + +# Alerts - SMS +ALERT_SMS_ENABLED=true +ALERT_SMS_RECIPIENTS=+393401234567,+393407654321 +TWILIO_ACCOUNT_SID=your_twilio_account_sid +TWILIO_AUTH_TOKEN=your_twilio_auth_token +TWILIO_PHONE_NUMBER=+15551234567 + +# Alerts - Webhook +ALERT_WEBHOOK_ENABLED=true +ALERT_WEBHOOK_URL=https://utility-monitoring.example.com/webhook +ALERT_WEBHOOK_TOKEN=webhook_bearer_token + +# Alerts - SCADA +ALERT_SCADA_ENABLED=true + +# Logging +LOG_LEVEL=INFO +LOG_FORMAT=json +AUDIT_LOG_ENABLED=true + +# Compliance +COMPLIANCE_STANDARDS=IEC_62443,ISO_27001,NIS2 +7. Implementation Priorities (Updated) +Phase 1: Core Foundation (Week 1-2) +Project setup +Database client +Auto-discovery +Configuration management +Phase 2: Safety Framework (Week 2-3) ⚠️ NEW +Safety limit enforcer +Database watchdog (20-minute timeout) +Emergency stop manager +Alert manager (email, SMS, webhook) +Safety database tables +Audit logging +Phase 3: Setpoint Logic (Week 3-4) +Implement setpoint calculators +Implement SetpointManager with safety integration +Unit tests +Phase 4: Security Layer (Week 4-5) +Authentication +Authorization +TLS/SSL +Compliance logging (IEC 62443, ISO 27001, NIS2) +Phase 5: Protocol Servers (Week 5-7) +OPC UA Server +Modbus TCP Server +REST API (including emergency stop endpoints) +Phase 6: Integration and Testing (Week 7-8) +Integration testing +Docker containerization +Documentation +Phase 7: Production Hardening (Week 8-9) +Error handling +Monitoring +Security hardening +8. Success Criteria (Updated) +The implementation is considered complete when: +✅ Auto-discovery successfully discovers all stations and pumps +✅ Safety limit enforcer clamps all setpoints within hard limits +✅ Database watchdog detects 20-minute timeout and triggers failsafe +✅ Emergency stop API works for single pump and system-wide +✅ Multi-channel alerts (email, SMS, webhook) are sent for critical events +✅ All three control types calculate correct setpoints +✅ OPC UA server exposes hierarchical structure +✅ Modbus TCP server responds correctly +✅ REST API provides all endpoints with OpenAPI docs +✅ Security layer enforces authentication and authorization +✅ Audit logging records all safety events (immutable) +✅ Compliance requirements met (IEC 62443, ISO 27001, NIS2) +✅ Container deploys successfully +✅ Integration tests pass with real SCADA systems +✅ Performance meets requirements +9. Compliance Documentation +9.1 IEC 62443 (Industrial Cybersecurity) +Implemented Controls: +Requirement + +Implementation +SR 1.1 - Human user identification + +API key authentication, user tracking in audit log +SR 1.2 - Software process identification + +Process-level logging, component identification +SR 1.3 - Device identification + +IP address logging, geofencing +SR 1.5 - Authenticator management + +Secure API key storage, certificate management +SR 2.1 - Authorization enforcement + +Role-based access control (read, write, admin) +SR 3.1 - Communication integrity + +TLS/SSL encryption, OPC UA SignAndEncrypt +SR 3.3 - Security functionality verification + +Health checks, integrity monitoring +SR 4.1 - Information confidentiality + +Encryption at rest and in transit +SR 5.1 - Network segmentation + +DMZ deployment, firewall rules +SR 6.1 - Audit log accessibility + +Immutable audit log, query API +SR 7.1 - Denial of service protection + +Rate limiting, resource monitoring +9.2 ISO 27001 (Information Security) +Implemented Controls: +Control + +Implementation +A.9 - Access control + +Authentication, authorization, RBAC +A.10 - Cryptography + +TLS/SSL, certificate-based auth +A.12 - Operations security + +Logging, monitoring, change management +A.16 - Incident management + +Alert system, emergency stop, audit trail +A.18 - Compliance + +Audit logging, compliance reporting +9.3 NIS2 Directive (EU Critical Infrastructure) +Implemented Measures: +Requirement + +Implementation +Risk management + +Multi-layer safety limits, failsafe mechanisms +Incident handling + +Emergency stop, alert system, audit trail +Business continuity + +Failover to default setpoints, caching +Supply chain security + +Dependency management, security updates +Security monitoring + +Real-time monitoring, anomaly detection +Incident reporting + +Audit log, compliance reports +10. Additional Notes +10.1 Safety Philosophy +The implementation follows the "Defense in Depth" principle with multiple independent safety layers: +Physical Layer: PLC/VFD hard limits (15-55 Hz) +Adapter Layer: Safety limit enforcer (20-50 Hz) +Optimization Layer: Optimization constraints (25-45 Hz) +Watchdog Layer: Database timeout detection (20 minutes) +Emergency Layer: Big red button (immediate halt) +Fail-Safe Design: On any failure, the system reverts to safe defaults (existing pump configuration). +10.2 Utility Sales Benefits +Key selling points for Italian utilities: +✅ "Cannot damage equipment" - Three independent safety layers +✅ "Fails safe" - Automatic reversion to safe defaults +✅ "Complete audit trail" - Meets regulatory requirements +✅ "Operators have final control" - Emergency stop always available +✅ "Proven security" - Follows IEC 62443, ISO 27001, NIS2 +✅ "Immediate alerts" - Email, SMS, SCADA, webhooks +END OF SPECIFICATION v2.0 +This specification provides complete requirements for implementing Calejo Control adapter with comprehensive safety and security framework. Begin with Phase 1 and proceed sequentially. + diff --git a/package.json b/package.json index a3292c7..e2b3044 100644 --- a/package.json +++ b/package.json @@ -1,14 +1,17 @@ { - "name": "project", - "version": "1.0.0", - "description": "Project implementation based on specification", - "main": "index.js", + "name": "calejo-control-adapter", + "version": "2.0.0", + "description": "Multi-protocol integration adapter for municipal wastewater pump stations with comprehensive safety and security framework", + "main": "src/main.py", "scripts": { - "dev": "echo 'Development server not yet configured'", - "build": "echo 'Build process not yet configured'", - "test": "echo 'Tests not yet configured'" + "dev": "python -m src.main", + "test": "pytest tests/", + "lint": "flake8 src/ tests/", + "type-check": "mypy src/", + "docker-build": "docker build -t calejo-control-adapter .", + "docker-run": "docker run -p 8080:8080 calejo-control-adapter" }, - "keywords": [], - "author": "", + "keywords": ["scada", "opc-ua", "modbus", "wastewater", "pump-control", "safety", "security", "industrial-automation"], + "author": "Calejo Control Team", "license": "MIT" } \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..906784a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,30 @@ +# Core dependencies +asyncua==1.0.6 +pymodbus==3.5.4 +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +psycopg2-binary==2.9.9 +pydantic==2.5.0 +pydantic-settings==2.1.0 +cryptography==41.0.7 +PyJWT==2.8.0 +structlog==23.2.0 +python-dotenv==1.0.0 + +# Optional dependencies +redis==5.0.1 # For distributed caching +prometheus-client==0.19.0 # For metrics +aiohttp==3.9.1 # For async HTTP requests +aiosmtplib==3.0.1 # For email alerts +twilio==8.10.0 # For SMS alerts +httpx==0.25.0 # For webhook alerts + +# Development dependencies +pytest==7.4.3 +pytest-asyncio==0.21.1 +pytest-cov==4.1.0 +flake8==6.1.0 +mypy==1.7.1 +black==23.11.0 +isort==5.13.2 +pre-commit==3.5.0 \ No newline at end of file diff --git a/specification.html b/specification.html deleted file mode 100644 index a9cb7df..0000000 --- a/specification.html +++ /dev/null @@ -1,120 +0,0 @@ -