From 5fe779f7a8f3dae3bc8f1d04029fe2f928980c7c Mon Sep 17 00:00:00 2001 From: monster <60029759+MonsterChenzhuo@users.noreply.github.com> Date: Thu, 18 May 2023 16:26:56 +0800 Subject: [PATCH 1/7] [Docs][Connector-V2][Kafka]Reconstruct the kafka connector document --- docs/en/Connector-v2-release-state.md | 2 +- docs/en/connector-v2/sink/Kafka.md | 133 +++++++-------- docs/en/connector-v2/source/Kafka.md | 179 ++++++++++++++++++++ docs/en/connector-v2/source/kafka.md | 229 -------------------------- 4 files changed, 243 insertions(+), 300 deletions(-) create mode 100644 docs/en/connector-v2/source/Kafka.md delete mode 100644 docs/en/connector-v2/source/kafka.md diff --git a/docs/en/Connector-v2-release-state.md b/docs/en/Connector-v2-release-state.md index 74a183c73dc..4e0577c6c93 100644 --- a/docs/en/Connector-v2-release-state.md +++ b/docs/en/Connector-v2-release-state.md @@ -46,7 +46,7 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex | [IoTDB](connector-v2/sink/IoTDB.md) | Sink | GA | 2.2.0-beta | | [Jdbc](connector-v2/source/Jdbc.md) | Source | GA | 2.2.0-beta | | [Jdbc](connector-v2/sink/Jdbc.md) | Sink | GA | 2.2.0-beta | -| [Kafka](connector-v2/source/kafka.md) | Source | GA | 2.3.0 | +| [Kafka](connector-v2/source/Kafka.md) | Source | GA | 2.3.0 | | [Kafka](connector-v2/sink/Kafka.md) | Sink | GA | 2.2.0-beta | | [Kudu](connector-v2/source/Kudu.md) | Source | Beta | 2.2.0-beta | | [Kudu](connector-v2/sink/Kudu.md) | Sink | Beta | 2.2.0-beta | diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md index 38a0e488b0f..92e939020c7 100644 --- a/docs/en/connector-v2/sink/Kafka.md +++ b/docs/en/connector-v2/sink/Kafka.md @@ -1,36 +1,51 @@ # Kafka > Kafka sink connector -> - ## Description -Write Rows to a Kafka topic. +## Support Those Engines + +> Spark
+> Flink<br/>
+> SeaTunnel Zeta<br/>
-## Key features +## Key Features - [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) + +> By default, we will use 2pc to guarantee the message is sent to kafka exactly once. -By default, we will use 2pc to guarantee the message is sent to kafka exactly once. +## Description + +Write Rows to a Kafka topic. -## Options +## Supported DataSource Info -| name | type | required | default value | -|----------------------|--------|----------|---------------| -| topic | string | yes | - | -| bootstrap.servers | string | yes | - | -| kafka.config | map | no | - | -| semantics | string | no | NON | -| partition_key_fields | array | no | - | -| partition | int | no | - | -| assign_partitions | array | no | - | -| transaction_prefix | string | no | - | -| format | String | no | json | -| field_delimiter | String | no | , | -| common-options | config | no | - | +In order to use the Kafka connector, the following dependencies are required. +They can be downloaded via install-plugin.sh or from the Maven central repository. -### topic [string] +| Datasource | Supported Versions | Maven | +|------------|--------------------|-------------------------------------------------------------------------------------------------------------| +| Kafka | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-kafka) | -Kafka Topic. +## Sink Options + +| Name | Type | Required | Default | Description | +|----------------------|--------|----------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| topic | String | Yes | - | When the table is used as sink, the topic name is the topic to write data to. | +| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. | +| kafka.config | Map | No | - | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). | +| semantics | String | No | NON | Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. | +| partition_key_fields | Array | No | - | Configure which fields are used as the key of the kafka message. | +| partition | Int | No | - | We can specify the partition, all messages will be sent to this partition. | +| assign_partitions | Array | No | - | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. | +| transaction_prefix | String | No | - | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction,kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. | +| format | String | No | json | Data format. The default format is json. Optional text format. The default field separator is ",",If you customize the delimiter, add the "field_delimiter" option. | +| field_delimiter | String | No | , | Customize the field delimiter for data format. 
| + +## Parameter Interpretation + +### Topic Formats Currently two formats are supported: @@ -47,27 +62,13 @@ Currently two formats are supported: If `${name}` is set as the topic. So the first row is sent to Jack topic, and the second row is sent to Mary topic. -### bootstrap.servers [string] - -Kafka Brokers List. - -### kafka.config [kafka producer config] - -In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). - -### semantics [string] - -Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. +### Semantics In EXACTLY_ONCE, producer will write all messages in a Kafka transaction that will be committed to Kafka on a checkpoint. - In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. - NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated. -### partition_key_fields [array] - -Configure which fields are used as the key of the kafka message. +### Partition Key Fields For example, if you want to use value of fields from upstream data as key, you can assign field names to this property. @@ -79,53 +80,48 @@ Upstream data is the following: | Mary | 23 | data-example2 | If name is set as the key, then the hash value of the name column will determine which partition the message is sent to. - If not set partition key fields, the null message key will be sent to. - The format of the message key is json, If name is set as the key, for example '{"name":"Jack"}'. - The selected field must be an existing field in the upstream. -### partition [int] - -We can specify the partition, all messages will be sent to this partition. - -### assign_partitions [array] - -We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. +### Assign Partitions For example, there are five partitions in total, and the assign_partitions field in config is as follows: assign_partitions = ["shoe", "clothing"] - Then the message containing "shoe" will be sent to partition zero ,because "shoe" is subscribed as zero in assign_partitions, and the message containing "clothing" will be sent to partition one.For other messages, the hash algorithm will be used to divide them into the remaining partitions. - This function by `MessageContentPartitioner` class implements `org.apache.kafka.clients.producer.Partitioner` interface.If we need custom partitions, we need to implement this interface as well. -### transaction_prefix [string] - -If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction. -Kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. - -### format - -Data format. The default format is json. Optional text format. The default field separator is ",". -If you customize the delimiter, add the "field_delimiter" option. +## Task Example -### field_delimiter +### Simple: -Customize the field delimiter for data format. +> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to Kafka Sink. 
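For illustration, a minimal sink block combining the exactly-once options described above might look like the following sketch (the topic name, broker list, and transaction prefix are placeholder values):

```hocon
sink {
  kafka {
    # Placeholder topic and broker list
    topic = "example_topic"
    bootstrap.servers = "localhost:9092"
    # EXACTLY_ONCE writes every message inside a Kafka transaction that is
    # committed on checkpoint; keep the prefix unique per job so transaction
    # ids of different jobs never collide.
    semantics = EXACTLY_ONCE
    transaction_prefix = "seatunnel_job_1"
  }
}
```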
FakeSource generates a total of 16 rows of data (row.num=16), with each row having two fields, name (string type) and age (int type). The final target topic is test_topic will also be 16 rows of data in the topic. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job. -### common options [config] - -Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. +```hocon +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} -## Examples +source { + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} -```hocon sink { - kafka { - topic = "seatunnel" + topic = "test_topic" bootstrap.servers = "localhost:9092" partition = 3 format = json @@ -137,7 +133,6 @@ sink { buffer.memory = 33554432 } } - } ``` @@ -160,7 +155,6 @@ sink { sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};" } } - } ``` @@ -197,7 +191,6 @@ sink { sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" } } - } ``` diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md new file mode 100644 index 00000000000..264601a7487 --- /dev/null +++ b/docs/en/connector-v2/source/Kafka.md @@ -0,0 +1,179 @@ +# Kafka + +> Kafka source connector + +## Support Those Engines + +> Spark
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+ +## Key Features + +- [x] [batch](../../concept/connector-v2-features.md) +- [x] [stream](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [column projection](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Description + +Source connector for Apache Kafka. + +## Supported DataSource Info + +In order to use the Kafka connector, the following dependencies are required. +They can be downloaded via install-plugin.sh or from the Maven central repository. + +| Datasource | Supported Versions | Maven | +|------------|--------------------|-------------------------------------------------------------------------------------------------------------| +| Kafka | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-kafka) | + +## Source Options + +| Name | Type | Required | Default | Description | +|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| topic | String | yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. | +| bootstrap.servers | String | yes | - | Comma separated list of Kafka brokers. | +| pattern | Boolean | no | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. | +| consumer.group | String | no | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. | +| commit_on_checkpoint | Boolean | no | true | If true the consumer's offset will be periodically committed in the background. | +| kafka.config | Map | no | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). | +| schema | Config | no | - | The structure of the data, including field names and field types. | +| format | String | no | json | Data format. The default format is json. Optional text format. The default field separator is ", ",if you customize the delimiter, add the "field_delimiter" option. | +| format_error_handle_way | String | no | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. | +| field_delimiter | String | no | , | Customize the field delimiter for data format. | +| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | no | group_offsets | The initial consumption pattern of consumers. 
| +| start_mode.offsets | Config | no | | The offset required for consumption mode to be specific_offsets. | +| start_mode.timestamp | Long | no | | The time required for consumption mode to be "timestamp". | +| partition-discovery.interval-millis | Long | no | -1 | The interval for dynamically discovering topics and partitions. | + +## Task Example + +### Simple + +> This example reads the data of kafka's topic_1, topic_2, topic_3 and prints it to the client.And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in Install SeaTunnel to install and deploy SeaTunnel. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job. + +```hocon +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 2 + job.mode = "BATCH" +} + +source { + Kafka { + schema = { + fields { + name = "string" + age = "int" + } + } + format = text + field_delimiter = "#" + topic = "topic_1,topic_2,topic_3" + bootstrap.servers = "localhost:9092" + kafka.config = { + client.id = client_1 + max.poll.records = 500 + auto.offset.reset = "earliest" + enable.auto.commit = "false" + } + } +} + +sink { + Console {} +} +``` + +### Regex Topic + +```hocon +source { + + Kafka { + topic = ".*seatunnel*." + pattern = "true" + bootstrap.servers = "localhost:9092" + consumer.group = "seatunnel_group" + } + +} +``` + +### AWS MSK SASL/SCRAM + +Replace the following `${username}` and `${password}` with the configuration values in AWS MSK. + +```hocon +source { + Kafka { + topic = "seatunnel" + bootstrap.servers = "xx.amazonaws.com.cn:9096,xxx.amazonaws.com.cn:9096,xxxx.amazonaws.com.cn:9096" + consumer.group = "seatunnel_group" + kafka.config = { + security.protocol=SASL_SSL + sasl.mechanism=SCRAM-SHA-512 + sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};" + #security.protocol=SASL_SSL + #sasl.mechanism=AWS_MSK_IAM + #sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" + #sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" + } + } +} +``` + +### AWS MSK IAM + +Download `aws-msk-iam-auth-1.1.5.jar` from https://github.com/aws/aws-msk-iam-auth/releases and put it in `$SEATUNNEL_HOME/plugin/kafka/lib` dir. + +Please ensure the IAM policy have `"kafka-cluster:Connect",`. 
Like this: + +```hocon +"Effect": "Allow", +"Action": [ + "kafka-cluster:Connect", + "kafka-cluster:AlterCluster", + "kafka-cluster:DescribeCluster" +], +``` + +Source Config + +```hocon +source { + Kafka { + topic = "seatunnel" + bootstrap.servers = "xx.amazonaws.com.cn:9098,xxx.amazonaws.com.cn:9098,xxxx.amazonaws.com.cn:9098" + consumer.group = "seatunnel_group" + kafka.config = { + #security.protocol=SASL_SSL + #sasl.mechanism=SCRAM-SHA-512 + #sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};" + security.protocol=SASL_SSL + sasl.mechanism=AWS_MSK_IAM + sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" + sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" + } + } +} +``` + +## Changelog + +### 2.3.0-beta 2022-10-20 + +- Add Kafka Source Connector + +### Next Version + +- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157)) +- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125)) +- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719) +- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810)) +- [Feature] Kafka source supports data deserialization failure skipping([4364](https://github.com/apache/incubator-seatunnel/pull/4364)) + diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md deleted file mode 100644 index 9506712c202..00000000000 --- a/docs/en/connector-v2/source/kafka.md +++ /dev/null @@ -1,229 +0,0 @@ -# Kafka - -> Kafka source connector - -## Description - -Source connector for Apache Kafka. - -## Key features - -- [x] [batch](../../concept/connector-v2-features.md) -- [x] [stream](../../concept/connector-v2-features.md) -- [x] [exactly-once](../../concept/connector-v2-features.md) -- [ ] [column projection](../../concept/connector-v2-features.md) -- [x] [parallelism](../../concept/connector-v2-features.md) -- [ ] [support user-defined split](../../concept/connector-v2-features.md) - -## Options - -| name | type | required | default value | -|-------------------------------------|---------|----------|--------------------------| -| topic | String | yes | - | -| bootstrap.servers | String | yes | - | -| pattern | Boolean | no | false | -| consumer.group | String | no | SeaTunnel-Consumer-Group | -| commit_on_checkpoint | Boolean | no | true | -| kafka.config | Map | no | - | -| common-options | config | no | - | -| schema | | no | - | -| format | String | no | json | -| format_error_handle_way | String | no | fail | -| field_delimiter | String | no | , | -| start_mode | String | no | group_offsets | -| start_mode.offsets | | no | | -| start_mode.timestamp | Long | no | | -| partition-discovery.interval-millis | long | no | -1 | - -### topic [string] - -`Kafka topic` name. If there are multiple `topics`, use `,` to split, for example: `"tpc1,tpc2"`. - -### bootstrap.servers [string] - -`Kafka` cluster address, separated by `","`. - -### pattern [boolean] - -If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. 
- -### consumer.group [string] - -`Kafka consumer group id`, used to distinguish different consumer groups. - -### commit_on_checkpoint [boolean] - -If true the consumer's offset will be periodically committed in the background. - -## partition-discovery.interval-millis [long] - -The interval for dynamically discovering topics and partitions. - -### kafka.config [map] - -In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). - -### common-options [config] - -Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. - -### schema - -The structure of the data, including field names and field types. - -## format - -Data format. The default format is json. Optional text format. The default field separator is ", ". -If you customize the delimiter, add the "field_delimiter" option. - -## format_error_handle_way - -The processing method of data format error. The default value is fail, and the optional value is (fail, skip). -When fail is selected, data format error will block and an exception will be thrown. -When skip is selected, data format error will skip this line data. - -## field_delimiter - -Customize the field delimiter for data format. - -## start_mode - -The initial consumption pattern of consumers,there are several types: -[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] - -## start_mode.timestamp - -The time required for consumption mode to be "timestamp". - -## start_mode.offsets - -The offset required for consumption mode to be specific_offsets. - -for example: - -```hocon -start_mode.offsets = { - info-0 = 70 - info-1 = 10 - info-2 = 10 - } -``` - -## Example - -### Simple - -```hocon -source { - - Kafka { - result_table_name = "kafka_name" - schema = { - fields { - name = "string" - age = "int" - } - } - format = text - field_delimiter = "#" - topic = "topic_1,topic_2,topic_3" - bootstrap.servers = "localhost:9092" - kafka.config = { - client.id = client_1 - max.poll.records = 500 - auto.offset.reset = "earliest" - enable.auto.commit = "false" - } - } - -} -``` - -### Regex Topic - -```hocon -source { - - Kafka { - topic = ".*seatunnel*." - pattern = "true" - bootstrap.servers = "localhost:9092" - consumer.group = "seatunnel_group" - } - -} -``` - -### AWS MSK SASL/SCRAM - -Replace the following `${username}` and `${password}` with the configuration values in AWS MSK. - -```hocon -source { - Kafka { - topic = "seatunnel" - bootstrap.servers = "xx.amazonaws.com.cn:9096,xxx.amazonaws.com.cn:9096,xxxx.amazonaws.com.cn:9096" - consumer.group = "seatunnel_group" - kafka.config = { - security.protocol=SASL_SSL - sasl.mechanism=SCRAM-SHA-512 - sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};" - #security.protocol=SASL_SSL - #sasl.mechanism=AWS_MSK_IAM - #sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" - #sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" - } - } -} -``` - -### AWS MSK IAM - -Download `aws-msk-iam-auth-1.1.5.jar` from https://github.com/aws/aws-msk-iam-auth/releases and put it in `$SEATUNNEL_HOME/plugin/kafka/lib` dir. - -Please ensure the IAM policy have `"kafka-cluster:Connect",`. 
Like this: - -```hocon -"Effect": "Allow", -"Action": [ - "kafka-cluster:Connect", - "kafka-cluster:AlterCluster", - "kafka-cluster:DescribeCluster" -], -``` - -Source Config - -```hocon -source { - Kafka { - topic = "seatunnel" - bootstrap.servers = "xx.amazonaws.com.cn:9098,xxx.amazonaws.com.cn:9098,xxxx.amazonaws.com.cn:9098" - consumer.group = "seatunnel_group" - kafka.config = { - #security.protocol=SASL_SSL - #sasl.mechanism=SCRAM-SHA-512 - #sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};" - security.protocol=SASL_SSL - sasl.mechanism=AWS_MSK_IAM - sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" - sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" - } - } -} -``` - -## Changelog - -### 2.3.0-beta 2022-10-20 - -- Add Kafka Source Connector - -### Next Version - -- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157)) -- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125)) -- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719) -- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810)) -- [Feature] Kafka source supports data deserialization failure skipping([4364](https://github.com/apache/incubator-seatunnel/pull/4364)) - From 2f381cb4d1a1ec9105563c67384587b12f03a77e Mon Sep 17 00:00:00 2001 From: chenzy15 Date: Mon, 10 Jul 2023 11:59:10 +0800 Subject: [PATCH 2/7] Fix the review problem --- docs/en/connector-v2/sink/Kafka.md | 3 +- docs/en/connector-v2/source/Kafka.md | 49 +++++++++++++--------------- 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md index 47b36eb56cc..bb072158b17 100644 --- a/docs/en/connector-v2/sink/Kafka.md +++ b/docs/en/connector-v2/sink/Kafka.md @@ -30,7 +30,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor ## Sink Options -| Name | Type | Required | Default | Description | +| Name | Type | Required | Default | Description | |----------------------|--------|----------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | topic | String | Yes | - | When the table is used as sink, the topic name is the topic to write data to. | | bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. | @@ -42,6 +42,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor | transaction_prefix | String | No | - | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction,kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. | | format | String | No | json | Data format. The default format is json. Optional text format. 
The default field separator is ",",If you customize the delimiter, add the "field_delimiter" option. | | field_delimiter | String | No | , | Customize the field delimiter for data format. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ## Parameter Interpretation diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md index 264601a7487..8e7cb113d48 100644 --- a/docs/en/connector-v2/source/Kafka.md +++ b/docs/en/connector-v2/source/Kafka.md @@ -32,22 +32,23 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor ## Source Options -| Name | Type | Required | Default | Description | +| Name | Type | Required | Default | Description | |-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| topic | String | yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. | -| bootstrap.servers | String | yes | - | Comma separated list of Kafka brokers. | -| pattern | Boolean | no | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. | -| consumer.group | String | no | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. | -| commit_on_checkpoint | Boolean | no | true | If true the consumer's offset will be periodically committed in the background. | -| kafka.config | Map | no | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). | -| schema | Config | no | - | The structure of the data, including field names and field types. | -| format | String | no | json | Data format. The default format is json. Optional text format. The default field separator is ", ",if you customize the delimiter, add the "field_delimiter" option. | -| format_error_handle_way | String | no | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. | -| field_delimiter | String | no | , | Customize the field delimiter for data format. | -| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | no | group_offsets | The initial consumption pattern of consumers. | -| start_mode.offsets | Config | no | | The offset required for consumption mode to be specific_offsets. | -| start_mode.timestamp | Long | no | | The time required for consumption mode to be "timestamp". 
| -| partition-discovery.interval-millis | Long | no | -1 | The interval for dynamically discovering topics and partitions. | +| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. | +| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. | +| pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. | +| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. | +| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. | +| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). | +| schema | Config | No | - | The structure of the data, including field names and field types. | +| format | String | No | json | Data format. The default format is json. Optional text format. The default field separator is ", ",if you customize the delimiter, add the "field_delimiter" option. | +| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. | +| field_delimiter | String | No | , | Customize the field delimiter for data format. | +| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. | +| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. | +| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". | +| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ## Task Example @@ -62,7 +63,6 @@ env { execution.parallelism = 2 job.mode = "BATCH" } - source { Kafka { schema = { @@ -83,7 +83,6 @@ source { } } } - sink { Console {} } @@ -93,14 +92,12 @@ sink { ```hocon source { - Kafka { topic = ".*seatunnel*." 
pattern = "true" bootstrap.servers = "localhost:9092" consumer.group = "seatunnel_group" } - } ``` @@ -117,7 +114,7 @@ source { kafka.config = { security.protocol=SASL_SSL sasl.mechanism=SCRAM-SHA-512 - sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};" + sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";" #security.protocol=SASL_SSL #sasl.mechanism=AWS_MSK_IAM #sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" @@ -153,7 +150,7 @@ source { kafka.config = { #security.protocol=SASL_SSL #sasl.mechanism=SCRAM-SHA-512 - #sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};" + #sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";" security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" @@ -171,9 +168,9 @@ source { ### Next Version -- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157)) -- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125)) -- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719) -- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810)) -- [Feature] Kafka source supports data deserialization failure skipping([4364](https://github.com/apache/incubator-seatunnel/pull/4364)) +- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/seatunnel/pull/3157)) +- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/seatunnel/pull/3125)) +- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719) +- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/seatunnel/pull/3810)) +- [Feature] Kafka source supports data deserialization failure skipping([4364](https://github.com/apache/seatunnel/pull/4364)) From 639be274a9889b8e10b6473f164b83e8b5a439fe Mon Sep 17 00:00:00 2001 From: chenzy15 Date: Tue, 25 Jul 2023 20:55:36 +0800 Subject: [PATCH 3/7] Resolve conflict problems --- docs/en/connector-v2/sink/Kafka.md | 152 +++++++++++---------------- docs/en/connector-v2/source/kafka.md | 19 +--- 2 files changed, 66 insertions(+), 105 deletions(-) diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md index f971e5390b0..b941e42c64a 100644 --- a/docs/en/connector-v2/sink/Kafka.md +++ b/docs/en/connector-v2/sink/Kafka.md @@ -1,36 +1,52 @@ # Kafka > Kafka sink connector -> - ## Description -Write Rows to a Kafka topic. +## Support Those Engines + +> Spark
+> Flink<br/>
+> SeaTunnel Zeta<br/>
-## Key features +## Key Features - [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) + +> By default, we will use 2pc to guarantee the message is sent to kafka exactly once. + +## Description + +Write Rows to a Kafka topic. -By default, we will use 2pc to guarantee the message is sent to kafka exactly once. +## Supported DataSource Info -## Options +In order to use the Kafka connector, the following dependencies are required. +They can be downloaded via install-plugin.sh or from the Maven central repository. -| name | type | required | default value | -|----------------------|--------|----------|---------------| -| topic | string | yes | - | -| bootstrap.servers | string | yes | - | -| kafka.config | map | no | - | -| semantics | string | no | NON | -| partition_key_fields | array | no | - | -| partition | int | no | - | -| assign_partitions | array | no | - | -| transaction_prefix | string | no | - | -| format | String | no | json | -| field_delimiter | String | no | , | -| common-options | config | no | - | +| Datasource | Supported Versions | Maven | +|------------|--------------------|-------------------------------------------------------------------------------------------------------------| +| Kafka | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-kafka) | -### topic [string] +## Sink Options -Kafka Topic. +| Name | Type | Required | Default | Description | +|----------------------|--------|----------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| topic | String | Yes | - | When the table is used as sink, the topic name is the topic to write data to. | +| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. | +| kafka.config | Map | No | - | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). | +| semantics | String | No | NON | Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. | +| partition_key_fields | Array | No | - | Configure which fields are used as the key of the kafka message. | +| partition | Int | No | - | We can specify the partition, all messages will be sent to this partition. | +| assign_partitions | Array | No | - | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. | +| transaction_prefix | String | No | - | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction,kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. | +| format | String | No | json | Data format. The default format is json. Optional text format. The default field separator is ",",If you customize the delimiter, add the "field_delimiter" option. | +| field_delimiter | String | No | , | Customize the field delimiter for data format. 
| +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | + +## Parameter Interpretation + +### Topic Formats Currently two formats are supported: @@ -47,27 +63,13 @@ Currently two formats are supported: If `${name}` is set as the topic. So the first row is sent to Jack topic, and the second row is sent to Mary topic. -### bootstrap.servers [string] - -Kafka Brokers List. - -### kafka.config [kafka producer config] - -In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). - -### semantics [string] - -Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. +### Semantics In EXACTLY_ONCE, producer will write all messages in a Kafka transaction that will be committed to Kafka on a checkpoint. - In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. - NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated. -### partition_key_fields [array] - -Configure which fields are used as the key of the kafka message. +### Partition Key Fields For example, if you want to use value of fields from upstream data as key, you can assign field names to this property. @@ -79,55 +81,48 @@ Upstream data is the following: | Mary | 23 | data-example2 | If name is set as the key, then the hash value of the name column will determine which partition the message is sent to. - If not set partition key fields, the null message key will be sent to. - The format of the message key is json, If name is set as the key, for example '{"name":"Jack"}'. - The selected field must be an existing field in the upstream. -### partition [int] - -We can specify the partition, all messages will be sent to this partition. - -### assign_partitions [array] - -We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. +### Assign Partitions For example, there are five partitions in total, and the assign_partitions field in config is as follows: assign_partitions = ["shoe", "clothing"] - Then the message containing "shoe" will be sent to partition zero ,because "shoe" is subscribed as zero in assign_partitions, and the message containing "clothing" will be sent to partition one.For other messages, the hash algorithm will be used to divide them into the remaining partitions. - This function by `MessageContentPartitioner` class implements `org.apache.kafka.clients.producer.Partitioner` interface.If we need custom partitions, we need to implement this interface as well. -### transaction_prefix [string] - -If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction. -Kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. - -### format - -Data format. The default format is json. Optional text format, canal-json and debezium-json. -If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option. 
-If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details. -If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. +## Task Example -### field_delimiter +### Simple: -Customize the field delimiter for data format. +> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to Kafka Sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having two fields, name (string type) and age (int type). The final target topic is test_topic will also be 16 rows of data in the topic. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job. -### common options [config] - -Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. +```hocon +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} -## Examples +source { + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} -```hocon sink { - kafka { - topic = "seatunnel" + topic = "test_topic" bootstrap.servers = "localhost:9092" partition = 3 format = json @@ -139,7 +134,6 @@ sink { buffer.memory = 33554432 } } - } ``` @@ -162,7 +156,6 @@ sink { sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};" } } - } ``` @@ -199,22 +192,5 @@ sink { sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" } } - } ``` - -## Changelog - -### 2.3.0-beta 2022-10-20 - -- Add Kafka Sink Connector - -### next version - -- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230) -- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711) -- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/incubator-seatunnel/pull/3742) -- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719) -- [Improve] Support read canal format message [3950](https://github.com/apache/incubator-seatunnel/pull/3950) -- [Improve] Support read debezium format message [3981](https://github.com/apache/incubator-seatunnel/pull/3981) - diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md index 8e7cb113d48..eeb0236e347 100644 --- a/docs/en/connector-v2/source/kafka.md +++ b/docs/en/connector-v2/source/kafka.md @@ -32,7 +32,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor ## Source Options -| Name | Type | Required | Default | Description | +| Name | Type | Required | Default | Description | 
|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. | | bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. | @@ -158,19 +158,4 @@ source { } } } -``` - -## Changelog - -### 2.3.0-beta 2022-10-20 - -- Add Kafka Source Connector - -### Next Version - -- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/seatunnel/pull/3157)) -- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/seatunnel/pull/3125)) -- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719) -- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/seatunnel/pull/3810)) -- [Feature] Kafka source supports data deserialization failure skipping([4364](https://github.com/apache/seatunnel/pull/4364)) - +``` \ No newline at end of file From 3953465f9a705acecbc114c791d67c1994475335 Mon Sep 17 00:00:00 2001 From: chenzy15 Date: Tue, 25 Jul 2023 21:00:08 +0800 Subject: [PATCH 4/7] Resolve conflict problems --- docs/en/connector-v2/source/Kafka.md | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md index 8e7cb113d48..eeb0236e347 100644 --- a/docs/en/connector-v2/source/Kafka.md +++ b/docs/en/connector-v2/source/Kafka.md @@ -32,7 +32,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor ## Source Options -| Name | Type | Required | Default | Description | +| Name | Type | Required | Default | Description | |-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. | | bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. 
| @@ -158,19 +158,4 @@ source { } } } -``` - -## Changelog - -### 2.3.0-beta 2022-10-20 - -- Add Kafka Source Connector - -### Next Version - -- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/seatunnel/pull/3157)) -- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/seatunnel/pull/3125)) -- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719) -- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/seatunnel/pull/3810)) -- [Feature] Kafka source supports data deserialization failure skipping([4364](https://github.com/apache/seatunnel/pull/4364)) - +``` \ No newline at end of file From 91cac2ac301cf25c451596358d746ee81f4b4d23 Mon Sep 17 00:00:00 2001 From: chenzy15 Date: Tue, 25 Jul 2023 21:03:40 +0800 Subject: [PATCH 5/7] Resolve conflict problems --- docs/en/connector-v2/sink/Kafka.md | 1 + docs/en/connector-v2/source/Kafka.md | 3 ++- docs/en/connector-v2/source/kafka.md | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md index b941e42c64a..be7fa40a512 100644 --- a/docs/en/connector-v2/sink/Kafka.md +++ b/docs/en/connector-v2/sink/Kafka.md @@ -194,3 +194,4 @@ sink { } } ``` + diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md index eeb0236e347..a5ff4d0e3eb 100644 --- a/docs/en/connector-v2/source/Kafka.md +++ b/docs/en/connector-v2/source/Kafka.md @@ -158,4 +158,5 @@ source { } } } -``` \ No newline at end of file +``` + diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md index eeb0236e347..a5ff4d0e3eb 100644 --- a/docs/en/connector-v2/source/kafka.md +++ b/docs/en/connector-v2/source/kafka.md @@ -158,4 +158,5 @@ source { } } } -``` \ No newline at end of file +``` + From fc7fcb1efaaa20bb7bdff18a3b7d549fbac98a6d Mon Sep 17 00:00:00 2001 From: chenzy15 Date: Tue, 25 Jul 2023 23:16:47 +0800 Subject: [PATCH 6/7] Resolve conflict problems --- docs/en/Connector-v2-release-state.md | 2 +- docs/en/connector-v2/source/Kafka.md | 162 -------------------------- 2 files changed, 1 insertion(+), 163 deletions(-) delete mode 100644 docs/en/connector-v2/source/Kafka.md diff --git a/docs/en/Connector-v2-release-state.md b/docs/en/Connector-v2-release-state.md index 4e76fa2ef23..308cb010b42 100644 --- a/docs/en/Connector-v2-release-state.md +++ b/docs/en/Connector-v2-release-state.md @@ -46,7 +46,7 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex | [IoTDB](connector-v2/sink/IoTDB.md) | Sink | GA | 2.2.0-beta | | [Jdbc](connector-v2/source/Jdbc.md) | Source | GA | 2.2.0-beta | | [Jdbc](connector-v2/sink/Jdbc.md) | Sink | GA | 2.2.0-beta | -| [Kafka](connector-v2/source/Kafka.md) | Source | GA | 2.3.0 | +| [Kafka](connector-v2/source/kafka.md) | Source | GA | 2.3.0 | | [Kafka](connector-v2/sink/Kafka.md) | Sink | GA | 2.2.0-beta | | [Kudu](connector-v2/source/Kudu.md) | Source | Beta | 2.2.0-beta | | [Kudu](connector-v2/sink/Kudu.md) | Sink | Beta | 2.2.0-beta | diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md deleted file mode 100644 index a5ff4d0e3eb..00000000000 --- a/docs/en/connector-v2/source/Kafka.md +++ /dev/null @@ -1,162 +0,0 @@ -# Kafka - -> Kafka source connector - -## Support Those Engines - -> Spark
-> Flink<br/>
-> SeaTunnel Zeta<br/>
- -## Key Features - -- [x] [batch](../../concept/connector-v2-features.md) -- [x] [stream](../../concept/connector-v2-features.md) -- [x] [exactly-once](../../concept/connector-v2-features.md) -- [ ] [column projection](../../concept/connector-v2-features.md) -- [x] [parallelism](../../concept/connector-v2-features.md) -- [ ] [support user-defined split](../../concept/connector-v2-features.md) - -## Description - -Source connector for Apache Kafka. - -## Supported DataSource Info - -In order to use the Kafka connector, the following dependencies are required. -They can be downloaded via install-plugin.sh or from the Maven central repository. - -| Datasource | Supported Versions | Maven | -|------------|--------------------|-------------------------------------------------------------------------------------------------------------| -| Kafka | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-kafka) | - -## Source Options - -| Name | Type | Required | Default | Description | -|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. | -| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. | -| pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. | -| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. | -| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. | -| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). | -| schema | Config | No | - | The structure of the data, including field names and field types. | -| format | String | No | json | Data format. The default format is json. Optional text format. The default field separator is ", ",if you customize the delimiter, add the "field_delimiter" option. | -| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. | -| field_delimiter | String | No | , | Customize the field delimiter for data format. | -| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. 
| -| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. | -| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". | -| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. | -| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | - -## Task Example - -### Simple - -> This example reads the data of kafka's topic_1, topic_2, topic_3 and prints it to the client.And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in Install SeaTunnel to install and deploy SeaTunnel. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job. - -```hocon -# Defining the runtime environment -env { - # You can set flink configuration here - execution.parallelism = 2 - job.mode = "BATCH" -} -source { - Kafka { - schema = { - fields { - name = "string" - age = "int" - } - } - format = text - field_delimiter = "#" - topic = "topic_1,topic_2,topic_3" - bootstrap.servers = "localhost:9092" - kafka.config = { - client.id = client_1 - max.poll.records = 500 - auto.offset.reset = "earliest" - enable.auto.commit = "false" - } - } -} -sink { - Console {} -} -``` - -### Regex Topic - -```hocon -source { - Kafka { - topic = ".*seatunnel*." - pattern = "true" - bootstrap.servers = "localhost:9092" - consumer.group = "seatunnel_group" - } -} -``` - -### AWS MSK SASL/SCRAM - -Replace the following `${username}` and `${password}` with the configuration values in AWS MSK. - -```hocon -source { - Kafka { - topic = "seatunnel" - bootstrap.servers = "xx.amazonaws.com.cn:9096,xxx.amazonaws.com.cn:9096,xxxx.amazonaws.com.cn:9096" - consumer.group = "seatunnel_group" - kafka.config = { - security.protocol=SASL_SSL - sasl.mechanism=SCRAM-SHA-512 - sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";" - #security.protocol=SASL_SSL - #sasl.mechanism=AWS_MSK_IAM - #sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" - #sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" - } - } -} -``` - -### AWS MSK IAM - -Download `aws-msk-iam-auth-1.1.5.jar` from https://github.com/aws/aws-msk-iam-auth/releases and put it in `$SEATUNNEL_HOME/plugin/kafka/lib` dir. - -Please ensure the IAM policy have `"kafka-cluster:Connect",`. 
Like this: - -```hocon -"Effect": "Allow", -"Action": [ - "kafka-cluster:Connect", - "kafka-cluster:AlterCluster", - "kafka-cluster:DescribeCluster" -], -``` - -Source Config - -```hocon -source { - Kafka { - topic = "seatunnel" - bootstrap.servers = "xx.amazonaws.com.cn:9098,xxx.amazonaws.com.cn:9098,xxxx.amazonaws.com.cn:9098" - consumer.group = "seatunnel_group" - kafka.config = { - #security.protocol=SASL_SSL - #sasl.mechanism=SCRAM-SHA-512 - #sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";" - security.protocol=SASL_SSL - sasl.mechanism=AWS_MSK_IAM - sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" - sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" - } - } -} -``` - From 5996a6ea88310278c1153b667ca860442a1222af Mon Sep 17 00:00:00 2001 From: chenzy15 Date: Wed, 26 Jul 2023 09:02:14 +0800 Subject: [PATCH 7/7] Resolve conflict problems --- docs/en/connector-v2/sink/Kafka.md | 26 ++++++++++----------- docs/en/connector-v2/source/kafka.md | 34 ++++++++++++++-------------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md index be7fa40a512..1e258a058ad 100644 --- a/docs/en/connector-v2/sink/Kafka.md +++ b/docs/en/connector-v2/sink/Kafka.md @@ -30,19 +30,19 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor ## Sink Options -| Name | Type | Required | Default | Description | -|----------------------|--------|----------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| topic | String | Yes | - | When the table is used as sink, the topic name is the topic to write data to. | -| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. | -| kafka.config | Map | No | - | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). | -| semantics | String | No | NON | Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. | -| partition_key_fields | Array | No | - | Configure which fields are used as the key of the kafka message. | -| partition | Int | No | - | We can specify the partition, all messages will be sent to this partition. | -| assign_partitions | Array | No | - | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. | -| transaction_prefix | String | No | - | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction,kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. | -| format | String | No | json | Data format. The default format is json. Optional text format. The default field separator is ",",If you customize the delimiter, add the "field_delimiter" option. 
| -| field_delimiter | String | No | , | Customize the field delimiter for data format. | -| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | +| Name | Type | Required | Default | Description | +|----------------------|--------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| topic | String | Yes | - | When the table is used as sink, the topic name is the topic to write data to. | +| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. | +| kafka.config | Map | No | - | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). | +| semantics | String | No | NON | Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. | +| partition_key_fields | Array | No | - | Configure which fields are used as the key of the kafka message. | +| partition | Int | No | - | We can specify the partition, all messages will be sent to this partition. | +| assign_partitions | Array | No | - | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. | +| transaction_prefix | String | No | - | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction,kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. | +| format | String | No | json | Data format. The default format is json. Optional text format, canal-json and debezium-json.If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. | +| field_delimiter | String | No | , | Customize the field delimiter for data format. 
| +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ## Parameter Interpretation diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md index a5ff4d0e3eb..16b9c5420b3 100644 --- a/docs/en/connector-v2/source/kafka.md +++ b/docs/en/connector-v2/source/kafka.md @@ -32,23 +32,23 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor ## Source Options -| Name | Type | Required | Default | Description | -|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. | -| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. | -| pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. | -| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. | -| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. | -| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). | -| schema | Config | No | - | The structure of the data, including field names and field types. | -| format | String | No | json | Data format. The default format is json. Optional text format. The default field separator is ", ",if you customize the delimiter, add the "field_delimiter" option. | -| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. | -| field_delimiter | String | No | , | Customize the field delimiter for data format. | -| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. | -| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. | -| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". | -| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. 
| -| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | +| Name | Type | Required | Default | Description | +|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. | +| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. | +| pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. | +| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. | +| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. | +| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). | +| schema | Config | No | - | The structure of the data, including field names and field types. | +| format | String | No | json | Data format. The default format is json. Optional text format, canal-json and debezium-json.If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. | +| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. | +| field_delimiter | String | No | , | Customize the field delimiter for data format. | +| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. | +| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. | +| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". | +| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ## Task Example
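
> The options table above documents `start_mode` together with `start_mode.offsets` and `start_mode.timestamp`, but the task examples in this document rely on the default `group_offsets` behavior. Below is a minimal sketch of a source block that replays a topic from the beginning; the commented alternatives are assumptions for illustration only, in particular the `topic_1-0` partition-key form and the epoch-millisecond timestamp value, which are not confirmed elsewhere in this document.

```hocon
source {
  Kafka {
    topic = "topic_1"
    bootstrap.servers = "localhost:9092"
    consumer.group = "seatunnel_group"
    # Replay the topic from the beginning instead of resuming from committed group offsets.
    start_mode = "earliest"
    # Hypothetical alternatives (values are assumptions):
    # start from a point in time,
    #   start_mode = "timestamp"
    #   start_mode.timestamp = 1667179890315
    # or from explicit per-partition offsets,
    #   start_mode = "specific_offsets"
    #   start_mode.offsets = { topic_1-0 = 70 }
  }
}
```

> Per the table, `start_mode.offsets` and `start_mode.timestamp` are only consulted when the matching consumption mode is selected, so a job that omits `start_mode` keeps the default `group_offsets` behavior.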