Added an automatic Audit for commands when someone attempts to access them when they shouldn't be.
This commit is contained in:
@ -4,11 +4,6 @@ from datetime import datetime
|
||||
from typing import List, Dict, Optional
|
||||
|
||||
|
||||
import sqlite3
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Optional
|
||||
|
||||
class Database:
|
||||
def __init__(self, db_path: str = "data/moments.db"):
|
||||
self.db_path = db_path
|
||||
@ -21,6 +16,16 @@ class Database:
|
||||
conn.execute("DROP TABLE IF EXISTS incidents")
|
||||
conn.execute("DROP TABLE IF EXISTS incident_messages")
|
||||
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS unauthorized_access (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
user_id INTEGER NOT NULL,
|
||||
command_used TEXT NOT NULL,
|
||||
timestamp DATETIME NOT NULL,
|
||||
details TEXT
|
||||
)
|
||||
""")
|
||||
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS funny_moments (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
@ -131,16 +136,17 @@ class Database:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get incident details and parse timestamp
|
||||
# Get incident details
|
||||
cursor.execute("SELECT * FROM incidents WHERE id = ?", (incident_id,))
|
||||
incident = cursor.fetchone()
|
||||
if not incident:
|
||||
return None
|
||||
|
||||
# Convert timestamp string to datetime object
|
||||
incident_details = dict(incident)
|
||||
incident_details['timestamp'] = datetime.fromisoformat(incident_details['timestamp']) # Convert string to datetime
|
||||
incident_details['timestamp'] = datetime.fromisoformat(incident_details['timestamp'])
|
||||
|
||||
# Get messages with parsed timestamps
|
||||
# Get related messages
|
||||
cursor.execute("SELECT * FROM incident_messages WHERE incident_id = ?", (incident_id,))
|
||||
messages = [
|
||||
{**dict(msg), 'timestamp': datetime.fromisoformat(msg['timestamp'])}
|
||||
@ -180,7 +186,7 @@ class Database:
|
||||
return False
|
||||
|
||||
def get_followups(self, incident_id: str) -> List[Dict]:
|
||||
"""Get follow-ups with proper timestamps"""
|
||||
"""Retrieve follow-ups with proper timestamps"""
|
||||
with self._get_connection() as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
@ -189,3 +195,18 @@ class Database:
|
||||
{**dict(row), 'timestamp': datetime.fromisoformat(row['timestamp'])}
|
||||
for row in cursor.fetchall()
|
||||
]
|
||||
|
||||
def log_unauthorized_access(self, user_id: int, command_used: str, details: str = ""):
|
||||
"""Log unauthorized command attempts"""
|
||||
try:
|
||||
with self._get_connection() as conn:
|
||||
conn.execute("""
|
||||
INSERT INTO unauthorized_access
|
||||
(user_id, command_used, timestamp, details)
|
||||
VALUES (?, ?, ?, ?)
|
||||
""", (user_id, command_used, datetime.now(), details))
|
||||
conn.commit()
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to log unauthorized access: {e}")
|
||||
return False
|
||||
|
Reference in New Issue
Block a user