feat(02-04): Create integration tests for safety system
Some checks failed
Discord Webhook / git (push) Has been cancelled

This commit is contained in:
Mai Development
2026-01-27 16:05:46 -05:00
parent 26a77e612d
commit 543fe75150
2 changed files with 485 additions and 0 deletions

View File

@@ -0,0 +1,484 @@
"""Integration tests for safety system."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
import tempfile
import time
from unittest.mock import Mock, patch, MagicMock
from pathlib import Path
from src.safety.coordinator import SafetyCoordinator
from src.safety.api import SafetyAPI
from src.security.assessor import SecurityLevel
class TestSafetyIntegration:
"""Integration tests for complete safety system."""
@pytest.fixture
def temp_config_dir(self):
"""Create temporary config directory for tests."""
with tempfile.TemporaryDirectory() as temp_dir:
config_dir = Path(temp_dir)
yield config_dir
@pytest.fixture
def mock_coordinator(self):
"""Create safety coordinator with mocked components."""
with patch("src.safety.coordinator.SecurityAssessor") as mock_assessor, patch(
"src.safety.coordinator.SandboxExecutor"
) as mock_executor, patch(
"src.safety.coordinator.AuditLogger"
) as mock_logger, patch("src.safety.coordinator.psutil") as mock_psutil:
# Configure mock assessor
assessor_instance = Mock()
mock_assessor.return_value = assessor_instance
# Configure mock executor
executor_instance = Mock()
mock_executor.return_value = executor_instance
# Configure mock logger
logger_instance = Mock()
logger_instance.log_security_assessment.return_value = "hash1"
logger_instance.log_code_execution.return_value = "hash2"
logger_instance.log_security_event.return_value = "hash3"
mock_logger.return_value = logger_instance
# Configure mock psutil
mock_psutil.cpu_count.return_value = 4
mock_memory = Mock()
mock_memory.available = 8 * 1024**3 # 8GB
mock_psutil.virtual_memory.return_value = mock_memory
coordinator = SafetyCoordinator()
coordinator.security_assessor = assessor_instance
coordinator.sandbox_executor = executor_instance
coordinator.audit_logger = logger_instance
yield coordinator, assessor_instance, executor_instance, logger_instance
@pytest.fixture
def safety_api(self, mock_coordinator):
"""Create safety API with mocked coordinator."""
coordinator, _, _, _ = mock_coordinator
# Mock the coordinator constructor
with patch("src.safety.api.SafetyCoordinator", return_value=coordinator):
api = SafetyAPI()
api.coordinator = coordinator
yield api
def test_low_risk_code_executes_successfully(self, mock_coordinator):
"""Test that LOW risk code executes successfully."""
coordinator, mock_assessor, mock_executor, _ = mock_coordinator
# Setup LOW risk assessment
mock_assessor.assess.return_value = (
SecurityLevel.LOW,
{"security_score": 1, "recommendations": ["Code appears safe"]},
)
# Setup successful execution
mock_executor.execute_code.return_value = {
"success": True,
"exit_code": 0,
"stdout": "Hello, World!",
"stderr": "",
"execution_time": 0.5,
"resource_usage": {"cpu": 10, "memory": "50MB"},
}
# Execute code
result = coordinator.execute_code_safely("print('Hello, World!')")
# Verify result
assert result["success"] is True
assert result["security_level"] == "LOW"
assert result["blocked"] is False
assert result["override_used"] is False
assert result["execution_result"]["stdout"] == "Hello, World!"
# Verify components called correctly
mock_assessor.assess.assert_called_once_with("print('Hello, World!')")
mock_executor.execute_code.assert_called_once()
def test_medium_risk_executes_with_warnings(self, mock_coordinator):
"""Test that MEDIUM risk code executes with warnings."""
coordinator, mock_assessor, mock_executor, _ = mock_coordinator
# Setup MEDIUM risk assessment
mock_assessor.assess.return_value = (
SecurityLevel.MEDIUM,
{"security_score": 5, "recommendations": ["Review file operations"]},
)
# Setup successful execution
mock_executor.execute_code.return_value = {
"success": True,
"exit_code": 0,
"stdout": "File processed",
"stderr": "",
"execution_time": 1.2,
"resource_usage": {"cpu": 25, "memory": "100MB"},
}
# Execute code
result = coordinator.execute_code_safely("open('file.txt').read()")
# Verify result
assert result["success"] is True
assert result["security_level"] == "MEDIUM"
assert result["trust_level"] == "standard"
assert result["blocked"] is False
# Verify proper trust level was determined
mock_executor.execute_code.assert_called_once_with(
"open('file.txt').read()", trust_level="standard"
)
def test_high_risk_requires_user_confirmation(self, mock_coordinator):
"""Test that HIGH risk code can execute with user aware."""
coordinator, mock_assessor, mock_executor, _ = mock_coordinator
# Setup HIGH risk assessment
mock_assessor.assess.return_value = (
SecurityLevel.HIGH,
{"security_score": 8, "recommendations": ["Avoid system calls"]},
)
# Setup successful execution
mock_executor.execute_code.return_value = {
"success": True,
"exit_code": 0,
"stdout": "System info retrieved",
"stderr": "",
"execution_time": 2.0,
"resource_usage": {"cpu": 40, "memory": "200MB"},
}
# Execute code
result = coordinator.execute_code_safely("import os; os.system('uname')")
# Verify result
assert result["success"] is True
assert result["security_level"] == "HIGH"
assert result["trust_level"] == "untrusted"
assert result["blocked"] is False
# Verify untrusted trust level for HIGH risk
mock_executor.execute_code.assert_called_once_with(
"import os; os.system('uname')", trust_level="untrusted"
)
def test_blocked_code_blocked_without_override(self, mock_coordinator):
"""Test that BLOCKED code is blocked without override."""
coordinator, mock_assessor, mock_executor, _ = mock_coordinator
# Setup BLOCKED assessment
mock_assessor.assess.return_value = (
SecurityLevel.BLOCKED,
{"security_score": 15, "recommendations": ["Remove dangerous functions"]},
)
# Execute code without override
result = coordinator.execute_code_safely(
'eval(\'__import__("os").system("rm -rf /")\')'
)
# Verify result
assert result["success"] is False
assert result["blocked"] is True
assert result["security_level"] == "BLOCKED"
assert result["override_used"] is False
assert "blocked by security assessment" in result["reason"]
# Verify executor was not called
mock_executor.execute_code.assert_not_called()
# Verify security event logged
coordinator.audit_logger.log_security_event.assert_called()
call_args = coordinator.audit_logger.log_security_event.call_args
assert call_args[1]["event_type"] == "execution_blocked"
def test_blocked_code_executes_with_user_override(self, mock_coordinator):
"""Test that BLOCKED code executes with user override."""
coordinator, mock_assessor, mock_executor, _ = mock_coordinator
# Setup BLOCKED assessment
mock_assessor.assess.return_value = (
SecurityLevel.BLOCKED,
{"security_score": 15, "recommendations": ["Remove dangerous functions"]},
)
# Setup successful execution for override case
mock_executor.execute_code.return_value = {
"success": True,
"exit_code": 0,
"stdout": "Command executed",
"stderr": "",
"execution_time": 1.5,
"resource_usage": {"cpu": 30, "memory": "150MB"},
}
# Execute code with override
result = coordinator.execute_code_safely(
'eval(\'__import__("os").system("echo test")\')',
user_override=True,
user_explanation="Testing eval functionality for educational purposes",
)
# Verify result
assert result["success"] is True
assert result["security_level"] == "BLOCKED"
assert result["blocked"] is False
assert result["override_used"] is True
assert result["trust_level"] == "untrusted"
# Verify executor was called with untrusted level
mock_executor.execute_code.assert_called_once_with(
'eval(\'__import__("os").system("echo test")\')', trust_level="untrusted"
)
# Verify override logged
coordinator.audit_logger.log_security_event.assert_called()
call_args = coordinator.audit_logger.log_security_event.call_args
assert call_args[1]["event_type"] == "user_override"
def test_resource_limits_adapt_to_code_complexity(self, mock_coordinator):
"""Test that resource limits adapt based on code complexity."""
coordinator, mock_assessor, mock_executor, _ = mock_coordinator
# Setup LOW risk assessment
mock_assessor.assess.return_value = (
SecurityLevel.LOW,
{"security_score": 1, "recommendations": []},
)
# Setup successful execution
mock_executor.execute_code.return_value = {
"success": True,
"exit_code": 0,
"stdout": "success",
"stderr": "",
"execution_time": 0.5,
"resource_usage": {"cpu": 10, "memory": "50MB"},
}
# Simple code should get lower limits
simple_code = "print('hello')"
result = coordinator.execute_code_safely(simple_code)
simple_limits = result["resource_limits"]
# Complex code should get higher limits
complex_code = """
def complex_function(data):
result = []
for item in data:
if item % 2 == 0:
result.append(item * 2)
else:
result.append(item + 1)
return result
# Test the function
test_data = list(range(100))
output = complex_function(test_data)
print(f"Processed {len(output)} items")
"""
result = coordinator.execute_code_safely(complex_code)
complex_limits = result["resource_limits"]
# Complex code should have higher complexity score
assert complex_limits["complexity_score"] > simple_limits["complexity_score"]
assert complex_limits["code_lines"] > simple_limits["code_lines"]
def test_audit_logs_created_for_all_operations(self, mock_coordinator):
"""Test that audit logs are created for all safety operations."""
coordinator, mock_assessor, mock_executor, mock_logger = mock_coordinator
# Setup assessment
mock_assessor.assess.return_value = (
SecurityLevel.MEDIUM,
{"security_score": 5, "recommendations": []},
)
# Setup execution
mock_executor.execute_code.return_value = {
"success": True,
"exit_code": 0,
"stdout": "success",
"stderr": "",
"execution_time": 0.5,
"resource_usage": {},
}
# Execute code
coordinator.execute_code_safely("print('test')")
# Verify all audit methods were called
mock_logger.log_security_assessment.assert_called_once()
mock_logger.log_code_execution.assert_called_once()
# Check log call arguments
assessment_call = mock_logger.log_security_assessment.call_args[1]
assert "assessment" in assessment_call
assert "code_snippet" in assessment_call
execution_call = mock_logger.log_code_execution.call_args[1]
assert "code" in execution_call
assert "result" in execution_call
assert "security_level" in execution_call
@patch("src.audit.crypto_logger.TamperProofLogger")
def test_hash_chain_tampering_detected(
self, mock_crypto_logger_class, temp_config_dir
):
"""Test that hash chain tampering is detected."""
# Setup mock logger to simulate tampering
mock_crypto_logger = Mock()
mock_crypto_logger.verify_chain.return_value = {
"valid": False,
"tampered_entry": {"hash": "invalid_hash"},
"error": "Hash chain integrity compromised",
}
mock_crypto_logger_class.return_value = mock_crypto_logger
# Create coordinator with tampered logger
coordinator = SafetyCoordinator(str(temp_config_dir))
coordinator.audit_logger.crypto_logger = mock_crypto_logger
# Get security status
status = coordinator.get_security_status()
# Verify tampering is detected
assert status["audit_integrity"]["valid"] is False
assert "tampered_entry" in status["audit_integrity"]
def test_api_assess_and_execute_interface(self, safety_api):
"""Test API assess_and_execute interface."""
api = safety_api
coordinator = api.coordinator
# Mock coordinator response
coordinator.execute_code_safely = Mock(
return_value={
"success": True,
"execution_id": "test123",
"security_level": "LOW",
"security_findings": {"security_score": 1},
"execution_result": {"stdout": "success"},
"resource_limits": {"cpu_count": 2},
"trust_level": "trusted",
"override_used": False,
"blocked": False,
}
)
# Test API call
result = api.assess_and_execute("print('test')")
# Verify formatted response
assert "request_id" in result
assert result["success"] is True
assert result["security"]["level"] == "LOW"
assert result["execution"]["stdout"] == "success"
assert "timestamp" in result
def test_api_input_validation(self, safety_api):
"""Test API input validation."""
api = safety_api
# Test empty code
with pytest.raises(ValueError, match="Code cannot be empty"):
api.assess_and_execute("")
# Test override without explanation
with pytest.raises(ValueError, match="User override requires explanation"):
api.assess_and_execute("print('test')", user_override=True)
# Test code size limit
large_code = "x" * 100001 # Over 100KB
with pytest.raises(ValueError, match="Code too large"):
api.assess_and_execute(large_code)
def test_api_get_execution_history(self, safety_api):
"""Test API execution history retrieval."""
api = safety_api
coordinator = api.coordinator
# Mock coordinator response
coordinator.get_execution_history = Mock(
return_value={
"summary": {"code_executions": 5},
"recent_executions": 5,
"security_assessments": 10,
"resource_violations": 0,
}
)
# Test API call
result = api.get_execution_history(limit=10)
# Verify formatted response
assert result["request"]["limit"] == 10
assert result["response"]["recent_executions"] == 5
assert "retrieved_at" in result
def test_api_get_security_status(self, safety_api):
"""Test API security status retrieval."""
api = safety_api
coordinator = api.coordinator
# Mock coordinator response
coordinator.get_security_status = Mock(
return_value={
"security_assessor": "active",
"sandbox_executor": "active",
"audit_logger": "active",
"system_resources": {"cpu_count": 4},
"audit_integrity": {"valid": True},
}
)
# Test API call
result = api.get_security_status()
# Verify formatted response
assert result["system_status"] == "operational"
assert result["components"]["security_assessor"] == "active"
assert "status_checked_at" in result
def test_api_configure_policies_validation(self, safety_api):
"""Test API policy configuration validation."""
api = safety_api
# Test invalid policy format
with pytest.raises(ValueError, match="Policies must be a dictionary"):
api.configure_policies("invalid")
# Test invalid security policies
invalid_security = {
"security": {
"blocked_patterns": [],
"thresholds": {"blocked_score": "invalid"}, # Should be number
}
}
result = api.configure_policies(invalid_security)
assert "validation_errors" in result["response"]
assert "security" in result["response"]["failed_updates"]
# Test valid policies
valid_policies = {
"security": {
"blocked_patterns": ["os.system"],
"high_triggers": ["admin"],
"thresholds": {"blocked_score": 10, "high_score": 7, "medium_score": 4},
}
}
result = api.configure_policies(valid_policies)
assert "security" in result["response"]["updated_policies"]