82 lines
2.4 KiB
Python
82 lines
2.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script to fix OPC UA client connection with proper security mode
|
|
"""
|
|
|
|
import asyncio
|
|
|
|
|
|
async def test_opcua_with_correct_security():
|
|
"""Test OPC UA connection with correct security mode."""
|
|
print("Testing OPC UA connection with correct security mode...")
|
|
|
|
try:
|
|
from asyncua import Client
|
|
from asyncua.ua import MessageSecurityMode
|
|
|
|
client = Client(url="opc.tcp://localhost:4840")
|
|
|
|
# Set security to None explicitly - BOTH mode and URI
|
|
client.security_policy.Mode = MessageSecurityMode.None_
|
|
client.security_policy.URI = "http://opcfoundation.org/UA/SecurityPolicy#None"
|
|
|
|
print(f"Security Mode: {client.security_policy.Mode}")
|
|
print(f"Security Policy URI: {client.security_policy.URI}")
|
|
|
|
print("Connecting...")
|
|
await client.connect()
|
|
print("✓ Connected successfully!")
|
|
|
|
# Try to read a node
|
|
try:
|
|
node = client.get_node("ns=2;s=Station_STATION_001.Pump_PUMP_001.Setpoint_Hz")
|
|
value = await node.read_value()
|
|
print(f"✓ Successfully read node value: {value}")
|
|
except Exception as e:
|
|
print(f"✗ Failed to read node: {e}")
|
|
|
|
await client.disconnect()
|
|
print("✓ Disconnected successfully")
|
|
|
|
except Exception as e:
|
|
print(f"✗ Connection failed: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
async def test_opcua_with_auto_security():
|
|
"""Test OPC UA connection with automatic security negotiation."""
|
|
print("\n\nTesting OPC UA connection with automatic security negotiation...")
|
|
|
|
try:
|
|
from asyncua import Client
|
|
|
|
client = Client(url="opc.tcp://localhost:4840")
|
|
|
|
# Don't set any security - let it auto-negotiate
|
|
print("Connecting with auto-negotiation...")
|
|
await client.connect()
|
|
print("✓ Connected successfully with auto-negotiation!")
|
|
|
|
await client.disconnect()
|
|
|
|
except Exception as e:
|
|
print(f"✗ Auto-negotiation failed: {e}")
|
|
|
|
|
|
async def main():
|
|
"""Run the test."""
|
|
print("=" * 60)
|
|
print("OPC UA Security Fix Test")
|
|
print("=" * 60)
|
|
|
|
await test_opcua_with_correct_security()
|
|
await test_opcua_with_auto_security()
|
|
|
|
print("\n" + "=" * 60)
|
|
print("Test completed")
|
|
print("=" * 60)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main()) |