"""Discord client for Nessa: slash commands, consent gate and reminders."""

import asyncio
import os
from datetime import datetime

import aiosqlite
import discord
from discord import app_commands
from dotenv import load_dotenv

from .commands import NessaTracker
from .consent import ConsentView, check_user_consent, store_user_consent
from .database import DATABASE
from .logger import logger

load_dotenv()

# Both values are read from the environment (after loading .env) at import
# time; int() raises if either variable is unset.
GUILD_ID = int(os.getenv("DISCORD_GUILD_ID"))
OWNER_ID = int(os.getenv("AUTHORIZED_USER_ID"))


class Nessa(discord.Client):
    """Discord client that owns the command tree and the reminder worker."""

    def __init__(self):
        super().__init__(intents=discord.Intents.default())
        self.tree = app_commands.CommandTree(self)
        # Handle of the background reminder task (an asyncio.Task once
        # started). Keeping the reference lets on_ready avoid spawning a
        # second worker and prevents the task from being garbage-collected.
        self._reminder_task = None

    async def setup_hook(self):
        """Register every application command, then sync them to GUILD_ID.

        The sync happens last so that the commands defined in this method
        (shutdown, report_issue, sync_commands) are part of what is synced.
        """
        home_guild = discord.Object(id=GUILD_ID)
        self.tree.add_command(NessaTracker())

        def is_owner_or_admin(interaction: discord.Interaction) -> bool:
            """Check if the user is the bot owner or a guild administrator."""
            if interaction.user.id == OWNER_ID:
                return True
            # Outside a guild the user object has no guild_permissions.
            permissions = getattr(interaction.user, "guild_permissions", None)
            return permissions is not None and permissions.administrator

        @self.tree.command(
            name="shutdown",
            description="Shut down Nessa",
            guild=home_guild,
        )
        async def shutdown(interaction: discord.Interaction):
            """Close the client when invoked by the owner; refuse otherwise."""
            if interaction.user.id == OWNER_ID:
                await interaction.response.send_message(
                    "Night, Night. Shutting down..."
                )
                await self.close()
            else:
                await interaction.response.send_message(
                    "Hey! I don't know you! You don't have permission to use "
                    "this command. Only the owner can do that.",
                    ephemeral=True,
                )

        @self.tree.command(
            name="report_issue", description="Send an anonymous issue report."
        )
        async def report_issue(interaction: discord.Interaction, message: str):
            """Forward *message* (at most 1000 characters) to the owner by DM."""
            if len(message) > 1000:
                await interaction.response.send_message(
                    "Wowie, that's a long message there. Chief asked for "
                    "messages to be no longer than 1000 characters. Please "
                    "shorten it and try again.",
                    ephemeral=True,
                )
                return
            owner = await self.fetch_user(OWNER_ID)
            await owner.send(f"Hey Chief, I got a report for you: {message}")
            await interaction.response.send_message(
                "I've passed on your issue to the Chief!", ephemeral=True
            )

        @self.tree.command(
            name="sync_commands", description="Force Sync to chosen guild."
        )
        @app_commands.check(is_owner_or_admin)
        async def sync_commands(interaction: discord.Interaction, guild_id: str):
            """Copy the global commands to the given guild and sync it."""
            # Only the conversion can raise ValueError; keep the try narrow.
            try:
                guild_id_int = int(guild_id)
            except ValueError:
                await interaction.response.send_message(
                    "Invalid guild ID.", ephemeral=True
                )
                return

            guild = self.get_guild(guild_id_int)
            if guild is None:
                await interaction.response.send_message(
                    "Guild not found.", ephemeral=True
                )
                return

            # Copy first, then sync: syncing before the copy would push the
            # guild's command list without the global commands.
            target = discord.Object(id=guild_id_int)
            self.tree.copy_global_to(guild=target)
            await self.tree.sync(guild=target)
            await interaction.response.send_message(
                f"Commands synced successfully for guild {guild.name}.",
                ephemeral=True,
            )

        # Everything is registered now; publish it to the home guild once.
        self.tree.copy_global_to(guild=home_guild)
        await self.tree.sync(guild=home_guild)

    async def on_ready(self):
        """Log the login and start the reminder worker if it is not running.

        This event can fire more than once per process (e.g. after a
        reconnect), so the worker is only started when no live task exists.
        Command syncing is done once in setup_hook and not repeated here.
        """
        logger.info(f"Logged on as {self.user}!")
        if self._reminder_task is None or self._reminder_task.done():
            logger.info("Starting reminder worker...")
            self._reminder_task = self.loop.create_task(self.reminder_worker())

    async def on_disconnect(self):
        """Log that the gateway connection was lost."""
        # Single message string: a second positional argument would be
        # treated as a %-format argument by the logger.
        logger.info(
            f"{self.user} has disconnected. Attempting to reconnect..."
        )

    async def on_resumed(self):
        """Log that the gateway session was resumed."""
        logger.info(f"{self.user} has resumed the connection.")

    async def on_error(self, event, *args, **kwargs):
        """Log the failing event with its traceback, then close the client."""
        logger.exception(f"An error occurred: {event}")
        await self.close()

    async def on_connect(self):
        """Log that the client connected."""
        logger.info(f"{self.user} has connected.")

    async def reminder_worker(self):
        """Check for due reminders once a minute, forever.

        A failure in one pass is logged and the loop continues, so a single
        database or network error does not stop all future reminders.
        """
        while True:
            await asyncio.sleep(60)  # check every minute
            try:
                await self._send_due_reminders()
            except Exception:
                logger.exception("Reminder check failed; retrying next cycle")

    async def _send_due_reminders(self):
        """Send every due, unsent reminder and mark it as sent."""
        now = datetime.now()
        async with aiosqlite.connect(DATABASE) as db:
            cursor = await db.execute(
                "SELECT id, description, reminder_time, "
                "notification_channel_id FROM tasks WHERE reminder_time "
                "<= ? AND reminder_sent = FALSE",
                (now,),
            )
            due_tasks = await cursor.fetchall()
            for task_id, description, _reminder_time, channel_id in due_tasks:
                if not channel_id:
                    logger.warning(
                        f"No channel ID provided for task ID {task_id}"
                    )
                    continue
                channel = self.get_channel(int(channel_id))
                if not channel:
                    logger.warning(f"Couldn't Find Channel {channel_id}")
                    continue
                try:
                    await channel.send(f"Reminder for task: {description}")
                except discord.HTTPException:
                    # Leave reminder_sent untouched so it is retried later.
                    logger.exception(
                        f"Failed to send reminder for task ID {task_id}"
                    )
                    continue
                await db.execute(
                    "UPDATE tasks SET reminder_sent = TRUE WHERE id = ?",
                    (task_id,),
                )
                # Commit per reminder so a later failure in this pass cannot
                # roll back the flag and cause a duplicate message.
                await db.commit()

    async def on_interaction(self, interaction: discord.Interaction):
        """Ask users who have not consented to do so before using commands.

        Only application-command interactions are handled. Users without
        stored consent are shown a ConsentView unless they invoked "opt-out";
        users with stored consent who invoke "opt-in" are told they already
        opted in.

        NOTE(review): this sends a response on an interaction that the
        command's own callback presumably also responds to - confirm the two
        do not collide.
        """
        if interaction.type == discord.InteractionType.application_command:
            # interaction.command may be None, so read the name defensively.
            command = interaction.command
            command_name = command.name if command is not None else None

            consented = await check_user_consent(interaction.user.id)
            if not consented:
                if command_name == "opt-out":
                    return
                view = ConsentView()
                await interaction.response.send_message(
                    "By using the services I, Nessa, provide, you consent to "
                    "the storage of your data necessary for functionality. "
                    "Please confirm your consent. "
                    "See /nessa consent privacy-policy for more details.",
                    view=view,
                    ephemeral=True,
                )
                await view.wait()
                if view.value:
                    await store_user_consent(interaction.user.id)
                else:
                    await interaction.followup.send(
                        "Whoops! You have to give me your okay to do store "
                        "your data before you can use my services!",
                        ephemeral=True,
                    )
                    return
            if command_name == "opt-in":
                if consented:
                    await interaction.response.send_message(
                        "Hey! Thanks, but you've already opted in.",
                        ephemeral=True
                    )
                    return