300 lines
10 KiB
Markdown
300 lines
10 KiB
Markdown
|
|
# Calejo Control Adapter - Testing & Validation Guide
|
||
|
|
|
||
|
|
## Overview
|
||
|
|
|
||
|
|
This guide provides comprehensive testing and validation procedures for the Calejo Control Adapter, ensuring system reliability, safety compliance, and operational readiness.
|
||
|
|
|
||
|
|
## Test Framework Architecture
|
||
|
|
|
||
|
|
### Test Categories
|
||
|
|
|
||
|
|
```
|
||
|
|
┌─────────────────────────────────────────────────────────┐
|
||
|
|
│ Test Framework │
|
||
|
|
├─────────────────────────────────────────────────────────┤
|
||
|
|
│ Unit Tests │ Integration Tests │
|
||
|
|
│ - Core Components │ - Component Interactions │
|
||
|
|
│ - Safety Framework │ - Protocol Integration │
|
||
|
|
│ - Security Layer │ - Database Operations │
|
||
|
|
├─────────────────────────────────────────────────────────┤
|
||
|
|
│ End-to-End Tests │ Deployment Tests │
|
||
|
|
│ - Full Workflows │ - Production Validation │
|
||
|
|
│ - Safety Scenarios │ - Performance Validation │
|
||
|
|
└─────────────────────────────────────────────────────────┘
|
||
|
|
```
|
||
|
|
|
||
|
|
### Test Environment Setup
|
||
|
|
|
||
|
|
#### Development Environment
|
||
|
|
|
||
|
|
```bash
|
||
|
|
# Set up test environment
|
||
|
|
python -m venv venv-test
|
||
|
|
source venv-test/bin/activate
|
||
|
|
|
||
|
|
# Install test dependencies
|
||
|
|
pip install -r requirements-test.txt
|
||
|
|
|
||
|
|
# Configure test database
|
||
|
|
export TEST_DATABASE_URL=postgresql://test_user:test_pass@localhost:5432/calejo_test
|
||
|
|
```
|
||
|
|
|
||
|
|
#### Test Database Configuration
|
||
|
|
|
||
|
|
```sql
|
||
|
|
-- Create test database
|
||
|
|
CREATE DATABASE calejo_test;
|
||
|
|
CREATE USER test_user WITH PASSWORD 'test_pass';
|
||
|
|
GRANT ALL PRIVILEGES ON DATABASE calejo_test TO test_user;
|
||
|
|
|
||
|
|
-- Test data setup (run while connected to calejo_test, after the application schema has been created)
|
||
|
|
INSERT INTO safety_limits (station_id, pump_id, hard_min_speed_hz, hard_max_speed_hz, max_speed_change_hz_per_min)
|
||
|
|
VALUES ('test_station', 'test_pump', 20.0, 50.0, 30.0);
|
||
|
|
```
|
||
|
|
|
||
|
|
## Unit Testing
|
||
|
|
|
||
|
|
### Core Component Tests
|
||
|
|
|
||
|
|
#### Safety Framework Tests
|
||
|
|
|
||
|
|
```python
|
||
|
|
# tests/unit/test_safety_framework.py
|
||
|
|
import pytest
|
||
|
|
from src.core.safety import SafetyFramework
|
||
|
|
|
||
|
|
class TestSafetyFramework:
    """Unit tests for setpoint validation in ``SafetyFramework``."""

    def test_safety_limits_enforcement(self):
        """Test that safety limits are properly enforced"""
        safety = SafetyFramework()

        # Test within limits: the requested value is accepted unchanged
        result = safety.validate_setpoint('station_001', 'pump_001', 35.0)
        assert result.valid
        assert result.enforced_setpoint == 35.0

        # Test above maximum limit: the request is flagged and the enforced
        # value is the 50.0 Hz cap.
        # NOTE(review): assumes station_001/pump_001 is configured with a
        # 50.0 Hz hard maximum -- confirm against the test fixtures.
        result = safety.validate_setpoint('station_001', 'pump_001', 55.0)
        assert not result.valid
        assert result.enforced_setpoint == 50.0
        assert result.violations == ['ABOVE_MAX_SPEED']

    def test_rate_of_change_limiting(self):
        """Test rate of change limiting"""
        safety = SafetyFramework()

        # Test acceptable change (30.0 -> 35.0)
        result = safety.validate_setpoint_change('station_001', 'pump_001', 30.0, 35.0)
        assert result.valid

        # Test excessive change (30.0 -> 70.0)
        result = safety.validate_setpoint_change('station_001', 'pump_001', 30.0, 70.0)
        assert not result.valid
        assert result.violations == ['EXCESSIVE_RATE_OF_CHANGE']
|
||
|
|
```
|
||
|
|
|
||
|
|
#### Security Layer Tests
|
||
|
|
|
||
|
|
```python
|
||
|
|
# tests/unit/test_security.py
|
||
|
|
import pytest
|
||
|
|
from src.security.authentication import AuthenticationManager
|
||
|
|
from src.security.authorization import AuthorizationManager
|
||
|
|
|
||
|
|
class TestAuthentication:
    """Unit tests for token handling and password hashing in ``AuthenticationManager``."""

    def test_jwt_token_validation(self):
        """Test JWT token creation and validation"""
        auth = AuthenticationManager()

        # Create token
        token = auth.create_token('user_001', 'operator')
        assert token is not None

        # Validate token: the payload must echo the identity and role it was
        # created with
        payload = auth.validate_token(token)
        assert payload['user_id'] == 'user_001'
        assert payload['role'] == 'operator'

    def test_password_hashing(self):
        """Test password hashing and verification"""
        auth = AuthenticationManager()

        password = 'secure_password'
        hashed = auth.hash_password(password)

        # Verify password: the right password matches, a wrong one does not
        assert auth.verify_password(password, hashed)
        assert not auth.verify_password('wrong_password', hashed)
|
||
|
|
|
||
|
|
class TestAuthorization:
    """Unit tests for role checks in ``AuthorizationManager``."""

    def test_role_based_access_control(self):
        """Test RBAC permissions"""
        authz = AuthorizationManager()

        # Test operator permissions: operational actions allowed,
        # user management denied
        assert authz.has_permission('operator', 'read_pump_status')
        assert authz.has_permission('operator', 'emergency_stop')
        assert not authz.has_permission('operator', 'user_management')

        # Test administrator permissions
        assert authz.has_permission('administrator', 'user_management')
|
||
|
|
```
|
||
|
|
|
||
|
|
#### Protocol Server Tests
|
||
|
|
|
||
|
|
```python
|
||
|
|
# tests/unit/test_protocols.py
|
||
|
|
import pytest
|
||
|
|
from src.protocols.opcua_server import OPCUAServer
|
||
|
|
from src.protocols.modbus_server import ModbusServer
|
||
|
|
|
||
|
|
class TestOPCUAServer:
    """Unit tests for node management and publishing in ``OPCUAServer``."""

    def test_node_creation(self):
        """Test OPC UA node creation and management"""
        server = OPCUAServer()

        # Create pump node
        node_id = server.create_pump_node('station_001', 'pump_001')
        assert node_id is not None

        # Verify node exists
        assert server.node_exists(node_id)

    def test_data_publishing(self):
        """Test OPC UA data publishing"""
        server = OPCUAServer()

        # Publish setpoint data
        success = server.publish_setpoint('station_001', 'pump_001', 35.5)
        assert success
|
||
|
|
|
||
|
|
class TestModbusServer:
    """Unit tests for register mapping and float round-tripping in ``ModbusServer``."""

    def test_register_mapping(self):
        """Test Modbus register mapping"""
        modbus = ModbusServer()

        # Map pump registers
        register_map = modbus.map_pump_registers('station_001', 'pump_001')

        # The mapping must be non-empty and expose both pump values
        assert len(register_map) > 0
        for expected_key in ('setpoint', 'actual_speed'):
            assert expected_key in register_map

    def test_data_encoding(self):
        """Test Modbus data encoding/decoding"""
        modbus = ModbusServer()

        # Test float encoding: encode then decode, and compare with a
        # tolerance rather than exact equality
        round_tripped = modbus.decode_float(modbus.encode_float(35.5))
        assert abs(round_tripped - 35.5) < 0.01
|
||
|
|
```
|
||
|
|
|
||
|
|
### Test Coverage Requirements
|
||
|
|
|
||
|
|
#### Minimum Coverage Targets
|
||
|
|
|
||
|
|
| Component | Target Coverage | Critical Paths |
|
||
|
|
|-----------|----------------|----------------|
|
||
|
|
| **Safety Framework** | 95% | All limit checks, emergency procedures |
|
||
|
|
| **Security Layer** | 90% | Authentication, authorization, audit |
|
||
|
|
| **Protocol Servers** | 85% | Data encoding, connection handling |
|
||
|
|
| **Database Layer** | 80% | CRUD operations, transactions |
|
||
|
|
| **Core Components** | 85% | Setpoint management, discovery |
|
||
|
|
|
||
|
|
#### Coverage Reporting
|
||
|
|
|
||
|
|
```bash
|
||
|
|
# Generate coverage report
|
||
|
|
pytest --cov=src --cov-report=html --cov-report=term-missing
|
||
|
|
|
||
|
|
# Check specific component coverage
|
||
|
|
pytest --cov=src.core.safety --cov-report=term-missing
|
||
|
|
|
||
|
|
# Generate coverage badge
|
||
|
|
coverage-badge -o coverage.svg
|
||
|
|
```
|
||
|
|
|
||
|
|
## Integration Testing
|
||
|
|
|
||
|
|
### Component Integration Tests
|
||
|
|
|
||
|
|
#### Safety-Protocol Integration
|
||
|
|
|
||
|
|
```python
|
||
|
|
# tests/integration/test_safety_protocol_integration.py
|
||
|
|
import pytest

from src.core.safety import SafetyFramework
from src.protocols.modbus_server import ModbusServer
from src.protocols.opcua_server import OPCUAServer


class TestSafetyProtocolIntegration:
    """Integration tests between ``SafetyFramework`` and the protocol servers."""

    def test_safety_enforced_setpoint_publishing(self):
        """Test that safety-enforced setpoints are published correctly"""
        safety = SafetyFramework()
        opcua = OPCUAServer()

        # Attempt to set unsafe setpoint
        validation = safety.validate_setpoint('station_001', 'pump_001', 55.0)

        # The request must be rejected. Asserting this unconditionally keeps
        # the test from passing vacuously if validation wrongly succeeds.
        assert not validation.valid
        assert validation.enforced_setpoint == 50.0  # Enforced to max limit

        # Publish enforced setpoint
        success = opcua.publish_setpoint('station_001', 'pump_001', validation.enforced_setpoint)
        assert success

    def test_emergency_stop_protocol_notification(self):
        """Test emergency stop notification across protocols"""
        # NOTE(review): the three objects are constructed independently; this
        # presumes emergency state is shared between them (e.g. through a
        # common backend) -- confirm, or wire them together explicitly.
        safety = SafetyFramework()
        opcua = OPCUAServer()
        modbus = ModbusServer()

        # Activate emergency stop
        safety.activate_emergency_stop('station_001', 'operator_001', 'Test emergency')

        # Verify all protocols reflect emergency state
        assert opcua.get_emergency_status('station_001')
        assert modbus.get_emergency_status('station_001')
|
||
|
|
```
|
||
|
|
|
||
|
|
#### Database-Application Integration
|
||
|
|
|
||
|
|
```python
|
||
|
|
# tests/integration/test_database_integration.py
|
||
|
|
import pytest

from src.core.optimization_manager import OptimizationManager
from src.core.safety import SafetyFramework
from src.database.flexible_client import FlexibleDatabaseClient


class TestDatabaseIntegration:
    """Integration tests between the database client and application components."""

    def test_optimization_plan_loading(self):
        """Test loading optimization plans from database"""
        db = FlexibleDatabaseClient()
        manager = OptimizationManager()

        # Load optimization plans
        plans = db.get_optimization_plans('station_001')
        assert len(plans) > 0

        # Process plans: every loaded plan must be accepted by the manager
        for plan in plans:
            success = manager.process_optimization_plan(plan)
            assert success

    def test_safety_limits_persistence(self):
        """Test safety limits persistence and retrieval"""
        db = FlexibleDatabaseClient()
        safety = SafetyFramework()

        # Update safety limits
        # NOTE(review): this rewrites the limits for station_001/pump_001,
        # which other examples in this guide expect to cap at 50.0 Hz; run it
        # against an isolated test database or restore the limits afterwards.
        new_limits = {
            'hard_min_speed_hz': 25.0,
            'hard_max_speed_hz': 48.0,
            'max_speed_change_hz_per_min': 25.0,
        }

        success = db.update_safety_limits('station_001', 'pump_001', new_limits)
        assert success

        # Verify limits are loaded by safety framework
        limits = safety.get_safety_limits('station_001', 'pump_001')
        assert limits.hard_min_speed_hz == 25.0
        assert limits.hard_max_speed_hz == 48.0
|
||
|
|
```
|