Add a repl command
This commit is contained in:
parent
30df3be94a
commit
0806cdadb2
109
cogs/owner.py
109
cogs/owner.py
|
@ -9,6 +9,10 @@ import aiohttp
|
|||
import discord
|
||||
import inspect
|
||||
import pendulum
|
||||
import textwrap
|
||||
import traceback
|
||||
from contextlib import redirect_stdout
|
||||
import io
|
||||
|
||||
|
||||
class Owner:
|
||||
|
@ -16,6 +20,111 @@ class Owner:
|
|||
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self._last_result = None
|
||||
self.sessions = set()
|
||||
|
||||
def cleanup_code(self, content):
|
||||
"""Automatically removes code blocks from the code."""
|
||||
# remove ```py\n```
|
||||
if content.startswith('```') and content.endswith('```'):
|
||||
return '\n'.join(content.split('\n')[1:-1])
|
||||
|
||||
# remove `foo`
|
||||
return content.strip('` \n')
|
||||
|
||||
def get_syntax_error(self, e):
|
||||
return '```py\n{0.text}{1:>{0.offset}}\n{2}: {0}```'.format(e, '^', type(e).__name__)
|
||||
|
||||
@commands.command(hidden=True)
|
||||
@commands.check(utils.is_owner)
|
||||
async def repl(self, ctx):
|
||||
msg = ctx.message
|
||||
|
||||
variables = {
|
||||
'ctx': ctx,
|
||||
'bot': self.bot,
|
||||
'message': msg,
|
||||
'guild': msg.guild,
|
||||
'channel': msg.channel,
|
||||
'author': msg.author,
|
||||
'_': None,
|
||||
}
|
||||
|
||||
if msg.channel.id in self.sessions:
|
||||
await ctx.send('Already running a REPL session in this channel. Exit it with `quit`.')
|
||||
return
|
||||
|
||||
self.sessions.add(msg.channel.id)
|
||||
await ctx.send('Enter code to execute or evaluate. `exit()` or `quit` to exit.')
|
||||
|
||||
def check(m):
|
||||
return m.author.id == msg.author.id and \
|
||||
m.channel.id == msg.channel.id and \
|
||||
m.content.startswith('`')
|
||||
|
||||
while True:
|
||||
try:
|
||||
response = await self.bot.wait_for('message', check=check, timeout=10.0 * 60.0)
|
||||
except asyncio.TimeoutError:
|
||||
await ctx.send('Exiting REPL session.')
|
||||
self.sessions.remove(msg.channel.id)
|
||||
break
|
||||
|
||||
cleaned = self.cleanup_code(response.content)
|
||||
|
||||
if cleaned in ('quit', 'exit', 'exit()'):
|
||||
await ctx.send('Exiting.')
|
||||
self.sessions.remove(msg.channel.id)
|
||||
return
|
||||
|
||||
executor = exec
|
||||
if cleaned.count('\n') == 0:
|
||||
# single statement, potentially 'eval'
|
||||
try:
|
||||
code = compile(cleaned, '<repl session>', 'eval')
|
||||
except SyntaxError:
|
||||
pass
|
||||
else:
|
||||
executor = eval
|
||||
|
||||
if executor is exec:
|
||||
try:
|
||||
code = compile(cleaned, '<repl session>', 'exec')
|
||||
except SyntaxError as e:
|
||||
await ctx.send(self.get_syntax_error(e))
|
||||
continue
|
||||
|
||||
variables['message'] = response
|
||||
|
||||
fmt = None
|
||||
stdout = io.StringIO()
|
||||
|
||||
try:
|
||||
with redirect_stdout(stdout):
|
||||
result = executor(code, variables)
|
||||
if inspect.isawaitable(result):
|
||||
result = await result
|
||||
except Exception as e:
|
||||
value = stdout.getvalue()
|
||||
fmt = '```py\n{}{}\n```'.format(value, traceback.format_exc())
|
||||
else:
|
||||
value = stdout.getvalue()
|
||||
if result is not None:
|
||||
fmt = '```py\n{}{}\n```'.format(value, result)
|
||||
variables['_'] = result
|
||||
elif value:
|
||||
fmt = '```py\n{}\n```'.format(value)
|
||||
|
||||
try:
|
||||
if fmt is not None:
|
||||
if len(fmt) > 2000:
|
||||
await ctx.send('Content too big to be printed.')
|
||||
else:
|
||||
await ctx.send(fmt)
|
||||
except discord.Forbidden:
|
||||
pass
|
||||
except discord.HTTPException as e:
|
||||
await ctx.send('Unexpected error: `{}`'.format(e))
|
||||
|
||||
@commands.command()
|
||||
@commands.check(utils.is_owner)
|
||||
|
|
Loading…
Reference in a new issue