""" Performance and Load Testing for Calejo Control Adapter. Tests system behavior under load with concurrent connections, high-frequency updates, and stress conditions. """ import asyncio import time import pytest import pytest_asyncio from typing import Dict, List, Any import statistics from src.database.flexible_client import FlexibleDatabaseClient from src.core.auto_discovery import AutoDiscovery from src.core.setpoint_manager import SetpointManager from src.core.safety import SafetyLimitEnforcer from src.core.emergency_stop import EmergencyStopManager from src.core.optimization_manager import OptimizationPlanManager from src.core.security import SecurityManager from src.core.compliance_audit import ComplianceAuditLogger from src.protocols.opcua_server import OPCUAServer from src.protocols.modbus_server import ModbusServer from src.protocols.rest_api import RESTAPIServer from src.monitoring.watchdog import DatabaseWatchdog from unittest.mock import Mock class TestPerformanceLoad: """Performance and load testing for Calejo Control Adapter.""" @pytest_asyncio.fixture async def performance_db_client(self): """Create database client for performance testing.""" client = FlexibleDatabaseClient("sqlite:///:memory:") await client.connect() client.create_tables() # Insert performance test data client.execute( """INSERT INTO pump_stations (station_id, station_name, location) VALUES ('PERF_STATION_001', 'Performance Station 1', 'Test Area'), ('PERF_STATION_002', 'Performance Station 2', 'Test Area'), ('PERF_STATION_003', 'Performance Station 3', 'Test Area')""" ) client.execute( """INSERT INTO pumps (station_id, pump_id, pump_name, control_type, default_setpoint_hz) VALUES ('PERF_STATION_001', 'PERF_PUMP_001', 'Performance Pump 1', 'DIRECT_SPEED', 35.0), ('PERF_STATION_001', 'PERF_PUMP_002', 'Performance Pump 2', 'LEVEL_CONTROLLED', 40.0), ('PERF_STATION_002', 'PERF_PUMP_003', 'Performance Pump 3', 'POWER_CONTROLLED', 38.0), ('PERF_STATION_002', 'PERF_PUMP_004', 'Performance Pump 4', 'DIRECT_SPEED', 42.0), ('PERF_STATION_003', 'PERF_PUMP_005', 'Performance Pump 5', 'LEVEL_CONTROLLED', 37.0), ('PERF_STATION_003', 'PERF_PUMP_006', 'Performance Pump 6', 'POWER_CONTROLLED', 39.0)""" ) client.execute( """INSERT INTO pump_safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz, hard_min_level_m, hard_max_level_m, emergency_stop_level_m, dry_run_protection_level_m, hard_max_power_kw, hard_max_flow_m3h, max_starts_per_hour, min_run_time_seconds, max_continuous_run_hours, max_speed_change_hz_per_min, set_by, approved_by) VALUES ('PERF_STATION_001', 'PERF_PUMP_001', 20.0, 60.0, 1.0, 5.0, 0.5, 1.5, 15.0, 200.0, 6, 300, 24, 10.0, 'admin', 'admin'), ('PERF_STATION_001', 'PERF_PUMP_002', 25.0, 55.0, 1.5, 4.5, 1.0, 2.0, 12.0, 180.0, 6, 300, 24, 10.0, 'admin', 'admin'), ('PERF_STATION_002', 'PERF_PUMP_003', 30.0, 50.0, 2.0, 4.0, 1.5, 2.5, 10.0, 160.0, 6, 300, 24, 10.0, 'admin', 'admin'), ('PERF_STATION_002', 'PERF_PUMP_004', 15.0, 70.0, 0.5, 6.0, 0.2, 1.0, 20.0, 250.0, 6, 300, 24, 10.0, 'admin', 'admin'), ('PERF_STATION_003', 'PERF_PUMP_005', 22.0, 58.0, 1.2, 4.8, 0.8, 1.8, 14.0, 190.0, 6, 300, 24, 10.0, 'admin', 'admin'), ('PERF_STATION_003', 'PERF_PUMP_006', 28.0, 52.0, 1.8, 4.2, 1.2, 2.2, 11.0, 170.0, 6, 300, 24, 10.0, 'admin', 'admin')""" ) yield client await client.disconnect() @pytest_asyncio.fixture async def performance_components(self, performance_db_client): """Create all system components for performance testing.""" # Initialize core components discovery = AutoDiscovery(performance_db_client) await discovery.discover() safety_enforcer = SafetyLimitEnforcer(performance_db_client) await safety_enforcer.load_safety_limits() emergency_stop_manager = EmergencyStopManager(performance_db_client) # Create mock alert manager for watchdog mock_alert_manager = Mock() watchdog = DatabaseWatchdog(performance_db_client, mock_alert_manager) setpoint_manager = SetpointManager( db_client=performance_db_client, discovery=discovery, safety_enforcer=safety_enforcer, emergency_stop_manager=emergency_stop_manager, watchdog=watchdog ) await setpoint_manager.start() optimization_engine = OptimizationPlanManager(performance_db_client) security_manager = SecurityManager() audit_logger = ComplianceAuditLogger(performance_db_client) # Get dynamic ports for testing from tests.utils.port_utils import find_free_port opcua_port = find_free_port(4840) modbus_port = find_free_port(5020) rest_api_port = find_free_port(8001) # Initialize protocol servers opcua_server = OPCUAServer( setpoint_manager=setpoint_manager, security_manager=security_manager, audit_logger=audit_logger, enable_security=False, # Disable security for testing endpoint=f"opc.tcp://127.0.0.1:{opcua_port}" ) modbus_server = ModbusServer( setpoint_manager=setpoint_manager, security_manager=security_manager, audit_logger=audit_logger, host="127.0.0.1", port=modbus_port ) rest_api = RESTAPIServer( setpoint_manager=setpoint_manager, emergency_stop_manager=emergency_stop_manager, host="127.0.0.1", port=rest_api_port ) components = { 'db_client': performance_db_client, 'discovery': discovery, 'safety_enforcer': safety_enforcer, 'setpoint_manager': setpoint_manager, 'emergency_stop_manager': emergency_stop_manager, 'optimization_engine': optimization_engine, 'security_manager': security_manager, 'opcua_server': opcua_server, 'modbus_server': modbus_server, 'rest_api': rest_api } yield components # Cleanup await setpoint_manager.stop() await opcua_server.stop() await modbus_server.stop() await rest_api.stop() @pytest.mark.asyncio async def test_concurrent_setpoint_updates(self, performance_components): """Test performance with concurrent setpoint updates.""" safety_enforcer = performance_components['safety_enforcer'] # Test parameters num_concurrent_tasks = 10 updates_per_task = 50 async def update_setpoint_task(task_id: int): """Task that performs multiple setpoint updates.""" latencies = [] for i in range(updates_per_task): station_id = f"PERF_STATION_{(task_id % 3) + 1:03d}" pump_id = f"PERF_PUMP_{(task_id % 6) + 1:03d}" setpoint = 35.0 + (task_id * 2) + (i * 0.1) start_time = time.perf_counter() enforced_setpoint, warnings = safety_enforcer.enforce_setpoint( station_id=station_id, pump_id=pump_id, setpoint=setpoint ) end_time = time.perf_counter() latency = (end_time - start_time) * 1000 # Convert to milliseconds latencies.append(latency) # Verify result is within safety limits (if safety limits exist) assert enforced_setpoint is not None if enforced_setpoint > 0: # Only check if safety limits exist assert 20.0 <= enforced_setpoint <= 70.0 # Within safety limits return latencies # Run concurrent tasks start_time = time.perf_counter() tasks = [update_setpoint_task(i) for i in range(num_concurrent_tasks)] results = await asyncio.gather(*tasks) end_time = time.perf_counter() # Calculate performance metrics total_updates = num_concurrent_tasks * updates_per_task total_time = end_time - start_time throughput = total_updates / total_time # Flatten all latencies all_latencies = [latency for task_latencies in results for latency in task_latencies] # Calculate statistics avg_latency = statistics.mean(all_latencies) p95_latency = statistics.quantiles(all_latencies, n=20)[18] # 95th percentile max_latency = max(all_latencies) # Performance assertions print(f"\nConcurrent Setpoint Updates Performance:") print(f" Total Updates: {total_updates}") print(f" Total Time: {total_time:.2f}s") print(f" Throughput: {throughput:.1f} updates/sec") print(f" Average Latency: {avg_latency:.2f}ms") print(f" 95th Percentile Latency: {p95_latency:.2f}ms") print(f" Max Latency: {max_latency:.2f}ms") # Performance requirements assert throughput > 100, f"Throughput too low: {throughput:.1f} updates/sec" assert avg_latency < 100, f"Average latency too high: {avg_latency:.2f}ms" assert p95_latency < 200, f"95th percentile latency too high: {p95_latency:.2f}ms" @pytest.mark.asyncio async def test_concurrent_protocol_access(self, performance_components): """Test performance with concurrent access across all protocols.""" setpoint_manager = performance_components['setpoint_manager'] opcua_server = performance_components['opcua_server'] modbus_server = performance_components['modbus_server'] rest_api = performance_components['rest_api'] # Start all servers await opcua_server.start() await modbus_server.start() await rest_api.start() # Wait for servers to initialize await asyncio.sleep(1) # Test parameters num_clients = 5 requests_per_client = 20 async def protocol_client_task(client_id: int): """Task that performs requests across different protocols.""" latencies = [] for i in range(requests_per_client): protocol = i % 3 station_id = f"PERF_STATION_{(client_id % 3) + 1:03d}" pump_id = f"PERF_PUMP_{(client_id % 6) + 1:03d}" start_time = time.perf_counter() if protocol == 0: # OPCUA-like access (simulated) setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) # Note: setpoint may be None if no optimization plans exist elif protocol == 1: # Modbus-like access (simulated) setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) # Note: setpoint may be None if no optimization plans exist else: # REST API-like access (simulated) setpoint = setpoint_manager.get_current_setpoint(station_id, pump_id) # Note: setpoint may be None if no optimization plans exist end_time = time.perf_counter() latency = (end_time - start_time) * 1000 latencies.append(latency) return latencies # Run concurrent clients start_time = time.perf_counter() tasks = [protocol_client_task(i) for i in range(num_clients)] results = await asyncio.gather(*tasks) end_time = time.perf_counter() # Calculate performance metrics total_requests = num_clients * requests_per_client total_time = end_time - start_time throughput = total_requests / total_time # Flatten all latencies all_latencies = [latency for client_latencies in results for latency in client_latencies] # Calculate statistics avg_latency = statistics.mean(all_latencies) p95_latency = statistics.quantiles(all_latencies, n=20)[18] # 95th percentile print(f"\nConcurrent Protocol Access Performance:") print(f" Total Requests: {total_requests}") print(f" Total Time: {total_time:.2f}s") print(f" Throughput: {throughput:.1f} requests/sec") print(f" Average Latency: {avg_latency:.2f}ms") print(f" 95th Percentile Latency: {p95_latency:.2f}ms") # Performance requirements assert throughput > 50, f"Protocol throughput too low: {throughput:.1f} requests/sec" assert avg_latency < 50, f"Protocol latency too high: {avg_latency:.2f}ms" @pytest.mark.asyncio async def test_memory_usage_under_load(self, performance_components): """Test memory usage doesn't grow excessively under sustained load.""" safety_enforcer = performance_components['safety_enforcer'] # Get initial memory usage (approximate) import psutil import os process = psutil.Process(os.getpid()) initial_memory = process.memory_info().rss / 1024 / 1024 # MB # Sustained load test num_operations = 1000 for i in range(num_operations): station_id = f"PERF_STATION_{(i % 3) + 1:03d}" pump_id = f"PERF_PUMP_{(i % 6) + 1:03d}" setpoint = 35.0 + (i * 0.1) % 20.0 # Perform setpoint enforcement result, warnings = safety_enforcer.enforce_setpoint( station_id, pump_id, setpoint ) # Note: result may be 0.0 if no safety limits exist # Every 100 operations, check memory if i % 100 == 0: current_memory = process.memory_info().rss / 1024 / 1024 memory_increase = current_memory - initial_memory print(f" Operation {i}: Memory increase = {memory_increase:.2f} MB") # Check memory doesn't grow excessively assert memory_increase < 50, f"Memory growth too high: {memory_increase:.2f} MB" # Final memory check final_memory = process.memory_info().rss / 1024 / 1024 total_memory_increase = final_memory - initial_memory print(f"\nMemory Usage Under Load:") print(f" Initial Memory: {initial_memory:.2f} MB") print(f" Final Memory: {final_memory:.2f} MB") print(f" Total Increase: {total_memory_increase:.2f} MB") # Final memory requirement assert total_memory_increase < 100, f"Excessive memory growth: {total_memory_increase:.2f} MB"