-- Calejo Control Adapter Database Schema
-- Version: 2.0
-- Date: October 26, 2025
--
-- Target dialect: PostgreSQL (uses SERIAL, JSONB, INET, TEXT[], CREATE RULE).
--
-- Tables are created in dependency order: a table is created only after every
-- table named in its FOREIGN KEY clauses, because PostgreSQL rejects a
-- REFERENCES clause that points at a table that does not exist yet.

-- Drop existing tables if they exist (for clean setup).
-- Every table created below must be listed here, otherwise re-running the
-- script fails with "relation already exists".
DROP TABLE IF EXISTS protocol_mappings CASCADE;
DROP TABLE IF EXISTS audit_log CASCADE;
DROP TABLE IF EXISTS emergency_stop_events CASCADE;
DROP TABLE IF EXISTS failsafe_events CASCADE;
DROP TABLE IF EXISTS safety_limit_violations CASCADE;
DROP TABLE IF EXISTS pump_safety_limits CASCADE;
DROP TABLE IF EXISTS pump_feedback CASCADE;
DROP TABLE IF EXISTS optimization_plans CASCADE;
DROP TABLE IF EXISTS pump_plans CASCADE;
DROP TABLE IF EXISTS pumps CASCADE;
DROP TABLE IF EXISTS pump_stations CASCADE;

-- Create pump_stations table
CREATE TABLE pump_stations (
    station_id VARCHAR(50) PRIMARY KEY,
    station_name VARCHAR(200) NOT NULL,
    location VARCHAR(200),
    latitude DECIMAL(10, 8),
    longitude DECIMAL(11, 8),
    timezone VARCHAR(50) DEFAULT 'Europe/Rome',
    active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

COMMENT ON TABLE pump_stations IS 'Metadata about pump stations';
COMMENT ON COLUMN pump_stations.timezone IS 'Timezone for the pump station (default: Europe/Rome for Italian utilities)';

-- Create pumps table
-- A pump is identified by the composite key (station_id, pump_id); every
-- per-pump table below references that pair.
CREATE TABLE pumps (
    pump_id VARCHAR(50) NOT NULL,
    station_id VARCHAR(50) NOT NULL,
    pump_name VARCHAR(200),
    pump_type VARCHAR(50),  -- 'SUBMERSIBLE', 'CENTRIFUGAL', etc.
    control_type VARCHAR(50) NOT NULL,  -- 'LEVEL_CONTROLLED', 'POWER_CONTROLLED', 'DIRECT_SPEED'
    manufacturer VARCHAR(100),
    model VARCHAR(100),
    rated_power_kw DECIMAL(10, 2),
    min_speed_hz DECIMAL(5, 2) DEFAULT 20.0,
    max_speed_hz DECIMAL(5, 2) DEFAULT 50.0,

    -- Default setpoint (used in failsafe mode)
    default_setpoint_hz DECIMAL(5, 2) NOT NULL DEFAULT 35.0,

    -- Control-specific parameters (JSON)
    control_parameters JSONB,

    active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),

    PRIMARY KEY (station_id, pump_id),
    FOREIGN KEY (station_id) REFERENCES pump_stations(station_id)
);

COMMENT ON TABLE pumps IS 'Metadata about individual pumps';
COMMENT ON COLUMN pumps.default_setpoint_hz IS 'Default safe setpoint used in failsafe mode (existing pump configuration)';
COMMENT ON COLUMN pumps.control_parameters IS 'Control-specific parameters in JSON format (PID gains, thresholds, etc.)';

-- Create protocol_mappings table
-- Must come after pumps: the composite foreign key below references
-- pumps(station_id, pump_id).
CREATE TABLE protocol_mappings (
    mapping_id VARCHAR(100) PRIMARY KEY,
    station_id VARCHAR(50) NOT NULL,
    pump_id VARCHAR(50) NOT NULL,
    protocol_type VARCHAR(20) NOT NULL,  -- 'opcua', 'modbus_tcp', 'modbus_rtu', 'rest_api'
    protocol_address VARCHAR(500) NOT NULL,  -- Node ID, register address, endpoint URL
    data_type VARCHAR(50) NOT NULL,  -- 'setpoint', 'status', 'control', 'safety', 'alarm', 'configuration'
    db_source VARCHAR(100) NOT NULL,  -- Database field name

    -- Metadata
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    created_by VARCHAR(100),
    enabled BOOLEAN DEFAULT TRUE,

    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id),

    -- Constraints
    CONSTRAINT valid_protocol_type CHECK (protocol_type IN ('opcua', 'modbus_tcp', 'modbus_rtu', 'rest_api')),
    CONSTRAINT valid_data_type CHECK (data_type IN ('setpoint', 'status', 'control', 'safety', 'alarm', 'configuration')),
    -- One address can be mapped at most once per protocol.
    CONSTRAINT unique_protocol_address UNIQUE (protocol_type, protocol_address)
);

COMMENT ON TABLE protocol_mappings IS 'Protocol-agnostic mappings between database fields and protocol addresses';
COMMENT ON COLUMN protocol_mappings.protocol_type IS 'Protocol type: opcua, modbus_tcp, modbus_rtu, rest_api';
COMMENT ON COLUMN protocol_mappings.protocol_address IS 'Protocol-specific address (OPC UA node ID, Modbus register, REST endpoint)';
COMMENT ON COLUMN protocol_mappings.data_type IS 'Type of data: setpoint, status, control, safety, alarm, configuration';
COMMENT ON COLUMN protocol_mappings.db_source IS 'Database field name that this mapping represents';

-- Create indexes for protocol mappings
CREATE INDEX idx_protocol_mappings_station_pump ON protocol_mappings(station_id, pump_id);
CREATE INDEX idx_protocol_mappings_protocol_type ON protocol_mappings(protocol_type, enabled);
CREATE INDEX idx_protocol_mappings_data_type ON protocol_mappings(data_type, enabled);

-- Create pump_plans table with version-based updates
CREATE TABLE pump_plans (
    plan_id SERIAL PRIMARY KEY,
    station_id VARCHAR(50) NOT NULL,
    pump_id VARCHAR(50) NOT NULL,
    interval_start TIMESTAMP NOT NULL,
    interval_end TIMESTAMP NOT NULL,

    -- Optimization outputs
    target_flow_m3h DECIMAL(10, 2),
    target_power_kw DECIMAL(10, 2),
    target_level_m DECIMAL(5, 2),
    suggested_speed_hz DECIMAL(5, 2),

    -- Version-based update metadata
    plan_created_at TIMESTAMP DEFAULT NOW(),
    plan_updated_at TIMESTAMP DEFAULT NOW(),
    plan_version INTEGER DEFAULT 1,
    optimization_run_id INTEGER,

    -- Status tracking
    plan_status VARCHAR(20) DEFAULT 'ACTIVE',  -- 'ACTIVE', 'SUPERSEDED', 'CANCELLED'
    superseded_by INTEGER,  -- Points to plan_id that superseded this plan

    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id),
    -- Self-reference: the replacing plan lives in this same table.
    FOREIGN KEY (superseded_by) REFERENCES pump_plans(plan_id)
);

COMMENT ON TABLE pump_plans IS 'Optimization plans generated by Calejo Optimize with version-based updates';
COMMENT ON COLUMN pump_plans.plan_version IS 'Version number for tracking updates (increments on each update)';
COMMENT ON COLUMN pump_plans.plan_status IS 'Status of the plan: ACTIVE, SUPERSEDED, or CANCELLED';
COMMENT ON COLUMN pump_plans.superseded_by IS 'Reference to plan that superseded this one (for audit trail)';

-- Create indexes for efficient plan management
-- The first two are partial indexes: they only contain rows with plan_status = 'ACTIVE'.
CREATE INDEX idx_pump_plans_active ON pump_plans(station_id, pump_id, interval_start, interval_end)
    WHERE plan_status = 'ACTIVE';
CREATE INDEX idx_pump_plans_latest ON pump_plans(station_id, pump_id, plan_created_at DESC)
    WHERE plan_status = 'ACTIVE';
CREATE INDEX idx_pump_plans_version ON pump_plans(station_id, pump_id, plan_version DESC);

-- Create generic optimization_plans table for future actuator support
CREATE TABLE optimization_plans (
    plan_id SERIAL PRIMARY KEY,
    station_id VARCHAR(50) NOT NULL,
    resource_id VARCHAR(50) NOT NULL,
    resource_type VARCHAR(20) NOT NULL DEFAULT 'PUMP',  -- 'PUMP', 'VALVE', 'BLOWER', etc.
    interval_start TIMESTAMP NOT NULL,
    interval_end TIMESTAMP NOT NULL,

    -- Generic optimization targets (JSON for flexibility)
    optimization_targets JSONB NOT NULL,

    -- Version-based update metadata
    plan_created_at TIMESTAMP DEFAULT NOW(),
    plan_updated_at TIMESTAMP DEFAULT NOW(),
    plan_version INTEGER DEFAULT 1,
    optimization_run_id INTEGER,
    plan_priority INTEGER DEFAULT 1,

    -- Status tracking
    plan_status VARCHAR(20) DEFAULT 'ACTIVE',
    superseded_by INTEGER,

    -- NOTE(review): this key requires every (station_id, resource_id) to exist
    -- in pumps, even when resource_type is not 'PUMP'. Confirm that is intended
    -- before storing plans for other actuator types.
    FOREIGN KEY (station_id, resource_id) REFERENCES pumps(station_id, pump_id),
    FOREIGN KEY (superseded_by) REFERENCES optimization_plans(plan_id),

    -- Ensure resource_type is valid
    CONSTRAINT valid_resource_type CHECK (resource_type IN ('PUMP', 'VALVE', 'BLOWER', 'COMPRESSOR', 'GATE'))
);

COMMENT ON TABLE optimization_plans IS 'Generic optimization plans for all actuator types with version-based updates';
COMMENT ON COLUMN optimization_plans.optimization_targets IS 'JSON structure containing actuator-specific optimization targets';
COMMENT ON COLUMN optimization_plans.resource_type IS 'Type of actuator: PUMP, VALVE, BLOWER, COMPRESSOR, GATE';

-- Create indexes for generic optimization plans
CREATE INDEX idx_optimization_plans_active ON optimization_plans(station_id, resource_id, interval_start, interval_end)
    WHERE plan_status = 'ACTIVE';
CREATE INDEX idx_optimization_plans_latest ON optimization_plans(station_id, resource_id, plan_created_at DESC)
    WHERE plan_status = 'ACTIVE';
CREATE INDEX idx_optimization_plans_type ON optimization_plans(resource_type, plan_status);

-- Create pump_feedback table
CREATE TABLE pump_feedback (
    feedback_id SERIAL PRIMARY KEY,
    station_id VARCHAR(50) NOT NULL,
    pump_id VARCHAR(50) NOT NULL,
    timestamp TIMESTAMP NOT NULL DEFAULT NOW(),

    -- Actual measurements
    actual_speed_hz DECIMAL(5, 2),
    actual_power_kw DECIMAL(10, 2),
    actual_flow_m3h DECIMAL(10, 2),
    wet_well_level_m DECIMAL(5, 2),
    pump_running BOOLEAN,

    -- Status
    alarm_active BOOLEAN DEFAULT FALSE,
    alarm_code VARCHAR(50),

    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);

COMMENT ON TABLE pump_feedback IS 'Real-time feedback from pumps';

-- Create index for latest feedback
CREATE INDEX idx_pump_feedback_latest ON pump_feedback(station_id, pump_id, timestamp DESC);

-- Create pump_safety_limits table
-- At most one row per pump: the primary key is the pump's own composite key.
CREATE TABLE pump_safety_limits (
    station_id VARCHAR(50) NOT NULL,
    pump_id VARCHAR(50) NOT NULL,

    -- Speed limits (Layer 2: Station Safety Limits)
    hard_min_speed_hz DECIMAL(5, 2) NOT NULL,
    hard_max_speed_hz DECIMAL(5, 2) NOT NULL,

    -- Level limits
    hard_min_level_m DECIMAL(5, 2),
    hard_max_level_m DECIMAL(5, 2),
    emergency_stop_level_m DECIMAL(5, 2),  -- Emergency stop if exceeded
    dry_run_protection_level_m DECIMAL(5, 2),  -- Stop pump if level too low

    -- Power and flow limits
    hard_max_power_kw DECIMAL(10, 2),
    hard_max_flow_m3h DECIMAL(10, 2),

    -- Operational limits
    max_starts_per_hour INTEGER DEFAULT 6,
    min_run_time_seconds INTEGER DEFAULT 300,
    max_continuous_run_hours INTEGER DEFAULT 24,

    -- Rate of change limits (prevent sudden changes that damage equipment)
    max_speed_change_hz_per_min DECIMAL(5, 2) DEFAULT 5.0,

    -- Metadata
    set_by VARCHAR(100),
    set_at TIMESTAMP DEFAULT NOW(),
    approved_by VARCHAR(100),  -- Dual approval
    approved_at TIMESTAMP,
    notes TEXT,

    PRIMARY KEY (station_id, pump_id),
    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id),

    -- Constraints
    -- Absolute envelope for the configurable limits: 15 Hz to 55 Hz.
    CONSTRAINT check_speed_limits CHECK (hard_min_speed_hz >= 15 AND hard_max_speed_hz <= 55),
    CONSTRAINT check_min_max CHECK (hard_min_speed_hz < hard_max_speed_hz),
    -- Dual approval: the approver must differ from the setter.
    -- NOTE(review): when set_by is NULL the comparison yields NULL, which a
    -- CHECK constraint accepts, so an approved row with no set_by passes.
    CONSTRAINT check_approved CHECK (approved_by IS NULL OR approved_by != set_by)
);

COMMENT ON TABLE pump_safety_limits IS 'Hard operational limits enforced by Calejo Control adapter (Layer 2)';
COMMENT ON COLUMN pump_safety_limits.hard_min_speed_hz IS 'Minimum speed enforced by adapter (must be >= PLC physical limit)';
COMMENT ON COLUMN pump_safety_limits.hard_max_speed_hz IS 'Maximum speed enforced by adapter (must be <= PLC physical limit)';

-- Create safety_limit_violations table
CREATE TABLE safety_limit_violations (
    violation_id SERIAL PRIMARY KEY,
    station_id VARCHAR(50) NOT NULL,
    pump_id VARCHAR(50) NOT NULL,
    requested_setpoint DECIMAL(5, 2),
    enforced_setpoint DECIMAL(5, 2),
    violations TEXT[],  -- Array of violation descriptions
    timestamp TIMESTAMP DEFAULT NOW(),

    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);

COMMENT ON TABLE safety_limit_violations IS 'Audit trail of safety limit violations (immutable)';

-- Create index for violation timestamp
CREATE INDEX idx_violations_timestamp ON safety_limit_violations(timestamp DESC);

-- Create failsafe_events table
CREATE TABLE failsafe_events (
    event_id SERIAL PRIMARY KEY,
    station_id VARCHAR(50) NOT NULL,
    pump_id VARCHAR(50) NOT NULL,
    event_type VARCHAR(50) NOT NULL,  -- 'DATABASE_TIMEOUT', 'COMMUNICATION_LOSS', 'INVALID_DATA'
    default_setpoint DECIMAL(5, 2),
    triggered_by VARCHAR(100),
    timestamp TIMESTAMP DEFAULT NOW(),
    cleared_at TIMESTAMP,
    notes TEXT,

    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);

COMMENT ON TABLE failsafe_events IS 'Record of failsafe mode activations';

-- Create index for active failsafe events
-- Partial index: only events that have not been cleared.
CREATE INDEX idx_failsafe_events_active ON failsafe_events(station_id, pump_id, timestamp DESC)
    WHERE cleared_at IS NULL;

-- Create emergency_stop_events table
CREATE TABLE emergency_stop_events (
    event_id SERIAL PRIMARY KEY,
    station_id VARCHAR(50),  -- NULL = all stations
    pump_id VARCHAR(50),  -- NULL = all pumps at station
    triggered_by VARCHAR(100) NOT NULL,
    reason TEXT NOT NULL,
    timestamp TIMESTAMP DEFAULT NOW(),
    cleared_at TIMESTAMP,
    cleared_by VARCHAR(100),
    clear_notes TEXT,

    -- With PostgreSQL's default MATCH SIMPLE, this key is not checked when
    -- either column is NULL, which is what permits the NULL scopes above.
    FOREIGN KEY (station_id, pump_id) REFERENCES pumps(station_id, pump_id)
);

COMMENT ON TABLE emergency_stop_events IS 'Emergency stop event log (big red button)';

-- Create index for active emergency stops
CREATE INDEX idx_emergency_stop_active ON emergency_stop_events(timestamp DESC)
    WHERE cleared_at IS NULL;

-- Create audit_log table
CREATE TABLE audit_log (
    log_id BIGSERIAL PRIMARY KEY,
    timestamp TIMESTAMP DEFAULT NOW(),
    event_type VARCHAR(50) NOT NULL,
    severity VARCHAR(20) NOT NULL,  -- 'INFO', 'WARNING', 'CRITICAL'
    station_id VARCHAR(50),
    pump_id VARCHAR(50),
    user_id VARCHAR(100),
    ip_address INET,
    protocol VARCHAR(20),  -- 'OPC_UA', 'MODBUS', 'REST_API'
    action VARCHAR(100),
    resource VARCHAR(200),
    result VARCHAR(20),  -- 'SUCCESS', 'FAILURE', 'DENIED'
    event_data JSONB
);

COMMENT ON TABLE audit_log IS 'Immutable audit trail for compliance (IEC 62443, ISO 27001, NIS2 Directive)';

-- Create indexes for audit log
CREATE INDEX idx_audit_log_timestamp ON audit_log(timestamp DESC);
CREATE INDEX idx_audit_log_severity ON audit_log(severity, timestamp DESC);
CREATE INDEX idx_audit_log_user ON audit_log(user_id, timestamp DESC);

-- Make audit log immutable (append-only)
-- The rules rewrite UPDATE and DELETE on audit_log into no-ops: the statement
-- succeeds but changes no rows.
CREATE RULE audit_log_no_update AS ON UPDATE TO audit_log DO INSTEAD NOTHING;
CREATE RULE audit_log_no_delete AS ON DELETE TO audit_log DO INSTEAD NOTHING;

-- Create database user for adapter
-- Note: This should be executed by database administrator
-- CREATE USER control_reader WITH PASSWORD 'secure_password';
-- GRANT CONNECT ON DATABASE calejo TO control_reader;
-- GRANT USAGE ON SCHEMA public TO control_reader;
-- GRANT SELECT ON pump_stations, pumps, pump_plans, pump_feedback, pump_safety_limits TO control_reader;
-- GRANT INSERT ON safety_limit_violations, failsafe_events, emergency_stop_events, audit_log TO control_reader;
-- GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO control_reader;