#!/usr/bin/env python3
"""
Test script to fix OPC UA client connection with proper security mode
"""

import asyncio
import traceback

# Endpoint and node exercised by the checks below. They are module-level so
# both checks target the same server and can be overridden per call.
SERVER_URL = "opc.tcp://localhost:4840"
SETPOINT_NODE_ID = "ns=2;s=Station_STATION_001.Pump_PUMP_001.Setpoint_Hz"
SECURITY_POLICY_NONE_URI = "http://opcfoundation.org/UA/SecurityPolicy#None"


async def test_opcua_with_correct_security(
    url: str = SERVER_URL,
    node_id: str = SETPOINT_NODE_ID,
) -> None:
    """Connect with security explicitly set to None and read one node.

    Progress and failures are reported on stdout; no exception derived from
    ``Exception`` escapes this coroutine (including a missing ``asyncua``
    package, because the import happens inside the ``try``).

    Args:
        url: OPC UA endpoint to connect to.
        node_id: Node identifier whose value is read after connecting.
    """
    print("Testing OPC UA connection with correct security mode...")

    try:
        # Imported lazily so an ImportError is reported like any other failure.
        from asyncua import Client
        from asyncua.ua import MessageSecurityMode

        client = Client(url=url)

        # Set security to None explicitly - BOTH mode and URI
        client.security_policy.Mode = MessageSecurityMode.None_
        client.security_policy.URI = SECURITY_POLICY_NONE_URI

        print(f"Security Mode: {client.security_policy.Mode}")
        print(f"Security Policy URI: {client.security_policy.URI}")

        print("Connecting...")
        await client.connect()
        try:
            print("✓ Connected successfully!")

            # A failed read is reported but must not abort the check.
            try:
                node = client.get_node(node_id)
                value = await node.read_value()
                print(f"✓ Successfully read node value: {value}")
            except Exception as e:
                print(f"✗ Failed to read node: {e}")
        finally:
            # Always release the session, even on cancellation / Ctrl-C,
            # which are BaseException and bypass the handlers above.
            await client.disconnect()
        print("✓ Disconnected successfully")

    except Exception as e:
        print(f"✗ Connection failed: {e}")
        traceback.print_exc()


async def test_opcua_with_auto_security(url: str = SERVER_URL) -> None:
    """Connect without configuring security, then disconnect.

    The client's security settings are left untouched. Failures derived from
    ``Exception`` are reported on stdout rather than raised.

    Args:
        url: OPC UA endpoint to connect to.
    """
    print("\n\nTesting OPC UA connection with automatic security negotiation...")

    try:
        from asyncua import Client

        client = Client(url=url)

        # Don't set any security - let it auto-negotiate
        print("Connecting with auto-negotiation...")
        await client.connect()
        try:
            print("✓ Connected successfully with auto-negotiation!")
        finally:
            # Guarantee the session is closed once connect() has succeeded.
            await client.disconnect()

    except Exception as e:
        print(f"✗ Auto-negotiation failed: {e}")


# These coroutines are manual connectivity checks: they swallow every error
# and only print, so a test runner gains nothing from collecting them.
# `__test__ = False` is the pytest opt-out for names matching `test_*`.
test_opcua_with_correct_security.__test__ = False
test_opcua_with_auto_security.__test__ = False


async def main() -> None:
    """Run both connection checks in order, framed by a banner."""
    banner = "=" * 60

    print(banner)
    print("OPC UA Security Fix Test")
    print(banner)

    await test_opcua_with_correct_security()
    await test_opcua_with_auto_security()

    print("\n" + banner)
    print("Test completed")
    print(banner)


if __name__ == "__main__":
    asyncio.run(main())