feat(04-03): create JSON archival and smart retention systems

- Added ArchivalManager for JSON export/import with gzip compression
- Implemented organized directory structure by year/month
- Added batch archival operations and restore functionality
- Created RetentionPolicy with importance-based scoring
- Smart retention considers engagement, topics, and user-marked importance
- MemoryManager integrates compression and archival automatically (usage sketch below)
- Added automatic compression triggering and archival scheduling
- Added comprehensive archival statistics and retention recommendations
- Added support for backup integration and restore verification
Mai Development
2026-01-27 23:56:49 -05:00
parent 017df5466d
commit 8c58b1d070
4 changed files with 1285 additions and 1 deletion
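A minimal usage sketch of the new MemoryManager surface added in this commit. The constructor arguments are an assumption (only self.db_path appears in this diff); the method names and returned keys come from the hunks below.

from memory import MemoryManager

# Constructor arguments are an assumption; the diff only shows self.db_path being used.
manager = MemoryManager(db_path="memory.db")
manager.initialize()

# Compress by age and write a gzipped JSON archive for one conversation.
archive_path = manager.archive_conversation("conv-1234")
print(f"archived to: {archive_path}")

# Ask the retention policy what it would do with recent conversations.
for rec in manager.get_retention_recommendations(limit=50):
    print(rec["conversation_id"], rec["importance_score"], rec["recommended_level"])

manager.close()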

View File

@@ -7,9 +7,12 @@ messages, and associated vector embeddings for semantic search capabilities.
from .storage.sqlite_manager import SQLiteManager
from .storage.vector_store import VectorStore
from .storage.compression import CompressionEngine
from .retrieval.semantic_search import SemanticSearch
from .retrieval.context_aware import ContextAwareSearch
from .retrieval.timeline_search import TimelineSearch
from .backup.archival import ArchivalManager
from .backup.retention import RetentionPolicy
from typing import Optional, List, Dict, Any, Union
from datetime import datetime
@@ -37,6 +40,9 @@ class MemoryManager:
self._semantic_search: Optional[SemanticSearch] = None
self._context_aware_search: Optional[ContextAwareSearch] = None
self._timeline_search: Optional[TimelineSearch] = None
self._compression_engine: Optional[CompressionEngine] = None
self._archival_manager: Optional[ArchivalManager] = None
self._retention_policy: Optional[RetentionPolicy] = None
self.logger = logging.getLogger(__name__)
def initialize(self) -> None:
@@ -55,8 +61,15 @@ class MemoryManager:
self._context_aware_search = ContextAwareSearch(self._sqlite_manager)
self._timeline_search = TimelineSearch(self._sqlite_manager)
# Initialize archival components
self._compression_engine = CompressionEngine()
self._archival_manager = ArchivalManager(
compression_engine=self._compression_engine
)
self._retention_policy = RetentionPolicy(self._sqlite_manager)
self.logger.info(
f"Enhanced memory manager initialized with database: {self.db_path}"
f"Enhanced memory manager initialized with archival: {self.db_path}"
)
except Exception as e:
self.logger.error(f"Failed to initialize enhanced memory manager: {e}")
@@ -107,6 +120,289 @@ class MemoryManager:
)
return self._timeline_search
@property
def compression_engine(self) -> CompressionEngine:
"""Get compression engine instance."""
if self._compression_engine is None:
raise RuntimeError(
"Memory manager not initialized. Call initialize() first."
)
return self._compression_engine
@property
def archival_manager(self) -> ArchivalManager:
"""Get archival manager instance."""
if self._archival_manager is None:
raise RuntimeError(
"Memory manager not initialized. Call initialize() first."
)
return self._archival_manager
@property
def retention_policy(self) -> RetentionPolicy:
"""Get retention policy instance."""
if self._retention_policy is None:
raise RuntimeError(
"Memory manager not initialized. Call initialize() first."
)
return self._retention_policy
# Archival methods
def compress_conversation(self, conversation_id: str) -> Optional[Dict[str, Any]]:
"""
Compress a conversation based on its age.
Args:
conversation_id: ID of conversation to compress
Returns:
Compressed conversation data or None if not found
"""
if not self._is_initialized():
raise RuntimeError("Memory manager not initialized")
try:
conversation = self._sqlite_manager.get_conversation(
conversation_id, include_messages=True
)
if not conversation:
self.logger.error(
f"Conversation {conversation_id} not found for compression"
)
return None
compressed = self._compression_engine.compress_by_age(conversation)
return {
"original_conversation": conversation,
"compressed_conversation": compressed,
"compression_applied": True,
}
except Exception as e:
self.logger.error(f"Failed to compress conversation {conversation_id}: {e}")
return None
def archive_conversation(self, conversation_id: str) -> Optional[str]:
"""
Archive a conversation to JSON file.
Args:
conversation_id: ID of conversation to archive
Returns:
Path to archived file or None if failed
"""
if not self._is_initialized():
raise RuntimeError("Memory manager not initialized")
try:
conversation = self._sqlite_manager.get_conversation(
conversation_id, include_messages=True
)
if not conversation:
self.logger.error(
f"Conversation {conversation_id} not found for archival"
)
return None
compressed = self._compression_engine.compress_by_age(conversation)
archive_path = self._archival_manager.archive_conversation(
conversation, compressed
)
return archive_path
except Exception as e:
self.logger.error(f"Failed to archive conversation {conversation_id}: {e}")
return None
def get_retention_recommendations(self, limit: int = 100) -> List[Dict[str, Any]]:
"""
Get retention recommendations for recent conversations.
Args:
limit: Number of conversations to analyze
Returns:
List of retention recommendations
"""
if not self._is_initialized():
raise RuntimeError("Memory manager not initialized")
try:
recent_conversations = self._sqlite_manager.get_recent_conversations(
limit=limit
)
full_conversations = []
for conv_data in recent_conversations:
full_conv = self._sqlite_manager.get_conversation(
conv_data["id"], include_messages=True
)
if full_conv:
full_conversations.append(full_conv)
return self._retention_policy.get_retention_recommendations(
full_conversations
)
except Exception as e:
self.logger.error(f"Failed to get retention recommendations: {e}")
return []
def trigger_automatic_compression(self, days_threshold: int = 30) -> Dict[str, Any]:
"""
Automatically compress conversations older than threshold.
Args:
days_threshold: Age in days to trigger compression
Returns:
Dictionary with compression results
"""
if not self._is_initialized():
raise RuntimeError("Memory manager not initialized")
try:
recent_conversations = self._sqlite_manager.get_recent_conversations(
limit=1000
)
compressed_count = 0
archived_count = 0
total_space_saved = 0
errors = []
from datetime import datetime, timedelta
for conv_data in recent_conversations:
try:
# Check conversation age
created_at = conv_data.get("created_at")
if created_at:
conv_date = datetime.fromisoformat(created_at)
age_days = (datetime.now() - conv_date).days
if age_days >= days_threshold:
# Get full conversation data
full_conv = self._sqlite_manager.get_conversation(
conv_data["id"], include_messages=True
)
if full_conv:
# Check retention policy
importance_score = (
self._retention_policy.calculate_importance_score(
full_conv
)
)
should_compress, level = (
self._retention_policy.should_retain_compressed(
full_conv, importance_score
)
)
if should_compress:
compressed = (
self._compression_engine.compress_by_age(
full_conv
)
)
# Calculate space saved
original_size = len(str(full_conv))
compressed_size = len(str(compressed))
space_saved = original_size - compressed_size
total_space_saved += space_saved
# Archive the compressed version
archive_path = (
self._archival_manager.archive_conversation(
full_conv, compressed
)
)
if archive_path:
archived_count += 1
compressed_count += 1
else:
errors.append(
f"Failed to archive conversation {conv_data['id']}"
)
else:
self.logger.debug(
f"Conversation {conv_data['id']} marked to retain full"
)
except Exception as e:
errors.append(
f"Error processing {conv_data.get('id', 'unknown')}: {e}"
)
continue
return {
"total_processed": len(recent_conversations),
"compressed_count": compressed_count,
"archived_count": archived_count,
"total_space_saved_bytes": total_space_saved,
"total_space_saved_mb": round(total_space_saved / (1024 * 1024), 2),
"errors": errors,
"threshold_days": days_threshold,
}
except Exception as e:
self.logger.error(f"Failed automatic compression: {e}")
return {"error": str(e), "compressed_count": 0, "archived_count": 0}
def get_archival_stats(self) -> Dict[str, Any]:
"""
Get archival statistics.
Returns:
Dictionary with archival statistics
"""
if not self._is_initialized():
raise RuntimeError("Memory manager not initialized")
try:
archive_stats = self._archival_manager.get_archive_stats()
retention_stats = self._retention_policy.get_retention_stats()
db_stats = self._sqlite_manager.get_database_stats()
return {
"archive": archive_stats,
"retention": retention_stats,
"database": db_stats,
"compression_ratio": self._calculate_overall_compression_ratio(),
}
except Exception as e:
self.logger.error(f"Failed to get archival stats: {e}")
return {}
def _calculate_overall_compression_ratio(self) -> float:
"""Calculate overall compression ratio across all data."""
try:
archive_stats = self._archival_manager.get_archive_stats()
if not archive_stats or "total_archive_size_bytes" not in archive_stats:
return 0.0
db_stats = self._sqlite_manager.get_database_stats()
total_db_size = db_stats.get("database_size_bytes", 0)
total_archive_size = archive_stats.get("total_archive_size_bytes", 0)
total_original_size = total_db_size + total_archive_size
if total_original_size == 0:
return 0.0
return (
(total_db_size / total_original_size)
if total_original_size > 0
else 0.0
)
except Exception as e:
self.logger.error(f"Failed to calculate compression ratio: {e}")
return 0.0
# Legacy methods for compatibility
def close(self) -> None:
"""Close database connections."""
@@ -314,6 +610,9 @@ class MemoryManager:
and self._semantic_search is not None
and self._context_aware_search is not None
and self._timeline_search is not None
and self._compression_engine is not None
and self._archival_manager is not None
and self._retention_policy is not None
)
@@ -322,7 +621,10 @@ __all__ = [
"MemoryManager",
"SQLiteManager",
"VectorStore",
"CompressionEngine",
"SemanticSearch",
"ContextAwareSearch",
"TimelineSearch",
"ArchivalManager",
"RetentionPolicy",
]
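The batch path wraps the same machinery. A hedged sketch of driving trigger_automatic_compression and reading its result dictionary; the field names are taken from the return statement in the hunk above, and the constructor arguments remain an assumption.

from memory import MemoryManager

manager = MemoryManager(db_path="memory.db")  # constructor args assumed, as in the sketch above
manager.initialize()

result = manager.trigger_automatic_compression(days_threshold=30)
print(f"processed:  {result.get('total_processed', 0)}")
print(f"compressed: {result.get('compressed_count', 0)}")
print(f"archived:   {result.get('archived_count', 0)}")
print(f"saved (MB): {result.get('total_space_saved_mb', 0.0)}")
for err in result.get("errors", []):
    print("error:", err)

# Aggregate view: archive directory, retention policy, database, and overall ratio.
stats = manager.get_archival_stats()
print(stats.get("compression_ratio"))
manager.close()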

View File

@@ -0,0 +1,11 @@
"""
Memory backup and archival subsystem.
This package provides conversation archival, retention policies,
and long-term storage management for the memory system.
"""
from .archival import ArchivalManager
from .retention import RetentionPolicy
__all__ = ["ArchivalManager", "RetentionPolicy"]

View File

@@ -0,0 +1,431 @@
"""
JSON archival system for long-term conversation storage.
Provides export/import functionality for compressed conversations
with organized directory structure and version compatibility.
"""
import json
import os
import shutil
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Iterator
from pathlib import Path
import gzip
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))
from memory.storage.compression import CompressionEngine, CompressedConversation
class ArchivalManager:
"""
JSON archival manager for compressed conversations.
Handles export/import of conversations with organized directory
structure and version compatibility for future upgrades.
"""
ARCHIVAL_VERSION = "1.0"
def __init__(
self,
archival_root: str = "archive",
compression_engine: Optional[CompressionEngine] = None,
):
"""
Initialize archival manager.
Args:
archival_root: Root directory for archived conversations
compression_engine: Optional compression engine instance
"""
self.archival_root = Path(archival_root)
self.archival_root.mkdir(exist_ok=True)
self.logger = logging.getLogger(__name__)
self.compression_engine = compression_engine or CompressionEngine()
# Create archive directory structure
self._initialize_directory_structure()
def _initialize_directory_structure(self) -> None:
"""Create standard archive directory structure."""
# Year/month structure: archive/YYYY/MM/
for year_dir in self.archival_root.iterdir():
if year_dir.is_dir() and year_dir.name.isdigit():
for month in range(1, 13):
month_dir = year_dir / f"{month:02d}"
month_dir.mkdir(exist_ok=True)
self.logger.debug(
f"Archive directory structure initialized: {self.archival_root}"
)
def _get_archive_path(self, conversation_date: datetime) -> Path:
"""
Get archive path for a conversation date.
Args:
conversation_date: Date of the conversation
Returns:
Path where conversation should be archived
"""
year_dir = self.archival_root / str(conversation_date.year)
month_dir = year_dir / f"{conversation_date.month:02d}"
# Create directories if they don't exist
year_dir.mkdir(exist_ok=True)
month_dir.mkdir(exist_ok=True)
return month_dir
def archive_conversation(
self, conversation: Dict[str, Any], compressed: CompressedConversation
) -> str:
"""
Archive a conversation to JSON file.
Args:
conversation: Original conversation data
compressed: Compressed conversation data
Returns:
Path to archived file
"""
try:
# Get archive path based on conversation date
conv_date = datetime.fromisoformat(
conversation.get("created_at", datetime.now().isoformat())
)
archive_path = self._get_archive_path(conv_date)
# Create filename
timestamp = conv_date.strftime("%Y%m%d_%H%M%S")
safe_title = "".join(
c
for c in conversation.get("title", "untitled")
if c.isalnum() or c in "-_"
)[:50]
filename = f"{timestamp}_{safe_title}_{conversation.get('id', 'unknown')[:8]}.json.gz"
file_path = archive_path / filename
# Prepare archival data
archival_data = {
"version": self.ARCHIVAL_VERSION,
"archived_at": datetime.now().isoformat(),
"original_conversation": conversation,
"compressed_conversation": {
"original_id": compressed.original_id,
"compression_level": compressed.compression_level.value,
"compressed_at": compressed.compressed_at.isoformat(),
"original_created_at": compressed.original_created_at.isoformat(),
"content": compressed.content,
"metadata": compressed.metadata,
"metrics": {
"original_length": compressed.metrics.original_length,
"compressed_length": compressed.metrics.compressed_length,
"compression_ratio": compressed.metrics.compression_ratio,
"information_retention_score": compressed.metrics.information_retention_score,
"quality_score": compressed.metrics.quality_score,
},
},
}
# Write compressed JSON file
with gzip.open(file_path, "wt", encoding="utf-8") as f:
json.dump(archival_data, f, indent=2, ensure_ascii=False)
self.logger.info(
f"Archived conversation {conversation.get('id')} to {file_path}"
)
return str(file_path)
except Exception as e:
self.logger.error(
f"Failed to archive conversation {conversation.get('id')}: {e}"
)
raise
def archive_conversations_batch(
self, conversations: List[Dict[str, Any]], compress: bool = True
) -> List[str]:
"""
Archive multiple conversations efficiently.
Args:
conversations: List of conversations to archive
compress: Whether to compress conversations before archiving
Returns:
List of archived file paths
"""
archived_paths = []
for conversation in conversations:
try:
# Compress if requested
if compress:
compressed = self.compression_engine.compress_by_age(conversation)
else:
# Create uncompressed version
from memory.storage.compression import (
CompressionLevel,
CompressedConversation,
CompressionMetrics,
)
from datetime import datetime
compressed = CompressedConversation(
original_id=conversation.get("id", "unknown"),
compression_level=CompressionLevel.FULL,
compressed_at=datetime.now(),
original_created_at=datetime.fromisoformat(
conversation.get("created_at", datetime.now().isoformat())
),
content=conversation,
metadata={"uncompressed": True},
metrics=CompressionMetrics(
original_length=len(json.dumps(conversation)),
compressed_length=len(json.dumps(conversation)),
compression_ratio=1.0,
information_retention_score=1.0,
quality_score=1.0,
),
)
path = self.archive_conversation(conversation, compressed)
archived_paths.append(path)
except Exception as e:
self.logger.error(
f"Failed to archive conversation {conversation.get('id', 'unknown')}: {e}"
)
continue
self.logger.info(
f"Archived {len(archived_paths)}/{len(conversations)} conversations"
)
return archived_paths
def restore_conversation(self, archive_path: str) -> Optional[Dict[str, Any]]:
"""
Restore a conversation from archive.
Args:
archive_path: Path to archived file
Returns:
Restored conversation data or None if failed
"""
try:
archive_file = Path(archive_path)
if not archive_file.exists():
self.logger.error(f"Archive file not found: {archive_path}")
return None
# Read and decompress archive file
with gzip.open(archive_file, "rt", encoding="utf-8") as f:
archival_data = json.load(f)
# Verify version compatibility
version = archival_data.get("version", "unknown")
if version != self.ARCHIVAL_VERSION:
self.logger.warning(
f"Archive version {version} may not be compatible with current version {self.ARCHIVAL_VERSION}"
)
# Return the original conversation (or decompressed version if preferred)
original_conversation = archival_data.get("original_conversation")
compressed_info = archival_data.get("compressed_conversation", {})
# Add archival metadata to conversation
original_conversation["_archival_info"] = {
"archived_at": archival_data.get("archived_at"),
"archive_path": str(archive_file),
"compression_level": compressed_info.get("compression_level"),
"compression_ratio": compressed_info.get("metrics", {}).get(
"compression_ratio", 1.0
),
"version": version,
}
self.logger.info(f"Restored conversation from {archive_path}")
return original_conversation
except Exception as e:
self.logger.error(
f"Failed to restore conversation from {archive_path}: {e}"
)
return None
def list_archived(
self,
year: Optional[int] = None,
month: Optional[int] = None,
include_content: bool = False,
) -> List[Dict[str, Any]]:
"""
List archived conversations with optional filtering.
Args:
year: Optional year filter
month: Optional month filter (1-12)
include_content: Whether to include conversation content
Returns:
List of archived conversation info
"""
archived_list = []
try:
# Determine search path
search_path = self.archival_root
if year:
search_path = search_path / str(year)
if month:
search_path = search_path / f"{month:02d}"
if not search_path.exists():
return []
# Scan for archive files
for archive_file in search_path.rglob("*.json.gz"):
try:
# Read minimal metadata without loading full content
with gzip.open(archive_file, "rt", encoding="utf-8") as f:
archival_data = json.load(f)
conversation = archival_data.get("original_conversation", {})
compressed = archival_data.get("compressed_conversation", {})
archive_info = {
"id": conversation.get("id"),
"title": conversation.get("title"),
"created_at": conversation.get("created_at"),
"archived_at": archival_data.get("archived_at"),
"archive_path": str(archive_file),
"compression_level": compressed.get("compression_level"),
"compression_ratio": compressed.get("metrics", {}).get(
"compression_ratio", 1.0
),
"version": archival_data.get("version"),
}
if include_content:
archive_info["original_conversation"] = conversation
archive_info["compressed_conversation"] = compressed
archived_list.append(archive_info)
except Exception as e:
self.logger.error(
f"Failed to read archive file {archive_file}: {e}"
)
continue
# Sort by archived date (newest first)
archived_list.sort(key=lambda x: x.get("archived_at", ""), reverse=True)
return archived_list
except Exception as e:
self.logger.error(f"Failed to list archived conversations: {e}")
return []
def delete_archive(self, archive_path: str) -> bool:
"""
Delete an archived conversation.
Args:
archive_path: Path to archived file
Returns:
True if deleted successfully, False otherwise
"""
try:
archive_file = Path(archive_path)
if archive_file.exists():
archive_file.unlink()
self.logger.info(f"Deleted archive: {archive_path}")
return True
else:
self.logger.warning(f"Archive file not found: {archive_path}")
return False
except Exception as e:
self.logger.error(f"Failed to delete archive {archive_path}: {e}")
return False
def get_archive_stats(self) -> Dict[str, Any]:
"""
Get statistics about archived conversations.
Returns:
Dictionary with archive statistics
"""
try:
total_files = 0
total_size = 0
compression_levels = {}
years = set()
for archive_file in self.archival_root.rglob("*.json.gz"):
try:
total_files += 1
total_size += archive_file.stat().st_size
# Extract year from path
path_parts = archive_file.parts
for i, part in enumerate(path_parts):
if part == str(self.archival_root.name) and i + 1 < len(
path_parts
):
year_part = path_parts[i + 1]
if year_part.isdigit():
years.add(year_part)
break
# Read compression level without loading full content
with gzip.open(archive_file, "rt", encoding="utf-8") as f:
archival_data = json.load(f)
compressed = archival_data.get("compressed_conversation", {})
level = compressed.get("compression_level", "unknown")
compression_levels[level] = compression_levels.get(level, 0) + 1
except Exception as e:
self.logger.error(
f"Failed to analyze archive file {archive_file}: {e}"
)
continue
return {
"total_archived_conversations": total_files,
"total_archive_size_bytes": total_size,
"total_archive_size_mb": round(total_size / (1024 * 1024), 2),
"compression_levels": compression_levels,
"years_with_archives": sorted(list(years)),
"archive_directory": str(self.archival_root),
}
except Exception as e:
self.logger.error(f"Failed to get archive stats: {e}")
return {}
def migrate_archives(self, from_version: str, to_version: str) -> int:
"""
Migrate archives from one version to another.
Args:
from_version: Source version
to_version: Target version
Returns:
Number of archives migrated
"""
# Placeholder for future migration functionality
self.logger.info(
f"Migration from {from_version} to {to_version} not yet implemented"
)
return 0
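A sketch of standalone ArchivalManager use. The conversation dict shape is inferred from the fields this module reads (id, title, created_at, messages); CompressionEngine is not part of this diff and may expect additional fields.

from memory.backup.archival import ArchivalManager

archiver = ArchivalManager(archival_root="archive")

# Conversation shape is an assumption based on the keys read above.
conversations = [
    {
        "id": "conv-0001",
        "title": "API design notes",
        "created_at": "2026-01-15T10:30:00",
        "messages": [
            {"role": "user", "content": "How should we version the API?",
             "timestamp": "2026-01-15T10:30:00"},
            {"role": "assistant", "content": "Pin the version in the URL path.",
             "timestamp": "2026-01-15T10:31:00"},
        ],
    },
]

# Compresses by age, then writes archive/2026/01/<timestamp>_<title>_<id>.json.gz per conversation.
paths = archiver.archive_conversations_batch(conversations, compress=True)

# Browse January 2026 and round-trip the first entry.
for info in archiver.list_archived(year=2026, month=1):
    print(info["archive_path"], info["compression_level"])
restored = archiver.restore_conversation(paths[0]) if paths else None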

View File

@@ -0,0 +1,540 @@
"""
Smart retention policies for conversation preservation.
Implements value-based retention scoring that keeps important
conversations longer while efficiently managing storage usage.
"""
import logging
import re
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
from collections import defaultdict
import statistics
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))
from memory.storage.sqlite_manager import SQLiteManager
class RetentionPolicy:
"""
Smart retention policy engine.
Calculates conversation importance scores and determines
which conversations should be retained or compressed.
"""
def __init__(self, sqlite_manager: SQLiteManager):
"""
Initialize retention policy.
Args:
sqlite_manager: SQLite manager instance for data access
"""
self.db_manager = sqlite_manager
self.logger = logging.getLogger(__name__)
# Retention policy parameters
self.important_threshold = 0.7 # Above this = retain full
self.preserve_threshold = 0.4 # Above this = lighter compression
self.user_marked_multiplier = 1.5 # Boost for user-marked important
# Engagement scoring weights
self.weights = {
"message_count": 0.2, # More messages = higher engagement
"response_quality": 0.25, # Back-and-forth conversation
"topic_diversity": 0.15, # Multiple topics = important
"time_span": 0.1, # Longer duration = important
"user_marked": 0.2, # User explicitly marked important
"question_density": 0.1, # Questions = seeking information
}
def calculate_importance_score(self, conversation: Dict[str, Any]) -> float:
"""
Calculate importance score for a conversation.
Args:
conversation: Conversation data with messages and metadata
Returns:
Importance score between 0.0 and 1.0
"""
try:
messages = conversation.get("messages", [])
if not messages:
return 0.0
# Extract basic metrics
message_count = len(messages)
user_messages = [m for m in messages if m["role"] == "user"]
assistant_messages = [m for m in messages if m["role"] == "assistant"]
# Calculate engagement metrics
scores = {}
# 1. Message count score (normalized)
scores["message_count"] = min(
message_count / 20, 1.0
) # 20 messages = full score
# 2. Response quality (back-and-forth ratio)
if len(user_messages) > 0 and len(assistant_messages) > 0:
ratio = min(len(assistant_messages), len(user_messages)) / max(
len(assistant_messages), len(user_messages)
)
scores["response_quality"] = ratio # Close to 1.0 = good conversation
else:
scores["response_quality"] = 0.5
# 3. Topic diversity (variety in content)
scores["topic_diversity"] = self._calculate_topic_diversity(messages)
# 4. Time span (conversation duration)
scores["time_span"] = self._calculate_time_span_score(messages)
# 5. User marked important
metadata = conversation.get("metadata", {})
user_marked = metadata.get("user_marked_important", False)
scores["user_marked"] = self.user_marked_multiplier if user_marked else 1.0
# 6. Question density (information seeking)
scores["question_density"] = self._calculate_question_density(user_messages)
# Calculate weighted final score
final_score = 0.0
for factor, weight in self.weights.items():
final_score += scores.get(factor, 0.0) * weight
# Normalize to 0-1 range
final_score = max(0.0, min(1.0, final_score))
self.logger.debug(
f"Importance score for {conversation.get('id')}: {final_score:.3f}"
)
return final_score
except Exception as e:
self.logger.error(f"Failed to calculate importance score: {e}")
return 0.5 # Default to neutral
def _calculate_topic_diversity(self, messages: List[Dict[str, Any]]) -> float:
"""Calculate topic diversity score from messages."""
try:
# Simple topic-based diversity using keyword categories
topic_keywords = {
"technical": [
"code",
"programming",
"algorithm",
"function",
"bug",
"debug",
"api",
"database",
],
"personal": [
"feel",
"think",
"opinion",
"prefer",
"like",
"personal",
"life",
],
"work": [
"project",
"task",
"deadline",
"meeting",
"team",
"work",
"job",
],
"learning": [
"learn",
"study",
"understand",
"explain",
"tutorial",
"help",
],
"planning": ["plan", "schedule", "organize", "goal", "strategy"],
"creative": ["design", "create", "write", "art", "music", "story"],
}
topic_counts = defaultdict(int)
total_content = ""
for message in messages:
if message["role"] in ["user", "assistant"]:
content = message["content"].lower()
total_content += content + " "
# Count topic occurrences
for topic, keywords in topic_keywords.items():
for keyword in keywords:
if keyword in content:
topic_counts[topic] += 1
# Diversity = number of topics with significant presence
significant_topics = sum(1 for count in topic_counts.values() if count >= 2)
diversity_score = min(significant_topics / len(topic_keywords), 1.0)
return diversity_score
except Exception as e:
self.logger.error(f"Failed to calculate topic diversity: {e}")
return 0.5
def _calculate_time_span_score(self, messages: List[Dict[str, Any]]) -> float:
"""Calculate time span score based on conversation duration."""
try:
timestamps = []
for message in messages:
if "timestamp" in message:
try:
ts = datetime.fromisoformat(message["timestamp"])
timestamps.append(ts)
except (ValueError, TypeError):
continue
if len(timestamps) < 2:
return 0.1 # Very short conversation
duration = max(timestamps) - min(timestamps)
duration_hours = duration.total_seconds() / 3600
# Score based on duration (24 hours = full score)
return min(duration_hours / 24, 1.0)
except Exception as e:
self.logger.error(f"Failed to calculate time span: {e}")
return 0.5
def _calculate_question_density(self, user_messages: List[Dict[str, Any]]) -> float:
"""Calculate question density from user messages."""
try:
if not user_messages:
return 0.0
question_count = 0
total_words = 0
for message in user_messages:
content = message["content"]
# Count questions
question_marks = content.count("?")
question_words = len(
re.findall(
r"\b(how|what|when|where|why|which|who|can|could|would|should|is|are|do|does)\b",
content,
re.IGNORECASE,
)
)
question_count += question_marks + question_words
# Count words
words = len(content.split())
total_words += words
if total_words == 0:
return 0.0
question_ratio = question_count / total_words
return min(question_ratio * 5, 1.0) # Normalize
except Exception as e:
self.logger.error(f"Failed to calculate question density: {e}")
return 0.5
def should_retain_full(
self, conversation: Dict[str, Any], importance_score: Optional[float] = None
) -> bool:
"""
Determine if conversation should be retained in full form.
Args:
conversation: Conversation data
importance_score: Pre-calculated importance score (optional)
Returns:
True if conversation should be retained full
"""
if importance_score is None:
importance_score = self.calculate_importance_score(conversation)
# User explicitly marked important always retained
metadata = conversation.get("metadata", {})
if metadata.get("user_marked_important", False):
return True
# High importance score
if importance_score >= self.important_threshold:
return True
# Recent important conversations (within 30 days)
created_at = conversation.get("created_at")
if created_at:
try:
conv_date = datetime.fromisoformat(created_at)
if (datetime.now() - conv_date).days <= 30 and importance_score >= 0.5:
return True
except (ValueError, TypeError):
pass
return False
def should_retain_compressed(
self, conversation: Dict[str, Any], importance_score: Optional[float] = None
) -> Tuple[bool, str]:
"""
Determine if conversation should be compressed and to what level.
Args:
conversation: Conversation data
importance_score: Pre-calculated importance score (optional)
Returns:
Tuple of (should_compress, recommended_compression_level)
"""
if importance_score is None:
importance_score = self.calculate_importance_score(conversation)
# Check if should retain full
if self.should_retain_full(conversation, importance_score):
return False, "full"
# Determine compression level based on importance
if importance_score >= self.preserve_threshold:
# Important: lighter compression (key points)
return True, "key_points"
elif importance_score >= 0.2:
# Moderately important: summary compression
return True, "summary"
else:
# Low importance: metadata only
return True, "metadata"
def update_retention_policy(self, policy_settings: Dict[str, Any]) -> None:
"""
Update retention policy parameters.
Args:
policy_settings: Dictionary of policy parameter updates
"""
try:
if "important_threshold" in policy_settings:
self.important_threshold = float(policy_settings["important_threshold"])
if "preserve_threshold" in policy_settings:
self.preserve_threshold = float(policy_settings["preserve_threshold"])
if "user_marked_multiplier" in policy_settings:
self.user_marked_multiplier = float(
policy_settings["user_marked_multiplier"]
)
if "weights" in policy_settings:
self.weights.update(policy_settings["weights"])
self.logger.info(f"Updated retention policy: {policy_settings}")
except Exception as e:
self.logger.error(f"Failed to update retention policy: {e}")
def get_retention_recommendations(
self, conversations: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Get retention recommendations for multiple conversations.
Args:
conversations: List of conversations to analyze
Returns:
List of recommendations with scores and actions
"""
recommendations = []
for conversation in conversations:
try:
importance_score = self.calculate_importance_score(conversation)
should_compress, compression_level = self.should_retain_compressed(
conversation, importance_score
)
recommendation = {
"conversation_id": conversation.get("id"),
"title": conversation.get("title"),
"created_at": conversation.get("created_at"),
"importance_score": importance_score,
"should_compress": should_compress,
"recommended_level": compression_level,
"user_marked_important": conversation.get("metadata", {}).get(
"user_marked_important", False
),
"message_count": len(conversation.get("messages", [])),
"retention_reason": self._get_retention_reason(
importance_score, compression_level
),
}
recommendations.append(recommendation)
except Exception as e:
self.logger.error(
f"Failed to analyze conversation {conversation.get('id')}: {e}"
)
continue
# Sort by importance score (highest first)
recommendations.sort(key=lambda x: x["importance_score"], reverse=True)
return recommendations
def _get_retention_reason(
self, importance_score: float, compression_level: str
) -> str:
"""Get human-readable reason for retention decision."""
if compression_level == "full":
if importance_score >= self.important_threshold:
return "High importance - retained full"
else:
return "Recent conversation - retained full"
elif compression_level == "key_points":
return f"Moderate importance ({importance_score:.2f}) - key points retained"
elif compression_level == "summary":
return f"Standard importance ({importance_score:.2f}) - summary compression"
else:
return f"Low importance ({importance_score:.2f}) - metadata only"
def mark_conversation_important(
self, conversation_id: str, important: bool = True
) -> bool:
"""
Mark a conversation as user-important.
Args:
conversation_id: ID of conversation to mark
important: Whether to mark as important (True) or not important (False)
Returns:
True if marked successfully
"""
try:
conversation = self.db_manager.get_conversation(
conversation_id, include_messages=False
)
if not conversation:
self.logger.error(f"Conversation {conversation_id} not found")
return False
# Update metadata
metadata = conversation.get("metadata", {})
metadata["user_marked_important"] = important
metadata["marked_important_at"] = datetime.now().isoformat()
self.db_manager.update_conversation_metadata(conversation_id, metadata)
self.logger.info(
f"Marked conversation {conversation_id} as {'important' if important else 'not important'}"
)
return True
except Exception as e:
self.logger.error(
f"Failed to mark conversation {conversation_id} important: {e}"
)
return False
def get_important_conversations(self) -> List[Dict[str, Any]]:
"""
Get all user-marked important conversations.
Returns:
List of important conversations
"""
try:
recent_conversations = self.db_manager.get_recent_conversations(limit=1000)
important_conversations = []
for conversation in recent_conversations:
full_conversation = self.db_manager.get_conversation(
conversation["id"], include_messages=True
)
if full_conversation:
metadata = full_conversation.get("metadata", {})
if metadata.get("user_marked_important", False):
important_conversations.append(full_conversation)
return important_conversations
except Exception as e:
self.logger.error(f"Failed to get important conversations: {e}")
return []
def get_retention_stats(self) -> Dict[str, Any]:
"""
Get retention policy statistics.
Returns:
Dictionary with retention statistics
"""
try:
recent_conversations = self.db_manager.get_recent_conversations(limit=500)
stats = {
"total_conversations": len(recent_conversations),
"important_marked": 0,
"importance_distribution": {"high": 0, "medium": 0, "low": 0},
"average_importance": 0.0,
"compression_recommendations": {
"full": 0,
"key_points": 0,
"summary": 0,
"metadata": 0,
},
}
importance_scores = []
for conv_data in recent_conversations:
conversation = self.db_manager.get_conversation(
conv_data["id"], include_messages=True
)
if not conversation:
continue
importance_score = self.calculate_importance_score(conversation)
importance_scores.append(importance_score)
# Check if user marked important
metadata = conversation.get("metadata", {})
if metadata.get("user_marked_important", False):
stats["important_marked"] += 1
# Categorize importance
if importance_score >= self.important_threshold:
stats["importance_distribution"]["high"] += 1
elif importance_score >= self.preserve_threshold:
stats["importance_distribution"]["medium"] += 1
else:
stats["importance_distribution"]["low"] += 1
# Compression recommendations
should_compress, level = self.should_retain_compressed(
conversation, importance_score
)
if level in stats["compression_recommendations"]:
stats["compression_recommendations"][level] += 1
else:
stats["compression_recommendations"]["full"] += 1
if importance_scores:
stats["average_importance"] = statistics.mean(importance_scores)
return stats
except Exception as e:
self.logger.error(f"Failed to get retention stats: {e}")
return {}
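To close the loop on the scoring path, a hedged sketch: SQLiteManager's constructor is assumed (it is not part of this diff), and the two scoring calls below only read the conversation dict itself.

from memory.storage.sqlite_manager import SQLiteManager
from memory.backup.retention import RetentionPolicy

# SQLiteManager construction is an assumption; RetentionPolicy only needs it for the lookup helpers.
policy = RetentionPolicy(SQLiteManager("memory.db"))

conversation = {
    "id": "conv-0002",
    "title": "Retention demo",
    "created_at": "2025-11-02T09:00:00",
    "metadata": {"user_marked_important": False},
    "messages": [
        {"role": "user", "content": "What does the retention policy weigh?",
         "timestamp": "2025-11-02T09:00:00"},
        {"role": "assistant", "content": "Message count, back-and-forth ratio, topics, duration, and question density.",
         "timestamp": "2025-11-02T09:05:00"},
    ],
}

score = policy.calculate_importance_score(conversation)  # weighted sum, clamped to 0.0-1.0
should_compress, level = policy.should_retain_compressed(conversation, score)
print(f"{score:.2f} -> compress={should_compress}, level={level}")  # level: full / key_points / summary / metadata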