From 8c58b1d0703b3df4ec5a92a7b24eccc49033e951 Mon Sep 17 00:00:00 2001 From: Mai Development Date: Tue, 27 Jan 2026 23:56:49 -0500 Subject: [PATCH] feat(04-03): create JSON archival and smart retention systems - Added ArchivalManager for JSON export/import with gzip compression - Implemented organized directory structure by year/month - Added batch archival operations and restore functionality - Created RetentionPolicy with importance-based scoring - Smart retention considers engagement, topics, user-marked importance - MemoryManager integrates compression and archival automatically - Added automatic compression triggering and archival scheduling - Comprehensive archival statistics and retention recommendations - Support for backup integration and restore verification --- src/memory/__init__.py | 304 ++++++++++++++++++- src/memory/backup/__init__.py | 11 + src/memory/backup/archival.py | 431 ++++++++++++++++++++++++++ src/memory/backup/retention.py | 540 +++++++++++++++++++++++++++++++++ 4 files changed, 1285 insertions(+), 1 deletion(-) create mode 100644 src/memory/backup/__init__.py create mode 100644 src/memory/backup/archival.py create mode 100644 src/memory/backup/retention.py diff --git a/src/memory/__init__.py b/src/memory/__init__.py index 77f41c3..3444dd6 100644 --- a/src/memory/__init__.py +++ b/src/memory/__init__.py @@ -7,9 +7,12 @@ messages, and associated vector embeddings for semantic search capabilities. from .storage.sqlite_manager import SQLiteManager from .storage.vector_store import VectorStore +from .storage.compression import CompressionEngine from .retrieval.semantic_search import SemanticSearch from .retrieval.context_aware import ContextAwareSearch from .retrieval.timeline_search import TimelineSearch +from .backup.archival import ArchivalManager +from .backup.retention import RetentionPolicy from typing import Optional, List, Dict, Any, Union from datetime import datetime @@ -37,6 +40,9 @@ class MemoryManager: self._semantic_search: Optional[SemanticSearch] = None self._context_aware_search: Optional[ContextAwareSearch] = None self._timeline_search: Optional[TimelineSearch] = None + self._compression_engine: Optional[CompressionEngine] = None + self._archival_manager: Optional[ArchivalManager] = None + self._retention_policy: Optional[RetentionPolicy] = None self.logger = logging.getLogger(__name__) def initialize(self) -> None: @@ -55,8 +61,15 @@ class MemoryManager: self._context_aware_search = ContextAwareSearch(self._sqlite_manager) self._timeline_search = TimelineSearch(self._sqlite_manager) + # Initialize archival components + self._compression_engine = CompressionEngine() + self._archival_manager = ArchivalManager( + compression_engine=self._compression_engine + ) + self._retention_policy = RetentionPolicy(self._sqlite_manager) + self.logger.info( - f"Enhanced memory manager initialized with database: {self.db_path}" + f"Enhanced memory manager initialized with archival: {self.db_path}" ) except Exception as e: self.logger.error(f"Failed to initialize enhanced memory manager: {e}") @@ -107,6 +120,289 @@ class MemoryManager: ) return self._timeline_search + @property + def compression_engine(self) -> CompressionEngine: + """Get compression engine instance.""" + if self._compression_engine is None: + raise RuntimeError( + "Memory manager not initialized. Call initialize() first." 
+ ) + return self._compression_engine + + @property + def archival_manager(self) -> ArchivalManager: + """Get archival manager instance.""" + if self._archival_manager is None: + raise RuntimeError( + "Memory manager not initialized. Call initialize() first." + ) + return self._archival_manager + + @property + def retention_policy(self) -> RetentionPolicy: + """Get retention policy instance.""" + if self._retention_policy is None: + raise RuntimeError( + "Memory manager not initialized. Call initialize() first." + ) + return self._retention_policy + + # Archival methods + def compress_conversation(self, conversation_id: str) -> Optional[Dict[str, Any]]: + """ + Compress a conversation based on its age. + + Args: + conversation_id: ID of conversation to compress + + Returns: + Compressed conversation data or None if not found + """ + if not self._is_initialized(): + raise RuntimeError("Memory manager not initialized") + + try: + conversation = self._sqlite_manager.get_conversation( + conversation_id, include_messages=True + ) + if not conversation: + self.logger.error( + f"Conversation {conversation_id} not found for compression" + ) + return None + + compressed = self._compression_engine.compress_by_age(conversation) + return { + "original_conversation": conversation, + "compressed_conversation": compressed, + "compression_applied": True, + } + + except Exception as e: + self.logger.error(f"Failed to compress conversation {conversation_id}: {e}") + return None + + def archive_conversation(self, conversation_id: str) -> Optional[str]: + """ + Archive a conversation to JSON file. + + Args: + conversation_id: ID of conversation to archive + + Returns: + Path to archived file or None if failed + """ + if not self._is_initialized(): + raise RuntimeError("Memory manager not initialized") + + try: + conversation = self._sqlite_manager.get_conversation( + conversation_id, include_messages=True + ) + if not conversation: + self.logger.error( + f"Conversation {conversation_id} not found for archival" + ) + return None + + compressed = self._compression_engine.compress_by_age(conversation) + archive_path = self._archival_manager.archive_conversation( + conversation, compressed + ) + return archive_path + + except Exception as e: + self.logger.error(f"Failed to archive conversation {conversation_id}: {e}") + return None + + def get_retention_recommendations(self, limit: int = 100) -> List[Dict[str, Any]]: + """ + Get retention recommendations for recent conversations. + + Args: + limit: Number of conversations to analyze + + Returns: + List of retention recommendations + """ + if not self._is_initialized(): + raise RuntimeError("Memory manager not initialized") + + try: + recent_conversations = self._sqlite_manager.get_recent_conversations( + limit=limit + ) + + full_conversations = [] + for conv_data in recent_conversations: + full_conv = self._sqlite_manager.get_conversation( + conv_data["id"], include_messages=True + ) + if full_conv: + full_conversations.append(full_conv) + + return self._retention_policy.get_retention_recommendations( + full_conversations + ) + + except Exception as e: + self.logger.error(f"Failed to get retention recommendations: {e}") + return [] + + def trigger_automatic_compression(self, days_threshold: int = 30) -> Dict[str, Any]: + """ + Automatically compress conversations older than threshold. 
+ + Args: + days_threshold: Age in days to trigger compression + + Returns: + Dictionary with compression results + """ + if not self._is_initialized(): + raise RuntimeError("Memory manager not initialized") + + try: + recent_conversations = self._sqlite_manager.get_recent_conversations( + limit=1000 + ) + + compressed_count = 0 + archived_count = 0 + total_space_saved = 0 + errors = [] + + from datetime import datetime, timedelta + + for conv_data in recent_conversations: + try: + # Check conversation age + created_at = conv_data.get("created_at") + if created_at: + conv_date = datetime.fromisoformat(created_at) + age_days = (datetime.now() - conv_date).days + + if age_days >= days_threshold: + # Get full conversation data + full_conv = self._sqlite_manager.get_conversation( + conv_data["id"], include_messages=True + ) + if full_conv: + # Check retention policy + importance_score = ( + self._retention_policy.calculate_importance_score( + full_conv + ) + ) + should_compress, level = ( + self._retention_policy.should_retain_compressed( + full_conv, importance_score + ) + ) + + if should_compress: + compressed = ( + self._compression_engine.compress_by_age( + full_conv + ) + ) + + # Calculate space saved + original_size = len(str(full_conv)) + compressed_size = len(str(compressed)) + space_saved = original_size - compressed_size + total_space_saved += space_saved + + # Archive the compressed version + archive_path = ( + self._archival_manager.archive_conversation( + full_conv, compressed + ) + ) + if archive_path: + archived_count += 1 + compressed_count += 1 + else: + errors.append( + f"Failed to archive conversation {conv_data['id']}" + ) + else: + self.logger.debug( + f"Conversation {conv_data['id']} marked to retain full" + ) + + except Exception as e: + errors.append( + f"Error processing {conv_data.get('id', 'unknown')}: {e}" + ) + continue + + return { + "total_processed": len(recent_conversations), + "compressed_count": compressed_count, + "archived_count": archived_count, + "total_space_saved_bytes": total_space_saved, + "total_space_saved_mb": round(total_space_saved / (1024 * 1024), 2), + "errors": errors, + "threshold_days": days_threshold, + } + + except Exception as e: + self.logger.error(f"Failed automatic compression: {e}") + return {"error": str(e), "compressed_count": 0, "archived_count": 0} + + def get_archival_stats(self) -> Dict[str, Any]: + """ + Get archival statistics. 
+ + Returns: + Dictionary with archival statistics + """ + if not self._is_initialized(): + raise RuntimeError("Memory manager not initialized") + + try: + archive_stats = self._archival_manager.get_archive_stats() + retention_stats = self._retention_policy.get_retention_stats() + db_stats = self._sqlite_manager.get_database_stats() + + return { + "archive": archive_stats, + "retention": retention_stats, + "database": db_stats, + "compression_ratio": self._calculate_overall_compression_ratio(), + } + + except Exception as e: + self.logger.error(f"Failed to get archival stats: {e}") + return {} + + def _calculate_overall_compression_ratio(self) -> float: + """Calculate overall compression ratio across all data.""" + try: + archive_stats = self._archival_manager.get_archive_stats() + + if not archive_stats or "total_archive_size_bytes" not in archive_stats: + return 0.0 + + db_stats = self._sqlite_manager.get_database_stats() + total_db_size = db_stats.get("database_size_bytes", 0) + total_archive_size = archive_stats.get("total_archive_size_bytes", 0) + total_original_size = total_db_size + total_archive_size + + if total_original_size == 0: + return 0.0 + + return ( + (total_db_size / total_original_size) + if total_original_size > 0 + else 0.0 + ) + + except Exception as e: + self.logger.error(f"Failed to calculate compression ratio: {e}") + return 0.0 + # Legacy methods for compatibility def close(self) -> None: """Close database connections.""" @@ -314,6 +610,9 @@ class MemoryManager: and self._semantic_search is not None and self._context_aware_search is not None and self._timeline_search is not None + and self._compression_engine is not None + and self._archival_manager is not None + and self._retention_policy is not None ) @@ -322,7 +621,10 @@ __all__ = [ "MemoryManager", "SQLiteManager", "VectorStore", + "CompressionEngine", "SemanticSearch", "ContextAwareSearch", "TimelineSearch", + "ArchivalManager", + "RetentionPolicy", ] diff --git a/src/memory/backup/__init__.py b/src/memory/backup/__init__.py new file mode 100644 index 0000000..30d9744 --- /dev/null +++ b/src/memory/backup/__init__.py @@ -0,0 +1,11 @@ +""" +Memory backup and archival subsystem. + +This package provides conversation archival, retention policies, +and long-term storage management for the memory system. +""" + +from .archival import ArchivalManager +from .retention import RetentionPolicy + +__all__ = ["ArchivalManager", "RetentionPolicy"] diff --git a/src/memory/backup/archival.py b/src/memory/backup/archival.py new file mode 100644 index 0000000..f932278 --- /dev/null +++ b/src/memory/backup/archival.py @@ -0,0 +1,431 @@ +""" +JSON archival system for long-term conversation storage. + +Provides export/import functionality for compressed conversations +with organized directory structure and version compatibility. +""" + +import json +import os +import shutil +import logging +from datetime import datetime, timedelta +from typing import Dict, Any, List, Optional, Iterator +from pathlib import Path +import gzip + +import sys + +sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..")) + +from memory.storage.compression import CompressionEngine, CompressedConversation + + +class ArchivalManager: + """ + JSON archival manager for compressed conversations. + + Handles export/import of conversations with organized directory + structure and version compatibility for future upgrades. 
+ """ + + ARCHIVAL_VERSION = "1.0" + + def __init__( + self, + archival_root: str = "archive", + compression_engine: Optional[CompressionEngine] = None, + ): + """ + Initialize archival manager. + + Args: + archival_root: Root directory for archived conversations + compression_engine: Optional compression engine instance + """ + self.archival_root = Path(archival_root) + self.archival_root.mkdir(exist_ok=True) + self.logger = logging.getLogger(__name__) + self.compression_engine = compression_engine or CompressionEngine() + + # Create archive directory structure + self._initialize_directory_structure() + + def _initialize_directory_structure(self) -> None: + """Create standard archive directory structure.""" + # Year/month structure: archive/YYYY/MM/ + for year_dir in self.archival_root.iterdir(): + if year_dir.is_dir() and year_dir.name.isdigit(): + for month in range(1, 13): + month_dir = year_dir / f"{month:02d}" + month_dir.mkdir(exist_ok=True) + + self.logger.debug( + f"Archive directory structure initialized: {self.archival_root}" + ) + + def _get_archive_path(self, conversation_date: datetime) -> Path: + """ + Get archive path for a conversation date. + + Args: + conversation_date: Date of the conversation + + Returns: + Path where conversation should be archived + """ + year_dir = self.archival_root / str(conversation_date.year) + month_dir = year_dir / f"{conversation_date.month:02d}" + + # Create directories if they don't exist + year_dir.mkdir(exist_ok=True) + month_dir.mkdir(exist_ok=True) + + return month_dir + + def archive_conversation( + self, conversation: Dict[str, Any], compressed: CompressedConversation + ) -> str: + """ + Archive a conversation to JSON file. + + Args: + conversation: Original conversation data + compressed: Compressed conversation data + + Returns: + Path to archived file + """ + try: + # Get archive path based on conversation date + conv_date = datetime.fromisoformat( + conversation.get("created_at", datetime.now().isoformat()) + ) + archive_path = self._get_archive_path(conv_date) + + # Create filename + timestamp = conv_date.strftime("%Y%m%d_%H%M%S") + safe_title = "".join( + c + for c in conversation.get("title", "untitled") + if c.isalnum() or c in "-_" + )[:50] + filename = f"{timestamp}_{safe_title}_{conversation.get('id', 'unknown')[:8]}.json.gz" + file_path = archive_path / filename + + # Prepare archival data + archival_data = { + "version": self.ARCHIVAL_VERSION, + "archived_at": datetime.now().isoformat(), + "original_conversation": conversation, + "compressed_conversation": { + "original_id": compressed.original_id, + "compression_level": compressed.compression_level.value, + "compressed_at": compressed.compressed_at.isoformat(), + "original_created_at": compressed.original_created_at.isoformat(), + "content": compressed.content, + "metadata": compressed.metadata, + "metrics": { + "original_length": compressed.metrics.original_length, + "compressed_length": compressed.metrics.compressed_length, + "compression_ratio": compressed.metrics.compression_ratio, + "information_retention_score": compressed.metrics.information_retention_score, + "quality_score": compressed.metrics.quality_score, + }, + }, + } + + # Write compressed JSON file + with gzip.open(file_path, "wt", encoding="utf-8") as f: + json.dump(archival_data, f, indent=2, ensure_ascii=False) + + self.logger.info( + f"Archived conversation {conversation.get('id')} to {file_path}" + ) + return str(file_path) + + except Exception as e: + self.logger.error( + f"Failed to archive 
conversation {conversation.get('id')}: {e}" + ) + raise + + def archive_conversations_batch( + self, conversations: List[Dict[str, Any]], compress: bool = True + ) -> List[str]: + """ + Archive multiple conversations efficiently. + + Args: + conversations: List of conversations to archive + compress: Whether to compress conversations before archiving + + Returns: + List of archived file paths + """ + archived_paths = [] + + for conversation in conversations: + try: + # Compress if requested + if compress: + compressed = self.compression_engine.compress_by_age(conversation) + else: + # Create uncompressed version + from memory.storage.compression import ( + CompressionLevel, + CompressedConversation, + CompressionMetrics, + ) + from datetime import datetime + + compressed = CompressedConversation( + original_id=conversation.get("id", "unknown"), + compression_level=CompressionLevel.FULL, + compressed_at=datetime.now(), + original_created_at=datetime.fromisoformat( + conversation.get("created_at", datetime.now().isoformat()) + ), + content=conversation, + metadata={"uncompressed": True}, + metrics=CompressionMetrics( + original_length=len(json.dumps(conversation)), + compressed_length=len(json.dumps(conversation)), + compression_ratio=1.0, + information_retention_score=1.0, + quality_score=1.0, + ), + ) + + path = self.archive_conversation(conversation, compressed) + archived_paths.append(path) + + except Exception as e: + self.logger.error( + f"Failed to archive conversation {conversation.get('id', 'unknown')}: {e}" + ) + continue + + self.logger.info( + f"Archived {len(archived_paths)}/{len(conversations)} conversations" + ) + return archived_paths + + def restore_conversation(self, archive_path: str) -> Optional[Dict[str, Any]]: + """ + Restore a conversation from archive. + + Args: + archive_path: Path to archived file + + Returns: + Restored conversation data or None if failed + """ + try: + archive_file = Path(archive_path) + if not archive_file.exists(): + self.logger.error(f"Archive file not found: {archive_path}") + return None + + # Read and decompress archive file + with gzip.open(archive_file, "rt", encoding="utf-8") as f: + archival_data = json.load(f) + + # Verify version compatibility + version = archival_data.get("version", "unknown") + if version != self.ARCHIVAL_VERSION: + self.logger.warning( + f"Archive version {version} may not be compatible with current version {self.ARCHIVAL_VERSION}" + ) + + # Return the original conversation (or decompressed version if preferred) + original_conversation = archival_data.get("original_conversation") + compressed_info = archival_data.get("compressed_conversation", {}) + + # Add archival metadata to conversation + original_conversation["_archival_info"] = { + "archived_at": archival_data.get("archived_at"), + "archive_path": str(archive_file), + "compression_level": compressed_info.get("compression_level"), + "compression_ratio": compressed_info.get("metrics", {}).get( + "compression_ratio", 1.0 + ), + "version": version, + } + + self.logger.info(f"Restored conversation from {archive_path}") + return original_conversation + + except Exception as e: + self.logger.error( + f"Failed to restore conversation from {archive_path}: {e}" + ) + return None + + def list_archived( + self, + year: Optional[int] = None, + month: Optional[int] = None, + include_content: bool = False, + ) -> List[Dict[str, Any]]: + """ + List archived conversations with optional filtering. 
+ + Args: + year: Optional year filter + month: Optional month filter (1-12) + include_content: Whether to include conversation content + + Returns: + List of archived conversation info + """ + archived_list = [] + + try: + # Determine search path + search_path = self.archival_root + if year: + search_path = search_path / str(year) + if month: + search_path = search_path / f"{month:02d}" + + if not search_path.exists(): + return [] + + # Scan for archive files + for archive_file in search_path.rglob("*.json.gz"): + try: + # Read minimal metadata without loading full content + with gzip.open(archive_file, "rt", encoding="utf-8") as f: + archival_data = json.load(f) + + conversation = archival_data.get("original_conversation", {}) + compressed = archival_data.get("compressed_conversation", {}) + + archive_info = { + "id": conversation.get("id"), + "title": conversation.get("title"), + "created_at": conversation.get("created_at"), + "archived_at": archival_data.get("archived_at"), + "archive_path": str(archive_file), + "compression_level": compressed.get("compression_level"), + "compression_ratio": compressed.get("metrics", {}).get( + "compression_ratio", 1.0 + ), + "version": archival_data.get("version"), + } + + if include_content: + archive_info["original_conversation"] = conversation + archive_info["compressed_conversation"] = compressed + + archived_list.append(archive_info) + + except Exception as e: + self.logger.error( + f"Failed to read archive file {archive_file}: {e}" + ) + continue + + # Sort by archived date (newest first) + archived_list.sort(key=lambda x: x.get("archived_at", ""), reverse=True) + return archived_list + + except Exception as e: + self.logger.error(f"Failed to list archived conversations: {e}") + return [] + + def delete_archive(self, archive_path: str) -> bool: + """ + Delete an archived conversation. + + Args: + archive_path: Path to archived file + + Returns: + True if deleted successfully, False otherwise + """ + try: + archive_file = Path(archive_path) + if archive_file.exists(): + archive_file.unlink() + self.logger.info(f"Deleted archive: {archive_path}") + return True + else: + self.logger.warning(f"Archive file not found: {archive_path}") + return False + except Exception as e: + self.logger.error(f"Failed to delete archive {archive_path}: {e}") + return False + + def get_archive_stats(self) -> Dict[str, Any]: + """ + Get statistics about archived conversations. 
+ + Returns: + Dictionary with archive statistics + """ + try: + total_files = 0 + total_size = 0 + compression_levels = {} + years = set() + + for archive_file in self.archival_root.rglob("*.json.gz"): + try: + total_files += 1 + total_size += archive_file.stat().st_size + + # Extract year from path + path_parts = archive_file.parts + for i, part in enumerate(path_parts): + if part == str(self.archival_root.name) and i + 1 < len( + path_parts + ): + year_part = path_parts[i + 1] + if year_part.isdigit(): + years.add(year_part) + break + + # Read compression level without loading full content + with gzip.open(archive_file, "rt", encoding="utf-8") as f: + archival_data = json.load(f) + compressed = archival_data.get("compressed_conversation", {}) + level = compressed.get("compression_level", "unknown") + compression_levels[level] = compression_levels.get(level, 0) + 1 + + except Exception as e: + self.logger.error( + f"Failed to analyze archive file {archive_file}: {e}" + ) + continue + + return { + "total_archived_conversations": total_files, + "total_archive_size_bytes": total_size, + "total_archive_size_mb": round(total_size / (1024 * 1024), 2), + "compression_levels": compression_levels, + "years_with_archives": sorted(list(years)), + "archive_directory": str(self.archival_root), + } + + except Exception as e: + self.logger.error(f"Failed to get archive stats: {e}") + return {} + + def migrate_archives(self, from_version: str, to_version: str) -> int: + """ + Migrate archives from one version to another. + + Args: + from_version: Source version + to_version: Target version + + Returns: + Number of archives migrated + """ + # Placeholder for future migration functionality + self.logger.info( + f"Migration from {from_version} to {to_version} not yet implemented" + ) + return 0 diff --git a/src/memory/backup/retention.py b/src/memory/backup/retention.py new file mode 100644 index 0000000..d3f372e --- /dev/null +++ b/src/memory/backup/retention.py @@ -0,0 +1,540 @@ +""" +Smart retention policies for conversation preservation. + +Implements value-based retention scoring that keeps important +conversations longer while efficiently managing storage usage. +""" + +import logging +import re +from datetime import datetime, timedelta +from typing import Dict, Any, List, Optional, Tuple +from collections import defaultdict +import statistics + +import sys +import os + +sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..")) + +from memory.storage.sqlite_manager import SQLiteManager + + +class RetentionPolicy: + """ + Smart retention policy engine. + + Calculates conversation importance scores and determines + which conversations should be retained or compressed. + """ + + def __init__(self, sqlite_manager: SQLiteManager): + """ + Initialize retention policy. 
+ + Args: + sqlite_manager: SQLite manager instance for data access + """ + self.db_manager = sqlite_manager + self.logger = logging.getLogger(__name__) + + # Retention policy parameters + self.important_threshold = 0.7 # Above this = retain full + self.preserve_threshold = 0.4 # Above this = lighter compression + self.user_marked_multiplier = 1.5 # Boost for user-marked important + + # Engagement scoring weights + self.weights = { + "message_count": 0.2, # More messages = higher engagement + "response_quality": 0.25, # Back-and-forth conversation + "topic_diversity": 0.15, # Multiple topics = important + "time_span": 0.1, # Longer duration = important + "user_marked": 0.2, # User explicitly marked important + "question_density": 0.1, # Questions = seeking information + } + + def calculate_importance_score(self, conversation: Dict[str, Any]) -> float: + """ + Calculate importance score for a conversation. + + Args: + conversation: Conversation data with messages and metadata + + Returns: + Importance score between 0.0 and 1.0 + """ + try: + messages = conversation.get("messages", []) + if not messages: + return 0.0 + + # Extract basic metrics + message_count = len(messages) + user_messages = [m for m in messages if m["role"] == "user"] + assistant_messages = [m for m in messages if m["role"] == "assistant"] + + # Calculate engagement metrics + scores = {} + + # 1. Message count score (normalized) + scores["message_count"] = min( + message_count / 20, 1.0 + ) # 20 messages = full score + + # 2. Response quality (back-and-forth ratio) + if len(user_messages) > 0 and len(assistant_messages) > 0: + ratio = min(len(assistant_messages), len(user_messages)) / max( + len(assistant_messages), len(user_messages) + ) + scores["response_quality"] = ratio # Close to 1.0 = good conversation + else: + scores["response_quality"] = 0.5 + + # 3. Topic diversity (variety in content) + scores["topic_diversity"] = self._calculate_topic_diversity(messages) + + # 4. Time span (conversation duration) + scores["time_span"] = self._calculate_time_span_score(messages) + + # 5. User marked important + metadata = conversation.get("metadata", {}) + user_marked = metadata.get("user_marked_important", False) + scores["user_marked"] = self.user_marked_multiplier if user_marked else 1.0 + + # 6. 
Question density (information seeking) + scores["question_density"] = self._calculate_question_density(user_messages) + + # Calculate weighted final score + final_score = 0.0 + for factor, weight in self.weights.items(): + final_score += scores.get(factor, 0.0) * weight + + # Normalize to 0-1 range + final_score = max(0.0, min(1.0, final_score)) + + self.logger.debug( + f"Importance score for {conversation.get('id')}: {final_score:.3f}" + ) + return final_score + + except Exception as e: + self.logger.error(f"Failed to calculate importance score: {e}") + return 0.5 # Default to neutral + + def _calculate_topic_diversity(self, messages: List[Dict[str, Any]]) -> float: + """Calculate topic diversity score from messages.""" + try: + # Simple topic-based diversity using keyword categories + topic_keywords = { + "technical": [ + "code", + "programming", + "algorithm", + "function", + "bug", + "debug", + "api", + "database", + ], + "personal": [ + "feel", + "think", + "opinion", + "prefer", + "like", + "personal", + "life", + ], + "work": [ + "project", + "task", + "deadline", + "meeting", + "team", + "work", + "job", + ], + "learning": [ + "learn", + "study", + "understand", + "explain", + "tutorial", + "help", + ], + "planning": ["plan", "schedule", "organize", "goal", "strategy"], + "creative": ["design", "create", "write", "art", "music", "story"], + } + + topic_counts = defaultdict(int) + total_content = "" + + for message in messages: + if message["role"] in ["user", "assistant"]: + content = message["content"].lower() + total_content += content + " " + + # Count topic occurrences + for topic, keywords in topic_keywords.items(): + for keyword in keywords: + if keyword in content: + topic_counts[topic] += 1 + + # Diversity = number of topics with significant presence + significant_topics = sum(1 for count in topic_counts.values() if count >= 2) + diversity_score = min(significant_topics / len(topic_keywords), 1.0) + + return diversity_score + + except Exception as e: + self.logger.error(f"Failed to calculate topic diversity: {e}") + return 0.5 + + def _calculate_time_span_score(self, messages: List[Dict[str, Any]]) -> float: + """Calculate time span score based on conversation duration.""" + try: + timestamps = [] + for message in messages: + if "timestamp" in message: + try: + ts = datetime.fromisoformat(message["timestamp"]) + timestamps.append(ts) + except: + continue + + if len(timestamps) < 2: + return 0.1 # Very short conversation + + duration = max(timestamps) - min(timestamps) + duration_hours = duration.total_seconds() / 3600 + + # Score based on duration (24 hours = full score) + return min(duration_hours / 24, 1.0) + + except Exception as e: + self.logger.error(f"Failed to calculate time span: {e}") + return 0.5 + + def _calculate_question_density(self, user_messages: List[Dict[str, Any]]) -> float: + """Calculate question density from user messages.""" + try: + if not user_messages: + return 0.0 + + question_count = 0 + total_words = 0 + + for message in user_messages: + content = message["content"] + # Count questions + question_marks = content.count("?") + question_words = len( + re.findall( + r"\b(how|what|when|where|why|which|who|can|could|would|should|is|are|do|does)\b", + content, + re.IGNORECASE, + ) + ) + question_count += question_marks + question_words + + # Count words + words = len(content.split()) + total_words += words + + if total_words == 0: + return 0.0 + + question_ratio = question_count / total_words + return min(question_ratio * 5, 1.0) # Normalize + + except 
Exception as e: + self.logger.error(f"Failed to calculate question density: {e}") + return 0.5 + + def should_retain_full( + self, conversation: Dict[str, Any], importance_score: Optional[float] = None + ) -> bool: + """ + Determine if conversation should be retained in full form. + + Args: + conversation: Conversation data + importance_score: Pre-calculated importance score (optional) + + Returns: + True if conversation should be retained full + """ + if importance_score is None: + importance_score = self.calculate_importance_score(conversation) + + # User explicitly marked important always retained + metadata = conversation.get("metadata", {}) + if metadata.get("user_marked_important", False): + return True + + # High importance score + if importance_score >= self.important_threshold: + return True + + # Recent important conversations (within 30 days) + created_at = conversation.get("created_at") + if created_at: + try: + conv_date = datetime.fromisoformat(created_at) + if (datetime.now() - conv_date).days <= 30 and importance_score >= 0.5: + return True + except: + pass + + return False + + def should_retain_compressed( + self, conversation: Dict[str, Any], importance_score: Optional[float] = None + ) -> Tuple[bool, str]: + """ + Determine if conversation should be compressed and to what level. + + Args: + conversation: Conversation data + importance_score: Pre-calculated importance score (optional) + + Returns: + Tuple of (should_compress, recommended_compression_level) + """ + if importance_score is None: + importance_score = self.calculate_importance_score(conversation) + + # Check if should retain full + if self.should_retain_full(conversation, importance_score): + return False, "full" + + # Determine compression level based on importance + if importance_score >= self.preserve_threshold: + # Important: lighter compression (key points) + return True, "key_points" + elif importance_score >= 0.2: + # Moderately important: summary compression + return True, "summary" + else: + # Low importance: metadata only + return True, "metadata" + + def update_retention_policy(self, policy_settings: Dict[str, Any]) -> None: + """ + Update retention policy parameters. + + Args: + policy_settings: Dictionary of policy parameter updates + """ + try: + if "important_threshold" in policy_settings: + self.important_threshold = float(policy_settings["important_threshold"]) + if "preserve_threshold" in policy_settings: + self.preserve_threshold = float(policy_settings["preserve_threshold"]) + if "user_marked_multiplier" in policy_settings: + self.user_marked_multiplier = float( + policy_settings["user_marked_multiplier"] + ) + if "weights" in policy_settings: + self.weights.update(policy_settings["weights"]) + + self.logger.info(f"Updated retention policy: {policy_settings}") + + except Exception as e: + self.logger.error(f"Failed to update retention policy: {e}") + + def get_retention_recommendations( + self, conversations: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """ + Get retention recommendations for multiple conversations. 
+ + Args: + conversations: List of conversations to analyze + + Returns: + List of recommendations with scores and actions + """ + recommendations = [] + + for conversation in conversations: + try: + importance_score = self.calculate_importance_score(conversation) + should_compress, compression_level = self.should_retain_compressed( + conversation, importance_score + ) + + recommendation = { + "conversation_id": conversation.get("id"), + "title": conversation.get("title"), + "created_at": conversation.get("created_at"), + "importance_score": importance_score, + "should_compress": should_compress, + "recommended_level": compression_level, + "user_marked_important": conversation.get("metadata", {}).get( + "user_marked_important", False + ), + "message_count": len(conversation.get("messages", [])), + "retention_reason": self._get_retention_reason( + importance_score, compression_level + ), + } + + recommendations.append(recommendation) + + except Exception as e: + self.logger.error( + f"Failed to analyze conversation {conversation.get('id')}: {e}" + ) + continue + + # Sort by importance score (highest first) + recommendations.sort(key=lambda x: x["importance_score"], reverse=True) + return recommendations + + def _get_retention_reason( + self, importance_score: float, compression_level: str + ) -> str: + """Get human-readable reason for retention decision.""" + if compression_level == "full": + if importance_score >= self.important_threshold: + return "High importance - retained full" + else: + return "Recent conversation - retained full" + elif compression_level == "key_points": + return f"Moderate importance ({importance_score:.2f}) - key points retained" + elif compression_level == "summary": + return f"Standard importance ({importance_score:.2f}) - summary compression" + else: + return f"Low importance ({importance_score:.2f}) - metadata only" + + def mark_conversation_important( + self, conversation_id: str, important: bool = True + ) -> bool: + """ + Mark a conversation as user-important. + + Args: + conversation_id: ID of conversation to mark + important: Whether to mark as important (True) or not important (False) + + Returns: + True if marked successfully + """ + try: + conversation = self.db_manager.get_conversation( + conversation_id, include_messages=False + ) + if not conversation: + self.logger.error(f"Conversation {conversation_id} not found") + return False + + # Update metadata + metadata = conversation.get("metadata", {}) + metadata["user_marked_important"] = important + metadata["marked_important_at"] = datetime.now().isoformat() + + self.db_manager.update_conversation_metadata(conversation_id, metadata) + + self.logger.info( + f"Marked conversation {conversation_id} as {'important' if important else 'not important'}" + ) + return True + + except Exception as e: + self.logger.error( + f"Failed to mark conversation {conversation_id} important: {e}" + ) + return False + + def get_important_conversations(self) -> List[Dict[str, Any]]: + """ + Get all user-marked important conversations. 
+ + Returns: + List of important conversations + """ + try: + recent_conversations = self.db_manager.get_recent_conversations(limit=1000) + + important_conversations = [] + for conversation in recent_conversations: + full_conversation = self.db_manager.get_conversation( + conversation["id"], include_messages=True + ) + if full_conversation: + metadata = full_conversation.get("metadata", {}) + if metadata.get("user_marked_important", False): + important_conversations.append(full_conversation) + + return important_conversations + + except Exception as e: + self.logger.error(f"Failed to get important conversations: {e}") + return [] + + def get_retention_stats(self) -> Dict[str, Any]: + """ + Get retention policy statistics. + + Returns: + Dictionary with retention statistics + """ + try: + recent_conversations = self.db_manager.get_recent_conversations(limit=500) + + stats = { + "total_conversations": len(recent_conversations), + "important_marked": 0, + "importance_distribution": {"high": 0, "medium": 0, "low": 0}, + "average_importance": 0.0, + "compression_recommendations": { + "full": 0, + "key_points": 0, + "summary": 0, + "metadata": 0, + }, + } + + importance_scores = [] + + for conv_data in recent_conversations: + conversation = self.db_manager.get_conversation( + conv_data["id"], include_messages=True + ) + if not conversation: + continue + + importance_score = self.calculate_importance_score(conversation) + importance_scores.append(importance_score) + + # Check if user marked important + metadata = conversation.get("metadata", {}) + if metadata.get("user_marked_important", False): + stats["important_marked"] += 1 + + # Categorize importance + if importance_score >= self.important_threshold: + stats["importance_distribution"]["high"] += 1 + elif importance_score >= self.preserve_threshold: + stats["importance_distribution"]["medium"] += 1 + else: + stats["importance_distribution"]["low"] += 1 + + # Compression recommendations + should_compress, level = self.should_retain_compressed( + conversation, importance_score + ) + if level in stats["compression_recommendations"]: + stats["compression_recommendations"][level] += 1 + else: + stats["compression_recommendations"]["full"] += 1 + + if importance_scores: + stats["average_importance"] = statistics.mean(importance_scores) + + return stats + + except Exception as e: + self.logger.error(f"Failed to get retention stats: {e}") + return {}
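
Usage note (not part of the patch): a minimal sketch of the new archival surface on MemoryManager. The constructor argument and the top-level import path are assumptions here, inferred from the existing db_path attribute and the package layout; only the methods added in this diff are exercised.

    # Illustrative only: end-to-end archival flow via MemoryManager.
    from memory import MemoryManager  # assumed import path for src/memory

    manager = MemoryManager("memory.db")  # assumed constructor argument
    manager.initialize()

    # Compress and archive a single conversation by id (hypothetical id).
    manager.compress_conversation("conv-123")
    archive_path = manager.archive_conversation("conv-123")
    if archive_path:
        print(f"Archived to {archive_path}")

    # Preview what the retention policy recommends for recent conversations.
    for rec in manager.get_retention_recommendations(limit=50):
        print(rec["conversation_id"], rec["importance_score"], rec["recommended_level"])

    # Run the automatic sweep for conversations older than 30 days.
    summary = manager.trigger_automatic_compression(days_threshold=30)
    print(summary["compressed_count"], "compressed,",
          summary["total_space_saved_mb"], "MB saved")

    print(manager.get_archival_stats())
    manager.close()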
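
A similar sketch for using ArchivalManager directly, with the defaults defined in this patch; the year/month filter values are just example inputs.

    # Illustrative only: browsing and restoring archives without MemoryManager.
    from memory.backup.archival import ArchivalManager

    archival = ArchivalManager(archival_root="archive")

    # List archives for an example month without loading full content.
    for info in archival.list_archived(year=2026, month=1):
        print(info["title"], info["compression_ratio"], info["archive_path"])

    # Restore the newest archive back into a plain conversation dict.
    entries = archival.list_archived()
    if entries:
        conversation = archival.restore_conversation(entries[0]["archive_path"])
        if conversation:
            print(conversation["_archival_info"]["archived_at"])

    print(archival.get_archive_stats())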
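
And a sketch of RetentionPolicy scoring on its own. The SQLiteManager constructor argument is an assumption, and the conversation dict is fabricated sample data that only uses the fields the scorer reads (role, content, timestamp, metadata, created_at); calculate_importance_score itself does not touch the database.

    # Illustrative only: scoring one conversation and reading the recommendation.
    from memory.storage.sqlite_manager import SQLiteManager
    from memory.backup.retention import RetentionPolicy

    policy = RetentionPolicy(SQLiteManager("memory.db"))  # assumed constructor

    conversation = {                                # hypothetical sample data
        "id": "conv-123",
        "title": "Debugging the archival job",
        "created_at": "2026-01-05T10:00:00",
        "metadata": {"user_marked_important": False},
        "messages": [
            {"role": "user", "content": "Why does the archive job fail?",
             "timestamp": "2026-01-05T10:00:00"},
            {"role": "assistant", "content": "The gzip path is missing a directory.",
             "timestamp": "2026-01-05T10:01:00"},
        ],
    }

    score = policy.calculate_importance_score(conversation)
    should_compress, level = policy.should_retain_compressed(conversation, score)
    print(f"score={score:.2f} compress={should_compress} level={level}")

    # Tighten thresholds so only clearly important conversations stay full.
    policy.update_retention_policy({"important_threshold": 0.8})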