diff --git a/docs/configuration.md b/docs/configuration.md index d4621096..526a9bdd 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -7,6 +7,7 @@ Lighter can be configured by using environment variables. Currently, Lighter sup | Property | Description | Default | |----------------------------------------|----------------------------------------------------------------|---------------------------------| | LIGHTER_MAX_RUNNING_JOBS | Max running Batch jobs in parallel | 5 | +| LIGHTER_MAX_STARTING_JOBS | Max starting Batch jobs in parallel | 5 | | LIGHTER_SPARK_HISTORY_SERVER_URL | Spark history server URL used on frontend | http://localhost/spark-history/ | | LIGHTER_EXTERNAL_LOGS_URL_TEMPLATE | Template for link to external logs | | | LIGHTER_PY_GATEWAY_PORT | Port for live Spark session communication | 25333 | diff --git a/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java b/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java index b70e10b7..5308cef8 100644 --- a/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java +++ b/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java @@ -12,8 +12,10 @@ import com.exacaster.lighter.storage.SortOrder; import io.micronaut.scheduling.annotation.Scheduled; import jakarta.inject.Singleton; + import java.util.function.Consumer; import java.util.stream.Collectors; + import net.javacrumbs.shedlock.micronaut.SchedulerLock; import org.slf4j.Logger; @@ -21,7 +23,6 @@ public class BatchHandler { private static final Logger LOG = getLogger(BatchHandler.class); - private static final int MAX_SLOTS_PER_ITERATION = 10; private final Backend backend; private final BatchService batchService; @@ -29,7 +30,7 @@ public class BatchHandler { private final ApplicationStatusHandler statusTracker; public BatchHandler(Backend backend, BatchService batchService, AppConfiguration appConfiguration, - ApplicationStatusHandler statusTracker) { + ApplicationStatusHandler statusTracker) { this.backend = backend; this.batchService = batchService; this.appConfiguration = appConfiguration; @@ -45,10 +46,8 @@ public Waitable launch(Application application, Consumer errorHandler @Scheduled(fixedDelay = "30s") public void processScheduledBatches() throws InterruptedException { assertLocked(); - var emptySlots = countEmptySlots(); - var slotsToTake = Math.min(MAX_SLOTS_PER_ITERATION, emptySlots); - LOG.info("Processing scheduled batches, found empty slots: {}, using {}", emptySlots, slotsToTake); - var waitables = batchService.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, slotsToTake) + var maxSlotsForNewJobs = getMaxSlotsForNewJobs(); + var batchesToStart = batchService.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, maxSlotsForNewJobs) .stream() .map(batch -> { LOG.info("Launching {}", batch); @@ -56,14 +55,27 @@ public void processScheduledBatches() throws InterruptedException { return launch(batch, error -> statusTracker.processApplicationError(batch, error)); }) .collect(Collectors.toList()); - LOG.info("Waiting launches to complete"); - for (var waitable : waitables) { - waitable.waitCompletion(); + LOG.info("Triggered {} new batch jobs. Waiting launches to complete.", batchesToStart.size()); + for (var batchToStart : batchesToStart) { + batchToStart.waitCompletion(); } } - private Integer countEmptySlots() { - return Math.max(this.appConfiguration.getMaxRunningJobs() - this.batchService.fetchRunning().size(), 0); + private Integer getMaxSlotsForNewJobs() { + var numberOfJobsRunning = this.batchService.fetchRunning().size(); + + var maxAvailableSlots = Math.max(this.appConfiguration.getMaxRunningJobs() - numberOfJobsRunning, 0); + var maxSlotsForNewJobs = Math.min( + this.appConfiguration.getMaxStartingJobs(), + maxAvailableSlots + ); + + LOG.info("Processing scheduled batches. Running jobs: {}/{}. {} slots can be used for new jobs.", + numberOfJobsRunning, + this.appConfiguration.getMaxRunningJobs(), + maxSlotsForNewJobs); + + return maxSlotsForNewJobs; } @SchedulerLock(name = "trackRunning") diff --git a/server/src/main/java/com/exacaster/lighter/configuration/AppConfiguration.java b/server/src/main/java/com/exacaster/lighter/configuration/AppConfiguration.java index bb40d15d..b16349b5 100644 --- a/server/src/main/java/com/exacaster/lighter/configuration/AppConfiguration.java +++ b/server/src/main/java/com/exacaster/lighter/configuration/AppConfiguration.java @@ -24,6 +24,8 @@ public class AppConfiguration { @JsonProperty(access = Access.WRITE_ONLY) private final Integer maxRunningJobs; + @JsonProperty(access = Access.WRITE_ONLY) + private final Integer maxStartingJobs; private final String sparkHistoryServerUrl; private final String externalLogsUrlTemplate; @JsonProperty(access = Access.WRITE_ONLY) @@ -36,6 +38,7 @@ public class AppConfiguration { @ConfigurationInject public AppConfiguration(Integer maxRunningJobs, + Integer maxStartingJobs, @Nullable String sparkHistoryServerUrl, @Nullable String externalLogsUrlTemplate, Integer pyGatewayPort, @@ -45,6 +48,7 @@ public AppConfiguration(Integer maxRunningJobs, @Nullable Map batchDefaultConf, @Nullable Map sessionDefaultConf) { this.maxRunningJobs = maxRunningJobs; + this.maxStartingJobs = maxStartingJobs; this.sparkHistoryServerUrl = sparkHistoryServerUrl; this.externalLogsUrlTemplate = externalLogsUrlTemplate; this.pyGatewayPort = pyGatewayPort; @@ -58,6 +62,10 @@ public Integer getMaxRunningJobs() { return maxRunningJobs; } + public Integer getMaxStartingJobs() { + return maxStartingJobs; + } + public String getSparkHistoryServerUrl() { return sparkHistoryServerUrl; } @@ -90,6 +98,7 @@ public Map getSessionDefaultConf() { public String toString() { return new StringJoiner(", ", AppConfiguration.class.getSimpleName() + "[", "]") .add("maxRunningJobs=" + maxRunningJobs) + .add("maxStartingJobs=" + maxStartingJobs) .add("sparkHistoryServerUrl=" + sparkHistoryServerUrl) .add("sessionConfiguration=" + sessionConfiguration) .add("batchDefaultConf=" + batchDefaultConf) diff --git a/server/src/main/resources/application.yml b/server/src/main/resources/application.yml index 00f0988c..7a175171 100644 --- a/server/src/main/resources/application.yml +++ b/server/src/main/resources/application.yml @@ -1,5 +1,6 @@ lighter: max-running-jobs: 5 + max-starting-jobs: 5 spark-history-server-url: http://localhost/spark-history/ frontend-path: file:${FRONTEND_PATH:../frontend/build} py-gateway-port: 25333 diff --git a/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy b/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy index 4cd7002a..7c7872d7 100644 --- a/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy +++ b/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy @@ -1,5 +1,6 @@ package com.exacaster.lighter.application.batch +import com.exacaster.lighter.Application import com.exacaster.lighter.application.ApplicationState import com.exacaster.lighter.application.ApplicationStatusHandler import com.exacaster.lighter.backend.Backend @@ -51,8 +52,35 @@ class BatchHandlerTest extends Specification { then: _ * service.fetchRunning() >> [app] - _ * service.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, config.getMaxRunningJobs() - 1) >> [] - 0 * handler.launch(app, _) >> { } + _ * service.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, _) >> [] + } + + def "validate max running jobs limit"() { + given: + def app = newApplication() + def runningApps = [app, app] + + when: + handler.processScheduledBatches() + + then: + _ * service.fetchRunning() >> runningApps + _ * service.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, config.getMaxRunningJobs() - runningApps.size()) >> [app] + (config.getMaxRunningJobs() - runningApps.size()) * handler.launch(app, _) >> EmptyWaitable.INSTANCE + } + + def "validate max starting jobs limit"() { + given: + def app = newApplication() + def appsToRun = [app, app, app] + + when: + handler.processScheduledBatches() + + then: + _ * service.fetchRunning() >> [] + _ * service.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, config.getMaxStartingJobs()) >> appsToRun + (config.getMaxStartingJobs()) * handler.launch(app, _) >> EmptyWaitable.INSTANCE } def "tracks running jobs"() { diff --git a/server/src/test/groovy/com/exacaster/lighter/test/Factories.groovy b/server/src/test/groovy/com/exacaster/lighter/test/Factories.groovy index c1a4416c..d237ece6 100644 --- a/server/src/test/groovy/com/exacaster/lighter/test/Factories.groovy +++ b/server/src/test/groovy/com/exacaster/lighter/test/Factories.groovy @@ -56,7 +56,8 @@ class Factories { static appConfiguration() { new AppConfiguration( - 10, + 3, + 2, "http://history", null, 5432,