Complete Phase 1: Core Infrastructure & Database Setup

- Created comprehensive database schema with all required tables
- Implemented database client with connection pooling and async support
- Enhanced auto-discovery module with periodic refresh and validation
- Completed configuration management with comprehensive settings
- Set up structured logging with JSON formatting and audit trail
- Created test suite for Phase 1 functionality
- Added main_phase1.py for Phase 1 demonstration

Key Features:
- PostgreSQL schema with 9 tables including audit_log, emergency_stop_events, etc.
- Connection pooling with health monitoring
- Auto-discovery with station and pump validation
- Configuration validation and sensitive field masking
- Structured logging with correlation IDs
- Comprehensive test suite
This commit is contained in:
openhands 2025-10-26 19:04:51 +00:00
parent 941bed9096
commit 097574733e
9 changed files with 1417 additions and 53 deletions

View File

@ -1,41 +1,160 @@
"""
Application settings for Calejo Control Adapter.
Configuration management for Calejo Control Adapter.
"""
from pydantic_settings import BaseSettings
from typing import Optional
from typing import Optional, List
from pydantic import validator
import os
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
# Database
database_url: str = "postgresql://control_reader:secure_password@localhost:5432/calejo"
# Database configuration
db_host: str = "localhost"
db_port: int = 5432
db_name: str = "calejo"
db_user: str = "control_reader"
db_password: str = "secure_password"
db_min_connections: int = 2
db_max_connections: int = 10
# Protocol endpoints
opc_ua_endpoint: str = "opc.tcp://0.0.0.0:4840"
# Station filter (optional)
station_filter: Optional[str] = None
# Security
api_key: str = "your_api_key_here"
tls_enabled: bool = True
tls_cert_path: Optional[str] = None
tls_key_path: Optional[str] = None
# OPC UA
opcua_enabled: bool = True
opcua_port: int = 4840
opcua_security_mode: str = "SignAndEncrypt"
opcua_cert_path: Optional[str] = None
opcua_key_path: Optional[str] = None
# Modbus TCP
modbus_enabled: bool = True
modbus_port: int = 502
modbus_slave_id: int = 1
# REST API
rest_api_enabled: bool = True
rest_api_port: int = 8080
rest_api_cors_enabled: bool = True
# Safety settings
safety_timeout_seconds: int = 1200 # 20 minutes
# Safety - Watchdog
watchdog_enabled: bool = True
watchdog_timeout_seconds: int = 1200 # 20 minutes
watchdog_check_interval_seconds: int = 60 # Check every minute
# Security settings
jwt_secret_key: str = "your-secret-key-change-in-production"
jwt_algorithm: str = "HS256"
# Auto-Discovery
auto_discovery_enabled: bool = True
auto_discovery_refresh_minutes: int = 60
# Alert settings
smtp_server: Optional[str] = None
smtp_port: Optional[int] = 587
smtp_username: Optional[str] = None
smtp_password: Optional[str] = None
twilio_account_sid: Optional[str] = None
twilio_auth_token: Optional[str] = None
twilio_phone_number: Optional[str] = None
# Alerts - Email
alert_email_enabled: bool = True
alert_email_from: str = "calejo-control@example.com"
alert_email_recipients: List[str] = ["operator1@utility.it", "operator2@utility.it"]
smtp_host: str = "smtp.gmail.com"
smtp_port: int = 587
smtp_username: str = "calejo-control@example.com"
smtp_password: str = "smtp_password"
smtp_use_tls: bool = True
# Alerts - SMS
alert_sms_enabled: bool = True
alert_sms_recipients: List[str] = ["+393401234567", "+393407654321"]
twilio_account_sid: str = "your_twilio_account_sid"
twilio_auth_token: str = "your_twilio_auth_token"
twilio_phone_number: str = "+15551234567"
# Alerts - Webhook
alert_webhook_enabled: bool = True
alert_webhook_url: str = "https://utility-monitoring.example.com/webhook"
alert_webhook_token: str = "webhook_bearer_token"
# Alerts - SCADA
alert_scada_enabled: bool = True
# Logging
log_level: str = "INFO"
log_format: str = "json"
audit_log_enabled: bool = True
# Application
app_name: str = "Calejo Control Adapter"
app_version: str = "2.0.0"
environment: str = "development"
@property
def database_url(self) -> str:
"""Generate database URL from components."""
return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}"
@validator('db_port')
def validate_db_port(cls, v):
if not 1 <= v <= 65535:
raise ValueError('Database port must be between 1 and 65535')
return v
@validator('opcua_port')
def validate_opcua_port(cls, v):
if not 1 <= v <= 65535:
raise ValueError('OPC UA port must be between 1 and 65535')
return v
@validator('modbus_port')
def validate_modbus_port(cls, v):
if not 1 <= v <= 65535:
raise ValueError('Modbus port must be between 1 and 65535')
return v
@validator('rest_api_port')
def validate_rest_api_port(cls, v):
if not 1 <= v <= 65535:
raise ValueError('REST API port must be between 1 and 65535')
return v
@validator('log_level')
def validate_log_level(cls, v):
valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
if v.upper() not in valid_levels:
raise ValueError(f'Log level must be one of: {valid_levels}')
return v.upper()
@validator('alert_email_recipients', 'alert_sms_recipients', pre=True)
def parse_recipients(cls, v):
if isinstance(v, str):
return [recipient.strip() for recipient in v.split(',')]
return v
def get_sensitive_fields(self) -> List[str]:
"""Get list of sensitive fields for logging/masking."""
return [
'db_password',
'api_key',
'smtp_password',
'twilio_auth_token',
'alert_webhook_token'
]
def get_safe_dict(self) -> dict:
"""Get settings dictionary with sensitive fields masked."""
settings_dict = self.dict()
for field in self.get_sensitive_fields():
if field in settings_dict and settings_dict[field]:
settings_dict[field] = '***MASKED***'
return settings_dict
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
case_sensitive = False
# Global settings instance
settings = Settings()

257
database/schema.sql Normal file
View File

@ -0,0 +1,257 @@
-- Calejo Control Adapter Database Schema
-- Version: 2.0
-- Date: October 26, 2025
-- NOTE(review): columns use TIMESTAMP without time zone while stations carry
-- a 'timezone' column -- presumably writers use one consistent zone; confirm
-- before cross-zone deployments.
-- Drop existing tables if they exist (for clean setup)
DROP TABLE IF EXISTS audit_log CASCADE;
DROP TABLE IF EXISTS emergency_stop_events CASCADE;
DROP TABLE IF EXISTS failsafe_events CASCADE;
DROP TABLE IF EXISTS safety_limit_violations CASCADE;
DROP TABLE IF EXISTS pump_safety_limits CASCADE;
DROP TABLE IF EXISTS pump_feedback CASCADE;
DROP TABLE IF EXISTS pump_plans CASCADE;
DROP TABLE IF EXISTS pumps CASCADE;
DROP TABLE IF EXISTS pump_stations CASCADE;
-- Create pump_stations table
CREATE TABLE pump_stations (
station_id VARCHAR(50) PRIMARY KEY,
station_name VARCHAR(200) NOT NULL,
location VARCHAR(200),
latitude DECIMAL(10, 8),
longitude DECIMAL(11, 8),
timezone VARCHAR(50) DEFAULT 'Europe/Rome',
active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
COMMENT ON TABLE pump_stations IS 'Metadata about pump stations';
COMMENT ON COLUMN pump_stations.timezone IS 'Timezone for the pump station (default: Europe/Rome for Italian utilities)';
-- Create pumps table
CREATE TABLE pumps (
pump_id VARCHAR(50) NOT NULL,
station_id VARCHAR(50) NOT NULL,
pump_name VARCHAR(200),
pump_type VARCHAR(50), -- 'SUBMERSIBLE', 'CENTRIFUGAL', etc.
control_type VARCHAR(50) NOT NULL, -- 'LEVEL_CONTROLLED', 'POWER_CONTROLLED', 'DIRECT_SPEED'
manufacturer VARCHAR(100),
model VARCHAR(100),
rated_power_kw DECIMAL(10, 2),
min_speed_hz DECIMAL(5, 2) DEFAULT 20.0,
max_speed_hz DECIMAL(5, 2) DEFAULT 50.0,
-- Default setpoint (used in failsafe mode)
default_setpoint_hz DECIMAL(5, 2) NOT NULL DEFAULT 35.0,
-- Control-specific parameters (JSON)
control_parameters JSONB,
active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (station_id, pump_id),
FOREIGN KEY (station_id) REFERENCES pump_stations(station_id)
);
COMMENT ON TABLE pumps IS 'Metadata about individual pumps';
COMMENT ON COLUMN pumps.default_setpoint_hz IS 'Default safe setpoint used in failsafe mode (existing pump configuration)';
COMMENT ON COLUMN pumps.control_parameters IS 'Control-specific parameters in JSON format (PID gains, thresholds, etc.)';
-- Create pump_plans table
CREATE TABLE pump_plans (
plan_id SERIAL PRIMARY KEY,
station_id VARCHAR(50) NOT NULL,
pump_id VARCHAR(50) NOT NULL,
interval_start TIMESTAMP NOT NULL,
interval_end TIMESTAMP NOT NULL,
-- Optimization outputs
target_flow_m3h DECIMAL(10, 2),
target_power_kw DECIMAL(10, 2),
target_level_m DECIMAL(5, 2),
suggested_speed_hz DECIMAL(5, 2),
-- Metadata
plan_created_at TIMESTAMP DEFAULT NOW(),
optimization_run_id INTEGER,
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);
COMMENT ON TABLE pump_plans IS 'Optimization plans generated by Calejo Optimize';
-- Create index for active pump plans
CREATE INDEX idx_pump_plans_active ON pump_plans(station_id, pump_id, interval_start, interval_end);
-- Create pump_feedback table
CREATE TABLE pump_feedback (
feedback_id SERIAL PRIMARY KEY,
station_id VARCHAR(50) NOT NULL,
pump_id VARCHAR(50) NOT NULL,
timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
-- Actual measurements
actual_speed_hz DECIMAL(5, 2),
actual_power_kw DECIMAL(10, 2),
actual_flow_m3h DECIMAL(10, 2),
wet_well_level_m DECIMAL(5, 2),
pump_running BOOLEAN,
-- Status
alarm_active BOOLEAN DEFAULT FALSE,
alarm_code VARCHAR(50),
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);
COMMENT ON TABLE pump_feedback IS 'Real-time feedback from pumps';
-- Create index for latest feedback
CREATE INDEX idx_pump_feedback_latest ON pump_feedback(station_id, pump_id, timestamp DESC);
-- Create pump_safety_limits table
CREATE TABLE pump_safety_limits (
station_id VARCHAR(50) NOT NULL,
pump_id VARCHAR(50) NOT NULL,
-- Speed limits (Layer 2: Station Safety Limits)
hard_min_speed_hz DECIMAL(5, 2) NOT NULL,
hard_max_speed_hz DECIMAL(5, 2) NOT NULL,
-- Level limits
hard_min_level_m DECIMAL(5, 2),
hard_max_level_m DECIMAL(5, 2),
emergency_stop_level_m DECIMAL(5, 2), -- Emergency stop if exceeded
dry_run_protection_level_m DECIMAL(5, 2), -- Stop pump if level too low
-- Power and flow limits
hard_max_power_kw DECIMAL(10, 2),
hard_max_flow_m3h DECIMAL(10, 2),
-- Operational limits
max_starts_per_hour INTEGER DEFAULT 6,
min_run_time_seconds INTEGER DEFAULT 300,
max_continuous_run_hours INTEGER DEFAULT 24,
-- Rate of change limits (prevent sudden changes that damage equipment)
max_speed_change_hz_per_min DECIMAL(5, 2) DEFAULT 5.0,
-- Metadata
set_by VARCHAR(100),
set_at TIMESTAMP DEFAULT NOW(),
approved_by VARCHAR(100), -- Dual approval
approved_at TIMESTAMP,
notes TEXT,
PRIMARY KEY (station_id, pump_id),
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id),
-- Constraints
CONSTRAINT check_speed_limits CHECK (hard_min_speed_hz >= 15 AND hard_max_speed_hz <= 55),
CONSTRAINT check_min_max CHECK (hard_min_speed_hz < hard_max_speed_hz),
-- NOTE: only enforces setter != approver; a NULL approved_by (not yet
-- approved) passes the check by design.
CONSTRAINT check_approved CHECK (approved_by IS NULL OR approved_by != set_by) -- Dual approval
);
COMMENT ON TABLE pump_safety_limits IS 'Hard operational limits enforced by Calejo Control adapter (Layer 2)';
COMMENT ON COLUMN pump_safety_limits.hard_min_speed_hz IS 'Minimum speed enforced by adapter (must be >= PLC physical limit)';
COMMENT ON COLUMN pump_safety_limits.hard_max_speed_hz IS 'Maximum speed enforced by adapter (must be <= PLC physical limit)';
-- Create safety_limit_violations table
CREATE TABLE safety_limit_violations (
violation_id SERIAL PRIMARY KEY,
station_id VARCHAR(50) NOT NULL,
pump_id VARCHAR(50) NOT NULL,
requested_setpoint DECIMAL(5, 2),
enforced_setpoint DECIMAL(5, 2),
violations TEXT[], -- Array of violation descriptions
timestamp TIMESTAMP DEFAULT NOW(),
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);
COMMENT ON TABLE safety_limit_violations IS 'Audit trail of safety limit violations (immutable)';
-- Create index for violation timestamp
CREATE INDEX idx_violations_timestamp ON safety_limit_violations(timestamp DESC);
-- Create failsafe_events table
CREATE TABLE failsafe_events (
event_id SERIAL PRIMARY KEY,
station_id VARCHAR(50) NOT NULL,
pump_id VARCHAR(50) NOT NULL,
event_type VARCHAR(50) NOT NULL, -- 'DATABASE_TIMEOUT', 'COMMUNICATION_LOSS', 'INVALID_DATA'
default_setpoint DECIMAL(5, 2),
triggered_by VARCHAR(100),
timestamp TIMESTAMP DEFAULT NOW(),
cleared_at TIMESTAMP,
notes TEXT,
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);
COMMENT ON TABLE failsafe_events IS 'Record of failsafe mode activations';
-- Create index for active failsafe events
-- Partial index: only rows still active (cleared_at IS NULL) are indexed.
CREATE INDEX idx_failsafe_events_active ON failsafe_events(station_id, pump_id, timestamp DESC)
WHERE cleared_at IS NULL;
-- Create emergency_stop_events table
CREATE TABLE emergency_stop_events (
event_id SERIAL PRIMARY KEY,
station_id VARCHAR(50), -- NULL = all stations
pump_id VARCHAR(50), -- NULL = all pumps at station
triggered_by VARCHAR(100) NOT NULL,
reason TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT NOW(),
cleared_at TIMESTAMP,
cleared_by VARCHAR(100),
clear_notes TEXT,
-- NOTE: with default MATCH SIMPLE semantics this composite FK is not
-- enforced when station_id or pump_id is NULL (the wildcard rows above).
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);
COMMENT ON TABLE emergency_stop_events IS 'Emergency stop event log (big red button)';
-- Create index for active emergency stops
CREATE INDEX idx_emergency_stop_active ON emergency_stop_events(timestamp DESC)
WHERE cleared_at IS NULL;
-- Create audit_log table
CREATE TABLE audit_log (
log_id BIGSERIAL PRIMARY KEY,
timestamp TIMESTAMP DEFAULT NOW(),
event_type VARCHAR(50) NOT NULL,
severity VARCHAR(20) NOT NULL, -- 'INFO', 'WARNING', 'CRITICAL'
station_id VARCHAR(50),
pump_id VARCHAR(50),
user_id VARCHAR(100),
ip_address INET,
protocol VARCHAR(20), -- 'OPC_UA', 'MODBUS', 'REST_API'
action VARCHAR(100),
resource VARCHAR(200),
result VARCHAR(20), -- 'SUCCESS', 'FAILURE', 'DENIED'
event_data JSONB
);
COMMENT ON TABLE audit_log IS 'Immutable audit trail for compliance (IEC 62443, ISO 27001, NIS2 Directive)';
-- Create indexes for audit log
CREATE INDEX idx_audit_log_timestamp ON audit_log(timestamp DESC);
CREATE INDEX idx_audit_log_severity ON audit_log(severity, timestamp DESC);
CREATE INDEX idx_audit_log_user ON audit_log(user_id, timestamp DESC);
-- Make audit log immutable (append-only)
-- NOTE(review): rules silently swallow UPDATE/DELETE; if auditors need
-- attempts to be visible, a trigger raising an exception would be stricter.
CREATE RULE audit_log_no_update AS ON UPDATE TO audit_log DO INSTEAD NOTHING;
CREATE RULE audit_log_no_delete AS ON DELETE TO audit_log DO INSTEAD NOTHING;
-- Create database user for adapter
-- Note: This should be executed by database administrator
-- CREATE USER control_reader WITH PASSWORD 'secure_password';
-- GRANT CONNECT ON DATABASE calejo TO control_reader;
-- GRANT USAGE ON SCHEMA public TO control_reader;
-- GRANT SELECT ON pump_stations, pumps, pump_plans, pump_feedback, pump_safety_limits TO control_reader;
-- GRANT INSERT ON safety_limit_violations, failsafe_events, emergency_stop_events, audit_log TO control_reader;
-- GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO control_reader;

94
database/setup_database.sh Executable file
View File

@ -0,0 +1,94 @@
#!/bin/bash
# Calejo Control Adapter Database Setup Script
# This script sets up the PostgreSQL database for the Calejo Control Adapter
#
# Prerequisites: psql/pg_isready on PATH, and access to the server as the
# 'postgres' superuser. Run from the repository root (paths are relative).
set -e

# Configuration
DB_NAME="calejo"
DB_USER="control_reader"
# NOTE(review): default development credential -- override before production.
DB_PASSWORD="secure_password"
DB_HOST="localhost"
DB_PORT="5432"

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

echo -e "${GREEN}Calejo Control Adapter Database Setup${NC}"
echo "=========================================="

# Check if PostgreSQL is running
if ! pg_isready -h "$DB_HOST" -p "$DB_PORT" >/dev/null 2>&1; then
    echo -e "${RED}Error: PostgreSQL is not running on $DB_HOST:$DB_PORT${NC}"
    echo "Please start PostgreSQL and ensure it's accessible"
    exit 1
fi

# Helper: succeed when the target database already exists.
database_exists() {
    psql -h "$DB_HOST" -p "$DB_PORT" -U postgres -lqt | cut -d \| -f 1 | grep -qw "$DB_NAME"
}

# Check if database exists
if database_exists; then
    echo -e "${YELLOW}Database '$DB_NAME' already exists${NC}"
    read -p "Do you want to recreate it? (y/N): " -n 1 -r
    echo
    if [[ $REPLY =~ ^[Yy]$ ]]; then
        echo "Dropping existing database..."
        psql -h "$DB_HOST" -p "$DB_PORT" -U postgres -c "DROP DATABASE IF EXISTS $DB_NAME;"
    else
        echo "Using existing database"
    fi
fi

# Create database if it doesn't exist
if ! database_exists; then
    echo "Creating database '$DB_NAME'..."
    psql -h "$DB_HOST" -p "$DB_PORT" -U postgres -c "CREATE DATABASE $DB_NAME;"
fi

# Create schema
echo "Creating database schema..."
psql -h "$DB_HOST" -p "$DB_PORT" -U postgres -d "$DB_NAME" -f database/schema.sql

# Create database user (if it doesn't exist).
# Query pg_roles directly: grepping \du output can false-positive when the
# user name appears as a substring of another role or attribute column.
if [ "$(psql -h "$DB_HOST" -p "$DB_PORT" -U postgres -tAc "SELECT 1 FROM pg_roles WHERE rolname='$DB_USER'")" != "1" ]; then
    echo "Creating database user '$DB_USER'..."
    psql -h "$DB_HOST" -p "$DB_PORT" -U postgres -c "CREATE USER $DB_USER WITH PASSWORD '$DB_PASSWORD';"
fi

# Grant permissions
echo "Granting permissions to user '$DB_USER'..."
psql -h "$DB_HOST" -p "$DB_PORT" -U postgres -d "$DB_NAME" -c "
GRANT CONNECT ON DATABASE $DB_NAME TO $DB_USER;
GRANT USAGE ON SCHEMA public TO $DB_USER;
GRANT SELECT ON pump_stations, pumps, pump_plans, pump_feedback, pump_safety_limits TO $DB_USER;
GRANT INSERT ON safety_limit_violations, failsafe_events, emergency_stop_events, audit_log TO $DB_USER;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO $DB_USER;
"

# Insert test data (optional)
read -p "Do you want to insert test data? (y/N): " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
    echo "Inserting test data..."
    psql -h "$DB_HOST" -p "$DB_PORT" -U postgres -d "$DB_NAME" -f database/test_data.sql
fi

# Test connection with the new user
echo "Testing connection with user '$DB_USER'..."
if psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -c "SELECT 'Connection successful' AS result;" >/dev/null 2>&1; then
    echo -e "${GREEN}Database setup completed successfully!${NC}"
    echo ""
    echo "Database Information:"
    echo "  Name: $DB_NAME"
    echo "  User: $DB_USER"
    echo "  Host: $DB_HOST"
    echo "  Port: $DB_PORT"
    echo ""
    echo "Connection string:"
    # NOTE(review): prints the password to the terminal; fine for local dev,
    # consider masking in shared environments.
    echo "  postgresql://$DB_USER:$DB_PASSWORD@$DB_HOST:$DB_PORT/$DB_NAME"
else
    echo -e "${RED}Error: Failed to connect with user '$DB_USER'${NC}"
    exit 1
fi

142
database/test_data.sql Normal file
View File

@ -0,0 +1,142 @@
-- Calejo Control Adapter Test Data
-- Version: 2.0
-- Date: October 26, 2025
-- Assumes schema.sql has already been applied to an empty database; rows
-- reference each other by (station_id, pump_id) foreign keys, so order matters.
-- Insert test pump stations
INSERT INTO pump_stations (station_id, station_name, location, latitude, longitude, timezone) VALUES
('STATION_001', 'North Wastewater Treatment Plant', 'Via Roma 123, Milano', 45.4642035, 9.189982, 'Europe/Rome'),
('STATION_002', 'South Pumping Station', 'Corso Italia 456, Milano', 45.448759, 9.163420, 'Europe/Rome'),
('STATION_003', 'Industrial Zone Station', 'Via Industria 789, Milano', 45.485649, 9.204041, 'Europe/Rome');
-- Insert test pumps
INSERT INTO pumps (
station_id, pump_id, pump_name, pump_type, control_type,
manufacturer, model, rated_power_kw, min_speed_hz, max_speed_hz,
default_setpoint_hz, control_parameters
) VALUES
-- Station 001 - Level controlled pumps
('STATION_001', 'PUMP_001', 'Main Pump 1', 'SUBMERSIBLE', 'LEVEL_CONTROLLED',
'KSB', 'Amarex N 200-400', 75.0, 20.0, 50.0, 35.0,
'{"pid_gains": {"kp": 2.5, "ki": 0.1, "kd": 0.5}, "level_setpoint": 2.5, "deadband": 0.2}'::JSONB),
('STATION_001', 'PUMP_002', 'Main Pump 2', 'SUBMERSIBLE', 'LEVEL_CONTROLLED',
'KSB', 'Amarex N 200-400', 75.0, 20.0, 50.0, 35.0,
'{"pid_gains": {"kp": 2.5, "ki": 0.1, "kd": 0.5}, "level_setpoint": 2.5, "deadband": 0.2}'::JSONB),
-- Station 002 - Power controlled pumps
('STATION_002', 'PUMP_001', 'Booster Pump 1', 'CENTRIFUGAL', 'POWER_CONTROLLED',
'Grundfos', 'CR 45-3-2', 45.0, 25.0, 45.0, 30.0,
'{"power_setpoint": 35.0, "efficiency_target": 0.85, "ramp_rate": 2.0}'::JSONB),
('STATION_002', 'PUMP_002', 'Booster Pump 2', 'CENTRIFUGAL', 'POWER_CONTROLLED',
'Grundfos', 'CR 45-3-2', 45.0, 25.0, 45.0, 30.0,
'{"power_setpoint": 35.0, "efficiency_target": 0.85, "ramp_rate": 2.0}'::JSONB),
-- Station 003 - Direct speed controlled pumps
('STATION_003', 'PUMP_001', 'Process Pump 1', 'SUBMERSIBLE', 'DIRECT_SPEED',
'Flygt', 'N-Pump 3085', 55.0, 20.0, 50.0, 40.0,
'{"speed_ramp_rate": 5.0}'::JSONB),
('STATION_003', 'PUMP_002', 'Process Pump 2', 'SUBMERSIBLE', 'DIRECT_SPEED',
'Flygt', 'N-Pump 3085', 55.0, 20.0, 50.0, 40.0,
'{"speed_ramp_rate": 5.0}'::JSONB);
-- Insert safety limits
-- set_by and approved_by differ deliberately to satisfy the dual-approval
-- check constraint in pump_safety_limits.
INSERT INTO pump_safety_limits (
station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m,
hard_max_power_kw, hard_max_flow_m3h, max_speed_change_hz_per_min,
set_by, approved_by, approved_at
) VALUES
-- Station 001 pumps
('STATION_001', 'PUMP_001', 20.0, 50.0, 1.0, 4.0, 4.5, 0.8, 80.0, 400.0, 5.0,
'system_admin', 'safety_engineer', NOW()),
('STATION_001', 'PUMP_002', 20.0, 50.0, 1.0, 4.0, 4.5, 0.8, 80.0, 400.0, 5.0,
'system_admin', 'safety_engineer', NOW()),
-- Station 002 pumps
('STATION_002', 'PUMP_001', 25.0, 45.0, 0.5, 3.5, 4.0, 0.5, 50.0, 250.0, 3.0,
'system_admin', 'safety_engineer', NOW()),
('STATION_002', 'PUMP_002', 25.0, 45.0, 0.5, 3.5, 4.0, 0.5, 50.0, 250.0, 3.0,
'system_admin', 'safety_engineer', NOW()),
-- Station 003 pumps
('STATION_003', 'PUMP_001', 20.0, 50.0, 0.8, 3.8, 4.2, 0.6, 60.0, 300.0, 4.0,
'system_admin', 'safety_engineer', NOW()),
('STATION_003', 'PUMP_002', 20.0, 50.0, 0.8, 3.8, 4.2, 0.6, 60.0, 300.0, 4.0,
'system_admin', 'safety_engineer', NOW());
-- Insert sample pump plans (current and future intervals)
-- Intervals are relative to NOW() so a freshly seeded database always has an
-- "active" plan covering the present moment.
INSERT INTO pump_plans (
station_id, pump_id, interval_start, interval_end,
target_flow_m3h, target_power_kw, target_level_m, suggested_speed_hz,
optimization_run_id
) VALUES
-- Current plans for all pumps
('STATION_001', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 320.5, 65.2, 2.5, 42.3, 1001),
('STATION_001', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 315.8, 63.8, 2.5, 41.7, 1001),
('STATION_002', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 180.2, 32.5, 1.8, 35.2, 1001),
('STATION_002', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 175.8, 31.8, 1.8, 34.8, 1001),
('STATION_003', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 245.6, 48.3, 2.2, 38.5, 1001),
('STATION_003', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 242.1, 47.6, 2.2, 38.1, 1001),
-- Future plans
('STATION_001', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 335.2, 68.1, 2.6, 43.5, 1001),
('STATION_002', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 185.6, 33.8, 1.9, 36.2, 1001),
('STATION_003', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 252.3, 49.8, 2.3, 39.2, 1001);
-- Insert sample feedback data
INSERT INTO pump_feedback (
station_id, pump_id, timestamp,
actual_speed_hz, actual_power_kw, actual_flow_m3h, wet_well_level_m, pump_running,
alarm_active, alarm_code
) VALUES
-- Recent feedback for all pumps
('STATION_001', 'PUMP_001', NOW() - INTERVAL '2 minutes', 42.1, 64.8, 318.2, 2.48, TRUE, FALSE, NULL),
('STATION_001', 'PUMP_002', NOW() - INTERVAL '2 minutes', 41.5, 63.2, 312.5, 2.51, TRUE, FALSE, NULL),
('STATION_002', 'PUMP_001', NOW() - INTERVAL '2 minutes', 35.0, 32.1, 178.6, 1.79, TRUE, FALSE, NULL),
('STATION_002', 'PUMP_002', NOW() - INTERVAL '2 minutes', 34.6, 31.4, 173.2, 1.82, TRUE, FALSE, NULL),
('STATION_003', 'PUMP_001', NOW() - INTERVAL '2 minutes', 38.3, 47.9, 242.8, 2.21, TRUE, FALSE, NULL),
('STATION_003', 'PUMP_002', NOW() - INTERVAL '2 minutes', 37.9, 47.2, 239.5, 2.19, TRUE, FALSE, NULL),
-- Historical feedback (for testing trends)
('STATION_001', 'PUMP_001', NOW() - INTERVAL '1 hour', 40.2, 62.1, 305.8, 2.45, TRUE, FALSE, NULL),
('STATION_002', 'PUMP_001', NOW() - INTERVAL '1 hour', 33.5, 30.2, 170.1, 1.75, TRUE, FALSE, NULL),
('STATION_003', 'PUMP_001', NOW() - INTERVAL '1 hour', 37.2, 45.8, 235.6, 2.15, TRUE, FALSE, NULL);
-- Insert sample audit log entries
INSERT INTO audit_log (
event_type, severity, station_id, pump_id, user_id, ip_address,
protocol, action, resource, result, event_data
) VALUES
('SYSTEM_STARTUP', 'INFO', NULL, NULL, 'system', '127.0.0.1',
'INTERNAL', 'startup', 'system', 'SUCCESS',
'{"version": "2.0.0", "components_initialized": ["database", "safety", "protocols"]}'::JSONB),
('SAFETY_LIMITS_LOADED', 'INFO', NULL, NULL, 'system', '127.0.0.1',
'INTERNAL', 'load_safety_limits', 'safety_framework', 'SUCCESS',
'{"pump_count": 6, "limits_loaded": 6}'::JSONB),
('AUTO_DISCOVERY', 'INFO', NULL, NULL, 'system', '127.0.0.1',
'INTERNAL', 'discover_pumps', 'auto_discovery', 'SUCCESS',
'{"stations_discovered": 3, "pumps_discovered": 6}'::JSONB),
('SETPOINT_UPDATE', 'INFO', 'STATION_001', 'PUMP_001', 'optimization_engine', '192.168.1.100',
'REST_API', 'update_setpoint', 'pump_control', 'SUCCESS',
'{"requested_setpoint": 42.3, "enforced_setpoint": 42.3, "violations": []}'::JSONB);
-- Verify data insertion
SELECT 'Pump Stations:' as "Verification", COUNT(*) as count FROM pump_stations
UNION ALL
SELECT 'Pumps:', COUNT(*) FROM pumps
UNION ALL
SELECT 'Safety Limits:', COUNT(*) FROM pump_safety_limits
UNION ALL
SELECT 'Pump Plans:', COUNT(*) FROM pump_plans
UNION ALL
SELECT 'Feedback Records:', COUNT(*) FROM pump_feedback
UNION ALL
SELECT 'Audit Log Entries:', COUNT(*) FROM audit_log;

View File

@ -4,8 +4,10 @@ Auto-Discovery Module for Calejo Control Adapter.
Automatically discovers pump stations and pumps from database on startup.
"""
from typing import Any, Dict, List, Optional
import asyncio
import structlog
from datetime import datetime, timedelta
from src.database.client import DatabaseClient
@ -15,14 +17,28 @@ logger = structlog.get_logger()
class AutoDiscovery:
"""Auto-discovery module for pump stations and pumps."""
def __init__(self, db_client: DatabaseClient, refresh_interval_minutes: int = 60):
    """Initialise the auto-discovery module.

    Args:
        db_client: Database client used to read station/pump metadata.
        refresh_interval_minutes: Period for background re-discovery.
    """
    self.db_client = db_client
    self.refresh_interval_minutes = refresh_interval_minutes
    # Discovered topology caches, populated by discover()
    self.pump_stations: Dict[str, Dict] = {}
    self.pumps: Dict[str, List[Dict]] = {}
    self.last_discovery: Optional[datetime] = None
    # Guard so overlapping discover() calls are skipped, not interleaved
    self.discovery_running = False
async def discover(self):
"""Discover all pump stations and pumps from database."""
if self.discovery_running:
logger.warning("auto_discovery_already_running")
return
self.discovery_running = True
try:
logger.info("auto_discovery_started")
# Clear previous discovery
self.pump_stations.clear()
self.pumps.clear()
# Discover pump stations
stations = self.db_client.get_pump_stations()
for station in stations:
@ -36,10 +52,13 @@ class AutoDiscovery:
self.pumps[station_id] = []
self.pumps[station_id].append(pump)
self.last_discovery = datetime.now()
logger.info(
"auto_discovery_completed",
station_count=len(self.pump_stations),
pump_count=len(pumps)
pump_count=len(pumps),
last_discovery=self.last_discovery.isoformat()
)
# Log discovered stations and pumps
@ -58,31 +77,99 @@ class AutoDiscovery:
station_id=station_id,
pump_id=pump['pump_id'],
pump_name=pump['pump_name'],
control_type=pump['control_type']
control_type=pump['control_type'],
default_setpoint=pump['default_setpoint_hz']
)
except Exception as e:
logger.error("auto_discovery_failed", error=str(e))
raise
finally:
self.discovery_running = False
async def start_periodic_discovery(self):
    """Re-run discovery forever at the configured interval.

    Intended to run as a background task; each cycle's failure is logged
    and the loop keeps going.
    """
    logger.info(
        "periodic_discovery_started",
        refresh_interval_minutes=self.refresh_interval_minutes
    )
    interval_seconds = self.refresh_interval_minutes * 60  # minutes -> seconds
    while True:
        await asyncio.sleep(interval_seconds)
        try:
            await self.discover()
        except Exception as exc:
            logger.error("periodic_discovery_failed", error=str(exc))
def get_stations(self) -> Dict[str, Dict]:
    """Return all discovered pump stations keyed by station_id.

    A shallow copy is returned so callers cannot mutate the discovery
    cache; the per-station dicts themselves are still shared.
    """
    return self.pump_stations.copy()
def get_pumps(self, station_id: Optional[str] = None) -> List[Dict]:
    """Get all discovered pumps, optionally filtered by station.

    Args:
        station_id: When given, returns a copy of that station's pump list
            (empty list for unknown stations); otherwise a fresh list of
            every discovered pump.
    """
    if station_id:
        return self.pumps.get(station_id, []).copy()
    all_pumps: List[Dict] = []
    for station_pumps in self.pumps.values():
        all_pumps.extend(station_pumps)
    return all_pumps
def get_pump(self, station_id: str, pump_id: str) -> Optional[Dict]:
    """Get a specific pump by station and pump ID.

    Returns a shallow copy of the pump record, or None when the station
    or pump is unknown.
    """
    for pump in self.pumps.get(station_id, []):
        if pump['pump_id'] == pump_id:
            return pump.copy()
    return None
def get_station(self, station_id: str) -> Optional[Dict]:
    """Look up one discovered station; None when the ID is unknown."""
    station = self.pump_stations.get(station_id)
    return station
def get_discovery_status(self) -> Dict[str, Any]:
    """Return a snapshot of discovery state (for health/status endpoints).

    Note: the annotation previously used the builtin ``any`` (a function),
    not ``typing.Any``; fixed here.
    """
    return {
        "last_discovery": self.last_discovery.isoformat() if self.last_discovery else None,
        "station_count": len(self.pump_stations),
        "pump_count": sum(len(pumps) for pumps in self.pumps.values()),
        "refresh_interval_minutes": self.refresh_interval_minutes,
        "discovery_running": self.discovery_running
    }
def is_stale(self, max_age_minutes: int = 120) -> bool:
    """True when discovery never ran or its data is older than the limit."""
    if self.last_discovery is None:
        return True
    return datetime.now() - self.last_discovery > timedelta(minutes=max_age_minutes)
def validate_discovery(self) -> Dict[str, Any]:
    """Validate discovered data for consistency.

    Returns a dict with 'valid', a list of human-readable 'issues', and
    the station/pump counts that were checked.  (Annotation fixed: the
    builtin ``any`` is a function, ``typing.Any`` is the intended type.)
    """
    issues: List[str] = []
    # Check if all pumps have corresponding stations
    for station_id, pumps in self.pumps.items():
        if station_id not in self.pump_stations:
            issues.append(f"Pumps found for unknown station: {station_id}")
    # Check if all stations have pumps
    for station_id in self.pump_stations:
        if station_id not in self.pumps or not self.pumps[station_id]:
            issues.append(f"No pumps found for station: {station_id}")
    # Check pump data completeness; control_type and default_setpoint_hz are
    # required downstream (control strategy selection and failsafe mode)
    for station_id, pumps in self.pumps.items():
        for pump in pumps:
            if not pump.get('control_type'):
                issues.append(f"Pump {station_id}/{pump['pump_id']} missing control_type")
            if not pump.get('default_setpoint_hz'):
                issues.append(f"Pump {station_id}/{pump['pump_id']} missing default_setpoint_hz")
    return {
        "valid": len(issues) == 0,
        "issues": issues,
        "station_count": len(self.pump_stations),
        "pump_count": sum(len(pumps) for pumps in self.pumps.values())
    }

136
src/core/logging.py Normal file
View File

@ -0,0 +1,136 @@
"""
Structured logging setup for Calejo Control Adapter.
"""
import structlog
import logging
import sys
import uuid
from datetime import datetime
from typing import Dict, Any, Optional
from config.settings import settings
def setup_logging():
    """Configure stdlib logging plus structlog and return a bound logger.

    Output format (JSON vs. console) and level come from application
    settings.
    """
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, settings.log_level)
    )

    # Final renderer depends on the configured output format
    renderer = (
        structlog.processors.JSONRenderer()
        if settings.log_format == "json"
        else structlog.dev.ConsoleRenderer()
    )

    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            add_correlation_id,
            add_application_info,
            renderer,
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    log = structlog.get_logger()
    log.info(
        "logging_configured",
        log_level=settings.log_level,
        log_format=settings.log_format,
        environment=settings.environment
    )
    return log
def add_correlation_id(_, __, event_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Ensure every log entry carries a correlation ID for request tracing.

    NOTE(review): when no ID is already bound, a fresh UUID is generated
    per *event*, so unrelated events get distinct IDs -- bind one per
    request to actually correlate; confirm with callers.
    """
    event_dict.setdefault('correlation_id', str(uuid.uuid4()))
    return event_dict
def add_application_info(_, __, event_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Stamp each log entry with application name, version and environment."""
    event_dict.update(
        app_name=settings.app_name,
        app_version=settings.app_version,
        environment=settings.environment,
    )
    return event_dict
class AuditLogger:
    """Audit logger for compliance requirements.

    Events always go to the structured log; when
    ``settings.audit_log_enabled`` is true they are additionally persisted
    to the ``audit_log`` database table.  Database failures are logged and
    swallowed so a broken audit store never takes the application down.
    """

    def __init__(self, db_client):
        """Store the database client used for persistent audit records."""
        self.db_client = db_client
        self.logger = structlog.get_logger(__name__)

    def log(
        self,
        event_type: str,
        severity: str,
        station_id: Optional[str] = None,
        pump_id: Optional[str] = None,
        user_id: Optional[str] = None,
        ip_address: Optional[str] = None,
        protocol: Optional[str] = None,
        action: Optional[str] = None,
        resource: Optional[str] = None,
        result: Optional[str] = None,
        event_data: Optional[Dict[str, Any]] = None
    ):
        """Log an audit event to both structured logs and database.

        Arguments mirror the audit_log table columns; all but event_type
        and severity are optional and default to NULL in the database.
        """
        import json  # local import so module-level dependencies are unchanged

        # Always emit to the structured log first.
        self.logger.info(
            "audit_event",
            event_type=event_type,
            severity=severity,
            station_id=station_id,
            pump_id=pump_id,
            user_id=user_id,
            ip_address=ip_address,
            protocol=protocol,
            action=action,
            resource=resource,
            result=result,
            event_data=event_data
        )

        # Persist to the database when audit logging is enabled.
        if settings.audit_log_enabled:
            try:
                query = """
                INSERT INTO audit_log
                (event_type, severity, station_id, pump_id, user_id, ip_address,
                protocol, action, resource, result, event_data)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                # BUG FIX: psycopg2 cannot adapt a plain Python dict as a
                # query parameter, so passing event_data raw made every DB
                # audit write fail.  Serialize it to JSON text first.
                serialized_event = (
                    json.dumps(event_data) if event_data is not None else None
                )
                self.db_client.execute(
                    query,
                    (
                        event_type, severity, station_id, pump_id, user_id,
                        ip_address, protocol, action, resource, result,
                        serialized_event
                    )
                )
            except Exception as e:
                # Best-effort: never raise out of audit logging.
                self.logger.error("audit_log_database_failed", error=str(e))
# Global logger instance
# NOTE(review): calling setup_logging() here configures logging as an
# import-time side effect; importing this module anywhere triggers it.
logger = setup_logging()

View File

@ -5,46 +5,86 @@ Database client for Calejo Control Adapter.
import asyncio
import psycopg2
import psycopg2.extras
from typing import List, Dict, Any, Optional
import psycopg2.pool
from typing import List, Dict, Any, Optional, Tuple
import structlog
from contextlib import contextmanager
logger = structlog.get_logger()
class DatabaseClient:
    """Database client for PostgreSQL operations with connection pooling."""

    def __init__(self, database_url: str, min_connections: int = 2, max_connections: int = 10):
        """Store pool configuration; the pool itself is created in connect().

        Args:
            database_url: libpq-style DSN/URL for the PostgreSQL server.
            min_connections: lower bound of pooled connections.
            max_connections: upper bound of pooled connections.
        """
        # NOTE(review): this span was diff residue with old and new __init__
        # lines interleaved; reconstructed to the pooled (new) version.
        self.database_url = database_url
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.connection_pool = None
async def connect(self):
    """Connect to the PostgreSQL database and initialize connection pool.

    Creates a ThreadedConnectionPool sized by the configured min/max and
    verifies connectivity with a ``SELECT version()`` round trip.

    Raises:
        Exception: re-raised after logging when pool creation or the
            verification query fails.
    """
    # NOTE(review): this span was diff residue with old single-connection and
    # new pooled lines interleaved; reconstructed to the pooled version.
    try:
        # Create connection pool; RealDictCursor makes rows dict-like.
        self.connection_pool = psycopg2.pool.ThreadedConnectionPool(
            minconn=self.min_connections,
            maxconn=self.max_connections,
            dsn=self.database_url,
            cursor_factory=psycopg2.extras.RealDictCursor
        )
        # Smoke-test the pool before declaring success.
        with self._get_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT version();")
                version = cursor.fetchone()
                logger.info("database_connected", version=version['version'])
    except Exception as e:
        logger.error("database_connection_failed", error=str(e))
        raise
async def disconnect(self):
    """Disconnect from the database and close connection pool.

    Safe to call when connect() never ran: a missing pool is a no-op.
    """
    # NOTE(review): diff residue (old cursor/connection teardown interleaved
    # with new pool teardown); reconstructed to the pooled version.
    if self.connection_pool:
        self.connection_pool.closeall()
        logger.info("database_disconnected")
@contextmanager
def _get_connection(self):
"""Get a connection from the pool with context management."""
if not self.connection_pool:
raise RuntimeError("Database connection pool not initialized")
conn = self.connection_pool.getconn()
try:
yield conn
except Exception as e:
conn.rollback()
raise
finally:
self.connection_pool.putconn(conn)
@contextmanager
def _get_cursor(self):
    """Yield a cursor on a pooled connection; commit on success.

    The commit stays inside the try so a failing commit is also rolled
    back; the cursor is closed no matter what.
    """
    with self._get_connection() as conn:
        cur = conn.cursor()
        try:
            yield cur
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cur.close()
def execute_query(self, query: str, params: tuple = None) -> List[Dict[str, Any]]:
    """Execute a SELECT query and return all rows.

    Args:
        query: SQL text with ``%s`` placeholders.
        params: bound parameters, or None for a parameterless query.

    Returns:
        List of dict-like rows (RealDictCursor).

    Raises:
        Exception: re-raised after logging on any database error.
    """
    # NOTE(review): diff residue (old self.cursor lines interleaved with new
    # pooled-cursor lines); reconstructed to the pooled version.
    try:
        with self._get_cursor() as cursor:
            cursor.execute(query, params)
            return cursor.fetchall()
    except Exception as e:
        logger.error("query_execution_failed", query=query, error=str(e))
        raise
@ -52,10 +92,9 @@ class DatabaseClient:
def execute(self, query: str, params: tuple = None) -> None:
    """Execute an INSERT/UPDATE/DELETE statement.

    Commit and rollback are handled by the ``_get_cursor`` context
    manager, so this method only runs the statement.

    Raises:
        Exception: re-raised after logging on any database error.
    """
    # NOTE(review): diff residue (old commit/rollback lines interleaved with
    # new pooled-cursor lines); reconstructed to the pooled version.
    try:
        with self._get_cursor() as cursor:
            cursor.execute(query, params)
    except Exception as e:
        logger.error("query_execution_failed", query=query, error=str(e))
        raise
@ -126,3 +165,25 @@ class DatabaseClient:
LIMIT %s
"""
return self.execute_query(query, (station_id, pump_id, limit))
def health_check(self) -> bool:
    """Return True when a trivial round-trip query succeeds.

    Any failure (connection, query, or unexpected row shape) is logged
    and reported as unhealthy rather than raised.
    """
    try:
        with self._get_cursor() as cur:
            cur.execute("SELECT 1 as health_check;")
            return cur.fetchone()['health_check'] == 1
    except Exception as e:
        logger.error("database_health_check_failed", error=str(e))
        return False
def get_connection_stats(self) -> Dict[str, Any]:
    """Report static pool configuration, or that no pool exists yet."""
    if not self.connection_pool:
        return {"status": "pool_not_initialized"}
    # The pool API exposes no live counters; report the configured bounds.
    return {
        "min_connections": self.min_connections,
        "max_connections": self.max_connections,
        "pool_status": "active",
    }

207
src/main_phase1.py Normal file
View File

@ -0,0 +1,207 @@
#!/usr/bin/env python3
"""
Calejo Control Adapter - Phase 1 Implementation
Core infrastructure implementation including:
- Database setup and connection pooling
- Auto-discovery of pump stations and pumps
- Configuration management
- Structured logging and audit system
"""
import asyncio
import signal
import sys
from src.database.client import DatabaseClient
from src.core.auto_discovery import AutoDiscovery
from src.core.safety import SafetyLimitEnforcer
from src.core.logging import setup_logging, AuditLogger
from config.settings import settings
class CalejoControlAdapterPhase1:
    """Main application class for the Phase 1 implementation.

    Wires together the database client, auto-discovery, safety framework
    and audit logging, then idles until stopped.
    """

    def __init__(self):
        # Logging must come up before anything that logs.
        self.logger = setup_logging()

        # Core components all share one pooled database client.
        self.db_client = DatabaseClient(
            database_url=settings.database_url,
            min_connections=settings.db_min_connections,
            max_connections=settings.db_max_connections,
        )
        self.auto_discovery = AutoDiscovery(
            db_client=self.db_client,
            refresh_interval_minutes=settings.auto_discovery_refresh_minutes,
        )
        self.safety_enforcer = SafetyLimitEnforcer(self.db_client)
        self.audit_logger = AuditLogger(self.db_client)

        self.running = False
        self.startup_time = None

    async def start(self):
        """Start the application and block until stop() flips ``running``."""
        self.startup_time = asyncio.get_event_loop().time()
        self.logger.info(
            "phase1_application_starting",
            app_name=settings.app_name,
            version=settings.app_version,
            environment=settings.environment,
        )
        try:
            await self.db_client.connect()

            if settings.auto_discovery_enabled:
                await self.auto_discovery.discover()
                validation = self.auto_discovery.validate_discovery()
                if not validation['valid']:
                    self.logger.warning(
                        "auto_discovery_validation_issues",
                        issues=validation['issues'],
                    )
                # Background task keeps the station/pump topology current.
                asyncio.create_task(self.auto_discovery.start_periodic_discovery())

            await self.safety_enforcer.load_safety_limits()

            # Record the startup in the audit trail.
            self.audit_logger.log(
                event_type='SYSTEM_STARTUP',
                severity='INFO',
                user_id='system',
                action='startup',
                resource='system',
                result='SUCCESS',
                event_data={
                    'version': settings.app_version,
                    'phase': '1',
                    'components_initialized': ['database', 'auto_discovery', 'safety'],
                },
            )

            self.running = True
            startup_duration = asyncio.get_event_loop().time() - self.startup_time
            self.logger.info(
                "phase1_application_started",
                startup_duration_seconds=round(startup_duration, 2),
                station_count=len(self.auto_discovery.get_stations()),
                pump_count=len(self.auto_discovery.get_pumps()),
                safety_limits_loaded=len(self.safety_enforcer.safety_limits_cache),
            )

            self.print_status()

            # Idle until stop() clears the running flag.
            while self.running:
                await asyncio.sleep(1)
        except Exception as e:
            self.logger.error("phase1_application_startup_failed", error=str(e))
            await self.stop()
            raise

    async def stop(self):
        """Stop the application, audit the shutdown, release resources."""
        self.logger.info("phase1_application_stopping")
        self.running = False

        self.audit_logger.log(
            event_type='SYSTEM_SHUTDOWN',
            severity='INFO',
            user_id='system',
            action='shutdown',
            resource='system',
            result='SUCCESS',
        )

        await self.db_client.disconnect()
        self.logger.info("phase1_application_stopped")

    def get_status(self) -> dict:
        """Return a machine-readable snapshot of application state."""
        return {
            "running": self.running,
            "app_name": settings.app_name,
            "version": settings.app_version,
            "environment": settings.environment,
            "database": self.db_client.get_connection_stats(),
            "auto_discovery": self.auto_discovery.get_discovery_status(),
            "safety_limits_loaded": len(self.safety_enforcer.safety_limits_cache),
        }

    def print_status(self):
        """Print a human-readable status banner to stdout."""
        status = self.get_status()
        print("\n" + "="*60)
        print(f"Calejo Control Adapter - Phase 1 Status")
        print("="*60)
        print(f"Application: {status['app_name']} v{status['version']}")
        print(f"Environment: {status['environment']}")
        print(f"Status: {'RUNNING' if status['running'] else 'STOPPED'}")
        print()

        # Database section.
        db_stats = status['database']
        print(f"Database:")
        print(f" Status: {db_stats.get('pool_status', 'N/A')}")
        print(f" Connections: {db_stats.get('min_connections', 'N/A')}-{db_stats.get('max_connections', 'N/A')}")

        # Auto-discovery section.
        discovery_status = status['auto_discovery']
        print(f"\nAuto-Discovery:")
        print(f" Stations: {discovery_status['station_count']}")
        print(f" Pumps: {discovery_status['pump_count']}")
        print(f" Last Discovery: {discovery_status['last_discovery'] or 'Never'}")

        # Safety framework section.
        print(f"\nSafety Framework:")
        print(f" Limits Loaded: {status['safety_limits_loaded']}")
        print("="*60)
        print("\nPress Ctrl+C to stop the application\n")
async def main():
    """Main application entry point for Phase 1.

    Installs SIGINT/SIGTERM handlers for graceful shutdown, then runs the
    application until it stops or fails.
    """
    app = CalejoControlAdapterPhase1()
    loop = asyncio.get_running_loop()

    def signal_handler(signum, frame):
        app.logger.info("signal_received", signal=signum)
        # BUG FIX: asyncio.create_task() directly inside a signal.signal
        # handler requires a currently-running loop in this exact context
        # and can raise RuntimeError.  call_soon_threadsafe defers the
        # shutdown onto the loop safely from any context.
        loop.call_soon_threadsafe(lambda: asyncio.ensure_future(app.stop()))

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        await app.start()
    except KeyboardInterrupt:
        await app.stop()
    except Exception as e:
        app.logger.error("phase1_application_failed", error=str(e))
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())

261
tests/test_phase1.py Executable file
View File

@ -0,0 +1,261 @@
#!/usr/bin/env python3
"""
Test script for Phase 1 implementation.
Tests core infrastructure components:
- Database connectivity and queries
- Auto-discovery functionality
- Configuration management
- Safety framework loading
"""
import asyncio
import sys
import os
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from src.database.client import DatabaseClient
from src.core.auto_discovery import AutoDiscovery
from src.core.safety import SafetyLimitEnforcer
from src.core.logging import setup_logging
from config.settings import settings
class Phase1Tester:
    """Exercises the Phase 1 infrastructure components end to end."""

    def __init__(self):
        self.logger = setup_logging()
        self.db_client = DatabaseClient(
            database_url=settings.database_url,
            min_connections=settings.db_min_connections,
            max_connections=settings.db_max_connections,
        )
        self.auto_discovery = AutoDiscovery(self.db_client)
        self.safety_enforcer = SafetyLimitEnforcer(self.db_client)
        # Pass/fail counters aggregated by print_test_summary().
        self.tests_passed = 0
        self.tests_failed = 0

    async def run_all_tests(self):
        """Run every Phase 1 test in order and print a summary."""
        self.logger.info("starting_phase1_tests")
        print("\n" + "="*60)
        print("Calejo Control Adapter - Phase 1 Test Suite")
        print("="*60)
        try:
            await self.test_database_connection()
            await self.test_database_queries()
            await self.test_auto_discovery()
            await self.test_safety_framework()
            await self.test_configuration()
            self.print_test_summary()
        except Exception as e:
            self.logger.error("test_suite_failed", error=str(e))
            raise
        finally:
            await self.db_client.disconnect()

    async def test_database_connection(self):
        """Verify connectivity and the health-check round trip."""
        print("\n1. Testing Database Connection...")
        try:
            await self.db_client.connect()
            if self.db_client.health_check():
                print(" ✓ Database connection successful")
                self.tests_passed += 1
            else:
                print(" ✗ Database health check failed")
                self.tests_failed += 1
        except Exception as e:
            print(f" ✗ Database connection failed: {e}")
            self.tests_failed += 1
            raise

    async def test_database_queries(self):
        """Verify the canned queries used by the adapter."""
        print("\n2. Testing Database Queries...")
        try:
            stations = self.db_client.get_pump_stations()
            print(f" ✓ Found {len(stations)} pump stations")

            pumps = self.db_client.get_pumps()
            print(f" ✓ Found {len(pumps)} pumps")

            safety_limits = self.db_client.get_safety_limits()
            print(f" ✓ Found {len(safety_limits)} safety limits")

            pump_plans = self.db_client.get_latest_pump_plans()
            print(f" ✓ Found {len(pump_plans)} active pump plans")

            self.tests_passed += 1
        except Exception as e:
            print(f" ✗ Database queries failed: {e}")
            self.tests_failed += 1
            raise

    async def test_auto_discovery(self):
        """Verify discovery, per-entity lookups and validation."""
        print("\n3. Testing Auto-Discovery...")
        try:
            await self.auto_discovery.discover()
            stations = self.auto_discovery.get_stations()
            pumps = self.auto_discovery.get_pumps()
            print(f" ✓ Discovered {len(stations)} stations")
            print(f" ✓ Discovered {len(pumps)} pumps")

            # Spot-check single-entity retrieval on the first station/pump.
            if stations:
                station_id = next(iter(stations))
                station = self.auto_discovery.get_station(station_id)
                if station:
                    print(f" ✓ Station retrieval successful: {station['station_name']}")
                station_pumps = self.auto_discovery.get_pumps(station_id)
                if station_pumps:
                    pump_id = station_pumps[0]['pump_id']
                    pump = self.auto_discovery.get_pump(station_id, pump_id)
                    if pump:
                        print(f" ✓ Pump retrieval successful: {pump['pump_name']}")

            validation = self.auto_discovery.validate_discovery()
            if validation['valid']:
                print(" ✓ Auto-discovery validation passed")
            else:
                print(f" ⚠ Auto-discovery validation issues: {validation['issues']}")

            self.tests_passed += 1
        except Exception as e:
            print(f" ✗ Auto-discovery failed: {e}")
            self.tests_failed += 1
            raise

    async def test_safety_framework(self):
        """Verify safety-limit loading and setpoint enforcement."""
        print("\n4. Testing Safety Framework...")
        try:
            await self.safety_enforcer.load_safety_limits()
            limits_count = len(self.safety_enforcer.safety_limits_cache)
            print(f" ✓ Loaded {limits_count} safety limits")

            if limits_count > 0:
                pumps = self.auto_discovery.get_pumps()
                if pumps:
                    pump = pumps[0]
                    station_id = pump['station_id']
                    pump_id = pump['pump_id']

                    # An in-range setpoint should pass through unchanged.
                    enforced, violations = self.safety_enforcer.enforce_setpoint(
                        station_id, pump_id, 35.0
                    )
                    if enforced == 35.0 and not violations:
                        print(" ✓ Setpoint enforcement within limits successful")

                    # A below-minimum setpoint should be clamped up and flagged.
                    enforced, violations = self.safety_enforcer.enforce_setpoint(
                        station_id, pump_id, 10.0
                    )
                    if enforced > 10.0 and violations:
                        print(" ✓ Setpoint enforcement below minimum successful")

            self.tests_passed += 1
        except Exception as e:
            print(f" ✗ Safety framework failed: {e}")
            self.tests_failed += 1
            raise

    async def test_configuration(self):
        """Verify settings derivation and sensitive-field masking."""
        print("\n5. Testing Configuration Management...")
        try:
            if settings.database_url:
                print(" ✓ Database URL generation successful")

            safe_settings = settings.get_safe_dict()
            if 'db_password' in safe_settings and safe_settings['db_password'] == '***MASKED***':
                print(" ✓ Sensitive field masking successful")

            print(f" ✓ Configuration loaded: {settings.app_name} v{settings.app_version}")
            print(f" ✓ Environment: {settings.environment}")

            self.tests_passed += 1
        except Exception as e:
            print(f" ✗ Configuration test failed: {e}")
            self.tests_failed += 1
            raise

    def print_test_summary(self):
        """Print pass/fail counts and an overall success rate."""
        print("\n" + "="*60)
        print("TEST SUMMARY")
        print("="*60)
        print(f"Tests Passed: {self.tests_passed}")
        print(f"Tests Failed: {self.tests_failed}")
        total_tests = self.tests_passed + self.tests_failed
        if total_tests > 0:
            success_rate = (self.tests_passed / total_tests) * 100
            print(f"Success Rate: {success_rate:.1f}%")
        if self.tests_failed == 0:
            print("\n🎉 All Phase 1 tests passed!")
            print("Phase 1 implementation is ready for development.")
        else:
            print(f"\n{self.tests_failed} test(s) failed.")
            print("Please review the failed tests before proceeding.")
        print("="*60)
async def main():
    """Run Phase 1 tests."""
    await Phase1Tester().run_all_tests()


if __name__ == "__main__":
    asyncio.run(main())