""" Simplified integration tests for protocol security features. Tests security integration across OPC UA, Modbus TCP, and REST API protocols without actually starting the servers. """ import pytest from unittest.mock import Mock, patch from src.core.setpoint_manager import SetpointManager from src.core.emergency_stop import EmergencyStopManager from src.core.security import SecurityManager from src.core.tls_manager import TLSManager from src.core.compliance_audit import ComplianceAuditLogger from src.protocols.opcua_server import OPCUAServer from src.protocols.modbus_server import ModbusServer from src.protocols.rest_api import RESTAPIServer @pytest.fixture def mock_db_client(): """Create mock database client.""" return Mock() @pytest.fixture def security_components(mock_db_client): """Create security components for testing.""" security_manager = SecurityManager() tls_manager = TLSManager() audit_logger = ComplianceAuditLogger(db_client=mock_db_client) return security_manager, tls_manager, audit_logger @pytest.fixture def setpoint_manager(mock_db_client): """Create setpoint manager for testing.""" # Create mock dependencies mock_discovery = Mock() mock_safety_enforcer = Mock() mock_emergency_stop_manager = Mock() mock_watchdog = Mock() # Configure mocks mock_safety_enforcer.enforce_limits = Mock(return_value=40.0) mock_emergency_stop_manager.is_emergency_stop_active = Mock(return_value=False) mock_watchdog.is_failsafe_active = Mock(return_value=False) manager = SetpointManager( discovery=mock_discovery, db_client=mock_db_client, safety_enforcer=mock_safety_enforcer, emergency_stop_manager=mock_emergency_stop_manager, watchdog=mock_watchdog ) return manager @pytest.fixture def emergency_stop_manager(mock_db_client): """Create emergency stop manager for testing.""" return EmergencyStopManager(db_client=mock_db_client) class TestProtocolSecurityIntegration: """Integration tests for protocol security features.""" def test_opcua_security_integration( self, setpoint_manager, security_components ): """Test OPC UA server security integration.""" security_manager, tls_manager, audit_logger = security_components # Create OPC UA server with security enabled opcua_server = OPCUAServer( setpoint_manager=setpoint_manager, security_manager=security_manager, audit_logger=audit_logger, enable_security=True, endpoint="opc.tcp://127.0.0.1:4841" ) # Verify security configuration security_status = opcua_server.get_security_status() assert security_status["security_enabled"] == True assert security_status["connected_clients"] == 0 # Check that security manager is properly integrated assert opcua_server.security_manager == security_manager assert opcua_server.audit_logger == audit_logger def test_modbus_security_integration( self, setpoint_manager, security_components ): """Test Modbus TCP server security integration.""" security_manager, tls_manager, audit_logger = security_components # Create Modbus server with security enabled modbus_server = ModbusServer( setpoint_manager=setpoint_manager, security_manager=security_manager, audit_logger=audit_logger, enable_security=True, allowed_ips=["127.0.0.1"], rate_limit_per_minute=100, host="127.0.0.1", port=5021 ) # Verify security configuration security_status = modbus_server.get_security_status() assert security_status["security_enabled"] == True assert security_status["connected_clients"] == 0 assert security_status["allowed_ips"] == ["127.0.0.1"] assert security_status["rate_limit_per_minute"] == 100 # Check that security manager is properly integrated assert modbus_server.security_manager == security_manager assert modbus_server.audit_logger == audit_logger def test_rest_api_security_integration( self, setpoint_manager, emergency_stop_manager, security_components ): """Test REST API security integration.""" security_manager, tls_manager, audit_logger = security_components # Create REST API server rest_api = RESTAPIServer( setpoint_manager=setpoint_manager, emergency_stop_manager=emergency_stop_manager, host="127.0.0.1", port=8001 ) # Verify that security dependencies are properly configured assert rest_api.app is not None # Check that security endpoints are registered routes = [route.path for route in rest_api.app.routes] assert "/api/v1/auth/login" in routes assert "/api/v1/auth/me" in routes assert "/api/v1/security/status" in routes def test_protocol_security_audit_logging( self, setpoint_manager, security_components ): """Test that security events are properly logged across protocols.""" security_manager, tls_manager, audit_logger = security_components # Create servers opcua_server = OPCUAServer( setpoint_manager=setpoint_manager, security_manager=security_manager, audit_logger=audit_logger, enable_security=True, endpoint="opc.tcp://127.0.0.1:4842" ) modbus_server = ModbusServer( setpoint_manager=setpoint_manager, security_manager=security_manager, audit_logger=audit_logger, enable_security=True, host="127.0.0.1", port=5022 ) # Verify audit logger is shared between protocols assert opcua_server.audit_logger == audit_logger assert modbus_server.audit_logger == audit_logger # Verify security managers are shared assert opcua_server.security_manager == security_manager assert modbus_server.security_manager == security_manager def test_security_status_endpoints( self, setpoint_manager, security_components ): """Test that security status endpoints work correctly.""" security_manager, tls_manager, audit_logger = security_components # Create servers opcua_server = OPCUAServer( setpoint_manager=setpoint_manager, security_manager=security_manager, audit_logger=audit_logger, enable_security=True, endpoint="opc.tcp://127.0.0.1:4843" ) modbus_server = ModbusServer( setpoint_manager=setpoint_manager, security_manager=security_manager, audit_logger=audit_logger, enable_security=True, host="127.0.0.1", port=5023 ) # Test OPC UA security status opcua_status = opcua_server.get_security_status() assert isinstance(opcua_status, dict) assert 'security_enabled' in opcua_status assert 'connected_clients' in opcua_status assert 'certificate_configured' in opcua_status # Test Modbus security status modbus_status = modbus_server.get_security_status() assert isinstance(modbus_status, dict) assert 'security_enabled' in modbus_status assert 'connected_clients' in modbus_status assert 'allowed_ips' in modbus_status assert 'rate_limit_per_minute' in modbus_status class TestProtocolSecurityConfiguration: """Tests for protocol security configuration options.""" def test_opcua_security_disabled(self, setpoint_manager, security_components): """Test OPC UA server with security disabled.""" security_manager, tls_manager, audit_logger = security_components opcua_server = OPCUAServer( setpoint_manager=setpoint_manager, security_manager=security_manager, audit_logger=audit_logger, enable_security=False, endpoint="opc.tcp://127.0.0.1:4844" ) security_status = opcua_server.get_security_status() assert security_status["security_enabled"] == False def test_modbus_security_disabled(self, setpoint_manager, security_components): """Test Modbus server with security disabled.""" security_manager, tls_manager, audit_logger = security_components modbus_server = ModbusServer( setpoint_manager=setpoint_manager, security_manager=security_manager, audit_logger=audit_logger, enable_security=False, host="127.0.0.1", port=5024 ) security_status = modbus_server.get_security_status() assert security_status["security_enabled"] == False def test_modbus_rate_limiting(self, setpoint_manager, security_components): """Test Modbus server rate limiting functionality.""" security_manager, tls_manager, audit_logger = security_components modbus_server = ModbusServer( setpoint_manager=setpoint_manager, security_manager=security_manager, audit_logger=audit_logger, enable_security=True, rate_limit_per_minute=5, # Very low limit for testing host="127.0.0.1", port=5025 ) # Test rate limiting configuration security_status = modbus_server.get_security_status() assert security_status["rate_limit_per_minute"] == 5