1
0
Fork 0
mirror of synced 2024-04-29 10:12:29 +12:00
Bonfire/cogs/owner.py
2021-04-01 00:19:24 -08:00

281 lines
9 KiB
Python

from discord.ext import commands
import asyncio
from contextlib import redirect_stdout
import discord
import inspect
import io
import re
import textwrap
import traceback
def get_syntax_error(e):
if e.text is None:
return "```py\n{0.__class__.__name__}: {0}\n```".format(e)
return "```py\n{0.text}{1:>{0.offset}}\n{2}: {0}```".format(
e, "^", type(e).__name__
)
class Owner(commands.Cog):
"""Commands that can only be used by the owner of the bot, bot management commands"""
_last_result = None
sessions = set()
async def cog_check(self, ctx):
return await ctx.bot.is_owner(ctx.author)
@staticmethod
def cleanup_code(content):
"""Automatically removes code blocks from the code."""
# remove ```py\n```
if content.startswith("```") and content.endswith("```"):
return "\n".join(content.split("\n")[1:-1])
# remove `foo`
return content.strip("` \n")
@commands.command(hidden=True)
async def repl(self, ctx):
msg = ctx.message
variables = {
"ctx": ctx,
"bot": ctx.bot,
"message": msg,
"guild": msg.guild,
"server": msg.guild,
"channel": msg.channel,
"author": msg.author,
"self": self,
"_": None,
}
if msg.channel.id in self.sessions:
await ctx.send(
"Already running a REPL session in this channel. Exit it with `quit`."
)
return
self.sessions.add(msg.channel.id)
await ctx.send("Enter code to execute or evaluate. `exit()` or `quit` to exit.")
def check(m):
return (
m.author.id == msg.author.id
and m.channel.id == msg.channel.id
and m.content.startswith("`")
)
code = None
while True:
try:
response = await ctx.bot.wait_for(
"message", check=check, timeout=10.0 * 60.0
)
except asyncio.TimeoutError:
await ctx.send("Exiting REPL session.")
self.sessions.remove(msg.channel.id)
break
cleaned = self.cleanup_code(response.content)
if cleaned in ("quit", "exit", "exit()"):
await ctx.send("Exiting.")
self.sessions.remove(msg.channel.id)
return
executor = exec
if cleaned.count("\n") == 0:
# single statement, potentially 'eval'
try:
code = compile(cleaned, "<repl session>", "eval")
except SyntaxError:
pass
else:
executor = eval
if executor is exec:
try:
code = compile(cleaned, "<repl session>", "exec")
except SyntaxError as e:
await ctx.send(get_syntax_error(e))
continue
variables["message"] = response
fmt = None
stdout = io.StringIO()
try:
with redirect_stdout(stdout):
result = executor(code, variables)
if inspect.isawaitable(result):
result = await result
except Exception:
value = stdout.getvalue()
fmt = "```py\n{}{}\n```".format(value, traceback.format_exc())
else:
value = stdout.getvalue()
if result is not None:
fmt = "```py\n{}{}\n```".format(value, result)
variables["_"] = result
elif value:
fmt = "```py\n{}\n```".format(value)
try:
if fmt is not None:
if len(fmt) > 2000:
await ctx.send("Content too big to be printed.")
else:
await ctx.send(fmt)
except discord.Forbidden:
pass
except discord.HTTPException as e:
await ctx.send("Unexpected error: `{}`".format(e))
@commands.command()
async def sendtochannel(self, ctx, cid: int, *, message):
"""Sends a message to a provided channel, by ID"""
channel = ctx.bot.get_channel(cid)
await channel.send(message)
try:
await ctx.message.delete()
except discord.Forbidden:
pass
@commands.command()
async def debug(self, ctx, *, body: str):
env = {
"bot": ctx.bot,
"ctx": ctx,
"channel": ctx.message.channel,
"author": ctx.message.author,
"server": ctx.message.guild,
"guild": ctx.message.guild,
"message": ctx.message,
"self": self,
"_": self._last_result,
}
env.update(globals())
body = self.cleanup_code(body)
stdout = io.StringIO()
to_compile = "async def func():\n%s" % textwrap.indent(body, " ")
try:
exec(to_compile, env)
except SyntaxError as e:
return await ctx.send(get_syntax_error(e))
func = env["func"]
try:
with redirect_stdout(stdout):
ret = await func()
except Exception:
value = stdout.getvalue()
await ctx.send(f"```py\n{value}{traceback.format_exc()}\n```"[:2000])
else:
value = stdout.getvalue()
try:
await ctx.message.add_reaction("\u2705")
except Exception:
pass
if ret is None:
if value:
await ctx.send(f"```py\n{value}\n```"[:2000])
else:
self._last_result = ret
await ctx.send(f"```py\n{value}{ret}\n```"[:2000])
@commands.command()
async def bash(self, ctx, *, cmd: str):
"""Runs a bash command"""
proc = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
)
m = r"(https?:\/\/(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b[-a-zA-Z0-9()@:%_\+.~#?&//=]*)"
stdout = (await proc.communicate())[0]
if stdout:
output = re.sub(m, r"<\1>", stdout.decode())
await ctx.send(f"[stdout]\n{output}")
else:
await ctx.send("Process finished, no output")
@commands.command()
async def shutdown(self, ctx):
"""Shuts the bot down"""
fmt = "Shutting down, I will miss you {0.author.name}"
await ctx.send(fmt.format(ctx.message))
await ctx.bot.logout()
await ctx.bot.close()
@commands.command()
async def name(self, ctx, new_nick: str):
"""Changes the bot's name"""
await ctx.bot.user.edit(username=new_nick)
await ctx.send("Changed username to " + new_nick)
@commands.command()
async def status(self, ctx, *, status: str):
"""Changes the bot's 'playing' status"""
await ctx.bot.change_presence(activity=discord.Game(name=status, type=0))
await ctx.send("Just changed my status to '{}'!".format(status))
@commands.command()
async def load(self, ctx, *, module: str):
"""Loads a module"""
# Do this because I'm too lazy to type cogs.module
module = module.lower()
if not module.startswith("cogs"):
module = "cogs.{}".format(module)
# This try catch will catch errors such as syntax errors in the module we are loading
try:
ctx.bot.load_extension(module)
await ctx.send("I have just loaded the {} module".format(module))
except Exception as error:
fmt = "An error occurred while processing this request: ```py\n{}: {}\n```"
await ctx.send(fmt.format(type(error).__name__, error))
@commands.command()
async def unload(self, ctx, *, module: str):
"""Unloads a module"""
# Do this because I'm too lazy to type cogs.module
module = module.lower()
if not module.startswith("cogs"):
module = "cogs.{}".format(module)
ctx.bot.unload_extension(module)
await ctx.send("I have just unloaded the {} module".format(module))
@commands.command()
async def reload(self, ctx, *, module: str):
"""Reloads a module"""
# Do this because I'm too lazy to type cogs.module
module = module.lower()
if not module.startswith("cogs"):
module = "cogs.{}".format(module)
ctx.bot.unload_extension(module)
# This try block will catch errors such as syntax errors in the module we are loading
try:
ctx.bot.load_extension(module)
await ctx.send("I have just reloaded the {} module".format(module))
except Exception as error:
fmt = "An error occurred while processing this request: ```py\n{}: {}\n```"
await ctx.send(fmt.format(type(error).__name__, error))
def setup(bot):
bot.add_cog(Owner(bot))