Skip to content

Commit

Permalink
bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
venkatajagannath committed Jun 10, 2024
1 parent cececb4 commit f1fe9af
Showing 1 changed file with 7 additions and 11 deletions.
18 changes: 7 additions & 11 deletions anyscale_provider/operators/anyscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def defer_job_polling(self, job_id: str) -> None:
def get_current_status(self, job_id: str) -> str:
return str(self.hook.get_job_status(job_id=job_id).state)

def execute_complete(self, context: Context, event: Any) -> None:
def execute_complete(self, context: Context, event: TriggerEvent) -> None:
current_job_id = event["job_id"]

if event["status"] == JobState.FAILED:
Expand Down Expand Up @@ -221,7 +221,6 @@ def __init__(self,
super().__init__(**kwargs)
self.conn_id = conn_id

# Set up explicit parameters
self.service_params: Dict[str, Any] = {
'name': name,
'image_uri': image_uri,
Expand All @@ -244,11 +243,9 @@ def __init__(self,
self.canary_percent = canary_percent
self.max_surge_percent = max_surge_percent

# Ensure name is not empty
if not self.service_params['name']:
raise ValueError("Service name is required.")

# Ensure at least one application is specified
if not self.service_params['applications']:
raise ValueError("At least one application must be specified.")

Expand All @@ -265,7 +262,6 @@ def execute(self, context: Context) -> Optional[str]:
svc_config = ServiceConfig(**self.service_params)
self.log.info("Service with config object: {}".format(svc_config))

# Call the SDK method with the dynamically created service model
service_id = self.hook.deploy_service(config=svc_config,
in_place=self.in_place,
canary_percent=self.canary_percent,
Expand All @@ -282,15 +278,15 @@ def execute(self, context: Context) -> Optional[str]:
self.log.info(f"Service rollout id: {service_id}")
return service_id

def execute_complete(self, context: Context, event: Any) -> None:
def execute_complete(self, context: Context, event: TriggerEvent) -> None:
self.log.info(f"Execution completed...")
service_id = event["service_name"]
service_id = event.payload["service_name"]

if event["status"] == ServiceState.SYSTEM_FAILURE:
self.log.info(f"Anyscale service deployment {service_id} ended with status: {event['status']}")
raise AirflowException(f"Job {service_id} failed with error {event['message']}")
if event.payload["status"] == ServiceState.SYSTEM_FAILURE:
self.log.info(f"Anyscale service deployment {service_id} ended with status: {event.payload['status']}")
raise AirflowException(f"Job {service_id} failed with error {event.payload['message']}")
else:
self.log.info(f"Anyscale service deployment {service_id} completed with status: {event['status']}")
self.log.info(f"Anyscale service deployment {service_id} completed with status: {event.payload['status']}")

return None

Expand Down

0 comments on commit f1fe9af

Please sign in to comment.