From 6142925e84f3b192af03b6a1ae6acb6d3d6424a6 Mon Sep 17 00:00:00 2001 From: Hyralex Date: Sun, 5 Jun 2022 17:03:15 +1200 Subject: [PATCH] - Add support for multi zone - Add support for MDX and MDA --- anthemav/protocol.py | 526 ++++++++++++++++++++++++++++++----------- anthemav/tools.py | 9 +- example.py | 7 +- setup.py | 2 +- tests/test_protocol.py | 287 +++++++++++++++------- 5 files changed, 609 insertions(+), 222 deletions(-) diff --git a/anthemav/protocol.py b/anthemav/protocol.py index 07ca848..b6184e1 100755 --- a/anthemav/protocol.py +++ b/anthemav/protocol.py @@ -1,14 +1,18 @@ """Module to maintain AVR state information and network interface.""" import asyncio import logging -from typing import Awaitable, Callable +from typing import Awaitable, Callable, Dict from anthemav.device_error import DeviceError __all__ = ["AVR"] # These properties apply even when the AVR is powered off -ATTR_CORE = {"Z1POW", "IDM"} +ATTR_CORE = ["IDM"] + +# These properties are sent when the device is powered on +# This is used to force refresh the power state of the device is POW command isn't sent +ATTR_POWERED_ON = ["Z1ALM", "Z1AIC", "Z1VIR"] # Audio Listening mode ALM_NUMBER_x20 = { @@ -46,18 +50,22 @@ ALM_RESTRICTED_MODEL = ["MRX 520"] -LOOKUP = {} +LOOKUP: Dict[str, Dict[str, str]] = {} +ZONELOOKUP: Dict[str, Dict[str, str]] = {} + +ZONELOOKUP["POW"] = {"description": "Zone Power", "0": "Off", "1": "On"} +ZONELOOKUP["VOL"] = {"description": "Zone Volume"} +ZONELOOKUP["INP"] = {"description": "Zone current input"} +ZONELOOKUP["MUT"] = {"description": "Zone mute", "0": "Unmuted", "1": "Muted"} +# MRX 540, 740, 1140 +ZONELOOKUP["PVOL"] = {"description": "Zone Volume in percent"} -LOOKUP["Z1POW"] = {"description": "Zone 1 Power", "0": "Off", "1": "On"} -LOOKUP["Z1VOL"] = {"description": "Zone 1 Volume"} LOOKUP["IDR"] = {"description": "Region"} LOOKUP["IDM"] = {"description": "Model"} LOOKUP["IDS"] = {"description": "Software version"} LOOKUP["IDB"] = {"description": "Software build date"} LOOKUP["IDH"] = {"description": "Hardware version"} LOOKUP["ICN"] = {"description": "Active input count"} -LOOKUP["Z1INP"] = {"description": "Zone 1 current input"} -LOOKUP["Z1MUT"] = {"description": "Zone 1 mute", "0": "Unmuted", "1": "Muted"} LOOKUP["Z1VIR"] = { "description": "Video input resolution", "0": "No video", @@ -146,7 +154,6 @@ } # MRX 540, 740, 1140 -LOOKUP["Z1PVOL"] = {"description": "Zone 1 Volume"} LOOKUP["WMAC"] = {"description": "Wi-Fi MAC address"} LOOKUP["EMAC"] = {"description": "Ethernet MAC address"} LOOKUP["IS1ARC"] = {"description": "Zone 1 ARC", "0": "Off", "1": "On"} @@ -158,8 +165,13 @@ "2": "IP and RS232 on", } +# MDX +LOOKUP["MAC"] = {"description": "MAC address"} + COMMANDS_X20 = ["IDN", "ECH", "SIP", "Z1ARC", "FPB"] -COMMANDS_X40 = ["Z1PVOL", "WMAC", "EMAC", "IS1ARC", "GCFPB", "GCTXS"] +COMMANDS_X40 = ["PVOL", "WMAC", "EMAC", "IS1ARC", "GCFPB", "GCTXS"] +COMMANDS_MDX_IGNORE = ["IDR"] +COMMANDS_MDX = ["MAC"] EMPTY_MAC = "00:00:00:00:00:00" UNKNOWN_MODEL = "Unknown Model" @@ -201,19 +213,20 @@ def __init__( self.buffer = "" self._input_names = {} self._input_numbers = {} + self._device_power = False self._poweron_refresh_successful = False self.transport: asyncio.Transport = None self._ignored_commands = [] self._force_refresh = False self._model_series = "" + self._last_command = "" self._deviceinfo_received = asyncio.Event() self._alm_number = {"None": 0} + self.zones: Dict[int, Zone] = {1: Zone(self, 1)} for key in LOOKUP: setattr(self, f"_{key}", "") - self._Z1POW = "0" - async def wait_for_device_initialised(self, timeout: float): """Wait to receive the model and mac address for the device""" try: @@ -240,7 +253,7 @@ async def refresh_core(self): This does not return any data, it just issues the queries. """ - self.log.debug("Sending out mass query for all attributes") + self.log.debug("Sending out core query for all attributes") for key in ATTR_CORE: if self.transport is None: self.log.warning("Lost connection to receiver while refreshing device") @@ -276,13 +289,35 @@ async def refresh_all(self): This does not return any data, it just issues the queries. """ self.log.debug("refresh_all") - for key in {k: v for k, v in LOOKUP.items() if k not in self._ignored_commands}: - if self.transport is None: - self.log.warning("Lost connection to receiver while refreshing device") - break - self.query(key) - # small delay between command - await asyncio.sleep(0.01) + # refresh main attribues + await self.query_commands(LOOKUP) + + async def refresh_power(self): + """Refresh power of all zones.""" + self.log.debug("refresh_power") + for zone in self.zones: + self.query(f"Z{zone}POW") + await asyncio.sleep(0.02) + + async def refresh_zone(self, zone: int): + """Query all zones for all attributes.""" + self.log.debug(f"refresh_zone: {zone}") + await self.query_commands(ZONELOOKUP, zone) + + async def query_commands(self, commands: Dict[str, Dict[str, str]], zone: int = 0): + for key in commands: + if key not in self._ignored_commands: + if self.transport is None: + self.log.warning( + "Lost connection to receiver while refreshing device" + ) + break + if zone > 0: + self.log.debug(f"Add zone to command {key} for zone {zone}") + key = f"Z{zone}{key}" + self.query(key) + # small delay between command + await asyncio.sleep(0.02) # # asyncio network functions @@ -296,7 +331,10 @@ def connection_made(self, transport: asyncio.Transport): # self.transport.set_write_buffer_limits(0) limit_low, limit_high = self.transport.get_write_buffer_limits() self.log.debug("Write buffer limits %d to %d", limit_low, limit_high) - + self._poweron_refresh_successful = False + self._device_power = False + for zone in self.zones.values(): + zone.need_refresh = True asyncio.run_coroutine_threadsafe(self.refresh_core(), self._loop) def data_received(self, data): @@ -304,7 +342,7 @@ def data_received(self, data): self.buffer += data.decode() self.log.debug("Received %d bytes from AVR: %s", len(self.buffer), self.buffer) try: - self._assemble_buffer() + self._loop.create_task(self._assemble_buffer()) except Exception as error: self.log.warning("Unable to parse message. Error: %s", error) @@ -322,7 +360,7 @@ def connection_lost(self, exc): self._connection_lost_callback(), self._loop ) - def _assemble_buffer(self): + async def _assemble_buffer(self): """Split up received data from device into individual commands. Data sent by the device is a sequence of datagrams separated by @@ -336,7 +374,13 @@ def _assemble_buffer(self): for message in self.buffer.split(";"): if message != "": self.log.debug("assembled message %s", message) - self._parse_message(message) + await self._parse_message(message) + elif self._last_command != "": + # For some case the receiver only send ; to confirm receiving previous command. + last_command = self._last_command + self._last_command = "" + self.log.debug("send last command %s", last_command) + await self._parse_message(last_command) self.buffer = "" @@ -356,7 +400,7 @@ def _populate_inputs(self, total): else: self.query(f"ISN{input_number:02d}") - def _parse_message(self, data: str): + async def _parse_message(self, data: str): """Interpret each message datagram from device and do the needful. This function receives datagrams from _assemble_buffer and inerprets @@ -380,7 +424,6 @@ def _parse_message(self, data: str): self.log.debug("Ignoring command for powered-off zone: %s", data[2:]) recognized = True else: - for key, commands in LOOKUP.items(): if data.startswith(key): recognized = True @@ -415,45 +458,45 @@ def _parse_message(self, data: str): setattr(self, "_" + key, value) if key == "IDM" and value != oldvalue: + # receiving model number, we can initialize the device and request all attributes self.set_model_command(value) + self.set_zones(value) + await self.refresh_power() + elif key == "IDM" and self._poweron_refresh_successful is False: + # Could be because of reconnection + await self.refresh_power() - if key == "IDM" or key == "IDN" or key == "EMAC" or key == "WMAC": + if ( + key == "IDM" + or key == "IDN" + or key == "EMAC" + or key == "WMAC" + or key == "MAC" + ): self._set_device_initialised() - if key == "Z1POW" and value == "1" and oldvalue == "0": - self.log.debug("Power on detected, refreshing all attributes") - self._poweron_refresh_successful = False - self._loop.call_later( - 1, - asyncio.run_coroutine_threadsafe, - self.poweron_refresh(), - self._loop, - ) + await self.force_refresh_power(key) - if key == "Z1POW" and value == "0" and oldvalue == "1": - self._poweron_refresh_successful = False - - if ( - self._force_refresh is False - and self._Z1POW == "0" - and all(coreKey not in key for coreKey in ATTR_CORE) - ): - # AVR doesn't send Power State ON - # force refresh power state when receiving any command from a potential powered on AVR - self._force_refresh = True - self.log.debug("Force refresh Power State") - self.query("Z1POW") - self._loop.call_later(2, setattr, self, "_force_refresh", False) + if key == "GCTXS" and value == "0": + # tx status is disabled but required for this library. Set it back on again + self.command("GCTXS1") break + if data.startswith("Z"): + self.log.debug("Zone command received: %s", data) + newdata = (await self.parse_zone_command(data)) or newdata + recognized = True + if data.startswith("ICN"): self.log.debug("ICN update received") self._poweron_refresh_successful = True recognized = True self._populate_inputs(int(value)) - if data.startswith("ISN") and len(data) > 5: # x20 series: ISN01Turntable + if ( + data.startswith("ISN") and len(data) > 5 + ): # x20 and mdx series: ISN01Turntable recognized = True self._poweron_refresh_successful = True @@ -492,6 +535,60 @@ def _parse_message(self, data: str): if not recognized: self.log.debug("Unrecognized response: %s", data) + async def parse_zone_command(self, data: str) -> bool: + newdata = False + zone: int = int(data[1]) + if zone not in self.zones: + self.log.error(f"Zone {zone} isn't registered for this amplifier.") + # remove zone (Z1, Z2 ....) from the data + zone_data = data[2:] + for zoneCommand in ZONELOOKUP: + if zone_data.startswith(zoneCommand): + self.log.debug(f"Parse message {zone_data} for zone {zone}") + value = zone_data[len(zoneCommand) :] + oldvalue = self.zones[zone].values.get(zoneCommand, "") + self.zones[zone].values[zoneCommand] = value + if oldvalue != value: + newdata = True + if zoneCommand == "POW" and (newdata or self.zones[zone].need_refresh): + self.zones[zone].need_refresh = False + if value == "1": + await self.refresh_zone(zone) + if self._device_power is False: + self.log.debug( + "Powered on device detected refresh all attributes" + ) + self._device_power = True + self._poweron_refresh_successful = False + self._loop.call_later( + 1, + asyncio.run_coroutine_threadsafe, + self.poweron_refresh(), + self._loop, + ) + elif value == "0" and oldvalue == "1": + if all(zone.power is False for zone in self.zones.values()): + # all zone are off, switch off device + self.log.debug("Power off device") + self._poweron_refresh_successful = False + self._device_power = False + break + + return newdata + + async def force_refresh_power(self, command): + if ( + self._force_refresh is False + and self._device_power is False + and command in ATTR_POWERED_ON + ): + # AVR doesn't send Power State ON + # force refresh power state when receiving any command from a potential powered on AVR + self._force_refresh = True + self.log.debug("Force refresh Power State") + await self.refresh_power() + self._loop.call_later(2, setattr, self, "_force_refresh", False) + def query(self, item: str): """Issue a raw query to the device for an item. @@ -519,19 +616,40 @@ def set_model_command(self, model: str): """Add the commands to the model.""" if "40" in model or "70" in model or "90" in model: self.log.debug("Set Command to Model x40") - self._ignored_commands = COMMANDS_X20 + self._ignored_commands = COMMANDS_X20 + COMMANDS_MDX self._model_series = "x40" + self.query("GCTXS") self.query("EMAC") self.query("WMAC") self._alm_number = ALM_NUMBER_x40 + elif "MDX" in model or "MDA" in model: + self.log.debug("Set Command to Model MDX") + self._ignored_commands = COMMANDS_X20 + COMMANDS_X40 + COMMANDS_MDX_IGNORE + self._model_series = "mdx" + self.query("MAC") else: self.log.debug("Set Command to Model x20") - self._ignored_commands = COMMANDS_X40 + self._ignored_commands = COMMANDS_X40 + COMMANDS_MDX self._model_series = "x20" self._alm_number = ALM_NUMBER_x20 self.command("ECH1") self.query("IDN") + def set_zones(self, model: str): + """Set zones for the appropriate objects.""" + number_of_zones: int = 0 + if model == "MDX 16" or model == "MDA 16": + number_of_zones = 8 + elif model == "MDX 8" or model == "MDA 8": + number_of_zones = 4 + else: + number_of_zones = 2 + + self.log.debug(f"Initialize {number_of_zones} zones") + for zone in range(1, number_of_zones + 1): + if zone not in self.zones: + self.zones[zone] = Zone(self, zone) + def command(self, command: str): """Issue a raw command to the device. @@ -549,6 +667,8 @@ def command(self, command: str): >>> command('Z1VOL-50') """ + if "?" not in command and "INP" not in command: + self._last_command = command command = command + ";" self.formatted_command(command) @@ -578,50 +698,6 @@ def formatted_command(self, command: str): "No transport found, unable to send command. error: %s", str(error) ) - # - # Volume and Attenuation handlers. The Anthem tracks volume internally as - # an attenuation level ranging from -90dB (silent) to 0dB (bleeding ears) - # - # We expose this in three methods for the convenience of downstream apps - # which will almost certainly be doing things their own way: - # - # - attenuation (-90 to 0) - # - volume (0-100) - # - volume_as_percentage (0-1 floating point) - # - - def attenuation_to_volume(self, value): - """Convert a native attenuation value to a volume value. - - Takes an attenuation in dB from the Anthem (-90 to 0) and converts it - into a normal volume value (0-100). - - :param arg1: attenuation in dB (negative integer from -90 to 0) - :type arg1: int - - returns an integer value representing volume - """ - try: - return round((90.00 + int(value)) / 90 * 100) - except ValueError: - return 0 - - def volume_to_attenuation(self, value): - """Convert a volume value to a native attenuation value. - - Takes a volume value and turns it into an attenuation value suitable - to send to the Anthem AVR. - - :param arg1: volume (integer from 0 to 100) - :type arg1: int - - returns a negative integer value representing attenuation in dB - """ - try: - return round((value / 100) * 90) - 90 - except ValueError: - return -90 - @property def attenuation(self): """Current volume attenuation in dB (read/write). @@ -634,18 +710,11 @@ def attenuation(self): >>> attvalue = attenuation >>> attenuation = -50 """ - try: - return int(self._Z1VOL) - except ValueError: - return -90 - except NameError: - return -90 + self.zones[1].attenuation @attenuation.setter def attenuation(self, value): - if isinstance(value, int) and -90 <= value <= 0: - self.log.debug("Setting attenuation to %s", str(value)) - self.command("Z1VOL" + str(value)) + self.zones[1].attenuation = value @property def volume(self): @@ -659,18 +728,11 @@ def volume(self): >>> volvalue = volume >>> volume = 20 """ - if self._model_series == "x40" and self._Z1PVOL: - return int(self._Z1PVOL) - else: - return self.attenuation_to_volume(self.attenuation) + return self.zones[1].volume @volume.setter def volume(self, value): - if isinstance(value, int) and 0 <= value <= 100: - if self._model_series == "x40": - self.command(f"Z1PVOL{value}") - else: - self.attenuation = self.volume_to_attenuation(value) + self.zones[1].volume = value @property def volume_as_percentage(self): @@ -684,15 +746,11 @@ def volume_as_percentage(self): >>> volper = volume_as_percentage >>> volume_as_percentage = 0.20 """ - volume_per = self.volume / 100 - return volume_per + return self.zones[1].volume_as_percentage @volume_as_percentage.setter def volume_as_percentage(self, value): - if isinstance(value, float) or isinstance(value, int): - if 0 <= value <= 1: - value = round(value * 100) - self.volume = value + self.zones[1].volume_as_percentage = value # # Internal assistant functions for unified handling of boolean @@ -725,12 +783,11 @@ def power(self): Returns and expects a boolean value. """ - return self._get_boolean("Z1POW") + return self.zones[1].power @power.setter def power(self, value): - self._set_boolean("Z1POW", value) - self.query("Z1POW") + self.zones[1].power = value @property def txstatus(self): @@ -783,14 +840,11 @@ def arc(self, value): @property def mute(self): """Mute on or off (read/write).""" - return self._get_boolean("Z1MUT") + return self.zones[1].mute @mute.setter def mute(self, value): - self._set_boolean("Z1MUT", value) - # Query mute because the AVR doesn't always return back the state - # (eg: after power on without changing the volume first) - self.query("Z1MUT") + self.zones[1].mute = value # # Read-only text properties @@ -824,7 +878,7 @@ def hwversion(self): @property def macaddress(self): """Network MCU MAC address (read-only).""" - return self._IDN or self._EMAC or self._WMAC or EMPTY_MAC + return self._IDN or self._EMAC or self._WMAC or self._MAC or EMPTY_MAC @property def audio_input_name(self): @@ -1058,14 +1112,11 @@ def input_name(self, value): @property def input_number(self): """Number of currently active input (read-write).""" - return self._get_integer("Z1INP") + return self.zones[1].input_number @input_number.setter def input_number(self, number): - if isinstance(number, int): - if 1 <= number <= 99: - self.log.debug("Switching input to %s", str(number)) - self.command(f"Z1INP{number}") + self.zones[1].input_number = number # # Miscellany @@ -1082,3 +1133,210 @@ def dump_rawdata(self): def test_string(self): """I really do.""" return "I like cows" + + +class Zone: + """Control of a specific Zone of the amplifier""" + + def __init__(self, avr: AVR, zone: int) -> None: + self._zone = zone + self._avr = avr + self.need_refresh = True + self.values = {} + + def command(self, command: str) -> None: + self._avr.command(f"Z{self._zone}{command}") + + def query(self, command: str) -> None: + self._avr.query(f"Z{self._zone}{command}") + + def _get_integer(self, key, default: int = 0): + if key not in self.values: + return default + try: + return int(self.values[key]) + except ValueError: + return 0 + + def _get_boolean(self, key): + if key not in self.values: + return False + try: + return bool(int(self.values[key])) + except ValueError: + return False + except AttributeError: + return False + + def _set_boolean(self, key, value): + if value is True: + self.command(key + "1") + else: + self.command(key + "0") + + # + # Volume and Attenuation handlers. The Anthem tracks volume internally as + # an attenuation level ranging from -90dB (silent) to 0dB (bleeding ears) + # + # We expose this in three methods for the convenience of downstream apps + # which will almost certainly be doing things their own way: + # + # - attenuation (-90 to 0) + # - volume (0-100) + # - volume_as_percentage (0-1 floating point) + # + + def attenuation_to_volume(self, value): + """Convert a native attenuation value to a volume value. + + Takes an attenuation in dB from the Anthem (-90 to 0) and converts it + into a normal volume value (0-100). + + :param arg1: attenuation in dB (negative integer from -90 to 0) + :type arg1: int + + returns an integer value representing volume + """ + try: + return round((90.00 + int(value)) / 90 * 100) + except ValueError: + return 0 + + def volume_to_attenuation(self, value): + """Convert a volume value to a native attenuation value. + + Takes a volume value and turns it into an attenuation value suitable + to send to the Anthem AVR. + + :param arg1: volume (integer from 0 to 100) + :type arg1: int + + returns a negative integer value representing attenuation in dB + """ + try: + return round((value / 100) * 90) - 90 + except ValueError: + return -90 + + @property + def power(self): + """Report if device powered on or off (read/write). + + Returns and expects a boolean value. + """ + return self._get_boolean("POW") + + @power.setter + def power(self, value): + self._set_boolean("POW", value) + self.query("POW") + + @property + def volume(self): + """Current volume level (read/write). + + You can get or set the current volume value on the device with this + property. Valid range from 0 to 100. + + :Examples: + + >>> volvalue = volume + >>> volume = 20 + """ + if self._avr._model_series == "x40" and "PVOL" in self.values: + return self._get_integer("PVOL") + elif self._avr._model_series == "mdx": + return self._get_integer("VOL") + else: + return self.attenuation_to_volume(self.attenuation) + + @volume.setter + def volume(self, value): + if isinstance(value, int) and 0 <= value <= 100: + if self._avr._model_series == "x40": + self.command(f"PVOL{value}") + elif self._avr._model_series == "mdx": + self.command(f"VOL{value}") + else: + self.attenuation = self.volume_to_attenuation(value) + + @property + def volume_as_percentage(self): + """Current volume as percentage (read/write). + + You can get or set the current volume value as a percentage. Valid + range from 0 to 1 (float). + + :Examples: + + >>> volper = volume_as_percentage + >>> volume_as_percentage = 0.20 + """ + volume_per = self.volume / 100 + return volume_per + + @volume_as_percentage.setter + def volume_as_percentage(self, value): + if isinstance(value, float) or isinstance(value, int): + if 0 <= value <= 1: + value = round(value * 100) + self.volume = value + + @property + def attenuation(self): + """Current volume attenuation in dB (read/write). + + You can get or set the current attenuation value on the device with this + property. Valid range from -90 to 0. + + :Examples: + + >>> attvalue = attenuation + >>> attenuation = -50 + """ + return self._get_integer("VOL", -90) + + @attenuation.setter + def attenuation(self, value): + if isinstance(value, int) and -90 <= value <= 0: + self._avr.log.debug("Setting attenuation to %s", str(value)) + self.command(f"VOL{value}") + + @property + def mute(self): + """Mute on or off (read/write).""" + return self._get_boolean("MUT") + + @mute.setter + def mute(self, value): + self._set_boolean("MUT", value) + # Query mute because the AVR doesn't always return back the state + # (eg: after power on without changing the volume first) + self.query("MUT") + + @property + def input_number(self): + """Number of currently active input (read-write).""" + return self._get_integer("INP") + + @input_number.setter + def input_number(self, number): + if isinstance(number, int): + if 1 <= number <= 99: + self._avr.log.debug( + f"Switching input to {number} for zone {self._zone}" + ) + self.command(f"INP{number}") + # Query to make sure it actually changes + self.query("INP") + + @property + def input_name(self): + """Name of currently active input (read-write).""" + return self._avr._input_names.get(self.input_number, "Unknown") + + @input_name.setter + def input_name(self, value): + number = self._avr._input_numbers.get(value, 0) + if number > 0: + self.input_number = number diff --git a/anthemav/tools.py b/anthemav/tools.py index 3da83ef..24d085d 100644 --- a/anthemav/tools.py +++ b/anthemav/tools.py @@ -49,12 +49,11 @@ def log_callback(message): log.info("Power state is " + str(conn.protocol.power)) conn.protocol.power = True + await asyncio.sleep(5) log.info("Power state is " + str(conn.protocol.power)) - - await asyncio.sleep(10, loop=loop) - - log.info("Panel brightness (raw) is " + str(conn.protocol.panel_brightness)) - log.info("Panel brightness (text) is " + str(conn.protocol.panel_brightness_text)) + log.info("Model is %s", conn.protocol.model) + log.info("Number of zone is %s", len(conn.protocol.zones)) + log.info("Volume is " + str(conn.protocol.zones[1].volume)) def monitor(): diff --git a/example.py b/example.py index d31ed25..9d59b66 100755 --- a/example.py +++ b/example.py @@ -39,10 +39,11 @@ def log_callback(message): conn.protocol.power = True log.info("Power state is " + str(conn.protocol.power)) - await asyncio.sleep(2, loop=loop) + await asyncio.sleep(3) - log.info("Panel brightness (raw) is " + str(conn.protocol.panel_brightness)) - log.info("Panel brightness (text) is " + str(conn.protocol.panel_brightness_text)) + log.info("Model is %s", conn.protocol.model) + log.info("Number of zone is %s", len(conn.protocol.zones)) + log.info("Volume is " + str(conn.protocol.zones[1].volume)) log.info( "Video resolution (text) is " + str(conn.protocol.video_input_resolution_text) diff --git a/setup.py b/setup.py index c1cd1ca..054127a 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ def readme(): setup( name="anthemav", - version="1.3.2", + version="1.4.0", author="David McNett", author_email="nugget@macnugget.org", url="https://github.com/nugget/python-anthemav", diff --git a/tests/test_protocol.py b/tests/test_protocol.py index a7993cd..d1f97ce 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -1,83 +1,212 @@ +import pytest from anthemav.protocol import LOOKUP, ALM_NUMBER_x20 from anthemav import AVR -from unittest.mock import patch - - -def test_default_alm_list(): - avr = AVR() - avr.set_model_command("MRX 1120") - almList = avr.audio_listening_mode_list - assert almList is not None - assert len(almList) == 15 - - -def test_restricted_alm_list(): - avr = AVR() - avr.set_model_command("MRX 520") - avr._IDM = "MRX 520" - almList = avr.audio_listening_mode_list - assert almList is not None - assert len(almList) == 8 - - -def test_set_alm_text(): - avr = AVR() - avr.set_model_command("MRX 520") - avr._IDM = "MRX 520" - with patch.object(avr, "command") as mock: - avr.audio_listening_mode_text = "Neo Music" - mock.assert_called_once_with("Z1ALM06") - - -def test_all_alm_matchnumber(): - for alm in list(LOOKUP["Z1ALM"].values())[1:]: - assert alm in ALM_NUMBER_x20 - - -def test_power_on_force_refresh(): - avr = AVR() - with patch.object(avr, "query") as mock: - with patch.object(avr, "_loop"): - avr._parse_message("Z1INP01;") +from unittest.mock import call, patch + + +@pytest.mark.asyncio +class TestProtocol: + def test_default_alm_list(self): + avr = AVR() + avr.set_model_command("MRX 1120") + almList = avr.audio_listening_mode_list + assert almList is not None + assert len(almList) == 15 + + async def test_restricted_alm_list(self): + avr = AVR() + await avr._parse_message("IDMMRX 520") + almList = avr.audio_listening_mode_list + assert almList is not None + assert len(almList) == 8 + + def test_set_alm_text(self): + avr = AVR() + avr.set_model_command("MRX 520") + with patch.object(avr, "command") as mock: + avr.audio_listening_mode_text = "Neo Music" + mock.assert_called_once_with("Z1ALM06") + + def test_all_alm_matchnumber(self): + for alm in list(LOOKUP["Z1ALM"].values())[1:]: + assert alm in ALM_NUMBER_x20 + + async def test_power_on_force_refresh(self): + avr = AVR() + with patch.object(avr, "query") as mock, patch.object(avr, "_loop"): + await avr._parse_message("Z1AIC;") mock.assert_called_once_with("Z1POW") - -def test_mute_force_refresh(): - avr = AVR() - with patch.object(avr, "query") as mock: - avr.mute = True - mock.assert_called_once_with("Z1MUT") - - -def test_populate_input(): - avr = AVR() - with patch.object(avr, "query") as mock: - avr._populate_inputs(2) - mock.assert_any_call("ISN01") - mock.assert_called_with("ISN02") - - -def test_populate_input_x40(): - avr = AVR() - with patch.object(avr, "query") as mock: - avr.set_model_command("MRX 1140") - avr._populate_inputs(2) - mock.assert_any_call("IS1IN") - mock.assert_called_with("IS2IN") - - -def test_parse_input_x40(): - avr = AVR() - with patch.object(avr, "query"): - avr.set_model_command("MRX 1140") - avr._parse_message("IS3INName") - assert avr._input_names.get(3, "") == "Name" - - -def test_parse_message_x40(): - avr = AVR() - with patch.object(avr, "query"): - avr.set_model_command("MRX 1140") - avr._Z1POW = "1" - avr._parse_message("IS1ARC1") - assert avr._IS1ARC == "1" + def test_mute_force_refresh(self): + avr = AVR() + with patch.object(avr, "query") as mock: + avr.mute = True + mock.assert_called_once_with("Z1MUT") + + def test_populate_input_x20(self): + avr = AVR() + with patch.object(avr, "query") as mock: + avr._populate_inputs(2) + mock.assert_any_call("ISN01") + mock.assert_called_with("ISN02") + + def test_populate_input_x40(self): + avr = AVR() + with patch.object(avr, "query") as mock: + avr.set_model_command("MRX 1140") + avr._populate_inputs(2) + mock.assert_any_call("IS1IN") + mock.assert_called_with("IS2IN") + + async def test_parse_input_x40(self): + avr = AVR() + with patch.object(avr, "query"): + await avr._parse_message("IS3INName") + assert avr._input_names.get(3, "") == "Name" + + async def test_parse_message_x40(self): + avr = AVR() + with patch.object(avr, "query"): + await avr._parse_message("IS1ARC1") + assert avr._IS1ARC == "1" + + async def test_zone_created_x20(self): + avr = AVR() + with patch.object(avr, "query"): + await avr._parse_message("IDMMRX 520") + assert len(avr.zones) == 2 + + async def test_zone_created_x40(self): + avr = AVR() + with patch.object(avr, "query"): + await avr._parse_message("IDMMRX 1140") + assert len(avr.zones) == 2 + + async def test_zone_created_MDX8(self): + avr = AVR() + with patch.object(avr, "query"): + await avr._parse_message("IDMMDX 8") + assert len(avr.zones) == 4 + + async def test_zone_created_MDX16(self): + avr = AVR() + with patch.object(avr, "query"): + await avr._parse_message("IDMMDX 16") + assert len(avr.zones) == 8 + + async def test_power_refreshed_MDX16(self): + avr = AVR() + with patch.object(avr, "query") as mock: + await avr._parse_message("IDMMDX 16") + for zone in range(1, 9): + mock.assert_any_call(f"Z{zone}POW") + assert call("Z9POW") not in mock.mock_calls + + async def test_pvol_x40(self): + avr = AVR() + with patch.object(avr, "query"): + await avr._parse_message("IDMMRX 740") + await avr._parse_message("Z2PVOL51") + assert avr.zones[2].volume == 51 + + async def test_zone2_power(self): + avr = AVR() + with patch.object(avr, "refresh_zone") as refreshmock, patch.object( + avr, "_loop" + ): + await avr._parse_message("IDMMRX 740") + assert avr.zones[2].power is False + await avr._parse_message("Z2POW1") + refreshmock.assert_called_with(2) + assert avr.zones[2].power is True + assert avr._device_power is True + + async def test_attenuation(self): + avr = AVR() + avr._device_power = True + with patch.object(avr, "query"): + await avr._parse_message("IDMMRX 740") + assert avr.zones[1].attenuation == -90 + await avr._parse_message("Z1VOL-42") + assert avr.zones[1].attenuation == -42 + + async def test_volume_x20(self): + avr = AVR() + avr._device_power = True + with patch.object(avr, "query"): + await avr._parse_message("IDMMRX 1120") + assert avr.zones[1].volume == 0 + await avr._parse_message("Z1VOL-42") + assert avr.zones[1].volume == 53 + + async def test_zone_set_volume_as_percentage_x20(self): + avr = AVR() + avr._device_power = True + with patch.object(avr, "command") as mock: + avr._model_series = "x20" + assert avr.zones[1].volume_as_percentage == 0 + avr.zones[1].volume_as_percentage = 0.53 + mock.assert_any_call("Z1VOL-42") + + async def test_set_volume_as_percentage_x20(self): + avr = AVR() + avr._device_power = True + with patch.object(avr, "command") as mock: + avr._model_series = "x20" + assert avr.volume_as_percentage == 0 + avr.volume_as_percentage = 0.53 + mock.assert_any_call("Z1VOL-42") + + async def test_set_volume_as_percentage_x40(self): + avr = AVR() + avr._device_power = True + with patch.object(avr, "command") as mock: + avr._model_series = "x40" + avr.volume_as_percentage = 0.53 + mock.assert_any_call("Z1PVOL53") + + async def test_set_volume_as_percentage_mdx(self): + avr = AVR() + avr._device_power = True + with patch.object(avr, "command") as mock: + avr._model_series = "mdx" + avr.volume_as_percentage = 0.53 + mock.assert_any_call("Z1VOL53") + + async def test_refresh_zone_x40(self): + avr = AVR() + with patch.object(avr, "query") as mock, patch.object(avr, "transport"): + await avr._parse_message("IDMMRX 740") + await avr.refresh_zone(2) + mock.assert_any_call("Z2POW") + mock.assert_any_call("Z2INP") + mock.assert_any_call("Z2PVOL") + mock.assert_any_call("Z2VOL") + mock.assert_any_call("Z2MUT") + + async def test_refresh_zone_x20(self): + avr = AVR() + with patch.object(avr, "query") as mock, patch.object(avr, "transport"): + await avr._parse_message("IDMMRX 720") + await avr.refresh_zone(2) + mock.assert_any_call("Z2POW") + mock.assert_any_call("Z2INP") + mock.assert_any_call("Z2VOL") + mock.assert_any_call("Z2MUT") + assert call("Z2PVOL") not in mock.mock_calls + + async def test_device_power_off(self): + avr = AVR() + with patch.object(avr, "refresh_zone") as refreshmock, patch.object( + avr, "_loop" + ): + await avr._parse_message("IDMMRX 740") + await avr._parse_message("Z2POW1") + refreshmock.assert_called_with(2) + assert avr._device_power is True + await avr._parse_message("Z1POW1") + assert avr._device_power is True + await avr._parse_message("Z1POW0") + assert avr._device_power is True + await avr._parse_message("Z2POW0") + assert avr._device_power is False