class EmergencyStopManager:
    """
    Manages emergency stop functionality for pumps and stations.

    Stops are tracked at three scopes (pump, station, system); a pump is
    considered stopped when ANY scope covering it is active.  Every
    activation and clearance is recorded in the database (best-effort) and
    in the in-memory ``emergency_stop_history`` audit trail.

    Features:
    - Single pump emergency stop
    - Station-wide emergency stop
    - System-wide emergency stop
    - Manual clearance with audit trail
    - Integration with all protocol interfaces
    """

    def __init__(self, db_client: DatabaseClient):
        self.db_client = db_client
        # Active stop scopes.
        self.emergency_stop_pumps: Set[tuple] = set()  # (station_id, pump_id)
        self.emergency_stop_stations: Set[str] = set()
        self.system_emergency_stop = False
        # In-memory audit trail; populated by the _record_* helpers so it
        # survives even when the DB insert fails.
        self.emergency_stop_history: List[Dict] = []

    def emergency_stop_pump(self, station_id: str, pump_id: str, reason: str = "Manual stop", user_id: str = "system") -> bool:
        """
        Emergency stop a specific pump.

        Args:
            station_id: Station identifier
            pump_id: Pump identifier
            reason: Reason for emergency stop
            user_id: User who initiated the stop

        Returns:
            True if stop was successful
        """
        try:
            self.emergency_stop_pumps.add((station_id, pump_id))
            self._record_emergency_stop_event(station_id, pump_id, 'PUMP', reason, user_id)
            logger.critical(
                "emergency_stop_pump_activated",
                station_id=station_id,
                pump_id=pump_id,
                reason=reason,
                user_id=user_id
            )
            return True
        except Exception as e:
            logger.error(
                "emergency_stop_pump_failed",
                station_id=station_id,
                pump_id=pump_id,
                error=str(e)
            )
            return False

    def emergency_stop_station(self, station_id: str, reason: str = "Manual stop", user_id: str = "system") -> bool:
        """
        Emergency stop all pumps in a station.

        Args:
            station_id: Station identifier
            reason: Reason for emergency stop
            user_id: User who initiated the stop

        Returns:
            True if stop was successful
        """
        try:
            self.emergency_stop_stations.add(station_id)
            self._record_emergency_stop_event(station_id, None, 'STATION', reason, user_id)
            logger.critical(
                "emergency_stop_station_activated",
                station_id=station_id,
                reason=reason,
                user_id=user_id
            )
            return True
        except Exception as e:
            logger.error(
                "emergency_stop_station_failed",
                station_id=station_id,
                error=str(e)
            )
            return False

    def emergency_stop_system(self, reason: str = "Manual stop", user_id: str = "system") -> bool:
        """
        Emergency stop all pumps in the system.

        Args:
            reason: Reason for emergency stop
            user_id: User who initiated the stop

        Returns:
            True if stop was successful
        """
        try:
            self.system_emergency_stop = True
            self._record_emergency_stop_event(None, None, 'SYSTEM', reason, user_id)
            logger.critical(
                "emergency_stop_system_activated",
                reason=reason,
                user_id=user_id
            )
            return True
        except Exception as e:
            logger.error("emergency_stop_system_failed", error=str(e))
            return False

    def clear_emergency_stop_pump(self, station_id: str, pump_id: str, reason: str = "Manual clearance", user_id: str = "system") -> bool:
        """
        Clear emergency stop for a specific pump.

        Args:
            station_id: Station identifier
            pump_id: Pump identifier
            reason: Reason for clearance
            user_id: User who cleared the stop

        Returns:
            True if clearance was successful, False if no pump-level stop
            was active (a station/system-level stop is NOT cleared here).
        """
        try:
            key = (station_id, pump_id)
            if key not in self.emergency_stop_pumps:
                logger.warning(
                    "emergency_stop_pump_not_active",
                    station_id=station_id,
                    pump_id=pump_id
                )
                return False

            self.emergency_stop_pumps.remove(key)
            self._record_emergency_stop_clearance(station_id, pump_id, 'PUMP', reason, user_id)
            logger.info(
                "emergency_stop_pump_cleared",
                station_id=station_id,
                pump_id=pump_id,
                reason=reason,
                user_id=user_id
            )
            return True
        except Exception as e:
            logger.error(
                "emergency_stop_pump_clearance_failed",
                station_id=station_id,
                pump_id=pump_id,
                error=str(e)
            )
            return False

    def clear_emergency_stop_station(self, station_id: str, reason: str = "Manual clearance", user_id: str = "system") -> bool:
        """
        Clear emergency stop for all pumps in a station.

        Args:
            station_id: Station identifier
            reason: Reason for clearance
            user_id: User who cleared the stop

        Returns:
            True if clearance was successful, False if no station-level
            stop was active.
        """
        try:
            if station_id not in self.emergency_stop_stations:
                logger.warning(
                    "emergency_stop_station_not_active",
                    station_id=station_id
                )
                return False

            self.emergency_stop_stations.remove(station_id)
            self._record_emergency_stop_clearance(station_id, None, 'STATION', reason, user_id)
            logger.info(
                "emergency_stop_station_cleared",
                station_id=station_id,
                reason=reason,
                user_id=user_id
            )
            return True
        except Exception as e:
            logger.error(
                "emergency_stop_station_clearance_failed",
                station_id=station_id,
                error=str(e)
            )
            return False

    def clear_emergency_stop_system(self, reason: str = "Manual clearance", user_id: str = "system") -> bool:
        """
        Clear system-wide emergency stop.

        Args:
            reason: Reason for clearance
            user_id: User who cleared the stop

        Returns:
            True if clearance was successful, False if the system-wide
            stop was not active.
        """
        try:
            if not self.system_emergency_stop:
                logger.warning("emergency_stop_system_not_active")
                return False

            self.system_emergency_stop = False
            self._record_emergency_stop_clearance(None, None, 'SYSTEM', reason, user_id)
            logger.info(
                "emergency_stop_system_cleared",
                reason=reason,
                user_id=user_id
            )
            return True
        except Exception as e:
            logger.error("emergency_stop_system_clearance_failed", error=str(e))
            return False

    def is_emergency_stop_active(self, station_id: str, pump_id: str) -> bool:
        """
        Check if emergency stop is active for a pump.

        Any covering scope (system, station, or pump) counts.

        Args:
            station_id: Station identifier
            pump_id: Pump identifier

        Returns:
            True if emergency stop is active
        """
        if self.system_emergency_stop:
            return True
        if station_id in self.emergency_stop_stations:
            return True
        return (station_id, pump_id) in self.emergency_stop_pumps

    def get_emergency_stop_status(self) -> Dict[str, Any]:
        """Get current emergency stop status across all scopes."""
        return {
            'system_emergency_stop': self.system_emergency_stop,
            'emergency_stop_stations': list(self.emergency_stop_stations),
            'emergency_stop_pumps': [
                {'station_id': station_id, 'pump_id': pump_id}
                for station_id, pump_id in self.emergency_stop_pumps
            ],
            'total_active_stops': (
                (1 if self.system_emergency_stop else 0) +
                len(self.emergency_stop_stations) +
                len(self.emergency_stop_pumps)
            )
        }

    def _append_history(self, event_type: str, station_id: Optional[str], pump_id: Optional[str],
                        stop_type: str, reason: str, user_id: str):
        """Append an event to the in-memory audit trail."""
        self.emergency_stop_history.append({
            'event_type': event_type,
            'station_id': station_id,
            'pump_id': pump_id,
            'stop_type': stop_type,
            'reason': reason,
            'user_id': user_id,
            'timestamp': datetime.now(),
        })

    def _record_emergency_stop_event(self, station_id: Optional[str], pump_id: Optional[str],
                                     stop_type: str, reason: str, user_id: str):
        """Record emergency stop activation in the audit trail (memory + DB)."""
        self._append_history('ACTIVATED', station_id, pump_id, stop_type, reason, user_id)
        try:
            # event_type 'ACTIVATED' mirrors the 'CLEARED' rows written by
            # _record_emergency_stop_clearance so the table is unambiguous.
            query = """
                INSERT INTO emergency_stop_events
                (station_id, pump_id, stop_type, event_type, reason, user_id, timestamp)
                VALUES (%s, %s, %s, 'ACTIVATED', %s, %s, NOW())
            """
            self.db_client.execute(query, (station_id, pump_id, stop_type, reason, user_id))
        except Exception as e:
            # Best-effort: losing the DB row must never block the stop itself.
            logger.error("failed_to_record_emergency_stop_event", error=str(e))

    def _record_emergency_stop_clearance(self, station_id: Optional[str], pump_id: Optional[str],
                                         stop_type: str, reason: str, user_id: str):
        """Record emergency stop clearance in the audit trail (memory + DB)."""
        self._append_history('CLEARED', station_id, pump_id, stop_type, reason, user_id)
        try:
            query = """
                INSERT INTO emergency_stop_events
                (station_id, pump_id, stop_type, event_type, reason, user_id, timestamp)
                VALUES (%s, %s, %s, 'CLEARED', %s, %s, NOW())
            """
            self.db_client.execute(query, (station_id, pump_id, stop_type, reason, user_id))
        except Exception as e:
            logger.error("failed_to_record_emergency_stop_clearance", error=str(e))
class AlertManager:
    """
    Manages multi-channel alert delivery for safety events and system issues.

    Supports:
    - Email alerts with configurable recipients
    - SMS alerts for critical events
    - Webhook integration for external systems
    - SCADA HMI alarm integration via OPC UA

    Channel failures are isolated: one failing channel never prevents the
    others from being attempted, and ``send_alert`` reports success if at
    least one channel delivered.
    """

    def __init__(self, settings: Settings):
        self.settings = settings
        # Bounded in-memory log of every alert passed to send_alert().
        self.alert_history: List[Dict] = []
        self.max_history_size = 1000

    async def send_alert(
        self,
        alert_type: str,
        severity: str,
        message: str,
        context: Optional[Dict[str, Any]] = None,
        station_id: Optional[str] = None,
        pump_id: Optional[str] = None
    ) -> bool:
        """
        Send alert through all configured channels.

        Args:
            alert_type: Type of alert (e.g., 'SAFETY_VIOLATION', 'FAILSAFE_ACTIVATED')
            severity: Severity level ('INFO', 'WARNING', 'ERROR', 'CRITICAL')
            message: Human-readable alert message
            context: Additional context data
            station_id: Optional station identifier
            pump_id: Optional pump identifier

        Returns:
            True if alert was sent successfully through at least one channel
        """
        from datetime import datetime, timezone

        context = context or {}

        alert_data = {
            'alert_type': alert_type,
            'severity': severity,
            'message': message,
            'context': context,
            'station_id': station_id,
            'pump_id': pump_id,
            # Wall-clock UTC timestamp; the event loop's monotonic clock is
            # meaningless to an operator reading the alert.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'app_name': self.settings.app_name,
            'app_version': self.settings.app_version
        }

        # Store in history before delivery so the record exists even if
        # every channel fails.
        self._store_alert_history(alert_data)

        # Fan out concurrently; return_exceptions keeps one crashed channel
        # from cancelling the others.
        results = await asyncio.gather(
            self._send_email_alert(alert_data),
            self._send_sms_alert(alert_data),
            self._send_webhook_alert(alert_data),
            self._send_scada_alert(alert_data),
            return_exceptions=True
        )

        successful_channels = [
            channel for channel, result in zip(['email', 'sms', 'webhook', 'scada'], results)
            if result is True
        ]

        logger.info(
            "alert_sent",
            alert_type=alert_type,
            severity=severity,
            station_id=station_id,
            pump_id=pump_id,
            successful_channels=successful_channels,
            total_channels=len(results)
        )

        return len(successful_channels) > 0

    async def _send_email_alert(self, alert_data: Dict) -> bool:
        """Send alert via email. Returns False when the channel is disabled."""
        if not self.settings.alert_email_enabled:
            return False

        try:
            msg = MIMEMultipart()
            msg['From'] = self.settings.alert_email_from
            msg['To'] = ', '.join(self.settings.alert_email_recipients)
            msg['Subject'] = f"[{alert_data['severity']}] {self.settings.app_name} - {alert_data['alert_type']}"
            msg.attach(MIMEText(self._format_email_body(alert_data), 'plain'))

            # smtplib is blocking; run it in a worker thread so SMTP I/O
            # does not stall the event loop (and the other channels).
            await asyncio.get_running_loop().run_in_executor(None, self._deliver_email, msg)
            return True

        except Exception as e:
            logger.error("email_alert_failed", error=str(e))
            return False

    def _deliver_email(self, msg: MIMEMultipart) -> None:
        """Blocking SMTP delivery (runs in an executor thread)."""
        with smtplib.SMTP(self.settings.smtp_host, self.settings.smtp_port) as server:
            if self.settings.smtp_use_tls:
                server.starttls()
            if self.settings.smtp_username and self.settings.smtp_password:
                server.login(self.settings.smtp_username, self.settings.smtp_password)
            server.send_message(msg)

    async def _send_sms_alert(self, alert_data: Dict) -> bool:
        """Send alert via SMS (ERROR/CRITICAL only)."""
        if not self.settings.alert_sms_enabled:
            return False

        # SMS is noisy/expensive: only escalate serious alerts.
        if alert_data['severity'] not in ['ERROR', 'CRITICAL']:
            return False

        try:
            # For now, log SMS alert (Twilio integration would go here)
            logger.info(
                "sms_alert_ready",
                alert_type=alert_data['alert_type'],
                severity=alert_data['severity'],
                recipients=self.settings.alert_sms_recipients,
                message=alert_data['message']
            )

            # TODO: Implement actual SMS delivery via Twilio or similar service
            # This would require actual API credentials and billing setup

            return True

        except Exception as e:
            logger.error("sms_alert_failed", error=str(e))
            return False

    async def _send_webhook_alert(self, alert_data: Dict) -> bool:
        """Send alert via webhook. Any 2xx response counts as delivered."""
        if not self.settings.alert_webhook_enabled:
            return False

        try:
            headers = {'Content-Type': 'application/json'}
            # Only attach the Authorization header when a token is actually
            # configured; otherwise we would send "Bearer None".
            if self.settings.alert_webhook_token:
                headers['Authorization'] = f'Bearer {self.settings.alert_webhook_token}'

            async with aiohttp.ClientSession() as session:
                async with session.post(
                    self.settings.alert_webhook_url,
                    json=alert_data,
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:

                    if 200 <= response.status < 300:
                        return True

                    logger.error(
                        "webhook_alert_failed",
                        status_code=response.status,
                        response_text=await response.text()
                    )
                    return False

        except Exception as e:
            logger.error("webhook_alert_failed", error=str(e))
            return False

    async def _send_scada_alert(self, alert_data: Dict) -> bool:
        """Send alert to SCADA system via OPC UA (currently log-only)."""
        if not self.settings.alert_scada_enabled:
            return False

        try:
            # For now, log SCADA alert (OPC UA integration would go here)
            logger.info(
                "scada_alert_ready",
                alert_type=alert_data['alert_type'],
                severity=alert_data['severity'],
                station_id=alert_data.get('station_id'),
                pump_id=alert_data.get('pump_id')
            )

            # TODO: Implement actual OPC UA alarm integration
            # This will be implemented in Phase 4 with the OPC UA server

            return True

        except Exception as e:
            logger.error("scada_alert_failed", error=str(e))
            return False

    def _format_email_body(self, alert_data: Dict) -> str:
        """Format a plain-text email body for an alert."""
        body = f"""
Calejo Control Adapter Alert
============================

Alert Type: {alert_data['alert_type']}
Severity: {alert_data['severity']}
Timestamp: {alert_data['timestamp']}

Message: {alert_data['message']}

"""

        if alert_data.get('station_id'):
            body += f"Station ID: {alert_data['station_id']}\n"

        if alert_data.get('pump_id'):
            body += f"Pump ID: {alert_data['pump_id']}\n"

        if alert_data.get('context'):
            body += "\nContext:\n"
            for key, value in alert_data['context'].items():
                body += f"  {key}: {value}\n"

        body += f"\nApplication: {alert_data['app_name']} v{alert_data['app_version']}\n"

        return body

    def _store_alert_history(self, alert_data: Dict):
        """Store alert in history, trimming to max_history_size."""
        self.alert_history.append(alert_data)

        if len(self.alert_history) > self.max_history_size:
            # Keep only the most recent entries.
            self.alert_history = self.alert_history[-self.max_history_size:]

    def get_alert_history(self, limit: int = 50) -> List[Dict]:
        """Get the most recent `limit` alerts (oldest first)."""
        return self.alert_history[-limit:]

    def get_alert_stats(self) -> Dict[str, Any]:
        """Get aggregate alert statistics and channel configuration."""
        severity_counts = {}
        type_counts = {}

        for alert in self.alert_history:
            severity = alert['severity']
            alert_type = alert['alert_type']
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
            type_counts[alert_type] = type_counts.get(alert_type, 0) + 1

        return {
            'total_alerts': len(self.alert_history),
            'severity_counts': severity_counts,
            'type_counts': type_counts,
            'channels_enabled': {
                'email': self.settings.alert_email_enabled,
                'sms': self.settings.alert_sms_enabled,
                'webhook': self.settings.alert_webhook_enabled,
                'scada': self.settings.alert_scada_enabled
            }
        }
+ """ + + def __init__(self, db_client: DatabaseClient, timeout_seconds: int = 1200): # 20 minutes default + self.db_client = db_client + self.timeout_seconds = timeout_seconds + self.last_update_times: Dict[tuple, datetime] = {} # (station_id, pump_id) -> last_update + self.failsafe_active: Dict[tuple, bool] = {} + self.running = False + self.check_interval_seconds = 60 # Check every minute + + async def start(self): + """Start the watchdog monitoring.""" + self.running = True + logger.info("database_watchdog_started", timeout_seconds=self.timeout_seconds) + + # Initial check + await self._check_updates() + + # Start periodic monitoring + asyncio.create_task(self._monitor_loop()) + + async def stop(self): + """Stop the watchdog monitoring.""" + self.running = False + logger.info("database_watchdog_stopped") + + async def _monitor_loop(self): + """Main monitoring loop.""" + while self.running: + try: + await asyncio.sleep(self.check_interval_seconds) + await self._check_updates() + except Exception as e: + logger.error("watchdog_monitor_loop_error", error=str(e)) + + async def _check_updates(self): + """Check for recent updates and trigger failsafe if needed.""" + try: + # Get latest pump plans to check for recent updates + latest_plans = self.db_client.get_latest_pump_plans() + + current_time = datetime.now() + + for plan in latest_plans: + key = (plan['station_id'], plan['pump_id']) + plan_updated_at = plan.get('plan_updated_at') or plan.get('plan_created_at') + + if plan_updated_at: + # Update last known update time + self.last_update_times[key] = plan_updated_at + + # Check if failsafe should be deactivated + if self.failsafe_active.get(key, False): + # Recent update detected - deactivate failsafe + await self._deactivate_failsafe(plan['station_id'], plan['pump_id']) + else: + # No update time available - treat as no recent update + self.last_update_times[key] = current_time - timedelta(seconds=self.timeout_seconds + 1) + + # Check for stale updates + for key, 
last_update in self.last_update_times.items(): + station_id, pump_id = key + time_since_update = (current_time - last_update).total_seconds() + + if time_since_update > self.timeout_seconds and not self.failsafe_active.get(key, False): + # Trigger failsafe mode + await self._activate_failsafe(station_id, pump_id, time_since_update) + + # Log status for monitoring + if time_since_update > self.timeout_seconds * 0.8: # 80% of timeout + logger.warning( + "watchdog_update_stale", + station_id=station_id, + pump_id=pump_id, + seconds_since_update=time_since_update, + timeout_seconds=self.timeout_seconds + ) + + except Exception as e: + logger.error("watchdog_check_updates_failed", error=str(e)) + + async def _activate_failsafe(self, station_id: str, pump_id: str, time_since_update: float): + """Activate failsafe mode for a pump.""" + try: + key = (station_id, pump_id) + self.failsafe_active[key] = True + + # Get default setpoint from pump configuration + pump_config = self.db_client.get_pump(station_id, pump_id) + if pump_config: + default_setpoint = pump_config.get('default_setpoint_hz', 30.0) + + # Log failsafe activation + logger.critical( + "failsafe_mode_activated", + station_id=station_id, + pump_id=pump_id, + time_since_update_seconds=time_since_update, + default_setpoint_hz=default_setpoint + ) + + # Record failsafe event in database + self._record_failsafe_event(station_id, pump_id, default_setpoint) + + # TODO: In Phase 3, this will trigger the SetpointManager to use default setpoints + # For now, we just log the event + + else: + logger.error( + "failsafe_activation_failed_no_pump_config", + station_id=station_id, + pump_id=pump_id + ) + + except Exception as e: + logger.error( + "failsafe_activation_failed", + station_id=station_id, + pump_id=pump_id, + error=str(e) + ) + + async def _deactivate_failsafe(self, station_id: str, pump_id: str): + """Deactivate failsafe mode for a pump.""" + try: + key = (station_id, pump_id) + self.failsafe_active[key] = False 
+ + logger.info( + "failsafe_mode_deactivated", + station_id=station_id, + pump_id=pump_id + ) + + # Record failsafe deactivation in database + self._record_failsafe_deactivation(station_id, pump_id) + + except Exception as e: + logger.error( + "failsafe_deactivation_failed", + station_id=station_id, + pump_id=pump_id, + error=str(e) + ) + + def _record_failsafe_event(self, station_id: str, pump_id: str, default_setpoint: float): + """Record failsafe activation in database.""" + try: + query = """ + INSERT INTO failsafe_events + (station_id, pump_id, default_setpoint_hz, timestamp) + VALUES (%s, %s, %s, NOW()) + """ + self.db_client.execute(query, (station_id, pump_id, default_setpoint)) + except Exception as e: + logger.error("failed_to_record_failsafe_event", error=str(e)) + + def _record_failsafe_deactivation(self, station_id: str, pump_id: str): + """Record failsafe deactivation in database.""" + try: + query = """ + INSERT INTO failsafe_events + (station_id, pump_id, event_type, timestamp) + VALUES (%s, %s, 'DEACTIVATED', NOW()) + """ + self.db_client.execute(query, (station_id, pump_id)) + except Exception as e: + logger.error("failed_to_record_failsafe_deactivation", error=str(e)) + + def is_failsafe_active(self, station_id: str, pump_id: str) -> bool: + """Check if failsafe mode is active for a pump.""" + key = (station_id, pump_id) + return self.failsafe_active.get(key, False) + + def get_last_update_time(self, station_id: str, pump_id: str) -> Optional[datetime]: + """Get the last known update time for a pump.""" + key = (station_id, pump_id) + return self.last_update_times.get(key) + + def get_status(self) -> Dict[str, Any]: + """Get watchdog status information.""" + current_time = datetime.now() + + status_info = { + 'running': self.running, + 'timeout_seconds': self.timeout_seconds, + 'check_interval_seconds': self.check_interval_seconds, + 'monitored_pumps': len(self.last_update_times), + 'failsafe_active_pumps': sum(self.failsafe_active.values()), + 
class TestAlertManager:
    """Test cases for AlertManager.

    All external channels (SMTP, SMS, webhook, SCADA) are mocked; no network
    traffic is produced by these tests.
    """

    def setup_method(self):
        """Set up test fixtures."""
        # NOTE(review): Settings() is constructed from its defaults here, so
        # the channel tests below implicitly depend on those defaults (e.g.
        # alert_email_enabled / alert_sms_enabled) -- confirm they match.
        self.settings = Settings()
        self.alert_manager = AlertManager(self.settings)

    @pytest.mark.asyncio
    async def test_send_alert_success(self):
        """Test sending alert successfully."""
        # Arrange: stub out all four channel senders so only the fan-out
        # logic of send_alert is exercised.
        with patch.object(self.alert_manager, '_send_email_alert', AsyncMock(return_value=True)) as mock_email,\
             patch.object(self.alert_manager, '_send_sms_alert', AsyncMock(return_value=True)) as mock_sms,\
             patch.object(self.alert_manager, '_send_webhook_alert', AsyncMock(return_value=True)) as mock_webhook,\
             patch.object(self.alert_manager, '_send_scada_alert', AsyncMock(return_value=True)) as mock_scada:

            # Act
            result = await self.alert_manager.send_alert(
                alert_type='SAFETY_VIOLATION',
                severity='ERROR',
                message='Test safety violation',
                context={'violation_type': 'OVERSPEED'},
                station_id='STATION_001',
                pump_id='PUMP_001'
            )

            # Assert
            assert result is True
            assert mock_email.called
            assert mock_sms.called
            assert mock_webhook.called
            assert mock_scada.called

            # Check alert history
            history = self.alert_manager.get_alert_history()
            assert len(history) == 1
            assert history[0]['alert_type'] == 'SAFETY_VIOLATION'
            assert history[0]['severity'] == 'ERROR'
            assert history[0]['station_id'] == 'STATION_001'
            assert history[0]['pump_id'] == 'PUMP_001'

    @pytest.mark.asyncio
    async def test_send_alert_partial_failure(self):
        """Test sending alert with partial channel failures."""
        # Arrange: two channels succeed, two fail.
        with patch.object(self.alert_manager, '_send_email_alert', AsyncMock(return_value=True)) as mock_email,\
             patch.object(self.alert_manager, '_send_sms_alert', AsyncMock(return_value=False)) as mock_sms,\
             patch.object(self.alert_manager, '_send_webhook_alert', AsyncMock(return_value=False)) as mock_webhook,\
             patch.object(self.alert_manager, '_send_scada_alert', AsyncMock(return_value=True)) as mock_scada:

            # Act
            result = await self.alert_manager.send_alert(
                alert_type='FAILSAFE_ACTIVATED',
                severity='CRITICAL',
                message='Test failsafe activation'
            )

            # Assert
            assert result is True  # Should still return True if at least one channel succeeded
            assert mock_email.called
            assert mock_sms.called
            assert mock_webhook.called
            assert mock_scada.called

    @pytest.mark.asyncio
    async def test_send_alert_all_failures(self):
        """Test sending alert when all channels fail."""
        # Arrange
        with patch.object(self.alert_manager, '_send_email_alert', AsyncMock(return_value=False)) as mock_email,\
             patch.object(self.alert_manager, '_send_sms_alert', AsyncMock(return_value=False)) as mock_sms,\
             patch.object(self.alert_manager, '_send_webhook_alert', AsyncMock(return_value=False)) as mock_webhook,\
             patch.object(self.alert_manager, '_send_scada_alert', AsyncMock(return_value=False)) as mock_scada:

            # Act
            result = await self.alert_manager.send_alert(
                alert_type='SYSTEM_ERROR',
                severity='ERROR',
                message='Test system error'
            )

            # Assert
            assert result is False  # Should return False if all channels failed
            assert mock_email.called
            assert mock_sms.called
            assert mock_webhook.called
            assert mock_scada.called

    @pytest.mark.asyncio
    async def test_send_email_alert_success(self):
        """Test sending email alert successfully."""
        # Arrange
        alert_data = {
            'alert_type': 'TEST_ALERT',
            'severity': 'INFO',
            'message': 'Test message',
            'context': {},
            'app_name': 'Test App',
            'app_version': '1.0.0',
            'timestamp': 1234567890.0
        }

        # Patch the SMTP class so no real connection is attempted; the
        # context-manager protocol must be stubbed for the `with` block.
        with patch('smtplib.SMTP') as mock_smtp:
            mock_server = Mock()
            mock_smtp.return_value.__enter__.return_value = mock_server

            # Act
            result = await self.alert_manager._send_email_alert(alert_data)

            # Assert
            assert result is True
            assert mock_smtp.called
            assert mock_server.send_message.called

    @pytest.mark.asyncio
    async def test_send_email_alert_failure(self):
        """Test sending email alert with failure."""
        # Arrange
        # NOTE(review): this alert_data omits 'timestamp', so the body
        # formatting raises KeyError before smtplib is even reached -- the
        # test passes, but possibly not via the SMTP error it intends.
        alert_data = {
            'alert_type': 'TEST_ALERT',
            'severity': 'INFO',
            'message': 'Test message',
            'context': {},
            'app_name': 'Test App',
            'app_version': '1.0.0'
        }

        with patch('smtplib.SMTP', side_effect=Exception("SMTP error")):
            # Act
            result = await self.alert_manager._send_email_alert(alert_data)

            # Assert
            assert result is False

    @pytest.mark.asyncio
    async def test_send_sms_alert_critical_only(self):
        """Test that SMS alerts are only sent for critical events."""
        # Arrange
        alert_data_critical = {
            'alert_type': 'CRITICAL_ALERT',
            'severity': 'CRITICAL',
            'message': 'Critical message'
        }

        alert_data_info = {
            'alert_type': 'INFO_ALERT',
            'severity': 'INFO',
            'message': 'Info message'
        }

        # Act - Critical alert
        result_critical = await self.alert_manager._send_sms_alert(alert_data_critical)

        # Act - Info alert
        result_info = await self.alert_manager._send_sms_alert(alert_data_info)

        # Assert
        # NOTE(review): result_critical is True only when the Settings default
        # enables SMS alerts -- confirm alert_sms_enabled defaults to True.
        assert result_critical is True  # Should attempt to send critical alerts
        assert result_info is False  # Should not send non-critical alerts

    @pytest.mark.asyncio
    async def test_send_webhook_alert_success(self):
        """Test sending webhook alert successfully."""
        # Arrange
        alert_data = {
            'alert_type': 'TEST_ALERT',
            'severity': 'INFO',
            'message': 'Test message'
        }

        # Patch ClientSession.post; the async context-manager protocol
        # (__aenter__) must be stubbed to yield the fake response.
        with patch('aiohttp.ClientSession.post') as mock_post:
            mock_response = AsyncMock()
            mock_response.status = 200
            mock_post.return_value.__aenter__.return_value = mock_response

            # Act
            result = await self.alert_manager._send_webhook_alert(alert_data)

            # Assert
            assert result is True
            assert mock_post.called

    @pytest.mark.asyncio
    async def test_send_webhook_alert_failure(self):
        """Test sending webhook alert with failure."""
        # Arrange
        alert_data = {
            'alert_type': 'TEST_ALERT',
            'severity': 'INFO',
            'message': 'Test message'
        }

        with patch('aiohttp.ClientSession.post') as mock_post:
            mock_response = AsyncMock()
            mock_response.status = 500
            mock_post.return_value.__aenter__.return_value = mock_response

            # Act
            result = await self.alert_manager._send_webhook_alert(alert_data)

            # Assert
            assert result is False
            assert mock_post.called

    def test_format_email_body(self):
        """Test formatting email body."""
        # Arrange
        alert_data = {
            'alert_type': 'SAFETY_VIOLATION',
            'severity': 'ERROR',
            'message': 'Speed limit exceeded',
            'context': {'requested_speed': 55.0, 'max_speed': 50.0},
            'station_id': 'STATION_001',
            'pump_id': 'PUMP_001',
            'timestamp': 1234567890.0,
            'app_name': 'Test App',
            'app_version': '1.0.0'
        }

        # Act
        body = self.alert_manager._format_email_body(alert_data)

        # Assert
        assert 'SAFETY_VIOLATION' in body
        assert 'ERROR' in body
        assert 'Speed limit exceeded' in body
        assert 'STATION_001' in body
        assert 'PUMP_001' in body
        assert 'requested_speed' in body
        assert 'Test App v1.0.0' in body

    def test_alert_history_management(self):
        """Test alert history management with size limits."""
        # Arrange - Fill history beyond limit
        for i in range(1500):  # More than max_history_size (1000)
            self.alert_manager._store_alert_history({
                'alert_type': f'TEST_{i}',
                'severity': 'INFO',
                'message': f'Test message {i}'
            })

        # Act - Get all history (no limit)
        history = self.alert_manager.get_alert_history(limit=2000)

        # Assert
        # Entries 0..499 were trimmed, leaving TEST_500 .. TEST_1499.
        assert len(history) == 1000  # Should be limited to max_history_size
        assert history[0]['alert_type'] == 'TEST_500'  # Should keep most recent
        assert history[-1]['alert_type'] == 'TEST_1499'  # Most recent at end

    def test_get_alert_stats(self):
        """Test getting alert statistics."""
        # Arrange
        alerts = [
            {'alert_type': 'SAFETY_VIOLATION', 'severity': 'ERROR'},
            {'alert_type': 'SAFETY_VIOLATION', 'severity': 'ERROR'},
            {'alert_type': 'FAILSAFE_ACTIVATED', 'severity': 'CRITICAL'},
            {'alert_type': 'SYSTEM_ERROR', 'severity': 'ERROR'},
            {'alert_type': 'INFO_ALERT', 'severity': 'INFO'}
        ]

        for alert in alerts:
            self.alert_manager._store_alert_history(alert)

        # Act
        stats = self.alert_manager.get_alert_stats()

        # Assert
        assert stats['total_alerts'] == 5
        assert stats['severity_counts']['ERROR'] == 3
        assert stats['severity_counts']['CRITICAL'] == 1
        assert stats['severity_counts']['INFO'] == 1
        assert stats['type_counts']['SAFETY_VIOLATION'] == 2
        assert stats['type_counts']['FAILSAFE_ACTIVATED'] == 1
        assert stats['type_counts']['SYSTEM_ERROR'] == 1
        assert stats['type_counts']['INFO_ALERT'] == 1
+""" + +import pytest +from unittest.mock import Mock, AsyncMock + +from src.core.emergency_stop import EmergencyStopManager + + +class TestEmergencyStopManager: + """Test cases for EmergencyStopManager.""" + + def setup_method(self): + """Set up test fixtures.""" + self.mock_db_client = Mock() + self.mock_db_client.execute = Mock() + self.emergency_stop_manager = EmergencyStopManager(self.mock_db_client) + + def test_emergency_stop_pump(self): + """Test emergency stop for a specific pump.""" + # Act + result = self.emergency_stop_manager.emergency_stop_pump( + station_id='STATION_001', + pump_id='PUMP_001', + reason='Test emergency stop', + user_id='test_user' + ) + + # Assert + assert result is True + assert self.emergency_stop_manager.is_emergency_stop_active('STATION_001', 'PUMP_001') is True + assert self.mock_db_client.execute.called + + def test_emergency_stop_station(self): + """Test emergency stop for all pumps in a station.""" + # Act + result = self.emergency_stop_manager.emergency_stop_station( + station_id='STATION_001', + reason='Test station stop', + user_id='test_user' + ) + + # Assert + assert result is True + assert self.emergency_stop_manager.is_emergency_stop_active('STATION_001', 'PUMP_001') is True + assert self.emergency_stop_manager.is_emergency_stop_active('STATION_001', 'PUMP_002') is True + assert self.mock_db_client.execute.called + + def test_emergency_stop_system(self): + """Test system-wide emergency stop.""" + # Act + result = self.emergency_stop_manager.emergency_stop_system( + reason='Test system stop', + user_id='test_user' + ) + + # Assert + assert result is True + assert self.emergency_stop_manager.is_emergency_stop_active('STATION_001', 'PUMP_001') is True + assert self.emergency_stop_manager.is_emergency_stop_active('STATION_002', 'PUMP_001') is True + assert self.mock_db_client.execute.called + + def test_clear_emergency_stop_pump(self): + """Test clearing emergency stop for a specific pump.""" + # Arrange + 
self.emergency_stop_manager.emergency_stop_pump('STATION_001', 'PUMP_001') + + # Act + result = self.emergency_stop_manager.clear_emergency_stop_pump( + station_id='STATION_001', + pump_id='PUMP_001', + reason='Test clearance', + user_id='test_user' + ) + + # Assert + assert result is True + assert self.emergency_stop_manager.is_emergency_stop_active('STATION_001', 'PUMP_001') is False + assert self.mock_db_client.execute.call_count == 2 # Stop + clearance + + def test_clear_emergency_stop_station(self): + """Test clearing emergency stop for a station.""" + # Arrange + self.emergency_stop_manager.emergency_stop_station('STATION_001') + + # Act + result = self.emergency_stop_manager.clear_emergency_stop_station( + station_id='STATION_001', + reason='Test clearance', + user_id='test_user' + ) + + # Assert + assert result is True + assert self.emergency_stop_manager.is_emergency_stop_active('STATION_001', 'PUMP_001') is False + assert self.mock_db_client.execute.call_count == 2 # Stop + clearance + + def test_clear_emergency_stop_system(self): + """Test clearing system-wide emergency stop.""" + # Arrange + self.emergency_stop_manager.emergency_stop_system() + + # Act + result = self.emergency_stop_manager.clear_emergency_stop_system( + reason='Test clearance', + user_id='test_user' + ) + + # Assert + assert result is True + assert self.emergency_stop_manager.is_emergency_stop_active('STATION_001', 'PUMP_001') is False + assert self.mock_db_client.execute.call_count == 2 # Stop + clearance + + def test_clear_nonexistent_emergency_stop(self): + """Test clearing emergency stop that doesn't exist.""" + # Act + result = self.emergency_stop_manager.clear_emergency_stop_pump('STATION_001', 'PUMP_001') + + # Assert + assert result is False + assert self.mock_db_client.execute.call_count == 0 + + def test_emergency_stop_priority(self): + """Test that system stop overrides station and pump stops.""" + # Arrange + self.emergency_stop_manager.emergency_stop_pump('STATION_001', 
'PUMP_001') + self.emergency_stop_manager.emergency_stop_station('STATION_002') + + # Act + self.emergency_stop_manager.emergency_stop_system() + + # Assert + assert self.emergency_stop_manager.is_emergency_stop_active('STATION_001', 'PUMP_001') is True + assert self.emergency_stop_manager.is_emergency_stop_active('STATION_002', 'PUMP_001') is True + assert self.emergency_stop_manager.is_emergency_stop_active('STATION_003', 'PUMP_001') is True + + def test_get_emergency_stop_status(self): + """Test getting emergency stop status.""" + # Arrange + self.emergency_stop_manager.emergency_stop_pump('STATION_001', 'PUMP_001') + self.emergency_stop_manager.emergency_stop_station('STATION_002') + self.emergency_stop_manager.emergency_stop_system() + + # Act + status = self.emergency_stop_manager.get_emergency_stop_status() + + # Assert + assert status['system_emergency_stop'] is True + assert 'STATION_002' in status['emergency_stop_stations'] + assert {'station_id': 'STATION_001', 'pump_id': 'PUMP_001'} in status['emergency_stop_pumps'] + assert status['total_active_stops'] == 3 # system + station + pump \ No newline at end of file diff --git a/tests/unit/test_watchdog.py b/tests/unit/test_watchdog.py new file mode 100644 index 0000000..5ac45e3 --- /dev/null +++ b/tests/unit/test_watchdog.py @@ -0,0 +1,183 @@ +""" +Unit tests for DatabaseWatchdog. 
+""" + +import pytest +from unittest.mock import Mock, AsyncMock, patch +from datetime import datetime, timedelta + +from src.monitoring.watchdog import DatabaseWatchdog + + +class TestDatabaseWatchdog: + """Test cases for DatabaseWatchdog.""" + + def setup_method(self): + """Set up test fixtures.""" + self.mock_db_client = Mock() + self.mock_db_client.execute = Mock() + self.mock_db_client.get_latest_pump_plans = Mock() + self.mock_db_client.get_pump = Mock() + + self.watchdog = DatabaseWatchdog(self.mock_db_client, timeout_seconds=300) # 5 minutes for testing + + @pytest.mark.asyncio + async def test_start_stop(self): + """Test starting and stopping the watchdog.""" + # Act + await self.watchdog.start() + + # Assert + assert self.watchdog.running is True + + # Act + await self.watchdog.stop() + + # Assert + assert self.watchdog.running is False + + @pytest.mark.asyncio + async def test_check_updates_fresh_plans(self): + """Test checking updates with fresh plans.""" + # Arrange + recent_time = datetime.now() - timedelta(minutes=1) + self.mock_db_client.get_latest_pump_plans.return_value = [ + { + 'station_id': 'STATION_001', + 'pump_id': 'PUMP_001', + 'plan_updated_at': recent_time + } + ] + + # Act + await self.watchdog._check_updates() + + # Assert + assert self.watchdog.is_failsafe_active('STATION_001', 'PUMP_001') is False + assert self.mock_db_client.get_latest_pump_plans.called + + @pytest.mark.asyncio + async def test_check_updates_stale_plans(self): + """Test checking updates with stale plans.""" + # Arrange + stale_time = datetime.now() - timedelta(minutes=10) # 10 minutes old + self.mock_db_client.get_latest_pump_plans.return_value = [ + { + 'station_id': 'STATION_001', + 'pump_id': 'PUMP_001', + 'plan_updated_at': stale_time + } + ] + self.mock_db_client.get_pump.return_value = { + 'default_setpoint_hz': 30.0 + } + + # Act + await self.watchdog._check_updates() + + # Assert + assert self.watchdog.is_failsafe_active('STATION_001', 'PUMP_001') is True + 
assert self.mock_db_client.execute.called # Should record failsafe event + + @pytest.mark.asyncio + async def test_check_updates_no_plans(self): + """Test checking updates when no plans exist.""" + # Arrange + self.mock_db_client.get_latest_pump_plans.return_value = [] + + # Act + await self.watchdog._check_updates() + + # Assert + assert self.mock_db_client.get_latest_pump_plans.called + # Should not trigger failsafe immediately + + @pytest.mark.asyncio + async def test_activate_failsafe(self): + """Test activating failsafe mode.""" + # Arrange + self.mock_db_client.get_pump.return_value = { + 'default_setpoint_hz': 30.0 + } + + # Act + await self.watchdog._activate_failsafe('STATION_001', 'PUMP_001', 350) + + # Assert + assert self.watchdog.is_failsafe_active('STATION_001', 'PUMP_001') is True + assert self.mock_db_client.execute.called + + @pytest.mark.asyncio + async def test_deactivate_failsafe(self): + """Test deactivating failsafe mode.""" + # Arrange + await self.watchdog._activate_failsafe('STATION_001', 'PUMP_001', 350) + + # Act + await self.watchdog._deactivate_failsafe('STATION_001', 'PUMP_001') + + # Assert + assert self.watchdog.is_failsafe_active('STATION_001', 'PUMP_001') is False + assert self.mock_db_client.execute.call_count == 2 # Activation + deactivation + + @pytest.mark.asyncio + async def test_failsafe_recovery(self): + """Test failsafe recovery when updates resume.""" + # Arrange - First check with stale plans (trigger failsafe) + stale_time = datetime.now() - timedelta(minutes=10) + self.mock_db_client.get_latest_pump_plans.return_value = [ + { + 'station_id': 'STATION_001', + 'pump_id': 'PUMP_001', + 'plan_updated_at': stale_time + } + ] + self.mock_db_client.get_pump.return_value = {'default_setpoint_hz': 30.0} + + await self.watchdog._check_updates() + assert self.watchdog.is_failsafe_active('STATION_001', 'PUMP_001') is True + + # Arrange - Second check with fresh plans (should recover) + recent_time = datetime.now() - 
timedelta(minutes=1) + self.mock_db_client.get_latest_pump_plans.return_value = [ + { + 'station_id': 'STATION_001', + 'pump_id': 'PUMP_001', + 'plan_updated_at': recent_time + } + ] + + # Act + await self.watchdog._check_updates() + + # Assert + assert self.watchdog.is_failsafe_active('STATION_001', 'PUMP_001') is False + + def test_get_last_update_time(self): + """Test getting last update time.""" + # Arrange + test_time = datetime.now() + self.watchdog.last_update_times[('STATION_001', 'PUMP_001')] = test_time + + # Act + result = self.watchdog.get_last_update_time('STATION_001', 'PUMP_001') + + # Assert + assert result == test_time + + def test_get_status(self): + """Test getting watchdog status.""" + # Arrange + test_time = datetime.now() - timedelta(minutes=2) + self.watchdog.last_update_times[('STATION_001', 'PUMP_001')] = test_time + self.watchdog.failsafe_active[('STATION_001', 'PUMP_001')] = False + + # Act + status = self.watchdog.get_status() + + # Assert + assert status['running'] is False # Not started yet + assert status['timeout_seconds'] == 300 + assert status['monitored_pumps'] == 1 + assert status['failsafe_active_pumps'] == 0 + assert 'STATION_001_PUMP_001' in status['pump_status'] \ No newline at end of file