Skip to content

Commit

Permalink
fix flaky test cases with a bit more robust waiting logic (#8154)
Browse files Browse the repository at this point in the history
  • Loading branch information
klsince authored Feb 8, 2022
1 parent e4e5c6d commit a47af49
Showing 1 changed file with 50 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.pinot.controller.helix.core.minion;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.core.common.MinionConstants;
Expand All @@ -33,10 +33,12 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.quartz.CronTrigger;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.impl.matchers.GroupMatcher;
import org.testng.annotations.AfterClass;
Expand All @@ -49,6 +51,7 @@
public class PinotTaskManagerStatelessTest extends ControllerTest {
private static final String RAW_TABLE_NAME = "myTable";
private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
private static final long TIMEOUT_IN_MS = 10_000L;

@BeforeClass
public void setUp()
Expand Down Expand Up @@ -87,46 +90,44 @@ public void testPinotTaskManagerSchedulerWithUpdate()
new TableTaskConfig(
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build();
addTableConfig(tableConfig);
Thread.sleep(2000);
List<String> jobGroupNames = scheduler.getJobGroupNames();
assertEquals(jobGroupNames, Lists.newArrayList(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
"JobGroupNames should have SegmentGenerationAndPushTask only");
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */10 * ? * * *");

// 2. Update table to new schedule
tableConfig.setTaskConfig(new TableTaskConfig(
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */20 * ? * * *"))));
updateTableConfig(tableConfig);
Thread.sleep(2000);
jobGroupNames = scheduler.getJobGroupNames();
assertEquals(jobGroupNames, Lists.newArrayList(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
"JobGroupNames should have SegmentGenerationAndPushTask only");
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */20 * ? * * *");

// 3. Update table to new task and schedule
tableConfig.setTaskConfig(new TableTaskConfig(
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */30 * ? * * *"),
"MergeRollupTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"))));
updateTableConfig(tableConfig);
Thread.sleep(2000);
jobGroupNames = scheduler.getJobGroupNames();
assertEquals(jobGroupNames.size(), 2);
assertTrue(jobGroupNames.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
assertTrue(jobGroupNames.contains(MinionConstants.MergeRollupTask.TASK_TYPE));
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 2 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) && jgn
.contains(MinionConstants.MergeRollupTask.TASK_TYPE),
"JobGroupNames should have SegmentGenerationAndPushTask and MergeRollupTask");
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */30 * ? * * *");
validateJob(MinionConstants.MergeRollupTask.TASK_TYPE, "0 */10 * ? * * *");

// 4. Remove one task from the table
tableConfig.setTaskConfig(
new TableTaskConfig(ImmutableMap.of("MergeRollupTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"))));
updateTableConfig(tableConfig);
Thread.sleep(2000);
jobGroupNames = scheduler.getJobGroupNames();
assertEquals(jobGroupNames, Lists.newArrayList(MinionConstants.MergeRollupTask.TASK_TYPE));
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.MergeRollupTask.TASK_TYPE),
"JobGroupNames should have MergeRollupTask only");
validateJob(MinionConstants.MergeRollupTask.TASK_TYPE, "0 */10 * ? * * *");

// 4. Drop table
dropOfflineTable(RAW_TABLE_NAME);
jobGroupNames = scheduler.getJobGroupNames();
assertTrue(jobGroupNames.isEmpty());
waitForJobGroupNames(_controllerStarter.getTaskManager(), List::isEmpty, "JobGroupNames should be empty");

stopFakeInstances();
stopController();
Expand Down Expand Up @@ -154,9 +155,9 @@ public void testPinotTaskManagerSchedulerWithRestart()
new TableTaskConfig(
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build();
addTableConfig(tableConfig);
Thread.sleep(2000);
List<String> jobGroupNames = scheduler.getJobGroupNames();
assertEquals(jobGroupNames, Lists.newArrayList(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
"JobGroupNames should have SegmentGenerationAndPushTask only");
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */10 * ? * * *");

// Restart controller
Expand All @@ -168,7 +169,6 @@ public void testPinotTaskManagerSchedulerWithRestart()
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"),
"MergeRollupTask", ImmutableMap.of("schedule", "0 */20 * ? * * *"))));
updateTableConfig(tableConfig);
Thread.sleep(2000);

// Task is put into table config.
TableConfig tableConfigAfterRestart =
Expand All @@ -179,21 +179,31 @@ public void testPinotTaskManagerSchedulerWithRestart()

// The new MergeRollup task wouldn't be scheduled if not eagerly checking table configs
// after setting up subscriber on ChildChanges zk event when controller gets restarted.
taskManager = _controllerStarter.getTaskManager();
scheduler = taskManager.getScheduler();
jobGroupNames = scheduler.getJobGroupNames();
assertEquals(jobGroupNames.size(), 2);
assertTrue(jobGroupNames.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
assertTrue(jobGroupNames.contains(MinionConstants.MergeRollupTask.TASK_TYPE));
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 2 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) && jgn
.contains(MinionConstants.MergeRollupTask.TASK_TYPE),
"JobGroupNames should have SegmentGenerationAndPushTask and MergeRollupTask");

dropOfflineTable(RAW_TABLE_NAME);
jobGroupNames = scheduler.getJobGroupNames();
assertTrue(jobGroupNames.isEmpty());
waitForJobGroupNames(_controllerStarter.getTaskManager(), List::isEmpty, "JobGroupNames should be empty");

stopFakeInstances();
stopController();
}

private void waitForJobGroupNames(PinotTaskManager taskManager, Predicate<List<String>> predicate,
String errorMessage) {
TestUtils.waitForCondition(aVoid -> {
try {
Scheduler scheduler = taskManager.getScheduler();
List<String> jobGroupNames = scheduler.getJobGroupNames();
return predicate.test(jobGroupNames);
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}, TIMEOUT_IN_MS, errorMessage);
}

private void validateJob(String taskType, String cronExpression)
throws Exception {
PinotTaskManager taskManager = _controllerStarter.getTaskManager();
Expand All @@ -208,8 +218,18 @@ private void validateJob(String taskType, String cronExpression)
assertEquals(jobDetail.getKey().getGroup(), taskType);
assertSame(jobDetail.getJobDataMap().get("PinotTaskManager"), taskManager);
assertSame(jobDetail.getJobDataMap().get("LeadControllerManager"), _controllerStarter.getLeadControllerManager());
// jobDetail and jobTrigger are not added atomically by the scheduler,
// the jobDetail is added to an internal map firstly, and jobTrigger
// is added to another internal map afterwards, so we check for the existence
// of jobTrigger with some waits to be more defensive.
TestUtils.waitForCondition(aVoid -> {
try {
return scheduler.getTriggersOfJob(jobKey).size() == 1;
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}, TIMEOUT_IN_MS, "JobDetail exiting but missing JobTrigger");
List<? extends Trigger> triggersOfJob = scheduler.getTriggersOfJob(jobKey);
assertEquals(triggersOfJob.size(), 1);
Trigger trigger = triggersOfJob.iterator().next();
assertTrue(trigger instanceof CronTrigger);
assertEquals(((CronTrigger) trigger).getCronExpression(), cronExpression);
Expand Down

0 comments on commit a47af49

Please sign in to comment.