#!/usr/bin/env python3 """ Test script to verify protocol clients can connect and read data. """ import asyncio import sys import os # Add src to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src')) from dashboard.protocol_clients import OPCUAClient, ModbusClient async def test_opcua_client(): """Test OPC UA client connection and data reading.""" print("Testing OPC UA client...") client = OPCUAClient(endpoint="opc.tcp://localhost:4840") try: # Test connection connected = await client.connect() if connected: print("✓ OPC UA client connected successfully") # Test reading a node # Try to read a known node (setpoint for station 1, pump 1) node_id = "ns=2;s=Station_STATION_001.Pump_PUMP_001.Setpoint_Hz" value = await client.read_node_value(node_id) if value is not None: print(f"✓ OPC UA read successful: {value}") else: print("✗ OPC UA read returned None (node might not exist)") # Test getting pump data pump_data = await client.get_pump_data("STATION_001", "PUMP_001") if pump_data: print(f"✓ OPC UA pump data retrieved: {pump_data}") else: print("✗ OPC UA pump data returned empty dict") await client.disconnect() print("✓ OPC UA client disconnected") else: print("✗ OPC UA client failed to connect") except Exception as e: print(f"✗ OPC UA client test failed: {e}") def test_modbus_client(): """Test Modbus client connection and data reading.""" print("\nTesting Modbus client...") client = ModbusClient(host="localhost", port=502, unit_id=1) try: # Test connection connected = client.connect() if connected: print("✓ Modbus client connected successfully") # Test reading holding register (setpoint for pump 1) setpoint = client.read_holding_register(0, 1) # Address 0 for pump 1 if setpoint is not None: print(f"✓ Modbus holding register read successful: {setpoint}") else: print("✗ Modbus holding register read returned None") # Test reading input register (status for pump 1) status = client.read_input_register(100, 1) # Address 100 for pump 1 if status is not None: print(f"✓ Modbus input register read successful: {status}") else: print("✗ Modbus input register read returned None") # Test getting pump registers pump_registers = client.get_pump_registers(1) # Pump 1 if pump_registers: print(f"✓ Modbus pump registers retrieved: {pump_registers}") else: print("✗ Modbus pump registers returned empty dict") client.disconnect() print("✓ Modbus client disconnected") else: print("✗ Modbus client failed to connect") except Exception as e: print(f"✗ Modbus client test failed: {e}") async def main(): """Run all tests.""" print("=" * 50) print("Protocol Clients Test") print("=" * 50) await test_opcua_client() test_modbus_client() print("\n" + "=" * 50) print("Test completed") print("=" * 50) if __name__ == "__main__": asyncio.run(main())