#!/usr/bin/env python3
"""
Mock SCADA Server for Testing
Simulates a real SCADA system with OPC UA and Modbus interfaces
"""

import asyncio
import logging
import random
import threading
import time
from datetime import datetime
from typing import Dict, Any, Optional

# OPC UA imports
try:
    from asyncua import Server, Node
    OPCUA_AVAILABLE = True
except ImportError:
    OPCUA_AVAILABLE = False
    print("Warning: asyncua not available. Install with: pip install asyncua")

# Modbus imports
try:
    from pymodbus.server import StartTcpServer
    from pymodbus.datastore import ModbusSequentialDataBlock
    from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
    MODBUS_AVAILABLE = True
except ImportError:
    MODBUS_AVAILABLE = False
    print("Warning: pymodbus not available. Install with: pip install pymodbus")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MockSCADAServer:
    """Mock SCADA server that simulates industrial control systems.

    The server keeps a dictionary of simulated process values, perturbs them
    once per update interval, mirrors them to OPC UA variable nodes when the
    OPC UA server was started, and keeps a bounded history of selected values.
    """

    # Maximum number of samples retained per historical series.
    HISTORY_LIMIT = 1000
    # Seconds between two simulation steps.
    UPDATE_INTERVAL = 1
    # Seconds to wait after a failed simulation step before retrying.
    ERROR_BACKOFF = 5

    # (key, max random step per update, lower clamp, upper clamp).
    # The order matters: it fixes the order in which random numbers are drawn.
    _DRIFT = (
        ('temperature', 0.5, 20.0, 80.0),
        ('pressure', 0.1, 95.0, 110.0),
        ('flow_rate', 2.0, 0.0, 200.0),
        ('level', 1.0, 0.0, 100.0),
    )

    # Process values that are appended to ``historical_data`` on every step.
    _TRENDED = ('temperature', 'pressure', 'flow_rate')

    def __init__(self):
        self.opcua_server = None
        # Variable name -> OPC UA node; stays empty unless OPC UA start-up succeeds.
        self.opcua_nodes: Dict[str, Any] = {}
        self.modbus_server = None
        # Background task running update_process_data(); None while stopped.
        self.update_task: Optional[asyncio.Task] = None
        self.running = False

        # Simulated process data
        self.process_data = {
            'temperature': 25.0,
            'pressure': 101.3,
            'flow_rate': 100.0,
            'level': 50.0,
            'valve_position': 75.0,
            'pump_status': True,
            'alarm_status': False,
            'setpoint': 100.0
        }

        # Historical data for trends
        self.historical_data = {
            'temperature': [],
            'pressure': [],
            'flow_rate': []
        }

    async def start_opcua_server(self, endpoint: str = "opc.tcp://0.0.0.0:4840"):
        """Start the OPC UA server and expose every process value as a node.

        Does nothing (apart from a warning) when asyncua is not installed.
        Start-up errors are logged, not raised; after a failure
        ``opcua_server`` is reset to ``None`` so that the update loop and
        ``stop()`` do not touch a half-initialised server.

        Args:
            endpoint: OPC UA endpoint URL to listen on.
        """
        if not OPCUA_AVAILABLE:
            logger.warning("OPC UA not available, skipping OPC UA server")
            return

        try:
            self.opcua_server = Server()
            await self.opcua_server.init()
            self.opcua_server.set_endpoint(endpoint)
            self.opcua_server.set_server_name("Mock SCADA Server")

            # Setup namespace
            uri = "http://mock-scada.org"
            idx = await self.opcua_server.register_namespace(uri)

            # Create object node
            objects = self.opcua_server.get_objects_node()
            scada_object = await objects.add_object(idx, "SCADA_System")

            # Add process variables
            nodes: Dict[str, Any] = {}
            for name, value in self.process_data.items():
                # bool must be tested first: bool is a subclass of int.
                if isinstance(value, bool):
                    node = await scada_object.add_variable(idx, name, value)
                elif isinstance(value, (int, float)):
                    node = await scada_object.add_variable(idx, name, float(value))
                else:
                    continue
                await node.set_writable()
                nodes[name] = node
            self.opcua_nodes = nodes

            await self.opcua_server.start()
            logger.info("Mock OPC UA server started at %s", endpoint)

        except Exception as e:
            logger.error("Failed to start OPC UA server: %s", e)
            # Leave no partially configured server behind.
            self.opcua_server = None
            self.opcua_nodes = {}

    def start_modbus_server(self, port: int = 502):
        """Start the Modbus TCP server in a daemon thread.

        Does nothing (apart from a warning) when pymodbus is not installed.
        The "started" message is logged as soon as the thread is launched;
        errors raised later inside the thread are logged from the thread.

        Args:
            port: TCP port to listen on.
        """
        if not MODBUS_AVAILABLE:
            logger.warning("Modbus not available, skipping Modbus server")
            return

        try:
            # Create data blocks
            store = ModbusSlaveContext(
                di=ModbusSequentialDataBlock(0, [0]*100),  # Discrete Inputs
                co=ModbusSequentialDataBlock(0, [0]*100),  # Coils
                hr=ModbusSequentialDataBlock(0, [0]*100),  # Holding Registers
                ir=ModbusSequentialDataBlock(0, [0]*100)   # Input Registers
            )
            context = ModbusServerContext(slaves=store, single=True)

            def run_modbus_server():
                # Exceptions in a thread never reach the enclosing try block,
                # so they have to be reported here.
                try:
                    StartTcpServer(context=context, address=("0.0.0.0", port))
                except Exception:
                    logger.exception("Modbus server on port %s terminated", port)

            # Start server in background thread
            modbus_thread = threading.Thread(target=run_modbus_server, daemon=True)
            modbus_thread.start()

            self.modbus_server = modbus_thread
            logger.info("Mock Modbus server started on port %s", port)

        except Exception as e:
            logger.error("Failed to start Modbus server: %s", e)

    def _simulate_step(self):
        """Advance the simulated process values by one step."""
        data = self.process_data

        # Simulate realistic process variations
        for key, step, low, high in self._DRIFT:
            drifted = data[key] + random.uniform(-step, step)
            data[key] = max(low, min(high, drifted))

        # Simulate valve and pump behavior. Float bounds keep the value a
        # float so it keeps matching the type of its OPC UA node.
        if data['flow_rate'] > 150:
            data['valve_position'] = max(0.0, data['valve_position'] - 1)
        elif data['flow_rate'] < 50:
            data['valve_position'] = min(100.0, data['valve_position'] + 1)

        # Simulate alarms
        data['alarm_status'] = (
            data['temperature'] > 75.0 or
            data['pressure'] > 108.0 or
            data['level'] > 95.0
        )

    async def _publish_opcua(self):
        """Write the current process values to the OPC UA nodes, if any."""
        if self.opcua_server is None or not self.opcua_nodes:
            return
        for name, node in self.opcua_nodes.items():
            value = self.process_data[name]
            # Non-bool nodes were created from float values.
            if not isinstance(value, bool):
                value = float(value)
            await node.write_value(value)

    def _record_history(self):
        """Append the trended values to the history and trim each series."""
        timestamp = datetime.now()
        for key in self._TRENDED:
            self.historical_data[key].append({
                'timestamp': timestamp,
                'value': self.process_data[key]
            })

        # Keep only the most recent HISTORY_LIMIT points
        limit = self.HISTORY_LIMIT
        for series in self.historical_data.values():
            if len(series) > limit:
                del series[:-limit]

    async def update_process_data(self):
        """Run the simulation loop until ``running`` becomes false.

        Each cycle simulates one step, publishes to OPC UA, records history
        and then sleeps for UPDATE_INTERVAL seconds. An exception in a cycle
        is logged and followed by an ERROR_BACKOFF second pause.
        """
        while self.running:
            try:
                self._simulate_step()
                await self._publish_opcua()
                self._record_history()
                await asyncio.sleep(self.UPDATE_INTERVAL)

            except Exception as e:
                logger.error("Error updating process data: %s", e)
                await asyncio.sleep(self.ERROR_BACKOFF)

    def get_status(self) -> Dict[str, Any]:
        """Get server status.

        Returns:
            A dictionary with the running flag, the availability of both
            protocol servers, a copy of the current process values and the
            total number of stored historical samples.
        """
        return {
            'running': self.running,
            'opcua_available': OPCUA_AVAILABLE and self.opcua_server is not None,
            'modbus_available': MODBUS_AVAILABLE and self.modbus_server is not None,
            # A copy, so callers cannot mutate the live simulation state.
            'process_data': dict(self.process_data),
            'data_points': sum(len(data) for data in self.historical_data.values())
        }

    async def start(self):
        """Start the mock SCADA server; a no-op when it is already running."""
        if self.running:
            return

        self.running = True

        # Start servers
        await self.start_opcua_server()
        self.start_modbus_server()

        # Start data update loop
        self.update_task = asyncio.create_task(self.update_process_data())

        logger.info("Mock SCADA server started")

    async def stop(self):
        """Stop the update loop and the OPC UA server.

        Safe to call when the server was never started.
        """
        self.running = False

        task, self.update_task = self.update_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        if self.opcua_server is not None:
            await self.opcua_server.stop()

        logger.info("Mock SCADA server stopped")


async def main():
    """Main function to run the mock SCADA server until it is interrupted."""
    server = MockSCADAServer()

    try:
        await server.start()

        # Keep server running
        while True:
            await asyncio.sleep(1)

    finally:
        # Ctrl-C reaches this coroutine as a cancellation rather than as
        # KeyboardInterrupt, so the shutdown has to live in ``finally``.
        print("\nShutting down mock SCADA server...")
        await server.stop()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass