1
0
Fork 0
mirror of synced 2024-05-18 19:42:28 +12:00
Bonfire/cogs/twitch.py

163 lines
7.2 KiB
Python

from discord.ext import commands
from .utils import config
from .utils import checks
import urllib.request
import urllib.parse
import asyncio
import discord
import json
import re
def channelOnline(channel: str):
url = "https://api.twitch.tv/kraken/streams/{}".format(channel)
response = urllib.request.urlopen(url)
data = json.loads(response.read().decode('utf-8'))
return data['stream'] is not None
class Twitch:
"""Class for some twitch integration
You can add or remove your twitch stream for your user
I will then notify the server when you have gone live or offline"""
def __init__(self, bot):
self.bot = bot
async def checkChannels(self):
await self.bot.wait_until_ready()
while not self.bot.is_closed:
twitch = config.getContent('twitch')
for id,r in twitch.items():
server = discord.utils.find(lambda s: s.id == r['server_id'], self.bot.servers)
member = discord.utils.find(lambda m: m.id == r['user_id'], server.members)
url = r['twitch_url']
live = r['live']
notify = r['notifications_on']
user = re.search("(?<=twitch.tv/)(.*)", url).group(1)
if not live and notify and channelOnline(user):
twitch[id]['live'] = 1
await self.bot.send_message(server, "{} has just gone live! "
"View their stream at {}".format(member.name, url))
elif live and not channelOnline(user):
twitch[id]['live'] = 0
await self.bot.send_message(server,
"{} has just gone offline! Catch them next time they stream at {}"
.format(member.name, url))
config.closeConnection()
await asyncio.sleep(30)
@commands.group(no_pm=True, invoke_without_command=True)
@checks.customPermsOrRole("none")
async def twitch(self, *, member: discord.Member=None):
"""Use this command to check the twitch info of a user"""
if member is not None:
twitch = config.getContent('twitch')
result = twitch.get(ctx.message.author.id)
if result is not None:
url = result['twitch_url']
user = re.search("(?<=twitch.tv/)(.*)", url).group(1)
result = urllib.request.urlopen("https://api.twitch.tv/kraken/channels/{}".format(user))
data = json.loads(result.read().decode('utf-8'))
fmt = "Username: {}".format(data['display_name'])
fmt += "\nStatus: {}".format(data['status'])
fmt += "\nFollowers: {}".format(data['followers'])
fmt += "\nURL: {}".format(url)
await self.bot.say("```{}```".format(fmt))
else:
await self.bot.say("{} has not saved their twitch URL yet!".format(member.name))
@twitch.command(name='add', pass_context=True, no_pm=True)
@checks.customPermsOrRole("none")
async def add_twitch_url(self, ctx, url: str):
"""Saves your user's twitch URL"""
try:
url = re.search("((?<=://)?twitch.tv/)+(.*)", url).group(0)
except AttributeError:
url = "https://www.twitch.tv/{}".format(url)
else:
url = "https://www.{}".format(url)
try:
urllib.request.urlopen(url)
except urllib.request.HTTPError:
await self.bot.say("That twitch user does not exist! "
"What would be the point of adding a nonexistant twitch user? Silly")
return
twitch = config.getContent('twitch')
result = twitch.get(ctx.message.author.id)
if result is not None:
twitch[ctx.message.author.id]['twitch_url'] = result
else:
twitch[ctx.message.author.id] = {'twitch_url':url,'server_id':ctx.message.server.id,'notifications_on': 1,'live':0}
config.saveContent('twitch',twitch)
await self.bot.say("I have just saved your twitch url {}".format(ctx.message.author.mention))
@twitch.command(name='remove', aliases=['delete'], pass_context=True, no_pm=True)
@checks.customPermsOrRole("none")
async def remove_twitch_url(self, ctx):
"""Removes your twitch URL"""
twitch = config.getContent('twitch')
if twitch.get(ctx.message.author.id) is not None:
del twitch[ctx.message.author.id]
config.saveContent('twitch',twitch)
await self.bot.say("I am no longer saving your twitch URL {}".format(ctx.message.author.mention))
else:
await self.bot.say(
"I do not have your twitch URL added {}. You can save your twitch url with !twitch add".format(
ctx.message.author.mention))
@twitch.group(pass_context=True, no_pm=True, invoke_without_command=True)
@checks.customPermsOrRole("none")
async def notify(self, ctx):
"""This can be used to turn notifications on or off"""
pass
@notify.command(name='on', aliases=['start,yes'], pass_context=True, no_pm=True)
@checks.customPermsOrRole("none")
async def notify_on(self, ctx):
"""Turns twitch notifications on"""
twitch = config.getContent('twitch')
result = twitch.get(ctx.message.author.id)
if result is None:
await self.bot.say(
"I do not have your twitch URL added {}. You can save your twitch url with !twitch add".format(
ctx.message.author.mention))
elif result['notifications_on']:
await self.bot.say("What do you want me to do, send two notifications? Not gonna happen {}".format(
ctx.message.author.mention))
else:
twitch[ctx.message.author.id]['notifications_on'] = 1
config.saveContent('twitch',twitch)
await self.bot.say("I will notify if you go live {}, you'll get a bajillion followers I promise c:".format(
ctx.message.author.mention))
@notify.command(name='off', aliases=['stop,no'], pass_context=True, no_pm=True)
@checks.customPermsOrRole("none")
async def notify_off(self, ctx):
"""Turns twitch notifications off"""
twitch = config.getContent('twitch')
if twitch.get(ctx.message.author.id) is None:
await self.bot.say(
"I do not have your twitch URL added {}. You can save your twitch url with !twitch add".format(
ctx.message.author.mention))
elif not result['notifications_on']:
await self.bot.say("I am already set to not notify if you go live! Pay attention brah {}".format(
ctx.message.author.mention))
else:
twitch[ctx.message.author.id]['notifications_on'] = 0
config.saveContent('twitch',twitch)
await self.bot.say(
"I will not notify if you go live anymore {}, "
"are you going to stream some lewd stuff you don't want people to see?~".format(
ctx.message.author.mention))
def setup(bot):
t = Twitch(bot)
config.loop.create_task(t.checkChannels())
bot.add_cog(Twitch(bot))