modifed rolemanagement cog from sinbad to have subscriptions for roles

This commit is contained in:
brandons209 2020-01-30 03:10:04 -05:00
parent e55b3b2802
commit 217fe41183
10 changed files with 2088 additions and 0 deletions

View file

@ -0,0 +1,7 @@
from .core import RoleManagement
def setup(bot):
cog = RoleManagement(bot)
bot.add_cog(cog)
cog.init()

52
rolemanagement/abc.py Normal file
View file

@ -0,0 +1,52 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import List, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class MixinMeta(ABC):
"""
Metaclass for well behaved type hint detection with composite class.
"""
# https://github.com/python/mypy/issues/1996
def __init__(self, *_args):
self.config: Config
self.bot: Red
@abstractmethod
def strip_variations(self, s: str) -> str:
raise NotImplementedError()
@abstractmethod
async def wait_for_ready(self) -> None:
raise NotImplementedError()
@abstractmethod
async def is_self_assign_eligible(
self, who: discord.Member, role: discord.Role
) -> List[discord.Role]:
raise NotImplementedError()
@abstractmethod
async def update_roles_atomically(
self,
*,
who: discord.Member,
give: Optional[List[discord.Role]] = None,
remove: Optional[List[discord.Role]] = None,
):
raise NotImplementedError()
@abstractmethod
async def all_are_valid_roles(self, ctx, *roles: discord.Role) -> bool:
raise NotImplementedError()
@abstractmethod
async def maybe_update_guilds(self, *guilds: discord.Guild) -> None:
raise NotImplementedError()

View file

@ -0,0 +1,239 @@
import argparse
import shlex
from typing import Optional, List, NamedTuple, Dict
from redbot.core.commands import RoleConverter, Context, BadArgument
import discord
_RoleConverter = RoleConverter()
class NoExitParser(argparse.ArgumentParser):
def error(self, message):
raise BadArgument()
class RoleSyntaxConverter(NamedTuple):
parsed: Dict[str, List[discord.Role]]
@classmethod
async def convert(cls, ctx: Context, argument: str):
parser = NoExitParser(
description="Role management syntax help", add_help=False, allow_abbrev=True
)
parser.add_argument("--add", nargs="*", dest="add", default=[])
parser.add_argument("--remove", nargs="*", dest="remove", default=[])
try:
vals = vars(parser.parse_args(shlex.split(argument)))
except Exception:
raise BadArgument()
if not vals["add"] and not vals["remove"]:
raise BadArgument("Must provide at least one action")
for attr in ("add", "remove"):
vals[attr] = [await _RoleConverter.convert(ctx, r) for r in vals[attr]]
return cls(vals)
class ComplexActionConverter(NamedTuple):
"""
--has-all roles
--has-none roles
--has-any roles
--has-no-roles
--has-exactly-nroles
--has-more-than-nroles
--has-less-than-nroles
--has-perm permissions
--any-perm permissions
--not-perm permissions
--above role
--below role
--add roles
--remove roles
--only-humans
--only-bots
--everyone
"""
parsed: dict
@classmethod
async def convert(cls, ctx: Context, argument: str):
parser = NoExitParser(description="Role management syntax help", add_help=False)
parser.add_argument("--has-any", nargs="*", dest="any", default=[])
parser.add_argument("--has-all", nargs="*", dest="all", default=[])
parser.add_argument("--has-none", nargs="*", dest="none", default=[])
parser.add_argument(
"--has-no-roles", action="store_true", default=False, dest="noroles"
)
parser.add_argument("--has-perms", nargs="*", dest="hasperm", default=[])
parser.add_argument("--any-perm", nargs="*", dest="anyperm", default=[])
parser.add_argument("--not-perm", nargs="*", dest="notperm", default=[])
parser.add_argument("--add", nargs="*", dest="add", default=[])
parser.add_argument("--remove", nargs="*", dest="remove", default=[])
parser.add_argument("--has-exactly-nroles", dest="quantity", type=int)
parser.add_argument("--has-more-than-nroles", dest="gt", type=int, default=None)
parser.add_argument("--has-less-than-nroles", dest="lt", type=int, default=None)
parser.add_argument("--above", dest="above", type=str, default=None)
parser.add_argument("--below", dest="below", type=str, default=None)
hum_or_bot = parser.add_mutually_exclusive_group()
hum_or_bot.add_argument(
"--only-humans", action="store_true", default=False, dest="humans"
)
hum_or_bot.add_argument(
"--only-bots", action="store_true", default=False, dest="bots"
)
hum_or_bot.add_argument(
"--everyone", action="store_true", default=False, dest="everyone"
)
try:
vals = vars(parser.parse_args(shlex.split(argument)))
except Exception:
raise BadArgument()
if not vals["add"] and not vals["remove"]:
raise BadArgument("Must provide at least one action")
if not any(
(
vals["humans"],
vals["everyone"],
vals["bots"],
vals["any"],
vals["all"],
vals["none"],
vals["hasperm"],
vals["notperm"],
vals["anyperm"],
vals["noroles"],
bool(vals["quantity"] is not None),
bool(vals["gt"] is not None),
bool(vals["lt"] is not None),
vals["above"],
vals["below"],
)
):
raise BadArgument("You need to provide at least 1 search criterion")
for attr in ("any", "all", "none", "add", "remove"):
vals[attr] = [await _RoleConverter.convert(ctx, r) for r in vals[attr]]
for attr in ("below", "above"):
if vals[attr] is None:
continue
vals[attr] = await _RoleConverter.convert(ctx, vals[attr])
for attr in ("hasperm", "anyperm", "notperm"):
vals[attr] = [
i.replace("_", " ").lower().replace(" ", "_").replace("server", "guild")
for i in vals[attr]
]
if any(perm not in dir(discord.Permissions) for perm in vals[attr]):
raise BadArgument("You gave an invalid permission")
return cls(vals)
class ComplexSearchConverter(NamedTuple):
"""
--has-all roles
--has-none roles
--has-any roles
--has-no-roles
--has-exactly-nroles
--has-more-than-nroles
--has-less-than-nroles
--only-humans
--only-bots
--above role
--below role
--has-perm permissions
--any-perm permissions
--not-perm permissions
--everyone
--csv
"""
parsed: dict
@classmethod
async def convert(cls, ctx: Context, argument: str):
parser = NoExitParser(description="Role management syntax help", add_help=False)
parser.add_argument("--has-any", nargs="*", dest="any", default=[])
parser.add_argument("--has-all", nargs="*", dest="all", default=[])
parser.add_argument("--has-none", nargs="*", dest="none", default=[])
parser.add_argument(
"--has-no-roles", action="store_true", default=False, dest="noroles"
)
parser.add_argument("--has-perms", nargs="*", dest="hasperm", default=[])
parser.add_argument("--any-perm", nargs="*", dest="anyperm", default=[])
parser.add_argument("--not-perm", nargs="*", dest="notperm", default=[])
parser.add_argument("--csv", action="store_true", default=False)
parser.add_argument(
"--has-exactly-nroles", dest="quantity", type=int, default=None
)
parser.add_argument("--has-more-than-nroles", dest="gt", type=int, default=None)
parser.add_argument("--has-less-than-nroles", dest="lt", type=int, default=None)
parser.add_argument("--above", dest="above", type=str, default=None)
parser.add_argument("--below", dest="below", type=str, default=None)
hum_or_bot = parser.add_mutually_exclusive_group()
hum_or_bot.add_argument(
"--only-humans", action="store_true", default=False, dest="humans"
)
hum_or_bot.add_argument(
"--only-bots", action="store_true", default=False, dest="bots"
)
hum_or_bot.add_argument(
"--everyone", action="store_true", default=False, dest="everyone"
)
try:
vals = vars(parser.parse_args(shlex.split(argument)))
except Exception:
raise BadArgument()
if not any(
(
vals["humans"],
vals["everyone"],
vals["bots"],
vals["any"],
vals["all"],
vals["none"],
vals["hasperm"],
vals["notperm"],
vals["anyperm"],
vals["noroles"],
bool(vals["quantity"] is not None),
bool(vals["gt"] is not None),
bool(vals["lt"] is not None),
vals["above"],
vals["below"],
)
):
raise BadArgument("You need to provide at least 1 search criterion")
for attr in ("any", "all", "none"):
vals[attr] = [await _RoleConverter.convert(ctx, r) for r in vals[attr]]
for attr in ("below", "above"):
if vals[attr] is None:
continue
vals[attr] = await _RoleConverter.convert(ctx, vals[attr])
for attr in ("hasperm", "anyperm", "notperm"):
vals[attr] = [
i.replace("_", " ").lower().replace(" ", "_").replace("server", "guild")
for i in vals[attr]
]
if any(perm not in dir(discord.Permissions) for perm in vals[attr]):
raise BadArgument("You gave an invalid permission")
return cls(vals)

976
rolemanagement/core.py Normal file
View file

@ -0,0 +1,976 @@
from __future__ import annotations
import contextlib
import asyncio
import re
import time
from abc import ABCMeta
from typing import AsyncIterator, Tuple, Optional, Union, List, Dict
import discord
from discord.ext.commands import CogMeta as DPYCogMeta
from redbot.core import checks, commands, bank
from redbot.core.config import Config
from redbot.core.utils.chat_formatting import box, pagify, warning
from .events import EventMixin
from .exceptions import RoleManagementException, PermissionOrHierarchyException
from .massmanager import MassManagementMixin
from .utils import UtilMixin, variation_stripper_re, parse_timedelta, parse_seconds
try:
from redbot.core.commands import GuildContext
except ImportError:
from redbot.core.commands import Context as GuildContext # type: ignore
# This previously used ``(type(commands.Cog), type(ABC))``
# This was changed to be explicit so that mypy
# would be slightly happier about it.
# This does introduce a potential place this
# can break in the future, but this would be an
# Upstream breaking change announced in advance
class CompositeMetaClass(DPYCogMeta, ABCMeta):
"""
This really only exists because of mypy
wanting mixins to be individually valid classes.
"""
pass # MRO is fine on __new__ with super() use
# no need to manually ensure both get handled here.
MIN_SUB_TIME = 3600
SLEEP_TIME = 300
class RoleManagement(
UtilMixin,
MassManagementMixin,
EventMixin,
commands.Cog,
metaclass=CompositeMetaClass,
):
"""
Cog for role management
"""
__author__ = "mikeshardmind(Sinbad), DiscordLiz"
__version__ = "323.1.4"
def format_help_for_context(self, ctx):
pre_processed = super().format_help_for_context(ctx)
return f"{pre_processed}\nCog Version: {self.__version__}"
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(
self, identifier=78631113035100160, force_registration=True
)
self.config.register_global(
handled_variation=False, handled_full_str_emoji=False
)
self.config.register_role(
exclusive_to=[],
requires_any=[],
requires_all=[],
sticky=False,
self_removable=False,
self_role=False,
protected=False,
cost=0,
subscription=0,
subscribed_users={}
)#subscribed_users maps str(user.id)-> end time in unix timestamp
self.config.register_member(roles=[], forbidden=[])
self.config.init_custom("REACTROLE", 2)
self.config.register_custom(
"REACTROLE", roleid=None, channelid=None, guildid=None
) # ID : Message.id, str(React)
self.config.register_guild(notify_channel=None, s_roles=[], free_roles=[])
self._ready = asyncio.Event()
self._start_task: Optional[asyncio.Task] = None
self.loop = asyncio.get_event_loop()
self._sub_task = self.loop.create_task(self.sub_checker())
super().__init__()
def cog_unload(self):
if self._start_task:
self._start_task.cancel()
if self._sub_task:
self._sub_task.cancel()
def init(self):
self._start_task = asyncio.create_task(self.initialization())
self._start_task.add_done_callback(lambda f: f.result())
async def initialization(self):
data: Dict[str, Dict[str, Dict[str, Union[int, bool, List[int]]]]]
await self.bot.wait_until_red_ready()
if not await self.config.handled_variation():
data = await self.config.custom("REACTROLE").all()
to_adjust = {}
for message_id, emojis_to_data in data.items():
for emoji_key in emojis_to_data:
new_key, c = variation_stripper_re.subn("", emoji_key)
if c:
to_adjust[(message_id, emoji_key)] = new_key
for (message_id, emoji_key), new_key in to_adjust.items():
data[message_id][new_key] = data[message_id][emoji_key]
data[message_id].pop(emoji_key, None)
await self.config.custom("REACTROLE").set(data)
await self.config.handled_variation.set(True)
if not await self.config.handled_full_str_emoji():
data = await self.config.custom("REACTROLE").all()
to_adjust = {}
pattern = re.compile(r"^(<?a?:)?([A-Za-z0-9_]+):([0-9]+)(\:?>?)$")
# Am not a fan....
for message_id, emojis_to_data in data.items():
for emoji_key in emojis_to_data:
new_key, c = pattern.subn(r"\3", emoji_key)
if c:
to_adjust[(message_id, emoji_key)] = new_key
for (message_id, emoji_key), new_key in to_adjust.items():
data[message_id][new_key] = data[message_id][emoji_key]
data[message_id].pop(emoji_key, None)
await self.config.custom("REACTROLE").set(data)
await self.config.handled_full_str_emoji.set(True)
self._ready.set()
async def wait_for_ready(self):
await self._ready.wait()
async def cog_before_invoke(self, ctx):
await self.wait_for_ready()
if ctx.guild:
await self.maybe_update_guilds(ctx.guild)
# makes it a bit more readable
async def sub_helper(self, guild, role, role_data):
for user_id in list(role_data["subscribed_users"].keys()):
end_time = role_data["subscribed_users"][user_id]
now_time = time.time()
if end_time <= now_time:
member = guild.get_member(int(user_id))
if not member: # clean absent members
del role_data["subscribed_users"][user_id]
continue
# charge user
cost = await self.config.role(role).cost()
currency_name = await bank.get_currency_name(guild)
curr_sub = await self.config.role(role).subscription()
if cost == 0 or curr_sub == 0:
# role is free now or sub is removed, remove stale sub
del role_data["subscribed_users"][user_id]
continue
msg = f"Hello! You are being charged {cost} {currency_name} for your subscription to the {role.name} role in {guild.name}."
try:
await bank.withdraw_credits(member, cost)
msg += f"\n\nNo further action is required! You'll be charged again in {parse_seconds(curr_sub)}."
role_data["subscribed_users"][user_id] = now_time + curr_sub
except ValueError: # user is poor
msg += f"\n\nHowever, you do not have enough {currency_name} to cover the subscription. The role will be removed."
await self.update_roles_atomically(who=member, remove=[role])
del role_data["subscribed_users"][user_id]
try:
await member.send(msg)
except:
# trys to send in system channel, if that fails then
# send message in first channel bot can speak in
channel = guild.system_channel
msg += f"\n\n{member.mention} make sure to allow receiving DM's from server members so I can DM you this message!"
if channel.permissions_for(channel.guild.me).send_messages:
await channel.send(msg)
else:
for channel in guild.text_channels:
if channel.permissions_for(channel.guild.me).send_messages:
await channel.send(msg)
break
return role_data
async def sub_checker(self):
await self.wait_for_ready()
while True:
await asyncio.sleep(SLEEP_TIME)
for guild in self.bot.guilds:
async with self.config.guild(guild).s_roles() as s_roles:
for role_id in reversed(s_roles):
role = guild.get_role(role_id)
if not role: # clean stale subs if role is deleted
s_roles.remove(role_id)
continue
role_data = await self.config.role(role).all()
role_data = await self.sub_helper(guild, role, role_data)
await self.config.role(role).subscribed_users.set(
role_data["subscribed_users"]
)
if len(role_data["subscribed_users"]) == 0:
s_roles.remove(role_id)
@commands.guild_only()
@commands.bot_has_permissions(manage_roles=True)
@checks.admin_or_permissions(manage_roles=True)
@commands.command(name="hackrole")
async def hackrole(self, ctx: GuildContext, user_id: int, *, role: discord.Role):
"""
Puts a stickyrole on someone not in the server.
"""
if not await self.all_are_valid_roles(ctx, role):
return await ctx.maybe_send_embed(
"Can't do that. Discord role heirarchy applies here."
)
if not await self.config.role(role).sticky():
return await ctx.send("This only works on sticky roles.")
member = ctx.guild.get_member(user_id)
if member:
try:
await self.update_roles_atomically(who=member, give=[role])
except PermissionOrHierarchyException:
await ctx.send("Can't, somehow")
else:
await ctx.maybe_send_embed("They are in the guild...assigned anyway.")
else:
async with self.config.member_from_ids(
ctx.guild.id, user_id
).roles() as sticky:
if role.id not in sticky:
sticky.append(role.id)
await ctx.tick()
@checks.is_owner()
@commands.command(name="rrcleanup", hidden=True)
async def rolemanagementcleanup(self, ctx: GuildContext):
""" :eyes: """
data = await self.config.custom("REACTROLE").all()
key_data = {}
for maybe_message_id, maybe_data in data.items():
try:
message_id = int(maybe_message_id)
except ValueError:
continue
ex_keys = list(maybe_data.keys())
if not ex_keys:
continue
message = None
channel_id = maybe_data[ex_keys[0]]["channelid"]
channel = ctx.bot.get_channel(channel_id)
if channel:
with contextlib.suppress(discord.HTTPException):
assert isinstance(channel, discord.TextChannel) # nosec
message = await channel.fetch_message(message_id)
if not message:
key_data.update({maybe_message_id: ex_keys})
for mid, keys in key_data.items():
for k in keys:
await self.config.custom("REACTROLE", mid, k).clear()
await ctx.tick()
@commands.guild_only()
@commands.bot_has_permissions(manage_roles=True)
@checks.admin_or_permissions(manage_guild=True)
@commands.command(name="rolebind")
async def bind_role_to_reactions(
self,
ctx: GuildContext,
role: discord.Role,
channel: discord.TextChannel,
msgid: int,
emoji: str,
):
"""
Binds a role to a reaction on a message...
The role is only given if the criteria for it are met.
Make sure you configure the other settings for a role in [p]roleset
"""
if not await self.all_are_valid_roles(ctx, role):
return await ctx.maybe_send_embed(
"Can't do that. Discord role heirarchy applies here."
)
try:
message = await channel.fetch_message(msgid)
except discord.HTTPException:
return await ctx.maybe_send_embed("No such message")
_emoji: Optional[Union[discord.Emoji, str]]
_emoji = discord.utils.find(lambda e: str(e) == emoji, self.bot.emojis)
if _emoji is None:
try:
await ctx.message.add_reaction(emoji)
except discord.HTTPException:
return await ctx.maybe_send_embed("No such emoji")
else:
_emoji = emoji
eid = self.strip_variations(emoji)
else:
eid = str(_emoji.id)
if not any(str(r) == emoji for r in message.reactions):
try:
await message.add_reaction(_emoji)
except discord.HTTPException:
return await ctx.maybe_send_embed(
"Hmm, that message couldn't be reacted to"
)
cfg = self.config.custom("REACTROLE", str(message.id), eid)
await cfg.set(
{
"roleid": role.id,
"channelid": message.channel.id,
"guildid": role.guild.id,
}
)
await ctx.send(
f"Remember, the reactions only function according to "
f"the rules set for the roles using `{ctx.prefix}roleset`",
delete_after=30,
)
@commands.guild_only()
@commands.bot_has_permissions(manage_roles=True)
@checks.admin_or_permissions(manage_guild=True)
@commands.command(name="roleunbind")
async def unbind_role_from_reactions(
self, ctx: commands.Context, role: discord.Role, msgid: int, emoji: str
):
"""
unbinds a role from a reaction on a message
"""
if not await self.all_are_valid_roles(ctx, role):
return await ctx.maybe_send_embed(
"Can't do that. Discord role heirarchy applies here."
)
await self.config.custom(
"REACTROLE", f"{msgid}", self.strip_variations(emoji)
).clear()
await ctx.tick()
@commands.guild_only()
@commands.bot_has_permissions(manage_roles=True)
@checks.admin_or_permissions(manage_guild=True)
@commands.group(name="roleset", autohelp=True)
async def rgroup(self, ctx: GuildContext):
"""
Settings for role requirements
"""
pass
@rgroup.command(name="viewreactions")
async def rg_view_reactions(self, ctx: GuildContext):
"""
View the reactions enabled for the server
"""
# This design is intentional for later extention to view this per role
use_embeds = await ctx.embed_requested()
react_roles = "\n".join(
[
msg
async for msg in self.build_messages_for_react_roles(
*ctx.guild.roles, use_embeds=use_embeds
)
]
)
if not react_roles:
return await ctx.send("No react roles bound here.")
# ctx.send is already going to escape said mentions if any somehow get generated
# should also not be possible to do so without willfully being done by an admin.
color = await ctx.embed_colour() if use_embeds else None
for page in pagify(
react_roles, escape_mass_mentions=False, page_length=1800, shorten_by=0
):
# unrolling iterative calling of ctx.maybe_send_embed
if use_embeds:
await ctx.send(embed=discord.Embed(description=page, color=color))
else:
await ctx.send(page)
@rgroup.command(name="viewrole")
async def rg_view_role(self, ctx: GuildContext, *, role: discord.Role):
"""
Views the current settings for a role
"""
rsets = await self.config.role(role).all()
output = (
f"This role:\n{'is' if rsets['self_role'] else 'is not'} self assignable"
f"\n{'is' if rsets['self_removable'] else 'is not'} self removable"
f"\n{'is' if rsets['sticky'] else 'is not'} sticky."
)
if rsets["requires_any"]:
rstring = ", ".join(
r.name for r in ctx.guild.roles if r.id in rsets["requires_any"]
)
output += f"\nThis role requires any of the following roles: {rstring}"
if rsets["requires_all"]:
rstring = ", ".join(
r.name for r in ctx.guild.roles if r.id in rsets["requires_all"]
)
output += f"\nThis role requires all of the following roles: {rstring}"
if rsets["exclusive_to"]:
rstring = ", ".join(
r.name for r in ctx.guild.roles if r.id in rsets["exclusive_to"]
)
output += (
f"\nThis role is mutually exclusive to the following roles: {rstring}"
)
if rsets["cost"]:
curr = await bank.get_currency_name(ctx.guild)
cost = rsets["cost"]
output += f"\nThis role costs {cost} {curr}"
else:
output += "\nThis role does not have an associated cost."
if rsets["subscription"]:
s = rsets["subscription"]
output += f"\nThis role has a subscription time of: {parse_seconds(s)}"
for page in pagify(output):
await ctx.send(page)
@rgroup.command(name="cost")
async def make_purchasable(
self, ctx: GuildContext, cost: int, *, role: discord.Role
):
"""
Makes a role purchasable for a specified cost.
Cost must be a number greater than 0.
A cost of exactly 0 can be used to remove purchasability.
Purchase eligibility still follows other rules including self assignable.
Warning: If these roles are bound to a reaction,
it will be possible to gain these without paying.
"""
if not await self.all_are_valid_roles(ctx, role):
return await ctx.maybe_send_embed(
"Can't do that. Discord role heirarchy applies here."
)
if cost < 0:
return await ctx.send_help()
await self.config.role(role).cost.set(cost)
if cost == 0:
await ctx.send(f"{role.name} is no longer purchasable.")
else:
await ctx.send(f"{role.name} is purchasable for {cost}")
@rgroup.command(name="subscription")
async def subscription(self, ctx, role: discord.Role, *, interval: str):
"""
Sets a role to be a subscription, must set cost first.
Will charge role's cost every interval, and remove the role if they run out of money
Set to 0 to disable
**__Minimum subscription duration is 1 hour__**
Intervals look like:
5 minutes
1 minute 30 seconds
1 hour
2 days
30 days
5h30m
(etc)
"""
if not await self.all_are_valid_roles(ctx, role):
return await ctx.maybe_send_embed(
"Can't do that. Discord role heirarchy applies here."
)
role_cost = await self.config.role(role).cost()
if role_cost == 0:
await ctx.send(waring("Please set a cost for the role first."))
return
time = parse_timedelta(interval)
if int(time.total_seconds()) == 0:
await ctx.send("Subscription removed.")
async with self.config.guild(ctx.guild).s_roles() as s:
s.remove(role.id)
return
elif int(time.total_seconds()) < MIN_SUB_TIME:
await ctx.send("Subscriptions must be 1 hour or longer.")
return
await self.config.role(role).subscription.set(int(time.total_seconds()))
async with self.config.guild(ctx.guild).s_roles() as s:
s.append(role.id)
await ctx.send(f"Subscription set to {parse_seconds(time.total_seconds())}.")
@rgroup.command(name="forbid")
async def forbid_role(
self, ctx: GuildContext, role: discord.Role, *, user: discord.Member
):
"""
Forbids a user from gaining a specific role.
"""
async with self.config.member(user).forbidden() as fb:
if role.id not in fb:
fb.append(role.id)
else:
await ctx.send("Role was already forbidden")
await ctx.tick()
@rgroup.command(name="unforbid")
async def unforbid_role(
self, ctx: GuildContext, role: discord.Role, *, user: discord.Member
):
"""
Unforbids a user from gaining a specific role.
"""
async with self.config.member(user).forbidden() as fb:
if role.id in fb:
fb.remove(role.id)
else:
await ctx.send("Role was not forbidden")
await ctx.tick()
@rgroup.command(name="exclusive")
async def set_exclusivity(self, ctx: GuildContext, *roles: discord.Role):
"""
Takes 2 or more roles and sets them as exclusive to eachother
"""
_roles = set(roles)
if len(_roles) < 2:
return await ctx.send("You need to provide at least 2 roles")
for role in _roles:
async with self.config.role(role).exclusive_to() as ex_list:
ex_list.extend(
[r.id for r in _roles if r != role and r.id not in ex_list]
)
await ctx.tick()
@rgroup.command(name="unexclusive")
async def unset_exclusivity(self, ctx: GuildContext, *roles: discord.Role):
"""
Takes any number of roles, and removes their exclusivity settings
"""
_roles = set(roles)
if not _roles:
return await ctx.send("You need to provide at least a role to do this to")
for role in _roles:
ex_list = await self.config.role(role).exclusive_to()
ex_list = [idx for idx in ex_list if idx not in [r.id for r in _roles]]
await self.config.role(role).exclusive_to.set(ex_list)
await ctx.tick()
@rgroup.command(name="sticky")
async def setsticky(
self, ctx: GuildContext, role: discord.Role, sticky: bool = None
):
"""
sets a role as sticky if used without a settings, gets the current ones
"""
if sticky is None:
is_sticky = await self.config.role(role).sticky()
return await ctx.send(
"{role} {verb} sticky".format(
role=role.name, verb=("is" if is_sticky else "is not")
)
)
await self.config.role(role).sticky.set(sticky)
if sticky:
for m in role.members:
async with self.config.member(m).roles() as rids:
if role.id not in rids:
rids.append(role.id)
await ctx.tick()
#TODO set roles who don't need to pay for roles
@rgroup.command(name="requireall")
async def reqall(self, ctx: GuildContext, role: discord.Role, *roles: discord.Role):
"""
Sets the required roles to gain a role
Takes a role plus zero or more other roles (as requirements for the first)
"""
rids = [r.id for r in roles]
await self.config.role(role).requires_all.set(rids)
await ctx.tick()
@rgroup.command(name="requireany")
async def reqany(self, ctx: GuildContext, role: discord.Role, *roles: discord.Role):
"""
Sets a role to require already having one of another
Takes a role plus zero or more other roles (as requirements for the first)
"""
rids = [r.id for r in (roles or [])]
await self.config.role(role).requires_any.set(rids)
await ctx.tick()
@rgroup.command(name="selfrem")
async def selfrem(
self, ctx: GuildContext, role: discord.Role, removable: bool = None
):
"""
Sets if a role is self-removable (default False)
use without a setting to view current
"""
if removable is None:
is_removable = await self.config.role(role).self_removable()
return await ctx.send(
"{role} {verb} self-removable".format(
role=role.name, verb=("is" if is_removable else "is not")
)
)
await self.config.role(role).self_removable.set(removable)
await ctx.tick()
@rgroup.command(name="selfadd")
async def selfadd(
self, ctx: GuildContext, role: discord.Role, assignable: bool = None
):
"""
Sets if a role is self-assignable via command
(default False)
use without a setting to view current
"""
if assignable is None:
is_assignable = await self.config.role(role).self_role()
return await ctx.send(
"{role} {verb} self-assignable".format(
role=role.name, verb=("is" if is_assignable else "is not")
)
)
await self.config.role(role).self_role.set(assignable)
await ctx.tick()
@rgroup.group(name="freerole")
async def free_roles(self, ctx: GuildContext):
"""
Sets roles that bypass costs for purchasing roles in your guild.
"""
pass
@free_roles.command(name="add")
async def free_roles_add(self, ctx: GuildContext, *, role: discord.Role):
"""
Add a role to the free list.
"""
async with self.config.guild(ctx.guild).free_roles() as free_roles:
if role.id not in free_roles:
free_roles.append(role.id)
await ctx.tick()
@free_roles.command(name="rem")
async def free_roles_rem(self, ctx: GuildContext, *, role: discord.Role):
"""
Remove a role from the free list.
"""
async with self.config.guild(ctx.guild).free_roles() as free_roles:
try:
free_roles.remove(role.id)
except:
await ctx.send("Role not in free list!")
return
await ctx.tick()
@free_roles.command(name="list")
async def free_roles_list(self, ctx: GuildContext):
"""
List free roles.
"""
roles = await self.config.guild(ctx.guild).free_roles()
if not roles:
await ctx.send("No roles defined.")
return
roles = [ctx.guild.get_role(role) for role in roles]
missing = len([role for role in roles if role is None])
roles = [f"{i+1}.{role.name}" for i, role in enumerate(roles) if role is not None]
msg = "\n".join(sorted(roles))
msg = pagify(msg)
for m in msg:
await ctx.send(box(m))
@checks.bot_has_permissions(manage_roles=True)
@commands.guild_only()
@commands.group(name="srole", autohelp=True)
async def srole(self, ctx: GuildContext):
"""
Self assignable role commands
"""
pass
@srole.command(name="list")
async def srole_list(self, ctx: GuildContext):
"""
Lists the selfroles and any associated costs.
"""
MYPY = False
if MYPY:
# remove this when mypy supports type narrowing from :=
# It's less efficient, so not removing the actual
# implementation below
data: Dict[discord.Role, tuple] = {}
for role_id, vals in (await self.config.all_roles()).items():
role = ctx.guild.get_role(role_id)
if role and vals["self_role"]:
data[role] = vals["cost"]
else:
data = {
role: (vals["cost"], vals["subscription"])
for role_id, vals in (await self.config.all_roles()).items()
if (role := ctx.guild.get_role(role_id)) and vals["self_role"]
}
if not data:
return await ctx.send("There aren't any self roles here.")
embed = discord.Embed(title="Roles", colour=ctx.guild.me.colour)
i = 0
for role, (cost, sub) in sorted(data.items(), key=lambda kv: kv[1]):
embed.add_field(
name=f"__**{i+1}. {role.name}**__",
value="%s%s" % ((f"Cost: {cost}" if cost else "Free"), (f", every {parse_seconds(sub)}" if sub else ""))
)
i += 1
if i % 25 == 0:
await ctx.send(embed=embed)
await ctx.send(embed=embed)
@srole.command(name="buy")
async def srole_buy(self, ctx: GuildContext, *, role: discord.Role):
"""
Purchase a role
"""
if role in ctx.author.roles:
await ctx.send("You already have that role.")
return
try:
remove = await self.is_self_assign_eligible(ctx.author, role)
eligible = await self.config.role(role).self_role()
cost = await self.config.role(role).cost()
subscription = await self.config.role(role).subscription()
except RoleManagementException:
return
except PermissionOrHierarchyException:
await ctx.send(
"I cannot assign roles which I can not manage. (Discord Hierarchy)"
)
else:
if not eligible:
return await ctx.send(
f"You aren't allowed to add `{role}` to yourself {ctx.author.mention}!"
)
if not cost:
return await ctx.send(
"This role doesn't have a cost. Please try again using `[p]srole add`."
)
free_roles = await self.config.guild(ctx.guild).free_roles()
currency_name = await bank.get_currency_name(ctx.guild)
for m_role in ctx.author.roles:
if m_role.id in free_roles:
await ctx.send(f"You're special, no {currency_name} will be deducted from your account.")
await self.update_roles_atomically(
who=ctx.author, give=[role], remove=remove
)
await ctx.tick()
return
try:
await bank.withdraw_credits(ctx.author, cost)
except ValueError:
return await ctx.send(
f"You don't have enough {currency_name} (Cost: {cost} {currency_name})"
)
else:
if subscription > 0:
await ctx.send(f"{role.name} will be renewed every {parse_seconds(subscription)}")
async with self.config.role(role).subscribed_users() as s:
s[str(ctx.author.id)] = time.time() + subscription
async with self.config.guild(ctx.guild).s_roles() as s:
if role.id not in s:
s.append(role.id)
await self.update_roles_atomically(
who=ctx.author, give=[role], remove=remove
)
await ctx.tick()
@srole.command(name="add")
async def sadd(self, ctx: GuildContext, *, role: discord.Role):
"""
Join a role
"""
if role in ctx.author.roles:
await ctx.send("You already have that role.")
return
try:
remove = await self.is_self_assign_eligible(ctx.author, role)
eligible = await self.config.role(role).self_role()
cost = await self.config.role(role).cost()
except RoleManagementException:
return
except PermissionOrHierarchyException:
await ctx.send(
"I cannot assign roles which I can not manage. (Discord Hierarchy)"
)
else:
if not eligible:
await ctx.send(
f"You aren't allowed to add `{role}` to yourself {ctx.author.mention}!"
)
elif cost:
await ctx.send(
"This role is not free. "
"Please use `[p]srole buy` if you would like to purchase it."
)
else:
await self.update_roles_atomically(
who=ctx.author, give=[role], remove=remove
)
await ctx.tick()
@srole.command(name="remove")
async def srem(self, ctx: GuildContext, *, role: discord.Role):
"""
leave a role
"""
if role not in ctx.author.roles:
await ctx.send("You do not have that role.")
return
if await self.config.role(role).self_removable():
await self.update_roles_atomically(who=ctx.author, remove=[role])
try: # remove subscription, if any
async with self.config.role(role).subscribed_users() as s:
del s[str(ctx.author.id)]
except:
pass
await ctx.tick()
else:
await ctx.send(
f"You aren't allowed to remove `{role}` from yourself {ctx.author.mention}!`"
)
# Stuff for clean interaction with react role entries
async def build_messages_for_react_roles(
self, *roles: discord.Role, use_embeds=True
) -> AsyncIterator[str]:
"""
Builds info.
Info is suitable for passing to embeds if use_embeds is True
"""
linkfmt = (
"[message #{message_id}](https://discordapp.com/channels/{guild_id}/{channel_id}/{message_id})"
if use_embeds
else "<https://discordapp.com/channels/{guild_id}/{channel_id}/{message_id}>"
)
for role in roles:
# pylint: disable=E1133
async for message_id, emoji_info, data in self.get_react_role_entries(role):
channel_id = data.get("channelid", None)
if channel_id:
link = linkfmt.format(
guild_id=role.guild.id,
channel_id=channel_id,
message_id=message_id,
)
else:
link = (
f"unknown message with id {message_id}"
f" (use `roleset fixup` to find missing data for this)"
)
emoji: Union[discord.Emoji, str]
if emoji_info.isdigit():
emoji = (
discord.utils.get(self.bot.emojis, id=int(emoji_info))
or f"A custom enoji with id {emoji_info}"
)
else:
emoji = emoji_info
react_m = f"{role.name} is bound to {emoji} on {link}"
yield react_m
async def get_react_role_entries(
self, role: discord.Role
) -> AsyncIterator[Tuple[str, str, dict]]:
"""
yields:
str, str, dict
first str: message id
second str: emoji id or unicode codepoint
dict: data from the corresponding:
config.custom("REACTROLE", messageid, emojiid)
"""
# self.config.register_custom(
# "REACTROLE", roleid=None, channelid=None, guildid=None
# ) # ID : Message.id, str(React)
data = await self.config.custom("REACTROLE").all()
for mid, _outer in data.items():
if not _outer or not isinstance(_outer, dict):
continue
for em, rdata in _outer.items():
if rdata and rdata["roleid"] == role.id:
yield (mid, em, rdata)

162
rolemanagement/events.py Normal file
View file

@ -0,0 +1,162 @@
from __future__ import annotations
from datetime import datetime, timedelta
from typing import List
import discord
from redbot.core import commands
from .abc import MixinMeta
from .exceptions import RoleManagementException, PermissionOrHierarchyException
class EventMixin(MixinMeta):
def verification_level_issue(self, member: discord.Member) -> bool:
"""
Returns True if this would bypass verification level settings
prevent react roles from bypassing time limits.
It's exceptionally dumb that users can react while
restricted by verification level, but that's Discord.
They block reacting to blocked users, but interacting
with entire guilds by reaction before hand? A-OK. *eyerolls*
Can't check the email/2FA, blame discord for allowing people to react with above.
"""
guild: discord.Guild = member.guild
now = datetime.utcnow()
level: int = guild.verification_level.value
if level >= 3 and member.created_at + timedelta(minutes=5) > now: # medium
return True
if level >= 4: # high
if not member.joined_at or member.joined_at + timedelta(minutes=10) > now:
return True
return False
@commands.Cog.listener()
async def on_member_update(self, before: discord.Member, after: discord.Member):
"""
DEP-WARN
Section has been optimized assuming member._roles
remains an iterable containing snowflakes
"""
await self.wait_for_ready()
if before._roles == after._roles:
return
lost, gained = set(before._roles), set(after._roles)
lost, gained = lost - gained, gained - lost
sym_diff = lost | gained
for r in sym_diff:
if not await self.config.role_from_id(r).sticky():
lost.discard(r)
gained.discard(r)
async with self.config.member(after).roles() as rids:
for r in lost:
while r in rids:
rids.remove(r)
for r in gained:
if r not in rids:
rids.append(r)
@commands.Cog.listener()
async def on_member_join(self, member: discord.Member):
await self.wait_for_ready()
guild = member.guild
if not guild.me.guild_permissions.manage_roles:
return
async with self.config.member(member).roles() as rids:
to_add: List[discord.Role] = []
for _id in rids:
role = discord.utils.get(guild.roles, id=_id)
if not role:
continue
if await self.config.role(role).sticky():
to_add.append(role)
if to_add:
to_add = [r for r in to_add if r < guild.me.top_role]
await member.add_roles(*to_add)
@commands.Cog.listener()
async def on_raw_reaction_add(
self, payload: discord.raw_models.RawReactionActionEvent
):
await self.wait_for_ready()
if not payload.guild_id:
return
emoji = payload.emoji
if emoji.is_custom_emoji():
eid = str(emoji.id)
else:
eid = self.strip_variations(str(emoji))
cfg = self.config.custom("REACTROLE", str(payload.message_id), eid)
rid = await cfg.roleid()
if rid is None or not await self.config.role_from_id(rid).self_role():
return
guild = self.bot.get_guild(payload.guild_id)
if guild:
await self.maybe_update_guilds(guild)
else:
return
member = guild.get_member(payload.user_id)
if member is None or member.bot:
return
if self.verification_level_issue(member):
return
role = guild.get_role(rid)
if role is None or role in member.roles:
return
try:
remove = await self.is_self_assign_eligible(member, role)
except (RoleManagementException, PermissionOrHierarchyException):
pass
else:
await self.update_roles_atomically(who=member, give=[role], remove=remove)
@commands.Cog.listener()
async def on_raw_reaction_remove(
self, payload: discord.raw_models.RawReactionActionEvent
):
await self.wait_for_ready()
if not payload.guild_id:
return
emoji = payload.emoji
if emoji.is_custom_emoji():
eid = str(emoji.id)
else:
eid = self.strip_variations(str(emoji))
cfg = self.config.custom("REACTROLE", str(payload.message_id), eid)
rid = await cfg.roleid()
if rid is None:
return
if await self.config.role_from_id(rid).self_removable():
guild = self.bot.get_guild(payload.guild_id)
if not guild:
# Where's it go?
return
member = guild.get_member(payload.user_id)
if not member or member.bot:
return
role = discord.utils.get(guild.roles, id=rid)
if not role or role not in member.roles:
return
if guild.me.guild_permissions.manage_roles and guild.me.top_role > role:
await self.update_roles_atomically(who=member, give=None, remove=[role])

View file

@ -0,0 +1,20 @@
from __future__ import annotations
class RoleManagementException(Exception):
pass
class PermissionOrHierarchyException(Exception):
pass
class MissingRequirementsException(RoleManagementException):
def __init__(self, *, miss_any=None, miss_all=None):
self.miss_all = miss_all or []
self.miss_any = miss_any or []
super().__init__()
class ConflictingRoleException(RoleManagementException):
def __init__(self, *, conflicts=None):
self.conflicts = conflicts or []
super().__init__()

View file

@ -0,0 +1,72 @@
# Below is planned schema for SQLite handling and expansion of functionality.
"""
CREATE TABLE IF NOT EXISTS roles (
role_id INTEGER PRIMARY KEY NOT NULL,
self_role BOOLEAN DEFAULT FALSE,
sticky BOOLEAN DEFAULT FALSE,
self_removable BOOLEAN DEFAULT FALSE,
-- useful for preventing pre 10 minute bypass
-- and just for keeping it to people who have been around a bit
minimum_join_time INTEGER DEFAULT 0,
cost INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS groups (
uid INTEGER AUTOINCREMENT,
name TEXT,
minimum INTEGER DEFAULT 0,
maximum INTEGER DEFAULT NULL
);
CREATE TABLE IF NOT EXISTS exclusions (
role_id INTEGER REFERENCES roles(role_id) ON DELETE CASCADE,
blocks_role_id INTEGER REFERENCES roles(role_id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS requires (
role_id INTEGER REFERENCES roles(role_id) ON DELETE CASCADE,
requires_role_id INTEGER REFERENCES roles(role_id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS actions (
message_id INTEGER NOT NULL,
reaction TEXT NOT NULL -- unicode emoji or str(discord.Emoji.id),
channel_id INTEGER NOT NULL,
guild_id INTEGER NOT NULL,
reaction_id INTEGER REFERENCES reactions(uid) ON DELETE CASCADE,
action_type INTEGER, -- handle as enum from python
role_id INTEGER REFERENCES roles(role_id)
);
CREATE TABLE IF NOT EXISTS members (
member_id INTEGER NOT NULL,
guild_id INTEGER NOT NULL,
kaizo_locked BOOLEAN DEFAULT FALSE,
kaizo_kicked BOOLEAN DEFAULT FALSE,
kaizo_banned BOOLEAN DEFAULT FALSE
);
CREATE TABLE IF NOT EXISTS sticky_roles (
member_id INTEGER NOT NULL, -- intentional no ref
role_id INTEGER REFERENCES roles(role_id) ON DELETE CASCADE,
is_stickied BOOLEAN DEFAULT FALSE
);
"""
# Below are some action types
import enum
class ActionType(enum.IntEnum):
TOGGLE = 1
ADD = 2
REMOVE = 3
INVERTED_TOGGLE = 4
# anti auto react bot measures
# A subset of spam bots appear to be attempting to auto react
# to things which appear to be verification channels
# (As of at least September 2019)
KAIZO_LOCK = 5
KAIZO_KICK = 6
KAIZO_BAN = 7

20
rolemanagement/info.json Normal file
View file

@ -0,0 +1,20 @@
{
"author": [
"mikeshardmind(Sinbad)",
"DiscordLiz"
],
"install_msg": "If you need help, I have a channel in https://discord.gg/mb85deu",
"name": "RoleManagement",
"disabled": false,
"short": "Role searches, reactroles, requirements for roles, etc.",
"description": "React roles with requirements, selfroles with requirements, role based member search and more.",
"tags": [
"react_roles",
"react roles",
"rolemanagement",
"role search",
"massrole"
],
"hidden": false,
"min_bot_version": "3.2.0a0.dev1"
}

View file

@ -0,0 +1,338 @@
import csv
import io
import logging
from typing import Optional, cast, Set
import discord
from redbot.core import checks, commands
from .abc import MixinMeta
from .converters import (
RoleSyntaxConverter,
ComplexActionConverter,
ComplexSearchConverter,
)
from .exceptions import RoleManagementException
try:
from redbot.core.commands import GuildContext
except ImportError:
from redbot.core.commands import Context as GuildContext # type: ignore
log = logging.getLogger("red.sinbadcogs.rolemanagement.massmanager")
class MassManagementMixin(MixinMeta):
"""
Mass role operations
"""
@commands.guild_only()
@checks.admin_or_permissions(manage_roles=True)
@commands.group(name="massrole", autohelp=True, aliases=["mrole"])
async def mrole(self, ctx: GuildContext):
"""
Commands for mass role management
"""
pass
@staticmethod
def search_filter(members: set, query: dict) -> set:
"""
Reusable
"""
if query["everyone"]:
return members
all_set: Set[discord.Member] = set()
if query["all"]:
first, *rest = query["all"]
all_set = set(first.members)
for other_role in rest:
all_set &= set(other_role.members)
none_set: Set[discord.Member] = set()
if query["none"]:
for role in query["none"]:
none_set.update(role.members)
any_set: Set[discord.Member] = set()
if query["any"]:
for role in query["any"]:
any_set.update(role.members)
minimum_perms: Optional[discord.Permissions] = None
if query["hasperm"]:
minimum_perms = discord.Permissions()
minimum_perms.update(**{x: True for x in query["hasperm"]})
def mfilter(m: discord.Member) -> bool:
if query["bots"] and not m.bot:
return False
if query["humans"] and m.bot:
return False
if query["any"] and m not in any_set:
return False
if query["all"] and m not in all_set:
return False
if query["none"] and m in none_set:
return False
if query["hasperm"] and not m.guild_permissions.is_superset(minimum_perms):
return False
if query["anyperm"] and not any(
bool(value and perm in query["anyperm"])
for perm, value in iter(m.guild_permissions)
):
return False
if query["notperm"] and any(
bool(value and perm in query["notperm"])
for perm, value in iter(m.guild_permissions)
):
return False
if query["noroles"] and len(m.roles) != 1:
return False
# 0 is a valid option for these, everyone role not counted
if query["quantity"] is not None and len(m.roles) - 1 != query["quantity"]:
return False
if query["lt"] is not None and len(m.roles) - 1 >= query["lt"]:
return False
if query["gt"] is not None and len(m.roles) - 1 <= query["gt"]:
return False
if query["above"] and m.top_role <= query["above"]:
return False
if query["below"] and m.top_role >= query["below"]:
return False
return True
members = {m for m in members if mfilter(m)}
return members
@mrole.command(name="user")
async def mrole_user(
self,
ctx: GuildContext,
users: commands.Greedy[discord.Member],
*,
_query: RoleSyntaxConverter,
) -> None:
"""
adds/removes roles to one or more users
You cannot add and remove the same role
Example Usage:
[p]massrole user Sinbad --add RoleToGive "Role with spaces to give"
--remove RoleToRemove "some other role to remove" Somethirdrole
[p]massrole user LoudMouthedUser ProfaneUser --add muted
For role operations based on role membership, permissions had, or whether someone is a bot
(or even just add to/remove from all) see `[p]massrole search` and `[p]massrole modify`
"""
query = _query.parsed
apply = query["add"] + query["remove"]
if not await self.all_are_valid_roles(ctx, *apply):
await ctx.send(
"Either you or I don't have the required permissions "
"or position in the hierarchy."
)
return
for user in users:
await self.update_roles_atomically(
who=user, give=query["add"], remove=query["remove"]
)
await ctx.tick()
@mrole.command(name="search")
async def mrole_search(self, ctx: GuildContext, *, _query: ComplexSearchConverter):
"""
Searches for users with the specified role criteria
--has-all roles
--has-none roles
--has-any roles
--has-no-roles
--has-exactly-nroles number
--has-more-than-nroles number
--has-less-than-nroles number
--has-perm permissions
--any-perm permissions
--not-perm permissions
--above role
--below role
--only-humans
--only-bots
--everyone
--csv
csv output will be used if output would exceed embed limits, or if flag is provided
"""
members = set(ctx.guild.members)
query = _query.parsed
members = self.search_filter(members, query)
if len(members) < 50 and not query["csv"]:
def chunker(memberset, size=3):
ret_str = ""
for i, m in enumerate(memberset, 1):
ret_str += m.mention
if i % size == 0:
ret_str += "\n"
else:
ret_str += " "
return ret_str
description = chunker(members)
embed = discord.Embed(description=description)
if ctx.guild:
embed.color = ctx.guild.me.color
await ctx.send(
embed=embed, content=f"Search results for {ctx.author.mention}"
)
else:
await self.send_maybe_chunked_csv(ctx, list(members))
@staticmethod
async def send_maybe_chunked_csv(ctx: GuildContext, members):
chunk_size = 75000
chunks = [
members[i : (i + chunk_size)] for i in range(0, len(members), chunk_size)
]
for part, chunk in enumerate(chunks, 1):
csvf = io.StringIO()
fieldnames = [
"ID",
"Display Name",
"Username#Discrim",
"Joined Server",
"Joined Discord",
]
fmt = "%Y-%m-%d"
writer = csv.DictWriter(csvf, fieldnames=fieldnames)
writer.writeheader()
for member in chunk:
writer.writerow(
{
"ID": member.id,
"Display Name": member.display_name,
"Username#Discrim": str(member),
"Joined Server": member.joined_at.strftime(fmt)
if member.joined_at
else None,
"Joined Discord": member.created_at.strftime(fmt),
}
)
csvf.seek(0)
b_data = csvf.read().encode()
data = io.BytesIO(b_data)
data.seek(0)
filename = f"{ctx.message.id}"
if len(chunks) > 1:
filename += f"-part{part}"
filename += ".csv"
await ctx.send(
content=f"Data for {ctx.author.mention}",
files=[discord.File(data, filename=filename)],
)
csvf.close()
data.close()
del csvf
del data
@mrole.command(name="modify")
async def mrole_complex(self, ctx: GuildContext, *, _query: ComplexActionConverter):
"""
Similar syntax to search, while applying/removing roles
--has-all roles
--has-none roles
--has-any roles
--has-no-roles
--has-exactly-nroles number
--has-more-than-nroles number
--has-less-than-nroles number
--has-perm permissions
--any-perm permissions
--not-perm permissions
--above role
--below role
--only-humans
--only-bots
--everyone
--add roles
--remove roles
"""
query = _query.parsed
apply = query["add"] + query["remove"]
if not await self.all_are_valid_roles(ctx, *apply):
return await ctx.send(
"Either you or I don't have the required permissions "
"or position in the hierarchy."
)
members = set(ctx.guild.members)
members = self.search_filter(members, query)
if len(members) > 100:
await ctx.send(
"This may take a while given the number of members to update."
)
async with ctx.typing():
for member in members:
try:
await self.update_roles_atomically(
who=member, give=query["add"], remove=query["remove"]
)
except RoleManagementException:
log.debug(
"Internal filter failure on member id %d guild id %d query %s",
member.id,
ctx.guild.id,
query,
)
except discord.HTTPException:
log.debug(
"Unpredicted failure for member id %d in guild id %d query %s",
member.id,
ctx.guild.id,
query,
)
await ctx.tick()

202
rolemanagement/utils.py Normal file
View file

@ -0,0 +1,202 @@
from __future__ import annotations
import re
from typing import List
from datetime import timedelta
import discord
from .abc import MixinMeta
from .exceptions import (
ConflictingRoleException,
MissingRequirementsException,
PermissionOrHierarchyException,
)
variation_stripper_re = re.compile(r"[\ufe00-\ufe0f]")
TIME_RE_STRING = r"\s?".join(
[
r"((?P<weeks>\d+?)\s?(weeks?|w))?",
r"((?P<days>\d+?)\s?(days?|d))?",
r"((?P<hours>\d+?)\s?(hours?|hrs|hr?))?",
r"((?P<minutes>\d+?)\s?(minutes?|mins?|m(?!o)))?", # prevent matching "months"
r"((?P<seconds>\d+?)\s?(seconds?|secs?|s))?",
]
)
TIME_RE = re.compile(TIME_RE_STRING, re.I)
def parse_timedelta(argument: str) -> timedelta:
"""
Parses a string that contains a time interval and converts it to a timedelta object.
"""
matches = TIME_RE.match(argument)
if matches:
params = {k: int(v) for k, v in matches.groupdict().items() if v}
if params:
return timedelta(**params)
return None
def parse_seconds(seconds) -> str:
"""
Take seconds and converts it to larger units
Returns parsed message string
"""
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
weeks, days = divmod(days, 7)
months, weeks = divmod(weeks, 4)
msg = []
if months:
msg.append(f"{int(months)} {'months' if months > 1 else 'month'}")
if weeks:
msg.append(f"{int(weeks)} {'weeks' if weeks > 1 else 'week'}")
if days:
msg.append(f"{int(days)} {'days' if days > 1 else 'day'}")
if hours:
msg.append(f"{int(hours)} {'hours' if hours > 1 else 'hour'}")
if minutes:
msg.append(f"{int(minutes)} {'minutes' if minutes > 1 else 'minute'}")
if seconds:
msg.append(f"{int(seconds)} {'seconds' if seconds > 1 else 'second'}")
return ", ".join(msg)
class UtilMixin(MixinMeta):
"""
Mixin for utils, some of which need things stored in the class
"""
def strip_variations(self, s: str) -> str:
"""
Normalizes emoji, removing variation selectors
"""
return variation_stripper_re.sub("", s)
async def update_roles_atomically(
self,
*,
who: discord.Member,
give: List[discord.Role] = None,
remove: List[discord.Role] = None,
):
"""
Give and remove roles as a single op with some slight sanity
wrapping
"""
me = who.guild.me
give = give or []
remove = remove or []
heirarchy_testing = give + remove
roles = [r for r in who.roles if r not in remove]
roles.extend([r for r in give if r not in roles])
if sorted(roles) == sorted(who.roles):
return
if (
any(r >= me.top_role for r in heirarchy_testing)
or not me.guild_permissions.manage_roles
):
raise PermissionOrHierarchyException("Can't do that.")
await who.edit(roles=roles)
async def all_are_valid_roles(self, ctx, *roles: discord.Role) -> bool:
"""
Quick heirarchy check on a role set in syntax returned
"""
author = ctx.author
guild = ctx.guild
# Author allowed
if not (
(guild.owner == author)
or all(author.top_role > role for role in roles)
or await ctx.bot.is_owner(ctx.author)
):
return False
# Bot allowed
if not (
guild.me.guild_permissions.manage_roles
and (
guild.me == guild.owner
or all(guild.me.top_role > role for role in roles)
)
):
return False
# Sanity check on managed roles
if any(role.managed for role in roles):
return False
return True
async def is_self_assign_eligible(
self, who: discord.Member, role: discord.Role
) -> List[discord.Role]:
"""
Returns a list of roles to be removed if this one is added, or raises an
exception
"""
await self.check_required(who, role)
ret: List[discord.Role] = await self.check_exclusivity(who, role)
forbidden = await self.config.member(who).forbidden()
if role.id in forbidden:
raise PermissionOrHierarchyException()
guild = who.guild
if not guild.me.guild_permissions.manage_roles or role > guild.me.top_role:
raise PermissionOrHierarchyException()
return ret
async def check_required(self, who: discord.Member, role: discord.Role) -> None:
"""
Raises an error on missing reqs
"""
req_any = await self.config.role(role).requires_any()
req_any_fail = req_any[:]
if req_any:
for idx in req_any:
if who._roles.has(idx):
req_any_fail = []
break
req_all_fail = [
idx
for idx in await self.config.role(role).requires_all()
if not who._roles.has(idx)
]
if req_any_fail or req_all_fail:
raise MissingRequirementsException(
miss_all=req_all_fail, miss_any=req_any_fail
)
return None
async def check_exclusivity(
self, who: discord.Member, role: discord.Role
) -> List[discord.Role]:
"""
Returns a list of roles to remove, or raises an error
"""
data = await self.config.all_roles()
ex = data.get(role.id, {}).get("exclusive_to", [])
conflicts: List[discord.Role] = [r for r in who.roles if r.id in ex]
for r in conflicts:
if not data.get(r.id, {}).get("self_removable", False):
raise ConflictingRoleException(conflicts=conflicts)
return conflicts
async def maybe_update_guilds(self, *guilds: discord.Guild):
_guilds = [g for g in guilds if not g.unavailable and g.large and not g.chunked]
if _guilds:
await self.bot.request_offline_members(*_guilds)