Add untracked test files and update deployment scripts

- Add discovery protocol test files for debugging and direct testing
- Add remote test integration scripts and configuration
- Update deployment and monitoring scripts with recent changes
- Include test-remote.yml configuration for remote testing
This commit is contained in:
openhands 2025-11-06 19:29:21 +00:00
parent 079ae7a1b2
commit d9481b7246
35 changed files with 843 additions and 0 deletions

BIN
.coverage

Binary file not shown.

72
config/test-remote.yml Normal file
View File

@ -0,0 +1,72 @@
# Test Configuration for Remote Services
# This config allows local dashboard to discover and interact with remote mock services
# Application Configuration
app:
name: "Calejo Control Adapter - Test Environment"
version: "2.0"
debug: true
log_level: "INFO"
# Server Configuration
server:
host: "0.0.0.0"
port: 8081
workers: 1
# Database Configuration (local)
database:
host: "localhost"
port: 5432
name: "calejo_test"
username: "calejo_user"
password: "test_password"
# Discovery Configuration
discovery:
enabled: true
scan_interval: 300 # 5 minutes
protocols:
- name: "rest_api"
enabled: true
ports: [8080, 8081, 8082, 8083, 8084, 8085]
timeout: 5
- name: "opcua"
enabled: false
ports: [4840]
timeout: 10
- name: "modbus"
enabled: false
ports: [502]
timeout: 5
# Remote Services Configuration (pre-configured for discovery)
remote_services:
mock_scada:
name: "Mock SCADA Service"
address: "http://95.111.206.155:8083"
protocol: "rest_api"
enabled: true
mock_optimizer:
name: "Mock Optimizer Service"
address: "http://95.111.206.155:8084"
protocol: "rest_api"
enabled: true
existing_api:
name: "Existing Calejo API"
address: "http://95.111.206.155:8080"
protocol: "rest_api"
enabled: true
# Security Configuration
security:
enable_auth: false
cors_origins:
- "*"
# Monitoring Configuration
monitoring:
prometheus_enabled: false
prometheus_port: 9091
grafana_enabled: false
grafana_port: 3000

0
database/setup_database.sh Executable file → Normal file
View File

0
deploy-onprem.sh Executable file → Normal file
View File

0
deploy/ssh/deploy-remote.py Executable file → Normal file
View File

0
deploy/ssh/deploy-remote.sh Executable file → Normal file
View File

0
generate-monitoring-secrets.sh Executable file → Normal file
View File

0
mock-optimization-server.py Executable file → Normal file
View File

0
mock-scada-server.py Executable file → Normal file
View File

0
monitoring/grafana/configure-grafana.sh Executable file → Normal file
View File

0
run_tests.py Executable file → Normal file
View File

0
run_tests_by_system.py Executable file → Normal file
View File

0
run_tests_detailed.py Executable file → Normal file
View File

0
run_tests_with_db.sh Executable file → Normal file
View File

0
scripts/backup.sh Executable file → Normal file
View File

0
scripts/restore.sh Executable file → Normal file
View File

0
scripts/run-mock-tests.sh Executable file → Normal file
View File

0
scripts/run-smoke-tests.sh Executable file → Normal file
View File

0
scripts/security_audit.sh Executable file → Normal file
View File

0
scripts/setup-test-environment.sh Executable file → Normal file
View File

0
scripts/test-mock-services-standalone.py Executable file → Normal file
View File

0
scripts/test-mock-services.sh Executable file → Normal file
View File

0
setup-monitoring.sh Executable file → Normal file
View File

0
setup-server-backup.sh Executable file → Normal file
View File

0
setup-server.sh Executable file → Normal file
View File

View File

@ -0,0 +1,344 @@
"""
Modified Protocol Discovery Service
Auto-discovery service for detecting available protocols and endpoints.
Supports Modbus TCP, Modbus RTU, OPC UA, and REST API discovery.
Modified to include additional ports for testing.
"""
import asyncio
import socket
import threading
from typing import List, Dict, Optional, Any
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
import logging
from pydantic import BaseModel
from src.dashboard.configuration_manager import ProtocolType
logger = logging.getLogger(__name__)
class DiscoveryStatus(Enum):
"""Discovery operation status"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class DiscoveredEndpoint:
"""Represents a discovered protocol endpoint"""
protocol_type: ProtocolType
address: str
port: Optional[int] = None
device_id: Optional[str] = None
device_name: Optional[str] = None
capabilities: List[str] = None
response_time: Optional[float] = None
discovered_at: datetime = None
def __post_init__(self):
if self.capabilities is None:
self.capabilities = []
if self.discovered_at is None:
self.discovered_at = datetime.now()
class DiscoveryResult(BaseModel):
"""Result of a discovery operation"""
status: DiscoveryStatus
discovered_endpoints: List[DiscoveredEndpoint]
scan_duration: float
errors: List[str] = []
scan_id: str
timestamp: datetime = None
def __init__(self, **data):
super().__init__(**data)
if self.timestamp is None:
self.timestamp = datetime.now()
class ProtocolDiscoveryService:
"""
Service for auto-discovering available protocol endpoints
"""
def __init__(self):
self._discovery_results: Dict[str, DiscoveryResult] = {}
self._current_scan_id: Optional[str] = None
self._is_scanning = False
async def discover_all_protocols(self, scan_id: Optional[str] = None) -> DiscoveryResult:
"""
Discover all available protocol endpoints
Args:
scan_id: Optional scan identifier
Returns:
DiscoveryResult with discovered endpoints
"""
if self._is_scanning:
raise RuntimeError("Discovery scan already in progress")
scan_id = scan_id or f"scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
self._current_scan_id = scan_id
self._is_scanning = True
start_time = datetime.now()
discovered_endpoints = []
errors = []
try:
# Run discovery for each protocol type
discovery_tasks = [
self._discover_modbus_tcp(),
self._discover_modbus_rtu(),
self._discover_opcua(),
self._discover_rest_api()
]
results = await asyncio.gather(*discovery_tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
errors.append(f"Discovery error: {str(result)}")
logger.error(f"Discovery error: {result}")
elif isinstance(result, list):
discovered_endpoints.extend(result)
except Exception as e:
errors.append(f"Discovery failed: {str(e)}")
logger.error(f"Discovery failed: {e}")
finally:
self._is_scanning = False
scan_duration = (datetime.now() - start_time).total_seconds()
result = DiscoveryResult(
status=DiscoveryStatus.COMPLETED if not errors else DiscoveryStatus.FAILED,
discovered_endpoints=discovered_endpoints,
scan_duration=scan_duration,
errors=errors,
scan_id=scan_id
)
self._discovery_results[scan_id] = result
return result
async def _discover_modbus_tcp(self) -> List[DiscoveredEndpoint]:
"""Discover Modbus TCP devices on the network"""
discovered = []
# Common Modbus TCP ports
common_ports = [502, 1502, 5020]
# Common network ranges to scan
network_ranges = [
"192.168.1.", # Common home/office network
"10.0.0.", # Common corporate network
"172.16.0.", # Common corporate network
]
for network_range in network_ranges:
for i in range(1, 255): # Scan first 254 hosts
ip_address = f"{network_range}{i}"
for port in common_ports:
try:
if await self._check_modbus_tcp_device(ip_address, port):
endpoint = DiscoveredEndpoint(
protocol_type=ProtocolType.MODBUS_TCP,
address=ip_address,
port=port,
device_id=f"modbus_tcp_{ip_address}_{port}",
device_name=f"Modbus TCP Device {ip_address}:{port}",
capabilities=["read_coils", "read_registers", "write_registers"]
)
discovered.append(endpoint)
logger.info(f"Discovered Modbus TCP device at {ip_address}:{port}")
break # Found device, no need to check other ports
except Exception as e:
logger.debug(f"Failed to connect to {ip_address}:{port}: {e}")
return discovered
async def _discover_modbus_rtu(self) -> List[DiscoveredEndpoint]:
"""Discover Modbus RTU devices (serial ports)"""
discovered = []
# Common serial ports
common_ports = ["/dev/ttyUSB0", "/dev/ttyUSB1", "/dev/ttyACM0", "/dev/ttyACM1",
"COM1", "COM2", "COM3", "COM4"]
for port in common_ports:
try:
if await self._check_modbus_rtu_device(port):
endpoint = DiscoveredEndpoint(
protocol_type=ProtocolType.MODBUS_RTU,
address=port,
device_id=f"modbus_rtu_{port}",
device_name=f"Modbus RTU Device {port}",
capabilities=["read_coils", "read_registers", "write_registers"]
)
discovered.append(endpoint)
logger.info(f"Discovered Modbus RTU device at {port}")
except Exception as e:
logger.debug(f"Failed to check Modbus RTU port {port}: {e}")
return discovered
async def _discover_opcua(self) -> List[DiscoveredEndpoint]:
"""Discover OPC UA servers on the network"""
discovered = []
# Common OPC UA ports
common_ports = [4840, 4841, 4848]
# Common network ranges
network_ranges = [
"192.168.1.",
"10.0.0.",
"172.16.0.",
]
for network_range in network_ranges:
for i in range(1, 255):
ip_address = f"{network_range}{i}"
for port in common_ports:
try:
if await self._check_opcua_server(ip_address, port):
endpoint = DiscoveredEndpoint(
protocol_type=ProtocolType.OPC_UA,
address=f"opc.tcp://{ip_address}:{port}",
port=port,
device_id=f"opcua_{ip_address}_{port}",
device_name=f"OPC UA Server {ip_address}:{port}",
capabilities=["browse_nodes", "read_values", "write_values", "subscribe"]
)
discovered.append(endpoint)
logger.info(f"Discovered OPC UA server at {ip_address}:{port}")
break
except Exception as e:
logger.debug(f"Failed to connect to OPC UA server {ip_address}:{port}: {e}")
return discovered
async def _discover_rest_api(self) -> List[DiscoveredEndpoint]:
"""Discover REST API endpoints"""
discovered = []
# Common REST API endpoints to check - MODIFIED to include test ports
common_endpoints = [
("http://localhost:8000", "REST API Localhost"),
("http://localhost:8080", "REST API Localhost"),
("http://localhost:8081", "REST API Localhost"),
("http://localhost:8082", "REST API Localhost"),
("http://localhost:8083", "REST API Localhost"),
("http://localhost:8084", "REST API Localhost"),
("http://localhost:3000", "REST API Localhost"),
]
for endpoint, name in common_endpoints:
try:
if await self._check_rest_api_endpoint(endpoint):
discovered_endpoint = DiscoveredEndpoint(
protocol_type=ProtocolType.REST_API,
address=endpoint,
device_id=f"rest_api_{endpoint.replace('://', '_').replace('/', '_')}",
device_name=name,
capabilities=["get", "post", "put", "delete"]
)
discovered.append(discovered_endpoint)
logger.info(f"Discovered REST API endpoint at {endpoint}")
except Exception as e:
logger.debug(f"Failed to check REST API endpoint {endpoint}: {e}")
return discovered
async def _check_modbus_tcp_device(self, ip: str, port: int) -> bool:
"""Check if a Modbus TCP device is available"""
try:
# Simple TCP connection check
reader, writer = await asyncio.wait_for(
asyncio.open_connection(ip, port),
timeout=2.0
)
writer.close()
await writer.wait_closed()
return True
except:
return False
async def _check_modbus_rtu_device(self, port: str) -> bool:
"""Check if a Modbus RTU device is available"""
import os
# Check if serial port exists
if not os.path.exists(port):
return False
# Additional checks could be added here for actual device communication
return True
async def _check_opcua_server(self, ip: str, port: int) -> bool:
"""Check if an OPC UA server is available"""
try:
# Simple TCP connection check
reader, writer = await asyncio.wait_for(
asyncio.open_connection(ip, port),
timeout=2.0
)
writer.close()
await writer.wait_closed()
return True
except:
return False
async def _check_rest_api_endpoint(self, endpoint: str) -> bool:
"""Check if a REST API endpoint is available"""
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, timeout=5) as response:
return response.status < 500 # Consider available if not server error
except:
return False
def get_discovery_status(self) -> Dict[str, Any]:
"""Get current discovery status"""
return {
"is_scanning": self._is_scanning,
"current_scan_id": self._current_scan_id,
"recent_scans": list(self._discovery_results.keys())[-5:], # Last 5 scans
"total_discovered_endpoints": sum(
len(result.discovered_endpoints)
for result in self._discovery_results.values()
)
}
def get_scan_result(self, scan_id: str) -> Optional[DiscoveryResult]:
"""Get result for a specific scan"""
return self._discovery_results.get(scan_id)
def get_recent_discoveries(self, limit: int = 10) -> List[DiscoveredEndpoint]:
"""Get most recently discovered endpoints"""
all_endpoints = []
for result in self._discovery_results.values():
all_endpoints.extend(result.discovered_endpoints)
# Sort by discovery time (most recent first)
all_endpoints.sort(key=lambda x: x.discovered_at, reverse=True)
return all_endpoints[:limit]
# Global discovery service instance
discovery_service = ProtocolDiscoveryService()

63
start-remote-test.sh Normal file
View File

@ -0,0 +1,63 @@
#!/bin/bash
# Calejo Control Adapter - Remote Test Environment
# Starts the dashboard locally but configured to work with remote services
set -e
echo "🚀 Starting Calejo Control Adapter - Remote Test Environment"
echo "=========================================================="
# Check if Python is available
if ! command -v python &> /dev/null; then
echo "❌ Python is not installed or not in PATH"
exit 1
fi
# Check if we're in the right directory
if [ ! -f "start_dashboard.py" ]; then
echo "❌ Please run this script from the calejo-control-adapter directory"
exit 1
fi
# Set environment variables for remote testing
export CALEJO_CONFIG_FILE="config/test-remote.yml"
export CALEJO_LOG_LEVEL="INFO"
export CALEJO_DEBUG="true"
# Test remote services connectivity
echo ""
echo "🔍 Testing remote service connectivity..."
# Test Mock SCADA Service
if curl -s --connect-timeout 5 http://95.111.206.155:8083/health > /dev/null; then
echo "✅ Mock SCADA Service (8083): ACCESSIBLE"
else
echo "❌ Mock SCADA Service (8083): NOT ACCESSIBLE"
fi
# Test Mock Optimizer Service
if curl -s --connect-timeout 5 http://95.111.206.155:8084/health > /dev/null; then
echo "✅ Mock Optimizer Service (8084): ACCESSIBLE"
else
echo "❌ Mock Optimizer Service (8084): NOT ACCESSIBLE"
fi
# Test Existing API
if curl -s --connect-timeout 5 http://95.111.206.155:8080/health > /dev/null; then
echo "✅ Existing Calejo API (8080): ACCESSIBLE"
else
echo "❌ Existing Calejo API (8080): NOT ACCESSIBLE"
fi
echo ""
echo "📊 Starting Dashboard on port 8081..."
echo " - Local Dashboard: http://localhost:8081"
echo " - Remote Services: http://95.111.206.155:8083,8084"
echo " - Discovery API: http://localhost:8081/api/v1/dashboard/discovery"
echo ""
echo "Press Ctrl+C to stop the dashboard"
echo ""
# Start the dashboard
python start_dashboard.py

0
test-deployment.sh Executable file → Normal file
View File

0
test-e2e-deployment.py Executable file → Normal file
View File

143
test-remote-integration.py Normal file
View File

@ -0,0 +1,143 @@
#!/usr/bin/env python3
"""
Test script to verify remote integration with Calejo Control Adapter
Tests discovery and interaction with remote mock services
"""
import requests
import json
import time
import sys
def test_dashboard_health():
"""Test if dashboard is running"""
try:
response = requests.get("http://localhost:8080/health", timeout=5)
if response.status_code == 200:
data = response.json()
print(f"✅ Dashboard health: {data}")
return True
else:
print(f"❌ Dashboard health check failed: {response.status_code}")
return False
except Exception as e:
print(f"❌ Dashboard health check error: {e}")
return False
def test_discovery_scan():
"""Test discovery scan functionality"""
try:
response = requests.post("http://localhost:8080/api/v1/dashboard/discovery/scan", timeout=10)
if response.status_code == 200:
data = response.json()
print(f"✅ Discovery scan started: {data}")
return data.get('scan_id')
else:
print(f"❌ Discovery scan failed: {response.status_code}")
return None
except Exception as e:
print(f"❌ Discovery scan error: {e}")
return None
def test_discovery_status():
"""Test discovery status"""
try:
response = requests.get("http://localhost:8080/api/v1/dashboard/discovery/status", timeout=5)
if response.status_code == 200:
data = response.json()
print(f"✅ Discovery status: {data}")
return data
else:
print(f"❌ Discovery status failed: {response.status_code}")
return None
except Exception as e:
print(f"❌ Discovery status error: {e}")
return None
def test_recent_discoveries():
"""Test recent discoveries endpoint"""
try:
response = requests.get("http://localhost:8080/api/v1/dashboard/discovery/recent", timeout=5)
if response.status_code == 200:
data = response.json()
print(f"✅ Recent discoveries: Found {len(data.get('recent_endpoints', []))} endpoints")
# Show discovered endpoints
for endpoint in data.get('recent_endpoints', []):
print(f" - {endpoint.get('device_name')} ({endpoint.get('address')})")
return data
else:
print(f"❌ Recent discoveries failed: {response.status_code}")
return None
except Exception as e:
print(f"❌ Recent discoveries error: {e}")
return None
def test_remote_services():
"""Test direct access to remote services"""
services = [
("Mock SCADA", "http://95.111.206.155:8083/health"),
("Mock Optimizer", "http://95.111.206.155:8084/health"),
("Existing API", "http://95.111.206.155:8080/health")
]
all_accessible = True
for name, url in services:
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
print(f"{name}: ACCESSIBLE - {response.json()}")
else:
print(f"{name}: NOT ACCESSIBLE - Status {response.status_code}")
all_accessible = False
except Exception as e:
print(f"{name}: ERROR - {e}")
all_accessible = False
return all_accessible
def main():
print("🚀 Calejo Control Adapter - Remote Integration Test")
print("=" * 60)
# Test dashboard health
if not test_dashboard_health():
print("\n❌ Dashboard not accessible. Please start the dashboard first.")
sys.exit(1)
print("\n🔍 Testing remote service connectivity...")
remote_ok = test_remote_services()
print("\n📊 Testing discovery functionality...")
# Start discovery scan
scan_id = test_discovery_scan()
if scan_id:
print(f"\n⏳ Waiting for discovery scan to complete...")
time.sleep(5)
# Check discovery status
status = test_discovery_status()
# Check recent discoveries
discoveries = test_recent_discoveries()
print("\n" + "=" * 60)
print("📋 TEST SUMMARY:")
print(f" Dashboard Health: ✅")
print(f" Remote Services: {'' if remote_ok else ''}")
print(f" Discovery Scan: {'' if scan_id else ''}")
print(f" Endpoints Found: {status.get('status', {}).get('total_discovered_endpoints', 0) if status else 0}")
if status and status.get('status', {}).get('total_discovered_endpoints', 0) >= 2:
print("\n🎉 SUCCESS: Remote integration test passed!")
print(" The system can discover and interact with remote services.")
else:
print("\n⚠️ WARNING: Some tests may have issues.")
else:
print("\n❌ Discovery scan failed. Check dashboard logs.")
if __name__ == "__main__":
main()

56
test_discovery_debug.py Normal file
View File

@ -0,0 +1,56 @@
#!/usr/bin/env python3
"""
Debug test of the discovery service with detailed logging
"""
import asyncio
import sys
import os
import logging
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
# Set up logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
from discovery.protocol_discovery_modified import discovery_service
async def test_discovery_debug():
"""Test discovery service with debug logging"""
print("Starting discovery test with debug logging...")
try:
# Test individual discovery methods
print("\n1. Testing REST API discovery...")
rest_endpoints = await discovery_service._discover_rest_api()
print(f" Found {len(rest_endpoints)} REST endpoints")
print("\n2. Testing Modbus TCP discovery...")
modbus_tcp_endpoints = await discovery_service._discover_modbus_tcp()
print(f" Found {len(modbus_tcp_endpoints)} Modbus TCP endpoints")
print("\n3. Testing Modbus RTU discovery...")
modbus_rtu_endpoints = await discovery_service._discover_modbus_rtu()
print(f" Found {len(modbus_rtu_endpoints)} Modbus RTU endpoints")
print("\n4. Testing OPC UA discovery...")
opcua_endpoints = await discovery_service._discover_opcua()
print(f" Found {len(opcua_endpoints)} OPC UA endpoints")
print("\n5. Testing full discovery...")
result = await discovery_service.discover_all_protocols("test_scan")
print(f"\nDiscovery completed!")
print(f"Total discovered endpoints: {len(result.discovered_endpoints)}")
print(f"Errors: {result.errors}")
for endpoint in result.discovered_endpoints:
print(f" - {endpoint.protocol_type}: {endpoint.address}")
except Exception as e:
print(f"Discovery failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(test_discovery_debug())

35
test_discovery_direct.py Normal file
View File

@ -0,0 +1,35 @@
#!/usr/bin/env python3
"""
Direct test of the discovery service
"""
import asyncio
import sys
import os
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from discovery.protocol_discovery_modified import discovery_service
async def test_discovery():
"""Test discovery service directly"""
print("Starting discovery test...")
try:
# Start discovery
result = await discovery_service.discover_all_protocols("test_scan")
print(f"Discovery completed!")
print(f"Discovered endpoints: {len(result.discovered_endpoints)}")
print(f"Errors: {result.errors}")
for endpoint in result.discovered_endpoints:
print(f" - {endpoint.protocol_type}: {endpoint.address}")
except Exception as e:
print(f"Discovery failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(test_discovery())

View File

@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""
Test script to verify discovery API endpoints are working
"""
import requests
import json
def test_discovery_endpoints(base_url="http://localhost:8081"):
"""Test all discovery API endpoints"""
print(f"Testing discovery endpoints at {base_url}")
print("=" * 60)
# Test 1: Discovery status
print("\n1. Testing discovery status endpoint...")
try:
response = requests.get(f"{base_url}/api/v1/dashboard/discovery/status")
if response.status_code == 200:
status_data = response.json()
print(f" ✓ Status endpoint working")
print(f" - Is scanning: {status_data['status']['is_scanning']}")
print(f" - Current scan ID: {status_data['status']['current_scan_id']}")
print(f" - Total endpoints: {status_data['status']['total_discovered_endpoints']}")
else:
print(f" ✗ Status endpoint failed: {response.status_code}")
except Exception as e:
print(f" ✗ Status endpoint error: {e}")
# Test 2: Start discovery scan
print("\n2. Testing discovery scan endpoint...")
try:
response = requests.post(f"{base_url}/api/v1/dashboard/discovery/scan")
if response.status_code == 200:
scan_data = response.json()
print(f" ✓ Scan endpoint working")
print(f" - Scan ID: {scan_data.get('scan_id', 'N/A')}")
print(f" - Message: {scan_data.get('message', 'N/A')}")
# Store scan ID for later testing
scan_id = scan_data.get('scan_id')
else:
print(f" ✗ Scan endpoint failed: {response.status_code}")
scan_id = None
except Exception as e:
print(f" ✗ Scan endpoint error: {e}")
scan_id = None
# Test 3: Check scan status after starting
if scan_id:
print(f"\n3. Checking scan status for {scan_id}...")
import time
time.sleep(2) # Wait a bit for scan to start
try:
response = requests.get(f"{base_url}/api/v1/dashboard/discovery/status")
if response.status_code == 200:
status_data = response.json()
print(f" ✓ Status check working")
print(f" - Is scanning: {status_data['status']['is_scanning']}")
print(f" - Current scan ID: {status_data['status']['current_scan_id']}")
else:
print(f" ✗ Status check failed: {response.status_code}")
except Exception as e:
print(f" ✗ Status check error: {e}")
# Test 4: Test discovery results endpoint (even if no results yet)
print("\n4. Testing discovery results endpoint...")
test_scan_id = "test_scan_123"
try:
response = requests.get(f"{base_url}/api/v1/dashboard/discovery/results/{test_scan_id}")
if response.status_code == 404:
print(f" ✓ Results endpoint working (correctly returned 404 for non-existent scan)")
elif response.status_code == 200:
print(f" ✓ Results endpoint working")
results_data = response.json()
print(f" - Scan ID: {results_data.get('scan_id', 'N/A')}")
print(f" - Status: {results_data.get('status', 'N/A')}")
print(f" - Endpoints found: {len(results_data.get('discovered_endpoints', []))}")
else:
print(f" ✗ Results endpoint unexpected response: {response.status_code}")
except Exception as e:
print(f" ✗ Results endpoint error: {e}")
print("\n" + "=" * 60)
print("Discovery API testing completed!")
if __name__ == "__main__":
# Test local test environment
test_discovery_endpoints("http://localhost:8081")

40
test_discovery_fast.py Normal file
View File

@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test the fast discovery service
"""
import asyncio
import sys
import os
import logging
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
# Set up logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
from discovery.protocol_discovery_fast import discovery_service
async def test_discovery_fast():
"""Test fast discovery service"""
print("Starting fast discovery test...")
try:
# Test full discovery
print("\nTesting full discovery...")
result = await discovery_service.discover_all_protocols("fast_test_scan")
print(f"\nDiscovery completed in {result.scan_duration:.2f} seconds!")
print(f"Total discovered endpoints: {len(result.discovered_endpoints)}")
print(f"Errors: {result.errors}")
for endpoint in result.discovered_endpoints:
print(f" - {endpoint.protocol_type}: {endpoint.address}")
except Exception as e:
print(f"Discovery failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(test_discovery_fast())

0
validate-deployment.sh Executable file → Normal file
View File