diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 5541cf1..18fd63e 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -40,7 +40,7 @@ jobs:
       - name: Build artifacts
         uses: gradle/gradle-build-action@v2.4.0
         with:
-          arguments: assembleDist
+          arguments: distTar
       - name: Upload artifacts
         uses: ncipollo/release-action@v1
         with:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8638e8a..d4e8246 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -37,6 +37,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
 * (docs/ci) [PR 32](https://github.com/provenance-io/provenance-abci-listener/pull/31) Update deploy doc and scripts and mark pre-releases
 * (docs) [PR 33](https://github.com/provenance-io/provenance-abci-listener/pull/33) Fix typos in deploy steps
+* [PR 38](https://github.com/provenance-io/provenance-abci-listener/pull/38) Default to ByteArraySerializer when `schema.registry.url` property is not set
 
 ---
 
diff --git a/build.gradle.kts b/build.gradle.kts
index ea2e0a2..a2039bf 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -14,7 +14,7 @@ group = "io.provenance"
 version = System.getenv("VERSION") ?: "0-SNAPSHOT"
 
 application {
-    mainClass.set("io.provenance.abci.listener.ABCIListenerServerKt")
+    mainClass.set("io.provenance.abci.listener.AbciListenerServerKt")
 }
 
 repositories {
@@ -84,7 +84,7 @@ tasks.withType<KotlinCompile> {
     kotlinOptions {
         freeCompilerArgs = listOf(
             "-Xjsr305=strict",
-            "-Xopt-in=kotlin.RequiresOptIn"
+            "-opt-in=kotlin.RequiresOptIn"
         )
         jvmTarget = "11"
         languageVersion = "1.7"
@@ -92,8 +92,10 @@
     }
 }
 
-tasks.assembleDist {
-    finalizedBy("checksumDist") // checksums are generated after assembleDist runs
+tasks.distTar {
+    compression = Compression.GZIP
+    archiveExtension.set("tar.gz")
+    finalizedBy("checksumDist")
 }
 
 tasks.register<Checksum>("checksumDist") {
@@ -103,7 +105,7 @@
     checksumAlgorithm.set(Checksum.Algorithm.MD5)
     appendFileNameToChecksum.set(true)
 
-    dependsOn(tasks.assembleDist)
+    dependsOn(tasks.distTar)
 }
 
 tasks.withType<Test> {
diff --git a/scripts/deploy.sh b/scripts/deploy.sh
index 88434e5..9f5d6fd 100755
--- a/scripts/deploy.sh
+++ b/scripts/deploy.sh
@@ -2,7 +2,7 @@
 #
 # Download plugin distribution and extract the plugin.
-# In addition, validate the md5 checksum of the zip file.
+# In addition, validate the md5 checksum of the tar.gz file.
 #
 
 usage() {
@@ -24,7 +24,7 @@ TAG=$1
 PLUGINS_HOME="$PIO_HOME/plugins"
 PLUGIN_NAME=provenance-abci-listener
 PLUGIN_DIR="$PLUGINS_HOME/$PLUGIN_NAME-$TAG"
-RELEASE_URL="https://github.com/provenance-io/provenance-abci-listener/releases/download/$TAG/provenance-abci-listener-$TAG.zip"
+RELEASE_URL="https://github.com/provenance-io/provenance-abci-listener/releases/download/$TAG/provenance-abci-listener-$TAG.tar.gz"
 
 [[ -z "$TAG" ]] && usage;
 
@@ -32,13 +32,14 @@ echo "Release: $TAG"
 
 # download release distribution
 echo "Downloading release..."
-curl -s --create-dirs -o "$PLUGIN_DIR.zip" -L "$RELEASE_URL"
+curl -s --create-dirs -o "$PLUGIN_DIR.tar.gz" -L "$RELEASE_URL"
 
 # validate md5 checksum
 echo "Validating release (md5)..."
-curl -s --create-dirs -o "$PLUGIN_DIR.zip.md5" -L "$RELEASE_URL.md5"
+curl -s --create-dirs -o "$PLUGIN_DIR.tar.gz.md5" -L "$RELEASE_URL.md5"
 
 cd "$PLUGINS_HOME" || exit 1
-md5sum -c "$PLUGIN_DIR.zip.md5" || exit 1
+md5sum -c "$PLUGIN_DIR.tar.gz.md5" || exit 1
 
 echo "Extracting release..."
-unzip -qq "$PLUGIN_DIR.zip" -d "$PLUGINS_HOME"
+tar -zxf "$PLUGIN_DIR.tar.gz" -C "$PLUGINS_HOME"
+rm "$PLUGIN_DIR.tar.gz"
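
For reviewers who want to sanity-check the new artifact flow locally, here is a minimal Kotlin sketch (not part of the diff) of the same validation `deploy.sh` performs with `md5sum -c`. The file names are assumptions based on the script above: the `$TAG.tar.gz` tarball plus the `.md5` sidecar produced by the `checksumDist` task with `appendFileNameToChecksum` enabled.

```kotlin
import java.io.File
import java.security.MessageDigest

// Recompute the MD5 of the downloaded tarball and compare it with the
// sidecar file (format: "<hex-digest> <file-name>").
fun verifyMd5(tarball: File, sidecar: File): Boolean {
    val digest = MessageDigest.getInstance("MD5")
        .digest(tarball.readBytes())
        .joinToString("") { "%02x".format(it) }
    val expected = sidecar.readText().trim().split(Regex("\\s+")).first()
    return digest == expected
}

fun main() {
    // Hypothetical paths; adjust to your $PIO_HOME/plugins layout.
    val tarball = File("provenance-abci-listener-v0.1.0.tar.gz")
    val sidecar = File("provenance-abci-listener-v0.1.0.tar.gz.md5")
    check(verifyMd5(tarball, sidecar)) { "MD5 mismatch for ${tarball.name}" }
    println("${tarball.name}: OK")
}
```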
diff --git a/src/main/kotlin/io/provenance/abci/listener/ABCIListenerServer.kt b/src/main/kotlin/io/provenance/abci/listener/AbciListenerServer.kt
similarity index 78%
rename from src/main/kotlin/io/provenance/abci/listener/ABCIListenerServer.kt
rename to src/main/kotlin/io/provenance/abci/listener/AbciListenerServer.kt
index 2b4f0bf..c3b6daa 100644
--- a/src/main/kotlin/io/provenance/abci/listener/ABCIListenerServer.kt
+++ b/src/main/kotlin/io/provenance/abci/listener/AbciListenerServer.kt
@@ -8,21 +8,20 @@ import io.grpc.health.v1.HealthCheckResponse.ServingStatus
 import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
 import io.grpc.protobuf.services.HealthStatusManager
 import io.grpc.protobuf.services.ProtoReflectionService
-import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.Producer
 import java.net.InetSocketAddress
 import java.time.Duration
 import java.util.concurrent.TimeUnit
 
 /**
- * The [ABCIListenerServer] starts the gRPC server for the [ABCIListenerService].
- * In addition to the [ABCIListenerService], a health service (managed by [HealthStatusManager])
+ * The [AbciListenerServer] starts the gRPC server for the [AbciListenerService].
+ * In addition to the [AbciListenerService], a health service (managed by [HealthStatusManager])
  * and a [ProtoReflectionService] are also added.
  *
  * The server is also responsible for initializing and closing the Kafka [Producer]
- * used by the [ABCIListenerService].
+ * used by the [AbciListenerService].
  */
-class ABCIListenerServer {
+class AbciListenerServer {
     private val config: Config = ConfigFactory.load()
     private val inet = InetSocketAddress(
         config.getString("grpc.server.addr"),
@@ -30,20 +29,20 @@
     )
 
     // Kafka producer
-    private var topicConfig: Config = config.getConfig("kafka.producer.listen-topics")
-    private var kafkaConfig: Config = config.getConfig("kafka.producer.kafka-clients")
-    private val producer: Producer<String, Message> = KafkaProducer(kafkaConfig.toProperties())
+    private var producerConfig: Config = config.getConfig("kafka.producer")
+    private var topicConfig: Config = producerConfig.getConfig("listen-topics")
+    private val producer = ProducerSettings<String, Message>(producerConfig).createKafkaProducer()
     private val health: HealthStatusManager = HealthStatusManager()
-    val server: Server = NettyServerBuilder
+    private val server: Server = NettyServerBuilder
         .forAddress(inet)
         .directExecutor()
         .addService(health.healthService)
         .addService(ProtoReflectionService.newInstance())
-        .addService(ABCIListenerService(topicConfig, producer))
+        .addService(AbciListenerService(topicConfig, producer))
         .build()
 
-    /** Start the [ABCIListenerService] gRPC server. */
+    /** Start the [AbciListenerService] gRPC server. */
     fun start() {
         server.start()
@@ -57,13 +56,13 @@
         Runtime.getRuntime().addShutdownHook(
             Thread {
                 println("*** shutting down gRPC server since JVM is shutting down")
-                this@ABCIListenerServer.stop()
+                this@AbciListenerServer.stop()
                 println("*** server shut down")
             }
         )
     }
 
-    /** Stop the [ABCIListenerService] gRPC server. */
+    /** Stop the [AbciListenerService] gRPC server. */
     private fun stop() {
         // shutdown producer
         producer.close(Duration.ofMillis(3000))
@@ -98,7 +97,7 @@
 }
 
 fun main() {
-    val server = ABCIListenerServer()
+    val server = AbciListenerServer()
     server.start()
     server.blockUntilShutdown()
 }
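
The producer bootstrap above now reads a single `kafka.producer` block and carves `listen-topics` and `kafka-clients` out of it. A minimal sketch of the shape the server expects, using Typesafe Config's `parseString` (values are placeholders, not part of the diff):

```kotlin
import com.typesafe.config.ConfigFactory

fun main() {
    // Mirrors the layout AbciListenerServer reads from application.conf.
    val config = ConfigFactory.parseString(
        """
        kafka.producer {
          listen-topics {
            listen-begin-block = "local-listen-begin-block"
          }
          kafka-clients {
            bootstrap.servers = "localhost:9092"
            key.serializer = org.apache.kafka.common.serialization.StringSerializer
            value.serializer = io.provenance.abci.listener.ProtobufSerializer
          }
        }
        """.trimIndent()
    )
    val producerConfig = config.getConfig("kafka.producer")
    println(producerConfig.getConfig("listen-topics").getString("listen-begin-block"))
    println(producerConfig.getConfig("kafka-clients").getString("bootstrap.servers"))
}
```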
diff --git a/src/main/kotlin/io/provenance/abci/listener/ABCIListenerService.kt b/src/main/kotlin/io/provenance/abci/listener/AbciListenerService.kt
similarity index 97%
rename from src/main/kotlin/io/provenance/abci/listener/ABCIListenerService.kt
rename to src/main/kotlin/io/provenance/abci/listener/AbciListenerService.kt
index cd952ee..2dc3a13 100644
--- a/src/main/kotlin/io/provenance/abci/listener/ABCIListenerService.kt
+++ b/src/main/kotlin/io/provenance/abci/listener/AbciListenerService.kt
@@ -15,7 +15,7 @@ import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.clients.producer.ProducerRecord
 
 /**
- * Producer topic names for the [ABCIListenerService]
+ * Producer topic names for the [AbciListenerService]
 */
 enum class ListenTopic(val topic: String) {
     BEGIN_BLOCK("listen-begin-block"),
@@ -31,7 +31,7 @@
  * @property producer the Kafka Protobuf [Producer].
  * @constructor Creates a gRPC ABCI listener service.
  */
-class ABCIListenerService(
+class AbciListenerService(
     private val topicConfig: Config,
     private val producer: Producer<String, Message>
 ) : ABCIListenerServiceGrpcKt.ABCIListenerServiceCoroutineImplBase() {
diff --git a/src/main/kotlin/io/provenance/abci/listener/Extensions.kt b/src/main/kotlin/io/provenance/abci/listener/Extensions.kt
index 6d9c4df..b4a29c3 100644
--- a/src/main/kotlin/io/provenance/abci/listener/Extensions.kt
+++ b/src/main/kotlin/io/provenance/abci/listener/Extensions.kt
@@ -28,7 +28,7 @@ fun Config.toProperties(): Properties {
 /**
  * Add a [dispatch] method to the Kafka [Producer] to produce messages
  * in a non-blocking manner and `await` for acknowledgement from broker
- * before responding on gRPC endpoints in [ABCIListenerService].
+ * before responding on gRPC endpoints in [AbciListenerService].
  *
  * Resumes with a [StatusException] when an exception is encountered.
 */
diff --git a/src/main/kotlin/io/provenance/abci/listener/ProducerSettings.kt b/src/main/kotlin/io/provenance/abci/listener/ProducerSettings.kt
new file mode 100644
index 0000000..80d4d38
--- /dev/null
+++ b/src/main/kotlin/io/provenance/abci/listener/ProducerSettings.kt
@@ -0,0 +1,35 @@
+package io.provenance.abci.listener
+
+import com.typesafe.config.Config
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.Producer
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.Serializer
+
+class ProducerSettings<K, V>(
+    private val config: Config,
+    private val keySerializer: Serializer<K>? = null,
+    private val valueSerializer: Serializer<V>? = null
+) {
+
+    fun createKafkaProducer(): Producer<K, V> {
+        val properties = config.getConfig("kafka-clients").toProperties()
+        require(
+            keySerializer != null ||
+                !properties.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).isNullOrEmpty()
+        ) {
+            "Key serializer should be defined or declared in configuration"
+        }
+
+        require(
+            valueSerializer != null ||
+                !properties.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).isNullOrEmpty()
+        ) {
+            "Value serializer should be defined or declared in configuration"
+        }
+
+        // Explicit serializer instances (when given) take precedence over the
+        // class names declared in configuration.
+        return KafkaProducer(properties, keySerializer, valueSerializer)
+    }
+}
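
A quick sketch (not part of the diff) of how the fail-fast checks in `ProducerSettings` behave; the `kafka-clients` block below is an illustrative stand-in for the application configuration, deliberately missing `value.serializer`:

```kotlin
import com.typesafe.config.ConfigFactory
import io.provenance.abci.listener.ProducerSettings

fun main() {
    // No value.serializer in config and none passed in -> require() fails fast.
    val incomplete = ConfigFactory.parseString(
        """
        kafka-clients {
          bootstrap.servers = "localhost:9092"
          key.serializer = org.apache.kafka.common.serialization.StringSerializer
        }
        """.trimIndent()
    )
    runCatching { ProducerSettings<String, ByteArray>(incomplete).createKafkaProducer() }
        .onFailure { println(it.message) }
    // prints: Value serializer should be defined or declared in configuration
}
```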
diff --git a/src/main/kotlin/io/provenance/abci/listener/ProtobufSerializer.kt b/src/main/kotlin/io/provenance/abci/listener/ProtobufSerializer.kt
new file mode 100644
index 0000000..909590b
--- /dev/null
+++ b/src/main/kotlin/io/provenance/abci/listener/ProtobufSerializer.kt
@@ -0,0 +1,79 @@
+package io.provenance.abci.listener
+
+import com.google.protobuf.Message
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.Serializer
+import java.io.ByteArrayOutputStream
+
+/**
+ * ProtobufSerializer is a custom Protobuf serializer for Kafka
+ * that chooses a serialization path based on whether the application
+ * is configured to use the Confluent Schema Registry or not.
+ *
+ * When the application is configured to use the Confluent Schema Registry,
+ * the [KafkaProtobufSerializer] will be used. Otherwise, the [ByteArraySerializer] will be used.
+ *
+ * In order to enable the application to use the Confluent Schema Registry,
+ * you MUST set the following properties:
+ *
+ * `schema.registry.url={{ SR_URL }}`
+ * `value.serializer=io.provenance.abci.listener.ProtobufSerializer`
+ *
+ * Example configuration:
+ *
+ * ```
+ * # application.conf
+ *
+ * # application configuration properties
+ * ...
+ *
+ * kafka.producer {
+ *     ...
+ *     # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
+ *     # can be defined in this configuration section.
+ *     kafka-clients {
+ *         bootstrap.servers = "localhost:9092"
+ *         // other producer properties
+ *         ...
+ *         key.serializer = org.apache.kafka.common.serialization.StringSerializer
+ *         value.serializer = io.provenance.abci.listener.ProtobufSerializer
+ *         schema.registry.url = "http://127.0.0.1:8081"
+ *     }
+ * }
+ * ```
+ * For additional producer properties see [org.apache.kafka.clients.producer.ProducerConfig].
+ *
+ */
+class ProtobufSerializer<T : Message> : Serializer<T> {
+    private lateinit var serializer: Serializer<T>
+
+    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+        val useSchemaRegistry = configs!!.containsKey(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG)
+        if (useSchemaRegistry) {
+            val serializer = KafkaProtobufSerializer<T>()
+            serializer.configure(configs, isKey)
+            this.serializer = serializer
+        } else {
+            val serializer = ByteArraySerializer()
+            @Suppress("UNCHECKED_CAST")
+            this.serializer = serializer as Serializer<T>
+        }
+    }
+
+    override fun serialize(topic: String?, data: T): ByteArray {
+        var bytes: ByteArray = byteArrayOf()
+        when (serializer) {
+            is KafkaProtobufSerializer<*> -> bytes = serializer.serialize(topic, data)
+            is ByteArraySerializer -> {
+                // Raw Protobuf wire format; consumers recover it with <Type>.parseFrom().
+                val out = ByteArrayOutputStream()
+                data.writeTo(out)
+                bytes = out.toByteArray()
+                out.close()
+            }
+        }
+        return bytes
+    }
+}
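
To make the fallback path concrete, here is a small sketch (not in the diff) that drives `ProtobufSerializer` with no `schema.registry.url` in its config map, so the `ByteArraySerializer` branch is taken, then parses the bytes back; `com.google.protobuf.Timestamp` stands in for the ABCI request types:

```kotlin
import com.google.protobuf.Timestamp
import io.provenance.abci.listener.ProtobufSerializer

fun main() {
    val serializer = ProtobufSerializer<Timestamp>()
    // Empty config map -> no schema.registry.url -> ByteArraySerializer path.
    serializer.configure(mutableMapOf<String, Any>(), false)

    val original = Timestamp.newBuilder().setSeconds(1_700_000_000L).setNanos(42).build()
    val bytes = serializer.serialize("listen-begin-block", original)

    // Raw wire format round-trips through parseFrom().
    val decoded = Timestamp.parseFrom(bytes)
    check(decoded == original)
    println("round-trip OK: $decoded")
}
```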
diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf
index 2456adb..afe04ac 100644
--- a/src/main/resources/application.conf
+++ b/src/main/resources/application.conf
@@ -6,9 +6,9 @@ grpc.server {
 
 # Kafka producer config
 kafka.producer {
-    # Assign a topic name and an optional prefix where events will be written.
+    # Assign a topic name and optional prefix where events will be written.
     listen-topics {
-        prefix = "local-"
+        prefix = "bytetest-"
         listen-begin-block = ${?kafka.producer.listen-topics.prefix}"listen-begin-block"
         listen-end-block = ${?kafka.producer.listen-topics.prefix}"listen-end-block"
         listen-deliver-tx = ${?kafka.producer.listen-topics.prefix}"listen-deliver-tx"
@@ -25,7 +25,7 @@ kafka.producer {
         linger.ms = 50
         max.request.size = 204857600
         key.serializer = org.apache.kafka.common.serialization.StringSerializer
-        value.serializer = io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
-        schema.registry.url = "http://127.0.0.1:8081"
+        value.serializer = io.provenance.abci.listener.ProtobufSerializer
+        // schema.registry.url = "http://127.0.0.1:8081"
    }
 }
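
Since `schema.registry.url` is now commented out by default, re-enabling registry-backed serialization is a config-only change. A sketch (not part of the diff; the URL is a placeholder) using Typesafe Config's override/fallback semantics instead of editing the file:

```kotlin
import com.typesafe.config.ConfigFactory

fun main() {
    // Highest-precedence config first; application.conf fills in the rest.
    val config = ConfigFactory.parseString(
        "kafka.producer.kafka-clients.schema.registry.url = \"http://127.0.0.1:8081\""
    ).withFallback(ConfigFactory.load())

    val clients = config.getConfig("kafka.producer.kafka-clients")
    // The flattened property key is what ProtobufSerializer.configure() checks for.
    println(clients.getString("schema.registry.url"))
}
```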
diff --git a/src/test/kotlin/io/provenance/abci/listener/ABCIListenerServerTest.kt b/src/test/kotlin/io/provenance/abci/listener/AbciListenerServerWithSchemaRegistryTests.kt
similarity index 83%
rename from src/test/kotlin/io/provenance/abci/listener/ABCIListenerServerTest.kt
rename to src/test/kotlin/io/provenance/abci/listener/AbciListenerServerWithSchemaRegistryTests.kt
index 68937a8..92ef597 100644
--- a/src/test/kotlin/io/provenance/abci/listener/ABCIListenerServerTest.kt
+++ b/src/test/kotlin/io/provenance/abci/listener/AbciListenerServerWithSchemaRegistryTests.kt
@@ -3,8 +3,6 @@ package io.provenance.abci.listener
 import com.google.protobuf.ByteString
 import com.google.protobuf.Message
 import com.google.protobuf.Timestamp
-import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
 import cosmos.base.store.v1beta1.Listening.StoreKVPair
 import cosmos.streaming.abci.v1.ABCIListenerServiceGrpcKt
 import cosmos.streaming.abci.v1.Grpc.ListenBeginBlockRequest
@@ -17,18 +15,12 @@ import cosmos.streaming.abci.v1.Grpc.ListenEndBlockRequest
 import cosmos.streaming.abci.v1.Grpc.ListenEndBlockResponse
 import io.grpc.inprocess.InProcessChannelBuilder
 import io.grpc.inprocess.InProcessServerBuilder
-import io.grpc.testing.GrpcCleanupRule
 import kotlinx.coroutines.runBlocking
-import net.christophschubert.cp.testcontainers.CPTestContainerFactory
-import net.christophschubert.cp.testcontainers.SchemaRegistryContainer
-import org.apache.kafka.clients.producer.Producer
 import org.assertj.core.api.Assertions.assertThat
-import org.junit.Rule
 import org.junit.jupiter.api.AfterAll
 import org.junit.jupiter.api.BeforeAll
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.TestInstance
-import org.testcontainers.containers.KafkaContainer
 import tendermint.abci.Types
 import tendermint.abci.Types.RequestBeginBlock
 import tendermint.abci.Types.RequestDeliverTx
@@ -40,46 +32,38 @@ import tendermint.abci.Types.ResponseEndBlock
 import java.time.Instant
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-class ABCIListenerServerTest {
+class AbciListenerServerWithSchemaRegistryTests : BaseTests() {
 
-    private val testContainerFactory: CPTestContainerFactory = CPTestContainerFactory()
-    private val kafka: KafkaContainer = testContainerFactory.createKafka()
-    private val schemaRegistry: SchemaRegistryContainer = testContainerFactory.createSchemaRegistry(kafka)
-    private val config: Config = ConfigFactory.load()
-    private val topicConfig: Config = config.getConfig("kafka.producer.listen-topics")
-    private lateinit var producer: Producer<String, Message>
-
-    @get:Rule
-    val grpcCleanupRule: GrpcCleanupRule = GrpcCleanupRule()
     private val listenBeginBlockStub =
         ABCIListenerServiceGrpcKt.ABCIListenerServiceCoroutineStub(
             grpcCleanupRule.register(
-                InProcessChannelBuilder.forName("listenBeginBlock").directExecutor().build()
+                InProcessChannelBuilder.forName("listenBeginBlock_with_schemaRegistry").directExecutor().build()
             )
         )
     private val listenEndBlockStub =
         ABCIListenerServiceGrpcKt.ABCIListenerServiceCoroutineStub(
             grpcCleanupRule.register(
-                InProcessChannelBuilder.forName("listenEndBlock").directExecutor().build()
+                InProcessChannelBuilder.forName("listenEndBlock_with_schemaRegistry").directExecutor().build()
             )
         )
     private val listenDeliverTxStub =
        ABCIListenerServiceGrpcKt.ABCIListenerServiceCoroutineStub(
             grpcCleanupRule.register(
-                InProcessChannelBuilder.forName("listenDeliverTx").directExecutor().build()
+                InProcessChannelBuilder.forName("listenDeliverTx_with_schemaRegistry").directExecutor().build()
             )
         )
     private val listenCommitStub =
         ABCIListenerServiceGrpcKt.ABCIListenerServiceCoroutineStub(
             grpcCleanupRule.register(
-                InProcessChannelBuilder.forName("listenCommit").directExecutor().build()
+                InProcessChannelBuilder.forName("listenCommit_with_schemaRegistry").directExecutor().build()
             )
         )
 
     @BeforeAll
     internal fun setUpAll() {
+        schemaRegistry = testContainerFactory.createSchemaRegistry(kafka)
         schemaRegistry.start()
-        producer = TestProtoProducer(schemaRegistry.baseUrl)
+        producer = TestProtoProducer(config, schemaRegistry.baseUrl)
             .createProducer(kafka.bootstrapServers)
     }
 
@@ -95,8 +79,8 @@
         val time = Instant.now()
 
         grpcCleanupRule.register(
-            InProcessServerBuilder.forName("listenBeginBlock").directExecutor()
-                .addService(ABCIListenerService(topicConfig, producer))
+            InProcessServerBuilder.forName("listenBeginBlock_with_schemaRegistry").directExecutor()
+                .addService(AbciListenerService(topicConfig, producer))
                 .build()
                 .start()
         )
@@ -142,6 +126,7 @@
         assertThat(reply.javaClass).isEqualTo(ListenBeginBlockResponse::class.java)
 
         val consumer = TestProtoConsumer(
+            config = config,
             bootstrapServers = kafka.bootstrapServers,
             schemaRegistryUrl = schemaRegistry.baseUrl,
             topic = topicConfig.getString(ListenTopic.BEGIN_BLOCK.topic),
@@ -156,8 +141,8 @@
     @Test
     fun listenEndBlock(): Unit = runBlocking {
         grpcCleanupRule.register(
-            InProcessServerBuilder.forName("listenEndBlock").directExecutor()
-                .addService(ABCIListenerService(topicConfig, producer))
+            InProcessServerBuilder.forName("listenEndBlock_with_schemaRegistry").directExecutor()
+                .addService(AbciListenerService(topicConfig, producer))
                 .build()
                 .start()
         )
@@ -171,6 +156,7 @@
         assertThat(reply.javaClass).isEqualTo(ListenEndBlockResponse::class.java)
 
         val consumer = TestProtoConsumer(
+            config = config,
             bootstrapServers = kafka.bootstrapServers,
             schemaRegistryUrl = schemaRegistry.baseUrl,
             topic = topicConfig.getString(ListenTopic.END_BLOCK.topic),
@@ -185,8 +171,8 @@
     @Test
     fun listenDeliverTx(): Unit = runBlocking {
         grpcCleanupRule.register(
-            InProcessServerBuilder.forName("listenDeliverTx").directExecutor()
-                .addService(ABCIListenerService(topicConfig, producer))
+            InProcessServerBuilder.forName("listenDeliverTx_with_schemaRegistry").directExecutor()
+                .addService(AbciListenerService(topicConfig, producer))
                 .build()
                 .start()
         )
@@ -201,6 +187,7 @@
         assertThat(reply.javaClass).isEqualTo(ListenDeliverTxResponse::class.java)
 
         val consumer = TestProtoConsumer(
+            config = config,
             bootstrapServers = kafka.bootstrapServers,
             schemaRegistryUrl = schemaRegistry.baseUrl,
             topic = topicConfig.getString(ListenTopic.DELIVER_TX.topic),
@@ -216,8 +203,8 @@
     @Test
     fun listenCommit(): Unit = runBlocking {
         grpcCleanupRule.register(
-            InProcessServerBuilder.forName("listenCommit").directExecutor()
-                .addService(ABCIListenerService(topicConfig, producer))
+            InProcessServerBuilder.forName("listenCommit_with_schemaRegistry").directExecutor()
+                .addService(AbciListenerService(topicConfig, producer))
                 .build()
                 .start()
         )
@@ -248,6 +235,7 @@
         assertThat(reply.javaClass).isEqualTo(ListenCommitResponse::class.java)
 
         val consumer = TestProtoConsumer(
+            config = config,
             bootstrapServers = kafka.bootstrapServers,
             schemaRegistryUrl = schemaRegistry.baseUrl,
             topic = topicConfig.getString(ListenTopic.COMMIT.topic),
diff --git a/src/test/kotlin/io/provenance/abci/listener/AbciListenerServerWithoutSchemaRegistryTests.kt b/src/test/kotlin/io/provenance/abci/listener/AbciListenerServerWithoutSchemaRegistryTests.kt
new file mode 100644
index 0000000..b98c298
--- /dev/null
+++ b/src/test/kotlin/io/provenance/abci/listener/AbciListenerServerWithoutSchemaRegistryTests.kt
@@ -0,0 +1,256 @@
+package io.provenance.abci.listener
+
+import com.google.protobuf.ByteString
+import com.google.protobuf.Message
+import com.google.protobuf.Timestamp
+import cosmos.base.store.v1beta1.Listening.StoreKVPair
+import cosmos.streaming.abci.v1.ABCIListenerServiceGrpcKt
+import cosmos.streaming.abci.v1.Grpc.ListenBeginBlockRequest
+import cosmos.streaming.abci.v1.Grpc.ListenBeginBlockResponse
+import cosmos.streaming.abci.v1.Grpc.ListenCommitRequest
+import cosmos.streaming.abci.v1.Grpc.ListenCommitResponse
+import cosmos.streaming.abci.v1.Grpc.ListenDeliverTxRequest
+import cosmos.streaming.abci.v1.Grpc.ListenDeliverTxResponse
+import cosmos.streaming.abci.v1.Grpc.ListenEndBlockRequest
+import cosmos.streaming.abci.v1.Grpc.ListenEndBlockResponse
+import io.grpc.inprocess.InProcessChannelBuilder
+import io.grpc.inprocess.InProcessServerBuilder
+import kotlinx.coroutines.runBlocking
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.AfterAll
+import org.junit.jupiter.api.BeforeAll
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.TestInstance
+import org.junit.jupiter.api.assertDoesNotThrow
+import tendermint.abci.Types
+import tendermint.abci.Types.RequestBeginBlock
+import tendermint.abci.Types.RequestDeliverTx
+import tendermint.abci.Types.RequestEndBlock
+import tendermint.abci.Types.ResponseBeginBlock
+import tendermint.abci.Types.ResponseCommit
+import tendermint.abci.Types.ResponseDeliverTx
+import tendermint.abci.Types.ResponseEndBlock
+import java.time.Instant
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class AbciListenerServerWithoutSchemaRegistryTests : BaseTests() {
+
+    private val listenBeginBlockStub =
+        ABCIListenerServiceGrpcKt.ABCIListenerServiceCoroutineStub(
+            grpcCleanupRule.register(
+                InProcessChannelBuilder.forName("listenBeginBlock").directExecutor().build()
+            )
+        )
+    private val listenEndBlockStub =
+        ABCIListenerServiceGrpcKt.ABCIListenerServiceCoroutineStub(
+            grpcCleanupRule.register(
+                InProcessChannelBuilder.forName("listenEndBlock").directExecutor().build()
+            )
+        )
+    private val listenDeliverTxStub =
+        ABCIListenerServiceGrpcKt.ABCIListenerServiceCoroutineStub(
+            grpcCleanupRule.register(
+                InProcessChannelBuilder.forName("listenDeliverTx").directExecutor().build()
+            )
+        )
+    private val listenCommitStub =
+        ABCIListenerServiceGrpcKt.ABCIListenerServiceCoroutineStub(
+            grpcCleanupRule.register(
+                InProcessChannelBuilder.forName("listenCommit").directExecutor().build()
+            )
+        )
+
+    @BeforeAll
+    internal fun setUpAll() {
+        kafka.start()
+        producer = TestProtoProducer(config)
+            .createProducer(kafka.bootstrapServers)
+    }
+
+    @AfterAll
+    internal fun tearDownAll() {
+        producer.close()
+        kafka.stop()
+    }
+
+    @Test
+    fun listenBeginBlock(): Unit = runBlocking {
+        val time = Instant.now()
+
+        grpcCleanupRule.register(
+            InProcessServerBuilder.forName("listenBeginBlock").directExecutor()
+                .addService(AbciListenerService(topicConfig, producer))
+                .build()
+                .start()
+        )
+
+        val reply: ListenBeginBlockResponse = listenBeginBlockStub.listenBeginBlock(
+            ListenBeginBlockRequest.newBuilder()
+                .setReq(
+                    RequestBeginBlock.newBuilder()
+                        .setHeader(
+                            tendermint.types.Types.Header.newBuilder()
+                                .setHeight(1)
+                                .setTime(
+                                    Timestamp.newBuilder()
+                                        .setSeconds(time.epochSecond)
+                                        .setNanos(time.nano)
+                                        .build()
+                                )
+                                .build()
+                        )
+                        .addAllByzantineValidators(emptyList())
+                        .setHash(ByteString.copyFrom(byteArrayOf(1, 2, 3, 4, 5, 6, 7, 8, 9)))
+                        .setLastCommitInfo(
+                            Types.LastCommitInfo.newBuilder()
+                                .setRound(1)
+                                .addAllVotes(emptyList())
+                                .build()
+                        )
+                        .build()
+                )
+                .setRes(
+                    ResponseBeginBlock.newBuilder()
+                        .addAllEvents(
+                            listOf(
+                                Types.Event.newBuilder()
+                                    .setType("testEventType1")
+                                    .build()
+                            )
+                        )
+                        .build()
+                )
+                .build()
+        )
+        assertThat(reply.javaClass).isEqualTo(ListenBeginBlockResponse::class.java)
+
+        val consumer = TestProtoConsumer<ByteArray>(
+            config = config,
+            bootstrapServers = kafka.bootstrapServers,
+            topic = topicConfig.getString(ListenTopic.BEGIN_BLOCK.topic)
+        )
+        consumer.consumeAndClose()
+        assertThat(consumer.messages.size).isEqualTo(1)
+        assertThat(consumer.messages[0].value() is ByteArray).isTrue()
+        val result = assertDoesNotThrow("ListenBeginBlockRequest.parseFrom() should not throw an exception") {
+            ListenBeginBlockRequest.parseFrom(consumer.messages[0].value())
+        }
+        assertThat(result.req.header.height).isEqualTo(1)
+        assertThat(result.res.eventsList[0].type).isEqualTo("testEventType1")
+    }
+
+    @Test
+    fun listenEndBlock(): Unit = runBlocking {
+        grpcCleanupRule.register(
+            InProcessServerBuilder.forName("listenEndBlock").directExecutor()
+                .addService(AbciListenerService(topicConfig, producer))
+                .build()
+                .start()
+        )
+
+        val reply: ListenEndBlockResponse = listenEndBlockStub.listenEndBlock(
+            ListenEndBlockRequest.newBuilder()
+                .setReq(RequestEndBlock.newBuilder().setHeight(1).build())
+                .setRes(ResponseEndBlock.newBuilder().build())
+                .build()
+        )
+        assertThat(reply.javaClass).isEqualTo(ListenEndBlockResponse::class.java)
+
+        val consumer = TestProtoConsumer<ByteArray>(
+            config = config,
+            bootstrapServers = kafka.bootstrapServers,
+            topic = topicConfig.getString(ListenTopic.END_BLOCK.topic)
+        )
+        consumer.consumeAndClose()
+        assertThat(consumer.messages.size).isEqualTo(1)
+        assertThat(consumer.messages[0].value() is ByteArray).isTrue()
+        val result = assertDoesNotThrow("ListenEndBlockRequest.parseFrom() should not throw an exception") {
+            ListenEndBlockRequest.parseFrom(consumer.messages[0].value())
+        }
+        assertThat(result.req.height).isEqualTo(1)
+        assertThat(result.res.eventsList.size).isEqualTo(0)
+    }
+
+    @Test
+    fun listenDeliverTx(): Unit = runBlocking {
+        grpcCleanupRule.register(
+            InProcessServerBuilder.forName("listenDeliverTx").directExecutor()
+                .addService(AbciListenerService(topicConfig, producer))
+                .build()
+                .start()
+        )
+
+        val reply: ListenDeliverTxResponse = listenDeliverTxStub.listenDeliverTx(
+            ListenDeliverTxRequest.newBuilder()
+                .setBlockHeight(1)
+                .setReq(RequestDeliverTx.newBuilder().setTx(ByteString.copyFrom("testTx1".toByteArray())).build())
+                .setRes(ResponseDeliverTx.newBuilder().setCode(1).build())
+                .build()
+        )
+        assertThat(reply.javaClass).isEqualTo(ListenDeliverTxResponse::class.java)
+
+        val consumer = TestProtoConsumer<ByteArray>(
+            config = config,
+            bootstrapServers = kafka.bootstrapServers,
+            topic = topicConfig.getString(ListenTopic.DELIVER_TX.topic)
+        )
+        consumer.consumeAndClose()
+        assertThat(consumer.messages.size).isEqualTo(1)
+        assertThat(consumer.messages[0].value() is ByteArray).isTrue()
+        val result = assertDoesNotThrow("ListenDeliverTxRequest.parseFrom() should not throw an exception") {
+            ListenDeliverTxRequest.parseFrom(consumer.messages[0].value())
+        }
+        assertThat(result.req.tx.toStringUtf8()).isEqualTo("testTx1")
+        assertThat(result.res.code).isEqualTo(1)
+    }
+
+    @Test
+    fun listenCommit(): Unit = runBlocking {
+        grpcCleanupRule.register(
+            InProcessServerBuilder.forName("listenCommit").directExecutor()
+                .addService(AbciListenerService(topicConfig, producer))
+                .build()
+                .start()
+        )
+
+        val changeSet = mutableListOf<StoreKVPair>()
+        for (i in 1..2000) {
+            changeSet.add(
+                StoreKVPair.newBuilder()
+                    .setStoreKey("mockStore1")
+                    .setKey(ByteString.copyFrom("key$i".toByteArray()))
+                    .setValue(ByteString.copyFrom("val$i".toByteArray()))
+                    .setDelete(false)
+                    .build()
+            )
+        }
+
+        val reply: ListenCommitResponse = listenCommitStub.listenCommit(
+            ListenCommitRequest.newBuilder()
+                .setBlockHeight(1)
+                .setRes(
+                    ResponseCommit.newBuilder()
+                        .setData(ByteString.copyFrom("data123".toByteArray()))
+                        .build()
+                )
+                .addAllChangeSet(changeSet)
+                .build()
+        )
+        assertThat(reply.javaClass).isEqualTo(ListenCommitResponse::class.java)
+
+        val consumer = TestProtoConsumer<ByteArray>(
+            config = config,
+            bootstrapServers = kafka.bootstrapServers,
+            topic = topicConfig.getString(ListenTopic.COMMIT.topic)
+        )
+        consumer.consumeAndClose()
+        assertThat(consumer.messages.size).isEqualTo(1)
+        assertThat(consumer.messages[0].value() is ByteArray).isTrue()
+        val result = assertDoesNotThrow("ListenCommitRequest.parseFrom() should not throw an exception") {
+            ListenCommitRequest.parseFrom(consumer.messages[0].value())
+        }
+        assertThat(result.blockHeight).isEqualTo(1)
+        assertThat(result.res.data.toStringUtf8()).isEqualTo("data123")
+        assertThat(result.changeSetList[0].storeKey).isEqualTo("mockStore1")
+    }
+}
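
The tests above spell out the consumer-side contract of this change: without a registry, records are plain Protobuf bytes. A minimal standalone consumer sketch (not part of the diff; broker address, topic name, and group id are placeholders):

```kotlin
import cosmos.streaming.abci.v1.Grpc.ListenBeginBlockRequest
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "abci-listener-example")
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
    }
    KafkaConsumer<String, ByteArray>(props).use { consumer ->
        consumer.subscribe(listOf("local-listen-begin-block"))
        for (record in consumer.poll(Duration.ofSeconds(5))) {
            // Raw wire format -> parse with the matching request type.
            val request = ListenBeginBlockRequest.parseFrom(record.value())
            println("height=${request.req.header.height}")
        }
    }
}
```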
diff --git a/src/test/kotlin/io/provenance/abci/listener/BaseTests.kt b/src/test/kotlin/io/provenance/abci/listener/BaseTests.kt
new file mode 100644
index 0000000..57eb02b
--- /dev/null
+++ b/src/test/kotlin/io/provenance/abci/listener/BaseTests.kt
@@ -0,0 +1,26 @@
+package io.provenance.abci.listener
+
+import com.google.protobuf.Message
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import io.grpc.testing.GrpcCleanupRule
+import net.christophschubert.cp.testcontainers.CPTestContainerFactory
+import net.christophschubert.cp.testcontainers.SchemaRegistryContainer
+import org.apache.kafka.clients.producer.Producer
+import org.junit.Rule
+import org.testcontainers.containers.KafkaContainer
+
+open class BaseTests {
+
+    protected val testContainerFactory: CPTestContainerFactory = CPTestContainerFactory()
+    protected val kafka: KafkaContainer = testContainerFactory.createKafka()
+    protected lateinit var schemaRegistry: SchemaRegistryContainer
+
+    protected val config: Config = ConfigFactory.load("test.conf")
+    protected val topicConfig: Config = config.getConfig("kafka.producer.listen-topics")
+
+    protected lateinit var producer: Producer<String, Message>
+
+    @get:Rule
+    val grpcCleanupRule: GrpcCleanupRule = GrpcCleanupRule()
+}
diff --git a/src/test/kotlin/io/provenance/abci/listener/ByteArrayConsumer.kt b/src/test/kotlin/io/provenance/abci/listener/ByteArrayConsumer.kt
new file mode 100644
index 0000000..1cee448
--- /dev/null
+++ b/src/test/kotlin/io/provenance/abci/listener/ByteArrayConsumer.kt
@@ -0,0 +1,5 @@
+package io.provenance.abci.listener
+
+import mu.KotlinLogging
+
+private val logger = KotlinLogging.logger {}
diff --git a/src/test/kotlin/io/provenance/abci/listener/TestProtoConsumer.kt b/src/test/kotlin/io/provenance/abci/listener/TestProtoConsumer.kt
index 4a4391f..5461ddf 100644
--- a/src/test/kotlin/io/provenance/abci/listener/TestProtoConsumer.kt
+++ b/src/test/kotlin/io/provenance/abci/listener/TestProtoConsumer.kt
@@ -1,14 +1,15 @@
 package io.provenance.abci.listener
 
-import com.google.protobuf.Message
 import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig
 import mu.KotlinLogging
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.StringDeserializer
 import java.time.Duration
 import java.util.Collections
 import java.util.Properties
@@ -27,14 +28,14 @@
  * @property valueClass the specific Protobuf value type.
 * @constructor Creates a ProtoConsumer object.
 */
-class TestProtoConsumer<V : Message>(
+class TestProtoConsumer<V>(
+    private val config: Config,
     private val bootstrapServers: String,
-    private val schemaRegistryUrl: String,
+    private val schemaRegistryUrl: String? = null,
     private val topic: String,
-    private val valueClass: Class<V>
+    private val valueClass: Class<V>? = null
 ) : TestConsumer {
 
-    private val config: Config = ConfigFactory.load()
     private val consumer: KafkaConsumer<String, V> = KafkaConsumer(createConsumerProperties(bootstrapServers))
     val messages: MutableList<ConsumerRecord<String, V>> = mutableListOf()
@@ -50,14 +51,20 @@
         val props: Properties = config.getConfig("kafka.consumer.kafka-clients").toProperties()
         props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers!!
         props[ConsumerConfig.GROUP_ID_CONFIG] = "testgroup" + Random().nextInt()
-        props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = schemaRegistryUrl
+        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
 
-        // Specifying the value parameter `V` type is not enough. We need to specify a
-        // specific protobuf value deserializer to be able to deserializer to a specific type.
-        // This is because the Cosmos Protobuf files do not include: `java_outer_classname`
-        // and `java_multiple_files = true` properties.
-        // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-protobuf.html#protobuf-deserializer
-        props[KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE] = valueClass
+        // When using the schema registry, use the KafkaProtobufDeserializer.
+        if (!schemaRegistryUrl.isNullOrEmpty()) {
+            props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaProtobufDeserializer::class.java
+            props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = schemaRegistryUrl
+            // Specifying the value type parameter `V` is not enough. We need to specify a
+            // specific Protobuf value deserializer to be able to deserialize to a specific type.
+            // This is because the Cosmos Protobuf files do not include the `java_outer_classname`
+            // and `java_multiple_files = true` options.
+            // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-protobuf.html#protobuf-deserializer
+            props[KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE] = valueClass
+        }
 
         return props
     }
diff --git a/src/test/kotlin/io/provenance/abci/listener/TestProtoProducer.kt b/src/test/kotlin/io/provenance/abci/listener/TestProtoProducer.kt
index 49c07b7..96a0e2e 100644
--- a/src/test/kotlin/io/provenance/abci/listener/TestProtoProducer.kt
+++ b/src/test/kotlin/io/provenance/abci/listener/TestProtoProducer.kt
@@ -1,11 +1,12 @@
 package io.provenance.abci.listener
 
 import com.google.protobuf.Message
-import com.typesafe.config.ConfigFactory
+import com.typesafe.config.Config
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.StringSerializer
 import java.util.Properties
 
 /**
@@ -16,8 +17,10 @@
  * @property schemaRegistryUrl Confluent Schema Registry URL
 * @constructor Creates a [TestProtoProducer] object.
 */
-class TestProtoProducer(private val schemaRegistryUrl: String) : TestProducer {
-    private val config = ConfigFactory.load()
+class TestProtoProducer(
+    private val config: Config,
+    private val schemaRegistryUrl: String? = null
+) : TestProducer {
 
     /**
      * Specifies the [bootstrapServers] for the producer properties.
@@ -26,8 +29,15 @@
     override fun createProducerProperties(bootstrapServers: String?): Properties {
         val props: Properties = config.getConfig("kafka.producer.kafka-clients").toProperties()
         props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers!!
-        props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = schemaRegistryUrl
+        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ProtobufSerializer::class.java
         props[ProducerConfig.INTERCEPTOR_CLASSES_CONFIG] = LoggingProducerInterceptor::class.qualifiedName
+
+        // Setting this property tells the ProtobufSerializer above which serialization path to use.
+        if (!schemaRegistryUrl.isNullOrEmpty()) {
+            props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = schemaRegistryUrl
+        }
+
         return props
     }
diff --git a/src/test/resources/application.conf b/src/test/resources/test.conf
similarity index 88%
rename from src/test/resources/application.conf
rename to src/test/resources/test.conf
index 21c49ee..37efa59 100644
--- a/src/test/resources/application.conf
+++ b/src/test/resources/test.conf
@@ -47,9 +47,6 @@ kafka {
             enable.auto.commit = true
             auto.commit.interval.ms = 1000
             auto.offset.reset = earliest
-//            session.timout.ms = 30000
-            key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
-            value.deserializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
         }
     }
 }
\ No newline at end of file