Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance CheckpointState to support no-op replication #5282

Merged
merged 36 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
46e1fc5
CheckpointState enhanced to support no-op replication
ashking94 Nov 17, 2022
d8bdb51
Force replication for remote translog enabled indices
ashking94 Nov 17, 2022
619857b
Fix IT failures
ashking94 Nov 21, 2022
76bfac3
Fix Rolling Upgrade IT failures
ashking94 Nov 24, 2022
53add62
Add UT - testGlobalCheckpointUpdateWithRemoteTranslogEnabled
ashking94 Nov 30, 2022
ca9a3fa
Fix spotlessJavaCheck
ashking94 Dec 1, 2022
1c71cae
Add UT
ashking94 Dec 1, 2022
0b812f8
Handle Retention Lease creation during activatePrimaryMode
ashking94 Dec 1, 2022
fb96c02
Add UTs
ashking94 Dec 1, 2022
d849b0e
Handle assertion in markAllocationIdAsInSync for remote store index's…
ashking94 Dec 1, 2022
6523c4d
Add UTs
ashking94 Dec 1, 2022
71f0519
Add UTs
ashking94 Dec 1, 2022
8318e78
Add UTs
ashking94 Dec 1, 2022
043bfa1
Add UTs
ashking94 Dec 1, 2022
71ade98
Add UTs
ashking94 Dec 1, 2022
62d5959
Add UTs
ashking94 Dec 1, 2022
e76ff6f
Refactored UT test cases
ashking94 Dec 1, 2022
ad831e3
Add UTs with respect to ReplicationOperation changes
ashking94 Dec 2, 2022
73113e1
Add PerformOnReplicaProxy for extensibility in replication flow
ashking94 Dec 2, 2022
c6d8e18
Refactored low level design & generalised replication concepts
ashking94 Dec 5, 2022
b252dbc
Fix failing tests
ashking94 Dec 5, 2022
0f29d75
Empty-Commit
ashking94 Dec 5, 2022
f3f63e8
Incorporate PR review feedback
ashking94 Dec 5, 2022
897ef1c
Empty-Commit
ashking94 Dec 6, 2022
d30afb3
Incorporate PR review feedback
ashking94 Dec 6, 2022
16dc3fb
Incorporate PR review feedback
ashking94 Dec 6, 2022
af59752
Fix test failure
ashking94 Dec 6, 2022
4aa2790
Incorporate PR review feedback
ashking94 Dec 6, 2022
6e107f6
Empty-Commit
ashking94 Dec 6, 2022
5a6fea1
Empty-Commit
ashking94 Dec 6, 2022
366bfb0
Incorporate PR review feedback
ashking94 Dec 6, 2022
3b22b38
Incorporate PR review feedback
ashking94 Dec 7, 2022
2081f00
Incorporate PR review feedback
ashking94 Dec 7, 2022
7b0ce5c
Tuned UTs
ashking94 Dec 7, 2022
3c3fb27
Incorporated PR review feedback
ashking94 Dec 7, 2022
bfbb71f
Incorporated PR review feedback
ashking94 Dec 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@
import java.util.function.Function;
import java.util.function.LongSupplier;

import org.opensearch.action.support.replication.ReplicationMode;

/**
* Performs shard-level bulk (index, delete or update) operations
*
Expand Down Expand Up @@ -193,6 +195,14 @@ protected long primaryOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}

/**
 * Chooses the replication mode used for this action on the given shard.
 *
 * @param indexShard the shard whose remote translog setting is consulted
 * @return {@link ReplicationMode#PRIMARY_TERM_VALIDATION} when the shard reports that remote translog is enabled,
 *         otherwise whatever the superclass implementation returns
 */
@Override
protected ReplicationMode getReplicationMode(IndexShard indexShard) {
    return indexShard.isRemoteTranslogEnabled()
        ? ReplicationMode.PRIMARY_TERM_VALIDATION
        : super.getReplicationMode(indexShard);
}

public static void performOnPrimary(
BulkShardRequest request,
IndexShard primary,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.replication;

import org.opensearch.cluster.routing.ShardRouting;

/**
 * A {@link ReplicationProxy} that fans the replication request out to every shard routing other than the primary
 * itself, always using {@link ReplicationMode#FULL_REPLICATION} for those routings.
 *
 * @opensearch.internal
 */
public class FanoutReplicationProxy<ReplicaRequest> extends ReplicationProxy<ReplicaRequest> {

    /**
     * @param shardRouting   the routing being considered as a replication target
     * @param primaryRouting the routing of the primary shard
     * @return {@link ReplicationMode#NO_REPLICATION} when {@code shardRouting} is the same allocation as the primary,
     *         {@link ReplicationMode#FULL_REPLICATION} otherwise
     */
    @Override
    ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting primaryRouting) {
        // The primary never replicates to itself.
        if (shardRouting.isSameAllocation(primaryRouting)) {
            return ReplicationMode.NO_REPLICATION;
        }
        return ReplicationMode.FULL_REPLICATION;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.replication;

/**
 * Enumerates how a replication request is propagated from the primary to another shard copy during
 * inter-node replication.
 *
 * @opensearch.internal
 */
public enum ReplicationMode {

    /**
     * The {@code TransportReplicationAction} is sent to the target shard and the {@link ReplicationRequest} is
     * logically replicated there. Primary term validation happens as part of the same call.
     */
    FULL_REPLICATION,

    /**
     * The {@code TransportReplicationAction} is sent to the target shard solely so the primary term can be
     * validated; the request itself is not logically replicated.
     */
    PRIMARY_TERM_VALIDATION,

    /**
     * The {@code TransportReplicationAction} is not sent to the target shard at all.
     */
    NO_REPLICATION
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.replication;

import org.opensearch.cluster.routing.ShardRouting;

import java.util.Objects;

/**
 * This implementation of {@link ReplicationProxy} decides, per shard routing, whether and how the replication
 * request is fanned out, based on the routing's relationship to the primary and on a configured replication
 * mode override.
 *
 * @opensearch.internal
 */
public class ReplicationModeAwareProxy<ReplicaRequest> extends ReplicationProxy<ReplicaRequest> {

    /** Mode returned for every routing that is neither the primary nor the primary's relocation target. */
    private final ReplicationMode replicationModeOverride;

    /**
     * Creates a proxy with the given override mode.
     *
     * @param replicationModeOverride the mode to use for regular replica routings; must not be {@code null}
     * @throws NullPointerException if {@code replicationModeOverride} is {@code null}
     */
    public ReplicationModeAwareProxy(ReplicationMode replicationModeOverride) {
        // A plain `assert` is skipped when the JVM runs without -ea, which would let a null override through and
        // make determineReplicationMode return null later. Fail fast here instead, regardless of JVM flags.
        this.replicationModeOverride = Objects.requireNonNull(replicationModeOverride, "replicationModeOverride must not be null");
    }

    /**
     * Determines the replication mode for {@code shardRouting}.
     *
     * @param shardRouting   the routing being considered as a replication target
     * @param primaryRouting the routing of the primary shard
     * @return {@link ReplicationMode#NO_REPLICATION} for the primary itself, {@link ReplicationMode#FULL_REPLICATION}
     *         for the target of a relocating primary, and the configured override for every other routing
     */
    @Override
    ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting primaryRouting) {

        // If the current routing is the primary, then it does not need to be replicated
        if (shardRouting.isSameAllocation(primaryRouting)) {
            return ReplicationMode.NO_REPLICATION;
        }

        // The relocation target of a relocating primary always gets full replication, ignoring the override.
        // NOTE(review): presumably because the target must receive the operations themselves during handoff - confirm.
        if (primaryRouting.relocating() && shardRouting.isSameAllocation(primaryRouting.getTargetRelocatingShard())) {
            return ReplicationMode.FULL_REPLICATION;
        }

        return replicationModeOverride;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.Assertions;
import org.opensearch.OpenSearchException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.RetryableAction;
import org.opensearch.action.support.TransportActions;
import org.opensearch.action.support.replication.ReplicationProxyRequest.Builder;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -99,6 +100,7 @@ public class ReplicationOperation<
private final TimeValue initialRetryBackoffBound;
private final TimeValue retryTimeout;
private final long primaryTerm;
private final ReplicationProxy<ReplicaRequest> replicationProxy;

// exposed for tests
private final ActionListener<PrimaryResultT> resultListener;
Expand All @@ -117,7 +119,8 @@ public ReplicationOperation(
String opType,
long primaryTerm,
TimeValue initialRetryBackoffBound,
TimeValue retryTimeout
TimeValue retryTimeout,
ReplicationProxy<ReplicaRequest> replicationProxy
) {
this.replicasProxy = replicas;
this.primary = primary;
Expand All @@ -129,6 +132,7 @@ public ReplicationOperation(
this.primaryTerm = primaryTerm;
this.initialRetryBackoffBound = initialRetryBackoffBound;
this.retryTimeout = retryTimeout;
this.replicationProxy = replicationProxy;
}

public void execute() throws Exception {
Expand Down Expand Up @@ -226,20 +230,26 @@ private void performOnReplicas(

final ShardRouting primaryRouting = primary.routingEntry();

for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
if (shard.isSameAllocation(primaryRouting) == false) {
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions);
}
for (final ShardRouting shardRouting : replicationGroup.getReplicationTargets()) {
ReplicationProxyRequest<ReplicaRequest> proxyRequest = new Builder<ReplicaRequest>(
shardRouting,
primaryRouting,
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
pendingReplicationActions,
replicaRequest
).build();
replicationProxy.performOnReplicaProxy(proxyRequest, this::performOnReplica);
}
}

private void performOnReplica(
final ShardRouting shard,
final ReplicaRequest replicaRequest,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final PendingReplicationActions pendingReplicationActions
) {
private void performOnReplica(final ReplicationProxyRequest<ReplicaRequest> replicationProxyRequest) {
final ShardRouting shard = replicationProxyRequest.getShardRouting();
final ReplicaRequest replicaRequest = replicationProxyRequest.getReplicaRequest();
final long globalCheckpoint = replicationProxyRequest.getGlobalCheckpoint();
final long maxSeqNoOfUpdatesOrDeletes = replicationProxyRequest.getMaxSeqNoOfUpdatesOrDeletes();
final PendingReplicationActions pendingReplicationActions = replicationProxyRequest.getPendingReplicationActions();

if (logger.isTraceEnabled()) {
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.replication;

import org.opensearch.cluster.routing.ShardRouting;

import java.util.function.Consumer;

/**
 * Gatekeeper for performing a replication operation on a replica. Depending on the concrete implementation, the
 * replication call either fans out to the replica or stops here.
 *
 * @opensearch.internal
 */
public abstract class ReplicationProxy<ReplicaRequest> {

    /**
     * Resolves the {@link ReplicationMode} for the shard routing carried by {@code proxyRequest} and, if that mode
     * calls for contacting the replica, hands the request to the supplied consumer.
     *
     * @param proxyRequest                     replication proxy request
     * @param originalPerformOnReplicaConsumer original performOnReplica method passed as consumer
     */
    public void performOnReplicaProxy(
        ReplicationProxyRequest<ReplicaRequest> proxyRequest,
        Consumer<ReplicationProxyRequest<ReplicaRequest>> originalPerformOnReplicaConsumer
    ) {
        final ReplicationMode mode = determineReplicationMode(proxyRequest.getShardRouting(), proxyRequest.getPrimaryRouting());
        // Only logical replication and primary term validation reach the replica shard; any other outcome is a no-op.
        final boolean reachesReplica = mode == ReplicationMode.FULL_REPLICATION || mode == ReplicationMode.PRIMARY_TERM_VALIDATION;
        if (reachesReplica == false) {
            return;
        }
        originalPerformOnReplicaConsumer.accept(proxyRequest);
    }

    /**
     * Computes the replication mode for a shard routing, based on the implementation's own configuration and on the
     * routing's relationship to the primary.
     *
     * @param shardRouting   replication mode aware ShardRouting
     * @param primaryRouting primary ShardRouting
     * @return the determined replication mode.
     */
    abstract ReplicationMode determineReplicationMode(final ShardRouting shardRouting, final ShardRouting primaryRouting);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.replication;

import org.opensearch.index.shard.IndexShard;

/**
 * Factory that picks the {@link ReplicationProxy} implementation appropriate for a given shard.
 *
 * @opensearch.internal
 */
public class ReplicationProxyFactory {

    /**
     * Creates the replication proxy for {@code indexShard}.
     *
     * @param indexShard              the shard whose remote translog setting selects the implementation
     * @param replicationModeOverride the mode passed to {@link ReplicationModeAwareProxy}; unused when the shard
     *                                does not have remote translog enabled
     * @return a {@link ReplicationModeAwareProxy} when remote translog is enabled on the shard, otherwise a
     *         {@link FanoutReplicationProxy}
     */
    public static <ReplicaRequest> ReplicationProxy<ReplicaRequest> create(
        final IndexShard indexShard,
        final ReplicationMode replicationModeOverride
    ) {
        final boolean remoteTranslogEnabled = indexShard.isRemoteTranslogEnabled();
        if (remoteTranslogEnabled == false) {
            return new FanoutReplicationProxy<ReplicaRequest>();
        }
        return new ReplicationModeAwareProxy<ReplicaRequest>(replicationModeOverride);
    }
}
Loading