2022-03-08 09:51:04 +13:00
|
|
|
import asyncio
|
|
|
|
import logging
|
|
|
|
import re
|
|
|
|
import time
|
|
|
|
from copy import deepcopy
|
|
|
|
from typing import Dict, List, Mapping, Optional, Tuple, Union, cast
|
|
|
|
|
|
|
|
import aiohttp
|
|
|
|
import discord
|
|
|
|
from discord.ext.commands.converter import Converter
|
|
|
|
from discord.ext.commands.errors import BadArgument
|
|
|
|
|
|
|
|
from redbot.core import Config, VersionInfo, commands, version_info
|
|
|
|
from redbot.core.bot import Red
|
|
|
|
from redbot.core.i18n import Translator
|
|
|
|
|
|
|
|
from .errors import GoogleTranslateAPIError
|
|
|
|
from .flags import FLAGS
|
|
|
|
|
|
|
|
BASE_URL = "https://translation.googleapis.com"
|
|
|
|
_ = Translator("Translate", __file__)
|
|
|
|
log = logging.getLogger("red.trusty-cogs.Translate")
|
|
|
|
|
|
|
|
FLAG_REGEX = re.compile(r"|".join(rf"{re.escape(f)}" for f in FLAGS.keys()))
|
|
|
|
|
|
|
|
|
|
|
|
class FlagTranslation(Converter):
|
|
|
|
"""
|
|
|
|
This will convert flags and languages to the correct code to be used by the API
|
|
|
|
|
|
|
|
Guidance code on how to do this from:
|
|
|
|
https://github.com/Rapptz/discord.py/blob/rewrite/discord/ext/commands/converter.py#L85
|
|
|
|
https://github.com/Cog-Creators/Red-DiscordBot/blob/V3/develop/redbot/cogs/mod/mod.py#L24
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
async def convert(self, ctx: commands.Context, argument: str) -> List[str]:
|
|
|
|
result = []
|
2022-03-13 19:39:48 +13:00
|
|
|
args = [s.strip() for s in argument.split(",")]
|
|
|
|
for arg in args:
|
|
|
|
r = None
|
|
|
|
if arg in FLAGS:
|
|
|
|
r = FLAGS[arg]["code"].upper()
|
|
|
|
else:
|
|
|
|
for lang in FLAGS:
|
|
|
|
if FLAGS[lang]["name"].lower() in arg.lower():
|
|
|
|
r = FLAGS[lang]["code"]
|
|
|
|
break
|
|
|
|
if FLAGS[lang]["country"].lower() in arg.lower():
|
|
|
|
r = FLAGS[lang]["code"]
|
|
|
|
break
|
|
|
|
if not FLAGS[lang]["code"]:
|
|
|
|
continue
|
|
|
|
if FLAGS[lang]["code"] in arg.lower() and len(arg) == 2:
|
|
|
|
r = FLAGS[lang]["code"]
|
|
|
|
break
|
|
|
|
if r is None:
|
|
|
|
raise BadArgument('Language "{}" not found'.format(arg))
|
|
|
|
result.append(r)
|
|
|
|
|
|
|
|
# filter out repeated languages
|
|
|
|
result = list(set(result))
|
2022-03-08 09:51:04 +13:00
|
|
|
if not result:
|
2022-03-13 19:39:48 +13:00
|
|
|
raise BadArgument('Language(s) "{}" not found'.format(argument))
|
2022-03-08 09:51:04 +13:00
|
|
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
|
|
class GoogleTranslateAPI:
|
|
|
|
config: Config
|
|
|
|
bot: Red
|
|
|
|
cache: dict
|
|
|
|
_key: Optional[str]
|
|
|
|
_guild_counter: Dict[int, Dict[str, int]]
|
|
|
|
_global_counter: Dict[str, int]
|
|
|
|
|
|
|
|
def __init__(self, *_args):
|
|
|
|
self.config: Config
|
|
|
|
self.bot: Red
|
|
|
|
self.cache: dict
|
|
|
|
self._key: Optional[str]
|
|
|
|
self._guild_counter: Dict[int, Dict[str, int]]
|
|
|
|
self._global_counter: Dict[str, int]
|
|
|
|
|
|
|
|
async def cleanup_cache(self) -> None:
|
|
|
|
if version_info >= VersionInfo.from_str("3.2.0"):
|
|
|
|
await self.bot.wait_until_red_ready()
|
|
|
|
else:
|
|
|
|
await self.bot.wait_until_ready()
|
|
|
|
while self is self.bot.get_cog("Translate"):
|
|
|
|
# cleanup the cache every 10 minutes
|
|
|
|
self.cache["translations"] = []
|
|
|
|
await asyncio.sleep(600)
|
|
|
|
|
|
|
|
async def save_usage(self) -> None:
|
|
|
|
if version_info >= VersionInfo.from_str("3.2.0"):
|
|
|
|
await self.bot.wait_until_red_ready()
|
|
|
|
else:
|
|
|
|
await self.bot.wait_until_ready()
|
|
|
|
while self is self.bot.get_cog("Translate"):
|
|
|
|
# Save usage stats every couple minutes
|
|
|
|
await self._save_usage_stats()
|
|
|
|
await asyncio.sleep(120)
|
|
|
|
|
|
|
|
async def _save_usage_stats(self):
|
|
|
|
async with self.config.count() as count:
|
|
|
|
for key, value in self._global_counter.items():
|
|
|
|
count[key] = value
|
|
|
|
for guild_id, data in self._guild_counter.items():
|
|
|
|
async with self.config.guild_from_id(guild_id).count() as count:
|
|
|
|
for key, value in data.items():
|
|
|
|
count[key] = value
|
|
|
|
|
|
|
|
async def add_detect(self, guild: Optional[discord.Guild]):
|
|
|
|
if guild:
|
|
|
|
log.debug(f"adding detect to {guild.name}")
|
|
|
|
if guild.id not in self._guild_counter:
|
|
|
|
self._guild_counter[guild.id] = await self.config.guild(guild).count()
|
|
|
|
self._guild_counter[guild.id]["detect"] += 1
|
|
|
|
if not self._global_counter:
|
|
|
|
self._global_counter = await self.config.count()
|
|
|
|
self._global_counter["detect"] += 1
|
|
|
|
|
|
|
|
async def add_requests(self, guild: Optional[discord.Guild], message: str):
|
|
|
|
if guild:
|
|
|
|
log.debug(f"Adding requests to {guild.name}")
|
|
|
|
if guild.id not in self._guild_counter:
|
|
|
|
self._guild_counter[guild.id] = await self.config.guild(guild).count()
|
|
|
|
self._guild_counter[guild.id]["requests"] += 1
|
|
|
|
self._guild_counter[guild.id]["characters"] += len(message)
|
|
|
|
if not self._global_counter:
|
|
|
|
self._global_counter = await self.config.count()
|
|
|
|
self._global_counter["requests"] += 1
|
|
|
|
self._global_counter["characters"] += len(message)
|
|
|
|
|
|
|
|
async def _get_google_api_key(self) -> Optional[str]:
|
|
|
|
key = {}
|
|
|
|
if not self._key:
|
|
|
|
try:
|
|
|
|
key = await self.bot.get_shared_api_tokens("google_translate")
|
|
|
|
except AttributeError:
|
|
|
|
# Red 3.1 support
|
|
|
|
key = await self.bot.db.api_tokens.get_raw("google_translate", default={})
|
|
|
|
self._key = key.get("api_key")
|
|
|
|
return self._key
|
|
|
|
|
|
|
|
async def _bw_list_cache_update(self, guild: discord.Guild) -> None:
|
|
|
|
self.cache["guild_blacklist"][guild.id] = await self.config.guild(guild).blacklist()
|
|
|
|
self.cache["guild_whitelist"][guild.id] = await self.config.guild(guild).whitelist()
|
|
|
|
|
|
|
|
async def check_bw_list(
|
2022-03-28 21:08:34 +13:00
|
|
|
self,
|
|
|
|
guild: discord.Guild,
|
|
|
|
channel: discord.TextChannel,
|
|
|
|
member: Union[discord.Member, discord.User],
|
2022-03-08 09:51:04 +13:00
|
|
|
) -> bool:
|
|
|
|
can_run = True
|
|
|
|
if guild.id not in self.cache["guild_blacklist"]:
|
|
|
|
self.cache["guild_blacklist"][guild.id] = await self.config.guild(guild).blacklist()
|
|
|
|
if guild.id not in self.cache["guild_whitelist"]:
|
|
|
|
self.cache["guild_whitelist"][guild.id] = await self.config.guild(guild).whitelist()
|
|
|
|
whitelist = self.cache["guild_whitelist"][guild.id]
|
|
|
|
blacklist = self.cache["guild_blacklist"][guild.id]
|
|
|
|
if whitelist:
|
|
|
|
can_run = False
|
|
|
|
if channel.id in whitelist:
|
|
|
|
can_run = True
|
|
|
|
if channel.category_id and channel.category_id in whitelist:
|
|
|
|
can_run = True
|
|
|
|
if member.id in whitelist:
|
|
|
|
can_run = True
|
|
|
|
for role in getattr(member, "roles", []):
|
|
|
|
if role.is_default():
|
|
|
|
continue
|
|
|
|
if role.id in whitelist:
|
|
|
|
can_run = True
|
|
|
|
return can_run
|
|
|
|
else:
|
|
|
|
if channel.id in blacklist:
|
|
|
|
can_run = False
|
|
|
|
if channel.category_id and channel.category_id in blacklist:
|
|
|
|
can_run = False
|
|
|
|
if member.id in blacklist:
|
|
|
|
can_run = False
|
|
|
|
for role in getattr(member, "roles", []):
|
|
|
|
if role.is_default():
|
|
|
|
continue
|
|
|
|
if role.id in blacklist:
|
|
|
|
can_run = False
|
|
|
|
return can_run
|
|
|
|
|
|
|
|
async def detect_language(self, text: str) -> List[List[Dict[str, str]]]:
|
|
|
|
"""
|
|
|
|
Detect the language from given text
|
|
|
|
"""
|
|
|
|
params = {"q": text, "key": self._key}
|
|
|
|
url = BASE_URL + "/language/translate/v2/detect"
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
|
|
async with session.get(url, params=params) as resp:
|
|
|
|
data = await resp.json()
|
|
|
|
if "error" in data:
|
|
|
|
log.error(data["error"]["message"])
|
|
|
|
raise GoogleTranslateAPIError(data["error"]["message"])
|
|
|
|
return data["data"]["detections"]
|
|
|
|
|
|
|
|
async def translation_embed(
|
|
|
|
self,
|
|
|
|
author: Union[discord.Member, discord.User],
|
|
|
|
translation: Tuple[str, str, str],
|
|
|
|
requestor: Optional[Union[discord.Member, discord.User]] = None,
|
|
|
|
) -> discord.Embed:
|
|
|
|
em = discord.Embed(colour=author.colour, description=translation[0])
|
|
|
|
em.set_author(name=author.display_name + _(" said:"), icon_url=str(author.avatar_url))
|
|
|
|
detail_string = _("{_from} to {_to} | Requested by ").format(
|
|
|
|
_from=translation[1].upper(), _to=translation[2].upper()
|
|
|
|
)
|
|
|
|
if requestor:
|
|
|
|
detail_string += str(requestor)
|
|
|
|
else:
|
|
|
|
detail_string += str(author)
|
|
|
|
em.set_footer(text=detail_string)
|
|
|
|
return em
|
|
|
|
|
|
|
|
async def translate_text(self, from_lang: str, target: str, text: str) -> Optional[str]:
|
|
|
|
"""
|
|
|
|
request to translate the text
|
|
|
|
"""
|
|
|
|
formatting = "text"
|
|
|
|
params = {
|
|
|
|
"q": text,
|
|
|
|
"target": target,
|
|
|
|
"key": self._key,
|
|
|
|
"format": formatting,
|
|
|
|
"source": from_lang,
|
|
|
|
}
|
|
|
|
url = BASE_URL + "/language/translate/v2"
|
|
|
|
try:
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
|
|
async with session.get(url, params=params) as resp:
|
|
|
|
data = await resp.json()
|
|
|
|
except Exception:
|
|
|
|
return None
|
|
|
|
if "error" in data:
|
|
|
|
log.error(data["error"]["message"])
|
|
|
|
raise GoogleTranslateAPIError(data["error"]["message"])
|
|
|
|
if "data" in data:
|
|
|
|
translated_text: str = data["data"]["translations"][0]["translatedText"]
|
|
|
|
return translated_text
|
|
|
|
|
|
|
|
@commands.Cog.listener()
|
|
|
|
async def on_message(self, message: discord.Message) -> None:
|
|
|
|
"""
|
|
|
|
Translates the message based off reactions
|
|
|
|
with country flags
|
|
|
|
"""
|
|
|
|
if version_info >= VersionInfo.from_str("3.2.0"):
|
|
|
|
await self.bot.wait_until_red_ready()
|
|
|
|
else:
|
|
|
|
await self.bot.wait_until_ready()
|
|
|
|
if not message.guild:
|
|
|
|
return
|
|
|
|
if message.author.bot:
|
|
|
|
return
|
|
|
|
if not await self._get_google_api_key():
|
|
|
|
return
|
|
|
|
author = cast(discord.Member, message.author)
|
|
|
|
channel = cast(discord.TextChannel, message.channel)
|
|
|
|
links = await self.config.guild(channel.guild).autosend()
|
|
|
|
link_channels = [int(r) for r in links.keys()]
|
|
|
|
guild = message.guild
|
|
|
|
if version_info >= VersionInfo.from_str("3.4.0"):
|
|
|
|
if await self.bot.cog_disabled_in_guild(self, guild):
|
|
|
|
return
|
|
|
|
if not await self.check_bw_list(guild, channel, author):
|
|
|
|
return
|
|
|
|
if not await self.config.guild(guild).text() and not channel.id in link_channels:
|
|
|
|
return
|
|
|
|
if guild.id not in self.cache["guild_messages"]:
|
|
|
|
if not await self.config.guild(guild).text() and not channel.id in link_channels:
|
|
|
|
return
|
|
|
|
else:
|
|
|
|
self.cache["guild_messages"].append(guild.id)
|
|
|
|
if not await self.local_perms(guild, author):
|
|
|
|
return
|
|
|
|
if not await self.global_perms(author):
|
|
|
|
return
|
|
|
|
if not await self.check_ignored_channel(message):
|
|
|
|
return
|
|
|
|
flag = FLAG_REGEX.search(message.clean_content)
|
|
|
|
if flag:
|
|
|
|
await self.translate_message(message, flag.group())
|
|
|
|
elif channel.id in link_channels:
|
|
|
|
await self.translate_automessage(message, links)
|
|
|
|
|
|
|
|
@commands.Cog.listener()
|
|
|
|
async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent) -> None:
|
|
|
|
"""
|
|
|
|
Translates the message based off reactions
|
|
|
|
with country flags
|
|
|
|
"""
|
|
|
|
if version_info >= VersionInfo.from_str("3.2.0"):
|
|
|
|
await self.bot.wait_until_red_ready()
|
|
|
|
else:
|
|
|
|
await self.bot.wait_until_ready()
|
|
|
|
if payload.message_id in self.cache["translations"]:
|
|
|
|
return
|
|
|
|
if str(payload.emoji) not in FLAGS:
|
|
|
|
return
|
|
|
|
if not await self._get_google_api_key():
|
|
|
|
return
|
|
|
|
channel = self.bot.get_channel(id=payload.channel_id)
|
|
|
|
if not channel:
|
|
|
|
return
|
|
|
|
try:
|
|
|
|
guild = channel.guild
|
|
|
|
except AttributeError:
|
|
|
|
return
|
|
|
|
if guild is None:
|
|
|
|
return
|
|
|
|
if version_info >= VersionInfo.from_str("3.4.0"):
|
|
|
|
if await self.bot.cog_disabled_in_guild(self, guild):
|
|
|
|
return
|
|
|
|
reacted_user = guild.get_member(payload.user_id)
|
|
|
|
if reacted_user.bot:
|
|
|
|
return
|
|
|
|
if not await self.check_bw_list(guild, channel, reacted_user):
|
|
|
|
return
|
|
|
|
|
|
|
|
if guild.id not in self.cache["guild_reactions"]:
|
|
|
|
if not await self.config.guild(guild).reaction():
|
|
|
|
return
|
|
|
|
else:
|
|
|
|
self.cache["guild_reactions"].append(guild.id)
|
|
|
|
|
|
|
|
if not await self.local_perms(guild, reacted_user):
|
|
|
|
return
|
|
|
|
if not await self.global_perms(reacted_user):
|
|
|
|
return
|
|
|
|
try:
|
|
|
|
message = await channel.fetch_message(id=payload.message_id)
|
|
|
|
except (discord.errors.NotFound, discord.Forbidden):
|
|
|
|
return
|
|
|
|
|
|
|
|
if not await self.check_ignored_channel(message, reacted_user):
|
|
|
|
return
|
|
|
|
await self.translate_message(message, str(payload.emoji), reacted_user)
|
|
|
|
|
|
|
|
async def translate_automessage(self, message: discord.Message, links: dict) -> None:
|
|
|
|
guild = cast(discord.Guild, message.guild)
|
|
|
|
channel = cast(discord.TextChannel, message.channel)
|
|
|
|
# remove sent channel from Links
|
|
|
|
try:
|
|
|
|
del links[str(channel.id)]
|
|
|
|
except:
|
|
|
|
pass
|
|
|
|
|
|
|
|
if message.embeds != []:
|
|
|
|
if message.embeds[0].description:
|
|
|
|
to_translate = cast(str, message.embeds[0].description)
|
|
|
|
else:
|
|
|
|
to_translate = None
|
|
|
|
else:
|
|
|
|
to_translate = message.clean_content
|
|
|
|
|
|
|
|
if not to_translate:
|
|
|
|
for l_id, l_lang in links.items():
|
|
|
|
ch = guild.get_channel(int(l_id))
|
|
|
|
if not ch:
|
|
|
|
continue
|
|
|
|
|
|
|
|
if message.attachments:
|
|
|
|
files = [await a.to_file() for a in message.attachments]
|
|
|
|
else:
|
|
|
|
files = None
|
|
|
|
|
|
|
|
if message.embeds:
|
|
|
|
links = "\n".join([e.url for e in message.embeds])
|
|
|
|
else:
|
|
|
|
links = ""
|
|
|
|
|
|
|
|
await ch.send(f"**{message.author.display_name} sent:**\n{links}", files=files)
|
|
|
|
return
|
|
|
|
try:
|
|
|
|
detected_lang = await self.detect_language(to_translate)
|
|
|
|
await self.add_detect(guild)
|
|
|
|
except GoogleTranslateAPIError:
|
|
|
|
return
|
|
|
|
except Exception:
|
|
|
|
log.exception("Error detecting language")
|
|
|
|
return
|
|
|
|
|
|
|
|
original_lang = detected_lang[0][0]["language"]
|
|
|
|
for l_id, target in links.items():
|
|
|
|
ch = guild.get_channel(int(l_id))
|
|
|
|
if not ch:
|
|
|
|
continue
|
|
|
|
try:
|
|
|
|
if target == original_lang:
|
|
|
|
translated_text = to_translate
|
|
|
|
else:
|
|
|
|
translated_text = await self.translate_text(original_lang, target, to_translate)
|
|
|
|
await self.add_requests(guild, to_translate)
|
|
|
|
except Exception:
|
|
|
|
log.exception(f"Error translating message {guild=} {channel=}")
|
|
|
|
return
|
|
|
|
if not translated_text:
|
|
|
|
log.exception(f"Message not translated to {l_lang} {guild=} {channel=}")
|
|
|
|
return
|
|
|
|
author = message.author
|
|
|
|
from_lang = original_lang.upper()
|
|
|
|
to_lang = target.upper()
|
|
|
|
|
|
|
|
translation = (translated_text, from_lang, to_lang)
|
|
|
|
|
|
|
|
if message.attachments:
|
|
|
|
files = [await a.to_file() for a in message.attachments]
|
|
|
|
else:
|
|
|
|
files = None
|
|
|
|
|
|
|
|
if ch.permissions_for(guild.me).embed_links:
|
|
|
|
em = await self.translation_embed(author, translation)
|
|
|
|
translated_msg = await ch.send(embed=em, files=files)
|
|
|
|
else:
|
|
|
|
msg = _("{author} said:\n{translated_text}").format(author=author, translated_text=translated_text)
|
|
|
|
translated_msg = await ch.send(msg, files=files)
|
|
|
|
|
|
|
|
async def translate_message(
|
2022-03-28 21:08:34 +13:00
|
|
|
self,
|
|
|
|
message: discord.Message,
|
|
|
|
flag: str,
|
|
|
|
reacted_user: Optional[discord.Member] = None,
|
2022-03-08 09:51:04 +13:00
|
|
|
) -> None:
|
|
|
|
guild = cast(discord.Guild, message.guild)
|
|
|
|
channel = cast(discord.TextChannel, message.channel)
|
|
|
|
if message.id in self.cache["cooldown_translations"]:
|
|
|
|
if str(flag) in self.cache["cooldown_translations"][message.id]["past_flags"]:
|
|
|
|
return
|
|
|
|
if not self.cache["cooldown_translations"][message.id]["multiple"]:
|
|
|
|
return
|
|
|
|
if time.time() < self.cache["cooldown_translations"][message.id]["wait"]:
|
|
|
|
delete_after = self.cache["cooldown_translations"][message.id]["wait"] - time.time()
|
|
|
|
await channel.send(_("You're translating too many messages!"), delete_after=delete_after)
|
|
|
|
return
|
|
|
|
to_translate = None
|
|
|
|
if message.embeds != []:
|
|
|
|
if message.embeds[0].description:
|
|
|
|
to_translate = cast(str, message.embeds[0].description)
|
|
|
|
else:
|
|
|
|
to_translate = message.clean_content
|
|
|
|
|
|
|
|
if not to_translate:
|
|
|
|
return
|
|
|
|
num_emojis = 0
|
|
|
|
for reaction in message.reactions:
|
|
|
|
if reaction.emoji == str(flag):
|
|
|
|
num_emojis = reaction.count
|
|
|
|
if num_emojis > 1:
|
|
|
|
return
|
|
|
|
target = FLAGS[str(flag)]["code"]
|
|
|
|
try:
|
|
|
|
detected_lang = await self.detect_language(to_translate)
|
|
|
|
await self.add_detect(guild)
|
|
|
|
except GoogleTranslateAPIError:
|
|
|
|
return
|
|
|
|
except Exception:
|
|
|
|
log.exception("Error detecting language")
|
|
|
|
return
|
|
|
|
original_lang = detected_lang[0][0]["language"]
|
|
|
|
if target == original_lang:
|
|
|
|
return
|
|
|
|
try:
|
|
|
|
translated_text = await self.translate_text(original_lang, target, to_translate)
|
|
|
|
await self.add_requests(guild, to_translate)
|
|
|
|
except Exception:
|
|
|
|
log.exception(f"Error translating message {guild=} {channel=}")
|
|
|
|
return
|
|
|
|
if not translated_text:
|
|
|
|
return
|
|
|
|
author = message.author
|
|
|
|
from_lang = detected_lang[0][0]["language"].upper()
|
|
|
|
to_lang = target.upper()
|
|
|
|
if from_lang == to_lang:
|
|
|
|
# don't post anything if the detected language is the same
|
|
|
|
return
|
|
|
|
translation = (translated_text, from_lang, to_lang)
|
|
|
|
|
|
|
|
if message.id not in self.cache["cooldown_translations"]:
|
|
|
|
if not self.cache["cooldown"]:
|
|
|
|
self.cache["cooldown"] = await self.config.cooldown()
|
|
|
|
cooldown = deepcopy(self.cache["cooldown"])
|
|
|
|
else:
|
|
|
|
cooldown = self.cache["cooldown_translations"][message.id]
|
|
|
|
cooldown["wait"] = time.time() + cooldown["timeout"]
|
|
|
|
cooldown["past_flags"].append(str(flag))
|
|
|
|
self.cache["cooldown_translations"][message.id] = cooldown
|
|
|
|
|
|
|
|
if channel.permissions_for(guild.me).embed_links:
|
|
|
|
em = await self.translation_embed(author, translation, reacted_user)
|
|
|
|
if version_info >= VersionInfo.from_str("3.4.6"):
|
|
|
|
translated_msg = await channel.send(embed=em, reference=message, mention_author=False)
|
|
|
|
else:
|
|
|
|
translated_msg = await channel.send(embed=em)
|
|
|
|
else:
|
|
|
|
msg = _("{author} said:\n{translated_text}").format(author=author, translate_text=translated_text)
|
|
|
|
translated_msg = await channel.send(msg)
|
|
|
|
if not cooldown["multiple"]:
|
|
|
|
self.cache["translations"].append(translated_msg.id)
|
|
|
|
|
|
|
|
async def local_perms(self, guild: discord.Guild, author: discord.Member) -> bool:
|
|
|
|
"""Check the user is/isn't locally whitelisted/blacklisted.
|
|
|
|
https://github.com/Cog-Creators/Red-DiscordBot/blob/V3/release/3.0.0/redbot/core/global_checks.py
|
|
|
|
"""
|
|
|
|
try:
|
|
|
|
return await self.bot.allowed_by_whitelist_blacklist(
|
|
|
|
author, who_id=author.id, guild_id=guild.id, role_ids=[r.id for r in author.roles]
|
|
|
|
)
|
|
|
|
except AttributeError:
|
|
|
|
if await self.bot.is_owner(author):
|
|
|
|
return True
|
|
|
|
elif guild is None:
|
|
|
|
return True
|
|
|
|
guild_settings = self.bot.db.guild(guild)
|
|
|
|
local_blacklist = await guild_settings.blacklist()
|
|
|
|
local_whitelist = await guild_settings.whitelist()
|
|
|
|
|
|
|
|
_ids = [r.id for r in author.roles if not r.is_default()]
|
|
|
|
_ids.append(author.id)
|
|
|
|
if local_whitelist:
|
|
|
|
return any(i in local_whitelist for i in _ids)
|
|
|
|
|
|
|
|
return not any(i in local_blacklist for i in _ids)
|
|
|
|
|
|
|
|
async def global_perms(self, author: discord.Member) -> bool:
|
|
|
|
"""Check the user is/isn't globally whitelisted/blacklisted.
|
|
|
|
https://github.com/Cog-Creators/Red-DiscordBot/blob/V3/release/3.0.0/redbot/core/global_checks.py
|
|
|
|
"""
|
|
|
|
try:
|
|
|
|
return await self.bot.allowed_by_whitelist_blacklist(author)
|
|
|
|
except AttributeError:
|
|
|
|
if await self.bot.is_owner(author):
|
|
|
|
return True
|
|
|
|
whitelist = await self.bot.db.whitelist()
|
|
|
|
if whitelist:
|
|
|
|
return author.id in whitelist
|
|
|
|
|
|
|
|
return author.id not in await self.bot.db.blacklist()
|
|
|
|
|
|
|
|
async def check_ignored_channel(
|
|
|
|
self, message: discord.Message, reacting_user: Optional[discord.Member] = None
|
|
|
|
) -> bool:
|
|
|
|
"""
|
|
|
|
https://github.com/Cog-Creators/Red-DiscordBot/blob/V3/release/3.0.0/redbot/cogs/mod/mod.py#L1273
|
|
|
|
"""
|
|
|
|
if version_info >= VersionInfo.from_str("3.3.6"):
|
|
|
|
ctx = await self.bot.get_context(message)
|
|
|
|
if reacting_user:
|
|
|
|
ctx.author = reacting_user
|
|
|
|
return await self.bot.ignored_channel_or_guild(ctx)
|
|
|
|
# everything below this can be removed at a later date when support
|
|
|
|
# for previous versions are no longer required.
|
|
|
|
channel = cast(discord.TextChannel, message.channel)
|
|
|
|
guild = channel.guild
|
|
|
|
author = cast(discord.Member, message.author)
|
|
|
|
if reacting_user:
|
|
|
|
author = reacting_user
|
|
|
|
mod = self.bot.get_cog("Mod")
|
|
|
|
if mod is None:
|
|
|
|
return True
|
|
|
|
perms = channel.permissions_for(author)
|
|
|
|
surpass_ignore = (
|
|
|
|
isinstance(channel, discord.abc.PrivateChannel)
|
|
|
|
or perms.manage_guild
|
|
|
|
or await self.bot.is_owner(author)
|
|
|
|
or await self.bot.is_admin(author)
|
|
|
|
)
|
|
|
|
if surpass_ignore:
|
|
|
|
return True
|
|
|
|
guild_ignored = await mod.settings.guild(guild).ignored()
|
|
|
|
chann_ignored = await mod.settings.channel(channel).ignored()
|
|
|
|
return not (guild_ignored or chann_ignored and not perms.manage_channels)
|
|
|
|
|
|
|
|
@commands.Cog.listener()
|
|
|
|
async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str, str]) -> None:
|
|
|
|
if service_name != "google_translate":
|
|
|
|
return
|
|
|
|
|
|
|
|
self._key = None
|