# CalejoControl/tests/unit/test_watchdog.py (184 lines, 6.2 KiB, Python)

"""
Unit tests for DatabaseWatchdog.
"""
import pytest
from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime, timedelta
from src.monitoring.watchdog import DatabaseWatchdog
class TestDatabaseWatchdog:
    """Test cases for DatabaseWatchdog.

    Each test runs against a watchdog built in ``setup_method`` from a mocked
    database client and a mocked alert manager.
    """

    # Identifiers of the single pump used throughout these tests.
    STATION_ID = 'STATION_001'
    PUMP_ID = 'PUMP_001'
    # Staleness threshold handed to the watchdog (5 minutes for testing).
    TIMEOUT_SECONDS = 300

    def setup_method(self) -> None:
        """Set up test fixtures: mocked collaborators and a fresh watchdog."""
        # NOTE(review): the client methods are plain ``Mock`` objects, so the
        # watchdog is presumed to call them synchronously. If it awaits them,
        # these need to be ``AsyncMock`` instead - confirm against the
        # implementation.
        self.mock_db_client = Mock()
        self.mock_db_client.execute = Mock()
        self.mock_db_client.get_latest_pump_plans = Mock()
        self.mock_db_client.get_pump = Mock()
        self.mock_alert_manager = Mock()
        self.watchdog = DatabaseWatchdog(
            self.mock_db_client,
            self.mock_alert_manager,
            timeout_seconds=self.TIMEOUT_SECONDS,
        )

    def _stub_latest_plan(self, age: timedelta) -> None:
        """Make the mocked client report one plan for the test pump.

        Args:
            age: How long before "now" the plan's ``plan_updated_at`` is set.
        """
        # NOTE(review): timestamps are naive local times; this assumes the
        # watchdog compares against a naive ``datetime.now()`` - confirm.
        self.mock_db_client.get_latest_pump_plans.return_value = [
            {
                'station_id': self.STATION_ID,
                'pump_id': self.PUMP_ID,
                'plan_updated_at': datetime.now() - age,
            }
        ]

    @pytest.mark.asyncio
    async def test_start_stop(self) -> None:
        """Test starting and stopping the watchdog."""
        # Act
        await self.watchdog.start()
        try:
            # Assert
            assert self.watchdog.running is True
        finally:
            # Always stop, so a failed assertion does not leave the watchdog
            # running for the remainder of the test session.
            await self.watchdog.stop()

        # Assert
        assert self.watchdog.running is False

    @pytest.mark.asyncio
    async def test_check_updates_fresh_plans(self) -> None:
        """Test checking updates with fresh plans."""
        # Arrange - plan updated 1 minute ago, well inside the timeout
        self._stub_latest_plan(timedelta(minutes=1))

        # Act
        await self.watchdog._check_updates()

        # Assert
        assert self.watchdog.is_failsafe_active(self.STATION_ID, self.PUMP_ID) is False
        self.mock_db_client.get_latest_pump_plans.assert_called()

    @pytest.mark.asyncio
    async def test_check_updates_stale_plans(self) -> None:
        """Test checking updates with stale plans."""
        # Arrange - plan is 10 minutes old, beyond the 5 minute timeout
        self._stub_latest_plan(timedelta(minutes=10))
        self.mock_db_client.get_pump.return_value = {
            'default_setpoint_hz': 30.0
        }

        # Act
        await self.watchdog._check_updates()

        # Assert
        assert self.watchdog.is_failsafe_active(self.STATION_ID, self.PUMP_ID) is True
        # Should record failsafe event
        self.mock_db_client.execute.assert_called()

    @pytest.mark.asyncio
    async def test_check_updates_no_plans(self) -> None:
        """Test checking updates when no plans exist."""
        # Arrange
        self.mock_db_client.get_latest_pump_plans.return_value = []

        # Act
        await self.watchdog._check_updates()

        # Assert
        self.mock_db_client.get_latest_pump_plans.assert_called()
        # Should not trigger failsafe immediately
        assert not any(self.watchdog.failsafe_active.values())

    @pytest.mark.asyncio
    async def test_activate_failsafe(self) -> None:
        """Test activating failsafe mode."""
        # Arrange
        self.mock_db_client.get_pump.return_value = {
            'default_setpoint_hz': 30.0
        }

        # Act
        await self.watchdog._activate_failsafe(self.STATION_ID, self.PUMP_ID, 350)

        # Assert
        assert self.watchdog.is_failsafe_active(self.STATION_ID, self.PUMP_ID) is True
        self.mock_db_client.execute.assert_called()

    @pytest.mark.asyncio
    async def test_deactivate_failsafe(self) -> None:
        """Test deactivating failsafe mode."""
        # Arrange
        await self.watchdog._activate_failsafe(self.STATION_ID, self.PUMP_ID, 350)

        # Act
        await self.watchdog._deactivate_failsafe(self.STATION_ID, self.PUMP_ID)

        # Assert
        assert self.watchdog.is_failsafe_active(self.STATION_ID, self.PUMP_ID) is False
        # Activation + deactivation
        assert self.mock_db_client.execute.call_count == 2

    @pytest.mark.asyncio
    async def test_failsafe_recovery(self) -> None:
        """Test failsafe recovery when updates resume."""
        # Arrange - First check with stale plans (trigger failsafe)
        self._stub_latest_plan(timedelta(minutes=10))
        self.mock_db_client.get_pump.return_value = {'default_setpoint_hz': 30.0}
        await self.watchdog._check_updates()
        assert self.watchdog.is_failsafe_active(self.STATION_ID, self.PUMP_ID) is True

        # Arrange - Second check with fresh plans (should recover)
        self._stub_latest_plan(timedelta(minutes=1))

        # Act
        await self.watchdog._check_updates()

        # Assert
        assert self.watchdog.is_failsafe_active(self.STATION_ID, self.PUMP_ID) is False

    def test_get_last_update_time(self) -> None:
        """Test getting last update time."""
        # Arrange
        test_time = datetime.now()
        self.watchdog.last_update_times[(self.STATION_ID, self.PUMP_ID)] = test_time

        # Act
        result = self.watchdog.get_last_update_time(self.STATION_ID, self.PUMP_ID)

        # Assert
        assert result == test_time

    def test_get_status(self) -> None:
        """Test getting watchdog status."""
        # Arrange - one monitored pump, updated 2 minutes ago, not in failsafe
        pump_key = (self.STATION_ID, self.PUMP_ID)
        self.watchdog.last_update_times[pump_key] = datetime.now() - timedelta(minutes=2)
        self.watchdog.failsafe_active[pump_key] = False

        # Act
        status = self.watchdog.get_status()

        # Assert
        assert status['running'] is False  # Not started yet
        assert status['timeout_seconds'] == 300
        assert status['monitored_pumps'] == 1
        assert status['failsafe_active_pumps'] == 0
        assert 'STATION_001_PUMP_001' in status['pump_status']