160 lines
5.6 KiB
Python
160 lines
5.6 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
Migration Test Script
|
||
|
|
Tests the simplified signal name + tags architecture
|
||
|
|
"""
|
||
|
|
|
||
|
|
import sys
|
||
|
|
import os
|
||
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
|
|
||
|
|
from src.dashboard.simplified_models import (
|
||
|
|
ProtocolSignalCreate, ProtocolType, SignalDiscoveryResult
|
||
|
|
)
|
||
|
|
from src.dashboard.simplified_configuration_manager import simplified_configuration_manager
|
||
|
|
|
||
|
|
def test_simplified_models():
|
||
|
|
"""Test the new simplified models"""
|
||
|
|
print("\n=== Testing Simplified Models ===")
|
||
|
|
|
||
|
|
# Test 1: Create from discovery result
|
||
|
|
print("\n1. Testing discovery result conversion:")
|
||
|
|
discovery = SignalDiscoveryResult(
|
||
|
|
device_name="Water Pump Controller",
|
||
|
|
protocol_type=ProtocolType.MODBUS_TCP,
|
||
|
|
protocol_address="40001",
|
||
|
|
data_point="Speed",
|
||
|
|
device_address="192.168.1.100"
|
||
|
|
)
|
||
|
|
|
||
|
|
signal_create = discovery.to_protocol_signal_create()
|
||
|
|
print(f" Signal Name: {signal_create.signal_name}")
|
||
|
|
print(f" Tags: {signal_create.tags}")
|
||
|
|
print(f" Protocol: {signal_create.protocol_type}")
|
||
|
|
print(f" Address: {signal_create.protocol_address}")
|
||
|
|
print(f" DB Source: {signal_create.db_source}")
|
||
|
|
|
||
|
|
# Test 2: Validation
|
||
|
|
print("\n2. Testing validation:")
|
||
|
|
validation = simplified_configuration_manager.validate_signal_configuration(signal_create)
|
||
|
|
print(f" Valid: {validation['valid']}")
|
||
|
|
print(f" Errors: {validation['errors']}")
|
||
|
|
print(f" Warnings: {validation['warnings']}")
|
||
|
|
|
||
|
|
# Test 3: Add signal
|
||
|
|
print("\n3. Testing signal creation:")
|
||
|
|
success = simplified_configuration_manager.add_protocol_signal(signal_create)
|
||
|
|
print(f" Signal created: {success}")
|
||
|
|
|
||
|
|
# Test 4: Retrieve signals
|
||
|
|
print("\n4. Testing signal retrieval:")
|
||
|
|
signals = simplified_configuration_manager.get_protocol_signals()
|
||
|
|
print(f" Number of signals: {len(signals)}")
|
||
|
|
for signal in signals:
|
||
|
|
print(f" - {signal.signal_name} ({signal.signal_id})")
|
||
|
|
|
||
|
|
# Test 5: Tag-based filtering
|
||
|
|
print("\n5. Testing tag-based filtering:")
|
||
|
|
pump_signals = simplified_configuration_manager.search_signals_by_tags(["equipment:pump"])
|
||
|
|
print(f" Pump signals: {len(pump_signals)}")
|
||
|
|
|
||
|
|
# Test 6: All tags
|
||
|
|
print("\n6. Testing tag collection:")
|
||
|
|
all_tags = simplified_configuration_manager.get_all_tags()
|
||
|
|
print(f" All tags: {all_tags}")
|
||
|
|
|
||
|
|
def test_migration_scenarios():
|
||
|
|
"""Test various migration scenarios"""
|
||
|
|
print("\n=== Testing Migration Scenarios ===")
|
||
|
|
|
||
|
|
scenarios = [
|
||
|
|
{
|
||
|
|
"name": "Modbus Pump Speed",
|
||
|
|
"device_name": "Main Water Pump",
|
||
|
|
"protocol_type": ProtocolType.MODBUS_TCP,
|
||
|
|
"data_point": "Speed",
|
||
|
|
"protocol_address": "40001"
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"name": "OPC UA Temperature",
|
||
|
|
"device_name": "Boiler Temperature Sensor",
|
||
|
|
"protocol_type": ProtocolType.OPCUA,
|
||
|
|
"data_point": "Temperature",
|
||
|
|
"protocol_address": "ns=2;s=Temperature"
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"name": "REST API Status",
|
||
|
|
"device_name": "System Controller",
|
||
|
|
"protocol_type": ProtocolType.REST_API,
|
||
|
|
"data_point": "Status",
|
||
|
|
"protocol_address": "/api/v1/system/status"
|
||
|
|
}
|
||
|
|
]
|
||
|
|
|
||
|
|
for scenario in scenarios:
|
||
|
|
print(f"\nScenario: {scenario['name']}")
|
||
|
|
|
||
|
|
discovery = SignalDiscoveryResult(
|
||
|
|
device_name=scenario["device_name"],
|
||
|
|
protocol_type=scenario["protocol_type"],
|
||
|
|
protocol_address=scenario["protocol_address"],
|
||
|
|
data_point=scenario["data_point"]
|
||
|
|
)
|
||
|
|
|
||
|
|
signal_create = discovery.to_protocol_signal_create()
|
||
|
|
success = simplified_configuration_manager.add_protocol_signal(signal_create)
|
||
|
|
|
||
|
|
print(f" Created: {success}")
|
||
|
|
print(f" Signal: {signal_create.signal_name}")
|
||
|
|
print(f" Tags: {', '.join(signal_create.tags[:3])}...")
|
||
|
|
|
||
|
|
def compare_complexity():
|
||
|
|
"""Compare old vs new approach complexity"""
|
||
|
|
print("\n=== Complexity Comparison ===")
|
||
|
|
|
||
|
|
print("\nOLD APPROACH (Complex IDs):")
|
||
|
|
print(" Required fields:")
|
||
|
|
print(" - station_id: 'station_main'")
|
||
|
|
print(" - equipment_id: 'pump_primary'")
|
||
|
|
print(" - data_type_id: 'speed_pump'")
|
||
|
|
print(" - protocol_address: '40001'")
|
||
|
|
print(" - db_source: 'measurements.pump_speed'")
|
||
|
|
print(" Issues: Complex relationships, redundant IDs, confusing UX")
|
||
|
|
|
||
|
|
print("\nNEW APPROACH (Simple Names + Tags):")
|
||
|
|
print(" Required fields:")
|
||
|
|
print(" - signal_name: 'Main Water Pump Speed'")
|
||
|
|
print(" - tags: ['equipment:pump', 'protocol:modbus_tcp', 'data_point:speed']")
|
||
|
|
print(" - protocol_address: '40001'")
|
||
|
|
print(" - db_source: 'measurements.main_water_pump_speed'")
|
||
|
|
print(" Benefits: Intuitive, flexible, simpler relationships")
|
||
|
|
|
||
|
|
def main():
|
||
|
|
"""Run all tests"""
|
||
|
|
print("Calejo Control Migration Test")
|
||
|
|
print("=" * 50)
|
||
|
|
|
||
|
|
try:
|
||
|
|
test_simplified_models()
|
||
|
|
test_migration_scenarios()
|
||
|
|
compare_complexity()
|
||
|
|
|
||
|
|
print("\n" + "=" * 50)
|
||
|
|
print("✅ All migration tests completed successfully!")
|
||
|
|
print("\nMigration Benefits:")
|
||
|
|
print(" • Simplified user experience")
|
||
|
|
print(" • Flexible tag-based organization")
|
||
|
|
print(" • Intuitive signal names")
|
||
|
|
print(" • Reduced complexity")
|
||
|
|
print(" • Better discovery integration")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f"\n❌ Migration test failed: {e}")
|
||
|
|
import traceback
|
||
|
|
traceback.print_exc()
|
||
|
|
return 1
|
||
|
|
|
||
|
|
return 0
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
sys.exit(main())
|