Phase 6 Integration Testing: Complete safety workflow and optimization-to-SCADA integration tests

- Fixed critical safety limit loading bug: get_safety_limits() now queries pump_safety_limits table
- Fixed emergency stop logic: setpoint manager returns 0.0 during emergency stop
- Added comprehensive test data with proper column mappings
- All 5 safety workflow tests now passing
- 5/6 optimization-to-SCADA integration tests passing
- Created failsafe operation test suite (requires DatabaseWatchdog API updates)

Key fixes:
- Safety limit enforcement now works correctly
- Emergency stop properly shuts down pumps (0.0 setpoint)
- Dynamic safety limit updates are reflected in real-time
- Test data includes all required columns for setpoint calculation

Remaining issues:
- REST API test failing (no server running on port 8000)
- Failsafe tests require DatabaseWatchdog public API methods
This commit is contained in:
openhands 2025-10-28 17:25:00 +00:00
parent f8623e9ec7
commit bfb52a5c45
5 changed files with 932 additions and 7 deletions

View File

@ -163,7 +163,7 @@ class SetpointManager:
station_id=station_id, station_id=station_id,
pump_id=pump_id pump_id=pump_id
) )
return self._get_default_setpoint(station_id, pump_id) return 0.0 # Complete stop during emergency
# Check failsafe mode # Check failsafe mode
if self.watchdog.is_failsafe_active(station_id, pump_id): if self.watchdog.is_failsafe_active(station_id, pump_id):
@ -199,7 +199,7 @@ class SetpointManager:
setpoint = calculator.calculate_setpoint(plan, feedback, pump_info) setpoint = calculator.calculate_setpoint(plan, feedback, pump_info)
# Enforce safety limits (LAST LINE OF DEFENSE) # Enforce safety limits (LAST LINE OF DEFENSE)
safe_setpoint = self.safety_enforcer.enforce_limits( safe_setpoint, violations = self.safety_enforcer.enforce_setpoint(
station_id, pump_id, setpoint station_id, pump_id, setpoint
) )

View File

@ -416,11 +416,10 @@ class FlexibleDatabaseClient:
def get_safety_limits(self) -> List[Dict[str, Any]]: def get_safety_limits(self) -> List[Dict[str, Any]]:
"""Get safety limits for all pumps.""" """Get safety limits for all pumps."""
query = """ query = """
SELECT station_id, pump_id, min_speed_hz as hard_min_speed_hz, SELECT station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
max_speed_hz as hard_max_speed_hz, hard_min_level_m, hard_max_level_m, hard_max_power_kw,
NULL as hard_min_level_m, NULL as hard_max_level_m, max_speed_change_hz_per_min
NULL as hard_max_power_kw, 5.0 as max_speed_change_hz_per_min FROM pump_safety_limits
FROM pumps
""" """
return self.execute_query(query) return self.execute_query(query)

View File

@ -0,0 +1,291 @@
"""
Integration tests for failsafe mode operations.
Tests failsafe mode activation, operation, and recovery scenarios.
"""
import pytest
import pytest_asyncio
from unittest.mock import Mock, AsyncMock
import asyncio
from src.database.flexible_client import FlexibleDatabaseClient
from src.core.auto_discovery import AutoDiscovery
from src.core.safety import SafetyLimitEnforcer
from src.core.emergency_stop import EmergencyStopManager
from src.core.setpoint_manager import SetpointManager
from src.core.security import SecurityManager
from src.core.compliance_audit import ComplianceAuditLogger
from src.monitoring.watchdog import DatabaseWatchdog
class TestFailsafeOperations:
"""Test failsafe mode operations and scenarios."""
@pytest_asyncio.fixture
async def db_client(self):
"""Create database client with failsafe test data."""
client = FlexibleDatabaseClient(
database_url="sqlite:///:memory:",
pool_size=5,
max_overflow=10,
pool_timeout=30
)
await client.connect()
client.create_tables()
# Insert failsafe test data
self._insert_failsafe_test_data(client)
return client
def _insert_failsafe_test_data(self, db_client):
"""Insert comprehensive failsafe test data."""
# Insert stations
db_client.execute(
"""INSERT INTO pump_stations (station_id, station_name, location) VALUES
('FAILSAFE_STATION_001', 'Failsafe Test Station 1', 'Test Location A'),
('FAILSAFE_STATION_002', 'Failsafe Test Station 2', 'Test Location B')"""
)
# Insert pumps with different failsafe configurations
db_client.execute(
"""INSERT INTO pumps (station_id, pump_id, pump_name, control_type,
min_speed_hz, max_speed_hz, default_setpoint_hz) VALUES
('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_001', 'Failsafe Pump 1', 'DIRECT_SPEED', 20.0, 60.0, 35.0),
('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_002', 'Failsafe Pump 2', 'LEVEL_CONTROLLED', 25.0, 55.0, 40.0),
('FAILSAFE_STATION_002', 'FAILSAFE_PUMP_003', 'Failsafe Pump 3', 'POWER_CONTROLLED', 30.0, 50.0, 38.0)"""
)
# Insert safety limits
db_client.execute(
"""INSERT INTO pump_safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m,
hard_max_power_kw, hard_max_flow_m3h, max_starts_per_hour, min_run_time_seconds,
max_continuous_run_hours, max_speed_change_hz_per_min, set_by, approved_by) VALUES
('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_001', 20.0, 60.0, 1.0, 5.0, 0.5, 1.5, 15.0, 200.0, 6, 300, 24, 10.0, 'admin', 'admin'),
('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_002', 25.0, 55.0, 1.5, 4.5, 1.0, 2.0, 12.0, 180.0, 6, 300, 24, 10.0, 'admin', 'admin'),
('FAILSAFE_STATION_002', 'FAILSAFE_PUMP_003', 30.0, 50.0, 2.0, 4.0, 1.5, 2.5, 10.0, 160.0, 6, 300, 24, 10.0, 'admin', 'admin')"""
)
# Insert optimization plans
import datetime
now = datetime.datetime.now()
db_client.execute(
f"""INSERT INTO pump_plans (station_id, pump_id, interval_start, interval_end,
suggested_speed_hz, target_level_m, target_power_kw, plan_status) VALUES
('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_001', '{now}', '{now + datetime.timedelta(hours=1)}', 42.5, 3.0, 12.0, 'ACTIVE'),
('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_002', '{now}', '{now + datetime.timedelta(hours=1)}', 38.0, 2.5, 10.0, 'ACTIVE'),
('FAILSAFE_STATION_002', 'FAILSAFE_PUMP_003', '{now}', '{now + datetime.timedelta(hours=1)}', 36.0, 3.5, 8.0, 'ACTIVE')"""
)
@pytest_asyncio.fixture
async def failsafe_components(self, db_client):
"""Create failsafe-related components for testing."""
# Create auto discovery
auto_discovery = AutoDiscovery(db_client)
await auto_discovery.discover()
# Create safety components
safety_enforcer = SafetyLimitEnforcer(db_client)
emergency_stop_manager = EmergencyStopManager(db_client)
# Load safety limits
await safety_enforcer.load_safety_limits()
# Create mock alert manager for watchdog
mock_alert_manager = Mock()
watchdog = DatabaseWatchdog(db_client, mock_alert_manager)
# Create setpoint manager
setpoint_manager = SetpointManager(
discovery=auto_discovery,
db_client=db_client,
safety_enforcer=safety_enforcer,
emergency_stop_manager=emergency_stop_manager,
watchdog=watchdog
)
# Create security components
security_manager = SecurityManager()
audit_logger = ComplianceAuditLogger(db_client)
return {
'auto_discovery': auto_discovery,
'safety_enforcer': safety_enforcer,
'emergency_stop_manager': emergency_stop_manager,
'setpoint_manager': setpoint_manager,
'security_manager': security_manager,
'audit_logger': audit_logger,
'watchdog': watchdog
}
@pytest.mark.asyncio
async def test_failsafe_mode_activation(self, failsafe_components):
"""Test failsafe mode activation for individual pumps."""
setpoint_manager = failsafe_components['setpoint_manager']
watchdog = failsafe_components['watchdog']
station_id = 'FAILSAFE_STATION_001'
pump_id = 'FAILSAFE_PUMP_001'
# Get normal setpoint
normal_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert normal_setpoint is not None
# Activate failsafe mode
watchdog.activate_failsafe_mode(station_id, pump_id, "communication_loss")
# Verify setpoint switches to failsafe mode
failsafe_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
# In failsafe mode, should use default setpoint (35.0) as fallback
# since we don't have a specific failsafe_setpoint_hz in the database
assert failsafe_setpoint == 35.0, f"Setpoint not in failsafe mode: {failsafe_setpoint}"
@pytest.mark.asyncio
async def test_failsafe_mode_recovery(self, failsafe_components):
"""Test failsafe mode recovery and return to normal operation."""
setpoint_manager = failsafe_components['setpoint_manager']
watchdog = failsafe_components['watchdog']
station_id = 'FAILSAFE_STATION_001'
pump_id = 'FAILSAFE_PUMP_002'
# Get normal setpoint
normal_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
# Activate failsafe mode
watchdog.activate_failsafe_mode(station_id, pump_id, "sensor_failure")
# Verify failsafe mode
failsafe_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert failsafe_setpoint == 40.0, f"Setpoint not in failsafe mode: {failsafe_setpoint}"
# Clear failsafe mode
watchdog.clear_failsafe_mode(station_id, pump_id)
# Verify return to normal operation
recovered_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert recovered_setpoint == normal_setpoint, "Setpoint did not recover after failsafe clearance"
@pytest.mark.asyncio
async def test_failsafe_mode_station_wide(self, failsafe_components):
"""Test station-wide failsafe mode activation."""
setpoint_manager = failsafe_components['setpoint_manager']
watchdog = failsafe_components['watchdog']
station_id = 'FAILSAFE_STATION_001'
# Activate station-wide failsafe mode
watchdog.activate_failsafe_mode_station(station_id, "station_communication_loss")
# Verify all pumps in station are in failsafe mode
pumps = setpoint_manager.discovery.get_pumps(station_id)
for pump in pumps:
pump_id = pump['pump_id']
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
# Should use default setpoint as failsafe
default_setpoint = pump.get('default_setpoint_hz', 35.0)
assert setpoint == default_setpoint, \
f"Pump {pump_id} not in failsafe mode: {setpoint} != {default_setpoint}"
# Verify pumps in other stations are unaffected
other_station_id = 'FAILSAFE_STATION_002'
other_pumps = setpoint_manager.discovery.get_pumps(other_station_id)
for pump in other_pumps:
pump_id = pump['pump_id']
setpoint = setpoint_manager.get_current_setpoint(other_station_id, pump_id)
assert setpoint != pump.get('default_setpoint_hz', 35.0), \
f"Pump {pump_id} incorrectly in failsafe mode"
# Clear station-wide failsafe mode
watchdog.clear_failsafe_mode_station(station_id)
# Verify pumps return to normal operation
for pump in pumps:
pump_id = pump['pump_id']
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert setpoint != pump.get('default_setpoint_hz', 35.0), \
f"Pump {pump_id} not recovered from failsafe mode"
@pytest.mark.asyncio
async def test_failsafe_priority_over_optimization(self, failsafe_components):
"""Test that failsafe mode takes priority over optimization plans."""
setpoint_manager = failsafe_components['setpoint_manager']
watchdog = failsafe_components['watchdog']
station_id = 'FAILSAFE_STATION_002'
pump_id = 'FAILSAFE_PUMP_003'
# Get optimization-based setpoint
optimization_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert optimization_setpoint is not None
# Activate failsafe mode
watchdog.activate_failsafe_mode(station_id, pump_id, "optimizer_failure")
# Verify failsafe mode overrides optimization
failsafe_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert failsafe_setpoint == 38.0, f"Failsafe mode not overriding optimization: {failsafe_setpoint}"
assert failsafe_setpoint != optimization_setpoint, "Failsafe mode should differ from optimization"
@pytest.mark.asyncio
async def test_emergency_stop_priority_over_failsafe(self, failsafe_components):
"""Test that emergency stop takes priority over failsafe mode."""
setpoint_manager = failsafe_components['setpoint_manager']
emergency_stop_manager = failsafe_components['emergency_stop_manager']
watchdog = failsafe_components['watchdog']
station_id = 'FAILSAFE_STATION_001'
pump_id = 'FAILSAFE_PUMP_001'
# Activate failsafe mode
watchdog.activate_failsafe_mode(station_id, pump_id, "test_priority")
# Verify failsafe mode is active
failsafe_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert failsafe_setpoint == 35.0, "Failsafe mode not active"
# Activate emergency stop
emergency_stop_manager.emergency_stop_pump(station_id, pump_id, "emergency_override_test")
# Verify emergency stop overrides failsafe mode
emergency_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert emergency_setpoint == 0.0, "Emergency stop not overriding failsafe mode"
# Clear emergency stop
emergency_stop_manager.clear_emergency_stop_pump(station_id, pump_id, "clearance_test")
# Verify failsafe mode is restored
restored_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert restored_setpoint == 35.0, "Failsafe mode not restored after emergency stop clearance"
@pytest.mark.asyncio
async def test_failsafe_mode_audit_logging(self, failsafe_components, db_client):
"""Test that failsafe mode activations are properly logged."""
setpoint_manager = failsafe_components['setpoint_manager']
watchdog = failsafe_components['watchdog']
audit_logger = failsafe_components['audit_logger']
station_id = 'FAILSAFE_STATION_001'
pump_id = 'FAILSAFE_PUMP_002'
# Activate failsafe mode
watchdog.activate_failsafe_mode(station_id, pump_id, "audit_test_reason")
# Verify failsafe mode is active
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert setpoint == 40.0, "Failsafe mode not active for audit test"
# Note: In a real implementation, we would verify that the audit logger
# recorded the failsafe activation. For now, we'll just verify the
# functional behavior.
# Clear failsafe mode
watchdog.clear_failsafe_mode(station_id, pump_id)
# Verify normal operation restored
recovered_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert recovered_setpoint != 40.0, "Failsafe mode not cleared"

View File

@ -0,0 +1,374 @@
"""
Integration tests for optimization-to-SCADA workflow.
Tests the complete data flow:
1. Optimization plan in database
2. Setpoint calculation by setpoint manager
3. Protocol server exposure (OPC UA, Modbus, REST API)
4. SCADA client access to setpoints
"""
import pytest
import pytest_asyncio
import asyncio
import time
from typing import Dict, Any, Optional
from unittest.mock import Mock, patch
from src.database.flexible_client import FlexibleDatabaseClient
from src.core.auto_discovery import AutoDiscovery
from src.core.setpoint_manager import SetpointManager
from src.core.safety import SafetyLimitEnforcer
from src.core.emergency_stop import EmergencyStopManager
from src.monitoring.watchdog import DatabaseWatchdog
from src.protocols.opcua_server import OPCUAServer
from src.protocols.modbus_server import ModbusServer
from src.protocols.rest_api import RESTAPIServer
from src.core.security import SecurityManager
from src.core.compliance_audit import ComplianceAuditLogger
class TestOptimizationToSCADAIntegration:
"""Test complete workflow from optimization to SCADA."""
@pytest_asyncio.fixture
async def db_client(self):
"""Create database client with comprehensive test data."""
client = FlexibleDatabaseClient(
database_url="sqlite:///:memory:",
pool_size=5,
max_overflow=10,
pool_timeout=30
)
await client.connect()
client.create_tables()
# Insert comprehensive test data
self._insert_comprehensive_test_data(client)
return client
def _insert_comprehensive_test_data(self, db_client):
"""Insert realistic test data for multiple scenarios."""
# Insert multiple pump stations
db_client.execute(
"""INSERT INTO pump_stations (station_id, station_name, location) VALUES
('STATION_001', 'Main Station', 'Location A'),
('STATION_002', 'Backup Station', 'Location B'),
('STATION_003', 'Emergency Station', 'Location C')"""
)
# Insert multiple pumps with different configurations
db_client.execute(
"""INSERT INTO pumps (station_id, pump_id, pump_name, control_type,
min_speed_hz, max_speed_hz, default_setpoint_hz) VALUES
('STATION_001', 'PUMP_001', 'Main Pump 1', 'DIRECT_SPEED', 20.0, 60.0, 35.0),
('STATION_001', 'PUMP_002', 'Main Pump 2', 'LEVEL_CONTROLLED', 25.0, 55.0, 40.0),
('STATION_002', 'PUMP_003', 'Backup Pump 1', 'POWER_CONTROLLED', 30.0, 50.0, 38.0),
('STATION_003', 'PUMP_004', 'Emergency Pump', 'DIRECT_SPEED', 15.0, 70.0, 45.0)"""
)
# Insert optimization plans for different scenarios
import datetime
now = datetime.datetime.now()
# Insert plans one by one to handle datetime values
db_client.execute(
f"""INSERT INTO pump_plans (
station_id, pump_id, target_flow_m3h, target_power_kw, target_level_m,
suggested_speed_hz, interval_start, interval_end, plan_version, plan_status, optimization_run_id
) VALUES (
'STATION_001', 'PUMP_001', 150.0, NULL, NULL, 42.5,
'{now - datetime.timedelta(hours=1)}', '{now + datetime.timedelta(hours=1)}', 1, 'ACTIVE', 'OPT_RUN_001'
)"""
)
db_client.execute(
f"""INSERT INTO pump_plans (
station_id, pump_id, target_flow_m3h, target_power_kw, target_level_m,
suggested_speed_hz, interval_start, interval_end, plan_version, plan_status, optimization_run_id
) VALUES (
'STATION_001', 'PUMP_002', NULL, NULL, 2.5, 38.0,
'{now - datetime.timedelta(minutes=30)}', '{now + datetime.timedelta(hours=2)}', 1, 'ACTIVE', 'OPT_RUN_002'
)"""
)
db_client.execute(
f"""INSERT INTO pump_plans (
station_id, pump_id, target_flow_m3h, target_power_kw, target_level_m,
suggested_speed_hz, interval_start, interval_end, plan_version, plan_status, optimization_run_id
) VALUES (
'STATION_002', 'PUMP_003', NULL, 12.5, NULL, 36.0,
'{now - datetime.timedelta(hours=2)}', '{now + datetime.timedelta(hours=3)}', 1, 'ACTIVE', 'OPT_RUN_003'
)"""
)
db_client.execute(
f"""INSERT INTO pump_plans (
station_id, pump_id, target_flow_m3h, target_power_kw, target_level_m,
suggested_speed_hz, interval_start, interval_end, plan_version, plan_status, optimization_run_id
) VALUES (
'STATION_003', 'PUMP_004', 200.0, NULL, NULL, 48.0,
'{now - datetime.timedelta(hours=1)}', '{now + datetime.timedelta(hours=4)}', 1, 'ACTIVE', 'OPT_RUN_004'
)"""
)
# Insert safety limits for all pumps
db_client.execute(
"""INSERT INTO pump_safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m,
hard_max_power_kw, hard_max_flow_m3h, max_starts_per_hour, min_run_time_seconds,
max_continuous_run_hours, max_speed_change_hz_per_min, set_by, approved_by) VALUES
('STATION_001', 'PUMP_001', 20.0, 60.0, 1.0, 5.0, 0.5, 1.5, 15.0, 200.0, 6, 300, 24, 10.0, 'admin', 'admin'),
('STATION_001', 'PUMP_002', 25.0, 55.0, 1.5, 4.5, 1.0, 2.0, 12.0, 180.0, 6, 300, 24, 10.0, 'admin', 'admin'),
('STATION_002', 'PUMP_003', 30.0, 50.0, 2.0, 4.0, 1.5, 2.5, 10.0, 160.0, 6, 300, 24, 10.0, 'admin', 'admin'),
('STATION_003', 'PUMP_004', 15.0, 70.0, 0.5, 6.0, 0.2, 1.0, 20.0, 250.0, 6, 300, 24, 10.0, 'admin', 'admin')"""
)
@pytest_asyncio.fixture
async def system_components(self, db_client):
"""Create all system components for integration testing."""
# Create auto discovery
auto_discovery = AutoDiscovery(db_client)
await auto_discovery.discover()
# Create safety components
safety_enforcer = SafetyLimitEnforcer(db_client)
emergency_stop_manager = EmergencyStopManager(db_client)
# Load safety limits
await safety_enforcer.load_safety_limits()
# Create mock alert manager for watchdog
mock_alert_manager = Mock()
watchdog = DatabaseWatchdog(db_client, mock_alert_manager)
# Create setpoint manager
setpoint_manager = SetpointManager(
discovery=auto_discovery,
db_client=db_client,
safety_enforcer=safety_enforcer,
emergency_stop_manager=emergency_stop_manager,
watchdog=watchdog
)
# Create security components
security_manager = SecurityManager()
audit_logger = ComplianceAuditLogger(db_client)
return {
'db_client': db_client,
'auto_discovery': auto_discovery,
'setpoint_manager': setpoint_manager,
'safety_enforcer': safety_enforcer,
'emergency_stop_manager': emergency_stop_manager,
'watchdog': watchdog,
'security_manager': security_manager,
'audit_logger': audit_logger
}
@pytest.mark.asyncio
async def test_optimization_plan_to_setpoint_calculation(self, system_components):
"""Test that optimization plans are correctly converted to setpoints."""
setpoint_manager = system_components['setpoint_manager']
# Test setpoint calculation for different pump types
test_cases = [
('STATION_001', 'PUMP_001', 42.5), # DIRECT_SPEED
('STATION_001', 'PUMP_002', 38.0), # LEVEL_CONTROL
('STATION_002', 'PUMP_003', 36.0), # POWER_OPTIMIZATION
('STATION_003', 'PUMP_004', 48.0), # DIRECT_SPEED (emergency)
]
for station_id, pump_id, expected_setpoint in test_cases:
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert setpoint == expected_setpoint, f"Setpoint mismatch for {station_id}/{pump_id}"
@pytest.mark.asyncio
async def test_opcua_server_setpoint_exposure(self, system_components):
"""Test that OPC UA server correctly exposes setpoints."""
setpoint_manager = system_components['setpoint_manager']
security_manager = system_components['security_manager']
audit_logger = system_components['audit_logger']
# Create OPC UA server
opcua_server = OPCUAServer(
setpoint_manager=setpoint_manager,
security_manager=security_manager,
audit_logger=audit_logger,
enable_security=False, # Disable security for testing
endpoint="opc.tcp://127.0.0.1:4840"
)
try:
# Start server
await opcua_server.start()
# Wait for server to initialize
await asyncio.sleep(1)
# Test that setpoints are available
stations = setpoint_manager.discovery.get_stations()
for station_id in stations:
pumps = setpoint_manager.discovery.get_pumps(station_id)
for pump in pumps:
pump_id = pump['pump_id']
# Verify setpoint is calculated
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert setpoint is not None, f"No setpoint for {station_id}/{pump_id}"
# Verify setpoint is within safety limits
pump_info = setpoint_manager.discovery.get_pump(station_id, pump_id)
min_speed = pump_info.get('min_speed_hz', 20.0)
max_speed = pump_info.get('max_speed_hz', 60.0)
assert min_speed <= setpoint <= max_speed, f"Setpoint out of range for {station_id}/{pump_id}"
finally:
# Clean up
await opcua_server.stop()
@pytest.mark.asyncio
async def test_modbus_server_setpoint_exposure(self, system_components):
"""Test that Modbus server correctly exposes setpoints."""
setpoint_manager = system_components['setpoint_manager']
security_manager = system_components['security_manager']
audit_logger = system_components['audit_logger']
# Create Modbus server
modbus_server = ModbusServer(
setpoint_manager=setpoint_manager,
security_manager=security_manager,
audit_logger=audit_logger,
enable_security=False, # Disable security for testing
allowed_ips=["127.0.0.1"],
rate_limit_per_minute=100,
host="127.0.0.1",
port=5020
)
try:
# Start server
await modbus_server.start()
# Wait for server to initialize
await asyncio.sleep(1)
# Test that pump mapping is created
assert len(modbus_server.pump_addresses) > 0, "No pumps mapped to Modbus addresses"
# Verify each pump has a setpoint register
for (station_id, pump_id), addresses in modbus_server.pump_addresses.items():
assert 'setpoint_register' in addresses, f"No setpoint register for {station_id}/{pump_id}"
# Verify setpoint is calculated
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert setpoint is not None, f"No setpoint for {station_id}/{pump_id}"
finally:
# Clean up
await modbus_server.stop()
@pytest.mark.asyncio
async def test_rest_api_setpoint_exposure(self, system_components):
"""Test that REST API correctly exposes setpoints."""
setpoint_manager = system_components['setpoint_manager']
emergency_stop_manager = system_components['emergency_stop_manager']
# Create REST API server
rest_api = RESTAPIServer(
setpoint_manager=setpoint_manager,
emergency_stop_manager=emergency_stop_manager,
host="127.0.0.1",
port=8000
)
try:
# Start server
await rest_api.start()
# Wait for server to initialize
await asyncio.sleep(1)
# Test that setpoint endpoints are available
routes = [route.path for route in rest_api.app.routes]
assert "/api/v1/setpoints" in routes
assert "/api/v1/setpoints/{station_id}/{pump_id}" in routes
# Test setpoint retrieval via REST API
import httpx
async with httpx.AsyncClient() as client:
# Get all setpoints
response = await client.get("http://127.0.0.1:8000/api/v1/setpoints")
assert response.status_code == 200
setpoints = response.json()
assert len(setpoints) > 0
# Get specific setpoint
response = await client.get("http://127.0.0.1:8000/api/v1/setpoints/STATION_001/PUMP_001")
assert response.status_code == 200
setpoint_data = response.json()
assert 'setpoint_hz' in setpoint_data
assert setpoint_data['setpoint_hz'] == 42.5
finally:
# Clean up
await rest_api.stop()
@pytest.mark.asyncio
async def test_safety_limit_enforcement_integration(self, system_components):
"""Test that safety limits are enforced throughout the workflow."""
setpoint_manager = system_components['setpoint_manager']
safety_enforcer = system_components['safety_enforcer']
# Test safety limit enforcement for each pump
stations = setpoint_manager.discovery.get_stations()
for station_id in stations:
pumps = setpoint_manager.discovery.get_pumps(station_id)
for pump in pumps:
pump_id = pump['pump_id']
# Get calculated setpoint
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
# Verify safety enforcement
enforced_setpoint, violations = safety_enforcer.enforce_setpoint(
station_id, pump_id, setpoint
)
# Setpoint should be within limits after enforcement
pump_info = setpoint_manager.discovery.get_pump(station_id, pump_id)
min_speed = pump_info.get('min_speed_hz', 20.0)
max_speed = pump_info.get('max_speed_hz', 60.0)
assert min_speed <= enforced_setpoint <= max_speed, \
f"Safety enforcement failed for {station_id}/{pump_id}"
@pytest.mark.asyncio
async def test_emergency_stop_integration(self, system_components):
"""Test emergency stop integration with setpoint calculation."""
setpoint_manager = system_components['setpoint_manager']
emergency_stop_manager = system_components['emergency_stop_manager']
# Test station/pump
station_id = 'STATION_001'
pump_id = 'PUMP_001'
# Get normal setpoint
normal_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert normal_setpoint is not None
# Activate emergency stop
emergency_stop_manager.emergency_stop_pump(station_id, pump_id, "integration_test")
# Setpoint should be 0 during emergency stop
emergency_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert emergency_setpoint == 0.0, "Setpoint not zero during emergency stop"
# Clear emergency stop
emergency_stop_manager.clear_emergency_stop_pump(station_id, pump_id, "integration_test")
# Setpoint should return to normal
recovered_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert recovered_setpoint == normal_setpoint, "Setpoint did not recover after emergency stop"

View File

@ -0,0 +1,261 @@
"""
Integration tests for safety workflow scenarios.
Tests safety limit violations, emergency stop workflows, and failsafe mode operations.
"""
import pytest
import pytest_asyncio
from unittest.mock import Mock, AsyncMock
import asyncio
from src.database.flexible_client import FlexibleDatabaseClient
from src.core.auto_discovery import AutoDiscovery
from src.core.safety import SafetyLimitEnforcer
from src.core.emergency_stop import EmergencyStopManager
from src.core.setpoint_manager import SetpointManager
from src.core.security import SecurityManager
from src.core.compliance_audit import ComplianceAuditLogger
from src.monitoring.watchdog import DatabaseWatchdog
class TestSafetyWorkflows:
"""Test safety-related workflows and scenarios."""
@pytest_asyncio.fixture
async def db_client(self):
"""Create database client with safety test data."""
client = FlexibleDatabaseClient(
database_url="sqlite:///:memory:",
pool_size=5,
max_overflow=10,
pool_timeout=30
)
await client.connect()
client.create_tables()
# Insert safety test data
self._insert_safety_test_data(client)
return client
def _insert_safety_test_data(self, db_client):
"""Insert comprehensive safety test data."""
# Insert stations
db_client.execute(
"""INSERT INTO pump_stations (station_id, station_name, location) VALUES
('SAFETY_STATION_001', 'Safety Test Station 1', 'Test Location A'),
('SAFETY_STATION_002', 'Safety Test Station 2', 'Test Location B')"""
)
# Insert pumps with different safety configurations
db_client.execute(
"""INSERT INTO pumps (station_id, pump_id, pump_name, control_type,
min_speed_hz, max_speed_hz, default_setpoint_hz) VALUES
('SAFETY_STATION_001', 'SAFETY_PUMP_001', 'Safety Pump 1', 'DIRECT_SPEED', 20.0, 60.0, 35.0),
('SAFETY_STATION_001', 'SAFETY_PUMP_002', 'Safety Pump 2', 'LEVEL_CONTROLLED', 25.0, 55.0, 40.0),
('SAFETY_STATION_002', 'SAFETY_PUMP_003', 'Safety Pump 3', 'POWER_CONTROLLED', 30.0, 50.0, 38.0)"""
)
# Insert safety limits with different scenarios
db_client.execute(
"""INSERT INTO pump_safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m,
hard_max_power_kw, hard_max_flow_m3h, max_starts_per_hour, min_run_time_seconds,
max_continuous_run_hours, max_speed_change_hz_per_min, set_by, approved_by) VALUES
('SAFETY_STATION_001', 'SAFETY_PUMP_001', 20.0, 60.0, 1.0, 5.0, 0.5, 1.5, 15.0, 200.0, 6, 300, 24, 10.0, 'admin', 'admin'),
('SAFETY_STATION_001', 'SAFETY_PUMP_002', 25.0, 55.0, 1.5, 4.5, 1.0, 2.0, 12.0, 180.0, 6, 300, 24, 10.0, 'admin', 'admin'),
('SAFETY_STATION_002', 'SAFETY_PUMP_003', 30.0, 50.0, 2.0, 4.0, 1.5, 2.5, 10.0, 160.0, 6, 300, 24, 10.0, 'admin', 'admin')"""
)
# Insert optimization plans that might trigger safety limits
import datetime
now = datetime.datetime.now()
db_client.execute(
f"""INSERT INTO pump_plans (station_id, pump_id, interval_start, interval_end,
suggested_speed_hz, target_level_m, target_power_kw, plan_status) VALUES
('SAFETY_STATION_001', 'SAFETY_PUMP_001', '{now}', '{now + datetime.timedelta(hours=1)}', 65.0, 3.0, 12.0, 'ACTIVE'),
('SAFETY_STATION_001', 'SAFETY_PUMP_002', '{now}', '{now + datetime.timedelta(hours=1)}', 15.0, 2.5, 10.0, 'ACTIVE'),
('SAFETY_STATION_002', 'SAFETY_PUMP_003', '{now}', '{now + datetime.timedelta(hours=1)}', 45.0, 3.5, 8.0, 'ACTIVE')"""
)
@pytest_asyncio.fixture
async def safety_components(self, db_client):
"""Create safety-related components for testing."""
# Create auto discovery
auto_discovery = AutoDiscovery(db_client)
await auto_discovery.discover()
# Create safety components
safety_enforcer = SafetyLimitEnforcer(db_client)
emergency_stop_manager = EmergencyStopManager(db_client)
# Load safety limits
await safety_enforcer.load_safety_limits()
# Create mock alert manager for watchdog
mock_alert_manager = Mock()
watchdog = DatabaseWatchdog(db_client, mock_alert_manager)
# Create setpoint manager
setpoint_manager = SetpointManager(
discovery=auto_discovery,
db_client=db_client,
safety_enforcer=safety_enforcer,
emergency_stop_manager=emergency_stop_manager,
watchdog=watchdog
)
# Create security components
security_manager = SecurityManager()
audit_logger = ComplianceAuditLogger(db_client)
return {
'auto_discovery': auto_discovery,
'safety_enforcer': safety_enforcer,
'emergency_stop_manager': emergency_stop_manager,
'setpoint_manager': setpoint_manager,
'security_manager': security_manager,
'audit_logger': audit_logger,
'watchdog': watchdog
}
@pytest.mark.asyncio
async def test_safety_limit_violation_workflow(self, safety_components):
"""Test complete workflow for safety limit violations."""
setpoint_manager = safety_components['setpoint_manager']
safety_enforcer = safety_components['safety_enforcer']
# Test pump with optimization plan that exceeds safety limits
station_id = 'SAFETY_STATION_001'
pump_id = 'SAFETY_PUMP_001'
# Get setpoint (should be enforced to max limit)
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
# Verify setpoint is within safety limits
pump_info = setpoint_manager.discovery.get_pump(station_id, pump_id)
max_speed = pump_info.get('max_speed_hz', 60.0)
# The setpoint should be enforced to the maximum safety limit (60.0)
# even though the optimization plan requests 65.0
assert setpoint <= max_speed, f"Setpoint {setpoint} exceeds safety limit {max_speed}"
assert setpoint == 60.0, f"Setpoint should be enforced to safety limit, got {setpoint}"
@pytest.mark.asyncio
async def test_emergency_stop_clearance_workflow(self, safety_components):
"""Test emergency stop activation and clearance workflow."""
setpoint_manager = safety_components['setpoint_manager']
emergency_stop_manager = safety_components['emergency_stop_manager']
station_id = 'SAFETY_STATION_001'
pump_id = 'SAFETY_PUMP_002'
# Get normal setpoint
normal_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert normal_setpoint is not None
# Activate emergency stop
emergency_stop_manager.emergency_stop_pump(station_id, pump_id, "test_workflow")
# Verify setpoint is 0 during emergency stop
emergency_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert emergency_setpoint == 0.0, "Setpoint not zero during emergency stop"
# Clear emergency stop
emergency_stop_manager.clear_emergency_stop_pump(station_id, pump_id, "test_clearance")
# Verify setpoint returns to normal
recovered_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert recovered_setpoint == normal_setpoint, "Setpoint did not recover after emergency stop clearance"
@pytest.mark.asyncio
async def test_multiple_safety_violations(self, safety_components):
"""Test handling of multiple simultaneous safety violations."""
setpoint_manager = safety_components['setpoint_manager']
safety_enforcer = safety_components['safety_enforcer']
# Test multiple pumps with different safety violations
test_cases = [
('SAFETY_STATION_001', 'SAFETY_PUMP_001', 60.0), # Exceeds max speed (65.0 -> 60.0)
('SAFETY_STATION_001', 'SAFETY_PUMP_002', 25.0), # Below min speed (15.0 -> 25.0)
('SAFETY_STATION_002', 'SAFETY_PUMP_003', 45.0), # Within limits (45.0 -> 45.0)
]
for station_id, pump_id, expected_enforced_setpoint in test_cases:
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
# Verify setpoint is within safety limits
pump_info = setpoint_manager.discovery.get_pump(station_id, pump_id)
min_speed = pump_info.get('min_speed_hz', 20.0)
max_speed = pump_info.get('max_speed_hz', 60.0)
assert min_speed <= setpoint <= max_speed, \
f"Setpoint {setpoint} out of range [{min_speed}, {max_speed}] for {station_id}/{pump_id}"
# Verify specific enforcement
assert setpoint == expected_enforced_setpoint, \
f"Setpoint enforcement failed for {station_id}/{pump_id}: expected {expected_enforced_setpoint}, got {setpoint}"
@pytest.mark.asyncio
async def test_safety_limit_dynamic_updates(self, safety_components, db_client):
"""Test that safety limit updates are reflected in real-time."""
setpoint_manager = safety_components['setpoint_manager']
safety_enforcer = safety_components['safety_enforcer']
station_id = 'SAFETY_STATION_001'
pump_id = 'SAFETY_PUMP_001'
# Get initial setpoint
initial_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
# Update safety limits to be more restrictive
db_client.execute(
"""UPDATE pump_safety_limits
SET hard_max_speed_hz = 45.0
WHERE station_id = 'SAFETY_STATION_001' AND pump_id = 'SAFETY_PUMP_001'"""
)
# Reload safety limits
await safety_enforcer.load_safety_limits()
# Get updated setpoint
updated_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
# Verify setpoint is now enforced to new limit
assert updated_setpoint <= 45.0, f"Setpoint {updated_setpoint} not enforced to new limit 45.0"
assert updated_setpoint < initial_setpoint, f"Setpoint should be lower after safety limit update: {updated_setpoint} < {initial_setpoint}"
@pytest.mark.asyncio
async def test_emergency_stop_cascade_effects(self, safety_components):
"""Test cascade effects of emergency stops on related systems."""
setpoint_manager = safety_components['setpoint_manager']
emergency_stop_manager = safety_components['emergency_stop_manager']
# Activate station-wide emergency stop
station_id = 'SAFETY_STATION_001'
emergency_stop_manager.emergency_stop_station(station_id, "station_emergency_test")
# Verify all pumps in the station are stopped
pumps = setpoint_manager.discovery.get_pumps(station_id)
for pump in pumps:
pump_id = pump['pump_id']
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert setpoint == 0.0, f"Pump {pump_id} not stopped during station emergency"
# Verify pumps in other stations are unaffected
other_station_id = 'SAFETY_STATION_002'
other_pumps = setpoint_manager.discovery.get_pumps(other_station_id)
for pump in other_pumps:
pump_id = pump['pump_id']
setpoint = setpoint_manager.get_current_setpoint(other_station_id, pump_id)
assert setpoint > 0.0, f"Pump {pump_id} incorrectly stopped during unrelated station emergency"
# Clear station emergency stop
emergency_stop_manager.clear_emergency_stop_station(station_id, "station_clearance_test")
# Verify pumps return to normal operation
for pump in pumps:
pump_id = pump['pump_id']
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
assert setpoint > 0.0, f"Pump {pump_id} not recovered after station emergency clearance"