From a692d351a2673a46436636a0956c6e58c78cd06e Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Mon, 24 Oct 2022 09:29:03 +0800
Subject: [PATCH 01/24] add amazondynamodb connnector
---
.../connector-amazondynamodb/pom.xml | 61 ++++++++++++
.../config/AmazondynamodbConfig.java | 29 ++++++
.../config/AmazondynamodbSourceOptions.java | 58 +++++++++++
.../sink/AmazondynamodbSink.java | 21 ++++
.../source/AmazondynamodbSource.java | 96 +++++++++++++++++++
.../source/AmazondynamodbSourceReader.java | 77 +++++++++++++++
.../source/AmazondynamodbSourceSplit.java | 35 +++++++
.../state/AmazonDynamodbSourceState.java | 23 +++++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 6 ++
10 files changed, 407 insertions(+)
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/pom.xml
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbSink.java
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceSplit.java
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/state/AmazonDynamodbSourceState.java
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/pom.xml b/seatunnel-connectors-v2/connector-amazondynamodb/pom.xml
new file mode 100644
index 00000000000..0b04ef2ab43
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/pom.xml
@@ -0,0 +1,61 @@
+
+
+
+
+ seatunnel-connectors-v2
+ org.apache.seatunnel
+ ${revision}
+
+ 4.0.0
+
+ connector-amazondynamodb
+
+
+ 8
+ 8
+ 2.18.1
+
+
+
+
+
+ software.amazon.awssdk
+ bom
+ ${amazon.awssdk}
+ pom
+ import
+
+
+
+
+
+
+ software.amazon.awssdk
+ dynamodb-enhanced
+
+
+ software.amazon.awssdk
+ dynamodb
+
+
+
+
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java
new file mode 100644
index 00000000000..05810dc4a81
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
+
+import java.io.Serializable;
+
+public class AmazondynamodbConfig implements Serializable {
+ public static final String URL = "url";
+ public static final String REGION = "region";
+ public static final String ACCESS_KEY_ID = "accessKeyId";
+ public static final String SECRET_ACCESS_KEY = "secretAccessKey";
+ public static final String QUERY = "query";
+
+}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
new file mode 100644
index 00000000000..3bce7650b6d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class AmazondynamodbSourceOptions implements Serializable {
+
+ private String url;
+
+ private String region;
+
+ private String accessKeyId;
+
+ private String secretAccessKey;
+
+ private String query;
+
+ public AmazondynamodbSourceOptions(Config config) {
+ if (config.hasPath(AmazondynamodbConfig.URL)) {
+ this.url = config.getString(AmazondynamodbConfig.URL);
+ }
+ if (config.hasPath(AmazondynamodbConfig.REGION)) {
+ this.region = config.getString(AmazondynamodbConfig.REGION);
+ }
+ if (config.hasPath(AmazondynamodbConfig.ACCESS_KEY_ID)) {
+ this.accessKeyId = config.getString(AmazondynamodbConfig.ACCESS_KEY_ID);
+ }
+ if (config.hasPath(AmazondynamodbConfig.SECRET_ACCESS_KEY)) {
+ this.secretAccessKey = config.getString(AmazondynamodbConfig.SECRET_ACCESS_KEY);
+ }
+ if (config.hasPath(AmazondynamodbConfig.QUERY)) {
+ this.query = config.getString(AmazondynamodbConfig.QUERY);
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbSink.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbSink.java
new file mode 100644
index 00000000000..44e0cb7c559
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbSink.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+
+public class AmazondynamodbSink {
+}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
new file mode 100644
index 00000000000..0c9c22662e4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.state.AmazonDynamodbSourceState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.protocol.MarshallingType;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.DynamoDbResponseMetadata;
+import software.amazon.awssdk.services.dynamodb.model.ExecuteStatementRequest;
+import software.amazon.awssdk.services.dynamodb.model.ExecuteStatementResponse;
+
+import java.net.URI;
+
+@Slf4j
+@AutoService(SeaTunnelSource.class)
+public class AmazondynamodbSource implements SeaTunnelSource {
+
+ private DynamoDbClient dynamoDbClient;
+
+ private AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+
+ @Override
+ public String getPluginName() {
+ return "Amazondynamodb";
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ amazondynamodbSourceOptions = new AmazondynamodbSourceOptions(pluginConfig);
+ dynamoDbClient = DynamoDbClient.builder()
+ .endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
+ // The region is meaningless for local DynamoDb but required for client builder validation
+ .region(Region.of(amazondynamodbSourceOptions.getRegion()))
+ .credentialsProvider(StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(amazondynamodbSourceOptions.getAccessKeyId(), amazondynamodbSourceOptions.getSecretAccessKey())))
+ .build();
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SeaTunnelDataType getProducedType() {
+ ExecuteStatementResponse executeStatementResponse = dynamoDbClient.executeStatement(ExecuteStatementRequest.builder().statement(amazondynamodbSourceOptions.getQuery()).build());
+
+ return null;
+ }
+
+ @Override
+ public SourceReader createReader(SourceReader.Context readerContext) {
+ return new AmazondynamodbSourceReader(dynamoDbClient, readerContext, amazondynamodbSourceOptions);
+ }
+
+ @Override
+ public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception {
+ return null;
+ }
+
+ @Override
+ public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, AmazonDynamodbSourceState checkpointState) throws Exception {
+ return null;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
new file mode 100644
index 00000000000..7803c83fc98
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.ExecuteStatementRequest;
+
+import java.io.IOException;
+import java.util.List;
+
+public class AmazondynamodbSourceReader implements SourceReader {
+
+ DynamoDbClient dynamoDbClient;
+ SourceReader.Context context;
+ ExecuteStatementRequest executeStatement;
+ AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+
+ public AmazondynamodbSourceReader(DynamoDbClient dynamoDbClient, SourceReader.Context context, AmazondynamodbSourceOptions amazondynamodbSourceOptions) {
+ this.dynamoDbClient = dynamoDbClient;
+ this.context = context;
+ this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+ }
+
+ @Override
+ public void open() throws Exception {
+ executeStatement = ExecuteStatementRequest.builder().statement(amazondynamodbSourceOptions.getQuery()).build();
+ }
+
+ @Override
+ public void close() throws IOException {
+ dynamoDbClient.close();
+ }
+
+ @Override
+ public void pollNext(Collector output) throws Exception {
+ }
+
+ @Override
+ public List snapshotState(long checkpointId) throws Exception {
+ return null;
+ }
+
+ @Override
+ public void addSplits(List splits) {
+
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceSplit.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceSplit.java
new file mode 100644
index 00000000000..f2a871fb06e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceSplit.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class AmazondynamodbSourceSplit implements SourceSplit {
+
+ Integer splitId;
+
+ @Override
+ public String splitId() {
+ return splitId.toString();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/state/AmazonDynamodbSourceState.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/state/AmazonDynamodbSourceState.java
new file mode 100644
index 00000000000..9edb9dae7d4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/state/AmazonDynamodbSourceState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.state;
+
+import java.io.Serializable;
+
+public class AmazonDynamodbSourceState implements Serializable {
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 37dd5223c7e..cb91f71faf5 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -56,6 +56,7 @@
connector-mongodb
connector-iceberg
connector-influxdb
+ connector-amazondynamodb
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 4c602b04ee1..f7efdc6a888 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -290,6 +290,12 @@
${project.version}
provided
+
+ org.apache.seatunnel
+ connector-amazondynamodb
+ ${project.version}
+ provided
+
From 08908af6d5923eff555505f3e619113224169757 Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Thu, 27 Oct 2022 20:53:33 +0800
Subject: [PATCH 02/24] add amazondynamodb source
---
.../connector-amazondynamodb/pom.xml | 5 ++
.../config/AmazondynamodbConfig.java | 1 -
.../config/AmazondynamodbSourceOptions.java | 7 ++
.../source/AmazondynamodbSource.java | 32 ++-----
.../source/AmazondynamodbSourceReader.java | 88 ++++++++++++++++---
.../AmazondynamodbSourceSplitEnumerator.java | 71 +++++++++++++++
.../seatunnel/common/config/CommonConfig.java | 22 +++++
7 files changed, 192 insertions(+), 34 deletions(-)
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceSplitEnumerator.java
create mode 100644 seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/config/CommonConfig.java
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/pom.xml b/seatunnel-connectors-v2/connector-amazondynamodb/pom.xml
index 0b04ef2ab43..5215b2e0d4b 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/pom.xml
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/pom.xml
@@ -48,6 +48,11 @@
+
+ org.apache.seatunnel
+ connector-common
+ ${project.version}
+
software.amazon.awssdk
dynamodb-enhanced
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java
index 05810dc4a81..4e8ce67f2cc 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java
@@ -25,5 +25,4 @@ public class AmazondynamodbConfig implements Serializable {
public static final String ACCESS_KEY_ID = "accessKeyId";
public static final String SECRET_ACCESS_KEY = "secretAccessKey";
public static final String QUERY = "query";
-
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
index 3bce7650b6d..ad4956e82a4 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
+import org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig;
+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.AllArgsConstructor;
@@ -38,6 +40,8 @@ public class AmazondynamodbSourceOptions implements Serializable {
private String query;
+ private Config schema;
+
public AmazondynamodbSourceOptions(Config config) {
if (config.hasPath(AmazondynamodbConfig.URL)) {
this.url = config.getString(AmazondynamodbConfig.URL);
@@ -54,5 +58,8 @@ public AmazondynamodbSourceOptions(Config config) {
if (config.hasPath(AmazondynamodbConfig.QUERY)) {
this.query = config.getString(AmazondynamodbConfig.QUERY);
}
+ if (config.hasPath(CommonConfig.SCHEMA)) {
+ this.schema = config.getConfig(CommonConfig.SCHEMA);
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
index 0c9c22662e4..1db1e9b0b31 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
@@ -24,32 +24,24 @@
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.state.AmazonDynamodbSourceState;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.protocol.MarshallingType;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
-import software.amazon.awssdk.services.dynamodb.model.DynamoDbResponseMetadata;
-import software.amazon.awssdk.services.dynamodb.model.ExecuteStatementRequest;
-import software.amazon.awssdk.services.dynamodb.model.ExecuteStatementResponse;
-
-import java.net.URI;
@Slf4j
@AutoService(SeaTunnelSource.class)
public class AmazondynamodbSource implements SeaTunnelSource {
- private DynamoDbClient dynamoDbClient;
-
private AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+ private SeaTunnelRowType typeInfo;
+
@Override
public String getPluginName() {
return "Amazondynamodb";
@@ -58,13 +50,7 @@ public String getPluginName() {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
amazondynamodbSourceOptions = new AmazondynamodbSourceOptions(pluginConfig);
- dynamoDbClient = DynamoDbClient.builder()
- .endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
- // The region is meaningless for local DynamoDb but required for client builder validation
- .region(Region.of(amazondynamodbSourceOptions.getRegion()))
- .credentialsProvider(StaticCredentialsProvider.create(
- AwsBasicCredentials.create(amazondynamodbSourceOptions.getAccessKeyId(), amazondynamodbSourceOptions.getSecretAccessKey())))
- .build();
+
}
@Override
@@ -74,14 +60,14 @@ public Boundedness getBoundedness() {
@Override
public SeaTunnelDataType getProducedType() {
- ExecuteStatementResponse executeStatementResponse = dynamoDbClient.executeStatement(ExecuteStatementRequest.builder().statement(amazondynamodbSourceOptions.getQuery()).build());
-
- return null;
+ Config schema = amazondynamodbSourceOptions.getSchema();
+ typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+ return this.typeInfo;
}
@Override
public SourceReader createReader(SourceReader.Context readerContext) {
- return new AmazondynamodbSourceReader(dynamoDbClient, readerContext, amazondynamodbSourceOptions);
+ return new AmazondynamodbSourceReader(readerContext, amazondynamodbSourceOptions);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
index 7803c83fc98..a76780ebd77 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
@@ -19,31 +19,57 @@
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
-import software.amazon.awssdk.services.dynamodb.model.ExecuteStatementRequest;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+@Slf4j
public class AmazondynamodbSourceReader implements SourceReader {
- DynamoDbClient dynamoDbClient;
- SourceReader.Context context;
- ExecuteStatementRequest executeStatement;
- AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+ protected DynamoDbClient dynamoDbClient;
+ protected SourceReader.Context context;
+ protected AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+ protected Deque splits = new LinkedList<>();
+ protected boolean noMoreSplit;
+ protected SeaTunnelRowType typeInfo;
- public AmazondynamodbSourceReader(DynamoDbClient dynamoDbClient, SourceReader.Context context, AmazondynamodbSourceOptions amazondynamodbSourceOptions) {
- this.dynamoDbClient = dynamoDbClient;
+ public AmazondynamodbSourceReader(SourceReader.Context context,
+ AmazondynamodbSourceOptions amazondynamodbSourceOptions,
+ SeaTunnelRowType typeInfo) {
this.context = context;
this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+ this.typeInfo = typeInfo;
}
@Override
public void open() throws Exception {
- executeStatement = ExecuteStatementRequest.builder().statement(amazondynamodbSourceOptions.getQuery()).build();
+ dynamoDbClient = DynamoDbClient.builder()
+ .endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
+ // The region is meaningless for local DynamoDb but required for client builder validation
+ .region(Region.of(amazondynamodbSourceOptions.getRegion()))
+ .credentialsProvider(StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(amazondynamodbSourceOptions.getAccessKeyId(), amazondynamodbSourceOptions.getSecretAccessKey())))
+ .build();
}
@Override
@@ -52,7 +78,25 @@ public void close() throws IOException {
}
@Override
+ @SuppressWarnings("magicnumber")
public void pollNext(Collector output) throws Exception {
+ synchronized (output.getCheckpointLock()) {
+ AmazondynamodbSourceSplit split = splits.poll();
+ if (null != split) {
+ QueryResponse query = dynamoDbClient.query(QueryRequest.builder().build());
+ if (query.hasItems()) {
+ query.items().forEach(item -> {
+ output.collect(converterToRow(item, typeInfo));
+ });
+ }
+ } else if (noMoreSplit) {
+ // signal to the source that we have reached the end of the data.
+ log.info("Closed the bounded amazondynamodb source");
+ context.signalNoMoreElement();
+ } else {
+ Thread.sleep(1000L);
+ }
+ }
}
@Override
@@ -62,16 +106,40 @@ public List snapshotState(long checkpointId) throws E
@Override
public void addSplits(List splits) {
-
+ this.splits.addAll(splits);
}
@Override
public void handleNoMoreSplits() {
-
+ noMoreSplit = true;
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
}
+
+ private SeaTunnelRow converterToRow(Map item, SeaTunnelRowType typeInfo) {
+ List
-
\ No newline at end of file
+
From 05c05d9d095a5e8ac5a2c01c1de41045425de480 Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Sun, 30 Oct 2022 21:00:35 +0800
Subject: [PATCH 10/24] fix e2e test error
---
.../e2e/connector/amazondynamodb/AmazondynamodbIT.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java
index 56bba3d2995..30ff6c99d41 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java
@@ -32,6 +32,7 @@
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -107,13 +108,14 @@ public void startUp() throws Exception {
.withNetworkAliases(AMAZONDYNAMODB_CONTAINER_HOST)
.withExposedPorts(AMAZONDYNAMODB_CONTAINER_PORT)
.withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(AMAZONDYNAMODB_DOCKER_IMAGE)));
+ dynamoDB.setPortBindings(Lists.newArrayList(String.format("%s:%s", AMAZONDYNAMODB_CONTAINER_PORT, AMAZONDYNAMODB_CONTAINER_PORT)));
Startables.deepStart(Stream.of(dynamoDB)).join();
log.info("dynamodb container started");
given().ignoreExceptions()
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(30, TimeUnit.SECONDS)
+ .atMost(120, TimeUnit.SECONDS)
.untilAsserted(this::initializeDynamodbClient);
batchInsertData();
@@ -129,7 +131,7 @@ public void tearDown() throws Exception {
private void initializeDynamodbClient() throws ConnectException {
dynamoDbClient = DynamoDbClient.builder()
- .endpointOverride(URI.create("http://" + AMAZONDYNAMODB_CONTAINER_HOST + ":" + AMAZONDYNAMODB_CONTAINER_PORT))
+ .endpointOverride(URI.create("http://" + dynamoDB.getHost() + ":" + AMAZONDYNAMODB_CONTAINER_PORT))
// The region is meaningless for local DynamoDb but required for client builder validation
.region(Region.US_EAST_1)
.credentialsProvider(StaticCredentialsProvider.create(
@@ -188,7 +190,7 @@ private Map randomRow() {
"c_timestamp"
},
new SeaTunnelDataType[]{
- BasicType.LONG_TYPE,
+ BasicType.STRING_TYPE,
new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
ArrayType.BYTE_ARRAY_TYPE,
BasicType.STRING_TYPE,
@@ -208,7 +210,7 @@ private Map randomRow() {
SeaTunnelRow row = new SeaTunnelRow(
new Object[]{
- 1L,
+ "1",
Collections.singletonMap("key", Short.parseShort("1")),
new Byte[]{Byte.parseByte("1")},
"string",
From b586ce96a7e2d43efd1beef776fec7ddddadaba2 Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Sun, 30 Oct 2022 21:02:39 +0800
Subject: [PATCH 11/24] fix e2e test error
---
.../e2e/connector/amazondynamodb/AmazondynamodbIT.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java
index 30ff6c99d41..14730d25558 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java
@@ -83,7 +83,7 @@ public class AmazondynamodbIT extends TestSuiteBase implements TestResource {
private static final String AMAZONDYNAMODB_DOCKER_IMAGE = "amazon/dynamodb-local";
private static final String AMAZONDYNAMODB_CONTAINER_HOST = "dynamodb-host";
private static final int AMAZONDYNAMODB_CONTAINER_PORT = 8000;
- private static final String AMAZONDYNAMODB_JOB_CONFIG = "/clickhouse_to_clickhouse.conf";
+ private static final String AMAZONDYNAMODB_JOB_CONFIG = "/amazondynamodbIT_source_to_sink.conf";
private static final String SINK_TABLE = "sink_table";
private static final String SOURCE_TABLE = "source_table";
private static final String PARTITION_KEY = "id";
From c0a97519c313966ff099d19732f0663a717c8e1e Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Sun, 30 Oct 2022 22:00:36 +0800
Subject: [PATCH 12/24] fix e2e test error
---
.../src/test/resources/amazondynamodbIT_source_to_sink.conf | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf
index 4f4e770b44b..cfbd7b4f97d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf
@@ -30,10 +30,10 @@ source {
region = "us-east-1"
accessKeyId = "dummy-key"
secretAccessKey = "dummy-secret"
- table = "SINK_TABLE"
+ table = "source_table"
schema = {
fields {
- id = bigint
+ id = string
c_map = "map"
c_array = "array"
c_string = string
@@ -65,7 +65,7 @@ sink {
region = "us-east-1"
accessKeyId = "dummy-key"
secretAccessKey = "dummy-secret"
- table = "SINK_TABLE"
+ table = "sink_table"
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
From 91ab78bcf83776a3e4c743febb801192718a37fb Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Sun, 30 Oct 2022 22:32:35 +0800
Subject: [PATCH 13/24] fix e2e test error
---
.../amazondynamodb/source/AmazondynamodbSourceReader.java | 3 +++
1 file changed, 3 insertions(+)
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
index f3656b3ccd6..5c9a54faf5b 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
@@ -120,6 +120,9 @@ private Object convert(SeaTunnelDataType> seaTunnelDataType, AttributeValue at
} else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
return attributeValue.bool();
} else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
+ if (attributeValue.n() != null) {
+ return Byte.parseByte(attributeValue.n());
+ }
return attributeValue.s().getBytes(StandardCharsets.UTF_8)[0];
} else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
return Short.parseShort(attributeValue.n());
From 38692ef267b12c5f3013e1df570837ecb4aaa6b2 Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Sun, 30 Oct 2022 23:37:19 +0800
Subject: [PATCH 14/24] fix e2e test error
---
.../source/AmazondynamodbSource.java | 4 +--
.../source/AmazondynamodbSourceReader.java | 33 ++++++++++++-------
2 files changed, 23 insertions(+), 14 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
index bef92154107..b19854777f4 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
@@ -50,6 +50,8 @@ public String getPluginName() {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
amazondynamodbSourceOptions = new AmazondynamodbSourceOptions(pluginConfig);
+ Config schema = amazondynamodbSourceOptions.getSchema();
+ typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
}
@Override
@@ -59,8 +61,6 @@ public Boundedness getBoundedness() {
@Override
public SeaTunnelDataType getProducedType() {
- Config schema = amazondynamodbSourceOptions.getSchema();
- typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
return this.typeInfo;
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
index 5c9a54faf5b..d5d85e83df4 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
@@ -34,6 +34,7 @@
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -41,6 +42,7 @@
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import java.io.IOException;
+import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@@ -153,23 +155,30 @@ private Object convert(SeaTunnelDataType> seaTunnelDataType, AttributeValue at
});
return seatunnelMap;
} else if (seaTunnelDataType instanceof ArrayType) {
- List seatunnelList = new ArrayList<>(attributeValue.l().size());
+ Object array = Array.newInstance(String.class, attributeValue.l().size());
if (attributeValue.hasL()) {
- attributeValue.l().forEach(l -> {
- seatunnelList.add(convert(((ArrayType, ?>) seaTunnelDataType).getElementType(), l));
- });
+ List datas = attributeValue.l();
+ array = Array.newInstance(((ArrayType, ?>) seaTunnelDataType).getElementType().getTypeClass(), attributeValue.l().size());
+ for (int index = 0; index < datas.size(); index++) {
+ Array.set(array, index, convert(((ArrayType, ?>) seaTunnelDataType).getElementType(), datas.get(index)));
+ }
} else if (attributeValue.hasSs()) {
- seatunnelList.addAll(attributeValue.ss());
+ List datas = attributeValue.ss();
+ for (int index = 0; index < datas.size(); index++) {
+ Array.set(array, index, AttributeValue.fromS(datas.get(index)));
+ }
} else if (attributeValue.hasNs()) {
- attributeValue.ns().forEach(s -> {
- seatunnelList.addAll(attributeValue.ns());
- });
+ List datas = attributeValue.ns();
+ for (int index = 0; index < datas.size(); index++) {
+ Array.set(array, index, AttributeValue.fromS(datas.get(index)));
+ }
} else if (attributeValue.hasBs()) {
- attributeValue.bs().forEach(s -> {
- seatunnelList.add(s.asByteArray());
- });
+ List datas = attributeValue.bs();
+ for (int index = 0; index < datas.size(); index++) {
+ Array.set(array, index, AttributeValue.fromB(datas.get(index)));
+ }
}
- return seatunnelList;
+ return array;
} else {
throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
}
From efebf11e31f31a8c1057701d823411812f389ffc Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Sun, 30 Oct 2022 23:46:26 +0800
Subject: [PATCH 15/24] fix e2e test error
---
.../e2e/connector/amazondynamodb/AmazondynamodbIT.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java
index 14730d25558..de78241c9c5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java
@@ -92,7 +92,7 @@ public class AmazondynamodbIT extends TestSuiteBase implements TestResource {
protected DynamoDbClient dynamoDbClient;
@TestTemplate
- public void testClickhouse(TestContainer container) throws Exception {
+ public void testAmazondynamodb(TestContainer container) throws Exception {
Container.ExecResult execResult = container.executeJob(AMAZONDYNAMODB_JOB_CONFIG);
Assertions.assertEquals(0, execResult.getExitCode());
assertHasData();
From 73bb6dfbdae5cfb4a59e9228f76be1e2488199f2 Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Mon, 31 Oct 2022 14:46:44 +0800
Subject: [PATCH 16/24] add deserializer and serializer
---
.../DefaultSeaTunnelRowDeserializer.java | 137 ++++++++++++++++
.../DefaultSeaTunnelRowSerializer.java | 147 ++++++++++++++++++
.../serialize/SeaTunnelRowDeserializer.java | 29 ++++
.../serialize/SeaTunnelRowSerializer.java | 27 ++++
.../sink/AmazondynamodbWriter.java | 121 +-------------
.../sink/DdynamoDbSinkClient.java | 29 ++++
.../source/AmazondynamodbSourceReader.java | 109 +------------
7 files changed, 379 insertions(+), 220 deletions(-)
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowDeserializer.java
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowDeserializer.java
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowSerializer.java
create mode 100644 seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowDeserializer.java
new file mode 100644
index 00000000000..f990665a90a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowDeserializer.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer {
+
+ private final SeaTunnelRowType typeInfo;
+
+ public DefaultSeaTunnelRowDeserializer(SeaTunnelRowType typeInfo) {
+ this.typeInfo = typeInfo;
+ }
+
+ @Override
+ public SeaTunnelRow deserialize(Map item) {
+ SeaTunnelDataType>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
+ return new SeaTunnelRow(convertRow(seaTunnelDataTypes, item).toArray());
+ }
+
+ private List convertRow(SeaTunnelDataType>[] seaTunnelDataTypes, Map item) {
+ List fields = new ArrayList<>();
+ String[] fieldNames = typeInfo.getFieldNames();
+ for (int i = 0; i < seaTunnelDataTypes.length; i++) {
+ SeaTunnelDataType> seaTunnelDataType = seaTunnelDataTypes[i];
+ AttributeValue attributeValue = item.get(fieldNames[i]);
+ fields.add(convert(seaTunnelDataType, attributeValue));
+ }
+ return fields;
+ }
+
+ private Object convert(SeaTunnelDataType> seaTunnelDataType, AttributeValue attributeValue) {
+ if (attributeValue.type().equals(AttributeValue.Type.NUL)) {
+ return null;
+ } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+ return attributeValue.bool();
+ } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
+ if (attributeValue.n() != null) {
+ return Byte.parseByte(attributeValue.n());
+ }
+ return attributeValue.s().getBytes(StandardCharsets.UTF_8)[0];
+ } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
+ return Short.parseShort(attributeValue.n());
+ } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
+ return Integer.parseInt(attributeValue.n());
+ } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
+ return Long.parseLong(attributeValue.n());
+ } else if (seaTunnelDataType instanceof DecimalType) {
+ return new BigDecimal(attributeValue.n());
+ } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
+ return Float.parseFloat(attributeValue.n());
+ } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
+ return Double.parseDouble(attributeValue.n());
+ } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
+ return attributeValue.s();
+ } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) {
+ return LocalTime.parse(attributeValue.s());
+ } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) {
+ return LocalDate.parse(attributeValue.s());
+ } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) {
+ return LocalDateTime.parse(attributeValue.s());
+ } else if (PrimitiveByteArrayType.INSTANCE.equals(seaTunnelDataType)) {
+ return attributeValue.b().asByteArray();
+ } else if (seaTunnelDataType instanceof MapType) {
+ Map seatunnelMap = new HashMap<>();
+ attributeValue.m().forEach((s, attributeValueInfo) -> {
+ seatunnelMap.put(s, convert(((MapType) seaTunnelDataType).getValueType(), attributeValueInfo));
+ });
+ return seatunnelMap;
+ } else if (seaTunnelDataType instanceof ArrayType) {
+ Object array = Array.newInstance(String.class, attributeValue.l().size());
+ if (attributeValue.hasL()) {
+ List datas = attributeValue.l();
+ array = Array.newInstance(((ArrayType, ?>) seaTunnelDataType).getElementType().getTypeClass(), attributeValue.l().size());
+ for (int index = 0; index < datas.size(); index++) {
+ Array.set(array, index, convert(((ArrayType, ?>) seaTunnelDataType).getElementType(), datas.get(index)));
+ }
+ } else if (attributeValue.hasSs()) {
+ List datas = attributeValue.ss();
+ for (int index = 0; index < datas.size(); index++) {
+ Array.set(array, index, AttributeValue.fromS(datas.get(index)));
+ }
+ } else if (attributeValue.hasNs()) {
+ List datas = attributeValue.ns();
+ for (int index = 0; index < datas.size(); index++) {
+ Array.set(array, index, AttributeValue.fromS(datas.get(index)));
+ }
+ } else if (attributeValue.hasBs()) {
+ List datas = attributeValue.bs();
+ for (int index = 0; index < datas.size(); index++) {
+ Array.set(array, index, AttributeValue.fromB(datas.get(index)));
+ }
+ }
+ return array;
+ } else {
+ throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
+ }
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java
new file mode 100644
index 00000000000..017436c20b3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
+
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+ private final List measurementsType;
+
+ public DefaultSeaTunnelRowSerializer(SeaTunnelRowType seaTunnelRowType, AmazondynamodbSourceOptions amazondynamodbSourceOptions) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+ this.measurementsType = convertTypes(seaTunnelRowType);
+ }
+
+ @Override
+ public PutItemRequest serialize(SeaTunnelRow seaTunnelRow) {
+ HashMap itemValues = new HashMap<>();
+ for (int index = 0; index < seaTunnelRowType.getFieldNames().length; index++) {
+ itemValues.put(seaTunnelRowType.getFieldName(index), convertItem(seaTunnelRow.getField(index),
+ seaTunnelRowType.getFieldType(index),
+ measurementsType.get(index)));
+ }
+ return PutItemRequest.builder()
+ .tableName(amazondynamodbSourceOptions.getTable())
+ .item(itemValues)
+ .build();
+ }
+
+ private List convertTypes(SeaTunnelRowType seaTunnelRowType) {
+ return Arrays.stream(seaTunnelRowType.getFieldTypes()).map(this::convertType).collect(Collectors.toList());
+ }
+
+ private AttributeValue.Type convertType(SeaTunnelDataType> seaTunnelDataType) {
+ switch (seaTunnelDataType.getSqlType()) {
+ case INT:
+ case TINYINT:
+ case SMALLINT:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case DECIMAL:
+ return AttributeValue.Type.N;
+ case STRING:
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ return AttributeValue.Type.S;
+ case BOOLEAN:
+ return AttributeValue.Type.BOOL;
+ case NULL:
+ return AttributeValue.Type.NUL;
+ case BYTES:
+ return AttributeValue.Type.B;
+ case MAP:
+ return AttributeValue.Type.M;
+ case ARRAY:
+ return AttributeValue.Type.L;
+ default:
+ throw new UnsupportedOperationException("Unsupported dataType: " + seaTunnelDataType);
+ }
+ }
+
+ private AttributeValue convertItem(Object value, SeaTunnelDataType seaTunnelDataType, AttributeValue.Type measurementsType) {
+ if (value == null) {
+ return AttributeValue.builder().nul(true).build();
+ }
+ switch (measurementsType) {
+ case N:
+ return AttributeValue.builder().n(Integer.toString(((Number) value).intValue())).build();
+ case S:
+ return AttributeValue.builder().s(String.valueOf(value)).build();
+ case BOOL:
+ return AttributeValue.builder().bool((Boolean) value).build();
+ case B:
+ return AttributeValue.builder().b(SdkBytes.fromByteArrayUnsafe((byte[]) value)).build();
+ case SS:
+ return AttributeValue.builder().ss((Collection) value).build();
+ case NS:
+ return AttributeValue.builder().ns(((Collection) value)
+ .stream().map(Object::toString).collect(Collectors.toList())).build();
+ case BS:
+ return AttributeValue.builder().bs(
+ ((Collection) value)
+ .stream().map(number ->
+ SdkBytes.fromByteArray((byte[]) value)).collect(Collectors.toList())
+ ).build();
+ case M:
+ MapType, ?> mapType = (MapType, ?>) seaTunnelDataType;
+ Map map = (Map) value;
+ Map resultMap = new HashMap<>(map.size());
+ for (Map.Entry entry : map.entrySet()) {
+ String mapKeyName = entry.getKey();
+ resultMap.put(mapKeyName, convertItem(entry.getValue(), mapType.getValueType(), convertType(mapType.getValueType())));
+ }
+ return AttributeValue.builder().m(resultMap).build();
+ case L:
+ ArrayType, ?> arrayType = (ArrayType, ?>) seaTunnelDataType;
+ BasicType> elementType = arrayType.getElementType();
+ Object[] l = (Object[]) value;
+ return AttributeValue.builder()
+ .l(Stream.of(l).map(o -> convertItem(o, elementType, convertType(elementType)))
+ .collect(Collectors.toList())).build();
+ case NUL:
+ return AttributeValue.builder().nul(true).build();
+ default:
+ throw new UnsupportedOperationException("Unsupported dataType: " + measurementsType);
+ }
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowDeserializer.java
new file mode 100644
index 00000000000..5596701a1a3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowDeserializer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.util.Map;
+
+public interface SeaTunnelRowDeserializer {
+
+ SeaTunnelRow deserialize(Map item);
+}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 00000000000..c4b30e27f0c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+
+public interface SeaTunnelRowSerializer {
+
+ PutItemRequest serialize(SeaTunnelRow seaTunnelRow);
+}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java
index 1da8ee6d11e..6b439add386 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java
@@ -17,43 +17,27 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
-import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
-import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import java.io.IOException;
import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
public class AmazondynamodbWriter extends AbstractSinkWriter {
- private final AmazondynamodbSourceOptions amazondynamodbSourceOptions;
- private final SeaTunnelRowType seaTunnelRowType;
private final DynamoDbClient dynamoDbClient;
- private final List measurementsType;
+ private final SeaTunnelRowSerializer serializer;
public AmazondynamodbWriter(AmazondynamodbSourceOptions amazondynamodbSourceOptions, SeaTunnelRowType seaTunnelRowType) {
- this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
- this.seaTunnelRowType = seaTunnelRowType;
dynamoDbClient = DynamoDbClient.builder()
.endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
// The region is meaningless for local DynamoDb but required for client builder validation
@@ -61,13 +45,12 @@ public AmazondynamodbWriter(AmazondynamodbSourceOptions amazondynamodbSourceOpti
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(amazondynamodbSourceOptions.getAccessKeyId(), amazondynamodbSourceOptions.getSecretAccessKey())))
.build();
- this.measurementsType = convertTypes(seaTunnelRowType);
+ serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, amazondynamodbSourceOptions);
}
@Override
public void write(SeaTunnelRow element) throws IOException {
-
- dynamoDbClient.putItem(convertRow(element, seaTunnelRowType));
+ dynamoDbClient.putItem(serializer.serialize(element));
}
@Override
@@ -76,98 +59,4 @@ public void close() {
dynamoDbClient.close();
}
}
-
- private PutItemRequest convertRow(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) {
- HashMap itemValues = new HashMap<>();
- for (int index = 0; index < seaTunnelRowType.getFieldNames().length; index++) {
- itemValues.put(seaTunnelRowType.getFieldName(index), convertItem(element.getField(index),
- seaTunnelRowType.getFieldType(index),
- measurementsType.get(index)));
- }
- return PutItemRequest.builder()
- .tableName(amazondynamodbSourceOptions.getTable())
- .item(itemValues)
- .build();
- }
-
- private AttributeValue convertItem(Object value, SeaTunnelDataType seaTunnelDataType, AttributeValue.Type measurementsType) {
- if (value == null) {
- return AttributeValue.builder().nul(true).build();
- }
- switch (measurementsType) {
- case N:
- return AttributeValue.builder().n(Integer.toString(((Number) value).intValue())).build();
- case S:
- return AttributeValue.builder().s(String.valueOf(value)).build();
- case BOOL:
- return AttributeValue.builder().bool((Boolean) value).build();
- case B:
- return AttributeValue.builder().b(SdkBytes.fromByteArrayUnsafe((byte[]) value)).build();
- case SS:
- return AttributeValue.builder().ss((Collection) value).build();
- case NS:
- return AttributeValue.builder().ns(((Collection) value)
- .stream().map(Object::toString).collect(Collectors.toList())).build();
- case BS:
- return AttributeValue.builder().bs(
- ((Collection) value)
- .stream().map(number ->
- SdkBytes.fromByteArray((byte[]) value)).collect(Collectors.toList())
- ).build();
- case M:
- MapType, ?> mapType = (MapType, ?>) seaTunnelDataType;
- Map map = (Map) value;
- Map resultMap = new HashMap<>(map.size());
- for (Map.Entry entry : map.entrySet()) {
- String mapKeyName = entry.getKey();
- resultMap.put(mapKeyName, convertItem(entry.getValue(), mapType.getValueType(), convertType(mapType.getValueType())));
- }
- return AttributeValue.builder().m(resultMap).build();
- case L:
- ArrayType, ?> arrayType = (ArrayType, ?>) seaTunnelDataType;
- BasicType> elementType = arrayType.getElementType();
- Object[] l = (Object[]) value;
- return AttributeValue.builder()
- .l(Stream.of(l).map(o -> convertItem(o, elementType, convertType(elementType)))
- .collect(Collectors.toList())).build();
- case NUL:
- return AttributeValue.builder().nul(true).build();
- default:
- throw new UnsupportedOperationException("Unsupported dataType: " + measurementsType);
- }
- }
-
- private List convertTypes(SeaTunnelRowType seaTunnelRowType) {
- return Arrays.stream(seaTunnelRowType.getFieldTypes()).map(seaTunnelDataType -> convertType(seaTunnelDataType)).collect(Collectors.toList());
- }
-
- private AttributeValue.Type convertType(SeaTunnelDataType seaTunnelDataType) {
- switch (seaTunnelDataType.getSqlType()) {
- case INT:
- case TINYINT:
- case SMALLINT:
- case BIGINT:
- case FLOAT:
- case DOUBLE:
- case DECIMAL:
- return AttributeValue.Type.N;
- case STRING:
- case DATE:
- case TIME:
- case TIMESTAMP:
- return AttributeValue.Type.S;
- case BOOLEAN:
- return AttributeValue.Type.BOOL;
- case NULL:
- return AttributeValue.Type.NUL;
- case BYTES:
- return AttributeValue.Type.B;
- case MAP:
- return AttributeValue.Type.M;
- case ARRAY:
- return AttributeValue.Type.L;
- default:
- throw new UnsupportedOperationException("Unsupported dataType: " + seaTunnelDataType);
- }
- }
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java
new file mode 100644
index 00000000000..dbeaac4e8b8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+
+/**
+ * Copyright @ 2022科大讯飞。 All rights reserved.
+ * @Title DdynamoDbSinkClient
+ * @Project incubator-seatunnel
+ * @Description TODO
+ * @author gdliu3
+ * @date 2022/10/31 14:21
+ */
+public class DdynamoDbSinkClient {
+}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
index d5d85e83df4..b4d4575cb50 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
@@ -18,41 +18,24 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
-import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import java.io.IOException;
-import java.lang.reflect.Array;
-import java.math.BigDecimal;
import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
@Slf4j
public class AmazondynamodbSourceReader extends AbstractSingleSplitReader {
@@ -61,6 +44,7 @@ public class AmazondynamodbSourceReader extends AbstractSingleSplitReader output) throws Exception {
.build());
if (scan.hasItems()) {
scan.items().forEach(item -> {
- output.collect(converterToRow(item, typeInfo));
+ output.collect(seaTunnelRowDeserializer.deserialize(item));
});
}
context.signalNoMoreElement();
}
-
- private SeaTunnelRow converterToRow(Map item, SeaTunnelRowType typeInfo) {
- SeaTunnelDataType>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
- return new SeaTunnelRow(convertRow(seaTunnelDataTypes, item).toArray());
- }
-
- private List convertRow(SeaTunnelDataType>[] seaTunnelDataTypes, Map item) {
- List fields = new ArrayList<>();
- String[] fieldNames = typeInfo.getFieldNames();
- for (int i = 0; i < seaTunnelDataTypes.length; i++) {
- SeaTunnelDataType> seaTunnelDataType = seaTunnelDataTypes[i];
- AttributeValue attributeValue = item.get(fieldNames[i]);
- fields.add(convert(seaTunnelDataType, attributeValue));
- }
- return fields;
- }
-
- private Object convert(SeaTunnelDataType> seaTunnelDataType, AttributeValue attributeValue) {
- if (attributeValue.type().equals(AttributeValue.Type.NUL)) {
- return null;
- } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
- return attributeValue.bool();
- } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
- if (attributeValue.n() != null) {
- return Byte.parseByte(attributeValue.n());
- }
- return attributeValue.s().getBytes(StandardCharsets.UTF_8)[0];
- } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
- return Short.parseShort(attributeValue.n());
- } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
- return Integer.parseInt(attributeValue.n());
- } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
- return Long.parseLong(attributeValue.n());
- } else if (seaTunnelDataType instanceof DecimalType) {
- return new BigDecimal(attributeValue.n());
- } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
- return Float.parseFloat(attributeValue.n());
- } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
- return Double.parseDouble(attributeValue.n());
- } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
- return attributeValue.s();
- } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) {
- return LocalTime.parse(attributeValue.s());
- } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) {
- return LocalDate.parse(attributeValue.s());
- } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) {
- return LocalDateTime.parse(attributeValue.s());
- } else if (PrimitiveByteArrayType.INSTANCE.equals(seaTunnelDataType)) {
- return attributeValue.b().asByteArray();
- } else if (seaTunnelDataType instanceof MapType) {
- Map seatunnelMap = new HashMap<>();
- attributeValue.m().forEach((s, attributeValueInfo) -> {
- seatunnelMap.put(s, convert(((MapType) seaTunnelDataType).getValueType(), attributeValueInfo));
- });
- return seatunnelMap;
- } else if (seaTunnelDataType instanceof ArrayType) {
- Object array = Array.newInstance(String.class, attributeValue.l().size());
- if (attributeValue.hasL()) {
- List datas = attributeValue.l();
- array = Array.newInstance(((ArrayType, ?>) seaTunnelDataType).getElementType().getTypeClass(), attributeValue.l().size());
- for (int index = 0; index < datas.size(); index++) {
- Array.set(array, index, convert(((ArrayType, ?>) seaTunnelDataType).getElementType(), datas.get(index)));
- }
- } else if (attributeValue.hasSs()) {
- List datas = attributeValue.ss();
- for (int index = 0; index < datas.size(); index++) {
- Array.set(array, index, AttributeValue.fromS(datas.get(index)));
- }
- } else if (attributeValue.hasNs()) {
- List datas = attributeValue.ns();
- for (int index = 0; index < datas.size(); index++) {
- Array.set(array, index, AttributeValue.fromS(datas.get(index)));
- }
- } else if (attributeValue.hasBs()) {
- List datas = attributeValue.bs();
- for (int index = 0; index < datas.size(); index++) {
- Array.set(array, index, AttributeValue.fromB(datas.get(index)));
- }
- }
- return array;
- } else {
- throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
- }
- }
}
From 39414cb7bf9b2fc9617d140a019106554c1850ed Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Mon, 31 Oct 2022 14:49:14 +0800
Subject: [PATCH 17/24] remove unnecessary code
---
.../amazondynamodb/sink/DdynamoDbSinkClient.java | 8 --------
1 file changed, 8 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java
index dbeaac4e8b8..75166b69fc2 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java
@@ -17,13 +17,5 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
-/**
- * Copyright @ 2022科大讯飞。 All rights reserved.
- * @Title DdynamoDbSinkClient
- * @Project incubator-seatunnel
- * @Description TODO
- * @author gdliu3
- * @date 2022/10/31 14:21
- */
public class DdynamoDbSinkClient {
}
From 4e1e17c6b430c76a0e6eb22aa5c6d31441c9def8 Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Mon, 31 Oct 2022 19:38:17 +0800
Subject: [PATCH 18/24] add batch write.
---
docs/en/connector-v2/sink/Amazondynamodb.md | 20 ++--
docs/en/connector-v2/source/Amazondynamodb.md | 20 ++--
.../config/AmazondynamodbConfig.java | 6 +-
.../config/AmazondynamodbSourceOptions.java | 12 ++
.../DefaultSeaTunnelRowDeserializer.java | 6 +-
.../sink/AmazondynamodbWriter.java | 24 +---
.../sink/DdynamoDbSinkClient.java | 113 ++++++++++++++++++
.../source/AmazondynamodbSourceReader.java | 2 -
.../amazondynamodbIT_source_to_sink.conf | 8 +-
9 files changed, 161 insertions(+), 50 deletions(-)
diff --git a/docs/en/connector-v2/sink/Amazondynamodb.md b/docs/en/connector-v2/sink/Amazondynamodb.md
index fc0cf748441..5e30613dee2 100644
--- a/docs/en/connector-v2/sink/Amazondynamodb.md
+++ b/docs/en/connector-v2/sink/Amazondynamodb.md
@@ -17,14 +17,16 @@ Write data to `Amazondynamodb`
## Options
-| name | type | required | default value |
-|--------------- | ------ |----------| ------------- |
-| url | string | yes | - |
-| region | string | yes | - |
-| accessKeyId | string | yes | - |
-| secretAccessKey| string | yes | - |
-| table | string | yes | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|----------------- | ------ |----------| ------------- |
+| url | string | yes | - |
+| region | string | yes | - |
+| access_key_id | string | yes | - |
+| secret_access_key| string | yes | - |
+| table | string | yes | - |
+| batch_size | string | no | 25 |
+| batch_interval_ms| string | no | 1000 |
+| common-options | | no | - |
### url [string]
@@ -68,4 +70,4 @@ Amazondynamodb {
- Add Amazondynamodb Sink Connector
-````
\ No newline at end of file
+````
diff --git a/docs/en/connector-v2/source/Amazondynamodb.md b/docs/en/connector-v2/source/Amazondynamodb.md
index aa2f38180a7..050a95f951a 100644
--- a/docs/en/connector-v2/source/Amazondynamodb.md
+++ b/docs/en/connector-v2/source/Amazondynamodb.md
@@ -17,15 +17,15 @@ Read data from Amazondynamodb.
## Options
-| name | type | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| url | string | yes | - |
-| region | string | yes | - |
-| accessKeyId | string | yes | - |
-| secretAccessKey| string | yes | - |
-| table | string | yes | - |
-| schema | object | yes | - |
-| common-options | | yes | - |
+| name | type | required | default value |
+| ---------------- | ------ | -------- | ------------- |
+| url | string | yes | - |
+| region | string | yes | - |
+| access_key_id | string | yes | - |
+| secret_access_key| string | yes | - |
+| table | string | yes | - |
+| schema | object | yes | - |
+| common-options | | yes | - |
### url [string]
@@ -105,4 +105,4 @@ Source Plugin common parameters, refer to [Source Plugin](common-options.md) for
### next version
-- Add Amazondynamodb Source Connector
\ No newline at end of file
+- Add Amazondynamodb Source Connector
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java
index d615ee3047b..46f5a7f4e80 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java
@@ -22,7 +22,9 @@
public class AmazondynamodbConfig implements Serializable {
public static final String URL = "url";
public static final String REGION = "region";
- public static final String ACCESS_KEY_ID = "accessKeyId";
- public static final String SECRET_ACCESS_KEY = "secretAccessKey";
+ public static final String ACCESS_KEY_ID = "access_key_id";
+ public static final String SECRET_ACCESS_KEY = "secret_access_key";
public static final String TABLE = "table";
+ public static final String BATCH_SIZE = "batch_size";
+ public static final String DEFAULT_BATCH_INTERVAL_MS = "batch_interval_ms";
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
index 9929d84d06b..d04b888f647 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
@@ -30,6 +30,9 @@
@AllArgsConstructor
public class AmazondynamodbSourceOptions implements Serializable {
+ private static final int DEFAULT_BATCH_SIZE = 25;
+ private static final int DEFAULT_BATCH_INTERVAL_MS = 1000;
+
private String url;
private String region;
@@ -42,6 +45,9 @@ public class AmazondynamodbSourceOptions implements Serializable {
private Config schema;
+ public int batchSize = DEFAULT_BATCH_SIZE;
+ public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+
public AmazondynamodbSourceOptions(Config config) {
if (config.hasPath(AmazondynamodbConfig.URL)) {
this.url = config.getString(AmazondynamodbConfig.URL);
@@ -61,5 +67,11 @@ public AmazondynamodbSourceOptions(Config config) {
if (config.hasPath(CommonConfig.SCHEMA)) {
this.schema = config.getConfig(CommonConfig.SCHEMA);
}
+ if (config.hasPath(AmazondynamodbConfig.BATCH_SIZE)) {
+ this.batchSize = config.getInt(AmazondynamodbConfig.BATCH_SIZE);
+ }
+ if (config.hasPath(AmazondynamodbConfig.DEFAULT_BATCH_INTERVAL_MS)) {
+ this.batchIntervalMs = config.getInt(AmazondynamodbConfig.DEFAULT_BATCH_INTERVAL_MS);
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowDeserializer.java
index f990665a90a..e70c1c1285a 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowDeserializer.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowDeserializer.java
@@ -27,6 +27,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import lombok.AllArgsConstructor;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -41,14 +42,11 @@
import java.util.List;
import java.util.Map;
+@AllArgsConstructor
public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer {
private final SeaTunnelRowType typeInfo;
- public DefaultSeaTunnelRowDeserializer(SeaTunnelRowType typeInfo) {
- this.typeInfo = typeInfo;
- }
-
@Override
public SeaTunnelRow deserialize(Map item) {
SeaTunnelDataType>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java
index 6b439add386..9f6405790b6 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java
@@ -24,39 +24,25 @@
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
-
import java.io.IOException;
-import java.net.URI;
public class AmazondynamodbWriter extends AbstractSinkWriter {
- private final DynamoDbClient dynamoDbClient;
+ private final DdynamoDbSinkClient ddynamoDbSinkClient;
private final SeaTunnelRowSerializer serializer;
public AmazondynamodbWriter(AmazondynamodbSourceOptions amazondynamodbSourceOptions, SeaTunnelRowType seaTunnelRowType) {
- dynamoDbClient = DynamoDbClient.builder()
- .endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
- // The region is meaningless for local DynamoDb but required for client builder validation
- .region(Region.of(amazondynamodbSourceOptions.getRegion()))
- .credentialsProvider(StaticCredentialsProvider.create(
- AwsBasicCredentials.create(amazondynamodbSourceOptions.getAccessKeyId(), amazondynamodbSourceOptions.getSecretAccessKey())))
- .build();
+ ddynamoDbSinkClient = new DdynamoDbSinkClient(amazondynamodbSourceOptions, seaTunnelRowType);
serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, amazondynamodbSourceOptions);
}
@Override
public void write(SeaTunnelRow element) throws IOException {
- dynamoDbClient.putItem(serializer.serialize(element));
+ ddynamoDbSinkClient.write(serializer.serialize(element));
}
@Override
- public void close() {
- if (dynamoDbClient != null) {
- dynamoDbClient.close();
- }
+ public void close() throws IOException {
+ ddynamoDbSinkClient.close();
}
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java
index 75166b69fc2..d9d54919cd1 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java
@@ -17,5 +17,118 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
public class DdynamoDbSinkClient {
+ private final AmazondynamodbSourceOptions amazondynamodbSourceOptions;
+ private ScheduledExecutorService scheduler;
+ private ScheduledFuture> scheduledFuture;
+ private volatile boolean initialize;
+ private volatile Exception flushException;
+ private DynamoDbClient dynamoDbClient;
+ private final List batchList;
+ protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
+
+ public DdynamoDbSinkClient(AmazondynamodbSourceOptions amazondynamodbSourceOptions, SeaTunnelRowType typeInfo) {
+ this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+ this.batchList = new ArrayList<>();
+ this.seaTunnelRowDeserializer = new DefaultSeaTunnelRowDeserializer(typeInfo);
+ }
+
+ private void tryInit() throws IOException {
+ if (initialize) {
+ return;
+ }
+ dynamoDbClient = DynamoDbClient.builder()
+ .endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
+ // The region is meaningless for local DynamoDb but required for client builder validation
+ .region(Region.of(amazondynamodbSourceOptions.getRegion()))
+ .credentialsProvider(StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(amazondynamodbSourceOptions.getAccessKeyId(), amazondynamodbSourceOptions.getSecretAccessKey())))
+ .build();
+
+ scheduler = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("DdynamoDb-sink-output-%s").build());
+ scheduledFuture = scheduler.scheduleAtFixedRate(
+ () -> {
+ try {
+ flush();
+ } catch (IOException e) {
+ flushException = e;
+ }
+ },
+ amazondynamodbSourceOptions.getBatchIntervalMs(),
+ amazondynamodbSourceOptions.getBatchIntervalMs(),
+ TimeUnit.MILLISECONDS);
+
+ initialize = true;
+ }
+
+ public synchronized void write(PutItemRequest putItemRequest) throws IOException {
+ tryInit();
+ checkFlushException();
+ batchList.add(WriteRequest.builder().putRequest(
+ PutRequest.builder().item(putItemRequest.item()).build()).build());
+ if (amazondynamodbSourceOptions.getBatchSize() > 0
+ && batchList.size() >= amazondynamodbSourceOptions.getBatchSize()) {
+ flush();
+ }
+ }
+
+ public synchronized void close() throws IOException {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ scheduler.shutdown();
+ }
+ flush();
+ if (dynamoDbClient != null) {
+ dynamoDbClient.close();
+ }
+ }
+
+ synchronized void flush() throws IOException {
+ checkFlushException();
+ if (batchList.isEmpty()) {
+ return;
+ }
+ Map> requestItems = new HashMap<>(1);
+ requestItems.put(amazondynamodbSourceOptions.getTable(), batchList);
+ dynamoDbClient.batchWriteItem(BatchWriteItemRequest
+ .builder()
+ .requestItems(requestItems)
+ .build());
+
+ batchList.clear();
+ }
+
+ private void checkFlushException() {
+ if (flushException != null) {
+ throw new RuntimeException("Writing items to DdynamoDb failed.", flushException);
+ }
+ }
+
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
index b4d4575cb50..fe8fd47bb7d 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java
@@ -43,7 +43,6 @@ public class AmazondynamodbSourceReader extends AbstractSingleSplitReader
Date: Tue, 1 Nov 2022 12:01:01 +0800
Subject: [PATCH 19/24] modify sink doc
---
docs/en/connector-v2/sink/Amazondynamodb.md | 6 +-----
1 file changed, 1 insertion(+), 5 deletions(-)
diff --git a/docs/en/connector-v2/sink/Amazondynamodb.md b/docs/en/connector-v2/sink/Amazondynamodb.md
index 5e30613dee2..47f003d6a49 100644
--- a/docs/en/connector-v2/sink/Amazondynamodb.md
+++ b/docs/en/connector-v2/sink/Amazondynamodb.md
@@ -1,4 +1,4 @@
-````
+
# Amazondynamodb
> Amazondynamodb sink connector
@@ -9,11 +9,8 @@ Write data to `Amazondynamodb`
## Key features
-- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [schema projection](../../concept/connector-v2-features.md)
-- [ ] [parallelism](../../concept/connector-v2-features.md)
-- [ ] [support user-defined split](../../concept/connector-v2-features.md)
## Options
@@ -70,4 +67,3 @@ Amazondynamodb {
- Add Amazondynamodb Sink Connector
-````
From aaf3d1d08c56b3cfe73da2ccc081931bca51c424 Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Wed, 2 Nov 2022 18:05:41 +0800
Subject: [PATCH 20/24] fix cv bug.
---
.../amazondynamodb/sink/AmazondynamodbSink.java | 4 +++-
.../amazondynamodb/sink/DdynamoDbSinkClient.java | 2 +-
.../source/AmazondynamodbSource.java | 14 ++++++++++++++
3 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbSink.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbSink.java
index 1dd43665344..3d196f92bd8 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbSink.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbSink.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.ACCESS_KEY_ID;
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.REGION;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.SECRET_ACCESS_KEY;
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.URL;
@@ -54,7 +56,7 @@ public String getPluginName() {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL, TABLE, REGION);
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL, TABLE, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY);
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java
index d9d54919cd1..8066261aeb5 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java
@@ -104,8 +104,8 @@ public synchronized void close() throws IOException {
scheduledFuture.cancel(false);
scheduler.shutdown();
}
- flush();
if (dynamoDbClient != null) {
+ flush();
dynamoDbClient.close();
}
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
index b19854777f4..a770d0f4300 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
@@ -17,12 +17,22 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.ACCESS_KEY_ID;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.REGION;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.SECRET_ACCESS_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.TABLE;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.URL;
+import static org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig.SCHEMA;
+
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
@@ -49,6 +59,10 @@ public String getPluginName() {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL, TABLE, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, SCHEMA);
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+ }
amazondynamodbSourceOptions = new AmazondynamodbSourceOptions(pluginConfig);
Config schema = amazondynamodbSourceOptions.getSchema();
typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
From 5a7ff67e07c5e308cd8a6946958e2849d0ff4257 Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Wed, 2 Nov 2022 22:15:32 +0800
Subject: [PATCH 21/24] fix some error
---
.../config/AmazondynamodbSourceOptions.java | 37 ++++++-------------
.../source/AmazondynamodbSource.java | 3 +-
2 files changed, 12 insertions(+), 28 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
index d04b888f647..10843fe1984 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
@@ -45,33 +45,18 @@ public class AmazondynamodbSourceOptions implements Serializable {
private Config schema;
- public int batchSize = DEFAULT_BATCH_SIZE;
- public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+ private int batchSize;
+
+ private int batchIntervalMs;
public AmazondynamodbSourceOptions(Config config) {
- if (config.hasPath(AmazondynamodbConfig.URL)) {
- this.url = config.getString(AmazondynamodbConfig.URL);
- }
- if (config.hasPath(AmazondynamodbConfig.REGION)) {
- this.region = config.getString(AmazondynamodbConfig.REGION);
- }
- if (config.hasPath(AmazondynamodbConfig.ACCESS_KEY_ID)) {
- this.accessKeyId = config.getString(AmazondynamodbConfig.ACCESS_KEY_ID);
- }
- if (config.hasPath(AmazondynamodbConfig.SECRET_ACCESS_KEY)) {
- this.secretAccessKey = config.getString(AmazondynamodbConfig.SECRET_ACCESS_KEY);
- }
- if (config.hasPath(AmazondynamodbConfig.TABLE)) {
- this.table = config.getString(AmazondynamodbConfig.TABLE);
- }
- if (config.hasPath(CommonConfig.SCHEMA)) {
- this.schema = config.getConfig(CommonConfig.SCHEMA);
- }
- if (config.hasPath(AmazondynamodbConfig.BATCH_SIZE)) {
- this.batchSize = config.getInt(AmazondynamodbConfig.BATCH_SIZE);
- }
- if (config.hasPath(AmazondynamodbConfig.DEFAULT_BATCH_INTERVAL_MS)) {
- this.batchIntervalMs = config.getInt(AmazondynamodbConfig.DEFAULT_BATCH_INTERVAL_MS);
- }
+ this.url = config.getString(AmazondynamodbConfig.URL);
+ this.region = config.getString(AmazondynamodbConfig.REGION);
+ this.accessKeyId = config.getString(AmazondynamodbConfig.ACCESS_KEY_ID);
+ this.secretAccessKey = config.getString(AmazondynamodbConfig.SECRET_ACCESS_KEY);
+ this.table = config.getString(AmazondynamodbConfig.TABLE);
+ this.schema = config.getConfig(CommonConfig.SCHEMA);
+ this.batchSize = config.getInt(AmazondynamodbConfig.BATCH_SIZE);
+ this.batchIntervalMs = config.getInt(AmazondynamodbConfig.DEFAULT_BATCH_INTERVAL_MS);
}
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
index a770d0f4300..40ac9eeef34 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java
@@ -64,8 +64,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
}
amazondynamodbSourceOptions = new AmazondynamodbSourceOptions(pluginConfig);
- Config schema = amazondynamodbSourceOptions.getSchema();
- typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+ typeInfo = SeaTunnelSchema.buildWithConfig(amazondynamodbSourceOptions.getSchema()).getSeaTunnelRowType();
}
@Override
From b26981ab5d5f2e8059aa62f4333be39ff616af00 Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Thu, 3 Nov 2022 11:30:41 +0800
Subject: [PATCH 22/24] fix bug.
---
.../config/AmazondynamodbSourceOptions.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
index 10843fe1984..b38ec648dfb 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
@@ -45,9 +45,8 @@ public class AmazondynamodbSourceOptions implements Serializable {
private Config schema;
- private int batchSize;
-
- private int batchIntervalMs;
+ public int batchSize = DEFAULT_BATCH_SIZE;
+ public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
public AmazondynamodbSourceOptions(Config config) {
this.url = config.getString(AmazondynamodbConfig.URL);
@@ -56,7 +55,12 @@ public AmazondynamodbSourceOptions(Config config) {
this.secretAccessKey = config.getString(AmazondynamodbConfig.SECRET_ACCESS_KEY);
this.table = config.getString(AmazondynamodbConfig.TABLE);
this.schema = config.getConfig(CommonConfig.SCHEMA);
- this.batchSize = config.getInt(AmazondynamodbConfig.BATCH_SIZE);
- this.batchIntervalMs = config.getInt(AmazondynamodbConfig.DEFAULT_BATCH_INTERVAL_MS);
+
+ if (config.hasPath(AmazondynamodbConfig.BATCH_SIZE)) {
+ this.batchSize = config.getInt(AmazondynamodbConfig.BATCH_SIZE);
+ }
+ if (config.hasPath(AmazondynamodbConfig.DEFAULT_BATCH_INTERVAL_MS)) {
+ this.batchIntervalMs = config.getInt(AmazondynamodbConfig.DEFAULT_BATCH_INTERVAL_MS);
+ }
}
}
From 1e6e186d0bca4a198941192af969f7f5538065f8 Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Thu, 3 Nov 2022 11:40:35 +0800
Subject: [PATCH 23/24] fix bug.
---
.../amazondynamodb/config/AmazondynamodbSourceOptions.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
index b38ec648dfb..0d0eec4a4b1 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbSourceOptions.java
@@ -54,8 +54,10 @@ public AmazondynamodbSourceOptions(Config config) {
this.accessKeyId = config.getString(AmazondynamodbConfig.ACCESS_KEY_ID);
this.secretAccessKey = config.getString(AmazondynamodbConfig.SECRET_ACCESS_KEY);
this.table = config.getString(AmazondynamodbConfig.TABLE);
- this.schema = config.getConfig(CommonConfig.SCHEMA);
+ if (config.hasPath(CommonConfig.SCHEMA)) {
+ this.schema = config.getConfig(CommonConfig.SCHEMA);
+ }
if (config.hasPath(AmazondynamodbConfig.BATCH_SIZE)) {
this.batchSize = config.getInt(AmazondynamodbConfig.BATCH_SIZE);
}
From fca481e130433c55964c91faecdb811f481adfb0 Mon Sep 17 00:00:00 2001
From: liugddx <804167098@qq.com>
Date: Thu, 3 Nov 2022 23:05:21 +0800
Subject: [PATCH 24/24] fix ci error
---
tools/update_modules_check/update_modules_check.py | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/tools/update_modules_check/update_modules_check.py b/tools/update_modules_check/update_modules_check.py
index 59aaccfee66..0600b82f3db 100644
--- a/tools/update_modules_check/update_modules_check.py
+++ b/tools/update_modules_check/update_modules_check.py
@@ -40,13 +40,13 @@ def get_modules(files, index, start_pre, root_module):
update_files = json.loads(files)
modules_name_set = set([])
for file in update_files:
- module_name = file.split('/')[index]
+ names = file.split('/')
+ module_name = names[index]
if module_name.startswith(start_pre):
modules_name_set.add(module_name)
- sub_module_name = file.split('/')[index + 1]
- if sub_module_name.startswith(start_pre):
- modules_name_set.add(sub_module_name)
+ if len(names) > index + 1 and names[index + 1].startswith(start_pre):
+ modules_name_set.add(names[index + 1])
output_module = ""
if len(modules_name_set) > 0:
@@ -135,4 +135,4 @@ def main(argv):
get_sub_modules(argv[2])
if __name__ == "__main__":
- main(sys.argv)
\ No newline at end of file
+ main(sys.argv)