Default to ByteArraySerializer when schema.registry.url is not set #38

Merged · 6 commits · Feb 24, 2023

Changes from 1 commit
ktlint fixes
Ergels Gaxhaj committed Feb 24, 2023
commit 82eff2ddf72ceb1941c02c0b613e08659272f87d
@@ -9,23 +9,25 @@ import org.apache.kafka.common.serialization.Serializer
 class ProducerSettings<K, V>(
     private val config: Config,
     private val keySerializer: Serializer<K>? = null,
-    private val valueSerializer: Serializer<V>? = null,
+    private val valueSerializer: Serializer<V>? = null
 ) {
 
     fun createKafkaProducer(): Producer<K, V> {
         val properties = config.getConfig("kafka-clients").toProperties()
         require(
             keySerializer != null ||
-                properties.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).isNotEmpty()) {
+                properties.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).isNotEmpty()
+        ) {
             "Key serializer should be defined or declared in configuration"
         }
 
         require(
             valueSerializer != null ||
-                properties.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).isNotEmpty()) {
+                properties.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).isNotEmpty()
+        ) {
             "Value serializer should be defined or declared in configuration"
         }
 
         return KafkaProducer(properties)
     }
-}
+}
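Both guards accept a serializer supplied through the constructor or one declared in the kafka-clients configuration block. A minimal sketch of the configuration-driven path (hypothetical broker address; assumes the kafka-clients block maps directly onto Kafka producer properties):

import com.typesafe.config.ConfigFactory

// Serializers declared in configuration rather than passed in, so both
// require() guards pass via the properties lookup.
val config = ConfigFactory.parseString(
    """
    kafka-clients {
      bootstrap.servers = "localhost:9092"  // hypothetical address
      key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
      value.serializer = "org.apache.kafka.common.serialization.ByteArraySerializer"
    }
    """.trimIndent()
)
val producer = ProducerSettings<String, ByteArray>(config).createKafkaProducer()

One observation: both checks read ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, in the old and new versions alike; the value-serializer guard presumably intends VALUE_SERIALIZER_CLASS_CONFIG.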
@@ -3,9 +3,9 @@ package io.provenance.abci.listener
 import com.google.protobuf.Message
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
-import java.io.ByteArrayOutputStream
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.common.serialization.Serializer
+import java.io.ByteArrayOutputStream
 
 /**
  * ProtobufSerializer is a custom Protobuf serializer for Kafka
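The imports above (KafkaProtobufSerializer, ByteArraySerializer, ByteArrayOutputStream) hint at the mechanism behind the PR title: serialize through the schema registry when schema.registry.url is configured, and fall back to raw Protobuf bytes when it is not. A sketch of that dispatch, under the assumption that the class delegates in configure/serialize (the repo's actual ProtobufSerializer may be structured differently):

import com.google.protobuf.Message
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.Serializer
import java.io.ByteArrayOutputStream

// Sketch only: illustrates the serializer selection the PR title describes.
class ProtobufSerializerSketch<T : Message> : Serializer<T> {
    private val byteArraySerializer = ByteArraySerializer()
    private var protobufSerializer: KafkaProtobufSerializer<T>? = null

    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
        val url = configs?.get(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG) as? String
        if (!url.isNullOrEmpty()) {
            // Registry configured: use the Confluent wire format.
            protobufSerializer = KafkaProtobufSerializer<T>().apply { configure(configs, isKey) }
        }
    }

    override fun serialize(topic: String?, data: T?): ByteArray? {
        if (data == null) return null
        protobufSerializer?.let { return it.serialize(topic, data) }
        // No registry: emit the message's raw Protobuf encoding.
        val out = ByteArrayOutputStream()
        data.writeTo(out)
        return byteArraySerializer.serialize(topic, out.toByteArray())
    }
}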
@@ -4,7 +4,6 @@ import com.google.protobuf.ByteString
 import com.google.protobuf.Message
 import com.google.protobuf.Timestamp
 import cosmos.base.store.v1beta1.Listening.StoreKVPair
-import cosmos.streaming.abci.v1.ABCIListenerServiceGrpcKt
 import cosmos.streaming.abci.v1.Grpc.ListenBeginBlockRequest
 import cosmos.streaming.abci.v1.Grpc.ListenBeginBlockResponse
 import cosmos.streaming.abci.v1.Grpc.ListenCommitRequest
@@ -13,10 +12,11 @@ import cosmos.streaming.abci.v1.Grpc.ListenDeliverTxRequest
 import cosmos.streaming.abci.v1.Grpc.ListenDeliverTxResponse
 import cosmos.streaming.abci.v1.Grpc.ListenEndBlockRequest
 import cosmos.streaming.abci.v1.Grpc.ListenEndBlockResponse
-import io.grpc.inprocess.InProcessChannelBuilder
 import io.grpc.inprocess.InProcessServerBuilder
 import kotlinx.coroutines.runBlocking
 import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.AfterAll
+import org.junit.jupiter.api.BeforeAll
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.TestInstance
 import tendermint.abci.Types
@@ -28,9 +28,6 @@ import tendermint.abci.Types.ResponseCommit
 import tendermint.abci.Types.ResponseDeliverTx
 import tendermint.abci.Types.ResponseEndBlock
 import java.time.Instant
-import org.apache.kafka.clients.producer.Producer
-import org.junit.jupiter.api.AfterAll
-import org.junit.jupiter.api.BeforeAll
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 class AbciListenerServerWithSchemaRegistryTests : BaseTests() {
@@ -40,7 +37,7 @@ class AbciListenerServerWithSchemaRegistryTests : BaseTests() {
         schemaRegistry = testContainerFactory.createSchemaRegistry(kafka)
         schemaRegistry.start()
         producer = TestProtoProducer<String, Message>(config, schemaRegistry.baseUrl)
-                .createProducer(kafka.bootstrapServers)
+            .createProducer(kafka.bootstrapServers)
     }
 
     @AfterAll
@@ -15,8 +15,11 @@ import cosmos.streaming.abci.v1.Grpc.ListenEndBlockResponse
 import io.grpc.inprocess.InProcessServerBuilder
 import kotlinx.coroutines.runBlocking
 import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.AfterAll
+import org.junit.jupiter.api.BeforeAll
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.TestInstance
+import org.junit.jupiter.api.assertDoesNotThrow
 import tendermint.abci.Types
 import tendermint.abci.Types.RequestBeginBlock
 import tendermint.abci.Types.RequestDeliverTx
@@ -26,9 +29,6 @@ import tendermint.abci.Types.ResponseCommit
 import tendermint.abci.Types.ResponseDeliverTx
 import tendermint.abci.Types.ResponseEndBlock
 import java.time.Instant
-import org.junit.jupiter.api.AfterAll
-import org.junit.jupiter.api.BeforeAll
-import org.junit.jupiter.api.assertDoesNotThrow
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 class AbciListenerServerWithoutSchemaRegistryTests : BaseTests() {
@@ -37,7 +37,7 @@ class AbciListenerServerWithoutSchemaRegistryTests : BaseTests() {
     internal fun setUpAll() {
         kafka.start()
         producer = TestProtoProducer<String, Message>(config)
-                .createProducer(kafka.bootstrapServers)
+            .createProducer(kafka.bootstrapServers)
     }
 
     @AfterAll
@@ -100,7 +100,7 @@ class AbciListenerServerWithoutSchemaRegistryTests : BaseTests() {
         val consumer = TestProtoConsumer<String, ByteArray>(
             config = config,
             bootstrapServers = kafka.bootstrapServers,
-            topic = topicConfig.getString(ListenTopic.BEGIN_BLOCK.topic),
+            topic = topicConfig.getString(ListenTopic.BEGIN_BLOCK.topic)
         )
         consumer.consumeAndClose()
         assertThat(consumer.messages.size).isEqualTo(1)
@@ -132,7 +132,7 @@ class AbciListenerServerWithoutSchemaRegistryTests : BaseTests() {
         val consumer = TestProtoConsumer<String, ByteArray>(
             config = config,
             bootstrapServers = kafka.bootstrapServers,
-            topic = topicConfig.getString(ListenTopic.END_BLOCK.topic),
+            topic = topicConfig.getString(ListenTopic.END_BLOCK.topic)
        )
         consumer.consumeAndClose()
         assertThat(consumer.messages.size).isEqualTo(1)
@@ -165,7 +165,7 @@ class AbciListenerServerWithoutSchemaRegistryTests : BaseTests() {
         val consumer = TestProtoConsumer<String, ByteArray>(
             config = config,
             bootstrapServers = kafka.bootstrapServers,
-            topic = topicConfig.getString(ListenTopic.DELIVER_TX.topic),
+            topic = topicConfig.getString(ListenTopic.DELIVER_TX.topic)
        )
         consumer.consumeAndClose()
         assertThat(consumer.messages.size).isEqualTo(1)
@@ -214,7 +214,7 @@ class AbciListenerServerWithoutSchemaRegistryTests : BaseTests() {
         val consumer = TestProtoConsumer<String, ByteArray>(
             config = config,
             bootstrapServers = kafka.bootstrapServers,
-            topic = topicConfig.getString(ListenTopic.COMMIT.topic),
+            topic = topicConfig.getString(ListenTopic.COMMIT.topic)
        )
         consumer.consumeAndClose()
         assertThat(consumer.messages.size).isEqualTo(1)
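Since no schema.registry.url is set in these tests, values are consumed as plain ByteArray. For completeness, such a payload decodes directly with the generated message class (a sketch; the actual record type written to each topic depends on the listener implementation):

import tendermint.abci.Types.RequestBeginBlock

// Without a schema registry there is no Confluent wire-format header,
// so the payload parses directly as the generated Protobuf type.
fun decodeBeginBlock(payload: ByteArray): RequestBeginBlock =
    RequestBeginBlock.parseFrom(payload)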
src/test/kotlin/io/provenance/abci/listener/BaseTests.kt (3 changes: 1 addition & 2 deletions)
@@ -49,5 +49,4 @@ open class BaseTests {
             InProcessChannelBuilder.forName("listenCommit").directExecutor().build()
         )
     )
-
-}
+}
@@ -1,15 +1,6 @@
 package io.provenance.abci.listener
 
 import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
-import java.time.Duration
-import java.util.*
-import mu.KotlinLogging
-import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import java.util.*
 
 private val logger = KotlinLogging.logger {}
@@ -8,12 +8,12 @@ import mu.KotlinLogging
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.StringDeserializer
 import java.time.Duration
 import java.util.Collections
 import java.util.Properties
 import java.util.Random
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.serialization.StringDeserializer
 
 private val logger = KotlinLogging.logger {}
@@ -2,13 +2,12 @@ package io.provenance.abci.listener
 
 import com.google.protobuf.Message
 import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.clients.producer.ProducerConfig
-import java.util.Properties
 import org.apache.kafka.common.serialization.StringSerializer
+import java.util.Properties
 
 /**
  * Produce Protobuf messages to Kafka.
@@ -35,8 +34,9 @@ class TestProtoProducer<K, V : Message>(
         props[ProducerConfig.INTERCEPTOR_CLASSES_CONFIG] = LoggingProducerInterceptor::class.qualifiedName
 
         // setting this property will tell the ProtobufSerializer ^^^ which serializer to use.
-        if (!schemaRegistryUrl.isNullOrEmpty())
+        if (!schemaRegistryUrl.isNullOrEmpty()) {
             props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = schemaRegistryUrl
+        }
 
         return props
     }
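Taken together with the test setups above: passing a registry URL routes records through the Confluent serializer, while omitting it leaves schema.registry.url unset and the raw-bytes fallback in effect. A condensed restatement (config, kafka, and schemaRegistry assumed initialized as in BaseTests):

// With a schema registry: Confluent wire format via KafkaProtobufSerializer.
val registryProducer = TestProtoProducer<String, Message>(config, schemaRegistry.baseUrl)
    .createProducer(kafka.bootstrapServers)

// Without one: schema.registry.url stays unset, so values are written
// as raw Protobuf bytes (the ByteArraySerializer default).
val rawProducer = TestProtoProducer<String, Message>(config)
    .createProducer(kafka.bootstrapServers)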