#!/usr/bin/env python
"""
Mock-Dependent End-to-End Test Runner
Starts mock services and runs comprehensive e2e tests

This script is for tests that require mock SCADA and optimizer services to be
running. For integration tests that don't require external services, use
pytest directly.
"""

import os
import subprocess
import sys
import time

import requests

# Configuration
SCADA_BASE_URL = "http://localhost:8081"
OPTIMIZER_BASE_URL = "http://localhost:8082"

# Mock server scripts, relative to the project root that main() chdir()s into.
SCADA_SERVER_SCRIPT = "tests/mock_services/mock_scada_server.py"
OPTIMIZER_SERVER_SCRIPT = "tests/mock_services/mock_optimizer_server.py"

# Seconds to wait for a mock service to exit after terminate() before kill().
SHUTDOWN_TIMEOUT = 10


def wait_for_service(url, max_attempts=30, delay=1, process=None):
    """Poll *url* until it answers HTTP 200 or the attempts run out.

    Args:
        url: URL to GET; a 200 response counts as "ready".
        max_attempts: Maximum number of requests to make.
        delay: Seconds to sleep between two attempts.
        process: Optional ``subprocess.Popen`` that is expected to serve
            *url*. When given, polling stops as soon as that process has
            exited instead of burning through the remaining attempts. This
            is a best-effort check made before each attempt.

    Returns:
        True once a 200 response was received, False otherwise.
    """
    for attempt in range(max_attempts):
        # A dead server will never become healthy; a 200 on its port could
        # only come from some other (stale) process, so give up right away.
        if process is not None and process.poll() is not None:
            print(f"❌ Service {url} exited early (code {process.returncode})")
            return False

        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                print(f"✅ Service {url} is ready")
                return True
        except requests.exceptions.RequestException:
            # Connection refused / timeout simply means "not up yet".
            pass

        # No point sleeping after the final attempt.
        if attempt < max_attempts - 1:
            print(f" Waiting for {url}... ({attempt + 1}/{max_attempts})")
            time.sleep(delay)

    print(f"❌ Service {url} failed to start")
    return False


def _spawn_mock_service(script_path):
    """Launch one mock server script as a background child process."""
    # The output is never consumed by this runner. Leaving it on an unread
    # PIPE would block the server once the OS pipe buffer fills up, so it is
    # discarded instead.
    return subprocess.Popen(
        [sys.executable, script_path],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


def _terminate_process(process):
    """Terminate *process* and reap it, escalating to kill() on timeout."""
    if not process:
        return
    if process.poll() is None:
        process.terminate()
    try:
        process.wait(timeout=SHUTDOWN_TIMEOUT)
    except subprocess.TimeoutExpired:
        # The service ignored the polite request; force it down so the
        # runner cannot hang and the port is released.
        process.kill()
        process.wait()


def start_mock_services():
    """Start the mock SCADA and optimizer servers and wait until both are up.

    Returns:
        ``(scada_process, optimizer_process)`` as ``subprocess.Popen``
        objects when both health checks succeeded, ``(None, None)``
        otherwise. On failure every process that was started has already
        been terminated and reaped.
    """
    print("🚀 Starting mock services...")

    scada_process = None
    optimizer_process = None
    try:
        # Start services in background
        scada_process = _spawn_mock_service(SCADA_SERVER_SCRIPT)
        optimizer_process = _spawn_mock_service(OPTIMIZER_SERVER_SCRIPT)

        # Wait for services to be ready
        print("⏳ Waiting for services to be ready...")
        scada_ready = wait_for_service(
            f"{SCADA_BASE_URL}/health", process=scada_process
        )
        optimizer_ready = wait_for_service(
            f"{OPTIMIZER_BASE_URL}/health", process=optimizer_process
        )
    except BaseException:
        # Spawn error or Ctrl-C while waiting: do not leave orphaned servers
        # behind holding the ports.
        _terminate_process(scada_process)
        _terminate_process(optimizer_process)
        raise

    if not (scada_ready and optimizer_ready):
        print("❌ Failed to start mock services")
        _terminate_process(scada_process)
        _terminate_process(optimizer_process)
        return None, None

    print("✅ All mock services are ready!")
    return scada_process, optimizer_process


def stop_mock_services(scada_process, optimizer_process):
    """Stop both mock services, tolerating ``None`` for either of them.

    Each process is terminated and waited for; a process that does not exit
    within ``SHUTDOWN_TIMEOUT`` seconds is killed.
    """
    print("\n🛑 Stopping mock services...")
    try:
        _terminate_process(scada_process)
    finally:
        # Still stop the optimizer if stopping SCADA raised.
        _terminate_process(optimizer_process)
    print("✅ Mock services stopped")


def run_tests():
    """Run the reliable end-to-end tests with pytest.

    pytest's output goes straight to this process's stdout/stderr.

    Returns:
        The pytest process exit code (0 means all tests passed).
    """
    print("\n🧪 Running Reliable End-to-End Tests...")

    # Run pytest with the reliable e2e tests
    result = subprocess.run(
        [
            sys.executable,
            "-m",
            "pytest",
            "tests/e2e/test_reliable_e2e_workflow.py",
            "-v",
            "--tb=short",
        ]
    )
    return result.returncode


def main():
    """Start the mock services, run the e2e tests and report the outcome.

    Returns:
        Process exit code: 1 when the services could not be started,
        otherwise the pytest exit code.
    """
    print("=" * 80)
    print("🔧 RELIABLE END-TO-END TEST RUNNER")
    print("=" * 80)

    # Change to project directory (the parent of this script's directory) so
    # the relative script and test paths resolve.
    os.chdir(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

    # Start mock services
    scada_process, optimizer_process = start_mock_services()
    if not scada_process or not optimizer_process:
        print("❌ Failed to start services, cannot run tests")
        return 1

    try:
        # Run tests
        test_result = run_tests()

        # Report results
        print("\n" + "=" * 80)
        print("📊 TEST RESULTS")
        print("=" * 80)

        if test_result == 0:
            print("🎉 ALL TESTS PASSED!")
        else:
            print("❌ SOME TESTS FAILED")

        return test_result
    finally:
        # Always stop services
        stop_mock_services(scada_process, optimizer_process)


if __name__ == "__main__":
    sys.exit(main())