# Quartessa/audio.py (481 lines, 19 KiB, Python)

import discord
import yt_dlp
import asyncio
import os
import json
import random
import requests
import time
from dotenv import load_dotenv
# Called at import time so that the os.getenv() lookup below can see values
# provided through a local .env file (python-dotenv behaviour — confirm the
# .env location used in deployment).
load_dotenv()

# Module-level playback state. Every dictionary below is keyed by guild id.
voice_clients = {}  # guild id -> voice client returned by channel.connect()
music_queues = {}  # guild id -> list of (stream_url, title, duration_seconds) tuples
current_tracks = {}  # guild id -> (stream_url, title) of the track started last
volumes = {}  # guild id -> volume as a float between 0.0 and 1.0
repeat_modes = {}  # guild id -> repeat mode: "one", "all", or "off"
now_playing_messages = {}  # guild id -> message id of the "Now Playing" embed
default_volume = 0.05  # Volume used when a guild has no entry in `volumes` (5%)
USER_DATA_FILE = 'user_data.json'  # JSON file holding per-user listening history
# Read from the environment (not hard-coded); None when the variable is unset.
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
# ---------- Utility Functions ----------
def load_user_data():
    """Load the persisted per-user data from ``USER_DATA_FILE``.

    Returns:
        dict: The decoded JSON object. An empty dict is returned when the
        file does not exist, cannot be decoded (empty or corrupt), or does
        not hold a JSON object at the top level.
    """
    # EAFP: open directly instead of checking os.path.exists first, which
    # avoids a race between the check and the open.
    try:
        with open(USER_DATA_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError. A damaged history file must not break playback.
        print(f"Warning: could not parse {USER_DATA_FILE}: {e}")
        return {}

    # Callers index the result by user id, so anything but a dict is unusable.
    return data if isinstance(data, dict) else {}
def save_user_data(data):
    """Persist ``data`` to ``USER_DATA_FILE`` as indented JSON.

    The JSON is written to a sibling temporary file first and then moved over
    the real file, so a serialisation or I/O error never leaves
    ``USER_DATA_FILE`` truncated or half-written.

    Args:
        data: JSON-serialisable object (the callers in this file pass a dict
            keyed by user id).

    Raises:
        TypeError: If ``data`` contains values that ``json`` cannot serialise.
        OSError: If the file cannot be written or replaced.
    """
    tmp_path = f"{USER_DATA_FILE}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)
        os.replace(tmp_path, USER_DATA_FILE)
    finally:
        # After a successful replace the temp file is gone; this only removes
        # the leftovers of a failed write.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def update_user_history(user_id, song_title, artist):
    """Append a played song to a user's listening history and persist it.

    Args:
        user_id: Identifier of the user; stored under its string form because
            JSON object keys are strings.
        song_title: Title of the song that was requested.
        artist: Artist/uploader name stored alongside the title.
    """
    key = str(user_id)
    user_data = load_user_data()

    # Tolerate a missing or malformed entry (e.g. a user record written
    # without a "history" list) instead of raising KeyError.
    entry = user_data.get(key)
    if not isinstance(entry, dict):
        entry = user_data[key] = {}
    history = entry.get("history")
    if not isinstance(history, list):
        history = entry["history"] = []

    history.append({"title": song_title, "artist": artist})
    save_user_data(user_data)
# ---------- Playback Functions ----------
async def join_voice(interaction: discord.Interaction):
    """Join (or move to) the voice channel of the user who invoked the command.

    Any failure is reported to the user through ``interaction.followup``, so
    the interaction is expected to have been responded to or deferred before
    this is called.

    Args:
        interaction: The interaction that triggered the join.

    Returns:
        bool: True when the bot ends up connected to the user's channel,
        False otherwise.
    """
    guild_id = interaction.guild.id

    # `voice` is None when the user is not in any voice channel.
    voice_state = getattr(interaction.user, "voice", None)
    if voice_state is None or voice_state.channel is None:
        await interaction.followup.send("❌ **Error:** You need to be in a voice channel first.")
        return False
    channel = voice_state.channel

    existing = voice_clients.get(guild_id)
    try:
        if existing is not None and existing.is_connected():
            # Already connected in this guild: connect() would be rejected,
            # so move the existing client when the user is somewhere else.
            if existing.channel != channel:
                await existing.move_to(channel)
            return True

        # No client yet, or a stale one that has been disconnected.
        voice_clients[guild_id] = await channel.connect()
        return True
    except Exception as e:
        await interaction.followup.send(f"❌ **Error:** Could not join the voice channel. {str(e)}")
        return False
async def play_audio(interaction: discord.Interaction, query: str):
    """Search YouTube for ``query``, queue the first result and play it if idle.

    When a track is already playing or paused, the song is only queued and a
    short confirmation is sent; the "Now Playing" embed is left alone so it
    keeps describing the track that is actually playing.

    Args:
        interaction: The interaction that requested the song.
        query: Free-text search terms passed to a ``ytsearch:`` lookup.
    """
    guild_id = interaction.guild.id

    # Acknowledge the interaction first; the search below can take a while.
    if not interaction.response.is_done():
        await interaction.response.defer()

    # Ensure Amber is connected to a voice channel
    existing = voice_clients.get(guild_id)
    if existing is None or not existing.is_connected():
        voice_state = getattr(interaction.user, "voice", None)
        if voice_state is None or voice_state.channel is None:
            await interaction.followup.send("❌ **Error:** You need to be in a voice channel first.")
            return
        # join_voice sends its own error message on failure, so do not
        # repeat it here.
        if not await join_voice(interaction):
            return

    ydl_opts = {
        'format': 'bestaudio/best',
        'quiet': True,
    }

    def _search():
        """Blocking yt-dlp lookup; executed in a worker thread."""
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            return ydl.extract_info(f"ytsearch:{query}", download=False)

    try:
        # extract_info performs network I/O synchronously; running it in the
        # default executor keeps the event loop (and voice heartbeats) alive.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, _search)
        entries = (result or {}).get('entries') or []
        if not entries:
            raise LookupError(f"No results found for '{query}'.")
        info = entries[0]
        song_url = info['url']
    except Exception as e:
        await interaction.followup.send(f"❌ **Error:** Could not find or play the requested audio. {str(e)}")
        return

    # `or` also covers keys that are present but None (e.g. live streams
    # reporting no duration).
    title = info.get('title') or 'Unknown Title'
    artist = info.get('uploader') or 'Unknown Artist'
    duration = info.get('duration') or 0  # Duration in seconds

    # Log the song in the user's history
    update_user_history(interaction.user.id, title, artist)

    # Add the song to the queue
    music_queues.setdefault(guild_id, []).append((song_url, title, duration))

    voice_client = voice_clients[guild_id]
    # A paused player is not "playing" but still owns the current track, so
    # both states mean the new song has to wait in the queue.
    if voice_client.is_playing() or voice_client.is_paused():
        await interaction.followup.send(f"➕ **Added to queue:** **{title}** by {artist}")
        return

    await play_next(interaction)
    if not voice_client.is_playing():
        # Playback did not start; nothing to attach controls to.
        return

    # Attach the playback controls to the "Now Playing" message. Only the
    # view is edited so the embed written for the running track (and its
    # progress updates) is not overwritten.
    view = PlaybackControls(interaction, guild_id)
    message_id = now_playing_messages.get(guild_id)
    if message_id:
        try:
            message = await interaction.channel.fetch_message(message_id)
            await message.edit(view=view)
            return
        except discord.NotFound:
            pass  # If the original embed is deleted, send a new one

    # Send the "Now Playing" embed
    embed = discord.Embed(
        title="🎵 Now Playing",
        description=f"**{title}** by {artist}\n\n`00:00 / {time.strftime('%M:%S', time.gmtime(duration))}`",
        color=discord.Color.blue()
    )
    message = await interaction.followup.send(embed=embed, view=view)
    now_playing_messages[guild_id] = message.id  # Track the message ID

    # Start updating the progress bar
    asyncio.create_task(update_progress_bar(interaction, duration, time.time()))
async def play_next(interaction: discord.Interaction):
    """Start the next queued track for the interaction's guild.

    Called when a song is requested while idle and, through the voice
    client's ``after`` callback, whenever a track ends. With an empty queue
    the "Now Playing" message is switched to a "Queue Finished" notice.

    NOTE(review): ``repeat_modes`` is not consulted here, so the repeat
    button does not influence which track plays next — confirm whether that
    is handled elsewhere.

    Args:
        interaction: Interaction used to locate the guild, the text channel
            and the event loop. It may be long finished by the time later
            tracks start.
    """
    guild_id = interaction.guild.id

    async def _send(**kwargs):
        """Send a message, preferring the follow-up webhook.

        Falls back to a plain channel message when the webhook rejects the
        request, which is expected once the interaction token has expired
        during a long queue.
        """
        try:
            return await interaction.followup.send(**kwargs)
        except discord.HTTPException:
            return await interaction.channel.send(**kwargs)

    queue = music_queues.get(guild_id)
    if not queue:
        # If no songs are left in the queue, update the embed to reflect this
        message_id = now_playing_messages.get(guild_id)
        if message_id:
            try:
                message = await interaction.channel.fetch_message(message_id)
                embed = discord.Embed(
                    title="🎵 Queue Finished",
                    description="Amber is now idle. Add more songs to the queue to keep the music going!",
                    color=discord.Color.red()
                )
                await message.edit(embed=embed, view=None)  # Remove playback controls
            except discord.NotFound:
                # If the message was deleted, do nothing
                pass
        return

    voice_client = voice_clients.get(guild_id)
    if voice_client is None or not voice_client.is_connected():
        # The bot left the channel (this also runs from the `after` callback
        # fired by a disconnect). Leave the queue untouched.
        return
    if voice_client.is_playing() or voice_client.is_paused():
        # A track is still active; popping now would lose the queued song.
        return

    # Get the next song from the queue
    song_url, title, duration = queue.pop(0)
    duration = duration or 0  # None would make time.gmtime() use "now"
    current_tracks[guild_id] = (song_url, title)

    # Prepare FFmpeg options
    ffmpeg_options = {
        'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
        'options': '-vn',
    }

    def _after_playback(error):
        """Runs in the player thread when the track ends or is stopped."""
        if error:
            print(f"Playback error in guild {guild_id}: {error}")
        asyncio.run_coroutine_threadsafe(play_next(interaction), interaction.client.loop)

    try:
        source = discord.FFmpegPCMAudio(song_url, **ffmpeg_options)
        volume = volumes.get(guild_id, default_volume)
        source = discord.PCMVolumeTransformer(source, volume=volume)
        voice_client.play(source, after=_after_playback)
    except Exception as e:
        await _send(content=f"❌ **Error:** Could not play the next song. {str(e)}")
        return

    # Update or create the "Now Playing" embed. Audio is already running, so
    # a failure here is logged rather than reported as a playback error.
    embed = discord.Embed(
        title="🎵 Now Playing",
        description=f"**{title}**\n\n`00:00 / {time.strftime('%M:%S', time.gmtime(duration))}`",
        color=discord.Color.green()
    )
    try:
        message = None
        message_id = now_playing_messages.get(guild_id)
        if message_id:
            try:
                message = await interaction.channel.fetch_message(message_id)
                await message.edit(embed=embed)
            except discord.NotFound:
                message = None  # Deleted: fall through and send a new one
        if message is None:
            message = await _send(embed=embed, view=PlaybackControls(interaction, guild_id))
            now_playing_messages[guild_id] = message.id
    except discord.HTTPException as e:
        print(f"Could not update the Now Playing message in guild {guild_id}: {e}")

    # Start updating the progress bar with the song duration
    asyncio.create_task(update_progress_bar(interaction, duration, time.time()))
async def stop_audio(interaction: discord.Interaction):
    """Stop playback and clear the guild's queue without leaving the channel.

    Args:
        interaction: The interaction that requested the stop; it receives
            the status message and a follow-up confirming the cleared queue.
    """
    guild_id = interaction.guild.id
    voice_client = voice_clients.get(guild_id)
    if voice_client is None:
        await interaction.response.send_message("⚠️ **Amber is not connected to a voice channel.**")
        return

    # Clear the queue BEFORE stopping: stop() triggers the `after` callback,
    # which schedules play_next; if the queue were cleared only after the
    # awaits below, the next song could already have been started.
    music_queues[guild_id] = []

    # A paused track counts as active playback and must be stopped too.
    if voice_client.is_playing() or voice_client.is_paused():
        voice_client.stop()
        await interaction.response.send_message("⏹️ **Playback stopped.**")
    else:
        await interaction.response.send_message("⚠️ **Nothing is playing.**")

    await interaction.followup.send("🗑️ **Queue cleared. Amber will remain idle in the voice channel.**")
async def pause_audio(interaction: discord.Interaction):
    """Pause the track that is currently playing in the caller's guild.

    Replies with a confirmation, or with an error message when the guild has
    no voice client or nothing is playing.
    """
    client = voice_clients.get(interaction.guild.id)

    # Guard clause: nothing to pause.
    if client is None or not client.is_playing():
        await interaction.response.send_message("❌ **No music is currently playing.**")
        return

    client.pause()
    await interaction.response.send_message("⏸️ **Music paused.**")
async def resume_audio(interaction: discord.Interaction):
    """Resume a paused track in the caller's guild.

    Replies with a confirmation, or with an error message when the guild has
    no voice client or playback is not paused.
    """
    client = voice_clients.get(interaction.guild.id)

    # Guard clause: nothing to resume.
    if client is None or not client.is_paused():
        await interaction.response.send_message("❌ **No paused music to resume.**")
        return

    client.resume()
    await interaction.response.send_message("▶️ **Music resumed.**")
async def set_volume(interaction: discord.Interaction, level: int):
    """Set the playback volume for the caller's guild.

    The value is stored in ``volumes`` (used when later tracks start) and is
    applied immediately to the active audio source, whether it is playing or
    paused.

    Args:
        interaction: The interaction that requested the change.
        level: Desired volume as a percentage from 0 to 100 inclusive.
    """
    guild_id = interaction.guild.id
    if level < 0 or level > 100:
        await interaction.response.send_message("❌ **Volume must be between 0 and 100.**")
        return

    # Set the volume
    volume = level / 100
    volumes[guild_id] = volume

    # Adjust the live source as well. This is not restricted to
    # is_playing(): a paused track would otherwise resume at the old volume.
    voice_client = voice_clients.get(guild_id)
    source = voice_client.source if voice_client is not None else None
    if isinstance(source, discord.PCMVolumeTransformer):
        source.volume = volume

    await interaction.response.send_message(f"🔊 **Volume set to {level}%**.")
async def leave_voice(interaction: discord.Interaction):
    """Disconnect from the guild's voice channel and drop its queue.

    Args:
        interaction: The interaction that requested the disconnect; it
            receives the confirmation or error message.
    """
    guild_id = interaction.guild.id

    # Pop unconditionally so a stale, already-disconnected client does not
    # linger in the registry.
    voice_client = voice_clients.pop(guild_id, None)
    if voice_client is None or not voice_client.is_connected():
        await interaction.response.send_message("❌ **I am not connected to any voice channel.**")
        return

    # Disconnecting ends the current track, which fires the `after` callback
    # and play_next; an emptied queue keeps it from trying to start another
    # song on a client that is going away.
    music_queues[guild_id] = []

    await voice_client.disconnect()
    await interaction.response.send_message("👋 **Left the voice channel.**")
# ---------- Discord UI ----------
class PlaybackControls(discord.ui.View):
    """Button row attached to the "Now Playing" message.

    Provides pause/resume, skip, stop, repeat-mode cycling and volume
    controls for one guild. All state lives in the module-level dictionaries
    keyed by guild id, so a freshly created view reflects the guild's current
    repeat mode and mute state.
    """

    # guild id -> volume to restore when the guild is unmuted. Shared across
    # instances on purpose: a new view is created for every requested song.
    _pre_mute_volumes = {}

    def __init__(self, interaction, guild_id):
        """Create the controls.

        Args:
            interaction: The interaction that created the view (kept for
                reference; the buttons use their own interaction).
            guild_id: Guild whose playback the buttons control.
        """
        super().__init__(timeout=None)  # Buttons never time out
        self.interaction = interaction
        self.guild_id = guild_id

        # Sync labels with the stored state so a new view does not claim
        # "Repeat: Off" / "Mute" while the guild is in a different state.
        mode = repeat_modes.get(guild_id, "off")
        self.repeat_button.label = f"🔄 Repeat: {mode.capitalize()}"
        if volumes.get(guild_id, default_volume) == 0:
            self.mute_toggle.label = "🔊 Unmute"

    def _voice_client(self):
        """Return this guild's voice client, or None when not connected."""
        return voice_clients.get(self.guild_id)

    def _is_active(self):
        """Return True when a track is playing or paused."""
        client = self._voice_client()
        return client is not None and (client.is_playing() or client.is_paused())

    def _apply_volume(self, new_volume):
        """Store ``new_volume`` and apply it to the live source, if any."""
        volumes[self.guild_id] = new_volume
        client = self._voice_client()
        # Not limited to is_playing(): a paused source must be updated too.
        source = client.source if client is not None else None
        if isinstance(source, discord.PCMVolumeTransformer):
            source.volume = new_volume

    @discord.ui.button(label="⏸️ Pause", style=discord.ButtonStyle.primary)
    async def pause_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Toggle between paused and playing."""
        client = self._voice_client()
        if client is not None and client.is_playing():
            client.pause()
            button.label = "▶️ Resume"
            await interaction.response.edit_message(content="⏸️ **Paused playback.**", view=self)
        elif client is not None and client.is_paused():
            client.resume()
            button.label = "⏸️ Pause"
            await interaction.response.edit_message(content="▶️ **Resumed playback.**", view=self)
        else:
            # Always answer, otherwise Discord shows "interaction failed".
            await interaction.response.send_message("❌ **No song is currently playing.**", ephemeral=True)

    @discord.ui.button(label="⏭️ Skip", style=discord.ButtonStyle.secondary)
    async def skip_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Skip the current track (playing or paused)."""
        if self._is_active():
            self._voice_client().stop()  # Stop triggers `play_next`
            await interaction.response.send_message("⏭️ **Skipped to the next song.**", ephemeral=True)
        else:
            await interaction.response.send_message("❌ **No song is currently playing to skip.**", ephemeral=True)

    @discord.ui.button(label="🛑 Stop", style=discord.ButtonStyle.danger)
    async def stop_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Stop playback and clear the queue."""
        if self._is_active():
            # Clear first so the `after` callback finds nothing to play.
            music_queues[self.guild_id] = []
            self._voice_client().stop()
            await interaction.response.send_message("🛑 **Stopped playback and cleared the queue.**", ephemeral=True)
        else:
            await interaction.response.send_message("❌ **No song is currently playing to stop.**", ephemeral=True)

    @discord.ui.button(label="🔄 Repeat: Off", style=discord.ButtonStyle.success)
    async def repeat_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Cycle the repeat mode: off -> one -> all -> off."""
        next_mode = {"off": "one", "one": "all"}
        new_mode = next_mode.get(repeat_modes.get(self.guild_id, "off"), "off")
        repeat_modes[self.guild_id] = new_mode
        button.label = f"🔄 Repeat: {new_mode.capitalize()}"
        await interaction.response.edit_message(content=f"🔄 **Repeat mode set to:** {new_mode.capitalize()}", view=self)

    @discord.ui.button(label="🔈 Volume Down", style=discord.ButtonStyle.secondary)
    async def volume_down(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Lower the volume by 10 percentage points (not below 0%)."""
        current_volume = volumes.get(self.guild_id, default_volume)
        # round() removes binary float drift so repeated steps stay exact.
        new_volume = round(max(0.0, current_volume - 0.1), 2)
        self._apply_volume(new_volume)
        await interaction.response.send_message(f"🔈 **Volume decreased to {round(new_volume * 100)}%.**", ephemeral=True)

    @discord.ui.button(label="🔊 Volume Up", style=discord.ButtonStyle.primary)
    async def volume_up(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Raise the volume by 10 percentage points (not above 100%)."""
        current_volume = volumes.get(self.guild_id, default_volume)
        # Without rounding, 0.35 + 0.1 is 0.44999… and would display as 44%.
        new_volume = round(min(1.0, current_volume + 0.1), 2)
        self._apply_volume(new_volume)
        await interaction.response.send_message(f"🔊 **Volume increased to {round(new_volume * 100)}%.**", ephemeral=True)

    @discord.ui.button(label="🔇 Mute/Unmute", style=discord.ButtonStyle.danger)
    async def mute_toggle(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Mute, or restore the volume that was active before muting."""
        current_volume = volumes.get(self.guild_id, default_volume)
        if current_volume > 0:
            # Remember what to go back to on unmute.
            self._pre_mute_volumes[self.guild_id] = current_volume
            new_volume = 0.0
            button.label = "🔊 Unmute"
        else:
            new_volume = self._pre_mute_volumes.pop(self.guild_id, default_volume)
            button.label = "🔇 Mute"
        self._apply_volume(new_volume)
        status = "muted" if new_volume == 0.0 else "unmuted"
        await interaction.response.edit_message(content=f"🔇 **Volume {status}.**", view=self)
async def update_progress_bar(interaction: discord.Interaction, duration: int, start_time: float):
    """Keep the progress bar of the "Now Playing" embed up to date.

    Runs until the track it was started for ends, is replaced by another
    track, or the "Now Playing" message disappears. While playback is paused
    the bar is frozen and the task keeps waiting. When the track ends and no
    other track has taken over, the message is switched to "Queue Finished".

    Args:
        interaction: Interaction used to locate the guild and text channel.
        duration: Track length in seconds; 0 or None means unknown, in which
            case the bar stays at the start.
        start_time: ``time.time()`` value at which playback started.
    """
    guild_id = interaction.guild.id

    # The tuple stored for the track this task belongs to. play_next stores a
    # new tuple for every track, so an identity check detects a track change
    # even when the same song is queued twice.
    track = current_tracks.get(guild_id)
    if track is None:
        return

    total = duration or 0
    duration_str = time.strftime("%M:%S", time.gmtime(total))
    bar_width = 20  # 20 blocks for the bar

    elapsed = max(0.0, time.time() - start_time)
    last_tick = time.time()

    while True:
        voice_client = voice_clients.get(guild_id)
        if voice_client is None or current_tracks.get(guild_id) is not track:
            break
        paused = voice_client.is_paused()
        if not (voice_client.is_playing() or paused):
            break

        # Advance the clock only while audio is actually running so a pause
        # does not move the bar.
        now = time.time()
        if not paused:
            elapsed += now - last_tick
        last_tick = now

        # Calculate progress (guarding against an unknown/zero duration)
        progress = min(elapsed / total, 1.0) if total > 0 else 0.0
        filled = int(progress * bar_width)
        progress_bar = "▬" * filled + "🔘" + "▬" * (bar_width - filled)
        elapsed_time_str = time.strftime("%M:%S", time.gmtime(elapsed))
        embed = discord.Embed(
            title="🎵 Now Playing",
            description=f"**{track[1]}**\n\n{progress_bar} `{elapsed_time_str} / {duration_str}`",
            color=discord.Color.green()
        )

        # Update the original "Now Playing" message
        message_id = now_playing_messages.get(guild_id)
        if not message_id:
            break
        try:
            message = await interaction.channel.fetch_message(message_id)
            await message.edit(embed=embed)
        except (discord.NotFound, discord.Forbidden):
            break  # Stop if the message is deleted or can no longer be edited
        except discord.HTTPException:
            pass  # Transient API error: try again on the next tick

        await asyncio.sleep(2)  # Update every 2 seconds

    # A newer track took over: its own task owns the message now.
    if current_tracks.get(guild_id) is not track:
        return
    # Still playing or paused means the loop ended because the message is
    # gone, not because playback finished.
    voice_client = voice_clients.get(guild_id)
    if voice_client is not None and (voice_client.is_playing() or voice_client.is_paused()):
        return

    # When playback ends, clear the progress bar
    try:
        message_id = now_playing_messages.pop(guild_id, None)
        if message_id:
            message = await interaction.channel.fetch_message(message_id)
            embed = discord.Embed(
                title="🎵 Queue Finished",
                description="Amber is now idle. Add more songs to the queue to keep the music going!",
                color=discord.Color.red()
            )
            await message.edit(embed=embed, view=None)  # Remove playback controls
    except discord.NotFound:
        pass
# ---------- Recommendations ----------
def fetch_related_songs(song_title, artist):
    """Fetch related songs using the YouTube Data API v3 search endpoint.

    Args:
        song_title: Title of the song to base the search on.
        artist: Artist/uploader name added to the search query.

    Returns:
        list[str]: Up to five recommendations formatted as ``"Title - URL"``.
        An empty list is returned when no API key is configured, the request
        fails, or the response cannot be used.
    """
    import html  # local import: only needed to decode titles below

    if not YOUTUBE_API_KEY:
        print("Error fetching related songs: YOUTUBE_API_KEY is not set.")
        return []

    query = f"{song_title} {artist} related songs"
    url = "https://www.googleapis.com/youtube/v3/search"
    params = {
        "q": query,
        "key": YOUTUBE_API_KEY,
        "type": "video",
        "part": "snippet",
        "maxResults": 5
    }

    try:
        # Without a timeout a stalled connection would block the caller
        # indefinitely.
        response = requests.get(url, params=params, timeout=10)
    except requests.RequestException as e:
        print(f"Error fetching related songs: {e}")
        return []

    if response.status_code != 200:
        print(f"Error fetching related songs: {response.status_code} {response.text}")
        return []

    try:
        data = response.json()
    except ValueError as e:
        print(f"Error fetching related songs: invalid JSON response ({e})")
        return []

    recommendations = []
    for item in data.get("items", []):
        video_id = item.get("id", {}).get("videoId")
        title = item.get("snippet", {}).get("title")
        if not video_id or not title:
            continue  # Skip entries that cannot be turned into a link
        # Search results appear to deliver titles HTML-escaped (e.g. &amp;,
        # &#39;); unescape is a no-op for plain text.
        recommendations.append(f'{html.unescape(title)} - https://www.youtube.com/watch?v={video_id}')
    return recommendations
def generate_recommendations(user_id):
    """Build recommendations from the most recent song in a user's history.

    Returns the list produced by ``fetch_related_songs`` for that song, or a
    single-item list holding an explanatory message when the user has no
    history yet or nothing could be fetched.
    """
    history = load_user_data().get(str(user_id), {}).get("history", [])
    if not history:
        return ["No recommendations yet. Start playing some songs!"]

    # Only the latest entry drives the lookup.
    latest = history[-1]
    related = fetch_related_songs(latest["title"], latest["artist"])

    # An empty result falls back to the explanatory message.
    return related or ["No recommendations could be fetched at this time."]