from __future__ import annotations import contextlib import asyncio import re import time from abc import ABCMeta from typing import AsyncIterator, Tuple, Optional, Union, List, Dict import discord from discord.ext.commands import CogMeta as DPYCogMeta from redbot.core import checks, commands, bank from redbot.core.config import Config from redbot.core.utils.chat_formatting import box, pagify, warning from .events import EventMixin from .exceptions import RoleManagementException, PermissionOrHierarchyException from .massmanager import MassManagementMixin from .utils import UtilMixin, variation_stripper_re, parse_timedelta, parse_seconds try: from redbot.core.commands import GuildContext except ImportError: from redbot.core.commands import Context as GuildContext # type: ignore # This previously used ``(type(commands.Cog), type(ABC))`` # This was changed to be explicit so that mypy # would be slightly happier about it. # This does introduce a potential place this # can break in the future, but this would be an # Upstream breaking change announced in advance class CompositeMetaClass(DPYCogMeta, ABCMeta): """ This really only exists because of mypy wanting mixins to be individually valid classes. """ pass # MRO is fine on __new__ with super() use # no need to manually ensure both get handled here. 
MIN_SUB_TIME = 3600  # shortest subscription interval accepted, in seconds
SLEEP_TIME = 300  # pause between two subscription sweeps, in seconds


class RoleManagement(
    UtilMixin,
    MassManagementMixin,
    EventMixin,
    commands.Cog,
    metaclass=CompositeMetaClass,
):
    """
    Cog for role management
    """

    __author__ = "mikeshardmind(Sinbad), DiscordLiz"
    __version__ = "323.1.4"

    def format_help_for_context(self, ctx):
        """Return the inherited help text with the cog version appended."""
        pre_processed = super().format_help_for_context(ctx)
        return f"{pre_processed}\nCog Version: {self.__version__}"

    def __init__(self, bot):
        """Register config defaults and start the subscription sweep task."""
        self.bot = bot
        self.config = Config.get_conf(self, identifier=78631113035100160, force_registration=True)
        self.config.register_global(handled_variation=False, handled_full_str_emoji=False)
        self.config.register_role(
            exclusive_to=[],
            requires_any=[],
            requires_all=[],
            sticky=False,
            self_removable=False,
            self_role=False,
            protected=False,
            cost=0,
            subscription=0,
            subscribed_users={},
        )  # subscribed_users maps str(user.id) -> end time in unix timestamp
        self.config.register_member(roles=[], forbidden=[])
        self.config.init_custom("REACTROLE", 2)
        self.config.register_custom(
            "REACTROLE", roleid=None, channelid=None, guildid=None
        )  # ID : Message.id, str(React)
        self.config.register_guild(notify_channel=None, s_roles=[], free_roles=[])
        self._ready = asyncio.Event()
        self._start_task: Optional[asyncio.Task] = None
        self.loop = asyncio.get_event_loop()
        # sub_checker waits on self._ready itself, so it is safe to schedule here.
        self._sub_task = self.loop.create_task(self.sub_checker())
        super().__init__()

    def cog_unload(self):
        """Cancel the background tasks owned by this cog."""
        if self._start_task:
            self._start_task.cancel()
        if self._sub_task:
            self._sub_task.cancel()

    def init(self):
        """Schedule ``initialization`` as a task."""
        self._start_task = asyncio.create_task(self.initialization())
        # Calling result() in the callback re-raises any exception from the task.
        self._start_task.add_done_callback(lambda f: f.result())

    async def _rekey_react_roles(self, rekey):
        """
        Rewrite the emoji keys of every stored REACTROLE entry.

        ``rekey`` takes an emoji key and returns ``(new_key, count)`` in the
        style of ``re.Pattern.subn``; entries with a zero count are untouched.
        """
        data: Dict[str, Dict[str, Dict[str, Union[int, bool, List[int]]]]]
        data = await self.config.custom("REACTROLE").all()
        to_adjust = {}
        # Collect first, then mutate: the inner dicts must not change size
        # while they are being iterated.
        for message_id, emojis_to_data in data.items():
            for emoji_key in emojis_to_data:
                new_key, count = rekey(emoji_key)
                if count:
                    to_adjust[(message_id, emoji_key)] = new_key

        for (message_id, emoji_key), new_key in to_adjust.items():
            data[message_id][new_key] = data[message_id].pop(emoji_key)

        await self.config.custom("REACTROLE").set(data)

    async def initialization(self):
        """Run the one-time REACTROLE key migrations, then mark the cog ready."""
        await self.bot.wait_until_red_ready()

        if not await self.config.handled_variation():
            await self._rekey_react_roles(lambda key: variation_stripper_re.subn("", key))
            await self.config.handled_variation.set(True)

        if not await self.config.handled_full_str_emoji():
            # NOTE(review): pattern restored from a damaged source line; it is
            # meant to reduce a full custom-emoji string to its numeric id
            # (group 3) - confirm against upstream.
            pattern = re.compile(r"^(<?a?:)?([A-Za-z0-9_]+):([0-9]+)(\:?>?)$")
            # Am not a fan....
            await self._rekey_react_roles(lambda key: pattern.subn(r"\3", key))
            await self.config.handled_full_str_emoji.set(True)

        self._ready.set()

    async def wait_for_ready(self):
        """Block until ``initialization`` has finished."""
        await self._ready.wait()

    async def cog_before_invoke(self, ctx):
        """Hold commands until the cog is ready; refresh guild data if in a guild."""
        await self.wait_for_ready()
        if ctx.guild:
            await self.maybe_update_guilds(ctx.guild)

    async def _notify_member(self, guild, member, msg):
        """
        Deliver ``msg`` to ``member`` by DM, falling back to a guild channel.

        The fallback tries the system channel first, then the first text
        channel the bot may speak in.
        """
        try:
            await member.send(msg)
            return
        except discord.HTTPException:
            pass

        msg += (
            f"\n\n{member.mention} make sure to allow receiving DM's from server members "
            "so I can DM you this message!"
        )
        channel = guild.system_channel
        # A guild may have no system channel configured.
        if channel and channel.permissions_for(guild.me).send_messages:
            await channel.send(msg)
            return
        for channel in guild.text_channels:
            if channel.permissions_for(guild.me).send_messages:
                await channel.send(msg)
                break

    # makes it a bit more readable
    async def sub_helper(self, guild, role, role_data):
        """
        Process every expired subscription of ``role``.

        ``role_data["subscribed_users"]`` is updated in place: expired
        subscribers are charged and renewed, or lose the role when they cannot
        pay. Returns the same ``role_data`` dict.
        """
        subscribers = role_data["subscribed_users"]
        # Iterate over a snapshot of the keys because entries are deleted below.
        for user_id in list(subscribers):
            end_time = subscribers[user_id]
            now_time = time.time()
            if end_time > now_time:
                continue

            member = guild.get_member(int(user_id))
            if not member:
                # clean absent members
                del subscribers[user_id]
                continue

            # charge user
            cost = await self.config.role(role).cost()
            currency_name = await bank.get_currency_name(guild)
            curr_sub = await self.config.role(role).subscription()
            if cost == 0 or curr_sub == 0:
                # role is free now or sub is removed, remove stale sub
                del subscribers[user_id]
                continue

            msg = (
                f"Hello! You are being charged {cost} {currency_name} for your "
                f"subscription to the {role.name} role in {guild.name}."
            )
            try:
                await bank.withdraw_credits(member, cost)
            except ValueError:  # user is poor
                msg += (
                    f"\n\nHowever, you do not have enough {currency_name} to cover the "
                    "subscription. The role will be removed."
                )
                await self.update_roles_atomically(who=member, remove=[role])
                del subscribers[user_id]
            else:
                # Kept out of the try body so an unrelated ValueError cannot be
                # mistaken for a failed withdrawal after the user was charged.
                msg += (
                    "\n\nNo further action is required! You'll be charged again in "
                    f"{parse_seconds(curr_sub)}."
                )
                subscribers[user_id] = now_time + curr_sub

            await self._notify_member(guild, member, msg)
        return role_data

    async def sub_checker(self):
        """Background loop: every ``SLEEP_TIME`` seconds, sweep all subscription roles."""
        await self.wait_for_ready()
        while True:
            await asyncio.sleep(SLEEP_TIME)
            for guild in self.bot.guilds:
                async with self.config.guild(guild).s_roles() as s_roles:
                    # Walk a reversed snapshot; s_roles itself is edited below.
                    for role_id in s_roles[::-1]:
                        role = guild.get_role(role_id)
                        if not role:
                            # clean stale subs if role is deleted
                            s_roles.remove(role_id)
                            continue
                        role_data = await self.config.role(role).all()
                        role_data = await self.sub_helper(guild, role, role_data)
                        await self.config.role(role).subscribed_users.set(
                            role_data["subscribed_users"]
                        )
                        if not role_data["subscribed_users"]:
                            s_roles.remove(role_id)

    @commands.guild_only()
    @commands.bot_has_permissions(manage_roles=True)
    @checks.admin_or_permissions(manage_roles=True)
    @commands.command(name="hackrole")
    async def hackrole(self, ctx: GuildContext, user_id: int, *, role: discord.Role):
        """
        Puts a stickyrole on someone not in the server.
        """
        if not await self.all_are_valid_roles(ctx, role):
            return await ctx.maybe_send_embed("Can't do that. Discord role heirarchy applies here.")

        if not await self.config.role(role).sticky():
            return await ctx.send("This only works on sticky roles.")

        member = ctx.guild.get_member(user_id)
        if member:
            try:
                await self.update_roles_atomically(who=member, give=[role])
            except PermissionOrHierarchyException:
                await ctx.send("Can't, somehow")
            else:
                await ctx.maybe_send_embed("They are in the guild...assigned anyway.")
        else:
            # Record the role in the absent member's stored role list.
            async with self.config.member_from_ids(ctx.guild.id, user_id).roles() as sticky:
                if role.id not in sticky:
                    sticky.append(role.id)

        await ctx.tick()

    @checks.is_owner()
    @commands.command(name="rrcleanup", hidden=True)
    async def rolemanagementcleanup(self, ctx: GuildContext):
        """ :eyes: """
        # Drops every REACTROLE entry whose message can no longer be fetched.
        data = await self.config.custom("REACTROLE").all()

        key_data = {}

        for maybe_message_id, maybe_data in data.items():
            try:
                message_id = int(maybe_message_id)
            except ValueError:
                continue

            ex_keys = list(maybe_data.keys())
            if not ex_keys:
                continue

            message = None
            # Every entry under one message id is assumed to share a channel.
            channel_id = maybe_data[ex_keys[0]]["channelid"]
            channel = self.bot.get_channel(channel_id)
            if channel:
                with contextlib.suppress(discord.HTTPException):
                    assert isinstance(channel, discord.TextChannel)  # nosec
                    message = await channel.fetch_message(message_id)

            if not message:
                key_data.update({maybe_message_id: ex_keys})

        for mid, keys in key_data.items():
            for k in keys:
                await self.config.custom("REACTROLE", mid, k).clear()

        await ctx.tick()

    @commands.guild_only()
    @commands.bot_has_permissions(manage_roles=True)
    @checks.admin_or_permissions(manage_guild=True)
    @commands.command(name="rolebind")
    async def bind_role_to_reactions(
        self, ctx: GuildContext, role: discord.Role, channel: discord.TextChannel, msgid: int, emoji: str,
    ):
        """
        Binds a role to a reaction on a message...

        The role is only given if the criteria for it are met.
        Make sure you configure the other settings for a role in [p]roleset
        """

        if not await self.all_are_valid_roles(ctx, role):
            return await ctx.maybe_send_embed("Can't do that. Discord role heirarchy applies here.")

        try:
            message = await channel.fetch_message(msgid)
        except discord.HTTPException:
            return await ctx.maybe_send_embed("No such message")

        _emoji: Optional[Union[discord.Emoji, str]]

        _emoji = discord.utils.find(lambda e: str(e) == emoji, self.bot.emojis)
        if _emoji is None:
            # Not a custom emoji the bot knows: check it is usable at all by
            # reacting to the invoking message with it.
            try:
                await ctx.message.add_reaction(emoji)
            except discord.HTTPException:
                return await ctx.maybe_send_embed("No such emoji")
            else:
                _emoji = emoji
                eid = self.strip_variations(emoji)
        else:
            eid = str(_emoji.id)

        if not any(str(r) == emoji for r in message.reactions):
            try:
                await message.add_reaction(_emoji)
            except discord.HTTPException:
                return await ctx.maybe_send_embed("Hmm, that message couldn't be reacted to")

        cfg = self.config.custom("REACTROLE", str(message.id), eid)
        await cfg.set(
            {"roleid": role.id, "channelid": message.channel.id, "guildid": role.guild.id}
        )
        await ctx.send(
            f"Remember, the reactions only function according to "
            f"the rules set for the roles using `{ctx.prefix}roleset`",
            delete_after=30,
        )

    @commands.guild_only()
    @commands.bot_has_permissions(manage_roles=True)
    @checks.admin_or_permissions(manage_guild=True)
    @commands.command(name="roleunbind")
    async def unbind_role_from_reactions(self, ctx: commands.Context, role: discord.Role, msgid: int, emoji: str):
        """
        unbinds a role from a reaction on a message
        """

        if not await self.all_are_valid_roles(ctx, role):
            return await ctx.maybe_send_embed("Can't do that. Discord role heirarchy applies here.")

        await self.config.custom("REACTROLE", f"{msgid}", self.strip_variations(emoji)).clear()
        await ctx.tick()

    @commands.guild_only()
    @commands.bot_has_permissions(manage_roles=True)
    @checks.admin_or_permissions(manage_guild=True)
    @commands.group(name="roleset", autohelp=True)
    async def rgroup(self, ctx: GuildContext):
        """
        Settings for role requirements
        """
        pass

    @rgroup.command(name="viewreactions")
    async def rg_view_reactions(self, ctx: GuildContext):
        """
        View the reactions enabled for the server
        """
        # This design is intentional for later extention to view this per role

        use_embeds = await ctx.embed_requested()
        react_roles = "\n".join(
            [msg async for msg in self.build_messages_for_react_roles(*ctx.guild.roles, use_embeds=use_embeds)]
        )

        if not react_roles:
            return await ctx.send("No react roles bound here.")

        # ctx.send is already going to escape said mentions if any somehow get generated
        # should also not be possible to do so without willfully being done by an admin.

        color = await ctx.embed_colour() if use_embeds else None

        for page in pagify(react_roles, escape_mass_mentions=False, page_length=1800, shorten_by=0):
            # unrolling iterative calling of ctx.maybe_send_embed
            if use_embeds:
                await ctx.send(embed=discord.Embed(description=page, color=color))
            else:
                await ctx.send(page)

    @rgroup.command(name="viewrole")
    async def rg_view_role(self, ctx: GuildContext, *, role: discord.Role):
        """
        Views the current settings for a role
        """

        rsets = await self.config.role(role).all()

        output = (
            f"This role:\n{'is' if rsets['self_role'] else 'is not'} self assignable"
            f"\n{'is' if rsets['self_removable'] else 'is not'} self removable"
            f"\n{'is' if rsets['sticky'] else 'is not'} sticky."
        )
        if rsets["requires_any"]:
            rstring = ", ".join(r.name for r in ctx.guild.roles if r.id in rsets["requires_any"])
            output += f"\nThis role requires any of the following roles: {rstring}"
        if rsets["requires_all"]:
            rstring = ", ".join(r.name for r in ctx.guild.roles if r.id in rsets["requires_all"])
            output += f"\nThis role requires all of the following roles: {rstring}"
        if rsets["exclusive_to"]:
            rstring = ", ".join(r.name for r in ctx.guild.roles if r.id in rsets["exclusive_to"])
            output += f"\nThis role is mutually exclusive to the following roles: {rstring}"
        if rsets["cost"]:
            curr = await bank.get_currency_name(ctx.guild)
            cost = rsets["cost"]
            output += f"\nThis role costs {cost} {curr}"
        else:
            output += "\nThis role does not have an associated cost."

        if rsets["subscription"]:
            s = rsets["subscription"]
            output += f"\nThis role has a subscription time of: {parse_seconds(s)}"

        for page in pagify(output):
            await ctx.send(page)

    @rgroup.command(name="cost")
    async def make_purchasable(self, ctx: GuildContext, cost: int, *, role: discord.Role):
        """
        Makes a role purchasable for a specified cost.
        Cost must be a number greater than 0.
        A cost of exactly 0 can be used to remove purchasability.

        Purchase eligibility still follows other rules including self assignable.

        Warning: If these roles are bound to a reaction,
        it will be possible to gain these without paying.
        """

        if not await self.all_are_valid_roles(ctx, role):
            return await ctx.maybe_send_embed("Can't do that. Discord role heirarchy applies here.")

        if cost < 0:
            return await ctx.send_help()

        await self.config.role(role).cost.set(cost)
        if cost == 0:
            await ctx.send(f"{role.name} is no longer purchasable.")
        else:
            await ctx.send(f"{role.name} is purchasable for {cost}")

    @rgroup.command(name="subscription")
    async def subscription(self, ctx, role: discord.Role, *, interval: str):
        """
        Sets a role to be a subscription, must set cost first.
        Will charge role's cost every interval, and remove the role if they run out of money
        Set to 0 to disable
        **__Minimum subscription duration is 1 hour__**
        Intervals look like:
           5 minutes
           1 minute 30 seconds
           1 hour
           2 days
           30 days
           5h30m
           (etc)
        """
        if not await self.all_are_valid_roles(ctx, role):
            return await ctx.maybe_send_embed("Can't do that. Discord role heirarchy applies here.")

        role_cost = await self.config.role(role).cost()
        if role_cost == 0:
            await ctx.send(warning("Please set a cost for the role first."))
            return

        # Named ``delta`` so the module-level ``time`` import is not shadowed.
        delta = parse_timedelta(interval)
        if delta is None:
            # NOTE(review): assumes parse_timedelta signals unparseable input
            # with None - confirm against .utils.
            return await ctx.send_help()

        seconds = int(delta.total_seconds())
        if seconds == 0:
            # Reset the stored interval so later purchases stop creating
            # subscriptions, and drop bookkeeping the sweep would no longer
            # reach once the role leaves s_roles.
            await self.config.role(role).subscription.set(0)
            await self.config.role(role).subscribed_users.set({})
            async with self.config.guild(ctx.guild).s_roles() as s:
                if role.id in s:
                    s.remove(role.id)
            await ctx.send("Subscription removed.")
            return

        if seconds < MIN_SUB_TIME:
            await ctx.send("Subscriptions must be 1 hour or longer.")
            return

        await self.config.role(role).subscription.set(seconds)
        async with self.config.guild(ctx.guild).s_roles() as s:
            if role.id not in s:
                s.append(role.id)
        await ctx.send(f"Subscription set to {parse_seconds(delta.total_seconds())}.")

    @rgroup.command(name="forbid")
    async def forbid_role(self, ctx: GuildContext, role: discord.Role, *, user: discord.Member):
        """
        Forbids a user from gaining a specific role.
        """
        async with self.config.member(user).forbidden() as fb:
            if role.id not in fb:
                fb.append(role.id)
            else:
                await ctx.send("Role was already forbidden")
        await ctx.tick()

    @rgroup.command(name="unforbid")
    async def unforbid_role(self, ctx: GuildContext, role: discord.Role, *, user: discord.Member):
        """
        Unforbids a user from gaining a specific role.
        """
        async with self.config.member(user).forbidden() as fb:
            if role.id in fb:
                fb.remove(role.id)
            else:
                await ctx.send("Role was not forbidden")
        await ctx.tick()

    @rgroup.command(name="exclusive")
    async def set_exclusivity(self, ctx: GuildContext, *roles: discord.Role):
        """
        Takes 2 or more roles and sets them as exclusive to eachother
        """

        _roles = set(roles)

        if len(_roles) < 2:
            return await ctx.send("You need to provide at least 2 roles")

        for role in _roles:
            async with self.config.role(role).exclusive_to() as ex_list:
                ex_list.extend([r.id for r in _roles if r != role and r.id not in ex_list])
        await ctx.tick()

    @rgroup.command(name="unexclusive")
    async def unset_exclusivity(self, ctx: GuildContext, *roles: discord.Role):
        """
        Takes any number of roles, and removes their exclusivity settings
        """

        _roles = set(roles)

        if not _roles:
            return await ctx.send("You need to provide at least a role to do this to")

        given_ids = {r.id for r in _roles}
        for role in _roles:
            ex_list = await self.config.role(role).exclusive_to()
            ex_list = [idx for idx in ex_list if idx not in given_ids]
            await self.config.role(role).exclusive_to.set(ex_list)
        await ctx.tick()

    @rgroup.command(name="sticky")
    async def setsticky(self, ctx: GuildContext, role: discord.Role, sticky: bool = None):
        """
        sets a role as sticky if used without a settings, gets the current ones
        """

        if sticky is None:
            is_sticky = await self.config.role(role).sticky()
            return await ctx.send(
                "{role} {verb} sticky".format(role=role.name, verb=("is" if is_sticky else "is not"))
            )

        await self.config.role(role).sticky.set(sticky)
        if sticky:
            # Record the role for everyone who currently holds it.
            for m in role.members:
                async with self.config.member(m).roles() as rids:
                    if role.id not in rids:
                        rids.append(role.id)

        await ctx.tick()

    # TODO set roles who don't need to pay for roles
    @rgroup.command(name="requireall")
    async def reqall(self, ctx: GuildContext, role: discord.Role, *roles: discord.Role):
        """
        Sets the required roles to gain a role

        Takes a role plus zero or more other roles (as requirements for the first)
        """

        rids = [r.id for r in roles]
        await self.config.role(role).requires_all.set(rids)
        await ctx.tick()

    @rgroup.command(name="requireany")
    async def reqany(self, ctx: GuildContext, role: discord.Role, *roles: discord.Role):
        """
        Sets a role to require already having one of another

        Takes a role plus zero or more other roles (as requirements for the first)
        """

        rids = [r.id for r in (roles or [])]
        await self.config.role(role).requires_any.set(rids)
        await ctx.tick()

    @rgroup.command(name="selfrem")
    async def selfrem(self, ctx: GuildContext, role: discord.Role, removable: bool = None):
        """
        Sets if a role is self-removable (default False)

        use without a setting to view current
        """

        if removable is None:
            is_removable = await self.config.role(role).self_removable()
            return await ctx.send(
                "{role} {verb} self-removable".format(
                    role=role.name, verb=("is" if is_removable else "is not")
                )
            )

        await self.config.role(role).self_removable.set(removable)
        await ctx.tick()

    @rgroup.command(name="selfadd")
    async def selfadd(self, ctx: GuildContext, role: discord.Role, assignable: bool = None):
        """
        Sets if a role is self-assignable via command

        (default False)

        use without a setting to view current
        """

        if assignable is None:
            is_assignable = await self.config.role(role).self_role()
            return await ctx.send(
                "{role} {verb} self-assignable".format(
                    role=role.name, verb=("is" if is_assignable else "is not")
                )
            )

        await self.config.role(role).self_role.set(assignable)
        await ctx.tick()

    # NOTE(review): the decorator was damaged in the source; registering this
    # as a subgroup of ``roleset`` is an assumption - confirm against upstream.
    @rgroup.group(name="freerole")
    async def free_roles(self, ctx: GuildContext):
        """
        Sets roles that bypass costs for purchasing roles in your guild.
        """
        pass

    @free_roles.command(name="add")
    async def free_roles_add(self, ctx: GuildContext, *, role: discord.Role):
        """
        Add a role to the free list.
        """
        async with self.config.guild(ctx.guild).free_roles() as free_roles:
            if role.id not in free_roles:
                free_roles.append(role.id)
        await ctx.tick()

    @free_roles.command(name="rem")
    async def free_roles_rem(self, ctx: GuildContext, *, role: discord.Role):
        """
        Remove a role from the free list.
        """
        async with self.config.guild(ctx.guild).free_roles() as free_roles:
            try:
                free_roles.remove(role.id)
            except ValueError:
                await ctx.send("Role not in free list!")
                return
        await ctx.tick()

    @free_roles.command(name="list")
    async def free_roles_list(self, ctx: GuildContext):
        """
        List free roles.
        """
        role_ids = await self.config.guild(ctx.guild).free_roles()
        if not role_ids:
            await ctx.send("No roles defined.")
            return

        # Ids of roles that no longer exist resolve to None and are skipped.
        resolved = (ctx.guild.get_role(role_id) for role_id in role_ids)
        names = sorted(role.name for role in resolved if role is not None)
        if not names:
            await ctx.send("No roles defined.")
            return

        # Number after sorting so the printed sequence reads 1, 2, 3, ...
        lines = [f"{i}.{name}" for i, name in enumerate(names, start=1)]
        for page in pagify("\n".join(lines)):
            await ctx.send(box(page))

    @checks.bot_has_permissions(manage_roles=True)
    @commands.guild_only()
    @commands.group(name="srole", autohelp=True)
    async def srole(self, ctx: GuildContext):
        """
        Self assignable role commands
        """
        pass

    @srole.command(name="list")
    async def srole_list(self, ctx: GuildContext):
        """
        Lists the selfroles and any associated costs.
        """

        data: Dict[discord.Role, Tuple[int, int]] = {}
        for role_id, vals in (await self.config.all_roles()).items():
            role = ctx.guild.get_role(role_id)
            if role and vals["self_role"]:
                data[role] = (vals["cost"], vals["subscription"])

        if not data:
            return await ctx.send("There aren't any self roles here.")

        # NOTE(review): the embed colour argument was lost in the source;
        # ctx.embed_colour() is used because rg_view_reactions uses it too.
        colour = await ctx.embed_colour()
        embed = discord.Embed(title="Roles", colour=colour)
        ordered = sorted(data.items(), key=lambda kv: kv[1])
        for index, (role, (cost, sub)) in enumerate(ordered, start=1):
            price = f"Cost: {cost}" if cost else "Free"
            renewal = f", every {parse_seconds(sub)}" if sub else ""
            embed.add_field(name=f"__**{index}. {role.name}**__", value=f"{price}{renewal}")
            if index % 25 == 0:
                # An embed holds at most 25 fields: flush and start a new one.
                await ctx.send(embed=embed)
                embed = discord.Embed(title="Roles", colour=colour)
        if embed.fields:
            await ctx.send(embed=embed)

    @srole.command(name="buy")
    async def srole_buy(self, ctx: GuildContext, *, role: discord.Role):
        """
        Purchase a role
        """
        if role in ctx.author.roles:
            await ctx.send("You already have that role.")
            return
        try:
            remove = await self.is_self_assign_eligible(ctx.author, role)
            eligible = await self.config.role(role).self_role()
            cost = await self.config.role(role).cost()
            subscription = await self.config.role(role).subscription()
        except RoleManagementException:
            return
        except PermissionOrHierarchyException:
            await ctx.send("I cannot assign roles which I can not manage. (Discord Hierarchy)")
            return

        if not eligible:
            return await ctx.send(f"You aren't allowed to add `{role}` to yourself {ctx.author.mention}!")

        if not cost:
            return await ctx.send("This role doesn't have a cost. Please try again using `[p]srole add`.")

        free_roles = await self.config.guild(ctx.guild).free_roles()
        currency_name = await bank.get_currency_name(ctx.guild)

        # Holding any role on the guild's free list waives the cost entirely.
        if any(m_role.id in free_roles for m_role in ctx.author.roles):
            await ctx.send(f"You're special, no {currency_name} will be deducted from your account.")
            await self.update_roles_atomically(who=ctx.author, give=[role], remove=remove)
            await ctx.tick()
            return

        try:
            await bank.withdraw_credits(ctx.author, cost)
        except ValueError:
            return await ctx.send(f"You don't have enough {currency_name} (Cost: {cost} {currency_name})")

        if subscription > 0:
            await ctx.send(f"{role.name} will be renewed every {parse_seconds(subscription)}")
            async with self.config.role(role).subscribed_users() as s:
                s[str(ctx.author.id)] = time.time() + subscription
            # Make sure the background sweep visits this role.
            async with self.config.guild(ctx.guild).s_roles() as s:
                if role.id not in s:
                    s.append(role.id)
        await self.update_roles_atomically(who=ctx.author, give=[role], remove=remove)
        await ctx.tick()

    @srole.command(name="add")
    async def sadd(self, ctx: GuildContext, *, role: discord.Role):
        """
        Join a role
        """
        if role in ctx.author.roles:
            await ctx.send("You already have that role.")
            return
        try:
            remove = await self.is_self_assign_eligible(ctx.author, role)
            eligible = await self.config.role(role).self_role()
            cost = await self.config.role(role).cost()
        except RoleManagementException:
            return
        except PermissionOrHierarchyException:
            await ctx.send("I cannot assign roles which I can not manage. (Discord Hierarchy)")
        else:
            if not eligible:
                await ctx.send(f"You aren't allowed to add `{role}` to yourself {ctx.author.mention}!")

            elif cost:
                await ctx.send(
                    "This role is not free. " "Please use `[p]srole buy` if you would like to purchase it."
                )
            else:
                await self.update_roles_atomically(who=ctx.author, give=[role], remove=remove)
                await ctx.tick()

    @srole.command(name="remove")
    async def srem(self, ctx: GuildContext, *, role: discord.Role):
        """
        leave a role
        """
        if role not in ctx.author.roles:
            await ctx.send("You do not have that role.")
            return
        if await self.config.role(role).self_removable():
            await self.update_roles_atomically(who=ctx.author, remove=[role])
            # remove subscription, if any
            async with self.config.role(role).subscribed_users() as s:
                s.pop(str(ctx.author.id), None)
            await ctx.tick()
        else:
            await ctx.send(f"You aren't allowed to remove `{role}` from yourself {ctx.author.mention}!")

    # Stuff for clean interaction with react role entries

    async def build_messages_for_react_roles(self, *roles: discord.Role, use_embeds=True) -> AsyncIterator[str]:
        """
        Builds info.

        Info is suitable for passing to embeds if use_embeds is True
        """

        # NOTE(review): both link templates were stripped from the source; the
        # standard message jump-URL layout is assumed here.
        linkfmt = (
            "[message #{message_id}](https://discord.com/channels/{guild_id}/{channel_id}/{message_id})"
            if use_embeds
            else "<https://discord.com/channels/{guild_id}/{channel_id}/{message_id}>"
        )

        for role in roles:
            # pylint: disable=E1133
            async for message_id, emoji_info, data in self.get_react_role_entries(role):

                channel_id = data.get("channelid", None)
                if channel_id:
                    link = linkfmt.format(
                        guild_id=role.guild.id, channel_id=channel_id, message_id=message_id,
                    )
                else:
                    link = (
                        f"unknown message with id {message_id}"
                        f" (use `roleset fixup` to find missing data for this)"
                    )

                emoji: Union[discord.Emoji, str]
                if emoji_info.isdigit():
                    # An all-digit key is a custom emoji id; show the emoji
                    # itself when the bot can see it.
                    emoji = (
                        discord.utils.get(self.bot.emojis, id=int(emoji_info))
                        or f"A custom enoji with id {emoji_info}"
                    )
                else:
                    emoji = emoji_info

                react_m = f"{role.name} is bound to {emoji} on {link}"
                yield react_m

    async def get_react_role_entries(self, role: discord.Role) -> AsyncIterator[Tuple[str, str, dict]]:
        """
        yields:
            str, str, dict

            first str: message id
            second str: emoji id or unicode codepoint
            dict: data from the corresponding:
                config.custom("REACTROLE", messageid, emojiid)
        """

        # self.config.register_custom(
        #    "REACTROLE", roleid=None, channelid=None, guildid=None
        # )  # ID : Message.id, str(React)

        data = await self.config.custom("REACTROLE").all()

        for mid, _outer in data.items():
            if not _outer or not isinstance(_outer, dict):
                continue
            for em, rdata in _outer.items():
                if rdata and rdata["roleid"] == role.id:
                    yield (mid, em, rdata)