423 lines
17 KiB
Python
423 lines
17 KiB
Python
|
|
"""
|
||
|
|
Tests for ConfigurationManager with Protocol Mapping functionality
|
||
|
|
"""
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
from unittest.mock import Mock, patch, AsyncMock
|
||
|
|
import asyncio
|
||
|
|
|
||
|
|
from src.dashboard.configuration_manager import (
|
||
|
|
ConfigurationManager,
|
||
|
|
ProtocolMapping,
|
||
|
|
ProtocolType,
|
||
|
|
SCADAProtocolConfig
|
||
|
|
)
|
||
|
|
from src.database.flexible_client import FlexibleDatabaseClient
|
||
|
|
|
||
|
|
|
||
|
|
class TestConfigurationManager:
    """Unit tests for ConfigurationManager protocol-mapping CRUD and validation.

    Each test receives a fresh manager from a fixture: either one constructed
    with no arguments, or one constructed with a ``Mock`` standing in for
    ``FlexibleDatabaseClient``.
    """

    @pytest.fixture
    def mock_db_client(self):
        """Return a Mock spec'd to FlexibleDatabaseClient.

        ``execute_query`` is stubbed to return an empty list and ``execute``
        to return ``1``.
        """
        mock_client = Mock(spec=FlexibleDatabaseClient)
        mock_client.execute_query.return_value = []
        mock_client.execute.return_value = 1
        return mock_client

    @pytest.fixture
    def config_manager_no_db(self):
        """Return a ConfigurationManager constructed without a database client."""
        return ConfigurationManager()

    @pytest.fixture
    def config_manager_with_db(self, mock_db_client):
        """Return a ConfigurationManager constructed with the mocked client."""
        return ConfigurationManager(db_client=mock_db_client)

    @pytest.fixture
    def sample_mappings(self):
        """Return four mappings, one per protocol type.

        Order: Modbus TCP, OPC UA, Modbus RTU, REST API. Several tests index
        into this list, so the order matters.
        """
        return [
            ProtocolMapping(
                id="modbus_tcp_001",
                station_id="station_001",
                pump_id="pump_001",
                protocol_type=ProtocolType.MODBUS_TCP,
                protocol_address="100",
                data_type="setpoint",
                db_source="frequency_hz",
            ),
            ProtocolMapping(
                id="opcua_001",
                station_id="station_001",
                pump_id="pump_001",
                protocol_type=ProtocolType.OPC_UA,
                protocol_address="ns=2;s=Station_001.Pump_001.Setpoint_Hz",
                data_type="setpoint",
                db_source="frequency_hz",
            ),
            ProtocolMapping(
                id="modbus_rtu_001",
                station_id="station_002",
                pump_id="pump_001",
                protocol_type=ProtocolType.MODBUS_RTU,
                protocol_address="40001",
                data_type="setpoint",
                db_source="frequency_hz",
            ),
            ProtocolMapping(
                id="rest_api_001",
                station_id="station_001",
                pump_id="pump_002",
                protocol_type=ProtocolType.REST_API,
                protocol_address="https://api.example.com/v1/stations/001/pumps/002/setpoint",
                data_type="setpoint",
                db_source="frequency_hz",
            ),
        ]

    def test_initialization_no_database(self, config_manager_no_db):
        """A manager built without a client starts empty and has no db_client."""
        assert config_manager_no_db.protocol_mappings == []
        assert config_manager_no_db.db_client is None

    def test_initialization_with_database(self, config_manager_with_db, mock_db_client):
        """A manager built with a client keeps it and queries it exactly once."""
        assert config_manager_with_db.db_client is mock_db_client
        mock_db_client.execute_query.assert_called_once()

    def test_add_protocol_mapping_no_db(self, config_manager_no_db, sample_mappings):
        """Adding a mapping without a database stores it in memory."""
        mapping = sample_mappings[0]

        result = config_manager_no_db.add_protocol_mapping(mapping)

        assert result
        assert len(config_manager_no_db.protocol_mappings) == 1
        assert config_manager_no_db.protocol_mappings[0] == mapping

    def test_add_protocol_mapping_with_db(self, config_manager_with_db, sample_mappings, mock_db_client):
        """Adding a mapping with a database client calls ``execute`` once."""
        mapping = sample_mappings[0]

        result = config_manager_with_db.add_protocol_mapping(mapping)

        assert result
        assert len(config_manager_with_db.protocol_mappings) == 1
        mock_db_client.execute.assert_called_once()

    def test_add_duplicate_protocol_mapping(self, config_manager_no_db, sample_mappings):
        """Adding the same mapping twice is rejected the second time."""
        mapping = sample_mappings[0]

        # The first insert is expected to succeed.
        assert config_manager_no_db.add_protocol_mapping(mapping)

        # The second insert of the same mapping must be refused and must not
        # grow the stored list.
        assert not config_manager_no_db.add_protocol_mapping(mapping)
        assert len(config_manager_no_db.protocol_mappings) == 1

    def test_get_protocol_mappings_empty(self, config_manager_no_db):
        """An empty manager returns an empty list of mappings."""
        assert config_manager_no_db.get_protocol_mappings() == []

    def test_get_protocol_mappings_with_data(self, config_manager_no_db, sample_mappings):
        """All added mappings are returned when no filter is given."""
        for mapping in sample_mappings:
            config_manager_no_db.add_protocol_mapping(mapping)

        mappings = config_manager_no_db.get_protocol_mappings()

        # Compare against the fixture size rather than a hard-coded count.
        assert len(mappings) == len(sample_mappings)

    def test_get_protocol_mappings_with_filter(self, config_manager_no_db, sample_mappings):
        """Filtering by protocol type returns only mappings of that type."""
        for mapping in sample_mappings:
            config_manager_no_db.add_protocol_mapping(mapping)

        # The fixture holds exactly one mapping of each of these types.
        for wanted_type in (ProtocolType.MODBUS_TCP, ProtocolType.OPC_UA):
            filtered = config_manager_no_db.get_protocol_mappings(protocol_type=wanted_type)
            assert len(filtered) == 1
            assert filtered[0].protocol_type == wanted_type

    def test_update_protocol_mapping(self, config_manager_no_db, sample_mappings):
        """Updating an existing mapping replaces its stored protocol address."""
        config_manager_no_db.add_protocol_mapping(sample_mappings[0])

        # Same identity as the stored mapping; only the address differs.
        updated_mapping = ProtocolMapping(
            id="modbus_tcp_001",
            station_id="station_001",
            pump_id="pump_001",
            protocol_type=ProtocolType.MODBUS_TCP,
            protocol_address="200",
            data_type="setpoint",
            db_source="frequency_hz",
        )

        result = config_manager_no_db.update_protocol_mapping("modbus_tcp_001", updated_mapping)

        assert result
        stored = config_manager_no_db.get_protocol_mappings()[0]
        assert stored.protocol_address == "200"

    def test_update_nonexistent_mapping(self, config_manager_no_db, sample_mappings):
        """Updating an unknown mapping ID reports failure."""
        mapping = sample_mappings[0]

        result = config_manager_no_db.update_protocol_mapping("nonexistent", mapping)

        assert not result

    def test_delete_protocol_mapping(self, config_manager_no_db, sample_mappings):
        """Deleting an existing mapping removes it from the manager."""
        config_manager_no_db.add_protocol_mapping(sample_mappings[0])

        result = config_manager_no_db.delete_protocol_mapping("modbus_tcp_001")

        assert result
        assert config_manager_no_db.protocol_mappings == []

    def test_delete_nonexistent_mapping(self, config_manager_no_db):
        """Deleting an unknown mapping ID reports failure."""
        result = config_manager_no_db.delete_protocol_mapping("nonexistent")

        assert not result

    def test_validate_protocol_mapping_valid(self, config_manager_no_db, sample_mappings):
        """A mapping with no conflicts validates with no errors."""
        mapping = sample_mappings[0]

        result = config_manager_no_db.validate_protocol_mapping(mapping)

        assert result["valid"]
        assert len(result["errors"]) == 0

    def test_validate_protocol_mapping_duplicate_id(self, config_manager_no_db, sample_mappings):
        """Validation rejects a mapping whose ID is already stored."""
        config_manager_no_db.add_protocol_mapping(sample_mappings[0])

        # Reuses the stored ID but differs in station, pump and address.
        duplicate_mapping = ProtocolMapping(
            id="modbus_tcp_001",
            station_id="station_002",
            pump_id="pump_002",
            protocol_type=ProtocolType.MODBUS_TCP,
            protocol_address="101",
            data_type="setpoint",
            db_source="frequency_hz",
        )

        result = config_manager_no_db.validate_protocol_mapping(duplicate_mapping)

        assert not result["valid"]
        assert len(result["errors"]) > 0
        assert "already exists" in result["errors"][0]

    def test_validate_protocol_mapping_modbus_address_conflict(self, config_manager_no_db, sample_mappings):
        """Validation rejects a Modbus mapping that reuses a stored address."""
        config_manager_no_db.add_protocol_mapping(sample_mappings[0])

        # New ID, but the same Modbus TCP address ("100") as the stored mapping.
        conflict_mapping = ProtocolMapping(
            id="modbus_tcp_002",
            station_id="station_002",
            pump_id="pump_002",
            protocol_type=ProtocolType.MODBUS_TCP,
            protocol_address="100",
            data_type="setpoint",
            db_source="frequency_hz",
        )

        result = config_manager_no_db.validate_protocol_mapping(conflict_mapping)

        assert not result["valid"]
        assert len(result["errors"]) > 0
        assert "already used" in result["errors"][0]

    # Marked as skipped so the report shows "skipped" rather than a pass that
    # verified nothing.
    @pytest.mark.skip(reason="Pydantic validation prevents invalid addresses")
    def test_validate_protocol_mapping_modbus_invalid_address(self, config_manager_no_db):
        """Placeholder for validating a mapping with an invalid Modbus address."""

    @pytest.mark.skip(reason="Pydantic validation prevents invalid URLs")
    def test_validate_protocol_mapping_rest_invalid_url(self, config_manager_no_db):
        """Placeholder for validating a mapping with an invalid REST URL."""
|
||
|
|
|
||
|
|
|
||
|
|
class TestConfigurationManagerDatabaseIntegration:
    """Integration tests for ConfigurationManager persistence via SQLite clients.

    Every client fixture first switches the working directory to pytest's
    per-test ``tmp_path`` so the relative SQLite file named in the URL is
    created in a throwaway directory instead of accumulating rows in the
    directory the test run was started from.
    """

    @staticmethod
    def _modbus_tcp_mapping(mapping_id, protocol_address):
        """Build a Modbus TCP setpoint mapping for station_001 / pump_001."""
        return ProtocolMapping(
            id=mapping_id,
            station_id="station_001",
            pump_id="pump_001",
            protocol_type=ProtocolType.MODBUS_TCP,
            protocol_address=protocol_address,
            data_type="setpoint",
            db_source="frequency_hz",
        )

    @pytest.fixture
    def sqlite_db_client_load(self, tmp_path, monkeypatch):
        """Create the SQLite database client used by the load test."""
        # NOTE(review): assumes the client resolves the relative file in this
        # URL against the current working directory - confirm.
        monkeypatch.chdir(tmp_path)
        return FlexibleDatabaseClient('sqlite:///test_config_manager_load.db')

    @pytest.fixture
    def sqlite_db_client_add(self, tmp_path, monkeypatch):
        """Create the SQLite database client used by the add test."""
        # NOTE(review): assumes the client resolves the relative file in this
        # URL against the current working directory - confirm.
        monkeypatch.chdir(tmp_path)
        return FlexibleDatabaseClient('sqlite:///test_config_manager_add.db')

    @pytest.fixture
    def sqlite_db_client_update(self, tmp_path, monkeypatch):
        """Create the SQLite database client used by the update test."""
        # NOTE(review): assumes the client resolves the relative file in this
        # URL against the current working directory - confirm.
        monkeypatch.chdir(tmp_path)
        return FlexibleDatabaseClient('sqlite:///test_config_manager_update.db')

    @pytest.fixture
    def sqlite_db_client_delete(self, tmp_path, monkeypatch):
        """Create the SQLite database client used by the delete test."""
        # NOTE(review): assumes the client resolves the relative file in this
        # URL against the current working directory - confirm.
        monkeypatch.chdir(tmp_path)
        return FlexibleDatabaseClient('sqlite:///test_config_manager_delete.db')

    @pytest.mark.asyncio
    async def test_load_mappings_from_database(self, sqlite_db_client_load):
        """A row inserted directly into the table is loaded by a new manager."""
        await sqlite_db_client_load.connect()
        try:
            sqlite_db_client_load.create_tables()

            # Insert a row without going through ConfigurationManager.
            query = """
                INSERT INTO protocol_mappings
                (mapping_id, station_id, pump_id, protocol_type, protocol_address, data_type, db_source, created_by, enabled)
                VALUES (:mapping_id, :station_id, :pump_id, :protocol_type, :protocol_address, :data_type, :db_source, :created_by, :enabled)
            """
            params = {
                'mapping_id': 'db_test_mapping',
                'station_id': 'station_001',
                'pump_id': 'pump_001',
                'protocol_type': 'modbus_tcp',
                'protocol_address': '100',
                'data_type': 'setpoint',
                'db_source': 'frequency_hz',
                'created_by': 'test',
                'enabled': True,
            }
            sqlite_db_client_load.execute(query, params)

            # A manager created after the insert should pick the row up.
            config_manager = ConfigurationManager(db_client=sqlite_db_client_load)

            mappings = config_manager.get_protocol_mappings()
            assert len(mappings) == 1
            assert mappings[0].id == 'db_test_mapping'
        finally:
            # Disconnect even when an assertion above fails.
            await sqlite_db_client_load.disconnect()

    @pytest.mark.asyncio
    async def test_add_mapping_with_database_persistence(self, sqlite_db_client_add):
        """A mapping added through one manager is visible to a second manager."""
        await sqlite_db_client_add.connect()
        try:
            sqlite_db_client_add.create_tables()

            config_manager = ConfigurationManager(db_client=sqlite_db_client_add)

            mapping = self._modbus_tcp_mapping("persistence_test", "110")
            assert config_manager.add_protocol_mapping(mapping)

            # A second manager on the same client only sees what was persisted.
            config_manager2 = ConfigurationManager(db_client=sqlite_db_client_add)
            mappings = config_manager2.get_protocol_mappings()
            assert len(mappings) == 1
            assert mappings[0].id == "persistence_test"
        finally:
            # Disconnect even when an assertion above fails.
            await sqlite_db_client_add.disconnect()

    @pytest.mark.asyncio
    async def test_update_mapping_with_database_persistence(self, sqlite_db_client_update):
        """An updated protocol address is visible to a second manager."""
        await sqlite_db_client_update.connect()
        try:
            sqlite_db_client_update.create_tables()

            config_manager = ConfigurationManager(db_client=sqlite_db_client_update)

            # Store the initial mapping, then replace it under the same ID
            # with a different address.
            config_manager.add_protocol_mapping(
                self._modbus_tcp_mapping("update_test", "120")
            )
            updated_mapping = self._modbus_tcp_mapping("update_test", "220")
            assert config_manager.update_protocol_mapping("update_test", updated_mapping)

            # A second manager on the same client only sees what was persisted.
            config_manager2 = ConfigurationManager(db_client=sqlite_db_client_update)
            mappings = config_manager2.get_protocol_mappings()
            assert len(mappings) == 1
            assert mappings[0].protocol_address == "220"
        finally:
            # Disconnect even when an assertion above fails.
            await sqlite_db_client_update.disconnect()

    @pytest.mark.asyncio
    async def test_delete_mapping_with_database_persistence(self, sqlite_db_client_delete):
        """A deleted mapping is absent when a second manager loads."""
        await sqlite_db_client_delete.connect()
        try:
            sqlite_db_client_delete.create_tables()

            config_manager = ConfigurationManager(db_client=sqlite_db_client_delete)

            config_manager.add_protocol_mapping(
                self._modbus_tcp_mapping("delete_test", "130")
            )
            assert config_manager.delete_protocol_mapping("delete_test")

            # A second manager on the same client must find nothing to load.
            config_manager2 = ConfigurationManager(db_client=sqlite_db_client_delete)
            mappings = config_manager2.get_protocol_mappings()
            assert len(mappings) == 0
        finally:
            # Disconnect even when an assertion above fails.
            await sqlite_db_client_delete.disconnect()
|