
Commit

Refactor ingestion engine to work with segment replication and peer recovery

Signed-off-by: Varun Bharadwaj <[email protected]>
varunbharadwaj committed Feb 16, 2025
1 parent 38e4b33 commit bc9716c
Showing 11 changed files with 485 additions and 873 deletions.
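For readers skimming the diff, the core configuration the new tests exercise is an index that pulls its documents from a Kafka ingestion source and keeps replicas in sync via segment replication. A minimal sketch of those settings, assembled from the createIndex calls in the test below (the topic name and bootstrap servers come from the test's Kafka container, not from defaults):

    Settings settings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
        .put("ingestion_source.type", "kafka")                        // pull documents from Kafka instead of the _bulk API
        .put("ingestion_source.pointer.init.reset", "earliest")       // start consuming from the earliest offset
        .put("ingestion_source.param.topic", "test")                  // test fixture: Kafka topic to consume
        .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
        .put("index.replication.type", "SEGMENT")                     // replicas copy segments from the primary
        .build();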
@@ -20,12 +20,16 @@
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collection;
@@ -43,19 +47,33 @@
import static org.awaitility.Awaitility.await;

/**
* Integration test for Kafka ingestion
* Integration test for Kafka ingestion with segment replication
*/
@ThreadLeakFilters(filters = TestContainerWatchdogThreadLeakFilter.class)
@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IngestFromKafkaIT extends OpenSearchIntegTestCase {
static final String topicName = "test";
static final String indexName = "testindex";
static final String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}";

private KafkaContainer kafka;
private Producer<String, String> producer;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(KafkaPlugin.class);
}

@Before
private void setup() {
setupKafka();
}

@After
private void cleanup() {
stopKafka();
}

/**
* test ingestion-kafka-plugin is installed
*/
@@ -75,61 +93,147 @@ public void testPluginsAreInstalled() {
}

public void testKafkaIngestion() {
try {
setupKafka();
// create an index with ingestion source from kafka
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test");
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
} finally {
stopKafka();
}
produceData("1", "name1", "24");
produceData("2", "name2", "20");

createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("index.replication.type", "SEGMENT")
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test");
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
}

public void testSegmentReplicationWithPeerRecovery() throws Exception {
// Step 1: Create primary and replica nodes. Create index with 1 replica and kafka as ingestion source.

internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();

createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("index.replication.type", "SEGMENT")
.build(),
mapping
);

ensureYellowAndNoInitializingShards(indexName);
final String nodeB = internalCluster().startDataOnlyNode();
ensureGreen(indexName);
assertTrue(nodeA.equals(primaryNodeName(indexName)));
assertTrue(nodeB.equals(replicaNodeName(indexName)));

// Step 2: Produce update messages and validate segment replication

produceData("1", "name1", "24");
produceData("2", "name2", "20");
refresh(indexName);
waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
SearchResponse primaryResponse = client(nodeA).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
assertThat(primaryResponse.getHits().getTotalHits().value(), is(1L));
SearchResponse replicaResponse = client(nodeB).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
assertThat(replicaResponse.getHits().getTotalHits().value(), is(1L));

// Step 3: Stop current primary node and validate replica promotion.

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
ensureYellowAndNoInitializingShards(indexName);
assertTrue(nodeB.equals(primaryNodeName(indexName)));

// Step 4: Verify new primary node is able to index documents

produceData("3", "name3", "30");
produceData("4", "name4", "31");
refresh(indexName);
waitForSearchableDocs(4, Arrays.asList(nodeB));

SearchResponse newPrimaryResponse = client(nodeB).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
assertThat(newPrimaryResponse.getHits().getTotalHits().value(), is(3L));

// Step 5: Add a new node and assign the replica shard. Verify peer recovery works.

final String nodeC = internalCluster().startDataOnlyNode();
client().admin().cluster().prepareReroute().add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC)).get();
ensureGreen(indexName);
assertTrue(nodeC.equals(replicaNodeName(indexName)));

waitForSearchableDocs(4, Arrays.asList(nodeC));
SearchResponse newReplicaResponse = client(nodeC).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
assertThat(newReplicaResponse.getHits().getTotalHits().value(), is(3L));

// Step 6: Produce new updates and verify segment replication works when primary and replica index are not empty.
produceData("5", "name5", "40");
produceData("6", "name6", "41");
refresh(indexName);
waitForSearchableDocs(6, Arrays.asList(nodeB, nodeC));
}

private void setupKafka() {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
// disable topic auto creation
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
kafka.start();
prepareKafkaData();

// setup producer
String bootstrapServers = kafka.getBootstrapServers();
KafkaUtils.createTopic(topicName, 1, bootstrapServers);
Properties props = new Properties();
props.put("bootstrap.servers", kafka.getBootstrapServers());
producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
}

private void stopKafka() {
if (producer != null) {
producer.close();
}

if (kafka != null) {
kafka.stop();
}
}

private void prepareKafkaData() {
    String boostrapServers = kafka.getBootstrapServers();
    KafkaUtils.createTopic(topicName, 1, boostrapServers);
    Properties props = new Properties();
    props.put("bootstrap.servers", kafka.getBootstrapServers());
    Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    producer.send(new ProducerRecord<>(topicName, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}"));
    producer.send(
        new ProducerRecord<>(
            topicName,
            "null",
            "{\"_id\":\"2\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 20}}"
        )
    );
    producer.close();
}

private void produceData(String id, String name, String age) {
    String payload = String.format(
        "{\"_id\":\"%s\", \"_op_type\":\"index\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
        id,
        name,
        age
    );
    producer.send(new ProducerRecord<>(topicName, "null", payload));
}

private void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
assertBusy(() -> {
for (String node : nodes) {
final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get();
final long hits = response.getHits().getTotalHits().value();
if (hits < docCount) {
fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
}
}
}, 1, TimeUnit.MINUTES);
}
}
@@ -13,11 +13,12 @@
/**
* The {@link org.testcontainers.images.TimeLimitedLoggedPullImageResultCallback} instance used by test containers,
* for example {@link org.testcontainers.containers.KafkaContainer} creates a watcher daemon thread which is never
* stopped. This filter excludes that thread from the thread leak detection logic.
* stopped. This filter excludes that thread from the thread leak detection logic. It also excludes the Ryuk resource
* reaper thread, which is not stopped in time.
*/
public final class TestContainerWatchdogThreadLeakFilter implements ThreadFilter {
public final class TestContainerThreadLeakFilter implements ThreadFilter {
@Override
public boolean reject(Thread t) {
return t.getName().startsWith("testcontainers-pull-watchdog-");
return t.getName().startsWith("testcontainers-pull-watchdog-") || t.getName().startsWith("testcontainers-ryuk");
}
}
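The filter only takes effect when it is registered on a test class; in the first file of this diff that is done with the thread-leak annotation on the integration test, roughly:

    @ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
    @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
    public class IngestFromKafkaIT extends OpenSearchIntegTestCase {
        // ... tests that start the Kafka test container ...
    }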