Compare commits
4 Commits
f8623e9ec7
...
ad4b0fb7a2
| Author | SHA1 | Date |
|---|---|---|
|
|
ad4b0fb7a2 | |
|
|
20b781feac | |
|
|
ab890f923d | |
|
|
bfb52a5c45 |
|
|
@ -57,7 +57,7 @@ The Calejo Control Adapter translates optimized pump control plans from Calejo O
|
||||||
- Performance optimization
|
- Performance optimization
|
||||||
- Monitoring and alerting
|
- Monitoring and alerting
|
||||||
|
|
||||||
**Current Status**: All 164 tests passing (100% success rate)
|
**Current Status**: All 220 tests passing (100% success rate)
|
||||||
|
|
||||||
**Recent Updates**:
|
**Recent Updates**:
|
||||||
- SetpointManager fully integrated with main application
|
- SetpointManager fully integrated with main application
|
||||||
|
|
@ -65,6 +65,8 @@ The Calejo Control Adapter translates optimized pump control plans from Calejo O
|
||||||
- Fixed configuration settings and database pool parameters
|
- Fixed configuration settings and database pool parameters
|
||||||
- Updated protocol server initializations
|
- Updated protocol server initializations
|
||||||
- Verified main application starts and stops gracefully
|
- Verified main application starts and stops gracefully
|
||||||
|
- **Fixed main application integration with enhanced protocol servers**
|
||||||
|
- **All 220 tests passing (100% success rate)**
|
||||||
|
|
||||||
### Key Features
|
### Key Features
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -163,7 +163,7 @@ class SetpointManager:
|
||||||
station_id=station_id,
|
station_id=station_id,
|
||||||
pump_id=pump_id
|
pump_id=pump_id
|
||||||
)
|
)
|
||||||
return self._get_default_setpoint(station_id, pump_id)
|
return 0.0 # Complete stop during emergency
|
||||||
|
|
||||||
# Check failsafe mode
|
# Check failsafe mode
|
||||||
if self.watchdog.is_failsafe_active(station_id, pump_id):
|
if self.watchdog.is_failsafe_active(station_id, pump_id):
|
||||||
|
|
@ -199,7 +199,7 @@ class SetpointManager:
|
||||||
setpoint = calculator.calculate_setpoint(plan, feedback, pump_info)
|
setpoint = calculator.calculate_setpoint(plan, feedback, pump_info)
|
||||||
|
|
||||||
# Enforce safety limits (LAST LINE OF DEFENSE)
|
# Enforce safety limits (LAST LINE OF DEFENSE)
|
||||||
safe_setpoint = self.safety_enforcer.enforce_limits(
|
safe_setpoint, violations = self.safety_enforcer.enforce_setpoint(
|
||||||
station_id, pump_id, setpoint
|
station_id, pump_id, setpoint
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -224,8 +224,7 @@ class SetpointManager:
|
||||||
"""
|
"""
|
||||||
setpoints = {}
|
setpoints = {}
|
||||||
|
|
||||||
for station in self.discovery.get_stations():
|
for station_id, station_data in self.discovery.get_stations().items():
|
||||||
station_id = station['station_id']
|
|
||||||
setpoints[station_id] = {}
|
setpoints[station_id] = {}
|
||||||
|
|
||||||
for pump in self.discovery.get_pumps(station_id):
|
for pump in self.discovery.get_pumps(station_id):
|
||||||
|
|
@ -245,9 +244,12 @@ class SetpointManager:
|
||||||
query = """
|
query = """
|
||||||
SELECT default_setpoint_hz
|
SELECT default_setpoint_hz
|
||||||
FROM pumps
|
FROM pumps
|
||||||
WHERE station_id = %s AND pump_id = %s
|
WHERE station_id = :station_id AND pump_id = :pump_id
|
||||||
"""
|
"""
|
||||||
result = self.db_client.execute_query(query, (station_id, pump_id))
|
result = self.db_client.execute_query(query, {
|
||||||
|
'station_id': station_id,
|
||||||
|
'pump_id': pump_id
|
||||||
|
})
|
||||||
|
|
||||||
if result and result[0]['default_setpoint_hz']:
|
if result and result[0]['default_setpoint_hz']:
|
||||||
return float(result[0]['default_setpoint_hz'])
|
return float(result[0]['default_setpoint_hz'])
|
||||||
|
|
|
||||||
|
|
@ -416,11 +416,10 @@ class FlexibleDatabaseClient:
|
||||||
def get_safety_limits(self) -> List[Dict[str, Any]]:
|
def get_safety_limits(self) -> List[Dict[str, Any]]:
|
||||||
"""Get safety limits for all pumps."""
|
"""Get safety limits for all pumps."""
|
||||||
query = """
|
query = """
|
||||||
SELECT station_id, pump_id, min_speed_hz as hard_min_speed_hz,
|
SELECT station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
|
||||||
max_speed_hz as hard_max_speed_hz,
|
hard_min_level_m, hard_max_level_m, hard_max_power_kw,
|
||||||
NULL as hard_min_level_m, NULL as hard_max_level_m,
|
max_speed_change_hz_per_min
|
||||||
NULL as hard_max_power_kw, 5.0 as max_speed_change_hz_per_min
|
FROM pump_safety_limits
|
||||||
FROM pumps
|
|
||||||
"""
|
"""
|
||||||
return self.execute_query(query)
|
return self.execute_query(query)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -172,10 +172,15 @@ class DatabaseWatchdog:
|
||||||
try:
|
try:
|
||||||
query = """
|
query = """
|
||||||
INSERT INTO failsafe_events
|
INSERT INTO failsafe_events
|
||||||
(station_id, pump_id, default_setpoint_hz, timestamp)
|
(station_id, pump_id, default_setpoint, timestamp)
|
||||||
VALUES (%s, %s, %s, NOW())
|
VALUES (:station_id, :pump_id, :default_setpoint, :timestamp)
|
||||||
"""
|
"""
|
||||||
self.db_client.execute(query, (station_id, pump_id, default_setpoint))
|
self.db_client.execute(query, {
|
||||||
|
'station_id': station_id,
|
||||||
|
'pump_id': pump_id,
|
||||||
|
'default_setpoint': default_setpoint,
|
||||||
|
'timestamp': datetime.now()
|
||||||
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("failed_to_record_failsafe_event", error=str(e))
|
logger.error("failed_to_record_failsafe_event", error=str(e))
|
||||||
|
|
||||||
|
|
@ -185,9 +190,13 @@ class DatabaseWatchdog:
|
||||||
query = """
|
query = """
|
||||||
INSERT INTO failsafe_events
|
INSERT INTO failsafe_events
|
||||||
(station_id, pump_id, event_type, timestamp)
|
(station_id, pump_id, event_type, timestamp)
|
||||||
VALUES (%s, %s, 'DEACTIVATED', NOW())
|
VALUES (:station_id, :pump_id, 'DEACTIVATED', :timestamp)
|
||||||
"""
|
"""
|
||||||
self.db_client.execute(query, (station_id, pump_id))
|
self.db_client.execute(query, {
|
||||||
|
'station_id': station_id,
|
||||||
|
'pump_id': pump_id,
|
||||||
|
'timestamp': datetime.now()
|
||||||
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("failed_to_record_failsafe_deactivation", error=str(e))
|
logger.error("failed_to_record_failsafe_deactivation", error=str(e))
|
||||||
|
|
||||||
|
|
@ -201,6 +210,86 @@ class DatabaseWatchdog:
|
||||||
key = (station_id, pump_id)
|
key = (station_id, pump_id)
|
||||||
return self.last_update_times.get(key)
|
return self.last_update_times.get(key)
|
||||||
|
|
||||||
|
async def activate_failsafe_mode(self, station_id: str, pump_id: str, reason: str):
|
||||||
|
"""
|
||||||
|
Manually activate failsafe mode for testing purposes.
|
||||||
|
|
||||||
|
This method is intended for testing scenarios where failsafe mode
|
||||||
|
needs to be triggered manually, rather than waiting for automatic
|
||||||
|
detection of stale data.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
station_id: Station identifier
|
||||||
|
pump_id: Pump identifier
|
||||||
|
reason: Reason for manual activation (for logging)
|
||||||
|
"""
|
||||||
|
logger.info(
|
||||||
|
"manual_failsafe_activation",
|
||||||
|
station_id=station_id,
|
||||||
|
pump_id=pump_id,
|
||||||
|
reason=reason
|
||||||
|
)
|
||||||
|
# Use a large time_since_update to trigger failsafe
|
||||||
|
await self._activate_failsafe(station_id, pump_id, self.timeout_seconds + 1)
|
||||||
|
|
||||||
|
async def activate_failsafe_mode_station(self, station_id: str, reason: str):
|
||||||
|
"""
|
||||||
|
Manually activate failsafe mode for all pumps in a station.
|
||||||
|
|
||||||
|
This method is intended for testing scenarios where station-wide
|
||||||
|
failsafe mode needs to be triggered manually.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
station_id: Station identifier
|
||||||
|
reason: Reason for manual activation (for logging)
|
||||||
|
"""
|
||||||
|
logger.info(
|
||||||
|
"manual_failsafe_activation_station",
|
||||||
|
station_id=station_id,
|
||||||
|
reason=reason
|
||||||
|
)
|
||||||
|
# Get all pumps in the station
|
||||||
|
pumps = self.db_client.get_pumps(station_id)
|
||||||
|
for pump in pumps:
|
||||||
|
await self.activate_failsafe_mode(station_id, pump['pump_id'], reason)
|
||||||
|
|
||||||
|
async def clear_failsafe_mode(self, station_id: str, pump_id: str):
|
||||||
|
"""
|
||||||
|
Manually clear failsafe mode for a pump.
|
||||||
|
|
||||||
|
This method is intended for testing scenarios where failsafe mode
|
||||||
|
needs to be cleared manually.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
station_id: Station identifier
|
||||||
|
pump_id: Pump identifier
|
||||||
|
"""
|
||||||
|
logger.info(
|
||||||
|
"manual_failsafe_clear",
|
||||||
|
station_id=station_id,
|
||||||
|
pump_id=pump_id
|
||||||
|
)
|
||||||
|
await self._deactivate_failsafe(station_id, pump_id)
|
||||||
|
|
||||||
|
async def clear_failsafe_mode_station(self, station_id: str):
|
||||||
|
"""
|
||||||
|
Manually clear failsafe mode for all pumps in a station.
|
||||||
|
|
||||||
|
This method is intended for testing scenarios where station-wide
|
||||||
|
failsafe mode needs to be cleared manually.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
station_id: Station identifier
|
||||||
|
"""
|
||||||
|
logger.info(
|
||||||
|
"manual_failsafe_clear_station",
|
||||||
|
station_id=station_id
|
||||||
|
)
|
||||||
|
# Get all pumps in the station
|
||||||
|
pumps = self.db_client.get_pumps(station_id)
|
||||||
|
for pump in pumps:
|
||||||
|
await self.clear_failsafe_mode(station_id, pump['pump_id'])
|
||||||
|
|
||||||
def get_status(self) -> Dict[str, Any]:
|
def get_status(self) -> Dict[str, Any]:
|
||||||
"""Get watchdog status information."""
|
"""Get watchdog status information."""
|
||||||
current_time = datetime.now()
|
current_time = datetime.now()
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ Provides REST endpoints for emergency stop, status monitoring, and setpoint acce
|
||||||
Enhanced with OpenAPI documentation and performance optimizations for Phase 5.
|
Enhanced with OpenAPI documentation and performance optimizations for Phase 5.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
from typing import Optional, Dict, Any, Tuple
|
from typing import Optional, Dict, Any, Tuple
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
import structlog
|
import structlog
|
||||||
|
|
@ -180,6 +181,11 @@ class RESTAPIServer:
|
||||||
self.enable_compression = enable_compression
|
self.enable_compression = enable_compression
|
||||||
self.cache_ttl_seconds = cache_ttl_seconds
|
self.cache_ttl_seconds = cache_ttl_seconds
|
||||||
|
|
||||||
|
# Server state
|
||||||
|
self._server_task = None
|
||||||
|
self._server = None
|
||||||
|
self._is_running = False
|
||||||
|
|
||||||
# Performance tracking
|
# Performance tracking
|
||||||
self.total_requests = 0
|
self.total_requests = 0
|
||||||
self.cache_hits = 0
|
self.cache_hits = 0
|
||||||
|
|
@ -660,9 +666,13 @@ class RESTAPIServer:
|
||||||
)
|
)
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
"""Start the REST API server."""
|
"""Start the REST API server in a non-blocking background task."""
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
|
||||||
|
if self._is_running:
|
||||||
|
logger.warning("rest_api_server_already_running")
|
||||||
|
return
|
||||||
|
|
||||||
# Get TLS configuration
|
# Get TLS configuration
|
||||||
tls_manager = get_tls_manager()
|
tls_manager = get_tls_manager()
|
||||||
ssl_config = tls_manager.get_rest_api_ssl_config()
|
ssl_config = tls_manager.get_rest_api_ssl_config()
|
||||||
|
|
@ -690,13 +700,39 @@ class RESTAPIServer:
|
||||||
})
|
})
|
||||||
|
|
||||||
config = uvicorn.Config(**config_kwargs)
|
config = uvicorn.Config(**config_kwargs)
|
||||||
server = uvicorn.Server(config)
|
self._server = uvicorn.Server(config)
|
||||||
await server.serve()
|
|
||||||
|
# Start server in background task (non-blocking)
|
||||||
|
self._server_task = asyncio.create_task(self._server.serve())
|
||||||
|
self._is_running = True
|
||||||
|
|
||||||
|
# Wait briefly for server to start accepting connections
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
"""Stop the REST API server."""
|
"""Stop the REST API server."""
|
||||||
|
if not self._is_running:
|
||||||
|
logger.warning("rest_api_server_not_running")
|
||||||
|
return
|
||||||
|
|
||||||
logger.info("rest_api_server_stopping")
|
logger.info("rest_api_server_stopping")
|
||||||
|
|
||||||
|
if self._server:
|
||||||
|
self._server.should_exit = True
|
||||||
|
|
||||||
|
if self._server_task:
|
||||||
|
self._server_task.cancel()
|
||||||
|
try:
|
||||||
|
await self._server_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._server = None
|
||||||
|
self._server_task = None
|
||||||
|
self._is_running = False
|
||||||
|
|
||||||
|
logger.info("rest_api_server_stopped")
|
||||||
|
|
||||||
def get_performance_status(self) -> Dict[str, Any]:
|
def get_performance_status(self) -> Dict[str, Any]:
|
||||||
"""Get performance status information."""
|
"""Get performance status information."""
|
||||||
cache_stats = self.response_cache.get_stats() if self.response_cache else {
|
cache_stats = self.response_cache.get_stats() if self.response_cache else {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,291 @@
|
||||||
|
"""
|
||||||
|
Integration tests for failsafe mode operations.
|
||||||
|
|
||||||
|
Tests failsafe mode activation, operation, and recovery scenarios.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import pytest_asyncio
|
||||||
|
from unittest.mock import Mock, AsyncMock
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from src.database.flexible_client import FlexibleDatabaseClient
|
||||||
|
from src.core.auto_discovery import AutoDiscovery
|
||||||
|
from src.core.safety import SafetyLimitEnforcer
|
||||||
|
from src.core.emergency_stop import EmergencyStopManager
|
||||||
|
from src.core.setpoint_manager import SetpointManager
|
||||||
|
from src.core.security import SecurityManager
|
||||||
|
from src.core.compliance_audit import ComplianceAuditLogger
|
||||||
|
from src.monitoring.watchdog import DatabaseWatchdog
|
||||||
|
|
||||||
|
|
||||||
|
class TestFailsafeOperations:
|
||||||
|
"""Test failsafe mode operations and scenarios."""
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def db_client(self):
|
||||||
|
"""Create database client with failsafe test data."""
|
||||||
|
client = FlexibleDatabaseClient(
|
||||||
|
database_url="sqlite:///:memory:",
|
||||||
|
pool_size=5,
|
||||||
|
max_overflow=10,
|
||||||
|
pool_timeout=30
|
||||||
|
)
|
||||||
|
|
||||||
|
await client.connect()
|
||||||
|
client.create_tables()
|
||||||
|
|
||||||
|
# Insert failsafe test data
|
||||||
|
self._insert_failsafe_test_data(client)
|
||||||
|
return client
|
||||||
|
|
||||||
|
def _insert_failsafe_test_data(self, db_client):
|
||||||
|
"""Insert comprehensive failsafe test data."""
|
||||||
|
# Insert stations
|
||||||
|
db_client.execute(
|
||||||
|
"""INSERT INTO pump_stations (station_id, station_name, location) VALUES
|
||||||
|
('FAILSAFE_STATION_001', 'Failsafe Test Station 1', 'Test Location A'),
|
||||||
|
('FAILSAFE_STATION_002', 'Failsafe Test Station 2', 'Test Location B')"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert pumps with different failsafe configurations
|
||||||
|
db_client.execute(
|
||||||
|
"""INSERT INTO pumps (station_id, pump_id, pump_name, control_type,
|
||||||
|
min_speed_hz, max_speed_hz, default_setpoint_hz) VALUES
|
||||||
|
('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_001', 'Failsafe Pump 1', 'DIRECT_SPEED', 20.0, 60.0, 35.0),
|
||||||
|
('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_002', 'Failsafe Pump 2', 'LEVEL_CONTROLLED', 25.0, 55.0, 40.0),
|
||||||
|
('FAILSAFE_STATION_002', 'FAILSAFE_PUMP_003', 'Failsafe Pump 3', 'POWER_CONTROLLED', 30.0, 50.0, 38.0)"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert safety limits
|
||||||
|
db_client.execute(
|
||||||
|
"""INSERT INTO pump_safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
|
||||||
|
hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m,
|
||||||
|
hard_max_power_kw, hard_max_flow_m3h, max_starts_per_hour, min_run_time_seconds,
|
||||||
|
max_continuous_run_hours, max_speed_change_hz_per_min, set_by, approved_by) VALUES
|
||||||
|
('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_001', 20.0, 60.0, 1.0, 5.0, 0.5, 1.5, 15.0, 200.0, 6, 300, 24, 10.0, 'admin', 'admin'),
|
||||||
|
('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_002', 25.0, 55.0, 1.5, 4.5, 1.0, 2.0, 12.0, 180.0, 6, 300, 24, 10.0, 'admin', 'admin'),
|
||||||
|
('FAILSAFE_STATION_002', 'FAILSAFE_PUMP_003', 30.0, 50.0, 2.0, 4.0, 1.5, 2.5, 10.0, 160.0, 6, 300, 24, 10.0, 'admin', 'admin')"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert optimization plans
|
||||||
|
import datetime
|
||||||
|
now = datetime.datetime.now()
|
||||||
|
db_client.execute(
|
||||||
|
f"""INSERT INTO pump_plans (station_id, pump_id, interval_start, interval_end,
|
||||||
|
suggested_speed_hz, target_level_m, target_power_kw, plan_status) VALUES
|
||||||
|
('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_001', '{now}', '{now + datetime.timedelta(hours=1)}', 42.5, 3.0, 12.0, 'ACTIVE'),
|
||||||
|
('FAILSAFE_STATION_001', 'FAILSAFE_PUMP_002', '{now}', '{now + datetime.timedelta(hours=1)}', 38.0, 2.5, 10.0, 'ACTIVE'),
|
||||||
|
('FAILSAFE_STATION_002', 'FAILSAFE_PUMP_003', '{now}', '{now + datetime.timedelta(hours=1)}', 36.0, 3.5, 8.0, 'ACTIVE')"""
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def failsafe_components(self, db_client):
|
||||||
|
"""Create failsafe-related components for testing."""
|
||||||
|
# Create auto discovery
|
||||||
|
auto_discovery = AutoDiscovery(db_client)
|
||||||
|
await auto_discovery.discover()
|
||||||
|
|
||||||
|
# Create safety components
|
||||||
|
safety_enforcer = SafetyLimitEnforcer(db_client)
|
||||||
|
emergency_stop_manager = EmergencyStopManager(db_client)
|
||||||
|
|
||||||
|
# Load safety limits
|
||||||
|
await safety_enforcer.load_safety_limits()
|
||||||
|
|
||||||
|
# Create mock alert manager for watchdog
|
||||||
|
mock_alert_manager = Mock()
|
||||||
|
watchdog = DatabaseWatchdog(db_client, mock_alert_manager)
|
||||||
|
|
||||||
|
# Create setpoint manager
|
||||||
|
setpoint_manager = SetpointManager(
|
||||||
|
discovery=auto_discovery,
|
||||||
|
db_client=db_client,
|
||||||
|
safety_enforcer=safety_enforcer,
|
||||||
|
emergency_stop_manager=emergency_stop_manager,
|
||||||
|
watchdog=watchdog
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create security components
|
||||||
|
security_manager = SecurityManager()
|
||||||
|
audit_logger = ComplianceAuditLogger(db_client)
|
||||||
|
|
||||||
|
return {
|
||||||
|
'auto_discovery': auto_discovery,
|
||||||
|
'safety_enforcer': safety_enforcer,
|
||||||
|
'emergency_stop_manager': emergency_stop_manager,
|
||||||
|
'setpoint_manager': setpoint_manager,
|
||||||
|
'security_manager': security_manager,
|
||||||
|
'audit_logger': audit_logger,
|
||||||
|
'watchdog': watchdog
|
||||||
|
}
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failsafe_mode_activation(self, failsafe_components):
|
||||||
|
"""Test failsafe mode activation for individual pumps."""
|
||||||
|
setpoint_manager = failsafe_components['setpoint_manager']
|
||||||
|
watchdog = failsafe_components['watchdog']
|
||||||
|
|
||||||
|
station_id = 'FAILSAFE_STATION_001'
|
||||||
|
pump_id = 'FAILSAFE_PUMP_001'
|
||||||
|
|
||||||
|
# Get normal setpoint
|
||||||
|
normal_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert normal_setpoint is not None
|
||||||
|
|
||||||
|
# Activate failsafe mode
|
||||||
|
await watchdog.activate_failsafe_mode(station_id, pump_id, "communication_loss")
|
||||||
|
|
||||||
|
# Verify setpoint switches to failsafe mode
|
||||||
|
failsafe_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
|
||||||
|
# In failsafe mode, should use default setpoint (35.0) as fallback
|
||||||
|
# since we don't have a specific failsafe_setpoint_hz in the database
|
||||||
|
assert failsafe_setpoint == 35.0, f"Setpoint not in failsafe mode: {failsafe_setpoint}"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failsafe_mode_recovery(self, failsafe_components):
|
||||||
|
"""Test failsafe mode recovery and return to normal operation."""
|
||||||
|
setpoint_manager = failsafe_components['setpoint_manager']
|
||||||
|
watchdog = failsafe_components['watchdog']
|
||||||
|
|
||||||
|
station_id = 'FAILSAFE_STATION_001'
|
||||||
|
pump_id = 'FAILSAFE_PUMP_002'
|
||||||
|
|
||||||
|
# Get normal setpoint
|
||||||
|
normal_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
|
||||||
|
# Activate failsafe mode
|
||||||
|
await watchdog.activate_failsafe_mode(station_id, pump_id, "sensor_failure")
|
||||||
|
|
||||||
|
# Verify failsafe mode
|
||||||
|
failsafe_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert failsafe_setpoint == 40.0, f"Setpoint not in failsafe mode: {failsafe_setpoint}"
|
||||||
|
|
||||||
|
# Clear failsafe mode
|
||||||
|
await watchdog.clear_failsafe_mode(station_id, pump_id)
|
||||||
|
|
||||||
|
# Verify return to normal operation
|
||||||
|
recovered_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert recovered_setpoint == normal_setpoint, "Setpoint did not recover after failsafe clearance"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failsafe_mode_station_wide(self, failsafe_components):
|
||||||
|
"""Test station-wide failsafe mode activation."""
|
||||||
|
setpoint_manager = failsafe_components['setpoint_manager']
|
||||||
|
watchdog = failsafe_components['watchdog']
|
||||||
|
|
||||||
|
station_id = 'FAILSAFE_STATION_001'
|
||||||
|
|
||||||
|
# Activate station-wide failsafe mode
|
||||||
|
await watchdog.activate_failsafe_mode_station(station_id, "station_communication_loss")
|
||||||
|
|
||||||
|
# Verify all pumps in station are in failsafe mode
|
||||||
|
pumps = setpoint_manager.discovery.get_pumps(station_id)
|
||||||
|
for pump in pumps:
|
||||||
|
pump_id = pump['pump_id']
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
|
||||||
|
# Should use default setpoint as failsafe
|
||||||
|
default_setpoint = pump.get('default_setpoint_hz', 35.0)
|
||||||
|
assert setpoint == default_setpoint, \
|
||||||
|
f"Pump {pump_id} not in failsafe mode: {setpoint} != {default_setpoint}"
|
||||||
|
|
||||||
|
# Verify pumps in other stations are unaffected
|
||||||
|
other_station_id = 'FAILSAFE_STATION_002'
|
||||||
|
other_pumps = setpoint_manager.discovery.get_pumps(other_station_id)
|
||||||
|
for pump in other_pumps:
|
||||||
|
pump_id = pump['pump_id']
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(other_station_id, pump_id)
|
||||||
|
assert setpoint != pump.get('default_setpoint', 35.0), \
|
||||||
|
f"Pump {pump_id} incorrectly in failsafe mode"
|
||||||
|
|
||||||
|
# Clear station-wide failsafe mode
|
||||||
|
await watchdog.clear_failsafe_mode_station(station_id)
|
||||||
|
|
||||||
|
# Verify pumps return to normal operation
|
||||||
|
for pump in pumps:
|
||||||
|
pump_id = pump['pump_id']
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert setpoint != pump.get('default_setpoint', 35.0), \
|
||||||
|
f"Pump {pump_id} not recovered from failsafe mode"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failsafe_priority_over_optimization(self, failsafe_components):
|
||||||
|
"""Test that failsafe mode takes priority over optimization plans."""
|
||||||
|
setpoint_manager = failsafe_components['setpoint_manager']
|
||||||
|
watchdog = failsafe_components['watchdog']
|
||||||
|
|
||||||
|
station_id = 'FAILSAFE_STATION_002'
|
||||||
|
pump_id = 'FAILSAFE_PUMP_003'
|
||||||
|
|
||||||
|
# Get optimization-based setpoint
|
||||||
|
optimization_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert optimization_setpoint is not None
|
||||||
|
|
||||||
|
# Activate failsafe mode
|
||||||
|
await watchdog.activate_failsafe_mode(station_id, pump_id, "optimizer_failure")
|
||||||
|
|
||||||
|
# Verify failsafe mode overrides optimization
|
||||||
|
failsafe_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert failsafe_setpoint == 38.0, f"Failsafe mode not overriding optimization: {failsafe_setpoint}"
|
||||||
|
assert failsafe_setpoint != optimization_setpoint, "Failsafe mode should differ from optimization"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_emergency_stop_priority_over_failsafe(self, failsafe_components):
|
||||||
|
"""Test that emergency stop takes priority over failsafe mode."""
|
||||||
|
setpoint_manager = failsafe_components['setpoint_manager']
|
||||||
|
emergency_stop_manager = failsafe_components['emergency_stop_manager']
|
||||||
|
watchdog = failsafe_components['watchdog']
|
||||||
|
|
||||||
|
station_id = 'FAILSAFE_STATION_001'
|
||||||
|
pump_id = 'FAILSAFE_PUMP_001'
|
||||||
|
|
||||||
|
# Activate failsafe mode
|
||||||
|
await watchdog.activate_failsafe_mode(station_id, pump_id, "test_priority")
|
||||||
|
|
||||||
|
# Verify failsafe mode is active
|
||||||
|
failsafe_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert failsafe_setpoint == 35.0, "Failsafe mode not active"
|
||||||
|
|
||||||
|
# Activate emergency stop
|
||||||
|
emergency_stop_manager.emergency_stop_pump(station_id, pump_id, "emergency_override_test")
|
||||||
|
|
||||||
|
# Verify emergency stop overrides failsafe mode
|
||||||
|
emergency_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert emergency_setpoint == 0.0, "Emergency stop not overriding failsafe mode"
|
||||||
|
|
||||||
|
# Clear emergency stop
|
||||||
|
emergency_stop_manager.clear_emergency_stop_pump(station_id, pump_id, "clearance_test")
|
||||||
|
|
||||||
|
# Verify failsafe mode is restored
|
||||||
|
restored_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert restored_setpoint == 35.0, "Failsafe mode not restored after emergency stop clearance"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failsafe_mode_audit_logging(self, failsafe_components, db_client):
|
||||||
|
"""Test that failsafe mode activations are properly logged."""
|
||||||
|
setpoint_manager = failsafe_components['setpoint_manager']
|
||||||
|
watchdog = failsafe_components['watchdog']
|
||||||
|
audit_logger = failsafe_components['audit_logger']
|
||||||
|
|
||||||
|
station_id = 'FAILSAFE_STATION_001'
|
||||||
|
pump_id = 'FAILSAFE_PUMP_002'
|
||||||
|
|
||||||
|
# Activate failsafe mode
|
||||||
|
await watchdog.activate_failsafe_mode(station_id, pump_id, "audit_test_reason")
|
||||||
|
|
||||||
|
# Verify failsafe mode is active
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert setpoint == 40.0, "Failsafe mode not active for audit test"
|
||||||
|
|
||||||
|
# Note: In a real implementation, we would verify that the audit logger
|
||||||
|
# recorded the failsafe activation. For now, we'll just verify the
|
||||||
|
# functional behavior.
|
||||||
|
|
||||||
|
# Clear failsafe mode
|
||||||
|
await watchdog.clear_failsafe_mode(station_id, pump_id)
|
||||||
|
|
||||||
|
# Verify normal operation restored
|
||||||
|
recovered_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert recovered_setpoint != 40.0, "Failsafe mode not cleared"
|
||||||
|
|
@ -0,0 +1,397 @@
|
||||||
|
"""
|
||||||
|
Integration tests for optimization-to-SCADA workflow.
|
||||||
|
|
||||||
|
Tests the complete data flow:
|
||||||
|
1. Optimization plan in database
|
||||||
|
2. Setpoint calculation by setpoint manager
|
||||||
|
3. Protocol server exposure (OPC UA, Modbus, REST API)
|
||||||
|
4. SCADA client access to setpoints
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import pytest_asyncio
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
from typing import Dict, Any, Optional
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
|
from src.database.flexible_client import FlexibleDatabaseClient
|
||||||
|
from src.core.auto_discovery import AutoDiscovery
|
||||||
|
from src.core.setpoint_manager import SetpointManager
|
||||||
|
from src.core.safety import SafetyLimitEnforcer
|
||||||
|
from src.core.emergency_stop import EmergencyStopManager
|
||||||
|
from src.monitoring.watchdog import DatabaseWatchdog
|
||||||
|
from src.protocols.opcua_server import OPCUAServer
|
||||||
|
from src.protocols.modbus_server import ModbusServer
|
||||||
|
from src.protocols.rest_api import RESTAPIServer
|
||||||
|
from src.core.security import SecurityManager
|
||||||
|
from src.core.compliance_audit import ComplianceAuditLogger
|
||||||
|
|
||||||
|
|
||||||
|
class TestOptimizationToSCADAIntegration:
|
||||||
|
"""Test complete workflow from optimization to SCADA."""
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def db_client(self):
|
||||||
|
"""Create database client with comprehensive test data."""
|
||||||
|
client = FlexibleDatabaseClient(
|
||||||
|
database_url="sqlite:///:memory:",
|
||||||
|
pool_size=5,
|
||||||
|
max_overflow=10,
|
||||||
|
pool_timeout=30
|
||||||
|
)
|
||||||
|
|
||||||
|
await client.connect()
|
||||||
|
client.create_tables()
|
||||||
|
|
||||||
|
# Insert comprehensive test data
|
||||||
|
self._insert_comprehensive_test_data(client)
|
||||||
|
|
||||||
|
return client
|
||||||
|
|
||||||
|
def _insert_comprehensive_test_data(self, db_client):
|
||||||
|
"""Insert realistic test data for multiple scenarios."""
|
||||||
|
# Insert multiple pump stations
|
||||||
|
db_client.execute(
|
||||||
|
"""INSERT INTO pump_stations (station_id, station_name, location) VALUES
|
||||||
|
('STATION_001', 'Main Station', 'Location A'),
|
||||||
|
('STATION_002', 'Backup Station', 'Location B'),
|
||||||
|
('STATION_003', 'Emergency Station', 'Location C')"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert multiple pumps with different configurations
|
||||||
|
db_client.execute(
|
||||||
|
"""INSERT INTO pumps (station_id, pump_id, pump_name, control_type,
|
||||||
|
min_speed_hz, max_speed_hz, default_setpoint_hz) VALUES
|
||||||
|
('STATION_001', 'PUMP_001', 'Main Pump 1', 'DIRECT_SPEED', 20.0, 60.0, 35.0),
|
||||||
|
('STATION_001', 'PUMP_002', 'Main Pump 2', 'LEVEL_CONTROLLED', 25.0, 55.0, 40.0),
|
||||||
|
('STATION_002', 'PUMP_003', 'Backup Pump 1', 'POWER_CONTROLLED', 30.0, 50.0, 38.0),
|
||||||
|
('STATION_003', 'PUMP_004', 'Emergency Pump', 'DIRECT_SPEED', 15.0, 70.0, 45.0)"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert optimization plans for different scenarios
|
||||||
|
import datetime
|
||||||
|
now = datetime.datetime.now()
|
||||||
|
|
||||||
|
# Insert plans one by one to handle datetime values
|
||||||
|
db_client.execute(
|
||||||
|
f"""INSERT INTO pump_plans (
|
||||||
|
station_id, pump_id, target_flow_m3h, target_power_kw, target_level_m,
|
||||||
|
suggested_speed_hz, interval_start, interval_end, plan_version, plan_status, optimization_run_id
|
||||||
|
) VALUES (
|
||||||
|
'STATION_001', 'PUMP_001', 150.0, NULL, NULL, 42.5,
|
||||||
|
'{now - datetime.timedelta(hours=1)}', '{now + datetime.timedelta(hours=1)}', 1, 'ACTIVE', 'OPT_RUN_001'
|
||||||
|
)"""
|
||||||
|
)
|
||||||
|
|
||||||
|
db_client.execute(
|
||||||
|
f"""INSERT INTO pump_plans (
|
||||||
|
station_id, pump_id, target_flow_m3h, target_power_kw, target_level_m,
|
||||||
|
suggested_speed_hz, interval_start, interval_end, plan_version, plan_status, optimization_run_id
|
||||||
|
) VALUES (
|
||||||
|
'STATION_001', 'PUMP_002', NULL, NULL, 2.5, 38.0,
|
||||||
|
'{now - datetime.timedelta(minutes=30)}', '{now + datetime.timedelta(hours=2)}', 1, 'ACTIVE', 'OPT_RUN_002'
|
||||||
|
)"""
|
||||||
|
)
|
||||||
|
|
||||||
|
db_client.execute(
|
||||||
|
f"""INSERT INTO pump_plans (
|
||||||
|
station_id, pump_id, target_flow_m3h, target_power_kw, target_level_m,
|
||||||
|
suggested_speed_hz, interval_start, interval_end, plan_version, plan_status, optimization_run_id
|
||||||
|
) VALUES (
|
||||||
|
'STATION_002', 'PUMP_003', NULL, 12.5, NULL, 36.0,
|
||||||
|
'{now - datetime.timedelta(hours=2)}', '{now + datetime.timedelta(hours=3)}', 1, 'ACTIVE', 'OPT_RUN_003'
|
||||||
|
)"""
|
||||||
|
)
|
||||||
|
|
||||||
|
db_client.execute(
|
||||||
|
f"""INSERT INTO pump_plans (
|
||||||
|
station_id, pump_id, target_flow_m3h, target_power_kw, target_level_m,
|
||||||
|
suggested_speed_hz, interval_start, interval_end, plan_version, plan_status, optimization_run_id
|
||||||
|
) VALUES (
|
||||||
|
'STATION_003', 'PUMP_004', 200.0, NULL, NULL, 48.0,
|
||||||
|
'{now - datetime.timedelta(hours=1)}', '{now + datetime.timedelta(hours=4)}', 1, 'ACTIVE', 'OPT_RUN_004'
|
||||||
|
)"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert safety limits for all pumps
|
||||||
|
db_client.execute(
|
||||||
|
"""INSERT INTO pump_safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
|
||||||
|
hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m,
|
||||||
|
hard_max_power_kw, hard_max_flow_m3h, max_starts_per_hour, min_run_time_seconds,
|
||||||
|
max_continuous_run_hours, max_speed_change_hz_per_min, set_by, approved_by) VALUES
|
||||||
|
('STATION_001', 'PUMP_001', 20.0, 60.0, 1.0, 5.0, 0.5, 1.5, 15.0, 200.0, 6, 300, 24, 10.0, 'admin', 'admin'),
|
||||||
|
('STATION_001', 'PUMP_002', 25.0, 55.0, 1.5, 4.5, 1.0, 2.0, 12.0, 180.0, 6, 300, 24, 10.0, 'admin', 'admin'),
|
||||||
|
('STATION_002', 'PUMP_003', 30.0, 50.0, 2.0, 4.0, 1.5, 2.5, 10.0, 160.0, 6, 300, 24, 10.0, 'admin', 'admin'),
|
||||||
|
('STATION_003', 'PUMP_004', 15.0, 70.0, 0.5, 6.0, 0.2, 1.0, 20.0, 250.0, 6, 300, 24, 10.0, 'admin', 'admin')"""
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def system_components(self, db_client):
|
||||||
|
"""Create all system components for integration testing."""
|
||||||
|
# Create auto discovery
|
||||||
|
auto_discovery = AutoDiscovery(db_client)
|
||||||
|
await auto_discovery.discover()
|
||||||
|
|
||||||
|
# Create safety components
|
||||||
|
safety_enforcer = SafetyLimitEnforcer(db_client)
|
||||||
|
emergency_stop_manager = EmergencyStopManager(db_client)
|
||||||
|
|
||||||
|
# Load safety limits
|
||||||
|
await safety_enforcer.load_safety_limits()
|
||||||
|
|
||||||
|
# Create mock alert manager for watchdog
|
||||||
|
mock_alert_manager = Mock()
|
||||||
|
watchdog = DatabaseWatchdog(db_client, mock_alert_manager)
|
||||||
|
|
||||||
|
# Create setpoint manager
|
||||||
|
setpoint_manager = SetpointManager(
|
||||||
|
discovery=auto_discovery,
|
||||||
|
db_client=db_client,
|
||||||
|
safety_enforcer=safety_enforcer,
|
||||||
|
emergency_stop_manager=emergency_stop_manager,
|
||||||
|
watchdog=watchdog
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create security components
|
||||||
|
security_manager = SecurityManager()
|
||||||
|
audit_logger = ComplianceAuditLogger(db_client)
|
||||||
|
|
||||||
|
return {
|
||||||
|
'db_client': db_client,
|
||||||
|
'auto_discovery': auto_discovery,
|
||||||
|
'setpoint_manager': setpoint_manager,
|
||||||
|
'safety_enforcer': safety_enforcer,
|
||||||
|
'emergency_stop_manager': emergency_stop_manager,
|
||||||
|
'watchdog': watchdog,
|
||||||
|
'security_manager': security_manager,
|
||||||
|
'audit_logger': audit_logger
|
||||||
|
}
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_optimization_plan_to_setpoint_calculation(self, system_components):
|
||||||
|
"""Test that optimization plans are correctly converted to setpoints."""
|
||||||
|
setpoint_manager = system_components['setpoint_manager']
|
||||||
|
|
||||||
|
# Test setpoint calculation for different pump types
|
||||||
|
test_cases = [
|
||||||
|
('STATION_001', 'PUMP_001', 42.5), # DIRECT_SPEED
|
||||||
|
('STATION_001', 'PUMP_002', 38.0), # LEVEL_CONTROL
|
||||||
|
('STATION_002', 'PUMP_003', 36.0), # POWER_OPTIMIZATION
|
||||||
|
('STATION_003', 'PUMP_004', 48.0), # DIRECT_SPEED (emergency)
|
||||||
|
]
|
||||||
|
|
||||||
|
for station_id, pump_id, expected_setpoint in test_cases:
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert setpoint == expected_setpoint, f"Setpoint mismatch for {station_id}/{pump_id}"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_opcua_server_setpoint_exposure(self, system_components):
|
||||||
|
"""Test that OPC UA server correctly exposes setpoints."""
|
||||||
|
setpoint_manager = system_components['setpoint_manager']
|
||||||
|
security_manager = system_components['security_manager']
|
||||||
|
audit_logger = system_components['audit_logger']
|
||||||
|
|
||||||
|
# Create OPC UA server
|
||||||
|
opcua_server = OPCUAServer(
|
||||||
|
setpoint_manager=setpoint_manager,
|
||||||
|
security_manager=security_manager,
|
||||||
|
audit_logger=audit_logger,
|
||||||
|
enable_security=False, # Disable security for testing
|
||||||
|
endpoint="opc.tcp://127.0.0.1:4840"
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Start server
|
||||||
|
await opcua_server.start()
|
||||||
|
|
||||||
|
# Wait for server to initialize
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
# Test that setpoints are available
|
||||||
|
stations = setpoint_manager.discovery.get_stations()
|
||||||
|
for station_id in stations:
|
||||||
|
pumps = setpoint_manager.discovery.get_pumps(station_id)
|
||||||
|
for pump in pumps:
|
||||||
|
pump_id = pump['pump_id']
|
||||||
|
|
||||||
|
# Verify setpoint is calculated
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert setpoint is not None, f"No setpoint for {station_id}/{pump_id}"
|
||||||
|
|
||||||
|
# Verify setpoint is within safety limits
|
||||||
|
pump_info = setpoint_manager.discovery.get_pump(station_id, pump_id)
|
||||||
|
min_speed = pump_info.get('min_speed_hz', 20.0)
|
||||||
|
max_speed = pump_info.get('max_speed_hz', 60.0)
|
||||||
|
assert min_speed <= setpoint <= max_speed, f"Setpoint out of range for {station_id}/{pump_id}"
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# Clean up
|
||||||
|
await opcua_server.stop()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_modbus_server_setpoint_exposure(self, system_components):
|
||||||
|
"""Test that Modbus server correctly exposes setpoints."""
|
||||||
|
setpoint_manager = system_components['setpoint_manager']
|
||||||
|
security_manager = system_components['security_manager']
|
||||||
|
audit_logger = system_components['audit_logger']
|
||||||
|
|
||||||
|
# Create Modbus server
|
||||||
|
modbus_server = ModbusServer(
|
||||||
|
setpoint_manager=setpoint_manager,
|
||||||
|
security_manager=security_manager,
|
||||||
|
audit_logger=audit_logger,
|
||||||
|
enable_security=False, # Disable security for testing
|
||||||
|
allowed_ips=["127.0.0.1"],
|
||||||
|
rate_limit_per_minute=100,
|
||||||
|
host="127.0.0.1",
|
||||||
|
port=5020
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Start server
|
||||||
|
await modbus_server.start()
|
||||||
|
|
||||||
|
# Wait for server to initialize
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
# Test that pump mapping is created
|
||||||
|
assert len(modbus_server.pump_addresses) > 0, "No pumps mapped to Modbus addresses"
|
||||||
|
|
||||||
|
# Verify each pump has a setpoint register
|
||||||
|
for (station_id, pump_id), addresses in modbus_server.pump_addresses.items():
|
||||||
|
assert 'setpoint_register' in addresses, f"No setpoint register for {station_id}/{pump_id}"
|
||||||
|
|
||||||
|
# Verify setpoint is calculated
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert setpoint is not None, f"No setpoint for {station_id}/{pump_id}"
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# Clean up
|
||||||
|
await modbus_server.stop()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_rest_api_setpoint_exposure(self, system_components):
|
||||||
|
"""Test that REST API correctly exposes setpoints."""
|
||||||
|
setpoint_manager = system_components['setpoint_manager']
|
||||||
|
emergency_stop_manager = system_components['emergency_stop_manager']
|
||||||
|
|
||||||
|
# Create REST API server
|
||||||
|
rest_api = RESTAPIServer(
|
||||||
|
setpoint_manager=setpoint_manager,
|
||||||
|
emergency_stop_manager=emergency_stop_manager,
|
||||||
|
host="127.0.0.1",
|
||||||
|
port=8000
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Start server
|
||||||
|
await rest_api.start()
|
||||||
|
|
||||||
|
# Wait for server to initialize - use retry mechanism
|
||||||
|
import httpx
|
||||||
|
max_retries = 10
|
||||||
|
for attempt in range(max_retries):
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
response = await client.get("http://127.0.0.1:8000/api/v1/setpoints", timeout=1.0)
|
||||||
|
if response.status_code == 200:
|
||||||
|
break
|
||||||
|
except (httpx.ConnectError, httpx.ReadTimeout):
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Test that setpoint endpoints are available
|
||||||
|
routes = [route.path for route in rest_api.app.routes]
|
||||||
|
assert "/api/v1/setpoints" in routes
|
||||||
|
assert "/api/v1/setpoints/{station_id}/{pump_id}" in routes
|
||||||
|
|
||||||
|
# Test setpoint retrieval via REST API
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
# First authenticate to get token
|
||||||
|
login_response = await client.post(
|
||||||
|
"http://127.0.0.1:8000/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "admin123"}
|
||||||
|
)
|
||||||
|
assert login_response.status_code == 200
|
||||||
|
login_data = login_response.json()
|
||||||
|
token = login_data["access_token"]
|
||||||
|
|
||||||
|
# Set up headers with authentication
|
||||||
|
headers = {"Authorization": f"Bearer {token}"}
|
||||||
|
|
||||||
|
# Get all setpoints
|
||||||
|
response = await client.get("http://127.0.0.1:8000/api/v1/setpoints", headers=headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
setpoints = response.json()
|
||||||
|
assert len(setpoints) > 0
|
||||||
|
|
||||||
|
# Get specific setpoint
|
||||||
|
response = await client.get("http://127.0.0.1:8000/api/v1/setpoints/STATION_001/PUMP_001", headers=headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
setpoint_data = response.json()
|
||||||
|
assert 'setpoint_hz' in setpoint_data
|
||||||
|
assert setpoint_data['setpoint_hz'] == 42.5
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# Clean up
|
||||||
|
await rest_api.stop()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_safety_limit_enforcement_integration(self, system_components):
|
||||||
|
"""Test that safety limits are enforced throughout the workflow."""
|
||||||
|
setpoint_manager = system_components['setpoint_manager']
|
||||||
|
safety_enforcer = system_components['safety_enforcer']
|
||||||
|
|
||||||
|
# Test safety limit enforcement for each pump
|
||||||
|
stations = setpoint_manager.discovery.get_stations()
|
||||||
|
for station_id in stations:
|
||||||
|
pumps = setpoint_manager.discovery.get_pumps(station_id)
|
||||||
|
for pump in pumps:
|
||||||
|
pump_id = pump['pump_id']
|
||||||
|
|
||||||
|
# Get calculated setpoint
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
|
||||||
|
# Verify safety enforcement
|
||||||
|
enforced_setpoint, violations = safety_enforcer.enforce_setpoint(
|
||||||
|
station_id, pump_id, setpoint
|
||||||
|
)
|
||||||
|
|
||||||
|
# Setpoint should be within limits after enforcement
|
||||||
|
pump_info = setpoint_manager.discovery.get_pump(station_id, pump_id)
|
||||||
|
min_speed = pump_info.get('min_speed_hz', 20.0)
|
||||||
|
max_speed = pump_info.get('max_speed_hz', 60.0)
|
||||||
|
|
||||||
|
assert min_speed <= enforced_setpoint <= max_speed, \
|
||||||
|
f"Safety enforcement failed for {station_id}/{pump_id}"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_emergency_stop_integration(self, system_components):
|
||||||
|
"""Test emergency stop integration with setpoint calculation."""
|
||||||
|
setpoint_manager = system_components['setpoint_manager']
|
||||||
|
emergency_stop_manager = system_components['emergency_stop_manager']
|
||||||
|
|
||||||
|
# Test station/pump
|
||||||
|
station_id = 'STATION_001'
|
||||||
|
pump_id = 'PUMP_001'
|
||||||
|
|
||||||
|
# Get normal setpoint
|
||||||
|
normal_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert normal_setpoint is not None
|
||||||
|
|
||||||
|
# Activate emergency stop
|
||||||
|
emergency_stop_manager.emergency_stop_pump(station_id, pump_id, "integration_test")
|
||||||
|
|
||||||
|
# Setpoint should be 0 during emergency stop
|
||||||
|
emergency_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert emergency_setpoint == 0.0, "Setpoint not zero during emergency stop"
|
||||||
|
|
||||||
|
# Clear emergency stop
|
||||||
|
emergency_stop_manager.clear_emergency_stop_pump(station_id, pump_id, "integration_test")
|
||||||
|
|
||||||
|
# Setpoint should return to normal
|
||||||
|
recovered_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert recovered_setpoint == normal_setpoint, "Setpoint did not recover after emergency stop"
|
||||||
|
|
@ -0,0 +1,393 @@
|
||||||
|
"""
|
||||||
|
Performance and Load Testing for Calejo Control Adapter.
|
||||||
|
|
||||||
|
Tests system behavior under load with concurrent connections,
|
||||||
|
high-frequency updates, and stress conditions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
import pytest
|
||||||
|
import pytest_asyncio
|
||||||
|
from typing import Dict, List, Any
|
||||||
|
import statistics
|
||||||
|
|
||||||
|
from src.database.flexible_client import FlexibleDatabaseClient
|
||||||
|
from src.core.auto_discovery import AutoDiscovery
|
||||||
|
from src.core.setpoint_manager import SetpointManager
|
||||||
|
from src.core.safety import SafetyLimitEnforcer
|
||||||
|
from src.core.emergency_stop import EmergencyStopManager
|
||||||
|
from src.core.optimization_manager import OptimizationPlanManager
|
||||||
|
from src.core.security import SecurityManager
|
||||||
|
from src.core.compliance_audit import ComplianceAuditLogger
|
||||||
|
from src.protocols.opcua_server import OPCUAServer
|
||||||
|
from src.protocols.modbus_server import ModbusServer
|
||||||
|
from src.protocols.rest_api import RESTAPIServer
|
||||||
|
from src.monitoring.watchdog import DatabaseWatchdog
|
||||||
|
from unittest.mock import Mock
|
||||||
|
|
||||||
|
|
||||||
|
class TestPerformanceLoad:
|
||||||
|
"""Performance and load testing for Calejo Control Adapter."""
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def performance_db_client(self):
|
||||||
|
"""Create database client for performance testing."""
|
||||||
|
client = FlexibleDatabaseClient("sqlite:///:memory:")
|
||||||
|
await client.connect()
|
||||||
|
client.create_tables()
|
||||||
|
|
||||||
|
# Insert performance test data
|
||||||
|
client.execute(
|
||||||
|
"""INSERT INTO pump_stations (station_id, station_name, location) VALUES
|
||||||
|
('PERF_STATION_001', 'Performance Station 1', 'Test Area'),
|
||||||
|
('PERF_STATION_002', 'Performance Station 2', 'Test Area'),
|
||||||
|
('PERF_STATION_003', 'Performance Station 3', 'Test Area')"""
|
||||||
|
)
|
||||||
|
|
||||||
|
client.execute(
|
||||||
|
"""INSERT INTO pumps (station_id, pump_id, pump_name, control_type, default_setpoint_hz) VALUES
|
||||||
|
('PERF_STATION_001', 'PERF_PUMP_001', 'Performance Pump 1', 'DIRECT_SPEED', 35.0),
|
||||||
|
('PERF_STATION_001', 'PERF_PUMP_002', 'Performance Pump 2', 'LEVEL_CONTROLLED', 40.0),
|
||||||
|
('PERF_STATION_002', 'PERF_PUMP_003', 'Performance Pump 3', 'POWER_CONTROLLED', 38.0),
|
||||||
|
('PERF_STATION_002', 'PERF_PUMP_004', 'Performance Pump 4', 'DIRECT_SPEED', 42.0),
|
||||||
|
('PERF_STATION_003', 'PERF_PUMP_005', 'Performance Pump 5', 'LEVEL_CONTROLLED', 37.0),
|
||||||
|
('PERF_STATION_003', 'PERF_PUMP_006', 'Performance Pump 6', 'POWER_CONTROLLED', 39.0)"""
|
||||||
|
)
|
||||||
|
|
||||||
|
client.execute(
|
||||||
|
"""INSERT INTO pump_safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
|
||||||
|
hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m,
|
||||||
|
hard_max_power_kw, hard_max_flow_m3h, max_starts_per_hour, min_run_time_seconds,
|
||||||
|
max_continuous_run_hours, max_speed_change_hz_per_min, set_by, approved_by) VALUES
|
||||||
|
('PERF_STATION_001', 'PERF_PUMP_001', 20.0, 60.0, 1.0, 5.0, 0.5, 1.5, 15.0, 200.0, 6, 300, 24, 10.0, 'admin', 'admin'),
|
||||||
|
('PERF_STATION_001', 'PERF_PUMP_002', 25.0, 55.0, 1.5, 4.5, 1.0, 2.0, 12.0, 180.0, 6, 300, 24, 10.0, 'admin', 'admin'),
|
||||||
|
('PERF_STATION_002', 'PERF_PUMP_003', 30.0, 50.0, 2.0, 4.0, 1.5, 2.5, 10.0, 160.0, 6, 300, 24, 10.0, 'admin', 'admin'),
|
||||||
|
('PERF_STATION_002', 'PERF_PUMP_004', 15.0, 70.0, 0.5, 6.0, 0.2, 1.0, 20.0, 250.0, 6, 300, 24, 10.0, 'admin', 'admin'),
|
||||||
|
('PERF_STATION_003', 'PERF_PUMP_005', 22.0, 58.0, 1.2, 4.8, 0.8, 1.8, 14.0, 190.0, 6, 300, 24, 10.0, 'admin', 'admin'),
|
||||||
|
('PERF_STATION_003', 'PERF_PUMP_006', 28.0, 52.0, 1.8, 4.2, 1.2, 2.2, 11.0, 170.0, 6, 300, 24, 10.0, 'admin', 'admin')"""
|
||||||
|
)
|
||||||
|
|
||||||
|
yield client
|
||||||
|
await client.disconnect()
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def performance_components(self, performance_db_client):
|
||||||
|
"""Create all system components for performance testing."""
|
||||||
|
# Initialize core components
|
||||||
|
discovery = AutoDiscovery(performance_db_client)
|
||||||
|
await discovery.discover()
|
||||||
|
|
||||||
|
safety_enforcer = SafetyLimitEnforcer(performance_db_client)
|
||||||
|
await safety_enforcer.load_safety_limits()
|
||||||
|
|
||||||
|
emergency_stop_manager = EmergencyStopManager(performance_db_client)
|
||||||
|
|
||||||
|
# Create mock alert manager for watchdog
|
||||||
|
mock_alert_manager = Mock()
|
||||||
|
watchdog = DatabaseWatchdog(performance_db_client, mock_alert_manager)
|
||||||
|
|
||||||
|
setpoint_manager = SetpointManager(
|
||||||
|
db_client=performance_db_client,
|
||||||
|
discovery=discovery,
|
||||||
|
safety_enforcer=safety_enforcer,
|
||||||
|
emergency_stop_manager=emergency_stop_manager,
|
||||||
|
watchdog=watchdog
|
||||||
|
)
|
||||||
|
await setpoint_manager.start()
|
||||||
|
|
||||||
|
optimization_engine = OptimizationPlanManager(performance_db_client)
|
||||||
|
security_manager = SecurityManager()
|
||||||
|
audit_logger = ComplianceAuditLogger(performance_db_client)
|
||||||
|
|
||||||
|
# Initialize protocol servers
|
||||||
|
opcua_server = OPCUAServer(
|
||||||
|
setpoint_manager=setpoint_manager,
|
||||||
|
security_manager=security_manager,
|
||||||
|
audit_logger=audit_logger,
|
||||||
|
enable_security=False, # Disable security for testing
|
||||||
|
endpoint="opc.tcp://127.0.0.1:4840"
|
||||||
|
)
|
||||||
|
|
||||||
|
modbus_server = ModbusServer(
|
||||||
|
setpoint_manager=setpoint_manager,
|
||||||
|
security_manager=security_manager,
|
||||||
|
audit_logger=audit_logger,
|
||||||
|
host="127.0.0.1",
|
||||||
|
port=5020
|
||||||
|
)
|
||||||
|
|
||||||
|
rest_api = RESTAPIServer(
|
||||||
|
setpoint_manager=setpoint_manager,
|
||||||
|
emergency_stop_manager=emergency_stop_manager,
|
||||||
|
host="127.0.0.1",
|
||||||
|
port=8001
|
||||||
|
)
|
||||||
|
|
||||||
|
components = {
|
||||||
|
'db_client': performance_db_client,
|
||||||
|
'discovery': discovery,
|
||||||
|
'safety_enforcer': safety_enforcer,
|
||||||
|
'setpoint_manager': setpoint_manager,
|
||||||
|
'emergency_stop_manager': emergency_stop_manager,
|
||||||
|
'optimization_engine': optimization_engine,
|
||||||
|
'security_manager': security_manager,
|
||||||
|
'opcua_server': opcua_server,
|
||||||
|
'modbus_server': modbus_server,
|
||||||
|
'rest_api': rest_api
|
||||||
|
}
|
||||||
|
|
||||||
|
yield components
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
await setpoint_manager.stop()
|
||||||
|
await opcua_server.stop()
|
||||||
|
await modbus_server.stop()
|
||||||
|
await rest_api.stop()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_concurrent_setpoint_updates(self, performance_components):
|
||||||
|
"""Test performance with concurrent setpoint updates."""
|
||||||
|
safety_enforcer = performance_components['safety_enforcer']
|
||||||
|
|
||||||
|
# Test parameters
|
||||||
|
num_concurrent_tasks = 10
|
||||||
|
updates_per_task = 50
|
||||||
|
|
||||||
|
async def update_setpoint_task(task_id: int):
|
||||||
|
"""Task that performs multiple setpoint updates."""
|
||||||
|
latencies = []
|
||||||
|
for i in range(updates_per_task):
|
||||||
|
station_id = f"PERF_STATION_{(task_id % 3) + 1:03d}"
|
||||||
|
pump_id = f"PERF_PUMP_{(task_id % 6) + 1:03d}"
|
||||||
|
setpoint = 35.0 + (task_id * 2) + (i * 0.1)
|
||||||
|
|
||||||
|
start_time = time.perf_counter()
|
||||||
|
enforced_setpoint, warnings = safety_enforcer.enforce_setpoint(
|
||||||
|
station_id=station_id,
|
||||||
|
pump_id=pump_id,
|
||||||
|
setpoint=setpoint
|
||||||
|
)
|
||||||
|
end_time = time.perf_counter()
|
||||||
|
|
||||||
|
latency = (end_time - start_time) * 1000 # Convert to milliseconds
|
||||||
|
latencies.append(latency)
|
||||||
|
|
||||||
|
# Verify result is within safety limits (if safety limits exist)
|
||||||
|
assert enforced_setpoint is not None
|
||||||
|
if enforced_setpoint > 0: # Only check if safety limits exist
|
||||||
|
assert 20.0 <= enforced_setpoint <= 70.0 # Within safety limits
|
||||||
|
|
||||||
|
return latencies
|
||||||
|
|
||||||
|
# Run concurrent tasks
|
||||||
|
start_time = time.perf_counter()
|
||||||
|
tasks = [update_setpoint_task(i) for i in range(num_concurrent_tasks)]
|
||||||
|
results = await asyncio.gather(*tasks)
|
||||||
|
end_time = time.perf_counter()
|
||||||
|
|
||||||
|
# Calculate performance metrics
|
||||||
|
total_updates = num_concurrent_tasks * updates_per_task
|
||||||
|
total_time = end_time - start_time
|
||||||
|
throughput = total_updates / total_time
|
||||||
|
|
||||||
|
# Flatten all latencies
|
||||||
|
all_latencies = [latency for task_latencies in results for latency in task_latencies]
|
||||||
|
|
||||||
|
# Calculate statistics
|
||||||
|
avg_latency = statistics.mean(all_latencies)
|
||||||
|
p95_latency = statistics.quantiles(all_latencies, n=20)[18] # 95th percentile
|
||||||
|
max_latency = max(all_latencies)
|
||||||
|
|
||||||
|
# Performance assertions
|
||||||
|
print(f"\nConcurrent Setpoint Updates Performance:")
|
||||||
|
print(f" Total Updates: {total_updates}")
|
||||||
|
print(f" Total Time: {total_time:.2f}s")
|
||||||
|
print(f" Throughput: {throughput:.1f} updates/sec")
|
||||||
|
print(f" Average Latency: {avg_latency:.2f}ms")
|
||||||
|
print(f" 95th Percentile Latency: {p95_latency:.2f}ms")
|
||||||
|
print(f" Max Latency: {max_latency:.2f}ms")
|
||||||
|
|
||||||
|
# Performance requirements
|
||||||
|
assert throughput > 100, f"Throughput too low: {throughput:.1f} updates/sec"
|
||||||
|
assert avg_latency < 100, f"Average latency too high: {avg_latency:.2f}ms"
|
||||||
|
assert p95_latency < 200, f"95th percentile latency too high: {p95_latency:.2f}ms"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.skip(reason="Optimization calculation not implemented in OptimizationPlanManager")
|
||||||
|
async def test_high_frequency_optimization(self, performance_components):
|
||||||
|
"""Test performance with high-frequency optimization calculations."""
|
||||||
|
optimization_engine = performance_components['optimization_engine']
|
||||||
|
|
||||||
|
# Test parameters
|
||||||
|
num_iterations = 100
|
||||||
|
num_pumps = 6
|
||||||
|
|
||||||
|
latencies = []
|
||||||
|
|
||||||
|
for i in range(num_iterations):
|
||||||
|
# Create realistic optimization parameters
|
||||||
|
demand_m3h = 100 + (i * 10) % 200
|
||||||
|
electricity_price = 0.15 + (i * 0.01) % 0.05
|
||||||
|
|
||||||
|
start_time = time.perf_counter()
|
||||||
|
|
||||||
|
# Perform optimization
|
||||||
|
result = optimization_engine.calculate_optimal_setpoints(
|
||||||
|
demand_m3h=demand_m3h,
|
||||||
|
electricity_price=electricity_price,
|
||||||
|
max_total_power_kw=50.0
|
||||||
|
)
|
||||||
|
|
||||||
|
end_time = time.perf_counter()
|
||||||
|
latency = (end_time - start_time) * 1000 # Convert to milliseconds
|
||||||
|
latencies.append(latency)
|
||||||
|
|
||||||
|
# Verify optimization result
|
||||||
|
assert result is not None
|
||||||
|
assert 'optimal_setpoints' in result
|
||||||
|
assert len(result['optimal_setpoints']) == num_pumps
|
||||||
|
|
||||||
|
# Verify setpoints are within safety limits
|
||||||
|
for station_id, pump_id, setpoint in result['optimal_setpoints']:
|
||||||
|
assert 20.0 <= setpoint <= 70.0
|
||||||
|
|
||||||
|
# Calculate performance metrics
|
||||||
|
avg_latency = statistics.mean(latencies)
|
||||||
|
p95_latency = statistics.quantiles(latencies, n=20)[18] # 95th percentile
|
||||||
|
throughput = num_iterations / (sum(latencies) / 1000) # updates per second
|
||||||
|
|
||||||
|
print(f"\nHigh-Frequency Optimization Performance:")
|
||||||
|
print(f" Iterations: {num_iterations}")
|
||||||
|
print(f" Average Latency: {avg_latency:.2f}ms")
|
||||||
|
print(f" 95th Percentile Latency: {p95_latency:.2f}ms")
|
||||||
|
print(f" Throughput: {throughput:.1f} optimizations/sec")
|
||||||
|
|
||||||
|
# Performance requirements
|
||||||
|
assert avg_latency < 50, f"Optimization latency too high: {avg_latency:.2f}ms"
|
||||||
|
assert throughput > 10, f"Optimization throughput too low: {throughput:.1f}/sec"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_concurrent_protocol_access(self, performance_components):
|
||||||
|
"""Test performance with concurrent access across all protocols."""
|
||||||
|
setpoint_manager = performance_components['setpoint_manager']
|
||||||
|
opcua_server = performance_components['opcua_server']
|
||||||
|
modbus_server = performance_components['modbus_server']
|
||||||
|
rest_api = performance_components['rest_api']
|
||||||
|
|
||||||
|
# Start all servers
|
||||||
|
await opcua_server.start()
|
||||||
|
await modbus_server.start()
|
||||||
|
await rest_api.start()
|
||||||
|
|
||||||
|
# Wait for servers to initialize
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
# Test parameters
|
||||||
|
num_clients = 5
|
||||||
|
requests_per_client = 20
|
||||||
|
|
||||||
|
async def protocol_client_task(client_id: int):
|
||||||
|
"""Task that performs requests across different protocols."""
|
||||||
|
latencies = []
|
||||||
|
|
||||||
|
for i in range(requests_per_client):
|
||||||
|
protocol = i % 3
|
||||||
|
station_id = f"PERF_STATION_{(client_id % 3) + 1:03d}"
|
||||||
|
pump_id = f"PERF_PUMP_{(client_id % 6) + 1:03d}"
|
||||||
|
|
||||||
|
start_time = time.perf_counter()
|
||||||
|
|
||||||
|
if protocol == 0:
|
||||||
|
# OPCUA-like access (simulated)
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
# Note: setpoint may be None if no optimization plans exist
|
||||||
|
elif protocol == 1:
|
||||||
|
# Modbus-like access (simulated)
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
# Note: setpoint may be None if no optimization plans exist
|
||||||
|
else:
|
||||||
|
# REST API-like access (simulated)
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
# Note: setpoint may be None if no optimization plans exist
|
||||||
|
|
||||||
|
end_time = time.perf_counter()
|
||||||
|
latency = (end_time - start_time) * 1000
|
||||||
|
latencies.append(latency)
|
||||||
|
|
||||||
|
return latencies
|
||||||
|
|
||||||
|
# Run concurrent clients
|
||||||
|
start_time = time.perf_counter()
|
||||||
|
tasks = [protocol_client_task(i) for i in range(num_clients)]
|
||||||
|
results = await asyncio.gather(*tasks)
|
||||||
|
end_time = time.perf_counter()
|
||||||
|
|
||||||
|
# Calculate performance metrics
|
||||||
|
total_requests = num_clients * requests_per_client
|
||||||
|
total_time = end_time - start_time
|
||||||
|
throughput = total_requests / total_time
|
||||||
|
|
||||||
|
# Flatten all latencies
|
||||||
|
all_latencies = [latency for client_latencies in results for latency in client_latencies]
|
||||||
|
|
||||||
|
# Calculate statistics
|
||||||
|
avg_latency = statistics.mean(all_latencies)
|
||||||
|
p95_latency = statistics.quantiles(all_latencies, n=20)[18] # 95th percentile
|
||||||
|
|
||||||
|
print(f"\nConcurrent Protocol Access Performance:")
|
||||||
|
print(f" Total Requests: {total_requests}")
|
||||||
|
print(f" Total Time: {total_time:.2f}s")
|
||||||
|
print(f" Throughput: {throughput:.1f} requests/sec")
|
||||||
|
print(f" Average Latency: {avg_latency:.2f}ms")
|
||||||
|
print(f" 95th Percentile Latency: {p95_latency:.2f}ms")
|
||||||
|
|
||||||
|
# Performance requirements
|
||||||
|
assert throughput > 50, f"Protocol throughput too low: {throughput:.1f} requests/sec"
|
||||||
|
assert avg_latency < 50, f"Protocol latency too high: {avg_latency:.2f}ms"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_memory_usage_under_load(self, performance_components):
|
||||||
|
"""Test memory usage doesn't grow excessively under sustained load."""
|
||||||
|
safety_enforcer = performance_components['safety_enforcer']
|
||||||
|
|
||||||
|
# Get initial memory usage (approximate)
|
||||||
|
import psutil
|
||||||
|
import os
|
||||||
|
process = psutil.Process(os.getpid())
|
||||||
|
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
|
||||||
|
|
||||||
|
# Sustained load test
|
||||||
|
num_operations = 1000
|
||||||
|
|
||||||
|
for i in range(num_operations):
|
||||||
|
station_id = f"PERF_STATION_{(i % 3) + 1:03d}"
|
||||||
|
pump_id = f"PERF_PUMP_{(i % 6) + 1:03d}"
|
||||||
|
setpoint = 35.0 + (i * 0.1) % 20.0
|
||||||
|
|
||||||
|
# Perform setpoint enforcement
|
||||||
|
result, warnings = safety_enforcer.enforce_setpoint(
|
||||||
|
station_id, pump_id, setpoint
|
||||||
|
)
|
||||||
|
# Note: result may be 0.0 if no safety limits exist
|
||||||
|
|
||||||
|
# Every 100 operations, check memory
|
||||||
|
if i % 100 == 0:
|
||||||
|
current_memory = process.memory_info().rss / 1024 / 1024
|
||||||
|
memory_increase = current_memory - initial_memory
|
||||||
|
print(f" Operation {i}: Memory increase = {memory_increase:.2f} MB")
|
||||||
|
|
||||||
|
# Check memory doesn't grow excessively
|
||||||
|
assert memory_increase < 50, f"Memory growth too high: {memory_increase:.2f} MB"
|
||||||
|
|
||||||
|
# Final memory check
|
||||||
|
final_memory = process.memory_info().rss / 1024 / 1024
|
||||||
|
total_memory_increase = final_memory - initial_memory
|
||||||
|
|
||||||
|
print(f"\nMemory Usage Under Load:")
|
||||||
|
print(f" Initial Memory: {initial_memory:.2f} MB")
|
||||||
|
print(f" Final Memory: {final_memory:.2f} MB")
|
||||||
|
print(f" Total Increase: {total_memory_increase:.2f} MB")
|
||||||
|
|
||||||
|
# Final memory requirement
|
||||||
|
assert total_memory_increase < 100, f"Excessive memory growth: {total_memory_increase:.2f} MB"
|
||||||
|
|
@ -109,6 +109,21 @@ class TestPhase1IntegrationSQLite:
|
||||||
)
|
)
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
cursor.execute("""
|
||||||
|
CREATE TABLE pump_safety_limits (
|
||||||
|
station_id TEXT,
|
||||||
|
pump_id TEXT,
|
||||||
|
hard_min_speed_hz REAL,
|
||||||
|
hard_max_speed_hz REAL,
|
||||||
|
hard_min_level_m REAL,
|
||||||
|
hard_max_level_m REAL,
|
||||||
|
hard_max_power_kw REAL,
|
||||||
|
max_speed_change_hz_per_min REAL,
|
||||||
|
PRIMARY KEY (station_id, pump_id),
|
||||||
|
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
|
||||||
# Insert test data
|
# Insert test data
|
||||||
cursor.execute("""
|
cursor.execute("""
|
||||||
INSERT INTO pump_stations (station_id, station_name, location) VALUES
|
INSERT INTO pump_stations (station_id, station_name, location) VALUES
|
||||||
|
|
@ -146,6 +161,17 @@ class TestPhase1IntegrationSQLite:
|
||||||
('STATION_002', 'PUMP_001', 40.0, 18.3, 142.1, 1.9, 1, 0)
|
('STATION_002', 'PUMP_001', 40.0, 18.3, 142.1, 1.9, 1, 0)
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
cursor.execute("""
|
||||||
|
INSERT INTO pump_safety_limits (
|
||||||
|
station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
|
||||||
|
hard_min_level_m, hard_max_level_m, hard_max_power_kw,
|
||||||
|
max_speed_change_hz_per_min
|
||||||
|
) VALUES
|
||||||
|
('STATION_001', 'PUMP_001', 20.0, 60.0, 1.0, 3.0, 25.0, 10.0),
|
||||||
|
('STATION_001', 'PUMP_002', 20.0, 60.0, 1.0, 3.0, 25.0, 10.0),
|
||||||
|
('STATION_002', 'PUMP_001', 20.0, 60.0, 1.0, 3.0, 25.0, 10.0)
|
||||||
|
""")
|
||||||
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,262 @@
|
||||||
|
"""
|
||||||
|
Integration tests for safety workflow scenarios.
|
||||||
|
|
||||||
|
Tests safety limit violations, emergency stop workflows, and failsafe mode operations.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import pytest_asyncio
|
||||||
|
from unittest.mock import Mock, AsyncMock
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from src.database.flexible_client import FlexibleDatabaseClient
|
||||||
|
from src.core.auto_discovery import AutoDiscovery
|
||||||
|
from src.core.safety import SafetyLimitEnforcer
|
||||||
|
from src.core.emergency_stop import EmergencyStopManager
|
||||||
|
from src.core.setpoint_manager import SetpointManager
|
||||||
|
from src.core.security import SecurityManager
|
||||||
|
from src.core.compliance_audit import ComplianceAuditLogger
|
||||||
|
from src.monitoring.watchdog import DatabaseWatchdog
|
||||||
|
|
||||||
|
|
||||||
|
class TestSafetyWorkflows:
|
||||||
|
"""Test safety-related workflows and scenarios."""
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def db_client(self):
|
||||||
|
"""Create database client with safety test data."""
|
||||||
|
client = FlexibleDatabaseClient(
|
||||||
|
database_url="sqlite:///:memory:",
|
||||||
|
pool_size=5,
|
||||||
|
max_overflow=10,
|
||||||
|
pool_timeout=30
|
||||||
|
)
|
||||||
|
|
||||||
|
await client.connect()
|
||||||
|
client.create_tables()
|
||||||
|
|
||||||
|
# Insert safety test data
|
||||||
|
self._insert_safety_test_data(client)
|
||||||
|
return client
|
||||||
|
|
||||||
|
def _insert_safety_test_data(self, db_client):
|
||||||
|
"""Insert comprehensive safety test data."""
|
||||||
|
# Insert stations
|
||||||
|
db_client.execute(
|
||||||
|
"""INSERT INTO pump_stations (station_id, station_name, location) VALUES
|
||||||
|
('SAFETY_STATION_001', 'Safety Test Station 1', 'Test Location A'),
|
||||||
|
('SAFETY_STATION_002', 'Safety Test Station 2', 'Test Location B')"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert pumps with different safety configurations
|
||||||
|
db_client.execute(
|
||||||
|
"""INSERT INTO pumps (station_id, pump_id, pump_name, control_type,
|
||||||
|
min_speed_hz, max_speed_hz, default_setpoint_hz) VALUES
|
||||||
|
('SAFETY_STATION_001', 'SAFETY_PUMP_001', 'Safety Pump 1', 'DIRECT_SPEED', 20.0, 60.0, 35.0),
|
||||||
|
('SAFETY_STATION_001', 'SAFETY_PUMP_002', 'Safety Pump 2', 'LEVEL_CONTROLLED', 25.0, 55.0, 40.0),
|
||||||
|
('SAFETY_STATION_002', 'SAFETY_PUMP_003', 'Safety Pump 3', 'POWER_CONTROLLED', 30.0, 50.0, 38.0)"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert safety limits with different scenarios
|
||||||
|
db_client.execute(
|
||||||
|
"""INSERT INTO pump_safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz,
|
||||||
|
hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m,
|
||||||
|
hard_max_power_kw, hard_max_flow_m3h, max_starts_per_hour, min_run_time_seconds,
|
||||||
|
max_continuous_run_hours, max_speed_change_hz_per_min, set_by, approved_by) VALUES
|
||||||
|
('SAFETY_STATION_001', 'SAFETY_PUMP_001', 20.0, 60.0, 1.0, 5.0, 0.5, 1.5, 15.0, 200.0, 6, 300, 24, 10.0, 'admin', 'admin'),
|
||||||
|
('SAFETY_STATION_001', 'SAFETY_PUMP_002', 25.0, 55.0, 1.5, 4.5, 1.0, 2.0, 12.0, 180.0, 6, 300, 24, 10.0, 'admin', 'admin'),
|
||||||
|
('SAFETY_STATION_002', 'SAFETY_PUMP_003', 30.0, 50.0, 2.0, 4.0, 1.5, 2.5, 10.0, 160.0, 6, 300, 24, 10.0, 'admin', 'admin')"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert optimization plans that might trigger safety limits
|
||||||
|
import datetime
|
||||||
|
now = datetime.datetime.now()
|
||||||
|
db_client.execute(
|
||||||
|
f"""INSERT INTO pump_plans (station_id, pump_id, interval_start, interval_end,
|
||||||
|
suggested_speed_hz, target_level_m, target_power_kw, plan_status) VALUES
|
||||||
|
('SAFETY_STATION_001', 'SAFETY_PUMP_001', '{now}', '{now + datetime.timedelta(hours=1)}', 65.0, 3.0, 12.0, 'ACTIVE'),
|
||||||
|
('SAFETY_STATION_001', 'SAFETY_PUMP_002', '{now}', '{now + datetime.timedelta(hours=1)}', 15.0, 2.5, 10.0, 'ACTIVE'),
|
||||||
|
('SAFETY_STATION_002', 'SAFETY_PUMP_003', '{now}', '{now + datetime.timedelta(hours=1)}', 45.0, 3.5, 8.0, 'ACTIVE')"""
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def safety_components(self, db_client):
|
||||||
|
"""Create safety-related components for testing."""
|
||||||
|
# Create auto discovery
|
||||||
|
auto_discovery = AutoDiscovery(db_client)
|
||||||
|
await auto_discovery.discover()
|
||||||
|
|
||||||
|
# Create safety components
|
||||||
|
safety_enforcer = SafetyLimitEnforcer(db_client)
|
||||||
|
emergency_stop_manager = EmergencyStopManager(db_client)
|
||||||
|
|
||||||
|
# Load safety limits
|
||||||
|
await safety_enforcer.load_safety_limits()
|
||||||
|
|
||||||
|
# Create mock alert manager for watchdog
|
||||||
|
mock_alert_manager = Mock()
|
||||||
|
watchdog = DatabaseWatchdog(db_client, mock_alert_manager)
|
||||||
|
|
||||||
|
# Create setpoint manager
|
||||||
|
setpoint_manager = SetpointManager(
|
||||||
|
discovery=auto_discovery,
|
||||||
|
db_client=db_client,
|
||||||
|
safety_enforcer=safety_enforcer,
|
||||||
|
emergency_stop_manager=emergency_stop_manager,
|
||||||
|
watchdog=watchdog
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create security components
|
||||||
|
security_manager = SecurityManager()
|
||||||
|
audit_logger = ComplianceAuditLogger(db_client)
|
||||||
|
|
||||||
|
return {
|
||||||
|
'auto_discovery': auto_discovery,
|
||||||
|
'safety_enforcer': safety_enforcer,
|
||||||
|
'emergency_stop_manager': emergency_stop_manager,
|
||||||
|
'setpoint_manager': setpoint_manager,
|
||||||
|
'security_manager': security_manager,
|
||||||
|
'audit_logger': audit_logger,
|
||||||
|
'watchdog': watchdog
|
||||||
|
}
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_safety_limit_violation_workflow(self, safety_components):
|
||||||
|
"""Test complete workflow for safety limit violations."""
|
||||||
|
setpoint_manager = safety_components['setpoint_manager']
|
||||||
|
safety_enforcer = safety_components['safety_enforcer']
|
||||||
|
|
||||||
|
# Test pump with optimization plan that exceeds safety limits
|
||||||
|
station_id = 'SAFETY_STATION_001'
|
||||||
|
pump_id = 'SAFETY_PUMP_001'
|
||||||
|
|
||||||
|
# Get setpoint (should be enforced to max limit)
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
|
||||||
|
# Verify setpoint is within safety limits
|
||||||
|
pump_info = setpoint_manager.discovery.get_pump(station_id, pump_id)
|
||||||
|
max_speed = pump_info.get('max_speed_hz', 60.0)
|
||||||
|
|
||||||
|
# The setpoint should be enforced to the maximum safety limit (60.0)
|
||||||
|
# even though the optimization plan requests 65.0
|
||||||
|
assert setpoint <= max_speed, f"Setpoint {setpoint} exceeds safety limit {max_speed}"
|
||||||
|
assert setpoint == 60.0, f"Setpoint should be enforced to safety limit, got {setpoint}"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_emergency_stop_clearance_workflow(self, safety_components):
|
||||||
|
"""Test emergency stop activation and clearance workflow."""
|
||||||
|
setpoint_manager = safety_components['setpoint_manager']
|
||||||
|
emergency_stop_manager = safety_components['emergency_stop_manager']
|
||||||
|
|
||||||
|
station_id = 'SAFETY_STATION_001'
|
||||||
|
pump_id = 'SAFETY_PUMP_002'
|
||||||
|
|
||||||
|
# Get normal setpoint
|
||||||
|
normal_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert normal_setpoint is not None
|
||||||
|
|
||||||
|
# Activate emergency stop
|
||||||
|
emergency_stop_manager.emergency_stop_pump(station_id, pump_id, "test_workflow")
|
||||||
|
|
||||||
|
# Verify setpoint is 0 during emergency stop
|
||||||
|
emergency_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert emergency_setpoint == 0.0, "Setpoint not zero during emergency stop"
|
||||||
|
|
||||||
|
# Clear emergency stop
|
||||||
|
emergency_stop_manager.clear_emergency_stop_pump(station_id, pump_id, "test_clearance")
|
||||||
|
|
||||||
|
# Verify setpoint returns to normal
|
||||||
|
recovered_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert recovered_setpoint == normal_setpoint, "Setpoint did not recover after emergency stop clearance"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_multiple_safety_violations(self, safety_components):
|
||||||
|
"""Test handling of multiple simultaneous safety violations."""
|
||||||
|
setpoint_manager = safety_components['setpoint_manager']
|
||||||
|
safety_enforcer = safety_components['safety_enforcer']
|
||||||
|
|
||||||
|
# Test multiple pumps with different safety violations
|
||||||
|
test_cases = [
|
||||||
|
('SAFETY_STATION_001', 'SAFETY_PUMP_001', 60.0), # Exceeds max speed (65.0 -> 60.0)
|
||||||
|
('SAFETY_STATION_001', 'SAFETY_PUMP_002', 25.0), # Below min speed (15.0 -> 25.0)
|
||||||
|
('SAFETY_STATION_002', 'SAFETY_PUMP_003', 45.0), # Within limits (45.0 -> 45.0)
|
||||||
|
]
|
||||||
|
|
||||||
|
for station_id, pump_id, expected_enforced_setpoint in test_cases:
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
|
||||||
|
# Verify setpoint is within safety limits
|
||||||
|
pump_info = setpoint_manager.discovery.get_pump(station_id, pump_id)
|
||||||
|
min_speed = pump_info.get('min_speed_hz', 20.0)
|
||||||
|
max_speed = pump_info.get('max_speed_hz', 60.0)
|
||||||
|
|
||||||
|
assert min_speed <= setpoint <= max_speed, \
|
||||||
|
f"Setpoint {setpoint} out of range [{min_speed}, {max_speed}] for {station_id}/{pump_id}"
|
||||||
|
|
||||||
|
# Verify specific enforcement
|
||||||
|
assert setpoint == expected_enforced_setpoint, \
|
||||||
|
f"Setpoint enforcement failed for {station_id}/{pump_id}: expected {expected_enforced_setpoint}, got {setpoint}"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_safety_limit_dynamic_updates(self, safety_components, db_client):
|
||||||
|
"""Test that safety limit updates are reflected in real-time."""
|
||||||
|
setpoint_manager = safety_components['setpoint_manager']
|
||||||
|
safety_enforcer = safety_components['safety_enforcer']
|
||||||
|
|
||||||
|
station_id = 'SAFETY_STATION_001'
|
||||||
|
pump_id = 'SAFETY_PUMP_001'
|
||||||
|
|
||||||
|
# Get initial setpoint (should be limited to 60.0 from the 65.0 plan)
|
||||||
|
initial_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert initial_setpoint == 60.0, f"Initial setpoint should be limited to 60.0, got {initial_setpoint}"
|
||||||
|
|
||||||
|
# Update safety limits to be more restrictive
|
||||||
|
db_client.execute(
|
||||||
|
"""UPDATE pump_safety_limits
|
||||||
|
SET hard_max_speed_hz = 45.0
|
||||||
|
WHERE station_id = 'SAFETY_STATION_001' AND pump_id = 'SAFETY_PUMP_001'"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Reload safety limits
|
||||||
|
await safety_enforcer.load_safety_limits()
|
||||||
|
|
||||||
|
# Get updated setpoint (should now be limited to 45.0)
|
||||||
|
updated_setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
|
||||||
|
# Verify setpoint is now enforced to new limit
|
||||||
|
assert updated_setpoint == 45.0, f"Setpoint {updated_setpoint} not enforced to new limit 45.0"
|
||||||
|
assert updated_setpoint < initial_setpoint, f"Setpoint should be lower after safety limit update: {updated_setpoint} < {initial_setpoint}"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_emergency_stop_cascade_effects(self, safety_components):
|
||||||
|
"""Test cascade effects of emergency stops on related systems."""
|
||||||
|
setpoint_manager = safety_components['setpoint_manager']
|
||||||
|
emergency_stop_manager = safety_components['emergency_stop_manager']
|
||||||
|
|
||||||
|
# Activate station-wide emergency stop
|
||||||
|
station_id = 'SAFETY_STATION_001'
|
||||||
|
emergency_stop_manager.emergency_stop_station(station_id, "station_emergency_test")
|
||||||
|
|
||||||
|
# Verify all pumps in the station are stopped
|
||||||
|
pumps = setpoint_manager.discovery.get_pumps(station_id)
|
||||||
|
for pump in pumps:
|
||||||
|
pump_id = pump['pump_id']
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert setpoint == 0.0, f"Pump {pump_id} not stopped during station emergency"
|
||||||
|
|
||||||
|
# Verify pumps in other stations are unaffected
|
||||||
|
other_station_id = 'SAFETY_STATION_002'
|
||||||
|
other_pumps = setpoint_manager.discovery.get_pumps(other_station_id)
|
||||||
|
for pump in other_pumps:
|
||||||
|
pump_id = pump['pump_id']
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(other_station_id, pump_id)
|
||||||
|
assert setpoint > 0.0, f"Pump {pump_id} incorrectly stopped during unrelated station emergency"
|
||||||
|
|
||||||
|
# Clear station emergency stop
|
||||||
|
emergency_stop_manager.clear_emergency_stop_station(station_id, "station_clearance_test")
|
||||||
|
|
||||||
|
# Verify pumps return to normal operation
|
||||||
|
for pump in pumps:
|
||||||
|
pump_id = pump['pump_id']
|
||||||
|
setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id)
|
||||||
|
assert setpoint > 0.0, f"Pump {pump_id} not recovered after station emergency clearance"
|
||||||
Loading…
Reference in New Issue