Skip to content

Commit

Permalink
Follow stats api should return a 404 when requesting stats for a non …
Browse files Browse the repository at this point in the history
…existing index (#37220)

Currently it returns an empty response with a 200 response code.

Closes #37021
  • Loading branch information
martijnvg committed Jan 22, 2019
1 parent bd5319e commit 6a0f149
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,22 @@ public void onFailure(Exception e) {
public void testGetFollowStats() throws Exception {
RestHighLevelClient client = highLevelClient();

{
// Create leader index:
CreateIndexRequest createIndexRequest = new CreateIndexRequest("leader");
createIndexRequest.settings(Collections.singletonMap("index.soft_deletes.enabled", true));
CreateIndexResponse response = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
assertThat(response.isAcknowledged(), is(true));
}
{
// Follow index, so that we can query for follow stats:
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower");
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
assertThat(putFollowResponse.isIndexFollowingStarted(), is(true));
}

// tag::ccr-get-follow-stats-request
FollowStatsRequest request =
new FollowStatsRequest("follower"); // <1>
Expand Down Expand Up @@ -671,6 +687,12 @@ public void onFailure(Exception e) {
// end::ccr-get-follow-stats-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));

{
PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower");
AcknowledgedResponse pauseFollowResponse = client.ccr().pauseFollow(pauseFollowRequest, RequestOptions.DEFAULT);
assertThat(pauseFollowResponse.isAcknowledged(), is(true));
}
}

static Map<String, Object> toMap(Response response) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
- is_true: follow_index_shards_acked
- is_true: index_following_started

- do:
ccr.follow_stats:
index: _all
- length: { indices: 1 }
- match: { indices.0.index: "bar" }

# we can not reliably wait for replication to occur so we test the endpoint without indexing any documents
- do:
ccr.follow_stats:
Expand Down Expand Up @@ -78,3 +84,7 @@
index: bar
- is_true: acknowledged

- do:
catch: missing
ccr.follow_stats:
index: unknown
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
Expand All @@ -14,6 +15,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -75,6 +77,15 @@ protected void doExecute(
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return;
}

if (Strings.isAllOrWildcard(request.indices()) == false) {
final ClusterState state = clusterService.state();
Set<String> shardFollowTaskFollowerIndices = findFollowerIndicesFromShardFollowTasks(state, request.indices());
if (shardFollowTaskFollowerIndices.isEmpty()) {
String resources = String.join(",", request.indices());
throw new ResourceNotFoundException("No shard follow tasks for follower indices [{}]", resources);
}
}
super.doExecute(task, request, listener);
}

Expand All @@ -95,21 +106,7 @@ protected FollowStatsAction.StatsResponse readTaskResponse(final StreamInput in)
@Override
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
final ClusterState state = clusterService.state();
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasksMetaData == null) {
return;
}

final Set<String> requestedFollowerIndices = request.indices() != null ?
new HashSet<>(Arrays.asList(request.indices())) : Collections.emptySet();
final Set<String> followerIndices = persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
.map(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName();
})
.filter(followerIndex -> requestedFollowerIndices.isEmpty() || requestedFollowerIndices.contains(followerIndex))
.collect(Collectors.toSet());
final Set<String> followerIndices = findFollowerIndicesFromShardFollowTasks(state, request.indices());

for (final Task task : taskManager.getTasks().values()) {
if (task instanceof ShardFollowNodeTask) {
Expand All @@ -129,4 +126,22 @@ protected void taskOperation(
listener.onResponse(new FollowStatsAction.StatsResponse(task.getStatus()));
}

static Set<String> findFollowerIndicesFromShardFollowTasks(ClusterState state, String[] indices) {
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasksMetaData == null) {
return Collections.emptySet();
}

final Set<String> requestedFollowerIndices = indices != null ?
new HashSet<>(Arrays.asList(indices)) : Collections.emptySet();
return persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
.map(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName();
})
.filter(followerIndex -> Strings.isAllOrWildcard(indices) || requestedFollowerIndices.contains(followerIndex))
.collect(Collectors.toSet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
Expand Down Expand Up @@ -116,4 +119,106 @@ public void testFollowStatsApiFollowerIndexFiltering() throws Exception {
});
}

public void testFollowStatsApiResourceNotFound() throws Exception {
FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(0));

statsRequest.setIndices(new String[] {"follower1"});
Exception e = expectThrows(ResourceNotFoundException.class,
() -> client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet());
assertThat(e.getMessage(), equalTo("No shard follow tasks for follower indices [follower1]"));

final String leaderIndexSettings = getIndexSettings(1, 0,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("leader1");

PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
client().execute(PutFollowAction.INSTANCE, followRequest).get();

response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

statsRequest.setIndices(new String[] {"follower2"});
e = expectThrows(ResourceNotFoundException.class,
() -> client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet());
assertThat(e.getMessage(), equalTo("No shard follow tasks for follower indices [follower2]"));

assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
}

public void testFollowStatsApiIncludeShardFollowStatsWithRemovedFollowerIndex() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, 0,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("leader1");

PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
client().execute(PutFollowAction.INSTANCE, followRequest).get();

FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

statsRequest = new FollowStatsAction.StatsRequest();
statsRequest.setIndices(new String[] {"follower1"});
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

assertAcked(client().admin().indices().delete(new DeleteIndexRequest("follower1")).actionGet());

statsRequest = new FollowStatsAction.StatsRequest();
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

statsRequest = new FollowStatsAction.StatsRequest();
statsRequest.setIndices(new String[] {"follower1"});
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
}

public void testFollowStatsApiIncludeShardFollowStatsWithClosedFollowerIndex() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, 0,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("leader1");

PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
client().execute(PutFollowAction.INSTANCE, followRequest).get();

FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

statsRequest = new FollowStatsAction.StatsRequest();
statsRequest.setIndices(new String[] {"follower1"});
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

assertAcked(client().admin().indices().close(new CloseIndexRequest("follower1")).actionGet());

statsRequest = new FollowStatsAction.StatsRequest();
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

statsRequest = new FollowStatsAction.StatsRequest();
statsRequest.setIndices(new String[] {"follower1"});
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(response.getStatsResponses().size(), equalTo(1));
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));

assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;

import java.util.Collections;
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class TransportFollowStatsActionTests extends ESTestCase {

public void testFindFollowerIndicesFromShardFollowTasks() {
PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder()
.addTask("1", ShardFollowTask.NAME, createShardFollowTask("abc"), null)
.addTask("2", ShardFollowTask.NAME, createShardFollowTask("def"), null);

ClusterState clusterState = ClusterState.builder(new ClusterName("_cluster"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build()).build())
.build();
Set<String> result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, null);
assertThat(result.size(), equalTo(2));
assertThat(result.contains("abc"), is(true));
assertThat(result.contains("def"), is(true));

result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"def"});
assertThat(result.size(), equalTo(1));
assertThat(result.contains("def"), is(true));

result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"ghi"});
assertThat(result.size(), equalTo(0));
}

private static ShardFollowTask createShardFollowTask(String followerIndex) {
return new ShardFollowTask(
null,
new ShardId(followerIndex, "", 0),
new ShardId("leader_index", "", 0),
1024,
TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE,
1,
1024,
TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE,
1,
10240,
new ByteSizeValue(512, ByteSizeUnit.MB),
TimeValue.timeValueMillis(10),
TimeValue.timeValueMillis(10),
Collections.emptyMap()
);
}

}

0 comments on commit 6a0f149

Please sign in to comment.