moved the scrapper into a better location

This commit is contained in:
Dan 2024-12-21 08:58:12 -05:00
parent 7e407dd1c9
commit 78e67b4321

View File

@ -0,0 +1,148 @@
import os
import re
import random
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
def load_scraped_urls(file_path):
"""
Load already scraped URLs from a file.
"""
if os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as f:
return set(line.strip() for line in f)
return set()
def save_scraped_url(file_path, url):
"""
Append a scraped URL to the file.
"""
with open(file_path, "a", encoding="utf-8") as f:
f.write(url + "\n")
def scrape_deviantart(query, max_pages):
output_dir = "data/raw/deviantart"
scraped_urls_file = "scraped_urls.txt"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Load previously scraped URLs
scraped_urls = load_scraped_urls(scraped_urls_file)
# Configure Selenium WebDriver
options = Options()
options.binary_location = "C:\\Program Files\\BraveSoftware\\Brave-Browser\\Application\\brave.exe" # noqa
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--incognito")
service = Service("F:\\chromedriver\\chromedriver.exe")
driver = webdriver.Chrome(service=service, options=options)
try:
base_url = "https://www.deviantart.com/search/?q="
driver.get(base_url + query)
for page in range(1, max_pages + 1):
print(f"Scraping page {page}...")
time.sleep(random.uniform(5, 10))
# Detect throttling
if detect_throttling(driver):
print("Throttling detected. Retrying after delay...")
time.sleep(random.uniform(60, 120))
driver.refresh()
continue
grid_rows = driver.find_elements(By.XPATH,
"//div[@data-testid='grid-row']")
print(f"DEBUG: Found {len(grid_rows)} grid rows on the page.")
if not grid_rows:
print("No more stories found.")
break
for row in grid_rows:
story_elements = row.find_elements(By.XPATH, ".//a[contains(@href, '/art/')]") # noqa
for link in story_elements:
story_url = link.get_attribute("href")
# Skip #comments URLs and already scraped URLs
if not story_url or story_url.endswith("#comments") or story_url in scraped_urls: # noqa
print(f"Skipping URL: {story_url}")
continue
# Open the story in a new tab
driver.execute_script("window.open('{}', '_blank');".format(story_url)) # noqa
driver.switch_to.window(driver.window_handles[-1])
# Scrape the story
if scrape_story(driver, story_url, output_dir):
scraped_urls.add(story_url)
save_scraped_url(scraped_urls_file, story_url)
# Close the tab and return to the main tab
driver.close()
driver.switch_to.window(driver.window_handles[0])
time.sleep(random.uniform(1, 3))
# Navigate to the next page
try:
next_button = driver.find_element(By.XPATH, "//a[contains(@href, 'cursor=') and contains(., 'Next')]") # noqa
driver.execute_script("arguments[0].scrollIntoView(true);",
next_button)
time.sleep(random.uniform(2, 5))
next_button.click()
except Exception as e:
print(f"No more pages or navigation error: {e}")
break
finally:
driver.quit()
def detect_throttling(driver):
"""
Detect if throttling has occurred by checking for specific keywords.
"""
try:
if "captcha" in driver.page_source.lower() or "rate limit" in driver.page_source.lower(): # noqa
return True
except Exception as e:
print(f"Error during throttling detection: {e}")
return False
def scrape_story(driver, story_url, output_dir):
"""
Scrapes and saves a single story, skipping if the file already exists.
"""
try:
time.sleep(random.uniform(5, 10))
title_element = driver.find_element(By.TAG_NAME, "h1")
story_element = driver.find_element(By.XPATH,
"//div[@data-editor-viewer='1']")
title = title_element.text.strip()
story = story_element.text.strip()
if title and story:
filename = re.sub(r"[^a-zA-Z0-9_]+", "_", title) + ".txt"
filepath = os.path.join(output_dir, filename)
if os.path.exists(filepath):
print(f"Skipping existing story: {title}")
return False
with open(filepath, "w", encoding="utf-8") as f:
f.write(story)
print(f"Saved story: {title}")
return True
except Exception as e:
print(f"Error processing story {story_url}: {e}")
return False