Skip to content

Commit

Permalink
Incorporated PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Dec 7, 2022
1 parent 3c3fb27 commit bfbb71f
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import java.util.function.Function;
import java.util.function.LongSupplier;

import static org.opensearch.index.seqno.ReplicationTracker.ReplicationMode;
import org.opensearch.action.support.replication.ReplicationMode;

/**
* Performs shard-level bulk (index, delete or update) operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

import org.opensearch.cluster.routing.ShardRouting;

import static org.opensearch.index.seqno.ReplicationTracker.ReplicationMode;

/**
* This implementation of {@link ReplicationProxy} fans out the replication request to current shard routing if
* it is not the primary and has replication mode as {@link ReplicationMode#FULL_REPLICATION}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.replication;

/**
* The type of replication used for inter-node replication.
*
* @opensearch.internal
*/
public enum ReplicationMode {
/**
* In this mode, a {@code TransportReplicationAction} is fanned out to underlying concerned shard and is replicated logically.
* In short, this mode would replicate the {@link ReplicationRequest} to
* the replica shard along with primary term validation.
*/
FULL_REPLICATION,
/**
* In this mode, a {@code TransportReplicationAction} is fanned out to underlying concerned shard and used for
* primary term validation only. The request is not replicated logically.
*/
PRIMARY_TERM_VALIDATION,
/**
* In this mode, a {@code TransportReplicationAction} does not fan out to the underlying concerned shard.
*/
NO_REPLICATION;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.action.support.replication;

import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.index.seqno.ReplicationTracker.ReplicationMode;

import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
package org.opensearch.action.support.replication;

import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.index.seqno.ReplicationTracker.ReplicationMode;
import org.opensearch.index.shard.ReplicationGroup.ReplicationModeAwareShardRouting;

import java.util.function.Consumer;

Expand All @@ -23,10 +21,10 @@
public abstract class ReplicationProxy<ReplicaRequest> {

/**
* Depending on the actual implementation and the passed {@link ReplicationModeAwareShardRouting}, the replication
* Depending on the actual implementation and the passed {@link ReplicationMode}, the replication
* mode is determined using which the replication request is performed on the replica or not.
*
* @param proxyRequest replication proxy request
* @param proxyRequest replication proxy request
* @param originalPerformOnReplicaConsumer original performOnReplica method passed as consumer
*/
public void performOnReplicaProxy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.action.support.replication;

import org.opensearch.index.seqno.ReplicationTracker.ReplicationMode;
import org.opensearch.index.shard.IndexShard;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexService;
import org.opensearch.index.seqno.ReplicationTracker.ReplicationMode;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,52 +784,6 @@ public int hashCode() {
}
}

/**
* The type of replication used for inter-node replication.
*/
public enum ReplicationMode implements Writeable {
/**
* In this mode, a {@code TransportReplicationAction} is fanned out to underlying concerned shard and is replicated logically.
* In short, this mode would replicate the {@link org.opensearch.action.support.replication.ReplicationRequest} to
* the replica shard along with primary term validation.
*/
FULL_REPLICATION(0),
/**
* In this mode, a {@code TransportReplicationAction} is fanned out to underlying concerned shard and used for
* primary term validation only. The request is not replicated logically.
*/
PRIMARY_TERM_VALIDATION(1),
/**
* In this mode, a {@code TransportReplicationAction} does not fan out to the underlying concerned shard.
*/
NO_REPLICATION(2);

private final byte type;

ReplicationMode(int type) {
this.type = (byte) type;
}

public static ReplicationMode readFrom(StreamInput in) throws IOException {
byte value = in.readByte();
switch (value) {
case 0:
return FULL_REPLICATION;
case 1:
return PRIMARY_TERM_VALIDATION;
case 2:
return NO_REPLICATION;
default:
throw new IllegalArgumentException("No replication mode for value [" + value + "]");
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeByte(type);
}
}

/**
* Get the local knowledge of the persisted global checkpoints for all in-sync allocation IDs.
*
Expand Down Expand Up @@ -1104,7 +1058,7 @@ private ReplicationGroup calculateReplicationGroup() {
newVersion = replicationGroup.getVersion() + 1;
}

assert indexSettings().isRemoteTranslogStoreEnabled() != false
assert indexSettings().isRemoteTranslogStoreEnabled()
|| checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).allMatch(e -> e.getValue().replicated)
: "In absence of remote translog store, all tracked shards must have replication mode as LOGICAL_REPLICATION";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.util.set.Sets;
import org.opensearch.index.seqno.ReplicationTracker.ReplicationMode;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
Expand Down Expand Up @@ -167,33 +165,4 @@ public String toString() {
+ '}';
}

/**
* Replication aware ShardRouting used for fanning out replication requests smartly.
*
* @opensearch.internal
*/
public static final class ReplicationModeAwareShardRouting {

private final ShardRouting shardRouting;

private final ReplicationMode replicationMode;

public ShardRouting getShardRouting() {
return shardRouting;
}

public ReplicationMode getReplicationMode() {
return replicationMode;
}

public ReplicationModeAwareShardRouting(final ReplicationMode replicationMode, final ShardRouting shardRouting) {
// ReplicationMode has to be non-null always.
assert Objects.nonNull(replicationMode);
// ShardRouting has to be non-null always.
assert Objects.nonNull(shardRouting);
this.replicationMode = replicationMode;
this.shardRouting = shardRouting;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.io.IOException;
import java.util.Objects;

import static org.opensearch.index.seqno.ReplicationTracker.ReplicationMode;
import org.opensearch.action.support.replication.ReplicationMode;

/**
* Replication action responsible for publishing checkpoint to a replica shard.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.common.util.set.Sets;
import org.opensearch.index.seqno.ReplicationTracker.ReplicationMode;
import org.opensearch.index.shard.IndexShardNotStartedException;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ReplicationGroup;
Expand Down

0 comments on commit bfbb71f

Please sign in to comment.