Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance CheckpointState to support no-op replication #5282

Merged
merged 36 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
46e1fc5
CheckpointState enhanced to support no-op replication
ashking94 Nov 17, 2022
d8bdb51
Force replication for remote translog enabled indices
ashking94 Nov 17, 2022
619857b
Fix IT failures
ashking94 Nov 21, 2022
76bfac3
Fix Rolling Upgrade IT failures
ashking94 Nov 24, 2022
53add62
Add UT - testGlobalCheckpointUpdateWithRemoteTranslogEnabled
ashking94 Nov 30, 2022
ca9a3fa
Fix spotlessJavaCheck
ashking94 Dec 1, 2022
1c71cae
Add UT
ashking94 Dec 1, 2022
0b812f8
Handle Retention Lease creation during activatePrimaryMode
ashking94 Dec 1, 2022
fb96c02
Add UTs
ashking94 Dec 1, 2022
d849b0e
Handle assertion in markAllocationIdAsInSync for remote store index's…
ashking94 Dec 1, 2022
6523c4d
Add UTs
ashking94 Dec 1, 2022
71f0519
Add UTs
ashking94 Dec 1, 2022
8318e78
Add UTs
ashking94 Dec 1, 2022
043bfa1
Add UTs
ashking94 Dec 1, 2022
71ade98
Add UTs
ashking94 Dec 1, 2022
62d5959
Add UTs
ashking94 Dec 1, 2022
e76ff6f
Refactored UT test cases
ashking94 Dec 1, 2022
ad831e3
Add UTs with respect to ReplicationOperation changes
ashking94 Dec 2, 2022
73113e1
Add PerformOnReplicaProxy for extensibility in replication flow
ashking94 Dec 2, 2022
c6d8e18
Refactored low level design & generalised replication concepts
ashking94 Dec 5, 2022
b252dbc
Fix failing tests
ashking94 Dec 5, 2022
0f29d75
Empty-Commit
ashking94 Dec 5, 2022
f3f63e8
Incorporate PR review feedback
ashking94 Dec 5, 2022
897ef1c
Empty-Commit
ashking94 Dec 6, 2022
d30afb3
Incorporate PR review feedback
ashking94 Dec 6, 2022
16dc3fb
Incorporate PR review feedback
ashking94 Dec 6, 2022
af59752
Fix test failure
ashking94 Dec 6, 2022
4aa2790
Incorporate PR review feedback
ashking94 Dec 6, 2022
6e107f6
Empty-Commit
ashking94 Dec 6, 2022
5a6fea1
Empty-Commit
ashking94 Dec 6, 2022
366bfb0
Incorporate PR review feedback
ashking94 Dec 6, 2022
3b22b38
Incorporate PR review feedback
ashking94 Dec 7, 2022
2081f00
Incorporate PR review feedback
ashking94 Dec 7, 2022
7b0ce5c
Tuned UTs
ashking94 Dec 7, 2022
3c3fb27
Incorporated PR review feedback
ashking94 Dec 7, 2022
bfbb71f
Incorporated PR review feedback
ashking94 Dec 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,15 @@

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;

import static org.opensearch.action.support.replication.ReplicationOperation.ReplicationOverridePolicy;
import static org.opensearch.index.seqno.ReplicationTracker.ReplicationMode;

/**
* Performs shard-level bulk (index, delete or update) operations
*
Expand Down Expand Up @@ -193,6 +197,14 @@ protected long primaryOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}

/**
 * Chooses the replication override policy for shard bulk operations. When the shard has remote translog
 * enabled, a policy carrying {@link ReplicationMode#PRIMARY_TERM_VALIDATION} is returned; otherwise the
 * decision is delegated to the parent class.
 *
 * @param indexShard index shard whose remote translog flag is consulted.
 * @return the override policy for remote translog enabled shards, else the parent's policy.
 */
@Override
protected Optional<ReplicationOverridePolicy> getReplicationOverridePolicy(IndexShard indexShard) {
    return indexShard.isRemoteTranslogEnabled()
        ? Optional.of(new ReplicationOverridePolicy(ReplicationMode.PRIMARY_TERM_VALIDATION))
        : super.getReplicationOverridePolicy(indexShard);
}

public static void performOnPrimary(
BulkShardRequest request,
IndexShard primary,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.Assertions;
import org.opensearch.OpenSearchException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.action.support.ActiveShardCount;
Expand All @@ -50,8 +50,10 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.seqno.ReplicationTracker.ReplicationMode;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.ReplicationGroup;
import org.opensearch.index.shard.ReplicationGroup.ReplicationModeAwareShardRouting;
import org.opensearch.index.shard.ShardId;
import org.opensearch.node.NodeClosedException;
import org.opensearch.rest.RestStatus;
Expand All @@ -63,6 +65,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -99,6 +103,7 @@ public class ReplicationOperation<
private final TimeValue initialRetryBackoffBound;
private final TimeValue retryTimeout;
private final long primaryTerm;
private final ReplicationProxy replicationProxy;

// exposed for tests
private final ActionListener<PrimaryResultT> resultListener;
Expand All @@ -117,7 +122,8 @@ public ReplicationOperation(
String opType,
long primaryTerm,
TimeValue initialRetryBackoffBound,
TimeValue retryTimeout
TimeValue retryTimeout,
Optional<ReplicationOverridePolicy> overridePolicy
) {
this.replicasProxy = replicas;
this.primary = primary;
Expand All @@ -129,6 +135,7 @@ public ReplicationOperation(
this.primaryTerm = primaryTerm;
this.initialRetryBackoffBound = initialRetryBackoffBound;
this.retryTimeout = retryTimeout;
this.replicationProxy = new ReplicationProxyFactory().create(overridePolicy);
}

public void execute() throws Exception {
Expand Down Expand Up @@ -226,10 +233,158 @@ private void performOnReplicas(

final ShardRouting primaryRouting = primary.routingEntry();

for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
if (shard.isSameAllocation(primaryRouting) == false) {
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions);
for (final ReplicationModeAwareShardRouting shardRouting : replicationGroup.getReplicationTargets()) {
replicationProxy.performOnReplica(
shardRouting,
primaryRouting,
replicaRequest,
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
pendingReplicationActions
);
}
}

/**
 * Factory that picks the {@link ReplicationProxy} implementation depending on whether a
 * {@link ReplicationOverridePolicy} has been supplied.
 *
 * @opensearch.internal
 */
private class ReplicationProxyFactory {

    /**
     * Builds the proxy used to dispatch replication requests to replicas.
     *
     * @param overridePolicy optional replication override policy.
     * @return a {@link ReplicationModeAwareOverrideProxy} wrapping the policy when one is present,
     * otherwise a {@link FanoutReplicationProxy}.
     */
    ReplicationProxy create(final Optional<ReplicationOverridePolicy> overridePolicy) {
        if (overridePolicy.isPresent()) {
            return new ReplicationModeAwareOverrideProxy(overridePolicy.get());
        }
        return new FanoutReplicationProxy();
    }
}

/**
* Used for performing any replication operation on replicas. Depending on the implementation, the replication call
* can fan out or stop here.
*
* @opensearch.internal
*/
private abstract class ReplicationProxy {

/**
* Depending on the actual implementation and the passed {@link ReplicationModeAwareShardRouting}, the replication
* mode is determined using which the replication request is performed on the replica or not.
*
* @param shardRouting replication mode aware ShardRouting
* @param primaryRouting primary ShardRouting
* @param replicaRequest replication request
* @param globalCheckpoint current global checkpoint on primary
* @param maxSeqNoOfUpdatesOrDeletes maxSeqNoOfUpdatesOrDeletes
* @param pendingReplicationActions pendingReplicationActions
*/
private void performOnReplica(
final ReplicationModeAwareShardRouting shardRouting,
final ShardRouting primaryRouting,
final ReplicaRequest replicaRequest,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final PendingReplicationActions pendingReplicationActions
) {
ReplicationMode replicationMode = determineReplicationMode(shardRouting, primaryRouting);
// If the replication modes are 1. Logical replication or 2. Primary term validation, we let the call get performed on the
// replica shard.
if (replicationMode == ReplicationMode.LOGICAL_REPLICATION || replicationMode == ReplicationMode.PRIMARY_TERM_VALIDATION) {
ReplicationOperation.this.performOnReplica(
shardRouting.getShardRouting(),
replicaRequest,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we send the same replicaRequest in both the LOGICAL_REPLICATION and PRIMARY_TERM_VALIDATION cases?

Copy link
Member Author

@ashking94 ashking94 Dec 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for both full replication and primary term validation, we send the request to the replica shard. There will be a follow-up PR that brings a thin layer for primary term validation (basically, perform primary term validation upfront and fail fast if stale).

globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
pendingReplicationActions
);
}
}

/**
* Determines the replication mode to apply for the given replication mode aware shard routing. How the mode is
* derived is left to the implementation, which may also take its own constructor arguments into account.
*
* @param shardRouting replication mode aware ShardRouting
* @param primaryRouting primary ShardRouting
* @return the determined replication mode.
*/
abstract ReplicationMode determineReplicationMode(
final ReplicationModeAwareShardRouting shardRouting,
final ShardRouting primaryRouting
);
}

/**
* This implementation of {@link ReplicationProxy} fans out the replication request to current shard routing if
* it is not the primary and has replication mode as {@link ReplicationMode#LOGICAL_REPLICATION}.
*
* @opensearch.internal
*/
private class FanoutReplicationProxy extends ReplicationProxy {

private FanoutReplicationProxy() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the default constructor defined explicitly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.


}

@Override
ReplicationMode determineReplicationMode(ReplicationModeAwareShardRouting shardRouting, ShardRouting primaryRouting) {
    // The primary's own allocation is not replicated to; every other routing gets logical replication.
    if (shardRouting.getShardRouting().isSameAllocation(primaryRouting)) {
        return ReplicationMode.NONE;
    }
    return ReplicationMode.LOGICAL_REPLICATION;
}
}

/**
 * This implementation of {@link ReplicationProxy} determines the replication mode from the shard routing's own
 * replication mode and, when that mode is {@link ReplicationMode#NONE}, from the replication override policy.
 *
 * @opensearch.internal
 */
private class ReplicationModeAwareOverrideProxy extends ReplicationProxy {

    /** Policy consulted when a non-primary routing carries no replication mode of its own; never null. */
    private final ReplicationOverridePolicy overridePolicy;

    /**
     * @param overridePolicy the override policy to fall back on; must not be null.
     * @throws NullPointerException if {@code overridePolicy} is null.
     */
    private ReplicationModeAwareOverrideProxy(ReplicationOverridePolicy overridePolicy) {
        this.overridePolicy = Objects.requireNonNull(overridePolicy, "overridePolicy must not be null");
    }

    /**
     * Resolves the replication mode for the given routing.
     *
     * @param shardRouting replication mode aware ShardRouting
     * @param primaryRouting primary ShardRouting
     * @return {@link ReplicationMode#NONE} for the primary's own allocation, the routing's replication mode when it
     * is not {@link ReplicationMode#NONE}, and the overridden mode from the policy otherwise.
     */
    @Override
    ReplicationMode determineReplicationMode(ReplicationModeAwareShardRouting shardRouting, ShardRouting primaryRouting) {
        // If the current routing is the primary, then it does not need to be replicated.
        if (shardRouting.getShardRouting().isSameAllocation(primaryRouting)) {
            return ReplicationMode.NONE;
        }

        // A routing that already carries a replication mode keeps it.
        final ReplicationMode routingMode = shardRouting.getReplicationMode();
        if (routingMode != ReplicationMode.NONE) {
            return routingMode;
        }

        // The policy is guaranteed non-null by the constructor, so the override always applies here.
        return overridePolicy.overriddenMode;
    }
}

/**
* Defines the replication override policy which individual {@link TransportReplicationAction} can implement.
*
* @opensearch.internal
*/
public static class ReplicationOverridePolicy {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this abstraction of ReplicationOverridePolicy over ReplicationMode required?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for extensibility. Currently we have only the replication mode as a field, but this can expand for future usecases. For the sake of this PR, consider ReplicationOverridePolicy synonymous to ReplicationMode.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use ReplicationMode for now? Later on we can add the abstraction if needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep it. I see cases where, depending on the flavour of replication mode, we might want to add maybe a couple more things - dynamic or static in nature. Since this has huge penetration in tests and other places, I feel we can keep this unless there is a strong reason not to do so.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a good use case for keeping ReplicationOverridePolicy at this point; we can add it when needed. As of now it just adds a level of indirection.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, will make the change.


private final ReplicationMode overriddenMode;

/**
* Creates an override policy carrying the given replication mode.
*
* @param overriddenMode replication mode to use as the override; must not be null.
* @throws NullPointerException if {@code overriddenMode} is null.
*/
public ReplicationOverridePolicy(ReplicationMode overriddenMode) {
this.overriddenMode = Objects.requireNonNull(overriddenMode);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.action.support.TransportAction;
import org.opensearch.action.support.TransportActions;
import org.opensearch.action.support.replication.ReplicationOperation.ReplicationOverridePolicy;
import org.opensearch.client.transport.NoNodeAvailableException;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
Expand All @@ -70,6 +71,7 @@
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexService;
import org.opensearch.index.seqno.ReplicationTracker.ReplicationMode;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
Expand All @@ -94,6 +96,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -258,6 +261,15 @@ protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy() {
return new ReplicasProxy();
}

/**
* Defines the {@link ReplicationMode} override policy for this {@link TransportReplicationAction}. This default
* implementation returns an empty {@link Optional}; subclasses can override it to supply a policy.
*
* @param indexShard index shard used to determine the policy.
* @return the override policy, or an empty {@link Optional} when no override applies.
*/
protected Optional<ReplicationOverridePolicy> getReplicationOverridePolicy(IndexShard indexShard) {
return Optional.empty();
}

protected abstract Response newResponseInstance(StreamInput in) throws IOException;

/**
Expand Down Expand Up @@ -533,7 +545,8 @@ public void handleException(TransportException exp) {
actionName,
primaryRequest.getPrimaryTerm(),
initialRetryBackoffBound,
retryTimeout
retryTimeout,
getReplicationOverridePolicy(primaryShardReference.indexShard)
).execute();
}
} catch (Exception e) {
Expand Down
Loading