183 lines
6.2 KiB
Python
183 lines
6.2 KiB
Python
|
|
"""
|
||
|
|
Unit tests for DatabaseWatchdog.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
from unittest.mock import Mock, AsyncMock, patch
|
||
|
|
from datetime import datetime, timedelta
|
||
|
|
|
||
|
|
from src.monitoring.watchdog import DatabaseWatchdog
|
||
|
|
|
||
|
|
|
||
|
|
class TestDatabaseWatchdog:
    """Test cases for DatabaseWatchdog.

    Each test runs against a fresh watchdog that ``setup_method`` builds
    around a ``Mock`` database client, so no real database is involved.
    """

    @staticmethod
    def _plan_row(updated_at, station_id='STATION_001', pump_id='PUMP_001'):
        """Return one plan row as served by the mocked ``get_latest_pump_plans``.

        Args:
            updated_at: Value stored under ``'plan_updated_at'``.
            station_id: Value stored under ``'station_id'``.
            pump_id: Value stored under ``'pump_id'``.
        """
        return {
            'station_id': station_id,
            'pump_id': pump_id,
            'plan_updated_at': updated_at,
        }

    def setup_method(self):
        """Set up test fixtures."""
        self.mock_db_client = Mock()
        self.mock_db_client.execute = Mock()
        self.mock_db_client.get_latest_pump_plans = Mock()
        self.mock_db_client.get_pump = Mock()

        self.watchdog = DatabaseWatchdog(self.mock_db_client, timeout_seconds=300)  # 5 minutes for testing

    @pytest.mark.asyncio
    async def test_start_stop(self):
        """Test starting and stopping the watchdog."""
        # Act
        await self.watchdog.start()

        try:
            # Assert
            assert self.watchdog.running is True
        finally:
            # Act - always stop, so a failed assertion above cannot leave
            # a started watchdog behind for the following tests.
            await self.watchdog.stop()

        # Assert
        assert self.watchdog.running is False

    @pytest.mark.asyncio
    async def test_check_updates_fresh_plans(self):
        """Test checking updates with fresh plans."""
        # Arrange - plan updated 1 minute ago (timeout is 300 s)
        recent_time = datetime.now() - timedelta(minutes=1)
        self.mock_db_client.get_latest_pump_plans.return_value = [
            self._plan_row(recent_time),
        ]

        # Act
        await self.watchdog._check_updates()

        # Assert
        assert self.watchdog.is_failsafe_active('STATION_001', 'PUMP_001') is False
        assert self.mock_db_client.get_latest_pump_plans.called

    @pytest.mark.asyncio
    async def test_check_updates_stale_plans(self):
        """Test checking updates with stale plans."""
        # Arrange
        stale_time = datetime.now() - timedelta(minutes=10)  # 10 minutes old
        self.mock_db_client.get_latest_pump_plans.return_value = [
            self._plan_row(stale_time),
        ]
        self.mock_db_client.get_pump.return_value = {
            'default_setpoint_hz': 30.0,
        }

        # Act
        await self.watchdog._check_updates()

        # Assert
        assert self.watchdog.is_failsafe_active('STATION_001', 'PUMP_001') is True
        assert self.mock_db_client.execute.called  # Should record failsafe event

    @pytest.mark.asyncio
    async def test_check_updates_no_plans(self):
        """Test checking updates when no plans exist."""
        # Arrange
        self.mock_db_client.get_latest_pump_plans.return_value = []

        # Act
        await self.watchdog._check_updates()

        # Assert
        assert self.mock_db_client.get_latest_pump_plans.called
        # Should not trigger failsafe immediately: no pump may be flagged.
        assert not any(self.watchdog.failsafe_active.values())

    @pytest.mark.asyncio
    async def test_activate_failsafe(self):
        """Test activating failsafe mode."""
        # Arrange
        self.mock_db_client.get_pump.return_value = {
            'default_setpoint_hz': 30.0,
        }

        # Act
        # NOTE(review): the third argument (350) is presumably the seconds
        # elapsed since the last update - confirm against
        # DatabaseWatchdog._activate_failsafe.
        await self.watchdog._activate_failsafe('STATION_001', 'PUMP_001', 350)

        # Assert
        assert self.watchdog.is_failsafe_active('STATION_001', 'PUMP_001') is True
        assert self.mock_db_client.execute.called

    @pytest.mark.asyncio
    async def test_deactivate_failsafe(self):
        """Test deactivating failsafe mode."""
        # Arrange
        await self.watchdog._activate_failsafe('STATION_001', 'PUMP_001', 350)

        # Act
        await self.watchdog._deactivate_failsafe('STATION_001', 'PUMP_001')

        # Assert
        assert self.watchdog.is_failsafe_active('STATION_001', 'PUMP_001') is False
        assert self.mock_db_client.execute.call_count == 2  # Activation + deactivation

    @pytest.mark.asyncio
    async def test_failsafe_recovery(self):
        """Test failsafe recovery when updates resume."""
        # Arrange - First check with stale plans (trigger failsafe)
        stale_time = datetime.now() - timedelta(minutes=10)
        self.mock_db_client.get_latest_pump_plans.return_value = [
            self._plan_row(stale_time),
        ]
        self.mock_db_client.get_pump.return_value = {'default_setpoint_hz': 30.0}

        await self.watchdog._check_updates()
        assert self.watchdog.is_failsafe_active('STATION_001', 'PUMP_001') is True

        # Arrange - Second check with fresh plans (should recover)
        recent_time = datetime.now() - timedelta(minutes=1)
        self.mock_db_client.get_latest_pump_plans.return_value = [
            self._plan_row(recent_time),
        ]

        # Act
        await self.watchdog._check_updates()

        # Assert
        assert self.watchdog.is_failsafe_active('STATION_001', 'PUMP_001') is False

    def test_get_last_update_time(self):
        """Test getting last update time."""
        # Arrange - seed the watchdog's bookkeeping directly
        test_time = datetime.now()
        self.watchdog.last_update_times[('STATION_001', 'PUMP_001')] = test_time

        # Act
        result = self.watchdog.get_last_update_time('STATION_001', 'PUMP_001')

        # Assert
        assert result == test_time

    def test_get_status(self):
        """Test getting watchdog status."""
        # Arrange - one known pump, not in failsafe
        test_time = datetime.now() - timedelta(minutes=2)
        self.watchdog.last_update_times[('STATION_001', 'PUMP_001')] = test_time
        self.watchdog.failsafe_active[('STATION_001', 'PUMP_001')] = False

        # Act
        status = self.watchdog.get_status()

        # Assert
        assert status['running'] is False  # Not started yet
        assert status['timeout_seconds'] == 300
        assert status['monitored_pumps'] == 1
        assert status['failsafe_active_pumps'] == 0
        assert 'STATION_001_PUMP_001' in status['pump_status']
|