202 lines
7.7 KiB
Python
202 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test API Integration for Simplified Protocol Signals
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import asyncio
|
|
import json
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from src.dashboard.simplified_models import ProtocolSignalCreate, ProtocolType
|
|
from src.dashboard.simplified_configuration_manager import simplified_configuration_manager
|
|
|
|
async def test_api_endpoints():
|
|
"""Test the API endpoints through the configuration manager"""
|
|
print("\n=== Testing API Integration ===")
|
|
|
|
# Test 1: Create signals
|
|
print("\n1. Creating test signals:")
|
|
test_signals = [
|
|
{
|
|
"signal_name": "Boiler Temperature Reading",
|
|
"tags": ["equipment:boiler", "protocol:modbus_tcp", "data_point:temperature", "unit:celsius"],
|
|
"protocol_type": "modbus_tcp",
|
|
"protocol_address": "30001",
|
|
"db_source": "measurements.boiler_temperature"
|
|
},
|
|
{
|
|
"signal_name": "Pump Motor Status",
|
|
"tags": ["equipment:pump", "protocol:opcua", "data_point:status", "type:boolean"],
|
|
"protocol_type": "opcua",
|
|
"protocol_address": "ns=2;s=PumpStatus",
|
|
"db_source": "measurements.pump_status"
|
|
},
|
|
{
|
|
"signal_name": "System Pressure",
|
|
"tags": ["equipment:system", "protocol:modbus_tcp", "data_point:pressure", "unit:psi"],
|
|
"protocol_type": "modbus_tcp",
|
|
"protocol_address": "30002",
|
|
"db_source": "measurements.system_pressure"
|
|
}
|
|
]
|
|
|
|
created_signals = []
|
|
for signal_data in test_signals:
|
|
signal_create = ProtocolSignalCreate(
|
|
signal_name=signal_data["signal_name"],
|
|
tags=signal_data["tags"],
|
|
protocol_type=ProtocolType(signal_data["protocol_type"]),
|
|
protocol_address=signal_data["protocol_address"],
|
|
db_source=signal_data["db_source"]
|
|
)
|
|
|
|
success = simplified_configuration_manager.add_protocol_signal(signal_create)
|
|
if success:
|
|
# Get the actual signal ID that was used
|
|
signal_id = signal_create.generate_signal_id()
|
|
signal = simplified_configuration_manager.get_protocol_signal(signal_id)
|
|
if signal:
|
|
created_signals.append(signal)
|
|
print(f" ✓ Created: {signal.signal_name}")
|
|
else:
|
|
print(f" ⚠ Created but cannot retrieve: {signal_data['signal_name']}")
|
|
else:
|
|
print(f" ✗ Failed to create: {signal_data['signal_name']}")
|
|
|
|
# Test 2: Get all signals
|
|
print("\n2. Getting all signals:")
|
|
all_signals = simplified_configuration_manager.get_protocol_signals()
|
|
print(f" Total signals: {len(all_signals)}")
|
|
for signal in all_signals:
|
|
print(f" - {signal.signal_name} ({signal.protocol_type.value})")
|
|
|
|
# Test 3: Filter by tags
|
|
print("\n3. Filtering by tags:")
|
|
modbus_signals = simplified_configuration_manager.search_signals_by_tags(["protocol:modbus_tcp"])
|
|
print(f" Modbus signals: {len(modbus_signals)}")
|
|
for signal in modbus_signals:
|
|
print(f" - {signal.signal_name}")
|
|
|
|
# Test 4: Get all tags
|
|
print("\n4. Getting all tags:")
|
|
all_tags = simplified_configuration_manager.get_all_tags()
|
|
print(f" All tags: {all_tags}")
|
|
|
|
# Test 5: Update a signal
|
|
print("\n5. Updating a signal:")
|
|
if created_signals:
|
|
signal_to_update = created_signals[0]
|
|
print(f" Updating: {signal_to_update.signal_name}")
|
|
|
|
from src.dashboard.simplified_models import ProtocolSignalUpdate
|
|
update_data = ProtocolSignalUpdate(
|
|
signal_name="Updated Boiler Temperature",
|
|
tags=["equipment:boiler", "protocol:modbus_tcp", "data_point:temperature", "unit:celsius", "updated:true"]
|
|
)
|
|
|
|
success = simplified_configuration_manager.update_protocol_signal(signal_to_update.signal_id, update_data)
|
|
if success:
|
|
updated_signal = simplified_configuration_manager.get_protocol_signal(signal_to_update.signal_id)
|
|
print(f" ✓ Updated to: {updated_signal.signal_name}")
|
|
print(f" New tags: {updated_signal.tags}")
|
|
else:
|
|
print(f" ✗ Failed to update")
|
|
|
|
# Test 6: Delete a signal
|
|
print("\n6. Deleting a signal:")
|
|
if len(created_signals) > 1:
|
|
signal_to_delete = created_signals[1]
|
|
print(f" Deleting: {signal_to_delete.signal_name}")
|
|
|
|
success = simplified_configuration_manager.delete_protocol_signal(signal_to_delete.signal_id)
|
|
if success:
|
|
print(f" ✓ Deleted successfully")
|
|
else:
|
|
print(f" ✗ Failed to delete")
|
|
|
|
# Test 7: Get remaining signals
|
|
print("\n7. Final signal count:")
|
|
final_signals = simplified_configuration_manager.get_protocol_signals()
|
|
print(f" Remaining signals: {len(final_signals)}")
|
|
|
|
return len(final_signals) > 0
|
|
|
|
def test_api_compatibility():
|
|
"""Test that the new API is compatible with discovery results"""
|
|
print("\n=== Testing Discovery Compatibility ===")
|
|
|
|
from src.dashboard.simplified_models import SignalDiscoveryResult
|
|
|
|
# Simulate discovery results
|
|
discovery_results = [
|
|
{
|
|
"device_name": "Flow Meter",
|
|
"protocol_type": "modbus_tcp",
|
|
"protocol_address": "30003",
|
|
"data_point": "Flow Rate",
|
|
"device_address": "192.168.1.105"
|
|
},
|
|
{
|
|
"device_name": "Level Sensor",
|
|
"protocol_type": "opcua",
|
|
"protocol_address": "ns=2;s=Level",
|
|
"data_point": "Tank Level",
|
|
"device_address": "192.168.1.106"
|
|
}
|
|
]
|
|
|
|
for discovery_data in discovery_results:
|
|
discovery = SignalDiscoveryResult(**discovery_data)
|
|
signal_create = discovery.to_protocol_signal_create()
|
|
|
|
print(f"\nDiscovery: {discovery.device_name}")
|
|
print(f" Signal Name: {signal_create.signal_name}")
|
|
print(f" Tags: {signal_create.tags}")
|
|
print(f" Protocol: {signal_create.protocol_type.value}")
|
|
print(f" Address: {signal_create.protocol_address}")
|
|
print(f" DB Source: {signal_create.db_source}")
|
|
|
|
# Validate
|
|
validation = simplified_configuration_manager.validate_signal_configuration(signal_create)
|
|
print(f" Valid: {validation['valid']}")
|
|
if validation['warnings']:
|
|
print(f" Warnings: {validation['warnings']}")
|
|
|
|
def main():
|
|
"""Run all API integration tests"""
|
|
print("Calejo Control API Integration Test")
|
|
print("=" * 50)
|
|
|
|
try:
|
|
# Run async tests
|
|
success = asyncio.run(test_api_endpoints())
|
|
|
|
# Run compatibility tests
|
|
test_api_compatibility()
|
|
|
|
print("\n" + "=" * 50)
|
|
if success:
|
|
print("✅ All API integration tests completed successfully!")
|
|
print("\nAPI Endpoints Available:")
|
|
print(" • GET /api/v1/dashboard/protocol-signals")
|
|
print(" • GET /api/v1/dashboard/protocol-signals/{signal_id}")
|
|
print(" • POST /api/v1/dashboard/protocol-signals")
|
|
print(" • PUT /api/v1/dashboard/protocol-signals/{signal_id}")
|
|
print(" • DELETE /api/v1/dashboard/protocol-signals/{signal_id}")
|
|
print(" • GET /api/v1/dashboard/protocol-signals/tags/all")
|
|
else:
|
|
print("❌ Some API integration tests failed")
|
|
return 1
|
|
|
|
except Exception as e:
|
|
print(f"\n❌ API integration test failed: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return 1
|
|
|
|
return 0
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main()) |