diff --git a/src/python_testing/TC_FAN_3_1.py b/src/python_testing/TC_FAN_3_1.py index cc1f28b788a348..730b062622f953 100644 --- a/src/python_testing/TC_FAN_3_1.py +++ b/src/python_testing/TC_FAN_3_1.py @@ -34,136 +34,217 @@ # quiet: true # === END CI TEST ARGUMENTS === +import asyncio +import enum import logging import time +from enum import Enum +from typing import Any import chip.clusters as Clusters +from chip.clusters import ClusterObjects as ClusterObjects from chip.interaction_model import Status -from chip.testing.matter_testing import MatterBaseTest, async_test_body, default_matter_test_main +from chip.testing.matter_testing import (ClusterAttributeChangeAccumulator, MatterBaseTest, TestStep, async_test_body, + default_matter_test_main) from mobly import asserts + +class OrderEnum(Enum): + Ascending = 1 + Descending = 2 + + logger = logging.getLogger(__name__) class TC_FAN_3_1(MatterBaseTest): - - async def read_fc_attribute_expect_success(self, endpoint, attribute): + def steps_TC_FAN_3_1(self): + return [TestStep("1", "Commissioning already done.")] + + @staticmethod + async def get_attribute_value_from_queue(queue, attribute, timeout_sec: float) -> Any: + start_time = time.time() + while time.time() - start_time < timeout_sec: + for q in list(queue): + if q.attribute == attribute: + return q.value + await asyncio.sleep(0.01) + return None + + @staticmethod + def get_enum(value): + if isinstance(value, enum.Enum): + enum_type = type(value) + value = enum_type(value) + return value + + @staticmethod + def _supports_speed(feature_map) -> bool: + return feature_map & Clusters.FanControl.Bitmaps.Feature.kMultiSpeed + + async def read_setting(self, endpoint, attribute): cluster = Clusters.Objects.FanControl return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) - async def read_fan_mode(self, endpoint): - return await self.read_fc_attribute_expect_success(endpoint, Clusters.FanControl.Attributes.FanMode) - - async def read_percent_setting(self, endpoint): - return await self.read_fc_attribute_expect_success(endpoint, Clusters.FanControl.Attributes.PercentSetting) - - async def read_percent_current(self, endpoint): - return await self.read_fc_attribute_expect_success(endpoint, Clusters.FanControl.Attributes.PercentCurrent) - - async def write_fan_mode(self, endpoint, fan_mode) -> Status: - result = await self.default_controller.WriteAttribute(self.dut_node_id, [(endpoint, Clusters.FanControl.Attributes.FanMode(fan_mode))]) - return result[0].Status - - async def write_percent_setting(self, endpoint, percent_setting) -> Status: - result = await self.default_controller.WriteAttribute(self.dut_node_id, [(endpoint, Clusters.FanControl.Attributes.PercentSetting(percent_setting))]) - return result[0].Status + async def write_setting(self, endpoint, attribute, value) -> Status: + value = self.get_enum(value) + result = await self.default_controller.WriteAttribute(self.dut_node_id, [(endpoint, attribute(value))]) + write_status = result[0].Status + write_status_success = (write_status == Status.Success) or (write_status == Status.InvalidInState) + asserts.assert_true(write_status_success, + f"{attribute.__name__} write did not return a result of either SUCCESS or INVALID_IN_STATE ({write_status.name})") + return write_status + + async def write_and_verify_attribute(self, endpoint, attribute, value) -> Any: + await self.write_setting(endpoint, attribute, value) + value_read = await self.read_setting(endpoint, attribute) + asserts.assert_equal(value_read, value, + f"Mismatch between written and read {attribute.__name__}") + return value_read + + def verify_attribute_transition(self, attribute, value_current, value_previous, status, order) -> None: + if status == Status.Success: + if order == OrderEnum.Ascending: + # Verify the current attribute value is greater than the previous attribute value + asserts.assert_greater(value_current, value_previous, + f"Current {attribute.__name__} must be greater than previous {attribute.__name__}") + else: + # Verify the current attribute value is less than the previous attribute value + asserts.assert_less(value_current, value_previous, + f"Current {attribute.__name__} must be less than previous {attribute.__name__}") + elif status == Status.InvalidInState: + # Verify the current attribute value is the same than the previous attribute value + asserts.assert_equal(value_current, value_previous, + f"Current {attribute.__name__} must be equal to previous {attribute.__name__}") + + # Logging attribute value change details + if_fan_mode = f"({value_previous}) to ({value_current})" + if attribute == Clusters.FanControl.Attributes.FanMode: + if_fan_mode = f"({value_previous}:{value_previous.name}) to ({value_current}:{value_current.name})" + logging.info(f"\t\t[FC] {attribute.__name__} changed from {if_fan_mode}") + + async def verify_fan_control_attribute_values(self, endpoint, attr_to_write, order) -> None: + # Setup + cluster = Clusters.FanControl + fan_mode_attr = cluster.Attributes.FanMode + percent_setting_attr = cluster.Attributes.PercentSetting + speed_setting_attr = cluster.Attributes.SpeedSetting + speed_max_attr = cluster.Attributes.SpeedMax + attribute_subscription = ClusterAttributeChangeAccumulator(cluster) + fan_mode_off = cluster.Enums.FanModeEnum.kOff + fan_mode_high = cluster.Enums.FanModeEnum.kHigh + percent_setting_max_value = 100 # PercentSetting max value is 100 as per the spec + timeout_sec = 0.1 # Timeout given for item retreival from the attribute queue + + # Sets the 'for loop' range based on the attribute that will be written to + # and the specified order (ascending or descending) + # Sets which mandatory attribute is to be verified (FanMode or PercentSetting) + speed_max = await self.read_setting(endpoint, speed_max_attr) + speed_setting_init = 0 if order == OrderEnum.Ascending else speed_max + range_loop = range(1, percent_setting_max_value + + 1) if order == OrderEnum.Ascending else range(percent_setting_max_value -1, -1, -1) + value_init = fan_mode_off if order == OrderEnum.Ascending else fan_mode_high + attr_to_verify = fan_mode_attr + if attr_to_write == fan_mode_attr: + range_loop = range(1, fan_mode_high.value + + 1) if order == OrderEnum.Ascending else range(fan_mode_high.value -1, -1, -1) + value_init = 0 if order == OrderEnum.Ascending else percent_setting_max_value + attr_to_verify = percent_setting_attr + + # Logging what's being tested + if_speed_supported = " and SpeedSetting" if self.supports_speed else "" + logging.info( + f"[FC] Updating {attr_to_write.__name__} {order.name}, verifying {attr_to_verify.__name__}{if_speed_supported}") + + # Write and read back the initialization value for the attribute to verify + # Verify that the write status is either SUCCESS or INVALID_IN_STATE + # Verify written and read value match + attr_value_read = await self.write_and_verify_attribute(endpoint, attr_to_verify, value_init) + + # Write and read back the initialization value for SpeedSetting (if supported) + # Verify that the write status is either SUCCESS or INVALID_IN_STATE + # Verify written and read value match + if self.supports_speed: + speed_setting_read = await self.write_and_verify_attribute(endpoint, speed_setting_attr, speed_setting_init) + + # Start subscription + await attribute_subscription.start(self.default_controller, self.dut_node_id, endpoint) + + # Logging FanMode, PercentSetting, and SpeedSetting initialization values + fm = cluster.Enums.FanModeEnum(await self.read_setting(endpoint, fan_mode_attr)) + ps = await self.read_setting(endpoint, percent_setting_attr) + logging.info(f"[FC] Initialization values: FanMode({fm}:{fm.name}) PercentSetting({ps}) SpeedSetting({speed_setting_read})") + + # Write to attribute iteratively within a range of 100, one at a time + attr_value_current = attr_value_read + attr_value_previous = attr_value_read + speed_setting_current = speed_setting_read + speed_setting_previous = speed_setting_read + for value_to_write in range_loop: + # Clear the queue + attribute_subscription.get_last_report() + + # Write to attribute + write_status = await self.write_setting(endpoint, attr_to_write, value_to_write) + logging.info(f"[FC] {attr_to_write.__name__} written: {value_to_write}") + + # Get the subscription queue + queue = attribute_subscription.attribute_queue.queue + + # Verifying PercentSetting + attr_value_current = await self.get_attribute_value_from_queue(queue, attr_to_verify, timeout_sec) + if attr_value_current is not None: + self.verify_attribute_transition(attr_to_verify, attr_value_current, attr_value_previous, write_status, order) + attr_value_previous = attr_value_current + + # Verifying SpeedSetting (if supported) + if self.supports_speed: + speed_setting_current = await self.get_attribute_value_from_queue(queue, speed_setting_attr, timeout_sec) + if speed_setting_current is not None: + self.verify_attribute_transition(speed_setting_attr, speed_setting_current, + speed_setting_previous, write_status, order) + speed_setting_previous = speed_setting_current + + await attribute_subscription.cancel() def pics_TC_FAN_3_1(self) -> list[str]: return ["FAN.S"] @async_test_body async def test_TC_FAN_3_1(self): - endpoint = self.get_endpoint(default=1) - - self.print_step(1, "Commissioning, already done") - - self.print_step("2a", "Read from the DUT the FanMode attribute and store") - existing_fan_mode = await self.read_fan_mode(endpoint=endpoint) - - self.print_step("2b", "Write High to FanMode") - status = await self.write_fan_mode(endpoint=endpoint, fan_mode=Clusters.FanControl.Enums.FanModeEnum.kHigh) - status_ok = (status == Status.Success) or (status == Status.InvalidInState) - asserts.assert_true(status_ok, "FanMode write did not return a value of Success or InvalidInState") - - self.print_step("2c", "After a few seconds, read from the DUT the FanMode attribute") - time.sleep(3) - - new_fan_mode = await self.read_fan_mode(endpoint=endpoint) - - if status == Status.Success: - asserts.assert_equal(new_fan_mode, Clusters.FanControl.Enums.FanModeEnum.kHigh, "FanMode is not High") - else: - asserts.assert_equal(new_fan_mode, existing_fan_mode, "FanMode is not unchanged") - - self.print_step("3a", "Read from the DUT the PercentSetting attribute and store") - existing_percent_setting = await self.read_percent_setting(endpoint=endpoint) - - self.print_step("3b", "Write Off to Fan Mode") - status = await self.write_fan_mode(endpoint=endpoint, fan_mode=Clusters.FanControl.Enums.FanModeEnum.kOff) - status_ok = (status == Status.Success) or (status == Status.InvalidInState) - asserts.assert_true(status_ok, "FanMode write did not return a value of Success or InvalidInState") - - self.print_step("3c", "After a few seconds, read from the DUT the PercentSetting attribute") - time.sleep(3) - - new_percent_setting = await self.read_percent_setting(endpoint=endpoint) - - if status == Status.Success: - asserts.assert_equal(new_percent_setting, Clusters.FanControl.Enums.FanModeEnum.kOff, "PercentSetting is not Off") - else: - asserts.assert_equal(new_percent_setting, existing_percent_setting, "PercentSetting is not unchanged") - - self.print_step("3d", "Read from the DUT the PercentCurrent attribute") - percent_current = await self.read_percent_current(endpoint=endpoint) - - if status == Status.Success: - asserts.assert_equal(percent_current, 0, "PercentCurrent is not 0") - else: - asserts.assert_equal(percent_current, existing_percent_setting, "PercentCurrent is not unchanged") - - self.print_step("4a", "Read from the DUT the PercentSetting attribute and store") - existing_percent_setting = await self.read_percent_setting(endpoint=endpoint) - - self.print_step("4b", "Write PercentSetting to 30") - status = await self.write_percent_setting(endpoint=endpoint, percent_setting=30) - status_ok = (status == Status.Success) or (status == Status.InvalidInState) - asserts.assert_true(status_ok, "PercentSetting write did not return a value of Success or InvalidInState") - - self.print_step("4c", "After a few seconds, read from the DUT the PercentSetting attribute") - time.sleep(3) - - new_percent_setting = await self.read_percent_setting(endpoint=endpoint) - - if status == Status.Success: - asserts.assert_equal(new_percent_setting, 30, "PercentSetting is not 30") - else: - asserts.assert_equal(new_percent_setting, existing_percent_setting, "PercentSetting is not unchanged") - - self.print_step("4d", "Read from the DUT the PercentCurrent attribute") - percent_current = await self.read_percent_current(endpoint=endpoint) - - if status == Status.Success: - asserts.assert_equal(percent_current, 30, "PercentCurrent is not 30") - else: - asserts.assert_equal(percent_current, existing_percent_setting, "PercentCurrent is not unchanged") - - self.print_step("5a", "Read from the DUT the FanMode attribute and store") - existing_fan_mode = await self.read_fan_mode(endpoint=endpoint) - - self.print_step("5b", "Write PercentSetting to 0") - status = await self.write_percent_setting(endpoint=endpoint, percent_setting=0) - status_ok = (status == Status.Success) or (status == Status.InvalidInState) - asserts.assert_true(status_ok, "PercentSetting write did not return a value of Success or InvalidInState") - - self.print_step("5c", "After a few seconds, read from the DUT the FanMode attribute") - time.sleep(3) - - new_fan_mode = await self.read_fan_mode(endpoint=endpoint) - - if status == Status.Success: - asserts.assert_equal(new_fan_mode, Clusters.FanControl.Enums.FanModeEnum.kOff, "FanMode is not Off") - else: - asserts.assert_equal(new_fan_mode, existing_fan_mode, "FanMode is not unchanged") + # Setup + ep = self.get_endpoint(default=1) + percent_setting_attr = Clusters.FanControl.Attributes.PercentSetting + fan_mode_attr = Clusters.FanControl.Attributes.FanMode + feature_map_attr = Clusters.FanControl.Attributes.FeatureMap + feature_map = await self.read_setting(endpoint=ep, attribute=feature_map_attr) + self.supports_speed = self._supports_speed(feature_map) + + self.step("1") + + # TH writes to the DUT the PercentSetting attribute iteratively within + # a range of 1 to 100 one at a time in ascending order + # Verifies that FanMode and SpeedSetting values (if supported) are being + # updated accordingly (greater or less than the previous values) + await self.verify_fan_control_attribute_values(ep, percent_setting_attr, OrderEnum.Ascending) + + # TH writes to the DUT the PercentSetting attribute iteratively within + # a range of 1 to 100 one at a time in descending order + # Verifies that FanMode and SpeedSetting values (if supported) are being + # updated accordingly (greater or less than the previous values) + await self.verify_fan_control_attribute_values(ep, percent_setting_attr, OrderEnum.Descending) + + # TH writes to the DUT the FanMode attribute iteratively within a range of + # the number of available fan modes one at a time in ascending order + # Verifies that PercentSetting and SpeedSetting values (if supported) are being + # updated accordingly (greater or less than the previous values) + await self.verify_fan_control_attribute_values(ep, fan_mode_attr, OrderEnum.Ascending) + + # TH writes to the DUT the FanMode attribute iteratively within a range of + # the number of available fan modes one at a time in descending order + # Verifies that PercentSetting and SpeedSetting values (if supported) are being + # updated accordingly (greater or less than the previous values) + await self.verify_fan_control_attribute_values(ep, fan_mode_attr, OrderEnum.Descending) if __name__ == "__main__":