Skip to content

Commit

Permalink
Merge pull request #262 from ballerina-platform/fix-issues
Browse files Browse the repository at this point in the history
Allow `onComplete` method for MQTT service declaration
  • Loading branch information
TharmiganK authored Jan 29, 2025
2 parents 29d451a + 0774923 commit 00d4d36
Show file tree
Hide file tree
Showing 15 changed files with 317 additions and 22 deletions.
2 changes: 1 addition & 1 deletion ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

[ballerina]
dependencies-toml-version = "2"
distribution-version = "2201.11.0-20241121-075100-c4c87cbc"
distribution-version = "2201.11.0-20250127-101700-a4b67fe5"

[[package]]
org = "ballerina"
Expand Down
35 changes: 17 additions & 18 deletions ballerina/tests/publish_subscribe_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -187,31 +187,30 @@ function publishSubscribeWithMTLSTrustKeyStoresTest() returns error? {
test:assertTrue(receivedMessages.indexOf(message) != ());
}

listener Listener manualAcksListener = new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), "mqtt/manualackstopic", {connectionConfig: mtlsConnConfig, manualAcks: true});

service on manualAcksListener {
remote function onMessage(Message message, Caller caller) returns error? {
log:printInfo(check string:fromBytes(message.payload));
receivedMessages.push(check string:fromBytes(message.payload));
check caller->complete();
}
remote function onError(Error err) returns error? {
log:printError("Error occured ", err);
}
remote function onComplete(DeliveryToken token) returns error? {
log:printInfo("Message delivered " + token.messageId.toString());
}
};

@test:Config {enable: true}
function subscribeWithManualAcks() returns error? {
Listener 'listener = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), "mqtt/manualackstopic", {connectionConfig: mtlsConnConfig, manualAcks: true});
Service manualAcksService = service object {
remote function onMessage(Message message, Caller caller) returns error? {
log:printInfo(check string:fromBytes(message.payload));
receivedMessages.push(check string:fromBytes(message.payload));
check caller->complete();
}
remote function onError(Error err) returns error? {
log:printError("Error occured ", err);
}
remote function onComplete(DeliveryToken token) returns error? {
log:printInfo("Message delivered " + token.messageId.toString());
}
};
check 'listener.attach(manualAcksService);
check 'listener.'start();

Client 'client = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: mtlsConnConfig});
string message = "Test message for manual acks";
_ = check 'client->publish("mqtt/manualackstopic", {payload: message.toBytes()});
runtime:sleep(1);

addListenerAndClientToArray('listener, 'client);
addListenerAndClientToArray(manualAcksListener, 'client);

test:assertTrue(receivedMessages.indexOf(message) != ());
}
Expand Down
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ This file contains all the notable changes done to the Ballerina MQTT package th

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Fixed

- [Allow onComplete method in the service declaration](https://github.com/ballerina-platform/ballerina-library/issues/7272)

## [1.1.1] - 2024-07-17

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@
import static io.ballerina.stdlib.mqtt.compiler.CompilerPluginTestUtils.getEnvironmentBuilder;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.FUNCTION_SHOULD_BE_REMOTE;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.INVALID_CALLER_PARAMETER;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.INVALID_DELIVERY_TOKEN_PARAM_COUNT;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.INVALID_MESSAGE_PARAMETER;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.INVALID_MULTIPLE_LISTENERS;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.INVALID_REMOTE_FUNCTION;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.INVALID_RESOURCE_FUNCTION;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.INVALID_RETURN_TYPE_ERROR_OR_NIL;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.MUST_HAVE_CALLER_AND_MESSAGE;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.MUST_HAVE_DELIVERY_TOKEN;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.MUST_HAVE_ERROR;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.NO_ON_MESSAGE;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.ONLY_DELIVERY_TOKEN_ALLOWED;
import static io.ballerina.stdlib.mqtt.compiler.PluginConstants.CompilationErrors.ONLY_ERROR_ALLOWED;

/**
Expand Down Expand Up @@ -116,6 +119,14 @@ public void testValidService7() {
Assert.assertEquals(diagnosticResult.errors().size(), 0);
}

@Test(description = "Validate service with onMessage remote function")
public void testValidService8() {
Package currentPackage = loadPackage("valid_service_8");
PackageCompilation compilation = currentPackage.getCompilation();
DiagnosticResult diagnosticResult = compilation.diagnosticResult();
Assert.assertEquals(diagnosticResult.errors().size(), 0);
}

@Test(enabled = true, description = "Service validating return types")
public void testInvalidService1() {
Package currentPackage = loadPackage("invalid_service_1");
Expand Down Expand Up @@ -273,4 +284,34 @@ public void testInvalidService14() {
assertDiagnostic(diagnostic, INVALID_RESOURCE_FUNCTION);
}
}

@Test(description = "Validate parameter in onComplete")
public void testInvalidService15() {
Package currentPackage = loadPackage("invalid_service_15");
PackageCompilation compilation = currentPackage.getCompilation();
DiagnosticResult diagnosticResult = compilation.diagnosticResult();
Assert.assertEquals(diagnosticResult.errors().size(), 1);
Diagnostic diagnostic = (Diagnostic) diagnosticResult.errors().toArray()[0];
assertDiagnostic(diagnostic, ONLY_DELIVERY_TOKEN_ALLOWED);
}

@Test(description = "Validate parameter count in onComplete")
public void testInvalidService16() {
Package currentPackage = loadPackage("invalid_service_16");
PackageCompilation compilation = currentPackage.getCompilation();
DiagnosticResult diagnosticResult = compilation.diagnosticResult();
Assert.assertEquals(diagnosticResult.errors().size(), 1);
Diagnostic diagnostic = (Diagnostic) diagnosticResult.errors().toArray()[0];
assertDiagnostic(diagnostic, INVALID_DELIVERY_TOKEN_PARAM_COUNT);
}

@Test(description = "Validate no delivery token parameter in onComplete")
public void testInvalidService17() {
Package currentPackage = loadPackage("invalid_service_17");
PackageCompilation compilation = currentPackage.getCompilation();
DiagnosticResult diagnosticResult = compilation.diagnosticResult();
Assert.assertEquals(diagnosticResult.errors().size(), 1);
Diagnostic diagnostic = (Diagnostic) diagnosticResult.errors().toArray()[0];
assertDiagnostic(diagnostic, MUST_HAVE_DELIVERY_TOKEN);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[package]
org = "kafka_test"
name = "invalid_service_15"
version = "0.1.0"
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2025 WSO2 LLC. (https://www.wso2.com).
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/mqtt;
import ballerina/uuid;

mqtt:ListenerConfiguration listenerConfiguration = {
connectionConfig: {
username: "ballerina",
password: "ballerinamqtt"
},
manualAcks: false
};

listener mqtt:Listener mqttSubscriber = check new (mqtt:DEFAULT_URL, uuid:createType1AsString(), "mqtt/test", listenerConfiguration);

service on mqttSubscriber {

private final string var1 = "Mqtt Service";
private final int var2 = 54;

remote function onMessage(mqtt:Message message) returns mqtt:Error? {
}

remote function onComplete(record{} token) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[package]
org = "kafka_test"
name = "invalid_service_16"
version = "0.1.0"
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2025 WSO2 LLC. (https://www.wso2.com).
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/mqtt;
import ballerina/uuid;

mqtt:ListenerConfiguration listenerConfiguration = {
connectionConfig: {
username: "ballerina",
password: "ballerinamqtt"
},
manualAcks: false
};

listener mqtt:Listener mqttSubscriber = check new (mqtt:DEFAULT_URL, uuid:createType1AsString(), "mqtt/test", listenerConfiguration);

service on mqttSubscriber {

private final string var1 = "Mqtt Service";
private final int var2 = 54;

remote function onMessage(mqtt:Message message) returns mqtt:Error? {
}

remote function onComplete(mqtt:DeliveryToken token, record{} data) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[package]
org = "kafka_test"
name = "invalid_service_17"
version = "0.1.0"
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2025 WSO2 LLC. (https://www.wso2.com).
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/mqtt;
import ballerina/uuid;

mqtt:ListenerConfiguration listenerConfiguration = {
connectionConfig: {
username: "ballerina",
password: "ballerinamqtt"
},
manualAcks: false
};

listener mqtt:Listener mqttSubscriber = check new (mqtt:DEFAULT_URL, uuid:createType1AsString(), "mqtt/test", listenerConfiguration);

service on mqttSubscriber {

private final string var1 = "Mqtt Service";
private final int var2 = 54;

remote function onMessage(mqtt:Message message) returns mqtt:Error? {
}

remote function onComplete() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[package]
org = "kafka_test"
name = "valid_service_08"
version = "0.1.0"
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2025 WSO2 LLC. (https://www.wso2.com).
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/mqtt;
import ballerina/uuid;

mqtt:ListenerConfiguration listenerConfiguration = {
connectionConfig: {
username: "ballerina",
password: "ballerinamqtt"
},
manualAcks: false
};

listener mqtt:Listener mqttSubscriber = check new (mqtt:DEFAULT_URL, uuid:createType1AsString(), "mqtt/test", listenerConfiguration);

@display {
label: "mqttService1"
}
service on mqttSubscriber {
remote function onMessage(mqtt:Message message) returns mqtt:Error? {
}

remote function onComplete(mqtt:DeliveryToken token) returns mqtt:Error? {
}
}

@display {
label: "mqttService2"
}
service on mqttSubscriber {
remote function onMessage(mqtt:Message message) returns error? {
}

remote function onComplete(mqtt:DeliveryToken token) returns error? {
}
}

@display {
label: "mqttService3"
}
service on mqttSubscriber {
remote function onMessage(mqtt:Message message) {
}

remote function onComplete(mqtt:DeliveryToken token) {
}
}
Loading

0 comments on commit 00d4d36

Please sign in to comment.