[GOBBLIN-2189] Implemented ContainerCompletion callback in DynamicScalingYarnService
Blazer-007 authored Feb 20, 2025
2 parents 8dde23a + db9cf7e commit eb60a2c
Showing 7 changed files with 517 additions and 285 deletions.
@@ -110,9 +110,13 @@ public void run() {
List<ScalingDirective> scalingDirectives = scalingDirectiveSource.getScalingDirectives();
if (CollectionUtils.isNotEmpty(scalingDirectives)) {
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
} else {
dynamicScalingYarnService.calcDeltasAndRequestContainers();
}
} catch (FileNotFoundException fnfe) {
log.warn("Failed to get scaling directives - " + fnfe.getMessage()); // important message, but no need for a stack trace
// An FNFE occurs when the scaling directives path has not been created yet, so just calc deltas & request containers if needed
log.debug("Scaling directives file not found (possibly not yet created). Falling back to delta calculation - " + fnfe.getMessage());
dynamicScalingYarnService.calcDeltasAndRequestContainers();
} catch (IOException e) {
log.error("Failed to get scaling directives", e);
} catch (Throwable t) {
@@ -17,17 +17,29 @@

package org.apache.gobblin.temporal.yarn;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
import org.apache.gobblin.temporal.dynamic.WorkerProfile;
@@ -42,18 +54,28 @@
*/
@Slf4j
public class DynamicScalingYarnService extends YarnService {
private static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile";
private static final int LAUNCH_CONTAINER_FAILED_EXIT_CODE = 1;
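// exit code 137 = 128 + SIGKILL(9); typically reported when the kernel OOM killer terminates the container process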
protected static final int GENERAL_OOM_EXIT_STATUS_CODE = 137;
protected static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2;
private static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB
private static final int EPSILON_MILLIS = 1;

/** this holds the current count of containers already requested for each worker profile */
private final WorkforceStaffing actualWorkforceStaffing;
/** this holds the current total workforce plan as per latest received scaling directives */
private final WorkforcePlan workforcePlan;
protected final Queue<ContainerId> removedContainerIds;
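/** generates unique suffixes for the worker profile names of replacement containers */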
private final AtomicLong profileNameSuffixGenerator;

public DynamicScalingYarnService(Config config, String applicationName, String applicationId,
YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception {
super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);

this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
this.workforcePlan = new WorkforcePlan(this.config, this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
this.removedContainerIds = new ConcurrentLinkedQueue<>();
this.profileNameSuffixGenerator = new AtomicLong();
}

@Override
@@ -62,6 +84,78 @@ protected synchronized void requestInitialContainers() {
requestNewContainersForStaffingDeltas(deltas);
}

/**
* Handle the completion of a container. A new container will be requested to replace the one
* that just exited depending on the exit status.
* <p>
* A container completes in either of the following conditions:
* <ol>
* <li> The container gets stopped by the ApplicationMaster. </li>
<li> An error occurs in the container and causes it to exit </li>
<li> The container gets preempted by the ResourceManager </li>
<li> The container gets killed for some reason, for example, if it runs over the allowed amount of virtual or physical memory </li>
* </ol>
* A replacement container is needed in all except the first case.
* </p>
*/
@Override
protected void handleContainerCompletion(ContainerStatus containerStatus) {
ContainerId completedContainerId = containerStatus.getContainerId();
ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId);

// Because callbacks are processed asynchronously, we might encounter situations where handleContainerCompletion()
// is called before onContainersAllocated(), resulting in the containerId missing from the containerMap.
// We use removedContainerIds to remember these containers and reconcile them against containerMap later,
// when calcDeltasAndRequestContainers() runs
if (completedContainerInfo == null) {
log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()",
completedContainerId);
this.removedContainerIds.add(completedContainerId);
return;
}

log.info("Container {} running profile {} completed with exit status {}",
completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus());

if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
log.info("Container {} running profile {} completed with diagnostics: {}",
completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics());
}

if (this.shutdownInProgress) {
log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId);
return;
}

WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();

switch (containerStatus.getExitStatus()) {
case(ContainerExitStatus.ABORTED):
handleAbortedContainer(completedContainerId, completedContainerInfo);
break;
case(ContainerExitStatus.PREEMPTED):
log.info("Container {} for profile {} preempted, starting to launching a replacement container",
completedContainerId, completedContainerInfo.getWorkerProfileName());
requestContainersForWorkerProfile(workerProfile, 1);
break;
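// OOM-related exit statuses: request a replacement container, possibly with more memory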
case(GENERAL_OOM_EXIT_STATUS_CODE):
case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
handleContainerExitedWithOOM(completedContainerId, completedContainerInfo);
break;
case(LAUNCH_CONTAINER_FAILED_EXIT_CODE):
log.info("Exit status 1.CompletedContainerInfo = {}", completedContainerInfo);
break;
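// expected completions: no replacement container needed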
case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION:
case ContainerExitStatus.SUCCESS:
break;
default:
log.warn("Container {} exited with unhandled status code {}. ContainerInfo: {}",
completedContainerId, containerStatus.getExitStatus(), completedContainerInfo);
break;
}
}

/**
* Revises the workforce plan and requests new containers based on the given scaling directives.
*
@@ -72,28 +166,101 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<Scaling
return;
}
this.workforcePlan.reviseWhenNewer(scalingDirectives);
calcDeltasAndRequestContainers();
}

public synchronized void calcDeltasAndRequestContainers() {
// Correct the actualWorkforceStaffing in case handleContainerCompletion() was called before onContainersAllocated()
Iterator<ContainerId> iterator = removedContainerIds.iterator();
while (iterator.hasNext()) {
ContainerId containerId = iterator.next();
ContainerInfo containerInfo = this.containerMap.remove(containerId);
if (containerInfo != null) {
WorkerProfile workerProfile = containerInfo.getWorkerProfile();
int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
if (currNumContainers > 0) {
this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1,
System.currentTimeMillis());
}
iterator.remove();
}
}
StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
requestNewContainersForStaffingDeltas(deltas);
}

private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
deltas.getPerProfileDeltas().forEach(profileDelta -> {
if (profileDelta.getDelta() > 0) { // scale up!
WorkerProfile workerProfile = profileDelta.getProfile();
String profileName = workerProfile.getName();
int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
int delta = profileDelta.getDelta();
WorkerProfile workerProfile = profileDelta.getProfile();
String profileName = workerProfile.getName();
int delta = profileDelta.getDelta();
int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
if (delta > 0) { // scale up!
log.info("Requesting {} new containers for profile {} having currently {} containers", delta,
WorkforceProfiles.renderName(profileName), currNumContainers);
requestContainersForWorkerProfile(workerProfile, delta);
// update our staffing after requesting new containers
this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis());
} else if (profileDelta.getDelta() < 0) { // scale down!
// TODO: Decide how to handle negative deltas
log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ",
profileDelta.getProfile().getName(), profileDelta.getDelta());
} else if (delta < 0) { // scale down!
log.info("Releasing {} containers for profile {} having currently {} containers", -delta,
WorkforceProfiles.renderName(profileName), currNumContainers);
releaseContainersForWorkerProfile(profileName, delta);
// update our staffing after releasing containers
int numContainersAfterRelease = Math.max(currNumContainers + delta, 0);
this.actualWorkforceStaffing.reviseStaffing(profileName, numContainersAfterRelease, System.currentTimeMillis());
} // else, already at staffing plan (or at least have requested, so in-progress)
});
}

private void handleAbortedContainer(ContainerId completedContainerId, ContainerInfo completedContainerInfo) {
// Case 1 : Container release requested while scaling down
if (this.releasedContainerCache.getIfPresent(completedContainerId) != null) {
log.info("Container {} was released while downscaling for profile {}", completedContainerId, completedContainerInfo.getWorkerProfileName());
this.releasedContainerCache.invalidate(completedContainerId);
return;
}

// Case 2 : Container release was not requested, we need to request a replacement container
log.info("Container {} aborted for profile {}, starting to launch a replacement container", completedContainerId, completedContainerInfo.getWorkerProfileName());
requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1);
}

private synchronized void handleContainerExitedWithOOM(ContainerId completedContainerId, ContainerInfo completedContainerInfo) {
log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container",
completedContainerId, completedContainerInfo.getWorkerProfileName());

List<ScalingDirective> scalingDirectives = new ArrayList<>();

WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
long currTimeMillis = System.currentTimeMillis();
// Update the current staffing to reflect the container that exited with OOM
int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
if (currNumContainers > 0) {
this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, currTimeMillis);
// Add a scaling directive so that the workforcePlan has an up-to-date setPoint for the workerProfile;
// otherwise extra containers will be requested when calculating deltas
scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, currTimeMillis));
}

// Request a replacement container
int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
if (currContainerMemoryMbs >= MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
log.warn("Container {} already had max allowed memory {} MBs. Not requesting a replacement container.",
completedContainerId, currContainerMemoryMbs);
return;
}
int newContainerMemoryMbs = Math.min(currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER,
MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
Optional<ProfileDerivation> optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(),
new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + ""))
));
scalingDirectives.add(new ScalingDirective(
DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX + "-" + profileNameSuffixGenerator.getAndIncrement(),
1,
currTimeMillis + EPSILON_MILLIS, // Each scaling directive should have a newer timestamp than the previous one
optProfileDerivation
));
reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
}

}
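
For orientation, here is a minimal sketch (not part of this commit) of how a caller might construct scaling directives and feed them to the service, using the constructors exercised above; WorkforceProfiles.BASELINE_NAME as the baseline-profile name, the "highMemWorker" profile, and its memory value are assumptions for illustration only.

// Illustrative sketch; reuses the imports shown in the hunk above.
// Assumes WorkforceProfiles.BASELINE_NAME names the baseline profile; "highMemWorker" and 8192 MBs are hypothetical.
List<ScalingDirective> directives = new ArrayList<>();
long now = System.currentTimeMillis();
// raise the baseline profile to 4 containers
directives.add(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 4, now));
// derive a single-container high-memory profile from the baseline (newer timestamp, per the epsilon convention above)
directives.add(new ScalingDirective("highMemWorker", 1, now + 1,
    Optional.of(new ProfileDerivation(WorkforceProfiles.BASELINE_NAME,
        new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, "8192"))))));
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(directives);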
