Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Change FollowIndexAction.Request class to be more user friendly #33810

Merged
merged 2 commits into from
Sep 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ private static void refresh(String index) throws IOException {

private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ private static void refresh(String index) throws IOException {

private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,16 @@ private void followLeaderIndex(String clusterAlias, Index indexToFollow,

String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
clusterAlias + ":" + leaderIndexName;
FollowIndexAction.Request request =
new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName,
pattern.getMaxBatchOperationCount(), pattern.getMaxConcurrentReadBatches(),
pattern.getMaxOperationSizeInBytes(), pattern.getMaxConcurrentWriteBatches(),
pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(),
pattern.getIdleShardRetryDelay());
FollowIndexAction.Request request = new FollowIndexAction.Request();
request.setLeaderIndex(leaderIndexNameWithClusterAliasPrefix);
request.setFollowerIndex(followIndexName);
request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
request.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
request.setMaxOperationSizeInBytes(pattern.getMaxOperationSizeInBytes());
request.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
request.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
request.setMaxRetryDelay(pattern.getMaxRetryDelay());
request.setPollTimeout(pattern.getIdleShardRetryDelay());

// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -64,8 +63,8 @@ public static class Request extends SingleShardRequest<Request> {
private int maxOperationCount;
private ShardId shardId;
private String expectedHistoryUUID;
private TimeValue pollTimeout = FollowIndexAction.DEFAULT_POLL_TIMEOUT;
private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
private TimeValue pollTimeout = TransportFollowIndexAction.DEFAULT_POLL_TIMEOUT;
private long maxOperationSizeInBytes = TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;

public Request(ShardId shardId, String expectedHistoryUUID) {
super(shardId.getIndexName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingSlowLog;
Expand Down Expand Up @@ -55,6 +56,14 @@

public class TransportFollowIndexAction extends HandledTransportAction<FollowIndexAction.Request, AcknowledgedResponse> {

static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE;
private static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500);
private static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1;
private static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240;
private static final int DEFAULT_MAX_BATCH_OPERATION_COUNT = 1024;
private static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 1;
static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1);

private final Client client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
Expand Down Expand Up @@ -179,19 +188,8 @@ void start(
String[] recordedLeaderShardHistoryUUIDs = extractIndexShardHistoryUUIDs(ccrIndexMetadata);
String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId];

ShardFollowTask shardFollowTask = new ShardFollowTask(
clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.getMaxBatchOperationCount(),
request.getMaxConcurrentReadBatches(),
request.getMaxOperationSizeInBytes(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferSize(),
request.getMaxRetryDelay(),
request.getPollTimeout(),
recordedLeaderShardHistoryUUID,
filteredHeaders);
final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request,
leaderIndexMetadata, followIndexMetadata, recordedLeaderShardHistoryUUID, filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
Expand Down Expand Up @@ -299,6 +297,69 @@ static void validate(
followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY);
}

private static ShardFollowTask createShardFollowTask(
int shardId,
String clusterAliasName,
FollowIndexAction.Request request,
IndexMetaData leaderIndexMetadata,
IndexMetaData followIndexMetadata,
String recordedLeaderShardHistoryUUID,
Map<String, String> filteredHeaders
) {
int maxBatchOperationCount;
if (request.getMaxBatchOperationCount() != null) {
maxBatchOperationCount = request.getMaxBatchOperationCount();
} else {
maxBatchOperationCount = DEFAULT_MAX_BATCH_OPERATION_COUNT;
}

int maxConcurrentReadBatches;
if (request.getMaxConcurrentReadBatches() != null){
maxConcurrentReadBatches = request.getMaxConcurrentReadBatches();
} else {
maxConcurrentReadBatches = DEFAULT_MAX_CONCURRENT_READ_BATCHES;
}

long maxOperationSizeInBytes;
if (request.getMaxOperationSizeInBytes() != null) {
maxOperationSizeInBytes = request.getMaxOperationSizeInBytes();
} else {
maxOperationSizeInBytes = DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
}

int maxConcurrentWriteBatches;
if (request.getMaxConcurrentWriteBatches() != null) {
maxConcurrentWriteBatches = request.getMaxConcurrentWriteBatches();
} else {
maxConcurrentWriteBatches = DEFAULT_MAX_CONCURRENT_WRITE_BATCHES;
}

int maxWriteBufferSize;
if (request.getMaxWriteBufferSize() != null) {
maxWriteBufferSize = request.getMaxWriteBufferSize();
} else {
maxWriteBufferSize = DEFAULT_MAX_WRITE_BUFFER_SIZE;
}

TimeValue maxRetryDelay = request.getMaxRetryDelay() == null ? DEFAULT_MAX_RETRY_DELAY : request.getMaxRetryDelay();
TimeValue pollTimeout = request.getPollTimeout() == null ? DEFAULT_POLL_TIMEOUT : request.getPollTimeout();

return new ShardFollowTask(
clusterAliasName,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
maxBatchOperationCount,
maxConcurrentReadBatches,
maxOperationSizeInBytes,
maxConcurrentWriteBatches,
maxWriteBufferSize,
maxRetryDelay,
pollTimeout,
recordedLeaderShardHistoryUUID,
filteredHeaders
);
}

private static String[] extractIndexShardHistoryUUIDs(Map<String, String> ccrIndexMetaData) {
String historyUUIDs = ccrIndexMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS);
return historyUUIDs.split(",");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,12 @@ private void assertNonCompliantLicense(final Exception e) {
}

private FollowIndexAction.Request getFollowRequest() {
return new FollowIndexAction.Request(
"leader",
"follower",
FollowIndexAction.DEFAULT_MAX_BATCH_OPERATION_COUNT,
FollowIndexAction.DEFAULT_MAX_CONCURRENT_READ_BATCHES,
FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
FollowIndexAction.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES,
FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE,
TimeValue.timeValueMillis(10),
TimeValue.timeValueMillis(10));
FollowIndexAction.Request request = new FollowIndexAction.Request();
request.setLeaderIndex("leader");
request.setFollowerIndex("follower");
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.setPollTimeout(TimeValue.timeValueMillis(10));
return request;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,11 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10));
atLeastDocsIndexed("index1", numDocsIndexed / 3);

final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", maxReadSize,
randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10),
randomIntBetween(1024, 10240), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
followRequest.setMaxBatchOperationCount(maxReadSize);
followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10));
followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
followRequest.setMaxWriteBufferSize(randomIntBetween(1024, 10240));
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();

Expand Down Expand Up @@ -358,9 +360,10 @@ public void testFollowIndexAndCloseNode() throws Exception {
});
thread.start();

final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", randomIntBetween(32, 2048),
randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10),
FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
followRequest.setMaxBatchOperationCount(randomIntBetween(32, 2048));
followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10));
followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get();

long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getMaxBatchOperationCount(),
Expand Down Expand Up @@ -438,7 +441,7 @@ public void testFollowNonExistentIndex() throws Exception {
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet());
}

public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
Expand All @@ -451,8 +454,8 @@ public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}

final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
followRequest.setMaxOperationSizeInBytes(1L);
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();

Expand Down Expand Up @@ -480,25 +483,21 @@ public void testDontFollowTheWrongIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("index3");

FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();

followRequest = new FollowIndexAction.Request("index3", "index4", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
followRequest = createFollowRequest("index3", "index4");
createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
unfollowIndex("index2", "index4");

FollowIndexAction.Request wrongRequest1 = new FollowIndexAction.Request("index1", "index4", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
FollowIndexAction.Request wrongRequest1 = createFollowRequest("index1", "index4");
Exception e = expectThrows(IllegalArgumentException.class,
() -> client().execute(FollowIndexAction.INSTANCE, wrongRequest1).actionGet());
assertThat(e.getMessage(), containsString("follow index [index4] should reference"));

FollowIndexAction.Request wrongRequest2 = new FollowIndexAction.Request("index3", "index2", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
FollowIndexAction.Request wrongRequest2 = createFollowRequest("index3", "index2");
e = expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, wrongRequest2).actionGet());
assertThat(e.getMessage(), containsString("follow index [index2] should reference"));
}
Expand Down Expand Up @@ -707,10 +706,12 @@ private void assertSameDocCount(String index1, String index2) throws Exception {
}, 60, TimeUnit.SECONDS);
}

public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followIndex) {
return new FollowIndexAction.Request(leaderIndex, followIndex, FollowIndexAction.DEFAULT_MAX_BATCH_OPERATION_COUNT,
FollowIndexAction.DEFAULT_MAX_CONCURRENT_READ_BATCHES, FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
FollowIndexAction.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES, FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE,
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10));
public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followerIndex) {
FollowIndexAction.Request request = new FollowIndexAction.Request();
request.setLeaderIndex(leaderIndex);
request.setFollowerIndex(followerIndex);
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.setPollTimeout(TimeValue.timeValueMillis(10));
return request;
}
}
Loading