Skip to content

Commit

Permalink
IOT - Improve Job and JobTemplate (#8322)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsuchenia authored Nov 17, 2024
1 parent a0eb7f8 commit 01829b2
Show file tree
Hide file tree
Showing 9 changed files with 747 additions and 38 deletions.
5 changes: 5 additions & 0 deletions CLOUDFORMATION_COVERAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ Please let us know if you'd like support for a resource not yet listed here.
- [x] update implemented
- [x] delete implemented
- [x] Fn::GetAtt implemented
- AWS::IoT::JobTemplate:
- [x] create implemented
- [x] update implemented
- [x] delete implemented
- [x] Fn::GetAtt implemented
- AWS::KMS::Key:
- [x] create implemented
- [ ] update implemented
Expand Down
8 changes: 4 additions & 4 deletions IMPLEMENTATION_COVERAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4528,7 +4528,7 @@
- [ ] create_dynamic_thing_group
- [ ] create_fleet_metric
- [X] create_job
- [ ] create_job_template
- [X] create_job_template
- [X] create_keys_and_certificate
- [ ] create_mitigation_action
- [ ] create_ota_update
Expand Down Expand Up @@ -4562,7 +4562,7 @@
- [ ] delete_fleet_metric
- [X] delete_job
- [X] delete_job_execution
- [ ] delete_job_template
- [X] delete_job_template
- [ ] delete_mitigation_action
- [ ] delete_ota_update
- [ ] delete_package
Expand Down Expand Up @@ -4604,7 +4604,7 @@
- [ ] describe_index
- [X] describe_job
- [X] describe_job_execution
- [ ] describe_job_template
- [X] describe_job_template
- [ ] describe_managed_job_template
- [ ] describe_mitigation_action
- [ ] describe_provisioning_template
Expand Down Expand Up @@ -4665,7 +4665,7 @@
- [ ] list_indices
- [X] list_job_executions_for_job
- [X] list_job_executions_for_thing
- [ ] list_job_templates
- [X] list_job_templates
- [X] list_jobs
- [ ] list_managed_job_templates
- [ ] list_metric_values
Expand Down
9 changes: 9 additions & 0 deletions moto/iot/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,12 @@ def __init__(self, name: str):
"InvalidRequestException",
f"Cannot delete. Thing {name} is still attached to one or more principals",
)


class ConflictException(IoTClientError):
def __init__(self, name: str):
self.code = 409
super().__init__(
"ConflictException",
f"A resource {name} already exists.",
)
245 changes: 213 additions & 32 deletions moto/iot/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from .exceptions import (
CertificateStateException,
ConflictException,
DeleteConflictException,
InvalidRequestException,
InvalidStateTransitionException,
Expand All @@ -33,7 +34,7 @@
VersionConflictException,
VersionsLimitExceededException,
)
from .utils import PAGINATION_MODEL
from .utils import PAGINATION_MODEL, decapitalize_dict

if TYPE_CHECKING:
from moto.iotdata.models import FakeShadow
Expand Down Expand Up @@ -667,6 +668,10 @@ def __init__(
target_selection: str,
job_executions_rollout_config: Dict[str, Any],
document_parameters: Dict[str, str],
abort_config: Dict[str, List[Dict[str, Any]]],
job_execution_retry_config: Dict[str, Any],
scheduling_config: Dict[str, Any],
timeout_config: Dict[str, Any],
account_id: str,
region_name: str,
):
Expand All @@ -685,6 +690,10 @@ def __init__(
self.presigned_url_config = presigned_url_config
self.target_selection = target_selection
self.job_executions_rollout_config = job_executions_rollout_config
self.abort_config = abort_config
self.job_execution_retry_config = job_execution_retry_config
self.scheduling_config = scheduling_config
self.timeout_config = timeout_config
self.status = "QUEUED" # IN_PROGRESS | CANCELED | COMPLETED
self.comment: Optional[str] = None
self.reason_code: Optional[str] = None
Expand All @@ -711,7 +720,7 @@ def to_dict(self) -> Dict[str, Any]:
"description": self.description,
"presignedUrlConfig": self.presigned_url_config,
"targetSelection": self.target_selection,
"jobExecutionsRolloutConfig": self.job_executions_rollout_config,
"timeoutConfig": self.timeout_config,
"status": self.status,
"comment": self.comment,
"forceCanceled": self.force,
Expand Down Expand Up @@ -783,6 +792,139 @@ def to_dict(self) -> Dict[str, Any]:
}


class FakeJobTemplate(CloudFormationModel):
JOB_TEMPLATE_ID_REGEX_PATTERN = "[a-zA-Z0-9_-]+"
JOB_TEMPLATE_ID_REGEX = re.compile(JOB_TEMPLATE_ID_REGEX_PATTERN)

def __init__(
self,
job_template_id: str,
document_source: str,
document: str,
description: str,
presigned_url_config: Dict[str, Any],
job_executions_rollout_config: Dict[str, Any],
abort_config: Dict[str, List[Dict[str, Any]]],
job_execution_retry_config: Dict[str, Any],
timeout_config: Dict[str, Any],
account_id: str,
region_name: str,
):
if not self._job_template_id_matcher(job_template_id):
raise InvalidRequestException()

self.account_id = account_id
self.region_name = region_name
self.job_template_id = job_template_id
self.job_template_arn = f"arn:{get_partition(self.region_name)}:iot:{self.region_name}:{self.account_id}:jobtemplate/{job_template_id}"
self.document_source = document_source
self.document = document
self.description = description
self.presigned_url_config = presigned_url_config
self.job_executions_rollout_config = job_executions_rollout_config
self.abort_config = abort_config
self.job_execution_retry_config = job_execution_retry_config
self.timeout_config = timeout_config
self.created_at = time.mktime(datetime(2015, 1, 1).timetuple())

def to_dict(self) -> Dict[str, Any]:
return {
"jobTemplateArn": self.job_template_arn,
"jobTemplateId": self.job_template_id,
"description": self.description,
"createdAt": self.created_at,
}

def _job_template_id_matcher(self, argument: str) -> bool:
return (
self.JOB_TEMPLATE_ID_REGEX.fullmatch(argument) is not None
and len(argument) <= 64
)

@staticmethod
def cloudformation_name_type() -> str:
return "JobTemplate"

@staticmethod
def cloudformation_type() -> str:
return "AWS::IoT::JobTemplate"

@classmethod
def has_cfn_attr(cls, attr: str) -> bool:
return attr in [
"Arn",
]

def get_cfn_attribute(self, attribute_name: str) -> Any:
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

if attribute_name == "Arn":
return self.job_template_arn
raise UnformattedGetAttTemplateException()

@classmethod
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "FakeJobTemplate":
iot_backend = iot_backends[account_id][region_name]
properties = cloudformation_json["Properties"]

return iot_backend.create_job_template(
job_template_id=properties.get("JobTemplateId", resource_name),
document_source=properties.get("DocumentSource", ""),
document=properties.get("Document"),
description=properties.get("Description"),
presigned_url_config=decapitalize_dict(
properties.get("PresignedUrlConfig", {})
),
job_executions_rollout_config=decapitalize_dict(
properties.get("JobExecutionsRolloutConfig", {})
),
abort_config=decapitalize_dict(properties.get("AbortConfig", {})),
job_execution_retry_config=decapitalize_dict(
properties.get("JobExecutionsRetryConfig", {})
),
timeout_config=decapitalize_dict(properties.get("TimeoutConfig", {})),
)

@classmethod
def update_from_cloudformation_json( # type: ignore[misc]
cls,
original_resource: "FakeJobTemplate",
new_resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> "FakeJobTemplate":
iot_backend = iot_backends[account_id][region_name]
iot_backend.delete_job_template(
job_template_id=original_resource.job_template_id
)

return cls.create_from_cloudformation_json(
new_resource_name, cloudformation_json, account_id, region_name
)

@classmethod
def delete_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> None:
properties = cloudformation_json["Properties"]
job_template_id = properties.get("JobTemplateId", resource_name)

iot_backend = iot_backends[account_id][region_name]
iot_backend.delete_job_template(job_template_id=job_template_id)


class FakeEndpoint(BaseModel):
def __init__(self, endpoint_type: str, region_name: str):
if endpoint_type not in [
Expand Down Expand Up @@ -1110,6 +1252,7 @@ def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.things: Dict[str, FakeThing] = OrderedDict()
self.jobs: Dict[str, FakeJob] = OrderedDict()
self.jobs_templates: Dict[str, FakeJobTemplate] = OrderedDict()
self.job_executions: Dict[Tuple[str, str], FakeJobExecution] = OrderedDict()
self.thing_types: Dict[str, FakeThingType] = OrderedDict()
self.thing_groups: Dict[str, FakeThingGroup] = OrderedDict()
Expand Down Expand Up @@ -2092,19 +2235,27 @@ def create_job(
target_selection: str,
job_executions_rollout_config: Dict[str, Any],
document_parameters: Dict[str, str],
abort_config: Dict[str, List[Dict[str, Any]]],
job_execution_retry_config: Dict[str, Any],
scheduling_config: Dict[str, Any],
timeout_config: Dict[str, Any],
) -> Tuple[str, str, str]:
job = FakeJob(
job_id,
targets,
document_source,
document,
description,
presigned_url_config,
target_selection,
job_executions_rollout_config,
document_parameters,
self.account_id,
self.region_name,
job_id=job_id,
targets=targets,
document_source=document_source,
document=document,
description=description,
presigned_url_config=presigned_url_config,
target_selection=target_selection,
job_executions_rollout_config=job_executions_rollout_config,
abort_config=abort_config,
job_execution_retry_config=job_execution_retry_config,
scheduling_config=scheduling_config,
timeout_config=timeout_config,
document_parameters=document_parameters,
account_id=self.account_id,
region_name=self.region_name,
)
self.jobs[job_id] = job

Expand Down Expand Up @@ -2152,28 +2303,12 @@ def cancel_job(
def get_job_document(self, job_id: str) -> FakeJob:
return self.jobs[job_id]

def list_jobs(
self, max_results: int, token: Optional[str]
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
@paginate(PAGINATION_MODEL) # type: ignore[misc]
def list_jobs(self) -> List[Dict[str, Any]]:
"""
The following parameter are not yet implemented: Status, TargetSelection, ThingGroupName, ThingGroupId
"""
all_jobs = [_.to_dict() for _ in self.jobs.values()]
filtered_jobs = all_jobs

if token is None:
jobs = filtered_jobs[0:max_results]
next_token = str(max_results) if len(filtered_jobs) > max_results else None
else:
int_token = int(token)
jobs = filtered_jobs[int_token : int_token + max_results]
next_token = (
str(int_token + max_results)
if len(filtered_jobs) > int_token + max_results
else None
)

return jobs, next_token
return [_.to_dict() for _ in self.jobs.values()]

def describe_job_execution(
self, job_id: str, thing_name: str, execution_number: int
Expand Down Expand Up @@ -2475,5 +2610,51 @@ def update_indexing_configuration(
thingIndexingConfiguration, thingGroupIndexingConfiguration
)

def create_job_template(
self,
job_template_id: str,
document_source: str,
document: str,
description: str,
presigned_url_config: Dict[str, Any],
job_executions_rollout_config: Dict[str, Any],
abort_config: Dict[str, List[Dict[str, Any]]],
job_execution_retry_config: Dict[str, Any],
timeout_config: Dict[str, Any],
) -> "FakeJobTemplate":
if job_template_id in self.jobs_templates:
raise ConflictException(job_template_id)

job_template = FakeJobTemplate(
job_template_id=job_template_id,
document_source=document_source,
document=document,
description=description,
presigned_url_config=presigned_url_config,
job_executions_rollout_config=job_executions_rollout_config,
abort_config=abort_config,
job_execution_retry_config=job_execution_retry_config,
timeout_config=timeout_config,
account_id=self.account_id,
region_name=self.region_name,
)

self.jobs_templates[job_template_id] = job_template
return job_template

@paginate(PAGINATION_MODEL) # type: ignore[misc]
def list_job_templates(self) -> List[Dict[str, Any]]:
return [_.to_dict() for _ in self.jobs_templates.values()]

def delete_job_template(self, job_template_id: str) -> None:
if job_template_id not in self.jobs_templates:
raise ResourceNotFoundException(f"Job template {job_template_id} not found")
del self.jobs_templates[job_template_id]

def describe_job_template(self, job_template_id: str) -> FakeJobTemplate:
if job_template_id not in self.jobs_templates:
raise ResourceNotFoundException(f"Job template {job_template_id} not found")
return self.jobs_templates[job_template_id]


iot_backends = BackendDict(IoTBackend, "iot")
Loading

0 comments on commit 01829b2

Please sign in to comment.