from redbot.core.utils.chat_formatting import * from redbot.core.utils.mod import is_mod_or_superior from redbot.core import Config, checks, commands, modlog import discord import discord import asyncio import re import time from datetime import datetime, timedelta from typing import Literal from .time_utils import * # May need to not save on every reaction add if it causes too much lag class ReactPoll(commands.Cog): """Create polls using emoji reactions""" def __init__(self, bot): super().__init__() self.bot = bot self.poll_sessions = {} self.config = Config.get_conf(self, identifier=9675846083, force_registration=True) self.config.register_global(poll_sessions={}) self.load_task = asyncio.create_task(self.load_polls()) self.poll_task = asyncio.create_task(self.poll_closer()) def cog_unload(self): if self.load_task: self.load_task.cancel() if self.poll_task: self.poll_task.cancel() async def poll_closer(self): await self.bot.wait_until_ready() while True: await asyncio.sleep(5) now_time = time.time() for poll in self.poll_sessions.values(): if poll.end_time <= now_time: await poll.endPoll() await self.delete_poll(poll) async def delete_poll(self, poll): async with self.config.poll_sessions() as polls: try: del polls[str(poll.channel.id)] except: pass async def store_poll(self, poll): async with self.config.poll_sessions() as polls: polls[str(poll.channel.id)] = poll.as_dict() async def load_polls(self): polls = await self.config.poll_sessions() if not polls: await self.config.poll_sessions.set({}) return else: for poll in polls.values(): load_poll = LoadedPoll(self, poll) if not load_poll.channel: await self.delete_poll(load_poll) continue load_poll.message = await load_poll.channel.fetch_message(load_poll.message) if load_poll.valid: self.poll_sessions[str(load_poll.channel.id)] = load_poll else: await self.delete_poll(load_poll) @commands.command() @commands.guild_only() @checks.bot_has_permissions(manage_messages=True) async def rpoll(self, ctx, *text): """Starts/stops a reaction poll Usage example (time argument is optional) [p]rpoll question;option1;option2...;t= [p]rpoll stop Durations look like (must be greater than 10 seconds): 15s 5 minutes 1 minute 30 seconds 1 hour 2 days 5h30m times look like: February 14 at 6pm EDT 2019-04-13 06:43:00 PST 01/20/18 at 21:00:43 times default to UTC if no timezone provided. """ message = ctx.message channel = message.channel guild = message.guild if len(text) == 1: if text[0].lower() == "stop": await self.endpoll(message, ctx) return if not self.getPollByChannel(message): p = NewReactPoll(message=message, text=escape(" ".join(text), mass_mentions=True), main=self) if p.valid: self.poll_sessions[str(channel.id)] = p await p.start() await self.store_poll(p) else: await ctx.send_help() else: await ctx.send("A reaction poll is already ongoing in this channel.") async def endpoll(self, message, ctx): if self.getPollByChannel(message): p = self.getPollByChannel(message) if p.author == message.author.id or is_mod_or_superior(self.bot, message.author): await p.endPoll() else: await ctx.send("Only admins and the author can stop the poll.") else: await ctx.send("There's no reaction poll ongoing in this channel.") def getPollByChannel(self, message): try: return self.poll_sessions[str(message.channel.id)] except KeyError: return False async def check_poll_votes(self, message): if message.author.id != self.bot.user.id: if self.getPollByChannel(message): self.getPollByChannel(message).checkAnswer(message) @commands.Cog.listener() async def on_raw_reaction_add(self, payload): # parse payload guild = self.bot.get_guild(payload.guild_id) if not guild: return if await self.bot.cog_disabled_in_guild(self, guild): return user = guild.get_member(payload.user_id) message = await self.bot.get_channel(payload.channel_id).fetch_message(payload.message_id) # Listener is required to remove bad reactions if user == self.bot.user or not guild: return # Don't remove bot's own reactions emoji = payload.emoji p = self.getPollByChannel(message) if p: if message.id == p.message.id and emoji.is_unicode_emoji() and emoji.name in p.emojis: # Valid reaction if str(user.id) not in p.already_voted: # First vote p.already_voted[str(user.id)] = str(emoji) else: # Allow subsequent vote but remove the previous await message.remove_reaction(p.already_voted[str(user.id)], user) p.already_voted[str(user.id)] = str(emoji) await self.store_poll(p) return # remove any other reaction emojis that arent valid elif message.id == p.message.id and (emoji.is_custom_emoji() or emoji.name not in p.emojis): await message.remove_reaction(emoji, user) def cog_unload(self): self.poll_task.cancel() class NewReactPoll: def __init__(self, message=None, text=None, main=None): self.channel = message.channel self.author = message.author.id self.client = main.bot self.main = main self.poll_sessions = main.poll_sessions self.duration = 60 # Default duration msg = [ans.strip() for ans in text.split(";")] # Detect optional duration parameter if len(msg[-1].strip().split("t=")) == 2: dur_s = msg[-1].strip().split("t=")[1] dur = parse_timedelta(dur_s) if not dur: try: dur = parse_time(dur_s) - datetime.utcnow() except: dur = None if dur and dur.total_seconds() > 5: self.duration = int(dur.total_seconds()) else: self.duration = 60 msg.pop() else: self.duration = 60 # Reaction poll supports maximum of 9 answers and minimum of 2 if len(msg) < 2 or len(msg) > 10: self.valid = False return None else: self.valid = True self.end_time = time.time() + self.duration self.already_voted = {} self.question = msg[0] msg.remove(self.question) self.answers = {} # Made this a dict to make my life easier for now self.emojis = [] i = 1 # Starting codepoint for keycap number emojis (\u0030... == 0) base_emoji = [ord("\u0030"), ord("\u20E3")] for answer in msg: # {id : {answer, votes}} base_emoji[0] += 1 self.emojis.append(chr(base_emoji[0]) + chr(base_emoji[1])) answer = self.emojis[i - 1] + " " + answer self.answers[str(i)] = {"ANSWER": answer, "VOTES": 0} i += 1 self.message = None def as_dict(self): return { "author": self.author, "channel": self.channel.id, "message": self.message.id, "question": self.question, "answers": self.answers, "emojis": self.emojis, "end_time": self.end_time, "already_voted": self.already_voted, } async def start(self): msg = "**POLL STARTED!**\n\n{}\n\n".format(self.question) for id, data in self.answers.items(): msg += "{}\n".format(data["ANSWER"]) end_time = datetime.utcnow() + timedelta(seconds=self.duration) if self.duration // 60 < 1: # less than a minute conj = "in" dur = int(self.duration) unit = "seconds" elif self.duration // 60 >= 1 and self.duration // 3600 < 1: # between 1 minute and 1 hour conj = "in" dur = int(self.duration // 60) unit = "minutes" if self.duration // 60 > 1 else "minute" elif self.duration // 3600 >= 1 and self.duration // 86400 < 1: # 1 hour and 1 day conj = "in" dur = int(self.duration // 3600) unit = "hours" if self.duration // 3600 > 1 else "hour" elif self.duration // 86400 == 1: conj = "in" dur = 1 unit = "day" else: conj = "on" dur = str(end_time.strftime("%m/%d/%Y at %I:%M%p") + " UTC") unit = "" msg += "\nSelect the number to vote!" "\nPoll closes {} {} {}.".format(conj, dur, unit) self.message = await self.channel.send(msg) for emoji in self.emojis: await self.message.add_reaction(emoji) await asyncio.sleep(0.5) async def endPoll(self): self.valid = False if not self.message: # poll message deleted del self.poll_sessions[str(self.channel.id)] await self.main.delete_poll(self) await self.channel.send("Poll message not found! Deleting poll data.") return # Need a fresh message object self.message = await self.channel.fetch_message(self.message.id) msg = "**POLL ENDED!**\n\n{}\n\n".format(self.question) for reaction in self.message.reactions: if reaction.emoji in self.emojis: self.answers[str(ord(reaction.emoji[0]) - 48)]["VOTES"] = reaction.count - 1 await self.message.clear_reactions() cur_max = 0 # Track the winning number of votes # Double iteration probably not the fastest way, but works for now for data in self.answers.values(): if data["VOTES"] > cur_max: cur_max = data["VOTES"] for data in self.answers.values(): if cur_max > 0 and data["VOTES"] == cur_max: msg += "**{} - {} votes**\n".format(data["ANSWER"], str(data["VOTES"])) else: msg += "*{}* - {} votes\n".format(data["ANSWER"], str(data["VOTES"])) await self.channel.send(msg) del self.poll_sessions[str(self.channel.id)] await self.main.delete_poll(self) async def red_delete_data_for_user( self, *, requester: Literal["discord_deleted_user", "owner", "user", "user_strict"], user_id: int, ): pass class LoadedPoll(NewReactPoll): """A reaction poll loaded from disk""" def __init__(self, main, data): self.main = main self.client = main.bot self.poll_sessions = main.poll_sessions self.author = data["author"] self.channel = self.client.get_channel(data["channel"]) self.message = data["message"] self.question = data["question"] self.answers = data["answers"] self.emojis = data["emojis"] self.end_time = data["end_time"] self.already_voted = data["already_voted"] if self.end_time <= time.time(): self.valid = False else: self.valid = True