149 lines
5.2 KiB
Python
149 lines
5.2 KiB
Python
|
import os
|
||
|
import re
|
||
|
import random
|
||
|
import time
|
||
|
from selenium import webdriver
|
||
|
from selenium.webdriver.common.by import By
|
||
|
from selenium.webdriver.chrome.service import Service
|
||
|
from selenium.webdriver.chrome.options import Options
|
||
|
|
||
|
|
||
|
def load_scraped_urls(file_path):
    """
    Load already scraped URLs from a file.

    The file is read as UTF-8 with one URL per line. Surrounding whitespace
    is stripped from every line, and lines that are empty after stripping
    are ignored so that stray blank lines never put an empty string into
    the result.

    Args:
        file_path: Path of the text file holding the scraped URLs.

    Returns:
        A set of the non-empty, stripped lines of the file, or an empty set
        when the file does not exist.
    """
    # Open first and handle the missing-file case, instead of checking
    # os.path.exists() beforehand: the file could disappear between the
    # check and the open.
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return {url for url in map(str.strip, f) if url}
    except FileNotFoundError:
        return set()
|
||
|
|
||
|
|
||
|
def save_scraped_url(file_path, url):
    """
    Record a URL as scraped.

    Opens ``file_path`` in UTF-8 append mode (creating it when missing) and
    writes ``url`` followed by a single newline.
    """
    with open(file_path, mode="a", encoding="utf-8") as url_log:
        url_log.write("".join((url, "\n")))
|
||
|
|
||
|
|
||
|
def _collect_story_urls(grid_rows):
    """
    Return the distinct hrefs of the '/art/' links inside the given rows.

    The hrefs are read into plain strings up front, in page order, so the
    caller does not have to touch the link elements again after it has
    switched to another tab and back.
    """
    hrefs = []
    for row in grid_rows:
        for link in row.find_elements(By.XPATH, ".//a[contains(@href, '/art/')]"):  # noqa
            href = link.get_attribute("href")
            if href:
                hrefs.append(href)
    # dict.fromkeys() removes duplicates while keeping first-seen order.
    return list(dict.fromkeys(hrefs))


def _scrape_in_new_tab(driver, story_url, output_dir):
    """
    Open story_url in a new tab, scrape it, then return to the current tab.

    Returns whatever scrape_story() returns, or False when no new tab
    appeared. The new tab is always closed and focus is always restored,
    even if scraping raises.
    """
    results_tab = driver.current_window_handle
    known_tabs = set(driver.window_handles)

    # Pass the URL as a script argument instead of formatting it into the
    # JavaScript source, so quotes or backslashes in an href cannot break
    # (or inject code into) the script.
    driver.execute_script("window.open(arguments[0], '_blank');", story_url)

    # Identify the new tab by set difference rather than by position in
    # window_handles, whose ordering should not be relied upon.
    new_tabs = [h for h in driver.window_handles if h not in known_tabs]
    if not new_tabs:
        print(f"Could not open a new tab for {story_url}")
        return False

    driver.switch_to.window(new_tabs[0])
    try:
        return scrape_story(driver, story_url, output_dir)
    finally:
        driver.close()
        driver.switch_to.window(results_tab)


def scrape_deviantart(
    query,
    max_pages,
    *,
    output_dir="data/raw/deviantart",
    scraped_urls_file="scraped_urls.txt",
    browser_binary="C:\\Program Files\\BraveSoftware\\Brave-Browser\\Application\\brave.exe",  # noqa
    driver_path="F:\\chromedriver\\chromedriver.exe",
):
    """
    Scrape stories from DeviantArt search results for a query.

    Opens the search page for ``query`` in a Selenium-driven browser, walks
    up to ``max_pages`` result pages, opens every '/art/' link found in the
    result grid in a new tab, and hands it to scrape_story(). URLs for which
    scrape_story() reports success are appended to ``scraped_urls_file`` so
    later runs skip them.

    Args:
        query: Raw search text. It is URL-encoded here, so pass it
            unencoded (e.g. "sci-fi & fantasy", not "sci-fi+%26+fantasy").
        max_pages: Maximum number of loop iterations over result pages. An
            iteration in which throttling is detected also counts, which
            bounds the number of retries.
        output_dir: Directory the story files are written to; created if
            missing.
        scraped_urls_file: Text file recording successfully scraped URLs.
        browser_binary: Path of the browser executable handed to
            ``Options.binary_location``.
        driver_path: Path of the chromedriver executable.

    Returns:
        None. Progress is reported with print().
    """
    # Imported locally so this function stays self-contained with respect
    # to the module's import block.
    from urllib.parse import quote_plus

    os.makedirs(output_dir, exist_ok=True)

    # Load previously scraped URLs
    scraped_urls = load_scraped_urls(scraped_urls_file)

    # Configure Selenium WebDriver
    options = Options()
    options.binary_location = browser_binary
    options.add_argument("--disable-gpu")
    options.add_argument("--no-sandbox")
    options.add_argument("--incognito")
    service = Service(driver_path)
    driver = webdriver.Chrome(service=service, options=options)

    # URLs already opened during this run, whether or not they were saved.
    # Prevents re-opening a story that failed or was skipped when its link
    # shows up again on the same or a later page.
    attempted_urls = set()

    try:
        base_url = "https://www.deviantart.com/search/?q="
        # Encode the query so spaces, '&', '#', etc. stay part of the
        # search term instead of altering the URL structure.
        driver.get(base_url + quote_plus(query))

        for page in range(1, max_pages + 1):
            print(f"Scraping page {page}...")
            time.sleep(random.uniform(5, 10))

            # Detect throttling
            if detect_throttling(driver):
                print("Throttling detected. Retrying after delay...")
                time.sleep(random.uniform(60, 120))
                driver.refresh()
                continue

            grid_rows = driver.find_elements(
                By.XPATH, "//div[@data-testid='grid-row']"
            )
            print(f"DEBUG: Found {len(grid_rows)} grid rows on the page.")

            if not grid_rows:
                print("No more stories found.")
                break

            for story_url in _collect_story_urls(grid_rows):
                # Skip #comments URLs and already handled URLs
                if (
                    story_url.endswith("#comments")
                    or story_url in scraped_urls
                    or story_url in attempted_urls
                ):
                    print(f"Skipping URL: {story_url}")
                    continue

                attempted_urls.add(story_url)

                if _scrape_in_new_tab(driver, story_url, output_dir):
                    scraped_urls.add(story_url)
                    save_scraped_url(scraped_urls_file, story_url)

                time.sleep(random.uniform(1, 3))

            # Navigate to the next page
            try:
                next_button = driver.find_element(By.XPATH, "//a[contains(@href, 'cursor=') and contains(., 'Next')]")  # noqa
                driver.execute_script(
                    "arguments[0].scrollIntoView(true);", next_button
                )
                time.sleep(random.uniform(2, 5))
                next_button.click()
            except Exception as e:
                # Deliberately broad: a missing button and any click
                # failure both mean pagination cannot continue.
                print(f"No more pages or navigation error: {e}")
                break
    finally:
        driver.quit()
|
||
|
|
||
|
|
||
|
def detect_throttling(driver):
    """
    Detect if throttling has occurred by checking for specific keywords.

    The current page source is lower-cased and searched for the substrings
    "captcha" and "rate limit".

    Args:
        driver: Object exposing a ``page_source`` string attribute (the
            Selenium WebDriver in this file).

    Returns:
        True if either keyword occurs in the page source. False otherwise,
        including when reading the page source fails (the error is printed
        rather than raised).
    """
    try:
        # Read page_source exactly once so both keyword checks look at the
        # same snapshot; with a live WebDriver each read is a separate
        # request to the browser.
        page_text = driver.page_source.lower()
    except Exception as e:
        print(f"Error during throttling detection: {e}")
        return False

    # NOTE(review): a plain substring match on "captcha" may also fire on
    # pages that merely embed a captcha script - confirm against real pages.
    return any(marker in page_text for marker in ("captcha", "rate limit"))
|
||
|
|
||
|
|
||
|
def scrape_story(driver, story_url, output_dir):
    """
    Scrapes and saves a single story, skipping if the file already exists.

    The function does not navigate: it reads the page the driver is
    currently showing. ``story_url`` is used only in log messages.

    The story title is taken from the first <h1> element and the text from
    the element matching ``//div[@data-editor-viewer='1']``. The text is
    written as UTF-8 to ``<output_dir>/<sanitized title>.txt``, where every
    run of characters other than ASCII letters, digits and underscore in
    the title is replaced by a single underscore.

    Args:
        driver: Selenium WebDriver positioned on the story page.
        story_url: URL of the story, for diagnostics.
        output_dir: Existing directory to write the story file into.

    Returns:
        True if a new file was written. False if the title or text was
        empty, the target file already existed, or any error occurred
        (errors are printed, not raised).
    """
    try:
        # Randomized pause before touching the page.
        time.sleep(random.uniform(5, 10))
        title_element = driver.find_element(By.TAG_NAME, "h1")
        story_element = driver.find_element(
            By.XPATH, "//div[@data-editor-viewer='1']"
        )
        title = title_element.text.strip()
        story = story_element.text.strip()

        # Report and return an explicit False instead of silently falling
        # off the end of the function (which returned None).
        if not (title and story):
            print(f"Skipping {story_url}: empty title or story text")
            return False

        # NOTE(review): different titles can sanitize to the same filename
        # (e.g. titles made only of non-ASCII characters all become
        # "_.txt"); later ones are then treated as already existing.
        filename = re.sub(r"[^a-zA-Z0-9_]+", "_", title) + ".txt"
        filepath = os.path.join(output_dir, filename)

        if os.path.exists(filepath):
            print(f"Skipping existing story: {title}")
            return False

        with open(filepath, "w", encoding="utf-8") as f:
            f.write(story)
        print(f"Saved story: {title}")
        return True
    except Exception as e:
        # Deliberately broad: one bad page must not abort the whole crawl.
        print(f"Error processing story {story_url}: {e}")
        return False
|