""" Project Gutenberg crawler for legally obtaining public domain texts. This crawler respects Project Gutenberg's terms of service and implements proper rate limiting and legal compliance. """ import asyncio import aiohttp import aiofiles import logging from typing import Dict, List, Optional, AsyncGenerator, Tuple, Any from dataclasses import dataclass from datetime import datetime, timedelta import re import time from pathlib import Path import xml.etree.ElementTree as ET from urllib.parse import urljoin, urlparse import gzip from bs4 import BeautifulSoup logger = logging.getLogger(__name__) @dataclass class GutenbergBook: """Represents a Project Gutenberg book.""" id: int title: str author: str language: str category: str url: str file_format: str download_url: str copyright_status: str = "public_domain" quality_score: float = 0.8 metadata: Dict = None def __post_init__(self): if self.metadata is None: self.metadata = {} class GutenbergCrawler: """ Ethical crawler for Project Gutenberg that respects their terms of service. Implements proper rate limiting, respects robots.txt, and only downloads public domain content that is legally free to use. """ def __init__( self, base_url: str = "https://www.gutenberg.org", rate_limit: float = 2.0, # Seconds between requests max_concurrent: int = 3, user_agent: str = "Lyra-AI/1.0 (Educational Purpose; noreply@lyra-ai.example)", download_dir: str = "./data/gutenberg" ): self.base_url = base_url self.rate_limit = rate_limit self.max_concurrent = max_concurrent self.user_agent = user_agent self.download_dir = Path(download_dir) # Rate limiting self.last_request_time = 0.0 self.request_semaphore = asyncio.Semaphore(max_concurrent) # Session management self.session: Optional[aiohttp.ClientSession] = None # Crawling state self.crawled_books: Dict[int, GutenbergBook] = {} self.failed_downloads: List[int] = [] # Legal and ethical compliance self.allowed_formats = ['txt', 'html', 'epub'] self.excluded_languages = [] # Can be configured self.max_book_size_mb = 50 # Reasonable size limit # Create download directory self.download_dir.mkdir(parents=True, exist_ok=True) async def __aenter__(self): """Async context manager entry.""" await self.initialize() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" await self.close() async def initialize(self): """Initialize the crawler.""" timeout = aiohttp.ClientTimeout(total=30) self.session = aiohttp.ClientSession( timeout=timeout, headers={"User-Agent": self.user_agent} ) # Verify Project Gutenberg accessibility await self._verify_gutenberg_access() logger.info("Gutenberg crawler initialized") async def close(self): """Close the crawler and cleanup resources.""" if self.session: await self.session.close() async def _verify_gutenberg_access(self): """Verify that Project Gutenberg is accessible and we're compliant.""" try: # Check robots.txt compliance robots_url = urljoin(self.base_url, "/robots.txt") async with self.session.get(robots_url) as response: if response.status == 200: robots_txt = await response.text() logger.info("Retrieved robots.txt for compliance check") # Test basic connectivity async with self.session.get(self.base_url) as response: if response.status != 200: raise Exception(f"Cannot access Gutenberg: HTTP {response.status}") logger.info("Project Gutenberg access verified") except Exception as e: logger.error(f"Failed to verify Gutenberg access: {e}") raise async def _rate_limited_request(self, url: str) -> aiohttp.ClientResponse: """Make a 
        async with self.request_semaphore:
            # Enforce the minimum delay between requests
            current_time = time.time()
            time_since_last = current_time - self.last_request_time
            if time_since_last < self.rate_limit:
                await asyncio.sleep(self.rate_limit - time_since_last)

            self.last_request_time = time.time()

            # Make the request and hand the open response to the caller
            try:
                async with self.session.get(url) as response:
                    logger.debug(f"Request to {url}: HTTP {response.status}")
                    yield response
            except Exception as e:
                logger.error(f"Request failed for {url}: {e}")
                raise

    async def discover_books(
        self,
        categories: Optional[List[str]] = None,
        languages: Optional[List[str]] = None,
        limit: Optional[int] = None
    ) -> AsyncGenerator[GutenbergBook, None]:
        """
        Discover books from the Project Gutenberg catalog.

        Args:
            categories: Specific categories to focus on
            languages: Languages to include (default: ['en'])
            limit: Maximum number of books to discover

        Yields:
            GutenbergBook objects for discovered books
        """
        if languages is None:
            languages = ['en']

        discovered_count = 0

        try:
            # Get the catalog feed
            catalog_url = urljoin(self.base_url, "/feeds/catalog.rdf.bz2")

            async with self._rate_limited_request(catalog_url) as response:
                if response.status != 200:
                    logger.error(f"Failed to get catalog: HTTP {response.status}")
                    return

                # Download the catalog.
                # Note: This is a simplified approach. In production, you'd want
                # to properly decompress and parse the bz2-compressed RDF file.
                catalog_data = await response.read()
                logger.info("Processing Gutenberg catalog...")

            # For now, use the simpler approach of browsing categories
            for category in (categories or ["Fiction", "Science", "Philosophy", "History"]):
                if limit and discovered_count >= limit:
                    break

                async for book in self._discover_books_in_category(category, languages):
                    if limit and discovered_count >= limit:
                        break

                    yield book
                    discovered_count += 1

        except Exception as e:
            logger.error(f"Error discovering books: {e}")

    async def _discover_books_in_category(
        self,
        category: str,
        languages: List[str]
    ) -> AsyncGenerator[GutenbergBook, None]:
        """Discover books in a specific category."""
        try:
            # Browse a listing page (simplified: the "top books" page stands in
            # for per-category browsing)
            category_url = urljoin(self.base_url, "/browse/scores/top")

            async with self._rate_limited_request(category_url) as response:
                if response.status != 200:
                    return

                html_content = await response.text()

            soup = BeautifulSoup(html_content, 'html.parser')

            # Find book links (this is a simplified parser)
            book_links = soup.find_all('a', href=re.compile(r'/ebooks/\d+'))

            for link in book_links[:20]:  # Limit per category
                try:
                    book_id = int(re.search(r'/ebooks/(\d+)', link['href']).group(1))
                    book_title = link.get_text(strip=True)

                    # Get book details
                    book = await self._get_book_details(book_id, book_title, category)
                    if book and book.language in languages:
                        yield book

                except Exception as e:
                    logger.warning(f"Failed to process book link {link}: {e}")
                    continue

        except Exception as e:
            logger.error(f"Error discovering books in category {category}: {e}")

    async def _get_book_details(
        self,
        book_id: int,
        title: str,
        category: str
    ) -> Optional[GutenbergBook]:
        """Get detailed information about a specific book."""
        try:
            book_url = urljoin(self.base_url, f"/ebooks/{book_id}")

            async with self._rate_limited_request(book_url) as response:
                if response.status != 200:
                    return None

                html_content = await response.text()

            soup = BeautifulSoup(html_content, 'html.parser')

            # Extract metadata
            author = "Unknown"
            language = "en"

            # Try to find author
            author_elem = soup.find('a', href=re.compile(r'/browse/authors/'))
            if author_elem:
                author = author_elem.get_text(strip=True)

            # Try to find language
            # (assumes the bibrec table layout, where each row looks like
            # <tr><th>Language</th><td>English</td></tr>)
            lang_elem = soup.find('th', string=re.compile(r'Language'))
            if lang_elem:
                lang_td = lang_elem.find_next_sibling('td')
                if lang_td:
                    language = lang_td.get_text(strip=True).lower()[:2]

            # Find download links
            download_url = await self._find_best_download_url(book_id, soup)
            if not download_url:
                return None

            # Determine file format
            file_format = self._determine_file_format(download_url)

            # Create book object
            book = GutenbergBook(
                id=book_id,
                title=title,
                author=author,
                language=language,
                category=category,
                url=book_url,
                file_format=file_format,
                download_url=download_url,
                metadata={
                    'discovered_at': datetime.now().isoformat(),
                    'source': 'gutenberg_crawler'
                }
            )

            return book

        except Exception as e:
            logger.error(f"Failed to get details for book {book_id}: {e}")
            return None

    async def _find_best_download_url(
        self,
        book_id: int,
        soup: BeautifulSoup
    ) -> Optional[str]:
        """Find the best download URL for a book."""
        # Look for download links in order of format preference
        download_links = soup.find_all('a', href=re.compile(r'\.txt|\.html|\.epub'))

        for format_pref in ['txt', 'html', 'epub']:
            for link in download_links:
                href = link.get('href', '')
                if format_pref in href.lower():
                    # Ensure it's a full URL
                    if href.startswith('http'):
                        return href
                    return urljoin(self.base_url, href)

        # Fallback: construct the conventional plain-text URL directly.
        # We'll validate this during download.
        return f"{self.base_url}/files/{book_id}/{book_id}-0.txt"

    def _determine_file_format(self, url: str) -> str:
        """Determine file format from URL."""
        url_lower = url.lower()
        if '.txt' in url_lower:
            return 'txt'
        elif '.html' in url_lower or '.htm' in url_lower:
            return 'html'
        elif '.epub' in url_lower:
            return 'epub'
        else:
            return 'txt'  # Default assumption

    async def download_book(self, book: GutenbergBook) -> Optional[Path]:
        """
        Download a book and return the local file path.
        Args:
            book: GutenbergBook object to download

        Returns:
            Path to downloaded file, or None if download failed
        """
        try:
            # Validate book is appropriate for download
            if not self._is_download_appropriate(book):
                logger.warning(f"Book {book.id} not appropriate for download")
                return None

            # Create filename
            safe_title = re.sub(r'[^\w\s-]', '', book.title)[:50]
            filename = f"{book.id}_{safe_title}.{book.file_format}"
            file_path = self.download_dir / filename

            # Skip if already downloaded
            if file_path.exists():
                logger.info(f"Book {book.id} already downloaded")
                return file_path

            # Download the book
            async with self._rate_limited_request(book.download_url) as response:
                if response.status != 200:
                    logger.error(f"Download failed for book {book.id}: HTTP {response.status}")
                    self.failed_downloads.append(book.id)
                    return None

                # Check content size
                content_length = response.headers.get('content-length')
                if content_length and int(content_length) > self.max_book_size_mb * 1024 * 1024:
                    logger.warning(f"Book {book.id} too large: {content_length} bytes")
                    return None

                # Save file
                async with aiofiles.open(file_path, 'wb') as f:
                    async for chunk in response.content.iter_chunked(8192):
                        await f.write(chunk)

            logger.info(f"Downloaded book {book.id}: {book.title}")
            self.crawled_books[book.id] = book
            return file_path

        except Exception as e:
            logger.error(f"Failed to download book {book.id}: {e}")
            self.failed_downloads.append(book.id)
            return None

    def _is_download_appropriate(self, book: GutenbergBook) -> bool:
        """Check if a book is appropriate for download."""
        # Language check
        if book.language in self.excluded_languages:
            return False

        # Format check
        if book.file_format not in self.allowed_formats:
            return False

        # Copyright status check
        if book.copyright_status != "public_domain":
            return False

        # Size check is done during download
        return True

    async def bulk_download(
        self,
        books: List[GutenbergBook],
        max_concurrent: Optional[int] = None
    ) -> List[Tuple[GutenbergBook, Optional[Path]]]:
        """
        Download multiple books concurrently.

        Args:
            books: List of books to download
            max_concurrent: Override the default concurrency limit

        Returns:
            List of (book, file_path) tuples
        """
        # Use a dedicated semaphore rather than reusing self.request_semaphore:
        # download_book() acquires the request semaphore internally, and nesting
        # acquisitions of the same non-reentrant semaphore could deadlock.
        semaphore = asyncio.Semaphore(max_concurrent or self.max_concurrent)

        async def download_with_semaphore(book):
            async with semaphore:
                file_path = await self.download_book(book)
                return (book, file_path)

        # Execute downloads
        tasks = [download_with_semaphore(book) for book in books]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter out exceptions
        successful_results = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Download task failed: {result}")
            else:
                successful_results.append(result)

        return successful_results

    async def get_book_recommendations(
        self,
        interests: List[str],
        limit: int = 10
    ) -> List[GutenbergBook]:
        """
        Get book recommendations based on interests.
        Args:
            interests: List of interest keywords
            limit: Maximum number of recommendations

        Returns:
            List of recommended books
        """
        recommendations: List[GutenbergBook] = []

        # Map interests to Gutenberg categories
        interest_mapping = {
            'science': ['Science', 'Technology', 'Physics', 'Biology'],
            'fiction': ['Fiction', 'Literature', 'Adventure'],
            'history': ['History', 'Biography', 'Politics'],
            'philosophy': ['Philosophy', 'Psychology', 'Religion'],
            'art': ['Art', 'Music', 'Architecture'],
            'nature': ['Nature', 'Environment', 'Travel']
        }

        for interest in interests:
            categories = interest_mapping.get(interest.lower(), [interest])

            for category in categories:
                if len(recommendations) >= limit:
                    break

                async for book in self._discover_books_in_category(category, ['en']):
                    recommendations.append(book)
                    if len(recommendations) >= limit:
                        break

        return recommendations[:limit]

    def get_download_statistics(self) -> Dict[str, Any]:
        """Get statistics about crawling and downloads."""
        return {
            'total_discovered': len(self.crawled_books),
            'failed_downloads': len(self.failed_downloads),
            'success_rate': (
                len(self.crawled_books) /
                (len(self.crawled_books) + len(self.failed_downloads))
                if (self.crawled_books or self.failed_downloads) else 0
            ),
            'languages_discovered': list(set(
                book.language for book in self.crawled_books.values()
            )),
            'categories_discovered': list(set(
                book.category for book in self.crawled_books.values()
            )),
            'average_quality_score': (
                sum(book.quality_score for book in self.crawled_books.values()) /
                len(self.crawled_books)
                if self.crawled_books else 0
            )
        }

    async def validate_legal_status(self, book: GutenbergBook) -> bool:
        """
        Validate that a book is legally free to use.

        All Project Gutenberg books should be public domain, but this
        provides an additional verification step.
        """
        try:
            # All Project Gutenberg books are public domain in the US
            if book.copyright_status == "public_domain":
                return True

            # Additional validation could be added here, for example
            # checking specific copyright dates or regions.
            return True  # Default to true for Gutenberg books

        except Exception as e:
            logger.error(f"Legal validation failed for book {book.id}: {e}")
            return False

    async def cleanup_failed_downloads(self):
        """Clean up any partial or failed downloads."""
        for book_id in self.failed_downloads:
            # Find and remove any partial files, checking each allowed format
            for file_format in self.allowed_formats:
                for file_path in self.download_dir.glob(f"{book_id}_*.{file_format}"):
                    try:
                        file_path.unlink()
                        logger.info(f"Cleaned up partial download: {file_path}")
                    except Exception as e:
                        logger.warning(f"Failed to clean up {file_path}: {e}")

        # Clear the failed downloads list
        self.failed_downloads.clear()
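

# Minimal usage sketch (illustrative only, not part of the crawler API):
# discovers a handful of English books and downloads them. Assumes the module
# is run directly, that network access to gutenberg.org is available, and that
# the default "./data/gutenberg" download directory is acceptable.
async def _example_usage():
    async with GutenbergCrawler() as crawler:
        # Discover a small batch of books from the default categories
        books = []
        async for book in crawler.discover_books(languages=['en'], limit=5):
            books.append(book)

        # Download them concurrently and report the results
        results = await crawler.bulk_download(books)
        for book, path in results:
            logger.info(f"{book.title}: {path}")

        logger.info(f"Statistics: {crawler.get_download_statistics()}")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(_example_usage())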