Skip to content

Commit

Permalink
Allow subsystems using priority channels (#77)
Browse files Browse the repository at this point in the history
Co-authored-by: Bernhard Schuster <[email protected]>
  • Loading branch information
AndreiEres and drahnr authored Jun 7, 2024
1 parent c21dd03 commit 3024e64
Show file tree
Hide file tree
Showing 6 changed files with 428 additions and 17 deletions.
15 changes: 12 additions & 3 deletions orchestra/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ pub(crate) fn impl_feature_gated_items(
let channel_name_rx = &cfg_set.channel_names_without_wip("_rx");
let channel_name_unbounded_rx = &info.channel_names_without_wip("_unbounded_rx");

let can_receive_priority_messages = &cfg_set.can_receive_priority_messages_without_wip();

let baggage_name = &info.baggage_names();
let baggage_generic_ty = &info.baggage_generic_types();

Expand Down Expand Up @@ -633,13 +635,20 @@ pub(crate) fn impl_feature_gated_items(
>();

#(
let (#channel_name_tx, #channel_name_rx)
=
let (#channel_name_tx, #channel_name_rx) = if #can_receive_priority_messages {
#support_crate ::metered::channel_with_priority::<
MessagePacket< #maybe_boxed_consumes >
>(
self.channel_capacity.unwrap_or(#message_channel_capacity),
self.channel_capacity.unwrap_or(#message_channel_capacity)
)
} else {
#support_crate ::metered::channel::<
MessagePacket< #maybe_boxed_consumes >
>(
self.channel_capacity.unwrap_or(#message_channel_capacity)
);
)
};
)*

#(
Expand Down
36 changes: 27 additions & 9 deletions orchestra/proc-macro/src/impl_channels_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,27 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr
// when no defined messages in enum
impl ChannelsOut {
/// Send a message via a bounded channel.
pub async fn send_and_log_error(
pub async fn send_and_log_error<P: Priority>(
&mut self,
signals_received: usize,
message: #message_wrapper,
message: #message_wrapper
) {
let res: ::std::result::Result<_, _> = match message {
#(
#feature_gates
#message_wrapper :: #consumes_variant ( inner ) => {
self. #channel_name .send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await.map_err(|_| stringify!( #channel_name ))
match P::priority() {
PriorityLevel::Normal => {
self. #channel_name .send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await
},
PriorityLevel::High => {
self. #channel_name .priority_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await
},
}.map_err(|_| stringify!( #channel_name ))
}
)*
// subsystems that are wip
Expand Down Expand Up @@ -116,7 +125,7 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr
}

/// Try to send a message via a bounded channel.
pub fn try_send(
pub fn try_send<P: Priority>(
&mut self,
signals_received: usize,
message: #message_wrapper,
Expand All @@ -125,9 +134,18 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr
#(
#feature_gates
#message_wrapper :: #consumes_variant ( inner ) => {
self. #channel_name .try_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).map_err(|err| match err {
match P::priority() {
PriorityLevel::Normal => {
self. #channel_name .try_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
)
},
PriorityLevel::High => {
self. #channel_name .try_priority_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
)
},
}.map_err(|err| match err {
#support_crate ::metered::TrySendError::Full(err_inner) => #support_crate ::metered::TrySendError::Full(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )),
#support_crate ::metered::TrySendError::Closed(err_inner) => #support_crate ::metered::TrySendError::Closed(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )),
})
Expand Down
18 changes: 14 additions & 4 deletions orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,21 +278,31 @@ pub(crate) fn impl_subsystem_sender(
{
async fn send_message(&mut self, msg: OutgoingMessage)
{
self.channels.send_and_log_error(
self.send_message_with_priority::<NormalPriority>(msg).await;
}

async fn send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage)
{
self.channels.send_and_log_error::<P>(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
)
),
).await;
}

fn try_send_message(&mut self, msg: OutgoingMessage) -> ::std::result::Result<(), #support_crate ::metered::TrySendError<OutgoingMessage>>
{
self.channels.try_send(
self.try_send_message_with_priority::<NormalPriority>(msg)
}

fn try_send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage) -> ::std::result::Result<(), #support_crate ::metered::TrySendError<OutgoingMessage>>
{
self.channels.try_send::<P>(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
)
),
).map_err(|err| match err {
#support_crate ::metered::TrySendError::Full(inner) => #support_crate ::metered::TrySendError::Full(inner.try_into().expect("we should be able to unwrap what we wrap, qed")),
#support_crate ::metered::TrySendError::Closed(inner) => #support_crate ::metered::TrySendError::Closed(inner.try_into().expect("we should be able to unwrap what we wrap, qed")),
Expand Down
40 changes: 39 additions & 1 deletion orchestra/proc-macro/src/parse/parse_orchestra_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod kw {
syn::custom_keyword!(sends);
syn::custom_keyword!(message_capacity);
syn::custom_keyword!(signal_capacity);
syn::custom_keyword!(can_receive_priority_messages);
}

#[derive(Clone, Debug)]
Expand All @@ -58,6 +59,8 @@ pub(crate) enum SubSysAttrItem {
MessageChannelCapacity(ChannelCapacity<kw::message_capacity>),
/// Custom signal channels capacity for this subsystem
SignalChannelCapacity(ChannelCapacity<kw::signal_capacity>),
/// The subsystem can receive priority messages
CanReceivePriorityMessages(kw::can_receive_priority_messages),
}

impl Parse for SubSysAttrItem {
Expand All @@ -73,6 +76,8 @@ impl Parse for SubSysAttrItem {
Self::MessageChannelCapacity(input.parse::<ChannelCapacity<kw::message_capacity>>()?)
} else if lookahead.peek(kw::signal_capacity) {
Self::SignalChannelCapacity(input.parse::<ChannelCapacity<kw::signal_capacity>>()?)
} else if lookahead.peek(kw::can_receive_priority_messages) {
Self::CanReceivePriorityMessages(input.parse::<kw::can_receive_priority_messages>()?)
} else {
Self::Consumes(input.parse::<Consumes>()?)
})
Expand Down Expand Up @@ -100,6 +105,9 @@ impl ToTokens for SubSysAttrItem {
Self::SignalChannelCapacity(_) => {
quote! {}
},
Self::CanReceivePriorityMessages(can_receive_priority_messages) => {
quote! { #can_receive_priority_messages }
},
};
tokens.extend(ts.into_iter());
}
Expand Down Expand Up @@ -130,6 +138,8 @@ pub(crate) struct SubSysField {
pub(crate) message_capacity: Option<usize>,
/// Custom signal channel capacity
pub(crate) signal_capacity: Option<usize>,
/// The subsystem can receive priority messages
pub(crate) can_receive_priority_messages: bool,

pub(crate) feature_gates: Option<CfgPredicate>,
}
Expand Down Expand Up @@ -352,6 +362,8 @@ pub(crate) struct SubSystemAttrItems {
pub(crate) message_capacity: Option<ChannelCapacity<kw::message_capacity>>,
/// Custom signal channel capacity
pub(crate) signal_capacity: Option<ChannelCapacity<kw::signal_capacity>>,
/// The subsystem can receive priority messages
pub(crate) can_receive_priority_messages: bool,
}

impl Parse for SubSystemAttrItems {
Expand Down Expand Up @@ -393,8 +405,18 @@ impl Parse for SubSystemAttrItems {
let wip = extract_variant!(unique, Wip; default = false);
let message_capacity = extract_variant!(unique, MessageChannelCapacity take );
let signal_capacity = extract_variant!(unique, SignalChannelCapacity take );
let can_receive_priority_messages =
extract_variant!(unique, CanReceivePriorityMessages; default = false);

Ok(Self { blocking, wip, sends, consumes, message_capacity, signal_capacity })
Ok(Self {
blocking,
wip,
sends,
consumes,
message_capacity,
signal_capacity,
can_receive_priority_messages,
})
}
}

Expand Down Expand Up @@ -487,6 +509,10 @@ impl<'a> SubsystemConfigSet<'a> {
) -> Vec<LitInt> {
signal_channel_capacities_without_wip(&self.enabled_subsystems, default_capacity)
}

pub(crate) fn can_receive_priority_messages_without_wip(&self) -> Vec<syn::LitBool> {
can_receive_priority_messages_without_wip(&self.enabled_subsystems)
}
}

impl OrchestraInfo {
Expand Down Expand Up @@ -738,6 +764,7 @@ impl OrchestraGuts {
sends,
message_capacity,
signal_capacity,
can_receive_priority_messages,
..
} = subsystem_attrs;

Expand All @@ -761,6 +788,7 @@ impl OrchestraGuts {
message_capacity,
signal_capacity,
feature_gates,
can_receive_priority_messages,
});
} else {
// collect the "baggage"
Expand Down Expand Up @@ -893,3 +921,13 @@ pub(crate) fn consumes_without_wip<'a, T: Borrow<SubSysField>>(subsystems: &[T])
.map(|ssf| ssf.message_to_consume())
.collect::<Vec<_>>()
}

pub(crate) fn can_receive_priority_messages_without_wip(
subsystems: &Vec<&SubSysField>,
) -> Vec<syn::LitBool> {
subsystems
.iter()
.filter(|ssf| !ssf.wip)
.map(|ssf| syn::LitBool::new(ssf.can_receive_priority_messages, ssf.name.span()))
.collect::<Vec<_>>()
}
42 changes: 42 additions & 0 deletions orchestra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,37 @@ where
fn start(self, ctx: Ctx) -> SpawnedSubsystem<E>;
}

/// Priority of messages sending to the individual subsystems.
/// Only for the bounded channel sender.
pub enum PriorityLevel {
/// Normal priority.
Normal,
/// High priority.
High,
}
/// Normal priority.
pub struct NormalPriority;
/// High priority.
pub struct HighPriority;

/// Describes the priority of the message.
pub trait Priority {
/// The priority level.
fn priority() -> PriorityLevel {
PriorityLevel::Normal
}
}
impl Priority for NormalPriority {
fn priority() -> PriorityLevel {
PriorityLevel::Normal
}
}
impl Priority for HighPriority {
fn priority() -> PriorityLevel {
PriorityLevel::High
}
}

/// Sender end of a channel to interface with a subsystem.
#[async_trait::async_trait]
pub trait SubsystemSender<OutgoingMessage>: Clone + Send + 'static
Expand All @@ -503,6 +534,9 @@ where
/// Send a direct message to some other `Subsystem`, routed based on message type.
async fn send_message(&mut self, msg: OutgoingMessage);

/// Send a direct message with defined priority to some other `Subsystem`, routed based on message type.
async fn send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage);

/// Tries to send a direct message to some other `Subsystem`, routed based on message type.
/// This method is useful for cases where the message queue is bounded and the message is ok
/// to be dropped if the queue is full. If the queue is full, this method will return an error.
Expand All @@ -512,6 +546,14 @@ where
msg: OutgoingMessage,
) -> Result<(), metered::TrySendError<OutgoingMessage>>;

/// Tries to send a direct message with defined priority to some other `Subsystem`, routed based on message type.
/// If the queue is full, this method will return an error.
/// This method is not async and will not block the current task.
fn try_send_message_with_priority<P: Priority>(
&mut self,
msg: OutgoingMessage,
) -> Result<(), metered::TrySendError<OutgoingMessage>>;

/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
async fn send_messages<I>(&mut self, msgs: I)
where
Expand Down
Loading

0 comments on commit 3024e64

Please sign in to comment.