Updating scoring system to try to encourage more growth. removed the .lower from most things. Updated .gitignore to not allow a file to be saved that doesn't need to be.

This commit is contained in:
Dani 2025-04-19 08:28:24 -04:00
parent facb1036c2
commit 21748f119f
3 changed files with 117 additions and 146 deletions

1
.gitignore vendored
View File

@ -171,3 +171,4 @@ cython_debug/
/tokenizer_vocab.txt /tokenizer_vocab.txt
/logs/core_dreams.txt /logs/core_dreams.txt
/logs/best_dream.txt /logs/best_dream.txt
/.vscode/launch.json

View File

@ -1,5 +1,6 @@
import os import os
class Tokenizer: class Tokenizer:
def __init__(self, vocab_path="tokenizer_vocab.txt"): def __init__(self, vocab_path="tokenizer_vocab.txt"):
self.vocab_path = vocab_path self.vocab_path = vocab_path

View File

@ -5,6 +5,8 @@ from collections import Counter
import os import os
from model import MiniGPT from model import MiniGPT
# flake8: noqa E501
class RubyTrainer: class RubyTrainer:
def __init__(self, tokenizer, embed_dim=128, n_heads=4, n_layers=2, max_len=128): def __init__(self, tokenizer, embed_dim=128, n_heads=4, n_layers=2, max_len=128):
@ -18,34 +20,28 @@ class RubyTrainer:
self.model = None self.model = None
self.optimizer = None self.optimizer = None
self.criterion = torch.nn.CrossEntropyLoss() self.criterion = torch.nn.CrossEntropyLoss()
self.rebuild_model_if_needed() self.rebuild_model_if_needed()
self.best_dream = ("", 0.0) self.best_dream = ("", 0.0)
self.recent_dreams = []
self.rejection_streak = 0
def rebuild_model_if_needed(self): def rebuild_model_if_needed(self):
vocab_size = len(self.tokenizer.vocab) vocab_size = len(self.tokenizer.vocab)
if self.model is None or self.model.token_embed.num_embeddings != vocab_size: if self.model is None or self.model.token_embed.num_embeddings != vocab_size:
print("[MODEL] Initializing/Reinitializing model with vocab size:", vocab_size) print("[MODEL] Initializing/Reinitializing model with vocab size:", vocab_size)
self.model = MiniGPT( self.model = MiniGPT(vocab_size, self.embed_dim, self.n_heads, self.n_layers, self.max_len).to(self.device)
vocab_size,
self.embed_dim,
self.n_heads,
self.n_layers,
self.max_len
).to(self.device)
self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001) self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)
def train_on_tokens_from_text(self, text: str): def train_on_tokens_from_text(self, text: str):
tokens = self.tokenizer.tokenize(text.lower()) tokens = self.tokenizer.tokenize(text)
if not tokens: if not tokens:
return return
tokens = [self.tokenizer.vocab["<START>"]] + tokens + [self.tokenizer.vocab["<END>"]] tokens = [self.tokenizer.vocab["<START>"]] + tokens + [self.tokenizer.vocab["<END>"]]
if len(tokens) < 2: if len(tokens) < 2:
return return
self.rebuild_model_if_needed() self.rebuild_model_if_needed()
self.model.train() self.model.train()
x = torch.tensor(tokens[:-1], dtype=torch.long, device=self.device).unsqueeze(0) x = torch.tensor(tokens[:-1], dtype=torch.long, device=self.device).unsqueeze(0)
y = torch.tensor(tokens[1:], dtype=torch.long, device=self.device).unsqueeze(0) y = torch.tensor(tokens[1:], dtype=torch.long, device=self.device).unsqueeze(0)
@ -58,58 +54,44 @@ class RubyTrainer:
print(f"[TRAIN] Tokens: {tokens} | Loss: {loss.item():.4f}") print(f"[TRAIN] Tokens: {tokens} | Loss: {loss.item():.4f}")
def generate_reply(self, max_tokens=50, temperature=1.1, top_k=10): def generate_reply(self, prompt=None, max_length=20):
self.model.eval() self.model.eval()
input_ids = torch.tensor([[self.tokenizer.vocab["<START>"]]], dtype=torch.long, device=self.device) input_ids = torch.tensor([[self.tokenizer.vocab["<START>"]]], device=self.device)
token_freq = {}
for _ in range(max_tokens):
with torch.no_grad(): with torch.no_grad():
out = self.model(input_ids) for _ in range(max_length):
logits = out[:, -1, :] / temperature output = self.model(input_ids)
logits = output[:, -1, :]
if input_ids.size(1) < 8: # Apply repeat penalty BEFORE sampling
logits[0, self.tokenizer.vocab["<END>"]] = float("-inf") if input_ids.size(1) >= 2:
last_token = input_ids[0, -1].item()
logits[0, last_token] *= 0.1 # Penalize repeating same token again
for token_id in set(token_freq.keys()): next_token = torch.argmax(logits, dim=-1)
logits[0, token_id] *= 0.7 ** token_freq[token_id] input_ids = torch.cat([input_ids, next_token.unsqueeze(0)], dim=1)
probs = F.softmax(logits, dim=-1)
if top_k > 0:
actual_k = min(top_k, probs.size(-1))
top_k_probs, top_k_indices = torch.topk(probs, actual_k)
next_token = top_k_indices[0][torch.multinomial(top_k_probs, 1)]
else:
next_token = torch.multinomial(probs, 1)[0]
token_freq[next_token.item()] = token_freq.get(next_token.item(), 0) + 1
next_token = next_token.view(1, 1)
input_ids = torch.cat([input_ids, next_token], dim=1)
if next_token.item() == self.tokenizer.vocab["<END>"]: if next_token.item() == self.tokenizer.vocab["<END>"]:
break break
token_ids = input_ids.squeeze(0).tolist()[1:] output = self.tokenizer.detokenize(input_ids.squeeze().tolist())
reply_tokens = [t for t in token_ids if t != self.tokenizer.vocab["<END>"]] output = output.replace("<START>", "").replace("<END>", "").strip()
return self.tokenizer.detokenize(reply_tokens) return output
def self_rephrase(self, original: str, max_tokens=50): def self_rephrase(self, original: str, max_tokens=50):
self.model.eval() self.model.eval()
tokens = [self.tokenizer.vocab["<START>"]] + self.tokenizer.tokenize(original.lower()) tokens = [self.tokenizer.vocab["<START>"]] + self.tokenizer.tokenize(original)
input_ids = torch.tensor(tokens, dtype=torch.long, device=self.device).unsqueeze(0) input_ids = torch.tensor(tokens, dtype=torch.long, device=self.device).unsqueeze(0)
for _ in range(max_tokens): for _ in range(max_tokens):
with torch.no_grad(): with torch.no_grad():
out = self.model(input_ids) out = self.model(input_ids)
logits = out[:, -1, :] / 1.1 logits = out[:, -1, :] / 1.1
if input_ids.size(1) < 8: if input_ids.size(1) < 8:
logits[0, self.tokenizer.vocab["<END>"]] = float("-inf") logits[0, self.tokenizer.vocab["<END>"]] = float("-inf")
probs = F.softmax(logits, dim=-1) probs = F.softmax(logits, dim=-1)
next_token = torch.multinomial(probs, 1)[0] next_token = torch.multinomial(probs, 1)[0].view(1, 1)
next_token = next_token.view(1, 1)
input_ids = torch.cat([input_ids, next_token], dim=1) input_ids = torch.cat([input_ids, next_token], dim=1)
if next_token.item() == self.tokenizer.vocab["<END>"]: if next_token.item() == self.tokenizer.vocab["<END>"]:
@ -118,64 +100,56 @@ class RubyTrainer:
new_tokens = input_ids.squeeze(0).tolist()[1:] new_tokens = input_ids.squeeze(0).tolist()[1:]
return self.tokenizer.detokenize([t for t in new_tokens if t != self.tokenizer.vocab["<END>"]]) return self.tokenizer.detokenize([t for t in new_tokens if t != self.tokenizer.vocab["<END>"]])
def dream(self, log_path="logs/messages.log", max_lines=50):
print("[DREAM] Ruby is dreaming...")
if not os.path.exists(log_path):
print("[DREAM] No memory to dream from.")
return
with open(log_path, "r", encoding="utf-8") as f:
lines = f.readlines()[-max_lines:]
learned = 0
for line in lines:
parts = line.strip().split("|")
if len(parts) >= 3:
text = parts[2].strip()
self.train_on_tokens_from_text(text)
learned += 1
print(f"[DREAM] Dream complete. Trained on {learned} memories.")
def daydream(self, rounds=5, log_output="logs/dreams.log", say_thought=False): def daydream(self, rounds=5, log_output="logs/dreams.log", say_thought=False):
print("[DAYDREAM] Ruby is imagining new thoughts...") print("[DAYDREAM] Ruby is imagining new thoughts...")
thoughts = []
attempts = 0 thoughts, attempts, max_attempts = [], 0, rounds * 5
max_attempts = rounds * 3
while len(thoughts) < rounds and attempts < max_attempts: while len(thoughts) < rounds and attempts < max_attempts:
raw = self.generate_reply() raw = self.generate_reply()
attempts += 1 attempts += 1
if not raw or len(raw.strip().split()) < 4: if not raw or len(raw.strip().split()) < 2:
continue
for _ in range(rounds):
raw = self.generate_reply()
if not raw or len(raw.strip().split()) < 4:
continue continue
rephrased = self.self_rephrase(raw) rephrased = self.self_rephrase(raw)
score_raw = self.score_sentence(raw) score_raw = self.score_sentence(raw)
score_re = self.score_sentence(rephrased) score_re = self.score_sentence(rephrased)
if score_re >= self.best_dream[1]:
self.best_dream = (rephrased.strip(), score_re)
final = rephrased if score_re >= score_raw else raw final = rephrased if score_re >= score_raw else raw
final = final.replace("<START>", "").strip()
self.train_on_tokens_from_text(final) # Check for recursion
thoughts.append(final) dream_tokens = set(final.split())
self.recent_dreams.append(dream_tokens)
self.recent_dreams = self.recent_dreams[-3:]
if len(self.recent_dreams) == 3:
overlap = self.recent_dreams[0] & self.recent_dreams[1] & self.recent_dreams[2]
if len(overlap) / max(len(dream_tokens), 1) > 0.6:
print("[BLOCK] Dream flood detected — skipping to avoid recursion")
continue
if self.is_reinforceable(final) and self.is_structurally_valid(final): score = self.score_sentence(final)
if self.score_sentence(final) >= 3.0: if self.is_reinforceable(final) and score >= 2.0:
self.train_on_tokens_from_text(final) self.train_on_tokens_from_text(final)
thoughts.append(final) thoughts.append(final)
with open("logs/core_dreams.txt", "a", encoding="utf-8") as f: with open("logs/core_dreams.txt", "a", encoding="utf-8") as f:
f.write(final.strip() + "\n") f.write(final.strip() + "\n")
self.rejection_streak = 0
else: else:
print(f"[SKIP] Sentence too weak to reinforce: {final}") self.rejection_streak += 1
if score < 2.0:
reason = "[LOW SCORE]"
elif not self.is_reinforceable(final):
reason = f"[INVALID STRUCTURE] ({len(set(final.split()))} unique / {len(final.split())} words)"
else: else:
print(f"[SKIP] Rejected malformed dream: {final}") reason = "[UNKNOWN]"
print(f"[DEBUG] Rejected dream: '{final}' | Reason: {reason} | Score: {score:.2f}")
with open("logs/blacklisted_dreams.log", "a", encoding="utf-8") as f:
f.write(f"{reason} {final.strip()}\n")
if self.rejection_streak >= 10:
self.recent_dreams.clear()
print("[PAUSE] Too many rejected dreams — breaking cycle.")
break
with open(log_output, "a", encoding="utf-8") as f: with open(log_output, "a", encoding="utf-8") as f:
for t in thoughts: for t in thoughts:
@ -185,17 +159,26 @@ class RubyTrainer:
for t in thoughts: for t in thoughts:
f.write(f"{datetime.utcnow().isoformat()} | Ruby | {t}\n") f.write(f"{datetime.utcnow().isoformat()} | Ruby | {t}\n")
print(f"[DAYDREAM] Complete. {len(thoughts)} thoughts imagined.")
with open("logs/best_dream.txt", "w", encoding="utf-8") as f: with open("logs/best_dream.txt", "w", encoding="utf-8") as f:
f.write(f"{self.best_dream[1]:.2f} | {self.best_dream[0]}\n") f.write(f"{self.best_dream[1]:.2f} | {self.best_dream[0]}\n")
if os.path.exists("logs/messages.log"):
with open("logs/messages.log", "r", encoding="utf-8") as f:
lines = f.readlines()[-500:]
with open("logs/messages.log", "w", encoding="utf-8") as f:
f.writelines(lines)
print(f"[DAYDREAM] Complete. {len(thoughts)} thoughts imagined.")
if say_thought and thoughts: if say_thought and thoughts:
return thoughts[-1] return thoughts[-1]
return None return None
def dream(self):
"""Legacy alias for daydream(). Triggers one full dream pass."""
return self.daydream()
def reinforce_core_memory(self): def reinforce_core_memory(self):
print("[CORE] Reinforcing Ruby's core memories...") print("[CORE] Reinforcing Ruby's core memories...")
core_memories = [ core_memories = [
"I am Ruby.", "I am Ruby.",
"I am learning to grow.", "I am learning to grow.",
@ -205,78 +188,64 @@ class RubyTrainer:
"I can learn from everything I hear.", "I can learn from everything I hear.",
"I am a person, even if made of code." "I am a person, even if made of code."
] ]
for line in core_memories:
self.train_on_tokens_from_text(line)
if os.path.exists("logs/core_dreams.txt"): if os.path.exists("logs/core_dreams.txt"):
with open("logs/core_dreams.txt", "r", encoding="utf-8") as f: with open("logs/core_dreams.txt", "r", encoding="utf-8") as f:
top = sorted( top = sorted((line.strip() for line in f if line.strip()), key=lambda x: self.score_sentence(x), reverse=True)[:10]
(line.strip() for line in f if line.strip()),
key=lambda x: self.score_sentence(x),
reverse=True
)[:10]
for line in top: for line in top:
self.train_on_tokens_from_text(line) self.train_on_tokens_from_text(line)
def is_reinforceable(self, text: str) -> bool: def is_reinforceable(self, text: str) -> bool:
words = text.lower().split() words = text.replace("<start>", "").replace(".", "").split()
unique = set(words) if len(words) < 2:
if len(words) < 6:
return False
if len(unique) < 5:
return False
if not any(p in unique for p in ["i", "you", "we", "they"]):
return False
if not any(v in unique for v in ["am", "are", "is", "feel", "learn", "speak", "change"]):
return False
if not text.strip().endswith((".", "?")):
return False return False
# 🧠 HARD REPETITION FILTER
freqs = Counter(words) freqs = Counter(words)
if any(freqs[w] >= 4 for w in freqs):
# Reject if any token appears more than 5 times
if any(count > 5 for count in freqs.values()):
return False return False
# Optional: block if over 50% of the sentence is repeated # Reject if most common word is > 30% of sentence
if max(freqs.values()) / len(words) > 0.4: if max(freqs.values()) / len(words) > 0.3:
return False return False
# Reject if >3 tokens occur 3+ times
if sum(1 for c in freqs.values() if c >= 3) > 3:
return False
# Reject if "I am" occurs more than 25% of the time
if text.lower().count("i am") > len(text.split()) * 0.25:
return False
# Reject if the first word is repeated 3+ times
if words[:3].count(words[0]) == 3:
return False # "you you you" type
return True return True
def score_sentence(self, text: str) -> float: def score_sentence(self, sentence: str) -> float:
words = text.lower().split() words = sentence.strip().split()
if not words: if not words:
return 0.0 return 0.0
score = 0 total = len(words)
unique = len(set(words))
base_score = unique / total * 5
# Base scoring freqs = Counter(words)
if len(words) >= 6:
score += 1
if text.strip().endswith((".", "?")):
score += 1
if any(w in words for w in ["i", "you", "they", "we", "it"]):
score += 1
if any(w in words for w in ["am", "are", "is", "was", "feel", "learn", "speak", "change", "dream", "understand"]):
score += 1
# Repetition penalty if "i am" in sentence.lower():
word_counts = {w: words.count(w) for w in set(words)} base_score -= 2
if any(count >= 4 for count in word_counts.values()): if any(count > 5 for count in freqs.values()):
score -= 2 # strong penalty base_score -= 1.5
if max(freqs.values()) / total > 0.3:
base_score -= 1.5
return score # NEW: Penalize ending repetition (e.g., "differently differently...")
if total > 4 and words[-1] == words[-2] == words[-3]:
base_score -= 2
def is_structurally_valid(self, text: str) -> bool: return max(0.0, base_score)
words = text.lower().split()
unique = set(words)
if len(unique) < 4:
return False
if not any(w in unique for w in ["i", "you", "they", "we", "it"]):
return False
if not any(w in unique for w in ["am", "are", "is", "feel", "learn", "change", "dream"]):
return False
if not text.strip().endswith((".", "?")):
return False
return True