402 lines
14 KiB
Python
402 lines
14 KiB
Python
"""
|
|
Integration tests for mock SCADA and optimizer services
|
|
"""
|
|
|
|
import pytest
|
|
import requests
|
|
import json
|
|
import time
|
|
from datetime import datetime
|
|
|
|
# Test configuration
|
|
SCADA_BASE_URL = "http://localhost:8081"
|
|
OPTIMIZER_BASE_URL = "http://localhost:8082"
|
|
CALEJO_BASE_URL = "http://localhost:8080"
|
|
|
|
class TestMockSCADAService:
|
|
"""Test suite for mock SCADA service"""
|
|
|
|
def test_scada_health(self):
|
|
"""Test SCADA service health endpoint"""
|
|
response = requests.get(f"{SCADA_BASE_URL}/health")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "healthy"
|
|
assert data["service"] == "mock-scada"
|
|
|
|
def test_scada_get_all_data(self):
|
|
"""Test retrieving all SCADA data"""
|
|
response = requests.get(f"{SCADA_BASE_URL}/api/v1/data")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
# Verify structure
|
|
assert "timestamp" in data
|
|
assert "data" in data
|
|
assert "equipment" in data
|
|
|
|
# Verify data fields
|
|
scada_data = data["data"]
|
|
assert "temperature" in scada_data
|
|
assert "pressure" in scada_data
|
|
assert "flow_rate" in scada_data
|
|
assert "level" in scada_data
|
|
assert "power" in scada_data
|
|
assert "status" in scada_data
|
|
assert "efficiency" in scada_data
|
|
|
|
# Verify equipment status
|
|
equipment = data["equipment"]
|
|
assert "pump_1" in equipment
|
|
assert "valve_1" in equipment
|
|
assert "compressor" in equipment
|
|
|
|
def test_scada_get_specific_data(self):
|
|
"""Test retrieving specific SCADA data tags"""
|
|
tags = ["temperature", "pressure", "flow_rate"]
|
|
|
|
for tag in tags:
|
|
response = requests.get(f"{SCADA_BASE_URL}/api/v1/data/{tag}")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
assert data["tag"] == tag
|
|
assert "value" in data
|
|
assert "unit" in data
|
|
assert "timestamp" in data
|
|
|
|
def test_scada_control_equipment(self):
|
|
"""Test controlling SCADA equipment"""
|
|
equipment = "pump_1"
|
|
command = "START"
|
|
|
|
response = requests.post(
|
|
f"{SCADA_BASE_URL}/api/v1/control/{equipment}",
|
|
json={"command": command}
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
assert data["equipment"] == equipment
|
|
assert data["current_status"] == command
|
|
assert "previous_status" in data
|
|
assert "timestamp" in data
|
|
assert "message" in data
|
|
|
|
def test_scada_alarms(self):
|
|
"""Test SCADA alarm system"""
|
|
response = requests.get(f"{SCADA_BASE_URL}/api/v1/alarms")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
assert "alarms" in data
|
|
# Alarms may be empty or contain alarm objects
|
|
assert isinstance(data["alarms"], list)
|
|
|
|
def test_scada_invalid_tag(self):
|
|
"""Test requesting invalid SCADA tag"""
|
|
response = requests.get(f"{SCADA_BASE_URL}/api/v1/data/invalid_tag")
|
|
assert response.status_code == 404
|
|
data = response.json()
|
|
assert "error" in data
|
|
|
|
def test_scada_invalid_control(self):
|
|
"""Test invalid control command"""
|
|
response = requests.post(
|
|
f"{SCADA_BASE_URL}/api/v1/control/invalid_equipment",
|
|
json={"command": "INVALID_COMMAND"}
|
|
)
|
|
assert response.status_code == 400
|
|
|
|
|
|
class TestMockOptimizerService:
|
|
"""Test suite for mock optimizer service"""
|
|
|
|
def test_optimizer_health(self):
|
|
"""Test optimizer service health endpoint"""
|
|
response = requests.get(f"{OPTIMIZER_BASE_URL}/health")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "healthy"
|
|
assert data["service"] == "mock-optimizer"
|
|
|
|
def test_optimizer_get_models(self):
|
|
"""Test retrieving available optimization models"""
|
|
response = requests.get(f"{OPTIMIZER_BASE_URL}/api/v1/models")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
assert "models" in data
|
|
models = data["models"]
|
|
|
|
# Verify model structure
|
|
assert "energy_optimization" in models
|
|
assert "production_optimization" in models
|
|
assert "cost_optimization" in models
|
|
|
|
# Verify model details
|
|
energy_model = models["energy_optimization"]
|
|
assert energy_model["name"] == "Energy Consumption Optimizer"
|
|
assert "parameters" in energy_model
|
|
assert isinstance(energy_model["parameters"], list)
|
|
|
|
def test_energy_optimization(self):
|
|
"""Test energy optimization model"""
|
|
test_data = {
|
|
"power_load": 450.0,
|
|
"time_of_day": 14,
|
|
"production_rate": 95.0
|
|
}
|
|
|
|
response = requests.post(
|
|
f"{OPTIMIZER_BASE_URL}/api/v1/optimize/energy_optimization",
|
|
json=test_data
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
assert "optimization_id" in data
|
|
assert data["model"] == "energy_optimization"
|
|
assert "result" in data
|
|
assert "processing_time" in data
|
|
assert "timestamp" in data
|
|
|
|
result = data["result"]
|
|
assert "optimal_power_setpoint" in result
|
|
assert "recommended_actions" in result
|
|
assert "estimated_savings" in result
|
|
assert "confidence" in result
|
|
|
|
def test_production_optimization(self):
|
|
"""Test production optimization model"""
|
|
test_data = {
|
|
"raw_material_quality": 85.0,
|
|
"machine_utilization": 92.0,
|
|
"operator_skill": 88.0
|
|
}
|
|
|
|
response = requests.post(
|
|
f"{OPTIMIZER_BASE_URL}/api/v1/optimize/production_optimization",
|
|
json=test_data
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
assert data["model"] == "production_optimization"
|
|
result = data["result"]
|
|
assert "optimal_production_rate" in result
|
|
assert "efficiency_gain" in result
|
|
|
|
def test_cost_optimization(self):
|
|
"""Test cost optimization model"""
|
|
test_data = {
|
|
"energy_cost": 55.0,
|
|
"labor_cost": 30.0,
|
|
"maintenance_cost": 15.0
|
|
}
|
|
|
|
response = requests.post(
|
|
f"{OPTIMIZER_BASE_URL}/api/v1/optimize/cost_optimization",
|
|
json=test_data
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
assert data["model"] == "cost_optimization"
|
|
result = data["result"]
|
|
assert "optimal_cost_structure" in result
|
|
assert "cost_reduction" in result
|
|
|
|
def test_optimizer_history(self):
|
|
"""Test optimization history"""
|
|
response = requests.get(f"{OPTIMIZER_BASE_URL}/api/v1/history")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
assert "history" in data
|
|
assert "total_optimizations" in data
|
|
assert isinstance(data["history"], list)
|
|
|
|
def test_optimizer_forecast(self):
|
|
"""Test forecast generation"""
|
|
forecast_data = {"hours": 24}
|
|
|
|
response = requests.post(
|
|
f"{OPTIMIZER_BASE_URL}/api/v1/forecast",
|
|
json=forecast_data
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
assert "forecast" in data
|
|
assert "generated_at" in data
|
|
assert "horizon_hours" in data
|
|
|
|
forecast = data["forecast"]
|
|
assert len(forecast) == 24
|
|
|
|
# Verify forecast structure
|
|
for item in forecast:
|
|
assert "timestamp" in item
|
|
assert "energy_consumption" in item
|
|
assert "production_rate" in item
|
|
assert "efficiency" in item
|
|
assert "cost" in item
|
|
|
|
def test_optimizer_invalid_model(self):
|
|
"""Test optimization with invalid model"""
|
|
response = requests.post(
|
|
f"{OPTIMIZER_BASE_URL}/api/v1/optimize/invalid_model",
|
|
json={"test": "data"}
|
|
)
|
|
assert response.status_code == 404
|
|
|
|
def test_optimizer_missing_data(self):
|
|
"""Test optimization with missing data"""
|
|
response = requests.post(
|
|
f"{OPTIMIZER_BASE_URL}/api/v1/optimize/energy_optimization",
|
|
json={}
|
|
)
|
|
assert response.status_code == 400
|
|
|
|
|
|
class TestCalejoIntegration:
|
|
"""Test Calejo Control Adapter integration with mock services"""
|
|
|
|
def test_calejo_health(self):
|
|
"""Test Calejo Control Adapter health"""
|
|
response = requests.get(f"{CALEJO_BASE_URL}/health")
|
|
assert response.status_code == 200
|
|
# Health endpoint should return success status
|
|
|
|
def test_calejo_dashboard(self):
|
|
"""Test dashboard accessibility"""
|
|
response = requests.get(f"{CALEJO_BASE_URL}/dashboard")
|
|
# Dashboard might return 200 or redirect
|
|
assert response.status_code in [200, 302]
|
|
|
|
def test_calejo_api_status(self):
|
|
"""Test API status endpoint"""
|
|
response = requests.get(f"{CALEJO_BASE_URL}/api/v1/status")
|
|
# Status endpoint should be accessible
|
|
assert response.status_code in [200, 404] # Might not be implemented yet
|
|
|
|
def test_calejo_metrics(self):
|
|
"""Test metrics endpoint"""
|
|
response = requests.get(f"{CALEJO_BASE_URL}/api/v1/metrics")
|
|
# Metrics endpoint should be accessible
|
|
assert response.status_code in [200, 404] # Might not be implemented yet
|
|
|
|
|
|
class TestEndToEndWorkflow:
|
|
"""Test end-to-end workflows with mock services"""
|
|
|
|
def test_scada_to_optimizer_workflow(self):
|
|
"""Test workflow: SCADA data -> Optimization -> Control"""
|
|
|
|
# 1. Get current SCADA data
|
|
scada_response = requests.get(f"{SCADA_BASE_URL}/api/v1/data")
|
|
assert scada_response.status_code == 200
|
|
scada_data = scada_response.json()
|
|
|
|
# 2. Run energy optimization based on SCADA data
|
|
optimization_data = {
|
|
"power_load": scada_data["data"]["power"]["value"],
|
|
"time_of_day": datetime.now().hour,
|
|
"production_rate": scada_data["data"]["flow_rate"]["value"]
|
|
}
|
|
|
|
opt_response = requests.post(
|
|
f"{OPTIMIZER_BASE_URL}/api/v1/optimize/energy_optimization",
|
|
json=optimization_data
|
|
)
|
|
assert opt_response.status_code == 200
|
|
optimization_result = opt_response.json()
|
|
|
|
# 3. Apply optimization recommendations (simulate control)
|
|
# This would typically be done by Calejo Control Adapter
|
|
assert "result" in optimization_result
|
|
assert "recommended_actions" in optimization_result["result"]
|
|
|
|
def test_alarm_to_optimization_workflow(self):
|
|
"""Test workflow: Alarm detection -> Optimization response"""
|
|
|
|
# 1. Check for alarms
|
|
alarm_response = requests.get(f"{SCADA_BASE_URL}/api/v1/alarms")
|
|
assert alarm_response.status_code == 200
|
|
alarms = alarm_response.json()["alarms"]
|
|
|
|
# 2. If alarms exist, run appropriate optimization
|
|
if alarms:
|
|
# For temperature alarm, run energy optimization
|
|
opt_response = requests.post(
|
|
f"{OPTIMIZER_BASE_URL}/api/v1/optimize/energy_optimization",
|
|
json={
|
|
"power_load": 500.0,
|
|
"time_of_day": datetime.now().hour,
|
|
"production_rate": 100.0
|
|
}
|
|
)
|
|
assert opt_response.status_code == 200
|
|
|
|
def test_forecast_based_planning(self):
|
|
"""Test forecast-based planning workflow"""
|
|
|
|
# 1. Generate forecast
|
|
forecast_response = requests.post(
|
|
f"{OPTIMIZER_BASE_URL}/api/v1/forecast",
|
|
json={"hours": 12}
|
|
)
|
|
assert forecast_response.status_code == 200
|
|
forecast = forecast_response.json()
|
|
|
|
# 2. Use forecast for planning
|
|
assert len(forecast["forecast"]) == 12
|
|
|
|
# 3. Run optimization based on forecast
|
|
avg_energy = sum(item["energy_consumption"] for item in forecast["forecast"]) / 12
|
|
|
|
opt_response = requests.post(
|
|
f"{OPTIMIZER_BASE_URL}/api/v1/optimize/energy_optimization",
|
|
json={
|
|
"power_load": avg_energy,
|
|
"time_of_day": datetime.now().hour,
|
|
"production_rate": 95.0
|
|
}
|
|
)
|
|
assert opt_response.status_code == 200
|
|
|
|
|
|
@pytest.fixture(scope="session", autouse=True)
|
|
def wait_for_services():
|
|
"""Wait for services to be ready before running tests"""
|
|
print("⏳ Waiting for mock services to be ready...")
|
|
|
|
max_wait = 60 # Maximum wait time in seconds
|
|
start_time = time.time()
|
|
|
|
services_ready = False
|
|
while time.time() - start_time < max_wait:
|
|
try:
|
|
# Check if all services are responding
|
|
scada_ready = requests.get(f"{SCADA_BASE_URL}/health").status_code == 200
|
|
optimizer_ready = requests.get(f"{OPTIMIZER_BASE_URL}/health").status_code == 200
|
|
calejo_ready = requests.get(f"{CALEJO_BASE_URL}/health").status_code == 200
|
|
|
|
if scada_ready and optimizer_ready and calejo_ready:
|
|
services_ready = True
|
|
break
|
|
except:
|
|
pass
|
|
|
|
time.sleep(2)
|
|
|
|
if not services_ready:
|
|
pytest.skip("Mock services not ready within timeout period")
|
|
|
|
print("✅ All mock services are ready!")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Run tests directly
|
|
pytest.main([__file__, "-v"]) |