diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md index e30979da596..d509b952eb8 100644 --- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md +++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md @@ -46,3 +46,10 @@ This document records some common error codes and corresponding solutions of Sea | CASSANDRA-03 | Close cql session of cassandra failed | When users encounter this error code, it means that cassandra has some problems, please check it whether is work | | CASSANDRA-04 | No data in source table | When users encounter this error code, it means that source cassandra table has no data, please check it | | CASSANDRA-05 | Parse ip address from string field field | When users encounter this error code, it means that upstream data does not match ip address format, please check it | + +## Slack Connector Error Codes + +| code | description | solution | +|-----------|---------------------------------------------|--------------------------------------------------------------------------------------------------------------------| +| SLACK-01 | Conversation can not be founded in channels | When users encounter this error code, it means that the channel is not existed in slack workspace, please check it | +| SLACK-02 | Write to slack channel failed | When users encounter this error code, it means that slack has some problems, please check it whether is work | \ No newline at end of file diff --git a/docs/en/connector-v2/sink/Slack.md b/docs/en/connector-v2/sink/Slack.md new file mode 100644 index 00000000000..6f011bfc362 --- /dev/null +++ b/docs/en/connector-v2/sink/Slack.md @@ -0,0 +1,57 @@ +# Slack + +> Slack sink connector + +## Description + +Used to send data to Slack Channel. Both support streaming and batch mode. +> For example, if the data from upstream is [`age: 12, name: huan`], the content send to socket server is the following: `{"name":"huan","age":17}` + + +## Key features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +| -------------- |--------|----------|---------------| +| webhooks_url | String | Yes | - | +| oauth_token | String | Yes | - | +| slack_channel | String | Yes | - | +| common-options | | no | - | + +### webhooks_url [string] + +Slack webhook url + +### oauth_token [string] + +Slack oauth token used for the actual authentication + +### slack_channel [string] + +slack channel for data write + +### common options + +Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details + +## Example + +```hocon +sink { + SlackSink { + webhooks_url = "https://hooks.slack.com/services/xxxxxxxxxxxx/xxxxxxxxxxxx/xxxxxxxxxxxxxxxx" + oauth_token = "xoxp-xxxxxxxxxx-xxxxxxxx-xxxxxxxxx-xxxxxxxxxxx" + slack_channel = "channel name" + } +} +``` + +## Changelog + +### new version + +- Add Slack Sink Connector diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 65271a258e1..0100e8ffd4c 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -150,4 +150,5 @@ seatunnel.source.GoogleSheets = connector-google-sheets seatunnel.sink.Tablestore = connector-tablestore seatunnel.source.Lemlist = connector-http-lemlist seatunnel.source.Klaviyo = connector-http-klaviyo +seatunnel.sink.Slack = connector-slack seatunnel.source.OneSignal = connector-http-onesignal diff --git a/seatunnel-connectors-v2/connector-slack/pom.xml b/seatunnel-connectors-v2/connector-slack/pom.xml new file mode 100644 index 00000000000..2bbb1fff010 --- /dev/null +++ b/seatunnel-connectors-v2/connector-slack/pom.xml @@ -0,0 +1,69 @@ + + + + + + seatunnel-connectors-v2 + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-slack + + + 4.5.13 + 1.25.0 + + + + + org.apache.seatunnel + connector-common + ${project.version} + + + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + + + + com.slack.api + slack-api-client + ${slack-api-client} + + + org.jetbrains.kotlin + kotlin-stdlib-common + + + org.jetbrains.kotlin + kotlin-stdlib + + + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java new file mode 100644 index 00000000000..4a94701749b --- /dev/null +++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.slack.client; + +import static org.apache.seatunnel.connectors.seatunnel.slack.config.SlackConfig.OAUTH_TOKEN; +import static org.apache.seatunnel.connectors.seatunnel.slack.config.SlackConfig.SLACK_CHANNEL; + +import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.slack.api.Slack; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.response.chat.ChatPostMessageResponse; +import com.slack.api.methods.response.conversations.ConversationsListResponse; +import com.slack.api.model.Conversation; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.List; + +@Slf4j +public class SlackClient { + private final Config pluginConfig; + private final MethodsClient methodsClient; + + public SlackClient(Config pluginConfig) { + this.pluginConfig = pluginConfig; + this.methodsClient = Slack.getInstance().methods(); + } + + /** + * Find conversation ID using the conversations.list method + */ + public String findConversation() { + String conversionId = ""; + List channels; + try { + // Get Conversion List + ConversationsListResponse conversationsListResponse = methodsClient.conversationsList(r -> r + // The Token used to initialize app + .token(pluginConfig.getString(OAUTH_TOKEN.key())) + ); + channels = conversationsListResponse.getChannels(); + for (Conversation channel : channels) { + if (channel.getName().equals(pluginConfig.getString(SLACK_CHANNEL.key()))) { + conversionId = channel.getId(); + // Break from for loop + break; + } + } + } catch (IOException | SlackApiException e) { + log.warn("Find Slack Conversion Fail.", e); + throw new SlackConnectorException(SlackConnectorErrorCode.FIND_SLACK_CONVERSATION_FAILED, e); + } + return conversionId; + } + + /** + * Post a message to a channel using Channel ID and message text + */ + public boolean publishMessage(String channelId, String text) { + boolean publishMessageSuccess = false; + try { + ChatPostMessageResponse chatPostMessageResponse = methodsClient.chatPostMessage(r -> r + // The Token used to initialize app + .token(pluginConfig.getString(SLACK_CHANNEL.key())) + .channel(channelId) + .text(text) + ); + publishMessageSuccess = chatPostMessageResponse.isOk(); + } catch (IOException | SlackApiException e) { + log.error("error: {}", ExceptionUtils.getMessage(e)); + } + return publishMessageSuccess; + } + + /** + * Close Conversion + */ + public void closeMethodClient() { + } +} diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/config/SlackConfig.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/config/SlackConfig.java new file mode 100644 index 00000000000..b7235255f07 --- /dev/null +++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/config/SlackConfig.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.slack.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class SlackConfig implements Serializable { + + public static final Option WEBHOOKS_URL = + Options.key("webhooks_url") + .stringType() + .noDefaultValue() + .withDescription("Slack webhoooks url"); + + public static final Option OAUTH_TOKEN = + Options.key("oauth_token") + .stringType() + .noDefaultValue() + .withDescription("Slack oauth token"); + + public static final Option SLACK_CHANNEL = + Options.key("slack_channel") + .stringType() + .noDefaultValue() + .withDescription("Slack slack channel"); +} diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorErrorCode.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorErrorCode.java new file mode 100644 index 00000000000..d124dcca182 --- /dev/null +++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorErrorCode.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.slack.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum SlackConnectorErrorCode implements SeaTunnelErrorCode { + FIND_SLACK_CONVERSATION_FAILED("SLACK-01", "Conversation can not be founded in channels"), + WRITE_TO_SLACK_CHANNEL_FAILED("SLACK-02", "Write to slack channel failed"); + + private final String code; + + private final String description; + + SlackConnectorErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } +} diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorException.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorException.java new file mode 100644 index 00000000000..a6f443e5622 --- /dev/null +++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/exception/SlackConnectorException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.slack.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class SlackConnectorException extends SeaTunnelRuntimeException { + public SlackConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public SlackConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public SlackConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java new file mode 100644 index 00000000000..a6d3022dd4b --- /dev/null +++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.slack.sink; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.slack.config.SlackConfig; +import org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + + + +/** + * Slack sink class + */ +@AutoService(SeaTunnelSink.class) +public class SlackSink extends AbstractSimpleSink { + + private Config pluginConfig; + private SeaTunnelRowType seaTunnelRowType; + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return this.seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new SlackWriter(seaTunnelRowType, pluginConfig); + } + + @Override + public String getPluginName() { + return "SlackSink"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig, SlackConfig.WEBHOOKS_URL.key(), SlackConfig.OAUTH_TOKEN.key(), SlackConfig.SLACK_CHANNEL.key()); + if (!checkResult.isSuccess()) { + throw new SlackConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SINK, checkResult.getMsg())); + } + this.pluginConfig = pluginConfig; + } +} + + + diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java new file mode 100644 index 00000000000..ccb0572e906 --- /dev/null +++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.slack.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.slack.config.SlackConfig; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class SlackSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return "Slack"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().required(SlackConfig.WEBHOOKS_URL, SlackConfig.OAUTH_TOKEN, SlackConfig.SLACK_CHANNEL).build(); + } +} diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java new file mode 100644 index 00000000000..c8201db9824 --- /dev/null +++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.slack.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.slack.client.SlackClient; +import org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.StringJoiner; + +@Slf4j +public class SlackWriter extends AbstractSinkWriter { + private final String conversationId; + private final SlackClient slackClient; + private final SeaTunnelRowType seaTunnelRowType; + private static final long POST_MSG_WAITING_TIME = 1500L; + + public SlackWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) { + this.seaTunnelRowType = seaTunnelRowType; + this.slackClient = new SlackClient(pluginConfig); + this.conversationId = slackClient.findConversation(); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + Object[] fields = element.getFields(); + StringJoiner stringJoiner = new StringJoiner(",", "", "\n"); + for (Object field : fields) { + stringJoiner.add(String.valueOf(field)); + } + String message = stringJoiner.toString(); + try { + slackClient.publishMessage(conversationId, message); + // Slack has a limit on the frequency of sending messages + // One message can be sent as soon as one second + Thread.sleep(POST_MSG_WAITING_TIME); + } catch (Exception e) { + log.error("Write to Slack Fail.", ExceptionUtils.getMessage(e)); + throw new SlackConnectorException(SlackConnectorErrorCode.WRITE_TO_SLACK_CHANNEL_FAILED, e); + } + } + + @Override + public void close() throws IOException { + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 9696421c3a6..4d8776e9cd3 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -62,6 +62,7 @@ connector-cassandra connector-starrocks connector-google-sheets + connector-slack diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index bbdac1d8b5e..c005860e491 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -351,6 +351,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-slack + ${project.version} + provided + @@ -442,4 +448,4 @@ - + \ No newline at end of file