From 70776a2ac6ab5b9524a4bf4523fd9e41cfbd7b5c Mon Sep 17 00:00:00 2001 From: Paymaun Date: Fri, 16 Aug 2024 09:49:02 -0700 Subject: [PATCH 1/2] fix: casing issue in target name (#307) --- azext_edge/constants.py | 2 +- azext_edge/edge/providers/orchestration/work.py | 3 ++- azext_edge/tests/edge/init/test_work_unit.py | 7 +++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/azext_edge/constants.py b/azext_edge/constants.py index c0b96f90f..2d00d07f2 100644 --- a/azext_edge/constants.py +++ b/azext_edge/constants.py @@ -7,7 +7,7 @@ import os -VERSION = "0.6.0b2" +VERSION = "0.6.0b3" EXTENSION_NAME = "azure-iot-ops" EXTENSION_ROOT = os.path.dirname(os.path.abspath(__file__)) USER_AGENT = "IotOperationsCliExtension/{}".format(VERSION) diff --git a/azext_edge/edge/providers/orchestration/work.py b/azext_edge/edge/providers/orchestration/work.py index b1170801b..69cb51c32 100644 --- a/azext_edge/edge/providers/orchestration/work.py +++ b/azext_edge/edge/providers/orchestration/work.py @@ -642,8 +642,9 @@ def build_template(self, work_kpis: dict) -> Tuple[TemplateVer, dict]: # Covers cluster_namespace template.content["variables"]["AIO_CLUSTER_RELEASE_NAMESPACE"] = self._kwargs["cluster_namespace"] - # TODO @digimaun + # TODO @digimaun, this section will be deleted soon safe_cluster_name = self._cluster_name.replace("_", "-") + safe_cluster_name = safe_cluster_name.lower() template.content["variables"]["OBSERVABILITY"]["targetName"] = f"{safe_cluster_name}-observability" tls_map = work_kpis.get("tls", {}) diff --git a/azext_edge/tests/edge/init/test_work_unit.py b/azext_edge/tests/edge/init/test_work_unit.py index 078facbe0..05aa4acb5 100644 --- a/azext_edge/tests/edge/init/test_work_unit.py +++ b/azext_edge/tests/edge/init/test_work_unit.py @@ -113,7 +113,7 @@ def mock_broker_config(): pytest.param( None, # instance_name None, # instance_description - generate_random_string(), # cluster_name + "Mixed_Cluster_Name", # cluster_name generate_random_string(), # cluster_namespace generate_random_string(), # resource_group_name generate_random_string(), # keyvault_spc_secret_name @@ -258,6 +258,7 @@ def test_init_to_template_params( expected_cluster_namespace = cluster_namespace.lower() if cluster_namespace else DEFAULT_NAMESPACE lowered_cluster_name = cluster_name.lower() + safe_cluster_name = lowered_cluster_name.replace("_", "-") assert "clusterName" in parameters assert parameters["clusterName"]["value"] == cluster_name @@ -266,7 +267,9 @@ def test_init_to_template_params( if instance_name: assert parameters["instanceName"]["value"] == instance_name else: - assert parameters["instanceName"]["value"] == f"{lowered_cluster_name}-ops-instance" + assert parameters["instanceName"]["value"] == f"{safe_cluster_name}-ops-instance" + + assert template_ver.content["variables"]["OBSERVABILITY"]["targetName"] == f"{safe_cluster_name}-observability" assert parameters["clusterLocation"]["value"] == connected_cluster_location if location: From 3e3811289c2cf84ee8942f57731c45a11c436d1d Mon Sep 17 00:00:00 2001 From: Ryan K Date: Wed, 21 Aug 2024 14:11:06 -0700 Subject: [PATCH 2/2] feat: add dataflow checks (#306) * feat: Add initial dataflow checks (#283) * feat: dataflow runtime updates (#305) * dataflow runtime check updates * Skipped status updates --------- Co-authored-by: Ryan Kelly --- azext_edge/edge/_help.py | 1 + azext_edge/edge/common.py | 4 + azext_edge/edge/params.py | 5 + .../providers/check/base/check_manager.py | 29 +- .../edge/providers/check/base/display.py | 20 +- azext_edge/edge/providers/check/base/node.py | 1 + azext_edge/edge/providers/check/common.py | 26 + azext_edge/edge/providers/check/dataflow.py | 1246 +++++++++++++++++ azext_edge/edge/providers/checks.py | 4 + .../edge/providers/orchestration/template.py | 5 +- azext_edge/edge/providers/support/dataflow.py | 3 + .../checks/base/test_check_manager_unit.py | 14 +- .../edge/checks/base/test_display_unit.py | 38 +- azext_edge/tests/edge/checks/conftest.py | 9 + azext_edge/tests/edge/checks/int/helpers.py | 12 +- .../edge/checks/int/test_dataflow_int.py | 123 ++ .../edge/checks/int/test_pre_post_int.py | 3 +- .../edge/checks/test_dataflow_checks_unit.py | 1171 ++++++++++++++++ 18 files changed, 2682 insertions(+), 32 deletions(-) create mode 100644 azext_edge/edge/providers/check/dataflow.py create mode 100644 azext_edge/tests/edge/checks/int/test_dataflow_int.py create mode 100644 azext_edge/tests/edge/checks/test_dataflow_checks_unit.py diff --git a/azext_edge/edge/_help.py b/azext_edge/edge/_help.py index b6f874b38..d51508e28 100644 --- a/azext_edge/edge/_help.py +++ b/azext_edge/edge/_help.py @@ -95,6 +95,7 @@ def load_iotops_help(): - {COMPAT_DEVICEREGISTRY_APIS.as_str()} - {COMPAT_MQTT_BROKER_APIS.as_str()} - {COMPAT_OPCUA_APIS.as_str()} + - {COMPAT_DATAFLOW_APIS.as_str()} For more information on cluster requirements, please check https://aka.ms/iot-ops-cluster-requirements diff --git a/azext_edge/edge/common.py b/azext_edge/edge/common.py index 26f2ba7a8..e47550ce0 100644 --- a/azext_edge/edge/common.py +++ b/azext_edge/edge/common.py @@ -157,6 +157,7 @@ def list_check_services(cls): cls.opcua.value, cls.akri.value, cls.deviceregistry.value, + cls.dataflow.value, ] @@ -246,6 +247,9 @@ class BundleResourceKind(Enum): METRICS_SERVICE_API_PORT = 9600 PROTOBUF_SERVICE_API_PORT = 9800 +# Dataflow constants +DEFAULT_DATAFLOW_PROFILE = "profile" + # Init Env Control INIT_NO_PREFLIGHT_ENV_KEY = "AIO_CLI_INIT_PREFLIGHT_DISABLED" diff --git a/azext_edge/edge/params.py b/azext_edge/edge/params.py index fee56b9eb..02c8e775b 100644 --- a/azext_edge/edge/params.py +++ b/azext_edge/edge/params.py @@ -15,6 +15,8 @@ ) from knack.arguments import CaseInsensitiveList +from azext_edge.edge.providers.edge_api.dataflow import DataflowResourceKinds + from ._validators import validate_namespace, validate_resource_name from .common import FileType, OpsServiceType from .providers.check.common import ResourceOutputDetailLevel @@ -164,6 +166,9 @@ def load_iotops_arguments(self, _): OpcuaResourceKinds.ASSET_TYPE.value, AkriResourceKinds.CONFIGURATION.value, AkriResourceKinds.INSTANCE.value, + DataflowResourceKinds.DATAFLOW.value, + DataflowResourceKinds.DATAFLOWENDPOINT.value, + DataflowResourceKinds.DATAFLOWPROFILE.value, ] ) ), diff --git a/azext_edge/edge/providers/check/base/check_manager.py b/azext_edge/edge/providers/check/base/check_manager.py index 4c15126a1..2c63a34c4 100644 --- a/azext_edge/edge/providers/check/base/check_manager.py +++ b/azext_edge/edge/providers/check/base/check_manager.py @@ -68,14 +68,14 @@ def __init__(self, check_name: str, check_desc: str): self.check_desc = check_desc self.targets = {} self.target_displays = {} - self.worst_status = CheckTaskStatus.success.value + self.worst_status = CheckTaskStatus.skipped.value def add_target( self, target_name: str, namespace: str = ALL_NAMESPACES_TARGET, conditions: List[str] = None, - description: str = None + description: str = None, ) -> None: # TODO: maybe make a singular taget into a class for consistent structure? if target_name not in self.targets: @@ -85,7 +85,7 @@ def add_target( self.targets[target_name][namespace] = {} self.targets[target_name][namespace]["conditions"] = conditions self.targets[target_name][namespace]["evaluations"] = [] - self.targets[target_name][namespace]["status"] = CheckTaskStatus.success.value + self.targets[target_name][namespace]["status"] = CheckTaskStatus.skipped.value if description: self.targets[target_name][namespace]["description"] = description @@ -124,20 +124,23 @@ def add_target_eval( self._process_status(target_name, status, namespace) def _process_status(self, target_name: str, status: str, namespace: str = ALL_NAMESPACES_TARGET) -> None: - existing_status = self.targets[target_name].get("status", CheckTaskStatus.success.value) - if existing_status != status: - if existing_status == CheckTaskStatus.success.value and status in [ - CheckTaskStatus.warning.value, - CheckTaskStatus.error.value, - CheckTaskStatus.skipped.value, - ]: + namespace_status = self.targets[target_name][namespace].get("status") + # success only overrides skipped status (default) + if status == CheckTaskStatus.success.value: + if namespace_status == CheckTaskStatus.skipped.value: self.targets[target_name][namespace]["status"] = status + if self.worst_status == CheckTaskStatus.skipped.value: self.worst_status = status - elif existing_status in [ - CheckTaskStatus.warning.value, CheckTaskStatus.skipped.value - ] and status in [CheckTaskStatus.error.value]: + # warning overrides any state that is not "error" + elif status == CheckTaskStatus.warning.value: + if namespace_status != CheckTaskStatus.error.value: self.targets[target_name][namespace]["status"] = status + if self.worst_status != CheckTaskStatus.error.value: self.worst_status = status + # error overrides any state + elif status == CheckTaskStatus.error.value: + self.targets[target_name][namespace]["status"] = status + self.worst_status = status def add_display(self, target_name: str, display: Any, namespace: str = ALL_NAMESPACES_TARGET) -> None: if target_name not in self.target_displays: diff --git a/azext_edge/edge/providers/check/base/display.py b/azext_edge/edge/providers/check/base/display.py index e81b74aa7..94cf7e12c 100644 --- a/azext_edge/edge/providers/check/base/display.py +++ b/azext_edge/edge/providers/check/base/display.py @@ -10,7 +10,7 @@ from typing import Any, Dict, List, Optional, Tuple from .check_manager import CheckManager -from ..common import ALL_NAMESPACES_TARGET +from ..common import ALL_NAMESPACES_TARGET, COLOR_STR_FORMAT, DEFAULT_PADDING, DEFAULT_PROPERTY_DISPLAY_COLOR from ....common import CheckTaskStatus logger = get_logger(__name__) @@ -142,3 +142,21 @@ def process_value_color( ) return f"[red]{value}[/red]" return f"[cyan]{value}[/cyan]" + + +def colorize_string(value: str, color: Optional[str] = DEFAULT_PROPERTY_DISPLAY_COLOR) -> str: + color = color or DEFAULT_PROPERTY_DISPLAY_COLOR + return COLOR_STR_FORMAT.format(value=value, color=color) + + +def basic_property_display( + label: str, + value: str, + color: Optional[str] = DEFAULT_PROPERTY_DISPLAY_COLOR, + padding: Optional[int] = DEFAULT_PADDING +) -> Padding: + padding = padding or DEFAULT_PADDING + return Padding( + f"{label}: {colorize_string(value=value, color=color)}", + (0, 0, 0, padding) + ) diff --git a/azext_edge/edge/providers/check/base/node.py b/azext_edge/edge/providers/check/base/node.py index 9ae98a8b3..703ded493 100644 --- a/azext_edge/edge/providers/check/base/node.py +++ b/azext_edge/edge/providers/check/base/node.py @@ -58,6 +58,7 @@ def check_nodes(as_list: bool = False) -> Dict[str, Any]: check_manager.add_display(target_name=target, display=target_display) return check_manager.as_dict() + check_manager.add_target_eval(target_name=target, status=CheckTaskStatus.success.value, value={"len(cluster/nodes)": len(nodes.items)}) table = _generate_node_table(check_manager, nodes) check_manager.add_display(target_name=target, display=Padding(table, padding)) diff --git a/azext_edge/edge/providers/check/common.py b/azext_edge/edge/providers/check/common.py index 98582b2d0..0860e7d9a 100644 --- a/azext_edge/edge/providers/check/common.py +++ b/azext_edge/edge/providers/check/common.py @@ -103,6 +103,30 @@ class CoreServiceResourceKinds(Enum): RUNTIME_RESOURCE = "coreServiceRuntimeResource" +# Dataflow properties +class DataflowOperationType(ListableEnum): + """ + Dataflow Profile Operation Type: + """ + + source = "source" + destination = "destination" + builtin_transformation = "builtintransformation" + + +class DataflowEndpointType(ListableEnum): + """ + Dataflow Endpoint Type: + """ + + data_explorer = "dataexplorer" + datalake = "datalakestorage" + fabric_onelake = "fabriconelake" + kafka = "kafka" + local_storage = "localstorage" + mqtt = "mqtt" + + # Akri runtime attributes AKRI_PREFIX = "aio-akri-" @@ -132,6 +156,8 @@ class CoreServiceResourceKinds(Enum): DISPLAY_BYTES_PER_GIGABYTE = 10 ** 9 # UI constants +DEFAULT_PADDING = 8 PADDING_SIZE = 4 +DEFAULT_PROPERTY_DISPLAY_COLOR = "cyan" COLOR_STR_FORMAT = "[{color}]{value}[/{color}]" diff --git a/azext_edge/edge/providers/check/dataflow.py b/azext_edge/edge/providers/check/dataflow.py new file mode 100644 index 000000000..8b3919891 --- /dev/null +++ b/azext_edge/edge/providers/check/dataflow.py @@ -0,0 +1,1246 @@ +# coding=utf-8 +# ---------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License file in the project root for license information. +# ---------------------------------------------------------------------------------------------- + +from typing import Any, Dict, List + +from knack.log import get_logger +from rich.padding import Padding + +from ...common import DEFAULT_DATAFLOW_PROFILE, CheckTaskStatus, ResourceState +from ..base import get_namespaced_pods_by_prefix +from ..edge_api.dataflow import DATAFLOW_API_V1B1, DataflowResourceKinds +from ..support.dataflow import DATAFLOW_NAME_LABEL, DATAFLOW_OPERATOR_PREFIX, DATAFLOW_PROFILE_POD_PREFIX +from .base import CheckManager, check_post_deployment, get_resources_by_name, get_resources_grouped_by_namespace +from .base.display import basic_property_display, colorize_string +from .base.pod import process_pod_status +from .base.resource import filter_resources_by_name +from .common import ( + DEFAULT_PADDING, + DEFAULT_PROPERTY_DISPLAY_COLOR, + PADDING_SIZE, + CoreServiceResourceKinds, + DataflowEndpointType, + DataflowOperationType, + ResourceOutputDetailLevel, +) + +logger = get_logger(__name__) + +PADDING = DEFAULT_PADDING +INNER_PADDING = PADDING + PADDING_SIZE + +dataflow_api_check_name = "enumerateDataflowApi" +dataflow_api_check_desc = "Enumerate Dataflow API resources" + +dataflow_runtime_check_name = "evalCoreServiceRuntime" +dataflow_runtime_check_desc = "Evaluate Dataflow core service" + +dataflows_check_name = "evalDataflows" +dataflows_check_desc = "Evaluate Dataflows" + +dataflow_endpoint_check_name = "evalDataflowEndpoints" +dataflow_endpoint_check_desc = "Evaluate Dataflow Endpoints" + +dataflow_profile_check_name = "evalDataflowProfiles" +dataflow_profile_check_desc = "Evaluate Dataflow Profiles" + +dataflow_target = "dataflows.connectivity.iotoperations.azure.com" +dataflow_endpoint_target = "dataflowendpoints.connectivity.iotoperations.azure.com" +dataflow_profile_target = "dataflowprofiles.connectivity.iotoperations.azure.com" + +valid_source_endpoint_types = [DataflowEndpointType.kafka.value, DataflowEndpointType.mqtt.value] + + +def _process_dataflow_sourcesettings( + check_manager: CheckManager, + target: str, + namespace: str, + dataflow_name: str, + endpoints: List[dict], + operation: dict, + detail_level: int, + padding: int, +): + inner_padding = padding + PADDING_SIZE + settings = operation.get("sourceSettings", {}) + + # show endpoint ref + # TODO - lots of shared code for validating source/dest endpoints, consider refactoring + endpoint_ref = settings.get("endpointRef") + + # currently we are only looking for endpoint references in the same namespace + # duplicate names should not exist, so check the first endpoint that matches the name ref + endpoint_ref_string = "not found" + endpoint_ref_status = endpoint_type_status = CheckTaskStatus.error + endpoint_type_status_string = "invalid" + + found_endpoint = next( + (endpoint for endpoint in endpoints if "name" in endpoint and endpoint["name"] == endpoint_ref), None + ) + endpoint_type = found_endpoint["type"] if found_endpoint and "type" in found_endpoint else None + + if found_endpoint: + endpoint_ref_status = CheckTaskStatus.success + endpoint_ref_string = "detected" + endpoint_type_valid = endpoint_type and endpoint_type.lower() in valid_source_endpoint_types + endpoint_type_status = CheckTaskStatus.success if endpoint_type_valid else CheckTaskStatus.error + endpoint_type_status_string = "valid" if endpoint_type_valid else f"has invalid type: {endpoint_type}" + + endpoint_ref_display = colorize_string(value=endpoint_ref_string, color=endpoint_ref_status.color) + endpoint_validity_display = colorize_string(color=endpoint_type_status.color, value=endpoint_type_status_string) + + # valid endpoint ref eval + check_manager.add_target_eval( + target_name=target, + namespace=namespace, + status=endpoint_ref_status.value, + resource_name=dataflow_name, + resource_kind=DataflowResourceKinds.DATAFLOW.value, + value={"spec.operations[*].sourceSettings.endpointRef": endpoint_ref}, + ) + + # valid source endpoint type eval + check_manager.add_target_eval( + target_name=target, + namespace=namespace, + status=endpoint_type_status.value, + resource_name=dataflow_name, + resource_kind=DataflowResourceKinds.DATAFLOW.value, + value={"ref(spec.operations[*].sourceSettings.endpointRef).endpointType": endpoint_type}, + ) + + if detail_level > ResourceOutputDetailLevel.summary.value: + check_manager.add_display( + target_name=target, namespace=namespace, display=Padding("\nSource:", (0, 0, 0, padding)) + ) + endpoint_name_display = f"{{{colorize_string(value=endpoint_ref)}}}" + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding( + f"Dataflow Endpoint {endpoint_name_display} {endpoint_ref_display}, {endpoint_validity_display}", + (0, 0, 0, padding + PADDING_SIZE), + ), + ) + elif not found_endpoint or not endpoint_type_valid: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding("[red]Invalid source endpoint reference[/red]", (0, 0, 0, padding - PADDING_SIZE)), + ) + + if detail_level > ResourceOutputDetailLevel.detail.value: + for label, key in [ + # TODO - validate asset ref / colorize + ("DeviceRegistry Asset Reference", "assetRef"), + ("Schema Reference", "schemaRef"), + ("Serialization Format", "serializationFormat"), + ]: + val = settings.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=inner_padding), + ) + + # data source strings - not on summary + if detail_level > ResourceOutputDetailLevel.summary.value: + data_sources = settings.get("dataSources", []) + if data_sources: + check_manager.add_display( + target_name=target, namespace=namespace, display=Padding("Data Sources:", (0, 0, 0, inner_padding)) + ) + for data_source in data_sources: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding(f"- {colorize_string(data_source)}", (0, 0, 0, inner_padding + 2)), + ) + + +def _process_dataflow_transformationsettings( + check_manager: CheckManager, target: str, namespace: str, resource: dict, detail_level: int, padding: int +): + settings = resource.get("builtInTransformationSettings", {}) + + # only show details on non-summary + if detail_level > ResourceOutputDetailLevel.summary.value: + check_manager.add_display( + target_name=target, namespace=namespace, display=Padding("\nBuilt-In Transformation:", (0, 0, 0, padding)) + ) + padding += PADDING_SIZE + inner_padding = padding + PADDING_SIZE + + def _process_inputs(inputs: List[str]): + if inputs: + check_manager.add_display( + target_name=target, namespace=namespace, display=Padding("Inputs:", (0, 0, 0, inner_padding)) + ) + for input in inputs: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding(f"- {colorize_string(input)}", (0, 0, 0, inner_padding + 2)), + ) + + # extra properties + for datasets_label, key in [ + ("Schema Reference", "schemaRef"), + ("Serialization Format", "serializationFormat"), + ]: + val = settings.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=datasets_label, value=val, padding=padding), + ) + + # only show datasets, filters, maps on verbose + if detail_level > ResourceOutputDetailLevel.detail.value: + # datasets + datasets = settings.get("datasets", []) + if datasets: + check_manager.add_display( + target_name=target, namespace=namespace, display=Padding("Datasets:", (0, 0, 0, padding)) + ) + for dataset in datasets: + for label, key in [ + ("Description", "description"), + ("Key", "key"), + ("Expression", "expression"), + ("Schema", "schemaRef"), + ]: + val = dataset.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=inner_padding), + ) + inputs = dataset.get("inputs", []) + _process_inputs(inputs) + + # filters + filters = settings.get("filter", []) + if filters: + check_manager.add_display( + target_name=target, namespace=namespace, display=Padding("Filters:", (0, 0, 0, padding)) + ) + for filter in filters: + for datasets_label, key in [ + ("Description", "description"), + ("Expression", "expression"), + ("Operation Type", "type"), + ]: + val = filter.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=datasets_label, value=val, padding=padding), + ) + inputs = filter.get("inputs", []) + _process_inputs(inputs) + + # maps + maps = settings.get("map", []) + if maps: + check_manager.add_display( + target_name=target, namespace=namespace, display=Padding("Maps:", (0, 0, 0, padding)) + ) + for map in maps: + for label, key in [ + ("Description", "description"), + ("Expression", "expression"), + ("Output", "output"), + ("Transformation Type", "type"), + ]: + val = map.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=inner_padding), + ) + inputs = map.get("inputs", []) + _process_inputs(inputs) + + +def _process_dataflow_destinationsettings( + check_manager: CheckManager, + target: str, + namespace: str, + dataflow_name: str, + endpoints: List[dict], + operation: dict, + detail_level: int, + padding: int, +): + settings = operation.get("destinationSettings", {}) + if detail_level > ResourceOutputDetailLevel.summary.value: + check_manager.add_display( + target_name=target, namespace=namespace, display=Padding("\nDestination:", (0, 0, 0, padding)) + ) + endpoint_ref = settings.get("endpointRef") + + # currently we are only looking for endpoint references in the same namespace + # duplicate names should not exist, so check the first endpoint that matches the name ref + endpoint_match = next( + (endpoint for endpoint in endpoints if "name" in endpoint and endpoint["name"] == endpoint_ref), None + ) + + endpoint_validity = "valid" + endpoint_status = CheckTaskStatus.success + if not endpoint_match: + endpoint_validity = "not found" + endpoint_status = CheckTaskStatus.error + # valid endpoint ref eval + check_manager.add_target_eval( + target_name=target, + namespace=namespace, + status=endpoint_status.value, + resource_name=dataflow_name, + resource_kind=DataflowResourceKinds.DATAFLOW.value, + value={"spec.operations[*].destinationSettings.endpointRef": endpoint_ref}, + ) + # show dataflow endpoint ref on detail + if detail_level > ResourceOutputDetailLevel.summary.value: + padding += PADDING_SIZE + endpoint_name_display = f"{{{colorize_string(value=endpoint_ref)}}}" + endpoint_validity_display = colorize_string(color=endpoint_status.color, value=endpoint_validity) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding( + f"Dataflow Endpoint {endpoint_name_display} {endpoint_validity_display}", + (0, 0, 0, padding), + ), + ) + elif not endpoint_match: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding("[red]Invalid destination endpoint reference[/red]", (0, 0, 0, padding - PADDING_SIZE)), + ) + # only show destination on verbose + if detail_level > ResourceOutputDetailLevel.detail.value: + for label, key in [ + ("Data Destination", "dataDestination"), + ]: + val = settings.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=padding), + ) + + +def _process_endpoint_mqttsettings( + check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int +) -> None: + settings = spec.get("mqttSettings", {}) + for label, key in [ + ("MQTT Host", "host"), + ("Protocol", "protocol"), + ]: + val = settings.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=padding), + ) + if detail_level > ResourceOutputDetailLevel.detail.value: + for label, key in [ + ("Client ID Prefix", "clientIdPrefix"), + ("Keep Alive (s)", "keepAliveSeconds"), + ("Max Inflight Messages", "maxInflightMessages"), + ("QOS", "qos"), + ("Retain", "retain"), + ("Session Expiry (s)", "sessionExpirySeconds"), + ]: + val = settings.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=padding), + ) + + tls = settings.get("tls", {}) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding("TLS:", (0, 0, 0, padding)), + ) + for label, key in [ + ("Mode", "mode"), + ("Trusted CA ConfigMap", "trustedCaCertificateConfigMapRef"), + ]: + # TODO - validate ref? + val = tls.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=(padding + PADDING_SIZE)), + ) + + +def _process_endpoint_kafkasettings( + check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int +) -> None: + inner_padding = padding + PADDING_SIZE + settings = spec.get("kafkaSettings", {}) + + for label, key in [ + ("Kafka Host", "host"), + ("Consumer Group ID", "consumerGroupId"), + ]: + val = settings.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=padding), + ) + + if detail_level > ResourceOutputDetailLevel.detail.value: + # extra properties + for label, key in [ + ("Compression", "compression"), + ("Copy MQTT Properties", "copyMqttProperties"), + ("Acks", "kafkaAcks"), + ("Partition Strategy", "partitionStrategy"), + ]: + val = settings.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=padding), + ) + # tls + tls = settings.get("tls", {}) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding("TLS:", (0, 0, 0, padding)), + ) + for label, key in [ + ("Mode", "mode"), + ("Trusted CA ConfigMap", "trustedCaCertificateConfigMapRef"), + ]: + # TODO - validate ref? + val = tls.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=inner_padding), + ) + + # batching + batching = settings.get("batching", {}) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding("Batching:", (0, 0, 0, padding)), + ) + + for label, key in [ + ("Latency (ms)", "latencyMs"), + ("Max Bytes", "maxBytes"), + ("Max Messages", "maxMessages"), + ("Mode", "mode"), + ]: + val = batching.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=inner_padding), + ) + + +def _process_endpoint_fabriconelakesettings( + check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int +) -> None: + settings = spec.get("fabricOneLakeSettings", {}) + for label, key in [("Fabric Host", "host"), ("Path Type", "oneLakePathType")]: + val = settings.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=padding), + ) + if detail_level > ResourceOutputDetailLevel.detail.value: + names = settings.get("names", {}) + for label, key in [ + ("Lakehouse Name", "lakehouseName"), + ("Workspace Name", "workspaceName"), + ]: + val = names.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=padding), + ) + + batching = settings.get("batching", {}) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding("Batching:", (0, 0, 0, padding)), + ) + + padding += PADDING_SIZE + for label, key in [ + ("Latency (s)", "latencySeconds"), + ("Max Messages", "maxMessages"), + ]: + val = batching.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=padding), + ) + + +def _process_endpoint_datalakestoragesettings( + check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int +) -> None: + settings = spec.get("datalakeStorageSettings", {}) + for label, key in [("DataLake Host", "host")]: + val = settings.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=padding), + ) + + if detail_level > ResourceOutputDetailLevel.detail.value: + batching = settings.get("batching", {}) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding("Batching:", (0, 0, 0, padding)), + ) + padding += PADDING_SIZE + for label, key in [ + ("Latency (s)", "latencySeconds"), + ("Max Messages", "maxMessages"), + ]: + val = batching.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=padding), + ) + + +def _process_endpoint_dataexplorersettings( + check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int +) -> None: + settings = spec.get("dataExplorerSettings", {}) + for label, key in [("Database Name", "database"), ("Data Explorer Host", "host")]: + val = settings.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=padding), + ) + + if detail_level > ResourceOutputDetailLevel.detail.value: + batching = settings.get("batching", {}) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding("Batching:", (0, 0, 0, padding)), + ) + + padding += PADDING_SIZE + for label, key in [ + ("Latency (s)", "latencySeconds"), + ("Max Messages", "maxMessages"), + ]: + val = batching.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=padding), + ) + + +def _process_endpoint_localstoragesettings( + check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int +) -> None: + # TODO - validate reference + settings = spec.get("localStorageSettings", {}) + persistent_volume_claim = settings.get("persistentVolumeClaimRef") + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding(f"Persistent Volume Claim: {persistent_volume_claim}", (0, 0, 0, padding)), + ) + + +def check_dataflows_deployment( + result: Dict[str, Any], + as_list: bool = False, + detail_level: int = ResourceOutputDetailLevel.summary.value, + resource_kinds: List[str] = None, + resource_name: str = None, +) -> None: + evaluate_funcs = { + CoreServiceResourceKinds.RUNTIME_RESOURCE: evaluate_core_service_runtime, + DataflowResourceKinds.DATAFLOWPROFILE: evaluate_dataflow_profiles, + DataflowResourceKinds.DATAFLOW: evaluate_dataflows, + DataflowResourceKinds.DATAFLOWENDPOINT: evaluate_dataflow_endpoints, + } + + check_post_deployment( + api_info=DATAFLOW_API_V1B1, + check_name=dataflow_api_check_name, + check_desc=dataflow_api_check_desc, + result=result, + evaluate_funcs=evaluate_funcs, + as_list=as_list, + detail_level=detail_level, + resource_kinds=resource_kinds, + resource_name=resource_name, + ) + + +def evaluate_core_service_runtime( + as_list: bool = False, + detail_level: int = ResourceOutputDetailLevel.summary.value, + resource_name: str = None, +): + check_manager = CheckManager( + check_name=dataflow_runtime_check_name, + check_desc=dataflow_runtime_check_desc, + ) + + operators = get_namespaced_pods_by_prefix( + prefix=DATAFLOW_OPERATOR_PREFIX, + namespace="", + label_selector=DATAFLOW_NAME_LABEL, + ) + if resource_name: + operators = filter_resources_by_name( + resources=operators, + resource_name=resource_name, + ) + + if not operators: + check_manager.add_target(target_name=CoreServiceResourceKinds.RUNTIME_RESOURCE.value) + check_manager.add_display( + target_name=CoreServiceResourceKinds.RUNTIME_RESOURCE.value, + display=Padding("Unable to fetch pods.", (0, 0, 0, PADDING)), + ) + for namespace, pods in get_resources_grouped_by_namespace(operators): + check_manager.add_target( + target_name=CoreServiceResourceKinds.RUNTIME_RESOURCE.value, + namespace=namespace, + ) + check_manager.add_display( + target_name=CoreServiceResourceKinds.RUNTIME_RESOURCE.value, + namespace=namespace, + display=Padding( + f"Dataflow operator in namespace {{[purple]{namespace}[/purple]}}", + (0, 0, 0, PADDING), + ), + ) + + process_pod_status( + check_manager=check_manager, + target_service_pod=f"pod/{DATAFLOW_OPERATOR_PREFIX}", + target=CoreServiceResourceKinds.RUNTIME_RESOURCE.value, + pods=pods, + namespace=namespace, + display_padding=INNER_PADDING, + detail_level=detail_level, + ) + + return check_manager.as_dict(as_list) + + +def evaluate_dataflows( + as_list: bool = False, + detail_level: int = ResourceOutputDetailLevel.summary.value, + resource_name: str = None, +): + check_manager = CheckManager( + check_name=dataflows_check_name, + check_desc=dataflows_check_desc, + ) + all_dataflows = get_resources_by_name( + api_info=DATAFLOW_API_V1B1, + kind=DataflowResourceKinds.DATAFLOW, + resource_name=resource_name, + ) + target = dataflow_target + + # No dataflows - skip + if not all_dataflows: + no_dataflows_text = "No Dataflow resources detected in any namespace." + check_manager.add_target(target_name=target) + check_manager.add_target_eval( + target_name=target, status=CheckTaskStatus.skipped.value, value={"dataflows": no_dataflows_text} + ) + check_manager.add_display( + target_name=target, + display=Padding(no_dataflows_text, (0, 0, 0, PADDING)), + ) + return check_manager.as_dict(as_list=as_list) + for namespace, dataflows in get_resources_grouped_by_namespace(all_dataflows): + check_manager.add_target(target_name=target, namespace=namespace) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding(f"Dataflows in namespace {{[purple]{namespace}[/purple]}}", (0, 0, 0, PADDING)), + ) + # conditions + check_manager.add_target_conditions( + target_name=target, + namespace=namespace, + conditions=[ + # valid dataflow profile reference + "spec.profileRef", + # at least a source and destination operation + "len(spec.operations)<=3", + # valid source endpoint + "spec.operations[*].sourceSettings.endpointRef", + "ref(spec.operations[*].sourceSettings.endpointRef).endpointType in ('kafka','mqtt')", + # valid destination endpoint + "spec.operations[*].destinationSettings.endpointRef", + # single source/destination + "len(spec.operations[*].sourceSettings)==1", + "len(spec.operations[*].destinationSettings)==1", + ], + ) + + # profile names for reference lookup + all_profiles = get_resources_by_name( + api_info=DATAFLOW_API_V1B1, + kind=DataflowResourceKinds.DATAFLOWPROFILE, + namespace=namespace, + resource_name=None, + ) + profile_names = {profile.get("metadata", {}).get("name") for profile in all_profiles} + + all_endpoints = get_resources_by_name( + api_info=DATAFLOW_API_V1B1, + kind=DataflowResourceKinds.DATAFLOWENDPOINT, + namespace=namespace, + resource_name=None, + ) + + endpoints = [ + {"name": endpoint.get("metadata", {}).get("name"), "type": endpoint.get("spec", {}).get("endpointType")} + for endpoint in all_endpoints + ] + + for dataflow in list(dataflows): + spec = dataflow.get("spec", {}) + dataflow_name = dataflow.get("metadata", {}).get("name") + mode = spec.get("mode") + mode_lower = str(mode).lower() if mode else "unknown" + dataflow_enabled = mode_lower == "enabled" + mode_display = colorize_string(value=mode_lower) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding( + f"\n- Dataflow {{{colorize_string(value=dataflow_name)}}} is {mode_display}", + (0, 0, 0, PADDING), + ), + ) + + # if dataflow is disabled, skip evaluations and displays + if not dataflow_enabled: + check_manager.add_target_eval( + target_name=target, + namespace=namespace, + status=CheckTaskStatus.skipped.value, + resource_name=dataflow_name, + resource_kind=DataflowResourceKinds.DATAFLOW.value, + value={"spec.mode": mode}, + ) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding( + colorize_string( + value=f"{CheckTaskStatus.skipped.emoji} Skipping evaluation of disabled dataflow", + color=CheckTaskStatus.skipped.color, + ), + (0, 0, 0, PADDING + 2) + ), + ) + continue + + profile_ref = spec.get("profileRef") + profile_ref_status = CheckTaskStatus.success + if profile_ref and profile_ref not in profile_names: + profile_ref_status = CheckTaskStatus.error + + # valid profileRef eval + check_manager.add_target_eval( + target_name=target, + namespace=namespace, + status=profile_ref_status.value, + resource_name=dataflow_name, + resource_kind=DataflowResourceKinds.DATAFLOW.value, + value={"spec.profileRef": profile_ref}, + ) + + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding( + f"Dataflow Profile: {{{colorize_string(color=profile_ref_status.color, value=profile_ref)}}}", + (0, 0, 0, INNER_PADDING), + ), + ) + if profile_ref_status == CheckTaskStatus.error: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding( + colorize_string(color=profile_ref_status.color, value="Invalid Dataflow Profile reference"), + (0, 0, 0, INNER_PADDING), + ), + ) + + operations = spec.get("operations", []) + + # check operations count + operations_status = CheckTaskStatus.success.value + if not operations or not (2 <= len(operations) <= 3): + operations_status = CheckTaskStatus.error.value + check_manager.add_target_eval( + target_name=target, + namespace=namespace, + status=operations_status, + resource_name=dataflow_name, + resource_kind=DataflowResourceKinds.DATAFLOW.value, + value={"len(operations)": len(operations)}, + ) + + if operations and detail_level > ResourceOutputDetailLevel.summary.value: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding("Operations:", (0, 0, 0, INNER_PADDING)), + ) + operation_padding = INNER_PADDING + PADDING_SIZE + sources = destinations = 0 + for operation in operations: + op_type = operation.get("operationType", "").lower() + if op_type == DataflowOperationType.source.value: + sources += 1 + _process_dataflow_sourcesettings( + check_manager=check_manager, + target=target, + namespace=namespace, + dataflow_name=dataflow_name, + endpoints=endpoints, + operation=operation, + detail_level=detail_level, + padding=operation_padding, + ) + elif op_type == DataflowOperationType.builtin_transformation.value: + _process_dataflow_transformationsettings( + check_manager=check_manager, + target=target, + namespace=namespace, + resource=operation, + detail_level=detail_level, + padding=operation_padding, + ) + elif op_type == DataflowOperationType.destination.value: + destinations += 1 + _process_dataflow_destinationsettings( + check_manager=check_manager, + target=target, + namespace=namespace, + dataflow_name=dataflow_name, + endpoints=endpoints, + operation=operation, + detail_level=detail_level, + padding=operation_padding, + ) + # eval source amount (1) + sources_status = destinations_status = CheckTaskStatus.success.value + if sources != 1: + sources_status = CheckTaskStatus.error.value + message = "Missing source operation" if sources == 0 else f"Too many source operations: {sources}" + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding(f"[red]{message}[/red]", (0, 0, 0, INNER_PADDING)), + ) + check_manager.add_target_eval( + target_name=target, + namespace=namespace, + status=sources_status, + resource_name=dataflow_name, + resource_kind=DataflowResourceKinds.DATAFLOW.value, + value={"len(spec.operations[*].sourceSettings)": sources}, + ) + + if destinations != 1: + destinations_status = CheckTaskStatus.error.value + message = ( + "Missing destination operation" + if destinations == 0 + else f"Too many destination operations: {destinations}" + ) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding(f"[red]{message}[/red]", (0, 0, 0, INNER_PADDING)), + ) + check_manager.add_target_eval( + target_name=target, + namespace=namespace, + status=destinations_status, + resource_name=dataflow_name, + resource_kind=DataflowResourceKinds.DATAFLOW.value, + value={"len(spec.operations[*].destinationSettings)": destinations}, + ) + return check_manager.as_dict(as_list=as_list) + + +def evaluate_dataflow_endpoints( + as_list: bool = False, + detail_level: int = ResourceOutputDetailLevel.summary.value, + resource_name: str = None, +): + check_manager = CheckManager( + check_name=dataflow_endpoint_check_name, + check_desc=dataflow_endpoint_check_desc, + ) + all_endpoints = get_resources_by_name( + api_info=DATAFLOW_API_V1B1, + kind=DataflowResourceKinds.DATAFLOWENDPOINT, + resource_name=resource_name, + ) + target = dataflow_endpoint_target + if not all_endpoints: + no_endpoints_text = "No Dataflow Endpoints detected in any namespace." + check_manager.add_target(target_name=target) + check_manager.add_target_eval( + target_name=target, status=CheckTaskStatus.skipped.value, value={"endpoints": no_endpoints_text} + ) + check_manager.add_display( + target_name=target, + display=Padding(no_endpoints_text, (0, 0, 0, PADDING)), + ) + return check_manager.as_dict(as_list=as_list) + for namespace, endpoints in get_resources_grouped_by_namespace(all_endpoints): + check_manager.add_target(target_name=target, namespace=namespace, conditions=["spec.endpointType"]) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding(f"Dataflow Endpoints in namespace {{[purple]{namespace}[/purple]}}", (0, 0, 0, PADDING)), + ) + for endpoint in list(endpoints): + spec = endpoint.get("spec", {}) + endpoint_name = endpoint.get("metadata", {}).get("name") + endpoint_type = spec.get("endpointType") + valid_endpoint_type = endpoint_type and endpoint_type.lower() in DataflowEndpointType.list() + check_manager.add_target_eval( + target_name=target, + namespace=namespace, + status=CheckTaskStatus.success.value if valid_endpoint_type else CheckTaskStatus.error.value, + resource_name=endpoint_name, + resource_kind=DataflowResourceKinds.DATAFLOWENDPOINT.value, + value={"spec.endpointType": endpoint_type}, + ) + + endpoint_string = f"Endpoint {{{colorize_string(value=endpoint_name)}}}" + detected_string = colorize_string(color="green", value="detected") + type_string = f"type: {colorize_string(color=DEFAULT_PROPERTY_DISPLAY_COLOR if valid_endpoint_type else 'red', value=endpoint_type)}" + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding( + f"\n- {endpoint_string} {detected_string}, {type_string}", + (0, 0, 0, PADDING), + ), + ) + + # endpoint auth + if detail_level > ResourceOutputDetailLevel.summary.value: + auth = spec.get("authentication", {}) + auth_method = auth.get("method") + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display( + label="Authentication Method", value=auth_method, padding=INNER_PADDING + ), + ) + + endpoint_processor_dict = { + DataflowEndpointType.mqtt.value: _process_endpoint_mqttsettings, + DataflowEndpointType.kafka.value: _process_endpoint_kafkasettings, + DataflowEndpointType.fabric_onelake.value: _process_endpoint_fabriconelakesettings, + DataflowEndpointType.datalake.value: _process_endpoint_datalakestoragesettings, + DataflowEndpointType.data_explorer.value: _process_endpoint_dataexplorersettings, + DataflowEndpointType.local_storage.value: _process_endpoint_localstoragesettings, + } + # process endpoint settings + if endpoint_type and endpoint_type.lower() in endpoint_processor_dict: + endpoint_processor_dict[endpoint_type.lower()]( + check_manager=check_manager, + target=target, + namespace=namespace, + spec=spec, + detail_level=detail_level, + padding=INNER_PADDING, + ) + else: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding( + colorize_string(color="red", value=f"Unknown endpoint type: {endpoint_type}"), + (0, 0, 0, INNER_PADDING), + ), + ) + return check_manager.as_dict(as_list=as_list) + + +def evaluate_dataflow_profiles( + as_list: bool = False, + detail_level: int = ResourceOutputDetailLevel.summary.value, + resource_name: str = None, +): + check_manager = CheckManager( + check_name=dataflow_profile_check_name, + check_desc=dataflow_profile_check_desc, + ) + target = dataflow_profile_target + + all_profiles = get_resources_by_name( + api_info=DATAFLOW_API_V1B1, + kind=DataflowResourceKinds.DATAFLOWPROFILE, + resource_name=resource_name, + ) + if not all_profiles: + no_profiles_text = "No Dataflow Profiles detected in any namespace." + check_manager.add_target(target_name=target) + check_manager.add_target_eval( + target_name=target, status=CheckTaskStatus.warning.value, value={"profiles": no_profiles_text} + ) + check_manager.add_display( + target_name=target, + display=Padding(no_profiles_text, (0, 0, 0, PADDING)), + ) + return check_manager.as_dict(as_list=as_list) + for namespace, profiles in get_resources_grouped_by_namespace(all_profiles): + check_manager.add_target( + target_name=target, + namespace=namespace, + conditions=["spec.instanceCount", f"[*].metadata.name=='{DEFAULT_DATAFLOW_PROFILE}'"], + ) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding(f"Dataflow Profiles in namespace {{[purple]{namespace}[/purple]}}", (0, 0, 0, PADDING)), + ) + + # warn if no default dataflow profile + default_profile_status = CheckTaskStatus.warning + for profile in list(profiles): + profile_name = profile.get("metadata", {}).get("name") + # check for default dataflow profile + if profile_name == DEFAULT_DATAFLOW_PROFILE: + default_profile_status = CheckTaskStatus.success + spec = profile.get("spec", {}) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding( + f"\n- Profile {{{colorize_string(value=profile_name)}}} {colorize_string(color='green', value='detected')}", + (0, 0, 0, PADDING), + ), + ) + profile_status = profile.get("status", {}) + status_level = profile_status.get("configStatusLevel") + status_description = profile_status.get("statusDescription") + # add eval for status if present + if profile_status: + check_manager.add_target_eval( + target_name=target, + namespace=namespace, + resource_name=profile_name, + resource_kind=DataflowResourceKinds.DATAFLOWPROFILE.value, + status=ResourceState.map_to_status(status_level).value, + value={"status": profile_status}, + ) + # show status description (colorized) if it exists + if status_description: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display( + label="Status", + value=status_description, + color=ResourceState.map_to_color(status_level), + padding=INNER_PADDING, + ), + ) + instance_count = spec.get("instanceCount") + has_instances = instance_count is not None and int(instance_count) >= 0 + instance_status = CheckTaskStatus.success if has_instances else CheckTaskStatus.error + check_manager.add_target_eval( + target_name=target, + namespace=namespace, + status=instance_status.value, + resource_name=profile_name, + resource_kind=DataflowResourceKinds.DATAFLOWPROFILE.value, + value={"spec.instanceCount": instance_count}, + ) + if has_instances: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display( + label="Instance count", value=instance_count, color=instance_status.color, padding=INNER_PADDING + ), + ) + else: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding("[red]No instance count set[/red]", (0, 0, 0, INNER_PADDING)), + ) + + # diagnostics on higher detail levels + if detail_level > ResourceOutputDetailLevel.summary.value: + log_padding = PADDING + PADDING_SIZE + log_inner_padding = log_padding + PADDING_SIZE + diagnostics = spec.get("diagnostics", {}) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding("Diagnostic Logs:", (0, 0, 0, log_padding)), + ) + + # diagnostic logs + diagnostic_logs = diagnostics.get("logs", {}) + diagnostic_log_level = diagnostic_logs.get("level") + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display( + label="Log Level", value=diagnostic_log_level, padding=log_inner_padding + ), + ) + + if detail_level > ResourceOutputDetailLevel.detail.value: + diagnostic_log_otelconfig = diagnostic_logs.get("openTelemetryExportConfig", {}) + if diagnostic_log_otelconfig: + for label, key in [ + ("Endpoint", "otlpGrpcEndpoint"), + ("Interval (s)", "intervalSeconds"), + ("Level", "level"), + ]: + val = diagnostic_log_otelconfig.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=log_inner_padding), + ) + + # diagnostic metrics + diagnostic_metrics = diagnostics.get("metrics", {}) + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding("Diagnostic Metrics:", (0, 0, 0, log_padding)), + ) + + diagnostic_metrics_prometheusPort = diagnostic_metrics.get("prometheusPort") + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display( + label="Prometheus Port", value=diagnostic_metrics_prometheusPort, padding=log_inner_padding + ), + ) + + diagnostic_metrics_otelconfig = diagnostic_metrics.get("openTelemetryExportConfig", {}) + if diagnostic_metrics_otelconfig: + for label, key in [ + ("Endpoint", "otlpGrpcEndpoint"), + ("Interval (s)", "intervalSeconds"), + ]: + val = diagnostic_metrics_otelconfig.get(key) + if val: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=basic_property_display(label=label, value=val, padding=log_inner_padding), + ) + # pod health - trailing `-` is important in case profiles have similar prefixes + pod_prefix = f"{DATAFLOW_PROFILE_POD_PREFIX}{profile_name}-" + profile_pods = get_namespaced_pods_by_prefix( + prefix=pod_prefix, + namespace=namespace, + label_selector=DATAFLOW_NAME_LABEL, + ) + # only show pods if they exist + if profile_pods: + process_pod_status( + check_manager=check_manager, + target_service_pod=f"pod/{pod_prefix}", + target=target, + pods=profile_pods, + namespace=namespace, + display_padding=INNER_PADDING, + detail_level=detail_level, + ) + + # default dataflow profile status, display warning if not success + check_manager.add_target_eval( + target_name=target, + namespace=namespace, + status=default_profile_status.value, + resource_kind=DataflowResourceKinds.DATAFLOWPROFILE.value, + resource_name=DEFAULT_DATAFLOW_PROFILE, + value={f"[*].metadata.name=='{DEFAULT_DATAFLOW_PROFILE}'": default_profile_status.value}, + ) + if not default_profile_status == CheckTaskStatus.success: + check_manager.add_display( + target_name=target, + namespace=namespace, + display=Padding( + colorize_string( + color=default_profile_status.color, + value=f"\nDefault Dataflow Profile '{DEFAULT_DATAFLOW_PROFILE}' not found in namespace '{namespace}'", + ), + (0, 0, 0, PADDING), + ), + ) + + return check_manager.as_dict(as_list=as_list) diff --git a/azext_edge/edge/providers/checks.py b/azext_edge/edge/providers/checks.py index 5089e0fb2..6f743f0eb 100644 --- a/azext_edge/edge/providers/checks.py +++ b/azext_edge/edge/providers/checks.py @@ -7,6 +7,7 @@ from typing import Any, Dict, List from azure.cli.core.azclierror import ArgumentUsageError +from azext_edge.edge.providers.edge_api.dataflow import DataflowResourceKinds from rich.console import Console from ..common import ListableEnum, OpsServiceType @@ -18,6 +19,7 @@ from .edge_api.deviceregistry import DeviceRegistryResourceKinds from .edge_api.mq import MqResourceKinds from .check.akri import check_akri_deployment +from .check.dataflow import check_dataflows_deployment from .edge_api.akri import AkriResourceKinds from .edge_api.opcua import OpcuaResourceKinds @@ -61,6 +63,7 @@ def run_checks( OpsServiceType.mq.value: check_mq_deployment, OpsServiceType.deviceregistry.value: check_deviceregistry_deployment, OpsServiceType.opcua.value: check_opcua_deployment, + OpsServiceType.dataflow.value: check_dataflows_deployment, } service_check_dict[ops_service]( detail_level=detail_level, @@ -81,6 +84,7 @@ def _validate_resource_kinds_under_service(ops_service: str, resource_kinds: Lis OpsServiceType.deviceregistry.value: DeviceRegistryResourceKinds, OpsServiceType.mq.value: MqResourceKinds, OpsServiceType.opcua.value: OpcuaResourceKinds, + OpsServiceType.dataflow.value: DataflowResourceKinds, } valid_resource_kinds = service_kinds_dict[ops_service].list() if ops_service in service_kinds_dict else [] diff --git a/azext_edge/edge/providers/orchestration/template.py b/azext_edge/edge/providers/orchestration/template.py index 1de6ded2e..28acca907 100644 --- a/azext_edge/edge/providers/orchestration/template.py +++ b/azext_edge/edge/providers/orchestration/template.py @@ -6,8 +6,9 @@ import json from copy import deepcopy -from typing import NamedTuple, Optional, List, Union +from typing import List, NamedTuple, Optional, Union +from ...common import DEFAULT_DATAFLOW_PROFILE from ...util import read_file_content @@ -521,7 +522,7 @@ def get_current_template_copy(custom_template_path: Optional[str] = None) -> Tem ) -def get_basic_dataflow_profile(profile_name: str = "profile", instance_count: int = 1): +def get_basic_dataflow_profile(profile_name: str = DEFAULT_DATAFLOW_PROFILE, instance_count: int = 1): return { "type": "Microsoft.IoTOperations/instances/dataflowProfiles", "apiVersion": "2024-07-01-preview", diff --git a/azext_edge/edge/providers/support/dataflow.py b/azext_edge/edge/providers/support/dataflow.py index 9fda6295a..af768a9c8 100644 --- a/azext_edge/edge/providers/support/dataflow.py +++ b/azext_edge/edge/providers/support/dataflow.py @@ -23,6 +23,9 @@ DATAFLOW_NAME_LABEL = NAME_LABEL_FORMAT.format(label=DATAFLOW_API_V1B1.label) DATAFLOW_DIRECTORY_PATH = DATAFLOW_API_V1B1.moniker +DATAFLOW_OPERATOR_PREFIX = "aio-dataflow-operator" +DATAFLOW_DEPLOYMENT_FIELD_SELECTOR = f"metadata.name={DATAFLOW_OPERATOR_PREFIX}" +DATAFLOW_PROFILE_POD_PREFIX = "aio-dataflow-" def fetch_deployments(): diff --git a/azext_edge/tests/edge/checks/base/test_check_manager_unit.py b/azext_edge/tests/edge/checks/base/test_check_manager_unit.py index 8cbf2b0f4..d17eae1c8 100644 --- a/azext_edge/tests/edge/checks/base/test_check_manager_unit.py +++ b/azext_edge/tests/edge/checks/base/test_check_manager_unit.py @@ -17,7 +17,11 @@ def test_check_manager(): namespace = generate_random_string() check_manager = CheckManager(check_name=name, check_desc=desc) assert_check_manager_dict( - check_manager=check_manager, expected_name=name, expected_namespace=namespace, expected_desc=desc + check_manager=check_manager, + expected_name=name, + expected_namespace=namespace, + expected_desc=desc, + expected_status=CheckTaskStatus.skipped.value, ) target_1 = generate_random_string() @@ -54,9 +58,7 @@ def test_check_manager(): expected_targets=expected_targets, expected_target_displays={target_1: [target_1_display_1]}, ) - check_manager.add_target_eval( - target_name=target_1, namespace=namespace, status=CheckTaskStatus.warning.value - ) + check_manager.add_target_eval(target_name=target_1, namespace=namespace, status=CheckTaskStatus.warning.value) expected_targets = { target_1: { "conditions": target_1_conditions, @@ -83,9 +85,7 @@ def test_check_manager(): target_2_condition_1 = generate_random_string() target_2_conditions = [target_2_condition_1] check_manager.add_target(target_name=target_2, namespace=namespace, conditions=target_2_conditions) - check_manager.add_target_eval( - target_name=target_2, namespace=namespace, status=CheckTaskStatus.error.value - ) + check_manager.add_target_eval(target_name=target_2, namespace=namespace, status=CheckTaskStatus.error.value) expected_targets = { target_1: { diff --git a/azext_edge/tests/edge/checks/base/test_display_unit.py b/azext_edge/tests/edge/checks/base/test_display_unit.py index 444c80ea8..c1feb1f4a 100644 --- a/azext_edge/tests/edge/checks/base/test_display_unit.py +++ b/azext_edge/tests/edge/checks/base/test_display_unit.py @@ -6,7 +6,11 @@ import pytest from azext_edge.edge.common import CheckTaskStatus -from azext_edge.edge.providers.check.common import ALL_NAMESPACES_TARGET +from azext_edge.edge.providers.check.common import ( + ALL_NAMESPACES_TARGET, + DEFAULT_PADDING, + DEFAULT_PROPERTY_DISPLAY_COLOR +) from ....generators import generate_random_string @@ -73,3 +77,35 @@ def test_process_value_color(mocked_check_manager, key, value): ) else: assert result.startswith("[cyan]") + + +@pytest.mark.parametrize("color", ["red", "yellow", "green", "cyan", None]) +@pytest.mark.parametrize("value", [generate_random_string()]) +def test_colorize_string(value, color): + from azext_edge.edge.providers.check.base.display import colorize_string + result = colorize_string(value=value, color=color) + if not color: + color = DEFAULT_PROPERTY_DISPLAY_COLOR + assert result == f"[{color}]{value}[/{color}]" + + +@pytest.mark.parametrize("label", [generate_random_string()]) +@pytest.mark.parametrize("value", [generate_random_string()]) +@pytest.mark.parametrize("color", ["red", "yellow", "green", "cyan", None]) +@pytest.mark.parametrize("padding", [4, 8, -1, 0, None]) +def test_basic_property_display(label, value, color, padding): + from azext_edge.edge.providers.check.base.display import basic_property_display + result = basic_property_display( + label=label, value=value, color=color, padding=padding + ) + if not color: + color = DEFAULT_PROPERTY_DISPLAY_COLOR + if not padding: + padding = DEFAULT_PADDING + assert result.renderable == f"{label}: [{color}]{value}[/{color}]" + assert ( + result.top, + result.right, + result.bottom, + result.left + ) == (0, 0, 0, padding) diff --git a/azext_edge/tests/edge/checks/conftest.py b/azext_edge/tests/edge/checks/conftest.py index 21b866d05..6a602516c 100644 --- a/azext_edge/tests/edge/checks/conftest.py +++ b/azext_edge/tests/edge/checks/conftest.py @@ -119,6 +119,15 @@ def mock_resource_types(mocker, ops_service): "AssetEndpointProfile": [{}], } ) + elif ops_service == "dataflow": + patched.return_value = ( + {}, + { + "Dataflow": [{}], + "DataflowEndpoint": [{}], + "DataflowProfile": [{}], + } + ) yield patched diff --git a/azext_edge/tests/edge/checks/int/helpers.py b/azext_edge/tests/edge/checks/int/helpers.py index 94019ff6c..55cd1f8c3 100644 --- a/azext_edge/tests/edge/checks/int/helpers.py +++ b/azext_edge/tests/edge/checks/int/helpers.py @@ -54,10 +54,10 @@ def assert_eval_core_service_runtime( ): assert post_deployment["evalCoreServiceRuntime"] assert post_deployment["evalCoreServiceRuntime"]["description"] == f"Evaluate {description_name} core service" - overall_status = "success" + overall_status = "skipped" runtime_resource = post_deployment["evalCoreServiceRuntime"]["targets"]["coreServiceRuntimeResource"] for namespace in runtime_resource.keys(): - namespace_status = "success" + namespace_status = "skipped" evals = runtime_resource[namespace]["evaluations"] kubectl_pods = get_kubectl_workload_items( prefixes=pod_prefix, @@ -197,12 +197,10 @@ def assert_general_eval_custom_resources( namespace_dict = post_deployment[key]["targets"][target_key] for namespace, kubectl_items in sorted_items.items(): assert namespace in namespace_dict - check_names = [] # filter out the kubernetes runtime resource evals using /, only check the CRD evals crd_evals = [item for item in namespace_dict[namespace]["evaluations"] if "/" not in item.get("name", "")] - for item in crd_evals: - if item.get("name"): - check_names.append(item.get("name")) + # filter checks by unique checks per item name + check_names = {item.get("name") for item in crd_evals if item.get("name")} # if using resource name filter, could have missing items assert len(check_names) <= len(kubectl_items) for name in check_names: @@ -240,7 +238,7 @@ def run_check_command( ops_service: str, resource_api: EdgeResourceApi, resource_kind: str, - resource_match: str, + resource_match: Optional[str] = None, ) -> Tuple[Dict[str, Any], bool]: try: aio_check = run(f"kubectl api-resources --api-group={resource_api.group}") diff --git a/azext_edge/tests/edge/checks/int/test_dataflow_int.py b/azext_edge/tests/edge/checks/int/test_dataflow_int.py new file mode 100644 index 000000000..5a940b00c --- /dev/null +++ b/azext_edge/tests/edge/checks/int/test_dataflow_int.py @@ -0,0 +1,123 @@ +# coding=utf-8 +# ---------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License file in the project root for license information. +# ---------------------------------------------------------------------------------------------- + +from typing import Any, Dict +import pytest +from knack.log import get_logger +from azext_edge.edge.providers.check.common import ResourceOutputDetailLevel +from azext_edge.edge.providers.edge_api import ( + DataflowResourceKinds, DATAFLOW_API_V1B1 +) +from .helpers import ( + assert_enumerate_resources, + assert_eval_core_service_runtime, + assert_general_eval_custom_resources, + run_check_command +) +from ....helpers import get_kubectl_custom_items + +logger = get_logger(__name__) + + +@pytest.mark.parametrize("detail_level", ResourceOutputDetailLevel.list()) +@pytest.mark.parametrize("resource_kind", DataflowResourceKinds.list() + [None]) +def test_dataflow_check(init_setup, detail_level, resource_kind): + post_deployment, dataflow_present = run_check_command( + detail_level=detail_level, + ops_service="dataflow", + resource_api=DATAFLOW_API_V1B1, + resource_kind=resource_kind + ) + + # overall api + assert_enumerate_resources( + post_deployment=post_deployment, + description_name="Dataflow", + key_name="Dataflow", + resource_api=DATAFLOW_API_V1B1, + resource_kinds=DataflowResourceKinds.list(), + present=dataflow_present, + ) + + if not resource_kind: + assert_eval_core_service_runtime( + post_deployment=post_deployment, + description_name="Dataflow", + pod_prefix="aio-dataflow-operator-", + ) + else: + assert "evalCoreServiceRuntime" not in post_deployment + + custom_resources = get_kubectl_custom_items( + resource_api=DATAFLOW_API_V1B1, + include_plural=True + ) + assert_eval_dataflows( + post_deployment=post_deployment, + custom_resources=custom_resources, + resource_kind=resource_kind + ) + assert_eval_dataflow_endpoints( + post_deployment=post_deployment, + custom_resources=custom_resources, + resource_kind=resource_kind + ) + assert_eval_dataflow_profiles( + post_deployment=post_deployment, + custom_resources=custom_resources, + resource_kind=resource_kind + ) + + +def assert_eval_dataflows( + post_deployment: Dict[str, Any], + custom_resources: Dict[str, Any], + resource_kind: str, +): + resource_kind_present = resource_kind in [None, DataflowResourceKinds.DATAFLOW.value] + dataflows = custom_resources[DataflowResourceKinds.DATAFLOW.value] + assert_general_eval_custom_resources( + post_deployment=post_deployment, + items=dataflows, + description_name="Dataflows", + resource_api=DATAFLOW_API_V1B1, + resource_kind_present=resource_kind_present + ) + # TODO: add more as --as-object gets fixed, such as success conditions + + +def assert_eval_dataflow_endpoints( + post_deployment: Dict[str, Any], + custom_resources: Dict[str, Any], + resource_kind: str, +): + resource_kind_present = resource_kind in [None, DataflowResourceKinds.DATAFLOWENDPOINT.value] + endpoints = custom_resources[DataflowResourceKinds.DATAFLOWENDPOINT.value] + assert_general_eval_custom_resources( + post_deployment=post_deployment, + items=endpoints, + description_name="Dataflow Endpoints", + resource_api=DATAFLOW_API_V1B1, + resource_kind_present=resource_kind_present, + ) + # TODO: add more as --as-object gets fixed, such as success conditions + + +def assert_eval_dataflow_profiles( + post_deployment: Dict[str, Any], + custom_resources: Dict[str, Any], + resource_kind: str, +): + resource_kind_present = resource_kind in [None, DataflowResourceKinds.DATAFLOWPROFILE.value] + profiles = custom_resources[DataflowResourceKinds.DATAFLOWPROFILE.value] + assert_general_eval_custom_resources( + post_deployment=post_deployment, + items=profiles, + description_name="Dataflow Profiles", + resource_api=DATAFLOW_API_V1B1, + resource_kind_present=resource_kind_present, + ) + # TODO: add more as --as-object gets fixed, such as success conditions diff --git a/azext_edge/tests/edge/checks/int/test_pre_post_int.py b/azext_edge/tests/edge/checks/int/test_pre_post_int.py index 31539d2d8..321ec89a8 100644 --- a/azext_edge/tests/edge/checks/int/test_pre_post_int.py +++ b/azext_edge/tests/edge/checks/int/test_pre_post_int.py @@ -81,7 +81,8 @@ def test_check_pre_post(init_setup, post, pre): assert "_all_" in node_result["targets"]["cluster/nodes"] node_count_target = node_result["targets"]["cluster/nodes"]["_all_"] assert node_count_target["conditions"] == ["len(cluster/nodes)>=1"] - assert not node_count_target["evaluations"] + assert node_count_target["evaluations"][0]["status"] == expected_status(success_or_fail=len(kubectl_nodes) >= 1) + assert node_count_target["evaluations"][0]["value"] == {"len(cluster/nodes)": len(kubectl_nodes)} final_status = expected_status(success_or_fail=len(kubectl_nodes) >= 1) assert node_count_target["status"] == final_status diff --git a/azext_edge/tests/edge/checks/test_dataflow_checks_unit.py b/azext_edge/tests/edge/checks/test_dataflow_checks_unit.py new file mode 100644 index 000000000..a79ca02fa --- /dev/null +++ b/azext_edge/tests/edge/checks/test_dataflow_checks_unit.py @@ -0,0 +1,1171 @@ +# coding=utf-8 +# ---------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License file in the project root for license information. +# ---------------------------------------------------------------------------------------------- + + +import pytest +from azext_edge.edge.common import DEFAULT_DATAFLOW_PROFILE +from azext_edge.edge.providers.base import DEFAULT_NAMESPACE +from azext_edge.edge.providers.check.dataflow import ( + evaluate_dataflow_endpoints, + evaluate_core_service_runtime, + evaluate_dataflow_profiles, + evaluate_dataflows, +) +from azext_edge.edge.providers.check.common import ( + CoreServiceResourceKinds, + ResourceOutputDetailLevel, +) +from azext_edge.edge.providers.edge_api.dataflow import DataflowResourceKinds + +from .conftest import ( + assert_check_by_resource_types, + assert_conditions, + assert_evaluations, + generate_pod_stub, +) +from ...generators import generate_random_string + +dataflow_conditions = [ + "spec.profileRef", + "len(spec.operations)<=3", + "spec.operations[*].sourceSettings.endpointRef", + "ref(spec.operations[*].sourceSettings.endpointRef).endpointType in ('kafka','mqtt')", + "spec.operations[*].destinationSettings.endpointRef", + "len(spec.operations[*].sourceSettings)==1", + "len(spec.operations[*].destinationSettings)==1", +] + + +@pytest.mark.parametrize( + "resource_kinds", + [ + None, + [], + [DataflowResourceKinds.DATAFLOW.value], + [ + DataflowResourceKinds.DATAFLOW.value, + DataflowResourceKinds.DATAFLOWENDPOINT.value, + ], + [ + DataflowResourceKinds.DATAFLOW.value, + DataflowResourceKinds.DATAFLOWENDPOINT.value, + DataflowResourceKinds.DATAFLOWPROFILE.value, + ], + ], +) +@pytest.mark.parametrize("ops_service", ["dataflow"]) +def test_check_dataflow_by_resource_types(ops_service, mocker, mock_resource_types, resource_kinds): + eval_lookup = { + CoreServiceResourceKinds.RUNTIME_RESOURCE.value: + "azext_edge.edge.providers.check.dataflow.evaluate_core_service_runtime", + DataflowResourceKinds.DATAFLOW.value: + "azext_edge.edge.providers.check.dataflow.evaluate_dataflows", + DataflowResourceKinds.DATAFLOWENDPOINT.value: + "azext_edge.edge.providers.check.dataflow.evaluate_dataflow_endpoints", + DataflowResourceKinds.DATAFLOWPROFILE.value: + "azext_edge.edge.providers.check.dataflow.evaluate_dataflow_profiles", + } + + assert_check_by_resource_types(ops_service, mocker, resource_kinds, eval_lookup) + + +@pytest.mark.parametrize("detail_level", ResourceOutputDetailLevel.list()) +@pytest.mark.parametrize( + "dataflows, profiles, endpoints, conditions, evaluations", + [ + # dataflows (valid) + ( + # dataflows + [ + { + "metadata": { + "name": "dataflow-2", + }, + "spec": { + "mode": "Enabled", + "profileRef": "dataflow-profile-1", + "operations": [ + { + "operationType": "source", + "sourceSettings": { + "endpointRef": "dataflow-endpoint-1", + "assetRef": "asset-ref", + "serializationFormat": "JSON", + "dataSources": ["one", "two"], + }, + }, + { + "operationType": "builtintransformation", + "builtInTransformationSettings": { + "schemaRef": "Schema", + "datasets": [ + { + "description": "desc", + "key": "key", + "expression": "$1 < 20", + "inputs": ["temperature", "pressure"], + }, + ], + "filter": [ + { + "expression": "$1 > 10", + "type": "operationType", + "inputs": ["temperature", "pressure"], + } + ], + "map": [ + { + "description": "desc", + "output": "output", + "inputs": ["temperature", "pressure"], + } + ], + }, + }, + { + "operationType": "destination", + "destinationSettings": { + "endpointRef": "dataflow-endpoint-2", + "dataDestination": "destination", + }, + }, + ], + }, + }, + ], + # profiles + [{"metadata": {"name": "dataflow-profile-1"}}], + # endpoints + [ + { + "metadata": { + "name": "dataflow-endpoint-1", + }, + "spec": {"endpointType": "mqtt"}, + }, + { + "metadata": { + "name": "dataflow-endpoint-2", + }, + "spec": {"endpointType": "kafka"}, + }, + ], + # conditions + dataflow_conditions, + # evaluations + [ + [ + ("status", "success"), + ( + "name", + "dataflow-2", + ), + ("value", {"spec.profileRef": "dataflow-profile-1"}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-2", + ), + ("value", {"len(operations)": 3}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-2", + ), + ("value", {"spec.operations[*].sourceSettings.endpointRef": "dataflow-endpoint-1"}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-2", + ), + ("value", {"ref(spec.operations[*].sourceSettings.endpointRef).endpointType": "mqtt"}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-2", + ), + ("value", {"spec.operations[*].destinationSettings.endpointRef": "dataflow-endpoint-2"}), + ], + ], + ), + # dataflows (invalid) + ( + # dataflows + [ + { + "metadata": { + "name": "dataflow-1", + }, + "spec": { + "mode": "Enabled", + # invalid profileRef + "profileRef": "nonexistent-dataflow-profile", + "operations": [ + # only one destination + { + "operationType": "destination", + "destinationSettings": { + "endpointRef": "invalid-endpoint", + }, + } + ], + }, + }, + { + "metadata": { + "name": "dataflow-2", + }, + "spec": { + "mode": "Enabled", + "profileRef": "real-dataflow-profile", + "operations": [ + # good destination, bad source + { + "operationType": "source", + "sourceSettings": { + "endpointRef": "invalid-endpoint", + }, + }, + { + "operationType": "destination", + "destinationSettings": { + "endpointRef": "real-endpoint", + }, + }, + ], + }, + }, + { + "metadata": { + "name": "dataflow-3", + }, + "spec": { + "mode": "Enabled", + "profileRef": "real-dataflow-profile", + "operations": [ + # invalid source type + { + "operationType": "source", + "sourceSettings": { + "endpointRef": "bad-source-endpoint", + }, + }, + { + "operationType": "destination", + "destinationSettings": { + "endpointRef": "real-endpoint", + }, + }, + ], + }, + }, + { + "metadata": { + "name": "dataflow-4", + }, + "spec": { + "mode": "Enabled", + "profileRef": "real-dataflow-profile", + # no operations + "operations": [], + }, + }, + ], + # profiles + [{"metadata": {"name": "real-dataflow-profile"}}], + # endpoints + [ + { + "metadata": { + "name": "real-endpoint", + }, + "spec": {"endpointType": "kafka"}, + }, + { + "metadata": { + "name": "bad-source-endpoint", + }, + "spec": {"endpointType": "fabriconelake"}, + }, + ], + # conditions + dataflow_conditions, + # evaluations + [ + [ + ("status", "error"), + ( + "name", + "dataflow-1", + ), + ("value", {"spec.profileRef": "nonexistent-dataflow-profile"}), + ], + [ + ("status", "error"), + ( + "name", + "dataflow-1", + ), + ("value", {"len(operations)": 1}), + ], + [ + ("status", "error"), + ( + "name", + "dataflow-1", + ), + ("value", {"spec.operations[*].destinationSettings.endpointRef": "invalid-endpoint"}), + ], + [ + ("status", "error"), + ( + "name", + "dataflow-1", + ), + ("value", {"len(spec.operations[*].sourceSettings)": 0}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-1", + ), + ("value", {"len(spec.operations[*].destinationSettings)": 1}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-2", + ), + ("value", {"spec.profileRef": "real-dataflow-profile"}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-2", + ), + ("value", {"len(operations)": 2}), + ], + [ + ("status", "error"), + ( + "name", + "dataflow-2", + ), + ("value", {"spec.operations[*].sourceSettings.endpointRef": "invalid-endpoint"}), + ], + [ + ("status", "error"), + ( + "name", + "dataflow-2", + ), + ("value", {"ref(spec.operations[*].sourceSettings.endpointRef).endpointType": None}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-2", + ), + ("value", {"spec.operations[*].destinationSettings.endpointRef": "real-endpoint"}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-2", + ), + ("value", {"len(spec.operations[*].sourceSettings)": 1}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-2", + ), + ("value", {"len(spec.operations[*].destinationSettings)": 1}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-3", + ), + ("value", {"spec.profileRef": "real-dataflow-profile"}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-3", + ), + ("value", {"len(operations)": 2}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-3", + ), + ("value", {"spec.operations[*].sourceSettings.endpointRef": "bad-source-endpoint"}), + ], + [ + ("status", "error"), + ( + "name", + "dataflow-3", + ), + ("value", {"ref(spec.operations[*].sourceSettings.endpointRef).endpointType": "fabriconelake"}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-3", + ), + ("value", {"spec.operations[*].destinationSettings.endpointRef": "real-endpoint"}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-3", + ), + ("value", {"len(spec.operations[*].sourceSettings)": 1}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-3", + ), + ("value", {"len(spec.operations[*].destinationSettings)": 1}), + ], + [ + ("status", "success"), + ( + "name", + "dataflow-4", + ), + ("value", {"spec.profileRef": "real-dataflow-profile"}), + ], + [ + ("status", "error"), + ( + "name", + "dataflow-4", + ), + ("value", {"len(operations)": 0}), + ], + [ + ("status", "error"), + ( + "name", + "dataflow-4", + ), + ("value", {"len(spec.operations[*].sourceSettings)": 0}), + ], + [ + ("status", "error"), + ( + "name", + "dataflow-4", + ), + ("value", {"len(spec.operations[*].destinationSettings)": 0}), + ], + ], + ), + # disabled dataflows + ( + # dataflows + [ + { + "metadata": { + "name": "disabled-dataflow", + }, + "spec": { + "mode": "Disabled", + "profileRef": "profile", + "operations": [ + { + "operationType": "source", + "sourceSettings": { + "endpointRef": "dataflow-endpoint-1", + }, + }, + { + "operationType": "destination", + "destinationSettings": { + "endpointRef": "dataflow-endpoint-2", + }, + }, + ], + }, + } + ], + # profiles + [{"metadata": {"name": "profile"}}], + # endpoints + [ + { + "metadata": { + "name": "dataflow-endpoint-1", + }, + "spec": {"endpointType": "mqtt"}, + }, + { + "metadata": { + "name": "dataflow-endpoint-2", + }, + "spec": {"endpointType": "kafka"}, + }, + ], + # conditions + dataflow_conditions, + # evaluations + [ + [ + ("status", "skipped"), + ( + "name", + "disabled-dataflow", + ), + ("value/spec.mode", "Disabled"), + ] + ], + + ), + # no dataflows + ( + # dataflows + [], + # profiles + [], + # endpoints + [], + # conditions + [], + # evaluations + [ + [ + ("status", "skipped"), + ( + "value/dataflows", + "No Dataflow resources detected in any namespace.", + ), + ] + ], + ), + ], +) +def test_evaluate_dataflows( + mocker, + dataflows, + profiles, + endpoints, + conditions, + evaluations, + detail_level, +): + namespace = generate_random_string() + for resource in [dataflows, profiles, endpoints]: + for item in resource: + item["metadata"]["namespace"] = namespace + # this expects the logic to follow this path: + # 1. get all dataflows + # 2. get all profiles + # 3. get all endpoints + side_effects = [{"items": dataflows}, {"items": profiles}, {"items": endpoints}] + + mocker = mocker.patch( + "azext_edge.edge.providers.edge_api.base.EdgeResourceApi.get_resources", + side_effect=side_effects, + ) + + result = evaluate_dataflows(detail_level=detail_level) + + assert result["name"] == "evalDataflows" + assert result["targets"]["dataflows.connectivity.iotoperations.azure.com"] + target = result["targets"]["dataflows.connectivity.iotoperations.azure.com"] + + for namespace in target: + assert namespace in result["targets"]["dataflows.connectivity.iotoperations.azure.com"] + + target[namespace]["conditions"] = [] if not target[namespace]["conditions"] else target[namespace]["conditions"] + assert_conditions(target[namespace], conditions) + assert_evaluations(target[namespace], evaluations) + + +@pytest.mark.parametrize("detail_level", ResourceOutputDetailLevel.list()) +@pytest.mark.parametrize( + "endpoints, conditions, evaluations", + [ + ( + # endpoints + [ + # kafka endpoint + { + "metadata": { + "name": "endpoint-1", + }, + "spec": { + "endpointType": "kafka", + "authentication": {"method": "authMethod"}, + "kafkaSettings": { + "host": "kafkaHost", + "consumerGroupId": None, + "compression": "compression", + "kafkaAcks": 3, + "tls": {"mode": "Enabled"}, + "batching": {"latencyMs": 300}, + }, + }, + }, + # localStorage + { + "metadata": { + "name": "endpoint-2", + }, + "spec": { + "endpointType": "localstorage", + "authentication": {"method": "authMethod"}, + "localStorageSettings": {"persistentVolumeClaimRef": "ref"}, + }, + }, + # Fabric Onelake + { + "metadata": { + "name": "endpoint-3", + }, + "spec": { + "endpointType": "fabriconelake", + "authentication": {"method": "authMethod"}, + "fabricOneLakeSettings": { + "host": "fabric_host", + "names": {"lakehouseName": "lakehouse", "workspaceName": "workspaceName"}, + "batching": {"latencySeconds": 2}, + }, + }, + }, + # datalake storage + { + "metadata": { + "name": "endpoint-4", + }, + "spec": { + "endpointType": "datalakestorage", + "authentication": {"method": "authMethod"}, + "datalakeStorageSettings": {"host": "datalakeHost", "batching": {"latencySeconds": 12}}, + }, + }, + # dataExplorer + { + "metadata": { + "name": "endpoint-5", + }, + "spec": { + "endpointType": "dataExplorer", + "authentication": {"method": "authMethod"}, + "dataExplorerSettings": { + "database": "databse", + "host": "data_explorer_host", + "batching": {"latencySeconds": 3}, + }, + }, + }, + # mqtt + { + "metadata": { + "name": "endpoint-6", + }, + "spec": { + "endpointType": "mqtt", + "authentication": {"method": "authMethod"}, + "mqttSettings": { + "host": "mqttHost", + "protocol": "Websockets", + "clientIdPrefix": None, + "qos": 3, + "maxInflightMessages": 100, + "tls": {"mode": "Enabled", "trustedCaCertificateConfigMapRef": "ref"}, + }, + }, + }, + # invalid endpoint type + { + "metadata": { + "name": "endpoint-7", + }, + "spec": { + "endpointType": "invalid", + }, + }, + ], + # conditions + [ + "spec.endpointType", + ], + # evaluations + [ + [ + ("status", "success"), + ( + "name", + "endpoint-1", + ), + ("value", {"spec.endpointType": "kafka"}), + ], + [ + ("status", "success"), + ( + "name", + "endpoint-2", + ), + ("value", {"spec.endpointType": "localstorage"}), + ], + [ + ("status", "success"), + ( + "name", + "endpoint-3", + ), + ("value", {"spec.endpointType": "fabriconelake"}), + ], + [ + ("status", "success"), + ( + "name", + "endpoint-4", + ), + ("value", {"spec.endpointType": "datalakestorage"}), + ], + [ + ("status", "success"), + ( + "name", + "endpoint-5", + ), + ("value", {"spec.endpointType": "dataExplorer"}), + ], + [ + ("status", "success"), + ( + "name", + "endpoint-6", + ), + ("value", {"spec.endpointType": "mqtt"}), + ], + [ + ("status", "error"), + ( + "name", + "endpoint-7", + ), + ("value", {"spec.endpointType": "invalid"}), + ], + ], + ), + # no endpoints + ( + # endpoints + [], + # conditions + [], + # evaluations + [ + [ + ("status", "skipped"), + ( + "value/endpoints", + "No Dataflow Endpoints detected in any namespace.", + ), + ] + ], + ), + ], +) +def test_evaluate_dataflow_endpoints( + mocker, + endpoints, + conditions, + evaluations, + detail_level, +): + mocker = mocker.patch( + "azext_edge.edge.providers.edge_api.base.EdgeResourceApi.get_resources", + side_effect=[{"items": endpoints}], + ) + + namespace = generate_random_string() + for endpoint in endpoints: + endpoint["metadata"]["namespace"] = namespace + result = evaluate_dataflow_endpoints(detail_level=detail_level) + + assert result["name"] == "evalDataflowEndpoints" + assert result["targets"]["dataflowendpoints.connectivity.iotoperations.azure.com"] + target = result["targets"]["dataflowendpoints.connectivity.iotoperations.azure.com"] + + for namespace in target: + assert namespace in result["targets"]["dataflowendpoints.connectivity.iotoperations.azure.com"] + + target[namespace]["conditions"] = [] if not target[namespace]["conditions"] else target[namespace]["conditions"] + assert_conditions(target[namespace], conditions) + assert_evaluations(target[namespace], evaluations) + + +@pytest.mark.parametrize("detail_level", ResourceOutputDetailLevel.list()) +@pytest.mark.parametrize( + "profiles, pods, conditions, evaluations", + [ + # good default profile (name, namespace, instance count and diagnostic vals) + ( + # profiles + [ + { + "metadata": { + "name": DEFAULT_DATAFLOW_PROFILE, + "namespace": DEFAULT_NAMESPACE, + }, + "spec": { + "instanceCount": 1, + "diagnostics": { + "logs": { + "level": "info", + "openTelemetryExportConfig": { + "otlpGrpcEndpoint": "endpoint", + }, + }, + "metrics": { + "openTelemetryExportConfig": { + "otlpGrpcEndpoint": "endpoint", + } + }, + }, + }, + }, + ], + # pods + [ + # good profile pod + generate_pod_stub( + name=f"aio-dataflow-{DEFAULT_DATAFLOW_PROFILE}-0", + phase="Running", + ), + ], + # conditions + [ + "spec.instanceCount", + f"[*].metadata.name=='{DEFAULT_DATAFLOW_PROFILE}'", + ], + # evaluations + [ + [ + ("status", "success"), + ( + "name", + "profile", + ), + ("value", {"spec.instanceCount": 1}), + ], + [ + ("status", "success"), + ( + "name", + "pod/aio-dataflow-profile-0", + ), + ("value", {"status.phase": "Running"}), + ], + ], + ), + # good and bad profiles, no default profile warning + ( + # profiles + [ + { + "metadata": { + "name": "good-profile", + "namespace": DEFAULT_NAMESPACE, + }, + "spec": { + "instanceCount": 1, + "diagnostics": { + "logs": { + "level": "info", + "openTelemetryExportConfig": { + "otlpGrpcEndpoint": "endpoint", + }, + }, + "metrics": { + "openTelemetryExportConfig": { + "otlpGrpcEndpoint": "endpoint", + } + }, + }, + }, + }, + { + "metadata": { + "name": "bad-profile", + "namespace": DEFAULT_NAMESPACE, + }, + "spec": { + "instanceCount": None, + }, + }, + ], + # pods + [ + # good profile pod + generate_pod_stub( + name="aio-dataflow-good-profile-0", + phase="Running", + ), + ], + # conditions + [ + "spec.instanceCount", + ], + # evaluations + [ + [ + ("status", "success"), + ( + "name", + "good-profile", + ), + ("value", {"spec.instanceCount": 1}), + ], + [ + ("status", "success"), + ( + "name", + "pod/aio-dataflow-good-profile-0", + ), + ("value", {"status.phase": "Running"}), + ], + [ + ("status", "error"), + ( + "name", + "bad-profile", + ), + ("value", {"spec.instanceCount": None}), + ], + [ + ("status", "warning"), + ( + "value", + {f"[*].metadata.name=='{DEFAULT_DATAFLOW_PROFILE}'": "warning"}, + ), + ], + ], + ), + # good profile, warning status + ( + # profiles + [ + { + "metadata": { + "name": DEFAULT_DATAFLOW_PROFILE, + "namespace": DEFAULT_NAMESPACE, + }, + "spec": { + "instanceCount": 1, + }, + "status": { + "configStatusLevel": "warn", + "statusDescription": "this should display a warning" + }, + }, + ], + # pods + [ + # good profile pod + generate_pod_stub( + name=f"aio-dataflow-{DEFAULT_DATAFLOW_PROFILE}-0", + phase="Running", + ), + ], + # conditions + [ + "spec.instanceCount", + f"[*].metadata.name=='{DEFAULT_DATAFLOW_PROFILE}'", + ], + # evaluations + [ + [ + ("status", "warning"), + ( + "name", + "profile", + ), + ("value/status/configStatusLevel", "warn"), + ("value/status/statusDescription", "this should display a warning"), + ], + [ + ("status", "success"), + ( + "name", + "profile", + ), + ("value", {"spec.instanceCount": 1}), + ], + [ + ("status", "success"), + ( + "name", + "pod/aio-dataflow-profile-0", + ), + ("value", {"status.phase": "Running"}), + ], + ], + ), + # no profiles + ( + # profiles + [], + # pods + [], + # conditions + [], + # evaluations + [ + [ + ("status", "warning"), + ( + "value/profiles", + "No Dataflow Profiles detected in any namespace.", + ), + ] + ], + ), + ], +) +def test_evaluate_dataflow_profiles( + mocker, + profiles, + pods, + conditions, + evaluations, + detail_level, +): + + mocker.patch( + "azext_edge.edge.providers.edge_api.base.EdgeResourceApi.get_resources", side_effect=[{"items": profiles}] + ) + mocker.patch( + "azext_edge.edge.providers.check.dataflow.get_namespaced_pods_by_prefix", + return_value=pods, + side_effect=[pods, []], + ) + result = evaluate_dataflow_profiles(detail_level=detail_level) + + assert result["name"] == "evalDataflowProfiles" + assert result["targets"]["dataflowprofiles.connectivity.iotoperations.azure.com"] + target = result["targets"]["dataflowprofiles.connectivity.iotoperations.azure.com"] + + for namespace in target: + assert namespace in result["targets"]["dataflowprofiles.connectivity.iotoperations.azure.com"] + + target[namespace]["conditions"] = [] if not target[namespace]["conditions"] else target[namespace]["conditions"] + assert_conditions(target[namespace], conditions) + assert_evaluations(target[namespace], evaluations) + + +@pytest.mark.parametrize("detail_level", ResourceOutputDetailLevel.list()) +@pytest.mark.parametrize( + "resource_name", + [ + None, + "aio-dataflow-*", + ], +) +@pytest.mark.parametrize( + "pods, namespace_conditions, namespace_evaluations", + [ + ( + # pods + [ + generate_pod_stub( + name="aio-dataflow-operator-12345", + phase="Running", + ) + ], + # namespace conditions str + [], + # namespace evaluations str + [ + [ + ("status", "success"), + ("value/status.phase", "Running"), + ], + ], + ), + ( + # pods + [ + generate_pod_stub( + name="aio-dataflow-operator-12345", + phase="Failed", + ) + ], + # namespace conditions str + [], + # namespace evaluations str + [ + [("status", "error")], + ], + ), + # no pods + ( + # pods + [], + # namespace conditions str + [], + # namespace evaluations str + [], + ), + ], +) +def test_evaluate_core_service_runtime( + mocker, + pods, + namespace_conditions, + namespace_evaluations, + detail_level, + resource_name, +): + mocker = mocker.patch( + "azext_edge.edge.providers.check.dataflow.get_namespaced_pods_by_prefix", + return_value=pods, + ) + + namespace = generate_random_string() + for pod in pods: + pod.metadata.namespace = namespace + result = evaluate_core_service_runtime(detail_level=detail_level, resource_name=resource_name) + + assert result["name"] == "evalCoreServiceRuntime" + assert result["targets"][CoreServiceResourceKinds.RUNTIME_RESOURCE.value] + target = result["targets"][CoreServiceResourceKinds.RUNTIME_RESOURCE.value] + + for namespace in target: + assert namespace in result["targets"][CoreServiceResourceKinds.RUNTIME_RESOURCE.value] + + target[namespace]["conditions"] = [] if not target[namespace]["conditions"] else target[namespace]["conditions"] + assert_conditions(target[namespace], namespace_conditions) + assert_evaluations(target[namespace], namespace_evaluations)