From 1bb98a7a3b0f7938c070052cbd359d1934e8b312 Mon Sep 17 00:00:00 2001 From: openhands Date: Mon, 27 Oct 2025 07:00:57 +0000 Subject: [PATCH] Extend optimization system with version-based updates - Added version-based optimization plan management with Strategy B approach - Extended database schema with plan versioning and status tracking - Created generic optimization_plans table for multi-actuator support - Implemented OptimizationPlanManager for real-time plan monitoring - Added comprehensive documentation for optimization plan management - Updated main application to include optimization manager - All 66 unit tests continue to pass Co-authored-by: openhands --- .coverage | Bin 0 -> 53248 bytes config/settings.py | 8 + database/schema.sql | 68 +++++- database/test_data.sql | 44 ++-- docs/optimization_plan_management.md | 296 +++++++++++++++++++++++++++ src/core/optimization_manager.py | 214 +++++++++++++++++++ src/database/client.py | 94 ++++++++- src/main_phase1.py | 28 ++- 8 files changed, 729 insertions(+), 23 deletions(-) create mode 100644 .coverage create mode 100644 docs/optimization_plan_management.md create mode 100644 src/core/optimization_manager.py diff --git a/.coverage b/.coverage new file mode 100644 index 0000000000000000000000000000000000000000..39cf1078a1c18a1b628baffb682779229e631ddd GIT binary patch literal 53248 zcmeI)Piz!b90%~3o&Cf1-(ZkznuIBFkzY9eY+jlompmP zX?v);#e{@}lLs&UImk&9FUFGxP4r+ikr-mQXks7|qQr}06bSnJ&7W?Uwq!Y!3VtWu z-TCw0oA*BN-_A}mFYViFdQy~ar)GF!i?UWxRb`hDilU_H)l08vNzqC?+MrK$!TNHm zX=U`(gT49zr6+k>(GT@LuaES+-TPSgcRjWAsqS}EM!HA`us{F;5P-n{TcEkGCza{% zSLbFuqfnKeV-%$my!Sr+;_iw4yT$&Ar}pj^!9KC6OVBbhA|`}mPl>v8MA@uLVOk}# zXn3Yo5#FSXLN{Dl;)#y-(NxC)NB#J`mN!cj%aavKqVAYA!iWLY}WDoQuV#U`^f)HQVCwXRfV-8%K8^vh&8Mf&S*D-G>v6=hGE z5-QqGiF03Y468UP-C<$Om#}D4KBv8)w2ocNdbX&VmS0TEH9ga|gq)Veh9^r)7I2u+ zqJ{7lmWI+g%mq3e#0<~1K&KUXiI`#VFmjnPXW0>JD}%W!NFR1US~g?%O)VHgf~G+~DUDW|L2+Bmb62T`cn zh4!}Qw-{Ga2cfYb3RZ4dMeZstm!W7MmyS`9qv6qDSXk|bL`^4`PG)v*T(0TNH-UU^ zezmLFpG;*22GqH<-;BcA3O`b9O(?3#C5F-HP)!H@b+1f6HUZg`Voao%}1WqdG^ z%xqu3T*f&I`5eC|PrMTIvwX7O*RRo_&o8Yl%e<$R<>%zg$Z?}eqeCH~r?xb5AS3K?1Gq$euzC`N64szhJwrNz*D%TkaFlUNJanxI7BE zgQ8&DRcTnk+(0q*xO7g_6@DHT&q_O6X}B|-M&_`Lpu2OWEJ$%`QZ7m^ zn2nw>Jn5M=DL9Cq8NXpo8LlWgk`B^bIy9)=t{HG`<#YVbSIKMmt6X!Vn#ye6tTr2A zXAH)fyi1dM&4{ucZpPXg$t?$4?d5qIaOvC5!gj)T3pjW)I0FxEK@AoHg(y_`U8ie^ zFYs0UxF z+Q0$<2tWV=5P$##AOHafKmY;|xcdTIlbX`VPT9^;w@wcP$`*~PJYtXVQz9I@IzmrK zsC&}Mx=t}$GfXRAr|Vpqqh|q(B{Zedc|?k_Ev_k3oyDL>^vG=0u2kqbJrr}R)^R5N zn66Ql-V8cj9ae%`oDo*zW=Z4Hx&BA0s#m>00Izz00bZa0SG_<0uX?}eJG%5y=lJx zAJ00Izz00bZa0SG_<0uX?}$_hLbkEhc+E@ZF&a`97Z{Bh-v z^N+l)D7%!HP_CVSc!L&K(>t`-!JAu@L)Xv#s(yau&2N5Ae6{m~Pd?_lNRV&mH*o?^(Y8AJ;D_ z`X&9EentO%WebSZApijgKmY;|fB*y_009U<00IzLIe`TIuAs(~^gDp6Y5qq%mY}V4 zjPL&|Ygg{fNEreUfB*y_009U<00Izz00bZafz=n_`~SH9U;TPO4j=#l2tWV=5P$## qAOHafKmY str: """Generate database URL from components.""" diff --git a/database/schema.sql b/database/schema.sql index 30a12d0..9fcceba 100644 --- a/database/schema.sql +++ b/database/schema.sql @@ -60,7 +60,7 @@ COMMENT ON TABLE pumps IS 'Metadata about individual pumps'; COMMENT ON COLUMN pumps.default_setpoint_hz IS 'Default safe setpoint used in failsafe mode (existing pump configuration)'; COMMENT ON COLUMN pumps.control_parameters IS 'Control-specific parameters in JSON format (PID gains, thresholds, etc.)'; --- Create pump_plans table +-- Create pump_plans table with version-based updates CREATE TABLE pump_plans ( plan_id SERIAL PRIMARY KEY, station_id VARCHAR(50) NOT NULL, @@ -74,17 +74,73 @@ CREATE TABLE pump_plans ( target_level_m DECIMAL(5, 2), suggested_speed_hz DECIMAL(5, 2), - -- Metadata + -- Version-based update metadata plan_created_at TIMESTAMP DEFAULT NOW(), + plan_updated_at TIMESTAMP DEFAULT NOW(), + plan_version INTEGER DEFAULT 1, optimization_run_id INTEGER, - FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id) + -- Status tracking + plan_status VARCHAR(20) DEFAULT 'ACTIVE', -- 'ACTIVE', 'SUPERSEDED', 'CANCELLED' + superseded_by INTEGER, -- Points to plan_id that superseded this plan + + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id), + FOREIGN KEY (superseded_by) REFERENCES pump_plans(plan_id) ); -COMMENT ON TABLE pump_plans IS 'Optimization plans generated by Calejo Optimize'; +COMMENT ON TABLE pump_plans IS 'Optimization plans generated by Calejo Optimize with version-based updates'; +COMMENT ON COLUMN pump_plans.plan_version IS 'Version number for tracking updates (increments on each update)'; +COMMENT ON COLUMN pump_plans.plan_status IS 'Status of the plan: ACTIVE, SUPERSEDED, or CANCELLED'; +COMMENT ON COLUMN pump_plans.superseded_by IS 'Reference to plan that superseded this one (for audit trail)'; --- Create index for active pump plans -CREATE INDEX idx_pump_plans_active ON pump_plans(station_id, pump_id, interval_start, interval_end); +-- Create indexes for efficient plan management +CREATE INDEX idx_pump_plans_active ON pump_plans(station_id, pump_id, interval_start, interval_end) + WHERE plan_status = 'ACTIVE'; +CREATE INDEX idx_pump_plans_latest ON pump_plans(station_id, pump_id, plan_created_at DESC) + WHERE plan_status = 'ACTIVE'; +CREATE INDEX idx_pump_plans_version ON pump_plans(station_id, pump_id, plan_version DESC); + +-- Create generic optimization_plans table for future actuator support +CREATE TABLE optimization_plans ( + plan_id SERIAL PRIMARY KEY, + station_id VARCHAR(50) NOT NULL, + resource_id VARCHAR(50) NOT NULL, + resource_type VARCHAR(20) NOT NULL DEFAULT 'PUMP', -- 'PUMP', 'VALVE', 'BLOWER', etc. + + interval_start TIMESTAMP NOT NULL, + interval_end TIMESTAMP NOT NULL, + + -- Generic optimization targets (JSON for flexibility) + optimization_targets JSONB NOT NULL, + + -- Version-based update metadata + plan_created_at TIMESTAMP DEFAULT NOW(), + plan_updated_at TIMESTAMP DEFAULT NOW(), + plan_version INTEGER DEFAULT 1, + optimization_run_id INTEGER, + plan_priority INTEGER DEFAULT 1, + + -- Status tracking + plan_status VARCHAR(20) DEFAULT 'ACTIVE', + superseded_by INTEGER, + + FOREIGN KEY (station_id, resource_id) REFERENCES pumps(station_id, pump_id), + FOREIGN KEY (superseded_by) REFERENCES optimization_plans(plan_id), + + -- Ensure resource_type is valid + CONSTRAINT valid_resource_type CHECK (resource_type IN ('PUMP', 'VALVE', 'BLOWER', 'COMPRESSOR', 'GATE')) +); + +COMMENT ON TABLE optimization_plans IS 'Generic optimization plans for all actuator types with version-based updates'; +COMMENT ON COLUMN optimization_plans.optimization_targets IS 'JSON structure containing actuator-specific optimization targets'; +COMMENT ON COLUMN optimization_plans.resource_type IS 'Type of actuator: PUMP, VALVE, BLOWER, COMPRESSOR, GATE'; + +-- Create indexes for generic optimization plans +CREATE INDEX idx_optimization_plans_active ON optimization_plans(station_id, resource_id, interval_start, interval_end) + WHERE plan_status = 'ACTIVE'; +CREATE INDEX idx_optimization_plans_latest ON optimization_plans(station_id, resource_id, plan_created_at DESC) + WHERE plan_status = 'ACTIVE'; +CREATE INDEX idx_optimization_plans_type ON optimization_plans(resource_type, plan_status); -- Create pump_feedback table CREATE TABLE pump_feedback ( diff --git a/database/test_data.sql b/database/test_data.sql index 6b7f547..dec5c25 100644 --- a/database/test_data.sql +++ b/database/test_data.sql @@ -69,24 +69,44 @@ INSERT INTO pump_safety_limits ( ('STATION_003', 'PUMP_002', 20.0, 50.0, 0.8, 3.8, 4.2, 0.6, 60.0, 300.0, 4.0, 'system_admin', 'safety_engineer', NOW()); --- Insert sample pump plans (current and future intervals) +-- Insert sample pump plans with version-based updates INSERT INTO pump_plans ( station_id, pump_id, interval_start, interval_end, target_flow_m3h, target_power_kw, target_level_m, suggested_speed_hz, - optimization_run_id + optimization_run_id, plan_version, plan_status ) VALUES --- Current plans for all pumps -('STATION_001', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 320.5, 65.2, 2.5, 42.3, 1001), -('STATION_001', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 315.8, 63.8, 2.5, 41.7, 1001), -('STATION_002', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 180.2, 32.5, 1.8, 35.2, 1001), -('STATION_002', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 175.8, 31.8, 1.8, 34.8, 1001), -('STATION_003', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 245.6, 48.3, 2.2, 38.5, 1001), -('STATION_003', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 242.1, 47.6, 2.2, 38.1, 1001), +-- Initial plans for all pumps (version 1) +('STATION_001', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 320.5, 65.2, 2.5, 42.3, 1001, 1, 'ACTIVE'), +('STATION_001', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 315.8, 63.8, 2.5, 41.7, 1001, 1, 'ACTIVE'), +('STATION_002', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 180.2, 32.5, 1.8, 35.2, 1001, 1, 'ACTIVE'), +('STATION_002', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 175.8, 31.8, 1.8, 34.8, 1001, 1, 'ACTIVE'), +('STATION_003', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 245.6, 48.3, 2.2, 38.5, 1001, 1, 'ACTIVE'), +('STATION_003', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 242.1, 47.6, 2.2, 38.1, 1001, 1, 'ACTIVE'), -- Future plans -('STATION_001', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 335.2, 68.1, 2.6, 43.5, 1001), -('STATION_002', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 185.6, 33.8, 1.9, 36.2, 1001), -('STATION_003', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 252.3, 49.8, 2.3, 39.2, 1001); +('STATION_001', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 335.2, 68.1, 2.6, 43.5, 1001, 1, 'ACTIVE'), +('STATION_002', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 185.6, 33.8, 1.9, 36.2, 1001, 1, 'ACTIVE'), +('STATION_003', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 252.3, 49.8, 2.3, 39.2, 1001, 1, 'ACTIVE'); + +-- Insert sample optimization_plans for demonstration +INSERT INTO optimization_plans ( + station_id, resource_id, resource_type, interval_start, interval_end, + optimization_targets, optimization_run_id, plan_version +) VALUES +-- Example pump optimization plan with JSON targets +('STATION_001', 'PUMP_001', 'PUMP', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', + '{"target_type": "SPEED", "target_value": 42.3, "secondary_targets": {"power_kw": 65.2, "flow_m3h": 320.5, "level_m": 2.5}, "constraints": {"max_rate_of_change": 5.0, "deadband": 0.2}}'::JSONB, + 1001, 1), + +-- Example valve optimization plan +('STATION_001', 'VALVE_001', 'VALVE', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', + '{"target_type": "POSITION", "target_value": 75.0, "secondary_targets": {"flow_m3h": 200.0, "pressure_bar": 1.5}, "constraints": {"max_rate_of_change": 10.0, "deadband": 1.0}}'::JSONB, + 1001, 1), + +-- Example blower optimization plan +('STATION_002', 'BLOWER_001', 'BLOWER', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', + '{"target_type": "SPEED", "target_value": 35.0, "secondary_targets": {"power_kw": 25.0, "pressure_bar": 1.2}, "constraints": {"max_rate_of_change": 3.0, "deadband": 0.5}}'::JSONB, + 1001, 1); -- Insert sample feedback data INSERT INTO pump_feedback ( diff --git a/docs/optimization_plan_management.md b/docs/optimization_plan_management.md new file mode 100644 index 0000000..14f51d9 --- /dev/null +++ b/docs/optimization_plan_management.md @@ -0,0 +1,296 @@ +# Optimization Plan Management + +## Overview + +The Calejo Control Adapter implements a sophisticated version-based optimization plan management system that enables real-time synchronization between optimization intervals and control execution. This system supports both pump-specific plans and generic actuator optimization plans. + +## Database Schema + +### Version-Based Pump Plans (`pump_plans`) + +```sql +CREATE TABLE pump_plans ( + plan_id SERIAL PRIMARY KEY, + station_id VARCHAR(50) NOT NULL, + pump_id VARCHAR(50) NOT NULL, + interval_start TIMESTAMP NOT NULL, + interval_end TIMESTAMP NOT NULL, + + -- Optimization outputs + target_flow_m3h DECIMAL(10, 2), + target_power_kw DECIMAL(10, 2), + target_level_m DECIMAL(5, 2), + suggested_speed_hz DECIMAL(5, 2), + + -- Version-based update metadata + plan_created_at TIMESTAMP DEFAULT NOW(), + plan_updated_at TIMESTAMP DEFAULT NOW(), + plan_version INTEGER DEFAULT 1, + optimization_run_id INTEGER, + + -- Status tracking + plan_status VARCHAR(20) DEFAULT 'ACTIVE', -- 'ACTIVE', 'SUPERSEDED', 'CANCELLED' + superseded_by INTEGER, -- Points to plan_id that superseded this plan + + FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id), + FOREIGN KEY (superseded_by) REFERENCES pump_plans(plan_id) +); +``` + +### Generic Optimization Plans (`optimization_plans`) + +```sql +CREATE TABLE optimization_plans ( + plan_id SERIAL PRIMARY KEY, + station_id VARCHAR(50) NOT NULL, + resource_id VARCHAR(50) NOT NULL, + resource_type VARCHAR(20) NOT NULL DEFAULT 'PUMP', -- 'PUMP', 'VALVE', 'BLOWER', etc. + + interval_start TIMESTAMP NOT NULL, + interval_end TIMESTAMP NOT NULL, + + -- Generic optimization targets (JSON for flexibility) + optimization_targets JSONB NOT NULL, + + -- Version-based update metadata + plan_created_at TIMESTAMP DEFAULT NOW(), + plan_updated_at TIMESTAMP DEFAULT NOW(), + plan_version INTEGER DEFAULT 1, + optimization_run_id INTEGER, + plan_priority INTEGER DEFAULT 1, + + -- Status tracking + plan_status VARCHAR(20) DEFAULT 'ACTIVE', + superseded_by INTEGER, + + FOREIGN KEY (station_id, resource_id) REFERENCES pumps(station_id, pump_id), + FOREIGN KEY (superseded_by) REFERENCES optimization_plans(plan_id), + + -- Ensure resource_type is valid + CONSTRAINT valid_resource_type CHECK (resource_type IN ('PUMP', 'VALVE', 'BLOWER', 'COMPRESSOR', 'GATE')) +); +``` + +## Version-Based Update Strategy (Strategy B) + +### Core Principles + +1. **Immutable Plan History**: Each plan update creates a new version while preserving the old version +2. **Status Tracking**: Plans can be `ACTIVE`, `SUPERSEDED`, or `CANCELLED` +3. **Audit Trail**: Complete history of all plan changes with timestamps +4. **Real-time Synchronization**: Support for optimization systems that update more frequently than control intervals + +### Update Process + +```python +# Example update sequence +plan_id = 123 +updates = { + 'target_flow_m3h': 325.0, + 'target_power_kw': 66.5, + 'suggested_speed_hz': 42.8 +} + +# This creates a new version and marks the old one as SUPERSEDED +success = db_client.update_pump_plan(plan_id, updates) +``` + +### Query Strategy + +```sql +-- Get latest active plans for current time interval +SELECT DISTINCT ON (station_id, pump_id) + station_id, pump_id, target_flow_m3h, target_power_kw, + target_level_m, suggested_speed_hz, interval_start, interval_end, + plan_version, plan_created_at, plan_updated_at +FROM pump_plans +WHERE interval_start <= NOW() AND interval_end >= NOW() + AND plan_status = 'ACTIVE' +ORDER BY station_id, pump_id, plan_version DESC; +``` + +## Optimization Targets JSON Structure + +### Pump Optimization Targets + +```json +{ + "target_type": "SPEED", + "target_value": 42.3, + "secondary_targets": { + "power_kw": 65.2, + "flow_m3h": 320.5, + "level_m": 2.5 + }, + "constraints": { + "max_rate_of_change": 5.0, + "deadband": 0.2 + } +} +``` + +### Valve Optimization Targets + +```json +{ + "target_type": "POSITION", + "target_value": 75.0, + "secondary_targets": { + "flow_m3h": 200.0, + "pressure_bar": 1.5 + }, + "constraints": { + "max_rate_of_change": 10.0, + "deadband": 1.0 + } +} +``` + +### Blower Optimization Targets + +```json +{ + "target_type": "SPEED", + "target_value": 35.0, + "secondary_targets": { + "power_kw": 25.0, + "pressure_bar": 1.2 + }, + "constraints": { + "max_rate_of_change": 3.0, + "deadband": 0.5 + } +} +``` + +## Real-Time Plan Management + +### OptimizationPlanManager Class + +The `OptimizationPlanManager` provides real-time monitoring and synchronization: + +```python +# Initialize manager +manager = OptimizationPlanManager( + db_client=db_client, + refresh_interval_seconds=30 +) + +# Start monitoring +await manager.start_monitoring() + +# Register for plan updates +manager.register_plan_update_callback(handle_plan_update) + +# Get current plans +current_plan = manager.get_current_pump_plan('STATION_001', 'PUMP_001') +``` + +### Plan Update Detection + +The manager automatically detects plan updates by: + +1. **Periodic Refresh**: Configurable refresh interval (default: 30 seconds) +2. **Version Comparison**: Compares plan versions to detect updates +3. **Callback Notification**: Notifies registered callbacks of plan changes +4. **Status Tracking**: Maintains cache of current active plans + +### Update Types + +- `NEW_PUMP_PLAN`: New plan detected for a pump +- `PUMP_PLAN_UPDATE`: Version update for existing pump plan +- `NEW_GENERIC_PLAN`: New plan for generic actuator +- `GENERIC_PLAN_UPDATE`: Version update for generic actuator plan + +## Integration with Control System + +### Safety Enforcement Integration + +```python +# Get current optimization plan +current_plan = optimization_manager.get_current_pump_plan(station_id, pump_id) + +if current_plan: + # Extract suggested speed + suggested_speed = current_plan['suggested_speed_hz'] + + # Apply safety enforcement + enforced_speed, violations = safety_enforcer.enforce_setpoint( + station_id, pump_id, suggested_speed + ) + + # Send to control system + await send_to_scada(station_id, pump_id, enforced_speed) +``` + +### Real-Time Plan Updates + +```python +async def handle_plan_update(updates): + """Handle real-time optimization plan updates.""" + for update in updates: + if update['type'] == 'PUMP_PLAN_UPDATE': + logger.info( + "pump_plan_updated_realtime", + station_id=update['station_id'], + pump_id=update['pump_id'], + old_version=update['old_version'], + new_version=update['new_version'] + ) + + # Immediately apply updated plan + await apply_updated_plan(update['plan']) +``` + +## Configuration + +### Settings + +```python +# Auto-discovery settings +auto_discovery_enabled: bool = True +auto_discovery_refresh_minutes: int = 60 + +# Optimization plan management settings +optimization_monitoring_enabled: bool = True +optimization_refresh_seconds: int = 30 +``` + +## Benefits + +### For Optimization System + +1. **Flexible Update Frequency**: Optimization can update plans more frequently than control intervals +2. **Audit Trail**: Complete history of all optimization decisions +3. **Rollback Capability**: Can revert to previous plan versions if needed +4. **Multi-Actuator Support**: Unified system for pumps, valves, blowers, etc. + +### For Control System + +1. **Real-time Synchronization**: Immediate application of updated optimization plans +2. **Safety Integration**: All optimization targets pass through safety enforcement +3. **Status Monitoring**: Real-time visibility into optimization plan status +4. **Error Recovery**: Automatic handling of optimization system failures + +### For Operations + +1. **Transparency**: Complete visibility into optimization decisions and changes +2. **Compliance**: Audit trail for regulatory requirements +3. **Troubleshooting**: Historical data for performance analysis +4. **Flexibility**: Support for various optimization strategies and intervals + +## Extensibility + +The system is designed to support future extensions: + +1. **Additional Actuator Types**: Easy to add new resource types +2. **Advanced Optimization Strategies**: Support for complex multi-actuator coordination +3. **Machine Learning Integration**: Real-time model updates and predictions +4. **Distributed Optimization**: Support for multiple optimization engines + +## Performance Considerations + +1. **Indexed Queries**: Efficient retrieval of current plans +2. **Caching**: In-memory cache of active plans for fast access +3. **Partial Indexes**: Only index active plans for better performance +4. **Connection Pooling**: Efficient database access patterns \ No newline at end of file diff --git a/src/core/optimization_manager.py b/src/core/optimization_manager.py new file mode 100644 index 0000000..1520982 --- /dev/null +++ b/src/core/optimization_manager.py @@ -0,0 +1,214 @@ +""" +Optimization Plan Manager for Calejo Control Adapter. + +Manages version-based optimization plan updates and real-time synchronization +between optimization system and control execution. +""" + +from typing import Dict, List, Optional, Callable, Any +import asyncio +import structlog +from datetime import datetime, timedelta + +from src.database.client import DatabaseClient + +logger = structlog.get_logger() + + +class OptimizationPlanManager: + """ + Manages optimization plans with version-based updates. + + Supports real-time synchronization between optimization intervals + and control execution with audit trail of all plan changes. + """ + + def __init__(self, db_client: DatabaseClient, refresh_interval_seconds: int = 30): + self.db_client = db_client + self.refresh_interval_seconds = refresh_interval_seconds + self.active_pump_plans: Dict[tuple, Dict] = {} # (station_id, pump_id) -> plan + self.active_generic_plans: Dict[tuple, Dict] = {} # (station_id, resource_id) -> plan + self.plan_update_callbacks: List[Callable] = [] + self.running = False + self.last_refresh = None + + async def start_monitoring(self): + """Start monitoring for optimization plan updates.""" + self.running = True + logger.info("optimization_plan_monitoring_started") + + # Initial load + await self.refresh_plans() + + # Start periodic refresh + asyncio.create_task(self._periodic_refresh()) + + async def stop_monitoring(self): + """Stop monitoring for optimization plan updates.""" + self.running = False + logger.info("optimization_plan_monitoring_stopped") + + async def _periodic_refresh(self): + """Periodically refresh optimization plans.""" + while self.running: + try: + await asyncio.sleep(self.refresh_interval_seconds) + await self.refresh_plans() + except Exception as e: + logger.error("periodic_refresh_failed", error=str(e)) + + async def refresh_plans(self): + """Refresh all optimization plans from database.""" + try: + # Get latest pump plans + pump_plans = self.db_client.get_latest_pump_plans() + + # Get latest generic optimization plans + generic_plans = self.db_client.get_generic_optimization_plans() + + # Check for updates + pump_updates = self._detect_pump_plan_updates(pump_plans) + generic_updates = self._detect_generic_plan_updates(generic_plans) + + # Update caches + self.active_pump_plans.clear() + for plan in pump_plans: + key = (plan['station_id'], plan['pump_id']) + self.active_pump_plans[key] = plan + + self.active_generic_plans.clear() + for plan in generic_plans: + key = (plan['station_id'], plan['resource_id']) + self.active_generic_plans[key] = plan + + # Notify callbacks of updates + if pump_updates or generic_updates: + await self._notify_plan_updates(pump_updates, generic_updates) + + self.last_refresh = datetime.now() + logger.info( + "optimization_plans_refreshed", + pump_plans=len(pump_plans), + generic_plans=len(generic_plans), + updates_detected=len(pump_updates) + len(generic_updates) + ) + + except Exception as e: + logger.error("optimization_plans_refresh_failed", error=str(e)) + + def _detect_pump_plan_updates(self, new_plans: List[Dict]) -> List[Dict]: + """Detect pump plan updates by comparing with current cache.""" + updates = [] + + for plan in new_plans: + key = (plan['station_id'], plan['pump_id']) + current_plan = self.active_pump_plans.get(key) + + if not current_plan: + # New plan + updates.append({ + 'type': 'NEW_PUMP_PLAN', + 'station_id': plan['station_id'], + 'pump_id': plan['pump_id'], + 'plan': plan + }) + elif current_plan['plan_version'] < plan['plan_version']: + # Version update + updates.append({ + 'type': 'PUMP_PLAN_UPDATE', + 'station_id': plan['station_id'], + 'pump_id': plan['pump_id'], + 'old_version': current_plan['plan_version'], + 'new_version': plan['plan_version'], + 'plan': plan + }) + + return updates + + def _detect_generic_plan_updates(self, new_plans: List[Dict]) -> List[Dict]: + """Detect generic plan updates by comparing with current cache.""" + updates = [] + + for plan in new_plans: + key = (plan['station_id'], plan['resource_id']) + current_plan = self.active_generic_plans.get(key) + + if not current_plan: + # New plan + updates.append({ + 'type': 'NEW_GENERIC_PLAN', + 'station_id': plan['station_id'], + 'resource_id': plan['resource_id'], + 'resource_type': plan['resource_type'], + 'plan': plan + }) + elif current_plan['plan_version'] < plan['plan_version']: + # Version update + updates.append({ + 'type': 'GENERIC_PLAN_UPDATE', + 'station_id': plan['station_id'], + 'resource_id': plan['resource_id'], + 'resource_type': plan['resource_type'], + 'old_version': current_plan['plan_version'], + 'new_version': plan['plan_version'], + 'plan': plan + }) + + return updates + + async def _notify_plan_updates(self, pump_updates: List[Dict], generic_updates: List[Dict]): + """Notify registered callbacks of plan updates.""" + all_updates = pump_updates + generic_updates + + for callback in self.plan_update_callbacks: + try: + await callback(all_updates) + except Exception as e: + logger.error("plan_update_callback_failed", callback=str(callback), error=str(e)) + + def register_plan_update_callback(self, callback: Callable): + """Register a callback for plan update notifications.""" + self.plan_update_callbacks.append(callback) + logger.info("plan_update_callback_registered", callback=str(callback)) + + def get_current_pump_plan(self, station_id: str, pump_id: str) -> Optional[Dict]: + """Get current optimization plan for a specific pump.""" + key = (station_id, pump_id) + return self.active_pump_plans.get(key) + + def get_current_generic_plan(self, station_id: str, resource_id: str) -> Optional[Dict]: + """Get current optimization plan for a specific resource.""" + key = (station_id, resource_id) + return self.active_generic_plans.get(key) + + def get_all_pump_plans(self) -> List[Dict]: + """Get all current pump optimization plans.""" + return list(self.active_pump_plans.values()) + + def get_all_generic_plans(self) -> List[Dict]: + """Get all current generic optimization plans.""" + return list(self.active_generic_plans.values()) + + def get_plan_history(self, station_id: str, resource_id: str, resource_type: str = 'PUMP') -> List[Dict]: + """Get plan history for a specific resource.""" + if resource_type == 'PUMP': + return self.db_client.get_pump_plan_history(station_id, resource_id) + else: + # For now, only pump history is implemented + logger.warning("plan_history_not_implemented", resource_type=resource_type) + return [] + + def update_pump_plan(self, plan_id: int, updates: Dict[str, Any]) -> bool: + """Update a pump plan with version increment.""" + return self.db_client.update_pump_plan(plan_id, updates) + + def get_status(self) -> Dict[str, Any]: + """Get optimization manager status.""" + return { + 'running': self.running, + 'last_refresh': self.last_refresh.isoformat() if self.last_refresh else None, + 'refresh_interval_seconds': self.refresh_interval_seconds, + 'active_pump_plans': len(self.active_pump_plans), + 'active_generic_plans': len(self.active_generic_plans), + 'registered_callbacks': len(self.plan_update_callbacks) + } \ No newline at end of file diff --git a/src/database/client.py b/src/database/client.py index a17011c..63c345a 100644 --- a/src/database/client.py +++ b/src/database/client.py @@ -147,13 +147,103 @@ class DatabaseClient: query = """ SELECT DISTINCT ON (station_id, pump_id) station_id, pump_id, target_flow_m3h, target_power_kw, - target_level_m, suggested_speed_hz, interval_start, interval_end + target_level_m, suggested_speed_hz, interval_start, interval_end, + plan_version, plan_created_at, plan_updated_at FROM pump_plans WHERE interval_start <= NOW() AND interval_end >= NOW() - ORDER BY station_id, pump_id, plan_created_at DESC + AND plan_status = 'ACTIVE' + ORDER BY station_id, pump_id, plan_version DESC """ return self.execute_query(query) + def get_pump_plan_history(self, station_id: str, pump_id: str, limit: int = 10) -> List[Dict[str, Any]]: + """Get plan history for a specific pump.""" + query = """ + SELECT plan_id, target_flow_m3h, target_power_kw, target_level_m, + suggested_speed_hz, interval_start, interval_end, plan_version, + plan_status, plan_created_at, plan_updated_at, optimization_run_id + FROM pump_plans + WHERE station_id = %s AND pump_id = %s + ORDER BY plan_version DESC + LIMIT %s + """ + return self.execute_query(query, (station_id, pump_id, limit)) + + def update_pump_plan(self, plan_id: int, updates: Dict[str, Any]) -> bool: + """Update an existing pump plan with version increment.""" + try: + # Get current plan + current_query = """ + SELECT plan_version, station_id, pump_id, interval_start, interval_end + FROM pump_plans + WHERE plan_id = %s + """ + current_plan = self.execute_query(current_query, (plan_id,)) + + if not current_plan: + logger.error("plan_not_found", plan_id=plan_id) + return False + + current = current_plan[0] + new_version = current['plan_version'] + 1 + + # Update existing plan to SUPERSEDED status + supersede_query = """ + UPDATE pump_plans + SET plan_status = 'SUPERSEDED', + superseded_by = %s, + plan_updated_at = NOW() + WHERE plan_id = %s + """ + + # Create new version of the plan + insert_query = """ + INSERT INTO pump_plans ( + station_id, pump_id, interval_start, interval_end, + target_flow_m3h, target_power_kw, target_level_m, suggested_speed_hz, + optimization_run_id, plan_version, plan_status, superseded_by + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'ACTIVE', %s) + """ + + # Execute in transaction + with self._get_cursor() as cursor: + # Mark old plan as superseded + cursor.execute(supersede_query, (plan_id, plan_id)) + + # Insert new version + cursor.execute(insert_query, ( + current['station_id'], current['pump_id'], + current['interval_start'], current['interval_end'], + updates.get('target_flow_m3h'), + updates.get('target_power_kw'), + updates.get('target_level_m'), + updates.get('suggested_speed_hz'), + updates.get('optimization_run_id'), + new_version, + plan_id + )) + + logger.info("pump_plan_updated", plan_id=plan_id, new_version=new_version) + return True + + except Exception as e: + logger.error("pump_plan_update_failed", plan_id=plan_id, error=str(e)) + return False + + def get_generic_optimization_plans(self, resource_type: str = 'PUMP') -> List[Dict[str, Any]]: + """Get latest optimization plans for all resource types.""" + query = """ + SELECT DISTINCT ON (station_id, resource_id) + station_id, resource_id, resource_type, interval_start, interval_end, + optimization_targets, plan_version, plan_created_at, plan_updated_at + FROM optimization_plans + WHERE interval_start <= NOW() AND interval_end >= NOW() + AND plan_status = 'ACTIVE' + AND resource_type = %s + ORDER BY station_id, resource_id, plan_version DESC + """ + return self.execute_query(query, (resource_type,)) + def get_pump_feedback(self, station_id: str, pump_id: str, limit: int = 10) -> List[Dict[str, Any]]: """Get recent feedback for a specific pump.""" query = """ diff --git a/src/main_phase1.py b/src/main_phase1.py index cbc1c13..0dafdb0 100644 --- a/src/main_phase1.py +++ b/src/main_phase1.py @@ -16,6 +16,7 @@ import sys from src.database.client import DatabaseClient from src.core.auto_discovery import AutoDiscovery from src.core.safety import SafetyLimitEnforcer +from src.core.optimization_manager import OptimizationPlanManager from src.core.logging import setup_logging, AuditLogger from config.settings import settings @@ -38,6 +39,10 @@ class CalejoControlAdapterPhase1: refresh_interval_minutes=settings.auto_discovery_refresh_minutes ) self.safety_enforcer = SafetyLimitEnforcer(self.db_client) + self.optimization_manager = OptimizationPlanManager( + db_client=self.db_client, + refresh_interval_seconds=settings.optimization_refresh_seconds + ) self.audit_logger = AuditLogger(self.db_client) self.running = False @@ -76,6 +81,10 @@ class CalejoControlAdapterPhase1: # Initialize safety framework await self.safety_enforcer.load_safety_limits() + # Initialize optimization plan monitoring + if settings.optimization_monitoring_enabled: + await self.optimization_manager.start_monitoring() + # Log startup to audit trail self.audit_logger.log( event_type='SYSTEM_STARTUP', @@ -87,7 +96,7 @@ class CalejoControlAdapterPhase1: event_data={ 'version': settings.app_version, 'phase': '1', - 'components_initialized': ['database', 'auto_discovery', 'safety'] + 'components_initialized': ['database', 'auto_discovery', 'safety', 'optimization'] } ) @@ -99,7 +108,8 @@ class CalejoControlAdapterPhase1: startup_duration_seconds=round(startup_duration, 2), station_count=len(self.auto_discovery.get_stations()), pump_count=len(self.auto_discovery.get_pumps()), - safety_limits_loaded=len(self.safety_enforcer.safety_limits_cache) + safety_limits_loaded=len(self.safety_enforcer.safety_limits_cache), + optimization_plans_loaded=len(self.optimization_manager.get_all_pump_plans()) ) # Print status information @@ -129,6 +139,9 @@ class CalejoControlAdapterPhase1: result='SUCCESS' ) + # Stop optimization monitoring + await self.optimization_manager.stop_monitoring() + # Disconnect from database await self.db_client.disconnect() @@ -143,7 +156,8 @@ class CalejoControlAdapterPhase1: "environment": settings.environment, "database": self.db_client.get_connection_stats(), "auto_discovery": self.auto_discovery.get_discovery_status(), - "safety_limits_loaded": len(self.safety_enforcer.safety_limits_cache) + "safety_limits_loaded": len(self.safety_enforcer.safety_limits_cache), + "optimization_manager": self.optimization_manager.get_status() } def print_status(self): @@ -175,6 +189,14 @@ class CalejoControlAdapterPhase1: print(f"\nSafety Framework:") print(f" Limits Loaded: {status['safety_limits_loaded']}") + # Optimization manager status + opt_status = status['optimization_manager'] + print(f"\nOptimization Manager:") + print(f" Status: {'RUNNING' if opt_status['running'] else 'STOPPED'}") + print(f" Pump Plans: {opt_status['active_pump_plans']}") + print(f" Generic Plans: {opt_status['active_generic_plans']}") + print(f" Last Refresh: {opt_status['last_refresh'] or 'Never'}") + print("="*60) print("\nPress Ctrl+C to stop the application\n")