Skip to content

Commit

Permalink
Merge pull request #942 from aashikam/queueissue
Browse files Browse the repository at this point in the history
[RabbitMQ] Add queue config to service config
  • Loading branch information
aashikam authored Jun 19, 2024
2 parents 1beb3e5 + e113656 commit 7bdd8fe
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 26 deletions.
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "rabbitmq"
version = "3.0.0"
version = "3.0.1"
authors = ["Ballerina"]
keywords = ["service", "client", "messaging", "network", "pubsub"]
repository = "https://github.com/ballerina-platform/module-ballerinax-rabbitmq"
Expand All @@ -18,8 +18,8 @@ path = "./lib/amqp-client-5.18.0.jar"
[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "rabbitmq-native"
version = "3.0.0"
path = "../native/build/libs/rabbitmq-native-3.0.0.jar"
version = "3.0.1"
path = "../native/build/libs/rabbitmq-native-3.0.1-SNAPSHOT.jar"

[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
Expand Down
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ id = "rabbitmq-compiler-plugin"
class = "io.ballerina.stdlib.rabbitmq.plugin.RabbitmqCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/rabbitmq-compiler-plugin-3.0.0.jar"
path = "../compiler-plugin/build/libs/rabbitmq-compiler-plugin-3.0.1-SNAPSHOT.jar"
4 changes: 2 additions & 2 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ modules = [
[[package]]
org = "ballerina"
name = "crypto"
version = "2.7.0"
version = "2.7.2"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "time"}
Expand Down Expand Up @@ -387,7 +387,7 @@ modules = [
[[package]]
org = "ballerinax"
name = "rabbitmq"
version = "3.0.0"
version = "3.0.1"
dependencies = [
{org = "ballerina", name = "constraint"},
{org = "ballerina", name = "crypto"},
Expand Down
2 changes: 2 additions & 0 deletions ballerina/listener.bal
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,11 @@ public isolated class Listener {
# Configurations required to create a subscription.
#
# + queueName - The name of the queue to be subscribed
# + config - The configurations required to declare a queue
# + autoAck - If false, should manually acknowledge
public type RabbitMQServiceConfig record {|
string queueName;
QueueConfig config?;
boolean autoAck = true;
|};

Expand Down
116 changes: 111 additions & 5 deletions ballerina/tests/rabbitmq_client_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/lang.'string;
import ballerina/log;
import ballerina/lang.runtime as runtime;
import ballerina/log;
import ballerina/test;

Client? rabbitmqChannel = ();
Expand All @@ -29,6 +28,8 @@ const ACK_QUEUE2 = "MyAckQueue2";
const ACK_QUEUE3 = "MyAckQueue3";
const NACK_QUEUE2 = "MyNackQueue2";
const NACK_QUEUE3 = "MyNackQueue3";
const QUEUE_CONFIG_NEW = "MyDurableQueue";
const QUEUE_CONFIG_DUPLICATE = "MyDurableQueueDup";
const READONLY_MESSAGE_QUEUE = "ReadOnlyMessage";
const READONLY_MESSAGE_QUEUE_CALLER = "ReadOnlyMessageCaller";
const READONLY_REQUEST_QUEUE = "ReadOnlyRequest";
Expand Down Expand Up @@ -139,7 +140,8 @@ const CONSTRAINT_VALID_STRING_QUEUE = "ConstraintValidStringQueue";
const CONSTRAINT_VALID_NUMBER_QUEUE = "ConstraintValidNumberQueue";
const CONSTRAINT_VALID_RECORD_QUEUE = "ConstraintValidRecordQueue";
const CONSTRAINT_DISABLED_VALIDATION_QUEUE = "ConstraintDisabledValidationQueue";

string queueConfigNew = "";
string queueConfigDuplicate = "";
string asyncConsumerMessage = "";
string asyncConsumerMessage2 = "";
string onRequestMessage = "";
Expand Down Expand Up @@ -193,6 +195,7 @@ function setup() returns error? {
check clientObj->queueDeclare(DATA_BINDING_JSON_PUBLISH_QUEUE);
check clientObj->queueDeclare(DATA_BINDING_BYTES_PUBLISH_QUEUE);
check clientObj->queueDeclare(DATA_BINDING_REPLY_QUEUE);
check clientObj->queueDeclare(QUEUE_CONFIG_DUPLICATE, config = {durable: true, autoDelete: true, exclusive: false});
check setup2(clientObj);
}
Listener lis = check new (DEFAULT_HOST, DEFAULT_PORT);
Expand Down Expand Up @@ -387,7 +390,7 @@ public isolated function testDeclareQueueWithArgs() returns error? {
"x-message-ttl": 60000,
"x-expires": 800000
};
check newClient->queueDeclare(queue, { arguments: args });
check newClient->queueDeclare(queue, {arguments: args});
check newClient->close();
return;
}
Expand All @@ -408,7 +411,7 @@ public isolated function testDeclareQueueWithArgsNegative() returns error? {
"x-message-ttl": m,
"x-expires": 800000
};
error? result = newClient->queueDeclare(queue, { arguments: args });
error? result = newClient->queueDeclare(queue, {arguments: args});
if result !is error {
test:assertFail("Error when trying to consume messages using client.");
} else {
Expand Down Expand Up @@ -556,6 +559,53 @@ public function testAsyncConsumer2() returns error? {
return;
}

@test:Config {
dependsOn: [testListener, testSyncConsumer],
groups: ["rabbitmq"]
}
public function testListenerQueueDeclareNew() returns error? {
string message = "Testing Async Consumer With Durable Queue";
Listener? channelListener = rabbitmqListener;
if channelListener is Listener {
check channelListener.attach(queueConfigNewQueue);
check channelListener.'start();
check produceMessage(message, QUEUE_CONFIG_NEW);
runtime:sleep(5);
test:assertEquals(queueConfigNew, message, msg = "Message received does not match.");
}
return;
}

@test:Config {
dependsOn: [testListener, testSyncConsumer],
groups: ["rabbitmq"]
}
public function testListenerQueueDeclareDuplicate() returns error? {
string message = "Testing Async Consumer With Durable Queue Duplicate";
Listener? channelListener = rabbitmqListener;
if channelListener is Listener {
check channelListener.attach(queueConfigDuplicateQueue);
check channelListener.'start();
check produceMessage(message, QUEUE_CONFIG_DUPLICATE);
runtime:sleep(5);
test:assertEquals(queueConfigDuplicate, message, msg = "Message received does not match.");
}
return;
}

@test:Config {
dependsOn: [testListener, testSyncConsumer,testListenerQueueDeclareDuplicate],
groups: ["rabbitmq"]
}
public function testListenerQueueDeclareDuplicateError() returns error? {
Listener channelListener = check new(DEFAULT_HOST, DEFAULT_PORT);
if channelListener is Listener {
error? result = channelListener.attach(queueConfigDuplicateError);
test:assertTrue(result is error, msg = "Error expected when declaring same queue with different properties.");
}
return;
}

@test:Config {
dependsOn: [testListener],
groups: ["rabbitmq"]
Expand Down Expand Up @@ -1164,6 +1214,62 @@ service object {
}
};

Service queueConfigNewQueue =
@ServiceConfig {
queueName: QUEUE_CONFIG_NEW,
config: {
durable: true,
exclusive: true,
autoDelete: true
}
}
service object {
remote function onMessage(BytesMessage message) {
string|error messageContent = 'string:fromBytes(message.content);
if messageContent is string {
queueConfigNew = messageContent;
log:printInfo("The reply message received: " + messageContent);
} else {
log:printError("Error occurred while retrieving the message content.", 'error = messageContent);
}
}
};

Service queueConfigDuplicateQueue =
@ServiceConfig {
queueName: QUEUE_CONFIG_DUPLICATE,
config: {
durable: true,
exclusive: false,
autoDelete: true
}
}
service object {
remote function onMessage(BytesMessage message) {
string|error messageContent = 'string:fromBytes(message.content);
if messageContent is string {
queueConfigDuplicate = messageContent;
log:printInfo("The reply message received: " + messageContent);
} else {
log:printError("Error occurred while retrieving the message content.", 'error = messageContent);
}
}
};

Service queueConfigDuplicateError =
@ServiceConfig {
queueName: QUEUE_CONFIG_DUPLICATE,
config: {
durable: false,
exclusive: false,
autoDelete: false
}
}
service object {
remote function onMessage(BytesMessage message) {
}
};

function produceMessage(string message, string queueName, string? replyToQueue = ()) returns error? {
Client? clientObj = rabbitmqChannel;
if clientObj is Client {
Expand Down
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

### Fixed
- [Fix error in rabbitmq:Listener trying to re-declare the queue internally with mismatching properties](https://github.com/ballerina-platform/ballerina-library/issues/6629)

## [3.0.0] - 2024-05-06

### Changed
- [[#5069] Remove the definition and the usages of the deprecated rabbitmq:Message record](https://github.com/ballerina-platform/ballerina-library/issues/5069)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class RabbitMQConstants {
static final String RABBITMQ = "rabbitmq";

// Queue configuration constant fields
public static final BString QUEUE_CONFIG = StringUtils.fromString("config");
public static final BString QUEUE_NAME = StringUtils.fromString("queueName");
public static final BString QUEUE_DURABLE = StringUtils.fromString("durable");
public static final BString QUEUE_EXCLUSIVE = StringUtils.fromString("exclusive");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public static Object queueDeclare(Environment environment, BObject clientObj,
return null;
}

private static Object getConvertedValue(Object v) {
public static Object getConvertedValue(Object v) {
if (v instanceof BString) {
return ((BString) v).getValue();
} else if (v instanceof Float) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static io.ballerina.runtime.api.constants.RuntimeConstants.ORG_NAME_SEPARATOR;
Expand Down Expand Up @@ -88,7 +90,7 @@ public static Object init(BString host, long port, BObject listenerBObject,
}

public static Object attach(Environment environment, BObject listenerBObject, BObject service,
Object queueName) {
Object queueName) {
runtime = environment.getRuntime();
Channel channel = (Channel) listenerBObject.getNativeData(RabbitMQConstants.CHANNEL_NATIVE_OBJECT);
if (service == null) {
Expand All @@ -102,7 +104,7 @@ public static Object attach(Environment environment, BObject listenerBObject, BO
} catch (IOException e) {
RabbitMQMetricsUtil.reportError(channel, RabbitMQObservabilityConstants.ERROR_TYPE_REGISTER);
return RabbitMQUtils.returnErrorValue("I/O Error occurred while declaring the queue: " +
e.getMessage());
e.getCause().getMessage());
}
if (isStarted()) {
services =
Expand Down Expand Up @@ -156,9 +158,9 @@ public static Object detach(Environment environment, BObject listenerBObject, BO
return RabbitMQUtils.returnErrorValue("Error occurred while detaching the service");
}
listenerBObject.addNativeData(RabbitMQConstants.CONSUMER_SERVICES,
RabbitMQUtils.removeFromList(services, service));
RabbitMQUtils.removeFromList(services, service));
listenerBObject.addNativeData(RabbitMQConstants.STARTED_SERVICES,
RabbitMQUtils.removeFromList(startedServices, service));
RabbitMQUtils.removeFromList(startedServices, service));
RabbitMQMetricsUtil.reportUnsubscription(channel, service);
RabbitMQTracingUtil.traceQueueResourceInvocation(channel, queueName, environment);
return null;
Expand All @@ -167,16 +169,44 @@ public static Object detach(Environment environment, BObject listenerBObject, BO
private static void declareQueueIfNotExists(BObject service, Channel channel) throws IOException {
BMap serviceConfig = (BMap) ((AnnotatableType) TypeUtils.getType(service))
.getAnnotation(StringUtils.fromString(ModuleUtils.getModule().getOrg() + ORG_NAME_SEPARATOR
+ ModuleUtils.getModule().getName() + VERSION_SEPARATOR
+ ModuleUtils.getModule().getVersion() + ":"
+ RabbitMQConstants.SERVICE_CONFIG));
String queueName;
+ ModuleUtils.getModule().getName() + VERSION_SEPARATOR
+ ModuleUtils.getModule().getVersion() + ":"
+ RabbitMQConstants.SERVICE_CONFIG));
String queueName = "";
Map<String, Object> argumentsMap = new HashMap<>();
boolean durable = false;
boolean exclusive = false;
boolean autoDelete = true;


if (service.getNativeData(RabbitMQConstants.QUEUE_NAME.getValue()) != null) {
// if the queue name is given as the service name
queueName = (String) service.getNativeData(RabbitMQConstants.QUEUE_NAME.getValue());
} else {
}

// if serviceConfig is not null, name and configs given in the service config will replace the service name
if (serviceConfig != null) {
queueName = serviceConfig.getStringValue(RabbitMQConstants.QUEUE_NAME).getValue();

if ((BMap<BString, Object>) serviceConfig.getMapValue(RabbitMQConstants.QUEUE_CONFIG) != null) {

@SuppressWarnings(RabbitMQConstants.UNCHECKED)
BMap<BString, Object> queueConfig =
(BMap<BString, Object>) serviceConfig.getMapValue(RabbitMQConstants.QUEUE_CONFIG);
durable = queueConfig.getBooleanValue(RabbitMQConstants.QUEUE_DURABLE);
exclusive = queueConfig.getBooleanValue(RabbitMQConstants.QUEUE_EXCLUSIVE);
autoDelete = queueConfig.getBooleanValue(RabbitMQConstants.QUEUE_AUTO_DELETE);
if (queueConfig.getMapValue(RabbitMQConstants.QUEUE_ARGUMENTS) != null) {
@SuppressWarnings(RabbitMQConstants.UNCHECKED)
HashMap<BString, Object> queueArgs =
(HashMap<BString, Object>) queueConfig.getMapValue(RabbitMQConstants.QUEUE_ARGUMENTS);
queueArgs.forEach((k, v) -> argumentsMap.put(k.getValue(), ChannelUtils.getConvertedValue(v)));
}
}
}
channel.queueDeclare(queueName, false, false, true, null);

// declare queue with user given values or default set
channel.queueDeclare(queueName, durable, exclusive, autoDelete, null);
RabbitMQMetricsUtil.reportNewQueue(channel, queueName);
}

Expand Down Expand Up @@ -213,9 +243,9 @@ private static boolean getAckMode(BObject service) {
@SuppressWarnings("unchecked")
BMap<BString, Object> serviceConfig = (BMap<BString, Object>) serviceType
.getAnnotation(StringUtils.fromString(ModuleUtils.getModule().getOrg() + ORG_NAME_SEPARATOR
+ ModuleUtils.getModule().getName() + VERSION_SEPARATOR
+ ModuleUtils.getModule().getVersion() + ":"
+ RabbitMQConstants.SERVICE_CONFIG));
+ ModuleUtils.getModule().getName() + VERSION_SEPARATOR
+ ModuleUtils.getModule().getVersion() + ":"
+ RabbitMQConstants.SERVICE_CONFIG));
boolean autoAck = true;
if (serviceConfig != null && serviceConfig.containsKey(RabbitMQConstants.AUTO_ACK)) {
autoAck = serviceConfig.getBooleanValue(RabbitMQConstants.AUTO_ACK);
Expand All @@ -239,7 +269,7 @@ public static Object gracefulStop(BObject listenerBObject) {
connection.close();
} catch (IOException | TimeoutException | ShutdownSignalException exception) {
return RabbitMQUtils.returnErrorValue(RabbitMQConstants.CLOSE_CHANNEL_ERROR
+ exception.getMessage());
+ exception.getMessage());
}
}
return null;
Expand Down

0 comments on commit 7bdd8fe

Please sign in to comment.