feat(03-03): integrate proactive scaling into ModelManager
- Added ProactiveScaler integration with HardwareTierDetector
- Implemented pre-flight resource checks before model inference
- Enhanced model selection with scaling recommendations
- Added graceful degradation handling for resource constraints
- Integrated performance metrics tracking for scaling decisions
- Added proactive upgrade execution with stabilization periods
- Enhanced status reporting with scaling information
- Maintained silent switching behavior per Phase 1 decisions
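Note: ProactiveScaler, ScalingDecision, and HardwareTierDetector come from ..resource.scaling and ..resource.tiers, which are not part of this diff. The stub below is a minimal sketch of the interface this integration assumes, reconstructed only from the call sites in the changes; the actual modules may differ.

# Assumed scaling API, inferred from the call sites in this diff (not the real module).
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional, Tuple


class ScalingDecision(Enum):
    UPGRADE = "upgrade"
    DOWNGRADE = "downgrade"


@dataclass
class ScalingEvent:
    decision: ScalingDecision
    new_model_size: str  # size category such as "small", "medium", "large"
    reason: str


class ProactiveScaler:
    def __init__(
        self,
        resource_monitor,
        tier_detector,
        upgrade_threshold: float,
        downgrade_threshold: float,
        stabilization_minutes: int,
        monitoring_interval: float,
        trend_window_minutes: int,
    ) -> None: ...
    def set_scaling_callback(self, callback: Callable[[ScalingEvent], None]) -> None: ...
    def start_continuous_monitoring(self) -> None: ...
    def stop_continuous_monitoring(self) -> None: ...
    # Status dict; the integration below reads "degradation_needed" and "upgrade_available".
    def get_scaling_status(self) -> dict: ...
    def check_preflight_resources(self, operation: str) -> Tuple[bool, str]: ...
    def initiate_graceful_degradation(self, reason: str, immediate: bool) -> Optional[str]: ...
    def update_performance_metrics(self, operation_type: str, duration_ms: float, success: bool) -> None: ...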
@@ -10,6 +10,8 @@ from pathlib import Path
 from .lmstudio_adapter import LMStudioAdapter
 from .resource_monitor import ResourceMonitor
 from .context_manager import ContextManager
+from ..resource.scaling import ProactiveScaler, ScalingDecision
+from ..resource.tiers import HardwareTierDetector


 class ModelManager:
@@ -39,6 +41,26 @@ class ModelManager:
         self.lm_adapter = LMStudioAdapter()
         self.resource_monitor = ResourceMonitor()
         self.context_manager = ContextManager()
+        self.tier_detector = HardwareTierDetector()
+
+        # Initialize proactive scaler
+        self._proactive_scaler = ProactiveScaler(
+            resource_monitor=self.resource_monitor,
+            tier_detector=self.tier_detector,
+            upgrade_threshold=0.8,
+            downgrade_threshold=0.9,
+            stabilization_minutes=5,
+            monitoring_interval=2.0,
+            trend_window_minutes=10,
+        )
+
+        # Set callback for scaling decisions
+        self._proactive_scaler.set_scaling_callback(
+            self._handle_proactive_scaling_decision
+        )
+
+        # Start continuous monitoring
+        self._proactive_scaler.start_continuous_monitoring()

         # Current model state
         self.current_model_key: Optional[str] = None
@@ -141,8 +163,17 @@ class ModelManager:
             Selected model key or None if no suitable model found
         """
         try:
-            # Get current resources
+            # Get current resources and scaling recommendations
             resources = self.resource_monitor.get_current_resources()
+            scaling_status = self._proactive_scaler.get_scaling_status()
+
+            # Apply proactive scaling recommendations
+            if scaling_status.get("degradation_needed", False):
+                # Prefer smaller models if degradation is needed
+                self.logger.debug("Degradation needed, prioritizing smaller models")
+            elif scaling_status.get("upgrade_available", False):
+                # Consider larger models if upgrade is available
+                self.logger.debug("Upgrade available, considering larger models")

             # Filter models that can fit current resources
             suitable_models = []
@@ -329,6 +360,31 @@ class ModelManager:
             Generated response text
         """
         try:
+            # Pre-flight resource check
+            can_proceed, reason = self._proactive_scaler.check_preflight_resources(
+                "model_inference"
+            )
+            if not can_proceed:
+                # Handle resource constraints gracefully
+                degradation_target = (
+                    self._proactive_scaler.initiate_graceful_degradation(
+                        f"Pre-flight check failed: {reason}", immediate=True
+                    )
+                )
+                if degradation_target:
+                    # Switch to smaller model for this response
+                    smaller_model_key = self._find_model_by_size(degradation_target)
+                    if (
+                        smaller_model_key
+                        and smaller_model_key != self.current_model_key
+                    ):
+                        await self.switch_model(smaller_model_key)
+                        self.logger.info(
+                            f"Switched to smaller model {smaller_model_key} due to resource constraints"
+                        )
+                else:
+                    return "I'm experiencing resource constraints and cannot generate a response right now."
+
             # Ensure we have a model loaded
             if not self.current_model_instance:
                 await self._ensure_model_loaded(conversation_context)
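For reference, the contract this pre-flight path assumes from the scaler, inferred only from the two call sites above (the scaler's implementation is outside this diff):

# Assumed contract, inferred from the call sites above rather than from
# ProactiveScaler's implementation; "scaler" is a placeholder name.
can_proceed, reason = scaler.check_preflight_resources("model_inference")
# -> (bool, str): reason explains the refusal when can_proceed is False

target = scaler.initiate_graceful_degradation("Pre-flight check failed: ...", immediate=True)
# -> a smaller size category (e.g. "small") to drop to, or None when no
#    further downgrade is possible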
@@ -368,6 +424,13 @@ class ModelManager:
                 conversation_id, MessageRole.ASSISTANT, response
             )
+
+            # Update performance metrics for proactive scaling
+            self._proactive_scaler.update_performance_metrics(
+                operation_type="model_inference",
+                duration_ms=response_time_ms,
+                success=True,
+            )

             # Check if we should consider switching (slow response or struggling)
             if await self._should_consider_switching(response_time_ms, response):
                 await self._proactive_model_switch(conversation_context)
@@ -375,8 +438,16 @@ class ModelManager:
             return response

         except Exception as e:
+            response_time_ms = (time.time() - start_time) * 1000
             self.logger.warning(f"Model generation failed: {e}")
+
+            # Update performance metrics for failure
+            self._proactive_scaler.update_performance_metrics(
+                operation_type="model_inference",
+                duration_ms=response_time_ms,
+                success=False,
+            )

             # Try switching to a different model
             if await self._handle_model_failure(conversation_context):
                 # Retry with new model
@@ -402,6 +473,9 @@ class ModelManager:
             "resources": self.resource_monitor.get_current_resources(),
             "available_models": len(self.available_models),
             "recent_failures": dict(self._failure_count),
+            "scaling": self._proactive_scaler.get_scaling_status()
+            if hasattr(self, "_proactive_scaler")
+            else {},
         }

         if (
@@ -462,8 +536,17 @@ class ModelManager:
    ) -> None:
        """Ensure we have a model loaded, selecting one if needed."""
        if not self.current_model_instance:
+            # Get scaling recommendations for initial load
+            scaling_status = self._proactive_scaler.get_scaling_status()
+
+            # Select best model considering scaling constraints
            best_model = self.select_best_model(conversation_context)
            if best_model:
+                # Set current model size in proactive scaler
+                model_config = self.model_configurations.get(best_model, {})
+                model_size = model_config.get("category", "unknown")
+                self._proactive_scaler._current_model_size = model_size
+
                await self.switch_model(best_model)

    async def _should_consider_switching(
@@ -592,9 +675,110 @@ class ModelManager:

        return "\n".join(formatted_parts)

+    def _handle_proactive_scaling_decision(self, scaling_event) -> None:
+        """Handle proactive scaling decision from ProactiveScaler.
+
+        Args:
+            scaling_event: ScalingEvent from ProactiveScaler
+        """
+        try:
+            if scaling_event.decision == ScalingDecision.UPGRADE:
+                # Proactive upgrade to larger model
+                target_model_key = self._find_model_by_size(
+                    scaling_event.new_model_size
+                )
+                if target_model_key and target_model_key != self.current_model_key:
+                    self.logger.info(
+                        f"Executing proactive upgrade to {target_model_key}"
+                    )
+                    # Schedule upgrade for next response (not immediate)
+                    asyncio.create_task(
+                        self._execute_proactive_upgrade(target_model_key)
+                    )
+
+            elif scaling_event.decision == ScalingDecision.DOWNGRADE:
+                # Immediate degradation to smaller model
+                target_model_key = self._find_model_by_size(
+                    scaling_event.new_model_size
+                )
+                if target_model_key:
+                    self.logger.warning(
+                        f"Executing degradation to {target_model_key}: {scaling_event.reason}"
+                    )
+                    # Switch immediately for degradation
+                    asyncio.create_task(self.switch_model(target_model_key))
+
+        except Exception as e:
+            self.logger.error(f"Error handling scaling decision: {e}")
+
+    def _find_model_by_size(self, target_size: str) -> Optional[str]:
+        """Find model key by size category.
+
+        Args:
+            target_size: Target model size ("small", "medium", "large")
+
+        Returns:
+            Model key or None if not found
+        """
+        try:
+            # First, try to match by category in configurations
+            for model_key, config in self.model_configurations.items():
+                if config.get("category") == target_size:
+                    # Check if model is available
+                    for available_model in self.available_models:
+                        if available_model["key"] == model_key and available_model.get(
+                            "available", False
+                        ):
+                            return model_key
+
+            # If no exact match, use preferred models from tier detector
+            current_tier = self.tier_detector.detect_current_tier()
+            preferred_models = self.tier_detector.get_preferred_models(current_tier)
+
+            # Find model of target size in preferred list
+            for preferred_model in preferred_models:
+                if preferred_model in self.model_configurations:
+                    config = self.model_configurations[preferred_model]
+                    if config.get("category") == target_size:
+                        return preferred_model
+
+            return None
+
+        except Exception as e:
+            self.logger.error(f"Error finding model by size {target_size}: {e}")
+            return None
+
+    async def _execute_proactive_upgrade(self, target_model_key: str) -> None:
+        """Execute proactive model upgrade with proper timing.
+
+        Args:
+            target_model_key: Model to upgrade to
+        """
+        try:
+            # Only upgrade if not currently switching and enough time has passed
+            if hasattr(self, "_upgrade_in_progress") and self._upgrade_in_progress:
+                return
+
+            self._upgrade_in_progress = True
+
+            success = await self.switch_model(target_model_key)
+            if success:
+                self.logger.info(f"Proactive upgrade completed: {target_model_key}")
+            else:
+                self.logger.warning(f"Proactive upgrade failed: {target_model_key}")
+
+        except Exception as e:
+            self.logger.error(f"Error executing proactive upgrade: {e}")
+        finally:
+            self._upgrade_in_progress = False
+
    def shutdown(self) -> None:
        """Clean up resources and unload models."""
        try:
+            # Stop proactive scaling monitoring
+            if hasattr(self, "_proactive_scaler"):
+                self._proactive_scaler.stop_continuous_monitoring()
+
            if self.current_model_instance and self.current_model_key:
                self.lm_adapter.unload_model(self.current_model_key)
                self.current_model_key = None
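Finally, a minimal usage sketch of the enhanced status reporting. The ModelManager constructor arguments and the method name get_status() are assumptions, since only the returned dict appears in the hunk above; the "scaling" keys shown are the ones the selection logic reads.

# Hypothetical caller; constructor arguments and the status method name are
# assumptions, as only the returned dict is visible in this diff.
manager = ModelManager()
status = manager.get_status()

scaling = status.get("scaling", {})  # {} when the scaler was never initialized
if scaling.get("degradation_needed"):
    print("Scaler recommends moving to a smaller model")
elif scaling.get("upgrade_available"):
    print("Headroom available for a proactive upgrade")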