#!/usr/bin/env python3 """ Migration Test Script Tests the simplified signal name + tags architecture """ import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.dashboard.simplified_models import ( ProtocolSignalCreate, ProtocolType, SignalDiscoveryResult ) from src.dashboard.simplified_configuration_manager import simplified_configuration_manager def test_simplified_models(): """Test the new simplified models""" print("\n=== Testing Simplified Models ===") # Test 1: Create from discovery result print("\n1. Testing discovery result conversion:") discovery = SignalDiscoveryResult( device_name="Water Pump Controller", protocol_type=ProtocolType.MODBUS_TCP, protocol_address="40001", data_point="Speed", device_address="192.168.1.100" ) signal_create = discovery.to_protocol_signal_create() print(f" Signal Name: {signal_create.signal_name}") print(f" Tags: {signal_create.tags}") print(f" Protocol: {signal_create.protocol_type}") print(f" Address: {signal_create.protocol_address}") print(f" DB Source: {signal_create.db_source}") # Test 2: Validation print("\n2. Testing validation:") validation = simplified_configuration_manager.validate_signal_configuration(signal_create) print(f" Valid: {validation['valid']}") print(f" Errors: {validation['errors']}") print(f" Warnings: {validation['warnings']}") # Test 3: Add signal print("\n3. Testing signal creation:") success = simplified_configuration_manager.add_protocol_signal(signal_create) print(f" Signal created: {success}") # Test 4: Retrieve signals print("\n4. Testing signal retrieval:") signals = simplified_configuration_manager.get_protocol_signals() print(f" Number of signals: {len(signals)}") for signal in signals: print(f" - {signal.signal_name} ({signal.signal_id})") # Test 5: Tag-based filtering print("\n5. Testing tag-based filtering:") pump_signals = simplified_configuration_manager.search_signals_by_tags(["equipment:pump"]) print(f" Pump signals: {len(pump_signals)}") # Test 6: All tags print("\n6. Testing tag collection:") all_tags = simplified_configuration_manager.get_all_tags() print(f" All tags: {all_tags}") def test_migration_scenarios(): """Test various migration scenarios""" print("\n=== Testing Migration Scenarios ===") scenarios = [ { "name": "Modbus Pump Speed", "device_name": "Main Water Pump", "protocol_type": ProtocolType.MODBUS_TCP, "data_point": "Speed", "protocol_address": "40001" }, { "name": "OPC UA Temperature", "device_name": "Boiler Temperature Sensor", "protocol_type": ProtocolType.OPCUA, "data_point": "Temperature", "protocol_address": "ns=2;s=Temperature" }, { "name": "REST API Status", "device_name": "System Controller", "protocol_type": ProtocolType.REST_API, "data_point": "Status", "protocol_address": "/api/v1/system/status" } ] for scenario in scenarios: print(f"\nScenario: {scenario['name']}") discovery = SignalDiscoveryResult( device_name=scenario["device_name"], protocol_type=scenario["protocol_type"], protocol_address=scenario["protocol_address"], data_point=scenario["data_point"] ) signal_create = discovery.to_protocol_signal_create() success = simplified_configuration_manager.add_protocol_signal(signal_create) print(f" Created: {success}") print(f" Signal: {signal_create.signal_name}") print(f" Tags: {', '.join(signal_create.tags[:3])}...") def compare_complexity(): """Compare old vs new approach complexity""" print("\n=== Complexity Comparison ===") print("\nOLD APPROACH (Complex IDs):") print(" Required fields:") print(" - station_id: 'station_main'") print(" - equipment_id: 'pump_primary'") print(" - data_type_id: 'speed_pump'") print(" - protocol_address: '40001'") print(" - db_source: 'measurements.pump_speed'") print(" Issues: Complex relationships, redundant IDs, confusing UX") print("\nNEW APPROACH (Simple Names + Tags):") print(" Required fields:") print(" - signal_name: 'Main Water Pump Speed'") print(" - tags: ['equipment:pump', 'protocol:modbus_tcp', 'data_point:speed']") print(" - protocol_address: '40001'") print(" - db_source: 'measurements.main_water_pump_speed'") print(" Benefits: Intuitive, flexible, simpler relationships") def main(): """Run all tests""" print("Calejo Control Migration Test") print("=" * 50) try: test_simplified_models() test_migration_scenarios() compare_complexity() print("\n" + "=" * 50) print("✅ All migration tests completed successfully!") print("\nMigration Benefits:") print(" • Simplified user experience") print(" • Flexible tag-based organization") print(" • Intuitive signal names") print(" • Reduced complexity") print(" • Better discovery integration") except Exception as e: print(f"\n❌ Migration test failed: {e}") import traceback traceback.print_exc() return 1 return 0 if __name__ == "__main__": sys.exit(main())