From 55d5251578bd9e6c9d85b63392cee359c3b9d375 Mon Sep 17 00:00:00 2001 From: RealStickman Date: Thu, 10 Dec 2020 13:40:02 +0100 Subject: [PATCH] Use mpris to display audio playing. --- arch-config/.config/polybar/i3config.ini | 2 +- .../.config/polybar/modules/modules.ini | 12 + .../polybar/scripts/player-mpris-tail.py | 529 ++++++++++++++++++ 3 files changed, 542 insertions(+), 1 deletion(-) create mode 100755 arch-config/.config/polybar/scripts/player-mpris-tail.py diff --git a/arch-config/.config/polybar/i3config.ini b/arch-config/.config/polybar/i3config.ini index b4bc81ee..4dbbe098 100755 --- a/arch-config/.config/polybar/i3config.ini +++ b/arch-config/.config/polybar/i3config.ini @@ -84,7 +84,7 @@ font-5 = Iosevka:style=Regular:size=16;3 modules-left = full sysmenu full i3 full left-top light-right-bot light-left-top right-bot full xwindow full left-top modules-center = right-bot full date full left-bot -modules-right = right-top full player-cmus full left-bot light-right-top light-left-bot right-top full battery full left-bot light-right-top light-left-bot right-top +modules-right = right-top full player-mpris-tail full left-bot light-right-top light-left-bot right-top full battery full left-bot light-right-top light-left-bot right-top tray-detached = false tray-offset-x = 0 diff --git a/arch-config/.config/polybar/modules/modules.ini b/arch-config/.config/polybar/modules/modules.ini index ff51a8c3..021e8c05 100755 --- a/arch-config/.config/polybar/modules/modules.ini +++ b/arch-config/.config/polybar/modules/modules.ini @@ -90,6 +90,18 @@ content = " " ################################################################################ +[module/player-mpris-tail] +type = custom/script +exec = ~/.config/polybar/scripts/player-mpris-tail.py -f '{icon} {artist} - {title}' +tail = true +click-left = ~/polybar-scripts/player-mpris-tail.py play-pause & +click-right = ~/polybar-scripts/player-mpris-tail.py next & +click-middle = ~/polybar-scripts/player-mpris-tail.py previous & + + +################################################################################ + + [module/backlight-acpi] inherit = module/xbacklight type = internal/backlight diff --git a/arch-config/.config/polybar/scripts/player-mpris-tail.py b/arch-config/.config/polybar/scripts/player-mpris-tail.py new file mode 100755 index 00000000..1b8d4515 --- /dev/null +++ b/arch-config/.config/polybar/scripts/player-mpris-tail.py @@ -0,0 +1,529 @@ +#!/usr/bin/env python3 + +import sys +import dbus +import os +from operator import itemgetter +import argparse +import re +from urllib.parse import unquote +import time +from dbus.mainloop.glib import DBusGMainLoop +from gi.repository import GLib +DBusGMainLoop(set_as_default=True) + + +FORMAT_STRING = '{icon} {artist} - {title}' +FORMAT_REGEX = re.compile(r'(\{:(?P.*?)(:(?P[wt])(?P\d+))?:(?P.*?):\})', re.I) +FORMAT_TAG_REGEX = re.compile(r'(?P[wt])(?P\d+)') +SAFE_TAG_REGEX = re.compile(r'[{}]') + +class PlayerManager: + def __init__(self, blacklist = [], connect = True): + self.blacklist = blacklist + self._connect = connect + self._session_bus = dbus.SessionBus() + self.players = {} + + self.print_queue = [] + self.connected = False + self.player_states = {} + + self.refreshPlayerList() + + if self._connect: + self.connect() + loop = GLib.MainLoop() + try: + loop.run() + except KeyboardInterrupt: + print("interrupt received, stopping…") + + def connect(self): + self._session_bus.add_signal_receiver(self.onOwnerChangedName, 'NameOwnerChanged') + self._session_bus.add_signal_receiver(self.onChangedProperties, 'PropertiesChanged', + path = '/org/mpris/MediaPlayer2', + sender_keyword='sender') + + def onChangedProperties(self, interface, properties, signature, sender = None): + if sender in self.players: + player = self.players[sender] + # If we know this player, but haven't been able to set up a signal handler + if 'properties_changed' not in player._signals: + # Then trigger the signal handler manually + player.onPropertiesChanged(interface, properties, signature) + else: + # If we don't know this player, get its name and add it + bus_name = self.getBusNameFromOwner(sender) + if bus_name is None: + return + self.addPlayer(bus_name, sender) + player = self.players[sender] + player.onPropertiesChanged(interface, properties, signature) + + def onOwnerChangedName(self, bus_name, old_owner, new_owner): + if self.busNameIsAPlayer(bus_name): + if new_owner and not old_owner: + self.addPlayer(bus_name, new_owner) + elif old_owner and not new_owner: + self.removePlayer(old_owner) + else: + self.changePlayerOwner(bus_name, old_owner, new_owner) + + def getBusNameFromOwner(self, owner): + player_bus_names = [ bus_name for bus_name in self._session_bus.list_names() if self.busNameIsAPlayer(bus_name) ] + for player_bus_name in player_bus_names: + player_bus_owner = self._session_bus.get_name_owner(player_bus_name) + if owner == player_bus_owner: + return player_bus_name + + def busNameIsAPlayer(self, bus_name): + return bus_name.startswith('org.mpris.MediaPlayer2') and bus_name.split('.')[3] not in self.blacklist + + def refreshPlayerList(self): + player_bus_names = [ bus_name for bus_name in self._session_bus.list_names() if self.busNameIsAPlayer(bus_name) ] + for player_bus_name in player_bus_names: + self.addPlayer(player_bus_name) + if self.connected != True: + self.connected = True + self.printQueue() + + def addPlayer(self, bus_name, owner = None): + player = Player(self._session_bus, bus_name, owner = owner, connect = self._connect, _print = self.print) + self.players[player.owner] = player + + def removePlayer(self, owner): + if owner in self.players: + self.players[owner].disconnect() + del self.players[owner] + # If there are no more players, clear the output + if len(self.players) == 0: + _printFlush(ICON_NONE) + # Else, print the output of the next active player + else: + players = self.getSortedPlayerOwnerList() + if len(players) > 0: + self.players[players[0]].printStatus() + + def changePlayerOwner(self, bus_name, old_owner, new_owner): + player = Player(self._session_bus, bus_name, owner = new_owner, connect = self._connect, _print = self.print) + self.players[new_owner] = player + del self.players[old_owner] + + # Get a list of player owners sorted by current status and age + def getSortedPlayerOwnerList(self): + players = [ + { + 'number': int(owner.split('.')[-1]), + 'status': 2 if player.status == 'playing' else 1 if player.status == 'paused' else 0, + 'owner': owner + } + for owner, player in self.players.items() + ] + return [ info['owner'] for info in reversed(sorted(players, key=itemgetter('status', 'number'))) ] + + # Get latest player that's currently playing + def getCurrentPlayer(self): + playing_players = [ + player_owner for player_owner in self.getSortedPlayerOwnerList() + if + self.players[player_owner].status == 'playing' or + self.players[player_owner].status == 'paused' + ] + return self.players[playing_players[0]] if playing_players else None + + def print(self, status, player): + self.player_states[player.bus_name] = status + + if self.connected: + current_player = self.getCurrentPlayer() + if current_player != None: + _printFlush(self.player_states[current_player.bus_name]) + else: + _printFlush(ICON_STOPPED) + else: + self.print_queue.append([status, player]) + + def printQueue(self): + for args in self.print_queue: + self.print(args[0], args[1]) + self.print_queue.clear() + + +class Player: + def __init__(self, session_bus, bus_name, owner = None, connect = True, _print = None): + self._session_bus = session_bus + self.bus_name = bus_name + self._disconnecting = False + self.__print = _print + + self.metadata = { + 'artist' : '', + 'album' : '', + 'title' : '', + 'track' : 0 + } + + self._rate = 1. + self._positionAtLastUpdate = 0. + self._timeAtLastUpdate = time.time() + self._positionTimerRunning = False + + self._metadata = None + self.status = 'stopped' + self.icon = ICON_NONE + self.icon_reversed = ICON_PLAYING + if owner is not None: + self.owner = owner + else: + self.owner = self._session_bus.get_name_owner(bus_name) + self._obj = self._session_bus.get_object(self.bus_name, '/org/mpris/MediaPlayer2') + self._properties_interface = dbus.Interface(self._obj, dbus_interface='org.freedesktop.DBus.Properties') + self._introspect_interface = dbus.Interface(self._obj, dbus_interface='org.freedesktop.DBus.Introspectable') + self._media_interface = dbus.Interface(self._obj, dbus_interface='org.mpris.MediaPlayer2') + self._player_interface = dbus.Interface(self._obj, dbus_interface='org.mpris.MediaPlayer2.Player') + self._introspect = self._introspect_interface.get_dbus_method('Introspect', dbus_interface=None) + self._getProperty = self._properties_interface.get_dbus_method('Get', dbus_interface=None) + self._playerPlay = self._player_interface.get_dbus_method('Play', dbus_interface=None) + self._playerPause = self._player_interface.get_dbus_method('Pause', dbus_interface=None) + self._playerPlayPause = self._player_interface.get_dbus_method('PlayPause', dbus_interface=None) + self._playerStop = self._player_interface.get_dbus_method('Stop', dbus_interface=None) + self._playerPrevious = self._player_interface.get_dbus_method('Previous', dbus_interface=None) + self._playerNext = self._player_interface.get_dbus_method('Next', dbus_interface=None) + self._playerRaise = self._media_interface.get_dbus_method('Raise', dbus_interface=None) + self._signals = {} + + self.refreshPosition() + self.refreshStatus() + self.refreshMetadata() + + if connect: + self.printStatus() + self.connect() + + def play(self): + self._playerPlay() + def pause(self): + self._playerPause() + def playpause(self): + self._playerPlayPause() + def stop(self): + self._playerStop() + def previous(self): + self._playerPrevious() + def next(self): + self._playerNext() + def raisePlayer(self): + self._playerRaise() + + def connect(self): + if self._disconnecting is not True: + introspect_xml = self._introspect(self.bus_name, '/') + if 'TrackMetadataChanged' in introspect_xml: + self._signals['track_metadata_changed'] = self._session_bus.add_signal_receiver(self.onMetadataChanged, 'TrackMetadataChanged', self.bus_name) + self._signals['seeked'] = self._player_interface.connect_to_signal('Seeked', self.onSeeked) + self._signals['properties_changed'] = self._properties_interface.connect_to_signal('PropertiesChanged', self.onPropertiesChanged) + + def disconnect(self): + self._disconnecting = True + for signal_name, signal_handler in list(self._signals.items()): + signal_handler.remove() + del self._signals[signal_name] + + def refreshStatus(self): + # Some clients (VLC) will momentarily create a new player before removing it again + # so we can't be sure the interface still exists + try: + self.status = str(self._getProperty('org.mpris.MediaPlayer2.Player', 'PlaybackStatus')).lower() + self.updateIcon() + self.checkPositionTimer() + except dbus.exceptions.DBusException: + self.disconnect() + + def refreshMetadata(self): + # Some clients (VLC) will momentarily create a new player before removing it again + # so we can't be sure the interface still exists + try: + self._metadata = self._getProperty('org.mpris.MediaPlayer2.Player', 'Metadata') + self._parseMetadata() + except dbus.exceptions.DBusException: + self.disconnect() + + def updateIcon(self): + self.icon = ( + ICON_PLAYING if self.status == 'playing' else + ICON_PAUSED if self.status == 'paused' else + ICON_STOPPED if self.status == 'stopped' else + ICON_NONE + ) + self.icon_reversed = ( + ICON_PAUSED if self.status == 'playing' else + ICON_PLAYING + ) + + def _print(self, status): + self.__print(status, self) + + def _parseMetadata(self): + if self._metadata != None: + # Obtain properties from _metadata + _artist = _getProperty(self._metadata, 'xesam:artist', ['']) + _album = _getProperty(self._metadata, 'xesam:album', '') + _title = _getProperty(self._metadata, 'xesam:title', '') + _track = _getProperty(self._metadata, 'xesam:trackNumber', '') + _genre = _getProperty(self._metadata, 'xesam:genre', ['']) + _disc = _getProperty(self._metadata, 'xesam:discNumber', '') + _length = _getProperty(self._metadata, 'xesam:length', 0) or _getProperty(self._metadata, 'mpris:length', 0) + _length_int = _length if type(_length) is int else int(float(_length)) + _date = _getProperty(self._metadata, 'xesam:contentCreated', '') + _year = _date[0:4] if len(_date) else '' + _url = _getProperty(self._metadata, 'xesam:url', '') + _cover = _getProperty(self._metadata, 'xesam:artUrl', '') or _getProperty(self._metadata, 'mpris:artUrl', '') + _duration = _getDuration(_length_int) + # Update metadata + self.metadata['artist'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _firstIfList(_artist)) + self.metadata['album'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _firstIfList(_album)) + self.metadata['title'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _firstIfList(_title)) + self.metadata['track'] = _track + self.metadata['genre'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _firstIfList(_genre)) + self.metadata['disc'] = _disc + self.metadata['date'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _date) + self.metadata['year'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _year) + self.metadata['url'] = _url + self.metadata['filename'] = os.path.basename(_url) + self.metadata['length'] = _length_int + self.metadata['cover'] = re.sub(SAFE_TAG_REGEX, """\1\1""", _firstIfList(_cover)) + self.metadata['duration'] = _duration + + def onMetadataChanged(self, track_id, metadata): + self.refreshMetadata() + self.printStatus() + + def onPropertiesChanged(self, interface, properties, signature): + updated = False + if dbus.String('Metadata') in properties: + _metadata = properties[dbus.String('Metadata')] + if _metadata != self._metadata: + self._metadata = _metadata + self._parseMetadata() + updated = True + if dbus.String('PlaybackStatus') in properties: + status = str(properties[dbus.String('PlaybackStatus')]).lower() + if status != self.status: + self.status = status + self.checkPositionTimer() + self.updateIcon() + updated = True + if dbus.String('Rate') in properties and dbus.String('PlaybackStatus') not in properties: + self.refreshStatus() + if NEEDS_POSITION and dbus.String('Rate') in properties: + rate = properties[dbus.String('Rate')] + if rate != self._rate: + self._rate = rate + self.refreshPosition() + + if updated: + self.refreshPosition() + self.printStatus() + + def checkPositionTimer(self): + if NEEDS_POSITION and self.status == 'playing' and not self._positionTimerRunning: + self._positionTimerRunning = True + GLib.timeout_add_seconds(1, self._positionTimer) + + def onSeeked(self, position): + self.refreshPosition() + self.printStatus() + + def _positionTimer(self): + self.printStatus() + self._positionTimerRunning = self.status == 'playing' + return self._positionTimerRunning + + def refreshPosition(self): + try: + time_us = self._getProperty('org.mpris.MediaPlayer2.Player', 'Position') + except dbus.exceptions.DBusException: + time_us = 0 + + self._timeAtLastUpdate = time.time() + self._positionAtLastUpdate = time_us / 1000000 + + def _getPosition(self): + if self.status == 'playing': + return self._positionAtLastUpdate + self._rate * (time.time() - self._timeAtLastUpdate) + else: + return self._positionAtLastUpdate + + def _statusReplace(self, match, metadata): + tag = match.group('tag') + format = match.group('format') + formatlen = match.group('formatlen') + text = match.group('text') + tag_found = False + reversed_tag = False + + if tag.startswith('-'): + tag = tag[1:] + reversed_tag = True + + if format is None: + tag_is_format_match = re.match(FORMAT_TAG_REGEX, tag) + if tag_is_format_match: + format = tag_is_format_match.group('format') + formatlen = tag_is_format_match.group('formatlen') + tag_found = True + if format is not None: + text = text.format_map(CleanSafeDict(**metadata)) + if format == 'w': + formatlen = int(formatlen) + text = text[:formatlen] + elif format == 't': + formatlen = int(formatlen) + if len(text) > formatlen: + text = text[:max(formatlen - len(TRUNCATE_STRING), 0)] + TRUNCATE_STRING + if tag_found is False and tag in metadata and len(metadata[tag]): + tag_found = True + + if reversed_tag: + tag_found = not tag_found + + if tag_found: + return text + else: + return '' + + def printStatus(self): + if self.status in [ 'playing', 'paused' ]: + metadata = { **self.metadata, 'icon': self.icon, 'icon-reversed': self.icon_reversed } + if NEEDS_POSITION: + metadata['position'] = time.strftime("%M:%S", time.gmtime(self._getPosition())) + # replace metadata tags in text + text = re.sub(FORMAT_REGEX, lambda match: self._statusReplace(match, metadata), FORMAT_STRING) + # restore polybar tag formatting and replace any remaining metadata tags after that + try: + text = re.sub(r'􏿿p􏿿(.*?)􏿿p􏿿(.*?)􏿿p􏿿(.*?)􏿿p􏿿', r'%{\1}\2%{\3}', text.format_map(CleanSafeDict(**metadata))) + except: + print("Invalid format string") + self._print(text) + else: + self._print(ICON_STOPPED) + + +def _dbusValueToPython(value): + if isinstance(value, dbus.Dictionary): + return {_dbusValueToPython(key): _dbusValueToPython(value) for key, value in value.items()} + elif isinstance(value, dbus.Array): + return [ _dbusValueToPython(item) for item in value ] + elif isinstance(value, dbus.Boolean): + return int(value) == 1 + elif ( + isinstance(value, dbus.Byte) or + isinstance(value, dbus.Int16) or + isinstance(value, dbus.UInt16) or + isinstance(value, dbus.Int32) or + isinstance(value, dbus.UInt32) or + isinstance(value, dbus.Int64) or + isinstance(value, dbus.UInt64) + ): + return int(value) + elif isinstance(value, dbus.Double): + return float(value) + elif ( + isinstance(value, dbus.ObjectPath) or + isinstance(value, dbus.Signature) or + isinstance(value, dbus.String) + ): + return unquote(str(value)) + +def _getProperty(properties, property, default = None): + value = default + if not isinstance(property, dbus.String): + property = dbus.String(property) + if property in properties: + value = properties[property] + return _dbusValueToPython(value) + else: + return value + +def _getDuration(t: int): + seconds = t / 1000000 + return time.strftime("%M:%S", time.gmtime(seconds)) + +def _firstIfList(_value): + return _value[0] if type(_value) is list and len(_value) else _value + +class CleanSafeDict(dict): + def __missing__(self, key): + return '{{{}}}'.format(key) + + +""" +Seems to assure print() actually prints when no terminal is connected +""" + +_last_status = '' +def _printFlush(status, **kwargs): + global _last_status + if status != _last_status: + print(status, **kwargs) + sys.stdout.flush() + _last_status = status + + + +parser = argparse.ArgumentParser() +parser.add_argument('command', help="send the given command to the active player", + choices=[ 'play', 'pause', 'play-pause', 'stop', 'previous', 'next', 'status', 'list', 'current', 'metadata', 'raise' ], + default=None, + nargs='?') +parser.add_argument('-b', '--blacklist', help="ignore a player by it's bus name. Can be be given multiple times (e.g. -b vlc -b audacious)", + action='append', + metavar="BUS_NAME", + default=[]) +parser.add_argument('-f', '--format', default='{icon} {:artist:{artist} - :}{:title:{title}:}{:-title:{filename}:}') +parser.add_argument('--truncate-text', default='…') +parser.add_argument('--icon-playing', default='⏵') +parser.add_argument('--icon-paused', default='⏸') +parser.add_argument('--icon-stopped', default='⏹') +parser.add_argument('--icon-none', default='') +args = parser.parse_args() + +FORMAT_STRING = re.sub(r'%\{(.*?)\}(.*?)%\{(.*?)\}', r'􏿿p􏿿\1􏿿p􏿿\2􏿿p􏿿\3􏿿p􏿿', args.format) +NEEDS_POSITION = "{position}" in FORMAT_STRING + +TRUNCATE_STRING = args.truncate_text +ICON_PLAYING = args.icon_playing +ICON_PAUSED = args.icon_paused +ICON_STOPPED = args.icon_stopped +ICON_NONE = args.icon_none + +if args.command is None: + PlayerManager(blacklist = args.blacklist) +else: + player_manager = PlayerManager(blacklist = args.blacklist, connect = False) + current_player = player_manager.getCurrentPlayer() + if args.command == 'play' and current_player: + current_player.play() + elif args.command == 'pause' and current_player: + current_player.pause() + elif args.command == 'play-pause' and current_player: + current_player.playpause() + elif args.command == 'stop' and current_player: + current_player.stop() + elif args.command == 'previous' and current_player: + current_player.previous() + elif args.command == 'next' and current_player: + current_player.next() + elif args.command == 'status' and current_player: + current_player.printStatus() + elif args.command == 'list': + print("\n".join(sorted([ + "{} : {}".format(player.bus_name.split('.')[3], player.status) + for player in player_manager.players.values() ]))) + elif args.command == 'current' and current_player: + print("{} : {}".format(current_player.bus_name.split('.')[3], current_player.status)) + elif args.command == 'metadata' and current_player: + print(_dbusValueToPython(current_player._metadata)) + elif args.command == 'raise' and current_player: + current_player.raisePlayer()