diff --git a/server/src/main/java/com/exacaster/lighter/application/ApplicationStatusHandler.java b/server/src/main/java/com/exacaster/lighter/application/ApplicationStatusHandler.java index 6e66bd9d..8cb52816 100644 --- a/server/src/main/java/com/exacaster/lighter/application/ApplicationStatusHandler.java +++ b/server/src/main/java/com/exacaster/lighter/application/ApplicationStatusHandler.java @@ -26,7 +26,6 @@ public ApplicationStatusHandler(ApplicationStorage applicationStorage, Backend b this.logService = logService; } - public void processApplicationStarting(Application application) { applicationStorage.saveApplication(ApplicationBuilder.builder(application) .setState(ApplicationState.STARTING) diff --git a/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java b/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java index fb0699f7..c7c17d5f 100644 --- a/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java +++ b/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java @@ -7,12 +7,14 @@ import com.exacaster.lighter.application.ApplicationState; import com.exacaster.lighter.application.ApplicationStatusHandler; import com.exacaster.lighter.backend.Backend; +import com.exacaster.lighter.concurrency.Waitable; import com.exacaster.lighter.configuration.AppConfiguration; import com.exacaster.lighter.spark.SparkApp; import io.micronaut.scheduling.annotation.Scheduled; import jakarta.inject.Singleton; import java.util.function.Consumer; +import java.util.stream.Collectors; import net.javacrumbs.shedlock.micronaut.SchedulerLock; import org.slf4j.Logger; @@ -20,6 +22,7 @@ public class BatchHandler { private static final Logger LOG = getLogger(BatchHandler.class); + private static final int MAX_SLOTS_PER_ITERATION = 10; private final Backend backend; private final BatchService batchService; @@ -34,23 +37,30 @@ public BatchHandler(Backend backend, BatchService batchService, AppConfiguration this.statusTracker = statusTracker; } - public void launch(Application application, Consumer errorHandler) { + public Waitable launch(Application application, Consumer errorHandler) { var app = new SparkApp(application.getSubmitParams(), errorHandler); - app.launch(backend.getSubmitConfiguration(application)); + return app.launch(backend.getSubmitConfiguration(application)); } @SchedulerLock(name = "processScheduledBatches") @Scheduled(fixedRate = "1m") - public void processScheduledBatches() { + public void processScheduledBatches() throws InterruptedException { assertLocked(); var emptySlots = countEmptySlots(); - LOG.info("Processing scheduled batches, found empty slots: {}", emptySlots); - batchService.fetchByState(ApplicationState.NOT_STARTED, emptySlots) - .forEach(batch -> { + var slotsToTake = Math.min(MAX_SLOTS_PER_ITERATION, emptySlots); + LOG.info("Processing scheduled batches, found empty slots: {}, using {}", emptySlots, slotsToTake); + var waitables = batchService.fetchByState(ApplicationState.NOT_STARTED, slotsToTake) + .stream() + .map(batch -> { LOG.info("Launching {}", batch); statusTracker.processApplicationStarting(batch); - launch(batch, error -> statusTracker.processApplicationError(batch, error)); - }); + return launch(batch, error -> statusTracker.processApplicationError(batch, error)); + }) + .collect(Collectors.toList()); + LOG.info("Waiting launches to complete"); + for (var waitable : waitables) { + waitable.waitCompletion(); + } } private Integer countEmptySlots() { @@ -59,7 +69,7 @@ private Integer countEmptySlots() { @SchedulerLock(name = "trackRunning") @Scheduled(fixedRate = "1m") - public void trackRunning() { + public void trackRunning() throws InterruptedException { assertLocked(); var completedCount = batchService.fetchRunning().stream() .map(statusTracker::processApplicationRunning) diff --git a/server/src/main/java/com/exacaster/lighter/application/sessions/SessionHandler.java b/server/src/main/java/com/exacaster/lighter/application/sessions/SessionHandler.java index 15e8bf94..28378363 100644 --- a/server/src/main/java/com/exacaster/lighter/application/sessions/SessionHandler.java +++ b/server/src/main/java/com/exacaster/lighter/application/sessions/SessionHandler.java @@ -10,6 +10,7 @@ import com.exacaster.lighter.application.ApplicationStatusHandler; import com.exacaster.lighter.application.sessions.processors.StatementHandler; import com.exacaster.lighter.backend.Backend; +import com.exacaster.lighter.concurrency.Waitable; import com.exacaster.lighter.configuration.AppConfiguration; import com.exacaster.lighter.configuration.AppConfiguration.SessionConfiguration; import com.exacaster.lighter.spark.SparkApp; @@ -45,9 +46,9 @@ public SessionHandler(SessionService sessionService, this.sessionConfiguration = appConfiguration.getSessionConfiguration(); } - public void launch(Application application, Consumer errorHandler) { + public Waitable launch(Application application, Consumer errorHandler) { var app = new SparkApp(application.getSubmitParams(), errorHandler); - app.launch(backend.getSubmitConfiguration(application)); + return app.launch(backend.getSubmitConfiguration(application)); } @SchedulerLock(name = "keepPermanentSession") @@ -66,15 +67,21 @@ public void keepPermanentSessions() { @SchedulerLock(name = "processScheduledSessions") @Scheduled(fixedRate = "1m") - public void processScheduledSessions() { + public void processScheduledSessions() throws InterruptedException { assertLocked(); - sessionService.fetchByState(ApplicationState.NOT_STARTED, 10).forEach(this::launchSession); + var waitables = sessionService.fetchByState(ApplicationState.NOT_STARTED, 10).stream() + .map(this::launchSession) + .collect(Collectors.toList()); + + for (var waitable : waitables) { + waitable.waitCompletion(); + } } - private void launchSession(Application session) { + private Waitable launchSession(Application session) { LOG.info("Launching {}", session); statusTracker.processApplicationStarting(session); - launch(session, error -> statusTracker.processApplicationError(session, error)); + return launch(session, error -> statusTracker.processApplicationError(session, error)); } @SchedulerLock(name = "trackRunningSessions") diff --git a/server/src/main/java/com/exacaster/lighter/concurrency/EmptyWaitable.java b/server/src/main/java/com/exacaster/lighter/concurrency/EmptyWaitable.java new file mode 100644 index 00000000..ac4f9234 --- /dev/null +++ b/server/src/main/java/com/exacaster/lighter/concurrency/EmptyWaitable.java @@ -0,0 +1,10 @@ +package com.exacaster.lighter.concurrency; + +public enum EmptyWaitable implements Waitable { + INSTANCE { + @Override + public void waitCompletion() throws InterruptedException { + // nothing to wait + } + } +} diff --git a/server/src/main/java/com/exacaster/lighter/concurrency/Waitable.java b/server/src/main/java/com/exacaster/lighter/concurrency/Waitable.java new file mode 100644 index 00000000..e6aa35a5 --- /dev/null +++ b/server/src/main/java/com/exacaster/lighter/concurrency/Waitable.java @@ -0,0 +1,6 @@ +package com.exacaster.lighter.concurrency; + +public interface Waitable { + + void waitCompletion() throws InterruptedException; +} diff --git a/server/src/main/java/com/exacaster/lighter/spark/SparkApp.java b/server/src/main/java/com/exacaster/lighter/spark/SparkApp.java index d6783c54..c32d151d 100644 --- a/server/src/main/java/com/exacaster/lighter/spark/SparkApp.java +++ b/server/src/main/java/com/exacaster/lighter/spark/SparkApp.java @@ -3,20 +3,15 @@ import static org.apache.spark.launcher.SparkLauncher.DRIVER_MEMORY; import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_CORES; import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_MEMORY; -import static org.slf4j.LoggerFactory.getLogger; +import com.exacaster.lighter.concurrency.EmptyWaitable; +import com.exacaster.lighter.concurrency.Waitable; import java.io.IOException; import java.util.Map; import java.util.function.Consumer; -import org.apache.spark.launcher.SparkAppHandle; -import org.apache.spark.launcher.SparkAppHandle.Listener; import org.apache.spark.launcher.SparkLauncher; -import org.slf4j.Logger; public class SparkApp { - - private static final Logger LOG = getLogger(SparkApp.class); - private final SubmitParams submitParams; private final Consumer errorHandler; @@ -25,7 +20,7 @@ public SparkApp(SubmitParams submitParams, Consumer errorHandler) { this.errorHandler = errorHandler; } - public void launch(Map extraConfiguration) { + public Waitable launch(Map extraConfiguration) { try { var launcher = new SparkLauncher() .setAppName(submitParams.getName()) @@ -48,22 +43,14 @@ public void launch(Map extraConfiguration) { .setConf(EXECUTOR_CORES, String.valueOf(submitParams.getExecutorCores())) .setConf(EXECUTOR_MEMORY, submitParams.getExecutorMemory()) .setConf("spark.executor.instances", String.valueOf(submitParams.getNumExecutors())); - launcher.startApplication(new Listener() { - @Override - public void stateChanged(SparkAppHandle handle) { - LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), handle.getState()); - handle.getError().ifPresent(errorHandler); - } - - @Override - public void infoChanged(SparkAppHandle handle) { - // TODO: ? - } - }); - + var listener = new SparkListener(errorHandler); + launcher.startApplication(listener); + return listener; } catch (IOException | IllegalArgumentException e) { this.errorHandler.accept(e); } + + return EmptyWaitable.INSTANCE; } } diff --git a/server/src/main/java/com/exacaster/lighter/spark/SparkListener.java b/server/src/main/java/com/exacaster/lighter/spark/SparkListener.java new file mode 100644 index 00000000..637a14ff --- /dev/null +++ b/server/src/main/java/com/exacaster/lighter/spark/SparkListener.java @@ -0,0 +1,42 @@ +package com.exacaster.lighter.spark; + +import static org.slf4j.LoggerFactory.getLogger; + +import com.exacaster.lighter.concurrency.Waitable; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import org.apache.spark.launcher.SparkAppHandle; +import org.apache.spark.launcher.SparkAppHandle.Listener; +import org.slf4j.Logger; + +public class SparkListener implements Listener, Waitable { + + private static final Logger LOG = getLogger(SparkListener.class); + private final Consumer errorHandler; + private final CountDownLatch latch; + + public SparkListener(Consumer errorHandler) { + this.errorHandler = errorHandler; + this.latch = new CountDownLatch(1); + } + + @Override + public void stateChanged(SparkAppHandle handle) { + LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), handle.getState()); + handle.getError().ifPresent(errorHandler); + if (handle.getState() != null && handle.getState().isFinal()) { + handle.disconnect(); + latch.countDown(); + } + } + + @Override + public void infoChanged(SparkAppHandle handle) { + // TODO: ? + } + + @Override + public void waitCompletion() throws InterruptedException { + latch.await(); + } +} diff --git a/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy b/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy index 7578f63c..0f95e8a1 100644 --- a/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy +++ b/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy @@ -3,6 +3,7 @@ package com.exacaster.lighter.application.batch import com.exacaster.lighter.application.ApplicationState import com.exacaster.lighter.application.ApplicationStatusHandler import com.exacaster.lighter.backend.Backend +import com.exacaster.lighter.concurrency.EmptyWaitable import com.exacaster.lighter.configuration.AppConfiguration import net.javacrumbs.shedlock.core.LockAssert import spock.lang.Specification @@ -37,7 +38,7 @@ class BatchHandlerTest extends Specification { then: _ * service.fetchRunning() >> [] 1 * service.fetchByState(ApplicationState.NOT_STARTED, _) >> [app] - 1 * handler.launch(app, _) >> { } + 1 * handler.launch(app, _) >> EmptyWaitable.INSTANCE } def "does not trigger when there are no empty slots"() {