feat(04-03): implement progressive compression engine

- Added CompressionEngine class with 4-tier age-based compression
- 0-7 days: Full content (no compression)
- 7-30 days: Key-points extraction (~70% retention)
- 30-90 days: Brief summary (~40% retention)
- 90+ days: Metadata only
- Hybrid extractive-abstractive summarization with fallbacks
- Compression quality metrics and validation
- Graceful fallbacks when optional dependencies (NLTK/transformers) are missing
- Added transformers and nltk to requirements.txt
Mai Development
2026-01-27 23:42:20 -05:00
parent bb7205223d
commit 017df5466d
2 changed files with 609 additions and 1 deletions
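A minimal usage sketch of the new engine (illustrative only; the import path is an assumption, since the diff does not show where the new module lives):

# Illustrative usage sketch; the module path below is assumed, not shown in this diff.
from datetime import datetime, timedelta
from memory.compression_engine import CompressionEngine  # hypothetical path

engine = CompressionEngine()  # defaults to facebook/bart-large-cnn

conversation = {
    "id": "conv-123",
    "created_at": (datetime.now() - timedelta(days=45)).isoformat(),
    "messages": [
        {"role": "user", "content": "How do I debug this function?"},
        {"role": "assistant", "content": "Start by adding logging around the failing call."},
    ],
}

compressed = engine.compress_by_age(conversation)   # 45 days old -> SUMMARY tier
print(compressed.compression_level)                 # CompressionLevel.SUMMARY
print(round(compressed.metrics.compression_ratio, 2))

A conversation older than 90 days would instead come back as a metadata-only record.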

requirements.txt

@@ -8,4 +8,6 @@ semgrep>=1.99
docker>=7.0.0
sqlite-vec>=0.1.0
numpy>=1.24.0
sentence-transformers>=2.2.2
transformers>=4.21.0
nltk>=3.8

New file: progressive conversation compression engine module

@@ -0,0 +1,606 @@
"""
Progressive conversation compression engine.
This module provides intelligent compression of conversations based on age,
preserving important information while reducing storage requirements.
"""
import re
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Union
from enum import Enum
from dataclasses import dataclass
try:
from transformers import pipeline as hf_pipeline
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
hf_pipeline = None
try:
import nltk
from nltk.tokenize import sent_tokenize
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
NLTK_AVAILABLE = True
except ImportError:
NLTK_AVAILABLE = False
nltk = None
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))
from models.conversation import Message, MessageRole, ConversationMetadata
class CompressionLevel(Enum):
"""Compression levels based on conversation age."""
FULL = "full" # 0-7 days: No compression
KEY_POINTS = "key_points" # 7-30 days: 70% retention
SUMMARY = "summary" # 30-90 days: 40% retention
METADATA = "metadata" # 90+ days: Metadata only
@dataclass
class CompressionMetrics:
"""Metrics for compression quality assessment."""
original_length: int
compressed_length: int
compression_ratio: float
information_retention_score: float
quality_score: float
@dataclass
class CompressedConversation:
"""Represents a compressed conversation."""
original_id: str
compression_level: CompressionLevel
compressed_at: datetime
original_created_at: datetime
content: Union[str, Dict[str, Any]]
metadata: Dict[str, Any]
metrics: CompressionMetrics
class CompressionEngine:
"""
Progressive conversation compression engine.
Compresses conversations based on age using hybrid extractive-abstractive
summarization while preserving important information.
"""
def __init__(self, model_name: str = "facebook/bart-large-cnn"):
"""
Initialize compression engine.
Args:
model_name: Name of the summarization model to use
"""
self.model_name = model_name
self.logger = logging.getLogger(__name__)
self._summarizer = None
self._initialize_nltk()
def _initialize_nltk(self) -> None:
"""Initialize NLTK components for extractive summarization."""
if not NLTK_AVAILABLE:
self.logger.warning("NLTK not available - using fallback methods")
return
try:
# Download required NLTK data
import ssl
try:
_create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
pass
else:
ssl._create_default_https_context = _create_unverified_https_context
nltk.download("punkt", quiet=True)
nltk.download("stopwords", quiet=True)
self.logger.debug("NLTK components initialized")
except Exception as e:
self.logger.warning(f"Failed to initialize NLTK: {e}")
def _get_summarizer(self):
"""Lazy initialization of summarization pipeline."""
if TRANSFORMERS_AVAILABLE and self._summarizer is None:
try:
self._summarizer = hf_pipeline(
"summarization",
model=self.model_name,
device=-1, # Use CPU by default
)
self.logger.debug(f"Initialized summarizer: {self.model_name}")
except Exception as e:
self.logger.error(f"Failed to initialize summarizer: {e}")
self._summarizer = None
return self._summarizer
def get_compression_level(self, age_days: int) -> CompressionLevel:
"""
Determine compression level based on conversation age.
Args:
age_days: Age of conversation in days
Returns:
CompressionLevel based on age
"""
if age_days < 7:
return CompressionLevel.FULL
elif age_days < 30:
return CompressionLevel.KEY_POINTS
elif age_days < 90:
return CompressionLevel.SUMMARY
else:
return CompressionLevel.METADATA
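# Worked examples of the tiers above (illustrative):
#   get_compression_level(3)   -> CompressionLevel.FULL
#   get_compression_level(10)  -> CompressionLevel.KEY_POINTS
#   get_compression_level(45)  -> CompressionLevel.SUMMARY
#   get_compression_level(200) -> CompressionLevel.METADATA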
def extract_key_points(self, conversation: Dict[str, Any]) -> str:
"""
Extract key points from conversation using extractive methods.
Args:
conversation: Conversation data with messages
Returns:
String containing key points
"""
messages = conversation.get("messages", [])
if not messages:
return ""
# Combine all user and assistant messages
full_text = ""
for msg in messages:
if msg["role"] in ["user", "assistant"]:
full_text += msg["content"] + "\n"
if not full_text.strip():
return ""
# Extractive summarization using sentence importance
if not NLTK_AVAILABLE:
# Simple fallback: split by sentences and take first 70%
sentences = full_text.split(". ")
if len(sentences) <= 3:
return full_text.strip()
num_sentences = max(3, int(len(sentences) * 0.7))
key_points = ". ".join(sentences[:num_sentences])
if not key_points.endswith("."):
key_points += "."
return key_points.strip()
try:
sentences = sent_tokenize(full_text)
if len(sentences) <= 3:
return full_text.strip()
# Simple scoring based on sentence length and keywords
scored_sentences = []
stop_words = set(stopwords.words("english"))
for i, sentence in enumerate(sentences):
words = word_tokenize(sentence.lower())
content_words = [
w for w in words if w.isalpha() and w not in stop_words
]
# Score based on length, position, and content word ratio
length_score = min(len(words) / 20, 1.0) # Normalize to max 20 words
position_score = (len(sentences) - i) / len(
sentences
) # Earlier sentences get higher score
content_score = len(content_words) / max(len(words), 1)
total_score = (
length_score * 0.3 + position_score * 0.3 + content_score * 0.4
)
scored_sentences.append((sentence, total_score))
# Select top sentences (70% retention)
scored_sentences.sort(key=lambda x: x[1], reverse=True)
num_sentences = max(3, int(len(sentences) * 0.7))
key_points = " ".join([s[0] for s in scored_sentences[:num_sentences]])
return key_points.strip()
except Exception as e:
self.logger.error(f"Extractive summarization failed: {e}")
return full_text[:500] + "..." if len(full_text) > 500 else full_text
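# Worked scoring example (illustrative): a 15-word sentence, first of 10, with
# 9 content words scores 0.75 * 0.3 + 1.0 * 0.3 + 0.6 * 0.4 = 0.765.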
def generate_summary(
self, conversation: Dict[str, Any], target_ratio: float = 0.4
) -> str:
"""
Generate abstractive summary using transformer model.
Args:
conversation: Conversation data with messages
target_ratio: Target compression ratio (e.g., 0.4 = 40% retention)
Returns:
Generated summary string
"""
messages = conversation.get("messages", [])
if not messages:
return ""
# Combine messages into a single text
full_text = ""
for msg in messages:
if msg["role"] in ["user", "assistant"]:
full_text += f"{msg['role']}: {msg['content']}\n"
if not full_text.strip():
return ""
# Try abstractive summarization
summarizer = self._get_summarizer()
if summarizer:
try:
# Calculate target length based on ratio
max_length = max(50, int(len(full_text.split()) * target_ratio))
min_length = max(25, int(max_length * 0.5))
result = summarizer(
full_text,
max_length=max_length,
min_length=min_length,
do_sample=False,
)
if result and len(result) > 0:
summary = result[0].get("summary_text", "")
if summary:
return summary.strip()
except Exception as e:
self.logger.error(f"Abstractive summarization failed: {e}")
# Fallback to extractive method
return self.extract_key_points(conversation)
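# Length-targeting example (illustrative): for a 500-word conversation and
# target_ratio=0.4, max_length = max(50, 200) = 200 and min_length = max(25, 100) = 100.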
def extract_metadata_only(self, conversation: Dict[str, Any]) -> Dict[str, Any]:
"""
Extract only metadata from conversation.
Args:
conversation: Conversation data
Returns:
Dictionary with conversation metadata
"""
messages = conversation.get("messages", [])
# Extract key metadata
metadata = {
"id": conversation.get("id"),
"title": conversation.get("title"),
"created_at": conversation.get("created_at"),
"updated_at": conversation.get("updated_at"),
"total_messages": len(messages),
"session_id": conversation.get("session_id"),
"topics": self._extract_topics(messages),
"key_entities": self._extract_entities(messages),
"summary_stats": self._calculate_summary_stats(messages),
}
return metadata
def _extract_topics(self, messages: List[Dict[str, Any]]) -> List[str]:
"""Extract main topics from conversation."""
topics = set()
# Simple keyword-based topic extraction
topic_keywords = {
"technical": [
"code",
"programming",
"algorithm",
"function",
"bug",
"debug",
],
"personal": ["feel", "think", "opinion", "prefer", "like"],
"work": ["project", "task", "deadline", "meeting", "team"],
"learning": ["learn", "study", "understand", "explain", "tutorial"],
"planning": ["plan", "schedule", "organize", "goal", "strategy"],
}
for msg in messages:
if msg["role"] in ["user", "assistant"]:
content = msg["content"].lower()
for topic, keywords in topic_keywords.items():
if isinstance(keywords, str):
keywords = [keywords]
if any(keyword in content for keyword in keywords):
topics.add(topic)
return list(topics)
def _extract_entities(self, messages: List[Dict[str, Any]]) -> List[str]:
"""Extract key entities from conversation."""
entities = set()
# Simple pattern-based entity extraction
patterns = {
"emails": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"urls": r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+",
"file_paths": r'\b[a-zA-Z]:\\[^<>:"|?*\n]*\b|\b/[^<>:"|?*\n]*\b',
}
for msg in messages:
if msg["role"] in ["user", "assistant"]:
content = msg["content"]
for entity_type, pattern in patterns.items():
matches = re.findall(pattern, content)
entities.update(matches)
return list(entities)
def _calculate_summary_stats(
self, messages: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""Calculate summary statistics for conversation."""
user_messages = [m for m in messages if m["role"] == "user"]
assistant_messages = [m for m in messages if m["role"] == "assistant"]
total_tokens = sum(m.get("token_count", 0) for m in messages)
avg_importance = sum(m.get("importance_score", 0.5) for m in messages) / max(
len(messages), 1
)
return {
"user_message_count": len(user_messages),
"assistant_message_count": len(assistant_messages),
"total_tokens": total_tokens,
"average_importance_score": avg_importance,
"duration_days": self._calculate_conversation_duration(messages),
}
def _calculate_conversation_duration(self, messages: List[Dict[str, Any]]) -> int:
"""Calculate conversation duration in days."""
if not messages:
return 0
timestamps = []
for msg in messages:
if "timestamp" in msg:
try:
ts = datetime.fromisoformat(msg["timestamp"])
timestamps.append(ts)
except (ValueError, TypeError):
continue
if len(timestamps) < 2:
return 0
duration = max(timestamps) - min(timestamps)
return max(0, duration.days)
def compress_by_age(self, conversation: Dict[str, Any]) -> CompressedConversation:
"""
Compress conversation based on its age.
Args:
conversation: Conversation data to compress
Returns:
CompressedConversation with appropriate compression level
"""
# Calculate age
created_at = conversation.get("created_at")
if isinstance(created_at, str):
created_at = datetime.fromisoformat(created_at)
elif created_at is None:
created_at = datetime.now()
age_days = (datetime.now() - created_at).days
compression_level = self.get_compression_level(age_days)
# Get original content length
original_content = json.dumps(conversation, ensure_ascii=False)
original_length = len(original_content)
# Apply compression based on level
if compression_level == CompressionLevel.FULL:
compressed_content = conversation
elif compression_level == CompressionLevel.KEY_POINTS:
compressed_content = self.extract_key_points(conversation)
elif compression_level == CompressionLevel.SUMMARY:
compressed_content = self.generate_summary(conversation, target_ratio=0.4)
else: # METADATA
compressed_content = self.extract_metadata_only(conversation)
# Calculate compression metrics
compressed_content_str = (
json.dumps(compressed_content, ensure_ascii=False)
if not isinstance(compressed_content, str)
else compressed_content
)
compressed_length = len(compressed_content_str)
compression_ratio = compressed_length / max(original_length, 1)
# Calculate information retention score
retention_score = self._calculate_retention_score(compression_level)
quality_score = self._calculate_quality_score(
compressed_content, conversation, compression_level
)
metrics = CompressionMetrics(
original_length=original_length,
compressed_length=compressed_length,
compression_ratio=compression_ratio,
information_retention_score=retention_score,
quality_score=quality_score,
)
return CompressedConversation(
original_id=conversation.get("id", "unknown"),
compression_level=compression_level,
compressed_at=datetime.now(),
original_created_at=created_at,
content=compressed_content,
metadata={
"compression_method": "hybrid_extractive_abstractive",
"age_days": age_days,
"original_tokens": conversation.get("total_tokens", 0),
},
metrics=metrics,
)
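# Metrics example (illustrative): a 2,000-character conversation summarized to
# 800 characters yields compression_ratio = 800 / 2000 = 0.4, with an
# information_retention_score of 0.4 at the SUMMARY level.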
def _calculate_retention_score(self, compression_level: CompressionLevel) -> float:
"""Calculate information retention score based on compression level."""
retention_map = {
CompressionLevel.FULL: 1.0,
CompressionLevel.KEY_POINTS: 0.7,
CompressionLevel.SUMMARY: 0.4,
CompressionLevel.METADATA: 0.1,
}
return retention_map.get(compression_level, 0.1)
def _calculate_quality_score(
self,
compressed_content: Union[str, Dict[str, Any]],
original: Dict[str, Any],
level: CompressionLevel,
) -> float:
"""
Calculate quality score for compressed content.
Args:
compressed_content: The compressed content
original: Original conversation
level: Compression level used
Returns:
Quality score between 0.0 and 1.0
"""
try:
# Base score from compression level
base_scores = {
CompressionLevel.FULL: 1.0,
CompressionLevel.KEY_POINTS: 0.8,
CompressionLevel.SUMMARY: 0.7,
CompressionLevel.METADATA: 0.5,
}
base_score = base_scores.get(level, 0.5)
# Adjust based on content quality
if isinstance(compressed_content, str):
# Check for common quality indicators
content_length = len(compressed_content)
if content_length == 0:
return 0.0
# Penalize very short content
if level in [CompressionLevel.KEY_POINTS, CompressionLevel.SUMMARY]:
if content_length < 50:
base_score *= 0.5
elif content_length < 100:
base_score *= 0.8
# Check for coherent structure
sentences = (
compressed_content.count(".")
+ compressed_content.count("!")
+ compressed_content.count("?")
)
if sentences > 0:
coherence_score = min(
sentences / 10, 1.0
) # More sentences = more coherent
base_score = (base_score + coherence_score) / 2
return max(0.0, min(1.0, base_score))
except Exception as e:
self.logger.error(f"Error calculating quality score: {e}")
return 0.5
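# Quality-score example (illustrative): a 120-character SUMMARY with 4 sentence
# terminators keeps base 0.7 (no short-content penalty at >= 100 chars), gets
# coherence min(4 / 10, 1.0) = 0.4, and scores (0.7 + 0.4) / 2 = 0.55.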
def decompress(self, compressed: CompressedConversation) -> Dict[str, Any]:
"""
Decompress compressed conversation to summary view.
Args:
compressed: Compressed conversation to decompress
Returns:
Summary view of the conversation
"""
if compressed.compression_level == CompressionLevel.FULL:
# Return full conversation if no compression
return (
compressed.content
if isinstance(compressed.content, dict)
else {"summary": compressed.content}
)
# Create summary view for compressed conversations
summary = {
"id": compressed.original_id,
"compression_level": compressed.compression_level.value,
"compressed_at": compressed.compressed_at.isoformat(),
"original_created_at": compressed.original_created_at.isoformat(),
"metadata": compressed.metadata,
"metrics": {
"compression_ratio": compressed.metrics.compression_ratio,
"information_retention_score": compressed.metrics.information_retention_score,
"quality_score": compressed.metrics.quality_score,
},
}
if compressed.compression_level == CompressionLevel.METADATA:
# Content is already metadata
if isinstance(compressed.content, dict):
summary["metadata"].update(compressed.content)
summary["summary"] = "Metadata only - full content compressed due to age"
else:
# Content is key points or summary text
summary["summary"] = compressed.content
return summary
def batch_compress_conversations(
self, conversations: List[Dict[str, Any]]
) -> List[CompressedConversation]:
"""
Compress multiple conversations efficiently.
Args:
conversations: List of conversations to compress
Returns:
List of compressed conversations
"""
compressed_list = []
for conversation in conversations:
try:
compressed = self.compress_by_age(conversation)
compressed_list.append(compressed)
except Exception as e:
self.logger.error(
f"Failed to compress conversation {conversation.get('id', 'unknown')}: {e}"
)
continue
self.logger.info(
f"Compressed {len(compressed_list)}/{len(conversations)} conversations successfully"
)
return compressed_list
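# End-to-end sketch (illustrative): compress a batch and read back summary views.
#   engine = CompressionEngine()
#   compressed = engine.batch_compress_conversations(conversations)
#   summaries = [engine.decompress(c) for c in compressed]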