373 lines
14 KiB
Python
373 lines
14 KiB
Python
"""
|
|
Unit tests for security layer components.
|
|
"""
|
|
|
|
import pytest
|
|
from unittest.mock import Mock, patch
|
|
from datetime import datetime, timedelta, timezone
|
|
import jwt
|
|
import bcrypt
|
|
|
|
from src.core.security import (
|
|
AuthenticationManager,
|
|
AuthorizationManager,
|
|
SecurityManager,
|
|
UserRole,
|
|
User,
|
|
TokenData
|
|
)
|
|
from config.settings import settings
|
|
|
|
|
|
class TestAuthenticationManager:
|
|
"""Test cases for AuthenticationManager."""
|
|
|
|
def setup_method(self):
|
|
"""Set up test fixtures."""
|
|
self.auth_manager = AuthenticationManager(
|
|
secret_key="test-secret-key",
|
|
token_expire_minutes=60
|
|
)
|
|
|
|
def test_hash_password(self):
|
|
"""Test password hashing."""
|
|
password = "testpassword123"
|
|
hashed = self.auth_manager.hash_password(password)
|
|
|
|
# Verify the hash is valid
|
|
assert bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
|
|
|
|
# Verify different passwords produce different hashes
|
|
different_password = "differentpassword"
|
|
different_hashed = self.auth_manager.hash_password(different_password)
|
|
assert hashed != different_hashed
|
|
|
|
def test_verify_password(self):
|
|
"""Test password verification."""
|
|
password = "testpassword123"
|
|
hashed = self.auth_manager.hash_password(password)
|
|
|
|
# Test correct password
|
|
assert self.auth_manager.verify_password(password, hashed)
|
|
|
|
# Test incorrect password
|
|
assert not self.auth_manager.verify_password("wrongpassword", hashed)
|
|
|
|
def test_create_user(self):
|
|
"""Test user creation."""
|
|
user = self.auth_manager.create_user(
|
|
user_id="test_user_001",
|
|
username="testuser",
|
|
email="test@example.com",
|
|
role=UserRole.OPERATOR,
|
|
password="testpassword123"
|
|
)
|
|
|
|
assert user.user_id == "test_user_001"
|
|
assert user.username == "testuser"
|
|
assert user.email == "test@example.com"
|
|
assert user.role == UserRole.OPERATOR
|
|
assert user.active is True
|
|
assert user.created_at is not None
|
|
|
|
# Verify password was hashed and stored
|
|
assert "test_user_001" in self.auth_manager.password_hashes
|
|
hashed_password = self.auth_manager.password_hashes["test_user_001"]
|
|
assert bcrypt.checkpw("testpassword123".encode('utf-8'), hashed_password.encode('utf-8'))
|
|
|
|
def test_create_user_duplicate_id(self):
|
|
"""Test creating user with duplicate ID."""
|
|
self.auth_manager.create_user(
|
|
user_id="duplicate_user",
|
|
username="user1",
|
|
email="user1@example.com",
|
|
role=UserRole.OPERATOR,
|
|
password="password1"
|
|
)
|
|
|
|
with pytest.raises(ValueError, match="User with ID duplicate_user already exists"):
|
|
self.auth_manager.create_user(
|
|
user_id="duplicate_user",
|
|
username="user2",
|
|
email="user2@example.com",
|
|
role=UserRole.ENGINEER,
|
|
password="password2"
|
|
)
|
|
|
|
def test_authenticate_user_success(self):
|
|
"""Test successful user authentication."""
|
|
# Create a test user
|
|
self.auth_manager.create_user(
|
|
user_id="auth_test_user",
|
|
username="authuser",
|
|
email="auth@example.com",
|
|
role=UserRole.ENGINEER,
|
|
password="authpassword123"
|
|
)
|
|
|
|
# Test authentication
|
|
user = self.auth_manager.authenticate_user("authuser", "authpassword123")
|
|
|
|
assert user is not None
|
|
assert user.user_id == "auth_test_user"
|
|
assert user.username == "authuser"
|
|
assert user.role == UserRole.ENGINEER
|
|
assert user.last_login is not None
|
|
|
|
def test_authenticate_user_wrong_password(self):
|
|
"""Test authentication with wrong password."""
|
|
self.auth_manager.create_user(
|
|
user_id="wrong_pass_user",
|
|
username="wrongpass",
|
|
email="wrong@example.com",
|
|
role=UserRole.OPERATOR,
|
|
password="correctpassword"
|
|
)
|
|
|
|
user = self.auth_manager.authenticate_user("wrongpass", "wrongpassword")
|
|
assert user is None
|
|
|
|
def test_authenticate_user_not_found(self):
|
|
"""Test authentication with non-existent user."""
|
|
user = self.auth_manager.authenticate_user("nonexistent", "password")
|
|
assert user is None
|
|
|
|
def test_authenticate_user_inactive(self):
|
|
"""Test authentication with inactive user."""
|
|
# Create inactive user
|
|
user = User(
|
|
user_id="inactive_user",
|
|
username="inactive",
|
|
email="inactive@example.com",
|
|
role=UserRole.OPERATOR,
|
|
active=False,
|
|
created_at=datetime.now(timezone.utc)
|
|
)
|
|
self.auth_manager.users["inactive_user"] = user
|
|
self.auth_manager.password_hashes["inactive_user"] = self.auth_manager.hash_password("password")
|
|
|
|
result = self.auth_manager.authenticate_user("inactive", "password")
|
|
assert result is None
|
|
|
|
def test_create_access_token(self):
|
|
"""Test JWT token creation."""
|
|
user = User(
|
|
user_id="token_user",
|
|
username="tokenuser",
|
|
email="token@example.com",
|
|
role=UserRole.ADMINISTRATOR,
|
|
created_at=datetime.now(timezone.utc)
|
|
)
|
|
|
|
token = self.auth_manager.create_access_token(user)
|
|
|
|
# Verify token can be decoded
|
|
payload = jwt.decode(token, "test-secret-key", algorithms=["HS256"])
|
|
assert payload["user_id"] == "token_user"
|
|
assert payload["username"] == "tokenuser"
|
|
assert payload["role"] == "administrator"
|
|
assert "exp" in payload
|
|
|
|
def test_verify_token_success(self):
|
|
"""Test successful token verification."""
|
|
# Create user and token
|
|
user = User(
|
|
user_id="verify_user",
|
|
username="verifyuser",
|
|
email="verify@example.com",
|
|
role=UserRole.OPERATOR,
|
|
created_at=datetime.now(timezone.utc)
|
|
)
|
|
self.auth_manager.users["verify_user"] = user
|
|
|
|
token = self.auth_manager.create_access_token(user)
|
|
token_data = self.auth_manager.verify_token(token)
|
|
|
|
assert token_data is not None
|
|
assert token_data.user_id == "verify_user"
|
|
assert token_data.username == "verifyuser"
|
|
assert token_data.role == UserRole.OPERATOR
|
|
|
|
def test_verify_token_expired(self):
|
|
"""Test verification of expired token."""
|
|
# Create expired token
|
|
expired_time = datetime.now(timezone.utc) - timedelta(hours=1)
|
|
token_data = TokenData(
|
|
user_id="expired_user",
|
|
username="expireduser",
|
|
role=UserRole.OPERATOR,
|
|
exp=expired_time
|
|
)
|
|
|
|
token = jwt.encode(
|
|
token_data.dict(),
|
|
"test-secret-key",
|
|
algorithm="HS256"
|
|
)
|
|
|
|
result = self.auth_manager.verify_token(token)
|
|
assert result is None
|
|
|
|
def test_verify_token_invalid_signature(self):
|
|
"""Test verification of token with invalid signature."""
|
|
# Create token with wrong secret
|
|
user = User(
|
|
user_id="invalid_user",
|
|
username="invaliduser",
|
|
email="invalid@example.com",
|
|
role=UserRole.OPERATOR,
|
|
created_at=datetime.now(timezone.utc)
|
|
)
|
|
|
|
token = jwt.encode(
|
|
{
|
|
"user_id": "invalid_user",
|
|
"username": "invaliduser",
|
|
"role": "operator",
|
|
"exp": datetime.now(timezone.utc) + timedelta(minutes=60)
|
|
},
|
|
"wrong-secret-key",
|
|
algorithm="HS256"
|
|
)
|
|
|
|
result = self.auth_manager.verify_token(token)
|
|
assert result is None
|
|
|
|
|
|
class TestAuthorizationManager:
|
|
"""Test cases for AuthorizationManager."""
|
|
|
|
def setup_method(self):
|
|
"""Set up test fixtures."""
|
|
self.authz_manager = AuthorizationManager()
|
|
|
|
def test_has_permission_read_only(self):
|
|
"""Test permissions for read-only role."""
|
|
assert self.authz_manager.has_permission(UserRole.READ_ONLY, "read_pump_status")
|
|
assert self.authz_manager.has_permission(UserRole.READ_ONLY, "read_safety_status")
|
|
assert self.authz_manager.has_permission(UserRole.READ_ONLY, "read_audit_logs")
|
|
|
|
# Should not have write permissions
|
|
assert not self.authz_manager.has_permission(UserRole.READ_ONLY, "emergency_stop")
|
|
assert not self.authz_manager.has_permission(UserRole.READ_ONLY, "configure_safety_limits")
|
|
|
|
def test_has_permission_operator(self):
|
|
"""Test permissions for operator role."""
|
|
assert self.authz_manager.has_permission(UserRole.OPERATOR, "read_pump_status")
|
|
assert self.authz_manager.has_permission(UserRole.OPERATOR, "emergency_stop")
|
|
assert self.authz_manager.has_permission(UserRole.OPERATOR, "clear_emergency_stop")
|
|
|
|
# Should not have configuration permissions
|
|
assert not self.authz_manager.has_permission(UserRole.OPERATOR, "configure_safety_limits")
|
|
assert not self.authz_manager.has_permission(UserRole.OPERATOR, "manage_users")
|
|
|
|
def test_has_permission_engineer(self):
|
|
"""Test permissions for engineer role."""
|
|
assert self.authz_manager.has_permission(UserRole.ENGINEER, "read_pump_status")
|
|
assert self.authz_manager.has_permission(UserRole.ENGINEER, "emergency_stop")
|
|
assert self.authz_manager.has_permission(UserRole.ENGINEER, "configure_safety_limits")
|
|
assert self.authz_manager.has_permission(UserRole.ENGINEER, "manage_pump_configuration")
|
|
|
|
# Should not have administrative permissions
|
|
assert not self.authz_manager.has_permission(UserRole.ENGINEER, "manage_users")
|
|
|
|
def test_has_permission_administrator(self):
|
|
"""Test permissions for administrator role."""
|
|
# Administrator should have all permissions
|
|
all_permissions = [
|
|
"read_pump_status", "read_safety_status", "read_audit_logs",
|
|
"emergency_stop", "clear_emergency_stop", "view_alerts",
|
|
"configure_safety_limits", "manage_pump_configuration",
|
|
"view_system_metrics", "manage_users", "configure_system",
|
|
"access_all_stations"
|
|
]
|
|
|
|
for permission in all_permissions:
|
|
assert self.authz_manager.has_permission(UserRole.ADMINISTRATOR, permission)
|
|
|
|
def test_has_permission_unknown_permission(self):
|
|
"""Test unknown permission."""
|
|
assert not self.authz_manager.has_permission(UserRole.ADMINISTRATOR, "unknown_permission")
|
|
|
|
def test_get_allowed_actions(self):
|
|
"""Test getting all allowed actions for a role."""
|
|
operator_permissions = self.authz_manager.get_allowed_actions(UserRole.OPERATOR)
|
|
|
|
assert "read_pump_status" in operator_permissions
|
|
assert "emergency_stop" in operator_permissions
|
|
assert "clear_emergency_stop" in operator_permissions
|
|
assert "configure_safety_limits" not in operator_permissions
|
|
|
|
def test_can_access_station(self):
|
|
"""Test station access control."""
|
|
# Administrators can access all stations
|
|
assert self.authz_manager.can_access_station(UserRole.ADMINISTRATOR, "STATION_001")
|
|
|
|
# Other roles can access all stations (for now)
|
|
assert self.authz_manager.can_access_station(UserRole.OPERATOR, "STATION_001")
|
|
assert self.authz_manager.can_access_station(UserRole.ENGINEER, "STATION_001")
|
|
assert self.authz_manager.can_access_station(UserRole.READ_ONLY, "STATION_001")
|
|
|
|
|
|
class TestSecurityManager:
|
|
"""Test cases for SecurityManager."""
|
|
|
|
def setup_method(self):
|
|
"""Set up test fixtures."""
|
|
self.security_manager = SecurityManager()
|
|
|
|
def test_authenticate_success(self):
|
|
"""Test successful authentication."""
|
|
# Default users are created during initialization
|
|
token = self.security_manager.authenticate("operator", "operator123")
|
|
|
|
assert token is not None
|
|
|
|
# Verify token is valid
|
|
token_data = self.security_manager.verify_access_token(token)
|
|
assert token_data is not None
|
|
assert token_data.username == "operator"
|
|
assert token_data.role == UserRole.OPERATOR
|
|
|
|
def test_authenticate_failure(self):
|
|
"""Test failed authentication."""
|
|
token = self.security_manager.authenticate("nonexistent", "password")
|
|
assert token is None
|
|
|
|
def test_check_permission(self):
|
|
"""Test permission checking."""
|
|
# Create token for operator
|
|
token = self.security_manager.authenticate("operator", "operator123")
|
|
token_data = self.security_manager.verify_access_token(token)
|
|
|
|
# Operator should have emergency_stop permission
|
|
assert self.security_manager.check_permission(token_data, "emergency_stop")
|
|
|
|
# Operator should not have manage_users permission
|
|
assert not self.security_manager.check_permission(token_data, "manage_users")
|
|
|
|
def test_get_user_permissions(self):
|
|
"""Test getting user permissions."""
|
|
# Create token for engineer
|
|
token = self.security_manager.authenticate("engineer", "engineer123")
|
|
token_data = self.security_manager.verify_access_token(token)
|
|
|
|
permissions = self.security_manager.get_user_permissions(token_data)
|
|
|
|
# Engineer should have specific permissions
|
|
assert "configure_safety_limits" in permissions
|
|
assert "manage_pump_configuration" in permissions
|
|
assert "emergency_stop" in permissions
|
|
|
|
# Engineer should not have administrative permissions
|
|
assert "manage_users" not in permissions
|
|
|
|
def test_can_access_station(self):
|
|
"""Test station access control."""
|
|
# Create token for operator
|
|
token = self.security_manager.authenticate("operator", "operator123")
|
|
token_data = self.security_manager.verify_access_token(token)
|
|
|
|
# Operator should be able to access any station
|
|
assert self.security_manager.can_access_station(token_data, "STATION_001")
|
|
assert self.security_manager.can_access_station(token_data, "STATION_002") |