Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logging in LoadBasedPartitionAssignmentStrategy #866

Merged
merged 5 commits into from
Nov 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private void saveStats(Map<String, PartitionThroughputInfo> partitionInfoMap, Ma
if (partitionInfoMap.isEmpty()) {
stat.isThroughputRateLatest = false;
} else {
stat.throughputRate = taskThroughputMap.get(task.getId());
stat.throughputRateInKBps = taskThroughputMap.get(task.getId());
stat.isThroughputRateLatest = true;
}
stat.totalPartitions = partitionCount;
Expand Down Expand Up @@ -275,18 +275,18 @@ void unregisterMetricsForDatastream(String datastream) {
}

static class PartitionAssignmentStatPerTask {
private int throughputRate;
private int throughputRateInKBps;
private int totalPartitions;
private int partitionsWithUnknownThroughput;
private boolean isThroughputRateLatest;

//getters and setters required for fromJson and toJson
public int getThroughputRate() {
return throughputRate;
public int getThroughputRateInKBps() {
return throughputRateInKBps;
}

public void setThroughputRate(int throughputRate) {
this.throughputRate = throughputRate;
public void setThroughputRateInKBps(int throughputRateInKBps) {
this.throughputRateInKBps = throughputRateInKBps;
}

public int getTotalPartitions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<Datastr
}

LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(_taskCapacityMBps, _taskCapacityUtilizationPct);
int numTasksEstimateBasedOnLoad = estimator.getTaskCount(clusterThroughputInfo, assignedPartitions, unassignedPartitions);
int numTasksEstimateBasedOnLoad = estimator.getTaskCount(clusterThroughputInfo, assignedPartitions,
unassignedPartitions, datastreamGroupName);
numTasksNeeded = Math.max(numTasksNeeded, numTasksEstimateBasedOnLoad);

LOG.info("NumTask estimations for datastream {}: existingTasks: {}, PartitionCountBasedEstimate: {} "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ public LoadBasedTaskCountEstimator(int taskCapacityMBps, int taskCapacityUtiliza
* @param throughputInfo Per-partition throughput information
* @param assignedPartitions The list of assigned partitions
* @param unassignedPartitions The list of unassigned partitions
* @param datastreamName Name of the datastream
* @return The estimated number of tasks
*/
public int getTaskCount(ClusterThroughputInfo throughputInfo, List<String> assignedPartitions,
List<String> unassignedPartitions) {
List<String> unassignedPartitions, String datastreamName) {
Validate.notNull(throughputInfo, "null throughputInfo");
Validate.notNull(assignedPartitions, "null assignedPartitions");
Validate.notNull(unassignedPartitions, "null unassignedPartitions");
Expand All @@ -68,13 +69,15 @@ public int getTaskCount(ClusterThroughputInfo throughputInfo, List<String> assig
.map(p -> throughputMap.getOrDefault(p, defaultThroughputInfo))
.mapToInt(PartitionThroughputInfo::getBytesInKBRate)
.sum();
LOG.info("Total throughput in all {} partitions: {}KB/sec", allPartitions.size(), totalThroughput);
LOG.info("Total throughput in all {} partitions for datastream {}: {}KB/sec, assigned partitions: {} "
+ "unassigned partitions: {}", allPartitions.size(), datastreamName, totalThroughput,
assignedPartitions.size(), unassignedPartitions.size());

double taskCapacityUtilizationCoefficient = _taskCapacityUtilizationPct / 100.0;
int taskCountEstimate = (int) Math.ceil((double) totalThroughput /
(_taskCapacityMBps * 1024 * taskCapacityUtilizationCoefficient));
taskCountEstimate = Math.min(allPartitions.size(), taskCountEstimate);
LOG.info("Estimated number of tasks required to handle the throughput: {}", taskCountEstimate);
LOG.info("Estimated number of tasks for datastream {} required to handle the throughput: {}", datastreamName, taskCountEstimate);
return taskCountEstimate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void emptyAssignmentReturnsZeroTasksTest() {
List<String> unassignedPartitions = Collections.emptyList();
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Assert.assertEquals(taskCount, 0);
}

Expand All @@ -56,7 +56,7 @@ public void lowThroughputAssignmentReturnsOneTaskTest() {
List<String> unassignedPartitions = Collections.emptyList();
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Assert.assertEquals(taskCount, 1);
}

Expand All @@ -67,7 +67,7 @@ public void highThroughputAssignmentTest() {
List<String> unassignedPartitions = new ArrayList<>(throughputInfo.getPartitionInfoMap().keySet());
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");

int throughputSum = throughputInfo.getPartitionInfoMap().values().stream().mapToInt(
PartitionThroughputInfo::getBytesInKBRate).sum();
Expand All @@ -83,7 +83,7 @@ public void highThroughputAssignmentTest2() {
List<String> unassignedPartitions = new ArrayList<>(throughputInfo.getPartitionInfoMap().keySet());
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Assert.assertEquals(taskCount, unassignedPartitions.size());
}

Expand All @@ -94,7 +94,7 @@ public void partitionsHaveDefaultWeightTest() {
List<String> unassignedPartitions = Arrays.asList("P1", "P2");
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Assert.assertTrue(taskCount > 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void assignFromScratchTest() {
Assert.assertTrue(statObj.getIsThroughputRateLatest());
Assert.assertEquals(statObj.getTotalPartitions(), 1);
Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 0);
Assert.assertEquals(statObj.getThroughputRate(), 5);
Assert.assertEquals(statObj.getThroughputRateInKBps(), 5);
}

@Test
Expand Down Expand Up @@ -147,7 +147,7 @@ public void newAssignmentRetainsTasksFromOtherDatastreamsTest() {
Assert.assertTrue(statObj.getIsThroughputRateLatest());
Assert.assertEquals(statObj.getTotalPartitions(), 1);
Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 0);
Assert.assertEquals(statObj.getThroughputRate(), 5);
Assert.assertEquals(statObj.getThroughputRateInKBps(), 5);
}

@Test
Expand Down Expand Up @@ -182,7 +182,7 @@ public void assignmentDistributesPartitionsWhenThroughputInfoIsMissingTest() {
Assert.assertFalse(statObj.getIsThroughputRateLatest());
Assert.assertEquals(statObj.getTotalPartitions(), 2);
Assert.assertEquals(statObj.getPartitionsWithUnknownThroughput(), 2);
Assert.assertEquals(statObj.getThroughputRate(), 0);
Assert.assertEquals(statObj.getThroughputRateInKBps(), 0);
}

@Test
Expand Down