# CalejoControl/tests/unit/test_auto_discovery.py
# (280 lines, 10 KiB, Python)

"""
Unit tests for AutoDiscovery class.
"""
import pytest
import pytest_asyncio
from unittest.mock import Mock, patch, AsyncMock
from datetime import datetime, timedelta
from typing import Dict, Any, List
from src.core.auto_discovery import AutoDiscovery
class TestAutoDiscovery:
    """Unit tests that exercise AutoDiscovery against a mocked database client."""

    @staticmethod
    def _pumps_for_two_stations():
        """Return pump data: two pumps under STATION_001 and one under STATION_002."""
        return {
            'STATION_001': [
                {'station_id': 'STATION_001', 'pump_id': 'PUMP_001', 'pump_name': 'Pump 1'},
                {'station_id': 'STATION_001', 'pump_id': 'PUMP_002', 'pump_name': 'Pump 2'},
            ],
            'STATION_002': [
                {'station_id': 'STATION_002', 'pump_id': 'PUMP_003', 'pump_name': 'Pump 3'},
            ],
        }

    @pytest_asyncio.fixture
    async def auto_discovery(self):
        """Provide an AutoDiscovery built on a Mock db client with a 5-minute refresh."""
        return AutoDiscovery(Mock(), refresh_interval_minutes=5)

    @pytest.mark.asyncio
    async def test_discover_success(self, auto_discovery):
        """discover() should expose the stations and pumps returned by the db client."""
        # Rows the mocked database client will hand back.
        station_rows = [
            {
                'station_id': 'STATION_001',
                'station_name': 'Test Station 1',
                'location': 'Test Location 1',
                'latitude': 45.4642035,
                'longitude': 9.189982,
                'timezone': 'Europe/Rome',
                'active': True,
            },
        ]
        pump_rows = [
            {
                'station_id': 'STATION_001',
                'pump_id': 'PUMP_001',
                'pump_name': 'Test Pump 1',
                'pump_type': 'SUBMERSIBLE',
                'control_type': 'DIRECT_SPEED',
                'manufacturer': 'Test Manufacturer',
                'model': 'Test Model',
                'rated_power_kw': 50.0,
                'min_speed_hz': 20.0,
                'max_speed_hz': 50.0,
                'default_setpoint_hz': 35.0,
                'control_parameters': {'speed_ramp_rate': 5.0},
                'active': True,
            },
        ]
        db = auto_discovery.db_client
        db.get_pump_stations.return_value = station_rows
        db.get_pumps.return_value = pump_rows

        await auto_discovery.discover()

        # The single station must be retrievable by its id.
        found_stations = auto_discovery.get_stations()
        assert len(found_stations) == 1
        assert 'STATION_001' in found_stations
        assert found_stations['STATION_001']['station_name'] == 'Test Station 1'

        # The single pump must be reported as well.
        found_pumps = auto_discovery.get_pumps()
        assert len(found_pumps) == 1
        assert found_pumps[0]['pump_id'] == 'PUMP_001'

        # A completed discovery records when it ran.
        assert auto_discovery.last_discovery is not None

    @pytest.mark.asyncio
    async def test_discover_failure(self, auto_discovery):
        """A db client error during discover() should propagate to the caller."""
        db = auto_discovery.db_client
        db.get_pump_stations.side_effect = Exception("Database error")

        with pytest.raises(Exception, match="Database error"):
            await auto_discovery.discover()

    @pytest.mark.asyncio
    async def test_discover_already_running(self, auto_discovery):
        """discover() should not touch the database while discovery_running is set."""
        auto_discovery.discovery_running = True

        await auto_discovery.discover()

        # Neither query may have been issued.
        db = auto_discovery.db_client
        db.get_pump_stations.assert_not_called()
        db.get_pumps.assert_not_called()

    def test_get_stations(self, auto_discovery):
        """get_stations() should return every station stored on the instance."""
        # Seed two stations directly on the instance.
        auto_discovery.pump_stations = {
            'STATION_001': {'station_id': 'STATION_001', 'station_name': 'Test Station 1'},
            'STATION_002': {'station_id': 'STATION_002', 'station_name': 'Test Station 2'},
        }

        found = auto_discovery.get_stations()

        assert len(found) == 2
        assert found['STATION_001']['station_name'] == 'Test Station 1'
        assert found['STATION_002']['station_name'] == 'Test Station 2'

    def test_get_pumps_no_filter(self, auto_discovery):
        """get_pumps() without arguments should return pumps from all stations."""
        auto_discovery.pumps = self._pumps_for_two_stations()

        found = auto_discovery.get_pumps()

        assert len(found) == 3

    def test_get_pumps_with_station_filter(self, auto_discovery):
        """get_pumps(station_id) should return only that station's pumps."""
        auto_discovery.pumps = self._pumps_for_two_stations()

        found = auto_discovery.get_pumps('STATION_001')

        assert len(found) == 2
        assert all(entry['station_id'] == 'STATION_001' for entry in found)

    def test_get_pump_success(self, auto_discovery):
        """get_pump() should return the pump matching the station and pump ids."""
        # Seed one station holding two pumps.
        auto_discovery.pumps = {
            'STATION_001': [
                {'station_id': 'STATION_001', 'pump_id': 'PUMP_001', 'pump_name': 'Pump 1'},
                {'station_id': 'STATION_001', 'pump_id': 'PUMP_002', 'pump_name': 'Pump 2'},
            ],
        }

        found = auto_discovery.get_pump('STATION_001', 'PUMP_001')

        assert found is not None
        assert found['pump_name'] == 'Pump 1'

    def test_get_pump_not_found(self, auto_discovery):
        """get_pump() should return None for a pump id the station does not have."""
        auto_discovery.pumps = {
            'STATION_001': [
                {'station_id': 'STATION_001', 'pump_id': 'PUMP_001', 'pump_name': 'Pump 1'},
            ],
        }

        found = auto_discovery.get_pump('STATION_001', 'PUMP_999')

        assert found is None

    def test_get_station_success(self, auto_discovery):
        """get_station() should return the stored record for a known station id."""
        auto_discovery.pump_stations = {
            'STATION_001': {'station_id': 'STATION_001', 'station_name': 'Test Station 1'},
        }

        found = auto_discovery.get_station('STATION_001')

        assert found is not None
        assert found['station_name'] == 'Test Station 1'

    def test_get_station_not_found(self, auto_discovery):
        """get_station() should return None for an unknown station id."""
        found = auto_discovery.get_station('STATION_999')

        assert found is None

    def test_get_discovery_status(self, auto_discovery):
        """get_discovery_status() should summarise timestamp, counts and settings."""
        auto_discovery.last_discovery = datetime(2023, 1, 1, 12, 0, 0)
        auto_discovery.pump_stations = {'STATION_001': {}}
        auto_discovery.pumps = {'STATION_001': [{}, {}]}

        summary = auto_discovery.get_discovery_status()

        assert summary['last_discovery'] == '2023-01-01T12:00:00'
        assert summary['station_count'] == 1
        assert summary['pump_count'] == 2
        assert summary['refresh_interval_minutes'] == 5
        assert summary['discovery_running'] is False

    def test_is_stale_fresh(self, auto_discovery):
        """Data 30 minutes old should not be stale under a 60-minute limit."""
        half_hour_ago = datetime.now() - timedelta(minutes=30)
        auto_discovery.last_discovery = half_hour_ago

        assert auto_discovery.is_stale(max_age_minutes=60) is False

    def test_is_stale_stale(self, auto_discovery):
        """Data 90 minutes old should be stale under a 60-minute limit."""
        ninety_minutes_ago = datetime.now() - timedelta(minutes=90)
        auto_discovery.last_discovery = ninety_minutes_ago

        assert auto_discovery.is_stale(max_age_minutes=60) is True

    def test_is_stale_no_discovery(self, auto_discovery):
        """With no recorded discovery, is_stale() should report True."""
        auto_discovery.last_discovery = None

        assert auto_discovery.is_stale() is True

    def test_validate_discovery_valid(self, auto_discovery):
        """validate_discovery() should report no issues for consistent data."""
        auto_discovery.pump_stations = {
            'STATION_001': {'station_id': 'STATION_001'},
        }
        auto_discovery.pumps = {
            'STATION_001': [
                {
                    'station_id': 'STATION_001',
                    'pump_id': 'PUMP_001',
                    'control_type': 'DIRECT_SPEED',
                    'default_setpoint_hz': 35.0,
                },
            ],
        }

        report = auto_discovery.validate_discovery()

        assert report['valid'] is True
        assert len(report['issues']) == 0

    def test_validate_discovery_invalid(self, auto_discovery):
        """validate_discovery() should flag each inconsistency in the seeded data."""
        auto_discovery.pump_stations = {
            'STATION_001': {'station_id': 'STATION_001'},
        }
        auto_discovery.pumps = {
            # This station id does not appear in pump_stations.
            'STATION_002': [
                {
                    'station_id': 'STATION_002',
                    'pump_id': 'PUMP_001',
                    'control_type': None,  # control_type is absent
                    'default_setpoint_hz': None,  # default setpoint is absent
                },
            ],
        }

        report = auto_discovery.validate_discovery()

        assert report['valid'] is False
        # Expected: unknown station + 2 missing fields + station without pumps.
        assert len(report['issues']) == 4

    @pytest.mark.asyncio
    async def test_start_periodic_discovery(self, auto_discovery):
        """start_periodic_discovery() should sleep for the interval and call discover()."""
        with patch('asyncio.sleep', new_callable=AsyncMock) as fake_sleep, \
                patch.object(auto_discovery, 'discover', new_callable=AsyncMock) as fake_discover:
            # The first sleep returns at once; any second sleep aborts the loop.
            fake_sleep.side_effect = [None, Exception("Break loop")]
            # discover() raising also terminates the otherwise endless loop.
            fake_discover.side_effect = Exception("Break loop")

            with pytest.raises(Exception, match="Break loop"):
                await auto_discovery.start_periodic_discovery()

            # Exactly one discovery attempt was made.
            fake_discover.assert_called_once()
            # 5 minutes * 60 seconds.
            fake_sleep.assert_called_with(300)