Skip to content

Commit

Permalink
Fix logging and name convention in LoadBasedPartitionAssignmentStrategy (linkedin#866)
Browse files Browse the repository at this point in the history

Co-authored-by: Vaibhav Maheshwari <[email protected]>
  • Loading branch information
vmaheshw and Vaibhav Maheshwari committed Mar 1, 2022
1 parent c7e0355 commit 32689df
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private void saveStats(Map<String, PartitionThroughputInfo> partitionInfoMap, Ma
if (partitionInfoMap.isEmpty()) {
stat.isThroughputRateLatest = false;
} else {
stat.throughputRate = taskThroughputMap.get(task.getId());
stat.throughputRateInKBps = taskThroughputMap.get(task.getId());
stat.isThroughputRateLatest = true;
}
stat.totalPartitions = partitionCount;
Expand Down Expand Up @@ -275,18 +275,18 @@ void unregisterMetricsForDatastream(String datastream) {
}

static class PartitionAssignmentStatPerTask {
private int throughputRate;
private int throughputRateInKBps;
private int totalPartitions;
private int partitionsWithUnknownThroughput;
private boolean isThroughputRateLatest;

//getters and setters required for fromJson and toJson
public int getThroughputRate() {
return throughputRate;
public int getThroughputRateInKBps() {
return throughputRateInKBps;
}

public void setThroughputRate(int throughputRate) {
this.throughputRate = throughputRate;
public void setThroughputRateInKBps(int throughputRateInKBps) {
this.throughputRateInKBps = throughputRateInKBps;
}

public int getTotalPartitions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<Datastr
}

LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(_taskCapacityMBps, _taskCapacityUtilizationPct);
int numTasksEstimateBasedOnLoad = estimator.getTaskCount(clusterThroughputInfo, assignedPartitions, unassignedPartitions);
int numTasksEstimateBasedOnLoad = estimator.getTaskCount(clusterThroughputInfo, assignedPartitions,
unassignedPartitions, datastreamGroupName);
numTasksNeeded = Math.max(numTasksNeeded, numTasksEstimateBasedOnLoad);

LOG.info("NumTask estimations for datastream {}: existingTasks: {}, PartitionCountBasedEstimate: {} "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ public LoadBasedTaskCountEstimator(int taskCapacityMBps, int taskCapacityUtiliza
* @param throughputInfo Per-partition throughput information
* @param assignedPartitions The list of assigned partitions
* @param unassignedPartitions The list of unassigned partitions
* @param datastreamName Name of the datastream
* @return The estimated number of tasks
*/
public int getTaskCount(ClusterThroughputInfo throughputInfo, List<String> assignedPartitions,
List<String> unassignedPartitions) {
List<String> unassignedPartitions, String datastreamName) {
Validate.notNull(throughputInfo, "null throughputInfo");
Validate.notNull(assignedPartitions, "null assignedPartitions");
Validate.notNull(unassignedPartitions, "null unassignedPartitions");
Expand All @@ -68,13 +69,15 @@ public int getTaskCount(ClusterThroughputInfo throughputInfo, List<String> assig
.map(p -> throughputMap.getOrDefault(p, defaultThroughputInfo))
.mapToInt(PartitionThroughputInfo::getBytesInKBRate)
.sum();
LOG.info("Total throughput in all {} partitions: {}KB/sec", allPartitions.size(), totalThroughput);
LOG.info("Total throughput in all {} partitions for datastream {}: {}KB/sec, assigned partitions: {} "
+ "unassigned partitions: {}", allPartitions.size(), datastreamName, totalThroughput,
assignedPartitions.size(), unassignedPartitions.size());

double taskCapacityUtilizationCoefficient = _taskCapacityUtilizationPct / 100.0;
int taskCountEstimate = (int) Math.ceil((double) totalThroughput /
(_taskCapacityMBps * 1024 * taskCapacityUtilizationCoefficient));
taskCountEstimate = Math.min(allPartitions.size(), taskCountEstimate);
LOG.info("Estimated number of tasks required to handle the throughput: {}", taskCountEstimate);
LOG.info("Estimated number of tasks for datastream {} required to handle the throughput: {}", datastreamName, taskCountEstimate);
return taskCountEstimate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void emptyAssignmentReturnsZeroTasksTest() {
List<String> unassignedPartitions = Collections.emptyList();
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Assert.assertEquals(taskCount, 0);
}

Expand All @@ -56,7 +56,7 @@ public void lowThroughputAssignmentReturnsOneTaskTest() {
List<String> unassignedPartitions = Collections.emptyList();
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Assert.assertEquals(taskCount, 1);
}

Expand All @@ -67,7 +67,7 @@ public void highThroughputAssignmentTest() {
List<String> unassignedPartitions = new ArrayList<>(throughputInfo.getPartitionInfoMap().keySet());
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");

int throughputSum = throughputInfo.getPartitionInfoMap().values().stream().mapToInt(
PartitionThroughputInfo::getBytesInKBRate).sum();
Expand All @@ -83,7 +83,7 @@ public void highThroughputAssignmentTest2() {
List<String> unassignedPartitions = new ArrayList<>(throughputInfo.getPartitionInfoMap().keySet());
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Assert.assertEquals(taskCount, unassignedPartitions.size());
}

Expand All @@ -94,7 +94,7 @@ public void partitionsHaveDefaultWeightTest() {
List<String> unassignedPartitions = Arrays.asList("P1", "P2");
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Assert.assertTrue(taskCount > 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void assignFromScratchTest() {
Assert.assertTrue(statObj.getIsThroughputRateLatest());
Assert.assertEquals(statObj.getTotalPartitions(), 1);
Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 0);
Assert.assertEquals(statObj.getThroughputRate(), 5);
Assert.assertEquals(statObj.getThroughputRateInKBps(), 5);
}

@Test
Expand Down Expand Up @@ -147,7 +147,7 @@ public void newAssignmentRetainsTasksFromOtherDatastreamsTest() {
Assert.assertTrue(statObj.getIsThroughputRateLatest());
Assert.assertEquals(statObj.getTotalPartitions(), 1);
Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 0);
Assert.assertEquals(statObj.getThroughputRate(), 5);
Assert.assertEquals(statObj.getThroughputRateInKBps(), 5);
}

@Test
Expand Down Expand Up @@ -182,7 +182,7 @@ public void assignmentDistributesPartitionsWhenThroughputInfoIsMissingTest() {
Assert.assertFalse(statObj.getIsThroughputRateLatest());
Assert.assertEquals(statObj.getTotalPartitions(), 2);
Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 2);
Assert.assertEquals(statObj.getThroughputRate(), 0);
Assert.assertEquals(statObj.getThroughputRateInKBps(), 0);
}

@Test
Expand Down

0 comments on commit 32689df

Please sign in to comment.