diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java index 37dfb2206..243b1ff3e 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java @@ -194,7 +194,7 @@ private void saveStats(Map partitionInfoMap, Ma if (partitionInfoMap.isEmpty()) { stat.isThroughputRateLatest = false; } else { - stat.throughputRate = taskThroughputMap.get(task.getId()); + stat.throughputRateInKBps = taskThroughputMap.get(task.getId()); stat.isThroughputRateLatest = true; } stat.totalPartitions = partitionCount; @@ -275,18 +275,18 @@ void unregisterMetricsForDatastream(String datastream) { } static class PartitionAssignmentStatPerTask { - private int throughputRate; + private int throughputRateInKBps; private int totalPartitions; private int partitionsWithUnknownThroughput; private boolean isThroughputRateLatest; //getters and setters required for fromJson and toJson - public int getThroughputRate() { - return throughputRate; + public int getThroughputRateInKBps() { + return throughputRateInKBps; } - public void setThroughputRate(int throughputRate) { - this.throughputRate = throughputRate; + public void setThroughputRateInKBps(int throughputRateInKBps) { + this.throughputRateInKBps = throughputRateInKBps; } public int getTotalPartitions() { diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java index 8539449f6..698c5e50a 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java @@ -139,7 +139,8 @@ public Map> assignPartitions(Map assignedPartitions, - List unassignedPartitions) { + List unassignedPartitions, String datastreamName) { Validate.notNull(throughputInfo, "null throughputInfo"); Validate.notNull(assignedPartitions, "null assignedPartitions"); Validate.notNull(unassignedPartitions, "null unassignedPartitions"); @@ -68,13 +69,15 @@ public int getTaskCount(ClusterThroughputInfo throughputInfo, List assig .map(p -> throughputMap.getOrDefault(p, defaultThroughputInfo)) .mapToInt(PartitionThroughputInfo::getBytesInKBRate) .sum(); - LOG.info("Total throughput in all {} partitions: {}KB/sec", allPartitions.size(), totalThroughput); + LOG.info("Total throughput in all {} partitions for datastream {}: {}KB/sec, assigned partitions: {} " + + "unassigned partitions: {}", allPartitions.size(), datastreamName, totalThroughput, + assignedPartitions.size(), unassignedPartitions.size()); double taskCapacityUtilizationCoefficient = _taskCapacityUtilizationPct / 100.0; int taskCountEstimate = (int) Math.ceil((double) totalThroughput / (_taskCapacityMBps * 1024 * taskCapacityUtilizationCoefficient)); taskCountEstimate = Math.min(allPartitions.size(), taskCountEstimate); - LOG.info("Estimated number of tasks required to handle the throughput: {}", taskCountEstimate); + LOG.info("Estimated number of tasks for datastream {} required to handle the throughput: {}", datastreamName, taskCountEstimate); return taskCountEstimate; } } diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java index 3e8b3ab44..ed8706d4e 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java @@ -44,7 +44,7 @@ public void emptyAssignmentReturnsZeroTasksTest() { List unassignedPartitions = Collections.emptyList(); LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS, TASK_CAPACITY_UTILIZATION_PCT); - int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions); + int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test"); Assert.assertEquals(taskCount, 0); } @@ -56,7 +56,7 @@ public void lowThroughputAssignmentReturnsOneTaskTest() { List unassignedPartitions = Collections.emptyList(); LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS, TASK_CAPACITY_UTILIZATION_PCT); - int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions); + int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test"); Assert.assertEquals(taskCount, 1); } @@ -67,7 +67,7 @@ public void highThroughputAssignmentTest() { List unassignedPartitions = new ArrayList<>(throughputInfo.getPartitionInfoMap().keySet()); LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS, TASK_CAPACITY_UTILIZATION_PCT); - int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions); + int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test"); int throughputSum = throughputInfo.getPartitionInfoMap().values().stream().mapToInt( PartitionThroughputInfo::getBytesInKBRate).sum(); @@ -83,7 +83,7 @@ public void highThroughputAssignmentTest2() { List unassignedPartitions = new ArrayList<>(throughputInfo.getPartitionInfoMap().keySet()); LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS, TASK_CAPACITY_UTILIZATION_PCT); - int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions); + int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test"); Assert.assertEquals(taskCount, unassignedPartitions.size()); } @@ -94,7 +94,7 @@ public void partitionsHaveDefaultWeightTest() { List unassignedPartitions = Arrays.asList("P1", "P2"); LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS, TASK_CAPACITY_UTILIZATION_PCT); - int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions); + int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test"); Assert.assertTrue(taskCount > 0); } } diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java index 0b662016b..885e0c6f6 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java @@ -93,7 +93,7 @@ public void assignFromScratchTest() { Assert.assertTrue(statObj.getIsThroughputRateLatest()); Assert.assertEquals(statObj.getTotalPartitions(), 1); Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 0); - Assert.assertEquals(statObj.getThroughputRate(), 5); + Assert.assertEquals(statObj.getThroughputRateInKBps(), 5); } @Test @@ -147,7 +147,7 @@ public void newAssignmentRetainsTasksFromOtherDatastreamsTest() { Assert.assertTrue(statObj.getIsThroughputRateLatest()); Assert.assertEquals(statObj.getTotalPartitions(), 1); Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 0); - Assert.assertEquals(statObj.getThroughputRate(), 5); + Assert.assertEquals(statObj.getThroughputRateInKBps(), 5); } @Test @@ -182,7 +182,7 @@ public void assignmentDistributesPartitionsWhenThroughputInfoIsMissingTest() { Assert.assertFalse(statObj.getIsThroughputRateLatest()); Assert.assertEquals(statObj.getTotalPartitions(), 2); Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 2); - Assert.assertEquals(statObj.getThroughputRate(), 0); + Assert.assertEquals(statObj.getThroughputRateInKBps(), 0); } @Test