CalejoControl/tests/test_protocol_clients.py

108 lines
3.6 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Test script to verify protocol clients can connect and read data.
"""
import asyncio
import sys
import os
# Add a ``src`` directory to the import path, presumably so that the
# ``dashboard`` package imported below can be found.
# NOTE(review): the path is built relative to this file's own directory, i.e.
# ``<dir of this file>/src``. If this file lives under ``tests/`` and the
# sources are in a sibling ``src/`` at the repository root, this should
# probably be ``os.path.join(os.path.dirname(__file__), '..', 'src')`` -
# TODO confirm against the repository layout.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from dashboard.protocol_clients import OPCUAClient, ModbusClient
async def test_opcua_client():
    """Test OPC UA client connection and data reading.

    Connects to ``opc.tcp://localhost:4840``, reads one node value, requests
    the pump data for ``STATION_001`` / ``PUMP_001`` and disconnects again.

    Every outcome is reported with ``print``; nothing is asserted or
    returned. Exceptions raised by the client are caught and printed rather
    than propagated. Once ``connect()`` has reported success the client is
    always disconnected, even if a later read raises.
    """
    print("Testing OPC UA client...")
    client = OPCUAClient(endpoint="opc.tcp://localhost:4840")

    # Tracks whether connect() reported success, so the cleanup in ``finally``
    # only disconnects a client that was actually connected.
    connected = False
    try:
        # Test connection
        connected = await client.connect()
        if not connected:
            print("✗ OPC UA client failed to connect")
            return
        print("✓ OPC UA client connected successfully")

        # Test reading a node
        # Try to read a known node (setpoint for station 1, pump 1)
        node_id = "ns=2;s=Station_STATION_001.Pump_PUMP_001.Setpoint_Hz"
        value = await client.read_node_value(node_id)
        if value is not None:
            print(f"✓ OPC UA read successful: {value}")
        else:
            print("✗ OPC UA read returned None (node might not exist)")

        # Test getting pump data
        pump_data = await client.get_pump_data("STATION_001", "PUMP_001")
        if pump_data:
            print(f"✓ OPC UA pump data retrieved: {pump_data}")
        else:
            print("✗ OPC UA pump data returned empty dict")
    except Exception as e:
        print(f"✗ OPC UA client test failed: {e}")
    finally:
        # Disconnect here rather than inside the ``try`` body so that a
        # failing read does not leave the connection open.
        if connected:
            try:
                await client.disconnect()
                print("✓ OPC UA client disconnected")
            except Exception as e:
                # Same report a failing disconnect produced before this
                # cleanup was moved into ``finally``.
                print(f"✗ OPC UA client test failed: {e}")
def test_modbus_client():
    """Test Modbus client connection and data reading.

    Connects to ``localhost:502`` with unit id 1, reads one holding register
    (address 0), one input register (address 100), requests the registers
    for pump 1 and disconnects again.

    Every outcome is reported with ``print``; nothing is asserted or
    returned. Exceptions raised by the client are caught and printed rather
    than propagated. Once ``connect()`` has reported success the client is
    always disconnected, even if a later read raises.
    """
    print("\nTesting Modbus client...")
    client = ModbusClient(host="localhost", port=502, unit_id=1)

    # Tracks whether connect() reported success, so the cleanup in ``finally``
    # only disconnects a client that was actually connected.
    connected = False
    try:
        # Test connection
        connected = client.connect()
        if not connected:
            print("✗ Modbus client failed to connect")
            return
        print("✓ Modbus client connected successfully")

        # Test reading holding register (setpoint for pump 1)
        setpoint = client.read_holding_register(0, 1)  # Address 0 for pump 1
        if setpoint is not None:
            print(f"✓ Modbus holding register read successful: {setpoint}")
        else:
            print("✗ Modbus holding register read returned None")

        # Test reading input register (status for pump 1)
        status = client.read_input_register(100, 1)  # Address 100 for pump 1
        if status is not None:
            print(f"✓ Modbus input register read successful: {status}")
        else:
            print("✗ Modbus input register read returned None")

        # Test getting pump registers
        pump_registers = client.get_pump_registers(1)  # Pump 1
        if pump_registers:
            print(f"✓ Modbus pump registers retrieved: {pump_registers}")
        else:
            print("✗ Modbus pump registers returned empty dict")
    except Exception as e:
        print(f"✗ Modbus client test failed: {e}")
    finally:
        # Disconnect here rather than inside the ``try`` body so that a
        # failing read does not leave the connection open.
        if connected:
            try:
                client.disconnect()
                print("✓ Modbus client disconnected")
            except Exception as e:
                # Same report a failing disconnect produced before this
                # cleanup was moved into ``finally``.
                print(f"✗ Modbus client test failed: {e}")
async def main():
    """Run the OPC UA check, then the Modbus check, framed by banner lines."""
    rule = "=" * 50  # horizontal separator used above and below the run

    print(rule)
    print("Protocol Clients Test")
    print(rule)

    # The OPC UA check is a coroutine; the Modbus check is a plain function.
    await test_opcua_client()
    test_modbus_client()

    print(f"\n{rule}")
    print("Test completed")
    print(rule)


if __name__ == "__main__":
    # Script entry point: drive the async main() on a fresh event loop.
    asyncio.run(main())