Skip to content

Commit 37acfe7

Browse files
committed
Handle subscriptions in connect callback
1 parent 79084c8 commit 37acfe7

File tree

3 files changed

+76
-75
lines changed

3 files changed

+76
-75
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ anyhow = "1.0.57"
88
clap = { version = "3.1.18", features = ["derive"] }
99
ctrlc = "3.2"
1010
env_logger = "0.9.0"
11+
futures = "0.3.21"
1112
humantime-serde = "1.1.1"
1213
log = "0.4.17"
1314
paho-mqtt = { version = "0.11.1", default-features = false, features = ["bundled"] }

src/main.rs

+74-75
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
mod config;
22

33
use crate::config::Config;
4-
use anyhow::{anyhow, Result};
4+
use anyhow::Result;
55
use clap::Parser;
6-
use paho_mqtt::{Client, ConnectOptionsBuilder, CreateOptionsBuilder, Message};
7-
use std::{env, path::PathBuf, thread, time::Duration};
6+
use futures::executor::block_on;
7+
use paho_mqtt::{
8+
AsyncClient, ConnectOptionsBuilder, CreateOptionsBuilder, Message, PersistenceType,
9+
};
10+
use std::{path::PathBuf, time::Duration};
811

912
/// A bridge between a serial port and MQTT
1013
#[derive(Clone, Debug, Parser)]
@@ -21,44 +24,59 @@ fn main() -> Result<()> {
2124
let config = Config::load(&args.config)?;
2225
log::debug!("Config: {:#?}", config);
2326

24-
let client = Client::new(
25-
CreateOptionsBuilder::new()
26-
.server_uri(&config.broker.address)
27-
.client_id(&config.broker.client_id)
28-
.persistence(env::temp_dir())
29-
.finalize(),
30-
)?;
31-
32-
let lwt = Message::new(&config.topics.availability, "offline", 1);
33-
34-
client.connect(
35-
ConnectOptionsBuilder::new()
36-
.user_name(&config.broker.username)
37-
.password(&config.broker.password)
38-
.will_message(lwt.clone())
39-
.finalize(),
40-
)?;
41-
42-
let rx = client.start_consuming();
43-
44-
client.subscribe(&config.topics.transmit, 2)?;
45-
client.subscribe(&config.topics.receive_control, 2)?;
46-
47-
let ctrlc_client = client.clone();
48-
ctrlc::set_handler(move || {
49-
ctrlc_client.stop_consuming();
50-
})?;
51-
52-
client.publish(Message::new(&config.topics.availability, "online", 1))?;
53-
log::info!("Connected to MQTT broker");
54-
55-
log::debug!("Opening serial port with config: {:?}", config.serial);
56-
let mut port = serialport::new(config.serial.device, config.serial.baud)
57-
.timeout(config.serial.timeout)
58-
.open()?;
59-
60-
for msg in rx.iter() {
61-
if let Some(msg) = msg {
27+
block_on(async {
28+
let client = AsyncClient::new(
29+
CreateOptionsBuilder::new()
30+
.server_uri(&config.broker.address)
31+
.client_id(&config.broker.client_id)
32+
.persistence(PersistenceType::None)
33+
.finalize(),
34+
)?;
35+
36+
let transmit_topic = config.topics.transmit.clone();
37+
let receive_control_topic = config.topics.receive_control.clone();
38+
client.set_connected_callback(move |c| {
39+
log::info!("Connected to MQTT broker");
40+
c.subscribe(&transmit_topic, 2);
41+
c.subscribe(&receive_control_topic, 2);
42+
});
43+
44+
let lwt = Message::new(&config.topics.availability, "offline", 1);
45+
46+
let response = client
47+
.connect(
48+
ConnectOptionsBuilder::new()
49+
.clean_session(true)
50+
.automatic_reconnect(Duration::from_secs(1), Duration::from_secs(5))
51+
.user_name(&config.broker.username)
52+
.password(&config.broker.password)
53+
.will_message(lwt.clone())
54+
.finalize(),
55+
)
56+
.await?;
57+
58+
log::info!(
59+
"Using MQTT version {}",
60+
response.connect_response().unwrap().mqtt_version
61+
);
62+
63+
let rx = client.start_consuming();
64+
65+
let ctrlc_client = client.clone();
66+
ctrlc::set_handler(move || {
67+
ctrlc_client.stop_consuming();
68+
})?;
69+
70+
client
71+
.publish(Message::new(&config.topics.availability, "online", 1))
72+
.await?;
73+
74+
log::debug!("Opening serial port with config: {:?}", config.serial);
75+
let mut port = serialport::new(config.serial.device, config.serial.baud)
76+
.timeout(config.serial.timeout)
77+
.open()?;
78+
79+
for msg in rx.iter().flatten() {
6280
if msg.topic() == config.topics.transmit {
6381
log::info!("Tx message: {:?}", msg);
6482
if let Err(e) = port.write(msg.payload()) {
@@ -73,11 +91,14 @@ fn main() -> Result<()> {
7391
match port.read(buffer.as_mut_slice()) {
7492
Ok(rx_bytes) => {
7593
log::info!("Received {} bytes from serial port", rx_bytes);
76-
if let Err(e) = client.publish(Message::new(
77-
&config.topics.receive,
78-
&buffer[..rx_bytes],
79-
1,
80-
)) {
94+
if let Err(e) = client
95+
.publish(Message::new(
96+
&config.topics.receive,
97+
&buffer[..rx_bytes],
98+
1,
99+
))
100+
.await
101+
{
81102
log::error!("Failed publish received bytes via MQTT: {}", e);
82103
}
83104
}
@@ -91,37 +112,15 @@ fn main() -> Result<()> {
91112
}
92113
}
93114
}
94-
} else if !client.is_connected() {
95-
if let Err(e) = try_reconnect(&client) {
96-
log::error!("{}", e);
97-
break;
98-
}
99115
}
100-
}
101-
102-
if client.is_connected() {
103-
client.publish(lwt)?;
104116

105-
log::info!("Disconnecting from MQTT broker");
106-
client.disconnect(None)?;
107-
}
108-
109-
Ok(())
110-
}
117+
if client.is_connected() {
118+
client.publish(lwt).await?;
111119

112-
fn try_reconnect(c: &Client) -> Result<()> {
113-
for i in 0..300 {
114-
log::info!("Attempting reconnection {}...", i);
115-
match c.reconnect() {
116-
Ok(_) => {
117-
log::info!("Reconnection successful");
118-
return Ok(());
119-
}
120-
Err(e) => {
121-
log::error!("Reconnection failed: {}", e);
122-
}
120+
log::info!("Disconnecting from MQTT broker");
121+
client.disconnect(None).await?;
123122
}
124-
thread::sleep(Duration::from_secs(1));
125-
}
126-
Err(anyhow!("Failed to reconnect to broker"))
123+
124+
Ok(())
125+
})
127126
}

0 commit comments

Comments
 (0)