"""
Unit tests for AutoDiscovery class.
"""

import pytest
import pytest_asyncio
from unittest.mock import Mock, patch, AsyncMock
from datetime import datetime, timedelta
from typing import Dict, Any, List

from src.core.auto_discovery import AutoDiscovery


class TestAutoDiscovery:
    """Test cases for AutoDiscovery.

    Every test receives a fresh ``AutoDiscovery`` built around a
    ``unittest.mock.Mock`` database client, so no real database is touched.
    """

    @pytest_asyncio.fixture
    async def auto_discovery(self):
        """Create a test auto-discovery instance.

        Returns:
            An ``AutoDiscovery`` constructed with a ``Mock`` database client
            and ``refresh_interval_minutes=5``.
        """
        mock_db_client = Mock()
        discovery = AutoDiscovery(mock_db_client, refresh_interval_minutes=5)
        return discovery

    @pytest.mark.asyncio
    async def test_discover_success(self, auto_discovery):
        """Test successful discovery.

        The mocked client returns one station and one pump; after
        ``discover()`` both must be retrievable and ``last_discovery`` set.
        """
        # Mock database responses
        mock_stations = [
            {
                'station_id': 'STATION_001',
                'station_name': 'Test Station 1',
                'location': 'Test Location 1',
                'latitude': 45.4642035,
                'longitude': 9.189982,
                'timezone': 'Europe/Rome',
                'active': True,
            }
        ]

        mock_pumps = [
            {
                'station_id': 'STATION_001',
                'pump_id': 'PUMP_001',
                'pump_name': 'Test Pump 1',
                'pump_type': 'SUBMERSIBLE',
                'control_type': 'DIRECT_SPEED',
                'manufacturer': 'Test Manufacturer',
                'model': 'Test Model',
                'rated_power_kw': 50.0,
                'min_speed_hz': 20.0,
                'max_speed_hz': 50.0,
                'default_setpoint_hz': 35.0,
                'control_parameters': {'speed_ramp_rate': 5.0},
                'active': True,
            }
        ]

        auto_discovery.db_client.get_pump_stations.return_value = mock_stations
        auto_discovery.db_client.get_pumps.return_value = mock_pumps

        await auto_discovery.discover()

        # Verify stations were discovered
        stations = auto_discovery.get_stations()
        assert len(stations) == 1
        assert 'STATION_001' in stations
        assert stations['STATION_001']['station_name'] == 'Test Station 1'

        # Verify pumps were discovered
        pumps = auto_discovery.get_pumps()
        assert len(pumps) == 1
        assert pumps[0]['pump_id'] == 'PUMP_001'

        # Verify last discovery timestamp was set
        assert auto_discovery.last_discovery is not None

    @pytest.mark.asyncio
    async def test_discover_failure(self, auto_discovery):
        """Test discovery failure.

        An exception raised by ``get_pump_stations`` is expected to
        propagate out of ``discover()``.
        """
        auto_discovery.db_client.get_pump_stations.side_effect = Exception("Database error")

        with pytest.raises(Exception, match="Database error"):
            await auto_discovery.discover()

    @pytest.mark.asyncio
    async def test_discover_already_running(self, auto_discovery):
        """Test discovery when already running.

        With ``discovery_running`` preset to ``True``, ``discover()`` must
        return without querying the database client.
        """
        auto_discovery.discovery_running = True

        await auto_discovery.discover()

        # Should not call database methods
        auto_discovery.db_client.get_pump_stations.assert_not_called()
        auto_discovery.db_client.get_pumps.assert_not_called()

    def test_get_stations(self, auto_discovery):
        """Test getting discovered stations."""
        # Set up test data
        auto_discovery.pump_stations = {
            'STATION_001': {'station_id': 'STATION_001', 'station_name': 'Test Station 1'},
            'STATION_002': {'station_id': 'STATION_002', 'station_name': 'Test Station 2'},
        }

        stations = auto_discovery.get_stations()

        assert len(stations) == 2
        assert stations['STATION_001']['station_name'] == 'Test Station 1'
        assert stations['STATION_002']['station_name'] == 'Test Station 2'

    def test_get_pumps_no_filter(self, auto_discovery):
        """Test getting all pumps.

        Without a station argument, ``get_pumps()`` must return the pumps of
        every station (2 + 1 here).
        """
        # Set up test data
        auto_discovery.pumps = {
            'STATION_001': [
                {'station_id': 'STATION_001', 'pump_id': 'PUMP_001', 'pump_name': 'Pump 1'},
                {'station_id': 'STATION_001', 'pump_id': 'PUMP_002', 'pump_name': 'Pump 2'},
            ],
            'STATION_002': [
                {'station_id': 'STATION_002', 'pump_id': 'PUMP_003', 'pump_name': 'Pump 3'},
            ],
        }

        pumps = auto_discovery.get_pumps()

        assert len(pumps) == 3

    def test_get_pumps_with_station_filter(self, auto_discovery):
        """Test getting pumps for specific station."""
        # Set up test data
        auto_discovery.pumps = {
            'STATION_001': [
                {'station_id': 'STATION_001', 'pump_id': 'PUMP_001', 'pump_name': 'Pump 1'},
                {'station_id': 'STATION_001', 'pump_id': 'PUMP_002', 'pump_name': 'Pump 2'},
            ],
            'STATION_002': [
                {'station_id': 'STATION_002', 'pump_id': 'PUMP_003', 'pump_name': 'Pump 3'},
            ],
        }

        pumps = auto_discovery.get_pumps('STATION_001')

        assert len(pumps) == 2
        assert all(pump['station_id'] == 'STATION_001' for pump in pumps)

    def test_get_pump_success(self, auto_discovery):
        """Test getting specific pump."""
        # Set up test data
        auto_discovery.pumps = {
            'STATION_001': [
                {'station_id': 'STATION_001', 'pump_id': 'PUMP_001', 'pump_name': 'Pump 1'},
                {'station_id': 'STATION_001', 'pump_id': 'PUMP_002', 'pump_name': 'Pump 2'},
            ]
        }

        pump = auto_discovery.get_pump('STATION_001', 'PUMP_001')

        assert pump is not None
        assert pump['pump_name'] == 'Pump 1'

    def test_get_pump_not_found(self, auto_discovery):
        """Test getting non-existent pump.

        An unknown pump id within a known station must yield ``None``.
        """
        auto_discovery.pumps = {
            'STATION_001': [
                {'station_id': 'STATION_001', 'pump_id': 'PUMP_001', 'pump_name': 'Pump 1'},
            ]
        }

        pump = auto_discovery.get_pump('STATION_001', 'PUMP_999')

        assert pump is None

    def test_get_station_success(self, auto_discovery):
        """Test getting specific station."""
        auto_discovery.pump_stations = {
            'STATION_001': {'station_id': 'STATION_001', 'station_name': 'Test Station 1'},
        }

        station = auto_discovery.get_station('STATION_001')

        assert station is not None
        assert station['station_name'] == 'Test Station 1'

    def test_get_station_not_found(self, auto_discovery):
        """Test getting non-existent station.

        No stations are configured here, so the lookup runs against whatever
        the freshly constructed instance holds and must yield ``None``.
        """
        station = auto_discovery.get_station('STATION_999')

        assert station is None

    def test_get_discovery_status(self, auto_discovery):
        """Test getting discovery status.

        Checks the ISO-formatted timestamp, the station and pump counts, the
        refresh interval passed by the fixture, and the running flag.
        """
        auto_discovery.last_discovery = datetime(2023, 1, 1, 12, 0, 0)
        auto_discovery.pump_stations = {'STATION_001': {}}
        auto_discovery.pumps = {'STATION_001': [{}, {}]}

        status = auto_discovery.get_discovery_status()

        assert status['last_discovery'] == '2023-01-01T12:00:00'
        assert status['station_count'] == 1
        assert status['pump_count'] == 2
        assert status['refresh_interval_minutes'] == 5
        assert status['discovery_running'] is False

    def test_is_stale_fresh(self, auto_discovery):
        """Test staleness check with fresh data.

        A discovery 30 minutes old is not stale for a 60-minute limit.
        """
        auto_discovery.last_discovery = datetime.now() - timedelta(minutes=30)

        assert auto_discovery.is_stale(max_age_minutes=60) is False

    def test_is_stale_stale(self, auto_discovery):
        """Test staleness check with stale data.

        A discovery 90 minutes old is stale for a 60-minute limit.
        """
        auto_discovery.last_discovery = datetime.now() - timedelta(minutes=90)

        assert auto_discovery.is_stale(max_age_minutes=60) is True

    def test_is_stale_no_discovery(self, auto_discovery):
        """Test staleness check with no discovery.

        With ``last_discovery`` set to ``None`` the data must count as stale.
        """
        auto_discovery.last_discovery = None

        assert auto_discovery.is_stale() is True

    def test_validate_discovery_valid(self, auto_discovery):
        """Test validation with valid discovery data.

        One known station with one fully specified pump must validate with
        an empty issue list.
        """
        auto_discovery.pump_stations = {
            'STATION_001': {'station_id': 'STATION_001'},
        }
        auto_discovery.pumps = {
            'STATION_001': [
                {
                    'station_id': 'STATION_001',
                    'pump_id': 'PUMP_001',
                    'control_type': 'DIRECT_SPEED',
                    'default_setpoint_hz': 35.0,
                }
            ]
        }

        validation = auto_discovery.validate_discovery()

        assert validation['valid'] is True
        assert len(validation['issues']) == 0

    def test_validate_discovery_invalid(self, auto_discovery):
        """Test validation with invalid discovery data.

        The pump belongs to a station that is not in ``pump_stations`` and
        has ``None`` for both ``control_type`` and ``default_setpoint_hz``.
        """
        auto_discovery.pump_stations = {
            'STATION_001': {'station_id': 'STATION_001'},
        }
        auto_discovery.pumps = {
            'STATION_002': [  # Station not in pump_stations
                {
                    'station_id': 'STATION_002',
                    'pump_id': 'PUMP_001',
                    'control_type': None,  # Missing control_type
                    'default_setpoint_hz': None,  # Missing default_setpoint
                }
            ]
        }

        validation = auto_discovery.validate_discovery()

        assert validation['valid'] is False
        # Unknown station + 2 missing fields + station without pumps
        assert len(validation['issues']) == 4

    @pytest.mark.asyncio
    async def test_start_periodic_discovery(self, auto_discovery):
        """Test starting periodic discovery.

        ``asyncio.sleep`` and ``discover`` are both replaced by ``AsyncMock``
        objects. ``discover`` raises on its first call so the periodic loop
        ends and the exception reaches the test.
        """
        with patch('asyncio.sleep', new_callable=AsyncMock) as mock_sleep:
            # Make sleep return immediately to avoid infinite loop
            mock_sleep.side_effect = [None, Exception("Break loop")]

            with patch.object(auto_discovery, 'discover', new_callable=AsyncMock) as mock_discover:
                mock_discover.side_effect = Exception("Break loop")

                with pytest.raises(Exception, match="Break loop"):
                    await auto_discovery.start_periodic_discovery()

                # These checks sit outside ``pytest.raises``: statements after
                # the raising ``await`` inside that block would never run.
                # NOTE(review): together they imply the loop sleeps for the
                # refresh interval before its first discover() call - confirm
                # against the implementation.

                # Verify discover was called
                mock_discover.assert_called_once()

                # Verify sleep was called with correct interval
                mock_sleep.assert_called_with(300)  # 5 minutes * 60 seconds