Skip to content

Commit

Permalink
add bulk delete rest endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
tdari committed Jun 26, 2024
1 parent 1215588 commit 4b8492a
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 5 deletions.
16 changes: 15 additions & 1 deletion rest/repository/workflow_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ def find_by_id(self, id: int):
session.expunge_all()
return result

def find_by_ids(self, ids: list[int]):
with session_scope() as session:
result = session.query(Workflow).filter(Workflow.id.in_(ids)).all()
session.flush()
session.expunge_all()
return result

def find_by_workspace_id(
self,
workspace_id: int,
Expand Down Expand Up @@ -75,13 +82,20 @@ def get_workflows_summary(self):
session.expunge_all()
return result

def delete(self, id):
def delete_by_id(self, id: int):
with session_scope() as session:
result = session.query(Workflow).filter(Workflow.id==id).delete()
session.flush()
session.expunge_all()
return result

def delete_by_ids(self, ids: list[int]):
with session_scope() as session:
result = session.query(Workflow).filter(Workflow.id.in_(ids)).delete(synchronize_session=False)
session.flush()
session.expunge_all()
return result

def delete_by_workspace_id(self, workspace_id: int):
with session_scope() as session:
result = session.query(Workflow).filter(Workflow.workspace_id==workspace_id).delete(synchronize_session=False)
Expand Down
24 changes: 24 additions & 0 deletions rest/routers/workflow_router.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import json
from fastapi import APIRouter, HTTPException, status, Depends, Response
from schemas.context.auth_context import AuthorizationContextData
from typing import List
from services.workflow_service import WorkflowService
from schemas.requests.workflow import CreateWorkflowRequest, ListWorkflowsFilters, RunWorkflowsRequest
from schemas.responses.workflow import (
DeleteWorkflowsResponse,
GetWorkflowsResponse,
GetWorkflowResponse,
CreateWorkflowResponse,
Expand Down Expand Up @@ -163,6 +165,28 @@ async def delete_workflow(
except (BaseException, ForbiddenException, ResourceNotFoundException) as e:
raise HTTPException(status_code=e.status_code, detail=e.message)

@router.delete(
path="",
status_code=207,
response_model=DeleteWorkflowsResponse,
responses={
status.HTTP_207_MULTI_STATUS: {"model": DeleteWorkflowsResponse},
status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": SomethingWrongError},
status.HTTP_403_FORBIDDEN: {"model": ForbiddenError},
status.HTTP_404_NOT_FOUND: {"model": ResourceNotFoundError},
},
dependencies=[Depends(write_authorizer.authorize)],
)
async def delete_workflows(
workflow_ids: List[int],
workspace_id: int,
):
try:
return await workflow_service.delete_workflows(
workflow_ids=workflow_ids, workspace_id=workspace_id
)
except (BaseException, ForbiddenException, ResourceNotFoundException) as e:
raise HTTPException(status_code=e.status_code, detail=e.message)


@router.post(
Expand Down
20 changes: 19 additions & 1 deletion rest/schemas/responses/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,22 @@ class CreateWorkflowResponse(BaseModel):


class DeleteWorkflowResponse(BaseModel):
workflow_id: int
workflow_id: int


class DeleteWorkflowFailureDetail(BaseModel):
id: int
message: str


class DeleteWorkflowsFailureDetails(BaseModel):
details: List[DeleteWorkflowFailureDetail]


class DeleteWorkflowsSuccessDetails(BaseModel):
details: str


class DeleteWorkflowsResponse(BaseModel):
result: str
details: DeleteWorkflowsFailureDetails | DeleteWorkflowsSuccessDetails
52 changes: 49 additions & 3 deletions rest/services/workflow_service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
from math import ceil
from typing import List
from aiohttp import ClientSession
import asyncio
from copy import deepcopy
Expand All @@ -16,6 +17,10 @@
from schemas.requests.workflow import CreateWorkflowRequest, ListWorkflowsFilters, RunWorkflowsRequest, WorkflowSharedStorageSourceEnum, storage_default_piece_model_map
from schemas.responses.workflow import (
CreateWorkflowResponse,
DeleteWorkflowFailureDetail,
DeleteWorkflowsFailureDetails,
DeleteWorkflowsResponse,
DeleteWorkflowsSuccessDetails,
GetWorkflowsResponse,
GetWorkflowResponse,
GetWorkflowsResponseData,
Expand Down Expand Up @@ -627,12 +632,53 @@ async def delete_workflow(self, workflow_id: str, workspace_id: int):
try:
await self.delete_workflow_files(workflow_uuid=workflow.uuid_name)
self.airflow_client.delete_dag(dag_id=workflow.uuid_name)
self.workflow_repository.delete(id=workflow_id)
except Exception as e: # TODO improve exception handling
self.workflow_repository.delete_by_id(id=workflow_id)
except Exception as e: # TODO improve exception handling
self.logger.exception(e)
self.workflow_repository.delete(id=workflow_id)
self.workflow_repository.delete_by_id(id=workflow_id)
raise e


async def delete_workflows(self, workflow_ids: List[int], workspace_id: int):
try:
failure_details = []
workflows = self.workflow_repository.find_by_ids(ids=workflow_ids)
if not workflows:
raise ResourceNotFoundException("No workflows found.")
found_ids = [workflow.id for workflow in workflows]
not_found_ids = list(set(workflow_ids) - set(found_ids))
if not_found_ids:
not_found_details = [
DeleteWorkflowFailureDetail(id=id, message="Workflow not found.")
for id in not_found_ids
]
failure_details += not_found_details
self.workflow_repository.delete_by_ids(ids=workflow_ids)
for workflow in workflows:
if workflow.workspace_id != workspace_id:
failure_details.append(
DeleteWorkflowFailureDetail(
id=id, message="Workflow does not belong to workspace."
)
)
await self.delete_workflow_files(workflow_uuid=workflow.uuid_name)
self.airflow_client.delete_dag(dag_id=workflow.uuid_name)
if failure_details:
return DeleteWorkflowsResponse(
result="failure",
details=DeleteWorkflowsFailureDetails(details=failure_details),
)
return DeleteWorkflowsResponse(
result="success",
details=DeleteWorkflowsSuccessDetails(
details="Workflows successfully deleted."
),
)
except Exception as e:
self.logger.exception(e)
raise e


def workflow_details(self, workflow_id: str):
try:
all_tasks_response = self.airflow_client.get_all_workflow_tasks(workflow_id=workflow_id).json()
Expand Down

0 comments on commit 4b8492a

Please sign in to comment.