Skip to content

Commit

Permalink
Add support for ARC and some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Hyralex committed Aug 5, 2022
1 parent e694f43 commit f47f174
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 72 deletions.
3 changes: 2 additions & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ ignore =
W503,
E203,
D202,
W504
W504,
D401
noqa-require-code = True
2 changes: 1 addition & 1 deletion anthemav/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def _increase_retry_interval(self):
self._retry_interval = min(300, 1.5 * self._retry_interval)

async def reconnect(self):
"""Connect to the host and keep the connection open"""
"""Connect to the host and keep the connection open."""
while True:
try:
if self._halted:
Expand Down
42 changes: 42 additions & 0 deletions anthemav/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Module containing the parser for Anthem command."""


class ParsedMessage:
"""Class containing parsed message information."""

command: str
value: str
input_number: int | None


def parse_message(message: str) -> ParsedMessage:
"""Try to parse a message to a ParsedMessage object."""
return parse_x40_message(message)


def parse_x40_message(message: str) -> ParsedMessage:
"""Try to parse a message for the x40 models."""
return parse_x40_input_message(message, "ARC")


def parse_x40_input_message(message: str, command: str) -> ParsedMessage:
"""Try to parse a message associated to a specific input for the x40 models."""
if (
message.startswith("IS")
and command in message
and len(message) >= len(command) + 4
):
parsed_message = ParsedMessage()
command_position = message.index(command)
parsed_message.command = message[0 : command_position + len(command)]
parsed_message.input_number = int(message[2:command_position])
parsed_message.value = message[command_position + len(command) :]
return parsed_message
return None


def get_x40_input_command(self, input_number: int, command: str) -> str | None:
"""Return a formatted message for a specific input."""
if input_number > 0:
return f"IS{self.input_number}{command}"
return None
188 changes: 126 additions & 62 deletions anthemav/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
from typing import Awaitable, Callable, Dict

from anthemav.device_error import DeviceError
from anthemav.parser import parse_message

__all__ = ["AVR"]

# These properties apply even when the AVR is powered off
ATTR_CORE = ["IDM"]

# These properties are sent when the device is powered on
# This is used to force refresh the power state of the device is POW command isn't sent
# This is used to force refresh the power state of the device if POW command isn't sent
ATTR_POWERED_ON = ["Z1ALM", "Z1AIC", "Z1VIR"]

# Audio Listening mode
Expand Down Expand Up @@ -158,7 +159,6 @@
# MRX 540, 740, 1140
LOOKUP["WMAC"] = {"description": "Wi-Fi MAC address"}
LOOKUP["EMAC"] = {"description": "Ethernet MAC address"}
LOOKUP["IS1ARC"] = {"description": "Zone 1 ARC", "0": "Off", "1": "On"}
LOOKUP["GCFPB"] = {"description": "Front Panel Brightness"}
LOOKUP["GCTXS"] = {
"description": "Tx status",
Expand Down Expand Up @@ -243,12 +243,13 @@ def __init__(
self._alm_number = {"None": 0}
self._available_input_numbers = []
self.zones: Dict[int, Zone] = {1: Zone(self, 1)}
self.values: Dict[str, str] = {}

for key in LOOKUP:
setattr(self, f"_{key}", "")

async def wait_for_device_initialised(self, timeout: float):
"""Wait to receive the model and mac address for the device"""
"""Wait to receive the model and mac address for the device."""
try:
await asyncio.wait_for(self._deviceinfo_received.wait(), timeout)
except asyncio.TimeoutError:
Expand All @@ -260,7 +261,7 @@ async def wait_for_device_initialised(self, timeout: float):
self.log.debug("device is initialised")

def _set_device_initialised(self):
"""Indicate if the model and mac address have been received"""
"""Indicate if the model and mac address have been received."""
if self._model_series and self.macaddress != EMPTY_MAC:
self._deviceinfo_received.set()

Expand Down Expand Up @@ -328,6 +329,7 @@ async def refresh_zone(self, zone: int):
await self.query_commands(ZONELOOKUP, zone)

async def query_commands(self, commands: Dict[str, Dict[str, str]], zone: int = 0):
"""Query a list of commands."""
for key in commands:
if key not in self._ignored_commands:
if self.transport is None:
Expand Down Expand Up @@ -420,6 +422,7 @@ def _populate_inputs(self, total):
for input_number in range(1, total):
if self._model_series == MODEL_X40:
self.query(f"IS{input_number}IN")
self.query(f"IS{input_number}ARC")
else:
if (
len(self._available_input_numbers) == 0
Expand Down Expand Up @@ -514,44 +517,57 @@ async def _parse_message(self, data: str):
self.log.debug("Zone command received: %s", data)
newdata = (await self.parse_zone_command(data)) or newdata
recognized = True

if data.startswith("ICN"):
self.log.debug("ICN update received")
self._poweron_refresh_successful = True
recognized = True
self._populate_inputs(int(value))

if (
data.startswith("ISN") and len(data) > 5
): # x20 and mdx series: ISN01Turntable
recognized = True
self._poweron_refresh_successful = True

input_number = int(data[3:5])
value = data[5:]

oldname = self._input_names.get(input_number, "")

if oldname != value:
self._input_numbers[value] = input_number
self._input_names[input_number] = value
self.log.debug("New Value: Input %d is called %s", input_number, value)
newdata = True
elif (
data.startswith("IS") and "IN" in data and len(data) > 5
): # x40 series, example "IS3INTurntable"
recognized = True
self._poweron_refresh_successful = True
in_position = data.index("IN")
input_number = int(data[2:in_position])
value = data[in_position + 2 :]
oldname = self._input_names.get(input_number, "")

if oldname != value:
self._input_numbers[value] = input_number
self._input_names[input_number] = value
self.log.debug("New Value: Input %d is called %s", input_number, value)
newdata = True
elif data.startswith("ICN"):
self.log.debug("ICN update received")
self._poweron_refresh_successful = True
recognized = True
self._populate_inputs(int(value))
elif (
data.startswith("ISN") and len(data) > 5
): # parse x20 and mdx inputs: eg: ISN01Turntable
recognized = True
self._poweron_refresh_successful = True
input_number = int(data[3:5])
value = data[5:]
oldname = self._input_names.get(input_number, "")
if oldname != value:
self._input_numbers[value] = input_number
self._input_names[input_number] = value
self.log.debug(
"New Value: Input %d is called %s", input_number, value
)
newdata = True
elif (
data.startswith("IS") and "IN" in data and len(data) > 5
): # parse x40 inputs, eg: IS3INTurntable
recognized = True
self._poweron_refresh_successful = True
in_position = data.index("IN")
input_number = int(data[2:in_position])
value = data[in_position + 2 :]
oldname = self._input_names.get(input_number, "")
if oldname != value:
self._input_numbers[value] = input_number
self._input_names[input_number] = value
self.log.debug(
"New Value: Input %d is called %s", input_number, value
)
newdata = True
else:
# use parser for other commands
parsed_message = parse_message(data)
if parsed_message is not None:
recognized = True
oldvalue = self.values.get(parsed_message.command)
if parsed_message.value != oldvalue:
newdata = True
self.values[parsed_message.command] = parsed_message.value
self.log.debug(
"New value - command:%s value:%s input_number:%s",
parsed_message.command,
parsed_message.value,
parsed_message.input_number,
)

if newdata:
if self._update_callback:
Expand All @@ -563,6 +579,7 @@ async def _parse_message(self, data: str):
self.log.debug("Unrecognized response: %s", data)

async def parse_zone_command(self, data: str) -> bool:
"""Parse command specifically for zones."""
newdata = False
zone: int = int(data[1])
if zone not in self.zones:
Expand All @@ -582,28 +599,49 @@ async def parse_zone_command(self, data: str) -> bool:
if value == "1":
await self.refresh_zone(zone)
if self._device_power is False:
self.log.debug(
"Powered on device detected refresh all attributes"
)
self._device_power = True
self._poweron_refresh_successful = False
self._loop.call_later(
1,
asyncio.run_coroutine_threadsafe,
self.poweron_refresh(),
self._loop,
)
self.power_on_device()
elif value == "0" and oldvalue == "1":
if all(zone.power is False for zone in self.zones.values()):
# all zone are off, switch off device
self.log.debug("Power off device")
self._poweron_refresh_successful = False
self._device_power = False
self.power_off_device()
if newdata and zoneCommand == "INP":
self._loop.call_later(
2,
asyncio.run_coroutine_threadsafe,
self.refresh_input(),
self._loop,
)
break

return newdata

def power_off_device(self):
"""Set device as powered off."""
self.log.debug("Power off device")
self._poweron_refresh_successful = False
self._device_power = False

def power_on_device(self):
"""Set device as powered on."""
self.log.debug("Powered on device detected refresh all attributes")
self._device_power = True
self._poweron_refresh_successful = False
self._loop.call_later(
1,
asyncio.run_coroutine_threadsafe,
self.poweron_refresh(),
self._loop,
)

async def refresh_input(self):
"""Refresh specific input commands."""
if self._model_series == MODEL_X20:
self.query("Z1ARC")
elif self._model_series == MODEL_X40:
self.query(f"IS{self.zones[1].input_number}ARC")

async def force_refresh_power(self, command):
"""Force refresh of poweron when receiving commands."""
if (
self._force_refresh is False
and self._device_power is False
Expand Down Expand Up @@ -796,6 +834,13 @@ def _get_boolean(self, key):
except AttributeError:
return False

def _convert_to_boolean(self, value: str) -> bool | None:
if value == "1":
return True
elif value == "0":
return False
return None

def _set_boolean(self, key, value):
if value is True:
self.command(key + "1")
Expand Down Expand Up @@ -858,13 +903,22 @@ def standby_control(self, value):
self._set_boolean("SIP", value)

@property
def arc(self):
def arc(self) -> bool | None:
"""Current ARC (Anthem Room Correction) on or off (read/write)."""
return self._get_boolean("Z1ARC")
if self._model_series == MODEL_X40:
return self._convert_to_boolean(
self.zones[1].get_current_input_value("ARC")
)
elif self._model_series == MODEL_X20:
return self._get_boolean("Z1ARC")
return None

@arc.setter
def arc(self, value):
self._set_boolean("Z1ARC", value)
if self._model_series == MODEL_X40:
self._set_boolean(f"IS{self.zones[1].input_number}ARC", value)
elif self._model_series == MODEL_X20:
self._set_boolean("Z1ARC", value)

#
# Read-only text properties
Expand Down Expand Up @@ -1003,7 +1057,7 @@ def panel_brightness(self, number):

@property
def audio_listening_mode_list(self):
"""List of available listening mode"""
"""List of available listening mode."""
if any(m in self.model for m in ALM_RESTRICTED_MODEL):
return [LOOKUP["Z1ALM"][s] for s in ALM_RESTRICTED]
return list(self._alm_number.keys())
Expand Down Expand Up @@ -1136,7 +1190,7 @@ def test_string(self):


class Zone:
"""Control of a specific Zone of the amplifier"""
"""Control of a specific Zone of the amplifier."""

def __init__(self, avr: AVR, zone: int) -> None:
self._zone = zone
Expand Down Expand Up @@ -1174,6 +1228,11 @@ def _set_boolean(self, key: str, value: bool):
else:
self.command(key + "0")

def get_current_input_value(self, command: str) -> str | None:
if self.input_number > 0 and self._avr._model_series == MODEL_X40:
return self._avr.values.get(f"IS{self.input_number}{command}")
return None

@property
def support_audio_listening_mode(self) -> bool:
"""Return true if the zone support audio listening mode."""
Expand All @@ -1189,6 +1248,11 @@ def support_profile(self) -> bool:
"""Return true if the zone support sound mode and sound mode list."""
return self._zone == 1 and self._avr._model_series != MODEL_MDX

@property
def support_arc(self) -> bool:
"""Return true if the zone support Anthem room correction."""
return self._zone == 1 and self._avr._model_series != MODEL_MDX

#
# Volume and Attenuation handlers. The Anthem tracks volume internally as
# an attenuation level ranging from -90dB (silent) to 0dB (bleeding ears)
Expand Down Expand Up @@ -1354,7 +1418,7 @@ def input_name(self, value: str):

@property
def input_format(self) -> str:
"""Input video and audio format for the current zone if available (usually only zone 1)"""
"""Input video and audio format for the current zone if available (usually only zone 1)."""
if self._zone == 1 and self._avr._model_series != MODEL_MDX:
return (
f"{self._avr.video_input_resolution_text} {self._avr.audio_input_name}"
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def readme():

setup(
name="anthemav",
version="1.4.1",
version="1.4.2",
author="David McNett",
author_email="[email protected]",
url="https://github.com/nugget/python-anthemav",
Expand Down
Loading

0 comments on commit f47f174

Please sign in to comment.