fix: Fix OPC UA server security configuration

Only configure secure security policies when certificates are available. When certificates are not available, only offer the None security policy and skip certificate validation configuration.
This commit is contained in:
openhands 2025-11-01 20:12:50 +00:00
parent 9c92c5c47f
commit 7917fb0968
1 changed files with 23 additions and 11 deletions

View File

@ -172,17 +172,22 @@ class OPCUAServer:
async def _configure_security(self): async def _configure_security(self):
"""Configure OPC UA security with certificates.""" """Configure OPC UA security with certificates."""
try: try:
# Set security policies
self.server.set_security_policy([
SecurityPolicyBasic256Sha256,
"http://opcfoundation.org/UA/SecurityPolicy#None"
])
# Load or generate certificates # Load or generate certificates
if self.certificate_path and self.private_key_path: if self.certificate_path and self.private_key_path:
# Load existing certificates # Load existing certificates
await self.server.load_certificate(self.certificate_path) await self.server.load_certificate(self.certificate_path)
await self.server.load_private_key(self.private_key_path) await self.server.load_private_key(self.private_key_path)
# Set security policies for secure connections
self.server.set_security_policy([
SecurityPolicyBasic256Sha256,
"http://opcfoundation.org/UA/SecurityPolicy#None"
])
# Configure certificate validation
validator = CertificateValidator(CertificateValidatorOptions())
self.server.set_certificate_validator(validator)
elif HAS_CERT_GEN and setup_self_signed_cert: elif HAS_CERT_GEN and setup_self_signed_cert:
# Generate self-signed certificate for development # Generate self-signed certificate for development
await setup_self_signed_cert( await setup_self_signed_cert(
@ -194,18 +199,25 @@ class OPCUAServer:
"Lazio", "Lazio",
"calejo-control.com" "calejo-control.com"
) )
# Set security policies for secure connections
self.server.set_security_policy([
SecurityPolicyBasic256Sha256,
"http://opcfoundation.org/UA/SecurityPolicy#None"
])
# Configure certificate validation
validator = CertificateValidator(CertificateValidatorOptions())
self.server.set_certificate_validator(validator)
else: else:
# Certificate generation not available, use basic security # Certificate generation not available, use only None security policy
logger.warning("certificate_generation_not_available") logger.warning("certificate_generation_not_available")
self.server.set_security_policy([ self.server.set_security_policy([
"http://opcfoundation.org/UA/SecurityPolicy#None" "http://opcfoundation.org/UA/SecurityPolicy#None"
]) ])
return return
# Configure certificate validation
validator = CertificateValidator(CertificateValidatorOptions())
self.server.set_certificate_validator(validator)
logger.info("opcua_security_configured") logger.info("opcua_security_configured")
except Exception as e: except Exception as e: