diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java index 92acb42a5fc..b23f9c10e50 100644 --- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java +++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java @@ -171,7 +171,6 @@ private Set buildWhiteList() { Set whiteList = new HashSet<>(); whiteList.add("JdbcSinkOptions"); whiteList.add("TypesenseSourceOptions"); - whiteList.add("RabbitmqSourceOptions"); whiteList.add("TypesenseSinkOptions"); whiteList.add("EmailSinkOptions"); whiteList.add("HudiSinkOptions"); @@ -191,7 +190,6 @@ private Set buildWhiteList() { whiteList.add("MongodbSinkOptions"); whiteList.add("IoTDBSinkOptions"); whiteList.add("EasysearchSourceOptions"); - whiteList.add("RabbitmqSinkOptions"); whiteList.add("StarRocksSourceOptions"); whiteList.add("IcebergSourceOptions"); whiteList.add("HbaseSourceOptions"); diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java index 3f5c862cadf..2efa0ced190 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java @@ -18,7 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.rabbitmq.client; import org.apache.seatunnel.common.Handover; -import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig; +import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException; import org.apache.commons.lang3.StringUtils; @@ -47,12 +47,12 @@ @Slf4j @AllArgsConstructor public class RabbitmqClient { - private final RabbitmqConfig config; + private final RabbitmqOptions config; private final ConnectionFactory connectionFactory; private final Connection connection; private final Channel channel; - public RabbitmqClient(RabbitmqConfig config) { + public RabbitmqClient(RabbitmqOptions config) { this.config = config; try { this.connectionFactory = getConnectionFactory(); @@ -193,7 +193,7 @@ protected void setupQueue() throws IOException { } } - private void declareQueueDefaults(Channel channel, RabbitmqConfig config) throws IOException { + private void declareQueueDefaults(Channel channel, RabbitmqOptions config) throws IOException { channel.queueDeclare( config.getQueueName(), config.getDurable(), diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqOptions.java similarity index 98% rename from seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java rename to seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqOptions.java index 2872c9af810..67269b3ac76 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqOptions.java @@ -36,7 +36,7 @@ @Setter @Getter @AllArgsConstructor -public class RabbitmqConfig implements Serializable { +public class RabbitmqOptions implements Serializable { private String host; private Integer port; private String virtualHost; @@ -234,7 +234,7 @@ private void parseSinkOptionProperties(Config pluginConfig) { } } - public RabbitmqConfig(Config config) { + public RabbitmqOptions(Config config) { this.host = config.getString(HOST.key()); this.port = config.getInt(PORT.key()); this.queueName = config.getString(QUEUE_NAME.key()); @@ -299,5 +299,5 @@ public RabbitmqConfig(Config config) { } @VisibleForTesting - public RabbitmqConfig() {} + public RabbitmqOptions() {} } diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSinkOptions.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSinkOptions.java new file mode 100644 index 00000000000..4c19334da80 --- /dev/null +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSinkOptions.java @@ -0,0 +1,17 @@ +package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +@Setter +@Getter +@AllArgsConstructor +public class RabbitmqSinkOptions extends RabbitmqOptions { + + public RabbitmqSinkOptions(Config config) { + super(config); + } +} diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSourceOptions.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSourceOptions.java new file mode 100644 index 00000000000..9e47b80097e --- /dev/null +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSourceOptions.java @@ -0,0 +1,16 @@ +package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +@Setter +@Getter +@AllArgsConstructor +public class RabbitmqSourceOptions extends RabbitmqOptions { + public RabbitmqSourceOptions(Config config) { + super(config); + } +} diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java index 7d4f26272bf..439ee57151f 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java @@ -31,7 +31,7 @@ import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; -import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig; +import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSinkOptions; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException; import com.google.auto.service.AutoService; @@ -39,18 +39,18 @@ import java.io.IOException; import java.util.Optional; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.HOST; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PORT; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.QUEUE_NAME; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.USERNAME; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.VIRTUAL_HOST; @AutoService(SeaTunnelSink.class) public class RabbitmqSink extends AbstractSimpleSink { private SeaTunnelRowType seaTunnelRowType; private Config pluginConfig; - private RabbitmqConfig rabbitMQConfig; + private RabbitmqSinkOptions rabbitmqSinkOptions; @Override public String getPluginName() { @@ -77,7 +77,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException { "PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SINK, result.getMsg())); } - rabbitMQConfig = new RabbitmqConfig(pluginConfig); + rabbitmqSinkOptions = new RabbitmqSinkOptions(pluginConfig); } @Override @@ -88,7 +88,7 @@ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { @Override public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { - return new RabbitmqSinkWriter(rabbitMQConfig, seaTunnelRowType); + return new RabbitmqSinkWriter(rabbitmqSinkOptions, seaTunnelRowType); } @Override diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java index 4618d351d93..c1b35518a40 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java @@ -23,20 +23,20 @@ import com.google.auto.service.AutoService; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.AUTOMATIC_RECOVERY_ENABLED; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.CONNECTION_TIMEOUT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.EXCHANGE; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.NETWORK_RECOVERY_INTERVAL; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.RABBITMQ_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.AUTOMATIC_RECOVERY_ENABLED; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.CONNECTION_TIMEOUT; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.EXCHANGE; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.HOST; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.NETWORK_RECOVERY_INTERVAL; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PORT; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.QUEUE_NAME; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.RABBITMQ_CONFIG; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.ROUTING_KEY; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.TOPOLOGY_RECOVERY_ENABLED; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.URL; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.USERNAME; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.VIRTUAL_HOST; @AutoService(Factory.class) public class RabbitmqSinkFactory implements TableSinkFactory { diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkWriter.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkWriter.java index a5d7c06b5da..8f9e62caab5 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkWriter.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkWriter.java @@ -21,7 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient; -import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig; +import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions; import org.apache.seatunnel.format.json.JsonSerializationSchema; import java.util.Optional; @@ -30,7 +30,7 @@ public class RabbitmqSinkWriter extends AbstractSinkWriter { private RabbitmqClient rabbitMQClient; private final JsonSerializationSchema jsonSerializationSchema; - public RabbitmqSinkWriter(RabbitmqConfig config, SeaTunnelRowType seaTunnelRowType) { + public RabbitmqSinkWriter(RabbitmqOptions config, SeaTunnelRowType seaTunnelRowType) { this.rabbitMQClient = new RabbitmqClient(config); this.jsonSerializationSchema = new JsonSerializationSchema(seaTunnelRowType); } diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java index 5684b21d4f3..0d6d4d3e7f3 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java @@ -37,7 +37,7 @@ import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig; +import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSourceOptions; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplit; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplitEnumeratorState; @@ -45,12 +45,12 @@ import com.google.auto.service.AutoService; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.HOST; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PORT; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.QUEUE_NAME; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.USERNAME; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.VIRTUAL_HOST; @AutoService(SeaTunnelSource.class) public class RabbitmqSource @@ -59,7 +59,7 @@ public class RabbitmqSource private DeserializationSchema deserializationSchema; private JobContext jobContext; - private RabbitmqConfig rabbitMQConfig; + private RabbitmqSourceOptions rabbitmqSourceOptions; @Override public Boundedness getBoundedness() { @@ -70,7 +70,9 @@ public Boundedness getBoundedness() { "PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SOURCE, "not support batch job mode")); } - return rabbitMQConfig.isForE2ETesting() ? Boundedness.BOUNDED : Boundedness.UNBOUNDED; + return rabbitmqSourceOptions.isForE2ETesting() + ? Boundedness.BOUNDED + : Boundedness.UNBOUNDED; } @Override @@ -97,7 +99,7 @@ public void prepare(Config config) throws PrepareFailException { "PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SOURCE, result.getMsg())); } - this.rabbitMQConfig = new RabbitmqConfig(config); + this.rabbitmqSourceOptions = new RabbitmqSourceOptions(config); setDeserialization(config); } @@ -109,7 +111,8 @@ public SeaTunnelDataType getProducedType() { @Override public SourceReader createReader( SourceReader.Context readerContext) throws Exception { - return new RabbitmqSourceReader(deserializationSchema, readerContext, rabbitMQConfig); + return new RabbitmqSourceReader( + deserializationSchema, readerContext, rabbitmqSourceOptions); } @Override diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java index b809a6039f2..f552bdb29a2 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java @@ -25,24 +25,24 @@ import com.google.auto.service.AutoService; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.AUTOMATIC_RECOVERY_ENABLED; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.CONNECTION_TIMEOUT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.DELIVERY_TIMEOUT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.EXCHANGE; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.NETWORK_RECOVERY_INTERVAL; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PREFETCH_COUNT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_CHANNEL_MAX; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_FRAME_MAX; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_HEARTBEAT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.AUTOMATIC_RECOVERY_ENABLED; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.CONNECTION_TIMEOUT; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.DELIVERY_TIMEOUT; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.EXCHANGE; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.HOST; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.NETWORK_RECOVERY_INTERVAL; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PORT; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PREFETCH_COUNT; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.QUEUE_NAME; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.REQUESTED_CHANNEL_MAX; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.REQUESTED_FRAME_MAX; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.REQUESTED_HEARTBEAT; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.ROUTING_KEY; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.TOPOLOGY_RECOVERY_ENABLED; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.URL; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.USERNAME; +import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.VIRTUAL_HOST; @AutoService(Factory.class) public class RabbitmqSourceFactory implements TableSourceFactory { diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java index df80294f147..7b98d349bd1 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java @@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.Handover; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient; -import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig; +import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplit; @@ -67,12 +67,12 @@ public class RabbitmqSourceReader implements SourceReader { private final DeserializationSchema deserializationSchema; private RabbitmqClient rabbitMQClient; private DefaultConsumer consumer; - private final RabbitmqConfig config; + private final RabbitmqOptions config; public RabbitmqSourceReader( DeserializationSchema deserializationSchema, SourceReader.Context context, - RabbitmqConfig config) { + RabbitmqOptions config) { this.handover = new Handover<>(); this.pendingDeliveryTagsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); this.pendingCorrelationIdsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java index a846949d857..1e7d364c6c8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java @@ -28,7 +28,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.Handover; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient; -import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig; +import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.TestContainer; @@ -181,7 +181,7 @@ private static Pair> generateTestDataSet() private void initRabbitMQ() { try { - RabbitmqConfig config = new RabbitmqConfig(); + RabbitmqOptions config = new RabbitmqOptions(); config.setHost(rabbitmqContainer.getHost()); config.setPort(rabbitmqContainer.getFirstMappedPort()); config.setQueueName(QUEUE_NAME); @@ -200,7 +200,7 @@ private void initRabbitMQ() { private RabbitmqClient initSinkRabbitMQ() { try { - RabbitmqConfig config = new RabbitmqConfig(); + RabbitmqOptions config = new RabbitmqOptions(); config.setHost(rabbitmqContainer.getHost()); config.setPort(rabbitmqContainer.getFirstMappedPort()); config.setQueueName(SINK_QUEUE_NAME);