Compare commits
2 Commits
a4ae884022
...
78e67b4321
Author | SHA1 | Date | |
---|---|---|---|
|
78e67b4321 | ||
|
7e407dd1c9 |
151
deviantart_scraper.py
Normal file
151
deviantart_scraper.py
Normal file
@ -0,0 +1,151 @@
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from selenium.webdriver.common.action_chains import ActionChains
|
||||
from selenium.webdriver.common.keys import Keys
|
||||
import time
|
||||
import os
|
||||
import re
|
||||
|
||||
def scrape_deviantart(query, max_pages):
|
||||
"""
|
||||
Scrapes text content from DeviantArt based on a query using Selenium.
|
||||
|
||||
Args:
|
||||
query (str): The search query to find relevant stories.
|
||||
max_pages (int): The number of pages to scrape.
|
||||
|
||||
Returns:
|
||||
None: Saves scraped content into text files in the `data` directory.
|
||||
"""
|
||||
output_dir = "data"
|
||||
if not os.path.exists(output_dir):
|
||||
os.makedirs(output_dir)
|
||||
|
||||
# Configure Selenium WebDriver for Brave
|
||||
options = Options()
|
||||
options.binary_location = "C:\\Program Files\\BraveSoftware\\Brave-Browser\\Application\\brave.exe" # Path to Brave browser executable
|
||||
# options.add_argument("--headless") # Uncomment to enable headless mode
|
||||
options.add_argument("--disable-gpu")
|
||||
options.add_argument("--no-sandbox")
|
||||
service = Service("F:\\chromedriver\\chromedriver.exe") # Path to your ChromeDriver
|
||||
driver = webdriver.Chrome(service=service, options=options)
|
||||
|
||||
try:
|
||||
base_url = "https://www.deviantart.com/search/?q="
|
||||
driver.get(base_url + query)
|
||||
scraped_urls = set() # To avoid duplicates
|
||||
|
||||
for page in range(1, max_pages + 1):
|
||||
print(f"Scraping page {page}...")
|
||||
time.sleep(3) # Allow time for dynamic content to load
|
||||
|
||||
# Locate all grid rows containing stories
|
||||
grid_rows = driver.find_elements(By.XPATH, "//div[@data-testid='grid-row']")
|
||||
print(f"DEBUG: Found {len(grid_rows)} grid rows on the page.")
|
||||
|
||||
if not grid_rows:
|
||||
print("No stories found on this page.")
|
||||
break
|
||||
|
||||
for row_idx, row in enumerate(grid_rows):
|
||||
story_elements = row.find_elements(By.XPATH, ".//a[contains(@class, 'torpedo-thumb-link')] | .//a[contains(@href, '/art/')]")
|
||||
print(f"DEBUG: Found {len(story_elements)} story elements in row {row_idx + 1}.")
|
||||
|
||||
for idx, link_element in enumerate(story_elements):
|
||||
try:
|
||||
story_url = link_element.get_attribute("href")
|
||||
|
||||
# Exclude URLs ending with '#comments'
|
||||
if story_url.endswith("#comments"):
|
||||
print(f"Skipping comment URL: {story_url}")
|
||||
continue
|
||||
|
||||
if story_url in scraped_urls:
|
||||
print(f"Skipping duplicate URL: {story_url}")
|
||||
continue
|
||||
|
||||
# Verify the URL format to ensure it's a story link
|
||||
if "/art/" not in story_url:
|
||||
print(f"Skipping non-story URL: {story_url}")
|
||||
continue
|
||||
|
||||
scraped_urls.add(story_url)
|
||||
print(f"Processing story {idx + 1} in row {row_idx + 1}: {story_url}")
|
||||
|
||||
# Open the story in a new tab
|
||||
driver.execute_script("window.open('{}', '_blank');".format(story_url))
|
||||
driver.switch_to.window(driver.window_handles[-1]) # Switch to the new tab
|
||||
|
||||
# Scrape the content
|
||||
scrape_deviation_content(driver, story_url, output_dir)
|
||||
|
||||
# Close the tab and return to the main tab
|
||||
driver.close()
|
||||
driver.switch_to.window(driver.window_handles[0])
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing a story element: {e}")
|
||||
|
||||
# Navigate to the next page if applicable
|
||||
try:
|
||||
next_button = driver.find_element(By.XPATH, "//a[contains(@href, 'cursor=') and contains(., 'Next')]")
|
||||
driver.execute_script("arguments[0].scrollIntoView(true);", next_button)
|
||||
time.sleep(1) # Allow any animations to complete
|
||||
next_button.click()
|
||||
time.sleep(2) # Allow time for the next page to load
|
||||
except Exception as e:
|
||||
print(f"No more pages available or navigation failed: {e}")
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"An error occurred during scraping: {e}")
|
||||
finally:
|
||||
print("Browser will remain open for debugging. Close it manually when done.")
|
||||
|
||||
|
||||
def scrape_deviation_content(driver, story_url, output_dir):
|
||||
"""
|
||||
Scrapes the content of a single DeviantArt story page using Selenium.
|
||||
|
||||
Args:
|
||||
driver (WebDriver): Selenium WebDriver instance.
|
||||
story_url (str): URL of the deviation page.
|
||||
output_dir (str): Directory to save the story content.
|
||||
|
||||
Returns:
|
||||
None: Saves the story content into a text file.
|
||||
"""
|
||||
time.sleep(2) # Allow time for the page to load
|
||||
|
||||
try:
|
||||
title_element = driver.find_element(By.TAG_NAME, "h1")
|
||||
|
||||
# Locate the main story container by data-editor-viewer attribute
|
||||
try:
|
||||
story_container = driver.find_element(By.XPATH, "//div[@data-editor-viewer='1']")
|
||||
except:
|
||||
# Fallback to legacy-journal if data-editor-viewer is not found
|
||||
print("DEBUG: Falling back to legacy-journal.")
|
||||
story_container = driver.find_element(By.XPATH, "//div[contains(@class, 'legacy-journal')]")
|
||||
|
||||
# Extract the story content
|
||||
content = story_container.text.strip()
|
||||
|
||||
title = title_element.text.strip()
|
||||
|
||||
print(f"DEBUG: Extracted content length: {len(content)}") # Debugging log
|
||||
|
||||
if not title or not content:
|
||||
print(f"No content found on {story_url}")
|
||||
return
|
||||
|
||||
file_name = re.sub(r"[^a-zA-Z0-9_]+", "_", title) + ".txt"
|
||||
file_path = os.path.join(output_dir, file_name)
|
||||
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
print(f"Saved: {file_path}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Failed to scrape content from {story_url}: {e}")
|
148
scripts/deviantart_scraper.py
Normal file
148
scripts/deviantart_scraper.py
Normal file
@ -0,0 +1,148 @@
|
||||
import os
|
||||
import re
|
||||
import random
|
||||
import time
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
|
||||
|
||||
def load_scraped_urls(file_path):
|
||||
"""
|
||||
Load already scraped URLs from a file.
|
||||
"""
|
||||
if os.path.exists(file_path):
|
||||
with open(file_path, "r", encoding="utf-8") as f:
|
||||
return set(line.strip() for line in f)
|
||||
return set()
|
||||
|
||||
|
||||
def save_scraped_url(file_path, url):
|
||||
"""
|
||||
Append a scraped URL to the file.
|
||||
"""
|
||||
with open(file_path, "a", encoding="utf-8") as f:
|
||||
f.write(url + "\n")
|
||||
|
||||
|
||||
def scrape_deviantart(query, max_pages):
|
||||
output_dir = "data/raw/deviantart"
|
||||
scraped_urls_file = "scraped_urls.txt"
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
os.makedirs(output_dir)
|
||||
|
||||
# Load previously scraped URLs
|
||||
scraped_urls = load_scraped_urls(scraped_urls_file)
|
||||
|
||||
# Configure Selenium WebDriver
|
||||
options = Options()
|
||||
options.binary_location = "C:\\Program Files\\BraveSoftware\\Brave-Browser\\Application\\brave.exe" # noqa
|
||||
options.add_argument("--disable-gpu")
|
||||
options.add_argument("--no-sandbox")
|
||||
options.add_argument("--incognito")
|
||||
service = Service("F:\\chromedriver\\chromedriver.exe")
|
||||
driver = webdriver.Chrome(service=service, options=options)
|
||||
|
||||
try:
|
||||
base_url = "https://www.deviantart.com/search/?q="
|
||||
driver.get(base_url + query)
|
||||
|
||||
for page in range(1, max_pages + 1):
|
||||
print(f"Scraping page {page}...")
|
||||
time.sleep(random.uniform(5, 10))
|
||||
|
||||
# Detect throttling
|
||||
if detect_throttling(driver):
|
||||
print("Throttling detected. Retrying after delay...")
|
||||
time.sleep(random.uniform(60, 120))
|
||||
driver.refresh()
|
||||
continue
|
||||
|
||||
grid_rows = driver.find_elements(By.XPATH,
|
||||
"//div[@data-testid='grid-row']")
|
||||
print(f"DEBUG: Found {len(grid_rows)} grid rows on the page.")
|
||||
|
||||
if not grid_rows:
|
||||
print("No more stories found.")
|
||||
break
|
||||
|
||||
for row in grid_rows:
|
||||
story_elements = row.find_elements(By.XPATH, ".//a[contains(@href, '/art/')]") # noqa
|
||||
for link in story_elements:
|
||||
story_url = link.get_attribute("href")
|
||||
|
||||
# Skip #comments URLs and already scraped URLs
|
||||
if not story_url or story_url.endswith("#comments") or story_url in scraped_urls: # noqa
|
||||
print(f"Skipping URL: {story_url}")
|
||||
continue
|
||||
|
||||
# Open the story in a new tab
|
||||
driver.execute_script("window.open('{}', '_blank');".format(story_url)) # noqa
|
||||
driver.switch_to.window(driver.window_handles[-1])
|
||||
|
||||
# Scrape the story
|
||||
if scrape_story(driver, story_url, output_dir):
|
||||
scraped_urls.add(story_url)
|
||||
save_scraped_url(scraped_urls_file, story_url)
|
||||
|
||||
# Close the tab and return to the main tab
|
||||
driver.close()
|
||||
driver.switch_to.window(driver.window_handles[0])
|
||||
|
||||
time.sleep(random.uniform(1, 3))
|
||||
|
||||
# Navigate to the next page
|
||||
try:
|
||||
next_button = driver.find_element(By.XPATH, "//a[contains(@href, 'cursor=') and contains(., 'Next')]") # noqa
|
||||
driver.execute_script("arguments[0].scrollIntoView(true);",
|
||||
next_button)
|
||||
time.sleep(random.uniform(2, 5))
|
||||
next_button.click()
|
||||
except Exception as e:
|
||||
print(f"No more pages or navigation error: {e}")
|
||||
break
|
||||
finally:
|
||||
driver.quit()
|
||||
|
||||
|
||||
def detect_throttling(driver):
|
||||
"""
|
||||
Detect if throttling has occurred by checking for specific keywords.
|
||||
"""
|
||||
try:
|
||||
if "captcha" in driver.page_source.lower() or "rate limit" in driver.page_source.lower(): # noqa
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error during throttling detection: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def scrape_story(driver, story_url, output_dir):
|
||||
"""
|
||||
Scrapes and saves a single story, skipping if the file already exists.
|
||||
"""
|
||||
try:
|
||||
time.sleep(random.uniform(5, 10))
|
||||
title_element = driver.find_element(By.TAG_NAME, "h1")
|
||||
story_element = driver.find_element(By.XPATH,
|
||||
"//div[@data-editor-viewer='1']")
|
||||
title = title_element.text.strip()
|
||||
story = story_element.text.strip()
|
||||
|
||||
if title and story:
|
||||
filename = re.sub(r"[^a-zA-Z0-9_]+", "_", title) + ".txt"
|
||||
filepath = os.path.join(output_dir, filename)
|
||||
|
||||
if os.path.exists(filepath):
|
||||
print(f"Skipping existing story: {title}")
|
||||
return False
|
||||
|
||||
with open(filepath, "w", encoding="utf-8") as f:
|
||||
f.write(story)
|
||||
print(f"Saved story: {title}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error processing story {story_url}: {e}")
|
||||
return False
|
Loading…
x
Reference in New Issue
Block a user