Merge branch 'rewrite' of https://github.com/Phxntxm/Bonfire into rewrite
This commit is contained in:
commit
94c5ef1873
|
@ -198,7 +198,8 @@ class Core:
|
|||
if bj_games:
|
||||
embed.add_field(name='Total blackjack games running', value=bj_games)
|
||||
|
||||
embed.add_field(name='Uptime', value=(pendulum.utcnow() - self.bot.uptime).in_words())
|
||||
if hasattr(self.bot, 'uptime'):
|
||||
embed.add_field(name='Uptime', value=(pendulum.utcnow() - self.bot.uptime).in_words())
|
||||
embed.set_footer(text=self.bot.description)
|
||||
|
||||
await ctx.send(embed=embed)
|
||||
|
@ -210,7 +211,10 @@ class Core:
|
|||
|
||||
EXAMPLE: !uptime
|
||||
RESULT: A BAJILLION DAYS"""
|
||||
await ctx.send("Uptime: ```\n{}```".format((pendulum.utcnow() - self.bot.uptime).in_words()))
|
||||
if hasattr(self.bot, 'uptime'):
|
||||
await ctx.send("Uptime: ```\n{}```".format((pendulum.utcnow() - self.bot.uptime).in_words()))
|
||||
else:
|
||||
await ctx.send("I've just restarted and not quite ready yet...gimme time I'm not a morning pony :c")
|
||||
|
||||
@commands.command(aliases=['invite'])
|
||||
@utils.custom_perms(send_messages=True)
|
||||
|
|
164
cogs/da.py
164
cogs/da.py
|
@ -1,164 +0,0 @@
|
|||
import aiohttp
|
||||
import asyncio
|
||||
import discord
|
||||
import traceback
|
||||
import logging
|
||||
|
||||
from discord.ext import commands
|
||||
|
||||
from . import utils
|
||||
|
||||
log = logging.getLogger()
|
||||
|
||||
|
||||
class Deviantart:
|
||||
def __init__(self, bot):
|
||||
self.base_url = "https://www.deviantart.com/api/v1/oauth2/gallery/all"
|
||||
self.bot = bot
|
||||
self.token = None
|
||||
self.params = None
|
||||
bot.loop.create_task(self.token_task())
|
||||
bot.loop.create_task(self.post_task())
|
||||
|
||||
async def token_task(self):
|
||||
while True:
|
||||
expires_in = await self.get_token()
|
||||
await asyncio.sleep(expires_in)
|
||||
|
||||
async def post_task(self):
|
||||
await asyncio.sleep(5)
|
||||
# Lets start the task a few seconds after, to ensure our token gets set
|
||||
while True:
|
||||
await self.check_posts()
|
||||
await asyncio.sleep(300)
|
||||
|
||||
async def get_token(self):
|
||||
# We need a token to create requests, it doesn't seem this token goes away
|
||||
# To get this token, we need to make a request and retrieve that
|
||||
url = 'https://www.deviantart.com/oauth2/token'
|
||||
params = {'client_id': utils.da_id,
|
||||
'client_secret': utils.da_secret,
|
||||
'grant_type': 'client_credentials'}
|
||||
|
||||
data = await utils.request(url, payload=params)
|
||||
|
||||
self.token = data.get('access_token', None)
|
||||
self.params = {'access_token': self.token}
|
||||
# Make sure we refresh our token, based on when they tell us it expires
|
||||
# Ensure we call it a few seconds earlier, to give us enough time to set the new token
|
||||
# If there was an issue, lets call this in a minute again
|
||||
return data.get('expires_in', 65) - 5
|
||||
|
||||
async def check_posts(self):
|
||||
content = await utils.get_content('deviantart')
|
||||
# People might sub to the same person, so lets cache every person and their last update
|
||||
cache = {}
|
||||
|
||||
if not content:
|
||||
return
|
||||
|
||||
try:
|
||||
for entry in content:
|
||||
user = discord.utils.get(self.bot.get_all_members(), id=entry['member_id'])
|
||||
# If the bot is not in the server with the member, we might not be able to find this user.
|
||||
if user is None:
|
||||
continue
|
||||
|
||||
params = self.params.copy()
|
||||
# Now loop through the subscriptions
|
||||
for da_name in entry['subbed']:
|
||||
# Check what the last updated content we sent to this user was
|
||||
# Since we cannot go back in time, if this doesn't match the last uploaded from the user
|
||||
# Assume we need to notify the user of this post
|
||||
last_updated_id = entry['last_updated'].get(da_name, None)
|
||||
# Check if this user has been requested already, if so we don't need to make another request
|
||||
result = cache.get(da_name, None)
|
||||
if result is None:
|
||||
params['username'] = da_name
|
||||
data = await utils.request(self.base_url, payload=params)
|
||||
if data is None:
|
||||
continue
|
||||
elif not data['results']:
|
||||
continue
|
||||
|
||||
result = data['results'][0]
|
||||
cache[da_name] = result
|
||||
|
||||
# This means that our last update to this user, for this author, is not the same
|
||||
if last_updated_id != result['deviationid']:
|
||||
# First lets check if the last updated ID was None, if so...then we haven't alerted them yet
|
||||
# We don't want to alert them in this case
|
||||
# We just want to act like the artist's most recent update was the last notified
|
||||
# So just notify the user if this is not None
|
||||
if last_updated_id is not None:
|
||||
fmt = "There has been a new post by an artist you are subscribed to!\n\n" \
|
||||
"**Title:** {}\n**User:** {}\n**URL:** {}".format(
|
||||
result['title'],
|
||||
result['author']['username'],
|
||||
result['url'])
|
||||
await user.send(fmt)
|
||||
# Now we can update the user's last updated for this DA
|
||||
# We want to do this whether or not our last if statement was met
|
||||
update = {'last_updated': {da_name: result['deviationid']}}
|
||||
await utils.update_content('deviantart', update, str(user.id))
|
||||
except Exception as e:
|
||||
tb = traceback.format_exc()
|
||||
fmt = "{1}\n{0.__class__.__name__}: {0}".format(tb, e)
|
||||
log.error(fmt)
|
||||
|
||||
@commands.group()
|
||||
@utils.custom_perms(send_messages=True)
|
||||
async def da(self, ctx):
|
||||
"""This provides a sort of 'RSS' feed for subscribed to artists.
|
||||
Subscribe to artists, and I will PM you when new posts come out from these artists"""
|
||||
pass
|
||||
|
||||
@da.command(name='sub', aliases=['add', 'subscribe'])
|
||||
@utils.custom_perms(send_messages=True)
|
||||
async def da_sub(self, ctx, *, username):
|
||||
"""This can be used to add a feed to your notifications.
|
||||
Provide a username, and when posts are made from this user, you will be notified
|
||||
|
||||
EXAMPLE: !da sub MyFavoriteArtistEva<3
|
||||
RESULT: Notifications of amazing pics c:"""
|
||||
key = str(ctx.message.author.id)
|
||||
content = await utils.get_content('deviantart', key)
|
||||
# TODO: Ensure the user provided is a real user
|
||||
|
||||
if content is None:
|
||||
entry = {'member_id': str(ctx.message.author.id), 'subbed': [username], 'last_updated': {}}
|
||||
await utils.add_content('deviantart', entry)
|
||||
await ctx.send("You have just subscribed to {}!".format(username))
|
||||
elif content['subbed'] is None or username not in content['subbed']:
|
||||
if content['subbed'] is None:
|
||||
sub_list = [username]
|
||||
else:
|
||||
content['subbed'].append(username)
|
||||
sub_list = content['subbed']
|
||||
await utils.update_content('deviantart', {'subbed': sub_list}, key)
|
||||
await ctx.send("You have just subscribed to {}!".format(username))
|
||||
else:
|
||||
await ctx.send("You are already subscribed to that user!")
|
||||
|
||||
@da.command(pass_context=True, name='unsub', aliases=['delete', 'remove', 'unsubscribe'])
|
||||
@utils.custom_perms(send_messages=True)
|
||||
async def da_unsub(self, ctx, *, username):
|
||||
"""This command can be used to unsub from the specified user
|
||||
|
||||
EXAMPLE: !da unsub TheArtistWhoBetrayedMe
|
||||
RESULT: No more pics from that terrible person!"""
|
||||
key = (ctx.message.author.id)
|
||||
content = await utils.get_content('deviantart', key)
|
||||
|
||||
if content is None or content['subbed'] is None:
|
||||
await ctx.send("You are not subscribed to anyone at the moment!")
|
||||
elif username in content['subbed']:
|
||||
content['subbed'].remove(username)
|
||||
await utils.update_content('deviantart', {'subbed': content['subbed']}, key)
|
||||
await ctx.send("You have just unsubscribed from {}!".format(username))
|
||||
else:
|
||||
await ctx.send("You are not subscribed to that user!")
|
||||
|
||||
|
||||
def setup(bot):
|
||||
bot.add_cog(Deviantart(bot))
|
|
@ -59,7 +59,7 @@ class StatsUpdate:
|
|||
try:
|
||||
join_leave_on = server_settings['join_leave']
|
||||
if join_leave_on:
|
||||
channel_id = server_settings['notification_channel'] or member.guild.id
|
||||
channel_id = server_settings.get('notification_channel') or member.guild.id
|
||||
else:
|
||||
return
|
||||
except (IndexError, TypeError, KeyError):
|
||||
|
@ -75,7 +75,7 @@ class StatsUpdate:
|
|||
try:
|
||||
join_leave_on = server_settings['join_leave']
|
||||
if join_leave_on:
|
||||
channel_id = server_settings['notification_channel'] or member.guild.id
|
||||
channel_id = server_settings.get('notification_channel') or member.guild.id
|
||||
else:
|
||||
return
|
||||
except (IndexError, TypeError, KeyError):
|
||||
|
|
292
cogs/osu.py
292
cogs/osu.py
|
@ -3,6 +3,8 @@ from . import utils
|
|||
from discord.ext import commands
|
||||
import discord
|
||||
|
||||
from osuapi import OsuApi, AHConnector
|
||||
|
||||
# https://github.com/ppy/osu-api/wiki
|
||||
BASE_URL = 'https://osu.ppy.sh/api/'
|
||||
MAX_RETRIES = 5
|
||||
|
@ -11,157 +13,171 @@ MAX_RETRIES = 5
|
|||
class Osu:
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.key = utils.osu_key
|
||||
self.payload = {'k': self.key}
|
||||
self.api = OsuApi(utils.osu_key, connector=AHConnector())
|
||||
self.bot.loop.create_task(self.get_users())
|
||||
self.osu_users = {}
|
||||
|
||||
async def find_beatmap(self, query):
|
||||
"""Finds a beatmap ID based on the first match of searching a beatmap"""
|
||||
pass
|
||||
async def get_user(self, member, username):
|
||||
"""A function used to get and save user data in cache"""
|
||||
user = self.osu_users.get(member.id)
|
||||
if user is None:
|
||||
user = await self.get_user_from_api(username)
|
||||
if user is not None:
|
||||
self.osu_users[member.id] = user
|
||||
return user
|
||||
else:
|
||||
if user.username.lower() == username.lower():
|
||||
return user
|
||||
else:
|
||||
user = await self.get_user_from_api(username)
|
||||
if user is not None:
|
||||
self.osu_users[member.id] = user
|
||||
return user
|
||||
|
||||
async def get_beatmap(self, b_id):
|
||||
"""Gets beatmap info based on the ID provided"""
|
||||
payload = self.payload.copy()
|
||||
payload['b'] = b_id
|
||||
url = BASE_URL + 'get_beatmaps'
|
||||
data = await utils.request(url, payload=payload)
|
||||
async def get_user_from_api(self, username):
|
||||
"""A simple helper function to parse the list given and handle failures"""
|
||||
user = await self.api.get_user(username)
|
||||
try:
|
||||
return data[0]
|
||||
except (IndexError, TypeError):
|
||||
return user[0]
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
async def get_users(self):
|
||||
"""A task used to 'cache' all member's and their Osu profile's"""
|
||||
data = await utils.get_content('osu')
|
||||
if data is None:
|
||||
return
|
||||
|
||||
for result in data:
|
||||
member = int(result['member_id'])
|
||||
user = await self.get_user_from_api(result['osu_username'])
|
||||
if user:
|
||||
self.osu_users[member] = user
|
||||
|
||||
@commands.group(invoke_without_command=True)
|
||||
@utils.custom_perms(send_messages=True)
|
||||
async def osu(self, ctx):
|
||||
pass
|
||||
async def osu(self, ctx, member: discord.Member=None):
|
||||
"""Provides basic information about a specific user
|
||||
|
||||
@osu.command(name='scores', aliases=['score'])
|
||||
@utils.custom_perms(send_messages=True)
|
||||
async def osu_user_scores(self, ctx, user, song=1):
|
||||
"""Used to get the top scores for a user
|
||||
You can provide either your Osu ID or your username
|
||||
However, if your username is only numbers, this will confuse the API
|
||||
If you have only numbers in your username, you will need to provide your ID
|
||||
This will by default return the top song, provide song [up to 100] to get that song, in order from best to worst
|
||||
EXAMPLE: !osu @Person
|
||||
RESULT: Informationa bout that person's osu account"""
|
||||
if member is None:
|
||||
member = ctx.message.author
|
||||
|
||||
EXAMPLE: !osu MyUsername 5
|
||||
RESULT: Info about your 5th best song"""
|
||||
|
||||
await ctx.send("Looking up your Osu information...")
|
||||
# To make this easy for the user, it's indexed starting at 1, so lets subtract by 1
|
||||
song -= 1
|
||||
# Make sure the song is not negative however, if so set to 0
|
||||
if song < 0:
|
||||
song = 0
|
||||
|
||||
# A list of the possible values we'll receive, that we want to display
|
||||
wanted_info = ['username', 'maxcombo', 'count300', 'count100', 'count50', 'countmiss',
|
||||
'perfect', 'enabled_mods', 'date', 'rank' 'pp', 'beatmap_title', 'beatmap_version',
|
||||
'max_combo', 'artist', 'difficulty']
|
||||
|
||||
# A couple of these aren't the best names to display, so setup a map to change these just a little bit
|
||||
key_map = {'maxcombo': 'combo',
|
||||
'count300': '300 hits',
|
||||
'count100': '100 hits',
|
||||
'count50': '50 hits',
|
||||
'countmiss': 'misses',
|
||||
'perfect': 'got_max_combo'}
|
||||
|
||||
payload = self.payload.copy()
|
||||
payload['u'] = user
|
||||
payload['limit'] = 100
|
||||
# The endpoint that we're accessing to get this information
|
||||
url = BASE_URL + 'get_user_beat'
|
||||
data = await utils.request(url, payload=payload)
|
||||
|
||||
try:
|
||||
data = data[song]
|
||||
except (IndexError, TypeError):
|
||||
if data is not None and len(data) == 0:
|
||||
await ctx.send("I could not find any top songs for the user {}".format(user))
|
||||
return
|
||||
else:
|
||||
data = data[len(data) - 1]
|
||||
|
||||
# There's a little bit more info that we need, some info specific to the beatmap.
|
||||
# Due to this we'll need to make a second request
|
||||
beatmap_data = await self.get_beatmap(data.get('beatmap_id', None))
|
||||
|
||||
# Lets add the extra data we want
|
||||
data['beatmap_title'] = beatmap_data.get('title')
|
||||
data['beatmap_version'] = beatmap_data.get('version')
|
||||
data['max_combo'] = beatmap_data.get('max_combo')
|
||||
data['artist'] = beatmap_data.get('artist')
|
||||
# Lets round this, no need for such a long number
|
||||
data['difficulty'] = round(float(beatmap_data.get('difficultyrating')), 2)
|
||||
|
||||
# Now lets create our dictionary needed to create the image
|
||||
# The dict comprehension we're using is simpler than it looks, it's simply they key: value
|
||||
# If the key is in our wanted_info list
|
||||
# We also get the wanted value from the key_map if it exists, using the key itself if it doesn't
|
||||
# We then title it and replace _ with a space to ensure nice formatting
|
||||
fmt = [(key_map.get(k, k).title().replace('_', ' '), v) for k, v in data.items() if k in wanted_info]
|
||||
|
||||
# Attempt to create our banner and upload that
|
||||
# If we can't find the images needed, or don't have permissions, just send a message instead
|
||||
try:
|
||||
banner = await utils.create_banner(ctx.message.author, "Osu User Stats", fmt)
|
||||
await self.bot.upload(banner)
|
||||
except (FileNotFoundError, discord.Forbidden):
|
||||
_fmt = "\n".join("{}: {}".format(k, r) for k, r in fmt)
|
||||
await ctx.send("```\n{}```".format(_fmt))
|
||||
|
||||
@osu.command(name='user', pass_context=True)
|
||||
@utils.custom_perms(send_messages=True)
|
||||
async def osu_user_info(self, ctx, *, user):
|
||||
"""Used to get information about a specific user
|
||||
You can provide either your Osu ID or your username
|
||||
However, if your username is only numbers, this will confuse the API
|
||||
If you have only numbers in your username, you will need to provide your ID
|
||||
|
||||
EXAMPLE: !osu user MyUserName
|
||||
RESULT: Info about your user"""
|
||||
|
||||
await ctx.send("Looking up your Osu information...")
|
||||
# A list of the possible values we'll receive, that we want to display
|
||||
wanted_info = ['username', 'playcount', 'ranked_score', 'pp_rank', 'level', 'pp_country_rank',
|
||||
'accuracy', 'country', 'pp_country_rank', 'count_rank_s', 'count_rank_a']
|
||||
|
||||
# A couple of these aren't the best names to display, so setup a map to change these just a little bit
|
||||
key_map = {'playcount': 'play_count',
|
||||
'count_rank_ss': 'total_SS_ranks',
|
||||
'count_rank_s': 'total_s_ranks',
|
||||
'count_rank_a': 'total_a_ranks'}
|
||||
|
||||
# The paramaters that we'll send to osu to get the information needed
|
||||
payload = self.payload.copy()
|
||||
payload['u'] = user
|
||||
# The endpoint that we're accessing to get this information
|
||||
url = BASE_URL + 'get_user'
|
||||
data = await utils.request(url, payload=payload)
|
||||
|
||||
# Make sure we found a result, we should only find one with the way we're searching
|
||||
try:
|
||||
data = data[0]
|
||||
except (IndexError, TypeError):
|
||||
await ctx.send("I could not find anyone with the user name/id of {}".format(user))
|
||||
user = self.osu_users[member.id]
|
||||
if user is None:
|
||||
await ctx.send("I do not have {}'s Osu user saved!".format(member.display_name))
|
||||
return
|
||||
|
||||
# Now lets create our dictionary needed to create the image
|
||||
# The dict comprehension we're using is simpler than it looks, it's simply they key: value
|
||||
# If the key is in our wanted_info list
|
||||
# We also get the wanted value from the key_map if it exists, using the key itself if it doesn't
|
||||
# We then title it and replace _ with a space to ensure nice formatting
|
||||
fmt = {key_map.get(k, k).title().replace('_', ' '): v for k, v in data.items() if k in wanted_info}
|
||||
e = discord.Embed(title='Osu profile for {}'.format(user.username))
|
||||
e.add_field(name='Rank', value="{:,}".format(user.pp_rank))
|
||||
e.add_field(name='Level', value=user.level)
|
||||
e.add_field(name='Performance Points', value="{:,}".format(user.pp_raw))
|
||||
e.add_field(name='Accuracy', value="{:.2%}".format(user.accuracy))
|
||||
e.add_field(name='SS Ranks', value="{:,}".format(user.count_rank_ss))
|
||||
e.add_field(name='S Ranks', value="{:,}".format(user.count_rank_s))
|
||||
e.add_field(name='A Ranks', value="{:,}".format(user.count_rank_a))
|
||||
e.add_field(name='Country', value=user.country)
|
||||
e.add_field(name='Country Rank', value="{:,}".format(user.pp_country_rank))
|
||||
e.add_field(name='Playcount', value="{:,}".format(user.playcount))
|
||||
e.add_field(name='Ranked Score', value="{:,}".format(user.ranked_score))
|
||||
e.add_field(name='Total Score', value="{:,}".format(user.total_score))
|
||||
|
||||
# Attempt to create our banner and upload that
|
||||
# If we can't find the images needed, or don't have permissions, just send a message instead
|
||||
await ctx.send(embed=e)
|
||||
|
||||
@osu.command(name='add', aliases=['create', 'connect'])
|
||||
@utils.custom_perms(send_messages=True)
|
||||
async def osu_add(self, ctx, *, username):
|
||||
"""Links an osu account to your discord account
|
||||
|
||||
EXAMPLE: !osu add username
|
||||
RESULT: Links your username to your account, and allows stats to be pulled from it"""
|
||||
author = ctx.message.author
|
||||
user = await self.get_user(author, username)
|
||||
if user is None:
|
||||
await ctx.send("I couldn't find an osu user that matches {}".format(username))
|
||||
return
|
||||
|
||||
entry = {
|
||||
'member_id': str(author.id),
|
||||
'osu_username': user.username
|
||||
}
|
||||
|
||||
if not await utils.add_content('osu', entry):
|
||||
await utils.update_content('osu', entry, str(author.id))
|
||||
|
||||
await ctx.send("I have just saved your Osu user {}".format(author.display_name))
|
||||
|
||||
@osu.command(name='score', aliases=['scores'])
|
||||
@utils.custom_perms(send_messages=True)
|
||||
async def osu_scores(self, ctx, *data):
|
||||
"""Find the top x osu scores for a provided member
|
||||
Note: You can only get the top 50 songs for a user
|
||||
|
||||
EXAMPLE: !osu scores @Person 5
|
||||
RESULT: The top 5 maps for the user @Person"""
|
||||
|
||||
# Set the defaults before we go through our passed data to figure out what we want
|
||||
limit = 5
|
||||
member = ctx.message.author
|
||||
# Only loop through the first 2....we care about nothing after that
|
||||
for piece in data[:2]:
|
||||
# First lets try to convert to an int, for limit
|
||||
try:
|
||||
limit = int(piece)
|
||||
# Since Osu's API returns no information about the beatmap on scores
|
||||
# We also need to call it to get the beatmap...in order to not get rate limited
|
||||
# Lets limit this to 50
|
||||
if limit > 50:
|
||||
limit = 50
|
||||
elif limit < 1:
|
||||
limit = 5
|
||||
except:
|
||||
converter = commands.converter.MemberConverter()
|
||||
converter.prepare(ctx, piece)
|
||||
try:
|
||||
member = converter.convert()
|
||||
except commands.converter.BadArgument:
|
||||
pass
|
||||
|
||||
user = self.osu_users.get(member.id)
|
||||
if user is None:
|
||||
await ctx.send("I don't have that user's Osu account saved!")
|
||||
return
|
||||
|
||||
scores = await self.api.get_user_best(user.username, limit=limit)
|
||||
entries = []
|
||||
for i in scores:
|
||||
m = await self.api.get_beatmaps(beatmap_id=i.beatmap_id)
|
||||
m = m[0]
|
||||
entry = {
|
||||
'title': 'Top {} Osu scores for {}'.format(limit, member.display_name),
|
||||
'fields': [
|
||||
{'name': 'Artist', 'value': m.artist},
|
||||
{'name': 'Title', 'value': m.title},
|
||||
{'name': 'Creator', 'value': m.creator},
|
||||
{'name': 'CS (Circle Size)', 'value': m.diff_size},
|
||||
{'name': 'AR (Approach Rate)', 'value': m.diff_approach},
|
||||
{'name': 'HP (Health Drain)', 'value': m.diff_drain},
|
||||
{'name': 'OD (Overall Difficulty)', 'value': m.diff_overall},
|
||||
{'name': 'Length', 'value': m.total_length},
|
||||
{'name': 'Score', 'value': i.score},
|
||||
{'name': 'Max Combo', 'value': i.maxcombo},
|
||||
{'name': 'Hits', 'value': "{}/{}/{}/{} (300/100/50/misses)".format(i.count300, i.count100, i.count50, i.countmiss), "inline": False},
|
||||
{'name': 'Perfect', 'value': "Yes" if i.perfect else "No"},
|
||||
{'name': 'Rank', 'value': i.rank},
|
||||
{'name': 'PP', 'value': i.pp},
|
||||
{'name': 'Mods', 'value': str(i.enabled_mods)},
|
||||
{'name': 'Date', 'value': str(i.date)}
|
||||
]
|
||||
}
|
||||
|
||||
entries.append(entry)
|
||||
try:
|
||||
banner = await utils.create_banner(ctx.message.author, "Osu User Stats", fmt)
|
||||
await ctx.send(file=banner)
|
||||
except (FileNotFoundError, discord.Forbidden):
|
||||
_fmt = "\n".join("{}: {}".format(k, r) for k, r in fmt.items())
|
||||
await ctx.send("```\n{}```".format(_fmt))
|
||||
|
||||
pages = utils.DetailedPages(self.bot, message=ctx.message, entries=entries)
|
||||
await pages.paginate()
|
||||
except utils.CannotPaginate as e:
|
||||
await ctx.send(str(e))
|
||||
|
||||
def setup(bot):
|
||||
bot.add_cog(Osu(bot))
|
||||
|
|
156
cogs/picarto.py
156
cogs/picarto.py
|
@ -17,95 +17,73 @@ BASE_URL = 'https://ptvappapi.picarto.tv'
|
|||
api_key = '03e26294-b793-11e5-9a41-005056984bd4'
|
||||
|
||||
|
||||
async def online_users():
|
||||
try:
|
||||
# Someone from picarto contacted me and told me their database queries are odd
|
||||
# It is more efficent on their end to make a query for all online users, and base checks off that
|
||||
# In place of requesting for /channel and checking if that is online currently, for each channel
|
||||
# This method is in place to just return all online_users
|
||||
url = BASE_URL + '/online/all'
|
||||
payload = {'key': api_key}
|
||||
return await utils.request(url, payload=payload)
|
||||
except:
|
||||
return {}
|
||||
|
||||
|
||||
def check_online(online_channels, channel):
|
||||
# online_channels is the dictionary of all users online currently
|
||||
# And channel is the name we are checking against that
|
||||
# This creates a list of all users that match this channel name (should only ever be 1)
|
||||
# And returns True as long as it is more than 0
|
||||
matches = [stream for stream in online_channels if stream['channel_name'].lower() == channel.lower()]
|
||||
return len(matches) > 0
|
||||
|
||||
|
||||
class Picarto:
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
|
||||
async def get_online_users(self):
|
||||
# This method is in place to just return all online users so we can compare against it
|
||||
url = BASE_URL + '/online/all'
|
||||
payload = {'key': api_key}
|
||||
self.online_channels = await utils.request(url, payload=payload)
|
||||
|
||||
def channel_online(self, channel):
|
||||
# Channel is the name we are checking against that
|
||||
# This creates a list of all users that match this channel name (should only ever be 1)
|
||||
# And returns True as long as it is more than 0
|
||||
channel = re.search("(?<=picarto.tv/)(.*)", channel).group(1)
|
||||
matches = [stream for stream in self.online_channels if stream['channel_name'].lower() == channel.lower()]
|
||||
return len(matches) > 0
|
||||
|
||||
async def check_channels(self):
|
||||
await self.bot.wait_until_ready()
|
||||
# This is a loop that runs every 30 seconds, checking if anyone has gone online
|
||||
try:
|
||||
while not self.bot.is_closed:
|
||||
r_filter = {'notifications_on': 1}
|
||||
picarto = await utils.filter_content('picarto', r_filter)
|
||||
# Get all online users before looping, so that only one request is needed
|
||||
online_users_list = await online_users()
|
||||
old_online_users = {data['member_id']: data for data in picarto if data['live']}
|
||||
old_offline_users = {data['member_id']: data for data in picarto if not data['live']}
|
||||
|
||||
for m_id, result in old_offline_users.items():
|
||||
# Get their url and their user based on that url
|
||||
url = result['picarto_url']
|
||||
user = re.search("(?<=picarto.tv/)(.*)", url).group(1)
|
||||
# Check if they are online right now
|
||||
if check_online(online_users_list, user):
|
||||
for guild_id in result['servers']:
|
||||
# Get the channel to send the message to, based on the saved alert's channel
|
||||
guild = self.bot.get_guild(guild_id)
|
||||
if guild is None:
|
||||
while not self.bot.is_closed():
|
||||
await self.get_online_users()
|
||||
picarto = await utils.filter_content('picarto', {'notifications_on': 1})
|
||||
for data in picarto:
|
||||
m_id = int(data['member_id'])
|
||||
url = data['picarto_url']
|
||||
# Check if they are online
|
||||
online = self.channel_online(url)
|
||||
# If they're currently online, but saved as not then we'll send our notification
|
||||
if online and data['live'] == 0:
|
||||
for s_id in data['servers']:
|
||||
server = self.bot.get_guild(int(s_id))
|
||||
if server is None:
|
||||
continue
|
||||
guild_alerts = await utils.get_content('server_alerts', {'server_id': guild_id})
|
||||
try:
|
||||
channel_id = guild_alerts['channel_id']
|
||||
except (IndexError, TypeError):
|
||||
channel_id = guild_id
|
||||
channel = self.bot.get_channel(channel_id)
|
||||
# Get the member that has just gone live
|
||||
member = guild.get_member(m_id)
|
||||
member = server.get_member(m_id)
|
||||
if member is None:
|
||||
continue
|
||||
|
||||
fmt = "{} has just gone live! View their stream at {}".format(member.display_name, url)
|
||||
await channel.send(fmt)
|
||||
await utils.update_content('picarto', {'live': 1}, {'member_id': m_id})
|
||||
for m_id, result in old_online_users.items():
|
||||
# Get their url and their user based on that url
|
||||
url = result['picarto_url']
|
||||
user = re.search("(?<=picarto.tv/)(.*)", url).group(1)
|
||||
# Check if they are online right now
|
||||
if not check_online(online_users_list, user):
|
||||
for guild_id in result['servers']:
|
||||
# Get the channel to send the message to, based on the saved alert's channel
|
||||
guild = self.bot.get_guild(guild_id)
|
||||
if guild is None:
|
||||
server_settings = await utils.get_content('server_settings', s_id)
|
||||
if server_settings is not None:
|
||||
channel_id = int(server_settings.get('notification_channel', s_id))
|
||||
else:
|
||||
channel_id = int(s_id)
|
||||
channel = server.get_channel(channel_id)
|
||||
await channel.send("{} has just gone live! View their stream at <{}>".format(member.display_name, data['picarto_url']))
|
||||
self.bot.loop.create_task(utils.update_content('picarto', {'live': 1}, str(m_id)))
|
||||
elif not online and data['live'] == 1:
|
||||
for s_id in data['servers']:
|
||||
server = self.bot.get_guild(int(s_id))
|
||||
if server is None:
|
||||
continue
|
||||
guild_alerts = await utils.get_content('server_alerts', {'server_id': guild_id})
|
||||
try:
|
||||
channel_id = guild_alerts['channel_id']
|
||||
except (IndexError, TypeError):
|
||||
channel_id = guild_id
|
||||
channel = self.bot.get_channel(channel_id)
|
||||
# Get the member that has just gone live
|
||||
member = guild.get_member(m_id)
|
||||
member = server.get_member(m_id)
|
||||
if member is None:
|
||||
continue
|
||||
|
||||
fmt = "{} has just gone offline! Catch them next time they stream at {}".format(
|
||||
member.display_name, url)
|
||||
await channel.send(fmt)
|
||||
await utils.update_content('picarto', {'live': 0}, {'member_id': m_id})
|
||||
server_settings = await utils.get_content('server_settings', s_id)
|
||||
if server_settings is not None:
|
||||
channel_id = int(server_settings.get('notification_channel', s_id))
|
||||
else:
|
||||
channel_id = int(s_id)
|
||||
channel = server.get_channel(channel_id)
|
||||
await channel.send("{} has just gone offline! View their stream next time at <{}>".format(member.display_name, data['picarto_url']))
|
||||
self.bot.loop.create_task(utils.update_content('picarto', {'live': 0}, str(m_id)))
|
||||
await asyncio.sleep(30)
|
||||
except Exception as e:
|
||||
tb = traceback.format_exc()
|
||||
|
@ -119,6 +97,8 @@ class Picarto:
|
|||
|
||||
EXAMPLE: !picarto @otherPerson
|
||||
RESULT: Info about their picarto stream"""
|
||||
await ctx.message.channel.trigger_typing()
|
||||
|
||||
# If member is not given, base information on the author
|
||||
member = member or ctx.message.author
|
||||
picarto_entry = await utils.get_content('picarto', str(member.id))
|
||||
|
@ -134,22 +114,31 @@ class Picarto:
|
|||
payload = {'key': api_key}
|
||||
|
||||
data = await utils.request(url, payload=payload)
|
||||
if data is None:
|
||||
await ctx.send("I couldn't connect to Picarto!")
|
||||
return
|
||||
|
||||
# Not everyone has all these settings, so use this as a way to print information if it does, otherwise ignore it
|
||||
things_to_print = ['channel', 'commissions_enabled', 'is_nsfw', 'program', 'tablet', 'followers',
|
||||
'content_type']
|
||||
# Using title and replace to provide a nice way to print the data
|
||||
fmt = "\n".join(
|
||||
"{}: {}".format(i.title().replace("_", " "), result) for i, result in data.items() if i in things_to_print)
|
||||
|
||||
embed = discord.Embed(title='{}\'s Picarto'.format(data['channel']), url=url)
|
||||
if data['avatar_url']:
|
||||
embed.set_thumbnail(url=data['avatar_url'])
|
||||
|
||||
for i, result in data.items():
|
||||
if i in things_to_print and str(result):
|
||||
i = i.title().replace('_', ' ')
|
||||
embed.add_field(name=i, value=str(result))
|
||||
|
||||
# Social URL's can be given if a user wants them to show
|
||||
# Print them if they exist, otherwise don't try to include them
|
||||
social_links = data.get('social_urls')
|
||||
if social_links:
|
||||
fmt2 = "\n".join(
|
||||
"\t{}: {}".format(i.title().replace("_", " "), result) for i, result in social_links.items())
|
||||
fmt = "{}\nSocial Links:\n{}".format(fmt, fmt2)
|
||||
await ctx.send("Picarto stats for {}: ```\n{}```".format(member.display_name, fmt))
|
||||
|
||||
for i, result in data['social_urls'].items():
|
||||
embed.add_field(name=i.title(), value=result)
|
||||
|
||||
await ctx.send(embed=embed)
|
||||
|
||||
@picarto.command(name='add', no_pm=True)
|
||||
@utils.custom_perms(send_messages=True)
|
||||
|
@ -158,13 +147,15 @@ class Picarto:
|
|||
|
||||
EXAMPLE: !picarto add MyUsername
|
||||
RESULT: Your picarto stream is saved, and notifications should go to this guild"""
|
||||
await ctx.message.channel.trigger_typing()
|
||||
|
||||
# This uses a lookbehind to check if picarto.tv exists in the url given
|
||||
# If it does, it matches picarto.tv/user and sets the url as that
|
||||
# Then (in the else) add https://www. to that
|
||||
# Otherwise if it doesn't match, we'll hit an AttributeError due to .group(0)
|
||||
# This means that the url was just given as a user (or something complete invalid)
|
||||
# So set URL as https://www.picarto.tv/[url]
|
||||
# Even if this was invalid such as https://www.picarto.tv/twitch.tv/user
|
||||
# Even if this was invalid such as https://www.picarto.tv/picarto.tv/user
|
||||
# For example, our next check handles that
|
||||
try:
|
||||
url = re.search("((?<=://)?picarto.tv/)+(.*)", url).group(0)
|
||||
|
@ -173,9 +164,10 @@ class Picarto:
|
|||
else:
|
||||
url = "https://www.{}".format(url)
|
||||
channel = re.search("https://www.picarto.tv/(.*)", url).group(1)
|
||||
url = BASE_URL + '/channel/{}'.format(channel)
|
||||
api_url = BASE_URL + '/channel/{}'.format(channel)
|
||||
payload = {'key': api_key}
|
||||
data = await utils.request(url, payload=payload)
|
||||
|
||||
data = await utils.request(api_url, payload=payload)
|
||||
if not data:
|
||||
await ctx.send("That Picarto user does not exist! What would be the point of adding a nonexistant Picarto "
|
||||
"user? Silly")
|
||||
|
|
|
@ -23,8 +23,9 @@ class Twitch:
|
|||
self.key = utils.twitch_key
|
||||
self.params = {'client_id': self.key}
|
||||
|
||||
async def channel_online(self, channel: str):
|
||||
async def channel_online(self, twitch_url: str):
|
||||
# Check a specific channel's data, and get the response in text format
|
||||
channel = re.search("(?<=twitch.tv/)(.*)", twitch_url).group(1)
|
||||
url = "https://api.twitch.tv/kraken/streams/{}".format(channel)
|
||||
|
||||
response = await utils.request(url, payload=self.params)
|
||||
|
@ -42,60 +43,46 @@ class Twitch:
|
|||
await self.bot.wait_until_ready()
|
||||
# Loop through as long as the bot is connected
|
||||
try:
|
||||
while not self.bot.is_closed:
|
||||
while not self.bot.is_closed():
|
||||
twitch = await utils.filter_content('twitch', {'notifications_on': 1})
|
||||
# Online/offline is based on whether they are set to such, in the utils file
|
||||
# This means they were detected as online/offline before and we check for a change
|
||||
online_users = {data['member_id']: data for data in twitch if data['live']}
|
||||
offline_users = {data['member_id']: data for data in twitch if not data['live']}
|
||||
for m_id, result in offline_users.items():
|
||||
# Get their url and their user based on that url
|
||||
url = result['twitch_url']
|
||||
user = re.search("(?<=twitch.tv/)(.*)", url).group(1)
|
||||
# Check if they are online right now
|
||||
if await self.channel_online(user):
|
||||
for server_id in result['servers']:
|
||||
# Get the channel to send the message to, based on the saved alert's channel
|
||||
server = self.bot.get_server(server_id)
|
||||
for data in twitch:
|
||||
m_id = int(data['member_id'])
|
||||
url = data['twitch_url']
|
||||
# Check if they are online
|
||||
online = await self.channel_online(url)
|
||||
# If they're currently online, but saved as not then we'll send our notification
|
||||
if online and data['live'] == 0:
|
||||
for s_id in data['servers']:
|
||||
server = self.bot.get_guild(int(s_id))
|
||||
if server is None:
|
||||
continue
|
||||
server_settings = await utils.get_content('server_settings', server_id)
|
||||
try:
|
||||
channel_id = server_settings['notification_channel']
|
||||
except (IndexError, TypeError):
|
||||
channel_id = server_id
|
||||
channel = self.bot.get_channel(channel_id)
|
||||
# Get the member that has just gone live
|
||||
member = server.get_member(m_id)
|
||||
if member is None:
|
||||
continue
|
||||
|
||||
fmt = "{} has just gone live! View their stream at {}".format(member.display_name, url)
|
||||
await channel.send(fmt)
|
||||
await utils.update_content('twitch', {'live': 1}, {'member_id': m_id})
|
||||
for m_id, result in online_users.items():
|
||||
# Get their url and their user based on that url
|
||||
url = result['twitch_url']
|
||||
user = re.search("(?<=twitch.tv/)(.*)", url).group(1)
|
||||
# Check if they are online right now
|
||||
if not await self.channel_online(user):
|
||||
for server_id in result['servers']:
|
||||
# Get the channel to send the message to, based on the saved alert's channel
|
||||
server = self.bot.get_server(server_id)
|
||||
server_settings = await utils.get_content('server_settings', s_id)
|
||||
if server_settings is not None:
|
||||
channel_id = int(server_settings.get('notification_channel', s_id))
|
||||
else:
|
||||
channel_id = int(s_id)
|
||||
channel = server.get_channel(channel_id)
|
||||
await channel.send("{} has just gone live! View their stream at <{}>".format(member.display_name, data['twitch_url']))
|
||||
self.bot.loop.create_task(utils.update_content('twitch', {'live': 1}, str(m_id)))
|
||||
elif not online and data['live'] == 1:
|
||||
for s_id in data['servers']:
|
||||
server = self.bot.get_guild(int(s_id))
|
||||
if server is None:
|
||||
continue
|
||||
server_settings = await utils.get_content('server_settings', server_id)
|
||||
try:
|
||||
channel_id = server_settings['notification_channel']
|
||||
except (IndexError, TypeError):
|
||||
channel_id = server_id
|
||||
channel = self.bot.get_channel(channel_id)
|
||||
# Get the member that has just gone live
|
||||
member = server.get_member(m_id)
|
||||
fmt = "{} has just gone offline! Catch them next time they stream at {}".format(
|
||||
member.display_name, url)
|
||||
await channel.send(fmt)
|
||||
await utils.update_content('twitch', {'live': 0}, {'member_id': m_id})
|
||||
if member is None:
|
||||
continue
|
||||
server_settings = await utils.get_content('server_settings', s_id)
|
||||
if server_settings is not None:
|
||||
channel_id = int(server_settings.get('notification_channel', s_id))
|
||||
else:
|
||||
channel_id = int(s_id)
|
||||
channel = server.get_channel(channel_id)
|
||||
await channel.send("{} has just gone offline! View their stream next time at <{}>".format(member.display_name, data['twitch_url']))
|
||||
self.bot.loop.create_task(utils.update_content('twitch', {'live': 0}, str(m_id)))
|
||||
await asyncio.sleep(30)
|
||||
except Exception as e:
|
||||
tb = traceback.format_exc()
|
||||
|
@ -109,6 +96,8 @@ class Twitch:
|
|||
|
||||
EXAMPLE: !twitch @OtherPerson
|
||||
RESULT: Information about their twitch URL"""
|
||||
await ctx.message.channel.trigger_typing()
|
||||
|
||||
if member is None:
|
||||
member = ctx.message.author
|
||||
|
||||
|
@ -143,6 +132,8 @@ class Twitch:
|
|||
|
||||
EXAMPLE: !twitch add MyTwitchName
|
||||
RESULT: Saves your twitch URL; notifications will be sent to this server when you go live"""
|
||||
await ctx.message.channel.trigger_typing()
|
||||
|
||||
# This uses a lookbehind to check if twitch.tv exists in the url given
|
||||
# If it does, it matches twitch.tv/user and sets the url as that
|
||||
# Then (in the else) add https://www. to that
|
||||
|
|
|
@ -3,4 +3,4 @@ from .checks import is_owner, custom_perms, db_check
|
|||
from .config import *
|
||||
from .utilities import *
|
||||
from .images import create_banner
|
||||
from .paginator import Pages, CannotPaginate
|
||||
from .paginator import Pages, CannotPaginate, DetailedPages
|
||||
|
|
|
@ -12,13 +12,13 @@ required_tables = {
|
|||
'battle_records': 'member_id',
|
||||
'boops': 'member_id',
|
||||
'command_usage': 'command',
|
||||
'deviantart': 'member_id',
|
||||
'motd': 'date',
|
||||
'overwatch': 'member_id',
|
||||
'picarto': 'member_id',
|
||||
'server_settings': 'server_id',
|
||||
'raffles': 'id',
|
||||
'strawpolls': 'server_id',
|
||||
'osu': 'member_id',
|
||||
'tags': 'server_id',
|
||||
'tictactoe': 'member_id',
|
||||
'twitch': 'member_id'
|
||||
|
|
|
@ -66,7 +66,10 @@ class Pages:
|
|||
|
||||
if not first:
|
||||
self.embed.description = '\n'.join(p)
|
||||
await self.message.edit(embed=self.embed)
|
||||
try:
|
||||
await self.message.edit(embed=self.embed)
|
||||
except discord.NotFound:
|
||||
self.paginating = False
|
||||
return
|
||||
|
||||
# verify we can actually use the pagination session
|
||||
|
@ -176,7 +179,7 @@ class Pages:
|
|||
self.paginating = False
|
||||
|
||||
def react_check(self, reaction, user):
|
||||
if user is None or user.id != self.author.id or reaction.message != self.message:
|
||||
if user is None or user.id != self.author.id:
|
||||
return False
|
||||
|
||||
for (emoji, func) in self.reaction_emojis:
|
||||
|
@ -191,7 +194,7 @@ class Pages:
|
|||
|
||||
while self.paginating:
|
||||
try:
|
||||
react = await self.bot.wait_for('reaction_add', check=self.react_check, timeout=120.0)
|
||||
react, user = await self.bot.wait_for('reaction_add', check=self.react_check, timeout=120.0)
|
||||
except asyncio.TimeoutError:
|
||||
react = None
|
||||
if react is None:
|
||||
|
@ -204,8 +207,69 @@ class Pages:
|
|||
break
|
||||
|
||||
try:
|
||||
await self.message.remove_reaction(react.reaction.emoji, react.user)
|
||||
await self.message.remove_reaction(react.emoji, user)
|
||||
except:
|
||||
pass # can't remove it so don't bother doing so
|
||||
|
||||
await self.match()
|
||||
|
||||
class DetailedPages(Pages):
|
||||
"""A class built on the normal Paginator, except with the idea that you want one 'thing' per page
|
||||
This allows the ability to have more data on a page, more fields, etc. and page through each 'thing'"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
kwargs['per_page'] = 1
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def get_page(self, page):
|
||||
return self.entries[page - 1]
|
||||
|
||||
async def show_page(self, page, *, first=False):
|
||||
self.current_page = page
|
||||
entries = self.get_page(page)
|
||||
|
||||
self.embed.set_footer(text='Page %s/%s (%s entries)' % (page, self.maximum_pages, len(self.entries)))
|
||||
self.embed.clear_fields()
|
||||
self.embed.description = ""
|
||||
|
||||
for key, value in entries.items():
|
||||
if key == 'fields':
|
||||
for f in value:
|
||||
self.embed.add_field(name=f.get('name'), value=f.get('value'), inline=f.get('inline', True))
|
||||
else:
|
||||
setattr(self.embed, key, value)
|
||||
|
||||
if not self.paginating:
|
||||
return await self.message.channel.send(embed=self.embed)
|
||||
|
||||
if not first:
|
||||
try:
|
||||
await self.message.edit(embed=self.embed)
|
||||
except discord.NotFound:
|
||||
self.paginating = False
|
||||
return
|
||||
|
||||
# verify we can actually use the pagination session
|
||||
if not self.permissions.add_reactions:
|
||||
raise CannotPaginate('Bot does not have add reactions permission.')
|
||||
|
||||
if not self.permissions.read_message_history:
|
||||
raise CannotPaginate('Bot does not have Read Message History permission.')
|
||||
|
||||
if self.embed.description:
|
||||
self.embed.description += '\nConfused? React with \N{INFORMATION SOURCE} for more info.'
|
||||
else:
|
||||
self.embed.description = '\nConfused? React with \N{INFORMATION SOURCE} for more info.'
|
||||
|
||||
self.message = await self.message.channel.send(embed=self.embed)
|
||||
for (reaction, _) in self.reaction_emojis:
|
||||
if self.maximum_pages == 2 and reaction in ('\u23ed', '\u23ee'):
|
||||
# no |<< or >>| buttons if we only have two pages
|
||||
# we can't forbid it if someone ends up using it but remove
|
||||
# it from the default set
|
||||
continue
|
||||
try:
|
||||
await self.message.add_reaction(reaction)
|
||||
except discord.NotFound:
|
||||
# If the message isn't found, we don't care about clearing anything
|
||||
return
|
||||
|
|
Loading…
Reference in a new issue