""" Flexible Database Client for Calejo Control Adapter. Supports multiple database backends (PostgreSQL, SQLite) using SQLAlchemy Core. """ from typing import Dict, List, Optional, Any from datetime import datetime import structlog from sqlalchemy import create_engine, text, MetaData, Table, Column, String, Float, Integer, Boolean, DateTime from sqlalchemy.engine import Engine from sqlalchemy.exc import SQLAlchemyError logger = structlog.get_logger() class FlexibleDatabaseClient: """ Flexible database client supporting multiple backends. Supports: - PostgreSQL: postgresql://user:pass@host:port/dbname - SQLite: sqlite:///path/to/database.db """ def __init__( self, database_url: str, pool_size: int = 5, max_overflow: int = 10, pool_timeout: int = 30, pool_recycle: int = 3600 ): self.database_url = database_url self.pool_size = pool_size self.max_overflow = max_overflow self.pool_timeout = pool_timeout self.pool_recycle = pool_recycle self.engine: Optional[Engine] = None self.metadata = MetaData() # Define table schemas self._define_tables() def _define_tables(self): """Define database table schemas.""" self.stations = Table( 'stations', self.metadata, Column('station_id', String(50), primary_key=True), Column('station_name', String(100)), Column('location', String(200)), Column('created_at', DateTime, default=datetime.now) ) self.pumps = Table( 'pumps', self.metadata, Column('station_id', String(50)), Column('pump_id', String(50)), Column('pump_name', String(100)), Column('control_type', String(50)), Column('min_speed_hz', Float, default=20.0), Column('max_speed_hz', Float, default=60.0), Column('default_setpoint_hz', Float, default=35.0), Column('created_at', DateTime, default=datetime.now) ) self.pump_plans = Table( 'pump_plans', self.metadata, Column('plan_id', Integer, primary_key=True, autoincrement=True), Column('station_id', String(50)), Column('pump_id', String(50)), Column('target_flow_m3h', Float), Column('target_power_kw', Float), Column('target_level_m', Float), Column('suggested_speed_hz', Float), Column('interval_start', DateTime), Column('interval_end', DateTime), Column('plan_version', Integer), Column('plan_status', String(20), default='ACTIVE'), Column('plan_created_at', DateTime, default=datetime.now), Column('plan_updated_at', DateTime, default=datetime.now, onupdate=datetime.now), Column('optimization_run_id', String(100)) ) self.pump_feedback = Table( 'pump_feedback', self.metadata, Column('feedback_id', Integer, primary_key=True, autoincrement=True), Column('station_id', String(50)), Column('pump_id', String(50)), Column('timestamp', DateTime, default=datetime.now), Column('actual_speed_hz', Float), Column('actual_power_kw', Float), Column('actual_flow_m3h', Float), Column('wet_well_level_m', Float), Column('pump_running', Boolean), Column('alarm_active', Boolean), Column('alarm_code', String(50)) ) self.emergency_stop_events = Table( 'emergency_stop_events', self.metadata, Column('event_id', Integer, primary_key=True, autoincrement=True), Column('triggered_by', String(100)), Column('reason', String), Column('station_id', String(50)), Column('pump_id', String(50)), Column('event_timestamp', DateTime, default=datetime.now), Column('cleared_by', String(100)), Column('cleared_timestamp', DateTime), Column('cleared_notes', String) ) self.safety_limit_violations = Table( 'safety_limit_violations', self.metadata, Column('violation_id', Integer, primary_key=True, autoincrement=True), Column('station_id', String(50)), Column('pump_id', String(50)), 
            Column('requested_setpoint', Float),
            Column('enforced_setpoint', Float),
            Column('violations', String),
            Column('timestamp', DateTime, default=datetime.now)
        )

    async def connect(self):
        """Connect to the database."""
        try:
            # Configure engine based on database type
            if self.database_url.startswith('sqlite://'):
                # SQLite: no connection pooling (NullPool) and allow use
                # across threads.
                self.engine = create_engine(
                    self.database_url,
                    poolclass=NullPool,
                    connect_args={"check_same_thread": False}
                )
            else:
                # PostgreSQL configuration
                self.engine = create_engine(
                    self.database_url,
                    pool_size=self.pool_size,
                    max_overflow=self.max_overflow,
                    pool_timeout=self.pool_timeout,
                    pool_recycle=self.pool_recycle
                )

            # Test connection
            with self.engine.connect() as conn:
                conn.execute(text("SELECT 1"))

            logger.info(
                "database_connected",
                database_type=self._get_database_type(),
                url=self._get_safe_url()
            )

        except SQLAlchemyError as e:
            logger.error("database_connection_failed", error=str(e))
            raise

    async def disconnect(self):
        """Disconnect from the database."""
        if self.engine:
            self.engine.dispose()
            logger.info("database_disconnected")

    def _get_database_type(self) -> str:
        """Get database type from URL."""
        if self.database_url.startswith('sqlite://'):
            return 'SQLite'
        elif self.database_url.startswith('postgresql://'):
            return 'PostgreSQL'
        else:
            return 'Unknown'

    def _get_safe_url(self) -> str:
        """Get safe URL for logging (without credentials)."""
        if self.database_url.startswith('postgresql://'):
            # Remove credentials from PostgreSQL URL
            parts = self.database_url.split('@')
            if len(parts) > 1:
                return f"postgresql://...@{parts[1]}"
        return self.database_url

    def execute_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Execute a query and return results as dictionaries."""
        try:
            with self.engine.connect() as conn:
                result = conn.execute(text(query), params or {})
                return [dict(row._mapping) for row in result]
        except SQLAlchemyError as e:
            logger.error("query_execution_failed", query=query, error=str(e))
            raise

    def execute(self, query: str, params: Optional[Dict[str, Any]] = None) -> int:
        """Execute a query and return number of affected rows."""
        try:
            with self.engine.connect() as conn:
                result = conn.execute(text(query), params or {})
                conn.commit()
                return result.rowcount
        except SQLAlchemyError as e:
            logger.error("query_execution_failed", query=query, error=str(e))
            raise

    def health_check(self) -> bool:
        """Check if database is healthy and responsive."""
        try:
            with self.engine.connect() as conn:
                result = conn.execute(text("SELECT 1 as health_check"))
                row = result.fetchone()
                return row[0] == 1
        except SQLAlchemyError as e:
            logger.error("database_health_check_failed", error=str(e))
            return False

    def get_connection_stats(self) -> Dict[str, Any]:
        """Get connection pool statistics."""
        if not self.engine:
            return {"status": "not_connected"}

        return {
            "database_type": self._get_database_type(),
            "pool_size": self.pool_size,
            "max_overflow": self.max_overflow,
            "status": "connected"
        }

    # Database-specific methods

    def get_pump_stations(self) -> List[Dict[str, Any]]:
        """Get all pump stations."""
        query = "SELECT * FROM stations ORDER BY station_id"
        return self.execute_query(query)

    def get_pumps(self, station_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get pumps, optionally filtered by station."""
        if station_id:
            query = "SELECT * FROM pumps WHERE station_id = :station_id ORDER BY pump_id"
            return self.execute_query(query, {"station_id": station_id})
        else:
            query = "SELECT * FROM pumps ORDER BY station_id, pump_id"
            return self.execute_query(query)
    def get_pump(self, station_id: str, pump_id: str) -> Optional[Dict[str, Any]]:
        """Get specific pump."""
        query = """
            SELECT * FROM pumps
            WHERE station_id = :station_id AND pump_id = :pump_id
        """
        result = self.execute_query(query, {
            "station_id": station_id,
            "pump_id": pump_id
        })
        return result[0] if result else None

    def get_current_plan(self, station_id: str, pump_id: str) -> Optional[Dict[str, Any]]:
        """Get current active plan for a specific pump."""
        # CURRENT_TIMESTAMP is understood by both PostgreSQL and SQLite.
        query = """
            SELECT plan_id, target_flow_m3h, target_power_kw, target_level_m,
                   suggested_speed_hz, interval_start, interval_end,
                   plan_version, plan_status, plan_created_at, plan_updated_at,
                   optimization_run_id
            FROM pump_plans
            WHERE station_id = :station_id
              AND pump_id = :pump_id
              AND interval_start <= CURRENT_TIMESTAMP
              AND interval_end >= CURRENT_TIMESTAMP
              AND plan_status = 'ACTIVE'
            ORDER BY plan_version DESC
            LIMIT 1
        """
        result = self.execute_query(query, {
            "station_id": station_id,
            "pump_id": pump_id
        })
        return result[0] if result else None

    def get_latest_feedback(self, station_id: str, pump_id: str) -> Optional[Dict[str, Any]]:
        """Get latest feedback for a specific pump."""
        query = """
            SELECT timestamp, actual_speed_hz, actual_power_kw, actual_flow_m3h,
                   wet_well_level_m, pump_running, alarm_active, alarm_code
            FROM pump_feedback
            WHERE station_id = :station_id AND pump_id = :pump_id
            ORDER BY timestamp DESC
            LIMIT 1
        """
        result = self.execute_query(query, {
            "station_id": station_id,
            "pump_id": pump_id
        })
        return result[0] if result else None

    def get_pump_feedback(self, station_id: str, pump_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent feedback for a specific pump."""
        query = """
            SELECT timestamp, actual_speed_hz, actual_power_kw, actual_flow_m3h,
                   wet_well_level_m, pump_running, alarm_active, alarm_code
            FROM pump_feedback
            WHERE station_id = :station_id AND pump_id = :pump_id
            ORDER BY timestamp DESC
            LIMIT :limit
        """
        return self.execute_query(query, {
            "station_id": station_id,
            "pump_id": pump_id,
            "limit": limit
        })

    def get_active_plans(self, resource_type: str = 'pump') -> List[Dict[str, Any]]:
        """Get all active plans.

        The pump_plans schema defined in this module has no resource_type or
        resource_id columns, so plans are filtered on the time window and
        plan status only; resource_type is kept for API compatibility.
        """
        query = """
            SELECT station_id, pump_id, suggested_speed_hz,
                   interval_start, interval_end, plan_version, plan_status,
                   optimization_run_id
            FROM pump_plans
            WHERE interval_start <= CURRENT_TIMESTAMP
              AND interval_end >= CURRENT_TIMESTAMP
              AND plan_status = 'ACTIVE'
            ORDER BY station_id, pump_id, plan_version DESC
        """
        return self.execute_query(query)

    def get_safety_limits(self) -> List[Dict[str, Any]]:
        """Get safety limits for all pumps."""
        query = """
            SELECT station_id, pump_id,
                   min_speed_hz as hard_min_speed_hz,
                   max_speed_hz as hard_max_speed_hz,
                   NULL as hard_min_level_m,
                   NULL as hard_max_level_m,
                   NULL as hard_max_power_kw,
                   5.0 as max_speed_change_hz_per_min
            FROM pumps
        """
        return self.execute_query(query)

    def create_tables(self):
        """Create all tables if they don't exist."""
        try:
            self.metadata.create_all(self.engine)
            logger.info("database_tables_created")
        except SQLAlchemyError as e:
            logger.error("failed_to_create_tables", error=str(e))
            raise

    def drop_tables(self):
        """Drop all tables (for testing)."""
        try:
            self.metadata.drop_all(self.engine)
            logger.info("database_tables_dropped")
        except SQLAlchemyError as e:
            logger.error("failed_to_drop_tables", error=str(e))
            raise
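

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the adapter's runtime path).
# It shows how the client is constructed and exercised against a throwaway
# SQLite file; the file name and the station/pump IDs below are made-up
# examples, and a PostgreSQL deployment would pass a
# postgresql://user:pass@host:port/dbname URL instead. connect() and
# disconnect() are coroutines, hence asyncio.run().
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo():
        client = FlexibleDatabaseClient("sqlite:///calejo_demo.db")
        await client.connect()
        client.create_tables()

        print("healthy:", client.health_check())
        print("stats:", client.get_connection_stats())

        # Queries return plain dictionaries, so results are easy to inspect.
        print("stations:", client.get_pump_stations())
        print("current plan:", client.get_current_plan("STN_01", "PUMP_01"))

        await client.disconnect()

    asyncio.run(_demo())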