
Default to ByteArraySerializer when schema.registry.url is not set (#38)

* serialize to bytes when no schema registry is present

* update to distribute only tar

* update changelog

* ktlint fixes

* use unique names for registered endpoints

* ktlint
egaxhaj authored Feb 24, 2023
1 parent 58ea539 commit e447fbb
Showing 17 changed files with 488 additions and 84 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -40,7 +40,7 @@ jobs:
 - name: Build artifacts
   uses: gradle/[email protected]
   with:
-    arguments: assembleDist
+    arguments: distTar
 - name: Upload artifacts
   uses: ncipollo/release-action@v1
   with:
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

 * (docs/ci) [PR 32](https://github.com/provenance-io/provenance-abci-listener/pull/31) Update deploy doc and scripts and mark pre-releases
 * (docs) [PR 33](https://github.com/provenance-io/provenance-abci-listener/pull/33) Fix typos in deploy steps
+* [PR 38](https://github.com/provenance-io/provenance-abci-listener/pull/38) Default to ByteArraySerializer when `schema.registry.url` property is not set

---

12 changes: 7 additions & 5 deletions build.gradle.kts
@@ -14,7 +14,7 @@ group = "io.provenance"
 version = System.getenv("VERSION") ?: "0-SNAPSHOT"

 application {
-    mainClass.set("io.provenance.abci.listener.ABCIListenerServerKt")
+    mainClass.set("io.provenance.abci.listener.AbciListenerServerKt")
 }

 repositories {
@@ -84,16 +84,18 @@ tasks.withType<KotlinCompile> {
     kotlinOptions {
         freeCompilerArgs = listOf(
             "-Xjsr305=strict",
-            "-Xopt-in=kotlin.RequiresOptIn"
+            "-opt-in=kotlin.RequiresOptIn"
         )
         jvmTarget = "11"
         languageVersion = "1.7"
         apiVersion = "1.7"
     }
 }

-tasks.assembleDist {
-    finalizedBy("checksumDist") // checksums are generated after assembleDist runs
+tasks.distTar {
+    compression = Compression.GZIP
+    archiveExtension.set("tar.gz")
+    finalizedBy("checksumDist")
 }

tasks.register<Checksum>("checksumDist") {
@@ -103,7 +105,7 @@ tasks.register<Checksum>("checksumDist") {
     checksumAlgorithm.set(Checksum.Algorithm.MD5)
     appendFileNameToChecksum.set(true)

-    dependsOn(tasks.assembleDist)
+    dependsOn(tasks.distTar)
 }

tasks.withType<Test> {
13 changes: 7 additions & 6 deletions scripts/deploy.sh
@@ -2,7 +2,7 @@

#
# Download plugin distribution and extract the plugin.
-# In addition, validate the md5 checksum of the zip file.
+# In addition, validate the md5 checksum of the file.
#

usage() {
@@ -24,21 +24,22 @@ TAG=$1
 PLUGINS_HOME="$PIO_HOME/plugins"
 PLUGIN_NAME=provenance-abci-listener
 PLUGIN_DIR="$PLUGINS_HOME/$PLUGIN_NAME-$TAG"
-RELEASE_URL="https://github.com/provenance-io/provenance-abci-listener/releases/download/$TAG/provenance-abci-listener-$TAG.zip"
+RELEASE_URL="https://github.com/provenance-io/provenance-abci-listener/releases/download/$TAG/provenance-abci-listener-$TAG.tar.gz"

 [[ -z "$TAG" ]] && usage;

 echo "Release: $TAG"

 # download release distribution
 echo "Downloading release..."
-curl -s --create-dirs -o "$PLUGIN_DIR.zip" -L "$RELEASE_URL"
+curl -s --create-dirs -o "$PLUGIN_DIR.tar.gz" -L "$RELEASE_URL"

 # validate md5 checksum
 echo "Validating release (md5)..."
-curl -s --create-dirs -o "$PLUGIN_DIR.zip.md5" -L "$RELEASE_URL.md5"
+curl -s --create-dirs -o "$PLUGIN_DIR.tar.gz.md5" -L "$RELEASE_URL.md5"
 cd "$PLUGINS_HOME" || exit 1
-md5sum -c "$PLUGIN_DIR.zip.md5" || exit 1
+md5sum -c "$PLUGIN_DIR.tar.gz.md5" || exit 1

 echo "Extracting release..."
-unzip -qq "$PLUGIN_DIR.zip" -d "$PLUGINS_HOME"
+tar -zxf "$PLUGIN_DIR.tar.gz" -C "$PLUGINS_HOME"
+rm "$PLUGIN_DIR.tar.gz"
src/main/kotlin/io/provenance/abci/listener/{ABCIListenerServer.kt → AbciListenerServer.kt}
@@ -8,42 +8,41 @@ import io.grpc.health.v1.HealthCheckResponse.ServingStatus
 import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
 import io.grpc.protobuf.services.HealthStatusManager
 import io.grpc.protobuf.services.ProtoReflectionService
-import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.Producer
 import java.net.InetSocketAddress
 import java.time.Duration
 import java.util.concurrent.TimeUnit

 /**
- * The [ABCIListenerServer] starts the gRPC server for the [ABCIListenerService].
- * In addition to the [ABCIListenerService], a health service (managed by [HealthStatusManager])
+ * The [AbciListenerServer] starts the gRPC server for the [AbciListenerService].
+ * In addition to the [AbciListenerService], a health service (managed by [HealthStatusManager])
  * and a [ProtoReflectionService] are also added.
  *
  * The server is also responsible for initializing and closing the Kafka [Producer]
- * used by the [ABCIListenerService].
+ * used by the [AbciListenerService].
  */
-class ABCIListenerServer {
+class AbciListenerServer {
     private val config: Config = ConfigFactory.load()
     private val inet = InetSocketAddress(
         config.getString("grpc.server.addr"),
         config.getInt("grpc.server.port")
     )

     // Kafka producer
-    private var topicConfig: Config = config.getConfig("kafka.producer.listen-topics")
-    private var kafkaConfig: Config = config.getConfig("kafka.producer.kafka-clients")
-    private val producer: Producer<String, Message> = KafkaProducer(kafkaConfig.toProperties())
+    private var producerConfig: Config = config.getConfig("kafka.producer")
+    private var topicConfig: Config = producerConfig.getConfig("listen-topics")
+    private val producer = ProducerSettings<String, Message>(producerConfig).createKafkaProducer()

     private val health: HealthStatusManager = HealthStatusManager()
-    val server: Server = NettyServerBuilder
+    private val server: Server = NettyServerBuilder
         .forAddress(inet)
         .directExecutor()
         .addService(health.healthService)
         .addService(ProtoReflectionService.newInstance())
-        .addService(ABCIListenerService(topicConfig, producer))
+        .addService(AbciListenerService(topicConfig, producer))
         .build()

-    /** Start the [ABCIListenerService] gRPC server. */
+    /** Start the [AbciListenerService] gRPC server. */
     fun start() {
         server.start()

@@ -57,13 +56,13 @@ class ABCIListenerServer {
         Runtime.getRuntime().addShutdownHook(
             Thread {
                 println("*** shutting down gRPC server since JVM is shutting down")
-                this@ABCIListenerServer.stop()
+                this@AbciListenerServer.stop()
                 println("*** server shut down")
             }
         )
     }

-    /** Stop the [ABCIListenerService] gRPC server. */
+    /** Stop the [AbciListenerService] gRPC server. */
     private fun stop() {
         // shutdown producer
         producer.close(Duration.ofMillis(3000))
@@ -98,7 +97,7 @@ class ABCIListenerServer {
}

 fun main() {
-    val server = ABCIListenerServer()
+    val server = AbciListenerServer()
     server.start()
     server.blockUntilShutdown()
 }
src/main/kotlin/io/provenance/abci/listener/{ABCIListenerService.kt → AbciListenerService.kt}
@@ -15,7 +15,7 @@ import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord

 /**
- * Producer topic names for the [ABCIListenerService]
+ * Producer topic names for the [AbciListenerService]
  */
 enum class ListenTopic(val topic: String) {
     BEGIN_BLOCK("listen-begin-block"),
@@ -31,7 +31,7 @@ enum class ListenTopic(val topic: String) {
  * @property producer the Kafka Protobuf [Producer].
  * @constructor Creates a gRPC ABCI listener service.
  */
-class ABCIListenerService(
+class AbciListenerService(
     private val topicConfig: Config,
     private val producer: Producer<String, Message>
 ) : ABCIListenerServiceGrpcKt.ABCIListenerServiceCoroutineImplBase() {
2 changes: 1 addition & 1 deletion src/main/kotlin/io/provenance/abci/listener/Extensions.kt
@@ -28,7 +28,7 @@ fun Config.toProperties(): Properties {
 /**
  * Add a [dispatch] method to the Kafka [Producer] to produce messages
  * in a non-blocking manner and `await` for acknowledgement from broker
- * before responding on gRPC endpoints in [ABCIListenerService].
+ * before responding on gRPC endpoints in [AbciListenerService].
  *
  * Resumes with a [StatusException] when an exception is encountered.
  */
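The `dispatch` body itself lies outside this hunk. As a rough sketch only, assuming it bridges `Producer.send` into a coroutine and maps broker failures to gRPC status errors (names and details here are illustrative, not the repository's exact code):

```kotlin
import io.grpc.Status
import io.grpc.StatusException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata

// Hypothetical sketch: suspend until the broker acknowledges the record,
// surfacing send failures as gRPC StatusExceptions.
suspend fun <K, V> Producer<K, V>.dispatch(record: ProducerRecord<K, V>): RecordMetadata =
    suspendCancellableCoroutine { cont ->
        send(record) { metadata, exception ->
            if (exception != null) {
                cont.resumeWithException(
                    StatusException(Status.INTERNAL.withDescription(exception.message).withCause(exception))
                )
            } else {
                cont.resume(metadata)
            }
        }
    }
```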
33 changes: 33 additions & 0 deletions src/main/kotlin/io/provenance/abci/listener/ProducerSettings.kt
@@ -0,0 +1,33 @@
package io.provenance.abci.listener

import com.typesafe.config.Config
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.Serializer

class ProducerSettings<K, V>(
    private val config: Config,
    private val keySerializer: Serializer<K>? = null,
    private val valueSerializer: Serializer<V>? = null
) {

    fun createKafkaProducer(): Producer<K, V> {
        val properties = config.getConfig("kafka-clients").toProperties()
        require(
            keySerializer != null ||
                !properties.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).isNullOrEmpty()
        ) {
            "Key serializer should be defined or declared in configuration"
        }

        require(
            valueSerializer != null ||
                !properties.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).isNullOrEmpty()
        ) {
            "Value serializer should be defined or declared in configuration"
        }

        // Explicit serializer instances take precedence; nulls fall back
        // to the classes declared in configuration.
        return KafkaProducer(properties, keySerializer, valueSerializer)
    }
}
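For reference, a minimal usage sketch (assuming the same `kafka.producer` config path that `AbciListenerServer` reads above; the message type is only an example):

```kotlin
import com.google.protobuf.Message
import com.typesafe.config.ConfigFactory

fun main() {
    // Loads kafka.producer { kafka-clients { ... } } from application.conf,
    // mirroring the wiring in AbciListenerServer.
    val producerConfig = ConfigFactory.load().getConfig("kafka.producer")
    val producer = ProducerSettings<String, Message>(producerConfig).createKafkaProducer()
    producer.close()
}
```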
78 changes: 78 additions & 0 deletions src/main/kotlin/io/provenance/abci/listener/ProtobufSerializer.kt
@@ -0,0 +1,78 @@
package io.provenance.abci.listener

import com.google.protobuf.Message
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.Serializer
import java.io.ByteArrayOutputStream

/**
 * ProtobufSerializer is a custom Protobuf serializer for Kafka
 * that chooses a serialization path based on whether the application
 * is configured to use the Confluent Schema Registry or not.
 *
 * When the application is configured to use the Confluent Schema Registry,
 * the [KafkaProtobufSerializer] will be used. Otherwise, the [ByteArraySerializer] will be used.
 *
 * In order to enable the application to use the Confluent Schema Registry,
 * you MUST set the following properties:
 *
 * `schema.registry.url={{ SR_URL }}`
 * `value.serializer=io.provenance.abci.listener.ProtobufSerializer`
 *
 * Example configuration:
 *
 * ```
 * # application.conf
 *
 * # application configuration properties
 * ...
 *
 * kafka.producer {
 *     ...
 *     # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
 *     # can be defined in this configuration section.
 *     kafka-clients {
 *         bootstrap.servers = "localhost:9092"
 *         // other producer properties
 *         ...
 *         key.serializer = org.apache.kafka.common.serialization.StringSerializer
 *         value.serializer = io.provenance.abci.listener.ProtobufSerializer
 *         schema.registry.url = "http://127.0.0.1:8081"
 *     }
 * }
 * ```
 * For additional producer properties see [org.apache.kafka.clients.producer.ProducerConfig].
 */
class ProtobufSerializer<T : Message> : Serializer<T> {
    private lateinit var serializer: Serializer<T>

    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
        val useSchemaRegistry = configs!!.containsKey(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG)
        if (useSchemaRegistry) {
            val serializer = KafkaProtobufSerializer<T>()
            serializer.configure(configs, isKey)
            this.serializer = serializer
        } else {
            val serializer = ByteArraySerializer()
            @Suppress("UNCHECKED_CAST")
            this.serializer = serializer as Serializer<T>
        }
    }

    override fun serialize(topic: String?, data: T): ByteArray {
        var bytes: ByteArray = byteArrayOf()
        when (serializer) {
            is KafkaProtobufSerializer -> bytes = serializer.serialize(topic, data)
            is ByteArraySerializer -> {
                val out = ByteArrayOutputStream()
                data.writeTo(out)
                bytes = out.toByteArray()
                out.close()
            }
        }
        return bytes
    }
}
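An illustrative sketch of the fallback in action (the `encode` helper is hypothetical; with `schema.registry.url` set, `serialize` would contact a live Schema Registry, so the registry branch is not runnable offline):

```kotlin
import com.google.protobuf.Message

// Hypothetical demo: the same serializer class picks KafkaProtobufSerializer
// when schema.registry.url is present, and the raw-bytes ByteArraySerializer
// path when it is not.
fun <T : Message> encode(msg: T, withRegistry: Boolean): ByteArray {
    val configs = mutableMapOf<String, Any>(
        "key.serializer" to "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer" to "io.provenance.abci.listener.ProtobufSerializer"
    )
    if (withRegistry) configs["schema.registry.url"] = "http://127.0.0.1:8081"

    val serializer = ProtobufSerializer<T>()
    serializer.configure(configs, false) // isKey = false
    return serializer.serialize("listen-begin-block", msg)
}
```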
8 changes: 4 additions & 4 deletions src/main/resources/application.conf
@@ -6,9 +6,9 @@ grpc.server {

 # Kafka producer config
 kafka.producer {
-    # Assign a topic name and an optional prefix where events will be written.
+    # Assign a topic name and optional prefix where events will be written.
     listen-topics {
-        prefix = "local-"
+        prefix = "bytetest-"
         listen-begin-block = ${?kafka.producer.listen-topics.prefix}"listen-begin-block"
         listen-end-block = ${?kafka.producer.listen-topics.prefix}"listen-end-block"
         listen-deliver-tx = ${?kafka.producer.listen-topics.prefix}"listen-deliver-tx"
@@ -25,7 +25,7 @@ kafka.producer {
         linger.ms = 50
         max.request.size = 204857600
         key.serializer = org.apache.kafka.common.serialization.StringSerializer
-        value.serializer = io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
-        schema.registry.url = "http://127.0.0.1:8081"
+        value.serializer = io.provenance.abci.listener.ProtobufSerializer
+        // schema.registry.url = "http://127.0.0.1:8081"
     }
 }