From f1fe9af01ff1616c3a4e6e4ed9e614f64cc0c0c4 Mon Sep 17 00:00:00 2001 From: Venkat Date: Mon, 10 Jun 2024 17:38:27 -0400 Subject: [PATCH] bug fix --- anyscale_provider/operators/anyscale.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/anyscale_provider/operators/anyscale.py b/anyscale_provider/operators/anyscale.py index 8c415c4..4fe3ecd 100644 --- a/anyscale_provider/operators/anyscale.py +++ b/anyscale_provider/operators/anyscale.py @@ -151,7 +151,7 @@ def defer_job_polling(self, job_id: str) -> None: def get_current_status(self, job_id: str) -> str: return str(self.hook.get_job_status(job_id=job_id).state) - def execute_complete(self, context: Context, event: Any) -> None: + def execute_complete(self, context: Context, event: TriggerEvent) -> None: current_job_id = event["job_id"] if event["status"] == JobState.FAILED: @@ -221,7 +221,6 @@ def __init__(self, super().__init__(**kwargs) self.conn_id = conn_id - # Set up explicit parameters self.service_params: Dict[str, Any] = { 'name': name, 'image_uri': image_uri, @@ -244,11 +243,9 @@ def __init__(self, self.canary_percent = canary_percent self.max_surge_percent = max_surge_percent - # Ensure name is not empty if not self.service_params['name']: raise ValueError("Service name is required.") - # Ensure at least one application is specified if not self.service_params['applications']: raise ValueError("At least one application must be specified.") @@ -265,7 +262,6 @@ def execute(self, context: Context) -> Optional[str]: svc_config = ServiceConfig(**self.service_params) self.log.info("Service with config object: {}".format(svc_config)) - # Call the SDK method with the dynamically created service model service_id = self.hook.deploy_service(config=svc_config, in_place=self.in_place, canary_percent=self.canary_percent, @@ -282,15 +278,15 @@ def execute(self, context: Context) -> Optional[str]: self.log.info(f"Service rollout id: {service_id}") return service_id - def execute_complete(self, context: Context, event: Any) -> None: + def execute_complete(self, context: Context, event: TriggerEvent) -> None: self.log.info(f"Execution completed...") - service_id = event["service_name"] + service_id = event.payload["service_name"] - if event["status"] == ServiceState.SYSTEM_FAILURE: - self.log.info(f"Anyscale service deployment {service_id} ended with status: {event['status']}") - raise AirflowException(f"Job {service_id} failed with error {event['message']}") + if event.payload["status"] == ServiceState.SYSTEM_FAILURE: + self.log.info(f"Anyscale service deployment {service_id} ended with status: {event.payload['status']}") + raise AirflowException(f"Job {service_id} failed with error {event.payload['message']}") else: - self.log.info(f"Anyscale service deployment {service_id} completed with status: {event['status']}") + self.log.info(f"Anyscale service deployment {service_id} completed with status: {event.payload['status']}") return None