From 8ca2cb8b924e477ea079b2ff8ee5c3c42c061f94 Mon Sep 17 00:00:00 2001 From: brandons209 Date: Sat, 8 Feb 2020 17:09:50 -0500 Subject: [PATCH] add isolate, fixed nitro booster for punish --- isolate/__init__.py | 7 + isolate/info.json | 20 + isolate/isolate.py | 1395 +++++++++++++++++++++++++++++++++++++++++++ isolate/memoizer.py | 40 ++ isolate/utils.py | 221 +++++++ punish/punish.py | 13 +- punish/utils.py | 4 +- 7 files changed, 1689 insertions(+), 11 deletions(-) create mode 100644 isolate/__init__.py create mode 100644 isolate/info.json create mode 100644 isolate/isolate.py create mode 100644 isolate/memoizer.py create mode 100644 isolate/utils.py diff --git a/isolate/__init__.py b/isolate/__init__.py new file mode 100644 index 0000000..6904340 --- /dev/null +++ b/isolate/__init__.py @@ -0,0 +1,7 @@ +from .isolate import Isolate + + +async def setup(bot): + isolate = Isolate(bot) + await isolate.initialize() + bot.add_cog(isolate) diff --git a/isolate/info.json b/isolate/info.json new file mode 100644 index 0000000..3690ead --- /dev/null +++ b/isolate/info.json @@ -0,0 +1,20 @@ +{ + "author": [ + "Brandons209", + "calebj" + ], + "bot_version": [ + 3, + 0, + 0 + ], + "description": "Provides a way to quickly remove a user for all chats and isolate them to a single channel. Supports setting times, automatic removal of isolate, modlog integration, custom role overrides, automatic server setup, and a lot more. This is different from punish as it always remove all a user's roles and by default makes the user not read all channels.", + "hidden": false, + "install_msg": "Thank you for using this cog! Please make sure to setup the cog using [p]isolateset", + "requirements": [], + "short": "Provides a way to quickly remove a user for all chats and isolate them to a single channel.", + "tags": [ + "brandons209", + "isolate" + ] +} diff --git a/isolate/isolate.py b/isolate/isolate.py new file mode 100644 index 0000000..f6a6f23 --- /dev/null +++ b/isolate/isolate.py @@ -0,0 +1,1395 @@ +# redbot/discord +from redbot.core.utils.chat_formatting import * +from redbot.core.utils import mod +from redbot.core.utils.menus import menu, DEFAULT_CONTROLS +from redbot.core import Config, checks, commands, modlog +import discord + +from .utils import * +from .memoizer import Memoizer + +# general +import asyncio +from datetime import datetime +import inspect +import logging +import time +import textwrap + +log = logging.getLogger("red.isolate") + +__version__ = "3.0.0" + +PURGE_MESSAGES = 1 # for cisolate + +DEFAULT_ROLE_NAME = "Isolated" +DEFAULT_TEXT_OVERWRITE = discord.PermissionOverwrite(send_messages=False, send_tts_messages=False, add_reactions=False, read_messages=False) +DEFAULT_VOICE_OVERWRITE = discord.PermissionOverwrite(speak=False, connect=False, view_channel=False) +DEFAULT_TIMEOUT_OVERWRITE = discord.PermissionOverwrite(send_messages=True, read_messages=True) + +QUEUE_TIME_CUTOFF = 30 + +DEFAULT_TIMEOUT = "5m" +DEFAULT_CASE_MIN_LENGTH = "5m" # only create modlog cases when length is longer than this + + +class Isolate(commands.Cog): + """ + Put misbehaving users in timeout where they are unable to speak, read, or + do other things that can be denied using discord permissions. Includes + auto-setup and more. + """ + + def __init__(self, bot): + super().__init__() + + self.bot = bot + self.config = Config.get_conf(self, identifier=1574368792) + # config + default_guild = { + "ISOLATED": {}, + "CASE_MIN_LENGTH": parse_time(DEFAULT_CASE_MIN_LENGTH), + "PENDING_UNMUTE": [], + "TEXT_OVERWRITE": overwrite_to_dict(DEFAULT_TEXT_OVERWRITE), + "VOICE_OVERWRITE": overwrite_to_dict(DEFAULT_VOICE_OVERWRITE), + "ROLE_ID": None, + "NITRO_ID": None, + "CHANNEL_ID": None, + } + self.config.register_guild(**default_guild) + + # queue variables + self.queue = asyncio.PriorityQueue(loop=bot.loop) + self.queue_lock = asyncio.Lock(loop=bot.loop) + self.pending = {} + self.enqueued = set() + + self.task = bot.loop.create_task(self.on_load()) + + def cog_unload(self): + self.task.cancel() + + async def initialize(self): + await self.register_casetypes() + + @staticmethod + async def register_casetypes(): + # register mod case + isolate_case = { + "name": "Timed Mute", + "default_setting": True, + "image": "\N{HOURGLASS WITH FLOWING SAND}\N{SPEAKER WITH CANCELLATION STROKE}", + "case_str": "Timed Mute", + } + try: + await modlog.register_casetype(**isolate_case) + except RuntimeError: + pass + + @commands.group(invoke_without_command=True) + @commands.guild_only() + @checks.mod() + async def isolate(self, ctx, user: discord.Member, duration: str = None, *, reason: str = None): + """ + Puts a user into timeout for a specified time, with optional reason. + + Time specification is any combination of number with the units s,m,h,d,w. + Example: !isolate @idiot 1.1h10m Breaking rules + """ + if ctx.invoked_subcommand: + return + elif user: + await self._isolate_cmd_common(ctx, user, duration, reason) + + @isolate.command(name="cstart") + @commands.guild_only() + @checks.mod() + async def isolate_cstart(self, ctx, user: discord.Member, duration: str = None, *, reason: str = None): + """ + Same as [p]isolate start, but cleans up the target's last message. + """ + + success = await self._isolate_cmd_common(ctx, user, duration, reason, quiet=True) + + if not success: + return + + def check(m): + return m.id == ctx.message.id or m.author == user + + try: + await ctx.message.channel.purge(limit=PURGE_MESSAGES + 1, check=check) + except discord.errors.Forbidden: + await ctx.send("Isolation set, but I need permissions to manage messages to clean up.") + + @isolate.command(name="list") + @commands.guild_only() + @checks.mod() + async def isolate_list(self, ctx): + """ + Shows a table of isolated users with time, mod and reason. + + Displays isolated users, time remaining, responsible moderator and + the reason for isolation, if any. + """ + + guild = ctx.guild + guild_id = guild.id + now = time.time() + headers = ["Member", "Remaining", "Moderator", "Reason"] + isolated = await self.config.guild(guild).ISOLATED() + + embeds = [] + num_p = len(isolated) + for i, data in enumerate(isolated.items()): + member_id, data = data + member_name = getmname(member_id, guild) + moderator = getmname(data["by"], guild) + reason = data["reason"] + until = data["until"] + sort = until or float("inf") + remaining = generate_timespec(until - now, short=True) if until else "forever" + + row = [member_name, remaining, moderator, reason or "No reason set."] + embed = discord.Embed(title="Isolate List", colour=discord.Colour.from_rgb(255, 0, 0)) + + for header, row_val in zip(headers, row): + embed.add_field(name=header, value=row_val) + + embed.set_footer(text=f"Page {i+1} out of {num_p}") + embeds.append(embed) + + if not isolated: + await ctx.send("No users are currently isolated.") + return + + await menu(ctx, embeds, DEFAULT_CONTROLS) + + @isolate.command(name="clean") + @commands.guild_only() + @checks.mod() + async def isolate_clean(self, ctx, clean_pending: bool = False): + """ + Removes absent members from the isolated list. + + If run without an argument, it only removes members who are no longer + present but whose timer has expired. If the argument is 'yes', 1, + or another trueish value, it will also remove absent members whose + timers have yet to expire. + + Use this option with care, as removing them will prevent the isolated + role from being re-added if they rejoin before their timer expires. + """ + + count = 0 + now = time.time() + guild = ctx.guild + data = await self.config.guild(guild).ISOLATED() + + for mid, mdata in data.copy().items(): + if not mid.isdigit() or guild.get_member(mid): + continue + + elif clean_pending or ((mdata["until"] or 0) < now): + del data[mid] + count += 1 + + await self.config.guild(guild).ISOLATED.set(data) + await ctx.send("Cleaned %i absent members from the list." % count) + + @isolate.command(name="clean-bans") + @commands.guild_only() + @checks.mod() + async def isolate_clean_bans(self, ctx): + """ + Removes banned members from the isolated list. + """ + + count = 0 + guild = ctx.guild + data = await self.config.guild(guild).ISOLATED() + + try: + bans = await guild.bans() + ban_ids = {u.id for u in bans} + except discord.errors.Forbidden: + await ctx.send(warning("I need ban permissions to see the list of banned users.")) + return + + for mid, mdata in data.copy().items(): + if not mid.isdigit() or guild.get_member(mid): + continue + + elif mid in ban_ids: + del data[mid] + count += 1 + + await self.config.guild(guild).ISOLATED.set(data) + await ctx.send("Cleaned %i banned users from the list." % count) + + @isolate.command(name="warn") + @commands.guild_only() + @checks.mod_or_permissions(manage_messages=True) + async def isolate_warn(self, ctx, user: discord.Member, *, reason: str = None): + """ + Warns a user with boilerplate about the rules + """ + + msg = ["Hey %s, " % user.mention] + msg.append("you're doing something that might get you muted if you keep " "doing it.") + if reason: + msg.append(" Specifically, %s." % reason) + + msg.append("Be sure to review the guild rules.") + await ctx.send(" ".join(msg)) + + @isolate.command(name="end", aliases=["remove"]) + @commands.guild_only() + @checks.mod() + async def isolate_end(self, ctx, user: discord.Member, *, reason: str = None): + """ + Removes Isolation from a user before time has expired + + This is the same as removing the role directly. + """ + + role = await self.get_role(user.guild, quiet=True) + sid = user.guild.id + guild = user.guild + moderator = ctx.author + now = time.time() + isolated = await self.config.guild(guild).ISOLATED() + data = isolated.get(str(user.id), {}) + removed_roles_parsed = resolve_role_list(guild, data.get("removed_roles", [])) + + if role and role in user.roles: + msg = "Isolation manually ended early by %s." % ctx.author + + original_start = data.get("start") + original_end = data.get("until") + remaining = original_end and (original_end - now) + + if remaining: + msg += " %s was left" % generate_timespec(round(remaining)) + + if original_start: + msg += " of the original %s." % generate_timespec(round(original_end - original_start)) + else: + msg += "." + + if reason: + msg += "\n\nReason for ending early: " + reason + + if data.get("reason"): + msg += "\n\nOriginal reason was: " + data["reason"] + + updated_reason = str(msg) # copy string + + if removed_roles_parsed: + names_list = format_list(*(r.name for r in removed_roles_parsed)) + msg += "\nRestored role(s): {}".format(names_list) + + if not await self._unisolate(user, reason=updated_reason, update=True, moderator=moderator): + msg += "\n\n(failed to send Isolation end notification DM)" + + await ctx.send(msg) + elif data: # This shouldn't happen, but just in case + now = time.time() + until = data.get("until") + remaining = until and generate_timespec(round(until - now)) or "forever" + + data_fmt = "\n".join( + [ + "**Reason:** %s" % (data.get("reason") or "no reason set"), + "**Time remaining:** %s" % remaining, + "**Moderator**: %s" % (user.guild.get_member(data.get("by")) or "Missing ID#%s" % data.get("by")), + ] + ) + del isolated[str(user.id)] + await self.config.guild(guild).ISOLATED.set(isolated) + + await ctx.send( + "That user doesn't have the %s role, but they still have a data entry. I removed it, " + "but in case it's needed, this is what was there:\n\n%s" % (role.name, data_fmt) + ) + elif role: + await ctx.send("That user doesn't have the %s role." % role.name) + else: + await ctx.send("The isolate role couldn't be found in this guild.") + + @isolate.command(name="reason") + @commands.guild_only() + @checks.mod() + async def isolate_reason(self, ctx, user: discord.Member, *, reason: str = None): + """ + Updates the reason for a Isolation, including the modlog if a case exists. + """ + guild = ctx.guild + isolated = await self.config.guild(guild).ISOLATED() + data = isolated.get(str(user.id), None) + + if not data: + await ctx.send( + "That user doesn't have an active Isolation entry. To update modlog " + "cases manually, use the `%sreason` command." % ctx.prefix + ) + return + + isolated[str(user.id)]["reason"] = reason + await self.config.guild(guild).ISOLATED.set(isolated) + + if reason: + msg = "Reason updated." + else: + msg = "Reason cleared." + + caseno = data.get("caseno") + try: + case = await modlog.get_case(caseno, guild, self.bot) + except: + msg += "\nMod case not found!" + case = None + + if case: + moderator = ctx.author + + try: + edits = {"reason": reason} + + if moderator.id != data.get("by"): + edits["amended_by"] = moderator + + edits["modified_at"] = ctx.message.created_at.timestamp() + + await case.edit(edits) + except: + msg += "\n" + warning("Mod case not modified due to error.") + + await ctx.send(msg) + + @commands.group() + @commands.guild_only() + @checks.admin_or_permissions(administrator=True) + async def isolateset(self, ctx): + pass + + @isolateset.command(name="nitro-role") + async def isolateset_nitro_role(self, ctx, *, role: str = None): + """ + Set nitro booster role so its not removed when isolateing. + If your server doesn't have a nitro role, run this command with the role string `no_nitro_role` + """ + guild = ctx.guild + current = await self.config.guild(guild).NITRO_ID() + current = role_from_string(guild, current) + + if role and role.lower() == "no_nitro_role": + await self.config.guild(guild).NITRO_ID.set(role) + await ctx.send("No nitro role set.") + return + + if not role and current: + await ctx.send(f"Nitro role set to {current}") + return + elif not role: + await ctx.send("No nitro role defined.") + return + + role = role_from_string(guild, role) + if not role: + await ctx.send("Role not found!") + return + + await self.config.guild(guild).NITRO_ID.set(role.id) + await ctx.send("Nitro role set!") + + @isolateset.command(name="sync-roles") + async def isolateset_sync_roles(self, ctx): + """ + Applies the remove-roles list to all isolated users + + This operation may take some time to complete, depending on the number of members. + """ + guild = ctx.guild + isolated = await self.config.guild(guild).ISOLATED() + role = await self.config.guild(guild).ROLE_ID() + nitro = await self.config.guild(guild).NITRO_ID() + role_memo = Memoizer(role_from_string, guild) + highest_role = guild.me.top_role + count = 0 + errors = 0 + + if not guild.me.guild_permissions.manage_roles: + await ctx.send(error("I need the Manage Roles permission to do that.")) + return + + # (re)populate the member cache + if guild.large: + await self.bot.request_offline_members(guild) + + for member_id, member_data in isolated.items(): + + member = guild.get_member(member_id) + + if not member: + continue + + member_roles = set(member.roles) + original_roles = member_roles.copy() + + try: + # Combine sets to get the baseline (roles they'd have normally) + member_roles |= set(role_memo.filter(member_data["removed_roles"], skip_nulls=True)) + except KeyError: + pass + + guild_remove_roles = [r for r in member_roles if r.id != role and r.id != nitro and r.name != "@everyone"] + + # update new removed roles with intersection of guild removal list and baseline + new_removed = guild_remove_roles & member_roles + isolated[str(member.id)]["removed_roles"] = [r.id for r in new_removed] + + member_roles -= guild_remove_roles + + # can't restore, so skip (remove from set) + for role in member_roles - original_roles: + if role >= highest_role: + member_roles.discard(role) + + # can't remove, so skip (re-add to set) + for role in original_roles - member_roles: + if role >= highest_role: + member_roles.add(role) + + # Now update roles if we need to + if member_roles != original_roles: + try: + await member.edit(roles=member_roles, reason="isolate sync roles") + except Exception: + log.exception(f"Couldn't modify roles in sync-roles command in {guild.name}!") + errors += 1 + else: + count += 1 + + msg = f"Updated {count} members' roles." + + if errors: + msg += "\n" + warning(f"{errors} errors occured; check the bot logs for more information.") + + await ctx.send(msg) + + @isolateset.command(name="setup") + async def isolateset_setup(self, ctx): + """ + (Re)configures the isolate role and channel overrides + """ + guild = ctx.guild + default_name = DEFAULT_ROLE_NAME + role_id = await self.config.guild(guild).ROLE_ID() + + if role_id: + role = discord.utils.get(guild.roles, id=role_id) + else: + role = discord.utils.get(guild.roles, name=default_name) + + perms = guild.me.guild_permissions + if not perms.manage_roles and perms.manage_channels: + await ctx.send("I need the Manage Roles and Manage Channels permissions for that command to work.") + return + + if not role: + msg = "The %s role doesn't exist; Creating it now... " % default_name + + msgobj = await ctx.send(msg) + + perms = discord.Permissions.none() + role = await guild.create_role(name=default_name, permissions=perms, reason="isolate cog.") + else: + msgobj = await ctx.send("%s role exists... " % role.name) + + if role.position != (guild.me.top_role.position - 1): + if role < guild.me.top_role: + await msgobj.edit(content=msgobj.content + "moving role to higher position... ") + await role.edit(position=guild.me.top_role.position - 1) + else: + await msgobj.edit( + content=msgobj.content + "role is too high to manage." " Please move it to below my highest role." + ) + return + + await msgobj.edit(content=msgobj.content + "(re)configuring channels... ") + + for channel in guild.channels: + await self.setup_channel(channel, role) + + await msgobj.edit(content=msgobj.content + "done.") + + if role and role.id != role_id: + await self.config.guild(guild).ROLE_ID.set(role.id) + + @isolateset.command(name="channel") + async def isolateset_channel(self, ctx, channel: discord.TextChannel = None): + """ + Sets or shows the isolation "timeout" channel. + + This channel has special settings to allow isolated users to discuss their + infraction(s) with moderators. + + If there is a role deny on the channel for the isolate role, it is + automatically set to allow. If the default permissions don't allow the + isolated role to see or speak in it, an overwrite is created to allow + them to do so. + """ + guild = ctx.guild + current = await self.config.guild(guild).CHANNEL_ID() + current = current and guild.get_channel(current) + + if channel is None: + if not current: + await ctx.send("No timeout channel has been set.") + else: + await ctx.send("The timeout channel is currently %s." % current.mention) + else: + if current == channel: + await ctx.send( + "The timeout channel is already %s. If you need to repair its permissions, use `%sisolateset setup`." + % (current.mention, ctx.prefix) + ) + return + + await self.config.guild(guild).CHANNEL_ID.set(channel.id) + + role = await self.get_role(guild, create=True) + update_msg = "{} to the %s role" % role + grants = [] + denies = [] + perms = permissions_for_roles(channel, role) + overwrite = channel.overwrites_for(role) or discord.PermissionOverwrite() + + for perm, value in DEFAULT_TIMEOUT_OVERWRITE: + if value is None: + continue + + if getattr(perms, perm) != value: + setattr(overwrite, perm, value) + name = perm.replace("_", " ").title().replace("Tts", "TTS") + + if value: + grants.append(name) + else: + denies.append(name) + + # Any changes made? Apply them. + if grants or denies: + grants = grants and ("grant " + format_list(*grants)) + denies = denies and ("deny " + format_list(*denies)) + to_join = [x for x in (grants, denies) if x] + update_msg = update_msg.format(format_list(*to_join)) + + if current and current.id != channel.id: + if current.permissions_for(guild.me).manage_roles: + msg = info("Resetting permissions in the old channel (%s) to the default...") + else: + msg = error("I don't have permissions to reset permissions in the old channel (%s)") + + await ctx.send(msg % current.mention) + await self.setup_channel(current, role) + + if channel.permissions_for(guild.me).manage_roles: + await ctx.send(info("Updating permissions in %s to %s..." % (channel.mention, update_msg))) + await channel.set_permissions(role, overwrite=overwrite) + else: + await ctx.send(error("I don't have permissions to %s." % update_msg)) + + await ctx.send("Timeout channel set to %s." % channel.mention) + + @isolateset.command(name="clear-channel") + async def isolateset_clear_channel(self, ctx): + """ + Clears the timeout channel and resets its permissions + """ + guild = ctx.guild + current = await self.config.guild(guild).CHANNEL_ID() + current = current and guild.get_channel(current) + + if current: + msg = None + await self.config.guild(guild).CHANNEL_ID.set(None) + + if current.permissions_for(guild.me).manage_roles: + role = await self.get_role(guild, quiet=True) + await self.setup_channel(current, role) + msg = " and its permissions reset" + else: + msg = ", but I don't have permissions to reset its permissions." + + await ctx.send("Timeout channel has been cleared%s." % msg) + else: + await ctx.send("No timeout channel has been set yet.") + + @isolateset.command(name="case-min") + async def isolateset_case_min(self, ctx, *, timespec: str = None): + """ + Set/disable or display the minimum isolation case duration + + If the isolation duration is less than this value, a case will not be created. + Specify 'disable' to turn off case creation altogether. + """ + guild = ctx.guild + current = await self.config.guild(guild).CASE_MIN_LENGTH() + + if not timespec: + if current: + await ctx.send("Isolations longer than %s will create cases." % generate_timespec(current)) + else: + await ctx.send("Isolation case creation is disabled.") + else: + if timespec.strip("'\"").lower() == "disable": + value = None + else: + try: + value = parse_time(timespec) + except BadTimeExpr as e: + await ctx.send(error(e.args[0])) + return + + await self.config.guild(guild).CASE_MIN_LENGTH.set(value) + + await ctx.send("Isolations longer than %s will create cases." % generate_timespec(value)) + + @isolateset.command(name="overrides") + async def isolateset_overrides(self, ctx, *, channel_id: int = None): + """ + Copy or display the isolate role overrides + + If a channel id is specified, the allow/deny settings for it are saved + and applied to new channels when they are created. To apply the new + settings to existing channels, use [p]isolateset setup. + + An important caveat: voice channel and text channel overrides are + configured separately! To set the overrides for a channel type, + specify the name of or mention a channel of that type. + """ + + guild = ctx.guild + role = await self.get_role(guild, quiet=True) + timeout_channel_id = await self.config.guild(guild).CHANNEL_ID() + confirm_msg = None + channel = guild.get_channel(channel_id) + + if not role: + await ctx.send(error("Isolate role has not been created yet. Run `%sisolateset setup` first." % ctx.prefix)) + return + + if channel: + overwrite = channel.overwrites_for(role) + if channel.id == timeout_channel_id: + confirm_msg = "Are you sure you want to copy overrides from the timeout channel?" + elif overwrite is None: + overwrite = discord.PermissionOverwrite() + confirm_msg = "Are you sure you want to copy blank (no permissions set) overrides?" + else: + confirm_msg = "Are you sure you want to copy overrides from this channel?" + + if channel.type is discord.ChannelType.text: + key = "text" + elif channel.type is discord.ChannelType.voice: + key = "voice" + else: + await ctx.send(error("Unknown channel type!")) + return + + if confirm_msg: + await ctx.send(warning(confirm_msg + "(reply `yes` within 30s to confirm)")) + + def check(m): + return m.author == ctx.author and m.channel == ctx.channel + + try: + reply = await self.bot.wait_for("message", check=check, timeout=30.0) + if reply.content.strip(" `\"'").lower() != "yes": + await ctx.send("Commmand cancelled.") + return + except asyncio.TimeoutError: + await ctx.send("Timed out waiting for a response.") + return + + if key == "text": + await self.config.guild(guild).TEXT_OVERWRITE.set(overwrite_to_dict(overwrite)) + else: + await self.config.guild(guild).VOICE_OVERWRITE.set(overwrite_to_dict(overwrite)) + + await ctx.send( + "{} channel overrides set to:\n".format(key.title()) + + format_permissions(overwrite) + + "\n\nRun `%sisolateset setup` to apply them to all channels." % ctx.prefix + ) + else: + msg = [] + for key in ("text", "voice"): + if key == "text": + data = await self.config.guild(guild).TEXT_OVERWRITE() + else: + data = await self.config.guild(guild).VOICE_OVERWRITE() + title = "%s permission overrides:" % key.title() + + if data == overwrite_to_dict(DEFAULT_TEXT_OVERWRITE) or data == overwrite_to_dict( + DEFAULT_VOICE_OVERWRITE + ): + title = title[:-1] + " (defaults):" + + msg.append(bold(title) + "\n" + format_permissions(overwrite_from_dict(data))) + + await ctx.send("\n\n".join(msg)) + + @isolateset.command(name="reset-overrides") + async def isolateset_reset_overrides(self, ctx, channel_type: str = "both"): + """ + Resets the isolate role overrides for text, voice or both (default) + + This command exists in case you want to restore the default settings + for newly created channels. + """ + + channel_type = channel_type.strip("`\"' ").lower() + + msg = [] + for key in ("text", "voice"): + if channel_type not in ["both", key]: + continue + + title = "%s permission overrides reset to:" % key.title() + + if key == "text": + await self.config.guild(guild).TEXT_OVERWRITE.set(overwrite_to_dict(DEFAULT_TEXT_OVERWRITE)) + msg.append(bold(title) + "\n" + format_permissions(overwrite_to_dict(DEFAULT_TEXT_OVERWRITE))) + else: + await self.config.guild(guild).VOICE_OVERWRITE.set(overwrite_to_dict(DEFAULT_VOICE_OVERWRITE)) + msg.append(bold(title) + "\n" + format_permissions(overwrite_to_dict(DEFAULT_VOICE_OVERWRITE))) + + if not msg: + await ctx.send("Invalid channel type. Use `text`, `voice`, or `both` (the default, if not specified)") + return + + msg.append("Run `%sisolateset setup` to apply them to all channels." % ctx.prefix) + + await ctx.send("\n\n".join(msg)) + + async def get_role(self, guild, quiet=False, create=False): + role_id = await self.config.guild(guild).ROLE_ID() + + if role_id: + role = discord.utils.get(guild.roles, id=role_id) + else: + role = discord.utils.get(guild.roles, name=DEFAULT_ROLE_NAME) + + if create and not role: + perms = guild.me.guild_permissions + if not perms.manage_roles and perms.manage_channels: + await ctx.send("The Manage Roles and Manage Channels permissions are required to use this command.") + return + + else: + msg = "The %s role doesn't exist; Creating it now..." % DEFAULT_ROLE_NAME + + if not quiet: + msgobj = await ctx.send(msg) + + log.debug("Creating isolate role in %s" % guild.name) + perms = discord.Permissions.none() + role = await guild.create_role(name=DEFAULT_ROLE_NAME, permissions=perms, reason="isolate cog.") + await role.edit(position=guild.me.top_role.position - 1) + + if not quiet: + await msgobj.edit(content=msgobj.content + "\nconfiguring channels... ") + + for channel in guild.channels: + await self.setup_channel(channel, role) + + if not quiet: + await msgobj.edit(content=msgobj.content + "\ndone.") + + if role and role.id != role_id: + await self.config.guild(guild).ROLE_ID.set(role.id) + + return role + + async def setup_channel(self, channel, role): + guild = channel.guild + timeout_channel_id = await self.config.guild(guild).CHANNEL_ID() + + if channel.id == timeout_channel_id: + # maybe this will be used later: + # config = settings.get('TIMEOUT_OVERWRITE') + config = None + defaults = DEFAULT_TIMEOUT_OVERWRITE + elif channel.type is discord.ChannelType.voice: + config = await self.config.guild(guild).VOICE_OVERWRITE() + defaults = DEFAULT_VOICE_OVERWRITE + else: + config = await self.config.guild(guild).TEXT_OVERWRITE() + defaults = DEFAULT_TEXT_OVERWRITE + + if config: + perms = overwrite_from_dict(config) + else: + perms = defaults + + await channel.set_permissions(role, overwrite=perms, reason="isolate cog") + + async def on_load(self): + await self.bot.wait_until_ready() + + for guild in self.bot.guilds: + me = guild.me + role = await self.get_role(guild, quiet=True, create=True) + + if not role: + log.error("Needed to create isolate role in %s, but couldn't." % guild.name) + continue + + role_memo = Memoizer(role_from_string, guild) + isolated = await self.config.guild(guild).ISOLATED() + + for member_id, data in isolated.items(): + + until = data["until"] + member = guild.get_member(member_id) + + if until and (until - time.time()) < 0: + if member: + reason = "Isolation removal overdue, maybe the bot was offline. " + + if data["reason"]: + reason += data["reason"] + + await self._unisolate(member, reason=reason) + else: # member disappeared + del isolated[str(member_id)] + elif member: + # re-check roles + user_roles = set(member.roles) + removed_roles = set(role_memo.filter(data.get("removed_roles", ()), skip_nulls=True)) + removed_roles = user_roles & {r for r in removed_roles if r < me.top_role} + user_roles -= removed_roles + + apply_roles = removed_roles + + if role not in user_roles: + if role >= me.top_role: + log.error("Needed to re-add isolate role to %s in %s, but couldn't." % (member, guild.name)) + else: + user_roles.add(role) # add isolate role to the set + apply_roles = True + + if apply_roles: + await member.edit(roles=member_roles, reason="isolate ending") + + if until: + await self.schedule_unisolate(until, member) + + while True: + try: + async with self.queue_lock: + while await self.process_queue_event(): + pass + + await asyncio.sleep(5) + + except asyncio.CancelledError: + break + except Exception: + pass + + log.debug("queue manager dying") + + while not self.queue.empty(): + self.queue.get_nowait() + + for fut in self.pending.values(): + fut.cancel() + + async def cancel_queue_event(self, *args) -> bool: + if args in self.pending: + self.pending.pop(args).cancel() + return True + else: + events = [] + removed = None + + async with self.queue_lock: + while not self.queue.empty(): + item = self.queue.get_nowait() + + if args == item[1:]: + removed = item + break + else: + events.append(item) + + for item in events: + self.queue.put_nowait(item) + + return removed is not None + + async def put_queue_event(self, run_at: float, *args): + diff = run_at - time.time() + + if args in self.enqueued: + return False + + self.enqueued.add(args) + + if diff < 0: + self.execute_queue_event(*args) + elif run_at - time.time() < QUEUE_TIME_CUTOFF: + self.pending[args] = self.bot.loop.call_later(diff, self.execute_queue_event, *args) + else: + await self.queue.put((run_at, *args)) + + async def process_queue_event(self): + if self.queue.empty(): + return False + + now = time.time() + item = await self.queue.get() + next_time, *args = item + + diff = next_time - now + + if diff < 0: + if self.execute_queue_event(*args): + return + elif diff < QUEUE_TIME_CUTOFF: + self.pending[args] = self.bot.loop.call_later(diff, self.execute_queue_event, *args) + return True + + await self.queue.put(item) + return False + + def execute_queue_event(self, *args) -> bool: + self.enqueued.discard(args) + + try: + return self.execute_unisolate(*args) + except Exception: + log.exception("failed to execute scheduled event") + + async def _isolate_cmd_common(self, ctx, member, duration, reason, quiet=False): + guild = ctx.guild + using_default = False + updating_case = False + case_error = None + + isolated = await self.config.guild(guild).ISOLATED() + current = isolated.get(str(member.id), {}) + reason = reason or current.get("reason") # don't clear if not given + hierarchy_allowed = ctx.author.top_role > member.top_role + case_min_length = await self.config.guild(guild).CASE_MIN_LENGTH() + nitro_role = await self.config.guild(guild).NITRO_ID() + + if nitro_role is None: + await ctx.send(f"Please set the nitro role using `{ctx.prefix}isolateset nitro-role`") + return + + if member == guild.me: + await ctx.send("You can't isolate the bot.") + return + + if duration and duration.lower() in ["forever", "inf", "infinite"]: + duration = None + else: + if not duration: + using_default = True + duration = DEFAULT_TIMEOUT + try: + duration = parse_time(duration) + if duration < 1: + await ctx.send("Duration must be 1 second or longer.") + return False + except BadTimeExpr as e: + await ctx.send("Error parsing duration: %s." % e.args) + return False + + role = await self.get_role(guild, quiet=quiet, create=True) + + if role is None: + return + elif role >= guild.me.top_role: + await ctx.send("The %s role is too high for me to manage." % role) + return + + # Call time() after getting the role due to potential creation delay + now = time.time() + until = (now + duration + 0.5) if duration else None + duration_ok = (case_min_length is not None) and ((duration is None) or duration >= case_min_length) + + if duration_ok: + now_date = datetime.utcfromtimestamp(now) + mod_until = until and datetime.utcfromtimestamp(until) + + try: + if current: + case_number = current.get("caseno") + try: + case = await modlog.get_case(case_number, guild, self.bot) + except: # shouldn't happen + await ctx.send( + warning( + "Error, modlog case not found, but user is isolated with case.\nTry unisolating and isolating again." + ) + ) + return + + moderator = ctx.author + + try: + edits = {"reason": reason} + + if moderator.id != current.get("by"): + edits["amended_by"] = moderator + + edits["modified_at"] = ctx.message.created_at.timestamp() + + await case.edit(edits) + except Exception as e: + await ctx.send(warning(f"Couldn't edit case: {e}")) + return + + updating_case = True + + else: + case = await modlog.create_case( + self.bot, + guild, + now_date, + "Timed Mute", + member, + moderator=ctx.author, + reason=reason, + until=mod_until, + ) + case_number = case.case_number + + except Exception as e: + case_error = e + else: + case_number = None + + subject = "the %s role" % role.name + + if str(member.id) in isolated: + if role in member.roles: + msg = "{0} already had the {1.name} role; resetting their timer." + else: + msg = "{0} is missing the {1.name} role for some reason. I added it and reset their timer." + elif role in member.roles: + msg = "{0} already had the {1.name} role, but had no timer; setting it now." + else: + msg = "Applied the {1.name} role to {0}." + subject = "it" + + msg = msg.format(member, role) + + if duration: + timespec = generate_timespec(duration) + + if using_default: + timespec += " (the default)" + + msg += " I will remove %s in %s." % (subject, timespec) + + if case_error: + if isinstance(case_error, CaseMessageNotFound): + case_error = "the case message could not be found" + elif isinstance(case_error, NoModLogAccess): + case_error = "I do not have access to the modlog channel" + else: + case_error = None + + if case_error: + verb = "updating" if updating_case else "creating" + msg += "\n\n" + warning("There was an error %s the modlog case: %s." % (verb, case_error)) + elif case_number: + verb = "updated" if updating_case else "created" + msg += " I also %s case #%i in the modlog." % (verb, case_number) + + voice_overwrite = await self.config.guild(guild).VOICE_OVERWRITE() + + if voice_overwrite: + voice_overwrite = overwrite_from_dict(voice_overwrite) + else: + voice_overwrite = DEFAULT_VOICE_OVERWRITE + + overwrite_denies_speak = (voice_overwrite.speak is False) or (voice_overwrite.connect is False) + + # remove all roles from user that are specified in remove_role_list, only if its a new isolati + if str(member.id) not in isolated: + user_roles = {r for r in member.roles if r.name != "@everyone"} + removed_roles = user_roles.copy() + if nitro_role != "no_nitro_role": + nitro_role = role_from_string(guild, nitro_role) + removed_roles.discard(nitro_role) + + # build lists of roles that *should* be removed and ones that *can* be + too_high_to_remove = {r for r in removed_roles if r >= guild.me.top_role} + user_roles -= removed_roles - too_high_to_remove + user_roles.add(role) # add isolate role to the set + await member.edit(roles=user_roles, reason=f"isolate {member}") + + else: + removed_roles = set(resolve_role_list(guild, current.get("removed_roles", []))) + too_high_to_remove = {r for r in removed_roles if r >= guild.me.top_role} + + if removed_roles: + actually_removed = removed_roles - too_high_to_remove + if actually_removed: + msg += "\nRemoved roles: {}".format(format_list(*(r.name for r in actually_removed))) + + if too_high_to_remove: + fmt_list = format_list(*(r.name for r in removed_roles)) + msg += "\n" + warning( + "These roles were too high to remove (fix hierarchy, then run " + "`{}isolateset sync-roles`): {}".format(ctx.prefix, fmt_list) + ) + if member.voice: + muted = member.voice.mute + else: + muted = False + + async with self.config.guild(guild).ISOLATED() as isolated: + isolated[str(member.id)] = { + "start": current.get("start") or now, # don't override start time if updating + "until": until, + "by": current.get("by") or ctx.author.id, # don't override original moderator + "reason": reason, + "unmute": overwrite_denies_speak and not muted, + "caseno": case_number, + "removed_roles": [r.id for r in removed_roles], + } + + if member.voice and overwrite_denies_speak: + if member.voice.channel: + await member.edit(mute=True, deafen=True) + + # schedule callback for role removal + if until: + await self.schedule_unisolate(until, member) + + if not quiet: + await ctx.send(msg) + + return True + + # Functions related to unisolateing + + async def schedule_unisolate(self, until, member): + """ + Schedules role removal, canceling and removing existing tasks if present + """ + + await self.put_queue_event(until, member.guild.id, member.id) + + def execute_unisolate(self, guild_id, member_id) -> bool: + guild = self.bot.get_guild(guild_id) + + if not guild: + return False + + member = guild.get_member(member_id) + + if member: + self.bot.loop.create_task(self._unisolate(member)) + return True + else: + self.bot.loop.create_task(self.bot.request_offline_members(guild)) + return False + + async def _unisolate(self, member, reason=None, apply_roles=True, update=False, moderator=None, quiet=False) -> bool: + """ + Remove isolate role, delete record and task handle + """ + guild = member.guild + role = await self.get_role(guild, quiet=True) + nitro_role = await self.config.guild(guild).NITRO_ID() + + if role: + data = await self.config.guild(guild).ISOLATED() + member_data = data.get(str(member.id), {}) + caseno = member_data.get("caseno") + removed_roles = set(resolve_role_list(guild, member_data.get("removed_roles", []))) + + # Has to be done first to prevent triggering listeners + await self._unisolate_data(member) + await self.cancel_queue_event(member.guild.id, member.id) + + if apply_roles: + + # readd removed roles from user, by replacing user's roles with all of their roles plus the ones that + # were removed (and can be re-added), minus the isolate role + user_roles = set(member.roles) + too_high_to_restore = {r for r in removed_roles if r >= guild.me.top_role} + removed_roles -= too_high_to_restore + user_roles |= removed_roles + user_roles.discard(role) + await member.edit(roles=user_roles, reason="isolate end") + + if update and caseno: + until = member_data.get("until") or False + # fallback gracefully + moderator = moderator or guild.get_member(member_data.get("by")) or guild.me + + if until: + until = datetime.utcfromtimestamp(until).timestamp() + + edits = {"reason": reason} + + if moderator.id != data.get("by"): + edits["amended_by"] = moderator + + edits["modified_at"] = time.time() + edits["until"] = until + + try: + case = await modlog.get_case(caseno, guild, self.bot) + await case.edit(edits) + except Exception: + pass + + if member_data.get("unmute", False): + if member.voice: + if member.voice.channel: + await member.edit(mute=False, deafen=False) + else: + async with self.config.guild(guild).PENDING_UNMUTE() as unmute_list: + if member.id not in unmute_list: + unmute_list.append(member.id) + + if quiet: + return True + + msg = "Your Isolation in %s has ended." % member.guild.name + + if reason: + msg += "\nReason: %s" % reason + + if removed_roles: + msg += "\n\nRestored roles: {}.".format(format_list(*(r.name for r in removed_roles))) + + if too_high_to_restore: + fmt_list = format_list(*(r.name for r in too_high_to_restore)) + msg += "\n" + warning( + "These roles were too high for me to restore: {}. " "Ask a mod for help.".format(fmt_list) + ) + + try: + await member.send(msg) + return True + except Exception: + return False + + async def _unisolate_data(self, member): + """Removes isolate data entry and cancels any present callback""" + guild = member.guild + + async with self.config.guild(guild).ISOLATED() as isolated: + if str(member.id) in isolated: + del isolated[str(member.id)] + + # Listeners + @commands.Cog.listener() + async def on_guild_channel_create(self, channel): + """Run when new channels are created and set up role permissions""" + role = await self.get_role(channel.guild, quiet=True) + if not role: + return + + await self.setup_channel(channel, role) + + @commands.Cog.listener() + async def on_member_update(self, before, after): + """Remove scheduled unisolate when manually removed role""" + try: + assert before.roles != after.roles + guild_data = await self.config.guild(before.guild).ISOLATED() + member_data = guild_data[str(before.id)] + role = await self.get_role(before.guild, quiet=True) + assert role + except (KeyError, AssertionError): + return + + new_roles = {role.id: role for role in after.roles} + + if role in before.roles and role.id not in new_roles: + msg = "Isolation manually ended early by a moderator/admin." + + if member_data["reason"]: + msg += "\nReason was: " + member_data["reason"] + + await self._unisolate(after, reason=msg, update=True) + else: + to_remove = {new_roles.get(role_id) for role_id in member_data.get("removed_roles", [])} + to_remove = [r for r in to_remove if r and r < after.guild.me.top_role] + + if to_remove: + await after.remove_roles(*to_remove) + + @commands.Cog.listener() + async def on_member_join(self, member): + """Restore Isolation if isolated user leaves/rejoins""" + guild = member.guild + isolated = await self.config.guild(guild).ISOLATED() + data = isolated.get(str(member.id), {}) + + if not data: + return + + # give other tools a chance to settle, then re-fetch data just in case + await asyncio.sleep(1) + member = self.bot.get_guild(guild.id).get_member(member.id) + role = await self.get_role(member.guild, quiet=True) + + until = data["until"] + duration = until - time.time() + + if role and duration > 0: + await self.schedule_unisolate(until, member) + + if role not in member.roles: + await member.add_roles(role) + + @commands.Cog.listener() + async def on_voice_state_update(self, member, before, after): + if not after.channel: + return + + guild = member.guild + data = await self.config.guild(guild).ISOLATED() + member_data = data.get(str(member.id), {}) + unmute_list = await self.config.guild(guild).PENDING_UNMUTE() + + if member_data and not after.mute: + await member.edit(mute=True, deafen=True) + elif member.id in unmute_list: + await member.edit(mute=False, deafen=False) + if member.id in unmute_list: + unmute_list.remove(member.id) + + await self.config.guild(guild).PENDING_UNMUTE.set(unmute_list) + + @commands.Cog.listener() + async def on_member_ban(self, member): + """Remove Isolation record when member is banned.""" + guild = member.guild + data = await self.config.guild(guild).ISOLATED() + member_data = data.get(str(member.id)) + + if member_data is None: + return + + msg = "Isolation ended early due to ban." + + if member_data.get("reason"): + msg += "\n\nOriginal reason was: " + member_data["reason"] + + await self._unisolate(member, reason=msg, apply_roles=False, update=True, quiet=True) diff --git a/isolate/memoizer.py b/isolate/memoizer.py new file mode 100644 index 0000000..811f953 --- /dev/null +++ b/isolate/memoizer.py @@ -0,0 +1,40 @@ +class Memoizer: + """ + General purpose cache for function results. Appends positional args, overlays kwargs. Both must be hashable. + """ + + __slots__ = ["_cache", "_func", "_args", "_kwargs"] + + def __init__(self, func, *args, **kwargs): + self._cache = {} + self._func = func + self._args = args + self._kwargs = kwargs + + def clear(self): + "clears the internal arg -> result cache" + self._cache.clear() + + def filter(self, iterable, *, skip_nulls=False): + "Calls the function on each item in the passed iterable. Only one positional arg at a time is supported." + gen = map(self, iterable) + + if skip_nulls: + return filter(None, gen) + else: + return gen + + def __call__(self, *args, **kwargs): + if kwargs: + key = (*args, tuple(kwargs.items())) + call_kwargs = self._kwargs.copy() + call_kwargs.update(kwargs) + else: + key = args + call_kwargs = self._kwargs + + try: + return self._cache[key] + except KeyError: + ret = self._cache[key] = self._func(*self._args, *args, **call_kwargs) + return ret diff --git a/isolate/utils.py b/isolate/utils.py new file mode 100644 index 0000000..cc493e4 --- /dev/null +++ b/isolate/utils.py @@ -0,0 +1,221 @@ +import re +import discord + +UNIT_TABLE = ( + (("weeks", "wks", "w"), 60 * 60 * 24 * 7), + (("days", "dys", "d"), 60 * 60 * 24), + (("hours", "hrs", "h"), 60 * 60), + (("minutes", "mins", "m"), 60), + (("seconds", "secs", "s"), 1), +) + + +class BadTimeExpr(Exception): + pass + + +def _find_unit(unit): + for names, length in UNIT_TABLE: + if any(n.startswith(unit) for n in names): + return names, length + raise BadTimeExpr("Invalid unit: %s" % unit) + + +def parse_time(time): + time = time.lower() + if not time.isdigit(): + time = re.split(r"\s*([\d.]+\s*[^\d\s,;]*)(?:[,;\s]|and)*", time) + time = sum(map(_timespec_sec, filter(None, time))) + return int(time) + + +def _timespec_sec(expr): + atoms = re.split(r"([\d.]+)\s*([^\d\s]*)", expr) + atoms = list(filter(None, atoms)) + + if len(atoms) > 2: # This shouldn't ever happen + raise BadTimeExpr("invalid expression: '%s'" % expr) + elif len(atoms) == 2: + names, length = _find_unit(atoms[1]) + if atoms[0].count(".") > 1 or not atoms[0].replace(".", "").isdigit(): + raise BadTimeExpr("Not a number: '%s'" % atoms[0]) + else: + names, length = _find_unit("seconds") + + try: + return float(atoms[0]) * length + except ValueError: + raise BadTimeExpr("invalid value: '%s'" % atoms[0]) + + +def generate_timespec(sec: int, short=False, micro=False) -> str: + timespec = [] + sec = int(sec) + neg = sec < 0 + sec = abs(sec) + + for names, length in UNIT_TABLE: + n, sec = divmod(sec, length) + + if n: + if micro: + s = "%d%s" % (n, names[2]) + elif short: + s = "%d%s" % (n, names[1]) + else: + s = "%d %s" % (n, names[0]) + + if n <= 1 and not (micro and names[2] == "s"): + s = s.rstrip("s") + + timespec.append(s) + + if len(timespec) > 1: + if micro: + spec = "".join(timespec) + + segments = timespec[:-1], timespec[-1:] + spec = " and ".join(", ".join(x) for x in segments) + elif timespec: + spec = timespec[0] + else: + return "0" + + if neg: + spec += " ago" + + return spec + + +def format_list(*items, join="and", delim=", "): + if len(items) > 1: + return (" %s " % join).join((delim.join(items[:-1]), items[-1])) + elif items: + return items[0] + else: + return "" + + +def overwrite_to_dict(overwrite): + allow, deny = overwrite.pair() + return {"allow": allow.value, "deny": deny.value} + + +def format_permissions(permissions, include_null=False): + entries = [] + + for perm, value in sorted(permissions, key=lambda t: t[0]): + if value is True: + symbol = "\N{WHITE HEAVY CHECK MARK}" + elif value is False: + symbol = "\N{NO ENTRY SIGN}" + elif include_null: + symbol = "\N{RADIO BUTTON}" + else: + continue + + entries.append(symbol + " " + perm.replace("_", " ").title().replace("Tts", "TTS")) + + if entries: + return "\n".join(entries) + else: + return "No permission entries." + + +def getmname(mid, guild): + member = discord.utils.get(guild.members, id=int(mid)) + + if member: + return str(member) + else: + return "(absent user #%s)" % mid + + +def role_from_string(guild, rolename, roles=None): + if rolename is None: + return None + + if roles is None: + roles = guild.roles + else: + roles = [r for r in roles if isinstance(r, discord.Role)] + + if type(rolename) == int: + role = discord.utils.get(roles, id=rolename) + + if role: + return role + + rolename = rolename.lower() + role = discord.utils.find(lambda r: r.name.lower() == rolename, roles) + + return role + + +def resolve_role_list(guild: discord.guild, roles: list) -> list: + gen = (role_from_string(guild, name) for name in roles) + return list(filter(None, gen)) + + +def permissions_for_roles(channel, *roles): + """ + Calculates the effective permissions for a role or combination of roles. + Naturally, if no roles are given, the default role's permissions are used + """ + default = channel.guild.default_role + base = discord.Permissions(default.permissions.value) + + # Apply all role values + for role in roles: + base.value |= role.permissions.value + + # guild-wide Administrator -> True for everything + # Bypass all channel-specific overrides + if base.administrator: + return discord.Permissions.all() + + role_ids = set(map(lambda r: r.id, roles)) + denies = 0 + allows = 0 + + # Apply channel specific role permission overwrites + for target, overwrite in channel.overwrites.items(): + # Handle default role first, if present + if overwrite.id == default.id: + base.handle_overwrite(allow=overwrite.allow, deny=overwrite.deny) + + if isinstance(target, discord.Role) and target.id in role_ids: + denies |= overwrite.deny + allows |= overwrite.allow + + base.handle_overwrite(allow=allows, deny=denies) + + # default channels can always be read + if channel.is_default: + base.read_messages = True + + # if you can't send a message in a channel then you can't have certain + # permissions as well + if not base.send_messages: + base.send_tts_messages = False + base.mention_everyone = False + base.embed_links = False + base.attach_files = False + + # if you can't read a channel then you have no permissions there + if not base.read_messages: + denied = discord.Permissions.all_channel() + base.value &= ~denied.value + + # text channels do not have voice related permissions + if channel.type is discord.ChannelType.text: + denied = discord.Permissions.voice() + base.value &= ~denied.value + + return base + + +def overwrite_from_dict(data): + allow = discord.Permissions(data.get("allow", 0)) + deny = discord.Permissions(data.get("deny", 0)) + return discord.PermissionOverwrite.from_pair(allow, deny) diff --git a/punish/punish.py b/punish/punish.py index 953c759..2df0b12 100644 --- a/punish/punish.py +++ b/punish/punish.py @@ -1199,9 +1199,9 @@ class Punish(commands.Cog): if str(member.id) not in punished: if nitro_role != "no_nitro_role": nitro_role = role_from_string(guild, nitro_role) - user_roles = {r for r in member.roles if r is not nitro_role} - else: - user_roles = {r for r in member.roles} + remove_role_set.discard(nitro_role) + + user_roles = set(member.roles) # build lists of roles that *should* be removed and ones that *can* be removed_roles = user_roles & remove_role_set too_high_to_remove = {r for r in removed_roles if r >= guild.me.top_role} @@ -1296,15 +1296,10 @@ class Punish(commands.Cog): await self.cancel_queue_event(member.guild.id, member.id) if apply_roles: - if nitro_role != "no_nitro_role": - nitro_role = role_from_string(guild, nitro_role) - user_roles = {r for r in member.roles if r is not nitro_role} - else: - user_roles = {r for r in member.roles} # readd removed roles from user, by replacing user's roles with all of their roles plus the ones that # were removed (and can be re-added), minus the punish role - user_roles = {r for r in member.roles if r is not nitro_role} + user_roles = set(member.roles) too_high_to_restore = {r for r in removed_roles if r >= guild.me.top_role} removed_roles -= too_high_to_restore user_roles |= removed_roles diff --git a/punish/utils.py b/punish/utils.py index 0a77ab1..cc493e4 100644 --- a/punish/utils.py +++ b/punish/utils.py @@ -179,12 +179,12 @@ def permissions_for_roles(channel, *roles): allows = 0 # Apply channel specific role permission overwrites - for overwrite in channel.overwrites: + for target, overwrite in channel.overwrites.items(): # Handle default role first, if present if overwrite.id == default.id: base.handle_overwrite(allow=overwrite.allow, deny=overwrite.deny) - if overwrite.type == "role" and overwrite.id in role_ids: + if isinstance(target, discord.Role) and target.id in role_ids: denies |= overwrite.deny allows |= overwrite.allow