""" OPC UA Server for Calejo Control Adapter. Provides OPC UA interface for SCADA systems to access setpoints and status. """ import asyncio from typing import Dict, Optional, Tuple, Any from datetime import datetime import structlog from asyncua import Server, Node from asyncua.common.methods import uamethod from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256 from asyncua.crypto.validator import CertificateValidator, CertificateValidatorOptions # Try to import certificate generation functions # These might not be available in all asyncua versions try: from asyncua.crypto.cert_gen import setup_self_signed_cert HAS_CERT_GEN = True except ImportError: HAS_CERT_GEN = False setup_self_signed_cert = None from src.core.setpoint_manager import SetpointManager from src.core.security import SecurityManager, UserRole from src.core.compliance_audit import ComplianceAuditLogger, AuditEventType, AuditSeverity logger = structlog.get_logger() class OPCUAServer: """OPC UA Server for Calejo Control Adapter.""" def __init__( self, setpoint_manager: SetpointManager, security_manager: SecurityManager, audit_logger: ComplianceAuditLogger, endpoint: str = "opc.tcp://0.0.0.0:4840", server_name: str = "Calejo Control OPC UA Server", enable_security: bool = True, certificate_path: Optional[str] = None, private_key_path: Optional[str] = None ): self.setpoint_manager = setpoint_manager self.security_manager = security_manager self.audit_logger = audit_logger self.endpoint = endpoint self.server_name = server_name self.enable_security = enable_security self.certificate_path = certificate_path self.private_key_path = private_key_path self.server = None self.namespace_idx = None # Security tracking self.connected_clients: Dict[str, Dict] = {} # client_id -> client_info # Node references self.objects_node = None self.station_nodes = {} self.pump_nodes = {} async def start(self): """Start the OPC UA server.""" try: # Create server self.server = Server() await self.server.init() # Configure server self.server.set_endpoint(self.endpoint) self.server.set_server_name(self.server_name) # Configure security if enabled if self.enable_security: await self._configure_security() else: # No security (for development only) self.server.set_security_policy([ "http://opcfoundation.org/UA/SecurityPolicy#None" ]) # Setup namespace uri = "http://calejo-control.com/OPCUA/" self.namespace_idx = await self.server.register_namespace(uri) # Create object structure await self._create_object_structure() # Start server await self.server.start() # Log security configuration security_mode = "secure" if self.enable_security else "insecure" logger.info( "opcua_server_started", endpoint=self.endpoint, namespace_idx=self.namespace_idx, security_mode=security_mode ) # Log security event self.audit_logger.log_security_event( event_type=AuditEventType.SYSTEM_START, severity=AuditSeverity.LOW, event_data={ "protocol": "OPC_UA", "endpoint": self.endpoint, "security_enabled": self.enable_security, "server_name": self.server_name } ) # Start background task to update setpoints asyncio.create_task(self._update_setpoints_loop()) except Exception as e: logger.error("failed_to_start_opcua_server", error=str(e)) raise async def _configure_security(self): """Configure OPC UA security with certificates.""" try: # Set security policies self.server.set_security_policy([ SecurityPolicyBasic256Sha256, "http://opcfoundation.org/UA/SecurityPolicy#None" ]) # Load or generate certificates if self.certificate_path and self.private_key_path: # Load existing certificates await self.server.load_certificate(self.certificate_path) await self.server.load_private_key(self.private_key_path) elif HAS_CERT_GEN and setup_self_signed_cert: # Generate self-signed certificate for development await setup_self_signed_cert( self.server, "Calejo Control OPC UA Server", "calejo-control.com", "IT", "Rome", "Lazio", "calejo-control.com" ) else: # Certificate generation not available, use basic security logger.warning("certificate_generation_not_available") self.server.set_security_policy([ "http://opcfoundation.org/UA/SecurityPolicy#None" ]) return # Configure certificate validation validator = CertificateValidator(CertificateValidatorOptions()) self.server.set_certificate_validator(validator) logger.info("opcua_security_configured") except Exception as e: logger.error("failed_to_configure_opcua_security", error=str(e)) raise async def _on_client_connect(self, session, endpoint): """Handle client connection with security logging.""" client_id = str(session.session_id) client_info = { 'session_id': session.session_id, 'endpoint': endpoint, 'connected_at': datetime.now(), 'user_identity': session.user, 'security_policy': session.security_policy, 'client_certificate': getattr(session, 'peer_certificate', None) } self.connected_clients[client_id] = client_info # Log connection event self.audit_logger.log_security_event( event_type=AuditEventType.USER_LOGIN, severity=AuditSeverity.LOW, event_data={ "protocol": "OPC_UA", "client_id": client_id, "endpoint": endpoint, "user_identity": str(session.user) if session.user else "anonymous", "security_policy": str(session.security_policy) } ) logger.info( "opcua_client_connected", client_id=client_id, endpoint=endpoint, user_identity=str(session.user) if session.user else "anonymous" ) async def _on_client_disconnect(self, session): """Handle client disconnection with security logging.""" client_id = str(session.session_id) client_info = self.connected_clients.pop(client_id, None) if client_info: # Log disconnection event self.audit_logger.log_security_event( event_type=AuditEventType.USER_LOGOUT, severity=AuditSeverity.LOW, event_data={ "protocol": "OPC_UA", "client_id": client_id, "endpoint": client_info['endpoint'], "duration_seconds": (datetime.now() - client_info['connected_at']).total_seconds() } ) logger.info( "opcua_client_disconnected", client_id=client_id, duration_seconds=(datetime.now() - client_info['connected_at']).total_seconds() ) async def stop(self): """Stop the OPC UA server.""" if self.server: await self.server.stop() # Log security event self.audit_logger.log_security_event( event_type=AuditEventType.SYSTEM_STOP, severity=AuditSeverity.LOW, event_data={ "protocol": "OPC_UA", "endpoint": self.endpoint, "connected_clients": len(self.connected_clients) } ) logger.info("opcua_server_stopped", connected_clients=len(self.connected_clients)) def _get_station_id_from_node(self, node: Node) -> Optional[str]: """Extract station ID from OPC UA node path.""" try: # Node path format: Objects/CalejoControl/Station_/... node_path = str(node) if "Station_" in node_path: parts = node_path.split("/") for part in parts: if part.startswith("Station_"): return part.replace("Station_", "") except Exception: pass return None def _check_node_access(self, session, node: Node, operation: str) -> bool: """Check if client has permission to access node.""" try: # Extract station ID from node station_id = self._get_station_id_from_node(node) # Get user identity from session user_identity = session.user # Map OPC UA operation to permission permission_map = { "read": "read_pump_status", "write": "write_pump_setpoint", "browse": "read_pump_status" } required_permission = permission_map.get(operation, "read_pump_status") # Log access attempt self.audit_logger.log_security_event( event_type=AuditEventType.ACCESS_DENIED, severity=AuditSeverity.MEDIUM, event_data={ "protocol": "OPC_UA", "client_id": str(session.session_id), "node": str(node), "operation": operation, "station_id": station_id, "user_identity": str(user_identity) if user_identity else "anonymous", "required_permission": required_permission } ) # For now, allow all access (will be enhanced with proper user mapping) # In production, this would validate against user permissions return True except Exception as e: logger.error("failed_to_check_node_access", error=str(e)) return False async def _create_object_structure(self): """Create the OPC UA object structure.""" # Get objects node self.objects_node = self.server.get_objects_node() # Create Calejo Control folder calejo_folder = await self.objects_node.add_folder( self.namespace_idx, "CalejoControl" ) # Create stations and pumps structure stations = self.setpoint_manager.discovery.get_stations() for station_id, station in stations.items(): # Create station folder station_folder = await calejo_folder.add_folder( self.namespace_idx, f"Station_{station_id}" ) # Add station info variables station_name_var = await station_folder.add_variable( self.namespace_idx, "StationName", station.get('station_name', station_id) ) await station_name_var.set_writable(False) # Create pumps for this station pumps = self.setpoint_manager.discovery.get_pumps(station_id) for pump in pumps: pump_id = pump['pump_id'] # Create pump object pump_obj = await station_folder.add_object( self.namespace_idx, f"Pump_{pump_id}" ) # Add pump variables pump_name_var = await pump_obj.add_variable( self.namespace_idx, "PumpName", pump.get('pump_name', pump_id) ) await pump_name_var.set_writable(False) control_type_var = await pump_obj.add_variable( self.namespace_idx, "ControlType", pump.get('control_type', 'UNKNOWN') ) await control_type_var.set_writable(False) # Add setpoint variable (writable for SCADA override) setpoint_var = await pump_obj.add_variable( self.namespace_idx, "Setpoint_Hz", 0.0 ) await setpoint_var.set_writable(True) # Add safety status variable safety_status_var = await pump_obj.add_variable( self.namespace_idx, "SafetyStatus", "normal" ) await safety_status_var.set_writable(False) # Add timestamp variable timestamp_var = await pump_obj.add_variable( self.namespace_idx, "LastUpdate", datetime.now().isoformat() ) await timestamp_var.set_writable(False) # Store node references self.pump_nodes[(station_id, pump_id)] = { 'object': pump_obj, 'setpoint': setpoint_var, 'safety_status': safety_status_var, 'timestamp': timestamp_var } self.station_nodes[station_id] = station_folder # Add server status variables server_status_folder = await calejo_folder.add_folder( self.namespace_idx, "ServerStatus" ) server_status_var = await server_status_folder.add_variable( self.namespace_idx, "Status", "running" ) await server_status_var.set_writable(False) uptime_var = await server_status_folder.add_variable( self.namespace_idx, "Uptime", 0 ) await uptime_var.set_writable(False) total_pumps_var = await server_status_folder.add_variable( self.namespace_idx, "TotalPumps", len(self.pump_nodes) ) await total_pumps_var.set_writable(False) # Add security status variables security_status_folder = await calejo_folder.add_folder( self.namespace_idx, "SecurityStatus" ) security_enabled_var = await security_status_folder.add_variable( self.namespace_idx, "SecurityEnabled", self.enable_security ) await security_enabled_var.set_writable(False) connected_clients_var = await security_status_folder.add_variable( self.namespace_idx, "ConnectedClients", len(self.connected_clients) ) await connected_clients_var.set_writable(False) last_security_event_var = await security_status_folder.add_variable( self.namespace_idx, "LastSecurityEvent", "server_started" ) await last_security_event_var.set_writable(False) async def _update_setpoints_loop(self): """Background task to update setpoints periodically.""" while True: try: await self._update_setpoints() await asyncio.sleep(5) # Update every 5 seconds except Exception as e: logger.error("failed_to_update_setpoints", error=str(e)) await asyncio.sleep(10) # Wait longer on error async def _update_setpoints(self): """Update all setpoint values in OPC UA server.""" for (station_id, pump_id), nodes in self.pump_nodes.items(): try: # Get current setpoint setpoint = self.setpoint_manager.get_current_setpoint(station_id, pump_id) if setpoint is not None: # Update setpoint variable await nodes['setpoint'].write_value(float(setpoint)) # Update safety status safety_status = "normal" if self.setpoint_manager.emergency_stop_manager.is_emergency_stop_active(station_id, pump_id): safety_status = "emergency_stop" elif self.setpoint_manager.watchdog.is_failsafe_active(station_id, pump_id): safety_status = "failsafe" await nodes['safety_status'].write_value(safety_status) # Update timestamp await nodes['timestamp'].write_value(datetime.now().isoformat()) except Exception as e: logger.error( "failed_to_update_pump_setpoint", station_id=station_id, pump_id=pump_id, error=str(e) ) def get_security_status(self) -> Dict[str, Any]: """Get current security status of OPC UA server.""" return { "security_enabled": self.enable_security, "connected_clients": len(self.connected_clients), "certificate_configured": bool(self.certificate_path and self.private_key_path), "client_details": [ { "client_id": client_id, "connected_at": info['connected_at'].isoformat(), "endpoint": info['endpoint'], "user_identity": str(info['user_identity']) if info['user_identity'] else "anonymous", "security_policy": str(info['security_policy']) } for client_id, info in self.connected_clients.items() ] }