Skip to content

Commit

Permalink
Enhance CheckpointState to support no-op replication (#5282)
Browse files Browse the repository at this point in the history
* CheckpointState enhanced to support no-op replication

Signed-off-by: Ashish Singh <[email protected]>
Co-authored-by: Bukhtawar Khan<[email protected]>
  • Loading branch information
ashking94 authored Dec 7, 2022
1 parent 5500114 commit 1069660
Show file tree
Hide file tree
Showing 18 changed files with 1,541 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@
import java.util.function.Function;
import java.util.function.LongSupplier;

import org.opensearch.action.support.replication.ReplicationMode;

/**
* Performs shard-level bulk (index, delete or update) operations
*
Expand Down Expand Up @@ -193,6 +195,14 @@ protected long primaryOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}

@Override
protected ReplicationMode getReplicationMode(IndexShard indexShard) {
if (indexShard.isRemoteTranslogEnabled()) {
return ReplicationMode.PRIMARY_TERM_VALIDATION;
}
return super.getReplicationMode(indexShard);
}

public static void performOnPrimary(
BulkShardRequest request,
IndexShard primary,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.replication;

import org.opensearch.cluster.routing.ShardRouting;

/**
* This implementation of {@link ReplicationProxy} fans out the replication request to current shard routing if
* it is not the primary and has replication mode as {@link ReplicationMode#FULL_REPLICATION}.
*
* @opensearch.internal
*/
public class FanoutReplicationProxy<ReplicaRequest> extends ReplicationProxy<ReplicaRequest> {

@Override
ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting primaryRouting) {
return shardRouting.isSameAllocation(primaryRouting) == false ? ReplicationMode.FULL_REPLICATION : ReplicationMode.NO_REPLICATION;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.replication;

/**
* The type of replication used for inter-node replication.
*
* @opensearch.internal
*/
public enum ReplicationMode {
/**
* In this mode, a {@code TransportReplicationAction} is fanned out to underlying concerned shard and is replicated logically.
* In short, this mode would replicate the {@link ReplicationRequest} to
* the replica shard along with primary term validation.
*/
FULL_REPLICATION,
/**
* In this mode, a {@code TransportReplicationAction} is fanned out to underlying concerned shard and used for
* primary term validation only. The request is not replicated logically.
*/
PRIMARY_TERM_VALIDATION,
/**
* In this mode, a {@code TransportReplicationAction} does not fan out to the underlying concerned shard.
*/
NO_REPLICATION;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.replication;

import org.opensearch.cluster.routing.ShardRouting;

import java.util.Objects;

/**
* This implementation of {@link ReplicationProxy} fans out the replication request to current shard routing basis
* the shard routing's replication mode and replication override policy.
*
* @opensearch.internal
*/
public class ReplicationModeAwareProxy<ReplicaRequest> extends ReplicationProxy<ReplicaRequest> {

private final ReplicationMode replicationModeOverride;

public ReplicationModeAwareProxy(ReplicationMode replicationModeOverride) {
assert Objects.nonNull(replicationModeOverride);
this.replicationModeOverride = replicationModeOverride;
}

@Override
ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting primaryRouting) {

// If the current routing is the primary, then it does not need to be replicated
if (shardRouting.isSameAllocation(primaryRouting)) {
return ReplicationMode.NO_REPLICATION;
}

if (primaryRouting.relocating() && shardRouting.isSameAllocation(primaryRouting.getTargetRelocatingShard())) {
return ReplicationMode.FULL_REPLICATION;
}

return replicationModeOverride;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.Assertions;
import org.opensearch.OpenSearchException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.RetryableAction;
import org.opensearch.action.support.TransportActions;
import org.opensearch.action.support.replication.ReplicationProxyRequest.Builder;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -99,6 +100,7 @@ public class ReplicationOperation<
private final TimeValue initialRetryBackoffBound;
private final TimeValue retryTimeout;
private final long primaryTerm;
private final ReplicationProxy<ReplicaRequest> replicationProxy;

// exposed for tests
private final ActionListener<PrimaryResultT> resultListener;
Expand All @@ -117,7 +119,8 @@ public ReplicationOperation(
String opType,
long primaryTerm,
TimeValue initialRetryBackoffBound,
TimeValue retryTimeout
TimeValue retryTimeout,
ReplicationProxy<ReplicaRequest> replicationProxy
) {
this.replicasProxy = replicas;
this.primary = primary;
Expand All @@ -129,6 +132,7 @@ public ReplicationOperation(
this.primaryTerm = primaryTerm;
this.initialRetryBackoffBound = initialRetryBackoffBound;
this.retryTimeout = retryTimeout;
this.replicationProxy = replicationProxy;
}

public void execute() throws Exception {
Expand Down Expand Up @@ -226,20 +230,26 @@ private void performOnReplicas(

final ShardRouting primaryRouting = primary.routingEntry();

for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
if (shard.isSameAllocation(primaryRouting) == false) {
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions);
}
for (final ShardRouting shardRouting : replicationGroup.getReplicationTargets()) {
ReplicationProxyRequest<ReplicaRequest> proxyRequest = new Builder<ReplicaRequest>(
shardRouting,
primaryRouting,
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
pendingReplicationActions,
replicaRequest
).build();
replicationProxy.performOnReplicaProxy(proxyRequest, this::performOnReplica);
}
}

private void performOnReplica(
final ShardRouting shard,
final ReplicaRequest replicaRequest,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final PendingReplicationActions pendingReplicationActions
) {
private void performOnReplica(final ReplicationProxyRequest<ReplicaRequest> replicationProxyRequest) {
final ShardRouting shard = replicationProxyRequest.getShardRouting();
final ReplicaRequest replicaRequest = replicationProxyRequest.getReplicaRequest();
final long globalCheckpoint = replicationProxyRequest.getGlobalCheckpoint();
final long maxSeqNoOfUpdatesOrDeletes = replicationProxyRequest.getMaxSeqNoOfUpdatesOrDeletes();
final PendingReplicationActions pendingReplicationActions = replicationProxyRequest.getPendingReplicationActions();

if (logger.isTraceEnabled()) {
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.replication;

import org.opensearch.cluster.routing.ShardRouting;

import java.util.function.Consumer;

/**
* Used for performing any replication operation on replicas. Depending on the implementation, the replication call
* can fanout or stops here.
*
* @opensearch.internal
*/
public abstract class ReplicationProxy<ReplicaRequest> {

/**
* Depending on the actual implementation and the passed {@link ReplicationMode}, the replication
* mode is determined using which the replication request is performed on the replica or not.
*
* @param proxyRequest replication proxy request
* @param originalPerformOnReplicaConsumer original performOnReplica method passed as consumer
*/
public void performOnReplicaProxy(
ReplicationProxyRequest<ReplicaRequest> proxyRequest,
Consumer<ReplicationProxyRequest<ReplicaRequest>> originalPerformOnReplicaConsumer
) {
ReplicationMode replicationMode = determineReplicationMode(proxyRequest.getShardRouting(), proxyRequest.getPrimaryRouting());
// If the replication modes are 1. Logical replication or 2. Primary term validation, we let the call get performed on the
// replica shard.
if (replicationMode == ReplicationMode.FULL_REPLICATION || replicationMode == ReplicationMode.PRIMARY_TERM_VALIDATION) {
originalPerformOnReplicaConsumer.accept(proxyRequest);
}
}

/**
* Determines what is the replication mode basis the constructor arguments of the implementation and the current
* replication mode aware shard routing.
*
* @param shardRouting replication mode aware ShardRouting
* @param primaryRouting primary ShardRouting
* @return the determined replication mode.
*/
abstract ReplicationMode determineReplicationMode(final ShardRouting shardRouting, final ShardRouting primaryRouting);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.replication;

import org.opensearch.index.shard.IndexShard;

/**
* Factory that returns the {@link ReplicationProxy} instance basis the {@link ReplicationMode}.
*
* @opensearch.internal
*/
public class ReplicationProxyFactory {

public static <ReplicaRequest> ReplicationProxy<ReplicaRequest> create(
final IndexShard indexShard,
final ReplicationMode replicationModeOverride
) {
if (indexShard.isRemoteTranslogEnabled()) {
return new ReplicationModeAwareProxy<>(replicationModeOverride);
}
return new FanoutReplicationProxy<>();
}
}
Loading

0 comments on commit 1069660

Please sign in to comment.