2025-01-18 21:45:25 -05:00
|
|
|
import discord
|
|
|
|
import yt_dlp
|
2025-01-19 22:29:16 -05:00
|
|
|
import asyncio
|
2025-01-19 23:19:16 -05:00
|
|
|
import os
|
|
|
|
import json
|
|
|
|
import random
|
|
|
|
import requests
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
|
|
load_dotenv()
|
2025-01-18 21:45:25 -05:00
|
|
|
|
|
|
|
# ---------- Per-guild playback state (every dict is keyed by guild ID) ----------
voice_clients = {}   # guild ID -> voice connection currently held for that guild
music_queues = {}    # guild ID -> list of songs waiting to be played
current_tracks = {}  # guild ID -> song that is playing right now
volumes = {}         # guild ID -> playback volume chosen for that guild
repeat_modes = {}    # guild ID -> repeat mode: "one", "all", or "off"

# ---------- Configuration ----------
default_volume = 0.05  # Volume used when a guild has not set one (5%)
USER_DATA_FILE = 'user_data.json'
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")  # Read from the environment
|
2025-01-18 21:45:25 -05:00
|
|
|
|
|
|
|
|
2025-01-19 23:19:16 -05:00
|
|
|
# ---------- Utility Functions ----------
|
|
|
|
def load_user_data():
    """Load the persisted user data from ``USER_DATA_FILE``.

    Returns:
        The decoded JSON content of the file, or an empty dict when the file
        does not exist or does not contain valid JSON.
    """
    try:
        # Open directly instead of checking os.path.exists() first: the file
        # could disappear between the check and the open.
        with open(USER_DATA_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as exc:
        # An empty or truncated file would otherwise make every caller fail.
        print(f"Error reading {USER_DATA_FILE}: {exc}")
        return {}
|
|
|
|
|
|
|
|
|
|
|
|
def save_user_data(data):
    """Write ``data`` to ``USER_DATA_FILE`` as indented JSON.

    The content is first written to a temporary sibling file and then moved
    over the target, so a failure while serialising or writing cannot leave
    ``USER_DATA_FILE`` truncated.

    Args:
        data: JSON-serialisable object to persist.
    """
    tmp_path = f"{USER_DATA_FILE}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4)
    # Only replace the real file once the new content is completely written.
    os.replace(tmp_path, USER_DATA_FILE)
|
|
|
|
|
|
|
|
|
|
|
|
def update_user_history(user_id, song_title, artist):
    """Record one played song in the stored history of a user.

    Loads the user data, appends ``{"title", "artist"}`` to the history list
    stored under ``str(user_id)`` (creating the entry if needed) and saves
    the data again.
    """
    all_users = load_user_data()
    user_key = str(user_id)

    # First song for this user: start them off with an empty history.
    entry = all_users.setdefault(user_key, {"history": []})
    entry["history"].append({"title": song_title, "artist": artist})

    save_user_data(all_users)
|
|
|
|
|
|
|
|
|
|
|
|
# ---------- Playback Functions ----------
|
|
|
|
async def _send_reply(interaction: discord.Interaction, message: str):
    """Answer ``interaction``, using a follow-up if it was already responded to."""
    if interaction.response.is_done():
        await interaction.followup.send(message)
    else:
        await interaction.response.send_message(message)


async def join_voice(interaction: discord.Interaction):
    """Make sure the bot is connected to voice for the interaction's guild.

    Connects to the invoking user's voice channel when the guild has no
    voice client yet, or when the stored one is no longer connected. Every
    reply goes through ``_send_reply`` so the function also works after the
    caller has already deferred or answered the interaction.

    Returns:
        The guild's voice client, or ``None`` when the user is not in a
        voice channel.
    """
    voice_state = interaction.user.voice
    if voice_state is None or voice_state.channel is None:
        await _send_reply(interaction, "You need to be in a voice channel for me to join.")
        return None

    channel = voice_state.channel
    guild_id = interaction.guild.id

    # A stored client may be stale (disconnected); reconnect in that case
    # instead of handing back a dead connection.
    voice_client = voice_clients.get(guild_id)
    if voice_client is None or not voice_client.is_connected():
        voice_client = await channel.connect()
        voice_clients[guild_id] = voice_client

    # Always acknowledge the interaction; report the channel actually in use.
    await _send_reply(interaction, f"✅ **Joined {voice_client.channel.name}.**")
    return voice_client
|
2025-01-18 21:45:25 -05:00
|
|
|
|
2025-01-19 22:29:16 -05:00
|
|
|
|
2025-01-19 23:19:16 -05:00
|
|
|
def _search_first_result(query: str):
    """Return yt-dlp's metadata for the first YouTube search hit for ``query``.

    This call blocks on network I/O, so run it in a worker thread. It raises
    whatever yt-dlp raises, or ``IndexError`` when the search has no entries.
    """
    ydl_opts = {
        'format': 'bestaudio/best',
        'quiet': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        return ydl.extract_info(f"ytsearch:{query}", download=False)['entries'][0]


async def play_audio(interaction: discord.Interaction, query: str):
    """Search YouTube for ``query``, queue the first hit and start it if idle.

    Steps:
        1. Defer the interaction if it has not been answered yet.
        2. Make sure the bot is connected to voice (via ``join_voice``).
        3. Look the song up with yt-dlp in a worker thread.
        4. Log the song in the user's history and append it to the queue.
        5. If nothing is playing or paused, start playback and send the
           "Now Playing" embed with the playback controls.

    Args:
        interaction: The interaction that requested the song.
        query: Free-text search string passed to ``ytsearch:``.
    """
    guild_id = interaction.guild.id

    # Defer the response if not already done
    if not interaction.response.is_done():
        await interaction.response.defer()

    # Ensure the bot is connected to a voice channel
    voice_client = voice_clients.get(guild_id)
    if voice_client is None or not voice_client.is_connected():
        voice_client = await join_voice(interaction)
        if voice_client is None:  # If join failed
            return

    # Search for the song on YouTube without blocking the event loop.
    try:
        loop = asyncio.get_running_loop()
        info = await loop.run_in_executor(None, _search_first_result, query)
    except Exception as e:
        await interaction.followup.send(f"❌ **Error:** Could not find or play the requested audio. {str(e)}")
        return

    song_url = info['url']
    title = info.get('title', 'Unknown Title')
    artist = info.get('uploader', 'Unknown Artist')

    # Log the song in the user's history
    update_user_history(interaction.user.id, title, artist)

    # Add the song to the queue
    music_queues.setdefault(guild_id, []).append((song_url, title))
    await interaction.followup.send(f"✅ **Added to queue:** {title}")

    # A paused player is not idle: starting the next track here would throw
    # away the paused one. The new song simply waits in the queue.
    if voice_client.is_playing() or voice_client.is_paused():
        return

    await play_next(interaction)

    # Send "Now Playing" embed with controls (only when this song was
    # actually started, not when it was merely queued).
    embed = discord.Embed(
        title="🎵 Now Playing",
        description=f"**{title}** by {artist}",
        color=discord.Color.blue()
    )
    view = PlaybackControls(interaction, guild_id)
    await interaction.followup.send(embed=embed, view=view)
|
|
|
|
|
2025-01-19 22:29:16 -05:00
|
|
|
|
|
|
|
async def play_next(interaction: discord.Interaction):
    """Start the next queued track for the interaction's guild.

    Called to start playback and, through the player's ``after`` callback,
    every time a track ends. Repeat handling:

    * ``"one"``: the current track is put back at the front of the queue.
    * ``"all"``: the current track is appended to the end of the queue, so
      the whole queue cycles.

    When the queue is empty afterwards, the current track is cleared and a
    "no more songs" message is sent.

    Args:
        interaction: Interaction used for follow-up messages and to reach
            the client's event loop.
    """
    # NOTE(review): follow-up messages depend on the interaction token, which
    # Discord documents as valid for 15 minutes; long queues may need a
    # channel-based send instead. Confirm.
    guild_id = interaction.guild.id
    queue = music_queues.setdefault(guild_id, [])
    mode = repeat_modes.get(guild_id)
    previous_track = current_tracks.get(guild_id)

    if previous_track:
        if mode == "one":
            # Re-add the current song to the front of the queue
            queue.insert(0, previous_track)
        elif mode == "all":
            # Recycle the current track to the back so every song comes round again
            queue.append(previous_track)

    # If no songs are left in the queue
    if not queue:
        # Drop the finished track so it is not recycled into a later session.
        current_tracks.pop(guild_id, None)
        await interaction.followup.send("❌ **No more songs in the queue.**")
        return

    # Get the next song from the queue
    song_url, title = queue.pop(0)
    current_tracks[guild_id] = (song_url, title)

    # Prepare FFmpeg options
    ffmpeg_options = {
        'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
        'options': '-vn',
    }

    def _schedule_next(error):
        """Runs in the player thread when the track ends; queue the next one."""
        if error:
            print(f"Error during playback: {error}")
        asyncio.run_coroutine_threadsafe(
            play_next(interaction), interaction.client.loop
        )

    try:
        voice_client = voice_clients[guild_id]
        source = discord.FFmpegPCMAudio(song_url, **ffmpeg_options)
        volume = volumes.get(guild_id, default_volume)
        source = discord.PCMVolumeTransformer(source, volume=volume)

        # Play the audio
        voice_client.play(source, after=_schedule_next)

        # Only alert if the song is new (skip the alert for a repeated song)
        if mode != "one":
            embed = discord.Embed(
                title="🎵 Now Playing",
                description=f"**{title}**",
                color=discord.Color.green()
            )
            await interaction.followup.send(embed=embed)
    except Exception as e:
        await interaction.followup.send(f"❌ **Error:** Failed to play the next song. {str(e)}")
|
2025-01-19 22:29:16 -05:00
|
|
|
|
|
|
|
|
|
|
|
async def stop_audio(interaction: discord.Interaction):
    """Stop playback for the guild and clear its queue.

    A paused track counts as active playback. The interaction is always
    answered: with a confirmation when something was stopped, otherwise with
    a "no music" message.
    """
    guild_id = interaction.guild.id
    voice_client = voice_clients.get(guild_id)

    if voice_client is None or not (voice_client.is_playing() or voice_client.is_paused()):
        await interaction.response.send_message("❌ **No music is playing.**")
        return

    # Clear the state before stopping: stop() fires the player's `after`
    # callback, which calls play_next and must find nothing left to play.
    music_queues[guild_id] = []  # Clear the queue
    current_tracks.pop(guild_id, None)  # Remove the current track
    voice_client.stop()
    await interaction.response.send_message("🛑 **Playback stopped and queue cleared.**")
|
|
|
|
|
|
|
|
|
2025-01-19 23:19:16 -05:00
|
|
|
async def pause_audio(interaction: discord.Interaction):
    """Pause the guild's current song, or report that nothing is playing."""
    client = voice_clients.get(interaction.guild.id)

    # Guard clause: nothing to pause without a client that is playing.
    if client is None or not client.is_playing():
        await interaction.response.send_message("❌ **No music is currently playing.**")
        return

    client.pause()
    await interaction.response.send_message("⏸️ **Music paused.**")
|
|
|
|
|
|
|
|
|
|
|
|
async def resume_audio(interaction: discord.Interaction):
    """Resume the guild's paused song, or report that nothing is paused."""
    client = voice_clients.get(interaction.guild.id)

    # Guard clause: only a paused client can be resumed.
    if client is None or not client.is_paused():
        await interaction.response.send_message("❌ **No paused music to resume.**")
        return

    client.resume()
    await interaction.response.send_message("▶️ **Music resumed.**")
|
|
|
|
|
|
|
|
|
2025-01-19 22:29:16 -05:00
|
|
|
async def set_volume(interaction: discord.Interaction, level: int):
    """Set the guild's playback volume.

    Args:
        interaction: The interaction that requested the change.
        level: Volume in percent; must be between 0 and 100 inclusive.

    The value is stored in ``volumes`` as a fraction (``level / 100``) and
    applied to the current audio source when there is one, whether it is
    playing or paused.
    """
    guild_id = interaction.guild.id

    if not 0 <= level <= 100:
        await interaction.response.send_message("❌ **Volume must be between 0 and 100.**")
        return

    # Set the volume
    volume = level / 100
    volumes[guild_id] = volume

    # Adjust the live source too. Checking the source rather than
    # is_playing() means a paused track resumes at the new volume.
    voice_client = voice_clients.get(guild_id)
    source = voice_client.source if voice_client is not None else None
    if isinstance(source, discord.PCMVolumeTransformer):
        source.volume = volume

    await interaction.response.send_message(f"🔊 **Volume set to {level}%**.")
|
|
|
|
|
|
|
|
|
|
|
|
async def leave_voice(interaction: discord.Interaction):
    """Disconnect from voice in the guild and reset its playback state.

    The guild's entry is always removed from ``voice_clients``, even when
    the stored client turns out to be disconnected already, so no stale
    client is left behind.
    """
    guild_id = interaction.guild.id
    voice_client = voice_clients.pop(guild_id, None)

    if voice_client is None or not voice_client.is_connected():
        await interaction.response.send_message("❌ **I am not connected to any voice channel.**")
        return

    # Clear queue and current track first so the player's `after` callback
    # (play_next) does not try to start another song on the closing connection.
    music_queues[guild_id] = []
    current_tracks.pop(guild_id, None)

    await voice_client.disconnect()
    await interaction.response.send_message("👋 **Left the voice channel.**")
|
2025-01-19 23:19:16 -05:00
|
|
|
|
|
|
|
|
2025-01-22 20:20:11 -05:00
|
|
|
# ---------- Discord UI ----------
|
|
|
|
|
|
|
|
|
|
|
|
class PlaybackControls(discord.ui.View):
    """Buttons (pause, skip, stop, repeat, volume, mute) for one guild's player.

    All buttons act on the module-level state (``voice_clients``,
    ``music_queues``, ``current_tracks``, ``volumes``, ``repeat_modes``)
    stored under ``guild_id``.
    """

    def __init__(self, interaction, guild_id):
        """Create the view.

        Args:
            interaction: The interaction the view was created for.
            guild_id: ID of the guild whose playback the buttons control.
        """
        super().__init__(timeout=None)  # Buttons never time out
        self.interaction = interaction
        self.guild_id = guild_id
        # Volume to restore on unmute; None until this view has muted.
        self._volume_before_mute = None

    def _active_client(self):
        """Return the guild's voice client if it is playing or paused, else None."""
        voice_client = voice_clients.get(self.guild_id)
        if voice_client is not None and (voice_client.is_playing() or voice_client.is_paused()):
            return voice_client
        return None

    def _apply_volume(self, volume):
        """Store ``volume`` for the guild and apply it to the current source, if any."""
        volumes[self.guild_id] = volume
        voice_client = voice_clients.get(self.guild_id)
        source = voice_client.source if voice_client is not None else None
        # Checking the source (not is_playing) also covers a paused track.
        if isinstance(source, discord.PCMVolumeTransformer):
            source.volume = volume

    @discord.ui.button(label="⏸️ Pause", style=discord.ButtonStyle.primary)
    async def pause_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Toggle between pause and resume and relabel the button accordingly."""
        voice_client = voice_clients.get(self.guild_id)
        if voice_client is not None and voice_client.is_playing():
            voice_client.pause()
            button.label = "▶️ Resume"
            await interaction.response.edit_message(content="⏸️ **Paused playback.**", view=self)
        elif voice_client is not None and voice_client.is_paused():
            voice_client.resume()
            button.label = "⏸️ Pause"
            await interaction.response.edit_message(content="▶️ **Resumed playback.**", view=self)
        else:
            # Always answer, otherwise Discord reports the interaction as failed.
            await interaction.response.send_message("❌ **No music is currently playing.**", ephemeral=True)

    @discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.secondary)
    async def skip_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Skip the current (playing or paused) song."""
        voice_client = self._active_client()
        if voice_client is not None:
            voice_client.stop()  # Stop triggers `play_next`
            await interaction.response.send_message("⏭️ **Skipped to the next song.**", ephemeral=True)
        else:
            await interaction.response.send_message("❌ **No song is currently playing to skip.**", ephemeral=True)

    @discord.ui.button(label="🛑 Stop", style=discord.ButtonStyle.danger)
    async def stop_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Stop playback and clear the queue and the current track."""
        voice_client = self._active_client()
        if voice_client is not None:
            # Clear the state first: stop() triggers `play_next`, and with a
            # repeat mode on, a remaining current track would be restarted.
            music_queues[self.guild_id] = []  # Clear the queue
            current_tracks.pop(self.guild_id, None)
            voice_client.stop()
            await interaction.response.send_message("🛑 **Stopped playback and cleared the queue.**", ephemeral=True)
        else:
            await interaction.response.send_message("❌ **No song is currently playing to stop.**", ephemeral=True)

    @discord.ui.button(label="🔄 Repeat: Off", style=discord.ButtonStyle.success)
    async def repeat_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Cycle the repeat mode: off -> one -> all -> off."""
        current_mode = repeat_modes.get(self.guild_id, "off")
        new_mode = {"off": "one", "one": "all"}.get(current_mode, "off")
        repeat_modes[self.guild_id] = new_mode
        button.label = f"🔄 Repeat: {new_mode.capitalize()}"
        await interaction.response.edit_message(content=f"🔄 **Repeat mode set to:** {new_mode.capitalize()}", view=self)

    @discord.ui.button(label="🔈 Volume Down", style=discord.ButtonStyle.secondary)
    async def volume_down(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Decrease the volume by 10 percentage points (not below 0%)."""
        current_volume = volumes.get(self.guild_id, default_volume)
        # Round to avoid float drift such as 0.30000000000000004.
        new_volume = max(0.0, round(current_volume - 0.1, 2))
        self._apply_volume(new_volume)
        await interaction.response.send_message(f"🔈 **Volume decreased to {round(new_volume * 100)}%.**", ephemeral=True)

    @discord.ui.button(label="🔊 Volume Up", style=discord.ButtonStyle.primary)
    async def volume_up(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Increase the volume by 10 percentage points (not above 100%)."""
        current_volume = volumes.get(self.guild_id, default_volume)
        new_volume = min(1.0, round(current_volume + 0.1, 2))
        self._apply_volume(new_volume)
        await interaction.response.send_message(f"🔊 **Volume increased to {round(new_volume * 100)}%.**", ephemeral=True)

    @discord.ui.button(label="🔇 Mute/Unmute", style=discord.ButtonStyle.danger)
    async def mute_toggle(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Mute when the volume is above 0, otherwise restore the previous volume.

        Unmuting restores the volume this view muted from; if this view did
        not do the muting, ``default_volume`` is used.
        """
        current_volume = volumes.get(self.guild_id, default_volume)

        # Toggle mute
        if current_volume > 0:
            self._volume_before_mute = current_volume
            new_volume = 0.0
            button.label = "🔊 Unmute"
        else:
            new_volume = self._volume_before_mute or default_volume
            button.label = "🔇 Mute"

        self._apply_volume(new_volume)

        status = "muted" if new_volume == 0.0 else "unmuted"
        await interaction.response.edit_message(content=f"🔇 **Volume {status}.**", view=self)
|
|
|
|
|
|
|
|
|
2025-01-19 23:19:16 -05:00
|
|
|
# ---------- Recommendations ----------
|
|
|
|
def fetch_related_songs(song_title, artist):
    """
    Fetch related songs using the YouTube Data API v3 search endpoint.

    Args:
        song_title: Title of the reference song.
        artist: Artist of the reference song.

    Returns:
        A list of recommendations in the format [ "Title - URL" ] (at most
        5 are requested). An empty list is returned when the request fails,
        times out, returns a non-200 status or an undecodable body.
    """
    query = f"{song_title} {artist} related songs"
    url = "https://www.googleapis.com/youtube/v3/search"
    params = {
        "q": query,
        "key": YOUTUBE_API_KEY,
        "type": "video",
        "part": "snippet",
        "maxResults": 5
    }

    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(url, params=params, timeout=10)
    except requests.RequestException as exc:
        print(f"Error fetching related songs: {exc}")
        return []

    if response.status_code != 200:
        print(f"Error fetching related songs: {response.status_code} {response.text}")
        return []

    try:
        data = response.json()
    except ValueError as exc:
        print(f"Error fetching related songs: {exc}")
        return []

    recommendations = []
    for item in data.get("items", []):
        # Skip incomplete items instead of failing the whole request.
        title = item.get("snippet", {}).get("title")
        video_id = item.get("id", {}).get("videoId")
        if title and video_id:
            recommendations.append(f'{title} - https://www.youtube.com/watch?v={video_id}')
    return recommendations
|
|
|
|
|
|
|
|
|
|
|
|
def generate_recommendations(user_id):
    """
    Build recommendations from the most recent song in the user's history.

    Returns a list of strings: the related songs found for that song, or a
    single explanatory message when the user has no history or nothing
    could be fetched.
    """
    history = load_user_data().get(str(user_id), {}).get("history", [])
    if not history:
        return ["No recommendations yet. Start playing some songs!"]

    # The last entry is the most recently played song.
    latest = history[-1]
    related = fetch_related_songs(latest["title"], latest["artist"])

    return related or ["No recommendations could be fetched at this time."]
|