Skip to content

Commit

Permalink
mqtt: fix logic when setting event
Browse files Browse the repository at this point in the history
Especially sets transactions to complete when we get a response
without having seen the request, so that the transactions
end up getting cleaned (instead of living/leaking in the state).

Also try to set the event on the relevant transaction, instead
of creating a new transaction just for the purpose of having
the event.

Ticket: OISF#6299
  • Loading branch information
catenacyber authored and victorjulien committed Jan 30, 2024
1 parent 2fb5059 commit 89936b6
Showing 1 changed file with 55 additions and 86 deletions.
141 changes: 55 additions & 86 deletions rust/src/mqtt/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,11 @@ impl MQTTState {
}

fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
let direction = if toclient {
Direction::ToClient
} else {
Direction::ToServer
};
let direction = if toclient {
Direction::ToClient
} else {
Direction::ToServer
};
let mut tx = MQTTTransaction::new(msg, direction);
self.tx_id += 1;
tx.tx_id = self.tx_id;
Expand Down Expand Up @@ -217,104 +217,82 @@ impl MQTTState {
match msg.op {
MQTTOperation::CONNECT(ref conn) => {
self.protocol_version = conn.protocol_version;
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
if self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
self.transactions.push_back(tx);
}
self.transactions.push_back(tx);
}
MQTTOperation::PUBLISH(ref publish) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
match msg.header.qos_level {
let qos = msg.header.qos_level;
let pkt_id = publish.message_id;
let mut tx = self.new_tx(msg, toclient);
match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
1..=2 => {
if let Some(pkt_id) = publish.message_id {
let mut tx = self.new_tx(msg, toclient);
if let Some(pkt_id) = pkt_id {
tx.pkt_id = Some(pkt_id as u32);
self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
self.transactions.push_back(tx);
}
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push_back(tx);
}
}
}
MQTTOperation::SUBSCRIBE(ref subscribe) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
self.transactions.push_back(tx);
}
MQTTOperation::SUBSCRIBE(ref subscribe) => {
let pkt_id = subscribe.message_id as u32;
match msg.header.qos_level {
let qos = msg.header.qos_level;
let mut tx = self.new_tx(msg, toclient);
match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
self.transactions.push_back(tx);
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push_back(tx);
}
}
}
MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
self.transactions.push_back(tx);
}
MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
let pkt_id = unsubscribe.message_id as u32;
match msg.header.qos_level {
let qos = msg.header.qos_level;
let mut tx = self.new_tx(msg, toclient);
match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
self.transactions.push_back(tx);
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push_back(tx);
}
}
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
self.transactions.push_back(tx);
}
MQTTOperation::CONNACK(ref _connack) => {
if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
Expand All @@ -325,72 +303,65 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
tx.msg.push(msg);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
tx.pkt_id = None;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::SUBACK(ref suback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
tx.pkt_id = None;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::UNSUBACK(ref unsuback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
tx.pkt_id = None;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
Expand All @@ -406,25 +377,19 @@ impl MQTTState {
self.transactions.push_back(tx);
}
MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
}
Expand Down Expand Up @@ -608,7 +573,11 @@ impl MQTTState {
}

fn set_event_notx(&mut self, event: MQTTEvent, toclient: bool) {
let mut tx = MQTTTransaction::new_empty(if toclient { Direction::ToClient } else { Direction::ToServer });
let mut tx = MQTTTransaction::new_empty(if toclient {
Direction::ToClient
} else {
Direction::ToServer
});
self.tx_id += 1;
tx.tx_id = self.tx_id;
if toclient {
Expand Down

0 comments on commit 89936b6

Please sign in to comment.