diff --git a/config.py b/config.py index 73d0919..67f365a 100644 --- a/config.py +++ b/config.py @@ -8,6 +8,8 @@ config = { 'GUILD_ID': int(os.getenv('DISCORD_GUILD_ID')), 'DISCORD_CHANNEL_ID': int(os.getenv('DISCORD_CHANNEL_ID')), 'YOUTUBE_API_KEY': os.getenv('YOUTUBE_API_KEY'), + 'TWITCH_CLIENT_ID': os.getenv('TWITCH_CLIENT_ID'), + 'TWITCH_CLIENT_SECRET': os.getenv('TWITCH_CLIENT_SECRET'), 'BUNGIE_API_KEY': os.getenv('BUNGIE_API_KEY'), 'OAUTH_URL': os.getenv('OAUTH_URL'), 'OAUTH_CLIENT_ID': os.getenv('OAUTH_CLIENT_ID'), @@ -29,6 +31,9 @@ config = { }, 'youtube': { 'enabled': True + }, + 'twitch': { + 'enabled': True } } } diff --git a/main.py b/main.py index 83b21b5..7f21227 100644 --- a/main.py +++ b/main.py @@ -54,6 +54,11 @@ class Selena(discord.Client): youtube = YouTube(self) youtube.setup(self.tree) + if config['modules']['twitch']['enabled']: + from modules.social.twitch import Twitch + twitch = Twitch(self) + twitch.setup(self.tree) + bot = Selena() diff --git a/modules/social/twitch.py b/modules/social/twitch.py new file mode 100644 index 0000000..251a0d1 --- /dev/null +++ b/modules/social/twitch.py @@ -0,0 +1,159 @@ +import discord +from discord import app_commands +import requests +import logging +import asyncio +import sqlite3 +from config import config + +class Twitch: + def __init__(self, bot): + self.bot = bot + self.client_id = config['TWITCH_CLIENT_ID'] + self.client_secret = config['TWITCH_CLIENT_SECRET'] + self.logger = logging.getLogger('Twitch') + self.logger.setLevel(logging.DEBUG) + handler = logging.FileHandler(filename='log/selena.log', encoding='utf-8', mode='w') + handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s:%(message)s')) + self.logger.addHandler(handler) + self.db_path = 'data/selena.db' + self.token = None + self.token_expiry = None + self.channel_alerts = {} + + async def get_token(self): + url = "https://id.twitch.tv/oauth2/token" + params = { + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'grant_type': 'client_credentials' + } + response = requests.post(url, params=params) + if response.status_code == 200: + data = response.json() + self.token = data['access_token'] + self.token_expiry = asyncio.get_event_loop().time() + data['expires_in'] + self.logger.info('Successfully obtained Twitch token') + else: + self.logger.error(f'Failed to obtain Twitch token: {response.status_code} - {response.text}') + + async def ensure_token(self): + if not self.token or asyncio.get_event_loop().time() >= self.token_expiry: + await self.get_token() + + async def fetch_channel_info(self, channel_name): + await self.ensure_token() + url = "https://api.twitch.tv/helix/streams" + headers = { + 'Authorization': f'Bearer {self.token}', + 'Client-Id': self.client_id + } + params = { + 'user_login': channel_name + } + response = requests.get(url, headers=headers, params=params) + self.logger.debug(f'Response status code: {response.status_code}') + self.logger.debug(f'Response content: {response.content}') + if response.status_code == 200: + data = response.json() + if data['data']: + return data['data'][0] # Return the first stream (should only be one) + return None + + async def fetch_user_info(self, user_id): + await self.ensure_token() + url = f"https://api.twitch.tv/helix/users?id={user_id}" + headers = { + 'Authorization': f'Bearer {self.token}', + 'Client-Id': self.client_id + } + response = requests.get(url, headers=headers) + if response.status_code == 200: + data = response.json() + if data['data']: + return data['data'][0] + return None + + async def check_channels(self): + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + cursor.execute("SELECT channel_name, alert_channel_id FROM twitch_channels") + channels = cursor.fetchall() + for channel_name, alert_channel_id in channels: + channel_info = await self.fetch_channel_info(channel_name) + if channel_info and not self.channel_alerts.get(channel_name): + await self.send_alert(alert_channel_id, channel_info) + self.channel_alerts[channel_name] = True + elif not channel_info and self.channel_alerts.get(channel_name): + self.channel_alerts[channel_name] = False + conn.close() + + async def send_alert(self, alert_channel_id, channel_info): + user_info = await self.fetch_user_info(channel_info['user_id']) + channel = self.bot.get_channel(alert_channel_id) + if channel: + title = channel_info['title'] + url = f"https://www.twitch.tv/{channel_info['user_login']}" + thumbnail = channel_info['thumbnail_url'].replace('{width}', '320').replace('{height}', '180') + logo = user_info['profile_image_url'] if user_info else None + embed = discord.Embed(title=title, url=url, color=discord.Color.purple()) + embed.set_thumbnail(url=logo if logo else thumbnail) + embed.add_field(name="Channel", value=channel_info['user_name'], inline=True) + embed.add_field(name="Game", value=channel_info['game_name'], inline=True) + await channel.send(embed=embed) + + def setup(self, tree: app_commands.CommandTree): + @tree.command(name="add_twitch_channel", description="Add a Twitch channel to monitor") + async def add_twitch_channel_command(interaction: discord.Interaction, channel_name: str, alert_channel: discord.TextChannel): + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + cursor.execute("INSERT INTO twitch_channels (channel_name, alert_channel_id) VALUES (?, ?)", (channel_name, alert_channel.id)) + conn.commit() + conn.close() + await interaction.response.send_message(embed=discord.Embed(description=f"Added Twitch channel {channel_name} to monitor.", color=discord.Color.green())) + + @tree.command(name="remove_twitch_channel", description="Remove a Twitch channel from monitoring") + async def remove_twitch_channel_command(interaction: discord.Interaction, channel_name: str): + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + cursor.execute("DELETE FROM twitch_channels WHERE channel_name = ?", (channel_name,)) + conn.commit() + conn.close() + await interaction.response.send_message(embed=discord.Embed(description=f"Removed Twitch channel {channel_name} from monitoring.", color=discord.Color.green())) + + @tree.command(name="check_twitch_channel", description="Check if a Twitch channel is live") + async def check_twitch_channel_command(interaction: discord.Interaction, channel_name: str): + channel_info = await self.fetch_channel_info(channel_name) + user_info = await self.fetch_user_info(channel_info['user_id']) if channel_info else None + if channel_info: + thumbnail = channel_info['thumbnail_url'].replace('{width}', '320').replace('{height}', '180') + logo = user_info['profile_image_url'] if user_info else None + embed = discord.Embed(title=f"{channel_info['user_name']} is live!", url=f"https://www.twitch.tv/{channel_info['user_login']}", color=discord.Color.purple()) + embed.set_thumbnail(url=logo if logo else thumbnail) + embed.add_field(name="Title", value=channel_info['title'], inline=False) + embed.add_field(name="Game", value=channel_info['game_name'], inline=False) + await interaction.response.send_message(embed=embed) + else: + await interaction.response.send_message(embed=discord.Embed(description=f"{channel_name} is not live.", color=discord.Color.red())) + + if not tree.get_command("add_twitch_channel"): + tree.add_command(add_twitch_channel_command) + + if not tree.get_command("remove_twitch_channel"): + tree.add_command(remove_twitch_channel_command) + + if not tree.get_command("check_twitch_channel"): + tree.add_command(check_twitch_channel_command) + + async def setup_hook(self): + await self.bot.wait_until_ready() + await self.get_token() + while not self.bot.is_closed(): + await self.check_channels() + await asyncio.sleep(300) # Check every 5 minutes + + +def setup(bot): + twitch = Twitch(bot) + twitch.setup(bot.tree) + bot.loop.create_task(twitch.setup_hook())