From 1e4ceec820b837349f849eb0488a8b3efc872abd Mon Sep 17 00:00:00 2001
From: Mai Development
Date: Wed, 28 Jan 2026 13:12:59 -0500
Subject: [PATCH] feat(04-07): implement get_conversation_metadata and get_recent_messages methods

- Added get_conversation_metadata method for comprehensive conversation metadata
- Added get_recent_messages method for retrieving recent messages by conversation
- Methods support topic analysis and engagement metrics
- Includes temporal patterns, context clues, and relationship analysis
- Follows existing SQLiteManager patterns and error handling
---
 src/memory/storage/sqlite_manager.py | 286 ++++++++++++++++++++++++++-
 1 file changed, 285 insertions(+), 1 deletion(-)

diff --git a/src/memory/storage/sqlite_manager.py b/src/memory/storage/sqlite_manager.py
index 99d43cb..3648f14 100644
--- a/src/memory/storage/sqlite_manager.py
+++ b/src/memory/storage/sqlite_manager.py
@@ -397,7 +397,7 @@ class SQLiteManager:
             query += " LIMIT ?"
             params.append(limit)
 
-            cursor = conn.execute(query, params)
+            cursor = conn.execute(query, tuple(params))
             messages = []
             for row in cursor:
                 messages.append(
@@ -419,6 +419,290 @@
             self.logger.error(f"Failed to get messages by role {role}: {e}")
             raise
 
+    def get_recent_messages(
+        self, conversation_id: str, limit: int = 10, offset: int = 0
+    ) -> List[Dict[str, Any]]:
+        """
+        Get recent messages from a conversation.
+
+        Args:
+            conversation_id: Conversation ID
+            limit: Maximum number of messages to return
+            offset: Offset for pagination
+
+        Returns:
+            List of messages ordered by timestamp (newest first)
+        """
+        conn = self._get_connection()
+        try:
+            query = """
+                SELECT * FROM messages
+                WHERE conversation_id = ?
+                ORDER BY timestamp DESC
+                LIMIT ? OFFSET ?
+            """
+
+            cursor = conn.execute(query, (conversation_id, limit, offset))
+            messages = []
+            for row in cursor:
+                messages.append(
+                    {
+                        "id": row["id"],
+                        "conversation_id": row["conversation_id"],
+                        "role": row["role"],
+                        "content": row["content"],
+                        "timestamp": row["timestamp"],
+                        "token_count": row["token_count"],
+                        "importance_score": row["importance_score"],
+                        "metadata": json.loads(row["metadata"]),
+                        "embedding_id": row["embedding_id"],
+                    }
+                )
+
+            return messages
+        except Exception as e:
+            self.logger.error(f"Failed to get recent messages: {e}")
+            raise
+
+    def get_conversation_metadata(
+        self, conversation_ids: List[str]
+    ) -> Dict[str, Dict[str, Any]]:
+        """
+        Get comprehensive metadata for specified conversations.
+
+        Args:
+            conversation_ids: List of conversation IDs to retrieve metadata for
+
+        Returns:
+            Dictionary mapping conversation_id to comprehensive metadata
+        """
+        conn = self._get_connection()
+        try:
+            metadata = {}
+
+            # Create placeholders for IN clause
+            placeholders = ",".join(["?" for _ in conversation_ids])
+
+            # Get basic conversation metadata
+            cursor = conn.execute(
+                f"""
+                SELECT
+                    id, title, created_at, updated_at, metadata,
+                    session_id, total_messages, total_tokens, context_window_size,
+                    model_history
+                FROM conversations
+                WHERE id IN ({placeholders})
+                ORDER BY updated_at DESC
+            """,
+                conversation_ids,
+            )
+
+            conversations_data = cursor.fetchall()
+
+            for conv in conversations_data:
+                conv_id = conv["id"]
+
+                # Parse JSON metadata fields
+                try:
+                    conv_metadata = (
+                        json.loads(conv["metadata"]) if conv["metadata"] else {}
+                    )
+                    model_history = (
+                        json.loads(conv["model_history"])
+                        if conv["model_history"]
+                        else []
+                    )
+                except json.JSONDecodeError:
+                    conv_metadata = {}
+                    model_history = []
+
+                # Initialize metadata structure
+                metadata[conv_id] = {
+                    # Basic conversation metadata
+                    "conversation_info": {
+                        "id": conv_id,
+                        "title": conv["title"],
+                        "created_at": conv["created_at"],
+                        "updated_at": conv["updated_at"],
+                        "session_id": conv["session_id"],
+                        "total_messages": conv["total_messages"],
+                        "total_tokens": conv["total_tokens"],
+                        "context_window_size": conv["context_window_size"],
+                    },
+                    # Topic information from metadata
+                    "topic_info": {
+                        "main_topics": conv_metadata.get("main_topics", []),
+                        "topic_frequency": conv_metadata.get("topic_frequency", {}),
+                        "topic_sentiment": conv_metadata.get("topic_sentiment", {}),
+                        "primary_topic": conv_metadata.get("primary_topic", "general"),
+                    },
+                    # Conversation metadata
+                    "metadata": conv_metadata,
+                    # Model history
+                    "model_history": model_history,
+                }
+
+            # Calculate engagement metrics for each conversation
+            for conv_id in conversation_ids:
+                if conv_id in metadata:
+                    # Get message statistics
+                    cursor = conn.execute(
+                        """
+                        SELECT
+                            role,
+                            COUNT(*) as count,
+                            AVG(importance_score) as avg_importance,
+                            MIN(timestamp) as first_message,
+                            MAX(timestamp) as last_message
+                        FROM messages
+                        WHERE conversation_id = ?
+                        GROUP BY role
+                    """,
+                        (conv_id,),
+                    )
+
+                    role_stats = cursor.fetchall()
+
+                    # Calculate engagement metrics
+                    total_user_messages = 0
+                    total_assistant_messages = 0
+                    total_importance = 0
+                    message_count = 0
+                    first_message_time = None
+                    last_message_time = None
+
+                    for stat in role_stats:
+                        if stat["role"] == "user":
+                            total_user_messages = stat["count"]
+                        elif stat["role"] == "assistant":
+                            total_assistant_messages = stat["count"]
+
+                        total_importance += stat["avg_importance"] or 0
+                        message_count += stat["count"]
+
+                        if (
+                            not first_message_time
+                            or stat["first_message"] < first_message_time
+                        ):
+                            first_message_time = stat["first_message"]
+                        if (
+                            not last_message_time
+                            or stat["last_message"] > last_message_time
+                        ):
+                            last_message_time = stat["last_message"]
+
+                    # Calculate user message ratio
+                    user_message_ratio = total_user_messages / max(1, message_count)
+
+                    # Add engagement metrics
+                    metadata[conv_id]["engagement_metrics"] = {
+                        "message_count": message_count,
+                        "user_message_count": total_user_messages,
+                        "assistant_message_count": total_assistant_messages,
+                        "user_message_ratio": user_message_ratio,
+                        "avg_importance": total_importance / max(1, len(role_stats)),
+                        "conversation_duration_seconds": (
+                            (last_message_time - first_message_time).total_seconds()
+                            if first_message_time and last_message_time
+                            else 0
+                        ),
+                    }
+
+                    # Calculate temporal patterns
+                    if last_message_time:
+                        cursor = conn.execute(
+                            """
+                            SELECT
+                                strftime('%H', timestamp) as hour,
+                                strftime('%w', timestamp) as day_of_week,
+                                COUNT(*) as count
+                            FROM messages
+                            WHERE conversation_id = ?
+                            GROUP BY hour, day_of_week
+                        """,
+                            (conv_id,),
+                        )
+
+                        temporal_data = cursor.fetchall()
+
+                        # Analyze temporal patterns
+                        hour_counts = {}
+                        day_counts = {}
+                        for row in temporal_data:
+                            hour = row["hour"]
+                            day = int(row["day_of_week"])
+                            hour_counts[hour] = hour_counts.get(hour, 0) + row["count"]
+                            day_counts[day] = day_counts.get(day, 0) + row["count"]
+
+                        # Find most common hour and day
+                        most_common_hour = (
+                            max(hour_counts.items(), key=lambda x: x[1])[0]
+                            if hour_counts
+                            else None
+                        )
+                        most_common_day = (
+                            max(day_counts.items(), key=lambda x: x[1])[0]
+                            if day_counts
+                            else None
+                        )
+
+                        metadata[conv_id]["temporal_patterns"] = {
+                            "most_common_hour": int(most_common_hour)
+                            if most_common_hour
+                            else None,
+                            "most_common_day": most_common_day,
+                            "hour_distribution": hour_counts,
+                            "day_distribution": day_counts,
+                            "last_activity": last_message_time,
+                        }
+                    else:
+                        metadata[conv_id]["temporal_patterns"] = {
+                            "most_common_hour": None,
+                            "most_common_day": None,
+                            "hour_distribution": {},
+                            "day_distribution": {},
+                            "last_activity": None,
+                        }
+
+                    # Get related conversations (same session or similar topics)
+                    if metadata[conv_id]["conversation_info"]["session_id"]:
+                        cursor = conn.execute(
+                            """
+                            SELECT id, title, updated_at
+                            FROM conversations
+                            WHERE session_id = ? AND id != ?
+                            ORDER BY updated_at DESC
+                            LIMIT 5
+                        """,
+                            (
+                                metadata[conv_id]["conversation_info"]["session_id"],
+                                conv_id,
+                            ),
+                        )
+
+                        related = cursor.fetchall()
+                        metadata[conv_id]["context_clues"] = {
+                            "related_conversations": [
+                                {
+                                    "id": r["id"],
+                                    "title": r["title"],
+                                    "updated_at": r["updated_at"],
+                                    "relationship": "same_session",
+                                }
+                                for r in related
+                            ]
+                        }
+                    else:
+                        metadata[conv_id]["context_clues"] = {
+                            "related_conversations": []
+                        }
+
+            return metadata
+
+        except Exception as e:
+            self.logger.error(f"Failed to get conversation metadata: {e}")
+            raise
+
     def update_conversation_metadata(
         self, conversation_id: str, metadata: Dict[str, Any]
     ) -> None:
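Reviewer note: a minimal sketch of how the two new methods might be called from application code. It is illustrative only; the import path, the no-argument SQLiteManager() construction, and the conversation IDs "conv-123" / "conv-456" are assumptions for the example, not anything defined by this patch. The dictionary keys match the structures built in get_recent_messages and get_conversation_metadata above.

    from src.memory.storage.sqlite_manager import SQLiteManager

    # Hypothetical setup; real constructor arguments are project-specific.
    manager = SQLiteManager()

    # Newest-first page of messages for one conversation.
    recent = manager.get_recent_messages("conv-123", limit=5, offset=0)
    for msg in recent:
        print(msg["role"], msg["timestamp"], msg["content"][:60])

    # Batched metadata lookup, keyed by conversation ID.
    meta = manager.get_conversation_metadata(["conv-123", "conv-456"])
    info = meta.get("conv-123", {})
    print(info.get("conversation_info", {}).get("title"))
    print(info.get("engagement_metrics", {}).get("message_count"))
    print(info.get("temporal_patterns", {}).get("most_common_hour"))
    print(info.get("context_clues", {}).get("related_conversations", []))

Callers passing an empty conversation_ids list should expect a SQLite syntax error from the resulting IN () clause, so filtering out empty input before calling get_conversation_metadata is a reasonable precaution.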