# redbot/discord from redbot.core.utils.chat_formatting import * from redbot.core.utils import mod from redbot.core.utils.menus import menu, DEFAULT_CONTROLS from redbot.core import Config, checks, commands, modlog import discord from .utils import * from .memoizer import Memoizer # general import asyncio from datetime import datetime from typing import Literal import inspect import logging import time import textwrap log = logging.getLogger("red.isolate") __version__ = "3.0.0" PURGE_MESSAGES = 1 # for cisolate DEFAULT_ROLE_NAME = "Isolated" DEFAULT_TEXT_OVERWRITE = discord.PermissionOverwrite( send_messages=False, send_tts_messages=False, add_reactions=False, read_messages=False ) DEFAULT_VOICE_OVERWRITE = discord.PermissionOverwrite(speak=False, connect=False, view_channel=False) DEFAULT_TIMEOUT_OVERWRITE = discord.PermissionOverwrite( send_messages=True, read_messages=True, read_message_history=True ) QUEUE_TIME_CUTOFF = 30 DEFAULT_TIMEOUT = "5m" DEFAULT_CASE_MIN_LENGTH = "5m" # only create modlog cases when length is longer than this class Isolate(commands.Cog): """ Put misbehaving users in timeout where they are unable to speak, read, or do other things that can be denied using discord permissions. Includes auto-setup and more. """ def __init__(self, bot): super().__init__() self.bot = bot self.config = Config.get_conf(self, identifier=1574368792) # config default_guild = { "ISOLATED": {}, "CASE_MIN_LENGTH": parse_time(DEFAULT_CASE_MIN_LENGTH), "PENDING_UNMUTE": [], "TEXT_OVERWRITE": overwrite_to_dict(DEFAULT_TEXT_OVERWRITE), "VOICE_OVERWRITE": overwrite_to_dict(DEFAULT_VOICE_OVERWRITE), "ROLE_ID": None, "NITRO_ID": None, "CHANNEL_ID": None, } self.config.register_guild(**default_guild) # queue variables self.queue = asyncio.PriorityQueue() self.queue_lock = asyncio.Lock() self.pending = {} self.enqueued = set() self.task = asyncio.create_task(self.on_load()) def cog_unload(self): self.task.cancel() async def initialize(self): await self.register_casetypes() @staticmethod async def register_casetypes(): # register mod case isolate_case = { "name": "Timed Mute", "default_setting": True, "image": "\N{HOURGLASS WITH FLOWING SAND}\N{SPEAKER WITH CANCELLATION STROKE}", "case_str": "Timed Mute", } try: await modlog.register_casetype(**isolate_case) except RuntimeError: pass @commands.group(invoke_without_command=True) @commands.guild_only() @checks.mod() async def isolate(self, ctx, user: discord.Member, duration: str = None, *, reason: str = None): """ Puts a user into timeout for a specified time, with optional reason. Time specification is any combination of number with the units s,m,h,d,w. Example: !isolate @idiot 1.1h10m Breaking rules """ if ctx.invoked_subcommand: return elif user: await self._isolate_cmd_common(ctx, user, duration, reason) @isolate.command(name="cstart") @commands.guild_only() @checks.mod() async def isolate_cstart(self, ctx, user: discord.Member, duration: str = None, *, reason: str = None): """ Same as [p]isolate start, but cleans up the target's last message. """ success = await self._isolate_cmd_common(ctx, user, duration, reason, quiet=True) if not success: return def check(m): return m.id == ctx.message.id or m.author == user try: await ctx.message.channel.purge(limit=PURGE_MESSAGES + 1, check=check) except discord.errors.Forbidden: await ctx.send("Isolation set, but I need permissions to manage messages to clean up.") @isolate.command(name="list") @commands.guild_only() @checks.mod() async def isolate_list(self, ctx): """ Shows a table of isolated users with time, mod and reason. Displays isolated users, time remaining, responsible moderator and the reason for isolation, if any. """ guild = ctx.guild guild_id = guild.id now = time.time() headers = ["Member", "Remaining", "Moderator", "Reason"] isolated = await self.config.guild(guild).ISOLATED() embeds = [] num_p = len(isolated) for i, data in enumerate(isolated.items()): member_id, data = data member_name = getmname(member_id, guild) moderator = getmname(data["by"], guild) reason = data["reason"] until = data["until"] sort = until or float("inf") remaining = generate_timespec(until - now, short=True) if until else "forever" row = [member_name, remaining, moderator, reason or "No reason set."] embed = discord.Embed(title="Isolate List", colour=discord.Colour.from_rgb(255, 0, 0)) for header, row_val in zip(headers, row): embed.add_field(name=header, value=row_val) embed.set_footer(text=f"Page {i+1} out of {num_p}") embeds.append(embed) if not isolated: await ctx.send("No users are currently isolated.") return await menu(ctx, embeds, DEFAULT_CONTROLS) @isolate.command(name="clean") @commands.guild_only() @checks.mod() async def isolate_clean(self, ctx, clean_pending: bool = False): """ Removes absent members from the isolated list. If run without an argument, it only removes members who are no longer present but whose timer has expired. If the argument is 'yes', 1, or another trueish value, it will also remove absent members whose timers have yet to expire. Use this option with care, as removing them will prevent the isolated role from being re-added if they rejoin before their timer expires. """ count = 0 now = time.time() guild = ctx.guild data = await self.config.guild(guild).ISOLATED() for mid, mdata in data.copy().items(): intid = int(mid) if guild.get_member(intid): continue elif clean_pending or ((mdata["until"] or 0) < now): del data[mid] count += 1 await self.config.guild(guild).ISOLATED.set(data) await ctx.send("Cleaned %i absent members from the list." % count) @isolate.command(name="clean-bans") @commands.guild_only() @checks.mod() @checks.bot_has_permissions(ban_members=True) async def isolate_clean_bans(self, ctx): """ Removes banned members from the isolated list. """ count = 0 guild = ctx.guild data = await self.config.guild(guild).ISOLATED() bans = await guild.bans() ban_ids = {ban.user.id for ban in bans} for mid, mdata in data.copy().items(): intid = int(mid) if guild.get_member(intid): continue elif intid in ban_ids: del data[mid] count += 1 await self.config.guild(guild).ISOLATED.set(data) await ctx.send("Cleaned %i banned users from the list." % count) @isolate.command(name="warn") @commands.guild_only() @checks.mod_or_permissions(manage_messages=True) async def isolate_warn(self, ctx, user: discord.Member, *, reason: str = None): """ Warns a user with boilerplate about the rules """ msg = ["Hey %s, " % user.mention] msg.append("you're doing something that might get you muted if you keep " "doing it.") if reason: msg.append(" Specifically, %s." % reason) msg.append("Be sure to review the guild rules.") await ctx.send(" ".join(msg)) @isolate.command(name="end", aliases=["remove"]) @commands.guild_only() @checks.mod() async def isolate_end(self, ctx, user: discord.Member, *, reason: str = None): """ Removes Isolation from a user before time has expired This is the same as removing the role directly. """ role = await self.get_role(user.guild, quiet=True) sid = user.guild.id guild = user.guild moderator = ctx.author now = time.time() isolated = await self.config.guild(guild).ISOLATED() data = isolated.get(str(user.id), {}) removed_roles_parsed = resolve_role_list(guild, data.get("removed_roles", [])) if role and role in user.roles: msg = "Isolation manually ended early by %s." % ctx.author original_start = data.get("start") original_end = data.get("until") remaining = original_end and (original_end - now) if remaining: msg += " %s was left" % generate_timespec(round(remaining)) if original_start: msg += " of the original %s." % generate_timespec(round(original_end - original_start)) else: msg += "." if reason: msg += "\n\nReason for ending early: " + reason if data.get("reason"): msg += "\n\nOriginal reason was: " + data["reason"] updated_reason = str(msg) # copy string if removed_roles_parsed: names_list = format_list(*(r.name for r in removed_roles_parsed)) msg += "\nRestored role(s): {}".format(names_list) if not await self._unisolate(user, reason=updated_reason, update=True, moderator=moderator): msg += "\n\n(failed to send Isolation end notification DM)" await ctx.send(msg) elif data: # This shouldn't happen, but just in case now = time.time() until = data.get("until") remaining = until and generate_timespec(round(until - now)) or "forever" data_fmt = "\n".join( [ "**Reason:** %s" % (data.get("reason") or "no reason set"), "**Time remaining:** %s" % remaining, "**Moderator**: %s" % (user.guild.get_member(data.get("by")) or "Missing ID#%s" % data.get("by")), ] ) del isolated[str(user.id)] await self.config.guild(guild).ISOLATED.set(isolated) await ctx.send( "That user doesn't have the %s role, but they still have a data entry. I removed it, " "but in case it's needed, this is what was there:\n\n%s" % (role.name, data_fmt) ) elif role: await ctx.send("That user doesn't have the %s role." % role.name) else: await ctx.send("The isolate role couldn't be found in this guild.") @isolate.command(name="reason") @commands.guild_only() @checks.mod() async def isolate_reason(self, ctx, user: discord.Member, *, reason: str = None): """ Updates the reason for a Isolation, including the modlog if a case exists. """ guild = ctx.guild isolated = await self.config.guild(guild).ISOLATED() data = isolated.get(str(user.id), None) if not data: await ctx.send( "That user doesn't have an active Isolation entry. To update modlog " "cases manually, use the `%sreason` command." % ctx.prefix ) return isolated[str(user.id)]["reason"] = reason await self.config.guild(guild).ISOLATED.set(isolated) if reason: msg = "Reason updated." else: msg = "Reason cleared." caseno = data.get("caseno") try: case = await modlog.get_case(caseno, guild, self.bot) except: msg += "\nMod case not found!" case = None if case: moderator = ctx.author try: edits = {"reason": reason} if moderator.id != data.get("by"): edits["amended_by"] = moderator edits["modified_at"] = ctx.message.created_at.timestamp() await case.edit(edits) except: msg += "\n" + warning("Mod case not modified due to error.") await ctx.send(msg) @commands.group() @commands.guild_only() @checks.admin_or_permissions(administrator=True) async def isolateset(self, ctx): pass @isolateset.command(name="nitro-role") async def isolateset_nitro_role(self, ctx, *, role: str = None): """ Set nitro booster role so its not removed when isolateing. If your server doesn't have a nitro role, run this command with the role string `no_nitro_role` """ guild = ctx.guild current = await self.config.guild(guild).NITRO_ID() current = role_from_string(guild, current) if role and role.lower() == "no_nitro_role": await self.config.guild(guild).NITRO_ID.set(role) await ctx.send("No nitro role set.") return if not role and current: await ctx.send(f"Nitro role set to {current}") return elif not role: await ctx.send("No nitro role defined.") return role = role_from_string(guild, role) if not role: await ctx.send("Role not found!") return await self.config.guild(guild).NITRO_ID.set(role.id) await ctx.send("Nitro role set!") @isolateset.command(name="sync-roles") async def isolateset_sync_roles(self, ctx): """ Applies the remove-roles list to all isolated users This operation may take some time to complete, depending on the number of members. """ guild = ctx.guild isolated = await self.config.guild(guild).ISOLATED() role = await self.config.guild(guild).ROLE_ID() nitro = await self.config.guild(guild).NITRO_ID() role_memo = Memoizer(role_from_string, guild) highest_role = guild.me.top_role count = 0 errors = 0 if not guild.me.guild_permissions.manage_roles: await ctx.send(error("I need the Manage Roles permission to do that.")) return # (re)populate the member cache if guild.large: await self.bot.request_offline_members(guild) for member_id, member_data in isolated.items(): member = guild.get_member(member_id) if not member: continue member_roles = set(member.roles) original_roles = member_roles.copy() try: # Combine sets to get the baseline (roles they'd have normally) member_roles |= set(role_memo.filter(member_data["removed_roles"], skip_nulls=True)) except KeyError: pass guild_remove_roles = [r for r in member_roles if r.id != role and r.id != nitro and r.name != "@everyone"] # update new removed roles with intersection of guild removal list and baseline new_removed = guild_remove_roles & member_roles isolated[str(member.id)]["removed_roles"] = [r.id for r in new_removed] member_roles -= guild_remove_roles # can't restore, so skip (remove from set) for role in member_roles - original_roles: if role >= highest_role: member_roles.discard(role) # can't remove, so skip (re-add to set) for role in original_roles - member_roles: if role >= highest_role: member_roles.add(role) # Now update roles if we need to if member_roles != original_roles: try: await member.edit(roles=member_roles, reason="isolate sync roles") except Exception: log.exception(f"Couldn't modify roles in sync-roles command in {guild.name}!") errors += 1 else: count += 1 msg = f"Updated {count} members' roles." if errors: msg += "\n" + warning(f"{errors} errors occured; check the bot logs for more information.") await ctx.send(msg) @isolateset.command(name="setup") async def isolateset_setup(self, ctx): """ (Re)configures the isolate role and channel overrides """ guild = ctx.guild default_name = DEFAULT_ROLE_NAME role_id = await self.config.guild(guild).ROLE_ID() if role_id: role = discord.utils.get(guild.roles, id=role_id) else: role = discord.utils.get(guild.roles, name=default_name) perms = guild.me.guild_permissions if not perms.manage_roles and perms.manage_channels: await ctx.send("I need the Manage Roles and Manage Channels permissions for that command to work.") return if not role: msg = "The %s role doesn't exist; Creating it now... " % default_name msgobj = await ctx.send(msg) perms = discord.Permissions.none() role = await guild.create_role(name=default_name, permissions=perms, reason="isolate cog.") else: msgobj = await ctx.send("%s role exists... " % role.name) if role.position != (guild.me.top_role.position - 1): if role < guild.me.top_role: await msgobj.edit(content=msgobj.content + "moving role to higher position... ") await role.edit(position=guild.me.top_role.position - 1) else: await msgobj.edit( content=msgobj.content + "role is too high to manage." " Please move it to below my highest role." ) return await msgobj.edit(content=msgobj.content + "(re)configuring channels... ") for channel in guild.channels: await self.setup_channel(channel, role) await msgobj.edit(content=msgobj.content + "done.") if role and role.id != role_id: await self.config.guild(guild).ROLE_ID.set(role.id) @isolateset.command(name="channel") async def isolateset_channel(self, ctx, channel: discord.TextChannel = None): """ Sets or shows the isolation "timeout" channel. This channel has special settings to allow isolated users to discuss their infraction(s) with moderators. If there is a role deny on the channel for the isolate role, it is automatically set to allow. If the default permissions don't allow the isolated role to see or speak in it, an overwrite is created to allow them to do so. """ guild = ctx.guild current = await self.config.guild(guild).CHANNEL_ID() current = current and guild.get_channel(current) if channel is None: if not current: await ctx.send("No timeout channel has been set.") else: await ctx.send("The timeout channel is currently %s." % current.mention) else: if current == channel: await ctx.send( "The timeout channel is already %s. If you need to repair its permissions, use `%sisolateset setup`." % (current.mention, ctx.prefix) ) return await self.config.guild(guild).CHANNEL_ID.set(channel.id) role = await self.get_role(guild, create=True) update_msg = "{} to the %s role" % role grants = [] denies = [] perms = permissions_for_roles(channel, role) overwrite = channel.overwrites_for(role) or discord.PermissionOverwrite() for perm, value in DEFAULT_TIMEOUT_OVERWRITE: if value is None: continue if getattr(perms, perm) != value: setattr(overwrite, perm, value) name = perm.replace("_", " ").title().replace("Tts", "TTS") if value: grants.append(name) else: denies.append(name) # Any changes made? Apply them. if grants or denies: grants = grants and ("grant " + format_list(*grants)) denies = denies and ("deny " + format_list(*denies)) to_join = [x for x in (grants, denies) if x] update_msg = update_msg.format(format_list(*to_join)) if current and current.id != channel.id: if current.permissions_for(guild.me).manage_roles: msg = info("Resetting permissions in the old channel (%s) to the default...") else: msg = error("I don't have permissions to reset permissions in the old channel (%s)") await ctx.send(msg % current.mention) await self.setup_channel(current, role) if channel.permissions_for(guild.me).manage_roles: await ctx.send(info("Updating permissions in %s to %s..." % (channel.mention, update_msg))) await channel.set_permissions(role, overwrite=overwrite) else: await ctx.send(error("I don't have permissions to %s." % update_msg)) await ctx.send("Timeout channel set to %s." % channel.mention) @isolateset.command(name="clear-channel") async def isolateset_clear_channel(self, ctx): """ Clears the timeout channel and resets its permissions """ guild = ctx.guild current = await self.config.guild(guild).CHANNEL_ID() current = current and guild.get_channel(current) if current: msg = None await self.config.guild(guild).CHANNEL_ID.set(None) if current.permissions_for(guild.me).manage_roles: role = await self.get_role(guild, quiet=True) await self.setup_channel(current, role) msg = " and its permissions reset" else: msg = ", but I don't have permissions to reset its permissions." await ctx.send("Timeout channel has been cleared%s." % msg) else: await ctx.send("No timeout channel has been set yet.") @isolateset.command(name="case-min") async def isolateset_case_min(self, ctx, *, timespec: str = None): """ Set/disable or display the minimum isolation case duration If the isolation duration is less than this value, a case will not be created. Specify 'disable' to turn off case creation altogether. """ guild = ctx.guild current = await self.config.guild(guild).CASE_MIN_LENGTH() if not timespec: if current: await ctx.send("Isolations longer than %s will create cases." % generate_timespec(current)) else: await ctx.send("Isolation case creation is disabled.") else: if timespec.strip("'\"").lower() == "disable": value = None else: try: value = parse_time(timespec) except BadTimeExpr as e: await ctx.send(error(e.args[0])) return await self.config.guild(guild).CASE_MIN_LENGTH.set(value) await ctx.send("Isolations longer than %s will create cases." % generate_timespec(value)) @isolateset.command(name="overrides") async def isolateset_overrides(self, ctx, *, channel_id: int = None): """ Copy or display the isolate role overrides If a channel id is specified, the allow/deny settings for it are saved and applied to new channels when they are created. To apply the new settings to existing channels, use [p]isolateset setup. An important caveat: voice channel and text channel overrides are configured separately! To set the overrides for a channel type, specify the name of or mention a channel of that type. """ guild = ctx.guild role = await self.get_role(guild, quiet=True) timeout_channel_id = await self.config.guild(guild).CHANNEL_ID() confirm_msg = None channel = guild.get_channel(channel_id) if not role: await ctx.send(error("Isolate role has not been created yet. Run `%sisolateset setup` first." % ctx.prefix)) return if channel: overwrite = channel.overwrites_for(role) if channel.id == timeout_channel_id: confirm_msg = "Are you sure you want to copy overrides from the timeout channel?" elif overwrite is None: overwrite = discord.PermissionOverwrite() confirm_msg = "Are you sure you want to copy blank (no permissions set) overrides?" else: confirm_msg = "Are you sure you want to copy overrides from this channel?" if channel.type is discord.ChannelType.text: key = "text" elif channel.type is discord.ChannelType.voice: key = "voice" else: await ctx.send(error("Unknown channel type!")) return if confirm_msg: await ctx.send(warning(confirm_msg + "(reply `yes` within 30s to confirm)")) def check(m): return m.author == ctx.author and m.channel == ctx.channel try: reply = await self.bot.wait_for("message", check=check, timeout=30.0) if reply.content.strip(" `\"'").lower() != "yes": await ctx.send("Commmand cancelled.") return except asyncio.TimeoutError: await ctx.send("Timed out waiting for a response.") return if key == "text": await self.config.guild(guild).TEXT_OVERWRITE.set(overwrite_to_dict(overwrite)) else: await self.config.guild(guild).VOICE_OVERWRITE.set(overwrite_to_dict(overwrite)) await ctx.send( "{} channel overrides set to:\n".format(key.title()) + format_permissions(overwrite) + "\n\nRun `%sisolateset setup` to apply them to all channels." % ctx.prefix ) else: msg = [] for key in ("text", "voice"): if key == "text": data = await self.config.guild(guild).TEXT_OVERWRITE() else: data = await self.config.guild(guild).VOICE_OVERWRITE() title = "%s permission overrides:" % key.title() if data == overwrite_to_dict(DEFAULT_TEXT_OVERWRITE) or data == overwrite_to_dict( DEFAULT_VOICE_OVERWRITE ): title = title[:-1] + " (defaults):" msg.append(bold(title) + "\n" + format_permissions(overwrite_from_dict(data))) await ctx.send("\n\n".join(msg)) @isolateset.command(name="reset-overrides") async def isolateset_reset_overrides(self, ctx, channel_type: str = "both"): """ Resets the isolate role overrides for text, voice or both (default) This command exists in case you want to restore the default settings for newly created channels. """ channel_type = channel_type.strip("`\"' ").lower() msg = [] for key in ("text", "voice"): if channel_type not in ["both", key]: continue title = "%s permission overrides reset to:" % key.title() if key == "text": await self.config.guild(guild).TEXT_OVERWRITE.set(overwrite_to_dict(DEFAULT_TEXT_OVERWRITE)) msg.append(bold(title) + "\n" + format_permissions(overwrite_to_dict(DEFAULT_TEXT_OVERWRITE))) else: await self.config.guild(guild).VOICE_OVERWRITE.set(overwrite_to_dict(DEFAULT_VOICE_OVERWRITE)) msg.append(bold(title) + "\n" + format_permissions(overwrite_to_dict(DEFAULT_VOICE_OVERWRITE))) if not msg: await ctx.send("Invalid channel type. Use `text`, `voice`, or `both` (the default, if not specified)") return msg.append("Run `%sisolateset setup` to apply them to all channels." % ctx.prefix) await ctx.send("\n\n".join(msg)) async def get_role(self, guild, quiet=False, create=False): role_id = await self.config.guild(guild).ROLE_ID() if role_id: role = discord.utils.get(guild.roles, id=role_id) else: role = discord.utils.get(guild.roles, name=DEFAULT_ROLE_NAME) if create and not role: perms = guild.me.guild_permissions if not perms.manage_roles and perms.manage_channels: await ctx.send("The Manage Roles and Manage Channels permissions are required to use this command.") return else: msg = "The %s role doesn't exist; Creating it now..." % DEFAULT_ROLE_NAME if not quiet: msgobj = await ctx.send(msg) log.debug("Creating isolate role in %s" % guild.name) perms = discord.Permissions.none() role = await guild.create_role(name=DEFAULT_ROLE_NAME, permissions=perms, reason="isolate cog.") await role.edit(position=guild.me.top_role.position - 1) if not quiet: await msgobj.edit(content=msgobj.content + "\nconfiguring channels... ") for channel in guild.channels: await self.setup_channel(channel, role) if not quiet: await msgobj.edit(content=msgobj.content + "\ndone.") if role and role.id != role_id: await self.config.guild(guild).ROLE_ID.set(role.id) return role async def setup_channel(self, channel, role): guild = channel.guild timeout_channel_id = await self.config.guild(guild).CHANNEL_ID() if channel.id == timeout_channel_id: # maybe this will be used later: # config = settings.get('TIMEOUT_OVERWRITE') config = None defaults = DEFAULT_TIMEOUT_OVERWRITE elif channel.type is discord.ChannelType.voice: config = await self.config.guild(guild).VOICE_OVERWRITE() defaults = DEFAULT_VOICE_OVERWRITE else: config = await self.config.guild(guild).TEXT_OVERWRITE() defaults = DEFAULT_TEXT_OVERWRITE if config: perms = overwrite_from_dict(config) else: perms = defaults await channel.set_permissions(role, overwrite=perms, reason="isolate cog") async def on_load(self): await self.bot.wait_until_ready() _guilds = [g for g in self.bot.guilds if g.large and not (g.chunked or g.unavailable)] await self.bot.request_offline_members(*_guilds) for guild in self.bot.guilds: me = guild.me role = await self.get_role(guild, quiet=True, create=True) if not role: log.error("Needed to create isolate role in %s, but couldn't." % guild.name) continue role_memo = Memoizer(role_from_string, guild) isolated = await self.config.guild(guild).ISOLATED() for member_id, data in isolated.items(): until = data["until"] member = guild.get_member(int(member_id)) if until and (until - time.time()) < 0: if member: reason = "Isolation removal overdue, maybe the bot was offline. " if data["reason"]: reason += data["reason"] await self._unisolate(member, reason=reason) else: # member disappeared del isolated[member_id] elif member: # re-check roles user_roles = set(member.roles) removed_roles = set(role_memo.filter(data.get("removed_roles", ()), skip_nulls=True)) removed_roles = user_roles & {r for r in removed_roles if r < me.top_role} user_roles -= removed_roles apply_roles = removed_roles if role not in user_roles: if role >= me.top_role: log.error("Needed to re-add isolate role to %s in %s, but couldn't." % (member, guild.name)) else: user_roles.add(role) # add isolate role to the set apply_roles = True if apply_roles: await member.edit(roles=member_roles, reason="isolate ending") if until: await self.schedule_unisolate(until, member) while True: try: async with self.queue_lock: while await self.process_queue_event(): pass await asyncio.sleep(5) except asyncio.CancelledError: break except Exception: pass log.debug("queue manager dying") while not self.queue.empty(): self.queue.get_nowait() for fut in self.pending.values(): fut.cancel() async def cancel_queue_event(self, *args) -> bool: if args in self.pending: self.pending.pop(args).cancel() return True else: events = [] removed = None async with self.queue_lock: while not self.queue.empty(): item = self.queue.get_nowait() if args == item[1:]: removed = item break else: events.append(item) for item in events: self.queue.put_nowait(item) return removed is not None async def put_queue_event(self, run_at: float, *args): diff = run_at - time.time() if args in self.enqueued: return False self.enqueued.add(args) if diff < 0: await self.execute_queue_event(0, *args) elif run_at - time.time() < QUEUE_TIME_CUTOFF: self.pending[args] = asyncio.create_task(self.execute_queue_event(diff, *args)) else: await self.queue.put((run_at, *args)) async def process_queue_event(self): if self.queue.empty(): return False now = time.time() item = await self.queue.get() next_time, *args = item diff = next_time - now if diff < 0: if await self.execute_queue_event(0, *args): return elif diff < QUEUE_TIME_CUTOFF: self.pending[args] = asyncio.create_task(self.execute_queue_event(diff, *args)) return True await self.queue.put(item) return False async def execute_queue_event(self, diff, *args) -> bool: # delays then executes queue event await asyncio.sleep(diff) self.enqueued.discard(args) try: return self.execute_unisolate(*args) except Exception: log.exception("failed to execute scheduled event") async def _isolate_cmd_common(self, ctx, member, duration, reason, quiet=False): guild = ctx.guild using_default = False updating_case = False case_error = None isolated = await self.config.guild(guild).ISOLATED() current = isolated.get(str(member.id), {}) reason = reason or current.get("reason") # don't clear if not given hierarchy_allowed = ctx.author.top_role > member.top_role case_min_length = await self.config.guild(guild).CASE_MIN_LENGTH() nitro_role = await self.config.guild(guild).NITRO_ID() if nitro_role is None: await ctx.send(f"Please set the nitro role using `{ctx.prefix}isolateset nitro-role`") return if member == guild.me: await ctx.send("You can't isolate the bot.") return # check if user is isolated, fix conflict with isolate cog punish = self.bot.get_cog("Punish") if punish: punished = await punish.config.guild(guild).PUNISHED() if str(member.id) in punished: await ctx.send( warning("This person is punished, I will remove it now before isolating to avoid conflicts.") ) await ctx.invoke(punish.punish_end, user=member, reason="Conflict with isolate cog.") # double check it actually worked punished = await punish.config.guild(guild).PUNISHED() if str(member.id) in punished: await ctx.send(error("Couldn't remove punish from user, please do it manually.")) return if duration and duration.lower() in ["forever", "inf", "infinite"]: duration = None else: if not duration: using_default = True duration = DEFAULT_TIMEOUT try: duration = parse_time(duration) if duration < 1: await ctx.send("Duration must be 1 second or longer.") return False except BadTimeExpr as e: await ctx.send("Error parsing duration: %s." % e.args) return False role = await self.get_role(guild, quiet=quiet, create=True) if role is None: return elif role >= guild.me.top_role: await ctx.send("The %s role is too high for me to manage." % role) return # Call time() after getting the role due to potential creation delay now = time.time() until = (now + duration + 0.5) if duration else None duration_ok = (case_min_length is not None) and ((duration is None) or duration >= case_min_length) if duration_ok: now_date = datetime.utcfromtimestamp(now) mod_until = until and datetime.utcfromtimestamp(until) try: if current: case_number = current.get("caseno") try: case = await modlog.get_case(case_number, guild, self.bot) except: # shouldn't happen await ctx.send( warning( "Error, modlog case not found, but user is isolated with case.\nTry unisolating and isolating again." ) ) return moderator = ctx.author try: edits = {"reason": reason} if moderator.id != current.get("by"): edits["amended_by"] = moderator edits["modified_at"] = ctx.message.created_at.timestamp() await case.edit(edits) except Exception as e: await ctx.send(warning(f"Couldn't edit case: {e}")) return updating_case = True else: case = await modlog.create_case( self.bot, guild, now_date, "Timed Mute", member, moderator=ctx.author, reason=reason, until=mod_until, ) case_number = case.case_number except Exception as e: case_error = e else: case_number = None subject = "the %s role" % role.name if str(member.id) in isolated: if role in member.roles: msg = "{0} already had the {1.name} role; resetting their timer." else: msg = "{0} is missing the {1.name} role for some reason. I added it and reset their timer." elif role in member.roles: msg = "{0} already had the {1.name} role, but had no timer; setting it now." else: msg = "Applied the {1.name} role to {0}." subject = "it" msg = msg.format(member, role) if duration: timespec = generate_timespec(duration) if using_default: timespec += " (the default)" msg += " I will remove %s in %s." % (subject, timespec) if case_error: if isinstance(case_error, CaseMessageNotFound): case_error = "the case message could not be found" elif isinstance(case_error, NoModLogAccess): case_error = "I do not have access to the modlog channel" else: case_error = None if case_error: verb = "updating" if updating_case else "creating" msg += "\n\n" + warning("There was an error %s the modlog case: %s." % (verb, case_error)) elif case_number: verb = "updated" if updating_case else "created" msg += " I also %s case #%i in the modlog." % (verb, case_number) voice_overwrite = await self.config.guild(guild).VOICE_OVERWRITE() if voice_overwrite: voice_overwrite = overwrite_from_dict(voice_overwrite) else: voice_overwrite = DEFAULT_VOICE_OVERWRITE voice_deny = voice_overwrite.pair()[1] overwrite_denies_speak = (voice_deny.speak is False) or (voice_deny.connect is False) # remove all roles from user that are specified in remove_role_list, only if its a new isolati if str(member.id) not in isolated: user_roles = {r for r in member.roles if r.name != "@everyone"} removed_roles = user_roles.copy() if nitro_role != "no_nitro_role": nitro_role = role_from_string(guild, nitro_role) removed_roles.discard(nitro_role) # build lists of roles that *should* be removed and ones that *can* be too_high_to_remove = {r for r in removed_roles if r >= guild.me.top_role} user_roles -= removed_roles - too_high_to_remove user_roles.add(role) # add isolate role to the set await member.edit(roles=user_roles, reason=f"isolate {member}") else: removed_roles = set(resolve_role_list(guild, current.get("removed_roles", []))) too_high_to_remove = {r for r in removed_roles if r >= guild.me.top_role} if removed_roles: actually_removed = removed_roles - too_high_to_remove if actually_removed: msg += "\nRemoved roles: {}".format(format_list(*(r.name for r in actually_removed))) if too_high_to_remove: fmt_list = format_list(*(r.name for r in removed_roles)) msg += "\n" + warning( "These roles were too high to remove (fix hierarchy, then run " "`{}isolateset sync-roles`): {}".format(ctx.prefix, fmt_list) ) if member.voice: muted = member.voice.mute else: muted = False async with self.config.guild(guild).ISOLATED() as isolated: isolated[str(member.id)] = { "start": current.get("start") or now, # don't override start time if updating "until": until, "by": current.get("by") or ctx.author.id, # don't override original moderator "reason": reason, "unmute": overwrite_denies_speak and not muted, "caseno": case_number, "removed_roles": [r.id for r in removed_roles], } if member.voice and overwrite_denies_speak: if member.voice.channel: await member.edit(mute=True, deafen=True) # schedule callback for role removal if until: await self.schedule_unisolate(until, member) if not quiet: await ctx.send(msg) return True # Functions related to unisolateing async def schedule_unisolate(self, until, member): """ Schedules role removal, canceling and removing existing tasks if present """ await self.put_queue_event(until, member.guild.id, member.id) def execute_unisolate(self, guild_id, member_id) -> bool: guild = self.bot.get_guild(guild_id) if not guild: return False member = guild.get_member(member_id) if member: asyncio.create_task(self._unisolate(member)) return True else: asyncio.create_task(self.bot.request_offline_members(guild)) return False async def _unisolate( self, member, reason=None, apply_roles=True, update=False, moderator=None, quiet=False ) -> bool: """ Remove isolate role, delete record and task handle """ guild = member.guild role = await self.get_role(guild, quiet=True) nitro_role = await self.config.guild(guild).NITRO_ID() if role: data = await self.config.guild(guild).ISOLATED() member_data = data.get(str(member.id), {}) caseno = member_data.get("caseno") removed_roles = set(resolve_role_list(guild, member_data.get("removed_roles", []))) # Has to be done first to prevent triggering listeners await self._unisolate_data(member) await self.cancel_queue_event(member.guild.id, member.id) if apply_roles: # readd removed roles from user, by replacing user's roles with all of their roles plus the ones that # were removed (and can be re-added), minus the isolate role user_roles = set(member.roles) too_high_to_restore = {r for r in removed_roles if r >= guild.me.top_role} removed_roles -= too_high_to_restore user_roles |= removed_roles user_roles.discard(role) await member.edit(roles=user_roles, reason="isolate end") if update and caseno: until = member_data.get("until") or False # fallback gracefully moderator = moderator or guild.get_member(member_data.get("by")) or guild.me if until: until = datetime.utcfromtimestamp(until).timestamp() edits = {"reason": reason} if moderator.id != data.get("by"): edits["amended_by"] = moderator edits["modified_at"] = time.time() edits["until"] = until try: case = await modlog.get_case(caseno, guild, self.bot) await case.edit(edits) except Exception: pass if member_data.get("unmute", False): if member.voice: if member.voice.channel: await member.edit(mute=False, deafen=False) else: async with self.config.guild(guild).PENDING_UNMUTE() as unmute_list: if member.id not in unmute_list: unmute_list.append(member.id) if quiet: return True msg = "Your Isolation in %s has ended." % member.guild.name if reason: msg += "\nReason: %s" % reason if removed_roles: msg += "\n\nRestored roles: {}.".format(format_list(*(r.name for r in removed_roles))) if too_high_to_restore: fmt_list = format_list(*(r.name for r in too_high_to_restore)) msg += "\n" + warning( "These roles were too high for me to restore: {}. " "Ask a mod for help.".format(fmt_list) ) try: await member.send(msg) return True except Exception: return False async def _unisolate_data(self, member): """Removes isolate data entry and cancels any present callback""" guild = member.guild async with self.config.guild(guild).ISOLATED() as isolated: if str(member.id) in isolated: del isolated[str(member.id)] # Listeners @commands.Cog.listener() async def on_guild_channel_create(self, channel): """Run when new channels are created and set up role permissions""" if await self.bot.cog_disabled_in_guild(self, channel.guild): return role = await self.get_role(channel.guild, quiet=True) if not role: return await self.setup_channel(channel, role) @commands.Cog.listener() async def on_member_update(self, before, after): """Remove scheduled unisolate when manually removed role""" if await self.bot.cog_disabled_in_guild(self, after.guild): return try: assert before.roles != after.roles guild_data = await self.config.guild(before.guild).ISOLATED() member_data = guild_data[str(before.id)] role = await self.get_role(before.guild, quiet=True) assert role except (KeyError, AssertionError): return new_roles = {role.id: role for role in after.roles} if role in before.roles and role.id not in new_roles: msg = "Isolation manually ended early by a moderator/admin." if member_data["reason"]: msg += "\nReason was: " + member_data["reason"] await self._unisolate(after, reason=msg, update=True) else: to_remove = {new_roles.get(role_id) for role_id in member_data.get("removed_roles", [])} to_remove = [r for r in to_remove if r and r < after.guild.me.top_role] if to_remove: await after.remove_roles(*to_remove) @commands.Cog.listener() async def on_member_join(self, member): """Restore Isolation if isolated user leaves/rejoins""" if await self.bot.cog_disabled_in_guild(self, member.guild): return guild = member.guild isolated = await self.config.guild(guild).ISOLATED() data = isolated.get(str(member.id), {}) if not data: return # give other tools a chance to settle, then re-fetch data just in case await asyncio.sleep(1) member = self.bot.get_guild(guild.id).get_member(member.id) role = await self.get_role(member.guild, quiet=True) until = data["until"] duration = until - time.time() if role and duration > 0: await self.schedule_unisolate(until, member) if role not in member.roles: await member.add_roles(role) @commands.Cog.listener() async def on_voice_state_update(self, member, before, after): if await self.bot.cog_disabled_in_guild(self, member.guild): return if not after.channel: return guild = member.guild data = await self.config.guild(guild).ISOLATED() member_data = data.get(str(member.id), {}) unmute_list = await self.config.guild(guild).PENDING_UNMUTE() if member_data and not after.mute: await member.edit(mute=True, deafen=True) elif member.id in unmute_list: await member.edit(mute=False, deafen=False) if member.id in unmute_list: unmute_list.remove(member.id) await self.config.guild(guild).PENDING_UNMUTE.set(unmute_list) @commands.Cog.listener() async def on_member_ban(self, member): """Remove Isolation record when member is banned.""" if await self.bot.cog_disabled_in_guild(self, member.guild): return guild = member.guild data = await self.config.guild(guild).ISOLATED() member_data = data.get(str(member.id)) if member_data is None: return msg = "Isolation ended early due to ban." if member_data.get("reason"): msg += "\n\nOriginal reason was: " + member_data["reason"] await self._unisolate(member, reason=msg, apply_roles=False, update=True, quiet=True) async def red_delete_data_for_user( self, *, requester: Literal["discord_deleted_user", "owner", "user", "user_strict"], user_id: int, ): pass