Skip to content

Commit

Permalink
[mqtt][homie] fix integration tests (openhab#5915)
Browse files Browse the repository at this point in the history
Signed-off-by: Jan N. Klug <[email protected]>
Signed-off-by: Maximilian Hess <[email protected]>
  • Loading branch information
J-N-K authored and ne0h committed Sep 15, 2019
1 parent 6ddf0db commit d595b21
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 34 deletions.
10 changes: 6 additions & 4 deletions itests/org.openhab.binding.mqtt.homie.tests/itest.bndrun
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ Fragment-Host: org.openhab.binding.mqtt.homie
io.netty.handler;version='[4.1.34,4.1.35)',\
io.netty.resolver;version='[4.1.34,4.1.35)',\
io.netty.transport;version='[4.1.34,4.1.35)',\
org.apache.servicemix.bundles.commons-codec;version='[1.3.0,1.3.1)',\
org.openhab.io.mqttembeddedbroker;version='[2.5.0,2.5.1)',\
io.moquette.moquette-broker;version='[0.12.1,0.12.2)',\
tec.uom.lib.uom-lib-common;version='[1.0.3,1.0.4)',\
tec.uom.se;version='[1.0.10,1.0.11)',\
ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
org.apache.servicemix.bundles.jaxb-impl;version='[2.2.11,2.2.12)'
org.apache.servicemix.bundles.jaxb-impl;version='[2.2.11,2.2.12)',\
jaxb-api;version='[2.2.11,2.2.12)',\
slf4j.simple;version='[1.7.21,1.7.22)',\
moquette-broker;version='[0.13.0,0.13.1)',\
org.apache.commons.codec;version='[1.10.0,1.10.1)'
-runvm: -Dio.netty.noUnsafe=true
6 changes: 3 additions & 3 deletions itests/org.openhab.binding.mqtt.homie.tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.openhab.osgiify</groupId>
<artifactId>io.moquette.moquette-broker</artifactId>
<version>0.12.1</version>
<groupId>com.github.j-n-k</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.13.0.OH2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -42,6 +44,9 @@
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.openhab.binding.mqtt.generic.ChannelState;
import org.openhab.binding.mqtt.generic.tools.ChildMap;
import org.openhab.binding.mqtt.generic.tools.WaitForTopicValue;
import org.openhab.binding.mqtt.homie.internal.handler.HomieThingHandler;
import org.openhab.binding.mqtt.homie.internal.homie300.Device;
import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes;
Expand All @@ -53,12 +58,6 @@
import org.openhab.binding.mqtt.homie.internal.homie300.PropertyAttributes;
import org.openhab.binding.mqtt.homie.internal.homie300.PropertyAttributes.DataTypeEnum;
import org.openhab.binding.mqtt.homie.internal.homie300.PropertyHelper;
import org.openhab.binding.mqtt.generic.ChannelState;
import org.openhab.binding.mqtt.generic.tools.ChildMap;
import org.openhab.binding.mqtt.generic.tools.WaitForTopicValue;
import org.osgi.service.cm.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A full implementation test, that starts the embedded MQTT broker and publishes a homie device tree.
Expand Down Expand Up @@ -89,7 +88,8 @@ public class HomieImplementationTest extends JavaOSGiTest {
* Create an observer that fails the test as soon as the broker client connection changes its connection state
* to something else then CONNECTED.
*/
private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state, is(MqttConnectionState.CONNECTED));
private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
is(MqttConnectionState.CONNECTED));

private String propertyTestTopic;

Expand All @@ -99,20 +99,19 @@ public void setUp() throws InterruptedException, ExecutionException, TimeoutExce
initMocks(this);
mqttService = getService(MqttService.class);

// Wait for the EmbeddedBrokerService internal connection to be connected
embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
embeddedConnection.setQos(1);
embeddedConnection.setRetain(true);

connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
embeddedConnection.isSecure(), "homie");
connection.setQos(1);
connection.start().get(200, TimeUnit.MILLISECONDS);
connection.setPersistencePath(Paths.get("subconn"));
connection.start().get(500, TimeUnit.MILLISECONDS);
assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
// If the connection state changes in between -> fail
connection.addConnectionObserver(failIfChange);

embeddedConnection.setRetain(true);
embeddedConnection.setQos(1);

List<CompletableFuture<Boolean>> futures = new ArrayList<>();
futures.add(embeddedConnection.publish(DEVICE_TOPIC + "/$homie", "3.0".getBytes()));
futures.add(embeddedConnection.publish(DEVICE_TOPIC + "/$name", "Name".getBytes()));
Expand All @@ -131,7 +130,7 @@ public void setUp() throws InterruptedException, ExecutionException, TimeoutExce
futures.add(embeddedConnection.publish(property, "10".getBytes()));
futures.add(embeddedConnection.publish(property + "/$name", "Testprop".getBytes()));
futures.add(embeddedConnection.publish(property + "/$settable", "true".getBytes()));
futures.add(embeddedConnection.publish(property + "/$unit", "°C".getBytes()));
futures.add(embeddedConnection.publish(property + "/$unit", "°C".getBytes(StandardCharsets.UTF_8)));
futures.add(embeddedConnection.publish(property + "/$datatype", "float".getBytes()));
futures.add(embeddedConnection.publish(property + "/$format", "-100:100".getBytes()));

Expand All @@ -148,9 +147,9 @@ public void setUp() throws InterruptedException, ExecutionException, TimeoutExce
futures.add(embeddedConnection.publish(propertyTestTopic + "/$datatype", "boolean".getBytes()));

registeredTopics = futures.size();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(200, TimeUnit.MILLISECONDS);
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);

scheduler = new ScheduledThreadPoolExecutor(4);
scheduler = new ScheduledThreadPoolExecutor(6);
}

@After
Expand All @@ -164,16 +163,18 @@ public void tearDown() throws InterruptedException, ExecutionException, TimeoutE

@Test
public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException {
CountDownLatch c = new CountDownLatch(registeredTopics);
connection.subscribe(DEVICE_TOPIC + "/#", (topic, payload) -> c.countDown()).get(200, TimeUnit.MILLISECONDS);
assertTrue("Connection " + connection.getClientId() + " not retrieving all topics",
c.await(1000, TimeUnit.MILLISECONDS));
// four topics are not under /testnode !
CountDownLatch c = new CountDownLatch(registeredTopics - 4);
connection.subscribe(DEVICE_TOPIC + "/testnode/#", (topic, payload) -> c.countDown()).get(5000,
TimeUnit.MILLISECONDS);
assertTrue("Connection " + connection.getClientId() + " not retrieving all topics ",
c.await(5000, TimeUnit.MILLISECONDS));
}

@Test
public void retrieveOneAttribute() throws InterruptedException, ExecutionException {
WaitForTopicValue watcher = new WaitForTopicValue(connection, DEVICE_TOPIC + "/$homie");
assertThat(watcher.waitForTopicValue(100), is("3.0"));
assertThat(watcher.waitForTopicValue(1000), is("3.0"));
}

@SuppressWarnings("null")
Expand All @@ -189,23 +190,23 @@ public void retrieveAttributes() throws InterruptedException, ExecutionException
// Create a scheduler
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(4);

property.subscribe(connection, scheduler, 100).get();
property.subscribe(connection, scheduler, 500).get();

assertThat(property.attributes.settable, is(true));
assertThat(property.attributes.retained, is(true));
assertThat(property.attributes.name, is("Testprop"));
assertThat(property.attributes.unit, is("°C"));
assertThat(property.attributes.datatype, is(DataTypeEnum.float_));
assertThat(property.attributes.format, is("-100:100"));
verify(property).attributesReceived();
waitForAssert(()->assertThat(property.attributes.format, is("-100:100")));
verify(property, timeout(500).atLeastOnce()).attributesReceived();

// Receive property value
ChannelState channelState = spy(property.getChannelState());
PropertyHelper.setChannelState(property, channelState);

property.startChannel(connection, scheduler, 200).get();
property.startChannel(connection, scheduler, 500).get();
verify(channelState).start(any(), any(), anyInt());
verify(channelState).processMessage(any(), any());
verify(channelState, timeout(500)).processMessage(any(), any());
verify(callback).updateChannelState(any(), any());

assertThat(property.getChannelState().getCache().getChannelState(), is(new DecimalType(10)));
Expand Down Expand Up @@ -246,7 +247,7 @@ public void parseHomieTree() throws InterruptedException, ExecutionException, Ti

// initialize the device, subscribe and wait.
device.initialize(BASE_TOPIC, DEVICE_ID, Collections.emptyList());
device.subscribe(connection, scheduler, 200).get();
device.subscribe(connection, scheduler, 1500).get();

assertThat(device.isInitialized(), is(true));

Expand Down Expand Up @@ -304,7 +305,7 @@ public void parseHomieTree() throws InterruptedException, ExecutionException, Ti
WaitForTopicValue watcher = new WaitForTopicValue(embeddedConnection, propertyTestTopic + "/set");
// Watch the topic. Publish a retain=false value to MQTT
property.getChannelState().publishValue(OnOffType.OFF).get();
assertThat(watcher.waitForTopicValue(50), is("false"));
assertThat(watcher.waitForTopicValue(1000), is("false"));

// Publish a retain=false value to MQTT.
property.getChannelState().publishValue(OnOffType.ON).get();
Expand Down

0 comments on commit d595b21

Please sign in to comment.