-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector #8581
base: dev
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Override | ||
public TableSink createSink(TableSinkFactoryContext context) { | ||
return () -> new ActivemqSink(context.getOptions(), context.getCatalogTable()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use createSink
to create new instance. Do not use SPI in https://github.com/apache/seatunnel/pull/8581/files#diff-5e51ad244c8ce82135d5edbc4f9a925492b1e479c25f45f8ae722c36bd09edebR45
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.SCHEMA; | ||
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.URI; | ||
|
||
@AutoService(SeaTunnelSource.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please review. I have made corresponding modifications
# Conflicts: # seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/client/ActivemqClient.java # seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/config/ActivemqSinkOptions.java # seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
@@ -27,19 +24,11 @@ Used to write data to Activemq. | |||
| disable_timeStamps_by_default | boolean | no | - | | |||
| use_compression | boolean | no | - | | |||
| always_session_async | boolean | no | - | | |||
| dispatch_async | boolean | no | - | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why removed this? cc @asapekia
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This parameter applies to consumers, not producers
|
||
@Override | ||
public void pollNext(Collector output) throws Exception { | ||
consumer.setMessageListener( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this method will block until message consume finished?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Asynchronous processing, does not block, triggers a callback when a message arrives.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is async process, the message will lose sometimes. Because the no more element evnet will be send to downstream before some messgae.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Activemq source connector mainly uses the flow mode, which is used for development testing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any better suggestions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will break our interface definition. After the pollNext
method returns, no data should continue to be generated. I think we should introduce CompletableFuture
to let pollNext
block until message consume finished. Please refer
Line 116 in a85fce4
CompletableFuture<Void> completableFuture = new CompletableFuture<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add blocking queue, thread pool, CompletableFuture to let pollNext blocking until message consume finished, and concurrent processing. please review.
Please review. I added message acknowledge mechanism @Hisoka-X |
@@ -36,7 +36,7 @@ public class SeaTunnelEngineLocalExample { | |||
|
|||
public static void main(String[] args) | |||
throws FileNotFoundException, URISyntaxException, CommandException { | |||
String configurePath = args.length > 0 ? args[0] : "/examples/fake_to_console.conf"; | |||
String configurePath = args.length > 0 ? args[0] : "/examples/realtime-sync.conf"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please revert.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
recovered
…s. blocking until message consume and concurrent processing.
Purpose of this pull request
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide
release-note
.