"""
Async Database Client for Calejo Control Adapter.

Supports true async/await operations with SQLAlchemy async support.
"""

from typing import Dict, List, Optional, Any
from datetime import datetime
import structlog
from sqlalchemy import text, MetaData, Table, Column, String, Float, Integer, Boolean, DateTime
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine, AsyncSession
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.pool import AsyncAdaptedQueuePool

logger = structlog.get_logger()


class AsyncDatabaseClient:
    """
    Async database client supporting true async/await operations.

    Supports:
    - PostgreSQL: asyncpg driver
    - SQLite: aiosqlite driver

    Typical use: construct with a database URL, ``await connect()``, run
    statements through ``execute_query()`` / ``execute()``, then
    ``await disconnect()``.
    """

    # Maps a plain (sync) URL scheme to the scheme that selects the async driver.
    _ASYNC_SCHEMES = {
        'postgresql': 'postgresql+asyncpg',
        'postgres': 'postgresql+asyncpg',
        'sqlite': 'sqlite+aiosqlite',
    }

    def __init__(
        self,
        database_url: str,
        pool_size: int = 5,
        max_overflow: int = 10,
        pool_timeout: int = 30,
        pool_recycle: int = 3600,
        query_timeout: int = 30
    ):
        """
        Store connection settings and define the table schemas.

        No connection is opened here; call ``connect()`` for that.

        Args:
            database_url: Database URL, e.g. ``postgresql://...`` or ``sqlite://...``.
            pool_size: Passed to ``create_async_engine`` as ``pool_size``.
            max_overflow: Passed to ``create_async_engine`` as ``max_overflow``.
            pool_timeout: Passed to ``create_async_engine`` as ``pool_timeout``.
            pool_recycle: Passed to ``create_async_engine`` as ``pool_recycle``.
            query_timeout: Stored and reported by ``get_connection_info()``.
                NOTE: this class does not currently apply it to queries.
        """
        self.database_url = database_url
        self.pool_size = pool_size
        self.max_overflow = max_overflow
        self.pool_timeout = pool_timeout
        self.pool_recycle = pool_recycle
        self.query_timeout = query_timeout
        # Set by connect() once a test query has succeeded.
        self.engine: Optional[AsyncEngine] = None
        self.metadata = MetaData()

        # Define table schemas (same as flexible_client)
        self._define_tables()

    def _define_tables(self):
        """Define database table schemas on ``self.metadata``."""
        self.pump_stations = Table(
            'pump_stations', self.metadata,
            Column('station_id', String(50), primary_key=True),
            Column('station_name', String(200)),
            Column('location', String(200)),
            Column('latitude', Float),
            Column('longitude', Float),
            Column('timezone', String(50)),
            Column('active', Boolean),
            Column('created_at', DateTime, default=datetime.now),
            Column('updated_at', DateTime, default=datetime.now)
        )

        self.pumps = Table(
            'pumps', self.metadata,
            Column('station_id', String(50)),
            Column('pump_id', String(50)),
            Column('pump_name', String(200)),
            Column('pump_type', String(50)),
            Column('control_type', String(50)),
            Column('manufacturer', String(100)),
            Column('model', String(100)),
            Column('rated_power_kw', Float),
            Column('min_speed_hz', Float, default=20.0),
            Column('max_speed_hz', Float, default=50.0),
            Column('default_setpoint_hz', Float, default=35.0),
            Column('control_parameters', String),  # JSONB in PostgreSQL
            Column('active', Boolean),
            Column('created_at', DateTime, default=datetime.now),
            Column('updated_at', DateTime, default=datetime.now)
        )

        self.pump_plans = Table(
            'pump_plans', self.metadata,
            Column('plan_id', Integer, primary_key=True, autoincrement=True),
            Column('station_id', String(50)),
            Column('pump_id', String(50)),
            Column('interval_start', DateTime),
            Column('interval_end', DateTime),
            Column('target_flow_m3h', Float),
            Column('target_power_kw', Float),
            Column('target_level_m', Float),
            Column('suggested_speed_hz', Float),
            Column('plan_created_at', DateTime, default=datetime.now),
            Column('plan_updated_at', DateTime, default=datetime.now, onupdate=datetime.now),
            Column('plan_version', Integer),
            Column('optimization_run_id', Integer),
            Column('plan_status', String(20), default='ACTIVE'),
            Column('superseded_by', Integer)
        )

        self.pump_feedback = Table(
            'pump_feedback', self.metadata,
            Column('feedback_id', Integer, primary_key=True, autoincrement=True),
            Column('station_id', String(50)),
            Column('pump_id', String(50)),
            Column('timestamp', DateTime, default=datetime.now),
            Column('actual_speed_hz', Float),
            Column('actual_power_kw', Float),
            Column('actual_flow_m3h', Float),
            Column('wet_well_level_m', Float),
            Column('pump_running', Boolean),
            Column('alarm_active', Boolean),
            Column('alarm_code', String(50))
        )

        self.emergency_stop_events = Table(
            'emergency_stop_events', self.metadata,
            Column('event_id', Integer, primary_key=True, autoincrement=True),
            Column('triggered_by', String(100)),
            Column('reason', String),
            Column('station_id', String(50)),
            Column('pump_id', String(50)),
            Column('event_timestamp', DateTime, default=datetime.now),
            Column('cleared_by', String(100)),
            Column('cleared_timestamp', DateTime),
            Column('cleared_notes', String)
        )

        self.safety_limit_violations = Table(
            'safety_limit_violations', self.metadata,
            Column('violation_id', Integer, primary_key=True, autoincrement=True),
            Column('station_id', String(50)),
            Column('pump_id', String(50)),
            Column('requested_setpoint', Float),
            Column('enforced_setpoint', Float),
            Column('violations', String),
            Column('timestamp', DateTime, default=datetime.now)
        )

        self.pump_safety_limits = Table(
            'pump_safety_limits', self.metadata,
            Column('station_id', String(50)),
            Column('pump_id', String(50)),
            Column('hard_min_speed_hz', Float),
            Column('hard_max_speed_hz', Float),
            Column('hard_min_level_m', Float),
            Column('hard_max_level_m', Float),
            Column('emergency_stop_level_m', Float),
            Column('dry_run_protection_level_m', Float),
            Column('hard_max_power_kw', Float),
            Column('hard_max_flow_m3h', Float),
            Column('max_starts_per_hour', Integer),
            Column('min_run_time_seconds', Integer),
            Column('max_continuous_run_hours', Integer),
            Column('max_speed_change_hz_per_min', Float),
            Column('set_by', String(100)),
            Column('set_at', DateTime, default=datetime.now),
            Column('approved_by', String(100)),
            Column('approved_at', DateTime),
            Column('notes', String)
        )

        self.failsafe_events = Table(
            'failsafe_events', self.metadata,
            Column('event_id', Integer, primary_key=True, autoincrement=True),
            Column('station_id', String(50)),
            Column('pump_id', String(50)),
            Column('event_type', String(50)),
            Column('default_setpoint', Float),
            Column('triggered_by', String(100)),
            Column('timestamp', DateTime, default=datetime.now),
            Column('cleared_at', DateTime),
            Column('notes', String)
        )

        self.audit_log = Table(
            'audit_log', self.metadata,
            Column('log_id', Integer, primary_key=True, autoincrement=True),
            Column('timestamp', DateTime, default=datetime.now),
            Column('event_type', String(50)),
            Column('severity', String(20)),
            Column('station_id', String(50)),
            Column('pump_id', String(50)),
            Column('user_id', String(100)),
            Column('ip_address', String(50)),
            Column('protocol', String(20)),
            Column('action', String(100)),
            Column('resource', String(200)),
            Column('result', String(20)),
            Column('event_data', String)  # JSONB in PostgreSQL
        )

    def _require_engine(self) -> AsyncEngine:
        """Return the connected engine, or raise if ``connect()`` has not succeeded."""
        if self.engine is None:
            raise RuntimeError(
                "AsyncDatabaseClient is not connected; call connect() first"
            )
        return self.engine

    async def connect(self) -> None:
        """
        Connect to the database asynchronously.

        Builds the async engine, runs ``SELECT 1`` as a connection test and
        only then publishes the engine on ``self.engine``.

        Raises:
            SQLAlchemyError: If engine creation or the test query fails.
            OSError: If the driver reports a network-level failure
                (e.g. connection refused) that SQLAlchemy does not wrap.
        """
        # Convert sync URL to async URL
        async_url = self._convert_to_async_url(self.database_url)

        engine: Optional[AsyncEngine] = None
        try:
            # Create async engine
            engine = create_async_engine(
                async_url,
                pool_size=self.pool_size,
                max_overflow=self.max_overflow,
                pool_timeout=self.pool_timeout,
                pool_recycle=self.pool_recycle,
                poolclass=AsyncAdaptedQueuePool
            )

            # Test connection
            async with engine.connect() as conn:
                await conn.execute(text("SELECT 1"))
        except (SQLAlchemyError, OSError) as e:
            logger.error("async_database_connection_failed", error=str(e))
            # Do not leave an unverified engine (and its pool) behind.
            if engine is not None:
                await engine.dispose()
            raise

        self.engine = engine
        logger.info(
            "async_database_connected",
            database_type=self._get_database_type(),
            url=self._get_safe_url()
        )

    async def disconnect(self) -> None:
        """Dispose of the engine, if one exists, and log the disconnect."""
        if self.engine:
            await self.engine.dispose()
            logger.info("async_database_disconnected")

    def _convert_to_async_url(self, sync_url: str) -> str:
        """
        Convert sync database URL to async URL.

        ``postgresql://`` and ``postgres://`` become ``postgresql+asyncpg://``;
        ``sqlite://`` becomes ``sqlite+aiosqlite://``. Any other URL
        (including one that already names a driver) is returned unchanged.
        Only the scheme is rewritten; the rest of the URL is left untouched.
        """
        scheme, separator, remainder = sync_url.partition('://')
        async_scheme = self._ASYNC_SCHEMES.get(scheme)
        if not separator or async_scheme is None:
            return sync_url
        return f"{async_scheme}://{remainder}"

    def _get_database_type(self) -> str:
        """
        Get database type from URL.

        Returns ``'SQLite'``, ``'PostgreSQL'`` or ``'Unknown'``. A driver
        suffix in the scheme (``postgresql+asyncpg://``) is ignored.
        """
        scheme, separator, _ = self.database_url.partition('://')
        if not separator:
            return 'Unknown'

        # "postgresql+asyncpg" -> "postgresql"
        dialect = scheme.partition('+')[0]
        if dialect == 'sqlite':
            return 'SQLite'
        if dialect in ('postgresql', 'postgres'):
            return 'PostgreSQL'
        return 'Unknown'

    def _get_safe_url(self) -> str:
        """
        Get safe URL for logging (without credentials).

        If the URL's authority part contains ``user[:password]@``, that part
        is replaced by ``...``; otherwise the URL is returned unchanged.
        """
        scheme, separator, remainder = self.database_url.partition('://')
        if not separator:
            return self.database_url

        # The authority ends at the first '/', '?' or '#'.
        boundaries = [
            pos for pos in (remainder.find(ch) for ch in '/?#') if pos != -1
        ]
        authority_end = min(boundaries) if boundaries else len(remainder)
        authority = remainder[:authority_end]
        if '@' not in authority:
            return self.database_url

        # Split on the LAST '@' so a password containing '@' is fully masked.
        host = authority.rpartition('@')[2]
        return f"{scheme}://...@{host}{remainder[authority_end:]}"

    async def execute_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """
        Execute a query asynchronously and return results as dictionaries.

        Args:
            query: SQL text, passed through ``sqlalchemy.text``.
            params: Bind parameters for the statement; ``None`` means none.

        Returns:
            One dict per result row, keyed by column name.

        Raises:
            RuntimeError: If ``connect()`` has not been called successfully.
            SQLAlchemyError: If the statement fails (logged, then re-raised).
        """
        engine = self._require_engine()
        try:
            async with engine.connect() as conn:
                result = await conn.execute(text(query), params or {})
                return [dict(row._mapping) for row in result]
        except SQLAlchemyError as e:
            logger.error("async_query_execution_failed", query=query, error=str(e))
            raise

    async def execute(self, query: str, params: Optional[Dict[str, Any]] = None) -> int:
        """
        Execute a query asynchronously and return number of affected rows.

        The statement is committed before returning.

        Args:
            query: SQL text, passed through ``sqlalchemy.text``.
            params: Bind parameters for the statement; ``None`` means none.

        Returns:
            The ``rowcount`` reported for the statement.

        Raises:
            RuntimeError: If ``connect()`` has not been called successfully.
            SQLAlchemyError: If the statement fails (logged, then re-raised).
        """
        engine = self._require_engine()
        try:
            async with engine.connect() as conn:
                result = await conn.execute(text(query), params or {})
                await conn.commit()
                return result.rowcount
        except SQLAlchemyError as e:
            logger.error("async_execution_failed", query=query, error=str(e))
            raise

    async def health_check(self) -> bool:
        """
        Perform a health check on the database connection.

        Returns:
            ``True`` if ``SELECT 1`` returns 1. ``False`` if the client is
            not connected, or if the query fails with a ``SQLAlchemyError``
            or a driver-level ``OSError`` (e.g. connection refused).
        """
        if self.engine is None:
            return False
        try:
            async with self.engine.connect() as conn:
                result = await conn.execute(text("SELECT 1"))
                return result.scalar() == 1
        except (SQLAlchemyError, OSError):
            return False

    async def get_connection_info(self) -> Dict[str, Any]:
        """Get connection information and statistics (URL has credentials masked)."""
        return {
            "database_type": self._get_database_type(),
            "pool_size": self.pool_size,
            "max_overflow": self.max_overflow,
            "query_timeout": self.query_timeout,
            "url": self._get_safe_url()
        }