Skip to content

Commit

Permalink
Don’t ack if unable to remove failing replica (#39584)
Browse files Browse the repository at this point in the history
Today when a replicated write operation fails to execute on a replica,
the primary will reach out to the master to fail that replica (and mark
it stale). We then won't ack that request until the master removes the
failing replica; otherwise, we will lose the acked operation if the
failed replica is still in the in-sync set. However, if a node with the
primary is shutting down, we might ack such request even though we are
unable to send a shard-failure request to the master. This happens
because we ignore NodeClosedException which is triggered when the
ClusterService is being closed.

Closes #39467
  • Loading branch information
dnhatn committed Mar 7, 2019
1 parent 575a861 commit 517d03c
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
Expand Down Expand Up @@ -210,10 +209,18 @@ class ResyncActionReplicasProxy extends ReplicasProxy {
}

@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
listener.onResponse(null);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportException;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -43,7 +47,6 @@
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class ReplicationOperation<
Request extends ReplicationRequest<Request>,
Expand Down Expand Up @@ -133,10 +136,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica
for (String allocationId : replicationGroup.getUnavailableInSyncShards()) {
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId,
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
);
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}
}

Expand Down Expand Up @@ -192,20 +192,32 @@ public void onFailure(Exception replicaException) {
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
}
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, message,
replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
replicasProxy.failShardIfNeeded(shard, message, replicaException,
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}
});
}

private void onPrimaryDemoted(Exception demotionFailure) {
String primaryFail = String.format(Locale.ROOT,
"primary shard [%s] was demoted while failing replica shard",
primary.routingEntry());
// we are no longer the primary, fail ourselves and start over
primary.failShard(primaryFail, demotionFailure);
finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), primaryFail, demotionFailure));
private void onNoLongerPrimary(Exception failure) {
final boolean nodeIsClosing = failure instanceof NodeClosedException ||
(failure instanceof TransportException && "TransportService is closed stopped can't send request".equals(failure.getMessage()));
final String message;
if (nodeIsClosing) {
message = String.format(Locale.ROOT,
"node with primary [%s] is shutting down while failing replica shard", primary.routingEntry());
// We prefer not to fail the primary to avoid unnecessary warning log
// when the node with the primary shard is gracefully shutting down.
} else {
if (Assertions.ENABLED) {
if (failure instanceof ShardStateAction.NoLongerPrimaryShardException == false) {
throw new AssertionError("unexpected failure", failure);
}
}
// we are no longer the primary, fail ourselves and start over
message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard", primary.routingEntry());
primary.failShard(message, failure);
}
finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure));
}

/**
Expand Down Expand Up @@ -365,31 +377,23 @@ void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpo
* of active shards. Whether a failure is needed is left up to the
* implementation.
*
* @param replica shard to fail
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param onSuccess a callback to call when the shard has been successfully removed from the active set.
* @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
* by the master.
* @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the
* @param replica shard to fail
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
*/
void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener);

/**
* Marks shard copy as stale if needed, removing its allocation id from
* the set of in-sync allocation ids. Whether marking as stale is needed
* is left up to the implementation.
*
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
* @param onSuccess a callback to call when the allocation id has been successfully removed from the in-sync set.
* @param onPrimaryDemoted a callback to call when the request failed because the current primary was already demoted
* by the master.
* @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored.
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
*/
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
Expand Down Expand Up @@ -1177,47 +1176,21 @@ public void performOn(
}

@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
// This does not need to fail the shard. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
// "alive" if it were to fail.
onSuccess.run();
listener.onResponse(null);
}

@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
// This does not need to make the shard stale. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
// "alive" if it were to be marked as stale.
onSuccess.run();
}

protected final ShardStateAction.Listener createShardActionListener(final Runnable onSuccess,
final Consumer<Exception> onPrimaryDemoted,
final Consumer<Exception> onIgnoredFailure) {
return new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onSuccess.run();
}

@Override
public void onFailure(Exception shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
onPrimaryDemoted.accept(shardFailedError);
} else {
// these can occur if the node is shutting down and are okay
// any other exception here is not expected and merits investigation
assert shardFailedError instanceof TransportException ||
shardFailedError instanceof NodeClosedException : shardFailedError;
onIgnoredFailure.accept(shardFailedError);
}
}
};
listener.onResponse(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -376,20 +375,36 @@ class WriteActionReplicasProxy extends ReplicasProxy {
}

@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
if (TransportActions.isShardNotAvailableException(exception) == false) {
logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
}
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
listener.onResponse(null);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
listener.onResponse(null);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
}
Loading

0 comments on commit 517d03c

Please sign in to comment.