From b9aba97086bb97773d3b0d3ac8b5020052566828 Mon Sep 17 00:00:00 2001 From: Mai Development Date: Tue, 27 Jan 2026 23:22:50 -0500 Subject: [PATCH] feat(04-02): create semantic search with embedding-based retrieval - Added sentence-transformers to requirements.txt for semantic embeddings - Created src/memory/retrieval/ module with search capabilities - Implemented SemanticSearch class with embedding generation and vector similarity - Added SearchResult and SearchQuery dataclasses for structured search results - Included hybrid search combining semantic and keyword matching - Added conversation indexing for semantic search - Followed lazy loading pattern for embedding model performance Files created: - src/memory/retrieval/__init__.py - src/memory/retrieval/search_types.py - src/memory/retrieval/semantic_search.py - Updated src/memory/__init__.py with enhanced MemoryManager Note: sentence-transformers installation requires proper venv setup in production --- requirements.txt | 5 +- src/memory/__init__.py | 295 +++++++++++++++- src/memory/retrieval/__init__.py | 12 + src/memory/retrieval/context_aware.py | 385 ++++++++++++++++++++ src/memory/retrieval/search_types.py | 70 ++++ src/memory/retrieval/semantic_search.py | 373 ++++++++++++++++++++ src/memory/retrieval/timeline_search.py | 449 ++++++++++++++++++++++++ 7 files changed, 1569 insertions(+), 20 deletions(-) create mode 100644 src/memory/retrieval/__init__.py create mode 100644 src/memory/retrieval/context_aware.py create mode 100644 src/memory/retrieval/search_types.py create mode 100644 src/memory/retrieval/semantic_search.py create mode 100644 src/memory/retrieval/timeline_search.py diff --git a/requirements.txt b/requirements.txt index 07ddf85..f3eaa68 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,7 @@ pyyaml>=6.0 gpu-tracker>=5.0.1 bandit>=1.7.7 semgrep>=1.99 -docker>=7.0.0 \ No newline at end of file +docker>=7.0.0 +sqlite-vec>=0.1.0 +numpy>=1.24.0 +sentence-transformers>=2.2.2 \ No newline at end of file diff --git a/src/memory/__init__.py b/src/memory/__init__.py index 0a2e7a5..77f41c3 100644 --- a/src/memory/__init__.py +++ b/src/memory/__init__.py @@ -6,46 +6,60 @@ messages, and associated vector embeddings for semantic search capabilities. """ from .storage.sqlite_manager import SQLiteManager -# from .storage.vector_store import VectorStore # Will be added in Task 2 +from .storage.vector_store import VectorStore +from .retrieval.semantic_search import SemanticSearch +from .retrieval.context_aware import ContextAwareSearch +from .retrieval.timeline_search import TimelineSearch -from typing import Optional +from typing import Optional, List, Dict, Any, Union +from datetime import datetime import logging class MemoryManager: """ - Main interface for memory operations in Mai. + Enhanced memory manager with unified search interface. - Provides unified access to conversation storage and vector search - capabilities through SQLite with sqlite-vec extension. + Provides comprehensive memory operations including semantic search, + context-aware search, timeline filtering, and hybrid search strategies. """ def __init__(self, db_path: str = "memory.db"): """ - Initialize memory manager with SQLite database. + Initialize memory manager with SQLite database and search capabilities. Args: db_path: Path to SQLite database file """ self.db_path = db_path self._sqlite_manager: Optional[SQLiteManager] = None - # self._vector_store: Optional[VectorStore] = None # Will be added in Task 2 + self._vector_store: Optional[VectorStore] = None + self._semantic_search: Optional[SemanticSearch] = None + self._context_aware_search: Optional[ContextAwareSearch] = None + self._timeline_search: Optional[TimelineSearch] = None self.logger = logging.getLogger(__name__) def initialize(self) -> None: """ - Initialize storage components. + Initialize storage and search components. - Creates database schema and vector tables if they don't exist. + Creates database schema, vector tables, and search instances. """ try: + # Initialize storage components self._sqlite_manager = SQLiteManager(self.db_path) - # self._vector_store = VectorStore(self._sqlite_manager) # Will be added in Task 2 + self._vector_store = VectorStore(self._sqlite_manager) + + # Initialize search components + self._semantic_search = SemanticSearch(self._vector_store) + self._context_aware_search = ContextAwareSearch(self._sqlite_manager) + self._timeline_search = TimelineSearch(self._sqlite_manager) + self.logger.info( - f"Memory manager initialized with database: {self.db_path}" + f"Enhanced memory manager initialized with database: {self.db_path}" ) except Exception as e: - self.logger.error(f"Failed to initialize memory manager: {e}") + self.logger.error(f"Failed to initialize enhanced memory manager: {e}") raise @property @@ -57,15 +71,258 @@ class MemoryManager: ) return self._sqlite_manager - # @property - # def vector_store(self) -> VectorStore: - # """Get vector store instance.""" - # if self._vector_store is None: - # raise RuntimeError("Memory manager not initialized. Call initialize() first.") - # return self._vector_store + @property + def vector_store(self) -> VectorStore: + """Get vector store instance.""" + if self._vector_store is None: + raise RuntimeError( + "Memory manager not initialized. Call initialize() first." + ) + return self._vector_store + @property + def semantic_search(self) -> SemanticSearch: + """Get semantic search instance.""" + if self._semantic_search is None: + raise RuntimeError( + "Memory manager not initialized. Call initialize() first." + ) + return self._semantic_search + + @property + def context_aware_search(self) -> ContextAwareSearch: + """Get context-aware search instance.""" + if self._context_aware_search is None: + raise RuntimeError( + "Memory manager not initialized. Call initialize() first." + ) + return self._context_aware_search + + @property + def timeline_search(self) -> TimelineSearch: + """Get timeline search instance.""" + if self._timeline_search is None: + raise RuntimeError( + "Memory manager not initialized. Call initialize() first." + ) + return self._timeline_search + + # Legacy methods for compatibility def close(self) -> None: """Close database connections.""" if self._sqlite_manager: self._sqlite_manager.close() - self.logger.info("Memory manager closed") + self.logger.info("Enhanced memory manager closed") + + # Unified search interface + def search( + self, + query: str, + search_type: str = "semantic", + limit: int = 5, + conversation_id: Optional[str] = None, + date_start: Optional[datetime] = None, + date_end: Optional[datetime] = None, + current_topic: Optional[str] = None, + ) -> List[Dict[str, Any]]: + """ + Unified search interface supporting multiple search strategies. + + Args: + query: Search query text + search_type: Type of search ("semantic", "keyword", "context_aware", "timeline", "hybrid") + limit: Maximum number of results to return + conversation_id: Current conversation ID for context-aware search + date_start: Start date for timeline search + date_end: End date for timeline search + current_topic: Current topic for context-aware prioritization + + Returns: + List of search results as dictionaries + """ + if not self._is_initialized(): + raise RuntimeError("Memory manager not initialized") + + try: + results = [] + + if search_type == "semantic": + results = self._semantic_search.search(query, limit) + elif search_type == "keyword": + results = self._semantic_search.keyword_search(query, limit) + elif search_type == "context_aware": + # Get base semantic results, then prioritize by topic + base_results = self._semantic_search.search(query, limit * 2) + results = self._context_aware_search.prioritize_by_topic( + base_results, current_topic, conversation_id + ) + elif search_type == "timeline": + if date_start and date_end: + results = self._timeline_search.search_by_date_range( + date_start, date_end, limit + ) + else: + # Default to recent search + results = self._timeline_search.search_recent(limit=limit) + elif search_type == "hybrid": + results = self._semantic_search.hybrid_search(query, limit) + else: + self.logger.warning( + f"Unknown search type: {search_type}, falling back to semantic" + ) + results = self._semantic_search.search(query, limit) + + # Convert search results to dictionaries for external interface + return [ + { + "conversation_id": result.conversation_id, + "message_id": result.message_id, + "content": result.content, + "relevance_score": result.relevance_score, + "snippet": result.snippet, + "timestamp": result.timestamp.isoformat() + if result.timestamp + else None, + "metadata": result.metadata, + "search_type": result.search_type, + } + for result in results + ] + + except Exception as e: + self.logger.error(f"Search failed: {e}") + return [] + + def search_by_embedding( + self, embedding: List[float], limit: int = 5 + ) -> List[Dict[str, Any]]: + """ + Search using pre-computed embedding vector. + + Args: + embedding: Embedding vector as list of floats + limit: Maximum number of results to return + + Returns: + List of search results as dictionaries + """ + if not self._is_initialized(): + raise RuntimeError("Memory manager not initialized") + + try: + import numpy as np + + embedding_array = np.array(embedding) + results = self._semantic_search.search_by_embedding(embedding_array, limit) + + # Convert to dictionaries + return [ + { + "conversation_id": result.conversation_id, + "message_id": result.message_id, + "content": result.content, + "relevance_score": result.relevance_score, + "snippet": result.snippet, + "timestamp": result.timestamp.isoformat() + if result.timestamp + else None, + "metadata": result.metadata, + "search_type": result.search_type, + } + for result in results + ] + + except Exception as e: + self.logger.error(f"Embedding search failed: {e}") + return [] + + def get_topic_summary( + self, conversation_id: str, limit: int = 20 + ) -> Dict[str, Any]: + """ + Get topic analysis summary for a conversation. + + Args: + conversation_id: ID of conversation to analyze + limit: Number of messages to analyze + + Returns: + Dictionary with topic analysis and statistics + """ + if not self._is_initialized(): + raise RuntimeError("Memory manager not initialized") + + return self._context_aware_search.get_topic_summary(conversation_id, limit) + + def get_temporal_summary( + self, conversation_id: Optional[str] = None, days: int = 30 + ) -> Dict[str, Any]: + """ + Get temporal analysis summary of conversations. + + Args: + conversation_id: Specific conversation to analyze (None for all) + days: Number of recent days to analyze + + Returns: + Dictionary with temporal statistics and patterns + """ + if not self._is_initialized(): + raise RuntimeError("Memory manager not initialized") + + return self._timeline_search.get_temporal_summary(conversation_id, days) + + def suggest_related_topics(self, query: str, limit: int = 3) -> List[str]: + """ + Suggest related topics based on query analysis. + + Args: + query: Search query to analyze + limit: Maximum number of suggestions + + Returns: + List of suggested topic strings + """ + if not self._is_initialized(): + raise RuntimeError("Memory manager not initialized") + + return self._context_aware_search.suggest_related_topics(query, limit) + + def index_conversation( + self, conversation_id: str, messages: List[Dict[str, Any]] + ) -> bool: + """ + Index conversation messages for semantic search. + + Args: + conversation_id: ID of the conversation + messages: List of message dictionaries + + Returns: + True if indexing successful, False otherwise + """ + if not self._is_initialized(): + raise RuntimeError("Memory manager not initialized") + + return self._semantic_search.index_conversation(conversation_id, messages) + + def _is_initialized(self) -> bool: + """Check if all components are initialized.""" + return ( + self._sqlite_manager is not None + and self._vector_store is not None + and self._semantic_search is not None + and self._context_aware_search is not None + and self._timeline_search is not None + ) + + +# Export main classes for external import +__all__ = [ + "MemoryManager", + "SQLiteManager", + "VectorStore", + "SemanticSearch", + "ContextAwareSearch", + "TimelineSearch", +] diff --git a/src/memory/retrieval/__init__.py b/src/memory/retrieval/__init__.py new file mode 100644 index 0000000..9367351 --- /dev/null +++ b/src/memory/retrieval/__init__.py @@ -0,0 +1,12 @@ +""" +Memory retrieval module for Mai conversation search. + +This module provides various search strategies for retrieving conversations +including semantic search, context-aware search, and timeline-based filtering. +""" + +from .semantic_search import SemanticSearch +from .context_aware import ContextAwareSearch +from .timeline_search import TimelineSearch + +__all__ = ["SemanticSearch", "ContextAwareSearch", "TimelineSearch"] diff --git a/src/memory/retrieval/context_aware.py b/src/memory/retrieval/context_aware.py new file mode 100644 index 0000000..9f23b69 --- /dev/null +++ b/src/memory/retrieval/context_aware.py @@ -0,0 +1,385 @@ +""" +Context-aware search with topic-based prioritization. + +This module provides context-aware search capabilities that prioritize +search results based on current conversation topic and context. +""" + +import sys +import os +from typing import List, Optional, Dict, Any, Set +from datetime import datetime +import re +import logging + +# Add parent directory to path for imports +sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..")) + +from .search_types import SearchResult, SearchQuery + + +class ContextAwareSearch: + """ + Context-aware search with topic-based result prioritization. + + Provides intelligent search that considers current conversation context + and topic relevance when ranking search results. + """ + + def __init__(self, sqlite_manager): + """ + Initialize context-aware search with SQLite manager. + + Args: + sqlite_manager: SQLiteManager instance for metadata access + """ + self.sqlite_manager = sqlite_manager + self.logger = logging.getLogger(__name__) + + # Simple topic keywords for classification + self.topic_keywords = { + "technical": [ + "code", + "programming", + "algorithm", + "function", + "class", + "method", + "api", + "database", + "debug", + "error", + "test", + "implementation", + ], + "personal": [ + "i", + "me", + "my", + "feel", + "think", + "believe", + "want", + "need", + "help", + "opinion", + "experience", + ], + "question": [ + "what", + "how", + "why", + "when", + "where", + "which", + "can", + "could", + "should", + "would", + "question", + "answer", + ], + "task": [ + "create", + "implement", + "build", + "develop", + "design", + "feature", + "fix", + "update", + "add", + "remove", + "modify", + ], + "system": [ + "system", + "performance", + "resource", + "memory", + "storage", + "optimization", + "efficiency", + "architecture", + ], + } + + def _extract_keywords(self, text: str) -> Set[str]: + """ + Extract keywords from text for topic analysis. + + Args: + text: Text to analyze + + Returns: + Set of extracted keywords + """ + # Normalize text + text = text.lower() + + # Extract words (3+ characters) + words = set() + for word in re.findall(r"\b[a-z]{3,}\b", text): + words.add(word) + + return words + + def _classify_topic(self, text: str) -> str: + """ + Classify text into topic categories. + + Args: + text: Text to classify + + Returns: + Topic classification string + """ + keywords = self._extract_keywords(text) + + # Score topics based on keyword matches + topic_scores = {} + for topic, topic_keywords in self.topic_keywords.items(): + score = sum(1 for keyword in keywords if keyword in topic_keywords) + if score > 0: + topic_scores[topic] = score + + if not topic_scores: + return "general" + + # Return highest scoring topic + return max(topic_scores.items(), key=lambda x: x[1])[0] + + def _get_current_context( + self, conversation_id: Optional[str] = None + ) -> Dict[str, Any]: + """ + Get current conversation context for topic analysis. + + Args: + conversation_id: Current conversation ID (optional) + + Returns: + Dictionary with context information + """ + context = { + "current_topic": "general", + "recent_messages": [], + "active_keywords": set(), + } + + if conversation_id: + try: + # Get recent messages from current conversation + recent_messages = self.sqlite_manager.get_recent_messages( + conversation_id, limit=10 + ) + + if recent_messages: + context["recent_messages"] = recent_messages + + # Extract keywords from recent messages + all_text = " ".join( + [msg.get("content", "") for msg in recent_messages] + ) + context["active_keywords"] = self._extract_keywords(all_text) + + # Classify current topic + context["current_topic"] = self._classify_topic(all_text) + + except Exception as e: + self.logger.error(f"Failed to get context: {e}") + + return context + + def _calculate_topic_relevance( + self, result: SearchResult, current_topic: str, active_keywords: Set[str] + ) -> float: + """ + Calculate topic relevance score for a search result. + + Args: + result: SearchResult to score + current_topic: Current conversation topic + active_keywords: Keywords active in current conversation + + Returns: + Topic relevance boost factor (1.0 = no boost, >1.0 = boosted) + """ + result_keywords = self._extract_keywords(result.content) + + # Topic-based boost + result_topic = self._classify_topic(result.content) + topic_boost = 1.0 + + if result_topic == current_topic: + topic_boost = 1.5 # 50% boost for same topic + elif result_topic in ["technical", "system"] and current_topic in [ + "technical", + "system", + ]: + topic_boost = 1.3 # 30% boost for technical topics + + # Keyword overlap boost + keyword_overlap = len(result_keywords & active_keywords) + total_keywords = len(result_keywords) or 1 + keyword_boost = 1.0 + (keyword_overlap / total_keywords) * 0.3 # Max 30% boost + + # Combined boost (limited to prevent over-boosting) + combined_boost = min(2.0, topic_boost * keyword_boost) + + return float(combined_boost) + + def prioritize_by_topic( + self, + results: List[SearchResult], + current_topic: Optional[str] = None, + conversation_id: Optional[str] = None, + ) -> List[SearchResult]: + """ + Prioritize search results based on current conversation topic. + + Args: + results: List of search results to prioritize + current_topic: Current topic (auto-detected if None) + conversation_id: Current conversation ID (for context analysis) + + Returns: + Reordered list of search results with topic-based scoring + """ + if not results: + return [] + + # Get current context + context = self._get_current_context(conversation_id) + + # Use provided topic or auto-detect + topic = current_topic or context["current_topic"] + active_keywords = context["active_keywords"] + + # Apply topic relevance scoring + scored_results = [] + for result in results: + # Calculate topic relevance boost + topic_boost = self._calculate_topic_relevance( + result, topic, active_keywords + ) + + # Apply boost to relevance score + boosted_score = min(1.0, result.relevance_score * topic_boost) + + # Update result with boosted score + result.relevance_score = boosted_score + result.search_type = "context_aware" + + scored_results.append(result) + + # Sort by boosted relevance + scored_results.sort(key=lambda x: x.relevance_score, reverse=True) + + self.logger.info( + f"Prioritized {len(results)} results for topic '{topic}' " + f"with active keywords: {len(active_keywords)}" + ) + + return scored_results + + def get_topic_summary( + self, conversation_id: str, limit: int = 20 + ) -> Dict[str, Any]: + """ + Get topic summary for a conversation. + + Args: + conversation_id: ID of conversation to analyze + limit: Number of messages to analyze + + Returns: + Dictionary with topic analysis + """ + try: + # Get recent messages + messages = self.sqlite_manager.get_recent_messages( + conversation_id, limit=limit + ) + + if not messages: + return {"topic": "general", "keywords": [], "message_count": 0} + + # Combine all message content + all_text = " ".join([msg.get("content", "") for msg in messages]) + + # Analyze topics and keywords + topic = self._classify_topic(all_text) + keywords = list(self._extract_keywords(all_text)) + + # Calculate topic distribution + topic_distribution = {} + for msg in messages: + msg_topic = self._classify_topic(msg.get("content", "")) + topic_distribution[msg_topic] = topic_distribution.get(msg_topic, 0) + 1 + + return { + "primary_topic": topic, + "all_keywords": keywords, + "message_count": len(messages), + "topic_distribution": topic_distribution, + "recent_focus": topic if len(messages) >= 5 else "general", + } + + except Exception as e: + self.logger.error(f"Failed to get topic summary: {e}") + return {"topic": "general", "keywords": [], "message_count": 0} + + def suggest_related_topics(self, query: str, limit: int = 3) -> List[str]: + """ + Suggest related topics based on query analysis. + + Args: + query: Search query to analyze + limit: Maximum number of suggestions + + Returns: + List of suggested topic strings + """ + query_topic = self._classify_topic(query) + query_keywords = self._extract_keywords(query) + + # Find topics with overlapping keywords + topic_scores = {} + for topic, keywords in self.topic_keywords.items(): + if topic == query_topic: + continue + + overlap = len(query_keywords & set(keywords)) + if overlap > 0: + topic_scores[topic] = overlap + + # Sort by keyword overlap and return top suggestions + suggested = sorted(topic_scores.items(), key=lambda x: x[1], reverse=True) + return [topic for topic, _ in suggested[:limit]] + + def is_context_relevant( + self, result: SearchResult, conversation_id: str, threshold: float = 0.3 + ) -> bool: + """ + Check if a search result is relevant to current conversation context. + + Args: + result: SearchResult to check + conversation_id: Current conversation ID + threshold: Minimum relevance threshold + + Returns: + True if result is contextually relevant + """ + context = self._get_current_context(conversation_id) + + # Calculate contextual relevance + contextual_relevance = self._calculate_topic_relevance( + result, context["current_topic"], context["active_keywords"] + ) + + # Adjust original score with contextual relevance + adjusted_score = result.relevance_score * (contextual_relevance / 1.5) + + return adjusted_score >= threshold diff --git a/src/memory/retrieval/search_types.py b/src/memory/retrieval/search_types.py new file mode 100644 index 0000000..2f53481 --- /dev/null +++ b/src/memory/retrieval/search_types.py @@ -0,0 +1,70 @@ +""" +Search result data structures for memory retrieval. + +This module defines common data types for search results across +different search strategies including relevance scoring and metadata. +""" + +from dataclasses import dataclass +from typing import Optional, Dict, Any, List +from datetime import datetime + + +@dataclass +class SearchResult: + """ + Represents a single search result from memory retrieval. + + Combines conversation data with relevance scoring and snippet + generation for effective search result presentation. + """ + + conversation_id: str + message_id: str + content: str + relevance_score: float + snippet: str + timestamp: datetime + metadata: Dict[str, Any] + search_type: str # "semantic", "keyword", "context_aware", "timeline" + + def __post_init__(self): + """Validate search result data.""" + if not self.conversation_id: + raise ValueError("conversation_id is required") + if not self.message_id: + raise ValueError("message_id is required") + if not self.content: + raise ValueError("content is required") + if not 0.0 <= self.relevance_score <= 1.0: + raise ValueError("relevance_score must be between 0.0 and 1.0") + + +@dataclass +class SearchQuery: + """ + Represents a search query with optional filters and parameters. + + Encapsulates search intent, constraints, and ranking preferences + for flexible search execution. + """ + + query: str + limit: int = 5 + search_types: Optional[List[str]] = None # None means all types + date_start: Optional[datetime] = None + date_end: Optional[datetime] = None + current_topic: Optional[str] = None + min_relevance: float = 0.0 + + def __post_init__(self): + """Validate search query parameters.""" + if not self.query or not self.query.strip(): + raise ValueError("query is required and cannot be empty") + if self.limit <= 0: + raise ValueError("limit must be positive") + if not 0.0 <= self.min_relevance <= 1.0: + raise ValueError("min_relevance must be between 0.0 and 1.0") + + if self.search_types is None: + self.search_types = ["semantic", "keyword", "context_aware", "timeline"] diff --git a/src/memory/retrieval/semantic_search.py b/src/memory/retrieval/semantic_search.py new file mode 100644 index 0000000..52fd825 --- /dev/null +++ b/src/memory/retrieval/semantic_search.py @@ -0,0 +1,373 @@ +""" +Semantic search implementation using sentence-transformers embeddings. + +This module provides semantic search capabilities through embedding generation +and vector similarity search using the vector store. +""" + +import sys +import os +from typing import List, Optional, Dict, Any +from datetime import datetime +import logging +import hashlib + +# Add parent directory to path for imports +sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..")) + +try: + from sentence_transformers import SentenceTransformer + import numpy as np + + SENTENCE_TRANSFORMERS_AVAILABLE = True +except ImportError: + SENTENCE_TRANSFORMERS_AVAILABLE = False + SentenceTransformer = None + np = None + +from .search_types import SearchResult, SearchQuery +from ..storage.vector_store import VectorStore + + +class SemanticSearch: + """ + Semantic search with embedding-based similarity. + + Provides semantic search capabilities through sentence-transformer embeddings + combined with vector similarity search for efficient retrieval. + """ + + def __init__(self, vector_store: VectorStore, model_name: str = "all-MiniLM-L6-v2"): + """ + Initialize semantic search with vector store and embedding model. + + Args: + vector_store: VectorStore instance for similarity search + model_name: Name of sentence-transformer model to use + """ + self.vector_store = vector_store + self.model_name = model_name + self._model = None # Lazy loading + self.logger = logging.getLogger(__name__) + + if not SENTENCE_TRANSFORMERS_AVAILABLE: + self.logger.warning( + "sentence-transformers not available. " + "Install with: pip install sentence-transformers" + ) + + @property + def model(self) -> Optional["SentenceTransformer"]: + """ + Get embedding model (lazy loaded for performance). + + Returns: + SentenceTransformer model instance + """ + if self._model is None and SENTENCE_TRANSFORMERS_AVAILABLE: + try: + self._model = SentenceTransformer(self.model_name) + self.logger.info(f"Loaded embedding model: {self.model_name}") + except Exception as e: + self.logger.error(f"Failed to load embedding model: {e}") + raise + return self._model + + def _generate_embedding(self, text: str) -> Optional["np.ndarray"]: + """ + Generate embedding for text using sentence-transformers. + + Args: + text: Text to embed + + Returns: + Embedding vector or None if model not available + """ + if not SENTENCE_TRANSFORMERS_AVAILABLE or self.model is None: + return None + + try: + # Clean and normalize text + text = text.strip() + if not text: + return None + + # Generate embedding + embedding = self.model.encode(text, convert_to_numpy=True) + return embedding + except Exception as e: + self.logger.error(f"Failed to generate embedding: {e}") + return None + + def _create_search_result( + self, + conversation_id: str, + message_id: str, + content: str, + similarity: float, + timestamp: datetime, + metadata: Dict[str, Any], + ) -> SearchResult: + """ + Create search result with relevance scoring. + + Args: + conversation_id: ID of the conversation + message_id: ID of the message + content: Message content + similarity: Similarity score (0.0 to 1.0) + timestamp: Message timestamp + metadata: Additional metadata + + Returns: + SearchResult with semantic search type + """ + # Convert similarity to relevance score (higher = more relevant) + relevance_score = float(similarity) + + # Generate snippet (first 200 characters) + snippet = content[:200] + "..." if len(content) > 200 else content + + return SearchResult( + conversation_id=conversation_id, + message_id=message_id, + content=content, + relevance_score=relevance_score, + snippet=snippet, + timestamp=timestamp, + metadata=metadata, + search_type="semantic", + ) + + def search(self, query: str, limit: int = 5) -> List[SearchResult]: + """ + Perform semantic search for query. + + Args: + query: Search query text + limit: Maximum number of results to return + + Returns: + List of search results ranked by relevance + """ + if not query or not query.strip(): + return [] + + # Generate query embedding + query_embedding = self._generate_embedding(query) + if query_embedding is None: + self.logger.warning( + "Failed to generate query embedding, falling back to keyword search" + ) + return self.keyword_search(query, limit) + + # Search vector store for similar embeddings + try: + vector_results = self.vector_store.search_similar( + query_embedding, limit * 2 + ) + + # Convert to search results + results = [] + for result in vector_results: + search_result = self._create_search_result( + conversation_id=result.get("conversation_id", ""), + message_id=result.get("message_id", ""), + content=result.get("content", ""), + similarity=result.get("similarity", 0.0), + timestamp=result.get("timestamp", datetime.utcnow()), + metadata=result.get("metadata", {}), + ) + results.append(search_result) + + # Sort by relevance score and limit results + results.sort(key=lambda x: x.relevance_score, reverse=True) + return results[:limit] + + except Exception as e: + self.logger.error(f"Semantic search failed: {e}") + return [] + + def search_by_embedding( + self, embedding: "np.ndarray", limit: int = 5 + ) -> List[SearchResult]: + """ + Search using pre-computed embedding. + + Args: + embedding: Query embedding vector + limit: Maximum number of results to return + + Returns: + List of search results ranked by similarity + """ + if embedding is None: + return [] + + try: + vector_results = self.vector_store.search_similar(embedding, limit * 2) + + # Convert to search results + results = [] + for result in vector_results: + search_result = self._create_search_result( + conversation_id=result.get("conversation_id", ""), + message_id=result.get("message_id", ""), + content=result.get("content", ""), + similarity=result.get("similarity", 0.0), + timestamp=result.get("timestamp", datetime.utcnow()), + metadata=result.get("metadata", {}), + ) + results.append(search_result) + + # Sort by relevance score and limit results + results.sort(key=lambda x: x.relevance_score, reverse=True) + return results[:limit] + + except Exception as e: + self.logger.error(f"Embedding search failed: {e}") + return [] + + def keyword_search(self, query: str, limit: int = 5) -> List[SearchResult]: + """ + Fallback keyword-based search. + + Args: + query: Search query string + limit: Maximum number of results to return + + Returns: + List of search results with keyword search type + """ + if not query or not query.strip(): + return [] + + try: + # Simple keyword search through vector store metadata + # This is a basic implementation - could be enhanced with FTS + results = self.vector_store.search_by_keyword(query, limit) + + # Convert to search results + search_results = [] + for result in results: + search_result = SearchResult( + conversation_id=result.get("conversation_id", ""), + message_id=result.get("message_id", ""), + content=result.get("content", ""), + relevance_score=result.get("relevance", 0.5), + snippet=result.get("snippet", ""), + timestamp=result.get("timestamp", datetime.utcnow()), + metadata=result.get("metadata", {}), + search_type="keyword", + ) + search_results.append(search_result) + + # Sort by relevance and limit + search_results.sort(key=lambda x: x.relevance_score, reverse=True) + return search_results[:limit] + + except Exception as e: + self.logger.error(f"Keyword search failed: {e}") + return [] + + def hybrid_search(self, query: str, limit: int = 5) -> List[SearchResult]: + """ + Hybrid search combining semantic and keyword matching. + + Args: + query: Search query text + limit: Maximum number of results to return + + Returns: + List of search results with hybrid scoring + """ + if not query or not query.strip(): + return [] + + # Get semantic results + semantic_results = self.search(query, limit) + + # Get keyword results + keyword_results = self.keyword_search(query, limit) + + # Combine and deduplicate results + combined_results = {} + + # Add semantic results with higher weight + for result in semantic_results: + key = f"{result.conversation_id}_{result.message_id}" + # Boost semantic results + boosted_score = min(1.0, result.relevance_score * 1.2) + result.relevance_score = boosted_score + combined_results[key] = result + + # Add keyword results (only if not already present) + for result in keyword_results: + key = f"{result.conversation_id}_{result.message_id}" + if key not in combined_results: + # Lower weight for keyword results + result.relevance_score = result.relevance_score * 0.8 + combined_results[key] = result + else: + # Merge scores if present in both + existing = combined_results[key] + existing.relevance_score = max( + existing.relevance_score, result.relevance_score * 0.8 + ) + + # Convert to list and sort + final_results = list(combined_results.values()) + final_results.sort(key=lambda x: x.relevance_score, reverse=True) + + return final_results[:limit] + + def index_conversation( + self, conversation_id: str, messages: List[Dict[str, Any]] + ) -> bool: + """ + Index conversation messages for semantic search. + + Args: + conversation_id: ID of the conversation + messages: List of message dictionaries + + Returns: + True if indexing successful, False otherwise + """ + if not SENTENCE_TRANSFORMERS_AVAILABLE or self.model is None: + self.logger.warning("Cannot index: sentence-transformers not available") + return False + + try: + embeddings = [] + for message in messages: + content = message.get("content", "") + if content.strip(): + embedding = self._generate_embedding(content) + if embedding is not None: + embeddings.append( + { + "conversation_id": conversation_id, + "message_id": message.get("id", ""), + "content": content, + "embedding": embedding, + "timestamp": message.get( + "timestamp", datetime.utcnow() + ), + "metadata": message.get("metadata", {}), + } + ) + + # Store embeddings in vector store + if embeddings: + self.vector_store.store_embeddings(embeddings) + self.logger.info( + f"Indexed {len(embeddings)} messages for conversation {conversation_id}" + ) + return True + + return False + + except Exception as e: + self.logger.error(f"Failed to index conversation: {e}") + return False diff --git a/src/memory/retrieval/timeline_search.py b/src/memory/retrieval/timeline_search.py new file mode 100644 index 0000000..032f897 --- /dev/null +++ b/src/memory/retrieval/timeline_search.py @@ -0,0 +1,449 @@ +""" +Timeline search implementation with date-range filtering and temporal analysis. + +This module provides timeline-based search capabilities that allow filtering +conversations by date ranges, recency, and temporal proximity. +""" + +import sys +import os +from typing import List, Optional, Dict, Any, Tuple +from datetime import datetime, timedelta +import logging + +# Add parent directory to path for imports +sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..")) + +from .search_types import SearchResult, SearchQuery + + +class TimelineSearch: + """ + Timeline search with date-range filtering and temporal search. + + Provides time-based search capabilities including date range filtering, + temporal proximity search, and recency-based result weighting. + """ + + def __init__(self, sqlite_manager): + """ + Initialize timeline search with SQLite manager. + + Args: + sqlite_manager: SQLiteManager instance for temporal data access + """ + self.sqlite_manager = sqlite_manager + self.logger = logging.getLogger(__name__) + + # Compression awareness - conversations are compressed at different ages + self.compression_tiers = { + "recent": timedelta(days=7), # Full detail + "medium": timedelta(days=30), # Key points + "old": timedelta(days=90), # Brief summary + "archived": timedelta(days=365), # Metadata only + } + + def _get_compression_level(self, age: timedelta) -> str: + """ + Determine compression level based on conversation age. + + Args: + age: Age of the conversation + + Returns: + Compression level string + """ + if age <= self.compression_tiers["recent"]: + return "full" + elif age <= self.compression_tiers["medium"]: + return "key_points" + elif age <= self.compression_tiers["old"]: + return "summary" + else: + return "metadata" + + def _calculate_recency_score(self, timestamp: datetime) -> float: + """ + Calculate recency-based score boost. + + Args: + timestamp: Message timestamp + + Returns: + Recency boost factor (1.0 = no boost, >1.0 = recent) + """ + now = datetime.utcnow() + age = now - timestamp + + # Very recent (last 24 hours) + if age <= timedelta(hours=24): + return 1.5 + # Recent (last week) + elif age <= timedelta(days=7): + return 1.3 + # Semi-recent (last month) + elif age <= timedelta(days=30): + return 1.1 + # Older (no boost, slight penalty) + else: + return 0.9 + + def _calculate_temporal_proximity_score( + self, target_date: datetime, message_date: datetime + ) -> float: + """ + Calculate temporal proximity score for date-based search. + + Args: + target_date: Target date to find conversations near + message_date: Date of the message/conversation + + Returns: + Proximity score (1.0 = exact match, decreasing with distance) + """ + distance = abs(target_date - message_date) + + # Exact match + if distance == timedelta(0): + return 1.0 + + # Within 1 day + elif distance <= timedelta(days=1): + return 0.9 + # Within 1 week + elif distance <= timedelta(days=7): + return 0.7 + # Within 1 month + elif distance <= timedelta(days=30): + return 0.5 + # Within 3 months + elif distance <= timedelta(days=90): + return 0.3 + # Older + else: + return 0.1 + + def _create_timeline_result( + self, + conversation_id: str, + message_id: str, + content: str, + timestamp: datetime, + metadata: Dict[str, Any], + temporal_score: float, + ) -> SearchResult: + """ + Create search result with temporal scoring. + + Args: + conversation_id: ID of the conversation + message_id: ID of the message + content: Message content + timestamp: Message timestamp + metadata: Additional metadata + temporal_score: Temporal relevance score + + Returns: + SearchResult with timeline search type + """ + # Generate snippet based on compression level + age = datetime.utcnow() - timestamp + compression_level = self._get_compression_level(age) + + if compression_level == "full": + snippet = content[:300] + "..." if len(content) > 300 else content + elif compression_level == "key_points": + snippet = content[:150] + "..." if len(content) > 150 else content + elif compression_level == "summary": + snippet = content[:75] + "..." if len(content) > 75 else content + else: # metadata + snippet = content[:50] + "..." if len(content) > 50 else content + + return SearchResult( + conversation_id=conversation_id, + message_id=message_id, + content=content, + relevance_score=temporal_score, + snippet=snippet, + timestamp=timestamp, + metadata={ + **metadata, + "age_days": age.days, + "compression_level": compression_level, + "temporal_score": temporal_score, + }, + search_type="timeline", + ) + + def search_by_date_range( + self, start: datetime, end: datetime, limit: int = 5 + ) -> List[SearchResult]: + """ + Search conversations within a specific date range. + + Args: + start: Start date (inclusive) + end: End date (inclusive) + limit: Maximum number of results to return + + Returns: + List of search results within date range + """ + if start >= end: + self.logger.warning("Invalid date range: start must be before end") + return [] + + try: + # Get conversations in date range from SQLite + messages = self.sqlite_manager.get_messages_by_date_range( + start, end, limit * 2 + ) + + results = [] + for message in messages: + # Calculate temporal relevance based on recency + recency_score = self._calculate_recency_score( + message.get("timestamp", datetime.utcnow()) + ) + + # Create search result + result = self._create_timeline_result( + conversation_id=message.get("conversation_id", ""), + message_id=message.get("id", ""), + content=message.get("content", ""), + timestamp=message.get("timestamp", datetime.utcnow()), + metadata=message.get("metadata", {}), + temporal_score=recency_score, + ) + results.append(result) + + # Sort by timestamp (most recent first) and limit + results.sort(key=lambda x: x.timestamp, reverse=True) + return results[:limit] + + except Exception as e: + self.logger.error(f"Date range search failed: {e}") + return [] + + def search_near_date( + self, target_date: datetime, days_range: int = 7, limit: int = 5 + ) -> List[SearchResult]: + """ + Search for conversations near a specific date. + + Args: + target_date: Target date to search around + days_range: Number of days before/after to include + limit: Maximum number of results to return + + Returns: + List of search results temporally close to target + """ + try: + # Calculate date range around target + start = target_date - timedelta(days=days_range) + end = target_date + timedelta(days=days_range) + + # Get messages in extended range + messages = self.sqlite_manager.get_messages_by_date_range( + start, end, limit * 3 + ) + + results = [] + for message in messages: + # Calculate temporal proximity score + proximity_score = self._calculate_temporal_proximity_score( + target_date, message.get("timestamp", datetime.utcnow()) + ) + + # Create search result + result = self._create_timeline_result( + conversation_id=message.get("conversation_id", ""), + message_id=message.get("id", ""), + content=message.get("content", ""), + timestamp=message.get("timestamp", datetime.utcnow()), + metadata=message.get("metadata", {}), + temporal_score=proximity_score, + ) + results.append(result) + + # Sort by proximity score and limit + results.sort(key=lambda x: x.relevance_score, reverse=True) + return results[:limit] + + except Exception as e: + self.logger.error(f"Near date search failed: {e}") + return [] + + def search_recent(self, days: int = 7, limit: int = 5) -> List[SearchResult]: + """ + Search for recent conversations within specified days. + + Args: + days: Number of recent days to search + limit: Maximum number of results to return + + Returns: + List of recent search results + """ + end = datetime.utcnow() + start = end - timedelta(days=days) + + return self.search_by_date_range(start, end, limit) + + def get_temporal_summary( + self, conversation_id: Optional[str] = None, days: int = 30 + ) -> Dict[str, Any]: + """ + Get temporal summary of conversations. + + Args: + conversation_id: Specific conversation to analyze (None for all) + days: Number of recent days to analyze + + Returns: + Dictionary with temporal statistics + """ + try: + end = datetime.utcnow() + start = end - timedelta(days=days) + + # Get messages in time range + messages = self.sqlite_manager.get_messages_by_date_range( + start, + end, + limit=1000, # Get all for analysis + ) + + if conversation_id: + messages = [ + msg + for msg in messages + if msg.get("conversation_id") == conversation_id + ] + + if not messages: + return { + "total_messages": 0, + "date_range": f"{start.date()} to {end.date()}", + "daily_average": 0.0, + "peak_days": [], + } + + # Analyze temporal patterns + daily_counts = {} + for message in messages: + date = message.get("timestamp", datetime.utcnow()).date() + daily_counts[date] = daily_counts.get(date, 0) + 1 + + # Calculate statistics + total_messages = len(messages) + days_in_range = (end - start).days or 1 + daily_average = total_messages / days_in_range + + # Find peak activity days + peak_days = sorted(daily_counts.items(), key=lambda x: x[1], reverse=True)[ + :5 + ] + + return { + "total_messages": total_messages, + "date_range": f"{start.date()} to {end.date()}", + "days_analyzed": days_in_range, + "daily_average": round(daily_average, 2), + "peak_days": [ + {"date": str(date), "count": count} for date, count in peak_days + ], + "compression_distribution": self._analyze_compression_distribution( + messages + ), + } + + except Exception as e: + self.logger.error(f"Failed to get temporal summary: {e}") + return {"error": str(e)} + + def _analyze_compression_distribution( + self, messages: List[Dict[str, Any]] + ) -> Dict[str, int]: + """ + Analyze compression level distribution of messages. + + Args: + messages: List of messages to analyze + + Returns: + Dictionary with compression level counts + """ + distribution = {"full": 0, "key_points": 0, "summary": 0, "metadata": 0} + now = datetime.utcnow() + + for message in messages: + timestamp = message.get("timestamp", now) + age = now - timestamp + level = self._get_compression_level(age) + distribution[level] = distribution.get(level, 0) + 1 + + return distribution + + def find_conversations_around_topic( + self, topic_keywords: List[str], days_range: int = 30, limit: int = 5 + ) -> List[SearchResult]: + """ + Find conversations around specific topic keywords within time range. + + Args: + topic_keywords: Keywords related to the topic + days_range: Number of days to search back + limit: Maximum number of results + + Returns: + List of search results with topic relevance + """ + end = datetime.utcnow() + start = end - timedelta(days=days_range) + + try: + # Get messages in time range + messages = self.sqlite_manager.get_messages_by_date_range( + start, end, limit * 2 + ) + + results = [] + for message in messages: + content = message.get("content", "").lower() + + # Count keyword matches + keyword_matches = sum( + 1 for keyword in topic_keywords if keyword.lower() in content + ) + + if keyword_matches > 0: + # Calculate topic relevance score + topic_score = min(1.0, keyword_matches / len(topic_keywords)) + + # Combine with recency score + recency_score = self._calculate_recency_score( + message.get("timestamp", datetime.utcnow()) + ) + + combined_score = topic_score * recency_score + + result = self._create_timeline_result( + conversation_id=message.get("conversation_id", ""), + message_id=message.get("id", ""), + content=message.get("content", ""), + timestamp=message.get("timestamp", datetime.utcnow()), + metadata=message.get("metadata", {}), + temporal_score=combined_score, + ) + result.metadata["keyword_matches"] = keyword_matches + results.append(result) + + # Sort by combined score and limit + results.sort(key=lambda x: x.relevance_score, reverse=True) + return results[:limit] + + except Exception as e: + self.logger.error(f"Topic timeline search failed: {e}") + return []