CalejoControl/src/core/compliance_audit.py

447 lines
14 KiB
Python
Raw Normal View History

"""
Compliance Audit Logger for Calejo Control Adapter.
Provides enhanced audit logging capabilities compliant with:
- IEC 62443 (Industrial Automation and Control Systems Security)
- ISO 27001 (Information Security Management)
- NIS2 Directive (Network and Information Systems Security)
"""
import json
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, Any, Optional, List

import structlog

from config.settings import settings
# Module-level structlog logger (ComplianceAuditLogger binds its own named logger in __init__).
logger = structlog.get_logger()
class AuditEventType(Enum):
    """
    Catalogue of event kinds that can appear in the compliance audit trail.

    Each member's value is the string stored in the ``event_type`` field of
    an audit record. Members are grouped by functional area below.
    """

    # --- Authentication and authorization -------------------------------
    USER_LOGIN = "user_login"
    USER_LOGOUT = "user_logout"
    USER_CREATED = "user_created"
    USER_MODIFIED = "user_modified"
    USER_DELETED = "user_deleted"
    PASSWORD_CHANGED = "password_changed"
    ROLE_CHANGED = "role_changed"

    # --- System lifecycle and configuration -----------------------------
    SYSTEM_START = "system_start"
    SYSTEM_STOP = "system_stop"
    SYSTEM_CONFIG_CHANGED = "system_config_changed"

    # --- Control operations ---------------------------------------------
    SETPOINT_CHANGED = "setpoint_changed"
    EMERGENCY_STOP_ACTIVATED = "emergency_stop_activated"
    EMERGENCY_STOP_RESET = "emergency_stop_reset"
    PUMP_CONTROL = "pump_control"
    VALVE_CONTROL = "valve_control"

    # --- Security events ------------------------------------------------
    ACCESS_DENIED = "access_denied"
    INVALID_AUTHENTICATION = "invalid_authentication"
    SESSION_TIMEOUT = "session_timeout"
    CERTIFICATE_EXPIRED = "certificate_expired"
    CERTIFICATE_ROTATED = "certificate_rotated"

    # --- Data operations ------------------------------------------------
    DATA_READ = "data_read"
    DATA_WRITE = "data_write"
    DATA_EXPORT = "data_export"
    DATA_DELETED = "data_deleted"

    # --- Network operations ---------------------------------------------
    CONNECTION_ESTABLISHED = "connection_established"
    CONNECTION_CLOSED = "connection_closed"
    CONNECTION_REJECTED = "connection_rejected"

    # --- Compliance events ----------------------------------------------
    AUDIT_LOG_ACCESSED = "audit_log_accessed"
    COMPLIANCE_CHECK = "compliance_check"
    SECURITY_SCAN = "security_scan"
class AuditSeverity(Enum):
    """
    Severity grades attached to an audit event.

    The member value is the string written to the ``severity`` field of an
    audit record. Members are declared from least to most severe.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class ComplianceAuditLogger:
    """
    Enhanced audit logger for compliance requirements.

    Every event is turned into one flat audit record that is emitted through
    structlog and, when ``settings.audit_log_enabled`` is set, also inserted
    into the ``compliance_audit_log`` table through the injected database
    client. Intended to support IEC 62443, ISO 27001 and NIS2 Directive
    audit-trail requirements.

    NOTE(review): the insert in ``_log_to_database`` uses named ``:param``
    placeholders with a dict, while the read queries use positional ``%s``
    placeholders with a sequence. Confirm that the database client accepts
    both styles; the original placeholder styles are kept unchanged here.
    """

    def __init__(self, db_client):
        """
        Args:
            db_client: Database client used for persistence. This class calls
                ``db_client.execute(query, params)`` for inserts and
                ``db_client.fetch_all(query, params)`` for reads.
        """
        self.db_client = db_client
        self.logger = structlog.get_logger(__name__)

    def log_compliance_event(
        self,
        event_type: AuditEventType,
        severity: AuditSeverity,
        user_id: Optional[str] = None,
        station_id: Optional[str] = None,
        pump_id: Optional[str] = None,
        ip_address: Optional[str] = None,
        protocol: Optional[str] = None,
        action: Optional[str] = None,
        resource: Optional[str] = None,
        result: Optional[str] = None,
        reason: Optional[str] = None,
        compliance_standard: Optional[List[str]] = None,
        event_data: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Log a compliance audit event.

        The record is always written to the structured log; it is written to
        the database only when ``settings.audit_log_enabled`` is truthy.

        Args:
            event_type: Type of audit event
            severity: Severity level
            user_id: User ID performing the action
            station_id: Station ID if applicable
            pump_id: Pump ID if applicable
            ip_address: Source IP address
            protocol: Communication protocol used
            action: Specific action performed
            resource: Resource accessed or modified
            result: Result of the action (success/failure)
            reason: Reason for the action or failure
            compliance_standard: List of compliance standards this event
                relates to; defaults to IEC_62443, ISO_27001 and NIS2
            event_data: Additional event-specific data
        """
        # Default compliance standards (fresh list per call, never shared)
        if compliance_standard is None:
            compliance_standard = ["IEC_62443", "ISO_27001", "NIS2"]

        # Create comprehensive audit record
        audit_record = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": event_type.value,
            "severity": severity.value,
            "user_id": user_id,
            "station_id": station_id,
            "pump_id": pump_id,
            "ip_address": ip_address,
            "protocol": protocol,
            "action": action,
            "resource": resource,
            "result": result,
            "reason": reason,
            "compliance_standard": compliance_standard,
            "event_data": event_data or {},
            "app_name": settings.app_name,
            "app_version": settings.app_version,
            "environment": settings.environment,
        }

        # Log to structured logs
        self.logger.info("compliance_audit_event", **audit_record)

        # Log to database if audit logging is enabled
        if settings.audit_log_enabled:
            self._log_to_database(audit_record)

    @staticmethod
    def _prepare_db_params(audit_record: Dict[str, Any]) -> Dict[str, Any]:
        """
        Return a copy of *audit_record* that is safe to bind as SQL parameters.

        ``event_data`` is a free-form dict; a plain dict is not a bindable
        parameter type for common DB-API drivers, so it is serialised to a
        JSON string. The input record is not modified.
        """
        params = dict(audit_record)
        # default=str is deliberate: an audit record must not be dropped just
        # because event_data carries a non-JSON-native value (e.g. a datetime).
        params["event_data"] = json.dumps(
            audit_record.get("event_data") or {}, default=str
        )
        # compliance_standard is intentionally left as a list; the report
        # query matches it with "= ANY(compliance_standard)", which presumably
        # means the column is an array type - TODO confirm against the schema.
        return params

    def _log_to_database(self, audit_record: Dict[str, Any]) -> None:
        """
        Insert one audit record into ``compliance_audit_log``.

        Best-effort: any failure is reported through the structured logger
        and not raised, so a database problem never breaks the caller.
        """
        try:
            query = """
                INSERT INTO compliance_audit_log
                (timestamp, event_type, severity, user_id, station_id, pump_id,
                ip_address, protocol, action, resource, result, reason,
                compliance_standard, event_data, app_name, app_version, environment)
                VALUES (:timestamp, :event_type, :severity, :user_id, :station_id, :pump_id,
                :ip_address, :protocol, :action, :resource, :result, :reason,
                :compliance_standard, :event_data, :app_name, :app_version, :environment)
            """
            self.db_client.execute(query, self._prepare_db_params(audit_record))
        except Exception as e:
            self.logger.error(
                "compliance_audit_database_failed",
                error=str(e),
                # .get() so the error handler itself cannot raise KeyError
                event_type=audit_record.get("event_type"),
            )

    def log_user_authentication(
        self,
        user_id: str,
        result: str,
        ip_address: str,
        reason: Optional[str] = None
    ) -> None:
        """
        Log user authentication events.

        A ``result`` of ``"success"`` is recorded as USER_LOGIN with LOW
        severity; any other result is recorded as INVALID_AUTHENTICATION
        with HIGH severity.
        """
        if result == "success":
            event_type = AuditEventType.USER_LOGIN
            severity = AuditSeverity.LOW
        else:
            event_type = AuditEventType.INVALID_AUTHENTICATION
            severity = AuditSeverity.HIGH

        self.log_compliance_event(
            event_type=event_type,
            severity=severity,
            user_id=user_id,
            ip_address=ip_address,
            action="authentication",
            resource="system",
            result=result,
            reason=reason,
        )

    def log_access_control(
        self,
        user_id: str,
        action: str,
        resource: str,
        result: str,
        ip_address: str,
        reason: Optional[str] = None
    ) -> None:
        """
        Log access control events.

        A ``result`` of ``"denied"`` is recorded as ACCESS_DENIED with HIGH
        severity. Otherwise the event type follows ``action`` (``"read"`` ->
        DATA_READ, ``"write"`` -> DATA_WRITE, anything else ->
        SYSTEM_CONFIG_CHANGED) with MEDIUM severity.
        """
        denied = result == "denied"
        if denied:
            event_type = AuditEventType.ACCESS_DENIED
        elif action == "read":
            event_type = AuditEventType.DATA_READ
        elif action == "write":
            event_type = AuditEventType.DATA_WRITE
        else:
            event_type = AuditEventType.SYSTEM_CONFIG_CHANGED

        severity = AuditSeverity.HIGH if denied else AuditSeverity.MEDIUM

        self.log_compliance_event(
            event_type=event_type,
            severity=severity,
            user_id=user_id,
            ip_address=ip_address,
            action=action,
            resource=resource,
            result=result,
            reason=reason,
        )

    def log_control_operation(
        self,
        user_id: str,
        station_id: str,
        pump_id: Optional[str],
        action: str,
        resource: str,
        result: str,
        ip_address: str,
        event_data: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Log control system operations.

        The event type is chosen from ``action`` first (``"setpoint_change"``,
        ``"emergency_stop"``), then from a case-insensitive substring match
        of ``"pump"`` / ``"valve"`` in ``resource``, falling back to
        SYSTEM_CONFIG_CHANGED. Severity is CRITICAL for an emergency stop,
        HIGH for a setpoint change and MEDIUM otherwise.
        """
        resource_lc = resource.lower()
        if action == "setpoint_change":
            event_type = AuditEventType.SETPOINT_CHANGED
        elif action == "emergency_stop":
            event_type = AuditEventType.EMERGENCY_STOP_ACTIVATED
        elif "pump" in resource_lc:
            event_type = AuditEventType.PUMP_CONTROL
        elif "valve" in resource_lc:
            event_type = AuditEventType.VALVE_CONTROL
        else:
            event_type = AuditEventType.SYSTEM_CONFIG_CHANGED

        if action == "emergency_stop":
            severity = AuditSeverity.CRITICAL
        elif action == "setpoint_change":
            severity = AuditSeverity.HIGH
        else:
            severity = AuditSeverity.MEDIUM

        self.log_compliance_event(
            event_type=event_type,
            severity=severity,
            user_id=user_id,
            station_id=station_id,
            pump_id=pump_id,
            ip_address=ip_address,
            action=action,
            resource=resource,
            result=result,
            event_data=event_data,
        )

    def log_security_event(
        self,
        event_type: AuditEventType,
        severity: AuditSeverity,
        user_id: Optional[str] = None,
        station_id: Optional[str] = None,
        ip_address: Optional[str] = None,
        reason: Optional[str] = None,
        event_data: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Log security-related events.

        The record always carries ``action="security_event"``,
        ``resource="system"`` and ``result="detected"``.
        """
        self.log_compliance_event(
            event_type=event_type,
            severity=severity,
            user_id=user_id,
            station_id=station_id,
            ip_address=ip_address,
            action="security_event",
            resource="system",
            result="detected",
            reason=reason,
            event_data=event_data,
        )

    def get_audit_trail(
        self,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        user_id: Optional[str] = None,
        event_type: Optional[AuditEventType] = None,
        severity: Optional[AuditSeverity] = None,
        requested_by: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Retrieve audit trail for compliance reporting.

        A successful retrieval is itself recorded as an AUDIT_LOG_ACCESSED
        event.

        Args:
            start_time: Start time for audit trail
            end_time: End time for audit trail
            user_id: Filter by user ID
            event_type: Filter by event type
            severity: Filter by severity
            requested_by: ID of the user retrieving the trail. When given,
                the access event is attributed to this user and the applied
                filters are stored in its event data. When omitted, the
                access event is attributed to ``user_id`` (the filter value),
                as before.

        Returns:
            List of audit records, newest first. An empty list is also
            returned when retrieval fails (the failure is logged).
        """
        try:
            query = """
                SELECT * FROM compliance_audit_log
                WHERE 1=1
            """
            params = []

            # datetime and Enum filters are compared against None explicitly;
            # user_id keeps its truthiness check so an empty string still
            # means "no filter".
            if start_time is not None:
                query += " AND timestamp >= %s"
                params.append(start_time.isoformat())
            if end_time is not None:
                query += " AND timestamp <= %s"
                params.append(end_time.isoformat())
            if user_id:
                query += " AND user_id = %s"
                params.append(user_id)
            if event_type is not None:
                query += " AND event_type = %s"
                params.append(event_type.value)
            if severity is not None:
                query += " AND severity = %s"
                params.append(severity.value)

            query += " ORDER BY timestamp DESC"
            result = self.db_client.fetch_all(query, params)

            # Attribute the access to the caller when known; the user_id
            # argument is only a filter and may name a different person.
            accessor = user_id
            access_details = None
            if requested_by is not None:
                accessor = requested_by
                access_details = {
                    "filters": {
                        "start_time": (
                            start_time.isoformat() if start_time is not None else None
                        ),
                        "end_time": (
                            end_time.isoformat() if end_time is not None else None
                        ),
                        "user_id": user_id,
                        "event_type": (
                            event_type.value if event_type is not None else None
                        ),
                        "severity": severity.value if severity is not None else None,
                    }
                }

            # Log the audit trail access
            self.log_compliance_event(
                event_type=AuditEventType.AUDIT_LOG_ACCESSED,
                severity=AuditSeverity.LOW,
                user_id=accessor,
                action="audit_trail_access",
                resource="audit_log",
                result="success",
                event_data=access_details,
            )
            return result
        except Exception as e:
            self.logger.error(
                "audit_trail_retrieval_failed",
                error=str(e),
                user_id=user_id,
            )
            return []

    def _count_unique_users(self, window_params) -> int:
        """
        Count distinct users over the whole report window.

        Args:
            window_params: ``(start_iso, end_iso, compliance_standard)``, the
                same parameters used for the grouped report query.
        """
        query = """
            SELECT COUNT(DISTINCT user_id) AS unique_users
            FROM compliance_audit_log
            WHERE timestamp BETWEEN %s AND %s
            AND %s = ANY(compliance_standard)
        """
        rows = self.db_client.fetch_all(query, window_params)
        return rows[0]["unique_users"] if rows else 0

    @staticmethod
    def _build_summary(rows, unique_users: int) -> Dict[str, Any]:
        """
        Build the report summary from the grouped rows.

        ``unique_users`` is passed in separately because the per-group
        distinct counts in *rows* cannot be added up: a user active in
        several (event_type, severity) groups would be counted once per group.
        """
        return {
            "total_events": sum(row["count"] for row in rows),
            "unique_users": unique_users,
            "event_types": len({row["event_type"] for row in rows}),
        }

    def generate_compliance_report(
        self,
        start_time: datetime,
        end_time: datetime,
        compliance_standard: str
    ) -> Dict[str, Any]:
        """
        Generate compliance report for specified standard.

        A successful run is itself recorded as a COMPLIANCE_CHECK event.

        Args:
            start_time: Report start time
            end_time: Report end time
            compliance_standard: Compliance standard to report on

        Returns:
            Compliance report data with the keys ``compliance_standard``,
            ``report_period``, ``summary`` and ``events_by_type``. On failure
            a dict with a single ``error`` key holding the message is
            returned (the failure is also logged).
        """
        try:
            query = """
                SELECT
                event_type,
                severity,
                COUNT(*) as count,
                COUNT(DISTINCT user_id) as unique_users
                FROM compliance_audit_log
                WHERE timestamp BETWEEN %s AND %s
                AND %s = ANY(compliance_standard)
                GROUP BY event_type, severity
                ORDER BY count DESC
            """
            window_params = (
                start_time.isoformat(),
                end_time.isoformat(),
                compliance_standard,
            )
            result = self.db_client.fetch_all(query, window_params)

            report = {
                "compliance_standard": compliance_standard,
                "report_period": {
                    "start_time": start_time.isoformat(),
                    "end_time": end_time.isoformat(),
                },
                "summary": self._build_summary(
                    result, self._count_unique_users(window_params)
                ),
                "events_by_type": result,
            }

            # Log the compliance report generation
            self.log_compliance_event(
                event_type=AuditEventType.COMPLIANCE_CHECK,
                severity=AuditSeverity.LOW,
                action="compliance_report_generated",
                resource="compliance",
                result="success",
                event_data={
                    "compliance_standard": compliance_standard,
                    "report_period": report["report_period"],
                },
            )
            return report
        except Exception as e:
            self.logger.error(
                "compliance_report_generation_failed",
                error=str(e),
                compliance_standard=compliance_standard,
            )
            return {"error": str(e)}