203 lines
6.6 KiB
Python
203 lines
6.6 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
Mock SCADA Server for testing
|
||
|
|
Simulates industrial SCADA system with process data and equipment control
|
||
|
|
"""
|
||
|
|
|
||
|
|
import json
|
||
|
|
import random
|
||
|
|
import time
|
||
|
|
from datetime import datetime
|
||
|
|
from flask import Flask, jsonify, request
|
||
|
|
|
||
|
|
app = Flask(__name__)
|
||
|
|
|
||
|
|
# Mock SCADA data
|
||
|
|
current_data = {
|
||
|
|
"temperature": {"value": 75.5, "unit": "°C", "min": 50, "max": 100},
|
||
|
|
"pressure": {"value": 2.5, "unit": "bar", "min": 1, "max": 5},
|
||
|
|
"flow_rate": {"value": 120.0, "unit": "m³/h", "min": 50, "max": 200},
|
||
|
|
"level": {"value": 65.0, "unit": "%", "min": 0, "max": 100},
|
||
|
|
"power": {"value": 450.0, "unit": "kW", "min": 200, "max": 600},
|
||
|
|
"status": {"value": "RUNNING", "unit": "-"},
|
||
|
|
"efficiency": {"value": 87.5, "unit": "%", "min": 0, "max": 100}
|
||
|
|
}
|
||
|
|
|
||
|
|
# Equipment status
|
||
|
|
equipment_status = {
|
||
|
|
"pump_1": "RUNNING",
|
||
|
|
"pump_2": "STOPPED",
|
||
|
|
"valve_1": "OPEN",
|
||
|
|
"valve_2": "CLOSED",
|
||
|
|
"compressor": "RUNNING",
|
||
|
|
"heater": "STOPPED"
|
||
|
|
}
|
||
|
|
|
||
|
|
# Alarm status
|
||
|
|
alarms = []
|
||
|
|
|
||
|
|
def update_data():
|
||
|
|
"""Update mock data with realistic variations"""
|
||
|
|
global current_data
|
||
|
|
|
||
|
|
# Temperature variation
|
||
|
|
current_data["temperature"]["value"] += random.uniform(-0.5, 0.5)
|
||
|
|
current_data["temperature"]["value"] = max(50, min(100, current_data["temperature"]["value"]))
|
||
|
|
|
||
|
|
# Pressure variation
|
||
|
|
current_data["pressure"]["value"] += random.uniform(-0.1, 0.1)
|
||
|
|
current_data["pressure"]["value"] = max(1, min(5, current_data["pressure"]["value"]))
|
||
|
|
|
||
|
|
# Flow rate variation
|
||
|
|
current_data["flow_rate"]["value"] += random.uniform(-2, 2)
|
||
|
|
current_data["flow_rate"]["value"] = max(50, min(200, current_data["flow_rate"]["value"]))
|
||
|
|
|
||
|
|
# Level variation
|
||
|
|
current_data["level"]["value"] += random.uniform(-1, 1)
|
||
|
|
current_data["level"]["value"] = max(0, min(100, current_data["level"]["value"]))
|
||
|
|
|
||
|
|
# Power variation
|
||
|
|
current_data["power"]["value"] += random.uniform(-5, 5)
|
||
|
|
current_data["power"]["value"] = max(200, min(600, current_data["power"]["value"]))
|
||
|
|
|
||
|
|
# Efficiency variation
|
||
|
|
current_data["efficiency"]["value"] += random.uniform(-0.5, 0.5)
|
||
|
|
current_data["efficiency"]["value"] = max(0, min(100, current_data["efficiency"]["value"]))
|
||
|
|
|
||
|
|
# Check for alarm conditions
|
||
|
|
check_alarms()
|
||
|
|
|
||
|
|
def check_alarms():
|
||
|
|
"""Check for alarm conditions"""
|
||
|
|
global alarms
|
||
|
|
|
||
|
|
# Clear old alarms
|
||
|
|
alarms = [alarm for alarm in alarms if datetime.now().timestamp() - alarm["timestamp"] < 300]
|
||
|
|
|
||
|
|
# Check temperature alarm
|
||
|
|
if current_data["temperature"]["value"] > 95:
|
||
|
|
add_alarm("HIGH_TEMPERATURE", f"Temperature high: {current_data['temperature']['value']:.1f}°C")
|
||
|
|
elif current_data["temperature"]["value"] < 55:
|
||
|
|
add_alarm("LOW_TEMPERATURE", f"Temperature low: {current_data['temperature']['value']:.1f}°C")
|
||
|
|
|
||
|
|
# Check pressure alarm
|
||
|
|
if current_data["pressure"]["value"] > 4.5:
|
||
|
|
add_alarm("HIGH_PRESSURE", f"Pressure high: {current_data['pressure']['value']:.1f} bar")
|
||
|
|
elif current_data["pressure"]["value"] < 1.5:
|
||
|
|
add_alarm("LOW_PRESSURE", f"Pressure low: {current_data['pressure']['value']:.1f} bar")
|
||
|
|
|
||
|
|
# Check level alarm
|
||
|
|
if current_data["level"]["value"] > 90:
|
||
|
|
add_alarm("HIGH_LEVEL", f"Level high: {current_data['level']['value']:.1f}%")
|
||
|
|
elif current_data["level"]["value"] < 20:
|
||
|
|
add_alarm("LOW_LEVEL", f"Level low: {current_data['level']['value']:.1f}%")
|
||
|
|
|
||
|
|
def add_alarm(type, message):
|
||
|
|
"""Add a new alarm"""
|
||
|
|
global alarms
|
||
|
|
|
||
|
|
# Check if alarm already exists
|
||
|
|
for alarm in alarms:
|
||
|
|
if alarm["type"] == type:
|
||
|
|
return
|
||
|
|
|
||
|
|
alarms.append({
|
||
|
|
"type": type,
|
||
|
|
"message": message,
|
||
|
|
"timestamp": datetime.now().timestamp(),
|
||
|
|
"acknowledged": False
|
||
|
|
})
|
||
|
|
|
||
|
|
@app.route('/health', methods=['GET'])
|
||
|
|
def health():
|
||
|
|
"""Health check endpoint"""
|
||
|
|
return jsonify({
|
||
|
|
"status": "healthy",
|
||
|
|
"service": "mock-scada",
|
||
|
|
"timestamp": datetime.now().isoformat()
|
||
|
|
})
|
||
|
|
|
||
|
|
@app.route('/api/v1/data', methods=['GET'])
|
||
|
|
def get_all_data():
|
||
|
|
"""Get all SCADA data"""
|
||
|
|
update_data()
|
||
|
|
|
||
|
|
return jsonify({
|
||
|
|
"timestamp": datetime.now().isoformat(),
|
||
|
|
"data": current_data,
|
||
|
|
"equipment": equipment_status
|
||
|
|
})
|
||
|
|
|
||
|
|
@app.route('/api/v1/data/<tag>', methods=['GET'])
|
||
|
|
def get_specific_data(tag):
|
||
|
|
"""Get specific SCADA data tag"""
|
||
|
|
update_data()
|
||
|
|
|
||
|
|
if tag not in current_data:
|
||
|
|
return jsonify({"error": f"Tag '{tag}' not found"}), 404
|
||
|
|
|
||
|
|
return jsonify({
|
||
|
|
"tag": tag,
|
||
|
|
"value": current_data[tag]["value"],
|
||
|
|
"unit": current_data[tag]["unit"],
|
||
|
|
"timestamp": datetime.now().isoformat()
|
||
|
|
})
|
||
|
|
|
||
|
|
@app.route('/api/v1/control/<equipment>', methods=['POST'])
|
||
|
|
def control_equipment(equipment):
|
||
|
|
"""Control SCADA equipment"""
|
||
|
|
if equipment not in equipment_status:
|
||
|
|
return jsonify({"error": f"Equipment '{equipment}' not found"}), 404
|
||
|
|
|
||
|
|
command = request.json.get("command")
|
||
|
|
if not command:
|
||
|
|
return jsonify({"error": "Command is required"}), 400
|
||
|
|
|
||
|
|
valid_commands = ["START", "STOP", "OPEN", "CLOSE", "RESET"]
|
||
|
|
if command not in valid_commands:
|
||
|
|
return jsonify({"error": f"Invalid command. Valid commands: {valid_commands}"}), 400
|
||
|
|
|
||
|
|
previous_status = equipment_status[equipment]
|
||
|
|
equipment_status[equipment] = command
|
||
|
|
|
||
|
|
return jsonify({
|
||
|
|
"equipment": equipment,
|
||
|
|
"previous_status": previous_status,
|
||
|
|
"current_status": command,
|
||
|
|
"timestamp": datetime.now().isoformat(),
|
||
|
|
"message": f"Equipment {equipment} changed from {previous_status} to {command}"
|
||
|
|
})
|
||
|
|
|
||
|
|
@app.route('/api/v1/alarms', methods=['GET'])
|
||
|
|
def get_alarms():
|
||
|
|
"""Get current alarms"""
|
||
|
|
return jsonify({
|
||
|
|
"alarms": alarms,
|
||
|
|
"timestamp": datetime.now().isoformat()
|
||
|
|
})
|
||
|
|
|
||
|
|
@app.route('/api/v1/alarms/<alarm_type>/acknowledge', methods=['POST'])
|
||
|
|
def acknowledge_alarm(alarm_type):
|
||
|
|
"""Acknowledge an alarm"""
|
||
|
|
for alarm in alarms:
|
||
|
|
if alarm["type"] == alarm_type:
|
||
|
|
alarm["acknowledged"] = True
|
||
|
|
return jsonify({
|
||
|
|
"alarm": alarm_type,
|
||
|
|
"acknowledged": True,
|
||
|
|
"timestamp": datetime.now().isoformat()
|
||
|
|
})
|
||
|
|
|
||
|
|
return jsonify({"error": f"Alarm '{alarm_type}' not found"}), 404
|
||
|
|
|
||
|
|
if __name__ == '__main__':
|
||
|
|
print("🚀 Starting Mock SCADA Server on port 8081...")
|
||
|
|
print("📊 Available endpoints:")
|
||
|
|
print(" GET /health")
|
||
|
|
print(" GET /api/v1/data")
|
||
|
|
print(" GET /api/v1/data/<tag>")
|
||
|
|
print(" POST /api/v1/control/<equipment>")
|
||
|
|
print(" GET /api/v1/alarms")
|
||
|
|
print(" POST /api/v1/alarms/<type>/acknowledge")
|
||
|
|
|
||
|
|
app.run(host='0.0.0.0', port=8081, debug=False)
|