CalejoControl/src/dashboard/api.py

1214 lines
48 KiB
Python
Raw Normal View History

"""
Dashboard API for Calejo Control Adapter
Provides REST endpoints for configuration management and system monitoring
"""
import json
import logging
from typing import Dict, Any, List, Optional
from fastapi import APIRouter, HTTPException, BackgroundTasks
from fastapi.responses import HTMLResponse
from pydantic import BaseModel, ValidationError
from config.settings import Settings
from .configuration_manager import (
configuration_manager, OPCUAConfig, ModbusTCPConfig, PumpStationConfig,
PumpConfig, SafetyLimitsConfig, DataPointMapping, ProtocolType, ProtocolMapping
)
from src.discovery.protocol_discovery_fast import discovery_service, DiscoveryStatus, DiscoveredEndpoint
from datetime import datetime
logger = logging.getLogger(__name__)
# API Router
dashboard_router = APIRouter(prefix="/api/v1/dashboard", tags=["dashboard"])
# Pydantic models for configuration
class DatabaseConfig(BaseModel):
db_host: str = "localhost"
db_port: int = 5432
db_name: str = "calejo"
db_user: str = "calejo"
db_password: str = ""
class OPCUAConfig(BaseModel):
enabled: bool = True
host: str = "localhost"
port: int = 4840
class ModbusConfig(BaseModel):
enabled: bool = True
host: str = "localhost"
port: int = 502
unit_id: int = 1
class RESTAPIConfig(BaseModel):
enabled: bool = True
host: str = "0.0.0.0"
port: int = 8080
cors_enabled: bool = True
class MonitoringConfig(BaseModel):
health_monitor_port: int = 9090
metrics_enabled: bool = True
class SecurityConfig(BaseModel):
jwt_secret_key: str = ""
api_key: str = ""
class SystemConfig(BaseModel):
database: DatabaseConfig
opcua: OPCUAConfig
modbus: ModbusConfig
rest_api: RESTAPIConfig
monitoring: MonitoringConfig
security: SecurityConfig
class ValidationResult(BaseModel):
valid: bool
errors: List[str] = []
warnings: List[str] = []
class DashboardStatus(BaseModel):
application_status: str
database_status: str
opcua_status: str
modbus_status: str
rest_api_status: str
monitoring_status: str
# Global settings instance
settings = Settings()
@dashboard_router.get("/config", response_model=SystemConfig)
async def get_configuration():
"""Get current system configuration"""
try:
config = SystemConfig(
database=DatabaseConfig(
db_host=settings.db_host,
db_port=settings.db_port,
db_name=settings.db_name,
db_user=settings.db_user,
db_password="********" # Don't expose actual password
),
opcua=OPCUAConfig(
enabled=settings.opcua_enabled,
host=settings.opcua_host,
port=settings.opcua_port
),
modbus=ModbusConfig(
enabled=settings.modbus_enabled,
host=settings.modbus_host,
port=settings.modbus_port,
unit_id=settings.modbus_unit_id
),
rest_api=RESTAPIConfig(
enabled=settings.rest_api_enabled,
host=settings.rest_api_host,
port=settings.rest_api_port,
cors_enabled=settings.rest_api_cors_enabled
),
monitoring=MonitoringConfig(
health_monitor_port=settings.health_monitor_port,
metrics_enabled=True
),
security=SecurityConfig(
jwt_secret_key="********",
api_key="********"
)
)
return config
except Exception as e:
logger.error(f"Error getting configuration: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to retrieve configuration")
@dashboard_router.post("/config", response_model=ValidationResult)
async def update_configuration(config: SystemConfig, background_tasks: BackgroundTasks):
"""Update system configuration"""
try:
# Validate configuration
validation_result = validate_configuration(config)
if validation_result.valid:
# Save configuration in background
background_tasks.add_task(save_configuration, config)
return validation_result
except Exception as e:
logger.error(f"Error updating configuration: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to update configuration")
@dashboard_router.get("/status", response_model=DashboardStatus)
async def get_system_status():
"""Get current system status"""
try:
# This would integrate with the health monitor
# For now, return mock status
status = DashboardStatus(
application_status="running",
database_status="connected",
opcua_status="listening",
modbus_status="listening",
rest_api_status="running",
monitoring_status="active"
)
return status
except Exception as e:
logger.error(f"Error getting system status: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to retrieve system status")
@dashboard_router.post("/restart")
async def restart_system():
"""Restart the system (admin only)"""
# This would trigger a system restart
# For now, just log the request
logger.info("System restart requested via dashboard")
return {"message": "Restart request received", "status": "pending"}
@dashboard_router.get("/backup")
async def create_backup():
"""Create a system backup"""
# This would trigger the backup script
logger.info("Backup requested via dashboard")
return {"message": "Backup initiated", "status": "in_progress"}
@dashboard_router.get("/logs")
async def get_system_logs(limit: int = 100):
"""Get system logs"""
try:
# This would read from the application logs
# For now, return mock logs
logs = [
{"timestamp": "2024-01-01T10:00:00", "level": "INFO", "message": "System started"},
{"timestamp": "2024-01-01T10:01:00", "level": "INFO", "message": "Database connected"},
{"timestamp": "2024-01-01T10:02:00", "level": "INFO", "message": "OPC UA server started"}
]
return {"logs": logs[:limit]}
except Exception as e:
logger.error(f"Error getting logs: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to retrieve logs")
# Comprehensive Configuration Endpoints
@dashboard_router.post("/configure/protocol/opcua")
async def configure_opcua_protocol(config: OPCUAConfig):
"""Configure OPC UA protocol"""
try:
success = configuration_manager.configure_protocol(config)
if success:
return {"success": True, "message": "OPC UA protocol configured successfully"}
else:
raise HTTPException(status_code=400, detail="Failed to configure OPC UA protocol")
except Exception as e:
logger.error(f"Error configuring OPC UA protocol: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to configure OPC UA protocol: {str(e)}")
@dashboard_router.post("/configure/protocol/modbus-tcp")
async def configure_modbus_tcp_protocol(config: ModbusTCPConfig):
"""Configure Modbus TCP protocol"""
try:
success = configuration_manager.configure_protocol(config)
if success:
return {"success": True, "message": "Modbus TCP protocol configured successfully"}
else:
raise HTTPException(status_code=400, detail="Failed to configure Modbus TCP protocol")
except Exception as e:
logger.error(f"Error configuring Modbus TCP protocol: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to configure Modbus TCP protocol: {str(e)}")
@dashboard_router.post("/configure/station")
async def configure_pump_station(station: PumpStationConfig):
"""Configure a pump station"""
try:
success = configuration_manager.add_pump_station(station)
if success:
return {"success": True, "message": f"Pump station {station.name} configured successfully"}
else:
raise HTTPException(status_code=400, detail="Failed to configure pump station")
except Exception as e:
logger.error(f"Error configuring pump station: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to configure pump station: {str(e)}")
@dashboard_router.post("/configure/pump")
async def configure_pump(pump: PumpConfig):
"""Configure a pump"""
try:
success = configuration_manager.add_pump(pump)
if success:
return {"success": True, "message": f"Pump {pump.name} configured successfully"}
else:
raise HTTPException(status_code=400, detail="Failed to configure pump")
except Exception as e:
logger.error(f"Error configuring pump: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to configure pump: {str(e)}")
@dashboard_router.post("/configure/safety-limits")
async def configure_safety_limits(limits: SafetyLimitsConfig):
"""Configure safety limits for a pump"""
try:
success = configuration_manager.set_safety_limits(limits)
if success:
return {"success": True, "message": f"Safety limits configured for pump {limits.pump_id}"}
else:
raise HTTPException(status_code=400, detail="Failed to configure safety limits")
except Exception as e:
logger.error(f"Error configuring safety limits: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to configure safety limits: {str(e)}")
@dashboard_router.post("/configure/data-mapping")
async def configure_data_mapping(mapping: DataPointMapping):
"""Configure data point mapping"""
try:
success = configuration_manager.map_data_point(mapping)
if success:
return {"success": True, "message": "Data point mapping configured successfully"}
else:
raise HTTPException(status_code=400, detail="Failed to configure data mapping")
except Exception as e:
logger.error(f"Error configuring data mapping: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to configure data mapping: {str(e)}")
@dashboard_router.post("/discover-hardware")
async def discover_hardware():
"""Auto-discover connected hardware"""
try:
result = configuration_manager.auto_discover_hardware()
return {
"success": result.success,
"discovered_stations": [station.dict() for station in result.discovered_stations],
"discovered_pumps": [pump.dict() for pump in result.discovered_pumps],
"errors": result.errors,
"warnings": result.warnings
}
except Exception as e:
logger.error(f"Error during hardware discovery: {str(e)}")
raise HTTPException(status_code=500, detail=f"Hardware discovery failed: {str(e)}")
@dashboard_router.get("/validate-configuration")
async def validate_current_configuration():
"""Validate current configuration"""
try:
validation_result = configuration_manager.validate_configuration()
return validation_result
except Exception as e:
logger.error(f"Error validating configuration: {str(e)}")
raise HTTPException(status_code=500, detail=f"Configuration validation failed: {str(e)}")
@dashboard_router.get("/export-configuration")
async def export_configuration():
"""Export complete configuration"""
try:
config_data = configuration_manager.export_configuration()
return {
"success": True,
"configuration": config_data,
"message": "Configuration exported successfully"
}
except Exception as e:
logger.error(f"Error exporting configuration: {str(e)}")
raise HTTPException(status_code=500, detail=f"Configuration export failed: {str(e)}")
@dashboard_router.post("/import-configuration")
async def import_configuration(config_data: dict):
"""Import configuration from backup"""
try:
success = configuration_manager.import_configuration(config_data)
if success:
return {"success": True, "message": "Configuration imported successfully"}
else:
raise HTTPException(status_code=400, detail="Failed to import configuration")
except Exception as e:
logger.error(f"Error importing configuration: {str(e)}")
raise HTTPException(status_code=500, detail=f"Configuration import failed: {str(e)}")
def validate_configuration(config: SystemConfig) -> ValidationResult:
"""Validate configuration before applying"""
errors = []
warnings = []
# Database validation
if not config.database.db_host:
errors.append("Database host is required")
if not config.database.db_name:
errors.append("Database name is required")
if not config.database.db_user:
errors.append("Database user is required")
# Port validation
if not (1 <= config.database.db_port <= 65535):
errors.append("Database port must be between 1 and 65535")
if not (1 <= config.opcua.port <= 65535):
errors.append("OPC UA port must be between 1 and 65535")
if not (1 <= config.modbus.port <= 65535):
errors.append("Modbus port must be between 1 and 65535")
if not (1 <= config.rest_api.port <= 65535):
errors.append("REST API port must be between 1 and 65535")
if not (1 <= config.monitoring.health_monitor_port <= 65535):
errors.append("Health monitor port must be between 1 and 65535")
# Security warnings
if config.security.jwt_secret_key == "your-secret-key-change-in-production":
warnings.append("Default JWT secret key detected - please change for production")
if config.security.api_key == "your-api-key-here":
warnings.append("Default API key detected - please change for production")
return ValidationResult(
valid=len(errors) == 0,
errors=errors,
warnings=warnings
)
def save_configuration(config: SystemConfig):
"""Save configuration to settings file"""
try:
# This would update the settings file
# For now, just log the configuration
logger.info(f"Configuration update received: {config.json(indent=2)}")
# In a real implementation, this would:
# 1. Update the settings file
# 2. Restart affected services
# 3. Verify the new configuration
except Exception as e:
logger.error(f"Error saving configuration: {str(e)}")
# SCADA Configuration endpoints
@dashboard_router.get("/scada-status")
async def get_scada_status():
"""Get SCADA system status"""
try:
# Mock data for demonstration
return {
"modbus_enabled": True,
"modbus_port": 502,
"opcua_enabled": True,
"opcua_port": 4840,
"device_connections": 3,
"data_acquisition": True,
"last_update": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logger.error(f"Error getting SCADA status: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get SCADA status: {str(e)}")
@dashboard_router.get("/scada-config")
async def get_scada_config():
"""Get current SCADA configuration"""
try:
# Get actual configuration from settings
settings = Settings()
# Build device mapping from default stations and pumps
device_mapping_lines = []
stations = ["STATION_001", "STATION_002"]
pumps_by_station = {
"STATION_001": ["PUMP_001", "PUMP_002"],
"STATION_002": ["PUMP_003"]
}
for station_id in stations:
pumps = pumps_by_station.get(station_id, [])
for pump_id in pumps:
device_mapping_lines.append(f"{station_id},{pump_id},OPCUA,Pump_{pump_id}")
device_mapping_lines.append(f"{station_id},{pump_id},Modbus,Pump_{pump_id}")
device_mapping = "\n".join(device_mapping_lines)
return {
"modbus": {
"enabled": True,
"port": settings.modbus_port,
"slave_id": settings.modbus_unit_id,
"baud_rate": "115200"
},
"opcua": {
"enabled": True,
"port": settings.opcua_port,
"security_mode": "SignAndEncrypt"
},
"device_mapping": device_mapping
}
except Exception as e:
logger.error(f"Error getting SCADA configuration: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get SCADA configuration: {str(e)}")
@dashboard_router.post("/scada-config")
async def save_scada_config(config: dict):
"""Save SCADA configuration"""
try:
# In a real implementation, this would save to configuration
logger.info(f"SCADA configuration saved: {config}")
return {"success": True, "message": "SCADA configuration saved successfully"}
except Exception as e:
logger.error(f"Error saving SCADA configuration: {str(e)}")
return {"success": False, "error": str(e)}
@dashboard_router.get("/test-scada")
async def test_scada_connection():
"""Test SCADA connection"""
try:
import socket
# Test OPC UA connection
settings = Settings()
opcua_success = False
modbus_success = False
# Test OPC UA port
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
result = sock.connect_ex(('localhost', settings.opcua_port))
sock.close()
opcua_success = result == 0
except:
opcua_success = False
# Test Modbus port
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
result = sock.connect_ex(('localhost', settings.modbus_port))
sock.close()
modbus_success = result == 0
except:
modbus_success = False
if opcua_success and modbus_success:
return {"success": True, "message": "SCADA connection test successful - Both OPC UA and Modbus servers are running"}
elif opcua_success:
return {"success": True, "message": "OPC UA server is running, but Modbus server is not accessible"}
elif modbus_success:
return {"success": True, "message": "Modbus server is running, but OPC UA server is not accessible"}
else:
return {"success": False, "error": "Neither OPC UA nor Modbus servers are accessible"}
except Exception as e:
logger.error(f"Error testing SCADA connection: {str(e)}")
return {"success": False, "error": str(e)}
async def _generate_mock_signals(stations: Dict, pumps_by_station: Dict) -> List[Dict[str, Any]]:
"""Generate mock signal data as fallback when protocol servers are not available."""
import random
signals = []
for station_id, station in stations.items():
pumps = pumps_by_station.get(station_id, [])
for pump in pumps:
pump_id = pump['pump_id']
# OPC UA signals for this pump
signals.extend([
{
"name": f"Station_{station_id}_Pump_{pump_id}_Setpoint",
"protocol": "opcua",
"address": f"ns=2;s=Station_{station_id}.Pump_{pump_id}.Setpoint_Hz",
"data_type": "Float",
"current_value": f"{round(random.uniform(0.0, 50.0), 1)} Hz",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_ActualSpeed",
"protocol": "opcua",
"address": f"ns=2;s=Station_{station_id}.Pump_{pump_id}.ActualSpeed_Hz",
"data_type": "Float",
"current_value": f"{round(random.uniform(0.0, 50.0), 1)} Hz",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_Power",
"protocol": "opcua",
"address": f"ns=2;s=Station_{station_id}.Pump_{pump_id}.Power_kW",
"data_type": "Float",
"current_value": f"{round(random.uniform(0.0, 75.0), 1)} kW",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_FlowRate",
"protocol": "opcua",
"address": f"ns=2;s=Station_{station_id}.Pump_{pump_id}.FlowRate_m3h",
"data_type": "Float",
"current_value": f"{round(random.uniform(0.0, 500.0), 1)} m³/h",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_SafetyStatus",
"protocol": "opcua",
"address": f"ns=2;s=Station_{station_id}.Pump_{pump_id}.SafetyStatus",
"data_type": "String",
"current_value": "normal",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
])
# Modbus signals for this pump
# Use consistent register addresses for each pump type
pump_num = int(pump_id.split('_')[1]) # Extract pump number from PUMP_001, PUMP_002, etc.
base_register = 40000 + (pump_num * 10) # Each pump gets 10 registers starting at 40001, 40011, etc.
signals.extend([
{
"name": f"Station_{station_id}_Pump_{pump_id}_Setpoint",
"protocol": "modbus",
"address": f"{base_register + 1}",
"data_type": "Integer",
"current_value": f"{random.randint(0, 500)} Hz (x10)",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_ActualSpeed",
"protocol": "modbus",
"address": f"{base_register + 2}",
"data_type": "Integer",
"current_value": f"{random.randint(0, 500)} Hz (x10)",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_Power",
"protocol": "modbus",
"address": f"{base_register + 3}",
"data_type": "Integer",
"current_value": f"{random.randint(0, 750)} kW (x10)",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_Temperature",
"protocol": "modbus",
"address": f"{base_register + 4}",
"data_type": "Integer",
"current_value": f"{random.randint(20, 35)} °C",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
])
return signals
def _create_fallback_signals(station_id: str, pump_id: str) -> List[Dict[str, Any]]:
"""Create fallback signals when protocol servers are unavailable"""
import random
from datetime import datetime
# Generate realistic mock data
base_setpoint = random.randint(300, 450) # 30-45 Hz
actual_speed = base_setpoint + random.randint(-20, 20)
power = int(actual_speed * 2.5) # Rough power calculation
flow_rate = int(actual_speed * 10) # Rough flow calculation
temperature = random.randint(20, 35) # Normal operating temperature
return [
{
"name": f"Station_{station_id}_Pump_{pump_id}_Setpoint",
"protocol": "opcua",
"address": f"ns=2;s=Station_{station_id}.Pump_{pump_id}.Setpoint_Hz",
"data_type": "Float",
"current_value": f"{base_setpoint / 10:.1f} Hz",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_ActualSpeed",
"protocol": "opcua",
"address": f"ns=2;s=Station_{station_id}.Pump_{pump_id}.ActualSpeed_Hz",
"data_type": "Float",
"current_value": f"{actual_speed / 10:.1f} Hz",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_Power",
"protocol": "opcua",
"address": f"ns=2;s=Station_{station_id}.Pump_{pump_id}.Power_kW",
"data_type": "Float",
"current_value": f"{power / 10:.1f} kW",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_FlowRate",
"protocol": "opcua",
"address": f"ns=2;s=Station_{station_id}.Pump_{pump_id}.FlowRate_m3h",
"data_type": "Float",
"current_value": f"{flow_rate:.1f} m³/h",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_SafetyStatus",
"protocol": "opcua",
"address": f"ns=2;s=Station_{station_id}.Pump_{pump_id}.SafetyStatus",
"data_type": "String",
"current_value": "normal",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_Setpoint",
"protocol": "modbus",
"address": f"{40000 + int(pump_id[-1]) * 10 + 1}",
"data_type": "Integer",
"current_value": f"{base_setpoint} Hz (x10)",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_ActualSpeed",
"protocol": "modbus",
"address": f"{40000 + int(pump_id[-1]) * 10 + 2}",
"data_type": "Integer",
"current_value": f"{actual_speed} Hz (x10)",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_Power",
"protocol": "modbus",
"address": f"{40000 + int(pump_id[-1]) * 10 + 3}",
"data_type": "Integer",
"current_value": f"{power} kW (x10)",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": f"Station_{station_id}_Pump_{pump_id}_Temperature",
"protocol": "modbus",
"address": f"{40000 + int(pump_id[-1]) * 10 + 4}",
"data_type": "Integer",
"current_value": f"{temperature} °C",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
]
# Signal Overview endpoints
@dashboard_router.get("/signals")
async def get_signals():
"""Get overview of all active signals across protocols"""
# Use default stations and pumps since we don't have db access in this context
stations = {
"STATION_001": {"name": "Main Pump Station", "location": "Downtown"},
"STATION_002": {"name": "Secondary Pump Station", "location": "Industrial Area"}
}
pumps_by_station = {
"STATION_001": [
{"pump_id": "PUMP_001", "name": "Primary Pump"},
{"pump_id": "PUMP_002", "name": "Backup Pump"}
],
"STATION_002": [
{"pump_id": "PUMP_003", "name": "Industrial Pump"}
]
}
signals = []
# Try to use real protocol data for both Modbus and OPC UA
try:
from .protocol_clients import ModbusClient, ProtocolDataCollector
# Create protocol data collector
collector = ProtocolDataCollector()
# Collect data from all protocols
for station_id, station in stations.items():
pumps = pumps_by_station.get(station_id, [])
for pump in pumps:
pump_id = pump['pump_id']
# Get signal data from all protocols
pump_signals = await collector.get_signal_data(station_id, pump_id)
signals.extend(pump_signals)
logger.info("using_real_protocol_data", modbus_signals=len([s for s in signals if s["protocol"] == "modbus"]),
opcua_signals=len([s for s in signals if s["protocol"] == "opcua"]))
except Exception as e:
logger.error(f"error_using_real_protocol_data_using_fallback: {str(e)}")
# Fallback to mock data if any error occurs
for station_id, station in stations.items():
pumps = pumps_by_station.get(station_id, [])
for pump in pumps:
signals.extend(_create_fallback_signals(station_id, pump['pump_id']))
# Add system status signals
signals.extend([
{
"name": "System_Status",
"protocol": "rest",
"address": "/api/v1/dashboard/status",
"data_type": "String",
"current_value": "Running",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": "Database_Connection",
"protocol": "rest",
"address": "/api/v1/dashboard/status",
"data_type": "Boolean",
"current_value": "Connected",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
{
"name": "Health_Status",
"protocol": "rest",
"address": "/api/v1/dashboard/health",
"data_type": "String",
"current_value": "Healthy",
"quality": "Good",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
])
# Calculate protocol statistics
protocol_counts = {}
for signal in signals:
protocol = signal["protocol"]
if protocol not in protocol_counts:
protocol_counts[protocol] = 0
protocol_counts[protocol] += 1
protocol_stats = {}
for protocol, count in protocol_counts.items():
protocol_stats[protocol] = {
"active_signals": count,
"total_signals": count,
"error_rate": "0%"
}
return {
"signals": signals,
"protocol_stats": protocol_stats,
"total_signals": len(signals),
"last_updated": datetime.now().isoformat()
}
@dashboard_router.get("/signals/export")
async def export_signals():
"""Export signals to CSV format"""
try:
# Get signals data
signals_data = await get_signals()
signals = signals_data["signals"]
# Create CSV content
csv_content = "Signal Name,Protocol,Address,Data Type,Current Value,Quality,Timestamp\n"
for signal in signals:
csv_content += f'{signal["name"]},{signal["protocol"]},{signal["address"]},{signal["data_type"]},{signal["current_value"]},{signal["quality"]},{signal["timestamp"]}\n'
# Return as downloadable file
from fastapi.responses import Response
return Response(
content=csv_content,
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=calejo-signals.csv"}
)
except Exception as e:
logger.error(f"Error exporting signals: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to export signals: {str(e)}")
# Protocol Mapping API Endpoints
@dashboard_router.get("/protocol-mappings")
async def get_protocol_mappings(
protocol_type: Optional[str] = None,
station_id: Optional[str] = None,
pump_id: Optional[str] = None
):
"""Get protocol mappings with optional filtering"""
try:
# Convert protocol_type string to enum if provided
protocol_enum = None
if protocol_type:
try:
protocol_enum = ProtocolType(protocol_type)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid protocol type: {protocol_type}")
mappings = configuration_manager.get_protocol_mappings(
protocol_type=protocol_enum,
station_id=station_id,
pump_id=pump_id
)
return {
"success": True,
"mappings": [mapping.dict() for mapping in mappings],
"count": len(mappings)
}
except Exception as e:
logger.error(f"Error getting protocol mappings: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get protocol mappings: {str(e)}")
@dashboard_router.post("/protocol-mappings")
async def create_protocol_mapping(mapping_data: dict):
"""Create a new protocol mapping"""
try:
# Convert protocol_type string to enum
if "protocol_type" not in mapping_data:
raise HTTPException(status_code=400, detail="protocol_type is required")
try:
protocol_enum = ProtocolType(mapping_data["protocol_type"])
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid protocol type: {mapping_data['protocol_type']}")
# Create ProtocolMapping object
import uuid
mapping = ProtocolMapping(
id=mapping_data.get("id") or f"{mapping_data.get('protocol_type')}_{mapping_data.get('station_id', 'unknown')}_{mapping_data.get('pump_id', 'unknown')}_{uuid.uuid4().hex[:8]}",
protocol_type=protocol_enum,
station_id=mapping_data.get("station_id"),
pump_id=mapping_data.get("pump_id"),
data_type=mapping_data.get("data_type"),
protocol_address=mapping_data.get("protocol_address"),
db_source=mapping_data.get("db_source"),
transformation_rules=mapping_data.get("transformation_rules", []),
modbus_config=mapping_data.get("modbus_config"),
opcua_config=mapping_data.get("opcua_config")
)
success = configuration_manager.add_protocol_mapping(mapping)
if success:
return {
"success": True,
"message": "Protocol mapping created successfully",
"mapping": mapping.dict()
}
else:
raise HTTPException(status_code=400, detail="Failed to create protocol mapping")
except ValidationError as e:
logger.error(f"Validation error creating protocol mapping: {str(e)}")
raise HTTPException(status_code=400, detail=f"Validation error: {str(e)}")
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
logger.error(f"Error creating protocol mapping: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to create protocol mapping: {str(e)}")
@dashboard_router.put("/protocol-mappings/{mapping_id}")
async def update_protocol_mapping(mapping_id: str, mapping_data: dict):
"""Update an existing protocol mapping"""
try:
# Convert protocol_type string to enum if provided
protocol_enum = None
if "protocol_type" in mapping_data:
try:
protocol_enum = ProtocolType(mapping_data["protocol_type"])
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid protocol type: {mapping_data['protocol_type']}")
# Create updated ProtocolMapping object
updated_mapping = ProtocolMapping(
id=mapping_id, # Use the ID from URL
protocol_type=protocol_enum or ProtocolType(mapping_data.get("protocol_type")),
station_id=mapping_data.get("station_id"),
pump_id=mapping_data.get("pump_id"),
data_type=mapping_data.get("data_type"),
protocol_address=mapping_data.get("protocol_address"),
db_source=mapping_data.get("db_source"),
transformation_rules=mapping_data.get("transformation_rules", []),
modbus_config=mapping_data.get("modbus_config"),
opcua_config=mapping_data.get("opcua_config")
)
success = configuration_manager.update_protocol_mapping(mapping_id, updated_mapping)
if success:
return {
"success": True,
"message": "Protocol mapping updated successfully",
"mapping": updated_mapping.dict()
}
else:
raise HTTPException(status_code=404, detail=f"Protocol mapping {mapping_id} not found")
except ValidationError as e:
logger.error(f"Validation error updating protocol mapping: {str(e)}")
raise HTTPException(status_code=400, detail=f"Validation error: {str(e)}")
except Exception as e:
logger.error(f"Error updating protocol mapping: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to update protocol mapping: {str(e)}")
@dashboard_router.delete("/protocol-mappings/{mapping_id}")
async def delete_protocol_mapping(mapping_id: str):
"""Delete a protocol mapping"""
try:
success = configuration_manager.delete_protocol_mapping(mapping_id)
if success:
return {
"success": True,
"message": f"Protocol mapping {mapping_id} deleted successfully"
}
else:
raise HTTPException(status_code=404, detail=f"Protocol mapping {mapping_id} not found")
except Exception as e:
logger.error(f"Error deleting protocol mapping: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to delete protocol mapping: {str(e)}")
# Protocol Discovery API Endpoints
@dashboard_router.get("/discovery/status")
async def get_discovery_status():
"""Get current discovery service status"""
try:
status = discovery_service.get_discovery_status()
return {
"success": True,
"status": status
}
except Exception as e:
logger.error(f"Error getting discovery status: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get discovery status: {str(e)}")
@dashboard_router.post("/discovery/scan")
async def start_discovery_scan(background_tasks: BackgroundTasks):
"""Start a new discovery scan"""
try:
# Check if scan is already running
status = discovery_service.get_discovery_status()
if status["is_scanning"]:
raise HTTPException(status_code=409, detail="Discovery scan already in progress")
# Start discovery scan in background
scan_id = f"scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
async def run_discovery():
await discovery_service.discover_all_protocols(scan_id)
background_tasks.add_task(run_discovery)
return {
"success": True,
"scan_id": scan_id,
"message": "Discovery scan started successfully"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error starting discovery scan: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to start discovery scan: {str(e)}")
@dashboard_router.get("/discovery/results/{scan_id}")
async def get_discovery_results(scan_id: str):
"""Get results for a specific discovery scan"""
try:
result = discovery_service.get_scan_result(scan_id)
if not result:
raise HTTPException(status_code=404, detail=f"Discovery scan {scan_id} not found")
# Convert discovered endpoints to dict format
endpoints_data = []
for endpoint in result.discovered_endpoints:
endpoint_data = {
"protocol_type": endpoint.protocol_type.value,
"address": endpoint.address,
"port": endpoint.port,
"device_id": endpoint.device_id,
"device_name": endpoint.device_name,
"capabilities": endpoint.capabilities,
"response_time": endpoint.response_time,
"discovered_at": endpoint.discovered_at.isoformat() if endpoint.discovered_at else None
}
endpoints_data.append(endpoint_data)
return {
"success": True,
"scan_id": scan_id,
"status": result.status.value,
"scan_duration": result.scan_duration,
"errors": result.errors,
"timestamp": result.timestamp.isoformat() if result.timestamp else None,
"discovered_endpoints": endpoints_data
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting discovery results: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get discovery results: {str(e)}")
@dashboard_router.get("/discovery/recent")
async def get_recent_discoveries():
"""Get most recently discovered endpoints"""
try:
# Get recent scan results and extract endpoints
status = discovery_service.get_discovery_status()
recent_scans = status.get("recent_scans", [])[-5:] # Last 5 scans
recent_endpoints = []
for scan_id in recent_scans:
result = discovery_service.get_scan_result(scan_id)
if result and result.discovered_endpoints:
recent_endpoints.extend(result.discovered_endpoints)
# Sort by discovery time (most recent first) and limit
recent_endpoints.sort(key=lambda x: x.discovered_at or datetime.min, reverse=True)
recent_endpoints = recent_endpoints[:20] # Limit to 20 most recent
# Convert to dict format
endpoints_data = []
for endpoint in recent_endpoints:
endpoint_data = {
"protocol_type": endpoint.protocol_type.value,
"address": endpoint.address,
"port": endpoint.port,
"device_id": endpoint.device_id,
"device_name": endpoint.device_name,
"capabilities": endpoint.capabilities,
"response_time": endpoint.response_time,
"discovered_at": endpoint.discovered_at.isoformat() if endpoint.discovered_at else None
}
endpoints_data.append(endpoint_data)
return {
"success": True,
"recent_endpoints": endpoints_data
}
except Exception as e:
logger.error(f"Error getting recent discoveries: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get recent discoveries: {str(e)}")
@dashboard_router.post("/discovery/apply/{scan_id}")
async def apply_discovery_results(scan_id: str, station_id: str, pump_id: str, data_type: str, db_source: str):
"""Apply discovered endpoints as protocol mappings"""
try:
result = discovery_service.get_scan_result(scan_id)
if not result:
raise HTTPException(status_code=404, detail=f"Discovery scan {scan_id} not found")
if result.status != DiscoveryStatus.COMPLETED:
raise HTTPException(status_code=400, detail="Cannot apply incomplete discovery scan")
created_mappings = []
errors = []
for endpoint in result.discovered_endpoints:
try:
# Create protocol mapping from discovered endpoint
mapping_id = f"{endpoint.device_id}_{data_type}"
protocol_mapping = ProtocolMapping(
id=mapping_id,
station_id=station_id,
pump_id=pump_id,
protocol_type=endpoint.protocol_type,
protocol_address=endpoint.address,
data_type=data_type,
db_source=db_source
)
# Add to configuration manager
success = configuration_manager.add_protocol_mapping(protocol_mapping)
if success:
created_mappings.append(mapping_id)
else:
errors.append(f"Failed to create mapping for {endpoint.device_name}")
except Exception as e:
errors.append(f"Error creating mapping for {endpoint.device_name}: {str(e)}")
return {
"success": True,
"created_mappings": created_mappings,
"errors": errors,
"message": f"Created {len(created_mappings)} protocol mappings from discovery results"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error applying discovery results: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to apply discovery results: {str(e)}")
@dashboard_router.post("/protocol-mappings/{mapping_id}/validate")
async def validate_protocol_mapping(mapping_id: str, mapping_data: dict):
"""Validate a protocol mapping without saving it"""
try:
# Convert protocol_type string to enum
if "protocol_type" not in mapping_data:
raise HTTPException(status_code=400, detail="protocol_type is required")
try:
protocol_enum = ProtocolType(mapping_data["protocol_type"])
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid protocol type: {mapping_data['protocol_type']}")
# Create temporary ProtocolMapping object for validation
temp_mapping = ProtocolMapping(
id=mapping_id,
protocol_type=protocol_enum,
station_id=mapping_data.get("station_id"),
pump_id=mapping_data.get("pump_id"),
data_type=mapping_data.get("data_type"),
protocol_address=mapping_data.get("protocol_address"),
db_source=mapping_data.get("db_source"),
transformation_rules=mapping_data.get("transformation_rules", []),
modbus_config=mapping_data.get("modbus_config"),
opcua_config=mapping_data.get("opcua_config")
)
validation_result = configuration_manager.validate_protocol_mapping(temp_mapping)
return {
"success": True,
"valid": validation_result["valid"],
"errors": validation_result["errors"],
"warnings": validation_result["warnings"]
}
except ValidationError as e:
logger.error(f"Validation error in protocol mapping: {str(e)}")
raise HTTPException(status_code=400, detail=f"Validation error: {str(e)}")
except Exception as e:
logger.error(f"Error validating protocol mapping: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to validate protocol mapping: {str(e)}")
@dashboard_router.get("/protocol-mappings/protocols")
async def get_available_protocols():
"""Get list of available protocol types"""
try:
protocols = [
{
"value": protocol.value,
"label": protocol.value.replace("_", " ").upper(),
"description": f"Configure {protocol.value.replace('_', ' ').title()} mappings"
}
for protocol in ProtocolType
]
return {
"success": True,
"protocols": protocols
}
except Exception as e:
logger.error(f"Error getting available protocols: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get available protocols: {str(e)}")