import torch import torch.nn as nn import torch.nn.functional as F from typing import Optional, Tuple, Dict, Any import math from .attention import SelfEvolvingAttention, MultiHeadAttention class PositionalEncoding(nn.Module): """Sinusoidal positional encoding with learnable scaling.""" def __init__(self, embed_dim: int, max_len: int = 5000, dropout: float = 0.1): super().__init__() self.dropout = nn.Dropout(dropout) self.scale = nn.Parameter(torch.ones(1)) pe = torch.zeros(max_len, embed_dim) position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) div_term = torch.exp(torch.arange(0, embed_dim, 2).float() * (-math.log(10000.0) / embed_dim)) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) self.register_buffer('pe', pe.unsqueeze(0)) def forward(self, x: torch.Tensor) -> torch.Tensor: seq_len = x.size(1) x = x + self.scale * self.pe[:, :seq_len] return self.dropout(x) class LayerNorm(nn.Module): """Layer normalization with learnable parameters and bias.""" def __init__(self, embed_dim: int, eps: float = 1e-5): super().__init__() self.eps = eps self.weight = nn.Parameter(torch.ones(embed_dim)) self.bias = nn.Parameter(torch.zeros(embed_dim)) def forward(self, x: torch.Tensor) -> torch.Tensor: mean = x.mean(dim=-1, keepdim=True) std = x.std(dim=-1, keepdim=True) return self.weight * (x - mean) / (std + self.eps) + self.bias class FeedForward(nn.Module): """Enhanced feedforward network with adaptive activation.""" def __init__( self, embed_dim: int, ff_dim: int, dropout: float = 0.1, activation: str = "gelu" ): super().__init__() self.embed_dim = embed_dim self.ff_dim = ff_dim # Standard feedforward layers self.linear1 = nn.Linear(embed_dim, ff_dim) self.linear2 = nn.Linear(ff_dim, embed_dim) self.dropout = nn.Dropout(dropout) # Adaptive activation - can learn to emphasize different patterns self.activation_gate = nn.Linear(embed_dim, ff_dim) # Choose activation function if activation == "gelu": self.activation = nn.GELU() elif activation == "relu": self.activation = nn.ReLU() elif activation == "swish": self.activation = nn.SiLU() else: self.activation = nn.GELU() def forward(self, x: torch.Tensor) -> torch.Tensor: # Standard feedforward path h = self.linear1(x) h = self.activation(h) # Adaptive gating based on input gate = torch.sigmoid(self.activation_gate(x)) h = h * gate h = self.dropout(h) return self.linear2(h) class LyraTransformerBlock(nn.Module): """ Transformer block with self-evolution capabilities. This block can adapt its behavior based on conversation context, emotional state, and past interaction success. """ def __init__( self, embed_dim: int, num_heads: int, ff_dim: int, dropout: float = 0.1, use_evolution: bool = True, layer_id: int = 0 ): super().__init__() self.embed_dim = embed_dim self.num_heads = num_heads self.layer_id = layer_id self.use_evolution = use_evolution # Attention mechanism if use_evolution: self.attention = SelfEvolvingAttention( embed_dim=embed_dim, num_heads=num_heads, dropout=dropout ) else: self.attention = MultiHeadAttention( embed_dim=embed_dim, num_heads=num_heads, dropout=dropout ) # Layer normalization self.norm1 = LayerNorm(embed_dim) self.norm2 = LayerNorm(embed_dim) # Feedforward network self.feedforward = FeedForward( embed_dim=embed_dim, ff_dim=ff_dim, dropout=dropout ) # Evolution-specific components if use_evolution: # Emotional influence on processing self.emotional_projection = nn.Linear(embed_dim, embed_dim // 4) self.emotional_gate = nn.Linear(embed_dim // 4, embed_dim) # Layer-specific adaptation parameters self.adaptation_strength = nn.Parameter(torch.ones(1) * 0.1) self.emotional_sensitivity = nn.Parameter(torch.ones(1) * 0.5) self.dropout = nn.Dropout(dropout) def forward( self, x: torch.Tensor, attn_mask: Optional[torch.Tensor] = None, key_padding_mask: Optional[torch.Tensor] = None, emotional_state: Optional[torch.Tensor] = None, evolve: bool = True ) -> Tuple[torch.Tensor, Dict[str, Any]]: """ Forward pass through transformer block. Args: x: Input tensor [batch, seq_len, embed_dim] attn_mask: Attention mask key_padding_mask: Key padding mask emotional_state: Current emotional state evolve: Whether to apply evolution this step Returns: output: Block output layer_info: Information about this layer's processing """ layer_info = {} # Store input for residual residual = x # Pre-normalization x_norm = self.norm1(x) # Self-attention if self.use_evolution and isinstance(self.attention, SelfEvolvingAttention): attn_out, attn_weights, evolution_info = self.attention( query=x_norm, key=x_norm, value=x_norm, attn_mask=attn_mask, key_padding_mask=key_padding_mask, emotional_state=emotional_state, evolve=evolve and self.training ) layer_info.update(evolution_info) else: attn_out, attn_weights = self.attention( query=x_norm, key=x_norm, value=x_norm, attn_mask=attn_mask, key_padding_mask=key_padding_mask ) # Apply emotional influence if available if self.use_evolution and emotional_state is not None: emotional_features = self.emotional_projection(emotional_state.mean(dim=1, keepdim=True)) emotional_gate_values = torch.sigmoid(self.emotional_gate(emotional_features)) # Apply emotional gating emotional_influence = self.emotional_sensitivity * emotional_gate_values attn_out = attn_out * (1 + emotional_influence) layer_info['emotional_influence'] = emotional_influence.mean().item() # First residual connection x = residual + self.dropout(attn_out) # Second sublayer: feedforward residual = x x_norm = self.norm2(x) ff_out = self.feedforward(x_norm) # Second residual connection x = residual + self.dropout(ff_out) # Store layer statistics layer_info.update({ 'layer_id': self.layer_id, 'attention_entropy': self._compute_attention_entropy(attn_weights), 'activation_magnitude': x.abs().mean().item(), 'gradient_norm': None # Will be filled during backward pass if needed }) return x, layer_info def _compute_attention_entropy(self, attn_weights: torch.Tensor) -> float: """Compute entropy of attention weights (measure of focus vs. distribution).""" # attn_weights: [batch, num_heads, seq_len, seq_len] with torch.no_grad(): # Average across batch and heads avg_attn = attn_weights.mean(dim=(0, 1)) # [seq_len, seq_len] # Compute row-wise entropy (how spread out each token's attention is) row_entropy = -torch.sum(avg_attn * torch.log(avg_attn + 1e-8), dim=-1) return row_entropy.mean().item() def evolve_from_feedback(self, feedback_signal: float): """Update layer parameters based on conversation feedback.""" if not self.use_evolution: return with torch.no_grad(): # Update adaptation strength based on feedback if feedback_signal > 0.7: # Good feedback self.adaptation_strength.data *= 1.01 self.emotional_sensitivity.data *= 0.99 # Less emotional when doing well elif feedback_signal < 0.3: # Poor feedback self.adaptation_strength.data *= 0.99 self.emotional_sensitivity.data *= 1.01 # More emotional when struggling # Clamp parameters self.adaptation_strength.data = torch.clamp(self.adaptation_strength.data, 0.01, 0.5) self.emotional_sensitivity.data = torch.clamp(self.emotional_sensitivity.data, 0.1, 2.0) # Evolve attention patterns if using evolving attention if isinstance(self.attention, SelfEvolvingAttention): self.attention.evolve_attention_patterns(feedback_signal) class LyraTransformer(nn.Module): """ Complete transformer model with self-evolution capabilities. This is the core of Lyra's language understanding and generation, with the ability to adapt and evolve based on interactions. """ def __init__( self, vocab_size: int, embed_dim: int = 768, num_layers: int = 12, num_heads: int = 12, ff_dim: int = 3072, max_len: int = 2048, dropout: float = 0.1, use_evolution: bool = True ): super().__init__() self.vocab_size = vocab_size self.embed_dim = embed_dim self.num_layers = num_layers self.use_evolution = use_evolution # Embedding layers self.token_embedding = nn.Embedding(vocab_size, embed_dim) self.positional_encoding = PositionalEncoding(embed_dim, max_len, dropout) # Transformer blocks self.layers = nn.ModuleList([ LyraTransformerBlock( embed_dim=embed_dim, num_heads=num_heads, ff_dim=ff_dim, dropout=dropout, use_evolution=use_evolution, layer_id=i ) for i in range(num_layers) ]) # Output layers self.final_norm = LayerNorm(embed_dim) self.output_projection = nn.Linear(embed_dim, vocab_size) # Evolution tracking self.generation_count = 0 self.last_feedback = 0.5 self._init_parameters() def _init_parameters(self): """Initialize parameters with appropriate scaling.""" # Initialize embeddings nn.init.normal_(self.token_embedding.weight, mean=0, std=0.02) # Initialize output projection nn.init.normal_(self.output_projection.weight, mean=0, std=0.02) if self.output_projection.bias is not None: nn.init.zeros_(self.output_projection.bias) def forward( self, input_ids: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, emotional_state: Optional[torch.Tensor] = None, evolve: bool = True ) -> Tuple[torch.Tensor, Dict[str, Any]]: """ Forward pass through the transformer. Args: input_ids: Token IDs [batch, seq_len] attention_mask: Attention mask emotional_state: Current emotional state evolve: Whether to apply evolution Returns: logits: Output logits [batch, seq_len, vocab_size] model_info: Information about the forward pass """ batch_size, seq_len = input_ids.shape device = input_ids.device # Create attention mask if not provided if attention_mask is None: attention_mask = torch.ones(batch_size, seq_len, device=device) # Convert attention mask to the format expected by attention layers # 1 = attend, 0 = don't attend extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2) extended_attention_mask = extended_attention_mask.expand( batch_size, 1, seq_len, seq_len ) # Key padding mask (True = padding, False = real tokens) key_padding_mask = (attention_mask == 0) # Embeddings x = self.token_embedding(input_ids) x = self.positional_encoding(x) # Track layer information model_info = { 'layer_info': [], 'total_parameters': sum(p.numel() for p in self.parameters()), 'evolution_active': evolve and self.use_evolution } # Pass through transformer layers for layer in self.layers: x, layer_info = layer( x=x, attn_mask=extended_attention_mask, key_padding_mask=key_padding_mask, emotional_state=emotional_state, evolve=evolve ) model_info['layer_info'].append(layer_info) # Final normalization and projection x = self.final_norm(x) logits = self.output_projection(x) # Update generation count self.generation_count += 1 return logits, model_info def generate( self, input_ids: torch.Tensor, max_new_tokens: int = 50, temperature: float = 1.0, top_k: int = 50, top_p: float = 0.9, emotional_state: Optional[torch.Tensor] = None, evolve: bool = True ) -> Tuple[torch.Tensor, Dict[str, Any]]: """ Generate text autoregressively. Args: input_ids: Starting token IDs max_new_tokens: Maximum number of tokens to generate temperature: Sampling temperature top_k: Top-k sampling top_p: Top-p (nucleus) sampling emotional_state: Current emotional state evolve: Whether to apply evolution during generation Returns: generated_ids: Complete sequence including input generation_info: Information about generation process """ self.eval() device = input_ids.device batch_size, input_len = input_ids.shape generated_ids = input_ids.clone() generation_info = { 'tokens_generated': 0, 'average_confidence': 0.0, 'generation_steps': [] } with torch.no_grad(): for step in range(max_new_tokens): # Forward pass logits, model_info = self.forward( input_ids=generated_ids, emotional_state=emotional_state, evolve=evolve ) # Get next token logits next_token_logits = logits[:, -1, :] / temperature # Apply top-k filtering if top_k > 0: top_k_values, top_k_indices = torch.topk(next_token_logits, top_k) next_token_logits[next_token_logits < top_k_values[:, -1:]] = float('-inf') # Apply top-p filtering if top_p < 1.0: sorted_logits, sorted_indices = torch.sort(next_token_logits, descending=True) cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1) # Create mask for tokens to keep sorted_indices_to_remove = cumulative_probs > top_p sorted_indices_to_remove[:, 1:] = sorted_indices_to_remove[:, :-1].clone() sorted_indices_to_remove[:, 0] = 0 # Scatter back to original indices indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove) next_token_logits[indices_to_remove] = float('-inf') # Sample next token probs = F.softmax(next_token_logits, dim=-1) next_token = torch.multinomial(probs, num_samples=1) # Track confidence confidence = probs.max(dim=-1)[0].mean().item() generation_info['average_confidence'] += confidence # Append to sequence generated_ids = torch.cat([generated_ids, next_token], dim=1) # Store step info generation_info['generation_steps'].append({ 'step': step, 'token_id': next_token.item(), 'confidence': confidence, 'temperature': temperature }) generation_info['tokens_generated'] += 1 # Check for end of sequence (you might want to add EOS token logic here) # if next_token.item() == eos_token_id: # break # Calculate average confidence if generation_info['tokens_generated'] > 0: generation_info['average_confidence'] /= generation_info['tokens_generated'] return generated_ids, generation_info def evolve_from_conversation(self, feedback_signal: float): """Evolve the entire model based on conversation feedback.""" if not self.use_evolution: return self.last_feedback = feedback_signal # Evolve each layer for layer in self.layers: layer.evolve_from_feedback(feedback_signal) def get_model_stats(self) -> Dict[str, Any]: """Get statistics about the model's current state.""" stats = { 'generation_count': self.generation_count, 'last_feedback': self.last_feedback, 'model_parameters': sum(p.numel() for p in self.parameters()), 'trainable_parameters': sum(p.numel() for p in self.parameters() if p.requires_grad) } if self.use_evolution: # Get evolution-specific stats from each layer layer_stats = [] for i, layer in enumerate(self.layers): if hasattr(layer, 'adaptation_strength'): layer_stats.append({ 'layer_id': i, 'adaptation_strength': layer.adaptation_strength.item(), 'emotional_sensitivity': layer.emotional_sensitivity.item() }) stats['layer_evolution'] = layer_stats # Get attention diversity attention_diversity = [] for layer in self.layers: if isinstance(layer.attention, SelfEvolvingAttention): diversity = layer.attention.get_attention_diversity() attention_diversity.append(diversity) if attention_diversity: stats['attention_diversity'] = { 'mean': sum(attention_diversity) / len(attention_diversity), 'per_layer': attention_diversity } return stats