Extend optimization system with version-based updates

- Added version-based optimization plan management with Strategy B approach
- Extended database schema with plan versioning and status tracking
- Created generic optimization_plans table for multi-actuator support
- Implemented OptimizationPlanManager for real-time plan monitoring
- Added comprehensive documentation for optimization plan management
- Updated main application to include optimization manager
- All 66 unit tests continue to pass

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
openhands 2025-10-27 07:00:57 +00:00
parent 0b28253927
commit 1bb98a7a3b
8 changed files with 729 additions and 23 deletions

BIN
.coverage Normal file

Binary file not shown.

View File

@ -90,6 +90,14 @@ class Settings(BaseSettings):
app_version: str = "2.0.0"
environment: str = "development"
# Auto-discovery settings
auto_discovery_enabled: bool = True
auto_discovery_refresh_minutes: int = 60
# Optimization plan management settings
optimization_monitoring_enabled: bool = True
optimization_refresh_seconds: int = 30
@property
def database_url(self) -> str:
"""Generate database URL from components."""

View File

@ -60,7 +60,7 @@ COMMENT ON TABLE pumps IS 'Metadata about individual pumps';
COMMENT ON COLUMN pumps.default_setpoint_hz IS 'Default safe setpoint used in failsafe mode (existing pump configuration)';
COMMENT ON COLUMN pumps.control_parameters IS 'Control-specific parameters in JSON format (PID gains, thresholds, etc.)';
-- Create pump_plans table
-- Create pump_plans table with version-based updates
CREATE TABLE pump_plans (
plan_id SERIAL PRIMARY KEY,
station_id VARCHAR(50) NOT NULL,
@ -74,17 +74,73 @@ CREATE TABLE pump_plans (
target_level_m DECIMAL(5, 2),
suggested_speed_hz DECIMAL(5, 2),
-- Metadata
-- Version-based update metadata
plan_created_at TIMESTAMP DEFAULT NOW(),
plan_updated_at TIMESTAMP DEFAULT NOW(),
plan_version INTEGER DEFAULT 1,
optimization_run_id INTEGER,
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
-- Status tracking
plan_status VARCHAR(20) DEFAULT 'ACTIVE', -- 'ACTIVE', 'SUPERSEDED', 'CANCELLED'
superseded_by INTEGER, -- Points to plan_id that superseded this plan
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id),
FOREIGN KEY (superseded_by) REFERENCES pump_plans(plan_id)
);
COMMENT ON TABLE pump_plans IS 'Optimization plans generated by Calejo Optimize';
COMMENT ON TABLE pump_plans IS 'Optimization plans generated by Calejo Optimize with version-based updates';
COMMENT ON COLUMN pump_plans.plan_version IS 'Version number for tracking updates (increments on each update)';
COMMENT ON COLUMN pump_plans.plan_status IS 'Status of the plan: ACTIVE, SUPERSEDED, or CANCELLED';
COMMENT ON COLUMN pump_plans.superseded_by IS 'Reference to plan that superseded this one (for audit trail)';
-- Create index for active pump plans
CREATE INDEX idx_pump_plans_active ON pump_plans(station_id, pump_id, interval_start, interval_end);
-- Create indexes for efficient plan management
CREATE INDEX idx_pump_plans_active ON pump_plans(station_id, pump_id, interval_start, interval_end)
WHERE plan_status = 'ACTIVE';
CREATE INDEX idx_pump_plans_latest ON pump_plans(station_id, pump_id, plan_created_at DESC)
WHERE plan_status = 'ACTIVE';
CREATE INDEX idx_pump_plans_version ON pump_plans(station_id, pump_id, plan_version DESC);
-- Create generic optimization_plans table for future actuator support
CREATE TABLE optimization_plans (
plan_id SERIAL PRIMARY KEY,
station_id VARCHAR(50) NOT NULL,
resource_id VARCHAR(50) NOT NULL,
resource_type VARCHAR(20) NOT NULL DEFAULT 'PUMP', -- 'PUMP', 'VALVE', 'BLOWER', etc.
interval_start TIMESTAMP NOT NULL,
interval_end TIMESTAMP NOT NULL,
-- Generic optimization targets (JSON for flexibility)
optimization_targets JSONB NOT NULL,
-- Version-based update metadata
plan_created_at TIMESTAMP DEFAULT NOW(),
plan_updated_at TIMESTAMP DEFAULT NOW(),
plan_version INTEGER DEFAULT 1,
optimization_run_id INTEGER,
plan_priority INTEGER DEFAULT 1,
-- Status tracking
plan_status VARCHAR(20) DEFAULT 'ACTIVE',
superseded_by INTEGER,
FOREIGN KEY (station_id, resource_id) REFERENCES pumps(station_id, pump_id),
FOREIGN KEY (superseded_by) REFERENCES optimization_plans(plan_id),
-- Ensure resource_type is valid
CONSTRAINT valid_resource_type CHECK (resource_type IN ('PUMP', 'VALVE', 'BLOWER', 'COMPRESSOR', 'GATE'))
);
COMMENT ON TABLE optimization_plans IS 'Generic optimization plans for all actuator types with version-based updates';
COMMENT ON COLUMN optimization_plans.optimization_targets IS 'JSON structure containing actuator-specific optimization targets';
COMMENT ON COLUMN optimization_plans.resource_type IS 'Type of actuator: PUMP, VALVE, BLOWER, COMPRESSOR, GATE';
-- Create indexes for generic optimization plans
CREATE INDEX idx_optimization_plans_active ON optimization_plans(station_id, resource_id, interval_start, interval_end)
WHERE plan_status = 'ACTIVE';
CREATE INDEX idx_optimization_plans_latest ON optimization_plans(station_id, resource_id, plan_created_at DESC)
WHERE plan_status = 'ACTIVE';
CREATE INDEX idx_optimization_plans_type ON optimization_plans(resource_type, plan_status);
-- Create pump_feedback table
CREATE TABLE pump_feedback (

View File

@ -69,24 +69,44 @@ INSERT INTO pump_safety_limits (
('STATION_003', 'PUMP_002', 20.0, 50.0, 0.8, 3.8, 4.2, 0.6, 60.0, 300.0, 4.0,
'system_admin', 'safety_engineer', NOW());
-- Insert sample pump plans (current and future intervals)
-- Insert sample pump plans with version-based updates
INSERT INTO pump_plans (
station_id, pump_id, interval_start, interval_end,
target_flow_m3h, target_power_kw, target_level_m, suggested_speed_hz,
optimization_run_id
optimization_run_id, plan_version, plan_status
) VALUES
-- Current plans for all pumps
('STATION_001', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 320.5, 65.2, 2.5, 42.3, 1001),
('STATION_001', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 315.8, 63.8, 2.5, 41.7, 1001),
('STATION_002', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 180.2, 32.5, 1.8, 35.2, 1001),
('STATION_002', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 175.8, 31.8, 1.8, 34.8, 1001),
('STATION_003', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 245.6, 48.3, 2.2, 38.5, 1001),
('STATION_003', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 242.1, 47.6, 2.2, 38.1, 1001),
-- Initial plans for all pumps (version 1)
('STATION_001', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 320.5, 65.2, 2.5, 42.3, 1001, 1, 'ACTIVE'),
('STATION_001', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 315.8, 63.8, 2.5, 41.7, 1001, 1, 'ACTIVE'),
('STATION_002', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 180.2, 32.5, 1.8, 35.2, 1001, 1, 'ACTIVE'),
('STATION_002', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 175.8, 31.8, 1.8, 34.8, 1001, 1, 'ACTIVE'),
('STATION_003', 'PUMP_001', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 245.6, 48.3, 2.2, 38.5, 1001, 1, 'ACTIVE'),
('STATION_003', 'PUMP_002', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes', 242.1, 47.6, 2.2, 38.1, 1001, 1, 'ACTIVE'),
-- Future plans
('STATION_001', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 335.2, 68.1, 2.6, 43.5, 1001),
('STATION_002', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 185.6, 33.8, 1.9, 36.2, 1001),
('STATION_003', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 252.3, 49.8, 2.3, 39.2, 1001);
('STATION_001', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 335.2, 68.1, 2.6, 43.5, 1001, 1, 'ACTIVE'),
('STATION_002', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 185.6, 33.8, 1.9, 36.2, 1001, 1, 'ACTIVE'),
('STATION_003', 'PUMP_001', NOW() + INTERVAL '1 hour', NOW() + INTERVAL '2 hours', 252.3, 49.8, 2.3, 39.2, 1001, 1, 'ACTIVE');
-- Insert sample optimization_plans for demonstration
INSERT INTO optimization_plans (
station_id, resource_id, resource_type, interval_start, interval_end,
optimization_targets, optimization_run_id, plan_version
) VALUES
-- Example pump optimization plan with JSON targets
('STATION_001', 'PUMP_001', 'PUMP', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes',
'{"target_type": "SPEED", "target_value": 42.3, "secondary_targets": {"power_kw": 65.2, "flow_m3h": 320.5, "level_m": 2.5}, "constraints": {"max_rate_of_change": 5.0, "deadband": 0.2}}'::JSONB,
1001, 1),
-- Example valve optimization plan
('STATION_001', 'VALVE_001', 'VALVE', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes',
'{"target_type": "POSITION", "target_value": 75.0, "secondary_targets": {"flow_m3h": 200.0, "pressure_bar": 1.5}, "constraints": {"max_rate_of_change": 10.0, "deadband": 1.0}}'::JSONB,
1001, 1),
-- Example blower optimization plan
('STATION_002', 'BLOWER_001', 'BLOWER', NOW() - INTERVAL '5 minutes', NOW() + INTERVAL '55 minutes',
'{"target_type": "SPEED", "target_value": 35.0, "secondary_targets": {"power_kw": 25.0, "pressure_bar": 1.2}, "constraints": {"max_rate_of_change": 3.0, "deadband": 0.5}}'::JSONB,
1001, 1);
-- Insert sample feedback data
INSERT INTO pump_feedback (

View File

@ -0,0 +1,296 @@
# Optimization Plan Management
## Overview
The Calejo Control Adapter implements a sophisticated version-based optimization plan management system that enables real-time synchronization between optimization intervals and control execution. This system supports both pump-specific plans and generic actuator optimization plans.
## Database Schema
### Version-Based Pump Plans (`pump_plans`)
```sql
CREATE TABLE pump_plans (
plan_id SERIAL PRIMARY KEY,
station_id VARCHAR(50) NOT NULL,
pump_id VARCHAR(50) NOT NULL,
interval_start TIMESTAMP NOT NULL,
interval_end TIMESTAMP NOT NULL,
-- Optimization outputs
target_flow_m3h DECIMAL(10, 2),
target_power_kw DECIMAL(10, 2),
target_level_m DECIMAL(5, 2),
suggested_speed_hz DECIMAL(5, 2),
-- Version-based update metadata
plan_created_at TIMESTAMP DEFAULT NOW(),
plan_updated_at TIMESTAMP DEFAULT NOW(),
plan_version INTEGER DEFAULT 1,
optimization_run_id INTEGER,
-- Status tracking
plan_status VARCHAR(20) DEFAULT 'ACTIVE', -- 'ACTIVE', 'SUPERSEDED', 'CANCELLED'
superseded_by INTEGER, -- Points to plan_id that superseded this plan
FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id),
FOREIGN KEY (superseded_by) REFERENCES pump_plans(plan_id)
);
```
### Generic Optimization Plans (`optimization_plans`)
```sql
CREATE TABLE optimization_plans (
plan_id SERIAL PRIMARY KEY,
station_id VARCHAR(50) NOT NULL,
resource_id VARCHAR(50) NOT NULL,
resource_type VARCHAR(20) NOT NULL DEFAULT 'PUMP', -- 'PUMP', 'VALVE', 'BLOWER', etc.
interval_start TIMESTAMP NOT NULL,
interval_end TIMESTAMP NOT NULL,
-- Generic optimization targets (JSON for flexibility)
optimization_targets JSONB NOT NULL,
-- Version-based update metadata
plan_created_at TIMESTAMP DEFAULT NOW(),
plan_updated_at TIMESTAMP DEFAULT NOW(),
plan_version INTEGER DEFAULT 1,
optimization_run_id INTEGER,
plan_priority INTEGER DEFAULT 1,
-- Status tracking
plan_status VARCHAR(20) DEFAULT 'ACTIVE',
superseded_by INTEGER,
FOREIGN KEY (station_id, resource_id) REFERENCES pumps(station_id, pump_id),
FOREIGN KEY (superseded_by) REFERENCES optimization_plans(plan_id),
-- Ensure resource_type is valid
CONSTRAINT valid_resource_type CHECK (resource_type IN ('PUMP', 'VALVE', 'BLOWER', 'COMPRESSOR', 'GATE'))
);
```
## Version-Based Update Strategy (Strategy B)
### Core Principles
1. **Immutable Plan History**: Each plan update creates a new version while preserving the old version
2. **Status Tracking**: Plans can be `ACTIVE`, `SUPERSEDED`, or `CANCELLED`
3. **Audit Trail**: Complete history of all plan changes with timestamps
4. **Real-time Synchronization**: Support for optimization systems that update more frequently than control intervals
### Update Process
```python
# Example update sequence
plan_id = 123
updates = {
'target_flow_m3h': 325.0,
'target_power_kw': 66.5,
'suggested_speed_hz': 42.8
}
# This creates a new version and marks the old one as SUPERSEDED
success = db_client.update_pump_plan(plan_id, updates)
```
### Query Strategy
```sql
-- Get latest active plans for current time interval
SELECT DISTINCT ON (station_id, pump_id)
station_id, pump_id, target_flow_m3h, target_power_kw,
target_level_m, suggested_speed_hz, interval_start, interval_end,
plan_version, plan_created_at, plan_updated_at
FROM pump_plans
WHERE interval_start <= NOW() AND interval_end >= NOW()
AND plan_status = 'ACTIVE'
ORDER BY station_id, pump_id, plan_version DESC;
```
## Optimization Targets JSON Structure
### Pump Optimization Targets
```json
{
"target_type": "SPEED",
"target_value": 42.3,
"secondary_targets": {
"power_kw": 65.2,
"flow_m3h": 320.5,
"level_m": 2.5
},
"constraints": {
"max_rate_of_change": 5.0,
"deadband": 0.2
}
}
```
### Valve Optimization Targets
```json
{
"target_type": "POSITION",
"target_value": 75.0,
"secondary_targets": {
"flow_m3h": 200.0,
"pressure_bar": 1.5
},
"constraints": {
"max_rate_of_change": 10.0,
"deadband": 1.0
}
}
```
### Blower Optimization Targets
```json
{
"target_type": "SPEED",
"target_value": 35.0,
"secondary_targets": {
"power_kw": 25.0,
"pressure_bar": 1.2
},
"constraints": {
"max_rate_of_change": 3.0,
"deadband": 0.5
}
}
```
## Real-Time Plan Management
### OptimizationPlanManager Class
The `OptimizationPlanManager` provides real-time monitoring and synchronization:
```python
# Initialize manager
manager = OptimizationPlanManager(
db_client=db_client,
refresh_interval_seconds=30
)
# Start monitoring
await manager.start_monitoring()
# Register for plan updates
manager.register_plan_update_callback(handle_plan_update)
# Get current plans
current_plan = manager.get_current_pump_plan('STATION_001', 'PUMP_001')
```
### Plan Update Detection
The manager automatically detects plan updates by:
1. **Periodic Refresh**: Configurable refresh interval (default: 30 seconds)
2. **Version Comparison**: Compares plan versions to detect updates
3. **Callback Notification**: Notifies registered callbacks of plan changes
4. **Status Tracking**: Maintains cache of current active plans
### Update Types
- `NEW_PUMP_PLAN`: New plan detected for a pump
- `PUMP_PLAN_UPDATE`: Version update for existing pump plan
- `NEW_GENERIC_PLAN`: New plan for generic actuator
- `GENERIC_PLAN_UPDATE`: Version update for generic actuator plan
## Integration with Control System
### Safety Enforcement Integration
```python
# Get current optimization plan
current_plan = optimization_manager.get_current_pump_plan(station_id, pump_id)
if current_plan:
# Extract suggested speed
suggested_speed = current_plan['suggested_speed_hz']
# Apply safety enforcement
enforced_speed, violations = safety_enforcer.enforce_setpoint(
station_id, pump_id, suggested_speed
)
# Send to control system
await send_to_scada(station_id, pump_id, enforced_speed)
```
### Real-Time Plan Updates
```python
async def handle_plan_update(updates):
"""Handle real-time optimization plan updates."""
for update in updates:
if update['type'] == 'PUMP_PLAN_UPDATE':
logger.info(
"pump_plan_updated_realtime",
station_id=update['station_id'],
pump_id=update['pump_id'],
old_version=update['old_version'],
new_version=update['new_version']
)
# Immediately apply updated plan
await apply_updated_plan(update['plan'])
```
## Configuration
### Settings
```python
# Auto-discovery settings
auto_discovery_enabled: bool = True
auto_discovery_refresh_minutes: int = 60
# Optimization plan management settings
optimization_monitoring_enabled: bool = True
optimization_refresh_seconds: int = 30
```
## Benefits
### For Optimization System
1. **Flexible Update Frequency**: Optimization can update plans more frequently than control intervals
2. **Audit Trail**: Complete history of all optimization decisions
3. **Rollback Capability**: Can revert to previous plan versions if needed
4. **Multi-Actuator Support**: Unified system for pumps, valves, blowers, etc.
### For Control System
1. **Real-time Synchronization**: Immediate application of updated optimization plans
2. **Safety Integration**: All optimization targets pass through safety enforcement
3. **Status Monitoring**: Real-time visibility into optimization plan status
4. **Error Recovery**: Automatic handling of optimization system failures
### For Operations
1. **Transparency**: Complete visibility into optimization decisions and changes
2. **Compliance**: Audit trail for regulatory requirements
3. **Troubleshooting**: Historical data for performance analysis
4. **Flexibility**: Support for various optimization strategies and intervals
## Extensibility
The system is designed to support future extensions:
1. **Additional Actuator Types**: Easy to add new resource types
2. **Advanced Optimization Strategies**: Support for complex multi-actuator coordination
3. **Machine Learning Integration**: Real-time model updates and predictions
4. **Distributed Optimization**: Support for multiple optimization engines
## Performance Considerations
1. **Indexed Queries**: Efficient retrieval of current plans
2. **Caching**: In-memory cache of active plans for fast access
3. **Partial Indexes**: Only index active plans for better performance
4. **Connection Pooling**: Efficient database access patterns

View File

@ -0,0 +1,214 @@
"""
Optimization Plan Manager for Calejo Control Adapter.
Manages version-based optimization plan updates and real-time synchronization
between optimization system and control execution.
"""
from typing import Dict, List, Optional, Callable, Any
import asyncio
import structlog
from datetime import datetime, timedelta
from src.database.client import DatabaseClient
logger = structlog.get_logger()
class OptimizationPlanManager:
"""
Manages optimization plans with version-based updates.
Supports real-time synchronization between optimization intervals
and control execution with audit trail of all plan changes.
"""
def __init__(self, db_client: DatabaseClient, refresh_interval_seconds: int = 30):
self.db_client = db_client
self.refresh_interval_seconds = refresh_interval_seconds
self.active_pump_plans: Dict[tuple, Dict] = {} # (station_id, pump_id) -> plan
self.active_generic_plans: Dict[tuple, Dict] = {} # (station_id, resource_id) -> plan
self.plan_update_callbacks: List[Callable] = []
self.running = False
self.last_refresh = None
async def start_monitoring(self):
"""Start monitoring for optimization plan updates."""
self.running = True
logger.info("optimization_plan_monitoring_started")
# Initial load
await self.refresh_plans()
# Start periodic refresh
asyncio.create_task(self._periodic_refresh())
async def stop_monitoring(self):
"""Stop monitoring for optimization plan updates."""
self.running = False
logger.info("optimization_plan_monitoring_stopped")
async def _periodic_refresh(self):
"""Periodically refresh optimization plans."""
while self.running:
try:
await asyncio.sleep(self.refresh_interval_seconds)
await self.refresh_plans()
except Exception as e:
logger.error("periodic_refresh_failed", error=str(e))
async def refresh_plans(self):
"""Refresh all optimization plans from database."""
try:
# Get latest pump plans
pump_plans = self.db_client.get_latest_pump_plans()
# Get latest generic optimization plans
generic_plans = self.db_client.get_generic_optimization_plans()
# Check for updates
pump_updates = self._detect_pump_plan_updates(pump_plans)
generic_updates = self._detect_generic_plan_updates(generic_plans)
# Update caches
self.active_pump_plans.clear()
for plan in pump_plans:
key = (plan['station_id'], plan['pump_id'])
self.active_pump_plans[key] = plan
self.active_generic_plans.clear()
for plan in generic_plans:
key = (plan['station_id'], plan['resource_id'])
self.active_generic_plans[key] = plan
# Notify callbacks of updates
if pump_updates or generic_updates:
await self._notify_plan_updates(pump_updates, generic_updates)
self.last_refresh = datetime.now()
logger.info(
"optimization_plans_refreshed",
pump_plans=len(pump_plans),
generic_plans=len(generic_plans),
updates_detected=len(pump_updates) + len(generic_updates)
)
except Exception as e:
logger.error("optimization_plans_refresh_failed", error=str(e))
def _detect_pump_plan_updates(self, new_plans: List[Dict]) -> List[Dict]:
"""Detect pump plan updates by comparing with current cache."""
updates = []
for plan in new_plans:
key = (plan['station_id'], plan['pump_id'])
current_plan = self.active_pump_plans.get(key)
if not current_plan:
# New plan
updates.append({
'type': 'NEW_PUMP_PLAN',
'station_id': plan['station_id'],
'pump_id': plan['pump_id'],
'plan': plan
})
elif current_plan['plan_version'] < plan['plan_version']:
# Version update
updates.append({
'type': 'PUMP_PLAN_UPDATE',
'station_id': plan['station_id'],
'pump_id': plan['pump_id'],
'old_version': current_plan['plan_version'],
'new_version': plan['plan_version'],
'plan': plan
})
return updates
def _detect_generic_plan_updates(self, new_plans: List[Dict]) -> List[Dict]:
"""Detect generic plan updates by comparing with current cache."""
updates = []
for plan in new_plans:
key = (plan['station_id'], plan['resource_id'])
current_plan = self.active_generic_plans.get(key)
if not current_plan:
# New plan
updates.append({
'type': 'NEW_GENERIC_PLAN',
'station_id': plan['station_id'],
'resource_id': plan['resource_id'],
'resource_type': plan['resource_type'],
'plan': plan
})
elif current_plan['plan_version'] < plan['plan_version']:
# Version update
updates.append({
'type': 'GENERIC_PLAN_UPDATE',
'station_id': plan['station_id'],
'resource_id': plan['resource_id'],
'resource_type': plan['resource_type'],
'old_version': current_plan['plan_version'],
'new_version': plan['plan_version'],
'plan': plan
})
return updates
async def _notify_plan_updates(self, pump_updates: List[Dict], generic_updates: List[Dict]):
"""Notify registered callbacks of plan updates."""
all_updates = pump_updates + generic_updates
for callback in self.plan_update_callbacks:
try:
await callback(all_updates)
except Exception as e:
logger.error("plan_update_callback_failed", callback=str(callback), error=str(e))
def register_plan_update_callback(self, callback: Callable):
"""Register a callback for plan update notifications."""
self.plan_update_callbacks.append(callback)
logger.info("plan_update_callback_registered", callback=str(callback))
def get_current_pump_plan(self, station_id: str, pump_id: str) -> Optional[Dict]:
"""Get current optimization plan for a specific pump."""
key = (station_id, pump_id)
return self.active_pump_plans.get(key)
def get_current_generic_plan(self, station_id: str, resource_id: str) -> Optional[Dict]:
"""Get current optimization plan for a specific resource."""
key = (station_id, resource_id)
return self.active_generic_plans.get(key)
def get_all_pump_plans(self) -> List[Dict]:
"""Get all current pump optimization plans."""
return list(self.active_pump_plans.values())
def get_all_generic_plans(self) -> List[Dict]:
"""Get all current generic optimization plans."""
return list(self.active_generic_plans.values())
def get_plan_history(self, station_id: str, resource_id: str, resource_type: str = 'PUMP') -> List[Dict]:
"""Get plan history for a specific resource."""
if resource_type == 'PUMP':
return self.db_client.get_pump_plan_history(station_id, resource_id)
else:
# For now, only pump history is implemented
logger.warning("plan_history_not_implemented", resource_type=resource_type)
return []
def update_pump_plan(self, plan_id: int, updates: Dict[str, Any]) -> bool:
"""Update a pump plan with version increment."""
return self.db_client.update_pump_plan(plan_id, updates)
def get_status(self) -> Dict[str, Any]:
"""Get optimization manager status."""
return {
'running': self.running,
'last_refresh': self.last_refresh.isoformat() if self.last_refresh else None,
'refresh_interval_seconds': self.refresh_interval_seconds,
'active_pump_plans': len(self.active_pump_plans),
'active_generic_plans': len(self.active_generic_plans),
'registered_callbacks': len(self.plan_update_callbacks)
}

View File

@ -147,13 +147,103 @@ class DatabaseClient:
query = """
SELECT DISTINCT ON (station_id, pump_id)
station_id, pump_id, target_flow_m3h, target_power_kw,
target_level_m, suggested_speed_hz, interval_start, interval_end
target_level_m, suggested_speed_hz, interval_start, interval_end,
plan_version, plan_created_at, plan_updated_at
FROM pump_plans
WHERE interval_start <= NOW() AND interval_end >= NOW()
ORDER BY station_id, pump_id, plan_created_at DESC
AND plan_status = 'ACTIVE'
ORDER BY station_id, pump_id, plan_version DESC
"""
return self.execute_query(query)
def get_pump_plan_history(self, station_id: str, pump_id: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Get plan history for a specific pump."""
query = """
SELECT plan_id, target_flow_m3h, target_power_kw, target_level_m,
suggested_speed_hz, interval_start, interval_end, plan_version,
plan_status, plan_created_at, plan_updated_at, optimization_run_id
FROM pump_plans
WHERE station_id = %s AND pump_id = %s
ORDER BY plan_version DESC
LIMIT %s
"""
return self.execute_query(query, (station_id, pump_id, limit))
def update_pump_plan(self, plan_id: int, updates: Dict[str, Any]) -> bool:
"""Update an existing pump plan with version increment."""
try:
# Get current plan
current_query = """
SELECT plan_version, station_id, pump_id, interval_start, interval_end
FROM pump_plans
WHERE plan_id = %s
"""
current_plan = self.execute_query(current_query, (plan_id,))
if not current_plan:
logger.error("plan_not_found", plan_id=plan_id)
return False
current = current_plan[0]
new_version = current['plan_version'] + 1
# Update existing plan to SUPERSEDED status
supersede_query = """
UPDATE pump_plans
SET plan_status = 'SUPERSEDED',
superseded_by = %s,
plan_updated_at = NOW()
WHERE plan_id = %s
"""
# Create new version of the plan
insert_query = """
INSERT INTO pump_plans (
station_id, pump_id, interval_start, interval_end,
target_flow_m3h, target_power_kw, target_level_m, suggested_speed_hz,
optimization_run_id, plan_version, plan_status, superseded_by
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'ACTIVE', %s)
"""
# Execute in transaction
with self._get_cursor() as cursor:
# Mark old plan as superseded
cursor.execute(supersede_query, (plan_id, plan_id))
# Insert new version
cursor.execute(insert_query, (
current['station_id'], current['pump_id'],
current['interval_start'], current['interval_end'],
updates.get('target_flow_m3h'),
updates.get('target_power_kw'),
updates.get('target_level_m'),
updates.get('suggested_speed_hz'),
updates.get('optimization_run_id'),
new_version,
plan_id
))
logger.info("pump_plan_updated", plan_id=plan_id, new_version=new_version)
return True
except Exception as e:
logger.error("pump_plan_update_failed", plan_id=plan_id, error=str(e))
return False
def get_generic_optimization_plans(self, resource_type: str = 'PUMP') -> List[Dict[str, Any]]:
"""Get latest optimization plans for all resource types."""
query = """
SELECT DISTINCT ON (station_id, resource_id)
station_id, resource_id, resource_type, interval_start, interval_end,
optimization_targets, plan_version, plan_created_at, plan_updated_at
FROM optimization_plans
WHERE interval_start <= NOW() AND interval_end >= NOW()
AND plan_status = 'ACTIVE'
AND resource_type = %s
ORDER BY station_id, resource_id, plan_version DESC
"""
return self.execute_query(query, (resource_type,))
def get_pump_feedback(self, station_id: str, pump_id: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Get recent feedback for a specific pump."""
query = """

View File

@ -16,6 +16,7 @@ import sys
from src.database.client import DatabaseClient
from src.core.auto_discovery import AutoDiscovery
from src.core.safety import SafetyLimitEnforcer
from src.core.optimization_manager import OptimizationPlanManager
from src.core.logging import setup_logging, AuditLogger
from config.settings import settings
@ -38,6 +39,10 @@ class CalejoControlAdapterPhase1:
refresh_interval_minutes=settings.auto_discovery_refresh_minutes
)
self.safety_enforcer = SafetyLimitEnforcer(self.db_client)
self.optimization_manager = OptimizationPlanManager(
db_client=self.db_client,
refresh_interval_seconds=settings.optimization_refresh_seconds
)
self.audit_logger = AuditLogger(self.db_client)
self.running = False
@ -76,6 +81,10 @@ class CalejoControlAdapterPhase1:
# Initialize safety framework
await self.safety_enforcer.load_safety_limits()
# Initialize optimization plan monitoring
if settings.optimization_monitoring_enabled:
await self.optimization_manager.start_monitoring()
# Log startup to audit trail
self.audit_logger.log(
event_type='SYSTEM_STARTUP',
@ -87,7 +96,7 @@ class CalejoControlAdapterPhase1:
event_data={
'version': settings.app_version,
'phase': '1',
'components_initialized': ['database', 'auto_discovery', 'safety']
'components_initialized': ['database', 'auto_discovery', 'safety', 'optimization']
}
)
@ -99,7 +108,8 @@ class CalejoControlAdapterPhase1:
startup_duration_seconds=round(startup_duration, 2),
station_count=len(self.auto_discovery.get_stations()),
pump_count=len(self.auto_discovery.get_pumps()),
safety_limits_loaded=len(self.safety_enforcer.safety_limits_cache)
safety_limits_loaded=len(self.safety_enforcer.safety_limits_cache),
optimization_plans_loaded=len(self.optimization_manager.get_all_pump_plans())
)
# Print status information
@ -129,6 +139,9 @@ class CalejoControlAdapterPhase1:
result='SUCCESS'
)
# Stop optimization monitoring
await self.optimization_manager.stop_monitoring()
# Disconnect from database
await self.db_client.disconnect()
@ -143,7 +156,8 @@ class CalejoControlAdapterPhase1:
"environment": settings.environment,
"database": self.db_client.get_connection_stats(),
"auto_discovery": self.auto_discovery.get_discovery_status(),
"safety_limits_loaded": len(self.safety_enforcer.safety_limits_cache)
"safety_limits_loaded": len(self.safety_enforcer.safety_limits_cache),
"optimization_manager": self.optimization_manager.get_status()
}
def print_status(self):
@ -175,6 +189,14 @@ class CalejoControlAdapterPhase1:
print(f"\nSafety Framework:")
print(f" Limits Loaded: {status['safety_limits_loaded']}")
# Optimization manager status
opt_status = status['optimization_manager']
print(f"\nOptimization Manager:")
print(f" Status: {'RUNNING' if opt_status['running'] else 'STOPPED'}")
print(f" Pump Plans: {opt_status['active_pump_plans']}")
print(f" Generic Plans: {opt_status['active_generic_plans']}")
print(f" Last Refresh: {opt_status['last_refresh'] or 'Never'}")
print("="*60)
print("\nPress Ctrl+C to stop the application\n")