import torch
import torch.nn as nn
import torch.nn.functional as F


class MiniGPT(nn.Module):
    """Small decoder-style transformer for next-token prediction."""

    def __init__(self, vocab_size, embed_dim=128, n_heads=4, n_layers=2, max_len=128):
        super().__init__()
        self.token_embed = nn.Embedding(vocab_size, embed_dim)
        self.pos_embed = nn.Embedding(max_len, embed_dim)
        self.blocks = nn.ModuleList([
            nn.TransformerEncoderLayer(d_model=embed_dim, nhead=n_heads, batch_first=True)
            for _ in range(n_layers)
        ])
        self.ln_f = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, vocab_size)

    def forward(self, x):
        seq_len = x.size(1)
        pos = torch.arange(0, seq_len, device=x.device).unsqueeze(0)
        x = self.token_embed(x) + self.pos_embed(pos)
        # Causal mask so each position only attends to earlier positions,
        # as required for autoregressive training and generation.
        causal_mask = torch.triu(
            torch.ones(seq_len, seq_len, device=x.device, dtype=torch.bool), diagonal=1
        )
        for block in self.blocks:
            x = block(x, src_mask=causal_mask)
        x = self.ln_f(x)
        return self.head(x)


class RubyTrainer:
    """Owns the tokenizer, model, and optimizer; trains online and generates replies."""

    def __init__(self, tokenizer, embed_dim=128, n_heads=4, n_layers=2, max_len=128):
        self.tokenizer = tokenizer
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.embed_dim = embed_dim
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.max_len = max_len

        self.model = None
        self.optimizer = None
        self.criterion = torch.nn.CrossEntropyLoss()

        self.rebuild_model_if_needed()

    def rebuild_model_if_needed(self):
        # The embedding and output layers are sized to the vocabulary, so the
        # model is rebuilt whenever the tokenizer's vocab has grown.
        vocab_size = len(self.tokenizer.vocab)
        if self.model is None or self.model.token_embed.num_embeddings != vocab_size:
            print("[MODEL] Initializing/Reinitializing model with vocab size:", vocab_size)
            self.model = MiniGPT(
                vocab_size, self.embed_dim, self.n_heads, self.n_layers, self.max_len
            ).to(self.device)
            self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)

    def train_on_tokens_from_text(self, text: str):
        tokens = self.tokenizer.tokenize(text)
        if not tokens:
            return

        # Wrap with start/end markers.
        # NOTE: "<START>" and "<END>" are assumed names for the tokenizer's
        # special tokens; substitute whatever your tokenizer's vocab defines.
        tokens = [self.tokenizer.vocab["<START>"]] + tokens + [self.tokenizer.vocab["<END>"]]

        if len(tokens) < 2:
            print("[TRAIN] Skipped (not enough tokens)")
            return

        self.rebuild_model_if_needed()
        self.model.train()

        # Shift by one position: predict token t+1 from tokens 0..t.
        x = torch.tensor(tokens[:-1], dtype=torch.long, device=self.device).unsqueeze(0)
        y = torch.tensor(tokens[1:], dtype=torch.long, device=self.device).unsqueeze(0)

        out = self.model(x)
        loss = self.criterion(out.view(-1, out.size(-1)), y.view(-1))
        loss.backward()
        self.optimizer.step()
        self.optimizer.zero_grad()

        print(f"[TRAIN] Tokens: {tokens} | Loss: {loss.item():.4f}")

    def generate_reply(self, max_tokens=15, temperature=1.0, top_k=5):
        self.model.eval()
        # Start generation from the start token (same assumption as above).
        input_ids = torch.tensor(
            [[self.tokenizer.vocab["<START>"]]], dtype=torch.long, device=self.device
        )

        for _ in range(max_tokens):
            with torch.no_grad():
                out = self.model(input_ids)
                logits = out[:, -1, :] / temperature

                if top_k > 0:
                    # Sample only from the k most likely tokens.
                    top_k_logits, top_k_indices = torch.topk(logits, top_k)
                    probs = F.softmax(top_k_logits, dim=-1)
                    next_token = top_k_indices[0][torch.multinomial(probs, 1)]
                else:
                    probs = F.softmax(logits, dim=-1)
                    next_token = torch.multinomial(probs, 1)[0]

            # Reshape next_token to (1, 1) so it concatenates along the sequence dim.
            next_token = next_token.view(1, 1)
            input_ids = torch.cat([input_ids, next_token], dim=1)

            if next_token.item() == self.tokenizer.vocab["<END>"]:
                break

        token_ids = input_ids.squeeze(0).tolist()[1:]  # skip the start token
        return self.tokenizer.detokenize(token_ids)
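

# --- Usage sketch ------------------------------------------------------------
# A minimal example of driving RubyTrainer end to end, assuming a toy
# whitespace tokenizer. ToyTokenizer and its "<START>"/"<END>" entries are
# hypothetical; any tokenizer exposing a `vocab` dict plus `tokenize` and
# `detokenize` methods should work with the trainer above.
class ToyTokenizer:
    def __init__(self):
        self.vocab = {"<START>": 0, "<END>": 1}
        self.inverse = {0: "<START>", 1: "<END>"}

    def tokenize(self, text):
        ids = []
        for word in text.lower().split():
            if word not in self.vocab:
                idx = len(self.vocab)
                self.vocab[word] = idx
                self.inverse[idx] = word
            ids.append(self.vocab[word])
        return ids

    def detokenize(self, token_ids):
        # Drop the special tokens when turning ids back into text.
        return " ".join(self.inverse[i] for i in token_ids if i > 1)


if __name__ == "__main__":
    tokenizer = ToyTokenizer()
    trainer = RubyTrainer(tokenizer)

    # Online training: each call tokenizes the text, grows the vocab if needed,
    # rebuilds the model when the vocab changed, and takes one gradient step.
    for _ in range(50):
        trainer.train_on_tokens_from_text("hello ruby how are you today")

    print(trainer.generate_reply(max_tokens=10, temperature=0.8, top_k=5))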