diff --git a/.gitignore b/.gitignore
index 0dbf2f2..b95683b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -168,3 +168,4 @@ cython_debug/
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
+/tokenizer_vocab.txt
\ No newline at end of file
diff --git a/main.py b/main.py
index fdc86ad..062611b 100644
--- a/main.py
+++ b/main.py
@@ -2,6 +2,8 @@ import discord
 import os
 from dotenv import load_dotenv
 from datetime import datetime
+from tokenizer import Tokenizer
+from model import RubyTrainer
 
 # Load environment
 load_dotenv()
@@ -19,6 +21,8 @@ intents = intents
 class Ruby(discord.Client):
     def __init__(self):
         super().__init__(intents=intents)
+        self.tokenizer = Tokenizer()
+        self.trainer = RubyTrainer(self.tokenizer)
         self.log_path = os.path.join("logs", "messages.log")
         os.makedirs("logs", exist_ok=True)
 
@@ -27,10 +31,18 @@ class Ruby(discord.Client):
     async def on_message(self, message: discord.Message):
         if message.author.id == self.user.id:
-            return  # ignore self
+            return
 
         self.log_message(message)
-        self.train_on_message(message)
+        self.trainer.train_on_tokens_from_text(message.content.strip())
+
+        reply = self.trainer.generate_reply()
+        if reply.strip():
+            await message.channel.send(reply)
+        else:
+            print("[REPLY] Skipped (empty)")
+
+
     def log_message(self, message: discord.Message):
         timestamp = datetime.utcnow().isoformat()
@@ -42,7 +54,11 @@ class Ruby(discord.Client):
         print(f"[LOGGED] {log_entry.strip()}")
 
     def train_on_message(self, message: discord.Message):
-        print(f"[TRAIN] Simulating training on: \"{message.content.strip()}\"")
+        text = message.content.strip()
+        if not text:
+            return
+        # Delegate to the trainer, which tokenizes the text and runs one training step.
+        self.trainer.train_on_tokens_from_text(text)
 
 
 # Run Ruby
diff --git a/model.py b/model.py
new file mode 100644
index 0000000..4d1b4cf
--- /dev/null
+++ b/model.py
@@ -0,0 +1,106 @@
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+
+class MiniGPT(nn.Module):
+    def __init__(self, vocab_size, embed_dim=128, n_heads=4, n_layers=2, max_len=128):
+        super().__init__()
+        self.token_embed = nn.Embedding(vocab_size, embed_dim)
+        self.pos_embed = nn.Embedding(max_len, embed_dim)
+        self.blocks = nn.ModuleList([
+            nn.TransformerEncoderLayer(d_model=embed_dim, nhead=n_heads, batch_first=True)
+            for _ in range(n_layers)
+        ])
+        self.ln_f = nn.LayerNorm(embed_dim)
+        self.head = nn.Linear(embed_dim, vocab_size)
+
+    def forward(self, x):
+        seq_len = x.size(1)
+        pos = torch.arange(0, seq_len, device=x.device).unsqueeze(0)
+        x = self.token_embed(x) + self.pos_embed(pos)
+        for block in self.blocks:
+            x = block(x)
+        x = self.ln_f(x)
+        return self.head(x)
+
+class RubyTrainer:
+    def __init__(self, tokenizer, embed_dim=128, n_heads=4, n_layers=2, max_len=128):
+        self.tokenizer = tokenizer
+        self.device = "cuda" if torch.cuda.is_available() else "cpu"
+        self.embed_dim = embed_dim
+        self.n_heads = n_heads
+        self.n_layers = n_layers
+        self.max_len = max_len
+
+        self.model = None
+        self.optimizer = None
+        self.criterion = torch.nn.CrossEntropyLoss()
+
+        self.rebuild_model_if_needed()
+
+    def rebuild_model_if_needed(self):
+        vocab_size = len(self.tokenizer.vocab)
+        if self.model is None or self.model.token_embed.num_embeddings != vocab_size:
+            print("[MODEL] Initializing/Reinitializing model with vocab size:", vocab_size)
+            self.model = MiniGPT(
+                vocab_size,
+                self.embed_dim,
+                self.n_heads,
+                self.n_layers,
+                self.max_len
+            ).to(self.device)
+            self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)
+
+    def train_on_tokens_from_text(self, text: str):
+        tokens = self.tokenizer.tokenize(text)
+        if not tokens:
+            return
+
+        # Wrap with <START> and <END>
+        tokens = [self.tokenizer.vocab["<START>"]] + tokens + [self.tokenizer.vocab["<END>"]]
+
+        if len(tokens) < 2:
+            print("[TRAIN] Skipped (not enough tokens)")
+            return
+
+        self.rebuild_model_if_needed()
+
+        self.model.train()
+        x = torch.tensor(tokens[:-1], dtype=torch.long, device=self.device).unsqueeze(0)
+        y = torch.tensor(tokens[1:], dtype=torch.long, device=self.device).unsqueeze(0)
+
+        out = self.model(x)
+        loss = self.criterion(out.view(-1, out.size(-1)), y.view(-1))
+        loss.backward()
+        self.optimizer.step()
+        self.optimizer.zero_grad()
+
+        print(f"[TRAIN] Tokens: {tokens} | Loss: {loss.item():.4f}")
+
+    def generate_reply(self, max_tokens=15, temperature=1.0, top_k=5):
+        self.model.eval()
+
+        input_ids = torch.tensor([[self.tokenizer.vocab["<START>"]]], dtype=torch.long, device=self.device)
+
+        for _ in range(max_tokens):
+            with torch.no_grad():
+                out = self.model(input_ids)
+                logits = out[:, -1, :] / temperature
+
+                if top_k > 0:
+                    top_k_logits, top_k_indices = torch.topk(logits, min(top_k, logits.size(-1)))
+                    probs = F.softmax(top_k_logits, dim=-1)
+                    next_token = top_k_indices[0][torch.multinomial(probs, 1)]
+                else:
+                    probs = F.softmax(logits, dim=-1)
+                    next_token = torch.multinomial(probs, 1)[0]
+
+            # Reshape next_token to (1, 1) so it can be concatenated along the sequence dimension
+            next_token = next_token.view(1, 1)
+            input_ids = torch.cat([input_ids, next_token], dim=1)
+
+            if next_token.item() == self.tokenizer.vocab["<END>"]:
+                break
+
+        token_ids = input_ids.squeeze(0).tolist()[1:]  # skip <START>
+        return self.tokenizer.detokenize(token_ids)
diff --git a/tokenizer.py b/tokenizer.py
new file mode 100644
index 0000000..84f0ded
--- /dev/null
+++ b/tokenizer.py
@@ -0,0 +1,38 @@
+import os
+
+class Tokenizer:
+    def __init__(self, vocab_path="tokenizer_vocab.txt"):
+        self.vocab_path = vocab_path
+        self.vocab = {"<START>": 0, "<END>": 1}
+        self.inv_vocab = {0: "<START>", 1: "<END>"}
+        self.load_vocab()
+
+    def load_vocab(self):
+        if not os.path.exists(self.vocab_path):
+            return
+        with open(self.vocab_path, "r", encoding="utf-8") as f:
+            for line in f:
+                token, idx = line.strip().split("\t")
+                idx = int(idx)
+                if token not in self.vocab:
+                    self.vocab[token] = idx
+                    self.inv_vocab[idx] = token
+        self.inv_vocab = {v: k for k, v in self.vocab.items()}
+
+    def save_vocab(self):
+        with open(self.vocab_path, "w", encoding="utf-8") as f:
+            for token, idx in self.vocab.items():
+                f.write(f"{token}\t{idx}\n")
+
+    def tokenize(self, text):
+        tokens = []
+        for word in text.strip().split():
+            if word not in self.vocab:
+                self.vocab[word] = len(self.vocab)
+                self.inv_vocab[self.vocab[word]] = word
+            tokens.append(self.vocab[word])
+        self.save_vocab()
+        return tokens
+
+    def detokenize(self, tokens):
+        return " ".join(self.inv_vocab.get(t, "<UNK>") for t in tokens)
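As a rough local sanity check of the new Tokenizer/RubyTrainer wiring outside Discord, something along these lines should work; the sample sentences and sampling settings below are placeholders, not part of the diff:

    from tokenizer import Tokenizer
    from model import RubyTrainer

    tokenizer = Tokenizer()           # creates/loads tokenizer_vocab.txt in the working directory
    trainer = RubyTrainer(tokenizer)

    # Each call grows the vocab as needed, rebuilds the model if the vocab size changed,
    # and runs one training step on the sentence.
    for line in ["hello ruby", "how are you today", "ruby says hello"]:
        trainer.train_on_tokens_from_text(line)

    # Sample a short reply from the freshly trained model.
    print(trainer.generate_reply(max_tokens=10, temperature=0.8, top_k=5))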