Commit 57b86ed
Support ingestion deletion policy to fix replication and peer recovery with pull based ingestion

Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent cea42e1
Showing 5 changed files with 312 additions and 28 deletions.
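Only two of the five changed files are shown below; the engine-side change that actually installs the new deletion policy is not part of this excerpt. As a rough, hypothetical sketch of how such a policy plugs into a Lucene writer (the class name, directory, and analyzer here are assumptions for illustration, not code from this commit), it would be set through Lucene's standard deletion-policy hook:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.opensearch.index.engine.IngestionDeletionPolicy;

public class IngestionWriterSketch {
    public static void main(String[] args) throws Exception {
        Logger logger = LogManager.getLogger(IngestionWriterSketch.class);
        // Hypothetical wiring: the policy stands in for CombinedDeletionPolicy in a pull-based
        // ingestion engine, so older, un-snapshotted commits are pruned on each Lucene commit.
        IngestionDeletionPolicy deletionPolicy = new IngestionDeletionPolicy(logger);
        try (Directory dir = new ByteBuffersDirectory()) {
            IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
            config.setIndexDeletionPolicy(deletionPolicy);
            try (IndexWriter writer = new IndexWriter(dir, config)) {
                writer.commit(); // IngestionDeletionPolicy.onCommit runs here and keeps only the latest commit
            }
        }
    }
}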
...afka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionSegRepIT.java
195 changes: 195 additions & 0 deletions
@@ -0,0 +1,195 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.plugin.kafka;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.is;

/**
 * Integration test for Kafka ingestion with segment replication
 */
@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class KafkaIngestionSegRepIT extends OpenSearchIntegTestCase {
    static final String topicName = "test";
    static final String indexName = "testindex";
    static final String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}";

    private KafkaContainer kafka;
    private Producer<String, String> producer;

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(KafkaPlugin.class);
    }

    @Before
    private void setup() {
        setupKafka();
    }

    @After
    private void cleanup() {
        stopKafka();
    }

    public void testSegmentReplicationWithPeerRecovery() throws Exception {
        // Step 1: Create primary and replica shard. Initialize kafka topic.

        internalCluster().startClusterManagerOnlyNode();
        final String nodeA = internalCluster().startDataOnlyNode();

        createIndex(
            indexName,
            Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
                .put("ingestion_source.type", "kafka")
                .put("ingestion_source.pointer.init.reset", "earliest")
                .put("ingestion_source.param.topic", topicName)
                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
                .put("index.replication.type", "SEGMENT")
                .build(),
            mapping
        );

        ensureYellowAndNoInitializingShards(indexName);
        final String nodeB = internalCluster().startDataOnlyNode();
        ensureGreen(indexName);
        assertTrue(nodeA.equals(primaryNodeName(indexName)));
        assertTrue(nodeB.equals(replicaNodeName(indexName)));

        // Step 2: Produce update messages and validate segment replication

        produceIndexMessage("1", "name1", "24");
        produceIndexMessage("2", "name2", "20");
        refresh(indexName);
        waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));

        RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
        SearchResponse primaryResponse = client(nodeA).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
        assertThat(primaryResponse.getHits().getTotalHits().value(), is(1L));
        SearchResponse replicaResponse = client(nodeB).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
        assertThat(replicaResponse.getHits().getTotalHits().value(), is(1L));

        // Step 3: Stop current primary node and validate replica promotion.

        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
        ensureYellowAndNoInitializingShards(indexName);
        assertTrue(nodeB.equals(primaryNodeName(indexName)));

        // Step 4: Verify new primary node is able to index documents

        produceIndexMessage("3", "name3", "30");
        produceIndexMessage("4", "name4", "31");
        refresh(indexName);
        waitForSearchableDocs(4, Arrays.asList(nodeB));

        SearchResponse newPrimaryResponse = client(nodeB).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
        assertThat(newPrimaryResponse.getHits().getTotalHits().value(), is(3L));

        // Step 5: Add a new node and assign the replica shard. Verify peer recovery works.

        final String nodeC = internalCluster().startDataOnlyNode();
        client().admin().cluster().prepareReroute()
            .add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC))
            .get();
        ensureGreen(indexName);
        assertTrue(nodeC.equals(replicaNodeName(indexName)));

        waitForSearchableDocs(4, Arrays.asList(nodeC));
        SearchResponse newReplicaResponse = client(nodeC).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
        assertThat(newReplicaResponse.getHits().getTotalHits().value(), is(3L));

        // Step 6: Produce new updates and verify segment replication works when primary and replica index are not empty.
        produceIndexMessage("5", "name5", "40");
        produceIndexMessage("6", "name6", "41");
        refresh(indexName);
        waitForSearchableDocs(6, Arrays.asList(nodeB, nodeC));
    }

    private void setupKafka() {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
            // disable topic auto creation
            .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
        kafka.start();

        // setup producer
        String boostrapServers = kafka.getBootstrapServers();
        KafkaUtils.createTopic(topicName, 1, boostrapServers);
        Properties props = new Properties();
        props.put("bootstrap.servers", kafka.getBootstrapServers());
        producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    }

    private void stopKafka() {
        if (producer != null) {
            producer.close();
        }

        if (kafka != null) {
            kafka.stop();
        }
    }

    private void produceIndexMessage(String id, String name, String age) {
        String payload = String.format("{\"_id\":\"%s\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"%s\", \"age\": %s}}", id, name, age);
        producer.send(
            new ProducerRecord<>(
                topicName,
                "null",
                payload
            )
        );
    }

    private void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
        assertBusy(() -> {
            for (String node : nodes) {
                final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get();
                final long hits = response.getHits().getTotalHits().value();
                if (hits < docCount) {
                    fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
                }
            }
        }, 1, TimeUnit.MINUTES);
    }
}
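The messages the test pushes into Kafka are plain JSON documents carrying an id, an operation type, and a source body. As a small standalone check (not part of the commit) of what produceIndexMessage("1", "name1", "24") actually sends, using the same format string as the test above, including its stray colon inside the "_op_type:" key:

public class PayloadFormatCheck {
    public static void main(String[] args) {
        // Format string copied verbatim from produceIndexMessage in the test above.
        String payload = String.format(
            "{\"_id\":\"%s\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
            "1", "name1", "24"
        );
        // Prints: {"_id":"1", "_op_type:":"index","_source":{"name":"name1", "age": 24}}
        System.out.println(payload);
    }
}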
server/src/main/java/org/opensearch/index/engine/IngestionDeletionPolicy.java
85 changes: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index.engine;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexCommit;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A variant of {@link CombinedDeletionPolicy} to be used in pull-based ingestion. This policy deletes older commits
 * and prevents deletion of snapshotted commits.
 *
 * @opensearch.internal
 */
public class IngestionDeletionPolicy extends CombinedDeletionPolicy {
    private volatile IndexCommit lastCommit;
    private volatile SafeCommitInfo safeCommitInfo = SafeCommitInfo.EMPTY;

    // tracks number of references held on snapshotted commits
    private final Map<IndexCommit, Integer> snapshottedCommits;

    public IngestionDeletionPolicy(Logger logger) {
        super(logger, null, null, null);
        this.snapshottedCommits = new HashMap<>();
    }

    @Override
    public void onInit(List<? extends IndexCommit> commits) throws IOException {
        assert commits.isEmpty() == false : "index is opened, but we have no commits";
        onCommit(commits);
    }

    @Override
    public void onCommit(List<? extends IndexCommit> commits) throws IOException {
        synchronized (this) {
            this.safeCommitInfo = SafeCommitInfo.EMPTY;
            this.lastCommit = commits.get(commits.size() - 1);
            for (int i = commits.size() - 2; i >= 0; i--) {
                if (snapshottedCommits.containsKey(commits.get(i)) == false) {
                    deleteCommit(commits.get(i));
                }
            }
        }

        safeCommitInfo = new SafeCommitInfo(0, getDocCountOfCommit(lastCommit));
    }

    synchronized IndexCommit acquireIndexCommit() {
        assert lastCommit != null : "Last commit is not initialized yet";
        snapshottedCommits.merge(lastCommit, 1, Integer::sum); // increase refCount
        return new SnapshotIndexCommit(lastCommit);
    }

    synchronized boolean releaseCommit(final IndexCommit snapshotCommit) {
        final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate;
        assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;"
            + "snapshotted commits ["
            + snapshottedCommits
            + "], releasing commit ["
            + releasingCommit
            + "]";
        final int refCount = snapshottedCommits.merge(releasingCommit, -1, Integer::sum); // release refCount
        assert refCount >= 0 : "Number of snapshots can not be negative [" + refCount + "]";
        if (refCount == 0) {
            snapshottedCommits.remove(releasingCommit);
        }
        // The commit can be cleaned up only if there is no pending snapshot and it is not the most recent commit
        return refCount == 0 && releasingCommit.equals(lastCommit) == false;
    }

    @Override
    protected SafeCommitInfo getSafeCommitInfo() {
        return safeCommitInfo;
    }
}
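The acquireIndexCommit/releaseCommit pair above is a per-commit reference count: acquiring pins the latest commit, and releasing reports whether a pinned commit may now be deleted (only once no references remain and it is no longer the latest commit). A stripped-down, generic sketch of that bookkeeping, for illustration only and not using the OpenSearch types:

import java.util.HashMap;
import java.util.Map;

/** Simplified illustration of the ref-counting IngestionDeletionPolicy performs on snapshotted commits. */
class CommitRefCounter<C> {
    private final Map<C, Integer> refs = new HashMap<>();
    private C lastCommit;

    /** Record the newest commit (mirrors onCommit, which keeps only the latest commit alive by default). */
    synchronized void onNewCommit(C newest) {
        lastCommit = newest;
    }

    /** Mirrors acquireIndexCommit: pin the latest commit by bumping its reference count. */
    synchronized C acquireLatest() {
        refs.merge(lastCommit, 1, Integer::sum);
        return lastCommit;
    }

    /** Mirrors releaseCommit: drop one reference; true means the commit may now be cleaned up. */
    synchronized boolean release(C commit) {
        int remaining = refs.merge(commit, -1, Integer::sum);
        if (remaining == 0) {
            refs.remove(commit);
        }
        // Deletable only when nothing references it and it is not the current latest commit.
        return remaining == 0 && !commit.equals(lastCommit);
    }
}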