CalejoControl/src/protocols/opcua_server.py

708 lines
28 KiB
Python

"""
OPC UA Server for Calejo Control Adapter.
Provides OPC UA interface for SCADA systems to access setpoints and status.
Enhanced with performance optimizations for Phase 5.
"""
import asyncio
from typing import Dict, Optional, Tuple, Any
from datetime import datetime, timedelta
import structlog
from asyncua import Server, Node
from asyncua.common.methods import uamethod
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
from asyncua.crypto.validator import CertificateValidator, CertificateValidatorOptions
# Try to import certificate generation functions
# These might not be available in all asyncua versions
try:
from asyncua.crypto.cert_gen import setup_self_signed_cert
HAS_CERT_GEN = True
except ImportError:
HAS_CERT_GEN = False
setup_self_signed_cert = None
from src.core.setpoint_manager import SetpointManager
from src.core.security import SecurityManager, UserRole
from src.core.compliance_audit import ComplianceAuditLogger, AuditEventType, AuditSeverity
logger = structlog.get_logger()
class NodeCache:
    """Cache for frequently accessed OPC UA nodes to improve performance.

    Entries expire ``ttl_seconds`` after they were stored. When the cache
    already holds ``max_size`` entries, storing a new key evicts the entry
    that was stored (or last re-stored) earliest. ``_hits`` / ``_misses``
    count lookups so callers can report cache effectiveness.
    """

    def __init__(self, max_size: int = 1000, ttl_seconds: int = 300):
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        # node_id -> (node, time the entry was stored). Dict insertion order
        # doubles as the eviction order (first key == oldest entry).
        self._cache: Dict[str, Tuple["Node", datetime]] = {}
        # Lookup statistics (read via getattr by performance reporting).
        self._hits = 0
        self._misses = 0

    def get(self, node_id: str) -> Optional["Node"]:
        """Return the cached node for ``node_id``, or None if absent or expired.

        Expired entries are removed as a side effect of the lookup.
        """
        entry = self._cache.get(node_id)
        if entry is not None:
            node, timestamp = entry
            if datetime.now() - timestamp < timedelta(seconds=self.ttl_seconds):
                self._hits += 1
                return node
            # Expired: drop it so it no longer counts towards max_size.
            del self._cache[node_id]
        self._misses += 1
        return None

    def set(self, node_id: str, node: "Node") -> None:
        """Store ``node`` under ``node_id``, evicting the oldest entry if full.

        Re-storing an existing key replaces it and refreshes its timestamp
        without evicting any other entry. A cache with ``max_size <= 0``
        stores nothing.
        """
        if self.max_size <= 0:
            return
        # Remove any previous entry first so that an update never forces an
        # unrelated eviction, and the key moves to the "newest" position.
        self._cache.pop(node_id, None)
        while len(self._cache) >= self.max_size:
            oldest_key = next(iter(self._cache))
            del self._cache[oldest_key]
        self._cache[node_id] = (node, datetime.now())

    def clear(self) -> None:
        """Remove every cached entry (lookup statistics are kept)."""
        self._cache.clear()
class OPCUAServer:
    """OPC UA Server for Calejo Control Adapter.

    Exposes, for every station and pump reported by the setpoint manager's
    discovery, a node tree with the current setpoint, simulated process
    values (speed, power, flow), a safety status and a last-update timestamp,
    plus ServerStatus and SecurityStatus folders. Two background tasks keep
    the values fresh: ``_update_setpoints_loop`` (every 5 s) and
    ``_simulate_pump_behavior`` (every 2 s). Both are cancelled by ``stop()``.
    """

    def __init__(
        self,
        setpoint_manager: SetpointManager,
        security_manager: SecurityManager,
        audit_logger: ComplianceAuditLogger,
        endpoint: str = "opc.tcp://0.0.0.0:4840",
        server_name: str = "Calejo Control OPC UA Server",
        enable_security: bool = True,
        certificate_path: Optional[str] = None,
        private_key_path: Optional[str] = None,
        enable_caching: bool = True,
        cache_ttl_seconds: int = 300
    ):
        """Store configuration; the asyncua server itself is created in start().

        Args:
            setpoint_manager: Source of stations, pumps, setpoints and safety state.
            security_manager: Stored on the instance; not used by this class's methods.
            audit_logger: Receives security/compliance audit events.
            endpoint: OPC UA endpoint URL the server is configured with.
            server_name: Server name passed to asyncua.
            enable_security: Configure certificate-based security in start().
            certificate_path: Existing certificate file (used together with private_key_path).
            private_key_path: Existing private key file (used together with certificate_path).
            enable_caching: Create a NodeCache used by _get_or_create_node.
            cache_ttl_seconds: TTL for NodeCache entries.
        """
        self.setpoint_manager = setpoint_manager
        self.security_manager = security_manager
        self.audit_logger = audit_logger
        self.endpoint = endpoint
        self.server_name = server_name
        self.enable_security = enable_security
        self.certificate_path = certificate_path
        self.private_key_path = private_key_path
        self.enable_caching = enable_caching
        self.cache_ttl_seconds = cache_ttl_seconds
        self.server = None
        self.namespace_idx = None
        # Security tracking
        self.connected_clients: Dict[str, Dict] = {}  # client_id -> client_info
        # Node references
        self.objects_node = None
        self.station_nodes = {}  # station_id -> station folder node
        # (station_id, pump_id) -> variables driven by the simulation task
        self.pump_variables = {}
        # (station_id, pump_id) -> nodes refreshed by the setpoint update loop
        self.pump_nodes = {}
        # Background tasks. The references are kept so the tasks are not
        # garbage collected while running, and so that stop() can cancel them.
        self.update_task: Optional[asyncio.Task] = None
        self.simulation_task: Optional[asyncio.Task] = None
        # Performance optimizations
        self.node_cache = NodeCache(ttl_seconds=cache_ttl_seconds) if enable_caching else None
        self._last_setpoint_update = datetime.now()
        self._setpoint_cache: Dict[Tuple[str, str], float] = {}  # (station_id, pump_id) -> setpoint

    async def start(self):
        """Create, configure and start the OPC UA server and its background tasks.

        Raises:
            Exception: Any setup error is logged and re-raised.
        """
        try:
            # Create server
            self.server = Server()
            await self.server.init()
            # Configure server
            self.server.set_endpoint(self.endpoint)
            self.server.set_server_name(self.server_name)
            # Configure security if enabled
            if self.enable_security:
                await self._configure_security()
            else:
                # No security (for development only)
                self.server.set_security_policy([
                    "http://opcfoundation.org/UA/SecurityPolicy#None"
                ])
            # Setup namespace
            uri = "http://calejo-control.com/OPCUA/"
            self.namespace_idx = await self.server.register_namespace(uri)
            # Create object structure
            await self._create_object_structure()
            # Start server
            await self.server.start()
            # Log security configuration
            security_mode = "secure" if self.enable_security else "insecure"
            logger.info(
                "opcua_server_started",
                endpoint=self.endpoint,
                namespace_idx=self.namespace_idx,
                security_mode=security_mode
            )
            # Log security event
            self.audit_logger.log_security_event(
                event_type=AuditEventType.SYSTEM_START,
                severity=AuditSeverity.LOW,
                event_data={
                    "protocol": "OPC_UA",
                    "endpoint": self.endpoint,
                    "security_enabled": self.enable_security,
                    "server_name": self.server_name
                }
            )
            # Keep references to both tasks: the event loop only holds weak
            # references, and stop() needs them to cancel the loops.
            self.update_task = asyncio.create_task(self._update_setpoints_loop())
            # Simulation task for mock industrial data
            self.simulation_task = asyncio.create_task(self._simulate_pump_behavior())
        except Exception as e:
            logger.error("failed_to_start_opcua_server", error=str(e))
            raise

    async def _configure_security(self):
        """Configure security policies, certificates and certificate validation.

        The certificate source is chosen as follows:
            - Both certificate_path and private_key_path are set: load those files.
            - Otherwise, if cert generation is importable: generate a self-signed certificate.
            - Otherwise: fall back to the None security policy and skip validator setup.

        Raises:
            Exception: Any configuration error is logged and re-raised.
        """
        try:
            # Set security policies.
            # NOTE(review): the None policy is offered alongside Basic256Sha256,
            # so clients can still connect unencrypted while security is
            # "enabled". asyncua documents set_security_policy as taking
            # ua.SecurityPolicyType values — confirm the class/URI string
            # passed here is accepted by the installed version.
            self.server.set_security_policy([
                SecurityPolicyBasic256Sha256,
                "http://opcfoundation.org/UA/SecurityPolicy#None"
            ])
            # Load or generate certificates
            if self.certificate_path and self.private_key_path:
                # Load existing certificates
                await self.server.load_certificate(self.certificate_path)
                await self.server.load_private_key(self.private_key_path)
            elif HAS_CERT_GEN and setup_self_signed_cert:
                # Generate self-signed certificate for development.
                # NOTE(review): confirm this function name and argument list
                # against the installed asyncua.crypto.cert_gen (recent
                # releases expose setup_self_signed_certificate with a
                # different signature).
                await setup_self_signed_cert(
                    self.server,
                    "Calejo Control OPC UA Server",
                    "calejo-control.com",
                    "IT",
                    "Rome",
                    "Lazio",
                    "calejo-control.com"
                )
            else:
                # Certificate generation not available, use basic security.
                # NOTE(review): enable_security stays True here, so the startup
                # log and get_security_status() still report security enabled.
                logger.warning("certificate_generation_not_available")
                self.server.set_security_policy([
                    "http://opcfoundation.org/UA/SecurityPolicy#None"
                ])
                return
            # Configure certificate validation
            validator = CertificateValidator(CertificateValidatorOptions())
            self.server.set_certificate_validator(validator)
            logger.info("opcua_security_configured")
        except Exception as e:
            logger.error("failed_to_configure_opcua_security", error=str(e))
            raise

    async def _on_client_connect(self, session, endpoint):
        """Record a connecting client and emit USER_LOGIN audit/log events.

        NOTE(review): nothing in this class registers this handler with the
        asyncua server — confirm it is wired up elsewhere.
        """
        client_id = str(session.session_id)
        user_identity = str(session.user) if session.user else "anonymous"
        client_info = {
            'session_id': session.session_id,
            'endpoint': endpoint,
            'connected_at': datetime.now(),
            'user_identity': session.user,
            'security_policy': session.security_policy,
            'client_certificate': getattr(session, 'peer_certificate', None)
        }
        self.connected_clients[client_id] = client_info
        # Log connection event
        self.audit_logger.log_security_event(
            event_type=AuditEventType.USER_LOGIN,
            severity=AuditSeverity.LOW,
            event_data={
                "protocol": "OPC_UA",
                "client_id": client_id,
                "endpoint": endpoint,
                "user_identity": user_identity,
                "security_policy": str(session.security_policy)
            }
        )
        logger.info(
            "opcua_client_connected",
            client_id=client_id,
            endpoint=endpoint,
            user_identity=user_identity
        )

    async def _on_client_disconnect(self, session):
        """Forget a client and emit USER_LOGOUT audit/log events with the session duration.

        Unknown sessions are ignored. NOTE(review): like _on_client_connect,
        this handler is not registered anywhere in this class.
        """
        client_id = str(session.session_id)
        client_info = self.connected_clients.pop(client_id, None)
        if not client_info:
            return
        # Compute once so the audit record and the log line agree.
        duration_seconds = (datetime.now() - client_info['connected_at']).total_seconds()
        # Log disconnection event
        self.audit_logger.log_security_event(
            event_type=AuditEventType.USER_LOGOUT,
            severity=AuditSeverity.LOW,
            event_data={
                "protocol": "OPC_UA",
                "client_id": client_id,
                "endpoint": client_info['endpoint'],
                "duration_seconds": duration_seconds
            }
        )
        logger.info(
            "opcua_client_disconnected",
            client_id=client_id,
            duration_seconds=duration_seconds
        )

    async def _cancel_background_tasks(self):
        """Cancel the update/simulation tasks (if any) and wait for them to finish."""
        tasks = [task for task in (self.update_task, self.simulation_task) if task is not None]
        for task in tasks:
            task.cancel()
        if tasks:
            # return_exceptions=True collects the tasks' CancelledError (or any
            # earlier failure) instead of raising it out of stop().
            await asyncio.gather(*tasks, return_exceptions=True)
        self.update_task = None
        self.simulation_task = None

    async def stop(self):
        """Stop the background tasks and the OPC UA server, then audit-log the shutdown."""
        # Stop the loops first so they no longer write to nodes of a server
        # that is shutting down.
        await self._cancel_background_tasks()
        if self.server:
            await self.server.stop()
            # Log security event
            self.audit_logger.log_security_event(
                event_type=AuditEventType.SYSTEM_STOP,
                severity=AuditSeverity.LOW,
                event_data={
                    "protocol": "OPC_UA",
                    "endpoint": self.endpoint,
                    "connected_clients": len(self.connected_clients)
                }
            )
            logger.info("opcua_server_stopped", connected_clients=len(self.connected_clients))

    def _get_station_id_from_node(self, node: Node) -> Optional[str]:
        """Return the ``<station_id>`` from a ``Station_<station_id>`` segment of ``str(node)``.

        Expected path format: Objects/CalejoControl/Station_<station_id>/...
        Returns None when no such segment is found.
        NOTE(review): for an asyncua Node, str(node) is presumably its NodeId
        (e.g. "ns=2;i=5") rather than a browse path — confirm this can match.
        """
        prefix = "Station_"
        try:
            for part in str(node).split("/"):
                if part.startswith(prefix):
                    # Strip only the leading prefix.
                    return part[len(prefix):]
        except Exception:
            # Best-effort lookup: a node that cannot be rendered has no station.
            pass
        return None

    def _check_node_access(self, session, node: Node, operation: str) -> bool:
        """Audit a node access attempt and decide whether it is allowed.

        Currently every access is allowed (returns True). Returns False only
        if building the audit record fails.
        """
        try:
            # Extract station ID from node
            station_id = self._get_station_id_from_node(node)
            # Get user identity from session
            user_identity = session.user
            # Map OPC UA operation to permission; unknown operations fall back to read.
            permission_map = {
                "read": "read_pump_status",
                "write": "write_pump_setpoint",
                "browse": "read_pump_status"
            }
            required_permission = permission_map.get(operation, "read_pump_status")
            # Log access attempt.
            # NOTE(review): this records ACCESS_DENIED even though access is
            # granted below — confirm whether another event type should be used.
            self.audit_logger.log_security_event(
                event_type=AuditEventType.ACCESS_DENIED,
                severity=AuditSeverity.MEDIUM,
                event_data={
                    "protocol": "OPC_UA",
                    "client_id": str(session.session_id),
                    "node": str(node),
                    "operation": operation,
                    "station_id": station_id,
                    "user_identity": str(user_identity) if user_identity else "anonymous",
                    "required_permission": required_permission
                }
            )
            # For now, allow all access (will be enhanced with proper user mapping)
            # In production, this would validate against user permissions
            return True
        except Exception as e:
            logger.error("failed_to_check_node_access", error=str(e))
            return False

    async def _add_variable(self, parent: Node, name: str, value: Any, writable: bool = False) -> Node:
        """Add variable ``name`` with initial ``value`` under ``parent`` and set its writability."""
        variable = await parent.add_variable(self.namespace_idx, name, value)
        await variable.set_writable(writable)
        return variable

    async def _create_object_structure(self):
        """Build the CalejoControl node tree under the server's Objects node.

        Layout (all nodes in self.namespace_idx):
            CalejoControl/
                Station_<id>/ StationName
                    Pump_<id>/ PumpName, ControlType, Setpoint_Hz (writable),
                               ActualSpeed_Hz, Power_kW, FlowRate_m3h,
                               SafetyStatus, LastUpdate
                ServerStatus/ Status, Uptime, TotalPumps
                SecurityStatus/ SecurityEnabled, ConnectedClients, LastSecurityEvent

        Also populates self.pump_variables, self.pump_nodes and self.station_nodes.
        """
        # Get objects node
        self.objects_node = self.server.get_objects_node()
        # Create Calejo Control folder
        calejo_folder = await self.objects_node.add_folder(
            self.namespace_idx,
            "CalejoControl"
        )
        # Create stations and pumps structure
        stations = self.setpoint_manager.discovery.get_stations()
        for station_id, station in stations.items():
            station_folder = await calejo_folder.add_folder(
                self.namespace_idx,
                f"Station_{station_id}"
            )
            await self._add_variable(
                station_folder, "StationName", station.get('station_name', station_id)
            )
            pumps = self.setpoint_manager.discovery.get_pumps(station_id)
            for pump in pumps:
                pump_id = pump['pump_id']
                pump_obj = await station_folder.add_object(
                    self.namespace_idx,
                    f"Pump_{pump_id}"
                )
                await self._add_variable(pump_obj, "PumpName", pump.get('pump_name', pump_id))
                await self._add_variable(pump_obj, "ControlType", pump.get('control_type', 'UNKNOWN'))
                # Writable for SCADA override.
                # NOTE(review): _update_setpoints rewrites this node every 5 s
                # whenever a setpoint exists, overwriting any SCADA write.
                setpoint_var = await self._add_variable(pump_obj, "Setpoint_Hz", 0.0, writable=True)
                # Read-only process values, driven by the simulation task.
                actual_speed_var = await self._add_variable(pump_obj, "ActualSpeed_Hz", 0.0)
                power_var = await self._add_variable(pump_obj, "Power_kW", 0.0)
                flow_var = await self._add_variable(pump_obj, "FlowRate_m3h", 0.0)
                safety_status_var = await self._add_variable(pump_obj, "SafetyStatus", "normal")
                timestamp_var = await self._add_variable(
                    pump_obj, "LastUpdate", datetime.now().isoformat()
                )
                # Store variable references for simulation
                self.pump_variables[(station_id, pump_id)] = {
                    'setpoint': setpoint_var,
                    'actual_speed': actual_speed_var,
                    'power': power_var,
                    'flow': flow_var,
                    'safety_status': safety_status_var,
                    'timestamp': timestamp_var
                }
                # Store node references
                self.pump_nodes[(station_id, pump_id)] = {
                    'object': pump_obj,
                    'setpoint': setpoint_var,
                    'safety_status': safety_status_var,
                    'timestamp': timestamp_var
                }
            self.station_nodes[station_id] = station_folder
        # Add server status variables
        server_status_folder = await calejo_folder.add_folder(
            self.namespace_idx,
            "ServerStatus"
        )
        await self._add_variable(server_status_folder, "Status", "running")
        await self._add_variable(server_status_folder, "Uptime", 0)
        await self._add_variable(server_status_folder, "TotalPumps", len(self.pump_nodes))
        # Add security status variables
        security_status_folder = await calejo_folder.add_folder(
            self.namespace_idx,
            "SecurityStatus"
        )
        await self._add_variable(security_status_folder, "SecurityEnabled", self.enable_security)
        await self._add_variable(
            security_status_folder, "ConnectedClients", len(self.connected_clients)
        )
        await self._add_variable(security_status_folder, "LastSecurityEvent", "server_started")

    async def _update_setpoints_loop(self):
        """Run _update_setpoints every 5 s, backing off to 10 s after an error.

        Runs until cancelled (stop() cancels it).
        """
        while True:
            try:
                await self._update_setpoints()
                await asyncio.sleep(5)  # Update every 5 seconds
            except Exception as e:
                logger.error("failed_to_update_setpoints", error=str(e))
                await asyncio.sleep(10)  # Wait longer on error

    async def _update_setpoints(self):
        """Refresh the setpoint, safety status and timestamp nodes of every pump.

        An error for one pump is logged and does not stop the others.
        """
        for (station_id, pump_id), nodes in self.pump_nodes.items():
            try:
                # Get current setpoint
                setpoint = self.setpoint_manager.get_current_setpoint(station_id, pump_id)
                if setpoint is not None:
                    await nodes['setpoint'].write_value(float(setpoint))
                # Refresh the safety status even without a setpoint, so that an
                # emergency stop / failsafe is never hidden behind a stale value.
                safety_status = "normal"
                if self.setpoint_manager.emergency_stop_manager.is_emergency_stop_active(station_id, pump_id):
                    safety_status = "emergency_stop"
                elif self.setpoint_manager.watchdog.is_failsafe_active(station_id, pump_id):
                    safety_status = "failsafe"
                await nodes['safety_status'].write_value(safety_status)
                # Update timestamp
                await nodes['timestamp'].write_value(datetime.now().isoformat())
            except Exception as e:
                logger.error(
                    "failed_to_update_pump_setpoint",
                    station_id=station_id,
                    pump_id=pump_id,
                    error=str(e)
                )

    def get_security_status(self) -> Dict[str, Any]:
        """Return security settings and details of the tracked connected clients."""
        return {
            "security_enabled": self.enable_security,
            "connected_clients": len(self.connected_clients),
            "certificate_configured": bool(self.certificate_path and self.private_key_path),
            "client_details": [
                {
                    "client_id": client_id,
                    "connected_at": info['connected_at'].isoformat(),
                    "endpoint": info['endpoint'],
                    "user_identity": str(info['user_identity']) if info['user_identity'] else "anonymous",
                    "security_policy": str(info['security_policy'])
                }
                for client_id, info in self.connected_clients.items()
            ]
        }

    def get_performance_status(self) -> Dict[str, Any]:
        """Return node-cache / setpoint-cache statistics and the client count.

        Hit/miss counts are read with getattr and fall back to 0 if the
        cache does not track them.
        """
        cache_stats = {
            "enabled": self.enable_caching,
            "cache_size": len(self.node_cache._cache) if self.node_cache else 0,
            "cache_hits": getattr(self.node_cache, '_hits', 0) if self.node_cache else 0,
            "cache_misses": getattr(self.node_cache, '_misses', 0) if self.node_cache else 0,
            "setpoint_cache_size": len(self._setpoint_cache)
        }
        return {
            "caching": cache_stats,
            "last_setpoint_update": self._last_setpoint_update.isoformat(),
            "connected_clients": len(self.connected_clients)
        }

    async def _get_or_create_node(self, parent_node: Node, node_name: str, node_type: str = "Object") -> Node:
        """Return a cached child node, or create one under ``parent_node`` and cache it.

        Args:
            parent_node: Node to create the child under.
            node_name: Browse name of the child.
            node_type: "Object", "Variable" (initial value 0.0) or "Folder".

        Raises:
            ValueError: For an unknown ``node_type``.
            Exception: Creation errors are logged and re-raised.

        NOTE(review): after a cache entry expires, this creates the node
        again rather than looking up the existing one — confirm that the
        server rejects or tolerates the duplicate browse name.
        """
        node_id = f"{parent_node.nodeid.to_string()}.{node_name}"
        # Try to get from cache first
        if self.node_cache:
            cached_node = self.node_cache.get(node_id)
            if cached_node:
                return cached_node
        # Node not in cache, create it
        try:
            if node_type == "Object":
                node = await parent_node.add_object(self.namespace_idx, node_name)
            elif node_type == "Variable":
                node = await parent_node.add_variable(self.namespace_idx, node_name, 0.0)
            elif node_type == "Folder":
                node = await parent_node.add_folder(self.namespace_idx, node_name)
            else:
                raise ValueError(f"Unknown node type: {node_type}")
            # Add to cache
            if self.node_cache:
                self.node_cache.set(node_id, node)
            return node
        except Exception as e:
            logger.error("failed_to_create_node", node_name=node_name, error=str(e))
            raise

    async def _update_setpoint_cache(self):
        """Copy the setpoint manager's current setpoints into the local cache.

        Errors are logged, not raised.
        """
        try:
            current_setpoints = self.setpoint_manager.get_current_setpoints()
            self._setpoint_cache = current_setpoints.copy()
            self._last_setpoint_update = datetime.now()
            # Log performance metrics
            logger.debug(
                "setpoint_cache_updated",
                cache_size=len(self._setpoint_cache),
                timestamp=self._last_setpoint_update.isoformat()
            )
        except Exception as e:
            logger.error("failed_to_update_setpoint_cache", error=str(e))

    async def _simulate_pump_behavior(self):
        """Every 2 s, write simulated speed, power, flow and timestamp values for each pump.

        Speed ramps towards the setpoint by 0.1-0.5 Hz per step with +/-0.2 Hz
        noise, clamped to 0-50 Hz. Power is modelled as (speed/50)^3 * 75 kW
        and flow as (speed/50) * 500 m3/h, each with noise and floored at 0.
        Occasionally a "warning" safety status is toggled on or off. Real
        "emergency_stop" / "failsafe" statuses are never overwritten.
        Runs until cancelled (stop() cancels it).
        """
        import random
        # Only statuses produced by this simulation may be replaced by it.
        simulated_statuses = ("normal", "warning")
        while True:
            try:
                # Update all pump variables with simulated data
                for (station_id, pump_id), variables in self.pump_variables.items():
                    try:
                        # Current setpoint is the target speed
                        target_speed = await variables['setpoint'].read_value()
                        # Pumps don't instantly reach setpoint: ramp towards it
                        actual_speed_node = variables['actual_speed']
                        current_actual = await actual_speed_node.read_value()
                        if current_actual < target_speed:
                            new_actual = min(current_actual + random.uniform(0.1, 0.5), target_speed)
                        else:
                            new_actual = max(current_actual - random.uniform(0.1, 0.5), target_speed)
                        # Add some noise
                        new_actual += random.uniform(-0.2, 0.2)
                        new_actual = max(0.0, min(50.0, new_actual))  # Clamp to 0-50 Hz
                        await actual_speed_node.write_value(new_actual)
                        # Power consumption (roughly proportional to speed^3)
                        power_kw = (new_actual / 50.0) ** 3 * 75.0  # 75 kW max power
                        power_kw += random.uniform(-2.0, 2.0)
                        await variables['power'].write_value(max(0.0, power_kw))
                        # Flow rate (roughly proportional to speed)
                        flow_rate = (new_actual / 50.0) * 500.0  # 500 m³/h max flow
                        flow_rate += random.uniform(-10.0, 10.0)
                        await variables['flow'].write_value(max(0.0, flow_rate))
                        # Update timestamp
                        await variables['timestamp'].write_value(datetime.now().isoformat())
                        # Occasionally simulate safety events
                        if random.random() < 0.01:  # 1% chance per update
                            new_status = "warning"
                        elif random.random() < 0.005:  # 0.5% chance to return to normal
                            new_status = "normal"
                        else:
                            new_status = None
                        if new_status is not None:
                            safety_node = variables['safety_status']
                            # Never mask a real emergency stop / failsafe status.
                            if await safety_node.read_value() in simulated_statuses:
                                await safety_node.write_value(new_status)
                    except Exception as e:
                        logger.error("failed_to_simulate_pump", station_id=station_id, pump_id=pump_id, error=str(e))
                # Update every 2 seconds
                await asyncio.sleep(2)
            except Exception as e:
                logger.error("pump_simulation_error", error=str(e))
                await asyncio.sleep(5)