From 78e67b4321e5ebce89a4d227cb7014abec600f50 Mon Sep 17 00:00:00 2001 From: Dan Date: Sat, 21 Dec 2024 08:58:12 -0500 Subject: [PATCH] moved the scrapper into a better location --- scripts/deviantart_scraper.py | 148 ++++++++++++++++++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 scripts/deviantart_scraper.py diff --git a/scripts/deviantart_scraper.py b/scripts/deviantart_scraper.py new file mode 100644 index 0000000..f400644 --- /dev/null +++ b/scripts/deviantart_scraper.py @@ -0,0 +1,148 @@ +import os +import re +import random +import time +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.chrome.service import Service +from selenium.webdriver.chrome.options import Options + + +def load_scraped_urls(file_path): + """ + Load already scraped URLs from a file. + """ + if os.path.exists(file_path): + with open(file_path, "r", encoding="utf-8") as f: + return set(line.strip() for line in f) + return set() + + +def save_scraped_url(file_path, url): + """ + Append a scraped URL to the file. + """ + with open(file_path, "a", encoding="utf-8") as f: + f.write(url + "\n") + + +def scrape_deviantart(query, max_pages): + output_dir = "data/raw/deviantart" + scraped_urls_file = "scraped_urls.txt" + + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + # Load previously scraped URLs + scraped_urls = load_scraped_urls(scraped_urls_file) + + # Configure Selenium WebDriver + options = Options() + options.binary_location = "C:\\Program Files\\BraveSoftware\\Brave-Browser\\Application\\brave.exe" # noqa + options.add_argument("--disable-gpu") + options.add_argument("--no-sandbox") + options.add_argument("--incognito") + service = Service("F:\\chromedriver\\chromedriver.exe") + driver = webdriver.Chrome(service=service, options=options) + + try: + base_url = "https://www.deviantart.com/search/?q=" + driver.get(base_url + query) + + for page in range(1, max_pages + 1): + print(f"Scraping page {page}...") + time.sleep(random.uniform(5, 10)) + + # Detect throttling + if detect_throttling(driver): + print("Throttling detected. Retrying after delay...") + time.sleep(random.uniform(60, 120)) + driver.refresh() + continue + + grid_rows = driver.find_elements(By.XPATH, + "//div[@data-testid='grid-row']") + print(f"DEBUG: Found {len(grid_rows)} grid rows on the page.") + + if not grid_rows: + print("No more stories found.") + break + + for row in grid_rows: + story_elements = row.find_elements(By.XPATH, ".//a[contains(@href, '/art/')]") # noqa + for link in story_elements: + story_url = link.get_attribute("href") + + # Skip #comments URLs and already scraped URLs + if not story_url or story_url.endswith("#comments") or story_url in scraped_urls: # noqa + print(f"Skipping URL: {story_url}") + continue + + # Open the story in a new tab + driver.execute_script("window.open('{}', '_blank');".format(story_url)) # noqa + driver.switch_to.window(driver.window_handles[-1]) + + # Scrape the story + if scrape_story(driver, story_url, output_dir): + scraped_urls.add(story_url) + save_scraped_url(scraped_urls_file, story_url) + + # Close the tab and return to the main tab + driver.close() + driver.switch_to.window(driver.window_handles[0]) + + time.sleep(random.uniform(1, 3)) + + # Navigate to the next page + try: + next_button = driver.find_element(By.XPATH, "//a[contains(@href, 'cursor=') and contains(., 'Next')]") # noqa + driver.execute_script("arguments[0].scrollIntoView(true);", + next_button) + time.sleep(random.uniform(2, 5)) + next_button.click() + except Exception as e: + print(f"No more pages or navigation error: {e}") + break + finally: + driver.quit() + + +def detect_throttling(driver): + """ + Detect if throttling has occurred by checking for specific keywords. + """ + try: + if "captcha" in driver.page_source.lower() or "rate limit" in driver.page_source.lower(): # noqa + return True + except Exception as e: + print(f"Error during throttling detection: {e}") + return False + + +def scrape_story(driver, story_url, output_dir): + """ + Scrapes and saves a single story, skipping if the file already exists. + """ + try: + time.sleep(random.uniform(5, 10)) + title_element = driver.find_element(By.TAG_NAME, "h1") + story_element = driver.find_element(By.XPATH, + "//div[@data-editor-viewer='1']") + title = title_element.text.strip() + story = story_element.text.strip() + + if title and story: + filename = re.sub(r"[^a-zA-Z0-9_]+", "_", title) + ".txt" + filepath = os.path.join(output_dir, filename) + + if os.path.exists(filepath): + print(f"Skipping existing story: {title}") + return False + + with open(filepath, "w", encoding="utf-8") as f: + f.write(story) + print(f"Saved story: {title}") + return True + except Exception as e: + print(f"Error processing story {story_url}: {e}") + return False