Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scheduler race condition. #4691

Merged
merged 4 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.scheduler.app;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.airbyte.commons.concurrency.LifecycledCallable;
import io.airbyte.commons.enums.Enums;
import io.airbyte.config.helpers.LogClientSingleton;
Expand All @@ -36,7 +37,9 @@
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand All @@ -50,6 +53,9 @@ public class JobSubmitter implements Runnable {
private final TemporalWorkerRunFactory temporalWorkerRunFactory;
private final JobTracker jobTracker;

// See attemptJobSubmit() to understand the need for this Concurrent Set.
private final Set<Long> runningJobs = Sets.newConcurrentHashSet();

public JobSubmitter(final ExecutorService threadPool,
final JobPersistence persistence,
final TemporalWorkerRunFactory temporalWorkerRunFactory,
Expand All @@ -67,18 +73,42 @@ public void run() {

final Optional<Job> nextJob = persistence.getNextJob();

nextJob.ifPresent(job -> {
trackSubmission(job);
submitJob(job);
LOGGER.info("Job-Submitter Summary. Submitted job with scope {}", job.getScope());
});
nextJob.ifPresent(attemptJobSubmit());

LOGGER.debug("Completed Job-Submitter...");
} catch (Throwable e) {
LOGGER.error("Job Submitter Error", e);
}
}

/**
* Since job submission and job execution happen in two separate thread pools, and job execution is
* what removes a job from the submission queue, it is possible for a job to be submitted multiple
* times.
*
* This synchronised block guarantees only a single thread can utilise the concurrent set to decide
* whether a job should be submitted. This job id is added here, and removed in the finish block of
* {@link #submitJob(Job)}.
*
* Since {@link JobPersistence#getNextJob()} returns the next queued job, this solution cause
* head-of-line blocking as the JobSubmitter tries to submit the same job. However, this suggests
* the Worker Pool needs more workers and is inevitable when dealing with pending jobs.
*
* See https://github.com/airbytehq/airbyte/issues/4378 for more info.
*/
synchronized private Consumer<Job> attemptJobSubmit() {
return job -> {
if (!runningJobs.contains(job.getId())) {
runningJobs.add(job.getId());
trackSubmission(job);
submitJob(job);
LOGGER.info("Job-Submitter Summary. Submitted job with scope {}", job.getScope());
} else {
LOGGER.info("Attempting to submit already running job {}. There are probably too many queued jobs.", job.getId());
}
};
}

@VisibleForTesting
void submitJob(Job job) {
final WorkerRun workerRun = temporalWorkerRunFactory.create(job);
Expand All @@ -94,7 +124,6 @@ void submitJob(Job job) {
final Path logFilePath = workerRun.getJobRoot().resolve(LogClientSingleton.LOG_FILENAME);
final long persistedAttemptId = persistence.createAttempt(job.getId(), logFilePath);
assertSameIds(attemptNumber, persistedAttemptId);

LogClientSingleton.setJobMdc(workerRun.getJobRoot());
})
.setOnSuccess(output -> {
Expand All @@ -114,7 +143,10 @@ void submitJob(Job job) {
persistence.failAttempt(job.getId(), attemptNumber);
trackCompletion(job, io.airbyte.workers.JobStatus.FAILED);
})
.setOnFinish(MDC::clear)
.setOnFinish(() -> {
runningJobs.remove(job.getId());
MDC.clear();
})
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,14 @@
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.slf4j.MDC;

public class JobSubmitterTest {
Expand Down Expand Up @@ -226,4 +230,68 @@ void testMDC() throws Exception {
assertTrue(MDC.getCopyOfContextMap().isEmpty());
}

@Nested
class OnlyOneJobIdRunning {

@Test
public void testMultipleSubmissionShouldFail() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think the name of this one is a bit misleading or maybe add a comment explaining what you're testing. this was a bit hard for me to parse. if i'm understanding it correctly it is testing that while a job is running or queued, the submitter still runs but if it tries to submit the same job again subsequent submissions don't happen. thus getNextJob happens more than once but submitJob should happen exactly once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly that. I will add comments.

var jobDone = new AtomicReference<>(false);
when(workerRun.call()).thenAnswer((a) -> {
Thread.sleep(5000);
jobDone.set(true);
return SUCCESS_OUTPUT;
});

var simulatedJobSubmitterPool = Executors.newFixedThreadPool(10);
var submitCounter = new AtomicInteger(0);
while (!jobDone.get()) {
// This sleep mimics our SchedulerApp loop.
Thread.sleep(1000);
simulatedJobSubmitterPool.submit(() -> {
if (!jobDone.get()) {
jobSubmitter.run();
submitCounter.incrementAndGet();
}
});
}

simulatedJobSubmitterPool.shutdownNow();
verify(persistence, Mockito.times(submitCounter.get())).getNextJob();
verify(jobSubmitter, Mockito.times(1)).submitJob(Mockito.any());
}

@Test
public void testFailureShouldUnlockId() throws Exception {
davinchia marked this conversation as resolved.
Show resolved Hide resolved
when(workerRun.call()).thenReturn(SUCCESS_OUTPUT);

jobSubmitter.run();

// This sleep mimics our SchedulerApp loop.
Thread.sleep(1000);

// If the id was not removed, the second call would not trigger submitJob().
jobSubmitter.run();

verify(persistence, Mockito.times(2)).getNextJob();
verify(jobSubmitter, Mockito.times(2)).submitJob(Mockito.any());
}

@Test
public void testSuccessShouldUnlockId() throws Exception {
when(workerRun.call()).thenThrow(new RuntimeException());

jobSubmitter.run();

// This sleep mimics our SchedulerApp loop.
Thread.sleep(1000);

// If the id was not removed, the second call would not trigger submitJob().
jobSubmitter.run();

verify(persistence, Mockito.times(2)).getNextJob();
verify(jobSubmitter, Mockito.times(2)).submitJob(Mockito.any());
}

}

}