Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Rabbitmq] Allow configuration of queue durability and deletion policy #7365

Merged
merged 11 commits into from
Aug 29, 2024
39 changes: 39 additions & 0 deletions docs/en/connector-v2/sink/Rabbitmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ convenience method for setting the fields in an AMQP URI: host, port, username,

the queue to write the message to

### durable [boolean]

true: The queue will survive a server restart.
false: The queue will be deleted on server restart.

### exclusive [boolean]

true: The queue is used only by the current connection and will be deleted when the connection closes.
false: The queue can be used by multiple connections.

### auto_delete [boolean]

true: The queue will be deleted automatically when the last consumer unsubscribes.
false: The queue will not be automatically deleted.

### schema [Config]

#### fields [Config]
Expand Down Expand Up @@ -112,6 +127,30 @@ sink {
}
```

### Example 2

queue with durable, exclusive, auto_delete:

```hocon
sink {
RabbitMQ {
host = "rabbitmq-e2e"
port = 5672
virtual_host = "/"
username = "guest"
password = "guest"
queue_name = "test1"
durable = "true"
exclusive = "false"
auto_delete = "false"
rabbitmq.config = {
requested-heartbeat = 10
connection-timeout = 10
}
}
}
```

## Changelog

### next version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,16 @@ public void close() {

protected void setupQueue() throws IOException {
if (config.getQueueName() != null) {
declareQueueDefaults(channel, config.getQueueName());
declareQueueDefaults(channel, config);
}
}

private void declareQueueDefaults(Channel channel, String queueName) throws IOException {
channel.queueDeclare(queueName, true, false, false, null);
private void declareQueueDefaults(Channel channel, RabbitmqConfig config) throws IOException {
channel.queueDeclare(
config.getQueueName(),
config.getDurable(),
config.getExclusive(),
config.getAutoDelete(),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class RabbitmqConfig implements Serializable {
private Integer prefetchCount;
private long deliveryTimeout;
private String queueName;
private Boolean durable;
private Boolean exclusive;
private Boolean autoDelete;
private String routingKey;
private boolean logFailuresOnly = false;
private String exchange = "";
Expand Down Expand Up @@ -195,6 +198,30 @@ public class RabbitmqConfig implements Serializable {
"Whether the messages received are supplied with a unique"
+ "id to deduplicate messages (in case of failed acknowledgments).");

public static final Option<Boolean> DURABLE =
Options.key("durable")
.booleanType()
.defaultValue(true)
.withDescription(
"true: The queue will survive a server restart."
+ "false: The queue will be deleted on server restart.");

public static final Option<Boolean> EXCLUSIVE =
Options.key("exclusive")
.booleanType()
.defaultValue(false)
.withDescription(
"true: The queue is used only by the current connection and will be deleted when the connection closes."
+ "false: The queue can be used by multiple connections.");

public static final Option<Boolean> AUTO_DELETE =
Options.key("auto_delete")
.booleanType()
.defaultValue(false)
.withDescription(
"true: The queue will be deleted automatically when the last consumer unsubscribes."
+ "false: The queue will not be automatically deleted.");

private void parseSinkOptionProperties(Config pluginConfig) {
if (CheckConfigUtil.isValidParam(pluginConfig, RABBITMQ_CONFIG.key())) {
pluginConfig
Expand Down Expand Up @@ -259,6 +286,15 @@ public RabbitmqConfig(Config config) {
if (config.hasPath(USE_CORRELATION_ID.key())) {
this.usesCorrelationId = config.getBoolean(USE_CORRELATION_ID.key());
}
if (config.hasPath(DURABLE.key())) {
this.durable = config.getBoolean(DURABLE.key());
}
if (config.hasPath(EXCLUSIVE.key())) {
this.exclusive = config.getBoolean(EXCLUSIVE.key());
}
if (config.hasPath(AUTO_DELETE.key())) {
this.autoDelete = config.getBoolean(AUTO_DELETE.key());
}
parseSinkOptionProperties(config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public class RabbitmqIT extends TestSuiteBase implements TestResource {
private static final String SINK_QUEUE_NAME = "test1";
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final Boolean DURABLE = true;
private static final Boolean EXCLUSIVE = false;
private static final Boolean AUTO_DELETE = false;

private static final Pair<SeaTunnelRowType, List<SeaTunnelRow>> TEST_DATASET =
generateTestDataSet();
Expand Down Expand Up @@ -185,6 +188,9 @@ private void initRabbitMQ() {
config.setVirtualHost("/");
config.setUsername(USERNAME);
config.setPassword(PASSWORD);
config.setDurable(DURABLE);
config.setExclusive(EXCLUSIVE);
config.setAutoDelete(AUTO_DELETE);
rabbitmqClient = new RabbitmqClient(config);
} catch (Exception e) {
throw new RuntimeException("init Rabbitmq error", e);
Expand All @@ -201,6 +207,9 @@ private RabbitmqClient initSinkRabbitMQ() {
config.setVirtualHost("/");
config.setUsername(USERNAME);
config.setPassword(PASSWORD);
config.setDurable(DURABLE);
config.setExclusive(EXCLUSIVE);
config.setAutoDelete(AUTO_DELETE);
return new RabbitmqClient(config);
} catch (Exception e) {
throw new RuntimeException("init Rabbitmq error", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ source {
username = "guest"
password = "guest"
queue_name = "test"
durable = "true"
exclusive = "false"
auto_delete = "false"
for_e2e_testing = true
schema = {
fields {
Expand Down Expand Up @@ -61,6 +64,9 @@ sink {
virtual_host = "/"
username = "guest"
password = "guest"
durable = "true"
exclusive = "false"
auto_delete = "false"
queue_name = "test1"
}
}
Loading