"""
Simplified Configuration Manager
Manages protocol signals with human-readable names and tags
Replaces the complex ID-based system
"""

import logging
from typing import List, Optional, Dict, Any
from datetime import datetime

from .simplified_models import (
    ProtocolSignal,
    ProtocolSignalCreate,
    ProtocolSignalUpdate,
    ProtocolSignalFilter,
    ProtocolType
)

logger = logging.getLogger(__name__)


class SimplifiedConfigurationManager:
    """
    Manages protocol signals with simplified name + tags approach

    Signals are kept in an in-memory dict keyed by signal ID. The optional
    ``database_client`` is stored on the instance but is not used by any
    method of this class.
    """

    # Fields copied from a ProtocolSignalUpdate onto the stored signal when
    # the update carries a non-None value. Order matches assignment order.
    _UPDATABLE_FIELDS = (
        "signal_name",
        "tags",
        "protocol_type",
        "protocol_address",
        "db_source",
        "preprocessing_enabled",
        "preprocessing_rules",
        "min_output_value",
        "max_output_value",
        "default_output_value",
        "modbus_config",
        "opcua_config",
        "enabled",
    )

    def __init__(self, database_client=None):
        """
        Create an empty manager.

        Args:
            database_client: Optional client object; only stored on the
                instance.
        """
        self.database_client = database_client
        self.signals: Dict[str, ProtocolSignal] = {}
        logger.info("SimplifiedConfigurationManager initialized")

    def add_protocol_signal(self, signal_create: ProtocolSignalCreate) -> bool:
        """
        Add a new protocol signal

        The signal ID comes from ``signal_create.generate_signal_id()``.

        Returns:
            True if the signal was stored; False if the ID already exists or
            any error occurred (the error is logged, not raised).
        """
        try:
            # Generate signal ID
            signal_id = signal_create.generate_signal_id()

            # Check if signal ID already exists
            if signal_id in self.signals:
                logger.warning(f"Signal ID {signal_id} already exists")
                return False

            # Take the timestamp once so created_at and updated_at are
            # identical for a freshly created signal.
            timestamp = datetime.now().isoformat()

            # Create ProtocolSignal object
            signal = ProtocolSignal(
                signal_id=signal_id,
                signal_name=signal_create.signal_name,
                tags=signal_create.tags,
                protocol_type=signal_create.protocol_type,
                protocol_address=signal_create.protocol_address,
                db_source=signal_create.db_source,
                preprocessing_enabled=signal_create.preprocessing_enabled,
                preprocessing_rules=signal_create.preprocessing_rules,
                min_output_value=signal_create.min_output_value,
                max_output_value=signal_create.max_output_value,
                default_output_value=signal_create.default_output_value,
                modbus_config=signal_create.modbus_config,
                opcua_config=signal_create.opcua_config,
                created_at=timestamp,
                updated_at=timestamp
            )

            # Store in memory (in production, this would be in database)
            self.signals[signal_id] = signal

            logger.info(f"Added protocol signal: {signal_id} - {signal.signal_name}")
            return True

        except Exception as e:
            # logger.exception logs at ERROR level and keeps the traceback.
            logger.exception(f"Error adding protocol signal: {e}")
            return False

    def get_protocol_signals(self, filters: Optional[ProtocolSignalFilter] = None) -> List[ProtocolSignal]:
        """
        Get protocol signals with optional filtering

        Each filter criterion that is set narrows the result further:
        ``tags`` keeps signals carrying at least one of the given tags,
        ``protocol_type`` requires equality, ``signal_name_contains`` is a
        case-insensitive substring match, and ``enabled`` (when not None)
        requires equality.

        Returns:
            The matching signals, or an empty list if an error occurred
            (the error is logged, not raised).
        """
        try:
            signals = list(self.signals.values())

            if not filters:
                return signals

            # Apply filters
            filtered_signals = signals

            # Filter by tags (any of the requested tags is enough)
            if filters.tags:
                filtered_signals = [
                    s for s in filtered_signals
                    if any(tag in s.tags for tag in filters.tags)
                ]

            # Filter by protocol type
            if filters.protocol_type:
                filtered_signals = [
                    s for s in filtered_signals
                    if s.protocol_type == filters.protocol_type
                ]

            # Filter by signal name
            if filters.signal_name_contains:
                needle = filters.signal_name_contains.lower()
                filtered_signals = [
                    s for s in filtered_signals
                    if needle in s.signal_name.lower()
                ]

            # Filter by enabled status
            if filters.enabled is not None:
                filtered_signals = [
                    s for s in filtered_signals
                    if s.enabled == filters.enabled
                ]

            return filtered_signals

        except Exception as e:
            logger.exception(f"Error getting protocol signals: {e}")
            return []

    def get_protocol_signal(self, signal_id: str) -> Optional[ProtocolSignal]:
        """
        Get a specific protocol signal by ID

        Returns:
            The stored signal, or None if the ID is unknown.
        """
        return self.signals.get(signal_id)

    def update_protocol_signal(self, signal_id: str, update_data: ProtocolSignalUpdate) -> bool:
        """
        Update an existing protocol signal

        Only fields whose value in ``update_data`` is not None are written to
        the stored signal; ``updated_at`` is then refreshed.

        Returns:
            True on success; False if the signal does not exist or an error
            occurred (the error is logged, not raised).
        """
        try:
            if signal_id not in self.signals:
                logger.warning(f"Signal {signal_id} not found for update")
                return False

            signal = self.signals[signal_id]

            # Update fields if provided
            for field_name in self._UPDATABLE_FIELDS:
                new_value = getattr(update_data, field_name)
                if new_value is not None:
                    setattr(signal, field_name, new_value)

            # Update timestamp
            signal.updated_at = datetime.now().isoformat()

            logger.info(f"Updated protocol signal: {signal_id}")
            return True

        except Exception as e:
            logger.exception(f"Error updating protocol signal {signal_id}: {e}")
            return False

    def delete_protocol_signal(self, signal_id: str) -> bool:
        """
        Delete a protocol signal

        Returns:
            True if the signal was removed; False if it was not found or an
            error occurred (the error is logged, not raised).
        """
        try:
            if signal_id not in self.signals:
                logger.warning(f"Signal {signal_id} not found for deletion")
                return False

            del self.signals[signal_id]
            logger.info(f"Deleted protocol signal: {signal_id}")
            return True

        except Exception as e:
            logger.exception(f"Error deleting protocol signal {signal_id}: {e}")
            return False

    def search_signals_by_tags(self, tags: List[str]) -> List[ProtocolSignal]:
        """
        Search signals by tags (all tags must match)

        Note that an empty ``tags`` list matches every signal.

        Returns:
            The matching signals, or an empty list if an error occurred
            (the error is logged, not raised).
        """
        try:
            return [
                signal for signal in self.signals.values()
                if all(tag in signal.tags for tag in tags)
            ]
        except Exception as e:
            logger.exception(f"Error searching signals by tags: {e}")
            return []

    def get_all_tags(self) -> List[str]:
        """
        Get all unique tags used across all signals

        Returns:
            The distinct tags in sorted order.
        """
        all_tags = set()
        for signal in self.signals.values():
            all_tags.update(signal.tags)
        return sorted(all_tags)

    def get_signals_by_protocol_type(self, protocol_type: ProtocolType) -> List[ProtocolSignal]:
        """
        Get all signals for a specific protocol type
        """
        return [
            signal for signal in self.signals.values()
            if signal.protocol_type == protocol_type
        ]

    def validate_signal_configuration(self, signal_create: ProtocolSignalCreate) -> Dict[str, Any]:
        """
        Validate signal configuration before creation

        Returns:
            A dict with keys ``"valid"`` (bool), ``"errors"`` (list of str)
            and ``"warnings"`` (list of str). An empty name, protocol address
            or database source is an error; a duplicate name or missing tags
            is only a warning. An unexpected exception is reported as an
            error entry rather than raised.
        """
        validation_result = {
            "valid": True,
            "errors": [],
            "warnings": []
        }

        try:
            # Validate signal name
            if not signal_create.signal_name or not signal_create.signal_name.strip():
                validation_result["valid"] = False
                validation_result["errors"].append("Signal name cannot be empty")

            # Validate protocol address
            if not signal_create.protocol_address:
                validation_result["valid"] = False
                validation_result["errors"].append("Protocol address cannot be empty")

            # Validate database source
            if not signal_create.db_source:
                validation_result["valid"] = False
                validation_result["errors"].append("Database source cannot be empty")

            # Check for duplicate signal names
            name_taken = any(
                s.signal_name == signal_create.signal_name
                for s in self.signals.values()
            )
            if name_taken:
                validation_result["warnings"].append(
                    f"Signal name '{signal_create.signal_name}' already exists"
                )

            # Validate tags
            if not signal_create.tags:
                validation_result["warnings"].append("No tags provided - consider adding tags for better organization")

            return validation_result

        except Exception as e:
            validation_result["valid"] = False
            validation_result["errors"].append(f"Validation error: {str(e)}")
            return validation_result


# Global instance for simplified configuration management
simplified_configuration_manager = SimplifiedConfigurationManager()