-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature][Connector-V2][Slack] Add Slack sink connector #3226
Changes from 8 commits
7014a51
9b9df0a
e000574
8b27692
d950956
c550f0d
1ba3084
2d883ec
ee498f8
952927c
51baf1e
6385c38
4f4cc7a
9b53e47
7e0d71b
76d4610
4487f32
f09092b
280d4f5
471e733
9173702
4a9c82d
a76f4b3
b6ab1b4
1667aaa
8b337a7
b9e72b3
7bde171
8ec3500
8d0cd31
3104840
369172c
3900bb7
108c9dc
68dac35
7ff9f3d
6de19f1
7ec9f5f
a8c6423
d67353c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
# Slack | ||
|
||
> Slack sink connector | ||
|
||
## Description | ||
|
||
Used to send data to Slack Channel. Both support streaming and batch mode. | ||
> For example, if the data from upstream is [`age: 12, name: huan`], the content send to socket server is the following: `{"name":"huan","age":17}` | ||
|
||
|
||
## Key features | ||
|
||
- [ ] [exactly-once](../../concept/connector-v2-features.md) | ||
- [ ] [schema projection](../../concept/connector-v2-features.md) | ||
|
||
## Options | ||
|
||
| name | type | required | default value | | ||
| -------------- |--------|----------|---------------| | ||
| webhooks_url | String | Yes | - | | ||
| oauth_token | String | Yes | - | | ||
| slack_channel | String | Yes | - | | ||
| common-options | | no | - | | ||
|
||
### webhooks_url [string] | ||
|
||
Slack webhook url | ||
|
||
### oauth_token [string] | ||
|
||
Slack oauth token used for the actual authentication | ||
|
||
### slack_channel [string] | ||
|
||
slack channel for data write | ||
|
||
### common options | ||
|
||
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | ||
|
||
## Example | ||
|
||
```hocon | ||
sink { | ||
SlackSink { | ||
webhooks_url = "https://hooks.slack.com/services/xxxxxxxxxxxx/xxxxxxxxxxxx/xxxxxxxxxxxxxxxx" | ||
oauth_token = "xoxp-xxxxxxxxxx-xxxxxxxx-xxxxxxxxx-xxxxxxxxxxx" | ||
slack_channel = "channel name" | ||
} | ||
} | ||
``` | ||
|
||
## Changelog | ||
|
||
### new version | ||
|
||
- Add Slack Sink Connector |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
|
||
Licensed to the Apache Software Foundation (ASF) under one or more | ||
contributor license agreements. See the NOTICE file distributed with | ||
this work for additional information regarding copyright ownership. | ||
The ASF licenses this file to You under the Apache License, Version 2.0 | ||
(the "License"); you may not use this file except in compliance with | ||
the License. You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
|
||
--> | ||
|
||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>seatunnel-connectors-v2</artifactId> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<version>${revision}</version> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>connector-slack</artifactId> | ||
|
||
<properties> | ||
<httpclient.version>4.5.13</httpclient.version> | ||
<slack-api-client>1.25.0</slack-api-client> | ||
</properties> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<artifactId>connector-common</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.httpcomponents</groupId> | ||
<artifactId>httpclient</artifactId> | ||
<version>${httpclient.version}</version> | ||
</dependency> | ||
|
||
<!-- https://mvnrepository.com/artifact/com.slack.api/slack-api-client --> | ||
<dependency> | ||
<groupId>com.slack.api</groupId> | ||
<artifactId>slack-api-client</artifactId> | ||
<version>${slack-api-client}</version> | ||
<exclusions> | ||
<exclusion> | ||
<groupId>org.jetbrains.kotlin</groupId> | ||
<artifactId>kotlin-stdlib-common</artifactId> | ||
</exclusion> | ||
<exclusion> | ||
<groupId>org.jetbrains.kotlin</groupId> | ||
<artifactId>kotlin-stdlib</artifactId> | ||
</exclusion> | ||
</exclusions> | ||
</dependency> | ||
</dependencies> | ||
|
||
</project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.client; | ||
|
||
import org.apache.seatunnel.connectors.seatunnel.config.SlackConfig; | ||
|
||
import com.slack.api.Slack; | ||
import com.slack.api.methods.MethodsClient; | ||
import com.slack.api.methods.SlackApiException; | ||
import com.slack.api.methods.response.chat.ChatPostMessageResponse; | ||
import com.slack.api.methods.response.conversations.ConversationsListResponse; | ||
import com.slack.api.model.Conversation; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
|
||
@Slf4j | ||
public class SlackClient { | ||
private SlackConfig slackConfig; | ||
private MethodsClient methodsClient; | ||
|
||
public SlackClient(SlackConfig slackConfig) { | ||
this.slackConfig = slackConfig; | ||
this.methodsClient = Slack.getInstance().methods(); | ||
} | ||
|
||
/** | ||
* Find conversation ID using the conversations.list method | ||
*/ | ||
public String findConversation() { | ||
String conversionId = ""; | ||
List<Conversation> channels; | ||
try { | ||
// Get Conversion List | ||
ConversationsListResponse conversationsListResponse = methodsClient.conversationsList(r -> r | ||
// The Token used to initialize app | ||
.token(slackConfig.getOauthToken()) | ||
); | ||
channels = conversationsListResponse.getChannels(); | ||
for (Conversation channel : channels) { | ||
if (channel.getName().equals(slackConfig.getSlackChannel())) { | ||
conversionId = channel.getId(); | ||
// Break from for loop | ||
break; | ||
} | ||
} | ||
} catch (IOException | SlackApiException e) { | ||
log.warn("Find Slack Conversion Fail.", e); | ||
throw new RuntimeException("Find Slack Conversion Fail.", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should create a |
||
} | ||
return conversionId; | ||
} | ||
|
||
/** | ||
* Post a message to a channel using Channel ID and message text | ||
*/ | ||
public boolean publishMessage(String channelId, String text) { | ||
boolean publishMessageSuccess = false; | ||
try { | ||
ChatPostMessageResponse chatPostMessageResponse = methodsClient.chatPostMessage(r -> r | ||
// The Token used to initialize app | ||
.token(slackConfig.getOauthToken()) | ||
.channel(channelId) | ||
.text(text) | ||
); | ||
publishMessageSuccess = chatPostMessageResponse.isOk(); | ||
} catch (IOException | SlackApiException e) { | ||
log.error("error: {}", e.getMessage(), e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "error: {}" only can add one argu, You can update reference this:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have fixed it! Thx |
||
} | ||
return publishMessageSuccess; | ||
} | ||
|
||
/** | ||
* Close Conversion | ||
*/ | ||
public void closeMethodClient() { | ||
methodsClient = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no close() method in Slack API. |
||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,46 @@ | ||||||||||||||
/* | ||||||||||||||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||||||||||||||
* contributor license agreements. See the NOTICE file distributed with | ||||||||||||||
* this work for additional information regarding copyright ownership. | ||||||||||||||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||||||||||
* (the "License"); you may not use this file except in compliance with | ||||||||||||||
* the License. You may obtain a copy of the License at | ||||||||||||||
* | ||||||||||||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||
* | ||||||||||||||
* Unless required by applicable law or agreed to in writing, software | ||||||||||||||
* distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||
* See the License for the specific language governing permissions and | ||||||||||||||
* limitations under the License. | ||||||||||||||
*/ | ||||||||||||||
|
||||||||||||||
package org.apache.seatunnel.connectors.seatunnel.config; | ||||||||||||||
|
||||||||||||||
import org.apache.seatunnel.shade.com.typesafe.config.Config; | ||||||||||||||
|
||||||||||||||
import lombok.Data; | ||||||||||||||
import lombok.NonNull; | ||||||||||||||
|
||||||||||||||
@Data | ||||||||||||||
public class SlackConfig { | ||||||||||||||
|
||||||||||||||
private static final String WEBHOOKS_URL = "webhooks_url"; | ||||||||||||||
private static final String OAUTH_TOKEN = "oauth_token"; | ||||||||||||||
private static final String SLACK_CHANNEL = "slack_channel"; | ||||||||||||||
private String webHooksUrl; | ||||||||||||||
private String oauthToken; | ||||||||||||||
private String slackChannel; | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
|
||||||||||||||
public SlackConfig(@NonNull Config pluginConfig) { | ||||||||||||||
if (pluginConfig.hasPath(WEBHOOKS_URL)) { | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for your advice, I have done it! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These three parameters have already been checked in the prepare method, and this place does not need to check again. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have removed it.PTAL |
||||||||||||||
this.webHooksUrl = pluginConfig.getString(WEBHOOKS_URL); | ||||||||||||||
} | ||||||||||||||
if (pluginConfig.hasPath(OAUTH_TOKEN)) { | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for your reminder. |
||||||||||||||
this.oauthToken = pluginConfig.getString(OAUTH_TOKEN); | ||||||||||||||
} | ||||||||||||||
if (pluginConfig.hasPath(SLACK_CHANNEL)) { | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thx |
||||||||||||||
this.slackChannel = pluginConfig.getString(SLACK_CHANNEL); | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.sink; | ||
|
||
import org.apache.seatunnel.api.common.PrepareFailException; | ||
import org.apache.seatunnel.api.sink.SeaTunnelSink; | ||
import org.apache.seatunnel.api.sink.SinkWriter; | ||
import org.apache.seatunnel.api.table.type.SeaTunnelDataType; | ||
import org.apache.seatunnel.api.table.type.SeaTunnelRow; | ||
import org.apache.seatunnel.api.table.type.SeaTunnelRowType; | ||
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; | ||
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; | ||
|
||
import org.apache.seatunnel.shade.com.typesafe.config.Config; | ||
|
||
import com.google.auto.service.AutoService; | ||
|
||
import java.io.IOException; | ||
|
||
|
||
/** | ||
* Slack sink class | ||
*/ | ||
@AutoService(SeaTunnelSink.class) | ||
public class SlackSink extends AbstractSimpleSink<SeaTunnelRow, Void> { | ||
|
||
private Config pluginConfig; | ||
private SeaTunnelRowType seaTunnelRowType; | ||
|
||
@Override | ||
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { | ||
this.seaTunnelRowType = seaTunnelRowType; | ||
} | ||
|
||
@Override | ||
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() { | ||
return this.seaTunnelRowType; | ||
} | ||
|
||
@Override | ||
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException { | ||
return new SlackWriter(seaTunnelRowType, pluginConfig); | ||
} | ||
|
||
@Override | ||
public String getPluginName() { | ||
return "SlackSink"; | ||
} | ||
|
||
@Override | ||
public void prepare(Config pluginConfig) throws PrepareFailException { | ||
this.pluginConfig = pluginConfig; | ||
} | ||
} | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done! PTAL