# Ruby/brain/brainmap.py
import json
import os
import re

from utils.unicleaner import clean_unicode

BRAINMAP_PATH = "memory/brainmap.json"  # actual connection data
BRAINMAP_CACHE_PATH = "memory/brainmap_cache.json"  # for dashboard rendering only

brainmap = {}
MAX_CONNECTIONS = 50  # Max neighbors to keep per word
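
# The brainmap is a weighted adjacency map: each word maps to a dict of
# co-occurring neighbor words and their co-occurrence counts, capped at
# MAX_CONNECTIONS entries per word. Illustrative shape (hypothetical data):
#   {"quick": {"brown": 3, "fox": 2}, "brown": {"quick": 3}}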


def is_valid_brainword(word: str) -> bool:
    """Return True if a token is clean enough to track in the brainmap."""
    word = clean_unicode(word.strip())
    if len(word) < 3:
        return False
    if re.fullmatch(r"\d+", word):  # pure numbers
        return False
    if re.fullmatch(r"(i|ii|iii|iv|v|vi|vii|viii|ix|x|xi|xii|xiii|xiv|xv)", word.lower()):  # Roman numerals i-xv
        return False
    if not word.isascii():
        return False
    if re.search(r"[^a-zA-Z0-9\-]", word):  # block odd characters except dash
        return False
    return True
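
# Illustrative checks (not part of the original module), assuming clean_unicode
# leaves plain ASCII untouched:
#   is_valid_brainword("hello")      -> True
#   is_valid_brainword("self-aware") -> True   (dash is allowed)
#   is_valid_brainword("42")         -> False  (pure number)
#   is_valid_brainword("vii")        -> False  (Roman numeral)
#   is_valid_brainword("it's")       -> False  (apostrophe is blocked)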


def load_brainmap():
    """Load the saved brainmap from disk into the module-level dict."""
    global brainmap
    if os.path.exists(BRAINMAP_PATH):
        with open(BRAINMAP_PATH, "r", encoding="utf-8") as f:
            brainmap = json.load(f)


def save_brainmap():
    """Write the current brainmap to disk as pretty-printed JSON."""
    with open(BRAINMAP_PATH, "w", encoding="utf-8") as f:
        json.dump(brainmap, f, indent=2)


def add_to_brainmap(words):
    """Add words to the brainmap, linking each word to its nearby neighbors."""
    if isinstance(words, str):
        words = words.split()
    # Store the same normalized form that the validator checks
    cleaned_words = [clean_unicode(w.strip()).lower() for w in words if is_valid_brainword(w)]

    updated = False
    for i, word in enumerate(cleaned_words):
        if word not in brainmap:
            brainmap[word] = {}
            updated = True

        # Context window: up to two words on either side
        neighbors = cleaned_words[max(0, i - 2):i] + cleaned_words[i + 1:i + 3]
        for neighbor in neighbors:
            if neighbor == word:  # cleaned_words is already validated
                continue
            previous_count = brainmap[word].get(neighbor, 0)
            brainmap[word][neighbor] = previous_count + 1
            if previous_count == 0:
                updated = True

        # Keep only the strongest connections
        if len(brainmap[word]) > MAX_CONNECTIONS:
            brainmap[word] = dict(
                sorted(brainmap[word].items(), key=lambda x: x[1], reverse=True)[:MAX_CONNECTIONS]
            )

    if updated:
        save_brainmap()
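
# Worked example (hypothetical input): add_to_brainmap("the quick brown fox")
# keeps ["the", "quick", "brown", "fox"]; for "brown" (index 2) the window is
# cleaned_words[0:2] + cleaned_words[3:5] = ["the", "quick", "fox"], so the
# edges brown->the, brown->quick, and brown->fox each gain +1.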


def prune_brainmap(min_neighbors=2, min_strength=2):
    """
    Remove weakly connected or isolated words from the brainmap.

    Args:
        min_neighbors (int): Minimum neighbors required to keep a word.
        min_strength (int): Minimum strength (connection count) for neighbors.
    """
    global brainmap
    to_delete = []

    for word, neighbors in brainmap.items():
        # Drop weak neighbor links first
        weak_neighbors = [n for n, count in neighbors.items() if count < min_strength]
        for n in weak_neighbors:
            del neighbors[n]
        # Mark the word for removal if too few neighbors remain
        if len(neighbors) < min_neighbors:
            to_delete.append(word)

    for word in to_delete:
        del brainmap[word]

    save_brainmap()
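
# Illustrative effect with the defaults: a word whose only edges are
# single-count links, e.g. {"foo": 1}, first loses those edges (1 < 2) and is
# then deleted because fewer than 2 neighbors remain.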


def get_brainmap():
    return brainmap


def refresh_brainmap_cache(min_weight=2, min_neighbors=2):
    """Rebuild the dashboard cache as a {nodes, links} graph for rendering."""
    try:
        with open(BRAINMAP_PATH, "r", encoding="utf-8") as f:
            raw_data = json.load(f)
    except Exception as e:
        print(f"[Brainmap] Error reading brainmap.json: {e}")
        return

    nodes = {}
    links = []

    for source, targets in raw_data.items():
        if not isinstance(targets, dict):
            continue
        strong_links = [(t, w) for t, w in targets.items() if w >= min_weight]
        if len(strong_links) < min_neighbors:
            continue  # skip weakly connected nodes
        nodes[source] = True
        for target, weight in strong_links:  # already filtered by min_weight
            nodes[target] = True
            links.append({"source": source, "target": target, "value": weight})

    # Assign each node one of 10 color groups; note that Python's hash() is
    # salted per process, so groups are stable within a run but not across runs
    clustered_nodes = [{"id": n, "group": hash(n) % 10} for n in nodes]

    with open(BRAINMAP_CACHE_PATH, "w", encoding="utf-8") as f:
        json.dump({"nodes": clustered_nodes, "links": links}, f, ensure_ascii=False, indent=2)
    # print(f"[Brainmap] Cache updated: {len(clustered_nodes)} nodes, {len(links)} links")
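
# Minimal end-to-end sketch (assumes the memory/ directory exists; the input
# sentence and parameters are hypothetical):
if __name__ == "__main__":
    load_brainmap()
    add_to_brainmap("ruby learns new words from every conversation")
    prune_brainmap(min_neighbors=1, min_strength=1)
    refresh_brainmap_cache(min_weight=1, min_neighbors=1)
    print(f"[Brainmap] {len(get_brainmap())} words tracked")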