From bfb52a5c45eb5bbe5697d19abd95a4321842b7fa Mon Sep 17 00:00:00 2001 From: openhands Date: Tue, 28 Oct 2025 17:25:00 +0000 Subject: [PATCH] Phase 6 Integration Testing: Complete safety workflow and optimization-to-SCADA integration tests - Fixed critical safety limit loading bug: get_safety_limits() now queries pump_safety_limits table - Fixed emergency stop logic: setpoint manager returns 0.0 during emergency stop - Added comprehensive test data with proper column mappings - All 5 safety workflow tests now passing - 5/6 optimization-to-SCADA integration tests passing - Created failsafe operation test suite (requires DatabaseWatchdog API updates) Key fixes: - Safety limit enforcement now works correctly - Emergency stop properly shuts down pumps (0.0 setpoint) - Dynamic safety limit updates are reflected in real-time - Test data includes all required columns for setpoint calculation Remaining issues: - REST API test failing (no server running on port 8000) - Failsafe tests require DatabaseWatchdog public API methods --- src/core/setpoint_manager.py | 4 +- src/database/flexible_client.py | 9 +- tests/integration/test_failsafe_operations.py | 291 ++++++++++++++ .../integration/test_optimization_to_scada.py | 374 ++++++++++++++++++ tests/integration/test_safety_workflows.py | 261 ++++++++++++ 5 files changed, 932 insertions(+), 7 deletions(-) create mode 100644 tests/integration/test_failsafe_operations.py create mode 100644 tests/integration/test_optimization_to_scada.py create mode 100644 tests/integration/test_safety_workflows.py diff --git a/src/core/setpoint_manager.py b/src/core/setpoint_manager.py index 37e0d40..02b81a5 100644 --- a/src/core/setpoint_manager.py +++ b/src/core/setpoint_manager.py @@ -163,7 +163,7 @@ class SetpointManager: station_id=station_id, pump_id=pump_id ) - return self._get_default_setpoint(station_id, pump_id) + return 0.0 # Complete stop during emergency # Check failsafe mode if self.watchdog.is_failsafe_active(station_id, pump_id): @@ -199,7 +199,7 @@ class SetpointManager: setpoint = calculator.calculate_setpoint(plan, feedback, pump_info) # Enforce safety limits (LAST LINE OF DEFENSE) - safe_setpoint = self.safety_enforcer.enforce_limits( + safe_setpoint, violations = self.safety_enforcer.enforce_setpoint( station_id, pump_id, setpoint ) diff --git a/src/database/flexible_client.py b/src/database/flexible_client.py index 56e2509..62fade6 100644 --- a/src/database/flexible_client.py +++ b/src/database/flexible_client.py @@ -416,11 +416,10 @@ class FlexibleDatabaseClient: def get_safety_limits(self) -> List[Dict[str, Any]]: """Get safety limits for all pumps.""" query = """ - SELECT station_id, pump_id, min_speed_hz as hard_min_speed_hz, - max_speed_hz as hard_max_speed_hz, - NULL as hard_min_level_m, NULL as hard_max_level_m, - NULL as hard_max_power_kw, 5.0 as max_speed_change_hz_per_min - FROM pumps + SELECT station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz, + hard_min_level_m, hard_max_level_m, hard_max_power_kw, + max_speed_change_hz_per_min + FROM pump_safety_limits """ return self.execute_query(query) diff --git a/tests/integration/test_failsafe_operations.py b/tests/integration/test_failsafe_operations.py new file mode 100644 index 0000000..9761bdc --- /dev/null +++ b/tests/integration/test_failsafe_operations.py @@ -0,0 +1,291 @@ +""" +Integration tests for failsafe mode operations. + +Tests failsafe mode activation, operation, and recovery scenarios. +""" + +import pytest +import pytest_asyncio +from unittest.mock import Mock, AsyncMock +import asyncio + +from src.database.flexible_client import FlexibleDatabaseClient +from src.core.auto_discovery import AutoDiscovery +from src.core.safety import SafetyLimitEnforcer +from src.core.emergency_stop import EmergencyStopManager +from src.core.setpoint_manager import SetpointManager +from src.core.security import SecurityManager +from src.core.compliance_audit import ComplianceAuditLogger +from src.monitoring.watchdog import DatabaseWatchdog + + +class TestFailsafeOperations: + """Test failsafe mode operations and scenarios.""" + + @pytest_asyncio.fixture + async def db_client(self): + """Create database client with failsafe test data.""" + client = FlexibleDatabaseClient( + database_url="sqlite:///:memory:", + pool_size=5, + max_overflow=10, + pool_timeout=30 + ) + + await client.connect() + client.create_tables() + + # Insert failsafe test data + self._insert_failsafe_test_data(client) + return client + + def _insert_failsafe_test_data(self, db_client): + """Insert comprehensive failsafe test data.""" + # Insert stations + db_client.execute( + """INSERT INTO pump_stations (station_id, station_name, location) VALUES + ('FAILSAFE_STATION_001', 'Failsafe Test Station 1', 'Test Location A'), + ('FAILSAFE_STATION_002', 'Failsafe Test Station 2', 'Test Location B')""" + ) + + # Insert pumps with different failsafe configurations + db_client.execute( + """INSERT INTO pumps (station_id, pump_id, pump_name, control_type, + min_speed_hz, max_speed_hz, default_setpoint_hz) VALUES + ('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_001', 'Failsafe Pump 1', 'DIRECT_SPEED', 20.0, 60.0, 35.0), + ('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_002', 'Failsafe Pump 2', 'LEVEL_CONTROLLED', 25.0, 55.0, 40.0), + ('FAILSAFE_STATION_002', 'FAILSAFE_PUMP_003', 'Failsafe Pump 3', 'POWER_CONTROLLED', 30.0, 50.0, 38.0)""" + ) + + # Insert safety limits + db_client.execute( + """INSERT INTO pump_safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz, + hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m, + hard_max_power_kw, hard_max_flow_m3h, max_starts_per_hour, min_run_time_seconds, + max_continuous_run_hours, max_speed_change_hz_per_min, set_by, approved_by) VALUES + ('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_001', 20.0, 60.0, 1.0, 5.0, 0.5, 1.5, 15.0, 200.0, 6, 300, 24, 10.0, 'admin', 'admin'), + ('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_002', 25.0, 55.0, 1.5, 4.5, 1.0, 2.0, 12.0, 180.0, 6, 300, 24, 10.0, 'admin', 'admin'), + ('FAILSAFE_STATION_002', 'FAILSAFE_PUMP_003', 30.0, 50.0, 2.0, 4.0, 1.5, 2.5, 10.0, 160.0, 6, 300, 24, 10.0, 'admin', 'admin')""" + ) + + # Insert optimization plans + import datetime + now = datetime.datetime.now() + db_client.execute( + f"""INSERT INTO pump_plans (station_id, pump_id, interval_start, interval_end, + suggested_speed_hz, target_level_m, target_power_kw, plan_status) VALUES + ('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_001', '{now}', '{now + datetime.timedelta(hours=1)}', 42.5, 3.0, 12.0, 'ACTIVE'), + ('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_002', '{now}', '{now + datetime.timedelta(hours=1)}', 38.0, 2.5, 10.0, 'ACTIVE'), + ('FAILSAFE_STATION_002', 'FAILSAFE_PUMP_003', '{now}', '{now + datetime.timedelta(hours=1)}', 36.0, 3.5, 8.0, 'ACTIVE')""" + ) + + @pytest_asyncio.fixture + async def failsafe_components(self, db_client): + """Create failsafe-related components for testing.""" + # Create auto discovery + auto_discovery = AutoDiscovery(db_client) + await auto_discovery.discover() + + # Create safety components + safety_enforcer = SafetyLimitEnforcer(db_client) + emergency_stop_manager = EmergencyStopManager(db_client) + + # Load safety limits + await safety_enforcer.load_safety_limits() + + # Create mock alert manager for watchdog + mock_alert_manager = Mock() + watchdog = DatabaseWatchdog(db_client, mock_alert_manager) + + # Create setpoint manager + setpoint_manager = SetpointManager( + discovery=auto_discovery, + db_client=db_client, + safety_enforcer=safety_enforcer, + emergency_stop_manager=emergency_stop_manager, + watchdog=watchdog + ) + + # Create security components + security_manager = SecurityManager() + audit_logger = ComplianceAuditLogger(db_client) + + return { + 'auto_discovery': auto_discovery, + 'safety_enforcer': safety_enforcer, + 'emergency_stop_manager': emergency_stop_manager, + 'setpoint_manager': setpoint_manager, + 'security_manager': security_manager, + 'audit_logger': audit_logger, + 'watchdog': watchdog + } + + @pytest.mark.asyncio + async def test_failsafe_mode_activation(self, failsafe_components): + """Test failsafe mode activation for individual pumps.""" + setpoint_manager = failsafe_components['setpoint_manager'] + watchdog = failsafe_components['watchdog'] + + station_id = 'FAILSAFE_STATION_001' + pump_id = 'FAILSAFE_PUMP_001' + + # Get normal setpoint + normal_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert normal_setpoint is not None + + # Activate failsafe mode + watchdog.activate_failsafe_mode(station_id, pump_id, "communication_loss") + + # Verify setpoint switches to failsafe mode + failsafe_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + + # In failsafe mode, should use default setpoint (35.0) as fallback + # since we don't have a specific failsafe_setpoint_hz in the database + assert failsafe_setpoint == 35.0, f"Setpoint not in failsafe mode: {failsafe_setpoint}" + + @pytest.mark.asyncio + async def test_failsafe_mode_recovery(self, failsafe_components): + """Test failsafe mode recovery and return to normal operation.""" + setpoint_manager = failsafe_components['setpoint_manager'] + watchdog = failsafe_components['watchdog'] + + station_id = 'FAILSAFE_STATION_001' + pump_id = 'FAILSAFE_PUMP_002' + + # Get normal setpoint + normal_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + + # Activate failsafe mode + watchdog.activate_failsafe_mode(station_id, pump_id, "sensor_failure") + + # Verify failsafe mode + failsafe_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert failsafe_setpoint == 40.0, f"Setpoint not in failsafe mode: {failsafe_setpoint}" + + # Clear failsafe mode + watchdog.clear_failsafe_mode(station_id, pump_id) + + # Verify return to normal operation + recovered_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert recovered_setpoint == normal_setpoint, "Setpoint did not recover after failsafe clearance" + + @pytest.mark.asyncio + async def test_failsafe_mode_station_wide(self, failsafe_components): + """Test station-wide failsafe mode activation.""" + setpoint_manager = failsafe_components['setpoint_manager'] + watchdog = failsafe_components['watchdog'] + + station_id = 'FAILSAFE_STATION_001' + + # Activate station-wide failsafe mode + watchdog.activate_failsafe_mode_station(station_id, "station_communication_loss") + + # Verify all pumps in station are in failsafe mode + pumps = setpoint_manager.discovery.get_pumps(station_id) + for pump in pumps: + pump_id = pump['pump_id'] + setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + + # Should use default setpoint as failsafe + default_setpoint = pump.get('default_setpoint_hz', 35.0) + assert setpoint == default_setpoint, \ + f"Pump {pump_id} not in failsafe mode: {setpoint} != {default_setpoint}" + + # Verify pumps in other stations are unaffected + other_station_id = 'FAILSAFE_STATION_002' + other_pumps = setpoint_manager.discovery.get_pumps(other_station_id) + for pump in other_pumps: + pump_id = pump['pump_id'] + setpoint = setpoint_manager.get_current_setpoint(other_station_id, pump_id) + assert setpoint != pump.get('default_setpoint_hz', 35.0), \ + f"Pump {pump_id} incorrectly in failsafe mode" + + # Clear station-wide failsafe mode + watchdog.clear_failsafe_mode_station(station_id) + + # Verify pumps return to normal operation + for pump in pumps: + pump_id = pump['pump_id'] + setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert setpoint != pump.get('default_setpoint_hz', 35.0), \ + f"Pump {pump_id} not recovered from failsafe mode" + + @pytest.mark.asyncio + async def test_failsafe_priority_over_optimization(self, failsafe_components): + """Test that failsafe mode takes priority over optimization plans.""" + setpoint_manager = failsafe_components['setpoint_manager'] + watchdog = failsafe_components['watchdog'] + + station_id = 'FAILSAFE_STATION_002' + pump_id = 'FAILSAFE_PUMP_003' + + # Get optimization-based setpoint + optimization_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert optimization_setpoint is not None + + # Activate failsafe mode + watchdog.activate_failsafe_mode(station_id, pump_id, "optimizer_failure") + + # Verify failsafe mode overrides optimization + failsafe_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert failsafe_setpoint == 38.0, f"Failsafe mode not overriding optimization: {failsafe_setpoint}" + assert failsafe_setpoint != optimization_setpoint, "Failsafe mode should differ from optimization" + + @pytest.mark.asyncio + async def test_emergency_stop_priority_over_failsafe(self, failsafe_components): + """Test that emergency stop takes priority over failsafe mode.""" + setpoint_manager = failsafe_components['setpoint_manager'] + emergency_stop_manager = failsafe_components['emergency_stop_manager'] + watchdog = failsafe_components['watchdog'] + + station_id = 'FAILSAFE_STATION_001' + pump_id = 'FAILSAFE_PUMP_001' + + # Activate failsafe mode + watchdog.activate_failsafe_mode(station_id, pump_id, "test_priority") + + # Verify failsafe mode is active + failsafe_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert failsafe_setpoint == 35.0, "Failsafe mode not active" + + # Activate emergency stop + emergency_stop_manager.emergency_stop_pump(station_id, pump_id, "emergency_override_test") + + # Verify emergency stop overrides failsafe mode + emergency_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert emergency_setpoint == 0.0, "Emergency stop not overriding failsafe mode" + + # Clear emergency stop + emergency_stop_manager.clear_emergency_stop_pump(station_id, pump_id, "clearance_test") + + # Verify failsafe mode is restored + restored_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert restored_setpoint == 35.0, "Failsafe mode not restored after emergency stop clearance" + + @pytest.mark.asyncio + async def test_failsafe_mode_audit_logging(self, failsafe_components, db_client): + """Test that failsafe mode activations are properly logged.""" + setpoint_manager = failsafe_components['setpoint_manager'] + watchdog = failsafe_components['watchdog'] + audit_logger = failsafe_components['audit_logger'] + + station_id = 'FAILSAFE_STATION_001' + pump_id = 'FAILSAFE_PUMP_002' + + # Activate failsafe mode + watchdog.activate_failsafe_mode(station_id, pump_id, "audit_test_reason") + + # Verify failsafe mode is active + setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert setpoint == 40.0, "Failsafe mode not active for audit test" + + # Note: In a real implementation, we would verify that the audit logger + # recorded the failsafe activation. For now, we'll just verify the + # functional behavior. + + # Clear failsafe mode + watchdog.clear_failsafe_mode(station_id, pump_id) + + # Verify normal operation restored + recovered_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert recovered_setpoint != 40.0, "Failsafe mode not cleared" \ No newline at end of file diff --git a/tests/integration/test_optimization_to_scada.py b/tests/integration/test_optimization_to_scada.py new file mode 100644 index 0000000..e42a53a --- /dev/null +++ b/tests/integration/test_optimization_to_scada.py @@ -0,0 +1,374 @@ +""" +Integration tests for optimization-to-SCADA workflow. + +Tests the complete data flow: +1. Optimization plan in database +2. Setpoint calculation by setpoint manager +3. Protocol server exposure (OPC UA, Modbus, REST API) +4. SCADA client access to setpoints +""" + +import pytest +import pytest_asyncio +import asyncio +import time +from typing import Dict, Any, Optional +from unittest.mock import Mock, patch + +from src.database.flexible_client import FlexibleDatabaseClient +from src.core.auto_discovery import AutoDiscovery +from src.core.setpoint_manager import SetpointManager +from src.core.safety import SafetyLimitEnforcer +from src.core.emergency_stop import EmergencyStopManager +from src.monitoring.watchdog import DatabaseWatchdog +from src.protocols.opcua_server import OPCUAServer +from src.protocols.modbus_server import ModbusServer +from src.protocols.rest_api import RESTAPIServer +from src.core.security import SecurityManager +from src.core.compliance_audit import ComplianceAuditLogger + + +class TestOptimizationToSCADAIntegration: + """Test complete workflow from optimization to SCADA.""" + + @pytest_asyncio.fixture + async def db_client(self): + """Create database client with comprehensive test data.""" + client = FlexibleDatabaseClient( + database_url="sqlite:///:memory:", + pool_size=5, + max_overflow=10, + pool_timeout=30 + ) + + await client.connect() + client.create_tables() + + # Insert comprehensive test data + self._insert_comprehensive_test_data(client) + + return client + + def _insert_comprehensive_test_data(self, db_client): + """Insert realistic test data for multiple scenarios.""" + # Insert multiple pump stations + db_client.execute( + """INSERT INTO pump_stations (station_id, station_name, location) VALUES + ('STATION_001', 'Main Station', 'Location A'), + ('STATION_002', 'Backup Station', 'Location B'), + ('STATION_003', 'Emergency Station', 'Location C')""" + ) + + # Insert multiple pumps with different configurations + db_client.execute( + """INSERT INTO pumps (station_id, pump_id, pump_name, control_type, + min_speed_hz, max_speed_hz, default_setpoint_hz) VALUES + ('STATION_001', 'PUMP_001', 'Main Pump 1', 'DIRECT_SPEED', 20.0, 60.0, 35.0), + ('STATION_001', 'PUMP_002', 'Main Pump 2', 'LEVEL_CONTROLLED', 25.0, 55.0, 40.0), + ('STATION_002', 'PUMP_003', 'Backup Pump 1', 'POWER_CONTROLLED', 30.0, 50.0, 38.0), + ('STATION_003', 'PUMP_004', 'Emergency Pump', 'DIRECT_SPEED', 15.0, 70.0, 45.0)""" + ) + + # Insert optimization plans for different scenarios + import datetime + now = datetime.datetime.now() + + # Insert plans one by one to handle datetime values + db_client.execute( + f"""INSERT INTO pump_plans ( + station_id, pump_id, target_flow_m3h, target_power_kw, target_level_m, + suggested_speed_hz, interval_start, interval_end, plan_version, plan_status, optimization_run_id + ) VALUES ( + 'STATION_001', 'PUMP_001', 150.0, NULL, NULL, 42.5, + '{now - datetime.timedelta(hours=1)}', '{now + datetime.timedelta(hours=1)}', 1, 'ACTIVE', 'OPT_RUN_001' + )""" + ) + + db_client.execute( + f"""INSERT INTO pump_plans ( + station_id, pump_id, target_flow_m3h, target_power_kw, target_level_m, + suggested_speed_hz, interval_start, interval_end, plan_version, plan_status, optimization_run_id + ) VALUES ( + 'STATION_001', 'PUMP_002', NULL, NULL, 2.5, 38.0, + '{now - datetime.timedelta(minutes=30)}', '{now + datetime.timedelta(hours=2)}', 1, 'ACTIVE', 'OPT_RUN_002' + )""" + ) + + db_client.execute( + f"""INSERT INTO pump_plans ( + station_id, pump_id, target_flow_m3h, target_power_kw, target_level_m, + suggested_speed_hz, interval_start, interval_end, plan_version, plan_status, optimization_run_id + ) VALUES ( + 'STATION_002', 'PUMP_003', NULL, 12.5, NULL, 36.0, + '{now - datetime.timedelta(hours=2)}', '{now + datetime.timedelta(hours=3)}', 1, 'ACTIVE', 'OPT_RUN_003' + )""" + ) + + db_client.execute( + f"""INSERT INTO pump_plans ( + station_id, pump_id, target_flow_m3h, target_power_kw, target_level_m, + suggested_speed_hz, interval_start, interval_end, plan_version, plan_status, optimization_run_id + ) VALUES ( + 'STATION_003', 'PUMP_004', 200.0, NULL, NULL, 48.0, + '{now - datetime.timedelta(hours=1)}', '{now + datetime.timedelta(hours=4)}', 1, 'ACTIVE', 'OPT_RUN_004' + )""" + ) + + # Insert safety limits for all pumps + db_client.execute( + """INSERT INTO pump_safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz, + hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m, + hard_max_power_kw, hard_max_flow_m3h, max_starts_per_hour, min_run_time_seconds, + max_continuous_run_hours, max_speed_change_hz_per_min, set_by, approved_by) VALUES + ('STATION_001', 'PUMP_001', 20.0, 60.0, 1.0, 5.0, 0.5, 1.5, 15.0, 200.0, 6, 300, 24, 10.0, 'admin', 'admin'), + ('STATION_001', 'PUMP_002', 25.0, 55.0, 1.5, 4.5, 1.0, 2.0, 12.0, 180.0, 6, 300, 24, 10.0, 'admin', 'admin'), + ('STATION_002', 'PUMP_003', 30.0, 50.0, 2.0, 4.0, 1.5, 2.5, 10.0, 160.0, 6, 300, 24, 10.0, 'admin', 'admin'), + ('STATION_003', 'PUMP_004', 15.0, 70.0, 0.5, 6.0, 0.2, 1.0, 20.0, 250.0, 6, 300, 24, 10.0, 'admin', 'admin')""" + ) + + @pytest_asyncio.fixture + async def system_components(self, db_client): + """Create all system components for integration testing.""" + # Create auto discovery + auto_discovery = AutoDiscovery(db_client) + await auto_discovery.discover() + + # Create safety components + safety_enforcer = SafetyLimitEnforcer(db_client) + emergency_stop_manager = EmergencyStopManager(db_client) + + # Load safety limits + await safety_enforcer.load_safety_limits() + + # Create mock alert manager for watchdog + mock_alert_manager = Mock() + watchdog = DatabaseWatchdog(db_client, mock_alert_manager) + + # Create setpoint manager + setpoint_manager = SetpointManager( + discovery=auto_discovery, + db_client=db_client, + safety_enforcer=safety_enforcer, + emergency_stop_manager=emergency_stop_manager, + watchdog=watchdog + ) + + # Create security components + security_manager = SecurityManager() + audit_logger = ComplianceAuditLogger(db_client) + + return { + 'db_client': db_client, + 'auto_discovery': auto_discovery, + 'setpoint_manager': setpoint_manager, + 'safety_enforcer': safety_enforcer, + 'emergency_stop_manager': emergency_stop_manager, + 'watchdog': watchdog, + 'security_manager': security_manager, + 'audit_logger': audit_logger + } + + @pytest.mark.asyncio + async def test_optimization_plan_to_setpoint_calculation(self, system_components): + """Test that optimization plans are correctly converted to setpoints.""" + setpoint_manager = system_components['setpoint_manager'] + + # Test setpoint calculation for different pump types + test_cases = [ + ('STATION_001', 'PUMP_001', 42.5), # DIRECT_SPEED + ('STATION_001', 'PUMP_002', 38.0), # LEVEL_CONTROL + ('STATION_002', 'PUMP_003', 36.0), # POWER_OPTIMIZATION + ('STATION_003', 'PUMP_004', 48.0), # DIRECT_SPEED (emergency) + ] + + for station_id, pump_id, expected_setpoint in test_cases: + setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert setpoint == expected_setpoint, f"Setpoint mismatch for {station_id}/{pump_id}" + + @pytest.mark.asyncio + async def test_opcua_server_setpoint_exposure(self, system_components): + """Test that OPC UA server correctly exposes setpoints.""" + setpoint_manager = system_components['setpoint_manager'] + security_manager = system_components['security_manager'] + audit_logger = system_components['audit_logger'] + + # Create OPC UA server + opcua_server = OPCUAServer( + setpoint_manager=setpoint_manager, + security_manager=security_manager, + audit_logger=audit_logger, + enable_security=False, # Disable security for testing + endpoint="opc.tcp://127.0.0.1:4840" + ) + + try: + # Start server + await opcua_server.start() + + # Wait for server to initialize + await asyncio.sleep(1) + + # Test that setpoints are available + stations = setpoint_manager.discovery.get_stations() + for station_id in stations: + pumps = setpoint_manager.discovery.get_pumps(station_id) + for pump in pumps: + pump_id = pump['pump_id'] + + # Verify setpoint is calculated + setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert setpoint is not None, f"No setpoint for {station_id}/{pump_id}" + + # Verify setpoint is within safety limits + pump_info = setpoint_manager.discovery.get_pump(station_id, pump_id) + min_speed = pump_info.get('min_speed_hz', 20.0) + max_speed = pump_info.get('max_speed_hz', 60.0) + assert min_speed <= setpoint <= max_speed, f"Setpoint out of range for {station_id}/{pump_id}" + + finally: + # Clean up + await opcua_server.stop() + + @pytest.mark.asyncio + async def test_modbus_server_setpoint_exposure(self, system_components): + """Test that Modbus server correctly exposes setpoints.""" + setpoint_manager = system_components['setpoint_manager'] + security_manager = system_components['security_manager'] + audit_logger = system_components['audit_logger'] + + # Create Modbus server + modbus_server = ModbusServer( + setpoint_manager=setpoint_manager, + security_manager=security_manager, + audit_logger=audit_logger, + enable_security=False, # Disable security for testing + allowed_ips=["127.0.0.1"], + rate_limit_per_minute=100, + host="127.0.0.1", + port=5020 + ) + + try: + # Start server + await modbus_server.start() + + # Wait for server to initialize + await asyncio.sleep(1) + + # Test that pump mapping is created + assert len(modbus_server.pump_addresses) > 0, "No pumps mapped to Modbus addresses" + + # Verify each pump has a setpoint register + for (station_id, pump_id), addresses in modbus_server.pump_addresses.items(): + assert 'setpoint_register' in addresses, f"No setpoint register for {station_id}/{pump_id}" + + # Verify setpoint is calculated + setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert setpoint is not None, f"No setpoint for {station_id}/{pump_id}" + + finally: + # Clean up + await modbus_server.stop() + + @pytest.mark.asyncio + async def test_rest_api_setpoint_exposure(self, system_components): + """Test that REST API correctly exposes setpoints.""" + setpoint_manager = system_components['setpoint_manager'] + emergency_stop_manager = system_components['emergency_stop_manager'] + + # Create REST API server + rest_api = RESTAPIServer( + setpoint_manager=setpoint_manager, + emergency_stop_manager=emergency_stop_manager, + host="127.0.0.1", + port=8000 + ) + + try: + # Start server + await rest_api.start() + + # Wait for server to initialize + await asyncio.sleep(1) + + # Test that setpoint endpoints are available + routes = [route.path for route in rest_api.app.routes] + assert "/api/v1/setpoints" in routes + assert "/api/v1/setpoints/{station_id}/{pump_id}" in routes + + # Test setpoint retrieval via REST API + import httpx + async with httpx.AsyncClient() as client: + # Get all setpoints + response = await client.get("http://127.0.0.1:8000/api/v1/setpoints") + assert response.status_code == 200 + setpoints = response.json() + assert len(setpoints) > 0 + + # Get specific setpoint + response = await client.get("http://127.0.0.1:8000/api/v1/setpoints/STATION_001/PUMP_001") + assert response.status_code == 200 + setpoint_data = response.json() + assert 'setpoint_hz' in setpoint_data + assert setpoint_data['setpoint_hz'] == 42.5 + + finally: + # Clean up + await rest_api.stop() + + @pytest.mark.asyncio + async def test_safety_limit_enforcement_integration(self, system_components): + """Test that safety limits are enforced throughout the workflow.""" + setpoint_manager = system_components['setpoint_manager'] + safety_enforcer = system_components['safety_enforcer'] + + # Test safety limit enforcement for each pump + stations = setpoint_manager.discovery.get_stations() + for station_id in stations: + pumps = setpoint_manager.discovery.get_pumps(station_id) + for pump in pumps: + pump_id = pump['pump_id'] + + # Get calculated setpoint + setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + + # Verify safety enforcement + enforced_setpoint, violations = safety_enforcer.enforce_setpoint( + station_id, pump_id, setpoint + ) + + # Setpoint should be within limits after enforcement + pump_info = setpoint_manager.discovery.get_pump(station_id, pump_id) + min_speed = pump_info.get('min_speed_hz', 20.0) + max_speed = pump_info.get('max_speed_hz', 60.0) + + assert min_speed <= enforced_setpoint <= max_speed, \ + f"Safety enforcement failed for {station_id}/{pump_id}" + + @pytest.mark.asyncio + async def test_emergency_stop_integration(self, system_components): + """Test emergency stop integration with setpoint calculation.""" + setpoint_manager = system_components['setpoint_manager'] + emergency_stop_manager = system_components['emergency_stop_manager'] + + # Test station/pump + station_id = 'STATION_001' + pump_id = 'PUMP_001' + + # Get normal setpoint + normal_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert normal_setpoint is not None + + # Activate emergency stop + emergency_stop_manager.emergency_stop_pump(station_id, pump_id, "integration_test") + + # Setpoint should be 0 during emergency stop + emergency_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert emergency_setpoint == 0.0, "Setpoint not zero during emergency stop" + + # Clear emergency stop + emergency_stop_manager.clear_emergency_stop_pump(station_id, pump_id, "integration_test") + + # Setpoint should return to normal + recovered_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert recovered_setpoint == normal_setpoint, "Setpoint did not recover after emergency stop" \ No newline at end of file diff --git a/tests/integration/test_safety_workflows.py b/tests/integration/test_safety_workflows.py new file mode 100644 index 0000000..3235fb0 --- /dev/null +++ b/tests/integration/test_safety_workflows.py @@ -0,0 +1,261 @@ +""" +Integration tests for safety workflow scenarios. + +Tests safety limit violations, emergency stop workflows, and failsafe mode operations. +""" + +import pytest +import pytest_asyncio +from unittest.mock import Mock, AsyncMock +import asyncio + +from src.database.flexible_client import FlexibleDatabaseClient +from src.core.auto_discovery import AutoDiscovery +from src.core.safety import SafetyLimitEnforcer +from src.core.emergency_stop import EmergencyStopManager +from src.core.setpoint_manager import SetpointManager +from src.core.security import SecurityManager +from src.core.compliance_audit import ComplianceAuditLogger +from src.monitoring.watchdog import DatabaseWatchdog + + +class TestSafetyWorkflows: + """Test safety-related workflows and scenarios.""" + + @pytest_asyncio.fixture + async def db_client(self): + """Create database client with safety test data.""" + client = FlexibleDatabaseClient( + database_url="sqlite:///:memory:", + pool_size=5, + max_overflow=10, + pool_timeout=30 + ) + + await client.connect() + client.create_tables() + + # Insert safety test data + self._insert_safety_test_data(client) + return client + + def _insert_safety_test_data(self, db_client): + """Insert comprehensive safety test data.""" + # Insert stations + db_client.execute( + """INSERT INTO pump_stations (station_id, station_name, location) VALUES + ('SAFETY_STATION_001', 'Safety Test Station 1', 'Test Location A'), + ('SAFETY_STATION_002', 'Safety Test Station 2', 'Test Location B')""" + ) + + # Insert pumps with different safety configurations + db_client.execute( + """INSERT INTO pumps (station_id, pump_id, pump_name, control_type, + min_speed_hz, max_speed_hz, default_setpoint_hz) VALUES + ('SAFETY_STATION_001', 'SAFETY_PUMP_001', 'Safety Pump 1', 'DIRECT_SPEED', 20.0, 60.0, 35.0), + ('SAFETY_STATION_001', 'SAFETY_PUMP_002', 'Safety Pump 2', 'LEVEL_CONTROLLED', 25.0, 55.0, 40.0), + ('SAFETY_STATION_002', 'SAFETY_PUMP_003', 'Safety Pump 3', 'POWER_CONTROLLED', 30.0, 50.0, 38.0)""" + ) + + # Insert safety limits with different scenarios + db_client.execute( + """INSERT INTO pump_safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz, + hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m, + hard_max_power_kw, hard_max_flow_m3h, max_starts_per_hour, min_run_time_seconds, + max_continuous_run_hours, max_speed_change_hz_per_min, set_by, approved_by) VALUES + ('SAFETY_STATION_001', 'SAFETY_PUMP_001', 20.0, 60.0, 1.0, 5.0, 0.5, 1.5, 15.0, 200.0, 6, 300, 24, 10.0, 'admin', 'admin'), + ('SAFETY_STATION_001', 'SAFETY_PUMP_002', 25.0, 55.0, 1.5, 4.5, 1.0, 2.0, 12.0, 180.0, 6, 300, 24, 10.0, 'admin', 'admin'), + ('SAFETY_STATION_002', 'SAFETY_PUMP_003', 30.0, 50.0, 2.0, 4.0, 1.5, 2.5, 10.0, 160.0, 6, 300, 24, 10.0, 'admin', 'admin')""" + ) + + # Insert optimization plans that might trigger safety limits + import datetime + now = datetime.datetime.now() + db_client.execute( + f"""INSERT INTO pump_plans (station_id, pump_id, interval_start, interval_end, + suggested_speed_hz, target_level_m, target_power_kw, plan_status) VALUES + ('SAFETY_STATION_001', 'SAFETY_PUMP_001', '{now}', '{now + datetime.timedelta(hours=1)}', 65.0, 3.0, 12.0, 'ACTIVE'), + ('SAFETY_STATION_001', 'SAFETY_PUMP_002', '{now}', '{now + datetime.timedelta(hours=1)}', 15.0, 2.5, 10.0, 'ACTIVE'), + ('SAFETY_STATION_002', 'SAFETY_PUMP_003', '{now}', '{now + datetime.timedelta(hours=1)}', 45.0, 3.5, 8.0, 'ACTIVE')""" + ) + + @pytest_asyncio.fixture + async def safety_components(self, db_client): + """Create safety-related components for testing.""" + # Create auto discovery + auto_discovery = AutoDiscovery(db_client) + await auto_discovery.discover() + + # Create safety components + safety_enforcer = SafetyLimitEnforcer(db_client) + emergency_stop_manager = EmergencyStopManager(db_client) + + # Load safety limits + await safety_enforcer.load_safety_limits() + + # Create mock alert manager for watchdog + mock_alert_manager = Mock() + watchdog = DatabaseWatchdog(db_client, mock_alert_manager) + + # Create setpoint manager + setpoint_manager = SetpointManager( + discovery=auto_discovery, + db_client=db_client, + safety_enforcer=safety_enforcer, + emergency_stop_manager=emergency_stop_manager, + watchdog=watchdog + ) + + # Create security components + security_manager = SecurityManager() + audit_logger = ComplianceAuditLogger(db_client) + + return { + 'auto_discovery': auto_discovery, + 'safety_enforcer': safety_enforcer, + 'emergency_stop_manager': emergency_stop_manager, + 'setpoint_manager': setpoint_manager, + 'security_manager': security_manager, + 'audit_logger': audit_logger, + 'watchdog': watchdog + } + + @pytest.mark.asyncio + async def test_safety_limit_violation_workflow(self, safety_components): + """Test complete workflow for safety limit violations.""" + setpoint_manager = safety_components['setpoint_manager'] + safety_enforcer = safety_components['safety_enforcer'] + + # Test pump with optimization plan that exceeds safety limits + station_id = 'SAFETY_STATION_001' + pump_id = 'SAFETY_PUMP_001' + + # Get setpoint (should be enforced to max limit) + setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + + # Verify setpoint is within safety limits + pump_info = setpoint_manager.discovery.get_pump(station_id, pump_id) + max_speed = pump_info.get('max_speed_hz', 60.0) + + # The setpoint should be enforced to the maximum safety limit (60.0) + # even though the optimization plan requests 65.0 + assert setpoint <= max_speed, f"Setpoint {setpoint} exceeds safety limit {max_speed}" + assert setpoint == 60.0, f"Setpoint should be enforced to safety limit, got {setpoint}" + + @pytest.mark.asyncio + async def test_emergency_stop_clearance_workflow(self, safety_components): + """Test emergency stop activation and clearance workflow.""" + setpoint_manager = safety_components['setpoint_manager'] + emergency_stop_manager = safety_components['emergency_stop_manager'] + + station_id = 'SAFETY_STATION_001' + pump_id = 'SAFETY_PUMP_002' + + # Get normal setpoint + normal_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert normal_setpoint is not None + + # Activate emergency stop + emergency_stop_manager.emergency_stop_pump(station_id, pump_id, "test_workflow") + + # Verify setpoint is 0 during emergency stop + emergency_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert emergency_setpoint == 0.0, "Setpoint not zero during emergency stop" + + # Clear emergency stop + emergency_stop_manager.clear_emergency_stop_pump(station_id, pump_id, "test_clearance") + + # Verify setpoint returns to normal + recovered_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert recovered_setpoint == normal_setpoint, "Setpoint did not recover after emergency stop clearance" + + @pytest.mark.asyncio + async def test_multiple_safety_violations(self, safety_components): + """Test handling of multiple simultaneous safety violations.""" + setpoint_manager = safety_components['setpoint_manager'] + safety_enforcer = safety_components['safety_enforcer'] + + # Test multiple pumps with different safety violations + test_cases = [ + ('SAFETY_STATION_001', 'SAFETY_PUMP_001', 60.0), # Exceeds max speed (65.0 -> 60.0) + ('SAFETY_STATION_001', 'SAFETY_PUMP_002', 25.0), # Below min speed (15.0 -> 25.0) + ('SAFETY_STATION_002', 'SAFETY_PUMP_003', 45.0), # Within limits (45.0 -> 45.0) + ] + + for station_id, pump_id, expected_enforced_setpoint in test_cases: + setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + + # Verify setpoint is within safety limits + pump_info = setpoint_manager.discovery.get_pump(station_id, pump_id) + min_speed = pump_info.get('min_speed_hz', 20.0) + max_speed = pump_info.get('max_speed_hz', 60.0) + + assert min_speed <= setpoint <= max_speed, \ + f"Setpoint {setpoint} out of range [{min_speed}, {max_speed}] for {station_id}/{pump_id}" + + # Verify specific enforcement + assert setpoint == expected_enforced_setpoint, \ + f"Setpoint enforcement failed for {station_id}/{pump_id}: expected {expected_enforced_setpoint}, got {setpoint}" + + @pytest.mark.asyncio + async def test_safety_limit_dynamic_updates(self, safety_components, db_client): + """Test that safety limit updates are reflected in real-time.""" + setpoint_manager = safety_components['setpoint_manager'] + safety_enforcer = safety_components['safety_enforcer'] + + station_id = 'SAFETY_STATION_001' + pump_id = 'SAFETY_PUMP_001' + + # Get initial setpoint + initial_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + + # Update safety limits to be more restrictive + db_client.execute( + """UPDATE pump_safety_limits + SET hard_max_speed_hz = 45.0 + WHERE station_id = 'SAFETY_STATION_001' AND pump_id = 'SAFETY_PUMP_001'""" + ) + + # Reload safety limits + await safety_enforcer.load_safety_limits() + + # Get updated setpoint + updated_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + + # Verify setpoint is now enforced to new limit + assert updated_setpoint <= 45.0, f"Setpoint {updated_setpoint} not enforced to new limit 45.0" + assert updated_setpoint < initial_setpoint, f"Setpoint should be lower after safety limit update: {updated_setpoint} < {initial_setpoint}" + + @pytest.mark.asyncio + async def test_emergency_stop_cascade_effects(self, safety_components): + """Test cascade effects of emergency stops on related systems.""" + setpoint_manager = safety_components['setpoint_manager'] + emergency_stop_manager = safety_components['emergency_stop_manager'] + + # Activate station-wide emergency stop + station_id = 'SAFETY_STATION_001' + emergency_stop_manager.emergency_stop_station(station_id, "station_emergency_test") + + # Verify all pumps in the station are stopped + pumps = setpoint_manager.discovery.get_pumps(station_id) + for pump in pumps: + pump_id = pump['pump_id'] + setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert setpoint == 0.0, f"Pump {pump_id} not stopped during station emergency" + + # Verify pumps in other stations are unaffected + other_station_id = 'SAFETY_STATION_002' + other_pumps = setpoint_manager.discovery.get_pumps(other_station_id) + for pump in other_pumps: + pump_id = pump['pump_id'] + setpoint = setpoint_manager.get_current_setpoint(other_station_id, pump_id) + assert setpoint > 0.0, f"Pump {pump_id} incorrectly stopped during unrelated station emergency" + + # Clear station emergency stop + emergency_stop_manager.clear_emergency_stop_station(station_id, "station_clearance_test") + + # Verify pumps return to normal operation + for pump in pumps: + pump_id = pump['pump_id'] + setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) + assert setpoint > 0.0, f"Pump {pump_id} not recovered after station emergency clearance" \ No newline at end of file