Skip to content

Commit

Permalink
Fix scheduler race condition. (#4691)
Browse files Browse the repository at this point in the history
  • Loading branch information
davinchia authored Jul 13, 2021
1 parent f859ad7 commit 427c46f
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.scheduler.app;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.airbyte.commons.concurrency.LifecycledCallable;
import io.airbyte.commons.enums.Enums;
import io.airbyte.config.helpers.LogClientSingleton;
Expand All @@ -36,7 +37,9 @@
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand All @@ -50,6 +53,9 @@ public class JobSubmitter implements Runnable {
private final TemporalWorkerRunFactory temporalWorkerRunFactory;
private final JobTracker jobTracker;

// Ids of jobs that have been handed to submitJob() and have not finished yet. attemptJobSubmit()
// adds an id before submitting and skips any job whose id is already present; the on-finish hook
// registered in submitJob() removes the id again. See the attemptJobSubmit() Javadoc for why this
// has to be a concurrent set (submission and execution run on separate thread pools).
private final Set<Long> runningJobs = Sets.newConcurrentHashSet();

public JobSubmitter(final ExecutorService threadPool,
final JobPersistence persistence,
final TemporalWorkerRunFactory temporalWorkerRunFactory,
Expand All @@ -67,18 +73,42 @@ public void run() {

final Optional<Job> nextJob = persistence.getNextJob();

nextJob.ifPresent(job -> {
trackSubmission(job);
submitJob(job);
LOGGER.info("Job-Submitter Summary. Submitted job with scope {}", job.getScope());
});
nextJob.ifPresent(attemptJobSubmit());

LOGGER.debug("Completed Job-Submitter...");
} catch (Throwable e) {
LOGGER.error("Job Submitter Error", e);
}
}

/**
 * Builds the action that submits a job unless that same job is already running.
 *
 * Since job submission and job execution happen in two separate thread pools, and job execution is
 * what removes a job from the submission queue, {@link JobPersistence#getNextJob()} can hand back
 * the same job several times, which would otherwise lead to it being submitted multiple times.
 *
 * The returned consumer claims the job id with a single {@link Set#add(Object)} call on the
 * concurrent set of running job ids. That call is the whole check-and-claim step: only the caller
 * for which it returns {@code true} goes on to submit, every other caller just logs and returns.
 * The id is released again in the on-finish hook of {@link #submitJob(Job)}, or right here if
 * tracking/submitting throws before the job was handed over.
 *
 * This method is deliberately not {@code synchronized}: it only creates the consumer, and the
 * consumer runs after this method has returned, so a lock taken here would not protect the
 * check-and-claim step at all.
 *
 * Since {@link JobPersistence#getNextJob()} returns the next queued job, this solution causes
 * head-of-line blocking as the JobSubmitter keeps trying to submit the same job. However, this
 * suggests the Worker Pool needs more workers and is inevitable when dealing with pending jobs.
 *
 * See https://github.com/airbytehq/airbyte/issues/4378 for more info.
 *
 * @return a consumer that submits the given job at most once while it is running
 */
private Consumer<Job> attemptJobSubmit() {
  return job -> {
    final Long jobId = job.getId();

    // add() returns false when the id is already present, so this is an atomic test-and-set.
    if (!runningJobs.add(jobId)) {
      LOGGER.info("Attempting to submit already running job {}. There are probably too many queued jobs.", job.getId());
      return;
    }

    try {
      trackSubmission(job);
      submitJob(job);
    } catch (RuntimeException | Error e) {
      // The on-finish hook never runs for a job that was not handed over, so release the claim
      // here; otherwise this job id would be blocked from submission forever.
      runningJobs.remove(jobId);
      throw e;
    }
    LOGGER.info("Job-Submitter Summary. Submitted job with scope {}", job.getScope());
  };
}

@VisibleForTesting
void submitJob(Job job) {
final WorkerRun workerRun = temporalWorkerRunFactory.create(job);
Expand All @@ -94,7 +124,6 @@ void submitJob(Job job) {
final Path logFilePath = workerRun.getJobRoot().resolve(LogClientSingleton.LOG_FILENAME);
final long persistedAttemptId = persistence.createAttempt(job.getId(), logFilePath);
assertSameIds(attemptNumber, persistedAttemptId);

LogClientSingleton.setJobMdc(workerRun.getJobRoot());
})
.setOnSuccess(output -> {
Expand All @@ -114,7 +143,10 @@ void submitJob(Job job) {
persistence.failAttempt(job.getId(), attemptNumber);
trackCompletion(job, io.airbyte.workers.JobStatus.FAILED);
})
.setOnFinish(MDC::clear)
.setOnFinish(() -> {
runningJobs.remove(job.getId());
MDC.clear();
})
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,14 @@
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.slf4j.MDC;

public class JobSubmitterTest {
Expand Down Expand Up @@ -226,4 +230,74 @@ void testMDC() throws Exception {
assertTrue(MDC.getCopyOfContextMap().isEmpty());
}

@Nested
class OnlyOneJobIdRunning {

  /**
   * See {@link JobSubmitter#attemptJobSubmit()} to understand why we need to test that only one job
   * id can be successfully submitted at once.
   */
  @Test
  public void testOnlyOneJobCanBeSubmittedAtOnce() throws Exception {
    var jobDone = new AtomicReference<>(false);
    when(workerRun.call()).thenAnswer((a) -> {
      Thread.sleep(5000);
      jobDone.set(true);
      return SUCCESS_OUTPUT;
    });

    // Simulate the same job being submitted over and over again.
    var simulatedJobSubmitterPool = Executors.newFixedThreadPool(10);
    var submitCounter = new AtomicInteger(0);
    try {
      while (!jobDone.get()) {
        // This sleep mimics our SchedulerApp loop.
        Thread.sleep(1000);
        simulatedJobSubmitterPool.submit(() -> {
          if (!jobDone.get()) {
            jobSubmitter.run();
            submitCounter.incrementAndGet();
          }
        });
      }

      // Let every in-flight task finish before reading the counter. A task that has returned from
      // run() but has not incremented the counter yet would otherwise make the getNextJob()
      // verification below fail intermittently.
      simulatedJobSubmitterPool.shutdown();
      assertTrue(simulatedJobSubmitterPool.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS));
    } finally {
      // No-op after a clean shutdown; stops leftover threads if the test bailed out early.
      simulatedJobSubmitterPool.shutdownNow();
    }

    verify(persistence, Mockito.times(submitCounter.get())).getNextJob();
    // Assert that the job is actually only submitted once.
    verify(jobSubmitter, Mockito.times(1)).submitJob(Mockito.any());
  }

  /** A job that finished successfully must be submittable again. */
  @Test
  public void testSuccessShouldUnlockId() throws Exception {
    when(workerRun.call()).thenReturn(SUCCESS_OUTPUT);

    runTwiceAndVerifyBothSubmitted();
  }

  /** A job whose worker run threw must be submittable again. */
  @Test
  public void testFailureShouldUnlockId() throws Exception {
    when(workerRun.call()).thenThrow(new RuntimeException());

    runTwiceAndVerifyBothSubmitted();
  }

  /**
   * Runs the submitter twice, one scheduler tick apart, and verifies that both runs fetched a job
   * and both reached submitJob().
   */
  private void runTwiceAndVerifyBothSubmitted() throws Exception {
    jobSubmitter.run();

    // This sleep mimics our SchedulerApp loop.
    Thread.sleep(1000);

    // If the id was not removed, the second call would not trigger submitJob().
    jobSubmitter.run();

    verify(persistence, Mockito.times(2)).getNextJob();
    verify(jobSubmitter, Mockito.times(2)).submitJob(Mockito.any());
  }

}

}

0 comments on commit 427c46f

Please sign in to comment.