diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java
index a5e061f85a993..b39930c1cdb90 100644
--- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java
+++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java
@@ -25,6 +25,7 @@
 package io.airbyte.scheduler.app;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 import io.airbyte.commons.concurrency.LifecycledCallable;
 import io.airbyte.commons.enums.Enums;
 import io.airbyte.config.helpers.LogClientSingleton;
@@ -36,7 +37,9 @@
 import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
 import java.nio.file.Path;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -50,6 +53,9 @@ public class JobSubmitter implements Runnable {
   private final TemporalWorkerRunFactory temporalWorkerRunFactory;
   private final JobTracker jobTracker;
 
+  // See attemptJobSubmit() to understand the need for this Concurrent Set.
+  private final Set<Long> runningJobs = Sets.newConcurrentHashSet();
+
   public JobSubmitter(final ExecutorService threadPool,
                       final JobPersistence persistence,
                       final TemporalWorkerRunFactory temporalWorkerRunFactory,
@@ -67,11 +73,7 @@ public void run() {
 
       final Optional<Job> nextJob = persistence.getNextJob();
 
-      nextJob.ifPresent(job -> {
-        trackSubmission(job);
-        submitJob(job);
-        LOGGER.info("Job-Submitter Summary. Submitted job with scope {}", job.getScope());
-      });
+      nextJob.ifPresent(attemptJobSubmit());
 
       LOGGER.debug("Completed Job-Submitter...");
     } catch (Throwable e) {
@@ -79,6 +81,46 @@ public void run() {
     }
   }
 
+  /**
+   * Builds the consumer that submits a job unless that job is already being run.
+   *
+   * Since job submission and job execution happen in two separate thread pools, and job execution is
+   * what removes a job from the submission queue, it is possible for a job to be submitted multiple
+   * times.
+   *
+   * To prevent this, the job id is claimed with {@link Set#add(Object)} on the concurrent set. That
+   * call reports whether the id was newly inserted, so checking and claiming happen in one atomic
+   * step and only one caller can win. The id is released in the finish block of
+   * {@link #submitJob(Job)}, or right away if handing the job over fails, because the finish block
+   * never runs in that case.
+   *
+   * Since {@link JobPersistence#getNextJob()} returns the next queued job, this solution causes
+   * head-of-line blocking as the JobSubmitter tries to submit the same job. However, this suggests
+   * the Worker Pool needs more workers and is inevitable when dealing with pending jobs.
+   *
+   * See https://github.com/airbytehq/airbyte/issues/4378 for more info.
+   *
+   * @return a consumer that submits the job it is given if that job is not already running
+   */
+  private Consumer<Job> attemptJobSubmit() {
+    return job -> {
+      // add() returns false when the id is already present, so check-and-claim is one atomic step.
+      if (runningJobs.add(job.getId())) {
+        try {
+          trackSubmission(job);
+          submitJob(job);
+        } catch (final RuntimeException e) {
+          // The job never reached the worker pool, so the finish block cannot release the id.
+          runningJobs.remove(job.getId());
+          throw e;
+        }
+        LOGGER.info("Job-Submitter Summary. Submitted job with scope {}", job.getScope());
+      } else {
+        LOGGER.info("Attempting to submit already running job {}. There are probably too many queued jobs.", job.getId());
+      }
+    };
+  }
+
   @VisibleForTesting
   void submitJob(Job job) {
     final WorkerRun workerRun = temporalWorkerRunFactory.create(job);
@@ -94,7 +136,6 @@ void submitJob(Job job) {
           final Path logFilePath = workerRun.getJobRoot().resolve(LogClientSingleton.LOG_FILENAME);
           final long persistedAttemptId = persistence.createAttempt(job.getId(), logFilePath);
           assertSameIds(attemptNumber, persistedAttemptId);
-
           LogClientSingleton.setJobMdc(workerRun.getJobRoot());
         })
         .setOnSuccess(output -> {
@@ -114,7 +155,10 @@ void submitJob(Job job) {
           persistence.failAttempt(job.getId(), attemptNumber);
           trackCompletion(job, io.airbyte.workers.JobStatus.FAILED);
         })
-        .setOnFinish(MDC::clear)
+        .setOnFinish(() -> {
+          runningJobs.remove(job.getId());
+          MDC.clear();
+        })
         .build());
   }
 
diff --git a/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobSubmitterTest.java b/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobSubmitterTest.java
index 8f0d6526b9792..43738d6be7485 100644
--- a/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobSubmitterTest.java
+++ b/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobSubmitterTest.java
@@ -58,10 +58,15 @@
 import java.nio.file.Path;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.mockito.InOrder;
+import org.mockito.Mockito;
 import org.slf4j.MDC;
 
 public class JobSubmitterTest {
@@ -226,4 +231,76 @@ void testMDC() throws Exception {
     assertTrue(MDC.getCopyOfContextMap().isEmpty());
   }
 
+  @Nested
+  class OnlyOneJobIdRunning {
+
+    /**
+     * See {@link JobSubmitter#attemptJobSubmit()} to understand why we need to test that only one job
+     * id can be successfully submitted at once.
+     */
+    @Test
+    public void testOnlyOneJobCanBeSubmittedAtOnce() throws Exception {
+      var jobDone = new AtomicReference<>(false);
+      when(workerRun.call()).thenAnswer((a) -> {
+        Thread.sleep(5000);
+        jobDone.set(true);
+        return SUCCESS_OUTPUT;
+      });
+
+      // Simulate the same job being submitted over and over again.
+      var simulatedJobSubmitterPool = Executors.newFixedThreadPool(10);
+      var submitCounter = new AtomicInteger(0);
+      while (!jobDone.get()) {
+        // This sleep mimics our SchedulerApp loop.
+        Thread.sleep(1000);
+        simulatedJobSubmitterPool.submit(() -> {
+          if (!jobDone.get()) {
+            jobSubmitter.run();
+            submitCounter.incrementAndGet();
+          }
+        });
+      }
+
+      // Let in-flight submissions finish so that submitCounter is final before verifying against it.
+      simulatedJobSubmitterPool.shutdown();
+      assertTrue(simulatedJobSubmitterPool.awaitTermination(10, TimeUnit.SECONDS));
+      verify(persistence, Mockito.times(submitCounter.get())).getNextJob();
+      // Assert that the job is actually only submitted once.
+      verify(jobSubmitter, Mockito.times(1)).submitJob(Mockito.any());
+    }
+
+    @Test
+    public void testSuccessShouldUnlockId() throws Exception {
+      when(workerRun.call()).thenReturn(SUCCESS_OUTPUT);
+
+      jobSubmitter.run();
+
+      // This sleep mimics our SchedulerApp loop.
+      Thread.sleep(1000);
+
+      // If the id was not removed, the second call would not trigger submitJob().
+      jobSubmitter.run();
+
+      verify(persistence, Mockito.times(2)).getNextJob();
+      verify(jobSubmitter, Mockito.times(2)).submitJob(Mockito.any());
+    }
+
+    @Test
+    public void testFailureShouldUnlockId() throws Exception {
+      when(workerRun.call()).thenThrow(new RuntimeException());
+
+      jobSubmitter.run();
+
+      // This sleep mimics our SchedulerApp loop.
+      Thread.sleep(1000);
+
+      // If the id was not removed, the second call would not trigger submitJob().
+      jobSubmitter.run();
+
+      verify(persistence, Mockito.times(2)).getNextJob();
+      verify(jobSubmitter, Mockito.times(2)).submitJob(Mockito.any());
+    }
+
+  }
+
 }