Skip to content

Commit

Permalink
Add max starting jobs configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Minutis committed Feb 23, 2023
1 parent 7f32899 commit c1a8908
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Lighter can be configured by using environment variables. Currently, Lighter sup
| Property | Description | Default |
|----------------------------------------|----------------------------------------------------------------|---------------------------------|
| LIGHTER_MAX_RUNNING_JOBS | Max running Batch jobs in parallel | 5 |
| LIGHTER_MAX_STARTING_JOBS | Max starting Batch jobs in parallel | 5 |
| LIGHTER_SPARK_HISTORY_SERVER_URL | Spark history server URL used on frontend | http://localhost/spark-history/ |
| LIGHTER_EXTERNAL_LOGS_URL_TEMPLATE | Template for link to external logs | |
| LIGHTER_PY_GATEWAY_PORT | Port for live Spark session communication | 25333 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,25 @@
import com.exacaster.lighter.storage.SortOrder;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Singleton;

import java.util.function.Consumer;
import java.util.stream.Collectors;

import net.javacrumbs.shedlock.micronaut.SchedulerLock;
import org.slf4j.Logger;

@Singleton
public class BatchHandler {

private static final Logger LOG = getLogger(BatchHandler.class);
private static final int MAX_SLOTS_PER_ITERATION = 10;

private final Backend backend;
private final BatchService batchService;
private final AppConfiguration appConfiguration;
private final ApplicationStatusHandler statusTracker;

public BatchHandler(Backend backend, BatchService batchService, AppConfiguration appConfiguration,
ApplicationStatusHandler statusTracker) {
ApplicationStatusHandler statusTracker) {
this.backend = backend;
this.batchService = batchService;
this.appConfiguration = appConfiguration;
Expand All @@ -45,25 +46,36 @@ public Waitable launch(Application application, Consumer<Throwable> errorHandler
@Scheduled(fixedDelay = "30s")
public void processScheduledBatches() throws InterruptedException {
assertLocked();
var emptySlots = countEmptySlots();
var slotsToTake = Math.min(MAX_SLOTS_PER_ITERATION, emptySlots);
LOG.info("Processing scheduled batches, found empty slots: {}, using {}", emptySlots, slotsToTake);
var waitables = batchService.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, slotsToTake)
var maxSlotsForNewJobs = getMaxSlotsForNewJobs();
var batchesToStart = batchService.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, maxSlotsForNewJobs)
.stream()
.map(batch -> {
LOG.info("Launching {}", batch);
statusTracker.processApplicationStarting(batch);
return launch(batch, error -> statusTracker.processApplicationError(batch, error));
})
.collect(Collectors.toList());
LOG.info("Waiting launches to complete");
for (var waitable : waitables) {
waitable.waitCompletion();
LOG.info("Triggered {} new batch jobs. Waiting launches to complete.", batchesToStart.size());
for (var batchToStart : batchesToStart) {
batchToStart.waitCompletion();
}
}

private Integer countEmptySlots() {
return Math.max(this.appConfiguration.getMaxRunningJobs() - this.batchService.fetchRunning().size(), 0);
private Integer getMaxSlotsForNewJobs() {
var numberOfJobsRunning = this.batchService.fetchRunning().size();

var maxAvailableSlots = Math.max(this.appConfiguration.getMaxRunningJobs() - numberOfJobsRunning, 0);
var maxSlotsForNewJobs = Math.min(
this.appConfiguration.getMaxStartingJobs(),
maxAvailableSlots
);

LOG.info("Processing scheduled batches. Running jobs: {}/{}. {} slots can be used for new jobs.",
numberOfJobsRunning,
this.appConfiguration.getMaxRunningJobs(),
maxSlotsForNewJobs);

return maxSlotsForNewJobs;
}

@SchedulerLock(name = "trackRunning")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class AppConfiguration {

@JsonProperty(access = Access.WRITE_ONLY)
private final Integer maxRunningJobs;
@JsonProperty(access = Access.WRITE_ONLY)
private final Integer maxStartingJobs;
private final String sparkHistoryServerUrl;
private final String externalLogsUrlTemplate;
@JsonProperty(access = Access.WRITE_ONLY)
Expand All @@ -36,6 +38,7 @@ public class AppConfiguration {

@ConfigurationInject
public AppConfiguration(Integer maxRunningJobs,
Integer maxStartingJobs,
@Nullable String sparkHistoryServerUrl,
@Nullable String externalLogsUrlTemplate,
Integer pyGatewayPort,
Expand All @@ -45,6 +48,7 @@ public AppConfiguration(Integer maxRunningJobs,
@Nullable Map<String, String> batchDefaultConf,
@Nullable Map<String, String> sessionDefaultConf) {
this.maxRunningJobs = maxRunningJobs;
this.maxStartingJobs = maxStartingJobs;
this.sparkHistoryServerUrl = sparkHistoryServerUrl;
this.externalLogsUrlTemplate = externalLogsUrlTemplate;
this.pyGatewayPort = pyGatewayPort;
Expand All @@ -58,6 +62,10 @@ public Integer getMaxRunningJobs() {
return maxRunningJobs;
}

public Integer getMaxStartingJobs() {
return maxStartingJobs;
}

public String getSparkHistoryServerUrl() {
return sparkHistoryServerUrl;
}
Expand Down Expand Up @@ -90,6 +98,7 @@ public Map<String, String> getSessionDefaultConf() {
public String toString() {
return new StringJoiner(", ", AppConfiguration.class.getSimpleName() + "[", "]")
.add("maxRunningJobs=" + maxRunningJobs)
.add("maxStartingJobs=" + maxStartingJobs)
.add("sparkHistoryServerUrl=" + sparkHistoryServerUrl)
.add("sessionConfiguration=" + sessionConfiguration)
.add("batchDefaultConf=" + batchDefaultConf)
Expand Down
1 change: 1 addition & 0 deletions server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
lighter:
max-running-jobs: 5
max-starting-jobs: 5
spark-history-server-url: http://localhost/spark-history/
frontend-path: file:${FRONTEND_PATH:../frontend/build}
py-gateway-port: 25333
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.exacaster.lighter.application.batch

import com.exacaster.lighter.Application
import com.exacaster.lighter.application.ApplicationState
import com.exacaster.lighter.application.ApplicationStatusHandler
import com.exacaster.lighter.backend.Backend
Expand Down Expand Up @@ -51,8 +52,35 @@ class BatchHandlerTest extends Specification {

then:
_ * service.fetchRunning() >> [app]
_ * service.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, config.getMaxRunningJobs() - 1) >> []
0 * handler.launch(app, _) >> { }
_ * service.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, _) >> []
}

def "validate max running jobs limit"() {
given:
def app = newApplication()
def runningApps = [app, app]

when:
handler.processScheduledBatches()

then:
_ * service.fetchRunning() >> runningApps
_ * service.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, config.getMaxRunningJobs() - runningApps.size()) >> [app]
(config.getMaxRunningJobs() - runningApps.size()) * handler.launch(app, _) >> EmptyWaitable.INSTANCE
}

def "validate max starting jobs limit"() {
given:
def app = newApplication()
def appsToRun = [app, app, app]

when:
handler.processScheduledBatches()

then:
_ * service.fetchRunning() >> []
_ * service.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, config.getMaxStartingJobs()) >> appsToRun
(config.getMaxStartingJobs()) * handler.launch(app, _) >> EmptyWaitable.INSTANCE
}

def "tracks running jobs"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class Factories {

static appConfiguration() {
new AppConfiguration(
10,
3,
2,
"http://history",
null,
5432,
Expand Down

0 comments on commit c1a8908

Please sign in to comment.