From c8522390c98163c5e6071e38b8f9ce7e61d90706 Mon Sep 17 00:00:00 2001 From: Laurent Broudoux Date: Fri, 4 Feb 2022 10:03:36 +0100 Subject: [PATCH 1/4] feat: Add schema registry infos to Kafka binding --- kafka/README.md | 46 +++++++++++++++++++++++-------- kafka/json_schemas/operation.json | 19 +++++++++++-- kafka/json_schemas/server.json | 37 +++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 14 deletions(-) create mode 100644 kafka/json_schemas/server.json diff --git a/kafka/README.md b/kafka/README.md index adc5606e..d93b6d84 100644 --- a/kafka/README.md +++ b/kafka/README.md @@ -6,16 +6,34 @@ This document defines how to describe Kafka-specific information on AsyncAPI. ## Version -Current version is `0.2.0`. +Current version is `0.3.0`. ## Server Binding Object -This object MUST NOT contain any properties. Its name is reserved for future use. +This object contains information about the server representation in Kafka. +##### Fixed Fields +Field Name | Type | Description | Applicability [default] | Constraints +---|:---:|:---:|:---:|--- +`schemaRegistryUrl` | string (url) | API URL for the Schema Registry used when producing Kafka messages (if a Schema Registry was used) | OPTIONAL | - +`schemaRegistryVendor` | string | The vendor of Schema Registry and Kafka serdes library that should be used (e.g. `apicurio`, `confluent`, `ibm`, or `karapace`) | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified +`bindingVersion` | string | The version of this binding. | OPTIONAL [`latest`] + +##### Example + +```yaml +servers: + production: + bindings: + kafka: + schemaRegistryUrl: 'https://my-schema-registry.com' + schemaRegistryVendor: 'confluent' + bindingVersion: '0.3.0' +``` @@ -33,11 +51,14 @@ This object contains information about the operation representation in Kafka. ##### Fixed Fields -Field Name | Type | Description ----|:---:|--- -`groupId` | [Schema Object][schemaObject] | Id of the consumer group. -`clientId` | [Schema Object][schemaObject] | Id of the consumer inside a consumer group. -`bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. +Field Name | Type | Description | Applicability [default] | Constraints +---|:---:|:---:|:---:|--- +`groupId` | [Schema Object][schemaObject] | Id of the consumer group. | OPTIONAL | - +`clientId` | [Schema Object][schemaObject] | Id of the consumer inside a consumer group. | OPTIONAL | - +`schemaIdLocation` | string | If a Schema Registry is used when performing this operation, tells where the id of schema is stored (e.g. `header` or `payload`). | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level +`schemaIdPayloadEncoding` | string | Number of bytes or vendor specific values when schema id is encoded in payload (e.g `confluent`/ `apicurio-legacy` / `apicurio-new`). | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level +`schemaLookupStrategy` | string | Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied. | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level +`bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [`latest`] | - This object MUST contain only the properties defined above. @@ -55,7 +76,10 @@ channels: clientId: type: string enum: ['myClientId'] - bindingVersion: '0.1.0' + schemaIdLocation: 'payload' + schemaIdPayloadEncoding: 'apicurio-new' + schemaLookupStrategy: 'TopicIdStrategy' + bindingVersion: '0.3.0' ``` @@ -69,7 +93,7 @@ This object contains information about the message representation in Kafka. Field Name | Type | Description ---|:---:|--- -`key` | [Schema Object][schemaObject] \| [AVRO Schema Object](https://avro.apache.org/docs/current/spec.html) | The message key. **NOTE**: You can also use the [reference object](https://asyncapi.io/docs/specifications/v2.1.0#referenceObject) way. +`key` | [Schema Object][schemaObject] \| [AVRO Schema Object](https://avro.apache.org/docs/current/spec.html) | The message key. **NOTE**: You can also use the [reference object](https://asyncapi.io/docs/specifications/v2.1.0#referenceObject) way. `bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. This object MUST contain only the properties defined above. @@ -85,7 +109,7 @@ channels: key: type: string enum: ['myKey'] - bindingVersion: '0.1.0' + bindingVersion: '0.3.0' ``` -[schemaObject]: https://www.asyncapi.com/docs/specifications/2.0.0/#schemaObject +[schemaObject]: https://www.asyncapi.com/docs/specifications/2.1.0/#schemaObject diff --git a/kafka/json_schemas/operation.json b/kafka/json_schemas/operation.json index 500744ca..0a6cc4fb 100644 --- a/kafka/json_schemas/operation.json +++ b/kafka/json_schemas/operation.json @@ -7,18 +7,31 @@ "additionalProperties": false, "patternProperties": { "^x-[\\w\\d\\.\\-\\_]+$": { - "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/specificationExtension" + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.9.0/schemas/2.1.0.json#/definitions/specificationExtension" } }, "properties": { "groupId": { - "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/schema", + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.9.0/schemas/2.1.0.json#/definitions/schema", "description": "Id of the consumer group." }, "clientId": { - "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/schema", + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.9.0/schemas/2.1.0.json#/definitions/schema", "description": "Id of the consumer inside a consumer group." }, + "schemaIdLocation": { + "type": "string", + "description": "If a Schema Registry is used when performing this operation, tells where the id of schema is stored." + "enum": ["header", "payload"] + }, + "schemaIdPayloadEncoding": { + "type": "string", + "description": "Number of bytes or vendor specific values when schema id is encoded in payload." + }, + "schemaLookupStrategy": { + "type": "string", + "description": "Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied." + }, "bindingVersion": { "type": "string", "enum": [ diff --git a/kafka/json_schemas/server.json b/kafka/json_schemas/server.json new file mode 100644 index 00000000..568fd377 --- /dev/null +++ b/kafka/json_schemas/server.json @@ -0,0 +1,37 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://asyncapi.com/bindings/ibmmq/server.json", + "title": "Server Schema", + "description": "This object contains server connection information to a Kafka broker. This object contains additional information not possible to represent within the core AsyncAPI specification.", + "type": "object", + "additionalProperties": false, + "patternProperties": { + "^x-[\\w\\d\\.\\-\\_]+$": { + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.9.0/schemas/2.1.0.json#/definitions/specificationExtension" + } + }, + "properties": { + "schemaRegistryUrl": { + "type": "string", + "description": "API URL for the Schema Registry used when producing Kafka messages (if a Schema Registry was used)." + }, + "schemaRegistryVendor": { + "type": "string", + "description": "The vendor of the Schema Registry and Kafka serdes library that should be used." + }, + "bindingVersion": { + "type": "string", + "enum": [ + "0.3.0" + ], + "description": "The version of this binding." + } + }, + "examples": [ + { + "schemaRegistryUrl": "https://my-schema-registry.com", + "schemaRegistryVendor": "confluent", + "bindingVersion": "0.3.0" + } + ] +} From 7ea168fd624f4c4b19da436b552424504b5e2335 Mon Sep 17 00:00:00 2001 From: Laurent Broudoux Date: Tue, 30 Aug 2022 10:55:00 +0200 Subject: [PATCH 2/4] feat: add schema registry infos to Kafka binding - update to latest spec --- kafka/README.md | 4 ++-- kafka/json_schemas/message.json | 10 +++++----- kafka/json_schemas/operation.json | 12 ++++++------ kafka/json_schemas/server.json | 4 ++-- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/kafka/README.md b/kafka/README.md index d93b6d84..083169c1 100644 --- a/kafka/README.md +++ b/kafka/README.md @@ -93,7 +93,7 @@ This object contains information about the message representation in Kafka. Field Name | Type | Description ---|:---:|--- -`key` | [Schema Object][schemaObject] \| [AVRO Schema Object](https://avro.apache.org/docs/current/spec.html) | The message key. **NOTE**: You can also use the [reference object](https://asyncapi.io/docs/specifications/v2.1.0#referenceObject) way. +`key` | [Schema Object][schemaObject] \| [AVRO Schema Object](https://avro.apache.org/docs/current/spec.html) | The message key. **NOTE**: You can also use the [reference object](https://asyncapi.io/docs/specifications/v2.4.0#referenceObject) way. `bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. This object MUST contain only the properties defined above. @@ -112,4 +112,4 @@ channels: bindingVersion: '0.3.0' ``` -[schemaObject]: https://www.asyncapi.com/docs/specifications/2.1.0/#schemaObject +[schemaObject]: https://www.asyncapi.com/docs/specifications/2.4.0/#schemaObject diff --git a/kafka/json_schemas/message.json b/kafka/json_schemas/message.json index 701beaa4..8abdda7c 100644 --- a/kafka/json_schemas/message.json +++ b/kafka/json_schemas/message.json @@ -6,18 +6,18 @@ "additionalProperties": false, "patternProperties": { "^x-[\\w\\d\\.\\-\\_]+$": { - "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/specificationExtension" + "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.14.0/schemas/2.4.0.json#/definitions/specificationExtension" } }, "properties": { "key": { - "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/schema", + "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.14.0/schemas/2.4.0.json#/definitions/schema", "description": "The message key." }, "bindingVersion": { "type": "string", "enum": [ - "0.1.0" + "0.3.0" ], "description": "The version of this binding. If omitted, 'latest' MUST be assumed." } @@ -30,13 +30,13 @@ "myKey" ] }, - "bindingVersion": "0.1.0" + "bindingVersion": "0.3.0" }, { "key": { "$ref": "path/to/user-create.avsc#/UserCreate" }, - "bindingVersion": "0.2.0" + "bindingVersion": "0.3.0" } ] } diff --git a/kafka/json_schemas/operation.json b/kafka/json_schemas/operation.json index 0a6cc4fb..db496416 100644 --- a/kafka/json_schemas/operation.json +++ b/kafka/json_schemas/operation.json @@ -7,21 +7,21 @@ "additionalProperties": false, "patternProperties": { "^x-[\\w\\d\\.\\-\\_]+$": { - "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.9.0/schemas/2.1.0.json#/definitions/specificationExtension" + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/specificationExtension" } }, "properties": { "groupId": { - "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.9.0/schemas/2.1.0.json#/definitions/schema", + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/schema", "description": "Id of the consumer group." }, "clientId": { - "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.9.0/schemas/2.1.0.json#/definitions/schema", + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/schema", "description": "Id of the consumer inside a consumer group." }, "schemaIdLocation": { "type": "string", - "description": "If a Schema Registry is used when performing this operation, tells where the id of schema is stored." + "description": "If a Schema Registry is used when performing this operation, tells where the id of schema is stored.", "enum": ["header", "payload"] }, "schemaIdPayloadEncoding": { @@ -35,7 +35,7 @@ "bindingVersion": { "type": "string", "enum": [ - "0.1.0" + "0.3.0" ], "description": "The version of this binding. If omitted, 'latest' MUST be assumed." } @@ -55,7 +55,7 @@ "myClientId" ] }, - "bindingVersion": "0.1.0" + "bindingVersion": "0.3.0" } ] } diff --git a/kafka/json_schemas/server.json b/kafka/json_schemas/server.json index 568fd377..f29e3d33 100644 --- a/kafka/json_schemas/server.json +++ b/kafka/json_schemas/server.json @@ -1,13 +1,13 @@ { "$schema": "http://json-schema.org/draft-07/schema#", - "$id": "http://asyncapi.com/bindings/ibmmq/server.json", + "$id": "http://asyncapi.com/bindings/kafka/server.json", "title": "Server Schema", "description": "This object contains server connection information to a Kafka broker. This object contains additional information not possible to represent within the core AsyncAPI specification.", "type": "object", "additionalProperties": false, "patternProperties": { "^x-[\\w\\d\\.\\-\\_]+$": { - "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.9.0/schemas/2.1.0.json#/definitions/specificationExtension" + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/specificationExtension" } }, "properties": { From 7d43d72a3c5efb0148337caedfa5c17664a8ee8f Mon Sep 17 00:00:00 2001 From: Laurent Broudoux Date: Tue, 13 Sep 2022 21:54:25 +0200 Subject: [PATCH 3/4] feat: Move schema encoding to message and Add topic, partitions and replicas at the channel level --- kafka/README.md | 61 ++++++++++++++++++++++++++----- kafka/json_schemas/channel.json | 46 +++++++++++++++++++++++ kafka/json_schemas/message.json | 19 ++++++++++ kafka/json_schemas/operation.json | 13 ------- 4 files changed, 117 insertions(+), 22 deletions(-) create mode 100644 kafka/json_schemas/channel.json diff --git a/kafka/README.md b/kafka/README.md index 083169c1..6efcf74e 100644 --- a/kafka/README.md +++ b/kafka/README.md @@ -40,14 +40,39 @@ servers: ## Channel Binding Object -This object MUST NOT contain any properties. Its name is reserved for future use. +This object contains information about the channel representation in Kafka (eg. a Kafka topic). +##### Fixed Fields + +Field Name | Type | Description | Applicability [default] | Constraints +---|:---:|:---:|:---:|--- +`topic` | string | Kafka topic name if different from channel name. | OPTIONAL | - +`partitions` | integer | Number of partitions configured on this topic (useful to know how many parallel consumers you may run). | OPTIONAL | Must be positive +`replicas` | integer | Number of replicas configured on this topic. | OPTIONAL | MUST be positive +`bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [`latest`] | - + +This object MUST contain only the properties defined above. + +##### Example + +This example is valid for any Confluent compatible schema registry. Here we describe the implementation using the first 4 bytes in payload to store schema identifier. + +```yaml +channels: + user-signedup: + bindings: + kafka: + topic: 'my-specific-topic-name' + partitions: 20 + replicas: 3 + bindingVersion: '0.3.0' +``` ## Operation Binding Object -This object contains information about the operation representation in Kafka. +This object contains information about the operation representation in Kafka (eg. the way to consume messages) ##### Fixed Fields @@ -55,9 +80,6 @@ Field Name | Type | Description | Applicability [default] | Constraints ---|:---:|:---:|:---:|--- `groupId` | [Schema Object][schemaObject] | Id of the consumer group. | OPTIONAL | - `clientId` | [Schema Object][schemaObject] | Id of the consumer inside a consumer group. | OPTIONAL | - -`schemaIdLocation` | string | If a Schema Registry is used when performing this operation, tells where the id of schema is stored (e.g. `header` or `payload`). | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level -`schemaIdPayloadEncoding` | string | Number of bytes or vendor specific values when schema id is encoded in payload (e.g `confluent`/ `apicurio-legacy` / `apicurio-new`). | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level -`schemaLookupStrategy` | string | Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied. | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level `bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [`latest`] | - This object MUST contain only the properties defined above. @@ -67,7 +89,7 @@ This object MUST contain only the properties defined above. ```yaml channels: user-signedup: - publish: + subscribe: bindings: kafka: groupId: @@ -76,9 +98,6 @@ channels: clientId: type: string enum: ['myClientId'] - schemaIdLocation: 'payload' - schemaIdPayloadEncoding: 'apicurio-new' - schemaLookupStrategy: 'TopicIdStrategy' bindingVersion: '0.3.0' ``` @@ -94,10 +113,31 @@ This object contains information about the message representation in Kafka. Field Name | Type | Description ---|:---:|--- `key` | [Schema Object][schemaObject] \| [AVRO Schema Object](https://avro.apache.org/docs/current/spec.html) | The message key. **NOTE**: You can also use the [reference object](https://asyncapi.io/docs/specifications/v2.4.0#referenceObject) way. +`schemaIdLocation` | string | If a Schema Registry is used when performing this operation, tells where the id of schema is stored (e.g. `header` or `payload`). | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level +`schemaIdPayloadEncoding` | string | Number of bytes or vendor specific values when schema id is encoded in payload (e.g `confluent`/ `apicurio-legacy` / `apicurio-new`). | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level +`schemaLookupStrategy` | string | Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied. | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level `bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. This object MUST contain only the properties defined above. +This example is valid for any Confluent compatible schema registry. Here we describe the implementation using the first 4 bytes in payload to store schema identifier. + +```yaml +channels: + test: + publish: + message: + bindings: + kafka: + key: + type: string + enum: ['myKey'] + schemaIdLocation: 'payload' + schemaIdPayloadEncoding: '4' + bindingVersion: '0.3.0' +``` + +This is another example that describes the use if Apicurio schema registry. We describe the `apicurio-new` way of serializing without details on how it's implemented. We reference a [specific lookup strategy](https://www.apicur.io/registry/docs/apicurio-registry/2.2.x/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-concepts-strategy_registry) that may be used to retrieve schema Id from registry during serialization. ```yaml channels: @@ -109,6 +149,9 @@ channels: key: type: string enum: ['myKey'] + schemaIdLocation: 'payload' + schemaIdPayloadEncoding: 'apicurio-new' + schemaLookupStrategy: 'TopicIdStrategy' bindingVersion: '0.3.0' ``` diff --git a/kafka/json_schemas/channel.json b/kafka/json_schemas/channel.json new file mode 100644 index 00000000..0e0aa33e --- /dev/null +++ b/kafka/json_schemas/channel.json @@ -0,0 +1,46 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://asyncapi.com/bindings/kafka/channel.json", + "title": "Channel Schema", + "description": "This object contains information about the channel representation in Kafka.", + "type": "object", + "additionalProperties": false, + "patternProperties": { + "^x-[\\w\\d\\.\\-\\_]+$": { + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/specificationExtension" + } + }, + "properties": { + "topic": { + "type": "string", + "description": "Kafka topic name if different from channel name." + }, + "partition": { + "type": "integer", + "minimum": 1, + "description": "Number of partitions configured on this topic." + }, + "replicas": { + "type": "integer", + "minimum": 1, + "description": "Number of replicas configured on this topic." + }, + "bindingVersion": { + "type": "string", + "enum": [ + "0.3.0" + ], + "description": "The version of this binding. If omitted, 'latest' MUST be assumed." + } + + }, + "examples": [ + { + "topic": "my-specific-topic", + "partitions": 20, + "replicas": 3, + "bindingVersion": "0.3.0" + } + ] + } + \ No newline at end of file diff --git a/kafka/json_schemas/message.json b/kafka/json_schemas/message.json index 8abdda7c..f1428c9c 100644 --- a/kafka/json_schemas/message.json +++ b/kafka/json_schemas/message.json @@ -14,6 +14,19 @@ "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.14.0/schemas/2.4.0.json#/definitions/schema", "description": "The message key." }, + "schemaIdLocation": { + "type": "string", + "description": "If a Schema Registry is used when performing this operation, tells where the id of schema is stored.", + "enum": ["header", "payload"] + }, + "schemaIdPayloadEncoding": { + "type": "string", + "description": "Number of bytes or vendor specific values when schema id is encoded in payload." + }, + "schemaLookupStrategy": { + "type": "string", + "description": "Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied." + }, "bindingVersion": { "type": "string", "enum": [ @@ -21,6 +34,7 @@ ], "description": "The version of this binding. If omitted, 'latest' MUST be assumed." } + }, "examples": [ { @@ -30,12 +44,17 @@ "myKey" ] }, + "schemaIdLocation": "payload", + "schemaIdPayloadEncoding": "apicurio-new", + "schemaLookupStrategy": "TopicIdStrategy", "bindingVersion": "0.3.0" }, { "key": { "$ref": "path/to/user-create.avsc#/UserCreate" }, + "schemaIdLocation": "payload", + "schemaIdPayloadEncoding": "4", "bindingVersion": "0.3.0" } ] diff --git a/kafka/json_schemas/operation.json b/kafka/json_schemas/operation.json index db496416..ea54cdab 100644 --- a/kafka/json_schemas/operation.json +++ b/kafka/json_schemas/operation.json @@ -19,19 +19,6 @@ "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/schema", "description": "Id of the consumer inside a consumer group." }, - "schemaIdLocation": { - "type": "string", - "description": "If a Schema Registry is used when performing this operation, tells where the id of schema is stored.", - "enum": ["header", "payload"] - }, - "schemaIdPayloadEncoding": { - "type": "string", - "description": "Number of bytes or vendor specific values when schema id is encoded in payload." - }, - "schemaLookupStrategy": { - "type": "string", - "description": "Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied." - }, "bindingVersion": { "type": "string", "enum": [ From 4e40d3b7c7691e03f87900d0306064b311dcc17a Mon Sep 17 00:00:00 2001 From: Khuda Dad Nomani <32505158+KhudaDad414@users.noreply.github.com> Date: Wed, 21 Sep 2022 00:22:28 +0500 Subject: [PATCH 4/4] typo --- kafka/json_schemas/channel.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/json_schemas/channel.json b/kafka/json_schemas/channel.json index 0e0aa33e..4db52eb5 100644 --- a/kafka/json_schemas/channel.json +++ b/kafka/json_schemas/channel.json @@ -15,7 +15,7 @@ "type": "string", "description": "Kafka topic name if different from channel name." }, - "partition": { + "partitions": { "type": "integer", "minimum": 1, "description": "Number of partitions configured on this topic."