Compare commits
No commits in common. "master" and "fix-discovery-service-issues" have entirely different histories.
master
...
fix-discov
|
|
@ -1,109 +0,0 @@
|
|||
# Pump Control Preprocessing Implementation Summary
|
||||
|
||||
## Overview
|
||||
Successfully implemented configurable pump control preprocessing logic for converting MPC outputs to pump actuation signals in the Calejo Control system.
|
||||
|
||||
## What Was Implemented
|
||||
|
||||
### 1. Core Pump Control Preprocessor (`src/core/pump_control_preprocessor.py`)
|
||||
- **Three configurable control logics**:
|
||||
- **MPC-Driven Adaptive Hysteresis**: Primary logic for normal operation with MPC + live level data
|
||||
- **State-Preserving MPC**: Enhanced logic to minimize pump state changes
|
||||
- **Backup Fixed-Band Control**: Fallback logic for when level sensors fail
|
||||
- **State tracking**: Maintains pump state and switch timing to prevent excessive cycling
|
||||
- **Safety integration**: Built-in safety overrides for emergency conditions
|
||||
|
||||
### 2. Integration with Existing System
|
||||
- **Extended preprocessing system**: Added `pump_control_logic` rule type to existing preprocessing framework
|
||||
- **Setpoint manager integration**: New `PumpControlPreprocessorCalculator` class for setpoint calculation
|
||||
- **Protocol mapping support**: Configurable through dashboard protocol mappings
|
||||
|
||||
### 3. Configuration Methods
|
||||
- **Protocol mapping preprocessing**: Configure via dashboard with JSON rules
|
||||
- **Pump metadata configuration**: Set control logic in pump configuration
|
||||
- **Control type selection**: Use `PUMP_CONTROL_PREPROCESSOR` control type
|
||||
|
||||
## Key Features
|
||||
|
||||
### Safety & Reliability
|
||||
- **Safety overrides**: Automatic shutdown on level limit violations
|
||||
- **Minimum switch intervals**: Prevents excessive pump cycling
|
||||
- **State preservation**: Minimizes equipment wear
|
||||
- **Fallback modes**: Graceful degradation when sensors fail
|
||||
|
||||
### Flexibility
|
||||
- **Per-pump configuration**: Different logics for different pumps
|
||||
- **Parameter tuning**: Fine-tune each logic for specific station requirements
|
||||
- **Multiple integration points**: Protocol mappings, pump config, or control type
|
||||
|
||||
### Monitoring & Logging
|
||||
- **Comprehensive logging**: Each control decision logged with reasoning
|
||||
- **Performance tracking**: Monitor pump state changes and efficiency
|
||||
- **Safety event tracking**: Record all safety overrides
|
||||
|
||||
## Files Created/Modified
|
||||
|
||||
### New Files
|
||||
- `src/core/pump_control_preprocessor.py` - Core control logic implementation
|
||||
- `docs/PUMP_CONTROL_LOGIC_CONFIGURATION.md` - Comprehensive documentation
|
||||
- `examples/pump_control_configuration.json` - Configuration examples
|
||||
- `test_pump_control_logic.py` - Test suite
|
||||
|
||||
### Modified Files
|
||||
- `src/dashboard/configuration_manager.py` - Extended preprocessing system
|
||||
- `src/core/setpoint_manager.py` - Added new calculator class
|
||||
|
||||
## Testing
|
||||
- **Unit tests**: All three control logics tested with various scenarios
|
||||
- **Integration tests**: Verified integration with configuration manager
|
||||
- **Safety tests**: Confirmed safety overrides work correctly
|
||||
- **Import tests**: Verified system integration
|
||||
|
||||
## Usage Examples
|
||||
|
||||
### Configuration via Protocol Mapping
|
||||
```json
|
||||
{
|
||||
"preprocessing_enabled": true,
|
||||
"preprocessing_rules": [
|
||||
{
|
||||
"type": "pump_control_logic",
|
||||
"parameters": {
|
||||
"logic_type": "mpc_adaptive_hysteresis",
|
||||
"control_params": {
|
||||
"safety_min_level": 0.5,
|
||||
"adaptive_buffer": 0.5
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### Configuration via Pump Metadata
|
||||
```sql
|
||||
UPDATE pumps
|
||||
SET control_type = 'PUMP_CONTROL_PREPROCESSOR',
|
||||
control_parameters = '{
|
||||
"control_logic": "mpc_adaptive_hysteresis",
|
||||
"control_params": {
|
||||
"safety_min_level": 0.5,
|
||||
"adaptive_buffer": 0.5
|
||||
}
|
||||
}'
|
||||
WHERE station_id = 'station1' AND pump_id = 'pump1';
|
||||
```
|
||||
|
||||
## Benefits
|
||||
1. **Improved pump longevity** through state preservation
|
||||
2. **Better energy efficiency** by minimizing unnecessary switching
|
||||
3. **Enhanced safety** with multiple protection layers
|
||||
4. **Flexible configuration** for different operational requirements
|
||||
5. **Graceful degradation** when sensors or MPC fail
|
||||
6. **Comprehensive monitoring** for operational insights
|
||||
|
||||
## Next Steps
|
||||
- Deploy to test environment
|
||||
- Monitor performance and adjust parameters
|
||||
- Extend to other actuator types (valves, blowers)
|
||||
- Add more sophisticated control algorithms
|
||||
|
|
@ -1,185 +0,0 @@
|
|||
# Pump Control Logic Configuration
|
||||
|
||||
## Overview
|
||||
|
||||
The Calejo Control system now supports three configurable pump control logics for converting MPC outputs to pump actuation signals. These logics can be configured per pump through protocol mappings or pump configuration.
|
||||
|
||||
## Available Control Logics
|
||||
|
||||
### 1. MPC-Driven Adaptive Hysteresis (Primary)
|
||||
**Use Case**: Normal operation with MPC + live level data
|
||||
|
||||
**Logic**:
|
||||
- Converts MPC output to level thresholds for start/stop control
|
||||
- Uses current pump state to minimize switching
|
||||
- Adaptive buffer size based on expected level change rate
|
||||
|
||||
**Configuration Parameters**:
|
||||
```json
|
||||
{
|
||||
"control_logic": "mpc_adaptive_hysteresis",
|
||||
"control_params": {
|
||||
"safety_min_level": 0.5,
|
||||
"safety_max_level": 9.5,
|
||||
"adaptive_buffer": 0.5,
|
||||
"min_switch_interval": 300
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 2. State-Preserving MPC (Enhanced)
|
||||
**Use Case**: When pump wear/energy costs are primary concern
|
||||
|
||||
**Logic**:
|
||||
- Explicitly minimizes pump state changes by considering switching penalties
|
||||
- Calculates benefit vs. penalty for state changes
|
||||
- Maintains current state when penalty exceeds benefit
|
||||
|
||||
**Configuration Parameters**:
|
||||
```json
|
||||
{
|
||||
"control_logic": "state_preserving_mpc",
|
||||
"control_params": {
|
||||
"activation_threshold": 10.0,
|
||||
"deactivation_threshold": 5.0,
|
||||
"min_switch_interval": 300,
|
||||
"state_change_penalty_weight": 2.0
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3. Backup Fixed-Band Control (Fallback)
|
||||
**Use Case**: Backup when level sensor fails
|
||||
|
||||
**Logic**:
|
||||
- Uses fixed level bands based on pump station height
|
||||
- Three operation modes: "mostly_on", "mostly_off", "balanced"
|
||||
- Always active safety overrides
|
||||
|
||||
**Configuration Parameters**:
|
||||
```json
|
||||
{
|
||||
"control_logic": "backup_fixed_band",
|
||||
"control_params": {
|
||||
"pump_station_height": 10.0,
|
||||
"operation_mode": "balanced",
|
||||
"absolute_max": 9.5,
|
||||
"absolute_min": 0.5
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Configuration Methods
|
||||
|
||||
### Method 1: Protocol Mapping Preprocessing
|
||||
Configure through protocol mappings in the dashboard:
|
||||
|
||||
```json
|
||||
{
|
||||
"preprocessing_enabled": true,
|
||||
"preprocessing_rules": [
|
||||
{
|
||||
"type": "pump_control_logic",
|
||||
"parameters": {
|
||||
"logic_type": "mpc_adaptive_hysteresis",
|
||||
"control_params": {
|
||||
"safety_min_level": 0.5,
|
||||
"adaptive_buffer": 0.5
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### Method 2: Pump Configuration
|
||||
Configure directly in pump metadata:
|
||||
|
||||
```sql
|
||||
UPDATE pumps
|
||||
SET control_parameters = '{
|
||||
"control_logic": "mpc_adaptive_hysteresis",
|
||||
"control_params": {
|
||||
"safety_min_level": 0.5,
|
||||
"adaptive_buffer": 0.5
|
||||
}
|
||||
}'
|
||||
WHERE station_id = 'station1' AND pump_id = 'pump1';
|
||||
```
|
||||
|
||||
### Method 3: Control Type Selection
|
||||
Set the pump's control type to use the preprocessor:
|
||||
|
||||
```sql
|
||||
UPDATE pumps
|
||||
SET control_type = 'PUMP_CONTROL_PREPROCESSOR'
|
||||
WHERE station_id = 'station1' AND pump_id = 'pump1';
|
||||
```
|
||||
|
||||
## Integration Points
|
||||
|
||||
### Setpoint Manager Integration
|
||||
The pump control preprocessor integrates with the existing Setpoint Manager:
|
||||
|
||||
1. **MPC outputs** are read from the database (pump_plans table)
|
||||
2. **Current state** is obtained from pump feedback
|
||||
3. **Control logic** is applied based on configuration
|
||||
4. **Actuation signals** are sent via protocol mappings
|
||||
|
||||
### Safety Integration
|
||||
All control logics include safety overrides:
|
||||
- Emergency stop conditions
|
||||
- Absolute level limits
|
||||
- Minimum switch intervals
|
||||
- Equipment protection
|
||||
|
||||
## Monitoring and Logging
|
||||
|
||||
Each control decision is logged with:
|
||||
- Control logic used
|
||||
- MPC input value
|
||||
- Resulting pump command
|
||||
- Reason for decision
|
||||
- Safety overrides applied
|
||||
|
||||
Example log entry:
|
||||
```json
|
||||
{
|
||||
"event": "pump_control_decision",
|
||||
"station_id": "station1",
|
||||
"pump_id": "pump1",
|
||||
"mpc_output": 45.2,
|
||||
"control_logic": "mpc_adaptive_hysteresis",
|
||||
"result_reason": "set_activation_threshold",
|
||||
"pump_command": false,
|
||||
"max_threshold": 2.5
|
||||
}
|
||||
```
|
||||
|
||||
## Testing and Validation
|
||||
|
||||
### Test Scenarios
|
||||
1. **Normal Operation**: MPC outputs with live level data
|
||||
2. **Sensor Failure**: No level signal available
|
||||
3. **State Preservation**: Verify minimal switching
|
||||
4. **Safety Overrides**: Test emergency conditions
|
||||
|
||||
### Validation Metrics
|
||||
- Pump state change frequency
|
||||
- Level control accuracy
|
||||
- Safety limit compliance
|
||||
- Energy efficiency
|
||||
|
||||
## Migration Guide
|
||||
|
||||
### From Legacy Control
|
||||
1. Identify pumps using level-based control
|
||||
2. Configure appropriate control logic
|
||||
3. Update protocol mappings if needed
|
||||
4. Monitor performance and adjust parameters
|
||||
|
||||
### Adding New Pumps
|
||||
1. Set control_type to 'PUMP_CONTROL_PREPROCESSOR'
|
||||
2. Configure control_parameters JSON
|
||||
3. Set up protocol mappings
|
||||
4. Test with sample MPC outputs
|
||||
|
|
@ -1,64 +0,0 @@
|
|||
{
|
||||
"pump_control_configuration": {
|
||||
"station1": {
|
||||
"pump1": {
|
||||
"control_type": "PUMP_CONTROL_PREPROCESSOR",
|
||||
"control_logic": "mpc_adaptive_hysteresis",
|
||||
"control_params": {
|
||||
"safety_min_level": 0.5,
|
||||
"safety_max_level": 9.5,
|
||||
"adaptive_buffer": 0.5,
|
||||
"min_switch_interval": 300
|
||||
}
|
||||
},
|
||||
"pump2": {
|
||||
"control_type": "PUMP_CONTROL_PREPROCESSOR",
|
||||
"control_logic": "state_preserving_mpc",
|
||||
"control_params": {
|
||||
"activation_threshold": 10.0,
|
||||
"deactivation_threshold": 5.0,
|
||||
"min_switch_interval": 300,
|
||||
"state_change_penalty_weight": 2.0
|
||||
}
|
||||
}
|
||||
},
|
||||
"station2": {
|
||||
"pump1": {
|
||||
"control_type": "PUMP_CONTROL_PREPROCESSOR",
|
||||
"control_logic": "backup_fixed_band",
|
||||
"control_params": {
|
||||
"pump_station_height": 10.0,
|
||||
"operation_mode": "balanced",
|
||||
"absolute_max": 9.5,
|
||||
"absolute_min": 0.5
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"protocol_mappings_example": {
|
||||
"mappings": [
|
||||
{
|
||||
"mapping_id": "station1_pump1_setpoint",
|
||||
"station_id": "station1",
|
||||
"equipment_id": "pump1",
|
||||
"protocol_type": "modbus_tcp",
|
||||
"protocol_address": "40001",
|
||||
"data_type_id": "setpoint",
|
||||
"db_source": "pump_plans.suggested_speed_hz",
|
||||
"preprocessing_enabled": true,
|
||||
"preprocessing_rules": [
|
||||
{
|
||||
"type": "pump_control_logic",
|
||||
"parameters": {
|
||||
"logic_type": "mpc_adaptive_hysteresis",
|
||||
"control_params": {
|
||||
"safety_min_level": 0.5,
|
||||
"adaptive_buffer": 0.5
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
@ -1,385 +0,0 @@
|
|||
"""
|
||||
Pump Control Preprocessor for Calejo Control Adapter.
|
||||
|
||||
Implements three configurable control logics for converting MPC outputs to pump actuation signals:
|
||||
1. MPC-Driven Adaptive Hysteresis (Primary)
|
||||
2. State-Preserving MPC (Enhanced)
|
||||
3. Backup Fixed-Band Control (Fallback)
|
||||
"""
|
||||
|
||||
from typing import Dict, Optional, Any, Tuple
|
||||
from enum import Enum
|
||||
import structlog
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
||||
class PumpControlLogic(Enum):
|
||||
"""Available pump control logic types"""
|
||||
MPC_ADAPTIVE_HYSTERESIS = "mpc_adaptive_hysteresis"
|
||||
STATE_PRESERVING_MPC = "state_preserving_mpc"
|
||||
BACKUP_FIXED_BAND = "backup_fixed_band"
|
||||
|
||||
|
||||
class PumpControlPreprocessor:
|
||||
"""
|
||||
Preprocessor for converting MPC outputs to pump actuation signals.
|
||||
|
||||
Supports three control logics that can be configured per pump via protocol mappings.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.pump_states: Dict[Tuple[str, str], Dict[str, Any]] = {}
|
||||
self.last_switch_times: Dict[Tuple[str, str], datetime] = {}
|
||||
|
||||
def apply_control_logic(
|
||||
self,
|
||||
station_id: str,
|
||||
pump_id: str,
|
||||
mpc_output: float, # 0-100% pump rate
|
||||
current_level: Optional[float] = None,
|
||||
current_pump_state: Optional[bool] = None,
|
||||
control_logic: PumpControlLogic = PumpControlLogic.MPC_ADAPTIVE_HYSTERESIS,
|
||||
control_params: Optional[Dict[str, Any]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Apply configured control logic to convert MPC output to pump actuation.
|
||||
|
||||
Args:
|
||||
station_id: Pump station identifier
|
||||
pump_id: Pump identifier
|
||||
mpc_output: MPC output (0-100% pump rate)
|
||||
current_level: Current level measurement (meters)
|
||||
current_pump_state: Current pump state (True=ON, False=OFF)
|
||||
control_logic: Control logic to apply
|
||||
control_params: Control-specific parameters
|
||||
|
||||
Returns:
|
||||
Dictionary with actuation signals and metadata
|
||||
"""
|
||||
|
||||
# Default parameters
|
||||
params = control_params or {}
|
||||
|
||||
# Get current state if not provided
|
||||
if current_pump_state is None:
|
||||
current_pump_state = self._get_current_pump_state(station_id, pump_id)
|
||||
|
||||
# Apply selected control logic
|
||||
if control_logic == PumpControlLogic.MPC_ADAPTIVE_HYSTERESIS:
|
||||
result = self._mpc_adaptive_hysteresis(
|
||||
station_id, pump_id, mpc_output, current_level, current_pump_state, params
|
||||
)
|
||||
elif control_logic == PumpControlLogic.STATE_PRESERVING_MPC:
|
||||
result = self._state_preserving_mpc(
|
||||
station_id, pump_id, mpc_output, current_pump_state, params
|
||||
)
|
||||
elif control_logic == PumpControlLogic.BACKUP_FIXED_BAND:
|
||||
result = self._backup_fixed_band(
|
||||
station_id, pump_id, mpc_output, current_level, params
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unknown control logic: {control_logic}")
|
||||
|
||||
# Update state tracking
|
||||
self._update_pump_state(station_id, pump_id, result)
|
||||
|
||||
return result
|
||||
|
||||
def _mpc_adaptive_hysteresis(
|
||||
self,
|
||||
station_id: str,
|
||||
pump_id: str,
|
||||
mpc_output: float,
|
||||
current_level: Optional[float],
|
||||
current_pump_state: bool,
|
||||
params: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Logic 1: MPC-Driven Adaptive Hysteresis
|
||||
|
||||
Converts MPC output to level thresholds for start/stop control.
|
||||
Uses current pump state to minimize switching.
|
||||
"""
|
||||
|
||||
# Extract parameters with defaults
|
||||
safety_min_level = params.get('safety_min_level', 0.5)
|
||||
safety_max_level = params.get('safety_max_level', 9.5)
|
||||
adaptive_buffer = params.get('adaptive_buffer', 0.5)
|
||||
min_switch_interval = params.get('min_switch_interval', 300) # 5 minutes
|
||||
|
||||
# Safety checks
|
||||
if current_level is not None:
|
||||
if current_level <= safety_min_level:
|
||||
return {
|
||||
'pump_command': False, # OFF
|
||||
'max_threshold': None,
|
||||
'min_threshold': None,
|
||||
'control_logic': 'mpc_adaptive_hysteresis',
|
||||
'reason': 'safety_min_level_exceeded',
|
||||
'safety_override': True
|
||||
}
|
||||
elif current_level >= safety_max_level:
|
||||
return {
|
||||
'pump_command': False, # OFF
|
||||
'max_threshold': None,
|
||||
'min_threshold': None,
|
||||
'control_logic': 'mpc_adaptive_hysteresis',
|
||||
'reason': 'safety_max_level_exceeded',
|
||||
'safety_override': True
|
||||
}
|
||||
|
||||
# MPC command interpretation
|
||||
mpc_wants_pump_on = mpc_output > 20.0 # Threshold for pump activation
|
||||
|
||||
result = {
|
||||
'pump_command': current_pump_state, # Default: maintain current state
|
||||
'max_threshold': None,
|
||||
'min_threshold': None,
|
||||
'control_logic': 'mpc_adaptive_hysteresis',
|
||||
'reason': 'maintain_current_state'
|
||||
}
|
||||
|
||||
# Check if we should change state
|
||||
if mpc_wants_pump_on and not current_pump_state:
|
||||
# MPC wants pump ON, but it's currently OFF
|
||||
if self._can_switch_pump(station_id, pump_id, min_switch_interval):
|
||||
if current_level is not None:
|
||||
result.update({
|
||||
'pump_command': False, # Still OFF, but set threshold
|
||||
'max_threshold': current_level + adaptive_buffer,
|
||||
'min_threshold': None,
|
||||
'reason': 'set_activation_threshold'
|
||||
})
|
||||
else:
|
||||
# No level signal - force ON
|
||||
result.update({
|
||||
'pump_command': True,
|
||||
'max_threshold': None,
|
||||
'min_threshold': None,
|
||||
'reason': 'force_on_no_level_signal'
|
||||
})
|
||||
|
||||
elif not mpc_wants_pump_on and current_pump_state:
|
||||
# MPC wants pump OFF, but it's currently ON
|
||||
if self._can_switch_pump(station_id, pump_id, min_switch_interval):
|
||||
if current_level is not None:
|
||||
result.update({
|
||||
'pump_command': True, # Still ON, but set threshold
|
||||
'max_threshold': None,
|
||||
'min_threshold': current_level - adaptive_buffer,
|
||||
'reason': 'set_deactivation_threshold'
|
||||
})
|
||||
else:
|
||||
# No level signal - force OFF
|
||||
result.update({
|
||||
'pump_command': False,
|
||||
'max_threshold': None,
|
||||
'min_threshold': None,
|
||||
'reason': 'force_off_no_level_signal'
|
||||
})
|
||||
|
||||
return result
|
||||
|
||||
def _state_preserving_mpc(
|
||||
self,
|
||||
station_id: str,
|
||||
pump_id: str,
|
||||
mpc_output: float,
|
||||
current_pump_state: bool,
|
||||
params: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Logic 2: State-Preserving MPC
|
||||
|
||||
Explicitly minimizes pump state changes by considering switching penalties.
|
||||
"""
|
||||
|
||||
# Extract parameters
|
||||
activation_threshold = params.get('activation_threshold', 10.0)
|
||||
deactivation_threshold = params.get('deactivation_threshold', 5.0)
|
||||
min_switch_interval = params.get('min_switch_interval', 300) # 5 minutes
|
||||
state_change_penalty_weight = params.get('state_change_penalty_weight', 2.0)
|
||||
|
||||
# MPC command interpretation
|
||||
mpc_wants_pump_on = mpc_output > activation_threshold
|
||||
mpc_wants_pump_off = mpc_output < deactivation_threshold
|
||||
|
||||
# Calculate state change penalty
|
||||
time_since_last_switch = self._get_time_since_last_switch(station_id, pump_id)
|
||||
state_change_penalty = self._calculate_state_change_penalty(
|
||||
time_since_last_switch, min_switch_interval, state_change_penalty_weight
|
||||
)
|
||||
|
||||
# Calculate benefit of switching
|
||||
benefit_of_switch = abs(mpc_output - (activation_threshold if current_pump_state else deactivation_threshold))
|
||||
|
||||
result = {
|
||||
'pump_command': current_pump_state, # Default: maintain current state
|
||||
'control_logic': 'state_preserving_mpc',
|
||||
'reason': 'maintain_current_state',
|
||||
'state_change_penalty': state_change_penalty,
|
||||
'benefit_of_switch': benefit_of_switch
|
||||
}
|
||||
|
||||
# Check if we should change state
|
||||
if mpc_wants_pump_on != current_pump_state:
|
||||
# MPC wants to change state
|
||||
if state_change_penalty < benefit_of_switch and self._can_switch_pump(station_id, pump_id, min_switch_interval):
|
||||
# Benefit justifies switch
|
||||
result.update({
|
||||
'pump_command': mpc_wants_pump_on,
|
||||
'reason': 'benefit_justifies_switch'
|
||||
})
|
||||
else:
|
||||
# Penalty too high - maintain current state
|
||||
result.update({
|
||||
'reason': 'state_change_penalty_too_high'
|
||||
})
|
||||
else:
|
||||
# MPC agrees with current state
|
||||
result.update({
|
||||
'reason': 'mpc_agrees_with_current_state'
|
||||
})
|
||||
|
||||
return result
|
||||
|
||||
def _backup_fixed_band(
|
||||
self,
|
||||
station_id: str,
|
||||
pump_id: str,
|
||||
mpc_output: float,
|
||||
current_level: Optional[float],
|
||||
params: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Logic 3: Backup Fixed-Band Control
|
||||
|
||||
Fallback logic for when no live level signal is available.
|
||||
Uses fixed level bands based on pump station height.
|
||||
"""
|
||||
|
||||
# Extract parameters
|
||||
pump_station_height = params.get('pump_station_height', 10.0)
|
||||
operation_mode = params.get('operation_mode', 'balanced') # 'mostly_on', 'mostly_off', 'balanced'
|
||||
absolute_max = params.get('absolute_max', pump_station_height * 0.95)
|
||||
absolute_min = params.get('absolute_min', pump_station_height * 0.05)
|
||||
|
||||
# Set thresholds based on operation mode
|
||||
if operation_mode == 'mostly_on':
|
||||
# Keep level low, pump runs frequently
|
||||
max_threshold = pump_station_height * 0.3 # 30% full
|
||||
min_threshold = pump_station_height * 0.1 # 10% full
|
||||
elif operation_mode == 'mostly_off':
|
||||
# Keep level high, pump runs infrequently
|
||||
max_threshold = pump_station_height * 0.9 # 90% full
|
||||
min_threshold = pump_station_height * 0.7 # 70% full
|
||||
else: # balanced
|
||||
# Middle ground
|
||||
max_threshold = pump_station_height * 0.6 # 60% full
|
||||
min_threshold = pump_station_height * 0.4 # 40% full
|
||||
|
||||
# Safety overrides (always active)
|
||||
if current_level is not None:
|
||||
if current_level >= absolute_max:
|
||||
return {
|
||||
'pump_command': False, # OFF
|
||||
'max_threshold': None,
|
||||
'min_threshold': None,
|
||||
'control_logic': 'backup_fixed_band',
|
||||
'reason': 'absolute_max_level_exceeded',
|
||||
'safety_override': True
|
||||
}
|
||||
elif current_level <= absolute_min:
|
||||
return {
|
||||
'pump_command': False, # OFF
|
||||
'max_threshold': None,
|
||||
'min_threshold': None,
|
||||
'control_logic': 'backup_fixed_band',
|
||||
'reason': 'absolute_min_level_exceeded',
|
||||
'safety_override': True
|
||||
}
|
||||
|
||||
# Normal fixed-band control
|
||||
result = {
|
||||
'pump_command': None, # Let level-based control handle it
|
||||
'max_threshold': max_threshold,
|
||||
'min_threshold': min_threshold,
|
||||
'control_logic': 'backup_fixed_band',
|
||||
'reason': 'fixed_band_control',
|
||||
'operation_mode': operation_mode
|
||||
}
|
||||
|
||||
return result
|
||||
|
||||
def _get_current_pump_state(self, station_id: str, pump_id: str) -> bool:
|
||||
"""Get current pump state from internal tracking"""
|
||||
key = (station_id, pump_id)
|
||||
if key in self.pump_states:
|
||||
return self.pump_states[key].get('pump_command', False)
|
||||
return False
|
||||
|
||||
def _update_pump_state(self, station_id: str, pump_id: str, result: Dict[str, Any]):
|
||||
"""Update internal pump state tracking"""
|
||||
key = (station_id, pump_id)
|
||||
|
||||
# Update state
|
||||
self.pump_states[key] = result
|
||||
|
||||
# Update switch time if state changed
|
||||
if 'pump_command' in result:
|
||||
new_state = result['pump_command']
|
||||
old_state = self._get_current_pump_state(station_id, pump_id)
|
||||
|
||||
if new_state != old_state:
|
||||
self.last_switch_times[key] = datetime.now()
|
||||
|
||||
def _can_switch_pump(self, station_id: str, pump_id: str, min_interval: int) -> bool:
|
||||
"""Check if pump can be switched based on minimum interval"""
|
||||
key = (station_id, pump_id)
|
||||
if key not in self.last_switch_times:
|
||||
return True
|
||||
|
||||
time_since_last_switch = (datetime.now() - self.last_switch_times[key]).total_seconds()
|
||||
return time_since_last_switch >= min_interval
|
||||
|
||||
def _get_time_since_last_switch(self, station_id: str, pump_id: str) -> float:
|
||||
"""Get time since last pump state switch in seconds"""
|
||||
key = (station_id, pump_id)
|
||||
if key not in self.last_switch_times:
|
||||
return float('inf') # Never switched
|
||||
|
||||
return (datetime.now() - self.last_switch_times[key]).total_seconds()
|
||||
|
||||
def _calculate_state_change_penalty(
|
||||
self, time_since_last_switch: float, min_switch_interval: int, weight: float
|
||||
) -> float:
|
||||
"""Calculate state change penalty based on time since last switch"""
|
||||
if time_since_last_switch >= min_switch_interval:
|
||||
return 0.0 # No penalty if enough time has passed
|
||||
|
||||
# Penalty decreases linearly as time approaches min_switch_interval
|
||||
penalty_ratio = 1.0 - (time_since_last_switch / min_switch_interval)
|
||||
return penalty_ratio * weight
|
||||
|
||||
def get_pump_status(self, station_id: str, pump_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get current status for a pump"""
|
||||
key = (station_id, pump_id)
|
||||
return self.pump_states.get(key)
|
||||
|
||||
def get_all_pump_statuses(self) -> Dict[Tuple[str, str], Dict[str, Any]]:
|
||||
"""Get status for all tracked pumps"""
|
||||
return self.pump_states.copy()
|
||||
|
||||
def reset_pump_state(self, station_id: str, pump_id: str):
|
||||
"""Reset state tracking for a pump"""
|
||||
key = (station_id, pump_id)
|
||||
if key in self.pump_states:
|
||||
del self.pump_states[key]
|
||||
if key in self.last_switch_times:
|
||||
del self.last_switch_times[key]
|
||||
|
||||
|
||||
# Global instance for easy access
|
||||
pump_control_preprocessor = PumpControlPreprocessor()
|
||||
|
|
@ -12,7 +12,6 @@ from src.database.flexible_client import FlexibleDatabaseClient
|
|||
from src.core.safety import SafetyLimitEnforcer
|
||||
from src.core.emergency_stop import EmergencyStopManager
|
||||
from src.monitoring.watchdog import DatabaseWatchdog
|
||||
from src.core.pump_control_preprocessor import pump_control_preprocessor, PumpControlLogic
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
|
@ -77,86 +76,6 @@ class LevelControlledCalculator(SetpointCalculator):
|
|||
return float(plan.get('suggested_speed_hz', 35.0))
|
||||
|
||||
|
||||
class PumpControlPreprocessorCalculator(SetpointCalculator):
|
||||
"""Calculator that applies pump control preprocessing logic."""
|
||||
|
||||
def calculate_setpoint(self, plan: Dict[str, Any], feedback: Optional[Dict[str, Any]],
|
||||
pump_info: Dict[str, Any]) -> float:
|
||||
"""
|
||||
Calculate setpoint using pump control preprocessing logic.
|
||||
|
||||
Converts MPC outputs to pump actuation signals using configurable control logic.
|
||||
"""
|
||||
# Extract MPC output (pump rate in %)
|
||||
mpc_output = float(plan.get('suggested_speed_hz', 35.0))
|
||||
|
||||
# Convert speed Hz to percentage (assuming 20-50 Hz range)
|
||||
min_speed = pump_info.get('min_speed_hz', 20.0)
|
||||
max_speed = pump_info.get('max_speed_hz', 50.0)
|
||||
pump_rate_percent = ((mpc_output - min_speed) / (max_speed - min_speed)) * 100.0
|
||||
pump_rate_percent = max(0.0, min(100.0, pump_rate_percent))
|
||||
|
||||
# Extract current state from feedback
|
||||
current_level = None
|
||||
current_pump_state = None
|
||||
|
||||
if feedback:
|
||||
current_level = feedback.get('current_level_m')
|
||||
current_pump_state = feedback.get('pump_running')
|
||||
|
||||
# Get control logic configuration from pump info
|
||||
control_logic_str = pump_info.get('control_logic', 'mpc_adaptive_hysteresis')
|
||||
control_params = pump_info.get('control_params', {})
|
||||
|
||||
try:
|
||||
control_logic = PumpControlLogic(control_logic_str)
|
||||
except ValueError:
|
||||
logger.warning(
|
||||
"unknown_control_logic",
|
||||
station_id=pump_info.get('station_id'),
|
||||
pump_id=pump_info.get('pump_id'),
|
||||
control_logic=control_logic_str
|
||||
)
|
||||
control_logic = PumpControlLogic.MPC_ADAPTIVE_HYSTERESIS
|
||||
|
||||
# Apply pump control logic
|
||||
result = pump_control_preprocessor.apply_control_logic(
|
||||
station_id=pump_info.get('station_id'),
|
||||
pump_id=pump_info.get('pump_id'),
|
||||
mpc_output=pump_rate_percent,
|
||||
current_level=current_level,
|
||||
current_pump_state=current_pump_state,
|
||||
control_logic=control_logic,
|
||||
control_params=control_params
|
||||
)
|
||||
|
||||
# Log the control decision
|
||||
logger.info(
|
||||
"pump_control_decision",
|
||||
station_id=pump_info.get('station_id'),
|
||||
pump_id=pump_info.get('pump_id'),
|
||||
mpc_output=mpc_output,
|
||||
pump_rate_percent=pump_rate_percent,
|
||||
control_logic=control_logic.value,
|
||||
result_reason=result.get('reason'),
|
||||
pump_command=result.get('pump_command'),
|
||||
max_threshold=result.get('max_threshold'),
|
||||
min_threshold=result.get('min_threshold')
|
||||
)
|
||||
|
||||
# Convert pump command back to speed Hz
|
||||
if result.get('pump_command') is True:
|
||||
# Pump should be ON - use MPC suggested speed
|
||||
return mpc_output
|
||||
elif result.get('pump_command') is False:
|
||||
# Pump should be OFF
|
||||
return 0.0
|
||||
else:
|
||||
# No direct command - use level-based control with thresholds
|
||||
# For now, return MPC speed and let level control handle it
|
||||
return mpc_output
|
||||
|
||||
|
||||
class PowerControlledCalculator(SetpointCalculator):
|
||||
"""Calculator for power-controlled pumps."""
|
||||
|
||||
|
|
@ -211,8 +130,7 @@ class SetpointManager:
|
|||
self.calculators = {
|
||||
'DIRECT_SPEED': DirectSpeedCalculator(),
|
||||
'LEVEL_CONTROLLED': LevelControlledCalculator(),
|
||||
'POWER_CONTROLLED': PowerControlledCalculator(),
|
||||
'PUMP_CONTROL_PREPROCESSOR': PumpControlPreprocessorCalculator()
|
||||
'POWER_CONTROLLED': PowerControlledCalculator()
|
||||
}
|
||||
|
||||
async def start(self) -> None:
|
||||
|
|
|
|||
|
|
@ -146,7 +146,7 @@ class ProtocolMapping(BaseModel):
|
|||
raise ValueError("REST API endpoint must start with 'http://' or 'https://'")
|
||||
return v
|
||||
|
||||
def apply_preprocessing(self, value: float, context: Optional[Dict[str, Any]] = None) -> float:
|
||||
def apply_preprocessing(self, value: float) -> float:
|
||||
"""Apply preprocessing rules to a value"""
|
||||
if not self.preprocessing_enabled:
|
||||
return value
|
||||
|
|
@ -183,44 +183,6 @@ class ProtocolMapping(BaseModel):
|
|||
width = params.get('width', 0.0)
|
||||
if abs(processed_value - center) <= width:
|
||||
processed_value = center
|
||||
elif rule_type == 'pump_control_logic':
|
||||
# Apply pump control logic preprocessing
|
||||
from src.core.pump_control_preprocessor import pump_control_preprocessor, PumpControlLogic
|
||||
|
||||
# Extract pump control parameters from context
|
||||
station_id = context.get('station_id') if context else None
|
||||
pump_id = context.get('pump_id') if context else None
|
||||
current_level = context.get('current_level') if context else None
|
||||
current_pump_state = context.get('current_pump_state') if context else None
|
||||
|
||||
if station_id and pump_id:
|
||||
# Get control logic type
|
||||
logic_type_str = params.get('logic_type', 'mpc_adaptive_hysteresis')
|
||||
try:
|
||||
logic_type = PumpControlLogic(logic_type_str)
|
||||
except ValueError:
|
||||
logger.warning(f"Unknown pump control logic: {logic_type_str}, using default")
|
||||
logic_type = PumpControlLogic.MPC_ADAPTIVE_HYSTERESIS
|
||||
|
||||
# Apply pump control logic
|
||||
result = pump_control_preprocessor.apply_control_logic(
|
||||
station_id=station_id,
|
||||
pump_id=pump_id,
|
||||
mpc_output=processed_value,
|
||||
current_level=current_level,
|
||||
current_pump_state=current_pump_state,
|
||||
control_logic=logic_type,
|
||||
control_params=params.get('control_params', {})
|
||||
)
|
||||
|
||||
# Convert result to output value
|
||||
# For level-based control, we return the MPC output but store control signals
|
||||
# The actual pump control will use the thresholds from the result
|
||||
processed_value = 100.0 if result.get('pump_command', False) else 0.0
|
||||
|
||||
# Store control result in context for downstream use
|
||||
if context is not None:
|
||||
context['pump_control_result'] = result
|
||||
|
||||
# Apply final output limits
|
||||
if self.min_output_value is not None:
|
||||
|
|
|
|||
Loading…
Reference in New Issue