diff --git a/.coverage b/.coverage deleted file mode 100644 index 6cb80cf..0000000 Binary files a/.coverage and /dev/null differ diff --git a/README.md b/README.md index cceb416..320e8b5 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,29 @@ The Calejo Control Adapter translates optimized pump control plans from Calejo Optimize into real-time control signals for municipal wastewater pump stations. It supports diverse SCADA systems with minimal configuration through automatic discovery and multiple protocol support. +### Implementation Status + +✅ **Phase 1**: Core Infrastructure +- Database connection pooling with FlexibleDatabaseClient +- Auto-discovery of pump stations and pumps +- Safety framework with limit enforcement +- Emergency stop management +- Optimization plan management + +✅ **Phase 2**: Multi-Protocol Servers +- OPC UA server implementation +- Modbus TCP server implementation +- REST API server implementation +- Database watchdog for failsafe operation +- Alert management system + +✅ **Phase 3**: Setpoint Management +- Setpoint Manager for real-time control +- Integration with all safety components +- Unified main application + +All components are implemented and tested with 133 passing tests. + ### Key Features - **Multi-Protocol Support**: OPC UA, Modbus TCP, and REST API simultaneously @@ -29,15 +52,18 @@ The Calejo Control Adapter translates optimized pump control plans from Calejo O └─────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────┐ -│ Calejo Control Adapter (NEW - TO BE IMPLEMENTED) │ +│ Calejo Control Adapter (IMPLEMENTED) │ │ │ │ ┌────────────────────────────────────────────────┐ │ │ │ Core Components: │ │ -│ │ 1. Auto-Discovery Module │ │ -│ │ 2. Security Layer │ │ -│ │ 3. Safety Framework ⚠️ NEW │ │ -│ │ 4. Plan-to-Setpoint Logic Engine │ │ -│ │ 5. Multi-Protocol Server │ │ +│ │ 1. Auto-Discovery Module ✅ │ │ +│ │ 2. Safety Framework ✅ │ │ +│ │ 3. Emergency Stop Manager ✅ │ │ +│ │ 4. Optimization Plan Manager ✅ │ │ +│ │ 5. Setpoint Manager ✅ │ │ +│ │ 6. Database Watchdog ✅ │ │ +│ │ 7. Alert Manager ✅ │ │ +│ │ 8. Multi-Protocol Server ✅ │ │ │ │ - OPC UA Server │ │ │ │ - Modbus TCP Server │ │ │ │ - REST API │ │ @@ -58,15 +84,16 @@ calejo-control-adapter/ ├── src/ │ ├── core/ │ │ ├── auto_discovery.py # Auto-discovery module -│ │ ├── security.py # Security layer │ │ ├── safety.py # Safety framework -│ │ └── plan_to_setpoint.py # Plan-to-setpoint logic +│ │ ├── emergency_stop.py # Emergency stop manager +│ │ ├── optimization_manager.py # Optimization plan manager +│ │ └── setpoint_manager.py # Setpoint manager │ ├── protocols/ -│ │ ├── opc_ua_server.py # OPC UA server +│ │ ├── opcua_server.py # OPC UA server │ │ ├── modbus_server.py # Modbus TCP server │ │ └── rest_api.py # REST API server │ ├── database/ -│ │ ├── client.py # Database client +│ │ ├── flexible_client.py # Flexible database client │ │ └── models.py # Data models │ ├── monitoring/ │ │ ├── watchdog.py # Database watchdog diff --git a/src/core/auto_discovery.py b/src/core/auto_discovery.py index c79c8f4..2fe5092 100644 --- a/src/core/auto_discovery.py +++ b/src/core/auto_discovery.py @@ -9,7 +9,7 @@ import asyncio import structlog from datetime import datetime, timedelta -from src.database.client import DatabaseClient +from src.database.flexible_client import FlexibleDatabaseClient logger = structlog.get_logger() @@ -17,7 +17,7 @@ logger = structlog.get_logger() class AutoDiscovery: """Auto-discovery module for pump stations and pumps.""" - def __init__(self, db_client: DatabaseClient, refresh_interval_minutes: int = 60): + def __init__(self, db_client: FlexibleDatabaseClient, refresh_interval_minutes: int = 60): self.db_client = db_client self.refresh_interval_minutes = refresh_interval_minutes self.pump_stations: Dict[str, Dict] = {} diff --git a/src/core/emergency_stop.py b/src/core/emergency_stop.py index 30f68e7..ececa36 100644 --- a/src/core/emergency_stop.py +++ b/src/core/emergency_stop.py @@ -9,7 +9,7 @@ from typing import Dict, List, Optional, Set, Any from datetime import datetime import structlog -from src.database.client import DatabaseClient +from src.database.flexible_client import FlexibleDatabaseClient logger = structlog.get_logger() @@ -26,7 +26,7 @@ class EmergencyStopManager: - Integration with all protocol interfaces """ - def __init__(self, db_client: DatabaseClient): + def __init__(self, db_client: FlexibleDatabaseClient): self.db_client = db_client self.emergency_stop_pumps: Set[tuple] = set() # (station_id, pump_id) self.emergency_stop_stations: Set[str] = set() diff --git a/src/core/optimization_manager.py b/src/core/optimization_manager.py index 1520982..a3b36b4 100644 --- a/src/core/optimization_manager.py +++ b/src/core/optimization_manager.py @@ -10,7 +10,7 @@ import asyncio import structlog from datetime import datetime, timedelta -from src.database.client import DatabaseClient +from src.database.flexible_client import FlexibleDatabaseClient logger = structlog.get_logger() @@ -23,7 +23,7 @@ class OptimizationPlanManager: and control execution with audit trail of all plan changes. """ - def __init__(self, db_client: DatabaseClient, refresh_interval_seconds: int = 30): + def __init__(self, db_client: FlexibleDatabaseClient, refresh_interval_seconds: int = 30): self.db_client = db_client self.refresh_interval_seconds = refresh_interval_seconds self.active_pump_plans: Dict[tuple, Dict] = {} # (station_id, pump_id) -> plan diff --git a/src/core/safety.py b/src/core/safety.py index 34c48d0..d1b1089 100644 --- a/src/core/safety.py +++ b/src/core/safety.py @@ -9,7 +9,7 @@ from typing import Tuple, List, Optional, Dict from dataclasses import dataclass import structlog -from src.database.client import DatabaseClient +from src.database.flexible_client import FlexibleDatabaseClient from src.core.emergency_stop import EmergencyStopManager logger = structlog.get_logger() @@ -39,7 +39,7 @@ class SafetyLimitEnforcer: - Layer 3: Optimization Constraints (Calejo Optimize) - 25-45 Hz """ - def __init__(self, db_client: DatabaseClient, emergency_stop_manager: EmergencyStopManager = None): + def __init__(self, db_client: FlexibleDatabaseClient, emergency_stop_manager: EmergencyStopManager = None): self.db_client = db_client self.emergency_stop_manager = emergency_stop_manager self.safety_limits_cache: Dict[Tuple[str, str], SafetyLimits] = {} diff --git a/src/core/setpoint_manager.py b/src/core/setpoint_manager.py index b671e9e..3eb6cfc 100644 --- a/src/core/setpoint_manager.py +++ b/src/core/setpoint_manager.py @@ -8,7 +8,7 @@ from typing import Dict, Optional, Any import structlog from src.core.auto_discovery import AutoDiscovery -from src.database.client import DatabaseClient +from src.database.flexible_client import FlexibleDatabaseClient from src.core.safety import SafetyLimitEnforcer from src.core.emergency_stop import EmergencyStopManager from src.monitoring.watchdog import DatabaseWatchdog @@ -114,7 +114,7 @@ class SetpointManager: def __init__( self, discovery: AutoDiscovery, - db_client: DatabaseClient, + db_client: FlexibleDatabaseClient, safety_enforcer: SafetyLimitEnforcer, emergency_stop_manager: EmergencyStopManager, watchdog: DatabaseWatchdog diff --git a/src/main.py b/src/main.py index 89c81b4..1637d85 100644 --- a/src/main.py +++ b/src/main.py @@ -4,22 +4,28 @@ Calejo Control Adapter - Main Application Multi-protocol integration adapter for municipal wastewater pump stations with comprehensive safety and security framework. + +Features: +- Phase 1: Core infrastructure (Database, Auto-discovery, Safety) +- Phase 2: Multi-protocol servers (OPC UA, Modbus, REST API) +- Phase 3: Setpoint Manager and Optimization """ import asyncio import signal import structlog -from typing import Dict, Any +from typing import Dict, Any, List from config.settings import Settings -from src.database.client import DatabaseClient +from src.database.flexible_client import FlexibleDatabaseClient from src.core.auto_discovery import AutoDiscovery -from src.core.security import SecurityManager from src.core.safety import SafetyLimitEnforcer -from src.core.plan_to_setpoint import PlanToSetpointEngine +from src.core.emergency_stop import EmergencyStopManager +from src.core.optimization_manager import OptimizationPlanManager +from src.core.setpoint_manager import SetpointManager from src.monitoring.watchdog import DatabaseWatchdog from src.monitoring.alerts import AlertManager -from src.protocols.opc_ua_server import OPCUAServer +from src.protocols.opcua_server import OPCUAServer from src.protocols.modbus_server import ModbusServer from src.protocols.rest_api import RESTAPIServer @@ -32,22 +38,73 @@ class CalejoControlAdapter: def __init__(self, settings: Settings): self.settings = settings self.running = False + self.components: List[Any] = [] + + # Initialize core components (Phase 1) + self.db_client = FlexibleDatabaseClient( + database_url=settings.database_url, + pool_size=settings.db_pool_size, + max_overflow=settings.db_max_overflow, + pool_timeout=settings.db_pool_timeout, + pool_recycle=settings.db_pool_recycle + ) + self.components.append(self.db_client) + + self.auto_discovery = AutoDiscovery( + db_client=self.db_client, + refresh_interval_minutes=settings.auto_discovery_refresh_minutes + ) + self.components.append(self.auto_discovery) + + self.emergency_stop_manager = EmergencyStopManager(self.db_client) + self.components.append(self.emergency_stop_manager) + + self.safety_enforcer = SafetyLimitEnforcer(self.db_client, self.emergency_stop_manager) + self.components.append(self.safety_enforcer) + + self.optimization_manager = OptimizationPlanManager( + db_client=self.db_client, + refresh_interval_seconds=settings.optimization_refresh_seconds + ) + self.components.append(self.optimization_manager) - # Initialize components - self.db_client = DatabaseClient(settings.database_url) - self.auto_discovery = AutoDiscovery(self.db_client) - self.security_manager = SecurityManager() - self.safety_enforcer = SafetyLimitEnforcer(self.db_client) - self.plan_engine = PlanToSetpointEngine(self.db_client, self.safety_enforcer) self.alert_manager = AlertManager(settings) + self.components.append(self.alert_manager) + self.watchdog = DatabaseWatchdog( self.db_client, self.alert_manager, settings.safety_timeout_seconds ) + self.components.append(self.watchdog) - # Protocol servers - self.opc_ua_server = OPCUAServer(settings.opc_ua_endpoint, self.plan_engine) - self.modbus_server = ModbusServer(settings.modbus_port, self.plan_engine) - self.rest_api = RESTAPIServer(settings.rest_api_port, self.plan_engine) + # Initialize Setpoint Manager (Phase 3) + self.setpoint_manager = SetpointManager( + discovery=self.auto_discovery, + db_client=self.db_client, + safety_enforcer=self.safety_enforcer, + emergency_stop_manager=self.emergency_stop_manager, + watchdog=self.watchdog + ) + self.components.append(self.setpoint_manager) + + # Protocol servers (Phase 2) + self.opc_ua_server = OPCUAServer( + endpoint=f"opc.tcp://{settings.opcua_host}:{settings.opcua_port}", + server_name="Calejo Control OPC UA Server" + ) + self.components.append(self.opc_ua_server) + + self.modbus_server = ModbusServer( + host=settings.modbus_host, + port=settings.modbus_port, + unit_id=settings.modbus_unit_id + ) + self.components.append(self.modbus_server) + + self.rest_api = RESTAPIServer( + host=settings.rest_api_host, + port=settings.rest_api_port + ) + self.components.append(self.rest_api) async def start(self): """Start the Calejo Control Adapter.""" @@ -56,12 +113,27 @@ class CalejoControlAdapter: try: # Connect to database await self.db_client.connect() + logger.info("database_connected") # Load safety limits await self.safety_enforcer.load_safety_limits() + logger.info("safety_limits_loaded") # Auto-discover pump stations and pumps await self.auto_discovery.discover() + logger.info("auto_discovery_completed") + + # Start optimization manager + await self.optimization_manager.start() + logger.info("optimization_manager_started") + + # Start monitoring + await self.watchdog.start() + logger.info("watchdog_started") + + # Start Setpoint Manager + await self.setpoint_manager.start() + logger.info("setpoint_manager_started") # Start protocol servers await asyncio.gather( @@ -69,9 +141,7 @@ class CalejoControlAdapter: self.modbus_server.start(), self.rest_api.start(), ) - - # Start monitoring - await self.watchdog.start() + logger.info("protocol_servers_started") self.running = True logger.info("calejo_control_adapter_started") @@ -90,16 +160,34 @@ class CalejoControlAdapter: logger.info("stopping_calejo_control_adapter") self.running = False + # Stop all components in reverse order + stop_tasks = [] + # Stop protocol servers - await asyncio.gather( + stop_tasks.extend([ self.opc_ua_server.stop(), self.modbus_server.stop(), self.rest_api.stop(), - return_exceptions=True - ) + ]) + + # Stop Setpoint Manager + if self.setpoint_manager: + stop_tasks.append(self.setpoint_manager.stop()) + + # Stop monitoring + if self.watchdog: + stop_tasks.append(self.watchdog.stop()) + + # Stop optimization manager + if self.optimization_manager: + stop_tasks.append(self.optimization_manager.stop()) # Close database connection - await self.db_client.disconnect() + if self.db_client: + stop_tasks.append(self.db_client.disconnect()) + + # Execute all stop tasks + await asyncio.gather(*stop_tasks, return_exceptions=True) logger.info("calejo_control_adapter_stopped") diff --git a/src/main_phase1.py b/src/main_phase1.py index 4956609..48d819a 100644 --- a/src/main_phase1.py +++ b/src/main_phase1.py @@ -13,7 +13,7 @@ import asyncio import signal import sys -from src.database.client import DatabaseClient +from src.database.flexible_client import FlexibleDatabaseClient from src.core.auto_discovery import AutoDiscovery from src.core.safety import SafetyLimitEnforcer from src.core.emergency_stop import EmergencyStopManager @@ -30,10 +30,12 @@ class CalejoControlAdapterPhase1: self.logger = setup_logging() # Initialize core components - self.db_client = DatabaseClient( + self.db_client = FlexibleDatabaseClient( database_url=settings.database_url, - min_connections=settings.db_min_connections, - max_connections=settings.db_max_connections + pool_size=settings.db_pool_size, + max_overflow=settings.db_max_overflow, + pool_timeout=settings.db_pool_timeout, + pool_recycle=settings.db_pool_recycle ) self.auto_discovery = AutoDiscovery( db_client=self.db_client, diff --git a/src/main_phase3.py b/src/main_phase3.py index 3d08efd..2b5b757 100644 --- a/src/main_phase3.py +++ b/src/main_phase3.py @@ -10,9 +10,9 @@ import sys from typing import List import structlog -from src.config.settings import settings -from src.database.client import DatabaseClient -from src.core.auto_discovery import AdapterAutoDiscovery +from config.settings import Settings +from src.database.flexible_client import FlexibleDatabaseClient +from src.core.auto_discovery import AutoDiscovery from src.core.safety import SafetyLimitEnforcer from src.core.emergency_stop import EmergencyStopManager from src.monitoring.watchdog import DatabaseWatchdog @@ -51,15 +51,18 @@ class CalejoControlAdapterPhase3: logger.info("initializing_calejo_control_adapter_phase3") try: + # Load settings + self.settings = Settings() + # Initialize database client - self.db_client = DatabaseClient(settings.database_url) + self.db_client = FlexibleDatabaseClient(self.settings.database_url) await self.db_client.connect() self.components.append(self.db_client) logger.info("database_client_initialized") # Initialize auto-discovery - self.discovery = AdapterAutoDiscovery(self.db_client) - await self.discovery.initialize() + self.discovery = AutoDiscovery(self.db_client) + await self.discovery.discover() self.components.append(self.discovery) logger.info("auto_discovery_initialized") @@ -69,15 +72,7 @@ class CalejoControlAdapterPhase3: logger.info("safety_enforcer_initialized") # Initialize alert manager - self.alert_manager = AlertManager( - smtp_host=settings.smtp_host, - smtp_port=settings.smtp_port, - smtp_username=settings.smtp_username, - smtp_password=settings.smtp_password, - email_recipients=settings.alert_email_recipients, - sms_recipients=settings.alert_sms_recipients, - webhook_urls=settings.alert_webhook_urls - ) + self.alert_manager = AlertManager(self.settings) self.components.append(self.alert_manager) logger.info("alert_manager_initialized") @@ -89,7 +84,8 @@ class CalejoControlAdapterPhase3: # Initialize database watchdog self.watchdog = DatabaseWatchdog( self.db_client, - timeout_seconds=settings.watchdog_timeout_seconds + self.alert_manager, + timeout_seconds=self.settings.safety_timeout_seconds ) self.components.append(self.watchdog) logger.info("database_watchdog_initialized") diff --git a/src/monitoring/watchdog.py b/src/monitoring/watchdog.py index 9a08977..b6f65ec 100644 --- a/src/monitoring/watchdog.py +++ b/src/monitoring/watchdog.py @@ -10,7 +10,7 @@ import structlog from datetime import datetime, timedelta from typing import Dict, Optional, Any -from src.database.client import DatabaseClient +from src.database.flexible_client import FlexibleDatabaseClient logger = structlog.get_logger() @@ -24,7 +24,7 @@ class DatabaseWatchdog: pumps from running on stale optimization plans. """ - def __init__(self, db_client: DatabaseClient, timeout_seconds: int = 1200): # 20 minutes default + def __init__(self, db_client: FlexibleDatabaseClient, alert_manager: Any, timeout_seconds: int = 1200): # 20 minutes default self.db_client = db_client self.timeout_seconds = timeout_seconds self.last_update_times: Dict[tuple, datetime] = {} # (station_id, pump_id) -> last_update diff --git a/tests/conftest.py b/tests/conftest.py index ba00455..856602d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,7 +7,7 @@ import pytest import pytest_asyncio from typing import Dict, Any, AsyncGenerator -from src.database.client import DatabaseClient +from src.database.flexible_client import FlexibleDatabaseClient from src.core.auto_discovery import AutoDiscovery from src.core.safety import SafetyLimitEnforcer from src.core.logging import setup_logging @@ -34,12 +34,12 @@ def test_settings(): @pytest_asyncio.fixture(scope="session") -async def test_db_client(test_settings) -> AsyncGenerator[DatabaseClient, None]: +async def test_db_client(test_settings) -> AsyncGenerator[FlexibleDatabaseClient, None]: """Test database client with test database.""" - client = DatabaseClient( + client = FlexibleDatabaseClient( database_url=test_settings.database_url, - min_connections=1, - max_connections=3 + pool_size=5, + max_overflow=10 ) await client.connect() yield client diff --git a/tests/test_phase1.py b/tests/test_phase1.py index 93af9f3..2435223 100755 --- a/tests/test_phase1.py +++ b/tests/test_phase1.py @@ -16,7 +16,7 @@ import os # Add src to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) -from src.database.client import DatabaseClient +from src.database.flexible_client import FlexibleDatabaseClient from src.core.auto_discovery import AutoDiscovery from src.core.safety import SafetyLimitEnforcer from src.core.logging import setup_logging @@ -28,10 +28,10 @@ class Phase1Tester: def __init__(self): self.logger = setup_logging() - self.db_client = DatabaseClient( + self.db_client = FlexibleDatabaseClient( database_url=settings.database_url, - min_connections=settings.db_min_connections, - max_connections=settings.db_max_connections + pool_size=settings.db_pool_size, + max_overflow=settings.db_max_overflow ) self.auto_discovery = AutoDiscovery(self.db_client) self.safety_enforcer = SafetyLimitEnforcer(self.db_client) diff --git a/tests/unit/test_watchdog.py b/tests/unit/test_watchdog.py index 5ac45e3..8221554 100644 --- a/tests/unit/test_watchdog.py +++ b/tests/unit/test_watchdog.py @@ -18,8 +18,9 @@ class TestDatabaseWatchdog: self.mock_db_client.execute = Mock() self.mock_db_client.get_latest_pump_plans = Mock() self.mock_db_client.get_pump = Mock() + self.mock_alert_manager = Mock() - self.watchdog = DatabaseWatchdog(self.mock_db_client, timeout_seconds=300) # 5 minutes for testing + self.watchdog = DatabaseWatchdog(self.mock_db_client, self.mock_alert_manager, timeout_seconds=300) # 5 minutes for testing @pytest.mark.asyncio async def test_start_stop(self):