-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathadafruit_ble_apple_media.py
290 lines (228 loc) · 10 KB
/
adafruit_ble_apple_media.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# SPDX-FileCopyrightText: 2020 Scott Shawcroft for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_ble_apple_media`
================================================================================
Support for the Apple Media Service which provides media playback info and control.
Documented by Apple here:
https://developer.apple.com/library/archive/documentation/CoreBluetooth/Reference/AppleMediaService_Reference/Introduction/Introduction.html#//apple_ref/doc/uid/TP40014716-CH2-SW1
"""
try:
from typing import Union, Type
AppleMediaServiceType = Union["AppleMediaService", Type["AppleMediaService"]]
except ImportError:
pass
import struct
import time
import _bleio
from adafruit_ble.attributes import Attribute
from adafruit_ble.characteristics import Characteristic, ComplexCharacteristic
from adafruit_ble.uuid import VendorUUID
from adafruit_ble.services import Service
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE_Apple_Media.git"
# Disable protected access checks since our private classes are tightly coupled.
# pylint: disable=protected-access
class _RemoteCommand(ComplexCharacteristic):
"""Endpoint for sending commands to a media player. The value read will list all available
commands."""
uuid = VendorUUID("9B3C81D8-57B1-4A8A-B8DF-0E56F7CA51C2")
def __init__(self) -> None:
super().__init__(
properties=Characteristic.WRITE_NO_RESPONSE | Characteristic.NOTIFY,
read_perm=Attribute.OPEN,
write_perm=Attribute.OPEN,
max_length=13,
fixed_length=False,
)
def bind(self, service: Service) -> _bleio.PacketBuffer:
"""Binds the characteristic to the given Service."""
bound_characteristic = super().bind(service)
return _bleio.PacketBuffer(bound_characteristic, buffer_size=1)
class _EntityUpdate(ComplexCharacteristic):
"""UTF-8 Encoded string characteristic."""
uuid = VendorUUID("2F7CABCE-808D-411F-9A0C-BB92BA96C102")
def __init__(self) -> None:
super().__init__(
properties=Characteristic.WRITE | Characteristic.NOTIFY,
read_perm=Attribute.OPEN,
write_perm=Attribute.OPEN,
max_length=128,
fixed_length=False,
)
def bind(self, service: Service) -> _bleio.PacketBuffer:
"""Binds the characteristic to the given Service."""
bound_characteristic = super().bind(service)
return _bleio.PacketBuffer(bound_characteristic, buffer_size=8)
class _EntityAttribute(Characteristic): # pylint: disable=too-few-public-methods
"""UTF-8 Encoded string characteristic."""
uuid = VendorUUID("C6B2F38C-23AB-46D8-A6AB-A3A870BBD5D7")
def __init__(self) -> None:
super().__init__(
properties=Characteristic.WRITE | Characteristic.READ,
read_perm=Attribute.OPEN,
write_perm=Attribute.OPEN,
fixed_length=False,
)
class _MediaAttribute:
def __init__(self, entity_id: int, attribute_id: int) -> None:
self.key = (entity_id, attribute_id)
@staticmethod
def _update(obj: AppleMediaServiceType) -> None:
if not obj._buffer:
obj._buffer = bytearray(128)
length_read = obj._entity_update.readinto(obj._buffer)
if length_read > 0:
if length_read < 3:
raise RuntimeError("packet too short")
# Even though flags is currently unused, if it were removed, it would cause there to be
# too many values to unpack which would raise a ValueError
(
entity_id,
attribute_id,
flags, # pylint: disable=unused-variable
) = struct.unpack_from("<BBB", obj._buffer)
value = str(obj._buffer[3:length_read], "utf-8")
obj._attribute_cache[(entity_id, attribute_id)] = value
def __get__(self, obj: AppleMediaServiceType, cls) -> str:
self._update(obj)
if self.key not in obj._attribute_cache:
siblings = [self.key[1]]
for k in obj._attribute_cache:
if k[0] == self.key[0] and k[1] not in siblings:
siblings.append(k[1])
buf = struct.pack("<B" + "B" * len(siblings), self.key[0], *siblings)
obj._entity_update.write(buf)
obj._attribute_cache[self.key] = None
time.sleep(0.05)
self._update(obj)
return obj._attribute_cache[self.key]
class _MediaAttributePlaybackState:
def __init__(self, playback_value: int):
self._playback_value = playback_value
def __get__(self, obj: AppleMediaServiceType, cls) -> bool:
info = obj._playback_info
if info:
return int(info.split(",")[0]) == self._playback_value
return False
class _MediaAttributePlaybackInfo:
def __init__(self, position: int) -> None:
self._position = position
def __get__(self, obj: AppleMediaServiceType, cls) -> float:
info = obj._playback_info
if info:
return float(info.split(",")[self._position])
return 0.0
class UnsupportedCommand(Exception):
"""Raised when the command isn't available with current media player app."""
class AppleMediaService(Service):
"""View and control currently playing media.
Exact functionality varies with different media apps. For example, Spotify will include the
album name and artist name in `title` when controlling playback on a remote device.
`artist` includes a description of the remote playback.
"""
uuid = VendorUUID("89D3502B-0F36-433A-8EF4-C502AD55F8DC")
_remote_command = _RemoteCommand()
_entity_update = _EntityUpdate()
_entity_attribute = _EntityAttribute()
player_name = _MediaAttribute(0, 0)
"""Name of the media player app"""
_playback_info = _MediaAttribute(0, 1)
paused = _MediaAttributePlaybackState(0)
"""True when playback is paused. False otherwise."""
playing = _MediaAttributePlaybackState(1)
"""True when playback is playing. False otherwise."""
rewinding = _MediaAttributePlaybackState(2)
"""True when playback is rewinding. False otherwise."""
fast_forwarding = _MediaAttributePlaybackState(3)
"""True when playback is fast-forwarding. False otherwise."""
playback_rate = _MediaAttributePlaybackInfo(1)
"""Playback rate as a decimal of normal speed."""
elapsed_time = _MediaAttributePlaybackInfo(2)
"""Time elapsed in the current track. Not updated as the track plays. Use (the amount of time
since read elapsed time) * `playback_rate` to estimate the current `elapsed_time`."""
volume = _MediaAttribute(0, 2)
"""Current volume"""
queue_index = _MediaAttribute(1, 0)
"""Current track's index in the queue."""
queue_length = _MediaAttribute(1, 1)
"""Count of tracks in the queue."""
shuffle_mode = _MediaAttribute(1, 2)
"""Current shuffle mode as an integer. Off (0), One (1), and All (2)"""
repeat_mode = _MediaAttribute(1, 3)
"""Current repeat mode as an integer. Off (0), One (1), and All (2)"""
artist = _MediaAttribute(2, 0)
"""Current track's artist name."""
album = _MediaAttribute(2, 1)
"""Current track's album name."""
title = _MediaAttribute(2, 2)
"""Current track's title."""
duration = _MediaAttribute(2, 3)
"""Current track's duration as a string."""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._buffer = None
self._cmd = None
self._register_buffer = None
self._attribute_cache = {}
self._supported_commands = []
self._command_buffer = None
def _send_command(self, command_id: bytearray) -> None:
if not self._command_buffer:
self._command_buffer = bytearray(13)
i = self._remote_command.readinto( # pylint: disable=no-member
self._command_buffer
)
if i > 0:
self._supported_commands = list(self._command_buffer[:i])
if command_id not in self._supported_commands:
if not self._supported_commands:
return
raise UnsupportedCommand()
if not self._cmd:
self._cmd = bytearray(1)
self._cmd[0] = command_id
self._remote_command.write(self._cmd) # pylint: disable=no-member
def play(self) -> None:
"""Plays the current track. Does nothing if already playing."""
self._send_command(0)
def pause(self) -> None:
"""Pauses the current track. Does nothing if already paused."""
self._send_command(1)
def toggle_play_pause(self) -> None:
"""Plays the current track if it is paused. Otherwise it pauses the track."""
self._send_command(2)
def next_track(self) -> None:
"""Stops playing the current track and plays the next one."""
self._send_command(3)
def previous_track(self) -> None:
"""Stops playing the current track and plays the previous track."""
self._send_command(4)
def volume_up(self) -> None:
"""Increases the playback volume."""
self._send_command(5)
def volume_down(self) -> None:
"""Decreases the playback volume."""
self._send_command(6)
def advance_repeat_mode(self) -> None:
"""Advances the repeat mode. Modes are: Off, One and All"""
self._send_command(7)
def advance_shuffle_mode(self) -> None:
"""Advances the shuffle mode. Modes are: Off, One and All"""
self._send_command(8)
def skip_forward(self) -> None:
"""Skips forwards in the current track"""
self._send_command(9)
def skip_backward(self) -> None:
"""Skips backwards in the current track"""
self._send_command(10)
def like_track(self) -> None:
"""Likes the current track"""
self._send_command(11)
def dislike_track(self) -> None:
"""Dislikes the current track"""
self._send_command(12)
def bookmark_track(self) -> None:
"""Bookmarks the current track"""
self._send_command(13)