diff --git a/src/models/model_manager.py b/src/models/model_manager.py
index 2329b7f..f981a87 100644
--- a/src/models/model_manager.py
+++ b/src/models/model_manager.py
@@ -10,6 +10,8 @@ from pathlib import Path
 from .lmstudio_adapter import LMStudioAdapter
 from .resource_monitor import ResourceMonitor
 from .context_manager import ContextManager
+from ..resource.scaling import ProactiveScaler, ScalingDecision
+from ..resource.tiers import HardwareTierDetector
 
 
 class ModelManager:
@@ -39,6 +41,26 @@ class ModelManager:
         self.lm_adapter = LMStudioAdapter()
         self.resource_monitor = ResourceMonitor()
         self.context_manager = ContextManager()
+        self.tier_detector = HardwareTierDetector()
+
+        # Initialize proactive scaler
+        self._proactive_scaler = ProactiveScaler(
+            resource_monitor=self.resource_monitor,
+            tier_detector=self.tier_detector,
+            upgrade_threshold=0.8,
+            downgrade_threshold=0.9,
+            stabilization_minutes=5,
+            monitoring_interval=2.0,
+            trend_window_minutes=10,
+        )
+
+        # Set callback for scaling decisions
+        self._proactive_scaler.set_scaling_callback(
+            self._handle_proactive_scaling_decision
+        )
+
+        # Start continuous monitoring
+        self._proactive_scaler.start_continuous_monitoring()
 
         # Current model state
         self.current_model_key: Optional[str] = None
@@ -141,8 +163,17 @@ class ModelManager:
             Selected model key or None if no suitable model found
         """
         try:
-            # Get current resources
+            # Get current resources and scaling recommendations
            resources = self.resource_monitor.get_current_resources()
+            scaling_status = self._proactive_scaler.get_scaling_status()
+
+            # Apply proactive scaling recommendations
+            if scaling_status.get("degradation_needed", False):
+                # Prefer smaller models if degradation is needed
+                self.logger.debug("Degradation needed, prioritizing smaller models")
+            elif scaling_status.get("upgrade_available", False):
+                # Consider larger models if upgrade is available
+                self.logger.debug("Upgrade available, considering larger models")
 
             # Filter models that can fit current resources
             suitable_models = []
@@ -329,6 +360,31 @@ class ModelManager:
             Generated response text
         """
         try:
+            # Pre-flight resource check
+            can_proceed, reason = self._proactive_scaler.check_preflight_resources(
+                "model_inference"
+            )
+            if not can_proceed:
+                # Handle resource constraints gracefully
+                degradation_target = (
+                    self._proactive_scaler.initiate_graceful_degradation(
+                        f"Pre-flight check failed: {reason}", immediate=True
+                    )
+                )
+                if degradation_target:
+                    # Switch to smaller model for this response
+                    smaller_model_key = self._find_model_by_size(degradation_target)
+                    if (
+                        smaller_model_key
+                        and smaller_model_key != self.current_model_key
+                    ):
+                        await self.switch_model(smaller_model_key)
+                        self.logger.info(
+                            f"Switched to smaller model {smaller_model_key} due to resource constraints"
+                        )
+                else:
+                    return "I'm experiencing resource constraints and cannot generate a response right now."
+
             # Ensure we have a model loaded
             if not self.current_model_instance:
                 await self._ensure_model_loaded(conversation_context)
@@ -368,6 +424,13 @@ class ModelManager:
                     conversation_id, MessageRole.ASSISTANT, response
                 )
 
+            # Update performance metrics for proactive scaling
+            self._proactive_scaler.update_performance_metrics(
+                operation_type="model_inference",
+                duration_ms=response_time_ms,
+                success=True,
+            )
+
             # Check if we should consider switching (slow response or struggling)
             if await self._should_consider_switching(response_time_ms, response):
                 await self._proactive_model_switch(conversation_context)
@@ -375,8 +438,16 @@ class ModelManager:
             return response
 
         except Exception as e:
+            response_time_ms = (time.time() - start_time) * 1000
             self.logger.warning(f"Model generation failed: {e}")
 
+            # Update performance metrics for failure
+            self._proactive_scaler.update_performance_metrics(
+                operation_type="model_inference",
+                duration_ms=response_time_ms,
+                success=False,
+            )
+
             # Try switching to a different model
             if await self._handle_model_failure(conversation_context):
                 # Retry with new model
@@ -402,6 +473,9 @@ class ModelManager:
             "resources": self.resource_monitor.get_current_resources(),
             "available_models": len(self.available_models),
             "recent_failures": dict(self._failure_count),
+            "scaling": self._proactive_scaler.get_scaling_status()
+            if hasattr(self, "_proactive_scaler")
+            else {},
         }
 
         if (
@@ -462,8 +536,17 @@
     ) -> None:
         """Ensure we have a model loaded, selecting one if needed."""
         if not self.current_model_instance:
+            # Get scaling recommendations for initial load
+            scaling_status = self._proactive_scaler.get_scaling_status()
+
+            # Select best model considering scaling constraints
             best_model = self.select_best_model(conversation_context)
             if best_model:
+                # Set current model size in proactive scaler
+                model_config = self.model_configurations.get(best_model, {})
+                model_size = model_config.get("category", "unknown")
+                self._proactive_scaler._current_model_size = model_size
+
                 await self.switch_model(best_model)
 
     async def _should_consider_switching(
@@ -592,9 +675,110 @@ class ModelManager:
 
         return "\n".join(formatted_parts)
 
+    def _handle_proactive_scaling_decision(self, scaling_event) -> None:
+        """Handle proactive scaling decision from ProactiveScaler.
+
+        Args:
+            scaling_event: ScalingEvent from ProactiveScaler
+        """
+        try:
+            if scaling_event.decision == ScalingDecision.UPGRADE:
+                # Proactive upgrade to larger model
+                target_model_key = self._find_model_by_size(
+                    scaling_event.new_model_size
+                )
+                if target_model_key and target_model_key != self.current_model_key:
+                    self.logger.info(
+                        f"Executing proactive upgrade to {target_model_key}"
+                    )
+                    # Schedule upgrade for next response (not immediate)
+                    asyncio.create_task(
+                        self._execute_proactive_upgrade(target_model_key)
+                    )
+
+            elif scaling_event.decision == ScalingDecision.DOWNGRADE:
+                # Immediate degradation to smaller model
+                target_model_key = self._find_model_by_size(
+                    scaling_event.new_model_size
+                )
+                if target_model_key:
+                    self.logger.warning(
+                        f"Executing degradation to {target_model_key}: {scaling_event.reason}"
+                    )
+                    # Switch immediately for degradation
+                    asyncio.create_task(self.switch_model(target_model_key))
+
+        except Exception as e:
+            self.logger.error(f"Error handling scaling decision: {e}")
+
+    def _find_model_by_size(self, target_size: str) -> Optional[str]:
+        """Find model key by size category.
+
+        Args:
+            target_size: Target model size ("small", "medium", "large")
+
+        Returns:
+            Model key or None if not found
+        """
+        try:
+            # First, try to match by category in configurations
+            for model_key, config in self.model_configurations.items():
+                if config.get("category") == target_size:
+                    # Check if model is available
+                    for available_model in self.available_models:
+                        if available_model["key"] == model_key and available_model.get(
+                            "available", False
+                        ):
+                            return model_key
+
+            # If no exact match, use preferred models from tier detector
+            current_tier = self.tier_detector.detect_current_tier()
+            preferred_models = self.tier_detector.get_preferred_models(current_tier)
+
+            # Find model of target size in preferred list
+            for preferred_model in preferred_models:
+                if preferred_model in self.model_configurations:
+                    config = self.model_configurations[preferred_model]
+                    if config.get("category") == target_size:
+                        return preferred_model
+
+            return None
+
+        except Exception as e:
+            self.logger.error(f"Error finding model by size {target_size}: {e}")
+            return None
+
+    async def _execute_proactive_upgrade(self, target_model_key: str) -> None:
+        """Execute proactive model upgrade with proper timing.
+
+        Args:
+            target_model_key: Model to upgrade to
+        """
+        try:
+            # Only upgrade if not currently switching and enough time has passed
+            if hasattr(self, "_upgrade_in_progress") and self._upgrade_in_progress:
+                return
+
+            self._upgrade_in_progress = True
+
+            success = await self.switch_model(target_model_key)
+            if success:
+                self.logger.info(f"Proactive upgrade completed: {target_model_key}")
+            else:
+                self.logger.warning(f"Proactive upgrade failed: {target_model_key}")
+
+        except Exception as e:
+            self.logger.error(f"Error executing proactive upgrade: {e}")
+        finally:
+            self._upgrade_in_progress = False
+
     def shutdown(self) -> None:
         """Clean up resources and unload models."""
         try:
+            # Stop proactive scaling monitoring
+            if hasattr(self, "_proactive_scaler"):
+                self._proactive_scaler.stop_continuous_monitoring()
+
             if self.current_model_instance and self.current_model_key:
                 self.lm_adapter.unload_model(self.current_model_key)
                 self.current_model_key = None
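
# --- Reviewer sketch (not part of the patch above) ---------------------------
# A minimal pytest-style check of the new scaling-callback path, included only
# for review context. Assumptions not confirmed by the diff: the package is
# importable as `src.models.model_manager` / `src.resource.scaling`; `asyncio`
# is already imported at module level in model_manager.py (the hunk calls
# asyncio.create_task but does not show the import); and ScalingEvent exposes
# the `decision`, `new_model_size`, and `reason` fields used above, which are
# mimicked here with SimpleNamespace. Model keys are hypothetical.
from types import SimpleNamespace
from unittest.mock import MagicMock, patch

from src.models.model_manager import ModelManager
from src.resource.scaling import ScalingDecision


def test_downgrade_event_schedules_switch_to_smaller_model():
    # Bypass __init__ so no real ProactiveScaler monitoring is started.
    manager = ModelManager.__new__(ModelManager)
    manager.logger = MagicMock()
    manager.current_model_key = "large-model"  # hypothetical key
    manager.switch_model = MagicMock()  # never awaited; create_task is patched
    manager._find_model_by_size = MagicMock(return_value="small-model")

    event = SimpleNamespace(
        decision=ScalingDecision.DOWNGRADE,
        new_model_size="small",
        reason="memory pressure",
    )

    with patch("src.models.model_manager.asyncio.create_task") as create_task:
        manager._handle_proactive_scaling_decision(event)

    # The handler should resolve the target model by size and schedule the switch.
    manager._find_model_by_size.assert_called_once_with("small")
    create_task.assert_called_once()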