Skip to content
This repository was archived by the owner on Oct 22, 2024. It is now read-only.

Message queue reentrancy issue fix #37

Merged
merged 20 commits into from
Nov 24, 2023
Prev Previous commit
Next Next commit
Disallow reentrancy
Signed-off-by: Oliver Tale-Yazdi <[email protected]>
  • Loading branch information
ggwpez authored and claravanstaden committed Nov 24, 2023
commit 688cfb3e26eb90df44a2a9d53fe1fb56ccb5c4b6
85 changes: 58 additions & 27 deletions substrate/frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ pub mod pallet {
///
/// This can change at any time and may resolve in the future by re-trying.
QueuePaused,
/// Another call is in progress and needs to finish before this call can happen.
RecursiveDisallowed,
}

/// The index of the first and last (non-empty) pages.
Expand Down Expand Up @@ -869,6 +871,21 @@ impl<T: Config> Pallet<T> {
page_index: PageIndex,
index: T::Size,
weight_limit: Weight,
) -> Result<Weight, Error<T>> {
match with_service_mutex(|| {
Self::do_execute_overweight_inner(origin, page_index, index, weight_limit)
}) {
Err(()) => Err(Error::<T>::RecursiveDisallowed),
Ok(x) => x,
}
}

/// Same as `do_execute_overweight` but must be called while holding the `service_mutex`.
fn do_execute_overweight_inner(
origin: MessageOriginOf<T>,
page_index: PageIndex,
index: T::Size,
weight_limit: Weight,
) -> Result<Weight, Error<T>> {
let mut book_state = BookStateFor::<T>::get(&origin);
ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused);
Expand Down Expand Up @@ -925,6 +942,14 @@ impl<T: Config> Pallet<T> {

/// Remove a stale page or one which has no more messages remaining to be processed.
fn do_reap_page(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
match with_service_mutex(|| Self::do_reap_page_inner(origin, page_index)) {
Err(()) => Err(Error::<T>::RecursiveDisallowed.into()),
Ok(x) => x,
}
}

/// Same as `do_reap_page` but must be called while holding the `service_mutex`.
fn do_reap_page_inner(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
let mut book_state = BookStateFor::<T>::get(origin);
// definitely not reapable if the page's index is no less than the `begin`ning of ready
// pages.
Expand Down Expand Up @@ -1428,35 +1453,40 @@ impl<T: Config> ServiceQueues for Pallet<T> {
Weight::zero()
});

let mut next = match Self::bump_service_head(&mut weight) {
Some(h) => h,
None => return weight.consumed(),
};
// The last queue that did not make any progress.
// The loop aborts as soon as it arrives at this queue again without making any progress
// on other queues in between.
let mut last_no_progress = None;

loop {
let (progressed, n) = Self::service_queue(next.clone(), &mut weight, max_weight);
next = match n {
Some(n) =>
if !progressed {
if last_no_progress == Some(n.clone()) {
break
}
if last_no_progress.is_none() {
last_no_progress = Some(next.clone())
}
n
} else {
last_no_progress = None;
n
},
None => break,
match with_service_mutex(|| {
let mut next = match Self::bump_service_head(&mut weight) {
Some(h) => h,
None => return weight.consumed(),
};
// The last queue that did not make any progress.
// The loop aborts as soon as it arrives at this queue again without making any progress
// on other queues in between.
let mut last_no_progress = None;

loop {
let (progressed, n) = Self::service_queue(next.clone(), &mut weight, max_weight);
next = match n {
Some(n) =>
if !progressed {
if last_no_progress == Some(n.clone()) {
break
}
if last_no_progress.is_none() {
last_no_progress = Some(next.clone())
}
n
} else {
last_no_progress = None;
n
},
None => break,
}
}
weight.consumed()
}) {
Err(()) => weight.consumed(),
Ok(w) => w,
}
weight.consumed()
}

/// Execute a single overweight message.
Expand Down Expand Up @@ -1484,6 +1514,7 @@ impl<T: Config> ServiceQueues for Pallet<T> {
Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued =>
ExecuteOverweightError::NotFound,
Error::<T>::RecursiveDisallowed => ExecuteOverweightError::RecursiveDisallowed,
_ => ExecuteOverweightError::Other,
},
)
Expand Down