CalejoControl/tests/deployment/smoke_tests.py

239 lines
9.6 KiB
Python
Raw Permalink Normal View History

"""
Deployment Smoke Tests
These tests verify basic functionality after deployment.
They should be run on the deployed environment to ensure the deployment was successful.
"""
import pytest
import requests
import time
import os
from datetime import datetime
# Configuration - these should be set based on the deployment environment
BASE_URL = os.getenv('DEPLOYMENT_BASE_URL', 'http://localhost:8080')
SCADA_URL = os.getenv('DEPLOYMENT_SCADA_URL', 'http://localhost:8081')
OPTIMIZER_URL = os.getenv('DEPLOYMENT_OPTIMIZER_URL', 'http://localhost:8082')
# Retry configuration for service startup
MAX_RETRIES = 10
RETRY_DELAY = 5 # seconds
class DeploymentSmokeTests:
"""Smoke tests for deployment verification"""
def test_health_endpoints(self):
"""Test that all health endpoints are responding"""
print("\n🏥 Testing Health Endpoints...")
endpoints = [
(f"{BASE_URL}/health", "Main Application"),
(f"{SCADA_URL}/health", "SCADA Service"),
(f"{OPTIMIZER_URL}/health", "Optimizer Service"),
]
for url, service_name in endpoints:
for attempt in range(MAX_RETRIES):
try:
response = requests.get(url, timeout=10)
assert response.status_code == 200, f"{service_name} health check failed"
print(f"{service_name}: Healthy")
break
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
if attempt == MAX_RETRIES - 1:
pytest.fail(f"{service_name} not available after {MAX_RETRIES} attempts: {e}")
print(f"{service_name}: Waiting... ({attempt + 1}/{MAX_RETRIES})")
time.sleep(RETRY_DELAY)
def test_api_endpoints(self):
"""Test that key API endpoints are accessible"""
print("\n🔌 Testing API Endpoints...")
# Test main application API
endpoints = [
(f"{BASE_URL}/api/v1/status", "GET"),
(f"{BASE_URL}/dashboard", "GET"),
]
for url, method in endpoints:
try:
if method == "GET":
response = requests.get(url, timeout=10)
else:
response = requests.post(url, timeout=10)
# Accept 200 (success) or 404 (endpoint not implemented yet)
assert response.status_code in [200, 404], f"API endpoint {url} failed: {response.status_code}"
print(f"{url}: Accessible")
except Exception as e:
# For smoke tests, don't fail on main app endpoints if they're not available
# This allows testing with just mock services
if BASE_URL != SCADA_URL and BASE_URL != OPTIMIZER_URL:
print(f" ⚠️ {url}: Not available (expected for mock-only testing)")
else:
pytest.fail(f"API endpoint {url} failed: {e}")
def test_scada_integration(self):
"""Test basic SCADA integration"""
print("\n🏭 Testing SCADA Integration...")
try:
# Get SCADA data
response = requests.get(f"{SCADA_URL}/api/v1/data", timeout=10)
assert response.status_code == 200, "SCADA data endpoint failed"
data = response.json()
assert "timestamp" in data, "SCADA data missing timestamp"
assert "data" in data, "SCADA data missing data section"
assert "equipment" in data, "SCADA data missing equipment section"
print(f" ✅ SCADA Data: {len(data['data'])} data points")
print(f" ✅ Equipment: {len(data['equipment'])} devices")
except Exception as e:
pytest.fail(f"SCADA integration test failed: {e}")
def test_optimizer_integration(self):
"""Test basic optimizer integration"""
print("\n🧠 Testing Optimizer Integration...")
try:
# Get available models
response = requests.get(f"{OPTIMIZER_URL}/api/v1/models", timeout=10)
assert response.status_code == 200, "Optimizer models endpoint failed"
models = response.json()
assert "models" in models, "Optimizer response missing models"
# Test basic optimization
if models["models"] and len(models["models"]) > 0:
# Get the first model key (model names are keys in the dictionary)
model_keys = list(models["models"].keys())
if model_keys:
model = model_keys[0]
optimization_data = {
"power_load": 450,
"time_of_day": datetime.now().hour,
"production_rate": 95.0
}
response = requests.post(
f"{OPTIMIZER_URL}/api/v1/optimize/{model}",
json=optimization_data,
timeout=30
)
# Accept 200 (success) or 400/404 (model-specific issues)
assert response.status_code in [200, 400, 404], f"Optimization failed: {response.status_code}"
if response.status_code == 200:
result = response.json()
assert "optimization_id" in result, "Optimization result missing ID"
print(f" ✅ Optimization: {result['optimization_id']}")
else:
print(f" ⚠️ Optimization: Model {model} not available")
print(f" ✅ Available Models: {len(models['models'])}")
except Exception as e:
pytest.fail(f"Optimizer integration test failed: {e}")
def test_database_connectivity(self):
"""Test database connectivity (if applicable)"""
print("\n🗄️ Testing Database Connectivity...")
# This test would need to be adapted based on the actual database setup
# For now, we'll skip it or implement a basic check
try:
# Try to access a database-related endpoint if available
response = requests.get(f"{BASE_URL}/api/v1/status", timeout=10)
# If we can reach the status endpoint, assume database is working
# (since the application likely depends on it)
assert response.status_code in [200, 404], "Status endpoint failed"
print(" ✅ Database: Application is responsive")
except Exception as e:
print(f" ⚠️ Database: Basic check failed - {e}")
# Don't fail the test for database issues in smoke tests
# This allows deployment to succeed even if database needs separate setup
def test_performance_baseline(self):
"""Test basic performance characteristics"""
print("\n⚡ Testing Performance Baseline...")
endpoints = [
(f"{BASE_URL}/health", "Main Health"),
(f"{SCADA_URL}/health", "SCADA Health"),
(f"{OPTIMIZER_URL}/health", "Optimizer Health"),
]
max_response_time = 5.0 # seconds
for url, endpoint_name in endpoints:
start_time = time.time()
try:
response = requests.get(url, timeout=10)
response_time = time.time() - start_time
assert response.status_code == 200, f"{endpoint_name} failed"
assert response_time < max_response_time, f"{endpoint_name} too slow: {response_time:.2f}s"
print(f"{endpoint_name}: {response_time:.3f}s")
except Exception as e:
print(f" ⚠️ {endpoint_name}: Performance test skipped - {e}")
def run_smoke_tests():
"""Run all smoke tests and return success status"""
print("🚀 Running Deployment Smoke Tests...")
print(f"📡 Testing Environment:")
print(f" Main App: {BASE_URL}")
print(f" SCADA: {SCADA_URL}")
print(f" Optimizer: {OPTIMIZER_URL}")
test_instance = DeploymentSmokeTests()
tests = [
test_instance.test_health_endpoints,
test_instance.test_api_endpoints,
test_instance.test_scada_integration,
test_instance.test_optimizer_integration,
test_instance.test_database_connectivity,
test_instance.test_performance_baseline,
]
results = []
for test in tests:
try:
test()
results.append((test.__name__, "PASSED"))
except Exception as e:
results.append((test.__name__, f"FAILED: {e}"))
# Print summary
print("\n" + "="*60)
print("📊 SMOKE TEST SUMMARY")
print("="*60)
passed = 0
for test_name, result in results:
status = "" if "PASSED" in result else ""
print(f"{status} {test_name}: {result}")
if "PASSED" in result:
passed += 1
print(f"\n📈 Results: {passed}/{len(results)} tests passed")
if passed == len(results):
print("🎉 All smoke tests passed! Deployment appears successful.")
return True
else:
print("⚠️ Some smoke tests failed. Please investigate deployment issues.")
return False
if __name__ == "__main__":
success = run_smoke_tests()
exit(0 if success else 1)