From 89936b6530690c6d03869b2ad8b82f9f84776f94 Mon Sep 17 00:00:00 2001 From: Philippe Antoine Date: Tue, 17 Oct 2023 10:26:57 +0200 Subject: [PATCH] mqtt: fix logic when setting event Especially sets transactions to complete when we get a response without having seen the request, so that the transactions end up getting cleaned (instead of living/leaking in the state). Also try to set the event on the relevant transaction, instead of creating a new transaction just for the purpose of having the event. Ticket: #6299 --- rust/src/mqtt/mqtt.rs | 141 ++++++++++++++++-------------------------- 1 file changed, 55 insertions(+), 86 deletions(-) diff --git a/rust/src/mqtt/mqtt.rs b/rust/src/mqtt/mqtt.rs index 7f60e2a757cd..fbf03e19af69 100644 --- a/rust/src/mqtt/mqtt.rs +++ b/rust/src/mqtt/mqtt.rs @@ -183,11 +183,11 @@ impl MQTTState { } fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction { - let direction = if toclient { - Direction::ToClient - } else { - Direction::ToServer - }; + let direction = if toclient { + Direction::ToClient + } else { + Direction::ToServer + }; let mut tx = MQTTTransaction::new(msg, direction); self.tx_id += 1; tx.tx_id = self.tx_id; @@ -217,104 +217,82 @@ impl MQTTState { match msg.op { MQTTOperation::CONNECT(ref conn) => { self.protocol_version = conn.protocol_version; + let mut tx = self.new_tx(msg, toclient); + tx.pkt_id = Some(MQTT_CONNECT_PKT_ID); if self.connected { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect); - self.transactions.push_back(tx); - } else { - let mut tx = self.new_tx(msg, toclient); - tx.pkt_id = Some(MQTT_CONNECT_PKT_ID); - self.transactions.push_back(tx); } + self.transactions.push_back(tx); } MQTTOperation::PUBLISH(ref publish) => { - if !self.connected { - let mut tx = self.new_tx(msg, toclient); - MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; - } - match msg.header.qos_level { + let qos = msg.header.qos_level; + let pkt_id = publish.message_id; + let mut tx = self.new_tx(msg, toclient); + match qos { 0 => { // with QOS level 0, we do not need to wait for a // response - let mut tx = self.new_tx(msg, toclient); tx.complete = true; - self.transactions.push_back(tx); } 1..=2 => { - if let Some(pkt_id) = publish.message_id { - let mut tx = self.new_tx(msg, toclient); + if let Some(pkt_id) = pkt_id { tx.pkt_id = Some(pkt_id as u32); - self.transactions.push_back(tx); } else { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId); - self.transactions.push_back(tx); } } _ => { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); - self.transactions.push_back(tx); } } - } - MQTTOperation::SUBSCRIBE(ref subscribe) => { if !self.connected { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; } + self.transactions.push_back(tx); + } + MQTTOperation::SUBSCRIBE(ref subscribe) => { let pkt_id = subscribe.message_id as u32; - match msg.header.qos_level { + let qos = msg.header.qos_level; + let mut tx = self.new_tx(msg, toclient); + match qos { 0 => { // with QOS level 0, we do not need to wait for a // response - let mut tx = self.new_tx(msg, toclient); tx.complete = true; - self.transactions.push_back(tx); } 1..=2 => { - let mut tx = self.new_tx(msg, toclient); tx.pkt_id = Some(pkt_id); - self.transactions.push_back(tx); } _ => { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); - self.transactions.push_back(tx); } } - } - MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => { if !self.connected { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; } + self.transactions.push_back(tx); + } + MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => { let pkt_id = unsubscribe.message_id as u32; - match msg.header.qos_level { + let qos = msg.header.qos_level; + let mut tx = self.new_tx(msg, toclient); + match qos { 0 => { // with QOS level 0, we do not need to wait for a // response - let mut tx = self.new_tx(msg, toclient); tx.complete = true; - self.transactions.push_back(tx); } 1..=2 => { - let mut tx = self.new_tx(msg, toclient); tx.pkt_id = Some(pkt_id); - self.transactions.push_back(tx); } _ => { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); - self.transactions.push_back(tx); } } + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + self.transactions.push_back(tx); } MQTTOperation::CONNACK(ref _connack) => { if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) { @@ -325,31 +303,24 @@ impl MQTTState { } else { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect); + tx.complete = true; self.transactions.push_back(tx); } } MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => { - if !self.connected { - let mut tx = self.new_tx(msg, toclient); - MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; - } if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) { tx.msg.push(msg); } else { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + tx.complete = true; self.transactions.push_back(tx); } } MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => { - if !self.connected { - let mut tx = self.new_tx(msg, toclient); - MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; - } if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) { tx.msg.push(msg); tx.complete = true; @@ -357,16 +328,14 @@ impl MQTTState { } else { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + tx.complete = true; self.transactions.push_back(tx); } } MQTTOperation::SUBACK(ref suback) => { - if !self.connected { - let mut tx = self.new_tx(msg, toclient); - MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; - } if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) { tx.msg.push(msg); tx.complete = true; @@ -374,16 +343,14 @@ impl MQTTState { } else { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe); + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + tx.complete = true; self.transactions.push_back(tx); } } MQTTOperation::UNSUBACK(ref unsuback) => { - if !self.connected { - let mut tx = self.new_tx(msg, toclient); - MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; - } if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) { tx.msg.push(msg); tx.complete = true; @@ -391,6 +358,10 @@ impl MQTTState { } else { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe); + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + tx.complete = true; self.transactions.push_back(tx); } } @@ -406,25 +377,19 @@ impl MQTTState { self.transactions.push_back(tx); } MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => { + let mut tx = self.new_tx(msg, toclient); + tx.complete = true; if !self.connected { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; } - let mut tx = self.new_tx(msg, toclient); - tx.complete = true; self.transactions.push_back(tx); } MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => { + let mut tx = self.new_tx(msg, toclient); + tx.complete = true; if !self.connected { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; } - let mut tx = self.new_tx(msg, toclient); - tx.complete = true; self.transactions.push_back(tx); } } @@ -608,7 +573,11 @@ impl MQTTState { } fn set_event_notx(&mut self, event: MQTTEvent, toclient: bool) { - let mut tx = MQTTTransaction::new_empty(if toclient { Direction::ToClient } else { Direction::ToServer }); + let mut tx = MQTTTransaction::new_empty(if toclient { + Direction::ToClient + } else { + Direction::ToServer + }); self.tx_id += 1; tx.tx_id = self.tx_id; if toclient {