Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Follow stats api should return a 404 when requesting stats for a non existing index #37220

Merged
merged 13 commits into from
Jan 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,22 @@ public void onFailure(Exception e) {
public void testGetFollowStats() throws Exception {
RestHighLevelClient client = highLevelClient();

{
// Create leader index:
CreateIndexRequest createIndexRequest = new CreateIndexRequest("leader");
createIndexRequest.settings(Collections.singletonMap("index.soft_deletes.enabled", true));
CreateIndexResponse response = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
assertThat(response.isAcknowledged(), is(true));
}
{
// Follow index, so that we can query for follow stats:
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower");
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
assertThat(putFollowResponse.isIndexFollowingStarted(), is(true));
}

// tag::ccr-get-follow-stats-request
FollowStatsRequest request =
new FollowStatsRequest("follower"); // <1>
Expand Down Expand Up @@ -671,6 +687,12 @@ public void onFailure(Exception e) {
// end::ccr-get-follow-stats-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));

{
PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower");
AcknowledgedResponse pauseFollowResponse = client.ccr().pauseFollow(pauseFollowRequest, RequestOptions.DEFAULT);
assertThat(pauseFollowResponse.isAcknowledged(), is(true));
}
}

static Map<String, Object> toMap(Response response) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@
- is_true: follow_index_shards_acked
- is_true: index_following_started

- do:
ccr.follow_stats:
index: _all
- length: { indices: 1 }
- match: { indices.0.index: "bar" }

# we can not reliably wait for replication to occur so we test the endpoint without indexing any documents
- do:
ccr.follow_stats:
Expand Down Expand Up @@ -77,3 +83,7 @@
index: bar
- is_true: acknowledged

- do:
catch: missing
ccr.follow_stats:
index: unknown
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@

package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
Expand Down Expand Up @@ -65,6 +67,15 @@ protected void doExecute(
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return;
}

if (Strings.isAllOrWildcard(request.indices()) == false) {
final ClusterState state = clusterService.state();
Set<String> shardFollowTaskFollowerIndices = findFollowerIndicesFromShardFollowTasks(state, request.indices());
if (shardFollowTaskFollowerIndices.isEmpty()) {
String resources = String.join(",", request.indices());
throw new ResourceNotFoundException("No shard follow tasks for follower indices [{}]", resources);
}
}
super.doExecute(task, request, listener);
}

Expand All @@ -80,21 +91,7 @@ protected FollowStatsAction.StatsResponses newResponse(
@Override
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
final ClusterState state = clusterService.state();
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasksMetaData == null) {
return;
}

final Set<String> requestedFollowerIndices = request.indices() != null ?
new HashSet<>(Arrays.asList(request.indices())) : Collections.emptySet();
final Set<String> followerIndices = persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
.map(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName();
})
.filter(followerIndex -> requestedFollowerIndices.isEmpty() || requestedFollowerIndices.contains(followerIndex))
.collect(Collectors.toSet());
final Set<String> followerIndices = findFollowerIndicesFromShardFollowTasks(state, request.indices());

for (final Task task : taskManager.getTasks().values()) {
if (task instanceof ShardFollowNodeTask) {
Expand All @@ -114,4 +111,22 @@ protected void taskOperation(
listener.onResponse(new FollowStatsAction.StatsResponse(task.getStatus()));
}

static Set<String> findFollowerIndicesFromShardFollowTasks(ClusterState state, String[] indices) {
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasksMetaData == null) {
return Collections.emptySet();
}

final Set<String> requestedFollowerIndices = indices != null ?
new HashSet<>(Arrays.asList(indices)) : Collections.emptySet();
return persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
.map(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName();
})
.filter(followerIndex -> Strings.isAllOrWildcard(indices) || requestedFollowerIndices.contains(followerIndex))
.collect(Collectors.toSet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
Expand Down Expand Up @@ -116,4 +119,106 @@ public void testFollowStatsApiFollowerIndexFiltering() throws Exception {
});
}

public void testFollowStatsApiResourceNotFound() throws Exception {
FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(0));

statsRequest.setIndices(new String[] {"follower1"});
Exception e = expectThrows(ResourceNotFoundException.class,
() -> client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet());
assertThat(e.getMessage(), equalTo("No shard follow tasks for follower indices [follower1]"));

final String leaderIndexSettings = getIndexSettings(1, 0,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("leader1");

PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
client().execute(PutFollowAction.INSTANCE, followRequest).get();

response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

statsRequest.setIndices(new String[] {"follower2"});
e = expectThrows(ResourceNotFoundException.class,
() -> client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet());
assertThat(e.getMessage(), equalTo("No shard follow tasks for follower indices [follower2]"));

assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
}

public void testFollowStatsApiIncludeShardFollowStatsWithRemovedFollowerIndex() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, 0,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("leader1");

PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
client().execute(PutFollowAction.INSTANCE, followRequest).get();

FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

statsRequest = new FollowStatsAction.StatsRequest();
statsRequest.setIndices(new String[] {"follower1"});
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

assertAcked(client().admin().indices().delete(new DeleteIndexRequest("follower1")).actionGet());

statsRequest = new FollowStatsAction.StatsRequest();
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

statsRequest = new FollowStatsAction.StatsRequest();
statsRequest.setIndices(new String[] {"follower1"});
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
}

public void testFollowStatsApiIncludeShardFollowStatsWithClosedFollowerIndex() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, 0,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("leader1");

PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
client().execute(PutFollowAction.INSTANCE, followRequest).get();

FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

statsRequest = new FollowStatsAction.StatsRequest();
statsRequest.setIndices(new String[] {"follower1"});
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

assertAcked(client().admin().indices().close(new CloseIndexRequest("follower1")).actionGet());

statsRequest = new FollowStatsAction.StatsRequest();
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

statsRequest = new FollowStatsAction.StatsRequest();
statsRequest.setIndices(new String[] {"follower1"});
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;

import java.util.Collections;
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class TransportFollowStatsActionTests extends ESTestCase {

public void testFindFollowerIndicesFromShardFollowTasks() {
PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder()
.addTask("1", ShardFollowTask.NAME, createShardFollowTask("abc"), null)
.addTask("2", ShardFollowTask.NAME, createShardFollowTask("def"), null);

ClusterState clusterState = ClusterState.builder(new ClusterName("_cluster"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build()).build())
.build();
Set<String> result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, null);
assertThat(result.size(), equalTo(2));
assertThat(result.contains("abc"), is(true));
assertThat(result.contains("def"), is(true));

result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"def"});
assertThat(result.size(), equalTo(1));
assertThat(result.contains("def"), is(true));

result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"ghi"});
assertThat(result.size(), equalTo(0));
}

private static ShardFollowTask createShardFollowTask(String followerIndex) {
return new ShardFollowTask(
null,
new ShardId(followerIndex, "", 0),
new ShardId("leader_index", "", 0),
1024,
TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE,
1,
1024,
TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE,
1,
10240,
new ByteSizeValue(512, ByteSizeUnit.MB),
TimeValue.timeValueMillis(10),
TimeValue.timeValueMillis(10),
Collections.emptyMap()
);
}

}