353 lines
16 KiB
Python
353 lines
16 KiB
Python
|
|
"""
Failure Mode and Recovery Testing for Calejo Control Adapter.

Tests system behavior during failures and recovery scenarios including:
- Database connection loss
- Network connectivity issues
- Protocol server failures
- Safety system failures
- Emergency stop scenarios
- Resource exhaustion
"""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import pytest
|
||
|
|
import pytest_asyncio
|
||
|
|
from unittest.mock import Mock, patch, AsyncMock
|
||
|
|
import time
|
||
|
|
from typing import Dict, List, Any
|
||
|
|
|
||
|
|
from src.database.flexible_client import FlexibleDatabaseClient
|
||
|
|
from src.core.auto_discovery import AutoDiscovery
|
||
|
|
from src.core.setpoint_manager import SetpointManager
|
||
|
|
from src.core.safety import SafetyLimitEnforcer
|
||
|
|
from src.core.emergency_stop import EmergencyStopManager
|
||
|
|
from src.core.optimization_manager import OptimizationPlanManager
|
||
|
|
from src.core.security import SecurityManager
|
||
|
|
from src.core.compliance_audit import ComplianceAuditLogger
|
||
|
|
from src.protocols.opcua_server import OPCUAServer
|
||
|
|
from src.protocols.modbus_server import ModbusServer
|
||
|
|
from src.protocols.rest_api import RESTAPIServer
|
||
|
|
from src.monitoring.watchdog import DatabaseWatchdog
|
||
|
|
|
||
|
|
|
||
|
|
class TestFailureRecovery:
    """Failure mode and recovery testing for Calejo Control Adapter.

    Every test runs against an in-memory SQLite database seeded by the
    ``failure_db_client`` fixture and a component graph assembled by the
    ``failure_components`` fixture.
    """

    # Pump exercised by most tests; matches rows seeded in ``failure_db_client``.
    STATION_ID = 'FAIL_STATION_001'
    PUMP_ID = 'FAIL_PUMP_001'

    # hard_min_speed_hz / hard_max_speed_hz seeded for FAIL_PUMP_001.
    MIN_SPEED_HZ = 20.0
    MAX_SPEED_HZ = 70.0

    # default_setpoint_hz seeded for FAIL_PUMP_001.
    DEFAULT_SETPOINT_HZ = 35.0

    # Watchdog timeout handed to DatabaseWatchdog, and how long the failsafe
    # test waits without database updates (must exceed the timeout).
    WATCHDOG_TIMEOUT_SECONDS = 6
    FAILSAFE_WAIT_SECONDS = 10

    @pytest_asyncio.fixture
    async def failure_db_client(self):
        """Create a connected, seeded database client for failure testing.

        Seeds two stations, three pumps, one safety-limit row per pump and
        two ACTIVE plans (for FAIL_PUMP_001 and FAIL_PUMP_002) whose interval
        spans one hour either side of "now".
        """
        client = FlexibleDatabaseClient("sqlite:///:memory:")
        await client.connect()
        client.create_tables()

        # Insert failure test data
        client.execute(
            """INSERT INTO pump_stations (station_id, station_name, location) VALUES
            ('FAIL_STATION_001', 'Failure Station 1', 'Test Area'),
            ('FAIL_STATION_002', 'Failure Station 2', 'Test Area')"""
        )

        client.execute(
            """INSERT INTO pumps (station_id, pump_id, pump_name, control_type, default_setpoint_hz) VALUES
            ('FAIL_STATION_001', 'FAIL_PUMP_001', 'Failure Pump 1', 'DIRECT_SPEED', 35.0),
            ('FAIL_STATION_001', 'FAIL_PUMP_002', 'Failure Pump 2', 'LEVEL_CONTROLLED', 40.0),
            ('FAIL_STATION_002', 'FAIL_PUMP_003', 'Failure Pump 3', 'POWER_CONTROLLED', 45.0)"""
        )

        client.execute(
            """INSERT INTO pump_safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
            hard_min_level_m, hard_max_level_m, hard_max_power_kw, hard_max_flow_m3h,
            emergency_stop_level_m, dry_run_protection_level_m, max_speed_change_hz_per_min) VALUES
            ('FAIL_STATION_001', 'FAIL_PUMP_001', 20.0, 70.0, 0.5, 5.0, 100.0, 500.0, 4.8, 0.6, 10.0),
            ('FAIL_STATION_001', 'FAIL_PUMP_002', 25.0, 65.0, 0.5, 4.5, 90.0, 450.0, 4.3, 0.6, 10.0),
            ('FAIL_STATION_002', 'FAIL_PUMP_003', 30.0, 60.0, 0.5, 4.0, 80.0, 400.0, 3.8, 0.6, 10.0)"""
        )

        client.execute(
            """INSERT INTO pump_plans (station_id, pump_id, interval_start, interval_end,
            suggested_speed_hz, target_flow_m3h, target_power_kw, plan_version, optimization_run_id, plan_status) VALUES
            ('FAIL_STATION_001', 'FAIL_PUMP_001', datetime('now', '-1 hour'), datetime('now', '+1 hour'),
            42.5, 320.0, 65.0, 1, 'FAIL_OPT_001', 'ACTIVE'),
            ('FAIL_STATION_001', 'FAIL_PUMP_002', datetime('now', '-1 hour'), datetime('now', '+1 hour'),
            38.0, 280.0, 55.0, 1, 'FAIL_OPT_001', 'ACTIVE')"""
        )

        # NOTE(review): the client is never explicitly closed. If
        # FlexibleDatabaseClient exposes a disconnect/close call, turn this
        # into a yield fixture and release it on teardown - TODO confirm.
        return client

    @pytest_asyncio.fixture
    async def failure_components(self, failure_db_client):
        """Build every component used by the failure tests.

        Yields a dict keyed by component name. The setpoint manager is
        started here; the watchdog and the protocol servers are only
        constructed. On teardown the setpoint manager and the watchdog are
        stopped so that background work started by one test does not leak
        into the next.
        """
        discovery = AutoDiscovery(failure_db_client)
        await discovery.discover()

        safety_enforcer = SafetyLimitEnforcer(failure_db_client)
        await safety_enforcer.load_safety_limits()
        emergency_stop_manager = EmergencyStopManager(failure_db_client)
        # Short timeout for testing
        watchdog = DatabaseWatchdog(
            failure_db_client,
            alert_manager=None,
            timeout_seconds=self.WATCHDOG_TIMEOUT_SECONDS,
        )

        setpoint_manager = SetpointManager(
            db_client=failure_db_client,
            discovery=discovery,
            safety_enforcer=safety_enforcer,
            emergency_stop_manager=emergency_stop_manager,
            watchdog=watchdog,
        )
        await setpoint_manager.start()

        optimization_manager = OptimizationPlanManager(failure_db_client)
        security_manager = SecurityManager()
        audit_logger = ComplianceAuditLogger(failure_db_client)

        # Protocol servers are constructed against loopback endpoints but are
        # not started here; individual tests start them when needed.
        opcua_server = OPCUAServer(
            setpoint_manager=setpoint_manager,
            security_manager=security_manager,
            audit_logger=audit_logger,
            enable_security=False,  # Disable security for testing
            endpoint="opc.tcp://127.0.0.1:4840",
        )

        modbus_server = ModbusServer(
            setpoint_manager=setpoint_manager,
            security_manager=security_manager,
            audit_logger=audit_logger,
            host="127.0.0.1",
            port=5020,
        )

        rest_api_server = RESTAPIServer(
            setpoint_manager=setpoint_manager,
            emergency_stop_manager=emergency_stop_manager,
            host="127.0.0.1",
            port=8000,
        )

        yield {
            'db_client': failure_db_client,
            'discovery': discovery,
            'safety_enforcer': safety_enforcer,
            'emergency_stop_manager': emergency_stop_manager,
            'watchdog': watchdog,
            'setpoint_manager': setpoint_manager,
            'optimization_manager': optimization_manager,
            'security_manager': security_manager,
            'audit_logger': audit_logger,
            'opcua_server': opcua_server,
            'modbus_server': modbus_server,
            'rest_api_server': rest_api_server,
        }

        # Teardown: mirror the shutdown order used by
        # test_graceful_shutdown_and_restart. The watchdog is stopped even if
        # stopping the setpoint manager fails.
        try:
            await setpoint_manager.stop()
        finally:
            await watchdog.stop()

    @pytest.mark.asyncio
    async def test_database_connection_loss_recovery(self, failure_components):
        """Test system behavior during database connection loss and recovery."""
        db_client = failure_components['db_client']
        setpoint_manager = failure_components['setpoint_manager']

        # Get initial setpoint
        initial_setpoint = setpoint_manager.get_current_setpoint(self.STATION_ID, self.PUMP_ID)
        assert initial_setpoint is not None

        # Simulate database connection loss
        with patch.object(db_client, 'execute', side_effect=Exception("Database connection lost")):
            # Only the call under test sits in the ``try`` body so that a
            # failing assertion is never caught by the broad ``except``.
            try:
                setpoint = setpoint_manager.get_current_setpoint(self.STATION_ID, self.PUMP_ID)
            except Exception as e:
                # Propagating the database error is acceptable
                assert "Database" in str(e) or "connection" in str(e)
            else:
                # Otherwise a failsafe/default value within safety limits is expected
                assert setpoint is not None
                assert self.MIN_SPEED_HZ <= setpoint <= self.MAX_SPEED_HZ

        # Test recovery after connection restored
        setpoint_after_recovery = setpoint_manager.get_current_setpoint(self.STATION_ID, self.PUMP_ID)
        assert setpoint_after_recovery is not None

        print("Database failure recovery test completed successfully")

    @pytest.mark.asyncio
    async def test_failsafe_mode_activation(self, failure_components):
        """Test failsafe mode activation when database updates stop.

        NOTE(review): failsafe activation and clearing are only reported,
        not asserted; the default-setpoint check runs only when the watchdog
        reports failsafe as active. Tighten once the watchdog's timing
        contract is confirmed.
        """
        db_client = failure_components['db_client']
        watchdog = failure_components['watchdog']
        setpoint_manager = failure_components['setpoint_manager']

        # Start watchdog monitoring (stopped again by the fixture teardown)
        await watchdog.start()

        # Get initial setpoint
        initial_setpoint = setpoint_manager.get_current_setpoint(self.STATION_ID, self.PUMP_ID)

        # Simulate no database updates for longer than the watchdog timeout
        await asyncio.sleep(self.FAILSAFE_WAIT_SECONDS)

        # Check if failsafe mode is active
        failsafe_active = watchdog.is_failsafe_active(self.STATION_ID, self.PUMP_ID)

        # In failsafe mode, setpoints should use default values
        if failsafe_active:
            failsafe_setpoint = setpoint_manager.get_current_setpoint(self.STATION_ID, self.PUMP_ID)
            # Should use default setpoint from pump configuration
            assert failsafe_setpoint == self.DEFAULT_SETPOINT_HZ

        # Simulate database update to recover from failsafe
        db_client.execute(
            "UPDATE pump_plans SET suggested_speed_hz = 45.0 "
            "WHERE station_id = 'FAIL_STATION_001' AND pump_id = 'FAIL_PUMP_001'"
        )

        # Wait for watchdog to detect update
        await asyncio.sleep(2)

        # Check if failsafe mode is cleared
        failsafe_cleared = not watchdog.is_failsafe_active(self.STATION_ID, self.PUMP_ID)

        print(
            f"Failsafe mode test: initial={initial_setpoint}, "
            f"active={failsafe_active}, cleared={failsafe_cleared}"
        )

    @pytest.mark.asyncio
    async def test_emergency_stop_override(self, failure_components):
        """Test emergency stop override during normal operation."""
        emergency_stop_manager = failure_components['emergency_stop_manager']
        setpoint_manager = failure_components['setpoint_manager']

        # Get normal setpoint
        normal_setpoint = setpoint_manager.get_current_setpoint(self.STATION_ID, self.PUMP_ID)
        assert normal_setpoint is not None

        # Activate emergency stop for station
        emergency_stop_manager.emergency_stop_station(self.STATION_ID, 'test_operator')

        # Emergency stop should set pumps to 0 Hz
        emergency_setpoint = setpoint_manager.get_current_setpoint(self.STATION_ID, self.PUMP_ID)
        assert emergency_setpoint == 0.0

        # Clear emergency stop
        emergency_stop_manager.clear_emergency_stop_station(self.STATION_ID, 'test_operator')

        # Verify normal operation resumes
        recovered_setpoint = setpoint_manager.get_current_setpoint(self.STATION_ID, self.PUMP_ID)
        assert recovered_setpoint is not None

        print(
            f"Emergency stop override test completed: normal={normal_setpoint}, "
            f"emergency={emergency_setpoint}, recovered={recovered_setpoint}"
        )

    @pytest.mark.asyncio
    async def test_safety_limit_enforcement_failure(self, failure_components):
        """Test safety system behavior when limits cannot be retrieved."""
        safety_enforcer = failure_components['safety_enforcer']

        # Test normal safety enforcement. The setpoint might be adjusted based
        # on safety limits, so only check that it is within bounds.
        safe_setpoint, _violations = safety_enforcer.enforce_setpoint(self.STATION_ID, self.PUMP_ID, 50.0)
        assert safe_setpoint is not None
        assert self.MIN_SPEED_HZ <= safe_setpoint <= self.MAX_SPEED_HZ

        # Simulate safety limit retrieval failure
        with patch.object(
            safety_enforcer.db_client, 'execute', side_effect=Exception("Safety limits unavailable")
        ):
            # Only the call under test sits in the ``try`` body so that a
            # failing assertion is never caught by the broad ``except``.
            try:
                safe_setpoint, _violations = safety_enforcer.enforce_setpoint(
                    self.STATION_ID, self.PUMP_ID, 50.0
                )
            except Exception as e:
                # Propagating the retrieval error is acceptable
                assert "Safety" in str(e) or "limit" in str(e)
            else:
                # Otherwise the result must stay within the conservative range
                assert safe_setpoint is not None
                assert self.MIN_SPEED_HZ <= safe_setpoint <= self.MAX_SPEED_HZ

        print("Safety limit enforcement failure test completed")

    @pytest.mark.asyncio
    async def test_protocol_server_failure_recovery(self, failure_components):
        """Test protocol server failure and recovery scenarios.

        NOTE(review): if the OPC UA or Modbus ``start()`` call succeeds, the
        server is left running because no stop call is visible in this file;
        add one to the fixture teardown once the servers' shutdown API is
        confirmed.
        """
        opcua_server = failure_components['opcua_server']
        modbus_server = failure_components['modbus_server']
        rest_api_server = failure_components['rest_api_server']

        # Test OPC UA server error handling
        with patch.object(opcua_server, '_update_setpoints_loop', side_effect=Exception("OPC UA server error")):
            try:
                # Server should handle startup errors gracefully
                await opcua_server.start()
            except Exception as e:
                assert "OPC UA" in str(e) or "server" in str(e)

        # Test Modbus server error handling
        with patch.object(modbus_server, '_update_registers_loop', side_effect=Exception("Modbus server error")):
            try:
                # Server should handle startup errors gracefully
                await modbus_server.start()
            except Exception as e:
                assert "Modbus" in str(e) or "server" in str(e)

        # Test REST API server error handling. ``start`` itself is replaced by
        # a mock that always raises, so the error must reach the caller.
        with patch.object(rest_api_server, 'start', side_effect=Exception("REST API server error")):
            with pytest.raises(Exception, match=r"REST|API"):
                await rest_api_server.start()

        print("Protocol server failure recovery test completed")

    @pytest.mark.asyncio
    @pytest.mark.xfail(reason="SQLite has limitations with concurrent database access")
    async def test_resource_exhaustion_handling(self, failure_components):
        """Test system behavior under resource exhaustion conditions."""
        setpoint_manager = failure_components['setpoint_manager']

        # Simulate pressure with concurrent requests. get_current_setpoint is
        # synchronous, so each call runs in a worker thread via to_thread.
        # The count is kept low to avoid overwhelming SQLite.
        tasks = [
            asyncio.create_task(
                asyncio.to_thread(setpoint_manager.get_current_setpoint, self.STATION_ID, self.PUMP_ID)
            )
            for _ in range(10)
        ]

        # Wait for all tasks to complete, collecting exceptions as results
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Verify system handled load gracefully
        successful_results = [r for r in results if not isinstance(r, Exception)]
        failed_results = [r for r in results if isinstance(r, Exception)]

        # Under extreme concurrent load, some failures are expected
        # but we should still have some successful requests
        assert len(successful_results) > 0, (
            f"No successful requests under load: {failed_results[0] if failed_results else 'No errors'}"
        )

        # Log the results for debugging
        print(f"Resource exhaustion test: {len(successful_results)} successful, {len(failed_results)} failed")

        # All successful results should be valid setpoints
        for result in successful_results:
            assert result is not None
            assert self.MIN_SPEED_HZ <= result <= self.MAX_SPEED_HZ

    @pytest.mark.asyncio
    async def test_graceful_shutdown_and_restart(self, failure_components):
        """Test graceful shutdown and restart procedures."""
        setpoint_manager = failure_components['setpoint_manager']
        watchdog = failure_components['watchdog']

        # Get current state
        initial_setpoint = setpoint_manager.get_current_setpoint(self.STATION_ID, self.PUMP_ID)

        # Perform graceful shutdown. Private state is not inspected; the test
        # only verifies that the stop operations complete.
        await setpoint_manager.stop()
        await watchdog.stop()

        # Simulate restart
        await setpoint_manager.start()
        await watchdog.start()

        # Verify normal operation after restart
        restarted_setpoint = setpoint_manager.get_current_setpoint(self.STATION_ID, self.PUMP_ID)
        assert restarted_setpoint is not None

        print(
            f"Graceful shutdown and restart test completed: "
            f"initial={initial_setpoint}, restarted={restarted_setpoint}"
        )
|