Skip to content

Commit

Permalink
Add a NULL Consumer or Sink for CDC Server
Browse files Browse the repository at this point in the history
The null consumer can be used for testing or to performance benchmark
CDC Server by isolating it from sink performance characteristics.

The NULL consumer throws away the records and commits the batch only.
  • Loading branch information
vrajat committed May 20, 2022
1 parent e7cb8f3 commit da78781
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 0 deletions.
5 changes: 5 additions & 0 deletions cdc-server/cdc-server-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@
<artifactId>cdc-server-redis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>cdc-server-null</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>cdc-server-kafka</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions cdc-server/cdc-server-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@
<groupId>io.debezium</groupId>
<artifactId>cdc-server-redis</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>cdc-server-null</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>cdc-server-kafka</artifactId>
Expand Down
163 changes: 163 additions & 0 deletions cdc-server/cdc-server-null/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>cdc-server</artifactId>
<version>1.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cdc-server-null</artifactId>
<name>Debezium Server Null Sink Adapter</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>cdc-server-core</artifactId>
</dependency>

<!-- Testing -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>cdc-server-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jboss.jandex</groupId>
<artifactId>jandex-maven-plugin</artifactId>
<executions>
<execution>
<id>make-index</id>
<goals>
<goal>jandex</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemProperties>
<test.type>IT</test.type>
</systemProperties>
</configuration>
</plugin>
</plugins>
<resources>
<!-- Apply the properties set in the POM to the resource files -->
<resource>
<filtering>true</filtering>
<directory>src/main/resources</directory>
<includes>
<include>**/build.properties</include>
</includes>
</resource>
</resources>
</build>
<profiles>
<profile>
<id>quick</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>quick</name>
</property>
</activation>
<properties>
<skipITs>true</skipITs>
<docker.skip>true</docker.skip>
</properties>
</profile>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Do not perform any Docker-related functionality
To use, specify "-DskipITs" on the Maven command line.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<profile>
<id>skip-integration-tests</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>skipITs</name>
</property>
</activation>
<properties>
<docker.skip>true</docker.skip>
</properties>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.nullStream;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import javax.inject.Named;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;

/**
* Implementation of the consumer that delivers the messages into Redis (stream) destination.
*
* @author M Sazzadul Hoque
*/
@Named("nullStream")
@Dependent
public class NullStreamChangeConsumer extends BaseChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

private static final Logger LOGGER = LoggerFactory.getLogger(NullStreamChangeConsumer.class);

private static final String PROP_PREFIX = "debezium.sink.null.";

private long numLogged = 0;
private long totalRecords = 0;

@Inject
@CustomConsumerBuilder

@PostConstruct
void connect() {
final Config config = ConfigProvider.getConfig();
LOGGER.info("Instaintiated NULL consumer");
}

@PreDestroy
void close() {
LOGGER.info("Total records processed = {}", totalRecords);
}

@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records,
RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {

totalRecords += records.size();
if (totalRecords / 100000 > numLogged) {
LOGGER.info("Log Number: {}. Total records processed = {}.", numLogged, totalRecords);
numLogged++;
}
committer.markBatchFinished();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.debezium.server.redis.RedisTestConfigSource
1 change: 1 addition & 0 deletions cdc-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<module>cdc-server-kafka</module>
<module>cdc-server-pravega</module>
<module>cdc-server-nats-streaming</module>
<module>cdc-server-null</module>
</modules>

<dependencyManagement>
Expand Down

0 comments on commit da78781

Please sign in to comment.