CalejoControl/tests/integration/test_protocol_security.py

280 lines
9.6 KiB
Python

"""
Simplified integration tests for protocol security features.
Tests security integration across OPC UA, Modbus TCP, and REST API protocols
without actually starting the servers.
"""
import pytest
from unittest.mock import Mock, patch
from src.core.setpoint_manager import SetpointManager
from src.core.emergency_stop import EmergencyStopManager
from src.core.security import SecurityManager
from src.core.tls_manager import TLSManager
from src.core.compliance_audit import ComplianceAuditLogger
from src.protocols.opcua_server import OPCUAServer
from src.protocols.modbus_server import ModbusServer
from src.protocols.rest_api import RESTAPIServer
@pytest.fixture
def mock_db_client():
"""Create mock database client."""
return Mock()
@pytest.fixture
def security_components(mock_db_client):
"""Create security components for testing."""
security_manager = SecurityManager()
tls_manager = TLSManager()
audit_logger = ComplianceAuditLogger(db_client=mock_db_client)
return security_manager, tls_manager, audit_logger
@pytest.fixture
def setpoint_manager(mock_db_client):
"""Create setpoint manager for testing."""
# Create mock dependencies
mock_discovery = Mock()
mock_safety_enforcer = Mock()
mock_emergency_stop_manager = Mock()
mock_watchdog = Mock()
# Configure mocks
mock_safety_enforcer.enforce_limits = Mock(return_value=40.0)
mock_emergency_stop_manager.is_emergency_stop_active = Mock(return_value=False)
mock_watchdog.is_failsafe_active = Mock(return_value=False)
manager = SetpointManager(
discovery=mock_discovery,
db_client=mock_db_client,
safety_enforcer=mock_safety_enforcer,
emergency_stop_manager=mock_emergency_stop_manager,
watchdog=mock_watchdog
)
return manager
@pytest.fixture
def emergency_stop_manager(mock_db_client):
"""Create emergency stop manager for testing."""
return EmergencyStopManager(db_client=mock_db_client)
class TestProtocolSecurityIntegration:
"""Integration tests for protocol security features."""
def test_opcua_security_integration(
self,
setpoint_manager,
security_components
):
"""Test OPC UA server security integration."""
security_manager, tls_manager, audit_logger = security_components
# Create OPC UA server with security enabled
opcua_server = OPCUAServer(
setpoint_manager=setpoint_manager,
security_manager=security_manager,
audit_logger=audit_logger,
enable_security=True,
endpoint="opc.tcp://127.0.0.1:4841"
)
# Verify security configuration
security_status = opcua_server.get_security_status()
assert security_status["security_enabled"] == True
assert security_status["connected_clients"] == 0
# Check that security manager is properly integrated
assert opcua_server.security_manager == security_manager
assert opcua_server.audit_logger == audit_logger
def test_modbus_security_integration(
self,
setpoint_manager,
security_components
):
"""Test Modbus TCP server security integration."""
security_manager, tls_manager, audit_logger = security_components
# Create Modbus server with security enabled
modbus_server = ModbusServer(
setpoint_manager=setpoint_manager,
security_manager=security_manager,
audit_logger=audit_logger,
enable_security=True,
allowed_ips=["127.0.0.1"],
rate_limit_per_minute=100,
host="127.0.0.1",
port=5021
)
# Verify security configuration
security_status = modbus_server.get_security_status()
assert security_status["security_enabled"] == True
assert security_status["connected_clients"] == 0
assert security_status["allowed_ips"] == ["127.0.0.1"]
assert security_status["rate_limit_per_minute"] == 100
# Check that security manager is properly integrated
assert modbus_server.security_manager == security_manager
assert modbus_server.audit_logger == audit_logger
def test_rest_api_security_integration(
self,
setpoint_manager,
emergency_stop_manager,
security_components
):
"""Test REST API security integration."""
security_manager, tls_manager, audit_logger = security_components
# Create REST API server
rest_api = RESTAPIServer(
setpoint_manager=setpoint_manager,
emergency_stop_manager=emergency_stop_manager,
host="127.0.0.1",
port=8001
)
# Verify that security dependencies are properly configured
assert rest_api.app is not None
# Check that security endpoints are registered
routes = [route.path for route in rest_api.app.routes]
assert "/api/v1/auth/login" in routes
assert "/api/v1/auth/me" in routes
assert "/api/v1/security/status" in routes
def test_protocol_security_audit_logging(
self,
setpoint_manager,
security_components
):
"""Test that security events are properly logged across protocols."""
security_manager, tls_manager, audit_logger = security_components
# Create servers
opcua_server = OPCUAServer(
setpoint_manager=setpoint_manager,
security_manager=security_manager,
audit_logger=audit_logger,
enable_security=True,
endpoint="opc.tcp://127.0.0.1:4842"
)
modbus_server = ModbusServer(
setpoint_manager=setpoint_manager,
security_manager=security_manager,
audit_logger=audit_logger,
enable_security=True,
host="127.0.0.1",
port=5022
)
# Verify audit logger is shared between protocols
assert opcua_server.audit_logger == audit_logger
assert modbus_server.audit_logger == audit_logger
# Verify security managers are shared
assert opcua_server.security_manager == security_manager
assert modbus_server.security_manager == security_manager
def test_security_status_endpoints(
self,
setpoint_manager,
security_components
):
"""Test that security status endpoints work correctly."""
security_manager, tls_manager, audit_logger = security_components
# Create servers
opcua_server = OPCUAServer(
setpoint_manager=setpoint_manager,
security_manager=security_manager,
audit_logger=audit_logger,
enable_security=True,
endpoint="opc.tcp://127.0.0.1:4843"
)
modbus_server = ModbusServer(
setpoint_manager=setpoint_manager,
security_manager=security_manager,
audit_logger=audit_logger,
enable_security=True,
host="127.0.0.1",
port=5023
)
# Test OPC UA security status
opcua_status = opcua_server.get_security_status()
assert isinstance(opcua_status, dict)
assert 'security_enabled' in opcua_status
assert 'connected_clients' in opcua_status
assert 'certificate_configured' in opcua_status
# Test Modbus security status
modbus_status = modbus_server.get_security_status()
assert isinstance(modbus_status, dict)
assert 'security_enabled' in modbus_status
assert 'connected_clients' in modbus_status
assert 'allowed_ips' in modbus_status
assert 'rate_limit_per_minute' in modbus_status
class TestProtocolSecurityConfiguration:
"""Tests for protocol security configuration options."""
def test_opcua_security_disabled(self, setpoint_manager, security_components):
"""Test OPC UA server with security disabled."""
security_manager, tls_manager, audit_logger = security_components
opcua_server = OPCUAServer(
setpoint_manager=setpoint_manager,
security_manager=security_manager,
audit_logger=audit_logger,
enable_security=False,
endpoint="opc.tcp://127.0.0.1:4844"
)
security_status = opcua_server.get_security_status()
assert security_status["security_enabled"] == False
def test_modbus_security_disabled(self, setpoint_manager, security_components):
"""Test Modbus server with security disabled."""
security_manager, tls_manager, audit_logger = security_components
modbus_server = ModbusServer(
setpoint_manager=setpoint_manager,
security_manager=security_manager,
audit_logger=audit_logger,
enable_security=False,
host="127.0.0.1",
port=5024
)
security_status = modbus_server.get_security_status()
assert security_status["security_enabled"] == False
def test_modbus_rate_limiting(self, setpoint_manager, security_components):
"""Test Modbus server rate limiting functionality."""
security_manager, tls_manager, audit_logger = security_components
modbus_server = ModbusServer(
setpoint_manager=setpoint_manager,
security_manager=security_manager,
audit_logger=audit_logger,
enable_security=True,
rate_limit_per_minute=5, # Very low limit for testing
host="127.0.0.1",
port=5025
)
# Test rate limiting configuration
security_status = modbus_server.get_security_status()
assert security_status["rate_limit_per_minute"] == 5