Compare commits

..

No commits in common. "dev-moments" and "main" have entirely different histories.

4 changed files with 6 additions and 663 deletions

1
.gitignore vendored
View File

@ -172,4 +172,3 @@ cython_debug/
# PyPI configuration file # PyPI configuration file
.pypirc .pypirc
/data

View File

@ -1,426 +0,0 @@
import discord
from discord import app_commands
from typing import Optional, List
from datetime import datetime, timedelta
import sqlite3
import logging
from utils.database import Database
from discord.app_commands import MissingPermissions
db = Database()
class IncidentModal(discord.ui.Modal):
def __init__(self):
super().__init__(title="Log Incident")
self.reason = discord.ui.TextInput(
label="Reason for logging",
style=discord.TextStyle.long,
required=True
)
self.message_count = discord.ui.TextInput(
label="Recent messages to capture (1-50)",
placeholder="Leave blank to use timeframe",
default="",
required=False
)
self.start_time = discord.ui.TextInput(
label="Start time (YYYY-MM-DD HH:MM)",
placeholder="Optional - Example: 2024-05-10 14:30",
required=False
)
self.end_time = discord.ui.TextInput(
label="End time (YYYY-MM-DD HH:MM)",
placeholder="Optional - Example: 2024-05-10 15:00",
required=False
)
self.add_item(self.reason)
self.add_item(self.message_count)
self.add_item(self.start_time)
self.add_item(self.end_time)
async def on_submit(self, interaction: discord.Interaction):
try:
messages = []
capture_mode = "count"
capture_param = ""
start_time = None
end_time = None
# Determine capture mode
if self.start_time.value or self.end_time.value:
if not all([self.start_time.value, self.end_time.value]):
raise ValueError("Both start and end times required for timeframe")
start_time = datetime.strptime(self.start_time.value, "%Y-%m-%d %H:%M")
end_time = datetime.strptime(self.end_time.value, "%Y-%m-%d %H:%M")
if start_time >= end_time:
raise ValueError("End time must be after start time")
if (end_time - start_time).total_seconds() > 86400:
raise ValueError("Maximum timeframe duration is 24 hours")
async for msg in interaction.channel.history(
limit=None,
after=start_time,
before=end_time
):
messages.append(msg)
messages = messages[::-1] # Oldest first
capture_mode = "timeframe"
capture_param = f"{start_time.strftime('%Y-%m-%d %H:%M')} to {end_time.strftime('%Y-%m-%d %H:%M')}"
else:
if not self.message_count.value:
raise ValueError("Please provide either message count or timeframe")
count = int(self.message_count.value)
if not 1 <= count <= 50:
raise ValueError("Message count must be between 1-50")
messages = [
msg async for msg in interaction.channel.history(limit=count)
][::-1]
capture_mode = "count"
capture_param = str(count)
# Format messages
formatted_messages = [{
"id": msg.id,
"author_id": msg.author.id,
"content": msg.content,
"timestamp": msg.created_at
} for msg in messages]
# Generate unique ID
incident_id = f"incident_{uuid.uuid4().hex[:8]}"
success = db.add_incident(
incident_id=incident_id,
reason=self.reason.value,
moderator_id=interaction.user.id,
messages=formatted_messages,
capture_mode=capture_mode,
capture_param=capture_param,
start_time=start_time,
end_time=end_time
)
if not success:
raise Exception("Database storage failed - check server logs")
embed = discord.Embed(
title="✅ Incident Logged",
description=f"**ID:** `{incident_id}`\n**Mode:** {capture_mode.title()}",
color=0x00ff00
)
embed.add_field(name="Reason", value=self.reason.value[:500], inline=False)
if messages:
preview = f"{messages[0].content[:100]}..." if len(messages[0].content) > 100 else messages[0].content
embed.add_field(name="First Message", value=preview, inline=False)
await interaction.response.send_message(embed=embed, ephemeral=True)
except ValueError as e:
await interaction.response.send_message(
f"❌ Validation Error: {str(e)}",
ephemeral=True
)
except Exception as e:
logging.error(f"Incident submission error: {str(e)}", exc_info=True)
await interaction.response.send_message(
"⚠️ Failed to log incident - please check input format",
ephemeral=True
)
class FollowupModal(discord.ui.Modal):
def __init__(self, incident_id: str):
super().__init__(title=f"Follow-up: {incident_id}")
self.incident_id = incident_id
self.notes = discord.ui.TextInput(
label="Additional notes/actions",
style=discord.TextStyle.long,
required=True
)
self.add_item(self.notes)
async def on_submit(self, interaction: discord.Interaction):
try:
success = db.add_followup(
incident_id=self.incident_id,
moderator_id=interaction.user.id,
notes=self.notes.value
)
if success:
await interaction.response.send_message(
f"✅ Follow-up added to **{self.incident_id}**",
ephemeral=True
)
else:
await interaction.response.send_message(
"❌ Failed to save follow-up",
ephemeral=True
)
except Exception as e:
logging.error(f"Followup error: {str(e)}")
await interaction.response.send_message(
"⚠️ Failed to add follow-up",
ephemeral=True
)
async def setup(client):
# Context menu command
@client.tree.context_menu(name="Mark as Funny Moment")
async def mark_funny(interaction: discord.Interaction, message: discord.Message):
try:
await message.add_reaction("😂")
await message.add_reaction("🎉")
message_link = f"https://discord.com/channels/{interaction.guild_id}/{message.channel.id}/{message.id}"
record_id = db.add_funny_moment(
message_link=message_link,
author_id=message.author.id
)
embed = discord.Embed(
title="😂 Funny Moment Saved",
description=f"[Jump to Message]({message_link})",
color=0x00ff00
).set_author(
name=message.author.display_name,
icon_url=message.author.avatar.url
).add_field(
name="Message Preview",
value=message.content[:100] + "..." if len(message.content) > 100 else message.content,
inline=False
)
await interaction.response.send_message(
embed=embed,
ephemeral=True
)
except Exception as e:
logging.error(f"Context menu error: {e}")
await interaction.response.send_message(
"❌ Couldn't mark this message. Is it too old?",
ephemeral=True
)
# Command group
moments_group = app_commands.Group(
name="moments",
description="Manage memorable moments"
)
# Incident command
@moments_group.command(
name="incident",
description="Log an incident with recent messages"
)
@app_commands.checks.has_permissions(manage_messages=True)
async def incident_log(interaction: discord.Interaction):
await interaction.response.send_modal(IncidentModal())
# Review command
@moments_group.command(
name="review",
description="Review a logged incident"
)
@app_commands.describe(incident_id="The incident ID to review")
@app_commands.checks.has_permissions(manage_messages=True)
async def review_incident(interaction: discord.Interaction, incident_id: str):
try:
incident = db.get_incident(incident_id)
if not incident:
await interaction.response.send_message("❌ Incident not found", ephemeral=True)
return
# Format messages with proper timestamps
messages = "\n\n".join(
f"**<t:{int(msg['timestamp'].timestamp())}:F>** <@{msg['author_id']}>:\n"
f"{msg['content']}"
for msg in incident['messages']
)
# Get follow-ups with proper timestamps
followups = db.get_followups(incident_id)
followup_text = "\n\n".join(
f"**<t:{int(f['timestamp'].timestamp())}:f>** <@{f['moderator_id']}>:\n{f['notes'][:200]}"
for f in followups
) if followups else "No follow-up reports yet"
# Create embed
moderator = await interaction.guild.fetch_member(incident['details']['moderator_id'])
embed = discord.Embed(
title=f"Incident {incident_id}",
description=(
f"**Reason:** {incident['details']['reason']}\n"
f"**Logged by:** {moderator.mention}\n"
f"**When:** <t:{int(incident['details']['timestamp'].timestamp())}:F>"
),
color=0xff0000
)
embed.add_field(
name="Messages",
value=messages[:1020] + "..." if len(messages) > 1024 else messages,
inline=False
)
embed.add_field(
name=f"Follow-ups ({len(followups)})",
value=followup_text[:1020] + "..." if len(followup_text) > 1024 else followup_text,
inline=False
)
await interaction.response.send_message(embed=embed, ephemeral=True)
except Exception as e:
logging.error(f"Incident review error: {str(e)}", exc_info=True)
await interaction.response.send_message("❌ Failed to retrieve incident", ephemeral=True)
# Followup commands
@moments_group.command(
name="followup",
description="Add follow-up to an incident"
)
@app_commands.describe(
incident_id="The incident ID to follow up on",
notes="Quick note (optional)"
)
@app_commands.checks.has_permissions(manage_messages=True)
async def add_followup(
interaction: discord.Interaction,
incident_id: str,
notes: Optional[str] = None
):
try:
if not db.get_incident(incident_id):
await interaction.response.send_message(
"❌ Incident not found",
ephemeral=True
)
return
if notes:
success = db.add_followup(
incident_id=incident_id,
moderator_id=interaction.user.id,
notes=notes
)
if success:
await interaction.response.send_message(
f"✅ Added quick follow-up to **{incident_id}**",
ephemeral=True
)
else:
await interaction.response.send_message(
"❌ Failed to add follow-up",
ephemeral=True
)
else:
await interaction.response.send_modal(FollowupModal(incident_id))
except Exception as e:
logging.error(f"Followup error: {e}")
await interaction.response.send_message(
"⚠️ Failed to add follow-up",
ephemeral=True
)
# Autocomplete
@review_incident.autocomplete("incident_id")
@add_followup.autocomplete("incident_id")
async def incident_autocomplete(
interaction: discord.Interaction,
current: str
) -> List[app_commands.Choice[str]]:
incidents = db.get_recent_incidents(25)
choices = []
for inc in incidents:
try:
mod = await interaction.guild.fetch_member(inc['moderator_id'])
name = f"{inc['id']} (by {mod.display_name})"
except:
name = inc['id']
if current.lower() in name.lower():
choices.append(app_commands.Choice(name=name, value=inc['id']))
return choices[:25]
# Global error handler for commands
@client.tree.error
async def on_command_error(interaction: discord.Interaction, error):
"""Handle unauthorized access attempts"""
if isinstance(error, MissingPermissions):
# Log unauthorized access
db.log_unauthorized_access(
user_id=interaction.user.id,
command_used=interaction.command.name if interaction.command else "unknown",
details=f"Attempted params: {interaction.data}"
)
await interaction.response.send_message(
"⛔ You don't have permission to use this command.",
ephemeral=True
)
else:
logging.error(f"Command error: {error}", exc_info=True)
await interaction.response.send_message(
"⚠️ An unexpected error occurred. This has been logged.",
ephemeral=True
)
@moments_group.command(
name="audit",
description="View unauthorized access attempts (Admin only)"
)
@app_commands.describe(days="Lookback period in days (max 30)")
@app_commands.checks.has_permissions(administrator=True)
async def view_audit_log(interaction: discord.Interaction, days: int = 7):
try:
if days > 30 or days < 1:
raise ValueError("Lookback period must be 1-30 days")
cutoff = datetime.now() - timedelta(days=days)
with db._get_connection() as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM unauthorized_access
WHERE timestamp > ?
ORDER BY timestamp DESC
""", (cutoff.isoformat(),)) # Store and compare ISO format
results = [dict(row) for row in cursor.fetchall()]
if not results:
await interaction.response.send_message("✅ No unauthorized access attempts found", ephemeral=True)
return
log_text = "\n".join(
f"<t:{int(datetime.fromisoformat(row['timestamp']).timestamp())}:f> | "
f"<@{row['user_id']}> tried `{row['command_used']}`"
for row in results
)
embed = discord.Embed(
title=f"Unauthorized Access Logs ({days} days)",
description=log_text[:4000],
color=0xff0000
)
await interaction.response.send_message(embed=embed, ephemeral=True)
except Exception as e:
await interaction.response.send_message(f"❌ Error: {str(e)}", ephemeral=True)
client.tree.add_command(moments_group)

30
main.py
View File

@ -6,7 +6,6 @@ from dotenv import load_dotenv
from pydantic_settings import BaseSettings from pydantic_settings import BaseSettings
# Configuration
class Settings(BaseSettings): class Settings(BaseSettings):
DISCORD_TOKEN: str DISCORD_TOKEN: str
DISCORD_GUILD: int DISCORD_GUILD: int
@ -18,8 +17,7 @@ class Settings(BaseSettings):
config = Settings() config = Settings()
# Initialize logging
# Logging setup
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s", format="%(asctime)s | %(levelname)s | %(message)s",
@ -31,41 +29,25 @@ class EmeraldClient(discord.Client):
def __init__(self): def __init__(self):
intents = discord.Intents.default() intents = discord.Intents.default()
intents.message_content = True intents.message_content = True
intents.messages = True
intents.reactions = True
super().__init__(intents=intents) super().__init__(intents=intents)
self.tree = app_commands.CommandTree(self) self.tree = app_commands.CommandTree(self)
async def setup_hook(self): async def setup_hook(self):
# Load commands """Sync commands with test guild"""
from commands.moments import setup as moments_setup
await moments_setup(self)
# Sync commands
guild = discord.Object(id=config.DISCORD_GUILD) guild = discord.Object(id=config.DISCORD_GUILD)
self.tree.copy_global_to(guild=guild) self.tree.copy_global_to(guild=guild)
await self.tree.sync(guild=guild) await self.tree.sync(guild=guild)
logging.info("Commands synced") logging.info("Commands synced to guild")
async def on_ready(self): async def on_ready(self):
logging.info(f"Logged in as {self.user}") logging.info(f"Logged in as {self.user} (ID: {self.user.id})")
await self.change_presence(activity=discord.Activity( await self.change_presence(activity=discord.Activity(
type=discord.ActivityType.listening, type=discord.ActivityType.listening,
name="/moments" name="your requests"
)) ))
if __name__ == "__main__": if __name__ == "__main__":
load_dotenv() load_dotenv()
client = EmeraldClient() client = EmeraldClient()
client.run(config.DISCORD_TOKEN)
# Global error handler
@client.tree.error
async def on_error(interaction: discord.Interaction, error):
logging.error(f"Error: {error}")
await interaction.response.send_message(
"⚠️ Something went wrong. Please try again.",
ephemeral=True
)
client.run(config.DISCORD_TOKEN)

View File

@ -1,212 +0,0 @@
import sqlite3
import logging
from datetime import datetime
from typing import List, Dict, Optional
class Database:
def __init__(self, db_path: str = "data/moments.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""Initialize database tables"""
with self._get_connection() as conn:
# Drop tables if they exist (for development)
conn.execute("DROP TABLE IF EXISTS incidents")
conn.execute("DROP TABLE IF EXISTS incident_messages")
conn.execute("""
CREATE TABLE IF NOT EXISTS unauthorized_access (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
command_used TEXT NOT NULL,
timestamp DATETIME NOT NULL,
details TEXT
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS funny_moments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_link TEXT NOT NULL,
description TEXT,
author_id INTEGER NOT NULL,
timestamp DATETIME NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS incidents (
id TEXT PRIMARY KEY,
reason TEXT NOT NULL,
moderator_id INTEGER NOT NULL,
timestamp DATETIME NOT NULL,
capture_mode TEXT NOT NULL,
capture_param TEXT,
start_time DATETIME,
end_time DATETIME
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS incident_messages (
incident_id TEXT,
message_id INTEGER,
author_id INTEGER,
content TEXT,
timestamp DATETIME,
PRIMARY KEY (incident_id, message_id),
FOREIGN KEY (incident_id) REFERENCES incidents(id)
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS incident_followups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
incident_id TEXT NOT NULL,
moderator_id INTEGER NOT NULL,
notes TEXT NOT NULL,
timestamp DATETIME NOT NULL,
FOREIGN KEY (incident_id) REFERENCES incidents(id)
)
""")
conn.commit()
def _get_connection(self):
return sqlite3.connect(self.db_path)
def add_incident(self, incident_id: str, reason: str, moderator_id: int,
messages: List[Dict], capture_mode: str, capture_param: str,
start_time: datetime = None, end_time: datetime = None) -> bool:
"""Store an incident with related messages"""
try:
with self._get_connection() as conn:
# Add incident record
conn.execute("""
INSERT INTO incidents
(id, reason, moderator_id, timestamp, capture_mode, capture_param, start_time, end_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
incident_id,
reason,
moderator_id,
datetime.now(),
capture_mode,
capture_param,
start_time,
end_time
))
# Add incident messages
for msg in messages:
conn.execute("""
INSERT INTO incident_messages
(incident_id, message_id, author_id, content, timestamp)
VALUES (?, ?, ?, ?, ?)
""", (
incident_id,
msg['id'],
msg['author_id'],
msg['content'],
msg['timestamp']
))
conn.commit()
return True
except Exception as e:
logging.error(f"Failed to save incident: {str(e)}")
return False
def add_funny_moment(self, message_link: str, author_id: int, description: str = None) -> int:
"""Store a funny moment in database"""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO funny_moments
(message_link, description, author_id, timestamp)
VALUES (?, ?, ?, ?)
""", (message_link, description, author_id, datetime.now()))
conn.commit()
return cursor.lastrowid
def get_incident(self, incident_id: str) -> Optional[Dict]:
"""Retrieve an incident with its messages"""
with self._get_connection() as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Get incident details
cursor.execute("SELECT * FROM incidents WHERE id = ?", (incident_id,))
incident = cursor.fetchone()
if not incident:
return None
# Convert timestamp string to datetime object
incident_details = dict(incident)
incident_details['timestamp'] = datetime.fromisoformat(incident_details['timestamp'])
# Get related messages
cursor.execute("SELECT * FROM incident_messages WHERE incident_id = ?", (incident_id,))
messages = [
{**dict(msg), 'timestamp': datetime.fromisoformat(msg['timestamp'])}
for msg in cursor.fetchall()
]
return {
"details": incident_details,
"messages": messages
}
def get_recent_incidents(self, limit: int = 25):
"""Get all recent incidents (not limited to current moderator)"""
with self._get_connection() as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT id, moderator_id FROM incidents
ORDER BY timestamp DESC
LIMIT ?
""", (limit,))
return [dict(row) for row in cursor.fetchall()]
def add_followup(self, incident_id: str, moderator_id: int, notes: str) -> bool:
"""Add a follow-up report to an incident"""
try:
with self._get_connection() as conn:
conn.execute("""
INSERT INTO incident_followups
(incident_id, moderator_id, notes, timestamp)
VALUES (?, ?, ?, ?)
""", (incident_id, moderator_id, notes, datetime.now()))
conn.commit()
return True
except Exception as e:
logging.error(f"Failed to add followup: {str(e)}")
return False
def get_followups(self, incident_id: str) -> List[Dict]:
"""Retrieve follow-ups with proper timestamps"""
with self._get_connection() as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM incident_followups WHERE incident_id = ?", (incident_id,))
return [
{**dict(row), 'timestamp': datetime.fromisoformat(row['timestamp'])}
for row in cursor.fetchall()
]
def log_unauthorized_access(self, user_id: int, command_used: str, details: str = ""):
"""Log unauthorized command attempts"""
try:
with self._get_connection() as conn:
conn.execute("""
INSERT INTO unauthorized_access
(user_id, command_used, timestamp, details)
VALUES (?, ?, ?, ?)
""", (user_id, command_used, datetime.now(), details))
conn.commit()
return True
except Exception as e:
logging.error(f"Failed to log unauthorized access: {e}")
return False