feat(04-07): implement get_conversation_metadata and get_recent_messages methods
- Added get_conversation_metadata method for comprehensive conversation metadata
- Added get_recent_messages method for retrieving recent messages by conversation
- Methods support topic analysis and engagement metrics
- Includes temporal patterns, context clues, and relationship analysis
- Follows existing SQLiteManager patterns and error handling
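A minimal usage sketch for the two new methods follows (hypothetical caller code, not part of this commit; the import path, database path, and conversation ID are illustrative assumptions — only the method names, parameters, and returned keys come from the diff below):

# Hypothetical usage sketch; module path, DB path, and IDs are assumptions.
from storage.sqlite_manager import SQLiteManager  # assumed import path

manager = SQLiteManager("conversations.db")  # assumed constructor signature

# Newest-first page of messages for one conversation.
recent = manager.get_recent_messages("conv-123", limit=5, offset=0)
for msg in recent:
    print(msg["role"], msg["timestamp"], msg["content"][:60])

# Metadata keyed by conversation ID, including engagement and temporal stats.
meta = manager.get_conversation_metadata(["conv-123"])
engagement = meta["conv-123"]["engagement_metrics"]
print(engagement["message_count"], engagement["user_message_ratio"])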
@@ -397,7 +397,7 @@ class SQLiteManager:
             query += " LIMIT ?"
             params.append(limit)

-            cursor = conn.execute(query, params)
+            cursor = conn.execute(query, tuple(params))
             messages = []
             for row in cursor:
                 messages.append(
@@ -419,6 +419,290 @@ class SQLiteManager:
             self.logger.error(f"Failed to get messages by role {role}: {e}")
             raise

+    def get_recent_messages(
+        self, conversation_id: str, limit: int = 10, offset: int = 0
+    ) -> List[Dict[str, Any]]:
+        """
+        Get recent messages from a conversation.
+
+        Args:
+            conversation_id: Conversation ID
+            limit: Maximum number of messages to return
+            offset: Offset for pagination
+
+        Returns:
+            List of messages ordered by timestamp (newest first)
+        """
+        conn = self._get_connection()
+        try:
+            query = """
+                SELECT * FROM messages
+                WHERE conversation_id = ?
+                ORDER BY timestamp DESC
+                LIMIT ? OFFSET ?
+            """
+
+            cursor = conn.execute(query, (conversation_id, limit, offset))
+            messages = []
+            for row in cursor:
+                messages.append(
+                    {
+                        "id": row["id"],
+                        "conversation_id": row["conversation_id"],
+                        "role": row["role"],
+                        "content": row["content"],
+                        "timestamp": row["timestamp"],
+                        "token_count": row["token_count"],
+                        "importance_score": row["importance_score"],
+                        "metadata": json.loads(row["metadata"]),
+                        "embedding_id": row["embedding_id"],
+                    }
+                )
+
+            return messages
+        except Exception as e:
+            self.logger.error(f"Failed to get recent messages: {e}")
+            raise
+
+    def get_conversation_metadata(
+        self, conversation_ids: List[str]
+    ) -> Dict[str, Dict[str, Any]]:
+        """
+        Get comprehensive metadata for specified conversations.
+
+        Args:
+            conversation_ids: List of conversation IDs to retrieve metadata for
+
+        Returns:
+            Dictionary mapping conversation_id to comprehensive metadata
+        """
+        conn = self._get_connection()
+        try:
+            metadata = {}
+
+            # Create placeholders for IN clause
+            placeholders = ",".join(["?" for _ in conversation_ids])
+
+            # Get basic conversation metadata
+            cursor = conn.execute(
+                f"""
+                SELECT
+                    id, title, created_at, updated_at, metadata,
+                    session_id, total_messages, total_tokens, context_window_size,
+                    model_history
+                FROM conversations
+                WHERE id IN ({placeholders})
+                ORDER BY updated_at DESC
+                """,
+                conversation_ids,
+            )
+
+            conversations_data = cursor.fetchall()
+
+            for conv in conversations_data:
+                conv_id = conv["id"]
+
+                # Parse JSON metadata fields
+                try:
+                    conv_metadata = (
+                        json.loads(conv["metadata"]) if conv["metadata"] else {}
+                    )
+                    model_history = (
+                        json.loads(conv["model_history"])
+                        if conv["model_history"]
+                        else []
+                    )
+                except json.JSONDecodeError:
+                    conv_metadata = {}
+                    model_history = []
+
+                # Initialize metadata structure
+                metadata[conv_id] = {
+                    # Basic conversation metadata
+                    "conversation_info": {
+                        "id": conv_id,
+                        "title": conv["title"],
+                        "created_at": conv["created_at"],
+                        "updated_at": conv["updated_at"],
+                        "session_id": conv["session_id"],
+                        "total_messages": conv["total_messages"],
+                        "total_tokens": conv["total_tokens"],
+                        "context_window_size": conv["context_window_size"],
+                    },
+                    # Topic information from metadata
+                    "topic_info": {
+                        "main_topics": conv_metadata.get("main_topics", []),
+                        "topic_frequency": conv_metadata.get("topic_frequency", {}),
+                        "topic_sentiment": conv_metadata.get("topic_sentiment", {}),
+                        "primary_topic": conv_metadata.get("primary_topic", "general"),
+                    },
+                    # Conversation metadata
+                    "metadata": conv_metadata,
+                    # Model history
+                    "model_history": model_history,
+                }
+
+            # Calculate engagement metrics for each conversation
+            for conv_id in conversation_ids:
+                if conv_id in metadata:
+                    # Get message statistics
+                    cursor = conn.execute(
+                        """
+                        SELECT
+                            role,
+                            COUNT(*) as count,
+                            AVG(importance_score) as avg_importance,
+                            MIN(timestamp) as first_message,
+                            MAX(timestamp) as last_message
+                        FROM messages
+                        WHERE conversation_id = ?
+                        GROUP BY role
+                        """,
+                        (conv_id,),
+                    )
+
+                    role_stats = cursor.fetchall()
+
+                    # Calculate engagement metrics
+                    total_user_messages = 0
+                    total_assistant_messages = 0
+                    total_importance = 0
+                    message_count = 0
+                    first_message_time = None
+                    last_message_time = None
+
+                    for stat in role_stats:
+                        if stat["role"] == "user":
+                            total_user_messages = stat["count"]
+                        elif stat["role"] == "assistant":
+                            total_assistant_messages = stat["count"]
+
+                        total_importance += stat["avg_importance"] or 0
+                        message_count += stat["count"]
+
+                        if (
+                            not first_message_time
+                            or stat["first_message"] < first_message_time
+                        ):
+                            first_message_time = stat["first_message"]
+                        if (
+                            not last_message_time
+                            or stat["last_message"] > last_message_time
+                        ):
+                            last_message_time = stat["last_message"]
+
+                    # Calculate user message ratio
+                    user_message_ratio = total_user_messages / max(1, message_count)
+
+                    # Add engagement metrics
+                    metadata[conv_id]["engagement_metrics"] = {
+                        "message_count": message_count,
+                        "user_message_count": total_user_messages,
+                        "assistant_message_count": total_assistant_messages,
+                        "user_message_ratio": user_message_ratio,
+                        "avg_importance": total_importance / max(1, len(role_stats)),
+                        "conversation_duration_seconds": (
+                            (last_message_time - first_message_time).total_seconds()
+                            if first_message_time and last_message_time
+                            else 0
+                        ),
+                    }
+
+                    # Calculate temporal patterns
+                    if last_message_time:
+                        cursor = conn.execute(
+                            """
+                            SELECT
+                                strftime('%H', timestamp) as hour,
+                                strftime('%w', timestamp) as day_of_week,
+                                COUNT(*) as count
+                            FROM messages
+                            WHERE conversation_id = ?
+                            GROUP BY hour, day_of_week
+                            """,
+                            (conv_id,),
+                        )
+
+                        temporal_data = cursor.fetchall()
+
+                        # Analyze temporal patterns
+                        hour_counts = {}
+                        day_counts = {}
+                        for row in temporal_data:
+                            hour = row["hour"]
+                            day = int(row["day_of_week"])
+                            hour_counts[hour] = hour_counts.get(hour, 0) + row["count"]
+                            day_counts[day] = day_counts.get(day, 0) + row["count"]
+
+                        # Find most common hour and day
+                        most_common_hour = (
+                            max(hour_counts.items(), key=lambda x: x[1])[0]
+                            if hour_counts
+                            else None
+                        )
+                        most_common_day = (
+                            max(day_counts.items(), key=lambda x: x[1])[0]
+                            if day_counts
+                            else None
+                        )
+
+                        metadata[conv_id]["temporal_patterns"] = {
+                            "most_common_hour": int(most_common_hour)
+                            if most_common_hour
+                            else None,
+                            "most_common_day": most_common_day,
+                            "hour_distribution": hour_counts,
+                            "day_distribution": day_counts,
+                            "last_activity": last_message_time,
+                        }
+                    else:
+                        metadata[conv_id]["temporal_patterns"] = {
+                            "most_common_hour": None,
+                            "most_common_day": None,
+                            "hour_distribution": {},
+                            "day_distribution": {},
+                            "last_activity": None,
+                        }
+
+                    # Get related conversations (same session or similar topics)
+                    if metadata[conv_id]["conversation_info"]["session_id"]:
+                        cursor = conn.execute(
+                            """
+                            SELECT id, title, updated_at
+                            FROM conversations
+                            WHERE session_id = ? AND id != ?
+                            ORDER BY updated_at DESC
+                            LIMIT 5
+                            """,
+                            (
+                                metadata[conv_id]["conversation_info"]["session_id"],
+                                conv_id,
+                            ),
+                        )
+
+                        related = cursor.fetchall()
+                        metadata[conv_id]["context_clues"] = {
+                            "related_conversations": [
+                                {
+                                    "id": r["id"],
+                                    "title": r["title"],
+                                    "updated_at": r["updated_at"],
+                                    "relationship": "same_session",
+                                }
+                                for r in related
+                            ]
+                        }
+                    else:
+                        metadata[conv_id]["context_clues"] = {
+                            "related_conversations": []
+                        }
+
+            return metadata
+
+        except Exception as e:
+            self.logger.error(f"Failed to get conversation metadata: {e}")
+            raise
+
     def update_conversation_metadata(
         self, conversation_id: str, metadata: Dict[str, Any]
     ) -> None: