CalejoControl/src/core/security.py

358 lines
12 KiB
Python

"""
Security layer for Calejo Control Adapter.
Provides authentication, authorization, and security utilities for compliance
with IEC 62443, ISO 27001, and NIS2 Directive requirements.
"""
import jwt
import bcrypt
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, List, Any
from enum import Enum
from pydantic import BaseModel
import structlog
from config.settings import settings
logger = structlog.get_logger()
class UserRole(str, Enum):
"""User roles for role-based access control."""
OPERATOR = "operator"
ENGINEER = "engineer"
ADMINISTRATOR = "administrator"
READ_ONLY = "read_only"
class User(BaseModel):
"""User model for authentication and authorization."""
user_id: str
username: str
email: str
role: UserRole
active: bool = True
created_at: datetime
last_login: Optional[datetime] = None
class TokenData(BaseModel):
"""Data encoded in JWT tokens."""
user_id: str
username: str
role: UserRole
exp: datetime
class AuthenticationManager:
"""
Manages user authentication with JWT tokens and password hashing.
Implements security controls for IEC 62443, ISO 27001, and NIS2 compliance.
"""
def __init__(self, secret_key: str, algorithm: str = "HS256", token_expire_minutes: int = 60):
self.secret_key = secret_key
self.algorithm = algorithm
self.token_expire_minutes = token_expire_minutes
# In-memory user store (in production, this would be a database)
self.users: Dict[str, User] = {}
self.password_hashes: Dict[str, str] = {}
# Initialize with default users for development
self._initialize_default_users()
def _initialize_default_users(self):
"""Initialize default users for development and testing."""
default_users = [
{
"user_id": "admin_001",
"username": "admin",
"email": "admin@calejo.com",
"role": UserRole.ADMINISTRATOR,
"password": "admin123"
},
{
"user_id": "operator_001",
"username": "operator",
"email": "operator@calejo.com",
"role": UserRole.OPERATOR,
"password": "operator123"
},
{
"user_id": "engineer_001",
"username": "engineer",
"email": "engineer@calejo.com",
"role": UserRole.ENGINEER,
"password": "engineer123"
},
{
"user_id": "viewer_001",
"username": "viewer",
"email": "viewer@calejo.com",
"role": UserRole.READ_ONLY,
"password": "viewer123"
}
]
for user_data in default_users:
self.create_user(
user_id=user_data["user_id"],
username=user_data["username"],
email=user_data["email"],
role=user_data["role"],
password=user_data["password"]
)
def hash_password(self, password: str) -> str:
"""Hash a password using bcrypt."""
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
def verify_password(self, plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return bcrypt.checkpw(
plain_password.encode('utf-8'),
hashed_password.encode('utf-8')
)
def create_user(self, user_id: str, username: str, email: str, role: UserRole, password: str) -> User:
"""Create a new user with hashed password."""
if user_id in self.users:
raise ValueError(f"User with ID {user_id} already exists")
user = User(
user_id=user_id,
username=username,
email=email,
role=role,
created_at=datetime.now(timezone.utc)
)
self.users[user_id] = user
self.password_hashes[user_id] = self.hash_password(password)
logger.info("user_created", user_id=user_id, username=username, role=role.value)
return user
def authenticate_user(self, username: str, password: str) -> Optional[User]:
"""Authenticate a user and return user object if successful."""
# Find user by username
user = None
for u in self.users.values():
if u.username == username and u.active:
user = u
break
if not user:
logger.warning("authentication_failed", username=username, reason="user_not_found")
return None
# Verify password
if not self.verify_password(password, self.password_hashes[user.user_id]):
logger.warning("authentication_failed", username=username, reason="invalid_password")
return None
# Update last login
user.last_login = datetime.now(timezone.utc)
logger.info("user_authenticated", user_id=user.user_id, username=username, role=user.role.value)
return user
def create_access_token(self, user: User) -> str:
"""Create a JWT access token for the user."""
expires_delta = timedelta(minutes=self.token_expire_minutes)
expire = datetime.now(timezone.utc) + expires_delta
token_data = TokenData(
user_id=user.user_id,
username=user.username,
role=user.role,
exp=expire
)
encoded_jwt = jwt.encode(
token_data.dict(),
self.secret_key,
algorithm=self.algorithm
)
return encoded_jwt
def verify_token(self, token: str) -> Optional[TokenData]:
"""Verify and decode a JWT token."""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
token_data = TokenData(**payload)
# Check if token is expired
if token_data.exp < datetime.now(timezone.utc):
logger.warning("token_expired", username=token_data.username)
return None
# Verify user still exists and is active
user = self.users.get(token_data.user_id)
if not user or not user.active:
logger.warning("token_invalid_user", username=token_data.username)
return None
return token_data
except jwt.PyJWTError as e:
logger.warning("token_verification_failed", error=str(e))
return None
class AuthorizationManager:
"""
Manages role-based access control (RBAC) for authorization.
Implements IEC 62443 zone security model and ISO 27001 access control requirements.
"""
def __init__(self):
# Define permissions for each role
self.role_permissions = {
UserRole.READ_ONLY: {
"read_pump_status",
"read_safety_status",
"read_audit_logs"
},
UserRole.OPERATOR: {
"read_pump_status",
"read_safety_status",
"read_audit_logs",
"emergency_stop",
"clear_emergency_stop",
"view_alerts"
},
UserRole.ENGINEER: {
"read_pump_status",
"read_safety_status",
"read_audit_logs",
"emergency_stop",
"clear_emergency_stop",
"view_alerts",
"configure_safety_limits",
"manage_pump_configuration",
"view_system_metrics"
},
UserRole.ADMINISTRATOR: {
"read_pump_status",
"read_safety_status",
"read_audit_logs",
"emergency_stop",
"clear_emergency_stop",
"view_alerts",
"configure_safety_limits",
"manage_pump_configuration",
"view_system_metrics",
"manage_users",
"configure_system",
"access_all_stations"
}
}
def has_permission(self, role: UserRole, permission: str) -> bool:
"""Check if a role has the specified permission."""
permissions = self.role_permissions.get(role, set())
return permission in permissions
def get_allowed_actions(self, role: UserRole) -> List[str]:
"""Get all allowed actions for a role."""
return list(self.role_permissions.get(role, set()))
def can_access_station(self, role: UserRole, station_id: str) -> bool:
"""
Check if user can access a specific station.
Administrators can access all stations, others may have station-specific
permissions (to be implemented with database integration).
"""
if role == UserRole.ADMINISTRATOR:
return True
# For now, all authenticated users can access all stations
# In production, this would check station-specific permissions
return True
class SecurityManager:
"""
Main security manager that coordinates authentication and authorization.
Provides a unified interface for security operations and compliance logging.
"""
def __init__(self, audit_logger=None):
self.auth_manager = AuthenticationManager(
secret_key=settings.jwt_secret_key,
token_expire_minutes=settings.jwt_token_expire_minutes
)
self.authz_manager = AuthorizationManager()
self.audit_logger = audit_logger
def authenticate(self, username: str, password: str) -> Optional[str]:
"""Authenticate user and return JWT token if successful."""
user = self.auth_manager.authenticate_user(username, password)
if user:
token = self.auth_manager.create_access_token(user)
# Log successful authentication
if self.audit_logger:
self.audit_logger.log(
event_type="USER_AUTHENTICATION",
severity="INFO",
user_id=user.user_id,
event_data={
"username": username,
"role": user.role.value,
"result": "SUCCESS"
}
)
return token
# Log failed authentication
if self.audit_logger:
self.audit_logger.log(
event_type="USER_AUTHENTICATION",
severity="WARNING",
event_data={
"username": username,
"result": "FAILURE"
}
)
return None
def verify_access_token(self, token: str) -> Optional[TokenData]:
"""Verify JWT token and return token data if valid."""
return self.auth_manager.verify_token(token)
def check_permission(self, token_data: TokenData, permission: str) -> bool:
"""Check if user has the specified permission."""
return self.authz_manager.has_permission(token_data.role, permission)
def can_access_station(self, token_data: TokenData, station_id: str) -> bool:
"""Check if user can access the specified station."""
return self.authz_manager.can_access_station(token_data.role, station_id)
def get_user_permissions(self, token_data: TokenData) -> List[str]:
"""Get all permissions for the user."""
return self.authz_manager.get_allowed_actions(token_data.role)
# Global security manager instance
security_manager: Optional[SecurityManager] = None
def get_security_manager() -> SecurityManager:
"""Get or create the global security manager instance."""
global security_manager
if security_manager is None:
security_manager = SecurityManager()
return security_manager