Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] RabbitMq connector options #8633

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ private Set<String> buildWhiteList() {
Set<String> whiteList = new HashSet<>();
whiteList.add("JdbcSinkOptions");
whiteList.add("TypesenseSourceOptions");
whiteList.add("RabbitmqSourceOptions");
whiteList.add("TypesenseSinkOptions");
whiteList.add("EmailSinkOptions");
whiteList.add("HudiSinkOptions");
Expand All @@ -191,7 +190,6 @@ private Set<String> buildWhiteList() {
whiteList.add("MongodbSinkOptions");
whiteList.add("IoTDBSinkOptions");
whiteList.add("EasysearchSourceOptions");
whiteList.add("RabbitmqSinkOptions");
whiteList.add("StarRocksSourceOptions");
whiteList.add("IcebergSourceOptions");
whiteList.add("HbaseSourceOptions");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.rabbitmq.client;

import org.apache.seatunnel.common.Handover;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -47,12 +47,12 @@
@Slf4j
@AllArgsConstructor
public class RabbitmqClient {
private final RabbitmqConfig config;
private final RabbitmqOptions config;
private final ConnectionFactory connectionFactory;
private final Connection connection;
private final Channel channel;

public RabbitmqClient(RabbitmqConfig config) {
public RabbitmqClient(RabbitmqOptions config) {
this.config = config;
try {
this.connectionFactory = getConnectionFactory();
Expand Down Expand Up @@ -193,7 +193,7 @@ protected void setupQueue() throws IOException {
}
}

private void declareQueueDefaults(Channel channel, RabbitmqConfig config) throws IOException {
private void declareQueueDefaults(Channel channel, RabbitmqOptions config) throws IOException {
channel.queueDeclare(
config.getQueueName(),
config.getDurable(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
@Setter
@Getter
@AllArgsConstructor
public class RabbitmqConfig implements Serializable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should split Config and Options into two classes.
The Options class is used to record the parameters we can pass from the configuration file (external).
The RabbitmqConfig here is a class use to pass parameteres internal.

So, I think we need to split them. Please share your thoughts with me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didnt get it, could you please explain using an example

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's take a look at this:

AmazonDynamoDB Connector

In this connector, we have Options classes (Source, Sink). Each class only contains the variables that the user can configure in the config file (What I call external).

image

Additionally, we have a Config class, which is a utility class that helps pass the user parameters to the DynamoDbSinkClient (What I call internal).

image

public class RabbitmqOptions implements Serializable {
private String host;
private Integer port;
private String virtualHost;
Expand Down Expand Up @@ -234,7 +234,7 @@ private void parseSinkOptionProperties(Config pluginConfig) {
}
}

public RabbitmqConfig(Config config) {
public RabbitmqOptions(Config config) {
this.host = config.getString(HOST.key());
this.port = config.getInt(PORT.key());
this.queueName = config.getString(QUEUE_NAME.key());
Expand Down Expand Up @@ -299,5 +299,5 @@ public RabbitmqConfig(Config config) {
}

@VisibleForTesting
public RabbitmqConfig() {}
public RabbitmqOptions() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

@Setter
@Getter
@AllArgsConstructor
public class RabbitmqSinkOptions extends RabbitmqOptions {

public RabbitmqSinkOptions(Config config) {
super(config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

@Setter
@Getter
@AllArgsConstructor
public class RabbitmqSourceOptions extends RabbitmqOptions {
public RabbitmqSourceOptions(Config config) {
super(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,26 @@
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;

import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.HOST;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PORT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.VIRTUAL_HOST;

@AutoService(SeaTunnelSink.class)
public class RabbitmqSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
private SeaTunnelRowType seaTunnelRowType;
private Config pluginConfig;
private RabbitmqConfig rabbitMQConfig;
private RabbitmqSinkOptions rabbitmqSinkOptions;

@Override
public String getPluginName() {
Expand All @@ -77,7 +77,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, result.getMsg()));
}
rabbitMQConfig = new RabbitmqConfig(pluginConfig);
rabbitmqSinkOptions = new RabbitmqSinkOptions(pluginConfig);
}

@Override
Expand All @@ -88,7 +88,7 @@ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
throws IOException {
return new RabbitmqSinkWriter(rabbitMQConfig, seaTunnelRowType);
return new RabbitmqSinkWriter(rabbitmqSinkOptions, seaTunnelRowType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@

import com.google.auto.service.AutoService;

import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.AUTOMATIC_RECOVERY_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.CONNECTION_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.EXCHANGE;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.NETWORK_RECOVERY_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.RABBITMQ_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.AUTOMATIC_RECOVERY_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.CONNECTION_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.EXCHANGE;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.HOST;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.NETWORK_RECOVERY_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PORT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.RABBITMQ_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.ROUTING_KEY;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.TOPOLOGY_RECOVERY_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.URL;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.VIRTUAL_HOST;

@AutoService(Factory.class)
public class RabbitmqSinkFactory implements TableSinkFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

import java.util.Optional;
Expand All @@ -30,7 +30,7 @@ public class RabbitmqSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private RabbitmqClient rabbitMQClient;
private final JsonSerializationSchema jsonSerializationSchema;

public RabbitmqSinkWriter(RabbitmqConfig config, SeaTunnelRowType seaTunnelRowType) {
public RabbitmqSinkWriter(RabbitmqOptions config, SeaTunnelRowType seaTunnelRowType) {
this.rabbitMQClient = new RabbitmqClient(config);
this.jsonSerializationSchema = new JsonSerializationSchema(seaTunnelRowType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplit;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplitEnumeratorState;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

import com.google.auto.service.AutoService;

import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.HOST;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PORT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.VIRTUAL_HOST;

@AutoService(SeaTunnelSource.class)
public class RabbitmqSource
Expand All @@ -59,7 +59,7 @@ public class RabbitmqSource

private DeserializationSchema<SeaTunnelRow> deserializationSchema;
private JobContext jobContext;
private RabbitmqConfig rabbitMQConfig;
private RabbitmqSourceOptions rabbitmqSourceOptions;

@Override
public Boundedness getBoundedness() {
Expand All @@ -70,7 +70,9 @@ public Boundedness getBoundedness() {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, "not support batch job mode"));
}
return rabbitMQConfig.isForE2ETesting() ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
return rabbitmqSourceOptions.isForE2ETesting()
? Boundedness.BOUNDED
: Boundedness.UNBOUNDED;
}

@Override
Expand All @@ -97,7 +99,7 @@ public void prepare(Config config) throws PrepareFailException {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
this.rabbitMQConfig = new RabbitmqConfig(config);
this.rabbitmqSourceOptions = new RabbitmqSourceOptions(config);
setDeserialization(config);
}

Expand All @@ -109,7 +111,8 @@ public SeaTunnelDataType getProducedType() {
@Override
public SourceReader<SeaTunnelRow, RabbitmqSplit> createReader(
SourceReader.Context readerContext) throws Exception {
return new RabbitmqSourceReader(deserializationSchema, readerContext, rabbitMQConfig);
return new RabbitmqSourceReader(
deserializationSchema, readerContext, rabbitmqSourceOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@

import com.google.auto.service.AutoService;

import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.AUTOMATIC_RECOVERY_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.CONNECTION_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.DELIVERY_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.EXCHANGE;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.NETWORK_RECOVERY_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PREFETCH_COUNT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_CHANNEL_MAX;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_FRAME_MAX;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_HEARTBEAT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.AUTOMATIC_RECOVERY_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.CONNECTION_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.DELIVERY_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.EXCHANGE;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.HOST;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.NETWORK_RECOVERY_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PORT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.PREFETCH_COUNT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.REQUESTED_CHANNEL_MAX;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.REQUESTED_FRAME_MAX;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.REQUESTED_HEARTBEAT;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.ROUTING_KEY;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.TOPOLOGY_RECOVERY_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.URL;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions.VIRTUAL_HOST;

@AutoService(Factory.class)
public class RabbitmqSourceFactory implements TableSourceFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Handover;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqOptions;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplit;

Expand Down Expand Up @@ -67,12 +67,12 @@ public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> {
private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
private RabbitmqClient rabbitMQClient;
private DefaultConsumer consumer;
private final RabbitmqConfig config;
private final RabbitmqOptions config;

public RabbitmqSourceReader(
DeserializationSchema<SeaTunnelRow> deserializationSchema,
SourceReader.Context context,
RabbitmqConfig config) {
RabbitmqOptions config) {
this.handover = new Handover<>();
this.pendingDeliveryTagsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
this.pendingCorrelationIdsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
Expand Down
Loading