CalejoControl/scripts/run-reliable-e2e-tests.py

137 lines
4.0 KiB
Python
Raw Permalink Normal View History

GET http://95.111.206.155:8081/api/v1/dashboard/discovery/results/scan_20251107_092049 404 (Not Found)
(anonymous) @ discovery.js:114
setInterval
pollScanStatus @ discovery.js:112
startDiscoveryScan @ discovery.js:81
await in startDiscoveryScan
(anonymous) @ discovery.js:34
#!/usr/bin/env python
"""
Mock-Dependent End-to-End Test Runner
Starts mock services and runs comprehensive e2e tests
This script is for tests that require mock SCADA and optimizer services to be running.
For integration tests that don't require external services, use pytest directly.
"""
import subprocess
import sys
import time
import requests
import os
# Configuration
SCADA_BASE_URL = "http://localhost:8081"
OPTIMIZER_BASE_URL = "http://localhost:8082"
def wait_for_service(url, max_attempts=30, delay=1):
"""Wait for a service to become available"""
for attempt in range(max_attempts):
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
print(f"✅ Service {url} is ready")
return True
except requests.exceptions.RequestException:
pass
if attempt < max_attempts - 1:
print(f" Waiting for {url}... ({attempt + 1}/{max_attempts})")
time.sleep(delay)
print(f"❌ Service {url} failed to start")
return False
def start_mock_services():
"""Start mock services using the existing script"""
print("🚀 Starting mock services...")
# Start services in background
scada_process = subprocess.Popen([
sys.executable, "tests/mock_services/mock_scada_server.py"
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
optimizer_process = subprocess.Popen([
sys.executable, "tests/mock_services/mock_optimizer_server.py"
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Wait for services to be ready
print("⏳ Waiting for services to be ready...")
scada_ready = wait_for_service(f"{SCADA_BASE_URL}/health")
optimizer_ready = wait_for_service(f"{OPTIMIZER_BASE_URL}/health")
if not (scada_ready and optimizer_ready):
print("❌ Failed to start mock services")
scada_process.terminate()
optimizer_process.terminate()
return None, None
print("✅ All mock services are ready!")
return scada_process, optimizer_process
def stop_mock_services(scada_process, optimizer_process):
"""Stop mock services"""
print("\n🛑 Stopping mock services...")
if scada_process:
scada_process.terminate()
scada_process.wait()
if optimizer_process:
optimizer_process.terminate()
optimizer_process.wait()
print("✅ Mock services stopped")
def run_tests():
"""Run the reliable end-to-end tests"""
print("\n🧪 Running Reliable End-to-End Tests...")
# Run pytest with the reliable e2e tests
result = subprocess.run([
sys.executable, "-m", "pytest",
"tests/e2e/test_reliable_e2e_workflow.py",
"-v", "--tb=short"
], capture_output=False)
return result.returncode
def main():
"""Main function"""
print("=" * 80)
print("🔧 RELIABLE END-TO-END TEST RUNNER")
print("=" * 80)
# Change to project directory
os.chdir(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Start mock services
scada_process, optimizer_process = start_mock_services()
if not scada_process or not optimizer_process:
print("❌ Failed to start services, cannot run tests")
return 1
try:
# Run tests
test_result = run_tests()
# Report results
print("\n" + "=" * 80)
print("📊 TEST RESULTS")
print("=" * 80)
if test_result == 0:
print("🎉 ALL TESTS PASSED!")
else:
print("❌ SOME TESTS FAILED")
return test_result
finally:
# Always stop services
stop_mock_services(scada_process, optimizer_process)
if __name__ == "__main__":
sys.exit(main())