""" Protocol Clients for Dashboard Provides client utilities to query OPC UA and Modbus servers for real-time data. """ import asyncio import structlog from typing import Dict, Any, Optional, List from datetime import datetime logger = structlog.get_logger() class OPCUAClient: """OPC UA Client for querying pump data from OPC UA server.""" def __init__(self, endpoint: str = "opc.tcp://localhost:4840"): self.endpoint = endpoint self._client = None async def connect(self): """Connect to OPC UA server.""" try: from asyncua import Client self._client = Client(url=self.endpoint) # Try to connect with no explicit security policy first # The client should automatically negotiate with the server await asyncio.wait_for(self._client.connect(), timeout=5.0) logger.info("opcua_client_connected", endpoint=self.endpoint) return True except asyncio.TimeoutError: logger.error("opcua_connection_timeout", endpoint=self.endpoint) return False except Exception as e: logger.error("failed_to_connect_opcua", endpoint=self.endpoint, error=str(e)) return False async def disconnect(self): """Disconnect from OPC UA server.""" if self._client: await self._client.disconnect() self._client = None logger.info("opcua_client_disconnected") async def read_node_value(self, node_id: str) -> Optional[Any]: """Read value from OPC UA node.""" try: if not self._client: logger.info("opcua_client_not_connected_attempting_connect") connected = await self.connect() if not connected: logger.error("opcua_client_connect_failed") return None # Double-check client is still valid if not self._client: logger.error("opcua_client_still_none_after_connect") return None node = self._client.get_node(node_id) value = await asyncio.wait_for(node.read_value(), timeout=3.0) return value except asyncio.TimeoutError: logger.error("opcua_read_timeout", node_id=node_id) return None except Exception as e: logger.error("failed_to_read_opcua_node", node_id=node_id, error=str(e)) return None async def get_pump_data(self, station_id: str, pump_id: str) -> Dict[str, Any]: """Get all data for a specific pump.""" try: # Ensure client is connected before reading multiple nodes if not self._client: connected = await self.connect() if not connected: logger.error("opcua_client_not_connected_for_pump_data", station_id=station_id, pump_id=pump_id) return {} # Define node IDs for this pump (using numeric IDs from server) # Node IDs are assigned sequentially by the server node_map = { "STATION_001": { "PUMP_001": { "setpoint": "ns=2;i=7", "actual_speed": "ns=2;i=8", "power": "ns=2;i=9", "flow_rate": "ns=2;i=10", "safety_status": "ns=2;i=11", "timestamp": "ns=2;i=12" }, "PUMP_002": { "setpoint": "ns=2;i=16", "actual_speed": "ns=2;i=17", "power": "ns=2;i=18", "flow_rate": "ns=2;i=19", "safety_status": "ns=2;i=20", "timestamp": "ns=2;i=21" } }, "STATION_002": { "PUMP_003": { "setpoint": "ns=2;i=27", "actual_speed": "ns=2;i=28", "power": "ns=2;i=29", "flow_rate": "ns=2;i=30", "safety_status": "ns=2;i=31", "timestamp": "ns=2;i=32" } } } # Get the nodes for this specific pump nodes = node_map.get(station_id, {}).get(pump_id, {}) data = {} for key, node_id in nodes.items(): value = await self.read_node_value(node_id) data[key] = value return data except Exception as e: logger.error("failed_to_get_pump_data", station_id=station_id, pump_id=pump_id, error=str(e)) return {} class ModbusClient: """Modbus Client for querying register data from Modbus server.""" def __init__(self, host: str = "localhost", port: int = 502, unit_id: int = 1): self.host = host self.port = port self.unit_id = unit_id self._client = None def connect(self) -> bool: """Connect to Modbus server.""" try: from pymodbus.client import ModbusTcpClient self._client = ModbusTcpClient(self.host, port=self.port) if self._client.connect(): logger.info("modbus_client_connected", host=self.host, port=self.port) return True else: logger.error("failed_to_connect_modbus", host=self.host, port=self.port) return False except Exception as e: logger.error("failed_to_connect_modbus", host=self.host, port=self.port, error=str(e)) return False def disconnect(self): """Disconnect from Modbus server.""" if self._client: self._client.close() self._client = None logger.info("modbus_client_disconnected") def read_holding_register(self, address: int, count: int = 1) -> Optional[List[int]]: """Read holding register(s).""" try: if not self._client or not self._client.is_socket_open(): connected = self.connect() if not connected: return None # Set timeout for the read operation if hasattr(self._client, 'timeout'): original_timeout = self._client.timeout self._client.timeout = 2.0 # 2 second timeout result = self._client.read_holding_registers(address, count) # Restore original timeout if hasattr(self._client, 'timeout'): self._client.timeout = original_timeout if not result.isError(): return result.registers else: logger.error("failed_to_read_holding_register", address=address, error=str(result)) return None except Exception as e: logger.error("failed_to_read_holding_register", address=address, error=str(e)) return None def read_input_register(self, address: int, count: int = 1) -> Optional[List[int]]: """Read input register(s).""" try: if not self._client or not self._client.is_socket_open(): connected = self.connect() if not connected: return None # Set timeout for the read operation if hasattr(self._client, 'timeout'): original_timeout = self._client.timeout self._client.timeout = 2.0 # 2 second timeout result = self._client.read_input_registers(address, count) # Restore original timeout if hasattr(self._client, 'timeout'): self._client.timeout = original_timeout if not result.isError(): return result.registers else: logger.error("failed_to_read_input_register", address=address, error=str(result)) return None except Exception as e: logger.error("failed_to_read_input_register", address=address, error=str(e)) return None def get_pump_registers(self, pump_num: int) -> Dict[str, Any]: """Get all register data for a specific pump.""" try: # Calculate register addresses based on server configuration # Server uses: SETPOINT_BASE=0, STATUS_BASE=100, SAFETY_BASE=200, PERFORMANCE_BASE=400 # Each pump gets 10 registers in each block pump_offset = (pump_num - 1) * 10 # Read holding registers (setpoints) - base address 0 setpoint = self.read_holding_register(pump_offset, 1) # Read input registers (status values) - base address 100 actual_speed = self.read_input_register(100 + pump_offset, 1) power = self.read_input_register(100 + pump_offset + 1, 1) flow_rate = self.read_input_register(100 + pump_offset + 2, 1) # Read safety status - base address 200 safety_status = self.read_input_register(200 + pump_offset, 1) # Read performance metrics - base address 400 (if available) efficiency = None try: efficiency = self.read_input_register(400 + pump_offset, 1) except Exception: # Performance metrics might not be available pass return { "setpoint": setpoint[0] if setpoint else None, "actual_speed": actual_speed[0] if actual_speed else None, "power": power[0] if power else None, "flow_rate": flow_rate[0] if flow_rate else None, "safety_status": safety_status[0] if safety_status else None, "efficiency": efficiency[0] if efficiency else None } except Exception as e: logger.error("failed_to_get_pump_registers", pump_num=pump_num, error=str(e)) return {} class ProtocolDataCollector: """Main class for collecting data from all protocol servers.""" def __init__(self): self.opcua_client = OPCUAClient() self.modbus_client = ModbusClient() async def get_signal_data(self, station_id: str, pump_id: str) -> List[Dict[str, Any]]: """Get signal data for a specific pump from all protocols.""" signals = [] # Extract pump number from pump_id (e.g., "PUMP_001" -> 1) try: pump_num = int(pump_id.split('_')[1]) except (IndexError, ValueError): pump_num = 1 # Get OPC UA data opcua_data = await self.opcua_client.get_pump_data(station_id, pump_id) # Get Modbus data with timeout protection modbus_data = None try: import asyncio # Run Modbus operations in a thread with timeout loop = asyncio.get_event_loop() modbus_data = await asyncio.wait_for( loop.run_in_executor(None, self.modbus_client.get_pump_registers, pump_num), timeout=5.0 ) except asyncio.TimeoutError: logger.warning("modbus_data_timeout", pump_num=pump_num) modbus_data = None except Exception as e: logger.error("failed_to_get_modbus_data", pump_num=pump_num, error=str(e)) modbus_data = None # Create OPC UA signals if opcua_data: # Handle None values gracefully setpoint = opcua_data.get('setpoint', 0.0) or 0.0 actual_speed = opcua_data.get('actual_speed', 0.0) or 0.0 power = opcua_data.get('power', 0.0) or 0.0 flow_rate = opcua_data.get('flow_rate', 0.0) or 0.0 safety_status = opcua_data.get('safety_status', 'normal') or 'normal' # Map pump IDs to their node IDs address_map = { "PUMP_001": { "setpoint": "ns=2;i=7", "actual_speed": "ns=2;i=8", "power": "ns=2;i=9", "flow_rate": "ns=2;i=10", "safety_status": "ns=2;i=11" }, "PUMP_002": { "setpoint": "ns=2;i=16", "actual_speed": "ns=2;i=17", "power": "ns=2;i=18", "flow_rate": "ns=2;i=19", "safety_status": "ns=2;i=20" }, "PUMP_003": { "setpoint": "ns=2;i=27", "actual_speed": "ns=2;i=28", "power": "ns=2;i=29", "flow_rate": "ns=2;i=30", "safety_status": "ns=2;i=31" } } pump_addresses = address_map.get(pump_id, {}) signals.extend([ { "name": f"Station_{station_id}_Pump_{pump_id}_Setpoint", "protocol": "opcua", "address": pump_addresses.get("setpoint", "ns=2;i=7"), "data_type": "Float", "current_value": f"{setpoint:.1f} Hz", "quality": "Good", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }, { "name": f"Station_{station_id}_Pump_{pump_id}_ActualSpeed", "protocol": "opcua", "address": pump_addresses.get("actual_speed", "ns=2;i=8"), "data_type": "Float", "current_value": f"{actual_speed:.1f} Hz", "quality": "Good", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }, { "name": f"Station_{station_id}_Pump_{pump_id}_Power", "protocol": "opcua", "address": pump_addresses.get("power", "ns=2;i=9"), "data_type": "Float", "current_value": f"{power:.1f} kW", "quality": "Good", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }, { "name": f"Station_{station_id}_Pump_{pump_id}_FlowRate", "protocol": "opcua", "address": pump_addresses.get("flow_rate", "ns=2;i=10"), "data_type": "Float", "current_value": f"{flow_rate:.1f} m³/h", "quality": "Good", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }, { "name": f"Station_{station_id}_Pump_{pump_id}_SafetyStatus", "protocol": "opcua", "address": pump_addresses.get("safety_status", "ns=2;i=11"), "data_type": "String", "current_value": safety_status, "quality": "Good", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } ]) # Create Modbus signals if modbus_data: # Handle None values gracefully setpoint = modbus_data.get('setpoint', 0) or 0 actual_speed = modbus_data.get('actual_speed', 0) or 0 power = modbus_data.get('power', 0) or 0 flow_rate = modbus_data.get('flow_rate', 0) or 0 safety_status = modbus_data.get('safety_status', 0) or 0 efficiency = modbus_data.get('efficiency', 0) or 0 # Calculate pump offset for address display pump_offset = (pump_num - 1) * 10 signals.extend([ { "name": f"Station_{station_id}_Pump_{pump_id}_Setpoint", "protocol": "modbus", "address": f"Holding Register {pump_offset}", "data_type": "Integer", "current_value": f"{setpoint} Hz (x10)", "quality": "Good", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }, { "name": f"Station_{station_id}_Pump_{pump_id}_ActualSpeed", "protocol": "modbus", "address": f"Input Register {100 + pump_offset}", "data_type": "Integer", "current_value": f"{actual_speed} Hz (x10)", "quality": "Good", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }, { "name": f"Station_{station_id}_Pump_{pump_id}_Power", "protocol": "modbus", "address": f"Input Register {100 + pump_offset + 1}", "data_type": "Integer", "current_value": f"{power} kW (x10)", "quality": "Good", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }, { "name": f"Station_{station_id}_Pump_{pump_id}_FlowRate", "protocol": "modbus", "address": f"Input Register {100 + pump_offset + 2}", "data_type": "Integer", "current_value": f"{flow_rate} m³/h", "quality": "Good", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }, { "name": f"Station_{station_id}_Pump_{pump_id}_SafetyStatus", "protocol": "modbus", "address": f"Input Register {200 + pump_offset}", "data_type": "Integer", "current_value": f"{safety_status}", "quality": "Good", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }, { "name": f"Station_{station_id}_Pump_{pump_id}_Efficiency", "protocol": "modbus", "address": f"Input Register {400 + pump_offset}", "data_type": "Integer", "current_value": f"{efficiency} %", "quality": "Good", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } ]) return signals async def cleanup(self): """Clean up connections.""" await self.opcua_client.disconnect() self.modbus_client.disconnect()