Source code for harmonize.player

from __future__ import annotations

import asyncio
from asyncio import Event
from time import time
from typing import Optional, TYPE_CHECKING, overload

from async_timeout import timeout as _timeout
from disnake import VoiceProtocol, Client, VoiceState
from loguru import logger

from harmonize.abstract import Filter
from harmonize.connection import Pool
from harmonize.enums import EndReason, LoopStatus
from harmonize.exceptions import RequestError, InvalidChannelStateException
from harmonize.objects import Track, MISSING
from harmonize.queue import Queue

if TYPE_CHECKING:
    from harmonize.connection import Node
    from disnake.channel import VocalGuildChannel
    from disnake import Guild

# Names exported by ``from harmonize.player import *``. ``Queue`` is listed
# here even though it is imported from ``harmonize.queue`` above.
__all__ = (
    "Player",
    "Queue"
)


class Player(VoiceProtocol):
    """
    Voice protocol implementation that plays audio for a guild through a node.

    Attributes
    ----------
    node : :class:`harmonize.connection.Node`
        The node picked by :meth:`Pool.get_best_node` when the player was created.
    volume : int
        The volume currently stored on the player. Starts at 100.
    paused : bool
        Whether the player is flagged as paused.
    connected : bool
        Whether the player is flagged as connected.
    is_playing : bool
        ``True`` only while the queue has a current track, the player is not
        paused and the player is connected.
    user_data : dict[any, any]
        A shallow copy of the custom data attached to the player.
    queue : :class:`harmonize.queue.Queue`
        The queue owned by the player.
    filters : list[:class:`harmonize.abstract.Filter`]
        The filters currently stored on the player.
    guild : Optional[:class:`disnake.Guild`]
        The guild of the player. Starts as ``None``.
    loop : :class:`harmonize.enums.LoopStatus`
        The loop status of the player. Starts as ``LoopStatus(0)``.
    position_timestamp : int
        The timestamp of the player's current position. Starts at 0.
    last_position : int
        The last known position of the player. Starts at 0.
    last_update : int
        The timestamp of the player's last update. Starts at 0.
    """

    channel: VocalGuildChannel

    def __call__(self, client: Client, channel: VocalGuildChannel) -> Player:
        # Lets an already constructed player be handed over where a class is
        # expected: calling it binds the client and channel and yields itself.
        super().__init__(client, channel)
        return self

    def __init__(self, *args, **kwargs) -> None:
        # Connection related state.
        self._node: Node = Pool.get_best_node()
        self._connection_event: Event = asyncio.Event()
        self._connected: bool = False
        self._voice_state: dict[str, any] = {}

        # Playback related state.
        self._user_data: dict[any, any] = {}
        self._volume: int = 100
        self._paused: bool = False

        # Public bookkeeping attributes.
        self.guild: Optional[Guild] = None
        self.loop: LoopStatus = LoopStatus(0)
        self.position_timestamp: int = 0
        self.last_position: int = 0
        self.last_update: int = 0

        self._queue: Queue = Queue(self)
        self._filters: dict[str, Filter] = {}

        super().__init__(*args, **kwargs)

    @property
    def node(self) -> Node:
        return self._node

    @property
    def is_playing(self) -> bool:
        if self.queue.current is None or self._paused:
            return False
        return self._connected

    @property
    def connected(self) -> bool:
        return self._connected

    @property
    def paused(self) -> bool:
        return self._paused

    @property
    def user_data(self) -> dict[any, any]:
        # A copy is returned so callers cannot mutate the stored mapping.
        return dict(self._user_data)

    @property
    def volume(self) -> int:
        return self._volume

    @property
    def queue(self) -> Queue:
        return self._queue

    @property
    def filters(self) -> list[Filter]:
        return [*self._filters.values()]
async def on_voice_server_update(self, data: dict) -> None:
    """|coro|

    Stores the voice server credentials and forwards them to the node.

    Parameters
    ----------
    data : dict
        The voice server update payload. The ``endpoint`` and ``token``
        entries are read from it.

    Returns
    -------
    None
    """
    endpoint = data.get('endpoint')

    if endpoint is None:
        # Discord sends a null endpoint while the voice server is being
        # reallocated. Forwarding it to the node would only produce a failed
        # request, so wait for the follow-up update instead.
        logger.warning(
            f'Player ({self.guild.id}) Received a voice server update without an endpoint, '
            'waiting for a new one'
        )
        return

    self._voice_state.update(endpoint=endpoint, token=data['token'])

    if 'sessionId' not in self._voice_state:
        logger.warning(f'Player ({self.guild.id}) Missing sessionId, is the client User ID correct?')

    await self._dispatch_voice_update()
async def on_voice_state_update(self, data: dict) -> None:
    """|coro|

    Reacts to a voice state payload for this player.

    A payload without a channel id disconnects the player. Otherwise a
    changed session id is stored and the voice state is dispatched.

    Parameters
    ----------
    data : dict
        The voice state payload. The ``channel_id`` and ``session_id``
        entries are read from it.

    Returns
    -------
    None
    """
    if not data['channel_id']:
        await self.disconnect(force=True)
        return

    session_id = data['session_id']
    if session_id == self._voice_state.get('sessionId'):
        # Nothing changed, so there is nothing to dispatch.
        return

    self._voice_state.update(sessionId=session_id)
    await self._dispatch_voice_update()
async def _dispatch_voice_update(self) -> None:
    """|coro|

    Sends the voice state to the node once it holds exactly the keys
    ``sessionId``, ``endpoint`` and ``token``, then sets the connection event.
    """
    required_keys = {'sessionId', 'endpoint', 'token'}
    if self._voice_state.keys() != required_keys:
        return

    await self._node.update_player(guild_id=self.guild.id, voice_state=self._voice_state)
    self._connection_event.set()
async def handle_event(self, reason: EndReason) -> None:
    """|coro|

    Starts the next track when the previous one finished or failed to load.

    Note
    ----
    Autoplay relies on this function; it is not meant to be called by users.

    Parameters
    ----------
    reason : :class:`harmonize.enums.EndReason`
        The reason for the event.

    Returns
    -------
    None
    """
    advancing_reasons = (EndReason.FINISHED.value, EndReason.LOAD_FAILED.value)
    if reason.value not in advancing_reasons:
        return

    try:
        await self.play()
    except RequestError as error:
        # A failed request is logged rather than raised to the caller.
        logger.error(
            'Encountered a request error whilst '
            f'starting a new track on guild ({self.guild.id}) {error}'
        )
async def update_state(self, state: dict) -> None:
    """|coro|

    Copies the position information of a state payload onto the player.

    Note
    ----
    This function is used to keep the player state up to date; it is not
    meant to be called by users.

    Parameters
    ----------
    state : dict
        The state payload. ``position`` and ``time`` are read from it and
        default to 0 when absent.

    Returns
    -------
    None
    """
    # Local wall-clock time of this update, in milliseconds.
    self.last_update = int(time() * 1000)

    field_sources = (
        ('last_position', 'position'),
        ('position_timestamp', 'time'),
    )
    for attribute, key in field_sources:
        setattr(self, attribute, state.get(key, 0))
def dispatch(self, name: str, *args: any) -> None:
    """
    Dispatches a client event whose name is ``name`` prefixed with ``harmonize_``.

    Parameters
    ----------
    name : str
        The event name without the ``harmonize_`` prefix.
    *args : any
        Positional arguments forwarded with the event.

    Returns
    -------
    None
    """
    event_name = f"harmonize_{name}"
    client = self._node.client
    client.dispatch(event_name, *args)
def add_user_data(self, **kwargs: any) -> None:
    """
    Stores the given keyword arguments in the player's user data.

    Existing keys are overwritten.

    Parameters
    ----------
    **kwargs : any
        The entries to store.

    Returns
    -------
    None
    """
    for key, value in kwargs.items():
        self._user_data[key] = value
def fetch_user_data(self, key: any) -> any:
    """
    Looks up a single entry of the player's user data.

    Parameters
    ----------
    key : any
        The key to look up.

    Returns
    -------
    any
        The stored value, or ``None`` when the key is not present.
    """
    try:
        return self._user_data[key]
    except KeyError:
        return None
async def _play_back(
        self,
        track: Track = MISSING,
        start_time: int = MISSING,
        end_time: int = MISSING,
        no_replace: bool = MISSING,
        volume: int = MISSING,
        pause: bool = MISSING,
        **options
) -> Optional[dict[str, any]]:
    """|coro|

    Validates the playback options, asks the queue for the track to play and
    sends the resulting player update to the node.

    Only arguments that are not ``MISSING`` are validated and forwarded.
    A given volume is clamped to the range 0..1000 and stored on the player.

    Raises
    ------
    ValueError
        If ``start_time`` is not an int >= 0 or ``end_time`` is not an int >= 1.
    TypeError
        If ``no_replace`` or ``pause`` is not a bool, or ``volume`` is not an int.

    Returns
    -------
    Optional[dict[str, any]]
        Whatever the node's ``update_player`` call returns.
    """
    if start_time is not MISSING:
        start_ok = isinstance(start_time, int) and start_time >= 0
        if not start_ok:
            raise ValueError('Start_time must be an int with a value equal to, or greater than 0')
        options['position'] = start_time

    if end_time is not MISSING:
        end_ok = isinstance(end_time, int) and end_time >= 1
        if not end_ok:
            raise ValueError('End_time must be an int with a value equal to, or greater than 1')
        options['end_time'] = end_time

    if no_replace is not MISSING:
        if not isinstance(no_replace, bool):
            raise TypeError('No_replace must be a bool')
        options['no_replace'] = no_replace

    if volume is not MISSING:
        if not isinstance(volume, int):
            raise TypeError('Volume must be an int')
        capped = min(volume, 1000)
        self._volume = max(capped, 0)
        options['volume'] = self._volume

    if pause is not MISSING:
        if not isinstance(pause, bool):
            raise TypeError('Pause must be a bool')
        options['paused'] = pause

    # The queue decides which track is played; a falsy result means no
    # encoded track is added to the request.
    upcoming = await self._queue._go_to_next(track)
    if upcoming:
        options["encoded_track"] = upcoming.encoded

    return await self._node.update_player(
        guild_id=self.guild.id,
        **options
    )
async def play(
        self,
        track: Optional[Track] = MISSING,
        start_time: int = MISSING,
        end_time: int = MISSING,
        no_replace: bool = MISSING,
        volume: int = MISSING,
        pause: bool = MISSING,
        **kwargs: any
) -> None:
    """|coro|

    Plays a track with the given parameters.

    Note
    ----
    ``start_time`` must be an int with a value equal to, or greater than 0,
    and less than the track duration.

    ``end_time`` must be an int with a value equal to, or greater than 1,
    and less than or equal to the track duration.

    The comparison against the track duration is only made when ``track`` is
    given explicitly; otherwise only the lower bounds are checked here.

    Parameters
    ----------
    track : Optional[Track]
        The track to play. If MISSING, the track is chosen by the queue.
    start_time : int
        The start time of the track. If MISSING, no start position is sent.
    end_time : int
        The end time of the track. If MISSING, no end position is sent.
    no_replace : bool
        If True and the queue already has a current track, nothing is done.
    volume : int
        The volume of the track. If MISSING, the volume is left untouched.
    pause : bool
        Whether the player should be paused. If MISSING, no pause state is sent.
    **kwargs : any
        Additional options to be passed to the player.

    Raises
    ------
    ValueError
        If the start_time or end_time is invalid.
    TypeError
        If no_replace or pause is not a bool, or volume is not an int.
    Forbidden
        If the request is forbidden.
    RequestError
        Throws an error when the request fails.
    IOError
        If the connection has been closed

    Returns
    -------
    None
    """
    if no_replace is True and self.queue.current:
        return

    # The duration is only known when the caller passed a track; with the
    # default MISSING (or None) there is no object to read it from.
    duration = None if track is MISSING or track is None else track.duration

    # Validate everything before touching player state, so a rejected call
    # leaves the stored position and pause flag unchanged.
    if start_time is not MISSING:
        if (
                not isinstance(start_time, int)
                or start_time < 0
                or (duration is not None and start_time >= duration)
        ):
            raise ValueError(
                'start_time must be an int with a value equal to, or greater than 0, '
                'and less than the track duration'
            )

    if end_time is not MISSING:
        if (
                not isinstance(end_time, int)
                or end_time < 1
                or (duration is not None and end_time > duration)
        ):
            raise ValueError(
                'end_time must be an int with a value equal to, or greater than 1, '
                'and less than, or equal to the track duration'
            )

    self.last_position = 0 if start_time is MISSING else start_time
    self.position_timestamp = 0

    if pause is not MISSING and isinstance(pause, bool):
        self._paused = pause

    response = await self._play_back(
        track, start_time, end_time, no_replace, volume, pause, **kwargs
    )

    if response is not None:
        # The response is the source of truth for the pause flag.
        self._paused = response['paused']
async def stop(self) -> None:
    """|coro|

    Stops playback by sending an empty track to the node and clearing the
    queue's current track.

    Raises
    ------
    Forbidden
        If the request is forbidden.
    RequestError
        Throws an error when the request fails.
    IOError
        If the connection has been closed

    Returns
    -------
    None
    """
    guild_id = self.guild.id
    await self._node.update_player(guild_id=guild_id, encoded_track=None)

    # Only reached when the request above did not raise.
    self.queue._current = None
async def skip(self) -> Optional[Track]:
    """|coro|

    Starts the next playback and hands back the track that was current
    before the call.

    Raises
    ------
    Forbidden
        If the request is forbidden.
    RequestError
        Throws an error when the request fails.
    IOError
        If the connection has been closed

    Returns
    -------
    Optional[:class:`harmonize.objects.Track`]
        The queue's current track as it was before skipping.
    """
    previous_track = self.queue.current
    await self.play()
    return previous_track
async def set_pause(self, paused: bool) -> None:
    """|coro|

    Sends the pause state to the node and stores it on the player.

    Parameters
    ----------
    paused : bool
        Whether the player should be paused or not.

    Raises
    ------
    Forbidden
        If the request is forbidden.
    RequestError
        Throws an error when the request fails.
    IOError
        If the connection has been closed

    Returns
    -------
    None
    """
    guild_id = self.guild.id
    await self._node.update_player(guild_id=guild_id, paused=paused)

    # The local flag is only updated when the request did not raise.
    self._paused = paused
async def change_volume(self, volume: int) -> None:
    """|coro|

    Sends a new volume to the node and stores it on the player.

    Parameters
    ----------
    volume : int
        The requested volume. It is clamped between 0 and 1000.

    Raises
    ------
    Forbidden
        If the request is forbidden.
    RequestError
        Throws an error when the request fails.
    IOError
        If the connection has been closed

    Returns
    -------
    None
    """
    capped = min(volume, 1000)
    clamped = max(capped, 0)

    await self._node.update_player(guild_id=self.guild.id, volume=clamped)

    # The stored volume is only updated when the request did not raise.
    self._volume = clamped
async def seek(self, position: int) -> None:
    """|coro|

    Asks the node to move playback to the given position.

    Parameters
    ----------
    position : int
        The position to seek to. It is forwarded to the node unchanged.

    Raises
    ------
    Forbidden
        If the request is forbidden.
    RequestError
        Throws an error when the request fails.
    IOError
        If the connection has been closed

    Returns
    -------
    None
    """
    guild_id = self.guild.id
    await self._node.update_player(guild_id=guild_id, position=position)
async def remove_filters(self) -> None:
    """|coro|

    Clears the stored filters and sends the now empty filter list to the node.

    Raises
    ------
    Forbidden
        If the request is forbidden.
    RequestError
        Throws an error when the request fails.
    IOError
        If the connection has been closed

    Returns
    -------
    None
    """
    # The local mapping is emptied before the request is made.
    self._filters.clear()
    remaining = list(self._filters.values())

    await self._node.update_player(
        guild_id=self.guild.id,
        filters=remaining
    )
# The implementation only accepts keyword arguments, so the overloads are
# keyword-only as well; a positional call raises TypeError at runtime.
@overload
async def update_filters(self, *, filter: Filter) -> None:
    ...


@overload
async def update_filters(self, *, filters: list[Filter]) -> None:
    ...


async def update_filters(self, **kwargs: any) -> None:
    """|coro|

    Updates the player's audio filters.

    A filter whose type is already stored is updated with the new filter's
    values; a filter of a type that is not stored yet is added.

    Parameters
    ----------
    **kwargs : any
        Keyword arguments containing the filters to update.

        - filters (list[Filter]): The list of filters to update.
        - filter (Filter): The filter to update.

    Raises
    ------
    ValueError
        If no filter was given.
    TypeError
        If one of the given objects is not a Filter.
    Forbidden
        If the request is forbidden.
    RequestError
        Throws an error when the request fails.
    IOError
        If the connection has been closed

    Returns
    -------
    None
    """
    filters = self._get_filters_from_kwargs(kwargs)

    # Reject bad input before any stored filter is modified.
    for candidate in filters:
        self._is_filter(candidate)

    merged: dict[str, Filter] = self._filters.copy()

    for candidate in filters:
        key = type(candidate).__name__.lower()
        if key in merged:
            merged[key].update(**candidate.values)
        else:
            # Previously this case raised KeyError, which made it impossible
            # to "update" a filter type that had not been set before.
            merged[key] = candidate

    self._filters = merged

    await self._node.update_player(
        guild_id=self.guild.id,
        filters=list(self._filters.values())
    )
# The implementation only accepts keyword arguments, so the overloads are
# keyword-only as well; a positional call raises TypeError at runtime.
@overload
async def set_filters(self, *, filter: Filter) -> None:
    ...


@overload
async def set_filters(self, *, filters: list[Filter]) -> None:
    ...


async def set_filters(self, **kwargs: any) -> None:
    """|coro|

    Sets the player's audio filters, replacing every filter stored before.

    Parameters
    ----------
    **kwargs : any
        Keyword arguments containing the filters to set.

        - filters (list[Filter]): The list of filters to set.
        - filter (Filter): The filter to set.

    Raises
    ------
    ValueError
        If no filter was given.
    TypeError
        If one of the given objects is not a Filter.
    Forbidden
        If the request is forbidden.
    RequestError
        Throws an error when the request fails.
    IOError
        If the connection has been closed

    Returns
    -------
    None
    """
    filters = self._get_filters_from_kwargs(kwargs)

    for candidate in filters:
        self._is_filter(candidate)

    # Filters are keyed by their lower-cased class name, so a later filter
    # of the same type replaces an earlier one.
    self._filters = {
        type(candidate).__name__.lower(): candidate
        for candidate in filters
    }

    await self._node.update_player(
        guild_id=self.guild.id,
        filters=list(self._filters.values())
    )
@classmethod
def _get_filters_from_kwargs(cls, kwargs: dict[str, any], /) -> list[Filter]:
    """
    Extracts the filters from a keyword-argument mapping.

    ``filters`` takes precedence over ``filter``. The matching key is removed
    from ``kwargs``.

    Raises
    ------
    ValueError
        If neither key is present or the extracted value is empty.
    """
    filters = []

    if 'filters' in kwargs:
        filters = kwargs.pop('filters')
    elif 'filter' in kwargs:
        filters = [kwargs.pop('filter')]

    if not filters:
        raise ValueError('At least one filter must be specified')

    return filters


@classmethod
def _is_filter(cls, _filter: any, /) -> None:
    """
    Ensures the given object is an instance of :class:`Filter`.

    Raises
    ------
    TypeError
        If the object is not a Filter instance.
    """
    if isinstance(_filter, Filter):
        return

    # Classes carry their own name; for any other object the name of its
    # type is reported. Reading ``__name__`` from an arbitrary instance would
    # raise AttributeError and hide the intended TypeError.
    offender = _filter.__name__ if isinstance(_filter, type) else type(_filter).__name__
    raise TypeError(f'Expected subclass of type Filter, not {offender}')
@classmethod
async def connect_to_channel(cls, channel: VocalGuildChannel, /, **kwargs: any) -> Player:
    """|coro|

    Connects a new player of this class to the given channel and attaches
    the keyword arguments to it as user data.

    Parameters
    ----------
    channel : VocalGuildChannel
        The channel to connect the player to.
    **kwargs : any
        Entries stored in the player's user data.

    Returns
    -------
    :class:`harmonize.Player`
        The player returned by ``channel.connect``.

    Raises
    ------
    InvalidChannelStateException
        If the provided channel is not a valid voice channel.
    """
    connected_player = await channel.connect(cls=cls)  # type: ignore
    connected_player.add_user_data(**kwargs)
    return connected_player
async def connect(
        self,
        *,
        timeout: float = 10.0,
        reconnect: bool,
        self_deaf: bool = True,
        self_mute: bool = False
) -> None:
    """|coro|

    Connects the player to a voice channel.

    Parameters
    ----------
    timeout : float
        The timeout in seconds before the connection attempt is given up.
        Defaults to 10.0.
    reconnect : bool
        Whether the player should reconnect if the connection is lost.
        Accepted for signature compatibility; it is not read here.
    self_deaf : bool
        Whether the player should be deafened in the voice channel.
        Defaults to True.
    self_mute : bool
        Whether the player should be muted in the voice channel.
        Defaults to False.

    Raises
    ------
    InvalidChannelStateException
        If the player has no valid channel or guild to connect to.

    Returns
    -------
    None
    """
    if self.channel is MISSING:
        raise InvalidChannelStateException(
            "Player tried to connect without a valid channel: "
            'Please use "disnake.VoiceChannel.connect(cls=...)" and pass this Player to cls.'
        )

    if not self.guild:
        self.guild = self.channel.guild

    # An explicit check instead of ``assert``, which is stripped under -O.
    if self.guild is None:
        raise InvalidChannelStateException("Player tried to connect without a valid guild.")

    await self.guild.change_voice_state(channel=self.channel, self_mute=self_mute, self_deaf=self_deaf)

    try:
        async with _timeout(timeout):
            await self._connection_event.wait()
    except asyncio.TimeoutError:
        # A timeout is reported and the player is left unregistered.
        # Cancellation is deliberately not caught here so that cancelling
        # the surrounding task keeps working.
        logger.warning("Can't connect to channel. Destroying...")
        return

    self._connected = True
    self._node.players[self.guild.id] = self
async def _destroy(self) -> None:
    """|coro|

    Tears the player down: resets the connection flags, leaves the voice
    channel, unregisters the player from the node and destroys it there.
    """
    self._connection_event.clear()
    self._connected = False

    await self.guild.change_voice_state(channel=None)

    try:
        self.cleanup()
    except (AttributeError, KeyError):
        # Best effort, kept from the original implementation: a failing
        # cleanup must not prevent the rest of the teardown.
        pass

    try:
        del self._node.players[self.guild.id]
    except KeyError:
        # The player was never registered (for example the connection never
        # completed) or was already removed by an earlier teardown. The
        # node-side player is still destroyed below.
        pass

    await self._node.destroy_player(self.guild.id)
async def disconnect(self, **kwargs: any) -> None:
    """|coro|

    Destroys the player and then dispatches the ``player_disconnect`` event
    with the player as its argument.

    Parameters
    ----------
    **kwargs : any
        Accepted for signature compatibility; the values are not used.

    Returns
    -------
    None
    """
    await self._destroy()

    event_name = "player_disconnect"
    self.dispatch(event_name, self)
async def move_to(
        self,
        channel: VocalGuildChannel | None,
        *,
        timeout: float = 10.0,
        self_deaf: Optional[bool] = True,
        self_mute: Optional[bool] = None,
) -> None:
    """|coro|

    Moves the player to a specified voice channel.

    Parameters
    ----------
    channel : VocalGuildChannel | None
        The voice channel to move the player to. A falsy value is rejected
        with :class:`InvalidChannelStateException`.
    timeout : float
        The maximum time in seconds to wait for the player to connect to the
        channel. Defaults to 10.0.
    self_deaf : Optional[bool]
        Whether the player should be deafened in the voice channel. If `None`,
        the value is taken from the current voice state when there is one.
        Defaults to `True`.
    self_mute : Optional[bool]
        Whether the player should be muted in the voice channel. If `None`,
        the value is taken from the current voice state when there is one.
        Defaults to `None`.

    Raises
    ------
    InvalidChannelStateException
        If the player tries to move without a valid guild or channel.
    asyncio.CancelledError
        If the surrounding task is cancelled while waiting; the player is
        destroyed before the cancellation is passed on.

    Returns
    -------
    None

    Note
    ----
    This method clears the connection event and waits for it to be set again.
    If the attempt times out or is cancelled, the player is destroyed.
    """
    if not self.guild:
        raise InvalidChannelStateException("Player tried to move without a valid guild.")

    if not channel:
        raise InvalidChannelStateException("Player tried to move without a valid channel.")

    self._connection_event.clear()
    voice: VoiceState | None = self.guild.me.voice

    # ``None`` means "keep what the current voice state says", if any.
    if self_deaf is None and voice:
        self_deaf = voice.self_deaf

    if self_mute is None and voice:
        self_mute = voice.self_mute

    self_deaf = bool(self_deaf)
    self_mute = bool(self_mute)

    await self.guild.change_voice_state(channel=channel, self_mute=self_mute, self_deaf=self_deaf)

    try:
        async with _timeout(timeout):
            await self._connection_event.wait()
    except (asyncio.TimeoutError, asyncio.CancelledError) as error:
        logger.warning("Can't connect to channel. Destroying...")
        await self._destroy()
        if isinstance(error, asyncio.CancelledError):
            # Cancellation must reach the caller once the cleanup is done.
            raise
    else:
        # Keep the stored channel in sync with where the player now is.
        self.channel = channel