1
0
Fork 0
mirror of synced 2024-04-29 10:12:29 +12:00
Bonfire/cogs/spotify.py

99 lines
3.5 KiB
Python

import asyncio
import traceback
from base64 import b64encode
from base64 import urlsafe_b64encode

import aiohttp
from discord.ext import commands

import utils
class Spotify(commands.Cog):
    """Pretty self-explanatory"""

    # Spotify search commands. A background task fetches an access token via
    # the client-credentials grant and keeps it refreshed; the commands send
    # that token as a Bearer header on every search request.

    # Seconds to wait before the next refresh attempt when a refresh fails or
    # the token response carries no usable ``expires_in`` value.
    _DEFAULT_REFRESH_DELAY = 2400

    def __init__(self, bot):
        """Build the Basic auth header and start the token refresh task.

        Args:
            bot: The bot instance this cog is attached to; its event loop is
                used to schedule the refresh task.
        """
        self.bot = bot
        self._token = None
        self._client_id = utils.spotify_id or ""
        self._client_secret = utils.spotify_secret or ""
        self._authorization = "{}:{}".format(self._client_id, self._client_secret)
        # HTTP Basic credentials use the standard base64 alphabet (RFC 7617),
        # not the URL-safe variant, which swaps "+" and "/" for "-" and "_".
        self.headers = {
            "Authorization": "Basic {}".format(
                b64encode(self._authorization.encode()).decode()
            )
        }
        self.task = self.bot.loop.create_task(self.api_token_task())

    def cog_unload(self):
        """Stop the background refresh task when the cog is removed."""
        self.task.cancel()

    async def api_token_task(self):
        """Refresh the access token forever, sleeping between attempts.

        Any error raised by a refresh is appended to the ``error_log`` file
        and the loop carries on after the default delay.
        """
        while True:
            delay = None
            try:
                delay = await self.get_api_token()
            except Exception as error:
                with open("error_log", "a") as f:
                    traceback.print_tb(error.__traceback__, file=f)
                    print("{0.__class__.__name__}: {0}".format(error), file=f)
            # A response without a positive numeric ``expires_in`` (e.g. an
            # error payload) must not reach asyncio.sleep(), which would raise
            # and end this loop permanently.
            if not isinstance(delay, (int, float)) or delay <= 0:
                delay = self._DEFAULT_REFRESH_DELAY
            # Sleeping here rather than in a ``finally`` clause lets a
            # cancellation raised during the fetch stop the task immediately.
            await asyncio.sleep(delay)

    async def get_api_token(self):
        """Request a new access token and store it on the cog.

        Returns:
            The ``expires_in`` value from the token response, or ``None`` when
            the response does not contain one.
        """
        url = "https://accounts.spotify.com/api/token"
        opts = {"grant_type": "client_credentials"}
        async with aiohttp.ClientSession(headers=self.headers) as session:
            # The context manager releases the response once it has been read.
            async with session.post(url, data=opts) as response:
                data = await response.json()
        self._token = data.get("access_token")
        return data.get("expires_in")

    @staticmethod
    def _first_result_url(response, section):
        """Return the first Spotify link found under ``response[section]``.

        Args:
            response: Decoded search payload; may be ``None`` or malformed.
            section: Top-level key holding the results ("tracks", "playlists").

        Returns:
            The first non-empty ``external_urls.spotify`` value among the
            result items, or ``None`` when there is no usable result.
        """
        try:
            items = response[section]["items"]
        except (KeyError, TypeError):
            return None
        if not isinstance(items, list):
            return None
        for item in items:
            # Entries may be null or lack a link; skip them instead of failing.
            try:
                link = item["external_urls"]["spotify"]
            except (KeyError, TypeError):
                continue
            if link:
                return link
        return None

    @commands.group(invoke_without_command=True)
    @utils.can_run(send_messages=True)
    async def spotify(self, ctx, *, query):
        """Searches Spotify for a song, giving you the link you can use to listen in. Give the query to search for
        and it will search by title/artist for the best match
        EXAMPLE: !spotify Eminem
        RESULT: Some Eminem song"""
        # Setup the headers with the token that should be here
        headers = {"Authorization": "Bearer {}".format(self._token)}
        opts = {"q": query, "type": "track"}
        url = "https://api.spotify.com/v1/search"
        response = await utils.request(url, headers=headers, payload=opts)
        link = self._first_result_url(response, "tracks")
        if link is None:
            await ctx.send("Couldn't find a song for:\n{}".format(query))
        else:
            await ctx.send(link)

    @spotify.command()
    @utils.can_run(send_messages=True)
    async def playlist(self, ctx, *, query):
        """Searches Spotify for a playlist, giving you the link you can use to listen in. Give the query to search for
        and it will search for the best match
        EXAMPLE: !spotify playlist Eminem
        RESULT: Some Eminem playlist"""
        # Setup the headers with the token that should be here
        headers = {"Authorization": "Bearer {}".format(self._token)}
        opts = {"q": query, "type": "playlist"}
        url = "https://api.spotify.com/v1/search"
        response = await utils.request(url, headers=headers, payload=opts)
        link = self._first_result_url(response, "playlists")
        if link is None:
            await ctx.send("Couldn't find a playlist for:\n{}".format(query))
        else:
            await ctx.send(link)
def setup(bot):
    """Extension entry point: create the Spotify cog and add it to ``bot``."""
    cog = Spotify(bot)
    bot.add_cog(cog)