From 7917fb09683672270cdff41526aa4d958b3e029f Mon Sep 17 00:00:00 2001 From: openhands Date: Sat, 1 Nov 2025 20:12:50 +0000 Subject: [PATCH] fix: Fix OPC UA server security configuration Only configure secure security policies when certificates are available. When certificates are not available, only offer the None security policy and skip certificate validation configuration. --- src/protocols/opcua_server.py | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/protocols/opcua_server.py b/src/protocols/opcua_server.py index 8a5d059..f64e7f6 100644 --- a/src/protocols/opcua_server.py +++ b/src/protocols/opcua_server.py @@ -172,17 +172,22 @@ class OPCUAServer: async def _configure_security(self): """Configure OPC UA security with certificates.""" try: - # Set security policies - self.server.set_security_policy([ - SecurityPolicyBasic256Sha256, - "http://opcfoundation.org/UA/SecurityPolicy#None" - ]) - # Load or generate certificates if self.certificate_path and self.private_key_path: # Load existing certificates await self.server.load_certificate(self.certificate_path) await self.server.load_private_key(self.private_key_path) + + # Set security policies for secure connections + self.server.set_security_policy([ + SecurityPolicyBasic256Sha256, + "http://opcfoundation.org/UA/SecurityPolicy#None" + ]) + + # Configure certificate validation + validator = CertificateValidator(CertificateValidatorOptions()) + self.server.set_certificate_validator(validator) + elif HAS_CERT_GEN and setup_self_signed_cert: # Generate self-signed certificate for development await setup_self_signed_cert( @@ -194,18 +199,25 @@ class OPCUAServer: "Lazio", "calejo-control.com" ) + + # Set security policies for secure connections + self.server.set_security_policy([ + SecurityPolicyBasic256Sha256, + "http://opcfoundation.org/UA/SecurityPolicy#None" + ]) + + # Configure certificate validation + validator = CertificateValidator(CertificateValidatorOptions()) + self.server.set_certificate_validator(validator) + else: - # Certificate generation not available, use basic security + # Certificate generation not available, use only None security policy logger.warning("certificate_generation_not_available") self.server.set_security_policy([ "http://opcfoundation.org/UA/SecurityPolicy#None" ]) return - # Configure certificate validation - validator = CertificateValidator(CertificateValidatorOptions()) - self.server.set_certificate_validator(validator) - logger.info("opcua_security_configured") except Exception as e: