Complete fresh slate
@@ -1,378 +0,0 @@
"""
Tests for Docker Executor component

Test suite for Docker-based container execution with isolation,
resource limits, and audit logging integration.
"""

import pytest
import tempfile
from unittest.mock import Mock, patch, MagicMock
from pathlib import Path

# Import components under test
from src.mai.sandbox.docker_executor import DockerExecutor, ContainerConfig, ContainerResult
from src.mai.sandbox.audit_logger import AuditLogger
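
# NOTE: the sketch below is reconstructed from the assertions in this file and
# is only an orientation aid; the authoritative definitions live in
# src/mai/sandbox/docker_executor.py and may differ in detail.
#
#     @dataclass
#     class ContainerConfig:
#         image: str = "python:3.10-slim"
#         timeout_seconds: int = 30
#         memory_limit: str = "128m"
#         cpu_limit: str = "0.5"
#         network_disabled: bool = True
#         read_only_filesystem: bool = True
#         tmpfs_size: str = "64m"
#         working_dir: str = "/app"
#         user: str = "nobody"
#
#     @dataclass
#     class ContainerResult:
#         success: bool
#         container_id: str
#         exit_code: int
#         stdout: str = ""
#         stderr: str = ""
#         execution_time: float = 0.0
#         error: str = ""
#         resource_usage: dict | None = None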


class TestContainerConfig:
    """Test ContainerConfig dataclass"""

    def test_default_config(self):
        """Test default configuration values"""
        config = ContainerConfig()
        assert config.image == "python:3.10-slim"
        assert config.timeout_seconds == 30
        assert config.memory_limit == "128m"
        assert config.cpu_limit == "0.5"
        assert config.network_disabled is True
        assert config.read_only_filesystem is True
        assert config.tmpfs_size == "64m"
        assert config.working_dir == "/app"
        assert config.user == "nobody"

    def test_custom_config(self):
        """Test custom configuration values"""
        config = ContainerConfig(
            image="python:3.9-alpine",
            timeout_seconds=60,
            memory_limit="256m",
            cpu_limit="0.8",
            network_disabled=False,
        )
        assert config.image == "python:3.9-alpine"
        assert config.timeout_seconds == 60
        assert config.memory_limit == "256m"
        assert config.cpu_limit == "0.8"
        assert config.network_disabled is False


class TestDockerExecutor:
    """Test DockerExecutor class"""

    @pytest.fixture
    def mock_audit_logger(self):
        """Create mock audit logger"""
        return Mock(spec=AuditLogger)

    @pytest.fixture
    def docker_executor(self, mock_audit_logger):
        """Create DockerExecutor instance for testing"""
        return DockerExecutor(audit_logger=mock_audit_logger)

    def test_init_without_docker(self, mock_audit_logger):
        """Test initialization when Docker is not available"""
        with patch("src.mai.sandbox.docker_executor.DOCKER_AVAILABLE", False):
            executor = DockerExecutor(audit_logger=mock_audit_logger)
            assert executor.is_available() is False
            assert executor.client is None

    def test_init_with_docker_error(self, mock_audit_logger):
        """Test initialization when Docker fails to connect"""
        with patch("src.mai.sandbox.docker_executor.DOCKER_AVAILABLE", True):
            with patch("docker.from_env") as mock_from_env:
                mock_from_env.side_effect = Exception("Docker daemon not running")

                executor = DockerExecutor(audit_logger=mock_audit_logger)
                assert executor.is_available() is False
                assert executor.client is None

    def test_is_available(self, docker_executor):
        """Test is_available method"""
        # When client is None, should not be available
        docker_executor.client = None
        docker_executor.available = False
        assert docker_executor.is_available() is False

        # When client is available, should reflect available status
        docker_executor.client = Mock()
        docker_executor.available = True
        assert docker_executor.is_available() is True

        docker_executor.client = Mock()
        docker_executor.available = False
        assert docker_executor.is_available() is False

    def test_execute_code_unavailable(self, docker_executor):
        """Test execute_code when Docker is not available"""
        with patch.object(docker_executor, "is_available", return_value=False):
            result = docker_executor.execute_code("print('test')")

            assert result.success is False
            assert result.container_id == ""
            assert result.exit_code == -1
            assert "Docker executor not available" in result.error

    @patch("src.mai.sandbox.docker_executor.Path")
    @patch("src.mai.sandbox.docker_executor.tempfile.TemporaryDirectory")
    def test_execute_code_success(self, mock_temp_dir, mock_path, docker_executor):
        """Test successful code execution in container"""
        # Mock temporary directory and file creation
        mock_temp_file = Mock()
        mock_temp_file.write_text = Mock()

        mock_temp_path = Mock()
        mock_temp_path.__truediv__ = Mock(return_value=mock_temp_file)
        mock_temp_path.__str__ = Mock(return_value="/tmp/test")

        mock_temp_dir.return_value.__enter__.return_value = mock_temp_path

        # Mock Docker client and container
        mock_container = Mock()
        mock_container.id = "test-container-id"
        mock_container.wait.return_value = {"StatusCode": 0}
        mock_container.logs.return_value = b"test output"
        mock_container.stats.return_value = {
            "cpu_stats": {"cpu_usage": {"total_usage": 1000000}, "system_cpu_usage": 2000000},
            "precpu_stats": {"cpu_usage": {"total_usage": 500000}, "system_cpu_usage": 1000000},
            "memory_stats": {"usage": 50000000, "limit": 100000000},
        }

        mock_client = Mock()
        mock_client.containers.run.return_value = mock_container

        docker_executor.client = mock_client
        docker_executor.available = True

        # Execute code
        result = docker_executor.execute_code("print('test')")

        assert result.success is True
        assert result.container_id == "test-container-id"
        assert result.exit_code == 0
        assert result.stdout == "test output"
        assert result.execution_time > 0
        assert result.resource_usage is not None

    @patch("src.mai.sandbox.docker_executor.Path")
    @patch("src.mai.sandbox.docker_executor.tempfile.TemporaryDirectory")
    def test_execute_code_with_files(self, mock_temp_dir, mock_path, docker_executor):
        """Test code execution with additional files"""
        # Mock temporary directory and file creation
        mock_temp_file = Mock()
        mock_temp_file.write_text = Mock()

        mock_temp_path = Mock()
        mock_temp_path.__truediv__ = Mock(return_value=mock_temp_file)
        mock_temp_path.__str__ = Mock(return_value="/tmp/test")

        mock_temp_dir.return_value.__enter__.return_value = mock_temp_path

        # Mock Docker client and container
        mock_container = Mock()
        mock_container.id = "test-container-id"
        mock_container.wait.return_value = {"StatusCode": 0}
        mock_container.logs.return_value = b"test output"
        mock_container.stats.return_value = {}

        mock_client = Mock()
        mock_client.containers.run.return_value = mock_container

        docker_executor.client = mock_client
        docker_executor.available = True

        # Execute code with files
        files = {"data.txt": "test data"}
        result = docker_executor.execute_code("print('test')", files=files)

        # Verify additional files were handled
        assert mock_temp_file.write_text.call_count >= 2  # code + data file
        assert result.success is True

    def test_build_container_config(self, docker_executor):
        """Test building Docker container configuration"""
        config = ContainerConfig(memory_limit="256m", cpu_limit="0.8", network_disabled=False)
        environment = {"TEST_VAR": "test_value"}

        container_config = docker_executor._build_container_config(config, environment)

        assert container_config["mem_limit"] == "256m"
        assert container_config["cpu_quota"] == 80000  # 0.8 * 100000
        assert container_config["cpu_period"] == 100000
        assert container_config["network_disabled"] is False
        assert container_config["read_only"] is True
        assert container_config["user"] == "nobody"
        assert container_config["working_dir"] == "/app"
        assert "TEST_VAR" in container_config["environment"]
        assert "security_opt" in container_config
        assert "cap_drop" in container_config
        assert "cap_add" in container_config

    def test_get_container_stats(self, docker_executor):
        """Test extracting container resource statistics"""
        mock_container = Mock()
        mock_container.stats.return_value = {
            "cpu_stats": {
                "cpu_usage": {"total_usage": 2000000},
                "system_cpu_usage": 4000000,
                "online_cpus": 2,
            },
            "precpu_stats": {"cpu_usage": {"total_usage": 1000000}, "system_cpu_usage": 2000000},
            "memory_stats": {
                "usage": 67108864,  # 64MB
                "limit": 134217728,  # 128MB
            },
        }

        stats = docker_executor._get_container_stats(mock_container)

        assert stats["cpu_percent"] == 100.0  # (2000000-1000000)/(4000000-2000000) * 2 * 100
        assert stats["memory_usage_bytes"] == 67108864
        assert stats["memory_limit_bytes"] == 134217728
        assert stats["memory_percent"] == 50.0
        assert stats["memory_usage_mb"] == 64.0

    def test_get_container_stats_error(self, docker_executor):
        """Test get_container_stats with error"""
        mock_container = Mock()
        mock_container.stats.side_effect = Exception("Stats error")

        stats = docker_executor._get_container_stats(mock_container)

        assert stats["cpu_percent"] == 0.0
        assert stats["memory_usage_bytes"] == 0
        assert stats["memory_percent"] == 0.0
        assert stats["memory_usage_mb"] == 0.0

    def test_log_container_execution(self, docker_executor, mock_audit_logger):
        """Test logging container execution"""
        config = ContainerConfig(image="python:3.10-slim")
        result = ContainerResult(
            success=True,
            container_id="test-id",
            exit_code=0,
            stdout="test output",
            stderr="",
            execution_time=1.5,
            resource_usage={"cpu_percent": 50.0},
        )

        docker_executor._log_container_execution("print('test')", result, config)

        # Verify audit logger was called
        mock_audit_logger.log_execution.assert_called_once()
        call_args = mock_audit_logger.log_execution.call_args
        assert call_args.kwargs["code"] == "print('test')"
        assert call_args.kwargs["execution_type"] == "docker"
        assert "docker_container" in call_args.kwargs["execution_result"]["type"]

    def test_get_available_images(self, docker_executor):
        """Test getting available Docker images"""
        mock_image = Mock()
        mock_image.tags = ["python:3.10-slim", "python:3.9-alpine"]

        mock_client = Mock()
        mock_client.images.list.return_value = [mock_image]

        docker_executor.client = mock_client
        docker_executor.available = True

        images = docker_executor.get_available_images()

        assert "python:3.10-slim" in images
        assert "python:3.9-alpine" in images

    def test_pull_image(self, docker_executor):
        """Test pulling Docker image"""
        mock_client = Mock()
        mock_client.images.pull.return_value = None

        docker_executor.client = mock_client
        docker_executor.available = True

        result = docker_executor.pull_image("python:3.10-slim")

        assert result is True
        mock_client.images.pull.assert_called_once_with("python:3.10-slim")

    def test_cleanup_containers(self, docker_executor):
        """Test cleaning up containers"""
        mock_container = Mock()

        mock_client = Mock()
        mock_client.containers.list.return_value = [mock_container, mock_container]

        docker_executor.client = mock_client
        docker_executor.available = True

        count = docker_executor.cleanup_containers()

        assert count == 2
        assert mock_container.remove.call_count == 2

    def test_get_system_info(self, docker_executor):
        """Test getting Docker system information"""
        mock_client = Mock()
        mock_client.info.return_value = {
            "Containers": 5,
            "ContainersRunning": 2,
            "Images": 10,
            "MemTotal": 8589934592,
            "NCPU": 4,
        }
        mock_client.version.return_value = {"Version": "20.10.7", "ApiVersion": "1.41"}

        docker_executor.client = mock_client
        docker_executor.available = True

        info = docker_executor.get_system_info()

        assert info["available"] is True
        assert info["version"] == "20.10.7"
        assert info["api_version"] == "1.41"
        assert info["containers"] == 5
        assert info["images"] == 10


class TestDockerExecutorIntegration:
    """Integration tests for Docker executor with other sandbox components"""

    @pytest.fixture
    def mock_audit_logger(self):
        """Create mock audit logger"""
        return Mock(spec=AuditLogger)

    def test_docker_executor_integration(self, mock_audit_logger):
        """Test Docker executor integration with audit logger"""
        executor = DockerExecutor(audit_logger=mock_audit_logger)

        # Test that audit logger is properly integrated
        assert executor.audit_logger is mock_audit_logger

        # Mock Docker availability for integration test
        with patch.object(executor, "is_available", return_value=False):
            result = executor.execute_code("print('test')")

            # Should fail gracefully and still attempt logging
            assert result.success is False

    def test_container_result_serialization(self):
        """Test ContainerResult can be properly serialized"""
        result = ContainerResult(
            success=True,
            container_id="test-id",
            exit_code=0,
            stdout="test output",
            stderr="",
            execution_time=1.5,
            resource_usage={"cpu_percent": 50.0},
        )

        # Test that result can be converted to dict for JSON serialization
        result_dict = {
            "success": result.success,
            "container_id": result.container_id,
            "exit_code": result.exit_code,
            "stdout": result.stdout,
            "stderr": result.stderr,
            "execution_time": result.execution_time,
            "error": result.error,
            "resource_usage": result.resource_usage,
        }

        assert result_dict["success"] is True
        assert result_dict["container_id"] == "test-id"


if __name__ == "__main__":
    pytest.main([__file__])
@@ -1,341 +0,0 @@
"""
Integration test for complete Docker sandbox execution

Tests the full integration of Docker executor with sandbox manager,
risk analysis, resource enforcement, and audit logging.
"""

import pytest
import tempfile
import json
from pathlib import Path
from unittest.mock import patch, Mock

from src.mai.sandbox.manager import SandboxManager, ExecutionRequest
from src.mai.sandbox.audit_logger import AuditLogger
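
# The request/result shape exercised below, for orientation only -- field names
# are taken from the assertions in this file, and defaults are whatever
# src/mai/sandbox/manager.py actually defines:
#
#     request = ExecutionRequest(
#         code="print('hello')",
#         use_docker=True,
#         docker_image="python:3.10-slim",
#         timeout_seconds=30,
#         cpu_limit_percent=50.0,
#         memory_limit_percent=40.0,
#         network_allowed=False,
#         filesystem_restricted=True,
#         additional_files={"data.txt": "contents"},
#     )
#     result = sandbox_manager.execute_code(request)
#     # result.execution_method is "docker" or "local"; result.container_result
#     # carries the ContainerResult when Docker was used.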


@pytest.mark.integration
class TestDockerSandboxIntegration:
    """Integration tests for Docker sandbox execution"""

    @pytest.fixture
    def temp_log_dir(self):
        """Create temporary directory for audit logs"""
        with tempfile.TemporaryDirectory() as temp_dir:
            yield temp_dir

    @pytest.fixture
    def sandbox_manager(self, temp_log_dir):
        """Create SandboxManager with temp log directory"""
        return SandboxManager(log_dir=temp_log_dir)

    def test_full_docker_execution_workflow(self, sandbox_manager):
        """Test complete Docker execution workflow"""
        with patch.object(sandbox_manager.docker_executor, "is_available", return_value=True):
            with patch.object(sandbox_manager.docker_executor, "execute_code") as mock_docker:
                # Mock Docker container execution
                from src.mai.sandbox.docker_executor import ContainerResult

                mock_docker.return_value = {
                    "success": True,
                    "output": "42\n",
                    "container_result": ContainerResult(
                        success=True,
                        container_id="integration-test-container",
                        exit_code=0,
                        stdout="42\n",
                        stderr="",
                        execution_time=2.3,
                        resource_usage={
                            "cpu_percent": 15.2,
                            "memory_usage_mb": 28.5,
                            "memory_percent": 5.5,
                        },
                    ),
                }

                # Create execution request
                request = ExecutionRequest(
                    code="result = 6 * 7\nprint(result)",
                    use_docker=True,
                    docker_image="python:3.10-slim",
                    timeout_seconds=30,
                    cpu_limit_percent=50.0,
                    memory_limit_percent=40.0,
                    network_allowed=False,
                    filesystem_restricted=True,
                )

                # Execute code
                result = sandbox_manager.execute_code(request)

                # Verify execution results
                assert result.success is True
                assert result.execution_method == "docker"
                assert result.output == "42\n"
                assert result.container_result is not None
                assert result.container_result.container_id == "integration-test-container"
                assert result.container_result.exit_code == 0
                assert result.container_result.execution_time == 2.3
                assert result.container_result.resource_usage["cpu_percent"] == 15.2
                assert result.container_result.resource_usage["memory_usage_mb"] == 28.5

                # Verify Docker executor was called with correct parameters
                mock_docker.assert_called_once()
                call_args = mock_docker.call_args

                # Check code was passed correctly
                assert call_args.args[0] == "result = 6 * 7\nprint(result)"

                # Check container config
                config = call_args.kwargs["config"]
                assert config.image == "python:3.10-slim"
                assert config.timeout_seconds == 30
                assert config.memory_limit == "51m"  # Scaled from 40% of 128m
                assert config.cpu_limit == "0.5"  # 50% CPU
                assert config.network_disabled is True
                assert config.read_only_filesystem is True

                # Verify audit logging occurred
                assert result.audit_entry_id is not None

                # Check audit log contents
                logs = sandbox_manager.get_execution_history(limit=1)
                assert len(logs) == 1

                log_entry = logs[0]
                assert log_entry["code"] == "result = 6 * 7\nprint(result)"
                assert log_entry["execution_result"]["success"] is True
                assert "docker_container" in log_entry["execution_result"]

    def test_docker_execution_with_additional_files(self, sandbox_manager):
        """Test Docker execution with additional files"""
        with patch.object(sandbox_manager.docker_executor, "is_available", return_value=True):
            with patch.object(sandbox_manager.docker_executor, "execute_code") as mock_docker:
                # Mock Docker execution
                from src.mai.sandbox.docker_executor import ContainerResult

                mock_docker.return_value = {
                    "success": True,
                    "output": "Hello, Alice!\n",
                    "container_result": ContainerResult(
                        success=True,
                        container_id="files-test-container",
                        exit_code=0,
                        stdout="Hello, Alice!\n",
                    ),
                }

                # Create execution request with additional files
                request = ExecutionRequest(
                    code="with open('template.txt', 'r') as f: template = f.read()\nprint(template.replace('{name}', 'Alice'))",
                    use_docker=True,
                    additional_files={"template.txt": "Hello, {name}!"},
                )

                # Execute code
                result = sandbox_manager.execute_code(request)

                # Verify execution
                assert result.success is True
                assert result.execution_method == "docker"

                # Verify Docker executor was called with files
                call_args = mock_docker.call_args
                assert "files" in call_args.kwargs
                assert call_args.kwargs["files"] == {"template.txt": "Hello, {name}!"}

    def test_docker_execution_blocked_by_risk_analysis(self, sandbox_manager):
        """Test that high-risk code is blocked before Docker execution"""
        with patch.object(sandbox_manager.docker_executor, "is_available", return_value=True):
            with patch.object(sandbox_manager.docker_executor, "execute_code") as mock_docker:
                # Risk analysis will automatically detect the dangerous pattern
                request = ExecutionRequest(
                    code="import subprocess; subprocess.run(['rm', '-rf', '/'], shell=True)",
                    use_docker=True,
                )

                # Execute code
                result = sandbox_manager.execute_code(request)

                # Verify execution was blocked
                assert result.success is False
                assert "blocked" in result.error.lower()
                assert result.risk_assessment.score >= 70
                assert result.execution_method == "local"  # Set before Docker check

                # Docker executor should not be called
                mock_docker.assert_not_called()

                # Should still be logged
                assert result.audit_entry_id is not None

    def test_docker_execution_fallback_to_local(self, sandbox_manager):
        """Test fallback to local execution when Docker unavailable"""
        with patch.object(sandbox_manager.docker_executor, "is_available", return_value=False):
            with patch.object(sandbox_manager, "_execute_in_sandbox") as mock_local:
                with patch.object(
                    sandbox_manager.resource_enforcer, "stop_monitoring"
                ) as mock_monitoring:
                    # Mock local execution
                    mock_local.return_value = {"success": True, "output": "Local fallback result"}

                    # Mock resource usage
                    from src.mai.sandbox.resource_enforcer import ResourceUsage

                    mock_monitoring.return_value = ResourceUsage(
                        cpu_percent=35.0,
                        memory_percent=25.0,
                        memory_used_gb=0.4,
                        elapsed_seconds=1.8,
                        approaching_limits=False,
                    )

                    # Create request preferring Docker
                    request = ExecutionRequest(
                        code="print('fallback test')",
                        use_docker=True,  # But Docker is unavailable
                    )

                    # Execute code
                    result = sandbox_manager.execute_code(request)

                    # Verify fallback to local execution
                    assert result.success is True
                    assert result.execution_method == "local"
                    assert result.output == "Local fallback result"
                    assert result.container_result is None
                    assert result.resource_usage is not None
                    assert result.resource_usage.cpu_percent == 35.0

                    # Verify local execution was used
                    mock_local.assert_called_once()

    def test_audit_logging_docker_execution_details(self, sandbox_manager):
        """Test comprehensive audit logging for Docker execution"""
        with patch.object(sandbox_manager.docker_executor, "is_available", return_value=True):
            with patch.object(sandbox_manager.docker_executor, "execute_code") as mock_docker:
                # Mock Docker execution with detailed stats
                from src.mai.sandbox.docker_executor import ContainerResult

                mock_docker.return_value = {
                    "success": True,
                    "output": "Calculation complete: 144\n",
                    "container_result": ContainerResult(
                        success=True,
                        container_id="audit-test-container",
                        exit_code=0,
                        stdout="Calculation complete: 144\n",
                        stderr="",
                        execution_time=3.7,
                        resource_usage={
                            "cpu_percent": 22.8,
                            "memory_usage_mb": 45.2,
                            "memory_percent": 8.9,
                            "memory_usage_bytes": 47395648,
                            "memory_limit_bytes": 536870912,
                        },
                    ),
                }

                # Execute request
                request = ExecutionRequest(
                    code="result = 12 * 12\nprint(f'Calculation complete: {result}')",
                    use_docker=True,
                    docker_image="python:3.9-alpine",
                    timeout_seconds=45,
                )

                result = sandbox_manager.execute_code(request)

                # Verify audit log contains Docker execution details
                logs = sandbox_manager.get_execution_history(limit=1)
                assert len(logs) == 1

                log_entry = logs[0]
                execution_result = log_entry["execution_result"]

                # Check Docker-specific fields
                assert execution_result["type"] == "docker_container"
                assert execution_result["container_id"] == "audit-test-container"
                assert execution_result["exit_code"] == 0
                assert execution_result["stdout"] == "Calculation complete: 144\n"

                # Check configuration details
                config = execution_result["config"]
                assert config["image"] == "python:3.9-alpine"
                assert config["timeout"] == 45
                assert config["network_disabled"] is True
                assert config["read_only_filesystem"] is True

                # Check resource usage
                resource_usage = execution_result["resource_usage"]
                assert resource_usage["cpu_percent"] == 22.8
                assert resource_usage["memory_usage_mb"] == 45.2
                assert resource_usage["memory_percent"] == 8.9

    def test_system_status_includes_docker_info(self, sandbox_manager):
        """Test system status includes Docker information"""
        with patch.object(sandbox_manager.docker_executor, "is_available", return_value=True):
            with patch.object(
                sandbox_manager.docker_executor, "get_system_info"
            ) as mock_docker_info:
                # Mock Docker system info
                mock_docker_info.return_value = {
                    "available": True,
                    "version": "20.10.12",
                    "api_version": "1.41",
                    "containers": 5,
                    "containers_running": 2,
                    "images": 8,
                    "ncpu": 4,
                    "memory_total": 8589934592,
                }

                # Get system status
                status = sandbox_manager.get_system_status()

                # Verify Docker information is included
                assert "docker_available" in status
                assert "docker_info" in status
                assert status["docker_available"] is True
                assert status["docker_info"]["available"] is True
                assert status["docker_info"]["version"] == "20.10.12"
                assert status["docker_info"]["containers"] == 5
                assert status["docker_info"]["images"] == 8

    def test_docker_status_management(self, sandbox_manager):
        """Test Docker status management functions"""
        with patch.object(sandbox_manager.docker_executor, "is_available", return_value=True):
            with patch.object(
                sandbox_manager.docker_executor, "get_available_images"
            ) as mock_images:
                with patch.object(sandbox_manager.docker_executor, "pull_image") as mock_pull:
                    with patch.object(
                        sandbox_manager.docker_executor, "cleanup_containers"
                    ) as mock_cleanup:
                        # Mock responses
                        mock_images.return_value = ["python:3.10-slim", "python:3.9-alpine"]
                        mock_pull.return_value = True
                        mock_cleanup.return_value = 3

                        # Test get Docker status
                        status = sandbox_manager.get_docker_status()
                        assert status["available"] is True
                        assert "python:3.10-slim" in status["images"]
                        assert "python:3.9-alpine" in status["images"]

                        # Test pull image
                        pull_result = sandbox_manager.pull_docker_image("node:16-alpine")
                        assert pull_result is True
                        mock_pull.assert_called_once_with("node:16-alpine")

                        # Test cleanup containers
                        cleanup_count = sandbox_manager.cleanup_docker_containers()
                        assert cleanup_count == 3
                        mock_cleanup.assert_called_once()


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
@@ -1,632 +0,0 @@
#!/usr/bin/env python3
"""
Comprehensive integration tests for Phase 1 requirements.

This module validates all Phase 1 components work together correctly.
Tests cover model discovery, resource monitoring, model selection,
context compression, git workflow, and end-to-end conversations.
"""

import unittest
import os
import sys
import time
import tempfile
import shutil
from unittest.mock import Mock, patch, MagicMock
from pathlib import Path

# Add src to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))

# Mock missing dependencies first
sys.modules["ollama"] = Mock()
sys.modules["psutil"] = Mock()
sys.modules["tiktoken"] = Mock()
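
# Registering these stubs in sys.modules before anything imports the mai
# package means `import ollama` / `import psutil` / `import tiktoken` inside
# mai resolves to a Mock instead of raising ImportError. Illustration only
# (any attribute access on such a stub simply yields another Mock):
#
#     import ollama
#     assert isinstance(ollama.anything_at_all, Mock)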


# Test availability of core components
def check_imports():
    """Check if all required imports are available."""
    test_results = {}

    # Test each import
    imports_to_test = [
        ("mai.core.interface", "MaiInterface"),
        ("mai.model.resource_detector", "ResourceDetector"),
        ("mai.model.compression", "ContextCompressor"),
        ("mai.core.config", "Config"),
        ("mai.core.exceptions", "MaiError"),
        ("mai.git.workflow", "StagingWorkflow"),
        ("mai.git.committer", "AutoCommitter"),
        ("mai.git.health_check", "HealthChecker"),
    ]

    for module_name, class_name in imports_to_test:
        try:
            module = __import__(module_name, fromlist=[class_name])
            cls = getattr(module, class_name)
            test_results[f"{module_name}.{class_name}"] = "OK"
        except ImportError as e:
            test_results[f"{module_name}.{class_name}"] = f"IMPORT_ERROR: {e}"
        except AttributeError as e:
            test_results[f"{module_name}.{class_name}"] = f"CLASS_NOT_FOUND: {e}"

    return test_results


class TestComponentImports(unittest.TestCase):
    """Test that all Phase 1 components can be imported."""

    def test_all_components_import(self):
        """Test that all required components can be imported."""
        results = check_imports()

        # Print results for debugging
        print("\n=== Import Test Results ===")
        for component, status in results.items():
            print(f"{component}: {status}")

        # Check that at least some imports work
        successful_imports = sum(1 for status in results.values() if status == "OK")
        self.assertGreater(
            successful_imports, 0, "At least one component should import successfully"
        )


class TestResourceDetectionBasic(unittest.TestCase):
    """Test basic resource detection functionality."""

    def test_resource_info_structure(self):
        """Test that ResourceInfo has required structure."""
        try:
            from mai.model.resource_detector import ResourceInfo

            # Create a test ResourceInfo with correct attributes
            resources = ResourceInfo(
                cpu_percent=50.0,
                memory_total_gb=16.0,
                memory_available_gb=8.0,
                memory_percent=50.0,
                gpu_available=False,
            )

            self.assertEqual(resources.cpu_percent, 50.0)
            self.assertEqual(resources.memory_total_gb, 16.0)
            self.assertEqual(resources.memory_available_gb, 8.0)
            self.assertEqual(resources.memory_percent, 50.0)
            self.assertEqual(resources.gpu_available, False)
        except ImportError:
            self.skipTest("ResourceDetector not available")

    def test_resource_detector_basic(self):
        """Test ResourceDetector can be instantiated."""
        try:
            from mai.model.resource_detector import ResourceDetector

            detector = ResourceDetector()
            self.assertIsNotNone(detector)
        except ImportError:
            self.skipTest("ResourceDetector not available")


class TestContextCompressionBasic(unittest.TestCase):
    """Test basic context compression functionality."""

    def test_context_compressor_instantiation(self):
        """Test ContextCompressor can be instantiated."""
        try:
            from mai.model.compression import ContextCompressor

            compressor = ContextCompressor()
            self.assertIsNotNone(compressor)
        except ImportError:
            self.skipTest("ContextCompressor not available")

    def test_token_counting_basic(self):
        """Test basic token counting functionality."""
        try:
            from mai.model.compression import ContextCompressor, TokenInfo

            compressor = ContextCompressor()
            tokens = compressor.count_tokens("Hello, world!")

            self.assertIsInstance(tokens, TokenInfo)
            self.assertGreater(tokens.count, 0)
            self.assertIsInstance(tokens.model_name, str)
            self.assertGreater(len(tokens.model_name), 0)
            self.assertIsInstance(tokens.accuracy, float)
            self.assertGreaterEqual(tokens.accuracy, 0.0)
            self.assertLessEqual(tokens.accuracy, 1.0)
        except (ImportError, AttributeError):
            self.skipTest("ContextCompressor not fully available")

    def test_token_info_structure(self):
        """Test TokenInfo object structure and attributes."""
        try:
            from mai.model.compression import ContextCompressor, TokenInfo

            compressor = ContextCompressor()
            tokens = compressor.count_tokens("Test string for structure validation")

            # Test TokenInfo structure
            self.assertIsInstance(tokens, TokenInfo)
            self.assertTrue(hasattr(tokens, "count"))
            self.assertTrue(hasattr(tokens, "model_name"))
            self.assertTrue(hasattr(tokens, "accuracy"))

            # Test attribute types
            self.assertIsInstance(tokens.count, int)
            self.assertIsInstance(tokens.model_name, str)
            self.assertIsInstance(tokens.accuracy, float)

            # Test attribute values
            self.assertGreaterEqual(tokens.count, 0)
            self.assertGreater(len(tokens.model_name), 0)
            self.assertGreaterEqual(tokens.accuracy, 0.0)
            self.assertLessEqual(tokens.accuracy, 1.0)
        except (ImportError, AttributeError):
            self.skipTest("ContextCompressor not fully available")

    def test_token_counting_accuracy(self):
        """Test token counting accuracy for various text lengths."""
        try:
            from mai.model.compression import ContextCompressor

            compressor = ContextCompressor()

            # Test with different text lengths
            test_cases = [
                ("", 0, 5),  # Empty string
                ("Hello", 1, 10),  # Short text
                ("Hello, world! This is a test.", 5, 15),  # Medium text
                (
                    "This is a longer text to test token counting accuracy across multiple sentences and paragraphs. "
                    * 3,
                    50,
                    200,
                ),  # Long text
            ]

            for text, min_expected, max_expected in test_cases:
                with self.subTest(text_length=len(text)):
                    tokens = compressor.count_tokens(text)
                    self.assertGreaterEqual(
                        tokens.count,
                        min_expected,
                        f"Token count {tokens.count} below minimum {min_expected} for text: {text[:50]}...",
                    )
                    self.assertLessEqual(
                        tokens.count,
                        max_expected,
                        f"Token count {tokens.count} above maximum {max_expected} for text: {text[:50]}...",
                    )

                    # Test accuracy is reasonable
                    self.assertGreaterEqual(tokens.accuracy, 0.7, "Accuracy should be at least 70%")
                    self.assertLessEqual(tokens.accuracy, 1.0, "Accuracy should not exceed 100%")

        except (ImportError, AttributeError):
            self.skipTest("ContextCompressor not fully available")

    def test_token_fallback_behavior(self):
        """Test token counting fallback behavior when tiktoken unavailable."""
        try:
            from mai.model.compression import ContextCompressor
            from unittest.mock import patch

            compressor = ContextCompressor()
            test_text = "Testing fallback behavior with a reasonable text length"

            # Test normal behavior first
            tokens_normal = compressor.count_tokens(test_text)
            self.assertIsNotNone(tokens_normal)
            self.assertGreater(tokens_normal.count, 0)

            # Test with mocked tiktoken error to trigger fallback
            with patch("tiktoken.encoding_for_model") as mock_encoding:
                mock_encoding.side_effect = Exception("tiktoken not available")

                tokens_fallback = compressor.count_tokens(test_text)

                # Both should return TokenInfo objects
                self.assertEqual(type(tokens_normal), type(tokens_fallback))
                self.assertIsNotNone(tokens_fallback)
                self.assertGreater(tokens_fallback.count, 0)

                # Fallback might be less accurate but should still be reasonable
                self.assertGreaterEqual(tokens_fallback.accuracy, 0.7)
                self.assertLessEqual(tokens_fallback.accuracy, 1.0)

        except (ImportError, AttributeError):
            self.skipTest("ContextCompressor not fully available")

    def test_token_edge_cases(self):
        """Test token counting with edge cases."""
        try:
            from mai.model.compression import ContextCompressor

            compressor = ContextCompressor()

            # Edge cases to test
            edge_cases = [
                ("", "Empty string"),
                (" ", "Single space"),
                ("\n", "Single newline"),
                ("\t", "Single tab"),
                ("   ", "Multiple spaces"),
                ("Hello\nworld", "Text with newline"),
                ("Special chars: !@#$%^&*()", "Special characters"),
                ("Unicode: ñáéíóú 🤖", "Unicode characters"),
                ("Numbers: 1234567890", "Numbers"),
                ("Mixed: Hello123!@#world", "Mixed content"),
            ]

            for text, description in edge_cases:
                with self.subTest(case=description):
                    tokens = compressor.count_tokens(text)

                    # Every edge case should still return a usable token count
                    self.assertIsNotNone(tokens)
                    self.assertGreaterEqual(
                        tokens.count, 0, f"Token count should be >= 0 for {description}"
                    )

                    # Model name and accuracy should be set
                    self.assertGreater(
                        len(tokens.model_name),
                        0,
                        f"Model name should not be empty for {description}",
                    )
                    self.assertGreaterEqual(
                        tokens.accuracy, 0.7, f"Accuracy should be reasonable for {description}"
                    )
                    self.assertLessEqual(
                        tokens.accuracy, 1.0, f"Accuracy should not exceed 100% for {description}"
                    )

        except (ImportError, AttributeError):
            self.skipTest("ContextCompressor not fully available")


class TestConfigSystem(unittest.TestCase):
    """Test configuration system functionality."""

    def test_config_instantiation(self):
        """Test Config can be instantiated."""
        try:
            from mai.core.config import Config

            config = Config()
            self.assertIsNotNone(config)
        except ImportError:
            self.skipTest("Config not available")

    def test_config_validation(self):
        """Test configuration validation."""
        try:
            from mai.core.config import Config

            config = Config()
            # Test basic validation
            self.assertIsNotNone(config)
        except ImportError:
            self.skipTest("Config not available")


class TestGitWorkflowBasic(unittest.TestCase):
    """Test basic git workflow functionality."""

    def test_staging_workflow_instantiation(self):
        """Test StagingWorkflow can be instantiated."""
        try:
            from mai.git.workflow import StagingWorkflow

            workflow = StagingWorkflow()
            self.assertIsNotNone(workflow)
        except ImportError:
            self.skipTest("StagingWorkflow not available")

    def test_auto_committer_instantiation(self):
        """Test AutoCommitter can be instantiated."""
        try:
            from mai.git.committer import AutoCommitter

            committer = AutoCommitter()
            self.assertIsNotNone(committer)
        except ImportError:
            self.skipTest("AutoCommitter not available")

    def test_health_checker_instantiation(self):
        """Test HealthChecker can be instantiated."""
        try:
            from mai.git.health_check import HealthChecker

            checker = HealthChecker()
            self.assertIsNotNone(checker)
        except ImportError:
            self.skipTest("HealthChecker not available")


class TestExceptionHandling(unittest.TestCase):
    """Test exception handling system."""

    def test_exception_hierarchy(self):
        """Test exception hierarchy exists."""
        try:
            from mai.core.exceptions import (
                MaiError,
                ModelError,
                ConfigurationError,
                ModelConnectionError,
            )

            # Test exception inheritance
            self.assertTrue(issubclass(ModelError, MaiError))
            self.assertTrue(issubclass(ConfigurationError, MaiError))
            self.assertTrue(issubclass(ModelConnectionError, ModelError))

            # Test instantiation
            error = MaiError("Test error")
            self.assertEqual(str(error), "Test error")
        except ImportError:
            self.skipTest("Exception hierarchy not available")


class TestFileStructure(unittest.TestCase):
    """Test that all required files exist with proper structure."""

    def test_core_files_exist(self):
        """Test that all core files exist."""
        required_files = [
            "src/mai/core/interface.py",
            "src/mai/model/ollama_client.py",
            "src/mai/model/resource_detector.py",
            "src/mai/model/compression.py",
            "src/mai/core/config.py",
            "src/mai/core/exceptions.py",
            "src/mai/git/workflow.py",
            "src/mai/git/committer.py",
            "src/mai/git/health_check.py",
        ]

        project_root = os.path.dirname(os.path.dirname(__file__))

        for file_path in required_files:
            full_path = os.path.join(project_root, file_path)
            self.assertTrue(os.path.exists(full_path), f"Required file {file_path} does not exist")

    def test_minimum_file_sizes(self):
        """Test that files meet minimum size requirements."""
        min_lines = 40  # From plan requirements

        test_file = os.path.join(os.path.dirname(__file__), "test_integration.py")
        with open(test_file, "r") as f:
            lines = f.readlines()

        self.assertGreaterEqual(
            len(lines), min_lines, f"Integration test file must have at least {min_lines} lines"
        )


class TestPhase1Requirements(unittest.TestCase):
    """Test that Phase 1 requirements are satisfied."""

    def test_requirement_1_model_discovery(self):
        """Requirement 1: Model discovery and capability detection."""
        try:
            from mai.core.interface import MaiInterface

            # Test interface has list_models method
            interface = MaiInterface()
            self.assertTrue(hasattr(interface, "list_models"))
        except ImportError:
            self.skipTest("MaiInterface not available")

    def test_requirement_2_resource_monitoring(self):
        """Requirement 2: Resource monitoring and constraint detection."""
        try:
            from mai.model.resource_detector import ResourceDetector

            detector = ResourceDetector()
            self.assertTrue(hasattr(detector, "detect_resources"))
        except ImportError:
            self.skipTest("ResourceDetector not available")

    def test_requirement_3_model_selection(self):
        """Requirement 3: Intelligent model selection."""
        try:
            from mai.core.interface import MaiInterface

            interface = MaiInterface()
            # Should have model selection capability
            self.assertIsNotNone(interface)
        except ImportError:
            self.skipTest("MaiInterface not available")

    def test_requirement_4_context_compression(self):
        """Requirement 4: Context compression for model switching."""
        try:
            from mai.model.compression import ContextCompressor

            compressor = ContextCompressor()
            self.assertTrue(hasattr(compressor, "count_tokens"))
        except ImportError:
            self.skipTest("ContextCompressor not available")

    def test_requirement_5_git_integration(self):
        """Requirement 5: Git workflow automation."""
        # Check if GitPython is available
        try:
            import git
        except ImportError:
            self.skipTest("GitPython not available - git integration tests skipped")

        git_components = [
            ("mai.git.workflow", "StagingWorkflow"),
            ("mai.git.committer", "AutoCommitter"),
            ("mai.git.health_check", "HealthChecker"),
        ]

        available_count = 0
        for module_name, class_name in git_components:
            try:
                module = __import__(module_name, fromlist=[class_name])
                cls = getattr(module, class_name)
                available_count += 1
            except ImportError:
                pass

        # At least one git component should be available if GitPython is installed
        # If GitPython is installed but no components are available, that's a problem
        if available_count == 0:
            # Check if the source files actually exist
            import os
            from pathlib import Path

            src_path = Path(__file__).parent.parent / "src" / "mai" / "git"
            if src_path.exists():
                git_files = list(src_path.glob("*.py"))
                if git_files:
                    self.fail(
                        f"Git files exist but no git components importable. Files: {[f.name for f in git_files]}"
                    )
            return

        # If we get here, either components are available or they don't exist yet
        # Both are acceptable states for Phase 1 validation
        self.assertTrue(True, "Git integration validation completed")


class TestErrorHandlingGracefulDegradation(unittest.TestCase):
    """Test error handling and graceful degradation."""

    def test_missing_dependency_handling(self):
        """Test handling of missing dependencies."""
        # Mock missing ollama dependency
        with patch.dict("sys.modules", {"ollama": None}):
            try:
                from mai.model.ollama_client import OllamaClient

                # If import succeeds, test that it handles missing dependency
                client = OllamaClient()
                self.assertIsNotNone(client)
            except ImportError:
                # Expected behavior - import should fail gracefully
                pass

    def test_resource_exhaustion_simulation(self):
        """Test behavior with simulated resource exhaustion."""
        try:
            from mai.model.resource_detector import ResourceInfo

            # Create exhausted resource scenario with correct attributes
            exhausted = ResourceInfo(
                cpu_percent=95.0,
                memory_total_gb=16.0,
                memory_available_gb=0.1,  # Very low (100MB)
                memory_percent=99.4,  # Almost all memory used
                gpu_available=False,
            )

            # ResourceInfo should handle extreme values
            self.assertEqual(exhausted.cpu_percent, 95.0)
            self.assertEqual(exhausted.memory_available_gb, 0.1)
            self.assertEqual(exhausted.memory_percent, 99.4)
        except ImportError:
            self.skipTest("ResourceInfo not available")


class TestPerformanceRegression(unittest.TestCase):
    """Test performance regression detection."""

    def test_import_time_performance(self):
        """Test that import time is reasonable."""
        import_time_start = time.time()

        # Try to import main components
        try:
            from mai.core.config import Config
            from mai.core.exceptions import MaiError

            config = Config()
        except ImportError:
            pass

        import_time = time.time() - import_time_start

        # Imports should complete within reasonable time (< 5 seconds)
        self.assertLess(import_time, 5.0, "Import time should be reasonable")

    def test_instantiation_performance(self):
        """Test that component instantiation is performant."""
        times = []

        # Test multiple instantiations
        for _ in range(5):
            start_time = time.time()
            try:
                from mai.core.config import Config

                config = Config()
            except ImportError:
                pass
            times.append(time.time() - start_time)

        avg_time = sum(times) / len(times)

        # Average instantiation should be fast (< 1 second)
        self.assertLess(avg_time, 1.0, "Component instantiation should be fast")


def run_phase1_validation():
    """Run comprehensive Phase 1 validation."""
    print("\n" + "=" * 60)
    print("PHASE 1 INTEGRATION TEST VALIDATION")
    print("=" * 60)

    # Run import checks
    import_results = check_imports()
    print("\n1. COMPONENT IMPORT VALIDATION:")
    for component, status in import_results.items():
        status_symbol = "✓" if status == "OK" else "✗"
        print(f" {status_symbol} {component}: {status}")

    # Count successful imports
    successful = sum(1 for s in import_results.values() if s == "OK")
    total = len(import_results)
    print(f"\n Import Success Rate: {successful}/{total} ({successful / total * 100:.1f}%)")

    # Run unit tests
    print("\n2. FUNCTIONAL TESTS:")
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromModule(sys.modules[__name__])
    runner = unittest.TextTestRunner(verbosity=1)
    result = runner.run(suite)

    # Summary
    print("\n" + "=" * 60)
    print("PHASE 1 VALIDATION SUMMARY")
    print("=" * 60)
    print(f"Tests run: {result.testsRun}")
    print(f"Failures: {len(result.failures)}")
    print(f"Errors: {len(result.errors)}")
    print(f"Skipped: {len(result.skipped)}")

    success_rate = (
        (result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun * 100
    )
    print(f"Success Rate: {success_rate:.1f}%")

    if success_rate >= 80:
        print("✓ PHASE 1 VALIDATION: PASSED")
    else:
        print("✗ PHASE 1 VALIDATION: FAILED")

    return result.wasSuccessful()


if __name__ == "__main__":
    # Run Phase 1 validation
    success = run_phase1_validation()
    sys.exit(0 if success else 1)
@@ -1,351 +0,0 @@
"""
Comprehensive test suite for Mai Memory System

Tests all memory components including storage, compression, retrieval, and CLI integration.
"""

import pytest
import tempfile
import shutil
import os
import sys
import time
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime, timedelta

# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

# Import CLI interface - this should work
from mai.core.interface import show_memory_status, search_memory, manage_memory

# Try to import memory components - they might not work due to dependencies
try:
    from mai.memory.storage import MemoryStorage, MemoryStorageError
    from mai.memory.compression import MemoryCompressor, CompressionResult
    from mai.memory.retrieval import ContextRetriever, SearchQuery, MemoryContext
    from mai.memory.manager import MemoryManager, MemoryStats
    from mai.models.conversation import Conversation, Message
    from mai.models.memory import MemoryContext as ModelMemoryContext

    MEMORY_COMPONENTS_AVAILABLE = True
except ImportError as e:
    print(f"Memory components not available: {e}")
    MEMORY_COMPONENTS_AVAILABLE = False
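
# The guard above lets the CLI-level tests below run even when the optional
# memory stack cannot be imported; test classes that need the real components
# are skipped via @pytest.mark.skipif(not MEMORY_COMPONENTS_AVAILABLE, ...).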


class TestCLIInterface:
    """Test CLI interface functions - these should always work."""

    def test_show_memory_status(self):
        """Test show_memory_status CLI function."""
        result = show_memory_status()

        assert result is not None
        assert isinstance(result, dict)

        # Should contain memory status information
        if "memory_enabled" in result:
            assert isinstance(result["memory_enabled"], bool)

        if "error" in result:
            # Memory system might not be initialized, that's okay for test
            assert isinstance(result["error"], str)

    def test_search_memory(self):
        """Test search_memory CLI function."""
        result = search_memory("test query")

        assert result is not None
        assert isinstance(result, dict)

        if "success" in result:
            assert isinstance(result["success"], bool)

        if "results" in result:
            assert isinstance(result["results"], list)

        if "error" in result:
            # Memory system might not be initialized, that's okay for test
            assert isinstance(result["error"], str)

    def test_manage_memory(self):
        """Test manage_memory CLI function."""
        # Test stats action (should work even without memory system)
        result = manage_memory("stats")

        assert result is not None
        assert isinstance(result, dict)
        assert result.get("action") == "stats"

        if "success" in result:
            assert isinstance(result["success"], bool)

        if "error" in result:
            # Memory system might not be initialized, that's okay for test
            assert isinstance(result["error"], str)

    def test_manage_memory_unknown_action(self):
        """Test manage_memory with unknown action."""
        result = manage_memory("unknown_action")

        assert result is not None
        assert isinstance(result, dict)
        assert result.get("success") is False
        # Check if error mentions unknown action or memory system not available
        error_msg = result.get("error", "").lower()
        assert "unknown" in error_msg or "memory system not available" in error_msg


@pytest.mark.skipif(not MEMORY_COMPONENTS_AVAILABLE, reason="Memory components not available")
class TestMemoryStorage:
    """Test memory storage functionality."""

    @pytest.fixture
    def temp_db(self):
        """Create temporary database for testing."""
        temp_dir = tempfile.mkdtemp()
        db_path = os.path.join(temp_dir, "test_memory.db")
        yield db_path
        shutil.rmtree(temp_dir, ignore_errors=True)

    def test_storage_initialization(self, temp_db):
        """Test that storage initializes correctly."""
        try:
            storage = MemoryStorage(database_path=temp_db)
            assert storage is not None
        except Exception as e:
            # Storage might fail due to missing dependencies
            pytest.skip(f"Storage initialization failed: {e}")

    def test_conversation_storage(self, temp_db):
        """Test storing and retrieving conversations."""
        try:
            storage = MemoryStorage(database_path=temp_db)

            # Create test conversation with minimal required fields
            conversation = Conversation(
                title="Test Conversation",
                messages=[
                    Message(role="user", content="Hello", timestamp=datetime.now()),
                    Message(role="assistant", content="Hi there!", timestamp=datetime.now()),
                ],
                created_at=datetime.now(),
                updated_at=datetime.now(),
            )

            # Store conversation
            conv_id = storage.store_conversation(conversation)
            assert conv_id is not None

        except Exception as e:
            pytest.skip(f"Conversation storage test failed: {e}")

    def test_conversation_search(self, temp_db):
        """Test searching conversations."""
        try:
            storage = MemoryStorage(database_path=temp_db)

            # Store test conversations
            conv1 = Conversation(
                title="Python Programming",
                messages=[
                    Message(role="user", content="How to use Python?", timestamp=datetime.now())
                ],
                created_at=datetime.now(),
                updated_at=datetime.now(),
            )
            conv2 = Conversation(
                title="Machine Learning",
                messages=[Message(role="user", content="What is ML?", timestamp=datetime.now())],
                created_at=datetime.now(),
                updated_at=datetime.now(),
            )

            storage.store_conversation(conv1)
            storage.store_conversation(conv2)

            # Search for Python
            results = storage.search_conversations("Python", limit=10)
            assert isinstance(results, list)

        except Exception as e:
            pytest.skip(f"Conversation search test failed: {e}")


@pytest.mark.skipif(not MEMORY_COMPONENTS_AVAILABLE, reason="Memory components not available")
class TestMemoryCompression:
    """Test memory compression functionality."""

    @pytest.fixture
    def compressor(self):
        """Create compressor instance."""
        try:
            return MemoryCompressor()
        except Exception as e:
            pytest.skip(f"Compressor initialization failed: {e}")

    def test_conversation_compression(self, compressor):
        """Test conversation compression."""
        try:
            # Create test conversation
            conversation = Conversation(
                title="Long Conversation",
                messages=[
                    Message(role="user", content=f"Message {i}", timestamp=datetime.now())
                    for i in range(10)  # Smaller for testing
                ],
                created_at=datetime.now(),
                updated_at=datetime.now(),
            )

            # Compress
            result = compressor.compress_conversation(conversation)

            assert result is not None

        except Exception as e:
            pytest.skip(f"Conversation compression test failed: {e}")


@pytest.mark.skipif(not MEMORY_COMPONENTS_AVAILABLE, reason="Memory components not available")
class TestMemoryManager:
    """Test memory manager orchestration."""

    @pytest.fixture
    def temp_manager(self):
        """Create memory manager with temporary storage."""
        temp_dir = tempfile.mkdtemp()
        db_path = os.path.join(temp_dir, "test_manager.db")

        try:
            # Mock the storage path
            with patch("mai.memory.manager.MemoryStorage") as mock_storage:
                mock_storage.return_value = MemoryStorage(database_path=db_path)
                manager = MemoryManager()
                yield manager
        except Exception as e:
            # If manager fails, create a mock
            mock_manager = Mock(spec=MemoryManager)
            mock_manager.get_memory_stats.return_value = MemoryStats()
            mock_manager.store_conversation.return_value = "test-conv-id"
            mock_manager.get_context.return_value = ModelMemoryContext(
                relevant_conversations=[], total_conversations=0, estimated_tokens=0, metadata={}
            )
            mock_manager.search_conversations.return_value = []
            yield mock_manager

        shutil.rmtree(temp_dir, ignore_errors=True)

    def test_conversation_storage(self, temp_manager):
        """Test conversation storage through manager."""
        try:
            messages = [
                {"role": "user", "content": "Hello"},
                {"role": "assistant", "content": "Hi there!"},
            ]

            conv_id = temp_manager.store_conversation(messages=messages, metadata={"test": True})

            assert conv_id is not None
            assert isinstance(conv_id, str)

        except Exception as e:
            pytest.skip(f"Manager conversation storage test failed: {e}")

    def test_memory_stats(self, temp_manager):
        """Test memory statistics through manager."""
        try:
            stats = temp_manager.get_memory_stats()
            assert stats is not None
            assert isinstance(stats, MemoryStats)

        except Exception as e:
            pytest.skip(f"Manager memory stats test failed: {e}")


@pytest.mark.skipif(not MEMORY_COMPONENTS_AVAILABLE, reason="Memory components not available")
class TestContextRetrieval:
    """Test context retrieval functionality."""

    @pytest.fixture
    def retriever(self):
        """Create retriever instance."""
        try:
            return ContextRetriever()
        except Exception as e:
            pytest.skip(f"Retriever initialization failed: {e}")
pytest.skip(f"Retriever initialization failed: {e}")
|
||||
|
||||
def test_context_retrieval(self, retriever):
|
||||
"""Test context retrieval for query."""
|
||||
try:
|
||||
query = SearchQuery(text="Python programming", max_results=5)
|
||||
|
||||
context = retriever.get_context(query)
|
||||
|
||||
assert context is not None
|
||||
assert isinstance(context, ModelMemoryContext)
|
||||
|
||||
except Exception as e:
|
||||
pytest.skip(f"Context retrieval test failed: {e}")
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration tests for memory system."""
|
||||
|
||||
def test_end_to_end_workflow(self):
|
||||
"""Test complete workflow: store -> search -> compress."""
|
||||
# This is a smoke test to verify the basic workflow doesn't crash
|
||||
# Individual components are tested in their respective test classes
|
||||
|
||||
# Test CLI functions don't crash
|
||||
status = show_memory_status()
|
||||
assert isinstance(status, dict)
|
||||
|
||||
search_result = search_memory("test")
|
||||
assert isinstance(search_result, dict)
|
||||
|
||||
manage_result = manage_memory("stats")
|
||||
assert isinstance(manage_result, dict)
|
||||
|
||||
|
||||
# Performance and stress tests
|
||||
class TestPerformance:
|
||||
"""Performance tests for memory system."""
|
||||
|
||||
def test_search_performance(self):
|
||||
"""Test search performance with larger datasets."""
|
||||
try:
|
||||
# This would require setting up a larger test dataset
|
||||
# For now, just verify the function exists and returns reasonable timing
|
||||
start_time = time.time()
|
||||
result = search_memory("performance test")
|
||||
end_time = time.time()
|
||||
|
||||
search_time = end_time - start_time
|
||||
assert search_time < 5.0 # Should complete within 5 seconds
|
||||
assert isinstance(result, dict)
|
||||
|
||||
except ImportError:
|
||||
pytest.skip("Memory system dependencies not available")
|
||||
|
||||
def test_memory_stats_performance(self):
|
||||
"""Test memory stats calculation performance."""
|
||||
try:
|
||||
start_time = time.time()
|
||||
result = show_memory_status()
|
||||
end_time = time.time()
|
||||
|
||||
stats_time = end_time - start_time
|
||||
assert stats_time < 2.0 # Should complete within 2 seconds
|
||||
assert isinstance(result, dict)
|
||||
|
||||
except ImportError:
|
||||
pytest.skip("Memory system dependencies not available")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Run tests if script is executed directly
|
||||
pytest.main([__file__, "-v"])
|
||||
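
# A minimal sketch (an assumption for illustration, not this module's actual header) of how
# the MEMORY_COMPONENTS_AVAILABLE flag used by the skipif markers above is typically set:
# a guarded import so the suite degrades to skips when the memory stack is absent.
try:
    from mai.memory.manager import MemoryManager  # import path assumed from the patch() calls above

    MEMORY_COMPONENTS_AVAILABLE = True
except ImportError:
    MEMORY_COMPONENTS_AVAILABLE = False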
@@ -1,409 +0,0 @@
"""
Test suite for ApprovalSystem

This module provides comprehensive testing for the risk-based approval system,
including user interaction, trust management, and edge cases.
"""

import pytest
import time
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime
from typing import Dict, Any

import sys
import os

sys.path.append(os.path.join(os.path.dirname(__file__), "..", "src"))

from mai.sandbox.approval_system import (
    ApprovalSystem,
    RiskLevel,
    ApprovalResult,
    RiskAnalysis,
    ApprovalRequest,
    ApprovalDecision,
)


class TestApprovalSystem:
    """Test cases for ApprovalSystem."""

    @pytest.fixture
    def approval_system(self):
        """Create a fresh ApprovalSystem for each test."""
        with patch("mai.sandbox.approval_system.get_config") as mock_config:
            mock_config.return_value = Mock()
            mock_config.return_value.get.return_value = {
                "low_threshold": 0.3,
                "medium_threshold": 0.6,
                "high_threshold": 0.8,
            }
            return ApprovalSystem()

    @pytest.fixture
    def mock_low_risk_code(self):
        """Sample low-risk code."""
        return 'print("hello world")'

    @pytest.fixture
    def mock_medium_risk_code(self):
        """Sample medium-risk code."""
        return "import os\nprint(os.getcwd())"

    @pytest.fixture
    def mock_high_risk_code(self):
        """Sample high-risk code."""
        return 'import subprocess\nsubprocess.call(["ls", "-la"])'

    @pytest.fixture
    def mock_blocked_code(self):
        """Sample blocked code."""
        return 'os.system("rm -rf /")'

    def test_initialization(self, approval_system):
        """Test ApprovalSystem initialization."""
        assert approval_system.approval_history == []
        assert approval_system.user_preferences == {}
        assert approval_system.trust_patterns == {}
        assert approval_system.risk_thresholds["low_threshold"] == 0.3

    def test_risk_analysis_low_risk(self, approval_system, mock_low_risk_code):
        """Test risk analysis for low-risk code."""
        context = {}
        risk_analysis = approval_system._analyze_code_risk(mock_low_risk_code, context)

        assert risk_analysis.risk_level == RiskLevel.LOW
        assert risk_analysis.severity_score < 0.3
        assert len(risk_analysis.reasons) == 0
        assert risk_analysis.confidence > 0.5

    def test_risk_analysis_medium_risk(self, approval_system, mock_medium_risk_code):
        """Test risk analysis for medium-risk code."""
        context = {}
        risk_analysis = approval_system._analyze_code_risk(mock_medium_risk_code, context)

        assert risk_analysis.risk_level == RiskLevel.MEDIUM
        assert risk_analysis.severity_score >= 0.3
        assert len(risk_analysis.reasons) > 0
        assert "file_system" in risk_analysis.affected_resources

    def test_risk_analysis_high_risk(self, approval_system, mock_high_risk_code):
        """Test risk analysis for high-risk code."""
        context = {}
        risk_analysis = approval_system._analyze_code_risk(mock_high_risk_code, context)

        assert risk_analysis.risk_level == RiskLevel.HIGH
        assert risk_analysis.severity_score >= 0.6
        assert len(risk_analysis.reasons) > 0
        assert "system_operations" in risk_analysis.affected_resources

    def test_risk_analysis_blocked(self, approval_system, mock_blocked_code):
        """Test risk analysis for blocked code."""
        context = {}
        risk_analysis = approval_system._analyze_code_risk(mock_blocked_code, context)

        assert risk_analysis.risk_level == RiskLevel.BLOCKED
        assert any("blocked operation" in reason.lower() for reason in risk_analysis.reasons)

    def test_operation_type_detection(self, approval_system):
        """Test operation type detection."""
        assert approval_system._get_operation_type('print("hello")') == "output_operation"
        assert approval_system._get_operation_type("import os") == "module_import"
        assert approval_system._get_operation_type('os.system("ls")') == "system_command"
        assert approval_system._get_operation_type('open("file.txt")') == "file_operation"
        assert approval_system._get_operation_type("x = 5") == "code_execution"

    def test_request_id_generation(self, approval_system):
        """Test unique request ID generation."""
        code1 = 'print("test")'
        code2 = 'print("test")'

        id1 = approval_system._generate_request_id(code1)
        time.sleep(0.01)  # Small delay to ensure different timestamps
        id2 = approval_system._generate_request_id(code2)

        assert id1 != id2  # Should differ due to the timestamp component
        assert len(id1) == 12  # MD5 hash truncated to 12 chars
        assert len(id2) == 12

    @patch("builtins.input")
    def test_low_risk_approval_allow(self, mock_input, approval_system, mock_low_risk_code):
        """Test low-risk approval with the user allowing."""
        mock_input.return_value = "y"

        result, decision = approval_system.request_approval(mock_low_risk_code)

        assert result == ApprovalResult.APPROVED
        assert decision.user_input == "allowed"
        assert decision.request.risk_analysis.risk_level == RiskLevel.LOW

    @patch("builtins.input")
    def test_low_risk_approval_deny(self, mock_input, approval_system, mock_low_risk_code):
        """Test low-risk approval with the user denying."""
        mock_input.return_value = "n"

        result, decision = approval_system.request_approval(mock_low_risk_code)

        assert result == ApprovalResult.DENIED
        assert decision.user_input == "denied"

    @patch("builtins.input")
    def test_low_risk_approval_always(self, mock_input, approval_system, mock_low_risk_code):
        """Test low-risk approval with the 'always allow' preference."""
        mock_input.return_value = "a"

        result, decision = approval_system.request_approval(mock_low_risk_code)

        assert result == ApprovalResult.APPROVED
        assert decision.user_input == "allowed_always"
        assert decision.trust_updated is True
        assert "output_operation" in approval_system.user_preferences

    @patch("builtins.input")
    def test_medium_risk_approval_details(self, mock_input, approval_system, mock_medium_risk_code):
        """Test medium-risk approval requesting details."""
        mock_input.return_value = "d"  # Request details first

        with patch.object(approval_system, "_present_detailed_view") as mock_detailed:
            mock_detailed.return_value = "allowed"

            result, decision = approval_system.request_approval(mock_medium_risk_code)

            assert result == ApprovalResult.APPROVED
            mock_detailed.assert_called_once()

    @patch("builtins.input")
    def test_high_risk_approval_confirm(self, mock_input, approval_system, mock_high_risk_code):
        """Test high-risk approval with confirmation."""
        mock_input.return_value = "confirm"

        result, decision = approval_system.request_approval(mock_high_risk_code)

        assert result == ApprovalResult.APPROVED
        assert decision.request.risk_analysis.risk_level == RiskLevel.HIGH

    @patch("builtins.input")
    def test_high_risk_approval_cancel(self, mock_input, approval_system, mock_high_risk_code):
        """Test high-risk approval with cancellation."""
        mock_input.return_value = "cancel"

        result, decision = approval_system.request_approval(mock_high_risk_code)

        assert result == ApprovalResult.DENIED

    @patch("builtins.print")
    def test_blocked_operation(self, mock_print, approval_system, mock_blocked_code):
        """Test blocked operation handling."""
        result, decision = approval_system.request_approval(mock_blocked_code)

        assert result == ApprovalResult.BLOCKED
        assert decision.request.risk_analysis.risk_level == RiskLevel.BLOCKED

    def test_auto_approval_for_trusted_operation(self, approval_system, mock_low_risk_code):
        """Test auto-approval for trusted operations."""
        # Set up user preference
        approval_system.user_preferences["output_operation"] = "auto_allow"

        result, decision = approval_system.request_approval(mock_low_risk_code)

        assert result == ApprovalResult.ALLOWED
        assert decision.user_input == "auto_allowed"

    def test_approval_history(self, approval_system, mock_low_risk_code):
        """Test approval history tracking."""
        # Add some decisions
        with patch("builtins.input", return_value="y"):
            approval_system.request_approval(mock_low_risk_code)
            approval_system.request_approval(mock_low_risk_code)

        history = approval_system.get_approval_history(5)
        assert len(history) == 2
        assert all(isinstance(decision, ApprovalDecision) for decision in history)

    def test_trust_patterns_learning(self, approval_system, mock_low_risk_code):
        """Test trust pattern learning."""
        # Add approved decisions
        with patch("builtins.input", return_value="y"):
            for _ in range(3):
                approval_system.request_approval(mock_low_risk_code)

        patterns = approval_system.get_trust_patterns()
        assert "output_operation" in patterns
        assert patterns["output_operation"] == 3

    def test_preferences_reset(self, approval_system):
        """Test preferences reset."""
        # Add some preferences
        approval_system.user_preferences = {"test": "value"}
        approval_system.reset_preferences()

        assert approval_system.user_preferences == {}

    def test_is_code_safe(self, approval_system, mock_low_risk_code, mock_high_risk_code):
        """Test quick safety check."""
        assert approval_system.is_code_safe(mock_low_risk_code) is True
        assert approval_system.is_code_safe(mock_high_risk_code) is False

    def test_context_awareness(self, approval_system, mock_low_risk_code):
        """Test context-aware risk analysis."""
        # A new-user context should increase risk
        context_new_user = {"user_level": "new"}
        risk_new = approval_system._analyze_code_risk(mock_low_risk_code, context_new_user)

        context_known_user = {"user_level": "known"}
        risk_known = approval_system._analyze_code_risk(mock_low_risk_code, context_known_user)

        assert risk_new.severity_score > risk_known.severity_score
        assert "New user profile" in risk_new.reasons

    def test_request_id_uniqueness(self, approval_system):
        """Test that request IDs are unique even for the same code."""
        code = 'print("test")'
        ids = []

        for _ in range(10):
            rid = approval_system._generate_request_id(code)
            assert rid not in ids, f"Duplicate ID: {rid}"
            ids.append(rid)

    def test_risk_score_accumulation(self, approval_system):
        """Test that multiple risk factors accumulate."""
        # Code with multiple risk factors
        risky_code = """
import os
import subprocess
os.system("ls")
subprocess.call(["pwd"])
"""
        risk_analysis = approval_system._analyze_code_risk(risky_code, {})

        assert risk_analysis.severity_score > 0.5
        assert len(risk_analysis.reasons) >= 2
        assert "system_operations" in risk_analysis.affected_resources

    @patch("builtins.input")
    def test_detailed_view_presentation(self, mock_input, approval_system, mock_medium_risk_code):
        """Test detailed view presentation."""
        mock_input.return_value = "y"

        # Create a request
        risk_analysis = approval_system._analyze_code_risk(mock_medium_risk_code, {})
        request = ApprovalRequest(
            code=mock_medium_risk_code,
            risk_analysis=risk_analysis,
            context={"test": "value"},
            timestamp=datetime.now(),
            request_id="test123",
        )

        result = approval_system._present_detailed_view(request)
        assert result == "allowed"

    @patch("builtins.input")
    def test_detailed_analysis_presentation(self, mock_input, approval_system, mock_high_risk_code):
        """Test detailed analysis presentation."""
        mock_input.return_value = "confirm"

        # Create a request
        risk_analysis = approval_system._analyze_code_risk(mock_high_risk_code, {})
        request = ApprovalRequest(
            code=mock_high_risk_code,
            risk_analysis=risk_analysis,
            context={},
            timestamp=datetime.now(),
            request_id="test456",
        )

        result = approval_system._present_detailed_analysis(request)
        assert result == "allowed"

    def test_error_handling_in_risk_analysis(self, approval_system):
        """Test error handling in risk analysis."""
        # Test with None code (should not crash)
        try:
            risk_analysis = approval_system._analyze_code_risk(None, {})
            # Should still return a valid RiskAnalysis object
            assert isinstance(risk_analysis, RiskAnalysis)
        except Exception:
            # Raising an exception is also acceptable behavior here
            pass

    def test_preferences_persistence(self, approval_system):
        """Test preferences persistence simulation."""
        # Simulate loading preferences with an error
        with patch.object(approval_system, "_load_preferences") as mock_load:
            mock_load.side_effect = Exception("Load error")

            # Should not crash during initialization
            try:
                approval_system._load_preferences()
            except Exception:
                pass  # Expected

        # Simulate saving preferences with an error
        with patch.object(approval_system, "_save_preferences") as mock_save:
            mock_save.side_effect = Exception("Save error")

            # Should not crash when saving
            try:
                approval_system._save_preferences()
            except Exception:
                pass  # Expected

    @pytest.mark.parametrize(
        "code_pattern,expected_risk",
        [
            ('print("hello")', RiskLevel.LOW),
            ("import os", RiskLevel.MEDIUM),
            ('os.system("ls")', RiskLevel.HIGH),
            ("rm -rf /", RiskLevel.BLOCKED),
            ('eval("x + 1")', RiskLevel.HIGH),
            ('exec("print(1)")', RiskLevel.HIGH),
            ('__import__("os")', RiskLevel.HIGH),
        ],
    )
    def test_risk_patterns(self, approval_system, code_pattern, expected_risk):
        """Test various code patterns for risk classification."""
        risk_analysis = approval_system._analyze_code_risk(code_pattern, {})

        # Allow some flexibility in risk assessment
        if expected_risk == RiskLevel.HIGH:
            assert risk_analysis.risk_level in [RiskLevel.HIGH, RiskLevel.BLOCKED]
        else:
            assert risk_analysis.risk_level == expected_risk

    def test_approval_decision_dataclass(self):
        """Test ApprovalDecision dataclass."""
        now = datetime.now()
        request = ApprovalRequest(
            code='print("test")',
            risk_analysis=RiskAnalysis(
                risk_level=RiskLevel.LOW,
                confidence=0.8,
                reasons=[],
                affected_resources=[],
                severity_score=0.1,
            ),
            context={},
            timestamp=now,
            request_id="test123",
        )

        decision = ApprovalDecision(
            request=request,
            result=ApprovalResult.APPROVED,
            user_input="y",
            timestamp=now,
            trust_updated=False,
        )

        assert decision.request == request
        assert decision.result == ApprovalResult.APPROVED
        assert decision.user_input == "y"
        assert decision.timestamp == now
        assert decision.trust_updated is False


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
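
# A minimal usage sketch (an assumption for illustration, not part of the original file) of
# the approval API exercised above: request_approval() returns an
# (ApprovalResult, ApprovalDecision) pair, and the decision carries the analysed request.
def run_with_approval(system: ApprovalSystem, code: str) -> bool:
    result, decision = system.request_approval(code)
    # The decision records the originating request; its risk level explains the outcome.
    print(decision.request.risk_analysis.risk_level, "->", result)
    # Only explicitly approved or auto-allowed results should reach execution.
    return result in (ApprovalResult.APPROVED, ApprovalResult.ALLOWED)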
@@ -1,403 +0,0 @@
"""
Tests for SandboxManager with Docker integration

Test suite for the enhanced SandboxManager that includes Docker-based
container execution with fallback to local execution.
"""

import pytest
from unittest.mock import Mock, patch, call

from src.mai.sandbox.manager import SandboxManager, ExecutionRequest, ExecutionResult
from src.mai.sandbox.risk_analyzer import RiskAssessment, RiskPattern
from src.mai.sandbox.resource_enforcer import ResourceUsage, ResourceLimits
from src.mai.sandbox.docker_executor import ContainerResult, ContainerConfig


class TestSandboxManagerDockerIntegration:
    """Test SandboxManager Docker integration features"""

    @pytest.fixture
    def sandbox_manager(self):
        """Create SandboxManager instance for testing"""
        return SandboxManager()

    @pytest.fixture
    def mock_docker_executor(self):
        """Create mock Docker executor"""
        mock_executor = Mock()
        mock_executor.is_available.return_value = True
        mock_executor.execute_code.return_value = ContainerResult(
            success=True,
            container_id="test-container-id",
            exit_code=0,
            stdout="Hello from Docker!",
            stderr="",
            execution_time=1.2,
            resource_usage={"cpu_percent": 45.0, "memory_usage_mb": 32.0},
        )
        mock_executor.get_system_info.return_value = {
            "available": True,
            "version": "20.10.7",
            "containers": 3,
        }
        return mock_executor

    def test_execution_request_with_docker_options(self):
        """Test ExecutionRequest with Docker-specific options"""
        request = ExecutionRequest(
            code="print('test')",
            use_docker=True,
            docker_image="python:3.9-alpine",
            timeout_seconds=45,
            network_allowed=True,
            additional_files={"data.txt": "test content"},
        )

        assert request.use_docker is True
        assert request.docker_image == "python:3.9-alpine"
        assert request.timeout_seconds == 45
        assert request.network_allowed is True
        assert request.additional_files == {"data.txt": "test content"}

    def test_execution_result_with_docker_info(self):
        """Test ExecutionResult includes Docker execution info"""
        container_result = ContainerResult(
            success=True,
            container_id="test-id",
            exit_code=0,
            stdout="Docker output",
            execution_time=1.5,
        )

        result = ExecutionResult(
            success=True,
            execution_id="test-exec",
            output="Docker output",
            execution_method="docker",
            container_result=container_result,
        )

        assert result.execution_method == "docker"
        assert result.container_result == container_result
        assert result.container_result.container_id == "test-id"

    def test_execute_code_with_docker_available(self, sandbox_manager):
        """Test code execution when Docker is available"""
        with patch.object(sandbox_manager.docker_executor, "is_available", return_value=True):
            with patch.object(sandbox_manager.risk_analyzer, "analyze_ast") as mock_risk:
                with patch.object(sandbox_manager.docker_executor, "execute_code") as mock_docker:
                    with patch.object(sandbox_manager.audit_logger, "log_execution") as mock_log:
                        # Mock risk analysis (allow execution)
                        mock_risk.return_value = RiskAssessment(
                            score=20, patterns=[], safe_to_execute=True, approval_required=False
                        )

                        # Mock Docker execution
                        mock_docker.return_value = {
                            "success": True,
                            "output": "Hello from Docker!",
                            "container_result": ContainerResult(
                                success=True,
                                container_id="test-container",
                                exit_code=0,
                                stdout="Hello from Docker!",
                            ),
                        }

                        # Execute request with Docker
                        request = ExecutionRequest(
                            code="print('Hello from Docker!')", use_docker=True
                        )

                        result = sandbox_manager.execute_code(request)

                        # Verify Docker was used
                        assert result.execution_method == "docker"
                        assert result.success is True
                        assert result.output == "Hello from Docker!"
                        assert result.container_result is not None

                        # Verify Docker executor was called
                        mock_docker.assert_called_once()

    def test_execute_code_fallback_to_local(self, sandbox_manager):
        """Test fallback to local execution when Docker is unavailable"""
        with patch.object(sandbox_manager.docker_executor, "is_available", return_value=False):
            with patch.object(sandbox_manager.risk_analyzer, "analyze_ast") as mock_risk:
                with patch.object(sandbox_manager, "_execute_in_sandbox") as mock_local:
                    with patch.object(
                        sandbox_manager.resource_enforcer, "stop_monitoring"
                    ) as mock_monitoring:
                        # Mock risk analysis (allow execution)
                        mock_risk.return_value = RiskAssessment(
                            score=20, patterns=[], safe_to_execute=True, approval_required=False
                        )

                        # Mock local execution
                        mock_local.return_value = {"success": True, "output": "Hello from local!"}

                        # Mock resource monitoring
                        mock_monitoring.return_value = ResourceUsage(
                            cpu_percent=25.0,
                            memory_percent=30.0,
                            memory_used_gb=0.5,
                            elapsed_seconds=1.0,
                            approaching_limits=False,
                        )

                        # Execute request preferring Docker
                        request = ExecutionRequest(
                            code="print('Hello')",
                            use_docker=True,  # But Docker is unavailable
                        )

                        result = sandbox_manager.execute_code(request)

                        # Verify fallback to local execution
                        assert result.execution_method == "local"
                        assert result.success is True
                        assert result.output == "Hello from local!"
                        assert result.container_result is None

                        # Verify local execution was used
                        mock_local.assert_called_once()

    def test_execute_code_local_preference(self, sandbox_manager):
        """Test explicit preference for local execution"""
        with patch.object(sandbox_manager.risk_analyzer, "analyze_ast") as mock_risk:
            with patch.object(sandbox_manager, "_execute_in_sandbox") as mock_local:
                # Mock risk analysis (allow execution)
                mock_risk.return_value = RiskAssessment(
                    score=20, patterns=[], safe_to_execute=True, approval_required=False
                )

                # Mock local execution
                mock_local.return_value = {"success": True, "output": "Local execution"}

                # Execute request explicitly preferring local
                request = ExecutionRequest(
                    code="print('Local')",
                    use_docker=False,  # Explicitly prefer local
                )

                result = sandbox_manager.execute_code(request)

                # Verify local execution was used
                assert result.execution_method == "local"
                assert result.success is True

                # Docker executor should not be called
                sandbox_manager.docker_executor.execute_code.assert_not_called()

    def test_build_docker_config_from_request(self, sandbox_manager):
        """Test building Docker config from execution request"""
        # Use the actual method from DockerExecutor
        config = sandbox_manager.docker_executor._build_container_config(
            ContainerConfig(
                memory_limit="256m", cpu_limit="0.8", network_disabled=False, timeout_seconds=60
            ),
            {"TEST_VAR": "value"},
        )

        assert config["mem_limit"] == "256m"
        assert config["cpu_quota"] == 80000
        assert config["network_disabled"] is False
        assert config["security_opt"] is not None
        assert "TEST_VAR" in config["environment"]

    def test_get_docker_status(self, sandbox_manager, mock_docker_executor):
        """Test getting Docker status information"""
        sandbox_manager.docker_executor = mock_docker_executor

        status = sandbox_manager.get_docker_status()

        assert "available" in status
        assert "images" in status
        assert "system_info" in status
        assert status["available"] is True
        assert status["system_info"]["available"] is True

    def test_pull_docker_image(self, sandbox_manager, mock_docker_executor):
        """Test pulling a Docker image"""
        sandbox_manager.docker_executor = mock_docker_executor
        mock_docker_executor.pull_image.return_value = True

        result = sandbox_manager.pull_docker_image("python:3.9-slim")

        assert result is True
        mock_docker_executor.pull_image.assert_called_once_with("python:3.9-slim")

    def test_cleanup_docker_containers(self, sandbox_manager, mock_docker_executor):
        """Test cleaning up Docker containers"""
        sandbox_manager.docker_executor = mock_docker_executor
        mock_docker_executor.cleanup_containers.return_value = 3

        result = sandbox_manager.cleanup_docker_containers()

        assert result == 3
        mock_docker_executor.cleanup_containers.assert_called_once()

    def test_get_system_status_includes_docker(self, sandbox_manager, mock_docker_executor):
        """Test system status includes Docker information"""
        sandbox_manager.docker_executor = mock_docker_executor

        with patch.object(sandbox_manager, "verify_log_integrity", return_value=True):
            status = sandbox_manager.get_system_status()

            assert "docker_available" in status
            assert "docker_info" in status
            assert status["docker_available"] is True
            assert status["docker_info"]["available"] is True

    def test_execute_code_with_additional_files(self, sandbox_manager):
        """Test code execution with additional files in Docker"""
        with patch.object(sandbox_manager.docker_executor, "is_available", return_value=True):
            with patch.object(sandbox_manager.risk_analyzer, "analyze_ast") as mock_risk:
                with patch.object(sandbox_manager.docker_executor, "execute_code") as mock_docker:
                    # Mock risk analysis (allow execution)
                    mock_risk.return_value = RiskAssessment(
                        score=20, patterns=[], safe_to_execute=True, approval_required=False
                    )

                    # Mock Docker execution
                    mock_docker.return_value = {
                        "success": True,
                        "output": "Processed files",
                        "container_result": ContainerResult(
                            success=True,
                            container_id="test-container",
                            exit_code=0,
                            stdout="Processed files",
                        ),
                    }

                    # Execute request with additional files
                    request = ExecutionRequest(
                        code="with open('data.txt', 'r') as f: print(f.read())",
                        use_docker=True,
                        additional_files={"data.txt": "test data content"},
                    )

                    result = sandbox_manager.execute_code(request)

                    # Verify Docker executor was called with the files
                    mock_docker.assert_called_once()
                    call_args = mock_docker.call_args
                    assert "files" in call_args.kwargs
                    assert call_args.kwargs["files"] == {"data.txt": "test data content"}

                    assert result.success is True
                    assert result.execution_method == "docker"

    def test_risk_analysis_blocks_docker_execution(self, sandbox_manager):
        """Test that high-risk code is blocked even with Docker"""
        with patch.object(sandbox_manager.risk_analyzer, "analyze_ast") as mock_risk:
            # Mock high-risk analysis (block execution)
            mock_risk.return_value = RiskAssessment(
                score=85,
                patterns=[
                    RiskPattern(
                        pattern="os.system",
                        severity="BLOCKED",
                        score=50,
                        line_number=1,
                        description="System command execution",
                    )
                ],
                safe_to_execute=False,
                approval_required=True,
            )

            # Execute risky code with Docker preference
            request = ExecutionRequest(code="os.system('rm -rf /')", use_docker=True)

            result = sandbox_manager.execute_code(request)

            # Verify execution was blocked
            assert result.success is False
            assert "blocked" in result.error.lower()
            assert result.risk_assessment.score == 85
            assert result.execution_method == "local"  # Default before Docker check

            # Docker should not be called for blocked code
            sandbox_manager.docker_executor.execute_code.assert_not_called()


class TestSandboxManagerDockerEdgeCases:
    """Test edge cases and error handling in Docker integration"""

    @pytest.fixture
    def sandbox_manager(self):
        """Create SandboxManager instance for testing"""
        return SandboxManager()

    def test_docker_executor_error_handling(self, sandbox_manager):
        """Test handling of Docker executor errors"""
        with patch.object(sandbox_manager.docker_executor, "is_available", return_value=True):
            with patch.object(sandbox_manager.risk_analyzer, "analyze_ast") as mock_risk:
                with patch.object(sandbox_manager.docker_executor, "execute_code") as mock_docker:
                    # Mock risk analysis (allow execution)
                    mock_risk.return_value = RiskAssessment(
                        score=20, patterns=[], safe_to_execute=True, approval_required=False
                    )

                    # Mock Docker executor error
                    mock_docker.return_value = {
                        "success": False,
                        "error": "Docker daemon not available",
                        "container_result": None,
                    }

                    request = ExecutionRequest(code="print('test')", use_docker=True)

                    result = sandbox_manager.execute_code(request)

                    # Verify error handling
                    assert result.success is False
                    assert result.execution_method == "docker"
                    assert "Docker daemon not available" in result.error

    def test_container_resource_usage_integration(self, sandbox_manager):
        """Test integration of container resource usage"""
        with patch.object(sandbox_manager.docker_executor, "is_available", return_value=True):
            with patch.object(sandbox_manager.risk_analyzer, "analyze_ast") as mock_risk:
                with patch.object(sandbox_manager.docker_executor, "execute_code") as mock_docker:
                    # Mock risk analysis (allow execution)
                    mock_risk.return_value = RiskAssessment(
                        score=20, patterns=[], safe_to_execute=True, approval_required=False
                    )

                    # Mock Docker execution with resource usage
                    container_result = ContainerResult(
                        success=True,
                        container_id="test-container",
                        exit_code=0,
                        stdout="test output",
                        resource_usage={
                            "cpu_percent": 35.5,
                            "memory_usage_mb": 64.2,
                            "memory_percent": 12.5,
                        },
                    )

                    mock_docker.return_value = {
                        "success": True,
                        "output": "test output",
                        "container_result": container_result,
                    }

                    request = ExecutionRequest(code="print('test')", use_docker=True)

                    result = sandbox_manager.execute_code(request)

                    # Verify resource usage is preserved
                    assert result.container_result.resource_usage["cpu_percent"] == 35.5
                    assert result.container_result.resource_usage["memory_usage_mb"] == 64.2
                    assert result.container_result.resource_usage["memory_percent"] == 12.5


if __name__ == "__main__":
    pytest.main([__file__])
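
# A minimal dispatch sketch (an assumption for illustration, not part of the original file)
# of the behaviour the tests above assert: Docker is used only when it was requested AND is
# available; otherwise execution falls back to the local sandbox.
def choose_execution_method(use_docker: bool, docker_available: bool) -> str:
    """Return 'docker' only when Docker was requested and is available, else 'local'."""
    if use_docker and docker_available:
        return "docker"
    return "local"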
@@ -1,2 +0,0 @@
def test_smoke() -> None:
    assert True