diff --git a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnector.java b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnector.java
index 19529aa79..08fc363fc 100644
--- a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnector.java
+++ b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnector.java
@@ -630,7 +630,7 @@ public void testFetchPartitionChange() throws Exception {
     Map<String, Optional<DatastreamGroupPartitionsMetadata>> partitionInfo = connector.getDatastreamPartitions();
     Assert.assertEquals(partitionInfo.get(group.getTaskPrefix()).get().getDatastreamGroup().getName(),
         group.getTaskPrefix());
-    Assert.assertEquals(new HashSet<>(partitionInfo.get(group.getTaskPrefix()).get().getPartitions()),
+    Assert.assertEquals(partitionInfo.get(group.getTaskPrefix()).get().getPartitions(),
         ImmutableSet.of(yummyTopic + "-0"));
 
     String saltyTopic = "SaltyPizza";
@@ -638,7 +638,7 @@ public void testFetchPartitionChange() throws Exception {
     Assert.assertTrue(PollUtils.poll(() -> partitionChangeCalls.get() == 2, POLL_PERIOD_MS, POLL_TIMEOUT_MS));
 
     partitionInfo = connector.getDatastreamPartitions();
-    Assert.assertEquals(new HashSet<>(partitionInfo.get(group.getTaskPrefix()).get().getPartitions()),
+    Assert.assertEquals(partitionInfo.get(group.getTaskPrefix()).get().getPartitions(),
         ImmutableSet.of(yummyTopic + "-0", saltyTopic + "-0", saltyTopic + "-1"));
     connector.stop();
   }
diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamGroupPartitionsMetadata.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamGroupPartitionsMetadata.java
index fa4503f88..5181e8d46 100644
--- a/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamGroupPartitionsMetadata.java
+++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamGroupPartitionsMetadata.java
@@ -5,8 +5,11 @@
  */
 package com.linkedin.datastream.server;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import com.linkedin.datastream.common.LogUtils;
 
@@ -16,14 +19,23 @@ public class DatastreamGroupPartitionsMetadata {
 
   private final DatastreamGroup _datastreamGroup;
-  private final List<String> _partitions;
+  private final Set<String> _partitions;
 
   /**
    * constructor
    * @param datastreamGroup datastream group which handle the partitions
-   * @param partitions the partitions that belong to this datastream
+   * @param partitions the partitions in a list that belong to this datastream
    */
   public DatastreamGroupPartitionsMetadata(DatastreamGroup datastreamGroup, List<String> partitions) {
+    this(datastreamGroup, new HashSet<>(partitions));
+  }
+
+  /**
+   * constructor
+   * @param datastreamGroup datastream group which handle the partitions
+   * @param partitions the partitions in a set that belong to this datastream
+   */
+  public DatastreamGroupPartitionsMetadata(DatastreamGroup datastreamGroup, Set<String> partitions) {
     _datastreamGroup = datastreamGroup;
     _partitions = partitions;
   }
@@ -32,12 +44,13 @@ public DatastreamGroup getDatastreamGroup() {
     return _datastreamGroup;
   }
 
-  public List<String> getPartitions() {
-    return Collections.unmodifiableList(_partitions);
+  public Set<String> getPartitions() {
+    return Collections.unmodifiableSet(_partitions);
   }
 
   @Override
   public String toString() {
-    return String.format("datastream %s, partitions %s", _datastreamGroup.getName(), LogUtils.logSummarizedTopicPartitionsMapping(_partitions));
+    return String.format("datastream %s, partitions %s", _datastreamGroup.getName(),
+        LogUtils.logSummarizedTopicPartitionsMapping(new ArrayList<>(_partitions)));
   }
 }
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
index 35c2e1ef2..8feea662f 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
@@ -91,7 +91,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
     tasks.forEach(task -> {
       if (task.getTaskPrefix().equals(datastreamGroupName)) {
         Set<String> retainedPartitions = new HashSet<>(task.getPartitionsV2());
-        retainedPartitions.retainAll(new HashSet<>(partitionMetadata.getPartitions()));
+        retainedPartitions.retainAll(partitionMetadata.getPartitions());
         newPartitionAssignmentMap.put(task.getId(), retainedPartitions);
         if (retainedPartitions.size() != task.getPartitionsV2().size()) {
           tasksWithChangedPartition.add(task.getId());
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java
index 8eb6d0dc7..4f3ae61c3 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java
@@ -262,7 +262,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map
           Set<String> newPartitions = new HashSet<>(task.getPartitionsV2());
-          newPartitions.retainAll(new HashSet<>(datastreamPartitions.getPartitions()));
+          newPartitions.retainAll(datastreamPartitions.getPartitions());
 
           //We need to create new task if the partition is changed
           boolean partitionChanged = newPartitions.size() != task.getPartitionsV2().size();
@@ -332,7 +332,7 @@ public Map<String, Set<DatastreamTask>> movePartitions(Map
     Set<String> allToReassignPartitions = new HashSet<>();
     targetAssignment.values().forEach(allToReassignPartitions::addAll);
-    allToReassignPartitions.retainAll(new HashSet<>(partitionsMetadata.getPartitions()));
+    allToReassignPartitions.retainAll(partitionsMetadata.getPartitions());
 
     // construct a map to store the tasks and if it contain the partitions that can be released
     // map:
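
For context, a minimal usage sketch (not part of the patch) of the reworked DatastreamGroupPartitionsMetadata API after this change. The class, both constructors, and getPartitions() come from the diff above; the wrapper class, method names, and partition values below are hypothetical placeholders.

    import java.util.Arrays;
    import java.util.Set;

    import com.linkedin.datastream.server.DatastreamGroup;
    import com.linkedin.datastream.server.DatastreamGroupPartitionsMetadata;

    public class PartitionsMetadataSketch {

      // The existing List-based constructor still compiles; per the patch it now
      // delegates to the new Set-based constructor via new HashSet<>(partitions).
      static DatastreamGroupPartitionsMetadata fromList(DatastreamGroup group) {
        return new DatastreamGroupPartitionsMetadata(group, Arrays.asList("YummyPizza-0", "SaltyPizza-0"));
      }

      // getPartitions() now returns an unmodifiable Set<String>, so callers such as the
      // assigners in this patch can pass it straight to retainAll without re-wrapping it.
      static void retainAssigned(DatastreamGroupPartitionsMetadata metadata, Set<String> taskPartitions) {
        taskPartitions.retainAll(metadata.getPartitions());
      }
    }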