CalejoControl/test_migration.py

160 lines
5.6 KiB
Python

#!/usr/bin/env python3
"""
Migration Test Script
Tests the simplified signal name + tags architecture
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.dashboard.simplified_models import (
ProtocolSignalCreate, ProtocolType, SignalDiscoveryResult
)
from src.dashboard.simplified_configuration_manager import simplified_configuration_manager
def test_simplified_models():
    """Walk one discovered signal through the simplified configuration manager.

    The steps run in order: convert a ``SignalDiscoveryResult`` into a
    create-request, validate it, add it, list the stored signals, search by
    tag, and list every known tag.

    NOTE(review): every result is printed but nothing is asserted, so this
    function only fails when one of the calls or lookups raises.
    """
    print("\n=== Testing Simplified Models ===")

    # Step 1: turn a discovery result into a create-request.
    print("\n1. Testing discovery result conversion:")
    discovered_fields = {
        "device_name": "Water Pump Controller",
        "protocol_type": ProtocolType.MODBUS_TCP,
        "protocol_address": "40001",
        "data_point": "Speed",
        "device_address": "192.168.1.100",
    }
    new_signal = SignalDiscoveryResult(**discovered_fields).to_protocol_signal_create()
    print(f" Signal Name: {new_signal.signal_name}")
    print(f" Tags: {new_signal.tags}")
    print(f" Protocol: {new_signal.protocol_type}")
    print(f" Address: {new_signal.protocol_address}")
    print(f" DB Source: {new_signal.db_source}")

    # All remaining steps go through the shared configuration manager.
    manager = simplified_configuration_manager

    # Step 2: validate the create-request.
    print("\n2. Testing validation:")
    report = manager.validate_signal_configuration(new_signal)
    print(f" Valid: {report['valid']}")
    print(f" Errors: {report['errors']}")
    print(f" Warnings: {report['warnings']}")

    # Step 3: store the signal.
    print("\n3. Testing signal creation:")
    created = manager.add_protocol_signal(new_signal)
    print(f" Signal created: {created}")

    # Step 4: read back everything the manager currently holds.
    print("\n4. Testing signal retrieval:")
    stored_signals = manager.get_protocol_signals()
    print(f" Number of signals: {len(stored_signals)}")
    for entry in stored_signals:
        print(f" - {entry.signal_name} ({entry.signal_id})")

    # Step 5: look signals up by tag.
    print("\n5. Testing tag-based filtering:")
    pump_matches = manager.search_signals_by_tags(["equipment:pump"])
    print(f" Pump signals: {len(pump_matches)}")

    # Step 6: enumerate every tag known to the manager.
    print("\n6. Testing tag collection:")
    tag_inventory = manager.get_all_tags()
    print(f" All tags: {tag_inventory}")
def test_migration_scenarios():
    """Create one signal for each sample protocol scenario and print the result.

    For every scenario a ``SignalDiscoveryResult`` is built, converted with
    ``to_protocol_signal_create()``, and handed to
    ``simplified_configuration_manager.add_protocol_signal``. The returned
    value, the generated signal name, and a preview of up to three tags are
    printed.

    NOTE(review): the value returned by ``add_protocol_signal`` is only
    printed, never checked, so a rejected signal does not fail this function.
    """
    print("\n=== Testing Migration Scenarios ===")

    scenarios = [
        {
            "name": "Modbus Pump Speed",
            "device_name": "Main Water Pump",
            "protocol_type": ProtocolType.MODBUS_TCP,
            "data_point": "Speed",
            "protocol_address": "40001",
        },
        {
            "name": "OPC UA Temperature",
            "device_name": "Boiler Temperature Sensor",
            "protocol_type": ProtocolType.OPCUA,
            "data_point": "Temperature",
            "protocol_address": "ns=2;s=Temperature",
        },
        {
            "name": "REST API Status",
            "device_name": "System Controller",
            "protocol_type": ProtocolType.REST_API,
            "data_point": "Status",
            "protocol_address": "/api/v1/system/status",
        },
    ]

    # Number of tags shown in the preview line before truncating.
    max_tags_shown = 3

    for scenario in scenarios:
        print(f"\nScenario: {scenario['name']}")
        discovery = SignalDiscoveryResult(
            device_name=scenario["device_name"],
            protocol_type=scenario["protocol_type"],
            protocol_address=scenario["protocol_address"],
            data_point=scenario["data_point"],
        )
        signal_create = discovery.to_protocol_signal_create()
        success = simplified_configuration_manager.add_protocol_signal(signal_create)
        print(f" Created: {success}")
        print(f" Signal: {signal_create.signal_name}")

        # Only show the ellipsis when tags were actually cut off; previously
        # "..." was appended unconditionally, implying hidden tags even when
        # the signal had three or fewer (or none at all).
        tags = list(signal_create.tags or [])
        preview = ', '.join(tags[:max_tags_shown])
        if len(tags) > max_tags_shown:
            preview += "..."
        print(f" Tags: {preview}")
def compare_complexity():
    """Print a fixed, human-readable comparison of the old and new signal schemas.

    The text is static: it lists the fields required by the ID-based approach
    and by the name-plus-tags approach, one line per ``print`` call.
    """
    report_lines = (
        "\n=== Complexity Comparison ===",
        # Legacy, ID-based configuration.
        "\nOLD APPROACH (Complex IDs):",
        " Required fields:",
        " - station_id: 'station_main'",
        " - equipment_id: 'pump_primary'",
        " - data_type_id: 'speed_pump'",
        " - protocol_address: '40001'",
        " - db_source: 'measurements.pump_speed'",
        " Issues: Complex relationships, redundant IDs, confusing UX",
        # Simplified, name-plus-tags configuration.
        "\nNEW APPROACH (Simple Names + Tags):",
        " Required fields:",
        " - signal_name: 'Main Water Pump Speed'",
        " - tags: ['equipment:pump', 'protocol:modbus_tcp', 'data_point:speed']",
        " - protocol_address: '40001'",
        " - db_source: 'measurements.main_water_pump_speed'",
        " Benefits: Intuitive, flexible, simpler relationships",
    )
    for line in report_lines:
        print(line)
def main():
    """Run every migration check in sequence and report the overall outcome.

    Returns:
        0 when all three steps and the closing summary finish without raising;
        1 when any exception is caught, after printing its message and traceback.
    """
    import traceback

    rule = "=" * 50
    print("Calejo Control Migration Test")
    print(rule)

    exit_code = 0
    try:
        test_simplified_models()
        test_migration_scenarios()
        compare_complexity()

        print("\n" + rule)
        print("✅ All migration tests completed successfully!")
        print("\nMigration Benefits:")
        for benefit in (
            " • Simplified user experience",
            " • Flexible tag-based organization",
            " • Intuitive signal names",
            " • Reduced complexity",
            " • Better discovery integration",
        ):
            print(benefit)
    except Exception as exc:
        # Top-level boundary: report the failure and signal it via the exit code.
        print(f"\n❌ Migration test failed: {exc}")
        traceback.print_exc()
        exit_code = 1
    return exit_code
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())