From e2938f0157d64b14612e67ed412979d4b7cd1459 Mon Sep 17 00:00:00 2001 From: Shrinand Thakkar Date: Mon, 11 Apr 2022 14:15:27 -0700 Subject: [PATCH] Refactor Stopping Tasks On Assignment Change of Tasks --- .../kafka/AbstractKafkaConnector.java | 106 ++++++++++-------- .../kafka/TestAbstractKafkaConnector.java | 69 ++++++++++++ 2 files changed, 128 insertions(+), 47 deletions(-) diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java index 1fcc8689b..4ffcb611d 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java @@ -102,7 +102,10 @@ public abstract class AbstractKafkaConnector implements Connector, DiagnosticsAw // multiple concurrent threads. If access is required to both maps then the order of synchronization must be // _runningTasks followed by _tasksToStop to prevent deadlocks. private final Map _runningTasks = new HashMap<>(); - private final Map _tasksToStop = new HashMap<>(); + + // _tasksPendingStop contains the tasks that are pending stop across various assignment changes. The periodic health + // check call will attempt to stop these tasks until they are stopped, retrying any that are stuck somewhere in the stop path. + private final Map _tasksPendingStop = new HashMap<>(); // A daemon executor to constantly check whether all tasks are running and restart them if not. 
private final ScheduledExecutorService _daemonThreadExecutorService = @@ -154,23 +157,25 @@ public synchronized void onAssignmentChange(List tasks) { _logger.info("onAssignmentChange called with tasks {}", tasks); synchronized (_runningTasks) { + Map runningTasksToStop = new HashMap<>(); Set toCancel = new HashSet<>(_runningTasks.keySet()); tasks.forEach(toCancel::remove); if (toCancel.size() > 0) { // Mark the connector task as stopped so that, in case stopping the task here fails for any reason in // restartDeadTasks the task is not restarted - synchronized (_tasksToStop) { + synchronized (_tasksPendingStop) { toCancel.forEach(task -> { - _tasksToStop.put(task, _runningTasks.get(task)); + runningTasksToStop.put(task, _runningTasks.get(task)); + _tasksPendingStop.put(task, _runningTasks.get(task)); _runningTasks.remove(task); }); } - stopUnassignedTasks(); + scheduleTasksToStop(runningTasksToStop); } boolean toCallRestartDeadTasks = false; - synchronized (_tasksToStop) { + synchronized (_tasksPendingStop) { for (DatastreamTask task : tasks) { ConnectorTaskEntry connectorTaskEntry = _runningTasks.get(task); if (connectorTaskEntry != null) { @@ -180,16 +185,17 @@ public synchronized void onAssignmentChange(List tasks) { // This is necessary because DatastreamTaskImpl.hashCode() does not take into account all the // fields/properties of the DatastreamTask (e.g. dependencies). _runningTasks.remove(task); - _runningTasks.put(task, connectorTaskEntry); } else { - if (_tasksToStop.containsKey(task)) { + // If a pending stop task is reassigned to this host, we'd have to ensure to restart the + // task or replace the connectorTaskEntry for that task in the restartDeadTasks function. 
+ if (_tasksPendingStop.containsKey(task)) { toCallRestartDeadTasks = true; - connectorTaskEntry = _tasksToStop.remove(task); + connectorTaskEntry = _tasksPendingStop.remove(task); } else { connectorTaskEntry = createKafkaConnectorTask(task); } - _runningTasks.put(task, connectorTaskEntry); } + _runningTasks.put(task, connectorTaskEntry); } } // If any tasks pending stop were re-assigned to this host we explicitly call restartDeadTasks to ensure @@ -293,8 +299,8 @@ protected void restartDeadTasks() { * Returns the number of tasks yet to be stopped. */ int getTasksToStopCount() { - synchronized (_tasksToStop) { - return _tasksToStop.size(); + synchronized (_tasksPendingStop) { + return _tasksPendingStop.size(); } } @@ -308,44 +314,50 @@ int getRunningTasksCount() { } /** - * Attempt to stop the unassigned tasks. + * Attempt to stop the unassigned tasks from the _tasksPendingStop map. */ private void stopUnassignedTasks() { - synchronized (_tasksToStop) { - if (_tasksToStop.size() == 0) { - _logger.info("No tasks to stop"); - return; - } + scheduleTasksToStop(_tasksPendingStop); + } - // Spawn a separate thread to attempt stopping the connectorTask. The connectorTask will be canceled if it - // does not stop within a certain amount of time. This will force cleanup of connectorTasks which take too long - // to stop, or are stuck indefinitely. A separate thread is spawned to handle this because the Coordinator - // requires that this step completely because we call this from onAssignmentChange() (assignment thread gets - // killed if it takes too long) and restartDeadTasks which must complete quickly. - List> stopTaskFutures = _tasksToStop.keySet().stream() - .map(task -> asyncStopTask(task, _tasksToStop.get(task))) - .collect(Collectors.toList()); + /** + * Attempt to stop the unassigned tasks from the argument map. 
+ */ + private void scheduleTasksToStop(Map tasks) { + if (tasks.size() == 0) { + _logger.info("No tasks to stop"); + return; + } - _shutdownExecutorService.submit(() -> { - List toRemoveTasks = stopTaskFutures.stream().map(stopTaskFuture -> { - try { - return stopTaskFuture.get(CANCEL_TASK_TIMEOUT.plus(POST_CANCEL_TASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS); - } catch (ExecutionException | InterruptedException | TimeoutException e) { - _logger.warn("Stop task future failed with exception", e); - } - return null; - }).filter(Objects::nonNull).collect(Collectors.toList()); - - if (toRemoveTasks.size() > 0) { - synchronized (_tasksToStop) { - // Its possible that while stopping the task was pending there was another onAssignmentChange event - // which reassigned the task back to this host and the task was moved back to _runningTasks. In this - // case the remove operation here will be a no-op. - toRemoveTasks.forEach(_tasksToStop::remove); - } + // Spawn a separate thread to attempt stopping the connectorTask. The connectorTask will be canceled if it + // does not stop within a certain amount of time. This will force cleanup of connectorTasks which take too long + // to stop, or are stuck indefinitely. A separate thread is spawned to handle this because the Coordinator + // requires that this step complete quickly because we call this from onAssignmentChange() (assignment thread gets + // killed if it takes too long) and restartDeadTasks which must complete quickly. 
+ List> stopTaskFutures = tasks.keySet().stream() + .map(task -> asyncStopTask(task, tasks.get(task))) + .collect(Collectors.toList()); + + _shutdownExecutorService.submit(() -> { + List toRemoveTasks = stopTaskFutures.stream().map(stopTaskFuture -> { + try { + return stopTaskFuture.get(CANCEL_TASK_TIMEOUT.plus(POST_CANCEL_TASK_TIMEOUT).toMillis(), + TimeUnit.MILLISECONDS); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + _logger.warn("Stop task future failed with exception", e); } - }); - } + return null; + }).filter(Objects::nonNull).collect(Collectors.toList()); + + if (toRemoveTasks.size() > 0) { + synchronized (_tasksPendingStop) { + // Its possible that while stopping the task was pending there was another onAssignmentChange event + // which reassigned the task back to this host and the task was moved back to _runningTasks. In this + // case the remove operation here will be a no-op. + toRemoveTasks.forEach(_tasksPendingStop::remove); + } + } + }); } @NotNull @@ -420,10 +432,10 @@ public void stop() { _runningTasks.forEach(this::asyncStopTask); _runningTasks.clear(); } - synchronized (_tasksToStop) { + synchronized (_tasksPendingStop) { // Try to stop the tasks - _tasksToStop.forEach(this::asyncStopTask); - _tasksToStop.clear(); + _tasksPendingStop.forEach(this::asyncStopTask); + _tasksPendingStop.clear(); } _logger.info("Start to shut down the shutdown executor and wait up to {} ms.", SHUTDOWN_EXECUTOR_SHUTDOWN_TIMEOUT.toMillis()); diff --git a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java index 17e242b0b..6a08ccde1 100644 --- a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java +++ 
b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java @@ -14,6 +14,10 @@ import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.slf4j.Logger; @@ -138,6 +142,51 @@ public void testOnAssignmentChangeStopTaskFailure() { connector.stop(); } + @Test + public void testOnAssignmentChangeMultipleReassignments() throws InterruptedException { + Properties props = new Properties(); + // Reduce time interval between calls to restartDeadTasks to force invocation of stopTasks + props.setProperty("daemonThreadIntervalInSeconds", "2"); + // With failStopTaskOnce set to true the AbstractKafkaBasedConnectorTask.stop is configured + // to fail the first time with InterruptedException and pass the second time. + TestKafkaConnector connector = new TestKafkaConnector(false, props, true); + + // first task assignment assigns task 1 + List firstTaskAssignment = getTaskListInRange(1, 2); + connector.onAssignmentChange(firstTaskAssignment); + connector.start(null); + Assert.assertEquals(connector.getRunningTasksCount(), 1); + + // second task assignment assigns task 2,3,4,5 and takes out task 1 + List secondTaskAssignment = getTaskListInRange(2, 6); + + // during the assignment, the _tasksPendingStop map count needs to be at most 1, as only task 1 would be taken out. 
+ ExecutorService executor = Executors.newFixedThreadPool(2); + executor.execute(() -> connector.onAssignmentChange(secondTaskAssignment)); + executor.execute(() -> Assert.assertTrue(connector.getTasksToStopCount() <= 1)); + + awaitForExecution(executor, 50L); + Assert.assertTrue(connector.getTasksToStopCount() >= 1); // the count of the _tasksPendingStop tracker + Assert.assertEquals(connector.getRunningTasksCount(), 4); + + // third task assignment keeps task 5, assigns task 6,7,8 and takes out task 2,3,4 + List thirdTaskAssignment = getTaskListInRange(5, 9); + + // during the assignment, the _tasksPendingStop map count needs to be at most 4, as task 2,3,4 would be taken out, plus task 1 if not already stopped. + executor = Executors.newFixedThreadPool(2); + executor.execute(() -> connector.onAssignmentChange(thirdTaskAssignment)); + executor.execute(() -> Assert.assertTrue(connector.getTasksToStopCount() <= 4)); + + awaitForExecution(executor, 50L); + Assert.assertTrue(connector.getTasksToStopCount() >= 3); // the count of the _tasksPendingStop tracker + + // Wait for restartDeadTasks to be called to attempt another stopTasks call + PollUtils.poll(() -> connector.getCreateTaskCalled() >= 3, Duration.ofSeconds(1).toMillis(), + Duration.ofSeconds(10).toMillis()); + Assert.assertEquals(connector.getRunningTasksCount(), 4); + connector.stop(); + } + @Test public void testCalculateThreadStartDelay() { Properties props = new Properties(); @@ -191,6 +240,26 @@ public void testRestartThrowsException() { connector.stop(); } + // helper method to generate the tasks in a range for assignment + private List getTaskListInRange(int start, int end) { + List taskAssignmentList = new ArrayList<>(); + IntStream.range(start, end).forEach(index -> { + DatastreamTaskImpl dt = new DatastreamTaskImpl(); + dt.setTaskPrefix("testtask" + index); + taskAssignmentList.add(dt); + }); + return taskAssignmentList; + } + + // helper method to await on the executor for the given timeout period + private void 
awaitForExecution(ExecutorService executor, Long timeUnitMs) throws InterruptedException { + try { + executor.awaitTermination(timeUnitMs, TimeUnit.MILLISECONDS); + } finally { + executor.shutdownNow(); + } + } + /** * Dummy implementation of {@link AbstractKafkaConnector} for testing purposes */