Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add data-point and event file inputs for az iot ops asset create #280

Merged
merged 9 commits into from
Jul 29, 2024
5 changes: 5 additions & 0 deletions azext_edge/edge/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,11 @@ def load_iotops_help():
data_source={data_source} name={name} observability_mode={observability_mode} sampling_interval={sampling_interval}
queue_size={queue_size} --data data_source={data_source}

- name: Create an asset using a file containing data-points and another file containing events.
text: >
az iot ops asset create --name {asset_name} -g {resource_group} --custom-location {custom_location}
--endpoint {endpoint} --data-file {data_point_file_path} --event-file {event_file_path}

- name: Create an asset with the given pre-filled values.
text: >
az iot ops asset create --name MyAsset -g MyRg --custom-location MyLocation --endpoint example.com
Expand Down
4 changes: 4 additions & 0 deletions azext_edge/edge/commands_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ def create_asset(
custom_location_resource_group: Optional[str] = None,
custom_location_subscription: Optional[str] = None,
data_points: Optional[List[str]] = None,
data_points_file_path: Optional[str] = None,
description: Optional[str] = None,
disabled: bool = False,
display_name: Optional[str] = None,
documentation_uri: Optional[str] = None,
events: Optional[List[str]] = None,
events_file_path: Optional[List[str]] = None,
external_asset_id: Optional[str] = None,
hardware_revision: Optional[str] = None,
location: Optional[str] = None,
Expand Down Expand Up @@ -65,10 +67,12 @@ def create_asset(
custom_location_resource_group=custom_location_resource_group,
custom_location_subscription=custom_location_subscription,
data_points=data_points,
data_points_file_path=data_points_file_path,
description=description,
disabled=disabled,
display_name=display_name,
documentation_uri=documentation_uri,
events_file_path=events_file_path,
events=events,
external_asset_id=external_asset_id,
hardware_revision=hardware_revision,
Expand Down
14 changes: 14 additions & 0 deletions azext_edge/edge/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,13 @@ def load_iotops_arguments(self, _):
"--data can be used 1 or more times. Review help examples for full parameter usage",
arg_group="Additional Info",
)
context.argument(
"data_points_file_path",
options_list=["--data-file", "--df"],
help="File path for the file containing the data points. The following file types are supported: "
f"{', '.join(FileType.list())}.",
arg_group="Additional Info",
)
context.argument(
"description",
options_list=["--description", "-d"],
Expand Down Expand Up @@ -682,6 +689,13 @@ def load_iotops_arguments(self, _):
"--event can be used 1 or more times. Review help examples for full parameter usage",
arg_group="Additional Info",
)
context.argument(
"events_file_path",
options_list=["--event-file", "--ef"],
help="File path for the file containing the events. The following file types are supported: "
f"{', '.join(FileType.list())}.",
arg_group="Additional Info",
)
context.argument(
"external_asset_id",
options_list=["--external-asset-id", "--eai"],
Expand Down
25 changes: 20 additions & 5 deletions azext_edge/edge/providers/rpsaas/adr/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ def create(
custom_location_resource_group: Optional[str] = None,
custom_location_subscription: Optional[str] = None,
data_points: Optional[List[str]] = None,
data_points_file_path: Optional[str] = None,
description: Optional[str] = None,
disabled: bool = False,
display_name: Optional[str] = None,
documentation_uri: Optional[str] = None,
events: Optional[List[str]] = None,
events_file_path: Optional[List[str]] = None,
external_asset_id: Optional[str] = None,
hardware_revision: Optional[str] = None,
location: Optional[str] = None,
Expand All @@ -68,7 +70,7 @@ def create(
ev_queue_size: int = 1,
tags: Optional[Dict[str, str]] = None,
):
if not any([data_points, events]):
if not any([data_points, events, data_points_file_path, events_file_path]):
raise RequiredArgumentMissingError(MISSING_DATA_EVENT_ERROR)
extended_location = self.check_cluster_and_custom_location(
custom_location_name=custom_location_name,
Expand All @@ -88,6 +90,14 @@ def create(
"dataPoints": _process_asset_sub_points("data_source", data_points),
"events": _process_asset_sub_points("event_notifier", events),
}
if data_points_file_path:
properties["dataPoints"].extend(
_process_asset_sub_points_file_path(file_path=data_points_file_path)
)
if events_file_path:
properties["events"].extend(
_process_asset_sub_points_file_path(file_path=events_file_path)
)

# Other properties
_update_properties(
Expand Down Expand Up @@ -354,7 +364,6 @@ def import_sub_points(
resource_group_name: str,
replace: bool = False
):
from ....util import deserialize_file_content
asset = self.show(
resource_name=asset_name,
resource_group_name=resource_group_name,
Expand All @@ -363,8 +372,7 @@ def import_sub_points(
if sub_point_type not in asset["properties"]:
asset["properties"][sub_point_type] = []

sub_points = list(deserialize_file_content(file_path=file_path))
_convert_sub_points_from_csv(sub_points)
sub_points = _process_asset_sub_points_file_path(file_path=file_path)

key = "dataSource" if sub_point_type == "dataPoints" else "eventNotifier"
if replace:
Expand Down Expand Up @@ -591,7 +599,7 @@ def _convert_sub_points_to_csv(
return list(csv_conversion_map.values())


def _process_asset_sub_points(required_arg: str, sub_points: Optional[List[str]]) -> Dict[str, str]:
def _process_asset_sub_points(required_arg: str, sub_points: Optional[List[str]]) -> List[Dict[str, str]]:
"""This is for the main create/update asset commands"""
if not sub_points:
return []
Expand All @@ -618,6 +626,13 @@ def _process_asset_sub_points(required_arg: str, sub_points: Optional[List[str]]
return processed_points


def _process_asset_sub_points_file_path(file_path: str) -> List[Dict[str, str]]:
from ....util import deserialize_file_content
sub_points = list(deserialize_file_content(file_path=file_path))
_convert_sub_points_from_csv(sub_points)
return sub_points


def _process_custom_attributes(current_attributes: Dict[str, str], custom_attributes: List[str]):
custom_attributes = assemble_nargs_to_dict(custom_attributes)
for key, value in custom_attributes.items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,11 @@ def test_asset_sub_point_lifecycle(require_init, tracked_resources, tracked_file
assert_sub_point(asset_data_points[i], **expected_data_points[i])

for file_type in FileType.list():
file_path = run(
data_file_path = run(
f"az iot ops asset data-point export -a {asset_name} -g {rg} -f {file_type}"
)["file_path"]
tracked_files.append(file_path)
assert os.path.exists(file_path)
tracked_files.append(data_file_path)
assert os.path.exists(data_file_path)

asset_data_points = run(
f"az iot ops asset data-point remove -a {asset_name} -g {rg} "
Expand All @@ -202,7 +202,7 @@ def test_asset_sub_point_lifecycle(require_init, tracked_resources, tracked_file
assert len(asset_data_points) + 1 == len(expected_data_points)

asset_data_points = run(
f"az iot ops asset data-point import -a {asset_name} -g {rg} --input-file {file_path}"
f"az iot ops asset data-point import -a {asset_name} -g {rg} --input-file {data_file_path}"
)
assert len(asset_data_points) == len(expected_data_points)
assert expected_data_points[1]['data_source'] in [point["dataSource"] for point in asset_data_points]
Expand Down Expand Up @@ -238,11 +238,11 @@ def test_asset_sub_point_lifecycle(require_init, tracked_resources, tracked_file
assert_sub_point(asset_events[i], **expected_events[i])

for file_type in FileType.list():
file_path = run(
event_file_path = run(
f"az iot ops asset event export -a {asset_name} -g {rg} -f {file_type}"
)["file_path"]
tracked_files.append(file_path)
assert os.path.exists(file_path)
tracked_files.append(event_file_path)
assert os.path.exists(event_file_path)

asset_events = run(
f"az iot ops asset event remove -a {asset_name} -g {rg} "
Expand All @@ -251,11 +251,21 @@ def test_asset_sub_point_lifecycle(require_init, tracked_resources, tracked_file
assert len(asset_events) + 1 == len(expected_events)

asset_events = run(
f"az iot ops asset event import -a {asset_name} -g {rg} --input-file {file_path}"
f"az iot ops asset event import -a {asset_name} -g {rg} --input-file {event_file_path}"
)
assert len(asset_events) == len(expected_events)
assert expected_events[1]['event_notifier'] in [point["eventNotifier"] for point in asset_events]

second_asset = run(
f"az iot ops asset create -n {asset_name} -g {rg} -c {cluster_name} --cg {rg} "
f"--endpoint {endpoint_name} --data-file {data_file_path} --event-file {event_file_path}"
)
tracked_resources.append(second_asset["id"])
assert len(asset["properties"]["dataPoints"]) == len(expected_data_points)
assert_sub_point(asset["properties"]["dataPoints"][0], **expected_data_points[0])
assert len(asset["properties"]["events"]) == len(expected_events)
assert_sub_point(asset["properties"]["events"][0], **expected_events[0])


def assert_asset_props(result, **expected):
assert result["name"] == expected["name"]
Expand Down
36 changes: 32 additions & 4 deletions azext_edge/tests/edge/rpsaas/adr/asset/test_create_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from azext_edge.edge.common import ResourceProviderMapping, ResourceTypeMapping
from azext_edge.edge.providers.rpsaas.adr.base import ADR_API_VERSION

from .conftest import FULL_ASSET
from .....generators import generate_random_string


Expand All @@ -22,7 +23,7 @@
"resources.begin_create_or_update": {"result": generate_random_string()}
}], ids=["create"], indirect=True)
@pytest.mark.parametrize("asset_helpers_fixture", [{
"process_asset_sub_points": generate_random_string(),
"process_asset_sub_points": [{generate_random_string(): generate_random_string()}],
"update_properties": generate_random_string(),
}], ids=["create helpers"], indirect=True)
@pytest.mark.parametrize("req", [
Expand All @@ -41,9 +42,11 @@
"data_points": generate_random_string(),
"description": generate_random_string(),
"display_name": generate_random_string(),
"data_points_file_path": generate_random_string(),
"disabled": True,
"documentation_uri": generate_random_string(),
"events": generate_random_string(),
"events_file_path": generate_random_string(),
"external_asset_id": generate_random_string(),
"hardware_revision": generate_random_string(),
"location": generate_random_string(),
Expand Down Expand Up @@ -72,7 +75,23 @@
"ev_queue_size": 888,
},
])
def test_create_asset(mocker, mocked_cmd, mocked_resource_management_client, asset_helpers_fixture, req):
@pytest.mark.parametrize("mocked_deserialize_file_content", [[
FULL_ASSET["properties"]["events"][0],
{
"capabilityId": generate_random_string(),
"dataPointConfiguration": "{\"samplingInterval\": 100}",
"dataSource": FULL_ASSET["properties"]["dataPoints"][1]["dataSource"],
"name": generate_random_string()
}
]], ids=["subPoints"], indirect=True)
def test_create_asset(
mocker,
mocked_cmd,
mocked_deserialize_file_content,
mocked_resource_management_client,
asset_helpers_fixture,
req
):
patched_sp = asset_helpers_fixture["process_asset_sub_points"]
patched_up = asset_helpers_fixture["update_properties"]
patched_cap = mocker.patch(
Expand Down Expand Up @@ -150,10 +169,19 @@ def test_create_asset(mocker, mocked_cmd, mocked_resource_management_client, ass
# Data points + events
assert patched_sp.call_args_list[0].args[0] == "data_source"
assert patched_sp.call_args_list[0].args[1] == req.get("data_points")
assert request_props["dataPoints"] == patched_sp.return_value
expected_data_points = patched_sp.return_value
if req.get("data_points_file_path"):
mocked_deserialize_file_content.assert_any_call(file_path=req["data_points_file_path"])
expected_data_points.extend(mocked_deserialize_file_content.return_value)
assert request_props["dataPoints"] == expected_data_points

assert patched_sp.call_args_list[1].args[0] == "event_notifier"
assert patched_sp.call_args_list[1].args[1] == req.get("events")
assert request_props["events"] == patched_sp.return_value
expected_events = patched_sp.return_value
if req.get("events_file_path"):
mocked_deserialize_file_content.assert_any_call(file_path=req["events_file_path"])
expected_events.extend(mocked_deserialize_file_content.return_value)
assert request_props["events"] == expected_events


def test_create_asset_error(mocked_cmd):
Expand Down