CalejoControl/test_opcua_client_integrati...

59 lines
1.8 KiB
Python

#!/usr/bin/env python3
"""
Test OPC UA client integration with the actual server nodes
"""
import asyncio
import sys
import os
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from src.protocols.opcua_client import OPCUAClient
from config.settings import Settings
async def test_opcua_client_integration():
    """Exercise the OPC UA client against a locally running server.

    Connects to ``opc.tcp://localhost:4840``, reads a fixed list of pump
    nodes one at a time, then reads the same list in a single batch call,
    printing every result. The client is disconnected afterwards, but only
    if the connection was actually established.

    All failures are reported with ``print`` rather than raised, so a test
    runner will not see a failing result from this function; inspect the
    output for ``✗`` lines.
    """
    # NOTE(review): `settings` is not used below. It is kept because
    # constructing Settings may load/validate configuration — confirm
    # whether this line can be dropped.
    settings = Settings()
    client = OPCUAClient(
        endpoint="opc.tcp://localhost:4840",
        namespace_idx=2,
    )

    # Handle the connection attempt on its own: if it fails there is no
    # session to tear down, so we must not fall through to disconnect().
    try:
        await client.connect()
    except Exception as e:
        print(f"✗ Connection failed: {e}")
        return
    print("✓ Connected to OPC UA server")

    # Nodes that are expected to exist on the server (namespace index 2,
    # matching `namespace_idx` above).
    test_nodes = [
        "ns=2;s=Station_STATION_001.Pump_PUMP_001.Setpoint_Hz",
        "ns=2;s=Station_STATION_001.Pump_PUMP_001.ActualSpeed_Hz",
        "ns=2;s=Station_STATION_001.Pump_PUMP_001.Power_kW",
        "ns=2;s=Station_STATION_001.Pump_PUMP_001.FlowRate_m3h",
        "ns=2;s=Station_STATION_001.Pump_PUMP_001.SafetyStatus",
    ]

    try:
        # Individual reads are best-effort: one bad node must not stop the
        # remaining nodes from being checked.
        for node_id in test_nodes:
            try:
                value = await client.read_value(node_id)
                print(f"✓ Read {node_id}: {value}")
            except Exception as e:
                print(f"✗ Failed to read {node_id}: {e}")

        # Batch read of the same nodes in one call.
        print("\nTesting batch read:")
        try:
            values = await client.read_values(test_nodes)
        except Exception as e:
            # Reported separately so a read error is not mistaken for a
            # connection problem.
            print(f"✗ Batch read failed: {e}")
        else:
            for node_id, value in zip(test_nodes, values):
                print(f" {node_id}: {value}")
    finally:
        # Only reached after a successful connect(), so the session is
        # always released and the message below is accurate.
        await client.disconnect()
        print("✓ Disconnected from OPC UA server")
if __name__ == "__main__":
    # Allow the integration check to be launched directly as a script:
    # build the coroutine, then drive it to completion on a fresh event loop.
    integration_check = test_opcua_client_integration()
    asyncio.run(integration_check)