CalejoControl/tests/mocks/mock-scada-server.py

254 lines
9.1 KiB
Python

#!/usr/bin/env python3
"""
Mock SCADA Server for Testing
Simulates a real SCADA system with OPC UA and Modbus interfaces
"""
import asyncio
import logging
import random
import time
from datetime import datetime
from typing import Dict, Any
# OPC UA imports
try:
from asyncua import Server, Node
OPCUA_AVAILABLE = True
except ImportError:
OPCUA_AVAILABLE = False
print("Warning: asyncua not available. Install with: pip install asyncua")
# Modbus imports
try:
from pymodbus.server import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
MODBUS_AVAILABLE = True
except ImportError:
MODBUS_AVAILABLE = False
print("Warning: pymodbus not available. Install with: pip install pymodbus")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MockSCADAServer:
"""Mock SCADA server that simulates industrial control systems"""
def __init__(self):
self.opcua_server = None
self.modbus_server = None
self.running = False
# Simulated process data
self.process_data = {
'temperature': 25.0,
'pressure': 101.3,
'flow_rate': 100.0,
'level': 50.0,
'valve_position': 75.0,
'pump_status': True,
'alarm_status': False,
'setpoint': 100.0
}
# Historical data for trends
self.historical_data = {
'temperature': [],
'pressure': [],
'flow_rate': []
}
async def start_opcua_server(self, endpoint: str = "opc.tcp://0.0.0.0:4840"):
"""Start OPC UA server"""
if not OPCUA_AVAILABLE:
logger.warning("OPC UA not available, skipping OPC UA server")
return
try:
self.opcua_server = Server()
await self.opcua_server.init()
self.opcua_server.set_endpoint(endpoint)
self.opcua_server.set_server_name("Mock SCADA Server")
# Setup namespace
uri = "http://mock-scada.org"
idx = await self.opcua_server.register_namespace(uri)
# Create object node
objects = self.opcua_server.get_objects_node()
scada_object = await objects.add_object(idx, "SCADA_System")
# Add process variables
self.opcua_nodes = {}
for name, value in self.process_data.items():
if isinstance(value, bool):
node = await scada_object.add_variable(idx, name, value)
elif isinstance(value, (int, float)):
node = await scada_object.add_variable(idx, name, float(value))
else:
continue
await node.set_writable()
self.opcua_nodes[name] = node
await self.opcua_server.start()
logger.info(f"Mock OPC UA server started at {endpoint}")
except Exception as e:
logger.error(f"Failed to start OPC UA server: {e}")
def start_modbus_server(self, port: int = 502):
"""Start Modbus TCP server"""
if not MODBUS_AVAILABLE:
logger.warning("Modbus not available, skipping Modbus server")
return
try:
# Create data blocks
store = ModbusSlaveContext(
di=ModbusSequentialDataBlock(0, [0]*100), # Discrete Inputs
co=ModbusSequentialDataBlock(0, [0]*100), # Coils
hr=ModbusSequentialDataBlock(0, [0]*100), # Holding Registers
ir=ModbusSequentialDataBlock(0, [0]*100) # Input Registers
)
context = ModbusServerContext(slaves=store, single=True)
# Start server in background thread
import threading
def run_modbus_server():
StartTcpServer(context=context, address=("0.0.0.0", port))
modbus_thread = threading.Thread(target=run_modbus_server, daemon=True)
modbus_thread.start()
self.modbus_server = modbus_thread
logger.info(f"Mock Modbus server started on port {port}")
except Exception as e:
logger.error(f"Failed to start Modbus server: {e}")
async def update_process_data(self):
"""Update simulated process data"""
while self.running:
try:
# Simulate realistic process variations
self.process_data['temperature'] += random.uniform(-0.5, 0.5)
self.process_data['temperature'] = max(20.0, min(80.0, self.process_data['temperature']))
self.process_data['pressure'] += random.uniform(-0.1, 0.1)
self.process_data['pressure'] = max(95.0, min(110.0, self.process_data['pressure']))
self.process_data['flow_rate'] += random.uniform(-2.0, 2.0)
self.process_data['flow_rate'] = max(0.0, min(200.0, self.process_data['flow_rate']))
self.process_data['level'] += random.uniform(-1.0, 1.0)
self.process_data['level'] = max(0.0, min(100.0, self.process_data['level']))
# Simulate valve and pump behavior
if self.process_data['flow_rate'] > 150:
self.process_data['valve_position'] = max(0, self.process_data['valve_position'] - 1)
elif self.process_data['flow_rate'] < 50:
self.process_data['valve_position'] = min(100, self.process_data['valve_position'] + 1)
# Simulate alarms
self.process_data['alarm_status'] = (
self.process_data['temperature'] > 75.0 or
self.process_data['pressure'] > 108.0 or
self.process_data['level'] > 95.0
)
# Update OPC UA nodes if available
if self.opcua_server and self.opcua_nodes:
for name, node in self.opcua_nodes.items():
await node.write_value(self.process_data[name])
# Store historical data
timestamp = datetime.now()
self.historical_data['temperature'].append({
'timestamp': timestamp,
'value': self.process_data['temperature']
})
self.historical_data['pressure'].append({
'timestamp': timestamp,
'value': self.process_data['pressure']
})
self.historical_data['flow_rate'].append({
'timestamp': timestamp,
'value': self.process_data['flow_rate']
})
# Keep only last 1000 points
for key in self.historical_data:
if len(self.historical_data[key]) > 1000:
self.historical_data[key] = self.historical_data[key][-1000:]
await asyncio.sleep(1) # Update every second
except Exception as e:
logger.error(f"Error updating process data: {e}")
await asyncio.sleep(5)
def get_status(self) -> Dict[str, Any]:
"""Get server status"""
return {
'running': self.running,
'opcua_available': OPCUA_AVAILABLE and self.opcua_server is not None,
'modbus_available': MODBUS_AVAILABLE and self.modbus_server is not None,
'process_data': self.process_data,
'data_points': sum(len(data) for data in self.historical_data.values())
}
async def start(self):
"""Start the mock SCADA server"""
if self.running:
return
self.running = True
# Start servers
await self.start_opcua_server()
self.start_modbus_server()
# Start data update loop
self.update_task = asyncio.create_task(self.update_process_data())
logger.info("Mock SCADA server started")
async def stop(self):
"""Stop the mock SCADA server"""
self.running = False
if hasattr(self, 'update_task'):
self.update_task.cancel()
try:
await self.update_task
except asyncio.CancelledError:
pass
if self.opcua_server:
await self.opcua_server.stop()
logger.info("Mock SCADA server stopped")
async def main():
"""Main function to run the mock SCADA server"""
server = MockSCADAServer()
try:
await server.start()
# Keep server running
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\nShutting down mock SCADA server...")
await server.stop()
if __name__ == "__main__":
asyncio.run(main())