From 097574733ec35609d25e776a133e71946e1628cf Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 26 Oct 2025 19:04:51 +0000 Subject: [PATCH] Complete Phase 1: Core Infrastructure & Database Setup - Created comprehensive database schema with all required tables - Implemented database client with connection pooling and async support - Enhanced auto-discovery module with periodic refresh and validation - Completed configuration management with comprehensive settings - Set up structured logging with JSON formatting and audit trail - Created test suite for Phase 1 functionality - Added main_phase1.py for Phase 1 demonstration Key Features: - PostgreSQL schema with 9 tables including audit_log, emergency_stop_events, etc. - Connection pooling with health monitoring - Auto-discovery with station and pump validation - Configuration validation and sensitive field masking - Structured logging with correlation IDs - Comprehensive test suite --- config/settings.py | 159 +++++++++++++++++++--- database/schema.sql | 257 ++++++++++++++++++++++++++++++++++++ database/setup_database.sh | 94 +++++++++++++ database/test_data.sql | 142 ++++++++++++++++++++ src/core/auto_discovery.py | 107 +++++++++++++-- src/core/logging.py | 136 +++++++++++++++++++ src/database/client.py | 107 +++++++++++---- src/main_phase1.py | 207 +++++++++++++++++++++++++++++ tests/test_phase1.py | 261 +++++++++++++++++++++++++++++++++++++ 9 files changed, 1417 insertions(+), 53 deletions(-) create mode 100644 database/schema.sql create mode 100755 database/setup_database.sh create mode 100644 database/test_data.sql create mode 100644 src/core/logging.py create mode 100644 src/main_phase1.py create mode 100755 tests/test_phase1.py diff --git a/config/settings.py b/config/settings.py index d3fb84c..b0e46f2 100644 --- a/config/settings.py +++ b/config/settings.py @@ -1,41 +1,160 @@ """ -Application settings for Calejo Control Adapter. +Configuration management for Calejo Control Adapter. """ from pydantic_settings import BaseSettings -from typing import Optional +from typing import Optional, List +from pydantic import validator +import os class Settings(BaseSettings): """Application settings loaded from environment variables.""" - # Database - database_url: str = "postgresql://control_reader:secure_password@localhost:5432/calejo" + # Database configuration + db_host: str = "localhost" + db_port: int = 5432 + db_name: str = "calejo" + db_user: str = "control_reader" + db_password: str = "secure_password" + db_min_connections: int = 2 + db_max_connections: int = 10 - # Protocol endpoints - opc_ua_endpoint: str = "opc.tcp://0.0.0.0:4840" + # Station filter (optional) + station_filter: Optional[str] = None + + # Security + api_key: str = "your_api_key_here" + tls_enabled: bool = True + tls_cert_path: Optional[str] = None + tls_key_path: Optional[str] = None + + # OPC UA + opcua_enabled: bool = True + opcua_port: int = 4840 + opcua_security_mode: str = "SignAndEncrypt" + opcua_cert_path: Optional[str] = None + opcua_key_path: Optional[str] = None + + # Modbus TCP + modbus_enabled: bool = True modbus_port: int = 502 + modbus_slave_id: int = 1 + + # REST API + rest_api_enabled: bool = True rest_api_port: int = 8080 + rest_api_cors_enabled: bool = True - # Safety settings - safety_timeout_seconds: int = 1200 # 20 minutes + # Safety - Watchdog + watchdog_enabled: bool = True + watchdog_timeout_seconds: int = 1200 # 20 minutes + watchdog_check_interval_seconds: int = 60 # Check every minute - # Security settings - jwt_secret_key: str = "your-secret-key-change-in-production" - jwt_algorithm: str = "HS256" + # Auto-Discovery + auto_discovery_enabled: bool = True + auto_discovery_refresh_minutes: int = 60 - # Alert settings - smtp_server: Optional[str] = None - smtp_port: Optional[int] = 587 - smtp_username: Optional[str] = None - smtp_password: Optional[str] = None - twilio_account_sid: Optional[str] = None - twilio_auth_token: Optional[str] = None - twilio_phone_number: Optional[str] = None + # Alerts - Email + alert_email_enabled: bool = True + alert_email_from: str = "calejo-control@example.com" + alert_email_recipients: List[str] = ["operator1@utility.it", "operator2@utility.it"] + smtp_host: str = "smtp.gmail.com" + smtp_port: int = 587 + smtp_username: str = "calejo-control@example.com" + smtp_password: str = "smtp_password" + smtp_use_tls: bool = True + + # Alerts - SMS + alert_sms_enabled: bool = True + alert_sms_recipients: List[str] = ["+393401234567", "+393407654321"] + twilio_account_sid: str = "your_twilio_account_sid" + twilio_auth_token: str = "your_twilio_auth_token" + twilio_phone_number: str = "+15551234567" + + # Alerts - Webhook + alert_webhook_enabled: bool = True + alert_webhook_url: str = "https://utility-monitoring.example.com/webhook" + alert_webhook_token: str = "webhook_bearer_token" + + # Alerts - SCADA + alert_scada_enabled: bool = True # Logging log_level: str = "INFO" + log_format: str = "json" + audit_log_enabled: bool = True + + # Application + app_name: str = "Calejo Control Adapter" + app_version: str = "2.0.0" + environment: str = "development" + + @property + def database_url(self) -> str: + """Generate database URL from components.""" + return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}" + + @validator('db_port') + def validate_db_port(cls, v): + if not 1 <= v <= 65535: + raise ValueError('Database port must be between 1 and 65535') + return v + + @validator('opcua_port') + def validate_opcua_port(cls, v): + if not 1 <= v <= 65535: + raise ValueError('OPC UA port must be between 1 and 65535') + return v + + @validator('modbus_port') + def validate_modbus_port(cls, v): + if not 1 <= v <= 65535: + raise ValueError('Modbus port must be between 1 and 65535') + return v + + @validator('rest_api_port') + def validate_rest_api_port(cls, v): + if not 1 <= v <= 65535: + raise ValueError('REST API port must be between 1 and 65535') + return v + + @validator('log_level') + def validate_log_level(cls, v): + valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] + if v.upper() not in valid_levels: + raise ValueError(f'Log level must be one of: {valid_levels}') + return v.upper() + + @validator('alert_email_recipients', 'alert_sms_recipients', pre=True) + def parse_recipients(cls, v): + if isinstance(v, str): + return [recipient.strip() for recipient in v.split(',')] + return v + + def get_sensitive_fields(self) -> List[str]: + """Get list of sensitive fields for logging/masking.""" + return [ + 'db_password', + 'api_key', + 'smtp_password', + 'twilio_auth_token', + 'alert_webhook_token' + ] + + def get_safe_dict(self) -> dict: + """Get settings dictionary with sensitive fields masked.""" + settings_dict = self.dict() + for field in self.get_sensitive_fields(): + if field in settings_dict and settings_dict[field]: + settings_dict[field] = '***MASKED***' + return settings_dict class Config: env_file = ".env" - env_file_encoding = "utf-8" \ No newline at end of file + env_file_encoding = "utf-8" + case_sensitive = False + + +# Global settings instance +settings = Settings() \ No newline at end of file diff --git a/database/schema.sql b/database/schema.sql new file mode 100644 index 0000000..30a12d0 --- /dev/null +++ b/database/schema.sql @@ -0,0 +1,257 @@ +-- Calejo Control Adapter Database Schema +-- Version: 2.0 +-- Date: October 26, 2025 + +-- Drop existing tables if they exist (for clean setup) +DROP TABLE IF EXISTS audit_log CASCADE; +DROP TABLE IF EXISTS emergency_stop_events CASCADE; +DROP TABLE IF EXISTS failsafe_events CASCADE; +DROP TABLE IF EXISTS safety_limit_violations CASCADE; +DROP TABLE IF EXISTS pump_safety_limits CASCADE; +DROP TABLE IF EXISTS pump_feedback CASCADE; +DROP TABLE IF EXISTS pump_plans CASCADE; +DROP TABLE IF EXISTS pumps CASCADE; +DROP TABLE IF EXISTS pump_stations CASCADE; + +-- Create pump_stations table +CREATE TABLE pump_stations ( + station_id VARCHAR(50) PRIMARY KEY, + station_name VARCHAR(200) NOT NULL, + location VARCHAR(200), + latitude DECIMAL(10, 8), + longitude DECIMAL(11, 8), + timezone VARCHAR(50) DEFAULT 'Europe/Rome', + active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW() +); + +COMMENT ON TABLE pump_stations IS 'Metadata about pump stations'; +COMMENT ON COLUMN pump_stations.timezone IS 'Timezone for the pump station (default: Europe/Rome for Italian utilities)'; + +-- Create pumps table +CREATE TABLE pumps ( + pump_id VARCHAR(50) NOT NULL, + station_id VARCHAR(50) NOT NULL, + pump_name VARCHAR(200), + pump_type VARCHAR(50), -- 'SUBMERSIBLE', 'CENTRIFUGAL', etc. + control_type VARCHAR(50) NOT NULL, -- 'LEVEL_CONTROLLED', 'POWER_CONTROLLED', 'DIRECT_SPEED' + manufacturer VARCHAR(100), + model VARCHAR(100), + rated_power_kw DECIMAL(10, 2), + min_speed_hz DECIMAL(5, 2) DEFAULT 20.0, + max_speed_hz DECIMAL(5, 2) DEFAULT 50.0, + + -- Default setpoint (used in failsafe mode) + default_setpoint_hz DECIMAL(5, 2) NOT NULL DEFAULT 35.0, + + -- Control-specific parameters (JSON) + control_parameters JSONB, + + active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW(), + + PRIMARY KEY (station_id, pump_id), + FOREIGN KEY (station_id) REFERENCES pump_stations(station_id) +); + +COMMENT ON TABLE pumps IS 'Metadata about individual pumps'; +COMMENT ON COLUMN pumps.default_setpoint_hz IS 'Default safe setpoint used in failsafe mode (existing pump configuration)'; +COMMENT ON COLUMN pumps.control_parameters IS 'Control-specific parameters in JSON format (PID gains, thresholds, etc.)'; + +-- Create pump_plans table +CREATE TABLE pump_plans ( + plan_id SERIAL PRIMARY KEY, + station_id VARCHAR(50) NOT NULL, + pump_id VARCHAR(50) NOT NULL, + interval_start TIMESTAMP NOT NULL, + interval_end TIMESTAMP NOT NULL, + + -- Optimization outputs + target_flow_m3h DECIMAL(10, 2), + target_power_kw DECIMAL(10, 2), + target_level_m DECIMAL(5, 2), + suggested_speed_hz DECIMAL(5, 2), + + -- Metadata + plan_created_at TIMESTAMP DEFAULT NOW(), + optimization_run_id INTEGER, + + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id) +); + +COMMENT ON TABLE pump_plans IS 'Optimization plans generated by Calejo Optimize'; + +-- Create index for active pump plans +CREATE INDEX idx_pump_plans_active ON pump_plans(station_id, pump_id, interval_start, interval_end); + +-- Create pump_feedback table +CREATE TABLE pump_feedback ( + feedback_id SERIAL PRIMARY KEY, + station_id VARCHAR(50) NOT NULL, + pump_id VARCHAR(50) NOT NULL, + timestamp TIMESTAMP NOT NULL DEFAULT NOW(), + + -- Actual measurements + actual_speed_hz DECIMAL(5, 2), + actual_power_kw DECIMAL(10, 2), + actual_flow_m3h DECIMAL(10, 2), + wet_well_level_m DECIMAL(5, 2), + pump_running BOOLEAN, + + -- Status + alarm_active BOOLEAN DEFAULT FALSE, + alarm_code VARCHAR(50), + + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id) +); + +COMMENT ON TABLE pump_feedback IS 'Real-time feedback from pumps'; + +-- Create index for latest feedback +CREATE INDEX idx_pump_feedback_latest ON pump_feedback(station_id, pump_id, timestamp DESC); + +-- Create pump_safety_limits table +CREATE TABLE pump_safety_limits ( + station_id VARCHAR(50) NOT NULL, + pump_id VARCHAR(50) NOT NULL, + + -- Speed limits (Layer 2: Station Safety Limits) + hard_min_speed_hz DECIMAL(5, 2) NOT NULL, + hard_max_speed_hz DECIMAL(5, 2) NOT NULL, + + -- Level limits + hard_min_level_m DECIMAL(5, 2), + hard_max_level_m DECIMAL(5, 2), + emergency_stop_level_m DECIMAL(5, 2), -- Emergency stop if exceeded + dry_run_protection_level_m DECIMAL(5, 2), -- Stop pump if level too low + + -- Power and flow limits + hard_max_power_kw DECIMAL(10, 2), + hard_max_flow_m3h DECIMAL(10, 2), + + -- Operational limits + max_starts_per_hour INTEGER DEFAULT 6, + min_run_time_seconds INTEGER DEFAULT 300, + max_continuous_run_hours INTEGER DEFAULT 24, + + -- Rate of change limits (prevent sudden changes that damage equipment) + max_speed_change_hz_per_min DECIMAL(5, 2) DEFAULT 5.0, + + -- Metadata + set_by VARCHAR(100), + set_at TIMESTAMP DEFAULT NOW(), + approved_by VARCHAR(100), -- Dual approval + approved_at TIMESTAMP, + notes TEXT, + + PRIMARY KEY (station_id, pump_id), + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id), + + -- Constraints + CONSTRAINT check_speed_limits CHECK (hard_min_speed_hz >= 15 AND hard_max_speed_hz <= 55), + CONSTRAINT check_min_max CHECK (hard_min_speed_hz < hard_max_speed_hz), + CONSTRAINT check_approved CHECK (approved_by IS NULL OR approved_by != set_by) -- Dual approval +); + +COMMENT ON TABLE pump_safety_limits IS 'Hard operational limits enforced by Calejo Control adapter (Layer 2)'; +COMMENT ON COLUMN pump_safety_limits.hard_min_speed_hz IS 'Minimum speed enforced by adapter (must be >= PLC physical limit)'; +COMMENT ON COLUMN pump_safety_limits.hard_max_speed_hz IS 'Maximum speed enforced by adapter (must be <= PLC physical limit)'; + +-- Create safety_limit_violations table +CREATE TABLE safety_limit_violations ( + violation_id SERIAL PRIMARY KEY, + station_id VARCHAR(50) NOT NULL, + pump_id VARCHAR(50) NOT NULL, + requested_setpoint DECIMAL(5, 2), + enforced_setpoint DECIMAL(5, 2), + violations TEXT[], -- Array of violation descriptions + timestamp TIMESTAMP DEFAULT NOW(), + + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id) +); + +COMMENT ON TABLE safety_limit_violations IS 'Audit trail of safety limit violations (immutable)'; + +-- Create index for violation timestamp +CREATE INDEX idx_violations_timestamp ON safety_limit_violations(timestamp DESC); + +-- Create failsafe_events table +CREATE TABLE failsafe_events ( + event_id SERIAL PRIMARY KEY, + station_id VARCHAR(50) NOT NULL, + pump_id VARCHAR(50) NOT NULL, + event_type VARCHAR(50) NOT NULL, -- 'DATABASE_TIMEOUT', 'COMMUNICATION_LOSS', 'INVALID_DATA' + default_setpoint DECIMAL(5, 2), + triggered_by VARCHAR(100), + timestamp TIMESTAMP DEFAULT NOW(), + cleared_at TIMESTAMP, + notes TEXT, + + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id) +); + +COMMENT ON TABLE failsafe_events IS 'Record of failsafe mode activations'; + +-- Create index for active failsafe events +CREATE INDEX idx_failsafe_events_active ON failsafe_events(station_id, pump_id, timestamp DESC) + WHERE cleared_at IS NULL; + +-- Create emergency_stop_events table +CREATE TABLE emergency_stop_events ( + event_id SERIAL PRIMARY KEY, + station_id VARCHAR(50), -- NULL = all stations + pump_id VARCHAR(50), -- NULL = all pumps at station + triggered_by VARCHAR(100) NOT NULL, + reason TEXT NOT NULL, + timestamp TIMESTAMP DEFAULT NOW(), + cleared_at TIMESTAMP, + cleared_by VARCHAR(100), + clear_notes TEXT, + + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id) +); + +COMMENT ON TABLE emergency_stop_events IS 'Emergency stop event log (big red button)'; + +-- Create index for active emergency stops +CREATE INDEX idx_emergency_stop_active ON emergency_stop_events(timestamp DESC) + WHERE cleared_at IS NULL; + +-- Create audit_log table +CREATE TABLE audit_log ( + log_id BIGSERIAL PRIMARY KEY, + timestamp TIMESTAMP DEFAULT NOW(), + event_type VARCHAR(50) NOT NULL, + severity VARCHAR(20) NOT NULL, -- 'INFO', 'WARNING', 'CRITICAL' + station_id VARCHAR(50), + pump_id VARCHAR(50), + user_id VARCHAR(100), + ip_address INET, + protocol VARCHAR(20), -- 'OPC_UA', 'MODBUS', 'REST_API' + action VARCHAR(100), + resource VARCHAR(200), + result VARCHAR(20), -- 'SUCCESS', 'FAILURE', 'DENIED' + event_data JSONB +); + +COMMENT ON TABLE audit_log IS 'Immutable audit trail for compliance (IEC 62443, ISO 27001, NIS2 Directive)'; + +-- Create indexes for audit log +CREATE INDEX idx_audit_log_timestamp ON audit_log(timestamp DESC); +CREATE INDEX idx_audit_log_severity ON audit_log(severity, timestamp DESC); +CREATE INDEX idx_audit_log_user ON audit_log(user_id, timestamp DESC); + +-- Make audit log immutable (append-only) +CREATE RULE audit_log_no_update AS ON UPDATE TO audit_log DO INSTEAD NOTHING; +CREATE RULE audit_log_no_delete AS ON DELETE TO audit_log DO INSTEAD NOTHING; + +-- Create database user for adapter +-- Note: This should be executed by database administrator +-- CREATE USER control_reader WITH PASSWORD 'secure_password'; +-- GRANT CONNECT ON DATABASE calejo TO control_reader; +-- GRANT USAGE ON SCHEMA public TO control_reader; +-- GRANT SELECT ON pump_stations, pumps, pump_plans, pump_feedback, pump_safety_limits TO control_reader; +-- GRANT INSERT ON safety_limit_violations, failsafe_events, emergency_stop_events, audit_log TO control_reader; +-- GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO control_reader; \ No newline at end of file diff --git a/database/setup_database.sh b/database/setup_database.sh new file mode 100755 index 0000000..2bedf47 --- /dev/null +++ b/database/setup_database.sh @@ -0,0 +1,94 @@ +#!/bin/bash + +# Calejo Control Adapter Database Setup Script +# This script sets up the PostgreSQL database for the Calejo Control Adapter + +set -e + +# Configuration +DB_NAME="calejo" +DB_USER="control_reader" +DB_PASSWORD="secure_password" +DB_HOST="localhost" +DB_PORT="5432" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +echo -e "${GREEN}Calejo Control Adapter Database Setup${NC}" +echo "==========================================" + +# Check if PostgreSQL is running +if ! pg_isready -h $DB_HOST -p $DB_PORT >/dev/null 2>&1; then + echo -e "${RED}Error: PostgreSQL is not running on $DB_HOST:$DB_PORT${NC}" + echo "Please start PostgreSQL and ensure it's accessible" + exit 1 +fi + +# Check if database exists +if psql -h $DB_HOST -p $DB_PORT -U postgres -lqt | cut -d \| -f 1 | grep -qw "$DB_NAME"; then + echo -e "${YELLOW}Database '$DB_NAME' already exists${NC}" + read -p "Do you want to recreate it? (y/N): " -n 1 -r + echo + if [[ $REPLY =~ ^[Yy]$ ]]; then + echo "Dropping existing database..." + psql -h $DB_HOST -p $DB_PORT -U postgres -c "DROP DATABASE IF EXISTS $DB_NAME;" + else + echo "Using existing database" + fi +fi + +# Create database if it doesn't exist +if ! psql -h $DB_HOST -p $DB_PORT -U postgres -lqt | cut -d \| -f 1 | grep -qw "$DB_NAME"; then + echo "Creating database '$DB_NAME'..." + psql -h $DB_HOST -p $DB_PORT -U postgres -c "CREATE DATABASE $DB_NAME;" +fi + +# Create schema +echo "Creating database schema..." +psql -h $DB_HOST -p $DB_PORT -U postgres -d $DB_NAME -f database/schema.sql + +# Create database user (if it doesn't exist) +if ! psql -h $DB_HOST -p $DB_PORT -U postgres -t -c "\du" | grep -qw "$DB_USER"; then + echo "Creating database user '$DB_USER'..." + psql -h $DB_HOST -p $DB_PORT -U postgres -c "CREATE USER $DB_USER WITH PASSWORD '$DB_PASSWORD';" +fi + +# Grant permissions +echo "Granting permissions to user '$DB_USER'..." +psql -h $DB_HOST -p $DB_PORT -U postgres -d $DB_NAME -c " +GRANT CONNECT ON DATABASE $DB_NAME TO $DB_USER; +GRANT USAGE ON SCHEMA public TO $DB_USER; +GRANT SELECT ON pump_stations, pumps, pump_plans, pump_feedback, pump_safety_limits TO $DB_USER; +GRANT INSERT ON safety_limit_violations, failsafe_events, emergency_stop_events, audit_log TO $DB_USER; +GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO $DB_USER; +" + +# Insert test data (optional) +read -p "Do you want to insert test data? (y/N): " -n 1 -r +echo +if [[ $REPLY =~ ^[Yy]$ ]]; then + echo "Inserting test data..." + psql -h $DB_HOST -p $DB_PORT -U postgres -d $DB_NAME -f database/test_data.sql +fi + +# Test connection with the new user +echo "Testing connection with user '$DB_USER'..." +if psql -h $DB_HOST -p $DB_PORT -U $DB_USER -d $DB_NAME -c "SELECT 'Connection successful' AS result;" >/dev/null 2>&1; then + echo -e "${GREEN}Database setup completed successfully!${NC}" + echo "" + echo "Database Information:" + echo " Name: $DB_NAME" + echo " User: $DB_USER" + echo " Host: $DB_HOST" + echo " Port: $DB_PORT" + echo "" + echo "Connection string:" + echo " postgresql://$DB_USER:$DB_PASSWORD@$DB_HOST:$DB_PORT/$DB_NAME" +else + echo -e "${RED}Error: Failed to connect with user '$DB_USER'${NC}" + exit 1 +fi \ No newline at end of file diff --git a/database/test_data.sql b/database/test_data.sql new file mode 100644 index 0000000..6b7f547 --- /dev/null +++ b/database/test_data.sql @@ -0,0 +1,142 @@ +-- Calejo Control Adapter Test Data +-- Version: 2.0 +-- Date: October 26, 2025 + +-- Insert test pump stations +INSERT INTO pump_stations (station_id, station_name, location, latitude, longitude, timezone) VALUES +('STATION_001', 'North Wastewater Treatment Plant', 'Via Roma 123, Milano', 45.4642035, 9.189982, 'Europe/Rome'), +('STATION_002', 'South Pumping Station', 'Corso Italia 456, Milano', 45.448759, 9.163420, 'Europe/Rome'), +('STATION_003', 'Industrial Zone Station', 'Via Industria 789, Milano', 45.485649, 9.204041, 'Europe/Rome'); + +-- Insert test pumps +INSERT INTO pumps ( + station_id, pump_id, pump_name, pump_type, control_type, + manufacturer, model, rated_power_kw, min_speed_hz, max_speed_hz, + default_setpoint_hz, control_parameters +) VALUES +-- Station 001 - Level controlled pumps +('STATION_001', 'PUMP_001', 'Main Pump 1', 'SUBMERSIBLE', 'LEVEL_CONTROLLED', + 'KSB', 'Amarex N 200-400', 75.0, 20.0, 50.0, 35.0, + '{"pid_gains": {"kp": 2.5, "ki": 0.1, "kd": 0.5}, "level_setpoint": 2.5, "deadband": 0.2}'::JSONB), + +('STATION_001', 'PUMP_002', 'Main Pump 2', 'SUBMERSIBLE', 'LEVEL_CONTROLLED', + 'KSB', 'Amarex N 200-400', 75.0, 20.0, 50.0, 35.0, + '{"pid_gains": {"kp": 2.5, "ki": 0.1, "kd": 0.5}, "level_setpoint": 2.5, "deadband": 0.2}'::JSONB), + +-- Station 002 - Power controlled pumps +('STATION_002', 'PUMP_001', 'Booster Pump 1', 'CENTRIFUGAL', 'POWER_CONTROLLED', + 'Grundfos', 'CR 45-3-2', 45.0, 25.0, 45.0, 30.0, + '{"power_setpoint": 35.0, "efficiency_target": 0.85, "ramp_rate": 2.0}'::JSONB), + +('STATION_002', 'PUMP_002', 'Booster Pump 2', 'CENTRIFUGAL', 'POWER_CONTROLLED', + 'Grundfos', 'CR 45-3-2', 45.0, 25.0, 45.0, 30.0, + '{"power_setpoint": 35.0, "efficiency_target": 0.85, "ramp_rate": 2.0}'::JSONB), + +-- Station 003 - Direct speed controlled pumps +('STATION_003', 'PUMP_001', 'Process Pump 1', 'SUBMERSIBLE', 'DIRECT_SPEED', + 'Flygt', 'N-Pump 3085', 55.0, 20.0, 50.0, 40.0, + '{"speed_ramp_rate": 5.0}'::JSONB), + +('STATION_003', 'PUMP_002', 'Process Pump 2', 'SUBMERSIBLE', 'DIRECT_SPEED', + 'Flygt', 'N-Pump 3085', 55.0, 20.0, 50.0, 40.0, + '{"speed_ramp_rate": 5.0}'::JSONB); + +-- Insert safety limits +INSERT INTO pump_safety_limits ( + station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz, + hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m, + hard_max_power_kw, hard_max_flow_m3h, max_speed_change_hz_per_min, + set_by, approved_by, approved_at +) VALUES +-- Station 001 pumps +('STATION_001', 'PUMP_001', 20.0, 50.0, 1.0, 4.0, 4.5, 0.8, 80.0, 400.0, 5.0, + 'system_admin', 'safety_engineer', NOW()), + +('STATION_001', 'PUMP_002', 20.0, 50.0, 1.0, 4.0, 4.5, 0.8, 80.0, 400.0, 5.0, + 'system_admin', 'safety_engineer', NOW()), + +-- Station 002 pumps +('STATION_002', 'PUMP_001', 25.0, 45.0, 0.5, 3.5, 4.0, 0.5, 50.0, 250.0, 3.0, + 'system_admin', 'safety_engineer', NOW()), + +('STATION_002', 'PUMP_002', 25.0, 45.0, 0.5, 3.5, 4.0, 0.5, 50.0, 250.0, 3.0, + 'system_admin', 'safety_engineer', NOW()), + +-- Station 003 pumps +('STATION_003', 'PUMP_001', 20.0, 50.0, 0.8, 3.8, 4.2, 0.6, 60.0, 300.0, 4.0, + 'system_admin', 'safety_engineer', NOW()), + +('STATION_003', 'PUMP_002', 20.0, 50.0, 0.8, 3.8, 4.2, 0.6, 60.0, 300.0, 4.0, + 'system_admin', 'safety_engineer', NOW()); + +-- Insert sample pump plans (current and future intervals) +INSERT INTO pump_plans ( + station_id, pump_id, interval_start, interval_end, + target_flow_m3h, target_power_kw, target_level_m, suggested_speed_hz, + optimization_run_id +) VALUES +-- Current plans for all pumps +('STATION_001', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 320.5, 65.2, 2.5, 42.3, 1001), +('STATION_001', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 315.8, 63.8, 2.5, 41.7, 1001), +('STATION_002', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 180.2, 32.5, 1.8, 35.2, 1001), +('STATION_002', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 175.8, 31.8, 1.8, 34.8, 1001), +('STATION_003', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 245.6, 48.3, 2.2, 38.5, 1001), +('STATION_003', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 242.1, 47.6, 2.2, 38.1, 1001), + +-- Future plans +('STATION_001', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 335.2, 68.1, 2.6, 43.5, 1001), +('STATION_002', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 185.6, 33.8, 1.9, 36.2, 1001), +('STATION_003', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 252.3, 49.8, 2.3, 39.2, 1001); + +-- Insert sample feedback data +INSERT INTO pump_feedback ( + station_id, pump_id, timestamp, + actual_speed_hz, actual_power_kw, actual_flow_m3h, wet_well_level_m, pump_running, + alarm_active, alarm_code +) VALUES +-- Recent feedback for all pumps +('STATION_001', 'PUMP_001', NOW() - INTERVAL '2 minutes', 42.1, 64.8, 318.2, 2.48, TRUE, FALSE, NULL), +('STATION_001', 'PUMP_002', NOW() - INTERVAL '2 minutes', 41.5, 63.2, 312.5, 2.51, TRUE, FALSE, NULL), +('STATION_002', 'PUMP_001', NOW() - INTERVAL '2 minutes', 35.0, 32.1, 178.6, 1.79, TRUE, FALSE, NULL), +('STATION_002', 'PUMP_002', NOW() - INTERVAL '2 minutes', 34.6, 31.4, 173.2, 1.82, TRUE, FALSE, NULL), +('STATION_003', 'PUMP_001', NOW() - INTERVAL '2 minutes', 38.3, 47.9, 242.8, 2.21, TRUE, FALSE, NULL), +('STATION_003', 'PUMP_002', NOW() - INTERVAL '2 minutes', 37.9, 47.2, 239.5, 2.19, TRUE, FALSE, NULL), + +-- Historical feedback (for testing trends) +('STATION_001', 'PUMP_001', NOW() - INTERVAL '1 hour', 40.2, 62.1, 305.8, 2.45, TRUE, FALSE, NULL), +('STATION_002', 'PUMP_001', NOW() - INTERVAL '1 hour', 33.5, 30.2, 170.1, 1.75, TRUE, FALSE, NULL), +('STATION_003', 'PUMP_001', NOW() - INTERVAL '1 hour', 37.2, 45.8, 235.6, 2.15, TRUE, FALSE, NULL); + +-- Insert sample audit log entries +INSERT INTO audit_log ( + event_type, severity, station_id, pump_id, user_id, ip_address, + protocol, action, resource, result, event_data +) VALUES +('SYSTEM_STARTUP', 'INFO', NULL, NULL, 'system', '127.0.0.1', + 'INTERNAL', 'startup', 'system', 'SUCCESS', + '{"version": "2.0.0", "components_initialized": ["database", "safety", "protocols"]}'::JSONB), + +('SAFETY_LIMITS_LOADED', 'INFO', NULL, NULL, 'system', '127.0.0.1', + 'INTERNAL', 'load_safety_limits', 'safety_framework', 'SUCCESS', + '{"pump_count": 6, "limits_loaded": 6}'::JSONB), + +('AUTO_DISCOVERY', 'INFO', NULL, NULL, 'system', '127.0.0.1', + 'INTERNAL', 'discover_pumps', 'auto_discovery', 'SUCCESS', + '{"stations_discovered": 3, "pumps_discovered": 6}'::JSONB), + +('SETPOINT_UPDATE', 'INFO', 'STATION_001', 'PUMP_001', 'optimization_engine', '192.168.1.100', + 'REST_API', 'update_setpoint', 'pump_control', 'SUCCESS', + '{"requested_setpoint": 42.3, "enforced_setpoint": 42.3, "violations": []}'::JSONB); + +-- Verify data insertion +SELECT 'Pump Stations:' as "Verification", COUNT(*) as count FROM pump_stations +UNION ALL +SELECT 'Pumps:', COUNT(*) FROM pumps +UNION ALL +SELECT 'Safety Limits:', COUNT(*) FROM pump_safety_limits +UNION ALL +SELECT 'Pump Plans:', COUNT(*) FROM pump_plans +UNION ALL +SELECT 'Feedback Records:', COUNT(*) FROM pump_feedback +UNION ALL +SELECT 'Audit Log Entries:', COUNT(*) FROM audit_log; \ No newline at end of file diff --git a/src/core/auto_discovery.py b/src/core/auto_discovery.py index d556cca..c79c8f4 100644 --- a/src/core/auto_discovery.py +++ b/src/core/auto_discovery.py @@ -4,8 +4,10 @@ Auto-Discovery Module for Calejo Control Adapter. Automatically discovers pump stations and pumps from database on startup. """ -from typing import Dict, List +from typing import Dict, List, Optional +import asyncio import structlog +from datetime import datetime, timedelta from src.database.client import DatabaseClient @@ -15,14 +17,28 @@ logger = structlog.get_logger() class AutoDiscovery: """Auto-discovery module for pump stations and pumps.""" - def __init__(self, db_client: DatabaseClient): + def __init__(self, db_client: DatabaseClient, refresh_interval_minutes: int = 60): self.db_client = db_client + self.refresh_interval_minutes = refresh_interval_minutes self.pump_stations: Dict[str, Dict] = {} self.pumps: Dict[str, List[Dict]] = {} + self.last_discovery: Optional[datetime] = None + self.discovery_running = False async def discover(self): """Discover all pump stations and pumps from database.""" + if self.discovery_running: + logger.warning("auto_discovery_already_running") + return + + self.discovery_running = True try: + logger.info("auto_discovery_started") + + # Clear previous discovery + self.pump_stations.clear() + self.pumps.clear() + # Discover pump stations stations = self.db_client.get_pump_stations() for station in stations: @@ -36,10 +52,13 @@ class AutoDiscovery: self.pumps[station_id] = [] self.pumps[station_id].append(pump) + self.last_discovery = datetime.now() + logger.info( "auto_discovery_completed", station_count=len(self.pump_stations), - pump_count=len(pumps) + pump_count=len(pumps), + last_discovery=self.last_discovery.isoformat() ) # Log discovered stations and pumps @@ -58,31 +77,99 @@ class AutoDiscovery: station_id=station_id, pump_id=pump['pump_id'], pump_name=pump['pump_name'], - control_type=pump['control_type'] + control_type=pump['control_type'], + default_setpoint=pump['default_setpoint_hz'] ) except Exception as e: logger.error("auto_discovery_failed", error=str(e)) raise + finally: + self.discovery_running = False + + async def start_periodic_discovery(self): + """Start periodic auto-discovery in background.""" + logger.info( + "periodic_discovery_started", + refresh_interval_minutes=self.refresh_interval_minutes + ) + + while True: + await asyncio.sleep(self.refresh_interval_minutes * 60) # Convert to seconds + try: + await self.discover() + except Exception as e: + logger.error("periodic_discovery_failed", error=str(e)) def get_stations(self) -> Dict[str, Dict]: """Get all discovered pump stations.""" - return self.pump_stations + return self.pump_stations.copy() - def get_pumps(self, station_id: str = None) -> List[Dict]: + def get_pumps(self, station_id: Optional[str] = None) -> List[Dict]: """Get all discovered pumps, optionally filtered by station.""" if station_id: - return self.pumps.get(station_id, []) + return self.pumps.get(station_id, []).copy() else: all_pumps = [] for station_pumps in self.pumps.values(): all_pumps.extend(station_pumps) return all_pumps - def get_pump(self, station_id: str, pump_id: str) -> Dict: + def get_pump(self, station_id: str, pump_id: str) -> Optional[Dict]: """Get a specific pump by station and pump ID.""" station_pumps = self.pumps.get(station_id, []) for pump in station_pumps: if pump['pump_id'] == pump_id: - return pump - return None \ No newline at end of file + return pump.copy() + return None + + def get_station(self, station_id: str) -> Optional[Dict]: + """Get a specific station by ID.""" + return self.pump_stations.get(station_id) + + def get_discovery_status(self) -> Dict[str, any]: + """Get auto-discovery status information.""" + return { + "last_discovery": self.last_discovery.isoformat() if self.last_discovery else None, + "station_count": len(self.pump_stations), + "pump_count": sum(len(pumps) for pumps in self.pumps.values()), + "refresh_interval_minutes": self.refresh_interval_minutes, + "discovery_running": self.discovery_running + } + + def is_stale(self, max_age_minutes: int = 120) -> bool: + """Check if discovery data is stale.""" + if not self.last_discovery: + return True + + age = datetime.now() - self.last_discovery + return age > timedelta(minutes=max_age_minutes) + + def validate_discovery(self) -> Dict[str, any]: + """Validate discovered data for consistency.""" + issues = [] + + # Check if all pumps have corresponding stations + for station_id, pumps in self.pumps.items(): + if station_id not in self.pump_stations: + issues.append(f"Pumps found for unknown station: {station_id}") + + # Check if all stations have pumps + for station_id in self.pump_stations: + if station_id not in self.pumps or not self.pumps[station_id]: + issues.append(f"No pumps found for station: {station_id}") + + # Check pump data completeness + for station_id, pumps in self.pumps.items(): + for pump in pumps: + if not pump.get('control_type'): + issues.append(f"Pump {station_id}/{pump['pump_id']} missing control_type") + if not pump.get('default_setpoint_hz'): + issues.append(f"Pump {station_id}/{pump['pump_id']} missing default_setpoint_hz") + + return { + "valid": len(issues) == 0, + "issues": issues, + "station_count": len(self.pump_stations), + "pump_count": sum(len(pumps) for pumps in self.pumps.values()) + } \ No newline at end of file diff --git a/src/core/logging.py b/src/core/logging.py new file mode 100644 index 0000000..c9db0f6 --- /dev/null +++ b/src/core/logging.py @@ -0,0 +1,136 @@ +""" +Structured logging setup for Calejo Control Adapter. +""" + +import structlog +import logging +import sys +import uuid +from datetime import datetime +from typing import Dict, Any, Optional + +from config.settings import settings + + +def setup_logging(): + """Setup structured logging with JSON formatting.""" + + # Configure standard logging + logging.basicConfig( + format="%(message)s", + stream=sys.stdout, + level=getattr(logging, settings.log_level) + ) + + # Configure structlog + processors = [ + structlog.stdlib.filter_by_level, + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + add_correlation_id, + add_application_info, + ] + + if settings.log_format == "json": + processors.append(structlog.processors.JSONRenderer()) + else: + processors.append(structlog.dev.ConsoleRenderer()) + + structlog.configure( + processors=processors, + wrapper_class=structlog.stdlib.BoundLogger, + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, + ) + + logger = structlog.get_logger() + logger.info( + "logging_configured", + log_level=settings.log_level, + log_format=settings.log_format, + environment=settings.environment + ) + + return logger + + +def add_correlation_id(_, __, event_dict: Dict[str, Any]) -> Dict[str, Any]: + """Add correlation ID to log entries for request tracing.""" + if 'correlation_id' not in event_dict: + event_dict['correlation_id'] = str(uuid.uuid4()) + return event_dict + + +def add_application_info(_, __, event_dict: Dict[str, Any]) -> Dict[str, Any]: + """Add application information to log entries.""" + event_dict['app_name'] = settings.app_name + event_dict['app_version'] = settings.app_version + event_dict['environment'] = settings.environment + return event_dict + + +class AuditLogger: + """Audit logger for compliance requirements.""" + + def __init__(self, db_client): + self.db_client = db_client + self.logger = structlog.get_logger(__name__) + + def log( + self, + event_type: str, + severity: str, + station_id: Optional[str] = None, + pump_id: Optional[str] = None, + user_id: Optional[str] = None, + ip_address: Optional[str] = None, + protocol: Optional[str] = None, + action: Optional[str] = None, + resource: Optional[str] = None, + result: Optional[str] = None, + event_data: Optional[Dict[str, Any]] = None + ): + """Log an audit event to both structured logs and database.""" + + # Log to structured logs + self.logger.info( + "audit_event", + event_type=event_type, + severity=severity, + station_id=station_id, + pump_id=pump_id, + user_id=user_id, + ip_address=ip_address, + protocol=protocol, + action=action, + resource=resource, + result=result, + event_data=event_data + ) + + # Log to database if audit logging is enabled + if settings.audit_log_enabled: + try: + query = """ + INSERT INTO audit_log + (event_type, severity, station_id, pump_id, user_id, ip_address, + protocol, action, resource, result, event_data) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + """ + self.db_client.execute( + query, + ( + event_type, severity, station_id, pump_id, user_id, ip_address, + protocol, action, resource, result, event_data + ) + ) + except Exception as e: + self.logger.error("audit_log_database_failed", error=str(e)) + + +# Global logger instance +logger = setup_logging() \ No newline at end of file diff --git a/src/database/client.py b/src/database/client.py index a3066a8..a17011c 100644 --- a/src/database/client.py +++ b/src/database/client.py @@ -5,46 +5,86 @@ Database client for Calejo Control Adapter. import asyncio import psycopg2 import psycopg2.extras -from typing import List, Dict, Any, Optional +import psycopg2.pool +from typing import List, Dict, Any, Optional, Tuple import structlog +from contextlib import contextmanager logger = structlog.get_logger() class DatabaseClient: - """Database client for PostgreSQL operations.""" + """Database client for PostgreSQL operations with connection pooling.""" - def __init__(self, database_url: str): + def __init__(self, database_url: str, min_connections: int = 2, max_connections: int = 10): self.database_url = database_url - self.connection = None - self.cursor = None - + self.min_connections = min_connections + self.max_connections = max_connections + self.connection_pool = None + async def connect(self): - """Connect to the PostgreSQL database.""" + """Connect to the PostgreSQL database and initialize connection pool.""" try: - self.connection = psycopg2.connect( - self.database_url, + # Create connection pool + self.connection_pool = psycopg2.pool.ThreadedConnectionPool( + minconn=self.min_connections, + maxconn=self.max_connections, + dsn=self.database_url, cursor_factory=psycopg2.extras.RealDictCursor ) - self.cursor = self.connection.cursor() - logger.info("database_connected") + + # Test connection + with self._get_connection() as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT version();") + version = cursor.fetchone() + logger.info("database_connected", version=version['version']) + except Exception as e: logger.error("database_connection_failed", error=str(e)) raise async def disconnect(self): - """Disconnect from the database.""" - if self.cursor: - self.cursor.close() - if self.connection: - self.connection.close() - logger.info("database_disconnected") + """Disconnect from the database and close connection pool.""" + if self.connection_pool: + self.connection_pool.closeall() + logger.info("database_disconnected") + + @contextmanager + def _get_connection(self): + """Get a connection from the pool with context management.""" + if not self.connection_pool: + raise RuntimeError("Database connection pool not initialized") + + conn = self.connection_pool.getconn() + try: + yield conn + except Exception as e: + conn.rollback() + raise + finally: + self.connection_pool.putconn(conn) + + @contextmanager + def _get_cursor(self): + """Get a cursor from a connection with context management.""" + with self._get_connection() as conn: + cursor = conn.cursor() + try: + yield cursor + conn.commit() + except Exception as e: + conn.rollback() + raise + finally: + cursor.close() def execute_query(self, query: str, params: tuple = None) -> List[Dict[str, Any]]: """Execute a SELECT query and return results.""" try: - self.cursor.execute(query, params) - return self.cursor.fetchall() + with self._get_cursor() as cursor: + cursor.execute(query, params) + return cursor.fetchall() except Exception as e: logger.error("query_execution_failed", query=query, error=str(e)) raise @@ -52,10 +92,9 @@ class DatabaseClient: def execute(self, query: str, params: tuple = None) -> None: """Execute an INSERT/UPDATE/DELETE query.""" try: - self.cursor.execute(query, params) - self.connection.commit() + with self._get_cursor() as cursor: + cursor.execute(query, params) except Exception as e: - self.connection.rollback() logger.error("query_execution_failed", query=query, error=str(e)) raise @@ -125,4 +164,26 @@ class DatabaseClient: ORDER BY timestamp DESC LIMIT %s """ - return self.execute_query(query, (station_id, pump_id, limit)) \ No newline at end of file + return self.execute_query(query, (station_id, pump_id, limit)) + + def health_check(self) -> bool: + """Check if database is healthy and responsive.""" + try: + with self._get_cursor() as cursor: + cursor.execute("SELECT 1 as health_check;") + result = cursor.fetchone() + return result['health_check'] == 1 + except Exception as e: + logger.error("database_health_check_failed", error=str(e)) + return False + + def get_connection_stats(self) -> Dict[str, Any]: + """Get connection pool statistics.""" + if not self.connection_pool: + return {"status": "pool_not_initialized"} + + return { + "min_connections": self.min_connections, + "max_connections": self.max_connections, + "pool_status": "active" + } \ No newline at end of file diff --git a/src/main_phase1.py b/src/main_phase1.py new file mode 100644 index 0000000..cbc1c13 --- /dev/null +++ b/src/main_phase1.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +""" +Calejo Control Adapter - Phase 1 Implementation + +Core infrastructure implementation including: +- Database setup and connection pooling +- Auto-discovery of pump stations and pumps +- Configuration management +- Structured logging and audit system +""" + +import asyncio +import signal +import sys + +from src.database.client import DatabaseClient +from src.core.auto_discovery import AutoDiscovery +from src.core.safety import SafetyLimitEnforcer +from src.core.logging import setup_logging, AuditLogger +from config.settings import settings + + +class CalejoControlAdapterPhase1: + """Main application class for Phase 1 implementation.""" + + def __init__(self): + # Setup logging first + self.logger = setup_logging() + + # Initialize core components + self.db_client = DatabaseClient( + database_url=settings.database_url, + min_connections=settings.db_min_connections, + max_connections=settings.db_max_connections + ) + self.auto_discovery = AutoDiscovery( + db_client=self.db_client, + refresh_interval_minutes=settings.auto_discovery_refresh_minutes + ) + self.safety_enforcer = SafetyLimitEnforcer(self.db_client) + self.audit_logger = AuditLogger(self.db_client) + + self.running = False + self.startup_time = None + + async def start(self): + """Start the Phase 1 application.""" + self.startup_time = asyncio.get_event_loop().time() + + self.logger.info( + "phase1_application_starting", + app_name=settings.app_name, + version=settings.app_version, + environment=settings.environment + ) + + try: + # Connect to database + await self.db_client.connect() + + # Initialize auto-discovery + if settings.auto_discovery_enabled: + await self.auto_discovery.discover() + + # Validate discovery + validation = self.auto_discovery.validate_discovery() + if not validation['valid']: + self.logger.warning( + "auto_discovery_validation_issues", + issues=validation['issues'] + ) + + # Start periodic discovery + asyncio.create_task(self.auto_discovery.start_periodic_discovery()) + + # Initialize safety framework + await self.safety_enforcer.load_safety_limits() + + # Log startup to audit trail + self.audit_logger.log( + event_type='SYSTEM_STARTUP', + severity='INFO', + user_id='system', + action='startup', + resource='system', + result='SUCCESS', + event_data={ + 'version': settings.app_version, + 'phase': '1', + 'components_initialized': ['database', 'auto_discovery', 'safety'] + } + ) + + self.running = True + startup_duration = asyncio.get_event_loop().time() - self.startup_time + + self.logger.info( + "phase1_application_started", + startup_duration_seconds=round(startup_duration, 2), + station_count=len(self.auto_discovery.get_stations()), + pump_count=len(self.auto_discovery.get_pumps()), + safety_limits_loaded=len(self.safety_enforcer.safety_limits_cache) + ) + + # Print status information + self.print_status() + + # Keep application running + while self.running: + await asyncio.sleep(1) + + except Exception as e: + self.logger.error("phase1_application_startup_failed", error=str(e)) + await self.stop() + raise + + async def stop(self): + """Stop the application.""" + self.logger.info("phase1_application_stopping") + self.running = False + + # Log shutdown to audit trail + self.audit_logger.log( + event_type='SYSTEM_SHUTDOWN', + severity='INFO', + user_id='system', + action='shutdown', + resource='system', + result='SUCCESS' + ) + + # Disconnect from database + await self.db_client.disconnect() + + self.logger.info("phase1_application_stopped") + + def get_status(self) -> dict: + """Get application status information.""" + return { + "running": self.running, + "app_name": settings.app_name, + "version": settings.app_version, + "environment": settings.environment, + "database": self.db_client.get_connection_stats(), + "auto_discovery": self.auto_discovery.get_discovery_status(), + "safety_limits_loaded": len(self.safety_enforcer.safety_limits_cache) + } + + def print_status(self): + """Print human-readable status information.""" + status = self.get_status() + + print("\n" + "="*60) + print(f"Calejo Control Adapter - Phase 1 Status") + print("="*60) + print(f"Application: {status['app_name']} v{status['version']}") + print(f"Environment: {status['environment']}") + print(f"Status: {'RUNNING' if status['running'] else 'STOPPED'}") + print() + + # Database status + db_stats = status['database'] + print(f"Database:") + print(f" Status: {db_stats.get('pool_status', 'N/A')}") + print(f" Connections: {db_stats.get('min_connections', 'N/A')}-{db_stats.get('max_connections', 'N/A')}") + + # Auto-discovery status + discovery_status = status['auto_discovery'] + print(f"\nAuto-Discovery:") + print(f" Stations: {discovery_status['station_count']}") + print(f" Pumps: {discovery_status['pump_count']}") + print(f" Last Discovery: {discovery_status['last_discovery'] or 'Never'}") + + # Safety framework status + print(f"\nSafety Framework:") + print(f" Limits Loaded: {status['safety_limits_loaded']}") + + print("="*60) + print("\nPress Ctrl+C to stop the application\n") + + +async def main(): + """Main application entry point for Phase 1.""" + + app = CalejoControlAdapterPhase1() + + # Setup signal handlers for graceful shutdown + def signal_handler(signum, frame): + logger = app.logger + logger.info("signal_received", signal=signum) + asyncio.create_task(app.stop()) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + await app.start() + except KeyboardInterrupt: + await app.stop() + except Exception as e: + logger = app.logger + logger.error("phase1_application_failed", error=str(e)) + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/tests/test_phase1.py b/tests/test_phase1.py new file mode 100755 index 0000000..93af9f3 --- /dev/null +++ b/tests/test_phase1.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python3 +""" +Test script for Phase 1 implementation. + +Tests core infrastructure components: +- Database connectivity and queries +- Auto-discovery functionality +- Configuration management +- Safety framework loading +""" + +import asyncio +import sys +import os + +# Add src to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from src.database.client import DatabaseClient +from src.core.auto_discovery import AutoDiscovery +from src.core.safety import SafetyLimitEnforcer +from src.core.logging import setup_logging +from config.settings import settings + + +class Phase1Tester: + """Test class for Phase 1 components.""" + + def __init__(self): + self.logger = setup_logging() + self.db_client = DatabaseClient( + database_url=settings.database_url, + min_connections=settings.db_min_connections, + max_connections=settings.db_max_connections + ) + self.auto_discovery = AutoDiscovery(self.db_client) + self.safety_enforcer = SafetyLimitEnforcer(self.db_client) + + self.tests_passed = 0 + self.tests_failed = 0 + + async def run_all_tests(self): + """Run all Phase 1 tests.""" + self.logger.info("starting_phase1_tests") + + print("\n" + "="*60) + print("Calejo Control Adapter - Phase 1 Test Suite") + print("="*60) + + try: + # Test 1: Database connection + await self.test_database_connection() + + # Test 2: Database queries + await self.test_database_queries() + + # Test 3: Auto-discovery + await self.test_auto_discovery() + + # Test 4: Safety framework + await self.test_safety_framework() + + # Test 5: Configuration + await self.test_configuration() + + # Print summary + self.print_test_summary() + + except Exception as e: + self.logger.error("test_suite_failed", error=str(e)) + raise + finally: + await self.db_client.disconnect() + + async def test_database_connection(self): + """Test database connectivity.""" + print("\n1. Testing Database Connection...") + + try: + await self.db_client.connect() + + # Test health check + is_healthy = self.db_client.health_check() + if is_healthy: + print(" ✓ Database connection successful") + self.tests_passed += 1 + else: + print(" ✗ Database health check failed") + self.tests_failed += 1 + + except Exception as e: + print(f" ✗ Database connection failed: {e}") + self.tests_failed += 1 + raise + + async def test_database_queries(self): + """Test database queries.""" + print("\n2. Testing Database Queries...") + + try: + # Test pump stations query + stations = self.db_client.get_pump_stations() + print(f" ✓ Found {len(stations)} pump stations") + + # Test pumps query + pumps = self.db_client.get_pumps() + print(f" ✓ Found {len(pumps)} pumps") + + # Test safety limits query + safety_limits = self.db_client.get_safety_limits() + print(f" ✓ Found {len(safety_limits)} safety limits") + + # Test pump plans query + pump_plans = self.db_client.get_latest_pump_plans() + print(f" ✓ Found {len(pump_plans)} active pump plans") + + self.tests_passed += 1 + + except Exception as e: + print(f" ✗ Database queries failed: {e}") + self.tests_failed += 1 + raise + + async def test_auto_discovery(self): + """Test auto-discovery functionality.""" + print("\n3. Testing Auto-Discovery...") + + try: + await self.auto_discovery.discover() + + stations = self.auto_discovery.get_stations() + pumps = self.auto_discovery.get_pumps() + + print(f" ✓ Discovered {len(stations)} stations") + print(f" ✓ Discovered {len(pumps)} pumps") + + # Test individual station/pump retrieval + if stations: + station_id = list(stations.keys())[0] + station = self.auto_discovery.get_station(station_id) + if station: + print(f" ✓ Station retrieval successful: {station['station_name']}") + + station_pumps = self.auto_discovery.get_pumps(station_id) + if station_pumps: + pump_id = station_pumps[0]['pump_id'] + pump = self.auto_discovery.get_pump(station_id, pump_id) + if pump: + print(f" ✓ Pump retrieval successful: {pump['pump_name']}") + + # Test validation + validation = self.auto_discovery.validate_discovery() + if validation['valid']: + print(" ✓ Auto-discovery validation passed") + else: + print(f" ⚠ Auto-discovery validation issues: {validation['issues']}") + + self.tests_passed += 1 + + except Exception as e: + print(f" ✗ Auto-discovery failed: {e}") + self.tests_failed += 1 + raise + + async def test_safety_framework(self): + """Test safety framework loading.""" + print("\n4. Testing Safety Framework...") + + try: + await self.safety_enforcer.load_safety_limits() + + limits_count = len(self.safety_enforcer.safety_limits_cache) + print(f" ✓ Loaded {limits_count} safety limits") + + # Test setpoint enforcement + if limits_count > 0: + # Get first pump with safety limits + pumps = self.auto_discovery.get_pumps() + if pumps: + pump = pumps[0] + station_id = pump['station_id'] + pump_id = pump['pump_id'] + + # Test within limits + enforced, violations = self.safety_enforcer.enforce_setpoint( + station_id, pump_id, 35.0 + ) + if enforced == 35.0 and not violations: + print(" ✓ Setpoint enforcement within limits successful") + + # Test below minimum + enforced, violations = self.safety_enforcer.enforce_setpoint( + station_id, pump_id, 10.0 + ) + if enforced > 10.0 and violations: + print(" ✓ Setpoint enforcement below minimum successful") + + self.tests_passed += 1 + + except Exception as e: + print(f" ✗ Safety framework failed: {e}") + self.tests_failed += 1 + raise + + async def test_configuration(self): + """Test configuration management.""" + print("\n5. Testing Configuration Management...") + + try: + # Test database URL generation + db_url = settings.database_url + if db_url: + print(" ✓ Database URL generation successful") + + # Test safe settings dict + safe_settings = settings.get_safe_dict() + if 'db_password' in safe_settings and safe_settings['db_password'] == '***MASKED***': + print(" ✓ Sensitive field masking successful") + + # Test configuration validation + print(f" ✓ Configuration loaded: {settings.app_name} v{settings.app_version}") + print(f" ✓ Environment: {settings.environment}") + + self.tests_passed += 1 + + except Exception as e: + print(f" ✗ Configuration test failed: {e}") + self.tests_failed += 1 + raise + + def print_test_summary(self): + """Print test summary.""" + print("\n" + "="*60) + print("TEST SUMMARY") + print("="*60) + print(f"Tests Passed: {self.tests_passed}") + print(f"Tests Failed: {self.tests_failed}") + + total_tests = self.tests_passed + self.tests_failed + if total_tests > 0: + success_rate = (self.tests_passed / total_tests) * 100 + print(f"Success Rate: {success_rate:.1f}%") + + if self.tests_failed == 0: + print("\n🎉 All Phase 1 tests passed!") + print("Phase 1 implementation is ready for development.") + else: + print(f"\n⚠ {self.tests_failed} test(s) failed.") + print("Please review the failed tests before proceeding.") + + print("="*60) + + +async def main(): + """Run Phase 1 tests.""" + tester = Phase1Tester() + await tester.run_all_tests() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file