
[Optimization] ValidateJoinRequest serialization takes lot of memory in ClusterManager with large cluster state #5100

Closed
shwetathareja opened this issue Nov 7, 2022 · 22 comments
Labels: distributed framework, enhancement, Performance

shwetathareja (Member) commented Nov 7, 2022

Describe the bug
ValidateJoinRequest serializes the full cluster state and sends it to the node joining the cluster. If the cluster state runs into hundreds of MBs due to a large number of indices, this serialization takes up a lot of memory. It gets worse when multiple nodes are joining at the same time, and sometimes ends up causing an OOM error on the ClusterManager.

public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
    transportService.sendRequest(
        node,
        VALIDATE_JOIN_ACTION_NAME,
        new ValidateJoinRequest(state),

Expected behavior

  1. When the ClusterManager normally publishes the cluster state, it compresses the state, which is not the case here. It should compress the state here as well:
    private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
        final BytesStreamOutput bStream = new BytesStreamOutput();
        try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
            stream.setVersion(nodeVersion);
            stream.writeBoolean(true);
            clusterState.writeTo(stream);
        }
        return bStream.bytes();
    }
  2. Even with compression, the state would still be serialized once per node join. The index metadata doesn't change with every node join; it is the RoutingNodes and DiscoveryNodes which change. When multiple nodes are joining at the same time, the Metadata can be cached and reused based on its version, as sketched below.
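A minimal sketch of the metadata-reuse idea from point 2 (not an actual implementation), assuming the same OpenSearch internal classes used in the snippets above; the cache field and method names are hypothetical, and a real version would also need to key on the receiver's wire version since serialization is version-dependent:

    private final ConcurrentHashMap<Long, BytesReference> serializedMetadataCache = new ConcurrentHashMap<>();

    // Reuse the compressed, serialized Metadata across concurrent node joins, keyed by the
    // metadata version; DiscoveryNodes/RoutingNodes would still be serialized fresh per join.
    private BytesReference serializedMetadata(Metadata metadata, Version nodeVersion) {
        return serializedMetadataCache.computeIfAbsent(metadata.version(), version -> {
            try {
                final BytesStreamOutput bStream = new BytesStreamOutput();
                try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
                    stream.setVersion(nodeVersion);
                    metadata.writeTo(stream);
                }
                return bStream.bytes();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }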
shwetathareja added the bug and untriaged labels on Nov 7, 2022
anasalkouz added the Performance and enhancement labels and removed the untriaged and bug labels on Nov 8, 2022
amkhar (Contributor) commented Apr 17, 2023

I have tried the code changes for compression and decompression, using an approach similar to what is used in PublicationTransportHandler.

For compression:

public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
    BytesReference bytes = serializeFullClusterState(state, node.getVersion());
    final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion());
    transportService.sendRequest(
        node,
        VALIDATE_JOIN_ACTION_NAME,
        request,
        new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC)
    );
}

Code for serialization: serializeFullClusterState

Code for decompression:

private void handleValidateJoinRequest(Supplier<ClusterState> currentStateSupplier,
                                           Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
                                           BytesTransportRequest request) throws IOException {
        final Compressor compressor = CompressorFactory.compressor(request.bytes());
        StreamInput in = request.bytes().streamInput();
        final ClusterState incomingState;
        try {
            if (compressor != null) {
                in = new InputStreamStreamInput(compressor.threadLocalInputStream(in));
            }
            in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
            in.setVersion(request.version());
            // If true we received full cluster state - otherwise diffs
            if (in.readBoolean()) {
                // Close early to release resources used by the de-compression as early as possible
                try (StreamInput input = in) {
                    incomingState = ClusterState.readFrom(input, transportService.getLocalNode());
                    long handlerTime = System.nanoTime();
                    logger.info("handler time {}", handlerTime);
                } catch (Exception e) {
                    logger.warn("unexpected error while deserializing an incoming cluster state", e);
                    throw e;
                }

                final ClusterState localState = currentStateSupplier.get();
                if (localState.metadata().clusterUUIDCommitted()
                    && localState.metadata().clusterUUID().equals(incomingState.metadata().clusterUUID()) == false) {
                    throw new CoordinationStateRejectedException(
                        "join validation on cluster state"
                            + " with a different cluster uuid "
                            + incomingState.metadata().clusterUUID()
                            + " than local cluster uuid "
                            + localState.metadata().clusterUUID()
                            + ", rejecting"
                    );
                }
                joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), incomingState));
            }
        } finally {
            IOUtils.close(in);
        }
    }

Setup: I created 2 clusters [100 data nodes + 3 dedicated cluster manager nodes] with 5G heap, created 4K indices with huge mappings (file attached), and killed the OS process on all the data nodes at once.
I used this code in the JoinHelper class for the validate-join flow. I was monitoring the JVM memory pressure of the ClusterManager node and was not able to see any significant gain from it.
I also measured the total turnaround time taken by compression + transport + decompression; overall it is slightly lower than or the same as the current code.

For JVM - though the memory pressure sometimes (intermittently, 1-2 times in 10 runs) goes beyond 90 with the current code, with the updated code it always stays within limits.

Attaching heap dump screenshots of GC roots and dominators so we can drill down further into the correct way of reusing memory or implementing some form of caching in the code.
current-code-node-drop-dominators-1404
current-code-node-drop-GC-roots-1404

Same screenshots for newly added code.

new-code-node-drop-dominators-1704
new-code-node-drop-GC-roots-1704

As the timing of taking the heap dump also plays a role and node joins finish very quickly, the exact totals may not be comparable, but the per-object sizes are.

shwetathareja (Member Author) commented:

A couple of thoughts:

  1. This ValidateJoinRequest is best effort, so that the joining node can run the compatibility validations and, in case of failure, doesn't add an unnecessary node-join task to the cluster manager queue and cause overhead.
  2. I was thinking about making this request optional using a setting, but then an incompatible node could stress the cluster manager by sending frequent node-join requests.

Considering this check is best effort anyway, it might be OK to cache the compressed object of the whole cluster state for some duration, which could be a configurable TTL with a max threshold (see the sketch after the snippet below). During JoinTaskExecutor, the leader would run the compatibility check again (which is always on the latest state):

ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
ensureIndexCompatibility(node.getVersion(), currentState.getMetadata());
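A minimal sketch of that TTL idea, assuming the serializeFullClusterState snippet above; the field names and the 30s default below are hypothetical placeholders for a configurable setting:

    private volatile BytesReference cachedCompressedState;                             // compressed, serialized full cluster state
    private volatile long cachedAtNanos;                                               // when the cache entry was built
    private volatile TimeValue validateJoinCacheTtl = TimeValue.timeValueSeconds(30);  // would come from a cluster setting

    // Reuse the cached bytes while they are younger than the TTL; otherwise re-serialize and re-compress.
    private synchronized BytesReference cachedOrSerialize(ClusterState state, Version nodeVersion) throws IOException {
        final long now = System.nanoTime();
        if (cachedCompressedState == null || now - cachedAtNanos > validateJoinCacheTtl.nanos()) {
            cachedCompressedState = serializeFullClusterState(state, nodeVersion);
            cachedAtNanos = now;
        }
        return cachedCompressedState;
    }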

amkhar (Contributor) commented Apr 26, 2023

@shwetathareja

Thanks for pointing this out. Yes, only two validations, ensureNodesCompatibility and ensureIndexCompatibility, happen today at the node level, and they are re-executed in the cluster manager flow during node join (after the validation on the node is completed).

True, making this call optional is not a good idea: it is not just that an incompatible node can put load on the cluster manager, these validations are also exposed as an interface so plugins can extend them. Considering this has been there in the past, changing this behaviour can impact our clients.

The other thought initially put up on this issue was to break the data into two parts and only cache the metadata, since the routing data keeps changing. This is not a good approach: in the future other types of data may also be required, and if we only ever send cached metadata, the other side (data node) of the implementation would suffer. Let's also understand the flow a little more before discussing the caching solution and its trade-offs.

Today we wrap the ClusterState object in a ValidateJoinRequest, which means we call the writeTo(stream) method of the ClusterState class at the transport layer every time a node joins. The whole cluster state is written again and again to different streams, which means invoking the writeTo method of every part of the cluster state object (metadata, routingTable, each with its own set of writeTo methods). This requires more buffers while writing to the stream. Ultimately Netty uses the resulting BytesReference to create the actual ByteBuf; only at that point do we allocate heap memory for all the byte[] slices.

Now, with caching of the [compressed + serialized] cluster state as bytes, we avoid all of those writeTo calls on every node join. The transport layer also uses a separate mechanism, writeThin (instead of writeTo of ClusterState), for writing to the final channel: we just write the length of the BytesReference buffer rather than re-serializing onto the stream. Eventually the CompositeBytesReference picks this up, and while iterating over the byte[] slices it joins the two BytesReferences [one created in JoinHelper and the other initialized in OutboundMessage] to send the already cached data over the TCP channel.
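Putting the pieces together, a rough sketch (not the final PR code) of how sendValidateJoinRequest could serve joins from such cached bytes; cachedOrSerialize is the hypothetical helper sketched earlier, and the rest reuses the classes from the compression snippet above:

    public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
        final BytesReference bytes;
        try {
            bytes = cachedOrSerialize(state, node.getVersion()); // cached [compressed + serialized] cluster state
        } catch (IOException e) {
            listener.onFailure(e);
            return;
        }
        transportService.sendRequest(
            node,
            VALIDATE_JOIN_ACTION_NAME,
            new BytesTransportRequest(bytes, node.getVersion()), // writeThin: only the length is written per request
            new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC)
        );
    }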

Trade-off: we would not be providing the latest cluster state to the node/plugins for validating the pre-conditions of a join. So a node join could happen without knowing that a wrong index metadata was added to the cluster state.

Action items:

  • Understand the impact using the async profiler instead of heap dumps, as timing the dump is nearly impossible.
  • Understand the impact with and without caching, to see the effects of compression and caching separately.
  • Run a load test with the same setup but a bigger cluster state, where fields other than index metadata are also large.

shwetathareja (Member Author) commented:

Thanks @amkhar for the deep dive on caching the serialized + compressed cluster state. It will essentially write the same cached byte array over the transport channel for every ValidateJoinRequest. I think stale cluster state is a reasonable trade-off to make, and keeping the cache duration as a configurable setting would give users more flexibility over whether they want to keep this cache and for how long.

amkhar (Contributor) commented May 2, 2023

Updates from the previous action items:

I used the async profiler to get the allocation profile while 100 nodes drop off and join back the cluster. It shows improvement if we use caching and compression together; the major improvement comes from the caching part. Compression alone shows a small improvement, around 20% fewer memory allocations in the async profiler allocation dump. The allocations I'm talking about here are specifically for the sendValidateJoinRequest flow. With caching, overall allocations for this specific flow are reduced by 70%.
Attaching the async profiler dumps (change .txt to .html, then open in Chrome).

One thing though: the overall number of samples in the async profiler flame graph is also reduced. I need to check whether reducing the async profiler's sampling frequency shows a similar number of samples, and will also dig deeper into how samples are counted.

Old code allocations
old-code-validate-join-alloc-25-1

New code with compression only
compression-only-code-alloc-drops-state-27-1

New code with caching + compression allocations
new-caching-code-alloc-after--snapshot-cleanup-26-2

Async profiler dump files:

old-code-25-1-drops-state-alloc.txt
compression-code-drops-state-alloc-27-1.txt
cached-compression-code-drops-state-alloc-26-c2.txt

andrross (Member) commented:

I think stale cluster state is a reasonable trade-off to make, and keeping the cache duration as a configurable setting would give users more flexibility over whether they want to keep this cache and for how long.

@shwetathareja Apologies for the delay, I was just looking at the PR. I admit I don't know this code super well, but is there any kind of version number that goes along with the cluster state, so that we could memoize the latest version of the cluster state and re-send it if unchanged, and otherwise recompute it? That could give all the benefits shown here without the trade-off of sometimes sending stale data.

amkhar (Contributor) commented May 18, 2023

@andrross Thanks for following up. The particular scenario we are trying to improve here is when a lot of nodes join quickly and shoot up the cluster manager's memory. Even with 100 nodes, all the joins finish within a minute, and they don't arrive scattered over that minute; mostly 50-60 joins finish within a second. So the call rate for sendValidateJoinRequest is on the order of milliseconds. With every successful join, routing is updated in the cluster state and the state version changes. If we don't apply caching, we end up creating a new stream and writing the full cluster state again and again in quick succession, which is the same scenario as before.

The only benefit we get without caching is compression of the cluster state. That also shows a benefit, but a minimal one (considering only huge index metadata) in comparison to caching. The overall time taken with compression is also similar to before.

To gain the significant benefits, we chose to go ahead with the trade-off.

shwetathareja (Member Author) commented May 18, 2023

@andrross: Thanks for checking the PR. Yes, there is a cluster state version which is bumped with every change. As you are suggesting, cluster-state-version based caching can help to some extent instead of blindly serializing every time. During a burst of node joins, though, the cluster state is also changing frequently in the background; that's what @amkhar is pointing out.
Maybe we can have version-based caching as the default (instead of no caching at all, which is the default in this PR), and if the user is fine with stale data and wants a further lower memory footprint, they can choose to enable time-based caching as well. @amkhar thoughts?
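A minimal sketch of that version-keyed memoization, again assuming serializeFullClusterState from the snippets above; the field names are hypothetical, and a real implementation would also need to account for the receiver's wire version:

    private volatile long cachedClusterStateVersion = -1L;
    private volatile BytesReference cachedBytesForVersion;

    // Reuse the serialized bytes while the cluster state version is unchanged; recompute on any version bump.
    private synchronized BytesReference serializedStateForVersion(ClusterState state, Version nodeVersion) throws IOException {
        if (cachedBytesForVersion == null || cachedClusterStateVersion != state.version()) {
            cachedBytesForVersion = serializeFullClusterState(state, nodeVersion);
            cachedClusterStateVersion = state.version();
        }
        return cachedBytesForVersion;
    }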

amkhar (Contributor) commented May 18, 2023

@andrross @shwetathareja

Currently I've set the default to 30s time-based caching. It is a breaking change for customers (though they can still set it to 0 to avoid any caching), but this is the optimal number for gaining the benefits. Let me explain that with an example.

Today, as we all know, sendValidateJoin is a best-effort approach to run the validations on the data node, to avoid loading the cluster manager with unnecessary failed validations before the cluster update task is submitted for node-join. The cluster manager also runs these same validations again in JoinTaskExecutor.execute as part of the cluster state update task (via TaskBatcher). It is only helpful if some plugin has some validations and they must be run on data nodes.

The ClusterState.writeTo method is the one which impacts memory, as we write the full cluster state to a new stream every time we send it. So let's say the cluster state version is 5, a validate-join request happens at time t1, and the node-join finishes on the cluster manager at time t1' = t1 + delta. Remember this delta is going to be very small, perhaps 100ms or 500ms (or 1s in the worst case), and the rest of the cluster state update flow once validate-join returns successfully is always in milliseconds. So we can provide caching on the cluster state version; in that case, all the requests arriving during this delta period will be served from the cache.

In my experiments I've seen that the smaller the cache refresh interval, the smaller the gain. I tried a 10s interval and the async profiler allocations were not decreased as much as they were with the 30s cache. So there will be some benefit from a cluster-state-version based cache, but only a theoretical one. In practice it will be non-deterministic: if a lot of validate-join requests land in the same interval before the cluster state update flow completes, we'll utilize the cache; otherwise, once the cluster manager has updated the cluster state, the version changes and the cache is invalid (we recompute). Mostly the second scenario will happen.

What we can do is keep the default as 0 in 2.x to make sure no customers are impacted, and set it to 30s in 3.0, adding it as a breaking change. We can still provide cluster-state-version based caching when the time-based cache is set to 0, to capture whatever small theoretical benefit is possible there.
Also, I'll go back and run the benchmarks again with state-version based caching to quantify that benefit. [AI on @amkhar]

Note: the cluster state being stale doesn't mean there will be correctness issues; those validations are still going to run before the node-join task completes. It's just that the data node itself won't catch it, and only in rare scenarios. And customers on 3.0 can still set it to 0 to avoid any caching if they want.

@andrross let me know your thoughts.

@shwetathareja - feel free to add your thoughts around adding another setting to make it optional as you mentioned in #5100 (comment)

amkhar (Contributor) commented May 18, 2023

Updated #5100 (comment) to include the compression-only profile, which shows the impact without caching.

andrross (Member) commented:

It is only helpful if some plugin has some validations and they must be run on data nodes.

@amkhar This seems a bit concerning if there is logic that must be run on data nodes. With caching isn't it possible that necessary validations could be skipped if the relevant change in cluster state is missed due to serving stale data out of the cache?

if the user is fine with stale data and wants a further lower memory footprint, they can choose to enable time-based caching as well

@shwetathareja I'm concerned with such a low-level knob here. I think we're making the best-effort behavior of sendValidateJoin slightly less good, depending on the time value chosen. Honestly I would feel better with a switch to completely turn off this best-effort validation (is that terrible idea?) versus the caching behavior. I don't like the non-determinism of caching. Thirty seconds of caching might work great the vast majority of the time but it might also introduce difficult to reproduce edge cases and non-deterministic behavior.

amkhar (Contributor) commented May 22, 2023

there is logic that must be run on data nodes. With caching isn't it possible that necessary validations could be skipped

@andrross Those important checks are still going to run again on the Cluster Manager node even if the node returns success, and the overall join flow will be stopped by that re-check on the Cluster Manager node. The only thing which can be missed is discovery plugin level validation, and there aren't any today (#7321 (comment)).

Honestly I would feel better with a switch to completely turn off this best-effort validation (is that terrible idea?)

@andrross
If any of the existing discovery plugins (ec2, azure, or gce) add new validators in the future, those won't run at all.

I don't like the non-determinism of caching. Thirty seconds of caching might work great the vast majority of the time but it might also introduce difficult to reproduce edge cases and non-deterministic behavior.

Let me run the experiments for cluster-state-version based caching, where we invalidate the cache whenever the cluster state is updated. I'll share the numbers soon.

amkhar (Contributor) commented May 22, 2023

Results with cluster-state-version based caching.
NOTE: The previous number for 30s interval-based caching has changed, as we've started using the concurrent Cache instead of a plain HashMap (which I was using before raising this PR). So, attaching those numbers as well.

Default - cluster state version based caching, 1.19%
cluster-state-based-cache-drops-state-alloc-22may-8

30s interval based caching, 0.16%
30s-time-based-cache-drops-state-alloc-22may-12

10s interval based caching, 0.55%
10s-time-based-cache-drops-state-alloc-22may-14

2s interval based caching, 1.37%
2s-time-based-cache-drops-state-alloc-22may-16

So it looks like cluster-state-version based caching provides gains similar to roughly 3s interval-based caching (considering the difference between 1.37 and 1.19).

Compared to the current prod code, the state-version based cache is still a good improvement, so we should be good to go with that as the default. According to the previous runs, the prod code's allocations were 13.67% without any caching, and with cluster-state-version based caching it's 1.19%. (Screenshot of flamegraph - #5100 (comment))

Attaching the actual profile files as well.

default-version-based-cache-code-drops-state-alloc-22-m-8.txt

cached-code-drops-state-alloc-22-m-12-30s.txt

cached-code-drops-state-alloc-22-m-14-10s.txt

cached-code-drops-state-alloc-22-m-16-2s.txt

amkhar (Contributor) commented May 22, 2023

@andrross @shwetathareja @Bukhtawar

One of my assumptions, that the cluster state would get updated within a few milliseconds, was wrong; it may sometimes take 3-4 seconds after the sendValidateJoin request is sent to the data node. So we're seeing gains similar to caching with an interval of about that length. I think, as 1.19% is anyway a good improvement compared to the 13.67% before (current prod), we can go ahead with this as the default behaviour, where no old state is sent and we always refresh the cache if the version is updated. We can keep it for the upcoming 3.0/4.0, and there is no need to change the current external behaviour.

For the time-based setting, we can keep the default as 0, and if customers need it they can use it to enable longer interval-based caching.

Also, I think more due diligence is required before making this call optional, so it could be a good idea to take that up as a separate issue in the next PR.
Let me know your thoughts.

shwetathareja (Member Author) commented May 22, 2023

Honestly I would feel better with a switch to completely turn off this best-effort validation (is that terrible idea?) versus the caching behavior.

@andrross: Yes, I have been thinking about making it optional or very lightweight. I mentioned it in one of the comments above as well: #5100 (comment). It was pending a check of known plugins to see if there are any custom JoinValidators written, and confirming there is no side effect on the leader from node-join tasks.

The mention of stale data via the cache makes everyone nervous, but it is important to understand the context. Let me add more details to explain the flow, and why and where this ValidateJoinRequest call exists (to the best of my knowledge :) ), before we decide on the final behavior.

  1. A new node joining the cluster sends a join request to the leader node. The leader node handles this join request and runs different validators to ensure the node is compatible with the current leader and index versions:
    onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
  2. Once all the above validators succeed, the leader sends the ValidateJoinRequest to the node joining the cluster here -
    sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
  3. On the node joining the leader, these validators run while handling the ValidateJoinRequest here, which also checks the clusterUUID -
    joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
    The validators on the node joining the cluster could be different from the leader node, as validators are pluggable via the Discovery Plugin. But none of the plugins in the OpenSearch distribution (ec2, gcp, azure discovery plugins) override the join validators. So, unless somebody is using a custom plugin of their own, this is simply overhead without value, as the built-in validators are already executed in Step 1.
    default BiConsumer<DiscoveryNode, ClusterState> getJoinValidator() {
  4. So, as of now, the validators run on both the source node (joining the cluster) and the leader node in the join flow. But these checks are only best effort: the cluster state can change between when the checks are done and when the node actually joins the cluster (the node is still not part of the cluster yet). So, in a way, these checks could already have been executed on a stale cluster state, and there is no deterministic time window on how stale it would be. Why are these checks still performed, then? They ensure that any obvious incompatibility due to the source node version or indices version is caught upfront, so that a node-join request which is going to fail anyway is not enqueued with the leader node, where it could impact other critical tasks and cause pending-tasks to spike.
  5. Now, the node-join is enqueued in the leader's pending task queue here -
    clusterManagerService.submitStateUpdateTask(
  6. While processing this node-join in the JoinTaskExecutor, these validators run again on the leader node, always on the latest cluster state, here:
    if (enforceMajorVersion) {
        ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion);
    }
    ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
    // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
    // we have to reject nodes that don't support all indices we have in this cluster
    ensureIndexCompatibility(node.getVersion(), currentState.getMetadata());
  7. Finally, when any cluster state is published, the node joining the cluster runs these validators again if the cluster term has changed, which means the leader was switched. So, if it has joined a leader in a particular term, it assumes it is compatible.

Though I still feel stale state is not a concern, making it optional or lightweight would definitely be better once we have more details about other plugins (if any) overriding these join validators.

andrross (Member) commented:

@amkhar @shwetathareja It looks like we get most of the gains with the cluster state version-based logic, right? I like that approach because it is an implementation optimization that doesn't change the behavior. I'd be in favor of moving forward with that change, and then we can look at more intrusive optimizations if this still proves to be problematic.

andrross (Member) commented:

The validators on the node joining the cluster could be different from the leader node, as validators are pluggable via the Discovery Plugin. But none of the plugins in the OpenSearch distribution (ec2, gcp, azure discovery plugins) override the join validators. So, unless somebody is using a custom plugin of their own, this is simply overhead without value, as the built-in validators are already executed in Step 1.

@shwetathareja Thanks for the detailed explanation! Should the pluggable validator part of the API be deprecated? Given that we adhere to semantic versioning, we have to continue supporting this behavior even if it adds overhead. If we deprecate it now, then we can potentially remove it in 3.0 if there are no use cases for this API. Alternatively, we could allow plugins to opt out of this check: all the bundled plugins that don't implement custom logic could opt out, whereas any existing external plugin would continue to see the same behavior.

amkhar (Contributor) commented May 23, 2023

It looks like we get most of the gains with the cluster state version-based logic, right?

Yes @andrross, I'll go ahead with just this change for now, and remove the setting, as we're also exploring opting out of this check altogether.

Opened another issue so we can continue the discussion about deprecation and making this check optional there: #7678

shwetathareja (Member Author) commented May 23, 2023

@shwetathareja Thanks for the detailed explanation! Should the pluggable validator part of the API be deprecated? Given that we adhere to semantic versioning, we have to continue supporting this behavior even if it adds overhead. If we deprecate it now, then we can potentially remove it in 3.0 if there are no use cases for this API. Alternatively, we could allow plugins to opt out of this check: all the bundled plugins that don't implement custom logic could opt out, whereas any existing external plugin would continue to see the same behavior.

@andrross: Yes, we need to gather enough information on how customers implement this JoinValidator in their custom plugins, and the use cases around it, before we deprecate the pluggable support. As an alternative, I am thinking of automatically opting out of this ValidateJoinRequest by dynamically checking whether the JoinValidators contain only the built-in validators; that would mean no plugin is overriding them, and every node during join would send that flag to the leader, which would accordingly skip the ValidateJoinRequest altogether. This way customers/plugins need not change anything.
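A very rough sketch of that auto opt-out idea (all names below are hypothetical; how the flag travels with the join request would be decided in the follow-up issue):

    // Hypothetical helper on the joining node: true when no DiscoveryPlugin contributed a join validator,
    // i.e. only the built-in checks (ensureNodesCompatibility / ensureIndexCompatibility) apply anyway.
    private static boolean hasOnlyBuiltInJoinValidators(Collection<BiConsumer<DiscoveryNode, ClusterState>> pluginJoinValidators) {
        return pluginJoinValidators.isEmpty();
    }
    // The joining node would send this flag with its join request; the leader could then skip
    // sendValidateJoinRequest entirely when the flag is true, avoiding the full-state round trip.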

andrross (Member) commented:

@amkhar @shwetathareja Thanks, good discussion!

andrross (Member) commented:

Completed by #7321
