Retry broadcast OOM with BHJ disabled within the same spark session #17528

Merged
merged 1 commit into prestodb:master from retry_broadcast_oom
Mar 28, 2022

Conversation

Contributor

@pgupta2 pgupta2 commented Mar 25, 2022

Presto on Spark uses temp storage for storing and distributing
broadcast tables. The Spark driver performs the necessary threshold
checks on the broadcast table, and if its size is over the threshold,
the query fails with a broadcast OOM. The only way to fix this
failure is to disable broadcast join in the query.

As we are able to detect a broadcast OOM on the driver confidently,
we can simply disable broadcast join, replan, and resubmit the
query for execution. This can happen within the same Spark
session itself and thus would not need any user intervention
to fix such failures.
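
The retry flow described above, as a minimal self-contained sketch. QueryFailure, RetryStrategy, and runWithRetry are hypothetical stand-ins for illustration (the PR's actual classes are PrestoSparkFailure, RetryExecutionStrategy, and PrestoSparkRunner); only the overall shape of the flow is taken from this description.

import java.util.Optional;

public final class BroadcastOomRetrySketch
{
    // Hypothetical stand-in for the PR's RetryExecutionStrategy.
    enum RetryStrategy
    {
        DISABLE_BROADCAST_JOIN
    }

    // Hypothetical stand-in for PrestoSparkFailure: a runtime exception that
    // carries an optional retry strategy derived from the failure details.
    static final class QueryFailure
            extends RuntimeException
    {
        private final Optional<RetryStrategy> retryStrategy;

        QueryFailure(String message, Optional<RetryStrategy> retryStrategy)
        {
            super(message);
            this.retryStrategy = retryStrategy;
        }

        Optional<RetryStrategy> getRetryStrategy()
        {
            return retryStrategy;
        }
    }

    // Run the query once; on a broadcast OOM flagged as retryable, replan with
    // broadcast join disabled and resubmit within the same Spark session.
    static String runWithRetry(String query)
    {
        try {
            return execute(query, true);
        }
        catch (QueryFailure failure) {
            if (failure.getRetryStrategy().isPresent()) {
                return execute(query, false);
            }
            throw failure;
        }
    }

    // Toy executor: fails with a broadcast OOM unless broadcast join is disabled.
    private static String execute(String query, boolean broadcastJoinEnabled)
    {
        if (broadcastJoinEnabled) {
            throw new QueryFailure(
                    "Query exceeded per-node broadcast memory limit",
                    Optional.of(RetryStrategy.DISABLE_BROADCAST_JOIN));
        }
        return "FINISHED: " + query;
    }

    public static void main(String[] args)
    {
        System.out.println(runWithRetry("SELECT 1"));
    }
}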

Test plan -

  • Unit Test
  • Tested production workload and verified that retry logic is working correctly.
== RELEASE NOTES ==

Spark Changes
* Add a new configuration property ``spark.retry-on-out-of-memory-broadcast-join-enabled`` to disable broadcast join on a broadcast OOM and retry the query within the same Spark session. This can be overridden by the ``spark_retry_on_out_of_memory_broadcast_join_enabled`` session property.

@pgupta2 pgupta2 marked this pull request as draft March 25, 2022 01:22
@pgupta2 pgupta2 force-pushed the retry_broadcast_oom branch from 6ff98a0 to f739e05 Compare March 25, 2022 06:07
@pgupta2 pgupta2 marked this pull request as ready for review March 25, 2022 15:33

import static java.util.Objects.requireNonNull;

public class PrestoSparkFailure
Contributor

nitpick - maybe rename to reflect that this is a runtime exception.

Contributor Author

This class is based on Failure.java in the presto-main module, and I have tried to keep things as similar as possible between the two. The reason I had to create this new class is to enable the flow of error information from presto-main to the presto-spark-launcher module where PrestoSparkRunner resides. PrestoSparkRunner is the entity that orchestrates the execution of a PoS query, and thus it needs access to the failure info to decide if it should retry or not.

@singcha singcha self-requested a review March 25, 2022 17:51

private IPrestoSparkQueryExecution createSparkQueryExecution(
Contributor

Do we need to split this into a function, or can this be pulled into execute()?

Contributor Author

You are right. This is not needed. I did some refactoring earlier, then changed it again and forgot to remove this.

@@ -88,7 +91,136 @@ public void run(
Optional<String> queryDataOutputLocation)
{
IPrestoSparkQueryExecutionFactory queryExecutionFactory = driverPrestoSparkService.getQueryExecutionFactory();
try {
Contributor

The number of arguments probably justifies moving them into a context structure?

Contributor Author

Makes sense. Let me make this change.

public class PrestoSparkFailure
        extends RuntimeException
{
    private final String type;
Contributor

Instead of strings, shall we think of enums? Both for the error code and the type?

Contributor Author

Refer to the comment above.

@@ -41,6 +41,7 @@
private int splitAssignmentBatchSize = 1_000_000;
private double memoryRevokingThreshold;
private double memoryRevokingTarget;
private boolean disableBroadcastJoinOnOOM;
Contributor

nit: ...OnOutOfMemory

We don't use abbreviations in the codebase.

Also, it would be good to use a positive name like enable instead of disable. Having a negation in the config usually makes it harder for users to understand. Looking through the PR, maybe retryOnOutOfMemoryBroadcastJoin.

Comment on lines 230 to 236
public boolean isDisableBroadcastJoinOnOOM()
{
    return disableBroadcastJoinOnOOM;
}

@Config("spark.disable-broadcast-join-on-oom")
public PrestoSparkConfig setDisableBroadcastJoinOnOOM(boolean disableBroadcastJoinOnOOM)
Contributor

same nits
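
A small sketch of what the renamed accessors could look like, assuming the positive name suggested above and the property name that appears in this PR's release notes. The class name and the Airlift @Config import are illustrative assumptions, not the merged code.

import com.facebook.airlift.configuration.Config;

public class RetryConfigSketch
{
    private boolean retryOnOutOfMemoryBroadcastJoinEnabled;

    public boolean isRetryOnOutOfMemoryBroadcastJoinEnabled()
    {
        return retryOnOutOfMemoryBroadcastJoinEnabled;
    }

    // Positive name, no abbreviation; property name taken from the release notes.
    @Config("spark.retry-on-out-of-memory-broadcast-join-enabled")
    public RetryConfigSketch setRetryOnOutOfMemoryBroadcastJoinEnabled(boolean retryOnOutOfMemoryBroadcastJoinEnabled)
    {
        this.retryOnOutOfMemoryBroadcastJoinEnabled = retryOnOutOfMemoryBroadcastJoinEnabled;
        return this;
    }
}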

prestoSparkSession.getCatalogSessionProperties(),
prestoSparkSession.getTraceToken());
}

private static Map<String, String> getFinalSystemProperties(Map<String, String> systemProperties, Optional<RetryExecutionStrategy> retryExecutionStrategy)
{
if (retryExecutionStrategy.isPresent()) {
Contributor

nit

if (!retryExecutionStrategy.isPresent()) {
    return systemProperties;
}
...

@@ -41,6 +41,7 @@
public static final String SPARK_SPLIT_ASSIGNMENT_BATCH_SIZE = "spark_split_assignment_batch_size";
public static final String SPARK_MEMORY_REVOKING_THRESHOLD = "spark_memory_revoking_threshold";
public static final String SPARK_MEMORY_REVOKING_TARGET = "spark_memory_revoking_target";
public static final String SPARK_DISABLE_BROADCAST_JOIN_ON_OOM = "spark_disable_broadcast_join_on_oom";
Contributor

same nit: spell out OOM

Comment on lines 283 to 285
if (executionFailureInfo == null) {
    return null;
}
Contributor

This scares me a bit, for both input and output. From the call sites, it seems there are no nulls? Actually we might checkArgument non-null here.

Contributor Author

toPrestoSparkFailure() is called recursively, and executionFailureInfo will be null where we explicitly throw an error from the Spark driver itself, as in the case of a broadcast join OOM detected on the driver side.

Contributor

OK, maybe add a comment for now. Let me read the logic deeper in the next iteration lol

@@ -908,6 +910,35 @@ public void testStorageBasedBroadcastJoinMaxThreshold()
"Query exceeded per-node total memory limit of 1MB \\[Compressed broadcast size: .*kB; Uncompressed broadcast size: .*MB\\]");
}

@Test
public void testDisableBroadcastJoinOnOOM()
Contributor

same nit OOM

Comment on lines 51 to 55
public boolean isBroadcastJoinOOM()
{
    return getErrorCode().equals("EXCEEDED_LOCAL_MEMORY_LIMIT")
            && getMessage().contains("Query exceeded per-node broadcast memory limit");
}
Contributor

hmmmm this looks a bit hacky...

Can we return a set of RetryExecutionStrategy instead? Check my other comment at toPrestoSparkFailure

Comment on lines 119 to 120
String disableBroadcastJoinOnOOM = sessionProperties.get("spark_disable_broadcast_join_on_oom");
if (disableBroadcastJoinOnOOM != null && disableBroadcastJoinOnOOM.equalsIgnoreCase("true") && failure.isBroadcastJoinOOM()) {
Contributor

Let's make these checks very generic. In the future, we may have more retry strategies. We should get the retry signal from PrestoSparkFailure directly instead of having string/session comparisons scattered around.
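
A hedged sketch of centralizing that signal: one helper (here called getRetryExecutionStrategy, the name used in the restructuring suggestion below) maps the session setting plus the failure's error code and message to an optional retry strategy, so the runner only checks the strategy carried by the failure. The property name, error code, and message fragment are the ones appearing in this thread; everything else is illustrative.

import java.util.Map;
import java.util.Optional;

final class RetrySignalSketch
{
    enum RetryExecutionStrategy
    {
        DISABLE_BROADCAST_JOIN
    }

    // Illustrative mapping from (session properties, failure details) to a retry strategy.
    static Optional<RetryExecutionStrategy> getRetryExecutionStrategy(
            Map<String, String> sessionProperties,
            String errorCodeName,
            String message)
    {
        boolean retryEnabled = Boolean.parseBoolean(
                sessionProperties.getOrDefault("spark_retry_on_out_of_memory_broadcast_join_enabled", "false"));
        boolean broadcastJoinOom = "EXCEEDED_LOCAL_MEMORY_LIMIT".equals(errorCodeName)
                && message != null
                && message.contains("Query exceeded per-node broadcast memory limit");
        if (retryEnabled && broadcastJoinOom) {
            return Optional.of(RetryExecutionStrategy.DISABLE_BROADCAST_JOIN);
        }
        return Optional.empty();
    }
}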

@pgupta2 pgupta2 force-pushed the retry_broadcast_oom branch from f739e05 to ff45eef Compare March 27, 2022 07:21
@pgupta2 pgupta2 requested review from highker and souravpal March 27, 2022 19:48
Comment on lines +37 to +59
if (executionFailureInfo == null) {
    return null;
}
Contributor

By reading the logic, only recursive calls can accept or return nulls, right? Shall we restructure the code like the following?

public static PrestoSparkFailure toPrestoSparkFailure(Session session, ExecutionFailureInfo executionFailureInfo)
{
    requireNonNull(executionFailureInfo, "executionFailureInfo is null");
    PrestoSparkFailure prestoSparkFailure = toPrestoSparkFailure(executionFailureInfo);
    checkState(prestoSparkFailure != null);

    Optional<RetryExecutionStrategy> retryExecutionStrategy = getRetryExecutionStrategy(session, executionFailureInfo.getErrorCode(), executionFailureInfo.getMessage());
    return new PrestoSparkFailure(
            prestoSparkFailure.getMessage(),
            prestoSparkFailure.getCause(),
            prestoSparkFailure.getType(),
            prestoSparkFailure.getErrorCode(),
            retryExecutionStrategy);
}

@Nullable
private static PrestoSparkFailure toPrestoSparkFailure(ExecutionFailureInfo executionFailureInfo)
{
    if (executionFailureInfo == null) {
        return null;
    }

    PrestoSparkFailure prestoSparkFailure = new PrestoSparkFailure(
            executionFailureInfo.getMessage(),
            toPrestoSparkFailure(executionFailureInfo.getCause()),
            executionFailureInfo.getType(),
            executionFailureInfo.getErrorCode() == null ? "" : executionFailureInfo.getErrorCode().getName(),
            Optional.empty());

    for (ExecutionFailureInfo suppressed : executionFailureInfo.getSuppressed()) {
        prestoSparkFailure.addSuppressed(requireNonNull(toPrestoSparkFailure(suppressed), "suppressed failure is null"));
    }
    ImmutableList.Builder<StackTraceElement> stackTraceBuilder = ImmutableList.builder();
    for (String stack : executionFailureInfo.getStack()) {
        stackTraceBuilder.add(toStackTraceElement(stack));
    }
    List<StackTraceElement> stackTrace = stackTraceBuilder.build();
    prestoSparkFailure.setStackTrace(stackTrace.toArray(new StackTraceElement[stackTrace.size()]));
    return prestoSparkFailure;
}

Comment on lines 74 to 78
private static boolean isBroadcastJoinOOM(ErrorCode errorCode, String message)
{
    return errorCode == EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode()
            && message.contains("Query exceeded per-node broadcast memory limit");
}
Contributor

Let's inline this function by introducing a new error code in StandardErrorCode: EXCEEDED_LOCAL_BROADCAST_JOIN_MEMORY_LIMIT so we don't compare message content

Contributor Author

@pgupta2 pgupta2 Mar 28, 2022

This is something that I discussed with the team as well a few weeks ago. The only concern I had was whether this will have any side effects on our accounting/alerting/monitoring, since we will be changing the existing error code for broadcast failures (or are you suggesting that we introduce a totally new error code for broadcast join OOMs in Presto on Spark only?). If you feel this will be safe, I am more than happy to do this in this PR itself.

Contributor

Yes, we introduce a new error code 'EXCEEDED_LOCAL_BROADCAST_JOIN_MEMORY_LIMIT' that is only thrown in the exceededLocalBroadcastMemoryLimit branch of ExceededMemoryLimitException.
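
For illustration only, a self-contained sketch of that direction. These are not the real presto-spi classes; the constant and the exceededLocalBroadcastMemoryLimit name come from this thread, everything else is a stand-in. With a dedicated code, the retry check compares error codes rather than message text.

final class BroadcastOomErrorCodeSketch
{
    // Stand-in for the relevant StandardErrorCode names; the second one is the
    // dedicated code proposed above.
    enum ErrorCodeName
    {
        EXCEEDED_LOCAL_MEMORY_LIMIT,
        EXCEEDED_LOCAL_BROADCAST_JOIN_MEMORY_LIMIT
    }

    // Stand-in for an execution failure that carries an error code.
    static final class Failure
            extends RuntimeException
    {
        private final ErrorCodeName errorCode;

        Failure(ErrorCodeName errorCode, String message)
        {
            super(message);
            this.errorCode = errorCode;
        }

        ErrorCodeName getErrorCode()
        {
            return errorCode;
        }
    }

    // Stand-in for the broadcast branch of ExceededMemoryLimitException: it now
    // throws the dedicated code instead of the generic local-memory-limit one.
    static Failure exceededLocalBroadcastMemoryLimit(String maxMemory, String additionalInfo)
    {
        return new Failure(
                ErrorCodeName.EXCEEDED_LOCAL_BROADCAST_JOIN_MEMORY_LIMIT,
                "Query exceeded per-node broadcast memory limit of " + maxMemory + " " + additionalInfo);
    }

    // The retry check no longer parses the message content.
    static boolean isBroadcastJoinOutOfMemory(Failure failure)
    {
        return failure.getErrorCode() == ErrorCodeName.EXCEEDED_LOCAL_BROADCAST_JOIN_MEMORY_LIMIT;
    }
}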

@pgupta2 pgupta2 force-pushed the retry_broadcast_oom branch from ff45eef to af8feb8 Compare March 28, 2022 16:24
@pgupta2 pgupta2 requested a review from highker March 28, 2022 16:25
@highker highker self-assigned this Mar 28, 2022
@highker highker merged commit e98b500 into prestodb:master Mar 28, 2022
@mshang816 mshang816 mentioned this pull request May 17, 2022