1
0
Fork 0
mirror of synced 2024-05-18 19:42:28 +12:00
Bonfire/cogs/voice_utilities/source.py
2017-08-06 16:16:43 -05:00

172 lines
6.4 KiB
Python

import discord
import time
import asyncio
from .exceptions import ExtractionError, WrongEntryTypeError, LiveStreamError
from .entry import get_header
class YoutubeDLSource(discord.FFmpegPCMAudio):
def __init__(self, playlist, url):
self._process = None
self.playlist = playlist
self.loop = playlist.loop
self.downloader = playlist.downloader
self.url = url
self.info = None
self.ready = False
self.error = False
async def get_info(self):
try:
# First attempt to gather the information
info = await self.downloader.extract_info(self.loop, self.url, download=False)
except Exception as e:
raise ExtractionError('Could not extract information from {}\n\n{}'.format(self.url, e))
# Check if a playlist was provided
if info.get('_type', None) == 'playlist':
# It is possible that the 'playlist' is the search
if info.get('extractor') == 'youtube:search':
# If so, and we have no entries, then nothing with this search was found
if len(info['entries']) == 0:
raise ExtractionError('Could not extract information from %s' % self.url)
# Otherwise get the first result
else:
info = info['entries'][0]
# If this isn't a search, then it is a playlist, this can't be done
else:
raise WrongEntryTypeError("This is a playlist.", True,
info.get('webpage_url', None) or info.get('url', None))
if info['extractor'] in ['generic', 'Dropbox']:
try:
# unfortunately this is literally broken
# https://github.com/KeepSafe/aiohttp/issues/758
# https://github.com/KeepSafe/aiohttp/issues/852
headers = await get_header(info['url'])
content_type = headers.get('Content-Type')
except Exception as e:
content_type = None
if content_type:
if content_type.startswith(('application/', 'image/')):
if '/ogg' not in content_type: # How does a server say `application/ogg` what the actual fuck
raise ExtractionError("Invalid content type \"%s\" for url %s" % (content_type, self.url))
if headers.get('ice-audio-info'):
raise LiveStreamError("Cannot download from a livestream")
if info.get('is_live', False):
raise LiveStreamError("Cannot download from a livestream")
# Set our info
self.info = info
async def prepare(self):
await self.get_info()
asyncio.run_coroutine_threadsafe(self.download(), self.loop)
return self.info
async def download(self):
try:
result = await self.downloader.extract_info(self.loop, self.url, download=True)
except Exception as e:
self.error = True
raise ExtractionError(e)
if result:
self.ready = True
opts = {
'before_options': '-nostdin',
'options': '-vn -b:a 128k -v fatal'
}
super().__init__(self.downloader.ytdl.prepare_filename(self.info), **opts)
@property
def title(self):
return self.info.get('title', 'Untitled')
@property
def thumbnail(self):
return self.info.get('thumbnail', None)
@property
def length(self):
return self.info.get('duration') or 0
@property
def progress(self):
if hasattr(self, 'start_time') and self.start_time:
return round(time.time() - self.start_time)
@property
def remaining(self):
length = self.length
progress = self.progress
if length and progress:
return length - progress
@property
def embed(self):
"""A property that returns an embed that can be used to display information about this particular song"""
# Create the embed object we'll use
embed = discord.Embed()
# Fill in the simple things
embed.add_field(name='Title', value=self.title, inline=False)
embed.add_field(name='Requester', value=self.requester.display_name, inline=False)
embed.add_field(name='Place in Queue', value=str(self.playlist.count))
if self.thumbnail:
embed.set_thumbnail(url=self.thumbnail)
# Get the current length of the song and display this
if self.length:
length = divmod(round(self.length, 0), 60)
fmt = "{0[0]}m {0[1]}s".format(length)
embed.add_field(name='Duration', value=fmt, inline=False)
# And return the embed we created
return embed
class YoutubeDLLiveStreamSource(discord.FFmpegPCMAudio):
def __init__(self, bot, url):
self._process = None
self.downloader = bot.downloader
self.loop = bot.loop
self.url = url
@property
def title(self):
return self.info.get('title', 'Untitled')
@property
def thumbnail(self):
return self.info.get('thumbnail', None)
@property
def embed(self):
"""A property that returns an embed that can be used to display information about this particular song"""
# Create the embed object we'll use
embed = discord.Embed()
# Fill in the simple things
embed.add_field(name='Title', value=self.title, inline=False)
embed.add_field(name='Requester', value=self.requester.display_name, inline=False)
if self.thumbnail:
embed.set_thumbnail(url=self.thumbnail)
# And return the embed we created
return embed
async def get_ready(self):
try:
# First attempt to gather the information
info = await self.downloader.extract_info(self.loop, self.url, download=False)
except Exception as e:
raise ExtractionError('Could not extract information from {}\n\n{}'.format(self.url, e))
if not info.get('is_live', False):
raise WrongEntryTypeError("This is not a livestream!", True, info.get('webpage_url', None) or info.get('url', None))
# Set our info
self.info = info
opts = {
'before_options': '-nostdin',
'options': '-vn -b:a 128k -v fatal'
}
super().__init__(info.get('manifest_url'), **opts)