# Source listing: 345 lines, 11 KiB, Python
import discord
|
||
from discord import app_commands
|
||
from discord.ext import commands, tasks
|
||
import sqlite3
|
||
|
||
# Path of the SQLite database file opened by every database helper in this file.
DB_FILE = "swearjar.db"
# Words that on_message searches for as substrings of the lowercased message text.
SWEAR_WORDS = {REDACTED FOR CODE REVIEW}
# Token passed to bot.run() at the bottom of the file.
# NOTE(review): hard-coded secret — consider loading it from the environment instead.
BOT_TOKEN = "REDACTED FOR CODE REVIEW"

# Gateway intents handed to the bot: the library defaults plus the flags set below.
intents = discord.Intents.default()
intents.message_content = True  # on_message reads message.content
intents.members = True
intents.guilds = True
|
||
|
||
|
||
# ==============================
|
||
# BOT CLASS
|
||
# ==============================
|
||
class SwearJarBot(commands.Bot):
    """Bot subclass that syncs slash commands once and shows the global swear total.

    The presence line is refreshed every 10 minutes with the value returned by
    ``get_global_swear_count()``.
    """

    def __init__(self):
        super().__init__(command_prefix="!", intents=intents)
        # Ensures the command tree is synced only once, even if on_ready is
        # dispatched again later in the same process.
        self.synced = False

    async def setup_hook(self):
        """Create the database tables, drop the default help command, start the status loop."""
        # The tables must exist before anything can query them. The status
        # loop only waits for wait_until_ready(), which can resume before
        # on_ready runs, so creating the schema in on_ready was too late on a
        # fresh database.
        init_db()
        self.remove_command("help")
        self.update_status.start()

    async def on_ready(self):
        """Sync the application command tree on first ready and log the login."""
        if not self.synced:
            await self.tree.sync()
            self.synced = True
        print(f"✅ Logged in as {self.user} (ID: {self.user.id})")

    @tasks.loop(minutes=10)
    async def update_status(self):
        """Update the bot's 'Listening to XXX swear words' presence every 10 minutes."""
        total = get_global_swear_count()
        activity = discord.Activity(
            type=discord.ActivityType.listening,
            name=f"{total:,} swear words",
        )
        await self.change_presence(activity=activity)

    @update_status.before_loop
    async def before_update_status(self):
        """Delay the first status update until the client reports ready."""
        await self.wait_until_ready()
|
||
|
||
|
||
# Single bot instance; the event handler and slash commands in this file register on it.
bot = SwearJarBot()
|
||
|
||
|
||
# ==============================
|
||
# DATABASE FUNCTIONS
|
||
# ==============================
|
||
def init_db():
    """Create the ``swear_counts`` and ``privacy_optouts`` tables if they are missing.

    ``swear_counts`` holds one row per (guild_id, user_id) pair with a running
    count; ``privacy_optouts`` holds the ids of users who disabled tracking.
    Safe to call repeatedly because both statements use ``IF NOT EXISTS``.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cur = conn.cursor()
        cur.execute(
            """CREATE TABLE IF NOT EXISTS swear_counts (
                guild_id TEXT NOT NULL,
                user_id TEXT NOT NULL,
                count INTEGER DEFAULT 0,
                PRIMARY KEY (guild_id, user_id)
            )"""
        )
        cur.execute(
            """CREATE TABLE IF NOT EXISTS privacy_optouts (
                user_id TEXT PRIMARY KEY
            )"""
        )
        conn.commit()
    finally:
        # Close the connection even when a statement raises.
        conn.close()
|
||
|
||
|
||
def add_swear(guild_id: int, user_id: int, count: int = 1) -> None:
    """Add ``count`` to a user's swear total in one guild.

    Does nothing when the user has opted out of tracking. Ids are stored as
    text; a missing row is inserted, an existing row is incremented.
    """
    if is_user_opted_out(user_id):
        return

    conn = sqlite3.connect(DB_FILE)
    try:
        conn.cursor().execute(
            """INSERT INTO swear_counts (guild_id, user_id, count)
               VALUES (?, ?, ?)
               ON CONFLICT(guild_id, user_id)
               DO UPDATE SET count = count + ?""",
            (str(guild_id), str(user_id), count, count),
        )
        conn.commit()
    finally:
        # Close the connection even when the upsert raises.
        conn.close()
|
||
|
||
|
||
def get_swear_counts(user_id: int):
    """Return a user's per-guild swear counts and their sum.

    Returns:
        A ``(rows, total)`` tuple where ``rows`` is a list of
        ``(guild_id, count)`` tuples (``guild_id`` is stored as text) and
        ``total`` is the sum of all counts (0 when there are no rows).
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT guild_id, count FROM swear_counts WHERE user_id = ?",
            (str(user_id),),
        )
        rows = cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    total = sum(count for _, count in rows)
    return rows, total
|
||
|
||
|
||
def get_top_swearers(limit: int = 10, guild_id: int | None = None, offset: int = 0):
    """Return one page of the swear leaderboard, highest count first.

    Args:
        limit: Maximum number of rows to return.
        guild_id: Restrict the ranking to this guild; ``None`` ranks users by
            the sum of their counts across all guilds.
        offset: Number of leading rows to skip (for pagination).

    Returns:
        A list of ``(user_id, count)`` tuples, with ``user_id`` as text.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cur = conn.cursor()
        # user_id is a secondary sort key so that users with equal counts keep
        # a stable order; without it LIMIT/OFFSET pages may repeat or skip
        # tied rows.
        if guild_id is not None:
            cur.execute(
                """SELECT user_id, count
                   FROM swear_counts
                   WHERE guild_id = ?
                   ORDER BY count DESC, user_id ASC
                   LIMIT ? OFFSET ?""",
                (str(guild_id), limit, offset),
            )
        else:
            cur.execute(
                """SELECT user_id, SUM(count) AS total
                   FROM swear_counts
                   GROUP BY user_id
                   ORDER BY total DESC, user_id ASC
                   LIMIT ? OFFSET ?""",
                (limit, offset),
            )
        return cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
|
||
|
||
|
||
def get_global_swear_count() -> int:
    """Return the total number of swears across all users and servers.

    ``SUM`` yields NULL on an empty table, which is reported as 0.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cur = conn.cursor()
        cur.execute("SELECT SUM(count) FROM swear_counts")
        row = cur.fetchone()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return row[0] or 0
|
||
|
||
|
||
# ==============================
|
||
# PRIVACY
|
||
# ==============================
|
||
def is_user_opted_out(user_id: int) -> bool:
    """Return True when ``user_id`` has a row in ``privacy_optouts``."""
    conn = sqlite3.connect(DB_FILE)
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT 1 FROM privacy_optouts WHERE user_id = ?",
            (str(user_id),),
        )
        return cur.fetchone() is not None
    finally:
        # Close the connection even when the query raises.
        conn.close()
|
||
|
||
|
||
def set_user_optout(user_id: int) -> None:
    """Delete all of a user's swear counts and record them as opted out.

    Both statements are committed together, so a failure in either one leaves
    the database unchanged.
    """
    uid = str(user_id)
    conn = sqlite3.connect(DB_FILE)
    try:
        cur = conn.cursor()
        cur.execute("DELETE FROM swear_counts WHERE user_id = ?", (uid,))
        cur.execute(
            "INSERT OR IGNORE INTO privacy_optouts (user_id) VALUES (?)",
            (uid,),
        )
        conn.commit()
    finally:
        # Closing without a commit discards any partial work.
        conn.close()
|
||
|
||
|
||
def remove_user_optout(user_id: int) -> None:
    """Remove ``user_id`` from ``privacy_optouts`` so tracking can resume."""
    conn = sqlite3.connect(DB_FILE)
    try:
        conn.cursor().execute(
            "DELETE FROM privacy_optouts WHERE user_id = ?",
            (str(user_id),),
        )
        conn.commit()
    finally:
        # Close the connection even when the delete raises.
        conn.close()
|
||
|
||
|
||
# ==============================
|
||
# UTILS
|
||
# ==============================
|
||
def censor_name(name: str) -> str:
    """Mask a name with asterisks, keeping only its last three characters.

    Names of three characters or fewer are masked completely.
    """
    length = len(name)
    if length > 3:
        visible_tail = name[-3:]
        return "*" * (length - 3) + visible_tail
    return "*" * length
|
||
|
||
|
||
# ==============================
|
||
# MESSAGE TRACKING
|
||
# ==============================
|
||
@bot.event
async def on_message(message: discord.Message):
    """Count a swear for guild messages containing a tracked word, then run commands.

    Messages from bots and messages outside a guild are ignored entirely.
    A message counts once no matter how many tracked words it contains.
    """
    if message.author.bot or not message.guild:
        return

    content = message.content.lower()
    if any(word in content for word in SWEAR_WORDS):
        # add_swear() itself skips opted-out users, so no separate opt-out
        # query is needed here. That avoids a database hit on every message.
        add_swear(message.guild.id, message.author.id)

    # Runs for opted-out users too: opting out of tracking must not stop
    # their prefix commands from being processed.
    await bot.process_commands(message)
|
||
|
||
|
||
# ==============================
|
||
# COMMANDS
|
||
# ==============================
|
||
@bot.tree.command(name="swearjar", description="View your or another user's swear count")
async def swearjar(interaction: discord.Interaction, user: discord.User | None = None):
    """Reply with a user's swear counts per server and the overall total.

    Args:
        interaction: The slash-command interaction to respond to.
        user: The user to look up; defaults to the caller.

    Opted-out users and users without any recorded swears get a short
    ephemeral message instead of the embed.
    """
    user = user or interaction.user
    if is_user_opted_out(user.id):
        await interaction.response.send_message(
            f"{user.display_name} has opted out of tracking 🔒", ephemeral=True
        )
        return

    rows, total = get_swear_counts(user.id)
    if not rows:
        await interaction.response.send_message(
            f"{user.display_name} hasn't been caught swearing yet 🎉", ephemeral=True
        )
        return

    embed = discord.Embed(title=f"🪣 Swear Jar — {user.display_name}", color=discord.Color.blurple())

    # Discord rejects embeds with more than 25 fields; one slot is reserved
    # for the total, leaving 24 for individual servers.
    max_server_fields = 24
    shown = rows
    if len(rows) > max_server_fields:
        # Keep the servers with the highest counts when truncating.
        shown = sorted(rows, key=lambda row: row[1], reverse=True)[:max_server_fields]
        embed.set_footer(text=f"Showing the top {max_server_fields} of {len(rows)} servers")

    for guild_id, count in shown:
        g = bot.get_guild(int(guild_id))
        embed.add_field(
            name=g.name if g else f"Unknown Server ({guild_id})",
            value=f"{count:,}",
            inline=False,
        )
    embed.add_field(name="Total across all servers", value=f"**{total:,}**", inline=False)
    await interaction.response.send_message(embed=embed)
|
||
|
||
|
||
# ==============================
|
||
# LEADERBOARD
|
||
# ==============================
|
||
class LeaderboardView(discord.ui.View):
    """Prev/Next buttons that page through the swear leaderboard.

    ``mode`` is ``"guild"`` to rank the guild the interaction came from, or
    any other value (callers pass ``"global"``) to rank across all guilds.
    The view times out after 60 seconds.
    """

    def __init__(self, mode: str):
        super().__init__(timeout=60)
        self.mode = mode
        self.page = 0        # zero-based index of the page currently shown
        self.page_size = 10  # rows per page

    def _fetch_page(self, interaction: discord.Interaction):
        """Return the leaderboard rows for the current page."""
        guild_id = interaction.guild.id if self.mode == "guild" else None
        return get_top_swearers(
            limit=self.page_size,
            guild_id=guild_id,
            offset=self.page * self.page_size,
        )

    async def update_page(self, interaction: discord.Interaction):
        """Re-render the message for ``self.page`` and refresh both button states."""
        results = self._fetch_page(interaction)
        at_end = len(results) < self.page_size

        if not results and self.page > 0:
            # Stepped past the last page (the row count is an exact multiple
            # of page_size): go back instead of showing an empty page.
            self.page -= 1
            results = self._fetch_page(interaction)
            at_end = True

        # Both buttons are recomputed on every render. Previously Prev started
        # disabled and was never re-enabled, so paging back was impossible.
        self.prev_page.disabled = self.page == 0
        self.next_page.disabled = at_end

        embed = format_leaderboard(results, self.mode, self.page, self.page_size)
        await interaction.response.edit_message(embed=embed, view=self)

    @discord.ui.button(label="⬅️ Prev", style=discord.ButtonStyle.primary, disabled=True)
    async def prev_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Go back one page (no-op on the first page) and re-render."""
        if self.page > 0:
            self.page -= 1
        await self.update_page(interaction)

    @discord.ui.button(label="➡️ Next", style=discord.ButtonStyle.primary)
    async def next_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Advance one page and re-render."""
        self.page += 1
        await self.update_page(interaction)
|
||
|
||
|
||
def format_leaderboard(results, mode="guild", page=0, per_page=10):
    """Build the leaderboard embed for one page of results.

    Args:
        results: Iterable of ``(user_id, count)`` rows for this page.
        mode: ``"global"`` selects the global title; anything else the server title.
        page: Zero-based page index, used for rank numbers and the footer.
        per_page: Rows per page, used to compute the first rank on the page.

    Returns:
        A ``discord.Embed``; a bare "No data yet!" embed when ``results`` is empty.
    """
    if not results:
        return discord.Embed(title="No data yet!", color=discord.Color.gold())

    title = "🌍 Global Swear Leaderboard" if mode == "global" else "🏠 Server Swear Leaderboard"
    embed = discord.Embed(title=title, color=discord.Color.gold())

    lines = []
    for idx, (user_id, count) in enumerate(results, start=1 + page * per_page):
        if is_user_opted_out(user_id):
            continue
        user = bot.get_user(int(user_id))
        # Only real usernames are censored. Censoring the "Unknown" placeholder
        # used to render it as "****own".
        name = censor_name(user.name) if user else "Unknown"
        lines.append(f"**#{idx}** — {name}: `{count:,}`")

    embed.description = "\n".join(lines) or "No data to show."
    embed.set_footer(text=f"Page {page+1}")
    return embed
|
||
|
||
|
||
@bot.tree.command(name="sweartop", description="Show the top swearers")
async def sweartop(interaction: discord.Interaction, mode: str = "server"):
    """Reply with the first leaderboard page and paging buttons.

    Args:
        interaction: The slash-command interaction to respond to.
        mode: ``"global"`` (case-insensitive) ranks across all servers; any
            other value ranks the current server. Outside a server the
            global ranking is always used.
    """
    guild_id = interaction.guild.id if mode.lower() != "global" and interaction.guild else None
    board_mode = "guild" if guild_id else "global"

    view = LeaderboardView(board_mode)
    results = get_top_swearers(limit=view.page_size, guild_id=guild_id)
    if not results:
        await interaction.response.send_message("No swearing detected yet!", ephemeral=True)
        return

    # A first page that is not full means there is no second page, so the
    # Next button starts disabled instead of leading to an empty page.
    view.next_page.disabled = len(results) < view.page_size

    embed = format_leaderboard(results, board_mode)
    await interaction.response.send_message(embed=embed, view=view)
|
||
|
||
|
||
# ==============================
|
||
# PRIVACY COMMAND
|
||
# ==============================
|
||
@bot.tree.command(name="privacy", description="View or manage your data privacy settings")
@app_commands.choices(action=[
    app_commands.Choice(name="view", value="view"),
    app_commands.Choice(name="delete", value="delete"),
    app_commands.Choice(name="reenable", value="reenable"),
])
async def privacy(interaction: discord.Interaction, action: app_commands.Choice[str]):
    """Show, disable, or re-enable swear tracking for the calling user.

    ``view`` reports the current opt-out state, ``delete`` removes the user's
    stored counts and opts them out, ``reenable`` removes the opt-out.
    Every reply is ephemeral.
    """
    member = interaction.user
    selected = action.value

    if selected == "view":
        if is_user_opted_out(member.id):
            reply = "🔒 You are **OPTED OUT** and no data is being tracked."
        else:
            reply = "✅ You are **OPTED IN** and swear counts are being tracked."
    elif selected == "delete":
        set_user_optout(member.id)
        reply = "✅ Your swear data has been deleted and tracking disabled."
    elif selected == "reenable":
        remove_user_optout(member.id)
        reply = "🔓 Tracking has been re‑enabled. Welcome back to the jar!"
    else:
        # Any other value gets no response.
        return

    await interaction.response.send_message(reply, ephemeral=True)
|
||
|
||
|
||
# ==============================
|
||
# HELP MENU
|
||
# ==============================
|
||
@bot.tree.command(name="help", description="Learn how to use the SwearJar bot")
async def help_command(interaction: discord.Interaction):
    """Send an ephemeral embed listing the bot's commands and tracking behaviour."""
    sections = (
        ("💬 `/swearjar [user]`", "See your swear totals."),
        ("📊 `/sweartop [server/global]`", "See the top swearers."),
        ("🛡️ `/privacy`", "Control your data and privacy."),
        ("🤖 Tracking", "Automatically tracks certain words (except opted-out users)."),
    )

    embed = discord.Embed(
        title="🪣 SwearJar Help Menu",
        description="Here's what I can do:",
        color=discord.Color.blurple(),
    )
    for heading, body in sections:
        embed.add_field(name=heading, value=body, inline=False)

    # The footer icon is only set when the bot has a custom avatar.
    if bot.user.avatar:
        footer_icon = bot.user.display_avatar.url
    else:
        footer_icon = None
    embed.set_footer(
        text="SwearJar Bot • Be nice, or pay the jar 💰",
        icon_url=footer_icon,
    )

    await interaction.response.send_message(embed=embed, ephemeral=True)
|
||
|
||
|
||
# ==============================
|
||
# RUN
|
||
# ==============================
|
||
# Start the bot only when this file is executed as a script, so that importing
# the module (e.g. from tests or tooling) does not open a gateway connection.
if __name__ == "__main__":
    bot.run(BOT_TOKEN)