CalejoControl/test_opcua_fix.py

82 lines
2.4 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Test script to fix OPC UA client connection with proper security mode
"""
import asyncio
async def test_opcua_with_correct_security():
"""Test OPC UA connection with correct security mode."""
print("Testing OPC UA connection with correct security mode...")
try:
from asyncua import Client
from asyncua.ua import MessageSecurityMode
client = Client(url="opc.tcp://localhost:4840")
# Set security to None explicitly - BOTH mode and URI
client.security_policy.Mode = MessageSecurityMode.None_
client.security_policy.URI = "http://opcfoundation.org/UA/SecurityPolicy#None"
print(f"Security Mode: {client.security_policy.Mode}")
print(f"Security Policy URI: {client.security_policy.URI}")
print("Connecting...")
await client.connect()
print("✓ Connected successfully!")
# Try to read a node
try:
node = client.get_node("ns=2;s=Station_STATION_001.Pump_PUMP_001.Setpoint_Hz")
value = await node.read_value()
print(f"✓ Successfully read node value: {value}")
except Exception as e:
print(f"✗ Failed to read node: {e}")
await client.disconnect()
print("✓ Disconnected successfully")
except Exception as e:
print(f"✗ Connection failed: {e}")
import traceback
traceback.print_exc()
async def test_opcua_with_auto_security():
"""Test OPC UA connection with automatic security negotiation."""
print("\n\nTesting OPC UA connection with automatic security negotiation...")
try:
from asyncua import Client
client = Client(url="opc.tcp://localhost:4840")
# Don't set any security - let it auto-negotiate
print("Connecting with auto-negotiation...")
await client.connect()
print("✓ Connected successfully with auto-negotiation!")
await client.disconnect()
except Exception as e:
print(f"✗ Auto-negotiation failed: {e}")
async def main():
"""Run the test."""
print("=" * 60)
print("OPC UA Security Fix Test")
print("=" * 60)
await test_opcua_with_correct_security()
await test_opcua_with_auto_security()
print("\n" + "=" * 60)
print("Test completed")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())