"""
Tests for ConfigurationManager with Protocol Mapping functionality
"""

import asyncio
from unittest.mock import Mock, patch, AsyncMock

import pytest

from src.dashboard.configuration_manager import (
    ConfigurationManager,
    ProtocolMapping,
    ProtocolType,
    SCADAProtocolConfig
)
from src.database.flexible_client import FlexibleDatabaseClient


def _sqlite_client(db_dir, filename):
    """Build a FlexibleDatabaseClient backed by a SQLite file inside *db_dir*.

    The file lives in a caller-supplied directory (pytest's ``tmp_path`` in
    this module) instead of the current working directory, so rows written by
    one test run cannot leak into the next one.

    NOTE(review): assumes FlexibleDatabaseClient accepts SQLAlchemy-style
    ``sqlite:///<path>`` URLs with an absolute path -- confirm.
    """
    return FlexibleDatabaseClient(f"sqlite:///{db_dir / filename}")


class TestConfigurationManager:
    """Test ConfigurationManager protocol mapping functionality"""

    @pytest.fixture
    def mock_db_client(self):
        """Create mock database client"""
        mock_client = Mock(spec=FlexibleDatabaseClient)
        # No stored mappings, and every write reports success.
        mock_client.execute_query.return_value = []
        mock_client.execute.return_value = 1
        return mock_client

    @pytest.fixture
    def config_manager_no_db(self):
        """Create ConfigurationManager without database"""
        return ConfigurationManager()

    @pytest.fixture
    def config_manager_with_db(self, mock_db_client):
        """Create ConfigurationManager with database client"""
        return ConfigurationManager(db_client=mock_db_client)

    @pytest.fixture
    def sample_mappings(self):
        """Create sample protocol mappings for testing (one per protocol type)"""
        return [
            ProtocolMapping(
                id="modbus_tcp_001",
                station_id="station_001",
                pump_id="pump_001",
                protocol_type=ProtocolType.MODBUS_TCP,
                protocol_address="100",
                data_type="setpoint",
                db_source="frequency_hz"
            ),
            ProtocolMapping(
                id="opcua_001",
                station_id="station_001",
                pump_id="pump_001",
                protocol_type=ProtocolType.OPC_UA,
                protocol_address="ns=2;s=Station_001.Pump_001.Setpoint_Hz",
                data_type="setpoint",
                db_source="frequency_hz"
            ),
            ProtocolMapping(
                id="modbus_rtu_001",
                station_id="station_002",
                pump_id="pump_001",
                protocol_type=ProtocolType.MODBUS_RTU,
                protocol_address="40001",
                data_type="setpoint",
                db_source="frequency_hz"
            ),
            ProtocolMapping(
                id="rest_api_001",
                station_id="station_001",
                pump_id="pump_002",
                protocol_type=ProtocolType.REST_API,
                protocol_address="https://api.example.com/v1/stations/001/pumps/002/setpoint",
                data_type="setpoint",
                db_source="frequency_hz"
            )
        ]

    def test_initialization_no_database(self, config_manager_no_db):
        """Test ConfigurationManager initialization without database"""
        assert config_manager_no_db.protocol_mappings == []
        assert config_manager_no_db.db_client is None

    def test_initialization_with_database(self, config_manager_with_db, mock_db_client):
        """Test ConfigurationManager initialization with database"""
        assert config_manager_with_db.db_client == mock_db_client
        mock_db_client.execute_query.assert_called_once()

    def test_add_protocol_mapping_no_db(self, config_manager_no_db, sample_mappings):
        """Test adding protocol mapping without database"""
        mapping = sample_mappings[0]

        result = config_manager_no_db.add_protocol_mapping(mapping)

        assert result == True
        assert len(config_manager_no_db.protocol_mappings) == 1
        assert config_manager_no_db.protocol_mappings[0] == mapping

    def test_add_protocol_mapping_with_db(self, config_manager_with_db, sample_mappings, mock_db_client):
        """Test adding protocol mapping with database"""
        mapping = sample_mappings[0]

        result = config_manager_with_db.add_protocol_mapping(mapping)

        assert result == True
        assert len(config_manager_with_db.protocol_mappings) == 1
        mock_db_client.execute.assert_called_once()

    def test_add_duplicate_protocol_mapping(self, config_manager_no_db, sample_mappings):
        """Test adding duplicate protocol mapping"""
        mapping = sample_mappings[0]

        # Add first mapping
        result1 = config_manager_no_db.add_protocol_mapping(mapping)
        assert result1 == True

        # Try to add duplicate
        result2 = config_manager_no_db.add_protocol_mapping(mapping)
        assert result2 == False
        assert len(config_manager_no_db.protocol_mappings) == 1

    def test_get_protocol_mappings_empty(self, config_manager_no_db):
        """Test getting protocol mappings when empty"""
        mappings = config_manager_no_db.get_protocol_mappings()
        assert mappings == []

    def test_get_protocol_mappings_with_data(self, config_manager_no_db, sample_mappings):
        """Test getting protocol mappings with data"""
        for mapping in sample_mappings:
            config_manager_no_db.add_protocol_mapping(mapping)

        mappings = config_manager_no_db.get_protocol_mappings()
        assert len(mappings) == 4

    def test_get_protocol_mappings_with_filter(self, config_manager_no_db, sample_mappings):
        """Test getting protocol mappings with protocol type filter"""
        for mapping in sample_mappings:
            config_manager_no_db.add_protocol_mapping(mapping)

        # Filter by Modbus TCP
        modbus_mappings = config_manager_no_db.get_protocol_mappings(
            protocol_type=ProtocolType.MODBUS_TCP
        )
        assert len(modbus_mappings) == 1
        assert modbus_mappings[0].protocol_type == ProtocolType.MODBUS_TCP

        # Filter by OPC UA
        opcua_mappings = config_manager_no_db.get_protocol_mappings(
            protocol_type=ProtocolType.OPC_UA
        )
        assert len(opcua_mappings) == 1
        assert opcua_mappings[0].protocol_type == ProtocolType.OPC_UA

    def test_update_protocol_mapping(self, config_manager_no_db, sample_mappings):
        """Test updating protocol mapping"""
        mapping = sample_mappings[0]
        config_manager_no_db.add_protocol_mapping(mapping)

        # Create updated mapping
        updated_mapping = ProtocolMapping(
            id="modbus_tcp_001",
            station_id="station_001",
            pump_id="pump_001",
            protocol_type=ProtocolType.MODBUS_TCP,
            protocol_address="200",  # Updated address
            data_type="setpoint",
            db_source="frequency_hz"
        )

        result = config_manager_no_db.update_protocol_mapping("modbus_tcp_001", updated_mapping)

        assert result == True
        updated = config_manager_no_db.get_protocol_mappings()[0]
        assert updated.protocol_address == "200"

    def test_update_nonexistent_mapping(self, config_manager_no_db, sample_mappings):
        """Test updating non-existent protocol mapping"""
        mapping = sample_mappings[0]

        result = config_manager_no_db.update_protocol_mapping("nonexistent", mapping)

        assert result == False

    def test_delete_protocol_mapping(self, config_manager_no_db, sample_mappings):
        """Test deleting protocol mapping"""
        mapping = sample_mappings[0]
        config_manager_no_db.add_protocol_mapping(mapping)

        result = config_manager_no_db.delete_protocol_mapping("modbus_tcp_001")

        assert result == True
        assert len(config_manager_no_db.protocol_mappings) == 0

    def test_delete_nonexistent_mapping(self, config_manager_no_db):
        """Test deleting non-existent protocol mapping"""
        result = config_manager_no_db.delete_protocol_mapping("nonexistent")

        assert result == False

    def test_validate_protocol_mapping_valid(self, config_manager_no_db, sample_mappings):
        """Test validating valid protocol mapping"""
        mapping = sample_mappings[0]

        result = config_manager_no_db.validate_protocol_mapping(mapping)

        assert result["valid"] == True
        assert len(result["errors"]) == 0

    def test_validate_protocol_mapping_duplicate_id(self, config_manager_no_db, sample_mappings):
        """Test validating protocol mapping with duplicate ID"""
        mapping = sample_mappings[0]
        config_manager_no_db.add_protocol_mapping(mapping)

        # Try to validate another mapping with same ID
        duplicate_mapping = ProtocolMapping(
            id="modbus_tcp_001",  # Same ID
            station_id="station_002",
            pump_id="pump_002",
            protocol_type=ProtocolType.MODBUS_TCP,
            protocol_address="101",
            data_type="setpoint",
            db_source="frequency_hz"
        )

        result = config_manager_no_db.validate_protocol_mapping(duplicate_mapping)

        assert result["valid"] == False
        assert len(result["errors"]) > 0
        assert "already exists" in result["errors"][0]

    def test_validate_protocol_mapping_modbus_address_conflict(self, config_manager_no_db, sample_mappings):
        """Test validating protocol mapping with Modbus address conflict"""
        mapping1 = sample_mappings[0]
        config_manager_no_db.add_protocol_mapping(mapping1)

        # Try to validate another mapping with same Modbus address
        conflict_mapping = ProtocolMapping(
            id="modbus_tcp_002",
            station_id="station_002",
            pump_id="pump_002",
            protocol_type=ProtocolType.MODBUS_TCP,
            protocol_address="100",  # Same address
            data_type="setpoint",
            db_source="frequency_hz"
        )

        result = config_manager_no_db.validate_protocol_mapping(conflict_mapping)

        assert result["valid"] == False
        assert len(result["errors"]) > 0
        assert "already used" in result["errors"][0]

    # The two placeholders below are reported as skipped rather than silently
    # passing, so the test report reflects that nothing is being verified.
    @pytest.mark.skip(reason="Pydantic validation prevents invalid addresses")
    def test_validate_protocol_mapping_modbus_invalid_address(self, config_manager_no_db):
        """Test validating protocol mapping with invalid Modbus address"""

    @pytest.mark.skip(reason="Pydantic validation prevents invalid URLs")
    def test_validate_protocol_mapping_rest_invalid_url(self, config_manager_no_db):
        """Test validating protocol mapping with invalid REST URL"""


class TestConfigurationManagerDatabaseIntegration:
    """Test ConfigurationManager database integration

    Every test works against its own SQLite file created under pytest's
    per-test ``tmp_path`` directory, so each run starts from an empty
    database, and each test releases its connection in a ``finally`` block
    even when an assertion fails.
    """

    @pytest.fixture
    def sqlite_db_client_load(self, tmp_path):
        """Create SQLite database client for loading test"""
        return _sqlite_client(tmp_path, 'test_config_manager_load.db')

    @pytest.fixture
    def sqlite_db_client_add(self, tmp_path):
        """Create SQLite database client for add test"""
        return _sqlite_client(tmp_path, 'test_config_manager_add.db')

    @pytest.fixture
    def sqlite_db_client_update(self, tmp_path):
        """Create SQLite database client for update test"""
        return _sqlite_client(tmp_path, 'test_config_manager_update.db')

    @pytest.fixture
    def sqlite_db_client_delete(self, tmp_path):
        """Create SQLite database client for delete test"""
        return _sqlite_client(tmp_path, 'test_config_manager_delete.db')

    @pytest.mark.asyncio
    async def test_load_mappings_from_database(self, sqlite_db_client_load):
        """Test loading mappings from database"""
        await sqlite_db_client_load.connect()
        try:
            sqlite_db_client_load.create_tables()

            # Add a mapping directly to database
            query = """
                INSERT INTO protocol_mappings
                (mapping_id, station_id, pump_id, protocol_type, protocol_address, data_type, db_source, created_by, enabled)
                VALUES (:mapping_id, :station_id, :pump_id, :protocol_type, :protocol_address, :data_type, :db_source, :created_by, :enabled)
            """
            params = {
                'mapping_id': 'db_test_mapping',
                'station_id': 'station_001',
                'pump_id': 'pump_001',
                'protocol_type': 'modbus_tcp',
                'protocol_address': '100',
                'data_type': 'setpoint',
                'db_source': 'frequency_hz',
                'created_by': 'test',
                'enabled': True
            }
            sqlite_db_client_load.execute(query, params)

            # Create ConfigurationManager that should load from database
            config_manager = ConfigurationManager(db_client=sqlite_db_client_load)

            # Check that mapping was loaded
            mappings = config_manager.get_protocol_mappings()
            assert len(mappings) == 1
            assert mappings[0].id == 'db_test_mapping'
        finally:
            await sqlite_db_client_load.disconnect()

    @pytest.mark.asyncio
    async def test_add_mapping_with_database_persistence(self, sqlite_db_client_add):
        """Test that adding mapping persists to database"""
        await sqlite_db_client_add.connect()
        try:
            sqlite_db_client_add.create_tables()

            config_manager = ConfigurationManager(db_client=sqlite_db_client_add)

            # Add a mapping
            mapping = ProtocolMapping(
                id="persistence_test",
                station_id="station_001",
                pump_id="pump_001",
                protocol_type=ProtocolType.MODBUS_TCP,
                protocol_address="110",  # Different address to avoid conflict
                data_type="setpoint",
                db_source="frequency_hz"
            )

            result = config_manager.add_protocol_mapping(mapping)
            assert result == True

            # Create new ConfigurationManager to verify persistence
            config_manager2 = ConfigurationManager(db_client=sqlite_db_client_add)
            mappings = config_manager2.get_protocol_mappings()
            assert len(mappings) == 1
            assert mappings[0].id == "persistence_test"
        finally:
            await sqlite_db_client_add.disconnect()

    @pytest.mark.asyncio
    async def test_update_mapping_with_database_persistence(self, sqlite_db_client_update):
        """Test that updating mapping persists to database"""
        await sqlite_db_client_update.connect()
        try:
            sqlite_db_client_update.create_tables()

            config_manager = ConfigurationManager(db_client=sqlite_db_client_update)

            # Add initial mapping
            mapping = ProtocolMapping(
                id="update_test",
                station_id="station_001",
                pump_id="pump_001",
                protocol_type=ProtocolType.MODBUS_TCP,
                protocol_address="120",  # Different address to avoid conflict
                data_type="setpoint",
                db_source="frequency_hz"
            )
            config_manager.add_protocol_mapping(mapping)

            # Update the mapping
            updated_mapping = ProtocolMapping(
                id="update_test",
                station_id="station_001",
                pump_id="pump_001",
                protocol_type=ProtocolType.MODBUS_TCP,
                protocol_address="220",  # Updated address
                data_type="setpoint",
                db_source="frequency_hz"
            )

            result = config_manager.update_protocol_mapping("update_test", updated_mapping)
            assert result == True

            # Create new ConfigurationManager to verify persistence
            config_manager2 = ConfigurationManager(db_client=sqlite_db_client_update)
            mappings = config_manager2.get_protocol_mappings()
            assert len(mappings) == 1
            assert mappings[0].protocol_address == "220"
        finally:
            await sqlite_db_client_update.disconnect()

    @pytest.mark.asyncio
    async def test_delete_mapping_with_database_persistence(self, sqlite_db_client_delete):
        """Test that deleting mapping persists to database"""
        await sqlite_db_client_delete.connect()
        try:
            sqlite_db_client_delete.create_tables()

            config_manager = ConfigurationManager(db_client=sqlite_db_client_delete)

            # Add a mapping
            mapping = ProtocolMapping(
                id="delete_test",
                station_id="station_001",
                pump_id="pump_001",
                protocol_type=ProtocolType.MODBUS_TCP,
                protocol_address="130",  # Different address to avoid conflict
                data_type="setpoint",
                db_source="frequency_hz"
            )
            config_manager.add_protocol_mapping(mapping)

            # Delete the mapping
            result = config_manager.delete_protocol_mapping("delete_test")
            assert result == True

            # Create new ConfigurationManager to verify deletion
            config_manager2 = ConfigurationManager(db_client=sqlite_db_client_delete)
            mappings = config_manager2.get_protocol_mappings()
            assert len(mappings) == 0
        finally:
            await sqlite_db_client_delete.disconnect()