Skip to content

Commit

Permalink
Merge branch 'apache:dev' into feature-flush-es-interval
Browse files Browse the repository at this point in the history
  • Loading branch information
zhilinli123 authored Mar 14, 2023
2 parents 0200ea8 + cf55b07 commit 624d570
Show file tree
Hide file tree
Showing 57 changed files with 876 additions and 268 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,11 @@ jobs:
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 2 1`
[ ! -z $sub_modules ] && ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
echo "sub modules is empty, skipping"
fi
env:
MAVEN_OPTS: -Xmx2048m

Expand Down
2 changes: 2 additions & 0 deletions config/hazelcast.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ hazelcast:
endpoint-groups:
CLUSTER_WRITE:
enabled: true
DATA:
enabled: true
join:
tcp-ip:
enabled: true
Expand Down
3 changes: 0 additions & 3 deletions config/seatunnel-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,3 @@
SPARK_HOME=${SPARK_HOME:-/opt/spark}
# Home directory of flink distribution.
FLINK_HOME=${FLINK_HOME:-/opt/flink}

# Control whether to print the ascii logo
export SEATUNNEL_PRINT_ASCII_LOGO=true
4 changes: 2 additions & 2 deletions config/seatunnel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ seatunnel:
slot-service:
dynamic-slot: true
checkpoint:
interval: 300000
timeout: 10000
interval: 10000
timeout: 60000
max-concurrent: 5
tolerable-failure: 2
storage:
Expand Down
6 changes: 3 additions & 3 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ Simple

```
jdbc {
url = "jdbc:mysql://localhost/test"
url = "jdbc:mysql://localhost:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
Expand All @@ -182,7 +182,7 @@ Exactly-once
```
jdbc {
url = "jdbc:mysql://localhost/test"
url = "jdbc:mysql://localhost:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
max_retries = 0
Expand All @@ -201,7 +201,7 @@ CDC(Change data capture) event
```
sink {
jdbc {
url = "jdbc:mysql://localhost/test"
url = "jdbc:mysql://localhost:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
Expand Down
2 changes: 1 addition & 1 deletion docs/en/connector-v2/sink/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Whether to enable upsert/delete, only supports PrimaryKey model.

We use templates to automatically create starrocks tables,
which will create corresponding table creation statements based on the type of upstream data and schema type,
and the default template can be modified according to the situation
and the default template can be modified according to the situation. Only work on multi-table mode at now.

```sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
Expand Down
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [Elasticsearch] Support https protocol & compatible with opensearch
- [Hbase] Add hbase sink connector #4049
- [Github] Add Github source connector #4155
- [CDC] Support export debezium-json format to kafka #4339
### Formats
- [Canal]Support read canal format message #3950

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ public static Optional<Catalog> createOptionalCatalog(
}

public static <T extends Factory> URL getFactoryUrl(T factory) {
URL jarUrl = factory.getClass().getProtectionDomain().getCodeSource().getLocation();
return jarUrl;
return factory.getClass().getProtectionDomain().getCodeSource().getLocation();
}

public static <T extends Factory> Optional<T> discoverOptionalFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.seatunnel.common;

public final class Constants {
public static final String ROW_ROOT = "__root__";
public static final String ROW_TMP = "__tmp__";

public static final String LOGO = "SeaTunnel";

Expand All @@ -39,8 +37,6 @@ public final class Constants {

public static final String HDFS_USER = "hdfs.user";

public static final String CHECKPOINT_INTERVAL = "checkpoint.interval";

public static final String CHECKPOINT_ID = "checkpoint.id";

public static final String UUID = "uuid";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class ExceptionUtils {
private ExceptionUtils() {}

public static String getMessage(Throwable e) {
if (e == null) {
return "";
}
try (StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw)) {
// Output the error stack information to the printWriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-compatible-debezium-json</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Debezium dependencies -->
<dependency>
<groupId>io.debezium</groupId>
Expand Down Expand Up @@ -87,6 +92,10 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-compatible-debezium-json</artifactId>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;

import java.util.Map;

Expand Down Expand Up @@ -107,6 +108,13 @@ public class SourceOptions {
.withDescription(
"Decides if the table options contains Debezium client properties that start with prefix 'debezium'.");

public static final Option<DeserializeFormat> FORMAT =
Options.key("format")
.enumType(DeserializeFormat.class)
.defaultValue(DeserializeFormat.DEFAULT)
.withDescription(
"Data format. The default format is seatunnel row. Optional compatible with debezium-json format.");

public static OptionRule.Builder getBaseRule() {
return OptionRule.builder()
.optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,15 @@ protected void processElement(
splitState.asSnapshotSplitState().setHighWatermark(watermark);
}
} else if (isSchemaChangeEvent(element) && splitState.isIncrementalSplitState()) {
// TODO Currently not supported Schema Change
emitElement(element, output);
} else if (isDataChangeRecord(element)) {
if (splitState.isIncrementalSplitState()) {
Offset position = getOffsetPosition(element);
splitState.asIncrementalSplitState().setStartupOffset(position);
}
emitElement(element, output);
} else {
// unknown element
log.info("Meet unknown element {}, just skip.", element);
emitElement(element, output);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.debezium;

import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;

public enum DeserializeFormat {
DEFAULT("default"),
COMPATIBLE_DEBEZIUM_JSON(CompatibleDebeziumJsonDeserializationSchema.IDENTIFIER);

private String name;

DeserializeFormat(String name) {
this.name = name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.debezium.row;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;

import org.apache.kafka.connect.source.SourceRecord;

import lombok.extern.slf4j.Slf4j;

import java.util.Map;

@Slf4j
public class DebeziumJsonDeserializeSchema implements DebeziumDeserializationSchema<SeaTunnelRow> {
private static final String KEY_SCHEMA_ENABLE = "key.converter.schemas.enable";
private static final String VALUE_SCHEMA_ENABLE = "value.converter.schemas.enable";

private final CompatibleDebeziumJsonDeserializationSchema deserializationSchema;

public DebeziumJsonDeserializeSchema(Map<String, String> debeziumConfig) {
boolean keySchemaEnable =
Boolean.valueOf(debeziumConfig.getOrDefault(KEY_SCHEMA_ENABLE, "true"));
boolean valueSchemaEnable =
Boolean.valueOf(debeziumConfig.getOrDefault(VALUE_SCHEMA_ENABLE, "true"));
this.deserializationSchema =
new CompatibleDebeziumJsonDeserializationSchema(keySchemaEnable, valueSchemaEnable);
}

@Override
public void deserialize(SourceRecord record, Collector<SeaTunnelRow> out) throws Exception {
SeaTunnelRow row = deserializationSchema.deserialize(record);
out.collect(row);
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return deserializationSchema.getProducedType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Map;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isDataChangeRecord;

/** Deserialization schema from Debezium object to {@link SeaTunnelRow}. */
@Slf4j
Expand Down Expand Up @@ -109,6 +110,11 @@ public static Builder builder() {
@Override
public void deserialize(SourceRecord record, Collector<SeaTunnelRow> collector)
throws Exception {
if (!isDataChangeRecord(record)) {
log.debug("Unsupported record {}, just skip.", record);
return;
}

Envelope.Operation operation = Envelope.operationFor(record);
Struct messageStruct = (Struct) record.value();
Schema valueSchema = record.valueSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffsetFactory;
Expand Down Expand Up @@ -81,6 +83,13 @@ public SourceConfig.Factory<JdbcSourceConfig> createSourceConfigFactory(Readonly
@Override
public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
ReadonlyConfig config) {
if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
config.get(JdbcSourceOptions.FORMAT))) {
return (DebeziumDeserializationSchema<T>)
new DebeziumJsonDeserializeSchema(
config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
}

SeaTunnelDataType<SeaTunnelRow> physicalRowType;
if (dataType == null) {
// TODO: support metadata keys
Expand Down
5 changes: 5 additions & 0 deletions seatunnel-connectors-v2/connector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
<artifactId>seatunnel-format-text</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-compatible-debezium-json</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;

import java.util.List;
import java.util.Map;
Expand All @@ -35,6 +36,9 @@ public class Config {

public static final String CANAL_FORMAT = "canal-json";

public static final String COMPATIBLE_DEBEZIUM_JSON =
CompatibleDebeziumJsonSerializationSchema.IDENTIFIER;

/** The default field delimiter is “,” */
public static final String DEFAULT_FIELD_DELIMITER = ",";

Expand Down
Loading

0 comments on commit 624d570

Please sign in to comment.