Skip to content
This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Commit

Permalink
Retain last will message
Browse files Browse the repository at this point in the history
Fixes #74
  • Loading branch information
acogoluegnes committed Dec 5, 2016
1 parent 4a1192e commit 7fafe26
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 5 deletions.
9 changes: 7 additions & 2 deletions src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,13 @@ ensure_queue(Qos, #proc_state{ channels = {Channel, _},
{Q, PState}
end.

send_will(PState = #proc_state{ will_msg = WillMsg }) ->
amqp_pub(WillMsg, PState).
send_will(PState = #proc_state{ will_msg = WillMsg, retainer_pid = RPid }) ->
amqp_pub(WillMsg, PState),
#mqtt_msg{retain = Retain, topic = Topic} = WillMsg,
case Retain of
false -> ok;
true -> hand_off_to_retainer(RPid, Topic, WillMsg)
end.

amqp_pub(undefined, PState) ->
PState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,19 @@ public class MqttTest implements MqttCallback {
private Channel ch;

private static int getPort() {
Object port = System.getProperty("mqtt.port");
Object port = System.getProperty("mqtt.port", "1883");
Assert.assertNotNull(port);
return Integer.parseInt(port.toString());
}

private static int getAmqpPort() {
Object port = System.getProperty("amqp.port");
Object port = System.getProperty("amqp.port", "5672");
assertNotNull(port);
return Integer.parseInt(port.toString());
}

private static String getHost() {
Object host = System.getProperty("hostname");
Object host = System.getProperty("hostname", "localhost");
assertNotNull(host);
return host.toString();
}
Expand Down Expand Up @@ -566,6 +566,59 @@ public Socket createSocket() throws IOException {
client2.disconnect();
}

@Test public void willIsRetained() throws MqttException, InterruptedException, IOException {
conOpt.setCleanSession(false);
client2.connect(conOpt);
client2.setCallback(this);
clearRetained(client2, retainedTopic);
client2.subscribe(retainedTopic, 1);
client2.disconnect();

final SocketFactory factory = SocketFactory.getDefault();
final ArrayList<Socket> sockets = new ArrayList<Socket>();
SocketFactory testFactory = new SocketFactory() {
public Socket createSocket(String s, int i) throws IOException {
Socket sock = factory.createSocket(s, i);
sockets.add(sock);
return sock;
}
public Socket createSocket(String s, int i, InetAddress a, int i1) throws IOException {
return null;
}
public Socket createSocket(InetAddress a, int i) throws IOException {
return null;
}
public Socket createSocket(InetAddress a, int i, InetAddress a1, int i1) throws IOException {
return null;
}
@Override
public Socket createSocket() throws IOException {
Socket sock = new Socket();
sockets.add(sock);
return sock;
}
};
conOpt.setSocketFactory(testFactory);
MqttTopic willTopic = client.getTopic(retainedTopic);
byte[] willPayload = "willpayload".getBytes();
conOpt.setWill(willTopic, willPayload, 1, true);
conOpt.setCleanSession(true);
client.connect(conOpt);

Assert.assertEquals(1, sockets.size());
sockets.get(0).close();
Thread.sleep(testDelay);

conOpt.setCleanSession(false);
client2.connect(conOpt);
client2.setCallback(this);
client2.subscribe(retainedTopic, 1);

Assert.assertEquals(1, receivedMessages.size());
Assert.assertEquals(true, Arrays.equals(receivedMessages.get(0).getPayload(), willPayload));
client2.disconnect();
}

@Test public void subscribeMultiple() throws MqttException {
client.connect(conOpt);
publish(client, "/topic/1", 1, "msq1-qos1".getBytes());
Expand Down

0 comments on commit 7fafe26

Please sign in to comment.