Skip to content

Commit

Permalink
ensure max outgoing messages per channel is atleast one and ensure to…
Browse files Browse the repository at this point in the history
… return open channel that can still accept messages
  • Loading branch information
vedhavyas committed Jan 21, 2025
1 parent bf75508 commit 73c12b3
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 26 deletions.
3 changes: 3 additions & 0 deletions domains/pallets/messenger/src/benchmarking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ mod benchmarks {
last_delivered_message_response_nonce: None,
};
Outbox::<T>::insert((dst_chain_id, channel_id, next_outbox_nonce), req_msg);
OutboxMessageCount::<T>::mutate((dst_chain_id, channel_id), |count| {
*count = count.saturating_add(1u32);
});
// Insert a dummy response message which will be handled during the `relay_message_response` call
let resp_msg: Message<BalanceOf<T>> = Message {
src_chain_id: dst_chain_id,
Expand Down
44 changes: 29 additions & 15 deletions domains/pallets/messenger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,25 +243,21 @@ mod pallet {
/// Used by the dst_chains to verify the message response.
#[pallet::storage]
#[pallet::getter(fn inbox_responses)]
pub(super) type InboxResponses<T: Config> = CountedStorageMap<
_,
Identity,
(ChainId, ChannelId, Nonce),
Message<BalanceOf<T>>,
OptionQuery,
>;
pub(super) type InboxResponses<T: Config> =
StorageMap<_, Identity, (ChainId, ChannelId, Nonce), Message<BalanceOf<T>>, OptionQuery>;

/// Stores the outgoing messages that are awaiting message responses from the dst_chain.
/// Messages are processed in the outbox nonce order of chain's channel.
#[pallet::storage]
#[pallet::getter(fn outbox)]
pub(super) type Outbox<T: Config> = CountedStorageMap<
_,
Identity,
(ChainId, ChannelId, Nonce),
Message<BalanceOf<T>>,
OptionQuery,
>;
pub(super) type Outbox<T: Config> =
StorageMap<_, Identity, (ChainId, ChannelId, Nonce), Message<BalanceOf<T>>, OptionQuery>;

/// Stores the outgoing messages count that are awaiting message responses from the dst_chain.
#[pallet::storage]
#[pallet::getter(fn outbox_message_count)]
pub(super) type OutboxMessageCount<T: Config> =
StorageMap<_, Identity, (ChainId, ChannelId), u32, ValueQuery>;

/// A temporary storage for storing decoded outbox response message between `pre_dispatch_relay_message_response`
/// and `relay_message_response`.
Expand Down Expand Up @@ -545,6 +541,15 @@ mod pallet {

/// Invalid channel reserve fee
InvalidChannelReserveFee,

/// Invalid max outgoing messages
InvalidMaxOutgoingMessages,

/// Message count overflow
MessageCountOverflow,

/// Message count underflow
MessageCountUnderflow,
}

#[pallet::call]
Expand Down Expand Up @@ -907,8 +912,11 @@ mod pallet {
// loop through channels in descending order until open channel is found.
// we always prefer latest opened channel.
while let Some(channel_id) = next_channel_id.checked_sub(ChannelId::one()) {
let message_count = OutboxMessageCount::<T>::get((dst_chain_id, channel_id));
if let Some(channel) = Channels::<T>::get(dst_chain_id, channel_id) {
if channel.state == ChannelState::Open {
if channel.state == ChannelState::Open
&& message_count < channel.max_outgoing_messages
{
return Some((channel_id, channel.fee));
}
}
Expand Down Expand Up @@ -1006,6 +1014,12 @@ mod pallet {
Error::<T>::InvalidChain,
);

// ensure max outgoing messages is atleast 1
ensure!(
init_params.max_outgoing_messages >= 1u32,
Error::<T>::InvalidMaxOutgoingMessages
);

// If the channel owner is in this chain then the channel reserve fee
// must not be empty
ensure!(
Expand Down
23 changes: 21 additions & 2 deletions domains/pallets/messenger/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(not(feature = "std"))]
extern crate alloc;

use crate::pallet::{ChainAllowlist, UpdatedChannels};
use crate::pallet::{ChainAllowlist, OutboxMessageCount, UpdatedChannels};
use crate::{
BalanceOf, ChannelId, ChannelState, Channels, CloseChannelBy, Config, Error, Event,
InboxResponses, MessageWeightTags as MessageWeightTagStore, Nonce, Outbox, OutboxMessageResult,
Expand Down Expand Up @@ -52,7 +52,7 @@ impl<T: Config> Pallet<T> {
|maybe_channel| -> Result<Nonce, DispatchError> {
let channel = maybe_channel.as_mut().ok_or(Error::<T>::MissingChannel)?;
// check if the outbox is full
let count = Outbox::<T>::count();
let count = OutboxMessageCount::<T>::get((dst_chain_id, channel_id));
ensure!(
count < channel.max_outgoing_messages,
Error::<T>::OutboxFull
Expand All @@ -72,6 +72,15 @@ impl<T: Config> Pallet<T> {
.latest_response_received_message_nonce,
};
Outbox::<T>::insert((dst_chain_id, channel_id, next_outbox_nonce), msg);
OutboxMessageCount::<T>::try_mutate(
(dst_chain_id, channel_id),
|count| -> Result<(), DispatchError> {
*count = count
.checked_add(1u32)
.ok_or(Error::<T>::MessageCountOverflow)?;
Ok(())
},
)?;

// update channel state
channel.next_outbox_nonce = next_outbox_nonce
Expand Down Expand Up @@ -340,6 +349,16 @@ impl<T: Config> Pallet<T> {
let req_msg = Outbox::<T>::take((dst_chain_id, channel_id, nonce))
.ok_or(Error::<T>::MissingMessage)?;

OutboxMessageCount::<T>::try_mutate(
(dst_chain_id, channel_id),
|count| -> Result<(), DispatchError> {
*count = count
.checked_sub(1u32)
.ok_or(Error::<T>::MessageCountUnderflow)?;
Ok(())
},
)?;

// clear out box message weight tag
MessageWeightTagStore::<T>::mutate(|maybe_messages| {
let mut messages = maybe_messages.as_mut().cloned().unwrap_or_default();
Expand Down
51 changes: 42 additions & 9 deletions domains/pallets/messenger/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::mock::{
chain_a, chain_b, consensus_chain, storage_proof_of_inbox_message_responses,
storage_proof_of_outbox_messages, AccountId, Balance, TestExternalities,
};
use crate::pallet::OutboxMessageCount;
use crate::{
ChainAllowlist, ChainAllowlistUpdate, Channel, ChannelId, ChannelState, Channels,
CloseChannelBy, Error, FeeModel, Inbox, InboxResponses, InitiateChannelParams, Nonce, Outbox,
Expand Down Expand Up @@ -59,7 +60,10 @@ fn create_channel(chain_id: ChainId, channel_id: ChannelId) {
assert_eq!(channel.next_inbox_nonce, Nonce::zero());
assert_eq!(channel.next_outbox_nonce, Nonce::one());
assert_eq!(channel.latest_response_received_message_nonce, None);
assert_eq!(Outbox::<Runtime>::count(), 1);
assert_eq!(
OutboxMessageCount::<Runtime>::get((chain_id, channel_id)),
1
);
let msg = Outbox::<Runtime>::get((chain_id, channel_id, Nonce::zero())).unwrap();
assert_eq!(msg.dst_chain_id, chain_id);
assert_eq!(msg.channel_id, channel_id);
Expand Down Expand Up @@ -338,6 +342,7 @@ fn send_message_between_chains(
msg: EndpointPayload,
channel_id: ChannelId,
) {
let chain_a_id = chain_a::SelfChainId::get();
let chain_b_id = chain_b::SelfChainId::get();

// send message form outbox
Expand Down Expand Up @@ -368,21 +373,35 @@ fn send_message_between_chains(
// check state on chain_b
chain_b_test_ext.execute_with(|| {
// Outbox, Outbox responses, Inbox, InboxResponses must be empty
assert_eq!(Outbox::<chain_b::Runtime>::count(), 0);
assert_eq!(
OutboxMessageCount::<chain_b::Runtime>::get((chain_a_id, channel_id)),
0
);
assert!(OutboxResponses::<chain_b::Runtime>::get().is_none());
assert!(Inbox::<chain_b::Runtime>::get().is_none());

// latest inbox message response is cleared on next message
assert_eq!(InboxResponses::<chain_b::Runtime>::count(), 1);
assert!(InboxResponses::<chain_b::Runtime>::contains_key((
chain_a_id,
channel_id,
Nonce::one()
)),);
});

// check state on chain_a
chain_a_test_ext.execute_with(|| {
// Outbox, Outbox responses, Inbox, InboxResponses must be empty
assert_eq!(Outbox::<chain_a::Runtime>::count(), 0);
assert_eq!(
OutboxMessageCount::<chain_a::Runtime>::get((chain_b_id, channel_id)),
0
);
assert!(OutboxResponses::<chain_a::Runtime>::get().is_none());
assert!(Inbox::<chain_a::Runtime>::get().is_none());
assert_eq!(InboxResponses::<chain_a::Runtime>::count(), 0);
assert!(!InboxResponses::<chain_a::Runtime>::contains_key((
chain_b_id,
channel_id,
Nonce::one()
)));

let channel = chain_a::Messenger::channels(chain_b_id, channel_id).unwrap();
assert_eq!(
Expand Down Expand Up @@ -435,12 +454,19 @@ fn close_channel_between_chains(
assert_eq!(channel.next_outbox_nonce, Nonce::zero());

// Outbox, Outbox responses, Inbox, InboxResponses must be empty
assert_eq!(Outbox::<chain_b::Runtime>::count(), 0);
assert_eq!(
OutboxMessageCount::<chain_b::Runtime>::get((chain_a_id, channel_id)),
0
);
assert!(OutboxResponses::<chain_b::Runtime>::get().is_none());
assert!(Inbox::<chain_b::Runtime>::get().is_none());

// latest inbox message response is cleared on next message
assert_eq!(InboxResponses::<chain_b::Runtime>::count(), 1);
assert!(InboxResponses::<chain_b::Runtime>::contains_key((
chain_a_id,
channel_id,
Nonce::one()
)));
});

// check channel state be closed on chain_a
Expand All @@ -464,10 +490,17 @@ fn close_channel_between_chains(
}));

// Outbox, Outbox responses, Inbox, InboxResponses must be empty
assert_eq!(Outbox::<chain_a::Runtime>::count(), 0);
assert_eq!(
OutboxMessageCount::<chain_a::Runtime>::get((chain_b_id, channel_id)),
0
);
assert!(OutboxResponses::<chain_a::Runtime>::get().is_none());
assert!(Inbox::<chain_a::Runtime>::get().is_none());
assert_eq!(InboxResponses::<chain_a::Runtime>::count(), 0);
assert!(!InboxResponses::<chain_a::Runtime>::contains_key((
chain_b_id,
channel_id,
Nonce::one()
)));
})
}

Expand Down

0 comments on commit 73c12b3

Please sign in to comment.