-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable acked indexing #17038
Enable acked indexing #17038
Changes from 1 commit
74194b8
57501ce
63ada98
25fae03
55465dd
51f2c3c
746ca07
4e39359
673a73d
285c3bf
7bb85e4
3353790
76465ec
4a7524f
2bd09c4
563304d
bd9e908
4793630
14ba0c3
37d739a
97be383
4e1f62e
2a93889
5576526
4de57fc
85d3d51
0e5b22a
c4324f9
649bcdc
27448dc
1f12bee
8b970d9
c7c8b1d
e201f5c
cffc315
7cdd647
3abf817
c2ed5a1
66cc202
95feb40
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,7 +41,6 @@ | |
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; | ||
import org.elasticsearch.discovery.Discovery; | ||
import org.elasticsearch.discovery.DiscoverySettings; | ||
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; | ||
import org.elasticsearch.discovery.zen.ZenDiscovery; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.BytesTransportRequest; | ||
|
@@ -64,6 +63,7 @@ | |
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* | ||
|
@@ -82,17 +82,22 @@ public interface NewPendingClusterStateListener { | |
} | ||
|
||
private final TransportService transportService; | ||
private final DiscoveryNodesProvider nodesProvider; | ||
private final Supplier<ClusterState> clusterStateSupplier; | ||
private final NewPendingClusterStateListener newPendingClusterStatelistener; | ||
private final DiscoverySettings discoverySettings; | ||
private final ClusterName clusterName; | ||
private final PendingClusterStatesQueue pendingStatesQueue; | ||
|
||
public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, | ||
NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { | ||
public PublishClusterStateAction( | ||
Settings settings, | ||
TransportService transportService, | ||
Supplier<ClusterState> clusterStateSupplier, | ||
NewPendingClusterStateListener listener, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that node provider interface can go away (now that's were on java8 - function refs FTW) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought the same but can we keep it separate from this pull request? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for SURE! |
||
DiscoverySettings discoverySettings, | ||
ClusterName clusterName) { | ||
super(settings); | ||
this.transportService = transportService; | ||
this.nodesProvider = nodesProvider; | ||
this.clusterStateSupplier = clusterStateSupplier; | ||
this.newPendingClusterStatelistener = listener; | ||
this.discoverySettings = discoverySettings; | ||
this.clusterName = clusterName; | ||
|
@@ -364,7 +369,7 @@ protected void handleIncomingClusterStateRequest(BytesTransportRequest request, | |
final ClusterState incomingState; | ||
// If true we received full cluster state - otherwise diffs | ||
if (in.readBoolean()) { | ||
incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode()); | ||
incomingState = ClusterState.Builder.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode()); | ||
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); | ||
} else if (lastSeenClusterState != null) { | ||
Diff<ClusterState> diff = lastSeenClusterState.readDiffFrom(in); | ||
|
@@ -395,19 +400,19 @@ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClus | |
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName); | ||
throw new IllegalStateException("received state from a node that is not part of the cluster"); | ||
} | ||
final DiscoveryNodes currentNodes = nodesProvider.nodes(); | ||
final DiscoveryNodes currentNodes = clusterStateSupplier.get().nodes(); | ||
|
||
if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) { | ||
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode()); | ||
throw new IllegalStateException("received state from a node that is not part of the cluster"); | ||
throw new IllegalStateException("received state from local node that does not match the current local node"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we want it to read "received state with a local node that doesn't match the current one" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 7cdd647. |
||
} | ||
|
||
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState); | ||
if (lastSeenClusterState != null && lastSeenClusterState.supersedes(incomingState)) { | ||
final String message = String.format( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is very close to the new check. I'm not sure we need it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check rejects incoming states from the same master that are out of order. I do not think that the new check covers this case. I think it's needed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the new check (based on the ClusterService.state()), a cluster state with a lower/equal version than the current state will be rejected. If it has a higher version it will get in. If it is lower than the last seen cluster state it means that it will be cleaned when the first of these happen:
Is there a case where the lastSeenClusterState.supersedes based protection helps and isn't covered by the above? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Just to be clear, this is only if we are following a master. The line of code in question will reject older states from the same master that are out of order.
There is not. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
Locale.ROOT, | ||
"received cluster state from current master superseded by last seen cluster state; " + | ||
"received version [%s] with uuid [%s], last seen version [%s] with uuid [%s]", | ||
"received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]", | ||
incomingState.version(), | ||
incomingState.stateUUID(), | ||
lastSeenClusterState.version(), | ||
|
@@ -416,6 +421,21 @@ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClus | |
logger.warn(message); | ||
throw new IllegalStateException(message); | ||
} | ||
|
||
final ClusterState state = clusterStateSupplier.get(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we should strengthen ZenDiscovery.shouldIgnoreOrRejectNewClusterState to check for version equality (and assert uuid) and use it in line 410 then all of this can be change to just throwing an exception if it doesn't like it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
if (state.nodes().getMasterNodeId() != null && incomingState.version() <= state.version()) { | ||
assert !incomingState.stateUUID().equals(state.stateUUID()); | ||
final String message = String.format( | ||
Locale.ROOT, | ||
"received cluster state older than current cluster state; " + | ||
"received version [%d] with uuid [%s], current version [%d]", | ||
incomingState.version(), | ||
incomingState.stateUUID(), | ||
state.version() | ||
); | ||
logger.warn(message); | ||
throw new IllegalStateException(message); | ||
} | ||
} | ||
|
||
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -195,10 +195,11 @@ public void testQueueStats() { | |
highestCommitted = context.state; | ||
} | ||
} | ||
assert highestCommitted != null; | ||
|
||
queue.markAsProcessed(highestCommitted); | ||
assertThat(queue.stats().getTotal(), equalTo(states.size() - committedContexts.size())); | ||
assertThat(queue.stats().getPending(), equalTo(states.size() - committedContexts.size())); | ||
assertThat((long)queue.stats().getTotal(), equalTo(states.size() - (1 + highestCommitted.version()))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this needed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @bleskes Because uncommitted states with a lower version get cleaned now but they didn't before. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doh. right. |
||
assertThat((long)queue.stats().getPending(), equalTo(states.size() - (1 + highestCommitted.version()))); | ||
assertThat(queue.stats().getCommitted(), equalTo(0)); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,7 +70,9 @@ | |
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.Supplier; | ||
|
||
import static org.hamcrest.CoreMatchers.instanceOf; | ||
import static org.hamcrest.Matchers.containsString; | ||
import static org.hamcrest.Matchers.emptyIterable; | ||
import static org.hamcrest.Matchers.equalTo; | ||
|
@@ -161,7 +163,7 @@ public MockNode createMockNode(String name, Settings settings, Version version, | |
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(settings, version); | ||
DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(service.boundAddress().publishAddress()); | ||
MockNode node = new MockNode(discoveryNode, service, listener, logger); | ||
node.action = buildPublishClusterStateAction(settings, service, node, node); | ||
node.action = buildPublishClusterStateAction(settings, service, () -> node.clusterState, node); | ||
final CountDownLatch latch = new CountDownLatch(nodes.size() * 2 + 1); | ||
TransportConnectionListener waitForConnection = new TransportConnectionListener() { | ||
@Override | ||
|
@@ -233,10 +235,21 @@ protected MockTransportService buildTransportService(Settings settings, Version | |
return transportService; | ||
} | ||
|
||
protected MockPublishAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, DiscoveryNodesProvider nodesProvider, | ||
PublishClusterStateAction.NewPendingClusterStateListener listener) { | ||
DiscoverySettings discoverySettings = new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); | ||
return new MockPublishAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT); | ||
protected MockPublishAction buildPublishClusterStateAction( | ||
Settings settings, | ||
MockTransportService transportService, | ||
Supplier<ClusterState> clusterStateSupplier, | ||
PublishClusterStateAction.NewPendingClusterStateListener listener | ||
) { | ||
DiscoverySettings discoverySettings = | ||
new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); | ||
return new MockPublishAction( | ||
settings, | ||
transportService, | ||
clusterStateSupplier, | ||
listener, | ||
discoverySettings, | ||
ClusterName.DEFAULT); | ||
} | ||
|
||
public void testSimpleClusterStatePublishing() throws Exception { | ||
|
@@ -598,18 +611,20 @@ public void testIncomingClusterStateValidation() throws Exception { | |
node.action.validateIncomingState(state, node.clusterState); | ||
fail("node accepted state from another master"); | ||
} catch (IllegalStateException OK) { | ||
assertThat(OK.toString(), containsString("cluster state from a different master than the current one, rejecting")); | ||
} | ||
|
||
logger.info("--> test state from the current master is accepted"); | ||
node.action.validateIncomingState(ClusterState.builder(node.clusterState) | ||
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build(), node.clusterState); | ||
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).incrementVersion().build(), node.clusterState); | ||
|
||
|
||
logger.info("--> testing rejection of another cluster name"); | ||
try { | ||
node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))).nodes(node.nodes()).build(), node.clusterState); | ||
fail("node accepted state with another cluster name"); | ||
} catch (IllegalStateException OK) { | ||
assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster")); | ||
} | ||
|
||
logger.info("--> testing rejection of a cluster state with wrong local node"); | ||
|
@@ -620,6 +635,7 @@ public void testIncomingClusterStateValidation() throws Exception { | |
node.action.validateIncomingState(state, node.clusterState); | ||
fail("node accepted state with non-existence local node"); | ||
} catch (IllegalStateException OK) { | ||
assertThat(OK.toString(), containsString("received state from local node that does not match the current local node")); | ||
} | ||
|
||
try { | ||
|
@@ -630,6 +646,7 @@ public void testIncomingClusterStateValidation() throws Exception { | |
node.action.validateIncomingState(state, node.clusterState); | ||
fail("node accepted state with existent but wrong local node"); | ||
} catch (IllegalStateException OK) { | ||
assertThat(OK.toString(), containsString("received state from local node that does not match the current local node")); | ||
} | ||
|
||
logger.info("--> testing acceptance of an old cluster state"); | ||
|
@@ -639,7 +656,7 @@ public void testIncomingClusterStateValidation() throws Exception { | |
expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState)); | ||
final String message = String.format( | ||
Locale.ROOT, | ||
"received older cluster state version [%s] from current master with uuid [%s] than last seen cluster state [%s] from current master with uuid [%s]", | ||
"received cluster state from current master superseded by last seen cluster state; received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]", | ||
incomingState.version(), | ||
incomingState.stateUUID(), | ||
node.clusterState.version(), | ||
|
@@ -678,19 +695,27 @@ public void testOutOfOrderCommitMessages() throws Throwable { | |
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); | ||
assertThat(channel.error.get(), nullValue()); | ||
channel.clear(); | ||
|
||
} | ||
|
||
logger.info("--> committing states"); | ||
|
||
long largestVersionSeen = Long.MIN_VALUE; | ||
Randomness.shuffle(states); | ||
for (ClusterState state : states) { | ||
node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state.stateUUID()), channel); | ||
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); | ||
if (channel.error.get() != null) { | ||
throw channel.error.get(); | ||
if (largestVersionSeen < state.getVersion()) { | ||
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); | ||
if (channel.error.get() != null) { | ||
throw channel.error.get(); | ||
} | ||
largestVersionSeen = state.getVersion(); | ||
} else { | ||
assertNotNull(channel.error.get()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we add a comment as to why we expect an error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 3abf817. |
||
assertThat(channel.error.get(), instanceOf(IllegalStateException.class)); | ||
} | ||
channel.clear(); | ||
} | ||
channel.clear(); | ||
|
||
//now check the last state held | ||
assertSameState(node.clusterState, finalState); | ||
|
@@ -828,8 +853,8 @@ static class MockPublishAction extends PublishClusterStateAction { | |
AtomicBoolean timeoutOnCommit = new AtomicBoolean(); | ||
AtomicBoolean errorOnCommit = new AtomicBoolean(); | ||
|
||
public MockPublishAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { | ||
super(settings, transportService, nodesProvider, listener, discoverySettings, clusterName); | ||
public MockPublishAction(Settings settings, TransportService transportService, Supplier<ClusterState> clusterStateSupplier, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { | ||
super(settings, transportService, clusterStateSupplier, listener, discoverySettings, clusterName); | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is we're here, then the masters are equal. In this case supersedes == false, means version equality (because it's can't be lower. Also state.nodes().getMasterNodeId() is never null (and assigned to currentMaster.getId())) . I'm not sure what we're asserting than. Also note that it disables the next if clause, checking for uuid equality, so maybe we want to do the next if clause first?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed assertion, reordered checks in c2ed5a1.