diff --git a/rolemanagement/__init__.py b/rolemanagement/__init__.py new file mode 100644 index 0000000..18a93e3 --- /dev/null +++ b/rolemanagement/__init__.py @@ -0,0 +1,7 @@ +from .core import RoleManagement + + +def setup(bot): + cog = RoleManagement(bot) + bot.add_cog(cog) + cog.init() diff --git a/rolemanagement/abc.py b/rolemanagement/abc.py new file mode 100644 index 0000000..2bf2d96 --- /dev/null +++ b/rolemanagement/abc.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import List, Optional + +import discord +from redbot.core import Config +from redbot.core.bot import Red + + +class MixinMeta(ABC): + """ + Metaclass for well behaved type hint detection with composite class. + """ + + # https://github.com/python/mypy/issues/1996 + + def __init__(self, *_args): + self.config: Config + self.bot: Red + + @abstractmethod + def strip_variations(self, s: str) -> str: + raise NotImplementedError() + + @abstractmethod + async def wait_for_ready(self) -> None: + raise NotImplementedError() + + @abstractmethod + async def is_self_assign_eligible( + self, who: discord.Member, role: discord.Role + ) -> List[discord.Role]: + raise NotImplementedError() + + @abstractmethod + async def update_roles_atomically( + self, + *, + who: discord.Member, + give: Optional[List[discord.Role]] = None, + remove: Optional[List[discord.Role]] = None, + ): + raise NotImplementedError() + + @abstractmethod + async def all_are_valid_roles(self, ctx, *roles: discord.Role) -> bool: + raise NotImplementedError() + + @abstractmethod + async def maybe_update_guilds(self, *guilds: discord.Guild) -> None: + raise NotImplementedError() diff --git a/rolemanagement/converters.py b/rolemanagement/converters.py new file mode 100644 index 0000000..56235cd --- /dev/null +++ b/rolemanagement/converters.py @@ -0,0 +1,239 @@ +import argparse +import shlex +from typing import Optional, List, NamedTuple, Dict + +from redbot.core.commands import RoleConverter, Context, BadArgument +import discord + + +_RoleConverter = RoleConverter() + + +class NoExitParser(argparse.ArgumentParser): + def error(self, message): + raise BadArgument() + + +class RoleSyntaxConverter(NamedTuple): + parsed: Dict[str, List[discord.Role]] + + @classmethod + async def convert(cls, ctx: Context, argument: str): + parser = NoExitParser( + description="Role management syntax help", add_help=False, allow_abbrev=True + ) + parser.add_argument("--add", nargs="*", dest="add", default=[]) + parser.add_argument("--remove", nargs="*", dest="remove", default=[]) + try: + vals = vars(parser.parse_args(shlex.split(argument))) + except Exception: + raise BadArgument() + + if not vals["add"] and not vals["remove"]: + raise BadArgument("Must provide at least one action") + + for attr in ("add", "remove"): + vals[attr] = [await _RoleConverter.convert(ctx, r) for r in vals[attr]] + + return cls(vals) + + +class ComplexActionConverter(NamedTuple): + """ + --has-all roles + --has-none roles + --has-any roles + --has-no-roles + --has-exactly-nroles + --has-more-than-nroles + --has-less-than-nroles + --has-perm permissions + --any-perm permissions + --not-perm permissions + --above role + --below role + --add roles + --remove roles + --only-humans + --only-bots + --everyone + """ + + parsed: dict + + @classmethod + async def convert(cls, ctx: Context, argument: str): + + parser = NoExitParser(description="Role management syntax help", add_help=False) + parser.add_argument("--has-any", nargs="*", dest="any", default=[]) + parser.add_argument("--has-all", nargs="*", dest="all", default=[]) + parser.add_argument("--has-none", nargs="*", dest="none", default=[]) + parser.add_argument( + "--has-no-roles", action="store_true", default=False, dest="noroles" + ) + parser.add_argument("--has-perms", nargs="*", dest="hasperm", default=[]) + parser.add_argument("--any-perm", nargs="*", dest="anyperm", default=[]) + parser.add_argument("--not-perm", nargs="*", dest="notperm", default=[]) + parser.add_argument("--add", nargs="*", dest="add", default=[]) + parser.add_argument("--remove", nargs="*", dest="remove", default=[]) + parser.add_argument("--has-exactly-nroles", dest="quantity", type=int) + parser.add_argument("--has-more-than-nroles", dest="gt", type=int, default=None) + parser.add_argument("--has-less-than-nroles", dest="lt", type=int, default=None) + parser.add_argument("--above", dest="above", type=str, default=None) + parser.add_argument("--below", dest="below", type=str, default=None) + hum_or_bot = parser.add_mutually_exclusive_group() + hum_or_bot.add_argument( + "--only-humans", action="store_true", default=False, dest="humans" + ) + hum_or_bot.add_argument( + "--only-bots", action="store_true", default=False, dest="bots" + ) + hum_or_bot.add_argument( + "--everyone", action="store_true", default=False, dest="everyone" + ) + + try: + vals = vars(parser.parse_args(shlex.split(argument))) + except Exception: + raise BadArgument() + + if not vals["add"] and not vals["remove"]: + raise BadArgument("Must provide at least one action") + + if not any( + ( + vals["humans"], + vals["everyone"], + vals["bots"], + vals["any"], + vals["all"], + vals["none"], + vals["hasperm"], + vals["notperm"], + vals["anyperm"], + vals["noroles"], + bool(vals["quantity"] is not None), + bool(vals["gt"] is not None), + bool(vals["lt"] is not None), + vals["above"], + vals["below"], + ) + ): + raise BadArgument("You need to provide at least 1 search criterion") + + for attr in ("any", "all", "none", "add", "remove"): + vals[attr] = [await _RoleConverter.convert(ctx, r) for r in vals[attr]] + + for attr in ("below", "above"): + if vals[attr] is None: + continue + vals[attr] = await _RoleConverter.convert(ctx, vals[attr]) + + for attr in ("hasperm", "anyperm", "notperm"): + + vals[attr] = [ + i.replace("_", " ").lower().replace(" ", "_").replace("server", "guild") + for i in vals[attr] + ] + if any(perm not in dir(discord.Permissions) for perm in vals[attr]): + raise BadArgument("You gave an invalid permission") + + return cls(vals) + + +class ComplexSearchConverter(NamedTuple): + """ + --has-all roles + --has-none roles + --has-any roles + --has-no-roles + --has-exactly-nroles + --has-more-than-nroles + --has-less-than-nroles + --only-humans + --only-bots + --above role + --below role + --has-perm permissions + --any-perm permissions + --not-perm permissions + --everyone + --csv + """ + + parsed: dict + + @classmethod + async def convert(cls, ctx: Context, argument: str): + parser = NoExitParser(description="Role management syntax help", add_help=False) + parser.add_argument("--has-any", nargs="*", dest="any", default=[]) + parser.add_argument("--has-all", nargs="*", dest="all", default=[]) + parser.add_argument("--has-none", nargs="*", dest="none", default=[]) + parser.add_argument( + "--has-no-roles", action="store_true", default=False, dest="noroles" + ) + parser.add_argument("--has-perms", nargs="*", dest="hasperm", default=[]) + parser.add_argument("--any-perm", nargs="*", dest="anyperm", default=[]) + parser.add_argument("--not-perm", nargs="*", dest="notperm", default=[]) + parser.add_argument("--csv", action="store_true", default=False) + parser.add_argument( + "--has-exactly-nroles", dest="quantity", type=int, default=None + ) + parser.add_argument("--has-more-than-nroles", dest="gt", type=int, default=None) + parser.add_argument("--has-less-than-nroles", dest="lt", type=int, default=None) + parser.add_argument("--above", dest="above", type=str, default=None) + parser.add_argument("--below", dest="below", type=str, default=None) + hum_or_bot = parser.add_mutually_exclusive_group() + hum_or_bot.add_argument( + "--only-humans", action="store_true", default=False, dest="humans" + ) + hum_or_bot.add_argument( + "--only-bots", action="store_true", default=False, dest="bots" + ) + hum_or_bot.add_argument( + "--everyone", action="store_true", default=False, dest="everyone" + ) + try: + vals = vars(parser.parse_args(shlex.split(argument))) + except Exception: + raise BadArgument() + + if not any( + ( + vals["humans"], + vals["everyone"], + vals["bots"], + vals["any"], + vals["all"], + vals["none"], + vals["hasperm"], + vals["notperm"], + vals["anyperm"], + vals["noroles"], + bool(vals["quantity"] is not None), + bool(vals["gt"] is not None), + bool(vals["lt"] is not None), + vals["above"], + vals["below"], + ) + ): + raise BadArgument("You need to provide at least 1 search criterion") + + for attr in ("any", "all", "none"): + vals[attr] = [await _RoleConverter.convert(ctx, r) for r in vals[attr]] + + for attr in ("below", "above"): + if vals[attr] is None: + continue + vals[attr] = await _RoleConverter.convert(ctx, vals[attr]) + + for attr in ("hasperm", "anyperm", "notperm"): + + vals[attr] = [ + i.replace("_", " ").lower().replace(" ", "_").replace("server", "guild") + for i in vals[attr] + ] + if any(perm not in dir(discord.Permissions) for perm in vals[attr]): + raise BadArgument("You gave an invalid permission") + + return cls(vals) diff --git a/rolemanagement/core.py b/rolemanagement/core.py new file mode 100644 index 0000000..5711961 --- /dev/null +++ b/rolemanagement/core.py @@ -0,0 +1,976 @@ +from __future__ import annotations + +import contextlib +import asyncio +import re +import time +from abc import ABCMeta +from typing import AsyncIterator, Tuple, Optional, Union, List, Dict + +import discord +from discord.ext.commands import CogMeta as DPYCogMeta +from redbot.core import checks, commands, bank +from redbot.core.config import Config +from redbot.core.utils.chat_formatting import box, pagify, warning + +from .events import EventMixin +from .exceptions import RoleManagementException, PermissionOrHierarchyException +from .massmanager import MassManagementMixin +from .utils import UtilMixin, variation_stripper_re, parse_timedelta, parse_seconds + +try: + from redbot.core.commands import GuildContext +except ImportError: + from redbot.core.commands import Context as GuildContext # type: ignore + +# This previously used ``(type(commands.Cog), type(ABC))`` +# This was changed to be explicit so that mypy +# would be slightly happier about it. +# This does introduce a potential place this +# can break in the future, but this would be an +# Upstream breaking change announced in advance +class CompositeMetaClass(DPYCogMeta, ABCMeta): + """ + This really only exists because of mypy + wanting mixins to be individually valid classes. + """ + + pass # MRO is fine on __new__ with super() use + # no need to manually ensure both get handled here. + +MIN_SUB_TIME = 3600 +SLEEP_TIME = 300 + +class RoleManagement( + UtilMixin, + MassManagementMixin, + EventMixin, + commands.Cog, + metaclass=CompositeMetaClass, +): + """ + Cog for role management + """ + + __author__ = "mikeshardmind(Sinbad), DiscordLiz" + __version__ = "323.1.4" + + def format_help_for_context(self, ctx): + pre_processed = super().format_help_for_context(ctx) + return f"{pre_processed}\nCog Version: {self.__version__}" + + def __init__(self, bot): + self.bot = bot + self.config = Config.get_conf( + self, identifier=78631113035100160, force_registration=True + ) + self.config.register_global( + handled_variation=False, handled_full_str_emoji=False + ) + self.config.register_role( + exclusive_to=[], + requires_any=[], + requires_all=[], + sticky=False, + self_removable=False, + self_role=False, + protected=False, + cost=0, + subscription=0, + subscribed_users={} + )#subscribed_users maps str(user.id)-> end time in unix timestamp + self.config.register_member(roles=[], forbidden=[]) + self.config.init_custom("REACTROLE", 2) + self.config.register_custom( + "REACTROLE", roleid=None, channelid=None, guildid=None + ) # ID : Message.id, str(React) + self.config.register_guild(notify_channel=None, s_roles=[], free_roles=[]) + self._ready = asyncio.Event() + self._start_task: Optional[asyncio.Task] = None + self.loop = asyncio.get_event_loop() + self._sub_task = self.loop.create_task(self.sub_checker()) + super().__init__() + + def cog_unload(self): + if self._start_task: + self._start_task.cancel() + if self._sub_task: + self._sub_task.cancel() + + def init(self): + self._start_task = asyncio.create_task(self.initialization()) + self._start_task.add_done_callback(lambda f: f.result()) + + async def initialization(self): + data: Dict[str, Dict[str, Dict[str, Union[int, bool, List[int]]]]] + await self.bot.wait_until_red_ready() + if not await self.config.handled_variation(): + data = await self.config.custom("REACTROLE").all() + to_adjust = {} + for message_id, emojis_to_data in data.items(): + for emoji_key in emojis_to_data: + new_key, c = variation_stripper_re.subn("", emoji_key) + if c: + to_adjust[(message_id, emoji_key)] = new_key + + for (message_id, emoji_key), new_key in to_adjust.items(): + data[message_id][new_key] = data[message_id][emoji_key] + data[message_id].pop(emoji_key, None) + + await self.config.custom("REACTROLE").set(data) + await self.config.handled_variation.set(True) + + if not await self.config.handled_full_str_emoji(): + data = await self.config.custom("REACTROLE").all() + to_adjust = {} + pattern = re.compile(r"^(?)$") + # Am not a fan.... + for message_id, emojis_to_data in data.items(): + for emoji_key in emojis_to_data: + new_key, c = pattern.subn(r"\3", emoji_key) + if c: + to_adjust[(message_id, emoji_key)] = new_key + + for (message_id, emoji_key), new_key in to_adjust.items(): + data[message_id][new_key] = data[message_id][emoji_key] + data[message_id].pop(emoji_key, None) + + await self.config.custom("REACTROLE").set(data) + await self.config.handled_full_str_emoji.set(True) + + self._ready.set() + + async def wait_for_ready(self): + await self._ready.wait() + + async def cog_before_invoke(self, ctx): + await self.wait_for_ready() + if ctx.guild: + await self.maybe_update_guilds(ctx.guild) + + # makes it a bit more readable + async def sub_helper(self, guild, role, role_data): + for user_id in list(role_data["subscribed_users"].keys()): + end_time = role_data["subscribed_users"][user_id] + now_time = time.time() + if end_time <= now_time: + member = guild.get_member(int(user_id)) + if not member: # clean absent members + del role_data["subscribed_users"][user_id] + continue + # charge user + cost = await self.config.role(role).cost() + currency_name = await bank.get_currency_name(guild) + curr_sub = await self.config.role(role).subscription() + if cost == 0 or curr_sub == 0: + # role is free now or sub is removed, remove stale sub + del role_data["subscribed_users"][user_id] + continue + + msg = f"Hello! You are being charged {cost} {currency_name} for your subscription to the {role.name} role in {guild.name}." + try: + await bank.withdraw_credits(member, cost) + msg += f"\n\nNo further action is required! You'll be charged again in {parse_seconds(curr_sub)}." + role_data["subscribed_users"][user_id] = now_time + curr_sub + except ValueError: # user is poor + msg += f"\n\nHowever, you do not have enough {currency_name} to cover the subscription. The role will be removed." + await self.update_roles_atomically(who=member, remove=[role]) + del role_data["subscribed_users"][user_id] + + try: + await member.send(msg) + except: + # trys to send in system channel, if that fails then + # send message in first channel bot can speak in + channel = guild.system_channel + msg += f"\n\n{member.mention} make sure to allow receiving DM's from server members so I can DM you this message!" + if channel.permissions_for(channel.guild.me).send_messages: + await channel.send(msg) + else: + for channel in guild.text_channels: + if channel.permissions_for(channel.guild.me).send_messages: + await channel.send(msg) + break + + return role_data + + async def sub_checker(self): + await self.wait_for_ready() + while True: + await asyncio.sleep(SLEEP_TIME) + for guild in self.bot.guilds: + async with self.config.guild(guild).s_roles() as s_roles: + for role_id in reversed(s_roles): + role = guild.get_role(role_id) + if not role: # clean stale subs if role is deleted + s_roles.remove(role_id) + continue + + role_data = await self.config.role(role).all() + + role_data = await self.sub_helper(guild, role, role_data) + + await self.config.role(role).subscribed_users.set( + role_data["subscribed_users"] + ) + if len(role_data["subscribed_users"]) == 0: + s_roles.remove(role_id) + + @commands.guild_only() + @commands.bot_has_permissions(manage_roles=True) + @checks.admin_or_permissions(manage_roles=True) + @commands.command(name="hackrole") + async def hackrole(self, ctx: GuildContext, user_id: int, *, role: discord.Role): + """ + Puts a stickyrole on someone not in the server. + """ + + if not await self.all_are_valid_roles(ctx, role): + return await ctx.maybe_send_embed( + "Can't do that. Discord role heirarchy applies here." + ) + + if not await self.config.role(role).sticky(): + return await ctx.send("This only works on sticky roles.") + + member = ctx.guild.get_member(user_id) + if member: + + try: + await self.update_roles_atomically(who=member, give=[role]) + except PermissionOrHierarchyException: + await ctx.send("Can't, somehow") + else: + await ctx.maybe_send_embed("They are in the guild...assigned anyway.") + else: + + async with self.config.member_from_ids( + ctx.guild.id, user_id + ).roles() as sticky: + if role.id not in sticky: + sticky.append(role.id) + + await ctx.tick() + + @checks.is_owner() + @commands.command(name="rrcleanup", hidden=True) + async def rolemanagementcleanup(self, ctx: GuildContext): + """ :eyes: """ + data = await self.config.custom("REACTROLE").all() + + key_data = {} + + for maybe_message_id, maybe_data in data.items(): + try: + message_id = int(maybe_message_id) + except ValueError: + continue + + ex_keys = list(maybe_data.keys()) + if not ex_keys: + continue + + message = None + channel_id = maybe_data[ex_keys[0]]["channelid"] + channel = ctx.bot.get_channel(channel_id) + if channel: + with contextlib.suppress(discord.HTTPException): + assert isinstance(channel, discord.TextChannel) # nosec + message = await channel.fetch_message(message_id) + + if not message: + key_data.update({maybe_message_id: ex_keys}) + + for mid, keys in key_data.items(): + for k in keys: + await self.config.custom("REACTROLE", mid, k).clear() + + await ctx.tick() + + @commands.guild_only() + @commands.bot_has_permissions(manage_roles=True) + @checks.admin_or_permissions(manage_guild=True) + @commands.command(name="rolebind") + async def bind_role_to_reactions( + self, + ctx: GuildContext, + role: discord.Role, + channel: discord.TextChannel, + msgid: int, + emoji: str, + ): + """ + Binds a role to a reaction on a message... + + The role is only given if the criteria for it are met. + Make sure you configure the other settings for a role in [p]roleset + """ + + if not await self.all_are_valid_roles(ctx, role): + return await ctx.maybe_send_embed( + "Can't do that. Discord role heirarchy applies here." + ) + + try: + message = await channel.fetch_message(msgid) + except discord.HTTPException: + return await ctx.maybe_send_embed("No such message") + + _emoji: Optional[Union[discord.Emoji, str]] + + _emoji = discord.utils.find(lambda e: str(e) == emoji, self.bot.emojis) + if _emoji is None: + try: + await ctx.message.add_reaction(emoji) + except discord.HTTPException: + return await ctx.maybe_send_embed("No such emoji") + else: + _emoji = emoji + eid = self.strip_variations(emoji) + else: + eid = str(_emoji.id) + + if not any(str(r) == emoji for r in message.reactions): + try: + await message.add_reaction(_emoji) + except discord.HTTPException: + return await ctx.maybe_send_embed( + "Hmm, that message couldn't be reacted to" + ) + + cfg = self.config.custom("REACTROLE", str(message.id), eid) + await cfg.set( + { + "roleid": role.id, + "channelid": message.channel.id, + "guildid": role.guild.id, + } + ) + await ctx.send( + f"Remember, the reactions only function according to " + f"the rules set for the roles using `{ctx.prefix}roleset`", + delete_after=30, + ) + + @commands.guild_only() + @commands.bot_has_permissions(manage_roles=True) + @checks.admin_or_permissions(manage_guild=True) + @commands.command(name="roleunbind") + async def unbind_role_from_reactions( + self, ctx: commands.Context, role: discord.Role, msgid: int, emoji: str + ): + """ + unbinds a role from a reaction on a message + """ + + if not await self.all_are_valid_roles(ctx, role): + return await ctx.maybe_send_embed( + "Can't do that. Discord role heirarchy applies here." + ) + + await self.config.custom( + "REACTROLE", f"{msgid}", self.strip_variations(emoji) + ).clear() + await ctx.tick() + + @commands.guild_only() + @commands.bot_has_permissions(manage_roles=True) + @checks.admin_or_permissions(manage_guild=True) + @commands.group(name="roleset", autohelp=True) + async def rgroup(self, ctx: GuildContext): + """ + Settings for role requirements + """ + pass + + @rgroup.command(name="viewreactions") + async def rg_view_reactions(self, ctx: GuildContext): + """ + View the reactions enabled for the server + """ + # This design is intentional for later extention to view this per role + + use_embeds = await ctx.embed_requested() + react_roles = "\n".join( + [ + msg + async for msg in self.build_messages_for_react_roles( + *ctx.guild.roles, use_embeds=use_embeds + ) + ] + ) + + if not react_roles: + return await ctx.send("No react roles bound here.") + + # ctx.send is already going to escape said mentions if any somehow get generated + # should also not be possible to do so without willfully being done by an admin. + + color = await ctx.embed_colour() if use_embeds else None + + for page in pagify( + react_roles, escape_mass_mentions=False, page_length=1800, shorten_by=0 + ): + # unrolling iterative calling of ctx.maybe_send_embed + if use_embeds: + await ctx.send(embed=discord.Embed(description=page, color=color)) + else: + await ctx.send(page) + + @rgroup.command(name="viewrole") + async def rg_view_role(self, ctx: GuildContext, *, role: discord.Role): + """ + Views the current settings for a role + """ + + rsets = await self.config.role(role).all() + + output = ( + f"This role:\n{'is' if rsets['self_role'] else 'is not'} self assignable" + f"\n{'is' if rsets['self_removable'] else 'is not'} self removable" + f"\n{'is' if rsets['sticky'] else 'is not'} sticky." + ) + if rsets["requires_any"]: + rstring = ", ".join( + r.name for r in ctx.guild.roles if r.id in rsets["requires_any"] + ) + output += f"\nThis role requires any of the following roles: {rstring}" + if rsets["requires_all"]: + rstring = ", ".join( + r.name for r in ctx.guild.roles if r.id in rsets["requires_all"] + ) + output += f"\nThis role requires all of the following roles: {rstring}" + if rsets["exclusive_to"]: + rstring = ", ".join( + r.name for r in ctx.guild.roles if r.id in rsets["exclusive_to"] + ) + output += ( + f"\nThis role is mutually exclusive to the following roles: {rstring}" + ) + if rsets["cost"]: + curr = await bank.get_currency_name(ctx.guild) + cost = rsets["cost"] + output += f"\nThis role costs {cost} {curr}" + else: + output += "\nThis role does not have an associated cost." + + if rsets["subscription"]: + s = rsets["subscription"] + output += f"\nThis role has a subscription time of: {parse_seconds(s)}" + + for page in pagify(output): + await ctx.send(page) + + @rgroup.command(name="cost") + async def make_purchasable( + self, ctx: GuildContext, cost: int, *, role: discord.Role + ): + """ + Makes a role purchasable for a specified cost. + Cost must be a number greater than 0. + A cost of exactly 0 can be used to remove purchasability. + + Purchase eligibility still follows other rules including self assignable. + + Warning: If these roles are bound to a reaction, + it will be possible to gain these without paying. + """ + + if not await self.all_are_valid_roles(ctx, role): + return await ctx.maybe_send_embed( + "Can't do that. Discord role heirarchy applies here." + ) + + if cost < 0: + return await ctx.send_help() + + await self.config.role(role).cost.set(cost) + if cost == 0: + await ctx.send(f"{role.name} is no longer purchasable.") + else: + await ctx.send(f"{role.name} is purchasable for {cost}") + + @rgroup.command(name="subscription") + async def subscription(self, ctx, role: discord.Role, *, interval: str): + """ + Sets a role to be a subscription, must set cost first. + Will charge role's cost every interval, and remove the role if they run out of money + Set to 0 to disable + **__Minimum subscription duration is 1 hour__** + Intervals look like: + 5 minutes + 1 minute 30 seconds + 1 hour + 2 days + 30 days + 5h30m + (etc) + """ + if not await self.all_are_valid_roles(ctx, role): + return await ctx.maybe_send_embed( + "Can't do that. Discord role heirarchy applies here." + ) + role_cost = await self.config.role(role).cost() + + if role_cost == 0: + await ctx.send(waring("Please set a cost for the role first.")) + return + + time = parse_timedelta(interval) + if int(time.total_seconds()) == 0: + await ctx.send("Subscription removed.") + async with self.config.guild(ctx.guild).s_roles() as s: + s.remove(role.id) + return + elif int(time.total_seconds()) < MIN_SUB_TIME: + await ctx.send("Subscriptions must be 1 hour or longer.") + return + + await self.config.role(role).subscription.set(int(time.total_seconds())) + async with self.config.guild(ctx.guild).s_roles() as s: + s.append(role.id) + await ctx.send(f"Subscription set to {parse_seconds(time.total_seconds())}.") + + @rgroup.command(name="forbid") + async def forbid_role( + self, ctx: GuildContext, role: discord.Role, *, user: discord.Member + ): + """ + Forbids a user from gaining a specific role. + """ + async with self.config.member(user).forbidden() as fb: + if role.id not in fb: + fb.append(role.id) + else: + await ctx.send("Role was already forbidden") + await ctx.tick() + + @rgroup.command(name="unforbid") + async def unforbid_role( + self, ctx: GuildContext, role: discord.Role, *, user: discord.Member + ): + """ + Unforbids a user from gaining a specific role. + """ + async with self.config.member(user).forbidden() as fb: + if role.id in fb: + fb.remove(role.id) + else: + await ctx.send("Role was not forbidden") + await ctx.tick() + + @rgroup.command(name="exclusive") + async def set_exclusivity(self, ctx: GuildContext, *roles: discord.Role): + """ + Takes 2 or more roles and sets them as exclusive to eachother + """ + + _roles = set(roles) + + if len(_roles) < 2: + return await ctx.send("You need to provide at least 2 roles") + + for role in _roles: + async with self.config.role(role).exclusive_to() as ex_list: + ex_list.extend( + [r.id for r in _roles if r != role and r.id not in ex_list] + ) + await ctx.tick() + + @rgroup.command(name="unexclusive") + async def unset_exclusivity(self, ctx: GuildContext, *roles: discord.Role): + """ + Takes any number of roles, and removes their exclusivity settings + """ + + _roles = set(roles) + + if not _roles: + return await ctx.send("You need to provide at least a role to do this to") + + for role in _roles: + ex_list = await self.config.role(role).exclusive_to() + ex_list = [idx for idx in ex_list if idx not in [r.id for r in _roles]] + await self.config.role(role).exclusive_to.set(ex_list) + await ctx.tick() + + @rgroup.command(name="sticky") + async def setsticky( + self, ctx: GuildContext, role: discord.Role, sticky: bool = None + ): + """ + sets a role as sticky if used without a settings, gets the current ones + """ + + if sticky is None: + is_sticky = await self.config.role(role).sticky() + return await ctx.send( + "{role} {verb} sticky".format( + role=role.name, verb=("is" if is_sticky else "is not") + ) + ) + + await self.config.role(role).sticky.set(sticky) + if sticky: + for m in role.members: + async with self.config.member(m).roles() as rids: + if role.id not in rids: + rids.append(role.id) + + await ctx.tick() + + #TODO set roles who don't need to pay for roles + @rgroup.command(name="requireall") + async def reqall(self, ctx: GuildContext, role: discord.Role, *roles: discord.Role): + """ + Sets the required roles to gain a role + + Takes a role plus zero or more other roles (as requirements for the first) + """ + + rids = [r.id for r in roles] + await self.config.role(role).requires_all.set(rids) + await ctx.tick() + + @rgroup.command(name="requireany") + async def reqany(self, ctx: GuildContext, role: discord.Role, *roles: discord.Role): + """ + Sets a role to require already having one of another + + Takes a role plus zero or more other roles (as requirements for the first) + """ + + rids = [r.id for r in (roles or [])] + await self.config.role(role).requires_any.set(rids) + await ctx.tick() + + @rgroup.command(name="selfrem") + async def selfrem( + self, ctx: GuildContext, role: discord.Role, removable: bool = None + ): + """ + Sets if a role is self-removable (default False) + + use without a setting to view current + """ + + if removable is None: + is_removable = await self.config.role(role).self_removable() + return await ctx.send( + "{role} {verb} self-removable".format( + role=role.name, verb=("is" if is_removable else "is not") + ) + ) + + await self.config.role(role).self_removable.set(removable) + await ctx.tick() + + @rgroup.command(name="selfadd") + async def selfadd( + self, ctx: GuildContext, role: discord.Role, assignable: bool = None + ): + """ + Sets if a role is self-assignable via command + + (default False) + + use without a setting to view current + """ + + if assignable is None: + is_assignable = await self.config.role(role).self_role() + return await ctx.send( + "{role} {verb} self-assignable".format( + role=role.name, verb=("is" if is_assignable else "is not") + ) + ) + + await self.config.role(role).self_role.set(assignable) + await ctx.tick() + + @rgroup.group(name="freerole") + async def free_roles(self, ctx: GuildContext): + """ + Sets roles that bypass costs for purchasing roles in your guild. + """ + pass + + @free_roles.command(name="add") + async def free_roles_add(self, ctx: GuildContext, *, role: discord.Role): + """ + Add a role to the free list. + """ + async with self.config.guild(ctx.guild).free_roles() as free_roles: + if role.id not in free_roles: + free_roles.append(role.id) + + await ctx.tick() + + @free_roles.command(name="rem") + async def free_roles_rem(self, ctx: GuildContext, *, role: discord.Role): + """ + Remove a role from the free list. + """ + async with self.config.guild(ctx.guild).free_roles() as free_roles: + try: + free_roles.remove(role.id) + except: + await ctx.send("Role not in free list!") + return + + await ctx.tick() + + @free_roles.command(name="list") + async def free_roles_list(self, ctx: GuildContext): + """ + List free roles. + """ + roles = await self.config.guild(ctx.guild).free_roles() + if not roles: + await ctx.send("No roles defined.") + return + roles = [ctx.guild.get_role(role) for role in roles] + missing = len([role for role in roles if role is None]) + roles = [f"{i+1}.{role.name}" for i, role in enumerate(roles) if role is not None] + + msg = "\n".join(sorted(roles)) + msg = pagify(msg) + for m in msg: + await ctx.send(box(m)) + + @checks.bot_has_permissions(manage_roles=True) + @commands.guild_only() + @commands.group(name="srole", autohelp=True) + async def srole(self, ctx: GuildContext): + """ + Self assignable role commands + """ + pass + + @srole.command(name="list") + async def srole_list(self, ctx: GuildContext): + """ + Lists the selfroles and any associated costs. + """ + + MYPY = False + if MYPY: + # remove this when mypy supports type narrowing from := + # It's less efficient, so not removing the actual + # implementation below + data: Dict[discord.Role, tuple] = {} + for role_id, vals in (await self.config.all_roles()).items(): + role = ctx.guild.get_role(role_id) + if role and vals["self_role"]: + data[role] = vals["cost"] + else: + data = { + role: (vals["cost"], vals["subscription"]) + for role_id, vals in (await self.config.all_roles()).items() + if (role := ctx.guild.get_role(role_id)) and vals["self_role"] + } + + if not data: + return await ctx.send("There aren't any self roles here.") + + embed = discord.Embed(title="Roles", colour=ctx.guild.me.colour) + i = 0 + for role, (cost, sub) in sorted(data.items(), key=lambda kv: kv[1]): + embed.add_field( + name=f"__**{i+1}. {role.name}**__", + value="%s%s" % ((f"Cost: {cost}" if cost else "Free"), (f", every {parse_seconds(sub)}" if sub else "")) + ) + i += 1 + if i % 25 == 0: + await ctx.send(embed=embed) + + await ctx.send(embed=embed) + + @srole.command(name="buy") + async def srole_buy(self, ctx: GuildContext, *, role: discord.Role): + """ + Purchase a role + """ + if role in ctx.author.roles: + await ctx.send("You already have that role.") + return + try: + remove = await self.is_self_assign_eligible(ctx.author, role) + eligible = await self.config.role(role).self_role() + cost = await self.config.role(role).cost() + subscription = await self.config.role(role).subscription() + except RoleManagementException: + return + except PermissionOrHierarchyException: + await ctx.send( + "I cannot assign roles which I can not manage. (Discord Hierarchy)" + ) + else: + if not eligible: + return await ctx.send( + f"You aren't allowed to add `{role}` to yourself {ctx.author.mention}!" + ) + + if not cost: + return await ctx.send( + "This role doesn't have a cost. Please try again using `[p]srole add`." + ) + + free_roles = await self.config.guild(ctx.guild).free_roles() + currency_name = await bank.get_currency_name(ctx.guild) + for m_role in ctx.author.roles: + if m_role.id in free_roles: + await ctx.send(f"You're special, no {currency_name} will be deducted from your account.") + await self.update_roles_atomically( + who=ctx.author, give=[role], remove=remove + ) + await ctx.tick() + return + + try: + await bank.withdraw_credits(ctx.author, cost) + except ValueError: + return await ctx.send( + f"You don't have enough {currency_name} (Cost: {cost} {currency_name})" + ) + else: + if subscription > 0: + await ctx.send(f"{role.name} will be renewed every {parse_seconds(subscription)}") + async with self.config.role(role).subscribed_users() as s: + s[str(ctx.author.id)] = time.time() + subscription + async with self.config.guild(ctx.guild).s_roles() as s: + if role.id not in s: + s.append(role.id) + + await self.update_roles_atomically( + who=ctx.author, give=[role], remove=remove + ) + await ctx.tick() + + @srole.command(name="add") + async def sadd(self, ctx: GuildContext, *, role: discord.Role): + """ + Join a role + """ + if role in ctx.author.roles: + await ctx.send("You already have that role.") + return + try: + remove = await self.is_self_assign_eligible(ctx.author, role) + eligible = await self.config.role(role).self_role() + cost = await self.config.role(role).cost() + except RoleManagementException: + return + except PermissionOrHierarchyException: + await ctx.send( + "I cannot assign roles which I can not manage. (Discord Hierarchy)" + ) + else: + if not eligible: + await ctx.send( + f"You aren't allowed to add `{role}` to yourself {ctx.author.mention}!" + ) + + elif cost: + await ctx.send( + "This role is not free. " + "Please use `[p]srole buy` if you would like to purchase it." + ) + else: + await self.update_roles_atomically( + who=ctx.author, give=[role], remove=remove + ) + await ctx.tick() + + @srole.command(name="remove") + async def srem(self, ctx: GuildContext, *, role: discord.Role): + """ + leave a role + """ + if role not in ctx.author.roles: + await ctx.send("You do not have that role.") + return + if await self.config.role(role).self_removable(): + await self.update_roles_atomically(who=ctx.author, remove=[role]) + try: # remove subscription, if any + async with self.config.role(role).subscribed_users() as s: + del s[str(ctx.author.id)] + except: + pass + await ctx.tick() + else: + await ctx.send( + f"You aren't allowed to remove `{role}` from yourself {ctx.author.mention}!`" + ) + + # Stuff for clean interaction with react role entries + + async def build_messages_for_react_roles( + self, *roles: discord.Role, use_embeds=True + ) -> AsyncIterator[str]: + """ + Builds info. + + Info is suitable for passing to embeds if use_embeds is True + """ + + linkfmt = ( + "[message #{message_id}](https://discordapp.com/channels/{guild_id}/{channel_id}/{message_id})" + if use_embeds + else "" + ) + + for role in roles: + # pylint: disable=E1133 + async for message_id, emoji_info, data in self.get_react_role_entries(role): + + channel_id = data.get("channelid", None) + if channel_id: + link = linkfmt.format( + guild_id=role.guild.id, + channel_id=channel_id, + message_id=message_id, + ) + else: + link = ( + f"unknown message with id {message_id}" + f" (use `roleset fixup` to find missing data for this)" + ) + + emoji: Union[discord.Emoji, str] + if emoji_info.isdigit(): + emoji = ( + discord.utils.get(self.bot.emojis, id=int(emoji_info)) + or f"A custom enoji with id {emoji_info}" + ) + else: + emoji = emoji_info + + react_m = f"{role.name} is bound to {emoji} on {link}" + yield react_m + + async def get_react_role_entries( + self, role: discord.Role + ) -> AsyncIterator[Tuple[str, str, dict]]: + """ + yields: + str, str, dict + + first str: message id + second str: emoji id or unicode codepoint + dict: data from the corresponding: + config.custom("REACTROLE", messageid, emojiid) + """ + + # self.config.register_custom( + # "REACTROLE", roleid=None, channelid=None, guildid=None + # ) # ID : Message.id, str(React) + + data = await self.config.custom("REACTROLE").all() + + for mid, _outer in data.items(): + if not _outer or not isinstance(_outer, dict): + continue + for em, rdata in _outer.items(): + if rdata and rdata["roleid"] == role.id: + yield (mid, em, rdata) diff --git a/rolemanagement/events.py b/rolemanagement/events.py new file mode 100644 index 0000000..0d14eab --- /dev/null +++ b/rolemanagement/events.py @@ -0,0 +1,162 @@ +from __future__ import annotations + +from datetime import datetime, timedelta +from typing import List + +import discord +from redbot.core import commands + +from .abc import MixinMeta +from .exceptions import RoleManagementException, PermissionOrHierarchyException + + +class EventMixin(MixinMeta): + def verification_level_issue(self, member: discord.Member) -> bool: + """ + Returns True if this would bypass verification level settings + + prevent react roles from bypassing time limits. + It's exceptionally dumb that users can react while + restricted by verification level, but that's Discord. + They block reacting to blocked users, but interacting + with entire guilds by reaction before hand? A-OK. *eyerolls* + + Can't check the email/2FA, blame discord for allowing people to react with above. + """ + guild: discord.Guild = member.guild + now = datetime.utcnow() + level: int = guild.verification_level.value + + if level >= 3 and member.created_at + timedelta(minutes=5) > now: # medium + return True + + if level >= 4: # high + if not member.joined_at or member.joined_at + timedelta(minutes=10) > now: + return True + + return False + + @commands.Cog.listener() + async def on_member_update(self, before: discord.Member, after: discord.Member): + """ + DEP-WARN + Section has been optimized assuming member._roles + remains an iterable containing snowflakes + """ + await self.wait_for_ready() + if before._roles == after._roles: + return + + lost, gained = set(before._roles), set(after._roles) + lost, gained = lost - gained, gained - lost + sym_diff = lost | gained + + for r in sym_diff: + if not await self.config.role_from_id(r).sticky(): + lost.discard(r) + gained.discard(r) + + async with self.config.member(after).roles() as rids: + for r in lost: + while r in rids: + rids.remove(r) + for r in gained: + if r not in rids: + rids.append(r) + + @commands.Cog.listener() + async def on_member_join(self, member: discord.Member): + await self.wait_for_ready() + guild = member.guild + if not guild.me.guild_permissions.manage_roles: + return + + async with self.config.member(member).roles() as rids: + to_add: List[discord.Role] = [] + for _id in rids: + role = discord.utils.get(guild.roles, id=_id) + if not role: + continue + if await self.config.role(role).sticky(): + to_add.append(role) + if to_add: + to_add = [r for r in to_add if r < guild.me.top_role] + await member.add_roles(*to_add) + + @commands.Cog.listener() + async def on_raw_reaction_add( + self, payload: discord.raw_models.RawReactionActionEvent + ): + await self.wait_for_ready() + if not payload.guild_id: + return + + emoji = payload.emoji + if emoji.is_custom_emoji(): + eid = str(emoji.id) + else: + eid = self.strip_variations(str(emoji)) + + cfg = self.config.custom("REACTROLE", str(payload.message_id), eid) + rid = await cfg.roleid() + if rid is None or not await self.config.role_from_id(rid).self_role(): + return + + guild = self.bot.get_guild(payload.guild_id) + if guild: + await self.maybe_update_guilds(guild) + else: + return + member = guild.get_member(payload.user_id) + + if member is None or member.bot: + return + + if self.verification_level_issue(member): + return + + role = guild.get_role(rid) + if role is None or role in member.roles: + return + + try: + remove = await self.is_self_assign_eligible(member, role) + except (RoleManagementException, PermissionOrHierarchyException): + pass + else: + await self.update_roles_atomically(who=member, give=[role], remove=remove) + + @commands.Cog.listener() + async def on_raw_reaction_remove( + self, payload: discord.raw_models.RawReactionActionEvent + ): + await self.wait_for_ready() + if not payload.guild_id: + return + + emoji = payload.emoji + + if emoji.is_custom_emoji(): + eid = str(emoji.id) + else: + eid = self.strip_variations(str(emoji)) + + cfg = self.config.custom("REACTROLE", str(payload.message_id), eid) + rid = await cfg.roleid() + + if rid is None: + return + + if await self.config.role_from_id(rid).self_removable(): + guild = self.bot.get_guild(payload.guild_id) + if not guild: + # Where's it go? + return + member = guild.get_member(payload.user_id) + if not member or member.bot: + return + role = discord.utils.get(guild.roles, id=rid) + if not role or role not in member.roles: + return + if guild.me.guild_permissions.manage_roles and guild.me.top_role > role: + await self.update_roles_atomically(who=member, give=None, remove=[role]) diff --git a/rolemanagement/exceptions.py b/rolemanagement/exceptions.py new file mode 100644 index 0000000..e371d8a --- /dev/null +++ b/rolemanagement/exceptions.py @@ -0,0 +1,20 @@ +from __future__ import annotations + + +class RoleManagementException(Exception): + pass + +class PermissionOrHierarchyException(Exception): + pass + +class MissingRequirementsException(RoleManagementException): + def __init__(self, *, miss_any=None, miss_all=None): + self.miss_all = miss_all or [] + self.miss_any = miss_any or [] + super().__init__() + + +class ConflictingRoleException(RoleManagementException): + def __init__(self, *, conflicts=None): + self.conflicts = conflicts or [] + super().__init__() diff --git a/rolemanagement/future_sql.py b/rolemanagement/future_sql.py new file mode 100644 index 0000000..2dcff66 --- /dev/null +++ b/rolemanagement/future_sql.py @@ -0,0 +1,72 @@ +# Below is planned schema for SQLite handling and expansion of functionality. +""" +CREATE TABLE IF NOT EXISTS roles ( + role_id INTEGER PRIMARY KEY NOT NULL, + self_role BOOLEAN DEFAULT FALSE, + sticky BOOLEAN DEFAULT FALSE, + self_removable BOOLEAN DEFAULT FALSE, + -- useful for preventing pre 10 minute bypass + -- and just for keeping it to people who have been around a bit + minimum_join_time INTEGER DEFAULT 0, + cost INTEGER DEFAULT 0 +); + +CREATE TABLE IF NOT EXISTS groups ( + uid INTEGER AUTOINCREMENT, + name TEXT, + minimum INTEGER DEFAULT 0, + maximum INTEGER DEFAULT NULL +); + +CREATE TABLE IF NOT EXISTS exclusions ( + role_id INTEGER REFERENCES roles(role_id) ON DELETE CASCADE, + blocks_role_id INTEGER REFERENCES roles(role_id) ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS requires ( + role_id INTEGER REFERENCES roles(role_id) ON DELETE CASCADE, + requires_role_id INTEGER REFERENCES roles(role_id) ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS actions ( + message_id INTEGER NOT NULL, + reaction TEXT NOT NULL -- unicode emoji or str(discord.Emoji.id), + channel_id INTEGER NOT NULL, + guild_id INTEGER NOT NULL, + reaction_id INTEGER REFERENCES reactions(uid) ON DELETE CASCADE, + action_type INTEGER, -- handle as enum from python + role_id INTEGER REFERENCES roles(role_id) +); + +CREATE TABLE IF NOT EXISTS members ( + member_id INTEGER NOT NULL, + guild_id INTEGER NOT NULL, + kaizo_locked BOOLEAN DEFAULT FALSE, + kaizo_kicked BOOLEAN DEFAULT FALSE, + kaizo_banned BOOLEAN DEFAULT FALSE +); + +CREATE TABLE IF NOT EXISTS sticky_roles ( + member_id INTEGER NOT NULL, -- intentional no ref + role_id INTEGER REFERENCES roles(role_id) ON DELETE CASCADE, + is_stickied BOOLEAN DEFAULT FALSE +); +""" + + +# Below are some action types +import enum + + +class ActionType(enum.IntEnum): + TOGGLE = 1 + ADD = 2 + REMOVE = 3 + INVERTED_TOGGLE = 4 + # anti auto react bot measures + # A subset of spam bots appear to be attempting to auto react + # to things which appear to be verification channels + # (As of at least September 2019) + KAIZO_LOCK = 5 + KAIZO_KICK = 6 + KAIZO_BAN = 7 diff --git a/rolemanagement/info.json b/rolemanagement/info.json new file mode 100644 index 0000000..2e71a2d --- /dev/null +++ b/rolemanagement/info.json @@ -0,0 +1,20 @@ +{ + "author": [ + "mikeshardmind(Sinbad)", + "DiscordLiz" + ], + "install_msg": "If you need help, I have a channel in https://discord.gg/mb85deu", + "name": "RoleManagement", + "disabled": false, + "short": "Role searches, reactroles, requirements for roles, etc.", + "description": "React roles with requirements, selfroles with requirements, role based member search and more.", + "tags": [ + "react_roles", + "react roles", + "rolemanagement", + "role search", + "massrole" + ], + "hidden": false, + "min_bot_version": "3.2.0a0.dev1" +} \ No newline at end of file diff --git a/rolemanagement/massmanager.py b/rolemanagement/massmanager.py new file mode 100644 index 0000000..bc25db3 --- /dev/null +++ b/rolemanagement/massmanager.py @@ -0,0 +1,338 @@ +import csv +import io +import logging +from typing import Optional, cast, Set + +import discord +from redbot.core import checks, commands + +from .abc import MixinMeta +from .converters import ( + RoleSyntaxConverter, + ComplexActionConverter, + ComplexSearchConverter, +) +from .exceptions import RoleManagementException + +try: + from redbot.core.commands import GuildContext +except ImportError: + from redbot.core.commands import Context as GuildContext # type: ignore + + +log = logging.getLogger("red.sinbadcogs.rolemanagement.massmanager") + + +class MassManagementMixin(MixinMeta): + """ + Mass role operations + """ + + @commands.guild_only() + @checks.admin_or_permissions(manage_roles=True) + @commands.group(name="massrole", autohelp=True, aliases=["mrole"]) + async def mrole(self, ctx: GuildContext): + """ + Commands for mass role management + """ + pass + + @staticmethod + def search_filter(members: set, query: dict) -> set: + """ + Reusable + """ + + if query["everyone"]: + return members + + all_set: Set[discord.Member] = set() + if query["all"]: + first, *rest = query["all"] + all_set = set(first.members) + for other_role in rest: + all_set &= set(other_role.members) + + none_set: Set[discord.Member] = set() + if query["none"]: + for role in query["none"]: + none_set.update(role.members) + + any_set: Set[discord.Member] = set() + if query["any"]: + for role in query["any"]: + any_set.update(role.members) + + minimum_perms: Optional[discord.Permissions] = None + if query["hasperm"]: + minimum_perms = discord.Permissions() + minimum_perms.update(**{x: True for x in query["hasperm"]}) + + def mfilter(m: discord.Member) -> bool: + if query["bots"] and not m.bot: + return False + + if query["humans"] and m.bot: + return False + + if query["any"] and m not in any_set: + return False + + if query["all"] and m not in all_set: + return False + + if query["none"] and m in none_set: + return False + + if query["hasperm"] and not m.guild_permissions.is_superset(minimum_perms): + return False + + if query["anyperm"] and not any( + bool(value and perm in query["anyperm"]) + for perm, value in iter(m.guild_permissions) + ): + return False + + if query["notperm"] and any( + bool(value and perm in query["notperm"]) + for perm, value in iter(m.guild_permissions) + ): + return False + + if query["noroles"] and len(m.roles) != 1: + return False + + # 0 is a valid option for these, everyone role not counted + if query["quantity"] is not None and len(m.roles) - 1 != query["quantity"]: + return False + + if query["lt"] is not None and len(m.roles) - 1 >= query["lt"]: + return False + + if query["gt"] is not None and len(m.roles) - 1 <= query["gt"]: + return False + + if query["above"] and m.top_role <= query["above"]: + return False + + if query["below"] and m.top_role >= query["below"]: + return False + + return True + + members = {m for m in members if mfilter(m)} + + return members + + @mrole.command(name="user") + async def mrole_user( + self, + ctx: GuildContext, + users: commands.Greedy[discord.Member], + *, + _query: RoleSyntaxConverter, + ) -> None: + """ + adds/removes roles to one or more users + + You cannot add and remove the same role + + Example Usage: + + [p]massrole user Sinbad --add RoleToGive "Role with spaces to give" + --remove RoleToRemove "some other role to remove" Somethirdrole + + [p]massrole user LoudMouthedUser ProfaneUser --add muted + + For role operations based on role membership, permissions had, or whether someone is a bot + (or even just add to/remove from all) see `[p]massrole search` and `[p]massrole modify` + """ + query = _query.parsed + apply = query["add"] + query["remove"] + if not await self.all_are_valid_roles(ctx, *apply): + await ctx.send( + "Either you or I don't have the required permissions " + "or position in the hierarchy." + ) + return + + for user in users: + await self.update_roles_atomically( + who=user, give=query["add"], remove=query["remove"] + ) + + await ctx.tick() + + @mrole.command(name="search") + async def mrole_search(self, ctx: GuildContext, *, _query: ComplexSearchConverter): + """ + Searches for users with the specified role criteria + + --has-all roles + --has-none roles + --has-any roles + + --has-no-roles + --has-exactly-nroles number + --has-more-than-nroles number + --has-less-than-nroles number + + --has-perm permissions + --any-perm permissions + --not-perm permissions + + --above role + --below role + + --only-humans + --only-bots + --everyone + + --csv + + csv output will be used if output would exceed embed limits, or if flag is provided + """ + + members = set(ctx.guild.members) + query = _query.parsed + members = self.search_filter(members, query) + + if len(members) < 50 and not query["csv"]: + + def chunker(memberset, size=3): + ret_str = "" + for i, m in enumerate(memberset, 1): + ret_str += m.mention + if i % size == 0: + ret_str += "\n" + else: + ret_str += " " + return ret_str + + description = chunker(members) + embed = discord.Embed(description=description) + if ctx.guild: + embed.color = ctx.guild.me.color + await ctx.send( + embed=embed, content=f"Search results for {ctx.author.mention}" + ) + + else: + await self.send_maybe_chunked_csv(ctx, list(members)) + + @staticmethod + async def send_maybe_chunked_csv(ctx: GuildContext, members): + chunk_size = 75000 + chunks = [ + members[i : (i + chunk_size)] for i in range(0, len(members), chunk_size) + ] + + for part, chunk in enumerate(chunks, 1): + + csvf = io.StringIO() + fieldnames = [ + "ID", + "Display Name", + "Username#Discrim", + "Joined Server", + "Joined Discord", + ] + fmt = "%Y-%m-%d" + writer = csv.DictWriter(csvf, fieldnames=fieldnames) + writer.writeheader() + for member in chunk: + writer.writerow( + { + "ID": member.id, + "Display Name": member.display_name, + "Username#Discrim": str(member), + "Joined Server": member.joined_at.strftime(fmt) + if member.joined_at + else None, + "Joined Discord": member.created_at.strftime(fmt), + } + ) + + csvf.seek(0) + b_data = csvf.read().encode() + data = io.BytesIO(b_data) + data.seek(0) + filename = f"{ctx.message.id}" + if len(chunks) > 1: + filename += f"-part{part}" + filename += ".csv" + await ctx.send( + content=f"Data for {ctx.author.mention}", + files=[discord.File(data, filename=filename)], + ) + csvf.close() + data.close() + del csvf + del data + + @mrole.command(name="modify") + async def mrole_complex(self, ctx: GuildContext, *, _query: ComplexActionConverter): + """ + Similar syntax to search, while applying/removing roles + + --has-all roles + --has-none roles + --has-any roles + + --has-no-roles + --has-exactly-nroles number + --has-more-than-nroles number + --has-less-than-nroles number + + --has-perm permissions + --any-perm permissions + --not-perm permissions + + --above role + --below role + + --only-humans + --only-bots + --everyone + + --add roles + --remove roles + """ + query = _query.parsed + apply = query["add"] + query["remove"] + if not await self.all_are_valid_roles(ctx, *apply): + return await ctx.send( + "Either you or I don't have the required permissions " + "or position in the hierarchy." + ) + + members = set(ctx.guild.members) + members = self.search_filter(members, query) + + if len(members) > 100: + await ctx.send( + "This may take a while given the number of members to update." + ) + + async with ctx.typing(): + for member in members: + try: + await self.update_roles_atomically( + who=member, give=query["add"], remove=query["remove"] + ) + except RoleManagementException: + log.debug( + "Internal filter failure on member id %d guild id %d query %s", + member.id, + ctx.guild.id, + query, + ) + except discord.HTTPException: + log.debug( + "Unpredicted failure for member id %d in guild id %d query %s", + member.id, + ctx.guild.id, + query, + ) + + await ctx.tick() diff --git a/rolemanagement/utils.py b/rolemanagement/utils.py new file mode 100644 index 0000000..de26bba --- /dev/null +++ b/rolemanagement/utils.py @@ -0,0 +1,202 @@ +from __future__ import annotations + +import re +from typing import List +from datetime import timedelta +import discord + +from .abc import MixinMeta +from .exceptions import ( + ConflictingRoleException, + MissingRequirementsException, + PermissionOrHierarchyException, +) + +variation_stripper_re = re.compile(r"[\ufe00-\ufe0f]") + +TIME_RE_STRING = r"\s?".join( + [ + r"((?P\d+?)\s?(weeks?|w))?", + r"((?P\d+?)\s?(days?|d))?", + r"((?P\d+?)\s?(hours?|hrs|hr?))?", + r"((?P\d+?)\s?(minutes?|mins?|m(?!o)))?", # prevent matching "months" + r"((?P\d+?)\s?(seconds?|secs?|s))?", + ] +) + +TIME_RE = re.compile(TIME_RE_STRING, re.I) + +def parse_timedelta(argument: str) -> timedelta: + """ + Parses a string that contains a time interval and converts it to a timedelta object. + """ + matches = TIME_RE.match(argument) + if matches: + params = {k: int(v) for k, v in matches.groupdict().items() if v} + if params: + return timedelta(**params) + return None + +def parse_seconds(seconds) -> str: + """ + Take seconds and converts it to larger units + Returns parsed message string + """ + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + days, hours = divmod(hours, 24) + weeks, days = divmod(days, 7) + months, weeks = divmod(weeks, 4) + msg = [] + + if months: + msg.append(f"{int(months)} {'months' if months > 1 else 'month'}") + if weeks: + msg.append(f"{int(weeks)} {'weeks' if weeks > 1 else 'week'}") + if days: + msg.append(f"{int(days)} {'days' if days > 1 else 'day'}") + if hours: + msg.append(f"{int(hours)} {'hours' if hours > 1 else 'hour'}") + if minutes: + msg.append(f"{int(minutes)} {'minutes' if minutes > 1 else 'minute'}") + if seconds: + msg.append(f"{int(seconds)} {'seconds' if seconds > 1 else 'second'}") + + return ", ".join(msg) + + +class UtilMixin(MixinMeta): + """ + Mixin for utils, some of which need things stored in the class + """ + + def strip_variations(self, s: str) -> str: + """ + Normalizes emoji, removing variation selectors + """ + return variation_stripper_re.sub("", s) + + async def update_roles_atomically( + self, + *, + who: discord.Member, + give: List[discord.Role] = None, + remove: List[discord.Role] = None, + ): + """ + Give and remove roles as a single op with some slight sanity + wrapping + """ + me = who.guild.me + give = give or [] + remove = remove or [] + heirarchy_testing = give + remove + roles = [r for r in who.roles if r not in remove] + roles.extend([r for r in give if r not in roles]) + if sorted(roles) == sorted(who.roles): + return + if ( + any(r >= me.top_role for r in heirarchy_testing) + or not me.guild_permissions.manage_roles + ): + raise PermissionOrHierarchyException("Can't do that.") + await who.edit(roles=roles) + + async def all_are_valid_roles(self, ctx, *roles: discord.Role) -> bool: + """ + Quick heirarchy check on a role set in syntax returned + """ + author = ctx.author + guild = ctx.guild + + # Author allowed + if not ( + (guild.owner == author) + or all(author.top_role > role for role in roles) + or await ctx.bot.is_owner(ctx.author) + ): + return False + + # Bot allowed + if not ( + guild.me.guild_permissions.manage_roles + and ( + guild.me == guild.owner + or all(guild.me.top_role > role for role in roles) + ) + ): + return False + + # Sanity check on managed roles + if any(role.managed for role in roles): + return False + + return True + + async def is_self_assign_eligible( + self, who: discord.Member, role: discord.Role + ) -> List[discord.Role]: + """ + Returns a list of roles to be removed if this one is added, or raises an + exception + """ + await self.check_required(who, role) + + ret: List[discord.Role] = await self.check_exclusivity(who, role) + + forbidden = await self.config.member(who).forbidden() + if role.id in forbidden: + raise PermissionOrHierarchyException() + + guild = who.guild + if not guild.me.guild_permissions.manage_roles or role > guild.me.top_role: + raise PermissionOrHierarchyException() + + return ret + + async def check_required(self, who: discord.Member, role: discord.Role) -> None: + """ + Raises an error on missing reqs + """ + + req_any = await self.config.role(role).requires_any() + req_any_fail = req_any[:] + if req_any: + for idx in req_any: + if who._roles.has(idx): + req_any_fail = [] + break + + req_all_fail = [ + idx + for idx in await self.config.role(role).requires_all() + if not who._roles.has(idx) + ] + + if req_any_fail or req_all_fail: + raise MissingRequirementsException( + miss_all=req_all_fail, miss_any=req_any_fail + ) + + return None + + async def check_exclusivity( + self, who: discord.Member, role: discord.Role + ) -> List[discord.Role]: + """ + Returns a list of roles to remove, or raises an error + """ + + data = await self.config.all_roles() + ex = data.get(role.id, {}).get("exclusive_to", []) + conflicts: List[discord.Role] = [r for r in who.roles if r.id in ex] + + for r in conflicts: + if not data.get(r.id, {}).get("self_removable", False): + raise ConflictingRoleException(conflicts=conflicts) + return conflicts + + async def maybe_update_guilds(self, *guilds: discord.Guild): + _guilds = [g for g in guilds if not g.unavailable and g.large and not g.chunked] + if _guilds: + await self.bot.request_offline_members(*_guilds)