CalejoControl/src/main.py

225 lines
7.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Calejo Control Adapter - Main Application
Multi-protocol integration adapter for municipal wastewater pump stations
with comprehensive safety and security framework.
Features:
- Phase 1: Core infrastructure (Database, Auto-discovery, Safety)
- Phase 2: Multi-protocol servers (OPC UA, Modbus, REST API)
- Phase 3: Setpoint Manager and Optimization
"""
import asyncio
import signal
import structlog
from typing import Dict, Any, List
from config.settings import Settings
from src.database.flexible_client import FlexibleDatabaseClient
from src.core.auto_discovery import AutoDiscovery
from src.core.safety import SafetyLimitEnforcer
from src.core.emergency_stop import EmergencyStopManager
from src.core.optimization_manager import OptimizationPlanManager
from src.core.setpoint_manager import SetpointManager
from src.monitoring.watchdog import DatabaseWatchdog
from src.monitoring.alerts import AlertManager
from src.protocols.opcua_server import OPCUAServer
from src.protocols.modbus_server import ModbusServer
from src.protocols.rest_api import RESTAPIServer
logger = structlog.get_logger()
class CalejoControlAdapter:
"""Main application class for Calejo Control Adapter."""
def __init__(self, settings: Settings):
self.settings = settings
self.running = False
self.components: List[Any] = []
# Initialize core components (Phase 1)
self.db_client = FlexibleDatabaseClient(
database_url=settings.database_url,
pool_size=settings.db_pool_size,
max_overflow=settings.db_max_overflow,
pool_timeout=settings.db_pool_timeout,
pool_recycle=settings.db_pool_recycle
)
self.components.append(self.db_client)
self.auto_discovery = AutoDiscovery(
db_client=self.db_client,
refresh_interval_minutes=settings.auto_discovery_refresh_minutes
)
self.components.append(self.auto_discovery)
self.emergency_stop_manager = EmergencyStopManager(self.db_client)
self.components.append(self.emergency_stop_manager)
self.safety_enforcer = SafetyLimitEnforcer(self.db_client, self.emergency_stop_manager)
self.components.append(self.safety_enforcer)
self.optimization_manager = OptimizationPlanManager(
db_client=self.db_client,
refresh_interval_seconds=settings.optimization_refresh_seconds
)
self.components.append(self.optimization_manager)
self.alert_manager = AlertManager(settings)
self.components.append(self.alert_manager)
self.watchdog = DatabaseWatchdog(
self.db_client, self.alert_manager, settings.safety_timeout_seconds
)
self.components.append(self.watchdog)
# Initialize Setpoint Manager (Phase 3)
self.setpoint_manager = SetpointManager(
discovery=self.auto_discovery,
db_client=self.db_client,
safety_enforcer=self.safety_enforcer,
emergency_stop_manager=self.emergency_stop_manager,
watchdog=self.watchdog
)
self.components.append(self.setpoint_manager)
# Protocol servers (Phase 2)
self.opc_ua_server = OPCUAServer(
endpoint=f"opc.tcp://{settings.opcua_host}:{settings.opcua_port}",
server_name="Calejo Control OPC UA Server"
)
self.components.append(self.opc_ua_server)
self.modbus_server = ModbusServer(
host=settings.modbus_host,
port=settings.modbus_port,
unit_id=settings.modbus_unit_id
)
self.components.append(self.modbus_server)
self.rest_api = RESTAPIServer(
host=settings.rest_api_host,
port=settings.rest_api_port
)
self.components.append(self.rest_api)
async def start(self):
"""Start the Calejo Control Adapter."""
logger.info("starting_calejo_control_adapter", version="2.0.0")
try:
# Connect to database
await self.db_client.connect()
logger.info("database_connected")
# Load safety limits
await self.safety_enforcer.load_safety_limits()
logger.info("safety_limits_loaded")
# Auto-discover pump stations and pumps
await self.auto_discovery.discover()
logger.info("auto_discovery_completed")
# Start optimization manager
await self.optimization_manager.start()
logger.info("optimization_manager_started")
# Start monitoring
await self.watchdog.start()
logger.info("watchdog_started")
# Start Setpoint Manager
await self.setpoint_manager.start()
logger.info("setpoint_manager_started")
# Start protocol servers
await asyncio.gather(
self.opc_ua_server.start(),
self.modbus_server.start(),
self.rest_api.start(),
)
logger.info("protocol_servers_started")
self.running = True
logger.info("calejo_control_adapter_started")
# Keep application running
while self.running:
await asyncio.sleep(1)
except Exception as e:
logger.error("failed_to_start_adapter", error=str(e))
await self.stop()
raise
async def stop(self):
"""Stop the Calejo Control Adapter gracefully."""
logger.info("stopping_calejo_control_adapter")
self.running = False
# Stop all components in reverse order
stop_tasks = []
# Stop protocol servers
stop_tasks.extend([
self.opc_ua_server.stop(),
self.modbus_server.stop(),
self.rest_api.stop(),
])
# Stop Setpoint Manager
if self.setpoint_manager:
stop_tasks.append(self.setpoint_manager.stop())
# Stop monitoring
if self.watchdog:
stop_tasks.append(self.watchdog.stop())
# Stop optimization manager
if self.optimization_manager:
stop_tasks.append(self.optimization_manager.stop())
# Close database connection
if self.db_client:
stop_tasks.append(self.db_client.disconnect())
# Execute all stop tasks
await asyncio.gather(*stop_tasks, return_exceptions=True)
logger.info("calejo_control_adapter_stopped")
def handle_shutdown(signum, frame):
"""Handle shutdown signals gracefully."""
logger.info("received_shutdown_signal", signal=signum)
# Signal handling will be implemented in the async context
async def main():
"""Main application entry point."""
# Load settings
settings = Settings()
# Set up signal handlers
signal.signal(signal.SIGINT, handle_shutdown)
signal.signal(signal.SIGTERM, handle_shutdown)
# Create and start adapter
adapter = CalejoControlAdapter(settings)
try:
await adapter.start()
except KeyboardInterrupt:
logger.info("keyboard_interrupt_received")
except Exception as e:
logger.error("unexpected_error", error=str(e))
raise
finally:
await adapter.stop()
if __name__ == "__main__":
asyncio.run(main())