import os
import re
import random
import time
from urllib.parse import quote_plus

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options


def load_scraped_urls(file_path):
    """Load already scraped URLs from a file."""
    if os.path.exists(file_path):
        with open(file_path, "r", encoding="utf-8") as f:
            return set(line.strip() for line in f)
    return set()


def save_scraped_url(file_path, url):
    """Append a scraped URL to the file."""
    with open(file_path, "a", encoding="utf-8") as f:
        f.write(url + "\n")


def scrape_deviantart(query, max_pages):
    """Search DeviantArt for `query` and scrape stories from up to `max_pages` result pages."""
    output_dir = "data/raw/deviantart"
    scraped_urls_file = "scraped_urls.txt"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Load previously scraped URLs so reruns can resume without duplicating work
    scraped_urls = load_scraped_urls(scraped_urls_file)

    # Configure Selenium WebDriver (Brave binary driven via chromedriver)
    options = Options()
    options.binary_location = "C:\\Program Files\\BraveSoftware\\Brave-Browser\\Application\\brave.exe"  # noqa
    options.add_argument("--disable-gpu")
    options.add_argument("--no-sandbox")
    options.add_argument("--incognito")
    service = Service("F:\\chromedriver\\chromedriver.exe")
    driver = webdriver.Chrome(service=service, options=options)

    try:
        base_url = "https://www.deviantart.com/search/?q="
        # URL-encode the query so multi-word searches build a valid URL
        driver.get(base_url + quote_plus(query))

        for page in range(1, max_pages + 1):
            print(f"Scraping page {page}...")
            time.sleep(random.uniform(5, 10))

            # Detect throttling and back off before retrying
            if detect_throttling(driver):
                print("Throttling detected. Retrying after delay...")
                time.sleep(random.uniform(60, 120))
                driver.refresh()
                continue

            grid_rows = driver.find_elements(By.XPATH, "//div[@data-testid='grid-row']")
            print(f"DEBUG: Found {len(grid_rows)} grid rows on the page.")
            if not grid_rows:
                print("No more stories found.")
                break

            for row in grid_rows:
                story_elements = row.find_elements(By.XPATH, ".//a[contains(@href, '/art/')]")  # noqa
                for link in story_elements:
                    story_url = link.get_attribute("href")

                    # Skip #comments URLs and already scraped URLs
                    if not story_url or story_url.endswith("#comments") or story_url in scraped_urls:  # noqa
                        print(f"Skipping URL: {story_url}")
                        continue

                    # Open the story in a new tab and switch to it
                    driver.execute_script("window.open('{}', '_blank');".format(story_url))  # noqa
                    driver.switch_to.window(driver.window_handles[-1])

                    # Scrape the story; record the URL only if it was actually saved
                    if scrape_story(driver, story_url, output_dir):
                        scraped_urls.add(story_url)
                        save_scraped_url(scraped_urls_file, story_url)

                    # Close the tab and return to the main (search results) tab
                    driver.close()
                    driver.switch_to.window(driver.window_handles[0])
                    time.sleep(random.uniform(1, 3))

            # Navigate to the next page of search results
            try:
                next_button = driver.find_element(By.XPATH, "//a[contains(@href, 'cursor=') and contains(., 'Next')]")  # noqa
                driver.execute_script("arguments[0].scrollIntoView(true);", next_button)
                time.sleep(random.uniform(2, 5))
                next_button.click()
            except Exception as e:
                print(f"No more pages or navigation error: {e}")
                break
    finally:
        driver.quit()


def detect_throttling(driver):
    """Detect if throttling has occurred by checking for specific keywords."""
    try:
        if "captcha" in driver.page_source.lower() or "rate limit" in driver.page_source.lower():  # noqa
            return True
    except Exception as e:
        print(f"Error during throttling detection: {e}")
    return False


def scrape_story(driver, story_url, output_dir):
    """Scrape and save a single story, skipping it if the file already exists."""
    try:
        time.sleep(random.uniform(5, 10))
        title_element = driver.find_element(By.TAG_NAME, "h1")
        story_element = driver.find_element(By.XPATH, "//div[@data-editor-viewer='1']")
        title = title_element.text.strip()
        story = story_element.text.strip()
        if title and story:
            # Build a filesystem-safe filename from the title
            filename = re.sub(r"[^a-zA-Z0-9_]+", "_", title) + ".txt"
            filepath = os.path.join(output_dir, filename)
            if os.path.exists(filepath):
                print(f"Skipping existing story: {title}")
                return False
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(story)
            print(f"Saved story: {title}")
            return True
    except Exception as e:
        print(f"Error processing story {story_url}: {e}")
    # Empty title/story or an error: report failure so the URL is not marked as scraped
    return False