Skip to content

Commit

Permalink
[apache/helix] -- Added Full cache refresh trigger after cleaning up …
Browse files Browse the repository at this point in the history
…of a workflow.
  • Loading branch information
himanshukandwal committed Nov 3, 2024
1 parent 8c94db9 commit 92c549a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,8 @@ private void cleanupWorkflow(String workflow) {
// clean all the contexts even if Configs and IdealStates are exists. Then all the workflows
// and jobs will rescheduled again.
removeContexts(workflow, jobs, _clusterDataCache.getTaskDataCache());
// Request for full-data refresh to re-fetch workflow resource configs.
_clusterDataCache.requireFullRefresh();
}
} else {
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -85,6 +86,36 @@ public void testDeleteWorkflow() throws InterruptedException {
Assert.assertNull(_driver.getWorkflowContext(jobQueueName));
}

@Test
public void testDeleteWorkflowAndRecreate() throws Exception {
String workflowId_1 = TestHelper.getTestMethodName();
String workflowId_2 = TestHelper.getTestMethodName() + "1";
String workflowId_3 = TestHelper.getTestMethodName() + "2";

Workflow.Builder builder_1 = WorkflowGenerator.generateDefaultSingleJobWorkflowBuilder(workflowId_1);
Workflow.Builder builder_2 = WorkflowGenerator.generateDefaultSingleJobWorkflowBuilder(workflowId_2);
Workflow.Builder builder_3 = WorkflowGenerator.generateDefaultSingleJobWorkflowBuilder(workflowId_3);

_driver.start(builder_1.build());

TaskState polledState = _driver.pollForWorkflowState(workflowId_1, 2_000L, TaskState.COMPLETED, TaskState.FAILED);
Assert.assertEquals(TaskState.COMPLETED, polledState);
Assert.assertNotNull(_driver.getWorkflowConfig(workflowId_1));

// delete workflowId_1 and start other workflows
_driver.start(builder_2.build());
_driver.deleteAndWaitForCompletion(workflowId_1, 2_000);
_driver.start(builder_3.build());
Assert.assertNull(_driver.getWorkflowConfig(workflowId_1));

// re-create workflowId_1
_driver.start(builder_1.build());
TaskState recreatedPolledState = _driver.pollForWorkflowState(workflowId_1, 40_000L, TaskState.COMPLETED, TaskState.FAILED);
Assert.assertEquals(TaskState.COMPLETED, recreatedPolledState);
Assert.assertNotNull(_driver.getWorkflowConfig(workflowId_1));
Assert.assertNotNull(_driver.getWorkflowContext(workflowId_1));
}

@Test
public void testDeleteWorkflowForcefully() throws InterruptedException {
String jobQueueName = TestHelper.getTestMethodName();
Expand Down

0 comments on commit 92c549a

Please sign in to comment.