Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RabbitMQ] Add queue config to service config #942

Merged
merged 9 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "rabbitmq"
version = "3.0.0"
version = "3.0.1"
authors = ["Ballerina"]
keywords = ["service", "client", "messaging", "network", "pubsub"]
repository = "https://github.com/ballerina-platform/module-ballerinax-rabbitmq"
Expand All @@ -18,8 +18,8 @@ path = "./lib/amqp-client-5.18.0.jar"
[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "rabbitmq-native"
version = "3.0.0"
path = "../native/build/libs/rabbitmq-native-3.0.0.jar"
version = "3.0.1"
path = "../native/build/libs/rabbitmq-native-3.0.1-SNAPSHOT.jar"

[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
Expand Down
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ id = "rabbitmq-compiler-plugin"
class = "io.ballerina.stdlib.rabbitmq.plugin.RabbitmqCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/rabbitmq-compiler-plugin-3.0.0.jar"
path = "../compiler-plugin/build/libs/rabbitmq-compiler-plugin-3.0.1-SNAPSHOT.jar"
4 changes: 2 additions & 2 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ modules = [
[[package]]
org = "ballerina"
name = "crypto"
version = "2.7.0"
version = "2.7.2"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "time"}
Expand Down Expand Up @@ -387,7 +387,7 @@ modules = [
[[package]]
org = "ballerinax"
name = "rabbitmq"
version = "3.0.0"
version = "3.0.1"
dependencies = [
{org = "ballerina", name = "constraint"},
{org = "ballerina", name = "crypto"},
Expand Down
2 changes: 2 additions & 0 deletions ballerina/listener.bal
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,11 @@ public isolated class Listener {
# Configurations required to create a subscription.
#
# + queueName - The name of the queue to be subscribed
# + config - The configurations required to declare a queue
# + autoAck - If false, should manually acknowledge
public type RabbitMQServiceConfig record {|
string queueName;
QueueConfig config?;
boolean autoAck = true;
|};

Expand Down
116 changes: 111 additions & 5 deletions ballerina/tests/rabbitmq_client_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/lang.'string;
import ballerina/log;
import ballerina/lang.runtime as runtime;
import ballerina/log;
import ballerina/test;

Client? rabbitmqChannel = ();
Expand All @@ -29,6 +28,8 @@ const ACK_QUEUE2 = "MyAckQueue2";
const ACK_QUEUE3 = "MyAckQueue3";
const NACK_QUEUE2 = "MyNackQueue2";
const NACK_QUEUE3 = "MyNackQueue3";
const QUEUE_CONFIG_NEW = "MyDurableQueue";
const QUEUE_CONFIG_DUPLICATE = "MyDurableQueueDup";
const READONLY_MESSAGE_QUEUE = "ReadOnlyMessage";
const READONLY_MESSAGE_QUEUE_CALLER = "ReadOnlyMessageCaller";
const READONLY_REQUEST_QUEUE = "ReadOnlyRequest";
Expand Down Expand Up @@ -139,7 +140,8 @@ const CONSTRAINT_VALID_STRING_QUEUE = "ConstraintValidStringQueue";
const CONSTRAINT_VALID_NUMBER_QUEUE = "ConstraintValidNumberQueue";
const CONSTRAINT_VALID_RECORD_QUEUE = "ConstraintValidRecordQueue";
const CONSTRAINT_DISABLED_VALIDATION_QUEUE = "ConstraintDisabledValidationQueue";

string queueConfigNew = "";
string queueConfigDuplicate = "";
string asyncConsumerMessage = "";
string asyncConsumerMessage2 = "";
string onRequestMessage = "";
Expand Down Expand Up @@ -193,6 +195,7 @@ function setup() returns error? {
check clientObj->queueDeclare(DATA_BINDING_JSON_PUBLISH_QUEUE);
check clientObj->queueDeclare(DATA_BINDING_BYTES_PUBLISH_QUEUE);
check clientObj->queueDeclare(DATA_BINDING_REPLY_QUEUE);
check clientObj->queueDeclare(QUEUE_CONFIG_DUPLICATE, config = {durable: true, autoDelete: true, exclusive: false});
check setup2(clientObj);
}
Listener lis = check new (DEFAULT_HOST, DEFAULT_PORT);
Expand Down Expand Up @@ -387,7 +390,7 @@ public isolated function testDeclareQueueWithArgs() returns error? {
"x-message-ttl": 60000,
"x-expires": 800000
};
check newClient->queueDeclare(queue, { arguments: args });
check newClient->queueDeclare(queue, {arguments: args});
check newClient->close();
return;
}
Expand All @@ -408,7 +411,7 @@ public isolated function testDeclareQueueWithArgsNegative() returns error? {
"x-message-ttl": m,
"x-expires": 800000
};
error? result = newClient->queueDeclare(queue, { arguments: args });
error? result = newClient->queueDeclare(queue, {arguments: args});
if result !is error {
test:assertFail("Error when trying to consume messages using client.");
} else {
Expand Down Expand Up @@ -556,6 +559,53 @@ public function testAsyncConsumer2() returns error? {
return;
}

@test:Config {
dependsOn: [testListener, testSyncConsumer],
groups: ["rabbitmq"]
}
public function testListenerQueueDeclareNew() returns error? {
string message = "Testing Async Consumer With Durable Queue";
Listener? channelListener = rabbitmqListener;
if channelListener is Listener {
check channelListener.attach(queueConfigNewQueue);
check channelListener.'start();
check produceMessage(message, QUEUE_CONFIG_NEW);
runtime:sleep(5);
test:assertEquals(queueConfigNew, message, msg = "Message received does not match.");
}
return;
}

@test:Config {
dependsOn: [testListener, testSyncConsumer],
groups: ["rabbitmq"]
}
public function testListenerQueueDeclareDuplicate() returns error? {
string message = "Testing Async Consumer With Durable Queue Duplicate";
Listener? channelListener = rabbitmqListener;
if channelListener is Listener {
check channelListener.attach(queueConfigDuplicateQueue);
check channelListener.'start();
check produceMessage(message, QUEUE_CONFIG_DUPLICATE);
runtime:sleep(5);
test:assertEquals(queueConfigDuplicate, message, msg = "Message received does not match.");
}
return;
}

@test:Config {
dependsOn: [testListener, testSyncConsumer,testListenerQueueDeclareDuplicate],
groups: ["rabbitmq"]
}
public function testListenerQueueDeclareDuplicateError() returns error? {
Listener channelListener = check new(DEFAULT_HOST, DEFAULT_PORT);
if channelListener is Listener {
error? result = channelListener.attach(queueConfigDuplicateError);
test:assertTrue(result is error, msg = "Error expected when declaring same queue with different properties.");
}
return;
}

@test:Config {
dependsOn: [testListener],
groups: ["rabbitmq"]
Expand Down Expand Up @@ -1164,6 +1214,62 @@ service object {
}
};

Service queueConfigNewQueue =
@ServiceConfig {
queueName: QUEUE_CONFIG_NEW,
config: {
durable: true,
exclusive: true,
autoDelete: true
}
}
service object {
remote function onMessage(BytesMessage message) {
string|error messageContent = 'string:fromBytes(message.content);
if messageContent is string {
queueConfigNew = messageContent;
log:printInfo("The reply message received: " + messageContent);
} else {
log:printError("Error occurred while retrieving the message content.", 'error = messageContent);
}
}
aashikam marked this conversation as resolved.
Show resolved Hide resolved
};

Service queueConfigDuplicateQueue =
@ServiceConfig {
queueName: QUEUE_CONFIG_DUPLICATE,
config: {
durable: true,
exclusive: false,
autoDelete: true
}
}
service object {
remote function onMessage(BytesMessage message) {
string|error messageContent = 'string:fromBytes(message.content);
if messageContent is string {
queueConfigDuplicate = messageContent;
log:printInfo("The reply message received: " + messageContent);
} else {
log:printError("Error occurred while retrieving the message content.", 'error = messageContent);
}
}
};

Service queueConfigDuplicateError =
@ServiceConfig {
queueName: QUEUE_CONFIG_DUPLICATE,
config: {
durable: false,
exclusive: false,
autoDelete: false
}
}
service object {
remote function onMessage(BytesMessage message) {
}
};

function produceMessage(string message, string queueName, string? replyToQueue = ()) returns error? {
Client? clientObj = rabbitmqChannel;
if clientObj is Client {
Expand Down
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

### Fixed
- [Fix error in rabbitmq:Listener trying to re-declare the queue internally with mismatching properties](https://github.com/ballerina-platform/ballerina-library/issues/6629)

## [3.0.0] - 2024-05-06

### Changed
- [[#5069] Remove the definition and the usages of the deprecated rabbitmq:Message record](https://github.com/ballerina-platform/ballerina-library/issues/5069)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class RabbitMQConstants {
static final String RABBITMQ = "rabbitmq";

// Queue configuration constant fields
public static final BString QUEUE_CONFIG = StringUtils.fromString("config");
public static final BString QUEUE_NAME = StringUtils.fromString("queueName");
public static final BString QUEUE_DURABLE = StringUtils.fromString("durable");
public static final BString QUEUE_EXCLUSIVE = StringUtils.fromString("exclusive");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public static Object queueDeclare(Environment environment, BObject clientObj,
return null;
}

private static Object getConvertedValue(Object v) {
public static Object getConvertedValue(Object v) {
if (v instanceof BString) {
return ((BString) v).getValue();
} else if (v instanceof Float) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static io.ballerina.runtime.api.constants.RuntimeConstants.ORG_NAME_SEPARATOR;
Expand Down Expand Up @@ -88,7 +90,7 @@ public static Object init(BString host, long port, BObject listenerBObject,
}

public static Object attach(Environment environment, BObject listenerBObject, BObject service,
Object queueName) {
Object queueName) {
runtime = environment.getRuntime();
Channel channel = (Channel) listenerBObject.getNativeData(RabbitMQConstants.CHANNEL_NATIVE_OBJECT);
if (service == null) {
Expand All @@ -102,7 +104,7 @@ public static Object attach(Environment environment, BObject listenerBObject, BO
} catch (IOException e) {
RabbitMQMetricsUtil.reportError(channel, RabbitMQObservabilityConstants.ERROR_TYPE_REGISTER);
return RabbitMQUtils.returnErrorValue("I/O Error occurred while declaring the queue: " +
e.getMessage());
e.getCause().getMessage());
}
if (isStarted()) {
services =
Expand Down Expand Up @@ -156,9 +158,9 @@ public static Object detach(Environment environment, BObject listenerBObject, BO
return RabbitMQUtils.returnErrorValue("Error occurred while detaching the service");
}
listenerBObject.addNativeData(RabbitMQConstants.CONSUMER_SERVICES,
RabbitMQUtils.removeFromList(services, service));
RabbitMQUtils.removeFromList(services, service));
listenerBObject.addNativeData(RabbitMQConstants.STARTED_SERVICES,
RabbitMQUtils.removeFromList(startedServices, service));
RabbitMQUtils.removeFromList(startedServices, service));
RabbitMQMetricsUtil.reportUnsubscription(channel, service);
RabbitMQTracingUtil.traceQueueResourceInvocation(channel, queueName, environment);
return null;
Expand All @@ -167,16 +169,44 @@ public static Object detach(Environment environment, BObject listenerBObject, BO
private static void declareQueueIfNotExists(BObject service, Channel channel) throws IOException {
BMap serviceConfig = (BMap) ((AnnotatableType) TypeUtils.getType(service))
.getAnnotation(StringUtils.fromString(ModuleUtils.getModule().getOrg() + ORG_NAME_SEPARATOR
+ ModuleUtils.getModule().getName() + VERSION_SEPARATOR
+ ModuleUtils.getModule().getVersion() + ":"
+ RabbitMQConstants.SERVICE_CONFIG));
String queueName;
+ ModuleUtils.getModule().getName() + VERSION_SEPARATOR
+ ModuleUtils.getModule().getVersion() + ":"
+ RabbitMQConstants.SERVICE_CONFIG));
String queueName = "";
Map<String, Object> argumentsMap = new HashMap<>();
boolean durable = false;
boolean exclusive = false;
boolean autoDelete = true;


if (service.getNativeData(RabbitMQConstants.QUEUE_NAME.getValue()) != null) {
// if the queue name is given as the service name
queueName = (String) service.getNativeData(RabbitMQConstants.QUEUE_NAME.getValue());
} else {
}

// if serviceConfig is not null, name and configs given in the service config will replace the service name
if (serviceConfig != null) {
queueName = serviceConfig.getStringValue(RabbitMQConstants.QUEUE_NAME).getValue();

if ((BMap<BString, Object>) serviceConfig.getMapValue(RabbitMQConstants.QUEUE_CONFIG) != null) {

@SuppressWarnings(RabbitMQConstants.UNCHECKED)
BMap<BString, Object> queueConfig =
(BMap<BString, Object>) serviceConfig.getMapValue(RabbitMQConstants.QUEUE_CONFIG);
durable = queueConfig.getBooleanValue(RabbitMQConstants.QUEUE_DURABLE);
exclusive = queueConfig.getBooleanValue(RabbitMQConstants.QUEUE_EXCLUSIVE);
autoDelete = queueConfig.getBooleanValue(RabbitMQConstants.QUEUE_AUTO_DELETE);
if (queueConfig.getMapValue(RabbitMQConstants.QUEUE_ARGUMENTS) != null) {
aashikam marked this conversation as resolved.
Show resolved Hide resolved
@SuppressWarnings(RabbitMQConstants.UNCHECKED)
HashMap<BString, Object> queueArgs =
(HashMap<BString, Object>) queueConfig.getMapValue(RabbitMQConstants.QUEUE_ARGUMENTS);
queueArgs.forEach((k, v) -> argumentsMap.put(k.getValue(), ChannelUtils.getConvertedValue(v)));
}
}
}
channel.queueDeclare(queueName, false, false, true, null);

// declare queue with user given values or default set
channel.queueDeclare(queueName, durable, exclusive, autoDelete, null);
RabbitMQMetricsUtil.reportNewQueue(channel, queueName);
}

Expand Down Expand Up @@ -213,9 +243,9 @@ private static boolean getAckMode(BObject service) {
@SuppressWarnings("unchecked")
BMap<BString, Object> serviceConfig = (BMap<BString, Object>) serviceType
.getAnnotation(StringUtils.fromString(ModuleUtils.getModule().getOrg() + ORG_NAME_SEPARATOR
+ ModuleUtils.getModule().getName() + VERSION_SEPARATOR
+ ModuleUtils.getModule().getVersion() + ":"
+ RabbitMQConstants.SERVICE_CONFIG));
+ ModuleUtils.getModule().getName() + VERSION_SEPARATOR
+ ModuleUtils.getModule().getVersion() + ":"
+ RabbitMQConstants.SERVICE_CONFIG));
boolean autoAck = true;
if (serviceConfig != null && serviceConfig.containsKey(RabbitMQConstants.AUTO_ACK)) {
autoAck = serviceConfig.getBooleanValue(RabbitMQConstants.AUTO_ACK);
Expand All @@ -239,7 +269,7 @@ public static Object gracefulStop(BObject listenerBObject) {
connection.close();
} catch (IOException | TimeoutException | ShutdownSignalException exception) {
return RabbitMQUtils.returnErrorValue(RabbitMQConstants.CLOSE_CHANNEL_ERROR
+ exception.getMessage());
+ exception.getMessage());
}
}
return null;
Expand Down
Loading