123 lines
6.5 KiB
Python
123 lines
6.5 KiB
Python
import discord
|
|
from discord import app_commands
|
|
import aiosqlite
|
|
import asyncio
|
|
from datetime import datetime
|
|
from .commands import NessaTracker
|
|
from .consent import ConsentView, check_user_consent, store_user_consent
|
|
from .database import DATABASE
|
|
from dotenv import load_dotenv
|
|
import os
|
|
|
|
load_dotenv()
|
|
GUILD_ID = int(os.getenv("DISCORD_GUILD_ID"))
|
|
OWNER_ID = int(os.getenv("AUTHORIZED_USER_ID"))
|
|
|
|
class Nessa(discord.Client):
|
|
def __init__(self):
|
|
super().__init__(intents=discord.Intents.default())
|
|
self.tree = app_commands.CommandTree(self)
|
|
|
|
async def setup_hook(self):
|
|
self.tree.add_command(NessaTracker())
|
|
self.tree.copy_global_to(guild=discord.Object(id=GUILD_ID))
|
|
await self.tree.sync(guild=discord.Object(id=GUILD_ID))
|
|
|
|
def is_owner_or_admin(interaction: discord.Interaction):
|
|
"""Check if the user is the bot owner or an administrator in the guild."""
|
|
return interaction.user.id == OWNER_ID or interaction.user.guild_permissions.administrator
|
|
|
|
@self.tree.command(name="shutdown", description="Shut down Nessa", guild=discord.Object(id=GUILD_ID))
|
|
async def shutdown(interaction: discord.Interaction):
|
|
if interaction.user.id == OWNER_ID:
|
|
await interaction.response.send_message("Night, Night. Shutting down...")
|
|
await self.close()
|
|
else:
|
|
await interaction.response.send_message("Hey! I don't know you! You don't have permission to use this command. Only the owner can do that.", ephemeral=True)
|
|
|
|
@self.tree.command(name="report_issue", description="Send an anonymous issue report.")
|
|
async def report_issue(interaction: discord.Interaction, message: str):
|
|
# Check if the message exceeds 1000 characters
|
|
if len(message) > 1000:
|
|
await interaction.response.send_message("Wowie, that's a long message there. Chief asked for messages to be no longer than 1000 characters. Please shorten it and try again.", ephemeral=True)
|
|
return
|
|
|
|
# Your Discord user ID
|
|
your_user_id = OWNER_ID # Make sure OWNER_ID is defined somewhere in your code
|
|
# Fetch your user object using your ID
|
|
owner = await self.fetch_user(your_user_id)
|
|
# Send the DM to you
|
|
await owner.send(f"Hey Chief, I got a report for you: {message}")
|
|
# Respond to the user to confirm the message has been sent
|
|
await interaction.response.send_message("I've passed on your issue to the Chief!", ephemeral=True)
|
|
|
|
@self.tree.command(name="sync_commands", description="Force sync commands in a specific guild.")
|
|
@app_commands.check(is_owner_or_admin) # or @discord.app_commands.checks.is_owner()
|
|
async def sync_commands(interaction: discord.Interaction, guild_id: int):
|
|
if interaction.user.id == OWNER_ID:
|
|
guild = self.get_guild(guild_id)
|
|
if guild is None:
|
|
await interaction.response.send_message("Guild not found.", ephemeral=True)
|
|
return
|
|
await self.tree.sync(guild=discord.Object(id=guild))
|
|
self.tree.copy_global_to(guild=discord.Object(id=guild))
|
|
await interaction.response.send_message(f"Commands synced successfully for guild {guild.name}.", ephemeral=True)
|
|
|
|
async def on_ready(self):
|
|
print(f"Logged on as {self.user}!")
|
|
self.tree.copy_global_to(guild=discord.Object(id=GUILD_ID))
|
|
await self.tree.sync(guild=discord.Object(id=GUILD_ID))
|
|
self.loop.create_task(self.reminder_worker())
|
|
print("Starting reminder worker...")
|
|
|
|
async def reminder_worker(self):
|
|
while True:
|
|
await asyncio.sleep(60) # check every minute
|
|
now = datetime.now()
|
|
async with aiosqlite.connect(DATABASE) as db:
|
|
cursor = await db.execute(
|
|
"SELECT id, description, reminder_time, notification_channel_id FROM tasks WHERE reminder_time <= ? AND reminder_sent = FALSE",
|
|
(now,)
|
|
)
|
|
tasks = await cursor.fetchall()
|
|
for task_id, description, reminder_time, channel_id in tasks:
|
|
if channel_id:
|
|
channel = client.get_channel(int(channel_id))
|
|
if channel:
|
|
await channel.send(f"Reminder for task: {description}")
|
|
await db.execute("UPDATE tasks SET reminder_sent = TRUE WHERE id = ?", (task_id,))
|
|
else:
|
|
print(f"Failed to find channel with ID {channel_id}")
|
|
else:
|
|
print(f"No channel ID provided for task ID {task_id}")
|
|
await db.commit()
|
|
|
|
async def on_interaction(self, interaction: discord.Interaction):
|
|
if interaction.type == discord.InteractionType.application_command:
|
|
# First, check if the user has consented to data storage.
|
|
consented = await check_user_consent(interaction.user.id)
|
|
|
|
if not consented:
|
|
# If there is no consent, show the consent dialog,
|
|
# unless the command is to opt-out, which should be accessible without prior consent.
|
|
if interaction.command.name == 'opt-out':
|
|
# Allow users to opt-out directly if they mistakenly initiated any command.
|
|
return
|
|
view = ConsentView()
|
|
await interaction.response.send_message(
|
|
"By using the services I, Nessa, provide, you consent to the storage of your data necessary for functionality. Please confirm your consent. See /nessa consent privacy-policy for more details.",
|
|
view=view,
|
|
ephemeral=True
|
|
)
|
|
await view.wait()
|
|
if view.value:
|
|
await store_user_consent(interaction.user.id)
|
|
else:
|
|
await interaction.followup.send("Whoops! You have to give me your okay to do store your data before you can use my services!", ephemeral=True)
|
|
return # Stop processing if they do not consent
|
|
|
|
# For opt-in command, check if they're trying to opt-in after opting out.
|
|
if interaction.command.name == 'opt-in':
|
|
if consented:
|
|
await interaction.response.send_message("Hey! Thanks, but you've already opted in.", ephemeral=True)
|
|
return |