From 0bf62661b52e903f3672bea2e842475b7dbdc2c8 Mon Sep 17 00:00:00 2001
From: Mai Development
Date: Wed, 28 Jan 2026 13:20:54 -0500
Subject: [PATCH] feat(04-06): implement search_by_keyword method in VectorStore

- Added search_by_keyword method for keyword-based search functionality
- Supports FTS (Full-Text Search) when available, falls back to LIKE queries
- Includes helper methods _check_fts_available, _search_with_fts, _search_with_like
- Fixed schema to separate vector and metadata tables for sqlite-vec compatibility
- Returns properly formatted results compatible with SemanticSearch.hybrid_search
- Handles multiple keywords with AND logic and relevance scoring
---
 src/memory/storage/vector_store.py | 502 +++++++++++++++++++++++++----
 1 file changed, 440 insertions(+), 62 deletions(-)
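
Usage sketch, for reference only: it assumes an already-constructed and
populated VectorStore instance named `store` (the constructor and the
SQLiteManager wiring are outside this patch). The dict keys shown match what
search_by_keyword returns below; `score` mirrors `relevance` for
compatibility with SemanticSearch.hybrid_search.

    # Hypothetical caller; `store` is an already-initialized VectorStore.
    results = store.search_by_keyword("sqlite vector search", limit=5)
    for hit in results:
        # Whether FTS or the LIKE fallback ran is decided internally by
        # _check_fts_available; callers see the same result shape either way.
        print(hit["message_id"], hit["score"], hit["content"][:80])
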

diff --git a/src/memory/storage/vector_store.py b/src/memory/storage/vector_store.py
index 71208d2..e939339 100644
--- a/src/memory/storage/vector_store.py
+++ b/src/memory/storage/vector_store.py
@@ -56,8 +56,11 @@ class VectorStore:
 
         # Load sqlite-vec extension
         try:
-            conn.load_extension("vec0")
-            self.logger.info("Loaded sqlite-vec extension")
+            if sqlite_vec is None:
+                raise ImportError("sqlite-vec not imported")
+            extension_path = sqlite_vec.loadable_path()
+            conn.load_extension(extension_path)
+            self.logger.info(f"Loaded sqlite-vec extension from {extension_path}")
         except sqlite3.OperationalError as e:
             self.logger.error(f"Failed to load sqlite-vec extension: {e}")
             raise ImportError(
@@ -68,39 +71,63 @@ class VectorStore:
         # Create virtual table for message embeddings
         conn.execute(
             """
-            CREATE VIRTUAL TABLE IF NOT EXISTS vec_message_embeddings
-            USING vec0(
-                embedding float[{dimension}],
-                message_id TEXT,
-                content TEXT,
-                conversation_id TEXT,
-                timestamp TIMESTAMP,
-                model_version TEXT DEFAULT 'all-MiniLM-L6-v2'
-            )
-            """.format(dimension=self.embedding_dimension)
+            CREATE VIRTUAL TABLE IF NOT EXISTS vec_message_embeddings
+            USING vec0(
+                embedding float[{dimension}]
+            )
+            """.format(dimension=self.embedding_dimension)
+        )
+
+        # Create metadata table for message embeddings
+        conn.execute(
+            """
+            CREATE TABLE IF NOT EXISTS vec_message_metadata (
+                rowid INTEGER PRIMARY KEY,
+                message_id TEXT UNIQUE,
+                conversation_id TEXT,
+                content TEXT,
+                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                model_version TEXT DEFAULT 'all-MiniLM-L6-v2'
+            )
+            """
         )
 
         # Create virtual table for conversation embeddings
         conn.execute(
             """
-            CREATE VIRTUAL TABLE IF NOT EXISTS vec_conversation_embeddings
-            USING vec0(
-                embedding float[{dimension}],
-                conversation_id TEXT,
-                title TEXT,
-                content_summary TEXT,
-                created_at TIMESTAMP,
-                model_version TEXT DEFAULT 'all-MiniLM-L6-v2'
-            )
-            """.format(dimension=self.embedding_dimension)
+            CREATE VIRTUAL TABLE IF NOT EXISTS vec_conversation_embeddings
+            USING vec0(
+                embedding float[{dimension}]
+            )
+            """.format(dimension=self.embedding_dimension)
+        )
+
+        # Create metadata table for conversation embeddings
+        conn.execute(
+            """
+            CREATE TABLE IF NOT EXISTS vec_conversation_metadata (
+                rowid INTEGER PRIMARY KEY,
+                conversation_id TEXT UNIQUE,
+                title TEXT,
+                content_summary TEXT,
+                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                model_version TEXT DEFAULT 'all-MiniLM-L6-v2'
+            )
+            """
         )
 
         # Create indexes for efficient querying
         conn.execute(
-            "CREATE INDEX IF NOT EXISTS idx_vec_message_id ON vec_message_embeddings(message_id)"
+            "CREATE INDEX IF NOT EXISTS idx_metadata_message_id ON vec_message_metadata(message_id)"
         )
         conn.execute(
-            "CREATE INDEX IF NOT EXISTS idx_vec_conversation_id ON vec_conversation_embeddings(conversation_id)"
+            "CREATE INDEX IF NOT EXISTS idx_metadata_conversation_id ON vec_message_metadata(conversation_id)"
+        )
+        conn.execute(
+            "CREATE INDEX IF NOT EXISTS idx_conv_metadata_conversation_id ON vec_conversation_metadata(conversation_id)"
+        )
+        conn.execute(
+            "CREATE INDEX IF NOT EXISTS idx_metadata_timestamp ON vec_message_metadata(timestamp)"
         )
 
         conn.commit()
@@ -140,20 +167,32 @@ class VectorStore:
 
         conn = self.sqlite_manager._get_connection()
         try:
-            conn.execute(
+            # Insert metadata first
+            cursor = conn.execute(
                 """
-                INSERT INTO vec_message_embeddings
-                (message_id, conversation_id, content, embedding, model_version)
-                VALUES (?, ?, ?, ?, ?)
+                INSERT OR REPLACE INTO vec_message_metadata
+                (message_id, conversation_id, content, model_version)
+                VALUES (?, ?, ?, ?)
                 """,
                 (
                     message_id,
                     conversation_id,
                     content,
-                    embedding.tobytes(),
                     model_version,
                 ),
             )
+            metadata_rowid = cursor.lastrowid
+
+            # Insert embedding
+            conn.execute(
+                """
+                INSERT INTO vec_message_embeddings
+                (rowid, embedding)
+                VALUES (?, ?)
+                """,
+                (metadata_rowid, embedding.tobytes()),
+            )
+
             conn.commit()
             self.logger.debug(f"Stored embedding for message {message_id}")
         except Exception as e:
@@ -187,20 +226,32 @@ class VectorStore:
 
         conn = self.sqlite_manager._get_connection()
         try:
-            conn.execute(
+            # Insert metadata first
+            cursor = conn.execute(
                 """
-                INSERT INTO vec_conversation_embeddings
-                (conversation_id, title, content_summary, embedding, model_version)
-                VALUES (?, ?, ?, ?, ?)
+                INSERT OR REPLACE INTO vec_conversation_metadata
+                (conversation_id, title, content_summary, model_version)
+                VALUES (?, ?, ?, ?)
                 """,
                 (
                     conversation_id,
                     title,
                     content_summary,
-                    embedding.tobytes(),
                     model_version,
                 ),
             )
+            metadata_rowid = cursor.lastrowid
+
+            # Insert embedding
+            conn.execute(
+                """
+                INSERT INTO vec_conversation_embeddings
+                (rowid, embedding)
+                VALUES (?, ?)
+                """,
+                (metadata_rowid, embedding.tobytes()),
+            )
+
             conn.commit()
             self.logger.debug(f"Stored embedding for conversation {conversation_id}")
         except Exception as e:
@@ -237,22 +288,24 @@ class VectorStore:
         try:
             query = """
                 SELECT
-                    message_id,
-                    conversation_id,
-                    content,
-                    distance,
-                    (1.0 - distance) as similarity
-                FROM vec_message_embeddings
-                WHERE embedding MATCH ?
+                    vm.message_id,
+                    vm.conversation_id,
+                    vm.content,
+                    vm.timestamp,
+                    vme.distance,
+                    (1.0 - vme.distance) as similarity
+                FROM vec_message_embeddings vme
+                JOIN vec_message_metadata vm ON vme.rowid = vm.rowid
+                WHERE vme.embedding MATCH ?
                 {conversation_filter}
-                ORDER BY distance
+                ORDER BY vme.distance
                 LIMIT ?
             """
 
             params = [query_embedding.tobytes()]
 
             if conversation_id:
-                query = query.format(conversation_filter="AND conversation_id = ?")
+                query = query.format(conversation_filter="AND vm.conversation_id = ?")
                 params.append(conversation_id)
             else:
                 query = query.format(conversation_filter="")
@@ -269,6 +322,7 @@ class VectorStore:
                         "message_id": row["message_id"],
                         "conversation_id": row["conversation_id"],
                         "content": row["content"],
+                        "timestamp": row["timestamp"],
                         "similarity": similarity,
                         "distance": float(row["distance"]),
                     }
@@ -303,16 +357,18 @@ class VectorStore:
         try:
             cursor = conn.execute(
                 """
-                SELECT
-                    conversation_id,
-                    title,
-                    content_summary,
-                    distance,
-                    (1.0 - distance) as similarity
-                FROM vec_conversation_embeddings
-                WHERE embedding MATCH ?
-                ORDER BY distance
-                LIMIT ?
+                SELECT
+                    vcm.conversation_id,
+                    vcm.title,
+                    vcm.content_summary,
+                    vcm.created_at,
+                    vce.distance,
+                    (1.0 - vce.distance) as similarity
+                FROM vec_conversation_embeddings vce
+                JOIN vec_conversation_metadata vcm ON vce.rowid = vcm.rowid
+                WHERE vce.embedding MATCH ?
+                ORDER BY vce.distance
+                LIMIT ?
                 """,
                 (query_embedding.tobytes(), limit),
             )
@@ -326,6 +382,7 @@ class VectorStore:
                         "conversation_id": row["conversation_id"],
                         "title": row["title"],
                         "content_summary": row["content_summary"],
+                        "created_at": row["created_at"],
                         "similarity": similarity,
                         "distance": float(row["distance"]),
                     }
@@ -350,8 +407,9 @@ class VectorStore:
         try:
             cursor = conn.execute(
                 """
-                SELECT embedding FROM vec_message_embeddings
-                WHERE message_id = ?
+                SELECT vme.embedding FROM vec_message_embeddings vme
+                JOIN vec_message_metadata vm ON vme.rowid = vm.rowid
+                WHERE vm.message_id = ?
                 """,
                 (message_id,),
             )
@@ -375,10 +433,20 @@ class VectorStore:
         """
         conn = self.sqlite_manager._get_connection()
        try:
+            # Delete from both tables
             conn.execute(
                 """
-                DELETE FROM vec_message_embeddings
-                WHERE message_id = ?
+                DELETE FROM vec_message_embeddings
+                WHERE rowid IN (
+                    SELECT rowid FROM vec_message_metadata WHERE message_id = ?
+                )
+                """,
+                (message_id,),
+            )
+            conn.execute(
+                """
+                DELETE FROM vec_message_metadata
+                WHERE message_id = ?
                 """,
                 (message_id,),
             )
@@ -401,8 +469,17 @@ class VectorStore:
             # Delete message embeddings
             conn.execute(
                 """
-                DELETE FROM vec_message_embeddings
-                WHERE conversation_id = ?
+                DELETE FROM vec_message_embeddings
+                WHERE rowid IN (
+                    SELECT rowid FROM vec_message_metadata WHERE conversation_id = ?
+                )
+                """,
+                (conversation_id,),
+            )
+            conn.execute(
+                """
+                DELETE FROM vec_message_metadata
+                WHERE conversation_id = ?
                 """,
                 (conversation_id,),
             )
@@ -410,8 +487,17 @@ class VectorStore:
             # Delete conversation embedding
             conn.execute(
                 """
-                DELETE FROM vec_conversation_embeddings
-                WHERE conversation_id = ?
+                DELETE FROM vec_conversation_embeddings
+                WHERE rowid IN (
+                    SELECT rowid FROM vec_conversation_metadata WHERE conversation_id = ?
+                )
+                """,
+                (conversation_id,),
+            )
+            conn.execute(
+                """
+                DELETE FROM vec_conversation_metadata
+                WHERE conversation_id = ?
                 """,
                 (conversation_id,),
             )
@@ -449,7 +535,7 @@ class VectorStore:
             # Model version distribution
             cursor = conn.execute("""
                 SELECT model_version, COUNT(*) as count
-                FROM vec_message_embeddings
+                FROM vec_message_metadata
                 GROUP BY model_version
             """)
             stats["model_versions"] = {
@@ -485,3 +571,295 @@ class VectorStore:
         True if dimension matches, False otherwise
         """
         return len(embedding) == self.embedding_dimension
+
+    def search_by_keyword(self, query: str, limit: int = 10) -> List[Dict]:
+        """
+        Search for messages by keyword using FTS or LIKE queries.
+
+        Args:
+            query: Keyword search query
+            limit: Maximum number of results
+
+        Returns:
+            List of message results with metadata
+        """
+        if not query or not query.strip():
+            return []
+
+        conn = self.sqlite_manager._get_connection()
+        try:
+            # Clean and prepare query
+            keywords = query.strip().split()
+            if not keywords:
+                return []
+
+            # Try FTS first if available
+            fts_available = self._check_fts_available(conn)
+
+            if fts_available:
+                results = self._search_with_fts(conn, keywords, limit)
+            else:
+                results = self._search_with_like(conn, keywords, limit)
+
+            return results
+
+        except Exception as e:
+            self.logger.error(f"Keyword search failed: {e}")
+            return []
+
+    def _check_fts_available(self, conn: sqlite3.Connection) -> bool:
+        """
+        Check if FTS virtual tables are available.
+
+        Args:
+            conn: SQLite connection
+
+        Returns:
+            True if FTS is available
+        """
+        try:
+            cursor = conn.execute(
+                "SELECT name FROM sqlite_master WHERE type='table' AND name = 'vec_message_metadata_fts'"
+            )
+            return cursor.fetchone() is not None
+        except sqlite3.Error:
+            return False
+
+    def _search_with_fts(
+        self, conn: sqlite3.Connection, keywords: List[str], limit: int
+    ) -> List[Dict]:
+        """
+        Search using SQLite FTS (Full-Text Search).
+
+        Args:
+            conn: SQLite connection
+            keywords: List of keywords to search
+            limit: Maximum results
+
+        Returns:
+            List of search results
+        """
+        results = []
+
+        # Build FTS query
+        fts_query = " AND ".join([f'"{keyword}"' for keyword in keywords])
+
+        try:
+            # Search message metadata table content
+            cursor = conn.execute(
+                f"""
+                SELECT
+                    message_id,
+                    conversation_id,
+                    content,
+                    timestamp,
+                    rank,
+                    (rank * -1.0) as relevance
+                FROM vec_message_metadata_fts
+                WHERE vec_message_metadata_fts MATCH ?
+                ORDER BY rank
+                LIMIT ?
+                """,
+                (fts_query, limit),
+            )
+
+            for row in cursor:
+                results.append(
+                    {
+                        "message_id": row["message_id"],
+                        "conversation_id": row["conversation_id"],
+                        "content": row["content"],
+                        "timestamp": row["timestamp"],
+                        "relevance": float(row["relevance"]),
+                        "score": float(row["relevance"]),  # For compatibility
+                    }
+                )
+
+        except sqlite3.OperationalError:
+            # FTS table doesn't exist, fall back to LIKE
+            return self._search_with_like(conn, keywords, limit)
+
+        return results
+
+    def _search_with_like(
+        self, conn: sqlite3.Connection, keywords: List[str], limit: int
+    ) -> List[Dict]:
+        """
+        Search using LIKE queries when FTS is not available.
+
+        Args:
+            conn: SQLite connection
+            keywords: List of keywords to search
+            limit: Maximum results
+
+        Returns:
+            List of search results
+        """
+        results = []
+
+        # Build WHERE clause for multiple keywords
+        where_clauses = []
+        params = []
+
+        for keyword in keywords:
+            where_clauses.append("vm.content LIKE ?")
+            params.extend([f"%{keyword}%"])
+
+        where_clause = " AND ".join(where_clauses)
+        params.append(limit)
+
+        try:
+            # Search message metadata table content
+            cursor = conn.execute(
+                f"""
+                SELECT DISTINCT
+                    vm.message_id,
+                    vm.conversation_id,
+                    vm.content,
+                    vm.timestamp,
+                    ((LENGTH(vm.content) - LENGTH(REPLACE(LOWER(vm.content), ?, ''))) * 10.0) as relevance
+                FROM vec_message_metadata vm
+                LEFT JOIN conversations c ON vm.conversation_id = c.id
+                WHERE {where_clause}
+                ORDER BY relevance DESC
+                LIMIT ?
+                """,
+                [keywords[0].lower()] + params,
+            )
+
+            for row in cursor:
+                results.append(
+                    {
+                        "message_id": row["message_id"],
+                        "conversation_id": row["conversation_id"],
+                        "content": row["content"],
+                        "timestamp": row["timestamp"],
+                        "relevance": float(row["relevance"]),
+                        "score": float(row["relevance"]),  # For compatibility
+                    }
+                )
+
+        except Exception as e:
+            self.logger.warning(f"LIKE search failed: {e}")
+            # Final fallback - basic search
+            try:
+                cursor = conn.execute(
+                    """
+                    SELECT
+                        message_id,
+                        conversation_id,
+                        content,
+                        timestamp,
+                        0.5 as relevance
+                    FROM vec_message_metadata
+                    WHERE content LIKE ?
+                    ORDER BY timestamp DESC
+                    LIMIT ?
+                    """,
+                    (f"%{keywords[0]}%", limit),
+                )
+
+                for row in cursor:
+                    results.append(
+                        {
+                            "message_id": row["message_id"],
+                            "conversation_id": row["conversation_id"],
+                            "content": row["content"],
+                            "timestamp": row["timestamp"],
+                            "relevance": float(row["relevance"]),
+                            "score": float(row["relevance"]),
+                        }
+                    )
+
+            except Exception as e2:
+                self.logger.error(f"Fallback search failed: {e2}")
+
+        return results
+
+    def store_embeddings(self, embeddings: List[Dict]) -> bool:
+        """
+        Store multiple embeddings efficiently in batch.
+
+        Args:
+            embeddings: List of embedding dictionaries with message_id, embedding, etc.
+
+        Returns:
+            True if successful, False otherwise
+        """
+        if not embeddings:
+            return True
+
+        conn = self.sqlite_manager._get_connection()
+        try:
+            # Begin transaction
+            conn.execute("BEGIN IMMEDIATE")
+
+            stored_count = 0
+            for embedding_data in embeddings:
+                try:
+                    # Extract required fields
+                    message_id = embedding_data.get("message_id")
+                    conversation_id = embedding_data.get("conversation_id")
+                    content = embedding_data.get("content", "")
+                    embedding = embedding_data.get("embedding")
+
+                    if not message_id or not conversation_id or embedding is None:
+                        self.logger.warning(
+                            f"Skipping invalid embedding data: {embedding_data}"
+                        )
+                        continue
+
+                    # Convert embedding to numpy array if needed
+                    if not isinstance(embedding, np.ndarray):
+                        embedding = np.array(embedding, dtype=np.float32)
+                    else:
+                        embedding = embedding.astype(np.float32)
+
+                    # Validate dimension
+                    if not self.validate_embedding_dimension(embedding):
+                        self.logger.warning(
+                            f"Invalid embedding dimension for {message_id}: {len(embedding)}"
+                        )
+                        continue
+
+                    # Insert metadata first
+                    cursor = conn.execute(
+                        """
+                        INSERT OR REPLACE INTO vec_message_metadata
+                        (message_id, conversation_id, content, model_version)
+                        VALUES (?, ?, ?, ?)
+                        """,
+                        (message_id, conversation_id, content, "all-MiniLM-L6-v2"),
+                    )
+                    metadata_rowid = cursor.lastrowid
+
+                    # Store the embedding
+                    conn.execute(
+                        """
+                        INSERT INTO vec_message_embeddings
+                        (rowid, embedding)
+                        VALUES (?, ?)
+                        """,
+                        (metadata_rowid, embedding.tobytes()),
+                    )
+
+                    stored_count += 1
+
+                except Exception as e:
+                    self.logger.error(
+                        f"Failed to store embedding {embedding_data.get('message_id', 'unknown')}: {e}"
+                    )
+                    continue
+
+            # Commit transaction
+            conn.commit()
+            self.logger.info(
+                f"Successfully stored {stored_count}/{len(embeddings)} embeddings"
+            )
+
+            return stored_count > 0
+
+        except Exception as e:
+            conn.rollback()
+            self.logger.error(f"Batch embedding storage failed: {e}")
+            return False
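
For reference, a minimal sketch of the batch input store_embeddings expects.
Each entry needs message_id, conversation_id, and embedding; content is
optional and the batch path currently pins model_version to
'all-MiniLM-L6-v2'. The `store` instance and the placeholder vectors are
assumptions for illustration only; real callers would pass vectors produced
by the embedding model, with DIM equal to store.embedding_dimension
(384 for all-MiniLM-L6-v2).

    import numpy as np

    DIM = 384  # must match store.embedding_dimension
    batch = [
        {
            "message_id": "msg-001",
            "conversation_id": "conv-001",
            "content": "How do I load the sqlite-vec extension?",
            "embedding": np.zeros(DIM, dtype=np.float32),  # placeholder vector
        },
        {
            "message_id": "msg-002",
            "conversation_id": "conv-001",
            "content": "Use sqlite_vec.loadable_path() with load_extension().",
            "embedding": [0.0] * DIM,  # plain lists are converted to float32 arrays
        },
    ]
    ok = store.store_embeddings(batch)  # entries missing required fields are skipped
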