Brandon209-Red-bot-Cogs/warnings_custom/warnings.py
2022-09-21 15:52:47 -04:00

710 lines
28 KiB
Python

import asyncio
import contextlib
from datetime import timezone
from collections import namedtuple
from copy import copy
from typing import Union, Optional, Literal
import discord
from redbot.cogs.warnings.helpers import (
warning_points_add_check,
get_command_for_exceeded_points,
get_command_for_dropping_points,
warning_points_remove_check,
)
from redbot.core import Config, checks, commands, modlog
from redbot.core.bot import Red
from redbot.core.commands import UserInputOptional
from redbot.core.i18n import Translator, cog_i18n
from redbot.core.utils import AsyncIter
from redbot.core.utils.chat_formatting import warning, pagify, error
from redbot.core.utils.menus import menu, DEFAULT_CONTROLS, start_adding_reactions
from redbot.core.utils.predicates import MessagePredicate, ReactionPredicate
_ = Translator("Warnings", __file__)
@cog_i18n(_)
class Warnings_Custom(commands.Cog):
"""Warn misbehaving users and take automated actions."""
default_guild = {
"actions": [],
"reasons": {},
"allow_custom_reasons": False,
"allow_context": False,
"allow_context_dm": False,
"toggle_dm": True,
"show_mod": False,
"warn_channel": None,
"toggle_channel": False,
}
default_member = {"total_points": 0, "status": "", "warnings": {}}
def __init__(self, bot: Red):
super().__init__()
self.config = Config.get_conf(self, identifier=5757575755)
self.config.register_guild(**self.default_guild)
self.config.register_member(**self.default_member)
self.bot = bot
self.registration_task = self.bot.loop.create_task(self.register_warningtype())
async def red_delete_data_for_user(
self,
*,
requester: Literal["discord_deleted_user", "owner", "user", "user_strict"],
user_id: int,
):
if requester != "discord_deleted_user":
return
all_members = await self.config.all_members()
c = 0
for guild_id, guild_data in all_members.items():
c += 1
if not c % 100:
await asyncio.sleep(0)
if user_id in guild_data:
await self.config.member_from_ids(guild_id, user_id).clear()
for remaining_user, user_warns in guild_data.items():
c += 1
if not c % 100:
await asyncio.sleep(0)
for warn_id, warning in user_warns.get("warnings", {}).items():
c += 1
if not c % 100:
await asyncio.sleep(0)
if warning.get("mod", 0) == user_id:
grp = self.config.member_from_ids(guild_id, remaining_user)
await grp.set_raw("warnings", warn_id, "mod", value=0xDE1)
# We're not utilising modlog yet - no need to register a casetype
@staticmethod
async def register_warningtype():
casetypes_to_register = [
{
"name": "warning",
"default_setting": True,
"image": "\N{WARNING SIGN}\N{VARIATION SELECTOR-16}",
"case_str": "Warning",
},
{
"name": "unwarned",
"default_setting": True,
"image": "\N{WARNING SIGN}\N{VARIATION SELECTOR-16}",
"case_str": "Unwarned",
},
]
try:
await modlog.register_casetypes(casetypes_to_register)
except RuntimeError:
pass
@commands.group()
@commands.guild_only()
@checks.guildowner_or_permissions(administrator=True)
async def warningset(self, ctx: commands.Context):
"""Manage settings for Warnings."""
pass
@warningset.command()
@commands.guild_only()
async def allowcustomreasons(self, ctx: commands.Context, allowed: bool):
"""Enable or disable custom reasons for a warning."""
guild = ctx.guild
await self.config.guild(guild).allow_custom_reasons.set(allowed)
if allowed:
await ctx.send(_("Custom reasons have been enabled."))
else:
await ctx.send(_("Custom reasons have been disabled."))
@warningset.command()
@commands.guild_only()
async def allowcontext(self, ctx: commands.Context, allowed: bool):
"""Enable or disable adding context to warnings."""
guild = ctx.guild
await self.config.guild(guild).allow_context.set(allowed)
if allowed:
await ctx.send(_("Context has been enabled."))
else:
await ctx.send(_("Context has been disabled."))
@warningset.command()
@commands.guild_only()
async def allowcontextdm(self, ctx: commands.Context, allowed: bool):
"""Enable or disable sending context of warnings to warned user.
Send DM and Allow Context must also be enabled for this to take effect.
"""
guild = ctx.guild
await self.config.guild(guild).allow_context_dm.set(allowed)
if allowed:
await ctx.send(_("DMing context has been enabled."))
else:
await ctx.send(_("DMing context has been disabled."))
@warningset.command()
@commands.guild_only()
async def senddm(self, ctx: commands.Context, true_or_false: bool):
"""Set whether warnings should be sent to users in DMs."""
await self.config.guild(ctx.guild).toggle_dm.set(true_or_false)
if true_or_false:
await ctx.send(_("I will now try to send warnings to users DMs."))
else:
await ctx.send(_("Warnings will no longer be sent to users DMs."))
@warningset.command()
@commands.guild_only()
async def showmoderator(self, ctx, true_or_false: bool):
"""Decide whether the name of the moderator warning a user should be included in the DM to that user."""
await self.config.guild(ctx.guild).show_mod.set(true_or_false)
if true_or_false:
await ctx.send(
_("I will include the name of the moderator who issued the warning when sending a DM to a user.")
)
else:
await ctx.send(
_("I will not include the name of the moderator who issued the warning when sending a DM to a user.")
)
@warningset.command()
@commands.guild_only()
async def warnchannel(self, ctx: commands.Context, channel: discord.TextChannel = None):
"""Set the channel where warnings should be sent to.
Leave empty to use the channel `[p]warn` command was called in.
"""
guild = ctx.guild
if channel:
await self.config.guild(guild).warn_channel.set(channel.id)
await ctx.send(_("The warn channel has been set to {channel}.").format(channel=channel.mention))
else:
await self.config.guild(guild).warn_channel.set(channel)
await ctx.send(_("Warnings will now be sent in the channel command was used in."))
@warningset.command()
@commands.guild_only()
async def usewarnchannel(self, ctx: commands.Context, true_or_false: bool):
"""
Set if warnings should be sent to a channel set with `[p]warningset warnchannel`.
"""
await self.config.guild(ctx.guild).toggle_channel.set(true_or_false)
channel = self.bot.get_channel(await self.config.guild(ctx.guild).warn_channel())
if true_or_false:
if channel:
await ctx.send(_("Warnings will now be sent to {channel}.").format(channel=channel.mention))
else:
await ctx.send(_("Warnings will now be sent in the channel command was used in."))
else:
await ctx.send(_("Toggle channel has been disabled."))
@commands.group()
@commands.guild_only()
@checks.guildowner_or_permissions(administrator=True)
async def warnaction(self, ctx: commands.Context):
"""Manage automated actions for Warnings.
Actions are essentially command macros. Any command can be run
when the action is initially triggered, and/or when the action
is lifted.
Actions must be given a name and a points threshold. When a
user is warned enough so that their points go over this
threshold, the action will be executed.
"""
pass
@warnaction.command(name="add")
@commands.guild_only()
async def action_add(self, ctx: commands.Context, name: str, points: int):
"""Create an automated action.
Duplicate action names are not allowed.
"""
guild = ctx.guild
exceed_command = await get_command_for_exceeded_points(ctx)
drop_command = await get_command_for_dropping_points(ctx)
to_add = {
"action_name": name,
"points": points,
"exceed_command": exceed_command,
"drop_command": drop_command,
}
# Have all details for the action, now save the action
guild_settings = self.config.guild(guild)
async with guild_settings.actions() as registered_actions:
for act in registered_actions:
if act["action_name"] == to_add["action_name"]:
await ctx.send(_("Duplicate action name found!"))
break
else:
registered_actions.append(to_add)
# Sort in descending order by point count for ease in
# finding the highest possible action to take
registered_actions.sort(key=lambda a: a["points"], reverse=True)
await ctx.send(_("Action {name} has been added.").format(name=name))
@warnaction.command(name="delete", aliases=["del", "remove"])
@commands.guild_only()
async def action_del(self, ctx: commands.Context, action_name: str):
"""Delete the action with the specified name."""
guild = ctx.guild
guild_settings = self.config.guild(guild)
async with guild_settings.actions() as registered_actions:
to_remove = None
for act in registered_actions:
if act["action_name"] == action_name:
to_remove = act
break
if to_remove:
registered_actions.remove(to_remove)
await ctx.tick()
else:
await ctx.send(_("No action named {name} exists!").format(name=action_name))
@commands.group()
@commands.guild_only()
@checks.guildowner_or_permissions(administrator=True)
async def warnreason(self, ctx: commands.Context):
"""Manage warning reasons.
Reasons must be given a name, description and points value. The
name of the reason must be given when a user is warned.
"""
pass
@warnreason.command(name="create", aliases=["add"])
@commands.guild_only()
async def reason_create(self, ctx: commands.Context, name: str, points: int, *, description: str):
"""Create a warning reason."""
guild = ctx.guild
if name.lower() == "custom":
await ctx.send(_("*Custom* cannot be used as a reason name!"))
return
to_add = {"points": points, "description": description}
completed = {name.lower(): to_add}
guild_settings = self.config.guild(guild)
async with guild_settings.reasons() as registered_reasons:
registered_reasons.update(completed)
await ctx.send(_("The new reason has been registered."))
@warnreason.command(name="delete", aliases=["remove", "del"])
@commands.guild_only()
async def reason_del(self, ctx: commands.Context, reason_name: str):
"""Delete a warning reason."""
guild = ctx.guild
guild_settings = self.config.guild(guild)
async with guild_settings.reasons() as registered_reasons:
if registered_reasons.pop(reason_name.lower(), None):
await ctx.tick()
else:
await ctx.send(_("That is not a registered reason name."))
@commands.command()
@commands.guild_only()
@checks.admin_or_permissions(ban_members=True)
async def reasonlist(self, ctx: commands.Context):
"""List all configured reasons for Warnings."""
guild = ctx.guild
guild_settings = self.config.guild(guild)
msg_list = []
async with guild_settings.reasons() as registered_reasons:
for r, v in registered_reasons.items():
if await ctx.embed_requested():
em = discord.Embed(
title=_("Reason: {name}").format(name=r),
description=v["description"],
)
em.add_field(name=_("Points"), value=str(v["points"]))
msg_list.append(em)
else:
msg_list.append(
_("Name: {reason_name}\nPoints: {points}\nDescription: {description}").format(
reason_name=r, **v
)
)
if msg_list:
await menu(ctx, msg_list, DEFAULT_CONTROLS)
else:
await ctx.send(_("There are no reasons configured!"))
@commands.command()
@commands.guild_only()
@checks.admin_or_permissions(ban_members=True)
async def actionlist(self, ctx: commands.Context):
"""List all configured automated actions for Warnings."""
guild = ctx.guild
guild_settings = self.config.guild(guild)
msg_list = []
async with guild_settings.actions() as registered_actions:
for r in registered_actions:
if await ctx.embed_requested():
em = discord.Embed(title=_("Action: {name}").format(name=r["action_name"]))
em.add_field(name=_("Points"), value="{}".format(r["points"]), inline=False)
em.add_field(
name=_("Exceed command"),
value=r["exceed_command"],
inline=False,
)
em.add_field(name=_("Drop command"), value=r["drop_command"], inline=False)
msg_list.append(em)
else:
msg_list.append(
_(
"Name: {action_name}\nPoints: {points}\n"
"Exceed command: {exceed_command}\nDrop command: {drop_command}"
).format(**r)
)
if msg_list:
await menu(ctx, msg_list, DEFAULT_CONTROLS)
else:
await ctx.send(_("There are no actions configured!"))
@commands.command()
@commands.guild_only()
@checks.admin_or_permissions(ban_members=True)
async def warn(
self,
ctx: commands.Context,
user: discord.Member,
points: UserInputOptional[int] = 1,
*,
reason: str,
):
"""Warn the user for the specified reason. Context can be provided after running command
`<points>` number of points the warning should be for. If no number is supplied
1 point will be given. Pre-set warnings disregard this.
`<reason>` can be a registered reason if it exists or a custom one
is created by default.
"""
guild = ctx.guild
if user == ctx.author:
return await ctx.send(_("You cannot warn yourself."))
if user.bot:
return await ctx.send(_("You cannot warn other bots."))
if user == ctx.guild.owner:
return await ctx.send(_("You cannot warn the server owner."))
if user.top_role >= ctx.author.top_role and ctx.author != ctx.guild.owner:
return await ctx.send(
_(
"The person you're trying to warn is equal or higher than you in the discord hierarchy, you cannot warn them."
)
)
guild_settings = await self.config.guild(ctx.guild).all()
custom_allowed = guild_settings["allow_custom_reasons"]
reason_type = None
async with self.config.guild(ctx.guild).reasons() as registered_reasons:
if (reason_type := registered_reasons.get(reason.lower())) is None:
msg = _("That is not a registered reason!")
if custom_allowed:
reason_type = {"description": reason, "points": points}
else:
# logic taken from `[p]permissions canrun`
fake_message = copy(ctx.message)
fake_message.content = f"{ctx.prefix}warningset allowcustomreasons"
fake_context = await ctx.bot.get_context(fake_message)
try:
can = await self.allowcustomreasons.can_run(
fake_context, check_all_parents=True, change_permission_state=False
)
except commands.CommandError:
can = False
if can:
msg += " " + _(
"Do `{prefix}warningset allowcustomreasons true` to enable custom " "reasons."
).format(prefix=ctx.clean_prefix)
return await ctx.send(msg)
if reason_type is None:
return
# get context of reason, if provided
context = ""
if await self.config.guild(guild).allow_context():
msg = await ctx.send(
"Would you like to provide more context to the warning? (react with yes or no)", delete_after=31
)
start_adding_reactions(msg, ReactionPredicate.YES_OR_NO_EMOJIS)
pred = ReactionPredicate.yes_or_no(msg, ctx.author)
try:
await self.bot.wait_for("reaction_add", check=pred, timeout=30)
except asyncio.TimeoutError:
await ctx.send(error("Took too long, cancelling warning!"), delete_after=30)
return
if pred.result:
done = False
while not done:
await ctx.send("Please provide context as text and/or an attachment.", delete_after=240)
pred = MessagePredicate.same_context(ctx)
try:
msg = await self.bot.wait_for("message", check=pred, timeout=240)
except asyncio.TimeoutError:
await ctx.send(error("Took too long, cancelling warning!"), delete_after=30)
return
yes_or_no = await ctx.send("Continue with provided context? React no to redo.", delete_after=31)
start_adding_reactions(yes_or_no, ReactionPredicate.YES_OR_NO_EMOJIS)
pred = ReactionPredicate.yes_or_no(yes_or_no, ctx.author)
try:
await self.bot.wait_for("reaction_add", check=pred, timeout=30)
except asyncio.TimeoutError:
await ctx.send(error("Took too long, cancelling warning!"))
return
done = pred.result
if len(msg.attachments):
urls = "\n".join([a.url for a in msg.attachments])
context = f"{msg.content}\n**urls**: {urls}"
else:
context = msg.content
member_settings = self.config.member(user)
current_point_count = await member_settings.total_points()
current_point_count += reason_type["points"]
await member_settings.total_points.set(current_point_count)
await warning_points_add_check(self.config, ctx, user, current_point_count)
dm = guild_settings["toggle_dm"]
showmod = guild_settings["show_mod"]
dm_failed = False
if dm:
if showmod:
title = _("Warning from {user}").format(user=ctx.author)
else:
title = _("Warning")
em = discord.Embed(
title=title,
description=reason_type["description"],
)
em.add_field(name=_("Points"), value=str(reason_type["points"]))
if await self.config.guild(guild).allow_context_dm() and context != "":
em.add_field(name="Context", value=context)
try:
await user.send(
_("You have received a warning in {guild_name}.").format(guild_name=ctx.guild.name),
embed=em,
)
except discord.HTTPException:
dm_failed = True
if dm_failed:
await ctx.send(
_("A warning for {user} has been issued," " but I wasn't able to send them a warn message.").format(
user=user.mention
)
)
toggle_channel = guild_settings["toggle_channel"]
if toggle_channel:
if showmod:
title = _("Warning from {user}").format(user=ctx.author)
else:
title = _("Warning")
em = discord.Embed(
title=title,
description=reason_type["description"],
)
em.add_field(name=_("Points"), value=str(reason_type["points"]))
warn_channel = self.bot.get_channel(guild_settings["warn_channel"])
if warn_channel:
if warn_channel.permissions_for(guild.me).send_messages:
with contextlib.suppress(discord.HTTPException):
await warn_channel.send(
_("{user} has been warned.").format(user=user.mention),
embed=em,
)
if not dm_failed:
if warn_channel:
await ctx.tick()
else:
await ctx.send(_("{user} has been warned.").format(user=user.mention), embed=em)
else:
if not dm_failed:
await ctx.tick()
reason_msg = _("{reason}\n\nUse `{prefix}unwarn {user} {message}` to remove this warning.{context}").format(
reason=_("{description}\nPoints: {points}").format(
description=reason_type["description"], points=reason_type["points"]
),
prefix=ctx.clean_prefix,
user=user.id,
message=ctx.message.id,
context=f"\n\n**Context**:\n{context}" if context else "",
)
case = await modlog.create_case(
self.bot,
ctx.guild,
ctx.message.created_at.replace(tzinfo=timezone.utc),
"warning",
user,
ctx.message.author,
reason_msg,
until=None,
channel=None,
)
warning_to_add = {
str(ctx.message.id): {
"points": reason_type["points"],
"description": reason_type["description"],
"mod": ctx.author.id,
"date": ctx.message.created_at.replace(tzinfo=timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC"),
"caseno": case.case_number,
}
}
async with member_settings.warnings() as user_warnings:
user_warnings.update(warning_to_add)
@commands.command()
@commands.guild_only()
@checks.admin()
async def warnings(self, ctx: commands.Context, user: Union[discord.Member, int]):
"""List the warnings for the specified user."""
try:
userid: int = user.id
except AttributeError:
userid: int = user
user = ctx.guild.get_member(userid)
user = user or namedtuple("Member", "id guild")(userid, ctx.guild)
msg = ""
member_settings = self.config.member(user)
async with member_settings.warnings() as user_warnings:
if not user_warnings.keys(): # no warnings for the user
await ctx.send(_("That user has no warnings!"))
else:
for key in user_warnings.keys():
mod_id = user_warnings[key]["mod"]
if mod_id == 0xDE1:
mod = _("Deleted Moderator")
else:
bot = ctx.bot
mod = bot.get_user(mod_id) or _("Unknown Moderator ({})").format(mod_id)
date = user_warnings[key].get(
"date", None
) # not all warnings may have date if switched from using warnings cog by red
num = user_warnings[key].get("caseno", None) # same as above
msg += _(
"{num}{num_points} point warning {reason_name} issued by {user} for " "{description}{date}\n"
).format(
num_points=user_warnings[key]["points"],
reason_name=key,
user=mod,
description=user_warnings[key]["description"],
date=" at {}".format(date) if date else "",
num=f"Case #{num}: " if num else "",
)
await ctx.send_interactive(
pagify(msg, shorten_by=58),
box_lang=_("Warnings for {user}").format(user=user),
)
@commands.command()
@commands.guild_only()
async def mywarnings(self, ctx: commands.Context):
"""List warnings for yourself."""
user = ctx.author
msg = ""
member_settings = self.config.member(user)
async with member_settings.warnings() as user_warnings:
if not user_warnings.keys(): # no warnings for the user
await ctx.send(_("You have no warnings!"))
else:
for key in user_warnings.keys():
mod_id = user_warnings[key]["mod"]
if mod_id == 0xDE1:
mod = _("Deleted Moderator")
else:
bot = ctx.bot
mod = bot.get_user(mod_id) or _("Unknown Moderator ({})").format(mod_id)
date = user_warnings[key].get(
"date", None
) # not all warnings may have date if switched from using warnings cog by red
msg += _(
"{num_points} point warning {reason_name} issued by {user} for " "{description}{date}\n"
).format(
num_points=user_warnings[key]["points"],
reason_name=key,
user=mod,
description=user_warnings[key]["description"],
date=" at {}".format(date) if date else "",
)
await ctx.send_interactive(
pagify(msg, shorten_by=58),
box_lang=_("Warnings for {user}").format(user=user),
)
@commands.command()
@commands.guild_only()
@checks.admin_or_permissions(ban_members=True)
async def unwarn(
self,
ctx: commands.Context,
user: Union[discord.Member, int],
warn_id: str,
*,
reason: str = None,
):
"""Remove a warning from a user."""
guild = ctx.guild
try:
user_id = user.id
member = user
except AttributeError:
user_id = user
member = guild.get_member(user_id)
member = member or namedtuple("Member", "guild id")(guild, user_id)
if user_id == ctx.author.id:
return await ctx.send(_("You cannot remove warnings from yourself."))
member_settings = self.config.member(member)
current_point_count = await member_settings.total_points()
await warning_points_remove_check(self.config, ctx, member, current_point_count)
async with member_settings.warnings() as user_warnings:
if warn_id not in user_warnings.keys():
return await ctx.send(_("That warning doesn't exist!"))
else:
current_point_count -= user_warnings[warn_id]["points"]
await member_settings.total_points.set(current_point_count)
user_warnings.pop(warn_id)
await modlog.create_case(
self.bot,
ctx.guild,
ctx.message.created_at.replace(tzinfo=timezone.utc),
"unwarned",
member,
ctx.message.author,
reason,
until=None,
channel=None,
)
await ctx.tick()