From bd92201774b84ddae0e4f31fd4ca5da45a937aca Mon Sep 17 00:00:00 2001 From: Vaibhav Maheshwari Date: Mon, 1 Jun 2020 14:16:03 -0700 Subject: [PATCH] Optimize ZookeeperCheckpointProvider flush --- .../ZookeeperCheckpointProvider.java | 46 +++++++++++++------ .../TestZookeeperCheckpointProvider.java | 39 ++++++++++++++++ 2 files changed, 70 insertions(+), 15 deletions(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/providers/ZookeeperCheckpointProvider.java b/datastream-server/src/main/java/com/linkedin/datastream/server/providers/ZookeeperCheckpointProvider.java index fa92e8489..bc88b4be4 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/providers/ZookeeperCheckpointProvider.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/providers/ZookeeperCheckpointProvider.java @@ -10,8 +10,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang.StringUtils; @@ -28,10 +30,10 @@ import com.linkedin.datastream.server.DatastreamTask; import com.linkedin.datastream.server.zk.ZkAdapter; - /** - * ZooKeeper-backed {@link CheckpointProvider} that maintains {@link DatastreamTask} - * processing state information, e.g. offsets/checkpoints, errors. - */ +/** + * ZooKeeper-backed {@link CheckpointProvider} that maintains {@link DatastreamTask} + * processing state information, e.g. offsets/checkpoints, errors. 
+ */ public class ZookeeperCheckpointProvider implements CheckpointProvider { public static final String CHECKPOINT_KEY_NAME = "sourceCheckpoint"; @@ -50,13 +52,14 @@ public class ZookeeperCheckpointProvider implements CheckpointProvider { new TypeReference>() { }; - private final ConcurrentHashMap> _checkpointsToCommit = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> _checkpoints = new ConcurrentHashMap<>(); + private final Set _checkpointsToCommit = new HashSet<>(); private final ConcurrentHashMap _lastCommitTime = new ConcurrentHashMap<>(); - /** - * Construct an instance of ZookeeperCheckpointProvider - * @param zkAdapter ZooKeeper client adapter to use - */ + /** + * Construct an instance of ZookeeperCheckpointProvider + * @param zkAdapter ZooKeeper client adapter to use + */ public ZookeeperCheckpointProvider(ZkAdapter zkAdapter) { _zkAdapter = zkAdapter; // Initialize metrics @@ -65,7 +68,10 @@ public ZookeeperCheckpointProvider(ZkAdapter zkAdapter) { @Override public void unassignDatastreamTask(DatastreamTask task) { - _checkpointsToCommit.remove(task); + _checkpoints.remove(task); + synchronized (_checkpointsToCommit) { + _checkpointsToCommit.remove(task); + } _lastCommitTime.remove(task); } @@ -81,12 +87,19 @@ public void updateCheckpoint(DatastreamTask task, int partition, String checkpoi if (!_lastCommitTime.containsKey(task) || Instant.now() .isAfter(_lastCommitTime.get(task).plus(CHECKPOINT_INTERVAL))) { writeCheckpointsToStore(task); + synchronized (_checkpointsToCommit) { + _checkpointsToCommit.remove(task); + } + } else { + synchronized (_checkpointsToCommit) { + _checkpointsToCommit.add(task); + } } } } private Map getOrAddCheckpointMap(DatastreamTask task) { - return _checkpointsToCommit.computeIfAbsent(task, k -> new HashMap<>()); + return _checkpoints.computeIfAbsent(task, k -> new HashMap<>()); } private void writeCheckpointsToStore(DatastreamTask task) { @@ -100,7 +113,7 @@ private void writeCheckpointsToStore(DatastreamTask 
task) { _dynamicMetricsManager.createOrUpdateHistogram(MODULE, CHECKPOINT_COMMIT_LATENCY_MS, System.currentTimeMillis() - startTime); - Map committedCheckpoints = _checkpointsToCommit.get(task); + Map committedCheckpoints = _checkpoints.get(task); // This check is necessary since task may have been unassigned/removed by a concurrent call to unassignDatastreamTask(). if (committedCheckpoints != null) { // Clear the checkpoints to commit. @@ -112,8 +125,16 @@ private void writeCheckpointsToStore(DatastreamTask task) { @Override public void flush() { - LOG.info("Flushing checkpoints for {} datatstream tasks to ZooKeeper", _checkpointsToCommit.size()); - _checkpointsToCommit.keySet().forEach(this::writeCheckpointsToStore); + // Snapshot and clear the pending set under its monitor, then do the ZooKeeper writes after releasing it, + // so updateCheckpoint()/unassignDatastreamTask() never wait on ZooKeeper I/O behind this monitor. A task + // updated while the writes are in progress is re-added by updateCheckpoint() and handled by the next flush. + List<DatastreamTask> tasksToFlush; + synchronized (_checkpointsToCommit) { + tasksToFlush = new ArrayList<>(_checkpointsToCommit); + _checkpointsToCommit.clear(); + } + LOG.info("Flushing checkpoints for {} datatstream tasks to ZooKeeper", tasksToFlush.size()); + tasksToFlush.forEach(this::writeCheckpointsToStore); LOG.info("Flushing checkpoints to ZooKeeper completed successfully"); } diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/providers/TestZookeeperCheckpointProvider.java b/datastream-server/src/test/java/com/linkedin/datastream/server/providers/TestZookeeperCheckpointProvider.java index fe0a7463f..4ac20b9c5 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/providers/TestZookeeperCheckpointProvider.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/providers/TestZookeeperCheckpointProvider.java @@ -10,6 +10,7 @@ import java.util.Collections; import java.util.Map; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -30,6 +31,12 @@ import com.linkedin.datastream.server.zk.ZkAdapter; import
com.linkedin.datastream.testutil.EmbeddedZookeeper; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + /** * Tests for {@link ZookeeperCheckpointProvider} @@ -104,6 +111,43 @@ public void testCommitAndReadCheckpoints() { Assert.assertEquals(committedCheckpoints2.get(0), "checkpoint2"); } + /** + * Verifies that flush() writes to ZooKeeper only for tasks whose checkpoints are still pending, + * i.e. updated since the task's last commit. + */ + @Test + public void testFlush() { + ZkAdapter adapter = spy(new ZkAdapter(_zookeeper.getConnection(), "testcluster", defaultTransportProviderName, ZkClient.DEFAULT_SESSION_TIMEOUT, + ZkClient.DEFAULT_CONNECTION_TIMEOUT, null)); + adapter.connect(); + ZookeeperCheckpointProvider checkpointProvider = new ZookeeperCheckpointProvider(adapter); + DatastreamTaskImpl datastreamTask1 = new DatastreamTaskImpl(Collections.singletonList(generateDatastream(1))); + datastreamTask1.setId("dt1"); + + DatastreamTaskImpl datastreamTask2 = new DatastreamTaskImpl(Collections.singletonList(generateDatastream(2))); + datastreamTask2.setId("dt2"); + + checkpointProvider.updateCheckpoint(datastreamTask1, 0, "checkpoint1"); + checkpointProvider.updateCheckpoint(datastreamTask2, 0, "checkpoint2"); + + Map committedCheckpoints1 = checkpointProvider.getSafeCheckpoints(datastreamTask1); + Map committedCheckpoints2 = checkpointProvider.getSafeCheckpoints(datastreamTask2); + Assert.assertEquals(committedCheckpoints1.size(), 1); + Assert.assertEquals(committedCheckpoints2.size(), 1); + + Assert.assertEquals(committedCheckpoints1.get(0), "checkpoint1"); + Assert.assertEquals(committedCheckpoints2.get(0), "checkpoint2"); + + verify(adapter, times(2)).setDatastreamTaskStateForKey(any(), anyString(), anyString()); + Mockito.reset(adapter); + checkpointProvider.flush(); + verify(adapter, times(0)).setDatastreamTaskStateForKey(any(), anyString(), anyString()); + checkpointProvider.updateCheckpoint(datastreamTask1, 0, "checkpoint3"); + Mockito.reset(adapter); + checkpointProvider.flush(); + verify(adapter,
times(1)).setDatastreamTaskStateForKey(any(), anyString(), anyString()); + } + /** * Generate a datastream */