import asyncio
import json
import os
import time
from typing import Any, Coroutine, Dict, List, Set

import discord
import requests
import yt_dlp
from dotenv import load_dotenv

load_dotenv()

# ---------- Shared State (every dict is keyed by guild id) ----------
voice_clients: Dict[int, discord.VoiceClient] = {}
# Queue entries are (stream_url, title, duration_seconds) tuples.
music_queues: Dict[int, List[Any]] = {}
# Currently playing track as a (stream_url, title) tuple.
current_tracks: Dict[int, Any] = {}
volumes: Dict[int, float] = {}
repeat_modes: Dict[int, str] = {}
# Id of the "Now Playing" message that gets edited in place.
now_playing_messages: Dict[int, int] = {}
# The single live progress-bar task per guild.
progress_tasks: Dict[int, asyncio.Task] = {}
_247_mode: Dict[int, bool] = {}

default_volume: float = 0.05
USER_DATA_FILE: str = 'user_data.json'
YOUTUBE_API_KEY: str | None = os.getenv("YOUTUBE_API_KEY")

# Extra per-track details (artist, thumbnail) keyed by stream URL. Kept in a
# side table so queue entries stay 3-tuples for any other code that unpacks
# them.
_track_metadata: Dict[str, Dict[str, Any]] = {}

# Strong references to fire-and-forget tasks; the event loop itself only
# holds weak references, so an unreferenced task may be garbage collected
# before it finishes.
_background_tasks: Set[asyncio.Task] = set()

_PROGRESS_BAR_WIDTH: int = 20


# ---------- Utility Functions ----------
def load_user_data() -> Dict[str, Any]:
    """Return the persisted user data, or an empty dict if no file exists.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    try:
        with open(USER_DATA_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def save_user_data(data: Dict[str, Any]) -> None:
    """Write *data* to ``USER_DATA_FILE`` as indented JSON."""
    with open(USER_DATA_FILE, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4)


def update_user_history(user_id: int, song_title: str, artist: str) -> None:
    """Append a played song to the listening history of *user_id*."""
    user_data = load_user_data()
    # setdefault on both levels also tolerates an existing user record that
    # has no "history" key yet.
    history = user_data.setdefault(str(user_id), {}).setdefault("history", [])
    history.append({"title": song_title, "artist": artist})
    save_user_data(user_data)


def _format_timestamp(seconds: float | None) -> str:
    """Format *seconds* as ``MM:SS``, or ``H:MM:SS`` from one hour upwards.

    ``None`` and negative values are rendered as ``00:00``.
    """
    total = max(int(seconds or 0), 0)
    minutes, secs = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        return f"{hours}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"


def _spawn_background(coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
    """Schedule *coro* as a task and keep it referenced until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def _search_first_result(query: str) -> Dict[str, Any]:
    """Return yt-dlp's info dict for the first search hit of *query*.

    This performs blocking network I/O and must not be called directly on
    the event loop thread.
    """
    ydl_opts = {
        'format': 'bestaudio/best',
        'quiet': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        return ydl.extract_info(f"ytsearch:{query}", download=False)['entries'][0]


# ---------- Playback Functions ----------
async def join_voice(interaction: discord.Interaction) -> bool:
    """Connect to (or move to) the invoking user's voice channel.

    Returns True when the bot ends up connected to that channel. On failure
    an error message is sent through the interaction follow-up and False is
    returned, so callers do not need to report the failure again.
    """
    guild_id = interaction.guild.id

    # `voice` is None when the user is not in any voice channel.
    voice_state = getattr(interaction.user, "voice", None)
    if voice_state is None or voice_state.channel is None:
        await interaction.followup.send(
            "❌ **Error:** You need to be in a voice channel first."
        )
        return False
    channel = voice_state.channel

    existing = voice_clients.get(guild_id)
    try:
        if existing is not None and existing.is_connected():
            if existing.channel != channel:
                # connect() raises while a voice client already exists for
                # the guild, so switch channels instead.
                await existing.move_to(channel)
            return True
        voice_clients[guild_id] = await channel.connect()
        return True
    except Exception as e:
        await interaction.followup.send(
            f"❌ **Error:** Could not join the voice channel. {str(e)}"
        )
        return False


async def play_audio(interaction: discord.Interaction, query: str) -> None:
    """Entry point for a play request: join voice, then queue *query*.

    The search and queueing run in a background task so this coroutine
    returns as soon as the bot is connected.
    """
    guild_id = interaction.guild.id
    if not interaction.response.is_done():
        await interaction.response.defer()

    voice_client = voice_clients.get(guild_id)
    if voice_client is None or not voice_client.is_connected():
        if not await join_voice(interaction):
            # join_voice has already told the user what went wrong.
            return

    _spawn_background(fetch_and_queue_song(interaction, query))


async def fetch_and_queue_song(interaction: discord.Interaction, query: str) -> None:
    """Search for *query*, append the first hit to the queue and start playback if idle."""
    guild_id = interaction.guild.id
    try:
        # extract_info blocks on network I/O; run it in a worker thread so
        # the event loop (heartbeats, other commands) is not stalled.
        info = await asyncio.to_thread(_search_first_result, query)
        song_url = info['url']
    except Exception as e:
        await interaction.followup.send(
            f"❌ **Error:** Could not find or play the requested audio. {str(e)}"
        )
        return

    title = info.get('title', 'Unknown Title')
    artist = info.get('uploader', 'Unknown Artist')
    # Live streams report no duration (None); normalise to 0.
    duration = info.get('duration') or 0

    video_id = info.get('id')
    thumbnail = (
        f"https://img.youtube.com/vi/{video_id}/0.jpg"
        if video_id
        else info.get('thumbnail')
    )

    update_user_history(interaction.user.id, title, artist)

    _track_metadata[song_url] = {'artist': artist, 'thumbnail': thumbnail}
    music_queues.setdefault(guild_id, []).append((song_url, title, duration))
    await interaction.followup.send(f"✅ **Added to Queue:** {title}")

    voice_client = voice_clients.get(guild_id)
    # A paused player reports is_playing() == False; starting the next track
    # then would pop it from the queue and fail with "Already playing audio".
    if (
        voice_client is not None
        and not voice_client.is_playing()
        and not voice_client.is_paused()
    ):
        await play_next(interaction)


async def _handle_empty_queue(interaction: discord.Interaction, guild_id: int) -> None:
    """Idle in 24/7 mode, otherwise disconnect, once the queue has run dry."""
    # NOTE(review): 24/7 mode defaults to enabled for guilds that never set it.
    if _247_mode.get(guild_id, True):
        message_id = now_playing_messages.get(guild_id)
        if message_id:
            try:
                message = await interaction.channel.fetch_message(message_id)
                embed = discord.Embed(
                    title="🎵 Queue Empty",
                    description="Amber is in 24/7 mode. Add more songs to the queue to resume playback.",
                    color=discord.Color.orange()
                )
                await message.edit(embed=embed, view=None)
            except discord.NotFound:
                # The message was deleted; there is nothing to update.
                pass
        return

    voice_client = voice_clients.get(guild_id)
    if voice_client is not None and voice_client.is_connected():
        await voice_client.disconnect()
        del voice_clients[guild_id]


async def _publish_now_playing(
    interaction: discord.Interaction,
    guild_id: int,
    embed: discord.Embed,
    view: Any,
) -> None:
    """Edit the guild's existing "Now Playing" message, or post a new one."""
    message_id = now_playing_messages.get(guild_id)
    if message_id:
        try:
            message = await interaction.channel.fetch_message(message_id)
            await message.edit(embed=embed, view=view)
            return
        except discord.NotFound:
            # The old message is gone; fall through and post a fresh one.
            pass
    message = await interaction.followup.send(embed=embed, view=view)
    now_playing_messages[guild_id] = message.id


async def play_next(interaction: discord.Interaction) -> None:
    """Start the next queued track for the interaction's guild.

    When the queue is empty the bot either idles (24/7 mode) or disconnects.
    Otherwise the track is popped, streamed through FFmpeg, the "Now
    Playing" message is refreshed and a progress-bar task is started.
    """
    guild_id = interaction.guild.id
    queue = music_queues.get(guild_id)
    if not queue:
        await _handle_empty_queue(interaction, guild_id)
        return

    song_url, title, duration = queue.pop(0)
    current_tracks[guild_id] = (song_url, title)

    # Tracks queued by code that does not record metadata fall back to
    # defaults.
    metadata = _track_metadata.pop(song_url, {})
    artist = metadata.get('artist') or 'Unknown Artist'
    thumbnail_url = metadata.get('thumbnail')
    if not thumbnail_url and '?v=' in song_url:
        thumbnail_url = f"https://img.youtube.com/vi/{song_url.split('?v=')[-1]}/0.jpg"

    # Stop the previous track's progress task; otherwise it would see the new
    # track playing and keep editing the message with stale timing.
    stale_task = progress_tasks.pop(guild_id, None)
    if stale_task is not None and not stale_task.done():
        stale_task.cancel()

    ffmpeg_options = {
        'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
        'options': '-vn',
    }

    try:
        voice_client = voice_clients[guild_id]
        source = discord.FFmpegPCMAudio(song_url, **ffmpeg_options)
        volume = volumes.get(guild_id, default_volume)
        source = discord.PCMVolumeTransformer(source, volume=volume)

        # `after` is invoked from the audio player thread, so the follow-up
        # coroutine has to be handed back to the bot's event loop.
        voice_client.play(
            source,
            after=lambda e: asyncio.run_coroutine_threadsafe(
                play_next(interaction), interaction.client.loop
            ),
        )

        embed = discord.Embed(
            title="🎵 Now Playing",
            description=f"**{title}**\n\n`00:00 / {_format_timestamp(duration)}`",
            color=discord.Color.green()
        )
        if thumbnail_url:
            embed.set_thumbnail(url=thumbnail_url)
        embed.add_field(name="Artist", value=artist, inline=True)
        embed.set_footer(text="React with 👍 or 👎 to rate this song!")

        # PlaybackControls is defined elsewhere in this module.
        view = PlaybackControls(interaction, guild_id)
        await _publish_now_playing(interaction, guild_id, embed, view)

        start_time = time.time()
        progress_tasks[guild_id] = asyncio.create_task(
            update_progress_bar(interaction, duration, start_time)
        )
    except Exception as e:
        await interaction.followup.send(
            f"❌ **Error:** Could not play the next song. {str(e)}"
        )


async def update_progress_bar(interaction: discord.Interaction, duration: int, start_time: float) -> None:
    """Refresh the "Now Playing" message once per second while audio plays.

    Args:
        interaction: Interaction whose guild and channel are being updated.
        duration: Track length in seconds; 0 or None when unknown.
        start_time: ``time.time()`` value captured when playback started.
    """
    guild_id = interaction.guild.id
    total_seconds = duration or 0
    duration_str = _format_timestamp(total_seconds)
    try:
        while guild_id in voice_clients and voice_clients[guild_id].is_playing():
            track = current_tracks.get(guild_id)
            if track is None:
                break

            elapsed_time = time.time() - start_time
            # An unknown duration would otherwise divide by zero.
            progress = min(elapsed_time / total_seconds, 1.0) if total_seconds > 0 else 0.0
            progress_blocks = int(progress * _PROGRESS_BAR_WIDTH)
            progress_bar = (
                "▬" * progress_blocks
                + "🔘"
                + "▬" * (_PROGRESS_BAR_WIDTH - progress_blocks)
            )
            elapsed_time_str = _format_timestamp(elapsed_time)

            embed = discord.Embed(
                title="🎵 Now Playing",
                description=f"**{track[1]}**\n\n{progress_bar} `{elapsed_time_str} / {duration_str}`",
                color=discord.Color.green()
            )

            message_id = now_playing_messages.get(guild_id)
            if message_id:
                try:
                    message = await interaction.channel.fetch_message(message_id)
                    await message.edit(embed=embed)
                except discord.NotFound:
                    break

            await asyncio.sleep(1)
    except asyncio.CancelledError:
        # Cancelled when the next track starts; just stop updating.
        pass
    finally:
        # Drop the registry entry only if it still refers to this task.
        if progress_tasks.get(guild_id) is asyncio.current_task():
            progress_tasks.pop(guild_id, None)