import asyncio import logging import re import time from copy import deepcopy from typing import Dict, List, Mapping, Optional, Tuple, Union, cast import aiohttp import discord from discord.ext.commands.converter import Converter from discord.ext.commands.errors import BadArgument from redbot.core import Config, VersionInfo, commands, version_info from redbot.core.bot import Red from redbot.core.i18n import Translator from .errors import GoogleTranslateAPIError from .flags import FLAGS BASE_URL = "https://translation.googleapis.com" _ = Translator("Translate", __file__) log = logging.getLogger("red.trusty-cogs.Translate") FLAG_REGEX = re.compile(r"|".join(rf"{re.escape(f)}" for f in FLAGS.keys())) class FlagTranslation(Converter): """ This will convert flags and languages to the correct code to be used by the API Guidance code on how to do this from: https://github.com/Rapptz/discord.py/blob/rewrite/discord/ext/commands/converter.py#L85 https://github.com/Cog-Creators/Red-DiscordBot/blob/V3/develop/redbot/cogs/mod/mod.py#L24 """ async def convert(self, ctx: commands.Context, argument: str) -> List[str]: result = [] args = [s.strip() for s in argument.split(",")] for arg in args: r = None if arg in FLAGS: r = FLAGS[arg]["code"].upper() else: for lang in FLAGS: if FLAGS[lang]["name"].lower() in arg.lower(): r = FLAGS[lang]["code"] break if FLAGS[lang]["country"].lower() in arg.lower(): r = FLAGS[lang]["code"] break if not FLAGS[lang]["code"]: continue if FLAGS[lang]["code"] in arg.lower() and len(arg) == 2: r = FLAGS[lang]["code"] break if r is None: raise BadArgument('Language "{}" not found'.format(arg)) result.append(r) # filter out repeated languages result = list(set(result)) if not result: raise BadArgument('Language(s) "{}" not found'.format(argument)) return result class GoogleTranslateAPI: config: Config bot: Red cache: dict _key: Optional[str] _guild_counter: Dict[int, Dict[str, int]] _global_counter: Dict[str, int] def __init__(self, *_args): self.config: Config self.bot: Red self.cache: dict self._key: Optional[str] self._guild_counter: Dict[int, Dict[str, int]] self._global_counter: Dict[str, int] async def cleanup_cache(self) -> None: if version_info >= VersionInfo.from_str("3.2.0"): await self.bot.wait_until_red_ready() else: await self.bot.wait_until_ready() while self is self.bot.get_cog("Translate"): # cleanup the cache every 10 minutes self.cache["translations"] = [] await asyncio.sleep(600) async def save_usage(self) -> None: if version_info >= VersionInfo.from_str("3.2.0"): await self.bot.wait_until_red_ready() else: await self.bot.wait_until_ready() while self is self.bot.get_cog("Translate"): # Save usage stats every couple minutes await self._save_usage_stats() await asyncio.sleep(120) async def _save_usage_stats(self): async with self.config.count() as count: for key, value in self._global_counter.items(): count[key] = value for guild_id, data in self._guild_counter.items(): async with self.config.guild_from_id(guild_id).count() as count: for key, value in data.items(): count[key] = value async def add_detect(self, guild: Optional[discord.Guild]): if guild: log.debug(f"adding detect to {guild.name}") if guild.id not in self._guild_counter: self._guild_counter[guild.id] = await self.config.guild(guild).count() self._guild_counter[guild.id]["detect"] += 1 if not self._global_counter: self._global_counter = await self.config.count() self._global_counter["detect"] += 1 async def add_requests(self, guild: Optional[discord.Guild], message: str): if guild: log.debug(f"Adding requests to {guild.name}") if guild.id not in self._guild_counter: self._guild_counter[guild.id] = await self.config.guild(guild).count() self._guild_counter[guild.id]["requests"] += 1 self._guild_counter[guild.id]["characters"] += len(message) if not self._global_counter: self._global_counter = await self.config.count() self._global_counter["requests"] += 1 self._global_counter["characters"] += len(message) async def _get_google_api_key(self) -> Optional[str]: key = {} if not self._key: try: key = await self.bot.get_shared_api_tokens("google_translate") except AttributeError: # Red 3.1 support key = await self.bot.db.api_tokens.get_raw("google_translate", default={}) self._key = key.get("api_key") return self._key async def _bw_list_cache_update(self, guild: discord.Guild) -> None: self.cache["guild_blacklist"][guild.id] = await self.config.guild(guild).blacklist() self.cache["guild_whitelist"][guild.id] = await self.config.guild(guild).whitelist() async def check_bw_list( self, guild: discord.Guild, channel: discord.TextChannel, member: Union[discord.Member, discord.User], ) -> bool: can_run = True if guild.id not in self.cache["guild_blacklist"]: self.cache["guild_blacklist"][guild.id] = await self.config.guild(guild).blacklist() if guild.id not in self.cache["guild_whitelist"]: self.cache["guild_whitelist"][guild.id] = await self.config.guild(guild).whitelist() whitelist = self.cache["guild_whitelist"][guild.id] blacklist = self.cache["guild_blacklist"][guild.id] if whitelist: can_run = False if channel.id in whitelist: can_run = True if channel.category_id and channel.category_id in whitelist: can_run = True if member.id in whitelist: can_run = True for role in getattr(member, "roles", []): if role.is_default(): continue if role.id in whitelist: can_run = True return can_run else: if channel.id in blacklist: can_run = False if channel.category_id and channel.category_id in blacklist: can_run = False if member.id in blacklist: can_run = False for role in getattr(member, "roles", []): if role.is_default(): continue if role.id in blacklist: can_run = False return can_run async def detect_language(self, text: str) -> List[List[Dict[str, str]]]: """ Detect the language from given text """ params = {"q": text, "key": self._key} url = BASE_URL + "/language/translate/v2/detect" async with aiohttp.ClientSession() as session: async with session.get(url, params=params) as resp: data = await resp.json() if "error" in data: log.error(data["error"]["message"]) raise GoogleTranslateAPIError(data["error"]["message"]) return data["data"]["detections"] async def translation_embed( self, author: Union[discord.Member, discord.User], translation: Tuple[str, str, str], requestor: Optional[Union[discord.Member, discord.User]] = None, ) -> discord.Embed: em = discord.Embed(colour=author.colour, description=translation[0]) em.set_author(name=author.display_name + _(" said:"), icon_url=str(author.avatar_url)) detail_string = _("{_from} to {_to} | Requested by ").format( _from=translation[1].upper(), _to=translation[2].upper() ) if requestor: detail_string += str(requestor) else: detail_string += str(author) em.set_footer(text=detail_string) return em async def translate_text(self, from_lang: str, target: str, text: str) -> Optional[str]: """ request to translate the text """ formatting = "text" params = { "q": text, "target": target, "key": self._key, "format": formatting, "source": from_lang, } url = BASE_URL + "/language/translate/v2" try: async with aiohttp.ClientSession() as session: async with session.get(url, params=params) as resp: data = await resp.json() except Exception: return None if "error" in data: log.error(data["error"]["message"]) raise GoogleTranslateAPIError(data["error"]["message"]) if "data" in data: translated_text: str = data["data"]["translations"][0]["translatedText"] return translated_text @commands.Cog.listener() async def on_message(self, message: discord.Message) -> None: """ Translates the message based off reactions with country flags """ if version_info >= VersionInfo.from_str("3.2.0"): await self.bot.wait_until_red_ready() else: await self.bot.wait_until_ready() if not message.guild: return if message.author.bot: return if not await self._get_google_api_key(): return author = cast(discord.Member, message.author) channel = cast(discord.TextChannel, message.channel) links = await self.config.guild(channel.guild).autosend() link_channels = [] for l in links: link_channels += [int(r) for r in l.keys()] # for checks guild = message.guild if version_info >= VersionInfo.from_str("3.4.0"): if await self.bot.cog_disabled_in_guild(self, guild): return if not await self.check_bw_list(guild, channel, author): return if not await self.config.guild(guild).text() and not channel.id in link_channels: return if guild.id not in self.cache["guild_messages"]: if not await self.config.guild(guild).text() and not channel.id in link_channels: return else: self.cache["guild_messages"].append(guild.id) if not await self.local_perms(guild, author): return if not await self.global_perms(author): return if not await self.check_ignored_channel(message): return flag = FLAG_REGEX.search(message.clean_content) if flag: await self.translate_message(message, flag.group()) elif channel.id in link_channels: await self.translate_automessage(message, links) @commands.Cog.listener() async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent) -> None: """ Translates the message based off reactions with country flags """ if version_info >= VersionInfo.from_str("3.2.0"): await self.bot.wait_until_red_ready() else: await self.bot.wait_until_ready() if payload.message_id in self.cache["translations"]: return if str(payload.emoji) not in FLAGS: return if not await self._get_google_api_key(): return channel = self.bot.get_channel(id=payload.channel_id) if not channel: return try: guild = channel.guild except AttributeError: return if guild is None: return if version_info >= VersionInfo.from_str("3.4.0"): if await self.bot.cog_disabled_in_guild(self, guild): return reacted_user = guild.get_member(payload.user_id) if reacted_user.bot: return if not await self.check_bw_list(guild, channel, reacted_user): return if guild.id not in self.cache["guild_reactions"]: if not await self.config.guild(guild).reaction(): return else: self.cache["guild_reactions"].append(guild.id) if not await self.local_perms(guild, reacted_user): return if not await self.global_perms(reacted_user): return try: message = await channel.fetch_message(id=payload.message_id) except (discord.errors.NotFound, discord.Forbidden): return if not await self.check_ignored_channel(message, reacted_user): return await self.translate_message(message, str(payload.emoji), reacted_user) async def translate_automessage(self, message: discord.Message, all_links: list) -> None: guild = cast(discord.Guild, message.guild) channel = cast(discord.TextChannel, message.channel) for links in all_links: if str(channel.id) not in links: continue # remove sent channel from Links try: del links[str(channel.id)] except: pass if message.embeds != []: if message.embeds[0].description: to_translate = cast(str, message.embeds[0].description) else: to_translate = None else: to_translate = message.clean_content if not to_translate: for l_id, l_lang in links.items(): ch = guild.get_channel(int(l_id)) if not ch: continue if message.attachments: files = [await a.to_file() for a in message.attachments] else: files = None if message.embeds: links = "\n".join([e.url for e in message.embeds]) else: links = "" await ch.send(f"**{message.author.display_name} sent:**\n{links}", files=files) return try: detected_lang = await self.detect_language(to_translate) await self.add_detect(guild) except GoogleTranslateAPIError: return except Exception: log.exception("Error detecting language") return original_lang = detected_lang[0][0]["language"] for l_id, target in links.items(): ch = guild.get_channel(int(l_id)) if not ch: continue try: if target == original_lang: translated_text = to_translate else: translated_text = await self.translate_text(original_lang, target, to_translate) await self.add_requests(guild, to_translate) except Exception: log.exception(f"Error translating message {guild=} {channel=}") return if not translated_text: log.exception(f"Message not translated to {l_lang} {guild=} {channel=}") return author = message.author from_lang = original_lang.upper() to_lang = target.upper() translation = (translated_text, from_lang, to_lang) if message.attachments: files = [await a.to_file() for a in message.attachments] else: files = None if ch.permissions_for(guild.me).embed_links: em = await self.translation_embed(author, translation) translated_msg = await ch.send(embed=em, files=files) else: msg = _("{author} said:\n{translated_text}").format(author=author, translated_text=translated_text) translated_msg = await ch.send(msg, files=files) async def translate_message( self, message: discord.Message, flag: str, reacted_user: Optional[discord.Member] = None, ) -> None: guild = cast(discord.Guild, message.guild) channel = cast(discord.TextChannel, message.channel) if message.id in self.cache["cooldown_translations"]: if str(flag) in self.cache["cooldown_translations"][message.id]["past_flags"]: return if not self.cache["cooldown_translations"][message.id]["multiple"]: return if time.time() < self.cache["cooldown_translations"][message.id]["wait"]: delete_after = self.cache["cooldown_translations"][message.id]["wait"] - time.time() await channel.send(_("You're translating too many messages!"), delete_after=delete_after) return to_translate = None if message.embeds != []: if message.embeds[0].description: to_translate = cast(str, message.embeds[0].description) else: to_translate = message.clean_content if not to_translate: return num_emojis = 0 for reaction in message.reactions: if reaction.emoji == str(flag): num_emojis = reaction.count if num_emojis > 1: return target = FLAGS[str(flag)]["code"] try: detected_lang = await self.detect_language(to_translate) await self.add_detect(guild) except GoogleTranslateAPIError: return except Exception: log.exception("Error detecting language") return original_lang = detected_lang[0][0]["language"] if target == original_lang: return try: translated_text = await self.translate_text(original_lang, target, to_translate) await self.add_requests(guild, to_translate) except Exception: log.exception(f"Error translating message {guild=} {channel=}") return if not translated_text: return author = message.author from_lang = detected_lang[0][0]["language"].upper() to_lang = target.upper() if from_lang == to_lang: # don't post anything if the detected language is the same return translation = (translated_text, from_lang, to_lang) if message.id not in self.cache["cooldown_translations"]: if not self.cache["cooldown"]: self.cache["cooldown"] = await self.config.cooldown() cooldown = deepcopy(self.cache["cooldown"]) else: cooldown = self.cache["cooldown_translations"][message.id] cooldown["wait"] = time.time() + cooldown["timeout"] cooldown["past_flags"].append(str(flag)) self.cache["cooldown_translations"][message.id] = cooldown if channel.permissions_for(guild.me).embed_links: em = await self.translation_embed(author, translation, reacted_user) if version_info >= VersionInfo.from_str("3.4.6"): translated_msg = await channel.send(embed=em, reference=message, mention_author=False) else: translated_msg = await channel.send(embed=em) else: msg = _("{author} said:\n{translated_text}").format(author=author, translate_text=translated_text) translated_msg = await channel.send(msg) if not cooldown["multiple"]: self.cache["translations"].append(translated_msg.id) async def local_perms(self, guild: discord.Guild, author: discord.Member) -> bool: """Check the user is/isn't locally whitelisted/blacklisted. https://github.com/Cog-Creators/Red-DiscordBot/blob/V3/release/3.0.0/redbot/core/global_checks.py """ try: return await self.bot.allowed_by_whitelist_blacklist( author, who_id=author.id, guild_id=guild.id, role_ids=[r.id for r in author.roles] ) except AttributeError: if await self.bot.is_owner(author): return True elif guild is None: return True guild_settings = self.bot.db.guild(guild) local_blacklist = await guild_settings.blacklist() local_whitelist = await guild_settings.whitelist() _ids = [r.id for r in author.roles if not r.is_default()] _ids.append(author.id) if local_whitelist: return any(i in local_whitelist for i in _ids) return not any(i in local_blacklist for i in _ids) async def global_perms(self, author: discord.Member) -> bool: """Check the user is/isn't globally whitelisted/blacklisted. https://github.com/Cog-Creators/Red-DiscordBot/blob/V3/release/3.0.0/redbot/core/global_checks.py """ try: return await self.bot.allowed_by_whitelist_blacklist(author) except AttributeError: if await self.bot.is_owner(author): return True whitelist = await self.bot.db.whitelist() if whitelist: return author.id in whitelist return author.id not in await self.bot.db.blacklist() async def check_ignored_channel( self, message: discord.Message, reacting_user: Optional[discord.Member] = None ) -> bool: """ https://github.com/Cog-Creators/Red-DiscordBot/blob/V3/release/3.0.0/redbot/cogs/mod/mod.py#L1273 """ if version_info >= VersionInfo.from_str("3.3.6"): ctx = await self.bot.get_context(message) if reacting_user: ctx.author = reacting_user return await self.bot.ignored_channel_or_guild(ctx) # everything below this can be removed at a later date when support # for previous versions are no longer required. channel = cast(discord.TextChannel, message.channel) guild = channel.guild author = cast(discord.Member, message.author) if reacting_user: author = reacting_user mod = self.bot.get_cog("Mod") if mod is None: return True perms = channel.permissions_for(author) surpass_ignore = ( isinstance(channel, discord.abc.PrivateChannel) or perms.manage_guild or await self.bot.is_owner(author) or await self.bot.is_admin(author) ) if surpass_ignore: return True guild_ignored = await mod.settings.guild(guild).ignored() chann_ignored = await mod.settings.channel(channel).ignored() return not (guild_ignored or chann_ignored and not perms.manage_channels) @commands.Cog.listener() async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str, str]) -> None: if service_name != "google_translate": return self._key = None