Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not renew sync-id if all shards are sealed #29103

Merged
merged 2 commits into from
Mar 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ public Engine.CommitId getRawCommitId() {
return new Engine.CommitId(Base64.getDecoder().decode(id));
}

/**
* The synced-flush id of the commit if existed.
*/
public String syncId() {
return userData.get(InternalEngine.SYNC_COMMIT_ID);
}

/**
* Returns the number of documents in the in this commit
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -65,6 +67,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -216,9 +219,16 @@ public void onResponse(InFlightOpsResponse response) {
if (inflight != 0) {
actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary"));
} else {
// 3. now send the sync request to all the shards
String syncId = UUIDs.randomBase64UUID();
sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
// 3. now send the sync request to all the shards;
final String sharedSyncId = sharedExistingSyncId(presyncResponses);
if (sharedSyncId != null) {
assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) :
"Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]";
reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, presyncResponses, actionListener);
}else {
String syncId = UUIDs.randomBase64UUID();
sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
}
}
}

Expand All @@ -244,6 +254,33 @@ public void onFailure(Exception e) {
}
}

private String sharedExistingSyncId(Map<String, PreSyncedFlushResponse> preSyncedFlushResponses) {
String existingSyncId = null;
for (PreSyncedFlushResponse resp : preSyncedFlushResponses.values()) {
if (Strings.isNullOrEmpty(resp.existingSyncId)) {
return null;
}
if (existingSyncId == null) {
existingSyncId = resp.existingSyncId;
}
if (existingSyncId.equals(resp.existingSyncId) == false) {
return null;
}
}
return existingSyncId;
}

private void reportSuccessWithExistingSyncId(ShardId shardId, String existingSyncId, List<ShardRouting> shards, int totalShards,
Map<String, PreSyncedFlushResponse> preSyncResponses, ActionListener<ShardsSyncedFlushResult> listener) {
final Map<ShardRouting, ShardSyncedFlushResponse> results = new HashMap<>();
for (final ShardRouting shard : shards) {
if (preSyncResponses.containsKey(shard.currentNodeId())) {
results.put(shard, new ShardSyncedFlushResponse());
}
}
listener.onResponse(new ShardsSyncedFlushResult(shardId, existingSyncId, totalShards, results));
}

final IndexShardRoutingTable getShardRoutingTable(ShardId shardId, ClusterState state) {
final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.getIndexName());
if (indexRoutingTable == null) {
Expand Down Expand Up @@ -438,7 +475,7 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest
final CommitStats commitStats = indexShard.commitStats();
final Engine.CommitId commitId = commitStats.getRawCommitId();
logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs());
return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId());
}

private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
Expand Down Expand Up @@ -512,21 +549,15 @@ static final class PreSyncedFlushResponse extends TransportResponse {

Engine.CommitId commitId;
int numDocs;
@Nullable String existingSyncId = null;

PreSyncedFlushResponse() {
}

PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs) {
PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs, String existingSyncId) {
this.commitId = commitId;
this.numDocs = numDocs;
}

Engine.CommitId commitId() {
return commitId;
}

int numDocs() {
return numDocs;
this.existingSyncId = existingSyncId;
}

boolean includeNumDocs(Version version) {
Expand All @@ -537,6 +568,10 @@ boolean includeNumDocs(Version version) {
}
}

boolean includeExistingSyncId(Version version) {
return version.onOrAfter(Version.V_7_0_0_alpha1);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -546,6 +581,9 @@ public void readFrom(StreamInput in) throws IOException {
} else {
numDocs = UNKNOWN_NUM_DOCS;
}
if (includeExistingSyncId(in.getVersion())) {
existingSyncId = in.readOptionalString();
}
}

@Override
Expand All @@ -555,6 +593,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (includeNumDocs(out.getVersion())) {
out.writeInt(numDocs);
}
if (includeExistingSyncId(out.getVersion())) {
out.writeOptionalString(existingSyncId);
}
}
}

Expand Down
49 changes: 49 additions & 0 deletions server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.lucene.index.Term;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -59,6 +61,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

public class FlushIT extends ESIntegTestCase {
Expand Down Expand Up @@ -280,4 +283,50 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
assertThat(fullResult.totalShards(), equalTo(numberOfReplicas + 1));
assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
}

public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
final int numberOfReplicas = internalCluster().numDataNodes() - 1;
assertAcked(
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)).get()
);
ensureGreen();
final Index index = clusterService().state().metaData().index("test").getIndex();
final ShardId shardId = new ShardId(index, 0);
final int numDocs = between(1, 10);
for (int i = 0; i < numDocs; i++) {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1));
// Do not renew synced-flush
final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId()));
// Shards were updated, renew synced flush.
final int moreDocs = between(1, 10);
for (int i = 0; i < moreDocs; i++) {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId())));
// Manually remove or change sync-id, renew synced flush.
IndexShard shard = internalCluster().getInstance(IndicesService.class, randomFrom(internalCluster().nodesInclude("test")))
.getShardOrNull(shardId);
if (randomBoolean()) {
// Change the existing sync-id of a single shard.
shard.syncFlush(UUIDs.randomBase64UUID(random()), shard.commitStats().getRawCommitId());
assertThat(shard.commitStats().syncId(), not(equalTo(thirdSeal.syncId())));
} else {
// Flush will create a new commit without sync-id
shard.flush(new FlushRequest(shardId.getIndexName()).force(true).waitIfOngoing(true));
assertThat(shard.commitStats().syncId(), nullValue());
}
final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
}
}