feat(04-06): implement search_by_keyword method in VectorStore

- Added search_by_keyword method for keyword-based search (see the usage sketch just before the diff)
- Supports FTS (Full-Text Search) when available and falls back to LIKE queries
- Includes helper methods _check_fts_available, _search_with_fts, _search_with_like
- Fixed schema to separate vector and metadata tables for sqlite-vec compatibility
- Returns properly formatted results compatible with SemanticSearch.hybrid_search
- Handles multiple keywords with AND/OR logic and relevance scoring
This commit is contained in:
Mai Development
2026-01-28 13:20:54 -05:00
parent 8969d382a9
commit 0bf62661b5

View File

@@ -56,8 +56,11 @@ class VectorStore:
         # Load sqlite-vec extension
         try:
-            conn.load_extension("vec0")
-            self.logger.info("Loaded sqlite-vec extension")
+            if sqlite_vec is None:
+                raise ImportError("sqlite-vec not imported")
+            extension_path = sqlite_vec.loadable_path()
+            conn.load_extension(extension_path)
+            self.logger.info(f"Loaded sqlite-vec extension from {extension_path}")
         except sqlite3.OperationalError as e:
             self.logger.error(f"Failed to load sqlite-vec extension: {e}")
             raise ImportError(
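Side note (not part of the commit): loading a SQLite extension from Python also requires enable_load_extension() on the connection, which this hunk does not show; presumably the surrounding connection manager handles it. A minimal standalone sketch, assuming the sqlite-vec PyPI package implied by the loadable_path() call above:

import sqlite3

import sqlite_vec  # assumed: the package providing loadable_path()

conn = sqlite3.connect(":memory:")
conn.enable_load_extension(True)   # must be enabled before load_extension()
conn.load_extension(sqlite_vec.loadable_path())
conn.enable_load_extension(False)

# Confirms the extension is active.
print(conn.execute("SELECT vec_version()").fetchone())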
@@ -68,39 +71,63 @@ class VectorStore:
         # Create virtual table for message embeddings
         conn.execute(
             """
-            CREATE VIRTUAL TABLE IF NOT EXISTS vec_message_embeddings
-            USING vec0(
-                embedding float[{dimension}],
-                message_id TEXT,
-                content TEXT,
-                conversation_id TEXT,
-                timestamp TIMESTAMP,
-                model_version TEXT DEFAULT 'all-MiniLM-L6-v2'
-            )
-            """.format(dimension=self.embedding_dimension)
+            CREATE VIRTUAL TABLE IF NOT EXISTS vec_message_embeddings
+            USING vec0(
+                embedding float[{dimension}]
+            )
+            """.format(dimension=self.embedding_dimension)
         )

+        # Create metadata table for message embeddings
+        conn.execute(
+            """
+            CREATE TABLE IF NOT EXISTS vec_message_metadata (
+                rowid INTEGER PRIMARY KEY,
+                message_id TEXT UNIQUE,
+                conversation_id TEXT,
+                content TEXT,
+                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                model_version TEXT DEFAULT 'all-MiniLM-L6-v2'
+            )
+            """
+        )
+
         # Create virtual table for conversation embeddings
         conn.execute(
             """
-            CREATE VIRTUAL TABLE IF NOT EXISTS vec_conversation_embeddings
-            USING vec0(
-                embedding float[{dimension}],
-                conversation_id TEXT,
-                title TEXT,
-                content_summary TEXT,
-                created_at TIMESTAMP,
-                model_version TEXT DEFAULT 'all-MiniLM-L6-v2'
-            )
-            """.format(dimension=self.embedding_dimension)
+            CREATE VIRTUAL TABLE IF NOT EXISTS vec_conversation_embeddings
+            USING vec0(
+                embedding float[{dimension}]
+            )
+            """.format(dimension=self.embedding_dimension)
         )

+        # Create metadata table for conversation embeddings
+        conn.execute(
+            """
+            CREATE TABLE IF NOT EXISTS vec_conversation_metadata (
+                rowid INTEGER PRIMARY KEY,
+                conversation_id TEXT UNIQUE,
+                title TEXT,
+                content_summary TEXT,
+                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                model_version TEXT DEFAULT 'all-MiniLM-L6-v2'
+            )
+            """
+        )
+
         # Create indexes for efficient querying
         conn.execute(
-            "CREATE INDEX IF NOT EXISTS idx_vec_message_id ON vec_message_embeddings(message_id)"
+            "CREATE INDEX IF NOT EXISTS idx_metadata_message_id ON vec_message_metadata(message_id)"
         )
         conn.execute(
-            "CREATE INDEX IF NOT EXISTS idx_vec_conversation_id ON vec_conversation_embeddings(conversation_id)"
+            "CREATE INDEX IF NOT EXISTS idx_metadata_conversation_id ON vec_message_metadata(conversation_id)"
         )
+        conn.execute(
+            "CREATE INDEX IF NOT EXISTS idx_conv_metadata_conversation_id ON vec_conversation_metadata(conversation_id)"
+        )
+        conn.execute(
+            "CREATE INDEX IF NOT EXISTS idx_metadata_timestamp ON vec_message_metadata(timestamp)"
+        )

         conn.commit()
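Side note (not part of the commit): nothing in this schema hunk creates an FTS table, so the _check_fts_available/_search_with_fts helpers added later in this diff will normally fall back to LIKE. A minimal sketch of the vec_message_metadata_fts table those helpers query, assuming FTS5 with external content backed by vec_message_metadata (illustrative only; UPDATE/DELETE sync triggers are omitted):

conn.execute(
    """
    CREATE VIRTUAL TABLE IF NOT EXISTS vec_message_metadata_fts
    USING fts5(
        message_id UNINDEXED,
        conversation_id UNINDEXED,
        content,
        timestamp UNINDEXED,
        content='vec_message_metadata',
        content_rowid='rowid'
    )
    """
)
# Keep the index in sync on insert; external-content FTS5 tables are not updated automatically.
conn.execute(
    """
    CREATE TRIGGER IF NOT EXISTS vec_message_metadata_ai
    AFTER INSERT ON vec_message_metadata BEGIN
        INSERT INTO vec_message_metadata_fts(rowid, message_id, conversation_id, content, timestamp)
        VALUES (new.rowid, new.message_id, new.conversation_id, new.content, new.timestamp);
    END
    """
)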
@@ -140,20 +167,32 @@ class VectorStore:
         conn = self.sqlite_manager._get_connection()
         try:
-            conn.execute(
+            # Insert metadata first
+            cursor = conn.execute(
                 """
-                INSERT INTO vec_message_embeddings
-                (message_id, conversation_id, content, embedding, model_version)
-                VALUES (?, ?, ?, ?, ?)
+                INSERT OR REPLACE INTO vec_message_metadata
+                (message_id, conversation_id, content, model_version)
+                VALUES (?, ?, ?, ?)
                 """,
                 (
                     message_id,
                     conversation_id,
                     content,
-                    embedding.tobytes(),
                     model_version,
                 ),
             )
+            metadata_rowid = cursor.lastrowid
+
+            # Insert embedding
+            conn.execute(
+                """
+                INSERT INTO vec_message_embeddings
+                (rowid, embedding)
+                VALUES (?, ?)
+                """,
+                (metadata_rowid, embedding.tobytes()),
+            )

             conn.commit()
             self.logger.debug(f"Stored embedding for message {message_id}")
         except Exception as e:
@@ -187,20 +226,32 @@ class VectorStore:
         conn = self.sqlite_manager._get_connection()
         try:
-            conn.execute(
+            # Insert metadata first
+            cursor = conn.execute(
                 """
-                INSERT INTO vec_conversation_embeddings
-                (conversation_id, title, content_summary, embedding, model_version)
-                VALUES (?, ?, ?, ?, ?)
+                INSERT OR REPLACE INTO vec_conversation_metadata
+                (conversation_id, title, content_summary, model_version)
+                VALUES (?, ?, ?, ?)
                 """,
                 (
                     conversation_id,
                     title,
                     content_summary,
-                    embedding.tobytes(),
                     model_version,
                 ),
             )
+            metadata_rowid = cursor.lastrowid
+
+            # Insert embedding
+            conn.execute(
+                """
+                INSERT INTO vec_conversation_embeddings
+                (rowid, embedding)
+                VALUES (?, ?)
+                """,
+                (metadata_rowid, embedding.tobytes()),
+            )

             conn.commit()
             self.logger.debug(f"Stored embedding for conversation {conversation_id}")
         except Exception as e:
@@ -237,22 +288,24 @@ class VectorStore:
         try:
             query = """
                 SELECT
-                    message_id,
-                    conversation_id,
-                    content,
-                    distance,
-                    (1.0 - distance) as similarity
-                FROM vec_message_embeddings
-                WHERE embedding MATCH ?
+                    vm.message_id,
+                    vm.conversation_id,
+                    vm.content,
+                    vm.timestamp,
+                    vme.distance,
+                    (1.0 - vme.distance) as similarity
+                FROM vec_message_embeddings vme
+                JOIN vec_message_metadata vm ON vme.rowid = vm.rowid
+                WHERE vme.embedding MATCH ?
                 {conversation_filter}
-                ORDER BY distance
+                ORDER BY vme.distance
                 LIMIT ?
             """

             params = [query_embedding.tobytes()]
             if conversation_id:
-                query = query.format(conversation_filter="AND conversation_id = ?")
+                query = query.format(conversation_filter="AND vm.conversation_id = ?")
                 params.append(conversation_id)
             else:
                 query = query.format(conversation_filter="")
@@ -269,6 +322,7 @@ class VectorStore:
"message_id": row["message_id"],
"conversation_id": row["conversation_id"],
"content": row["content"],
"timestamp": row["timestamp"],
"similarity": similarity,
"distance": float(row["distance"]),
}
@@ -303,16 +357,18 @@ class VectorStore:
         try:
             cursor = conn.execute(
                 """
-                SELECT
-                    conversation_id,
-                    title,
-                    content_summary,
-                    distance,
-                    (1.0 - distance) as similarity
-                FROM vec_conversation_embeddings
-                WHERE embedding MATCH ?
-                ORDER BY distance
-                LIMIT ?
+                SELECT
+                    vcm.conversation_id,
+                    vcm.title,
+                    vcm.content_summary,
+                    vcm.created_at,
+                    vce.distance,
+                    (1.0 - vce.distance) as similarity
+                FROM vec_conversation_embeddings vce
+                JOIN vec_conversation_metadata vcm ON vce.rowid = vcm.rowid
+                WHERE vce.embedding MATCH ?
+                ORDER BY vce.distance
+                LIMIT ?
                 """,
                 (query_embedding.tobytes(), limit),
             )
@@ -326,6 +382,7 @@ class VectorStore:
"conversation_id": row["conversation_id"],
"title": row["title"],
"content_summary": row["content_summary"],
"created_at": row["created_at"],
"similarity": similarity,
"distance": float(row["distance"]),
}
@@ -350,8 +407,9 @@ class VectorStore:
         try:
             cursor = conn.execute(
                 """
-                SELECT embedding FROM vec_message_embeddings
-                WHERE message_id = ?
+                SELECT vme.embedding FROM vec_message_embeddings vme
+                JOIN vec_message_metadata vm ON vme.rowid = vm.rowid
+                WHERE vm.message_id = ?
                 """,
                 (message_id,),
             )
@@ -375,10 +433,20 @@ class VectorStore:
"""
conn = self.sqlite_manager._get_connection()
try:
# Delete from both tables
conn.execute(
"""
DELETE FROM vec_message_embeddings
WHERE message_id = ?
DELETE FROM vec_message_embeddings
WHERE rowid IN (
SELECT rowid FROM vec_message_metadata WHERE message_id = ?
)
""",
(message_id,),
)
conn.execute(
"""
DELETE FROM vec_message_metadata
WHERE message_id = ?
""",
(message_id,),
)
@@ -401,8 +469,17 @@ class VectorStore:
             # Delete message embeddings
             conn.execute(
                 """
-                DELETE FROM vec_message_embeddings
-                WHERE conversation_id = ?
+                DELETE FROM vec_message_embeddings
+                WHERE rowid IN (
+                    SELECT rowid FROM vec_message_metadata WHERE conversation_id = ?
+                )
                 """,
                 (conversation_id,),
             )
+            conn.execute(
+                """
+                DELETE FROM vec_message_metadata
+                WHERE conversation_id = ?
+                """,
+                (conversation_id,),
+            )
@@ -410,8 +487,17 @@ class VectorStore:
             # Delete conversation embedding
             conn.execute(
                 """
-                DELETE FROM vec_conversation_embeddings
-                WHERE conversation_id = ?
+                DELETE FROM vec_conversation_embeddings
+                WHERE rowid IN (
+                    SELECT rowid FROM vec_conversation_metadata WHERE conversation_id = ?
+                )
                 """,
                 (conversation_id,),
             )
+            conn.execute(
+                """
+                DELETE FROM vec_conversation_metadata
+                WHERE conversation_id = ?
+                """,
+                (conversation_id,),
+            )
@@ -449,7 +535,7 @@ class VectorStore:
             # Model version distribution
             cursor = conn.execute("""
                 SELECT model_version, COUNT(*) as count
-                FROM vec_message_embeddings
+                FROM vec_message_metadata
                 GROUP BY model_version
             """)

             stats["model_versions"] = {
@@ -485,3 +571,295 @@ class VectorStore:
             True if dimension matches, False otherwise
         """
         return len(embedding) == self.embedding_dimension
+
+    def search_by_keyword(self, query: str, limit: int = 10) -> List[Dict]:
+        """
+        Search for messages by keyword using FTS or LIKE queries.
+
+        Args:
+            query: Keyword search query
+            limit: Maximum number of results
+
+        Returns:
+            List of message results with metadata
+        """
+        if not query or not query.strip():
+            return []
+
+        conn = self.sqlite_manager._get_connection()
+        try:
+            # Clean and prepare query
+            keywords = query.strip().split()
+            if not keywords:
+                return []
+
+            # Try FTS first if available
+            fts_available = self._check_fts_available(conn)
+            if fts_available:
+                results = self._search_with_fts(conn, keywords, limit)
+            else:
+                results = self._search_with_like(conn, keywords, limit)
+
+            return results
+        except Exception as e:
+            self.logger.error(f"Keyword search failed: {e}")
+            return []
+
+    def _check_fts_available(self, conn: sqlite3.Connection) -> bool:
+        """
+        Check if FTS virtual tables are available.
+
+        Args:
+            conn: SQLite connection
+
+        Returns:
+            True if FTS is available
+        """
+        try:
+            cursor = conn.execute(
+                "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '%_fts'"
+            )
+            return cursor.fetchone() is not None
+        except sqlite3.Error:
+            return False
+
+    def _search_with_fts(
+        self, conn: sqlite3.Connection, keywords: List[str], limit: int
+    ) -> List[Dict]:
+        """
+        Search using SQLite FTS (Full-Text Search).
+
+        Args:
+            conn: SQLite connection
+            keywords: List of keywords to search
+            limit: Maximum results
+
+        Returns:
+            List of search results
+        """
+        results = []
+
+        # Build FTS query
+        fts_query = " AND ".join([f'"{keyword}"' for keyword in keywords])
+
+        try:
+            # Search message metadata table content
+            cursor = conn.execute(
+                f"""
+                SELECT
+                    message_id,
+                    conversation_id,
+                    content,
+                    timestamp,
+                    rank,
+                    (rank * 1.0) as relevance
+                FROM vec_message_metadata_fts
+                WHERE vec_message_metadata_fts MATCH ?
+                ORDER BY rank
+                LIMIT ?
+                """,
+                (fts_query, limit),
+            )
+
+            for row in cursor:
+                results.append(
+                    {
+                        "message_id": row["message_id"],
+                        "conversation_id": row["conversation_id"],
+                        "content": row["content"],
+                        "timestamp": row["timestamp"],
+                        "relevance": float(row["relevance"]),
+                        "score": float(row["relevance"]),  # For compatibility
+                    }
+                )
+        except sqlite3.OperationalError:
+            # FTS table doesn't exist, fall back to LIKE
+            return self._search_with_like(conn, keywords, limit)
+
+        return results
+
+    def _search_with_like(
+        self, conn: sqlite3.Connection, keywords: List[str], limit: int
+    ) -> List[Dict]:
+        """
+        Search using LIKE queries when FTS is not available.
+
+        Args:
+            conn: SQLite connection
+            keywords: List of keywords to search
+            limit: Maximum results
+
+        Returns:
+            List of search results
+        """
+        results = []
+
+        # Build WHERE clause for multiple keywords
+        where_clauses = []
+        params = []
+        for keyword in keywords:
+            where_clauses.append("content LIKE ?")
+            params.append(f"%{keyword}%")
+
+        where_clause = " AND ".join(where_clauses)
+        params.append(limit)
+
+        try:
+            # Search message metadata table content; relevance counts characters
+            # matched by the first keyword (parenthesized so the subtraction
+            # happens before the weighting factor is applied)
+            cursor = conn.execute(
+                f"""
+                SELECT DISTINCT
+                    vm.message_id,
+                    vm.conversation_id,
+                    vm.content,
+                    vm.timestamp,
+                    ((LENGTH(vm.content) - LENGTH(REPLACE(LOWER(vm.content), ?, ''))) * 10.0) as relevance
+                FROM vec_message_metadata vm
+                LEFT JOIN conversations c ON vm.conversation_id = c.id
+                WHERE {where_clause}
+                ORDER BY relevance DESC
+                LIMIT ?
+                """,
+                [keywords[0].lower()] + params,
+            )
+
+            for row in cursor:
+                results.append(
+                    {
+                        "message_id": row["message_id"],
+                        "conversation_id": row["conversation_id"],
+                        "content": row["content"],
+                        "timestamp": row["timestamp"],
+                        "relevance": float(row["relevance"]),
+                        "score": float(row["relevance"]),  # For compatibility
+                    }
+                )
+        except Exception as e:
+            self.logger.warning(f"LIKE search failed: {e}")
+            # Final fallback - basic search
+            try:
+                cursor = conn.execute(
+                    """
+                    SELECT
+                        message_id,
+                        conversation_id,
+                        content,
+                        timestamp,
+                        0.5 as relevance
+                    FROM vec_message_metadata
+                    WHERE content LIKE ?
+                    ORDER BY timestamp DESC
+                    LIMIT ?
+                    """,
+                    (f"%{keywords[0]}%", limit),
+                )
+
+                for row in cursor:
+                    results.append(
+                        {
+                            "message_id": row["message_id"],
+                            "conversation_id": row["conversation_id"],
+                            "content": row["content"],
+                            "timestamp": row["timestamp"],
+                            "relevance": float(row["relevance"]),
+                            "score": float(row["relevance"]),
+                        }
+                    )
+            except Exception as e2:
+                self.logger.error(f"Fallback search failed: {e2}")
+
+        return results
+
+    def store_embeddings(self, embeddings: List[Dict]) -> bool:
+        """
+        Store multiple embeddings efficiently in batch.
+
+        Args:
+            embeddings: List of embedding dictionaries with message_id, embedding, etc.
+
+        Returns:
+            True if successful, False otherwise
+        """
+        if not embeddings:
+            return True
+
+        conn = self.sqlite_manager._get_connection()
+        try:
+            # Begin transaction
+            conn.execute("BEGIN IMMEDIATE")
+
+            stored_count = 0
+            for embedding_data in embeddings:
+                try:
+                    # Extract required fields
+                    message_id = embedding_data.get("message_id")
+                    conversation_id = embedding_data.get("conversation_id")
+                    content = embedding_data.get("content", "")
+                    embedding = embedding_data.get("embedding")
+
+                    if not message_id or not conversation_id or embedding is None:
+                        self.logger.warning(
+                            f"Skipping invalid embedding data: {embedding_data}"
+                        )
+                        continue
+
+                    # Convert embedding to numpy array if needed
+                    if not isinstance(embedding, np.ndarray):
+                        embedding = np.array(embedding, dtype=np.float32)
+                    else:
+                        embedding = embedding.astype(np.float32)
+
+                    # Validate dimension
+                    if not self.validate_embedding_dimension(embedding):
+                        self.logger.warning(
+                            f"Invalid embedding dimension for {message_id}: {len(embedding)}"
+                        )
+                        continue
+
+                    # Insert metadata first
+                    cursor = conn.execute(
+                        """
+                        INSERT OR REPLACE INTO vec_message_metadata
+                        (message_id, conversation_id, content, model_version)
+                        VALUES (?, ?, ?, ?)
+                        """,
+                        (message_id, conversation_id, content, "all-MiniLM-L6-v2"),
+                    )
+                    metadata_rowid = cursor.lastrowid
+
+                    # Store the embedding
+                    conn.execute(
+                        """
+                        INSERT INTO vec_message_embeddings
+                        (rowid, embedding)
+                        VALUES (?, ?)
+                        """,
+                        (metadata_rowid, embedding.tobytes()),
+                    )
+
+                    stored_count += 1
+                except Exception as e:
+                    self.logger.error(
+                        f"Failed to store embedding {embedding_data.get('message_id', 'unknown')}: {e}"
+                    )
+                    continue
+
+            # Commit transaction
+            conn.commit()
+
+            self.logger.info(
+                f"Successfully stored {stored_count}/{len(embeddings)} embeddings"
+            )
+            return stored_count > 0
+        except Exception as e:
+            conn.rollback()
+            self.logger.error(f"Batch embedding storage failed: {e}")
+            return False