# File-viewer header captured with the paste: Python, 126 lines, 3.8 KiB.
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
import random
|
|
from model.tokenizer import Tokenizer
|
|
from model.memory import save_dream
|
|
from model.train import train_on_message
|
|
from model.journal import record_to_journal
|
|
|
|
|
|
# Model size settings.
VOCAB_SIZE = 10_000  # Temporary cap, grows dynamically
# NOTE(review): EMBED_DIM is not referenced in the visible part of this file;
# TinyTransformer defaults to embed_dim=256 -- confirm which value is intended.
EMBED_DIM = 128

# Run on the GPU when CUDA is available, otherwise on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

tokenizer = Tokenizer()

# (score, sentence) tuples appended by daydream(), which keeps at most the
# ten most recent entries.
recent_dreams = list()
|
|
|
|
|
|
class MultiHeadSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention over the sequence axis.

    Args:
        embed_dim: Size of the input's last dimension; must be divisible by
            ``heads``.
        heads: Number of attention heads.

    ``forward`` takes a tensor of shape ``(B, T, embed_dim)`` and returns a
    tensor of the same shape. No attention mask is applied, so every position
    attends to every position of the same sequence.
    """

    def __init__(self, embed_dim, heads):
        super().__init__()
        assert embed_dim % heads == 0, "embed_dim must be divisible by heads"
        self.heads = heads
        self.head_dim = embed_dim // heads
        # A plain float needs no device placement, unlike an unregistered tensor.
        self.scale = self.head_dim ** 0.5

        self.to_qkv = nn.Linear(embed_dim, embed_dim * 3)
        self.out = nn.Linear(embed_dim, embed_dim)

    def forward(self, x):
        """Apply self-attention to ``x`` of shape ``(B, T, C)``; returns ``(B, T, C)``."""
        B, T, C = x.shape
        qkv = self.to_qkv(x).view(B, T, self.heads, 3 * self.head_dim)
        # Put heads ahead of time -> (B, heads, T, 3 * head_dim). Without this
        # the matmuls below would mix heads within one token instead of
        # attending across sequence positions.
        qkv = qkv.transpose(1, 2)
        q, k, v = qkv.chunk(3, dim=-1)

        attn_scores = (q @ k.transpose(-2, -1)) / self.scale  # (B, heads, T, T)
        attn_weights = torch.softmax(attn_scores, dim=-1)

        out = attn_weights @ v  # (B, heads, T, head_dim)
        # Back to (B, T, heads, head_dim) so the heads of each token are
        # contiguous before being flattened into the channel dimension.
        out = out.transpose(1, 2).contiguous().view(B, T, C)
        return self.out(out)
|
|
|
|
|
|
class TransformerBlock(nn.Module):
    """One pre-norm transformer layer.

    Self-attention and a two-layer ReLU feed-forward network are each applied
    to a layer-normalized copy of the input and added back as a residual.

    Args:
        embed_dim: Channel size of the input and output.
        heads: Number of attention heads passed to the attention module.
    """

    def __init__(self, embed_dim, heads):
        super().__init__()
        hidden_dim = embed_dim * 4  # feed-forward expansion width

        # Sub-modules are created in the same order as before so parameter
        # registration and random initialization are unchanged.
        self.attn = MultiHeadSelfAttention(embed_dim, heads)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.ff = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim),
        )
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        attended = self.attn(self.norm1(x))
        after_attention = x + attended
        transformed = self.ff(self.norm2(after_attention))
        return after_attention + transformed
|
|
|
|
|
|
class TinyTransformer(nn.Module):
    """Small transformer that maps token ids to per-position vocabulary logits.

    Args:
        vocab_size: Number of rows in the token embedding and of output logits.
        embed_dim: Channel size used throughout the network.
        depth: Number of stacked ``TransformerBlock`` layers.
        heads: Number of attention heads per block.
        max_len: Number of learned positional embeddings, i.e. the longest
            sequence ``forward`` accepts. Defaults to 128, the size that was
            previously hard-coded.
    """

    def __init__(self, vocab_size=VOCAB_SIZE, embed_dim=256, depth=4, heads=8, max_len=128):
        super().__init__()
        self.token_embed = nn.Embedding(vocab_size, embed_dim)
        # Learned positional table, sliced to the input length in forward().
        self.pos_embed = nn.Parameter(torch.randn(1, max_len, embed_dim))
        self.blocks = nn.Sequential(*[TransformerBlock(embed_dim, heads) for _ in range(depth)])
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, vocab_size)

    def forward(self, x):
        """Return logits of shape ``(B, T, vocab_size)`` for ids ``x`` of shape ``(B, T)``.

        Raises:
            ValueError: If ``T`` exceeds the number of positional embeddings.
        """
        _, seq_len = x.shape
        max_len = self.pos_embed.shape[1]
        if seq_len > max_len:
            # Without this check the addition below fails with an opaque
            # broadcasting error, because the slice silently stops at max_len.
            raise ValueError(
                f"sequence length {seq_len} exceeds the maximum supported length {max_len}"
            )

        tok = self.token_embed(x)
        pos = self.pos_embed[:, :seq_len, :]
        x = tok + pos
        x = self.blocks(x)
        x = self.norm(x)
        return self.head(x)
|
|
|
|
|
|
# Single shared model instance, built with TinyTransformer's default
# hyperparameters and moved to the selected device.
model = TinyTransformer()
model = model.to(DEVICE)

# Token-level classification loss and the optimizer over all model parameters.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-4)
|
|
|
|
|
|
def generate_response():
    """Return the detokenized greedy prediction for one random seed token.

    A single token id is drawn at random, passed through the model as a
    ``(1, 1)`` batch, and the arg-max id at that position is detokenized.

    Returns:
        Whatever ``tokenizer.detokenize`` returns for the one-element id list.
    """
    # The tokenizer can hand out more ids than the embedding table has rows
    # (VOCAB_SIZE is a fixed cap), so never draw a seed the model cannot embed.
    # Assumes tokenizer.next_id is the count of ids assigned so far -- TODO confirm.
    id_limit = min(tokenizer.next_id, model.token_embed.num_embeddings)
    seed = torch.tensor([[random.randint(0, id_limit - 1)]], device=DEVICE)

    # Inference only: skip building an autograd graph.
    with torch.no_grad():
        output = model(seed)

    # Flatten so the result is always a list, even for a single prediction.
    pred = torch.argmax(output, dim=-1).reshape(-1).tolist()
    return tokenizer.detokenize(pred)
|
|
|
|
|
|
def score_sentence(sentence: str) -> float:
    """Score a sentence by word diversity, weighted by its capped length.

    Sentences with fewer than four whitespace-separated words score ``0.0``.
    Otherwise the score is ``unique_words / (word_count + 1)`` multiplied by
    the word count capped at 20.
    """
    tokens = sentence.strip().split()
    count = len(tokens)

    # Too short to be worth keeping.
    if count < 4:
        return 0.0

    unique_ratio = len(set(tokens)) / (count + 1)
    capped_length = min(count, 20)
    return unique_ratio * capped_length
|
|
|
|
|
|
def daydream():
    """Sample a short token sequence from the model and keep it if it scores well.

    Starting from one random seed token, twelve tokens are sampled one at a
    time from the model's softmax distribution at the last position. The
    detokenized result is scored with ``score_sentence``; if the score exceeds
    0.45 the sentence is saved, journaled, trained on, and appended to
    ``recent_dreams`` (which is trimmed to its ten newest entries).

    Side effects: puts ``model`` in eval mode and does not restore the
    previous mode.
    """
    model.eval()

    # The tokenizer can hand out more ids than the embedding table has rows
    # (VOCAB_SIZE is a fixed cap), so never draw a seed the model cannot embed.
    # Assumes tokenizer.next_id is the count of ids assigned so far -- TODO confirm.
    id_limit = min(tokenizer.next_id, model.token_embed.num_embeddings)
    seed = torch.tensor([[random.randint(0, id_limit - 1)]], device=DEVICE)
    dream = []

    # Sampling needs no gradients; without no_grad every step would extend an
    # autograd graph that is never used.
    with torch.no_grad():
        for _ in range(12):
            logits = model(seed)[:, -1, :]  # logits at the last position
            probs = F.softmax(logits, dim=-1)
            token = torch.multinomial(probs, num_samples=1)
            dream.append(token.item())
            # Feed the sampled token back in as context for the next step.
            seed = torch.cat([seed, token], dim=1)

    sentence = tokenizer.detokenize(dream)
    score = score_sentence(sentence)

    # NOTE(review): score_sentence returns at least 0.8 for any sentence of
    # 4-20 words, so this threshold mostly filters by length -- confirm intent.
    # NOTE(review): indentation was reconstructed from a flattened paste; this
    # assumes all bookkeeping below belongs to the accepted-dream branch.
    if score > 0.45:
        save_dream(sentence, score)
        record_to_journal(sentence)
        # Deliberately outside no_grad: training needs gradients.
        train_on_message(sentence)
        recent_dreams.append((score, sentence))
        if len(recent_dreams) > 10:
            recent_dreams.pop(0)
|