From d69f1099ff71751473dce3ebecb7d8dbfba200b5 Mon Sep 17 00:00:00 2001 From: Dan Date: Mon, 27 Jan 2025 17:34:47 -0500 Subject: [PATCH] Adding 24/7 Support --- audio.py | 304 +++++++--------------------------------------------- commands.py | 38 ++++--- 2 files changed, 66 insertions(+), 276 deletions(-) diff --git a/audio.py b/audio.py index 24a228e..e26fb6f 100644 --- a/audio.py +++ b/audio.py @@ -3,56 +3,55 @@ import yt_dlp import asyncio import os import json -import random import requests import time from dotenv import load_dotenv +from typing import List, Dict, Any load_dotenv() -voice_clients = {} # Track active voice connections -music_queues = {} # Per-guild song queue -current_tracks = {} # Currently playing tracks -volumes = {} # Volume levels per guild -repeat_modes = {} # Tracks repeat mode per guild: "one", "all", or "off" -now_playing_messages = {} # Tracks the "Now Playing" embed per guild -progress_tasks = {} # Progress bar tasks per guild -default_volume = 0.05 # Default volume (5%) -USER_DATA_FILE = 'user_data.json' -YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY") # Replace with your API key +voice_clients: Dict[int, discord.VoiceClient] = {} +music_queues: Dict[int, List[Any]] = {} +current_tracks: Dict[int, Any] = {} +volumes: Dict[int, float] = {} +repeat_modes: Dict[int, str] = {} +now_playing_messages: Dict[int, int] = {} +progress_tasks: Dict[int, asyncio.Task] = {} +_247_mode: Dict[int, bool] = {} + +default_volume: float = 0.05 +USER_DATA_FILE: str = 'user_data.json' +YOUTUBE_API_KEY: str | None = os.getenv("YOUTUBE_API_KEY") # ---------- Utility Functions ---------- -def load_user_data(): +def load_user_data() -> Dict[str, Any]: if not os.path.exists(USER_DATA_FILE): return {} with open(USER_DATA_FILE, 'r') as f: return json.load(f) -def save_user_data(data): +def save_user_data(data: Dict[str, Any]) -> None: with open(USER_DATA_FILE, 'w') as f: json.dump(data, f, indent=4) -def update_user_history(user_id, song_title, artist): +def update_user_history(user_id: int, song_title: str, artist: str) -> None: user_data = load_user_data() if str(user_id) not in user_data: user_data[str(user_id)] = {"history": []} - # Add the song to the user's history user_data[str(user_id)]["history"].append({"title": song_title, "artist": artist}) save_user_data(user_data) # ---------- Playback Functions ---------- -async def join_voice(interaction: discord.Interaction): - """Joins the user's voice channel.""" +async def join_voice(interaction: discord.Interaction) -> bool: guild_id = interaction.guild.id channel = interaction.user.voice.channel if guild_id in voice_clients and voice_clients[guild_id].channel == channel: - # Already in the same channel return True try: @@ -64,28 +63,23 @@ async def join_voice(interaction: discord.Interaction): return False -async def play_audio(interaction: discord.Interaction, query: str): +async def play_audio(interaction: discord.Interaction, query: str) -> None: guild_id = interaction.guild.id - # Immediate acknowledgment to prevent "Amber is thinking" if not interaction.response.is_done(): await interaction.response.defer() - # Ensure Amber is connected to a voice channel if guild_id not in voice_clients or not voice_clients[guild_id].is_connected(): joined = await join_voice(interaction) if not joined: await interaction.followup.send("❌ **Error:** Could not join the voice channel.") return - # Start a background task to fetch and queue the song asyncio.create_task(fetch_and_queue_song(interaction, query)) -async def fetch_and_queue_song(interaction: discord.Interaction, query: str): +async def fetch_and_queue_song(interaction: discord.Interaction, query: str) -> None: guild_id = interaction.guild.id - - # Search for the song on YouTube ydl_opts = { 'format': 'bestaudio/best', 'quiet': True, @@ -102,44 +96,44 @@ async def fetch_and_queue_song(interaction: discord.Interaction, query: str): artist = info.get('uploader', 'Unknown Artist') duration = info.get('duration', 0) - # Log the song in the user's history update_user_history(interaction.user.id, title, artist) - # Add the song to the queue if guild_id not in music_queues: music_queues[guild_id] = [] music_queues[guild_id].append((song_url, title, duration)) await interaction.followup.send(f"βœ… **Added to Queue:** {title}") - # Start playback if nothing is playing if not voice_clients[guild_id].is_playing(): await play_next(interaction) -async def play_next(interaction: discord.Interaction): +async def play_next(interaction: discord.Interaction) -> None: guild_id = interaction.guild.id if guild_id not in music_queues or not music_queues[guild_id]: - # If no songs are left in the queue - if guild_id in now_playing_messages and now_playing_messages[guild_id]: - try: - message = await interaction.channel.fetch_message(now_playing_messages[guild_id]) - embed = discord.Embed( - title="🎡 Queue Finished", - description="Amber is now idle. Add more songs to the queue to keep the music going!", - color=discord.Color.red() - ) - await message.edit(embed=embed, view=None) # Remove playback controls - except discord.NotFound: - pass - return + if _247_mode.get(guild_id, True): + if guild_id in now_playing_messages and now_playing_messages[guild_id]: + try: + message = await interaction.channel.fetch_message(now_playing_messages[guild_id]) + embed = discord.Embed( + title="🎡 Queue Empty", + description="Amber is in 24/7 mode. Add more songs to the queue to resume playback.", + color=discord.Color.orange() + ) + await message.edit(embed=embed, view=None) + except discord.NotFound: + pass + return + else: + if guild_id in voice_clients and voice_clients[guild_id].is_connected(): + await voice_clients[guild_id].disconnect() + del voice_clients[guild_id] + return - # Get the next song from the queue song_url, title, duration = music_queues[guild_id].pop(0) current_tracks[guild_id] = (song_url, title) - # Prepare FFmpeg options ffmpeg_options = { 'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', 'options': '-vn', @@ -151,7 +145,6 @@ async def play_next(interaction: discord.Interaction): volume = volumes.get(guild_id, default_volume) source = discord.PCMVolumeTransformer(source, volume=volume) - # Play the audio voice_client.play( source, after=lambda e: asyncio.run_coroutine_threadsafe( @@ -159,38 +152,34 @@ async def play_next(interaction: discord.Interaction): ), ) - # Create the enhanced embed embed = discord.Embed( title="🎡 Now Playing", description=f"**{title}**\n\n`00:00 / {time.strftime('%M:%S', time.gmtime(duration))}`", color=discord.Color.green() ) - embed.set_thumbnail(url="https://img.youtube.com/vi/{}/0.jpg".format(song_url.split("?v=")[-1])) # Thumbnail + embed.set_thumbnail(url=f"https://img.youtube.com/vi/{song_url.split('?v=')[-1]}/0.jpg") embed.add_field(name="Artist", value=artist, inline=True) embed.set_footer(text="React with πŸ‘ or πŸ‘Ž to rate this song!") view = PlaybackControls(interaction, guild_id) - # Update or send the embed if guild_id in now_playing_messages and now_playing_messages[guild_id]: try: message = await interaction.channel.fetch_message(now_playing_messages[guild_id]) await message.edit(embed=embed, view=view) except discord.NotFound: - # If the message was deleted, send a new one message = await interaction.followup.send(embed=embed, view=view) now_playing_messages[guild_id] = message.id else: message = await interaction.followup.send(embed=embed, view=view) now_playing_messages[guild_id] = message.id - # Start updating the progress bar start_time = time.time() asyncio.create_task(update_progress_bar(interaction, duration, start_time)) except Exception as e: await interaction.followup.send(f"❌ **Error:** Could not play the next song. {str(e)}") -async def update_progress_bar(interaction: discord.Interaction, duration: int, start_time: float): +async def update_progress_bar(interaction: discord.Interaction, duration: int, start_time: float) -> None: guild_id = interaction.guild.id try: @@ -218,216 +207,3 @@ async def update_progress_bar(interaction: discord.Interaction, duration: int, s await asyncio.sleep(1) except asyncio.CancelledError: pass - - -async def stop_audio(interaction: discord.Interaction): - guild_id = interaction.guild.id - - if guild_id in voice_clients: - voice_client = voice_clients[guild_id] - if voice_client.is_playing(): - voice_client.stop() - await interaction.response.send_message("⏹️ **Playback stopped.**") - else: - await interaction.response.send_message("⚠️ **Nothing is playing.**") - - # Clear the queue but do not disconnect - music_queues[guild_id] = [] - await interaction.followup.send("πŸ—‘οΈ **Queue cleared. Amber will remain idle in the voice channel.**") - else: - await interaction.response.send_message("⚠️ **Amber is not connected to a voice channel.**") - - -async def pause_audio(interaction: discord.Interaction): - """Pauses the currently playing song.""" - guild_id = interaction.guild.id - - if guild_id in voice_clients and voice_clients[guild_id].is_playing(): - voice_clients[guild_id].pause() # Pause playback - await interaction.response.send_message("⏸️ **Music paused.**") - else: - await interaction.response.send_message("❌ **No music is currently playing.**") - - -async def resume_audio(interaction: discord.Interaction): - """Resumes playback of a paused song.""" - guild_id = interaction.guild.id - - if guild_id in voice_clients and voice_clients[guild_id].is_paused(): - voice_clients[guild_id].resume() # Resume playback - await interaction.response.send_message("▢️ **Music resumed.**") - else: - await interaction.response.send_message("❌ **No paused music to resume.**") - - -async def set_volume(interaction: discord.Interaction, level: int): - guild_id = interaction.guild.id - - if level < 0 or level > 100: - await interaction.response.send_message("❌ **Volume must be between 0 and 100.**") - return - - # Set the volume - volume = level / 100 - volumes[guild_id] = volume - - # Adjust volume for the current source if playing - if guild_id in voice_clients and voice_clients[guild_id].is_playing(): - source = voice_clients[guild_id].source - if isinstance(source, discord.PCMVolumeTransformer): - source.volume = volume - - await interaction.response.send_message(f"πŸ”Š **Volume set to {level}%**.") - - -async def leave_voice(interaction: discord.Interaction): - guild_id = interaction.guild.id - - if guild_id in voice_clients: - voice_client = voice_clients[guild_id] - if voice_client.is_connected(): - await voice_client.disconnect() - voice_clients.pop(guild_id, None) - await interaction.response.send_message("πŸ‘‹ **Left the voice channel.**") - else: - await interaction.response.send_message("❌ **I am not connected to any voice channel.**") - else: - await interaction.response.send_message("❌ **I am not connected to any voice channel.**") - - -# ---------- Discord UI ---------- - - -class PlaybackControls(discord.ui.View): - def __init__(self, interaction: discord.Interaction, guild_id: int): - super().__init__(timeout=None) - self.interaction = interaction - self.guild_id = guild_id - - @discord.ui.button(label="Pause", style=discord.ButtonStyle.primary) - async def pause(self, interaction: discord.Interaction, button: discord.ui.Button): - if self.guild_id in voice_clients and voice_clients[self.guild_id].is_playing(): - voice_clients[self.guild_id].pause() - await interaction.response.send_message("⏸️ **Playback paused.**", ephemeral=True) - else: - await interaction.response.send_message("⚠️ **No audio is playing.**", ephemeral=True) - - @discord.ui.button(label="Resume", style=discord.ButtonStyle.green) - async def resume(self, interaction: discord.Interaction, button: discord.ui.Button): - if self.guild_id in voice_clients and voice_clients[self.guild_id].is_paused(): - voice_clients[self.guild_id].resume() - await interaction.response.send_message("▢️ **Playback resumed.**", ephemeral=True) - else: - await interaction.response.send_message("⚠️ **No audio is paused.**", ephemeral=True) - - @discord.ui.button(label="Stop", style=discord.ButtonStyle.danger) - async def stop(self, interaction: discord.Interaction, button: discord.ui.Button): - if self.guild_id in voice_clients: - voice_clients[self.guild_id].stop() - music_queues[self.guild_id] = [] # Clear the queue - await interaction.response.send_message("⏹️ **Playback stopped and queue cleared.**", ephemeral=True) - else: - await interaction.response.send_message("⚠️ **No audio is playing.**", ephemeral=True) - - @discord.ui.button(label="Skip", style=discord.ButtonStyle.secondary) - async def skip(self, interaction: discord.Interaction, button: discord.ui.Button): - if self.guild_id in voice_clients and voice_clients[self.guild_id].is_playing(): - voice_clients[self.guild_id].stop() # Trigger play_next - await interaction.response.send_message("⏭️ **Skipped to the next track.**", ephemeral=True) - else: - await interaction.response.send_message("⚠️ **No audio is playing.**", ephemeral=True) - - @discord.ui.button(label="Repeat: Off", style=discord.ButtonStyle.success) - async def repeat(self, interaction: discord.Interaction, button: discord.ui.Button): - current_mode = repeat_modes.get(self.guild_id, "off") - new_mode = "one" if current_mode == "off" else "all" if current_mode == "one" else "off" - repeat_modes[self.guild_id] = new_mode - button.label = f"Repeat: {'One' if new_mode == 'one' else 'All' if new_mode == 'all' else 'Off'}" - await interaction.response.edit_message(view=self) - - @discord.ui.button(label="Volume Up", style=discord.ButtonStyle.primary) - async def volume_up(self, interaction: discord.Interaction, button: discord.ui.Button): - if self.guild_id in voice_clients: - volume = volumes.get(self.guild_id, default_volume) - new_volume = min(volume + 0.1, 2.0) # Max volume 200% - volumes[self.guild_id] = new_volume - voice_clients[self.guild_id].source.volume = new_volume - await interaction.response.send_message(f"πŸ”Š **Volume increased to {int(new_volume * 100)}%.**", ephemeral=True) - else: - await interaction.response.send_message("⚠️ **No audio is playing.**", ephemeral=True) - - @discord.ui.button(label="Volume Down", style=discord.ButtonStyle.primary) - async def volume_down(self, interaction: discord.Interaction, button: discord.ui.Button): - if self.guild_id in voice_clients: - volume = volumes.get(self.guild_id, default_volume) - new_volume = max(volume - 0.1, 0.1) # Min volume 10% - volumes[self.guild_id] = new_volume - voice_clients[self.guild_id].source.volume = new_volume - await interaction.response.send_message(f"πŸ”‰ **Volume decreased to {int(new_volume * 100)}%.**", ephemeral=True) - else: - await interaction.response.send_message("⚠️ **No audio is playing.**", ephemeral=True) - - @discord.ui.button(label="Mute/Unmute", style=discord.ButtonStyle.danger) - async def mute(self, interaction: discord.Interaction, button: discord.ui.Button): - if self.guild_id in voice_clients: - muted = volumes.get(self.guild_id) == 0.0 - new_volume = default_volume if muted else 0.0 - volumes[self.guild_id] = new_volume - voice_clients[self.guild_id].source.volume = new_volume - await interaction.response.send_message( - "πŸ”‡ **Muted.**" if new_volume == 0.0 else "πŸ”ˆ **Unmuted.**", - ephemeral=True, - ) - else: - await interaction.response.send_message("⚠️ **No audio is playing.**", ephemeral=True) - - -# ---------- Recommendations ---------- -def fetch_related_songs(song_title, artist): - """ - Fetch related songs using the YouTube Data API v3. - Returns a list of recommendations in the format: - [ "Title - URL" ] - """ - query = f"{song_title} {artist} related songs" - url = "https://www.googleapis.com/youtube/v3/search" - params = { - "q": query, - "key": YOUTUBE_API_KEY, - "type": "video", - "part": "snippet", - "maxResults": 5 - } - - response = requests.get(url, params=params) - if response.status_code == 200: - data = response.json() - return [ - f'{item["snippet"]["title"]} - https://www.youtube.com/watch?v={item["id"]["videoId"]}' - for item in data.get("items", []) - ] - else: - print(f"Error fetching related songs: {response.status_code} {response.text}") - return [] - - -def generate_recommendations(user_id): - """ - Generate recommendations using the last song the user listened to. - """ - user_data = load_user_data() - user_history = user_data.get(str(user_id), {}).get("history", []) - - if not user_history: - return ["No recommendations yet. Start playing some songs!"] - - # Use the last played song to fetch recommendations - last_song = user_history[-1] - title = last_song["title"] - artist = last_song["artist"] - recommendations = fetch_related_songs(title, artist) - - if recommendations: - return recommendations - else: - return ["No recommendations could be fetched at this time."] diff --git a/commands.py b/commands.py index 8ef03c6..daaf9b7 100644 --- a/commands.py +++ b/commands.py @@ -9,38 +9,39 @@ from audio import ( pause_audio, resume_audio, repeat_modes, - generate_recommendations - ) + _247_mode, + generate_recommendations, +) -async def setup_commands(client, guild_id=None): +async def setup_commands(client: discord.Client, guild_id: int | None = None) -> None: @client.tree.command(name="join", description="Join the user's current voice channel.") - async def join(interaction: discord.Interaction): + async def join(interaction: discord.Interaction) -> None: await join_voice(interaction) @client.tree.command(name="leave", description="Leave the current voice channel.") - async def leave(interaction: discord.Interaction): + async def leave(interaction: discord.Interaction) -> None: await leave_voice(interaction) @client.tree.command(name="play", description="Play a song by title or artist.") - async def play(interaction: discord.Interaction, query: str): + async def play(interaction: discord.Interaction, query: str) -> None: await play_audio(interaction, query) @client.tree.command(name="stop", description="Stop playback and clear the queue.") - async def stop(interaction: discord.Interaction): + async def stop(interaction: discord.Interaction) -> None: await stop_audio(interaction) @client.tree.command(name="pause", description="Pause the current song.") - async def pause(interaction: discord.Interaction): + async def pause(interaction: discord.Interaction) -> None: await pause_audio(interaction) @client.tree.command(name="resume", description="Resume the paused song.") - async def resume(interaction: discord.Interaction): + async def resume(interaction: discord.Interaction) -> None: await resume_audio(interaction) @client.tree.command(name="repeat", description="Set the repeat mode for playback.") @app_commands.describe(mode="Repeat mode: 'one', 'all', or 'off'") - async def repeat(interaction: discord.Interaction, mode: str): + async def repeat(interaction: discord.Interaction, mode: str) -> None: guild_id = interaction.guild.id valid_modes = ["one", "all", "off"] @@ -52,11 +53,24 @@ async def setup_commands(client, guild_id=None): await interaction.response.send_message(f"πŸ”„ **Repeat mode set to:** {mode.capitalize()}.") @client.tree.command(name="volume", description="Set playback volume.") - async def volume(interaction: discord.Interaction, level: int): + async def volume(interaction: discord.Interaction, level: int) -> None: await set_volume(interaction, level) + @client.tree.command(name="247", description="Enable or disable 24/7 mode.") + @app_commands.describe(mode="on/off") + async def _247(interaction: discord.Interaction, mode: str) -> None: + guild_id = interaction.guild.id + if mode.lower() == "on": + _247_mode[guild_id] = True + await interaction.response.send_message("βœ… **24/7 mode enabled. Amber will stay in the voice channel even when idle.**") + elif mode.lower() == "off": + _247_mode[guild_id] = False + await interaction.response.send_message("βœ… **24/7 mode disabled. Amber will leave the voice channel when idle.**") + else: + await interaction.response.send_message("❌ **Invalid mode. Use 'on' or 'off'.**") + @client.tree.command(name="recommend", description="Get song recommendations based on your listening history.") - async def recommend(interaction: discord.Interaction): + async def recommend(interaction: discord.Interaction) -> None: user_id = interaction.user.id recommendations = generate_recommendations(user_id)