1
0
Fork 0
mirror of synced 2024-05-20 20:42:27 +12:00
Bonfire/cogs/voice_utilities/playlist.py

281 lines
10 KiB
Python

import datetime
import traceback
from collections import deque
from itertools import islice
from random import shuffle
from .entry import URLPlaylistEntry
from .exceptions import ExtractionError, WrongEntryTypeError
from .event_emitter import EventEmitter
class Playlist(EventEmitter):
    """
    A playlist manages the list of songs that will be played.
    """

    def __init__(self, bot):
        super().__init__()
        self.bot = bot
        # Event loop and downloader are shared with the owning bot instance.
        self.loop = bot.loop
        self.downloader = bot.downloader
        # FIFO queue of playlist entries waiting to be played.
        self.entries = deque()
        # Hard cap on how many songs may be queued at once (see the `full` property).
        self.max_songs = 10
def __iter__(self):
return iter(self.entries)
def shuffle(self):
shuffle(self.entries)
    def clear(self):
        # Remove every queued entry; the deque object itself is kept.
        self.entries.clear()
@property
def full(self):
return self.count >= self.max_songs
@property
def count(self):
if self.entries:
return len(self.entries)
else:
return 0
    async def add_entry(self, song_url, requester, **meta):
        """
        Validates and adds a song_url to be played. This does not start the download of the song.
        Returns the entry & the position it is in the queue.
        :param song_url: The song url to add to the playlist.
        :param requester: Whoever asked for this song; stored on the entry.
        :param meta: Any additional metadata to add to the playlist entry.
        :raises ExtractionError: if no info could be extracted, or the content type is rejected.
        :raises WrongEntryTypeError: if the url points at a playlist rather than a single song.
        """
        # Probe the url with youtube-dl (no download) to validate it and get metadata.
        try:
            info = await self.downloader.extract_info(self.loop, song_url, download=False)
        except Exception as e:
            raise ExtractionError('Could not extract information from {}\n\n{}'.format(song_url, e))
        if not info:
            raise ExtractionError('Could not extract information from %s' % song_url)
        # TODO: Sort out what happens next when this happens
        if info.get('_type', None) == 'playlist':
            raise WrongEntryTypeError("This is a playlist.", True, info.get('webpage_url', None) or info.get('url', None))
        # The generic/Dropbox extractors can't tell us the media type, so sniff
        # the HTTP Content-Type header ourselves.
        if info['extractor'] in ['generic', 'Dropbox']:
            try:
                # unfortunately this is literally broken
                # https://github.com/KeepSafe/aiohttp/issues/758
                # https://github.com/KeepSafe/aiohttp/issues/852
                # NOTE(review): `get_header` is not imported in this module — this
                # branch would raise NameError as written; confirm where it lives
                # (likely a project utils module) and import it.
                content_type = await get_header(self.bot.aiosession, info['url'], 'CONTENT-TYPE')
                print("Got content type", content_type)
            except Exception as e:
                print("[Warning] Failed to get content type for url %s (%s)" % (song_url, e))
                content_type = None
            if content_type:
                if content_type.startswith(('application/', 'image/')):
                    if '/ogg' not in content_type:  # How does a server say `application/ogg` what the actual fuck
                        raise ExtractionError("Invalid content type \"%s\" for url %s" % (content_type, song_url))
                elif not content_type.startswith(('audio/', 'video/')):
                    print("[Warning] Questionable content type \"%s\" for url %s" % (content_type, song_url))
        entry = URLPlaylistEntry(
            self,
            song_url,
            info.get('title', 'Untitled'),
            requester,
            info.get('duration', 0) or 0,
            self.downloader.ytdl.prepare_filename(info),
            **meta
        )
        self._add_entry(entry)
        return entry, len(self.entries)
async def import_from(self, playlist_url, **meta):
"""
Imports the songs from `playlist_url` and queues them to be played.
Returns a list of `entries` that have been enqueued.
:param playlist_url: The playlist url to be cut into individual urls and added to the playlist
:param meta: Any additional metadata to add to the playlist entry
"""
position = len(self.entries) + 1
entry_list = []
try:
info = await self.downloader.safe_extract_info(self.loop, playlist_url, download=False)
except Exception as e:
raise ExtractionError('Could not extract information from {}\n\n{}'.format(playlist_url, e))
if not info:
raise ExtractionError('Could not extract information from %s' % playlist_url)
# Once again, the generic extractor fucks things up.
if info.get('extractor', None) == 'generic':
url_field = 'url'
else:
url_field = 'webpage_url'
baditems = 0
for items in info['entries']:
if items:
try:
entry = URLPlaylistEntry(
self,
items[url_field],
items.get('title', 'Untitled'),
items.get('duration', 0) or 0,
self.downloader.ytdl.prepare_filename(items),
**meta
)
self._add_entry(entry)
entry_list.append(entry)
except:
baditems += 1
# Once I know more about what's happening here I can add a proper message
traceback.print_exc()
print(items)
print("Could not add item")
else:
baditems += 1
if baditems:
print("Skipped %s bad entries" % baditems)
return entry_list, position
async def async_process_youtube_playlist(self, playlist_url, **meta):
"""
Processes youtube playlists links from `playlist_url` in a questionable, async fashion.
:param playlist_url: The playlist url to be cut into individual urls and added to the playlist
:param meta: Any additional metadata to add to the playlist entry
"""
try:
info = await self.downloader.safe_extract_info(self.loop, playlist_url, download=False, process=False)
except Exception as e:
raise ExtractionError('Could not extract information from {}\n\n{}'.format(playlist_url, e))
if not info:
raise ExtractionError('Could not extract information from %s' % playlist_url)
gooditems = []
baditems = 0
for entry_data in info['entries']:
if entry_data:
baseurl = info['webpage_url'].split('playlist?list=')[0]
song_url = baseurl + 'watch?v=%s' % entry_data['id']
try:
entry, elen = await self.add_entry(song_url, **meta)
gooditems.append(entry)
except ExtractionError:
baditems += 1
except Exception as e:
baditems += 1
print("There was an error adding the song {}: {}: {}\n".format(
entry_data['id'], e.__class__.__name__, e))
else:
baditems += 1
if baditems:
print("Skipped %s bad entries" % baditems)
return gooditems
async def async_process_sc_bc_playlist(self, playlist_url, **meta):
"""
Processes soundcloud set and bancdamp album links from `playlist_url` in a questionable, async fashion.
:param playlist_url: The playlist url to be cut into individual urls and added to the playlist
:param meta: Any additional metadata to add to the playlist entry
"""
try:
info = await self.downloader.safe_extract_info(self.loop, playlist_url, download=False, process=False)
except Exception as e:
raise ExtractionError('Could not extract information from {}\n\n{}'.format(playlist_url, e))
if not info:
raise ExtractionError('Could not extract information from %s' % playlist_url)
gooditems = []
baditems = 0
for entry_data in info['entries']:
if entry_data:
song_url = entry_data['url']
try:
entry, elen = await self.add_entry(song_url, **meta)
gooditems.append(entry)
except ExtractionError:
baditems += 1
except Exception as e:
baditems += 1
print("There was an error adding the song {}: {}: {}\n".format(
entry_data['id'], e.__class__.__name__, e))
else:
baditems += 1
if baditems:
print("Skipped %s bad entries" % baditems)
return gooditems
    def _add_entry(self, entry):
        # Append to the queue and notify listeners of the new entry.
        self.entries.append(entry)
        self.emit('entry-added', playlist=self, entry=entry)
        # If the new entry is next in line, start getting it ready immediately.
        if self.peek() is entry:
            entry.get_ready_future()
async def get_next_entry(self, predownload_next=True):
"""
A coroutine which will return the next song or None if no songs left to play.
Additionally, if predownload_next is set to True, it will attempt to download the next
song to be played - so that it's ready by the time we get to it.
"""
if not self.entries:
return None
entry = self.entries.popleft()
if predownload_next:
next_entry = self.peek()
if next_entry:
next_entry.get_ready_future()
return await entry.get_ready_future()
def peek(self):
"""
Returns the next entry that should be scheduled to be played.
"""
if self.entries:
return self.entries[0]
async def estimate_time_until(self, position, player):
"""
(very) Roughly estimates the time till the queue will 'position'
"""
estimated_time = sum([e.duration for e in islice(self.entries, position - 1)])
# When the player plays a song, it eats the first playlist item, so we just have to add the time back
if not player.is_stopped and player.current_entry:
estimated_time += player.current_entry.duration - player.progress
return datetime.timedelta(seconds=estimated_time)
def count_for_user(self, user):
return sum(1 for e in self.entries if e.meta.get('author', None) == user)