FEAT: Youtube Module is done as it's going to be
This commit is contained in:
parent
f7f1e14b4b
commit
c0dd278cd4
@ -4,7 +4,6 @@ import requests
|
|||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
import sqlite3
|
import sqlite3
|
||||||
from datetime import datetime
|
|
||||||
from config import config
|
from config import config
|
||||||
|
|
||||||
|
|
||||||
@ -18,14 +17,57 @@ class YouTube:
|
|||||||
handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s:%(message)s'))
|
handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s:%(message)s'))
|
||||||
self.logger.addHandler(handler)
|
self.logger.addHandler(handler)
|
||||||
self.db_path = 'data/selena.db'
|
self.db_path = 'data/selena.db'
|
||||||
|
self.channel_alerts = {}
|
||||||
|
|
||||||
async def fetch_latest_video(self, channel_id):
|
async def fetch_channel_id(self, identifier):
|
||||||
url = f'https://www.googleapis.com/youtube/v3/search?key={self.api_key}&channelId={channel_id}&part=snippet,id&order=date&maxResults=1'
|
if identifier.startswith('@'):
|
||||||
|
url = f'https://www.googleapis.com/youtube/v3/search?part=snippet&q={identifier[1:]}&type=channel&key={self.api_key}'
|
||||||
|
else:
|
||||||
|
url = f'https://www.googleapis.com/youtube/v3/channels?part=id&id={identifier}&key={self.api_key}'
|
||||||
|
self.logger.debug(f'Fetching channel ID with URL: {url}')
|
||||||
response = requests.get(url)
|
response = requests.get(url)
|
||||||
|
self.logger.debug(f'Channel ID response status code: {response.status_code}')
|
||||||
|
self.logger.debug(f'Channel ID response content: {response.content}')
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
data = response.json()
|
data = response.json()
|
||||||
if data['items']:
|
if data.get('items'):
|
||||||
|
if identifier.startswith('@'):
|
||||||
|
for item in data['items']:
|
||||||
|
if item['snippet']['title'].lower() == identifier[1:].lower():
|
||||||
|
self.logger.debug(f'Found channel ID: {item["id"]["channelId"]} for {identifier}')
|
||||||
|
return item['id']['channelId']
|
||||||
|
else:
|
||||||
|
return data['items'][0]['id']
|
||||||
|
else:
|
||||||
|
self.logger.error(f'Failed to fetch channel ID: {response.status_code} - {response.text}')
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def fetch_latest_video(self, channel_id):
|
||||||
|
url = f'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId={channel_id}&order=date&maxResults=1&type=video&key={self.api_key}'
|
||||||
|
self.logger.debug(f'Fetching latest video with URL: {url}')
|
||||||
|
response = requests.get(url)
|
||||||
|
self.logger.debug(f'Latest video response status code: {response.status_code}')
|
||||||
|
self.logger.debug(f'Latest video response content: {response.content}')
|
||||||
|
if response.status_code == 200:
|
||||||
|
data = response.json()
|
||||||
|
if data.get('items'):
|
||||||
return data['items'][0]
|
return data['items'][0]
|
||||||
|
else:
|
||||||
|
self.logger.error(f'Failed to fetch latest video: {response.status_code} - {response.text}')
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def fetch_channel_info(self, channel_id):
|
||||||
|
url = f'https://www.googleapis.com/youtube/v3/channels?part=snippet&id={channel_id}&key={self.api_key}'
|
||||||
|
self.logger.debug(f'Fetching channel info with URL: {url}')
|
||||||
|
response = requests.get(url)
|
||||||
|
self.logger.debug(f'Channel info response status code: {response.status_code}')
|
||||||
|
self.logger.debug(f'Channel info response content: {response.content}')
|
||||||
|
if response.status_code == 200:
|
||||||
|
data = response.json()
|
||||||
|
if data.get('items'):
|
||||||
|
return data['items'][0]
|
||||||
|
else:
|
||||||
|
self.logger.error(f'Failed to fetch channel info: {response.status_code} - {response.text}')
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def check_channels(self):
|
async def check_channels(self):
|
||||||
@ -39,6 +81,7 @@ class YouTube:
|
|||||||
await self.send_alert(alert_channel_id, latest_video)
|
await self.send_alert(alert_channel_id, latest_video)
|
||||||
cursor.execute("UPDATE youtube_channels SET last_video_id = ? WHERE channel_id = ?", (latest_video['id']['videoId'], channel_id))
|
cursor.execute("UPDATE youtube_channels SET last_video_id = ? WHERE channel_id = ?", (latest_video['id']['videoId'], channel_id))
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
await self.check_if_live(channel_id, alert_channel_id)
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
async def send_alert(self, alert_channel_id, video):
|
async def send_alert(self, alert_channel_id, video):
|
||||||
@ -52,15 +95,49 @@ class YouTube:
|
|||||||
embed.set_thumbnail(url=thumbnail)
|
embed.set_thumbnail(url=thumbnail)
|
||||||
await channel.send(embed=embed)
|
await channel.send(embed=embed)
|
||||||
|
|
||||||
|
async def check_if_live(self, channel_id, alert_channel_id):
|
||||||
|
url = f'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId={channel_id}&type=video&eventType=live&key={self.api_key}'
|
||||||
|
self.logger.debug(f'Checking live status with URL: {url}')
|
||||||
|
response = requests.get(url)
|
||||||
|
self.logger.debug(f'Live status response status code: {response.status_code}')
|
||||||
|
self.logger.debug(f'Live status response content: {response.content}')
|
||||||
|
if response.status_code == 200:
|
||||||
|
data = response.json()
|
||||||
|
if data.get('items'):
|
||||||
|
for live_video in data['items']:
|
||||||
|
if live_video['id']['kind'] == 'youtube#video':
|
||||||
|
await self.send_live_alert(alert_channel_id, live_video)
|
||||||
|
return True # Indicate that the channel is live
|
||||||
|
elif response.status_code == 400:
|
||||||
|
# Handle the specific error case with additional logging
|
||||||
|
self.logger.error(f'Error checking live status: {response.status_code} - {response.text}')
|
||||||
|
return False # Indicate that the channel is not live
|
||||||
|
|
||||||
|
async def send_live_alert(self, alert_channel_id, live_video):
|
||||||
|
channel = self.bot.get_channel(alert_channel_id)
|
||||||
|
if channel:
|
||||||
|
title = live_video['snippet']['title']
|
||||||
|
description = live_video['snippet']['description']
|
||||||
|
url = f"https://www.youtube.com/watch?v={live_video['id']['videoId']}"
|
||||||
|
thumbnail = live_video['snippet']['thumbnails']['high']['url']
|
||||||
|
embed = discord.Embed(title=title, description=description, url=url, color=discord.Color.red())
|
||||||
|
embed.set_thumbnail(url=thumbnail)
|
||||||
|
embed.add_field(name="Status", value="Live", inline=True)
|
||||||
|
await channel.send(embed=embed)
|
||||||
|
|
||||||
def setup(self, tree: app_commands.CommandTree):
|
def setup(self, tree: app_commands.CommandTree):
|
||||||
@tree.command(name="add_youtube_channel", description="Add a YouTube channel to monitor")
|
@tree.command(name="add_youtube_channel", description="Add a YouTube channel to monitor")
|
||||||
async def add_youtube_channel_command(interaction: discord.Interaction, channel_id: str, alert_channel: discord.TextChannel):
|
async def add_youtube_channel_command(interaction: discord.Interaction, identifier: str, alert_channel: discord.TextChannel):
|
||||||
conn = sqlite3.connect(self.db_path)
|
channel_id = await self.fetch_channel_id(identifier)
|
||||||
cursor = conn.cursor()
|
if channel_id:
|
||||||
cursor.execute("INSERT INTO youtube_channels (channel_id, last_video_id, alert_channel_id) VALUES (?, ?, ?)", (channel_id, '', alert_channel.id))
|
conn = sqlite3.connect(self.db_path)
|
||||||
conn.commit()
|
cursor = conn.cursor()
|
||||||
conn.close()
|
cursor.execute("INSERT INTO youtube_channels (channel_id, last_video_id, alert_channel_id) VALUES (?, ?, ?)", (channel_id, '', alert_channel.id))
|
||||||
await interaction.response.send_message(embed=discord.Embed(description=f"Added YouTube channel {channel_id} to monitor.", color=discord.Color.green()))
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
await interaction.response.send_message(embed=discord.Embed(description=f"Added YouTube channel {identifier} to monitor.", color=discord.Color.green()))
|
||||||
|
else:
|
||||||
|
await interaction.response.send_message(embed=discord.Embed(description=f"Failed to find YouTube channel {identifier}.", color=discord.Color.red()))
|
||||||
|
|
||||||
@tree.command(name="remove_youtube_channel", description="Remove a YouTube channel from monitoring")
|
@tree.command(name="remove_youtube_channel", description="Remove a YouTube channel from monitoring")
|
||||||
async def remove_youtube_channel_command(interaction: discord.Interaction, channel_id: str):
|
async def remove_youtube_channel_command(interaction: discord.Interaction, channel_id: str):
|
||||||
@ -71,12 +148,33 @@ class YouTube:
|
|||||||
conn.close()
|
conn.close()
|
||||||
await interaction.response.send_message(embed=discord.Embed(description=f"Removed YouTube channel {channel_id} from monitoring.", color=discord.Color.green()))
|
await interaction.response.send_message(embed=discord.Embed(description=f"Removed YouTube channel {channel_id} from monitoring.", color=discord.Color.green()))
|
||||||
|
|
||||||
|
@tree.command(name="check_youtube_channel", description="Check if a YouTube channel is live")
|
||||||
|
async def check_youtube_channel_command(interaction: discord.Interaction, identifier: str):
|
||||||
|
channel_id = await self.fetch_channel_id(identifier)
|
||||||
|
if not channel_id:
|
||||||
|
await interaction.response.send_message(embed=discord.Embed(description=f"Failed to find YouTube channel {identifier}.", color=discord.Color.red()))
|
||||||
|
return
|
||||||
|
is_live = await self.check_if_live(channel_id, interaction.channel_id)
|
||||||
|
if is_live:
|
||||||
|
await interaction.response.send_message(embed=discord.Embed(description=f"{identifier} is live!", color=discord.Color.green()))
|
||||||
|
else:
|
||||||
|
channel_info = await self.fetch_channel_info(channel_id)
|
||||||
|
if channel_info:
|
||||||
|
embed = discord.Embed(description=f"{channel_info['snippet']['title']} is not live.", color=discord.Color.red())
|
||||||
|
embed.set_thumbnail(url=channel_info['snippet']['thumbnails']['high']['url'])
|
||||||
|
await interaction.response.send_message(embed=embed)
|
||||||
|
else:
|
||||||
|
await interaction.response.send_message(embed=discord.Embed(description=f"{identifier} is not live.", color=discord.Color.red()))
|
||||||
|
|
||||||
if not tree.get_command("add_youtube_channel"):
|
if not tree.get_command("add_youtube_channel"):
|
||||||
tree.add_command(add_youtube_channel_command)
|
tree.add_command(add_youtube_channel_command)
|
||||||
|
|
||||||
if not tree.get_command("remove_youtube_channel"):
|
if not tree.get_command("remove_youtube_channel"):
|
||||||
tree.add_command(remove_youtube_channel_command)
|
tree.add_command(remove_youtube_channel_command)
|
||||||
|
|
||||||
|
if not tree.get_command("check_youtube_channel"):
|
||||||
|
tree.add_command(check_youtube_channel_command)
|
||||||
|
|
||||||
async def setup_hook(self):
|
async def setup_hook(self):
|
||||||
await self.bot.wait_until_ready()
|
await self.bot.wait_until_ready()
|
||||||
while not self.bot.is_closed():
|
while not self.bot.is_closed():
|
||||||
|
Loading…
x
Reference in New Issue
Block a user