CalejoControl/tests/unit/test_security.py

373 lines
14 KiB
Python
Raw Permalink Normal View History

"""
Unit tests for security layer components.
"""
import pytest
from unittest.mock import Mock, patch
from datetime import datetime, timedelta, timezone
import jwt
import bcrypt
from src.core.security import (
AuthenticationManager,
AuthorizationManager,
SecurityManager,
UserRole,
User,
TokenData
)
from config.settings import settings
class TestAuthenticationManager:
    """Test cases for AuthenticationManager.

    ``setup_method`` builds a new manager before every test; the tests then
    exercise password hashing, user creation, authentication and JWT
    creation/verification through ``self.auth_manager``.
    """

    # Signing secret handed to the manager under test and reused by the tests
    # that encode/decode tokens by hand, so the two can never drift apart.
    SECRET_KEY = "test-secret-key"

    def setup_method(self):
        """Set up test fixtures."""
        self.auth_manager = AuthenticationManager(
            secret_key=self.SECRET_KEY,
            token_expire_minutes=60
        )

    def test_hash_password(self):
        """Test password hashing."""
        password = "testpassword123"
        hashed = self.auth_manager.hash_password(password)

        # Verify the hash is valid
        assert bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))

        # Verify different passwords produce different hashes
        different_password = "differentpassword"
        different_hashed = self.auth_manager.hash_password(different_password)
        assert hashed != different_hashed

    def test_verify_password(self):
        """Test password verification."""
        password = "testpassword123"
        hashed = self.auth_manager.hash_password(password)

        # Test correct password
        assert self.auth_manager.verify_password(password, hashed)

        # Test incorrect password
        assert not self.auth_manager.verify_password("wrongpassword", hashed)

    def test_create_user(self):
        """Test user creation."""
        user = self.auth_manager.create_user(
            user_id="test_user_001",
            username="testuser",
            email="test@example.com",
            role=UserRole.OPERATOR,
            password="testpassword123"
        )

        assert user.user_id == "test_user_001"
        assert user.username == "testuser"
        assert user.email == "test@example.com"
        assert user.role == UserRole.OPERATOR
        assert user.active is True
        assert user.created_at is not None

        # Verify password was hashed and stored
        assert "test_user_001" in self.auth_manager.password_hashes
        hashed_password = self.auth_manager.password_hashes["test_user_001"]
        assert bcrypt.checkpw("testpassword123".encode('utf-8'), hashed_password.encode('utf-8'))

    def test_create_user_duplicate_id(self):
        """Test creating user with duplicate ID."""
        self.auth_manager.create_user(
            user_id="duplicate_user",
            username="user1",
            email="user1@example.com",
            role=UserRole.OPERATOR,
            password="password1"
        )

        # A second user with the same ID must be rejected, whatever its
        # other fields are.
        with pytest.raises(ValueError, match="User with ID duplicate_user already exists"):
            self.auth_manager.create_user(
                user_id="duplicate_user",
                username="user2",
                email="user2@example.com",
                role=UserRole.ENGINEER,
                password="password2"
            )

    def test_authenticate_user_success(self):
        """Test successful user authentication."""
        # Create a test user
        self.auth_manager.create_user(
            user_id="auth_test_user",
            username="authuser",
            email="auth@example.com",
            role=UserRole.ENGINEER,
            password="authpassword123"
        )

        # Test authentication
        user = self.auth_manager.authenticate_user("authuser", "authpassword123")

        assert user is not None
        assert user.user_id == "auth_test_user"
        assert user.username == "authuser"
        assert user.role == UserRole.ENGINEER
        assert user.last_login is not None

    def test_authenticate_user_wrong_password(self):
        """Test authentication with wrong password."""
        self.auth_manager.create_user(
            user_id="wrong_pass_user",
            username="wrongpass",
            email="wrong@example.com",
            role=UserRole.OPERATOR,
            password="correctpassword"
        )

        user = self.auth_manager.authenticate_user("wrongpass", "wrongpassword")
        assert user is None

    def test_authenticate_user_not_found(self):
        """Test authentication with non-existent user."""
        user = self.auth_manager.authenticate_user("nonexistent", "password")
        assert user is None

    def test_authenticate_user_inactive(self):
        """Test authentication with inactive user."""
        # Create inactive user
        user = User(
            user_id="inactive_user",
            username="inactive",
            email="inactive@example.com",
            role=UserRole.OPERATOR,
            active=False,
            created_at=datetime.now(timezone.utc)
        )
        # Registered directly in the manager's stores rather than through
        # create_user(), so the ``active=False`` flag set above is kept.
        self.auth_manager.users["inactive_user"] = user
        self.auth_manager.password_hashes["inactive_user"] = self.auth_manager.hash_password("password")

        # The password is correct; only the inactive flag should block login.
        result = self.auth_manager.authenticate_user("inactive", "password")
        assert result is None

    def test_create_access_token(self):
        """Test JWT token creation."""
        user = User(
            user_id="token_user",
            username="tokenuser",
            email="token@example.com",
            role=UserRole.ADMINISTRATOR,
            created_at=datetime.now(timezone.utc)
        )

        token = self.auth_manager.create_access_token(user)

        # Verify token can be decoded
        payload = jwt.decode(token, self.SECRET_KEY, algorithms=["HS256"])

        assert payload["user_id"] == "token_user"
        assert payload["username"] == "tokenuser"
        assert payload["role"] == "administrator"
        assert "exp" in payload

    def test_verify_token_success(self):
        """Test successful token verification."""
        # Create user and token
        user = User(
            user_id="verify_user",
            username="verifyuser",
            email="verify@example.com",
            role=UserRole.OPERATOR,
            created_at=datetime.now(timezone.utc)
        )
        self.auth_manager.users["verify_user"] = user

        token = self.auth_manager.create_access_token(user)
        token_data = self.auth_manager.verify_token(token)

        assert token_data is not None
        assert token_data.user_id == "verify_user"
        assert token_data.username == "verifyuser"
        assert token_data.role == UserRole.OPERATOR

    def test_verify_token_expired(self):
        """Test verification of expired token."""
        # Create expired token
        expired_time = datetime.now(timezone.utc) - timedelta(hours=1)
        token_data = TokenData(
            user_id="expired_user",
            username="expireduser",
            role=UserRole.OPERATOR,
            exp=expired_time
        )

        # Signed with the correct secret so that only the expiry can cause
        # the rejection. NOTE(review): relies on TokenData exposing a
        # pydantic-style ``.dict()`` -- confirm against src.core.security.
        token = jwt.encode(
            token_data.dict(),
            self.SECRET_KEY,
            algorithm="HS256"
        )

        result = self.auth_manager.verify_token(token)
        assert result is None

    def test_verify_token_invalid_signature(self):
        """Test verification of token with invalid signature."""
        # Create token with wrong secret. The claims are otherwise well
        # formed and unexpired, so only the signature can cause rejection.
        token = jwt.encode(
            {
                "user_id": "invalid_user",
                "username": "invaliduser",
                "role": "operator",
                "exp": datetime.now(timezone.utc) + timedelta(minutes=60)
            },
            "wrong-secret-key",
            algorithm="HS256"
        )

        result = self.auth_manager.verify_token(token)
        assert result is None
class TestAuthorizationManager:
    """Role and permission checks against AuthorizationManager."""

    def setup_method(self):
        """Build the AuthorizationManager queried by every test."""
        self.authz_manager = AuthorizationManager()

    def test_has_permission_read_only(self):
        """READ_ONLY is granted the read permissions and nothing else checked here."""
        check = self.authz_manager.has_permission

        for granted in ("read_pump_status", "read_safety_status", "read_audit_logs"):
            assert check(UserRole.READ_ONLY, granted)

        # Write-style permissions must be refused.
        for denied in ("emergency_stop", "configure_safety_limits"):
            assert not check(UserRole.READ_ONLY, denied)

    def test_has_permission_operator(self):
        """OPERATOR can read and handle emergency stops, but not configure."""
        check = self.authz_manager.has_permission

        for granted in ("read_pump_status", "emergency_stop", "clear_emergency_stop"):
            assert check(UserRole.OPERATOR, granted)

        # Configuration and user management are out of reach for operators.
        for denied in ("configure_safety_limits", "manage_users"):
            assert not check(UserRole.OPERATOR, denied)

    def test_has_permission_engineer(self):
        """ENGINEER can additionally configure, but not manage users."""
        check = self.authz_manager.has_permission

        engineer_grants = (
            "read_pump_status",
            "emergency_stop",
            "configure_safety_limits",
            "manage_pump_configuration",
        )
        for granted in engineer_grants:
            assert check(UserRole.ENGINEER, granted)

        # Administrative permission stays with administrators.
        assert not check(UserRole.ENGINEER, "manage_users")

    def test_has_permission_administrator(self):
        """ADMINISTRATOR is granted every permission listed below."""
        expected_grants = (
            "read_pump_status", "read_safety_status", "read_audit_logs",
            "emergency_stop", "clear_emergency_stop", "view_alerts",
            "configure_safety_limits", "manage_pump_configuration",
            "view_system_metrics", "manage_users", "configure_system",
            "access_all_stations",
        )

        for name in expected_grants:
            assert self.authz_manager.has_permission(UserRole.ADMINISTRATOR, name)

    def test_has_permission_unknown_permission(self):
        """A permission name nobody defines is refused even for ADMINISTRATOR."""
        granted = self.authz_manager.has_permission(
            UserRole.ADMINISTRATOR, "unknown_permission"
        )
        assert not granted

    def test_get_allowed_actions(self):
        """The OPERATOR action set contains its grants and omits configuration."""
        allowed = self.authz_manager.get_allowed_actions(UserRole.OPERATOR)

        for action in ("read_pump_status", "emergency_stop", "clear_emergency_stop"):
            assert action in allowed

        assert "configure_safety_limits" not in allowed

    def test_can_access_station(self):
        """Every role can reach STATION_001."""
        # Administrators come first; the remaining roles can also access all
        # stations (for now).
        roles_in_order = (
            UserRole.ADMINISTRATOR,
            UserRole.OPERATOR,
            UserRole.ENGINEER,
            UserRole.READ_ONLY,
        )
        for role in roles_in_order:
            assert self.authz_manager.can_access_station(role, "STATION_001")
class TestSecurityManager:
    """Test cases for SecurityManager, driven through its default accounts."""

    def setup_method(self):
        """Instantiate the SecurityManager under test."""
        self.security_manager = SecurityManager()

    def _login(self, username, password):
        """Authenticate with the given credentials and return the verified token data."""
        access_token = self.security_manager.authenticate(username, password)
        return self.security_manager.verify_access_token(access_token)

    def test_authenticate_success(self):
        """Valid credentials yield a token that verifies back to the same user."""
        # Default users are created during initialization
        access_token = self.security_manager.authenticate("operator", "operator123")
        assert access_token is not None

        # The issued token must round-trip through verification.
        verified = self.security_manager.verify_access_token(access_token)
        assert verified is not None
        assert verified.username == "operator"
        assert verified.role == UserRole.OPERATOR

    def test_authenticate_failure(self):
        """Unknown credentials yield no token."""
        access_token = self.security_manager.authenticate("nonexistent", "password")
        assert access_token is None

    def test_check_permission(self):
        """Permission checks follow the role carried by the token."""
        operator = self._login("operator", "operator123")

        # Operator should have emergency_stop permission
        assert self.security_manager.check_permission(operator, "emergency_stop")

        # Operator should not have manage_users permission
        assert not self.security_manager.check_permission(operator, "manage_users")

    def test_get_user_permissions(self):
        """The engineer's permission set has its grants and lacks user management."""
        engineer = self._login("engineer", "engineer123")
        granted = self.security_manager.get_user_permissions(engineer)

        # Engineer should have specific permissions
        for name in ("configure_safety_limits", "manage_pump_configuration", "emergency_stop"):
            assert name in granted

        # Engineer should not have administrative permissions
        assert "manage_users" not in granted

    def test_can_access_station(self):
        """An operator token grants access to any station."""
        operator = self._login("operator", "operator123")

        # Operator should be able to access any station
        for station_id in ("STATION_001", "STATION_002"):
            assert self.security_manager.can_access_station(operator, station_id)