"""
Unit tests for security layer components.
"""

import pytest
from unittest.mock import Mock, patch
from datetime import datetime, timedelta, timezone
import jwt
import bcrypt

from src.core.security import (
    AuthenticationManager,
    AuthorizationManager,
    SecurityManager,
    UserRole,
    User,
    TokenData,
)
from config.settings import settings


class TestAuthenticationManager:
    """Test cases for AuthenticationManager."""

    def setup_method(self):
        """Create a fresh AuthenticationManager with a known secret for each test."""
        self.auth_manager = AuthenticationManager(
            secret_key="test-secret-key",
            token_expire_minutes=60,
        )

    def test_hash_password(self):
        """Hashing yields a value bcrypt accepts, and differs per password."""
        password = "testpassword123"
        hashed = self.auth_manager.hash_password(password)

        # Verify the hash is valid
        assert bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))

        # Verify different passwords produce different hashes
        different_password = "differentpassword"
        different_hashed = self.auth_manager.hash_password(different_password)
        assert hashed != different_hashed

    def test_verify_password(self):
        """verify_password accepts the original password and rejects another."""
        password = "testpassword123"
        hashed = self.auth_manager.hash_password(password)

        # Test correct password
        assert self.auth_manager.verify_password(password, hashed)

        # Test incorrect password
        assert not self.auth_manager.verify_password("wrongpassword", hashed)

    def test_create_user(self):
        """create_user returns a populated User and stores a hashed password."""
        user = self.auth_manager.create_user(
            user_id="test_user_001",
            username="testuser",
            email="test@example.com",
            role=UserRole.OPERATOR,
            password="testpassword123",
        )

        assert user.user_id == "test_user_001"
        assert user.username == "testuser"
        assert user.email == "test@example.com"
        assert user.role == UserRole.OPERATOR
        assert user.active is True
        assert user.created_at is not None

        # Verify password was hashed and stored
        assert "test_user_001" in self.auth_manager.password_hashes
        hashed_password = self.auth_manager.password_hashes["test_user_001"]
        assert bcrypt.checkpw(
            "testpassword123".encode('utf-8'),
            hashed_password.encode('utf-8'),
        )

    def test_create_user_duplicate_id(self):
        """Creating a second user with an existing ID raises ValueError."""
        self.auth_manager.create_user(
            user_id="duplicate_user",
            username="user1",
            email="user1@example.com",
            role=UserRole.OPERATOR,
            password="password1",
        )

        with pytest.raises(ValueError, match="User with ID duplicate_user already exists"):
            self.auth_manager.create_user(
                user_id="duplicate_user",
                username="user2",
                email="user2@example.com",
                role=UserRole.ENGINEER,
                password="password2",
            )

    def test_authenticate_user_success(self):
        """Correct credentials return the user and set last_login."""
        # Create a test user
        self.auth_manager.create_user(
            user_id="auth_test_user",
            username="authuser",
            email="auth@example.com",
            role=UserRole.ENGINEER,
            password="authpassword123",
        )

        # Test authentication
        user = self.auth_manager.authenticate_user("authuser", "authpassword123")

        assert user is not None
        assert user.user_id == "auth_test_user"
        assert user.username == "authuser"
        assert user.role == UserRole.ENGINEER
        assert user.last_login is not None

    def test_authenticate_user_wrong_password(self):
        """A wrong password yields None."""
        self.auth_manager.create_user(
            user_id="wrong_pass_user",
            username="wrongpass",
            email="wrong@example.com",
            role=UserRole.OPERATOR,
            password="correctpassword",
        )

        user = self.auth_manager.authenticate_user("wrongpass", "wrongpassword")
        assert user is None

    def test_authenticate_user_not_found(self):
        """An unknown username yields None."""
        user = self.auth_manager.authenticate_user("nonexistent", "password")
        assert user is None

    def test_authenticate_user_inactive(self):
        """An inactive user cannot authenticate even with the right password."""
        # Create inactive user; registered directly on the manager's dicts so
        # that active=False can be set explicitly.
        user = User(
            user_id="inactive_user",
            username="inactive",
            email="inactive@example.com",
            role=UserRole.OPERATOR,
            active=False,
            created_at=datetime.now(timezone.utc),
        )
        self.auth_manager.users["inactive_user"] = user
        self.auth_manager.password_hashes["inactive_user"] = (
            self.auth_manager.hash_password("password")
        )

        result = self.auth_manager.authenticate_user("inactive", "password")
        assert result is None

    def test_create_access_token(self):
        """The issued JWT decodes with the test secret and carries the user claims."""
        user = User(
            user_id="token_user",
            username="tokenuser",
            email="token@example.com",
            role=UserRole.ADMINISTRATOR,
            created_at=datetime.now(timezone.utc),
        )

        token = self.auth_manager.create_access_token(user)

        # Verify token can be decoded
        payload = jwt.decode(token, "test-secret-key", algorithms=["HS256"])

        assert payload["user_id"] == "token_user"
        assert payload["username"] == "tokenuser"
        assert payload["role"] == "administrator"
        assert "exp" in payload

    def test_verify_token_success(self):
        """A freshly issued token verifies and round-trips the user fields."""
        # Create user and token
        user = User(
            user_id="verify_user",
            username="verifyuser",
            email="verify@example.com",
            role=UserRole.OPERATOR,
            created_at=datetime.now(timezone.utc),
        )
        self.auth_manager.users["verify_user"] = user

        token = self.auth_manager.create_access_token(user)
        token_data = self.auth_manager.verify_token(token)

        assert token_data is not None
        assert token_data.user_id == "verify_user"
        assert token_data.username == "verifyuser"
        assert token_data.role == UserRole.OPERATOR

    def test_verify_token_expired(self):
        """A token whose exp lies one hour in the past is rejected."""
        # Create expired token
        expired_time = datetime.now(timezone.utc) - timedelta(hours=1)
        token_data = TokenData(
            user_id="expired_user",
            username="expireduser",
            role=UserRole.OPERATOR,
            exp=expired_time,
        )

        # NOTE(review): presumably TokenData is a pydantic model and .dict()
        # is its v1-style serializer -- confirm against src.core.security.
        token = jwt.encode(
            token_data.dict(),
            "test-secret-key",
            algorithm="HS256",
        )

        result = self.auth_manager.verify_token(token)
        assert result is None

    def test_verify_token_invalid_signature(self):
        """A token signed with a different secret is rejected."""
        # Create token with wrong secret
        token = jwt.encode(
            {
                "user_id": "invalid_user",
                "username": "invaliduser",
                "role": "operator",
                "exp": datetime.now(timezone.utc) + timedelta(minutes=60),
            },
            "wrong-secret-key",
            algorithm="HS256",
        )

        result = self.auth_manager.verify_token(token)
        assert result is None


class TestAuthorizationManager:
    """Test cases for AuthorizationManager."""

    def setup_method(self):
        """Create a fresh AuthorizationManager for each test."""
        self.authz_manager = AuthorizationManager()

    def test_has_permission_read_only(self):
        """READ_ONLY has the read permissions and none of the write ones checked here."""
        assert self.authz_manager.has_permission(UserRole.READ_ONLY, "read_pump_status")
        assert self.authz_manager.has_permission(UserRole.READ_ONLY, "read_safety_status")
        assert self.authz_manager.has_permission(UserRole.READ_ONLY, "read_audit_logs")

        # Should not have write permissions
        assert not self.authz_manager.has_permission(UserRole.READ_ONLY, "emergency_stop")
        assert not self.authz_manager.has_permission(UserRole.READ_ONLY, "configure_safety_limits")

    def test_has_permission_operator(self):
        """OPERATOR can read and use emergency stop, but cannot configure or manage users."""
        assert self.authz_manager.has_permission(UserRole.OPERATOR, "read_pump_status")
        assert self.authz_manager.has_permission(UserRole.OPERATOR, "emergency_stop")
        assert self.authz_manager.has_permission(UserRole.OPERATOR, "clear_emergency_stop")

        # Should not have configuration permissions
        assert not self.authz_manager.has_permission(UserRole.OPERATOR, "configure_safety_limits")
        assert not self.authz_manager.has_permission(UserRole.OPERATOR, "manage_users")

    def test_has_permission_engineer(self):
        """ENGINEER can configure safety limits and pumps, but cannot manage users."""
        assert self.authz_manager.has_permission(UserRole.ENGINEER, "read_pump_status")
        assert self.authz_manager.has_permission(UserRole.ENGINEER, "emergency_stop")
        assert self.authz_manager.has_permission(UserRole.ENGINEER, "configure_safety_limits")
        assert self.authz_manager.has_permission(UserRole.ENGINEER, "manage_pump_configuration")

        # Should not have administrative permissions
        assert not self.authz_manager.has_permission(UserRole.ENGINEER, "manage_users")

    def test_has_permission_administrator(self):
        """ADMINISTRATOR holds every permission in the list below."""
        # Administrator should have all permissions
        all_permissions = [
            "read_pump_status",
            "read_safety_status",
            "read_audit_logs",
            "emergency_stop",
            "clear_emergency_stop",
            "view_alerts",
            "configure_safety_limits",
            "manage_pump_configuration",
            "view_system_metrics",
            "manage_users",
            "configure_system",
            "access_all_stations",
        ]

        for permission in all_permissions:
            # The message names the failing permission so a failure is actionable.
            assert self.authz_manager.has_permission(
                UserRole.ADMINISTRATOR, permission
            ), f"administrator is missing permission: {permission}"

    def test_has_permission_unknown_permission(self):
        """An unrecognised permission name is denied even for ADMINISTRATOR."""
        assert not self.authz_manager.has_permission(UserRole.ADMINISTRATOR, "unknown_permission")

    def test_get_allowed_actions(self):
        """get_allowed_actions for OPERATOR includes its actions and excludes configuration."""
        operator_permissions = self.authz_manager.get_allowed_actions(UserRole.OPERATOR)

        assert "read_pump_status" in operator_permissions
        assert "emergency_stop" in operator_permissions
        assert "clear_emergency_stop" in operator_permissions
        assert "configure_safety_limits" not in operator_permissions

    def test_can_access_station(self):
        """Every role is allowed to access STATION_001."""
        # Administrators can access all stations
        assert self.authz_manager.can_access_station(UserRole.ADMINISTRATOR, "STATION_001")

        # Other roles can access all stations (for now)
        assert self.authz_manager.can_access_station(UserRole.OPERATOR, "STATION_001")
        assert self.authz_manager.can_access_station(UserRole.ENGINEER, "STATION_001")
        assert self.authz_manager.can_access_station(UserRole.READ_ONLY, "STATION_001")


class TestSecurityManager:
    """Test cases for SecurityManager."""

    def setup_method(self):
        """Create a fresh SecurityManager for each test."""
        self.security_manager = SecurityManager()

    def _login(self, username, password):
        """Authenticate and verify the resulting token, returning its token data.

        Asserts at each step so that a failed login is reported as such rather
        than as an unrelated error in the code that consumes the token data.
        """
        token = self.security_manager.authenticate(username, password)
        assert token is not None, f"authentication failed for {username}"

        token_data = self.security_manager.verify_access_token(token)
        assert token_data is not None, f"token verification failed for {username}"
        return token_data

    def test_authenticate_success(self):
        """The default operator account authenticates and yields a valid token."""
        # Default users are created during initialization
        token = self.security_manager.authenticate("operator", "operator123")

        assert token is not None

        # Verify token is valid
        token_data = self.security_manager.verify_access_token(token)
        assert token_data is not None
        assert token_data.username == "operator"
        assert token_data.role == UserRole.OPERATOR

    def test_authenticate_failure(self):
        """Unknown credentials yield no token."""
        token = self.security_manager.authenticate("nonexistent", "password")
        assert token is None

    def test_check_permission(self):
        """check_permission reflects the operator role carried by the token."""
        # Create token for operator
        token_data = self._login("operator", "operator123")

        # Operator should have emergency_stop permission
        assert self.security_manager.check_permission(token_data, "emergency_stop")

        # Operator should not have manage_users permission
        assert not self.security_manager.check_permission(token_data, "manage_users")

    def test_get_user_permissions(self):
        """get_user_permissions returns the engineer permission set."""
        # Create token for engineer
        token_data = self._login("engineer", "engineer123")

        permissions = self.security_manager.get_user_permissions(token_data)

        # Engineer should have specific permissions
        assert "configure_safety_limits" in permissions
        assert "manage_pump_configuration" in permissions
        assert "emergency_stop" in permissions

        # Engineer should not have administrative permissions
        assert "manage_users" not in permissions

    def test_can_access_station(self):
        """An operator token is allowed to access both stations checked here."""
        # Create token for operator
        token_data = self._login("operator", "operator123")

        # Operator should be able to access any station
        assert self.security_manager.can_access_station(token_data, "STATION_001")
        assert self.security_manager.can_access_station(token_data, "STATION_002")