import csv import io import logging from typing import Optional, cast, Set import discord from redbot.core import checks, commands from .abc import MixinMeta from .converters import ( RoleSyntaxConverter, ComplexActionConverter, ComplexSearchConverter, ) from .exceptions import RoleManagementException try: from redbot.core.commands import GuildContext except ImportError: from redbot.core.commands import Context as GuildContext # type: ignore log = logging.getLogger("red.sinbadcogs.rolemanagement.massmanager") class MassManagementMixin(MixinMeta): """ Mass role operations """ @commands.guild_only() @checks.admin_or_permissions(manage_roles=True) @commands.group(name="massrole", autohelp=True, aliases=["mrole"]) async def mrole(self, ctx: GuildContext): """ Commands for mass role management """ pass @staticmethod def search_filter(members: set, query: dict) -> set: """ Reusable """ if query["everyone"]: return members all_set: Set[discord.Member] = set() if query["all"]: first, *rest = query["all"] all_set = set(first.members) for other_role in rest: all_set &= set(other_role.members) none_set: Set[discord.Member] = set() if query["none"]: for role in query["none"]: none_set.update(role.members) any_set: Set[discord.Member] = set() if query["any"]: for role in query["any"]: any_set.update(role.members) minimum_perms: Optional[discord.Permissions] = None if query["hasperm"]: minimum_perms = discord.Permissions() minimum_perms.update(**{x: True for x in query["hasperm"]}) def mfilter(m: discord.Member) -> bool: if query["bots"] and not m.bot: return False if query["humans"] and m.bot: return False if query["any"] and m not in any_set: return False if query["all"] and m not in all_set: return False if query["none"] and m in none_set: return False if query["hasperm"] and not m.guild_permissions.is_superset(minimum_perms): return False if query["anyperm"] and not any( bool(value and perm in query["anyperm"]) for perm, value in iter(m.guild_permissions) ): return False if query["notperm"] and any( bool(value and perm in query["notperm"]) for perm, value in iter(m.guild_permissions) ): return False if query["noroles"] and len(m.roles) != 1: return False # 0 is a valid option for these, everyone role not counted if query["quantity"] is not None and len(m.roles) - 1 != query["quantity"]: return False if query["lt"] is not None and len(m.roles) - 1 >= query["lt"]: return False if query["gt"] is not None and len(m.roles) - 1 <= query["gt"]: return False if query["above"] and m.top_role <= query["above"]: return False if query["below"] and m.top_role >= query["below"]: return False return True members = {m for m in members if mfilter(m)} return members @mrole.command(name="user") async def mrole_user( self, ctx: GuildContext, users: commands.Greedy[discord.Member], *, _query: RoleSyntaxConverter, ) -> None: """ adds/removes roles to one or more users You cannot add and remove the same role Example Usage: [p]massrole user Sinbad --add RoleToGive "Role with spaces to give" --remove RoleToRemove "some other role to remove" Somethirdrole [p]massrole user LoudMouthedUser ProfaneUser --add muted For role operations based on role membership, permissions had, or whether someone is a bot (or even just add to/remove from all) see `[p]massrole search` and `[p]massrole modify` """ query = _query.parsed apply = query["add"] + query["remove"] if not await self.all_are_valid_roles(ctx, *apply): await ctx.send("Either you or I don't have the required permissions " "or position in the hierarchy.") return for user in users: await self.update_roles_atomically(who=user, give=query["add"], remove=query["remove"]) await ctx.tick() @mrole.command(name="search") async def mrole_search(self, ctx: GuildContext, *, _query: ComplexSearchConverter): """ Searches for users with the specified role criteria --has-all roles --has-none roles --has-any roles --has-no-roles --has-exactly-nroles number --has-more-than-nroles number --has-less-than-nroles number --has-perm permissions --any-perm permissions --not-perm permissions --above role --below role --only-humans --only-bots --everyone --csv csv output will be used if output would exceed embed limits, or if flag is provided """ members = set(ctx.guild.members) query = _query.parsed members = self.search_filter(members, query) if len(members) < 50 and not query["csv"]: def chunker(memberset, size=3): ret_str = "" for i, m in enumerate(memberset, 1): ret_str += m.mention if i % size == 0: ret_str += "\n" else: ret_str += " " return ret_str description = chunker(members) embed = discord.Embed(description=description) if ctx.guild: embed.color = ctx.guild.me.color await ctx.send( embed=embed, content=f"Search results for {ctx.author.mention}", allowed_mentions=discord.AllowedMentions.all(), ) else: await self.send_maybe_chunked_csv(ctx, list(members)) @staticmethod async def send_maybe_chunked_csv(ctx: GuildContext, members): chunk_size = 75000 chunks = [members[i : (i + chunk_size)] for i in range(0, len(members), chunk_size)] for part, chunk in enumerate(chunks, 1): csvf = io.StringIO() fieldnames = [ "ID", "Display Name", "Username#Discrim", "Joined Server", "Joined Discord", ] fmt = "%Y-%m-%d" writer = csv.DictWriter(csvf, fieldnames=fieldnames) writer.writeheader() for member in chunk: writer.writerow( { "ID": member.id, "Display Name": member.display_name, "Username#Discrim": str(member), "Joined Server": member.joined_at.strftime(fmt) if member.joined_at else None, "Joined Discord": member.created_at.strftime(fmt), } ) csvf.seek(0) b_data = csvf.read().encode() data = io.BytesIO(b_data) data.seek(0) filename = f"{ctx.message.id}" if len(chunks) > 1: filename += f"-part{part}" filename += ".csv" await ctx.send( content=f"Data for {ctx.author.mention}", files=[discord.File(data, filename=filename)], allowed_mentions=discord.AllowedMentions.all(), ) csvf.close() data.close() del csvf del data @mrole.command(name="modify") async def mrole_complex(self, ctx: GuildContext, *, _query: ComplexActionConverter): """ Similar syntax to search, while applying/removing roles --has-all roles --has-none roles --has-any roles --has-no-roles --has-exactly-nroles number --has-more-than-nroles number --has-less-than-nroles number --has-perm permissions --any-perm permissions --not-perm permissions --above role --below role --only-humans --only-bots --everyone --add roles --remove roles """ query = _query.parsed apply = query["add"] + query["remove"] if not await self.all_are_valid_roles(ctx, *apply): return await ctx.send( "Either you or I don't have the required permissions " "or position in the hierarchy." ) members = set(ctx.guild.members) members = self.search_filter(members, query) if len(members) > 100: await ctx.send("This may take a while given the number of members to update.") async with ctx.typing(): for member in members: try: await self.update_roles_atomically(who=member, give=query["add"], remove=query["remove"]) except RoleManagementException: log.debug( "Internal filter failure on member id %d guild id %d query %s", member.id, ctx.guild.id, query, ) except discord.HTTPException: log.debug( "Unpredicted failure for member id %d in guild id %d query %s", member.id, ctx.guild.id, query, ) await ctx.tick()