Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect Spark handles to reduce memory usage #32

Merged
merged 2 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public ApplicationStatusHandler(ApplicationStorage applicationStorage, Backend b
this.logService = logService;
}


public void processApplicationStarting(Application application) {
applicationStorage.saveApplication(ApplicationBuilder.builder(application)
.setState(ApplicationState.STARTING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@
import com.exacaster.lighter.application.ApplicationState;
import com.exacaster.lighter.application.ApplicationStatusHandler;
import com.exacaster.lighter.backend.Backend;
import com.exacaster.lighter.concurrency.Waitable;
import com.exacaster.lighter.configuration.AppConfiguration;
import com.exacaster.lighter.spark.SparkApp;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Singleton;
import java.util.function.Consumer;

import java.util.stream.Collectors;
import net.javacrumbs.shedlock.micronaut.SchedulerLock;
import org.slf4j.Logger;

@Singleton
public class BatchHandler {

private static final Logger LOG = getLogger(BatchHandler.class);
private static final int MAX_SLOTS_PER_ITERATION = 10;

private final Backend backend;
private final BatchService batchService;
Expand All @@ -34,23 +37,30 @@ public BatchHandler(Backend backend, BatchService batchService, AppConfiguration
this.statusTracker = statusTracker;
}

public void launch(Application application, Consumer<Throwable> errorHandler) {
public Waitable launch(Application application, Consumer<Throwable> errorHandler) {
var app = new SparkApp(application.getSubmitParams(), errorHandler);
app.launch(backend.getSubmitConfiguration(application));
return app.launch(backend.getSubmitConfiguration(application));
}

@SchedulerLock(name = "processScheduledBatches")
@Scheduled(fixedRate = "1m")
public void processScheduledBatches() {
public void processScheduledBatches() throws InterruptedException {
assertLocked();
var emptySlots = countEmptySlots();
LOG.info("Processing scheduled batches, found empty slots: {}", emptySlots);
batchService.fetchByState(ApplicationState.NOT_STARTED, emptySlots)
.forEach(batch -> {
var slotsToTake = Math.min(MAX_SLOTS_PER_ITERATION, emptySlots);
LOG.info("Processing scheduled batches, found empty slots: {}, using {}", emptySlots, slotsToTake);
var waitables = batchService.fetchByState(ApplicationState.NOT_STARTED, slotsToTake)
.stream()
.map(batch -> {
LOG.info("Launching {}", batch);
statusTracker.processApplicationStarting(batch);
launch(batch, error -> statusTracker.processApplicationError(batch, error));
});
return launch(batch, error -> statusTracker.processApplicationError(batch, error));
})
.collect(Collectors.toList());
LOG.info("Waiting launches to complete");
for (var waitable : waitables) {
waitable.waitCompletion();
}
}

private Integer countEmptySlots() {
Expand All @@ -59,7 +69,7 @@ private Integer countEmptySlots() {

@SchedulerLock(name = "trackRunning")
@Scheduled(fixedRate = "1m")
public void trackRunning() {
public void trackRunning() throws InterruptedException {
assertLocked();
var completedCount = batchService.fetchRunning().stream()
.map(statusTracker::processApplicationRunning)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.exacaster.lighter.application.ApplicationStatusHandler;
import com.exacaster.lighter.application.sessions.processors.StatementHandler;
import com.exacaster.lighter.backend.Backend;
import com.exacaster.lighter.concurrency.Waitable;
import com.exacaster.lighter.configuration.AppConfiguration;
import com.exacaster.lighter.configuration.AppConfiguration.SessionConfiguration;
import com.exacaster.lighter.spark.SparkApp;
Expand Down Expand Up @@ -45,9 +46,9 @@ public SessionHandler(SessionService sessionService,
this.sessionConfiguration = appConfiguration.getSessionConfiguration();
}

public void launch(Application application, Consumer<Throwable> errorHandler) {
public Waitable launch(Application application, Consumer<Throwable> errorHandler) {
var app = new SparkApp(application.getSubmitParams(), errorHandler);
app.launch(backend.getSubmitConfiguration(application));
return app.launch(backend.getSubmitConfiguration(application));
}

@SchedulerLock(name = "keepPermanentSession")
Expand All @@ -66,15 +67,21 @@ public void keepPermanentSessions() {

@SchedulerLock(name = "processScheduledSessions")
@Scheduled(fixedRate = "1m")
public void processScheduledSessions() {
public void processScheduledSessions() throws InterruptedException {
assertLocked();
sessionService.fetchByState(ApplicationState.NOT_STARTED, 10).forEach(this::launchSession);
var waitables = sessionService.fetchByState(ApplicationState.NOT_STARTED, 10).stream()
.map(this::launchSession)
.collect(Collectors.toList());

for (var waitable : waitables) {
waitable.waitCompletion();
}
}

private void launchSession(Application session) {
private Waitable launchSession(Application session) {
LOG.info("Launching {}", session);
statusTracker.processApplicationStarting(session);
launch(session, error -> statusTracker.processApplicationError(session, error));
return launch(session, error -> statusTracker.processApplicationError(session, error));
}

@SchedulerLock(name = "trackRunningSessions")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.exacaster.lighter.concurrency;

public enum EmptyWaitable implements Waitable {
INSTANCE {
@Override
public void waitCompletion() throws InterruptedException {
// nothing to wait
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.exacaster.lighter.concurrency;

public interface Waitable {

void waitCompletion() throws InterruptedException;
}
29 changes: 8 additions & 21 deletions server/src/main/java/com/exacaster/lighter/spark/SparkApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,15 @@
import static org.apache.spark.launcher.SparkLauncher.DRIVER_MEMORY;
import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_CORES;
import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_MEMORY;
import static org.slf4j.LoggerFactory.getLogger;

import com.exacaster.lighter.concurrency.EmptyWaitable;
import com.exacaster.lighter.concurrency.Waitable;
import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkAppHandle.Listener;
import org.apache.spark.launcher.SparkLauncher;
import org.slf4j.Logger;

public class SparkApp {

private static final Logger LOG = getLogger(SparkApp.class);

private final SubmitParams submitParams;
private final Consumer<Throwable> errorHandler;

Expand All @@ -25,7 +20,7 @@ public SparkApp(SubmitParams submitParams, Consumer<Throwable> errorHandler) {
this.errorHandler = errorHandler;
}

public void launch(Map<String, String> extraConfiguration) {
public Waitable launch(Map<String, String> extraConfiguration) {
try {
var launcher = new SparkLauncher()
.setAppName(submitParams.getName())
Expand All @@ -48,22 +43,14 @@ public void launch(Map<String, String> extraConfiguration) {
.setConf(EXECUTOR_CORES, String.valueOf(submitParams.getExecutorCores()))
.setConf(EXECUTOR_MEMORY, submitParams.getExecutorMemory())
.setConf("spark.executor.instances", String.valueOf(submitParams.getNumExecutors()));
launcher.startApplication(new Listener() {
@Override
public void stateChanged(SparkAppHandle handle) {
LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), handle.getState());
handle.getError().ifPresent(errorHandler);
}

@Override
public void infoChanged(SparkAppHandle handle) {
// TODO: ?
}
});

var listener = new SparkListener(errorHandler);
launcher.startApplication(listener);
return listener;
} catch (IOException | IllegalArgumentException e) {
this.errorHandler.accept(e);
}

return EmptyWaitable.INSTANCE;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.exacaster.lighter.spark;

import static org.slf4j.LoggerFactory.getLogger;

import com.exacaster.lighter.concurrency.Waitable;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkAppHandle.Listener;
import org.slf4j.Logger;

public class SparkListener implements Listener, Waitable {

private static final Logger LOG = getLogger(SparkListener.class);
private final Consumer<Throwable> errorHandler;
private final CountDownLatch latch;

public SparkListener(Consumer<Throwable> errorHandler) {
this.errorHandler = errorHandler;
this.latch = new CountDownLatch(1);
}

@Override
public void stateChanged(SparkAppHandle handle) {
LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), handle.getState());
handle.getError().ifPresent(errorHandler);
if (handle.getState() != null && handle.getState().isFinal()) {
handle.disconnect();
latch.countDown();
}
}

@Override
public void infoChanged(SparkAppHandle handle) {
// TODO: ?
}

@Override
public void waitCompletion() throws InterruptedException {
latch.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.exacaster.lighter.application.batch
import com.exacaster.lighter.application.ApplicationState
import com.exacaster.lighter.application.ApplicationStatusHandler
import com.exacaster.lighter.backend.Backend
import com.exacaster.lighter.concurrency.EmptyWaitable
import com.exacaster.lighter.configuration.AppConfiguration
import net.javacrumbs.shedlock.core.LockAssert
import spock.lang.Specification
Expand Down Expand Up @@ -37,7 +38,7 @@ class BatchHandlerTest extends Specification {
then:
_ * service.fetchRunning() >> []
1 * service.fetchByState(ApplicationState.NOT_STARTED, _) >> [app]
1 * handler.launch(app, _) >> { }
1 * handler.launch(app, _) >> EmptyWaitable.INSTANCE
}

def "does not trigger when there are no empty slots"() {
Expand Down