import discord import yt_dlp import asyncio import os import json import random import requests from dotenv import load_dotenv load_dotenv() voice_clients = {} # Track active voice connections music_queues = {} # Per-guild song queue current_tracks = {} # Currently playing tracks volumes = {} # Volume levels per guild repeat_modes = {} # Tracks repeat mode per guild: "one", "all", or "off" default_volume = 0.05 # Default volume (5%) USER_DATA_FILE = 'user_data.json' YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY") # Replace with your API key # ---------- Utility Functions ---------- def load_user_data(): if not os.path.exists(USER_DATA_FILE): return {} with open(USER_DATA_FILE, 'r') as f: return json.load(f) def save_user_data(data): with open(USER_DATA_FILE, 'w') as f: json.dump(data, f, indent=4) def update_user_history(user_id, song_title, artist): user_data = load_user_data() if str(user_id) not in user_data: user_data[str(user_id)] = {"history": []} # Add the song to the user's history user_data[str(user_id)]["history"].append({"title": song_title, "artist": artist}) save_user_data(user_data) # ---------- Playback Functions ---------- async def join_voice(interaction: discord.Interaction): if interaction.user.voice is None or interaction.user.voice.channel is None: await interaction.response.send_message("You need to be in a voice channel for me to join.") return None channel = interaction.user.voice.channel guild_id = interaction.guild.id if guild_id not in voice_clients: voice_clients[guild_id] = await channel.connect() # Ensure the response is completed if interaction.response.is_done(): await interaction.followup.send(f"βœ… **Joined {channel.name}.**") else: await interaction.response.send_message(f"βœ… **Joined {channel.name}.**") return voice_clients[guild_id] async def play_audio(interaction: discord.Interaction, query: str): guild_id = interaction.guild.id # Ensure Amber is connected to a voice channel if guild_id not in voice_clients or not voice_clients[guild_id].is_connected(): await interaction.response.defer() # Defer the response early voice_client = await join_voice(interaction) if voice_client is None: # If join failed return else: voice_client = voice_clients[guild_id] # Defer response if not already done if not interaction.response.is_done(): await interaction.response.defer() # Search for the song on YouTube ydl_opts = { 'format': 'bestaudio/best', 'quiet': True, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: try: info = ydl.extract_info(f"ytsearch:{query}", download=False)['entries'][0] except Exception as e: await interaction.followup.send(f"Failed to find or play the requested audio. Error: {str(e)}") return song_url = info['url'] title = info.get('title', 'Unknown Title') artist = info.get('uploader', 'Unknown Artist') # Log the song in the user's history update_user_history(interaction.user.id, title, artist) # Add the song to the queue if guild_id not in music_queues: music_queues[guild_id] = [] music_queues[guild_id].append((song_url, title)) await interaction.followup.send(f"βœ… **Added to queue:** {title}") # If nothing is playing, start playback if not voice_client.is_playing(): await play_next(interaction) async def play_next(interaction: discord.Interaction): guild_id = interaction.guild.id if guild_id not in music_queues: music_queues[guild_id] = [] # Handle Repeat One if repeat_modes.get(guild_id) == "one" and current_tracks.get(guild_id): song_url, title = current_tracks[guild_id] music_queues[guild_id].insert(0, (song_url, title)) # Re-add the current song to the front of the queue # Handle Repeat All elif repeat_modes.get(guild_id) == "all" and not music_queues[guild_id]: # Move completed songs back to the queue song_url, title = current_tracks.get(guild_id, (None, None)) if song_url and title: music_queues[guild_id].append((song_url, title)) music_queues[guild_id].extend(current_tracks.values()) # Proceed to play the next song if not music_queues[guild_id]: await interaction.followup.send("❌ **No more songs in the queue.**") return voice_client = voice_clients[guild_id] song_url, title = music_queues[guild_id].pop(0) current_tracks[guild_id] = (song_url, title) # Prepare FFmpeg options ffmpeg_options = { 'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', 'options': '-vn', } try: source = discord.FFmpegPCMAudio(song_url, **ffmpeg_options) volume = volumes.get(guild_id, default_volume) source = discord.PCMVolumeTransformer(source, volume=volume) # Play the audio voice_client.play( source, after=lambda e: asyncio.run_coroutine_threadsafe( play_next(interaction), interaction.client.loop ), ) await interaction.followup.send(f"🎡 **Now playing:** {title}") except Exception as e: await interaction.followup.send(f"Failed to play the next song. Error: {str(e)}") async def stop_audio(interaction: discord.Interaction): guild_id = interaction.guild.id if guild_id in voice_clients: voice_client = voice_clients[guild_id] if voice_client.is_playing(): voice_client.stop() music_queues[guild_id] = [] # Clear the queue current_tracks.pop(guild_id, None) # Remove the current track await interaction.response.send_message("πŸ›‘ **Playback stopped and queue cleared.**") else: await interaction.response.send_message("❌ **No music is playing.**") async def pause_audio(interaction: discord.Interaction): """Pauses the currently playing song.""" guild_id = interaction.guild.id if guild_id in voice_clients and voice_clients[guild_id].is_playing(): voice_clients[guild_id].pause() # Pause playback await interaction.response.send_message("⏸️ **Music paused.**") else: await interaction.response.send_message("❌ **No music is currently playing.**") async def resume_audio(interaction: discord.Interaction): """Resumes playback of a paused song.""" guild_id = interaction.guild.id if guild_id in voice_clients and voice_clients[guild_id].is_paused(): voice_clients[guild_id].resume() # Resume playback await interaction.response.send_message("▢️ **Music resumed.**") else: await interaction.response.send_message("❌ **No paused music to resume.**") async def set_volume(interaction: discord.Interaction, level: int): guild_id = interaction.guild.id if level < 0 or level > 100: await interaction.response.send_message("❌ **Volume must be between 0 and 100.**") return # Set the volume volume = level / 100 volumes[guild_id] = volume # Adjust volume for the current source if playing if guild_id in voice_clients and voice_clients[guild_id].is_playing(): source = voice_clients[guild_id].source if isinstance(source, discord.PCMVolumeTransformer): source.volume = volume await interaction.response.send_message(f"πŸ”Š **Volume set to {level}%**.") async def leave_voice(interaction: discord.Interaction): guild_id = interaction.guild.id if guild_id in voice_clients: voice_client = voice_clients[guild_id] if voice_client.is_connected(): await voice_client.disconnect() voice_clients.pop(guild_id, None) await interaction.response.send_message("πŸ‘‹ **Left the voice channel.**") else: await interaction.response.send_message("❌ **I am not connected to any voice channel.**") else: await interaction.response.send_message("❌ **I am not connected to any voice channel.**") # ---------- Recommendations ---------- def fetch_related_songs(song_title, artist): """ Fetch related songs using the YouTube Data API v3. Returns a list of recommendations in the format: [ "Title - URL" ] """ query = f"{song_title} {artist} related songs" url = "https://www.googleapis.com/youtube/v3/search" params = { "q": query, "key": YOUTUBE_API_KEY, "type": "video", "part": "snippet", "maxResults": 5 } response = requests.get(url, params=params) if response.status_code == 200: data = response.json() return [ f'{item["snippet"]["title"]} - https://www.youtube.com/watch?v={item["id"]["videoId"]}' for item in data.get("items", []) ] else: print(f"Error fetching related songs: {response.status_code} {response.text}") return [] def generate_recommendations(user_id): """ Generate recommendations using the last song the user listened to. """ user_data = load_user_data() user_history = user_data.get(str(user_id), {}).get("history", []) if not user_history: return ["No recommendations yet. Start playing some songs!"] # Use the last played song to fetch recommendations last_song = user_history[-1] title = last_song["title"] artist = last_song["artist"] recommendations = fetch_related_songs(title, artist) if recommendations: return recommendations else: return ["No recommendations could be fetched at this time."]