From 016acdd8e19f8696a397dc3480c7ab33fdb612b3 Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Sat, 22 Oct 2022 21:40:31 +0800 Subject: [PATCH 01/18] [Feature][Connector-V2]My Hours Source Connector --- docs/en/connector-v2/source/MyHours.md | 157 ++++++++++++++++++ .../seatunnel/http/config/HttpParameter.java | 18 +- .../seatunnel/http/source/HttpSource.java | 4 + .../connector-http-myhours/pom.xml | 22 +++ .../myhours/source/MyHoursSource.java | 102 ++++++++++++ .../source/config/MyHoursSourceConfig.java | 44 +++++ .../source/config/MyHoursSourceParameter.java | 101 +++++++++++ .../connector-http/pom.xml | 1 + 8 files changed, 440 insertions(+), 9 deletions(-) create mode 100644 docs/en/connector-v2/source/MyHours.md create mode 100644 seatunnel-connectors-v2/connector-http/connector-http-myhours/pom.xml create mode 100644 seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java create mode 100644 seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java create mode 100644 seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java diff --git a/docs/en/connector-v2/source/MyHours.md b/docs/en/connector-v2/source/MyHours.md new file mode 100644 index 00000000000..e93e896587a --- /dev/null +++ b/docs/en/connector-v2/source/MyHours.md @@ -0,0 +1,157 @@ +# My Hours + +> My Hours source connector + +## Description + +Used to read data from My Hours. + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [x] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [schema projection](../../concept/connector-v2-features.md) +- [ ] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +| --------------------------- | ------ | -------- | ------------- | +| email | String | Yes | - | +| password | String | Yes | - | +| projects | String | No | - | +| users | String | No | - | +| schema.fields | Config | No | - | +| format | String | No | json | +| retry | int | No | - | +| retry_backoff_multiplier_ms | int | No | 100 | +| retry_backoff_max_ms | int | No | 10000 | +| common-options | | No | - | +### email [String] + +email for login + +### password [String] + +password for login + +### projects [String] + +Get the information on your Projects + +Projects can be configured as `all` or `active` + +If `all` is configured, all projects will be queried + +If configured as `active`, only the currently incomplete projects will be displayed + +### users [String] + +Get the users information on your account + +Users can be configured as `member` or `client` + +If `all` is configured, list all team members in the account + +If configured as `active`, list all clients in the account + +### retry [int] + +The max retry times if request http return to `IOException` + +### retry_backoff_multiplier_ms [int] + +The retry-backoff times(millis) multiplier if request http failed + +### retry_backoff_max_ms [int] + +The maximum retry-backoff times(millis) if request http failed + +### format [String] + +the format of upstream data, now only support `json` `text`, default `json`. + +when you assign format is `json`, you should also assign schema option, for example: + +upstream data is the following: + +```json + +{"code": 200, "data": "get success", "success": true} + +``` + +you should assign schema as the following: + +```hocon + +schema { + fields { + code = int + data = string + success = boolean + } +} + +``` + +connector will generate data as the following: + +| code | data | success | +|------|-------------|---------| +| 200 | get success | true | + +when you assign format is `text`, connector will do nothing for upstream data, for example: + +upstream data is the following: + +```json + +{"code": 200, "data": "get success", "success": true} + +``` + +connector will generate data as the following: + +| content | +|---------| +| {"code": 200, "data": "get success", "success": true} | + +### schema [Config] + +#### fields [Config] + +the schema fields of upstream data + +### common options + +Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details + +## Example + +simple: + +```hocon +MyHours{ + email = "seatunnel@test.com" + password = "seatunnel" + projects = "active" + schema { + fields { + clientId = string + clientName = string + name = string + archived = boolean + id = int + } + } +} +``` + +## Changelog + +### 2.2.0-beta 2022-10-22 + +- Add My Hours Source Connector diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java index f2f1b33cc60..aaccb57ac5b 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java @@ -28,15 +28,15 @@ @Data @SuppressWarnings("MagicNumber") public class HttpParameter implements Serializable { - private String url; - private String method; - private Map headers; - private Map params; - private String body; - private int pollIntervalMillis; - private int retry; - private int retryBackoffMultiplierMillis = 100; - private int retryBackoffMaxMillis = 10000; + protected String url; + protected String method; + protected Map headers; + protected Map params; + protected String body; + protected int pollIntervalMillis; + protected int retry; + protected int retryBackoffMultiplierMillis = 100; + protected int retryBackoffMaxMillis = 10000; public void buildWithConfig(Config pluginConfig) { // set url diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java index 9887f7c71ed..8fbb9bff2f0 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java @@ -65,6 +65,10 @@ public void prepare(Config pluginConfig) throws PrepareFailException { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); } this.httpParameter.buildWithConfig(pluginConfig); + buildSchemaWithConfig(pluginConfig); + } + + protected void buildSchemaWithConfig(Config pluginConfig) { if (pluginConfig.hasPath(HttpConfig.SCHEMA)) { Config schema = pluginConfig.getConfig(HttpConfig.SCHEMA); this.rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType(); diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/pom.xml b/seatunnel-connectors-v2/connector-http/connector-http-myhours/pom.xml new file mode 100644 index 00000000000..d31e5e6f83c --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/pom.xml @@ -0,0 +1,22 @@ + + + + connector-http + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-http-myhours + + + + org.apache.seatunnel + connector-http-base + ${project.version} + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java new file mode 100644 index 00000000000..e278557c3d3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.myhours.source; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider; +import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse; +import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource; +import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader; +import org.apache.seatunnel.connectors.seatunnel.myhours.source.config.MyHoursSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.myhours.source.config.MyHoursSourceParameter; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.auto.service.AutoService; +import com.google.common.base.Strings; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +@Slf4j +@AutoService(SeaTunnelSource.class) +public class MyHoursSource extends HttpSource { + private final MyHoursSourceParameter myHoursSourceParameter = new MyHoursSourceParameter(); + @Override + public String getPluginName() { + return "MyHours"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, MyHoursSourceConfig.EMAIL, MyHoursSourceConfig.PASSWORD); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + //Login to get accessToken + String accessToken = null; + try { + accessToken = getAccessToken(pluginConfig); + } catch (IOException e) { + e.printStackTrace(); + } + this.myHoursSourceParameter.buildWithConfig(pluginConfig, accessToken); + buildSchemaWithConfig(pluginConfig); + } + + @Override + public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception { + return new HttpSourceReader(this.myHoursSourceParameter, readerContext, this.deserializationSchema); + } + + private String getAccessToken(Config pluginConfig) throws IOException { + MyHoursSourceParameter myHoursLoginParameter = new MyHoursSourceParameter(); + myHoursLoginParameter.buildWithLoginConfig(pluginConfig); + HttpClientProvider loginHttpClient = new HttpClientProvider(myHoursLoginParameter); + try { + HttpResponse response = loginHttpClient.doPost(myHoursLoginParameter.getUrl(), myHoursLoginParameter.getBody()); + if (HttpResponse.STATUS_OK == response.getCode()) { + String content = response.getContent(); + if (!Strings.isNullOrEmpty(content)) { + ObjectMapper om = new ObjectMapper(); + Map contentMap = om.readValue(content, Map.class); + return contentMap.get(MyHoursSourceConfig.ACCESSTOKEN); + } + } + log.error("login http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent()); + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + if (Objects.nonNull(loginHttpClient)) { + loginHttpClient.close(); + } + } + return ""; + } + +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java new file mode 100644 index 00000000000..e06115b6b86 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.myhours.source.config; + +import lombok.Data; + +@Data +public class MyHoursSourceConfig { + public static final String POST = "POST"; + public static final String EMAIL = "email"; + public static final String PASSWORD = "password"; + public static final String USERS = "users"; + public static final String PROJECTS = "projects"; + public static final String ALL = "all"; + public static final String ACTIVE = "active"; + public static final String MEMBER = "member"; + public static final String CLIENT = "client"; + public static final String GRANTTYPE = "grantType"; + public static final String CLIENTID = "clientId"; + public static final String API = "api"; + public static final String AUTHORIZATION = "Authorization"; + public static final String ACCESSTOKEN = "accessToken"; + public static final String ACCESSTOKEN_PREFIX = "Bearer"; + public static final String AUTHORIZATION_URL = "https://api2.myhours.com/api/tokens/login"; + public static final String ALL_PROJECTS_URL = "https://api2.myhours.com/api/Projects/getAll"; + public static final String ACTIVE_PROJECTS_URL = "https://api2.myhours.com/api/Projects"; + public static final String ALL_MEMBERS_URL = "https://api2.myhours.com/api/Users/getAll"; + public static final String ALL_CLIENTS_URL = "https://api2.myhours.com/api/Clients"; +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java new file mode 100644 index 00000000000..84d829083f6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.myhours.source.config; + +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Data; + +import java.util.HashMap; +import java.util.Map; + +@Data +@SuppressWarnings("MagicNumber") +public class MyHoursSourceParameter extends HttpParameter { + + public void buildWithConfig(Config pluginConfig, String accessToken) { + // set url + if (pluginConfig.hasPath(MyHoursSourceConfig.PROJECTS)) { + String projects = pluginConfig.getString(MyHoursSourceConfig.PROJECTS); + if (projects.equals(MyHoursSourceConfig.ALL)) { + this.setUrl(MyHoursSourceConfig.ALL_PROJECTS_URL); + } + else { + this.setUrl(MyHoursSourceConfig.ACTIVE_PROJECTS_URL); + } + } + else { + String users = pluginConfig.getString(MyHoursSourceConfig.USERS); + if (users.equals(MyHoursSourceConfig.MEMBER)) { + this.setUrl(MyHoursSourceConfig.ALL_MEMBERS_URL); + } + else { + this.setUrl(MyHoursSourceConfig.ALL_CLIENTS_URL); + } + } + // set method + this.setMethod(HttpConfig.METHOD_DEFAULT_VALUE); + // set headers + this.headers = new HashMap<>(); + this.headers.put(MyHoursSourceConfig.AUTHORIZATION, MyHoursSourceConfig.ACCESSTOKEN_PREFIX + " " + accessToken); + this.setHeaders(headers); + // set retry + if (pluginConfig.hasPath(HttpConfig.RETRY)) { + this.setRetry(pluginConfig.getInt(HttpConfig.RETRY)); + if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)) { + this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)); + } + if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS)) { + this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS)); + } + } + } + + public void buildWithLoginConfig(Config pluginConfig) throws JsonProcessingException { + // set url + this.setUrl(MyHoursSourceConfig.AUTHORIZATION_URL); + // set method + this.setMethod(MyHoursSourceConfig.POST); + // set body + Map bodyParams = new HashMap(); + String email = pluginConfig.getString(MyHoursSourceConfig.EMAIL); + String password = pluginConfig.getString(MyHoursSourceConfig.PASSWORD); + bodyParams.put(MyHoursSourceConfig.GRANTTYPE, MyHoursSourceConfig.PASSWORD); + bodyParams.put(MyHoursSourceConfig.EMAIL, email); + bodyParams.put(MyHoursSourceConfig.PASSWORD, password); + bodyParams.put(MyHoursSourceConfig.CLIENTID, MyHoursSourceConfig.API); + ObjectMapper om = new ObjectMapper(); + String body = om.writeValueAsString(bodyParams); + this.setBody(body); + // set retry + if (pluginConfig.hasPath(HttpConfig.RETRY)) { + this.setRetry(pluginConfig.getInt(HttpConfig.RETRY)); + if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)) { + this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)); + } + if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS)) { + this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS)); + } + } + } +} diff --git a/seatunnel-connectors-v2/connector-http/pom.xml b/seatunnel-connectors-v2/connector-http/pom.xml index 5b699bef0d3..7e61374d956 100644 --- a/seatunnel-connectors-v2/connector-http/pom.xml +++ b/seatunnel-connectors-v2/connector-http/pom.xml @@ -33,6 +33,7 @@ connector-http-base connector-http-feishu connector-http-wechat + connector-http-myhours \ No newline at end of file From fbbadb3fb64c1d4c540941774fa9dadfe716d995 Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Sat, 22 Oct 2022 22:04:20 +0800 Subject: [PATCH 02/18] update --- .../connector-http-myhours/pom.xml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/pom.xml b/seatunnel-connectors-v2/connector-http/connector-http-myhours/pom.xml index d31e5e6f83c..c4d9d61c29e 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/pom.xml +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/pom.xml @@ -1,4 +1,22 @@ + From 74cfb9fb22b5cd40923f6b1a36ebb00e2d256756 Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Sat, 22 Oct 2022 22:50:09 +0800 Subject: [PATCH 03/18] update --- docs/en/connector-v2/source/MyHours.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/en/connector-v2/source/MyHours.md b/docs/en/connector-v2/source/MyHours.md index e93e896587a..ff4864f4c65 100644 --- a/docs/en/connector-v2/source/MyHours.md +++ b/docs/en/connector-v2/source/MyHours.md @@ -29,6 +29,7 @@ Used to read data from My Hours. | retry_backoff_multiplier_ms | int | No | 100 | | retry_backoff_max_ms | int | No | 10000 | | common-options | | No | - | + ### email [String] email for login From 34599d96cfea439ee6dc3b4cfd5388f726c5f344 Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Sun, 23 Oct 2022 21:35:16 +0800 Subject: [PATCH 04/18] update --- seatunnel-dist/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 4c602b04ee1..a45f9e1e32b 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -146,6 +146,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-http-myhours + ${project.version} + provided + org.apache.seatunnel connector-jdbc From 19dee803dae2c56c5f09871ff3edbb7be52e5f68 Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Sun, 23 Oct 2022 22:19:41 +0800 Subject: [PATCH 05/18] update --- docs/en/Connector-v2-release-state.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/en/Connector-v2-release-state.md b/docs/en/Connector-v2-release-state.md index dca04b53edb..7e2bb2c1ad9 100644 --- a/docs/en/Connector-v2-release-state.md +++ b/docs/en/Connector-v2-release-state.md @@ -9,8 +9,8 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex ## Connector V2 Health -| Connector Name | Type | Status | Support Version | -|---------------------------------------------------|--------|--------|-----------------| +| Connector Name | Type | Status | Support Version | +| ----------------------------------------------------------- | ------ | ------ | --------------- | | [Asset](connector-v2/sink/Assert.md) | Sink | Beta | 2.2.0-beta | | [ClickHouse](connector-v2/source/Clickhouse.md) | Source | Beta | 2.2.0-beta | | [ClickHouse](connector-v2/sink/Clickhouse.md) | Sink | Beta | 2.2.0-beta | @@ -44,6 +44,7 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex | [LocalFile](connector-v2/source/LocalFile.md) | Source | Beta | 2.2.0-beta | | [MongoDB](connector-v2/source/MongoDB.md) | Source | Beta | 2.2.0-beta | | [MongoDB](connector-v2/sink/MongoDB.md) | Sink | Beta | 2.2.0-beta | +| [MyHours](connector-v2/source/MyHours.md) | Source | Alpha | 2.2.0-beta | | [Neo4j](connector-v2/sink/Neo4j.md) | Sink | Alpha | 2.2.0-beta | | [OssFile](connector-v2/sink/OssFile.md) | Sink | Alpha | 2.2.0-beta | | [OssFile](connector-v2/source/OssFile.md) | Source | Beta | 2.2.0-beta | From 8c75f9894bfcbf18901badea28b645278e025ced Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Tue, 25 Oct 2022 00:21:45 +0800 Subject: [PATCH 06/18] Remove useless annotation --- .../seatunnel/myhours/source/MyHoursSource.java | 14 +++++++------- .../myhours/source/config/MyHoursSourceConfig.java | 3 --- .../source/config/MyHoursSourceParameter.java | 9 ++------- 3 files changed, 9 insertions(+), 17 deletions(-) diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java index e278557c3d3..36914df697e 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; import org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider; @@ -34,7 +35,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.auto.service.AutoService; import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; @@ -74,7 +74,7 @@ public AbstractSingleSplitReader createReader(SingleSplitReaderCon return new HttpSourceReader(this.myHoursSourceParameter, readerContext, this.deserializationSchema); } - private String getAccessToken(Config pluginConfig) throws IOException { + private String getAccessToken(Config pluginConfig) throws IOException, RuntimeException { MyHoursSourceParameter myHoursLoginParameter = new MyHoursSourceParameter(); myHoursLoginParameter.buildWithLoginConfig(pluginConfig); HttpClientProvider loginHttpClient = new HttpClientProvider(myHoursLoginParameter); @@ -83,20 +83,20 @@ private String getAccessToken(Config pluginConfig) throws IOException { if (HttpResponse.STATUS_OK == response.getCode()) { String content = response.getContent(); if (!Strings.isNullOrEmpty(content)) { - ObjectMapper om = new ObjectMapper(); - Map contentMap = om.readValue(content, Map.class); + Map contentMap = JsonUtils.toMap(content); return contentMap.get(MyHoursSourceConfig.ACCESSTOKEN); } } - log.error("login http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent()); + throw new RuntimeException(String.format("login http client execute exception, http response status code:[%d], content:[%s]", + response.getCode(), + response.getContent())); } catch (Exception e) { - log.error(e.getMessage(), e); + throw new RuntimeException("login http client execute exception"); } finally { if (Objects.nonNull(loginHttpClient)) { loginHttpClient.close(); } } - return ""; } } diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java index e06115b6b86..d781135f801 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java @@ -17,9 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.myhours.source.config; -import lombok.Data; - -@Data public class MyHoursSourceConfig { public static final String POST = "POST"; public static final String EMAIL = "email"; diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java index 84d829083f6..b692b0c4389 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java @@ -17,22 +17,18 @@ package org.apache.seatunnel.connectors.seatunnel.myhours.source.config; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig; import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; import org.apache.seatunnel.shade.com.typesafe.config.Config; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.Data; import java.util.HashMap; import java.util.Map; -@Data -@SuppressWarnings("MagicNumber") public class MyHoursSourceParameter extends HttpParameter { - public void buildWithConfig(Config pluginConfig, String accessToken) { // set url if (pluginConfig.hasPath(MyHoursSourceConfig.PROJECTS)) { @@ -84,8 +80,7 @@ public void buildWithLoginConfig(Config pluginConfig) throws JsonProcessingExcep bodyParams.put(MyHoursSourceConfig.EMAIL, email); bodyParams.put(MyHoursSourceConfig.PASSWORD, password); bodyParams.put(MyHoursSourceConfig.CLIENTID, MyHoursSourceConfig.API); - ObjectMapper om = new ObjectMapper(); - String body = om.writeValueAsString(bodyParams); + String body = JsonUtils.toJsonString(bodyParams); this.setBody(body); // set retry if (pluginConfig.hasPath(HttpConfig.RETRY)) { From 84f8638d1386d935aa6bbc6a97ce190424321e84 Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Wed, 26 Oct 2022 00:37:28 +0800 Subject: [PATCH 07/18] [Feature][Connector-V2][My Hours]Add My Hours Source Connector With URL --- .../http/client/HttpClientProvider.java | 29 +++++++++++++- .../http/source/HttpSourceReader.java | 2 +- .../source/config/MyHoursSourceConfig.java | 10 ----- .../source/config/MyHoursSourceParameter.java | 40 +++++++++---------- 4 files changed, 48 insertions(+), 33 deletions(-) diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java index 0eae378722f..67743428197 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java @@ -100,11 +100,11 @@ public void onRetry(Attempt attempt) { .build(); } - public HttpResponse execute(String url, String method, Map headers, Map params) throws Exception { + public HttpResponse execute(String url, String method, Map headers, Map params, String body) throws Exception { // convert method option to uppercase method = method.toUpperCase(Locale.ROOT); if (HttpPost.METHOD_NAME.equals(method)) { - return doPost(url, headers, params); + return doPost(url, headers, params, body); } if (HttpGet.METHOD_NAME.equals(method)) { return doGet(url, headers, params); @@ -243,6 +243,31 @@ public HttpResponse doPost(String url, Map headers, String body) return getResponse(httpPost); } + /** + * Send a post request with request headers , request parameters and request body + * + * @param url request address + * @param headers request header map + * @param params request parameter map + * @param body request body + * @return http response result + * @throws Exception information + */ + public HttpResponse doPost(String url, Map headers, Map params, String body) throws Exception { + // create a new http get + HttpPost httpPost = new HttpPost(url); + // set default request config + httpPost.setConfig(REQUEST_CONFIG); + // set request header + addHeaders(httpPost, headers); + // set request params + addParameters(httpPost, params); + // add body in request + addBody(httpPost, body); + // return http response + return getResponse(httpPost); + } + /** * Send a put request without request parameters * diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java index 1582c644238..5bacae516b8 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java @@ -61,7 +61,7 @@ public void close() throws IOException { @Override public void pollNext(Collector output) throws Exception { try { - HttpResponse response = httpClient.execute(this.httpParameter.getUrl(), this.httpParameter.getMethod(), this.httpParameter.getHeaders(), this.httpParameter.getParams()); + HttpResponse response = httpClient.execute(this.httpParameter.getUrl(), this.httpParameter.getMethod(), this.httpParameter.getHeaders(), this.httpParameter.getParams(), this.httpParameter.getBody()); if (HttpResponse.STATUS_OK == response.getCode()) { String content = response.getContent(); if (!Strings.isNullOrEmpty(content)) { diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java index d781135f801..255ce863dd0 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java @@ -21,12 +21,6 @@ public class MyHoursSourceConfig { public static final String POST = "POST"; public static final String EMAIL = "email"; public static final String PASSWORD = "password"; - public static final String USERS = "users"; - public static final String PROJECTS = "projects"; - public static final String ALL = "all"; - public static final String ACTIVE = "active"; - public static final String MEMBER = "member"; - public static final String CLIENT = "client"; public static final String GRANTTYPE = "grantType"; public static final String CLIENTID = "clientId"; public static final String API = "api"; @@ -34,8 +28,4 @@ public class MyHoursSourceConfig { public static final String ACCESSTOKEN = "accessToken"; public static final String ACCESSTOKEN_PREFIX = "Bearer"; public static final String AUTHORIZATION_URL = "https://api2.myhours.com/api/tokens/login"; - public static final String ALL_PROJECTS_URL = "https://api2.myhours.com/api/Projects/getAll"; - public static final String ACTIVE_PROJECTS_URL = "https://api2.myhours.com/api/Projects"; - public static final String ALL_MEMBERS_URL = "https://api2.myhours.com/api/Users/getAll"; - public static final String ALL_CLIENTS_URL = "https://api2.myhours.com/api/Clients"; } diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java index b692b0c4389..bef1a72226a 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java @@ -27,35 +27,35 @@ import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; public class MyHoursSourceParameter extends HttpParameter { public void buildWithConfig(Config pluginConfig, String accessToken) { // set url - if (pluginConfig.hasPath(MyHoursSourceConfig.PROJECTS)) { - String projects = pluginConfig.getString(MyHoursSourceConfig.PROJECTS); - if (projects.equals(MyHoursSourceConfig.ALL)) { - this.setUrl(MyHoursSourceConfig.ALL_PROJECTS_URL); - } - else { - this.setUrl(MyHoursSourceConfig.ACTIVE_PROJECTS_URL); - } - } - else { - String users = pluginConfig.getString(MyHoursSourceConfig.USERS); - if (users.equals(MyHoursSourceConfig.MEMBER)) { - this.setUrl(MyHoursSourceConfig.ALL_MEMBERS_URL); - } - else { - this.setUrl(MyHoursSourceConfig.ALL_CLIENTS_URL); - } - } + this.setUrl(pluginConfig.getString(HttpConfig.URL)); // set method - this.setMethod(HttpConfig.METHOD_DEFAULT_VALUE); + if (pluginConfig.hasPath(HttpConfig.METHOD)) { + this.setMethod(pluginConfig.getString(HttpConfig.METHOD)); + } else { + this.setMethod(HttpConfig.METHOD_DEFAULT_VALUE); + } // set headers this.headers = new HashMap<>(); this.headers.put(MyHoursSourceConfig.AUTHORIZATION, MyHoursSourceConfig.ACCESSTOKEN_PREFIX + " " + accessToken); this.setHeaders(headers); - // set retry + // set params + if (pluginConfig.hasPath(HttpConfig.PARAMS)) { + this.setParams(pluginConfig.getConfig(HttpConfig.PARAMS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2))); + } + // set body + if (pluginConfig.hasPath(HttpConfig.BODY)) { + this.setBody(pluginConfig.getString(HttpConfig.BODY)); + } + //set poll_interval_ms + if (pluginConfig.hasPath(HttpConfig.POLL_INTERVAL_MILLS)) { + this.setPollIntervalMillis(pluginConfig.getInt(HttpConfig.POLL_INTERVAL_MILLS)); + } + //set retry if (pluginConfig.hasPath(HttpConfig.RETRY)) { this.setRetry(pluginConfig.getInt(HttpConfig.RETRY)); if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)) { From 8714a462fd3c932c4b7defb5d6c6c08f0cd2b85a Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Wed, 26 Oct 2022 00:54:02 +0800 Subject: [PATCH 08/18] [Feature][Connector-V2][My Hours]Add My Hours Source Connector With URL --- docs/en/connector-v2/source/MyHours.md | 37 +++++++++----------------- plugin-mapping.properties | 3 ++- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/docs/en/connector-v2/source/MyHours.md b/docs/en/connector-v2/source/MyHours.md index ff4864f4c65..fde0adb8f64 100644 --- a/docs/en/connector-v2/source/MyHours.md +++ b/docs/en/connector-v2/source/MyHours.md @@ -21,10 +21,12 @@ Used to read data from My Hours. | --------------------------- | ------ | -------- | ------------- | | email | String | Yes | - | | password | String | Yes | - | -| projects | String | No | - | -| users | String | No | - | +| method | String | No | get | | schema.fields | Config | No | - | | format | String | No | json | +| params | Map | No | - | +| body | String | No | - | +| poll_interval_ms | int | No | - | | retry | int | No | - | | retry_backoff_multiplier_ms | int | No | 100 | | retry_backoff_max_ms | int | No | 10000 | @@ -38,25 +40,21 @@ email for login password for login -### projects [String] +### method [String] -Get the information on your Projects +http request method, only supports GET, POST method -Projects can be configured as `all` or `active` +### params [Map] -If `all` is configured, all projects will be queried +http params -If configured as `active`, only the currently incomplete projects will be displayed +### body [String] -### users [String] +http body -Get the users information on your account +### poll_interval_ms [int] -Users can be configured as `member` or `client` - -If `all` is configured, list all team members in the account - -If configured as `active`, list all clients in the account +request http api interval(millis) in stream mode ### retry [int] @@ -138,16 +136,7 @@ simple: MyHours{ email = "seatunnel@test.com" password = "seatunnel" - projects = "active" - schema { - fields { - clientId = string - clientName = string - name = string - archived = boolean - id = int - } - } + url = "https://api2.myhours.com/api/Projects/getAll" } ``` diff --git a/plugin-mapping.properties b/plugin-mapping.properties index e022fa59b1b..e6ae9e1e794 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -135,4 +135,5 @@ seatunnel.sink.MongoDB = connector-mongodb seatunnel.source.Iceberg = connector-iceberg seatunnel.source.InfluxDB = connector-influxdb seatunnel.source.S3File = connector-file-s3 -seatunnel.sink.S3File = connector-file-s3 \ No newline at end of file +seatunnel.sink.S3File = connector-file-s3 +seatunnel.source.MyHours = connector-http-myhours \ No newline at end of file From bb9418efeb343ec95ca3eafa08ff53bee5338003 Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Sat, 29 Oct 2022 18:21:34 +0800 Subject: [PATCH 09/18] [Feature][Connector-V2][My Hours]Change release log --- docs/en/Connector-v2-release-state.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/en/Connector-v2-release-state.md b/docs/en/Connector-v2-release-state.md index 57922becd76..a16a2f8e840 100644 --- a/docs/en/Connector-v2-release-state.md +++ b/docs/en/Connector-v2-release-state.md @@ -9,8 +9,8 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex ## Connector V2 Health -| Connector Name | Type | Status | Support Version | -|---------------------------------------------------|--------|--------|-----------------| +| Connector Name | Type | Status | Support Version | +|-------------------------------------------------------------|--------|--------|-----------------| | [Asset](connector-v2/sink/Assert.md) | Sink | Beta | 2.2.0-beta | | [ClickHouse](connector-v2/source/Clickhouse.md) | Source | Beta | 2.2.0-beta | | [ClickHouse](connector-v2/sink/Clickhouse.md) | Sink | Beta | 2.2.0-beta | @@ -22,13 +22,13 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex | [Email](connector-v2/sink/Email.md) | Sink | Alpha | 2.2.0-beta | | [Enterprise WeChat](connector-v2/sink/Enterprise-WeChat.md) | Sink | Alpha | 2.2.0-beta | | [FeiShu](connector-v2/sink/Feishu.md) | Sink | Alpha | 2.2.0-beta | -| [Fake](connector-v2/source/FakeSource.md) | Source | Alpha | 2.2.0-beta | +| [Fake](connector-v2/source/FakeSource.md) | Source | Beta | 2.2.0-beta | | [FtpFile](connector-v2/sink/FtpFile.md) | Sink | Alpha | 2.2.0-beta | | [Greenplum](connector-v2/sink/Greenplum.md) | Sink | Alpha | 2.2.0-beta | | [Greenplum](connector-v2/source/Greenplum.md) | Source | Alpha | 2.2.0-beta | | [HdfsFile](connector-v2/sink/HdfsFile.md) | Sink | Beta | 2.2.0-beta | | [HdfsFile](connector-v2/source/HdfsFile.md) | Source | Beta | 2.2.0-beta | -| [Hive](connector-v2/sink/Hive.md) | Sink | Alpha | 2.2.0-beta | +| [Hive](connector-v2/sink/Hive.md) | Sink | Beta | 2.2.0-beta | | [Hive](connector-v2/source/Hive.md) | Source | Beta | 2.2.0-beta | | [Http](connector-v2/sink/Http.md) | Sink | Beta | 2.2.0-beta | | [Http](connector-v2/source/Http.md) | Source | Beta | 2.2.0-beta | From d8a498554ad8132aecb6297f30efab274597674e Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Sun, 30 Oct 2022 08:56:59 +0800 Subject: [PATCH 10/18] [Feature][Connector-V2][My Hours]Add fields for example --- docs/en/connector-v2/source/MyHours.md | 38 +++++++++++++++++-- .../myhours/source/MyHoursSource.java | 2 +- .../source/config/MyHoursSourceConfig.java | 1 + 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/docs/en/connector-v2/source/MyHours.md b/docs/en/connector-v2/source/MyHours.md index fde0adb8f64..5ecd552101c 100644 --- a/docs/en/connector-v2/source/MyHours.md +++ b/docs/en/connector-v2/source/MyHours.md @@ -19,6 +19,7 @@ Used to read data from My Hours. | name | type | required | default value | | --------------------------- | ------ | -------- | ------------- | +| url | String | Yes | - | | email | String | Yes | - | | password | String | Yes | - | | method | String | No | get | @@ -32,6 +33,10 @@ Used to read data from My Hours. | retry_backoff_max_ms | int | No | 10000 | | common-options | | No | - | +### url [String] + +http request url + ### email [String] email for login @@ -130,18 +135,43 @@ Source plugin common parameters, please refer to [Source Common Options](common- ## Example -simple: - ```hocon MyHours{ + url = "https://api2.myhours.com/api/Projects/getAll" email = "seatunnel@test.com" password = "seatunnel" - url = "https://api2.myhours.com/api/Projects/getAll" + schema { + fields { + name = string + archived = boolean + dateArchived = string + dateCreated = string + clientName = string + budgetAlertPercent = string + budgetType = int + totalTimeLogged = double + budgetValue = double + totalAmount = double + totalExpense = double + laborCost = double + totalCost = double + billableTimeLogged = double + totalBillableAmount = double + billable = boolean + roundType = int + roundInterval = int + budgetSpentPercentage = double + budgetTarget = int + budgetPeriodType = string + budgetSpent = string + id = string + } + } } ``` ## Changelog -### 2.2.0-beta 2022-10-22 +### next version - Add My Hours Source Connector diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java index 36914df697e..0bc44aa626b 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java @@ -54,7 +54,7 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { - CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, MyHoursSourceConfig.EMAIL, MyHoursSourceConfig.PASSWORD); + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, MyHoursSourceConfig.URL, MyHoursSourceConfig.EMAIL, MyHoursSourceConfig.PASSWORD); if (!result.isSuccess()) { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); } diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java index 255ce863dd0..32466a76152 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.myhours.source.config; public class MyHoursSourceConfig { + public static final String URL = "url"; public static final String POST = "POST"; public static final String EMAIL = "email"; public static final String PASSWORD = "password"; From 2216b8cb15e14a0cfa51b2ef8e206dc28f4e312f Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Tue, 1 Nov 2022 14:01:24 +0800 Subject: [PATCH 11/18] [Feature][Connector-V2][My Hours]Fix doc format --- docs/en/connector-v2/source/MyHours.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/source/MyHours.md b/docs/en/connector-v2/source/MyHours.md index 5ecd552101c..d81ffa05272 100644 --- a/docs/en/connector-v2/source/MyHours.md +++ b/docs/en/connector-v2/source/MyHours.md @@ -140,7 +140,7 @@ MyHours{ url = "https://api2.myhours.com/api/Projects/getAll" email = "seatunnel@test.com" password = "seatunnel" - schema { + schema { fields { name = string archived = boolean From f58958e7824b6fc5fc0fae49a8d22c0fbe6752a2 Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Thu, 3 Nov 2022 00:28:56 +0800 Subject: [PATCH 12/18] [Feature][Connector-V2][My Hours]Delete stream feature --- docs/en/connector-v2/source/MyHours.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/source/MyHours.md b/docs/en/connector-v2/source/MyHours.md index d81ffa05272..71578aca691 100644 --- a/docs/en/connector-v2/source/MyHours.md +++ b/docs/en/connector-v2/source/MyHours.md @@ -9,7 +9,7 @@ Used to read data from My Hours. ## Key features - [x] [batch](../../concept/connector-v2-features.md) -- [x] [stream](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) - [ ] [exactly-once](../../concept/connector-v2-features.md) - [x] [schema projection](../../concept/connector-v2-features.md) - [ ] [parallelism](../../concept/connector-v2-features.md) From d586d4526f6383f058b01556ba09549a67b16c2d Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Fri, 4 Nov 2022 11:48:57 +0800 Subject: [PATCH 13/18] [Feature][Connector-V2][My Hours]Add pugin_config --- config/plugin_config | 1 + .../connectors/seatunnel/myhours/source/MyHoursSource.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/config/plugin_config b/config/plugin_config index 71aec509502..b59e60ff064 100644 --- a/config/plugin_config +++ b/config/plugin_config @@ -68,6 +68,7 @@ connector-assert connector-kafka connector-http-base connector-http-feishu +connector-http-myhours connector-hive connector-clickhouse connector-jdbc diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java index 0bc44aa626b..4bfe4a327d5 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java @@ -63,7 +63,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException { try { accessToken = getAccessToken(pluginConfig); } catch (IOException e) { - e.printStackTrace(); + log.error(e.getMessage(), e); } this.myHoursSourceParameter.buildWithConfig(pluginConfig, accessToken); buildSchemaWithConfig(pluginConfig); From c9cd88e226a9f5a6a4a2bba45b531ffe55d3678c Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Fri, 4 Nov 2022 13:43:49 +0800 Subject: [PATCH 14/18] [Feature][Connector-V2][My Hours]Delete myhours in plugin_config --- config/plugin_config | 1 - 1 file changed, 1 deletion(-) diff --git a/config/plugin_config b/config/plugin_config index b59e60ff064..71aec509502 100644 --- a/config/plugin_config +++ b/config/plugin_config @@ -68,7 +68,6 @@ connector-assert connector-kafka connector-http-base connector-http-feishu -connector-http-myhours connector-hive connector-clickhouse connector-jdbc From 8d97c40eec749344183635eced8f43f031eeaafc Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Fri, 4 Nov 2022 22:26:36 +0800 Subject: [PATCH 15/18] [Feature][Connector-V2][My Hours]Fix format --- .../seatunnel/myhours/source/MyHoursSource.java | 15 ++++++++------- .../source/config/MyHoursSourceParameter.java | 4 +--- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java index 4bfe4a327d5..3a5d205de8c 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java @@ -46,6 +46,7 @@ @Slf4j @AutoService(SeaTunnelSource.class) public class MyHoursSource extends HttpSource { + private final MyHoursSourceParameter myHoursSourceParameter = new MyHoursSourceParameter(); @Override public String getPluginName() { @@ -60,11 +61,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException { } //Login to get accessToken String accessToken = null; - try { - accessToken = getAccessToken(pluginConfig); - } catch (IOException e) { - log.error(e.getMessage(), e); - } + accessToken = getAccessToken(pluginConfig); this.myHoursSourceParameter.buildWithConfig(pluginConfig, accessToken); buildSchemaWithConfig(pluginConfig); } @@ -74,7 +71,7 @@ public AbstractSingleSplitReader createReader(SingleSplitReaderCon return new HttpSourceReader(this.myHoursSourceParameter, readerContext, this.deserializationSchema); } - private String getAccessToken(Config pluginConfig) throws IOException, RuntimeException { + private String getAccessToken(Config pluginConfig){ MyHoursSourceParameter myHoursLoginParameter = new MyHoursSourceParameter(); myHoursLoginParameter.buildWithLoginConfig(pluginConfig); HttpClientProvider loginHttpClient = new HttpClientProvider(myHoursLoginParameter); @@ -94,7 +91,11 @@ private String getAccessToken(Config pluginConfig) throws IOException, RuntimeEx throw new RuntimeException("login http client execute exception"); } finally { if (Objects.nonNull(loginHttpClient)) { - loginHttpClient.close(); + try { + loginHttpClient.close(); + } catch (IOException e) { + log.warn(e.getMessage(), e); + } } } } diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java index bef1a72226a..7d0770cb2a4 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java @@ -23,8 +23,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import com.fasterxml.jackson.core.JsonProcessingException; - import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; @@ -67,7 +65,7 @@ public void buildWithConfig(Config pluginConfig, String accessToken) { } } - public void buildWithLoginConfig(Config pluginConfig) throws JsonProcessingException { + public void buildWithLoginConfig(Config pluginConfig) { // set url this.setUrl(MyHoursSourceConfig.AUTHORIZATION_URL); // set method From 555cd8749c96afb446bf201af596a53bda0ec03a Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Mon, 7 Nov 2022 19:48:40 +0800 Subject: [PATCH 16/18] [Feature][Connector-V2][My Hours]Update format --- .../connectors/seatunnel/myhours/source/MyHoursSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java index 3a5d205de8c..9d7e7a9375f 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java @@ -46,8 +46,8 @@ @Slf4j @AutoService(SeaTunnelSource.class) public class MyHoursSource extends HttpSource { - private final MyHoursSourceParameter myHoursSourceParameter = new MyHoursSourceParameter(); + @Override public String getPluginName() { return "MyHours"; From 986c37eae868fe527cebeaacc047182d311c1784 Mon Sep 17 00:00:00 2001 From: TaoZex <2633363995@qq.com> Date: Tue, 8 Nov 2022 17:57:56 +0800 Subject: [PATCH 17/18] [Feature][Connector-V2][My Hours]delete redundant code --- .../source/config/MyHoursSourceParameter.java | 38 ++----------------- 1 file changed, 4 insertions(+), 34 deletions(-) diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java index 7d0770cb2a4..40beedb2763 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java @@ -25,44 +25,14 @@ import java.util.HashMap; import java.util.Map; -import java.util.stream.Collectors; public class MyHoursSourceParameter extends HttpParameter { public void buildWithConfig(Config pluginConfig, String accessToken) { - // set url - this.setUrl(pluginConfig.getString(HttpConfig.URL)); - // set method - if (pluginConfig.hasPath(HttpConfig.METHOD)) { - this.setMethod(pluginConfig.getString(HttpConfig.METHOD)); - } else { - this.setMethod(HttpConfig.METHOD_DEFAULT_VALUE); - } - // set headers - this.headers = new HashMap<>(); + super.buildWithConfig(pluginConfig); + //put authorization in headers + this.headers = this.getHeaders() == null ? new HashMap<>() : this.getHeaders(); this.headers.put(MyHoursSourceConfig.AUTHORIZATION, MyHoursSourceConfig.ACCESSTOKEN_PREFIX + " " + accessToken); - this.setHeaders(headers); - // set params - if (pluginConfig.hasPath(HttpConfig.PARAMS)) { - this.setParams(pluginConfig.getConfig(HttpConfig.PARAMS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2))); - } - // set body - if (pluginConfig.hasPath(HttpConfig.BODY)) { - this.setBody(pluginConfig.getString(HttpConfig.BODY)); - } - //set poll_interval_ms - if (pluginConfig.hasPath(HttpConfig.POLL_INTERVAL_MILLS)) { - this.setPollIntervalMillis(pluginConfig.getInt(HttpConfig.POLL_INTERVAL_MILLS)); - } - //set retry - if (pluginConfig.hasPath(HttpConfig.RETRY)) { - this.setRetry(pluginConfig.getInt(HttpConfig.RETRY)); - if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)) { - this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)); - } - if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS)) { - this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS)); - } - } + this.setHeaders(this.headers); } public void buildWithLoginConfig(Config pluginConfig) { From ab85b1eaf0b264423c80c3c4bdbd5a5a9de7c90e Mon Sep 17 00:00:00 2001 From: Eric Date: Tue, 8 Nov 2022 19:23:18 +0800 Subject: [PATCH 18/18] Update docs/en/connector-v2/source/MyHours.md --- docs/en/connector-v2/source/MyHours.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/source/MyHours.md b/docs/en/connector-v2/source/MyHours.md index 71578aca691..c5ca3268c83 100644 --- a/docs/en/connector-v2/source/MyHours.md +++ b/docs/en/connector-v2/source/MyHours.md @@ -31,7 +31,7 @@ Used to read data from My Hours. | retry | int | No | - | | retry_backoff_multiplier_ms | int | No | 100 | | retry_backoff_max_ms | int | No | 10000 | -| common-options | | No | - | +| common-options | config | No | - | ### url [String]