Skip to content

Commit

Permalink
Fix the metrics deregistration in AbstractKafkaConnector when multiple stops are called (#865)
Browse files Browse the repository at this point in the history

Metric registration and deregistration are ref-counted. For aggregate metrics, all the tasks share the same metric object. If the ref count reaches zero, the metric registry removes the metric, and on the next registration it creates a new metric object that will be emitted outside the container. So, if there are more deregistrations than registrations, some of the live running tasks will point to a metric object that no longer exists in the Metric Registry, resulting in incorrect aggregate metrics.

This PR fixes this by ensuring that the deregistration of metrics happens only once per task thread.
  • Loading branch information
vmaheshw authored Nov 5, 2021
1 parent 64129c2 commit 4522c52
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -93,6 +94,7 @@ abstract public class AbstractKafkaBasedConnectorTask implements Runnable, Consu
protected volatile long _lastPollCompletedTimeMillis = 0;
protected final CountDownLatch _startedLatch = new CountDownLatch(1);
protected final CountDownLatch _stoppedLatch = new CountDownLatch(1);
private final AtomicBoolean _metricDeregistered = new AtomicBoolean(false);

// config
protected DatastreamTask _datastreamTask;
Expand Down Expand Up @@ -451,6 +453,13 @@ public void stop() {
_logger.info("Waking up the consumer for task {}", _taskName);
_consumer.wakeup();
}
if (!_metricDeregistered.getAndSet(true)) {
deregisterMetrics();
}
}

// Deregisters this task's consumer metrics (decrements the shared ref count).
// Extracted as a package-private method so tests can verify it is invoked at
// most once per task, regardless of how many times stop() is called (see the
// _metricDeregistered AtomicBoolean guard in stop()).
@VisibleForTesting
void deregisterMetrics() {
_consumerMetrics.deregisterMetrics();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public synchronized void onAssignmentChange(List<DatastreamTask> tasks) {

synchronized (_runningTasks) {
Set<DatastreamTask> toCancel = new HashSet<>(_runningTasks.keySet());
toCancel.removeAll(tasks);
tasks.forEach(toCancel::remove);

if (toCancel.size() > 0) {
// Mark the connector task as stopped so that, in case stopping the task here fails for any reason in
Expand Down Expand Up @@ -363,13 +363,12 @@ private Future<DatastreamTask> asyncStopTask(DatastreamTask task, ConnectorTaskE
private DatastreamTask stopTask(DatastreamTask datastreamTask, ConnectorTaskEntry connectorTaskEntry) {
try {
connectorTaskEntry.setPendingStop();

AbstractKafkaBasedConnectorTask connectorTask = connectorTaskEntry.getConnectorTask();
connectorTask.stop();
boolean stopped = connectorTask.awaitStop(CANCEL_TASK_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
if (!stopped) {
_logger.warn("Connector task for datastream task {} took longer than {} ms to stop. Interrupting the thread.",
datastreamTask, CANCEL_TASK_TIMEOUT.toMillis());
datastreamTask.getDatastreamTaskName(), CANCEL_TASK_TIMEOUT.toMillis());
connectorTaskEntry.getThread().interrupt();
// Check that the thread really got interrupted and log a message if it seems like the thread is still running.
// Threads which don't check for the interrupt status may land up running forever, and we would like to
Expand Down Expand Up @@ -398,10 +397,7 @@ private DatastreamTask stopTask(DatastreamTask datastreamTask, ConnectorTaskEntr
*/
protected boolean isTaskThreadDead(ConnectorTaskEntry connectorTaskEntry) {
Thread taskThread = connectorTaskEntry.getThread();
if (taskThread == null || !taskThread.isAlive()) {
return true;
}
return false;
return taskThread == null || !taskThread.isAlive();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ public void testOnAssignmentChangeReassignment() {
// With failStopTaskOnce set to true the AbstractKafkaBasedConnectorTask.stop is configured
// to fail the first time with InterruptedException and pass the second time.
TestKafkaConnector connector = new TestKafkaConnector(false, props, true);
DatastreamTask datastreamTask1 = new DatastreamTaskImpl();
((DatastreamTaskImpl) datastreamTask1).setTaskPrefix("testtask1");
DatastreamTaskImpl datastreamTask1 = new DatastreamTaskImpl();
datastreamTask1.setTaskPrefix("testtask1");
connector.onAssignmentChange(Collections.singletonList(datastreamTask1));
connector.start(null);

DatastreamTask datastreamTask2 = new DatastreamTaskImpl();
((DatastreamTaskImpl) datastreamTask2).setTaskPrefix("testtask2");
DatastreamTaskImpl datastreamTask2 = new DatastreamTaskImpl();
datastreamTask2.setTaskPrefix("testtask2");
// AbstractKafkaBasedConnectorTask stop should fail on this onAssignmentChange call
connector.onAssignmentChange(Collections.singletonList(datastreamTask2));
Assert.assertEquals(connector.getTasksToStopCount(), 1);
Expand All @@ -115,13 +115,13 @@ public void testOnAssignmentChangeStopTaskFailure() {
// With failStopTaskOnce set to true the AbstractKafkaBasedConnectorTask.stop is configured
// to fail the first time with InterruptedException and pass the second time.
TestKafkaConnector connector = new TestKafkaConnector(false, props, true);
DatastreamTask datastreamTask = new DatastreamTaskImpl();
((DatastreamTaskImpl) datastreamTask).setTaskPrefix("testtask1");
DatastreamTaskImpl datastreamTask = new DatastreamTaskImpl();
datastreamTask.setTaskPrefix("testtask1");
connector.onAssignmentChange(Collections.singletonList(datastreamTask));
connector.start(null);

datastreamTask = new DatastreamTaskImpl();
((DatastreamTaskImpl) datastreamTask).setTaskPrefix("testtask2");
datastreamTask.setTaskPrefix("testtask2");
// AbstractKafkaBasedConnectorTask stop should fail on this onAssignmentChange call
connector.onAssignmentChange(Collections.singletonList(datastreamTask));
Assert.assertEquals(connector.getTasksToStopCount(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,13 @@ public void testRewindWhenSkippingMessage() throws Exception {
any(Exception.class));
//Verify that we have call at least seekToLastCheckpoint twice as the skip messages also trigger this
verify(connectorTask, atLeast(2)).seekToLastCheckpoint(ImmutableSet.of(topicPartition));
verify(connectorTask, times(0)).deregisterMetrics();
connectorTask.stop();
// Verify that multiple stop requests do not result in multiple metric de-registration.
connectorTask.stop();
verify(connectorTask, times(1)).deregisterMetrics();
connectorTask.stop();
verify(connectorTask, times(1)).deregisterMetrics();
}

@Test
Expand Down

0 comments on commit 4522c52

Please sign in to comment.